trabajo-afectivo/lib/sessions.rb

641 lines
13 KiB
Ruby
Raw Normal View History

2012-07-23 22:22:23 +00:00
require 'json'
2013-05-05 22:54:06 +00:00
require 'session_helper'
2012-07-23 22:22:23 +00:00
module Sessions
2013-01-24 01:01:47 +00:00
# get application root directory
@root = Dir.pwd.to_s
2013-02-01 00:01:20 +00:00
if !@root || @root.empty? || @root == '/'
2013-01-24 01:01:47 +00:00
@root = Rails.root
end
# get working directories
@path = "#{@root}/tmp/websocket"
2013-01-24 01:01:47 +00:00
# create global vars for threads
@@client_threads = {} # rubocop:disable Style/ClassVars
2012-07-23 22:22:23 +00:00
2014-06-27 06:43:37 +00:00
=begin
start new session
Sessions.create( client_id, session_data, { type: 'websocket' } )
2014-06-27 06:43:37 +00:00
returns
true|false
=end
def self.create(client_id, session, meta)
path = "#{@path}/#{client_id}"
path_tmp = "#{@path}/tmp/#{client_id}"
session_file = "#{path_tmp}/session"
# collect session data
2015-05-10 20:53:15 +00:00
meta[:last_ping] = Time.now.utc.to_i
data = {
user: session,
meta: meta,
}
content = data.to_json
# store session data in session file
FileUtils.mkpath path_tmp
File.open(session_file, 'wb') { |file|
file.write content
2012-07-23 22:22:23 +00:00
}
2012-11-02 16:10:22 +00:00
2015-05-03 12:11:47 +00:00
# destory old session if needed
if File.exist?(path)
2015-05-03 12:11:47 +00:00
Sessions.destory(client_id)
end
# move to destination directory
FileUtils.mv(path_tmp, path)
2012-11-02 16:10:22 +00:00
# send update to browser
2014-09-21 12:50:39 +00:00
if session && session['id']
send(
2015-04-27 15:21:17 +00:00
client_id,
{
event: 'ws:login',
data: { success: true },
}
)
2012-11-02 16:10:22 +00:00
end
2012-07-23 22:22:23 +00:00
end
2014-06-27 06:43:37 +00:00
=begin
list of all session
client_ids = Sessions.sessions
returns
['4711', '4712']
=end
def self.sessions
path = "#{@path}/"
2014-06-27 06:43:37 +00:00
# just make sure that spool path exists
if !File.exist?(path)
2014-06-27 06:43:37 +00:00
FileUtils.mkpath path
end
data = []
Dir.foreach(path) do |entry|
next if entry == '.'
next if entry == '..'
next if entry == 'tmp'
next if entry == 'spool'
2014-06-27 06:43:37 +00:00
data.push entry.to_s
end
data
end
=begin
list of all session
Sessions.session_exists?(client_id)
returns
true|false
=end
def self.session_exists?(client_id)
client_ids = sessions
2014-06-27 06:43:37 +00:00
client_ids.include? client_id.to_s
end
=begin
list of all session with data
client_ids_with_data = Sessions.list
returns
{
'4711' => {
user: {
'id' => 123,
2014-06-27 06:43:37 +00:00
},
meta: {
type: 'websocket',
last_ping: time_of_last_ping,
2014-06-27 06:43:37 +00:00
}
},
'4712' => {
user: {
'id' => 124,
2014-06-27 06:43:37 +00:00
},
meta: {
type: 'ajax',
last_ping: time_of_last_ping,
2014-06-27 06:43:37 +00:00
}
},
}
=end
def self.list
client_ids = sessions
2014-06-27 06:43:37 +00:00
session_list = {}
client_ids.each { |client_id|
data = get(client_id)
2014-06-27 06:43:37 +00:00
next if !data
session_list[client_id] = data
}
session_list
end
=begin
destroy session
Sessions.destory(client_id)
2014-06-27 06:43:37 +00:00
returns
true|false
=end
def self.destory(client_id)
path = "#{@path}/#{client_id}"
2014-06-27 06:43:37 +00:00
FileUtils.rm_rf path
end
=begin
destroy idle session
list_of_client_ids = Sessions.destory_idle_sessions
returns
['4711', '4712']
=end
def self.destory_idle_sessions(idle_time_in_sec = 240)
2014-06-27 06:43:37 +00:00
list_of_closed_sessions = []
clients = Sessions.list
2014-06-27 06:43:37 +00:00
clients.each { |client_id, client|
2015-05-10 19:47:17 +00:00
if !client[:meta] || !client[:meta][:last_ping] || ( client[:meta][:last_ping].to_i + idle_time_in_sec ) < Time.now.utc.to_i
2014-06-27 06:43:37 +00:00
list_of_closed_sessions.push client_id
Sessions.destory(client_id)
2014-06-27 06:43:37 +00:00
end
}
list_of_closed_sessions
end
=begin
touch session
Sessions.touch(client_id)
returns
true|false
=end
def self.touch(client_id)
data = get(client_id)
2014-06-27 06:43:37 +00:00
return false if !data
path = "#{@path}/#{client_id}"
2015-05-10 20:53:15 +00:00
data[:meta][:last_ping] = Time.now.utc.to_i
content = data.to_json
2014-06-27 06:43:37 +00:00
File.open( path + '/session', 'wb' ) { |file|
file.write content
2014-06-27 06:43:37 +00:00
}
true
end
=begin
get session data
data = Sessions.get(client_id)
returns
{
user: {
'id' => 123,
2014-06-27 06:43:37 +00:00
},
meta: {
type: 'websocket',
last_ping: time_of_last_ping,
2014-06-27 06:43:37 +00:00
}
}
=end
def self.get(client_id)
session_dir = "#{@path}/#{client_id}"
session_file = "#{session_dir}/session"
data = nil
2015-05-12 08:46:07 +00:00
# if no session dir exists, session got destoried
2015-05-03 12:11:47 +00:00
if !File.exist? session_dir
destory(client_id)
log('debug', "missing session directory for '#{client_id}', remove session.")
2015-05-03 12:11:47 +00:00
return
end
2015-05-12 08:46:07 +00:00
# if only session file is missing, then it's an error behavior
2014-10-23 18:08:00 +00:00
if !File.exist? session_file
destory(client_id)
log('error', "missing session file for '#{client_id}', remove session.")
2014-10-23 18:08:00 +00:00
return
end
2014-06-27 06:43:37 +00:00
begin
File.open(session_file, 'rb') { |file|
file.flock(File::LOCK_EX)
2014-06-27 06:43:37 +00:00
all = file.read
file.flock(File::LOCK_UN)
data_json = JSON.parse(all)
if data_json
2015-05-10 20:53:15 +00:00
data = symbolize_keys(data_json)
data[:user] = data_json['user'] # for compat. reasons
end
2014-06-27 06:43:37 +00:00
}
rescue => e
log('error', e.inspect)
destory(client_id)
log('error', "error in reading/parsing session file '#{session_file}', remove session.")
2014-06-27 06:43:37 +00:00
return
end
data
end
=begin
send message to client
Sessions.send(client_id_of_recipient, data)
returns
true|false
=end
def self.send(client_id, data)
path = "#{@path}/#{client_id}/"
filename = "send-#{Time.now.utc.to_f}"
check = true
count = 0
2014-06-27 06:43:37 +00:00
while check
if File.exist?(path + filename)
2014-06-27 06:43:37 +00:00
count += 1
filename = "#{filename}-#{count}"
2014-06-27 06:43:37 +00:00
else
check = false
end
end
return false if !File.directory? path
File.open(path + 'a-' + filename, 'wb') { |file|
file.flock(File::LOCK_EX)
2014-06-27 06:43:37 +00:00
file.write data.to_json
file.flock(File::LOCK_UN)
2014-06-27 06:43:37 +00:00
file.close
}
return false if !File.exist?(path + 'a-' + filename)
FileUtils.mv(path + 'a-' + filename, path + filename)
2014-06-27 06:43:37 +00:00
true
end
=begin
send message to recipient client
Sessions.send_to(user_id, data)
returns
true|false
=end
def self.send_to(user_id, data)
# list all current clients
client_list = sessions
client_list.each {|client_id|
session = Sessions.get(client_id)
next if !session
next if !session[:user]
next if !session[:user]['id']
next if session[:user]['id'].to_i != user_id.to_i
Sessions.send(client_id, data)
}
true
end
=begin
send message to all authenticated client
2014-06-27 06:43:37 +00:00
Sessions.broadcast(data)
returns
true|false
=end
def self.broadcast(data)
2014-06-27 06:43:37 +00:00
# list all current clients
client_list = sessions
2014-06-27 06:43:37 +00:00
client_list.each {|client_id|
session = Sessions.get(client_id)
next if !session
next if !session[:user]
next if !session[:user]['id']
Sessions.send(client_id, data)
2014-06-27 06:43:37 +00:00
}
true
end
=begin
get messages for client
messages = Sessions.queue(client_id_of_recipient)
returns
[
{
key1 => 'some data of message 1',
key2 => 'some data of message 1',
},
{
key1 => 'some data of message 2',
key2 => 'some data of message 2',
},
]
=end
def self.queue(client_id)
path = "#{@path}/#{client_id}/"
data = []
2014-06-27 06:43:37 +00:00
files = []
Dir.foreach( path ) {|entry|
next if entry == '.'
next if entry == '..'
2014-06-27 06:43:37 +00:00
files.push entry
}
files.sort.each {|entry|
filename = "#{path}/#{entry}"
if /^send/.match(entry)
data.push Sessions.queue_file_read(path, entry)
2014-06-27 06:43:37 +00:00
end
}
data
end
def self.queue_file_read(path, filename)
file_old = "#{path}#{filename}"
file_new = "#{path}a-#{filename}"
FileUtils.mv(file_old, file_new)
2014-06-27 06:43:37 +00:00
all = ''
File.open(file_new, 'rb') { |file|
2014-06-27 06:43:37 +00:00
all = file.read
}
File.delete(file_new)
JSON.parse(all)
2014-06-27 06:43:37 +00:00
end
def self.cleanup
path = "#{@path}/spool/"
FileUtils.rm_rf path
path = "#{@path}/tmp/"
2014-06-29 19:30:55 +00:00
FileUtils.rm_rf path
end
def self.spool_create(msg)
path = "#{@path}/spool/"
FileUtils.mkpath path
2015-05-10 19:47:17 +00:00
file_path = path + "/#{Time.now.utc.to_f}-#{rand(99_999)}"
File.open( file_path, 'wb' ) { |file|
data = {
msg: msg,
2015-05-10 19:47:17 +00:00
timestamp: Time.now.utc.to_i,
}
file.write data.to_json
}
end
def self.spool_list(timestamp, current_user_id)
path = "#{@path}/spool/"
FileUtils.mkpath path
data = []
to_delete = []
files = []
Dir.foreach(path) {|entry|
next if entry == '.'
next if entry == '..'
files.push entry
}
files.sort.each {|entry|
filename = "#{path}/#{entry}"
next if !File.exist?(filename)
File.open(filename, 'rb') { |file|
all = file.read
spool = JSON.parse(all)
begin
message_parsed = JSON.parse(spool['msg'])
rescue => e
log('error', "can't parse spool message: #{message}, #{e.inspect}")
next
end
# ignore message older then 48h
2015-05-10 19:47:17 +00:00
if spool['timestamp'] + (2 * 86_400) < Time.now.utc.to_i
to_delete.push "#{path}/#{entry}"
next
end
# add spool attribute to push spool info to clients
2013-06-28 22:26:04 +00:00
message_parsed['spool'] = true
# only send not already now messages
if !timestamp || timestamp < spool['timestamp']
# spool to recipient list
2013-06-28 22:26:04 +00:00
if message_parsed['recipient'] && message_parsed['recipient']['user_id']
2013-06-28 22:26:04 +00:00
message_parsed['recipient']['user_id'].each { |user_id|
next if current_user_id != user_id
item = {
type: 'direct',
message: message_parsed,
}
data.push item
}
# spool to every client
else
item = {
type: 'broadcast',
message: message_parsed,
}
data.push item
end
end
}
}
to_delete.each {|file|
File.delete(file)
}
data
end
2012-07-23 22:22:23 +00:00
def self.jobs
# just make sure that spool path exists
if !File.exist?(@path)
2013-01-15 23:10:27 +00:00
FileUtils.mkpath @path
end
2012-08-03 22:46:05 +00:00
Thread.abort_on_exception = true
loop do
client_ids = sessions
2012-07-23 22:22:23 +00:00
client_ids.each { |client_id|
2014-06-29 19:30:55 +00:00
# connection already open, ignore
2012-08-07 21:53:43 +00:00
next if @@client_threads[client_id]
# get current user
session_data = Sessions.get(client_id)
2012-11-26 05:04:44 +00:00
next if !session_data
next if !session_data[:user]
next if !session_data[:user]['id']
user = User.lookup( id: session_data[:user]['id'] )
2012-08-03 22:46:05 +00:00
next if !user
# start client thread
next if @@client_threads[client_id]
@@client_threads[client_id] = true
@@client_threads[client_id] = Thread.new {
thread_client(client_id)
@@client_threads[client_id] = nil
log('debug', "close client (#{client_id}) thread")
ActiveRecord::Base.connection.close
}
sleep 0.5
2012-07-23 22:22:23 +00:00
}
2012-08-07 06:43:15 +00:00
# system settings
2012-11-26 23:22:52 +00:00
sleep 0.5
2012-07-23 22:22:23 +00:00
end
end
2014-06-29 19:30:55 +00:00
=begin
check if thread for client_id is running
Sessions.thread_client_exists?(client_id)
returns
thread
=end
def self.thread_client_exists?(client_id)
@@client_threads[client_id]
2013-12-01 11:55:21 +00:00
end
2014-06-29 19:30:55 +00:00
=begin
start client for browser
Sessions.thread_client(client_id)
returns
thread
=end
2015-05-10 19:47:17 +00:00
def self.thread_client(client_id, try_count = 0, try_run_time = Time.now.utc)
log('debug', "LOOP #{client_id} - #{try_count}")
2014-06-29 19:30:55 +00:00
begin
Sessions::Client.new(client_id)
rescue => e
log('error', "thread_client #{client_id} exited with error #{e.inspect}")
log('error', e.backtrace.join("\n ") )
2014-06-29 19:30:55 +00:00
sleep 10
2013-12-01 11:55:21 +00:00
begin
2014-06-29 19:30:55 +00:00
ActiveRecord::Base.connection_pool.release_connection
2013-12-01 11:55:21 +00:00
rescue => e
log('error', "Can't reconnect to database #{e.inspect}")
2014-06-29 19:30:55 +00:00
end
2014-04-24 05:49:30 +00:00
2014-06-29 19:30:55 +00:00
try_run_max = 10
try_count += 1
2014-04-24 05:49:30 +00:00
2014-06-29 19:30:55 +00:00
# reset error counter if to old
2015-05-10 19:47:17 +00:00
if try_run_time + ( 60 * 5 ) < Time.now.utc
2014-06-29 19:30:55 +00:00
try_count = 0
end
2015-05-10 19:47:17 +00:00
try_run_time = Time.now.utc
2014-04-24 05:49:30 +00:00
2014-06-29 19:30:55 +00:00
# restart job again
if try_run_max > try_count
thread_client(client_id, try_count, try_run_time)
else
raise "STOP thread_client for client #{client_id} after #{try_run_max} tries"
2013-12-01 11:55:21 +00:00
end
2014-06-29 19:30:55 +00:00
end
log('debug', "/LOOP #{client_id} - #{try_count}")
2013-12-01 11:55:21 +00:00
end
def self.symbolize_keys(hash)
hash.each_with_object({}) {|(key, value), result|
new_key = case key
when String then key.to_sym
else key
end
new_value = case value
when Hash then symbolize_keys(value)
else value
end
result[new_key] = new_value
}
end
# we use it in rails and non rails context
def self.log(level, message)
if defined?(Rails)
if level == 'debug'
Rails.logger.debug message
elsif level == 'notice'
Rails.logger.notice message
else
Rails.logger.error message
end
return
end
puts "#{Time.now.utc.iso8601}:#{level} #{message}" # rubocop:disable Rails/Output
end
end