Producing IAsyncEnumerable

The introduction of IAsyncEnumerable in C# 8 simplified a lot of scenarios that used to require a lot of code to get async behavior similar to the equivalent synchronous scenario. The await foreach construct in particular makes the consumption side almost trivial. Combining the delayed execution of IEnumerable with asynchrony does add some complexity, though, and that complexity often shows up on the producing side of the async collection, when trying to build control flow or error handling into the process.

Part of the complexity comes from the way methods declared to return IAsyncEnumerable treat the async keyword. Take these simple examples:

public IAsyncEnumerable<int> GetNumbers()
{
    return Enumerable.Range(0, 10).ToAsyncEnumerable();
}

public async IAsyncEnumerable<int> GetNumbersAsync()
{
    foreach (int i in Enumerable.Range(0, 10))
    {
        await Task.Delay(i);
        yield return i;
    }
}

As with normal async/await usage, adding the async keyword to a method changes what syntax is allowed inside it and causes the compiler to generate very different code. For a method returning IAsyncEnumerable, async allows using await within the method (like normal async methods), but unlike the usual case the return type isn’t wrapped in a Task, because the returned IAsyncEnumerable is already inherently asynchronous. It also changes what the method body has to do. Where the non-async method can directly return an instance of IAsyncEnumerable, the async version must use yield statements to produce each item individually, and the compiler builds the returned IAsyncEnumerable instance from them. For simple examples like these that’s not much disruption, but it can cause difficulty in more complex scenarios.
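A small sketch can make the two shapes concrete without the System.Linq.Async dependency. It assumes a .NET 7 console project with top-level statements, so the methods are written as local functions. AsAsyncSequence and Collect are hypothetical helpers introduced only for illustration; AsAsyncSequence merely stands in for ToAsyncEnumerable and is not how that library implements it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Non-async: 'await' is not allowed in this body, but an existing
// IAsyncEnumerable instance can be returned directly.
IAsyncEnumerable<int> GetNumbers()
{
    return AsAsyncSequence(Enumerable.Range(0, 10));
}

// Async iterator: 'await' is allowed, the declared return type stays
// IAsyncEnumerable<int> (no Task wrapper), and items must come from yield return.
async IAsyncEnumerable<int> GetNumbersAsync()
{
    foreach (int i in Enumerable.Range(0, 10))
    {
        await Task.Delay(i);
        yield return i;
    }
}

// Hypothetical helper standing in for ToAsyncEnumerable() from System.Linq.Async.
async IAsyncEnumerable<T> AsAsyncSequence<T>(IEnumerable<T> source)
{
    foreach (T item in source)
    {
        await Task.Yield(); // an asynchronous step between items
        yield return item;
    }
}

// Hypothetical helper: both kinds of method are consumed the same way, with await foreach.
async Task<List<int>> Collect(IAsyncEnumerable<int> sequence)
{
    var items = new List<int>();
    await foreach (int item in sequence)
    {
        items.Add(item);
    }
    return items;
}

List<int> fromInstance = await Collect(GetNumbers());
List<int> fromIterator = await Collect(GetNumbersAsync());
Console.WriteLine(string.Join(",", fromIterator)); // prints 0,1,2,3,4,5,6,7,8,9
```

From the caller’s side the two are indistinguishable; the difference is entirely in what each method body is allowed to contain.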

Roots in IEnumerable and yield return

To understand the difference in the async behavior, it can help to look at the simpler case of IEnumerable. Methods returning IEnumerable can choose between the same two options seen in the earlier examples: directly return an instance, or use yield statements to return items individually. Either one can be used, but a single method must use one or the other, not a mix of both. In either case the caller receives an IEnumerable that it iterates later, but the two options shift where exactly the method’s own code executes: a method that returns an instance runs its body when it is called, while an iterator method runs none of its body until enumeration begins.
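A minimal sketch of that shift, assuming a .NET 7 top-level program: ReturnInstance and YieldItems are hypothetical local functions that record in a log when their bodies actually run.

```csharp
using System;
using System.Collections.Generic;

var log = new List<string>();

// Returns an instance directly: the whole body runs when the method is called.
IEnumerable<int> ReturnInstance()
{
    log.Add("ReturnInstance body ran");
    return new List<int> { 1, 2, 3 };
}

// Iterator: none of the body runs until the caller starts enumerating.
IEnumerable<int> YieldItems()
{
    log.Add("YieldItems body ran");
    yield return 1;
    yield return 2;
    yield return 3;
}

IEnumerable<int> eager = ReturnInstance();
IEnumerable<int> deferred = YieldItems();
int entriesAfterCalls = log.Count;            // 1: only ReturnInstance has run

var items = new List<int>(deferred);          // enumerating runs the iterator body
Console.WriteLine(string.Join(" | ", log));   // ReturnInstance body ran | YieldItems body ran
```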

To demonstrate the problem, I’m going to use an example that reads text from a Stream and returns the resulting characters one at a time. A validation method (an IsValid extension on string, not shown) ensures that only numeric digits are allowed. As an optimization, short strings are validated immediately, throwing an exception before any characters are returned if anything invalid is found. Longer strings are read one character at a time. For the initial implementation, I’m just going to return IEnumerable instances.

public IEnumerable<string> ReadDataStream(Stream stream)
{
    // Short input: read and validate everything before returning anything
    if (stream.Length < 10)
    {
        var sr = new StreamReader(stream);
        var text = sr.ReadToEnd();
        if (!text.IsValid())
        {
            throw new Exception($"Invalid characters in text '{text}'");
        }

        return text.Select(c => c.ToString());
    }

    // Longer input: read and validate one byte (one ASCII digit) at a time
    var characterList = new List<string>();
    var buffer = new byte[1];
    while (stream.Read(buffer) > 0)
    {
        string text = Encoding.UTF8.GetString(buffer);
        if (!text.IsValid())
        {
            throw new Exception($"Invalid character in text '{text}'");
        }
        characterList.Add(text);
    }

    return characterList;
}
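The IsValid extension and the way the test streams are built aren’t shown, so here is one plausible sketch under stated assumptions: a hypothetical IsValid that accepts only ASCII digits, and a hypothetical ToStream helper that wraps each input in a MemoryStream. Because it uses top-level local functions, which can’t be extension methods, it calls IsValid(text) instead of text.IsValid().

```csharp
using System;
using System.IO;
using System.Linq;
using System.Text;

// Hypothetical stand-in for the IsValid extension: true only when the text is
// non-empty and every character is an ASCII digit.
bool IsValid(string text) => text.Length > 0 && text.All(c => c >= '0' && c <= '9');

// Hypothetical helper that wraps a test input in an in-memory stream.
Stream ToStream(string input) => new MemoryStream(Encoding.UTF8.GetBytes(input));

foreach (string input in new[] { "0123456789", "0123A56", "01234567890I23456789" })
{
    using Stream stream = ToStream(input);
    // Mirrors the stream.Length < 10 check in ReadDataStream.
    string path = stream.Length < 10 ? "short path" : "long path";
    Console.WriteLine($"{input}: {stream.Length} bytes, {path}, valid: {IsValid(input)}");
}
```

With the stream.Length < 10 threshold, the 10-digit input has a length of exactly 10, so it takes the long, character-by-character path rather than the up-front validation.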

To test this method, I’m going to execute it three times with different inputs: a valid 10-digit number, 7 characters with a non-digit as the fifth character, and 20 characters with a non-digit as the twelfth character. The calling code catches exceptions separately at the method call and while iterating the collection, so it’s clear when errors are happening.

    Console.WriteLine("Begin Processing");
    try
    {
        var data = reader.ReadDataStream(stream);
        try
        {
            foreach (string item in data)
            {
                Console.WriteLine($"Got digit '{item}'");
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error iterating collection: {ex.Message}");
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Error reading data: {ex.Message}");
    }

Here are the results of running the code with inputs of "0123456789", "0123A56", "01234567890I23456789".

Begin Processing
Got digit '0'
Got digit '1'
Got digit '2'
Got digit '3'
Got digit '4'
Got digit '5'
Got digit '6'
Got digit '7'
Got digit '8'
Got digit '9'

Begin Processing
Error reading data: Invalid characters in text '0123A56'

Begin Processing
Error reading data: Invalid character in text 'I'

In both failure cases no characters were produced, because the exception occurred within the ReadDataStream method when it was invoked rather than later during iteration. It’s pretty clear why: for both short and long strings, the entire stream is processed before the method returns. This may be desirable behavior in some scenarios, but with the goal of enabling async execution in mind it won’t work, since no item can reach the caller until the whole stream has been read.

Adding yield

With a few changes, the code can start to move in the right direction by using yield return to enable delayed iterator execution.

public IEnumerable<string> ReadDataStream(Stream stream)
{
    if (stream.Length < 10)
    {
        var sr = new StreamReader(stream);
        var text = sr.ReadToEnd();
        if (!text.IsValid())
        {
            throw new Exception($"Invalid characters in text '{text}'");
        }

        foreach (char item in text) yield return item.ToString();
        yield break; // short input is done; don't fall through to the read loop below
    }

    var buffer = new byte[1];
    while (stream.Read(buffer) > 0)
    {
        string text = Encoding.UTF8.GetString(buffer);
        if (!text.IsValid())
        {
            throw new Exception($"Invalid character in text '{text}'");
        }
        yield return text;
    }
}

For short strings we’re still doing validation up front, but the long path now only processes characters as they’re read. Here’s the new output:

Begin Processing
Got digit '0'
Got digit '1'
Got digit '2'
Got digit '3'
Got digit '4'
Got digit '5'
Got digit '6'
Got digit '7'
Got digit '8'
Got digit '9'

Begin Processing
Error iterating collection: Invalid characters in text '0123A56'

Begin Processing
Got digit '0'
Got digit '1'
Got digit '2'
Got digit '3'
Got digit '4'
Got digit '5'
Got digit '6'
Got digit '7'
Got digit '8'
Got digit '9'
Got digit '0'
Error iterating collection: Invalid character in text 'I'

There’s a new potential problem here, though. In the immediate validation case, we should expect the exception to be thrown when the method is called (giving the "Error reading data" message seen before), but because of the iterator delay that’s no longer the case. Once a method contains yield return, the compiler turns its entire body into an iterator, so even the validation code before the first yield doesn’t run until enumeration begins. Now both cases behave the same way again, just with the opposite behavior from before. What we’re really looking for is a mix of both within the same method.

Mixing Techniques

Because a single method can work in only one of these two modes, what I really need is two separate methods: one that returns an IEnumerable instance to handle the up-front validation, and a separate iterator method that uses yield to build the sequence incrementally.

public IEnumerable<string> ReadDataStream(Stream stream)
{
    if (stream.Length < 10)
    {
        var sr = new StreamReader(stream);
        var text = sr.ReadToEnd();
        if (!text.IsValid())
        {
            throw new Exception($"Invalid characters in text '{text}'");
        }

        return text.Select(c => c.ToString());
    }

    return ReadIteratively(stream);
}

private static IEnumerable<string> ReadIteratively(Stream stream)
{
    var buffer = new byte[1];
    while (stream.Read(buffer) > 0)
    {
        string text = Encoding.UTF8.GetString(buffer);
        if (!text.IsValid())
        {
            throw new Exception($"Invalid character in text '{text}'");
        }
        yield return text;
    }
}

This combination now gives me the mix of both behaviors that I wanted, with delayed execution for the longer text, but immediate execution of the up-front validation at method call time.

Begin Processing
Got digit '0'
Got digit '1'
Got digit '2'
Got digit '3'
Got digit '4'
Got digit '5'
Got digit '6'
Got digit '7'
Got digit '8'
Got digit '9'

Begin Processing
Error reading data: Invalid characters in text '0123A56'

Begin Processing
Got digit '0'
Got digit '1'
Got digit '2'
Got digit '3'
Got digit '4'
Got digit '5'
Got digit '6'
Got digit '7'
Got digit '8'
Got digit '9'
Got digit '0'
Error iterating collection: Invalid character in text 'I'
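To check the split behavior in isolation, here is a condensed, self-contained sketch of the two methods above as local functions in a .NET 7 top-level program. IsValid is again a hypothetical stand-in for the unshown extension, and FailurePoint is a hypothetical harness that reports whether an input failed at the call, during iteration, or not at all.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;

// Hypothetical stand-in for the IsValid extension (ASCII digits only).
bool IsValid(string text) => text.Length > 0 && text.All(c => c >= '0' && c <= '9');

// Not an iterator: the short-input validation runs as soon as this is called.
IEnumerable<string> ReadDataStream(Stream stream)
{
    if (stream.Length < 10)
    {
        string text = new StreamReader(stream).ReadToEnd();
        if (!IsValid(text))
        {
            throw new Exception($"Invalid characters in text '{text}'");
        }
        return text.Select(c => c.ToString());
    }
    return ReadIteratively(stream);
}

// Iterator: none of this runs until the caller starts enumerating.
IEnumerable<string> ReadIteratively(Stream stream)
{
    var buffer = new byte[1];
    while (stream.Read(buffer) > 0)
    {
        string text = Encoding.UTF8.GetString(buffer);
        if (!IsValid(text))
        {
            throw new Exception($"Invalid character in text '{text}'");
        }
        yield return text;
    }
}

// Hypothetical harness: returns "call", "iteration", or "ok".
string FailurePoint(string input)
{
    using var stream = new MemoryStream(Encoding.UTF8.GetBytes(input));
    IEnumerable<string> data;
    try
    {
        data = ReadDataStream(stream);
    }
    catch (Exception)
    {
        return "call";
    }
    try
    {
        foreach (string digit in data) { }
    }
    catch (Exception)
    {
        return "iteration";
    }
    return "ok";
}

foreach (string input in new[] { "0123456789", "0123A56", "01234567890I23456789" })
{
    Console.WriteLine($"{input}: {FailurePoint(input)}");
}
```

For the three inputs used throughout, this should report ok, call, and iteration respectively, the same split as the output above.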

Application to IAsyncEnumerable

Now that we have insight into solving the IEnumerable part of the complexity, what changes when we add asynchronous execution back in? To start, I’ll switch what I can to the available async equivalents: IEnumerable becomes IAsyncEnumerable, and the two stream reads become ReadToEndAsync and ReadAsync.

public async IAsyncEnumerable<string> ReadDataStream(Stream stream)
{
    if (stream.Length < 10)
    {
        var sr = new StreamReader(stream);
        var text = await sr.ReadToEndAsync();
        if (!text.IsValid())
        {
            throw new Exception($"Invalid characters in text '{text}'");
        }

        return text.Select(c => c.ToString()).ToAsyncEnumerable();
    }

    return ReadIteratively(stream);
}

private static async IAsyncEnumerable<string> ReadIteratively(Stream stream)
{
    var buffer = new byte[1];
    while (await stream.ReadAsync(buffer) > 0)
    {
        string text = Encoding.UTF8.GetString(buffer);
        if (!text.IsValid())
        {
            throw new Exception($"Invalid character in text '{text}'");
        }
        yield return text;
    }
}

This looks simple enough that you might expect it to just work, but if you try to compile it you’ll see the problem. It causes a compiler error:

Error CS1622: Cannot return a value from an iterator. Use the yield return statement to return a value, or yield break to end the iteration.

This is the additional requirement imposed by IAsyncEnumerable. I need to use await within the ReadDataStream method to call ReadToEndAsync, but adding async to a method that returns IAsyncEnumerable has the additional effect of making it an async iterator, which can only produce values with yield. The ReadIteratively method doesn’t have this problem because it already uses yield.

The most direct fix is to iterate the validated string and yield return each item. Because the outer method is now an iterator itself, I also need to inline the iterator method I pulled out earlier.

public async IAsyncEnumerable<string> ReadDataStream(Stream stream)
{
    if (stream.Length < 10)
    {
        var sr = new StreamReader(stream);
        var text = await sr.ReadToEndAsync();
        if (!text.IsValid())
        {
            throw new Exception($"Invalid characters in text '{text}'");
        }

        foreach (var digit in text.Select(c => c.ToString()))
        {
            yield return digit;
        }
        yield break; // short input is done; don't fall through to the read loop below
    }

    var buffer = new byte[1];
    while (await stream.ReadAsync(buffer) > 0)
    {
        string text = Encoding.UTF8.GetString(buffer);
        if (!text.IsValid())
        {
            throw new Exception($"Invalid character in text '{text}'");
        }
        yield return text;
    }
}

Now it compiles, but what happens at runtime?

Begin Processing Async
Got digit '0'
Got digit '1'
Got digit '2'
Got digit '3'
Got digit '4'
Got digit '5'
Got digit '6'
Got digit '7'
Got digit '8'
Got digit '9'

Begin Processing Async
Error iterating collection: Invalid characters in text '0123A56'

Begin Processing Async
Got digit '0'
Got digit '1'
Got digit '2'
Got digit '3'
Got digit '4'
Got digit '5'
Got digit '6'
Got digit '7'
Got digit '8'
Got digit '9'
Got digit '0'
Error iterating collection: Invalid character in text 'I'

We’re now back to an earlier problem: the up-front validation exception is delayed until the iterator executes, rather than being thrown at method call time as I wanted. Recall that in the non-async case this was solved by making the outer method that threw the exception return IEnumerable instances, but here we’re boxed in by the additional async restriction that caused the compile error in the equivalent code.

Stepping Back

To solve this problem, it helps to step back from the code and think about what is actually happening during execution. Looking again at the calling code for reference, there are two possible places for exceptions to show up: the reader.ReadDataStream call or the await foreach line.

    Console.WriteLine("Begin Processing Async");
    try
    {
        IAsyncEnumerable<string> data = reader.ReadDataStream(stream);
        try
        {
            await foreach (string item in data)
            {
                Console.WriteLine($"Got digit '{item}'");
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error iterating collection: {ex.Message}");
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Error reading data: {ex.Message}");
    }

    Console.WriteLine();

When IAsyncEnumerable is returned from an iterator version of the method, nothing is actually executed at the method call line. The object the method returns is essentially an unstarted function, which begins executing only when enumeration of the data variable begins (on the first MoveNextAsync call that await foreach makes). This means that no matter what code is in the ReadDataStream method, as long as it is an iterator method, an exception will never be thrown at the call site. As we saw with the non-compiling example, making the outer ReadDataStream method not an iterator isn’t straightforward, but why is that?
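This deferral can be observed directly. The sketch below assumes a .NET 7 top-level program and uses a hypothetical async iterator, LengthOf, whose validation comes before any await or yield. It drives the iterator by hand with GetAsyncEnumerator and MoveNextAsync, the same calls await foreach makes on your behalf.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

// Hypothetical async iterator: the validation comes before any await or yield,
// but as an iterator none of it runs until enumeration starts.
async IAsyncEnumerable<int> LengthOf(string input)
{
    log.Add("body started");
    if (input.Length == 0)
    {
        throw new ArgumentException("input must not be empty");
    }
    await Task.Yield();
    yield return input.Length;
}

IAsyncEnumerable<int> data = LengthOf("");                    // no exception here
log.Add("call returned");

IAsyncEnumerator<int> enumerator = data.GetAsyncEnumerator(); // still no exception
log.Add("enumerator created");

string failure = "none";
try
{
    await enumerator.MoveNextAsync();                          // the body runs, and throws, here
}
catch (ArgumentException ex)
{
    failure = ex.Message;
}
finally
{
    await enumerator.DisposeAsync();
}

Console.WriteLine(string.Join(" -> ", log)); // call returned -> enumerator created -> body started
```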

In a normal async method, the returned Task shows similar behavior: exceptions are thrown at the point where the Task is awaited, not at the method call site. The problem with the IAsyncEnumerable iterator method is that the async keyword is doing double duty: it enables await and also turns the method into an iterator. In an ordinary async method, a call to await sr.ReadToEndAsync() would result in an awaitable Task being returned from the method, and await reader.ReadDataStream(stream) at the call site would produce the behavior we want. That doesn’t work here because an IAsyncEnumerable isn’t itself awaitable; only the individual steps of iterating it are.
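For comparison, here is a minimal sketch of the ordinary async Task case, using a hypothetical ValidateAsync method in a .NET 7 top-level program. One nuance it shows: unlike the iterator, this body does start running at call time, up to its first await, but the exception it throws is stored in the returned Task and only rethrown when that Task is awaited.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical ordinary async method. The throw happens before the first await,
// yet the exception is stored in the returned Task instead of reaching the caller.
async Task<string> ValidateAsync(string input)
{
    if (input.Length == 0)
    {
        throw new ArgumentException("input must not be empty");
    }
    await Task.Yield(); // stands in for real asynchronous work
    return input;
}

Task<string> pending = ValidateAsync("");        // no exception at the call site
bool faultedBeforeAwait = pending.IsFaulted;     // true: the Task already holds the exception

string outcome;
try
{
    outcome = await pending;                     // the exception is rethrown here
}
catch (ArgumentException ex)
{
    outcome = "caught at await: " + ex.Message;
}

Console.WriteLine(outcome); // caught at await: input must not be empty
```

This is why wrapping the result in a Task and awaiting it at the call line gives the caller a natural place to catch validation failures.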

Mixing Task and IAsyncEnumerable

Since using await has become problematic, with a little restructuring I could instead make the validation part a synchronous call by blocking on the result of the task as soon as it is created.

public IAsyncEnumerable<string> ReadDataStream(Stream stream)
{
    if (stream.Length < 10)
    {
        var digits = ValidateAndGetDigits(stream);
        return digits.GetAwaiter().GetResult();
    }

    return ReadIteratively(stream);
}

private static async Task<IAsyncEnumerable<string>> ValidateAndGetDigits(Stream stream)
{
    var sr = new StreamReader(stream);
    var text = await sr.ReadToEndAsync();
    if (!text.IsValid())
    {
        throw new Exception($"Invalid characters in text '{text}'");
    }

    return text.Select(c => c.ToString()).ToAsyncEnumerable();
}

private static async IAsyncEnumerable<string> ReadIteratively(Stream stream)
{
    var buffer = new byte[1];
    while (await stream.ReadAsync(buffer) > 0)
    {
        string text = Encoding.UTF8.GetString(buffer);
        if (!text.IsValid())
        {
            throw new Exception($"Invalid character in text '{text}'");
        }
        yield return text;
    }
}

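Here ValidateAndGetDigits gets out of the trap we were in before by wrapping the IAsyncEnumerable in a normal Task. Because it returns Task<IAsyncEnumerable<string>> rather than IAsyncEnumerable<string>, it is an ordinary async method that can use await without requiring yield. Calling GetAwaiter().GetResult() on the resulting Task blocks until the validation completes, so an invalid input throws at the method call line. Unlike Task.Result, which wraps failures in an AggregateException, GetAwaiter().GetResult() rethrows the original exception, so the caller sees the same message as before.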

Begin Processing Async
Got digit '0'
Got digit '1'
Got digit '2'
Got digit '3'
Got digit '4'
Got digit '5'
Got digit '6'
Got digit '7'
Got digit '8'
Got digit '9'

Begin Processing Async
Error reading data: Invalid characters in text '0123A56'

Begin Processing Async
Got digit '0'
Got digit '1'
Got digit '2'
Got digit '3'
Got digit '4'
Got digit '5'
Got digit '6'
Got digit '7'
Got digit '8'
Got digit '9'
Got digit '0'
Error iterating collection: Invalid character in text 'I'

Finally, we have the desired output. Getting to this point did require a bit of a cheat, though. The goal was asynchronous execution, but introducing GetResult split execution into a synchronous path for short inputs and an asynchronous path for long ones. Blocking on a Task like this also ties up a thread while it waits, and can deadlock if it runs under a single-threaded synchronization context such as a UI thread. This might be good enough for some scenarios, where the short validation path can be limited to code known to execute quickly. But what if I need to do something like make a database or API call in my validation method? Can I get async execution back on that short path?
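The blocking call behind this cheat can be seen in isolation in the sketch below, which uses a hypothetical FailingValidationAsync method in a .NET 7 top-level program. It also shows why GetAwaiter().GetResult() is used rather than Task.Result: both block the calling thread, but only GetResult rethrows the original exception. A console app has no synchronization context, so these blocking calls complete here; the same calls on a UI thread are where deadlocks become a risk.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical validation step that fails after an asynchronous hop.
async Task<int> FailingValidationAsync()
{
    await Task.Yield();
    throw new FormatException("Invalid characters");
}

string viaGetResult = "";
try
{
    // Blocks the current thread until the Task completes, then rethrows the original exception.
    FailingValidationAsync().GetAwaiter().GetResult();
}
catch (Exception ex)
{
    viaGetResult = ex.GetType().Name;
}

string viaResult = "";
try
{
    // Also blocks, but surfaces the failure wrapped in an AggregateException.
    _ = FailingValidationAsync().Result;
}
catch (Exception ex)
{
    viaResult = ex.GetType().Name;
}

Console.WriteLine($"GetAwaiter().GetResult(): {viaGetResult}, Result: {viaResult}");
// GetAwaiter().GetResult(): FormatException, Result: AggregateException
```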

To do this properly, I need to give the calling code both the IAsyncEnumerable representing the data and a separate awaitable Task representing the validation step. Extending the earlier approach of explicitly wrapping the collection in a Task, ReadDataStream itself can return Task<IAsyncEnumerable<string>>, which carries both: awaiting the Task runs the validation, and enumerating its result produces the data. The only change in my calling code is a new await:

    IAsyncEnumerable<string> data = await reader.ReadDataStream(stream);

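The method’s internal code is also cleaner, with no blocking on the Task. The validation calls now await an IsValidAsync method, the kind of method that could wrap a database or API call. Running it still gives the same final output as the previous code.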

public async Task<IAsyncEnumerable<string>> ReadDataStream(Stream stream)
{
    if (stream.Length < 10)
    {
        var sr = new StreamReader(stream);
        var text = await sr.ReadToEndAsync();
        if (!await text.IsValidAsync())
        {
            throw new Exception($"Invalid characters in text '{text}'");
        }

        return text.Select(c => c.ToString()).ToAsyncEnumerable();
    }

    return ReadIteratively(stream);
}

private static async IAsyncEnumerable<string> ReadIteratively(Stream stream)
{
    var buffer = new byte[1];
    while (await stream.ReadAsync(buffer) > 0)
    {
        string text = Encoding.UTF8.GetString(buffer);
        if (!await text.IsValidAsync())
        {
            throw new Exception($"Invalid character in text '{text}'");
        }
        yield return text;
    }
}
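Putting the final version together, here is a self-contained sketch that runs the three inputs end to end in a .NET 7 top-level program. IsValidAsync and ToAsyncSequence are hypothetical stand-ins: IsValidAsync isn’t shown above, and ToAsyncSequence replaces System.Linq.Async’s ToAsyncEnumerable so the sketch has no package dependency. Run is a hypothetical harness that reports where each input failed and how many digits arrived first.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

// Hypothetical async validator, standing in for something like a database or API call.
async Task<bool> IsValidAsync(string text)
{
    await Task.Yield();
    return text.Length > 0 && text.All(c => c >= '0' && c <= '9');
}

// Hypothetical stand-in for ToAsyncEnumerable() from System.Linq.Async.
async IAsyncEnumerable<T> ToAsyncSequence<T>(IEnumerable<T> source)
{
    foreach (T item in source)
    {
        await Task.Yield();
        yield return item;
    }
}

// Ordinary async method: awaiting its Task runs the up-front validation.
async Task<IAsyncEnumerable<string>> ReadDataStream(Stream stream)
{
    if (stream.Length < 10)
    {
        string text = await new StreamReader(stream).ReadToEndAsync();
        if (!await IsValidAsync(text))
        {
            throw new Exception($"Invalid characters in text '{text}'");
        }
        return ToAsyncSequence(text.Select(c => c.ToString()));
    }
    return ReadIteratively(stream);
}

// Async iterator: each character is read and validated during enumeration.
async IAsyncEnumerable<string> ReadIteratively(Stream stream)
{
    var buffer = new byte[1];
    while (await stream.ReadAsync(buffer) > 0)
    {
        string text = Encoding.UTF8.GetString(buffer);
        if (!await IsValidAsync(text))
        {
            throw new Exception($"Invalid character in text '{text}'");
        }
        yield return text;
    }
}

// Hypothetical harness: where did the input fail, and how many digits arrived first?
async Task<(string Stage, int Digits)> Run(string input)
{
    using var stream = new MemoryStream(Encoding.UTF8.GetBytes(input));
    IAsyncEnumerable<string> data;
    try
    {
        data = await ReadDataStream(stream);   // validation failures surface here
    }
    catch (Exception)
    {
        return ("call", 0);
    }

    int digits = 0;
    try
    {
        await foreach (string digit in data)   // per-character failures surface here
        {
            digits++;
        }
    }
    catch (Exception)
    {
        return ("iteration", digits);
    }
    return ("ok", digits);
}

foreach (string input in new[] { "0123456789", "0123A56", "01234567890I23456789" })
{
    var (stage, count) = await Run(input);
    Console.WriteLine($"{input}: {stage} after {count} digit(s)");
}
```

The expected results mirror the console output above: the valid input completes after 10 digits, the short invalid input fails at the awaited call, and the long invalid input fails during iteration after 11 digits.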

Code Versions

The example code uses .NET 7.0 with these library versions:

  • System.Linq.Async 6.0.1