Skip to content
Open
12 changes: 11 additions & 1 deletion .github/workflows/aggregate.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,20 @@ jobs:
- name: Fetch previous output
run: ardrive download-file -f 7fa5d4e3-0087-422a-acb3-2e481d98d08b

- name: Run
- name: Copy the initial output
run: cp ./daily_output.json ./daily_output_initial.json

- name: Run the aggregate command
run: bash ".github/workflows/aggregate.sh"

- name: Check for changes in the output
if: always()
run: |
IS_THERE_DIFF="$(jq -n --argjson a "$(cat ./daily_output.json)" --argjson b "$(cat ./daily_output_initial.json)" '$a != $b')" \
echo "::set-env name=OUTPUT_HAS_CHANGED::$IS_THERE_DIFF" // vs. export OUTPUT_HAS_CHANGED="$IS_THERE_DIFF" (?)

- name: Publish output
if: ${{ env.OUTPUT_HAS_CHANGED == 'true' }}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great addition!

run: ardrive upload-file --local-path ./daily_output.json -F "d62a47c3-0b9d-4442-ac72-252a239d0469" -w /tmp/wallet --no-bundle --upsert

- name: Commit GQL cache
Expand Down
2 changes: 1 addition & 1 deletion src/daily_output.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ describe('DailyOutput class', () => {

it('Throws if the previous block height is ahead of some query result', () => {
return expectAsyncErrorThrow({
promiseToError: output.feedGQLData([
promiseToError: output.writeOutputFrom([
{
cursor: '914100',
node: {
Expand Down
27 changes: 20 additions & 7 deletions src/daily_output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ export class DailyOutput {
private latestTimestamp = getLastTimestamp();
private bundlesTips: { [txId: string]: { tip: number; size: number; address: string } } = {};
private bundleFileCount: { [txId: string]: number } = {};
private currentHieght: number;

constructor(private heightRange: [number, number]) {}
constructor(private heightRange: [number, number]) {
this.currentHieght = heightRange[0];
}

/**
* @param {StakedPSTHolders} stakedPSTHolders key/value of address/tokens above 200 ARDRIVE locked for at least 21600 blocks (~30 days)
Expand All @@ -46,11 +49,20 @@ export class DailyOutput {
/**
* @param {GQLEdgeInterface[]} queryResult the edges of ArFSTransactions only - 50 block before the latest
*/
public async feedGQLData(queryResult: GQLEdgeInterface[]): Promise<void> {
public async writeOutputFrom(queryResult: GQLEdgeInterface[]): Promise<void> {
for (const edge of queryResult) {
if (edge.node.block.height && this.currentHieght !== edge.node.block.height) {
// only after we find a new block, because we are sure that there are no more transactions belonging to an already found bundle
await this.finishAggregatingBlock();
}
this.currentHieght = edge.node.block.height;

await this.aggregateData(edge);
}
await this.finishDataAggregation();

this.data.blockHeight = this.heightRange[1];

await this.finishAggregatingBlock();
}

/**
Expand All @@ -61,19 +73,18 @@ export class DailyOutput {
* - group effort rewards, and
* - streak rewards
*/
private async finishDataAggregation(): Promise<void> {
console.log(`Finishing data aggregation...`);

private async finishAggregatingBlock(): Promise<void> {
// aggregate +1 file count to the non unbundled bundles
const bundleTxIDs = Object.keys(this.bundlesTips);
bundleTxIDs.forEach((txId) => {
if (!this.bundleFileCount[txId]) {
this.sumFile(this.bundlesTips[txId].address);
delete this.bundlesTips[txId];
}
});

// calculate change in percentage of the uploaded data and rank position
this.data.blockHeight = this.heightRange[1];
this.data.blockHeight = this.currentHieght;
this.data.timestamp = this.latestTimestamp;

this.caclulateChangeOfUploadVolume();
Expand All @@ -85,6 +96,8 @@ export class DailyOutput {
// TODO: determine if the wallets has uploaded data for 7 days in a row

this.data.lastUpdated = Date.now();

this.write();
}

private caclulateChangeOfUploadVolume(): void {
Expand Down
2 changes: 2 additions & 0 deletions src/gql_cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ export class GQLCache {
if (!this.cacheFolderExists) {
return [];
}

// These are not sorted (these are, but the OS is doing the job, might differ within different platforms)
const listResult = readdirSync(CACHE_FOLDER);
return listResult;
}
Expand Down
7 changes: 4 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,13 @@ function run(): void {
* @param maxBlock an integer representing the block until where to query the data
*/
async function aggregateOutputData(minBlock?: number, maxBlock?: number): Promise<void> {
const minimumBlock = minBlock ?? getMinBlockHeight();
const minimumBlock = minBlock || getMinBlockHeight();
const maximumBlock = maxBlock ?? (await getBlockHeight());
const output = new DailyOutput([minimumBlock, maximumBlock]);
const PSTHolders = await getWalletsEligibleForStreak();
await output.feedPSTHolders(PSTHolders);
const edges = await getAllArDriveTransactionsWithin(new HeightRange(minimumBlock, maximumBlock));
await output.feedGQLData(edges);
output.write();

// TODO: Could take instead a stream
await output.writeOutputFrom(edges);
}
2 changes: 1 addition & 1 deletion src/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async function getStakedPSTHolders(): Promise<StakedPSTHolders> {
*/
export async function getAllArDriveTransactionsWithin(range: HeightRange): Promise<GQLEdgeInterface[]> {
const cache = new GQLCache(range);
const nonCachedRanges = cache.getNonCachedRangesWithin().reverse();
const nonCachedRanges = cache.getNonCachedRangesWithin();
Comment thread
matibat marked this conversation as resolved.

console.log('Height ranges to query are', nonCachedRanges.length);

Expand Down