From 318a79600db5a0857de9ca2c5ce6c798d04125c3 Mon Sep 17 00:00:00 2001 From: Vlad Gramuzov Date: Thu, 2 May 2019 15:15:21 +0300 Subject: [PATCH] fix(streams): Add error-handling switch --- CHANGELOG.md | 7 +++++++ Gemfile.lock | 2 +- README.md | 5 ++++- .../publishers/http_stream.rb | 13 +++++++++++-- .../publishers/http_stream_spec.rb | 12 ++++++++++++ .../publishers/kinesis_stream.rb | 9 +++++++-- .../publishers/kinesis_stream_spec.rb | 15 +++++++++++++++ .../publishers/sns_stream.rb | 9 +++++++-- .../publishers/sns_stream_spec.rb | 15 +++++++++++++++ lib/active_record_streams/version.rb | 2 +- lib/active_record_streams/version_spec.rb | 2 +- 11 files changed, 81 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c67dbbf..2f08545 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,3 +26,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [0.1.3] - 2019-05-02 ### Changed - `Content-type` header for the HTTP streams is set as a string to fix doubled content-type issue + +## [0.1.4] - 2019-05-02 +### Added +- Error-handler switch for `publish` methods + +### Changed +- README to reflect the error-handler switch diff --git a/Gemfile.lock b/Gemfile.lock index b36215b..e0ee42a 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - active_record_streams (0.1.3) + active_record_streams (0.1.4) activerecord (~> 4.2.10) aws-sdk (~> 2.11.9) diff --git a/README.md b/README.md index 59e895c..0c90ceb 100644 --- a/README.md +++ b/README.md @@ -134,7 +134,10 @@ class SampleHttpReSender def perform(table_name, message_json) message = ActiveRecordStreams::Message.from_json(message_json) ActiveRecordStreams.config.streams.each do |stream| - stream.publish(table_name, message) + # Use `handle_error: false` to raise exceptions instead of + # calling `error_handler`, this will let he Sidekiq to + # perform retries on it's own + stream.publish(table_name, message, handle_error: false) end end end diff --git a/lib/active_record_streams/publishers/http_stream.rb b/lib/active_record_streams/publishers/http_stream.rb index 6a5426d..ea91de6 100644 --- a/lib/active_record_streams/publishers/http_stream.rb +++ b/lib/active_record_streams/publishers/http_stream.rb @@ -30,7 +30,12 @@ def initialize( @error_handler = error_handler end - def publish(table_name, message) + ## + # @param [String] table_name + # @param [ActiveRecordStreams::Message] message + # @param [Boolean] handle_error + + def publish(table_name, message, handle_error: true) return unless (any_table? && allowed_table?(table_name)) || table_name == @table_name @@ -38,7 +43,7 @@ def publish(table_name, message) response = http.request(request) assert_response_code(response) rescue StandardError => e - raise e unless @error_handler.is_a?(Proc) + raise e unless call_error_handler?(handle_error) @error_handler.call(self, table_name, message, e) end @@ -82,6 +87,10 @@ def assert_response_code(response) raise StandardError, response.body end + + def call_error_handler?(handle_error) + @error_handler.is_a?(Proc) && handle_error + end end end end diff --git a/lib/active_record_streams/publishers/http_stream_spec.rb b/lib/active_record_streams/publishers/http_stream_spec.rb index a8a46f4..3f593bc 100644 --- a/lib/active_record_streams/publishers/http_stream_spec.rb +++ b/lib/active_record_streams/publishers/http_stream_spec.rb @@ -89,6 +89,18 @@ end end + context 'error response with error handling off' do + let(:response) { double(body: 'Error', code: 400) } + let(:error_handler) { Proc.new {} } + + it 'raises exception' do + expect(error_handler).not_to receive(:call) + + expect { subject.publish(actual_table_name, message, handle_error: false) } + .to raise_error(StandardError) + end + end + context 'https target' do let(:url) { 'https://hello.world' } diff --git a/lib/active_record_streams/publishers/kinesis_stream.rb b/lib/active_record_streams/publishers/kinesis_stream.rb index 5b33637..404572a 100644 --- a/lib/active_record_streams/publishers/kinesis_stream.rb +++ b/lib/active_record_streams/publishers/kinesis_stream.rb @@ -32,15 +32,16 @@ def initialize( ## # @param [String] table_name # @param [ActiveRecordStreams::Message] message + # @param [Boolean] handle_error - def publish(table_name, message) + def publish(table_name, message, handle_error: true) return unless (any_table? && allowed_table?(table_name)) || table_name == @table_name client.publish(@stream_name, partition_key(table_name), message.json, @overrides) rescue StandardError => e - raise e unless @error_handler.is_a?(Proc) + raise e unless call_error_handler?(handle_error) @error_handler.call(self, table_name, message, e) end @@ -55,6 +56,10 @@ def allowed_table?(table_name) !@ignored_tables.include?(table_name) end + def call_error_handler?(handle_error) + @error_handler.is_a?(Proc) && handle_error + end + def partition_key(table_name) "#{table_name}-#{Time.now.utc.strftime(PARTITION_KEY_TIME_FORMAT)}" end diff --git a/lib/active_record_streams/publishers/kinesis_stream_spec.rb b/lib/active_record_streams/publishers/kinesis_stream_spec.rb index 940b262..fbcd7ce 100644 --- a/lib/active_record_streams/publishers/kinesis_stream_spec.rb +++ b/lib/active_record_streams/publishers/kinesis_stream_spec.rb @@ -110,5 +110,20 @@ subject.publish(actual_table_name, message) end end + + context 'delivery error error with error handling off' do + let(:error_handler) { Proc.new {} } + + it 'raises exception' do + expect(kinesis_client).to receive(:publish) do + raise StandardError, 'Delivery error' + end + + expect(error_handler).not_to receive(:call) + + expect { subject.publish(actual_table_name, message, handle_error: false) } + .to raise_error(StandardError) + end + end end end diff --git a/lib/active_record_streams/publishers/sns_stream.rb b/lib/active_record_streams/publishers/sns_stream.rb index a24f29a..eedf32a 100644 --- a/lib/active_record_streams/publishers/sns_stream.rb +++ b/lib/active_record_streams/publishers/sns_stream.rb @@ -29,14 +29,15 @@ def initialize( ## # @param [String] table_name # @param [ActiveRecordStreams::Message] message + # @param [Boolean] handle_error - def publish(table_name, message) + def publish(table_name, message, handle_error: true) return unless (any_table? && allowed_table?(table_name)) || table_name == @table_name client.publish(@topic_arn, message.json, @overrides) rescue StandardError => e - raise e unless @error_handler.is_a?(Proc) + raise e unless call_error_handler?(handle_error) @error_handler.call(self, table_name, message, e) end @@ -51,6 +52,10 @@ def allowed_table?(table_name) !@ignored_tables.include?(table_name) end + def call_error_handler?(handle_error) + @error_handler.is_a?(Proc) && handle_error + end + def client @client ||= SnsClient.new end diff --git a/lib/active_record_streams/publishers/sns_stream_spec.rb b/lib/active_record_streams/publishers/sns_stream_spec.rb index 8f0768e..72dbe3a 100644 --- a/lib/active_record_streams/publishers/sns_stream_spec.rb +++ b/lib/active_record_streams/publishers/sns_stream_spec.rb @@ -106,5 +106,20 @@ subject.publish(actual_table_name, message) end end + + context 'delivery error with error handling off' do + let(:error_handler) { Proc.new {} } + + it 'raises exception' do + expect(sns_client).to receive(:publish) do + raise StandardError, 'Delivery error' + end + + expect(error_handler).not_to receive(:call) + + expect { subject.publish(actual_table_name, message, handle_error: false) } + .to raise_error(StandardError) + end + end end end diff --git a/lib/active_record_streams/version.rb b/lib/active_record_streams/version.rb index cea9c0e..bebd491 100644 --- a/lib/active_record_streams/version.rb +++ b/lib/active_record_streams/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module ActiveRecordStreams - VERSION = '0.1.3' + VERSION = '0.1.4' end diff --git a/lib/active_record_streams/version_spec.rb b/lib/active_record_streams/version_spec.rb index 3bd2f33..364014e 100644 --- a/lib/active_record_streams/version_spec.rb +++ b/lib/active_record_streams/version_spec.rb @@ -4,6 +4,6 @@ RSpec.describe ActiveRecordStreams::VERSION do it 'has a version number' do - expect(subject).to eq('0.1.3') + expect(subject).to eq('0.1.4') end end