1+ using System ;
2+ using System . Collections . Generic ;
3+ /***using System.Diagnostics.CodeAnalysis;***/
4+ using System . Threading ;
5+ using System . Threading . Tasks ;
6+
7+ namespace /***$rootnamespace$.***/ ULibs . ConcurrentForEach
8+ {
9+ /***[ExcludeFromCodeCoverage]***/
10+ internal static class Concurrent
11+ {
12+ /// <summary>
13+ /// Applies an asynchronous operation to each element in a sequence.
14+ /// </summary>
15+ /// <typeparam name="T">The type of elements in the sequence.</typeparam>
16+ /// <param name="source">The input sequence of elements.</param>
17+ /// <param name="func">The asynchronous operation applied to each element in the sequence. It's strongly
18+ /// recommended that the operation should take care to handle its own expected exceptions.</param>
19+ /// <param name="maxConcurrentTasks">
20+ /// <para>
21+ /// The maximum number of concurrent operations. Must be greater than or equal to 1.
22+ /// </para>
23+ /// <para>
24+ /// The number of simultaneous operations can vary depending on the use case. For cpu intensive
25+ /// operations, consider using <see cref="Environment.ProcessorCount">Environment.ProcessorCount</see>.
26+ /// For operations that invoke the same web service for each item, RFC 7230 suggests that the number
27+ /// of simultaneous requests/connections should be limited (https://tools.ietf.org/html/rfc7230#section-6.4).
28+ /// A search for the connection limits used by common web-browsers suggests that a value in the range 6-8 is
29+ /// appropriate (any more, and you risk triggering abuse detection mechanisms). For operations that invoke a
30+ /// different web service for each item, a search for the connection limits used by common web-browsers
31+ /// suggests that a value in the range 10-20 is appropriate.
32+ /// </para>
33+ /// </param>
34+ /// <param name="cancellationToken">Used to cancel the operations.</param>
35+ /// <returns>A task that can be awaited upon for all operations to complete. Awaiting on the task will
36+ /// raise an <see cref="AggregateException"/> if any operation fails, or work is cancelled via the
37+ /// <paramref name="cancellationToken"/>.</returns>
38+ public static Task ForEachAsync < T > (
39+ this IEnumerable < T > source ,
40+ Func < T , Task > func ,
41+ int maxConcurrentTasks ,
42+ CancellationToken cancellationToken )
43+ {
44+ if ( func == null ) throw new ArgumentNullException ( nameof ( func ) ) ;
45+ return source . ForEachAsync ( ( item , _ ) => func ( item ) , maxConcurrentTasks , cancellationToken ) ;
46+ }
47+
48+ /// <summary>
49+ /// Applies an asynchronous operation to each element in a sequence.
50+ /// </summary>
51+ /// <typeparam name="T">The type of elements in the sequence.</typeparam>
52+ /// <param name="source">The input sequence of elements.</param>
53+ /// <param name="func">The asynchronous operation applied to each element in the sequence. It's strongly
54+ /// recommended that the operation should take care to handle its own expected exceptions.</param>
55+ /// <param name="maxConcurrentTasks">
56+ /// <para>
57+ /// The maximum number of concurrent operations. Must be greater than or equal to 1.
58+ /// </para>
59+ /// <para>
60+ /// The number of simultaneous operations can vary depending on the use case. For cpu intensive
61+ /// operations, consider using <see cref="Environment.ProcessorCount">Environment.ProcessorCount</see>.
62+ /// For operations that invoke the same web service for each item, RFC 7230 suggests that the number
63+ /// of simultaneous requests/connections should be limited (https://tools.ietf.org/html/rfc7230#section-6.4).
64+ /// A search for the connection limits used by common web-browsers suggests that a value in the range 6-8 is
65+ /// appropriate (any more, and you risk triggering abuse detection mechanisms). For operations that invoke a
66+ /// different web service for each item, a search for the connection limits used by common web-browsers
67+ /// suggests that a value in the range 10-20 is appropriate.
68+ /// </para>
69+ /// </param>
70+ /// <param name="cancellationToken">Used to cancel the operations.</param>
71+ /// <returns>A task that can be awaited upon for all operations to complete. Awaiting on the task will
72+ /// raise an <see cref="AggregateException"/> if any operation fails, or work is cancelled via the
73+ /// <paramref name="cancellationToken"/>.</returns>
74+ public static async Task ForEachAsync < T > (
75+ this IEnumerable < T > source ,
76+ Func < T , CancellationToken , Task > func ,
77+ int maxConcurrentTasks ,
78+ CancellationToken cancellationToken )
79+ {
80+ if ( maxConcurrentTasks < 1 )
81+ throw new ArgumentException ( "Value cannot be less than 1" , nameof ( maxConcurrentTasks ) ) ;
82+ if ( source == null ) throw new ArgumentNullException ( nameof ( source ) ) ;
83+ if ( func == null ) throw new ArgumentNullException ( nameof ( func ) ) ;
84+
85+ using ( var semaphore = new SemaphoreSlim ( maxConcurrentTasks , maxConcurrentTasks ) )
86+ {
87+ var tasks = new List < Task > ( ) ;
88+ foreach ( var item in source )
89+ {
90+ // Wait for the next available slot.
91+ try
92+ {
93+ await semaphore . WaitAsync ( cancellationToken ) ;
94+ }
95+ catch ( OperationCanceledException exception )
96+ {
97+ tasks . Add ( Task . FromException ( exception ) ) ;
98+ break ;
99+ }
100+
101+ // Discard completed tasks. Not strictly necessary, but keeps the list size down.
102+ tasks . RemoveAll ( task => task . IsCompleted ) ;
103+
104+ // Kick-off the next task.
105+ tasks . Add ( CreateTask ( func , item , cancellationToken ) . ReleaseSemaphoreOnCompletion ( semaphore ) ) ;
106+ }
107+
108+ await Task . WhenAll ( tasks ) ;
109+ }
110+ }
111+
112+ private static Task CreateTask < T > (
113+ Func < T , CancellationToken , Task > func , T item , CancellationToken cancellationToken )
114+ {
115+ try
116+ {
117+ return func ( item , cancellationToken ) ;
118+ }
119+ catch ( Exception exception )
120+ {
121+ return Task . FromException ( exception ) ;
122+ }
123+ }
124+
125+ private static async Task ReleaseSemaphoreOnCompletion ( this Task task , SemaphoreSlim semaphore )
126+ {
127+ try
128+ {
129+ await task ;
130+ }
131+ finally
132+ {
133+ semaphore . Release ( ) ;
134+ }
135+ }
136+ }
137+ }
0 commit comments