Skip to content

Mergable persistance in Postgres tabular #246

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: beta
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/@types/persisters/docs.js
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@
* |`tableId`|Id|The Id of the Store Table into which data from this database table should be loaded.|
* |`rowIdColumnName?`|string|The optional name of the column in the database table that will be used as the Row Ids in the Store Table, defaulting to '_id'.|
* |`condition?`|string|The optional SQL WHERE clause that will be used to filter the rows that are loaded into the Store Table. When set it must include the `$tableName` placeholder for the table name, since v6.1.0.|
* |`timestampColumnName?`|string|The optional name of the column in the database to save timestamp when the row was last updated. It's used for merging data during full load from the database.|
*
* As a shortcut, if you do not need to specify a custom `rowIdColumnName`, you
* can simply provide the Id of the Store Table instead of the whole object.
Expand Down Expand Up @@ -615,6 +616,13 @@
* @since v6.1.0
*/
/// DpcTabularLoad.condition
/**
* The optional name of the column in the database to load timestamp when the row was last updated.
* It's used for merging data during full load from the database.
* @category Configuration
* @since v6.1.0
*/
/// DpcTabularLoad.timestampColumnName
}
}
}
Expand All @@ -634,6 +642,7 @@
* |`deleteEmptyColumns?`|boolean|Whether columns in the database table will be removed if they are empty in the Store Table, defaulting to false.|
* |`deleteEmptyTable?`|boolean|Whether tables in the database will be removed if the Store Table is empty, defaulting to false.|
* |`condition?`|string|The optional SQL WHERE clause that will be used to scope cleanup operations to the Store Table. When set it must include the `$tableName` placeholder for the table name, since v6.1.0. Defaults to `DpcTabularLoad.condition`.|
* |`timestampColumnName?`|string|The optional name of the column in the database to save timestamp when the row was last updated. It's used for merging data during full load from the database.|
*
* As a shortcut, if you do not need to specify a custom `rowIdColumnName`, or
* enable the `deleteEmptyColumns` or `deleteEmptyTable` settings, you can
Expand Down Expand Up @@ -759,6 +768,13 @@
* @since v6.1.0
*/
/// DpcTabularSave.condition
/**
* The optional name of the column in the database to save timestamp when the row was last updated.
* It's used for merging data during full load from the database.
* @category Configuration
* @since v6.1.0
*/
/// DpcTabularSave.timestampColumnName
}
}
}
Expand Down Expand Up @@ -813,6 +829,13 @@
* @since v4.0.0
*/
/// DpcTabularValues.tableName
/**
* The optional name of the column in the database to save timestamp when the row was last updated.
* It's used for merging data during full load from the database.
* @category Configuration
* @since v6.1.0
*/
/// DpcTabularValues.timestampColumnName
}
/**
* A Persister object lets you save and load Store data to and from different
Expand Down
6 changes: 6 additions & 0 deletions src/@types/persisters/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ export type DpcTabularLoad = {
rowIdColumnName?: string;
/// DpcTabularLoad.condition
condition?: DpcTabularCondition;
/// DpcTabularLoad.timestampColumnName
timestampColumnName?: string;
}
| Id;
};
Expand All @@ -138,6 +140,8 @@ export type DpcTabularSave = {
deleteEmptyTable?: boolean;
/// DpcTabularSave.condition
condition?: DpcTabularCondition;
/// DpcTabularSave.timestampColumnName
timestampColumnName?: string;
}
| string;
};
Expand All @@ -150,6 +154,8 @@ export type DpcTabularValues = {
save?: boolean;
/// DpcTabularValues.tableName
tableName?: string;
/// DpcTabularValues.timestampColumnName
timestampColumnName?: string;
};

/// Persister
Expand Down
6 changes: 5 additions & 1 deletion src/persisters/common/database/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export const getCommandFunctions = (
deleteEmptyTable: boolean,
partial?: boolean,
condition?: DpcTabularCondition,
timestampColumnName?: string | null,
) => Promise<void>,
transaction: <Return>(actions: () => Promise<Return>) => Promise<Return>,
] => {
Expand Down Expand Up @@ -133,8 +134,11 @@ export const getCommandFunctions = (
deleteEmptyTable: boolean,
partial = false,
condition: DpcTabularCondition = TRUE,
timestampColumnName?: string | null,
): Promise<void> => {
const settingColumnNameSet = setNew<string>();
const settingColumnNameSet = setNew<string>(
timestampColumnName ?? undefined,
);
objMap(content ?? {}, (contentRow) =>
arrayMap(objIds(contentRow ?? {}), (cellOrValueId) =>
setAdd(settingColumnNameSet, cellOrValueId),
Expand Down
18 changes: 16 additions & 2 deletions src/persisters/common/database/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ export type DefaultedJsonConfig = [
];
export type DefaultedTabularConfig = [
tablesLoadConfig: IdMap<
[tableId: Id, rowIdColumnName: string, condition: DpcTabularCondition]
[
tableId: Id,
rowIdColumnName: string,
condition: DpcTabularCondition,
timestampColumnName: string | null,
]
>,
tablesSaveConfig: IdMap<
[
Expand All @@ -39,9 +44,15 @@ export type DefaultedTabularConfig = [
deleteEmptyColumns: boolean,
deleteEmptyTable: boolean,
condition: DpcTabularCondition,
timestampColumnName: string | null,
]
>,
valuesConfig: [load: boolean, save: boolean, tableName: string],
valuesConfig: [
load: boolean,
save: boolean,
tableName: string,
updatedAtColumnName: string | null,
],
];

const COLUMN_NAME = 'ColumnName';
Expand All @@ -60,6 +71,7 @@ const TABLE_NAME = 'tableName';
const DELETE_EMPTY_COLUMNS = 'deleteEmptyColumns';
const DELETE_EMPTY_TABLE = 'deleteEmptyTable';
const CONDITION = 'condition';
const TIMESTAMP_COLUMN_NAME = 'timestampColumnName';
const DEFAULT_CONFIG: DatabasePersisterConfig = {
mode: JSON,
[AUTO_LOAD_INTERVAL_SECONDS]: 1,
Expand Down Expand Up @@ -153,6 +165,7 @@ export const getConfigStructures = (
[TABLE_ID]: null,
[ROW_ID_COLUMN_NAME]: DEFAULT_ROW_ID_COLUMN_NAME,
[CONDITION]: TRUE,
[TIMESTAMP_COLUMN_NAME]: null,
},
TABLE_ID,
(tableName) => collHas(excludedTableNames, tableName),
Expand All @@ -167,6 +180,7 @@ export const getConfigStructures = (
[DELETE_EMPTY_COLUMNS]: 0,
[DELETE_EMPTY_TABLE]: 0,
[CONDITION]: null,
[TIMESTAMP_COLUMN_NAME]: null,
},
TABLE_NAME,
(_, tableName) => collHas(excludedTableNames, tableName),
Expand Down
88 changes: 80 additions & 8 deletions src/persisters/common/database/tabular.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type {Id} from '../../../@types/common/index.d.ts';
import type {TablesStamp} from '../../../@types/index.js';
import type {
DatabaseExecuteCommand,
PersistedChanges,
Expand All @@ -15,9 +16,14 @@ import type {
Values,
} from '../../../@types/store/index.d.ts';
import {arrayFilter} from '../../../common/array.ts';
import {getHash} from '../../../common/hash.ts';
import {getHlcFunctions} from '../../../common/hlc.ts';
import {getUniqueId} from '../../../common/index.ts';
import {jsonStringWithMap} from '../../../common/json.ts';
import {mapMap} from '../../../common/map.ts';
import {objHas, objIsEmpty, objNew} from '../../../common/obj.ts';
import {isUndefined, promiseAll} from '../../../common/other.ts';
import {objHas, objIsEmpty, objMap, objNew} from '../../../common/obj.ts';
import {isString, isUndefined, promiseAll} from '../../../common/other.ts';
import {getLatestTime, stampNewWithHash} from '../../../common/stamps.ts';
import {createCustomPersister} from '../create.ts';
import {getCommandFunctions} from './commands.ts';
import {
Expand Down Expand Up @@ -46,7 +52,7 @@ export const createTabularPersister = <
[
tablesLoadConfig,
tablesSaveConfig,
[valuesLoad, valuesSave, valuesTableName],
[valuesLoad, valuesSave, valuesTableName, valuesTimestampColumnName],
]: DefaultedTabularConfig,
managedTableNames: string[],
querySchema: QuerySchema,
Expand All @@ -57,6 +63,7 @@ export const createTabularPersister = <
encode?: (cellOrValue: any) => string | number,
decode?: (field: string | number) => any,
): Persister<Persist> => {
const uniqueId = getUniqueId(5);
const [refreshSchema, loadTable, saveTable, transaction] =
getCommandFunctions(
executeCommand,
Expand Down Expand Up @@ -89,6 +96,7 @@ export const createTabularPersister = <
deleteEmptyColumns,
deleteEmptyTable,
condition,
timestampColumnName,
],
tableId,
) => {
Expand All @@ -101,6 +109,7 @@ export const createTabularPersister = <
deleteEmptyTable,
partial,
condition,
timestampColumnName,
);
}
},
Expand All @@ -119,6 +128,8 @@ export const createTabularPersister = <
true,
true,
partial,
undefined,
valuesTimestampColumnName,
)
: null;

Expand All @@ -138,6 +149,53 @@ export const createTabularPersister = <
),
);

const loadMergeableTables = async (): Promise<TablesStamp<true>> => {
let tablesTime: string | undefined = undefined;
return stampNewWithHash(objNew(
arrayFilter(
await promiseAll(
mapMap(
tablesLoadConfig,
async (
[tableId, rowIdColumnName, condition, timestampColumnName],
tableName,
) => [
tableId,
await loadTable(tableName, rowIdColumnName, condition).then(
(table) => {
let tableTime: string | undefined = undefined;
return stampNewWithHash(objMap(table, (row) => {
const [getHlc] = getHlcFunctions(uniqueId, () => {
if(!timestampColumnName) return 0;
const value = row[timestampColumnName];
if(typeof value === 'number') return value;
if(isString(value)) return Date.parse(value);
return 0;
});

const rowTime = getHlc();
tableTime = getLatestTime(tableTime, rowTime);
tablesTime = getLatestTime(tablesTime, tableTime);
return stampNewWithHash(objMap(row, (cell) => {
const cellHash = getHash(
jsonStringWithMap(cell ?? null) + ':' + rowTime,
);
return stampNewWithHash(cell, rowTime, cellHash);
}), rowTime, 0);
}), tableTime ?? '', 0);
}),
],
),
),
(pair) => !objIsEmpty(pair[1]),
),
), tablesTime ?? '', 0);
};

// current values structure in the database is one big row
// with columns for values. To make it mergeable, we need to
// convert it to a key/value structure which would then
// enable timestamp for each row.
const loadValues = async (): Promise<Values | null> =>
valuesLoad
? (await loadTable(valuesTableName, DEFAULT_ROW_ID_COLUMN_NAME))[
Expand All @@ -148,11 +206,25 @@ export const createTabularPersister = <
const getPersisted = (): Promise<PersistedContent<Persist> | undefined> =>
transaction(async () => {
await refreshSchema();
const tables = await loadTables();
const values = await loadValues();
return !objIsEmpty(tables) || !isUndefined(values)
? [tables as Tables, values as Values]
: undefined;
if(store.isMergeable() && persist > 1 /* > Persists.StoreOnly*/) {
const tables = await loadMergeableTables();
const values = await loadValues();

if(objIsEmpty(tables) && isUndefined(values)) {
return undefined;
}

return [tables, values];
} else {
const tables = await loadTables();
const values = await loadValues();

if(objIsEmpty(tables) && isUndefined(values)) {
return undefined;
}

return [tables as Tables, values as Values];
}
}) as any;

const setPersisted = (
Expand Down