From edb6c66b6b392d28a2831d1dede4689299f49aaa Mon Sep 17 00:00:00 2001 From: Martin Edenhofer Date: Sat, 9 Jan 2016 13:23:11 +0100 Subject: [PATCH] Added channel streaming feature. --- app/models/channel.rb | 116 +++++++++++++++++++ app/models/channel/driver/twitter.rb | 65 ++++++++++- db/migrate/20160109000001_update_channel2.rb | 15 +++ db/seeds.rb | 9 ++ lib/twitter/streaming/client_custom.rb | 16 +++ lib/twitter/streaming/connection_custom.rb | 21 ++++ test/integration/twitter_browser_test.rb | 36 +++--- test/integration/twitter_test.rb | 101 ++++++++++++++-- 8 files changed, 349 insertions(+), 30 deletions(-) create mode 100644 db/migrate/20160109000001_update_channel2.rb create mode 100644 lib/twitter/streaming/client_custom.rb create mode 100644 lib/twitter/streaming/connection_custom.rb diff --git a/app/models/channel.rb b/app/models/channel.rb index e5535b4cb..4b0384433 100644 --- a/app/models/channel.rb +++ b/app/models/channel.rb @@ -11,6 +11,10 @@ class Channel < ApplicationModel after_update :email_address_check after_destroy :email_address_check + # rubocop:disable Style/ClassVars + @@channel_stream = {} +# rubocop:enable Style/ClassVars + =begin fetch all accounts @@ -55,6 +59,7 @@ fetch one account result = driver_instance.fetch(adapter_options, self) self.status_in = result[:result] self.last_log_in = result[:notice] + preferences[:last_fetch] = Time.zone.now save rescue => e error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}" @@ -62,6 +67,7 @@ fetch one account logger.error e.backtrace self.status_in = 'error' self.last_log_in = error + preferences[:last_fetch] = Time.zone.now save end @@ -69,6 +75,116 @@ fetch one account =begin +stream instance of account + + channel = Channel.where(area: 'Twitter::Account').first + stream_instance = channel.stream_instance + + # start stream + stream_instance.stream + +=end + + def stream_instance + + adapter = options[:adapter] + + begin + + # we need to require each channel backend individually otherwise we get a + # 'warning: toplevel constant Twitter referenced by Channel::Driver::Twitter' error e.g. + # so we have to convert the channel name to the filename via Rails String.underscore + # http://stem.ps/rails/2015/01/25/ruby-gotcha-toplevel-constant-referenced-by.html + require "channel/driver/#{adapter.to_filename}" + + driver_class = Object.const_get("Channel::Driver::#{adapter.to_classname}") + driver_instance = driver_class.new + + # check is stream exists + return if !driver_instance.respond_to?(:stream_instance) + driver_instance.stream_instance(self) + + # set scheduler job to active + + return driver_instance + rescue => e + error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}" + logger.error error + logger.error e.backtrace + self.status_in = 'error' + self.last_log_in = error + save + end + + end + +=begin + +stream all accounts + + Channel.stream + +=end + + def self.stream + Thread.abort_on_exception = true + + last_channels = [] + + loop do + logger.debug 'stream controll loop' + current_channels = [] + channels = Channel.where('active = ? AND area LIKE ?', true, '%::Account') + channels.each {|channel| + next if channel.options[:adapter] != 'twitter' + + current_channels.push channel.id + + # exit it channel has changed + if @@channel_stream[channel.id] && @@channel_stream[channel.id][:updated_at] != channel.updated_at + logger.debug "channel (#{channel.id}) has changed, restart thread" + @@channel_stream[channel.id][:thread].exit + @@channel_stream[channel.id][:thread].join + @@channel_stream[channel.id][:stream_instance].disconnect + @@channel_stream[channel.id] = false + end + + #logger.debug "thread for channel (#{channel.id}) already running" if @@channel_stream[channel.id] + next if @@channel_stream[channel.id] + + @@channel_stream[channel.id] = { + updated_at: channel.updated_at + } + + # start threads for each channel + @@channel_stream[channel.id][:thread] = Thread.new { + logger.debug "Started stream channel for '#{channel.id}' (#{channel.area})..." + @@channel_stream[channel.id][:stream_instance] = channel.stream_instance + @@channel_stream[channel.id][:stream_instance].stream + @@channel_stream[channel.id][:stream_instance].disconnect + @@channel_stream[channel.id] = false + logger.debug " ...stopped thread for '#{channel.id}'" + } + } + + # cleanup deleted channels + last_channels.each {|channel_id| + next if current_channels.include?(channel_id) + logger.debug "channel (#{channel_id}) not longer active, stop thread" + @@channel_stream[channel_id][:thread].exit + @@channel_stream[channel_id][:thread].join + @@channel_stream[channel_id][:stream_instance].disconnect + @@channel_stream[channel_id] = false + } + last_channels = current_channels + + sleep 30 + end + + end + +=begin + send via account channel = Channel.where(area: 'Email::Account').first diff --git a/app/models/channel/driver/twitter.rb b/app/models/channel/driver/twitter.rb index 4ad84a193..4e2d6a872 100644 --- a/app/models/channel/driver/twitter.rb +++ b/app/models/channel/driver/twitter.rb @@ -49,7 +49,15 @@ class Channel::Driver::Twitter options = check_external_credential(options) - # check if stream scheduler is already running and return + # only fetch once a hour + if Rails.env.production? || Rails.env.development? + if channel.preferences && channel.preferences[:last_fetch] && channel.preferences[:last_fetch] > Time.zone.now - 1.hour + return { + result: 'ok', + notice: '', + } + end + end @rest_client = TweetRest.new(options[:auth]) @sync = options[:sync] @@ -108,12 +116,65 @@ class Channel::Driver::Twitter @rest_client.disconnect if @rest_client end +=begin + +create stream endpoint form twitter account + + options = { + adapter: 'twitter', + auth: { + consumer_key: consumer_key, + consumer_secret: consumer_secret, + oauth_token: armin_theo_token, + oauth_token_secret: armin_theo_token_secret, + }, + sync: { + search: [ + { + term: '#citheo42', + group_id: 2, + }, + { + term: '#citheo24', + group_id: 1, + }, + ], + mentions: { + group_id: 2, + }, + direct_messages: { + group_id: 2, + } + } + } + + instance = Channel::Driver::Twitter.new + stream_instance = instance.stream_instance(channel) + +returns + + instance_of_stream_handle + +=end + def stream_instance(channel) @channel = channel options = @channel.options @stream_client = TweetStream.new(options[:auth]) end +=begin + +stream tweets from twitter account + + stream_instance.stream + +returns + + # endless loop + +=end + def stream hashtags = [] @channel.options['sync']['search'].each {|item| @@ -133,7 +194,7 @@ class Channel::Driver::Twitter # check direct message if tweet.class == Twitter::DirectMessage if @channel.options['sync']['direct_messages']['group_id'] != '' - next if direct_message_limit_reached(tweet) + next if @stream_client.direct_message_limit_reached(tweet) @stream_client.to_group(tweet, @channel.options['sync']['direct_messages']['group_id'], @channel) end next diff --git a/db/migrate/20160109000001_update_channel2.rb b/db/migrate/20160109000001_update_channel2.rb new file mode 100644 index 000000000..b1fb18742 --- /dev/null +++ b/db/migrate/20160109000001_update_channel2.rb @@ -0,0 +1,15 @@ +class UpdateChannel2 < ActiveRecord::Migration + def up + + Scheduler.create_or_update( + name: 'Check streams for Channel ', + method: 'Channel.stream', + period: 60, + prio: 1, + active: true, + updated_by_id: 1, + created_by_id: 1, + ) + + end +end diff --git a/db/seeds.rb b/db/seeds.rb index a760854c2..35f34ad27 100644 --- a/db/seeds.rb +++ b/db/seeds.rb @@ -3291,6 +3291,15 @@ Scheduler.create_if_not_exists( updated_by_id: 1, created_by_id: 1, ) +Scheduler.create_if_not_exists( + name: 'Check streams for Channel ', + method: 'Channel.stream', + period: 60, + prio: 1, + active: true, + updated_by_id: 1, + created_by_id: 1, +) Scheduler.create_if_not_exists( name: 'Generate Session data', method: 'Sessions.jobs', diff --git a/lib/twitter/streaming/client_custom.rb b/lib/twitter/streaming/client_custom.rb new file mode 100644 index 000000000..57a13a920 --- /dev/null +++ b/lib/twitter/streaming/client_custom.rb @@ -0,0 +1,16 @@ +# file is based on Twitter::Streaming::Client, needed to get custom_connection_handle +# to close connection after config has changed +require 'twitter/streaming/connection_custom' + +class Twitter::Streaming::ClientCustom < Twitter::Streaming::Client + + def initialize(options = {}) + super + @connection = Twitter::Streaming::ConnectionCustom.new(options) + end + + def custom_connection_handle + @connection.custom_connection_handle + end + +end diff --git a/lib/twitter/streaming/connection_custom.rb b/lib/twitter/streaming/connection_custom.rb new file mode 100644 index 000000000..6b90480f6 --- /dev/null +++ b/lib/twitter/streaming/connection_custom.rb @@ -0,0 +1,21 @@ +# file is based on Twitter::Streaming::Connection, needed to get custom_connection_handle +# to close connection after config has changed +class Twitter::Streaming::ConnectionCustom < Twitter::Streaming::Connection + + def stream(request, response) + client_context = OpenSSL::SSL::SSLContext.new + @client = @tcp_socket_class.new(Resolv.getaddress(request.uri.host), request.uri.port) + ssl_client = @ssl_socket_class.new(@client, client_context) + + ssl_client.connect + request.stream(ssl_client) + while body = ssl_client.readpartial(1024) # rubocop:disable AssignmentInCondition + response << body + end + end + + def custom_connection_handle + @client + end + +end diff --git a/test/integration/twitter_browser_test.rb b/test/integration/twitter_browser_test.rb index 6593c2b9a..f4f261962 100644 --- a/test/integration/twitter_browser_test.rb +++ b/test/integration/twitter_browser_test.rb @@ -5,34 +5,34 @@ class TwitterBrowserTest < TestCase def test_add_config # app config - if !ENV['TWITTER_CONSUMER_KEY'] - fail "ERROR: Need TWITTER_CONSUMER_KEY - hint TWITTER_CONSUMER_KEY='1234'" + if !ENV['TWITTER_BT_CONSUMER_KEY'] + fail "ERROR: Need TWITTER_BT_CONSUMER_KEY - hint TWITTER_BT_CONSUMER_KEY='1234'" end - consumer_key = ENV['TWITTER_CONSUMER_KEY'] - if !ENV['TWITTER_CONSUMER_SECRET'] - fail "ERROR: Need TWITTER_CONSUMER_SECRET - hint TWITTER_CONSUMER_SECRET='1234'" + consumer_key = ENV['TWITTER_BT_CONSUMER_KEY'] + if !ENV['TWITTER_BT_CONSUMER_SECRET'] + fail "ERROR: Need TWITTER_BT_CONSUMER_SECRET - hint TWITTER_BT_CONSUMER_SECRET='1234'" end - consumer_secret = ENV['TWITTER_CONSUMER_SECRET'] + consumer_secret = ENV['TWITTER_BT_CONSUMER_SECRET'] - if !ENV['TWITTER_USER_LOGIN'] - fail "ERROR: Need TWITTER_USER_LOGIN - hint TWITTER_USER_LOGIN='1234'" + if !ENV['TWITTER_BT_USER_LOGIN'] + fail "ERROR: Need TWITTER_BT_USER_LOGIN - hint TWITTER_BT_USER_LOGIN='1234'" end - twitter_user_login = ENV['TWITTER_USER_LOGIN'] + twitter_user_login = ENV['TWITTER_BT_USER_LOGIN'] - if !ENV['TWITTER_USER_PW'] - fail "ERROR: Need TWITTER_USER_PW - hint TWITTER_USER_PW='1234'" + if !ENV['TWITTER_BT_USER_PW'] + fail "ERROR: Need TWITTER_BT_USER_PW - hint TWITTER_BT_USER_PW='1234'" end - twitter_user_pw = ENV['TWITTER_USER_PW'] + twitter_user_pw = ENV['TWITTER_BT_USER_PW'] - if !ENV['TWITTER_CUSTOMER_TOKEN'] - fail "ERROR: Need TWITTER_CUSTOMER_TOKEN - hint TWITTER_CUSTOMER_TOKEN='1234'" + if !ENV['TWITTER_BT_CUSTOMER_TOKEN'] + fail "ERROR: Need TWITTER_BT_CUSTOMER_TOKEN - hint TWITTER_BT_CUSTOMER_TOKEN='1234'" end - twitter_customer_token = ENV['TWITTER_CUSTOMER_TOKEN'] + twitter_customer_token = ENV['TWITTER_BT_CUSTOMER_TOKEN'] - if !ENV['TWITTER_CUSTOMER_TOKEN_SECRET'] - fail "ERROR: Need TWITTER_CUSTOMER_TOKEN_SECRET - hint TWITTER_CUSTOMER_TOKEN_SECRET='1234'" + if !ENV['TWITTER_BT_CUSTOMER_TOKEN_SECRET'] + fail "ERROR: Need TWITTER_BT_CUSTOMER_TOKEN_SECRET - hint TWITTER_BT_CUSTOMER_TOKEN_SECRET='1234'" end - twitter_customer_token_secret = ENV['TWITTER_CUSTOMER_TOKEN_SECRET'] + twitter_customer_token_secret = ENV['TWITTER_BT_CUSTOMER_TOKEN_SECRET'] hash = "#sweetcheck#{rand(99_999)}" diff --git a/test/integration/twitter_test.rb b/test/integration/twitter_test.rb index 4108a596f..7680eb2c6 100644 --- a/test/integration/twitter_test.rb +++ b/test/integration/twitter_test.rb @@ -16,14 +16,14 @@ class TwitterTest < ActiveSupport::TestCase ) # app config - if !ENV['TWITTER_APP_CONSUMER_KEY'] - fail "ERROR: Need TWITTER_APP_CONSUMER_KEY - hint TWITTER_APP_CONSUMER_KEY='1234'" + if !ENV['TWITTER_CONSUMER_KEY'] + fail "ERROR: Need TWITTER_CONSUMER_KEY - hint TWITTER_CONSUMER_KEY='1234'" end - if !ENV['TWITTER_APP_CONSUMER_SECRET'] - fail "ERROR: Need TWITTER_APP_CONSUMER_SECRET - hint TWITTER_APP_CONSUMER_SECRET='1234'" + if !ENV['TWITTER_CONSUMER_SECRET'] + fail "ERROR: Need TWITTER_CONSUMER_SECRET - hint TWITTER_CONSUMER_SECRET='1234'" end - consumer_key = ENV['TWITTER_APP_CONSUMER_KEY'] - consumer_secret = ENV['TWITTER_APP_CONSUMER_SECRET'] + consumer_key = ENV['TWITTER_CONSUMER_KEY'] + consumer_secret = ENV['TWITTER_CONSUMER_SECRET'] # armin_theo (is system and is following marion_bauer) if !ENV['TWITTER_SYSTEM_TOKEN'] @@ -35,15 +35,15 @@ class TwitterTest < ActiveSupport::TestCase armin_theo_token = ENV['TWITTER_SYSTEM_TOKEN'] armin_theo_token_secret = ENV['TWITTER_SYSTEM_TOKEN_SECRET'] - # me_bauer (is following armin_theo) + # me_bauer (is customer and is following armin_theo) if !ENV['TWITTER_CUSTOMER_TOKEN'] fail "ERROR: Need CUSTOMER_TOKEN - hint TWITTER_CUSTOMER_TOKEN='1234'" end - if !ENV['TWITTER_CUSTOMER_TOKEN_SECREET'] - fail "ERROR: Need CUSTOMER_TOKEN_SECREET - hint TWITTER_CUSTOMER_TOKEN_SECREET='1234'" + if !ENV['TWITTER_CUSTOMER_TOKEN_SECRET'] + fail "ERROR: Need CUSTOMER_TOKEN_SECRET - hint TWITTER_CUSTOMER_TOKEN_SECRET='1234'" end me_bauer_token = ENV['TWITTER_CUSTOMER_TOKEN'] - me_bauer_token_secret = ENV['TWITTER_CUSTOMER_TOKEN_SECREET'] + me_bauer_token_secret = ENV['TWITTER_CUSTOMER_TOKEN_SECRET'] # add channel current = Channel.where(area: 'Twitter::Account') @@ -365,4 +365,85 @@ class TwitterTest < ActiveSupport::TestCase assert_equal('ok', channel.status_in) end + test 'd streaming test' do + Thread.new { + Channel.stream + } + sleep 10 + + # new tweet I - by me_bauer + client = Twitter::REST::Client.new do |config| + config.consumer_key = consumer_key + config.consumer_secret = consumer_secret + config.access_token = me_bauer_token + config.access_token_secret = me_bauer_token_secret + end + hash = '#citheo24 #' + rand(9999).to_s + text = "Today... #{hash}" + tweet = client.update( + text, + ) + sleep 10 + article = nil + (1..2).each { + article = Ticket::Article.find_by(message_id: tweet.id) + break if article + sleep 10 + } + assert(article) + assert_equal('@me_bauer', article.from, 'ticket article from') + assert_equal(nil, article.to, 'ticket article to') + + # new tweet II - by me_bauer + client = Twitter::REST::Client.new do |config| + config.consumer_key = consumer_key + config.consumer_secret = consumer_secret + config.access_token = me_bauer_token + config.access_token_secret = me_bauer_token_secret + end + hash = '#citheo24 #' + rand(9999).to_s + text = "Today...2 #{hash}" + tweet = client.update( + text, + ) + ActiveRecord::Base.connection.reconnect! + sleep 10 + article = nil + (1..2).each { + article = Ticket::Article.find_by(message_id: tweet.id) + break if article + sleep 10 + } + assert(article) + assert_equal('@me_bauer', article.from, 'ticket article from') + assert_equal(nil, article.to, 'ticket article to') + + # get dm via stream + client = Twitter::REST::Client.new( + consumer_key: consumer_key, + consumer_secret: consumer_secret, + access_token: me_bauer_token, + access_token_secret: me_bauer_token_secret + ) + hash = '#citheo44' + rand(9999).to_s + text = 'How about the details? ' + hash + dm = client.create_direct_message( + 'armin_theo', + text, + ) + assert(dm, "dm with ##{hash} created") + #ActiveRecord::Base.connection.reconnect! + sleep 10 + article = nil + (1..2).each { + article = Ticket::Article.find_by(message_id: dm.id) + break if article + sleep 10 + } + assert(article, "inbound article '#{text}' created") + assert_equal('@me_bauer', article.from, 'ticket article from') + assert_equal('@armin_theo', article.to, 'ticket article to') + + end + end