[b/356461225] Add HDFS progress logging #534

Draft · wants to merge 3 commits into base: main
File: PartialProgressLogger.java
@@ -0,0 +1,28 @@
/*
* Copyright 2022-2024 Google LLC
* Copyright 2013-2021 CompilerWorks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.edwmigration.dumper.application.dumper;

/** Logger for the progress of a long-running task. */
public interface PartialProgressLogger {

/**
* Logs progress, including the partial progress of a long-running task.
*
* @param partialProgressMessage message describing the progress of the long-running task
*/
void logProgress(String partialProgressMessage);
}
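
A minimal sketch of how the new interface can be satisfied; this SLF4J-backed class is illustrative only and not part of the change:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical adapter: forwards partial-progress messages to an SLF4J logger.
class Slf4jPartialProgressLogger implements PartialProgressLogger {
  private static final Logger LOG =
      LoggerFactory.getLogger(Slf4jPartialProgressLogger.class);

  @Override
  public void logProgress(String partialProgressMessage) {
    LOG.info(partialProgressMessage);
  }
}

In the PR itself, TaskRunContext adopts the interface and TasksRunner supplies the real implementation, as the following diffs show.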
File: TasksRunner.java
@@ -21,6 +21,7 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.primitives.Ints;
import com.google.edwmigration.dumper.application.dumper.handle.Handle;
import com.google.edwmigration.dumper.application.dumper.io.OutputHandle;
import com.google.edwmigration.dumper.application.dumper.io.OutputHandleFactory;
@@ -35,6 +36,7 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
@@ -72,6 +74,11 @@ public TasksRunner(
private TaskRunContext createContext(
OutputHandleFactory sinkFactory, Handle handle, int threadPoolSize, Impl state) {
return new TaskRunContext(sinkFactory, handle, threadPoolSize) {
@Override
public void logProgress(String partialProgressMessage) {
TasksRunner.this.logProgress(Optional.of(partialProgressMessage));
}

@Override
public TaskState getTaskState(Task<?> task) {
return state.getTaskState(task);
@@ -96,26 +103,30 @@ private <T> T handleTask(Task<T> task) throws MetadataDumperUsageException {
if (!(task instanceof TaskGroup)) {
numberOfCompletedTasks.getAndIncrement();
}
logProgress();
logProgress(/* partialProgressMessage= */ Optional.empty());
return t;
}

private void logProgress() {
private void logProgress(Optional<String> partialProgressMessage) {
int numberOfCompletedTasks = this.numberOfCompletedTasks.get();

Duration averageTimePerTask = stopwatch.elapsed().dividedBy(max(1, numberOfCompletedTasks));

int percentFinished = numberOfCompletedTasks * 100 / totalNumberOfTasks;
String progressMessage = percentFinished + "% Completed";
int percentFinished =
Ints.constrainToRange(numberOfCompletedTasks * 100 / totalNumberOfTasks, 0, 99);
final StringBuilder progressMessage =
new StringBuilder().append(percentFinished).append("% Completed");

partialProgressMessage.ifPresent(message -> progressMessage.append(", ").append(message));

int remainingTasks = totalNumberOfTasks - numberOfCompletedTasks;
Duration remainingTime = averageTimePerTask.multipliedBy(remainingTasks);

if (numberOfCompletedTasks > 10 && remainingTasks > 0) {
progressMessage += ". ETA: " + formatApproximateDuration(remainingTime);
progressMessage.append(". ETA: ").append(formatApproximateDuration(remainingTime));
}

PROGRESS_LOG.info(progressMessage);
PROGRESS_LOG.info(progressMessage.toString());
}

private <T> T runTask(Task<T> task) throws MetadataDumperUsageException {
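
For reference, a self-contained sketch of the percent/ETA arithmetic in the hunk above. The numbers are made up, and Duration's default toString() stands in for formatApproximateDuration, whose output format is not shown in this diff:

import com.google.common.primitives.Ints;
import java.time.Duration;

public class ProgressMathExample {
  public static void main(String[] args) {
    int completed = 42;
    int total = 130;
    Duration elapsed = Duration.ofMinutes(21);

    // Average cost of a finished task; max(1, ...) avoids dividing by zero.
    Duration averagePerTask = elapsed.dividedBy(Math.max(1, completed));

    // Clamped to 99 so the log never claims 100% while work remains.
    int percentFinished = Ints.constrainToRange(completed * 100 / total, 0, 99);

    Duration eta = averagePerTask.multipliedBy(total - completed);
    System.out.println(percentFinished + "% Completed. ETA: " + eta);
    // prints: 32% Completed. ETA: PT44M
  }
}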
File: HdfsHandle.java
@@ -17,7 +17,6 @@
package com.google.edwmigration.dumper.application.dumper.connector.hdfs;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.edwmigration.dumper.application.dumper.connector.hdfs.HdfsPermissionExtractionConnector.LOG;

import com.google.common.base.Preconditions;
import com.google.edwmigration.dumper.application.dumper.ConnectorArguments;
@@ -27,8 +26,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class HdfsHandle implements Handle {
private static final Logger LOG = LoggerFactory.getLogger(HdfsHandle.class);

private final String clusterHost;
private final int port;
private final DistributedFileSystem dfs;
@@ -38,8 +41,8 @@ class HdfsHandle implements Handle {
clusterHost = args.getHostOrDefault();
port = args.getPort(/* defaultPort= */ 8020);

LOG.info("clusterHost: {}", clusterHost);
LOG.info("port: {}", port);
LOG.info("clusterHost: '{}'", clusterHost);
LOG.info("port: '{}'", port);

Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://" + clusterHost + ":" + port + "/");
File: HdfsPermissionExtractionConnector.java
@@ -31,8 +31,6 @@
import java.time.Clock;
import java.util.List;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RespectsInput(
order = 100,
@@ -52,7 +50,6 @@
@Description("Dumps permissions from the HDFS.")
public class HdfsPermissionExtractionConnector extends AbstractConnector
implements HdfsPermissionExtractionDumpFormat {
static final Logger LOG = LoggerFactory.getLogger(HdfsPermissionExtractionConnector.class);

public HdfsPermissionExtractionConnector() {
super("hdfs-permissions");
File: HdfsPermissionExtractionTask.java
@@ -16,7 +16,6 @@
*/
package com.google.edwmigration.dumper.application.dumper.connector.hdfs;

import static com.google.edwmigration.dumper.application.dumper.connector.hdfs.HdfsPermissionExtractionConnector.LOG;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;

@@ -38,9 +37,13 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HdfsPermissionExtractionTask extends AbstractTask<Void>
implements HdfsPermissionExtractionDumpFormat {
private static final Logger LOG = LoggerFactory.getLogger(HdfsPermissionExtractionTask.class);

private final int poolSize;

HdfsPermissionExtractionTask(@Nonnull ConnectorArguments args) {
@@ -63,21 +66,21 @@ public String getTargetPath() {
protected Void doRun(TaskRunContext context, @Nonnull ByteSink sink, @Nonnull Handle handle)
throws IOException, ExecutionException, InterruptedException {
DistributedFileSystem fs = ((HdfsHandle) handle).getDfs();
// Create a dedicated ExecutorService to use:
String hdfsPath = "/";
org.apache.hadoop.fs.ContentSummary contentSummary = fs.getContentSummary(new Path(hdfsPath));
ExecutorService execService =
ExecutorManager.newExecutorServiceWithBackpressure("hdfs-permission-extraction", poolSize);
try (Writer output = sink.asCharSink(UTF_8).openBufferedStream();
ScanContext scanCtx = new ScanContext(fs, output);
ScanContext scanCtx =
new ScanContext(fs, output, contentSummary.getDirectoryCount(), context);
ExecutorManager execManager = new ExecutorManager(execService)) {

String hdfsPath = "/";
FileStatus rootDir = fs.getFileStatus(new Path(hdfsPath));
SingleDirScanJob rootJob = new SingleDirScanJob(scanCtx, execManager, rootDir);
execManager.execute(rootJob); // The root job executes immediately
execManager.await(); // Wait until all (recursive) tasks are done executing
execManager.execute(rootJob);
execManager.await();
LOG.info(scanCtx.getFormattedStats());
} finally {
// Shutdown the dedicated ExecutorService:
MoreExecutors.shutdownAndAwaitTermination(execService, 100, TimeUnit.MILLISECONDS);
}
return null;
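
The task now asks the NameNode for a ContentSummary before the walk, which is where ScanContext's new totalDirectoryCount comes from. A minimal sketch of that call, with a hypothetical cluster address:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ContentSummaryExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://namenode.example.com:8020/"); // hypothetical host
    try (FileSystem fs = FileSystem.get(conf)) {
      // One aggregate NameNode query for the whole subtree rooted at "/".
      ContentSummary summary = fs.getContentSummary(new Path("/"));
      System.out.println("directories: " + summary.getDirectoryCount());
      System.out.println("files:       " + summary.getFileCount());
    }
  }
}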
File: ScanContext.java
@@ -16,6 +16,8 @@
*/
package com.google.edwmigration.dumper.application.dumper.connector.hdfs;

import com.google.common.primitives.Longs;
import com.google.edwmigration.dumper.application.dumper.PartialProgressLogger;
import com.google.edwmigration.dumper.application.dumper.task.AbstractTask;
import com.google.edwmigration.dumper.plugin.lib.dumper.spi.HdfsPermissionExtractionDumpFormat.PermissionExtraction;
import java.io.Closeable;
@@ -40,10 +42,16 @@ final class ScanContext implements Closeable {
private static final DateTimeFormatter DATE_FORMAT =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneOffset.UTC);

private static final Duration LOG_PROGRESS_DELAY = Duration.ofSeconds(10);

private final DistributedFileSystem dfs;
private final DFSClient dfsClient;
private final CSVPrinter csvPrinter;
private final Instant instantScanBegin;
private final long totalDirectoryCount;
private final PartialProgressLogger partialProgressLogger;

private long lastLogTime;
private LongAdder timeSpentInListStatus = new LongAdder();
private LongAdder numFilesByListStatus = new LongAdder();

@@ -59,12 +67,19 @@
@GuardedBy("csvPrinter")
private long numDirsWalked = 0L;

ScanContext(DistributedFileSystem dfs, @WillClose Writer outputSink) throws IOException {
ScanContext(
DistributedFileSystem dfs,
@WillClose Writer outputSink,
long totalDirectoryCount,
PartialProgressLogger partialProgressLogger)
throws IOException {
this.dfs = dfs;
this.dfsClient = dfs.getClient();
this.csvPrinter =
AbstractTask.FORMAT.withHeader(PermissionExtraction.Header.class).print(outputSink);
this.instantScanBegin = Instant.now();
this.totalDirectoryCount = totalDirectoryCount;
this.partialProgressLogger = partialProgressLogger;
}

@Override
@@ -80,13 +95,12 @@ FileStatus[] listDirectory(FileStatus dir) throws IOException {
return files;
}

void beginWalkDir(FileStatus dir) {}

/*
* CsvPrint the directory attributes of the specified dir
* and incrementally update the scan statistics (this is what the rest of the parameters are for)
*/
void endWalkDir(FileStatus dir, long nFiles, long nDirs, long accumFileSize) throws IOException {
logProgress();
String absolutePath = dir.getPath().toUri().getPath();
HdfsFileStatus hdfsFileStatus = dfsClient.getFileInfo(absolutePath);
String strModificationTime =
Expand Down Expand Up @@ -116,6 +130,16 @@ void endWalkDir(FileStatus dir, long nFiles, long nDirs, long accumFileSize) thr
}
}

private void logProgress() {
long now = System.currentTimeMillis();
if (now - lastLogTime > LOG_PROGRESS_DELAY.toMillis()) {
lastLogTime = now;
long percentFinished =
Longs.constrainToRange(this.numDirsWalked * 100 / totalDirectoryCount, 0, 99);
partialProgressLogger.logProgress("HDFS scan " + percentFinished + "% completed");
}
}

Review comment (Collaborator) on the logProgress() method above:
A lot of this looks very similar to the shared/common code in [Concurrent]RecordProgressMonitor. Would it be possible to use that?
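
For illustration only (the shared [Concurrent]RecordProgressMonitor is not shown in this diff), a self-contained sketch of the time-throttled reporting pattern that logProgress() implements; every name below is local to the example:

import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;

// Sketch: emit a progress report at most once per interval.
final class ThrottledProgress {
  private final Duration minInterval;
  private final AtomicLong lastReportMillis = new AtomicLong();

  ThrottledProgress(Duration minInterval) {
    this.minInterval = minInterval;
  }

  // Runs report at most once per minInterval; compareAndSet keeps two
  // concurrent callers from both logging in the same window.
  void maybeReport(Runnable report) {
    long now = System.currentTimeMillis();
    long last = lastReportMillis.get();
    if (now - last >= minInterval.toMillis() && lastReportMillis.compareAndSet(last, now)) {
      report.run();
    }
  }
}

Under this sketch, ScanContext would hold one instance and call maybeReport(() -> partialProgressLogger.logProgress(...)) from endWalkDir.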

void walkFile(FileStatus file) throws IOException {
String absolutePath = file.getPath().toUri().getPath();
HdfsFileStatus hdfsFileStatus = dfsClient.getFileInfo(absolutePath);
@@ -152,24 +176,22 @@ String getFormattedStats() {
: Duration.ZERO;

final long numFilesDivisor = numFiles > 0 ? numFiles : 1;
StringBuilder sb =
new StringBuilder()
.append("[HDFS Permission extraction stats]")
.append("\nTotal: num files&dirs: " + numFiles)
.append("\n num dirs found: " + numDirs)
.append("\n num dirs walkd: " + numDirsWalked)
.append("\nTotal File Size: " + accumulatedFileSize)
.append("\nAvg File Size: " + accumulatedFileSize / numFilesDivisor)
.append("\nTotal time: ")
.append(timeSinceScanBegin.getSeconds() + "s")
.append("\nTotal time in listStatus(..): ")
.append(timeSpentInListStatus.getSeconds() + "s")
.append("\nAvg time per file in listStatus(..): ")
.append(avgTimeSpentInListStatusPerFile.toMillis() + "ms")
.append("\nAvg time per doc: ")
.append(timeSinceScanBegin.dividedBy(numFilesDivisor).toMillis() + "ms\n")
.append("\n[/HDFS Permission extraction stats]");

return sb.toString();
return new StringBuilder()
.append("[HDFS Permission extraction stats]")
.append("\nTotal: num files&dirs: " + numFiles)
.append("\n num dirs found: " + numDirs)
.append("\n num dirs walked: " + numDirsWalked)
.append("\nTotal File Size: " + accumulatedFileSize)
.append("\nAvg File Size: " + accumulatedFileSize / numFilesDivisor)
.append("\nTotal time: ")
.append(timeSinceScanBegin.getSeconds() + "s")
.append("\nTotal time in listStatus(..): ")
.append(timeSpentInListStatus.getSeconds() + "s")
.append("\nAvg time per file in listStatus(..): ")
.append(avgTimeSpentInListStatusPerFile.toMillis() + "ms")
.append("\nAvg time per doc: ")
.append(timeSinceScanBegin.dividedBy(numFilesDivisor).toMillis() + "ms\n")
.append("\n[/HDFS Permission extraction stats]")
.toString();
}
}

Review comment (Collaborator) on the getFormattedStats() chain above:
This should probably be a chained append(), not a String-+ in the middle, as that creates an extra StringBuilder and String.
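
A minimal, runnable illustration of the reviewer's point; the value is made up:

public class AppendExample {
  public static void main(String[] args) {
    long totalSeconds = 42L; // hypothetical value

    // String-+ inside append(): the compiler builds a hidden StringBuilder
    // and an intermediate String just to produce "42s".
    String viaConcat = new StringBuilder()
        .append("Total time: ")
        .append(totalSeconds + "s")
        .toString();

    // Chained append(): both pieces go straight into the same builder,
    // with no intermediate allocations.
    String viaChain = new StringBuilder()
        .append("Total time: ")
        .append(totalSeconds)
        .append("s")
        .toString();

    System.out.println(viaConcat.equals(viaChain)); // prints: true
  }
}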
File: SingleDirScanJob.java
@@ -42,9 +42,7 @@ public Void call() {
long accumFileSize = 0;

try {
scanCtx.beginWalkDir(dir);
for (FileStatus file : scanCtx.listDirectory(dir)) {
// Process file or dir (in this case - just collect statistics)
accumFileSize += file.getLen();

if (file.isDirectory()) {
Expand All @@ -56,10 +54,11 @@ public Void call() {
}
}
scanCtx.endWalkDir(dir, numFiles, numDirs, accumFileSize);
} catch (Exception exn) {
} catch (Exception e) {
LOG.error(
"Unexpected exception while scanning HDFS folder " + dir.getPath().toUri().getPath(),
exn);
"Unexpected exception while scanning HDFS folder '{}'",
dir.getPath().toUri().getPath(),
e);
}
return null;
}
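
The rewritten error log above relies on SLF4J's convention that a trailing Throwable is rendered as a stack trace and consumes no "{}" placeholder. A small runnable illustration, mirroring the call in the diff:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Slf4jThrowableExample {
  private static final Logger LOG = LoggerFactory.getLogger(Slf4jThrowableExample.class);

  public static void main(String[] args) {
    try {
      throw new IllegalStateException("simulated failure");
    } catch (Exception e) {
      // "{}" is filled by the path argument; the trailing exception is
      // printed with its full stack trace.
      LOG.error("Unexpected exception while scanning HDFS folder '{}'", "/data/example", e);
    }
  }
}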
File: TaskRunContext.java
@@ -18,6 +18,7 @@

import com.google.common.base.Preconditions;
import com.google.edwmigration.dumper.application.dumper.MetadataDumperUsageException;
import com.google.edwmigration.dumper.application.dumper.PartialProgressLogger;
import com.google.edwmigration.dumper.application.dumper.handle.Handle;
import com.google.edwmigration.dumper.application.dumper.io.OutputHandle;
import com.google.edwmigration.dumper.application.dumper.io.OutputHandleFactory;
@@ -27,7 +28,7 @@
import javax.annotation.Nonnull;

/** @author shevek */
public abstract class TaskRunContext implements OutputHandleFactory {
public abstract class TaskRunContext implements OutputHandleFactory, PartialProgressLogger {

private final OutputHandleFactory sinkFactory;
private final Handle handle;
File: (file name not shown)
@@ -42,4 +42,9 @@ public TaskState getTaskState(Task<?> task) {
public <T> T runChildTask(Task<T> task) {
throw new UnsupportedOperationException("Not supported.");
}

@Override
public void logProgress(String partialProgressMessage) {
throw new UnsupportedOperationException("Not supported.");
}
}