Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into test-kip-848
Browse files Browse the repository at this point in the history
  • Loading branch information
lynnagara committed Jan 8, 2025
2 parents 59773f5 + 472c657 commit b2ac7c2
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 59 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
name: Checkout code
- uses: actions/setup-python@v2
with:
python-version: 3.8
python-version: 3.12
- name: Install dependencies
run: |
python -m pip install --upgrade pip
Expand All @@ -34,7 +34,7 @@ jobs:
name: Checkout code
- uses: actions/setup-python@v2
with:
python-version: 3.8
python-version: 3.12
- name: Install dependencies
run: |
python -m pip install --upgrade pip
Expand All @@ -48,7 +48,7 @@ jobs:
strategy:
max-parallel: 5
matrix:
python: [3.8, 3.9, "3.10", "3.11", "3.12"]
python: [3.9, "3.10", "3.11", "3.12", "3.13"]
timeout-minutes: 10
steps:
- uses: actions/checkout@v2
Expand Down
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ repos:
rev: 3.8.4
hooks:
- id: flake8
language_version: python3.8
language_version: python3.12
- repo: https://github.com/pre-commit/mirrors-mypy
rev: 'v0.961'
hooks:
Expand All @@ -42,4 +42,4 @@ repos:
entry: rustfmt --edition 2021
files: ^rust-arroyo/.*\.rs$
default_language_version:
python: python3.8
python: python3.12
6 changes: 3 additions & 3 deletions arroyo/backends/local/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ def __init__(
self.__message_storage = message_storage
self.__clock = clock

self.__offsets: MutableMapping[
str, MutableMapping[Partition, int]
] = defaultdict(dict)
self.__offsets: MutableMapping[str, MutableMapping[Partition, int]] = (
defaultdict(dict)
)

# The active subscriptions are stored by consumer group as a mapping
# between the consumer and its subscribed topics.
Expand Down
12 changes: 8 additions & 4 deletions arroyo/dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def __init__(
offset: int,
needs_commit: bool = True,
reason: Optional[str] = None,
log_exception: bool = True
log_exception: bool = True,
) -> None:
self.partition = partition
self.offset = offset
Expand Down Expand Up @@ -208,7 +208,9 @@ class NoopDlqProducer(DlqProducer[Any]):
"""

def produce(
self, value: BrokerValue[KafkaPayload], reason: Optional[str] = None,
self,
value: BrokerValue[KafkaPayload],
reason: Optional[str] = None,
) -> Future[BrokerValue[KafkaPayload]]:
future: Future[BrokerValue[KafkaPayload]] = Future()
future.set_running_or_notify_cancel()
Expand Down Expand Up @@ -340,7 +342,7 @@ def __init__(
self,
policy: DlqPolicy[TStrategyPayload],
) -> None:
self.MAX_PENDING_FUTURES = 1000 # This is a per partition max
self.MAX_PENDING_FUTURES = 2000 # This is a per partition max
self.__dlq_policy = policy

self.__futures: MutableMapping[
Expand All @@ -362,7 +364,9 @@ def reset_dlq_limits(self, assignment: Mapping[Partition, int]) -> None:
self.__dlq_policy.limit, assignment
)

def produce(self, message: BrokerValue[TStrategyPayload], reason: Optional[str] = None) -> None:
def produce(
self, message: BrokerValue[TStrategyPayload], reason: Optional[str] = None
) -> None:
"""
Removes all completed futures, then appends the given future to the list.
Blocks if the list is full. If the DLQ limit is exceeded, an exception is raised.
Expand Down
6 changes: 3 additions & 3 deletions arroyo/processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ def __init__(
self.__processor_factory = processor_factory
self.__metrics_buffer = MetricsBuffer()

self.__processing_strategy: Optional[
ProcessingStrategy[TStrategyPayload]
] = None
self.__processing_strategy: Optional[ProcessingStrategy[TStrategyPayload]] = (
None
)

self.__message: Optional[BrokerValue[TStrategyPayload]] = None

Expand Down
6 changes: 3 additions & 3 deletions arroyo/processing/strategies/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ def __init__(
self.__next_step = next_step

if commit_policy is not None:
self.__commit_policy_state: Optional[
CommitPolicyState
] = commit_policy.get_state_machine()
self.__commit_policy_state: Optional[CommitPolicyState] = (
commit_policy.get_state_machine()
)
else:
self.__commit_policy_state = None

Expand Down
6 changes: 3 additions & 3 deletions arroyo/processing/strategies/run_task_with_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,9 +625,9 @@ def __check_for_results(self, timeout: Optional[float] = None) -> None:
while self.__processes:
try:
self.__check_for_results_impl(
timeout=max(deadline - time.time(), 0)
if deadline is not None
else None
timeout=(
max(deadline - time.time(), 0) if deadline is not None else None
)
)
except NextStepTimeoutError:
if deadline is None or deadline > time.time():
Expand Down
2 changes: 1 addition & 1 deletion mypy.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[mypy]
python_version = 3.8
python_version = 3.12

[mypy-avro.*]
ignore_missing_imports = True
Expand Down
4 changes: 2 additions & 2 deletions requirements-linter.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
black==24.3.0
flake8==3.8.4
black==24.10.0
flake8==7.1.1
28 changes: 13 additions & 15 deletions rust-arroyo/src/processing/dlq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::gauge;
use crate::types::{BrokerMessage, Partition, Topic, TopicOrPartition};

// This is a per-partition max
const MAX_PENDING_FUTURES: usize = 1000;
const MAX_PENDING_FUTURES: usize = 2000;

pub trait DlqProducer<TPayload>: Send + Sync {
// Send a message to the DLQ.
Expand Down Expand Up @@ -294,7 +294,7 @@ impl<TPayload: Send + Sync + 'static> DlqPolicyWrapper<TPayload> {
dlq_policy,
dlq_limit_state: DlqLimitState::default(),
futures: BTreeMap::new(),
buffered_messages: buffered_messages,
buffered_messages,
}
});
Self { inner }
Expand Down Expand Up @@ -435,13 +435,21 @@ impl<TPayload> BufferedMessages<TPayload> {
}

buffered.push_back(message.clone());
Self::report_partition_metrics(partition_index, buffered);
}

/// Emits the DLQ buffer gauges for a single partition.
///
/// Two gauges are reported, both tagged with `partition_id`:
/// - `arroyo.consumer.dlq_buffer.capacity`: number of elements the deque can
///   hold without reallocating.
/// - `arroyo.consumer.dlq_buffer.len`: number of elements currently held in
///   the deque.
fn report_partition_metrics<T>(partition_index: u16, buffered: &VecDeque<T>) {
    let capacity = buffered.capacity() as u64;
    let len = buffered.len() as u64;

    gauge!(
        "arroyo.consumer.dlq_buffer.capacity",
        capacity,
        "partition_id" => partition_index
    );
    gauge!(
        "arroyo.consumer.dlq_buffer.len",
        len,
        "partition_id" => partition_index
    );
}

/// Return the message at the given offset or None if it is not found in the buffer.
Expand All @@ -458,12 +466,7 @@ impl<TPayload> BufferedMessages<TPayload> {
match message.offset.cmp(&offset) {
Ordering::Equal => {
let first = messages.pop_front();

gauge!(
"arroyo.consumer.dlq_buffer.capacity",
messages.capacity() as u64,
"partition_id" => partition.index
);
Self::report_partition_metrics(partition.index, messages);

return first;
}
Expand All @@ -472,12 +475,7 @@ impl<TPayload> BufferedMessages<TPayload> {
}
Ordering::Less => {
messages.pop_front();

gauge!(
"arroyo.consumer.dlq_buffer.capacity",
messages.capacity() as u64,
"partition_id" => partition.index
);
Self::report_partition_metrics(partition.index, messages);
}
};
}
Expand Down
2 changes: 1 addition & 1 deletion rust-arroyo/src/processing/strategies/produce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ where
) -> Self {
let inner = RunTaskInThreads::new(
next_step,
Box::new(ProduceMessage::new(producer, topic)),
ProduceMessage::new(producer, topic),
concurrency,
Some("produce"),
);
Expand Down
39 changes: 20 additions & 19 deletions rust-arroyo/src/processing/strategies/run_task_in_threads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ pub trait TaskRunner<TPayload, TTransformed, TError>: Send + Sync {
fn get_task(&self, message: Message<TPayload>) -> RunTaskFunc<TTransformed, TError>;
}

/// Blanket implementation that lets any `Send + Sync` function or closure of
/// shape `Fn(Message<TPayload>) -> RunTaskFunc<TTransformed, TError>` be used
/// directly as a [`TaskRunner`]; `get_task` simply invokes the callable.
impl<F, TPayload, TTransformed, TError> TaskRunner<TPayload, TTransformed, TError> for F
where
    F: Fn(Message<TPayload>) -> RunTaskFunc<TTransformed, TError> + Send + Sync,
{
    fn get_task(&self, message: Message<TPayload>) -> RunTaskFunc<TTransformed, TError> {
        // Delegate straight to the wrapped callable.
        (self)(message)
    }
}

/// This is configuration for the [`RunTaskInThreads`] strategy.
///
/// It defines the runtime on which tasks are being spawned, and the number of
Expand Down Expand Up @@ -92,21 +102,22 @@ pub struct RunTaskInThreads<TPayload, TTransformed, TError, N> {
}

impl<TPayload, TTransformed, TError, N> RunTaskInThreads<TPayload, TTransformed, TError, N> {
pub fn new(
pub fn new<TTaskRunner>(
next_step: N,
task_runner: Box<dyn TaskRunner<TPayload, TTransformed, TError>>,
task_runner: TTaskRunner,
concurrency: &ConcurrencyConfig,
// If provided, this name is used for metrics
custom_strategy_name: Option<&'static str>,
) -> Self
where
N: ProcessingStrategy<TTransformed> + 'static,
TTaskRunner: TaskRunner<TPayload, TTransformed, TError> + 'static,
{
let strategy_name = custom_strategy_name.unwrap_or("run_task_in_threads");

RunTaskInThreads {
next_step,
task_runner,
task_runner: Box::new(task_runner),
concurrency: concurrency.concurrency,
runtime: concurrency.handle(),
handles: VecDeque::new(),
Expand Down Expand Up @@ -263,10 +274,8 @@ mod tests {
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

struct IdentityTaskRunner {}

impl<T: Send + Sync + 'static> TaskRunner<T, T, &'static str> for IdentityTaskRunner {
fn get_task(&self, message: Message<T>) -> RunTaskFunc<T, &'static str> {
/// Test helper: builds a task runner (as a closure) whose task resolves to
/// `Ok` with the very message it was given, i.e. an identity transform.
fn identity_task_runner<T: Send + 'static>() -> impl TaskRunner<T, T, &'static str> {
|message: Message<T>| -> RunTaskFunc<T, &'static str> {
// The message is moved into the future and handed back unchanged.
Box::pin(async move { Ok(message) })
}
}
Expand Down Expand Up @@ -310,12 +319,8 @@ mod tests {
#[test]
fn test() {
let concurrency = ConcurrencyConfig::new(1);
let mut strategy = RunTaskInThreads::new(
Mock::new(),
Box::new(IdentityTaskRunner {}),
&concurrency,
None,
);
let mut strategy =
RunTaskInThreads::new(Mock::new(), identity_task_runner(), &concurrency, None);

let message = Message::new_any_message("hello_world".to_string(), BTreeMap::new());

Expand All @@ -331,12 +336,8 @@ mod tests {
let next_step = Mock::new();
let counts = next_step.counts();
let concurrency = ConcurrencyConfig::new(2);
let mut strategy = RunTaskInThreads::new(
next_step,
Box::new(IdentityTaskRunner {}),
&concurrency,
None,
);
let mut strategy =
RunTaskInThreads::new(next_step, identity_task_runner(), &concurrency, None);

let partition = Partition::new(Topic::new("topic"), 0);

Expand Down

0 comments on commit b2ac7c2

Please sign in to comment.