Skip to content

Commit

Permalink
ClusterState API
Browse files Browse the repository at this point in the history
Summary:
Expose a globally accessible CluserState api
  • Loading branch information
muhamadazmy committed Jan 6, 2025
1 parent 4f0433f commit 3b0db22
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 1 deletion.
64 changes: 64 additions & 0 deletions crates/core/src/cluster_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use restate_types::{GenerationalNodeId, NodeId, PlainNodeId};
use std::{iter::Iterator, marker::PhantomData};

use crate::TaskCenter;

struct UnimplementedIter<T> {
_phantom: std::marker::PhantomData<T>,
}

impl<T> UnimplementedIter<T> {
fn new() -> Self {
Self {
_phantom: PhantomData,
}
}
}

impl<T> Iterator for UnimplementedIter<T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
unimplemented!();
}
}

//todo(azmy): Default is temporary, remove it when the implementation is done
#[derive(Debug, Clone, Default)]
pub struct ClusterState {}

impl ClusterState {
pub fn try_current() -> Option<Self> {
TaskCenter::with_current(|h| h.cluster_state())
}

pub fn current() -> Self {
TaskCenter::with_current(|h| h.cluster_state()).expect("called outside task-center scope")
}

pub fn alive(&self) -> impl Iterator<Item = GenerationalNodeId> {
UnimplementedIter::new()
}

pub fn dead(&self) -> impl Iterator<Item = PlainNodeId> {
UnimplementedIter::new()
}

pub fn is_alive(&self, _node_id: NodeId) -> Option<GenerationalNodeId> {
unimplemented!();
}

pub fn first_alive(&self, _nodes: &[PlainNodeId]) -> Option<GenerationalNodeId> {
unimplemented!();
}
}
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod cluster_state;
mod error;
mod metadata;
pub mod metadata_store;
Expand Down
13 changes: 13 additions & 0 deletions crates/core/src/task_center.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use tracing::{debug, error, info, trace, warn};
use restate_types::identifiers::PartitionId;
use restate_types::GenerationalNodeId;

use crate::cluster_state::ClusterState;
use crate::metric_definitions::{
self, TC_FINISHED, TC_SPAWN, TC_STATUS_COMPLETED, TC_STATUS_FAILED,
};
Expand Down Expand Up @@ -277,6 +278,7 @@ struct TaskCenterInner {
current_exit_code: AtomicI32,
managed_tasks: Mutex<HashMap<TaskId, Arc<Task>>>,
global_metadata: OnceLock<Metadata>,
global_cluster_state: OnceLock<ClusterState>,
root_task_context: TaskContext,
}

Expand Down Expand Up @@ -310,6 +312,7 @@ impl TaskCenterInner {
current_exit_code: AtomicI32::new(0),
managed_tasks: Mutex::new(HashMap::new()),
global_metadata: OnceLock::new(),
global_cluster_state: OnceLock::new(),
managed_runtimes: Mutex::new(HashMap::with_capacity(64)),
root_task_context,
pause_time,
Expand All @@ -322,10 +325,20 @@ impl TaskCenterInner {
self.global_metadata.set(metadata).is_ok()
}

/// Attempt to set the global cluster state handle. This should be called once
/// at the startup of the node.
pub fn try_set_global_cluster_state(self: &Arc<Self>, cluster_state: ClusterState) -> bool {
self.global_cluster_state.set(cluster_state).is_ok()
}

pub fn global_metadata(self: &Arc<Self>) -> Option<&Metadata> {
self.global_metadata.get()
}

pub fn cluster_state(self: &Arc<Self>) -> Option<&ClusterState> {
self.global_cluster_state.get()
}

pub fn metadata(self: &Arc<Self>) -> Option<Metadata> {
match OVERRIDES.try_with(|overrides| overrides.metadata.clone()) {
Ok(Some(o)) => Some(o),
Expand Down
12 changes: 11 additions & 1 deletion crates/core/src/task_center/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use restate_types::identifiers::PartitionId;
use tokio_util::sync::CancellationToken;
use tracing::{instrument, Instrument};

use crate::{Metadata, ShutdownError};
use crate::{cluster_state::ClusterState, Metadata, ShutdownError};

use super::{
RuntimeError, RuntimeTaskHandle, TaskCenterInner, TaskContext, TaskHandle, TaskId, TaskKind,
Expand Down Expand Up @@ -60,6 +60,12 @@ impl Handle {
self.inner.try_set_global_metadata(metadata)
}

/// Attempt to set the global cluster state handle. This should be called once
/// at the startup of the node.
pub fn try_set_global_cluster_state(&self, cluster_state: ClusterState) -> bool {
self.inner.try_set_global_cluster_state(cluster_state)
}

/// Sets the current task_center but doesn't create a task. Use this when you need to run a
/// closure within task_center scope.
pub fn run_sync<F, O>(&self, f: F) -> O
Expand Down Expand Up @@ -153,6 +159,10 @@ impl Handle {
self.inner.metadata()
}

pub fn cluster_state(&self) -> Option<ClusterState> {
self.inner.cluster_state().cloned()
}

/// Take control over the running task from task-center. This returns None if the task was not
/// found, completed, or has been cancelled.
pub fn take_task(&self, task_id: TaskId) -> Option<TaskHandle<()>> {
Expand Down

0 comments on commit 3b0db22

Please sign in to comment.