From d15728f779fa43c30c575bc2b4187755968d087e Mon Sep 17 00:00:00 2001 From: Martin Edenhofer Date: Sat, 4 Aug 2012 00:46:05 +0200 Subject: [PATCH] Moved to Threads. --- app/models/ticket.rb | 7 +- lib/web_socket.rb | 730 ++++++++++++++++++++----------------- script/websocket-server.rb | 15 +- 3 files changed, 404 insertions(+), 348 deletions(-) diff --git a/app/models/ticket.rb b/app/models/ticket.rb index 823252143..62429021a 100644 --- a/app/models/ticket.rb +++ b/app/models/ticket.rb @@ -221,12 +221,17 @@ class Ticket < ApplicationModel order( overview_selected[:order][:by].to_s + ' ' + overview_selected[:order][:direction].to_s ). limit( 500 ) + ticket_ids = [] + tickets.each { |ticket| + ticket_ids.push ticket.id + } + tickets_count = Ticket.where( :group_id => group_ids ). where( overview_selected.condition ). count() return { - :tickets => tickets, + :tickets => ticket_ids, :tickets_count => tickets_count, :overview => overview_selected_raw, } diff --git a/lib/web_socket.rb b/lib/web_socket.rb index d6b94f727..8bd86b773 100644 --- a/lib/web_socket.rb +++ b/lib/web_socket.rb @@ -2,6 +2,8 @@ require 'json' module Session @path = '/tmp/websocket' + @@user_threads = {} + @@client_threads = {} def self.create( client_id, session ) path = @path + '/' + client_id.to_s @@ -18,10 +20,10 @@ module Session return if !File.exist? session_file File.open( session_file, 'r' ) { |file| all = '' - while line = file.gets - all = all + line + while line = file.gets + all = all + line end - begin + begin data = Marshal.load( all ) rescue return @@ -40,6 +42,15 @@ module Session filename = @path + '/' + client_id.to_s + '/transaction-' + Time.new().to_i.to_s + '-3' if File::exists?( filename ) filename = @path + '/' + client_id.to_s + '/transaction-' + Time.new().to_i.to_s + '-4' + if File::exists?( filename ) + filename = @path + '/' + client_id.to_s + '/transaction-' + Time.new().to_i.to_s + '-5' + if File::exists?( filename ) + filename = @path + '/' + client_id.to_s + '/transaction-' + Time.new().to_i.to_s + '-6' + if File::exists?( filename ) + filename = @path + '/' + client_id.to_s + '/transaction-' + Time.new().to_i.to_s + '-7' + end + end + end end end end @@ -51,364 +62,42 @@ module Session end def self.jobs - state_client_ids = {} - state_user_ids = {} + Thread.abort_on_exception = true while true client_ids = self.sessions client_ids.each { |client_id| - if !state_client_ids[client_id] - state_client_ids[client_id] = {} - end - # get current user user_session = Session.get( client_id ) next if !user_session next if !user_session[:id] user = User.find( user_session[:id] ) + next if !user - if !state_user_ids[ user.id ] - state_user_ids[ user.id ] = {} + # start user thread + if !@@user_threads[user.id] + @@user_threads[user.id] = Thread.new { + UserState.new(user.id) + @@user_threads[user.id] = nil +# raise "Exception from thread" + } end - # check user cache - if self.jobs_user_needed( state_user_ids[ user.id ], :overview, 2.seconds ) - puts "fetch for user_id #{ user.id } :overview..." - # overview meta data - overview = Ticket.overview( - :current_user_id => user.id, - ) - self.jobs_user_data_set( state_user_ids[ user.id ], :overview, overview ) - else - puts "use user_id #{ user.id } cache for :overview..." - overview = self.jobs_user_data_get( state_user_ids[ user.id ], :overview ) + # start client thread + if !@@client_threads[client_id] + @@client_threads[client_id] = Thread.new { + ClientState.new(client_id) + @@client_threads[client_id] = nil +# raise "Exception from thread" + } end - if state_client_ids[client_id][:overview] != overview - state_client_ids[client_id][:overview] = overview - - # send update to browser - Session.transaction( client_id, { - :data => overview, - :event => 'navupdate_ticket_overview', - }) - end - - # ticket overview lists - overviews = Ticket.overview_list( - :current_user_id => user.id, - ) - if !state_client_ids[client_id][:overview_data] - state_client_ids[client_id][:overview_data] = {} - end - overviews.each { |overview| - - # check user cache - cache_key = ( 'overview_data_' + overview.meta[:url] ).to_sym - if self.jobs_user_needed( state_user_ids[ user.id ], cache_key, 2.seconds ) - puts "fetch for user_id #{ user.id } #{ cache_key.to_s }..." - overview_data = Ticket.overview( - :view => overview.meta[:url], - # :view_mode => params[:view_mode], - :current_user_id => user.id, - :array => true, - ) - self.jobs_user_data_set( state_user_ids[ user.id ], cache_key, overview_data ) - else - puts "use user_id #{ user.id } cache for #{ cache_key.to_s }..." - overview_data = self.jobs_user_data_get( state_user_ids[ user.id ], cache_key ) - end - - if state_client_ids[client_id][ cache_key ] != overview_data - state_client_ids[client_id][ cache_key ] = overview_data -puts 'push overview ' + overview.meta[:url].to_s - users = {} - tickets = [] - ticket_list = [] - overview_data[:tickets].each {|ticket| - ticket_list.push ticket.id - self.jobs_ticket( ticket.id, state_client_ids[client_id], tickets, users ) - } - - # send update to browser - Session.transaction( client_id, { - :data => { - :overview => overview_data[:overview], - :ticket_list => ticket_list, - :tickets_count => overview_data[:tickets_count], - :collections => { - :User => users, - :Ticket => tickets, - } - }, - :event => [ 'loadCollection', 'ticket_overview_rebuild' ], - :collection => 'ticket_overview_' + overview.meta[:url].to_s, - }) - end - } - - # recent viewed - self.jobs_recent_viewed( - user, - client_id, - state_client_ids[ client_id ], - state_user_ids[ user.id ], - ) - - # activity stream - self.jobs_activity_stream( - user, - client_id, - state_client_ids[client_id], - state_user_ids[ user.id ], - ) - - # ticket create - self.jobs_create_attributes( - user, - client_id, - state_client_ids[client_id], - state_user_ids[ user.id ], - ) - # system settings - - - # rss view - self.jobs_rss( - user, - client_id, - state_client_ids[client_id], - 'http://www.heise.de/newsticker/heise-atom.xml' - ) - sleep 0.5 + sleep 0.3 } end end - def self.jobs_ticket(ticket_id, client_state, tickets, users) - - puts 'check :overview' - - if !client_state['tickets'] - client_state['tickets'] = {} - end - - # add ticket if needed - data = Ticket.full_data(ticket_id) - if client_state['tickets'][ticket_id] != data - client_state['tickets'][ticket_id] = data - tickets.push data - end - - # add users if needed - self.jobs_user( data['owner_id'], client_state, users ) - self.jobs_user( data['customer_id'], client_state, users ) - self.jobs_user( data['created_by_id'], client_state, users ) - end - - def self.jobs_user(user_id, client_state, users) - - if !client_state['users'] - client_state['users'] = {} - end - - # get user - user = User.user_data_full( user_id ) - - # user is already on client and not changed - return if client_state['users'][ user_id ] == user - - puts 'push user ... ' + user['login'] - # user not on client or different - users[ user_id ] = user - client_state['users'][ user_id ] = user - end - - # rss view - def self.jobs_rss(user_id, client_id, client_state, url) - - # name space - if !client_state[:rss_items] - client_state[:rss_items] = {} - end - - # only fetch every 5 minutes - return if client_state[:rss_items][:last_run] && Time.new - client_state[:rss_items][:last_run] < 5.minutes - - # remember last run - client_state[:rss_items][:last_run] = Time.new - - puts 'check :rss' - # fetch rss - rss_items = RSS.fetch( url, 8 ) - if client_state[:rss_items][:data] != rss_items - client_state[:rss_items][:data] = rss_items - - # send update to browser - Session.transaction( client_id, { - :event => 'rss_rebuild', - :collection => 'dashboard_rss', - :data => { - head: 'Heise ATOM', - items: rss_items, - }, - }) - end - end - - def self.jobs_recent_viewed(user, client_id, client_state, state_user ) - - # name space - if !client_state[:recent_viewed] - client_state[:recent_viewed] = {} - end - - # only fetch every x seconds - return if client_state[:recent_viewed][:last_run] && Time.new - client_state[:recent_viewed][:last_run] < 4.seconds - - # remember last run - client_state[:recent_viewed][:last_run] = Time.new - - puts 'check :recent_viewed' - - # fetch data - if self.jobs_user_needed( state_user, :recent_viewed, 10.seconds ) - puts "fetch for user_id #{ user.id } :recent_viewed..." - - recent_viewed = History.recent_viewed(user) - self.jobs_user_data_set( state_user, :recent_viewed, recent_viewed ) - else - puts "use user_id #{ user.id } cache for :recent_viewed..." - recent_viewed = self.jobs_user_data_get( state_user, :recent_viewed ) - end - - if client_state[:recent_viewed][:data] != recent_viewed - client_state[:recent_viewed][:data] = recent_viewed - - # tickets and users - recent_viewed = History.recent_viewed_fulldata(user) - - # send update to browser - Session.transaction( client_id, { - :data => recent_viewed, - :event => 'update_recent_viewed', - }) - end - end - - def self.jobs_activity_stream(user, client_id, client_state, state_user) - - # name space - if !client_state[:activity_stream] - client_state[:activity_stream] = {} - end - - # only fetch every x seconds - return if client_state[:activity_stream][:last_run] && Time.new - client_state[:activity_stream][:last_run] < 8.seconds - - # remember last run - client_state[:activity_stream][:last_run] = Time.new - - puts 'check :activity_stream' - - if self.jobs_user_needed( state_user, :activity_stream, 3.seconds ) - puts "fetch for user_id #{ user.id } :activity_stream..." - - activity_stream = History.activity_stream(user) - self.jobs_user_data_set( state_user, :activity_stream, activity_stream ) - else - puts "use user_id #{ user.id } cache for :activity_stream..." - activity_stream = self.jobs_user_data_get( state_user, :activity_stream ) - end - - if client_state[:activity_stream][:data] != activity_stream - client_state[:activity_stream][:data] = activity_stream - - activity_stream = History.activity_stream_fulldata(user) - - # send update to browser - Session.transaction( client_id, { - :event => 'activity_stream_rebuild', - :collection => 'activity_stream', - :data => activity_stream, - }) - end - end - - def self.jobs_create_attributes(user, client_id, client_state, state_user) - - # name space - if !client_state[:create_attributes] - client_state[:create_attributes] = {} - end - - # only fetch every x seconds - return if client_state[:create_attributes][:last_run] && Time.new - client_state[:create_attributes][:last_run] < 12.seconds - - # remember last run - client_state[:create_attributes][:last_run] = Time.new - - puts 'check :create_attributes' - if self.jobs_user_needed( state_user, :create_attributes, 26.seconds ) - puts "fetch for user_id #{ user.id } :create_attributes..." - - ticket_create_attributes = Ticket.create_attributes( - :current_user_id => user.id, - ) - self.jobs_user_data_set( state_user, :create_attributes, ticket_create_attributes ) - else - puts "use user_id #{ user.id } cache for :create_attributes..." - ticket_create_attributes = self.jobs_user_data_get( state_user, :create_attributes ) - end - - if client_state[:create_attributes][:data] != ticket_create_attributes - client_state[:create_attributes][:data] = ticket_create_attributes - - # send update to browser - Session.transaction( client_id, { - :data => ticket_create_attributes, - :collection => 'ticket_create_attributes', - }) - end - end - - def self.jobs_user_needed( item, key, ttl ) - - if !item[key] - item[key] = {} - end - - # run needed on initial - if !item[key][:last_run] - - # set new last run - item[key][:last_run] = Time.new - return true - end - - # run needed if ttl is over - if Time.new - item[key][:last_run] > ttl - - # set new last run - item[key][:last_run] = Time.new - return true - end - - # no new run needed - return false - end - - def self.jobs_user_data_get( item, key ) - return item[key][:data] - end - - def self.jobs_user_data_set( item, key, data ) - item[key][:data] = data - return true - end - - def self.sessions path = @path + '/' data = [] @@ -450,3 +139,360 @@ puts 'push overview ' + overview.meta[:url].to_s end end + +module CacheIn + @@data = {} + @@expires_in = {} + @@expires_in_ttl = {} + def self.set( key, value, params = {} ) +# puts 'CacheIn.set:' + key + '-' + value.inspect + if params[:expires_in] + @@expires_in[key] = Time.now + params[:expires_in] + @@expires_in_ttl[key] = params[:expires_in] + end + @@data[ key ] = value + end + def self.expired( key ) + if @@expires_in[key] + return true if @@expires_in[key] < Time.now + return false + end + return true + end + def self.get( key, params = {} ) +# puts 'CacheIn.get:' + key + '-' + @@data[ key ].inspect + if !params[:re_expire] + if @@expires_in[key] + return if self.expired(key) + end + else + if @@expires_in[key] + @@expires_in[key] = Time.now + @@expires_in_ttl[key] + end + end + @@data[ key ] + end +end + + +class UserState + def initialize( user_id ) + @user_id = user_id + @data = {} + @cache_key = 'user_' + user_id.to_s + self.log "---user started user state" + + CacheIn.set( 'last_run_' + user_id.to_s , true, { :expires_in => 20.seconds } ) + + self.fetch + end + + def fetch + user = User.find( @user_id ) + return if !user + + while true + + # check if user is still with min one open connection + if !CacheIn.get( 'last_run_' + user.id.to_s ) + self.log "---user - closeing thread - no open user connection" + return + end + + self.log "---user - fetch user data" + # overview + cache_key = @cache_key + '_overview' + if CacheIn.expired(cache_key) + overview = Ticket.overview( + :current_user_id => user.id, + ) + overview_cache = CacheIn.get( cache_key, { :re_expire => true } ) + self.log 'fetch overview - ' + cache_key + if overview != overview_cache + self.log 'fetch overview changed - ' + cache_key +# puts overview.inspect +# puts '------' +# puts overview_cache.inspect + CacheIn.set( cache_key, overview, { :expires_in => 3.seconds } ) + end + end + + # overview lists + overviews = Ticket.overview_list( + :current_user_id => user.id, + ) + overviews.each { |overview| + cache_key = @cache_key + '_overview_data_' + overview.meta[:url] + if CacheIn.expired(cache_key) + overview_data = Ticket.overview( + :view => overview.meta[:url], +# :view_mode => params[:view_mode], + :current_user_id => user.id, + :array => true, + ) + overview_data_cache = CacheIn.get( cache_key, { :re_expire => true } ) + self.log 'fetch overview_data - ' + cache_key + if overview_data != overview_data_cache + self.log 'fetch overview_data changed - ' + cache_key + CacheIn.set( cache_key, overview_data, { :expires_in => 5.seconds } ) + end + end + } + + # create_attributes + cache_key = @cache_key + '_ticket_create_attributes' + if CacheIn.expired(cache_key) + ticket_create_attributes = Ticket.create_attributes( + :current_user_id => user.id, + ) + ticket_create_attributes_cache = CacheIn.get( cache_key, { :re_expire => true } ) + self.log 'fetch ticket_create_attributes - ' + cache_key + if ticket_create_attributes != ticket_create_attributes_cache + self.log 'fetch ticket_create_attributes changed - ' + cache_key + + CacheIn.set( cache_key, ticket_create_attributes, { :expires_in => 2.minutes } ) + end + end + + # recent viewed + cache_key = @cache_key + '_recent_viewed' + if CacheIn.expired(cache_key) + recent_viewed = History.recent_viewed(user) + recent_viewed_cache = CacheIn.get( cache_key, { :re_expire => true } ) + self.log 'fetch recent_viewed - ' + cache_key + if recent_viewed != recent_viewed_cache + self.log 'fetch recent_viewed changed - ' + cache_key + + CacheIn.set( cache_key, recent_viewed, { :expires_in => 5.seconds } ) + + recent_viewed_full = History.recent_viewed_fulldata(user) + CacheIn.set( cache_key + '_push', recent_viewed_full ) + end + end + + # activity steam + cache_key = @cache_key + '_activity_stream' + if CacheIn.expired(cache_key) + activity_stream = History.activity_stream( user ) + activity_stream_cache = CacheIn.get( cache_key, { :re_expire => true } ) + self.log 'fetch activity_stream - ' + cache_key + if activity_stream != activity_stream_cache + self.log 'fetch activity_stream changed - ' + cache_key + + CacheIn.set( cache_key, activity_stream, { :expires_in => 1.minutes } ) + + activity_stream_full = History.activity_stream_fulldata( user ) + CacheIn.set( cache_key + '_push', activity_stream_full ) + end + end + + # rss + cache_key = @cache_key + '_rss' + if CacheIn.expired(cache_key) + url = 'http://www.heise.de/newsticker/heise-atom.xml' + rss_items = RSS.fetch( url, 8 ) + rss_items_cache = CacheIn.get( cache_key, { :re_expire => true } ) + self.log 'fetch rss - ' + cache_key + if rss_items != rss_items_cache + self.log 'fetch rss changed - ' + cache_key + CacheIn.set( cache_key, rss_items, { :expires_in => 2.minutes } ) + CacheIn.set( cache_key + '_push', { + head: 'Heise ATOM', + items: rss_items, + }) + end + end + self.log "---/user-" + sleep 1.2 + end + end + + def log( data ) + puts "#{Time.now}:user_id(#{ @user_id }) #{ data }" + end +end + + +class ClientState + def initialize( client_id ) + @client_id = client_id + @data = {} + @pushed = {} + self.log "---client start ws connection---" + self.fetch + self.log "---client exiting ws connection---" + end + + def fetch + while true + + # get connection user + user_session = Session.get( @client_id ) + return if !user_session + return if !user_session[:id] + user = User.find( user_session[:id] ) + return if !user + + self.log "---client - looking for data of user #{user.id}" + + # remember last run + CacheIn.set( 'last_run_' + user.id.to_s , true, { :expires_in => 20.seconds } ) + + # overview + cache_key = 'user_' + user.id.to_s + '_overview' + overview = CacheIn.get( cache_key ) + if overview && @data[:overview] != overview + @data[:overview] = overview + self.log "push overview for user #{user.id}" + + # send update to browser + self.transaction({ + :event => 'navupdate_ticket_overview', + :data => overview, + }) + end + + # overview_data + overviews = Ticket.overview_list( + :current_user_id => user.id, + ) + overviews.each { |overview| + cache_key = 'user_' + user.id.to_s + '_overview_data_' + overview.meta[:url] + + overview_data = CacheIn.get( cache_key ) + if overview_data && @data[cache_key] != overview_data + @data[cache_key] = overview_data + self.log "push overview_data for user #{user.id}" + + users = {} + tickets = [] + overview_data[:tickets].each {|ticket_id| + self.ticket( ticket_id, tickets, users ) + } + # send update to browser + self.transaction({ + :data => { + :overview => overview_data[:overview], + :ticket_list => overview_data[:tickets], + :tickets_count => overview_data[:tickets_count], + :collections => { + :User => users, + :Ticket => tickets, + } + }, + :event => [ 'loadCollection', 'ticket_overview_rebuild' ], + :collection => 'ticket_overview_' + overview.meta[:url].to_s, + }) + end + } + + # ticket_create_attributes + cache_key = 'user_' + user.id.to_s + '_ticket_create_attributes' + ticket_create_attributes = CacheIn.get( cache_key ) + if ticket_create_attributes && @data[:ticket_create_attributes] != ticket_create_attributes + @data[:ticket_create_attributes] = ticket_create_attributes + self.log "push ticket_create_attributes for user #{user.id}" + + # send update to browser + self.transaction({ + :collection => 'ticket_create_attributes', + :data => ticket_create_attributes, + }) + end + + # recent viewed + cache_key = 'user_' + user.id.to_s + '_recent_viewed' + recent_viewed = CacheIn.get( cache_key ) + if recent_viewed && @data[:recent_viewed] != recent_viewed + @data[:recent_viewed] = recent_viewed + self.log "push recent_viewed for user #{user.id}" + + # send update to browser + r = CacheIn.get( cache_key + '_push' ) + self.transaction({ + :event => 'update_recent_viewed', + :data => r, + }) + end + + # activity stream + cache_key = 'user_' + user.id.to_s + '_activity_stream' + activity_stream = CacheIn.get( cache_key ) + if activity_stream && @data[:activity_stream] != activity_stream + @data[:activity_stream] = activity_stream + self.log "push activity_stream for user #{user.id}" + + # send update to browser + r = CacheIn.get( cache_key + '_push' ) + self.transaction({ + :event => 'activity_stream_rebuild', + :collection => 'activity_stream', + :data => r, + }) + end + + # rss + cache_key = 'user_' + user.id.to_s + '_rss' + rss_items = CacheIn.get( cache_key ) + if rss_items && @data[:rss] != rss_items + @data[:rss] = rss_items + self.log "push rss for user #{user.id}" + + # send update to browser + r = CacheIn.get( cache_key + '_push' ) + self.transaction({ + :event => 'rss_rebuild', + :collection => 'dashboard_rss', + :data => r, + }) + end + self.log "---/client-" +# sleep 1 + sleep 1 + end + end + + # add ticket if needed + def ticket( ticket_id, tickets, users ) + if !@pushed[:tickets] + @pushed[:tickets] = {} + end + ticket = Ticket.full_data(ticket_id) + if @pushed[:tickets][ticket_id] != ticket + @pushed[:tickets][ticket_id] = ticket + tickets.push ticket + end + + # add users if needed + self.user( ticket['owner_id'], users ) + self.user( ticket['customer_id'], users ) + self.user( ticket['created_by_id'], users ) + end + + # add user if needed + def user( user_id, users ) + if !@pushed[:users] + @pushed[:users] = {} + end + + # get user + user = User.user_data_full( user_id ) + + # user is already on client and not changed + return if @pushed[:users][ user_id ] == user + @pushed[:users][user_id] = user + + # user not on client or different + self.log 'push user ... ' + user['login'] + users[ user_id ] = user + end + + # send update to browser + def transaction( data ) + Session.transaction( @client_id, data ) + end + + def log( data ) + puts "#{Time.now}:client(#{ @client_id }) #{ data }" + end +end \ No newline at end of file diff --git a/script/websocket-server.rb b/script/websocket-server.rb index c2755ba5f..10991eaee 100644 --- a/script/websocket-server.rb +++ b/script/websocket-server.rb @@ -81,20 +81,25 @@ EventMachine.run { } end - EventMachine.add_periodic_timer(0.4) { + EventMachine.add_periodic_timer(0.2) { puts "loop" @clients.each { |client_id, client| - puts 'checking client...' + client_id.to_s + log 'checking waiting data...', client_id begin queue = Session.queue( client_id ) if queue && queue[0] - puts "send to #{client_id} " + queue.inspect +# log "send " + queue.inspect, client_id + log "send data to client", client_id client[:websocket].send( queue.to_json ) end - rescue - puts 'problem' + rescue => e + log 'problem:' + e.inspect, client_id end } } + + def log( data, client_id ) + puts "#{Time.now}:client(#{ client_id }) #{ data }" + end }