| | | 1 | | // Licensed to the .NET Foundation under one or more agreements. |
| | | 2 | | // The .NET Foundation licenses this file to you under the MIT license. |
| | | 3 | | |
| | | 4 | | using System.Collections.Generic; |
| | | 5 | | using System.Diagnostics; |
| | | 6 | | using System.Threading; |
| | | 7 | | using System.Threading.Tasks; |
| | | 8 | | |
| | | 9 | | namespace System.Text.Json.Serialization.Converters |
| | | 10 | | { |
| | | 11 | | internal sealed class IAsyncEnumerableOfTConverter<TAsyncEnumerable, TElement> |
| | | 12 | | : JsonCollectionConverter<TAsyncEnumerable, TElement> |
| | | 13 | | where TAsyncEnumerable : IAsyncEnumerable<TElement> |
| | | 14 | | { |
| | | 15 | | internal override bool OnTryRead(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options, s |
| | 0 | 16 | | { |
| | 0 | 17 | | if (!typeToConvert.IsAssignableFrom(typeof(IAsyncEnumerable<TElement>))) |
| | 0 | 18 | | { |
| | 0 | 19 | | ThrowHelper.ThrowNotSupportedException_CannotPopulateCollection(Type, ref reader, ref state); |
| | | 20 | | } |
| | | 21 | | |
| | 0 | 22 | | return base.OnTryRead(ref reader, typeToConvert, options, ref state, out value!); |
| | 0 | 23 | | } |
| | | 24 | | |
| | | 25 | | protected override void Add(in TElement value, ref ReadStack state) |
| | 0 | 26 | | { |
| | 0 | 27 | | ((BufferedAsyncEnumerable)state.Current.ReturnValue!)._buffer.Add(value); |
| | 0 | 28 | | } |
| | | 29 | | |
| | 0 | 30 | | internal override bool SupportsCreateObjectDelegate => false; |
| | | 31 | | protected override void CreateCollection(ref Utf8JsonReader reader, scoped ref ReadStack state, JsonSerializerOp |
| | 0 | 32 | | { |
| | 0 | 33 | | state.Current.ReturnValue = new BufferedAsyncEnumerable(); |
| | 0 | 34 | | } |
| | | 35 | | |
| | | 36 | | internal override bool OnTryWrite(Utf8JsonWriter writer, TAsyncEnumerable value, JsonSerializerOptions options, |
| | 0 | 37 | | { |
| | 0 | 38 | | if (!state.SupportAsync) |
| | 0 | 39 | | { |
| | 0 | 40 | | ThrowHelper.ThrowNotSupportedException_TypeRequiresAsyncSerialization(Type); |
| | | 41 | | } |
| | | 42 | | |
| | 0 | 43 | | return base.OnTryWrite(writer, value, options, ref state); |
| | 0 | 44 | | } |
| | | 45 | | |
| | | 46 | | [Diagnostics.CodeAnalysis.SuppressMessage("Reliability", "CA2012:Use ValueTasks correctly", Justification = "Con |
| | | 47 | | protected override bool OnWriteResume(Utf8JsonWriter writer, TAsyncEnumerable value, JsonSerializerOptions optio |
| | 0 | 48 | | { |
| | | 49 | | IAsyncEnumerator<TElement> enumerator; |
| | | 50 | | ValueTask<bool> moveNextTask; |
| | | 51 | | |
| | 0 | 52 | | if (state.Current.AsyncDisposable is null) |
| | 0 | 53 | | { |
| | 0 | 54 | | enumerator = value.GetAsyncEnumerator(state.CancellationToken); |
| | | 55 | | // async enumerators can only be disposed asynchronously; |
| | | 56 | | // store in the WriteStack for future disposal |
| | | 57 | | // by the root async serialization context. |
| | 0 | 58 | | state.Current.AsyncDisposable = enumerator; |
| | | 59 | | // enumerator.MoveNextAsync() calls can throw, |
| | | 60 | | // ensure the enumerator already is stored |
| | | 61 | | // in the WriteStack for proper disposal. |
| | 0 | 62 | | moveNextTask = enumerator.MoveNextAsync(); |
| | | 63 | | |
| | 0 | 64 | | if (!moveNextTask.IsCompleted) |
| | 0 | 65 | | { |
| | | 66 | | // It is common for first-time MoveNextAsync() calls to return pending tasks, |
| | | 67 | | // since typically that is when underlying network connections are being established. |
| | | 68 | | // For this case only, suppress flushing the current buffer contents (e.g. the leading '[' token of |
| | | 69 | | // to give the stream owner the ability to recover in case of a connection error. |
| | 0 | 70 | | state.SuppressFlush = true; |
| | 0 | 71 | | goto SuspendDueToPendingTask; |
| | | 72 | | } |
| | 0 | 73 | | } |
| | | 74 | | else |
| | 0 | 75 | | { |
| | 0 | 76 | | Debug.Assert(state.Current.AsyncDisposable is IAsyncEnumerator<TElement>); |
| | 0 | 77 | | enumerator = (IAsyncEnumerator<TElement>)state.Current.AsyncDisposable; |
| | | 78 | | |
| | 0 | 79 | | if (state.Current.AsyncEnumeratorIsPendingCompletion) |
| | 0 | 80 | | { |
| | | 81 | | // converter was previously suspended due to a pending MoveNextAsync() task |
| | 0 | 82 | | Debug.Assert(state.PendingTask is Task<bool> && state.PendingTask.IsCompleted); |
| | 0 | 83 | | moveNextTask = new ValueTask<bool>((Task<bool>)state.PendingTask); |
| | 0 | 84 | | state.Current.AsyncEnumeratorIsPendingCompletion = false; |
| | 0 | 85 | | state.PendingTask = null; |
| | 0 | 86 | | } |
| | | 87 | | else |
| | 0 | 88 | | { |
| | | 89 | | // converter was suspended for a different reason; |
| | | 90 | | // the last MoveNextAsync() call can only have completed with 'true'. |
| | 0 | 91 | | moveNextTask = new ValueTask<bool>(true); |
| | 0 | 92 | | } |
| | 0 | 93 | | } |
| | | 94 | | |
| | 0 | 95 | | Debug.Assert(moveNextTask.IsCompleted); |
| | 0 | 96 | | JsonConverter<TElement> converter = GetElementConverter(ref state); |
| | | 97 | | |
| | | 98 | | // iterate through the enumerator while elements are being returned synchronously |
| | | 99 | | do |
| | 0 | 100 | | { |
| | 0 | 101 | | if (!moveNextTask.Result) |
| | 0 | 102 | | { |
| | | 103 | | // we have completed serialization for the enumerator, |
| | | 104 | | // clear from the stack and schedule for async disposal. |
| | 0 | 105 | | state.Current.AsyncDisposable = null; |
| | 0 | 106 | | state.AddCompletedAsyncDisposable(enumerator); |
| | 0 | 107 | | return true; |
| | | 108 | | } |
| | | 109 | | |
| | 0 | 110 | | if (ShouldFlush(ref state, writer)) |
| | 0 | 111 | | { |
| | 0 | 112 | | return false; |
| | | 113 | | } |
| | | 114 | | |
| | 0 | 115 | | TElement element = enumerator.Current; |
| | 0 | 116 | | if (!converter.TryWrite(writer, element, options, ref state)) |
| | 0 | 117 | | { |
| | 0 | 118 | | return false; |
| | | 119 | | } |
| | | 120 | | |
| | 0 | 121 | | state.Current.EndCollectionElement(); |
| | 0 | 122 | | moveNextTask = enumerator.MoveNextAsync(); |
| | 0 | 123 | | } while (moveNextTask.IsCompleted); |
| | | 124 | | |
| | 0 | 125 | | SuspendDueToPendingTask: |
| | | 126 | | // we have a pending MoveNextAsync() call; |
| | | 127 | | // wrap inside a regular task so that it can be awaited multiple times; |
| | | 128 | | // mark the current stackframe as pending completion. |
| | 0 | 129 | | Debug.Assert(state.PendingTask is null); |
| | 0 | 130 | | state.PendingTask = moveNextTask.AsTask(); |
| | 0 | 131 | | state.Current.AsyncEnumeratorIsPendingCompletion = true; |
| | 0 | 132 | | return false; |
| | 0 | 133 | | } |
| | | 134 | | |
| | | 135 | | private sealed class BufferedAsyncEnumerable : IAsyncEnumerable<TElement> |
| | | 136 | | { |
| | 0 | 137 | | public readonly List<TElement> _buffer = new(); |
| | | 138 | | |
| | | 139 | | #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously |
| | | 140 | | public async IAsyncEnumerator<TElement> GetAsyncEnumerator(CancellationToken _) |
| | 0 | 141 | | { |
| | 0 | 142 | | foreach (TElement element in _buffer) |
| | 0 | 143 | | { |
| | 0 | 144 | | yield return element; |
| | 0 | 145 | | } |
| | 0 | 146 | | } |
| | | 147 | | #pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously |
| | | 148 | | } |
| | | 149 | | } |
| | | 150 | | } |