using Beam.Abstractions; using Beam.Models; using Microsoft.Extensions.Logging; namespace Beam.Downloaders { public class SequentialFragmentDownloader : SequentialDownloader>> { public SequentialFragmentDownloader( DownloadContext context, Func>>> getUnitDownloader, ILogger? logger = null) : base(context, getUnitDownloader, logger) {} } // public class SequentialChunkDownloader : IAsyncEnumerator>> { // public Fragment> Current { get; protected set; } // public DownloadContext Context { get; } // protected IEnumerator LinksEnumerator; // protected ConcurrentQueue>> DownloadQueue { get; set; } = []; // public int ChunkSize { get; } // private ILogger? Logger => Context.DownloadLogger; // public UnitDownloader GetUnitDownloader() // => new(Context.Web, Context.AsyncTranformer, Context.AsyncFailurePredicates); // public SequentialChunkDownloader(DownloadContext context, int chunkSize) { // Context = context; // LinksEnumerator = Context.Links.GetEnumerator(); // Current = new Fragment>(0); // ChunkSize = chunkSize; // } // public ValueTask DisposeAsync() { // GC.SuppressFinalize(this); // return ValueTask.CompletedTask; // } // protected Task? DownloadsTask = null; // protected virtual async Task ProcessDownloads() { // if (DownloadQueue.IsEmpty) // return true; // if (DownloadsTask is null) { // DownloadsTask = Task.Run(async () => { // if (!DownloadQueue.TryDequeue(out var fragment)) // return true; // no fragments left, likely race condition but return true as technically all items have been downloaded // var unit = GetUnitDownloader(); // instantiates unit downloader per request (okay) // if (!Fragment>.TryAcquireUpdater(fragment, out var updater)) { // gets the add method for the current fragment // Logger?.LogError("Failed to acquire updater for fragment {{{}}}", fragment.GetHashCode()); // return false; // fragment is unsafe to modify // } // try { // var links = Enumerable.Range(0, ChunkSize).Select((x) => { // if (!LinksEnumerator.MoveNext()) // return new Ordered(DocumentSourceLink.InvalidLink, -1); // stops link collection if end-of-links is reached // return new Ordered(LinksEnumerator.Current, x); // }).Where((x) => x.Data != DocumentSourceLink.InvalidLink); // filter invalid links // await Parallel.ForEachAsync(links, async (x, ct) => { // Logger?.LogInformation("Started download for {} order={}", x.Data.Link, x.Order); // var (result, downloadedT) = await unit.TryDownload( // download (parallel) objects // x.Data.Link.ToString(), // use link from links collection (exposed as x) // ct, // use ct provided with method call // tryProgress: Context.RetryReporter); // if (!result) { // download failure (soft because it was detected) // Logger?.LogError("Failed to retrieve {} order={}", x.Data.Link, x.Order); // return; // } // if (downloadedT is null) { // download failure (hard because it was not detected) // Logger?.LogCritical("Failed to retrieve {} order={}", x.Data.Link, x.Order); // return; // } // Logger?.LogInformation("Retrieved {} order={} successfully", x.Data.Link, x.Order); // updater(new Ordered(downloadedT, x.Order)); // update the fragment // }); // Fragment>.SetComplete(fragment, true); // } finally { // Fragment>.TryReleaseUpdater(fragment, updater); // returns updater to allow modification // } // return fragment.Size == fragment.MaxSize; // }); // } // if (DownloadsTask.IsCompleted) { // DownloadsTask = null; // return await ProcessDownloads(); // } // return true; // if task is still processing return should be neither true or false... // } // public async ValueTask MoveNextAsync() { // if (Current.IsComplete && Current.Size < Current.MaxSize) // return false; // if a fragment is marked complete despite being unsaturated, we've run out links! // if (DownloadQueue.Count == 0) { // Current = new Fragment>(ChunkSize); // DownloadQueue.Enqueue(Current); // } // return await ProcessDownloads(); // } // } }