From 8cef58b4da9aae9d501da17368b5e992dc47f227 Mon Sep 17 00:00:00 2001 From: Martin Edenhofer Date: Thu, 1 Jun 2017 08:23:53 +0200 Subject: [PATCH] Improved streaming handling (fetch parent tweets via REST). Improved auto reconnect to stream if channel config has changed. Improved max import for streaming (already reached max import on initial config - took 15 min. to import first tweet). Changed fallback REST tweet search from 30 to 20 minutes. --- app/models/channel.rb | 66 ++++++++++++--------- app/models/channel/driver/twitter.rb | 54 +++++++++++++++-- lib/tweet_base.rb | 51 ++++++++-------- lib/tweet_stream.rb | 1 + test/integration/twitter_browser_test.rb | 3 +- test/integration/twitter_test.rb | 75 ++++++++++++++++++++---- 6 files changed, 181 insertions(+), 69 deletions(-) diff --git a/app/models/channel.rb b/app/models/channel.rb index b56a8e68e..2decd642e 100644 --- a/app/models/channel.rb +++ b/app/models/channel.rb @@ -132,45 +132,57 @@ stream all accounts def self.stream Thread.abort_on_exception = true + auto_reconnect_after = 25 last_channels = [] loop do logger.debug 'stream controll loop' + current_channels = [] channels = Channel.where('active = ? AND area LIKE ?', true, '%::Account') channels.each { |channel| next if channel.options[:adapter] != 'twitter' + channel_id = channel.id.to_s + current_channels.push channel_id - current_channels.push channel.id - - # exit it channel has changed - if @@channel_stream[channel.id] && @@channel_stream[channel.id][:updated_at] != channel.updated_at - logger.debug "channel (#{channel.id}) has changed, restart thread" - @@channel_stream[channel.id][:thread].exit - @@channel_stream[channel.id][:thread].join - @@channel_stream[channel.id][:stream_instance].disconnect - @@channel_stream[channel.id] = false + # exit it channel has changed or connection is older then 25 min. + if @@channel_stream[channel_id] + if @@channel_stream[channel_id][:updated_at] != channel.updated_at + logger.info "channel (#{channel.id}) has changed, stop thread" + @@channel_stream[channel_id][:thread].exit + @@channel_stream[channel_id][:thread].join + @@channel_stream[channel_id][:stream_instance].disconnect + @@channel_stream[channel_id] = false + elsif @@channel_stream[channel_id][:started_at] && @@channel_stream[channel_id][:started_at] < Time.zone.now - auto_reconnect_after.minutes + logger.info "channel (#{channel.id}) reconnect - thread is older then #{auto_reconnect_after} minutes, restart thread" + @@channel_stream[channel_id][:thread].exit + @@channel_stream[channel_id][:thread].join + @@channel_stream[channel_id][:stream_instance].disconnect + @@channel_stream[channel_id] = false + end end - #logger.debug "thread for channel (#{channel.id}) already running" if @@channel_stream[channel.id] - next if @@channel_stream[channel.id] + #logger.debug "thread for channel (#{channel.id}) already running" if channel_stream + next if @@channel_stream[channel_id] - @@channel_stream[channel.id] = { - updated_at: channel.updated_at + @@channel_stream[channel_id] = { + updated_at: channel.updated_at, + started_at: Time.zone.now, } # start channels with delay sleep @@channel_stream.count # start threads for each channel - @@channel_stream[channel.id][:thread] = Thread.new { + @@channel_stream[channel_id][:thread] = Thread.new { begin logger.info "Started stream channel for '#{channel.id}' (#{channel.area})..." - @@channel_stream[channel.id][:stream_instance] = channel.stream_instance - @@channel_stream[channel.id][:stream_instance].stream - @@channel_stream[channel.id][:stream_instance].disconnect - @@channel_stream[channel.id] = false - logger.debug " ...stopped thread for '#{channel.id}'" + @@channel_stream[channel_id] ||= {} + @@channel_stream[channel_id][:stream_instance] = channel.stream_instance + @@channel_stream[channel_id][:stream_instance].stream + @@channel_stream[channel_id][:stream_instance].disconnect + @@channel_stream[channel_id] = false + logger.info " ...stopped thread for '#{channel.id}'" rescue => e error = "Can't use channel (#{channel.id}): #{e.inspect}" logger.error error @@ -178,24 +190,24 @@ stream all accounts channel.status_in = 'error' channel.last_log_in = error channel.save - @@channel_stream[channel.id] = false + @@channel_stream[channel_id] = false end } } # cleanup deleted channels last_channels.each { |channel_id| - next if !@@channel_stream[channel_id] + next if !@@channel_stream[channel_id.to_s] next if current_channels.include?(channel_id) - logger.debug "channel (#{channel_id}) not longer active, stop thread" - @@channel_stream[channel_id][:thread].exit - @@channel_stream[channel_id][:thread].join - @@channel_stream[channel_id][:stream_instance].disconnect - @@channel_stream[channel_id] = false + logger.info "channel (#{channel_id}) not longer active, stop thread" + @@channel_stream[channel_id.to_s][:thread].exit + @@channel_stream[channel_id.to_s][:thread].join + @@channel_stream[channel_id.to_s][:stream_instance].disconnect + @@channel_stream[channel_id.to_s] = false } last_channels = current_channels - sleep 30 + sleep 20 end end diff --git a/app/models/channel/driver/twitter.rb b/app/models/channel/driver/twitter.rb index 349b76031..56d7146bb 100644 --- a/app/models/channel/driver/twitter.rb +++ b/app/models/channel/driver/twitter.rb @@ -82,7 +82,7 @@ returns # only fetch once in 30 minutes return true if !channel.preferences return true if !channel.preferences[:last_fetch] - return false if channel.preferences[:last_fetch] > Time.zone.now - 30.minutes + return false if channel.preferences[:last_fetch] > Time.zone.now - 20.minutes true end @@ -183,6 +183,24 @@ returns =end def stream + sleep_on_unauthorized = 61 + 2.times { |loop_count| + begin + stream_start + rescue Twitter::Error::Unauthorized => e + Rails.logger.info "Unable to stream, try #{loop_count}, error #{e.inspect}" + if loop_count < 2 + Rails.logger.info "wait for #{sleep_on_unauthorized} sec. and try it again" + sleep sleep_on_unauthorized + else + raise "Unable to stream, try #{loop_count}, error #{e.inspect}" + end + end + } + end + + def stream_start + sync = @channel.options['sync'] raise 'Need channel.options[\'sync\'] for account, but no params found' if !sync @@ -204,20 +222,21 @@ returns next if tweet.class != Twitter::Tweet && tweet.class != Twitter::DirectMessage # wait until own posts are stored in local database to prevent importing own tweets - sleep 4 + next if @stream_client.locale_sender?(tweet) && own_tweet_already_imported?(tweet) + next if Ticket::Article.find_by(message_id: tweet.id) # check direct message if tweet.class == Twitter::DirectMessage if sync['direct_messages'] && sync['direct_messages']['group_id'] != '' - next if @stream_client.direct_message_limit_reached(tweet) + next if @stream_client.direct_message_limit_reached(tweet, 2) @stream_client.to_group(tweet, sync['direct_messages']['group_id'], @channel) end next end next if !track_retweets? && tweet.retweet? - next if @stream_client.tweet_limit_reached(tweet) + next if @stream_client.tweet_limit_reached(tweet, 2) # check if it's mention if sync['mentions'] && sync['mentions']['group_id'] != '' @@ -290,6 +309,9 @@ returns Rails.logger.debug "tweet to old: #{tweet.id}/#{tweet.created_at}" next end + + next if @rest_client.locale_sender?(tweet) && own_tweet_already_imported?(tweet) + next if Ticket::Article.find_by(message_id: tweet.id) break if @rest_client.tweet_limit_reached(tweet) @rest_client.to_group(tweet, search[:group_id], @channel) @@ -351,4 +373,28 @@ returns def track_retweets? @channel.options && @channel.options['sync'] && @channel.options['sync']['track_retweets'] end + + def own_tweet_already_imported?(tweet) + event_time = Time.zone.now + sleep 4 + 12.times { |loop_count| + if Ticket::Article.find_by(message_id: tweet.id) + Rails.logger.debug "Own tweet already imported, skipping tweet #{tweet.id}" + return true + end + count = Delayed::Job.where('created_at < ?', event_time).count + break if count.zero? + sleep_time = 2 * count + sleep_time = 5 if sleep_time > 5 + Rails.logger.debug "Delay importing own tweets - sleep #{sleep_time} (loop #{loop_count})" + sleep sleep_time + } + + if Ticket::Article.find_by(message_id: tweet.id) + Rails.logger.debug "Own tweet already imported, skipping tweet #{tweet.id}" + return true + end + false + end + end diff --git a/lib/tweet_base.rb b/lib/tweet_base.rb index 24a41465e..25cae7f01 100644 --- a/lib/tweet_base.rb +++ b/lib/tweet_base.rb @@ -62,7 +62,7 @@ class TweetBase user_data[:active] = true user_data[:role_ids] = Role.signup_role_ids - user = User.create(user_data) + user = User.create!(user_data) end if user_data[:image_source] @@ -93,7 +93,7 @@ class TweetBase if auth auth.update_attributes(auth_data) else - Authorization.create(auth_data) + Authorization.create!(auth_data) end user @@ -128,10 +128,10 @@ class TweetBase state = get_state(channel, tweet) - Ticket.create( + Ticket.create!( customer_id: user.id, title: title, - group_id: group_id, + group_id: group_id || Group.first.id, state: state, priority: Ticket::Priority.find_by(name: '2 normal'), preferences: { @@ -235,29 +235,12 @@ class TweetBase Rails.logger.debug 'import tweet' - ticket = nil # use transaction if @connection_type == 'stream' ActiveRecord::Base.connection.reconnect! - - # if sender is a system account, wait until twitter message id is stored - # on article to prevent two (own created & twitter created) articles - tweet_user = user(tweet) - Channel.where(area: 'Twitter::Account').each { |local_channel| - next if !local_channel.options - next if !local_channel.options[:user] - next if !local_channel.options[:user][:id] - next if local_channel.options[:user][:id].to_s != tweet_user.id.to_s - sleep 5 - - # return if tweet already exists (send via system) - if Ticket::Article.find_by(message_id: tweet.id) - Rails.logger.debug "Do not import tweet.id #{tweet.id}, article already exists" - return nil - end - } end + ticket = nil Transaction.execute(reset_user_id: true) do # check if parent exists @@ -272,6 +255,11 @@ class TweetBase ticket = existing_article.ticket else begin + + # in case of streaming mode, get parent tweet via REST client + if !@client && @auth + @client = TweetRest.new(@auth) + end parent_tweet = @client.status(tweet.in_reply_to_status_id) ticket = to_group(parent_tweet, group_id, channel) rescue Twitter::Error::NotFound, Twitter::Error::Forbidden => e @@ -343,11 +331,12 @@ class TweetBase Ticket::State.find_by(default_follow_up: true) end - def tweet_limit_reached(tweet) + def tweet_limit_reached(tweet, factor = 1) max_count = 120 if @connection_type == 'stream' max_count = 30 end + max_count = max_count * factor type_id = Ticket::Article::Type.lookup(name: 'twitter status').id created_at = Time.zone.now - 15.minutes created_count = Ticket::Article.where('created_at > ? AND type_id = ?', created_at, type_id).count @@ -358,11 +347,12 @@ class TweetBase false end - def direct_message_limit_reached(tweet) + def direct_message_limit_reached(tweet, factor = 1) max_count = 100 if @connection_type == 'stream' max_count = 40 end + max_count = max_count * factor type_id = Ticket::Article::Type.lookup(name: 'twitter direct-message').id created_at = Time.zone.now - 15.minutes created_count = Ticket::Article.where('created_at > ? AND type_id = ?', created_at, type_id).count @@ -390,4 +380,17 @@ class TweetBase preferences end + def locale_sender?(tweet) + tweet_user = user(tweet) + Channel.where(area: 'Twitter::Account').each { |local_channel| + next if !local_channel.options + next if !local_channel.options[:user] + next if !local_channel.options[:user][:id] + next if local_channel.options[:user][:id].to_s != tweet_user.id.to_s + Rails.logger.debug "Tweet is sent by local account with user id #{tweet_user.id} and tweet.id #{tweet.id}" + return true + } + false + end + end diff --git a/lib/tweet_stream.rb b/lib/tweet_stream.rb index 1abf1d15f..dc270932c 100644 --- a/lib/tweet_stream.rb +++ b/lib/tweet_stream.rb @@ -6,6 +6,7 @@ class TweetStream < TweetBase def initialize(auth) @connection_type = 'stream' + @auth = auth @client = Twitter::Streaming::ClientCustom.new do |config| config.consumer_key = auth[:consumer_key] config.consumer_secret = auth[:consumer_secret] diff --git a/test/integration/twitter_browser_test.rb b/test/integration/twitter_browser_test.rb index abd763ae8..8be58b1b3 100644 --- a/test/integration/twitter_browser_test.rb +++ b/test/integration/twitter_browser_test.rb @@ -187,7 +187,7 @@ class TwitterBrowserTest < TestCase ) # wait till new streaming of channel is active - sleep 60 + sleep 80 # start tweet from customer client = Twitter::REST::Client.new do |config| @@ -211,7 +211,6 @@ class TwitterBrowserTest < TestCase ) click(text: 'Unassigned & Open') - sleep 6 # till overview is rendered watch_for( css: '.content.active', diff --git a/test/integration/twitter_test.rb b/test/integration/twitter_test.rb index 786cc2773..53f640191 100644 --- a/test/integration/twitter_test.rb +++ b/test/integration/twitter_test.rb @@ -526,14 +526,15 @@ class TwitterTest < ActiveSupport::TestCase tweet = client.update( text, ) - sleep 10 + article = nil - 2.times { + 5.times { + Scheduler.worker(true) article = Ticket::Article.find_by(message_id: tweet.id) break if article ActiveRecord::Base.clear_all_connections! ActiveRecord::Base.connection.query_cache.clear - sleep 15 + sleep 10 } assert(article, "article from customer with text '#{text}' message_id '#{tweet.id}' created") assert_equal(customer_login, article.from, 'ticket article from') @@ -551,9 +552,10 @@ class TwitterTest < ActiveSupport::TestCase tweet = client.update( text, ) - sleep 10 + article = nil - 2.times { + 5.times { + Scheduler.worker(true) article = Ticket::Article.find_by(message_id: tweet.id) break if article ActiveRecord::Base.clear_all_connections! @@ -594,7 +596,7 @@ class TwitterTest < ActiveSupport::TestCase assert(tweet_found, "found outbound '#{reply_text}' tweet '#{article.message_id}'") count = Ticket::Article.where(message_id: article.message_id).count - assert_equal(1, count) + assert_equal(1, count, "tweet #{article.message_id}") channel_id = article.ticket.preferences[:channel_id] assert(channel_id) @@ -616,13 +618,12 @@ class TwitterTest < ActiveSupport::TestCase text, ) assert(dm, "dm with ##{hash} created") - sleep 10 + article = nil - 2.times { + 5.times { + Scheduler.worker(true) article = Ticket::Article.find_by(message_id: dm.id) break if article - ActiveRecord::Base.clear_all_connections! - ActiveRecord::Base.connection.query_cache.clear sleep 10 } assert(article, "inbound article '#{text}' message_id '#{dm.id}' created") @@ -719,9 +720,8 @@ class TwitterTest < ActiveSupport::TestCase retweet = client.retweet(tweet).first # fetch check system account - sleep 15 article = nil - 2.times { + 4.times { # check if ticket and article has been created article = Ticket::Article.find_by(message_id: retweet.id) break if article @@ -734,6 +734,57 @@ class TwitterTest < ActiveSupport::TestCase thread.join end + test 'i restart stream after config of channel has changed' do + hash = "#citheo#{rand(999)}" + + thread = Thread.new { + Channel.stream + sleep 10 + item = { + term: hash, + group_id: group.id, + } + channel_thread = Channel.find(channel.id) + channel_thread[:options]['sync']['search'].push item + channel_thread.save! + } + + sleep 60 + + # new tweet - by me_bauer + client = Twitter::REST::Client.new do |config| + config.consumer_key = consumer_key + config.consumer_secret = consumer_secret + config.access_token = customer_token + config.access_token_secret = customer_token_secret + end + + hash = "#{hash_tag1} ##{hash_gen}" + text = "Today... #{rand_word} #{hash}" + tweet = client.update( + text, + ) + article = nil + 5.times { + Scheduler.worker(true) + article = Ticket::Article.find_by(message_id: tweet.id) + break if article + ActiveRecord::Base.clear_all_connections! + ActiveRecord::Base.connection.query_cache.clear + sleep 10 + } + assert(article, "article from customer with text '#{text}' message_id '#{tweet.id}' created") + assert_equal(customer_login, article.from, 'ticket article from') + assert_nil(article.to, 'ticket article to') + + thread.exit + thread.join + + channel_thread = Channel.find(channel.id) + channel_thread[:options]['sync']['search'].pop + channel_thread.save! + end + def hash_gen rand(999).to_s + (0...10).map { ('a'..'z').to_a[rand(26)] }.join end