2016-10-19 03:11:36 +00:00
# Copyright (C) 2012-2016 Zammad Foundation, http://zammad-foundation.org/
2013-06-12 15:59:58 +00:00
2012-09-20 12:08:02 +00:00
class Channel < ApplicationModel
2015-08-30 11:58:05 +00:00
include Channel :: Assets
2019-07-04 11:16:55 +00:00
belongs_to :group , optional : true
2016-06-19 16:58:41 +00:00
2012-04-13 13:51:10 +00:00
store :options
2015-08-29 11:46:48 +00:00
store :preferences
2012-04-13 13:51:10 +00:00
2015-08-28 08:19:27 +00:00
after_create :email_address_check
after_update :email_address_check
after_destroy :email_address_check
2016-01-09 12:23:11 +00:00
# rubocop:disable Style/ClassVars
@@channel_stream = { }
2017-10-02 11:23:14 +00:00
@@channel_stream_started_till_at = { }
2016-01-09 12:23:11 +00:00
# rubocop:enable Style/ClassVars
2015-08-28 00:53:14 +00:00
= begin
fetch all accounts
Channel . fetch
= end
2012-04-10 14:06:46 +00:00
def self . fetch
2015-08-28 00:53:14 +00:00
channels = Channel . where ( 'active = ? AND area LIKE ?' , true , '%::Account' )
channels . each ( & :fetch )
2012-04-10 14:06:46 +00:00
end
2015-08-28 00:53:14 +00:00
= begin
fetch one account
channel = Channel . where ( area : 'Email::Account' ) . first
channel . fetch
= end
2016-01-10 13:24:54 +00:00
def fetch ( force = false )
2015-08-28 00:53:14 +00:00
adapter = options [ :adapter ]
adapter_options = options
2015-08-30 11:58:05 +00:00
if options [ :inbound ] && options [ :inbound ] [ :adapter ]
2015-08-28 00:53:14 +00:00
adapter = options [ :inbound ] [ :adapter ]
adapter_options = options [ :inbound ] [ :options ]
end
2020-11-20 10:54:09 +00:00
refresh_xoauth2!
2018-10-09 06:17:41 +00:00
2020-11-20 10:54:09 +00:00
driver_class = self . class . driver_class ( adapter )
driver_instance = driver_class . new
return if ! force && ! driver_instance . fetchable? ( self )
result = driver_instance . fetch ( adapter_options , self )
self . status_in = result [ :result ]
self . last_log_in = result [ :notice ]
preferences [ :last_fetch ] = Time . zone . now
save!
true
rescue = > e
error = " Can't use Channel::Driver:: #{ adapter . to_classname } : #{ e . inspect } "
logger . error error
logger . error e
self . status_in = 'error'
self . last_log_in = error
preferences [ :last_fetch ] = Time . zone . now
save!
false
2016-01-09 12:23:11 +00:00
end
= begin
stream instance of account
channel = Channel . where ( area : 'Twitter::Account' ) . first
stream_instance = channel . stream_instance
# start stream
stream_instance . stream
= end
def stream_instance
adapter = options [ :adapter ]
begin
2018-10-16 08:45:15 +00:00
driver_class = self . class . driver_class ( adapter )
2016-01-09 12:23:11 +00:00
driver_instance = driver_class . new
# check is stream exists
return if ! driver_instance . respond_to? ( :stream_instance )
2018-10-09 06:17:41 +00:00
2016-01-09 12:23:11 +00:00
driver_instance . stream_instance ( self )
# set scheduler job to active
2020-02-18 19:51:31 +00:00
driver_instance
2015-08-28 00:53:14 +00:00
rescue = > e
error = " Can't use Channel::Driver:: #{ adapter . to_classname } : #{ e . inspect } "
logger . error error
2018-11-27 09:56:23 +00:00
logger . error e
2015-08-28 00:53:14 +00:00
self . status_in = 'error'
self . last_log_in = error
2017-10-02 11:23:14 +00:00
save!
2015-08-28 00:53:14 +00:00
end
end
= begin
2016-01-09 12:23:11 +00:00
stream all accounts
Channel . stream
= end
def self . stream
Thread . abort_on_exception = true
2017-10-02 11:23:14 +00:00
auto_reconnect_after = 180
2017-10-15 07:14:34 +00:00
delay_before_reconnect = 70
2016-01-09 12:23:11 +00:00
last_channels = [ ]
loop do
2018-03-20 17:47:49 +00:00
logger . debug { 'stream controll loop' }
2017-06-01 06:23:53 +00:00
2016-01-09 12:23:11 +00:00
current_channels = [ ]
channels = Channel . where ( 'active = ? AND area LIKE ?' , true , '%::Account' )
2017-10-01 12:25:52 +00:00
channels . each do | channel |
2017-10-02 13:10:07 +00:00
adapter = channel . options [ :adapter ]
next if adapter . blank?
2018-10-09 06:17:41 +00:00
2018-10-16 08:45:15 +00:00
driver_class = self . driver_class ( adapter )
2017-10-02 11:23:14 +00:00
next if ! driver_class . respond_to? ( :streamable? )
next if ! driver_class . streamable?
2018-10-09 06:17:41 +00:00
2017-06-01 06:23:53 +00:00
channel_id = channel . id . to_s
2017-10-02 11:23:14 +00:00
2017-06-01 06:23:53 +00:00
current_channels . push channel_id
2017-10-02 11:23:14 +00:00
# exit it channel has changed or connection is older then 180 minutes
2017-10-15 07:14:34 +00:00
if @@channel_stream [ channel_id ] . present?
if @@channel_stream [ channel_id ] [ :options ] != channel . options
logger . info " channel options ( #{ channel . id } ) has changed, stop stream thread "
2017-06-01 06:23:53 +00:00
@@channel_stream [ channel_id ] [ :thread ] . exit
@@channel_stream [ channel_id ] [ :thread ] . join
@@channel_stream [ channel_id ] [ :stream_instance ] . disconnect
2017-10-15 07:14:34 +00:00
@@channel_stream . delete ( channel_id )
2017-10-02 11:23:14 +00:00
@@channel_stream_started_till_at [ channel_id ] = Time . zone . now
2017-10-15 07:14:34 +00:00
next
2017-06-01 06:23:53 +00:00
elsif @@channel_stream [ channel_id ] [ :started_at ] && @@channel_stream [ channel_id ] [ :started_at ] < Time . zone . now - auto_reconnect_after . minutes
2017-10-15 07:14:34 +00:00
logger . info " channel ( #{ channel . id } ) reconnect - stream thread is older then #{ auto_reconnect_after } minutes, restart thread "
2017-06-01 06:23:53 +00:00
@@channel_stream [ channel_id ] [ :thread ] . exit
@@channel_stream [ channel_id ] [ :thread ] . join
@@channel_stream [ channel_id ] [ :stream_instance ] . disconnect
2017-10-15 07:14:34 +00:00
@@channel_stream . delete ( channel_id )
2017-10-02 11:23:14 +00:00
@@channel_stream_started_till_at [ channel_id ] = Time . zone . now
2017-10-15 07:14:34 +00:00
next
2017-06-01 06:23:53 +00:00
end
2016-01-09 12:23:11 +00:00
end
2017-10-15 07:14:34 +00:00
local_delay_before_reconnect = delay_before_reconnect
if channel . status_in == 'error'
local_delay_before_reconnect = local_delay_before_reconnect * 2
end
if @@channel_stream [ channel_id ] . blank? && @@channel_stream_started_till_at [ channel_id ] . present?
wait_in_seconds = @@channel_stream_started_till_at [ channel_id ] - ( Time . zone . now - local_delay_before_reconnect . seconds )
if wait_in_seconds . positive?
logger . info " skipp channel ( #{ channel_id } ) for streaming, already tried to connect or connection was active within the last #{ local_delay_before_reconnect } seconds - wait another #{ wait_in_seconds } seconds "
next
end
end
#logger.info "thread stream for channel (#{channel.id}) already running" if @@channel_stream[channel_id].present?
next if @@channel_stream [ channel_id ] . present?
2016-01-09 12:23:11 +00:00
2017-06-01 06:23:53 +00:00
@@channel_stream [ channel_id ] = {
2018-12-19 17:31:51 +00:00
options : channel . options ,
2017-06-01 06:23:53 +00:00
started_at : Time . zone . now ,
2016-01-09 12:23:11 +00:00
}
2016-11-03 23:09:05 +00:00
# start channels with delay
sleep @@channel_stream . count
2016-01-09 12:23:11 +00:00
# start threads for each channel
2017-10-01 12:25:52 +00:00
@@channel_stream [ channel_id ] [ :thread ] = Thread . new do
2019-06-27 18:26:28 +00:00
logger . info " Started stream channel for ' #{ channel . id } ' ( #{ channel . area } )... "
channel . status_in = 'ok'
channel . last_log_in = ''
channel . save!
@@channel_stream_started_till_at [ channel_id ] = Time . zone . now
@@channel_stream [ channel_id ] || = { }
@@channel_stream [ channel_id ] [ :stream_instance ] = channel . stream_instance
@@channel_stream [ channel_id ] [ :stream_instance ] . stream
@@channel_stream [ channel_id ] [ :stream_instance ] . disconnect
@@channel_stream . delete ( channel_id )
@@channel_stream_started_till_at [ channel_id ] = Time . zone . now
logger . info " ...stopped stream thread for ' #{ channel . id } ' "
rescue = > e
error = " Can't use stream for channel ( #{ channel . id } ): #{ e . inspect } "
logger . error error
logger . error e
channel . status_in = 'error'
channel . last_log_in = error
channel . save!
@@channel_stream . delete ( channel_id )
@@channel_stream_started_till_at [ channel_id ] = Time . zone . now
2017-10-01 12:25:52 +00:00
end
end
2016-01-09 12:23:11 +00:00
# cleanup deleted channels
2017-10-01 12:25:52 +00:00
last_channels . each do | channel_id |
2017-10-02 11:23:14 +00:00
next if @@channel_stream [ channel_id ] . blank?
2016-01-09 12:23:11 +00:00
next if current_channels . include? ( channel_id )
2018-10-09 06:17:41 +00:00
2017-10-02 11:23:14 +00:00
logger . info " channel ( #{ channel_id } ) not longer active, stop stream thread "
@@channel_stream [ channel_id ] [ :thread ] . exit
@@channel_stream [ channel_id ] [ :thread ] . join
@@channel_stream [ channel_id ] [ :stream_instance ] . disconnect
2017-10-15 07:14:34 +00:00
@@channel_stream . delete ( channel_id )
2017-10-02 11:23:14 +00:00
@@channel_stream_started_till_at [ channel_id ] = Time . zone . now
2017-10-01 12:25:52 +00:00
end
2017-10-02 11:23:14 +00:00
2016-01-09 12:23:11 +00:00
last_channels = current_channels
2017-06-01 06:23:53 +00:00
sleep 20
2016-01-09 12:23:11 +00:00
end
end
= begin
2015-08-28 00:53:14 +00:00
send via account
channel = Channel . where ( area : 'Email::Account' ) . first
2018-10-16 08:45:15 +00:00
channel . deliver ( params , notification )
2015-08-28 00:53:14 +00:00
= end
2018-10-16 08:45:15 +00:00
def deliver ( params , notification = false )
2015-08-28 00:53:14 +00:00
adapter = options [ :adapter ]
adapter_options = options
2015-08-30 11:58:05 +00:00
if options [ :outbound ] && options [ :outbound ] [ :adapter ]
2015-08-28 00:53:14 +00:00
adapter = options [ :outbound ] [ :adapter ]
adapter_options = options [ :outbound ] [ :options ]
end
2020-11-20 10:54:09 +00:00
refresh_xoauth2!
driver_class = self . class . driver_class ( adapter )
driver_instance = driver_class . new
result = driver_instance . send ( adapter_options , params , notification )
self . status_out = 'ok'
self . last_log_out = ''
save!
2015-08-28 01:08:55 +00:00
result
2020-11-20 10:54:09 +00:00
rescue = > e
error = " Can't use Channel::Driver:: #{ adapter . to_classname } : #{ e . inspect } "
logger . error error
logger . error e
self . status_out = 'error'
self . last_log_out = error
save!
raise error
2015-08-28 00:53:14 +00:00
end
2018-10-16 08:45:15 +00:00
= begin
process via account
channel = Channel . where ( area : 'Email::Account' ) . first
channel . process ( params )
= end
def process ( params )
adapter = options [ :adapter ]
adapter_options = options
if options [ :inbound ] && options [ :inbound ] [ :adapter ]
adapter = options [ :inbound ] [ :adapter ]
adapter_options = options [ :inbound ] [ :options ]
end
result = nil
begin
driver_class = self . class . driver_class ( adapter )
driver_instance = driver_class . new
result = driver_instance . process ( adapter_options , params , self )
self . status_in = 'ok'
self . last_log_in = ''
save!
rescue = > e
error = " Can't use Channel::Driver:: #{ adapter . to_classname } : #{ e . inspect } "
logger . error error
2018-11-27 09:56:23 +00:00
logger . error e
2018-10-16 08:45:15 +00:00
self . status_in = 'error'
self . last_log_in = error
save!
raise e , error
end
result
end
= begin
load channel driver and return class
klass = Channel . driver_class ( 'Imap' )
= end
def self . driver_class ( adapter )
# we need to require each channel backend individually otherwise we get a
# 'warning: toplevel constant Twitter referenced by Channel::Driver::Twitter' error e.g.
# so we have to convert the channel name to the filename via Rails String.underscore
# http://stem.ps/rails/2015/01/25/ruby-gotcha-toplevel-constant-referenced-by.html
2018-10-18 12:22:02 +00:00
require_dependency " channel/driver/ #{ adapter . to_filename } "
2018-10-16 08:45:15 +00:00
2019-01-06 18:41:29 +00:00
" ::Channel::Driver:: #{ adapter . to_classname } " . constantize
2018-10-16 08:45:15 +00:00
end
= begin
get instance of channel driver
channel . driver_instance
= end
def driver_instance
self . class . driver_class ( options [ :adapter ] )
end
2020-11-20 10:47:11 +00:00
def refresh_xoauth2!
2020-05-28 13:28:07 +00:00
return if options . dig ( :auth , :type ) != 'XOAUTH2'
2020-11-20 10:54:09 +00:00
return if ApplicationHandleInfo . current == 'application_server'
2020-05-28 13:28:07 +00:00
result = ExternalCredential . refresh_token ( options [ :auth ] [ :provider ] , options [ :auth ] )
options [ :auth ] = result
options [ :inbound ] [ :options ] [ :password ] = result [ :access_token ]
options [ :outbound ] [ :options ] [ :password ] = result [ :access_token ]
2020-08-28 09:41:38 +00:00
return if new_record?
2020-11-20 10:54:09 +00:00
save!
2020-06-22 09:04:57 +00:00
rescue = > e
2020-05-28 13:28:07 +00:00
logger . error e
2020-11-20 10:54:09 +00:00
raise " Failed to refresh XOAUTH2 access_token of provider ' #{ options [ :auth ] [ :provider ] } ': #{ e . message } "
2020-05-28 13:28:07 +00:00
end
2015-08-28 08:19:27 +00:00
private
def email_address_check
# reset non existing channel_ids
EmailAddress . channel_cleanup
end
2013-06-12 15:59:58 +00:00
end