Skip to content

update pom to use pgjbc 42.2.6-SNAPSHOT, added model and parsing for pgoutput #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,9 @@
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.1.2-SNAPSHOT</version>
<version>42.2.6-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.2.0</version>
</dependency>

</dependencies>
</project>
125 changes: 109 additions & 16 deletions src/main/java/com/postgresintl/logicaldecoding/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.postgresintl.logicaldecoding.model.Attribute;
import com.postgresintl.logicaldecoding.model.Relation;
import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
import org.postgresql.core.BaseConnection;
Expand All @@ -22,20 +24,111 @@
public class App
{
private final static String SLOT_NAME="slot";
private final static String HOST="localhost";
private final static String PORT="5433";
private final static String HOST="127.0.0.1";
private final static String PORT="5432";
private final static String DATABASE="test";

Connection connection;
Connection replicationConnection;


private static String toString(ByteBuffer buffer) {
int offset = buffer.arrayOffset();
byte[] source = buffer.array();
int length = source.length - offset;

return new String(source, offset, length);
byte cmd = buffer.get();
switch (cmd) {
case 'R':
Relation relation = new Relation();
relation.setOid(buffer.getInt());

relation.setSchema(getString(buffer));
relation.setName(getString(buffer));
byte replicaIdent = buffer.get();
short numAttrs = buffer.getShort();
for (int i = 0; i < numAttrs;i++){
byte replicaIdentityFull = buffer.get();
String attrName=getString(buffer);
int attrId = buffer.getInt();
int attrMode = buffer.getInt();
relation.addAttribute(new Attribute(attrId, attrName, attrMode, replicaIdentityFull));
}

return relation.toString();

case 'B':
LogSequenceNumber finalLSN = LogSequenceNumber.valueOf(buffer.getLong());
Timestamp commitTime = new Timestamp(buffer.getLong());
int transactionId = buffer.getInt();
return "BEGIN finalLSN: " + finalLSN.toString() + " Commit Time: " + commitTime + " XID: " + transactionId;

case 'C':
// COMMIT
byte unusedFlag = buffer.get();
LogSequenceNumber commitLSN = LogSequenceNumber.valueOf( buffer.getLong() );
LogSequenceNumber endLSN = LogSequenceNumber.valueOf( buffer.getLong() );
commitTime = new Timestamp(buffer.getLong());
return "COMMIT commitLSN:" + commitLSN.toString() + " endLSN:" + endLSN.toString() + " commitTime: " + commitTime;

case 'U': // UPDATE
case 'D': // DELETE
StringBuffer sb = new StringBuffer(cmd=='U'?"UPDATE: ":"DELETE: ");
int oid = buffer.getInt();
/*
this can be O or K if Delete or possibly N if UPDATE
K means key
O means old data
N means new data
*/
char keyOrTuple = (char)buffer.get();
getTuple(buffer, sb);
return sb.toString();

case 'I':
sb = new StringBuffer();
// oid of relation that is being inserted
oid = buffer.getInt();
// should be an N
char isNew = (char)buffer.get();
getTuple(buffer, sb);
return sb.toString();
}
return "";
}

private static void getTuple(ByteBuffer buffer, StringBuffer sb) {
short numAttrs;
numAttrs = buffer.getShort();
for (int i = 0; i < numAttrs; i++) {
byte c = buffer.get();
switch (c) {
case 'n': // null
sb.append("NULL, ");
break;
case 'u': // unchanged toast column
break;
case 't': // textual data
int strLen = buffer.getInt();
byte[] bytes = new byte[strLen];
buffer.get(bytes, 0, strLen);
String value = new String(bytes);
sb.append(value).append(", ");
break;
default:
sb.append("command: ").append((char) c);

}
}
}

private static String getString(ByteBuffer buffer){
StringBuffer sb = new StringBuffer();
while ( true ){
byte c = buffer.get();
if ( c == 0 ) {
break;
}
sb.append((char)c);
}
return sb.toString();
}
private String createUrl(){
return "jdbc:postgresql://"+HOST+':'+PORT+'/'+DATABASE;
Expand All @@ -44,11 +137,11 @@ public void createConnection()
{
try
{
connection = DriverManager.getConnection(createUrl(),"davec","");
connection = DriverManager.getConnection(createUrl(),"test","test");
}
catch (SQLException ex)
{

ex.printStackTrace();
}

}
Expand Down Expand Up @@ -163,9 +256,9 @@ public void receiveChangesOccursBeforStartReplication() throws Exception {
.logical()
.withSlotName(SLOT_NAME)
.withStartPosition(lsn)
// .withSlotOption("proto_version",1)
// .withSlotOption("publication_names", "pub1")
.withSlotOption("include-xids", true)
.withSlotOption("proto_version",1)
.withSlotOption("publication_names", "pub1")
// .withSlotOption("include-xids", true)
// .withSlotOption("skip-empty-xacts", true)
.withStatusInterval(10, TimeUnit.SECONDS)
.start();
Expand Down Expand Up @@ -206,8 +299,8 @@ private LogSequenceNumber getCurrentLSN() throws SQLException

private void openReplicationConnection() throws Exception {
Properties properties = new Properties();
properties.setProperty("user","davec");
properties.setProperty("password","");
properties.setProperty("user","test");
properties.setProperty("password","test");
PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
PGProperty.REPLICATION.set(properties, "database");
PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
Expand All @@ -218,7 +311,7 @@ private boolean isServerCompatible() {
}
public static void main( String[] args )
{
String pluginName = "wal2json";
String pluginName = "pgoutput";

App app = new App();
app.createConnection();
Expand All @@ -228,8 +321,8 @@ public static void main( String[] args )
}
try {
app.createLogicalReplicationSlot(SLOT_NAME, pluginName );
// app.dropPublication("pub1");
// app.createPublication("pub1");
app.dropPublication("pub1");
app.createPublication("pub1");
app.openReplicationConnection();
app.receiveChangesOccursBeforStartReplication();
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.postgresintl.logicaldecoding.model;

public class Attribute {
int oid;
String name;
int mode;
byte identityFull;

public Attribute(int attrId, String attrName, int attrMode, byte replicaIdentityFull) {
oid = attrId;
name = attrName;
mode = attrMode;
identityFull = replicaIdentityFull;
}
public String toString(){
return "oid:" + oid + " name:" + name + " mode:" + mode + " identityFull:" + identityFull;
}
}
51 changes: 51 additions & 0 deletions src/main/java/com/postgresintl/logicaldecoding/model/Relation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.postgresintl.logicaldecoding.model;

import java.util.ArrayList;
import java.util.List;

public class Relation {
int oid;
String schema;
String name;
List <Attribute> attributes = new ArrayList<Attribute>();

public int getOid() {
return oid;
}

public void setOid(int oid) {
this.oid = oid;
}

public String getSchema() {
return schema;
}

public void setSchema(String schema) {
this.schema = schema;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public List<Attribute> getAttributes() {
return attributes;
}

public void setAttributes(List<Attribute> attributes) {
this.attributes = attributes;
}

public void addAttribute(Attribute attribute){
attributes.add(attribute);
}
@Override
public String toString(){
return ""+schema+'.'+name + attributes;
}
}