Fixed issue #1406 - Channel: Twitter streaming is complaining about.

This commit is contained in:
Martin Edenhofer 2017-10-02 13:23:14 +02:00
parent 4e8bc9564e
commit 5fa34aa559
6 changed files with 99 additions and 24 deletions

View file

@ -15,6 +15,7 @@ class Channel < ApplicationModel
# rubocop:disable Style/ClassVars # rubocop:disable Style/ClassVars
@@channel_stream = {} @@channel_stream = {}
@@channel_stream_started_till_at = {}
# rubocop:enable Style/ClassVars # rubocop:enable Style/ClassVars
=begin =begin
@ -63,7 +64,7 @@ fetch one account
self.status_in = result[:result] self.status_in = result[:result]
self.last_log_in = result[:notice] self.last_log_in = result[:notice]
preferences[:last_fetch] = Time.zone.now preferences[:last_fetch] = Time.zone.now
save save!
rescue => e rescue => e
error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}" error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}"
logger.error error logger.error error
@ -71,7 +72,7 @@ fetch one account
self.status_in = 'error' self.status_in = 'error'
self.last_log_in = error self.last_log_in = error
preferences[:last_fetch] = Time.zone.now preferences[:last_fetch] = Time.zone.now
save save!
end end
end end
@ -116,7 +117,7 @@ stream instance of account
logger.error e.backtrace logger.error e.backtrace
self.status_in = 'error' self.status_in = 'error'
self.last_log_in = error self.last_log_in = error
save save!
end end
end end
@ -132,7 +133,7 @@ stream all accounts
def self.stream def self.stream
Thread.abort_on_exception = true Thread.abort_on_exception = true
auto_reconnect_after = 25 auto_reconnect_after = 180
last_channels = [] last_channels = []
loop do loop do
@ -141,11 +142,20 @@ stream all accounts
current_channels = [] current_channels = []
channels = Channel.where('active = ? AND area LIKE ?', true, '%::Account') channels = Channel.where('active = ? AND area LIKE ?', true, '%::Account')
channels.each do |channel| channels.each do |channel|
next if channel.options[:adapter] != 'twitter' adapter = channel.options[:adapter]
driver_class = Object.const_get("Channel::Driver::#{adapter.to_classname}")
next if !driver_class.respond_to?(:streamable?)
next if !driver_class.streamable?
channel_id = channel.id.to_s channel_id = channel.id.to_s
if @@channel_stream[channel_id].blank? && @@channel_stream_started_till_at[channel_id] && @@channel_stream_started_till_at[channel_id] > Time.zone.now - 65.seconds
logger.info "skipp channel (#{channel_id}) for streaming, already tried to connect or connection was active within the last minute"
next
end
current_channels.push channel_id current_channels.push channel_id
# exit it channel has changed or connection is older then 25 min. # exit it channel has changed or connection is older then 180 minutes
if @@channel_stream[channel_id] if @@channel_stream[channel_id]
if @@channel_stream[channel_id][:updated_at] != channel.updated_at if @@channel_stream[channel_id][:updated_at] != channel.updated_at
logger.info "channel (#{channel.id}) has changed, stop thread" logger.info "channel (#{channel.id}) has changed, stop thread"
@ -153,12 +163,14 @@ stream all accounts
@@channel_stream[channel_id][:thread].join @@channel_stream[channel_id][:thread].join
@@channel_stream[channel_id][:stream_instance].disconnect @@channel_stream[channel_id][:stream_instance].disconnect
@@channel_stream[channel_id] = false @@channel_stream[channel_id] = false
@@channel_stream_started_till_at[channel_id] = Time.zone.now
elsif @@channel_stream[channel_id][:started_at] && @@channel_stream[channel_id][:started_at] < Time.zone.now - auto_reconnect_after.minutes elsif @@channel_stream[channel_id][:started_at] && @@channel_stream[channel_id][:started_at] < Time.zone.now - auto_reconnect_after.minutes
logger.info "channel (#{channel.id}) reconnect - thread is older then #{auto_reconnect_after} minutes, restart thread" logger.info "channel (#{channel.id}) reconnect - thread is older then #{auto_reconnect_after} minutes, restart thread"
@@channel_stream[channel_id][:thread].exit @@channel_stream[channel_id][:thread].exit
@@channel_stream[channel_id][:thread].join @@channel_stream[channel_id][:thread].join
@@channel_stream[channel_id][:stream_instance].disconnect @@channel_stream[channel_id][:stream_instance].disconnect
@@channel_stream[channel_id] = false @@channel_stream[channel_id] = false
@@channel_stream_started_till_at[channel_id] = Time.zone.now
end end
end end
@ -177,19 +189,24 @@ stream all accounts
@@channel_stream[channel_id][:thread] = Thread.new do @@channel_stream[channel_id][:thread] = Thread.new do
begin begin
logger.info "Started stream channel for '#{channel.id}' (#{channel.area})..." logger.info "Started stream channel for '#{channel.id}' (#{channel.area})..."
channel.status_in = 'ok'
channel.last_log_in = ''
channel.save!
@@channel_stream_started_till_at[channel_id] = Time.zone.now
@@channel_stream[channel_id] ||= {} @@channel_stream[channel_id] ||= {}
@@channel_stream[channel_id][:stream_instance] = channel.stream_instance @@channel_stream[channel_id][:stream_instance] = channel.stream_instance
@@channel_stream[channel_id][:stream_instance].stream @@channel_stream[channel_id][:stream_instance].stream
@@channel_stream[channel_id][:stream_instance].disconnect @@channel_stream[channel_id][:stream_instance].disconnect
@@channel_stream[channel_id] = false @@channel_stream[channel_id] = false
logger.info " ...stopped thread for '#{channel.id}'" @@channel_stream_started_till_at[channel_id] = Time.zone.now
logger.info " ...stopped stream thread for '#{channel.id}'"
rescue => e rescue => e
error = "Can't use channel (#{channel.id}): #{e.inspect}" error = "Can't use stream for channel (#{channel.id}): #{e.inspect}"
logger.error error logger.error error
logger.error e.backtrace logger.error e.backtrace
channel.status_in = 'error' channel.status_in = 'error'
channel.last_log_in = error channel.last_log_in = error
channel.save channel.save!
@@channel_stream[channel_id] = false @@channel_stream[channel_id] = false
end end
end end
@ -197,14 +214,16 @@ stream all accounts
# cleanup deleted channels # cleanup deleted channels
last_channels.each do |channel_id| last_channels.each do |channel_id|
next if !@@channel_stream[channel_id.to_s] next if @@channel_stream[channel_id].blank?
next if current_channels.include?(channel_id) next if current_channels.include?(channel_id)
logger.info "channel (#{channel_id}) not longer active, stop thread" logger.info "channel (#{channel_id}) not longer active, stop stream thread"
@@channel_stream[channel_id.to_s][:thread].exit @@channel_stream[channel_id][:thread].exit
@@channel_stream[channel_id.to_s][:thread].join @@channel_stream[channel_id][:thread].join
@@channel_stream[channel_id.to_s][:stream_instance].disconnect @@channel_stream[channel_id][:stream_instance].disconnect
@@channel_stream[channel_id.to_s] = false @@channel_stream[channel_id] = false
@@channel_stream_started_till_at[channel_id] = Time.zone.now
end end
last_channels = current_channels last_channels = current_channels
sleep 20 sleep 20
@ -245,14 +264,14 @@ send via account
result = driver_instance.send(adapter_options, mail_params, notification) result = driver_instance.send(adapter_options, mail_params, notification)
self.status_out = 'ok' self.status_out = 'ok'
self.last_log_out = '' self.last_log_out = ''
save save!
rescue => e rescue => e
error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}" error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}"
logger.error error logger.error error
logger.error e.backtrace logger.error e.backtrace
self.status_out = 'error' self.status_out = 'error'
self.last_log_out = error self.last_log_out = error
save save!
raise error raise error
end end
result result

View file

@ -63,6 +63,20 @@ class Channel::Driver::Facebook
def disconnect def disconnect
end end
=begin
Channel::Driver::Facebook.streamable?
returns
true|false
=end
def self.streamable?
false
end
private private
def get_page(page_id) def get_page(page_id)

View file

@ -44,7 +44,7 @@ returns
=end =end
def fetch (options, channel, check_type = '', verify_string = '') def fetch(options, channel, check_type = '', verify_string = '')
ssl = true ssl = true
port = 995 port = 995
if options.key?(:ssl) && options[:ssl] == false if options.key?(:ssl) && options[:ssl] == false
@ -180,6 +180,20 @@ returns
true true
end end
=begin
Channel::Driver::Pop3.streamable?
returns
true|false
=end
def self.streamable?
false
end
def disconnect def disconnect
return if !@pop return if !@pop
@pop.finish @pop.finish

View file

@ -30,6 +30,20 @@ class Channel::Driver::Telegram
message message
end end
=begin
Channel::Driver::Telegram.streamable?
returns
true|false
=end
def self.streamable?
false
end
private private
def check_external_credential(options) def check_external_credential(options)

View file

@ -123,6 +123,20 @@ returns
@rest_client.disconnect if @rest_client @rest_client.disconnect if @rest_client
end end
=begin
Channel::Driver::Twitter.streamable?
returns
true|false
=end
def self.streamable?
true
end
=begin =begin
create stream endpoint form twitter account create stream endpoint form twitter account
@ -183,7 +197,7 @@ returns
=end =end
def stream def stream
sleep_on_unauthorized = 61 sleep_on_unauthorized = 65
2.times do |loop_count| 2.times do |loop_count|
begin begin
stream_start stream_start

View file

@ -22,11 +22,6 @@ class Sessions::Backend::Collections::Base < Sessions::Backend::Base
def push def push
# check permission based access
if self.class.permissions
return if !@user.permissions?(self.class.permissions)
end
# check timeout # check timeout
timeout = Sessions::CacheIn.get(client_key) timeout = Sessions::CacheIn.get(client_key)
return if timeout return if timeout
@ -34,6 +29,11 @@ class Sessions::Backend::Collections::Base < Sessions::Backend::Base
# set new timeout # set new timeout
Sessions::CacheIn.set(client_key, true, { expires_in: @ttl.seconds }) Sessions::CacheIn.set(client_key, true, { expires_in: @ttl.seconds })
# check permission based access
if self.class.permissions
return if !@user.permissions?(self.class.permissions)
end
# check if update has been done # check if update has been done
last_change = self.class.model.constantize.latest_change last_change = self.class.model.constantize.latest_change
return if last_change.to_s == @last_change return if last_change.to_s == @last_change