Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ jobs:
with:
role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/${{ secrets.AWS_DEPLOY_ROLE }}
role-session-name: nodejs_int_latest_tests
role-duration-seconds: 21600
aws-region: ${{ secrets.AWS_DEFAULT_REGION }}
output-credentials: true

Expand Down
3 changes: 3 additions & 0 deletions common/lib/connection_plugin_chain_builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import { HostMonitoring2PluginFactory } from "./plugins/efm2/host_monitoring2_pl
import { BlueGreenPluginFactory } from "./plugins/bluegreen/blue_green_plugin_factory";
import { GlobalDbFailoverPluginFactory } from "./plugins/gdb_failover/global_db_failover_plugin_factory";
import { FullServicesContainer } from "./utils/full_services_container";
import { GdbReadWriteSplittingPluginFactory } from "./plugins/read_write_splitting/gdb_read_write_splitting_plugin_factory";

/*
Type alias used for plugin factory sorting. It holds a reference to a plugin
Expand All @@ -65,6 +66,7 @@ export class ConnectionPluginChainBuilder {
["staleDns", { factory: StaleDnsPluginFactory, weight: 500 }],
["bg", { factory: BlueGreenPluginFactory, weight: 550 }],
["readWriteSplitting", { factory: ReadWriteSplittingPluginFactory, weight: 600 }],
["gdbReadWriteSplitting", { factory: GdbReadWriteSplittingPluginFactory, weight: 610 }],
["failover", { factory: FailoverPluginFactory, weight: 700 }],
["failover2", { factory: Failover2PluginFactory, weight: 710 }],
["gdbFailover", { factory: GlobalDbFailoverPluginFactory, weight: 720 }],
Expand All @@ -87,6 +89,7 @@ export class ConnectionPluginChainBuilder {
[StaleDnsPluginFactory, 500],
[BlueGreenPluginFactory, 550],
[ReadWriteSplittingPluginFactory, 600],
[GdbReadWriteSplittingPluginFactory, 610],
[FailoverPluginFactory, 700],
[Failover2PluginFactory, 710],
[GlobalDbFailoverPluginFactory, 720],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ export class ClusterTopologyMonitorImpl extends AbstractMonitor implements Clust
if (writerClient && writerClientHostInfo) {
logger.debug(Messages.get("ClusterTopologyMonitor.writerPickedUpFromHostMonitors", writerClientHostInfo.toString()));

const oldMonitoringClient = this.monitoringClient;
this.monitoringClient = writerClient;
this.writerHostInfo = writerClientHostInfo;
this.isVerifiedWriterConnection = true;
Expand All @@ -425,6 +426,11 @@ export class ClusterTopologyMonitorImpl extends AbstractMonitor implements Clust
this.readerTopologiesById.clear();
this.completedOneCycle.clear();

// Close the old monitoring client that was replaced by the new writer client.
if (oldMonitoringClient && oldMonitoringClient !== this.monitoringClient) {
await this.closeConnection(oldMonitoringClient);
}

await this.delay(true);
continue;
} else {
Expand Down
15 changes: 13 additions & 2 deletions common/lib/host_list_provider/rds_host_list_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,21 @@ export class RdsHostListProvider implements DynamicHostListProvider {

if (!this.pluginService.isDialectConfirmed()) {
// We need to confirm the dialect before creating a topology monitor so that it uses the correct SQL queries.
// We will return the original hosts parsed from the connections string until the dialect has been confirmed.
// Return the original hosts parsed from the connection string.
return this.initialHostList;
}

return await this.forceRefreshMonitor(verifyTopology, timeoutMs);
const hosts = await this.forceRefreshMonitor(verifyTopology, timeoutMs);
if (hosts && hosts.length > 0) {
return hosts;
}

// Check for cached topology as a fallback.
const storedTopology = this.getStoredTopology();
if (storedTopology && storedTopology.length > 0) {
return storedTopology;
}
return this.initialHostList;
}

async getHostRole(client: ClientWrapper, _dialect: DatabaseDialect): Promise<HostRole> {
Expand Down Expand Up @@ -236,6 +246,7 @@ export class RdsHostListProvider implements DynamicHostListProvider {
}

async getCurrentTopology(targetClient: ClientWrapper, dialect: DatabaseDialect): Promise<HostInfo[]> {
this.init();
return await this.topologyUtils.queryForTopology(targetClient, dialect, this.initialHost, this.clusterInstanceTemplate);
}

Expand Down
2 changes: 1 addition & 1 deletion common/lib/partial_plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ export class PartialPluginService implements PluginService, HostListProviderServ
}

isDialectConfirmed(): boolean {
throw new AwsWrapperError(Messages.get("PartialPluginService.unexpectedMethodCall", "isDialectConfirmed"));
return true;
}

setInTransaction(inTransaction: boolean): void {
Expand Down
2 changes: 1 addition & 1 deletion common/lib/pg_client_wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class PgClientWrapper implements ClientWrapper {

async abort(): Promise<void> {
try {
return await ClientUtils.queryWithTimeout(this.end(), this.properties);
this.client?.connection?.stream?.destroy();
} catch (error: any) {
// Ignore
}
Expand Down
23 changes: 23 additions & 0 deletions common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,28 @@ export class PluginServiceImpl implements PluginService, HostListProviderService
return this.dialect;
}

private static readonly DIALECT_CONFIRMED_STATUS_KEY = "DialectConfirmed";

private getDialectConfirmedCacheKey(): string {
let clusterId = WrapperProperties.CLUSTER_ID.defaultValue;
try {
clusterId = this._hostListProvider?.getClusterId() ?? WrapperProperties.CLUSTER_ID.defaultValue;
} catch (e) {
// May fail if the host list provider does not support getClusterId. In this case use the default value.
}
return `${clusterId}::${PluginServiceImpl.DIALECT_CONFIRMED_STATUS_KEY}`;
}

isDialectConfirmed(): boolean {
if (this._isDialectConfirmed) {
return true;
}

const cacheItem = this.storageService.get(StatusCacheItem, this.getDialectConfirmedCacheKey());
if (cacheItem && cacheItem.status === true) {
this._isDialectConfirmed = true;
}

return this._isDialectConfirmed;
}

Expand Down Expand Up @@ -634,6 +655,8 @@ export class PluginServiceImpl implements PluginService, HostListProviderService
this.dialect = await this.dbDialectProvider.getDialectForUpdate(targetClient, this.initialHost, this.props.get(WrapperProperties.HOST.name));

this._isDialectConfirmed = true;
this.storageService.set(this.getDialectConfirmedCacheKey(), new StatusCacheItem(true));

if (originalDialect === this.dialect) {
return;
}
Expand Down
15 changes: 9 additions & 6 deletions common/lib/plugins/bluegreen/blue_green_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import { IamAuthenticationPlugin } from "../../authentication/iam_authentication
import { BlueGreenRole } from "./blue_green_role";
import { ExecuteRouting, RoutingResultHolder } from "./routing/execute_routing";
import { CanReleaseResources } from "../../can_release_resources";
import { FullServicesContainer } from "../../utils/full_services_container";

export interface BlueGreenProviderSupplier {
create(pluginService: PluginService, props: Map<string, any>, bgdId: string): BlueGreenStatusProvider;
create(servicesContainer: FullServicesContainer, props: Map<string, any>, bgdId: string): BlueGreenStatusProvider;
}

export class BlueGreenPlugin extends AbstractConnectionPlugin implements CanReleaseResources {
Expand All @@ -42,6 +43,7 @@ export class BlueGreenPlugin extends AbstractConnectionPlugin implements CanRele

private static readonly CLOSED_METHOD_NAMES: Set<string> = new Set(["end", "abort"]);
protected readonly pluginService: PluginService;
protected readonly servicesContainer: FullServicesContainer;
protected readonly properties: Map<string, any>;
protected bgProviderSupplier: BlueGreenProviderSupplier;
protected bgStatus: BlueGreenStatus = null;
Expand All @@ -53,18 +55,19 @@ export class BlueGreenPlugin extends AbstractConnectionPlugin implements CanRele
protected endTimeNano: bigint = BigInt(0);
private static provider: Map<string, BlueGreenStatusProvider> = new Map();

constructor(pluginService: PluginService, properties: Map<string, any>, bgProviderSupplier: BlueGreenProviderSupplier = null) {
constructor(servicesContainer: FullServicesContainer, properties: Map<string, any>, bgProviderSupplier: BlueGreenProviderSupplier = null) {
super();
if (!bgProviderSupplier) {
bgProviderSupplier = {
create: (pluginService: PluginService, props: Map<string, any>, bgdId: string): BlueGreenStatusProvider => {
return new BlueGreenStatusProvider(pluginService, props, bgdId);
create: (servicesContainer: FullServicesContainer, props: Map<string, any>, bgdId: string): BlueGreenStatusProvider => {
return new BlueGreenStatusProvider(servicesContainer, props, bgdId);
}
};
}

this.properties = properties;
this.pluginService = pluginService;
this.servicesContainer = servicesContainer;
this.pluginService = servicesContainer.pluginService;
this.bgProviderSupplier = bgProviderSupplier;
this.bgdId = WrapperProperties.BGD_ID.get(this.properties).trim().toLowerCase();
}
Expand Down Expand Up @@ -215,7 +218,7 @@ export class BlueGreenPlugin extends AbstractConnectionPlugin implements CanRele
private initProvider() {
const provider = BlueGreenPlugin.provider.get(this.bgdId);
if (!provider) {
const provider = this.bgProviderSupplier.create(this.pluginService, this.properties, this.bgdId);
const provider = this.bgProviderSupplier.create(this.servicesContainer, this.properties, this.bgdId);
BlueGreenPlugin.provider.set(this.bgdId, provider);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class BlueGreenPluginFactory extends ConnectionPluginFactory {
if (!BlueGreenPluginFactory.blueGreenPlugin) {
BlueGreenPluginFactory.blueGreenPlugin = await import("./blue_green_plugin");
}
return new BlueGreenPluginFactory.blueGreenPlugin.BlueGreenPlugin(servicesContainer.pluginService, props);
return new BlueGreenPluginFactory.blueGreenPlugin.BlueGreenPlugin(servicesContainer, props);
} catch (error: any) {
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", error.message, "BlueGreenPluginFactory"));
}
Expand Down
16 changes: 10 additions & 6 deletions common/lib/plugins/failover/failover_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import { ClientWrapper } from "../../client_wrapper";
import { getWriter, logTopology } from "../../utils/utils";
import { TelemetryCounter } from "../../utils/telemetry/telemetry_counter";
import { TelemetryTraceLevel } from "../../utils/telemetry/telemetry_trace_level";
import { FullServicesContainer } from "../../utils/full_services_container";

export class FailoverPlugin extends AbstractConnectionPlugin {
private static readonly TELEMETRY_WRITER_FAILOVER = "failover to writer instance";
Expand Down Expand Up @@ -79,34 +80,36 @@ export class FailoverPlugin extends AbstractConnectionPlugin {

private hostListProviderService?: HostListProviderService;
private readonly pluginService: PluginService;
private readonly servicesContainer: FullServicesContainer;
protected enableFailoverSetting: boolean = WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.defaultValue;

constructor(pluginService: PluginService, properties: Map<string, any>, rdsHelper: RdsUtils);
constructor(servicesContainer: FullServicesContainer, properties: Map<string, any>, rdsHelper: RdsUtils);
constructor(
pluginService: PluginService,
servicesContainer: FullServicesContainer,
properties: Map<string, any>,
rdsHelper: RdsUtils,
readerFailoverHandler: ClusterAwareReaderFailoverHandler,
writerFailoverHandler: ClusterAwareWriterFailoverHandler
);
constructor(
pluginService: PluginService,
servicesContainer: FullServicesContainer,
properties: Map<string, any>,
rdsHelper: RdsUtils,
readerFailoverHandler?: ClusterAwareReaderFailoverHandler,
writerFailoverHandler?: ClusterAwareWriterFailoverHandler
) {
super();
this._properties = properties;
this.pluginService = pluginService;
this.pluginService = servicesContainer.pluginService;
this.servicesContainer = servicesContainer;
this._rdsHelper = rdsHelper;

this.initSettings();

this._readerFailoverHandler = readerFailoverHandler
? readerFailoverHandler
: new ClusterAwareReaderFailoverHandler(
pluginService,
this.pluginService,
properties,
this.failoverTimeoutMsSetting,
this.failoverReaderConnectTimeoutMsSetting,
Expand All @@ -115,7 +118,8 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
this._writerFailoverHandler = writerFailoverHandler
? writerFailoverHandler
: new ClusterAwareWriterFailoverHandler(
pluginService,
this.pluginService,
this.servicesContainer,
this._readerFailoverHandler,
properties,
this.failoverTimeoutMsSetting,
Expand Down
2 changes: 1 addition & 1 deletion common/lib/plugins/failover/failover_plugin_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export class FailoverPluginFactory extends ConnectionPluginFactory {
if (!FailoverPluginFactory.failoverPlugin) {
FailoverPluginFactory.failoverPlugin = await import("./failover_plugin");
}
return new FailoverPluginFactory.failoverPlugin.FailoverPlugin(servicesContainer.pluginService, properties, new RdsUtils());
return new FailoverPluginFactory.failoverPlugin.FailoverPlugin(servicesContainer, properties, new RdsUtils());
} catch (error: any) {
throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", error.message, "FailoverPlugin"));
}
Expand Down
25 changes: 22 additions & 3 deletions common/lib/plugins/failover/writer_failover_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import { logger } from "../../../logutils";
import { WrapperProperties } from "../../wrapper_property";
import { ClientWrapper } from "../../client_wrapper";
import { FailoverRestriction } from "./failover_restriction";
import { FullServicesContainer } from "../../utils/full_services_container";
import { ServiceUtils } from "../../utils/service_utils";
import { DatabaseDialect } from "../../database_dialect/database_dialect";

export interface WriterFailoverHandler {
failover(currentTopology: HostInfo[]): Promise<WriterFailoverResult>;
Expand All @@ -47,6 +50,7 @@ export class ClusterAwareWriterFailoverHandler implements WriterFailoverHandler
static readonly RECONNECT_WRITER_TASK = "TaskA";
static readonly WAIT_NEW_WRITER_TASK = "TaskB";
private readonly pluginService: PluginService;
private readonly servicesContainer: FullServicesContainer;
private readonly readerFailoverHandler: ClusterAwareReaderFailoverHandler;
private readonly initialConnectionProps: Map<string, any>;
maxFailoverTimeoutMs = 60000; // 60 sec
Expand All @@ -55,30 +59,45 @@ export class ClusterAwareWriterFailoverHandler implements WriterFailoverHandler

constructor(
pluginService: PluginService,
servicesContainer: FullServicesContainer,
readerFailoverHandler: ClusterAwareReaderFailoverHandler,
initialConnectionProps: Map<string, any>,
failoverTimeoutMs?: number,
readTopologyIntervalMs?: number,
reconnectWriterIntervalMs?: number
) {
this.pluginService = pluginService;
this.servicesContainer = servicesContainer;
this.readerFailoverHandler = readerFailoverHandler;
this.initialConnectionProps = initialConnectionProps;
this.maxFailoverTimeoutMs = failoverTimeoutMs ?? this.maxFailoverTimeoutMs;
this.readTopologyIntervalMs = readTopologyIntervalMs ?? this.readTopologyIntervalMs;
this.reconnectionWriterIntervalMs = reconnectWriterIntervalMs ?? this.reconnectionWriterIntervalMs;
}

protected async newServicesContainer(): Promise<FullServicesContainer> {
const container = ServiceUtils.instance.createMinimalServiceContainerFrom(this.servicesContainer, this.initialConnectionProps);
await container.pluginManager.init();
const initialHostInfo = this.pluginService.getInitialConnectionHostInfo();
if (initialHostInfo) {
container.hostListProviderService.setInitialConnectionHostInfo(initialHostInfo);
}
return container;
}

async failover(currentTopology: HostInfo[]): Promise<WriterFailoverResult> {
if (!currentTopology || currentTopology.length == 0) {
logger.error(Messages.get("ClusterAwareWriterFailoverHandler.failoverCalledWithInvalidTopology"));
return ClusterAwareWriterFailoverHandler.DEFAULT_RESULT;
}

const taskAContainer = await this.newServicesContainer();
const taskBContainer = await this.newServicesContainer();

const reconnectToWriterHandlerTask = new ReconnectToWriterHandlerTask(
currentTopology,
getWriter(currentTopology),
this.pluginService,
taskAContainer.pluginService,
this.initialConnectionProps,
this.reconnectionWriterIntervalMs,
Date.now() + this.maxFailoverTimeoutMs
Expand All @@ -88,7 +107,7 @@ export class ClusterAwareWriterFailoverHandler implements WriterFailoverHandler
currentTopology,
getWriter(currentTopology),
this.readerFailoverHandler,
this.pluginService,
taskBContainer.pluginService,
this.initialConnectionProps,
this.readTopologyIntervalMs,
Date.now() + this.maxFailoverTimeoutMs
Expand Down Expand Up @@ -379,7 +398,7 @@ class WaitForNewWriterHandlerTask {
async refreshTopologyAndConnectToNewWriter(): Promise<boolean> {
const allowOldWriter: boolean = this.pluginService.getDialect().getFailoverRestrictions().includes(FailoverRestriction.ENABLE_WRITER_IN_TASK_B);

while (this.pluginService.getCurrentClient() && Date.now() < this.endTime && !this.failoverCompleted) {
while (Date.now() < this.endTime && !this.failoverCompleted) {
try {
if (this.currentReaderTargetClient) {
await this.pluginService.forceRefreshHostList();
Expand Down
Loading
Loading