Refactoring: Migrated BackgroundJobSearchIndex (Delayed::Job) to SearchIndexJob (ActiveJob).
Refactoring: Enable MonitoringController to handle ActiveJob background jobs.
This commit is contained in:
parent
a84edbe764
commit
a211d20db9
7 changed files with 92 additions and 25 deletions
|
@ -99,11 +99,26 @@ curl http://localhost/api/v1/monitoring/health_check?token=XXX
|
||||||
issues.push "#{count_failed_jobs} failing background jobs"
|
issues.push "#{count_failed_jobs} failing background jobs"
|
||||||
end
|
end
|
||||||
|
|
||||||
listed_failed_jobs = failed_jobs.select(:handler, :attempts).limit(10)
|
handler_attempts_map = {}
|
||||||
sorted_failed_jobs = listed_failed_jobs.group_by(&:name).sort_by { |_handler, entries| entries.length }.reverse.to_h
|
failed_jobs.order(:created_at).limit(10).each do |job|
|
||||||
sorted_failed_jobs.each_with_index do |(name, jobs), index|
|
|
||||||
attempts = jobs.map(&:attempts).sum
|
job_name = if job.name == 'ActiveJob::QueueAdapters::DelayedJobAdapter::JobWrapper'.freeze
|
||||||
issues.push "Failed to run background job ##{index += 1} '#{name}' #{jobs.count} time(s) with #{attempts} attempt(s)."
|
job.payload_object.job_data['job_class']
|
||||||
|
else
|
||||||
|
job.name
|
||||||
|
end
|
||||||
|
|
||||||
|
handler_attempts_map[job_name] ||= {
|
||||||
|
count: 0,
|
||||||
|
attempts: 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
handler_attempts_map[job_name][:count] += 1
|
||||||
|
handler_attempts_map[job_name][:attempts] += job.attempts
|
||||||
|
end
|
||||||
|
|
||||||
|
Hash[handler_attempts_map.sort].each_with_index do |(job_name, job_data), index|
|
||||||
|
issues.push "Failed to run background job ##{index + 1} '#{job_name}' #{job_data[:count]} time(s) with #{job_data[:attempts]} attempt(s)."
|
||||||
end
|
end
|
||||||
|
|
||||||
# job count check
|
# job count check
|
||||||
|
|
|
@ -1,11 +1,11 @@
|
||||||
# Copyright (C) 2012-2016 Zammad Foundation, http://zammad-foundation.org/
|
class SearchIndexJob < ApplicationJob
|
||||||
class BackgroundJobSearchIndex
|
|
||||||
def initialize(object, o_id)
|
retry_on StandardError, attempts: 20
|
||||||
|
|
||||||
|
def perform(object, o_id)
|
||||||
@object = object
|
@object = object
|
||||||
@o_id = o_id
|
@o_id = o_id
|
||||||
end
|
|
||||||
|
|
||||||
def perform
|
|
||||||
record = @object.constantize.lookup(id: @o_id)
|
record = @object.constantize.lookup(id: @o_id)
|
||||||
return if !exists?(record)
|
return if !exists?(record)
|
||||||
|
|
||||||
|
@ -20,9 +20,4 @@ class BackgroundJobSearchIndex
|
||||||
Rails.logger.info "Can't index #{@object}.lookup(id: #{@o_id}), no such record found"
|
Rails.logger.info "Can't index #{@object}.lookup(id: #{@o_id}), no such record found"
|
||||||
false
|
false
|
||||||
end
|
end
|
||||||
|
|
||||||
def max_attempts
|
|
||||||
20
|
|
||||||
end
|
|
||||||
|
|
||||||
end
|
end
|
|
@ -24,7 +24,7 @@ update search index, if configured - will be executed automatically
|
||||||
# start background job to transfer data to search index
|
# start background job to transfer data to search index
|
||||||
return true if !SearchIndexBackend.enabled?
|
return true if !SearchIndexBackend.enabled?
|
||||||
|
|
||||||
Delayed::Job.enqueue(BackgroundJobSearchIndex.new(self.class.to_s, id))
|
SearchIndexJob.perform_later(self.class.to_s, id)
|
||||||
true
|
true
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
24
spec/jobs/search_index_job_spec.rb
Normal file
24
spec/jobs/search_index_job_spec.rb
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
require 'rails_helper'
|
||||||
|
|
||||||
|
RSpec.describe SearchIndexJob, type: :job do
|
||||||
|
|
||||||
|
it 'calls search_index_update_backend on matching record' do
|
||||||
|
user = create(:user)
|
||||||
|
expect(::User).to receive(:lookup).with(id: user.id).and_return(user)
|
||||||
|
expect(user).to receive(:search_index_update_backend)
|
||||||
|
|
||||||
|
described_class.perform_now('User', user.id)
|
||||||
|
end
|
||||||
|
|
||||||
|
it "doesn't perform for non existing records" do
|
||||||
|
id = 9999
|
||||||
|
expect(::User).to receive(:lookup).with(id: id).and_return(nil)
|
||||||
|
described_class.perform_now('User', id)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'retries on exception' do
|
||||||
|
expect(::User).to receive(:lookup).and_raise(RuntimeError)
|
||||||
|
described_class.perform_now('User', 1)
|
||||||
|
expect(SearchIndexJob).to have_been_enqueued
|
||||||
|
end
|
||||||
|
end
|
33
spec/models/concerns/has_search_index_backend_examples.rb
Normal file
33
spec/models/concerns/has_search_index_backend_examples.rb
Normal file
|
@ -0,0 +1,33 @@
|
||||||
|
RSpec.shared_examples 'HasSearchIndexBackend' do |indexed_factory:|
|
||||||
|
|
||||||
|
context '#search_index_update', performs_jobs: true do
|
||||||
|
subject { create(indexed_factory) }
|
||||||
|
|
||||||
|
before(:each) do
|
||||||
|
allow(SearchIndexBackend).to receive(:enabled?).and_return(true)
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'record indexing' do
|
||||||
|
|
||||||
|
before(:each) do
|
||||||
|
expect(subject).to be_present
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'indexes on create' do
|
||||||
|
expect(SearchIndexJob).to have_been_enqueued
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'indexes on update' do
|
||||||
|
clear_jobs
|
||||||
|
subject.update(note: 'Updated')
|
||||||
|
expect(SearchIndexJob).to have_been_enqueued
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'indexes on touch' do
|
||||||
|
clear_jobs
|
||||||
|
subject.touch
|
||||||
|
expect(SearchIndexJob).to have_been_enqueued
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -1,8 +1,10 @@
|
||||||
require 'rails_helper'
|
require 'rails_helper'
|
||||||
require 'models/concerns/can_lookup_examples'
|
require 'models/concerns/can_lookup_examples'
|
||||||
|
require 'models/concerns/has_search_index_backend_examples'
|
||||||
|
|
||||||
RSpec.describe Organization do
|
RSpec.describe Organization do
|
||||||
include_examples 'CanLookup'
|
include_examples 'CanLookup'
|
||||||
|
include_examples 'HasSearchIndexBackend', indexed_factory: :organization
|
||||||
|
|
||||||
context '.where_or_cis' do
|
context '.where_or_cis' do
|
||||||
|
|
||||||
|
|
|
@ -372,7 +372,6 @@ RSpec.describe 'Monitoring', type: :request do
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'does check health false' do
|
it 'does check health false' do
|
||||||
|
|
||||||
channel = Channel.find_by(active: true)
|
channel = Channel.find_by(active: true)
|
||||||
channel.status_in = 'ok'
|
channel.status_in = 'ok'
|
||||||
channel.status_out = 'error'
|
channel.status_out = 'error'
|
||||||
|
@ -423,7 +422,7 @@ RSpec.describe 'Monitoring', type: :request do
|
||||||
# health_check - scheduler job count
|
# health_check - scheduler job count
|
||||||
travel 2.seconds
|
travel 2.seconds
|
||||||
8001.times do
|
8001.times do
|
||||||
Delayed::Job.enqueue( BackgroundJobSearchIndex.new('Ticket', 1))
|
SearchIndexJob.perform_later('Ticket', 1)
|
||||||
end
|
end
|
||||||
Scheduler.where(active: true).each do |local_scheduler|
|
Scheduler.where(active: true).each do |local_scheduler|
|
||||||
local_scheduler.last_run = Time.zone.now
|
local_scheduler.last_run = Time.zone.now
|
||||||
|
@ -520,7 +519,6 @@ RSpec.describe 'Monitoring', type: :request do
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'does check failed delayed job', db_strategy: :reset do
|
it 'does check failed delayed job', db_strategy: :reset do
|
||||||
|
|
||||||
# disable elasticsearch
|
# disable elasticsearch
|
||||||
prev_es_config = Setting.get('es_url')
|
prev_es_config = Setting.get('es_url')
|
||||||
Setting.set('es_url', 'http://127.0.0.1:92001')
|
Setting.set('es_url', 'http://127.0.0.1:92001')
|
||||||
|
@ -598,11 +596,11 @@ RSpec.describe 'Monitoring', type: :request do
|
||||||
expect(json_response['message']).to be_truthy
|
expect(json_response['message']).to be_truthy
|
||||||
expect(json_response['issues']).to be_truthy
|
expect(json_response['issues']).to be_truthy
|
||||||
expect(json_response['healthy']).to eq(false)
|
expect(json_response['healthy']).to eq(false)
|
||||||
expect( json_response['message']).to eq("Failed to run background job #1 'BackgroundJobSearchIndex' 1 time(s) with 4 attempt(s).")
|
expect( json_response['message']).to eq("Failed to run background job #1 'SearchIndexJob' 4 time(s) with 4 attempt(s).")
|
||||||
|
|
||||||
# add another job
|
# add another job
|
||||||
manual_added = Delayed::Job.enqueue( BackgroundJobSearchIndex.new('Ticket', 1))
|
manual_added = SearchIndexJob.perform_later('Ticket', 1)
|
||||||
manual_added.update!(attempts: 10)
|
Delayed::Job.find(manual_added.provider_job_id).update!(attempts: 10)
|
||||||
|
|
||||||
# health_check
|
# health_check
|
||||||
get "/api/v1/monitoring/health_check?token=#{token}", params: {}, as: :json
|
get "/api/v1/monitoring/health_check?token=#{token}", params: {}, as: :json
|
||||||
|
@ -612,7 +610,7 @@ RSpec.describe 'Monitoring', type: :request do
|
||||||
expect(json_response['message']).to be_truthy
|
expect(json_response['message']).to be_truthy
|
||||||
expect(json_response['issues']).to be_truthy
|
expect(json_response['issues']).to be_truthy
|
||||||
expect(json_response['healthy']).to eq(false)
|
expect(json_response['healthy']).to eq(false)
|
||||||
expect( json_response['message']).to eq("Failed to run background job #1 'BackgroundJobSearchIndex' 2 time(s) with 14 attempt(s).")
|
expect( json_response['message']).to eq("Failed to run background job #1 'SearchIndexJob' 5 time(s) with 14 attempt(s).")
|
||||||
|
|
||||||
# add another job
|
# add another job
|
||||||
dummy_class = Class.new do
|
dummy_class = Class.new do
|
||||||
|
@ -633,7 +631,7 @@ RSpec.describe 'Monitoring', type: :request do
|
||||||
expect(json_response['message']).to be_truthy
|
expect(json_response['message']).to be_truthy
|
||||||
expect(json_response['issues']).to be_truthy
|
expect(json_response['issues']).to be_truthy
|
||||||
expect(json_response['healthy']).to eq(false)
|
expect(json_response['healthy']).to eq(false)
|
||||||
expect( json_response['message']).to eq("Failed to run background job #1 'BackgroundJobSearchIndex' 2 time(s) with 14 attempt(s).;Failed to run background job #2 'Object' 1 time(s) with 5 attempt(s).")
|
expect( json_response['message']).to eq("Failed to run background job #1 'Object' 1 time(s) with 5 attempt(s).;Failed to run background job #2 'SearchIndexJob' 5 time(s) with 14 attempt(s).")
|
||||||
|
|
||||||
# reset settings
|
# reset settings
|
||||||
Setting.set('es_url', prev_es_config)
|
Setting.set('es_url', prev_es_config)
|
||||||
|
@ -652,7 +650,7 @@ RSpec.describe 'Monitoring', type: :request do
|
||||||
expect(json_response['message']).to be_truthy
|
expect(json_response['message']).to be_truthy
|
||||||
expect(json_response['issues']).to be_truthy
|
expect(json_response['issues']).to be_truthy
|
||||||
expect(json_response['healthy']).to eq(false)
|
expect(json_response['healthy']).to eq(false)
|
||||||
expect( json_response['message']).to eq("13 failing background jobs;Failed to run background job #1 'Object' 8 time(s) with 40 attempt(s).;Failed to run background job #2 'BackgroundJobSearchIndex' 2 time(s) with 14 attempt(s).")
|
expect(json_response['message']).to eq("16 failing background jobs;Failed to run background job #1 'Object' 5 time(s) with 25 attempt(s).;Failed to run background job #2 'SearchIndexJob' 5 time(s) with 14 attempt(s).")
|
||||||
|
|
||||||
# cleanup
|
# cleanup
|
||||||
Delayed::Job.delete_all
|
Delayed::Job.delete_all
|
||||||
|
|
Loading…
Reference in a new issue