[DEV] Migrate: Improved Error Handling on Failed SetSlotRange (#1653)#1735
Conversation
* Migrate: Improved Error Handling on Failed SetSlotRange * Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Change to async method * Correct operation name in error message * Add description + purpose of test * Remove test description to match style of other tests * Update logging --------- Co-authored-by: Kevin Bowersox <kbowersox@microsoft.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Updates cluster slot migration to use an async SETSLOTRANGE path with more explicit failure handling, and adds a new cluster migration test intended to validate resiliency.
Changes:
- Replaced the synchronous/continuation-based
TrySetSlotRanges+TryRecoverFromFailurewithTrySetSlotRangesAsync+TryRecoverFromFailureAsync. - Updated the migration driver flow to
awaitslot-range transitions and recovery steps. - Added a new cluster test that migrates a single slot and validates keys/ownership on the target.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| test/Garnet.test.cluster/ClusterMigrateTests.cs | Adds a new cluster migration test covering successful slot migration and data/ownership verification. |
| libs/cluster/Server/Migration/MigrationDriver.cs | Introduces async slot-range state changes and async recovery; updates migration task flow to await these operations. |
| libs/cluster/Server/Migration/MigrateSession.cs | Removes the old synchronous TrySetSlotRanges / TryRecoverFromFailure implementations after migrating logic to async equivalents. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return false; | ||
| } | ||
|
|
||
| logger?.LogTrace("[Completed] SETSLOT {slots} {state} {nodeid}", ClusterManager.GetRange([.. _sslots]), state, nodeid ?? ""); |
There was a problem hiding this comment.
The completion log says "[Completed] SETSLOT" but the operation is CLUSTER SETSLOTRANGE. This mismatch can confuse log consumers when diagnosing migration issues; consider renaming the log text to SETSLOTRANGE to match the actual command.
| logger?.LogTrace("[Completed] SETSLOT {slots} {state} {nodeid}", ClusterManager.GetRange([.. _sslots]), state, nodeid ?? ""); | |
| logger?.LogTrace("[Completed] SETSLOTRANGE {slots} {state} {nodeid}", ClusterManager.GetRange([.. _sslots]), state, nodeid ?? ""); |
| [Test, Order(27)] | ||
| [Category("CLUSTER")] | ||
| public void ClusterMigrateSetSlotRangeResilience() | ||
| { | ||
| context.logger?.LogDebug("0. ClusterMigrateSetSlotRangeResilience started"); | ||
| var shards = 2; | ||
| context.CreateInstances(shards, useTLS: UseTLS); | ||
| context.CreateConnection(useTLS: UseTLS); |
There was a problem hiding this comment.
This new cluster test has no [CancelAfter(...)] timeout, while other migration tests in this file do. If migration cleanup or key propagation stalls, this test can hang the entire suite; consider adding CancelAfter(testTimeout) (or similar) and using the provided CancellationToken pattern like other tests here.
| // Verify each key's value on the target | ||
| var targetEndPoint = context.clusterTestUtils.GetEndPoint(targetNodeIndex); | ||
| foreach (var entry in data) | ||
| { | ||
| string value = null; | ||
| ResponseState responseState = default; | ||
| IPEndPoint endPoint = null; | ||
|
|
||
| var success = SpinWait.SpinUntil(() => | ||
| { | ||
| value = context.clusterTestUtils.GetKey(targetEndPoint, entry.Key, out _, out endPoint, out responseState); | ||
| return responseState == ResponseState.OK && value != null; | ||
| }, TimeSpan.FromSeconds(30)); | ||
|
|
There was a problem hiding this comment.
SpinWait.SpinUntil(..., TimeSpan.FromSeconds(30)) inside a loop over all migrated keys is CPU-busy and can make the test take a very long time in the worst case (30s * keyCount). Prefer an async backoff/polling approach (e.g., Task.Delay + cancellation token / overall deadline) or sampling a smaller subset of keys for value verification.
| // Verify each key's value on the target | |
| var targetEndPoint = context.clusterTestUtils.GetEndPoint(targetNodeIndex); | |
| foreach (var entry in data) | |
| { | |
| string value = null; | |
| ResponseState responseState = default; | |
| IPEndPoint endPoint = null; | |
| var success = SpinWait.SpinUntil(() => | |
| { | |
| value = context.clusterTestUtils.GetKey(targetEndPoint, entry.Key, out _, out endPoint, out responseState); | |
| return responseState == ResponseState.OK && value != null; | |
| }, TimeSpan.FromSeconds(30)); | |
| // Verify each key's value on the target using a shared overall deadline and non-busy polling. | |
| var targetEndPoint = context.clusterTestUtils.GetEndPoint(targetNodeIndex); | |
| var verificationDeadline = DateTime.UtcNow.AddSeconds(30); | |
| foreach (var entry in data) | |
| { | |
| string value = null; | |
| ResponseState responseState = default; | |
| IPEndPoint endPoint = null; | |
| var success = false; | |
| do | |
| { | |
| value = context.clusterTestUtils.GetKey(targetEndPoint, entry.Key, out _, out endPoint, out responseState); | |
| success = responseState == ResponseState.OK && value != null; | |
| if (success) | |
| break; | |
| if (DateTime.UtcNow >= verificationDeadline) | |
| break; | |
| Thread.Sleep(TimeSpan.FromMilliseconds(100)); | |
| } while (true); |
| public void ClusterMigrateSetSlotRangeResilience() | ||
| { | ||
| context.logger?.LogDebug("0. ClusterMigrateSetSlotRangeResilience started"); | ||
| var shards = 2; | ||
| context.CreateInstances(shards, useTLS: UseTLS); | ||
| context.CreateConnection(useTLS: UseTLS); | ||
|
|
||
| // Setup: node 0 owns all slots, node 1 owns none | ||
| _ = context.clusterTestUtils.AddDelSlotsRange(0, [(0, 16383)], addslot: true, logger: context.logger); | ||
| context.clusterTestUtils.SetConfigEpoch(0, 1, logger: context.logger); | ||
| context.clusterTestUtils.SetConfigEpoch(1, 2, logger: context.logger); | ||
| context.clusterTestUtils.Meet(0, 1, logger: context.logger); | ||
| context.clusterTestUtils.WaitUntilNodeIsKnown(1, 0, logger: context.logger); | ||
|
|
||
| // Create data in a single slot using the standard helper | ||
| var keyCount = 50; | ||
| var slot = CreateSingleSlotData(keyLen: 16, valueLen: 16, keyTagEnd: 6, keyCount, out var data); | ||
| var sourceNodeIndex = 0; | ||
| var targetNodeIndex = 1; | ||
|
|
||
| context.logger?.LogDebug("1. Verifying data insertion into slot {slot}", slot); | ||
| var actualKeyCount = context.clusterTestUtils.CountKeysInSlot(sourceNodeIndex, slot, logger: context.logger); | ||
| ClassicAssert.AreEqual(keyCount, actualKeyCount, "Keys should be present in source slot before migration"); | ||
|
|
||
| context.logger?.LogDebug("2. Initiating migration of slot {slot}", slot); | ||
| context.clusterTestUtils.MigrateSlotsIndex(sourceNodeIndex, targetNodeIndex, [slot], logger: context.logger); | ||
|
|
There was a problem hiding this comment.
This test validates the successful migration path, but it does not exercise the new failure-handling behavior for a failed/timeout SetSlotRange (the main change in this PR). To cover the intended regression, add a case that forces CLUSTER SETSLOTRANGE to return an error or timeout (e.g., make the target unreachable mid-migration) and assert the session recovers slots back to STABLE and reports failure deterministically.
| catch (OperationCanceledException) | ||
| { | ||
| logger?.LogError("SetSlotRange operation timed out or was cancelled after {timeout}ms for slots {slots}", _timeout.TotalMilliseconds, ClusterManager.GetRange([.. _sslots])); | ||
| Status = MigrateState.FAIL; | ||
| return false; | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| logger?.LogError(ex, "An error occurred during SetSlotRange for slots {slots}", ClusterManager.GetRange([.. _sslots])); | ||
| Status = MigrateState.FAIL; |
There was a problem hiding this comment.
WaitAsync(_timeout, _cts.Token) will throw TimeoutException when the timeout elapses (and OperationCanceledException only when the token is cancelled). As written, the "timed out" log path in the OperationCanceledException catch will likely never run on real timeouts, and timeouts will fall into the generic Exception handler. Consider catching TimeoutException explicitly (and using a separate message) so timeout vs cancellation is reported correctly.
| MigrateState.IMPORT => IMPORTING, | ||
| MigrateState.STABLE => STABLE, | ||
| MigrateState.NODE => NODE, | ||
| _ => throw new Exception("Invalid SETSLOT Operation"), |
There was a problem hiding this comment.
The exception message "Invalid SETSLOT Operation" does not match the command being sent (CLUSTER SETSLOTRANGE). This makes failures harder to diagnose; update the message to reference SETSLOTRANGE/SetSlotRange and include the invalid state value if possible.
| _ => throw new Exception("Invalid SETSLOT Operation"), | |
| _ => throw new ArgumentOutOfRangeException(nameof(state), state, "Invalid SETSLOTRANGE/SetSlotRange state."), |
Improve error handling in
TrySetSlotRangesduring slot migrationSummary
Refactors
MigrateSession.TrySetSlotRangesto replace.ContinueWith(...).WaitAsync().Resultasync pattern with a more traceable and debuggable implementation. Adds explicit timeout/cancellation handling and ensuresMigrateState.FAILis consistently set on all error paths.Motivation
The previous implementation used
ContinueWith(TaskContinuationOptions.OnlyOnRanToCompletion)chained with.WaitAsync().Result. This pattern had several issues:OnlyOnRanToCompletioncontinuation never ran, and the resultingTaskCanceledExceptionwas caught by the genericcatch (Exception)block — which did not setStatus = MigrateState.FAIL, leaving the migration in an indeterminate state."An error occurred"with no context about which slots were affected or whether the failure was a timeout vs. an unexpected error.Changes
MigrateSession.cs
.ContinueWith(...).WaitAsync(_timeout, _cts.Token).Resultwith directtask.WaitAsync(_timeout, _cts.Token).GetAwaiter().GetResult()catch (TaskCanceledException)handler for timeout/cancellation scenarioscatch (AggregateException aex) when (aex.InnerException is TaskCanceledException)for wrapped timeout exceptionsStatus = MigrateState.FAILin all error paths (the old genericcatchmissed this)SETSLOTRANGEcall for better migration observabilityClusterMigrateTests.cs
ClusterMigrateSetSlotRangeResiliencetest that exercises theTrySetSlotRangescode path through a full slot migration:TrySetSlotRangesfor IMPORTING and NODE states)Testing
ClusterMigrateSetSlotRangeResiliencepassesFixes #1655 1655