Skip to content

Commit bbd2ae4

Browse files
committed
Merge branch 'fix/bulk-all-2x' into 2.x
2 parents d944c70 + 102e5fe commit bbd2ae4

File tree

10 files changed

+730
-25
lines changed

10 files changed

+730
-25
lines changed

src/Nest/Document/Multiple/Bulk/ElasticClient-Bulk.cs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,53 +5,53 @@
55
namespace Nest
66
{
77
public partial interface IElasticClient
8-
{
8+
{
99
/// <summary>
1010
/// The bulk API makes it possible to perform many index/delete operations in a single API call.
1111
/// This can greatly increase the indexing speed.
1212
/// <para> </para>http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html
1313
/// </summary>
1414
/// <param name="request">A descriptor that describes the index/create/delete operations for this bulk request</param>
15-
IBulkResponse Bulk(IBulkRequest request);
16-
15+
IBulkResponse Bulk(IBulkRequest request);
16+
1717
/// <inheritdoc/>
18-
IBulkResponse Bulk(Func<BulkDescriptor, IBulkRequest> selector = null);
19-
18+
IBulkResponse Bulk(Func<BulkDescriptor, IBulkRequest> selector = null);
19+
2020
/// <inheritdoc/>
21-
Task<IBulkResponse> BulkAsync(IBulkRequest request);
22-
21+
Task<IBulkResponse> BulkAsync(IBulkRequest request);
22+
2323
/// <inheritdoc/>
2424
Task<IBulkResponse> BulkAsync(Func<BulkDescriptor, IBulkRequest> selector = null);
2525

2626
}
2727

2828
public partial class ElasticClient
29-
{
29+
{
3030
/// <inheritdoc/>
3131
public IBulkResponse Bulk(IBulkRequest request) =>
3232
this.Dispatcher.Dispatch<IBulkRequest, BulkRequestParameters, BulkResponse>(
3333
request, this.LowLevelDispatch.BulkDispatch<BulkResponse>
34-
);
35-
34+
);
35+
3636
/// <inheritdoc/>
3737
public IBulkResponse Bulk(Func<BulkDescriptor, IBulkRequest> selector = null)
38-
{
39-
// selector should not be nullable, but we can't change it for backwards compatibility reasons
38+
{
39+
// selector should not be nullable, but we can't change it for backwards compatibility reasons
4040
if (selector == null)
41-
throw new ArgumentNullException(nameof(selector));
42-
return this.Bulk(selector.InvokeOrDefault(new BulkDescriptor()));
43-
}
44-
41+
throw new ArgumentNullException(nameof(selector));
42+
return this.Bulk(selector.InvokeOrDefault(new BulkDescriptor()));
43+
}
44+
4545
/// <inheritdoc/>
4646
public Task<IBulkResponse> BulkAsync(IBulkRequest request) =>
4747
this.Dispatcher.DispatchAsync<IBulkRequest, BulkRequestParameters, BulkResponse, IBulkResponse>(
4848
request, this.LowLevelDispatch.BulkDispatchAsync<BulkResponse>
49-
);
50-
49+
);
50+
5151
/// <inheritdoc/>
5252
public Task<IBulkResponse> BulkAsync(Func<BulkDescriptor, IBulkRequest> selector = null)
53-
{
54-
// selector should not be nullable, but we can't change it for backwards compatibility reasons
53+
{
54+
// selector should not be nullable, but we can't change it for backwards compatibility reasons
5555
if (selector == null)
5656
throw new ArgumentNullException(nameof(selector));
5757
return this.BulkAsync(selector.InvokeOrDefault(new BulkDescriptor()));
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
using System;
2+
using System.Collections;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using Elasticsearch.Net;
8+
9+
namespace Nest
10+
{
11+
/// <summary>
/// Observable that splits the documents of an <see cref="IBulkAllRequest{T}"/> into buffers and sends
/// each buffer to Elasticsearch as its own bulk request, keeping at most a configured number of
/// requests in flight. Observers receive one <see cref="IBulkAllResponse"/> per successfully sent buffer.
/// </summary>
/// <typeparam name="T">The type of the documents being sent</typeparam>
public class BulkAllObservable<T> : IDisposable, IObservable<IBulkAllResponse> where T : class
{
	private readonly IBulkAllRequest<T> _partionedBulkRequest;
	private readonly IConnectionSettingsValues _connectionSettings;
	private readonly IElasticClient _client;
	private readonly TimeSpan _backOffTime;
	private readonly int _backOffRetries;
	private readonly int _bulkSize;
	private readonly int _maxDegreeOfParallelism;

	// No-ops unless a BulkAllObserver subscribes, in which case they forward to its counters.
	private Action _incrementFailed = () => { };
	private Action _incrementRetries = () => { };

	private readonly CancellationToken _cancelToken;
	private readonly CancellationToken _compositeCancelToken;
	private readonly CancellationTokenSource _compositeCancelTokenSource;

	private Func<IBulkResponse, bool> HandleErrors { get; set; }

	/// <summary>
	/// Creates the observable. Values not set on <paramref name="partionedBulkRequest"/> fall back to:
	/// 0 back-off retries, a back-off time of one minute, 1000 documents per bulk request and a
	/// maximum degree of parallelism of 20.
	/// </summary>
	/// <param name="client">The client used to send the bulk (and optional refresh) requests</param>
	/// <param name="connectionSettings">The connection settings associated with the client</param>
	/// <param name="partionedBulkRequest">Describes the documents to send and how to send them</param>
	/// <param name="cancellationToken">Cancels the operation; <see cref="Dispose"/> cancels it as well</param>
	/// <exception cref="ArgumentNullException"><paramref name="partionedBulkRequest"/> is null</exception>
	public BulkAllObservable(
		IElasticClient client,
		IConnectionSettingsValues connectionSettings,
		IBulkAllRequest<T> partionedBulkRequest,
		CancellationToken cancellationToken = default(CancellationToken)
	)
	{
		// The request is dereferenced right below, so fail with a descriptive exception instead of a
		// NullReferenceException.
		if (partionedBulkRequest == null)
			throw new ArgumentNullException(nameof(partionedBulkRequest));

		this._client = client;
		this._connectionSettings = connectionSettings;
		this._partionedBulkRequest = partionedBulkRequest;
		this._backOffRetries = partionedBulkRequest.BackOffRetries.GetValueOrDefault(0);
		this._backOffTime = partionedBulkRequest.BackOffTime?.ToTimeSpan() ?? TimeSpan.FromMinutes(1);
		this._bulkSize = partionedBulkRequest.Size ?? 1000;
		this._maxDegreeOfParallelism = partionedBulkRequest.MaxDegreeOfParallelism ?? 20;
		this._cancelToken = cancellationToken;
		// Linked source: cancelled either by the caller's token or by Dispose().
		this._compositeCancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource(this._cancelToken);
		this._compositeCancelToken = this._compositeCancelTokenSource.Token;
	}

	/// <summary>
	/// Subscribes a <see cref="BulkAllObserver"/> and wires its retry / failed-buffer counters to this observable.
	/// </summary>
	/// <param name="observer">The observer to notify</param>
	/// <returns>This instance; disposing it cancels the operation</returns>
	public IDisposable Subscribe(BulkAllObserver observer)
	{
		// Validate before touching the observer's members, mirroring the IObserver overload.
		observer.ThrowIfNull(nameof(observer));
		_incrementFailed = observer.IncrementTotalNumberOfFailedBuffers;
		_incrementRetries = observer.IncrementTotalNumberOfRetries;
		return this.Subscribe((IObserver<IBulkAllResponse>)observer);
	}

	/// <summary>
	/// Starts sending the documents and reports progress to <paramref name="observer"/>.
	/// An exception thrown synchronously while starting is passed to the observer's OnError.
	/// </summary>
	/// <param name="observer">The observer to notify</param>
	/// <returns>This instance; disposing it cancels the operation</returns>
	public IDisposable Subscribe(IObserver<IBulkAllResponse> observer)
	{
		observer.ThrowIfNull(nameof(observer));
		try
		{
			this.BulkAll(observer);
		}
		catch (Exception e)
		{
			observer.OnError(e);
		}
		return this;
	}

	/// <summary>Creates (does not throw) the exception used to report a failed API call.</summary>
	private ElasticsearchClientException Throw(string message, IApiCallDetails details) =>
		new ElasticsearchClientException(PipelineFailure.BadResponse, message, details);

	/// <summary>Partitions the documents and kicks off one bulk request per partition.</summary>
	private void BulkAll(IObserver<IBulkAllResponse> observer)
	{
		var documents = this._partionedBulkRequest.Documents;
		var partioned = new PartitionHelper<T>(documents, this._bulkSize, this._maxDegreeOfParallelism);
		partioned.ForEachAsync(
			(buffer, page) => this.BulkAsync(buffer, page, 0),
			(buffer, response) => observer.OnNext(response),
			t => OnCompleted(t, observer)
		);
	}

	/// <summary>
	/// Continuation that runs once every bulk request has finished; translates the final task state
	/// into exactly one OnCompleted or OnError notification.
	/// </summary>
	private void OnCompleted(Task task, IObserver<IBulkAllResponse> observer)
	{
		switch (task.Status)
		{
			case System.Threading.Tasks.TaskStatus.RanToCompletion:
				// This runs inside a continuation whose task nobody observes, so a failed refresh
				// must be handed to the observer; throwing here would leave it without any
				// terminal notification.
				var refreshFailure = this.RefreshOnCompleted();
				if (refreshFailure != null) observer.OnError(refreshFailure);
				else observer.OnCompleted();
				break;
			case System.Threading.Tasks.TaskStatus.Faulted:
				observer.OnError(task.Exception.InnerException);
				break;
			case System.Threading.Tasks.TaskStatus.Canceled:
				observer.OnError(new TaskCanceledException(task));
				break;
		}
	}

	/// <summary>
	/// Refreshes the target index when the request asks for it.
	/// </summary>
	/// <returns>The failure to report to the observer, or null when no refresh was requested or it succeeded</returns>
	private Exception RefreshOnCompleted()
	{
		if (!this._partionedBulkRequest.RefreshOnCompleted) return null;
		try
		{
			var refresh = this._client.Refresh(this._partionedBulkRequest.Index);
			return refresh.IsValid
				? null
				: Throw($"Refreshing after all documents have indexed failed", refresh.ApiCall);
		}
		catch (Exception e)
		{
			return e;
		}
	}

	/// <summary>
	/// Sends one buffer as a bulk request. When the response is invalid and retries remain, waits the
	/// back-off time and re-sends only the documents whose response item has status 429.
	/// </summary>
	/// <param name="buffer">The documents to send</param>
	/// <param name="page">The zero-based number of the partition this buffer came from</param>
	/// <param name="backOffRetries">How many retries have already been made for this page</param>
	/// <exception cref="ElasticsearchClientException">
	/// The response is invalid and either no retries remain or none of the documents can be retried
	/// </exception>
	private async Task<IBulkAllResponse> BulkAsync(IList<T> buffer, long page, int backOffRetries)
	{
		this._compositeCancelToken.ThrowIfCancellationRequested();

		var r = this._partionedBulkRequest;
		var response = await this._client.BulkAsync(s =>
		{
			s.Index(r.Index).Type(r.Type);
			if (r.BufferToBulk != null) r.BufferToBulk(s, buffer);
			else s.IndexMany(buffer);
			if (r.Refresh.HasValue) s.Refresh(r.Refresh.Value);
			if (!string.IsNullOrEmpty(r.Routing)) s.Routing(r.Routing);
			if (r.Consistency.HasValue) s.Consistency(r.Consistency.Value);

			return s;
		}).ConfigureAwait(false);

		this._compositeCancelToken.ThrowIfCancellationRequested();
		if (response.IsValid)
			return new BulkAllResponse { Retries = backOffRetries, Page = page };

		if (backOffRetries < this._backOffRetries)
		{
			// Work out what can be retried before waiting: when nothing is retryable there is no
			// point in backing off and re-sending an empty bulk request, which would only fail
			// again and hide the original failure.
			// NOTE(review): documents whose item status is not 429 are neither retried nor reported
			// when a later retry succeeds - confirm this is intended.
			var retryDocuments = RetryableDocuments(response, buffer);
			if (retryDocuments.Count > 0)
			{
				this._incrementRetries();
				await Task.Delay(this._backOffTime, this._compositeCancelToken).ConfigureAwait(false);
				return await this.BulkAsync(retryDocuments, page, backOffRetries + 1).ConfigureAwait(false);
			}
		}

		this._incrementFailed();
		throw Throw($"Bulk indexing failed and after retrying {backOffRetries} times", response.ApiCall);
	}

	/// <summary>
	/// Pairs each response item with the document at the same position in <paramref name="buffer"/>
	/// and returns the documents whose item has status 429.
	/// </summary>
	/// <returns>The documents to retry; empty when the response carries no items</returns>
	private static IList<T> RetryableDocuments(IBulkResponse response, IList<T> buffer)
	{
		if (response.Items == null) return new List<T>();
		return response.Items.Zip(buffer, (i, d) => new { i, d })
			.Where(x => x.i.Status == 429)
			.Select(x => x.d)
			.ToList();
	}

	/// <summary>
	/// Lazily splits a sequence into lists of at most a fixed size and runs an asynchronous action
	/// for every list, limiting how many actions run at the same time with a semaphore.
	/// </summary>
	private sealed class PartitionHelper<TDocument> : IEnumerable<IList<TDocument>>
	{
		readonly IEnumerable<TDocument> _items;
		readonly int _partitionSize;
		readonly int _semaphoreSize;
		bool _hasMoreItems;

		/// <param name="i">The items to partition</param>
		/// <param name="ps">The maximum number of items per partition; must be greater than zero</param>
		/// <param name="semaphoreSize">The maximum number of partitions processed concurrently</param>
		/// <exception cref="ArgumentOutOfRangeException"><paramref name="ps"/> is zero or negative</exception>
		internal PartitionHelper(IEnumerable<TDocument> i, int ps, int semaphoreSize)
		{
			// A non-positive size would make GetEnumerator yield empty batches forever.
			if (ps <= 0)
				throw new ArgumentOutOfRangeException(nameof(ps), ps, "The partition size must be greater than zero");

			_items = i;
			_semaphoreSize = semaphoreSize;
			_partitionSize = ps;
		}

		IEnumerator IEnumerable.GetEnumerator() => this.GetEnumerator();

		/// <summary>Yields the items as consecutive, fully materialized batches.</summary>
		public IEnumerator<IList<TDocument>> GetEnumerator()
		{
			using (var enumerator = _items.GetEnumerator())
			{
				_hasMoreItems = enumerator.MoveNext();
				while (_hasMoreItems)
					yield return GetNextBatch(enumerator).ToList();
			}
		}

		/// <summary>
		/// Yields up to the partition size in items, starting at the enumerator's current item, and
		/// records in <c>_hasMoreItems</c> whether the source has items left afterwards.
		/// </summary>
		IEnumerable<TDocument> GetNextBatch(IEnumerator<TDocument> enumerator)
		{
			for (int i = 0; i < _partitionSize; ++i)
			{
				yield return enumerator.Current;
				_hasMoreItems = enumerator.MoveNext();
				if (!_hasMoreItems) yield break;
			}
		}

		/// <summary>
		/// Runs <paramref name="taskSelector"/> for every partition, passing the partition and its
		/// zero-based page number, and hands each result to <paramref name="resultProcessor"/>.
		/// </summary>
		/// <param name="taskSelector">Starts the asynchronous work for a partition</param>
		/// <param name="resultProcessor">Receives a partition together with its result</param>
		/// <param name="done">Continuation invoked with the combined task once all partitions are finished</param>
		/// <returns>The continuation task</returns>
		public Task ForEachAsync<TResult>(
			Func<IList<TDocument>, long, Task<TResult>> taskSelector,
			Action<IList<TDocument>, TResult> resultProcessor,
			Action<Task> done
		)
		{
			var semaphore = new SemaphoreSlim(initialCount: _semaphoreSize, maxCount: _semaphoreSize);
			long page = 0;
			var work = this.Select(item => ProcessAsync(item, taskSelector, resultProcessor, semaphore, page++));
			return Task.WhenAll(work).ContinueWith(done);
		}

		/// <summary>
		/// Waits for a free semaphore slot, runs the work for one partition, passes the result on and
		/// always releases the slot again, also when the work throws.
		/// </summary>
		private async Task ProcessAsync<TSource, TResult>(
			TSource item,
			Func<TSource, long, Task<TResult>> taskSelector,
			Action<TSource, TResult> resultProcessor,
			SemaphoreSlim semaphoreSlim,
			long page)
		{
			await semaphoreSlim.WaitAsync().ConfigureAwait(false);
			try
			{
				var result = await taskSelector(item, page).ConfigureAwait(false);
				resultProcessor(item, result);
			}
			finally
			{
				semaphoreSlim.Release();
			}
		}
	}

	/// <summary>Whether <see cref="Dispose"/> has been called.</summary>
	public bool IsDisposed { get; private set; }

	/// <summary>
	/// Cancels the operation and releases the linked cancellation token source. Safe to call more than once.
	/// </summary>
	public void Dispose()
	{
		// Cancel() throws on a source that is already disposed, so later calls must be no-ops.
		if (this.IsDisposed) return;
		this.IsDisposed = true;
		try
		{
			this._compositeCancelTokenSource.Cancel();
		}
		finally
		{
			// Removes the linked source's registration on the caller's token.
			this._compositeCancelTokenSource.Dispose();
		}
	}
}
222+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
using System;
2+
using System.Threading;
3+
4+
namespace Nest
5+
{
6+
/// <summary>
/// Observer for <see cref="IBulkAllResponse"/> notifications that also keeps running totals of the
/// number of retries and the number of failed buffers.
/// </summary>
public class BulkAllObserver : CoordinatedRequestObserverBase<IBulkAllResponse>
{
	private long _totalNumberOfFailedBuffers;
	private long _totalNumberOfRetries;

	// The counters are incremented with Interlocked, so they are read with Interlocked as well:
	// a plain read of a 64-bit field is not atomic on 32-bit platforms.

	/// <summary>The number of times <see cref="IncrementTotalNumberOfRetries"/> has been called.</summary>
	public long TotalNumberOfRetries => Interlocked.Read(ref _totalNumberOfRetries);

	/// <summary>The number of times <see cref="IncrementTotalNumberOfFailedBuffers"/> has been called.</summary>
	public long TotalNumberOfFailedBuffers => Interlocked.Read(ref _totalNumberOfFailedBuffers);

	/// <summary>Atomically adds one to the retry total.</summary>
	internal void IncrementTotalNumberOfRetries() => Interlocked.Increment(ref _totalNumberOfRetries);

	/// <summary>Atomically adds one to the failed-buffer total.</summary>
	internal void IncrementTotalNumberOfFailedBuffers() => Interlocked.Increment(ref _totalNumberOfFailedBuffers);

	/// <summary>Creates an observer; the callbacks are passed unchanged to the base class.</summary>
	/// <param name="onNext">Optional callback for each response</param>
	/// <param name="onError">Optional callback for a failure</param>
	/// <param name="onCompleted">Optional callback for completion</param>
	public BulkAllObserver(
		Action<IBulkAllResponse> onNext = null,
		Action<Exception> onError = null,
		Action onCompleted = null
	)
		: base(onNext, onError, onCompleted)
	{
	}
}
27+
}

0 commit comments

Comments
 (0)