Skip to content

Commit 01ff3a3

Browse files
SongChujunpettyjamesm
authored andcommitted
Add simd support detection logic
1 parent c186d84 commit 01ff3a3

File tree

6 files changed

+239
-1
lines changed

6 files changed

+239
-1
lines changed

core/trino-main/pom.xml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,4 +556,29 @@
556556
<scope>test</scope>
557557
</dependency>
558558
</dependencies>
559+
560+
<build>
561+
<pluginManagement>
562+
<plugins>
563+
<plugin>
564+
<groupId>org.apache.maven.plugins</groupId>
565+
<artifactId>maven-compiler-plugin</artifactId>
566+
<configuration>
567+
<!-- Ensure incubator Vector API is on the module path for javac -->
568+
<compilerArgs combine.self="merge">
569+
<arg>${extraJavaVectorArgs}</arg>
570+
</compilerArgs>
571+
</configuration>
572+
</plugin>
573+
<plugin>
574+
<groupId>org.apache.maven.plugins</groupId>
575+
<artifactId>maven-javadoc-plugin</artifactId>
576+
<configuration>
577+
<!-- Ensure javadoc resolves incubator Vector API -->
578+
<additionalOptions combine.self="merge">${extraJavaVectorArgs}</additionalOptions>
579+
</configuration>
580+
</plugin>
581+
</plugins>
582+
</pluginManagement>
583+
</build>
559584
</project>

core/trino-main/src/main/java/io/trino/FeaturesConfig.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ public class FeaturesConfig
9494
* default value is overwritten for fault tolerant execution in {@link #applyFaultTolerantExecutionDefaults()}}
9595
*/
9696
private CompressionCodec exchangeCompressionCodec = NONE;
97+
private boolean exchangeVectorizedSerdeEnabled = true;
9798
private boolean pagesIndexEagerCompactionEnabled;
9899
private boolean omitDateTimeTypePrecision;
99100
private int maxRecursionDepth = 10;
@@ -366,6 +367,19 @@ public FeaturesConfig setExchangeCompressionCodec(CompressionCodec exchangeCompr
366367
return this;
367368
}
368369

370+
@Config("exchange.experimental.vectorized-serde.enabled")
371+
@ConfigDescription("Enable using Java Vector API for faster serialization and deserialization of exchange data")
372+
public FeaturesConfig setExchangeVectorizedSerdeEnabled(boolean exchangeVectorizedSerdeEnabled)
373+
{
374+
this.exchangeVectorizedSerdeEnabled = exchangeVectorizedSerdeEnabled;
375+
return this;
376+
}
377+
378+
public boolean isExchangeVectorizedSerdeEnabled()
379+
{
380+
return exchangeVectorizedSerdeEnabled;
381+
}
382+
369383
public DataIntegrityVerification getExchangeDataIntegrityVerification()
370384
{
371385
return exchangeDataIntegrityVerification;

core/trino-main/src/main/java/io/trino/server/ServerMainModule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100
import io.trino.server.protocol.PreparedStatementEncoder;
101101
import io.trino.server.protocol.spooling.SpoolingServerModule;
102102
import io.trino.server.remotetask.HttpLocationFactory;
103+
import io.trino.simd.BlockEncodingSimdSupport;
103104
import io.trino.spi.PageIndexerFactory;
104105
import io.trino.spi.PageSorter;
105106
import io.trino.spi.VersionEmbedder;
@@ -431,6 +432,9 @@ protected void setup(Binder binder)
431432
.to(ServerPluginsProvider.class).in(Scopes.SINGLETON);
432433
configBinder(binder).bindConfig(ServerPluginsProviderConfig.class);
433434

435+
// SIMD support
436+
binder.bind(BlockEncodingSimdSupport.class).in(Scopes.SINGLETON);
437+
434438
// block encodings
435439
binder.bind(BlockEncodingManager.class).in(Scopes.SINGLETON);
436440
jsonBinder(binder).addSerializerBinding(Block.class).to(BlockJsonSerde.Serializer.class);
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.simd;
15+
16+
import com.google.inject.Inject;
17+
import com.google.inject.Singleton;
18+
import io.trino.FeaturesConfig;
19+
import io.trino.util.MachineInfo;
20+
import jdk.incubator.vector.VectorShape;
21+
import oshi.hardware.CentralProcessor.ProcessorIdentifier;
22+
23+
import java.util.EnumSet;
24+
import java.util.Set;
25+
26+
import static io.trino.util.MachineInfo.readCpuFlags;
27+
import static java.util.Locale.ENGLISH;
28+
29+
/*
30+
We need to specifically detect AVX512F (for VPCOMPRESSD / VPCOMPRESSQ instruction support for int and long types) and
31+
AVX512VBMI2 (for VPCOMPRESSB / VPCOMPRESSW instruction support over byte and short types). Because we would like to check
32+
whether Vector<T>#compress(VectorMask<T>) is supported natively in hardware or emulated by the JVM - because the emulated
33+
support is so much slower than the simple scalar code that exists, but since we don't have the ability to detect that
34+
directly from the JDK vector API we have to assume that native support exists whenever the CPU advertises it.
35+
*/
36+
@Singleton
37+
public final class BlockEncodingSimdSupport
38+
{
39+
public record SimdSupport(
40+
boolean expandAndCompressByte,
41+
boolean expandAndCompressShort,
42+
boolean expandAndCompressInt,
43+
boolean expandAndCompressLong)
44+
{
45+
public static final SimdSupport NONE = new SimdSupport(false, false, false, false);
46+
}
47+
48+
private static final int MINIMUM_SIMD_LENGTH = 256;
49+
private static final SimdSupport AUTO_DETECTED_SUPPORT = detectSimd();
50+
private static final int PREFERRED_BIT_WIDTH = VectorShape.preferredShape().vectorBitSize();
51+
52+
private final SimdSupport simdSupport;
53+
54+
@Inject
55+
public BlockEncodingSimdSupport(FeaturesConfig featuresConfig)
56+
{
57+
this(featuresConfig.isExchangeVectorizedSerdeEnabled());
58+
}
59+
60+
public BlockEncodingSimdSupport(boolean enableAutoDetectedSimdSupport)
61+
{
62+
if (enableAutoDetectedSimdSupport) {
63+
simdSupport = AUTO_DETECTED_SUPPORT;
64+
}
65+
else {
66+
simdSupport = SimdSupport.NONE;
67+
}
68+
}
69+
70+
private static SimdSupport detectSimd()
71+
{
72+
ProcessorIdentifier id = MachineInfo.getProcessorInfo();
73+
74+
String vendor = id.getVendor().toLowerCase(ENGLISH);
75+
76+
if (vendor.contains("intel") || vendor.contains("amd")) {
77+
return detectX86SimdSupport();
78+
}
79+
80+
return SimdSupport.NONE;
81+
}
82+
83+
private static SimdSupport detectX86SimdSupport()
84+
{
85+
enum X86SimdInstructionSet {
86+
avx512f,
87+
avx512vbmi2
88+
}
89+
90+
Set<String> flags = readCpuFlags();
91+
EnumSet<X86SimdInstructionSet> x86Flags = EnumSet.noneOf(X86SimdInstructionSet.class);
92+
93+
if (!flags.isEmpty()) {
94+
for (X86SimdInstructionSet instructionSet : X86SimdInstructionSet.values()) {
95+
if (flags.contains(instructionSet.name())) {
96+
x86Flags.add(instructionSet);
97+
}
98+
}
99+
}
100+
101+
if (PREFERRED_BIT_WIDTH < MINIMUM_SIMD_LENGTH) {
102+
return SimdSupport.NONE;
103+
}
104+
else {
105+
return new SimdSupport(
106+
x86Flags.contains(X86SimdInstructionSet.avx512vbmi2),
107+
x86Flags.contains(X86SimdInstructionSet.avx512vbmi2),
108+
x86Flags.contains(X86SimdInstructionSet.avx512f),
109+
x86Flags.contains(X86SimdInstructionSet.avx512f));
110+
}
111+
}
112+
113+
public SimdSupport getSimdSupport()
114+
{
115+
return simdSupport;
116+
}
117+
}

core/trino-main/src/main/java/io/trino/util/MachineInfo.java

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,25 @@
1414
package io.trino.util;
1515

1616
import com.google.common.base.StandardSystemProperty;
17+
import com.google.common.collect.ImmutableSet;
1718
import oshi.SystemInfo;
19+
import oshi.hardware.CentralProcessor;
20+
import oshi.hardware.CentralProcessor.ProcessorIdentifier;
1821

22+
import java.util.Arrays;
23+
import java.util.HashSet;
24+
import java.util.List;
25+
import java.util.Set;
26+
27+
import static com.google.common.collect.ImmutableSet.toImmutableSet;
1928
import static java.lang.Math.min;
29+
import static java.util.Locale.ENGLISH;
2030

2131
public final class MachineInfo
2232
{
2333
// cache physical processor count, so that it's not queried multiple times during tests
2434
private static volatile int physicalProcessorCount = -1;
35+
private static final SystemInfo SYSTEM_INFO = new SystemInfo();
2536

2637
private MachineInfo() {}
2738

@@ -38,7 +49,7 @@ public static int getAvailablePhysicalProcessorCount()
3849
if ("amd64".equals(osArch) || "x86_64".equals(osArch)) {
3950
// Oshi can recognize physical processor count (without hyper threading) for x86 platforms.
4051
// However, it doesn't correctly recognize physical processor count for ARM platforms.
41-
totalPhysicalProcessorCount = new SystemInfo()
52+
totalPhysicalProcessorCount = SYSTEM_INFO
4253
.getHardware()
4354
.getProcessor()
4455
.getPhysicalProcessorCount();
@@ -52,4 +63,68 @@ public static int getAvailablePhysicalProcessorCount()
5263
physicalProcessorCount = min(totalPhysicalProcessorCount, availableProcessorCount);
5364
return physicalProcessorCount;
5465
}
66+
67+
public static ProcessorIdentifier getProcessorInfo()
68+
{
69+
return SYSTEM_INFO.getHardware().getProcessor().getProcessorIdentifier();
70+
}
71+
72+
public static Set<String> readCpuFlags()
73+
{
74+
CentralProcessor cpu = SYSTEM_INFO.getHardware().getProcessor();
75+
List<String> flags = cpu.getFeatureFlags();
76+
if (flags == null || flags.isEmpty()) {
77+
return ImmutableSet.of();
78+
}
79+
// Each element of flags represents the hardware support for an individual core, so we're want to calculate flags
80+
// advertised by all cores
81+
Set<String> intersection = null;
82+
83+
for (String line : flags) {
84+
if (line == null || line.isBlank()) {
85+
continue;
86+
}
87+
88+
// Strip the "flags:" / "Features:" prefix if present.
89+
String body = line;
90+
int colon = line.indexOf(':');
91+
if (colon >= 0) {
92+
body = line.substring(colon + 1);
93+
}
94+
95+
// Tokenize + normalize.
96+
Set<String> tokens = Arrays.stream(body.trim().split("\\s+"))
97+
.map(token -> normalizeFlag(token))
98+
.filter(token -> !token.isEmpty())
99+
.collect(toImmutableSet());
100+
101+
if (tokens.isEmpty()) {
102+
continue;
103+
}
104+
105+
if (intersection == null) {
106+
intersection = new HashSet<>(tokens);
107+
}
108+
else {
109+
intersection.retainAll(tokens);
110+
if (intersection.isEmpty()) {
111+
break; // nothing in common
112+
}
113+
}
114+
}
115+
116+
return intersection == null ? ImmutableSet.of() : intersection;
117+
}
118+
119+
public static String normalizeFlag(String flag)
120+
{
121+
flag = flag.toLowerCase(ENGLISH).replace("_", "").trim();
122+
123+
// Skip stray keys that may sneak in if the colon wasn’t found.
124+
if (flag.equals("flags") || flag.equals("features")) {
125+
return "";
126+
}
127+
128+
return flag;
129+
}
55130
}

core/trino-main/src/test/java/io/trino/sql/analyzer/TestFeaturesConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public void testDefaults()
5454
.setMemoryRevokingThreshold(0.9)
5555
.setMemoryRevokingTarget(0.5)
5656
.setExchangeCompressionCodec(NONE)
57+
.setExchangeVectorizedSerdeEnabled(true)
5758
.setExchangeDataIntegrityVerification(DataIntegrityVerification.ABORT)
5859
.setPagesIndexEagerCompactionEnabled(false)
5960
.setFilterAndProjectMinOutputPageSize(DataSize.of(500, KILOBYTE))
@@ -89,6 +90,7 @@ public void testExplicitPropertyMappings()
8990
.put("memory-revoking-threshold", "0.2")
9091
.put("memory-revoking-target", "0.8")
9192
.put("exchange.compression-codec", "ZSTD")
93+
.put("exchange.experimental.vectorized-serde.enabled", "false")
9294
.put("exchange.data-integrity-verification", "RETRY")
9395
.put("pages-index.eager-compaction-enabled", "true")
9496
.put("filter-and-project-min-output-page-size", "1MB")
@@ -121,6 +123,7 @@ public void testExplicitPropertyMappings()
121123
.setMemoryRevokingThreshold(0.2)
122124
.setMemoryRevokingTarget(0.8)
123125
.setExchangeCompressionCodec(ZSTD)
126+
.setExchangeVectorizedSerdeEnabled(false)
124127
.setExchangeDataIntegrityVerification(DataIntegrityVerification.RETRY)
125128
.setPagesIndexEagerCompactionEnabled(true)
126129
.setFilterAndProjectMinOutputPageSize(DataSize.of(1, MEGABYTE))

0 commit comments

Comments
 (0)