Streaming responses #1
So this is gonna be another one of my late night threads, and not really a tutorial, be warned.

Have you ever had to deliver a HUGE list of JSON objects and hated the latency? Well, this is your solution. Here's a bit of code:
Code:
public class AsyncStreamExecutor<T> : IActionResultExecutor<AsyncStreamResult<T>>
{
    public async Task ExecuteAsync(ActionContext context, AsyncStreamResult<T> result)
    {
        context.HttpContext.Response.StatusCode = result.StatusCode;
        // The body is a plain JSON array, not server-sent events, so advertise it as JSON.
        context.HttpContext.Response.ContentType = "application/json";
        await context.HttpContext.Response.Body.FlushAsync();
        if (result.StatusCode < 200 || result.StatusCode >= 300)
            return;
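        // Heads-up: Serialize() below writes synchronously into the StreamWriter. If one item is bigger
        // than the writer's buffer, the overflow is pushed to the response body synchronously, which
        // Kestrel rejects by default since ASP.NET Core 3.0 (AllowSynchronousIO is off).
        var sw = new StreamWriter(context.HttpContext.Response.Body);
        var writer = new Newtonsoft.Json.JsonTextWriter(sw);
        var setting = new JsonSerializerSettings()
        {
            TypeNameHandling = TypeNameHandling.None,
            Formatting = Formatting.None
        };
        JsonSerializer seri = Newtonsoft.Json.JsonSerializer.Create(setting);

        // Open the array and push it out right away so the client sees data immediately.
        writer.WriteStartArray();
        await writer.FlushAsync();

        await foreach (var obj in result.AsyncEnmerable)
        {
            // Write each item and flush it so it goes out as soon as it is produced.
            seri.Serialize(writer, obj);
            await writer.FlushAsync();
        }
        writer.WriteEndArray();
        await writer.FlushAsync();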
    }
}

public class AsyncStreamResult<T> : IActionResult
{
    public IAsyncEnumerable<T> AsyncEnmerable { get; set; }
    public readonly int StatusCode;
    public AsyncStreamResult(IAsyncEnumerable<T> asyncEnmerable, int statusCode = StatusCodes.Status200OK)
    {
        this.AsyncEnmerable = asyncEnmerable;
        this.StatusCode = statusCode;
    }

    public Task ExecuteResultAsync(ActionContext context)
    {
        if (context == null)
        {
            throw new ArgumentNullException(nameof(context));
        }

        var executor = context.HttpContext.RequestServices.GetRequiredService<IActionResultExecutor<AsyncStreamResult<T>>>();
        return executor.ExecuteAsync(context, this);
    }
}

public static class AsyncStreamerExtensionMethods
{
    public static IServiceCollection AddAsyncEnumerableStreamer<T>(this IServiceCollection services)
    {
        services.TryAddSingleton<IActionResultExecutor<AsyncStreamResult<T>>, AsyncStreamExecutor<T>> ();
        return services;
    }
}
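
For anyone wiring this up: the executor has to be registered once per element type, and then an action can simply return the result type. A rough sketch, assuming a standard Startup class and the types from the code above; NumbersController, its route and the Count() iterator are made up for illustration and are not part of my project:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddControllers();
        // One registration per element type you intend to stream.
        services.AddAsyncEnumerableStreamer<int>();
    }
}

// Hypothetical controller, only here to show the return type in use.
[ApiController]
[Route("api/numbers")]
public class NumbersController : ControllerBase
{
    [HttpGet]
    public AsyncStreamResult<int> Get()
    {
        // Produces one number every 100 ms; each one is flushed to the client as it appears.
        static async IAsyncEnumerable<int> Count()
        {
            for (int i = 0; i < 10; i++)
            {
                await Task.Delay(100);
                yield return i;
            }
        }

        return new AsyncStreamResult<int>(Count());
    }
}
```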

Ok, why did I just give you all that code? Well, the answer is HTTP/2. With .NET 5 there's no reason you can't async stream your responses, and an executor like the one above is what you need to do it. (HTTP/1.1 can stream as well, via chunked transfer encoding.) Here's a real-world example:

Code:
public async Task<AsyncStreamResult<byte[]>> GetFileContentsAsync(Guid fileId)
    {
        // Streams the file in 4 KiB chunks; each chunk ends up as one base64 string in the JSON array.
        static async IAsyncEnumerable<byte[]> ReadChunksAsync(string path)
        {
            await using FileStream fs = System.IO.File.OpenRead(path);
            byte[] mbuf = new byte[4096];
            Memory<byte> buffer = new(mbuf);
            int nRead;
            while ((nRead = await fs.ReadAsync(buffer)) > 0)
                yield return buffer.Slice(0, nRead).ToArray();
        }

        try
        {
            // Run the checks here rather than inside the iterator: an async iterator does not
            // execute until it is enumerated, so exceptions thrown inside it would never reach
            // these catch blocks (and the 200 status would already have been sent).
            if (!await HasPermission(fileId, ACLPermission.Read))
                throw new UnauthorizedAccessException();
            if (await db.FsItems.FindAsync(fileId) is not FSItem item || item.Type != FSItemType.File)
                throw new FileNotFoundException();
            return new(ReadChunksAsync(item.SeedDataPath));
        }
        catch (FileNotFoundException)
        {
            return new(null, StatusCodes.Status404NotFound);
        }
        catch (UnauthorizedAccessException)
        {
            return new(null, StatusCodes.Status401Unauthorized);
        }
    }

Is the exception BS really necessary? No, but it's in my code.
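
One thing worth knowing when mixing exceptions with async iterators: the iterator body does not run until something enumerates it, so a try/catch around the call that merely creates the iterator never sees exceptions thrown inside it. A minimal sketch of that behaviour (LazyIteratorDemo and everything in it is hypothetical, just for demonstration):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class LazyIteratorDemo
{
    // Hypothetical iterator that can fail before yielding anything.
    public static async IAsyncEnumerable<int> NumbersAsync(bool fail)
    {
        await Task.Yield();
        if (fail)
            throw new InvalidOperationException("thrown during enumeration");
        yield return 1;
    }

    public static async Task<string> RunAsync()
    {
        IAsyncEnumerable<int> sequence;
        try
        {
            // Only creates the iterator object; none of its body has run yet.
            sequence = NumbersAsync(fail: true);
        }
        catch (InvalidOperationException)
        {
            return "caught at creation";
        }

        try
        {
            // The body starts executing here, on the first MoveNextAsync.
            await foreach (int n in sequence)
                Console.WriteLine(n);
        }
        catch (InvalidOperationException)
        {
            return "caught during enumeration";
        }

        return "no exception";
    }
}
```

In the streaming case the enumeration happens inside the executor, after the status code has gone out, which is why checks that should change the status code need to run before the iterator is handed over.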
Alright, so what does this do?
Well, on its own, nothing. However, if you understand IAsyncEnumerable and its applications in your code, you will be delighted to know that this is the network equivalent. The server side alone isn't enough, though: by default HttpClient buffers the whole response and hands it to you as normal. But let's say you want that stream to be available as an async enumerable or equivalent? Well, here's my client code:

Code:
public static async Task<bool> GetFileAsync(Guid Id, string path)
{
    HttpRequestMessage request = new(HttpMethod.Get, $"/api/fs/{Id}/Contents");
    // ResponseHeadersRead returns as soon as the headers arrive, so the body can be read while it streams in.
    using HttpResponseMessage response = await Settings.Client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
    if (response.IsSuccessStatusCode)
    {
        await using var stream = await response.Content.ReadAsStreamAsync();
        using StreamReader str = new StreamReader(stream);

        var reader = new JsonTextReader(str);
        var setting = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.None };
        var seri = JsonSerializer.Create(setting);

        //TODO: lock download file
        if (File.Exists($"{path}.download"))
            File.Delete($"{path}.download");
        await using (FileStream fw = File.OpenWrite($"{path}.download"))
        {
            while (await reader.ReadAsync())
            {
                if (reader.TokenType == JsonToken.String)
                {
                    byte[] oo = seri.Deserialize<byte[]>(reader);
                    await fw.WriteAsync(oo);
                }
                else if (reader.TokenType == JsonToken.EndArray)
                    break;
            }
            await fw.FlushAsync();
        }
        File.Move($"{path}.download", path, true);
        return true;
    }

    if (response.StatusCode == HttpStatusCode.Unauthorized)
        throw new UnauthorizedException();
    return false;
}
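
If you'd rather have the stream as an actual IAsyncEnumerable on the client, the same reader loop can be wrapped in an async iterator. This is only a sketch under my own assumptions: JsonArrayStreamReader and ReadJsonArrayAsync are hypothetical names, and it expects the body to be one top-level JSON array like the executor above produces.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using Newtonsoft.Json;

public static class JsonArrayStreamReader
{
    // Hypothetical helper: yields each element of a top-level JSON array as soon as it has been read.
    public static async IAsyncEnumerable<T> ReadJsonArrayAsync<T>(
        Stream stream,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        using var textReader = new StreamReader(stream);
        using var reader = new JsonTextReader(textReader);
        var serializer = JsonSerializer.CreateDefault();

        while (await reader.ReadAsync(cancellationToken))
        {
            // Skip the opening bracket of the outer array.
            if (reader.TokenType == JsonToken.StartArray && reader.Depth == 0)
                continue;
            // Stop at the closing bracket of the outer array.
            if (reader.TokenType == JsonToken.EndArray && reader.Depth == 0)
                yield break;

            // Deserialize consumes exactly one value, starting at the current token.
            // It reads the remainder of that value synchronously.
            yield return serializer.Deserialize<T>(reader);
        }
    }
}
```

With that in place, the download loop could become an await foreach over ReadJsonArrayAsync<byte[]>(stream), writing each chunk to the file as it arrives.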

And... this is (for now) presented without comment. Enjoy.
