trabajo-afectivo/app/models/channel.rb

366 lines
10 KiB
Ruby
Raw Normal View History

2016-10-19 03:11:36 +00:00
# Copyright (C) 2012-2016 Zammad Foundation, http://zammad-foundation.org/
class Channel < ApplicationModel
include Channel::Assets
belongs_to :group, optional: true
2016-06-19 16:58:41 +00:00
store :options
store :preferences
after_create :email_address_check
after_update :email_address_check
after_destroy :email_address_check
2016-01-09 12:23:11 +00:00
# rubocop:disable Style/ClassVars
@@channel_stream = {}
@@channel_stream_started_till_at = {}
2016-01-09 12:23:11 +00:00
# rubocop:enable Style/ClassVars
2015-08-28 00:53:14 +00:00
=begin
fetch all accounts
Channel.fetch
=end
2012-04-10 14:06:46 +00:00
def self.fetch
2015-08-28 00:53:14 +00:00
channels = Channel.where('active = ? AND area LIKE ?', true, '%::Account')
channels.each(&:fetch)
2012-04-10 14:06:46 +00:00
end
2015-08-28 00:53:14 +00:00
=begin
fetch one account
channel = Channel.where(area: 'Email::Account').first
channel.fetch
=end
2016-01-10 13:24:54 +00:00
def fetch(force = false)
2015-08-28 00:53:14 +00:00
adapter = options[:adapter]
adapter_options = options
if options[:inbound] && options[:inbound][:adapter]
2015-08-28 00:53:14 +00:00
adapter = options[:inbound][:adapter]
adapter_options = options[:inbound][:options]
end
refresh_xoauth2!
driver_class = self.class.driver_class(adapter)
driver_instance = driver_class.new
return if !force && !driver_instance.fetchable?(self)
result = driver_instance.fetch(adapter_options, self)
self.status_in = result[:result]
self.last_log_in = result[:notice]
preferences[:last_fetch] = Time.zone.now
save!
true
rescue => e
error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}"
logger.error error
logger.error e
self.status_in = 'error'
self.last_log_in = error
preferences[:last_fetch] = Time.zone.now
save!
false
2016-01-09 12:23:11 +00:00
end
=begin
stream instance of account
channel = Channel.where(area: 'Twitter::Account').first
stream_instance = channel.stream_instance
# start stream
stream_instance.stream
=end
def stream_instance
adapter = options[:adapter]
begin
driver_class = self.class.driver_class(adapter)
2016-01-09 12:23:11 +00:00
driver_instance = driver_class.new
# check is stream exists
return if !driver_instance.respond_to?(:stream_instance)
2016-01-09 12:23:11 +00:00
driver_instance.stream_instance(self)
# set scheduler job to active
driver_instance
2015-08-28 00:53:14 +00:00
rescue => e
error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}"
logger.error error
logger.error e
2015-08-28 00:53:14 +00:00
self.status_in = 'error'
self.last_log_in = error
save!
2015-08-28 00:53:14 +00:00
end
end
=begin
2016-01-09 12:23:11 +00:00
stream all accounts
Channel.stream
=end
def self.stream
Thread.abort_on_exception = true
auto_reconnect_after = 180
delay_before_reconnect = 70
2016-01-09 12:23:11 +00:00
last_channels = []
loop do
logger.debug { 'stream controll loop' }
2016-01-09 12:23:11 +00:00
current_channels = []
channels = Channel.where('active = ? AND area LIKE ?', true, '%::Account')
channels.each do |channel|
adapter = channel.options[:adapter]
next if adapter.blank?
driver_class = self.driver_class(adapter)
next if !driver_class.respond_to?(:streamable?)
next if !driver_class.streamable?
channel_id = channel.id.to_s
current_channels.push channel_id
# exit it channel has changed or connection is older then 180 minutes
if @@channel_stream[channel_id].present?
if @@channel_stream[channel_id][:options] != channel.options
logger.info "channel options (#{channel.id}) has changed, stop stream thread"
@@channel_stream[channel_id][:thread].exit
@@channel_stream[channel_id][:thread].join
@@channel_stream[channel_id][:stream_instance].disconnect
@@channel_stream.delete(channel_id)
@@channel_stream_started_till_at[channel_id] = Time.zone.now
next
elsif @@channel_stream[channel_id][:started_at] && @@channel_stream[channel_id][:started_at] < Time.zone.now - auto_reconnect_after.minutes
logger.info "channel (#{channel.id}) reconnect - stream thread is older then #{auto_reconnect_after} minutes, restart thread"
@@channel_stream[channel_id][:thread].exit
@@channel_stream[channel_id][:thread].join
@@channel_stream[channel_id][:stream_instance].disconnect
@@channel_stream.delete(channel_id)
@@channel_stream_started_till_at[channel_id] = Time.zone.now
next
end
2016-01-09 12:23:11 +00:00
end
local_delay_before_reconnect = delay_before_reconnect
if channel.status_in == 'error'
local_delay_before_reconnect = local_delay_before_reconnect * 2
end
if @@channel_stream[channel_id].blank? && @@channel_stream_started_till_at[channel_id].present?
wait_in_seconds = @@channel_stream_started_till_at[channel_id] - (Time.zone.now - local_delay_before_reconnect.seconds)
if wait_in_seconds.positive?
logger.info "skipp channel (#{channel_id}) for streaming, already tried to connect or connection was active within the last #{local_delay_before_reconnect} seconds - wait another #{wait_in_seconds} seconds"
next
end
end
#logger.info "thread stream for channel (#{channel.id}) already running" if @@channel_stream[channel_id].present?
next if @@channel_stream[channel_id].present?
2016-01-09 12:23:11 +00:00
@@channel_stream[channel_id] = {
options: channel.options,
started_at: Time.zone.now,
2016-01-09 12:23:11 +00:00
}
# start channels with delay
sleep @@channel_stream.count
2016-01-09 12:23:11 +00:00
# start threads for each channel
@@channel_stream[channel_id][:thread] = Thread.new do
logger.info "Started stream channel for '#{channel.id}' (#{channel.area})..."
channel.status_in = 'ok'
channel.last_log_in = ''
channel.save!
@@channel_stream_started_till_at[channel_id] = Time.zone.now
@@channel_stream[channel_id] ||= {}
@@channel_stream[channel_id][:stream_instance] = channel.stream_instance
@@channel_stream[channel_id][:stream_instance].stream
@@channel_stream[channel_id][:stream_instance].disconnect
@@channel_stream.delete(channel_id)
@@channel_stream_started_till_at[channel_id] = Time.zone.now
logger.info " ...stopped stream thread for '#{channel.id}'"
rescue => e
error = "Can't use stream for channel (#{channel.id}): #{e.inspect}"
logger.error error
logger.error e
channel.status_in = 'error'
channel.last_log_in = error
channel.save!
@@channel_stream.delete(channel_id)
@@channel_stream_started_till_at[channel_id] = Time.zone.now
end
end
2016-01-09 12:23:11 +00:00
# cleanup deleted channels
last_channels.each do |channel_id|
next if @@channel_stream[channel_id].blank?
2016-01-09 12:23:11 +00:00
next if current_channels.include?(channel_id)
logger.info "channel (#{channel_id}) not longer active, stop stream thread"
@@channel_stream[channel_id][:thread].exit
@@channel_stream[channel_id][:thread].join
@@channel_stream[channel_id][:stream_instance].disconnect
@@channel_stream.delete(channel_id)
@@channel_stream_started_till_at[channel_id] = Time.zone.now
end
2016-01-09 12:23:11 +00:00
last_channels = current_channels
sleep 20
2016-01-09 12:23:11 +00:00
end
end
=begin
2015-08-28 00:53:14 +00:00
send via account
channel = Channel.where(area: 'Email::Account').first
channel.deliver(params, notification)
2015-08-28 00:53:14 +00:00
=end
def deliver(params, notification = false)
2015-08-28 00:53:14 +00:00
adapter = options[:adapter]
adapter_options = options
if options[:outbound] && options[:outbound][:adapter]
2015-08-28 00:53:14 +00:00
adapter = options[:outbound][:adapter]
adapter_options = options[:outbound][:options]
end
refresh_xoauth2!
driver_class = self.class.driver_class(adapter)
driver_instance = driver_class.new
result = driver_instance.send(adapter_options, params, notification)
self.status_out = 'ok'
self.last_log_out = ''
save!
2015-08-28 01:08:55 +00:00
result
rescue => e
error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}"
logger.error error
logger.error e
self.status_out = 'error'
self.last_log_out = error
save!
raise error
2015-08-28 00:53:14 +00:00
end
=begin
process via account
channel = Channel.where(area: 'Email::Account').first
channel.process(params)
=end
def process(params)
adapter = options[:adapter]
adapter_options = options
if options[:inbound] && options[:inbound][:adapter]
adapter = options[:inbound][:adapter]
adapter_options = options[:inbound][:options]
end
result = nil
begin
driver_class = self.class.driver_class(adapter)
driver_instance = driver_class.new
result = driver_instance.process(adapter_options, params, self)
self.status_in = 'ok'
self.last_log_in = ''
save!
rescue => e
error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}"
logger.error error
logger.error e
self.status_in = 'error'
self.last_log_in = error
save!
raise e, error
end
result
end
=begin
load channel driver and return class
klass = Channel.driver_class('Imap')
=end
def self.driver_class(adapter)
# we need to require each channel backend individually otherwise we get a
# 'warning: toplevel constant Twitter referenced by Channel::Driver::Twitter' error e.g.
# so we have to convert the channel name to the filename via Rails String.underscore
# http://stem.ps/rails/2015/01/25/ruby-gotcha-toplevel-constant-referenced-by.html
require_dependency "channel/driver/#{adapter.to_filename}"
"::Channel::Driver::#{adapter.to_classname}".constantize
end
=begin
get instance of channel driver
channel.driver_instance
=end
def driver_instance
self.class.driver_class(options[:adapter])
end
def refresh_xoauth2!(force: false)
return if options.dig(:auth, :type) != 'XOAUTH2'
return if !force && ApplicationHandleInfo.current == 'application_server'
result = ExternalCredential.refresh_token(options[:auth][:provider], options[:auth])
options[:auth] = result
options[:inbound][:options][:password] = result[:access_token]
options[:outbound][:options][:password] = result[:access_token]
return if new_record?
save!
rescue => e
logger.error e
raise "Failed to refresh XOAUTH2 access_token of provider '#{options[:auth][:provider]}': #{e.message}"
end
private
def email_address_check
# reset non existing channel_ids
EmailAddress.channel_cleanup
end
end