Skip to content
This repository was archived by the owner on Nov 23, 2022. It is now read-only.

Commit 1f2c862

Browse files
committed
feat(store): new implementation
1 parent 9466fda commit 1f2c862

File tree

11 files changed

+243
-158
lines changed

11 files changed

+243
-158
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ npm-debug.log
3737
yarn-error.log
3838
testem.log
3939
/typings
40+
.eslintcache
4041

4142
# System Files
4243
.DS_Store

.husky/.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
_

.husky/pre-commit

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/bin/sh
2+
. "$(dirname "$0")/_/husky.sh"
3+
4+
npx lint-staged

package.json

+7-1
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,17 @@
3939
"husky": "^7.0.4",
4040
"jest": "27.2.3",
4141
"lerna": "^4.0.0",
42+
"lint-staged": ">=10",
4243
"prettier": "^2.3.1",
4344
"standard-version": "^9.3.2",
4445
"ts-jest": "27.0.5",
4546
"typescript": "~4.4.3",
4647
"wrtc": "^0.4.7",
47-
"yaml-crypt": "^0.7.6"
48+
"yaml-crypt": "^0.7.6",
49+
"zx": "^4.3.0"
50+
},
51+
"lint-staged": {
52+
"*.{js,ts}": "eslint --cache --fix",
53+
"*.{js,css,md}": "prettier --write"
4854
}
4955
}

packages/explorer/src/app.tsx

+5-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { create } from '@dstack-js/ipfs';
2-
import { Stack } from '@dstack-js/lib';
2+
import { Stack, Shard, ShardKind } from '@dstack-js/lib';
33
import React, { useState } from 'react';
44
import { Dashboard } from './dashboard';
55

@@ -14,10 +14,13 @@ export const App: React.FunctionComponent<{}> = () => {
1414

1515
const ipfs = await create();
1616
const stack = await Stack.create(namespace, ipfs);
17-
stack.debug();
1817

1918
// @ts-expect-error
2019
window.stack = stack;
20+
// @ts-expect-error
21+
window.Shard = Shard;
22+
// @ts-expect-error
23+
window.ShardKind = ShardKind;
2124

2225
setStack(stack);
2326
};

packages/lib/src/errors.ts

+12
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,15 @@ export class TimeoutError extends Error {
33
super('timeout')
44
}
55
}
6+
7+
export class UnknownShardKindError extends Error {
8+
constructor() {
9+
super('unknownShardKind')
10+
}
11+
}
12+
13+
export class InvalidShardPathError extends Error {
14+
constructor(extensions: any = 'invalidShardPath') {
15+
super(extensions)
16+
}
17+
}

packages/lib/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
export * from './store'
22
export * from './stack'
3+
export * from './shard'
34
export * from './errors'

packages/lib/src/pubsub.ts

+16-2
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,14 @@ export interface Message<T> {
99
}
1010

1111
export class PubSub<T = unknown> {
12+
public subscribed: string[] = []
13+
1214
constructor(public ipfs: IPFS, public namespace: string) { }
1315

16+
public create<T = unknown>(namespace: string): PubSub<T> {
17+
return new PubSub<T>(this.ipfs, `${this.namespace}/${namespace}`)
18+
}
19+
1420
private getTopic(topic: string): string {
1521
return `${this.namespace}.${topic}`
1622
}
@@ -27,14 +33,14 @@ export class PubSub<T = unknown> {
2733
const topic = await this.ipfs.pubsub.ls()
2834

2935
return topic
36+
.map((topic) => topic.replace(`${this.namespace}.`, ''))
3037
.filter((value) => {
31-
if (ignoreInternals && value.includes('$')) {
38+
if (ignoreInternals && value.startsWith('$$')) {
3239
return false
3340
}
3441

3542
return true
3643
})
37-
.map((topic) => topic.replace(`${this.namespace}.`, ''))
3844
}
3945

4046
public async peers(topic: string): Promise<number> {
@@ -44,6 +50,8 @@ export class PubSub<T = unknown> {
4450
}
4551

4652
public async subscribe(topic: string, listener: (msg: Message<T>) => void): Promise<void> {
53+
this.subscribed.push(topic)
54+
4755
await this.ipfs.pubsub.subscribe(this.getTopic(topic), (message) => {
4856
listener({
4957
from: message.from,
@@ -87,6 +95,12 @@ export class PubSub<T = unknown> {
8795
}
8896

8997
public async unsubscribe(topic: string): Promise<void> {
98+
delete this.subscribed[this.subscribed.indexOf(topic)]
99+
90100
await this.ipfs.pubsub.unsubscribe(this.getTopic(topic))
91101
}
102+
103+
public async stop() {
104+
await Promise.all(this.subscribed.map(this.unsubscribe))
105+
}
92106
}

packages/lib/src/shard.ts

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import { CID } from 'multiformats/cid'
2+
import { Buffer } from 'buffer'
3+
import { UnknownShardKindError } from './errors'
4+
import { EventEmitter } from 'events'
5+
import { Stack } from './stack'
6+
import { InvalidShardPathError } from '.'
7+
8+
export enum ShardKind {
9+
// eslint-disable-next-line no-unused-vars
10+
Binary = 'binary',
11+
// eslint-disable-next-line no-unused-vars
12+
Object = 'object'
13+
}
14+
15+
export class Shard<TData = Buffer | Record<string, any>> {
16+
private events = new EventEmitter()
17+
18+
private constructor(public stack: Stack, public cid: CID, public kind: ShardKind, public data: TData) { }
19+
20+
public async put(data: TData): Promise<void> {
21+
switch (this.kind) {
22+
case ShardKind.Binary:
23+
this.cid = await this.stack.ipfs.block.put(new Uint8Array(data as unknown as Buffer))
24+
break
25+
case ShardKind.Object:
26+
this.cid = await this.stack.ipfs.dag.put(data)
27+
break
28+
default:
29+
throw new UnknownShardKindError()
30+
}
31+
32+
this.events.emit('update', this)
33+
}
34+
35+
public on(event: 'update', listener: (shard: Shard<TData>) => void): void {
36+
this.events.on(event, listener)
37+
}
38+
39+
public static async new<TData = unknown>(stack: Stack, kind: ShardKind, data: TData): Promise<Shard<TData>> {
40+
const cid: CID = await stack.ipfs.dag.put({ empty: true })
41+
42+
const shard = new Shard<TData>(stack, cid, kind, data)
43+
shard.put(data)
44+
45+
return shard
46+
}
47+
48+
public static async create<TData = Buffer | Record<string, any>>(stack: Stack, cid: CID, kind: ShardKind): Promise<Shard<TData>> {
49+
let data
50+
51+
switch (kind) {
52+
case ShardKind.Binary:
53+
data = Buffer.from(await stack.ipfs.block.get(cid))
54+
break
55+
case ShardKind.Object:
56+
data = (await stack.ipfs.dag.get(cid)).value
57+
break
58+
default:
59+
throw new UnknownShardKindError()
60+
}
61+
62+
return new Shard(stack, cid, kind, data)
63+
}
64+
65+
public static from<TData = Buffer | Record<string, any>>(stack: Stack, path: string): Promise<Shard<TData>> {
66+
// eslint-disable-next-line no-unused-vars
67+
const [_, shard, cid, kind] = path.split('/')
68+
69+
if (shard !== 'shard') {
70+
throw new InvalidShardPathError()
71+
}
72+
73+
try {
74+
return Shard.create(stack, CID.parse(cid), kind as ShardKind)
75+
} catch (error) {
76+
throw new InvalidShardPathError(error)
77+
}
78+
}
79+
80+
public toString(): string {
81+
return `/shard/${this.cid.toString()}/${this.kind}`
82+
}
83+
}

packages/lib/src/stack.ts

+52-18
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,67 @@
1-
import type { IPFS } from 'ipfs-core'
1+
import { IPFS } from 'ipfs-core-types'
22
import type { libp2p as Libp2p } from 'ipfs-core/src/components/network'
3+
import { CID } from 'multiformats/cid'
4+
import { Store } from '.'
35
import { PubSub } from './pubsub'
4-
import { Store } from './store'
56

67
export interface Peer {
78
id: string
89
address: string
910
}
1011

11-
export class Stack<TMessage = any> {
12+
export interface PeerAnnouncement {
13+
kind: 'announcement'
14+
peer: Peer
15+
}
16+
17+
export type StackPubSubMessage = PeerAnnouncement
18+
19+
export class Stack {
20+
public pubsub: PubSub<StackPubSubMessage>
1221
public store: Store
13-
public pubsub: PubSub<TMessage>
1422

15-
private constructor(public namespace: string, public ipfs: IPFS, public id: string) {
16-
this.pubsub = new PubSub(ipfs, namespace)
17-
this.store = new Store(ipfs, namespace, this.pubsub)
23+
private announceInterval?: ReturnType<typeof setTimeout>
24+
public announce = false
25+
26+
constructor(public namespace: CID, public ipfs: IPFS) {
27+
this.pubsub = new PubSub(ipfs, namespace.toString())
28+
this.store = new Store(this)
29+
}
30+
31+
/**
32+
* get this peer info
33+
*/
34+
public async id(): Promise<Peer> {
35+
const result = await this.ipfs.id()
36+
37+
return {
38+
id: result.id,
39+
address: result.addresses[0].toString()
40+
}
1841
}
1942

2043
private get libp2p() {
2144
return (this.ipfs as any).libp2p as Libp2p
2245
}
2346

47+
public async start(): Promise<void> {
48+
this.announceInterval = setInterval(async () => {
49+
if (!this.announce) return
50+
await this.pubsub.publish('announce', {
51+
kind: 'announcement',
52+
peer: await this.id()
53+
})
54+
}, 250)
55+
56+
await this.store.start()
57+
}
58+
59+
public async stop(): Promise<void> {
60+
if (this.announceInterval) clearInterval(this.announceInterval)
61+
await this.store.stop()
62+
await this.pubsub.stop()
63+
}
64+
2465
/**
2566
* Create stack
2667
*
@@ -29,10 +70,11 @@ export class Stack<TMessage = any> {
2970
* @returns Stack instance
3071
*/
3172
public static async create(namespace: string, ipfs: IPFS) {
32-
const { id } = await ipfs.id()
33-
const stack = new Stack(namespace, ipfs, id)
73+
const cid = await ipfs.dag.put({ namespace })
74+
75+
const stack = new Stack(cid, ipfs)
76+
await stack.start()
3477

35-
await stack.store.start()
3678
return stack
3779
}
3880

@@ -88,12 +130,4 @@ export class Stack<TMessage = any> {
88130
})
89131
})
90132
}
91-
92-
/**
93-
* Start logging debug events
94-
*/
95-
public debug(): void {
96-
this.onPeerConnect((peer) => console.log('New peer connected', peer.id, peer.address))
97-
this.onPeerDisconnected((peer) => console.log('Peer disconnected', peer.id, peer.address))
98-
}
99133
}

0 commit comments

Comments
 (0)