trabajo-afectivo/lib/sessions.rb

638 lines
12 KiB
Ruby

# Copyright (C) 2012-2021 Zammad Foundation, http://zammad-foundation.org/
module Sessions
@store = case Rails.application.config.websocket_session_store
when :redis then Sessions::Store::Redis.new
else Sessions::Store::File.new
end
# create global vars for threads
@@client_threads = {} # rubocop:disable Style/ClassVars
=begin
start new session
Sessions.create(client_id, session_data, { type: 'websocket' })
returns
true|false
=end
def self.create(client_id, session, meta)
# collect session data
meta[:last_ping] = Time.now.utc.to_i
data = {
user: session,
meta: meta,
}
content = data.to_json
@store.create(client_id, content)
# send update to browser
return if !session || session['id'].blank?
send(
client_id,
{
event: 'ws:login',
data: { success: true },
}
)
end
=begin
list of all session
client_ids = Sessions.sessions
returns
['4711', '4712']
=end
def self.sessions
@store.sessions
end
=begin
list of all session
Sessions.session_exists?(client_id)
returns
true|false
=end
def self.session_exists?(client_id)
@store.session_exists?(client_id)
end
=begin
list of all session with data
client_ids_with_data = Sessions.list
returns
{
'4711' => {
user: {
'id' => 123,
},
meta: {
type: 'websocket',
last_ping: time_of_last_ping,
}
},
'4712' => {
user: {
'id' => 124,
},
meta: {
type: 'ajax',
last_ping: time_of_last_ping,
}
},
}
=end
def self.list
client_ids = sessions
session_list = {}
client_ids.each do |client_id|
data = get(client_id)
next if !data
session_list[client_id] = data
end
session_list
end
=begin
destroy session
Sessions.destroy(client_id)
returns
true|false
=end
def self.destroy(client_id)
@store.destroy(client_id)
end
=begin
destroy idle session
list_of_client_ids = Sessions.destroy_idle_sessions
returns
['4711', '4712']
=end
def self.destroy_idle_sessions(idle_time_in_sec = 240)
list_of_closed_sessions = []
clients = Sessions.list
clients.each do |client_id, client|
if !client[:meta] || !client[:meta][:last_ping] || (client[:meta][:last_ping].to_i + idle_time_in_sec) < Time.now.utc.to_i
list_of_closed_sessions.push client_id
Sessions.destroy(client_id)
end
end
list_of_closed_sessions
end
=begin
touch session
Sessions.touch(client_id)
returns
true|false
=end
def self.touch(client_id)
data = get(client_id)
return false if !data
data[:meta][:last_ping] = Time.now.utc.to_i
@store.set(client_id, data)
true
end
=begin
get session data
data = Sessions.get(client_id)
returns
{
user: {
'id' => 123,
},
meta: {
type: 'websocket',
last_ping: time_of_last_ping,
}
}
=end
def self.get(client_id)
@store.get client_id
end
=begin
send message to client
Sessions.send(client_id_of_recipient, data)
returns
true|false
=end
def self.send(client_id, data)
@store.send_data(client_id, data)
end
=begin
send message to recipient client
Sessions.send_to(user_id, data)
e. g.
Sessions.send_to(user_id, {
event: 'session:takeover',
data: {
taskbar_id: 12312
},
})
returns
true|false
=end
def self.send_to(user_id, data)
# list all current clients
client_list = sessions
client_list.each do |client_id|
session = Sessions.get(client_id)
next if !session
next if !session[:user]
next if !session[:user]['id']
next if session[:user]['id'].to_i != user_id.to_i
Sessions.send(client_id, data)
end
true
end
=begin
send message to all authenticated client
Sessions.broadcast(data)
returns
[array_with_client_ids_of_recipients]
broadcase also to not authenticated client
Sessions.broadcast(data, 'public') # public|authenticated
broadcase also not to sender
Sessions.broadcast(data, 'public', sender_user_id)
=end
def self.broadcast(data, recipient = 'authenticated', sender_user_id = nil)
# list all current clients
recipients = []
client_list = sessions
client_list.each do |client_id|
session = Sessions.get(client_id)
next if !session
if recipient != 'public'
next if session[:user].blank?
next if session[:user]['id'].blank?
end
next if sender_user_id && session[:user] && session[:user]['id'] && session[:user]['id'].to_i == sender_user_id.to_i
Sessions.send(client_id, data)
recipients.push client_id
end
recipients
end
=begin
get messages for client
messages = Sessions.queue(client_id_of_recipient)
returns
[
{
key1 => 'some data of message 1',
key2 => 'some data of message 1',
},
{
key1 => 'some data of message 2',
key2 => 'some data of message 2',
},
]
=end
def self.queue(client_id)
@store.queue(client_id)
end
=begin
remove all session and spool messages
Sessions.cleanup
=end
def self.cleanup
@store.cleanup
end
=begin
create spool messages
Sessions.spool_create(some: 'data')
=end
def self.spool_create(data)
msg = JSON.generate(data)
data = {
msg: msg,
timestamp: Time.now.utc.to_i,
}
@store.add_to_spool(data)
end
=begin
get spool messages
Sessions.spool_list(junger_then, for_user_id)
=end
def self.spool_list(timestamp, current_user_id)
data = []
to_delete = []
@store.each_spool do |message, entry|
message_parsed = {}
begin
spool = JSON.parse(message)
message_parsed = JSON.parse(spool['msg'])
rescue => e
log('error', "can't parse spool message: #{message}, #{e.inspect}")
to_delete.push [message, entry]
next
end
# ignore message older then 48h
if spool['timestamp'] + (2 * 86_400) < Time.now.utc.to_i
to_delete.push [message, entry]
next
end
# add spool attribute to push spool info to clients
message_parsed['spool'] = true
# only send not already older messages
if !timestamp || timestamp < spool['timestamp']
# spool to recipient list
if message_parsed['recipient'] && message_parsed['recipient']['user_id']
message_parsed['recipient']['user_id'].each do |user_id|
next if current_user_id != user_id
message = message_parsed
if message_parsed['event'] == 'broadcast'
message = message_parsed['data']
end
item = {
type: 'direct',
message: message,
}
data.push item
end
# spool to every client
else
message = message_parsed
if message_parsed['event'] == 'broadcast'
message = message_parsed['data']
end
item = {
type: 'broadcast',
message: message,
}
data.push item
end
end
end
to_delete.each do |item|
@store.remove_from_spool(*item)
end
data
end
=begin
delete spool messages
Sessions.spool_delete
=end
def self.spool_delete
@store.clear_spool
end
def self.jobs(node_id = nil)
# dispatch sessions
if node_id.blank? && ENV['ZAMMAD_SESSION_JOBS_CONCURRENT'].to_i.positive?
previous_nodes_sessions = Sessions::Node.stats
if previous_nodes_sessions.present?
log('info', "Cleaning up previous Sessions::Node sessions: #{previous_nodes_sessions}")
Sessions::Node.cleanup
end
dispatcher_pid = Process.pid
node_count = ENV['ZAMMAD_SESSION_JOBS_CONCURRENT'].to_i
node_pids = []
(1..node_count).each do |worker_node_id|
node_pids << fork do
title = "Zammad Session Jobs Node ##{worker_node_id}: dispatch_pid:#{dispatcher_pid} -> worker_pid:#{Process.pid}"
$PROGRAM_NAME = title
log('info', "#{title} started.")
::Sessions.jobs(worker_node_id)
sleep node_count
rescue Interrupt
nil
end
end
Signal.trap 'SIGTERM' do
node_pids.each do |node_pid|
Process.kill 'TERM', node_pid
end
Process.waitall
raise SignalException, 'SIGTERM'
end
# dispatch client_ids to nodes
loop do
# nodes
nodes_stats = Sessions::Node.stats
client_ids = sessions
client_ids.each do |client_id|
# ask nodes for nodes
next if nodes_stats[client_id]
# assign to node
Sessions::Node.session_assigne(client_id)
sleep 1
end
sleep 1
end
end
Thread.abort_on_exception = true
loop do
if node_id
# register node
Sessions::Node.register(node_id)
# watch for assigned sessions
client_ids = Sessions::Node.sessions_by(node_id)
else
client_ids = sessions
end
client_ids.each do |client_id|
# connection already open, ignore
next if @@client_threads[client_id]
# get current user
session_data = Sessions.get(client_id)
next if session_data.blank?
next if session_data[:user].blank?
next if session_data[:user]['id'].blank?
user = User.lookup(id: session_data[:user]['id'])
next if user.blank?
# start client thread
next if @@client_threads[client_id].present?
@@client_threads[client_id] = true
@@client_threads[client_id] = Thread.new do
thread_client(client_id, 0, Time.now.utc, node_id)
@@client_threads[client_id] = nil
log('debug', "close client (#{client_id}) thread")
if ActiveRecord::Base.connection.owner == Thread.current
ActiveRecord::Base.connection.close
end
end
sleep 1
end
sleep 1
end
end
=begin
check if thread for client_id is running
Sessions.thread_client_exists?(client_id)
returns
thread
=end
def self.thread_client_exists?(client_id)
@@client_threads[client_id]
end
=begin
start client for browser
Sessions.thread_client(client_id)
returns
thread
=end
def self.thread_client(client_id, try_count = 0, try_run_time = Time.now.utc, node_id)
log('debug', "LOOP #{node_id}.#{client_id} - #{try_count}")
begin
Sessions::Client.new(client_id, node_id)
rescue => e
log('error', "thread_client #{client_id} exited with error #{e.inspect}")
log('error', e.backtrace.join("\n "))
sleep 10
begin
ActiveRecord::Base.connection_pool.release_connection
rescue => e
log('error', "Can't reconnect to database #{e.inspect}")
end
try_run_max = 10
try_count += 1
# reset error counter if to old
if try_run_time + (60 * 5) < Time.now.utc
try_count = 0
end
try_run_time = Time.now.utc
# restart job again
if try_run_max > try_count
thread_client(client_id, try_count, try_run_time, node_id)
end
raise "STOP thread_client for client #{node_id}.#{client_id} after #{try_run_max} tries"
end
log('debug', "/LOOP #{node_id}.#{client_id} - #{try_count}")
end
def self.symbolize_keys(hash)
hash.each_with_object({}) do |(key, value), result|
new_key = case key
when String then key.to_sym
else key
end
new_value = case value
when Hash then symbolize_keys(value)
else value
end
result[new_key] = new_value
end
end
# we use it in rails and non rails context
def self.log(level, message)
if defined?(Rails)
case level
when 'debug'
Rails.logger.debug { message }
when 'info'
Rails.logger.info message
else
Rails.logger.error message
end
return
end
puts "#{Time.now.utc.iso8601}:#{level} #{message}" # rubocop:disable Rails/Output
end
end