32
32
import java .util .concurrent .atomic .AtomicInteger ;
33
33
import java .util .function .Consumer ;
34
34
35
+ import org .synchronoss .cloud .nio .multipart .DefaultPartBodyStreamStorageFactory ;
35
36
import org .synchronoss .cloud .nio .multipart .Multipart ;
36
37
import org .synchronoss .cloud .nio .multipart .MultipartContext ;
37
38
import org .synchronoss .cloud .nio .multipart .MultipartUtils ;
38
39
import org .synchronoss .cloud .nio .multipart .NioMultipartParser ;
39
40
import org .synchronoss .cloud .nio .multipart .NioMultipartParserListener ;
41
+ import org .synchronoss .cloud .nio .multipart .PartBodyStreamStorageFactory ;
40
42
import org .synchronoss .cloud .nio .stream .storage .StreamStorage ;
41
43
import reactor .core .publisher .Flux ;
42
44
import reactor .core .publisher .FluxSink ;
@@ -72,6 +74,8 @@ public class SynchronossPartHttpMessageReader implements HttpMessageReader<Part>
72
74
73
75
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory ();
74
76
77
+ private final PartBodyStreamStorageFactory streamStorageFactory = new DefaultPartBodyStreamStorageFactory ();
78
+
75
79
76
80
@ Override
77
81
public List <MediaType > getReadableMediaTypes () {
@@ -89,7 +93,7 @@ public boolean canRead(ResolvableType elementType, @Nullable MediaType mediaType
89
93
public Flux <Part > read (ResolvableType elementType , ReactiveHttpInputMessage message ,
90
94
Map <String , Object > hints ) {
91
95
92
- return Flux .create (new SynchronossPartGenerator (message , this .bufferFactory ));
96
+ return Flux .create (new SynchronossPartGenerator (message , this .bufferFactory , this . streamStorageFactory ));
93
97
}
94
98
95
99
@@ -111,11 +115,15 @@ private static class SynchronossPartGenerator implements Consumer<FluxSink<Part>
111
115
private final ReactiveHttpInputMessage inputMessage ;
112
116
113
117
private final DataBufferFactory bufferFactory ;
118
+
119
+ private final PartBodyStreamStorageFactory streamStorageFactory ;
114
120
115
121
116
- SynchronossPartGenerator (ReactiveHttpInputMessage inputMessage , DataBufferFactory factory ) {
122
+ SynchronossPartGenerator (ReactiveHttpInputMessage inputMessage , DataBufferFactory bufferFactory ,
123
+ PartBodyStreamStorageFactory streamStorageFactory ) {
117
124
this .inputMessage = inputMessage ;
118
- this .bufferFactory = factory ;
125
+ this .bufferFactory = bufferFactory ;
126
+ this .streamStorageFactory = streamStorageFactory ;
119
127
}
120
128
121
129
@@ -130,7 +138,10 @@ public void accept(FluxSink<Part> emitter) {
130
138
MultipartContext context = new MultipartContext (mediaType .toString (), length , charset .name ());
131
139
132
140
NioMultipartParserListener listener = new FluxSinkAdapterListener (emitter , this .bufferFactory , context );
133
- NioMultipartParser parser = Multipart .multipart (context ).forNIO (listener );
141
+ NioMultipartParser parser = Multipart
142
+ .multipart (context )
143
+ .usePartBodyStreamStorageFactory (streamStorageFactory )
144
+ .forNIO (listener );
134
145
135
146
this .inputMessage .getBody ().subscribe (buffer -> {
136
147
byte [] resultBytes = new byte [buffer .readableByteCount ()];
0 commit comments