fix(vault): atomic streamed write via temp→rename, suppress OCE log noise
AddEntryStreamingAsync now writes to a temp file in the same vault directory, renames it into place (POSIX rename(2) — atomic on Linux), and updates the index only after the rename succeeds. A client disconnect or I/O fault during the write leaves the original backing file intact and the index unchanged; the temp file is cleaned up best-effort on failure. Fixes the data-corruption regression on the replace path where a cancelled write could truncate the live backing file after the index update and FileMode.Create already ran. Also filters OperationCanceledException from error-level logging in RegisterResourceStreamingAsync — a normal client disconnect is not an error. Two tests added to AudioStoreStreamingTests covering cancel and fault on the replace path.
This commit is contained in:
@@ -206,10 +206,14 @@ public class FileDatabase : DirectoryIndexDirectory, IDisposable
|
|||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
// Swallow and return false, matching RegisterResourceAsync. Logged (unlike the buffered
|
// Swallow and return false, matching RegisterResourceAsync. Log at error for real failures
|
||||||
// path) because a streamed write failure can leave a partial backing file worth noticing.
|
// only — a normal client cancel (OperationCanceledException) is not an error condition and
|
||||||
|
// would spam the error log on every client disconnect during a large upload or replace.
|
||||||
|
if (ex is not OperationCanceledException)
|
||||||
|
{
|
||||||
_logger.LogError(ex, "RegisterResourceStreamingAsync failed for vault {VaultId} entry {EntryId}", vaultId, entryId);
|
_logger.LogError(ex, "RegisterResourceStreamingAsync failed for vault {VaultId} entry {EntryId}", vaultId, entryId);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -57,16 +57,18 @@ public abstract class MediaVault : VaultIndexDirectory
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Streams an entry's bytes into the vault without ever materializing the whole file in memory:
|
/// Streams an entry's bytes into the vault without ever materializing the whole file in memory.
|
||||||
/// records the supplied <paramref name="metaData"/> in the index, then invokes
|
|
||||||
/// <paramref name="writeContent"/> to emit bytes directly to the backing <see cref="FileStream"/>.
|
|
||||||
/// The metadata is supplied by the caller (there is no in-memory <see cref="FileBinary"/> to infer
|
/// The metadata is supplied by the caller (there is no in-memory <see cref="FileBinary"/> to infer
|
||||||
/// it from) — the store path (upload / replace-audio) sources its bytes from a staging file, not a
|
/// it from) — the store path (upload / replace-audio) sources its bytes from a staging file, not a
|
||||||
/// buffer. Returns the number of bytes written, for the caller to log.
|
/// buffer. Returns the number of bytes written, for the caller to log.
|
||||||
///
|
///
|
||||||
/// Index-then-file ordering matches <see cref="AddEntryAsync"/>; a mid-write failure therefore
|
/// Write ordering (atomic-replace guarantee): bytes are streamed to a temp file in the same vault
|
||||||
/// leaves an index entry over a partial/missing file, the same exposure the buffered path has on
|
/// directory, the temp file is renamed over the final backing-file path (POSIX <c>rename(2)</c> —
|
||||||
/// an I/O fault. The caller treats a thrown exception as a failed register.
|
/// atomic on the Linux prod host), and the index is updated only after the rename succeeds.
|
||||||
|
/// This ordering ensures: (a) the index never advertises a not-yet-present file; (b) a client
|
||||||
|
/// disconnect or I/O fault during the write leaves any prior backing file intact and the index
|
||||||
|
/// unchanged; (c) the temp file is cleaned up best-effort on any failure before re-throwing so the
|
||||||
|
/// vault directory stays tidy. The caller treats a thrown exception as a failed register.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public async Task<long> AddEntryStreamingAsync(
|
public async Task<long> AddEntryStreamingAsync(
|
||||||
string entryId,
|
string entryId,
|
||||||
@@ -74,17 +76,41 @@ public abstract class MediaVault : VaultIndexDirectory
|
|||||||
Func<Stream, CancellationToken, Task> writeContent,
|
Func<Stream, CancellationToken, Task> writeContent,
|
||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
var mediaPath = GetMediaPathFromEntryKey(entryId, metaData.Extension);
|
var finalPath = GetMediaPathFromEntryKey(entryId, metaData.Extension);
|
||||||
|
var tempPath = Path.Combine(RootPath, Path.GetRandomFileName() + ".tmp");
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
long bytesWritten;
|
||||||
|
await using (var tempStream = new FileStream(
|
||||||
|
tempPath, FileMode.CreateNew, FileAccess.Write, FileShare.None,
|
||||||
|
bufferSize: 81920, useAsync: true))
|
||||||
|
{
|
||||||
|
await writeContent(tempStream, cancellationToken);
|
||||||
|
await tempStream.FlushAsync(cancellationToken);
|
||||||
|
bytesWritten = tempStream.Length;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rename into place — atomic on the Linux prod host (POSIX rename(2)); overwrites any
|
||||||
|
// existing same-extension backing file safely on the replace path.
|
||||||
|
File.Move(tempPath, finalPath, overwrite: true);
|
||||||
|
|
||||||
|
// Update the index only after the file is durably in place. A crash between Move and
|
||||||
|
// AddToIndexAsync leaves an unreferenced file on disk (a harmless orphan recoverable
|
||||||
|
// by a vault scan); a crash or cancel during the temp write leaves the original backing
|
||||||
|
// file and the index both unchanged.
|
||||||
await AddToIndexAsync(entryId, metaData);
|
await AddToIndexAsync(entryId, metaData);
|
||||||
|
|
||||||
await using var fileStream = new FileStream(
|
return bytesWritten;
|
||||||
mediaPath, FileMode.Create, FileAccess.Write, FileShare.None,
|
}
|
||||||
bufferSize: 81920, useAsync: true);
|
catch
|
||||||
await writeContent(fileStream, cancellationToken);
|
{
|
||||||
await fileStream.FlushAsync(cancellationToken);
|
// Best-effort temp-file cleanup. After a successful rename tempPath is gone and the
|
||||||
|
// delete is a no-op. After a write failure or cancel tempPath holds partial bytes that
|
||||||
return fileStream.Length;
|
// must be removed so the vault directory stays tidy.
|
||||||
|
try { if (File.Exists(tempPath)) File.Delete(tempPath); } catch { /* best-effort */ }
|
||||||
|
throw;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ using System.Text;
|
|||||||
using DeepDrftContent;
|
using DeepDrftContent;
|
||||||
using DeepDrftContent.Constants;
|
using DeepDrftContent.Constants;
|
||||||
using DeepDrftContent.FileDatabase.Models;
|
using DeepDrftContent.FileDatabase.Models;
|
||||||
|
using DeepDrftContent.FileDatabase.Services;
|
||||||
using DeepDrftContent.Processors;
|
using DeepDrftContent.Processors;
|
||||||
using FileDb = DeepDrftContent.FileDatabase.Services.FileDatabase;
|
using FileDb = DeepDrftContent.FileDatabase.Services.FileDatabase;
|
||||||
|
|
||||||
@@ -217,6 +218,94 @@ public class AudioStoreStreamingTests
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// -- atomic-write safety (cancel / fault on replace path) ---------------------------------
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// A client disconnect (OperationCanceledException) during the streamed write must leave any
|
||||||
|
/// pre-existing backing file byte-identical. The atomic temp→rename ordering ensures the rename
|
||||||
|
/// never happens on an incomplete write, so the original file is never truncated or overwritten.
|
||||||
|
/// </summary>
|
||||||
|
[Test]
|
||||||
|
public async Task AddEntryStreamingAsync_CancelMidWrite_OriginalBackingFileUnchanged()
|
||||||
|
{
|
||||||
|
var vault = await AudioVault.FromAsync(_testDir);
|
||||||
|
Assert.That(vault, Is.Not.Null);
|
||||||
|
|
||||||
|
const string entryId = "replace-target";
|
||||||
|
var original = new byte[] { 0xAA, 0xBB, 0xCC, 0xDD, 0xEE };
|
||||||
|
var originalMeta = MetaDataFactory.CreateAudioMetaData(entryId, ".wav", 10.0, 1411);
|
||||||
|
|
||||||
|
// Write the original entry — this is the backing file that must survive a cancelled replace.
|
||||||
|
await vault!.AddEntryStreamingAsync(entryId, originalMeta,
|
||||||
|
async (s, ct) => await s.WriteAsync(original, ct));
|
||||||
|
|
||||||
|
var backingPath = Path.Combine(_testDir, "replace-target.wav");
|
||||||
|
Assert.That(File.Exists(backingPath), Is.True, "Pre-condition: original backing file must exist");
|
||||||
|
var originalOnDisk = await File.ReadAllBytesAsync(backingPath);
|
||||||
|
Assert.That(originalOnDisk, Is.EqualTo(original), "Pre-condition: backing file must hold original bytes");
|
||||||
|
|
||||||
|
// Attempt a replace whose writeContent cancels after writing only a portion.
|
||||||
|
using var cts = new CancellationTokenSource();
|
||||||
|
var replacement = new byte[10_000];
|
||||||
|
Array.Fill(replacement, (byte)0xFF);
|
||||||
|
var replaceMeta = MetaDataFactory.CreateAudioMetaData(entryId, ".wav", 5.0, 1411);
|
||||||
|
|
||||||
|
Assert.ThrowsAsync<OperationCanceledException>(async () =>
|
||||||
|
await vault.AddEntryStreamingAsync(entryId, replaceMeta,
|
||||||
|
async (s, ct) =>
|
||||||
|
{
|
||||||
|
await s.WriteAsync(replacement.AsMemory(0, 100), ct);
|
||||||
|
await cts.CancelAsync();
|
||||||
|
ct.ThrowIfCancellationRequested(); // surfaces the cancel from the token
|
||||||
|
},
|
||||||
|
cts.Token));
|
||||||
|
|
||||||
|
// The original backing file must be byte-identical — no truncation, no partial replacement.
|
||||||
|
var afterContent = await File.ReadAllBytesAsync(backingPath);
|
||||||
|
Assert.That(afterContent, Is.EqualTo(original),
|
||||||
|
"Original backing file must be byte-identical after a cancelled replace");
|
||||||
|
|
||||||
|
// No stray temp files should remain in the vault directory.
|
||||||
|
var tmpFiles = Directory.GetFiles(_testDir, "*.tmp");
|
||||||
|
Assert.That(tmpFiles, Is.Empty, "Temp file must be cleaned up after cancel");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// An I/O fault during the streamed write must leave any pre-existing backing file intact.
|
||||||
|
/// The atomic temp→rename ordering ensures a faulting write never reaches rename, so the
|
||||||
|
/// original file is never touched.
|
||||||
|
/// </summary>
|
||||||
|
[Test]
|
||||||
|
public async Task AddEntryStreamingAsync_FaultMidWrite_OriginalBackingFileUnchanged()
|
||||||
|
{
|
||||||
|
var vault = await AudioVault.FromAsync(_testDir);
|
||||||
|
Assert.That(vault, Is.Not.Null);
|
||||||
|
|
||||||
|
const string entryId = "fault-target";
|
||||||
|
var original = new byte[] { 0x01, 0x02, 0x03, 0x04, 0x05 };
|
||||||
|
var originalMeta = MetaDataFactory.CreateAudioMetaData(entryId, ".wav", 10.0, 1411);
|
||||||
|
|
||||||
|
await vault!.AddEntryStreamingAsync(entryId, originalMeta,
|
||||||
|
async (s, ct) => await s.WriteAsync(original, ct));
|
||||||
|
|
||||||
|
var backingPath = Path.Combine(_testDir, "fault-target.wav");
|
||||||
|
var originalOnDisk = await File.ReadAllBytesAsync(backingPath);
|
||||||
|
Assert.That(originalOnDisk, Is.EqualTo(original), "Pre-condition: backing file must hold original bytes");
|
||||||
|
|
||||||
|
// Attempt a replace whose writeContent throws a simulated I/O fault.
|
||||||
|
Assert.ThrowsAsync<IOException>(async () =>
|
||||||
|
await vault.AddEntryStreamingAsync(entryId, originalMeta,
|
||||||
|
(_, _) => throw new IOException("Simulated I/O fault")));
|
||||||
|
|
||||||
|
// The original backing file must be intact.
|
||||||
|
var afterContent = await File.ReadAllBytesAsync(backingPath);
|
||||||
|
Assert.That(afterContent, Is.EqualTo(original),
|
||||||
|
"Original backing file must be byte-identical after a faulting replace");
|
||||||
|
|
||||||
|
var tmpFiles = Directory.GetFiles(_testDir, "*.tmp");
|
||||||
|
Assert.That(tmpFiles, Is.Empty, "Temp file must be cleaned up after fault");
|
||||||
|
}
|
||||||
|
|
||||||
// -- builders -----------------------------------------------------------------------------
|
// -- builders -----------------------------------------------------------------------------
|
||||||
|
|
||||||
private async Task<string> WriteAsync(byte[] bytes, string extension)
|
private async Task<string> WriteAsync(byte[] bytes, string extension)
|
||||||
|
|||||||
Reference in New Issue
Block a user