
HADOOP-19091: Add support for Tez to MagicS3GuardCommitter #7441


Status: Open · wants to merge 3 commits into trunk
@@ -29,6 +29,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;

@@ -117,6 +118,19 @@ public PathOutputCommitter createOutputCommitter(
return createFileOutputCommitter(outputPath, context);
}

/**
* Create an output committer for a job.
* @param outputPath output path. This may be null.
* @param context the job context
* @return a new committer
* @throws IOException problems instantiating the committer
*/
public PathOutputCommitter createOutputCommitter(
Path outputPath,
JobContext context) throws IOException {
return createFileOutputCommitter(outputPath, context);
}

/**
* Create an instance of the default committer, a {@link FileOutputCommitter}
* for a task.
@@ -134,6 +148,14 @@ protected final PathOutputCommitter createFileOutputCommitter(
return new FileOutputCommitter(outputPath, context);
}

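/**
 * Create an instance of the default committer, a {@link FileOutputCommitter}
 * for a job.
 * @param outputPath the job's output path
 * @param context the job context
 * @return the committer to use
 * @throws IOException problems instantiating the committer
 */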
protected final PathOutputCommitter createFileOutputCommitter(
Path outputPath,
JobContext context) throws IOException {
LOG.debug("Creating FileOutputCommitter for path {} and context {}",
outputPath, context);
return new FileOutputCommitter(outputPath, context);
}

/**
* Get the committer factory for a configuration.
* @param outputPath the job's output path. If null, it means that the
@@ -185,6 +207,13 @@ public static PathOutputCommitterFactory getCommitterFactory(
return ReflectionUtils.newInstance(factory, conf);
}

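/**
 * Create the committer factory for a job and destination, then
 * create the committer from it.
 * @param outputPath the job's output path
 * @param context the job context
 * @return the committer to use
 * @throws IOException problems instantiating the committer
 */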
public static PathOutputCommitter createCommitter(Path outputPath,
JobContext context) throws IOException {
return getCommitterFactory(outputPath,
context.getConfiguration())
.createOutputCommitter(outputPath, context);
}

/**
* Create the committer factory for a task attempt and destination, then
* create the committer from it.
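
A usage note on the new job-level entry point above: a minimal sketch of how an engine holding only a JobContext could bind a job committer through the new static createCommitter(Path, JobContext) overload. The JobCommitterBinding class and its bind() method are illustrative, not part of this patch.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;

public class JobCommitterBinding {

  /**
   * Resolve the committer factory from the job configuration, then
   * create a job-scoped committer with no task attempt context.
   */
  public static PathOutputCommitter bind(Path outputPath, JobContext context)
      throws IOException {
    return PathOutputCommitterFactory.createCommitter(outputPath, context);
  }
}
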
1 change: 1 addition & 0 deletions hadoop-tools/hadoop-aws/pom.xml
@@ -459,6 +459,7 @@
<exclusion>org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory</exclusion>
<exclusion>org.apache.hadoop.fs.s3a.commit.impl.*</exclusion>
<exclusion>org.apache.hadoop.fs.s3a.commit.magic.*</exclusion>
<exclusion>org.apache.hadoop.fs.s3a.commit.magic.mapred.*</exclusion>
<exclusion>org.apache.hadoop.fs.s3a.commit.staging.*</exclusion>
</exclusions>
<bannedImports>
@@ -83,6 +83,7 @@
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_SPARK_UUID;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID_SOURCE;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.JOB_TEZ_UUID;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.SPARK_WRITE_UUID;
import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.*;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
@@ -240,6 +241,55 @@ protected AbstractS3ACommitter(
outputPath.toString());
}

/**
* Create a job committer.
* This constructor binds the destination directory and configuration, but
* does not update the work path: that must be calculated by the
* implementation. It is omitted here to avoid subclass methods being
* called too early.
* @param outputPath the job's output path: MUST NOT be null.
* @param context the job's context
* @throws IOException on a failure
*/
protected AbstractS3ACommitter(
Path outputPath,
JobContext context) throws IOException {
super(outputPath, context);
setOutputPath(outputPath);
this.jobContext = requireNonNull(context, "null job context");
this.role = "Job committer " + context.getJobID();
setConf(context.getConfiguration());
Pair<String, JobUUIDSource> id = buildJobUUID(
conf, context.getJobID());
this.uuid = id.getLeft();
this.uuidSource = id.getRight();
LOG.info("Job UUID {} source {}", getUUID(), getUUIDSource().getText());
initOutput(outputPath);
LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}",
role, jobName(context), jobIdString(context), outputPath);
S3AFileSystem fs = getDestS3AFS();
if (!fs.isMultipartUploadEnabled()) {
throw new PathCommitException(outputPath, "Multipart uploads are disabled for the FileSystem,"
+ " the committer can't proceed.");
}
// set this thread's context with the job ID.
// audit spans created in this thread will pick
// up this value, including the commit operations instance
// soon to be created.
new AuditContextUpdater(jobContext)
.updateCurrentAuditContext();

// the filesystem is the span source, always.
this.auditSpanSource = fs.getAuditSpanSource();
this.createJobMarker = context.getConfiguration().getBoolean(
CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER);
// the statistics are shared between this committer and its operations.
this.committerStatistics = fs.newCommitterStatistics();
this.commitOperations = new CommitOperations(fs, committerStatistics,
outputPath.toString());
}

/**
* Init the output filesystem and path.
* TESTING ONLY; allows mock FS to cheat.
@@ -1377,6 +1427,13 @@ protected void warnOnActiveUploads(final Path path) {
return Pair.of(jobUUID, JobUUIDSource.SparkWriteUUID);
}

// no Spark UUID configured;
// look for one from Tez
jobUUID = conf.getTrimmed(JOB_TEZ_UUID, "");
if (!jobUUID.isEmpty()) {
return Pair.of(jobUUID, JobUUIDSource.TezJobUUID);
}

// there is no UUID configuration in the job/task config

// Check the job hasn't declared a requirement for the UUID.
@@ -1407,6 +1464,7 @@ protected void warnOnActiveUploads(final Path path) {
*/
public enum JobUUIDSource {
SparkWriteUUID(SPARK_WRITE_UUID),
TezJobUUID(JOB_TEZ_UUID),
CommitterUUIDProperty(FS_S3A_COMMITTER_UUID),
JobID("JobID"),
GeneratedLocally("Generated Locally");
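
A note on the hunks above: within buildJobUUID, the new code checks Tez's job.committer.uuid immediately after Spark's spark.sql.sources.writeJobUUID, before falling back to the job ID or a locally generated value. A minimal standalone sketch of that precedence, using the key values from InternalCommitterConstants; the resolve() helper itself is illustrative, not the committer's actual method.

import org.apache.hadoop.conf.Configuration;

public class JobUuidPrecedence {

  static final String SPARK_WRITE_UUID = "spark.sql.sources.writeJobUUID";
  static final String JOB_TEZ_UUID = "job.committer.uuid";

  /** Mirror the lookup order: Spark first, then Tez, else null. */
  static String resolve(Configuration conf) {
    String uuid = conf.getTrimmed(SPARK_WRITE_UUID, "");
    if (!uuid.isEmpty()) {
      return uuid;   // reported as JobUUIDSource.SparkWriteUUID
    }
    uuid = conf.getTrimmed(JOB_TEZ_UUID, "");
    if (!uuid.isEmpty()) {
      return uuid;   // reported as JobUUIDSource.TezJobUUID
    }
    return null;     // fall through to job ID / generated UUID handling
  }
}
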
@@ -58,6 +58,23 @@ public PathOutputCommitter createOutputCommitter(Path outputPath,
return outputCommitter;
}

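/**
 * Create an output committer for a job.
 * @param outputPath output path
 * @param context job context
 * @return the committer to use
 * @throws IOException problems instantiating the committer
 */
@Override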
public PathOutputCommitter createOutputCommitter(Path outputPath,
JobContext context) throws IOException {
FileSystem fs = getDestinationFileSystem(outputPath, context);
PathOutputCommitter outputCommitter;
if (fs instanceof S3AFileSystem) {
outputCommitter = createJobCommitter((S3AFileSystem)fs,
outputPath, context);
} else {
throw new PathCommitException(outputPath,
"Filesystem not supported by this committer");
}
LOG.info("Using Committer {} for {}",
outputCommitter,
outputPath);
return outputCommitter;
}

/**
* Get the destination filesystem, returning null if there is none.
* Code using this must explicitly or implicitly look for a null value
@@ -88,4 +105,18 @@ public abstract PathOutputCommitter createTaskCommitter(
S3AFileSystem fileSystem,
Path outputPath,
TaskAttemptContext context) throws IOException;

/**
* Implementation point: create a job committer for a specific filesystem.
* @param fileSystem destination FS.
* @param outputPath final output path for work
* @param context job context
* @return a committer
* @throws IOException any problem, including the FS not supporting
* the desired committer
*/
public abstract PathOutputCommitter createJobCommitter(
S3AFileSystem fileSystem,
Path outputPath,
JobContext context) throws IOException;
}
@@ -117,6 +117,12 @@ private InternalCommitterConstants() {
public static final String SPARK_WRITE_UUID =
"spark.sql.sources.writeJobUUID";

/**
 * The UUID for jobs set by Tez: {@value}.
 */
public static final String JOB_TEZ_UUID =
"job.committer.uuid";

/**
* Java temp dir: {@value}.
*/
@@ -27,6 +27,7 @@
import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitterFactory;
import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitterFactory;
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterFactory;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;

@@ -93,6 +94,29 @@ public PathOutputCommitter createTaskCommitter(S3AFileSystem fileSystem,
}
}

@Override
public PathOutputCommitter createJobCommitter(S3AFileSystem fileSystem,
Path outputPath,
JobContext context) throws IOException {
AbstractS3ACommitterFactory factory = chooseCommitterFactory(fileSystem,
outputPath,
context.getConfiguration());
if (factory != null) {
PathOutputCommitter committer = factory.createJobCommitter(
fileSystem, outputPath, context);
LOG.info("Using committer {} to output data to {}",
(committer instanceof AbstractS3ACommitter
? ((AbstractS3ACommitter) committer).getName()
: committer.toString()),
outputPath);
return committer;
} else {
LOG.warn("Using standard FileOutputCommitter to commit work."
+ " This is slow and potentially unsafe.");
return createFileOutputCommitter(outputPath, context);
}
}

/**
* Choose a committer from the FS and task configurations. Task Configuration
* takes priority, allowing execution engines to dynamically change
@@ -84,6 +84,22 @@ public MagicS3GuardCommitter(Path outputPath,
getWorkPath());
}

/**
* Create a job committer.
* @param outputPath the job's output path
* @param context the job's context
* @throws IOException on a failure
*/
public MagicS3GuardCommitter(Path outputPath,
JobContext context) throws IOException {
super(outputPath, context);
setWorkPath(getJobAttemptPath(context));
verifyIsMagicCommitPath(getDestS3AFS(), getWorkPath());
LOG.debug("Job attempt {} has work path {}",
context.getJobID(),
getWorkPath());
}

@Override
public String getName() {
return NAME;
@@ -23,6 +23,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitterFactory;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;

@@ -44,4 +45,11 @@ public PathOutputCommitter createTaskCommitter(S3AFileSystem fileSystem,
return new MagicS3GuardCommitter(outputPath, context);
}

@Override
public PathOutputCommitter createJobCommitter(S3AFileSystem fileSystem,
Path outputPath,
JobContext context) throws IOException {
return new MagicS3GuardCommitter(outputPath, context);
}

}
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a.commit.magic.mapred;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;

import java.io.IOException;

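/**
 * Wrapper exposing the S3A magic committer through the
 * org.apache.hadoop.mapred.OutputCommitter API, so that engines
 * which drive the old MapReduce interfaces (such as Tez) can use it.
 */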
public class MagicS3GuardCommitter extends OutputCommitter {

private org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter committer = null;

private org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter getWrapped(
JobContext context) throws IOException {
if (committer == null) {
committer = (org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter)
PathOutputCommitterFactory.createCommitter(
new Path(context.getConfiguration().get("mapred.output.dir")),
context);
}
return committer;
}

private org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter getWrapped(
TaskAttemptContext context) throws IOException {
if (committer == null) {
committer = (org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter)
PathOutputCommitterFactory.createCommitter(
new Path(context.getConfiguration().get("mapred.output.dir")),
context);
}
return committer;
}

@Override
public void setupJob(JobContext context) throws IOException {
getWrapped(context).setupJob(context);
}

@Override
public void setupTask(TaskAttemptContext context) throws IOException {
getWrapped(context).setupTask(context);
}

@Override
public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
return getWrapped(context).needsTaskCommit(context);
}

@Override
public void commitTask(TaskAttemptContext context) throws IOException {
getWrapped(context).commitTask(context);
}

@Override
public void abortTask(TaskAttemptContext context) throws IOException {
getWrapped(context).abortTask(context);
}

@Override
public void cleanupJob(JobContext context) throws IOException {
getWrapped(context).cleanupJob(context);
}

@Override
public void commitJob(JobContext context) throws IOException {
getWrapped(context).commitJob(context);
}

public final Path getWorkPath() {
return committer.getWorkPath();
}

public final Path getOutputPath() {
return committer.getOutputPath();
}
}
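
For engines that drive the old org.apache.hadoop.mapred API, a sketch of how this wrapper might be wired up. The destination URI and the MapredCommitterWiring class are placeholders, and the FileOutputFormat call is assumed to populate the "mapred.output.dir" key the wrapper reads back.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

public class MapredCommitterWiring {

  public static JobConf configure(String destination) {
    JobConf conf = new JobConf();
    // Set the output directory the wrapper reads as "mapred.output.dir".
    FileOutputFormat.setOutputPath(conf, new Path(destination));
    // Route old-API commit calls through the mapred wrapper class.
    conf.setOutputCommitter(
        org.apache.hadoop.fs.s3a.commit.magic.mapred.MagicS3GuardCommitter.class);
    return conf;
  }
}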