Rewrite of web socket event backend.

This commit is contained in:
Martin Edenhofer 2015-12-09 14:09:37 +01:00
parent 08c2a59569
commit 5f74e50433
27 changed files with 309 additions and 279 deletions

View file

@ -20,9 +20,9 @@ class Index extends App.ControllerContent
params = @formParam(e.target) params = @formParam(e.target)
App.Event.trigger( App.Event.trigger(
'ws:send' 'ws:send'
action: 'broadcast' event: 'broadcast'
data:
event: 'session:maintenance' event: 'session:maintenance'
spool: false
data: params data: params
) )
@notify @notify

View file

@ -142,7 +142,7 @@ class App.OnlineNotificationWidget extends App.Controller
@alreadyShown[item.id] = true @alreadyShown[item.id] = true
if @fetchedData if @fetchedData
word = "#{item.type}d" word = "#{item.type}d"
title = "#{item.created_by.displayName()} #{App.i18n.translateInline(word)} #{App.i18n.translateInline(item.object_name)} #{item.title}" title = "#{item.created_by.displayName()} #{App.i18n.translateInline(word)} #{App.i18n.translateInline(item.object_name)} \"#{item.title}\""
@notifyDesktop( @notifyDesktop(
url: item.link url: item.link
title: title title: title

View file

@ -1,5 +1,4 @@
class Widget extends App.Controller class Widget extends App.Controller
constructor: -> constructor: ->
super super
@bind() @bind()
@ -14,11 +13,12 @@ class Widget extends App.Controller
# broadcast to other browser instance # broadcast to other browser instance
App.WebSocket.send( App.WebSocket.send(
action: 'broadcast' event: 'broadcast'
event: 'session:takeover'
spool: true spool: true
recipient: recipient:
user_id: [ App.Session.get( 'id' ) ] user_id: [ App.Session.get( 'id' ) ]
data:
event: 'session:takeover'
data: data:
taskbar_id: App.TaskManager.TaskbarId() taskbar_id: App.TaskManager.TaskbarId()
) )

View file

@ -68,7 +68,7 @@ class _webSocketSingleton extends App.Controller
# get spool messages after successful ws login # get spool messages after successful ws login
App.Event.bind( App.Event.bind(
'ws:login', (data) => 'ws:login', =>
@spool() @spool()
'ws' 'ws'
) )
@ -114,7 +114,7 @@ class _webSocketSingleton extends App.Controller
# logon websocket # logon websocket
data = data =
action: 'login' event: 'login'
session_id: App.Config.get('session_id') session_id: App.Config.get('session_id')
fingerprint: App.Browser.fingerprint() fingerprint: App.Browser.fingerprint()
@send(data) @send(data)
@ -125,7 +125,7 @@ class _webSocketSingleton extends App.Controller
# build data to send to server # build data to send to server
data = data =
action: 'spool' event: 'spool'
if @lastSpoolMessage if @lastSpoolMessage
data['timestamp'] = @lastSpoolMessage data['timestamp'] = @lastSpoolMessage
@ -137,10 +137,7 @@ class _webSocketSingleton extends App.Controller
App.Delay.set reset, 60000, 'reset-spool-sent-finished-if-not-returned', 'ws' App.Delay.set reset, 60000, 'reset-spool-sent-finished-if-not-returned', 'ws'
# ask for spool messages # ask for spool messages
App.Event.trigger( @send(data)
'ws:send'
data
)
close: ( params = {} ) => close: ( params = {} ) =>
if params['force'] if params['force']
@ -154,7 +151,7 @@ class _webSocketSingleton extends App.Controller
return if @backend is 'ajax' return if @backend is 'ajax'
@log 'debug', 'send websocket ping' @log 'debug', 'send websocket ping'
@send( { action: 'ping' } ) @send(event: 'ping')
# check if ping is back within 2 min # check if ping is back within 2 min
App.Delay.clear 'websocket-ping-check', 'ws' App.Delay.clear 'websocket-ping-check', 'ws'
@ -166,10 +163,10 @@ class _webSocketSingleton extends App.Controller
pong: -> pong: ->
return if @backend is 'ajax' return if @backend is 'ajax'
@log 'debug', 'received websocket ping' @log 'debug', 'received websocket pong'
# test again after 1 min # test again after 1 min
App.Delay.set @ping, 60000, 'websocket-pong', 'ws' App.Delay.set(@ping, 60000, 'websocket-pong', 'ws')
connect: => connect: =>
@ -216,7 +213,7 @@ class _webSocketSingleton extends App.Controller
@queue = [] @queue = []
# send ping to check connection # send ping to check connection
App.Delay.set @ping, 60000, 'websocket-send-ping-to-heck-connection', 'ws' App.Delay.set(@ping, 60000, 'websocket-send-ping-to-heck-connection', 'ws')
@ws.onmessage = (e) => @ws.onmessage = (e) =>
pipe = JSON.parse(e.data) pipe = JSON.parse(e.data)
@ -271,27 +268,19 @@ class _webSocketSingleton extends App.Controller
# go through all blocks # go through all blocks
for item in data for item in data
@log 'debug', 'onmessage', item
# set timestamp to get spool messages later # set timestamp to get spool messages later
if item['spool'] if item['spool']
@lastSpoolMessage = Math.round( +new Date()/1000 ) @lastSpoolMessage = Math.round( +new Date()/1000 )
# reset reconnect loop # reset reconnect loop
if item['action'] is 'pong' if item['event'] is 'pong'
@pong() @pong()
# fill collection
if item['collection']
@log 'debug', 'onmessage collection:' + item['collection']
# fire event # fire event
if item['event'] if item['event']
if typeof item['event'] is 'object' @log 'debug', "onmessage event: #{item['event']}"
for event in item['event']
@log 'debug', 'onmessage event:' + event
App.Event.trigger( event, item['data'] )
else
@log 'debug', 'onmessage event:' + item['event']
App.Event.trigger(item['event'], item['data']) App.Event.trigger(item['event'], item['data'])
_ajaxInit: (data = {}) => _ajaxInit: (data = {}) =>
@ -304,7 +293,7 @@ class _webSocketSingleton extends App.Controller
id: 'ws-login' id: 'ws-login'
type: 'POST' type: 'POST'
url: @Config.get('api_path') + '/message_send' url: @Config.get('api_path') + '/message_send'
data: JSON.stringify({ data: { action: 'login' } }) data: JSON.stringify(data: { event: 'login' })
processData: false processData: false
queue: false queue: false
success: (data) => success: (data) =>
@ -320,7 +309,7 @@ class _webSocketSingleton extends App.Controller
# try reconnect on error after x sec. # try reconnect on error after x sec.
reconnect = => reconnect = =>
@_ajaxInit(force: true) @_ajaxInit(force: true)
App.Delay.set reconnect, 10000, '_ajaxInit-reconnect-on-error', 'ws' App.Delay.set(reconnect, 10000, '_ajaxInit-reconnect-on-error', 'ws')
) )
_ajaxSend: (data) => _ajaxSend: (data) =>
@ -338,7 +327,7 @@ class _webSocketSingleton extends App.Controller
App.Ajax.request( App.Ajax.request(
type: 'POST' type: 'POST'
url: @Config.get('api_path') + '/message_send' url: @Config.get('api_path') + '/message_send'
data: JSON.stringify({ client_id: @client_id, data: data }) data: JSON.stringify(client_id: @client_id, data: data)
processData: false processData: false
queue: true queue: true
success: (data) => success: (data) =>
@ -358,7 +347,7 @@ class _webSocketSingleton extends App.Controller
id: 'message_receive', id: 'message_receive',
type: 'POST' type: 'POST'
url: @Config.get('api_path') + '/message_receive' url: @Config.get('api_path') + '/message_receive'
data: JSON.stringify({ client_id: @client_id }) data: JSON.stringify(client_id: @client_id)
processData: false processData: false
success: (data) => success: (data) =>
@log 'debug', 'ajax:onmessage', data @log 'debug', 'ajax:onmessage', data

View file

@ -17,77 +17,31 @@ class LongPollingController < ApplicationController
if !params['data'] if !params['data']
params['data'] = {} params['data'] = {}
end end
session_data = {}
if current_user.id
session_data = { 'id' => current_user.id }
end
# spool messages for new connects # spool messages for new connects
if params['data']['spool'] if params['data']['spool']
msg = JSON.generate( params['data'] ) Sessions.spool_create(params['data'])
Sessions.spool_create(msg)
end end
if params['data']['event'] == 'login'
# get spool messages and send them to new client connection Sessions.create(client_id, session_data, { type: 'ajax' })
if params['data']['action'] == 'spool' elsif params['data']['event']
message = Sessions::Event.run(
# error handling event: params['data']['event'],
if params['data']['timestamp'] payload: params['data'],
log "request spool data > '#{Time.zone.at( params['data']['timestamp'] )}'", client_id session: session_data,
else client_id: client_id,
log 'request spool init data', client_id clients: {},
end options: {},
)
if current_user if message
spool = Sessions.spool_list( params['data']['timestamp'], current_user.id ) Sessions.send(client_id, message)
spool.each { |item|
if item[:type] == 'direct'
log "send spool to (user_id=#{current_user.id})", client_id
Sessions.send( client_id, item[:message] )
else
log 'send spool', client_id
Sessions.send( client_id, item[:message] )
end
}
end
# send spool:sent event to client
sleep 0.2
log 'send spool:sent event', client_id
Sessions.send( client_id, { event: 'spool:sent', data: { timestamp: Time.zone.now.utc.to_i } } )
end
# receive message
if params['data']['action'] == 'login'
user_id = session[:user_id]
user = {}
if user_id
user = User.find( user_id ).attributes
end
log "send auth login (user_id #{user_id})", client_id
Sessions.create( client_id, user, { type: 'ajax' } )
# broadcast
elsif params['data']['action'] == 'broadcast'
# list all current clients
client_list = Sessions.list
client_list.each {|local_client_id, local_client|
if local_client_id != client_id
# broadcast to recipient list
if params['data']['recipient'] && params['data']['recipient']['user_id']
params['data']['recipient']['user_id'].each { |loop_user_id|
if local_client[:user]['id'].to_s == loop_user_id.to_s
log "send broadcast from (#{client_id}) to (user_id #{loop_user_id})", local_client_id
Sessions.send( local_client_id, params['data'] )
end
}
# broadcast every client
else
log "send broadcast from (#{client_id})", local_client_id
Sessions.send( local_client_id, params['data'] )
end end
else else
log 'do not send broadcast to it self', client_id log "unknown message '#{params['data'].inspect}'", client_id
end
}
end end
if new_connection if new_connection
@ -119,7 +73,10 @@ class LongPollingController < ApplicationController
Sessions.touch(client_id) Sessions.touch(client_id)
# set max loop time to 24 sec. because of 30 sec. timeout of mod_proxy # set max loop time to 24 sec. because of 30 sec. timeout of mod_proxy
count = 3
if Rails.env.production?
count = 12 count = 12
end
loop do loop do
count = count - 1 count = count - 1
queue = Sessions.queue(client_id) queue = Sessions.queue(client_id)
@ -133,7 +90,7 @@ class LongPollingController < ApplicationController
} }
#sleep 2 #sleep 2
if count == 0 if count == 0
render json: { action: 'pong' } render json: { event: 'pong' }
return return
end end
end end

View file

@ -427,10 +427,11 @@ returns
FileUtils.rm_rf path FileUtils.rm_rf path
end end
def self.spool_create(msg) def self.spool_create(data)
msg = JSON.generate(data)
path = "#{@path}/spool/" path = "#{@path}/spool/"
FileUtils.mkpath path FileUtils.mkpath path
file_path = path + "/#{Time.now.utc.to_f}-#{rand(99_999)}" file_path = "#{path}/#{Time.now.utc.to_f}-#{rand(99_999)}"
File.open( file_path, 'wb' ) { |file| File.open( file_path, 'wb' ) { |file|
data = { data = {
msg: msg, msg: msg,
@ -483,18 +484,27 @@ returns
next if current_user_id != user_id next if current_user_id != user_id
message = message_parsed
if message_parsed['event'] == 'broadcast'
message = message_parsed['data']
end
item = { item = {
type: 'direct', type: 'direct',
message: message_parsed, message: message,
} }
data.push item data.push item
} }
# spool to every client # spool to every client
else else
message = message_parsed
if message_parsed['event'] == 'broadcast'
message = message_parsed['data']
end
item = { item = {
type: 'broadcast', type: 'broadcast',
message: message_parsed, message: message,
} }
data.push item data.push item
end end

View file

@ -80,7 +80,7 @@ class Sessions::Backend::Collections::Base
@client.log "push assets for push_collection #{items.first.class} for user #{@user.id}" @client.log "push assets for push_collection #{items.first.class} for user #{@user.id}"
@client.send( @client.send(
data: assets, data: assets,
event: [ 'loadAssets' ], event: 'loadAssets',
) )
@client.log "push push_collection #{items.first.class} for user #{@user.id}" @client.log "push push_collection #{items.first.class} for user #{@user.id}"

View file

@ -58,7 +58,7 @@ class Sessions::Backend::TicketOverviewIndex
@client.log "push overview_index for user #{@user.id}" @client.log "push overview_index for user #{@user.id}"
@client.send( @client.send(
event: ['ticket_overview_index'], event: 'ticket_overview_index',
data: data, data: data,
) )
end end

View file

@ -107,7 +107,7 @@ class Sessions::Backend::TicketOverviewList
# send update to browser # send update to browser
@client.send( @client.send(
data: assets, data: assets,
event: [ 'loadAssets' ] event: 'loadAssets'
) )
@client.send( @client.send(
data: { data: {
@ -120,7 +120,7 @@ class Sessions::Backend::TicketOverviewList
owner_id: [], owner_id: [],
}, },
}, },
event: [ 'ticket_overview_rebuild' ], event: 'ticket_overview_rebuild',
) )
end end
} }

View file

@ -1,28 +1,18 @@
class Sessions::Event class Sessions::Event
include ApplicationLib include ApplicationLib
def self.run(event, data, session, client_id) def self.run(params)
adapter = "Sessions::Event::#{event.to_classname}" adapter = "Sessions::Event::#{params[:event].to_classname}"
begin begin
backend = load_adapter(adapter) backend = load_adapter(adapter)
rescue => e rescue => e
return { error: "No such event #{event}" } return { error: "No such event #{params[:event]}" }
end end
ActiveRecord::Base.establish_connection instance = backend.new(params)
instance = backend.new(data, session, client_id)
pre = instance.pre
if pre
ActiveRecord::Base.remove_connection
return pre
end
result = instance.run result = instance.run
post = instance.post instance.destroy
if post
ActiveRecord::Base.remove_connection
return post
end
result result
end end

View file

@ -0,0 +1,44 @@
class Sessions::Event::Base
def initialize(params)
params.each { |key, value|
instance_variable_set "@#{key}", value
}
@is_web_socket = false
return if !@clients[@client_id]
@is_web_socket = true
end
def websocket_send(recipient_client_id, data)
if data.class != Array
msg = "[#{data.to_json}]"
else
msg = data.to_json
end
if @clients[recipient_client_id]
log 'debug', "ws send #{msg}", recipient_client_id
@clients[recipient_client_id][:websocket].send(msg)
else
log 'debug', "fs send #{msg}", recipient_client_id
Sessions.send(recipient_client_id, data)
end
end
def log(level, data, client_id = nil)
if !@options[:v]
return if level == 'debug'
end
if !client_id
client_id = @client_id
end
# rubocop:disable Rails/Output
puts "#{Time.now.utc.iso8601}:client(#{client_id}) #{data}"
#puts "#{Time.now.utc.iso8601}:#{ level }:client(#{ client_id }) #{ data }"
# rubocop:enable Rails/Output
end
def destroy
end
end

View file

@ -0,0 +1,45 @@
class Sessions::Event::Broadcast < Sessions::Event::Base
def run
# list all current clients
client_list = Sessions.list
client_list.each {|local_client_id, local_client|
if local_client_id == @client_id
log 'notice', 'do not send broadcast to it self'
next
end
# broadcast to recipient list
if @payload['recipient']
if @payload['recipient'].class != Hash
log 'error', "recipient attribute isn't a hash '#{@payload['recipient'].inspect}'"
else
if !@payload['recipient'].key?('user_id')
log 'error', "need recipient.user_id attribute '#{@payload['recipient'].inspect}'"
else
if @payload['recipient']['user_id'].class != Array
log 'error', "recipient.user_id attribute isn't an array '#{@payload['recipient']['user_id'].inspect}'"
else
@payload['recipient']['user_id'].each { |user_id|
next if local_client[:user]['id'].to_i != user_id.to_i
log 'notice', "send broadcast from (#{@client_id}) to (user_id=#{user_id})", local_client_id
websocket_send(local_client_id, @payload['data'])
}
end
end
end
# broadcast every client
else
log 'notice', "send broadcast from (#{@client_id})", local_client_id
websocket_send(local_client_id, @payload['data'])
end
}
false
end
end

View file

@ -1,11 +1,12 @@
class Sessions::Event::ChatAgentState < Sessions::Event::ChatBase class Sessions::Event::ChatAgentState < Sessions::Event::ChatBase
def run def run
return super if super
# check if user has permissions # check if user has permissions
return if !agent_permission_check return if !agent_permission_check
Chat::Agent.state(@session['id'], @data['data']['active']) Chat::Agent.state(@session['id'], @payload['data']['active'])
# broadcast new state to agents # broadcast new state to agents
broadcast_agent_state_update(@session['id']) broadcast_agent_state_update(@session['id'])
@ -14,8 +15,9 @@ class Sessions::Event::ChatAgentState < Sessions::Event::ChatBase
event: 'chat_agent_state', event: 'chat_agent_state',
data: { data: {
state: 'ok', state: 'ok',
active: @data['data']['active'], active: @payload['data']['active'],
}, },
} }
end end
end end

View file

@ -1,12 +1,17 @@
class Sessions::Event::ChatBase class Sessions::Event::ChatBase < Sessions::Event::Base
def initialize(data, session, client_id) def initialize(params)
@data = data super(params)
@session = session return if !@is_web_socket
@client_id = client_id ActiveRecord::Base.establish_connection
end end
def pre def destroy
return if !@is_web_socket
ActiveRecord::Base.remove_connection
end
def run
# check if feature is enabled # check if feature is enabled
return if Setting.get('chat') return if Setting.get('chat')
@ -18,10 +23,6 @@ class Sessions::Event::ChatBase
} }
end end
def post
false
end
def broadcast_agent_state_update(ignore_user_id = nil) def broadcast_agent_state_update(ignore_user_id = nil)
# send broadcast to agents # send broadcast to agents
@ -99,11 +100,11 @@ class Sessions::Event::ChatBase
end end
def current_chat_session def current_chat_session
Chat::Session.find_by(session_id: @data['data']['session_id']) Chat::Session.find_by(session_id: @payload['data']['session_id'])
end end
def check_chat_session_exists def check_chat_session_exists
if !@data['data'] || !@data['data']['session_id'] if !@payload['data'] || !@payload['data']['session_id']
error = { error = {
event: 'chat_error', event: 'chat_error',
data: { data: {
@ -117,7 +118,7 @@ class Sessions::Event::ChatBase
error = { error = {
event: 'chat_error', event: 'chat_error',
data: { data: {
state: "No such session id #{@data['data']['session_id']}", state: "No such session id #{@payload['data']['session_id']}",
}, },
} }
Sessions.send(@client_id, error) Sessions.send(@client_id, error)
@ -125,7 +126,7 @@ class Sessions::Event::ChatBase
end end
def current_chat def current_chat
Chat.find_by(id: @data['data']['chat_id']) Chat.find_by(id: @payload['data']['chat_id'])
end end
def check_chat_exists def check_chat_exists

View file

@ -1,6 +1,7 @@
class Sessions::Event::ChatSessionClose < Sessions::Event::ChatBase class Sessions::Event::ChatSessionClose < Sessions::Event::ChatBase
def run def run
return super if super
return if !check_chat_session_exists return if !check_chat_session_exists
@ -57,4 +58,5 @@ class Sessions::Event::ChatSessionClose < Sessions::Event::ChatBase
}, },
} }
end end
end end

View file

@ -1,11 +1,12 @@
class Sessions::Event::ChatSessionInit < Sessions::Event::ChatBase class Sessions::Event::ChatSessionInit < Sessions::Event::ChatBase
def run def run
return super if super
return if !check_chat_exists return if !check_chat_exists
# create chat session # create chat session
chat_session = Chat::Session.create( chat_session = Chat::Session.create(
chat_id: @data['data']['chat_id'], chat_id: @payload['data']['chat_id'],
name: '', name: '',
state: 'waiting', state: 'waiting',
preferences: { preferences: {
@ -26,4 +27,5 @@ class Sessions::Event::ChatSessionInit < Sessions::Event::ChatBase
}, },
} }
end end
end end

View file

@ -1,6 +1,7 @@
class Sessions::Event::ChatSessionLeaveTemporary < Sessions::Event::ChatBase class Sessions::Event::ChatSessionLeaveTemporary < Sessions::Event::ChatBase
def run def run
return super if super
return if !check_chat_session_exists return if !check_chat_session_exists
chat_session = current_chat_session chat_session = current_chat_session

View file

@ -1,6 +1,7 @@
class Sessions::Event::ChatSessionMessage < Sessions::Event::ChatBase class Sessions::Event::ChatSessionMessage < Sessions::Event::ChatBase
def run def run
return super if super
return if !check_chat_session_exists return if !check_chat_session_exists
chat_session = current_chat_session chat_session = current_chat_session
@ -10,7 +11,7 @@ class Sessions::Event::ChatSessionMessage < Sessions::Event::ChatBase
end end
chat_message = Chat::Message.create( chat_message = Chat::Message.create(
chat_session_id: chat_session.id, chat_session_id: chat_session.id,
content: @data['data']['content'], content: @payload['data']['content'],
created_by_id: user_id, created_by_id: user_id,
) )
message = { message = {
@ -35,4 +36,5 @@ class Sessions::Event::ChatSessionMessage < Sessions::Event::ChatBase
} }
end end
end end

View file

@ -1,6 +1,7 @@
class Sessions::Event::ChatSessionStart < Sessions::Event::ChatBase class Sessions::Event::ChatSessionStart < Sessions::Event::ChatBase
def run def run
return super if super
agent_permission_check agent_permission_check
# find first in waiting list # find first in waiting list
@ -48,4 +49,5 @@ class Sessions::Event::ChatSessionStart < Sessions::Event::ChatBase
nil nil
end end
end end

View file

@ -1,6 +1,7 @@
class Sessions::Event::ChatSessionTyping < Sessions::Event::ChatBase class Sessions::Event::ChatSessionTyping < Sessions::Event::ChatBase
def run def run
return super if super
return if !check_chat_session_exists return if !check_chat_session_exists
chat_session = current_chat_session chat_session = current_chat_session
@ -28,4 +29,5 @@ class Sessions::Event::ChatSessionTyping < Sessions::Event::ChatBase
}, },
} }
end end
end end

View file

@ -1,6 +1,7 @@
class Sessions::Event::ChatStatusAgent < Sessions::Event::ChatBase class Sessions::Event::ChatStatusAgent < Sessions::Event::ChatBase
def run def run
return super if super
# check if user has permissions # check if user has permissions
return if !agent_permission_check return if !agent_permission_check

View file

@ -1,12 +1,13 @@
class Sessions::Event::ChatStatusCustomer < Sessions::Event::ChatBase class Sessions::Event::ChatStatusCustomer < Sessions::Event::ChatBase
def run def run
return super if super
return if !check_chat_exists return if !check_chat_exists
# check if it's a chat sessin reconnect # check if it's a chat sessin reconnect
session_id = nil session_id = nil
if @data['data']['session_id'] if @payload['data']['session_id']
session_id = @data['data']['session_id'] session_id = @payload['data']['session_id']
# update recipients of existing sessions # update recipients of existing sessions
chat_session = Chat::Session.find_by(session_id: session_id) chat_session = Chat::Session.find_by(session_id: session_id)
@ -17,4 +18,5 @@ class Sessions::Event::ChatStatusCustomer < Sessions::Event::ChatBase
data: current_chat.customer_state(session_id), data: current_chat.customer_state(session_id),
} }
end end
end end

View file

@ -0,0 +1,32 @@
class Sessions::Event::Login < Sessions::Event::Base
def run
# get user_id
if @payload && @payload['session_id']
if @is_web_socket
ActiveRecord::Base.establish_connection
end
session = ActiveRecord::SessionStore::Session.find_by(session_id: @payload['session_id'])
if @is_web_socket
ActiveRecord::Base.remove_connection
end
end
if session && session.data && session.data['user_id']
new_session_data = { 'id' => session.data['user_id'] }
else
new_session_data = {}
end
if @clients[@client_id]
@clients[@client_id][:session] = new_session_data
Sessions.create(@client_id, new_session_data, { type: 'websocket' })
else
Sessions.create(@client_id, new_session_data, { type: 'ajax' })
end
false
end
end

View file

@ -0,0 +1,9 @@
class Sessions::Event::Ping < Sessions::Event::Base
def run
{
event: 'pong',
}
end
end

View file

@ -0,0 +1,41 @@
class Sessions::Event::Spool < Sessions::Event::Base
# get spool messages and send them to new client connection
def run
# error handling
if @payload['timestamp']
log 'notice', "request spool data > '#{Time.at(@payload['timestamp']).utc.iso8601}'"
else
log 'notice', 'request spool with init data'
end
if !@session || !@session['id']
log 'error', "can't send spool, session not authenticated"
return
end
spool = Sessions.spool_list(@payload['timestamp'], @session['id'])
spool.each { |item|
# create new msg to push to client
if item[:type] == 'direct'
log 'notice', "send spool to (user_id=#{@session['id']})"
websocket_send(@client_id, item[:message])
else
log 'notice', 'send spool'
websocket_send(@client_id, item[:message])
end
}
# send spool:sent event to client
log 'notice', 'send spool:sent event'
{
event: 'spool:sent',
data: {
timestamp: Time.now.utc.to_i,
},
}
end
end

View file

@ -145,128 +145,24 @@ EventMachine.run {
# spool messages for new connects # spool messages for new connects
if data['spool'] if data['spool']
Sessions.spool_create(msg) Sessions.spool_create(data)
end end
# get spool messages and send them to new client connection if data['event']
if data['action'] == 'spool' log 'debug', "execute event '#{data['event']}'", client_id
message = Sessions::Event.run(
# error handling event: data['event'],
if data['timestamp'] payload: data,
log 'notice', "request spool data > '#{Time.at(data['timestamp']).utc.iso8601}'", client_id session: @clients[client_id][:session],
else client_id: client_id,
log 'notice', 'request spool with init data', client_id clients: @clients,
end options: @options,
)
if @clients[client_id] && @clients[client_id][:session] && @clients[client_id][:session]['id']
spool = Sessions.spool_list( data['timestamp'], @clients[client_id][:session]['id'] )
spool.each { |item|
# create new msg to push to client
if item[:type] == 'direct'
log 'notice', "send spool to (user_id=#{@clients[client_id][:session]['id']})", client_id
websocket_send(client_id, item[:message])
else
log 'notice', 'send spool', client_id
websocket_send(client_id, item[:message])
end
}
else
log 'error', "can't send spool, session not authenticated", client_id
end
# send spool:sent event to client
log 'notice', 'send spool:sent event', client_id
message = {
event: 'spool:sent',
data: {
timestamp: Time.now.utc.to_i,
},
}
websocket_send(client_id, message)
end
# get session
if data['action'] == 'login'
# get user_id
if data && data['session_id']
ActiveRecord::Base.establish_connection
session = ActiveRecord::SessionStore::Session.find_by( session_id: data['session_id'] )
ActiveRecord::Base.remove_connection
end
if session && session.data && session.data['user_id']
new_session_data = { 'id' => session.data['user_id'] }
else
new_session_data = {}
end
@clients[client_id][:session] = new_session_data
Sessions.create( client_id, new_session_data, { type: 'websocket' } )
# remember ping, send pong back
elsif data['action'] == 'ping'
message = {
action: 'pong',
}
websocket_send(client_id, message)
# broadcast
elsif data['action'] == 'broadcast'
# list all current clients
client_list = Sessions.list
client_list.each {|local_client_id, local_client|
if local_client_id != client_id
# broadcast to recipient list
if data['recipient']
if data['recipient'].class != Hash
log 'error', "recipient attribute isn't a hash '#{data['recipient'].inspect}'"
else
if !data['recipient'].key?('user_id')
log 'error', "need recipient.user_id attribute '#{data['recipient'].inspect}'"
else
if data['recipient']['user_id'].class != Array
log 'error', "recipient.user_id attribute isn't an array '#{data['recipient']['user_id'].inspect}'"
else
data['recipient']['user_id'].each { |user_id|
next if local_client[:user]['id'].to_i != user_id.to_i
log 'notice', "send broadcast from (#{client_id}) to (user_id=#{user_id})", local_client_id
if local_client[:meta][:type] == 'websocket' && @clients[ local_client_id ]
websocket_send(local_client_id, data)
else
Sessions.send(local_client_id, data)
end
}
end
end
end
# broadcast every client
else
log 'notice', "send broadcast from (#{client_id})", local_client_id
if local_client[:meta][:type] == 'websocket' && @clients[ local_client_id ]
websocket_send(local_client_id, data)
else
Sessions.send(local_client_id, data)
end
end
else
log 'notice', 'do not send broadcast to it self', client_id
end
}
elsif data['event']
log 'notice', "execute event '#{data['event']}'", client_id
message = Sessions::Event.run(data['event'], data, @clients[client_id][:session], client_id)
if message if message
websocket_send(client_id, message) websocket_send(client_id, message)
end end
else
log 'error', "unknown message '#{data.inspect}'", client_id
end end
} }
end end