Improved wording.
This commit is contained in:
parent
ef060311b5
commit
6569fe4eaa
1 changed files with 33 additions and 9 deletions
|
@ -2,33 +2,33 @@
|
||||||
|
|
||||||
class Scheduler < ApplicationModel
|
class Scheduler < ApplicationModel
|
||||||
|
|
||||||
def self.run( worker, worker_count )
|
def self.run( runner, runner_count )
|
||||||
|
|
||||||
Thread.abort_on_exception = true
|
Thread.abort_on_exception = true
|
||||||
|
|
||||||
jobs_started = {}
|
jobs_started = {}
|
||||||
while true
|
while true
|
||||||
puts "Scheduler running (worker #{worker} of #{worker_count})..."
|
puts "Scheduler running (runner #{runner} of #{runner_count})..."
|
||||||
|
|
||||||
# read/load jobs and check if it is alredy started
|
# read/load jobs and check if it is alredy started
|
||||||
jobs = Scheduler.where( 'active = ? AND prio = ?', true, worker )
|
jobs = Scheduler.where( 'active = ? AND prio = ?', true, runner )
|
||||||
jobs.each {|job|
|
jobs.each {|job|
|
||||||
next if jobs_started[ job.id ]
|
next if jobs_started[ job.id ]
|
||||||
jobs_started[ job.id ] = true
|
jobs_started[ job.id ] = true
|
||||||
self.start_job( job, worker, worker_count )
|
self.start_job( job, runner, runner_count )
|
||||||
}
|
}
|
||||||
sleep 45
|
sleep 45
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.start_job( job, worker, worker_count )
|
def self.start_job( job, runner, runner_count )
|
||||||
puts "started job thread for '#{job.name}' (#{job.method})..."
|
puts "started job thread for '#{job.name}' (#{job.method})..."
|
||||||
sleep 4
|
sleep 4
|
||||||
|
|
||||||
Thread.new {
|
Thread.new {
|
||||||
if job.period
|
if job.period
|
||||||
while true
|
while true
|
||||||
self._start_job( job, worker, worker_count )
|
self._start_job( job, runner, runner_count )
|
||||||
job = Scheduler.where( :id => job.id ).first
|
job = Scheduler.where( :id => job.id ).first
|
||||||
|
|
||||||
# exit is job got deleted
|
# exit is job got deleted
|
||||||
|
@ -44,7 +44,7 @@ class Scheduler < ApplicationModel
|
||||||
sleep job.period
|
sleep job.period
|
||||||
end
|
end
|
||||||
else
|
else
|
||||||
self._start_job( job, worker, worker_count )
|
self._start_job( job, runner, runner_count )
|
||||||
end
|
end
|
||||||
# raise "Exception from thread"
|
# raise "Exception from thread"
|
||||||
job.pid = ''
|
job.pid = ''
|
||||||
|
@ -53,14 +53,38 @@ class Scheduler < ApplicationModel
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
|
|
||||||
def self._start_job( job, worker, worker_count )
|
def self._start_job( job, runner, runner_count )
|
||||||
puts "execute #{job.method} (worker #{worker} of #{worker_count})..."
|
puts "execute #{job.method} (runner #{runner} of #{runner_count})..."
|
||||||
job.last_run = Time.now
|
job.last_run = Time.now
|
||||||
job.pid = Thread.current.object_id
|
job.pid = Thread.current.object_id
|
||||||
job.save
|
job.save
|
||||||
eval job.method()
|
eval job.method()
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def self.worker
|
||||||
|
wait = 10
|
||||||
|
puts "*** Starting worker #{Delayed::Job.to_s}"
|
||||||
|
|
||||||
|
loop do
|
||||||
|
result = nil
|
||||||
|
|
||||||
|
realtime = Benchmark.realtime do
|
||||||
|
result = Delayed::Worker.new.work_off
|
||||||
|
end
|
||||||
|
|
||||||
|
count = result.sum
|
||||||
|
|
||||||
|
break if $exit
|
||||||
|
|
||||||
|
if count.zero?
|
||||||
|
sleep(wait)
|
||||||
|
puts "*** worker loop"
|
||||||
|
else
|
||||||
|
printf "*** #{count} jobs processed at %.4f j/s, %d failed ...\n" % [count / realtime, result.last]
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def self.check( name, time_warning = 10, time_critical = 20 )
|
def self.check( name, time_warning = 10, time_critical = 20 )
|
||||||
time_warning_time = Time.now - time_warning.minutes
|
time_warning_time = Time.now - time_warning.minutes
|
||||||
time_critical_time = Time.now - time_critical.minutes
|
time_critical_time = Time.now - time_critical.minutes
|
||||||
|
|
Loading…
Reference in a new issue