Added channel streaming feature.

This commit is contained in:
Martin Edenhofer 2016-01-09 13:23:11 +01:00
parent c60ceb66ba
commit edb6c66b6b
8 changed files with 349 additions and 30 deletions

View file

@ -11,6 +11,10 @@ class Channel < ApplicationModel
after_update :email_address_check
after_destroy :email_address_check
# rubocop:disable Style/ClassVars
@@channel_stream = {}
# rubocop:enable Style/ClassVars
=begin
fetch all accounts
@ -55,6 +59,7 @@ fetch one account
result = driver_instance.fetch(adapter_options, self)
self.status_in = result[:result]
self.last_log_in = result[:notice]
preferences[:last_fetch] = Time.zone.now
save
rescue => e
error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}"
@ -62,6 +67,7 @@ fetch one account
logger.error e.backtrace
self.status_in = 'error'
self.last_log_in = error
preferences[:last_fetch] = Time.zone.now
save
end
@ -69,6 +75,116 @@ fetch one account
=begin
stream instance of account
channel = Channel.where(area: 'Twitter::Account').first
stream_instance = channel.stream_instance
# start stream
stream_instance.stream
=end
def stream_instance
adapter = options[:adapter]
begin
# we need to require each channel backend individually otherwise we get a
# 'warning: toplevel constant Twitter referenced by Channel::Driver::Twitter' error e.g.
# so we have to convert the channel name to the filename via Rails String.underscore
# http://stem.ps/rails/2015/01/25/ruby-gotcha-toplevel-constant-referenced-by.html
require "channel/driver/#{adapter.to_filename}"
driver_class = Object.const_get("Channel::Driver::#{adapter.to_classname}")
driver_instance = driver_class.new
# check is stream exists
return if !driver_instance.respond_to?(:stream_instance)
driver_instance.stream_instance(self)
# set scheduler job to active
return driver_instance
rescue => e
error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}"
logger.error error
logger.error e.backtrace
self.status_in = 'error'
self.last_log_in = error
save
end
end
=begin
stream all accounts
Channel.stream
=end
def self.stream
Thread.abort_on_exception = true
last_channels = []
loop do
logger.debug 'stream controll loop'
current_channels = []
channels = Channel.where('active = ? AND area LIKE ?', true, '%::Account')
channels.each {|channel|
next if channel.options[:adapter] != 'twitter'
current_channels.push channel.id
# exit it channel has changed
if @@channel_stream[channel.id] && @@channel_stream[channel.id][:updated_at] != channel.updated_at
logger.debug "channel (#{channel.id}) has changed, restart thread"
@@channel_stream[channel.id][:thread].exit
@@channel_stream[channel.id][:thread].join
@@channel_stream[channel.id][:stream_instance].disconnect
@@channel_stream[channel.id] = false
end
#logger.debug "thread for channel (#{channel.id}) already running" if @@channel_stream[channel.id]
next if @@channel_stream[channel.id]
@@channel_stream[channel.id] = {
updated_at: channel.updated_at
}
# start threads for each channel
@@channel_stream[channel.id][:thread] = Thread.new {
logger.debug "Started stream channel for '#{channel.id}' (#{channel.area})..."
@@channel_stream[channel.id][:stream_instance] = channel.stream_instance
@@channel_stream[channel.id][:stream_instance].stream
@@channel_stream[channel.id][:stream_instance].disconnect
@@channel_stream[channel.id] = false
logger.debug " ...stopped thread for '#{channel.id}'"
}
}
# cleanup deleted channels
last_channels.each {|channel_id|
next if current_channels.include?(channel_id)
logger.debug "channel (#{channel_id}) not longer active, stop thread"
@@channel_stream[channel_id][:thread].exit
@@channel_stream[channel_id][:thread].join
@@channel_stream[channel_id][:stream_instance].disconnect
@@channel_stream[channel_id] = false
}
last_channels = current_channels
sleep 30
end
end
=begin
send via account
channel = Channel.where(area: 'Email::Account').first

View file

@ -49,7 +49,15 @@ class Channel::Driver::Twitter
options = check_external_credential(options)
# check if stream scheduler is already running and return
# only fetch once a hour
if Rails.env.production? || Rails.env.development?
if channel.preferences && channel.preferences[:last_fetch] && channel.preferences[:last_fetch] > Time.zone.now - 1.hour
return {
result: 'ok',
notice: '',
}
end
end
@rest_client = TweetRest.new(options[:auth])
@sync = options[:sync]
@ -108,12 +116,65 @@ class Channel::Driver::Twitter
@rest_client.disconnect if @rest_client
end
=begin
create stream endpoint form twitter account
options = {
adapter: 'twitter',
auth: {
consumer_key: consumer_key,
consumer_secret: consumer_secret,
oauth_token: armin_theo_token,
oauth_token_secret: armin_theo_token_secret,
},
sync: {
search: [
{
term: '#citheo42',
group_id: 2,
},
{
term: '#citheo24',
group_id: 1,
},
],
mentions: {
group_id: 2,
},
direct_messages: {
group_id: 2,
}
}
}
instance = Channel::Driver::Twitter.new
stream_instance = instance.stream_instance(channel)
returns
instance_of_stream_handle
=end
def stream_instance(channel)
@channel = channel
options = @channel.options
@stream_client = TweetStream.new(options[:auth])
end
=begin
stream tweets from twitter account
stream_instance.stream
returns
# endless loop
=end
def stream
hashtags = []
@channel.options['sync']['search'].each {|item|
@ -133,7 +194,7 @@ class Channel::Driver::Twitter
# check direct message
if tweet.class == Twitter::DirectMessage
if @channel.options['sync']['direct_messages']['group_id'] != ''
next if direct_message_limit_reached(tweet)
next if @stream_client.direct_message_limit_reached(tweet)
@stream_client.to_group(tweet, @channel.options['sync']['direct_messages']['group_id'], @channel)
end
next

View file

@ -0,0 +1,15 @@
class UpdateChannel2 < ActiveRecord::Migration
def up
Scheduler.create_or_update(
name: 'Check streams for Channel ',
method: 'Channel.stream',
period: 60,
prio: 1,
active: true,
updated_by_id: 1,
created_by_id: 1,
)
end
end

View file

@ -3291,6 +3291,15 @@ Scheduler.create_if_not_exists(
updated_by_id: 1,
created_by_id: 1,
)
Scheduler.create_if_not_exists(
name: 'Check streams for Channel ',
method: 'Channel.stream',
period: 60,
prio: 1,
active: true,
updated_by_id: 1,
created_by_id: 1,
)
Scheduler.create_if_not_exists(
name: 'Generate Session data',
method: 'Sessions.jobs',

View file

@ -0,0 +1,16 @@
# file is based on Twitter::Streaming::Client, needed to get custom_connection_handle
# to close connection after config has changed
require 'twitter/streaming/connection_custom'
class Twitter::Streaming::ClientCustom < Twitter::Streaming::Client
def initialize(options = {})
super
@connection = Twitter::Streaming::ConnectionCustom.new(options)
end
def custom_connection_handle
@connection.custom_connection_handle
end
end

View file

@ -0,0 +1,21 @@
# file is based on Twitter::Streaming::Connection, needed to get custom_connection_handle
# to close connection after config has changed
class Twitter::Streaming::ConnectionCustom < Twitter::Streaming::Connection
def stream(request, response)
client_context = OpenSSL::SSL::SSLContext.new
@client = @tcp_socket_class.new(Resolv.getaddress(request.uri.host), request.uri.port)
ssl_client = @ssl_socket_class.new(@client, client_context)
ssl_client.connect
request.stream(ssl_client)
while body = ssl_client.readpartial(1024) # rubocop:disable AssignmentInCondition
response << body
end
end
def custom_connection_handle
@client
end
end

View file

@ -5,34 +5,34 @@ class TwitterBrowserTest < TestCase
def test_add_config
# app config
if !ENV['TWITTER_CONSUMER_KEY']
fail "ERROR: Need TWITTER_CONSUMER_KEY - hint TWITTER_CONSUMER_KEY='1234'"
if !ENV['TWITTER_BT_CONSUMER_KEY']
fail "ERROR: Need TWITTER_BT_CONSUMER_KEY - hint TWITTER_BT_CONSUMER_KEY='1234'"
end
consumer_key = ENV['TWITTER_CONSUMER_KEY']
if !ENV['TWITTER_CONSUMER_SECRET']
fail "ERROR: Need TWITTER_CONSUMER_SECRET - hint TWITTER_CONSUMER_SECRET='1234'"
consumer_key = ENV['TWITTER_BT_CONSUMER_KEY']
if !ENV['TWITTER_BT_CONSUMER_SECRET']
fail "ERROR: Need TWITTER_BT_CONSUMER_SECRET - hint TWITTER_BT_CONSUMER_SECRET='1234'"
end
consumer_secret = ENV['TWITTER_CONSUMER_SECRET']
consumer_secret = ENV['TWITTER_BT_CONSUMER_SECRET']
if !ENV['TWITTER_USER_LOGIN']
fail "ERROR: Need TWITTER_USER_LOGIN - hint TWITTER_USER_LOGIN='1234'"
if !ENV['TWITTER_BT_USER_LOGIN']
fail "ERROR: Need TWITTER_BT_USER_LOGIN - hint TWITTER_BT_USER_LOGIN='1234'"
end
twitter_user_login = ENV['TWITTER_USER_LOGIN']
twitter_user_login = ENV['TWITTER_BT_USER_LOGIN']
if !ENV['TWITTER_USER_PW']
fail "ERROR: Need TWITTER_USER_PW - hint TWITTER_USER_PW='1234'"
if !ENV['TWITTER_BT_USER_PW']
fail "ERROR: Need TWITTER_BT_USER_PW - hint TWITTER_BT_USER_PW='1234'"
end
twitter_user_pw = ENV['TWITTER_USER_PW']
twitter_user_pw = ENV['TWITTER_BT_USER_PW']
if !ENV['TWITTER_CUSTOMER_TOKEN']
fail "ERROR: Need TWITTER_CUSTOMER_TOKEN - hint TWITTER_CUSTOMER_TOKEN='1234'"
if !ENV['TWITTER_BT_CUSTOMER_TOKEN']
fail "ERROR: Need TWITTER_BT_CUSTOMER_TOKEN - hint TWITTER_BT_CUSTOMER_TOKEN='1234'"
end
twitter_customer_token = ENV['TWITTER_CUSTOMER_TOKEN']
twitter_customer_token = ENV['TWITTER_BT_CUSTOMER_TOKEN']
if !ENV['TWITTER_CUSTOMER_TOKEN_SECRET']
fail "ERROR: Need TWITTER_CUSTOMER_TOKEN_SECRET - hint TWITTER_CUSTOMER_TOKEN_SECRET='1234'"
if !ENV['TWITTER_BT_CUSTOMER_TOKEN_SECRET']
fail "ERROR: Need TWITTER_BT_CUSTOMER_TOKEN_SECRET - hint TWITTER_BT_CUSTOMER_TOKEN_SECRET='1234'"
end
twitter_customer_token_secret = ENV['TWITTER_CUSTOMER_TOKEN_SECRET']
twitter_customer_token_secret = ENV['TWITTER_BT_CUSTOMER_TOKEN_SECRET']
hash = "#sweetcheck#{rand(99_999)}"

View file

@ -16,14 +16,14 @@ class TwitterTest < ActiveSupport::TestCase
)
# app config
if !ENV['TWITTER_APP_CONSUMER_KEY']
fail "ERROR: Need TWITTER_APP_CONSUMER_KEY - hint TWITTER_APP_CONSUMER_KEY='1234'"
if !ENV['TWITTER_CONSUMER_KEY']
fail "ERROR: Need TWITTER_CONSUMER_KEY - hint TWITTER_CONSUMER_KEY='1234'"
end
if !ENV['TWITTER_APP_CONSUMER_SECRET']
fail "ERROR: Need TWITTER_APP_CONSUMER_SECRET - hint TWITTER_APP_CONSUMER_SECRET='1234'"
if !ENV['TWITTER_CONSUMER_SECRET']
fail "ERROR: Need TWITTER_CONSUMER_SECRET - hint TWITTER_CONSUMER_SECRET='1234'"
end
consumer_key = ENV['TWITTER_APP_CONSUMER_KEY']
consumer_secret = ENV['TWITTER_APP_CONSUMER_SECRET']
consumer_key = ENV['TWITTER_CONSUMER_KEY']
consumer_secret = ENV['TWITTER_CONSUMER_SECRET']
# armin_theo (is system and is following marion_bauer)
if !ENV['TWITTER_SYSTEM_TOKEN']
@ -35,15 +35,15 @@ class TwitterTest < ActiveSupport::TestCase
armin_theo_token = ENV['TWITTER_SYSTEM_TOKEN']
armin_theo_token_secret = ENV['TWITTER_SYSTEM_TOKEN_SECRET']
# me_bauer (is following armin_theo)
# me_bauer (is customer and is following armin_theo)
if !ENV['TWITTER_CUSTOMER_TOKEN']
fail "ERROR: Need CUSTOMER_TOKEN - hint TWITTER_CUSTOMER_TOKEN='1234'"
end
if !ENV['TWITTER_CUSTOMER_TOKEN_SECREET']
fail "ERROR: Need CUSTOMER_TOKEN_SECREET - hint TWITTER_CUSTOMER_TOKEN_SECREET='1234'"
if !ENV['TWITTER_CUSTOMER_TOKEN_SECRET']
fail "ERROR: Need CUSTOMER_TOKEN_SECRET - hint TWITTER_CUSTOMER_TOKEN_SECRET='1234'"
end
me_bauer_token = ENV['TWITTER_CUSTOMER_TOKEN']
me_bauer_token_secret = ENV['TWITTER_CUSTOMER_TOKEN_SECREET']
me_bauer_token_secret = ENV['TWITTER_CUSTOMER_TOKEN_SECRET']
# add channel
current = Channel.where(area: 'Twitter::Account')
@ -365,4 +365,85 @@ class TwitterTest < ActiveSupport::TestCase
assert_equal('ok', channel.status_in)
end
test 'd streaming test' do
Thread.new {
Channel.stream
}
sleep 10
# new tweet I - by me_bauer
client = Twitter::REST::Client.new do |config|
config.consumer_key = consumer_key
config.consumer_secret = consumer_secret
config.access_token = me_bauer_token
config.access_token_secret = me_bauer_token_secret
end
hash = '#citheo24 #' + rand(9999).to_s
text = "Today... #{hash}"
tweet = client.update(
text,
)
sleep 10
article = nil
(1..2).each {
article = Ticket::Article.find_by(message_id: tweet.id)
break if article
sleep 10
}
assert(article)
assert_equal('@me_bauer', article.from, 'ticket article from')
assert_equal(nil, article.to, 'ticket article to')
# new tweet II - by me_bauer
client = Twitter::REST::Client.new do |config|
config.consumer_key = consumer_key
config.consumer_secret = consumer_secret
config.access_token = me_bauer_token
config.access_token_secret = me_bauer_token_secret
end
hash = '#citheo24 #' + rand(9999).to_s
text = "Today...2 #{hash}"
tweet = client.update(
text,
)
ActiveRecord::Base.connection.reconnect!
sleep 10
article = nil
(1..2).each {
article = Ticket::Article.find_by(message_id: tweet.id)
break if article
sleep 10
}
assert(article)
assert_equal('@me_bauer', article.from, 'ticket article from')
assert_equal(nil, article.to, 'ticket article to')
# get dm via stream
client = Twitter::REST::Client.new(
consumer_key: consumer_key,
consumer_secret: consumer_secret,
access_token: me_bauer_token,
access_token_secret: me_bauer_token_secret
)
hash = '#citheo44' + rand(9999).to_s
text = 'How about the details? ' + hash
dm = client.create_direct_message(
'armin_theo',
text,
)
assert(dm, "dm with ##{hash} created")
#ActiveRecord::Base.connection.reconnect!
sleep 10
article = nil
(1..2).each {
article = Ticket::Article.find_by(message_id: dm.id)
break if article
sleep 10
}
assert(article, "inbound article '#{text}' created")
assert_equal('@me_bauer', article.from, 'ticket article from')
assert_equal('@armin_theo', article.to, 'ticket article to')
end
end