[HUDI-8182] Cache internalSchema for Hive read to avoid reloading the active timeline for each split.
yanghao14 committed Sep 16, 2024
1 parent 0cd4291 commit 59d00b6
Showing 3 changed files with 190 additions and 8 deletions.
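
In outline, HoodieCombineHiveInputFormat#getSplits now serializes each table's InternalSchema and Avro schema into the JobConf under per-table keys, and SchemaEvolutionContext parses them back per split instead of reloading the active timeline. A minimal sketch of the key layout and the split-side lookup, assuming a JobConf already populated during split generation (the class and helper method here are illustrative, not part of the patch):

import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.utils.SerDeHelper;

// Sketch only: mirrors the cache-key layout introduced by this commit.
public class InternalSchemaCacheSketch {
  // Same prefix constant that HoodieCombineHiveInputFormat exposes below.
  private static final String INTERNAL_SCHEMA_CACHE_KEY_PREFIX = "hudi.hive.internal.schema.cache.key.prefix";

  // Hypothetical helper: resolve the cached InternalSchema for one table base path URI.
  static Option<InternalSchema> readCachedInternalSchema(JobConf job, String tableBasePathUri) {
    // Keys are "<prefix>.<table base path URI>"; values are the schema serialized as JSON.
    String json = job.get(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + tableBasePathUri);
    // A miss means getSplits did not cache anything for this table (e.g. schema-on-read
    // is not enabled), so callers fall back to the table's commit metadata.
    return json == null ? Option.empty() : SerDeHelper.fromJson(json);
  }
}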
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
@@ -27,6 +27,7 @@
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
import org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
@@ -36,6 +37,7 @@
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
@@ -69,6 +71,8 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
@@ -98,19 +102,63 @@ public SchemaEvolutionContext(InputSplit split, JobConf job) throws IOException
public SchemaEvolutionContext(InputSplit split, JobConf job, Option<HoodieTableMetaClient> metaClientOption) throws IOException {
this.split = split;
this.job = job;
if (!job.getBoolean(HIVE_EVOLUTION_ENABLE, true)) {
LOG.info("schema evolution is disabled for split: {}", split);
internalSchemaOption = Option.empty();
this.metaClient = null;
return;
}
this.metaClient = metaClientOption.isPresent() ? metaClientOption.get() : setUpHoodieTableMetaClient();
this.internalSchemaOption = getInternalSchemaFromCache();
LOG.info("finish init schema evolution for split: {}", split);
}

public Option<InternalSchema> getInternalSchemaFromCache() throws IOException {
Option<InternalSchema> internalSchema = getCachedData(
HoodieCombineHiveInputFormat.INTERNAL_SCHEMA_CACHE_KEY_PREFIX,
json -> SerDeHelper.fromJson(json)
);
if (internalSchema != null && internalSchema.isPresent()) {
LOG.info("get internal schema from conf for split: {}" + split);
return internalSchema;
}
return Option.empty();
}

public Schema getAvroSchemaFromCache() throws IOException {
Schema avroSchema = getCachedData(
HoodieCombineHiveInputFormat.SCHEMA_CACHE_KEY_PREFIX,
json -> new Schema.Parser().parse(json)
);
return avroSchema;
}

private <T> T getCachedData(String keyPrefix, Function<String, T> parser) throws IOException {
Option<StoragePath> tablePath = getTablePath(job, split);
if (!tablePath.isPresent()) {
return null;
}
String cacheKey = keyPrefix + "." + tablePath.get().toUri();
String cachedJson = job.get(cacheKey);
if (cachedJson == null) {
return null;
}
try {
return parser.apply(cachedJson);
} catch (Exception e) {
LOG.warn(String.format("Failed to parse data from cache with key: %s", cacheKey), e);
return null;
}
}

private Option<StoragePath> getTablePath(JobConf job, InputSplit split) throws IOException {
if (split instanceof FileSplit) {
Path path = ((FileSplit) split).getPath();
FileSystem fs = path.getFileSystem(job);
HoodieStorage storage = new HoodieHadoopStorage(fs);
return TablePathUtils.getTablePath(storage, HadoopFSUtils.convertToStoragePath(path));
}
return Option.empty();
}

private HoodieTableMetaClient setUpHoodieTableMetaClient() throws IOException {
@@ -139,7 +187,10 @@ public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realt
return;
}
if (internalSchemaOption.isPresent()) {
Schema tableAvroSchema = getAvroSchemaFromCache();
if (tableAvroSchema == null) {
tableAvroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
}
List<String> requiredColumns = getRequireColumn(job);
InternalSchema prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(),
requiredColumns);
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
@@ -18,10 +18,15 @@

package org.apache.hudi.hadoop.hive;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.HoodieParquetInputFormatBase;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
@@ -64,6 +69,12 @@
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -108,6 +119,8 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
// max number of threads we can use to check non-combinable paths
private static final int MAX_CHECK_NONCOMBINABLE_THREAD_NUM = 50;
private static final int DEFAULT_NUM_PATH_PER_THREAD = 100;
public static final String INTERNAL_SCHEMA_CACHE_KEY_PREFIX = "hudi.hive.internal.schema.cache.key.prefix";
public static final String SCHEMA_CACHE_KEY_PREFIX = "hudi.hive.schema.cache.key.prefix";

protected String getParquetInputFormatClassName() {
return HoodieParquetInputFormat.class.getName();
@@ -375,6 +388,38 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
// clear work from ThreadLocal after splits generated in case of thread is reused in pool.
Utilities.clearWorkMapForConf(job);

// Cache the internal schema and Avro schema for each table involved in the query
if (result.size() > 0) {
ArrayList<String> uniqTablePaths = new ArrayList<>();
Arrays.stream(paths).forEach(path -> {
HoodieStorage storage = null;
try {
storage = new HoodieHadoopStorage(path.getFileSystem(job));
Option<StoragePath> tablePath = TablePathUtils.getTablePath(storage, HadoopFSUtils.convertToStoragePath(path));
if (tablePath.isPresent()) {
uniqTablePaths.add(tablePath.get().toUri().toString());
}
} catch (IOException e) {
throw new RuntimeException(e);
}
});

try {
for (String path : uniqTablePaths) {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(new HadoopStorageConfiguration(job)).build();
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
Option<InternalSchema> schema = schemaUtil.getTableInternalSchemaFromCommitMetadata();
if (schema.isPresent()) {
LOG.info("Set internal schema and avro schema of path: " + path.toString());
job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, SerDeHelper.toJson(schema.get()));
job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, schemaUtil.getTableAvroSchema().toString());
}
}
} catch (Exception e) {
LOG.warn("Fail to set internal schema", e);
}
}

LOG.info("Number of all splits " + result.size());
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
return result.toArray(new InputSplit[result.size()]);
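
On the read side, each record reader can build a SchemaEvolutionContext per FileSplit and obtain the schemas from the JobConf entries written above, which is what the new test below exercises. A minimal usage sketch, assuming getSplits already ran against the same job configuration (the caller shown is hypothetical):

import java.io.IOException;

import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.SchemaEvolutionContext;
import org.apache.hudi.internal.schema.InternalSchema;

public class SplitSchemaLookupSketch {
  // Hypothetical caller: a record reader that already holds its FileSplit and JobConf.
  static Option<InternalSchema> resolveInternalSchema(FileSplit split, JobConf job) throws IOException {
    SchemaEvolutionContext context = new SchemaEvolutionContext(split, job);
    // Served from the cached JobConf entry when present; Option.empty() on a cache miss.
    return context.getInternalSchemaFromCache();
  }
}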
hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
@@ -18,6 +18,7 @@

package org.apache.hudi.hadoop.hive;

import org.apache.hadoop.mapred.lib.CombineFileSplit;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
@@ -31,10 +32,17 @@
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.hadoop.SchemaEvolutionContext;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;

import org.apache.avro.Schema;
@@ -111,6 +119,84 @@ public void tearDown() throws IOException {
}
}

@Test
public void testInternalSchemaCacheForMR() throws Exception {
StorageConfiguration<Configuration> conf = HoodieTestUtils.getDefaultStorageConf();
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
java.nio.file.Path path1 = tempDir.resolve("tblOne");
java.nio.file.Path path2 = tempDir.resolve("tblTwo");
HoodieTestUtils.init(conf, path1.toString(), HoodieTableType.MERGE_ON_READ);
HoodieTestUtils.init(conf, path2.toString(), HoodieTableType.MERGE_ON_READ);
String commitTime = "100";
final int numRecords = 10;
// Create 3 parquet files with 10 records each for partition 1
File partitionDirOne = InputFormatTestUtil.prepareParquetTable(path1, schema, 3, numRecords, commitTime);
HoodieCommitMetadata commitMetadataOne = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
schema.toString(), HoodieTimeline.COMMIT_ACTION);
// Mock the latest schema in the commit metadata
InternalSchema internalSchema = AvroInternalSchemaConverter.convert(schema);
commitMetadataOne.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(internalSchema));
FileCreateUtils.createCommit(path1.toString(), commitTime, Option.of(commitMetadataOne));
// Create 3 parquet files with 10 records each for partition 2
File partitionDirTwo = InputFormatTestUtil.prepareParquetTable(path2, schema, 3, numRecords, commitTime);
HoodieCommitMetadata commitMetadataTwo = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
schema.toString(), HoodieTimeline.COMMIT_ACTION);
// Mock the latest schema in the commit metadata
commitMetadataTwo.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(internalSchema));
FileCreateUtils.createCommit(path2.toString(), commitTime, Option.of(commitMetadataTwo));

// Enable schema evolution
conf.set("hoodie.schema.on.read.enable", "true");

TableDesc tblDesc = Utilities.defaultTd;
// Set the input format
tblDesc.setInputFileFormatClass(HoodieParquetRealtimeInputFormat.class);
PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
LinkedHashMap<Path, ArrayList<String>> tableAlias = new LinkedHashMap<>();
ArrayList<String> alias = new ArrayList<>();
// Add partition info one
alias.add(path1.toAbsolutePath().toString());
tableAlias.put(new Path(path1.toAbsolutePath().toString()), alias);
pt.put(new Path(path1.toAbsolutePath().toString()), partDesc);
// Add partition info two
alias.add(path2.toAbsolutePath().toString());
tableAlias.put(new Path(path2.toAbsolutePath().toString()), alias);
pt.put(new Path(path2.toAbsolutePath().toString()), partDesc);

MapredWork mrwork = new MapredWork();
mrwork.getMapWork().setPathToPartitionInfo(pt);
mrwork.getMapWork().setPathToAliases(tableAlias);
mrwork.getMapWork().setMapperCannotSpanPartns(true);
Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
Utilities.setMapRedWork(conf.unwrap(), mrwork, mapWorkPath);
JobConf jobConf = new JobConf(conf.unwrap());
// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDirOne.getPath() + "," + partitionDirTwo.getPath());
jobConf.set(HAS_MAP_WORK, "true");
// The following config tells Hive to choose ExecMapper to read the MAP_WORK
jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
// Set SPLIT_MAXSIZE larger to create one split for 3 file groups
jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, "128000000");

HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat();
String tripsHiveColumnTypes = "double,string,string,string,double,double,double,double,double";
InputFormatTestUtil.setProjectFieldsForInputFormat(jobConf, schema, tripsHiveColumnTypes);
InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 2);
// Check that the internal schema and Avro schema from the cache match the originals
for (InputSplit split : splits) {
HoodieCombineRealtimeFileSplit inputSplitShim = (HoodieCombineRealtimeFileSplit) ((HoodieCombineRealtimeHiveSplit) split).getInputSplitShim();
List<FileSplit> fileSplits = inputSplitShim.getRealtimeFileSplits();
for (FileSplit fileSplit : fileSplits) {
SchemaEvolutionContext schemaEvolutionContext = new SchemaEvolutionContext(fileSplit, jobConf);
Option<InternalSchema> internalSchemaFromCache = schemaEvolutionContext.getInternalSchemaFromCache();
assertEquals(internalSchemaFromCache.get(), internalSchema);
Schema avroSchemaFromCache = schemaEvolutionContext.getAvroSchemaFromCache();
assertEquals(avroSchemaFromCache, schema);
}
}
}

@Test
public void multiPartitionReadersRealtimeCombineHoodieInputFormat() throws Exception {
// test for HUDI-1718
