Closes #3450 - Add support for Redis as an optional web socket session store back end.

antifascista
Martin Honermeyer 1 year ago committed by Martin Gruner
parent 9415484441
commit 1eef2a0e0f
  1. 3
      .gitignore
  2. 4
      .gitlab-ci.yml
  3. 38
      .gitlab/ci/base.yml
  4. 8
      .gitlab/ci/browser-core.yml
  5. 2
      .gitlab/ci/browser-integration.yml
  6. 3
      .gitlab/ci/pre.yml
  7. 2
      .gitlab/ci/rspec.yml
  8. 4
      .gitlab/ci/rspec/mysql.yml
  9. 4
      .gitlab/ci/rspec/postgresql.yml
  10. 2
      .gitlab/ci/unit/mysql.yml
  11. 2
      .gitlab/ci/unit/postgresql.yml
  12. 111
      .gitlab/configure_environment.rb
  13. 1
      Gemfile
  14. 2
      Gemfile.lock
  15. 3
      config/application.rb
  16. 285
      lib/sessions.rb
  17. 120
      lib/sessions/node.rb
  18. 366
      lib/sessions/store/file.rb
  19. 174
      lib/sessions/store/redis.rb
  20. 62
      script/build/database_config.rb

3
.gitignore vendored

@ -16,6 +16,9 @@
# local backup config file
/contrib/backup/config
# Dynamic environment config for Gitlab
/.gitlab/environment.env
# Third-Party ------------------------------------------------------------------
# The config files / dev tools listed below are optional
# and may not be present on most users' machines

@ -59,6 +59,6 @@ cache:
# Initialize application env
before_script:
- source /etc/profile.d/rvm.sh
- FRESHENVFILE=fresh.env && test -f $FRESHENVFILE && source $FRESHENVFILE
- bundle install -j $(nproc) --path vendor
- bundle exec ruby script/build/database_config.rb
- bundle exec ruby .gitlab/configure_environment.rb
- source .gitlab/environment.env

@ -64,20 +64,46 @@
name: registry.znuny.com/docker/zammad-imap:stable
alias: mail
.docker_redis: &docker_redis
name: redis:latest
alias: redis
# service templates
.services_mysql: &services_mysql
services:
- <<: *docker_mysql
.services_mysql_redis: &services_mysql_redis
variables:
REDIS_URL: "redis://redis:6379"
services:
- <<: *docker_mysql
- <<: *docker_redis
.services_postgresql: &services_postgresql
services:
- <<: *docker_postgresql
.services_postgresql_redis: &services_postgresql_redis
variables:
REDIS_URL: "redis://redis:6379"
services:
- <<: *docker_postgresql
- <<: *docker_redis
.services_mysql_postgresql: &services_mysql_postgresql
services:
- <<: *docker_mysql
- <<: *docker_postgresql
.services_mysql_postgresql_redis: &services_mysql_postgresql_redis
variables:
REDIS_URL: "redis://redis:6379"
services:
- <<: *docker_mysql
- <<: *docker_postgresql
- <<: *docker_redis
.services_postgresql_selenium: &services_postgresql_selenium
services:
- <<: *docker_postgresql
@ -107,6 +133,18 @@
- <<: *docker_selenium
- <<: *docker_imap
.services_mysql_postgresql_elasticsearch_selenium_imap_redis: &services_mysql_postgresql_elasticsearch_selenium_imap_redis
variables:
ELASTICSEARCH_TAG: 'stable'
REDIS_URL: "redis://redis:6379"
services:
- <<: *docker_mysql
- <<: *docker_postgresql
- <<: *docker_elasticsearch
- <<: *docker_selenium
- <<: *docker_imap
- <<: *docker_redis
# we need at least one job to store and include this template
# but we skip this via 'only' -> 'variables' -> '$IGNORE'
# $IGNORE is not defined

@ -19,11 +19,10 @@ include:
- .env_base
- .variables_es
- .variables_app_restart_cmd
- .services_mysql_postgresql_elasticsearch_selenium_imap
- .services_mysql_postgresql_elasticsearch_selenium_imap_redis
variables:
RAILS_ENV: "production"
script:
- env
- script/build/test_slice_tests.sh $TEST_SLICE
- RAILS_ENV=test bundle exec rake db:create
- bundle exec rake zammad:ci:test:start[with_elasticsearch]
@ -54,7 +53,7 @@ include:
extends:
- .env_base
- .variables_app_restart_cmd
- .services_mysql_postgresql
- .services_mysql_postgresql_redis
variables:
RAILS_ENV: "production"
@ -64,9 +63,10 @@ include:
extends:
- .env_base
- .variables_es
- .services_mysql_postgresql_elasticsearch_selenium_imap
- .services_mysql_postgresql_elasticsearch_selenium_imap_redis
variables:
RAILS_ENV: "test"
REDIS_URL: "redis://redis:6379"
.template_browser-core_capybara: &template_browser-core_capybara
extends:

@ -15,7 +15,7 @@ include:
- .env_base
- .variables_app_restart_cmd
- .variables_es
- .services_mysql_postgresql_elasticsearch_selenium_imap
- .services_mysql_postgresql_elasticsearch_selenium_imap_redis
variables:
RAILS_ENV: "test"
script:

@ -28,7 +28,8 @@ zeitwerk_check:
- .services_postgresql
script:
- bundle install -j $(nproc) --path vendor
- bundle exec ruby script/build/database_config.rb
- bundle exec ruby .gitlab/configure_environment.rb
- source .gitlab/environment.env
- bundle exec rake zammad:db:init
- bundle exec rails zeitwerk:check

@ -36,7 +36,7 @@ rspec:integration:
stage: test
extends:
- .env_base
- .services_mysql_postgresql
- .services_mysql_postgresql_redis
- .rspec_integration_rules
variables:
RAILS_ENV: "test"

@ -1,11 +1,11 @@
rspec:mysql:
stage: test
extends:
- .services_mysql
- .services_mysql_redis
- .template_rspec
rspec:mysql:db_reset:
stage: test
extends:
- .services_mysql
- .services_mysql_redis
- .template_rspec_db_reset

@ -1,11 +1,11 @@
rspec:postgresql:
stage: test
extends:
- .services_postgresql
- .services_postgresql_redis
- .template_rspec
rspec:postgresql:db_reset:
stage: test
extends:
- .services_postgresql
- .services_postgresql_redis
- .template_rspec_db_reset

@ -1,5 +1,5 @@
unit:mysql:
stage: test
extends:
- .services_mysql
- .services_mysql_redis
- .template_unit

@ -1,5 +1,5 @@
unit:postgresql:
stage: test
extends:
- .services_postgresql
- .services_postgresql_redis
- .template_unit

@ -0,0 +1,111 @@
#!/usr/bin/env ruby
# Copyright (C) 2012-2021 Zammad Foundation, http://zammad-foundation.org/
require 'yaml'
require 'resolv'
#
# Configures the CI system
# - either (randomly) mysql or postgresql, if it is available
# - (randomly) Redis or File as web socket session back end, if Redis is available
#
# Database config happens directly in config/database.yml, other settings are written to
# .gitlab/environment.env which must be sourced in the CI configuration.
#
class ConfigureEnvironment
@env_file_content = <<~EOF
#!/bin/bash
FRESHENVFILE=fresh.env && test -f $FRESHENVFILE && source $FRESHENVFILE
true
EOF
def self.configure_redis
if ENV['REDIS_URL'].nil? || ENV['REDIS_URL'].empty? # rubocop:disable Rails/Blank
puts 'Redis is not available, using File as web socket session back end.'
return
end
if [true, false].sample
puts 'Using Redis as web socket session back end.'
return
end
puts 'Using File as web socket session back end.'
@env_file_content += "unset REDIS_URL\n"
end
def self.configure_database # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
if File.exist? File.join(__dir__, '../config/database.yml')
puts "'config/database.yml' already exists and will not be changed."
return
end
cnf = YAML.load_file(File.join(__dir__, '../config/database/database.yml'))
cnf.delete('default')
database = ENV['ENFORCE_DB_SERVICE']
# Lookup in /etc/hosts first: gitlab uses that if FF_NETWORK_PER_BUILD is not set.
if !database
hostsfile = '/etc/hosts'
database = %w[postgresql mysql].shuffle.find do |possible_database|
File.foreach(hostsfile).any? { |l| l[possible_database] }
end
end
# Lookup via DNS if needed: gitlab uses that if FF_NETWORK_PER_BUILD is enabled.
if !database
dns = Resolv::DNS.new
dns.timeouts = 3
database = %w[postgresql mysql].shuffle.find do |possible_database|
# Perform a lookup of the database host to check if it is configured as a service.
if dns.getaddress possible_database
next possible_database
end
rescue Resolv::ResolvError
# Ignore DNS lookup errors
end
end
raise "Can't find any supported database." if database.nil?
puts "Using #{database} as database service."
db_settings_map = {
'postgresql' => {
'adapter' => 'postgresql',
'username' => 'zammad',
'password' => 'zammad',
'host' => 'postgresql', # db alias from gitlab-ci.yml
},
'mysql' => {
'adapter' => 'mysql2',
'username' => 'root',
'password' => 'zammad',
'host' => 'mysql', # db alias from gitlab-ci.yml
}
}
# fetch DB settings from settings map and fallback to postgresql
db_settings = db_settings_map.fetch(database) { db_settings_map['postgresql'] }
%w[development test production].each do |environment|
cnf[environment].merge!(db_settings)
end
File.write(File.join(__dir__, '../config/database.yml'), Psych.dump(cnf))
end
def self.write_env_file
File.write(File.join(__dir__, 'environment.env'), @env_file_content)
end
def self.run
configure_redis
configure_database
write_env_file
end
end
ConfigureEnvironment.run

@ -29,6 +29,7 @@ gem 'delayed_job_active_record'
# core - websocket
gem 'em-websocket'
gem 'eventmachine'
gem 'redis', require: false
# core - password security
gem 'argon2'

@ -442,6 +442,7 @@ GEM
rb-inotify (0.10.0)
ffi (~> 1.0)
rchardet (1.8.0)
redis (4.2.5)
regexp_parser (1.8.2)
rest-client (2.0.2)
http-cookie (>= 1.0.2, < 2.0)
@ -666,6 +667,7 @@ DEPENDENCIES
rails-controller-testing
rb-fsevent
rchardet (>= 1.8.0)
redis
rspec-rails
rszr (= 0.5.2)
rubocop

@ -49,6 +49,9 @@ module Zammad
# define cache store
config.cache_store = :zammad_file_store, Rails.root.join('tmp', "cache_file_store_#{Rails.env}"), { expires_in: 7.days }
# define websocket session store
config.websocket_session_store = ENV['REDIS_URL'] ? :redis : :file
# Rails 6.1 returns false when the enqueuing is aborted.
config.active_job.return_false_on_aborted_enqueue = true

@ -2,14 +2,10 @@
module Sessions
# get application root directory
@root = Dir.pwd.to_s
if @root.blank? || @root == '/'
@root = Rails.root
end
# get working directories
@path = "#{@root}/tmp/websocket_#{Rails.env}"
@store = case Rails.application.config.websocket_session_store
when :redis then Sessions::Store::Redis.new
else Sessions::Store::File.new
end
# create global vars for threads
@@client_threads = {} # rubocop:disable Style/ClassVars
@ -27,10 +23,6 @@ returns
=end
def self.create(client_id, session, meta)
path = "#{@path}/#{client_id}"
path_tmp = "#{@path}/tmp/#{client_id}"
session_file = "#{path_tmp}/session"
# collect session data
meta[:last_ping] = Time.now.utc.to_i
data = {
@ -39,19 +31,7 @@ returns
}
content = data.to_json
# store session data in session file
FileUtils.mkpath path_tmp
File.open(session_file, 'wb') do |file|
file.write content
end
# destroy old session if needed
if File.exist?(path)
Sessions.destroy(client_id)
end
# move to destination directory
FileUtils.mv(path_tmp, path)
@store.create(client_id, content)
# send update to browser
return if !session || session['id'].blank?
@ -78,23 +58,7 @@ returns
=end
def self.sessions
path = "#{@path}/"
# just make sure that spool path exists
if !File.exist?(path)
FileUtils.mkpath path
end
data = []
Dir.foreach(path) do |entry|
next if entry == '.'
next if entry == '..'
next if entry == 'tmp'
next if entry == 'spool'
data.push entry.to_s
end
data
@store.sessions
end
=begin
@ -110,13 +74,7 @@ returns
=end
def self.session_exists?(client_id)
session_dir = "#{@path}/#{client_id}"
return false if !File.exist?(session_dir)
session_file = "#{session_dir}/session"
return false if !File.exist?(session_file)
true
@store.session_exists?(client_id)
end
=begin
@ -175,8 +133,7 @@ returns
=end
def self.destroy(client_id)
path = "#{@path}/#{client_id}"
FileUtils.rm_rf path
@store.destroy(client_id)
end
=begin
@ -219,13 +176,8 @@ returns
data = get(client_id)
return false if !data
path = "#{@path}/#{client_id}"
data[:meta][:last_ping] = Time.now.utc.to_i
File.open("#{path}/session", 'wb' ) do |file|
file.flock(File::LOCK_EX)
file.write data.to_json
file.flock(File::LOCK_UN)
end
@store.set(client_id, data)
true
end
@ -250,41 +202,7 @@ returns
=end
def self.get(client_id)
session_dir = "#{@path}/#{client_id}"
session_file = "#{session_dir}/session"
data = nil
# if no session dir exists, session got destoried
if !File.exist?(session_dir)
destroy(client_id)
log('debug', "missing session directory #{session_dir} for '#{client_id}', remove session.")
return
end
# if only session file is missing, then it's an error behavior
if !File.exist?(session_file)
destroy(client_id)
log('error', "missing session file for '#{client_id}', remove session.")
return
end
begin
File.open(session_file, 'rb') do |file|
file.flock(File::LOCK_SH)
all = file.read
file.flock(File::LOCK_UN)
data_json = JSON.parse(all)
if data_json
data = symbolize_keys(data_json)
data[:user] = data_json['user'] # for compat. reasons
end
end
rescue => e
log('error', e.inspect)
destroy(client_id)
log('error', "error in reading/parsing session file '#{session_file}', remove session.")
return
end
data
@store.get client_id
end
=begin
@ -300,34 +218,7 @@ returns
=end
def self.send(client_id, data)
path = "#{@path}/#{client_id}/"
filename = "send-#{Time.now.utc.to_f}"
location = "#{path}#{filename}"
check = true
count = 0
while check
if File.exist?(location)
count += 1
location = "#{path}#{filename}-#{count}"
else
check = false
end
end
return false if !File.directory? path
begin
File.open(location, 'wb') do |file|
file.flock(File::LOCK_EX)
file.write data.to_json
file.flock(File::LOCK_UN)
file.close
end
rescue => e
log('error', e.inspect)
log('error', "error in writing message file '#{location}'")
return false
end
true
@store.send_data(client_id, data)
end
=begin
@ -431,43 +322,7 @@ returns
=end
def self.queue(client_id)
path = "#{@path}/#{client_id}/"
data = []
files = []
Dir.foreach(path) do |entry|
next if entry == '.'
next if entry == '..'
files.push entry
end
files.sort.each do |entry|
next if !entry.start_with?('send')
message = Sessions.queue_file_read(path, entry)
next if !message
data.push message
end
data
end
def self.queue_file_read(path, filename)
location = "#{path}#{filename}"
message = ''
File.open(location, 'rb') do |file|
file.flock(File::LOCK_EX)
message = file.read
file.flock(File::LOCK_UN)
end
File.delete(location)
return if message.blank?
begin
JSON.parse(message)
rescue => e
log('error', "can't parse queue message: #{message}, #{e.inspect}")
nil
end
@store.queue(client_id)
end
=begin
@ -479,10 +334,7 @@ remove all session and spool messages
=end
def self.cleanup
return true if !File.exist?(@path)
FileUtils.rm_rf @path
true
@store.cleanup
end
=begin
@ -495,18 +347,11 @@ create spool messages
def self.spool_create(data)
msg = JSON.generate(data)
path = "#{@path}/spool/"
FileUtils.mkpath path
data = {
msg: msg,
timestamp: Time.now.utc.to_i,
}
file_path = "#{path}/#{Time.now.utc.to_f}-#{rand(99_999)}"
File.open(file_path, 'wb') do |file|
file.flock(File::LOCK_EX)
file.write data.to_json
file.flock(File::LOCK_UN)
end
@store.add_to_spool(data)
end
=begin
@ -518,84 +363,66 @@ get spool messages
=end
def self.spool_list(timestamp, current_user_id)
path = "#{@path}/spool/"
FileUtils.mkpath path
data = []
to_delete = []
files = []
Dir.foreach(path) do |entry|
next if entry == '.'
next if entry == '..'
files.push entry
end
files.sort.each do |entry|
filename = "#{path}/#{entry}"
next if !File.exist?(filename)
File.open(filename, 'rb') do |file|
file.flock(File::LOCK_SH)
message = file.read
file.flock(File::LOCK_UN)
message_parsed = {}
begin
spool = JSON.parse(message)
message_parsed = JSON.parse(spool['msg'])
rescue => e
log('error', "can't parse spool message: #{message}, #{e.inspect}")
to_delete.push "#{path}/#{entry}"
next
end
# ignore message older then 48h
if spool['timestamp'] + (2 * 86_400) < Time.now.utc.to_i
to_delete.push "#{path}/#{entry}"
next
end
# add spool attribute to push spool info to clients
message_parsed['spool'] = true
@store.each_spool do |message|
message_parsed = {}
begin
spool = JSON.parse(message)
message_parsed = JSON.parse(spool['msg'])
rescue => e
log('error', "can't parse spool message: #{message}, #{e.inspect}")
to_delete.push message
next
end
# only send not already older messages
if !timestamp || timestamp < spool['timestamp']
# ignore message older then 48h
if spool['timestamp'] + (2 * 86_400) < Time.now.utc.to_i
to_delete.push message
next
end
# spool to recipient list
if message_parsed['recipient'] && message_parsed['recipient']['user_id']
# add spool attribute to push spool info to clients
message_parsed['spool'] = true
message_parsed['recipient']['user_id'].each do |user_id|
# only send not already older messages
if !timestamp || timestamp < spool['timestamp']
next if current_user_id != user_id
# spool to recipient list
if message_parsed['recipient'] && message_parsed['recipient']['user_id']
message = message_parsed
if message_parsed['event'] == 'broadcast'
message = message_parsed['data']
end
message_parsed['recipient']['user_id'].each do |user_id|
item = {
type: 'direct',
message: message,
}
data.push item
end
next if current_user_id != user_id
# spool to every client
else
message = message_parsed
if message_parsed['event'] == 'broadcast'
message = message_parsed['data']
end
item = {
type: 'broadcast',
type: 'direct',
message: message,
}
data.push item
end
# spool to every client
else
message = message_parsed
if message_parsed['event'] == 'broadcast'
message = message_parsed['data']
end
item = {
type: 'broadcast',
message: message,
}
data.push item
end
end
end
to_delete.each do |file|
File.delete(file)
@store.remove_from_spool(file)
end
data
end
@ -609,17 +436,11 @@ delete spool messages
=end
def self.spool_delete
path = "#{@path}/spool/"
FileUtils.rm_rf path
@store.clear_spool
end
def self.jobs(node_id = nil)
# just make sure that spool path exists
if !File.exist?(@path)
FileUtils.mkpath(@path)
end
# dispatch sessions
if node_id.blank? && ENV['ZAMMAD_SESSION_JOBS_CONCURRENT'].to_i.positive?

@ -2,14 +2,10 @@
module Sessions::Node
# get application root directory
@root = Dir.pwd.to_s
if @root.blank? || @root == '/'
@root = Rails.root
end
# get working directories
@path = "#{@root}/tmp/session_node_#{Rails.env}"
@store = case Rails.application.config.websocket_session_store
when :redis then Sessions::Store::Redis.new
else Sessions::Store::File.new
end
def self.session_assigne(client_id, force = false)
@ -41,90 +37,34 @@ module Sessions::Node
end
def self.cleanup
FileUtils.rm_rf @path
@store.clear_nodes
end
def self.registered
path = "#{@path}/*.status"
nodes = []
files = Dir.glob(path)
files.each do |filename|
File.open(filename, 'rb') do |file|
file.flock(File::LOCK_SH)
content = file.read
file.flock(File::LOCK_UN)
begin
data = JSON.parse(content)
nodes.push data
rescue => e
Rails.logger.error "can't parse status file #{filename}, #{e.inspect}"
#to_delete.push "#{path}/#{entry}"
#next
end
end
end
nodes
@store.nodes
end
def self.register(node_id)
if !File.exist?(@path)
FileUtils.mkpath @path
end
status_file = "#{@path}/#{node_id}.status"
# write node status file
data = {
updated_at_human: Time.now.utc,
updated_at: Time.now.utc.to_i,
node_id: node_id.to_s,
pid: $PROCESS_ID,
}
content = data.to_json
# store session data in session file
File.open(status_file, 'wb') do |file|
file.write content
end
@store.add_node node_id, data
end
def self.stats
# read node sessions
path = "#{@path}/*.session"
sessions = {}
files = Dir.glob(path)
files.each do |filename|
File.open(filename, 'rb') do |file|
file.flock(File::LOCK_SH)
content = file.read
file.flock(File::LOCK_UN)
begin
next if content.blank?
data = JSON.parse(content)
next if data.blank?
next if data['client_id'].blank?
sessions[data['client_id']] = data['node_id']
rescue => e
Rails.logger.error "can't parse session file #{filename}, #{e.inspect}"
#to_delete.push "#{path}/#{entry}"
#next
end
end
@store.each_node_session do |data|
next if data['client_id'].blank?
sessions[data['client_id']] = data['node_id']
end
sessions
end
def self.sessions_for(node_id, client_id)
if !File.exist?(@path)
FileUtils.mkpath @path
end
status_file = "#{@path}/#{node_id}.#{client_id}.session"
# write node status file
data = {
updated_at_human: Time.now.utc,
@ -133,42 +73,16 @@ module Sessions::Node
client_id: client_id.to_s,
pid: $PROCESS_ID,
}
content = data.to_json
# store session data in session file
File.open(status_file, 'wb') do |file|
file.write content
end
@store.create_node_session node_id, client_id, data
end
def self.sessions_by(node_id, force = false)
# read node sessions
path = "#{@path}/#{node_id}.*.session"
sessions = []
files = Dir.glob(path)
files.each do |filename|
File.open(filename, 'rb') do |file|
file.flock(File::LOCK_SH)
content = file.read
file.flock(File::LOCK_UN)
begin
next if content.blank?
data = JSON.parse(content)
next if data.blank?
next if data['client_id'].blank?
next if !Sessions.session_exists?(data['client_id']) && force == false
sessions.push data['client_id']
rescue => e
Rails.logger.error "can't parse session file #{filename}, #{e.inspect}"
#to_delete.push "#{path}/#{entry}"
#next
end
end
@store.each_session_by_node(node_id) do |data|
next if data['client_id'].blank?
next if !Sessions.session_exists?(data['client_id']) && force == false
sessions.push data['client_id']
end
sessions
end

@ -0,0 +1,366 @@
# Copyright (C) 2012-2021 Zammad Foundation, http://zammad-foundation.org/
class Sessions::Store::File
def initialize
# get application root directory
@root = Dir.pwd.to_s
if @root.blank? || @root == '/'
@root = Rails.root
end
# get working directories
@path = "#{@root}/tmp/websocket_#{Rails.env}"
@nodes_path = "#{@root}/tmp/session_node_#{Rails.env}"
end
def create(client_id, content)
path = "#{@path}/#{client_id}"
path_tmp = "#{@path}/tmp/#{client_id}"
session_file = "#{path_tmp}/session"
# store session data in session file
FileUtils.mkpath path_tmp
File.open(session_file, 'wb') do |file|
file.write content
end
# destroy old session if needed
if File.exist?(path)
destroy(client_id)
end
# move to destination directory
FileUtils.mv(path_tmp, path)
end
def sessions
path = "#{@path}/"
# just make sure that spool path exists
if !File.exist?(path)
FileUtils.mkpath path
end
data = []
Dir.foreach(path) do |entry|
next if entry == '.'
next if entry == '..'
next if entry == 'tmp'
next if entry == 'spool'
data.push entry.to_s
end
data
end
def session_exists?(client_id)
session_dir = "#{@path}/#{client_id}"
return false if !File.exist?(session_dir)
session_file = "#{session_dir}/session"
return false if !File.exist?(session_file)
true
end
def destroy(client_id)
path = "#{@path}/#{client_id}"
FileUtils.rm_rf path
end
def set(client_id, data)
path = "#{@path}/#{client_id}"
File.open("#{path}/session", 'wb' ) do |file|
file.flock(File::LOCK_EX)
file.write data.to_json
file.flock(File::LOCK_UN)
end
end
def get(client_id)
session_dir = "#{@path}/#{client_id}"
session_file = "#{session_dir}/session"
data = nil
return if !check_session_file_for_client(client_id, session_dir, session_file)
begin
File.open(session_file, 'rb') do |file|
file.flock(File::LOCK_SH)
all = file.read
file.flock(File::LOCK_UN)
data_json = JSON.parse(all)
if data_json
data = Sessions.symbolize_keys(data_json)
data[:user] = data_json['user'] # for compat. reasons
end
end
rescue => e
Sessions.log('error', e.inspect)
destroy(client_id)
Sessions.log('error', "error in reading/parsing session file '#{session_file}', remove session.")
return
end
data
end
def send_data(client_id, data)
location = new_message_filename_for(client_id)
return false if !location
begin
File.open(location, 'wb') do |file|
file.flock(File::LOCK_EX)
file.write data.to_json
file.flock(File::LOCK_UN)
file.close
end
rescue => e
Sessions.log('error', e.inspect)
Sessions.log('error', "error in writing message file '#{location}'")
return false
end
true
end
def queue(client_id)
path = "#{@path}/#{client_id}/"
data = []
files = []
Dir.foreach(path) do |entry|
next if entry == '.'
next if entry == '..'
files.push entry
end
files.sort.each do |entry|
next if !entry.start_with?('send')
message = queue_file_read(path, entry)
next if !message
data.push message
end
data
end
def cleanup
return true if !File.exist?(@path)
FileUtils.rm_rf @path
true
end
def add_to_spool(data)
path = "#{@path}/spool/"
FileUtils.mkpath path
file_path = "#{path}/#{Time.now.utc.to_f}-#{rand(99_999)}"
File.open(file_path, 'wb') do |file|
file.flock(File::LOCK_EX)
file.write data.to_json
file.flock(File::LOCK_UN)
end
end
def each_spool()
path = "#{@path}/spool/"
FileUtils.mkpath path
files = []
Dir.foreach(path) do |entry|
next if entry == '.'
next if entry == '..'
files.push entry
end
files.sort.each do |entry|
filename = "#{path}/#{entry}"
next if !File.exist?(filename)
File.open(filename, 'rb') do |file|
file.flock(File::LOCK_SH)
message = file.read
file.flock(File::LOCK_UN)
yield message
end
end
end
def remove_from_spool(entry)
File.remove "#{path}/#{entry}"
end
def clear_spool
path = "#{@path}/spool/"
FileUtils.rm_rf path
end
### Node-specific methods ###
def clear_nodes
FileUtils.rm_rf @nodes_path
end
def nodes
path = "#{@nodes_path}/*.status"
nodes = []
files = Dir.glob(path)
files.each do |filename|
File.open(filename, 'rb') do |file|
file.flock(File::LOCK_SH)
content = file.read
file.flock(File::LOCK_UN)
begin
data = JSON.parse(content)
nodes.push data
rescue => e
Rails.logger.error "can't parse status file #{filename}, #{e.inspect}"
#to_delete.push "#{path}/#{entry}"
#next
end
end
end
nodes
end
def add_node(node_id, data)
if !File.exist?(@nodes_path)
FileUtils.mkpath @nodes_path
end
status_file = "#{@nodes_path}/#{node_id}.status"
content = data.to_json
# store session data in session file
File.open(status_file, 'wb') do |file|
file.write content
end
end
def each_node_session()
# read node sessions
path = "#{@nodes_path}/*.session"
files = Dir.glob(path)
files.each do |filename|
File.open(filename, 'rb') do |file|
file.flock(File::LOCK_SH)
content = file.read
file.flock(File::LOCK_UN)
begin
next if content.blank?
data = JSON.parse(content)
next if data.blank?
yield data
rescue => e
Rails.logger.error "can't parse session file #{filename}, #{e.inspect}"
#to_delete.push "#{path}/#{entry}"
#next
end
end
end
end
def create_node_session(node_id, client_id, data)
if !File.exist?(@nodes_path)
FileUtils.mkpath @nodes_path
end
status_file = "#{@nodes_path}/#{node_id}.#{client_id}.session"
content = data.to_json
# store session data in session file
File.open(status_file, 'wb') do |file|
file.write content
end
end
def each_session_by_node(node_id)
# read node sessions
path = "#{@nodes_path}/#{node_id}.*.session"
files = Dir.glob(path)
files.each do |filename|
File.open(filename, 'rb') do |file|
file.flock(File::LOCK_SH)
content = file.read
file.flock(File::LOCK_UN)
begin
next if content.blank?
data = JSON.parse(content)
next if data.blank?
yield data
rescue => e
Rails.logger.error "can't parse session file #{filename}, #{e.inspect}"
#to_delete.push "#{path}/#{entry}"
#next
end
end
end
end
private
def queue_file_read(path, filename)
location = "#{path}#{filename}"
message = ''
File.open(location, 'rb') do |file|
file.flock(File::LOCK_EX)
message = file.read
file.flock(File::LOCK_UN)
end
File.delete(location)
return if message.blank?
begin
JSON.parse(message)
rescue => e
Sessions.log('error', "can't parse queue message: #{message}, #{e.inspect}")
nil
end
end
def check_session_file_for_client(client_id, session_dir, session_file)
# if no session dir exists, session got destoried
if !File.exist?(session_dir)
destroy(client_id)
Sessions.log('debug', "missing session directory #{session_dir} for '#{client_id}', remove session.")
return false
end
# if only session file is missing, then it's an error behavior
if !File.exist?(session_file)
destroy(client_id)
Sessions.log('error', "missing session file for '#{client_id}', remove session.")
return false
end
true
end
def new_message_filename_for(client_id)
path = "#{@path}/#{client_id}/"
filename = "send-#{Time.now.utc.to_f}"
location = "#{path}#{filename}"
check = true
count = 0
while check
if File.exist?(location)
count += 1
location = "#{path}#{filename}-#{count}"
else
check = false
end
end
return nil if !File.directory? path
location
end
end

@ -0,0 +1,174 @@
# Copyright (C) 2012-2021 Zammad Foundation, http://zammad-foundation.org/
class Sessions::Store::Redis
SESSIONS_KEY = 'sessions'.freeze
MESSAGES_KEY = 'messages'.freeze
SPOOL_KEY = 'spool'.freeze
NODES_KEY = 'nodes'.freeze
def initialize
# Only load redis if it is really used.
require 'redis'
@redis = Redis.new
end
def create(client_id, data)
@redis.set client_session_key(client_id), data
@redis.sadd SESSIONS_KEY, client_id
end
def sessions
@redis.smembers SESSIONS_KEY
end
def session_exists?(client_id)
@redis.sismember SESSIONS_KEY, client_id
end
def destroy(client_id)
@redis.srem SESSIONS_KEY, client_id
@redis.del client_session_key(client_id)
@redis.del client_messages_key(client_id)
end
def set(client_id, data)
@redis.set client_session_key(client_id), data.to_json
end
def get(client_id)
data = nil
# if only session is missing, then it's an error behavior
session = @redis.get client_session_key(client_id)
if !session
destroy(client_id)
Sessions.log('error', "missing session value for '#{client_id}', removing session.")
return
end
data_json = JSON.parse(session)
if data_json
data = Sessions.symbolize_keys(data_json)
data[:user] = data_json['user'] # for compat. reasons
end
data
end
def send_data(client_id, data)
@redis.rpush(client_messages_key(client_id), data.to_json).positive?
end
def queue(client_id)
data = []
while (item = @redis.lpop(client_messages_key(client_id)))
data.push JSON.parse(item)
end
data