Follow-up 1eef2a0e0
- Websocket session store back end 'File' does not clean up spool messages correctly.
This commit is contained in:
parent
aaa17d72e0
commit
365b3eaf1f
4 changed files with 31 additions and 11 deletions
|
@ -365,20 +365,20 @@ get spool messages
|
||||||
def self.spool_list(timestamp, current_user_id)
|
def self.spool_list(timestamp, current_user_id)
|
||||||
data = []
|
data = []
|
||||||
to_delete = []
|
to_delete = []
|
||||||
@store.each_spool do |message|
|
@store.each_spool do |message, entry|
|
||||||
message_parsed = {}
|
message_parsed = {}
|
||||||
begin
|
begin
|
||||||
spool = JSON.parse(message)
|
spool = JSON.parse(message)
|
||||||
message_parsed = JSON.parse(spool['msg'])
|
message_parsed = JSON.parse(spool['msg'])
|
||||||
rescue => e
|
rescue => e
|
||||||
log('error', "can't parse spool message: #{message}, #{e.inspect}")
|
log('error', "can't parse spool message: #{message}, #{e.inspect}")
|
||||||
to_delete.push message
|
to_delete.push [message, entry]
|
||||||
next
|
next
|
||||||
end
|
end
|
||||||
|
|
||||||
# ignore message older then 48h
|
# ignore message older then 48h
|
||||||
if spool['timestamp'] + (2 * 86_400) < Time.now.utc.to_i
|
if spool['timestamp'] + (2 * 86_400) < Time.now.utc.to_i
|
||||||
to_delete.push message
|
to_delete.push [message, entry]
|
||||||
next
|
next
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -421,8 +421,8 @@ get spool messages
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
to_delete.each do |file|
|
to_delete.each do |item|
|
||||||
@store.remove_from_spool(file)
|
@store.remove_from_spool(*item)
|
||||||
end
|
end
|
||||||
data
|
data
|
||||||
end
|
end
|
||||||
|
|
|
@ -183,13 +183,14 @@ class Sessions::Store::File
|
||||||
message = file.read
|
message = file.read
|
||||||
file.flock(File::LOCK_UN)
|
file.flock(File::LOCK_UN)
|
||||||
|
|
||||||
yield message
|
yield message, entry
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def remove_from_spool(entry)
|
def remove_from_spool(_message, entry)
|
||||||
File.remove "#{path}/#{entry}"
|
path = "#{@path}/spool/"
|
||||||
|
FileUtils.rm "#{path}/#{entry}"
|
||||||
end
|
end
|
||||||
|
|
||||||
def clear_spool
|
def clear_spool
|
||||||
|
|
|
@ -79,11 +79,13 @@ class Sessions::Store::Redis
|
||||||
@redis.rpush SPOOL_KEY, data.to_json
|
@redis.rpush SPOOL_KEY, data.to_json
|
||||||
end
|
end
|
||||||
|
|
||||||
def each_spool(&block)
|
def each_spool()
|
||||||
@redis.lrange(SPOOL_KEY, 0, -1).each(&block)
|
@redis.lrange(SPOOL_KEY, 0, -1).each do |message|
|
||||||
|
yield message, nil
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def remove_from_spool(message)
|
def remove_from_spool(message, _entry)
|
||||||
@redis.lrem SPOOL_KEY, 1, message
|
@redis.lrem SPOOL_KEY, 1, message
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -122,7 +122,24 @@ RSpec.describe 'LongPolling', type: :request do
|
||||||
|
|
||||||
spool_list = Sessions.spool_list(nil, agent.id)
|
spool_list = Sessions.spool_list(nil, agent.id)
|
||||||
expect(spool_list).to eq([{ message: { 'taskbar_id' => 9_391_633 }, type: 'direct' }])
|
expect(spool_list).to eq([{ message: { 'taskbar_id' => 9_391_633 }, type: 'direct' }])
|
||||||
end
|
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
it 'automatically cleans-up old spool entries' do
|
||||||
|
authenticated_as(agent)
|
||||||
|
Sessions.spool_create({ data: 'my message', event: 'broadcast' })
|
||||||
|
|
||||||
|
# Message found
|
||||||
|
travel 2.seconds
|
||||||
|
expect(Sessions.spool_list(nil, agent.id)).to eq([{ message: 'my message', type: 'broadcast' }])
|
||||||
|
|
||||||
|
# Message expired. In this case spool_list needs to also delete it.
|
||||||
|
travel 4.days
|
||||||
|
expect(Sessions.spool_list(nil, agent.id)).to eq([])
|
||||||
|
|
||||||
|
# Verify that the message was correctly deleted
|
||||||
|
travel(-4.days)
|
||||||
|
expect(Sessions.spool_list(nil, agent.id)).to eq([])
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue