-
Notifications
You must be signed in to change notification settings - Fork 46
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Serialize Cursors #81
base: main
Are you sure you want to change the base?
Conversation
@@ -9,6 +9,8 @@ module JobIteration | |||
|
|||
INTEGRATIONS = [:resque, :sidekiq] | |||
|
|||
Deprecation = ActiveSupport::Deprecation.new("2.0", "JobIteration") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I originally used ActiveSupport::Deprecation.warn
directly, that labelled deprecations as coming from Rails. This uses our own object, but does mean that the host app's settings for ActiveSupport::Deprecation
do not propagate (e.g. disallowed_deprecations
).
@@ -35,7 +35,7 @@ module IntegrationBehaviour | |||
end | |||
|
|||
test "unserializable corruption is prevented" do | |||
skip "Deferred until 2.0.0" | |||
skip "Breaking change deferred until 2.0" if Gem::Version.new(JobIteration::VERSION) < Gem::Version.new("2.0") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I put this in to ensure the change makes it into 2.0
test/unit/iteration_test.rb
Outdated
@@ -66,6 +66,7 @@ def each_iteration(*) | |||
class InvalidCursorJob < ActiveJob::Base | |||
include JobIteration::Iteration | |||
def each_iteration(*) | |||
return if Gem::Version.new(JobIteration::VERSION) < Gem::Version.new("2.0") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Likewise here. We can't bump to 2.0 without also making unsafe cursors an error.
lib/job-iteration/iteration.rb
Outdated
"Strings, Integers, Floats, Arrays, Hashes, true, false, or nil.", | ||
cursor: cursor, | ||
) | ||
Deprecation.warn(<<~DEPRECATION_MESSAGE.chomp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I originally looked at using deprecation_warning
(as mentioned here), which has a much richer built in message. However, it is method oriented, and we aren't getting rid of any method.
lib/job-iteration/iteration.rb
Outdated
) | ||
Deprecation.warn(<<~DEPRECATION_MESSAGE.chomp) | ||
Cursor must be composed of objects capable of built-in (de)serialization: | ||
Strings, Integers, Floats, Arrays, Hashes, true, false, or nil. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also allow to pass Symbol as part of the cursor?
Yesterday we noticed that having a Hash with keys as symbols also broke. Hash with symbols should be accepted as well 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. Symbols are not safe to serialize as JSON, because they are deserialized as Strings:
$ ruby -rjson -e 'p JSON.parse(JSON.dump(:symbol))'
"symbol"
cursor_position
is serialized by the queue adapter (Resque/Sidekiq), not by ActiveJob itself, so none of Rails' magic symbol serialization happens.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the explanation
test/unit/iteration_test.rb
Outdated
error.message, | ||
) | ||
def assert_cursor_deprecation_warning | ||
original_behaviour = JobIteration::Deprecation.behavior |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we simplify this test assertion with something like:
JobIteration::Deprecation.any_instance.expects(:warn)
Feels that the current implementation is quite complex
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could, I just wanted to make sure the deprecation message was formatted correctly.
I'll simplify the deprecation warning message 👍 |
aed4b36
to
123ac38
Compare
I've updated the description to be simpler, and slightly tweaked the test helpers. Should be good for final review. CI on Ruby 2.5 will start passing again once sorbet/sorbet#4281 is merged and a new version of |
123ac38
to
c148ad4
Compare
c148ad4
to
f8eb4f4
Compare
I've circled back to this and made some changes. This PR still introduces the deprecation warning, however it also introduces a config to force the new behavior. This config can be set globally and/or per-job, allowing consumers to globally opt-in to enforcement ahead of 2.0, and optionally opt-out per job until they are made compliant, or alternatively opt-in per compliant job until they are able to opt-in globally. This should allow for a smoother transition. Version 2.0 would simply delete (or no-op & deprecate) the config and instruct consumers to remove it from their code. I've also added documentation about this, which applies regardless of the deprecation/error. |
f8eb4f4
to
33112f0
Compare
lib/job-iteration.rb
Outdated
@@ -20,6 +22,25 @@ module JobIteration | |||
# Defaults to nil which means that jobs will not be interrupted except on termination signal. | |||
attr_accessor :max_job_runtime | |||
|
|||
# Set this to `true` to enforce that cursors be composed of objects capable | |||
# of built-in (de)serialization: Strings, Integers, Floats, Arrays, Hashes, | |||
# true, false, or nil. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it have to be only these built-in primitives? Active Job allows custom serializers to be defined for parameters, like we use in our Money gem. Would be nice if we can piggyback on top of this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... Good question. If Active Job performs a serialization step before handing off the job to the backend, and if we can test that an object is serializable by it, then that should work. I shall have to look into this.
Indeed my goal is not to be restrictive, but to prevent foot-guns, as my team encountered.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, so with the code as-is, no, we couldn't rely on this behavior, as Active Job only serializes arguments
, and doesn't bother with the rest of the job data.
However, an alternative approach might be to serialize and deserialize the cursor ourselves, since we do already override those methods:
job-iteration/lib/job-iteration/iteration.rb
Lines 79 to 92 in 17e6ec5
def serialize # @private | |
super.merge( | |
"cursor_position" => cursor_position, | |
"times_interrupted" => times_interrupted, | |
"total_time" => total_time, | |
) | |
end | |
def deserialize(job_data) # @private | |
super | |
self.cursor_position = job_data["cursor_position"] | |
self.times_interrupted = job_data["times_interrupted"] || 0 | |
self.total_time = job_data["total_time"] || 0 | |
end |
It's a bit awkward because the serialization API expects an Array
of arguments, but we could conceivably do something like
def serialize # @private
super.merge(
"cursor_position" => ActiveJob::Arguments.serialize([cursor_position]).first,
"times_interrupted" => times_interrupted,
"total_time" => total_time,
)
end
def deserialize(job_data) # @private
super
self.cursor_position = ActiveJob::Arguments.deserialize([job_data["cursor_position"]]).first
self.times_interrupted = job_data["times_interrupted"] || 0
self.total_time = job_data["total_time"] || 0
end
Another consideration is that the enforcement API here happens on every cursor, regardless of if the job is interrupted, which has pros and cons compared to the serialization approach:
- Checking every cursor involves more computations than only serializing as needed (and blowing up if it fails)
- Checking every cursor gives you instant feedback immediately after the enumerator produces an unserializable cursor (usually before the first call to
each_iteration
), rather than only if the job is interrupted and the cursor requires serialization
I suppose we could keep cursor serializability enforced for each cursor by simply attempting to serialize it, but I'm not sure how much more expensive that is compared to the naive class checking approach.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
8186aad
to
eb2859f
Compare
Alright, I've updated the branch again, this time to leverage |
CHANGELOG.md
Outdated
@@ -2,6 +2,8 @@ | |||
|
|||
- [241](https://github.com/Shopify/job-iteration/pull/241) - Require Ruby 2.7+, dropping 2.6 support | |||
- [241](https://github.com/Shopify/job-iteration/pull/241) - Require Rails 6.0+, dropping 5.2 support | |||
- [80](https://github.com/Shopify/job-iteration/pull/80) - Deprecate un(de)serializable cursors |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should probably update this to "Serialize cursors"
lib/job-iteration/iteration.rb
Outdated
def serialize_cursor(cursor) | ||
ActiveJob::Arguments.serialize([cursor]).first | ||
rescue ActiveJob::SerializationError | ||
raise if job_iteration_enforce_serializable_cursors |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This used to wrap the error in a custom CursorError
, but I figure it might be adequate to just re-raise the ActiveJob::SerializationError
.
include JobIteration::Iteration | ||
UnserializableCursor = Class.new |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we're getting all of ActiveJob
's serialization now, we have to explicitly create an object that cannot be serialized. BasicObject
doesn't work though, because that straight up blows up ActiveJob
because it doesn't respond to respond_to?
😅
test/unit/iteration_test.rb
Outdated
UnserializableCursor = Class.new | ||
SerializableCursor = Class.new | ||
|
||
class SerializableCursorSerializer < ActiveJob::Serializers::ObjectSerializer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternatively, I could make it work with GlobalID
.
eb2859f
to
e96553e
Compare
if job_data.key?("serialized_cursor_position") | ||
deserialize_cursor(job_data.fetch("serialized_cursor_position")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've updated this so if a serialized cursor is found, it should always be deserializable.
test/unit/iteration_test.rb
Outdated
def test_job_interrupted_with_only_cursor_position_should_resume | ||
with_interruption do | ||
InfiniteCursorLoggingJob.perform_now | ||
|
||
work_one_job do |job_data| | ||
job_data["cursor_position"] = "raw cursor" | ||
job_data.delete("serialized_cursor_position") | ||
end | ||
|
||
assert_equal("raw cursor", InfiniteCursorLoggingJob.cursors.last) | ||
end | ||
ensure | ||
InfiniteCursorLoggingJob.cursors.clear | ||
end | ||
|
||
def test_job_interrupted_with_serialized_cursor_position_should_ignore_unserialized_cursor_position | ||
with_interruption do | ||
InfiniteCursorLoggingJob.perform_now | ||
|
||
work_one_job do |job_data| | ||
job_data["cursor_position"] = "should be ignored" | ||
job_data["serialized_cursor_position"] = ActiveJob::Arguments.serialize([SerializableCursor.new]).first | ||
end | ||
|
||
assert_instance_of(SerializableCursor, InfiniteCursorLoggingJob.cursors.last) | ||
end | ||
ensure | ||
InfiniteCursorLoggingJob.cursors.clear | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if I'm missing something, but I wasn't able to find any tests that actually exercise resuming jobs. Everything seems to be testing running or interrupting jobs, but no tests actually resume anything. So this happens to add test coverage for that 😅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some suggestions to polish things up, but overall this looks good!
Sidekiq 7 has been released, and it now errors if job arguments contain objects that don't round trip as JSON, so this PR is now not only desirable to avoid bugs, but also improves compatibility with Sidekiq. I'll see if I can get it over the finish line this week. |
e96553e
to
6fb5f5e
Compare
Looks like CI is broken for Rails edge due to the removal of |
39af58a
to
efd94dd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work on this Sam, I know it's been a long time coming! 🙌
A couple minor nits / comments, but nothing blocking.
end | ||
end | ||
|
||
def test_jobs_using_unserializable_cursor_when_interrupted_should_only_store_the_cursor_and_no_serialized_cursor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I love the thoroughness of your tests, but I'm wondering about the value of this test (and the similar one below). Technically how the cursors are stored is private to JobIteration::Iteration
. IMHO it feels like we're testing the implementation too much here vs the expected behaviour.
I think that testing whether we can interrupt + resume jobs that are using unserializable cursors is covered by test_job_interrupted_with_only_cursor_position_should_resume
.
I wonder if test_jobs_using_serializable_cursor_when_interrupted_should_store_both_legacy_cursor_and_serialized_cursor
and test_job_interrupted_with_serialized_cursor_position_should_ignore_unserialized_cursor_position
could be combined into one test that , rather than explicitly checking how the cursors are being stored and read, simply checks that a job can be interrupted + resumed with a serializable cursor as expected?
Let me know if that makes sense 😅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I's been a while since I wrote these, but IIRC the reason for testing this in so much detail is that we want to ensure dequeuing works across gem versions. During the upgrade (or downgrade), jobs may be:
- interrupted on an old replica, and
- resumed on an old replica, or
- resumed on an new replica, or
- interrupted on a new replica, and
- resumed on a new replica, or
- resumed on an old replica
Because we can't actually test those scenarios (without significant effort), we test:
- what happens given various inputs arguments we might encounter in such scenarios, and
- what the output arguments are in such scenarios.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, skimming through these tests now, they make sense to me. I probably just wasn't thinking about needing to make sure we test forwards / backwards compatibility with job workers running the new vs old version of the code 🤷♀️
We should be able to remove some of these test cases for the 2.0 release that enforces serializable cursors, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. 2.0 should be able to safely stop reading (and writing) to the old argument, and we can (at least) drop all the tests that use it as an input/output, if not drop all the tests on the implementation and just test the resumption behavior.
1c662e2
to
c62500c
Compare
c62500c
to
07b6fd3
Compare
07b6fd3
to
c62500c
Compare
c62500c
to
e4356ed
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rebased on latest main
, and made some small changes, the main one being that we're defining the instance reader for job_iteration_enforce_serializable_cursors
ourselves, to use the per-class setting if non-nil?
, and otherwise fallback to the global setting. @Mangara and I 🎩'd on a test app and can confirm the deprecation warning shows up, and enabling serializable cursor enforcement works.
end | ||
end | ||
|
||
def test_jobs_using_unserializable_cursor_when_interrupted_should_only_store_the_cursor_and_no_serialized_cursor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I's been a while since I wrote these, but IIRC the reason for testing this in so much detail is that we want to ensure dequeuing works across gem versions. During the upgrade (or downgrade), jobs may be:
- interrupted on an old replica, and
- resumed on an old replica, or
- resumed on an new replica, or
- interrupted on a new replica, and
- resumed on a new replica, or
- resumed on an old replica
Because we can't actually test those scenarios (without significant effort), we test:
- what happens given various inputs arguments we might encounter in such scenarios, and
- what the output arguments are in such scenarios.
38e4cc1
to
f39525d
Compare
@@ -286,6 +281,13 @@ def job_iteration_max_job_runtime | |||
[global_max, class_max].min | |||
end | |||
|
|||
def job_iteration_enforce_serializable_cursors? # TODO: Add a test for the edge case of registering it afterwards |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did we add this test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch – yes, test_global_max_job_runtime_with_updated_value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then we should remove the TODO 🙂
f39525d
to
a434fe8
Compare
|
||
Deprecation.warn(<<~DEPRECATION_MESSAGE, caller_locations(3)) | ||
The Enumerator returned by #{self.class.name}#build_enumerator yielded a cursor which is unsafe to serialize. | ||
See https://github.com/Shopify/job-iteration/blob/main/guides/custom-enumerator.md#cursor-types |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to match the heading we added.
See https://github.com/Shopify/job-iteration/blob/main/guides/custom-enumerator.md#cursor-types | |
See https://github.com/Shopify/job-iteration/blob/main/guides/custom-enumerator.md#cursor-serialization |
Previously, `cursor_position` was handed as-is to the queue adapter. This could lead to the queue adapter corrupting cursors of certain classes. For example, if given a `Time` cursor, Sidekiq would save it as JSON by calling `to_s`, resulting in the deserialized cursor being a `String` instead of a `Time`. To prevent this, we now leverage `ActiveJob::Arguments` to (de)serialize the `cursor_position` and ensure it will make the round trip safely. However, as this is a breaking change (as unsafe cursors would previously be accepted, but possibly corrupted, whereas they would now be rejected), we begin by rescuing (de)serialization failures and emitting a deprecation warning. Starting in Job Iteration version 2.0, the deprecation warning will be removed, and (de)serialization failure will raise. Application owners can opt-in to the 2.0 behavior either globally by setting JobIteration.enforce_serializable_cursors = true or on an inheritable per-class basis by setting class MyJob < ActiveJob::Base include JobIteration::Iteration self.job_iteration_enforce_serializable_cursors = true # ... end
a434fe8
to
5086f6d
Compare
end | ||
end | ||
|
||
def test_jobs_using_unserializable_cursor_when_interrupted_should_only_store_the_cursor_and_no_serialized_cursor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, skimming through these tests now, they make sense to me. I probably just wasn't thinking about needing to make sure we test forwards / backwards compatibility with job workers running the new vs old version of the code 🤷♀️
We should be able to remove some of these test cases for the 2.0 release that enforces serializable cursors, right?
TL;DR
This is a redo of #73, but using a deprecation warning instead of raising an error. The error is deferred until
2.0
.Details
It is unsafe to use cursors which cannot be serialized using JSON.dump and deserialized again using JSON.parse. Such cursors may result in a different cursor being deserialized, or in serialization failing entirely, which may result in incorrect behaviour in iterative jobs.
Starting in 2.0, usage of such cursors will raise an ArgumentError. Until then, a deprecation warning will be logged.
#73 contains more context on the change, and #80 contains information as to the motivation for this alternative approach.
To facilitate migration, this also adds
enforce_serializable_cursors
config which can force the error 2.0 will introduceenforce_serializable_cursors
inheritable override configJobIteration::Deprecation
#441 is merged.Closes #80