2424import static software .amazon .awssdk .http .auth .aws .signer .SignerConstant .STREAMING_SIGNED_PAYLOAD_TRAILER ;
2525import static software .amazon .awssdk .http .auth .aws .signer .SignerConstant .STREAMING_UNSIGNED_PAYLOAD_TRAILER ;
2626import static software .amazon .awssdk .http .auth .aws .signer .SignerConstant .X_AMZ_CONTENT_SHA256 ;
27+ import static software .amazon .awssdk .http .auth .aws .signer .SignerConstant .X_AMZ_DECODED_CONTENT_LENGTH ;
2728import static software .amazon .awssdk .http .auth .aws .signer .SignerConstant .X_AMZ_TRAILER ;
2829
2930import java .nio .ByteBuffer ;
3031import java .nio .charset .StandardCharsets ;
3132import java .util .ArrayList ;
3233import java .util .Collections ;
3334import java .util .List ;
35+ import java .util .Optional ;
36+ import java .util .concurrent .CompletableFuture ;
3437import org .reactivestreams .Publisher ;
3538import software .amazon .awssdk .annotations .SdkInternalApi ;
3639import software .amazon .awssdk .checksums .SdkChecksum ;
3740import software .amazon .awssdk .checksums .spi .ChecksumAlgorithm ;
3841import software .amazon .awssdk .http .ContentStreamProvider ;
3942import software .amazon .awssdk .http .Header ;
4043import software .amazon .awssdk .http .SdkHttpRequest ;
44+ import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .AsyncChunkEncodedPayload ;
4145import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChecksumTrailerProvider ;
4246import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedInputStream ;
47+ import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedPayload ;
48+ import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedPublisher ;
4349import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .SigV4ChunkExtensionProvider ;
4450import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .SigV4TrailerProvider ;
51+ import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .SyncChunkEncodedPayload ;
4552import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .TrailerProvider ;
46- import software .amazon .awssdk .http .auth .aws .internal .signer .io .ChecksumInputStream ;
4753import software .amazon .awssdk .http .auth .aws .internal .signer .io .ResettableContentStreamProvider ;
54+ import software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerUtils ;
4855import software .amazon .awssdk .http .auth .spi .signer .PayloadChecksumStore ;
4956import software .amazon .awssdk .utils .BinaryUtils ;
5057import software .amazon .awssdk .utils .Logger ;
@@ -79,81 +86,140 @@ public static Builder builder() {
7986
8087 @ Override
8188 public ContentStreamProvider sign (ContentStreamProvider payload , V4RequestSigningResult requestSigningResult ) {
82- SdkHttpRequest .Builder request = requestSigningResult .getSignedRequest ();
83-
84- String checksum = request .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
85- () -> new IllegalArgumentException (X_AMZ_CONTENT_SHA256 + " must be set!" )
86- );
87-
8889 ChunkedEncodedInputStream .Builder chunkedEncodedInputStreamBuilder = ChunkedEncodedInputStream
8990 .builder ()
9091 .inputStream (payload .newStream ())
9192 .chunkSize (chunkSize )
9293 .header (chunk -> Integer .toHexString (chunk .remaining ()).getBytes (StandardCharsets .UTF_8 ));
9394
94- preExistingTrailers .forEach (trailer -> chunkedEncodedInputStreamBuilder .addTrailer (() -> trailer ));
95+ SyncChunkEncodedPayload chunkedPayload = new SyncChunkEncodedPayload (chunkedEncodedInputStreamBuilder );
96+ signCommon (chunkedPayload , requestSigningResult );
97+
98+ return new ResettableContentStreamProvider (chunkedEncodedInputStreamBuilder ::build );
99+ }
100+
101+ @ Override
102+ public Publisher <ByteBuffer > signAsync (Publisher <ByteBuffer > payload , V4RequestSigningResult requestSigningResult ) {
103+ ChunkedEncodedPublisher .Builder chunkedStreamBuilder = ChunkedEncodedPublisher .builder ()
104+ .publisher (payload )
105+ .chunkSize (chunkSize )
106+ .addEmptyTrailingChunk (true );
107+
108+ AsyncChunkEncodedPayload chunkedPayload = new AsyncChunkEncodedPayload (chunkedStreamBuilder );
109+ signCommon (chunkedPayload , requestSigningResult );
110+
111+ return chunkedStreamBuilder .build ();
112+ }
113+
114+ private void signCommon (ChunkedEncodedPayload payload , V4RequestSigningResult requestSigningResult ) {
115+ preExistingTrailers .forEach (t -> payload .addTrailer (() -> t ));
116+
117+ SdkHttpRequest .Builder request = requestSigningResult .getSignedRequest ();
118+
119+ payload .decodedContentLength (request .firstMatchingHeader (X_AMZ_DECODED_CONTENT_LENGTH )
120+ .map (Long ::parseLong )
121+ .orElseThrow (() -> {
122+ String msg = String .format ("Expected header '%s' to be present" ,
123+ X_AMZ_DECODED_CONTENT_LENGTH );
124+ return new RuntimeException (msg );
125+ }));
126+
127+ String checksum = request .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
128+ () -> new IllegalArgumentException (X_AMZ_CONTENT_SHA256 + " must be set!" )
129+ );
95130
96131 switch (checksum ) {
97132 case STREAMING_SIGNED_PAYLOAD : {
98133 RollingSigner rollingSigner = new RollingSigner (requestSigningResult .getSigningKey (),
99134 requestSigningResult .getSignature ());
100- chunkedEncodedInputStreamBuilder .addExtension (new SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
135+ payload .addExtension (new SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
101136 break ;
102137 }
103138 case STREAMING_UNSIGNED_PAYLOAD_TRAILER :
104- setupChecksumTrailerIfNeeded (chunkedEncodedInputStreamBuilder );
139+ setupChecksumTrailerIfNeeded (payload );
105140 break ;
106141 case STREAMING_SIGNED_PAYLOAD_TRAILER : {
142+ setupChecksumTrailerIfNeeded (payload );
107143 RollingSigner rollingSigner = new RollingSigner (requestSigningResult .getSigningKey (),
108144 requestSigningResult .getSignature ());
109- chunkedEncodedInputStreamBuilder .addExtension (new SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
110- setupChecksumTrailerIfNeeded (chunkedEncodedInputStreamBuilder );
111- chunkedEncodedInputStreamBuilder .addTrailer (
112- new SigV4TrailerProvider (chunkedEncodedInputStreamBuilder .trailers (), rollingSigner , credentialScope )
145+ payload .addExtension (new SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
146+ payload .addTrailer (
147+ new SigV4TrailerProvider (payload .trailers (), rollingSigner , credentialScope )
113148 );
114149 break ;
115150 }
116151 default :
117152 throw new UnsupportedOperationException ();
118153 }
119-
120- return new ResettableContentStreamProvider (chunkedEncodedInputStreamBuilder ::build );
121- }
122-
123- @ Override
124- public Publisher <ByteBuffer > signAsync (Publisher <ByteBuffer > payload , V4RequestSigningResult requestSigningResult ) {
125- // TODO(sra-identity-and-auth): implement this first and remove addFlexibleChecksumInTrailer logic in HttpChecksumStage
126- throw new UnsupportedOperationException ();
127154 }
128155
129156 @ Override
130157 public void beforeSigning (SdkHttpRequest .Builder request , ContentStreamProvider payload ) {
131158 long encodedContentLength = 0 ;
132- long contentLength = moveContentLength (request , payload );
159+ long contentLength = SignerUtils . computeAndMoveContentLength (request , payload );
133160 setupPreExistingTrailers (request );
134161
135162 // pre-existing trailers
163+ encodedContentLength = calculateEncodedContentLength (request , contentLength );
164+
165+ if (checksumAlgorithm != null ) {
166+ String checksumHeaderName = checksumHeaderName (checksumAlgorithm );
167+ request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
168+ }
169+ request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
170+ request .appendHeader (CONTENT_ENCODING , AWS_CHUNKED );
171+ }
172+
173+ @ Override
174+ public CompletableFuture <Pair <SdkHttpRequest .Builder , Optional <Publisher <ByteBuffer >>>> beforeSigningAsync (
175+ SdkHttpRequest .Builder request , Publisher <ByteBuffer > payload ) {
176+ return moveContentLength (request , payload )
177+ .thenApply (p -> {
178+ SdkHttpRequest .Builder requestBuilder = p .left ();
179+ setupPreExistingTrailers (requestBuilder );
180+
181+ long decodedContentLength = requestBuilder .firstMatchingHeader (X_AMZ_DECODED_CONTENT_LENGTH )
182+ .map (Long ::parseLong )
183+ // should not happen, this header is added by moveContentLength
184+ .orElseThrow (() -> new RuntimeException (X_AMZ_DECODED_CONTENT_LENGTH
185+ + " header not present" ));
186+
187+ long encodedContentLength = calculateEncodedContentLength (request , decodedContentLength );
188+
189+ if (checksumAlgorithm != null ) {
190+ String checksumHeaderName = checksumHeaderName (checksumAlgorithm );
191+ request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
192+ }
193+ request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
194+ request .appendHeader (CONTENT_ENCODING , AWS_CHUNKED );
195+ return Pair .of (requestBuilder , p .right ());
196+ });
197+ }
198+
199+ private long calculateEncodedContentLength (SdkHttpRequest .Builder requestBuilder , long decodedContentLength ) {
200+ long encodedContentLength = 0 ;
201+
136202 encodedContentLength += calculateExistingTrailersLength ();
137203
138- String checksum = request .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
204+ String checksum = requestBuilder .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
139205 () -> new IllegalArgumentException (X_AMZ_CONTENT_SHA256 + " must be set!" )
140206 );
141207
142208 switch (checksum ) {
143209 case STREAMING_SIGNED_PAYLOAD : {
144210 long extensionsLength = 81 ; // ;chunk-signature:<sigv4 hex signature, 64 bytes>
145- encodedContentLength += calculateChunksLength (contentLength , extensionsLength );
211+ encodedContentLength += calculateChunksLength (decodedContentLength , extensionsLength );
146212 break ;
147213 }
148214 case STREAMING_UNSIGNED_PAYLOAD_TRAILER :
149215 if (checksumAlgorithm != null ) {
150216 encodedContentLength += calculateChecksumTrailerLength (checksumHeaderName (checksumAlgorithm ));
151217 }
152- encodedContentLength += calculateChunksLength (contentLength , 0 );
218+ encodedContentLength += calculateChunksLength (decodedContentLength , 0 );
153219 break ;
154220 case STREAMING_SIGNED_PAYLOAD_TRAILER : {
155221 long extensionsLength = 81 ; // ;chunk-signature:<sigv4 hex signature, 64 bytes>
156- encodedContentLength += calculateChunksLength (contentLength , extensionsLength );
222+ encodedContentLength += calculateChunksLength (decodedContentLength , extensionsLength );
157223 if (checksumAlgorithm != null ) {
158224 encodedContentLength += calculateChecksumTrailerLength (checksumHeaderName (checksumAlgorithm ));
159225 }
@@ -167,12 +233,7 @@ public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider
167233 // terminating \r\n
168234 encodedContentLength += 2 ;
169235
170- if (checksumAlgorithm != null ) {
171- String checksumHeaderName = checksumHeaderName (checksumAlgorithm );
172- request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
173- }
174- request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
175- request .appendHeader (CONTENT_ENCODING , AWS_CHUNKED );
236+ return encodedContentLength ;
176237 }
177238
178239 /**
@@ -256,12 +317,7 @@ private long calculateChecksumTrailerLength(String checksumHeaderName) {
256317 return lengthInBytes + 2 ;
257318 }
258319
259- /**
260- * Add the checksum as a trailer to the chunk-encoded stream.
261- * <p>
262- * If the checksum-algorithm is not present, then nothing is done.
263- */
264- private void setupChecksumTrailerIfNeeded (ChunkedEncodedInputStream .Builder builder ) {
320+ private void setupChecksumTrailerIfNeeded (ChunkedEncodedPayload payload ) {
265321 if (checksumAlgorithm == null ) {
266322 return ;
267323 }
@@ -273,20 +329,17 @@ private void setupChecksumTrailerIfNeeded(ChunkedEncodedInputStream.Builder buil
273329 if (cachedChecksum != null ) {
274330 LOG .debug (() -> String .format ("Cached payload checksum available for algorithm %s: %s. Using cached value" ,
275331 checksumAlgorithm .algorithmId (), checksumHeaderName ));
276- builder .addTrailer (() -> Pair .of (checksumHeaderName , Collections .singletonList (cachedChecksum )));
332+ payload .addTrailer (() -> Pair .of (checksumHeaderName , Collections .singletonList (cachedChecksum )));
277333 return ;
278334 }
279335
280336 SdkChecksum sdkChecksum = fromChecksumAlgorithm (checksumAlgorithm );
281- ChecksumInputStream checksumInputStream = new ChecksumInputStream (
282- builder .inputStream (),
283- Collections .singleton (sdkChecksum )
284- );
285337
286338 TrailerProvider checksumTrailer =
287339 new ChecksumTrailerProvider (sdkChecksum , checksumHeaderName , checksumAlgorithm , payloadChecksumStore );
288340
289- builder .inputStream (checksumInputStream ).addTrailer (checksumTrailer );
341+ payload .checksumPayload (sdkChecksum );
342+ payload .addTrailer (checksumTrailer );
290343 }
291344
292345 private String getCachedChecksum () {
0 commit comments