Added web socket spool feature for ajax long polling.
This commit is contained in:
parent
facd2857c4
commit
8699b602c4
4 changed files with 145 additions and 52 deletions
|
@ -254,7 +254,6 @@ class _Singleton extends App.Controller
|
||||||
# stop init request if new one is started
|
# stop init request if new one is started
|
||||||
if @_ajaxInitWorking
|
if @_ajaxInitWorking
|
||||||
@_ajaxInitWorking.abort()
|
@_ajaxInitWorking.abort()
|
||||||
|
|
||||||
# call init request
|
# call init request
|
||||||
@_ajaxInitWorking = App.Com.ajax(
|
@_ajaxInitWorking = App.Com.ajax(
|
||||||
type: 'POST'
|
type: 'POST'
|
||||||
|
|
|
@ -9,7 +9,7 @@ class LongPollingController < ApplicationController
|
||||||
if !client_id
|
if !client_id
|
||||||
new_connection = true
|
new_connection = true
|
||||||
client_id = client_id_gen
|
client_id = client_id_gen
|
||||||
puts 'NEW CLIENT CONNECTION: ' + client_id.to_s
|
log 'notice', "new client connection", client_id
|
||||||
else
|
else
|
||||||
# cerify client id
|
# cerify client id
|
||||||
if !client_id_verify
|
if !client_id_verify
|
||||||
|
@ -21,6 +21,33 @@ class LongPollingController < ApplicationController
|
||||||
params['data'] = {}
|
params['data'] = {}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# spool messages for new connects
|
||||||
|
if params['data']['spool']
|
||||||
|
msg = JSON.generate( params )
|
||||||
|
Session.spool_create(msg)
|
||||||
|
end
|
||||||
|
|
||||||
|
# get spool messages and send them to new client connection
|
||||||
|
if params['data']['action'] == 'spool'
|
||||||
|
log 'notice', "request spool data", client_id
|
||||||
|
|
||||||
|
spool = Session.spool_list( params['data']['timestamp'], current_user.id )
|
||||||
|
spool.each { |item|
|
||||||
|
if item[:type] == 'direct'
|
||||||
|
log 'notice', "send spool to (user_id=#{ current_user.id })", client_id
|
||||||
|
Session.send( client_id, item[:message] )
|
||||||
|
else
|
||||||
|
log 'notice', "send spool", client_id
|
||||||
|
Session.send( client_id, item[:message] )
|
||||||
|
end
|
||||||
|
}
|
||||||
|
|
||||||
|
# send spool:sent event to client
|
||||||
|
log 'notice', "send spool:sent event", client_id
|
||||||
|
Session.send( client_id, { :event => 'spool:sent' } )
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
# receive message
|
# receive message
|
||||||
if params['data']['action'] == 'login'
|
if params['data']['action'] == 'login'
|
||||||
user_id = session[:user_id]
|
user_id = session[:user_id]
|
||||||
|
@ -28,6 +55,7 @@ class LongPollingController < ApplicationController
|
||||||
if user_id
|
if user_id
|
||||||
user = User.user_data_full( user_id )
|
user = User.user_data_full( user_id )
|
||||||
end
|
end
|
||||||
|
log 'notice', "send auth login (user_id #{user_id})", client_id
|
||||||
Session.create( client_id, user, { :type => 'ajax' } )
|
Session.create( client_id, user, { :type => 'ajax' } )
|
||||||
|
|
||||||
# broadcast
|
# broadcast
|
||||||
|
@ -42,11 +70,13 @@ class LongPollingController < ApplicationController
|
||||||
if params['data']['recipient'] && params['data']['recipient']['user_id']
|
if params['data']['recipient'] && params['data']['recipient']['user_id']
|
||||||
params['data']['recipient']['user_id'].each { |user_id|
|
params['data']['recipient']['user_id'].each { |user_id|
|
||||||
if local_client[:user][:id] == user_id
|
if local_client[:user][:id] == user_id
|
||||||
|
log 'notice', "send broadcast (user_id #{user_id})", local_client_id
|
||||||
Session.send( local_client_id, params['data'] )
|
Session.send( local_client_id, params['data'] )
|
||||||
end
|
end
|
||||||
}
|
}
|
||||||
# broadcast every client
|
# broadcast every client
|
||||||
else
|
else
|
||||||
|
log 'notice', "send broadcast", local_client_id
|
||||||
Session.send( local_client_id, params['data'] )
|
Session.send( local_client_id, params['data'] )
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -80,7 +110,7 @@ class LongPollingController < ApplicationController
|
||||||
count = count - 1
|
count = count - 1
|
||||||
queue = Session.queue( client_id )
|
queue = Session.queue( client_id )
|
||||||
if queue && queue[0]
|
if queue && queue[0]
|
||||||
# puts "send " + queue.inspect + client_id.to_s
|
# puts "send " + queue.inspect + client_id.to_s
|
||||||
render :json => queue
|
render :json => queue
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
@ -90,7 +120,9 @@ class LongPollingController < ApplicationController
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
rescue
|
rescue Exception => e
|
||||||
|
puts e.inspect
|
||||||
|
puts e.backtrace
|
||||||
render :json => { :error => 'Invalid client_id in receive loop!' }, :status => :unprocessable_entity
|
render :json => { :error => 'Invalid client_id in receive loop!' }, :status => :unprocessable_entity
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
@ -112,4 +144,12 @@ class LongPollingController < ApplicationController
|
||||||
# Session.touch( params[:client_id] )
|
# Session.touch( params[:client_id] )
|
||||||
return true
|
return true
|
||||||
end
|
end
|
||||||
end
|
|
||||||
|
def log( level, data, client_id = '-' )
|
||||||
|
if false
|
||||||
|
return if level == 'debug'
|
||||||
|
end
|
||||||
|
puts "#{Time.now}:client(#{ client_id }) #{ data }"
|
||||||
|
# puts "#{Time.now}:#{ level }:client(#{ client_id }) #{ data }"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
|
@ -42,6 +42,80 @@ module Session
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def self.spool_create( msg )
|
||||||
|
path = @path + '/spool/'
|
||||||
|
FileUtils.mkpath path
|
||||||
|
file = Time.new.to_f.to_s + '-' + rand(99999).to_s
|
||||||
|
File.open( path + '/' + file , 'wb' ) { |file|
|
||||||
|
data = {
|
||||||
|
:msg => msg,
|
||||||
|
:timestamp => Time.now.to_i,
|
||||||
|
}
|
||||||
|
# puts 'CREATE' + Marshal.dump(data)
|
||||||
|
file.write data.to_json
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
def self.spool_list( timestamp, current_user_id )
|
||||||
|
path = @path + '/spool/'
|
||||||
|
FileUtils.mkpath path
|
||||||
|
data = []
|
||||||
|
to_delete = []
|
||||||
|
Dir.foreach( path ) do |entry|
|
||||||
|
next if entry == '.' || entry == '..'
|
||||||
|
File.open( path + '/' + entry, 'rb' ) { |file|
|
||||||
|
all = file.read
|
||||||
|
spool = JSON.parse( all )
|
||||||
|
begin
|
||||||
|
message_parsed = JSON.parse( spool['msg'] )
|
||||||
|
rescue => e
|
||||||
|
log 'error', "can't parse spool message: #{ message }, #{ e.inspect }"
|
||||||
|
next
|
||||||
|
end
|
||||||
|
|
||||||
|
# ignore message older then 48h
|
||||||
|
if spool['timestamp'] + (2 * 86400) < Time.now.to_i
|
||||||
|
to_delete.push path + '/' + entry
|
||||||
|
next
|
||||||
|
end
|
||||||
|
|
||||||
|
# add spool attribute to push spool info to clients
|
||||||
|
message_parsed['data']['spool'] = true
|
||||||
|
|
||||||
|
# only send not already now messages
|
||||||
|
if !timestamp || timestamp < spool['timestamp']
|
||||||
|
|
||||||
|
# spool to recipient list
|
||||||
|
if message_parsed['data']['recipient'] && message_parsed['data']['recipient']['user_id']
|
||||||
|
message_parsed['data']['recipient']['user_id'].each { |user_id|
|
||||||
|
if current_user_id == user_id
|
||||||
|
item = {
|
||||||
|
:type => 'direct',
|
||||||
|
:message => message_parsed,
|
||||||
|
:spool => spool,
|
||||||
|
}
|
||||||
|
data.push item
|
||||||
|
end
|
||||||
|
}
|
||||||
|
|
||||||
|
# spool to every client
|
||||||
|
else
|
||||||
|
item = {
|
||||||
|
:type => 'broadcast',
|
||||||
|
:message => message_parsed,
|
||||||
|
:spool => spool,
|
||||||
|
}
|
||||||
|
data.push item
|
||||||
|
end
|
||||||
|
end
|
||||||
|
}
|
||||||
|
end
|
||||||
|
to_delete.each {|file|
|
||||||
|
File.delete(file)
|
||||||
|
}
|
||||||
|
return data
|
||||||
|
end
|
||||||
|
|
||||||
def self.list
|
def self.list
|
||||||
client_ids = self.sessions
|
client_ids = self.sessions
|
||||||
session_list = {}
|
session_list = {}
|
||||||
|
@ -85,14 +159,18 @@ module Session
|
||||||
|
|
||||||
def self.send( client_id, data )
|
def self.send( client_id, data )
|
||||||
path = @path + '/' + client_id.to_s + '/'
|
path = @path + '/' + client_id.to_s + '/'
|
||||||
filename = 'send-' + Time.new().to_i.to_s + '-' + rand(99999999).to_s
|
filename = 'send-' + Time.new().to_f.to_s# + '-' + rand(99999999).to_s
|
||||||
check = true
|
check = true
|
||||||
|
count = 0
|
||||||
while check
|
while check
|
||||||
if File::exists?( path + filename )
|
if File::exists?( path + filename )
|
||||||
filename = filename + '-' + rand(99999).to_s
|
count += 1
|
||||||
|
filename = filename + '-' + count
|
||||||
|
# filename = filename + '-' + rand(99999).to_s
|
||||||
|
# filename = filename + '-' + rand(99999).to_s
|
||||||
else
|
else
|
||||||
check = false
|
check = false
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
return false if !File.directory? path
|
return false if !File.directory? path
|
||||||
File.open( path + 'a-' + filename, 'wb' ) { |file|
|
File.open( path + 'a-' + filename, 'wb' ) { |file|
|
||||||
|
@ -121,7 +199,7 @@ module Session
|
||||||
# connection already open
|
# connection already open
|
||||||
next if @@client_threads[client_id]
|
next if @@client_threads[client_id]
|
||||||
|
|
||||||
# get current user
|
# get current user
|
||||||
session_data = Session.get( client_id )
|
session_data = Session.get( client_id )
|
||||||
next if !session_data
|
next if !session_data
|
||||||
next if !session_data[:user]
|
next if !session_data[:user]
|
||||||
|
@ -172,13 +250,12 @@ module Session
|
||||||
|
|
||||||
data = []
|
data = []
|
||||||
Dir.foreach( path ) do |entry|
|
Dir.foreach( path ) do |entry|
|
||||||
if entry != '.' && entry != '..'
|
next if entry == '.' || entry == '..' || entry == 'spool'
|
||||||
data.push entry
|
data.push entry
|
||||||
end
|
|
||||||
end
|
end
|
||||||
return data
|
return data
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.queue( client_id )
|
def self.queue( client_id )
|
||||||
path = @path + '/' + client_id.to_s + '/'
|
path = @path + '/' + client_id.to_s + '/'
|
||||||
data = []
|
data = []
|
||||||
|
@ -197,9 +274,7 @@ module Session
|
||||||
data = nil
|
data = nil
|
||||||
all = ''
|
all = ''
|
||||||
File.open( file_new, 'rb' ) { |file|
|
File.open( file_new, 'rb' ) { |file|
|
||||||
while line = file.gets
|
all = file.read
|
||||||
all = all + line
|
|
||||||
end
|
|
||||||
}
|
}
|
||||||
File.delete( file_new )
|
File.delete( file_new )
|
||||||
data = JSON.parse( all )
|
data = JSON.parse( all )
|
||||||
|
@ -757,4 +832,4 @@ class ClientState
|
||||||
return if level == 'notice'
|
return if level == 'notice'
|
||||||
puts "#{Time.now}:client(#{ @client_id }) #{ data }"
|
puts "#{Time.now}:client(#{ @client_id }) #{ data }"
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -81,7 +81,6 @@ if ARGV[0] == 'start' && @options[:d]
|
||||||
end
|
end
|
||||||
|
|
||||||
@clients = {}
|
@clients = {}
|
||||||
@spool = []
|
|
||||||
EventMachine.run {
|
EventMachine.run {
|
||||||
EventMachine::WebSocket.start( :host => @options[:b], :port => @options[:p], :secure => @options[:s], :tls_options => tls_options ) do |ws|
|
EventMachine::WebSocket.start( :host => @options[:b], :port => @options[:p], :secure => @options[:s], :tls_options => tls_options ) do |ws|
|
||||||
|
|
||||||
|
@ -129,50 +128,30 @@ EventMachine.run {
|
||||||
|
|
||||||
# spool messages for new connects
|
# spool messages for new connects
|
||||||
if data['spool']
|
if data['spool']
|
||||||
meta = {
|
Session.spool_create(msg)
|
||||||
:msg => msg,
|
|
||||||
:msg_object => data,
|
|
||||||
:timestamp => Time.now.to_i,
|
|
||||||
}
|
|
||||||
@spool.push meta
|
|
||||||
end
|
end
|
||||||
|
|
||||||
# get spool messages
|
# get spool messages and send them to new client connection
|
||||||
if data['action'] == 'spool'
|
if data['action'] == 'spool'
|
||||||
@spool.each { |message|
|
log 'notice', "request spool data", client_id
|
||||||
|
|
||||||
begin
|
spool = Session.spool_list( data['timestamp'], @clients[client_id][:session]['id'] )
|
||||||
message_parsed = JSON.parse( message[:msg] )
|
spool.each { |item|
|
||||||
rescue => e
|
|
||||||
log 'error', "can't parse spool message: #{ message }, #{ e.inspect }"
|
|
||||||
next
|
|
||||||
end
|
|
||||||
# add spool attribute to push spool info to clients
|
|
||||||
message_parsed['data']['spool'] = true
|
|
||||||
msg = JSON.generate( message_parsed )
|
|
||||||
|
|
||||||
# only send not already now messages
|
# create new msg to push to client
|
||||||
if !data['timestamp'] || data['timestamp'] < message[:timestamp]
|
msg = JSON.generate( item[:message] )
|
||||||
|
|
||||||
# spool to recipient list
|
|
||||||
if message_parsed['data']['recipient'] && message_parsed['data']['recipient']['user_id']
|
|
||||||
message_parsed['data']['recipient']['user_id'].each { |user_id|
|
|
||||||
if @clients[client_id][:session]['id'] == user_id
|
|
||||||
log 'notice', "send spool to (user_id=#{user_id})", client_id
|
|
||||||
@clients[client_id][:websocket].send( "[#{ msg }]" )
|
|
||||||
end
|
|
||||||
}
|
|
||||||
|
|
||||||
# spool to every client
|
|
||||||
else
|
|
||||||
log 'notice', "send spool", client_id
|
|
||||||
@clients[client_id][:websocket].send( "[#{ msg }]" )
|
|
||||||
end
|
|
||||||
|
|
||||||
|
if item[:type] == 'direct'
|
||||||
|
log 'notice', "send spool to (user_id=#{ @clients[client_id][:session]['id'] })", client_id
|
||||||
|
@clients[client_id][:websocket].send( "[#{ msg }]" )
|
||||||
|
else
|
||||||
|
log 'notice', "send spool", client_id
|
||||||
|
@clients[client_id][:websocket].send( "[#{ msg }]" )
|
||||||
end
|
end
|
||||||
}
|
}
|
||||||
|
|
||||||
# send spool:sent event to client
|
# send spool:sent event to client
|
||||||
|
log 'notice', "send spool:sent event", client_id
|
||||||
@clients[client_id][:websocket].send( '[{"event":"spool:sent"}]' )
|
@clients[client_id][:websocket].send( '[{"event":"spool:sent"}]' )
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue