Refactor downloaders to use generic options and unify logic
Replaces specialized binary and HTML downloaders with a generic, options-driven UnitDownloader and UnitFragmentDownloader pattern. Introduces UnitDownloaderOptions and builder classes for flexible configuration, updates interfaces and method signatures to support progress reporting, and removes redundant binary-specific classes. Updates Playwright and Stealth downloaders to use the new generic base, and adds improved error handling and reporting. Also updates dependency versions and makes minor API consistency improvements across the Fluent and Models layers.
This commit is contained in:
@@ -14,6 +14,7 @@
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.9" />
|
||||
<PackageReference Include="System.Linq.Async" Version="6.0.3" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
||||
@@ -58,6 +58,7 @@ namespace Beam.Downloaders {
|
||||
var (result, downloadedT) = await unit.TryDownload(
|
||||
links.ToArray(),
|
||||
Context.CancellationToken,
|
||||
downProgress: Context.DownloadReporter,
|
||||
tryProgress: Context.RetryReporter);
|
||||
|
||||
if (!result) {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
using Beam.Abstractions;
|
||||
using Beam.Models;
|
||||
using HtmlAgilityPack;
|
||||
using File = System.IO.File;
|
||||
|
||||
namespace Beam.Downloaders {
|
||||
/// <summary>
|
||||
@@ -10,34 +11,104 @@ namespace Beam.Downloaders {
|
||||
/// <param name="web"></param>
|
||||
/// <param name="transformer"></param>
|
||||
/// <param name="failurePredicate"></param>
|
||||
public class UnitDownloader<T>(HtmlWeb web, AsyncTransformer<HtmlDocument, T> transformer, AsyncDownloadFailurePredicate<HtmlDocument>?[]? failurePredicate = null) : IUnitDownloader<T> {
|
||||
public HtmlWeb Web { get; } = web;
|
||||
public virtual AsyncTransformer<HtmlDocument, T> Transformer { get; } = transformer;
|
||||
public virtual AsyncDownloadFailurePredicate<HtmlDocument>?[]? FailurePredicates { get; } = failurePredicate;
|
||||
public class UnitDownloader<RawType, OutType>(UnitDownloaderOptions<RawType, OutType> options) : IUnitDownloader<OutType> where RawType : IDocument {
|
||||
public UnitDownloaderOptions<RawType, OutType> Options { get; } = options;
|
||||
public HttpClient Client => Options.Client;
|
||||
public virtual AsyncTransformer<RawType, OutType> Transformer => Options.AsyncTransformer;
|
||||
|
||||
public virtual AsyncDownloadFailurePredicate<RawType>?[]? FailurePredicates =>
|
||||
Options?.FailurePredicateOptions?.AsyncDownloadFailurePredicates;
|
||||
|
||||
public int LinksPerDownload { get; } = 1;
|
||||
|
||||
protected virtual async Task<bool> IsFailure(HtmlDocument doc) {
|
||||
if (FailurePredicates is null)
|
||||
return false;
|
||||
var failed = false;
|
||||
await Parallel.ForEachAsync(FailurePredicates, async (x, ct) => {
|
||||
if (failed == true)
|
||||
return;
|
||||
if (x is null)
|
||||
return;
|
||||
if (await x(doc))
|
||||
failed = true;
|
||||
});
|
||||
/// <summary>
/// Streams the body of <paramref name="url"/> into <paramref name="destinationStream"/>,
/// reporting progress after every chunk.
/// </summary>
/// <param name="url">Address to request through <see cref="Client"/>.</param>
/// <param name="bufferSize">Size in bytes of the copy buffer.</param>
/// <param name="destinationStream">Stream that receives the downloaded bytes; it is not closed here.</param>
/// <param name="progress">Receives one report per chunk written.</param>
/// <param name="ct">Token observed by the request, every read and every write.</param>
/// <exception cref="HttpRequestException">The request failed or returned a non-success status code.</exception>
protected virtual async Task DownloadToStream(string url, int bufferSize, Stream destinationStream, IProgress<IDownloadReport> progress,
    CancellationToken ct) {

    // Only the headers are buffered; the body is streamed so large downloads do not sit in memory.
    using var response = await Client.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, ct);
    response.EnsureSuccessStatusCode();

    // A network response stream is not seekable, so its Length cannot be queried.
    // The total size comes from the Content-Length header, which may be absent
    // (chunked transfer or automatic decompression).
    long? total = response.Content.Headers.ContentLength;

    await using var stream = await response.Content.ReadAsStreamAsync(ct);
    byte[] buffer = new byte[bufferSize];
    int inBuffer;
    long downloaded = 0;
    while ((inBuffer = await stream.ReadAsync(buffer.AsMemory(), ct)) > 0) {
        downloaded += inBuffer;
        await destinationStream.WriteAsync(buffer.AsMemory(0, inBuffer), ct);
        progress?.Report(new DownloadReport() {
            BytesDownloaded = inBuffer,
            // -1 marks "unknown" when the server did not announce a length.
            BytesRemaining = total.HasValue ? Math.Max(0L, total.Value - downloaded) : -1L
        });

        ct.ThrowIfCancellationRequested();
    }
}
|
||||
|
||||
protected virtual async Task<(bool, T?)> TryDownloadWithNoRetries(string link, CancellationToken ct) {
|
||||
/// <summary>
/// Downloads <paramref name="url"/> into the file at <paramref name="path"/>.
/// </summary>
/// <param name="url">Address to download.</param>
/// <param name="bufferSize">Copy buffer size in bytes, forwarded to <c>DownloadToStream</c>.</param>
/// <param name="path">Destination file; its directory must already exist.</param>
/// <param name="progress">Receives download progress reports.</param>
/// <param name="ct">Cancellation token forwarded to the download.</param>
/// <exception cref="InvalidOperationException">The directory of <paramref name="path"/> does not exist.</exception>
protected virtual async Task DownloadToFile(string url, int bufferSize, string path,
    IProgress<IDownloadReport> progress, CancellationToken ct) {

    if (!Directory.Exists(Path.GetDirectoryName(path)))
        throw new InvalidOperationException(
            string.Format(Exceptions.Exceptions.unit_download_directory_nonexistant, path));

    // FileMode.Create truncates an existing file, so a shorter download can never leave
    // stale trailing bytes behind; useAsync makes the awaited writes genuinely asynchronous.
    await using var file = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None,
        bufferSize: 4096, useAsync: true);
    await DownloadToStream(url, bufferSize, file, progress, ct);
}
|
||||
|
||||
/// <summary>
/// Downloads <paramref name="url"/> into an in-memory buffer and wraps the bytes in a
/// <see cref="ByteDocument"/>.
/// </summary>
/// <param name="url">Address to download; also passed to the resulting document.</param>
/// <param name="bufferSize">Copy buffer size in bytes, forwarded to <c>DownloadToStream</c>.</param>
/// <param name="progress">Receives download progress reports.</param>
/// <param name="ct">Cancellation token forwarded to the download.</param>
/// <returns>A document built from the URL and the downloaded bytes.</returns>
/// <exception cref="Exception">The memory stream refused to expose its buffer.</exception>
protected virtual async Task<ByteDocument> DownloadToMemory(string url, int bufferSize,
    IProgress<IDownloadReport> progress, CancellationToken ct) {

    await using var memory = new MemoryStream();
    await DownloadToStream(url, bufferSize, memory, progress, ct);

    // Hand over the stream's own buffer segment instead of copying it.
    var exposed = memory.TryGetBuffer(out var segment);
    if (exposed)
        return new ByteDocument(url, segment);

    throw new Exception(Exceptions.Exceptions.unit_download_invalid_memory_stream);
}
|
||||
|
||||
/// <summary>
/// Evaluates the configured failure predicates against a downloaded document.
/// </summary>
/// <param name="doc">The raw download to inspect.</param>
/// <param name="ct">Token used to cancel the parallel evaluation.</param>
/// <returns>
/// <c>true</c> when at least one non-null predicate reports a failure; <c>false</c> when
/// no predicates are configured or none of them matches.
/// </returns>
protected virtual async Task<bool> IsFailure(RawType doc, CancellationToken ct) {
    if (FailurePredicates is null)
        return false;

    var runInParallel = Options?.FailurePredicateOptions?.ProcessInParallel ?? false;
    if (runInParallel) {
        var anyFailed = false;
        await Parallel.ForEachAsync(
            FailurePredicates,
            new ParallelOptions() {
                MaxDegreeOfParallelism = Options?.FailurePredicateOptions?.ParallelThreads ?? 4,
                CancellationToken = ct
            },
            async (check, loopToken) => {
                // Skip work once cancelled, once a failure is already known, or for empty slots.
                if (loopToken.IsCancellationRequested || anyFailed || check == null)
                    return;
                if (await check(doc))
                    Interlocked.CompareExchange(ref anyFailed, true, false);
            });
        return anyFailed;
    }

    // Sequential mode: stop at the first predicate that reports a failure.
    foreach (var check in FailurePredicates) {
        if (check is not null && await check(doc))
            return true;
    }

    return false;
}
|
||||
|
||||
/// <summary>
/// Fetches <paramref name="link"/> and produces the raw document for this downloader.
/// </summary>
/// <remarks>
/// Two raw types are handled: <see cref="StringDocument"/> when a download folder is
/// configured (the body is written to a randomly named file in that folder), and
/// <see cref="ByteDocument"/> (the body is kept in memory).
/// </remarks>
/// <param name="link">Address to download.</param>
/// <param name="progress">Receives download progress reports.</param>
/// <param name="ct">Cancellation token forwarded to the download.</param>
/// <exception cref="NotSupportedException">The raw type / option combination is not handled.</exception>
protected virtual async Task<RawType> _Download(string link, IProgress<IDownloadReport> progress, CancellationToken ct) {
    // Generic classes are invariant, so comparing the type argument is the same test as
    // asking whether this instance is a UnitDownloader over that raw type.
    if (Options.DownloadFolder is not null && typeof(RawType) == typeof(StringDocument)) {
        var target = Path.Combine(Options.DownloadFolder, Path.GetRandomFileName());
        await DownloadToFile(link, Options.BufferSize, target, progress, ct);
        object onDisk = new StringDocument(link, target);
        return (RawType)onDisk;
    }

    if (typeof(RawType) == typeof(ByteDocument)) {
        object inMemory = await DownloadToMemory(link, Options.BufferSize, progress, ct);
        return (RawType)inMemory;
    }

    throw new NotSupportedException(Exceptions.Exceptions.unit_downloader_limited_support);
}
|
||||
|
||||
protected virtual async Task<(bool, OutType?)> Transform(RawType download, CancellationToken ct) {
|
||||
try {
|
||||
var html = await Web.LoadFromWebAsync(link, ct);
|
||||
if (FailurePredicates is null || !(await IsFailure(html)))
|
||||
return (true, await Transformer(html));
|
||||
if (FailurePredicates is null || !(await IsFailure(download, ct)))
|
||||
return (true, await Transformer(download));
|
||||
else
|
||||
return (false, default);
|
||||
} catch(Exception) {
|
||||
@@ -45,23 +116,26 @@ namespace Beam.Downloaders {
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
/// Downloads and transforms the first link, retrying with exponential back-off.
/// </summary>
/// <param name="link">Links to download; only the first element is used.</param>
/// <param name="ct">Cancels the attempts and the back-off waits.</param>
/// <param name="maximumRetryCount">Maximum number of attempts.</param>
/// <param name="downProgress">Optional sink for download progress; a no-op sink is used when null.</param>
/// <param name="tryProgress">Optional sink notified after every failed attempt.</param>
/// <returns>
/// <c>(true, value)</c> on the first attempt that yields a non-null transformed value;
/// otherwise <c>(false, last value)</c> once the attempts are exhausted or when
/// <paramref name="link"/> is empty.
/// </returns>
/// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
public async Task<(bool, OutType?)> TryDownload(IOrdered<string>[] link, CancellationToken ct, int maximumRetryCount = 7, IProgress<IDownloadReport>? downProgress = null, IProgress<IRetryReport>? tryProgress = null) {
    if (link.Length == 0)
        return (false, default);

    downProgress ??= new Progress<IDownloadReport>();

    var url = link[0].Data;
    OutType? ot = default;
    int tryCount = 0;
    while (tryCount < maximumRetryCount) {
        ct.ThrowIfCancellationRequested();
        try {
            var rt = await _Download(url, downProgress, ct);
            (var success, ot) = await Transform(rt, ct);
            if (success && ot != null)
                return (true, ot);
        } catch (HttpRequestException) {
            // Transient network / HTTP status failure: counts as a failed attempt.
        } catch (IOException) {
            // Connection dropped mid-transfer: counts as a failed attempt.
        } catch (OperationCanceledException) when (!ct.IsCancellationRequested) {
            // HttpClient timeout (not a caller cancellation): counts as a failed attempt.
        }

        ++tryCount;
        tryProgress?.Report(new RetryReport(tryCount, url));

        // Back off only when another attempt follows; waiting after the last one just
        // postpones the failure result. The wait itself honours cancellation.
        if (tryCount < maximumRetryCount)
            await Task.Delay((int)Math.Pow(2, tryCount) * 1000, ct);
    }

    return (false, ot);
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,74 +0,0 @@
|
||||
using Beam.Abstractions;
|
||||
using Beam.Models;
|
||||
|
||||
namespace Beam.Downloaders {
|
||||
/// <summary>
/// A download-managing class that retrieves binary data through <see cref="HttpClient"/>,
/// applies an asynchronous transformer to the resulting <see cref="ByteDocument"/>, and supports
/// failure detection plus exponential-back-off retries.
/// </summary>
public class UnitDownloaderBinary<T>(
    HttpClient client,
    AsyncTransformer<ByteDocument, T> transformer,
    AsyncDownloadFailurePredicate<ByteDocument>?[]? failurePredicates = null)
    : IUnitDownloader<T> {
    public HttpClient Client { get; } = client;
    public virtual AsyncTransformer<ByteDocument, T> Transformer { get; } = transformer;
    public virtual AsyncDownloadFailurePredicate<ByteDocument>?[]? FailurePredicates { get; } = failurePredicates;

    public int LinksPerDownload { get; } = 1;

    /// <summary>Runs all configured failure predicates in parallel on the downloaded document.</summary>
    /// <returns><c>true</c> when any non-null predicate reports a failure.</returns>
    protected virtual async Task<bool> IsFailure(ByteDocument response) {
        if (FailurePredicates is null) return false;

        // The flag is shared between parallel workers, so it is accessed through Volatile.
        var failed = false;
        await Parallel.ForEachAsync(FailurePredicates, async (pred, ct) => {
            if (Volatile.Read(ref failed) || pred is null) return;
            if (await pred(response))
                Volatile.Write(ref failed, true);
        });
        return Volatile.Read(ref failed);
    }

    /// <summary>One attempt without retries or back-off.</summary>
    /// <returns><c>(false, default)</c> on a non-success status, a predicate failure or an error.</returns>
    protected virtual async Task<(bool Success, T? Result)> TryDownloadWithNoRetries(string link, CancellationToken ct) {
        try {
            using var response = await Client.GetAsync(link, HttpCompletionOption.ResponseHeadersRead, ct);
            if (!response.IsSuccessStatusCode) return (false, default);

            var bytes = await response.Content.ReadAsByteArrayAsync(ct);
            var doc = new ByteDocument(link, bytes);
            if (await IsFailure(doc)) return (false, default);

            return (true, await Transformer(doc));
        } catch (OperationCanceledException) when (ct.IsCancellationRequested) {
            // A caller cancellation is not a failed attempt; let it propagate untouched.
            throw;
        } catch {
            // Best effort: any other error counts as a failed attempt and is retried by the caller.
            return (false, default);
        }
    }

    /// <summary>
    /// Downloads the first link, retrying with exponential back-off until it succeeds or
    /// <paramref name="maximumRetryCount"/> attempts have been made.
    /// </summary>
    /// <param name="link">Links to download; only the first element is used.</param>
    /// <param name="ct">Cancels the attempts and the back-off waits.</param>
    /// <param name="maximumRetryCount">Maximum number of attempts.</param>
    /// <param name="tryProgress">Optional sink notified after every failed attempt.</param>
    public async Task<(bool, T?)> TryDownload(
        IOrdered<string>[] link,
        CancellationToken ct,
        int maximumRetryCount = 7,
        IProgress<IRetryReport>? tryProgress = null) {
        if (link.Length == 0) return (false, default);

        T? result = default;
        var attempt = 0;

        while (attempt < maximumRetryCount) {
            ct.ThrowIfCancellationRequested();

            (var success, result) = await TryDownloadWithNoRetries(link[0].Data, ct);
            if (success && result is not null) return (true, result);

            ++attempt;
            tryProgress?.Report(new RetryReport(attempt, link[0].Data));

            // Waiting after the final attempt would only postpone the failure result.
            if (attempt < maximumRetryCount)
                await Task.Delay((int)Math.Pow(2, attempt) * 1000, ct);
        }

        return (false, result);
    }
}
|
||||
}
|
||||
@@ -0,0 +1,198 @@
|
||||
using Beam.Models;
|
||||
|
||||
namespace Beam.Downloaders;
|
||||
|
||||
/// <summary>
/// Immutable settings consumed by <c>UnitDownloader&lt;RawType, OutType&gt;</c>.
/// </summary>
/// <typeparam name="RawType">Type of the raw downloaded document.</typeparam>
/// <typeparam name="OutType">Type produced by the transformer.</typeparam>
public record UnitDownloaderOptions<RawType, OutType> {
    /// <summary>HTTP client used for requests; a new client is created when none is supplied.</summary>
    public HttpClient Client { get; init; } = new HttpClient();

    /// <summary>Optional failure-detection settings; null disables failure predicates.</summary>
    public FailurePredicateOptions<RawType>? FailurePredicateOptions { get; init; }

    /// <summary>Optional fragment settings.</summary>
    public FragmentOptions? FragmentOptions { get; init; }

    /// <summary>Converts a raw document into the output value. Must be set.</summary>
    public required AsyncTransformer<RawType, OutType> AsyncTransformer { get; init; }

    /// <summary>Folder that receives file-based downloads; null when not configured.</summary>
    public string? DownloadFolder { get; init; }

    /// <summary>Copy buffer size in bytes; defaults to 80 KiB.</summary>
    public int BufferSize { get; init; } = 80 * 1024;
}
|
||||
|
||||
/// <summary>
/// Describes which failure predicates to run against a raw download and how to run them.
/// </summary>
/// <typeparam name="RawType">Type of the raw downloaded document.</typeparam>
public record FailurePredicateOptions<RawType> {
    /// <summary>Predicates to evaluate; the array and its individual entries may be null. Must be set.</summary>
    public required AsyncDownloadFailurePredicate<RawType>?[]? AsyncDownloadFailurePredicates { get; init; }

    /// <summary>When true the predicates are evaluated in parallel; defaults to false.</summary>
    public bool ProcessInParallel { get; init; }

    /// <summary>Optional degree of parallelism for parallel evaluation.</summary>
    public int? ParallelThreads { get; init; }
}
|
||||
|
||||
/// <summary>
/// Settings for fragment downloads.
/// </summary>
public record FragmentOptions {
    /// <summary>Size of a fragment. Must be set.</summary>
    public required int FragmentSize { get; init; }

    /// <summary>Whether a fragment's items are downloaded in parallel; defaults to false.</summary>
    public bool DownloadInParallel { get; init; }

    /// <summary>Optional degree of parallelism for parallel downloads.</summary>
    public int? ParallelThreads { get; init; }
}
|
||||
|
||||
|
||||
// ---------- UnitDownloaderOptions Builder ----------
|
||||
/// <summary>
/// Fluent builder for <see cref="UnitDownloaderOptions{RawType, OutType}"/>.
/// Every <c>With…</c> method returns the same builder so calls can be chained.
/// </summary>
/// <typeparam name="TRaw">Type of the raw downloaded document.</typeparam>
/// <typeparam name="TOut">Type produced by the transformer.</typeparam>
public sealed class UnitDownloaderOptionsBuilder<TRaw, TOut>
{
    private HttpClient _http = new HttpClient();
    private FailurePredicateOptions<TRaw>? _failure;
    private FragmentOptions? _fragment;
    private AsyncTransformer<TRaw, TOut>? _transformer;
    private string? _folder;
    private int _buffer = 80 * 1024;

    /// <summary>Sets the HTTP client.</summary>
    /// <exception cref="System.ArgumentNullException"><paramref name="client"/> is null.</exception>
    public UnitDownloaderOptionsBuilder<TRaw, TOut> WithClient(HttpClient client)
    {
        System.ArgumentNullException.ThrowIfNull(client);
        _http = client;
        return this;
    }

    /// <summary>Sets (or clears, when null) the failure-predicate options.</summary>
    public UnitDownloaderOptionsBuilder<TRaw, TOut> WithFailurePredicateOptions(FailurePredicateOptions<TRaw>? options)
    {
        _failure = options;
        return this;
    }

    /// <summary>Builds the failure-predicate options through a nested builder.</summary>
    /// <exception cref="System.ArgumentNullException"><paramref name="configure"/> is null.</exception>
    public UnitDownloaderOptionsBuilder<TRaw, TOut> WithFailurePredicates(System.Action<FailurePredicateOptionsBuilder<TRaw>> configure)
    {
        System.ArgumentNullException.ThrowIfNull(configure);
        var nested = new FailurePredicateOptionsBuilder<TRaw>();
        configure(nested);
        return WithFailurePredicateOptions(nested.Build());
    }

    /// <summary>Sets (or clears, when null) the fragment options.</summary>
    public UnitDownloaderOptionsBuilder<TRaw, TOut> WithFragmentOptions(FragmentOptions? options)
    {
        _fragment = options;
        return this;
    }

    /// <summary>Builds the fragment options through a nested builder.</summary>
    /// <exception cref="System.ArgumentNullException"><paramref name="configure"/> is null.</exception>
    public UnitDownloaderOptionsBuilder<TRaw, TOut> WithFragments(System.Action<FragmentOptionsBuilder> configure)
    {
        System.ArgumentNullException.ThrowIfNull(configure);
        var nested = new FragmentOptionsBuilder();
        configure(nested);
        return WithFragmentOptions(nested.Build());
    }

    /// <summary>Sets the transformer that converts raw documents into output values.</summary>
    /// <exception cref="System.ArgumentNullException"><paramref name="transformer"/> is null.</exception>
    public UnitDownloaderOptionsBuilder<TRaw, TOut> WithAsyncTransformer(AsyncTransformer<TRaw, TOut> transformer)
    {
        System.ArgumentNullException.ThrowIfNull(transformer);
        _transformer = transformer;
        return this;
    }

    /// <summary>Sets (or clears, when null) the download folder.</summary>
    public UnitDownloaderOptionsBuilder<TRaw, TOut> WithDownloadFolder(string? downloadFolder)
    {
        _folder = downloadFolder;
        return this;
    }

    /// <summary>Sets the copy buffer size in bytes.</summary>
    /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="bytes"/> is zero or negative.</exception>
    public UnitDownloaderOptionsBuilder<TRaw, TOut> WithBufferSize(int bytes)
    {
        if (bytes < 1)
        {
            throw new System.ArgumentOutOfRangeException(nameof(bytes));
        }

        _buffer = bytes;
        return this;
    }

    /// <summary>Creates the options from the values collected so far.</summary>
    /// <exception cref="System.InvalidOperationException">No transformer was supplied.</exception>
    public UnitDownloaderOptions<TRaw, TOut> Build()
    {
        var transformer = _transformer
            ?? throw new System.InvalidOperationException("AsyncTransformer must be provided.");

        return new UnitDownloaderOptions<TRaw, TOut>
        {
            Client = _http,
            FailurePredicateOptions = _failure,
            FragmentOptions = _fragment,
            AsyncTransformer = transformer,
            DownloadFolder = _folder,
            BufferSize = _buffer
        };
    }
}
|
||||
|
||||
// ---------- FailurePredicateOptions Builder ----------
|
||||
/// <summary>
/// Fluent builder for <see cref="FailurePredicateOptions{RawType}"/>.
/// Every predicate-adding method appends to the predicates collected so far.
/// </summary>
/// <typeparam name="TRaw">Type of the raw downloaded document.</typeparam>
public sealed class FailurePredicateOptionsBuilder<TRaw>
{
    private readonly System.Collections.Generic.List<AsyncDownloadFailurePredicate<TRaw>?> _predicates =
        new System.Collections.Generic.List<AsyncDownloadFailurePredicate<TRaw>?>();
    private bool _processInParallel = false;
    private int? _parallelThreads = null;

    /// <summary>Appends a single predicate (null entries are allowed).</summary>
    public FailurePredicateOptionsBuilder<TRaw> WithPredicate(AsyncDownloadFailurePredicate<TRaw>? predicate)
    {
        _predicates.Add(predicate);
        return this;
    }

    /// <summary>Appends a sequence of predicates.</summary>
    /// <exception cref="System.ArgumentNullException"><paramref name="predicates"/> is null.</exception>
    public FailurePredicateOptionsBuilder<TRaw> WithPredicates(System.Collections.Generic.IEnumerable<AsyncDownloadFailurePredicate<TRaw>?> predicates)
    {
        if (predicates == null) throw new System.ArgumentNullException(nameof(predicates));
        _predicates.AddRange(predicates);
        return this;
    }

    /// <summary>Appends the given predicates; a null array adds nothing.</summary>
    /// <remarks>
    /// Appends like the other overloads. Previously this overload silently discarded every
    /// predicate added earlier, so the result depended on whether the caller passed an array
    /// or another sequence type.
    /// </remarks>
    public FailurePredicateOptionsBuilder<TRaw> WithPredicates(params AsyncDownloadFailurePredicate<TRaw>?[] predicates)
    {
        if (predicates != null) _predicates.AddRange(predicates);
        return this;
    }

    /// <summary>Chooses parallel (default <c>true</c>) or sequential predicate evaluation.</summary>
    public FailurePredicateOptionsBuilder<TRaw> WithProcessInParallel(bool value = true)
    {
        _processInParallel = value;
        return this;
    }

    /// <summary>Sets the degree of parallelism, or null to leave it unspecified.</summary>
    /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="threads"/> is zero or negative.</exception>
    public FailurePredicateOptionsBuilder<TRaw> WithParallelThreads(int? threads)
    {
        if (threads.HasValue && threads.Value <= 0)
            throw new System.ArgumentOutOfRangeException(nameof(threads));
        _parallelThreads = threads;
        return this;
    }

    /// <summary>Creates the options; the predicate array is empty (never null) when none were added.</summary>
    public FailurePredicateOptions<TRaw> Build()
    {
        return new FailurePredicateOptions<TRaw>
        {
            // ToArray already yields an empty array for an empty list.
            AsyncDownloadFailurePredicates = _predicates.ToArray(),
            ProcessInParallel = _processInParallel,
            ParallelThreads = _parallelThreads
        };
    }
}
|
||||
|
||||
// ---------- FragmentOptions Builder ----------
|
||||
/// <summary>
/// Fluent builder for <see cref="FragmentOptions"/>. A fragment size must be supplied
/// before <see cref="Build"/> is called.
/// </summary>
public sealed class FragmentOptionsBuilder {
    private int? _size;
    private bool _parallel;
    private int? _threads;

    /// <summary>Sets the fragment size.</summary>
    /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="bytes"/> is zero or negative.</exception>
    public FragmentOptionsBuilder WithFragmentSize(int bytes) {
        if (bytes < 1) {
            throw new System.ArgumentOutOfRangeException(nameof(bytes));
        }

        _size = bytes;
        return this;
    }

    /// <summary>Chooses parallel (default <c>true</c>) or sequential downloading.</summary>
    public FragmentOptionsBuilder WithDownloadInParallel(bool value = true) {
        _parallel = value;
        return this;
    }

    /// <summary>Sets the degree of parallelism, or null to leave it unspecified.</summary>
    /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="threads"/> is zero or negative.</exception>
    public FragmentOptionsBuilder WithParallelThreads(int? threads) {
        if (threads is <= 0) {
            throw new System.ArgumentOutOfRangeException(nameof(threads));
        }

        _threads = threads;
        return this;
    }

    /// <summary>Creates the options from the values collected so far.</summary>
    /// <exception cref="System.InvalidOperationException">No fragment size was supplied.</exception>
    public FragmentOptions Build() {
        if (_size is not int size) {
            throw new System.InvalidOperationException("FragmentSize must be provided.");
        }

        return new FragmentOptions {
            FragmentSize = size,
            DownloadInParallel = _parallel,
            ParallelThreads = _threads
        };
    }
}
|
||||
@@ -5,55 +5,39 @@ using HtmlAgilityPack;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Beam.Downloaders {
|
||||
public class UnitFragmentDownloader<T> : IUnitDownloader<Fragment<Ordered<T>>> {
|
||||
public UnitFragmentDownloader(HtmlWeb web,
|
||||
AsyncTransformer<HtmlDocument, T> transformer,
|
||||
AsyncDownloadFailurePredicate<HtmlDocument>?[]? failurePredicate = null,
|
||||
int fragmentSize = 4,
|
||||
ILogger? logger = null,
|
||||
IUnitDownloader<T>? internalDownloader = null) {
|
||||
Web = web;
|
||||
Transformer = transformer;
|
||||
FailurePredicate = failurePredicate;
|
||||
UnitDownloader = internalDownloader ?? new UnitDownloader<T>(Web, Transformer, FailurePredicate);
|
||||
LinksPerDownload = fragmentSize;
|
||||
Logger = logger;
|
||||
}
|
||||
public class UnitFragmentDownloader<RawType, OutType>(UnitDownloaderOptions<RawType, OutType> options,
|
||||
IUnitDownloader<OutType>? internalDownloader = null) : IUnitDownloader<Fragment<Ordered<OutType>>> where RawType : IDocument {
|
||||
|
||||
public HtmlWeb Web { get; }
|
||||
public AsyncTransformer<HtmlDocument, T> Transformer { get; }
|
||||
public AsyncDownloadFailurePredicate<HtmlDocument>?[]? FailurePredicate { get; }
|
||||
public UnitDownloaderOptions<RawType, OutType> Options { get; } = options;
|
||||
public int LinksPerDownload { get; set; }
|
||||
public ILogger? Logger { get; set; }
|
||||
private IUnitDownloader<OutType> UnitDownloader { get; } = internalDownloader ?? new UnitDownloader<RawType, OutType>(options);
|
||||
|
||||
private readonly IUnitDownloader<T> UnitDownloader;
|
||||
|
||||
async Task<(bool, Fragment<Ordered<T>>?)> IUnitDownloader<Fragment<Ordered<T>>>.TryDownload(IOrdered<string>[] link, CancellationToken ct, int maximumRetryCount, IProgress<IRetryReport>? tryProgress) {
|
||||
Fragment<Ordered<T>> fragment = new Fragment<Ordered<T>>(link.Length);
|
||||
if (!Fragment<Ordered<T>>.TryAcquireUpdater(fragment, out var updater))
|
||||
async Task<(bool, Fragment<Ordered<OutType>>?)> IUnitDownloader<Fragment<Ordered<OutType>>>.TryDownload(IOrdered<string>[] link, CancellationToken ct, int maximumRetryCount, IProgress<IDownloadReport>? downProgress, IProgress<IRetryReport>? tryProgress) {
|
||||
Fragment<Ordered<OutType>> fragment = new Fragment<Ordered<OutType>>(link.Length);
|
||||
if (!Fragment<Ordered<OutType>>.TryAcquireUpdater(fragment, out var updater))
|
||||
throw new AssertionException(Exceptions.Exceptions.fragment_locked);
|
||||
bool isFailure = false;
|
||||
await Parallel.ForEachAsync(link, async (x, pct) => {
|
||||
pct.ThrowIfCancellationRequested();
|
||||
ct.ThrowIfCancellationRequested();
|
||||
var (result, downloadedT) = await UnitDownloader.TryDownload([x], ct, maximumRetryCount, tryProgress);
|
||||
if (isFailure)
|
||||
return;
|
||||
var (result, downloadedT) = await UnitDownloader.TryDownload([x], ct, maximumRetryCount, downProgress, tryProgress);
|
||||
if (!result) {
|
||||
Interlocked.Exchange(ref isFailure, true);
|
||||
Logger?.LogError("Failed to retrieve {0} order={1}", x.Data, x.Order);
|
||||
return;
|
||||
}
|
||||
if (downloadedT == null) {
|
||||
Interlocked.Exchange(ref isFailure, true);
|
||||
Logger?.LogCritical("Failed to retrieve {0} order={1}", x.Data, x.Order);
|
||||
return;
|
||||
}
|
||||
updater(new Ordered<T>(downloadedT, x.Order));
|
||||
updater(new Ordered<OutType>(downloadedT, x.Order));
|
||||
});
|
||||
|
||||
if (!isFailure)
|
||||
Fragment<Ordered<T>>.SetComplete(fragment, true);
|
||||
Fragment<Ordered<OutType>>.SetComplete(fragment, true);
|
||||
|
||||
Fragment<Ordered<T>>.TryReleaseUpdater(fragment, updater);
|
||||
Fragment<Ordered<OutType>>.TryReleaseUpdater(fragment, updater);
|
||||
|
||||
return (!isFailure, fragment);
|
||||
|
||||
|
||||
@@ -1,74 +0,0 @@
|
||||
using Beam.Abstractions;
|
||||
using Beam.Exceptions;
|
||||
using Beam.Models;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Beam.Downloaders {
|
||||
/// <summary>
/// Downloads a group of links in parallel and gathers the results into a single
/// <c>Fragment</c>. Each link is handed to an inner <see cref="IUnitDownloader{T}"/>,
/// which defaults to a <c>UnitDownloaderBinary</c> built from the constructor arguments.
/// </summary>
public class UnitFragmentDownloaderBinary<T>
    : IUnitDownloader<Fragment<Ordered<T>>> {
    private readonly IUnitDownloader<T> UnitDownloader;

    /// <summary>Creates the fragment downloader.</summary>
    /// <param name="client">HTTP client for the default inner downloader.</param>
    /// <param name="transformer">Transformer for the default inner downloader.</param>
    /// <param name="failurePredicate">Optional failure predicates for the default inner downloader.</param>
    /// <param name="fragmentSize">Value exposed through <see cref="LinksPerDownload"/>.</param>
    /// <param name="logger">Optional logger for failed links.</param>
    /// <param name="internalDownloader">Optional replacement for the default inner downloader.</param>
    public UnitFragmentDownloaderBinary(HttpClient client,
        AsyncTransformer<ByteDocument, T> transformer,
        AsyncDownloadFailurePredicate<ByteDocument>?[]? failurePredicate = null,
        int fragmentSize = 4,
        ILogger? logger = null,
        IUnitDownloader<T>? internalDownloader = null) {
        Client = client;
        Transformer = transformer;
        FailurePredicate = failurePredicate;
        if (internalDownloader is not null)
            UnitDownloader = internalDownloader;
        else
            UnitDownloader = new UnitDownloaderBinary<T>(Client, Transformer, FailurePredicate);
        LinksPerDownload = fragmentSize;
        Logger = logger;
    }

    public HttpClient Client { get; }
    public AsyncTransformer<ByteDocument, T> Transformer { get; }
    public AsyncDownloadFailurePredicate<ByteDocument>?[]? FailurePredicate { get; }
    public int LinksPerDownload { get; set; }
    public ILogger? Logger { get; set; }

    /// <summary>
    /// Downloads every link through the inner downloader and stores each result, with its
    /// order, in a new fragment. The fragment is marked complete only when no link failed.
    /// </summary>
    /// <returns>A success flag (false when any link failed) and the fragment.</returns>
    async Task<(bool, Fragment<Ordered<T>>?)> IUnitDownloader<Fragment<Ordered<T>>>.TryDownload(
        IOrdered<string>[] link,
        CancellationToken ct,
        int maximumRetryCount,
        IProgress<IRetryReport>? tryProgress) {
        var fragment = new Fragment<Ordered<T>>(link.Length);
        var acquired = Fragment<Ordered<T>>.TryAcquireUpdater(fragment, out var updater);
        if (!acquired)
            throw new AssertionException(Exceptions.Exceptions.fragment_locked);

        var isFailure = false;

        await Parallel.ForEachAsync(link, async (item, loopToken) => {
            loopToken.ThrowIfCancellationRequested();
            ct.ThrowIfCancellationRequested();

            var (ok, value) =
                await UnitDownloader.TryDownload([item], ct, maximumRetryCount, tryProgress);

            if (ok && value is not null) {
                updater(new Ordered<T>(value, item.Order));
                return;
            }

            // Remember the failure for the final result; the remaining links still run.
            Interlocked.Exchange(ref isFailure, true);
            Logger?.LogError("Failed to retrieve {Link} order={Order}",
                item.Data, item.Order);
        });

        var succeeded = !isFailure;
        if (succeeded)
            Fragment<Ordered<T>>.SetComplete(fragment, true);

        Fragment<Ordered<T>>.TryReleaseUpdater(fragment, updater);
        return (succeeded, fragment);
    }
}
|
||||
}
|
||||
Reference in New Issue
Block a user