diff --git a/Beam.Abstractions/Beam.Abstractions.csproj b/Beam.Abstractions/Beam.Abstractions.csproj
index cfe70e6..994b4c0 100644
--- a/Beam.Abstractions/Beam.Abstractions.csproj
+++ b/Beam.Abstractions/Beam.Abstractions.csproj
@@ -8,7 +8,7 @@
-
+
diff --git a/Beam.Abstractions/IDownloadReport.cs b/Beam.Abstractions/IDownloadReport.cs
index 615b92f..3782238 100644
--- a/Beam.Abstractions/IDownloadReport.cs
+++ b/Beam.Abstractions/IDownloadReport.cs
@@ -1,3 +1,6 @@
namespace Beam.Abstractions;
-public interface IDownloadReport { }
\ No newline at end of file
+public interface IDownloadReport {
+ long BytesDownloaded { get; init; }
+ long? BytesRemaining { get; init; }
+}
\ No newline at end of file
diff --git a/Beam.Abstractions/IUnitDownloader.cs b/Beam.Abstractions/IUnitDownloader.cs
index 736197b..c3e1daf 100644
--- a/Beam.Abstractions/IUnitDownloader.cs
+++ b/Beam.Abstractions/IUnitDownloader.cs
@@ -3,5 +3,5 @@ namespace Beam.Abstractions;
public interface IUnitDownloader {
public int LinksPerDownload { get; }
- public Task<(bool, T?)> TryDownload(IOrdered[] link, CancellationToken ct, int maximumRetryCount = 7, IProgress? tryProgress = null);
+ public Task<(bool, T?)> TryDownload(IOrdered[] link, CancellationToken ct, int maximumRetryCount = 7, IProgress? downProgress = null, IProgress? tryProgress = null);
}
\ No newline at end of file
diff --git a/Beam.Downloaders/Beam.Downloaders.csproj b/Beam.Downloaders/Beam.Downloaders.csproj
index 3e5ee31..ab69f30 100644
--- a/Beam.Downloaders/Beam.Downloaders.csproj
+++ b/Beam.Downloaders/Beam.Downloaders.csproj
@@ -14,6 +14,7 @@
+
diff --git a/Beam.Downloaders/SequentialDownloader.cs b/Beam.Downloaders/SequentialDownloader.cs
index c9f586e..bc35f16 100644
--- a/Beam.Downloaders/SequentialDownloader.cs
+++ b/Beam.Downloaders/SequentialDownloader.cs
@@ -58,6 +58,7 @@ namespace Beam.Downloaders {
var (result, downloadedT) = await unit.TryDownload(
links.ToArray(),
Context.CancellationToken,
+ downProgress: Context.DownloadReporter,
tryProgress: Context.RetryReporter);
if (!result) {
diff --git a/Beam.Downloaders/UnitDownloader.cs b/Beam.Downloaders/UnitDownloader.cs
index a2c192d..2d1c774 100644
--- a/Beam.Downloaders/UnitDownloader.cs
+++ b/Beam.Downloaders/UnitDownloader.cs
@@ -1,6 +1,7 @@
using Beam.Abstractions;
using Beam.Models;
using HtmlAgilityPack;
+using File = System.IO.File;
namespace Beam.Downloaders {
///
@@ -10,34 +11,104 @@ namespace Beam.Downloaders {
///
///
///
- public class UnitDownloader(HtmlWeb web, AsyncTransformer transformer, AsyncDownloadFailurePredicate?[]? failurePredicate = null) : IUnitDownloader {
- public HtmlWeb Web { get; } = web;
- public virtual AsyncTransformer Transformer { get; } = transformer;
- public virtual AsyncDownloadFailurePredicate?[]? FailurePredicates { get; } = failurePredicate;
+ public class UnitDownloader(UnitDownloaderOptions options) : IUnitDownloader where RawType : IDocument {
+ public UnitDownloaderOptions Options { get; } = options;
+ public HttpClient Client => Options.Client;
+ public virtual AsyncTransformer Transformer => Options.AsyncTransformer;
+
+ public virtual AsyncDownloadFailurePredicate?[]? FailurePredicates =>
+ Options?.FailurePredicateOptions?.AsyncDownloadFailurePredicates;
public int LinksPerDownload { get; } = 1;
- protected virtual async Task IsFailure(HtmlDocument doc) {
- if (FailurePredicates is null)
- return false;
- var failed = false;
- await Parallel.ForEachAsync(FailurePredicates, async (x, ct) => {
- if (failed == true)
- return;
- if (x is null)
- return;
- if (await x(doc))
- failed = true;
- });
+ protected virtual async Task DownloadToStream(string url, int bufferSize, Stream destinationStream, IProgress progress,
+ CancellationToken ct) {
- return failed;
+ var stream = await Client.GetStreamAsync(url, ct);
+ byte[] buffer = new byte[bufferSize];
+ int inBuffer = 0;
+ long downloaded = 0;
+ while ((inBuffer = stream.Read(buffer)) > 0) {
+ downloaded += inBuffer;
+ await destinationStream.WriteAsync(buffer.AsMemory(0, inBuffer), ct);
+ progress?.Report(new DownloadReport() {
+ BytesDownloaded = inBuffer,
+ BytesRemaining = stream.Length - downloaded
+ });
+
+ ct.ThrowIfCancellationRequested();
+ }
}
- protected virtual async Task<(bool, T?)> TryDownloadWithNoRetries(string link, CancellationToken ct) {
+ protected virtual async Task DownloadToFile(string url, int bufferSize, string path,
+ IProgress progress, CancellationToken ct) {
+
+ if (!Directory.Exists(Path.GetDirectoryName(path)))
+ throw new InvalidOperationException(
+ string.Format(Exceptions.Exceptions.unit_download_directory_nonexistant, path));
+ await using var file = File.OpenWrite(path);
+ await DownloadToStream(url, bufferSize, file, progress, ct);
+ }
+
+ protected virtual async Task DownloadToMemory(string url, int bufferSize,
+ IProgress progress, CancellationToken ct) {
+
+ await using var ms = new MemoryStream();
+ await DownloadToStream(url, bufferSize, ms, progress, ct);
+ if (!ms.TryGetBuffer(out var bytes))
+ throw new Exception(Exceptions.Exceptions.unit_download_invalid_memory_stream);
+ return new ByteDocument(url, bytes);
+ }
+
+ protected virtual async Task IsFailure(RawType doc, CancellationToken ct) {
+ if (FailurePredicates is null)
+ return false;
+ if (!(Options?.FailurePredicateOptions?.ProcessInParallel ?? false))
+ foreach (var pred in FailurePredicates) {
+ if (pred is null)
+ continue;
+ if (await pred(doc))
+ return true;
+ }
+ else {
+ var failed = false;
+ await Parallel.ForEachAsync(FailurePredicates, new ParallelOptions() {
+ MaxDegreeOfParallelism = Options?.FailurePredicateOptions?.ParallelThreads ?? 4,
+ CancellationToken = ct
+ },
+ async (predicate, token) => {
+ if (token.IsCancellationRequested)
+ return;
+ if (failed)
+ return;
+ if (predicate == null)
+ return;
+ if (await predicate(doc))
+ Interlocked.CompareExchange(ref failed, true, false);
+ }
+ );
+ return failed;
+ }
+
+ return false;
+ }
+
+ protected virtual async Task _Download(string link, IProgress progress, CancellationToken ct) {
+ if (Options.DownloadFolder is not null && this is UnitDownloader) {
+ var path = Path.Combine(Options.DownloadFolder, Path.GetRandomFileName());
+ await DownloadToFile(link, Options.BufferSize, path, progress, ct);
+ return (RawType)(object)new StringDocument(link, path);
+ }
+ if (this is UnitDownloader) {
+ return (RawType)(object)(await DownloadToMemory(link, Options.BufferSize, progress, ct));
+ }
+ throw new NotSupportedException(Exceptions.Exceptions.unit_downloader_limited_support);
+ }
+
+ protected virtual async Task<(bool, OutType?)> Transform(RawType download, CancellationToken ct) {
try {
- var html = await Web.LoadFromWebAsync(link, ct);
- if (FailurePredicates is null || !(await IsFailure(html)))
- return (true, await Transformer(html));
+ if (FailurePredicates is null || !(await IsFailure(download, ct)))
+ return (true, await Transformer(download));
else
return (false, default);
} catch(Exception) {
@@ -45,23 +116,26 @@ namespace Beam.Downloaders {
}
}
- public async Task<(bool, T?)> TryDownload(IOrdered[] link, CancellationToken ct, int maximumRetryCount = 7, IProgress? tryProgress = null) {
+ public async Task<(bool, OutType?)> TryDownload(IOrdered[] link, CancellationToken ct, int maximumRetryCount = 7, IProgress? downProgress = null, IProgress? tryProgress = null) {
if (link.Length == 0)
return (false, default);
- T? doc = default;
+ downProgress ??= new Progress();
+
+ OutType? ot = default;
int tryCount = 0;
while (tryCount < maximumRetryCount) {
ct.ThrowIfCancellationRequested();
- (var success, doc) = await TryDownloadWithNoRetries(link[0].Data, ct);
- if (success && doc != null)
- return (true, doc);
+ var rt = await _Download(link[0].Data, downProgress, ct);
+ (var success, ot) = await Transform(rt, ct);
+ if (success && ot != null)
+ return (true, ot);
++tryCount;
tryProgress?.Report(new RetryReport(tryCount, link[0].Data));
await Task.Delay((int)Math.Pow(2, tryCount) * 1000);
}
- return (false, doc);
+ return (false, ot);
}
}
}
diff --git a/Beam.Downloaders/UnitDownloaderBinary.cs b/Beam.Downloaders/UnitDownloaderBinary.cs
deleted file mode 100644
index f32395e..0000000
--- a/Beam.Downloaders/UnitDownloaderBinary.cs
+++ /dev/null
@@ -1,74 +0,0 @@
-using Beam.Abstractions;
-using Beam.Models;
-
-namespace Beam.Downloaders {
- ///
- /// A download-managing class that retrieves binary data through ,
- /// applies an , and supports failure detection
- /// plus exponential-back-off retries. Safe to instantiate per request.
- ///
- public class UnitDownloaderBinary(
- HttpClient client,
- AsyncTransformer transformer,
- AsyncDownloadFailurePredicate?[]? failurePredicates = null)
- : IUnitDownloader {
- public HttpClient Client { get; } = client;
- public virtual AsyncTransformer Transformer { get; } = transformer;
- public virtual AsyncDownloadFailurePredicate?[]? FailurePredicates { get; } = failurePredicates;
-
- public int LinksPerDownload { get; } = 1;
-
- /// Runs all configured failure predicates in parallel on the raw HTTP response.
- protected virtual async Task IsFailure(ByteDocument response) {
- if (FailurePredicates is null) return false;
-
- var failed = false;
- await Parallel.ForEachAsync(FailurePredicates, async (pred, ct) => {
- if (failed || pred is null) return;
- if (await pred(response))
- failed = true;
- });
- return failed;
- }
-
- /// One attempt without retries or back-off.
- protected virtual async Task<(bool Success, T? Result)> TryDownloadWithNoRetries(string link, CancellationToken ct) {
- try {
- using var response = await Client.GetAsync(link, HttpCompletionOption.ResponseHeadersRead, ct);
- if (!response.IsSuccessStatusCode) return (false, default);
-
- var bytes = await response.Content.ReadAsByteArrayAsync(ct);
- var doc = new ByteDocument(link, bytes);
- if (await IsFailure(doc)) return (false, default);
-
- return (true, await Transformer(doc));
- } catch {
- return (false, default);
- }
- }
-
- public async Task<(bool, T?)> TryDownload(
- IOrdered[] link,
- CancellationToken ct,
- int maximumRetryCount = 7,
- IProgress? tryProgress = null) {
- if (link.Length == 0) return (false, default);
-
- T? result = default;
- var attempt = 0;
-
- while (attempt < maximumRetryCount) {
- ct.ThrowIfCancellationRequested();
-
- (var success, result) = await TryDownloadWithNoRetries(link[0].Data, ct);
- if (success && result is not null) return (true, result);
-
- ++attempt;
- tryProgress?.Report(new RetryReport(attempt, link[0].Data));
- await Task.Delay((int)Math.Pow(2, attempt) * 1000, ct);
- }
-
- return (false, result);
- }
- }
-}
diff --git a/Beam.Downloaders/UnitDownloaderOptions.cs b/Beam.Downloaders/UnitDownloaderOptions.cs
new file mode 100644
index 0000000..dc99555
--- /dev/null
+++ b/Beam.Downloaders/UnitDownloaderOptions.cs
@@ -0,0 +1,198 @@
+using Beam.Models;
+
+namespace Beam.Downloaders;
+
+public record class UnitDownloaderOptions {
+ public HttpClient Client { get; init; } = new();
+
+ public FailurePredicateOptions? FailurePredicateOptions { get; init; }
+ public FragmentOptions? FragmentOptions { get; init; }
+ public required AsyncTransformer AsyncTransformer { get; init; }
+ public string? DownloadFolder { get; init; } = null;
+ public int BufferSize { get; init; } = 80 * 1024; // 80kb
+}
+
+public record class FailurePredicateOptions {
+ public required AsyncDownloadFailurePredicate?[]? AsyncDownloadFailurePredicates { get; init; }
+ public bool ProcessInParallel { get; init; } = false;
+ public int? ParallelThreads { get; init; }
+}
+
+public record class FragmentOptions {
+ public required int FragmentSize { get; init; }
+ public bool DownloadInParallel { get; init; } = false;
+ public int? ParallelThreads { get; init; }
+}
+
+
+ // ---------- UnitDownloaderOptions Builder ----------
+ public sealed class UnitDownloaderOptionsBuilder
+ {
+ private HttpClient _client = new HttpClient();
+ private FailurePredicateOptions? _failureOptions;
+ private FragmentOptions? _fragmentOptions;
+ private AsyncTransformer? _asyncTransformer;
+ private string? _downloadFolder = null;
+ private int _bufferSize = 80 * 1024;
+
+ public UnitDownloaderOptionsBuilder WithClient(HttpClient client)
+ {
+ _client = client ?? throw new System.ArgumentNullException(nameof(client));
+ return this;
+ }
+
+ public UnitDownloaderOptionsBuilder WithFailurePredicateOptions(FailurePredicateOptions? options)
+ {
+ _failureOptions = options;
+ return this;
+ }
+
+ public UnitDownloaderOptionsBuilder WithFailurePredicates(System.Action> configure)
+ {
+ if (configure == null) throw new System.ArgumentNullException(nameof(configure));
+ var b = new FailurePredicateOptionsBuilder();
+ configure(b);
+ _failureOptions = b.Build();
+ return this;
+ }
+
+ public UnitDownloaderOptionsBuilder WithFragmentOptions(FragmentOptions? options)
+ {
+ _fragmentOptions = options;
+ return this;
+ }
+
+ public UnitDownloaderOptionsBuilder WithFragments(System.Action configure)
+ {
+ if (configure == null) throw new System.ArgumentNullException(nameof(configure));
+ var b = new FragmentOptionsBuilder();
+ configure(b);
+ _fragmentOptions = b.Build();
+ return this;
+ }
+
+ public UnitDownloaderOptionsBuilder WithAsyncTransformer(AsyncTransformer transformer)
+ {
+ _asyncTransformer = transformer ?? throw new System.ArgumentNullException(nameof(transformer));
+ return this;
+ }
+
+ public UnitDownloaderOptionsBuilder WithDownloadFolder(string? downloadFolder)
+ {
+ _downloadFolder = downloadFolder;
+ return this;
+ }
+
+ public UnitDownloaderOptionsBuilder WithBufferSize(int bytes)
+ {
+ if (bytes <= 0) throw new System.ArgumentOutOfRangeException(nameof(bytes));
+ _bufferSize = bytes;
+ return this;
+ }
+
+ public UnitDownloaderOptions Build()
+ {
+ if (_asyncTransformer == null)
+ throw new System.InvalidOperationException("AsyncTransformer must be provided.");
+
+ return new UnitDownloaderOptions
+ {
+ Client = _client,
+ FailurePredicateOptions = _failureOptions,
+ FragmentOptions = _fragmentOptions,
+ AsyncTransformer = _asyncTransformer,
+ DownloadFolder = _downloadFolder,
+ BufferSize = _bufferSize
+ };
+ }
+ }
+
+ // ---------- FailurePredicateOptions Builder ----------
+ public sealed class FailurePredicateOptionsBuilder
+ {
+ private readonly System.Collections.Generic.List?> _predicates =
+ new System.Collections.Generic.List?>();
+ private bool _processInParallel = false;
+ private int? _parallelThreads = null;
+
+ public FailurePredicateOptionsBuilder WithPredicate(AsyncDownloadFailurePredicate? predicate)
+ {
+ _predicates.Add(predicate);
+ return this;
+ }
+
+ public FailurePredicateOptionsBuilder WithPredicates(System.Collections.Generic.IEnumerable?> predicates)
+ {
+ if (predicates == null) throw new System.ArgumentNullException(nameof(predicates));
+ _predicates.AddRange(predicates);
+ return this;
+ }
+
+ public FailurePredicateOptionsBuilder WithPredicates(params AsyncDownloadFailurePredicate?[] predicates)
+ {
+ _predicates.Clear();
+ if (predicates != null) _predicates.AddRange(predicates);
+ return this;
+ }
+
+ public FailurePredicateOptionsBuilder WithProcessInParallel(bool value = true)
+ {
+ _processInParallel = value;
+ return this;
+ }
+
+ public FailurePredicateOptionsBuilder WithParallelThreads(int? threads)
+ {
+ if (threads.HasValue && threads.Value <= 0)
+ throw new System.ArgumentOutOfRangeException(nameof(threads));
+ _parallelThreads = threads;
+ return this;
+ }
+
+ public FailurePredicateOptions Build()
+ {
+ var arr = _predicates.Count == 0 ? [] : _predicates.ToArray();
+ return new FailurePredicateOptions
+ {
+ AsyncDownloadFailurePredicates = arr,
+ ProcessInParallel = _processInParallel,
+ ParallelThreads = _parallelThreads
+ };
+ }
+ }
+
+ // ---------- FragmentOptions Builder ----------
+ public sealed class FragmentOptionsBuilder {
+ private int? _fragmentSize;
+ private bool _downloadInParallel = false;
+ private int? _parallelThreads = null;
+
+ public FragmentOptionsBuilder WithFragmentSize(int bytes) {
+ if (bytes <= 0) throw new System.ArgumentOutOfRangeException(nameof(bytes));
+ _fragmentSize = bytes;
+ return this;
+ }
+
+ public FragmentOptionsBuilder WithDownloadInParallel(bool value = true) {
+ _downloadInParallel = value;
+ return this;
+ }
+
+ public FragmentOptionsBuilder WithParallelThreads(int? threads) {
+ if (threads.HasValue && threads.Value <= 0)
+ throw new System.ArgumentOutOfRangeException(nameof(threads));
+ _parallelThreads = threads;
+ return this;
+ }
+
+ public FragmentOptions Build() {
+ if (!_fragmentSize.HasValue)
+ throw new System.InvalidOperationException("FragmentSize must be provided.");
+
+ return new FragmentOptions {
+ FragmentSize = _fragmentSize.Value,
+ DownloadInParallel = _downloadInParallel,
+ ParallelThreads = _parallelThreads
+ };
+ }
+ }
diff --git a/Beam.Downloaders/UnitFragmentDownloader.cs b/Beam.Downloaders/UnitFragmentDownloader.cs
index c834ab3..a399128 100644
--- a/Beam.Downloaders/UnitFragmentDownloader.cs
+++ b/Beam.Downloaders/UnitFragmentDownloader.cs
@@ -5,55 +5,39 @@ using HtmlAgilityPack;
using Microsoft.Extensions.Logging;
namespace Beam.Downloaders {
- public class UnitFragmentDownloader : IUnitDownloader>> {
- public UnitFragmentDownloader(HtmlWeb web,
- AsyncTransformer transformer,
- AsyncDownloadFailurePredicate?[]? failurePredicate = null,
- int fragmentSize = 4,
- ILogger? logger = null,
- IUnitDownloader? internalDownloader = null) {
- Web = web;
- Transformer = transformer;
- FailurePredicate = failurePredicate;
- UnitDownloader = internalDownloader ?? new UnitDownloader(Web, Transformer, FailurePredicate);
- LinksPerDownload = fragmentSize;
- Logger = logger;
- }
+ public class UnitFragmentDownloader(UnitDownloaderOptions options,
+ IUnitDownloader? internalDownloader = null) : IUnitDownloader>> where RawType : IDocument {
- public HtmlWeb Web { get; }
- public AsyncTransformer Transformer { get; }
- public AsyncDownloadFailurePredicate?[]? FailurePredicate { get; }
+ public UnitDownloaderOptions Options { get; } = options;
public int LinksPerDownload { get; set; }
- public ILogger? Logger { get; set; }
+ private IUnitDownloader UnitDownloader { get; } = internalDownloader ?? new UnitDownloader(options);
- private readonly IUnitDownloader UnitDownloader;
-
- async Task<(bool, Fragment>?)> IUnitDownloader>>.TryDownload(IOrdered[] link, CancellationToken ct, int maximumRetryCount, IProgress? tryProgress) {
- Fragment> fragment = new Fragment>(link.Length);
- if (!Fragment>.TryAcquireUpdater(fragment, out var updater))
+ async Task<(bool, Fragment>?)> IUnitDownloader>>.TryDownload(IOrdered[] link, CancellationToken ct, int maximumRetryCount, IProgress? downProgress, IProgress? tryProgress) {
+ Fragment> fragment = new Fragment>(link.Length);
+ if (!Fragment>.TryAcquireUpdater(fragment, out var updater))
throw new AssertionException(Exceptions.Exceptions.fragment_locked);
bool isFailure = false;
await Parallel.ForEachAsync(link, async (x, pct) => {
pct.ThrowIfCancellationRequested();
ct.ThrowIfCancellationRequested();
- var (result, downloadedT) = await UnitDownloader.TryDownload([x], ct, maximumRetryCount, tryProgress);
+ if (isFailure)
+ return;
+ var (result, downloadedT) = await UnitDownloader.TryDownload([x], ct, maximumRetryCount, downProgress, tryProgress);
if (!result) {
Interlocked.Exchange(ref isFailure, true);
- Logger?.LogError("Failed to retrieve {0} order={1}", x.Data, x.Order);
return;
}
if (downloadedT == null) {
Interlocked.Exchange(ref isFailure, true);
- Logger?.LogCritical("Failed to retrieve {0} order={1}", x.Data, x.Order);
return;
}
- updater(new Ordered(downloadedT, x.Order));
+ updater(new Ordered(downloadedT, x.Order));
});
if (!isFailure)
- Fragment>.SetComplete(fragment, true);
+ Fragment>.SetComplete(fragment, true);
- Fragment>.TryReleaseUpdater(fragment, updater);
+ Fragment>.TryReleaseUpdater(fragment, updater);
return (!isFailure, fragment);
diff --git a/Beam.Downloaders/UnitFragmentDownloaderBinary.cs b/Beam.Downloaders/UnitFragmentDownloaderBinary.cs
deleted file mode 100644
index f5c687d..0000000
--- a/Beam.Downloaders/UnitFragmentDownloaderBinary.cs
+++ /dev/null
@@ -1,74 +0,0 @@
-using Beam.Abstractions;
-using Beam.Exceptions;
-using Beam.Models;
-using Microsoft.Extensions.Logging;
-
-namespace Beam.Downloaders {
- ///
- /// Groups multiple binary downloads into a single Fragment, applying
- /// failure detection and exponential-back-off retries for each link.
- ///
- public class UnitFragmentDownloaderBinary
- : IUnitDownloader>> {
- public UnitFragmentDownloaderBinary(HttpClient client,
- AsyncTransformer transformer,
- AsyncDownloadFailurePredicate?[]? failurePredicate = null,
- int fragmentSize = 4,
- ILogger? logger = null,
- IUnitDownloader? internalDownloader = null) {
- Client = client;
- Transformer = transformer;
- FailurePredicate = failurePredicate;
- UnitDownloader = internalDownloader
- ?? new UnitDownloaderBinary(Client, Transformer, FailurePredicate);
- LinksPerDownload = fragmentSize;
- Logger = logger;
- }
-
- public HttpClient Client { get; }
- public AsyncTransformer Transformer { get; }
- public AsyncDownloadFailurePredicate?[]? FailurePredicate { get; }
- public int LinksPerDownload { get; set; }
- public ILogger? Logger { get; set; }
-
- private readonly IUnitDownloader UnitDownloader;
-
- async Task<(bool, Fragment>?)> IUnitDownloader>>.TryDownload(
- IOrdered[] link,
- CancellationToken ct,
- int maximumRetryCount,
- IProgress? tryProgress) {
- var fragment = new Fragment>(link.Length);
- if (!Fragment>.TryAcquireUpdater(fragment, out var updater))
- throw new AssertionException(Exceptions.Exceptions.fragment_locked);
-
- var isFailure = false;
-
- await Parallel.ForEachAsync(link, async (orderedLink, pct) => {
- pct.ThrowIfCancellationRequested();
- ct.ThrowIfCancellationRequested();
-
- var (success, downloaded) =
- await UnitDownloader.TryDownload([orderedLink],
- ct,
- maximumRetryCount,
- tryProgress);
-
- if (!success || downloaded is null) {
- Interlocked.Exchange(ref isFailure, true);
- Logger?.LogError("Failed to retrieve {Link} order={Order}",
- orderedLink.Data, orderedLink.Order);
- return;
- }
-
- updater(new Ordered(downloaded, orderedLink.Order));
- });
-
- if (!isFailure)
- Fragment>.SetComplete(fragment, true);
-
- Fragment>.TryReleaseUpdater(fragment, updater);
- return (!isFailure, fragment);
- }
- }
-}
diff --git a/Beam.Dynamic/Beam.Dynamic.csproj b/Beam.Dynamic/Beam.Dynamic.csproj
index 3d6d4c8..4f3aaeb 100644
--- a/Beam.Dynamic/Beam.Dynamic.csproj
+++ b/Beam.Dynamic/Beam.Dynamic.csproj
@@ -6,7 +6,7 @@
enable
-
+
diff --git a/Beam.Exceptions/Exceptions.Designer.cs b/Beam.Exceptions/Exceptions.Designer.cs
index 5b5cfc9..27d1901 100644
--- a/Beam.Exceptions/Exceptions.Designer.cs
+++ b/Beam.Exceptions/Exceptions.Designer.cs
@@ -157,5 +157,32 @@ namespace Beam.Exceptions {
return ResourceManager.GetString("state_change_error", resourceCulture);
}
}
+
+ ///
+ /// Looks up a localized string similar to Could not open a filestream to a non-existant directory '{0}'..
+ ///
+ public static string unit_download_directory_nonexistant {
+ get {
+ return ResourceManager.GetString("unit_download_directory_nonexistant", resourceCulture);
+ }
+ }
+
+ ///
+ /// Looks up a localized string similar to The memory stream was created with an invisible inner byte array..
+ ///
+ public static string unit_download_invalid_memory_stream {
+ get {
+ return ResourceManager.GetString("unit_download_invalid_memory_stream", resourceCulture);
+ }
+ }
+
+ ///
+ /// Looks up a localized string similar to The base unit downloader class only supports RawType's of string and ByteDocument..
+ ///
+ public static string unit_downloader_limited_support {
+ get {
+ return ResourceManager.GetString("unit_downloader_limited_support", resourceCulture);
+ }
+ }
}
}
diff --git a/Beam.Exceptions/Exceptions.resx b/Beam.Exceptions/Exceptions.resx
index 0adb3c5..c80b7ad 100644
--- a/Beam.Exceptions/Exceptions.resx
+++ b/Beam.Exceptions/Exceptions.resx
@@ -51,4 +51,13 @@
There must be at least one state in resource definition.
+
+ Could not open a filestream to a non-existant directory '{0}'.
+
+
+ The memory stream was created with an invisible inner byte array.
+
+
+ The base unit downloader class only supports RawType's of string and ByteDocument.
+
\ No newline at end of file
diff --git a/Beam.Fluent/Beam.Fluent.csproj b/Beam.Fluent/Beam.Fluent.csproj
index 2d6a5ab..5ccd49d 100644
--- a/Beam.Fluent/Beam.Fluent.csproj
+++ b/Beam.Fluent/Beam.Fluent.csproj
@@ -6,12 +6,11 @@
enable
-
+
-
diff --git a/Beam.Fluent/ContextStage.cs b/Beam.Fluent/ContextStage.cs
index be079d9..c387c04 100644
--- a/Beam.Fluent/ContextStage.cs
+++ b/Beam.Fluent/ContextStage.cs
@@ -8,13 +8,14 @@ using Beam.Downloaders;
namespace Beam.Fluent;
-internal sealed class ContextStage : IContextStage {
+internal sealed class ContextStage : IContextStage where RawType : IDocument {
private readonly DownloadContextBuilder _ctxBuilder;
private readonly AsyncTransformer _transformer;
private FragmentMode _fragmentMode = FragmentMode.Single;
private Channel _channel = Channel.Plain;
private readonly ContentKind _contentKind;
private int _parallelism = 4;
+ private UnitDownloaderOptionsBuilder _optionsBuilder = new();
// ──────────────── playwright ────────────────
private PlaywrightAsyncManipulator? _playwrightManipulator = null;
@@ -31,12 +32,15 @@ internal sealed class ContextStage : IContextStage => ContentKind.Html,
+ AsyncTransformer => ContentKind.File,
AsyncTransformer => ContentKind.Binary,
_ => throw new ArgumentException(string.Format(Exceptions.Exceptions.fluent_unsupported_transformer,
transformer.GetType()
.AsUniqueName()))
};
+
+ _optionsBuilder
+ .WithAsyncTransformer(_transformer);
}
public IContextStage Configure(Action> configure) {
@@ -44,6 +48,12 @@ internal sealed class ContextStage : IContextStage ConfigureUnitDownloaderOptions(
+ Action> configure) {
+ configure(_optionsBuilder);
+ return this;
+ }
+
public IContextStage WithParallelism(int degree) {
_parallelism = Math.Max(1, degree);
return this;
@@ -108,14 +118,14 @@ internal sealed class ContextStage : IContextStage HtmlTransformer()
- => To>(_transformer);
+ AsyncTransformer FileTransformer()
+ => To>(_transformer);
AsyncTransformer ByteTransformer()
=> To>(_transformer);
- AsyncDownloadFailurePredicate[] HtmlFailurePredicates()
- => To[]>(context.AsyncFailurePredicates);
+ AsyncDownloadFailurePredicate[] FileFailurePredicates()
+ => To[]>(context.AsyncFailurePredicates);
AsyncDownloadFailurePredicate[] ByteFailurePredicates()
=> To[]>(context.AsyncFailurePredicates);
@@ -125,82 +135,39 @@ internal sealed class ContextStage : IContextStage x.WithPredicates(context.AsyncFailurePredicates));
+ var options = _optionsBuilder
+ .WithClient(context.Client)
+ .Build();
+
return (_channel, _fragmentMode, _contentKind) switch {
- // ──────────────── fragmented HTML ────────────────
- (Channel.Plain, FragmentMode.Fragmented, ContentKind.Html)
- => new UnitFragmentDownloader(
- context.Web,
- HtmlTransformer(),
- HtmlFailurePredicates(),
- _parallelism,
- context.DownloadLogger),
- // ──────────────── fragmented binary ────────────────
- (Channel.Plain, FragmentMode.Fragmented, ContentKind.Binary)
- => new UnitFragmentDownloaderBinary(
- context.Client,
- ByteTransformer(),
- ByteFailurePredicates(),
- _parallelism,
- context.DownloadLogger),
- // ──────────────── single HTML ────────────────
- (Channel.Plain, FragmentMode.Single, ContentKind.Html)
- => new UnitDownloader(
- context.Web,
- HtmlTransformer(),
- HtmlFailurePredicates()),
- // ──────────────── single binary ────────────────
- (Channel.Plain, FragmentMode.Single, ContentKind.Binary)
- => new UnitDownloaderBinary(
- context.Client,
- ByteTransformer(),
- ByteFailurePredicates()),
- // ──────────────── single playwright binary ────────────────
- (Channel.Playwright, FragmentMode.Single, ContentKind.Binary)
- => new PlaywrightUnitDownloader(
- context.Client,
- EnsureExists(_playwrightManipulator),
- ByteTransformer(),
- ByteFailurePredicates()
- ),
- // ──────────────── single playwrigt HTML ────────────────
- (Channel.Playwright, FragmentMode.Single, ContentKind.Html)
- => new PlaywrightUnitPageDownloader(
- context.Web,
- EnsureExists(_playwrightManipulator),
- HtmlTransformer(),
- HtmlFailurePredicates()),
- // ──────────────── single stealth HTML ────────────────
- (Channel.Stealth, FragmentMode.Single, ContentKind.Html)
- => new StealthUnitPageDownloader(
- context.Web,
- EnsureExists(_stealthConfig),
- EnsureExists(_stealthManipulator),
- HtmlTransformer(),
- HtmlFailurePredicates()),
- // ──────────────── single stealth binary ────────────────
+ // ──────────────── fragmented ────────────────
+ (Channel.Plain, FragmentMode.Fragmented, _)
+ => new UnitFragmentDownloader(options),
+ // ──────────────── single ────────────────
+ (Channel.Plain, FragmentMode.Single, _)
+ => new UnitDownloader(options),
+ // ──────────────── single playwright ────────────────
+ (Channel.Playwright, FragmentMode.Single, _)
+ => new PlaywrightUnitDownloader(options, EnsureExists(_playwrightManipulator)),
+ // ──────────────── single stealth file ────────────────
+ (Channel.Stealth, FragmentMode.Single, ContentKind.File)
+ => new StealthUnitPageDownloader(options, EnsureExists(_stealthConfig), EnsureExists(_stealthManipulator)),
+ // ──────────────── single stealth binary ────────────────
(Channel.Stealth, FragmentMode.Single, ContentKind.Binary)
- => new StealthUnitDownloader(
- context.Client,
+ => new StealthUnitDownloader(options, EnsureExists(_stealthConfig), EnsureExists(_stealthManipulator)),
+ // ──────────────── fragment stealth file ────────────────
+ (Channel.Stealth, FragmentMode.Fragmented, ContentKind.File)
+ => new StealthFragmentPageDownloader(options,
EnsureExists(_stealthConfig),
- EnsureExists(_stealthManipulator),
- ByteTransformer(),
- ByteFailurePredicates()),
- // ──────────────── fragment stealth HTML ────────────────
- (Channel.Stealth, FragmentMode.Fragmented, ContentKind.Html)
- => new StealthFragmentPageDownloader(
- context.Web,
- EnsureExists(_stealthConfig),
- EnsureExists(_stealthManipulator),
- HtmlTransformer(),
- HtmlFailurePredicates()),
+ EnsureExists(_stealthManipulator)),
// ──────────────── fragment stealth binary ────────────────
(Channel.Stealth, FragmentMode.Fragmented, ContentKind.Binary)
- => new StealthFragmentDownloader(
- context.Client,
+ => new StealthFragmentDownloader(options,
EnsureExists(_stealthConfig),
- EnsureExists(_stealthManipulator),
- ByteTransformer(),
- ByteFailurePredicates()),
+ EnsureExists(_stealthManipulator)),
_ => throw new Exception(string.Format(Exceptions.Exceptions.fluent_unsupported_pattern,
$"({_channel}, {_fragmentMode}, {_contentKind})")),
};
diff --git a/Beam.Fluent/Core/FragmentMode.cs b/Beam.Fluent/Core/FragmentMode.cs
index a2e29b1..32aa7de 100644
--- a/Beam.Fluent/Core/FragmentMode.cs
+++ b/Beam.Fluent/Core/FragmentMode.cs
@@ -12,6 +12,6 @@ public enum Channel {
}
public enum ContentKind {
- Html,
+ File,
Binary
}
\ No newline at end of file
diff --git a/Beam.Fluent/DownloadStage.cs b/Beam.Fluent/DownloadStage.cs
index 3186cc1..4b3a896 100644
--- a/Beam.Fluent/DownloadStage.cs
+++ b/Beam.Fluent/DownloadStage.cs
@@ -1,10 +1,11 @@
using System.Collections.Concurrent;
using System.Text.Json;
+using Beam.Abstractions;
using Beam.Models;
namespace Beam.Fluent;
-internal sealed class DownloadStage(DownloadEnumerable download) : IDownloadStage {
+internal sealed class DownloadStage(DownloadEnumerable download) : IDownloadStage where RawType : IDocument {
private IAsyncEnumerable> _download = download;
public DownloadEnumerable AsAsyncEnumerable() {
diff --git a/Beam.Fluent/FluentDownload.cs b/Beam.Fluent/FluentDownload.cs
index 745329d..bce657a 100644
--- a/Beam.Fluent/FluentDownload.cs
+++ b/Beam.Fluent/FluentDownload.cs
@@ -1,4 +1,5 @@
using aeqw89.DataKeys;
+using Beam.Abstractions;
using Beam.Data;
using Beam.Downloaders;
using Beam.Dynamic;
@@ -7,13 +8,13 @@ using Beam.Models;
namespace Beam.Fluent;
public static class FluentDownload {
- public static ITransformStage Links(params IEnumerable links) {
+ public static ITransformStage Links(params IEnumerable links) where RawType : IDocument {
return new TransformStage(new DownloadContextBuilder()
.WithLinks(links));
}
public static ITransformStage
- ResourceDefinition(ResourceDefinition definition) {
+ ResourceDefinition(ResourceDefinition definition) where RawType : IDocument {
if (definition.Location.States.Count == 0)
throw new ArgumentException(Exceptions.Exceptions.resource_definition_invalid_states_count, nameof(definition));
var linkGenerator = new OrderedLinkGenerator(definition.Location.Segments, (NumberedStateChanger)definition.Location.StateChanger.Behavior,
@@ -22,7 +23,7 @@ public static class FluentDownload {
.WithLinks(StringEnumerable.FromGenerator(linkGenerator!)));
}
- public static ITransformStage FromContext(DownloadContext existing) {
+ public static ITransformStage FromContext(DownloadContext existing) where RawType : IDocument {
return new TransformStage(DownloadContextBuilder.FromContext(existing));
}
}
\ No newline at end of file
diff --git a/Beam.Fluent/TransformStage.cs b/Beam.Fluent/TransformStage.cs
index 778c1c2..953ee14 100644
--- a/Beam.Fluent/TransformStage.cs
+++ b/Beam.Fluent/TransformStage.cs
@@ -1,11 +1,12 @@
-using Beam.Data;
+using Beam.Abstractions;
+using Beam.Data;
using Beam.Downloaders;
using Beam.Dynamic;
using Beam.Models;
namespace Beam.Fluent;
-internal sealed class TransformStage(DownloadContextBuilder CtxBuilder) : ITransformStage {
+internal sealed class TransformStage(DownloadContextBuilder CtxBuilder) : ITransformStage where RawType : IDocument {
public IContextStage WithTransformer(AsyncTransformer transformer) {
return new ContextStage(CtxBuilder, transformer);
}
diff --git a/Beam.Models/ByteDocument.cs b/Beam.Models/ByteDocument.cs
index 33600b8..9be20e1 100644
--- a/Beam.Models/ByteDocument.cs
+++ b/Beam.Models/ByteDocument.cs
@@ -1,15 +1,24 @@
using System.Text;
namespace Beam.Models {
- public class ByteDocument(string filename, byte[] content, Encoding? encoding = null) : Document(filename, encoding) {
- public byte[] Content { get; set; } = content;
+ public class ByteDocument : Document {
+ public ByteDocument(string filename, byte[] content, Encoding? encoding = null) : base(filename, encoding) {
+ Content = content;
+ }
+ public ByteDocument(string filename, Memory content, Encoding? encoding = null) :
+ base(filename, encoding) {
+ Content = content;
+ }
+
+ public Memory Content { get; set; }
+
public override byte[] ToBytes() {
- return Content;
+ return Content.ToArray();
}
public override string ToString() {
- return Encoding.GetString(Content);
+ return Encoding.GetString(Content.ToArray());
}
}
}
diff --git a/Beam.Models/DownloadReport.cs b/Beam.Models/DownloadReport.cs
index b27dc57..9464635 100644
--- a/Beam.Models/DownloadReport.cs
+++ b/Beam.Models/DownloadReport.cs
@@ -2,7 +2,9 @@
namespace Beam.Models {
public struct DownloadReport : IDownloadReport {
- // TODO implement download report
+ public long BytesDownloaded { get; init; }
+ public long? BytesRemaining { get; init; }
+
}
}
diff --git a/Beam.Playwright/PlaywrightUnitDownloader.cs b/Beam.Playwright/PlaywrightUnitDownloader.cs
index e4276e8..8e203e8 100644
--- a/Beam.Playwright/PlaywrightUnitDownloader.cs
+++ b/Beam.Playwright/PlaywrightUnitDownloader.cs
@@ -1,36 +1,36 @@
-using Beam.Downloaders;
+using Beam.Abstractions;
+using Beam.Downloaders;
using Beam.Models;
using Microsoft.Playwright;
namespace Beam.Playwright {
- public class PlaywrightUnitDownloader : UnitDownloaderBinary {
- public PlaywrightAsyncManipulator PuppetManipulator { get; }
+ public class PlaywrightUnitDownloader(
+ UnitDownloaderOptions options,
+ PlaywrightAsyncManipulator puppetManipulator)
+ : UnitDownloader(options)
+ where RawType : IDocument {
+ public PlaywrightAsyncManipulator PuppetManipulator { get; } = puppetManipulator;
- public PlaywrightUnitDownloader(HttpClient client, PlaywrightAsyncManipulator puppetManipulator, AsyncTransformer asyncHtmlTransformer, AsyncDownloadFailurePredicate[] asyncDownloadFailurePredicates)
- : base(client, asyncHtmlTransformer, asyncDownloadFailurePredicates) {
- PuppetManipulator = puppetManipulator;
- }
-
- protected override async Task<(bool, T?)> TryDownloadWithNoRetries(string link, CancellationToken ct) {
+ protected override async Task DownloadToStream(string url, int bufferSize, Stream destinationStream, IProgress progress, CancellationToken ct) {
var page = await PlaywrightContext.Browser.Value.NewPageAsync();
try {
- await page.GotoAsync(link);
+ await page.GotoAsync(url);
await PuppetManipulator(page);
var download = await page.WaitForDownloadAsync();
- using var stream = await download.CreateReadStreamAsync();
- byte[] content = new byte[stream.Length];
-
- await stream.ReadExactlyAsync(content, ct);
-
- ByteDocument doc = new ByteDocument(download.SuggestedFilename, content);
- if (FailurePredicates is not null && await IsFailure(doc))
- return (false, default);
-
- var transformed = await Transformer(doc);
- return (true, transformed);
- } catch (Exception) {
- return (false, default);
+ await using var stream = await download.CreateReadStreamAsync();
+ var buffer = new byte[bufferSize];
+ var inBuffer = 0;
+ var downloaded = 0;
+ while ((inBuffer = stream.Read(buffer)) > 0) {
+ downloaded += inBuffer;
+ progress?.Report(new DownloadReport() {
+ BytesDownloaded = downloaded,
+ BytesRemaining = stream.Length - downloaded
+ });
+ await destinationStream.WriteAsync(buffer.AsMemory(0, inBuffer), ct);
+ }
+
} finally {
if (!page.IsClosed)
await page.CloseAsync();
diff --git a/Beam.Playwright/PlaywrightUnitPageDownloader.cs b/Beam.Playwright/PlaywrightUnitPageDownloader.cs
deleted file mode 100644
index 2642f00..0000000
--- a/Beam.Playwright/PlaywrightUnitPageDownloader.cs
+++ /dev/null
@@ -1,39 +0,0 @@
-
-using Beam.Downloaders;
-using Beam.Models;
-using HtmlAgilityPack;
-using Microsoft.Playwright;
-
-namespace Beam.Playwright {
- public class PlaywrightUnitPageDownloader : UnitDownloader {
- public PlaywrightAsyncManipulator PuppetManipulator { get; }
-
- public PlaywrightUnitPageDownloader(HtmlWeb web, PlaywrightAsyncManipulator puppetManipulator, AsyncTransformer asyncHtmlTransformer, AsyncDownloadFailurePredicate[] asyncDownloadFailurePredicates)
- : base(web, asyncHtmlTransformer, asyncDownloadFailurePredicates) {
- PuppetManipulator = puppetManipulator;
- }
-
- protected override async Task<(bool, T?)> TryDownloadWithNoRetries(string link, CancellationToken ct) {
- var page = await PlaywrightContext.Browser.Value.NewPageAsync();
- try {
- await page.GotoAsync(link);
- await PuppetManipulator(page);
- var content = await page.ContentAsync();
- await page.CloseAsync();
-
- HtmlDocument doc = new();
- doc.LoadHtml(content);
- var transformed = await Transformer(doc);
- if (FailurePredicates is null || !(await IsFailure(doc)))
- return (true, transformed);
- return (false, default);
- } catch (Exception) {
- return (false, default);
- } finally {
- if (!page.IsClosed)
- await page.CloseAsync();
- }
- }
- }
-
-}
diff --git a/Beam.Stealth/StealthFragmentDownloader.cs b/Beam.Stealth/StealthFragmentDownloader.cs
index 20d0868..d471165 100644
--- a/Beam.Stealth/StealthFragmentDownloader.cs
+++ b/Beam.Stealth/StealthFragmentDownloader.cs
@@ -4,11 +4,12 @@ using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
+using Beam.Abstractions;
using Beam.Downloaders;
using Beam.Models;
namespace Beam.Stealth {
- public class StealthFragmentDownloader : UnitFragmentDownloaderBinary {
- public StealthFragmentDownloader(HttpClient client, StealthConfig config, StealthAsyncManipulator manipulator, AsyncTransformer transformer, AsyncDownloadFailurePredicate?[]? failurePredicate = null, int fragmentSize = 4, ILogger? logger = null) : base(client, transformer, failurePredicate, fragmentSize, logger, new StealthUnitDownloader(client, config, manipulator, transformer, failurePredicate)) {}
+ public class StealthFragmentDownloader : UnitFragmentDownloader where RawType : IDocument {
+ public StealthFragmentDownloader(UnitDownloaderOptions options, StealthConfig config, StealthAsyncManipulator manipulator) : base(options, new StealthUnitDownloader(options, config, manipulator)) {}
}
}
diff --git a/Beam.Stealth/StealthFragmentPageDownloader.cs b/Beam.Stealth/StealthFragmentPageDownloader.cs
index 6a33b52..9d931f5 100644
--- a/Beam.Stealth/StealthFragmentPageDownloader.cs
+++ b/Beam.Stealth/StealthFragmentPageDownloader.cs
@@ -5,11 +5,12 @@ using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
+using Beam.Abstractions;
using Beam.Downloaders;
using Beam.Models;
namespace Beam.Stealth {
- public class StealthFragmentPageDownloader : UnitFragmentDownloader {
- public StealthFragmentPageDownloader(HtmlWeb web, StealthConfig config, StealthAsyncManipulator manipulator, AsyncTransformer transformer, AsyncDownloadFailurePredicate?[]? failurePredicate = null, int fragmentSize = 4, ILogger? logger = null) : base(web, transformer, failurePredicate, fragmentSize, logger, new StealthUnitPageDownloader(web, config, manipulator, transformer, failurePredicate)) {}
+ public class StealthFragmentPageDownloader : UnitFragmentDownloader where RawType : IDocument {
+ public StealthFragmentPageDownloader(UnitDownloaderOptions options, StealthConfig config, StealthAsyncManipulator manipulator) : base(options, new StealthUnitPageDownloader(options, config, manipulator)) {}
}
}
diff --git a/Beam.Stealth/StealthUnitDownloader.cs b/Beam.Stealth/StealthUnitDownloader.cs
index fa2b2e9..5a5ac27 100644
--- a/Beam.Stealth/StealthUnitDownloader.cs
+++ b/Beam.Stealth/StealthUnitDownloader.cs
@@ -6,50 +6,37 @@ using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
+using Beam.Abstractions;
using Beam.Downloaders;
using Beam.Models;
namespace Beam.Stealth {
using File = System.IO.File;
- public class StealthUnitDownloader