What
An enumerator that stops after a certain number of elements have been yielded.
Why
The use case we have: we are iterating through results from an API and want to iterate over only 10_000 results in this job. Because this job is run periodically, we want every run to have a hard, defined stopping point.
The ThrottleEnumerator is really similar, but it re-enqueues the job if a condition is hit. I want an enumerator that will shut down the job instead.
How
Because the job can shut down at any time, and we want the limit respected regardless of retries, I've added a counter variable that is serialized and deserialized with the job, so the enumerator knows how many more elements to yield no matter how many times the job was re-enqueued.
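To make the counter idea concrete, here is a minimal plain-Ruby sketch that does not use ActiveJob or job-iteration at all. The names `limited_enum`, `run_job`, `interrupt_after`, and the `job_data` hash are hypothetical stand-ins for the enumerator, a single job execution, a simulated interruption, and the serialized job payload. It only illustrates how a persisted counter keeps the limit intact across re-enqueues, under those assumptions.

```ruby
# frozen_string_literal: true

LIMIT = 5

# Wraps `source` and aborts once `limit` items have been yielded in total,
# counting items yielded by earlier runs via the starting `counter`.
def limited_enum(source, limit:, counter:)
  Enumerator.new do |yielder|
    source.each do |item|
      throw(:abort) if counter >= limit
      counter += 1
      yielder.yield(item, counter)
    end
  end
end

# Simulates one job execution. `job_data` plays the role of the serialized
# job: it carries the cursor and the counter between runs. `interrupt_after`
# simulates the job being shut down and re-enqueued mid-run.
def run_job(items, job_data, interrupt_after:)
  cursor = job_data.fetch("cursor", 0)
  counter = job_data.fetch("counter", 0)
  processed = []

  catch(:abort) do
    limited_enum(items.drop(cursor), limit: LIMIT, counter: counter).each do |item, count|
      processed << item
      # "Serialize" progress after every item so an interruption loses nothing.
      job_data["cursor"] = cursor + processed.size
      job_data["counter"] = count
      break if processed.size >= interrupt_after
    end
  end

  processed
end

items = (1..20).to_a
job_data = {}

first_run  = run_job(items, job_data, interrupt_after: 3) # => [1, 2, 3]
second_run = run_job(items, job_data, interrupt_after: 3) # => [4, 5]
third_run  = run_job(items, job_data, interrupt_after: 3) # => []
```

The second run stops after two items even though it was allowed three before its simulated interruption, because the restored counter already stood at 3 of 5. The third run aborts immediately.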
Code
    # typed: true
    # frozen_string_literal: true

    module JobIteration
      module Limitable
        attr_accessor :counter

        def serialize
          super.merge("counter" => counter)
        end

        def deserialize(job_data)
          super
          @counter = job_data["counter"]
        end

        # This method could probably be moved into enumerator_builder.rb
        def build_limit_enumerator(enum, limit:)
          LimitEnumerator.new(enum, self, limit: limit, counter: counter || 0).to_enum
        end

        # LimitEnumerator allows you to limit iterations
        # to a specific number of items.
        # @example
        #   def build_enumerator(_params, cursor:)
        #     build_limit_enumerator(
        #       enumerator_builder.active_record_on_batches(
        #         Account.inactive,
        #         cursor: cursor
        #       ),
        #       limit: 1_000,
        #     )
        #   end
        # The enumerator from above will mimic the provided enumerator, active_record_on_batches,
        # until 1_000 items have been yielded from the enumerator.
        # At that point, it will `throw :abort`, which will quietly shut down the job.
        # This enumerator works by serializing a counter variable into the job's metadata if it shuts down, so that
        # when the job is re-enqueued, it can pick up where it left off and remember how many items it has already yielded.
        class LimitEnumerator
          attr_accessor :enum, :job, :limit, :counter

          def initialize(enum, job, limit:, counter:)
            @enum = enum
            @job = job
            @limit = limit
            @counter = counter
          end

          def to_enum
            Enumerator.new do |yielder|
              @enum.each do |*val|
                if should_stop?
                  ActiveSupport::Notifications.instrument("limited.iteration", job_class: @job.class.name)
                  throw(:abort)
                end

                @counter += 1
                # Write the count back to the job so that Limitable#serialize persists it.
                @job.counter = @counter
                yielder.yield(*val)
              end
            end
          end

          def should_stop?
            @counter >= @limit
          end
        end
      end
    end
Usage:
    class MyLimitedIterationJob < ActiveJob::Base
      include JobIteration::Iteration
      include JobIteration::Limitable

      def build_enumerator(cursor:)
        build_limit_enumerator(
          enumerator_builder.active_record_on_batches(Account.inactive, cursor: cursor),
          limit: 1_000,
        )
      end

      def each_iteration(batch)
        # Do work
      end
    end