Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2198,11 +2198,20 @@ private void AsyncGetFromDiskCallback(uint errorCode, uint numBytes, object cont
var poolReturn = true;
try
{
// Note: don't test for (numBytes >= ctx.record.required_bytes) for this initial read, as the file may legitimately end before the
// InitialIOSize request can be fulfilled.
ctx.record.available_bytes = (int)numBytes;

Debug.Assert(!(*(RecordInfo*)ctx.record.GetValidPointer()).Invalid, $"Invalid records should not be in the hash chain for pending IO; address {ctx.logicalAddress}");
// available_bytes is the count of valid bytes starting at GetValidPointer() (= aligned_pointer + valid_offset), so we must subtract
// valid_offset from the device's reported transfer count (which is total bytes written into the aligned buffer starting at aligned_pointer).
Debug.Assert(numBytes <= (uint)(ctx.record.available_bytes + ctx.record.valid_offset),
$"Expected numBytes ({numBytes}) <= (available_bytes ({ctx.record.available_bytes}) + valid_offset ({ctx.record.valid_offset})), per {nameof(GetAndPopulateReadBuffer)}()");
Debug.Assert(numBytes >= (uint)ctx.record.valid_offset,
$"Short read: {numBytes} bytes were read, which is below the valid_offset ({ctx.record.valid_offset}); the record start was not delivered by the device");
ctx.record.available_bytes = (int)numBytes - ctx.record.valid_offset;

if (ctx.record.available_bytes >= RecordInfo.Size)
{
var recordInfo = *(RecordInfo*)ctx.record.GetValidPointer();
Debug.Assert(!recordInfo.Invalid,
$"Invalid records should not be in the hash chain for pending IO; address {ctx.logicalAddress}, recordInfo {recordInfo}");
}

if (!VerifyRecordFromDiskCallback(ref ctx, out var prevAddressToRead, out var prevLengthToRead))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,16 @@ public interface IStreamBuffer : IDisposable
/// <summary>The size of the buffer used for writing data to and reading it from the disk. Must be a sector multiple.</summary>
internal const int BufferSize = 1 << LogSettings.kMinObjectLogSegmentSizeBits;

/// <summary>Initial IO size to read.</summary>
internal static int InitialIOSize => Environment.SystemPageSize;
/// <summary>Initial IO size to read. Sized to comfortably cover a typical small record (header + small key + small value)
/// in one device-sector IO. The previous default of one OS system page (4 KB on Linux x64) caused most reads of small
/// records to span 4 KB NAND-page boundaries on NVMe, doubling per-IO device latency (~0.92 ms vs ~0.67 ms for sector-aligned
/// 4 KB reads). With a 128-byte speculative read, the sector-aligned IO is typically 1 sector (and up to 2 sectors when the
/// record begins near the end of a sector), and usually captures a full small record with no re-read.
/// Records larger than what fits in the speculative read trigger a precise re-read via VerifyRecordFromDiskCallback with the
/// now-known recordLength, same as before — the cost is one extra IO per multi-sector record, which is a fair trade against
/// avoiding the NAND-crossing penalty on every small-record IO.</summary>
internal const int InitialIOSize = 128;

/// <summary>
/// We use these buffers for only read or only write operations, never both at the same time.
Expand Down
15 changes: 11 additions & 4 deletions libs/storage/Tsavorite/cs/src/core/Device/ShardedStorageDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ public override unsafe void ReadAsync(int segmentId, ulong sourceAddress, IntPtr
long currentReadStart = (long)sourceAddress;
long readEnd = currentReadStart + readLength;
uint aggregateErrorCode = 0;
uint aggregateNumBytes = 0;
while (currentReadStart < readEnd)
{
long newStart = partitions.MapRange(currentReadStart, readEnd, out int shard, out long shardStartAddress, out long shardEndAddress);
Expand All @@ -275,13 +276,18 @@ public override unsafe void ReadAsync(int segmentId, ulong sourceAddress, IntPtr
(ulong)shardStartAddress,
IntPtr.Add(destinationAddress, (int)writeOffset),
(uint)(shardEndAddress - shardStartAddress),
(e, n, o) =>
(errorCode, numBytes, ctx) =>
{
// TODO: this is incorrect if returned "bytes" written is allowed to be less than requested like POSIX.
if (e != 0) aggregateErrorCode = e;
if (errorCode != 0)
aggregateErrorCode = errorCode;
_ = Interlocked.Add(ref aggregateNumBytes, numBytes);

if (countdown.Signal())
{
callback(aggregateErrorCode, n, o);
// ReadAsync has called the ending .Signal() and exited, and we're the last parallel reader to finish.
// Call the callback with the full length read.
callback(aggregateErrorCode, aggregateNumBytes, ctx);
countdown.Dispose();
}
},
Expand All @@ -293,7 +299,8 @@ public override unsafe void ReadAsync(int segmentId, ulong sourceAddress, IntPtr
// TODO: Check handling of overlapped wrapper
if (countdown.Signal())
{
callback(aggregateErrorCode, readLength, context);
// All parallel readers have finished. Call the callback with the full length read.
callback(aggregateErrorCode, aggregateNumBytes, context);
countdown.Dispose();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using Garnet.test.cluster;
using NUnit.Framework;

/// <summary>
/// NUnit setup fixture for this test project. It is declared outside any namespace,
/// which per NUnit's <c>SetUpFixture</c> semantics scopes it to the whole assembly.
/// </summary>
[SetUpFixture]
public class TestProjectSetup
{
    /// <summary>
    /// One-time setup hook: assigns <see cref="ClusterTestContext.Port"/> from the
    /// <see cref="ClusterPortAssignment.ClusterMultiLogDiskless"/> enum member.
    /// </summary>
    [OneTimeSetUp]
    public void SetPort()
    {
        ClusterTestContext.Port = (int)ClusterPortAssignment.ClusterMultiLogDiskless;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using Garnet.test.cluster;
using NUnit.Framework;

/// <summary>
/// NUnit setup fixture for this test project. It is declared outside any namespace,
/// which per NUnit's <c>SetUpFixture</c> semantics scopes it to the whole assembly.
/// </summary>
[SetUpFixture]
public class TestProjectSetup
{
    /// <summary>
    /// One-time setup hook: assigns <see cref="ClusterTestContext.Port"/> from the
    /// <see cref="ClusterPortAssignment.ClusterReplicationRangeIndex"/> enum member.
    /// </summary>
    [OneTimeSetUp]
    public void SetPort()
    {
        ClusterTestContext.Port = (int)ClusterPortAssignment.ClusterReplicationRangeIndex;
    }
}
17 changes: 12 additions & 5 deletions test/cluster/Garnet.test.cluster/ClusterTestContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public enum ClusterPortAssignment
ClusterReplicationDiskless = 7500,
ClusterVectorSets = 7600,
ClusterMultiLog = 7700,
ClusterReplicationRangeIndex = 7800,
ClusterMultiLogDiskless = 7900,
}

public class ClusterTestContext
Expand Down Expand Up @@ -162,14 +164,19 @@ public void TearDown()
TestContext.CurrentContext.Result.Message);
}

cts.Cancel();
cts.Dispose();
waiter?.Dispose();
clusterTestUtils?.Dispose();

var timeoutSeconds = 60;
string failureReason = null;

// Phase 0: Tear down cancellation token, waiter, and client connections.
// Each disposal is isolated so a throw from one (e.g., ConnectionMultiplexer
// in a bad state after a mid-flight test failure) does not skip DisposeCluster
// below, which would leave GarnetServer nodes alive with TCP ports bound and
// cascade-fail subsequent tests.
try { cts.Cancel(); } catch (Exception ex) { logger?.LogError(ex, "cts.Cancel failed"); }
try { cts.Dispose(); } catch (Exception ex) { logger?.LogError(ex, "cts.Dispose failed"); }
try { waiter?.Dispose(); } catch (Exception ex) { logger?.LogError(ex, "waiter.Dispose failed"); }
try { clusterTestUtils?.Dispose(); } catch (Exception ex) { logger?.LogError(ex, "clusterTestUtils.Dispose failed"); }

// Phase 1: Dispose cluster nodes (may timeout if handlers are stuck)
try
{
Expand Down
Loading