Skip to content

Commit 979cfc6

Browse files
SongChujun authored and pettyjamesm committed
Add vectorized null-suppression for block serde
1 parent 01ff3a3 commit 979cfc6

File tree

16 files changed: +500 additions, -59 deletions

core/trino-main/src/main/java/io/trino/metadata/BlockEncodingManager.java

Lines changed: 10 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,9 @@
1313
*/
1414
package io.trino.metadata;
1515

16+
import com.google.inject.Inject;
17+
import io.trino.simd.BlockEncodingSimdSupport;
18+
import io.trino.simd.BlockEncodingSimdSupport.SimdSupport;
1619
import io.trino.spi.block.ArrayBlockEncoding;
1720
import io.trino.spi.block.Block;
1821
import io.trino.spi.block.BlockEncoding;
@@ -41,14 +44,16 @@ public final class BlockEncodingManager
4144
// for serialization
4245
private final Map<Class<? extends Block>, BlockEncoding> blockEncodingNamesByClass = new ConcurrentHashMap<>();
4346

44-
public BlockEncodingManager()
47+
@Inject
48+
public BlockEncodingManager(BlockEncodingSimdSupport blockEncodingSimdSupport)
4549
{
4650
// add the built-in BlockEncodings
51+
SimdSupport simdSupport = blockEncodingSimdSupport.getSimdSupport();
4752
addBlockEncoding(new VariableWidthBlockEncoding());
48-
addBlockEncoding(new ByteArrayBlockEncoding());
49-
addBlockEncoding(new ShortArrayBlockEncoding());
50-
addBlockEncoding(new IntArrayBlockEncoding());
51-
addBlockEncoding(new LongArrayBlockEncoding());
53+
addBlockEncoding(new ByteArrayBlockEncoding(simdSupport.expandAndCompressByte()));
54+
addBlockEncoding(new ShortArrayBlockEncoding(simdSupport.expandAndCompressShort()));
55+
addBlockEncoding(new IntArrayBlockEncoding(simdSupport.expandAndCompressInt()));
56+
addBlockEncoding(new LongArrayBlockEncoding(simdSupport.expandAndCompressLong()));
5257
addBlockEncoding(new Fixed12BlockEncoding());
5358
addBlockEncoding(new Int128ArrayBlockEncoding());
5459
addBlockEncoding(new DictionaryBlockEncoding());

core/trino-main/src/main/java/io/trino/testing/PlanTester.java

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -140,6 +140,7 @@
140140
import io.trino.server.security.HeaderAuthenticatorManager;
141141
import io.trino.server.security.PasswordAuthenticatorConfig;
142142
import io.trino.server.security.PasswordAuthenticatorManager;
143+
import io.trino.simd.BlockEncodingSimdSupport;
143144
import io.trino.spi.PageIndexerFactory;
144145
import io.trino.spi.PageSorter;
145146
import io.trino.spi.Plugin;
@@ -273,6 +274,8 @@
273274
public class PlanTester
274275
implements Closeable
275276
{
277+
public static final BlockEncodingManager TESTING_BLOCK_ENCODING_MANAGER = new BlockEncodingManager(new BlockEncodingSimdSupport(true));
278+
276279
private final Session defaultSession;
277280
private final ExecutorService notificationExecutor;
278281
private final ScheduledExecutorService yieldExecutor;
@@ -359,10 +362,9 @@ private PlanTester(Session defaultSession, int nodeCountForStats)
359362
catalogManager,
360363
notificationExecutor);
361364

362-
BlockEncodingManager blockEncodingManager = new BlockEncodingManager();
363365
TypeRegistry typeRegistry = new TypeRegistry(typeOperators, new FeaturesConfig());
364366
TypeManager typeManager = new InternalTypeManager(typeRegistry);
365-
InternalBlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(blockEncodingManager, typeManager);
367+
InternalBlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(TESTING_BLOCK_ENCODING_MANAGER, typeManager);
366368
SecretsResolver secretsResolver = new SecretsResolver(ImmutableMap.of());
367369

368370
this.globalFunctionCatalog = new GlobalFunctionCatalog(
@@ -497,7 +499,7 @@ private PlanTester(Session defaultSession, int nodeCountForStats)
497499
new GroupProviderManager(secretsResolver),
498500
new SessionPropertyDefaults(nodeInfo, accessControl, secretsResolver),
499501
typeRegistry,
500-
blockEncodingManager,
502+
TESTING_BLOCK_ENCODING_MANAGER,
501503
new HandleResolver(),
502504
exchangeManagerRegistry,
503505
spoolingManagerRegistry);

core/trino-main/src/test/java/io/trino/execution/buffer/TestPagesSerde.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,6 @@
1919
import io.airlift.slice.SliceInput;
2020
import io.airlift.slice.SliceOutput;
2121
import io.airlift.slice.Slices;
22-
import io.trino.metadata.BlockEncodingManager;
2322
import io.trino.metadata.InternalBlockEncodingSerde;
2423
import io.trino.spi.Page;
2524
import io.trino.spi.PageBuilder;
@@ -51,6 +50,7 @@
5150
import static io.trino.spi.type.BigintType.BIGINT;
5251
import static io.trino.spi.type.DoubleType.DOUBLE;
5352
import static io.trino.spi.type.VarcharType.VARCHAR;
53+
import static io.trino.testing.PlanTester.TESTING_BLOCK_ENCODING_MANAGER;
5454
import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER;
5555
import static io.trino.util.Ciphers.createRandomAesEncryptionKey;
5656
import static org.assertj.core.api.Assertions.assertThat;
@@ -66,7 +66,7 @@ public class TestPagesSerde
6666
@BeforeAll
6767
public void setup()
6868
{
69-
blockEncodingSerde = new InternalBlockEncodingSerde(new BlockEncodingManager(), TESTING_TYPE_MANAGER);
69+
blockEncodingSerde = new InternalBlockEncodingSerde(TESTING_BLOCK_ENCODING_MANAGER, TESTING_TYPE_MANAGER);
7070
}
7171

7272
@AfterAll

core/trino-main/src/test/java/io/trino/execution/buffer/TestingPagesSerdes.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -13,17 +13,17 @@
1313
*/
1414
package io.trino.execution.buffer;
1515

16-
import io.trino.metadata.BlockEncodingManager;
1716
import io.trino.metadata.InternalBlockEncodingSerde;
1817

1918
import static io.trino.execution.buffer.CompressionCodec.NONE;
19+
import static io.trino.testing.PlanTester.TESTING_BLOCK_ENCODING_MANAGER;
2020
import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER;
2121

2222
public final class TestingPagesSerdes
2323
{
2424
private TestingPagesSerdes() {}
2525

26-
private static final InternalBlockEncodingSerde BLOCK_ENCODING_SERDE = new InternalBlockEncodingSerde(new BlockEncodingManager(), TESTING_TYPE_MANAGER);
26+
private static final InternalBlockEncodingSerde BLOCK_ENCODING_SERDE = new InternalBlockEncodingSerde(TESTING_BLOCK_ENCODING_MANAGER, TESTING_TYPE_MANAGER);
2727

2828
public static PagesSerdeFactory createTestingPagesSerdeFactory()
2929
{

core/trino-main/src/test/java/io/trino/metadata/TestMetadataManager.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@
2929

3030
import static io.trino.client.NodeVersion.UNKNOWN;
3131
import static io.trino.metadata.CatalogManager.NO_CATALOGS;
32+
import static io.trino.testing.PlanTester.TESTING_BLOCK_ENCODING_MANAGER;
3233
import static io.trino.transaction.InMemoryTransactionManager.createTestTransactionManager;
3334
import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER;
3435
import static java.util.Objects.requireNonNull;
@@ -98,7 +99,7 @@ public MetadataManager build()
9899
}
99100

100101
if (languageFunctionManager == null) {
101-
BlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(new BlockEncodingManager(), typeManager);
102+
BlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(TESTING_BLOCK_ENCODING_MANAGER, typeManager);
102103
LanguageFunctionEngineManager engineManager = new LanguageFunctionEngineManager();
103104
languageFunctionManager = new LanguageFunctionManager(new SqlParser(), typeManager, _ -> ImmutableSet.of(), blockEncodingSerde, engineManager);
104105
}

core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -61,6 +61,7 @@
6161
import io.trino.server.FailTaskRequest;
6262
import io.trino.server.HttpRemoteTaskFactory;
6363
import io.trino.server.TaskUpdateRequest;
64+
import io.trino.simd.BlockEncodingSimdSupport;
6465
import io.trino.spi.ErrorCode;
6566
import io.trino.spi.QueryId;
6667
import io.trino.spi.block.Block;
@@ -676,6 +677,7 @@ public void configure(Binder binder)
676677
jsonCodecBinder(binder).bindJsonCodec(TaskUpdateRequest.class);
677678
jsonCodecBinder(binder).bindJsonCodec(FailTaskRequest.class);
678679

680+
binder.bind(BlockEncodingSimdSupport.class).toInstance(new BlockEncodingSimdSupport(true));
679681
binder.bind(TypeManager.class).toInstance(TESTING_TYPE_MANAGER);
680682
binder.bind(BlockEncodingManager.class).in(SINGLETON);
681683
binder.bind(BlockEncodingSerde.class).to(InternalBlockEncodingSerde.class).in(SINGLETON);

core/trino-main/src/test/java/io/trino/sql/planner/TestingPlannerContext.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,6 @@
1616
import com.google.common.collect.ImmutableSet;
1717
import io.trino.FeaturesConfig;
1818
import io.trino.connector.CatalogServiceProvider;
19-
import io.trino.metadata.BlockEncodingManager;
2019
import io.trino.metadata.FunctionBundle;
2120
import io.trino.metadata.FunctionManager;
2221
import io.trino.metadata.GlobalFunctionCatalog;
@@ -51,6 +50,7 @@
5150
import static com.google.common.base.Preconditions.checkState;
5251
import static io.airlift.tracing.Tracing.noopTracer;
5352
import static io.trino.client.NodeVersion.UNKNOWN;
53+
import static io.trino.testing.PlanTester.TESTING_BLOCK_ENCODING_MANAGER;
5454
import static java.util.Objects.requireNonNull;
5555

5656
public final class TestingPlannerContext
@@ -125,7 +125,7 @@ public PlannerContext build()
125125
globalFunctionCatalog.addFunctions(SystemFunctionBundle.create(featuresConfig, typeOperators, new BlockTypeOperators(typeOperators), UNKNOWN));
126126
functionBundles.forEach(globalFunctionCatalog::addFunctions);
127127

128-
BlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(new BlockEncodingManager(), typeManager);
128+
BlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(TESTING_BLOCK_ENCODING_MANAGER, typeManager);
129129

130130
LanguageFunctionManager languageFunctionManager = new LanguageFunctionManager(
131131
new SqlParser(),

core/trino-spi/pom.xml

Lines changed: 28 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -263,6 +263,34 @@
263263
<old>method void io.trino.spi.eventlistener.QueryStatistics::&lt;init&gt;(java.time.Duration, java.time.Duration, java.time.Duration, java.time.Duration, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, long, long, long, long, long, long, long, long, long, long, long, long, long, long, double, double, java.util.List&lt;io.trino.spi.eventlistener.StageGcStatistics&gt;, int, boolean, java.util.List&lt;io.trino.spi.eventlistener.StageCpuDistribution&gt;, java.util.List&lt;io.trino.spi.eventlistener.StageOutputBufferUtilization&gt;, java.util.List&lt;io.trino.spi.eventlistener.StageOutputBufferMetrics&gt;, java.util.List&lt;io.trino.spi.eventlistener.StageTaskStatistics&gt;, java.util.List&lt;io.trino.spi.eventlistener.DynamicFilterDomainStatistics&gt;, java.util.function.Supplier&lt;java.util.List&lt;java.lang.String&gt;&gt;, java.util.List&lt;io.trino.spi.eventlistener.QueryPlanOptimizerStatistics&gt;, java.util.Map&lt;java.lang.String, io.trino.spi.metrics.Metrics&gt;, java.util.Optional&lt;java.lang.String&gt;)</old>
264264
<new>method void io.trino.spi.eventlistener.QueryStatistics::&lt;init&gt;(java.time.Duration, java.time.Duration, java.time.Duration, java.time.Duration, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, long, long, long, long, long, long, long, long, long, long, long, long, long, long, double, double, java.util.List&lt;io.trino.spi.eventlistener.StageGcStatistics&gt;, int, boolean, java.util.List&lt;io.trino.spi.eventlistener.StageCpuDistribution&gt;, java.util.List&lt;io.trino.spi.eventlistener.StageOutputBufferUtilization&gt;, java.util.List&lt;io.trino.spi.eventlistener.StageOutputBufferMetrics&gt;, java.util.List&lt;io.trino.spi.eventlistener.StageTaskStatistics&gt;, java.util.List&lt;io.trino.spi.eventlistener.DynamicFilterDomainStatistics&gt;, java.util.function.Supplier&lt;java.util.List&lt;java.lang.String&gt;&gt;, java.util.List&lt;io.trino.spi.eventlistener.QueryPlanOptimizerStatistics&gt;, java.util.Map&lt;java.lang.String, io.trino.spi.metrics.Metrics&gt;, java.util.Optional&lt;java.lang.String&gt;)</new>
265265
</item>
266+
<item>
267+
<ignore>true</ignore>
268+
<code>java.method.numberOfParametersChanged</code>
269+
<old>method void io.trino.spi.block.ByteArrayBlockEncoding::&lt;init&gt;()</old>
270+
<new>method void io.trino.spi.block.ByteArrayBlockEncoding::&lt;init&gt;(boolean)</new>
271+
<justification>ByteArrayBlockEncoding needs to accept a parameter to enable SIMD support</justification>
272+
</item>
273+
<item>
274+
<ignore>true</ignore>
275+
<code>java.method.numberOfParametersChanged</code>
276+
<old>method void io.trino.spi.block.IntArrayBlockEncoding::&lt;init&gt;()</old>
277+
<new>method void io.trino.spi.block.IntArrayBlockEncoding::&lt;init&gt;(boolean)</new>
278+
<justification>IntArrayBlockEncoding needs to accept a parameter to enable SIMD support</justification>
279+
</item>
280+
<item>
281+
<ignore>true</ignore>
282+
<code>java.method.numberOfParametersChanged</code>
283+
<old>method void io.trino.spi.block.LongArrayBlockEncoding::&lt;init&gt;()</old>
284+
<new>method void io.trino.spi.block.LongArrayBlockEncoding::&lt;init&gt;(boolean)</new>
285+
<justification>LongArrayBlockEncoding needs to accept a parameter to enable SIMD support</justification>
286+
</item>
287+
<item>
288+
<ignore>true</ignore>
289+
<code>java.method.numberOfParametersChanged</code>
290+
<old>method void io.trino.spi.block.ShortArrayBlockEncoding::&lt;init&gt;()</old>
291+
<new>method void io.trino.spi.block.ShortArrayBlockEncoding::&lt;init&gt;(boolean)</new>
292+
<justification>ShortArrayBlockEncoding needs to accept a parameter to enable SIMD support</justification>
293+
</item>
266294
</differences>
267295
</revapi.differences>
268296
</analysisConfiguration>

core/trino-spi/src/main/java/io/trino/spi/block/ByteArrayBlockEncoding.java

Lines changed: 14 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -18,17 +18,25 @@
1818
import io.airlift.slice.Slices;
1919
import jakarta.annotation.Nullable;
2020

21+
import static io.trino.spi.block.EncoderUtil.compactBytesWithNullsScalar;
22+
import static io.trino.spi.block.EncoderUtil.compactBytesWithNullsVectorized;
2123
import static io.trino.spi.block.EncoderUtil.decodeNullBits;
2224
import static io.trino.spi.block.EncoderUtil.encodeNullsAsBits;
2325
import static io.trino.spi.block.EncoderUtil.retrieveNullBits;
2426
import static java.lang.System.arraycopy;
25-
import static java.util.Objects.checkFromIndexSize;
2627

2728
public class ByteArrayBlockEncoding
2829
implements BlockEncoding
2930
{
3031
public static final String NAME = "BYTE_ARRAY";
3132

33+
private final boolean enableVectorizedNullSuppression;
34+
35+
public ByteArrayBlockEncoding(boolean enableVectorizedNullSuppression)
36+
{
37+
this.enableVectorizedNullSuppression = enableVectorizedNullSuppression;
38+
}
39+
3240
@Override
3341
public String getName()
3442
{
@@ -52,23 +60,19 @@ public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceO
5260
@Nullable
5361
boolean[] isNull = byteArrayBlock.getRawValueIsNull();
5462
byte[] rawValues = byteArrayBlock.getRawValues();
55-
checkFromIndexSize(rawOffset, positionCount, rawValues.length);
5663

5764
encodeNullsAsBits(sliceOutput, isNull, rawOffset, positionCount);
5865

5966
if (isNull == null) {
6067
sliceOutput.writeBytes(rawValues, rawOffset, positionCount);
6168
}
6269
else {
63-
byte[] valuesWithoutNull = new byte[positionCount];
64-
int nonNullPositionCount = 0;
65-
for (int i = 0; i < positionCount; i++) {
66-
valuesWithoutNull[nonNullPositionCount] = rawValues[i + rawOffset];
67-
nonNullPositionCount += isNull[i + rawOffset] ? 0 : 1;
70+
if (enableVectorizedNullSuppression) {
71+
compactBytesWithNullsVectorized(sliceOutput, rawValues, isNull, rawOffset, positionCount);
72+
}
73+
else {
74+
compactBytesWithNullsScalar(sliceOutput, rawValues, isNull, rawOffset, positionCount);
6875
}
69-
70-
sliceOutput.writeInt(nonNullPositionCount);
71-
sliceOutput.writeBytes(valuesWithoutNull, 0, nonNullPositionCount);
7276
}
7377
}
7478

0 commit comments

Comments (0)