-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathwrapper-index.js
2064 lines (1828 loc) · 72.2 KB
/
wrapper-index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Externals
import { asyncScheduler, concat, from, merge, of, ReplaySubject, Subject, BehaviorSubject } from 'rxjs'
import {
concatMap,
debounceTime,
distinctUntilChanged,
endWith,
filter,
first,
map,
mergeAll,
mergeMap,
publishReplay,
scan,
startWith,
switchMap,
tap,
throttleTime,
withLatestFrom
} from 'rxjs/operators'
import uuidv4 from 'uuid/v4'
import Web3 from 'web3'
import { isAddress } from 'web3-utils'
import dotprop from 'dot-prop'
// APM
import apm from '@aragon/apm'
// RPC
import Messenger from '@aragon/rpc-messenger'
import * as handlers from './rpc/handlers'
// Utilities
import { getApmAppInfo } from './core/apm'
import { makeRepoProxy, getAllRepoVersions, getRepoVersionById } from './core/apm/repo'
import {
getAragonOsInternalAppInfo,
isAragonOsInternalApp
} from './core/aragonOS'
import { getKernelNamespace, isKernelAppCodeNamespace } from './core/aragonOS/kernel'
import { setConfiguration } from './configuration'
import * as configurationKeys from './configuration/keys'
import {
tryDescribingUpdateAppIntent,
tryDescribingUpgradeOrganizationBasket,
tryEvaluatingRadspec
} from './radspec'
import {
addressesEqual,
getCacheKey,
includesAddress,
makeAddressMapProxy,
makeProxy,
makeProxyFromABI,
AsyncRequestCache,
ANY_ENTITY
} from './utils'
import { decodeCallScript, encodeCallScript, isCallScript } from './utils/callscript'
import { isValidForwardCall, parseForwardCall } from './utils/forwarding'
import { doIntentPathsMatch } from './utils/intents'
import {
applyForwardingPretransaction,
createDirectTransaction,
createDirectTransactionForApp,
createForwarderTransactionBuilder,
getRecommendedGasLimit
} from './utils/transactions'
// Templates
import Templates from './templates'
// Cache
import Cache from './cache'
// Local address labels
import { LocalIdentityProvider } from './identity'
// Interfaces
import { getAbi } from './interfaces'
// Try to get an injected web3 provider, return a public one otherwise.
export const detectProvider = () =>
typeof web3 !== 'undefined'
? web3.currentProvider // eslint-disable-line
: 'wss://rinkeby.eth.aragon.network/ws'
/**
* Set up an instance of the template factory that can be used independently
*
* @param {string} from
* The address of the account using the factory.
* @param {Object} options
* Template factory options.
* @param {Object} [options.apm]
* Options for apm.js (see https://github.com/aragon/apm.js)
* @param {string} [options.apm.ensRegistryAddress]
* ENS registry for apm.js
* @param {Object} [options.apm.ipfs]
* IPFS provider config for apm.js
* @param {string} [options.apm.ipfs.gateway]
* IPFS gateway apm.js will use to fetch artifacts from
* @param {Function} [options.defaultGasPriceFn=function]
* A factory function to provide the default gas price for transactions.
* It can return a promise of number string or a number string. The function
* has access to a recommended gas limit which can be used for custom
* calculations. This function can also be used to get a good gas price
* estimation from a 3rd party resource.
* @param {string|Object} [options.provider=web3.currentProvider]
* The Web3 provider to use for blockchain communication. Defaults to `web3.currentProvider`
* if web3 is injected, otherwise will fallback to wss://rinkeby.eth.aragon.network/ws
* @return {Object} Template factory instance
*/
export const setupTemplates = (from, options = {}) => {
const defaultOptions = {
apm: {},
defaultGasPriceFn: () => { },
provider: detectProvider()
}
options = Object.assign(defaultOptions, options)
const web3 = new Web3(options.provider)
return Templates(from, {
web3,
apm: apm(web3, options.apm),
defaultGasPriceFn: options.defaultGasPriceFn
})
}
/**
* An Aragon wrapper.
*
* @param {string} daoAddress
* The address of the DAO.
* @param {Object} options
* Wrapper options.
* @param {Object} [options.apm]
* Options for apm.js (see https://github.com/aragon/apm.js)
* @param {string} [options.apm.ensRegistryAddress]
* ENS registry for apm.js
* @param {Object} [options.apm.ipfs]
* IPFS provider config for apm.js
* @param {string} [options.apm.ipfs.gateway]
* IPFS gateway apm.js will use to fetch artifacts from
* @param {Object} [options.cache]
* Options for the internal cache
* @param {boolean} [options.forceLocalStorage=false]
* Downgrade to localStorage even if IndexedDB is available
* @param {Object} [options.events]
* Options for handling Ethereum events
* @param {boolean} [options.subscriptionEventDelay]
* Time in ms to delay a new event from a contract subscription
* @param {Function} [options.defaultGasPriceFn=function]
* A factory function to provide the default gas price for transactions.
* It can return a promise of number string or a number string. The function
* has access to a recommended gas limit which can be used for custom
* calculations. This function can also be used to get a good gas price
* estimation from a 3rd party resource.
* @param {string|Object} [options.provider=web3.currentProvider]
* The Web3 provider to use for blockchain communication. Defaults to `web3.currentProvider`
* if web3 is injected, otherwise will fallback to wss://rinkeby.eth.aragon.network/ws
*/
export default class Aragon {
constructor (daoAddress, options = {}) {
const defaultOptions = {
apm: {},
defaultGasPriceFn: () => { },
provider: detectProvider(),
cache: {
forceLocalStorage: false
},
events: {
subscriptionDelayTime: 0
}
}
options = Object.assign(defaultOptions, options)
// Set up desired configuration
setConfiguration(
configurationKeys.FORCE_LOCAL_STORAGE,
!!(options.cache && options.cache.forceLocalStorage)
)
setConfiguration(
configurationKeys.SUBSCRIPTION_EVENT_DELAY,
Number.isFinite(options.events && options.events.subscriptionEventDelay)
? options.events.subscriptionEventDelay
: 0
)
// Set up Web3
this.web3 = new Web3(options.provider)
// Set up APM
this.apm = apm(this.web3, options.apm)
// Set up the kernel proxy
this.kernelProxy = makeProxy(daoAddress, 'Kernel', this.web3)
// Set up cache
this.cache = new Cache(daoAddress)
this.defaultGasPriceFn = options.defaultGasPriceFn
}
/**
* Initialise the wrapper.
*
* @param {Object} [options] Options
* @param {Object} [options.accounts] `initAccount()` options (see below)
* @param {Object} [options.acl] `initACL()` options (see below)
* @return {Promise<void>}
* @throws {Error} Will throw an error if the `daoAddress` is detected to not be a Kernel instance
*/
async init (options = {}) {
let aclAddress
try {
// Check if address is kernel
// web3 throws if it's an empty address ('0x')
aclAddress = await this.kernelProxy.call('acl')
} catch (_) {
throw Error(`Provided daoAddress is not a DAO`)
}
await this.cache.init()
await this.kernelProxy.updateInitializationBlock()
await this.initAccounts(options.accounts)
await this.initAcl(Object.assign({ aclAddress }, options.acl))
await this.initIdentityProviders()
this.initApps()
this.initForwarders()
this.initAppIdentifiers()
this.initNetwork()
this.initNotifications()
this.transactions = new Subject()
this.signatures = new Subject()
}
/**
* Initialise the accounts observable.
*
* @param {Object} [options] Options
* @param {boolean} [options.fetchFromWeb3] Whether or not accounts should also be fetched from
* the provided Web3 instance
* @param {Array<string>} [options.providedAccounts] Array of accounts that the user controls
* @return {Promise<void>}
*/
async initAccounts ({ fetchFromWeb3, providedAccounts = [] } = {}) {
this.accounts = new ReplaySubject(1)
const accounts = fetchFromWeb3
? providedAccounts.concat(await this.web3.eth.getAccounts())
: providedAccounts
this.setAccounts(accounts)
}
/**
* Initialise the ACL (Access Control List).
*
* @return {Promise<void>}
*/
async initAcl ({ aclAddress } = {}) {
if (!aclAddress) {
aclAddress = await this.kernelProxy.call('acl')
}
let votingAppAddress = '0x40923e3215243b4a51bf411f9873d02f5bacfd60';
// Set up ACL proxy
this.aclProxy = makeProxy(aclAddress, 'ACL', this.web3, { initializationBlock: this.kernelProxy.initializationBlock })
// Set up Voting proxy
this.votingProxy = makeProxy(votingAppAddress, 'Voting', this.web3, { initializationBlock: this.kernelProxy.initializationBlock })
const SET_PERMISSION_EVENT = 'SetPermission'
const CHANGE_PERMISSION_MANAGER_EVENT = 'ChangePermissionManager'
const ACL_CACHE_KEY = getCacheKey(aclAddress, 'acl')
const VOTING_CACHE_KEY = getCacheKey(votingAppAddress, 'voting')
const REORG_SAFETY_BLOCK_AGE = 100
const currentBlock = await this.web3.eth.getBlockNumber()
const cacheBlockHeight = Math.max(currentBlock - REORG_SAFETY_BLOCK_AGE, 0) // clamp to 0 for safety
// Check if we have cached ACL for this address
// Cache object for an ACL: { permissions, blockNumber }
const cachedAclState = await this.cache.get(ACL_CACHE_KEY, {})
const { permissions: cachedPermissions, blockNumber: cachedBlockNumber } = cachedAclState
const pastEventsOptions = {
toBlock: cacheBlockHeight,
// When using cache, fetch events from the next block after cache
fromBlock: cachedPermissions ? cachedBlockNumber + 1 : undefined
}
this.pastEventsOptions = pastEventsOptions
// modified pastEvents$ to get a promise instead of an observable
async function pastEvents$() {
return new Promise( resolve => {
this.votingProxy.pastEvents(null, pastEventsOptions).pipe(
mergeMap((pastEvents) => from(pastEvents)),
// Custom cache event
// endWith({
// event: VOTING_CACHE_KEY,
// returnValues: {}
// })
)
.subscribe( data => console.log(data))
})
}
this.pastEvents = pastEvents$
async function currentEvents$() {
return new Promise( resolve => {
this.votingProxy.events(null, { fromBlock: cacheBlockHeight + 1 }).pipe(
startWith({
event: 'starting current events',
returnValues: {}
})
)
.subscribe( data => console.log(data))
})
}
this.currentEvents = currentEvents$
// Permissions Object:
// { app -> role -> { manager, allowedEntities -> [ entities with permission ] } }
const fetchedPermissions$ = concat(pastEvents$, currentEvents$).pipe(
scan(([permissions], event) => {
const eventData = event.returnValues
if (eventData.app) {
// NOTE: dotprop.get() doesn't work through proxies, so we manually access permissions
const appPermissions = permissions[eventData.app] || {}
if (event.event === SET_PERMISSION_EVENT) {
const key = `${eventData.role}.allowedEntities`
// Converts to and from a set to avoid duplicated entities
const allowedEntitiesSet = new Set(dotprop.get(appPermissions, key, []))
if (eventData.allowed) {
allowedEntitiesSet.add(eventData.entity)
} else {
allowedEntitiesSet.delete(eventData.entity)
}
dotprop.set(appPermissions, key, Array.from(allowedEntitiesSet))
}
if (event.event === CHANGE_PERMISSION_MANAGER_EVENT) {
// We only care about the last one. An app permission can have only one manager
dotprop.set(appPermissions, `${eventData.role}.manager`, eventData.manager)
}
permissions[eventData.app] = appPermissions
}
return [permissions, event]
}, [ makeAddressMapProxy(cachedPermissions || {}) ]),
// Cache if we're finished syncing up to cache block height
map(([permissions, event]) => {
if (event.event === ACL_CACHE_KEY) {
this.cache.set(
ACL_CACHE_KEY,
// Make copy for cache
{ permissions: Object.assign({}, permissions), blockNumber: cacheBlockHeight }
)
}
return permissions
}),
// Throttle so it only continues after 30ms without new values
// Avoids DDOSing subscribers as during initialization there may be
// hundreds of events processed in a short timespan
debounceTime(30),
publishReplay(1)
)
fetchedPermissions$.connect()
const cachedPermissions$ = cachedPermissions ? of(makeAddressMapProxy(cachedPermissions)) : of()
this.permissions = concat(cachedPermissions$, fetchedPermissions$).pipe(publishReplay(1))
this.permissions.connect()
}
/**
* Check if an object is an app.
*
* @param {Object} app
* @return {boolean}
*/
isApp (app) {
return app.kernelAddress && this.isKernelAddress(app.kernelAddress)
}
/**
* Check if an address is this DAO's kernel.
*
* @param {string} address
* @return {boolean}
*/
isKernelAddress (address) {
return addressesEqual(address, this.kernelProxy.address)
}
/**
* Initialize apps observable.
*
* @return {void}
*/
initApps () {
/******************************
* *
* CACHING *
* *
******************************/
const applicationInfoCache = new AsyncRequestCache((cacheKey) => {
const [appId, codeAddress] = cacheKey.split('.')
return getAragonOsInternalAppInfo(appId) ||
getApmAppInfo(appId) ||
this.apm.getLatestVersionForContract(appId, codeAddress)
})
const proxyContractValueCache = new AsyncRequestCache((proxyAddress) => {
if (this.isKernelAddress(proxyAddress)) {
const kernelProxy = makeProxy(proxyAddress, 'ERCProxy', this.web3)
return Promise.all([
// Use Kernel ABI
this.kernelProxy.call('KERNEL_APP_ID'),
// Use ERC897 proxy ABI
// Note that this won't work on old Aragon Core 0.5 Kernels,
// as they had not implemented ERC897 yet
kernelProxy.call('implementation')
]).then((values) => ({
appId: values[0],
codeAddress: values[1]
}))
}
const appProxy = makeProxy(proxyAddress, 'AppProxy', this.web3)
const appProxyForwarder = makeProxy(proxyAddress, 'Forwarder', this.web3)
return Promise.all([
appProxy.call('kernel'),
appProxy.call('appId'),
appProxy.call('implementation'),
// Not all apps implement the forwarding interface
appProxyForwarder.call('isForwarder').catch(() => false)
]).then((values) => ({
kernelAddress: values[0],
appId: values[1],
codeAddress: values[2],
isForwarder: values[3]
}))
})
/******************************
* *
* APPS *
* *
******************************/
// Get all installed app proxy addresses
const installedApps$ = this.permissions.pipe(
map(Object.keys),
// Dedupe until apps change
distinctUntilChanged((oldProxies, newProxies) => {
if (oldProxies.length !== newProxies.length) {
return false
}
const oldSet = new Set(oldProxies)
const intersection = new Set(newProxies.filter(newProxy => oldSet.has(newProxy)))
return intersection.size === oldSet.size
}),
// Add Kernel as the first "app"
map((proxyAddresses) => {
const appsWithoutKernel = proxyAddresses.filter((address) => !this.isKernelAddress(address))
return [this.kernelProxy.address].concat(appsWithoutKernel)
}),
// Get proxy values
// Note that we can safely discard throttled values,
// so we use a `switchMap()` instead of a `mergeMap()`
switchMap(
(proxyAddresses) => Promise.all(
proxyAddresses.map(async (proxyAddress) => {
let proxyValues
try {
proxyValues = await proxyContractValueCache.request(proxyAddress)
} catch (_) {}
return {
proxyAddress,
...proxyValues
}
})
)
),
// Filter to remove any non-apps assigned in permissions
map(appProxies => appProxies.filter(
(appProxy) => this.isApp(appProxy) || this.isKernelAddress(appProxy.proxyAddress)
))
)
// SetApp events are emitted when apps are installed and upgraded
// These may modify the implementation addresses of the proxies (modifying their behaviour), so
// we invalidate any caching we've done
const updatedApps$ = this.kernelProxy
// Override events subscription with empty options to subscribe from latest block
.events('SetApp', {})
.pipe(
// Only care about changes if they're in the APP_BASE namespace
filter(({ returnValues }) => isKernelAppCodeNamespace(returnValues.namespace)),
// Merge with latest value of installedApps$ so we can return the full list of apps
withLatestFrom(
installedApps$,
function updateApps (setAppEvent, apps) {
const { appId: setAppId } = setAppEvent.returnValues
return apps.map(async (app) => {
if (app.appId !== setAppId) {
return app
}
let proxyValues
try {
proxyValues = await proxyContractValueCache.request(
app.proxyAddress,
true // force cache invalidation
)
} catch (_) {}
return {
...app,
...proxyValues,
updated: true
}
})
}
),
// Emit resolved array of promises, one at a time
concatMap(updatedApps => Promise.all(updatedApps))
)
// We merge these two observables, which both return the full list of apps attached with their
// proxy values:
// - installedApps$: emits any time the list of installed apps changes
// - updatedApps$: emits any time SetApp could modify an installed app
const apps$ = merge(installedApps$, updatedApps$)
// Get artifact info for apps
const appsWithInfo$ = apps$.pipe(
concatMap(
(apps) => Promise.all(
apps.map(async (app) => {
let appInfo
if (app.appId && app.codeAddress) {
const cacheKey = `${app.appId}.${app.codeAddress}`
try {
appInfo = await applicationInfoCache.request(cacheKey)
} catch (_) { }
}
return {
...appInfo,
// Override the fetched appInfo with the actual app proxy's values to avoid mismatches
...app
}
})
)
)
)
this.apps = appsWithInfo$.pipe(
publishReplay(1)
)
this.apps.connect()
/*******************************
* *
* REPOS *
* *
******************************/
// Initialize installed repos from the list of apps
const installedRepoCache = new Map()
const repo$ = apps$.pipe(
// Map installed apps into a deduped list of their aragonPM repos, with these assumptions:
// - No apps are lying about their appId (malicious apps _could_ masquerade as other
// apps by setting this value themselves)
// - `contractAddress`s will stay the same across all installed apps.
// This is technically not true as apps could set this value themselves
// (e.g. as pinned apps do), but these apps wouldn't be able to upgrade anyway
//
// Ultimately returns an array of objects, holding the repo's:
// - appId
// - base contractAddress
map((apps) => Object.values(
apps
.filter(({ appId }) => !isAragonOsInternalApp(appId))
.reduce((installedRepos, { appId, codeAddress, updated }) => {
installedRepos[appId] = {
appId,
updated,
contractAddress: codeAddress
}
return installedRepos
}, {})
)),
// Filter list of installed repos into:
// - New repos we haven't seen before (to begin subscribing to their version events)
// - Repos we've seen before, to trigger a recalculation of the currently installed version
map((repos) => {
const newRepoAppIds = []
const updatedRepoAppIds = []
repos.forEach((repo) => {
const { appId, updated } = repo
if (!installedRepoCache.has(appId)) {
newRepoAppIds.push(appId)
} else if (updated) {
updatedRepoAppIds.push(appId)
}
// Mark repo as seen and cache installed information
installedRepoCache.set(appId, repo)
})
return [newRepoAppIds, updatedRepoAppIds]
}),
// Stop if there's no new repos or updated apps
filter(([newRepoAppIds, updatedRepoAppIds]) =>
newRepoAppIds.length || updatedRepoAppIds.length
),
// Project new repos into their ids and web3 proxy objects
concatMap(async ([newRepoAppIds, updatedRepoAppIds]) => {
const newRepos = (await Promise.all(
newRepoAppIds.map(async (appId) => {
let repoProxy
try {
repoProxy = await makeRepoProxy(appId, this.apm, this.web3)
await repoProxy.updateInitializationBlock()
} catch (err) {
console.error(`Could not find repo for ${appId}`, err)
}
return {
appId,
repoProxy
}
})
))
// Filter out repos we couldn't create proxies for (they were likely due to publishing
// invalid aragonPM repos)
// Note that we don't need to worry about doing this for the updated repos list; if
// we could not create the original repo proxy when we first saw the repo, the updates
// won't do anything because we weren't able to fetch enough information (versions list)
.filter((newRepos) => newRepos.repoProxy)
return [newRepos, updatedRepoAppIds]
}),
// Here's where the fun begins!
// It'll be easy to get lost, so remember to take it slowly.
// Just remember, with this `mergeMap()`, we'll be subscribing to all the projected (returned)
// observables and merging their respective emissions into a single observable.
//
// The output of this merged observable are update events containing the following:
// - `appId`: mandatory, signifies which repo was updated
// - `repoAddress`: optional, address of the repo contract itself
// - `versions`: optional, new version information
mergeMap(([newRepos, updatedRepoAppIds]) => {
// Create a new observable to project each new update as its own update emission.
const update$ = of(...updatedRepoAppIds).pipe(
map((appId) => ({ appId }))
)
// Create a new observable to project each new repo as its own emission.
const newRepo$ = of(...newRepos)
// Create a new observable to project each new repo's address as its own update emission.
const repoAddress$ = newRepo$.pipe(
map(({ appId, repoProxy }) => ({
appId,
repoAddress: repoProxy.address
}))
)
// Create a new observable that projects each NewVersion event as its own update event
// emission.
// This one is a bit trickier, due to the higher order observable. Keep reading.
const version$ = newRepo$.pipe(
// `mergeMap()` to "flatten" the async transformation. This async function returns an
// observable, which is ultimately the NewVersion stream. More on this, after the break.
// Note: we don't care about the ordering, so we use `mergeMap()` instead of `concatMap()`
mergeMap(async ({ appId, repoProxy }) => {
const initialVersions = [
// Immediately query state from the repo contract, to avoid having to wait until all
// past events sync (may be long)
...await getAllRepoVersions(repoProxy)
]
// Return an observable subscribed to NewVersion events, giving us:
// - Timestamps for versions that were published prior to this process running
// - Notifications for newly published versions
//
// Reduce this with the cached version information to emit version updates for the repo.
return repoProxy.events('NewVersion').pipe(
// Project each event to a new version info object, one at a time
concatMap(async (event) => {
const { versionId: eventVersionId } = event.returnValues
// Adjust from Ethereum time
const timestamp = (await this.web3.eth.getBlock(event.blockNumber)).timestamp * 1000
const versionIndex = initialVersions.findIndex(({ versionId }) => versionId === eventVersionId)
const versionInfo =
versionIndex === -1
? await getRepoVersionById(repoProxy, eventVersionId)
: initialVersions[versionIndex]
return {
...versionInfo,
timestamp
}
}),
// Trick to immediately emit (e.g. similar to a do/while loop)
startWith(null),
// Reduce newly emitted versions into the full list of versions
scan(({ appId, versions }, newVersionInfo) => {
let newVersions = versions
if (newVersionInfo) {
const versionIndex = versions.findIndex(({ versionId }) => versionId === newVersionInfo.versionId)
if (versionIndex === -1) {
newVersions = versions.concat(newVersionInfo)
} else {
newVersions = Array.from(versions)
newVersions[versionIndex] = newVersionInfo
}
}
return {
appId,
versions: newVersions
}
}, {
appId,
versions: initialVersions
})
)
}),
// This bit is interesting.
// We've "flattened" our async transformation with the `mergeMap()` above, but it still
// returns an observable. We need to flatten this observable's emissions into the upper
// stream, which is what `mergeAll()` achieves.
mergeAll()
)
// Merge all of the repo update events resulting from the apps being updated, and return it
// to the upper `mergeMap()` so it can be re-flattened into a single event stream.
return merge(repoAddress$, version$, update$)
}),
// Reduce the event stream into a current representation of the installed repos, and which
// repo to update next.
scan(({ repos }, repoUpdate) => {
const { appId: updatedAppId, ...update } = repoUpdate
const updatedRepoInfo = {
...repos[updatedAppId],
...update
}
return {
repos: {
...repos,
[updatedAppId]: updatedRepoInfo
},
updatedRepoAppId: updatedAppId
}
}, {
repos: {},
updatedRepoAppId: null
}),
// Stop if we don't have enough information yet to continue
filter(({ repos, updatedRepoAppId }) =>
!!updatedRepoAppId && Array.isArray(repos[updatedRepoAppId].versions)
),
// Grab the full information of the updated repo using its latest values.
// With this, we've taken the basic stream of updates for events and mapped them onto their
// full repo objects.
concatMap(async ({ repos, updatedRepoAppId: appId }) => {
const { repoAddress, versions } = repos[appId]
const installedRepoInfo = installedRepoCache.get(appId)
const latestVersion = versions[versions.length - 1]
const currentVersion = Array.from(versions)
// Apply reverse to find the latest version with the currently installed contract address
.reverse()
.find(version => version.contractAddress === installedRepoInfo.contractAddress)
// Get info for the current and latest versions of the repo
const currentVersionRequest = applicationInfoCache
.request(`${appId}.${currentVersion.contractAddress}`)
.catch(() => ({}))
.then(content => ({
content,
version: currentVersion.version
}))
const versionInfos = await Promise.all([
currentVersionRequest,
currentVersion.contractAddress === latestVersion.contractAddress
? currentVersionRequest // current version is also the latest, no need to refetch
: applicationInfoCache
.request(`${appId}.${latestVersion.contractAddress}`)
.catch(() => ({}))
.then(content => ({
content,
version: latestVersion.version
}))
])
// Emit updated repo information
return {
appId,
repoAddress,
versions,
currentVersion: versionInfos[0],
latestVersion: versionInfos[1]
}
})
)
this.installedRepos = repo$.pipe(
// Finally, we reduce the merged updates from individual repos into one final, expanding array
// of the installed repos
scan((repos, updatedRepo) => {
const repoIndex = repos.findIndex(repo => repo.repoAddress === updatedRepo.repoAddress)
if (repoIndex === -1) {
return repos.concat(updatedRepo)
} else {
const nextRepos = Array.from(repos)
nextRepos[repoIndex] = updatedRepo
return nextRepos
}
}, []),
// Throttle updates, but must keep trailing to ensure we don't drop any updates
throttleTime(500, asyncScheduler, { leading: false, trailing: true }),
publishReplay(1)
)
this.installedRepos.connect()
}
/**
* Initialise forwarder observable.
*
* @return {void}
*/
initForwarders () {
this.forwarders = this.apps.pipe(
map(
(apps) => apps.filter((app) => app.isForwarder)
),
publishReplay(1)
)
this.forwarders.connect()
}
/**
* Initialise app identifier observable.
*
* @return {void}
*/
initAppIdentifiers () {
this.appIdentifiers = new BehaviorSubject({}).pipe(
scan(
(identifiers, { address, identifier }) =>
Object.assign(identifiers, { [address]: identifier })
),
publishReplay(1)
)
this.appIdentifiers.connect()
}
/**
* Set the identifier of an app.
*
* @param {string} address The proxy address of the app
* @param {string} identifier The identifier of the app
* @return {void}
*/
setAppIdentifier (address, identifier) {
this.appIdentifiers.next({
address,
identifier
})
}
/**
* Initialise identity providers.
*
* @return {Promise<void>}
*/
async initIdentityProviders () {
const defaultIdentityProviders = [{
name: 'local',
provider: new LocalIdentityProvider()
}]
// TODO: detect other installed providers
const detectedIdentityProviders = []
const identityProviders = [...defaultIdentityProviders, ...detectedIdentityProviders]
// Init all providers
await Promise.all(identityProviders.map(({ provider }) => {
// Most providers should have this defined to a noop function by default, but just in case
if (typeof provider.init === 'function') {
return provider.init()
}
return Promise.resolve()
}))
this.identityProviderRegistrar = new Map(
identityProviders.map(({ name, provider }) => [name, provider])
)
// Set up identity modification intent observable
this.identityIntents = new Subject()
}
/**
* Modify the identity metadata for an address using the highest priority provider.
*
* @param {string} address Address to modify
* @param {Object} metadata Modification metadata object
* @return {Promise} Resolves if the modification was successful
*/
modifyAddressIdentity (address, metadata) {
const providerName = 'local'
const provider = this.identityProviderRegistrar.get(providerName)
if (provider && typeof provider.modify === 'function') {
return provider.modify(address, metadata)
}
return Promise.reject(new Error(`Provider (${providerName}) not installed`))
}
/**
* Resolve the identity metadata for an address using the highest priority provider.
*
* @param {string} address Address to resolve
* @return {Promise} Resolves with the identity or null if not found
*/
resolveAddressIdentity (address) {
const providerName = 'local' // TODO - get provider
const provider = this.identityProviderRegistrar.get(providerName)
if (provider && typeof provider.resolve === 'function') {
return provider.resolve(address)
}
return Promise.reject(new Error(`Provider (${providerName}) not installed`))
}
/**
* Search identities based on a term
*
* @param {string} searchTerm
* @return {Promise} Resolves with the identity or null if not found
*/
searchIdentities (searchTerm) {
const providerName = 'local' // TODO - get provider
const provider = this.identityProviderRegistrar.get(providerName)
if (provider && typeof provider.search === 'function') {
return provider.search(searchTerm)
}
return Promise.reject(new Error(`Provider (${providerName}) not installed`))
}
/**
* Request an identity modification using the highest priority provider.
*
* Returns a promise which delegates resolution to the handler
* which listens and handles `this.identityIntents`
*
* @param {string} address Address to modify
* @return {Promise} Reolved by the handler of identityIntents
*/
requestAddressIdentityModification (address) {
const providerName = 'local' // TODO - get provider
if (this.identityProviderRegistrar.has(providerName)) {
return new Promise((resolve, reject) => {
this.identityIntents.next({
address,
providerName,
resolve,