diff --git a/pom.xml b/pom.xml index 9268b8b..9544c7c 100644 --- a/pom.xml +++ b/pom.xml @@ -21,15 +21,9 @@ org.postgresql postgresql - 42.1.2-SNAPSHOT + 42.2.6-SNAPSHOT compile - - - com.google.protobuf - protobuf-java - 3.2.0 - diff --git a/src/main/java/com/postgresintl/logicaldecoding/App.java b/src/main/java/com/postgresintl/logicaldecoding/App.java index 22e46e0..16f11c4 100644 --- a/src/main/java/com/postgresintl/logicaldecoding/App.java +++ b/src/main/java/com/postgresintl/logicaldecoding/App.java @@ -6,6 +6,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import com.postgresintl.logicaldecoding.model.Attribute; +import com.postgresintl.logicaldecoding.model.Relation; import org.postgresql.PGConnection; import org.postgresql.PGProperty; import org.postgresql.core.BaseConnection; @@ -22,8 +24,8 @@ public class App { private final static String SLOT_NAME="slot"; - private final static String HOST="localhost"; - private final static String PORT="5433"; + private final static String HOST="127.0.0.1"; + private final static String PORT="5432"; private final static String DATABASE="test"; Connection connection; @@ -31,11 +33,102 @@ public class App private static String toString(ByteBuffer buffer) { - int offset = buffer.arrayOffset(); - byte[] source = buffer.array(); - int length = source.length - offset; - return new String(source, offset, length); + byte cmd = buffer.get(); + switch (cmd) { + case 'R': + Relation relation = new Relation(); + relation.setOid(buffer.getInt()); + + relation.setSchema(getString(buffer)); + relation.setName(getString(buffer)); + byte replicaIdent = buffer.get(); + short numAttrs = buffer.getShort(); + for (int i = 0; i < numAttrs;i++){ + byte replicaIdentityFull = buffer.get(); + String attrName=getString(buffer); + int attrId = buffer.getInt(); + int attrMode = buffer.getInt(); + relation.addAttribute(new Attribute(attrId, attrName, attrMode, replicaIdentityFull)); + } + + return relation.toString(); + + case 'B': + LogSequenceNumber finalLSN = LogSequenceNumber.valueOf(buffer.getLong()); + Timestamp commitTime = new Timestamp(buffer.getLong()); + int transactionId = buffer.getInt(); + return "BEGIN finalLSN: " + finalLSN.toString() + " Commit Time: " + commitTime + " XID: " + transactionId; + + case 'C': + // COMMIT + byte unusedFlag = buffer.get(); + LogSequenceNumber commitLSN = LogSequenceNumber.valueOf( buffer.getLong() ); + LogSequenceNumber endLSN = LogSequenceNumber.valueOf( buffer.getLong() ); + commitTime = new Timestamp(buffer.getLong()); + return "COMMIT commitLSN:" + commitLSN.toString() + " endLSN:" + endLSN.toString() + " commitTime: " + commitTime; + + case 'U': // UPDATE + case 'D': // DELETE + StringBuffer sb = new StringBuffer(cmd=='U'?"UPDATE: ":"DELETE: "); + int oid = buffer.getInt(); + /* + this can be O or K if Delete or possibly N if UPDATE + K means key + O means old data + N means new data + */ + char keyOrTuple = (char)buffer.get(); + getTuple(buffer, sb); + return sb.toString(); + + case 'I': + sb = new StringBuffer(); + // oid of relation that is being inserted + oid = buffer.getInt(); + // should be an N + char isNew = (char)buffer.get(); + getTuple(buffer, sb); + return sb.toString(); + } + return ""; + } + + private static void getTuple(ByteBuffer buffer, StringBuffer sb) { + short numAttrs; + numAttrs = buffer.getShort(); + for (int i = 0; i < numAttrs; i++) { + byte c = buffer.get(); + switch (c) { + case 'n': // null + sb.append("NULL, "); + break; + case 'u': // unchanged toast column + break; + case 't': // textual data + int strLen = buffer.getInt(); + byte[] bytes = new byte[strLen]; + buffer.get(bytes, 0, strLen); + String value = new String(bytes); + sb.append(value).append(", "); + break; + default: + sb.append("command: ").append((char) c); + + } + } + } + + private static String getString(ByteBuffer buffer){ + StringBuffer sb = new StringBuffer(); + while ( true ){ + byte c = buffer.get(); + if ( c == 0 ) { + break; + } + sb.append((char)c); + } + return sb.toString(); } private String createUrl(){ return "jdbc:postgresql://"+HOST+':'+PORT+'/'+DATABASE; @@ -44,11 +137,11 @@ public void createConnection() { try { - connection = DriverManager.getConnection(createUrl(),"davec",""); + connection = DriverManager.getConnection(createUrl(),"test","test"); } catch (SQLException ex) { - + ex.printStackTrace(); } } @@ -163,9 +256,9 @@ public void receiveChangesOccursBeforStartReplication() throws Exception { .logical() .withSlotName(SLOT_NAME) .withStartPosition(lsn) - // .withSlotOption("proto_version",1) - // .withSlotOption("publication_names", "pub1") - .withSlotOption("include-xids", true) + .withSlotOption("proto_version",1) + .withSlotOption("publication_names", "pub1") + // .withSlotOption("include-xids", true) // .withSlotOption("skip-empty-xacts", true) .withStatusInterval(10, TimeUnit.SECONDS) .start(); @@ -206,8 +299,8 @@ private LogSequenceNumber getCurrentLSN() throws SQLException private void openReplicationConnection() throws Exception { Properties properties = new Properties(); - properties.setProperty("user","davec"); - properties.setProperty("password",""); + properties.setProperty("user","test"); + properties.setProperty("password","test"); PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4"); PGProperty.REPLICATION.set(properties, "database"); PGProperty.PREFER_QUERY_MODE.set(properties, "simple"); @@ -218,7 +311,7 @@ private boolean isServerCompatible() { } public static void main( String[] args ) { - String pluginName = "wal2json"; + String pluginName = "pgoutput"; App app = new App(); app.createConnection(); @@ -228,8 +321,8 @@ public static void main( String[] args ) } try { app.createLogicalReplicationSlot(SLOT_NAME, pluginName ); -// app.dropPublication("pub1"); -// app.createPublication("pub1"); + app.dropPublication("pub1"); + app.createPublication("pub1"); app.openReplicationConnection(); app.receiveChangesOccursBeforStartReplication(); } catch (InterruptedException e) { diff --git a/src/main/java/com/postgresintl/logicaldecoding/model/Attribute.java b/src/main/java/com/postgresintl/logicaldecoding/model/Attribute.java new file mode 100644 index 0000000..a4dfd9e --- /dev/null +++ b/src/main/java/com/postgresintl/logicaldecoding/model/Attribute.java @@ -0,0 +1,18 @@ +package com.postgresintl.logicaldecoding.model; + +public class Attribute { + int oid; + String name; + int mode; + byte identityFull; + + public Attribute(int attrId, String attrName, int attrMode, byte replicaIdentityFull) { + oid = attrId; + name = attrName; + mode = attrMode; + identityFull = replicaIdentityFull; + } + public String toString(){ + return "oid:" + oid + " name:" + name + " mode:" + mode + " identityFull:" + identityFull; + } +} diff --git a/src/main/java/com/postgresintl/logicaldecoding/model/Relation.java b/src/main/java/com/postgresintl/logicaldecoding/model/Relation.java new file mode 100644 index 0000000..86cec86 --- /dev/null +++ b/src/main/java/com/postgresintl/logicaldecoding/model/Relation.java @@ -0,0 +1,51 @@ +package com.postgresintl.logicaldecoding.model; + +import java.util.ArrayList; +import java.util.List; + +public class Relation { + int oid; + String schema; + String name; + List attributes = new ArrayList(); + + public int getOid() { + return oid; + } + + public void setOid(int oid) { + this.oid = oid; + } + + public String getSchema() { + return schema; + } + + public void setSchema(String schema) { + this.schema = schema; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public List getAttributes() { + return attributes; + } + + public void setAttributes(List attributes) { + this.attributes = attributes; + } + + public void addAttribute(Attribute attribute){ + attributes.add(attribute); + } + @Override + public String toString(){ + return ""+schema+'.'+name + attributes; + } +}