[Feature] LimitEnumerator #424

Open
N1koKouloufakos opened this issue Aug 24, 2023 · 0 comments

What

An enumerator that stops after a certain number of elements have been yielded.

Why

The use case we have:
We are iterating through results from an API and only want to process 10,000 results in this job. Because this job runs periodically, we want every run to have a hard, defined stopping point.

The ThrottleEnumerator is really similar, but it re-enqueues the job when its condition is hit. I want an enumerator that shuts the job down instead.
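The difference comes down to how the iteration loop reacts when the enumerator stops. Below is a rough, dependency-free sketch of the "shut down" path, assuming (as the LimitEnumerator comment in the code below states) that `throw :abort` quietly ends the job, i.e. that the iteration loop wraps the enumeration in `catch(:abort)`. `run_iteration` and `limited` are hypothetical stand-ins, not job-iteration's API. Because internal iteration (`each` with a block) runs the enumerator's generator on the caller's stack, the `throw` unwinds straight out of the loop: no further items are processed and nothing is re-enqueued.

```ruby
# Hypothetical stand-in for a job's iteration loop: it processes items until
# the enumerator is exhausted or something inside it throws :abort.
def run_iteration(enumerator, processed)
  catch(:abort) do
    enumerator.each { |item| processed << item }
    :finished # reached only if the enumerator ran to completion
  end
end

# Hypothetical wrapper: throws :abort instead of yielding element limit + 1.
def limited(source, limit)
  Enumerator.new do |yielder|
    count = 0
    source.each do |item|
      throw(:abort) if count >= limit
      count += 1
      yielder << item
    end
  end
end

processed = []
result = run_iteration(limited(1..10, 3), processed)
p processed # => [1, 2, 3]
p result    # => nil (catch returns nil when throw passes no value)
```

A re-enqueueing enumerator would instead hand control back to the loop with a signal to schedule the job again; this sketch only models the stop-for-good case.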

How

Because the job can be shut down and re-enqueued at any time, and we want the limit respected regardless of retries, I've added a counter attribute that is serialized and deserialized with the job. That way the enumerator knows how many more elements it may yield, no matter how many times the job has been re-enqueued.
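A minimal, framework-free sketch of that round trip, assuming job data is a plain Hash like the one ActiveJob's `serialize`/`deserialize` pair exchanges. `FakeJob`, the `max_per_run` interruption, and the cursor handling are illustrative simplifications, not the real job: each simulated run restores the cursor and counter from the previous run's data, processes items until either the limit or a simulated interruption is reached, and serializes the updated state before being "re-enqueued".

```ruby
# Illustrative only: mimics an ActiveJob-style serialize/deserialize pair
# that persists both a cursor and the limit counter between runs.
class FakeJob
  attr_accessor :cursor, :counter

  def serialize
    { "cursor" => cursor, "counter" => counter }
  end

  def deserialize(job_data)
    @cursor = job_data["cursor"]
    @counter = job_data["counter"]
  end

  # Processes items from the saved cursor onward. Returns :limited once the
  # limit is reached, :interrupted after max_per_run items (a simulated
  # shutdown), or :finished if the items run out first.
  def perform(items, limit:, max_per_run:)
    self.cursor ||= 0
    self.counter ||= 0
    processed_this_run = 0

    items.drop(cursor).each do |item|
      return :limited if counter >= limit
      return :interrupted if processed_this_run >= max_per_run

      yield item
      self.counter += 1
      self.cursor += 1
      processed_this_run += 1
    end
    :finished
  end
end

items = (1..100).to_a
processed = []
job_data = {} # a freshly enqueued job has no counter yet
outcome = nil

loop do
  job = FakeJob.new
  job.deserialize(job_data)
  outcome = job.perform(items, limit: 10, max_per_run: 4) { |i| processed << i }
  break unless outcome == :interrupted

  job_data = job.serialize # "re-enqueue" carrying the updated counter
end

p outcome   # => :limited
p processed # => [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```

Even though the job is interrupted twice, the total across all runs stops at the limit because the counter travels with the serialized job data.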

Code

# typed: true
# frozen_string_literal: true

module JobIteration
  module Limitable
    attr_accessor :counter

    def serialize
      super.merge("counter" => counter)
    end

    def deserialize(job_data)
      super
      @counter = job_data["counter"]
    end

    # This method could probably be moved into enumerator_builder.rb
    def build_limit_enumerator(enum, limit:)
      LimitEnumerator.new(enum, self, limit: limit, counter: counter || 0).to_enum
    end

    # LimitEnumerator allows you to limit iterations
    # to a specific number of items.
    # @example
    #   def build_enumerator(_params, cursor:)
    #     build_limit_enumerator(
    #       enumerator_builder.active_record_on_batches(
    #         Account.inactive,
    #         cursor: cursor
    #       ),
    #       limit: 1_000,
    #     )
    #   end
    # The enumerator above mimics the provided enumerator, active_record_on_batches,
    # until 1_000 elements (here, batches of records, not individual records) have been yielded.
    # At that point, instead of yielding the next element, it will `throw :abort`, which quietly shuts down the job.
    # This works by serializing a counter into the job's data every time the job is serialized, so that
    # when the job is re-enqueued, it can pick up where it left off and remember how many elements it has already yielded.
    class LimitEnumerator
      attr_accessor :enum, :job, :limit, :counter

      def initialize(enum, job, limit:, counter:)
        @enum = enum
        @job = job
        @limit = limit
        @counter = counter
      end

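      def to_enum
        Enumerator.new do |yielder|
          @enum.each do |*val|
            # Stop before yielding element limit + 1.
            if should_stop?
              ActiveSupport::Notifications.instrument("limited.iteration", job_class: @job.class.name)
              throw(:abort)
            end

            @counter += 1
            # Integers are immutable, so write the new value back to the job;
            # otherwise Limitable#serialize would persist a stale counter.
            @job.counter = @counter
            yielder.yield(*val)
          end
        end
      end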

      def should_stop?
        @counter >= @limit
      end
    end
  end
end
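Why the counter has to be written back to the job: Ruby Integers are immutable, so `counter: counter || 0` passes LimitEnumerator the current value, and `@counter += 1` rebinds only the enumerator's own instance variable. The job's `counter`, which is what `Limitable#serialize` writes, stays unchanged unless the enumerator assigns the new value back (e.g. `@job.counter = @counter`). A stripped-down illustration, using hypothetical `Job` and `Counter` classes rather than the real job:

```ruby
# Hypothetical minimal classes; only the counter handling mirrors LimitEnumerator.
Job = Struct.new(:counter)

class Counter
  def initialize(job, counter:, write_back:)
    @job = job
    @counter = counter
    @write_back = write_back
  end

  def increment
    @counter += 1 # rebinds @counter; the Integer itself is never mutated
    @job.counter = @counter if @write_back
  end
end

without_sync = Job.new(nil)
c = Counter.new(without_sync, counter: without_sync.counter || 0, write_back: false)
3.times { c.increment }
p without_sync.counter # => nil (serialize would persist nil)

with_sync = Job.new(nil)
c = Counter.new(with_sync, counter: with_sync.counter || 0, write_back: true)
3.times { c.increment }
p with_sync.counter # => 3
```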

Usage:

class MyLimitedIterationJob < ActiveJob::Base
  include JobIteration::Iteration
  include JobIteration::Limitable

  def build_enumerator(cursor:)
    build_limit_enumerator(
      enumerator_builder.active_record_on_batches(
        Account.inactive,
        cursor: cursor,
      ),
      limit: 1_000,
    )
  end

  def each_iteration(batch)
    # Do work
  end
end