Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Output first and last query log entries for Teradata in summary section using shared state class #567

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Closer;
import com.google.common.io.Files;
import com.google.edwmigration.dumper.application.dumper.SummaryPrinter.SummaryLinePrinter;
import com.google.edwmigration.dumper.application.dumper.connector.Connector;
import com.google.edwmigration.dumper.application.dumper.handle.Handle;
import com.google.edwmigration.dumper.application.dumper.io.FileSystemOutputHandleFactory;
Expand All @@ -43,6 +44,8 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Clock;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -61,6 +64,9 @@ public class MetadataDumper {
private static final Pattern GCS_PATH_PATTERN =
Pattern.compile("gs://(?<bucket>[^/]+)/(?<path>.*)");

private static DateTimeFormatter OUTPUT_DATE_FORMAT =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm").withZone(ZoneOffset.UTC);
sayuzbas-google marked this conversation as resolved.
Show resolved Hide resolved

public boolean run(String... args) throws Exception {
ConnectorArguments arguments = new ConnectorArguments(JsonResponseFile.addResponseFiles(args));
try {
Expand Down Expand Up @@ -194,6 +200,7 @@ protected boolean run(@Nonnull Connector connector, @Nonnull ConnectorArguments
logFinalSummary(
summaryPrinter,
state,
connector,
outputFileLength,
stopwatch,
outputFileLocation,
Expand Down Expand Up @@ -265,9 +272,26 @@ private boolean checkRequiredTaskSuccess(
return true;
}

private void outputFirstAndLastQueryLogEnries(SummaryLinePrinter linePrinter) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be helpful to add tests for the logic inside this method


if (QueryLogSharedState.queryLogEntries.size() == 0) {
return;
}

linePrinter.println(
"The first query log entry is '%s' UTC and the last query log entry is '%s' UTC",
QueryLogSharedState.queryLogEntries
.get(QueryLogSharedState.QueryLogEntry.QUERY_LOG_FIRST_ENTRY)
.format(OUTPUT_DATE_FORMAT),
QueryLogSharedState.queryLogEntries
.get(QueryLogSharedState.QueryLogEntry.QUERY_LOG_LAST_ENTRY)
.format(OUTPUT_DATE_FORMAT));
}

private void logFinalSummary(
SummaryPrinter summaryPrinter,
TaskSetState state,
Connector connector,
long outputFileLength,
Stopwatch stopwatch,
String outputFileLocation,
Expand All @@ -281,6 +305,10 @@ private void logFinalSummary(
+ state.getTasksReports().stream()
.map(taskReport -> taskReport.count() + " " + taskReport.state())
.collect(joining(", ")));
// For now, it will return true only for TeradataLogsConnector and Terada14LogsConnector
sayuzbas-google marked this conversation as resolved.
Show resolved Hide resolved
if (connector.isLogConnector()) {
outputFirstAndLastQueryLogEnries(linePrinter);
}
if (requiredTaskSucceeded) {
linePrinter.println("Output saved to '%s'", outputFileLocation);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2022-2024 Google LLC
* Copyright 2013-2021 CompilerWorks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.edwmigration.dumper.application.dumper;

import com.google.common.annotations.VisibleForTesting;
import java.time.ZonedDateTime;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class QueryLogSharedState {
public static final ConcurrentMap<QueryLogEntry, ZonedDateTime> queryLogEntries =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The concurrent collection suggests that this class is designed for use from multiple threads. If this is the case, please document how this is done - is it fully thread-safe and what rules need to be followed by developers using it. If any guarantees provided by the class aren't obvious then it's also good to document them.

new ConcurrentHashMap<>();
sayuzbas-google marked this conversation as resolved.
Show resolved Hide resolved

public enum QueryLogEntry {
QUERY_LOG_FIRST_ENTRY,
QUERY_LOG_LAST_ENTRY
}

/*
* Calculates first and last query log entries, by applying 'min' and 'max' logic.
*/
public static void updateQueryLogEntries(QueryLogEntry logEntry, ZonedDateTime newDateTime) {
ZonedDateTime currentDateTime = QueryLogSharedState.queryLogEntries.get(logEntry);
if (currentDateTime == null) {
QueryLogSharedState.queryLogEntries.put(logEntry, newDateTime);
} else {
if ((logEntry == QueryLogEntry.QUERY_LOG_FIRST_ENTRY && newDateTime.isBefore(currentDateTime))
|| (logEntry == QueryLogEntry.QUERY_LOG_LAST_ENTRY
&& newDateTime.isAfter(currentDateTime))) {
QueryLogSharedState.queryLogEntries.put(logEntry, newDateTime);
}
}
}

@VisibleForTesting
static void clearQueryLogEntries() {
queryLogEntries.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,8 @@ public void addTasksTo(@Nonnull List<? super Task<?>> out, @Nonnull ConnectorArg
public default Class<? extends Enum<? extends ConnectorProperty>> getConnectorProperties() {
return DefaultProperties.class;
}

default boolean isLogConnector() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.google.common.io.ByteSink;
import com.google.edwmigration.dumper.application.dumper.ConnectorArguments;
import com.google.edwmigration.dumper.application.dumper.MetadataDumperUsageException;
import com.google.edwmigration.dumper.application.dumper.QueryLogSharedState;
import com.google.edwmigration.dumper.application.dumper.QueryLogSharedState.QueryLogEntry;
import com.google.edwmigration.dumper.application.dumper.annotations.RespectsArgumentQueryLogDays;
import com.google.edwmigration.dumper.application.dumper.annotations.RespectsArgumentQueryLogEnd;
import com.google.edwmigration.dumper.application.dumper.annotations.RespectsArgumentQueryLogStart;
Expand Down Expand Up @@ -86,6 +88,15 @@ public Teradata14LogsConnector() {
super("teradata14-logs");
}

/*
* Overriding it only for Teradata logs connectors, so in MetadataDumper summary
* section only they can output first and last entries of query logs for now
*/
@Override
public boolean isLogConnector() {
return true;
}

private abstract static class Teradata14LogsJdbcTask extends AbstractJdbcTask<Summary> {

protected static String EXPRESSION_VALIDITY_QUERY = "SELECT TOP 1 %s FROM %s";
Expand Down Expand Up @@ -133,7 +144,14 @@ protected Summary doInConnection(
throws SQLException {
String sql = getSql(jdbcHandle);
ResultSetExtractor<Summary> rse = newCsvResultSetExtractor(sink);
return doSelect(connection, withInterval(rse, interval), sql);
Summary summary = doSelect(connection, withInterval(rse, interval), sql);
if (summary != null && summary.rowCount() > 0) {
QueryLogSharedState.updateQueryLogEntries(
QueryLogEntry.QUERY_LOG_FIRST_ENTRY, interval.getStart());
QueryLogSharedState.updateQueryLogEntries(
QueryLogEntry.QUERY_LOG_LAST_ENTRY, interval.getEndExclusive());
}
return summary;
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,15 @@ public TeradataLogsConnector() {
super("teradata-logs");
}

/*
* Overriding it only for Teradata logs connectors, so in MetadataDumper summary section
* only they can output first and last entries of query logs for now
*/
@Override
public boolean isLogConnector() {
return true;
}

private ImmutableList<TeradataJdbcSelectTask> createTimeSeriesTasks(
ZonedInterval interval, @Nonnull ConnectorArguments arguments) {
return TIME_SERIES_PROPERTY_TO_FILENAME_PREFIX_MAP.keySet().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.io.ByteSink;
import com.google.common.primitives.Ints;
import com.google.edwmigration.dumper.application.dumper.QueryLogSharedState;
import com.google.edwmigration.dumper.application.dumper.QueryLogSharedState.QueryLogEntry;
import com.google.edwmigration.dumper.application.dumper.connector.ZonedInterval;
import com.google.edwmigration.dumper.application.dumper.connector.teradata.AbstractTeradataConnector.SharedState;
import com.google.edwmigration.dumper.application.dumper.connector.teradata.query.model.Expression;
Expand Down Expand Up @@ -157,7 +159,14 @@ protected Summary doInConnection(
throws SQLException {
String sql = getOrCreateSql(jdbcHandle);
ResultSetExtractor<Summary> rse = newCsvResultSetExtractor(sink);
return doSelect(connection, withInterval(rse, interval), sql);
Summary summary = doSelect(connection, withInterval(rse, interval), sql);
if (summary != null && summary.rowCount() > 0) {
QueryLogSharedState.updateQueryLogEntries(
QueryLogEntry.QUERY_LOG_FIRST_ENTRY, interval.getStart());
QueryLogSharedState.updateQueryLogEntries(
QueryLogEntry.QUERY_LOG_LAST_ENTRY, interval.getEndExclusive());
}
return summary;
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2022-2024 Google LLC
* Copyright 2013-2021 CompilerWorks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.edwmigration.dumper.application.dumper;

import static org.junit.Assert.assertEquals;

import com.google.edwmigration.dumper.application.dumper.QueryLogSharedState.QueryLogEntry;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class QueryLogSharedStateTest {

@Before
public void beforeEachTest() {
QueryLogSharedState.clearQueryLogEntries();
}

@Test
public void queryLogFirstEntryUpdatedSuccessfully() {
ZonedDateTime newQueryLogDate = ZonedDateTime.now();

// Act
QueryLogSharedState.updateQueryLogEntries(QueryLogEntry.QUERY_LOG_FIRST_ENTRY, newQueryLogDate);

// Assert
assertEquals(
newQueryLogDate,
QueryLogSharedState.queryLogEntries.get(QueryLogEntry.QUERY_LOG_FIRST_ENTRY));
}

@Test
public void queryLogLastEntryUpdatedSuccessfully() {
ZonedDateTime newQueryLogDate = ZonedDateTime.now();

// Act
QueryLogSharedState.updateQueryLogEntries(QueryLogEntry.QUERY_LOG_LAST_ENTRY, newQueryLogDate);

// Assert
assertEquals(
newQueryLogDate,
QueryLogSharedState.queryLogEntries.get(QueryLogEntry.QUERY_LOG_LAST_ENTRY));
}

@Test
public void queryLogFirstEntryUpdatedSuccessfullyForEarlierDate() {
ZonedDateTime now = ZonedDateTime.now();
ZonedDateTime earlierDate = ZonedDateTime.of(1970, 1, 1, 1, 1, 1, 1, ZoneId.of("UTC"));

// Act
QueryLogSharedState.updateQueryLogEntries(QueryLogEntry.QUERY_LOG_FIRST_ENTRY, now);
QueryLogSharedState.updateQueryLogEntries(QueryLogEntry.QUERY_LOG_FIRST_ENTRY, earlierDate);

// Assert
assertEquals(
earlierDate, QueryLogSharedState.queryLogEntries.get(QueryLogEntry.QUERY_LOG_FIRST_ENTRY));
}

@Test
public void queryLogLastEntryUpdatedSuccessfullyForLaterDate() {
ZonedDateTime date = ZonedDateTime.of(2000, 1, 1, 1, 1, 1, 1, ZoneId.of("UTC"));
ZonedDateTime laterDate = ZonedDateTime.now();

// Act
QueryLogSharedState.updateQueryLogEntries(QueryLogEntry.QUERY_LOG_LAST_ENTRY, date);
QueryLogSharedState.updateQueryLogEntries(QueryLogEntry.QUERY_LOG_LAST_ENTRY, laterDate);

// Assert
assertEquals(
laterDate, QueryLogSharedState.queryLogEntries.get(QueryLogEntry.QUERY_LOG_LAST_ENTRY));
}
}