
Commit 2963512

HBASE-29699 Scan#setLimit ignored in MapReduce jobs

1 parent f800a13
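In user terms, the bug looked like the sketch below: a MapReduce job configured with a limited Scan would ignore the limit, because TableMapReduceUtil serializes the Scan through a protobuf message that, until this commit, had no limit field. This is a minimal illustration, not part of the commit; the table name "my_table" and the RowCountMapper class are hypothetical.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;

public class LimitedScanJob {

  // Hypothetical mapper: emits one count per row it receives.
  static class RowCountMapper extends TableMapper<ImmutableBytesWritable, LongWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {
      context.write(key, new LongWritable(1L));
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "limited-scan");

    Scan scan = new Scan();
    // Before this commit, the limit was silently lost when the Scan was
    // serialized into the job configuration; with the fix, it survives the
    // round trip and is applied per input split.
    scan.setLimit(100);

    TableMapReduceUtil.initTableMapperJob("my_table", scan, RowCountMapper.class,
      ImmutableBytesWritable.class, LongWritable.class, job);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

With the changes below, the limit survives the convertScanToString()/convertStringToScan() round trip used by the MapReduce input formats.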
File tree: 6 files changed, +124 −8 lines changed


hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java

Lines changed: 6 additions & 0 deletions
@@ -1099,6 +1099,9 @@ public static ClientProtos.Scan toScan(final Scan scan) throws IOException {
       scanBuilder.setNeedCursorResult(true);
     }
     scanBuilder.setQueryMetricsEnabled(scan.isQueryMetricsEnabled());
+    if (scan.getLimit() > 0) {
+      scanBuilder.setLimit(scan.getLimit());
+    }
     return scanBuilder.build();
   }

@@ -1204,6 +1207,9 @@ public static Scan toScan(final ClientProtos.Scan proto) throws IOException {
       scan.setNeedCursorResult(true);
     }
     scan.setQueryMetricsEnabled(proto.getQueryMetricsEnabled());
+    if (proto.hasLimit()) {
+      scan.setLimit(proto.getLimit());
+    }
     return scan;
   }
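These two hunks are the core of the fix and are deliberately symmetric: a positive client-side limit is written into the protobuf message, and it is restored only when the field is present, so an unset field keeps the default of no limit. A minimal round-trip sketch of the repaired behavior, using only the two toScan overloads shown above (the standalone class and assertion style are illustrative):

import java.io.IOException;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

public class ScanLimitRoundTrip {
  public static void main(String[] args) throws IOException {
    Scan scan = new Scan();
    scan.setLimit(50);

    // Scan -> protobuf: the limit field is set because getLimit() > 0.
    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);

    // protobuf -> Scan: hasLimit() is true, so the limit is restored.
    Scan roundTripped = ProtobufUtil.toScan(proto);
    assert roundTripped.getLimit() == 50 : "limit lost in round trip";
  }
}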

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java

Lines changed: 4 additions & 1 deletion
@@ -257,7 +257,10 @@ public ClientSideRegionScanner getScanner() {
   public void initialize(InputSplit split, Configuration conf) throws IOException {
     this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
     this.split = split;
-    this.rowLimitPerSplit = conf.getInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 0);
+    int confLimit = conf.getInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 0);
+    int scanLimit = Math.max(scan.getLimit(), 0);
+    this.rowLimitPerSplit =
+      confLimit == 0 ? scanLimit : scanLimit == 0 ? confLimit : Math.min(confLimit, scanLimit);
     TableDescriptor htd = split.htd;
     RegionInfo hri = this.split.getRegionInfo();
     FileSystem fs = CommonFSUtils.getCurrentFileSystem(conf);
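The nested ternary merges the two limit sources: 0 means unset on either side, a single configured value wins outright, and when both are configured the smaller (stricter) one applies. Restated as a hypothetical standalone helper for readability; the commit itself inlines the expression:

// Hypothetical helper mirroring the inlined merge rule above.
// A value of 0 means "no limit configured" on that side.
static int effectiveRowLimit(int confLimit, int scanLimit) {
  if (confLimit == 0) {
    return scanLimit;            // only the Scan limit (or nothing) is set
  }
  if (scanLimit == 0) {
    return confLimit;            // only the config limit is set
  }
  return Math.min(confLimit, scanLimit); // both set: the stricter one wins
}

The four testScanLimitOn* cases added to TestTableSnapshotInputFormat further down exercise exactly these branches.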

hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java

Lines changed: 22 additions & 2 deletions
@@ -151,9 +151,12 @@ static boolean checkResult(Result r, ImmutableBytesWritable key, byte[] expected
    * Create table data and run tests on specified htable using the o.a.h.hbase.mapreduce API.
    */
   static void runTestMapreduce(Table table) throws IOException, InterruptedException {
+    runTestMapreduce(table, new Scan());
+  }
+
+  static void runTestMapreduce(Table table, Scan s) throws IOException, InterruptedException {
     org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl trr =
       new org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl();
-    Scan s = new Scan();
     s.withStartRow(Bytes.toBytes("aaa"));
     s.withStopRow(Bytes.toBytes("zzz"));
     s.addFamily(FAMILY);

@@ -171,7 +174,10 @@ static void runTestMapreduce(Table table) throws IOException, InterruptedExcepti
     checkResult(r, key, Bytes.toBytes("aaa"), Bytes.toBytes("value aaa"));

     more = trr.nextKeyValue();
-    assertTrue(more);
+    if (s.getLimit() == 1) {
+      assertFalse(more);
+      return;
+    }
     key = trr.getCurrentKey();
     r = trr.getCurrentValue();
     checkResult(r, key, Bytes.toBytes("bbb"), Bytes.toBytes("value bbb"));

@@ -254,6 +260,20 @@ public void testTableRecordReaderMapreduce() throws IOException, InterruptedExce
     runTestMapreduce(table);
   }

+  /**
+   * Run test assuming no errors using newer mapreduce api with a Scan object with limit set
+   */
+  @Test
+  public void testTableRecordReaderMapreduceLimit1() throws IOException, InterruptedException {
+    Table table = createTable(Bytes.toBytes("table1-mr-limit-1"));
+    Scan scan = new Scan();
+    scan.setLimit(1);
+
+    // Serialize and deserialize the Scan to mimic actual usage.
+    runTestMapreduce(table,
+      TableMapReduceUtil.convertStringToScan(TableMapReduceUtil.convertScanToString(scan)));
+  }
+
   /**
    * Run test assuming Scanner IOException failure using newer mapreduce api
    */

hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java

Lines changed: 66 additions & 0 deletions
@@ -24,12 +24,15 @@

 import java.io.Closeable;
 import java.io.File;
+import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.client.Consistency;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
 import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders;
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
 import org.apache.hadoop.hbase.testclassification.MapReduceTests;

@@ -295,4 +298,67 @@ public void testInitCredentialsForClusterUri() throws Exception {
       kdc.stop();
     }
   }
+
+  @Test
+  public void testScanSerialization() throws IOException {
+    final byte[] cf = "cf".getBytes();
+    final Scan scan = new Scan();
+    scan.setLimit(1);
+    scan.setBatch(1);
+    scan.setMaxResultSize(1);
+    scan.setAllowPartialResults(true);
+    scan.setLoadColumnFamiliesOnDemand(true);
+    scan.readVersions(1);
+    scan.setColumnFamilyTimeRange(cf, 0, 1);
+    scan.setTimeRange(0, 1);
+    scan.setAttribute("cf", cf);
+    scan.withStartRow("0".getBytes(), false);
+    scan.withStopRow("1".getBytes(), true);
+    scan.setFilter(new KeyOnlyFilter());
+    scan.addColumn(cf, cf);
+    scan.setMaxResultsPerColumnFamily(1);
+    scan.setRowOffsetPerColumnFamily(1);
+    scan.setReversed(true);
+    scan.setConsistency(Consistency.TIMELINE);
+    scan.setCaching(1);
+    scan.setReadType(Scan.ReadType.STREAM);
+    scan.setNeedCursorResult(true);
+    scan.setQueryMetricsEnabled(true);
+
+    final String serialized = TableMapReduceUtil.convertScanToString(scan);
+    final Scan deserialized = TableMapReduceUtil.convertStringToScan(serialized);
+    final String reserialized = TableMapReduceUtil.convertScanToString(deserialized);
+
+    // Verify that serialization is symmetric
+    assertEquals(serialized, reserialized);
+
+    // Verify individual fields to catch potential omissions
+    assertEquals(scan.getLimit(), deserialized.getLimit());
+    assertEquals(scan.getBatch(), deserialized.getBatch());
+    assertEquals(scan.getMaxResultSize(), deserialized.getMaxResultSize());
+    assertEquals(scan.getAllowPartialResults(), deserialized.getAllowPartialResults());
+    assertEquals(scan.getLoadColumnFamiliesOnDemandValue(),
+      deserialized.getLoadColumnFamiliesOnDemandValue());
+    assertEquals(scan.getMaxVersions(), deserialized.getMaxVersions());
+    assertEquals(scan.getColumnFamilyTimeRange().get(cf).toString(),
+      deserialized.getColumnFamilyTimeRange().get(cf).toString());
+    assertEquals(scan.getTimeRange().toString(), deserialized.getTimeRange().toString());
+    assertEquals(Bytes.toString(scan.getAttribute("cf")),
+      Bytes.toString(deserialized.getAttribute("cf")));
+    assertEquals(0, Bytes.compareTo(scan.getStartRow(), deserialized.getStartRow()));
+    assertEquals(scan.includeStartRow(), deserialized.includeStartRow());
+    assertEquals(0, Bytes.compareTo(scan.getStopRow(), deserialized.getStopRow()));
+    assertEquals(scan.includeStopRow(), deserialized.includeStopRow());
+    assertEquals(scan.getFilter().getClass().getName(),
+      deserialized.getFilter().getClass().getName());
+    assertEquals(scan.getFamilyMap().size(), deserialized.getFamilyMap().size());
+    assertEquals(scan.getMaxResultsPerColumnFamily(), deserialized.getMaxResultsPerColumnFamily());
+    assertEquals(scan.getRowOffsetPerColumnFamily(), deserialized.getRowOffsetPerColumnFamily());
+    assertEquals(scan.isReversed(), deserialized.isReversed());
+    assertEquals(scan.getConsistency(), deserialized.getConsistency());
+    assertEquals(scan.getCaching(), deserialized.getCaching());
+    assertEquals(scan.getReadType(), deserialized.getReadType());
+    assertEquals(scan.isNeedCursorResult(), deserialized.isNeedCursorResult());
+    assertEquals(scan.isQueryMetricsEnabled(), deserialized.isQueryMetricsEnabled());
+  }
 }

hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java

Lines changed: 25 additions & 5 deletions
@@ -304,13 +304,14 @@ public void testWithMockedMapReduceWithNoStartRowStopRow() throws Exception {
     }
   }

-  @Test
-  public void testScanLimit() throws Exception {
+  private void testScanLimit(int confLimit, Scan scan, int expectedLimit) throws Exception {
     final TableName tableName = TableName.valueOf(name.getMethodName());
     final String snapshotName = tableName + "Snapshot";
     Table table = null;
     try {
-      UTIL.getConfiguration().setInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 10);
+      if (confLimit > 0) {
+        UTIL.getConfiguration().setInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, confLimit);
+      }
       if (UTIL.getAdmin().tableExists(tableName)) {
         UTIL.deleteTable(tableName);
       }

@@ -332,15 +333,14 @@ public void testScanLimit() throws Exception {

       Job job = new Job(UTIL.getConfiguration());
       Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
-      Scan scan = new Scan();
       TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
         TestTableSnapshotInputFormat.class);

       TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
         RowCounter.RowCounterMapper.class, NullWritable.class, NullWritable.class, job, true,
         tmpTableDir);
       Assert.assertTrue(job.waitForCompletion(true));
-      Assert.assertEquals(10 * regionNum,
+      Assert.assertEquals(expectedLimit * regionNum,
         job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS).getValue());
     } finally {
       if (table != null) {

@@ -352,6 +352,26 @@ public void testScanLimit() throws Exception {
     }
   }

+  @Test
+  public void testScanLimitOnConfig() throws Exception {
+    testScanLimit(10, new Scan(), 10);
+  }
+
+  @Test
+  public void testScanLimitOnScan() throws Exception {
+    testScanLimit(0, new Scan().setLimit(10), 10);
+  }
+
+  @Test
+  public void testScanLimitOnBothButScanWins() throws Exception {
+    testScanLimit(10, new Scan().setLimit(5), 5);
+  }
+
+  @Test
+  public void testScanLimitOnBothButConfigWins() throws Exception {
+    testScanLimit(5, new Scan().setLimit(10), 5);
+  }
+
   @Test
   public void testNoDuplicateResultsWhenSplitting() throws Exception {
     TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");

hbase-protocol-shaded/src/main/protobuf/client/Client.proto

Lines changed: 1 addition & 0 deletions
@@ -281,6 +281,7 @@ message Scan {
   optional ReadType readType = 23 [default = DEFAULT];
   optional bool need_cursor_result = 24 [default = false];
   optional bool query_metrics_enabled = 25 [default = false];
+  optional uint32 limit = 26;
 }

 /**
