tests: add repro for node flap detach race
VladLazar committed Jan 7, 2025
1 parent d3fa0f6 commit 83e90d3
Showing 4 changed files with 122 additions and 2 deletions.
2 changes: 2 additions & 0 deletions storage_controller/src/reconciler.rs
@@ -748,6 +748,8 @@ impl Reconciler {
};

if increment_generation {
pausable_failpoint!("reconciler-pre-increment-generation");

let generation = self
.persistence
.increment_generation(self.tenant_shard_id, node.get_id())
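For context, tests drive this new pausable failpoint through the storage controller fixture, as the regression test added in this commit does. A minimal sketch, assuming the usual `NeonEnv` fixtures from `test_runner` (the `env` object and `configure_failpoints` helper come from the existing test suite):

```python
# Pause every reconciliation right before it increments the generation
# (and updates `generation_pageserver`) in the database.
env.storage_controller.configure_failpoints(
    ("reconciler-pre-increment-generation", "pause")
)

# ... trigger reconciliations, e.g. by stopping the attached pageservers ...

# Later, let the paused reconciliations continue.
env.storage_controller.configure_failpoints(
    ("reconciler-pre-increment-generation", "off")
)
```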
4 changes: 4 additions & 0 deletions storage_controller/src/service.rs
@@ -83,6 +83,7 @@ use utils::{
generation::Generation,
http::error::ApiError,
id::{NodeId, TenantId, TimelineId},
pausable_failpoint,
sync::gate::Gate,
};

@@ -1024,6 +1025,8 @@ impl Service {
)
.await;

pausable_failpoint!("heartbeat-pre-node-state-configure");

// This is the code path for genuine availability transitions (i.e. node
// goes unavailable and/or comes back online).
let res = self
@@ -2492,6 +2495,7 @@ impl Service {
// Persist updates
// Ordering: write to the database before applying changes in-memory, so that
// we will not appear time-travel backwards on a restart.

let mut schedule_context = ScheduleContext::default();
for ShardUpdate {
tenant_shard_id,
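The new `heartbeat-pre-node-state-configure` failpoint is toggled the same way; the test below also confirms the controller has actually parked on a failpoint by scanning its log. A minimal sketch of that pattern, assuming the `log_contains` and `wait_until` helpers already used in the test suite:

```python
# Wait until the heartbeat path reports it is paused on the new failpoint.
# `log_contains` returns a match plus a LogCursor to resume scanning from.
def heartbeat_paused():
    res = env.storage_controller.log_contains(
        "at failpoint heartbeat-pre-node-state-configure"
    )
    assert res
    return res[1]

offset = wait_until(heartbeat_paused)
```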
7 changes: 5 additions & 2 deletions test_runner/fixtures/neon_fixtures.py
@@ -2521,6 +2521,7 @@ def start(
self,
extra_env_vars: dict[str, str] | None = None,
timeout_in_seconds: int | None = None,
await_active: bool = True,
) -> Self:
"""
Start the page server.
@@ -2547,8 +2548,10 @@ def start(
)
self.running = True

-if self.env.storage_controller.running and self.env.storage_controller.node_registered(
-    self.id
+if (
+    await_active
+    and self.env.storage_controller.running
+    and self.env.storage_controller.node_registered(self.id)
):
self.env.storage_controller.poll_node_status(
self.id, PageserverAvailability.ACTIVE, None, max_attempts=200, backoff=0.1
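The new `await_active` flag lets a test restart a pageserver without blocking until the storage controller reports it ACTIVE, which the race repro below relies on. A brief usage sketch, assuming `stopped_nodes` holds the node IDs returned by `locate()` as in the new test:

```python
# Stop the pageservers hosting attached shards, then bring them back without
# waiting for the controller's poll_node_status() to see them as ACTIVE.
for node_id in stopped_nodes:
    env.get_pageserver(node_id).stop(immediate=True)

for node_id in stopped_nodes:
    env.get_pageserver(node_id).start(await_active=False)
```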
111 changes: 111 additions & 0 deletions test_runner/regress/test_storage_controller.py
@@ -17,6 +17,7 @@
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
DEFAULT_AZ_ID,
LogCursor,
NeonEnv,
NeonEnvBuilder,
NeonPageserver,
@@ -3303,3 +3304,113 @@ def test_storage_controller_detached_stopped(

# Confirm the detach happened
assert env.pageserver.http_client().tenant_list_locations()["tenant_shards"] == []


@run_only_on_default_postgres("Postgres version makes no difference here")
def test_storage_controller_node_flap_detach_race(
neon_env_builder: NeonEnvBuilder,
):
"""
Reproducer for https://github.com/neondatabase/neon/issues/10253.
When a node's availability flaps, the reconciliations spawned by the node
going offline may race with the reconciliation done when the node comes
back online.
"""
neon_env_builder.num_pageservers = 4

env = neon_env_builder.init_configs()
env.start()

tenant_id = TenantId.generate()
env.storage_controller.tenant_create(
tenant_id,
shard_count=2,
)
env.storage_controller.reconcile_until_idle()

stopped_nodes = [s["node_id"] for s in env.storage_controller.locate(tenant_id)]

def has_hit_failpoint(failpoint: str, offset: LogCursor | None = None) -> LogCursor:
res = env.storage_controller.log_contains(f"at failpoint {failpoint}", offset=offset)
assert res
return res[1]

# Stop the nodes which host attached shards.
# This will trigger reconciliations which pause before incrementing the generation,
# and, more importantly, updating the `generation_pageserver` of the shards.
env.storage_controller.configure_failpoints(("reconciler-pre-increment-generation", "pause"))
for node_id in stopped_nodes:
env.get_pageserver(node_id).stop(immediate=True)

def failure_handled() -> LogCursor:
stop_offset = None

for node_id in stopped_nodes:
res = env.storage_controller.log_contains(f"node {node_id} going offline")
assert res
stop_offset = res[1]

assert stop_offset
return stop_offset

offset = wait_until(failure_handled)

# Now restart the nodes and make them pause before marking themselves as available
# or running the activation reconciliation.
env.storage_controller.configure_failpoints(("heartbeat-pre-node-state-configure", "pause"))

for node_id in stopped_nodes:
env.get_pageserver(node_id).start(await_active=False)

offset = wait_until(
lambda: has_hit_failpoint("heartbeat-pre-node-state-configure", offset=offset)
)

# The nodes have restarted and are waiting to perform activation reconciliation.
# Unpause the initial reconciliation triggered by the nodes going offline.
# It will attempt to detach from the old location, but notice that the old location
# is not yet available, and then stop before processing the results of the reconciliation.
env.storage_controller.configure_failpoints(("reconciler-epilogue", "pause"))
env.storage_controller.configure_failpoints(("reconciler-pre-increment-generation", "off"))

offset = wait_until(lambda: has_hit_failpoint("reconciler-epilogue", offset=offset))

# Let the nodes perform activation reconciliation while still holding up processing the result
# from the initial reconcile triggered by going offline.
env.storage_controller.configure_failpoints(("heartbeat-pre-node-state-configure", "off"))

def activate_reconciliation_done():
for node_id in stopped_nodes:
assert env.storage_controller.log_contains(
f"Node {node_id} transition to active", offset=offset
)

wait_until(activate_reconciliation_done)

# Finally, allow the initial reconcile to finish up.
env.storage_controller.configure_failpoints(("reconciler-epilogue", "off"))

# Give things a chance to settle and validate that no stale locations exist
env.storage_controller.reconcile_until_idle()

def validate_locations():
shard_locations = defaultdict(list)
for ps in env.pageservers:
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
for loc in locations:
shard_locations[loc[0]].append(
{"generation": loc[1]["generation"], "mode": loc[1]["mode"], "node": ps.id}
)

log.info(f"Shard locations: {shard_locations}")

attached_locations = {
k: list(filter(lambda loc: loc["mode"] == "AttachedSingle", v))
for k, v in shard_locations.items()
}

for shard, locs in attached_locations.items():
assert len(locs) == 1, f"{shard} has {len(locs)} attached locations"

wait_until(validate_locations, timeout=10)
