Skip to content

Commit

Permalink
ClusterState API
Browse files Browse the repository at this point in the history
Summary:
Expose a globally accessible CluserState api
  • Loading branch information
muhamadazmy committed Jan 6, 2025
1 parent 4f0433f commit 4efae2c
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ hyper-rustls = { version = "0.27.2", default-features = false, features = [
"logging",
] }
hyper-util = { version = "0.1" }
indexmap = { version = "2" }
itertools = "0.13.0"
jsonschema = "0.26.0"
metrics = { version = "0.24" }
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ hostname = { workspace = true }
http = { workspace = true }
http-body = { workspace = true }
http-body-util = { workspace = true }
indexmap = { workspace = true }
humantime = { workspace = true }
hyper = { workspace = true }
hyper-util = { workspace = true, features = ["server-graceful", "server"] }
Expand Down
113 changes: 113 additions & 0 deletions crates/core/src/cluster_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use indexmap::IndexSet;
use restate_types::{GenerationalNodeId, NodeId, PlainNodeId};
use tokio::sync::watch;

use crate::{Metadata, TaskCenter};

//todo(azmy): Default is temporary, remove it when the implementation is done
#[derive(Debug, Clone, Default)]
pub struct ClusterState {
watch: watch::Sender<()>,
}

impl ClusterState {
// this is just a place holder since cluster state
// should only be constructed via the failure detector
pub fn new() -> Self {
Self {
watch: watch::Sender::new(()),
}
}

pub fn try_current() -> Option<Self> {
TaskCenter::with_current(|h| h.cluster_state())
}

#[track_caller]
pub fn current() -> Self {
TaskCenter::with_current(|h| h.cluster_state()).expect("called outside task-center scope")
}

pub fn watch(&self) -> watch::Receiver<()> {
self.watch.subscribe()
}

/// Gets an iterator over all alive nodes
pub fn alive(&self) -> impl Iterator<Item = GenerationalNodeId> {
// Dummy implementation

// assumes all nodes are alive
let nodes_config = Metadata::with_current(|m| m.nodes_config_ref());
nodes_config
.iter()
.map(|(_, n)| n.current_generation)
.collect::<Vec<_>>()
.into_iter()
}

/// Gets an iterator over all dead nodes
pub fn dead(&self) -> impl Iterator<Item = PlainNodeId> {
// Dummy implementation

// assumes all nodes are alive
std::iter::empty()
}

/// Checks if a node is a live, returning its generation node id if it is.
pub fn is_alive(&self, node_id: NodeId) -> Option<GenerationalNodeId> {
// Dummy implementation

// assume all nodes are alive
let nodes_config = Metadata::with_current(|m| m.nodes_config_ref());
nodes_config
.find_node_by_id(node_id)
.map(|n| n.current_generation)
.ok()
}

/// Finds the first alive node in the given slice.
pub fn first_alive(&self, nodes: &[PlainNodeId]) -> Option<GenerationalNodeId> {
// Dummy implementation

// assumes all nodes are alive hence
// always return the first node
if nodes.is_empty() {
return None;
}

let nodes_config = Metadata::with_current(|m| m.nodes_config_ref());
nodes_config
.find_node_by_id(nodes[0])
.map(|n| n.current_generation)
.ok()
}

/// Returns the subset of alive nodes from the given node set preserving their order.
pub fn intersect(&self, nodes: &IndexSet<PlainNodeId>) -> IndexSet<GenerationalNodeId> {
// Dummy implementation

// this dummy implementation just assumes
// all nodes in the set are alive and return the
// current known generational id
let nodes_config = Metadata::with_current(|m| m.nodes_config_ref());
nodes
.iter()
.filter_map(|plain_id| {
nodes_config
.find_node_by_id(*plain_id)
.map(|n| n.current_generation)
.ok()
})
.collect()
}
}
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

pub mod cluster_state;
mod error;
mod metadata;
pub mod metadata_store;
Expand Down
17 changes: 17 additions & 0 deletions crates/core/src/task_center.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use tracing::{debug, error, info, trace, warn};
use restate_types::identifiers::PartitionId;
use restate_types::GenerationalNodeId;

use crate::cluster_state::ClusterState;
use crate::metric_definitions::{
self, TC_FINISHED, TC_SPAWN, TC_STATUS_COMPLETED, TC_STATUS_FAILED,
};
Expand Down Expand Up @@ -122,6 +123,10 @@ impl TaskCenter {
Self::with_current(|tc| tc.try_set_global_metadata(metadata))
}

pub fn try_set_global_cluster_state(cluster_state: ClusterState) -> bool {
Self::with_current(|tc| tc.try_set_global_cluster_state(cluster_state))
}

/// Launch a new task
#[track_caller]
pub fn spawn<F>(kind: TaskKind, name: &'static str, future: F) -> Result<TaskId, ShutdownError>
Expand Down Expand Up @@ -277,6 +282,7 @@ struct TaskCenterInner {
current_exit_code: AtomicI32,
managed_tasks: Mutex<HashMap<TaskId, Arc<Task>>>,
global_metadata: OnceLock<Metadata>,
global_cluster_state: OnceLock<ClusterState>,
root_task_context: TaskContext,
}

Expand Down Expand Up @@ -310,6 +316,7 @@ impl TaskCenterInner {
current_exit_code: AtomicI32::new(0),
managed_tasks: Mutex::new(HashMap::new()),
global_metadata: OnceLock::new(),
global_cluster_state: OnceLock::new(),
managed_runtimes: Mutex::new(HashMap::with_capacity(64)),
root_task_context,
pause_time,
Expand All @@ -322,10 +329,20 @@ impl TaskCenterInner {
self.global_metadata.set(metadata).is_ok()
}

/// Attempt to set the global cluster state handle. This should be called once
/// at the startup of the node.
pub fn try_set_global_cluster_state(self: &Arc<Self>, cluster_state: ClusterState) -> bool {
self.global_cluster_state.set(cluster_state).is_ok()
}

pub fn global_metadata(self: &Arc<Self>) -> Option<&Metadata> {
self.global_metadata.get()
}

pub fn cluster_state(self: &Arc<Self>) -> Option<&ClusterState> {
self.global_cluster_state.get()
}

pub fn metadata(self: &Arc<Self>) -> Option<Metadata> {
match OVERRIDES.try_with(|overrides| overrides.metadata.clone()) {
Ok(Some(o)) => Some(o),
Expand Down
12 changes: 11 additions & 1 deletion crates/core/src/task_center/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use restate_types::identifiers::PartitionId;
use tokio_util::sync::CancellationToken;
use tracing::{instrument, Instrument};

use crate::{Metadata, ShutdownError};
use crate::{cluster_state::ClusterState, Metadata, ShutdownError};

use super::{
RuntimeError, RuntimeTaskHandle, TaskCenterInner, TaskContext, TaskHandle, TaskId, TaskKind,
Expand Down Expand Up @@ -60,6 +60,12 @@ impl Handle {
self.inner.try_set_global_metadata(metadata)
}

/// Attempt to set the global cluster state handle. This should be called once
/// at the startup of the node.
pub fn try_set_global_cluster_state(&self, cluster_state: ClusterState) -> bool {
self.inner.try_set_global_cluster_state(cluster_state)
}

/// Sets the current task_center but doesn't create a task. Use this when you need to run a
/// closure within task_center scope.
pub fn run_sync<F, O>(&self, f: F) -> O
Expand Down Expand Up @@ -153,6 +159,10 @@ impl Handle {
self.inner.metadata()
}

pub fn cluster_state(&self) -> Option<ClusterState> {
self.inner.cluster_state().cloned()
}

/// Take control over the running task from task-center. This returns None if the task was not
/// found, completed, or has been cancelled.
pub fn take_task(&self, task_id: TaskId) -> Option<TaskHandle<()>> {
Expand Down
5 changes: 5 additions & 0 deletions crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod roles;

use std::sync::Arc;

use restate_core::cluster_state::ClusterState;
use tracing::{debug, error, info, trace};

use codederror::CodedError;
Expand Down Expand Up @@ -323,6 +324,10 @@ impl Node {
let is_set = TaskCenter::try_set_global_metadata(metadata.clone());
debug_assert!(is_set, "Global metadata was already set");

// todo(azmy): this is just a dummy setup until failure detector is actually implemented
let is_set = TaskCenter::try_set_global_cluster_state(ClusterState::new());
debug_assert!(is_set, "Cluster state was already set");

// Start metadata manager
spawn_metadata_manager(self.metadata_manager)?;

Expand Down
2 changes: 1 addition & 1 deletion crates/utoipa/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ debug = []
[dependencies]
serde = { workspace = true }
serde_json = { workspace = true }
indexmap = { version = "2", features = ["serde"] }
indexmap = { workspace = true, features = ["serde"] }

[dev-dependencies]
assert-json-diff = "2"
Expand Down

0 comments on commit 4efae2c

Please sign in to comment.