feat: add deferred response buffering, TableDataProvider, and stealth improvements
- ApiResponse: add readToBuffer option to defer/stream body instead of eagerly buffering - TableDataProvider: implement HTML table parser with per-column provider support - StealthConfig: add 10s page load timeout and copyCookiesFrom parameter for cookie sharing - StealthUnitDownloader: catch WebDriverTimeoutException on navigation, log warning instead of throwing - Bump version to 2.9.0
This commit is contained in:
+3
-3
@@ -22,7 +22,7 @@ namespace Beam.Api;
|
||||
|
||||
private string? ContentType = "application/json";
|
||||
|
||||
public async Task<ApiResponse> GetResponse(ILogger<ApiResponse>? logger, (int @try, int max)? tries = null, CancellationToken ct = default) {
|
||||
public async Task<ApiResponse> GetResponse(ILogger<ApiResponse>? logger, (int @try, int max)? tries = null, bool readToBuffer = true, CancellationToken ct = default) {
|
||||
SanitizeHeaders();
|
||||
|
||||
var request = new HttpRequestMessage(Method, Uri);
|
||||
@@ -40,10 +40,10 @@ namespace Beam.Api;
|
||||
|
||||
if (tries is not null && tries?.@try < tries?.max && !SuccessCodes.Contains(response.StatusCode)) {
|
||||
await Task.Delay((int)Math.Min(Math.Pow(2, tries.Value.@try), 60) * 1000, ct);
|
||||
return await GetResponse(logger, (tries.Value.@try + 1, tries.Value.max), ct);
|
||||
return await GetResponse(logger, (tries.Value.@try + 1, tries.Value.max), readToBuffer, ct);
|
||||
}
|
||||
|
||||
return await ApiResponse.CreateAsync(response, logger, RequestData, ct);
|
||||
return await ApiResponse.CreateAsync(response, logger, RequestData, readToBuffer, ct);
|
||||
}
|
||||
|
||||
private void SanitizeHeaders() {
|
||||
|
||||
@@ -33,7 +33,7 @@ namespace Beam.Api;
|
||||
// sequential
|
||||
var sequential = new List<ApiResponse>(_calls.Count);
|
||||
foreach (var call in _calls)
|
||||
sequential.Add(await call.GetResponse(logger, tries, ct));
|
||||
sequential.Add(await call.GetResponse(logger, tries, true, ct));
|
||||
return sequential;
|
||||
}
|
||||
|
||||
@@ -43,7 +43,7 @@ namespace Beam.Api;
|
||||
_calls.Select((c, i) => (call: c, idx: i)),
|
||||
new ParallelOptions { MaxDegreeOfParallelism = _maxDegree, CancellationToken = ct },
|
||||
async (item, token) => {
|
||||
var response = await item.call.GetResponse(logger, tries, token);
|
||||
var response = await item.call.GetResponse(logger, tries, true, token);
|
||||
bag.Add((item.idx, response));
|
||||
});
|
||||
|
||||
|
||||
+36
-10
@@ -14,11 +14,13 @@ namespace Beam.Api;
|
||||
/// Wrapper that lets the response body be read any number of times (even concurrently).
|
||||
/// </summary>
|
||||
public sealed class ApiResponse {
|
||||
private readonly byte[] _buffer;
|
||||
private byte[] _buffer;
|
||||
private bool _read_has_been_deferred;
|
||||
|
||||
private ApiResponse(HttpResponseMessage response, byte[] buffer, ILogger<ApiResponse>? logger, object? requestData = null) {
|
||||
Response = response;
|
||||
_buffer = buffer;
|
||||
_read_has_been_deferred = _buffer.Length == 0;
|
||||
Logger = logger;
|
||||
RequestData = requestData;
|
||||
}
|
||||
@@ -33,8 +35,10 @@ namespace Beam.Api;
|
||||
HttpResponseMessage response,
|
||||
ILogger<ApiResponse>? logger = null,
|
||||
object? requestData = null,
|
||||
bool readToBuffer = true,
|
||||
CancellationToken ct = default) {
|
||||
if (response is null) throw new ArgumentNullException(nameof(response));
|
||||
if (!readToBuffer) return new ApiResponse(response, [], logger, requestData);
|
||||
|
||||
var buffer = response.Content is null
|
||||
? []
|
||||
@@ -55,32 +59,54 @@ namespace Beam.Api;
|
||||
if (!Is200) errorHandler(Response.StatusCode);
|
||||
return this;
|
||||
}
|
||||
|
||||
/* ---------- content helpers ---------- */
|
||||
|
||||
public Task<T?> AsSerializedObject<T>(CancellationToken ct = default) {
|
||||
private async Task ReadToBuffer(CancellationToken ct = default) {
|
||||
if (!_read_has_been_deferred) return;
|
||||
_buffer = Response.Content is null
|
||||
? []
|
||||
: await Response.Content.ReadAsByteArrayAsync(ct).ConfigureAwait(false);
|
||||
_read_has_been_deferred = false;
|
||||
}
|
||||
|
||||
public async Task<T?> AsSerializedObject<T>(CancellationToken ct = default) {
|
||||
if (!Is200) throw new InvalidOperationException();
|
||||
if (Response.Content?.Headers.ContentType?.MediaType != "application/json")
|
||||
Logger?.LogWarning("Content-Type is not JSON, yet JSON deserialization was requested.");
|
||||
|
||||
return Task.FromResult(JsonSerializer.Deserialize<T>(_buffer));
|
||||
if (_read_has_been_deferred) {
|
||||
return await JsonSerializer.DeserializeAsync<T>(await Response.Content!.ReadAsStreamAsync(ct), (JsonSerializerOptions?)null, ct);
|
||||
} else {
|
||||
return JsonSerializer.Deserialize<T>(_buffer);
|
||||
}
|
||||
}
|
||||
|
||||
public Task<T?> AsDynamicObject<T>(T _, CancellationToken ct = default)
|
||||
=> AsSerializedObject<T>(ct);
|
||||
|
||||
public Task<string> AsString(CancellationToken ct = default) {
|
||||
public async Task<string> AsString(CancellationToken ct = default) {
|
||||
if (!Is200) Logger?.LogWarning("Non-success response; attempting to read content.");
|
||||
return Task.FromResult(Encoding.UTF8.GetString(_buffer));
|
||||
if (_read_has_been_deferred) {
|
||||
await ReadToBuffer(ct);
|
||||
}
|
||||
|
||||
return Encoding.UTF8.GetString(_buffer);
|
||||
}
|
||||
|
||||
public Task<byte[]> AsBinary(CancellationToken ct = default) {
|
||||
public async Task<byte[]> AsBinary(CancellationToken ct = default) {
|
||||
if (!Is200) Logger?.LogWarning("Non-success response; attempting to read content.");
|
||||
return Task.FromResult(_buffer);
|
||||
if (_read_has_been_deferred) {
|
||||
await ReadToBuffer(ct);
|
||||
}
|
||||
return _buffer;
|
||||
}
|
||||
|
||||
public Task<Stream> AsStream(CancellationToken ct = default) {
|
||||
public async Task<Stream> AsStream(CancellationToken ct = default) {
|
||||
if (!Is200) Logger?.LogWarning("Non-success response; attempting to read content.");
|
||||
return Task.FromResult<Stream>(new MemoryStream(_buffer, writable: false));
|
||||
if (_read_has_been_deferred) {
|
||||
return await Response.Content!.ReadAsStreamAsync(ct);
|
||||
} else {
|
||||
return new MemoryStream(_buffer, writable: false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user