Skip to content

Commit

Permalink
partial merge of pr/59
Browse files — browse the repository at this point in the history
  • Loading branch information
Jacob Perkins committed Jun 30, 2014
1 parent fe9a68d commit 6668301
Showing 1 changed file with 28 additions and 24 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobInProgress;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.security.UserGroupInformation;
Expand Down Expand Up @@ -362,27 +363,31 @@ protected void updatePlanStatusForCompletedJobId(P2jPlanStatus planStatus, Strin
to the remote job tracker so we don't have access to this
information. */
try {
List<TaskReport> reports = Lists.newArrayList();
reports.addAll(Arrays.asList(jobClient.getMapTaskReports(jobID)));
reports.addAll(Arrays.asList(jobClient.getReduceTaskReports(jobID)));
for(TaskReport rpt : reports) {
/* rpt.getStartTime() sometimes returns zero meaning it does
not know what time it started so we need to prevent using
this or we'll lose the actual lowest start time */
long taskStartTime = rpt.getStartTime();
if (0 != taskStartTime) {
startTime = Math.min(startTime, taskStartTime);
if (!jobClient.getConf().getBoolean("pig.stats.notaskreport", false)) {
List<TaskReport> reports = Lists.newArrayList();
reports.addAll(Arrays.asList(jobClient.getMapTaskReports(jobID)));
reports.addAll(Arrays.asList(jobClient.getReduceTaskReports(jobID)));
for(TaskReport rpt : reports) {
/* rpt.getStartTime() sometimes returns zero meaning it does
not know what time it started so we need to prevent using
this or we'll lose the actual lowest start time */
long taskStartTime = rpt.getStartTime();
if (0 != taskStartTime) {
startTime = Math.min(startTime, taskStartTime);
}
finishTime = Math.max(finishTime, rpt.getFinishTime());
}
finishTime = Math.max(finishTime, rpt.getFinishTime());
}
P2jJobStatus jobStatus = jobIdToJobStatusMap.get(jobId);
if (startTime < Long.MAX_VALUE) {
jobStatus.setStartTime(startTime);
}
if (finishTime > Long.MIN_VALUE) {
jobStatus.setFinishTime(finishTime);
P2jJobStatus jobStatus = jobIdToJobStatusMap.get(jobId);
if (startTime < Long.MAX_VALUE) {
jobStatus.setStartTime(startTime);
}
if (finishTime > Long.MIN_VALUE) {
jobStatus.setFinishTime(finishTime);
}
LOG.info("Determined start and finish times for job " + jobId);
} else {
LOG.info("Skipping determining start and finish times for job " + jobId);
}
LOG.info("Determined start and finish times for job " + jobId);
} catch (IOException e) {
LOG.error("Error getting job info.", e);
}
Expand All @@ -408,19 +413,18 @@ protected P2jJobStatus buildJobStatusMap(String jobId) {
}

JobID jobID = rj.getID();
js.setCounters(buildCountersMap(rj.getCounters()));
Counters counters = rj.getCounters();
js.setCounters(buildCountersMap(counters));
js.setWarnings(getRunningJobWarnings(jobClient, jobID));

TaskReport[] mapTaskReport = jobClient.getMapTaskReports(jobID);
TaskReport[] reduceTaskReport = jobClient.getReduceTaskReports(jobID);
js.setJobName(rj.getJobName());
js.setTrackingUrl(rj.getTrackingURL());
js.setIsComplete(rj.isComplete());
js.setIsSuccessful(rj.isSuccessful());
js.setMapProgress(rj.mapProgress());
js.setReduceProgress(rj.reduceProgress());
js.setTotalMappers(mapTaskReport.length);
js.setTotalReducers(reduceTaskReport.length);
js.setTotalMappers((int)counters.findCounter(JobInProgress.Counter.TOTAL_LAUNCHED_MAPS).getCounter());
js.setTotalReducers((int)counters.findCounter(JobInProgress.Counter.TOTAL_LAUNCHED_REDUCES).getCounter());
return js;
} catch (IOException e) {
LOG.error("Error getting job info.", e);
Expand Down

0 comments on commit 6668301

Please sign in to comment.