diff --git a/app/assets/javascripts/app/lib/app_post/websocket.js.coffee b/app/assets/javascripts/app/lib/app_post/websocket.js.coffee index 7f9988bf3..22cfb7cee 100644 --- a/app/assets/javascripts/app/lib/app_post/websocket.js.coffee +++ b/app/assets/javascripts/app/lib/app_post/websocket.js.coffee @@ -254,7 +254,6 @@ class _Singleton extends App.Controller # stop init request if new one is started if @_ajaxInitWorking @_ajaxInitWorking.abort() - # call init request @_ajaxInitWorking = App.Com.ajax( type: 'POST' diff --git a/app/controllers/long_polling_controller.rb b/app/controllers/long_polling_controller.rb index 86acbc17d..1a08657ad 100644 --- a/app/controllers/long_polling_controller.rb +++ b/app/controllers/long_polling_controller.rb @@ -9,7 +9,7 @@ class LongPollingController < ApplicationController if !client_id new_connection = true client_id = client_id_gen - puts 'NEW CLIENT CONNECTION: ' + client_id.to_s + log 'notice', "new client connection", client_id else # cerify client id if !client_id_verify @@ -21,6 +21,33 @@ class LongPollingController < ApplicationController params['data'] = {} end + # spool messages for new connects + if params['data']['spool'] + msg = JSON.generate( params ) + Session.spool_create(msg) + end + + # get spool messages and send them to new client connection + if params['data']['action'] == 'spool' + log 'notice', "request spool data", client_id + + spool = Session.spool_list( params['data']['timestamp'], current_user.id ) + spool.each { |item| + if item[:type] == 'direct' + log 'notice', "send spool to (user_id=#{ current_user.id })", client_id + Session.send( client_id, item[:message] ) + else + log 'notice', "send spool", client_id + Session.send( client_id, item[:message] ) + end + } + + # send spool:sent event to client + log 'notice', "send spool:sent event", client_id + Session.send( client_id, { :event => 'spool:sent' } ) + end + + # receive message if params['data']['action'] == 'login' user_id = session[:user_id] @@ -28,6 +55,7 @@ class LongPollingController < ApplicationController if user_id user = User.user_data_full( user_id ) end + log 'notice', "send auth login (user_id #{user_id})", client_id Session.create( client_id, user, { :type => 'ajax' } ) # broadcast @@ -42,11 +70,13 @@ class LongPollingController < ApplicationController if params['data']['recipient'] && params['data']['recipient']['user_id'] params['data']['recipient']['user_id'].each { |user_id| if local_client[:user][:id] == user_id + log 'notice', "send broadcast (user_id #{user_id})", local_client_id Session.send( local_client_id, params['data'] ) end } # broadcast every client else + log 'notice', "send broadcast", local_client_id Session.send( local_client_id, params['data'] ) end end @@ -80,7 +110,7 @@ class LongPollingController < ApplicationController count = count - 1 queue = Session.queue( client_id ) if queue && queue[0] - # puts "send " + queue.inspect + client_id.to_s +# puts "send " + queue.inspect + client_id.to_s render :json => queue return end @@ -90,7 +120,9 @@ class LongPollingController < ApplicationController return end end - rescue + rescue Exception => e + puts e.inspect + puts e.backtrace render :json => { :error => 'Invalid client_id in receive loop!' }, :status => :unprocessable_entity return end @@ -112,4 +144,12 @@ class LongPollingController < ApplicationController # Session.touch( params[:client_id] ) return true end -end \ No newline at end of file + + def log( level, data, client_id = '-' ) + if false + return if level == 'debug' + end + puts "#{Time.now}:client(#{ client_id }) #{ data }" +# puts "#{Time.now}:#{ level }:client(#{ client_id }) #{ data }" + end +end diff --git a/lib/session.rb b/lib/session.rb index d7f8e7697..03425a7e0 100644 --- a/lib/session.rb +++ b/lib/session.rb @@ -42,6 +42,80 @@ module Session end end + def self.spool_create( msg ) + path = @path + '/spool/' + FileUtils.mkpath path + file = Time.new.to_f.to_s + '-' + rand(99999).to_s + File.open( path + '/' + file , 'wb' ) { |file| + data = { + :msg => msg, + :timestamp => Time.now.to_i, + } +# puts 'CREATE' + Marshal.dump(data) + file.write data.to_json + } + end + + def self.spool_list( timestamp, current_user_id ) + path = @path + '/spool/' + FileUtils.mkpath path + data = [] + to_delete = [] + Dir.foreach( path ) do |entry| + next if entry == '.' || entry == '..' + File.open( path + '/' + entry, 'rb' ) { |file| + all = file.read + spool = JSON.parse( all ) + begin + message_parsed = JSON.parse( spool['msg'] ) + rescue => e + log 'error', "can't parse spool message: #{ message }, #{ e.inspect }" + next + end + + # ignore message older then 48h + if spool['timestamp'] + (2 * 86400) < Time.now.to_i + to_delete.push path + '/' + entry + next + end + + # add spool attribute to push spool info to clients + message_parsed['data']['spool'] = true + + # only send not already now messages + if !timestamp || timestamp < spool['timestamp'] + + # spool to recipient list + if message_parsed['data']['recipient'] && message_parsed['data']['recipient']['user_id'] + message_parsed['data']['recipient']['user_id'].each { |user_id| + if current_user_id == user_id + item = { + :type => 'direct', + :message => message_parsed, + :spool => spool, + } + data.push item + end + } + + # spool to every client + else + item = { + :type => 'broadcast', + :message => message_parsed, + :spool => spool, + } + data.push item + end + end + } + end + to_delete.each {|file| + File.delete(file) + } + return data + end + def self.list client_ids = self.sessions session_list = {} @@ -85,14 +159,18 @@ module Session def self.send( client_id, data ) path = @path + '/' + client_id.to_s + '/' - filename = 'send-' + Time.new().to_i.to_s + '-' + rand(99999999).to_s + filename = 'send-' + Time.new().to_f.to_s# + '-' + rand(99999999).to_s check = true + count = 0 while check if File::exists?( path + filename ) - filename = filename + '-' + rand(99999).to_s + count += 1 + filename = filename + '-' + count +# filename = filename + '-' + rand(99999).to_s +# filename = filename + '-' + rand(99999).to_s else check = false - end + end end return false if !File.directory? path File.open( path + 'a-' + filename, 'wb' ) { |file| @@ -121,7 +199,7 @@ module Session # connection already open next if @@client_threads[client_id] - # get current user + # get current user session_data = Session.get( client_id ) next if !session_data next if !session_data[:user] @@ -172,13 +250,12 @@ module Session data = [] Dir.foreach( path ) do |entry| - if entry != '.' && entry != '..' - data.push entry - end + next if entry == '.' || entry == '..' || entry == 'spool' + data.push entry end return data end - + def self.queue( client_id ) path = @path + '/' + client_id.to_s + '/' data = [] @@ -197,9 +274,7 @@ module Session data = nil all = '' File.open( file_new, 'rb' ) { |file| - while line = file.gets - all = all + line - end + all = file.read } File.delete( file_new ) data = JSON.parse( all ) @@ -757,4 +832,4 @@ class ClientState return if level == 'notice' puts "#{Time.now}:client(#{ @client_id }) #{ data }" end -end \ No newline at end of file +end diff --git a/script/websocket-server.rb b/script/websocket-server.rb index 91a9614d7..35990145f 100755 --- a/script/websocket-server.rb +++ b/script/websocket-server.rb @@ -81,7 +81,6 @@ if ARGV[0] == 'start' && @options[:d] end @clients = {} -@spool = [] EventMachine.run { EventMachine::WebSocket.start( :host => @options[:b], :port => @options[:p], :secure => @options[:s], :tls_options => tls_options ) do |ws| @@ -129,50 +128,30 @@ EventMachine.run { # spool messages for new connects if data['spool'] - meta = { - :msg => msg, - :msg_object => data, - :timestamp => Time.now.to_i, - } - @spool.push meta + Session.spool_create(msg) end - # get spool messages + # get spool messages and send them to new client connection if data['action'] == 'spool' - @spool.each { |message| + log 'notice', "request spool data", client_id - begin - message_parsed = JSON.parse( message[:msg] ) - rescue => e - log 'error', "can't parse spool message: #{ message }, #{ e.inspect }" - next - end - # add spool attribute to push spool info to clients - message_parsed['data']['spool'] = true - msg = JSON.generate( message_parsed ) + spool = Session.spool_list( data['timestamp'], @clients[client_id][:session]['id'] ) + spool.each { |item| - # only send not already now messages - if !data['timestamp'] || data['timestamp'] < message[:timestamp] - - # spool to recipient list - if message_parsed['data']['recipient'] && message_parsed['data']['recipient']['user_id'] - message_parsed['data']['recipient']['user_id'].each { |user_id| - if @clients[client_id][:session]['id'] == user_id - log 'notice', "send spool to (user_id=#{user_id})", client_id - @clients[client_id][:websocket].send( "[#{ msg }]" ) - end - } - - # spool to every client - else - log 'notice', "send spool", client_id - @clients[client_id][:websocket].send( "[#{ msg }]" ) - end + # create new msg to push to client + msg = JSON.generate( item[:message] ) + if item[:type] == 'direct' + log 'notice', "send spool to (user_id=#{ @clients[client_id][:session]['id'] })", client_id + @clients[client_id][:websocket].send( "[#{ msg }]" ) + else + log 'notice', "send spool", client_id + @clients[client_id][:websocket].send( "[#{ msg }]" ) end } # send spool:sent event to client + log 'notice', "send spool:sent event", client_id @clients[client_id][:websocket].send( '[{"event":"spool:sent"}]' ) end