From 0fd2efa0cfe179232c2121a9b02222786c36cdbf Mon Sep 17 00:00:00 2001 From: Thorsten Eckel Date: Wed, 19 Dec 2018 15:31:06 +0100 Subject: [PATCH] Refactoring: Exctracted Websocket Server logic from script file into lib backend class for better encapsulation, easier testing and reusage possibilities (e.g. Capybara). --- lib/websocket_server.rb | 227 +++++++++++++++++++++++++++++++++++++ script/websocket-server.rb | 209 +--------------------------------- 2 files changed, 232 insertions(+), 204 deletions(-) create mode 100644 lib/websocket_server.rb diff --git a/lib/websocket_server.rb b/lib/websocket_server.rb new file mode 100644 index 000000000..a81830739 --- /dev/null +++ b/lib/websocket_server.rb @@ -0,0 +1,227 @@ +class WebsocketServer + + cattr_reader :clients, :options + + def self.run(options) + @options = options + @clients = {} + + Rails.configuration.interface = 'websocket' + EventMachine.run do + EventMachine::WebSocket.start( host: @options[:b], port: @options[:p], secure: @options[:s], tls_options: @options[:tls_options] ) do |ws| + + # register client connection + ws.onopen do |handshake| + WebsocketServer.onopen(ws, handshake) + end + + # unregister client connection + ws.onclose do + WebsocketServer.onclose(ws) + end + + # manage messages + ws.onmessage do |msg| + WebsocketServer.onmessage(ws, msg) + end + end + + # check unused connections + EventMachine.add_timer(0.5) do + WebsocketServer.check_unused_connections + end + + # check open unused connections, kick all connection without activitie in the last 2 minutes + EventMachine.add_periodic_timer(120) do + WebsocketServer.check_unused_connections + end + + EventMachine.add_periodic_timer(20) do + WebsocketServer.log_status + end + + EventMachine.add_periodic_timer(0.4) do + WebsocketServer.send_to_client + end + end + end + + def self.onopen(websocket, handshake) + headers = handshake.headers + remote_ip = get_remote_ip(headers) + client_id = websocket.object_id.to_s + log 'notice', 'Client connected.', client_id + Sessions.create( client_id, {}, { type: 'websocket' } ) + + return if @clients.include? client_id + + @clients[client_id] = { + websocket: websocket, + last_ping: Time.now.utc.to_i, + error_count: 0, + headers: headers, + remote_ip: remote_ip, + } + end + + def self.onclose(websocket) + client_id = websocket.object_id.to_s + log 'notice', 'Client disconnected.', client_id + + # removed from current client list + if @clients.include? client_id + @clients.delete client_id + end + + Sessions.destroy(client_id) + end + + def self.onmessage(websocket, msg) + client_id = websocket.object_id.to_s + log 'debug', "received: #{msg} ", client_id + begin + data = JSON.parse(msg) + rescue => e + log 'error', "can't parse message: #{msg}, #{e.inspect}", client_id + return + end + + # check if connection not already exists + return if !@clients[client_id] + + Sessions.touch(client_id) # rubocop:disable Rails/SkipsModelValidations + @clients[client_id][:last_ping] = Time.now.utc.to_i + + # spool messages for new connects + if data['spool'] + Sessions.spool_create(data) + end + + if data['event'] + log 'debug', "execute event '#{data['event']}'", client_id + message = Sessions::Event.run( + event: data['event'], + payload: data, + session: @clients[client_id][:session], + remote_ip: @clients[client_id][:remote_ip], + client_id: client_id, + clients: @clients, + options: @options, + ) + if message + websocket_send(client_id, message) + end + else + log 'error', "unknown message '#{data.inspect}'", client_id + end + end + + def self.get_remote_ip(headers) + return headers['X-Forwarded-For'] if headers && headers['X-Forwarded-For'] + + nil + end + + def self.websocket_send(client_id, data) + msg = if data.class != Array + "[#{data.to_json}]" + else + data.to_json + end + log 'debug', "send #{msg}", client_id + if !@clients[client_id] + log 'error', "no such @clients for #{client_id}", client_id + return + end + @clients[client_id][:websocket].send(msg) + end + + def self.check_unused_connections + log 'notice', 'check unused idle connections...' + + idle_time_in_sec = 4 * 60 + + # close unused web socket sessions + @clients.each do |client_id, client| + + next if ( client[:last_ping].to_i + idle_time_in_sec ) >= Time.now.utc.to_i + + log 'notice', 'closing idle websocket connection', client_id + + # remember to not use this connection anymore + client[:disconnect] = true + + # try to close regular + client[:websocket].close_websocket + + # delete session from client list + sleep 0.3 + @clients.delete(client_id) + end + + # close unused ajax long polling sessions + clients = Sessions.destroy_idle_sessions(idle_time_in_sec) + clients.each do |client_id| + log 'notice', 'closing idle long polling connection', client_id + end + end + + def self.send_to_client + return if @clients.size.zero? + + #log 'debug', 'checking for data to send...' + @clients.each do |client_id, client| + next if client[:disconnect] + + log 'debug', 'checking for data...', client_id + begin + queue = Sessions.queue(client_id) + next if queue.blank? + + log 'notice', 'send data to client', client_id + websocket_send(client_id, queue) + rescue => e + log 'error', 'problem:' + e.inspect, client_id + + # disconnect client + client[:error_count] += 1 + if client[:error_count] > 20 + if @clients.include? client_id + @clients.delete client_id + end + end + end + end + end + + def self.log_status + # websocket + log 'notice', "Status: websocket clients: #{@clients.size}" + @clients.each_key do |client_id| + log 'notice', 'working...', client_id + end + + # ajax + client_list = Sessions.list + clients = 0 + client_list.each_value do |client| + next if client[:meta][:type] == 'websocket' + + clients = clients + 1 + end + log 'notice', "Status: ajax clients: #{clients}" + client_list.each do |client_id, client| + next if client[:meta][:type] == 'websocket' + + log 'notice', 'working...', client_id + end + end + + def self.log(level, data, client_id = '-') + if !@options[:v] + return if level == 'debug' + end + puts "#{Time.now.utc.iso8601}:client(#{client_id}) #{data}" # rubocop:disable Rails/Output + #puts "#{Time.now.utc.iso8601}:#{ level }:client(#{ client_id }) #{ data }" + end +end diff --git a/script/websocket-server.rb b/script/websocket-server.rb index a4a425a45..633df6f18 100755 --- a/script/websocket-server.rb +++ b/script/websocket-server.rb @@ -87,10 +87,12 @@ OptionParser.new do |opts| @options[:i] = i end opts.on('-k', '--private-key [OPT]', '/path/to/server.key for secure connections') do |k| - tls_options[:private_key_file] = k + options[:tls_options] ||= {} + options[:tls_options][:private_key_file] = k end opts.on('-c', '--certificate [OPT]', '/path/to/server.crt for secure connections') do |c| - tls_options[:cert_chain_file] = c + options[:tls_options] ||= {} + options[:tls_options][:cert_chain_file] = c end end.parse! @@ -125,205 +127,4 @@ if ARGV[0] == 'start' && @options[:d] after_fork(dir) end -@clients = {} -Rails.configuration.interface = 'websocket' -EventMachine.run do - EventMachine::WebSocket.start( host: @options[:b], port: @options[:p], secure: @options[:s], tls_options: tls_options ) do |ws| - - # register client connection - ws.onopen do |handshake| - headers = handshake.headers - remote_ip = get_remote_ip(headers) - client_id = ws.object_id.to_s - log 'notice', 'Client connected.', client_id - Sessions.create( client_id, {}, { type: 'websocket' } ) - - if !@clients.include? client_id - @clients[client_id] = { - websocket: ws, - last_ping: Time.now.utc.to_i, - error_count: 0, - headers: headers, - remote_ip: remote_ip, - } - end - end - - # unregister client connection - ws.onclose do - client_id = ws.object_id.to_s - log 'notice', 'Client disconnected.', client_id - - # removed from current client list - if @clients.include? client_id - @clients.delete client_id - end - - Sessions.destroy(client_id) - end - - # manage messages - ws.onmessage do |msg| - - client_id = ws.object_id.to_s - log 'debug', "received: #{msg} ", client_id - begin - data = JSON.parse(msg) - rescue => e - log 'error', "can't parse message: #{msg}, #{e.inspect}", client_id - next - end - - # check if connection not already exists - next if !@clients[client_id] - - Sessions.touch(client_id) # rubocop:disable Rails/SkipsModelValidations - @clients[client_id][:last_ping] = Time.now.utc.to_i - - # spool messages for new connects - if data['spool'] - Sessions.spool_create(data) - end - - if data['event'] - log 'debug', "execute event '#{data['event']}'", client_id - message = Sessions::Event.run( - event: data['event'], - payload: data, - session: @clients[client_id][:session], - remote_ip: @clients[client_id][:remote_ip], - client_id: client_id, - clients: @clients, - options: @options, - ) - if message - websocket_send(client_id, message) - end - else - log 'error', "unknown message '#{data.inspect}'", client_id - end - end - end - - # check unused connections - EventMachine.add_timer(0.5) do - check_unused_connections - end - - # check open unused connections, kick all connection without activitie in the last 2 minutes - EventMachine.add_periodic_timer(120) do - check_unused_connections - end - - EventMachine.add_periodic_timer(20) do - - # websocket - log 'notice', "Status: websocket clients: #{@clients.size}" - @clients.each_key do |client_id| - log 'notice', 'working...', client_id - end - - # ajax - client_list = Sessions.list - clients = 0 - client_list.each_value do |client| - next if client[:meta][:type] == 'websocket' - - clients = clients + 1 - end - log 'notice', "Status: ajax clients: #{clients}" - client_list.each do |client_id, client| - next if client[:meta][:type] == 'websocket' - - log 'notice', 'working...', client_id - end - - end - - EventMachine.add_periodic_timer(0.4) do - next if @clients.size.zero? - - #log 'debug', 'checking for data to send...' - @clients.each do |client_id, client| - next if client[:disconnect] - - log 'debug', 'checking for data...', client_id - begin - queue = Sessions.queue(client_id) - next if queue.blank? - - log 'notice', 'send data to client', client_id - websocket_send(client_id, queue) - rescue => e - log 'error', 'problem:' + e.inspect, client_id - - # disconnect client - client[:error_count] += 1 - if client[:error_count] > 20 - if @clients.include? client_id - @clients.delete client_id - end - end - end - end - end - - def get_remote_ip(headers) - return headers['X-Forwarded-For'] if headers && headers['X-Forwarded-For'] - - nil - end - - def websocket_send(client_id, data) - msg = if data.class != Array - "[#{data.to_json}]" - else - data.to_json - end - log 'debug', "send #{msg}", client_id - if !@clients[client_id] - log 'error', "no such @clients for #{client_id}", client_id - return - end - @clients[client_id][:websocket].send(msg) - end - - def check_unused_connections - log 'notice', 'check unused idle connections...' - - idle_time_in_sec = 4 * 60 - - # close unused web socket sessions - @clients.each do |client_id, client| - - next if ( client[:last_ping].to_i + idle_time_in_sec ) >= Time.now.utc.to_i - - log 'notice', 'closing idle websocket connection', client_id - - # remember to not use this connection anymore - client[:disconnect] = true - - # try to close regular - client[:websocket].close_websocket - - # delete session from client list - sleep 0.3 - @clients.delete(client_id) - end - - # close unused ajax long polling sessions - clients = Sessions.destroy_idle_sessions(idle_time_in_sec) - clients.each do |client_id| - log 'notice', 'closing idle long polling connection', client_id - end - end - - def log(level, data, client_id = '-') - if !@options[:v] - return if level == 'debug' - end - puts "#{Time.now.utc.iso8601}:client(#{client_id}) #{data}" - #puts "#{Time.now.utc.iso8601}:#{ level }:client(#{ client_id }) #{ data }" - end - -end +WebsocketServer.run(@options)