Skip to content
20 changes: 20 additions & 0 deletions vertx-pg-client/src/main/java/examples/PgClientExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,26 @@ public void batchReturning(SqlClient client) {
});
}

public void importDataToDb(Vertx vertx, PgConnection client) {
vertx.fileSystem().readFile("path/to/file")
.flatMap(bufferAsyncResult -> {
return client.copyFromBytes(
"COPY my_table FROM STDIN (FORMAT csv, HEADER)",
bufferAsyncResult
).execute();
}).onSuccess(result -> {
Long rowsWritten = result.iterator().next().getLong("rowsWritten");
System.out.println("rows written: " + rowsWritten);
});
}

public void exportDataFromDb(Vertx vertx, PgConnection client) {
String path = "path/to/file";
client.copyToBytes("COPY my_table TO STDOUT (FORMAT csv, HEADER)")
.flatMap(result -> vertx.fileSystem().writeFile("path/to/file.csv", result.value()))
.onSuccess(res -> System.out.println("Data exported to " + path));
}

public void pgBouncer(PgConnectOptions connectOptions) {
connectOptions.setUseLayer7Proxy(true);
}
Expand Down
50 changes: 50 additions & 0 deletions vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@

package io.vertx.pgclient;

import io.vertx.core.buffer.Buffer;
import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.pgclient.impl.PgConnectionImpl;
import io.vertx.sqlclient.Query;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.SqlResult;

import java.util.List;

/**
* A connection to Postgres.
Expand All @@ -34,6 +41,7 @@
* <ul>
* <li>Notification</li>
* <li>Request Cancellation</li>
* <li>Copy from STDIN / to STDOUT</li>
* </ul>
* </P>
*
Expand Down Expand Up @@ -92,6 +100,48 @@ static Future<PgConnection> connect(Vertx vertx, String connectionUri) {
@Fluent
PgConnection noticeHandler(Handler<PgNotice> handler);

/**
* Imports data into a database.
*
* <p>Use this method when importing opaque bytes, e.g. from a CSV file.
*
* <p>If you need bulk inserts of POJOs, use {@link io.vertx.sqlclient.PreparedQuery#executeBatch(List)} instead.
*
* @param sql COPY command (example {@code COPY my_table FROM STDIN (FORMAT csv, HEADER)})
* @param from byte stream data will be fetched from
* @return result set with single field {@code rowsWritten}
*/
Query<RowSet<Row>> copyFromBytes(String sql, Buffer from);

/**
* Exports data from a database with decoding.
*
* {@code FORMAT} can only be {@code binary}.
*
* @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT binary)})
* @return decoded records
*/
Query<RowSet<Row>> copyToRows(String sql);

/**
* Exports data from a database as-is, without decoding.
*
* <p>Use this method when exporting opaque bytes, e.g. to a CSV file.
*
* @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT csv)})
* @return async result of bytes container data will be written to
*
* - vertx.core.stream - https://vertx.io/docs/apidocs/io/vertx/core/streams/ReadStream.html
* - future of read stream.
* - when we do query operation
* - we should not use query result builder
* - what about SELECT 1;SELECT 1 or COPY ....;COPY ... ?
* - we need a new interface Future<ReadStream<Buffer>>
* - https://vertx.io/docs/apidocs/io/vertx/core/streams/ReadStream.html
* - PgSocketConnection - we will apply backpressure in SocketInternal
*/
Future<SqlResult<Buffer>> copyToBytes(String sql);

/**
* Send a request cancellation message to tell the server to cancel processing request in this connection.
* <br>Note: Use this with caution because the cancellation signal may or may not have any effect.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,36 @@
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.netty.buffer.ByteBuf;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgConnection;
import io.vertx.pgclient.PgNotice;
import io.vertx.pgclient.PgNotification;
import io.vertx.pgclient.impl.codec.CopyOutCommand;
import io.vertx.pgclient.impl.codec.NoticeResponse;
import io.vertx.pgclient.impl.codec.TxFailedEvent;
import io.vertx.pgclient.spi.PgDriver;
import io.vertx.sqlclient.Query;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlResult;
import io.vertx.sqlclient.impl.Connection;
import io.vertx.sqlclient.impl.Notification;
import io.vertx.sqlclient.impl.QueryExecutor;
import io.vertx.sqlclient.impl.QueryResultBuilder;
import io.vertx.sqlclient.impl.QueryResultHandler;
import io.vertx.sqlclient.impl.SocketConnectionBase;
import io.vertx.sqlclient.impl.SqlConnectionBase;
import io.vertx.sqlclient.impl.SqlResultImpl;
import io.vertx.sqlclient.impl.command.QueryCommandBase;
import io.vertx.sqlclient.impl.command.SimpleQueryCommand;

import java.util.function.Function;
import java.util.stream.Collector;

public class PgConnectionImpl extends SqlConnectionBase<PgConnectionImpl> implements PgConnection {

Expand Down Expand Up @@ -107,6 +125,32 @@ public PgConnection noticeHandler(Handler<PgNotice> handler) {
return this;
}

@Override
public Query<RowSet<Row>> copyFromBytes(String sql, Buffer from) {
return null;
}

@Override
public Query<RowSet<Row>> copyToRows(String sql) {
return null;
}

@Override
public Future<SqlResult<Buffer>> copyToBytes(String sql) {
Function<Buffer, SqlResultImpl<Buffer>> factory = (buffer) -> new SqlResultImpl<>(buffer);
PromiseInternal<SqlResult<Buffer>> promise = context.promise();

// currently, this loads entire content into Buffer
// it should stream bytes out instead
// TODO: signal completion as soon as the database replied CopyOutResponse 'H' ?
QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler =
new QueryResultBuilder<>(factory, promise);

CopyOutCommand cmd = new CopyOutCommand(sql, resultHandler);
this.schedule(promise.context(), cmd).onComplete(resultHandler);
return promise.future();
}

@Override
public int processId() {
return conn.getProcessId();
Expand All @@ -126,4 +170,5 @@ public Future<Void> cancelRequest() {
});
return promise.future();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.vertx.pgclient.impl.codec;

import io.netty.buffer.ByteBuf;
import io.vertx.core.buffer.Buffer;
import io.vertx.sqlclient.SqlResult;
import io.vertx.sqlclient.impl.QueryResultBuilder;
import io.vertx.sqlclient.impl.SqlResultImpl;
import io.vertx.sqlclient.impl.command.CommandBase;

import java.util.stream.Collector;

public class CopyOutCommand extends CommandBase<Boolean> {
private final String sql;
private final Collector<ByteBuf, Buffer, Buffer> collector;
private final QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler;

public CopyOutCommand(
String sql,
QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler
) {
this.sql = sql;
this.resultHandler = resultHandler;
this.collector = Collector.of(
Buffer::buffer,
// TODO: this might be unnecessary slow - think of alternatives
(v, chunk) -> v.appendBuffer(Buffer.buffer(chunk)),
(v1, v2) -> null,
(finalResult) -> finalResult
);
}

QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler() {
return resultHandler;
}

String sql() {
return sql;
}

Collector<ByteBuf, Buffer, Buffer> collector() {
return collector;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.vertx.pgclient.impl.codec;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.sqlclient.SqlResult;
import io.vertx.sqlclient.impl.SqlResultImpl;

class CopyOutCommandCodec extends PgCommandCodec<Boolean, CopyOutCommand> {
CopyOutDataDecoder decoder;

CopyOutCommandCodec(CopyOutCommand cmd) {
super(cmd);
decoder = new CopyOutDataDecoder(cmd.collector());
}

@Override
public void handleCommandComplete(int updated) {
this.result = false;
Buffer result;
Throwable failure;
int size;
if (decoder != null) {
failure = decoder.complete();
result = decoder.result();
size = decoder.size();
decoder.reset();
} else {
failure = null;
result = new BufferImpl();
size = 0;
}
cmd.resultHandler().handleResult(updated, size, null, result, failure);
}

@Override
public void handleErrorResponse(ErrorResponse errorResponse) {
failure = errorResponse.toException();
}

void encode(PgEncoder encoder) {
encoder.writeQuery(new Query(cmd.sql()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.vertx.pgclient.impl.codec;

import io.netty.buffer.ByteBuf;
import io.vertx.core.buffer.Buffer;
import java.util.function.BiConsumer;
import java.util.stream.Collector;

public class CopyOutDataDecoder {

private final Collector<ByteBuf, Buffer, Buffer> collector;
private BiConsumer<Buffer, ByteBuf> accumulator;

private int size;
private Buffer container;
private Throwable failure;
private Buffer result;

protected CopyOutDataDecoder(Collector<ByteBuf, Buffer, Buffer> collector) {
this.collector = collector;
reset();
}

public int size() {
return size;
}

public void handleChunk(ByteBuf in) {
if (failure != null) {
return;
}
if (accumulator == null) {
try {
accumulator = collector.accumulator();
} catch (Exception e) {
failure = e;
return;
}
}
try {
accumulator.accept(container, in);
} catch (Exception e) {
failure = e;
return;
}
size++;
}

public Buffer result() {
return result;
}

public Throwable complete() {
try {
result = collector.finisher().apply(container);
} catch (Exception e) {
failure = e;
}
return failure;
}

public void reset() {
size = 0;
failure = null;
result = null;
try {
this.container = collector.supplier().get();
} catch (Exception e) {
failure = e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,20 @@ private void decodeMessage(ChannelHandlerContext ctx, byte id, ByteBuf in) {
decodeNotificationResponse(ctx, in);
break;
}
// TODO: check if these handlers need to be at this level of loop
// TODO: check if COPY needs a separate loop
case PgProtocolConstants.MESSAGE_TYPE_COPY_OUT_RESPONSE: {
decodeCopyOutResponse(ctx, in);
break;
}
case PgProtocolConstants.MESSAGE_TYPE_COPY_DATA: {
decodeCopyData(ctx, in);
break;
}
case PgProtocolConstants.MESSAGE_TYPE_COPY_COMPLETION: {
decodeCopyCompletion(ctx, in);
break;
}
default: {
throw new UnsupportedOperationException();
}
Expand Down Expand Up @@ -455,4 +469,14 @@ private void decodeBackendKeyData(ByteBuf in) {
private void decodeNotificationResponse(ChannelHandlerContext ctx, ByteBuf in) {
ctx.fireChannelRead(new Notification(in.readInt(), Util.readCStringUTF8(in), Util.readCStringUTF8(in)));
}

private void decodeCopyOutResponse(ChannelHandlerContext ctx, ByteBuf in) {}

private void decodeCopyData(ChannelHandlerContext ctx, ByteBuf in) {
PgCommandCodec<?, ?> codec = inflight.peek();
CopyOutCommandCodec cmdCodec = (CopyOutCommandCodec) codec;
cmdCodec.decoder.handleChunk(in);
}

private void decodeCopyCompletion(ChannelHandlerContext ctx, ByteBuf in) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ void write(CommandBase<?> cmd) {
return new ClosePortalCommandCodec((CloseCursorCommand) cmd);
} else if (cmd instanceof CloseStatementCommand) {
return new CloseStatementCommandCodec((CloseStatementCommand) cmd);
} else if (cmd instanceof CopyOutCommand) {
return new CopyOutCommandCodec((CopyOutCommand) cmd);
}
throw new AssertionError();
}
Expand Down
Loading