using Beam.Abstractions;
using Beam.Models;
using HtmlAgilityPack;
using File = System.IO.File;

namespace Beam.Downloaders
{
    /// <summary>
    /// A download managing class that manages a singular download with failure-detection and
    /// exponential-backoff retries. This class is safe to instantiate per request.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type-argument lists in this file look like they were stripped by a
    /// tooling step (e.g. the <c>where RawType : IDocument</c> constraint has no visible type-parameter
    /// list, and several <c>Task</c> / <c>IProgress</c> usages carry no type argument). The type tokens
    /// below are reproduced exactly as found; restore the arguments from version control before building.
    /// </remarks>
    public class UnitDownloader(UnitDownloaderOptions options) : IUnitDownloader where RawType : IDocument
    {
        /// <summary>The options this downloader was constructed with.</summary>
        public UnitDownloaderOptions Options { get; } = options;

        /// <summary>The HTTP client taken from <see cref="Options"/>.</summary>
        public HttpClient Client => Options.Client;

        /// <summary>The asynchronous transformer applied to a downloaded document, taken from <see cref="Options"/>.</summary>
        public virtual AsyncTransformer Transformer => Options.AsyncTransformer;

        /// <summary>
        /// The failure predicates evaluated against a downloaded document, or <c>null</c> when none are configured.
        /// Individual entries may be <c>null</c> and are skipped.
        /// </summary>
        public virtual AsyncDownloadFailurePredicate?[]? FailurePredicates => Options?.FailurePredicateOptions?.AsyncDownloadFailurePredicates;

        /// <summary>Number of links consumed per download; only the first link passed to <see cref="TryDownload"/> is used.</summary>
        public int LinksPerDownload { get; } = 1;

        /// <summary>
        /// Streams the response body of <paramref name="url"/> into <paramref name="destinationStream"/>,
        /// reporting progress after every chunk.
        /// </summary>
        /// <param name="url">The address to download.</param>
        /// <param name="bufferSize">Size, in bytes, of the copy buffer.</param>
        /// <param name="destinationStream">The stream that receives the downloaded bytes. It is not closed here.</param>
        /// <param name="progress">Optional sink for per-chunk reports; <c>BytesDownloaded</c> is the size of the chunk just written.</param>
        /// <param name="ct">Token used to cancel the request, the reads and the writes.</param>
        protected virtual async Task DownloadToStream(string url, int bufferSize, Stream destinationStream, IProgress progress, CancellationToken ct)
        {
            // Dispose the response stream so the underlying connection is released even on failure.
            await using var stream = await Client.GetStreamAsync(url, ct);
            byte[] buffer = new byte[bufferSize];
            long downloaded = 0;

            // Many network streams do not support Length and throw; probe once instead of
            // throwing and swallowing an exception on every chunk.
            long? totalLength;
            try
            {
                totalLength = stream.Length;
            }
            catch
            {
                totalLength = null;
            }

            int inBuffer;
            // Read asynchronously (and cancellably) rather than blocking a thread inside an async method.
            while ((inBuffer = await stream.ReadAsync(buffer.AsMemory(), ct)) > 0)
            {
                downloaded += inBuffer;
                await destinationStream.WriteAsync(buffer.AsMemory(0, inBuffer), ct);
                progress?.Report(new DownloadReport()
                {
                    BytesDownloaded = inBuffer,
                    // Lifted arithmetic: stays null when the total length is unknown.
                    BytesRemaining = totalLength - downloaded
                });
                ct.ThrowIfCancellationRequested();
            }
        }

        /// <summary>
        /// Downloads <paramref name="url"/> into the file at <paramref name="path"/>.
        /// </summary>
        /// <param name="url">The address to download.</param>
        /// <param name="bufferSize">Size, in bytes, of the copy buffer.</param>
        /// <param name="path">Destination file path; its directory must already exist.</param>
        /// <param name="progress">Optional sink for per-chunk reports.</param>
        /// <param name="ct">Token used to cancel the download.</param>
        /// <exception cref="InvalidOperationException">The directory of <paramref name="path"/> does not exist.</exception>
        protected virtual async Task DownloadToFile(string url, int bufferSize, string path, IProgress progress, CancellationToken ct)
        {
            if (!Directory.Exists(Path.GetDirectoryName(path)))
            {
                throw new InvalidOperationException(
                    string.Format(Exceptions.Exceptions.unit_download_directory_nonexistant, path));
            }

            await using var file = File.OpenWrite(path);
            await DownloadToStream(url, bufferSize, file, progress, ct);
        }

        /// <summary>
        /// Downloads <paramref name="url"/> into memory and wraps the bytes in a <c>ByteDocument</c>.
        /// </summary>
        /// <param name="url">The address to download.</param>
        /// <param name="bufferSize">Size, in bytes, of the copy buffer.</param>
        /// <param name="progress">Optional sink for per-chunk reports.</param>
        /// <param name="ct">Token used to cancel the download.</param>
        /// <returns>A <c>ByteDocument</c> built from <paramref name="url"/> and the downloaded bytes.</returns>
        /// <exception cref="InvalidOperationException">The memory stream's buffer could not be obtained.</exception>
        protected virtual async Task DownloadToMemory(string url, int bufferSize, IProgress progress, CancellationToken ct)
        {
            await using var ms = new MemoryStream();
            await DownloadToStream(url, bufferSize, ms, progress, ct);

            if (!ms.TryGetBuffer(out var bytes))
            {
                // More specific than a bare Exception; still caught by callers catching Exception.
                throw new InvalidOperationException(Exceptions.Exceptions.unit_download_invalid_memory_stream);
            }

            return new ByteDocument(url, bytes);
        }

        /// <summary>
        /// Evaluates the configured failure predicates against <paramref name="doc"/>.
        /// </summary>
        /// <param name="doc">The downloaded document to inspect.</param>
        /// <param name="ct">Token observed by the parallel evaluation path.</param>
        /// <returns>
        /// <c>true</c> when at least one non-null predicate reports a failure; <c>false</c> when no predicates
        /// are configured or none report a failure.
        /// </returns>
        protected virtual async Task IsFailure(RawType doc, CancellationToken ct)
        {
            // Read the virtual property once so every branch sees the same array.
            var predicates = FailurePredicates;
            if (predicates is null)
            {
                return false;
            }

            bool parallel = Options?.FailurePredicateOptions?.ProcessInParallel ?? false;
            if (!parallel)
            {
                // Sequential path: stop at the first predicate that reports a failure.
                foreach (var pred in predicates)
                {
                    if (pred is null)
                    {
                        continue;
                    }

                    if (await pred(doc))
                    {
                        return true;
                    }
                }

                return false;
            }

            var failed = false;
            await Parallel.ForEachAsync(
                predicates,
                new ParallelOptions()
                {
                    MaxDegreeOfParallelism = Options?.FailurePredicateOptions?.ParallelThreads ?? 4,
                    CancellationToken = ct
                },
                async (predicate, token) =>
                {
                    if (token.IsCancellationRequested) return;
                    // Another predicate already reported a failure; skip the remaining work.
                    if (failed) return;
                    if (predicate == null) return;

                    if (await predicate(doc))
                    {
                        Interlocked.CompareExchange(ref failed, true, false);
                    }
                });

            return failed;
        }

        /// <summary>
        /// Performs a single raw download of <paramref name="link"/>, either to a randomly named file inside
        /// <c>Options.DownloadFolder</c> or into memory.
        /// </summary>
        /// <param name="link">The address to download.</param>
        /// <param name="progress">Optional sink for per-chunk reports.</param>
        /// <param name="ct">Token used to cancel the download.</param>
        /// <returns>The downloaded document cast to <c>RawType</c>.</returns>
        /// <exception cref="NotSupportedException">Neither supported downloader shape matches this instance.</exception>
        protected virtual async Task _Download(string link, IProgress progress, CancellationToken ct)
        {
            // NOTE(review): the two type tests below presumably carried different type arguments
            // (a string-document and a byte-document downloader) before they were stripped - confirm.
            if (Options.DownloadFolder is not null && this is UnitDownloader)
            {
                var path = Path.Combine(Options.DownloadFolder, Path.GetRandomFileName());
                await DownloadToFile(link, Options.BufferSize, path, progress, ct);
                return (RawType)(object)new StringDocument(link, path);
            }

            if (this is UnitDownloader)
            {
                return (RawType)(object)(await DownloadToMemory(link, Options.BufferSize, progress, ct));
            }

            throw new NotSupportedException(Exceptions.Exceptions.unit_downloader_limited_support);
        }

        /// <summary>
        /// Runs failure detection on <paramref name="download"/> and, when it passes, applies <see cref="Transformer"/>.
        /// </summary>
        /// <param name="download">The raw downloaded document.</param>
        /// <param name="ct">Token forwarded to <see cref="IsFailure"/>.</param>
        /// <returns>
        /// <c>(true, result)</c> when the document was not flagged and the transformer completed;
        /// <c>(false, default)</c> when the document was flagged as a failure or any exception was thrown.
        /// </returns>
        protected virtual async Task<(bool, OutType?)> Transform(RawType download, CancellationToken ct)
        {
            try
            {
                if (FailurePredicates is null || !(await IsFailure(download, ct)))
                {
                    return (true, await Transformer(download));
                }

                return (false, default);
            }
            catch (Exception)
            {
                // Deliberate best-effort: any error while checking or transforming counts as a
                // failed attempt so that TryDownload can retry.
                return (false, default);
            }
        }

        /// <summary>
        /// Downloads and transforms the first entry of <paramref name="link"/>, retrying with exponential
        /// backoff (2^attempt seconds) until it succeeds or <paramref name="maximumRetryCount"/> attempts were made.
        /// </summary>
        /// <param name="link">The links to download; only the first element is used. An empty array yields failure.</param>
        /// <param name="ct">Token that cancels the downloads and the backoff waits.</param>
        /// <param name="maximumRetryCount">Maximum number of attempts.</param>
        /// <param name="downProgress">Optional sink for per-chunk download reports.</param>
        /// <param name="tryProgress">Optional sink that receives a report after every failed attempt.</param>
        /// <returns>
        /// <c>(true, result)</c> on the first attempt that produces a non-null result; otherwise
        /// <c>(false, lastResult)</c> once the attempts are exhausted.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="link"/> is <c>null</c>.</exception>
        /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
        public async Task<(bool, OutType?)> TryDownload(IOrdered[] link, CancellationToken ct, int maximumRetryCount = 7, IProgress? downProgress = null, IProgress? tryProgress = null)
        {
            ArgumentNullException.ThrowIfNull(link);

            if (link.Length == 0)
            {
                return (false, default);
            }

            downProgress ??= new Progress();
            OutType? ot = default;
            int tryCount = 0;

            while (tryCount < maximumRetryCount)
            {
                ct.ThrowIfCancellationRequested();

                var rt = await _Download(link[0].Data, downProgress, ct);
                (var success, ot) = await Transform(rt, ct);
                if (success && ot != null)
                {
                    return (true, ot);
                }

                ++tryCount;
                tryProgress?.Report(new RetryReport(tryCount, link[0].Data));

                // No point in backing off when there is no further attempt to wait for.
                if (tryCount >= maximumRetryCount)
                {
                    break;
                }

                // 2^tryCount seconds, computed in double and clamped so that large retry counts
                // cannot overflow int into a negative (invalid) delay.
                double delayMs = Math.Min(Math.Pow(2, tryCount) * 1000d, int.MaxValue);

                // Pass the token so a cancellation interrupts the wait instead of sleeping it out.
                await Task.Delay(TimeSpan.FromMilliseconds(delayMs), ct);
            }

            return (false, ot);
        }
    }
}