mirror of
https://git.v0l.io/Kieran/void.cat.git
synced 2025-04-10 01:49:03 +02:00
Extract base stream filestore
This commit is contained in:
parent
1b460e7236
commit
dfeb4d41de
@ -2,9 +2,9 @@ namespace VoidCat.Model;
|
||||
|
||||
public sealed record IngressPayload(Stream InStream, SecretVoidFileMeta Meta)
|
||||
{
|
||||
public Guid? Id { get; init; }
|
||||
public Guid Id { get; init; } = Guid.NewGuid();
|
||||
public Guid? EditSecret { get; init; }
|
||||
public string? Hash { get; init; }
|
||||
|
||||
public bool IsAppend => Id.HasValue && EditSecret.HasValue;
|
||||
public bool IsAppend => EditSecret.HasValue;
|
||||
}
|
@ -1,29 +1,23 @@
|
||||
using System.Buffers;
|
||||
using System.Security.Cryptography;
|
||||
using VoidCat.Model;
|
||||
using VoidCat.Model.Exceptions;
|
||||
using VoidCat.Services.Abstractions;
|
||||
|
||||
namespace VoidCat.Services.Files;
|
||||
|
||||
public class LocalDiskFileStore : IFileStore
|
||||
public class LocalDiskFileStore : StreamFileStore, IFileStore
|
||||
{
|
||||
private const int BufferSize = 1_048_576;
|
||||
private readonly ILogger<LocalDiskFileStore> _logger;
|
||||
private readonly VoidSettings _settings;
|
||||
private readonly IAggregateStatsCollector _stats;
|
||||
private readonly IFileMetadataStore _metadataStore;
|
||||
private readonly IFileInfoManager _fileInfo;
|
||||
private readonly IUserUploadsStore _userUploads;
|
||||
|
||||
public LocalDiskFileStore(ILogger<LocalDiskFileStore> logger, VoidSettings settings, IAggregateStatsCollector stats,
|
||||
IFileMetadataStore metadataStore, IFileInfoManager fileInfo, IUserUploadsStore userUploads)
|
||||
: base(stats, metadataStore, userUploads)
|
||||
{
|
||||
_settings = settings;
|
||||
_stats = stats;
|
||||
_metadataStore = metadataStore;
|
||||
_fileInfo = fileInfo;
|
||||
_userUploads = userUploads;
|
||||
_logger = logger;
|
||||
|
||||
if (!Directory.Exists(_settings.DataDirectory))
|
||||
@ -38,71 +32,15 @@ public class LocalDiskFileStore : IFileStore
|
||||
if (!File.Exists(path)) throw new VoidFileNotFoundException(request.Id);
|
||||
|
||||
await using var fs = new FileStream(path, FileMode.Open, FileAccess.Read);
|
||||
if (request.Ranges.Any())
|
||||
{
|
||||
await EgressRanges(request.Id, request.Ranges, fs, outStream, cts);
|
||||
}
|
||||
else
|
||||
{
|
||||
await EgressFull(request.Id, fs, outStream, cts);
|
||||
}
|
||||
await EgressFromStream(fs, request, outStream, cts);
|
||||
}
|
||||
|
||||
public async ValueTask<PrivateVoidFile> Ingress(IngressPayload payload, CancellationToken cts)
|
||||
{
|
||||
var id = payload.Id ?? Guid.NewGuid();
|
||||
var fPath = MapPath(id);
|
||||
var meta = payload.Meta;
|
||||
if (payload.IsAppend)
|
||||
{
|
||||
if (meta?.EditSecret != null && meta.EditSecret != payload.EditSecret)
|
||||
{
|
||||
throw new VoidNotAllowedException("Edit secret incorrect!");
|
||||
}
|
||||
}
|
||||
|
||||
// open file
|
||||
var fPath = MapPath(payload.Id);
|
||||
await using var fsTemp = new FileStream(fPath,
|
||||
payload.IsAppend ? FileMode.Append : FileMode.Create, FileAccess.Write);
|
||||
|
||||
var (total, hash) = await IngressInternal(id, payload.InStream, fsTemp, cts);
|
||||
|
||||
if (payload.Hash != null && !hash.Equals(payload.Hash, StringComparison.InvariantCultureIgnoreCase))
|
||||
{
|
||||
throw new CryptographicException("Invalid file hash");
|
||||
}
|
||||
|
||||
if (payload.IsAppend)
|
||||
{
|
||||
meta = meta! with
|
||||
{
|
||||
Size = meta.Size + total
|
||||
};
|
||||
}
|
||||
else
|
||||
{
|
||||
meta = meta! with
|
||||
{
|
||||
Digest = hash,
|
||||
Uploaded = DateTimeOffset.UtcNow,
|
||||
EditSecret = Guid.NewGuid(),
|
||||
Size = total
|
||||
};
|
||||
}
|
||||
|
||||
await _metadataStore.Set(id, meta);
|
||||
var vf = new PrivateVoidFile()
|
||||
{
|
||||
Id = id,
|
||||
Metadata = meta
|
||||
};
|
||||
|
||||
if (meta.Uploader.HasValue)
|
||||
{
|
||||
await _userUploads.AddFile(meta.Uploader.Value, vf);
|
||||
}
|
||||
|
||||
return vf;
|
||||
return await IngressToStream(fsTemp, payload, cts);
|
||||
}
|
||||
|
||||
public ValueTask<PagedResult<PublicVoidFile>> ListFiles(PagedRequest request)
|
||||
@ -160,92 +98,6 @@ public class LocalDiskFileStore : IFileStore
|
||||
await _metadataStore.Delete(id);
|
||||
}
|
||||
|
||||
private async Task<(ulong, string)> IngressInternal(Guid id, Stream ingress, Stream fs, CancellationToken cts)
|
||||
{
|
||||
using var buffer = MemoryPool<byte>.Shared.Rent(BufferSize);
|
||||
var total = 0UL;
|
||||
int readLength = 0, offset = 0;
|
||||
var sha = SHA256.Create();
|
||||
while ((readLength = await ingress.ReadAsync(buffer.Memory[offset..], cts)) > 0 || offset != 0)
|
||||
{
|
||||
if (readLength != 0 && offset + readLength < buffer.Memory.Length)
|
||||
{
|
||||
// read until buffer full
|
||||
offset += readLength;
|
||||
continue;
|
||||
}
|
||||
|
||||
var totalRead = readLength + offset;
|
||||
var buf = buffer.Memory[..totalRead];
|
||||
await fs.WriteAsync(buf, cts);
|
||||
await _stats.TrackIngress(id, (ulong)buf.Length);
|
||||
sha.TransformBlock(buf.ToArray(), 0, buf.Length, null, 0);
|
||||
total += (ulong)buf.Length;
|
||||
offset = 0;
|
||||
}
|
||||
|
||||
sha.TransformFinalBlock(Array.Empty<byte>(), 0, 0);
|
||||
return (total, BitConverter.ToString(sha.Hash!).Replace("-", string.Empty));
|
||||
}
|
||||
|
||||
private async Task EgressFull(Guid id, FileStream fileStream, Stream outStream,
|
||||
CancellationToken cts)
|
||||
{
|
||||
using var buffer = MemoryPool<byte>.Shared.Rent(BufferSize);
|
||||
int readLength = 0, offset = 0;
|
||||
while ((readLength = await fileStream.ReadAsync(buffer.Memory[offset..], cts)) > 0 || offset != 0)
|
||||
{
|
||||
if (readLength != 0 && offset + readLength < buffer.Memory.Length)
|
||||
{
|
||||
// read until buffer full
|
||||
offset += readLength;
|
||||
continue;
|
||||
}
|
||||
|
||||
var fullSize = readLength + offset;
|
||||
await outStream.WriteAsync(buffer.Memory[..fullSize], cts);
|
||||
await _stats.TrackEgress(id, (ulong)fullSize);
|
||||
await outStream.FlushAsync(cts);
|
||||
offset = 0;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task EgressRanges(Guid id, IEnumerable<RangeRequest> ranges, FileStream fileStream, Stream outStream,
|
||||
CancellationToken cts)
|
||||
{
|
||||
using var buffer = MemoryPool<byte>.Shared.Rent(BufferSize);
|
||||
foreach (var range in ranges)
|
||||
{
|
||||
fileStream.Seek(range.Start ?? range.End ?? 0L,
|
||||
range.Start.HasValue ? SeekOrigin.Begin : SeekOrigin.End);
|
||||
|
||||
int readLength = 0, offset = 0;
|
||||
var dataRemaining = range.Size ?? 0L;
|
||||
while ((readLength = await fileStream.ReadAsync(buffer.Memory[offset..], cts)) > 0 || offset != 0)
|
||||
{
|
||||
if (readLength != 0 && offset + readLength < buffer.Memory.Length)
|
||||
{
|
||||
// read until buffer full
|
||||
offset += readLength;
|
||||
continue;
|
||||
}
|
||||
|
||||
var fullSize = readLength + offset;
|
||||
var toWrite = Math.Min(fullSize, dataRemaining);
|
||||
await outStream.WriteAsync(buffer.Memory[..(int)toWrite], cts);
|
||||
await _stats.TrackEgress(id, (ulong)toWrite);
|
||||
await outStream.FlushAsync(cts);
|
||||
dataRemaining -= toWrite;
|
||||
offset = 0;
|
||||
|
||||
if (dataRemaining == 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private string MapPath(Guid id) =>
|
||||
Path.Join(_settings.DataDirectory, id.ToString());
|
||||
}
|
||||
}
|
174
VoidCat/Services/Files/StreamFileStore.cs
Normal file
174
VoidCat/Services/Files/StreamFileStore.cs
Normal file
@ -0,0 +1,174 @@
|
||||
using System.Buffers;
|
||||
using System.Security.Cryptography;
|
||||
using VoidCat.Model;
|
||||
using VoidCat.Model.Exceptions;
|
||||
using VoidCat.Services.Abstractions;
|
||||
|
||||
namespace VoidCat.Services.Files;
|
||||
|
||||
public abstract class StreamFileStore
|
||||
{
|
||||
private const int BufferSize = 1_048_576;
|
||||
private readonly IAggregateStatsCollector _stats;
|
||||
private readonly IFileMetadataStore _metadataStore;
|
||||
private readonly IUserUploadsStore _userUploads;
|
||||
|
||||
protected StreamFileStore(IAggregateStatsCollector stats, IFileMetadataStore metadataStore,
|
||||
IUserUploadsStore userUploads)
|
||||
{
|
||||
_stats = stats;
|
||||
_metadataStore = metadataStore;
|
||||
_userUploads = userUploads;
|
||||
}
|
||||
|
||||
protected async ValueTask EgressFromStream(Stream stream, EgressRequest request, Stream outStream,
|
||||
CancellationToken cts)
|
||||
{
|
||||
if (request.Ranges.Any() && stream.CanSeek)
|
||||
{
|
||||
await EgressRanges(request.Id, request.Ranges, stream, outStream, cts);
|
||||
}
|
||||
else
|
||||
{
|
||||
await EgressFull(request.Id, stream, outStream, cts);
|
||||
}
|
||||
}
|
||||
|
||||
protected async ValueTask<PrivateVoidFile> IngressToStream(Stream stream, IngressPayload payload,
|
||||
CancellationToken cts)
|
||||
{
|
||||
var id = payload.Id;
|
||||
var meta = payload.Meta;
|
||||
if (payload.IsAppend)
|
||||
{
|
||||
if (meta?.EditSecret != null && meta.EditSecret != payload.EditSecret)
|
||||
{
|
||||
throw new VoidNotAllowedException("Edit secret incorrect!");
|
||||
}
|
||||
}
|
||||
|
||||
var (total, hash) = await IngressInternal(id, payload.InStream, stream, cts);
|
||||
if (payload.Hash != null && !hash.Equals(payload.Hash, StringComparison.InvariantCultureIgnoreCase))
|
||||
{
|
||||
throw new CryptographicException("Invalid file hash");
|
||||
}
|
||||
|
||||
if (payload.IsAppend)
|
||||
{
|
||||
meta = meta! with
|
||||
{
|
||||
Size = meta.Size + total
|
||||
};
|
||||
}
|
||||
else
|
||||
{
|
||||
meta = meta! with
|
||||
{
|
||||
Digest = hash,
|
||||
Uploaded = DateTimeOffset.UtcNow,
|
||||
EditSecret = Guid.NewGuid(),
|
||||
Size = total
|
||||
};
|
||||
}
|
||||
|
||||
await _metadataStore.Set(id, meta);
|
||||
var vf = new PrivateVoidFile()
|
||||
{
|
||||
Id = id,
|
||||
Metadata = meta
|
||||
};
|
||||
|
||||
if (meta.Uploader.HasValue)
|
||||
{
|
||||
await _userUploads.AddFile(meta.Uploader.Value, vf);
|
||||
}
|
||||
|
||||
return vf;
|
||||
}
|
||||
|
||||
private async Task<(ulong, string)> IngressInternal(Guid id, Stream ingress, Stream fs, CancellationToken cts)
|
||||
{
|
||||
using var buffer = MemoryPool<byte>.Shared.Rent(BufferSize);
|
||||
var total = 0UL;
|
||||
int readLength = 0, offset = 0;
|
||||
var sha = SHA256.Create();
|
||||
while ((readLength = await ingress.ReadAsync(buffer.Memory[offset..], cts)) > 0 || offset != 0)
|
||||
{
|
||||
if (readLength != 0 && offset + readLength < buffer.Memory.Length)
|
||||
{
|
||||
// read until buffer full
|
||||
offset += readLength;
|
||||
continue;
|
||||
}
|
||||
|
||||
var totalRead = readLength + offset;
|
||||
var buf = buffer.Memory[..totalRead];
|
||||
await fs.WriteAsync(buf, cts);
|
||||
await _stats.TrackIngress(id, (ulong) buf.Length);
|
||||
sha.TransformBlock(buf.ToArray(), 0, buf.Length, null, 0);
|
||||
total += (ulong) buf.Length;
|
||||
offset = 0;
|
||||
}
|
||||
|
||||
sha.TransformFinalBlock(Array.Empty<byte>(), 0, 0);
|
||||
return (total, BitConverter.ToString(sha.Hash!).Replace("-", string.Empty));
|
||||
}
|
||||
|
||||
private async Task EgressFull(Guid id, Stream inStream, Stream outStream,
|
||||
CancellationToken cts)
|
||||
{
|
||||
using var buffer = MemoryPool<byte>.Shared.Rent(BufferSize);
|
||||
int readLength = 0, offset = 0;
|
||||
while ((readLength = await inStream.ReadAsync(buffer.Memory[offset..], cts)) > 0 || offset != 0)
|
||||
{
|
||||
if (readLength != 0 && offset + readLength < buffer.Memory.Length)
|
||||
{
|
||||
// read until buffer full
|
||||
offset += readLength;
|
||||
continue;
|
||||
}
|
||||
|
||||
var fullSize = readLength + offset;
|
||||
await outStream.WriteAsync(buffer.Memory[..fullSize], cts);
|
||||
await _stats.TrackEgress(id, (ulong) fullSize);
|
||||
await outStream.FlushAsync(cts);
|
||||
offset = 0;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task EgressRanges(Guid id, IEnumerable<RangeRequest> ranges, Stream inStream, Stream outStream,
|
||||
CancellationToken cts)
|
||||
{
|
||||
using var buffer = MemoryPool<byte>.Shared.Rent(BufferSize);
|
||||
foreach (var range in ranges)
|
||||
{
|
||||
inStream.Seek(range.Start ?? range.End ?? 0L,
|
||||
range.Start.HasValue ? SeekOrigin.Begin : SeekOrigin.End);
|
||||
|
||||
int readLength = 0, offset = 0;
|
||||
var dataRemaining = range.Size ?? 0L;
|
||||
while ((readLength = await inStream.ReadAsync(buffer.Memory[offset..], cts)) > 0 || offset != 0)
|
||||
{
|
||||
if (readLength != 0 && offset + readLength < buffer.Memory.Length)
|
||||
{
|
||||
// read until buffer full
|
||||
offset += readLength;
|
||||
continue;
|
||||
}
|
||||
|
||||
var fullSize = readLength + offset;
|
||||
var toWrite = Math.Min(fullSize, dataRemaining);
|
||||
await outStream.WriteAsync(buffer.Memory[..(int) toWrite], cts);
|
||||
await _stats.TrackEgress(id, (ulong) toWrite);
|
||||
await outStream.FlushAsync(cts);
|
||||
dataRemaining -= toWrite;
|
||||
offset = 0;
|
||||
|
||||
if (dataRemaining == 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user