18c5ad83da
- Removed obsolete data providers: `AnchorCollectionDataProvider`, `ContentsDataProvider`, and others, consolidating logic into new composable providers. - Added `ComposeDataProviders`, `SelectDataProvider`, and `RelationalDataProvider` for improved flexibility and reusability. - Introduced `IManySelectionComposableDataProvider` interface to support multiple-node selection. - Enhanced `UnitDownloader` with more robust progress tracking. - Updated package references and project dependencies for consistency. - Improved error handling in `StealthConfig` initialization for better fallback on browser drivers. - Incremented project version to 2.4.5.
152 lines
6.6 KiB
C#
152 lines
6.6 KiB
C#
using Beam.Abstractions;
|
|
using Beam.Models;
|
|
using HtmlAgilityPack;
|
|
using File = System.IO.File;
|
|
|
|
namespace Beam.Downloaders {
|
|
/// <summary>
/// Manages a single logical download: fetches the first link of a request, checks the raw document against the
/// configured failure predicates, converts it with <see cref="Transformer"/>, and retries with exponential backoff
/// when the download fails transiently or the document is rejected.
/// The class keeps no mutable per-download state, so an instance can be created per request.
/// </summary>
/// <typeparam name="RawType">
/// Raw document type produced by a download. <see cref="_Download"/> supports only <see cref="StringDocument"/>
/// (body saved to a file in <c>Options.DownloadFolder</c>) and <see cref="ByteDocument"/> (body kept in memory).
/// </typeparam>
/// <typeparam name="OutType">Type produced by <see cref="Transformer"/> from a raw document.</typeparam>
/// <param name="options">HTTP client, transformer, failure predicates and download settings.</param>
public class UnitDownloader<RawType, OutType>(UnitDownloaderOptions<RawType, OutType> options) : IUnitDownloader<OutType> where RawType : IDocument {

    // Largest exponent whose backoff (2^n seconds, in milliseconds) still fits in an int for Task.Delay.
    private const int MaxBackoffExponent = 21;

    /// <summary>Configuration this downloader was created with.</summary>
    public UnitDownloaderOptions<RawType, OutType> Options { get; } = options;

    /// <summary>HTTP client used for every request; taken from <see cref="Options"/>.</summary>
    public HttpClient Client => Options.Client;

    /// <summary>Converts a downloaded raw document into <typeparamref name="OutType"/>.</summary>
    public virtual AsyncTransformer<RawType, OutType> Transformer => Options.AsyncTransformer;

    /// <summary>
    /// Predicates that flag a downloaded document as failed, or <see langword="null"/> when none are configured.
    /// Null entries are skipped.
    /// </summary>
    public virtual AsyncDownloadFailurePredicate<RawType>?[]? FailurePredicates =>
        Options?.FailurePredicateOptions?.AsyncDownloadFailurePredicates;

    /// <summary>Number of links consumed per download; only the first link passed to <see cref="TryDownload"/> is used.</summary>
    public int LinksPerDownload { get; } = 1;

    /// <summary>
    /// Streams the body of <paramref name="url"/> into <paramref name="destinationStream"/>, reporting progress
    /// after every chunk written.
    /// </summary>
    /// <param name="url">Address requested through <see cref="Client"/>.</param>
    /// <param name="bufferSize">Size in bytes of the copy buffer; must be positive.</param>
    /// <param name="destinationStream">Stream receiving the body; it is not disposed here.</param>
    /// <param name="progress">
    /// Receives one report per chunk: <c>BytesDownloaded</c> is the size of that chunk, and <c>BytesRemaining</c>
    /// is derived from the response Content-Length (or the stream length when seekable), or
    /// <see langword="null"/> when the total size is unknown.
    /// </param>
    /// <param name="ct">Cancels the request and the copy loop.</param>
    /// <exception cref="HttpRequestException">The request failed or returned a non-success status code.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is zero or negative.</exception>
    protected virtual async Task DownloadToStream(string url, int bufferSize, Stream destinationStream, IProgress<IDownloadReport> progress,
        CancellationToken ct) {
        ArgumentOutOfRangeException.ThrowIfNegativeOrZero(bufferSize);

        // ResponseHeadersRead streams the body (as GetStreamAsync does) while keeping the headers available,
        // so the total size can come from Content-Length instead of the non-seekable network stream.
        using var response = await Client.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, ct);
        response.EnsureSuccessStatusCode();
        await using var source = await response.Content.ReadAsStreamAsync(ct);

        long? total = response.Content.Headers.ContentLength
            ?? (source.CanSeek ? (long?)source.Length : null);

        var buffer = new byte[bufferSize];
        long downloaded = 0;
        int read;
        while ((read = await source.ReadAsync(buffer.AsMemory(), ct)) > 0) {
            downloaded += read;
            await destinationStream.WriteAsync(buffer.AsMemory(0, read), ct);

            long? remaining = total is null ? null : Math.Max(0L, total.Value - downloaded);
            progress?.Report(new DownloadReport() {
                BytesDownloaded = read,
                BytesRemaining = remaining
            });

            ct.ThrowIfCancellationRequested();
        }
    }

    /// <summary>
    /// Downloads <paramref name="url"/> into the file at <paramref name="path"/>, creating it or truncating an existing one.
    /// A partially written file is left in place if the download fails.
    /// </summary>
    /// <exception cref="InvalidOperationException">The directory that should contain <paramref name="path"/> does not exist.</exception>
    protected virtual async Task DownloadToFile(string url, int bufferSize, string path,
        IProgress<IDownloadReport> progress, CancellationToken ct) {
        // Resolve relative paths first so a bare file name checks the current directory rather than "".
        var directory = Path.GetDirectoryName(Path.GetFullPath(path));
        if (!Directory.Exists(directory)) {
            throw new InvalidOperationException(
                string.Format(Exceptions.Exceptions.unit_download_directory_nonexistant, path));
        }

        // File.Create truncates an existing file; File.OpenWrite would leave stale trailing bytes behind.
        await using var file = File.Create(path);
        await DownloadToStream(url, bufferSize, file, progress, ct);
    }

    /// <summary>Downloads <paramref name="url"/> into memory and wraps the bytes in a <see cref="ByteDocument"/>.</summary>
    /// <exception cref="InvalidOperationException">The memory stream's buffer could not be exposed.</exception>
    protected virtual async Task<ByteDocument> DownloadToMemory(string url, int bufferSize,
        IProgress<IDownloadReport> progress, CancellationToken ct) {
        await using var ms = new MemoryStream();
        await DownloadToStream(url, bufferSize, ms, progress, ct);

        // A MemoryStream from the parameterless constructor always exposes its buffer; this is only a safeguard.
        if (!ms.TryGetBuffer(out var bytes)) {
            throw new InvalidOperationException(Exceptions.Exceptions.unit_download_invalid_memory_stream);
        }

        // The segment covers exactly the written bytes and references the managed array, which outlives the stream.
        return new ByteDocument(url, bytes);
    }

    /// <summary>
    /// Evaluates <see cref="FailurePredicates"/> against a downloaded document. Predicates run one at a time,
    /// stopping at the first that reports failure, unless <c>FailurePredicateOptions.ProcessInParallel</c> is set;
    /// then up to <c>ParallelThreads</c> (default 4) run concurrently.
    /// </summary>
    /// <returns><see langword="true"/> if any non-null predicate reports failure.</returns>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
    protected virtual async Task<bool> IsFailure(RawType doc, CancellationToken ct) {
        var predicates = FailurePredicates;
        if (predicates is null) {
            return false;
        }

        if (!(Options?.FailurePredicateOptions?.ProcessInParallel ?? false)) {
            foreach (var predicate in predicates) {
                ct.ThrowIfCancellationRequested();
                if (predicate is not null && await predicate(doc)) {
                    return true;
                }
            }
            return false;
        }

        var failed = false;
        await Parallel.ForEachAsync(predicates, new ParallelOptions() {
            MaxDegreeOfParallelism = Options?.FailurePredicateOptions?.ParallelThreads ?? 4,
            CancellationToken = ct
        },
        async (predicate, token) => {
            // Once any predicate has reported failure the result is settled, so skip the remaining ones.
            if (token.IsCancellationRequested || Volatile.Read(ref failed) || predicate is null) {
                return;
            }
            if (await predicate(doc)) {
                // The flag only ever goes false -> true, so a volatile write is sufficient.
                Volatile.Write(ref failed, true);
            }
        });
        return failed;
    }

    /// <summary>
    /// Downloads <paramref name="link"/> into a <typeparamref name="RawType"/> document. For
    /// <see cref="StringDocument"/> the body is saved to a randomly named file in <c>Options.DownloadFolder</c>
    /// and that path is wrapped in the document; for <see cref="ByteDocument"/> the body is buffered in memory.
    /// </summary>
    /// <exception cref="NotSupportedException">
    /// <typeparamref name="RawType"/> is neither supported type, or it is <see cref="StringDocument"/> and
    /// <c>Options.DownloadFolder</c> is <see langword="null"/>.
    /// </exception>
    protected virtual async Task<RawType> _Download(string link, IProgress<IDownloadReport> progress, CancellationToken ct) {
        if (Options.DownloadFolder is not null && this is UnitDownloader<StringDocument, OutType>) {
            var path = Path.Combine(Options.DownloadFolder, Path.GetRandomFileName());
            await DownloadToFile(link, Options.BufferSize, path, progress, ct);
            return (RawType)(object)new StringDocument(link, path);
        }

        if (this is UnitDownloader<ByteDocument, OutType>) {
            return (RawType)(object)(await DownloadToMemory(link, Options.BufferSize, progress, ct));
        }

        throw new NotSupportedException(Exceptions.Exceptions.unit_downloader_limited_support);
    }

    /// <summary>
    /// Checks a downloaded document against the failure predicates and, if it passes, transforms it.
    /// </summary>
    /// <returns>
    /// <c>(true, result)</c> when the document passes and the transformer completes; <c>(false, default)</c> when
    /// a predicate rejects it or a predicate/the transformer throws.
    /// </returns>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
    protected virtual async Task<(bool, OutType?)> Transform(RawType download, CancellationToken ct) {
        try {
            if (FailurePredicates is not null && await IsFailure(download, ct)) {
                return (false, default);
            }
            return (true, await Transformer(download));
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested) {
            // Caller-requested cancellation must reach the caller, not be counted as a failed attempt.
            throw;
        }
        catch (Exception) {
            // Best effort: a throwing predicate or transformer is treated like a rejected document and retried.
            return (false, default);
        }
    }

    /// <summary>
    /// Downloads and transforms the first entry of <paramref name="link"/>, retrying with exponential backoff
    /// (2^attempt seconds between attempts) when the attempt fails transiently, the document is rejected by a
    /// failure predicate, or the transformer yields nothing.
    /// </summary>
    /// <param name="link">Links for this download; only <c>link[0]</c> is used. An empty array yields <c>(false, default)</c>.</param>
    /// <param name="ct">Cancels the download, the predicates and the backoff wait.</param>
    /// <param name="maximumRetryCount">Maximum number of download attempts.</param>
    /// <param name="downProgress">Receives per-chunk download reports.</param>
    /// <param name="tryProgress">Receives a report after each failed attempt, numbered from 1.</param>
    /// <returns><c>(true, result)</c> on success; otherwise <c>false</c> with the last (failed) result.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="link"/> is <see langword="null"/>.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
    public async Task<(bool, OutType?)> TryDownload(IOrdered<string>[] link, CancellationToken ct, int maximumRetryCount = 7, IProgress<IDownloadReport>? downProgress = null, IProgress<IRetryReport>? tryProgress = null) {
        ArgumentNullException.ThrowIfNull(link);
        if (link.Length == 0) {
            return (false, default);
        }

        downProgress ??= new Progress<IDownloadReport>();
        var url = link[0].Data;

        OutType? ot = default;
        for (var attempt = 1; attempt <= maximumRetryCount; attempt++) {
            ct.ThrowIfCancellationRequested();

            (var success, ot) = await TryDownloadOnce(url, downProgress, ct);
            if (success && ot != null) {
                return (true, ot);
            }

            tryProgress?.Report(new RetryReport(attempt, url));

            // Waiting after the final attempt would only delay the failure result.
            if (attempt < maximumRetryCount) {
                await Task.Delay(GetBackoffDelayMilliseconds(attempt), ct);
            }
        }

        return (false, ot);
    }

    /// <summary>
    /// Performs one download-and-transform attempt. Transient transport failures are reported as an unsuccessful
    /// attempt so <see cref="TryDownload"/> can retry; any other exception propagates.
    /// </summary>
    private async Task<(bool, OutType?)> TryDownloadOnce(string url, IProgress<IDownloadReport> progress, CancellationToken ct) {
        RawType raw;
        try {
            raw = await _Download(url, progress, ct);
        }
        catch (Exception ex) when (IsTransientDownloadError(ex, ct)) {
            return (false, default);
        }
        return await Transform(raw, ct);
    }

    /// <summary>
    /// Classifies a download exception as worth retrying. Retryable cases are:
    /// <list type="bullet">
    /// <item>connection-level HTTP failures (no status code) and 408/429/5xx responses;</item>
    /// <item>stream I/O errors;</item>
    /// <item><see cref="HttpClient.Timeout"/> expiry, which surfaces as a cancellation while <paramref name="ct"/> itself is not cancelled.</item>
    /// </list>
    /// </summary>
    private static bool IsTransientDownloadError(Exception ex, CancellationToken ct) => ex switch {
        HttpRequestException { StatusCode: { } status } => (int)status is 408 or 429 or >= 500,
        HttpRequestException => true,
        IOException => true,
        OperationCanceledException => !ct.IsCancellationRequested,
        _ => false,
    };

    /// <summary>Backoff before the next attempt: 2^<paramref name="attempt"/> seconds, capped to avoid int overflow.</summary>
    private static int GetBackoffDelayMilliseconds(int attempt) =>
        (1 << Math.Min(attempt, MaxBackoffExponent)) * 1000;
}
|
|
}
|