using System.Diagnostics.CodeAnalysis;
using System.Text;
using Beam.Abstractions;
using Beam.Models;
using HtmlAgilityPack;
using File = System.IO.File;

namespace Beam.Downloaders
{
    /// <summary>
    /// A download managing class that manages a singular download with failure-detection and
    /// exponential-backoff retries. This class is safe to instantiate per request.
    /// </summary>
    // NOTE(review): `OutType` is used as a type throughout this class but no declaration of it is
    // visible in the file as reviewed, and several framework types appear without type arguments.
    // Signatures are kept exactly as found - confirm the generic parameter lists against the project.
    public class UnitDownloader(UnitDownloaderOptions options) : IUnitDownloader
    {
        /// <summary>Upper bound for a single back-off delay, in milliseconds.</summary>
        private const double MaxBackoffMilliseconds = int.MaxValue;

        /// <summary>The options this downloader was constructed with.</summary>
        public UnitDownloaderOptions Options { get; } = options;

        /// <summary>The HTTP client taken from <see cref="Options"/>.</summary>
        public HttpClient Client => Options.Client;

        /// <summary>The transformer taken from <see cref="Options"/>; applied to every downloaded document.</summary>
        public virtual AsyncTransformer Transformer => Options.AsyncTransformer;

        /// <summary>
        /// The failure predicates taken from <see cref="Options"/>. May be <c>null</c>, and individual
        /// entries may be <c>null</c> (those are ignored).
        /// </summary>
        public virtual AsyncDownloadFailurePredicate?[]? FailurePredicates => Options?.FailurePredicateOptions?.AsyncDownloadFailurePredicates;

        /// <summary>Number of links consumed per download; always 1 for this downloader.</summary>
        public int LinksPerDownload { get; } = 1;

        /// <summary>
        /// Copies the response body of <paramref name="url"/> into <paramref name="destinationStream"/>
        /// in chunks of at most <paramref name="bufferSize"/> bytes, reporting after every chunk.
        /// </summary>
        /// <param name="url">Address to fetch with <see cref="Client"/>.</param>
        /// <param name="bufferSize">Size of the copy buffer in bytes.</param>
        /// <param name="destinationStream">Stream that receives the downloaded bytes.</param>
        /// <param name="progress">Optional sink; each report carries the size of the chunk just written
        /// and, when the source stream can tell its length, the bytes still remaining.</param>
        /// <param name="ct">Token that cancels the request, the reads and the writes.</param>
        protected virtual async Task DownloadToStream(string url, int bufferSize, Stream destinationStream, IProgress progress, CancellationToken ct)
        {
            // Dispose the response stream so the underlying connection is released.
            await using var stream = await Client.GetStreamAsync(url, ct);

            byte[] buffer = new byte[bufferSize];
            long downloaded = 0;
            bool lengthAvailable = true;

            long? Remaining()
            {
                if (!lengthAvailable)
                {
                    return null;
                }

                try
                {
                    return stream.Length - downloaded;
                }
                catch
                {
                    // Stream.Length is unsupported on non-seekable streams; remember that so we do
                    // not pay for a thrown exception on every chunk.
                    lengthAvailable = false;
                    return null;
                }
            }

            int inBuffer;
            while ((inBuffer = await stream.ReadAsync(buffer.AsMemory(), ct)) > 0)
            {
                downloaded += inBuffer;
                await destinationStream.WriteAsync(buffer.AsMemory(0, inBuffer), ct);
                progress?.Report(new DownloadReport() { BytesDownloaded = inBuffer, BytesRemaining = Remaining() });
                ct.ThrowIfCancellationRequested();
            }
        }

        /// <summary>
        /// Downloads <paramref name="url"/> into the file at <paramref name="path"/>, replacing any
        /// existing content.
        /// </summary>
        /// <param name="url">Address to fetch.</param>
        /// <param name="bufferSize">Size of the copy buffer in bytes.</param>
        /// <param name="path">Destination file path; its directory must already exist.</param>
        /// <param name="progress">Optional progress sink, forwarded to <see cref="DownloadToStream"/>.</param>
        /// <param name="ct">Cancellation token.</param>
        /// <exception cref="InvalidOperationException">The directory of <paramref name="path"/> does not exist.</exception>
        protected virtual async Task DownloadToFile(string url, int bufferSize, string path, IProgress progress, CancellationToken ct)
        {
            if (!Directory.Exists(Path.GetDirectoryName(path)))
            {
                throw new InvalidOperationException(
                    string.Format(Exceptions.Exceptions.unit_download_directory_nonexistant, path));
            }

            // FileMode.Create truncates an existing file. File.OpenWrite (OpenOrCreate) would leave
            // stale trailing bytes behind when a retry or overwrite is shorter than the old content.
            await using var file = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None);
            await DownloadToStream(url, bufferSize, file, progress, ct);
        }

        /// <summary>
        /// Downloads <paramref name="url"/> into memory and wraps the bytes in a <c>ByteDocument</c>.
        /// </summary>
        /// <param name="url">Address to fetch.</param>
        /// <param name="bufferSize">Size of the copy buffer in bytes.</param>
        /// <param name="progress">Optional progress sink, forwarded to <see cref="DownloadToStream"/>.</param>
        /// <param name="ct">Cancellation token.</param>
        /// <returns>A document built from <paramref name="url"/> and the downloaded bytes.</returns>
        /// <exception cref="Exception">The memory stream's buffer could not be obtained.</exception>
        protected virtual async Task DownloadToMemory(string url, int bufferSize, IProgress progress, CancellationToken ct)
        {
            await using var ms = new MemoryStream();
            await DownloadToStream(url, bufferSize, ms, progress, ct);

            if (!ms.TryGetBuffer(out var bytes))
            {
                throw new Exception(Exceptions.Exceptions.unit_download_invalid_memory_stream);
            }

            return new ByteDocument(url, bytes);
        }

        /// <summary>
        /// Runs the configured failure predicates against <paramref name="doc"/>.
        /// </summary>
        /// <param name="doc">The downloaded document to inspect.</param>
        /// <param name="ct">Cancellation token.</param>
        /// <returns>
        /// <c>true</c> when at least one predicate reports a failure; <c>false</c> when none does or
        /// when no predicates are configured.
        /// </returns>
        protected virtual async Task IsFailure(ByteDocument doc, CancellationToken ct)
        {
            // Read the virtual property once so null check and enumeration see the same array.
            var predicates = FailurePredicates;
            if (predicates is null)
            {
                return false;
            }

            if (!(Options?.FailurePredicateOptions?.ProcessInParallel ?? false))
            {
                foreach (var pred in predicates)
                {
                    ct.ThrowIfCancellationRequested();

                    if (pred is null)
                    {
                        continue;
                    }

                    if (await pred(doc))
                    {
                        return true;
                    }
                }

                return false;
            }

            // 0 = no failure seen, 1 = some predicate reported a failure.
            int failedFlag = 0;

            await Parallel.ForEachAsync(
                predicates,
                new ParallelOptions()
                {
                    MaxDegreeOfParallelism = Options?.FailurePredicateOptions?.ParallelThreads ?? 4,
                    CancellationToken = ct
                },
                async (predicate, token) =>
                {
                    if (token.IsCancellationRequested)
                    {
                        return;
                    }

                    // Skip remaining predicates once any of them has reported a failure.
                    if (Volatile.Read(ref failedFlag) != 0)
                    {
                        return;
                    }

                    if (predicate == null)
                    {
                        return;
                    }

                    if (await predicate(doc))
                    {
                        Interlocked.Exchange(ref failedFlag, 1);
                    }
                });

            return failedFlag != 0;
        }

        /// <summary>
        /// Performs one download of <paramref name="link"/>, to disk when
        /// <c>Options.DownloadFolder</c> is set and into memory otherwise.
        /// </summary>
        /// <param name="link">Address to fetch.</param>
        /// <param name="progress">Optional progress sink.</param>
        /// <param name="ct">Cancellation token.</param>
        /// <returns>
        /// For disk downloads, a document whose payload is the UTF-8 encoded destination path (not the
        /// file content); for memory downloads, a document holding the downloaded bytes.
        /// </returns>
        protected virtual async Task _Download(string link, IProgress progress, CancellationToken ct)
        {
            if (Options.DownloadFolder is not null)
            {
                // Use the Options property rather than the constructor parameter so the parameter is
                // not captured a second time alongside the property's backing field.
                var path = Path.Combine(Options.DownloadFolder, Options.GetFileNameForDownload(link, []));
                await DownloadToFile(link, Options.BufferSize, path, progress, ct);
                return new ByteDocument(link, Encoding.UTF8.GetBytes(path));
            }

            return await DownloadToMemory(link, Options.BufferSize, progress, ct);
        }

        /// <summary>
        /// Checks <paramref name="download"/> against the failure predicates and, if it passes,
        /// applies <see cref="Transformer"/> to it.
        /// </summary>
        /// <param name="download">The downloaded document.</param>
        /// <param name="ct">Cancellation token.</param>
        /// <returns>
        /// <c>(true, result)</c> when the document passed and was transformed; <c>(false, default)</c>
        /// when a predicate flagged it or when the predicates or the transformer threw.
        /// </returns>
        /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
        protected virtual async Task<(bool, OutType?)> Transform(ByteDocument download, CancellationToken ct)
        {
            try
            {
                if (FailurePredicates is null || !(await IsFailure(download, ct)))
                {
                    return (true, await Transformer(download));
                }

                return (false, default);
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                // Cancellation is not a failed attempt; let it reach the caller immediately.
                throw;
            }
            catch (Exception)
            {
                // Deliberate best-effort: any other error counts as a failed attempt so the caller retries.
                return (false, default);
            }
        }

        /// <summary>
        /// Downloads and transforms the first entry of <paramref name="link"/>, retrying with
        /// exponential back-off (2 s, 4 s, 8 s, ...) while the result is rejected.
        /// </summary>
        /// <param name="link">Links to download; only the first element is used.</param>
        /// <param name="ct">Token that cancels downloads and back-off waits.</param>
        /// <param name="maximumRetryCount">Maximum number of attempts.</param>
        /// <param name="downProgress">Optional sink for byte-level progress; a default one is created when omitted.</param>
        /// <param name="tryProgress">Optional sink notified after every failed attempt.</param>
        /// <returns>
        /// <c>(true, value)</c> on success or when a skip predicate supplied a value;
        /// <c>(false, lastValue)</c> when <paramref name="link"/> is empty or all attempts failed.
        /// </returns>
        /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
        public async Task<(bool, OutType?)> TryDownload(IOrdered[] link, CancellationToken ct, int maximumRetryCount = 7, IProgress? downProgress = null, IProgress? tryProgress = null)
        {
            if (link.Length == 0)
            {
                return (false, default);
            }

            downProgress ??= new Progress();

            if (ShouldSkip(link[0].Data, out var defaultType))
            {
                return (true, defaultType);
            }

            OutType? ot = default;
            int tryCount = 0;

            while (tryCount < maximumRetryCount)
            {
                ct.ThrowIfCancellationRequested();

                var rt = await _Download(link[0].Data, downProgress, ct);
                (var success, ot) = await Transform(rt, ct);
                if (success && ot != null)
                {
                    return (true, ot);
                }

                ++tryCount;
                tryProgress?.Report(new RetryReport(tryCount, link[0].Data));

                // Waiting only makes sense when another attempt follows, and the wait must be cancellable.
                if (tryCount < maximumRetryCount)
                {
                    await Task.Delay(GetBackoffDelay(tryCount), ct);
                }
            }

            return (false, ot);
        }

        /// <summary>
        /// Back-off before the next attempt: 2^<paramref name="attempt"/> seconds, clamped so large
        /// attempt numbers cannot overflow into an invalid delay.
        /// </summary>
        private static TimeSpan GetBackoffDelay(int attempt)
        {
            double milliseconds = Math.Pow(2, attempt) * 1000;
            return TimeSpan.FromMilliseconds(Math.Min(milliseconds, MaxBackoffMilliseconds));
        }

        /// <summary>
        /// Asks the configured skip predicates whether <paramref name="link"/> needs no download.
        /// </summary>
        /// <param name="link">The link about to be downloaded.</param>
        /// <param name="outType">The value supplied by the predicate that asked to skip; default otherwise.</param>
        /// <returns><c>true</c> when some predicate asked to skip the download.</returns>
        private bool ShouldSkip(string link, [NotNullWhen(true)] out OutType? outType)
        {
            outType = default;

            if (Options.SkipPredicateOptions?.SkipPredicates is null)
            {
                return false;
            }

            if (!Options.SkipPredicateOptions.ProcessInParallel)
            {
                foreach (var pred in Options.SkipPredicateOptions.SkipPredicates)
                {
                    if (pred is null)
                    {
                        continue;
                    }

                    if (pred(link, out outType))
                    {
                        return true;
                    }
                }

                // Do not leak a value written by a predicate that declined to skip.
                outType = default;
                return false;
            }

            var gate = new object();
            var shouldSkip = false;
            OutType? found = default;

            Parallel.ForEach(
                Options.SkipPredicateOptions.SkipPredicates,
                new ParallelOptions()
                {
                    // NOTE(review): this reads the thread count from FailurePredicateOptions rather than
                    // SkipPredicateOptions - looks like a copy-paste slip; confirm which is intended.
                    MaxDegreeOfParallelism = Options?.FailurePredicateOptions?.ParallelThreads ?? 4
                },
                (predicate, parallelLoopState) =>
                {
                    if (parallelLoopState.ShouldExitCurrentIteration)
                    {
                        return;
                    }

                    if (predicate == null)
                    {
                        return;
                    }

                    if (predicate(link, out var candidate))
                    {
                        // A lock works for any OutType, unlike an interlocked exchange on the value.
                        lock (gate)
                        {
                            shouldSkip = true;

                            // The first non-default value wins, matching the previous behaviour.
                            if (EqualityComparer<OutType?>.Default.Equals(found, default))
                            {
                                found = candidate;
                            }
                        }

                        parallelLoopState.Break();
                    }
                });

            outType = found;
            return shouldSkip;
        }
    }
}