Improved streaming reconnect (only reconnect if options has changed, if streaming connect was not successful, delay reconnect for 70 seconds).

This commit is contained in:
Martin Edenhofer 2017-10-15 09:14:34 +02:00
parent 645c1c3de4
commit 734c821abc

View file

@ -134,6 +134,7 @@ stream all accounts
Thread.abort_on_exception = true Thread.abort_on_exception = true
auto_reconnect_after = 180 auto_reconnect_after = 180
delay_before_reconnect = 70
last_channels = [] last_channels = []
loop do loop do
@ -149,37 +150,46 @@ stream all accounts
next if !driver_class.streamable? next if !driver_class.streamable?
channel_id = channel.id.to_s channel_id = channel.id.to_s
if @@channel_stream[channel_id].blank? && @@channel_stream_started_till_at[channel_id] && @@channel_stream_started_till_at[channel_id] > Time.zone.now - 65.seconds
logger.info "skipp channel (#{channel_id}) for streaming, already tried to connect or connection was active within the last minute"
next
end
current_channels.push channel_id current_channels.push channel_id
# exit it channel has changed or connection is older then 180 minutes # exit it channel has changed or connection is older then 180 minutes
if @@channel_stream[channel_id] if @@channel_stream[channel_id].present?
if @@channel_stream[channel_id][:updated_at] != channel.updated_at if @@channel_stream[channel_id][:options] != channel.options
logger.info "channel (#{channel.id}) has changed, stop thread" logger.info "channel options (#{channel.id}) has changed, stop stream thread"
@@channel_stream[channel_id][:thread].exit @@channel_stream[channel_id][:thread].exit
@@channel_stream[channel_id][:thread].join @@channel_stream[channel_id][:thread].join
@@channel_stream[channel_id][:stream_instance].disconnect @@channel_stream[channel_id][:stream_instance].disconnect
@@channel_stream[channel_id] = false @@channel_stream.delete(channel_id)
@@channel_stream_started_till_at[channel_id] = Time.zone.now @@channel_stream_started_till_at[channel_id] = Time.zone.now
next
elsif @@channel_stream[channel_id][:started_at] && @@channel_stream[channel_id][:started_at] < Time.zone.now - auto_reconnect_after.minutes elsif @@channel_stream[channel_id][:started_at] && @@channel_stream[channel_id][:started_at] < Time.zone.now - auto_reconnect_after.minutes
logger.info "channel (#{channel.id}) reconnect - thread is older then #{auto_reconnect_after} minutes, restart thread" logger.info "channel (#{channel.id}) reconnect - stream thread is older then #{auto_reconnect_after} minutes, restart thread"
@@channel_stream[channel_id][:thread].exit @@channel_stream[channel_id][:thread].exit
@@channel_stream[channel_id][:thread].join @@channel_stream[channel_id][:thread].join
@@channel_stream[channel_id][:stream_instance].disconnect @@channel_stream[channel_id][:stream_instance].disconnect
@@channel_stream[channel_id] = false @@channel_stream.delete(channel_id)
@@channel_stream_started_till_at[channel_id] = Time.zone.now @@channel_stream_started_till_at[channel_id] = Time.zone.now
next
end end
end end
#logger.debug "thread for channel (#{channel.id}) already running" if channel_stream local_delay_before_reconnect = delay_before_reconnect
next if @@channel_stream[channel_id] if channel.status_in == 'error'
local_delay_before_reconnect = local_delay_before_reconnect * 2
end
if @@channel_stream[channel_id].blank? && @@channel_stream_started_till_at[channel_id].present?
wait_in_seconds = @@channel_stream_started_till_at[channel_id] - (Time.zone.now - local_delay_before_reconnect.seconds)
if wait_in_seconds.positive?
logger.info "skipp channel (#{channel_id}) for streaming, already tried to connect or connection was active within the last #{local_delay_before_reconnect} seconds - wait another #{wait_in_seconds} seconds"
next
end
end
#logger.info "thread stream for channel (#{channel.id}) already running" if @@channel_stream[channel_id].present?
next if @@channel_stream[channel_id].present?
@@channel_stream[channel_id] = { @@channel_stream[channel_id] = {
updated_at: channel.updated_at, options: channel.options,
started_at: Time.zone.now, started_at: Time.zone.now,
} }
@ -198,7 +208,7 @@ stream all accounts
@@channel_stream[channel_id][:stream_instance] = channel.stream_instance @@channel_stream[channel_id][:stream_instance] = channel.stream_instance
@@channel_stream[channel_id][:stream_instance].stream @@channel_stream[channel_id][:stream_instance].stream
@@channel_stream[channel_id][:stream_instance].disconnect @@channel_stream[channel_id][:stream_instance].disconnect
@@channel_stream[channel_id] = false @@channel_stream.delete(channel_id)
@@channel_stream_started_till_at[channel_id] = Time.zone.now @@channel_stream_started_till_at[channel_id] = Time.zone.now
logger.info " ...stopped stream thread for '#{channel.id}'" logger.info " ...stopped stream thread for '#{channel.id}'"
rescue => e rescue => e
@ -208,7 +218,8 @@ stream all accounts
channel.status_in = 'error' channel.status_in = 'error'
channel.last_log_in = error channel.last_log_in = error
channel.save! channel.save!
@@channel_stream[channel_id] = false @@channel_stream.delete(channel_id)
@@channel_stream_started_till_at[channel_id] = Time.zone.now
end end
end end
end end
@ -221,7 +232,7 @@ stream all accounts
@@channel_stream[channel_id][:thread].exit @@channel_stream[channel_id][:thread].exit
@@channel_stream[channel_id][:thread].join @@channel_stream[channel_id][:thread].join
@@channel_stream[channel_id][:stream_instance].disconnect @@channel_stream[channel_id][:stream_instance].disconnect
@@channel_stream[channel_id] = false @@channel_stream.delete(channel_id)
@@channel_stream_started_till_at[channel_id] = Time.zone.now @@channel_stream_started_till_at[channel_id] = Time.zone.now
end end