Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Max iters per instance + enqueue after shutdown #99

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion lib/job-iteration/iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def inspected_cursor
:start_time,
:times_interrupted,
:total_time,
:instance_iterations
)

define_callbacks :start
Expand All @@ -42,11 +43,17 @@ def inspected_cursor
end

module ClassMethods
cattr_accessor :max_iters

def method_added(method_name)
ban_perform_definition if method_name.to_sym == :perform
super
end

def max_iters_per_run(value)
self.max_iters = value
end

def on_start(*filters, &blk)
set_callback(:start, :after, *filters, &blk)
end
Expand All @@ -70,6 +77,7 @@ def initialize(*arguments)
super
self.times_interrupted = 0
self.total_time = 0.0
self.instance_iterations = 0
assert_implements_methods!
end
ruby2_keywords(:initialize) if respond_to?(:ruby2_keywords, true)
Expand Down Expand Up @@ -136,6 +144,8 @@ def interruptible_perform(*arguments)
run_callbacks(:complete)
output_interrupt_summary
end

reenqueue_iteration_job unless completed
end

def iterate_with_enumerator(enumerator, arguments)
Expand All @@ -149,11 +159,11 @@ def iterate_with_enumerator(enumerator, arguments)
found_record = true
each_iteration(object_from_enumerator, *arguments)
self.cursor_position = index
self.instance_iterations += 1
end

next unless job_should_exit?
self.executions -= 1 if executions > 1
reenqueue_iteration_job
return false
end

Expand Down Expand Up @@ -256,6 +266,8 @@ def output_interrupt_summary
end

def job_should_exit?
return true if self.class.max_iters.present? && self.class.max_iters == instance_iterations

if ::JobIteration.max_job_runtime && start_time && (Time.now.utc - start_time) > ::JobIteration.max_job_runtime
return true
end
Expand Down