1
+ using System ;
2
+
3
+ namespace Nest
4
+ {
5
+ public class ReindexObservable < T > : IDisposable , IObservable < IReindexResponse < T > > where T : class
6
+ {
7
+ private ReindexDescriptor < T > _reindexDescriptor ;
8
+ internal IElasticClient CurrentClient { get ; set ; }
9
+ internal ReindexDescriptor < T > ReindexDescriptor { get ; set ; }
10
+
11
+ public ReindexObservable ( IElasticClient client , ReindexDescriptor < T > reindexDescriptor )
12
+ {
13
+ this . _reindexDescriptor = reindexDescriptor ;
14
+ this . CurrentClient = client ;
15
+ }
16
+
17
+ public IDisposable Subscribe ( IObserver < IReindexResponse < T > > observer )
18
+ {
19
+ observer . ThrowIfNull ( "observer" ) ;
20
+ try
21
+ {
22
+ this . Reindex ( observer ) ;
23
+ }
24
+ catch ( Exception e )
25
+ {
26
+ observer . OnError ( e ) ;
27
+ }
28
+ return this ;
29
+
30
+ }
31
+
32
+ private void Reindex ( IObserver < IReindexResponse < T > > observer )
33
+ {
34
+ var fromIndex = this . _reindexDescriptor . _FromIndexName ;
35
+ var toIndex = this . _reindexDescriptor . _ToIndexName ;
36
+ var scroll = this . _reindexDescriptor . _Scroll ?? "2m" ;
37
+
38
+ fromIndex . ThrowIfNullOrEmpty ( "fromIndex" ) ;
39
+ toIndex . ThrowIfNullOrEmpty ( "toIndex" ) ;
40
+
41
+ var indexSettings = this . CurrentClient . GetIndexSettings ( this . _reindexDescriptor . _FromIndexName ) ;
42
+ var createSettings = new CreateIndexDescriptor ( this . CurrentClient . Settings ) . InitializeUsing ( indexSettings . Settings ) ;
43
+ var createIndexResponse = this . CurrentClient
44
+ . CreateIndex ( toIndex , ( c ) => this . _reindexDescriptor . _CreateIndexSelector ( createSettings ) ) ;
45
+ if ( ! createIndexResponse . IsValid )
46
+ throw new ReindexException ( createIndexResponse . ConnectionStatus ) ;
47
+
48
+ var page = 0 ;
49
+ var searchResult = this . CurrentClient . Search < T > (
50
+ s => s
51
+ . Index ( fromIndex )
52
+ . AllTypes ( )
53
+ . From ( 0 )
54
+ . Take ( 100 )
55
+ . Query ( this . _reindexDescriptor . _QuerySelector )
56
+ . SearchType ( SearchType . Scan )
57
+ . Scroll ( scroll )
58
+ ) ;
59
+ if ( searchResult . Total <= 0 )
60
+ throw new ReindexException ( searchResult . ConnectionStatus , "index " + fromIndex + " has no documents!" ) ;
61
+ IBulkResponse indexResult = null ;
62
+ do
63
+ {
64
+ searchResult = this . CurrentClient . Scroll < T > ( scroll , searchResult . ScrollId ) ;
65
+ if ( searchResult . Documents . HasAny ( ) )
66
+ indexResult = this . IndexSearchResults ( searchResult , observer , toIndex , page ) ;
67
+ page ++ ;
68
+ } while ( searchResult . IsValid && indexResult != null && indexResult . IsValid && searchResult . Documents . HasAny ( ) ) ;
69
+
70
+
71
+ observer . OnCompleted ( ) ;
72
+ }
73
+
74
+ public IBulkResponse IndexSearchResults ( IQueryResponse < T > searchResult , IObserver < IReindexResponse < T > > observer , string toIndex , int page )
75
+ {
76
+ if ( ! searchResult . IsValid )
77
+ throw new ReindexException ( searchResult . ConnectionStatus , "reindex failed on scroll #" + page ) ;
78
+
79
+ var indexResult = this . CurrentClient . IndexMany ( searchResult . Documents , toIndex ) ;
80
+ if ( ! indexResult . IsValid )
81
+ throw new ReindexException ( indexResult . ConnectionStatus , "reindex failed when indexing page " + page ) ;
82
+
83
+ observer . OnNext ( new ReindexResponse < T > ( )
84
+ {
85
+ BulkResponse = indexResult ,
86
+ QueryResponse = searchResult ,
87
+ Scroll = page
88
+ } ) ;
89
+ return indexResult ;
90
+ }
91
+
92
+
93
+ public void Dispose ( )
94
+ {
95
+
96
+ }
97
+ }
98
+ }
0 commit comments