
Support for Parallel Replication #1556

Merged
vazois merged 477 commits into dev from vazois/mmrt-dev
Apr 27, 2026

@vazois vazois commented Feb 11, 2026

Contributor

Multi-Log Parallel Replication Feature

This PR introduces multi-log based Append-Only File (AOF) support to Garnet, enhancing write throughput and enabling optimized parallel replication replay. The feature leverages multiple physical TsavoriteLog instances to shard write operations and parallelize log scanning, shipping, and replay across multiple connections and iterators. While designed primarily for cluster mode replication, this feature can also be used in standalone mode to improve performance when AOF is enabled.

Feature Requirements

1. Sharded AOF Architecture

  • Improves AOF write-throughput through key-based sharding across distinct physical TsavoriteLog instances.
  • Accelerates replica synchronization through parallel log scanning and shipping across the network.
  • Full backward compatibility with existing single-log deployments

2. Flexible Parallel Replay with Tunable Task Granularity

  • Introduces virtual sublog abstraction to allow for parallel replay within a given physical sublog.
  • Minimizes inter-task coordination to maximize parallel execution efficiency
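The two levels of sharding described above (physical sublogs, then replay tasks within a sublog) can be pictured with a small routing sketch. This is only an illustration under stated assumptions: the PR does not say which hash function Garnet uses or how the virtual sublog index is derived, so `route_key`, the CRC32 hash, and the modulo arithmetic are all hypothetical.

```python
import zlib


def route_key(key: bytes, physical_sublogs: int, replay_tasks: int) -> tuple[int, int]:
    """Map a key to (physical sublog index, replay task index within that sublog).

    Hypothetical routing: CRC32 is a stand-in hash, not Garnet's.
    """
    h = zlib.crc32(key)
    physical = h % physical_sublogs               # which log the write is appended to
    task = (h // physical_sublogs) % replay_tasks  # which replay task owns the key
    return physical, task


if __name__ == "__main__":
    for k in (b"user:1", b"user:2", b"cart:9"):
        print(k, route_key(k, physical_sublogs=4, replay_tasks=2))
```

Because the mapping depends only on the key, every write to the same key lands in the same sublog and is replayed by the same task, which is what lets tasks run with little coordination.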

3. Read Consistency Protocol

  • Per session prefix consistency through the use of timestamp-based sequence numbers.
  • Sketch based key-level replay status tracking for efficient and lightweight freshness validation.
  • Version-based prefix-consistency across replica reconfiguration operations.
  • Ensures monotonically increasing sequence numbers across failovers through offset tracking during replica promotion.

4. Transaction Support

  • Coordinates multi-exec transactions across sublogs to maintain ACID properties during parallel replay.
  • Preserves consistent commit ordering per session through timestamp-based sequence numbers.

5. Fast Prefix-Consistent Recovery

  • Multi-sublog prefix-consistent recovery within the persisted commit boundaries.
  • Intra-page parallelism during recovery using multiple replay tasks.

Newly Introduced Configuration Parameters

| Parameter | Purpose |
|---|---|
| AofPhysicalSublogCount | Number of physical TsavoriteLog instances |
| AofReplayTaskCount | Replay tasks per physical sublog at replica |
| AofRefreshPhysicalSublogTailFrequencyMs | Background task frequency for advancing idle sublog timestamps |

Implementation Plan

Phase 1: Core Infrastructure

  • 1.1 Implement AofHeader extensions to eliminate single log overhead.

    • ShardedHeader for standalone operations.
    • TransactionHeader for coordinated operations.
  • 1.2 Implement GarnetLog abstraction layer.

    • SingleLog wrapper for legacy single log.
    • ShardedLog implementation for multi-log.
  • 1.3 SequenceNumberGenerator class.

    • Generate monotonically increasing sequence number using timestamps.
    • Ensure monotonicity at failover and recovery by using starting offset.
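A timestamp-based generator that stays monotonic across restarts might look like the sketch below. It is an assumption-laden illustration, not the PR's SequenceNumberGenerator: the `start_offset` semantics (highest sequence number known before the restart), the injectable `clock`, and the lock are all choices made for this example.

```python
import threading
import time


class SequenceNumberGenerator:
    """Illustrative only: issues strictly increasing, timestamp-derived numbers."""

    def __init__(self, start_offset: int = 0, clock=time.monotonic_ns):
        self._clock = clock
        self._last = start_offset  # assumed: highest number issued before failover/recovery
        self._lock = threading.Lock()

    def next(self) -> int:
        with self._lock:
            # Use the clock when it is ahead; otherwise step past the last issued value
            # so the sequence never repeats or goes backwards.
            self._last = max(self._clock(), self._last + 1)
            return self._last


if __name__ == "__main__":
    gen = SequenceNumberGenerator(start_offset=10**30)  # offset far ahead of the clock
    a, b = gen.next(), gen.next()
    print(a < b)
```

Seeding with an offset means a newly promoted node whose clock lags the old primary still issues numbers above everything already assigned.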

Phase 2: Primary Replication Stream

  • 2.1 AofSyncDriver class.

    • Single instance AofSyncDriver per attached replica.
    • Multiple instances of AofSyncTask per physical sublog.
    • Use dedicated AdvanceTime background task per attached replica.
  • 2.2 AofSyncTask class.

    • Independent log iterators per sublog
    • Network page shipping per sublog
    • Error handling and connection teardown
  • 2.3 AdvanceTime background task.

    • Primary monitors log changes by comparing the last known tail address to the current tail address.
    • Primary associates the current tail address snapshot with a sequence number (timestamp) that is strictly larger than all sequence numbers assigned until that moment and notifies the replica.
    • Replica maintains an advance time background task that updates sublog time using the information from the primary's signal.
    • Primary advances last known tail address to the observed tail address.
    • The system reaches equilibrium when writes are quiesced: no more signals are sent until a new change is detected.
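The primary side of the AdvanceTime steps above can be sketched as a single polling step. The class name, the callback shapes, and the use of a plain dictionary for tail addresses are assumptions made for illustration; the real task runs in the background on a timer and ships the signal over the replication connection.

```python
class AdvanceTimeMonitor:
    """Illustrative primary-side step: detect tail movement and signal the replica."""

    def __init__(self, initial_tails, next_sequence_number, notify_replica):
        self.last_known_tails = dict(initial_tails)   # sublog index -> tail address
        self._next_sequence_number = next_sequence_number
        self._notify_replica = notify_replica

    def poll(self, current_tails) -> bool:
        # No change since the last poll: the system is quiesced, send nothing.
        if current_tails == self.last_known_tails:
            return False
        # Pair the tail snapshot with a number larger than any assigned so far.
        seq = self._next_sequence_number()
        snapshot = dict(current_tails)
        self._notify_replica(snapshot, seq)
        # Advance the last known tails to what was just observed.
        self.last_known_tails = snapshot
        return True


if __name__ == "__main__":
    counter = iter(range(1000, 2000))
    monitor = AdvanceTimeMonitor({0: 64}, lambda: next(counter), print)
    monitor.poll({0: 128})  # prints the snapshot and its sequence number
    monitor.poll({0: 128})  # prints nothing: no new writes
```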

Phase 3: Replica Replay Stream

  • 3.1 ReplicaReplayDriver class.

    • Per-physical-sublog enqueue, scan and replay coordination
    • Manages ReplicaReplayTask for parallel replay within a single physical sublog.
  • 3.2 ReplicaReplayTask class.

    • Record filtering by task affinity.
    • Coordinated update of virtual sublog replay state to enable read prefix consistency.
  • 3.3 Standalone operation replay

    • Each operation executes within its appropriate context (BasicContext or TransactionalContext).
    • The virtual sublog replay state is updated prior to replay to maintain prefix consistency for read operations.
  • 3.4 Multi-exec transaction replay

    • Transaction operations are distributed across replay tasks based on key affinity.
    • Upon encountering the TxnCommit marker, each participating task acquires exclusive locks for its assigned keys.
    • The associated virtual sublog replay state gets updated following the standalone operation replay.
    • All participating tasks synchronize at a barrier before commit, which releases locks and makes results visible.
    • The commit marker advances time prior to execution, ensuring timestamp consistency while locks are still held.
  • 3.5 Custom transaction procedure replay

    • Similar to multi-exec transaction with the exception of having a single thread execute the custom procedure.
    • Virtual sublog replay state gets updated prior to lock acquisition.
    • Exclusive lock acquisition ensures that transaction partial results are not exposed to readers.
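The lock-then-barrier pattern described for multi-exec replay can be sketched with standard threading primitives. This is a simplified illustration, not the PR's replay code: `replay_transaction`, the per-key `threading.Lock` objects, and the dictionary store are hypothetical, the timestamp advance at the commit marker is omitted, and the list of participating tasks is assumed to be non-empty with disjoint key sets.

```python
import threading


def replay_transaction(ops_by_task, store, locks):
    """ops_by_task: one list of (key, value) writes per participating replay task."""
    barrier = threading.Barrier(len(ops_by_task))

    def task(ops):
        keys = sorted({k for k, _ in ops})
        for k in keys:
            locks[k].acquire()       # readers of these keys are blocked from here on
        try:
            for k, v in ops:
                store[k] = v         # apply this task's share of the transaction
            barrier.wait()           # wait until every participant has applied its writes
        finally:
            for k in keys:
                locks[k].release()   # released even if replay or the barrier raised

    threads = [threading.Thread(target=task, args=(ops,)) for ops in ops_by_task]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


if __name__ == "__main__":
    store = {}
    locks = {k: threading.Lock() for k in ("a", "b", "c")}
    replay_transaction([[("a", 1), ("b", 2)], [("c", 3)]], store, locks)
    print(store)
```

Holding every lock until the barrier trips is what keeps a reader from seeing one task's writes without the others. The `finally` block reflects the concern, listed in the TODO section, that locks must be released when replay throws.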

Phase 4: Read Consistency Protocol

  • 4.1 ReadConsistencyManager class

    • VirtualSublogReplayState struct using sketch arrays for key freshness tracking and sequence number frontier computation.
    • Provides APIs for updating sequence numbers at key or virtual sublog granularity.
    • Tracks version to maintain prefix consistency during replica reconfiguration events.
  • 4.2 Session based prefix consistency enforcement

    • Implement ConsistentReadGarnetApi and TransactionalConsistentReadGarnetApi to allow the JIT compiler to optimize operation calls.
    • Define callbacks to enforce the consistent read protocol (e.g. ValidateKeySequenceNumber, UpdateKeySequenceNumber).
    • Session-level ReplicaReadSessionContext struct used to hold maximumSessionSequenceNumber and related metadata (i.e. sessionVersion, lastHash, lastVirtualSublogIdx) to enforce prefix consistency, both when the replica is stable and during recovery.
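A sketch-array replay state of the kind listed in 4.1 could be structured as below. The slot count, the CRC32 hash, and the method names are assumptions for illustration; only the idea that the key frontier is the larger of the key-level and sublog-level sequence numbers comes from the notes in this PR.

```python
import zlib


class VirtualSublogReplayState:
    """Illustrative fixed-size sketch: keys hash to slots holding the max sequence number seen."""

    def __init__(self, slots: int = 1024):
        self.key_seq = [0] * slots  # per-slot sequence numbers; colliding keys share a slot
        self.sublog_seq = 0         # sequence number for the virtual sublog as a whole

    def _slot(self, key: bytes) -> int:
        return zlib.crc32(key) % len(self.key_seq)

    def update_key(self, key: bytes, seq: int) -> None:
        i = self._slot(key)
        self.key_seq[i] = max(self.key_seq[i], seq)

    def update_sublog(self, seq: int) -> None:
        self.sublog_seq = max(self.sublog_seq, seq)

    def frontier(self, key: bytes) -> int:
        # Frontier = max of the key's slot and the sublog-wide sequence number.
        return max(self.key_seq[self._slot(key)], self.sublog_seq)


if __name__ == "__main__":
    state = VirtualSublogReplayState()
    state.update_key(b"k", 5)
    state.update_sublog(9)
    print(state.frontier(b"k"))
```

The fixed-size array keeps memory constant regardless of key count, at the cost of distinct keys sharing a slot; how Garnet accounts for such collisions is not stated here.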

Phase 5: Prefix consistent recovery

  • 5.1 Commit operation

    • Occurs in unison across all sublogs. AutoCommit is disabled and commit is triggered at the GarnetLog layer instead of within TsavoriteLog, so that the commit can be coordinated across sublogs.
    • Commit adds cookie tracking the timestamp value of when commit occurred to enforce prefix consistent recovery.
  • 5.2 RecoverLogDriver implementation

    • Independent iterators with shared bounds.
    • Record filtering by sequenceNumber < untilSequenceNumber.
    • Build ReadConsistencyManager state at recovery to initialize SequenceNumberGenerator.
    • Allow intra-page parallel recovery using scan, BulkConsume interface.
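The sequence-number filter used during recovery can be illustrated with a sequential sketch. The record layout `(sequence_number, key, value)` and the function name are hypothetical, and the real driver scans sublogs in parallel rather than merging them into one sorted list.

```python
def recover(sublogs, until_sequence_number):
    """Replay only records with sequence_number < until_sequence_number.

    sublogs: list of per-sublog record lists, each record (sequence_number, key, value).
    Returns the rebuilt store and the highest replayed sequence number.
    """
    eligible = [r for log in sublogs for r in log if r[0] < until_sequence_number]
    store, max_seq = {}, 0
    # Applied in sequence order here for determinism; this stands in for parallel replay.
    for seq, key, value in sorted(eligible, key=lambda r: r[0]):
        store[key] = value
        max_seq = max(max_seq, seq)
    return store, max_seq


if __name__ == "__main__":
    logs = [[(1, "a", 1), (4, "a", 2), (9, "a", 3)], [(2, "b", 1), (7, "b", 2)]]
    print(recover(logs, until_sequence_number=7))
```

The returned maximum is the kind of value that could seed a sequence number generator after recovery, so that new writes are numbered above everything that was replayed.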

Phase 6: Testing & Validation

  • 6.1 Replication base tests passing with multi-log enabled
  • 6.2 Replication diskless sync tests passing with multi-log enabled

NOTES

Prefix Consistent Single Key Read Protocol

  • Each session tracks the maximum observed sequence number $T_{ms}$ and only proceeds when the key frontier $T_k$ (max of key and sublog sequence numbers) exceeds that value, guaranteeing visibility of earlier writes.
  • After the read, refresh $T_{ms}$ with the key's latest sequence number; timestamps are strictly increasing, so doing this post-read remains safe even though freshness validation occurred beforehand, and boundary reads never slip through.
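The single-key protocol can be sketched as a gate followed by a post-read refresh. The class and parameter names are hypothetical, and plain dictionaries stand in for the store and the replay state. The sketch uses a strict comparison, so it does not address the case where both the session maximum and the frontier are 0, which the TODO list below marks as still to be validated.

```python
class ReadSession:
    """Illustrative session state: t_ms is the maximum sequence number observed so far."""

    def __init__(self):
        self.t_ms = 0

    def try_read(self, key, store, key_seq, sublog_seq):
        # Key frontier T_k: max of the key's and the sublog's sequence numbers.
        t_k = max(key_seq.get(key, 0), sublog_seq)
        if not t_k > self.t_ms:
            return False, None  # replica has not caught up for this session; caller retries
        value = store.get(key)
        # Post-read refresh with the key's latest sequence number.
        self.t_ms = max(self.t_ms, key_seq.get(key, 0))
        return True, value


if __name__ == "__main__":
    session = ReadSession()
    print(session.try_read("a", {"a": "v1"}, {"a": 5}, sublog_seq=0))
    print(session.try_read("b", {}, {}, sublog_seq=3))  # gated: frontier 3 is behind t_ms = 5
```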

Prefix Consistent Batch Read

  • For every key $K_i$ in the batch, ensure $T_{ms} < T_{k_i}$, then compute $T_{max} = \max(T_{k_1}, \ldots, T_{k_n})$ before issuing the batched read.
  • Once the batch returns, verify each key still satisfies $T_{k_i} \leq T_{max}$; if any key advanced beyond $T_{max}$, redo the batch since a concurrent update happened. Because freshness gating blocks boundary reads, caching just $T_{max}$ is sufficient to detect drift.
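The batch variant adds a validate-and-retry loop around the read. In this sketch `frontier` and `read_many` are hypothetical callables supplied by the caller, and the retry limit is an assumption; the PR text only says the batch is redone when a key advances past $T_{max}$.

```python
def batch_read(keys, t_ms, frontier, read_many, max_retries=3):
    """Return (values, t_max) for a consistent batch, or None if gated or retries run out."""
    if not keys:
        return [], t_ms
    for _ in range(max_retries):
        before = [frontier(k) for k in keys]
        if any(t <= t_ms for t in before):
            return None              # some key fails the freshness gate T_ms < T_k
        t_max = max(before)          # cache only the maximum frontier
        values = read_many(keys)
        # Drift check: a key whose frontier moved past t_max was updated concurrently.
        if all(frontier(k) <= t_max for k in keys):
            return values, t_max
    return None


if __name__ == "__main__":
    seqs = {"a": 5, "b": 7}
    print(batch_read(["a", "b"], 3, seqs.__getitem__, lambda ks: [k.upper() for k in ks]))
```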

[dev]

| Method | Job | Runtime | Params | Mean | Error | StdDev | Allocated |
|---|---|---|---|---|---|---|---|
| Get | .NET 10 | .NET 10.0 | AOF | 22.13 us | 0.021 us | 0.019 us | - |
| Set | .NET 10 | .NET 10.0 | AOF | 30.25 us | 0.155 us | 0.145 us | - |
| MGet | .NET 10 | .NET 10.0 | AOF | 12.37 us | 0.008 us | 0.007 us | - |
| MSet | .NET 10 | .NET 10.0 | AOF | 25.39 us | 0.032 us | 0.029 us | - |
| Get | .NET 8 | .NET 8.0 | AOF | 23.89 us | 0.019 us | 0.017 us | - |
| Set | .NET 8 | .NET 8.0 | AOF | 36.90 us | 0.404 us | 0.378 us | 1 B |
| MGet | .NET 8 | .NET 8.0 | AOF | 12.90 us | 0.011 us | 0.010 us | - |
| MSet | .NET 8 | .NET 8.0 | AOF | 28.39 us | 0.064 us | 0.060 us | - |
| Get | .NET 10 | .NET 10.0 | DSV | 15.88 us | 0.013 us | 0.012 us | - |
| Set | .NET 10 | .NET 10.0 | DSV | 16.09 us | 0.017 us | 0.016 us | - |
| MGet | .NET 10 | .NET 10.0 | DSV | 11.34 us | 0.010 us | 0.009 us | - |
| MSet | .NET 10 | .NET 10.0 | DSV | 17.58 us | 0.013 us | 0.011 us | - |
| Get | .NET 8 | .NET 8.0 | DSV | 17.29 us | 0.028 us | 0.026 us | - |
| Set | .NET 8 | .NET 8.0 | DSV | 24.05 us | 0.035 us | 0.033 us | - |
| MGet | .NET 8 | .NET 8.0 | DSV | 12.16 us | 0.009 us | 0.007 us | - |
| MSet | .NET 8 | .NET 8.0 | DSV | 20.70 us | 0.012 us | 0.011 us | - |
| Get | .NET 10 | .NET 10.0 | None | 22.13 us | 0.032 us | 0.030 us | - |
| Set | .NET 10 | .NET 10.0 | None | 22.22 us | 0.024 us | 0.022 us | - |
| MGet | .NET 10 | .NET 10.0 | None | 12.26 us | 0.006 us | 0.006 us | - |
| MSet | .NET 10 | .NET 10.0 | None | 19.56 us | 0.007 us | 0.007 us | - |
| Get | .NET 8 | .NET 8.0 | None | 22.83 us | 0.017 us | 0.015 us | - |
| Set | .NET 8 | .NET 8.0 | None | 29.99 us | 0.026 us | 0.025 us | - |
| MGet | .NET 8 | .NET 8.0 | None | 13.35 us | 0.003 us | 0.003 us | - |
| MSet | .NET 8 | .NET 8.0 | None | 21.64 us | 0.016 us | 0.015 us | - |

[vazois/mmrt-dev]

| Method | Job | Runtime | Params | Mean | Error | StdDev | Allocated | % Change vs dev |
|---|---|---|---|---|---|---|---|---|
| Get | .NET 10 | .NET 10.0 | AOF | 22.81 us | 0.057 us | 0.053 us | - | +2.29 % |
| Set | .NET 10 | .NET 10.0 | AOF | 30.30 us | 0.164 us | 0.146 us | - | -3.93 % |
| MGet | .NET 10 | .NET 10.0 | AOF | 12.30 us | 0.036 us | 0.034 us | - | -4.87 % |
| MSet | .NET 10 | .NET 10.0 | AOF | 26.23 us | 0.110 us | 0.103 us | - | +2.70 % |
| Get | .NET 8 | .NET 8.0 | AOF | 22.87 us | 0.065 us | 0.061 us | - | -2.18 % |
| Set | .NET 8 | .NET 8.0 | AOF | 36.96 us | 0.378 us | 0.354 us | - | -2.56 % |
| MGet | .NET 8 | .NET 8.0 | AOF | 13.12 us | 0.032 us | 0.030 us | - | -4.37 % |
| MSet | .NET 8 | .NET 8.0 | AOF | 29.24 us | 0.034 us | 0.030 us | - | +2.31 % |
| Get | .NET 10 | .NET 10.0 | DSV | 15.96 us | 0.014 us | 0.013 us | - | +1.53 % |
| Set | .NET 10 | .NET 10.0 | DSV | 16.33 us | 0.073 us | 0.068 us | - | +2.32 % |
| MGet | .NET 10 | .NET 10.0 | DSV | 11.38 us | 0.012 us | 0.011 us | - | +0.18 % |
| MSet | .NET 10 | .NET 10.0 | DSV | 17.46 us | 0.010 us | 0.009 us | - | -6.18 % |
| Get | .NET 8 | .NET 8.0 | DSV | 18.29 us | 0.023 us | 0.022 us | - | +5.18 % |
| Set | .NET 8 | .NET 8.0 | DSV | 23.29 us | 0.075 us | 0.070 us | - | +2.02 % |
| MGet | .NET 8 | .NET 8.0 | DSV | 12.33 us | 0.004 us | 0.004 us | - | -0.72 % |
| MSet | .NET 8 | .NET 8.0 | DSV | 20.72 us | 0.011 us | 0.010 us | - | -1.10 % |
| Get | .NET 10 | .NET 10.0 | None | 22.50 us | 0.013 us | 0.012 us | - | +1.81 % |
| Set | .NET 10 | .NET 10.0 | None | 23.12 us | 0.030 us | 0.028 us | - | +3.91 % |
| MGet | .NET 10 | .NET 10.0 | None | 12.24 us | 0.019 us | 0.017 us | - | -1.45 % |
| MSet | .NET 10 | .NET 10.0 | None | 18.85 us | 0.010 us | 0.010 us | - | -1.15 % |
| Get | .NET 8 | .NET 8.0 | None | 22.57 us | 0.043 us | 0.040 us | - | +3.20 % |
| Set | .NET 8 | .NET 8.0 | None | 29.25 us | 0.010 us | 0.009 us | - | +1.88 % |
| MGet | .NET 8 | .NET 8.0 | None | 12.99 us | 0.028 us | 0.025 us | - | -2.33 % |
| MSet | .NET 8 | .NET 8.0 | None | 22.17 us | 0.009 us | 0.008 us | - | +2.02 % |

TODO

  • Ensure transaction replay releases locks in the event of an exception
  • Add timestamp tracking at primary per physical sublog.
  • Ensure timestamp tracking is consistent with recovery.
  • Ensure commit recovery does not recover on boundaries.
  • Failed Garnet.test.cluster.ClusterReplicationAsyncReplay.ClusterReplicationManualCheckpointing [CI]
  • Failed Garnet.test.cluster.ClusterReplicationTLS.ClusterSRNoCheckpointRestartSecondary(False,False)[CI]
  • Failed Garnet.test.cluster.ClusterMigrateTests(False).ClusterMigrateWrite[CI]
  • Failed Garnet.test.cluster.ClusterReplicationShardedLog.ClusterReplicationShardedLogRecover[CI]
  • Validate special case where maximumSessionSequenceNumber is 0 and FrontierSequenceNumber is also 0.
  • ClusterResetHardDuringDisklessReplicationAttach [CI]
  • ClusterReplicationCheckpointCleanupTest [CI]

vazois and others added 30 commits November 4, 2025 19:24
Comment thread libs/server/Servers/GarnetServerOptions.cs Outdated
Comment thread libs/cluster/Server/Gossip/GarnetServerNode.cs Outdated
Comment thread libs/host/Configuration/Options.cs Outdated
Comment thread libs/host/Garnet.host.csproj
Comment thread libs/server/AOF/ReplayCoordinator/AofReplayContext.cs
Comment thread libs/server/AOF/AofHeader.cs Outdated
Comment thread test/Garnet.test/RespTests.cs
Comment thread test/Garnet.test.cluster/ClusterTestContext.cs Outdated
Comment thread libs/server/TaskManager/TaskType.cs
Comment thread libs/server/TaskManager/TaskType.cs

Copilot AI left a comment


Pull request overview

Copilot reviewed 169 out of 172 changed files in this pull request and generated 7 comments.



Comment thread libs/cluster/Server/Replication/CheckpointEntry.cs
Comment thread libs/storage/Tsavorite/cs/src/core/Index/Interfaces/SessionFunctionsBase.cs Outdated
Comment thread libs/storage/Tsavorite/cs/src/core/ClientSession/NoOpSessionFunctions.cs Outdated
Comment thread website/docs/cluster/replication.md
Comment thread libs/host/defaults.conf
Comment thread libs/storage/Tsavorite/cs/src/core/Epochs/LightEpoch.cs
Comment thread libs/cluster/Server/Replication/CheckpointStore.cs
@vazois vazois merged commit 30c6dee into dev Apr 27, 2026
29 of 30 checks passed
@vazois vazois deleted the vazois/mmrt-dev branch April 27, 2026 18:19
Mathos1432 added a commit that referenced this pull request May 14, 2026
…idempotency

- Use ActiveWorkerMonitor (backported from PR #1556) to drain in-flight ReplicaSyncTaskAsync before disposing GarnetClientSession, eliminating cross-thread dispose races against ExecuteClusterAppendLog.
- Defensively call Dispose() in ReplicaSyncTaskAsync's finally when TryRemove returns false, guarding against future removal sites that forget to dispose.
- Make AofSyncTaskInfo.Dispose() idempotent (Interlocked guard) so multiple disposal sites cannot trigger ObjectDisposedException from cts.Cancel after cts.Dispose.
- Drop the unreachable enteredMonitor flag; control only enters the try block when TryEnter succeeded.
- Test: extend DisposeReleasesGarnetClientSession to assert idempotency.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
badrishc pushed a commit that referenced this pull request May 15, 2026
… and dispose race in AofSyncTaskInfo (#1791)

* Fix GarnetClientSession leak and diskless replication dedup failure

Two related bugs in AofTaskStore caused unbounded accumulation
of AofSyncTaskInfo tasks on clusters using diskless replication.

1. TryAddReplicationTasks (the diskless path) compared existing
   tasks against rss.replicaNodeId for dedup. ReplicaSyncSession
   has two node ID fields: replicaNodeId (set by the disk-based
   constructor, null for diskless) and replicaSyncMetadata.originNodeId
   (set by the diskless constructor). The AofSyncTaskInfo was
   created with originNodeId, but dedup compared against the null
   replicaNodeId — so it never matched and every call added a new
   task. Over time numTasks grew unboundedly, inflating the
   RoleInfo[] from INFO REPLICATION until the response exceeded
   the network output buffer.

   Fix: use rss.replicaSyncMetadata.originNodeId in the dedup
   comparison. The singular TryAddReplicationTask (disk-based
   and CLUSTER AOFSYNC) is unaffected.

2. AofSyncTaskInfo.Dispose() did not dispose its owned
   GarnetClientSession. When ReplicaSyncTaskAsync is running,
   CTS cancellation causes it to exit and the finally block
   cleans up. But when ReplicaSyncTaskAsync has not yet started
   (e.g. the task fails to be added), Dispose() is the only
   cleanup path and the session was leaked.

   Fix: add garnetClient?.Dispose() to AofSyncTaskInfo.Dispose()
   and remove the redundant call from ReplicaSyncTaskAsync's
   finally block, giving a single disposal site.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* AofSyncTaskInfo termination log fix

Clarify that the client disposal is no longer happening in the finally block.

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Fix formatting issues from CI

* Add Allure attributes to AofSyncTaskInfoTests

Apply [AllureNUnit] attribute and inherit AllureTestBase to
match repo test conventions required by CI.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Add REPLICATION category to AofSyncTaskInfoTests

Categorize the test to match the existing replication test
convention in the cluster test project.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Backport ActiveWorkerMonitor

* Address review feedback: drain in-flight workers, defensive Dispose, idempotency

- Use ActiveWorkerMonitor (backported from PR #1556) to drain in-flight ReplicaSyncTaskAsync before disposing GarnetClientSession, eliminating cross-thread dispose races against ExecuteClusterAppendLog.
- Defensively call Dispose() in ReplicaSyncTaskAsync's finally when TryRemove returns false, guarding against future removal sites that forget to dispose.
- Make AofSyncTaskInfo.Dispose() idempotent (Interlocked guard) so multiple disposal sites cannot trigger ObjectDisposedException from cts.Cancel after cts.Dispose.
- Drop the unreachable enteredMonitor flag; control only enters the try block when TryEnter succeeded.
- Test: extend DisposeReleasesGarnetClientSession to assert idempotency.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Bump version to 1.1.6.2 and surface Revision in startup log

Version.props now declares 1.1.6.2 (was 1.1.6) so that nuget packages produced from this branch carry the patch identifier.

GarnetServer.GetVersion() previously returned only Major.Minor.Build, which caused 1.1.6.2 builds to log themselves as 1.1.6 at startup. Append the Revision when non-zero so the startup log matches the assembly version.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Revert "Bump version to 1.1.6.2 and surface Revision in startup log"

This reverts commit 54cbd42. The version bump and the GetVersion() Revision-aware formatting are out of scope for this PR; the version bump should land on its own.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Bump version from 1.1.7 to 1.1.8

release/v1 just bumped to 1.1.7 in #1792. Bump again to 1.1.8 as part of this PR so the package built from this branch sorts above the latest published version.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* retrigger CI

* trigger CI

---------

Co-authored-by: Simon Nattress <simonn@microsoft.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Simon Nattress <nattress@gmail.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Vasileios Zois <vazois@microsoft.com>

5 participants