|
62 | 62 | import java.io.Serializable; |
63 | 63 | import java.security.NoSuchAlgorithmException; |
64 | 64 | import java.sql.Connection; |
| 65 | +import java.sql.SQLException; |
| 66 | +import java.sql.Statement; |
65 | 67 | import java.util.ArrayList; |
66 | 68 | import java.util.Arrays; |
67 | 69 | import java.util.BitSet; |
|
71 | 73 | import java.util.Map; |
72 | 74 | import java.util.Objects; |
73 | 75 | import java.util.Set; |
| 76 | +import java.util.UUID; |
74 | 77 | import java.util.concurrent.TimeoutException; |
75 | 78 | import java.util.stream.IntStream; |
76 | 79 | import javax.net.ssl.SSLContext; |
|
80 | 83 | import static org.junit.jupiter.api.Assertions.assertInstanceOf; |
81 | 84 | import static org.junit.jupiter.api.Assertions.assertNotNull; |
82 | 85 | import static org.junit.jupiter.api.Assertions.assertThrows; |
| 86 | +import static org.mockito.Mockito.CALLS_REAL_METHODS; |
83 | 87 | import static org.mockito.Mockito.doReturn; |
| 88 | +import static org.mockito.Mockito.mock; |
84 | 89 | import static org.mockito.Mockito.when; |
85 | 90 |
|
86 | 91 | /** |
@@ -122,7 +127,7 @@ public class CaptureChangeMySQLTest { |
122 | 127 | private static final String TEN = "10"; |
123 | 128 | private static final ObjectMapper MAPPER = new ObjectMapper(); |
124 | 129 |
|
125 | | - private CaptureChangeMySQL processor; |
| 130 | + private MockCaptureChangeMySQL processor; |
126 | 131 | private TestRunner testRunner; |
127 | 132 | private MockBinlogClient client; |
128 | 133 |
|
@@ -1295,6 +1300,22 @@ public void testNormalizeQuery() { |
1295 | 1300 | assertEquals("alter table", processor.normalizeQuery(" /* This is a \n multiline comment test */ alter table")); |
1296 | 1301 | } |
1297 | 1302 |
|
| 1303 | + @Test |
| 1304 | + public void testGetTableInfoQuery() throws SQLException { |
| 1305 | + final Statement statement = mock(Statement.class, CALLS_REAL_METHODS); |
| 1306 | + |
| 1307 | + final String prefix = UUID.randomUUID().toString(); |
| 1308 | + final long tableId = 0; |
| 1309 | + |
| 1310 | + final String databaseName = "NiFi 'Quoted' Repository"; |
| 1311 | + final String tableName = "FlowFile"; |
| 1312 | + |
| 1313 | + final TableInfoCacheKey cacheKey = new TableInfoCacheKey(prefix, databaseName, tableName, tableId); |
| 1314 | + final String tableInfoQuery = processor.getTableInfoQuery(statement, cacheKey); |
| 1315 | + |
| 1316 | + assertEquals("SELECT * FROM \"NiFi 'Quoted' Repository\".\"FlowFile\" LIMIT 0", tableInfoQuery); |
| 1317 | + } |
| 1318 | + |
1298 | 1319 | @Test |
1299 | 1320 | void testMigration() { |
1300 | 1321 | final Map<String, String> expectedRenamed = Map.ofEntries( |
@@ -1349,9 +1370,8 @@ protected BinaryLogClient createBinlogClient(String hostname, int port, String u |
1349 | 1370 |
|
1350 | 1371 | @Override |
1351 | 1372 | protected TableInfo loadTableInfo(TableInfoCacheKey key) { |
1352 | | - TableInfo tableInfo = cache.computeIfAbsent(key, k -> new TableInfo(k.getDatabaseName(), k.getTableName(), k.getTableId(), |
| 1373 | + return cache.computeIfAbsent(key, k -> new TableInfo(k.getDatabaseName(), k.getTableName(), k.getTableId(), |
1353 | 1374 | Collections.singletonList(new ColumnDefinition((byte) -4, "string1")))); |
1354 | | - return tableInfo; |
1355 | 1375 | } |
1356 | 1376 |
|
1357 | 1377 | @Override |
|
0 commit comments