File Database Index watching
This commit is contained in:
@@ -4,11 +4,14 @@ using DeepDrftContent.Services.FileDatabase.Utils;
|
||||
namespace DeepDrftContent.Services.FileDatabase.Services;
|
||||
|
||||
/// <summary>
|
||||
/// Main file database class that orchestrates multiple media vaults
|
||||
/// Main file database class that orchestrates multiple media vaults.
|
||||
/// Includes file watching for automatic index reloading when modified by external processes.
|
||||
/// </summary>
|
||||
public class FileDatabase : DirectoryIndexDirectory
|
||||
public class FileDatabase : DirectoryIndexDirectory, IDisposable
|
||||
{
|
||||
private readonly StructuralMap<string, MediaVault> _vaults;
|
||||
private readonly IndexWatcher _indexWatcher;
|
||||
private bool _disposed;
|
||||
|
||||
/// <summary>
|
||||
/// Factory method to create a FileDatabase instance
|
||||
@@ -31,6 +34,7 @@ public class FileDatabase : DirectoryIndexDirectory
|
||||
private FileDatabase(string rootPath, IDirectoryIndex index) : base(rootPath, index)
|
||||
{
|
||||
_vaults = new StructuralMap<string, MediaVault>();
|
||||
_indexWatcher = new IndexWatcher();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -49,7 +53,7 @@ public class FileDatabase : DirectoryIndexDirectory
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a specific vault
|
||||
/// Initializes a specific vault and sets up file watching for its index
|
||||
/// </summary>
|
||||
private async Task InitVaultAsync(string vaultId, MediaVaultType vaultType)
|
||||
{
|
||||
@@ -59,6 +63,13 @@ public class FileDatabase : DirectoryIndexDirectory
|
||||
if (directoryVault != null)
|
||||
{
|
||||
_vaults.Set(vaultId, directoryVault);
|
||||
|
||||
// Watch the vault's index file for external modifications
|
||||
_indexWatcher.Watch(path, () =>
|
||||
{
|
||||
// Reload the index asynchronously when file changes
|
||||
_ = directoryVault.ReloadIndexAsync();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -186,4 +197,15 @@ public class FileDatabase : DirectoryIndexDirectory
|
||||
return _vaults.Size;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Disposes the file database and stops all file watchers
|
||||
/// </summary>
|
||||
public void Dispose()
|
||||
{
|
||||
if (_disposed) return;
|
||||
_disposed = true;
|
||||
|
||||
_indexWatcher.Dispose();
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,9 +45,9 @@ public abstract class AbstractIndexContainer
|
||||
/// </summary>
|
||||
public abstract class IndexDirectory : AbstractIndexContainer
|
||||
{
|
||||
protected IEntryQueryable Index { get; }
|
||||
protected IEntryQueryable Index { get; set; }
|
||||
|
||||
protected IndexDirectory(string rootPath, IndexType type, IEntryQueryable index, IIndexDataFactory? indexDataFactory = null)
|
||||
protected IndexDirectory(string rootPath, IndexType type, IEntryQueryable index, IIndexDataFactory? indexDataFactory = null)
|
||||
: base(rootPath, type, indexDataFactory)
|
||||
{
|
||||
Index = index;
|
||||
@@ -81,21 +81,72 @@ public class DirectoryIndexDirectory : IndexDirectory
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Vault index directory implementation
|
||||
/// Vault index directory implementation with support for index reloading
|
||||
/// </summary>
|
||||
public class VaultIndexDirectory : IndexDirectory
|
||||
{
|
||||
private readonly IVaultIndex _vaultIndex;
|
||||
private IVaultIndex _vaultIndex;
|
||||
private readonly object _indexLock = new();
|
||||
private readonly IndexFactoryService _factoryService = new();
|
||||
|
||||
public VaultIndexDirectory(string rootPath, IVaultIndex index, IIndexDataFactory? indexDataFactory = null)
|
||||
: base(rootPath, IndexType.Vault, index, indexDataFactory)
|
||||
public VaultIndexDirectory(string rootPath, IVaultIndex index, IIndexDataFactory? indexDataFactory = null)
|
||||
: base(rootPath, IndexType.Vault, index, indexDataFactory)
|
||||
{
|
||||
_vaultIndex = index;
|
||||
}
|
||||
|
||||
protected async Task AddToIndexAsync(string entryId, MetaData metaData)
|
||||
{
|
||||
_vaultIndex.PutEntry(entryId, metaData);
|
||||
lock (_indexLock)
|
||||
{
|
||||
_vaultIndex.PutEntry(entryId, metaData);
|
||||
}
|
||||
await SaveIndexAsync(_vaultIndex);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reloads the index from disk. Called when the index file is modified externally.
|
||||
/// </summary>
|
||||
public async Task ReloadIndexAsync()
|
||||
{
|
||||
try
|
||||
{
|
||||
var newIndex = await _factoryService.LoadIndexAsync(IndexType.Vault, RootPath);
|
||||
if (newIndex is IVaultIndex vaultIndex)
|
||||
{
|
||||
lock (_indexLock)
|
||||
{
|
||||
_vaultIndex = vaultIndex;
|
||||
Index = vaultIndex;
|
||||
}
|
||||
Console.WriteLine($"VaultIndexDirectory: Reloaded index for {RootPath}, {vaultIndex.GetEntriesSize()} entries");
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Console.WriteLine($"VaultIndexDirectory: Failed to reload index for {RootPath}: {ex.Message}");
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Thread-safe check for index entry
|
||||
/// </summary>
|
||||
public new bool HasIndexEntry(string entryId)
|
||||
{
|
||||
lock (_indexLock)
|
||||
{
|
||||
return _vaultIndex.HasEntry(entryId);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Thread-safe get entry metadata
|
||||
/// </summary>
|
||||
public MetaData? GetEntryMetadata(string entryId)
|
||||
{
|
||||
lock (_indexLock)
|
||||
{
|
||||
return _vaultIndex.GetEntry(entryId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,120 @@
|
||||
using DeepDrftContent.Services.FileDatabase.Models;
|
||||
|
||||
namespace DeepDrftContent.Services.FileDatabase.Services;
|
||||
|
||||
/// <summary>
|
||||
/// Watches index files for external modifications and triggers reloads.
|
||||
/// Uses FileSystemWatcher to detect changes made by other processes (e.g., CLI).
|
||||
/// </summary>
|
||||
public class IndexWatcher : IDisposable
|
||||
{
|
||||
private readonly Dictionary<string, FileSystemWatcher> _watchers = new();
|
||||
private readonly Dictionary<string, Action> _reloadCallbacks = new();
|
||||
private readonly object _lock = new();
|
||||
private bool _disposed;
|
||||
|
||||
/// <summary>
|
||||
/// Registers an index file to be watched for changes.
|
||||
/// </summary>
|
||||
/// <param name="indexPath">Full path to the directory containing the index file</param>
|
||||
/// <param name="onChanged">Callback to invoke when the index file changes</param>
|
||||
public void Watch(string indexPath, Action onChanged)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
if (_disposed) return;
|
||||
|
||||
// Already watching this path
|
||||
if (_watchers.ContainsKey(indexPath))
|
||||
{
|
||||
_reloadCallbacks[indexPath] = onChanged;
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var watcher = new FileSystemWatcher(indexPath)
|
||||
{
|
||||
Filter = "index",
|
||||
NotifyFilter = NotifyFilters.LastWrite | NotifyFilters.CreationTime,
|
||||
EnableRaisingEvents = true
|
||||
};
|
||||
|
||||
watcher.Changed += OnIndexChanged;
|
||||
watcher.Created += OnIndexChanged;
|
||||
|
||||
_watchers[indexPath] = watcher;
|
||||
_reloadCallbacks[indexPath] = onChanged;
|
||||
|
||||
Console.WriteLine($"IndexWatcher: Watching {indexPath}/index");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Console.WriteLine($"IndexWatcher: Failed to watch {indexPath}: {ex.Message}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Stops watching an index file.
|
||||
/// </summary>
|
||||
public void Unwatch(string indexPath)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
if (_watchers.TryGetValue(indexPath, out var watcher))
|
||||
{
|
||||
watcher.EnableRaisingEvents = false;
|
||||
watcher.Dispose();
|
||||
_watchers.Remove(indexPath);
|
||||
_reloadCallbacks.Remove(indexPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void OnIndexChanged(object sender, FileSystemEventArgs e)
|
||||
{
|
||||
var watcher = sender as FileSystemWatcher;
|
||||
if (watcher == null) return;
|
||||
|
||||
var indexPath = watcher.Path;
|
||||
|
||||
lock (_lock)
|
||||
{
|
||||
if (_reloadCallbacks.TryGetValue(indexPath, out var callback))
|
||||
{
|
||||
Console.WriteLine($"IndexWatcher: Index changed at {indexPath}, triggering reload");
|
||||
|
||||
// Invoke callback on a background thread to avoid blocking the watcher
|
||||
Task.Run(() =>
|
||||
{
|
||||
try
|
||||
{
|
||||
callback();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Console.WriteLine($"IndexWatcher: Reload callback failed: {ex.Message}");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
if (_disposed) return;
|
||||
_disposed = true;
|
||||
|
||||
foreach (var watcher in _watchers.Values)
|
||||
{
|
||||
watcher.EnableRaisingEvents = false;
|
||||
watcher.Dispose();
|
||||
}
|
||||
_watchers.Clear();
|
||||
_reloadCallbacks.Clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -62,25 +62,24 @@ public abstract class MediaVault : VaultIndexDirectory
|
||||
{
|
||||
// Infer MediaVaultType from the generic type T
|
||||
var vaultType = MediaVaultTypeMap.GetVaultType<T>();
|
||||
|
||||
|
||||
// Use thread-safe method from VaultIndexDirectory
|
||||
if (!HasIndexEntry(entryId))
|
||||
return null;
|
||||
|
||||
if (Index is not VaultIndex vaultIndex)
|
||||
return null;
|
||||
|
||||
var metaData = vaultIndex.GetEntry(entryId);
|
||||
// Use thread-safe metadata retrieval
|
||||
var metaData = GetEntryMetadata(entryId);
|
||||
if (metaData == null)
|
||||
return null;
|
||||
|
||||
var mediaPath = GetMediaPathFromEntryKey(metaData.MediaKey, metaData.Extension);
|
||||
|
||||
|
||||
if (!FileUtils.FileExists(mediaPath))
|
||||
return null;
|
||||
|
||||
var fileBinary = await FileUtils.FetchFileAsync(mediaPath);
|
||||
var parameters = MediaParamsFactory.Create(vaultType, fileBinary, metaData);
|
||||
|
||||
|
||||
var result = FileBinaryFactory.Create(vaultType, parameters);
|
||||
return (T)result;
|
||||
}
|
||||
|
||||
@@ -14,6 +14,8 @@ import { PlaybackScheduler } from './PlaybackScheduler.js';
|
||||
export interface AudioResult {
|
||||
success: boolean;
|
||||
error?: string;
|
||||
seekBeyondBuffer?: boolean;
|
||||
byteOffset?: number;
|
||||
}
|
||||
|
||||
export interface StreamingResult extends AudioResult {
|
||||
@@ -89,7 +91,13 @@ export class AudioPlayer {
|
||||
|
||||
initializeStreaming(totalStreamLength: number): AudioResult {
|
||||
try {
|
||||
// Full cleanup before starting new stream
|
||||
this.stopProgressTracking();
|
||||
this.scheduler.clear();
|
||||
this.streamDecoder.reset();
|
||||
this.resetState();
|
||||
|
||||
// Initialize new stream
|
||||
this.isStreamingMode = true;
|
||||
this.streamDecoder.initialize(totalStreamLength);
|
||||
console.log(`Streaming initialized: ${totalStreamLength} bytes expected`);
|
||||
@@ -236,16 +244,105 @@ export class AudioPlayer {
|
||||
return { success: false, error: 'Invalid seek position' };
|
||||
}
|
||||
|
||||
// Get buffered duration (accounting for playback offset)
|
||||
const bufferedDuration = this.scheduler.getTotalDuration() + this.scheduler.getPlaybackOffset();
|
||||
|
||||
// Check if seeking within buffered content
|
||||
if (position <= bufferedDuration) {
|
||||
return this.seekWithinBuffer(position);
|
||||
} else {
|
||||
// Seeking beyond buffer - signal C# to fetch new stream
|
||||
return this.seekBeyondBuffer(position);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Seek within currently buffered content
|
||||
*/
|
||||
private seekWithinBuffer(position: number): AudioResult {
|
||||
try {
|
||||
const wasPlaying = this.isPlaying;
|
||||
this.scheduler.stopAllSources();
|
||||
|
||||
// Adjust position relative to buffer start (subtract playback offset)
|
||||
const bufferRelativePosition = position - this.scheduler.getPlaybackOffset();
|
||||
this.pausePosition = position;
|
||||
|
||||
if (wasPlaying) {
|
||||
this.scheduler.playFromPosition(position);
|
||||
this.scheduler.playFromPosition(Math.max(0, bufferRelativePosition));
|
||||
}
|
||||
|
||||
console.log(`🔍 Seeked to ${position.toFixed(3)}s`);
|
||||
console.log(`🔍 Seeked within buffer to ${position.toFixed(3)}s (buffer-relative: ${bufferRelativePosition.toFixed(3)}s)`);
|
||||
return { success: true };
|
||||
} catch (error) {
|
||||
return { success: false, error: (error as Error).message };
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Seek beyond buffered content - calculate byte offset for server request
|
||||
*/
|
||||
private seekBeyondBuffer(position: number): AudioResult {
|
||||
try {
|
||||
const byteOffset = this.streamDecoder.calculateByteOffset(position);
|
||||
if (byteOffset <= 0) {
|
||||
return { success: false, error: 'Cannot calculate byte offset' };
|
||||
}
|
||||
|
||||
console.log(`🔍 Seek beyond buffer to ${position.toFixed(3)}s requires byte offset ${byteOffset}`);
|
||||
|
||||
// Signal that C# needs to request new stream from offset
|
||||
return {
|
||||
success: true,
|
||||
seekBeyondBuffer: true,
|
||||
byteOffset: byteOffset
|
||||
};
|
||||
} catch (error) {
|
||||
return { success: false, error: (error as Error).message };
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the total buffered duration (for C# to check if seek is within buffer)
|
||||
*/
|
||||
getBufferedDuration(): number {
|
||||
return this.scheduler.getTotalDuration() + this.scheduler.getPlaybackOffset();
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate byte offset for a time position (for C# layer)
|
||||
*/
|
||||
calculateByteOffset(positionSeconds: number): number {
|
||||
return this.streamDecoder.calculateByteOffset(positionSeconds);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reinitialize for offset streaming after seek-beyond-buffer
|
||||
* Called by C# after receiving new stream from server
|
||||
*/
|
||||
reinitializeFromOffset(totalStreamLength: number, seekPosition: number): AudioResult {
|
||||
try {
|
||||
console.log(`\n=== Reinitializing for offset stream ===`);
|
||||
console.log(`Seek position: ${seekPosition.toFixed(3)}s, Stream length: ${totalStreamLength}`);
|
||||
|
||||
// Stop current playback
|
||||
this.stopProgressTracking();
|
||||
const wasPlaying = this.isPlaying;
|
||||
this.isPlaying = false;
|
||||
|
||||
// Clear buffers and set new offset
|
||||
this.scheduler.clearForSeek();
|
||||
this.scheduler.setPlaybackOffset(seekPosition);
|
||||
|
||||
// Reinitialize decoder for new stream
|
||||
this.streamDecoder.reinitializeForOffset(totalStreamLength);
|
||||
|
||||
// Update state
|
||||
this.pausePosition = seekPosition;
|
||||
this.streamingStarted = false; // Will restart when new buffers arrive
|
||||
this.streamingCompleted = false;
|
||||
|
||||
console.log(`✅ Reinitialized for offset, was playing: ${wasPlaying}`);
|
||||
return { success: true };
|
||||
} catch (error) {
|
||||
return { success: false, error: (error as Error).message };
|
||||
|
||||
Reference in New Issue
Block a user