f52aa6123b
Replaces generic RawType with ByteDocument in downloaders and context classes, simplifying type usage. Adds builder classes for FailurePredicateOptions, FragmentOptions, SkipPredicateOptions, and UnitDownloaderOptions to improve configuration flexibility. Introduces DownloadTarget enum and SkipPredicate delegate for more granular download control. Refactors Fluent API interfaces and implementations to remove RawType generics and streamline usage. Adds Playwright and Stealth download strategies for extensibility.
102 lines
5.8 KiB
C#
102 lines
5.8 KiB
C#
using Beam.Abstractions;
|
|
using Beam.Models;
|
|
using Microsoft.Extensions.Logging;
|
|
|
|
namespace Beam.Downloaders {
|
|
/// <summary>
/// A <c>SequentialDownloader</c> specialised to items of type
/// <c>Fragment&lt;Ordered&lt;OutType&gt;&gt;</c>.
/// </summary>
/// <remarks>
/// This type declares no members beyond its constructor; all other behavior
/// is inherited from the base class.
/// </remarks>
/// <typeparam name="OutType">
/// Payload type that appears as <c>Ordered&lt;OutType&gt;</c> inside each fragment.
/// </typeparam>
public class SequentialFragmentDownloader<OutType> : SequentialDownloader<Fragment<Ordered<OutType>>> {
    /// <summary>
    /// Creates the downloader by passing every argument, unchanged, to the base constructor.
    /// </summary>
    /// <param name="context">Download context handed to the base class.</param>
    /// <param name="getUnitDownloader">
    /// Delegate that takes a <c>DownloadContext</c> and returns an
    /// <c>IUnitDownloader</c> for <c>Fragment&lt;Ordered&lt;OutType&gt;&gt;</c>; handed to the base class.
    /// </param>
    /// <param name="logger">Optional logger; defaults to <c>null</c> and is handed to the base class as-is.</param>
    public SequentialFragmentDownloader(
        DownloadContext context,
        Func<DownloadContext, IUnitDownloader<Fragment<Ordered<OutType>>>> getUnitDownloader,
        ILogger? logger = null
    ) : base(context, getUnitDownloader, logger) {
        // Intentionally empty: construction is fully delegated to the base class.
    }
}
|
|
|
|
// public class SequentialChunkDownloader<T> : IAsyncEnumerator<Fragment<Ordered<T>>> {
|
|
// public Fragment<Ordered<T>> Current { get; protected set; }
|
|
// public DownloadContext<T> Context { get; }
|
|
// protected IEnumerator<DocumentSourceLink> LinksEnumerator;
|
|
// protected ConcurrentQueue<Fragment<Ordered<T>>> DownloadQueue { get; set; } = [];
|
|
// public int ChunkSize { get; }
|
|
|
|
// private ILogger? Logger => Context.DownloadLogger;
|
|
|
|
// public UnitDownloader<T> GetUnitDownloader()
|
|
// => new(Context.Web, Context.AsyncTranformer, Context.AsyncFailurePredicates);
|
|
|
|
// public SequentialChunkDownloader(DownloadContext<T> context, int chunkSize) {
|
|
// Context = context;
|
|
// LinksEnumerator = Context.Links.GetEnumerator();
|
|
// Current = new Fragment<Ordered<T>>(0);
|
|
// ChunkSize = chunkSize;
|
|
// }
|
|
|
|
// public ValueTask DisposeAsync() {
|
|
// GC.SuppressFinalize(this);
|
|
// return ValueTask.CompletedTask;
|
|
// }
|
|
|
|
// protected Task<bool>? DownloadsTask = null;
|
|
// protected virtual async Task<bool> ProcessDownloads() {
|
|
// if (DownloadQueue.IsEmpty)
|
|
// return true;
|
|
// if (DownloadsTask is null) {
|
|
// DownloadsTask = Task.Run(async () => {
|
|
// if (!DownloadQueue.TryDequeue(out var fragment))
|
|
// return true; // no fragments left, likely race condition but return true as technically all items have been downloaded
|
|
// var unit = GetUnitDownloader(); // instantiates unit downloader per request (okay)
|
|
// if (!Fragment<Ordered<T>>.TryAcquireUpdater(fragment, out var updater)) { // gets the add method for the current fragment
|
|
// Logger?.LogError("Failed to acquire updater for fragment {{{}}}", fragment.GetHashCode());
|
|
// return false; // fragment is unsafe to modify
|
|
// }
|
|
// try {
|
|
// var links = Enumerable.Range(0, ChunkSize).Select((x) => {
|
|
// if (!LinksEnumerator.MoveNext())
|
|
// return new Ordered<DocumentSourceLink>(DocumentSourceLink.InvalidLink, -1); // stops link collection if end-of-links is reached
|
|
// return new Ordered<DocumentSourceLink>(LinksEnumerator.Current, x);
|
|
// }).Where((x) => x.Data != DocumentSourceLink.InvalidLink); // filter invalid links
|
|
// await Parallel.ForEachAsync(links, async (x, ct) => {
|
|
// Logger?.LogInformation("Started download for {} order={}", x.Data.Link, x.Order);
|
|
// var (result, downloadedT) = await unit.TryDownload( // download (parallel) objects
|
|
// x.Data.Link.ToString(), // use link from links collection (exposed as x)
|
|
// ct, // use ct provided with method call
|
|
// tryProgress: Context.RetryReporter);
|
|
// if (!result) { // download failure (soft because it was detected)
|
|
// Logger?.LogError("Failed to retrieve {} order={}", x.Data.Link, x.Order);
|
|
// return;
|
|
// }
|
|
// if (downloadedT is null) { // download failure (hard because it was not detected)
|
|
// Logger?.LogCritical("Failed to retrieve {} order={}", x.Data.Link, x.Order);
|
|
// return;
|
|
// }
|
|
// Logger?.LogInformation("Retrieved {} order={} successfully", x.Data.Link, x.Order);
|
|
// updater(new Ordered<T>(downloadedT, x.Order)); // update the fragment
|
|
// });
|
|
// Fragment<Ordered<T>>.SetComplete(fragment, true);
|
|
// } finally {
|
|
// Fragment<Ordered<T>>.TryReleaseUpdater(fragment, updater); // returns updater to allow modification
|
|
// }
|
|
|
|
|
|
// return fragment.Size == fragment.MaxSize;
|
|
// });
|
|
// }
|
|
// if (DownloadsTask.IsCompleted) {
|
|
// DownloadsTask = null;
|
|
// return await ProcessDownloads();
|
|
// }
|
|
// return true; // if the task is still processing, the return value should be neither true nor false...
|
|
// }
|
|
|
|
// public async ValueTask<bool> MoveNextAsync() {
|
|
// if (Current.IsComplete && Current.Size < Current.MaxSize)
|
|
// return false; // if a fragment is marked complete despite being unsaturated, we've run out of links!
|
|
// if (DownloadQueue.Count == 0) {
|
|
// Current = new Fragment<Ordered<T>>(ChunkSize);
|
|
// DownloadQueue.Enqueue(Current);
|
|
// }
|
|
|
|
// return await ProcessDownloads();
|
|
// }
|
|
// }
|
|
}
|