Commit

lynnagara committed Jan 8, 2025
1 parent b2ac7c2 commit 66d062a
Showing 3 changed files with 85 additions and 32 deletions.
18 changes: 7 additions & 11 deletions arroyo/backends/kafka/consumer.py
@@ -157,14 +157,12 @@ def __init__(
                KafkaError.REQUEST_TIMED_OUT,
                KafkaError.NOT_COORDINATOR,
                KafkaError._WAIT_COORD,
                KafkaError.STALE_MEMBER_EPOCH # kip-848
            ),
        )

        configuration = dict(configuration)
        self.__is_incremental = (
            configuration.get("partition.assignment.strategy") == "cooperative-sticky"
            or configuration.get("group.protocol") == "consumer"
        )
        self.__is_cooperative_sticky = configuration.get("partition.assignment.strategy") == "cooperative-sticky"
        auto_offset_reset = configuration.get("auto.offset.reset", "largest")

        # This is a special flag that controls the auto offset behavior for
@@ -315,18 +313,16 @@ def assignment_callback(
        def revocation_callback(
            consumer: ConfluentConsumer, partitions: Sequence[ConfluentTopicPartition]
        ) -> None:
            print("REVOCATION CALLBACK")
            self.__state = KafkaConsumerState.REVOKING

            arroyo_partitions = [Partition(Topic(i.topic), i.partition) for i in partitions]
            partitions = [Partition(Topic(i.topic), i.partition) for i in partitions]

            try:
                if on_revoke is not None:
                    on_revoke(arroyo_partitions)
                    on_revoke(partitions)
            finally:
                if self.__is_incremental:
                    self.__consumer.incremental_unassign(partitions)

                for partition in arroyo_partitions:
                for partition in partitions:
                    # Staged offsets are deleted during partition revocation to
                    # prevent later committing offsets for partitions that are
                    # no longer owned by this consumer.
@@ -466,7 +462,7 @@ def __assign(self, offsets: Mapping[Partition, int]) -> None:
            ConfluentTopicPartition(partition.topic.name, partition.index, offset)
            for partition, offset in offsets.items()
        ]
        if self.__is_incremental:
        if self.__is_cooperative_sticky:
            self.__consumer.incremental_assign(partitions)
        else:
            self.__consumer.assign(partitions)
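Note on the hunks above: after this change, only a consumer configured with the cooperative-sticky assignment strategy appears to take the incremental_assign path in __assign; setting group.protocol = "consumer" alone no longer does. A minimal sketch of opting into the incremental path (the broker address and group id are placeholders, not part of this commit):

from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
from arroyo.backends.kafka.consumer import KafkaConsumer

# Hypothetical config; "localhost:9092" and "my-group" are placeholders.
config = build_kafka_consumer_configuration(
    {"bootstrap.servers": "localhost:9092"},
    group_id="my-group",
)
# After this commit, this setting is what routes assignment through
# incremental_assign rather than a plain assign.
config["partition.assignment.strategy"] = "cooperative-sticky"
consumer = KafkaConsumer(config)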
77 changes: 77 additions & 0 deletions consumer.py
@@ -0,0 +1,77 @@
from typing import Any

import time
import signal
from contextlib import closing
import os
import logging
from typing import Mapping

from arroyo.types import Commit, Message, Partition, Topic
from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
from arroyo.backends.kafka.consumer import KafkaConsumer, KafkaPayload
from arroyo.processing.strategies import RunTask, CommitOffsets, ProcessingStrategy
from arroyo.processing.strategies.abstract import ProcessingStrategyFactory
from arroyo.processing.processor import StreamProcessor
from arroyo.backends.kafka import KafkaProducer

logging.basicConfig(level=logging.INFO)

TOPIC = "test-kip848"

def print_msg(message: Message[Any]) -> Message[Any]:
    ((partition, offset),) = message.committable.items()
    print(f"message: {partition.index}-{offset}")
    return message

class Strat(RunTask[Any, Any]):
    def join(self, *args: Any, **kwargs: Any) -> None:
        print("joining strategy, sleeping 5 seconds")
        time.sleep(5)
        print("joining strategy, sleeping 5 seconds -- DONE")
        return super().join(*args, **kwargs)

class Factory(ProcessingStrategyFactory[KafkaPayload]):
    def create_with_partitions(self, commit: Commit, partitions: Mapping[Partition, int]) -> ProcessingStrategy[KafkaPayload]:
        print("assign: ", [p.index for p in partitions])
        return Strat(print_msg, CommitOffsets(commit))


default_config = {
    "bootstrap.servers": os.environ.get("DEFAULT_BROKERS", "localhost:9092")
}

producer = KafkaProducer(default_config)

with closing(producer):
    for i in range(30):
        message = KafkaPayload(None, i.to_bytes(), [])
        producer.produce(Topic(TOPIC), message).result()

consumer_config = build_kafka_consumer_configuration(
    default_config,
    group_id="kip848",
)

consumer_config["group.protocol"] = "consumer"
consumer_config.pop("session.timeout.ms", None)
consumer_config.pop("max.poll.interval.ms", None)
consumer_config.pop("partition.assignment.strategy", None)
consumer_config.pop("group.protocol.type", None)
consumer_config.pop("heartbeat.interval.ms", None)

consumer = KafkaConsumer(consumer_config)

processor = StreamProcessor(
    consumer=consumer, topic=Topic(TOPIC), processor_factory=Factory()
)


def handler(signum: int, frame: Any) -> None:
    processor.signal_shutdown()


signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)

processor.run()
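The config cleanup above (popping the classic-protocol settings before setting group.protocol) could be factored into a small helper. A sketch, assuming the keys popped in the script above are the only ones that need removing; enable_kip848 and CLASSIC_PROTOCOL_KEYS are hypothetical names, not part of this commit:

from typing import Any, MutableMapping

# Assumption: this list simply mirrors the pops in the script above.
CLASSIC_PROTOCOL_KEYS = (
    "session.timeout.ms",
    "max.poll.interval.ms",
    "partition.assignment.strategy",
    "group.protocol.type",
    "heartbeat.interval.ms",
)

def enable_kip848(config: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
    # Switch a consumer configuration to the KIP-848 ("consumer") group protocol.
    for key in CLASSIC_PROTOCOL_KEYS:
        config.pop(key, None)
    config["group.protocol"] = "consumer"
    return config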
22 changes: 1 addition & 21 deletions tests/test_kip848_e2e.py
@@ -73,7 +73,7 @@ def create_with_partitions(self, commit: Commit, partitions: Mapping[Partition,

    with closing(producer):
        for i in range(30):
            message = KafkaPayload(None, i.to_bytes(1, "big"), [])
            message = KafkaPayload(None, i.to_bytes(), [])
            producer.produce(topic, message).result()

    consumer_config = build_kafka_consumer_configuration(
@@ -82,7 +82,6 @@ def create_with_partitions(self, commit: Commit, partitions: Mapping[Partition,
    )

    consumer_config["group.protocol"] = "consumer"
    consumer_config["group.remote.assignor"] = "range"
    consumer_config.pop("session.timeout.ms", None)
    consumer_config.pop("max.poll.interval.ms", None)
    consumer_config.pop("partition.assignment.strategy", None)
@@ -95,23 +94,4 @@ def create_with_partitions(self, commit: Commit, partitions: Mapping[Partition,
        consumer=consumer, topic=Topic(TOPIC), processor_factory=Factory()
    )

    def handler(signum: int, frame: Any) -> None:
        processor.signal_shutdown()

    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGTERM, handler)

    def shutdown() -> None:
        for i in range(100):
            time.sleep(0.1)
            if counter == 30:
                break
        print("shutting down")
        processor.signal_shutdown()

    t = threading.Thread(target=shutdown, daemon=True)
    t.start()

    processor.run()

    assert counter == 30
