refactor: modularize Beam into new projects and interfaces
- Introduced modularity by splitting Beam into new projects: Beam.Abstractions, Beam.Models, and Beam.Downloaders. - Refactored existing classes into appropriate namespaces and projects. - Replaced specific implementations with abstractions (e.g., SourceLinkBuilder to LinkBuilder, State to IState, etc.). - Updated interfaces: added ITemplate, IArticleData, IDownloadReport, and others for improved extensibility. - Removed deprecated classes like SourceLinkBuilder and StateChangerFactory. - Enhanced link handling in downloaders by refactoring to use `string` over `SourceLink`. - Consolidated shared logic under Beam.Abstractions.
This commit is contained in:
@@ -0,0 +1,74 @@
|
||||
using Beam.Abstractions;
|
||||
using Beam.Models;
|
||||
using HtmlAgilityPack;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Beam.Downloaders {
|
||||
//public delegate T HtmlTransformer<out T>(HtmlDocument doc);
|
||||
//public delegate Task<T> AsyncHtmlTransformer<T>(HtmlDocument doc);
|
||||
//public delegate Task<T> AsyncBinaryTransformer<T>(byte[] bin);
|
||||
|
||||
/// <summary>
/// Carries the shared inputs of a download run: the HTML/HTTP clients, the links to fetch,
/// the progress reporters, the failure predicates, a timeout, a cancellation token and a logger.
/// </summary>
/// <typeparam name="RawType">Raw document type the failure predicates are parameterized over.</typeparam>
public class DownloadContext<RawType> : IDisposable {
    private bool disposedValue;

    /// <summary>HTTP client supplied by the caller. It is not disposed by this context.</summary>
    public HttpClient Client { get; }

    /// <summary>HtmlAgilityPack web loader supplied by the caller.</summary>
    public HtmlWeb Web { get; }

    /// <summary>Optional sink for download progress reports.</summary>
    public IProgress<IDownloadReport>? DownloadReporter { get; set; }

    /// <summary>Optional sink for retry reports.</summary>
    public IProgress<IRetryReport>? RetryReporter { get; set; }

    /// <summary>Optional failure predicates; the array itself and any entry may be <c>null</c>.</summary>
    public AsyncDownloadFailurePredicate<RawType>?[]? AsyncFailurePredicates { get; }

    /// <summary>Timeout for the run. Defaults to one minute when the constructor receives no value.</summary>
    public TimeSpan TimeOut { get; set; }

    /// <summary>The links to download, in the order supplied by the caller.</summary>
    public IEnumerable<string> Links { get; }

    /// <summary>Token used to cancel the run.</summary>
    public CancellationToken CancellationToken { get; }

    /// <summary>Document cache owned by this context. The reference is dropped on <see cref="Dispose()"/>.</summary>
    public DocumentCache Cache { get; private set; } = [];

    /// <summary>Optional logger for download diagnostics.</summary>
    public ILogger? DownloadLogger { get; set; }

    /// <summary>Creates a context from the given collaborators.</summary>
    /// <param name="web">HTML loader; must not be <c>null</c>.</param>
    /// <param name="client">HTTP client.</param>
    /// <param name="links">Links to download; must not be <c>null</c>.</param>
    /// <param name="cancellationToken">Token used to cancel the run.</param>
    /// <param name="downloadReporter">Optional download progress sink.</param>
    /// <param name="retryReporter">Optional retry progress sink.</param>
    /// <param name="asyncFailurePredicates">Optional failure predicates.</param>
    /// <param name="timeOut">Timeout; one minute is used when <c>null</c>.</param>
    /// <param name="downloadLogger">Optional logger.</param>
    /// <exception cref="ArgumentNullException"><paramref name="web"/> or <paramref name="links"/> is <c>null</c>.</exception>
    public DownloadContext(HtmlWeb web,
                           HttpClient client,
                           IEnumerable<string> links,
                           CancellationToken cancellationToken = default,
                           IProgress<IDownloadReport>? downloadReporter = null,
                           IProgress<IRetryReport>? retryReporter = null,
                           AsyncDownloadFailurePredicate<RawType>?[]? asyncFailurePredicates = null,
                           TimeSpan? timeOut = null,
                           ILogger? downloadLogger = null) {
        ArgumentNullException.ThrowIfNull(web, nameof(web));
        ArgumentNullException.ThrowIfNull(links, nameof(links));

        Web = web;
        Client = client;
        Links = links;
        CancellationToken = cancellationToken;
        DownloadReporter = downloadReporter;
        RetryReporter = retryReporter;
        AsyncFailurePredicates = asyncFailurePredicates;
        TimeOut = timeOut ?? TimeSpan.FromMinutes(1);
        DownloadLogger = downloadLogger;
    }

    /// <summary>Releases the resources held by this context. Safe to call more than once.</summary>
    /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing) {
        if (disposedValue)
            return;

        if (disposing) {
            // Drop the cache reference so its contents can be collected.
            // The property is declared non-nullable, hence the null-forgiving operator.
            Cache = null!;
        }

        disposedValue = true;
    }

    /// <inheritdoc/>
    public void Dispose() {
        // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
        Dispose(disposing: true);
        GC.SuppressFinalize(this);
    }
}
|
||||
}
|
||||
@@ -0,0 +1,118 @@
|
||||
using Beam.Abstractions;
|
||||
using Beam.Models;
|
||||
using HtmlAgilityPack;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Beam.Downloaders {
|
||||
|
||||
/// <summary>
/// Fluent builder for <see cref="DownloadContext{RawType}"/>. Each <c>With…</c> method stores
/// one value and returns the same builder; <see cref="Build"/> passes the collected values to
/// the context constructor.
/// </summary>
/// <typeparam name="RawType">Raw document type the failure predicates are parameterized over.</typeparam>
public class DownloadContextBuilder<RawType> {
    private HtmlWeb _web;
    private HttpClient _client;
    private IProgress<IDownloadReport>? _downloadReporter;
    private IProgress<IRetryReport>? _retryReporter;
    private AsyncDownloadFailurePredicate<RawType>?[] _asyncFailurePredicates = [];
    // Nullable so that an unset timeout reaches the context as null and the context's
    // own default applies, instead of an accidental TimeSpan.Zero.
    private TimeSpan? _timeOut;
    private IEnumerable<string> _links;
    private CancellationToken _cancellationToken;
    // Stored for callers of WithCache; Build() cannot apply it because
    // DownloadContext.Cache has a private setter.
    private DocumentCache? _cache;
    private ILogger? _downloadLogger;

    /// <summary>Creates a builder, instantiating a new client and/or web loader when none is supplied.</summary>
    /// <param name="client">HTTP client to use, or <c>null</c> to create a new one.</param>
    /// <param name="web">HTML loader to use, or <c>null</c> to create a new one.</param>
    public DownloadContextBuilder(HttpClient? client = null, HtmlWeb? web = null) {
        _client = client ?? new();
        _web = web ?? new();
        _links = [];
    }

    /// <summary>Sets the HTML loader.</summary>
    public DownloadContextBuilder<RawType> WithWeb(HtmlWeb web) {
        _web = web;
        return this;
    }

    /// <summary>Sets the HTTP client.</summary>
    public DownloadContextBuilder<RawType> WithClient(HttpClient client) {
        _client = client;
        return this;
    }

    /// <summary>Sets the download progress sink.</summary>
    public DownloadContextBuilder<RawType> WithDownloadReporter(IProgress<IDownloadReport> downloadReporter) {
        _downloadReporter = downloadReporter;
        return this;
    }

    /// <summary>Sets the retry progress sink.</summary>
    public DownloadContextBuilder<RawType> WithRetryReporter(IProgress<IRetryReport> retryReporter) {
        _retryReporter = retryReporter;
        return this;
    }

    /// <summary>Replaces the failure predicates with the given set.</summary>
    public DownloadContextBuilder<RawType> WithAsyncFailurePredicates(params AsyncDownloadFailurePredicate<RawType>[] predicates) {
        _asyncFailurePredicates = predicates;
        return this;
    }

    /// <summary>Sets the timeout. When never called, the context's default timeout is used.</summary>
    public DownloadContextBuilder<RawType> WithTimeOut(TimeSpan timeOut) {
        _timeOut = timeOut;
        return this;
    }

    /// <summary>Sets the links to download.</summary>
    public DownloadContextBuilder<RawType> WithLinks(IEnumerable<string> links) {
        _links = links;
        return this;
    }

    /// <summary>Sets the cancellation token.</summary>
    public DownloadContextBuilder<RawType> WithCancellationToken(CancellationToken cancellationToken) {
        _cancellationToken = cancellationToken;
        return this;
    }

    /// <summary>Stores a document cache. NOTE(review): <see cref="Build"/> currently does not apply it.</summary>
    public DownloadContextBuilder<RawType> WithCache(DocumentCache cache) {
        _cache = cache;
        return this;
    }

    /// <summary>Sets the download logger.</summary>
    public DownloadContextBuilder<RawType> WithDownloadLogger(ILogger downloadLogger) {
        _downloadLogger = downloadLogger;
        return this;
    }

    /// <summary>Constructs a <see cref="DownloadContext{RawType}"/> from the collected values.</summary>
    /// <returns>A new context instance.</returns>
    public DownloadContext<RawType> Build() {
        var context = new DownloadContext<RawType>(
            web: _web,
            client: _client,
            links: _links,
            cancellationToken: _cancellationToken,
            downloadReporter: _downloadReporter,
            retryReporter: _retryReporter,
            asyncFailurePredicates: _asyncFailurePredicates,
            timeOut: _timeOut,
            downloadLogger: _downloadLogger
        );

        //// Assign the DocumentCache if it's been set in the builder.
        //// (Even though Cache has a private setter, this code assumes builder
        //// is in the same assembly or that the setter will be made internal.
        //// Otherwise, remove or adjust this line.)
        //context.Cache = _cache;

        return context;
    }

    /// <summary>Creates a builder pre-populated with the values of an existing context.</summary>
    /// <param name="existing">Context to copy from; must not be <c>null</c>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="existing"/> is <c>null</c>.</exception>
    public static DownloadContextBuilder<RawType> FromContext(DownloadContext<RawType> existing) {
        if (existing == null) throw new ArgumentNullException(nameof(existing));

        // Optional values are copied straight into the fields so that nulls are carried
        // over as-is rather than being forced through non-nullable parameters with '!'.
        var builder = new DownloadContextBuilder<RawType>(existing.Client, existing.Web);
        builder._links = existing.Links;
        builder._cancellationToken = existing.CancellationToken;
        builder._downloadReporter = existing.DownloadReporter;
        builder._retryReporter = existing.RetryReporter;
        builder._asyncFailurePredicates = existing.AsyncFailurePredicates ?? [];
        builder._timeOut = existing.TimeOut;
        builder._downloadLogger = existing.DownloadLogger;
        builder._cache = existing.Cache;
        return builder;
    }
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,76 @@
|
||||
using Beam.Abstractions;
|
||||
using Beam.Models;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Beam.Downloaders {
|
||||
/// <summary>
/// Asynchronous enumerator that walks the context's links in order and hands them to a
/// unit downloader in batches of <c>LinksPerDownload</c>, exposing each downloaded result
/// through <see cref="Current"/>.
/// </summary>
/// <typeparam name="RawType">Raw document type of the download context.</typeparam>
/// <typeparam name="OutType">Type produced by the unit downloader.</typeparam>
public class SequentialDownloader<RawType, OutType> : IAsyncEnumerator<OutType> {
    /// <summary>The most recently downloaded item; <c>default</c> until the first successful move.</summary>
    public OutType Current { get; protected set; }

    /// <summary>The context supplying links, cancellation token and retry reporter.</summary>
    public DownloadContext<RawType> Context { get; }

    /// <summary>Optional logger.</summary>
    public ILogger? Logger { get; set; }

    /// <summary>Order index assigned to the next link; incremented once per consumed link.</summary>
    public int LastOrder { get; set; } = 0;

    protected IEnumerator<string> LinksEnumerator;

    /// <summary>Factory producing the unit downloader used for each batch.</summary>
    public Func<IUnitDownloader<OutType>> GetUnitDownloader { get; set; }

    /// <summary>Creates the downloader and obtains an enumerator over the context's links.</summary>
    /// <param name="context">Download context.</param>
    /// <param name="getUnitDownloader">Factory that builds a unit downloader from the context.</param>
    /// <param name="logger">Optional logger.</param>
    public SequentialDownloader(DownloadContext<RawType> context, Func<DownloadContext<RawType>, IUnitDownloader<OutType>> getUnitDownloader, ILogger? logger = null) {
        Context = context;
        Logger = logger;
        LinksEnumerator = Context.Links.GetEnumerator();

        try {
            LinksEnumerator.Reset();
        } catch (NotSupportedException) {
            Logger?.LogWarning("Enumerator of type {} does not support resets. This may cause buggy behavior", LinksEnumerator.GetType());
        }
        Current = default!;
        GetUnitDownloader = () => getUnitDownloader(Context);
    }

    /// <summary>Disposes the underlying link enumerator.</summary>
    public ValueTask DisposeAsync() {
        LinksEnumerator.Dispose();
        GC.SuppressFinalize(this);
        return ValueTask.CompletedTask;
    }

    /// <summary>
    /// Collects the next batch of links, downloads it and stores the result in <see cref="Current"/>.
    /// </summary>
    /// <returns>
    /// <c>true</c> when a batch was downloaded; <c>false</c> when the links are exhausted,
    /// the first link of the batch is blank, or the unit download failed.
    /// </returns>
    public async ValueTask<bool> MoveNextAsync() {
        if (!LinksEnumerator.MoveNext())
            return false;

        var unit = GetUnitDownloader(); // safe to instantiate per request.
        var idealLinkCount = unit.LinksPerDownload;
        List<Ordered<string>> links = [];

        links.Add(new Ordered<string>(LinksEnumerator.Current, LastOrder++));

        // The batch size must be tested BEFORE advancing the enumerator: advancing first
        // would consume one extra link per full batch and silently drop it.
        // A blank link ends the batch early and is not added.
        while (links.Count < idealLinkCount
               && LinksEnumerator.MoveNext()
               && !string.IsNullOrWhiteSpace(LinksEnumerator.Current))
            links.Add(new Ordered<string>(LinksEnumerator.Current, LastOrder++));

        // Only the first link is added unconditionally, so it is the only one that can be blank.
        if (string.IsNullOrWhiteSpace(links[0].Data))
            return false;

        var (result, downloadedT) = await unit.TryDownload(
            links.ToArray(),
            Context.CancellationToken,
            tryProgress: Context.RetryReporter);

        if (!result || downloadedT is null) {
            Logger?.LogWarning("Failed to download Unit<{}>", typeof(OutType).Name);
            return false; // unit download failed
        }

        Current = downloadedT;
        return true;
    }
}
|
||||
}
|
||||
@@ -0,0 +1,101 @@
|
||||
using Beam.Abstractions;
|
||||
using Beam.Models;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Beam.Downloaders {
|
||||
/// <summary>
/// A <see cref="SequentialDownloader{RawType, OutType}"/> whose output items are
/// fragments of ordered <typeparamref name="OutType"/> values. Adds no behavior of its own.
/// </summary>
/// <typeparam name="RawType">Raw document type of the download context.</typeparam>
/// <typeparam name="OutType">Element type held, in order, by each fragment.</typeparam>
public class SequentialFragmentDownloader<RawType, OutType>
    : SequentialDownloader<RawType, Fragment<Ordered<OutType>>> {

    /// <summary>Forwards all arguments unchanged to the base downloader.</summary>
    /// <param name="context">Download context.</param>
    /// <param name="getUnitDownloader">Factory that builds a fragment unit downloader from the context.</param>
    /// <param name="logger">Optional logger.</param>
    public SequentialFragmentDownloader(DownloadContext<RawType> context,
                                        Func<DownloadContext<RawType>, IUnitDownloader<Fragment<Ordered<OutType>>>> getUnitDownloader,
                                        ILogger? logger = null)
        : base(context, getUnitDownloader, logger) {
    }
}
|
||||
|
||||
// public class SequentialChunkDownloader<T> : IAsyncEnumerator<Fragment<Ordered<T>>> {
|
||||
// public Fragment<Ordered<T>> Current { get; protected set; }
|
||||
// public DownloadContext<T> Context { get; }
|
||||
// protected IEnumerator<DocumentSourceLink> LinksEnumerator;
|
||||
// protected ConcurrentQueue<Fragment<Ordered<T>>> DownloadQueue { get; set; } = [];
|
||||
// public int ChunkSize { get; }
|
||||
|
||||
// private ILogger? Logger => Context.DownloadLogger;
|
||||
|
||||
// public UnitDownloader<T> GetUnitDownloader()
|
||||
// => new(Context.Web, Context.AsyncTranformer, Context.AsyncFailurePredicates);
|
||||
|
||||
// public SequentialChunkDownloader(DownloadContext<T> context, int chunkSize) {
|
||||
// Context = context;
|
||||
// LinksEnumerator = Context.Links.GetEnumerator();
|
||||
// Current = new Fragment<Ordered<T>>(0);
|
||||
// ChunkSize = chunkSize;
|
||||
// }
|
||||
|
||||
// public ValueTask DisposeAsync() {
|
||||
// GC.SuppressFinalize(this);
|
||||
// return ValueTask.CompletedTask;
|
||||
// }
|
||||
|
||||
// protected Task<bool>? DownloadsTask = null;
|
||||
// protected virtual async Task<bool> ProcessDownloads() {
|
||||
// if (DownloadQueue.IsEmpty)
|
||||
// return true;
|
||||
// if (DownloadsTask is null) {
|
||||
// DownloadsTask = Task.Run(async () => {
|
||||
// if (!DownloadQueue.TryDequeue(out var fragment))
|
||||
// return true; // no fragments left, likely race condition but return true as technically all items have been downloaded
|
||||
// var unit = GetUnitDownloader(); // instantiates unit downloader per request (okay)
|
||||
// if (!Fragment<Ordered<T>>.TryAcquireUpdater(fragment, out var updater)) { // gets the add method for the current fragment
|
||||
// Logger?.LogError("Failed to acquire updater for fragment {{{}}}", fragment.GetHashCode());
|
||||
// return false; // fragment is unsafe to modify
|
||||
// }
|
||||
// try {
|
||||
// var links = Enumerable.Range(0, ChunkSize).Select((x) => {
|
||||
// if (!LinksEnumerator.MoveNext())
|
||||
// return new Ordered<DocumentSourceLink>(DocumentSourceLink.InvalidLink, -1); // stops link collection if end-of-links is reached
|
||||
// return new Ordered<DocumentSourceLink>(LinksEnumerator.Current, x);
|
||||
// }).Where((x) => x.Data != DocumentSourceLink.InvalidLink); // filter invalid links
|
||||
// await Parallel.ForEachAsync(links, async (x, ct) => {
|
||||
// Logger?.LogInformation("Started download for {} order={}", x.Data.Link, x.Order);
|
||||
// var (result, downloadedT) = await unit.TryDownload( // download (parallel) objects
|
||||
// x.Data.Link.ToString(), // use link from links collection (exposed as x)
|
||||
// ct, // use ct provided with method call
|
||||
// tryProgress: Context.RetryReporter);
|
||||
// if (!result) { // download failure (soft because it was detected)
|
||||
// Logger?.LogError("Failed to retrieve {} order={}", x.Data.Link, x.Order);
|
||||
// return;
|
||||
// }
|
||||
// if (downloadedT is null) { // download failure (hard because it was not detected)
|
||||
// Logger?.LogCritical("Failed to retrieve {} order={}", x.Data.Link, x.Order);
|
||||
// return;
|
||||
// }
|
||||
// Logger?.LogInformation("Retrieved {} order={} successfully", x.Data.Link, x.Order);
|
||||
// updater(new Ordered<T>(downloadedT, x.Order)); // update the fragment
|
||||
// });
|
||||
// Fragment<Ordered<T>>.SetComplete(fragment, true);
|
||||
// } finally {
|
||||
// Fragment<Ordered<T>>.TryReleaseUpdater(fragment, updater); // returns updater to allow modification
|
||||
// }
|
||||
|
||||
|
||||
// return fragment.Size == fragment.MaxSize;
|
||||
// });
|
||||
// }
|
||||
// if (DownloadsTask.IsCompleted) {
|
||||
// DownloadsTask = null;
|
||||
// return await ProcessDownloads();
|
||||
// }
|
||||
// return true; // if task is still processing return should be neither true or false...
|
||||
// }
|
||||
|
||||
// public async ValueTask<bool> MoveNextAsync() {
|
||||
// if (Current.IsComplete && Current.Size < Current.MaxSize)
|
||||
// return false; // if a fragment is marked complete despite being unsaturated, we've run out links!
|
||||
// if (DownloadQueue.Count == 0) {
|
||||
// Current = new Fragment<Ordered<T>>(ChunkSize);
|
||||
// DownloadQueue.Enqueue(Current);
|
||||
// }
|
||||
|
||||
// return await ProcessDownloads();
|
||||
// }
|
||||
// }
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
using Beam.Abstractions;
|
||||
using Beam.Models;
|
||||
using HtmlAgilityPack;
|
||||
|
||||
namespace Beam.Downloaders {
|
||||
/// <summary>
/// A download managing class that manages a singular download with failure-detection and exponential-backoff retries. This class is safe to instantiate per request.
/// </summary>
/// <typeparam name="T">Type produced by the transformer from the loaded HTML document.</typeparam>
/// <param name="web">HTML loader used to fetch the document.</param>
/// <param name="transformer">Converts the loaded <see cref="HtmlDocument"/> into a <typeparamref name="T"/>.</param>
/// <param name="failurePredicate">Optional predicates; if any returns <c>true</c> for a loaded document the attempt counts as failed.</param>
public class UnitDownloader<T>(HtmlWeb web, AsyncTransformer<HtmlDocument, T> transformer, AsyncDownloadFailurePredicate<HtmlDocument>?[]? failurePredicate = null) : IUnitDownloader<T> {
    public HtmlWeb Web { get; } = web;
    public virtual AsyncTransformer<HtmlDocument, T> Transformer { get; } = transformer;
    public virtual AsyncDownloadFailurePredicate<HtmlDocument>?[]? FailurePredicates { get; } = failurePredicate;

    /// <summary>This downloader handles one link per call.</summary>
    public int LinksPerDownload { get; } = 1;

    /// <summary>Runs the configured failure predicates in parallel on the loaded document.</summary>
    /// <returns><c>true</c> if any predicate flagged the document; <c>false</c> if none did or none are configured.</returns>
    protected virtual async Task<bool> IsFailure(HtmlDocument doc) {
        if (FailurePredicates is null)
            return false;

        var failed = false;
        await Parallel.ForEachAsync(FailurePredicates, async (predicate, _) => {
            // Skip remaining work once a failure is known; null entries are ignored.
            if (failed || predicate is null)
                return;
            if (await predicate(doc))
                failed = true;
        });

        return failed;
    }

    /// <summary>One download attempt without retries. Any exception is reported as a failed attempt.</summary>
    /// <returns>Success flag and the transformed document (or <c>default</c> on failure).</returns>
    protected virtual async Task<(bool, T?)> TryDownloadWithNoRetries(string link, CancellationToken ct) {
        try {
            var html = await Web.LoadFromWebAsync(link, ct);
            // IsFailure already returns false when no predicates are configured.
            if (await IsFailure(html))
                return (false, default);
            return (true, await Transformer(html));
        } catch (Exception) {
            return (false, default);
        }
    }

    /// <summary>
    /// Downloads the first link of <paramref name="link"/>, retrying with exponential back-off
    /// (2^attempt seconds) until it succeeds or <paramref name="maximumRetryCount"/> attempts were made.
    /// </summary>
    /// <param name="link">Links to download; only the first element is used.</param>
    /// <param name="ct">Cancels the attempts and the back-off waits.</param>
    /// <param name="maximumRetryCount">Maximum number of attempts.</param>
    /// <param name="tryProgress">Optional sink notified after every failed attempt.</param>
    /// <returns>Success flag and the downloaded value (or the last attempt's value on failure).</returns>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
    public async Task<(bool, T?)> TryDownload(IOrdered<string>[] link, CancellationToken ct, int maximumRetryCount = 7, IProgress<IRetryReport>? tryProgress = null) {
        if (link.Length == 0)
            return (false, default);

        T? doc = default;
        int tryCount = 0;
        while (tryCount < maximumRetryCount) {
            ct.ThrowIfCancellationRequested();
            (var success, doc) = await TryDownloadWithNoRetries(link[0].Data, ct);
            if (success && doc != null)
                return (true, doc);

            ++tryCount;
            tryProgress?.Report(new RetryReport(tryCount, link[0].Data));

            // No point in backing off after the final attempt; the wait honours cancellation.
            if (tryCount < maximumRetryCount)
                await Task.Delay((int)Math.Pow(2, tryCount) * 1000, ct);
        }

        return (false, doc);
    }
}
|
||||
}
|
||||
@@ -0,0 +1,74 @@
|
||||
using Beam.Abstractions;
|
||||
using Beam.Models;
|
||||
|
||||
namespace Beam.Downloaders {
|
||||
/// <summary>
/// A download-managing class that retrieves binary data through <see cref="HttpClient"/>,
/// applies an <c>AsyncTransformer</c> from <see cref="ByteDocument"/> to <typeparamref name="T"/>,
/// and supports failure detection plus exponential-back-off retries. Safe to instantiate per request.
/// </summary>
/// <typeparam name="T">Type produced by the transformer from the downloaded bytes.</typeparam>
/// <param name="client">HTTP client used for the request.</param>
/// <param name="transformer">Converts the downloaded <see cref="ByteDocument"/> into a <typeparamref name="T"/>.</param>
/// <param name="failurePredicates">Optional predicates; if any returns <c>true</c> the attempt counts as failed.</param>
public class UnitDownloaderBinary<T>(
    HttpClient client,
    AsyncTransformer<ByteDocument, T> transformer,
    AsyncDownloadFailurePredicate<ByteDocument>?[]? failurePredicates = null)
    : IUnitDownloader<T> {
    public HttpClient Client { get; } = client;
    public virtual AsyncTransformer<ByteDocument, T> Transformer { get; } = transformer;
    public virtual AsyncDownloadFailurePredicate<ByteDocument>?[]? FailurePredicates { get; } = failurePredicates;

    /// <summary>This downloader handles one link per call.</summary>
    public int LinksPerDownload { get; } = 1;

    /// <summary>Runs all configured failure predicates in parallel on the downloaded document.</summary>
    /// <returns><c>true</c> if any predicate flagged the document; <c>false</c> if none did or none are configured.</returns>
    protected virtual async Task<bool> IsFailure(ByteDocument response) {
        if (FailurePredicates is null) return false;

        var failed = false;
        await Parallel.ForEachAsync(FailurePredicates, async (pred, ct) => {
            // Skip remaining work once a failure is known; null entries are ignored.
            if (failed || pred is null) return;
            if (await pred(response))
                failed = true;
        });
        return failed;
    }

    /// <summary>One attempt without retries or back-off. Any exception is reported as a failed attempt.</summary>
    /// <returns>Success flag and the transformed result (or <c>default</c> on failure).</returns>
    protected virtual async Task<(bool Success, T? Result)> TryDownloadWithNoRetries(string link, CancellationToken ct) {
        try {
            using var response = await Client.GetAsync(link, HttpCompletionOption.ResponseHeadersRead, ct);
            if (!response.IsSuccessStatusCode) return (false, default);

            var bytes = await response.Content.ReadAsByteArrayAsync(ct);
            var doc = new ByteDocument(link, bytes);
            if (await IsFailure(doc)) return (false, default);

            return (true, await Transformer(doc));
        } catch {
            return (false, default);
        }
    }

    /// <summary>
    /// Downloads the first link of <paramref name="link"/>, retrying with exponential back-off
    /// (2^attempt seconds) until it succeeds or <paramref name="maximumRetryCount"/> attempts were made.
    /// </summary>
    /// <param name="link">Links to download; only the first element is used.</param>
    /// <param name="ct">Cancels the attempts and the back-off waits.</param>
    /// <param name="maximumRetryCount">Maximum number of attempts.</param>
    /// <param name="tryProgress">Optional sink notified after every failed attempt.</param>
    /// <returns>Success flag and the downloaded value (or the last attempt's value on failure).</returns>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
    public async Task<(bool, T?)> TryDownload(
        IOrdered<string>[] link,
        CancellationToken ct,
        int maximumRetryCount = 7,
        IProgress<IRetryReport>? tryProgress = null) {
        if (link.Length == 0) return (false, default);

        T? result = default;
        var attempt = 0;

        while (attempt < maximumRetryCount) {
            ct.ThrowIfCancellationRequested();

            (var success, result) = await TryDownloadWithNoRetries(link[0].Data, ct);
            if (success && result is not null) return (true, result);

            ++attempt;
            tryProgress?.Report(new RetryReport(attempt, link[0].Data));

            // No point in backing off after the final attempt.
            if (attempt < maximumRetryCount)
                await Task.Delay((int)Math.Pow(2, attempt) * 1000, ct);
        }

        return (false, result);
    }
}
|
||||
}
|
||||
@@ -0,0 +1,62 @@
|
||||
using Beam.Abstractions;
|
||||
using Beam.Exceptions;
|
||||
using Beam.Models;
|
||||
using HtmlAgilityPack;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Beam.Downloaders {
|
||||
/// <summary>
/// Downloads a batch of links in parallel through an inner single-link downloader and
/// collects the results, tagged with each link's order, into one <c>Fragment</c>.
/// </summary>
/// <typeparam name="T">Type produced for each individual link.</typeparam>
public class UnitFragmentDownloader<T> : IUnitDownloader<Fragment<Ordered<T>>> {
    /// <summary>Creates the fragment downloader.</summary>
    /// <param name="web">HTML loader used by the default inner downloader.</param>
    /// <param name="transformer">Transformer used by the default inner downloader.</param>
    /// <param name="failurePredicate">Failure predicates used by the default inner downloader.</param>
    /// <param name="fragmentSize">Number of links requested per download (exposed as <see cref="LinksPerDownload"/>).</param>
    /// <param name="logger">Optional logger.</param>
    /// <param name="internalDownloader">Inner downloader; a <see cref="UnitDownloader{T}"/> is created when <c>null</c>.</param>
    public UnitFragmentDownloader(HtmlWeb web,
                                  AsyncTransformer<HtmlDocument, T> transformer,
                                  AsyncDownloadFailurePredicate<HtmlDocument>?[]? failurePredicate = null,
                                  int fragmentSize = 4,
                                  ILogger? logger = null,
                                  IUnitDownloader<T>? internalDownloader = null) {
        Web = web;
        Transformer = transformer;
        FailurePredicate = failurePredicate;
        UnitDownloader = internalDownloader ?? new UnitDownloader<T>(Web, Transformer, FailurePredicate);
        LinksPerDownload = fragmentSize;
        Logger = logger;
    }

    public HtmlWeb Web { get; }
    public AsyncTransformer<HtmlDocument, T> Transformer { get; }
    public AsyncDownloadFailurePredicate<HtmlDocument>?[]? FailurePredicate { get; }
    public int LinksPerDownload { get; set; }
    public ILogger? Logger { get; set; }

    private readonly IUnitDownloader<T> UnitDownloader;

    /// <summary>
    /// Downloads every link in <paramref name="link"/> in parallel and adds each result to a new fragment.
    /// The fragment is marked complete only when every link succeeded.
    /// </summary>
    /// <returns><c>true</c> plus the fragment when all links succeeded; <c>false</c> plus the partially filled fragment otherwise.</returns>
    /// <exception cref="AssertionException">The freshly created fragment's updater could not be acquired.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
    async Task<(bool, Fragment<Ordered<T>>?)> IUnitDownloader<Fragment<Ordered<T>>>.TryDownload(IOrdered<string>[] link, CancellationToken ct, int maximumRetryCount, IProgress<IRetryReport>? tryProgress) {
        Fragment<Ordered<T>> fragment = new Fragment<Ordered<T>>(link.Length);
        if (!Fragment<Ordered<T>>.TryAcquireUpdater(fragment, out var updater))
            throw new AssertionException(Exceptions.Exceptions.fragment_locked);

        bool isFailure = false;
        try {
            // Passing ct lets the loop itself stop scheduling work on cancellation.
            await Parallel.ForEachAsync(link, ct, async (x, pct) => {
                pct.ThrowIfCancellationRequested();
                ct.ThrowIfCancellationRequested();

                var (result, downloadedT) = await UnitDownloader.TryDownload([x], ct, maximumRetryCount, tryProgress);
                if (!result) {
                    Interlocked.Exchange(ref isFailure, true);
                    Logger?.LogError("Failed to retrieve {0} order={1}", x.Data, x.Order);
                    return;
                }
                if (downloadedT == null) {
                    Interlocked.Exchange(ref isFailure, true);
                    Logger?.LogCritical("Failed to retrieve {0} order={1}", x.Data, x.Order);
                    return;
                }
                updater(new Ordered<T>(downloadedT, x.Order));
            });

            if (!isFailure)
                Fragment<Ordered<T>>.SetComplete(fragment, true);
        } finally {
            // Always hand the updater back, even when the loop throws (e.g. on cancellation).
            Fragment<Ordered<T>>.TryReleaseUpdater(fragment, updater);
        }

        return (!isFailure, fragment);
    }
}
|
||||
}
|
||||
@@ -0,0 +1,74 @@
|
||||
using Beam.Abstractions;
|
||||
using Beam.Exceptions;
|
||||
using Beam.Models;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Beam.Downloaders {
|
||||
/// <summary>
/// Groups multiple binary downloads into a single Fragment. Each link is downloaded in
/// parallel through an inner single-link downloader, which by default is a
/// <see cref="UnitDownloaderBinary{T}"/>.
/// </summary>
/// <typeparam name="T">Type produced for each individual link.</typeparam>
public class UnitFragmentDownloaderBinary<T>
    : IUnitDownloader<Fragment<Ordered<T>>> {
    /// <summary>Creates the fragment downloader.</summary>
    /// <param name="client">HTTP client used by the default inner downloader.</param>
    /// <param name="transformer">Transformer used by the default inner downloader.</param>
    /// <param name="failurePredicate">Failure predicates used by the default inner downloader.</param>
    /// <param name="fragmentSize">Number of links requested per download (exposed as <see cref="LinksPerDownload"/>).</param>
    /// <param name="logger">Optional logger.</param>
    /// <param name="internalDownloader">Inner downloader; a <see cref="UnitDownloaderBinary{T}"/> is created when <c>null</c>.</param>
    public UnitFragmentDownloaderBinary(HttpClient client,
                                        AsyncTransformer<ByteDocument, T> transformer,
                                        AsyncDownloadFailurePredicate<ByteDocument>?[]? failurePredicate = null,
                                        int fragmentSize = 4,
                                        ILogger? logger = null,
                                        IUnitDownloader<T>? internalDownloader = null) {
        Client = client;
        Transformer = transformer;
        FailurePredicate = failurePredicate;
        UnitDownloader = internalDownloader
                         ?? new UnitDownloaderBinary<T>(Client, Transformer, FailurePredicate);
        LinksPerDownload = fragmentSize;
        Logger = logger;
    }

    public HttpClient Client { get; }
    public AsyncTransformer<ByteDocument, T> Transformer { get; }
    public AsyncDownloadFailurePredicate<ByteDocument>?[]? FailurePredicate { get; }
    public int LinksPerDownload { get; set; }
    public ILogger? Logger { get; set; }

    private readonly IUnitDownloader<T> UnitDownloader;

    /// <summary>
    /// Downloads every link in <paramref name="link"/> in parallel and adds each result to a new fragment.
    /// The fragment is marked complete only when every link succeeded.
    /// </summary>
    /// <returns><c>true</c> plus the fragment when all links succeeded; <c>false</c> plus the partially filled fragment otherwise.</returns>
    /// <exception cref="AssertionException">The freshly created fragment's updater could not be acquired.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
    async Task<(bool, Fragment<Ordered<T>>?)> IUnitDownloader<Fragment<Ordered<T>>>.TryDownload(
        IOrdered<string>[] link,
        CancellationToken ct,
        int maximumRetryCount,
        IProgress<IRetryReport>? tryProgress) {
        var fragment = new Fragment<Ordered<T>>(link.Length);
        if (!Fragment<Ordered<T>>.TryAcquireUpdater(fragment, out var updater))
            throw new AssertionException(Exceptions.Exceptions.fragment_locked);

        var isFailure = false;

        try {
            // Passing ct lets the loop itself stop scheduling work on cancellation.
            await Parallel.ForEachAsync(link, ct, async (orderedLink, pct) => {
                pct.ThrowIfCancellationRequested();
                ct.ThrowIfCancellationRequested();

                var (success, downloaded) =
                    await UnitDownloader.TryDownload([orderedLink],
                                                     ct,
                                                     maximumRetryCount,
                                                     tryProgress);

                if (!success || downloaded is null) {
                    Interlocked.Exchange(ref isFailure, true);
                    Logger?.LogError("Failed to retrieve {Link} order={Order}",
                                     orderedLink.Data, orderedLink.Order);
                    return;
                }

                updater(new Ordered<T>(downloaded, orderedLink.Order));
            });

            if (!isFailure)
                Fragment<Ordered<T>>.SetComplete(fragment, true);
        } finally {
            // Always hand the updater back, even when the loop throws (e.g. on cancellation).
            Fragment<Ordered<T>>.TryReleaseUpdater(fragment, updater);
        }

        return (!isFailure, fragment);
    }
}
|
||||
}
|
||||
Reference in New Issue
Block a user