diff --git a/.github/workflows/aggregate.yaml b/.github/workflows/aggregate.yaml index 8cf84a446..8589ec531 100644 --- a/.github/workflows/aggregate.yaml +++ b/.github/workflows/aggregate.yaml @@ -37,10 +37,20 @@ jobs: - name: Fetch previous output run: ardrive download-file -f 7fa5d4e3-0087-422a-acb3-2e481d98d08b - - name: Run + - name: Copy the initial output + run: cp ./daily_output.json ./daily_output_initial.json + + - name: Run the aggregate command run: bash ".github/workflows/aggregate.sh" + - name: Check for changes in the output + if: always() + run: | + IS_THERE_DIFF="$(jq -n --argjson a "$(cat ./daily_output.json)" --argjson b "$(cat ./daily_output_initial.json)" '$a != $b')" \ + echo "::set-env name=OUTPUT_HAS_CHANGED::$IS_THERE_DIFF" // vs. export OUTPUT_HAS_CHANGED="$IS_THERE_DIFF" (?) + - name: Publish output + if: ${{ env.OUTPUT_HAS_CHANGED == 'true' }} run: ardrive upload-file --local-path ./daily_output.json -F "d62a47c3-0b9d-4442-ac72-252a239d0469" -w /tmp/wallet --no-bundle --upsert - name: Commit GQL cache diff --git a/src/daily_output.test.ts b/src/daily_output.test.ts index 6afe6716d..7717f728a 100644 --- a/src/daily_output.test.ts +++ b/src/daily_output.test.ts @@ -44,7 +44,7 @@ describe('DailyOutput class', () => { it('Throws if the previous block height is ahead of some query result', () => { return expectAsyncErrorThrow({ - promiseToError: output.feedGQLData([ + promiseToError: output.writeOutputFrom([ { cursor: '914100', node: { diff --git a/src/daily_output.ts b/src/daily_output.ts index 88d1ab883..65d537a0b 100644 --- a/src/daily_output.ts +++ b/src/daily_output.ts @@ -33,8 +33,11 @@ export class DailyOutput { private latestTimestamp = getLastTimestamp(); private bundlesTips: { [txId: string]: { tip: number; size: number; address: string } } = {}; private bundleFileCount: { [txId: string]: number } = {}; + private currentHieght: number; - constructor(private heightRange: [number, number]) {} + constructor(private heightRange: [number, number]) { + this.currentHieght = heightRange[0]; + } /** * @param {StakedPSTHolders} stakedPSTHolders key/value of address/tokens above 200 ARDRIVE locked for at least 21600 blocks (~30 days) @@ -46,11 +49,20 @@ export class DailyOutput { /** * @param {GQLEdgeInterface[]} queryResult the edges of ArFSTransactions only - 50 block before the latest */ - public async feedGQLData(queryResult: GQLEdgeInterface[]): Promise { + public async writeOutputFrom(queryResult: GQLEdgeInterface[]): Promise { for (const edge of queryResult) { + if (edge.node.block.height && this.currentHieght !== edge.node.block.height) { + // only after we find a new block, because we are sure that there are no more transactions belonging to an already found bundle + await this.finishAggregatingBlock(); + } + this.currentHieght = edge.node.block.height; + await this.aggregateData(edge); } - await this.finishDataAggregation(); + + this.data.blockHeight = this.heightRange[1]; + + await this.finishAggregatingBlock(); } /** @@ -61,19 +73,18 @@ export class DailyOutput { * - group effort rewards, and * - streak rewards */ - private async finishDataAggregation(): Promise { - console.log(`Finishing data aggregation...`); - + private async finishAggregatingBlock(): Promise { // aggregate +1 file count to the non unbundled bundles const bundleTxIDs = Object.keys(this.bundlesTips); bundleTxIDs.forEach((txId) => { if (!this.bundleFileCount[txId]) { this.sumFile(this.bundlesTips[txId].address); + delete this.bundlesTips[txId]; } }); // calculate change in percentage of the uploaded data and rank position - this.data.blockHeight = this.heightRange[1]; + this.data.blockHeight = this.currentHieght; this.data.timestamp = this.latestTimestamp; this.caclulateChangeOfUploadVolume(); @@ -85,6 +96,8 @@ export class DailyOutput { // TODO: determine if the wallets has uploaded data for 7 days in a row this.data.lastUpdated = Date.now(); + + this.write(); } private caclulateChangeOfUploadVolume(): void { diff --git a/src/gql_cache.ts b/src/gql_cache.ts index 409382fae..018827106 100644 --- a/src/gql_cache.ts +++ b/src/gql_cache.ts @@ -174,6 +174,8 @@ export class GQLCache { if (!this.cacheFolderExists) { return []; } + + // These are not sorted (these are, but the OS is doing the job, might differ within different platforms) const listResult = readdirSync(CACHE_FOLDER); return listResult; } diff --git a/src/index.ts b/src/index.ts index 4f85aa943..a79e0626d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -76,12 +76,13 @@ function run(): void { * @param maxBlock an integer representing the block until where to query the data */ async function aggregateOutputData(minBlock?: number, maxBlock?: number): Promise { - const minimumBlock = minBlock ?? getMinBlockHeight(); + const minimumBlock = minBlock || getMinBlockHeight(); const maximumBlock = maxBlock ?? (await getBlockHeight()); const output = new DailyOutput([minimumBlock, maximumBlock]); const PSTHolders = await getWalletsEligibleForStreak(); await output.feedPSTHolders(PSTHolders); const edges = await getAllArDriveTransactionsWithin(new HeightRange(minimumBlock, maximumBlock)); - await output.feedGQLData(edges); - output.write(); + + // TODO: Could take instead a stream + await output.writeOutputFrom(edges); } diff --git a/src/queries.ts b/src/queries.ts index c3e7cfe3a..b9c3e6f4e 100644 --- a/src/queries.ts +++ b/src/queries.ts @@ -52,7 +52,7 @@ async function getStakedPSTHolders(): Promise { */ export async function getAllArDriveTransactionsWithin(range: HeightRange): Promise { const cache = new GQLCache(range); - const nonCachedRanges = cache.getNonCachedRangesWithin().reverse(); + const nonCachedRanges = cache.getNonCachedRangesWithin(); console.log('Height ranges to query are', nonCachedRanges.length);