Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add deno port #8

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
578 changes: 578 additions & 0 deletions deno/README.md

Large diffs are not rendered by default.

35 changes: 35 additions & 0 deletions deno/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"name": "flatend",
"description": "Production-ready microservice mesh networks with just a few lines of code.",
"author": "Kenta Iwasaki",
"license": "MIT",
"version": "0.0.8",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"prepare": "yarn tsc",
"test": "TS_NODE_PROJECT=tests/tsconfig.json mocha -r ts-node/register tests/**/*.spec.ts"
},
"dependencies": {
"blake2b": "^2.1.3",
"debug": "^4.1.1",
"ipaddr.js": "^1.9.1",
"object-hash": "^2.0.3",
"tweetnacl": "^1.0.3"
},
"devDependencies": {
"@types/chai": "^4.2.11",
"@types/debug": "^4.1.5",
"@types/ip": "^1.1.0",
"@types/mocha": "^7.0.2",
"@types/node": "^14.0.13",
"@types/object-hash": "^1.3.3",
"chai": "^4.2.0",
"chai-bytes": "^0.1.2",
"core-js": "^3.6.5",
"mocha": "^8.0.1",
"prettier": "^2.0.5",
"ts-node": "^8.10.2",
"typescript": "^3.9.5"
}
}
141 changes: 141 additions & 0 deletions deno/src/context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import { Buffer } from "https://deno.land/std/node/buffer.ts";
import { Duplex, finished } from "./std-node-stream.ts";
import { Stream, STREAM_CHUNK_SIZE } from "./stream.ts";
import { ID } from "./kademlia.ts";
import * as util from 'https://deno.land/std/node/util.ts'
import { DataPacket, Opcode, ServiceResponsePacket } from "./packet.ts";
import { Provider } from "./provider.ts";
import { chunkBuffer } from "./node.ts";

export type BufferEncoding = string

export type Handler = (ctx: Context) => void;

export class Context extends Duplex {
_provider: Provider;
_stream: Stream;
_headersWritten = false;
_headers: { [key: string]: string } = {};

headers: { [key: string]: string };

get id(): ID {
return this._provider.id!;
}

constructor(
provider: Provider,
stream: Stream,
headers: { [key: string]: string }
) {
super();

this._provider = provider;
this._stream = stream;
this.headers = headers;

// pipe stream body to context

setTimeout(async () => {
for await (const frame of this._stream.body) {
this.push(frame);
}
this.push(null);
});

// write stream eof when stream writable is closed

setTimeout(async () => {
await util.promisify(finished)(this, { readable: false });

await this._writeHeader();

const payload = new DataPacket(this._stream.id, Buffer.from([])).encode();
await this._provider.write(
this._provider.rpc.message(
0,
Buffer.concat([Buffer.of(Opcode.Data), payload])
)
);
});
}

header(key: string, val: string): Context {
this._headers[key] = val;
return this;
}

send(data: string | Buffer | Uint8Array) {
this.write(data);
if (!this.writableEnded) this.end();
}

json(data: any) {
this.header("content-type", "application/json");
this.send(JSON.stringify(data));
}

_read(size: number) {
this._stream.body._read(size);
}

async body(opts?: { limit?: number }): Promise<Buffer> {
const limit = opts?.limit ?? 2 ** 16;

let buf = Buffer.from([]);
for await (const chunk of this) {
buf = Buffer.concat([buf, chunk]);
if (buf.byteLength > limit) {
throw new Error(
`Exceeded max allowed body size limit of ${limit} byte(s).`
);
}
}

return buf;
}

async _writeHeader() {
if (!this._headersWritten) {
this._headersWritten = true;

const payload = new ServiceResponsePacket(
this._stream.id,
true,
this._headers
).encode();
await this._provider.write(
this._provider.rpc.message(
0,
Buffer.concat([Buffer.of(Opcode.ServiceResponse), payload])
)
);
}
}

_write(
chunk: any,
encoding: BufferEncoding,
callback: (error?: Error | null) => void
) {
const write = async () => {
await this._writeHeader();

const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);

for (const chunk of chunkBuffer(buf, STREAM_CHUNK_SIZE)) {
const payload = new DataPacket(this._stream.id, chunk).encode();
await this._provider.write(
this._provider.rpc.message(
0,
Buffer.concat([Buffer.of(Opcode.Data), payload])
)
);
}
};

write()
.then(() => callback())
.catch((error) => callback(error));
}
}
192 changes: 192 additions & 0 deletions deno/src/kademlia.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import { Buffer } from "https://deno.land/std/node/buffer.ts";
import * as nacl from "https://deno.land/x/tweetnacl_deno/src/nacl.ts";
import ipaddr from "https://jspm.dev/ipaddr.js";
import { assert } from "https://deno.land/std/testing/asserts.ts";

function BufferCompare(a: Buffer | Uint8Array, b: Buffer|Uint8Array) {
//if (typeof a.compare === 'function') return a.compare(b)
if (a === b) return 0

var x = a.length
var y = b.length

var i = 0
var len = Math.min(x, y)
while (i < len) {
if (a[i] !== b[i]) break

++i
}

if (i !== len) {
x = a[i]
y = b[i]
}

if (x < y) return -1
if (y < x) return 1
return 0
}

type IPv4 = ipaddr.IPv4;
type IPv6 = ipaddr.IPv6;

export enum UpdateResult {
New,
Ok,
Full,
Fail,
}

const leadingZeros = (buf: Uint8Array): number => {
const i = buf.findIndex((b) => b != 0);
if (i === -1) return buf.byteLength * 8;

let b = buf[i] >>> 0;
if (b === 0) return i * 8 + 8;
return i * 8 + ((7 - ((Math.log(b) / Math.LN2) | 0)) | 0);
};

const xor = (a: Uint8Array, b: Uint8Array): Uint8Array => {
const c = Buffer.alloc(Math.min(a.byteLength, b.byteLength));
for (let i = 0; i < c.byteLength; i++) c[i] = a[i] ^ b[i];
return c;
};

export class ID {
publicKey: Uint8Array = Buffer.alloc(nacl.SignLength.PublicKey);
host: IPv4 | IPv6;
port: number = 0;

constructor(publicKey: Uint8Array, host: IPv4 | IPv6, port: number) {
this.publicKey = publicKey;
this.host = host;
this.port = port;
}

get addr(): string {
let host = this.host;
if (host.kind() === "ipv6" && (<IPv6>host).isIPv4MappedAddress()) {
host = (<IPv6>host).toIPv4Address();
}
return host.toString() + ":" + this.port;
}

public encode(): Buffer {
let host = Buffer.of(...this.host.toByteArray());
host = Buffer.concat([Buffer.of(host.byteLength === 4 ? 0 : 1), host]);

const port = Buffer.alloc(2);
port.writeUInt16BE(this.port);

return Buffer.concat([this.publicKey, host, port]);
}

public static decode(buf: Buffer): [ID, Buffer] {
const publicKey = Uint8Array.from(buf.slice(0, nacl.SignLength.PublicKey));
buf = buf.slice(nacl.SignLength.PublicKey);

const hostHeader = buf.readUInt8();
buf = buf.slice(1);

assert(hostHeader === 0 || hostHeader === 1);

const hostLen = hostHeader === 0 ? 4 : 16;
const host = ipaddr.fromByteArray([...buf.slice(0, hostLen)]);
buf = buf.slice(hostLen);

const port = buf.readUInt16BE();
buf = buf.slice(2);

return [new ID(publicKey, host, port), buf];
}
}

export class Table {
buckets: Array<Array<ID>> = [
...Array(nacl.SignLength.PublicKey * 8),
].map(() => []);

pub: Uint8Array;
cap: number = 16;
length: number = 0;

public constructor(
pub: Uint8Array = Buffer.alloc(nacl.SignLength.PublicKey)
) {
this.pub = pub;
}

private bucketIndex(pub: Uint8Array): number {
if (BufferCompare(pub, this.pub) === 0) return 0;
return leadingZeros(xor(pub, this.pub));
}

public update(id: ID): UpdateResult {
if (BufferCompare(id.publicKey, this.pub) === 0) return UpdateResult.Fail;

const bucket = this.buckets[this.bucketIndex(id.publicKey)];

const i = bucket.findIndex(
(item) => BufferCompare(item.publicKey, id.publicKey) === 0
);
if (i >= 0) {
bucket.unshift(...bucket.splice(i, 1));
return UpdateResult.Ok;
}

if (bucket.length < this.cap) {
bucket.unshift(id);
this.length++;
return UpdateResult.New;
}
return UpdateResult.Full;
}

public delete(pub: Uint8Array): boolean {
const bucket = this.buckets[this.bucketIndex(pub)];
const i = bucket.findIndex((id) => BufferCompare(id.publicKey, pub) === 0);
if (i >= 0) {
bucket.splice(i, 1);
this.length--;
return true;
}
return false;
}

public has(pub: Uint8Array): boolean {
const bucket = this.buckets[this.bucketIndex(pub)];
return !!bucket.find((id) => BufferCompare(id.publicKey, pub) === 0);
}

public closestTo(pub: Uint8Array, k = this.cap): ID[] {
const closest: ID[] = [];

const fill = (i: number) => {
const bucket = this.buckets[i];
for (let i = 0; closest.length < k && i < bucket.length; i++) {
if (BufferCompare(bucket[i].publicKey, pub) != 0)
closest.push(bucket[i]);
}
};

const m = this.bucketIndex(pub);

fill(m);

for (
let i = 1;
closest.length < k && (m - i >= 0 || m + i < this.buckets.length);
i++
) {
if (m - i >= 0) fill(m - i);
if (m + i < this.buckets.length) fill(m + i);
}

closest.sort((a: ID, b: ID) =>
BufferCompare(xor(a.publicKey, pub), xor(b.publicKey, pub))
);

return closest.length > k ? closest.slice(0, k) : closest;
}
}
14 changes: 14 additions & 0 deletions deno/src/mod.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
export { Node, generateSecretKey } from "./node.ts";
export { Context } from "./context.ts";
export { ID, Table, UpdateResult } from "./kademlia.ts";
export { getAvailableAddress, splitHostPort } from "./net.ts";
export { Provider } from "./provider.ts";
export { x25519, serverHandshake, clientHandshake, Session } from "./session.ts";
export {
drain,
lengthPrefixed,
prefixLength,
RPC,
Stream,
Streams,
} from "./stream.ts";
Loading