[HUDI-8182]Cache internalSchema for hive read, avoid each split reloa… #11914

Merged

Contributor

@muyihao muyihao commented Sep 8, 2024

…d active timeline.

Change Logs

Addressing this issue: 11723
When using MR to read Hudi, use a global InternalSchema to avoid each reader listing the metadata directory to obtain the InternalSchema, which places a significant load on the HDFS NameNode.

Impact

Reduced metadata listing frequency to alleviate pressure on the HDFS NameNode.
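The caching idea described above can be sketched roughly as follows. This is a minimal illustration and not the PR's code: a plain `Map<String, String>` stands in for the Hadoop `JobConf`, and `KEY_PREFIX`, `loadSchemaFromTimeline`, `planSplits`, and `openReader` are hypothetical names chosen for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: resolve the schema once during split planning, then reuse it in every reader. */
final class SchemaCacheSketch {
    // Hypothetical key prefix; the PR defines its own constants.
    static final String KEY_PREFIX = "sketch.internal.schema.cache";

    // Counts simulated metadata listings (the expensive NameNode calls).
    static final AtomicInteger listings = new AtomicInteger();

    // Stand-in for listing the metadata directory and parsing the latest schema.
    static String loadSchemaFromTimeline(String table) {
        listings.incrementAndGet();
        return "{\"table\":\"" + table + "\"}";
    }

    // Split planning runs once per job: store the schema in the job configuration.
    static void planSplits(Map<String, String> conf, String table) {
        conf.put(KEY_PREFIX + "." + table, loadSchemaFromTimeline(table));
    }

    // Each reader checks the configuration first and lists metadata only on a miss.
    static String openReader(Map<String, String> conf, String table) {
        String cached = conf.get(KEY_PREFIX + "." + table);
        return cached != null ? cached : loadSchemaFromTimeline(table);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        planSplits(conf, "t1");
        for (int split = 0; split < 100; split++) {
            openReader(conf, "t1");
        }
        System.out.println("metadata listings: " + listings.get());
    }
}
```

In this sketch the listing counter stays at one regardless of how many readers are opened, which is the effect the change aims for on the NameNode.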

Risk level (write none, low medium or high below)

low

Documentation Update

N/A

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:M PR with lines of changes in (100, 300] label Sep 8, 2024
@muyihao muyihao force-pushed the feat/cache-internal-schema-for-mr-read branch from 018164e to 6b5667d Compare September 8, 2024 10:43
@@ -375,6 +389,45 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
// clear work from ThreadLocal after splits generated in case of thread is reused in pool.
Utilities.clearWorkMapForConf(job);

// build internal schema for the query
if (job.getBoolean(INTERNAL_SCHEMA_CACHE_ENABLE, true)) {
Contributor

Should it always be true because this is a deterministic optimization?

Contributor Author

yes, you are right

HoodieStorage storage = null;
try {
storage = new HoodieHadoopStorage(path.getFileSystem(job));
Option<StoragePath> tablePath = TablePathUtils.getTablePath(storage, HadoopFSUtils.convertToStoragePath(path));
Contributor

In which case there are multiple table paths?

Contributor Author

> In which case there are multiple table paths?

When we perform a query on two different Hudi tables, such as union, join, or subquery, there will be multiple table paths.
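The multi-table case can be sketched as below. This is an illustration under stated assumptions, not the PR's implementation: it uses `java.nio.file` on the local file system instead of Hudi's storage API, and `findTableRoot` and `uniqueTableRoots` are hypothetical helpers. The only Hudi-specific fact it relies on is that a table root contains a `.hoodie` metadata directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Sketch: map each input path to its table root and keep one entry per table. */
final class UniqueTablePathsSketch {
    // A Hudi table root contains a ".hoodie" metadata directory.
    static final String META_DIR = ".hoodie";

    // Walks up from the given path until a directory containing META_DIR is found.
    static Optional<Path> findTableRoot(Path path) {
        for (Path p = path; p != null; p = p.getParent()) {
            if (Files.isDirectory(p.resolve(META_DIR))) {
                return Optional.of(p);
            }
        }
        return Optional.empty();
    }

    // Collapses many partition paths into the distinct tables they belong to.
    static Set<Path> uniqueTableRoots(List<Path> inputPaths) {
        Set<Path> roots = new LinkedHashSet<>();
        for (Path input : inputPaths) {
            findTableRoot(input).ifPresent(roots::add);
        }
        return roots;
    }

    public static void main(String[] args) throws IOException {
        // Made-up directory layout for the demonstration.
        Path base = Files.createTempDirectory("warehouse");
        Path orders = Files.createDirectories(base.resolve("orders"));
        Path users = Files.createDirectories(base.resolve("users"));
        Files.createDirectories(orders.resolve(META_DIR));
        Files.createDirectories(users.resolve(META_DIR));
        List<Path> inputs = List.of(
            Files.createDirectories(orders.resolve("dt=2024-09-01")),
            Files.createDirectories(orders.resolve("dt=2024-09-02")),
            Files.createDirectories(users.resolve("dt=2024-09-01")));
        // A join over both tables yields three input paths but only two table roots.
        System.out.println(uniqueTableRoots(inputs).size());
    }
}
```

Deduplicating first means the schema is resolved once per table rather than once per input path.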

job.set(INTERNAL_SCHEMA_CACHE_PREFIX + "." + k, v);
});

job.setBoolean(INTERNAL_SCHEMA_CACHE_VISIT, true);
Contributor

Can we just check if the cache key exists instead?

Contributor Author

Nice suggestion. I will make the changes as recommended
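The suggestion above amounts to using the presence of the cache key as the signal that the cache was populated, rather than a separate boolean flag. A minimal sketch, assuming a `Map<String, String>` in place of the `JobConf` and a hypothetical key name:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Sketch: treat the presence of the cache key itself as the "cache was populated" signal. */
final class CacheKeyCheckSketch {
    // Hypothetical key name, used only for this illustration.
    static final String SCHEMA_KEY = "sketch.internal.schema.cache.my_table";

    // Empty when split planning never stored a schema; no separate boolean flag is needed.
    static Optional<String> cachedSchema(Map<String, String> conf) {
        return Optional.ofNullable(conf.get(SCHEMA_KEY));
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(cachedSchema(conf).isPresent());
        conf.put(SCHEMA_KEY, "{}");
        System.out.println(cachedSchema(conf).isPresent());
    }
}
```

With a single source of truth, the flag and the cached value cannot drift out of sync.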

}
} else {
try {
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
Contributor

This is an invalid code path because the table itself is invalid.

Contributor Author

Got it, I will fix it.

@yihua
Contributor

yihua commented Sep 11, 2024

@muyihao any update on addressing the comments in the PR?

@muyihao
Contributor Author

muyihao commented Sep 11, 2024

> @muyihao any update on addressing the comments in the PR?

Currently working on verifying whether HoodieCombineHiveInputFormat can handle multiple Hudi tables and making modifications based on the comments.

@muyihao muyihao force-pushed the feat/cache-internal-schema-for-mr-read branch 2 times, most recently from 59d00b6 to a2a07b6 Compare September 16, 2024 03:28
@danny0405
Contributor

Hi, @muyihao Is this ready for review now?

@muyihao
Contributor Author

muyihao commented Sep 17, 2024

> Hi, @muyihao Is this ready for review now?

@danny0405 Yes, sorry for the late reply. Thank you for your help with the review. Currently, the modifications cache the latest schema for different tables when there are multiple table paths. However, this does not completely avoid loading the ActiveTimeline. When reading COW, it will still trigger the loading of the active timeline when searching for the fileSchema.

@danny0405
Contributor

> When reading COW, it will still trigger the loading of the active timeline when searching for the fileSchema.

Can you show us the line that triggers this logic?

@muyihao
Contributor Author

muyihao commented Sep 18, 2024

> > When reading COW, it will still trigger the loading of the active timeline when searching for the fileSchema.
>
> Can you show us the line that triggers this logic?

image

InternalSchemaCache#searchSchemaAndCache triggers InternalSchemaCache#getSchemaByReadingCommitFile or InternalSchemaCache#getHistoricalSchemas, both of which may load the active timeline.

@danny0405
Contributor

@muyihao Thanks for pointing it out. metaClient.getActiveTimeline also returns the cached timeline first, so for one metaClient instance the listing happens only once. I guess it is not a problem if the metaClient can be shared across multiple input splits.

BTW, while reviewing the code I found a bug in FileBasedInternalSchemaStorageManager: the passed-in metaClient does not always work if its base path does not end with .schema. We should fix it like this:

  public FileBasedInternalSchemaStorageManager(HoodieTableMetaClient metaClient) {
    this.baseSchemaPath = new StoragePath(metaClient.getMetaPath(), SCHEMA_NAME);
    this.storage = metaClient.getStorage();
    this.metaClient = metaClient.getBasePath().getName().equalsIgnoreCase(SCHEMA_NAME) ? metaClient : null;
  }
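The timeline caching described in the comment above can be illustrated with a small memoization sketch. This is not `HoodieTableMetaClient`'s actual code, which the thread does not show: `CachedTimelineSketch` is a hypothetical class, the `Supplier` stands in for the storage listing, and the instant names are made up.

```java
import java.util.List;
import java.util.function.Supplier;

/** Sketch: a client that lists the timeline lazily and reuses the result afterwards. */
final class CachedTimelineSketch {
    private final Supplier<List<String>> lister; // stand-in for the storage listing call
    private List<String> activeTimeline;         // null until first requested

    CachedTimelineSketch(Supplier<List<String>> lister) {
        this.lister = lister;
    }

    // Returns the cached timeline when available; lists storage only on the first call.
    synchronized List<String> getActiveTimeline() {
        if (activeTimeline == null) {
            activeTimeline = lister.get();
        }
        return activeTimeline;
    }

    public static void main(String[] args) {
        int[] listCalls = {0};
        CachedTimelineSketch client = new CachedTimelineSketch(() -> {
            listCalls[0]++;
            // Made-up instant file names for the demonstration.
            return List.of("20240908104300.commit", "20240916032800.commit");
        });
        client.getActiveTimeline();
        client.getActiveTimeline();
        System.out.println("storage listings: " + listCalls[0]);
    }
}
```

The benefit only materializes when the same client instance is shared; a fresh instance per input split would list storage again each time.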

@muyihao
Contributor Author

muyihao commented Sep 20, 2024

@danny0405
Thank you for your reply. I also tried to serialize the metaClient and put it into the conf, but found that the StoragePath serialization fails.
image
Do you have any better ideas?

@danny0405
Contributor

danny0405 commented Sep 21, 2024

@muyihao Thanks. I have applied a patch against the master code (instead of your patch) to address the timeline listing issue. You can merge the patch first and then add your job conf setup logic; then it should work:

8182.patch.zip

@muyihao muyihao force-pushed the feat/cache-internal-schema-for-mr-read branch from a2a07b6 to aacb48d Compare September 21, 2024 09:01
@muyihao
Contributor Author

muyihao commented Sep 21, 2024

@danny0405 Thanks for the help, I have merged it.

if (schema.isPresent()) {
LOG.info("Set internal schema and avro schema of path: " + path.toString());
job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, SerDeHelper.toJson(schema.get()));
job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, schemaUtil.getTableAvroSchema().toString());
Contributor

@danny0405 danny0405 Sep 21, 2024

Do we cache the schema by path or by table name? The table name looks more straightforward. Also, schemaUtil.getTableAvroSchema() should be invoked first to set up the commit metadata cache in TableSchemaResolver and avoid redundant commit metadata deserialization.

Contributor Author

@danny0405 That's a great suggestion. When we handle tables from two different databases together, there might be the same table name. For example, dev.table_xx and online.table_xx, where table_xx is the same. Should we consider this case?

Contributor

I think we can use the table name for now and extend it to the path if needed.
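The trade-off raised in this thread can be sketched as follows. The prefix, the helper names, and the warehouse paths are hypothetical and exist only to show the collision the author describes for `dev.table_xx` and `online.table_xx`.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: cache keys built from the table name collide across databases; path keys do not. */
final class CacheKeySketch {
    // Hypothetical prefix for illustration only.
    static final String PREFIX = "sketch.schema.cache";

    static String keyByName(String tableName) {
        return PREFIX + "." + tableName;
    }

    static String keyByPath(String basePath) {
        return PREFIX + "." + basePath;
    }

    public static void main(String[] args) {
        Map<String, String> byName = new HashMap<>();
        byName.put(keyByName("table_xx"), "schema of dev.table_xx");
        byName.put(keyByName("table_xx"), "schema of online.table_xx"); // overwrites the dev entry

        // Made-up base paths for the two databases.
        Map<String, String> byPath = new HashMap<>();
        byPath.put(keyByPath("/warehouse/dev/table_xx"), "schema of dev.table_xx");
        byPath.put(keyByPath("/warehouse/online/table_xx"), "schema of online.table_xx");

        System.out.println(byName.size() + " entry by name, " + byPath.size() + " entries by path");
    }
}
```

Keying by name is simpler and is sufficient while a query never mixes same-named tables from different databases; keying by path removes that restriction.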

return;
}
this.metaClient = metaClientOption.isPresent() ? metaClientOption.get() : setUpHoodieTableMetaClient();
this.internalSchemaOption = getInternalSchemaFromCache();
LOG.info("finish init schema evolution for split: {}", split);
Contributor

Do we need this line for debugging? It does not look very helpful; maybe we can switch it to DEBUG level or just remove it.

json -> SerDeHelper.fromJson(json)
);
if (internalSchema != null && internalSchema.isPresent()) {
LOG.info("get internal schema from conf for split: {}" + split);
Contributor

Ditto, I think we should remove this line.

Contributor Author

Will remove it

@@ -139,7 +186,10 @@ public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realt
return;
}
if (internalSchemaOption.isPresent()) {
- Schema tableAvroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+ Schema tableAvroSchema = getAvroSchemaFromCache();
if (tableAvroSchema == null) {
Contributor

When internalSchemaOption.isPresent() is true, getAvroSchemaFromCache() should also return a value; the cache guarantees this.

Contributor Author

Yes, it seems a bit redundant, I will remove it.

@muyihao muyihao force-pushed the feat/cache-internal-schema-for-mr-read branch from aacb48d to 19bed8b Compare September 21, 2024 10:13
@muyihao muyihao force-pushed the feat/cache-internal-schema-for-mr-read branch from 19bed8b to 0e785c5 Compare September 21, 2024 10:16
for (String path : uniqTablePaths) {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(new HadoopStorageConfiguration(job)).build();
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
String avroSchema = schemaUtil.getTableAvroSchema().toString();
Contributor

yeah, the cache would be utilized.

@danny0405
Contributor

The test failures are not related, will merge it.

@danny0405 danny0405 merged commit 77eb9e5 into apache:master Sep 22, 2024
42 of 43 checks passed
@muyihao
Contributor Author

muyihao commented Sep 23, 2024

> The test failures are not related, will merge it.

Thanks for helping to land this PR. :)
