using Beam.Abstractions;
using Beam.Models;
using HtmlAgilityPack;
using File = System.IO.File;
namespace Beam.Downloaders {
    /// <summary>
    /// A download managing class that manages a singular download with failure-detection and
    /// exponential-backoff retries. This class is safe to instantiate per request.
    /// </summary>
    /// <remarks>
    /// RawType is the document produced by a single download attempt: a <c>StringDocument</c> holding the
    /// file path when <c>Options.DownloadFolder</c> is set, otherwise a <c>ByteDocument</c> holding the bytes.
    /// OutType is what <see cref="Transformer"/> produces from that document.
    /// </remarks>
    public class UnitDownloader(UnitDownloaderOptions options) : IUnitDownloader where RawType : IDocument {
        /// <summary>The options this downloader was constructed with.</summary>
        public UnitDownloaderOptions Options { get; } = options;

        /// <summary>HTTP client used for every request (taken from <c>Options.Client</c>).</summary>
        public HttpClient Client => Options.Client;

        /// <summary>Async transform applied to a downloaded document that passed the failure predicates.</summary>
        public virtual AsyncTransformer Transformer => Options.AsyncTransformer;

        /// <summary>
        /// Predicates that flag a downloaded document as a failed attempt. May be null; null entries are skipped.
        /// </summary>
        public virtual AsyncDownloadFailurePredicate?[]? FailurePredicates =>
            Options?.FailurePredicateOptions?.AsyncDownloadFailurePredicates;

        /// <summary>Number of links consumed per download; <see cref="TryDownload"/> only uses the first one.</summary>
        public int LinksPerDownload { get; } = 1;

        /// <summary>
        /// Streams the body of <paramref name="url"/> into <paramref name="destinationStream"/>,
        /// reporting progress after every chunk that is written.
        /// </summary>
        /// <param name="url">URL fetched with <see cref="Client"/>.</param>
        /// <param name="bufferSize">Size in bytes of the copy buffer.</param>
        /// <param name="destinationStream">Receives the body; it is not disposed here.</param>
        /// <param name="progress">
        /// Optional sink. Each report carries the size of the chunk just written and, when the source
        /// stream exposes its length, the number of bytes still to come (null otherwise).
        /// </param>
        /// <param name="ct">Cancels the request and every read and write.</param>
        /// <exception cref="HttpRequestException">The request failed or returned a non-success status.</exception>
        protected virtual async Task DownloadToStream(string url, int bufferSize, Stream destinationStream, IProgress progress,
            CancellationToken ct) {
            // Disposing the response stream releases the underlying connection.
            await using var stream = await Client.GetStreamAsync(url, ct);
            var buffer = new byte[bufferSize];
            long downloaded = 0;

            // Best effort: network streams typically cannot report Length, in which case "unknown" (null) is reported.
            long? Remaining() {
                try {
                    return stream.Length - downloaded;
                }
                catch {
                    return null;
                }
            }

            int read;
            // Async read so no thread is blocked on the network and cancellation is observed mid-read.
            while ((read = await stream.ReadAsync(buffer.AsMemory(), ct)) > 0) {
                downloaded += read;
                await destinationStream.WriteAsync(buffer.AsMemory(0, read), ct);
                progress?.Report(new DownloadReport() {
                    BytesDownloaded = read, // per-chunk count, not the running total
                    BytesRemaining = Remaining()
                });
                ct.ThrowIfCancellationRequested();
            }
        }

        /// <summary>Downloads <paramref name="url"/> into the file at <paramref name="path"/>, replacing any existing content.</summary>
        /// <exception cref="InvalidOperationException">The directory of <paramref name="path"/> does not exist.</exception>
        protected virtual async Task DownloadToFile(string url, int bufferSize, string path,
            IProgress progress, CancellationToken ct) {
            if (!Directory.Exists(Path.GetDirectoryName(path)))
                throw new InvalidOperationException(
                    string.Format(Exceptions.Exceptions.unit_download_directory_nonexistant, path));
            // FileMode.Create truncates an existing file; File.OpenWrite would leave stale trailing bytes.
            await using var file = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None);
            await DownloadToStream(url, bufferSize, file, progress, ct);
        }

        /// <summary>Downloads <paramref name="url"/> into memory and wraps the bytes in a <c>ByteDocument</c>.</summary>
        protected virtual async Task DownloadToMemory(string url, int bufferSize,
            IProgress progress, CancellationToken ct) {
            await using var ms = new MemoryStream();
            await DownloadToStream(url, bufferSize, ms, progress, ct);
            // A MemoryStream built with the parameterless constructor exposes its buffer; the returned
            // segment references the array directly, so it remains usable after the stream is disposed.
            if (!ms.TryGetBuffer(out var bytes))
                throw new InvalidOperationException(Exceptions.Exceptions.unit_download_invalid_memory_stream);
            return new ByteDocument(url, bytes);
        }

        /// <summary>
        /// Returns true when any non-null entry of <see cref="FailurePredicates"/> flags <paramref name="doc"/>.
        /// Predicates run one at a time (stopping at the first failure) unless
        /// <c>Options.FailurePredicateOptions.ProcessInParallel</c> is set, in which case up to
        /// <c>ParallelThreads</c> (default 4) run concurrently.
        /// </summary>
        protected virtual async Task IsFailure(RawType doc, CancellationToken ct) {
            var predicates = FailurePredicates;
            if (predicates is null)
                return false;

            if (!(Options?.FailurePredicateOptions?.ProcessInParallel ?? false)) {
                foreach (var pred in predicates) {
                    if (pred is not null && await pred(doc))
                        return true;
                }
                return false;
            }

            // Only ever flipped false -> true, so a volatile read/write is enough; no compare-exchange needed.
            var failed = false;
            await Parallel.ForEachAsync(predicates, new ParallelOptions() {
                    MaxDegreeOfParallelism = Options?.FailurePredicateOptions?.ParallelThreads ?? 4,
                    CancellationToken = ct
                },
                async (predicate, token) => {
                    // Skip remaining predicates once any of them has reported a failure.
                    if (token.IsCancellationRequested || Volatile.Read(ref failed) || predicate is null)
                        return;
                    if (await predicate(doc))
                        Volatile.Write(ref failed, true);
                });
            return Volatile.Read(ref failed);
        }

        /// <summary>
        /// Performs one download of <paramref name="link"/>: to a randomly named file in
        /// <c>Options.DownloadFolder</c> (returned as a <c>StringDocument</c> holding the path) when a folder is
        /// configured, otherwise into memory (returned as a <c>ByteDocument</c>).
        /// </summary>
        /// <exception cref="NotSupportedException">This downloader's document type supports neither mode.</exception>
        protected virtual async Task _Download(string link, IProgress progress, CancellationToken ct) {
            if (Options.DownloadFolder is not null && this is UnitDownloader) {
                var path = Path.Combine(Options.DownloadFolder, Path.GetRandomFileName());
                await DownloadToFile(link, Options.BufferSize, path, progress, ct);
                return (RawType)(object)new StringDocument(link, path);
            }
            if (this is UnitDownloader) {
                return (RawType)(object)(await DownloadToMemory(link, Options.BufferSize, progress, ct));
            }
            throw new NotSupportedException(Exceptions.Exceptions.unit_downloader_limited_support);
        }

        /// <summary>
        /// Runs the failure predicates and, if none trips, the transformer.
        /// Returns (true, value) on success and (false, default) when a predicate flags the document or
        /// a predicate/transformer throws.
        /// </summary>
        /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
        protected virtual async Task<(bool, OutType?)> Transform(RawType download, CancellationToken ct) {
            try {
                if (FailurePredicates is not null && await IsFailure(download, ct))
                    return (false, default);
                return (true, await Transformer(download));
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested) {
                // The caller's cancellation must surface instead of being treated as a failed attempt.
                throw;
            }
            catch (Exception) {
                // Predicate/transformer errors count as a failed attempt, which TryDownload retries.
                return (false, default);
            }
        }

        /// <summary>
        /// Downloads and transforms the first link, retrying up to <paramref name="maximumRetryCount"/> attempts
        /// with exponential backoff (2^attempt seconds) between them.
        /// </summary>
        /// <param name="link">Links to download; only <c>link[0]</c> is used. An empty array yields (false, default).</param>
        /// <param name="ct">Cancels the download, the transform and the backoff wait.</param>
        /// <param name="maximumRetryCount">Total number of attempts; values below 1 mean no attempt is made.</param>
        /// <param name="downProgress">Receives per-chunk download reports.</param>
        /// <param name="tryProgress">Receives a report after every failed attempt, including the last one.</param>
        /// <returns>(true, value) on the first successful attempt; otherwise (false, default).</returns>
        /// <remarks>
        /// HTTP/network errors and request timeouts are retried. Any other exception from the download
        /// propagates. NOTE(review): files written by attempts that later fail are not deleted.
        /// </remarks>
        /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
        public async Task<(bool, OutType?)> TryDownload(IOrdered[] link, CancellationToken ct, int maximumRetryCount = 7, IProgress? downProgress = null, IProgress? tryProgress = null) {
            ArgumentNullException.ThrowIfNull(link);
            if (link.Length == 0)
                return (false, default);
            downProgress ??= new Progress();
            var url = link[0].Data;

            for (var attempt = 1; attempt <= maximumRetryCount; ++attempt) {
                ct.ThrowIfCancellationRequested();
                try {
                    var raw = await _Download(url, downProgress, ct);
                    var (success, result) = await Transform(raw, ct);
                    if (success && result != null)
                        return (true, result);
                }
                catch (Exception ex) when (IsTransientDownloadError(ex, ct)) {
                    // Transient network failure: fall through and retry like any other failed attempt.
                }

                tryProgress?.Report(new RetryReport(attempt, url));
                // No point sleeping after the final attempt; the wait itself honours cancellation.
                if (attempt < maximumRetryCount)
                    await Task.Delay(BackoffDelay(attempt), ct);
            }
            return (false, default);
        }

        /// <summary>2^attempt seconds, clamped so large attempt counts cannot overflow or exceed what Task.Delay accepts.</summary>
        private static TimeSpan BackoffDelay(int attempt) =>
            TimeSpan.FromMilliseconds(Math.Min(Math.Pow(2, attempt) * 1000, int.MaxValue));

        /// <summary>
        /// True for failures worth retrying: HTTP errors, network I/O errors, and request timeouts
        /// (OperationCanceledException while the caller's token is not cancelled). Never true once
        /// <paramref name="ct"/> is cancelled.
        /// </summary>
        private static bool IsTransientDownloadError(Exception ex, CancellationToken ct) =>
            !ct.IsCancellationRequested && ex is HttpRequestException or IOException or OperationCanceledException;
    }
}