trabajo-afectivo/lib/sessions.rb

354 lines
8.9 KiB
Ruby
Raw Normal View History

2012-07-23 22:22:23 +00:00
require 'json'
2013-04-18 12:51:07 +00:00
require 'rss'
2013-05-05 22:54:06 +00:00
require 'session_helper'
2012-07-23 22:22:23 +00:00
module Sessions
2013-01-24 01:01:47 +00:00
# get application root directory
@root = Dir.pwd.to_s
2013-02-01 00:01:20 +00:00
if !@root || @root.empty? || @root == '/'
2013-01-24 01:01:47 +00:00
@root = Rails.root
end
# get working directories
@path = @root + '/tmp/websocket'
@pid = @root + '/tmp/pids/sessionworker.pid'
# create global vars for threads
2012-08-03 22:46:05 +00:00
@@user_threads = {}
@@client_threads = {}
2012-07-23 22:22:23 +00:00
2012-11-26 05:04:44 +00:00
def self.create( client_id, session, meta )
2012-07-23 22:22:23 +00:00
path = @path + '/' + client_id.to_s
FileUtils.mkpath path
2012-11-26 05:04:44 +00:00
meta[:last_ping] = Time.new.to_i.to_s
2013-04-08 18:56:59 +00:00
File.open( path + '/session', 'wb' ) { |file|
2012-11-26 05:04:44 +00:00
data = {
:user => {
:id => session['id'],
},
:meta => meta,
}
# puts 'CREATE' + Marshal.dump(data)
2013-04-08 18:56:59 +00:00
file.write Marshal.dump(data)
2012-07-23 22:22:23 +00:00
}
2012-11-02 16:10:22 +00:00
# send update to browser
if session['id']
2012-11-26 05:04:44 +00:00
self.send( client_id, {
2012-11-02 16:10:22 +00:00
:event => 'ws:login',
:data => { :success => true },
})
end
2012-07-23 22:22:23 +00:00
end
def self.spool_create( msg )
path = @path + '/spool/'
FileUtils.mkpath path
file = Time.new.to_f.to_s + '-' + rand(99999).to_s
File.open( path + '/' + file , 'wb' ) { |file|
data = {
:msg => msg,
:timestamp => Time.now.to_i,
}
# puts 'CREATE' + Marshal.dump(data)
file.write data.to_json
}
end
def self.spool_list( timestamp, current_user_id )
path = @path + '/spool/'
FileUtils.mkpath path
data = []
to_delete = []
files = []
Dir.foreach( path ) {|entry|
next if entry == '.' || entry == '..'
files.push entry
}
files.sort.each {|entry|
filename = path + '/' + entry
next if !File::exists?( filename )
File.open( filename, 'rb' ) { |file|
all = file.read
spool = JSON.parse( all )
begin
message_parsed = JSON.parse( spool['msg'] )
rescue => e
log 'error', "can't parse spool message: #{ message }, #{ e.inspect }"
next
end
# ignore message older then 48h
if spool['timestamp'] + (2 * 86400) < Time.now.to_i
to_delete.push path + '/' + entry
next
end
# add spool attribute to push spool info to clients
2013-06-28 22:26:04 +00:00
message_parsed['spool'] = true
# only send not already now messages
if !timestamp || timestamp < spool['timestamp']
# spool to recipient list
2013-06-28 22:26:04 +00:00
if message_parsed['recipient'] && message_parsed['recipient']['user_id']
message_parsed['recipient']['user_id'].each { |user_id|
if current_user_id == user_id
item = {
:type => 'direct',
:message => message_parsed,
}
data.push item
end
}
# spool to every client
else
item = {
:type => 'broadcast',
:message => message_parsed,
}
data.push item
end
end
}
}
to_delete.each {|file|
File.delete(file)
}
return data
end
2012-11-26 05:04:44 +00:00
def self.list
client_ids = self.sessions
session_list = {}
client_ids.each { |client_id|
data = self.get(client_id)
next if !data
session_list[client_id] = data
}
return session_list
end
2012-11-26 23:22:52 +00:00
def self.touch( client_id )
data = self.get(client_id)
2013-07-26 22:43:07 +00:00
return if !data
2012-11-26 23:22:52 +00:00
path = @path + '/' + client_id.to_s
data[:meta][:last_ping] = Time.new.to_i.to_s
2013-04-08 18:56:59 +00:00
File.open( path + '/session', 'wb' ) { |file|
file.write Marshal.dump(data)
2012-11-26 23:22:52 +00:00
}
return true
end
2012-07-23 22:22:23 +00:00
def self.get( client_id )
session_file = @path + '/' + client_id.to_s + '/session'
data = nil
return if !File.exist? session_file
2013-04-17 23:45:23 +00:00
begin
File.open( session_file, 'rb' ) { |file|
file.flock( File::LOCK_EX )
all = file.read
file.flock( File::LOCK_UN )
2012-07-23 22:22:23 +00:00
data = Marshal.load( all )
2013-04-17 23:45:23 +00:00
}
rescue Exception => e
2013-04-18 12:51:07 +00:00
File.delete(session_file)
2013-04-17 23:45:23 +00:00
puts "Error reading '#{session_file}':"
puts e.inspect
return
end
2012-07-23 22:22:23 +00:00
return data
end
2012-11-26 05:04:44 +00:00
def self.send( client_id, data )
2012-08-03 23:35:21 +00:00
path = @path + '/' + client_id.to_s + '/'
filename = 'send-' + Time.new().to_f.to_s# + '-' + rand(99999999).to_s
2013-02-19 19:04:35 +00:00
check = true
count = 0
2013-02-19 19:04:35 +00:00
while check
2012-08-03 23:35:21 +00:00
if File::exists?( path + filename )
count += 1
filename = filename + '-' + count
# filename = filename + '-' + rand(99999).to_s
# filename = filename + '-' + rand(99999).to_s
2013-02-19 19:04:35 +00:00
else
check = false
end
2012-07-23 22:22:23 +00:00
end
2012-08-03 23:35:21 +00:00
return false if !File.directory? path
2013-04-08 18:56:59 +00:00
File.open( path + 'a-' + filename, 'wb' ) { |file|
2013-01-23 07:22:27 +00:00
file.flock( File::LOCK_EX )
2013-04-08 18:56:59 +00:00
file.write data.to_json
2013-01-23 07:22:27 +00:00
file.flock( File::LOCK_UN )
file.close
2012-07-23 22:22:23 +00:00
}
2013-02-28 21:38:03 +00:00
return false if !File.exists?( path + 'a-' + filename )
2013-02-19 19:04:35 +00:00
FileUtils.mv( path + 'a-' + filename, path + filename )
2012-07-23 22:22:23 +00:00
return true
end
def self.jobs
# just make sure that spool path exists
2013-01-15 23:10:27 +00:00
if !File::exists?( @path )
FileUtils.mkpath @path
end
2012-08-03 22:46:05 +00:00
Thread.abort_on_exception = true
2012-07-23 22:22:23 +00:00
while true
client_ids = self.sessions
client_ids.each { |client_id|
2012-08-07 21:53:43 +00:00
# connection already open
next if @@client_threads[client_id]
# get current user
session_data = Sessions.get( client_id )
2012-11-26 05:04:44 +00:00
next if !session_data
next if !session_data[:user]
next if !session_data[:user][:id]
user = User.find( session_data[:user][:id] )
2012-08-03 22:46:05 +00:00
next if !user
# start user thread
start_user_thread = false
2012-08-03 22:46:05 +00:00
if !@@user_threads[user.id]
2013-12-01 11:55:21 +00:00
@@user_threads[user.id] = true
2012-08-03 22:46:05 +00:00
@@user_threads[user.id] = Thread.new {
2013-12-01 11:55:21 +00:00
thread_worker(user.id, 0)
2012-08-03 22:46:05 +00:00
@@user_threads[user.id] = nil
2013-12-01 11:55:21 +00:00
puts "close user (#{user.id}) thread"
2012-08-03 22:46:05 +00:00
}
2013-12-01 11:55:21 +00:00
start_user_thread = true
2012-07-23 22:22:23 +00:00
end
# wait with client thread unil user thread has done some little work
if start_user_thread
2012-08-07 21:53:43 +00:00
sleep 0.5
end
2012-08-03 22:46:05 +00:00
# start client thread
if !@@client_threads[client_id]
2013-12-01 11:55:21 +00:00
@@client_threads[client_id] = true
2012-08-03 22:46:05 +00:00
@@client_threads[client_id] = Thread.new {
2013-12-01 11:55:21 +00:00
thread_client(client_id, 0)
2012-08-03 22:46:05 +00:00
@@client_threads[client_id] = nil
2013-12-01 11:55:21 +00:00
puts "close client (#{client_id}) thread"
2012-08-03 22:46:05 +00:00
}
end
2012-07-23 22:22:23 +00:00
}
2012-08-07 06:43:15 +00:00
# system settings
2012-11-26 23:22:52 +00:00
sleep 0.5
2012-07-23 22:22:23 +00:00
end
end
2013-12-01 11:55:21 +00:00
def self.thread_worker(user_id, count)
puts "LOOP WORKER #{user_id} - #{count}"
begin
Sessions::Worker.new(user_id)
rescue => e
puts "thread_client exited with error #{ e.inspect }"
2014-03-27 13:07:57 +00:00
sleep 10
2013-12-01 11:55:21 +00:00
begin
ActiveRecord::Base.connection.reconnect!
rescue => e
puts "Can't reconnect to database #{ e.inspect }"
end
ct = count++1
if ct < 10
thread_worker(user_id, ct)
else
raise "STOP thread_worker for user #{user_id} after 10 tries"
end
end
puts "/LOOP WORKER #{user_id} - #{count}"
end
def self.thread_client(client_id, count)
puts "LOOP #{client_id} - #{count}"
begin
Sessions::Client.new(client_id)
rescue => e
puts "thread_client exited with error #{ e.inspect }"
2014-03-27 13:07:57 +00:00
sleep 10
2013-12-01 11:55:21 +00:00
begin
ActiveRecord::Base.connection.reconnect!
rescue => e
puts "Can't reconnect to database #{ e.inspect }"
end
ct = count++1
if ct < 10
thread_client(client_id, ct)
else
raise "STOP thread_client for client #{client_id} after 10 tries"
end
end
puts "/LOOP #{client_id} - #{count}"
end
2012-08-03 22:46:05 +00:00
def self.sessions
path = @path + '/'
2013-01-15 23:10:27 +00:00
# just make sure that spool path exists
if !File::exists?( path )
FileUtils.mkpath path
end
2012-08-03 22:46:05 +00:00
data = []
Dir.foreach( path ) do |entry|
next if entry == '.' || entry == '..' || entry == 'spool'
data.push entry.to_s
2012-07-30 14:50:54 +00:00
end
2012-08-03 22:46:05 +00:00
return data
2012-07-30 14:50:54 +00:00
end
2012-08-03 22:46:05 +00:00
def self.queue( client_id )
path = @path + '/' + client_id.to_s + '/'
data = []
2013-06-17 11:00:26 +00:00
files = []
Dir.foreach( path ) {|entry|
next if entry == '.' || entry == '..'
files.push entry
}
files.sort.each {|entry|
filename = path + '/' + entry
2012-11-26 05:04:44 +00:00
if /^send/.match( entry )
data.push Sessions.queue_file( path, entry )
2012-08-03 22:46:05 +00:00
end
2013-06-17 11:00:26 +00:00
}
2012-08-03 22:46:05 +00:00
return data
end
2012-07-30 14:50:54 +00:00
2012-08-03 23:35:21 +00:00
def self.queue_file( path, filename )
file_old = path + filename
file_new = path + 'a-' + filename
FileUtils.mv( file_old, file_new )
2012-08-03 22:46:05 +00:00
data = nil
2012-08-03 23:35:21 +00:00
all = ''
2013-04-08 18:56:59 +00:00
File.open( file_new, 'rb' ) { |file|
all = file.read
2012-08-03 22:46:05 +00:00
}
2012-08-03 23:35:21 +00:00
File.delete( file_new )
data = JSON.parse( all )
2012-08-03 22:46:05 +00:00
return data
end
2012-07-30 14:50:54 +00:00
def self.broadcast( data )
# list all current clients
client_list = self.list
client_list.each {|local_client_id, local_client|
Sessions.send( local_client_id, data )
}
return true
end
2012-08-03 22:46:05 +00:00
def self.destory( client_id )
path = @path + '/' + client_id.to_s
FileUtils.rm_rf path
2012-07-30 14:50:54 +00:00
end
end