diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.java
index cf83bf2d7f4b5..715e660d91143 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
@@ -117,6 +118,19 @@ public PathOutputCommitter createOutputCommitter(
return createFileOutputCommitter(outputPath, context);
}
+ /**
+ * Create an output committer for a job.
+ * @param outputPath output path. This may be null.
+ * @param context the job context
+ * @return a new committer
+ * @throws IOException problems instantiating the committer
+ */
+ public PathOutputCommitter createOutputCommitter(
+ Path outputPath,
+ JobContext context) throws IOException {
+ return createFileOutputCommitter(outputPath, context);
+ }
+
/**
* Create an instance of the default committer, a {@link FileOutputCommitter}
* for a task.
@@ -134,6 +148,14 @@ protected final PathOutputCommitter createFileOutputCommitter(
return new FileOutputCommitter(outputPath, context);
}
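+ /**
+ * Create an instance of the default committer, a {@link FileOutputCommitter}
+ * for a job.
+ * @param outputPath the job's output path, or null if no output path
+ * has been defined.
+ * @param context the job context
+ * @return the committer to use
+ * @throws IOException problems instantiating the committer
+ */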
+ protected final PathOutputCommitter createFileOutputCommitter(
+ Path outputPath,
+ JobContext context) throws IOException {
+ LOG.debug("Creating FileOutputCommitter for path {} and context {}",
+ outputPath, context);
+ return new FileOutputCommitter(outputPath, context);
+ }
+
/**
* Get the committer factory for a configuration.
* @param outputPath the job's output path. If null, it means that the
@@ -185,6 +207,13 @@ public static PathOutputCommitterFactory getCommitterFactory(
return ReflectionUtils.newInstance(factory, conf);
}
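+ /**
+ * Create the committer factory for a job and destination, then create
+ * the committer from it.
+ * @param outputPath the job's output path, or null if no output path
+ * has been defined.
+ * @param context the job context
+ * @return the committer to use
+ * @throws IOException problems instantiating the committer
+ */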
+ public static PathOutputCommitter createCommitter(Path outputPath,
+ JobContext context) throws IOException {
+ return getCommitterFactory(outputPath,
+ context.getConfiguration())
+ .createOutputCommitter(outputPath, context);
+ }
+
/**
* Create the committer factory for a task attempt and destination, then
* create the committer from it.
diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index d28704b7c334e..40e80ed366448 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -459,6 +459,7 @@
org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
org.apache.hadoop.fs.s3a.commit.impl.*
org.apache.hadoop.fs.s3a.commit.magic.*
+ org.apache.hadoop.fs.s3a.commit.magic.mapred.*
org.apache.hadoop.fs.s3a.commit.staging.*
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
index 09664a6dbdf63..15833a1fa875e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
@@ -83,6 +83,7 @@
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_SPARK_UUID;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID_SOURCE;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.JOB_TEZ_UUID;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.SPARK_WRITE_UUID;
import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.*;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
@@ -240,6 +241,55 @@ protected AbstractS3ACommitter(
outputPath.toString());
}
+ /**
+ * Create a job committer.
+ * This constructor binds the destination directory and configuration, but
+ * does not update the work path: that must be calculated by the
+ * implementation, and is omitted here to avoid subclass methods being
+ * called too early.
+ * @param outputPath the job's output path: MUST NOT be null.
+ * @param context the job's context
+ * @throws IOException on a failure
+ */
+ protected AbstractS3ACommitter(
+ Path outputPath,
+ JobContext context) throws IOException {
+ super(outputPath, context);
+ setOutputPath(outputPath);
+ this.jobContext = requireNonNull(context, "null job context");
+ this.role = "Job committer " + context.getJobID();
+ setConf(context.getConfiguration());
+ Pair<String, JobUUIDSource> id = buildJobUUID(
+ conf, context.getJobID());
+ this.uuid = id.getLeft();
+ this.uuidSource = id.getRight();
+ LOG.info("Job UUID {} source {}", getUUID(), getUUIDSource().getText());
+ initOutput(outputPath);
+ LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}",
+ role, jobName(context), jobIdString(context), outputPath);
+ S3AFileSystem fs = getDestS3AFS();
+ if (!fs.isMultipartUploadEnabled()) {
+ throw new PathCommitException(outputPath, "Multipart uploads are disabled for the FileSystem,"
+ + " the committer can't proceed.");
+ }
+ // set this thread's context with the job ID.
+ // audit spans created in this thread will pick
+ // up this value, including the commit operations instance
+ // soon to be created.
+ new AuditContextUpdater(jobContext)
+ .updateCurrentAuditContext();
+
+ // the filesystem is the span source, always.
+ this.auditSpanSource = fs.getAuditSpanSource();
+ this.createJobMarker = context.getConfiguration().getBoolean(
+ CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
+ DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER);
+ // the statistics are shared between this committer and its operations.
+ this.committerStatistics = fs.newCommitterStatistics();
+ this.commitOperations = new CommitOperations(fs, committerStatistics,
+ outputPath.toString());
+ }
+
/**
* Init the output filesystem and path.
* TESTING ONLY; allows mock FS to cheat.
@@ -1377,6 +1427,13 @@ protected void warnOnActiveUploads(final Path path) {
return Pair.of(jobUUID, JobUUIDSource.SparkWriteUUID);
}
+ // no Spark UUID configured;
+ // look for one from Tez
+ jobUUID = conf.getTrimmed(JOB_TEZ_UUID, "");
+ if (!jobUUID.isEmpty()) {
+ return Pair.of(jobUUID, JobUUIDSource.TezJobUUID);
+ }
+
// there is no UUID configuration in the job/task config
// Check the job hasn't declared a requirement for the UUID.
@@ -1407,6 +1464,7 @@ protected void warnOnActiveUploads(final Path path) {
*/
public enum JobUUIDSource {
SparkWriteUUID(SPARK_WRITE_UUID),
+ TezJobUUID(JOB_TEZ_UUID),
CommitterUUIDProperty(FS_S3A_COMMITTER_UUID),
JobID("JobID"),
GeneratedLocally("Generated Locally");
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java
index cbbe5fdc602d6..66a6cdfa5b308 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java
@@ -58,6 +58,23 @@ public PathOutputCommitter createOutputCommitter(Path outputPath,
return outputCommitter;
}
+ /**
+ * Create an output committer for a job.
+ * @param outputPath output path
+ * @param context job context
+ * @return the committer to use
+ * @throws IOException problems instantiating the committer
+ */
+ @Override
+ public PathOutputCommitter createOutputCommitter(Path outputPath,
+ JobContext context) throws IOException {
+ FileSystem fs = getDestinationFileSystem(outputPath, context);
+ PathOutputCommitter outputCommitter;
+ if (fs instanceof S3AFileSystem) {
+ outputCommitter = createJobCommitter((S3AFileSystem)fs,
+ outputPath, context);
+ } else {
+ throw new PathCommitException(outputPath,
+ "Filesystem not supported by this committer");
+ }
+ LOG.info("Using Committer {} for {}",
+ outputCommitter,
+ outputPath);
+ return outputCommitter;
+ }
+
/**
* Get the destination filesystem, returning null if there is none.
* Code using this must explicitly or implicitly look for a null value
@@ -88,4 +105,18 @@ public abstract PathOutputCommitter createTaskCommitter(
S3AFileSystem fileSystem,
Path outputPath,
TaskAttemptContext context) throws IOException;
+
+ /**
+ * Implementation point: create a job committer for a specific filesystem.
+ * @param fileSystem destination FS.
+ * @param outputPath final output path for work
+ * @param context job context
+ * @return a committer
+ * @throws IOException any problem, including the FS not supporting
+ * the desired committer
+ */
+ public abstract PathOutputCommitter createJobCommitter(
+ S3AFileSystem fileSystem,
+ Path outputPath,
+ JobContext context) throws IOException;
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java
index ee07e652bed65..d5f3aa565653f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java
@@ -117,6 +117,12 @@ private InternalCommitterConstants() {
public static final String SPARK_WRITE_UUID =
"spark.sql.sources.writeJobUUID";
+ /**
+ * The UUID for jobs set by Tez: {@value}.
+ */
+ public static final String JOB_TEZ_UUID =
+ "job.committer.uuid";
+
/**
* Java temp dir: {@value}.
*/
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java
index 7f5455b6098d0..6431465ba2e2a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitterFactory;
import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitterFactory;
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterFactory;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
@@ -93,6 +94,29 @@ public PathOutputCommitter createTaskCommitter(S3AFileSystem fileSystem,
}
}
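+ /**
+ * Create a job committer, choosing the implementation from the
+ * filesystem and job configuration, and falling back to the classic
+ * FileOutputCommitter when no S3A committer is enabled.
+ * @param fileSystem destination FS
+ * @param outputPath final output path for work
+ * @param context job context
+ * @return a committer
+ * @throws IOException instantiation failure
+ */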
+ @Override
+ public PathOutputCommitter createJobCommitter(S3AFileSystem fileSystem,
+ Path outputPath,
+ JobContext context) throws IOException {
+ AbstractS3ACommitterFactory factory = chooseCommitterFactory(fileSystem,
+ outputPath,
+ context.getConfiguration());
+ if (factory != null) {
+ PathOutputCommitter committer = factory.createJobCommitter(
+ fileSystem, outputPath, context);
+ LOG.info("Using committer {} to output data to {}",
+ (committer instanceof AbstractS3ACommitter
+ ? ((AbstractS3ACommitter) committer).getName()
+ : committer.toString()),
+ outputPath);
+ return committer;
+ } else {
+ LOG.warn("Using standard FileOutputCommitter to commit work."
+ + " This is slow and potentially unsafe.");
+ return createFileOutputCommitter(outputPath, context);
+ }
+ }
+
/**
* Choose a committer from the FS and task configurations. Task Configuration
* takes priority, allowing execution engines to dynamically change
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
index 5ed1a3abd4645..d99f5ef9ec8cd 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
@@ -84,6 +84,22 @@ public MagicS3GuardCommitter(Path outputPath,
getWorkPath());
}
+ /**
+ * Create a job committer.
+ * @param outputPath the job's output path
+ * @param context the job's context
+ * @throws IOException on a failure
+ */
+ public MagicS3GuardCommitter(Path outputPath,
+ JobContext context) throws IOException {
+ super(outputPath, context);
+ setWorkPath(getJobAttemptPath(context));
+ verifyIsMagicCommitPath(getDestS3AFS(), getWorkPath());
+ LOG.debug("Job attempt {} has work path {}",
+ context.getJobID(),
+ getWorkPath());
+ }
+
@Override
public String getName() {
return NAME;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitterFactory.java
index 0392491c35b1c..296c134ef1ec7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitterFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitterFactory.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitterFactory;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
@@ -44,4 +45,11 @@ public PathOutputCommitter createTaskCommitter(S3AFileSystem fileSystem,
return new MagicS3GuardCommitter(outputPath, context);
}
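+ /**
+ * Create a job committer for the magic committer.
+ * @param fileSystem destination FS
+ * @param outputPath final output path for work
+ * @param context job context
+ * @return a committer
+ * @throws IOException on a failure
+ */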
+ @Override
+ public PathOutputCommitter createJobCommitter(S3AFileSystem fileSystem,
+ Path outputPath,
+ JobContext context) throws IOException {
+ return new MagicS3GuardCommitter(outputPath, context);
+ }
+
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/mapred/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/mapred/MagicS3GuardCommitter.java
new file mode 100644
index 0000000000000..59112f2d96a1b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/mapred/MagicS3GuardCommitter.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic.mapred;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
+
+import java.io.IOException;
+
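+/**
+ * An {@code org.apache.hadoop.mapred} wrapper around the MapReduce v2
+ * magic committer: it lazily creates the wrapped
+ * {@link org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter}
+ * through the {@link PathOutputCommitterFactory} and delegates all job
+ * and task operations to it.
+ */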
+public class MagicS3GuardCommitter extends OutputCommitter {
+
+ private org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter committer = null;
+
+ private org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter getWrapped(
+ JobContext context) throws IOException {
+ if (committer == null) {
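+ // lazily bind to the committer returned by the configured factory,
+ // resolving the destination from the old-API output directory property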
+ committer = (org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter)
+ PathOutputCommitterFactory.createCommitter(
+ new Path(context.getConfiguration().get("mapred.output.dir")),
+ context);
+ }
+ return committer;
+ }
+
+ private org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter getWrapped(
+ TaskAttemptContext context) throws IOException {
+ if (committer == null) {
+ committer = (org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter)
+ PathOutputCommitterFactory.createCommitter(
+ new Path(context.getConfiguration().get("mapred.output.dir")),
+ context);
+ }
+ return committer;
+ }
+
+ @Override
+ public void setupJob(JobContext context) throws IOException {
+ getWrapped(context).setupJob(context);
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext context) throws IOException {
+ getWrapped(context).setupTask(context);
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+ return getWrapped(context).needsTaskCommit(context);
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext context) throws IOException {
+ getWrapped(context).commitTask(context);
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext context) throws IOException {
+ getWrapped(context).abortTask(context);
+ }
+
+ @Override
+ public void cleanupJob(JobContext context) throws IOException {
+ getWrapped(context).cleanupJob(context);
+ }
+
+ @Override
+ public void commitJob(JobContext context) throws IOException {
+ getWrapped(context).commitJob(context);
+ }
+
+ public final Path getWorkPath() {
+ return committer.getWorkPath();
+ }
+
+ public final Path getOutputPath() {
+ return committer.getOutputPath();
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/mapred/package-info.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/mapred/package-info.java
new file mode 100644
index 0000000000000..aab35ddfacdc7
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/mapred/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This is the "Magic" committer's mapred wrapper.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.fs.s3a.commit.magic.mapred;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
index 99683db984983..0ddfef310753a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
@@ -58,6 +58,11 @@ public DirectoryStagingCommitter(Path outputPath, TaskAttemptContext context)
super(outputPath, context);
}
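+ /**
+ * Create a committer for a job.
+ * @param outputPath the job's output path
+ * @param context the job's context
+ * @throws IOException on a failure
+ */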
+ public DirectoryStagingCommitter(Path outputPath, JobContext context)
+ throws IOException {
+ super(outputPath, context);
+ }
+
@Override
public String getName() {
return NAME;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitterFactory.java
index bfa89144777af..bcbcbc22a10e4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitterFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitterFactory.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitterFactory;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
@@ -45,4 +46,10 @@ public PathOutputCommitter createTaskCommitter(S3AFileSystem fileSystem,
return new DirectoryStagingCommitter(outputPath, context);
}
+ /**
+ * Create a job committer.
+ * @param fileSystem destination FS
+ * @param outputPath final output path for work
+ * @param context job context
+ * @return a committer
+ * @throws IOException on a failure
+ */
+ @Override
+ public PathOutputCommitter createJobCommitter(S3AFileSystem fileSystem,
+ Path outputPath,
+ JobContext context) throws IOException {
+ return new DirectoryStagingCommitter(outputPath, context);
+ }
+
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
index 5d1a20e4240c2..e7fea0a7384a0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.fs.s3a.commit.files.PersistentCommitData;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.commit.impl.CommitContext;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.functional.TaskPool;
@@ -74,6 +75,12 @@ public PartitionedStagingCommitter(Path outputPath,
super(outputPath, context);
}
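+ /**
+ * Create a committer for a job.
+ * @param outputPath the job's output path
+ * @param context the job's context
+ * @throws IOException on a failure
+ */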
+ public PartitionedStagingCommitter(Path outputPath,
+ JobContext context)
+ throws IOException {
+ super(outputPath, context);
+ }
+
@Override
public String getName() {
return NAME;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitterFactory.java
index b446f22e6d888..8a85bd5e36d87 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitterFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitterFactory.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitterFactory;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
@@ -45,4 +46,10 @@ public PathOutputCommitter createTaskCommitter(S3AFileSystem fileSystem,
return new PartitionedStagingCommitter(outputPath, context);
}
+ /**
+ * Create a job committer.
+ * @param fileSystem destination FS
+ * @param outputPath final output path for work
+ * @param context job context
+ * @return a committer
+ * @throws IOException on a failure
+ */
+ @Override
+ public PathOutputCommitter createJobCommitter(S3AFileSystem fileSystem,
+ Path outputPath,
+ JobContext context) throws IOException {
+ return new PartitionedStagingCommitter(outputPath, context);
+ }
+
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
index 31d7693a2d941..4e4dd6d9cc578 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
@@ -134,6 +134,37 @@ public StagingCommitter(Path outputPath,
LOG.debug("Conflict resolution mode: {}", mode);
}
+ /**
+ * Committer for a job.
+ * @param outputPath final output path
+ * @param context job context
+ * @throws IOException on a failure
+ */
+ public StagingCommitter(Path outputPath,
+ JobContext context) throws IOException {
+ super(outputPath, context);
+ this.constructorOutputPath = requireNonNull(getOutputPath(), "output path");
+ Configuration conf = getConf();
+ this.uploadPartSize = conf.getLongBytes(
+ MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
+ this.uniqueFilenames = conf.getBoolean(
+ FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
+ DEFAULT_STAGING_COMMITTER_UNIQUE_FILENAMES);
+ setWorkPath(buildWorkPath(context, getUUID()));
+ this.wrappedCommitter = createWrappedCommitter(context, conf);
+ setOutputPath(constructorOutputPath);
+ Path finalOutputPath = requireNonNull(getOutputPath(),
+ "Output path cannot be null");
+ S3AFileSystem fs = getS3AFileSystem(finalOutputPath,
+ context.getConfiguration(), false);
+ s3KeyPrefix = fs.pathToKey(finalOutputPath);
+ LOG.debug("{}: final output path is {}", getRole(), finalOutputPath);
+ // forces evaluation and caching of the resolution mode.
+ ConflictResolution mode = getConflictResolutionMode(getJobContext(),
+ fs.getConf());
+ LOG.debug("Conflict resolution mode: {}", mode);
+ }
+
@Override
public String getName() {
return NAME;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterFactory.java
index 292b16b90c4eb..d8cd1865d6a5b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterFactory.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitterFactory;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
@@ -46,4 +47,10 @@ public PathOutputCommitter createTaskCommitter(S3AFileSystem fileSystem,
return new StagingCommitter(outputPath, context);
}
+ /**
+ * Create a job committer.
+ * @param fileSystem destination FS
+ * @param outputPath final output path for work
+ * @param context job context
+ * @return a committer
+ * @throws IOException on a failure
+ */
+ @Override
+ public PathOutputCommitter createJobCommitter(S3AFileSystem fileSystem,
+ Path outputPath,
+ JobContext context) throws IOException {
+ return new StagingCommitter(outputPath, context);
+ }
+
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
index 165379d1dc0c8..814d53eb0e76b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
@@ -80,6 +80,7 @@
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_SPARK_UUID;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID_SOURCE;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.JOB_TEZ_UUID;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.SPARK_WRITE_UUID;
import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_TASKS_SUCCEEDED;
import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
@@ -1774,6 +1775,37 @@ public void testSelfGeneratedUUID() throws Throwable {
committer2.abortTask(tContext2);
}
+ /**
+ * Verify that a Tez-provided job UUID is picked up and propagated.
+ */
+ @Test
+ public void testTezGeneratedUUID() throws Throwable {
+ describe("Create committer with Tez-generated UUID and see if it accepts it");
+ Configuration conf = getConfiguration();
+
+ unsetUUIDOptions(conf);
+ // job must pull Tez-generated UUID
+ conf.set(InternalCommitterConstants.JOB_TEZ_UUID, "dag-application654444302_10");
+
+ // create the job. don't write anything
+ JobData jobData = startJob(false);
+ AbstractS3ACommitter committer = jobData.committer;
+ String uuid = committer.getUUID();
+ Assertions.assertThat(committer.getUUIDSource())
+ .describedAs("UUID source of %s", committer)
+ .isEqualTo(AbstractS3ACommitter.JobUUIDSource.TezJobUUID);
+
+ // examine the job configuration and verify that it has been updated
+ Configuration jobConf = jobData.conf;
+ Assertions.assertThat(jobConf.get(FS_S3A_COMMITTER_UUID, null))
+ .describedAs("Config option " + FS_S3A_COMMITTER_UUID)
+ .isEqualTo(uuid);
+ Assertions.assertThat(jobConf.get(FS_S3A_COMMITTER_UUID_SOURCE, null))
+ .describedAs("Config option " + FS_S3A_COMMITTER_UUID_SOURCE)
+ .isEqualTo(AbstractS3ACommitter.JobUUIDSource.TezJobUUID
+ .getText());
+ }
+
/**
* Verify the option to require a UUID applies and
* when a committer is instantiated without those options,
@@ -1799,6 +1831,7 @@ public void testRequirePropagatedUUID() throws Throwable {
* @return the patched config
*/
protected Configuration unsetUUIDOptions(final Configuration conf) {
+ conf.unset(JOB_TEZ_UUID);
conf.unset(SPARK_WRITE_UUID);
conf.unset(FS_S3A_COMMITTER_UUID);
conf.unset(FS_S3A_COMMITTER_GENERATE_UUID);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
index cbfc23a2a29b6..273bbe244784e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
@@ -25,6 +25,8 @@
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContextImpl;
import org.assertj.core.api.Assertions;
import org.junit.Test;
@@ -213,6 +215,20 @@ public void testCommittersPathsHaveUUID() throws Throwable {
.contains(ta0);
}
+ @Test
+ public void testCommitterJobContextPath() throws Throwable {
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(
+ getConfiguration(),
+ getTaskAttempt0());
+ JobContext jobContext = new JobContextImpl(
+ new JobConf(getConfiguration()), tContext.getJobID());
+ MagicS3GuardCommitter committer1 = createCommitter(getOutDir(), tContext);
+ MagicS3GuardCommitter committer2 = new MagicS3GuardCommitter(getOutDir(), jobContext);
+ // the task-specific segment of the working path is three levels below the job-specific segment
+ Assertions.assertThat(committer2.getJobAttemptPath(jobContext))
+ .isEqualTo(committer1.getTaskAttemptPath(tContext).getParent().getParent().getParent());
+ }
+
/**
* The class provides a overridden implementation of commitJobInternal which
* causes the commit failed for the first time then succeed.