Resume Not working #411

Open
earthspiligrim opened this issue Dec 17, 2024 · 0 comments

Expected Behavior

Pausing the topic consumption works fine, but the receiver is never resumed. I expect the receiver to resume consuming events from the topic after calling consumer.resume(consumer.assignment());

Actual Behavior

The resume call throws an error: "You must call one of receive*() methods before using doOnConsumer"
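
To make the failure mode concrete, here is a minimal, library-free sketch of one situation that could produce this message. It assumes the receiver only holds a consumer while a receive*() subscription is active; that is my reading of the error text, not something confirmed against the reactor-kafka source. `ReceiverModel`, `FakeConsumer`, `startReceive` and `stopReceive` are hypothetical names that exist only for this illustration; only the error message is taken from the actual behavior above.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical model, NOT the reactor-kafka implementation.
public class ReceiverModel {

    // Stand-in for a Kafka consumer; it only tracks which partitions are paused.
    static final class FakeConsumer {
        final Set<String> assignment = Set.of("topic-0");
        final Set<String> paused = new HashSet<>();
        void pause(Set<String> partitions) { paused.addAll(partitions); }
        void resume(Set<String> partitions) { paused.removeAll(partitions); }
    }

    // Assumption: a consumer is reachable only while a receive subscription is active.
    private final AtomicReference<FakeConsumer> active = new AtomicReference<>();

    void startReceive() { active.set(new FakeConsumer()); }

    void stopReceive() { active.set(null); }

    <T> T doOnConsumer(Function<FakeConsumer, T> fn) {
        FakeConsumer consumer = active.get();
        if (consumer == null) {
            throw new IllegalStateException(
                "You must call one of receive*() methods before using doOnConsumer");
        }
        return fn.apply(consumer);
    }

    // Pause while receiving, lose the subscription, then try to resume.
    static String demo() {
        ReceiverModel receiver = new ReceiverModel();
        receiver.startReceive();
        receiver.doOnConsumer(c -> { c.pause(c.assignment); return c; });
        receiver.stopReceive(); // e.g. the receive pipeline ended in the meantime
        try {
            receiver.doOnConsumer(c -> { c.resume(c.assignment); return c; });
            return "resumed";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

If this assumption holds, the pause succeeds because the subscription is still live at that point, while the later resume fails because no receive*() subscription is active any more when the circuit breaker leaves the open state.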

Steps to Reproduce

    public <T> void applyCircuitBreaker(CircuitBreaker circuitBreaker, KafkaReceiver<T, T> kafkaReceiver) {
        circuitBreaker.getEventPublisher().onStateTransition((event) -> {
            if (event.getStateTransition() == StateTransition.CLOSED_TO_OPEN) {
                this.pauseReceiver(kafkaReceiver);
            } else if (event.getStateTransition() == StateTransition.OPEN_TO_HALF_OPEN || event.getStateTransition() == StateTransition.HALF_OPEN_TO_CLOSED) {
                this.resumeReceiver(kafkaReceiver);
            }

        });
    }

    private <T> void pauseReceiver(KafkaReceiver<T, T> kafkaReceiver) {
        kafkaReceiver.doOnConsumer((consumer) -> {
            consumer.pause(consumer.assignment());
            return consumer;
        }).doOnSuccess((success) -> {
            log.info("Successful pause {}", success.paused());
        }).doOnError((e) -> {
            log.error("Error in pausing", e);
        }).subscribe();
    }

    private <T> void resumeReceiver(KafkaReceiver<T, T> kafkaReceiver) {
        kafkaReceiver.doOnConsumer((consumer) -> {
            consumer.resume(consumer.assignment());
            return consumer;
        }).doOnSuccess((success) -> {
            log.info("Successfully resumed kafka receiver {}", success.assignment());
        }).doOnError((e) -> {
            log.error("Error in resuming", e);
        }).subscribe();
        log.info("resumeReceiver: Kafka receiver resumed.");
    }

Possible Solution

Your Environment

  • Reactor version(s) used:
  • Other relevant libraries versions (eg. netty, ...):
  • JVM version (java -version):
  • OS and version (eg uname -a):