|
| 1 | +[主页](https://github.com/vonzhou/Blog) | [读书](https://github.com/vonzhou/readings) | [知乎](https://www.zhihu.com/people/vonzhou) |
| 2 | +--- |
| 3 | +# HBase 实现分页查询 |
| 4 | + |
| 5 | + |
| 6 | +## 序 |
| 7 | + |
| 8 | +按时间区间分页导出HBase中的数据。 |
| 9 | + |
| 10 | +## Rowkey的设计 |
| 11 | + |
| 12 | +在使用HBase时,Rowkey的设计很重要,取决于业务。 |
| 13 | + |
| 14 | +比如要把用户关联的数据存入HBase,后续根据时间查询,可以这样设计rowkey: |
| 15 | + |
| 16 | +``` |
| 17 | +userId + (Long.MAX - timestamp) + uid |
| 18 | +``` |
| 19 | + |
| 20 | +这样能满足: |
| 21 | + |
| 22 | +* 可以根据userId的特点预分区 |
| 23 | +* 时间戳逆转,可以保证最近的数据rowkey排序靠前 |
| 24 | +* 分布式环境下时间戳可能一样,所以追加一个UID,防止重复 |
| 25 | + |
| 26 | +示例代码: |
| 27 | + |
| 28 | +```java |
| 29 | +private String getRowKeyStr(String userId, long ts, long uid) { |
| 30 | + return String.format("%s%013d%019d", userId, Long.MAX_VALUE - ts, uid); |
| 31 | +} |
| 32 | +``` |
| 33 | + |
| 34 | + |
| 35 | +## 构造Table实例 |
| 36 | + |
| 37 | +需要自己保证Table的线程安全性。 |
| 38 | + |
| 39 | +```java |
| 40 | +public Table getTable() throws Exception { |
| 41 | + Table table = tableThreadLocal.get(); |
| 42 | + if (table == null) { |
| 43 | + table = getTableInternal(); |
| 44 | + if (table != null) { |
| 45 | + tableThreadLocal.set(table); |
| 46 | + } |
| 47 | + } |
| 48 | + |
| 49 | + return table; |
| 50 | +} |
| 51 | + |
| 52 | +public Table getTableInternal() throws Exception { |
| 53 | + Configuration config = HBaseConfiguration.create(); |
| 54 | + config.set(HConstants.ZOOKEEPER_QUORUM, hBaseConfig.getZkQuorum()); |
| 55 | + config.set(HConstants.ZOOKEEPER_CLIENT_PORT, hBaseConfig.getZkClientPort()); |
| 56 | + config.set(HConstants.ZOOKEEPER_ZNODE_PARENT, hBaseConfig.getZkZnodeParent()); |
| 57 | + |
| 58 | + config.setInt("hbase.rpc.timeout", 20000); |
| 59 | + config.setInt("hbase.client.operation.timeout", 30000); |
| 60 | + config.setInt("hbase.client.scanner.timeout.period", 20000); |
| 61 | + config.setInt("hbase.client.pause", 50); |
| 62 | + config.setInt("hbase.client.retries.number", 15); |
| 63 | + |
| 64 | +// HBaseAdmin.checkHBaseAvailable(config); |
| 65 | + |
| 66 | + Connection connection = ConnectionFactory.createConnection(config); |
| 67 | + |
| 68 | + Table table = connection.getTable(TableName.valueOf(hBaseConfig.getTableName())); |
| 69 | + |
| 70 | + return table; |
| 71 | +} |
| 72 | +``` |
| 73 | + |
| 74 | +## 分页查询 |
| 75 | + |
| 76 | +这里要注意是Scan中的startRow,stopRow是左闭右开区间,所以为了避免下一页中包含上一页的最后一条数据, 下一页Scan的时候startRow追加了一个0字节。 |
| 77 | + |
| 78 | +```java |
| 79 | +Filter filter = new PageFilter(15); |
| 80 | + |
| 81 | +byte[] lastRow = null; |
| 82 | +byte[] startRow = getRowKey(userId, end, 0L); |
| 83 | +byte[] endRow = getRowKey(userId, start, Long.MAX_VALUE); |
| 84 | + |
| 85 | +Table table = getTable(); |
| 86 | +if (table == null) { |
| 87 | + return; |
| 88 | +} |
| 89 | + |
| 90 | +int sum = 0; |
| 91 | + |
| 92 | +while (true) { |
| 93 | + Scan scan = new Scan(); |
| 94 | + scan.setFilter(filter); |
| 95 | + |
| 96 | + byte[] sr = null; |
| 97 | + if (lastRow != null) { |
| 98 | + sr = Bytes.add(lastRow, new byte[1]);// 重点1 |
| 99 | + } else { |
| 100 | + sr = startRow; |
| 101 | + } |
| 102 | + |
| 103 | + scan.setStartRow(sr); |
| 104 | + scan.setStopRow(endRow); |
| 105 | + ResultScanner scanner = table.getScanner(scan); |
| 106 | + Result result = null; |
| 107 | + int cnt = 0; |
| 108 | + while ((result = scanner.next()) != null) { |
| 109 | + // 从Result中解析数据,进行处理 |
| 110 | + cnt++; |
| 111 | + lastRow = result.getRow(); |
| 112 | + } |
| 113 | + |
| 114 | + scanner.close(); |
| 115 | + if (cnt == 0) { |
| 116 | + break; |
| 117 | + } |
| 118 | +} |
| 119 | +``` |
| 120 | + |
| 121 | +## Filter |
| 122 | + |
| 123 | +上述只是用了PageFilter实现分页,如果需要根据列的各种条件进行查询,就需要用到FilterList,或者自己实现Filter。 |
| 124 | + |
| 125 | + |
| 126 | + |
| 127 | + |
| 128 | + |
| 129 | + |
| 130 | + |
| 131 | + |
| 132 | + |
| 133 | + |
0 commit comments