Skip to content

Commit 55bb17e

Browse files
(Protocol): ProtocolWriter and ProtocolReader contract changes
- ProtocolWriter no longer implements Flushable - BytesWriter instances can be reused for writing multiple requests. - Internal optimizations in BytesWriter to minimize allocations - More unit tests for BytesWriter (+2 squashed commit) - ProtocolResponseReader can now read the data after data-enriched responses - Add the new "SCOPE_DATA" scope which is entered after the values of a data-enriched response is read and the ProtocolResponseReader is at the begging of the attached data bytes.
1 parent bf8db35 commit 55bb17e

File tree

5 files changed

+190
-125
lines changed

5 files changed

+190
-125
lines changed

protocol/src/main/java/com/pcloud/networking/protocol/BytesReader.java

+50-14
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717
package com.pcloud.networking.protocol;
1818

1919
import com.pcloud.utils.IOUtils;
20+
import okio.BufferedSink;
2021
import okio.BufferedSource;
2122
import okio.Okio;
2223

2324
import java.io.IOException;
25+
import java.io.OutputStream;
2426
import java.net.ProtocolException;
2527
import java.util.ArrayDeque;
2628
import java.util.Arrays;
@@ -30,11 +32,15 @@
3032
import static com.pcloud.utils.IOUtils.closeQuietly;
3133

3234
/**
33-
* Reads bytes from source
35+
* A reader for pCloud's binary data protocol
3436
* <p>
35-
* An implementation of {@linkplain ProtocolResponseReader} which is able to read bytes from a {@linkplain BufferedSource}
37+
* An implementation of {@linkplain ProtocolResponseReader} which can read data
38+
* encoded in pCloud's binary protocol from a stream of bytes in pull-based fashion
39+
* with minimal overhead.
3640
* <p>
37-
* Generally used to read the bytes of a network response into serialized data
41+
*
42+
* The implementation can be reused for reading multiple responses from a byte source,
43+
* as long as they are read completely.
3844
*
3945
* @see ProtocolReader
4046
* @see ProtocolResponseReader
@@ -121,18 +127,17 @@ public long beginResponse() throws IOException {
121127
lastStringId = 0;
122128
return pullNumber(RESPONSE_LENGTH_BYTESIZE);
123129
}
124-
125-
throw new SerializationException("Trying to start reading a response, which has already been started.");
130+
throw new IllegalStateException("An already started response or data after it has not been fully read.");
126131
}
127132

128133
@Override
129-
public long endResponse() throws IOException {
134+
public boolean endResponse() throws IOException {
130135
if (currentScope != SCOPE_RESPONSE) {
131136
if (currentScope == SCOPE_NONE) {
132-
throw new SerializationException("Trying to end a response " +
137+
throw new IllegalStateException("Trying to end a response " +
133138
"but none is being read, first call beginResponse()");
134139
} else {
135-
throw new SerializationException("Trying to end a response, " +
140+
throw new IllegalStateException("Trying to end a response, " +
136141
"but current scope is " + scopeName(currentScope));
137142
}
138143
}
@@ -146,11 +151,12 @@ public long endResponse() throws IOException {
146151
popScope();
147152
stringCache = null;
148153

149-
if (dataLength == UNKNOWN_SIZE) {
150-
dataLength = 0;
154+
boolean dataAvailable = dataLength != UNKNOWN_SIZE;
155+
if (dataAvailable) {
156+
pushScope(SCOPE_DATA);
151157
}
152158

153-
return dataLength;
159+
return dataAvailable;
154160
}
155161

156162
@Override
@@ -274,9 +280,30 @@ public boolean hasNext() throws IOException {
274280

275281
@Override
276282
public long dataContentLength() {
283+
if (currentScope == SCOPE_NONE) {
284+
throw new IllegalStateException("No response or data after it are being read.");
285+
}
277286
return dataLength;
278287
}
279288

289+
@Override
290+
public void readData(OutputStream outputStream) throws IOException {
291+
readData(Okio.buffer(Okio.sink(outputStream)));
292+
}
293+
294+
@Override
295+
public void readData(BufferedSink sink) throws IOException {
296+
if (currentScope != SCOPE_DATA) {
297+
throw new IllegalStateException("Cannot read data," +
298+
" either the response is not read fully or no data is following.");
299+
}
300+
301+
long length = dataLength;
302+
sink.write(bufferedSource, length);
303+
dataLength = UNKNOWN_SIZE;
304+
popScope();
305+
}
306+
280307
@Override
281308
public ProtocolResponseReader newPeekingReader() {
282309
BytesReader reader = new PeekingByteReader();
@@ -297,10 +324,9 @@ public ProtocolResponseReader newPeekingReader() {
297324
@Override
298325
public void skipValue() throws IOException {
299326

300-
if (currentScope == SCOPE_NONE) {
327+
if (currentScope == SCOPE_NONE || currentScope == SCOPE_DATA) {
301328
throw new IllegalStateException("Trying to skipValue, but currentScope is " +
302-
currentScope +
303-
". You must call beginResponse() first.");
329+
scopeName(currentScope) + ".");
304330
}
305331

306332
final int type = peekType();
@@ -509,5 +535,15 @@ private PeekingByteReader() {
509535
public ProtocolResponseReader newPeekingReader() {
510536
throw new IllegalStateException("Cannot call newPeekingReader(), this reader is already non-consuming.");
511537
}
538+
539+
@Override
540+
public void readData(OutputStream outputStream) throws IOException {
541+
throw new UnsupportedOperationException("Data cannot be peeked.");
542+
}
543+
544+
@Override
545+
public void readData(BufferedSink sink) throws IOException {
546+
throw new UnsupportedOperationException("Data cannot be peeked.");
547+
}
512548
}
513549
}

protocol/src/main/java/com/pcloud/networking/protocol/BytesWriter.java

+30-35
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ public class BytesWriter implements ProtocolRequestWriter {
5050
private static final int MAX_METHOD_LENGTH = 127;
5151
private static final int BITWISE_SHIFT_SIX = 6;
5252

53-
private BufferedSink sink;
54-
private Buffer paramsBuffer;
53+
private final BufferedSink sink;
54+
private final Buffer paramsBuffer;
5555
private DataSource dataSource;
5656

5757
private String methodName;
@@ -63,7 +63,6 @@ public class BytesWriter implements ProtocolRequestWriter {
6363
/**
6464
* Create a {@linkplain BytesReader} instance
6565
* <p>
66-
*
6766
* @param sink a {@linkplain BufferedSink} to house the bytes
6867
* @throws IllegalArgumentException on a null {@linkplain BufferedSink} argument
6968
*/
@@ -106,50 +105,56 @@ public ProtocolRequestWriter endRequest() throws IOException {
106105
checkWriteValueFinished();
107106

108107
if (methodName == null) {
109-
throw new IllegalArgumentException("Cannot end request, writeMethodName() has not been called.");
108+
throw new IllegalArgumentException("Cannot end request, " +
109+
"writeMethodName() has not been called.");
110110
}
111111

112-
113-
Buffer metadataBuffer = new Buffer();
114-
115112
int methodNameLength = (int) Utf8.size(methodName);
116113
if (methodNameLength > MAX_METHOD_LENGTH) {
117-
throw new SerializationException("Method name cannot be larger than 127 bytes.");
114+
throw new SerializationException(
115+
"Invalid method name '%s', " +
116+
"value cannot be larger than 127 bytes in UTF-8 encoding.", methodName);
118117
}
119118

120119
final long dataSourceLength = dataSource != null ? dataSource.contentLength() : 0;
121120
if (dataSourceLength < 0L) {
122-
throw new SerializationException("Unknown or invalid DataSource content length '" +
123-
dataSourceLength +
124-
"'.");
121+
throw new SerializationException(
122+
"Unknown or invalid DataSource content length '%d'.", dataSourceLength);
125123
}
126124

127125
boolean hasData = dataSourceLength > 0;
128-
if (hasData) {
129-
methodNameLength = methodNameLength | (1 << REQUEST_BINARY_DATA_FLAG_POSITION);
130-
}
131-
metadataBuffer.writeByte(methodNameLength);
132126

133-
if (hasData) {
134-
metadataBuffer.writeLongLe(dataSourceLength);
135-
}
136-
137-
metadataBuffer.writeUtf8(methodName);
138-
metadataBuffer.writeByte(parameterCount);
139-
140-
final long requestSize = metadataBuffer.size() + paramsBuffer.size();
127+
final long requestSize =
128+
(hasData ? 8 : 0) + // Data length, if any (8 bytes)
129+
2 + // Method name length + data flag (1 byte) + Parameter count (1 byte)
130+
methodNameLength +
131+
+paramsBuffer.size();
141132
if (requestSize > REQUEST_SIZE_LIMIT_BYTES) {
142133
throw new SerializationException("The maximum allowed request size is 65535 bytes," +
143134
" current is " + requestSize + " bytes.");
144135
}
145136

146-
sink.writeShortLe((int) requestSize);
147-
sink.write(metadataBuffer, metadataBuffer.size());
137+
if (hasData) {
138+
methodNameLength = methodNameLength | (1 << REQUEST_BINARY_DATA_FLAG_POSITION);
139+
}
140+
141+
sink.writeShortLe((int) requestSize); // Request size, 2 bytes
142+
sink.writeByte(methodNameLength);
143+
if (hasData) {
144+
sink.writeLongLe(dataSourceLength); // Size of data after response, 8 bytes
145+
}
146+
sink.writeUtf8(methodName);
147+
sink.writeByte(parameterCount);
148148
sink.write(this.paramsBuffer, paramsBuffer.size());
149149
if (hasData) {
150150
dataSource.writeTo(sink);
151151
}
152152

153+
sink.emit();
154+
dataSource = null;
155+
methodName = null;
156+
requestStarted = false;
157+
parameterCount = 0;
153158
return this;
154159
}
155160

@@ -297,16 +302,6 @@ public void close() {
297302
dataSource = null;
298303
}
299304

300-
/**
301-
* Flush the {@linkplain BufferedSink}
302-
*
303-
* @throws IOException on failed IO operations
304-
*/
305-
@Override
306-
public void flush() throws IOException {
307-
sink.flush();
308-
}
309-
310305
private void checkRequestStarted() {
311306
if (!requestStarted) {
312307
throw new IllegalStateException("Call beginRequest() before calling this method.");

protocol/src/main/java/com/pcloud/networking/protocol/ProtocolWriter.java

+1-8
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
* @see BytesWriter
2626
* @see ValueWriter
2727
*/
28-
public interface ProtocolWriter extends AutoCloseable, Closeable, Flushable {
28+
public interface ProtocolWriter extends AutoCloseable, Closeable {
2929

3030
/**
3131
* Write a key for a data pair
@@ -111,11 +111,4 @@ public interface ProtocolWriter extends AutoCloseable, Closeable, Flushable {
111111
*/
112112
@Override
113113
void close();
114-
115-
/**
116-
* Flush the data source
117-
* @throws IOException on failed IO operations
118-
*/
119-
@Override
120-
void flush() throws IOException;
121114
}

0 commit comments

Comments
 (0)