using Beam.Abstractions; using Beam.Exceptions; using Beam.Models; using HtmlAgilityPack; using Microsoft.Extensions.Logging; namespace Beam.Downloaders { public class UnitFragmentDownloader : IUnitDownloader>> { public UnitFragmentDownloader(HtmlWeb web, AsyncTransformer transformer, AsyncDownloadFailurePredicate?[]? failurePredicate = null, int fragmentSize = 4, ILogger? logger = null, IUnitDownloader? internalDownloader = null) { Web = web; Transformer = transformer; FailurePredicate = failurePredicate; UnitDownloader = internalDownloader ?? new UnitDownloader(Web, Transformer, FailurePredicate); LinksPerDownload = fragmentSize; Logger = logger; } public HtmlWeb Web { get; } public AsyncTransformer Transformer { get; } public AsyncDownloadFailurePredicate?[]? FailurePredicate { get; } public int LinksPerDownload { get; set; } public ILogger? Logger { get; set; } private readonly IUnitDownloader UnitDownloader; async Task<(bool, Fragment>?)> IUnitDownloader>>.TryDownload(IOrdered[] link, CancellationToken ct, int maximumRetryCount, IProgress? tryProgress) { Fragment> fragment = new Fragment>(link.Length); if (!Fragment>.TryAcquireUpdater(fragment, out var updater)) throw new AssertionException(Exceptions.Exceptions.fragment_locked); bool isFailure = false; await Parallel.ForEachAsync(link, async (x, pct) => { pct.ThrowIfCancellationRequested(); ct.ThrowIfCancellationRequested(); var (result, downloadedT) = await UnitDownloader.TryDownload([x], ct, maximumRetryCount, tryProgress); if (!result) { Interlocked.Exchange(ref isFailure, true); Logger?.LogError("Failed to retrieve {0} order={1}", x.Data, x.Order); return; } if (downloadedT == null) { Interlocked.Exchange(ref isFailure, true); Logger?.LogCritical("Failed to retrieve {0} order={1}", x.Data, x.Order); return; } updater(new Ordered(downloadedT, x.Order)); }); if (!isFailure) Fragment>.SetComplete(fragment, true); Fragment>.TryReleaseUpdater(fragment, updater); return (!isFailure, fragment); } } }