Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronously load next records in ActiveRecordCursor #344

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions lib/job-iteration/active_record_cursor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ class ActiveRecordCursor # @private
include Comparable

attr_reader :position
attr_reader :next_relation
attr_accessor :reached_end

class ConditionNotSupportedError < ArgumentError
Expand All @@ -33,6 +34,7 @@ def initialize(relation, columns = nil, position = nil)

@base_relation = relation.reorder(@columns.join(","))
@reached_end = false
@async_enabled = relation.connection.try(:async_enabled?)
end

def <=>(other)
Expand All @@ -59,11 +61,7 @@ def update_from_record(record)
def next_batch(batch_size)
return nil if @reached_end

relation = @base_relation.limit(batch_size)

if (conditions = self.conditions).any?
relation = relation.where(*conditions)
end
relation = (@next_relation || build_relation(batch_size))

records = relation.uncached do
relation.to_a
Expand All @@ -72,7 +70,12 @@ def next_batch(batch_size)
update_from_record(records.last) unless records.empty?
@reached_end = records.size < batch_size

records.empty? ? nil : records
if records.empty?
nil
else
@next_relation = async_preload(batch_size)
records
end
end

protected
Expand All @@ -94,5 +97,24 @@ def conditions
ret.pop
ret
end

private

def build_relation(batch_size)
relation = @base_relation.limit(batch_size)

if (conditions = self.conditions).any?
relation = relation.where(*conditions)
end

relation
end

def async_preload(batch_size)
return unless @async_enabled

relation = build_relation(batch_size)
relation.uncached { relation.load_async }
end
end
end
3 changes: 3 additions & 0 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class Product < ActiveRecord::Base
}
connection_config[:password] = "root" if ENV["CI"]

if ActiveRecord.respond_to?(:async_query_executor)
ActiveRecord.async_query_executor = :global_thread_pool
end
ActiveRecord::Base.establish_connection(connection_config)

Redis.singleton_class.class_eval do
Expand Down
16 changes: 16 additions & 0 deletions test/unit/active_record_cursor_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# frozen_string_literal: true

require "test_helper"

module JobIteration
class ActiveRecordCursorTest < IterationUnitTest
test "#next_batch preloads the following batch asynchronously" do
skip("Only supported on Rails >= 7") unless Product.connection.try(:async_enabled?)

cursor = ActiveRecordCursor.new(Product.all)
cursor.next_batch(2)

assert_predicate(cursor.next_relation, :scheduled?)
Copy link
Contributor Author

@odlp odlp Feb 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the fence about whether cursor should have an attribute reader for next_relation - it makes testing easier, but perhaps this should be kept private as internal state.

We could also verify the async loading behaviour with mocha:

ActiveRecord::Relation.any_instance.expects(:load_async)

But this felt awkward too in it's own way...

end
end
end