From 486802770b9cb70548cada0f40b958cc528a873c Mon Sep 17 00:00:00 2001 From: Martin Edenhofer Date: Mon, 4 Dec 2017 08:00:00 +0100 Subject: [PATCH] Added option to use multiple session instances. --- .../app/controllers/ticket_overview.coffee | 1 - lib/sessions.rb | 64 +++++-- lib/sessions/client.rb | 8 +- lib/sessions/node.rb | 169 ++++++++++++++++++ 4 files changed, 223 insertions(+), 19 deletions(-) create mode 100644 lib/sessions/node.rb diff --git a/app/assets/javascripts/app/controllers/ticket_overview.coffee b/app/assets/javascripts/app/controllers/ticket_overview.coffee index de9d4df17..40f9b7a7e 100644 --- a/app/assets/javascripts/app/controllers/ticket_overview.coffee +++ b/app/assets/javascripts/app/controllers/ticket_overview.coffee @@ -956,7 +956,6 @@ class Table extends App.Controller ticketListShow = [] for ticket in tickets ticketListShow.push App.Ticket.find(ticket.id) - console.log('overview', overview) @overview = App.Overview.find(overview.id) @table.update( overviewAttributes: @overview.view.s diff --git a/lib/sessions.rb b/lib/sessions.rb index 95149cd26..58f1dcf0b 100644 --- a/lib/sessions.rb +++ b/lib/sessions.rb @@ -109,8 +109,11 @@ returns =end def self.session_exists?(client_id) - client_ids = sessions - client_ids.include? client_id.to_s + session_dir = "#{@path}/#{client_id}" + return false if !File.exist?(session_dir) + session_file = "#{session_dir}/session" + return false if !File.exist?(session_file) + true end =begin @@ -247,14 +250,14 @@ returns data = nil # if no session dir exists, session got destoried - if !File.exist? session_dir + if !File.exist?(session_dir) destroy(client_id) - log('debug', "missing session directory for '#{client_id}', remove session.") + log('debug', "missing session directory #{session_dir} for '#{client_id}', remove session.") return end # if only session file is missing, then it's an error behavior - if !File.exist? session_file + if !File.exist?(session_file) destroy(client_id) log('error', "missing session file for '#{client_id}', remove session.") return @@ -558,16 +561,47 @@ remove all session and spool messages data end - def self.jobs + def self.jobs(node_id = nil) # just make sure that spool path exists if !File.exist?(@path) FileUtils.mkpath @path end + # dispatch sessions + if node_id && node_id.zero? + loop do + + # nodes + nodes_stats = Sessions::Node.stats + + client_ids = sessions + client_ids.each do |client_id| + + # ask nodes for nodes + next if nodes_stats[client_id] + + # assigne to node + Sessions::Node.session_assigne(client_id) + end + sleep 1 + end + end + Thread.abort_on_exception = true loop do - client_ids = sessions + + if node_id + + # register node + Sessions::Node.register(node_id) + + # watch for assigned sessions + client_ids = Sessions::Node.sessions_by(node_id) + else + client_ids = sessions + end + client_ids.each do |client_id| # connection already open, ignore @@ -586,7 +620,7 @@ remove all session and spool messages @@client_threads[client_id] = true @@client_threads[client_id] = Thread.new do - thread_client(client_id) + thread_client(client_id, 0, Time.now.utc, node_id) @@client_threads[client_id] = nil log('debug', "close client (#{client_id}) thread") if ActiveRecord::Base.connection.owner == Thread.current @@ -629,10 +663,10 @@ returns =end - def self.thread_client(client_id, try_count = 0, try_run_time = Time.now.utc) - log('debug', "LOOP #{client_id} - #{try_count}") + def self.thread_client(client_id, try_count = 0, try_run_time = Time.now.utc, node_id) + log('debug', "LOOP #{node_id}.#{client_id} - #{try_count}") begin - Sessions::Client.new(client_id) + Sessions::Client.new(client_id, node_id) rescue => e log('error', "thread_client #{client_id} exited with error #{e.inspect}") log('error', e.backtrace.join("\n ") ) @@ -654,13 +688,11 @@ returns # restart job again if try_run_max > try_count - thread_client(client_id, try_count, try_run_time) - return + thread_client(client_id, try_count, try_run_time, node_id) end - - raise "STOP thread_client for client #{client_id} after #{try_run_max} tries" + raise "STOP thread_client for client #{node_id}.#{client_id} after #{try_run_max} tries" end - log('debug', "/LOOP #{client_id} - #{try_count}") + log('debug', "/LOOP #{node_id}.#{client_id} - #{try_count}") end def self.symbolize_keys(hash) diff --git a/lib/sessions/client.rb b/lib/sessions/client.rb index 264b4a49f..7258e19c0 100644 --- a/lib/sessions/client.rb +++ b/lib/sessions/client.rb @@ -1,7 +1,8 @@ class Sessions::Client - def initialize(client_id) + def initialize(client_id, node_id) @client_id = client_id + @node_id = node_id log '---client start ws connection---' fetch log '---client exiting ws connection---' @@ -22,6 +23,9 @@ class Sessions::Client loop_count = 0 loop do + # check if session still exists + return if !Sessions.session_exists?(@client_id) + # get connection user session_data = Sessions.get(@client_id) return if !session_data @@ -71,6 +75,6 @@ class Sessions::Client end def log(msg) - Rails.logger.debug "client(#{@client_id}) #{msg}" + Rails.logger.debug "client(#{@node_id}.#{@client_id}) #{msg}" end end diff --git a/lib/sessions/node.rb b/lib/sessions/node.rb new file mode 100644 index 000000000..f023e95c1 --- /dev/null +++ b/lib/sessions/node.rb @@ -0,0 +1,169 @@ +module Sessions::Node + + # get application root directory + @root = Dir.pwd.to_s + if @root.blank? || @root == '/' + @root = Rails.root + end + + # get working directories + @path = "#{@root}/tmp/session_node_#{Rails.env}" + + def self.session_assigne(client_id, force = false) + + # get available nodes + nodes = Sessions::Node.registered + session_count = {} + + nodes.each do |node| + count = Sessions::Node.sessions_by(node['node_id'], force).count + session_count[node['node_id']] = count + end + + # search for lowest session count + node_id = nil + node_count = nil + session_count.each do |local_node_id, count| + next if !node_count.nil? && count > node_count + node_count = count + node_id = local_node_id + end + + # assigne session + Rails.logger.info "Assigne session to node #{node_id} (#{client_id})" + Sessions::Node.sessions_for(node_id, client_id) + + # write node status file + node_id + end + + def self.cleanup + FileUtils.rm_rf @path + end + + def self.registered + path = "#{@path}/*.status" + nodes = [] + files = Dir.glob(path) + files.each do |filename| + File.open(filename, 'rb') do |file| + file.flock(File::LOCK_SH) + content = file.read + file.flock(File::LOCK_UN) + begin + data = JSON.parse(content) + nodes.push data + rescue => e + Rails.logger.error "can't parse status file #{filename}, #{e.inspect}" + #to_delete.push "#{path}/#{entry}" + #next + end + end + end + nodes + end + + def self.register(node_id) + if !File.exist?(@path) + FileUtils.mkpath @path + end + + status_file = "#{@path}/#{node_id}.status" + + # write node status file + data = { + updated_at_human: Time.now.utc, + updated_at: Time.now.utc.to_i, + node_id: node_id.to_s, + pid: $PROCESS_ID, + } + content = data.to_json + + # store session data in session file + File.open(status_file, 'wb') do |file| + file.write content + end + + end + + def self.stats + # read node sessions + path = "#{@path}/*.session" + + sessions = {} + files = Dir.glob(path) + files.each do |filename| + File.open(filename, 'rb') do |file| + file.flock(File::LOCK_SH) + content = file.read + file.flock(File::LOCK_UN) + begin + next if content.blank? + data = JSON.parse(content) + next if data.blank? + next if data['client_id'].blank? + sessions[data['client_id']] = data['node_id'] + rescue => e + Rails.logger.error "can't parse session file #{filename}, #{e.inspect}" + #to_delete.push "#{path}/#{entry}" + #next + end + end + end + sessions + end + + def self.sessions_for(node_id, client_id) + if !File.exist?(@path) + FileUtils.mkpath @path + end + + status_file = "#{@path}/#{node_id}.#{client_id}.session" + + # write node status file + data = { + updated_at_human: Time.now.utc, + updated_at: Time.now.utc.to_i, + node_id: node_id.to_s, + client_id: client_id.to_s, + pid: $PROCESS_ID, + } + content = data.to_json + + # store session data in session file + File.open(status_file, 'wb') do |file| + file.write content + end + + end + + def self.sessions_by(node_id, force = false) + + # read node sessions + path = "#{@path}/#{node_id}.*.session" + + sessions = [] + files = Dir.glob(path) + files.each do |filename| + File.open(filename, 'rb') do |file| + file.flock(File::LOCK_SH) + content = file.read + file.flock(File::LOCK_UN) + begin + next if content.blank? + data = JSON.parse(content) + next if data.blank? + next if data['client_id'].blank? + next if !Sessions.session_exists?(data['client_id']) && force == false + sessions.push data['client_id'] + rescue => e + Rails.logger.error "can't parse session file #{filename}, #{e.inspect}" + #to_delete.push "#{path}/#{entry}" + #next + end + end + end + sessions + end + +end