From 5fa34aa559edb40bbab0880d74083a31486a8839 Mon Sep 17 00:00:00 2001 From: Martin Edenhofer Date: Mon, 2 Oct 2017 13:23:14 +0200 Subject: [PATCH] Fixed issue #1406 - Channel: Twitter streaming is complaining about. --- app/models/channel.rb | 53 ++++++++++++++++-------- app/models/channel/driver/facebook.rb | 14 +++++++ app/models/channel/driver/pop3.rb | 16 ++++++- app/models/channel/driver/telegram.rb | 14 +++++++ app/models/channel/driver/twitter.rb | 16 ++++++- lib/sessions/backend/collections/base.rb | 10 ++--- 6 files changed, 99 insertions(+), 24 deletions(-) diff --git a/app/models/channel.rb b/app/models/channel.rb index 00a872600..6b55defd8 100644 --- a/app/models/channel.rb +++ b/app/models/channel.rb @@ -15,6 +15,7 @@ class Channel < ApplicationModel # rubocop:disable Style/ClassVars @@channel_stream = {} + @@channel_stream_started_till_at = {} # rubocop:enable Style/ClassVars =begin @@ -63,7 +64,7 @@ fetch one account self.status_in = result[:result] self.last_log_in = result[:notice] preferences[:last_fetch] = Time.zone.now - save + save! rescue => e error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}" logger.error error @@ -71,7 +72,7 @@ fetch one account self.status_in = 'error' self.last_log_in = error preferences[:last_fetch] = Time.zone.now - save + save! end end @@ -116,7 +117,7 @@ stream instance of account logger.error e.backtrace self.status_in = 'error' self.last_log_in = error - save + save! end end @@ -132,7 +133,7 @@ stream all accounts def self.stream Thread.abort_on_exception = true - auto_reconnect_after = 25 + auto_reconnect_after = 180 last_channels = [] loop do @@ -141,11 +142,20 @@ stream all accounts current_channels = [] channels = Channel.where('active = ? AND area LIKE ?', true, '%::Account') channels.each do |channel| - next if channel.options[:adapter] != 'twitter' + adapter = channel.options[:adapter] + driver_class = Object.const_get("Channel::Driver::#{adapter.to_classname}") + next if !driver_class.respond_to?(:streamable?) + next if !driver_class.streamable? channel_id = channel.id.to_s + + if @@channel_stream[channel_id].blank? && @@channel_stream_started_till_at[channel_id] && @@channel_stream_started_till_at[channel_id] > Time.zone.now - 65.seconds + logger.info "skipp channel (#{channel_id}) for streaming, already tried to connect or connection was active within the last minute" + next + end + current_channels.push channel_id - # exit it channel has changed or connection is older then 25 min. + # exit it channel has changed or connection is older then 180 minutes if @@channel_stream[channel_id] if @@channel_stream[channel_id][:updated_at] != channel.updated_at logger.info "channel (#{channel.id}) has changed, stop thread" @@ -153,12 +163,14 @@ stream all accounts @@channel_stream[channel_id][:thread].join @@channel_stream[channel_id][:stream_instance].disconnect @@channel_stream[channel_id] = false + @@channel_stream_started_till_at[channel_id] = Time.zone.now elsif @@channel_stream[channel_id][:started_at] && @@channel_stream[channel_id][:started_at] < Time.zone.now - auto_reconnect_after.minutes logger.info "channel (#{channel.id}) reconnect - thread is older then #{auto_reconnect_after} minutes, restart thread" @@channel_stream[channel_id][:thread].exit @@channel_stream[channel_id][:thread].join @@channel_stream[channel_id][:stream_instance].disconnect @@channel_stream[channel_id] = false + @@channel_stream_started_till_at[channel_id] = Time.zone.now end end @@ -177,19 +189,24 @@ stream all accounts @@channel_stream[channel_id][:thread] = Thread.new do begin logger.info "Started stream channel for '#{channel.id}' (#{channel.area})..." + channel.status_in = 'ok' + channel.last_log_in = '' + channel.save! + @@channel_stream_started_till_at[channel_id] = Time.zone.now @@channel_stream[channel_id] ||= {} @@channel_stream[channel_id][:stream_instance] = channel.stream_instance @@channel_stream[channel_id][:stream_instance].stream @@channel_stream[channel_id][:stream_instance].disconnect @@channel_stream[channel_id] = false - logger.info " ...stopped thread for '#{channel.id}'" + @@channel_stream_started_till_at[channel_id] = Time.zone.now + logger.info " ...stopped stream thread for '#{channel.id}'" rescue => e - error = "Can't use channel (#{channel.id}): #{e.inspect}" + error = "Can't use stream for channel (#{channel.id}): #{e.inspect}" logger.error error logger.error e.backtrace channel.status_in = 'error' channel.last_log_in = error - channel.save + channel.save! @@channel_stream[channel_id] = false end end @@ -197,14 +214,16 @@ stream all accounts # cleanup deleted channels last_channels.each do |channel_id| - next if !@@channel_stream[channel_id.to_s] + next if @@channel_stream[channel_id].blank? next if current_channels.include?(channel_id) - logger.info "channel (#{channel_id}) not longer active, stop thread" - @@channel_stream[channel_id.to_s][:thread].exit - @@channel_stream[channel_id.to_s][:thread].join - @@channel_stream[channel_id.to_s][:stream_instance].disconnect - @@channel_stream[channel_id.to_s] = false + logger.info "channel (#{channel_id}) not longer active, stop stream thread" + @@channel_stream[channel_id][:thread].exit + @@channel_stream[channel_id][:thread].join + @@channel_stream[channel_id][:stream_instance].disconnect + @@channel_stream[channel_id] = false + @@channel_stream_started_till_at[channel_id] = Time.zone.now end + last_channels = current_channels sleep 20 @@ -245,14 +264,14 @@ send via account result = driver_instance.send(adapter_options, mail_params, notification) self.status_out = 'ok' self.last_log_out = '' - save + save! rescue => e error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}" logger.error error logger.error e.backtrace self.status_out = 'error' self.last_log_out = error - save + save! raise error end result diff --git a/app/models/channel/driver/facebook.rb b/app/models/channel/driver/facebook.rb index 910595bf6..3d46638e6 100644 --- a/app/models/channel/driver/facebook.rb +++ b/app/models/channel/driver/facebook.rb @@ -63,6 +63,20 @@ class Channel::Driver::Facebook def disconnect end +=begin + + Channel::Driver::Facebook.streamable? + +returns + + true|false + +=end + + def self.streamable? + false + end + private def get_page(page_id) diff --git a/app/models/channel/driver/pop3.rb b/app/models/channel/driver/pop3.rb index a8341b6e4..4b5542709 100644 --- a/app/models/channel/driver/pop3.rb +++ b/app/models/channel/driver/pop3.rb @@ -44,7 +44,7 @@ returns =end - def fetch (options, channel, check_type = '', verify_string = '') + def fetch(options, channel, check_type = '', verify_string = '') ssl = true port = 995 if options.key?(:ssl) && options[:ssl] == false @@ -180,6 +180,20 @@ returns true end +=begin + + Channel::Driver::Pop3.streamable? + +returns + + true|false + +=end + + def self.streamable? + false + end + def disconnect return if !@pop @pop.finish diff --git a/app/models/channel/driver/telegram.rb b/app/models/channel/driver/telegram.rb index ffa24fc67..66f1eb5b5 100644 --- a/app/models/channel/driver/telegram.rb +++ b/app/models/channel/driver/telegram.rb @@ -30,6 +30,20 @@ class Channel::Driver::Telegram message end +=begin + + Channel::Driver::Telegram.streamable? + +returns + + true|false + +=end + + def self.streamable? + false + end + private def check_external_credential(options) diff --git a/app/models/channel/driver/twitter.rb b/app/models/channel/driver/twitter.rb index 2a3f60447..0780b7d2c 100644 --- a/app/models/channel/driver/twitter.rb +++ b/app/models/channel/driver/twitter.rb @@ -123,6 +123,20 @@ returns @rest_client.disconnect if @rest_client end +=begin + + Channel::Driver::Twitter.streamable? + +returns + + true|false + +=end + + def self.streamable? + true + end + =begin create stream endpoint form twitter account @@ -183,7 +197,7 @@ returns =end def stream - sleep_on_unauthorized = 61 + sleep_on_unauthorized = 65 2.times do |loop_count| begin stream_start diff --git a/lib/sessions/backend/collections/base.rb b/lib/sessions/backend/collections/base.rb index 33588df94..1b3e526a2 100644 --- a/lib/sessions/backend/collections/base.rb +++ b/lib/sessions/backend/collections/base.rb @@ -22,11 +22,6 @@ class Sessions::Backend::Collections::Base < Sessions::Backend::Base def push - # check permission based access - if self.class.permissions - return if !@user.permissions?(self.class.permissions) - end - # check timeout timeout = Sessions::CacheIn.get(client_key) return if timeout @@ -34,6 +29,11 @@ class Sessions::Backend::Collections::Base < Sessions::Backend::Base # set new timeout Sessions::CacheIn.set(client_key, true, { expires_in: @ttl.seconds }) + # check permission based access + if self.class.permissions + return if !@user.permissions?(self.class.permissions) + end + # check if update has been done last_change = self.class.model.constantize.latest_change return if last_change.to_s == @last_change