Skip to content
This repository has been archived by the owner on Nov 8, 2023. It is now read-only.

Commit

Permalink
try to_owned
Browse files Browse the repository at this point in the history
  • Loading branch information
lynnagara committed Jun 16, 2022
1 parent 9f70ed2 commit 322a8d2
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
14 changes: 12 additions & 2 deletions src/backends/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,18 @@ impl<'a> KafkaPayload<'a> {
}
}
}
impl<'a> Clone for KafkaPayload<'a> {
fn clone(&self) -> KafkaPayload<'a> {
// impl<'a> Clone for KafkaPayload<'a> {
// fn clone(&self) -> KafkaPayload<'a> {
// match self {
// Self::Borrowed(ref msg) => Self::Owned(msg.detach()),
// Self::Owned(ref msg) => Self::Owned(msg.clone()),
// }
// }
// }

impl ToOwned for KafkaPayload<'static> {
type Owned = KafkaPayload<'static>;
fn to_owned(&self) -> KafkaPayload<'static> {
match self {
Self::Borrowed(ref msg) => Self::Owned(msg.detach()),
Self::Owned(ref msg) => Self::Owned(msg.clone()),
Expand Down
18 changes: 13 additions & 5 deletions src/processing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,20 @@ impl<'a> StreamProcessor<'a> {
} else {
// Otherwise, we need to try fetch a new message from the consumer,
// even if there is no active assignment and/or processing strategy.
let msg = self.consumer.poll(Some(Duration::from_secs(1)));
let msg = self
.consumer
.poll(Some(Duration::from_secs(1)))
.unwrap()
.unwrap()
.to_owned();

self.message = Some(msg)

//TODO: Support errors properly
match msg {
Ok(m) => self.message = m,
Err(_) => return Err(RunError::PollError),
}
// match msg {
// Ok(m) => self.message = m,
// Err(_) => return Err(RunError::PollError),
// }
}

let mut trait_callbacks = self.strategies.lock().unwrap();
Expand Down

0 comments on commit 322a8d2

Please sign in to comment.