Skip to content

feat(DHT): naive ip checking on wss connection #3033

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions packages/autocertifier-server/src/StreamrChallenger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
createOutgoingHandshaker
} from '@streamr/dht'
import { toProtoRpcClient } from '@streamr/proto-rpc'
import { Logger } from '@streamr/utils'
import { ipv4ToNumber, Logger } from '@streamr/utils'
import { FailedToConnectToStreamrWebSocket, AutoCertifierRpcClient, SERVICE_ID } from '@streamr/autocertifier-client'

const logger = new Logger(module)
Expand All @@ -22,6 +22,7 @@ const logger = new Logger(module)
const LOCAL_PEER_DESCRIPTOR: PeerDescriptor = {
nodeId: toDhtAddressRaw(randomDhtAddress()),
type: NodeType.NODEJS,
ipAddress: ipv4ToNumber('127.0.0.1') // ????
}

// TODO: use async/await
Expand All @@ -38,7 +39,8 @@ export const runStreamrChallenge = (
host: streamrWebSocketIp,
port: parseInt(streamrWebSocketPort),
tls: true
}
},
ipAddress: ipv4ToNumber(streamrWebSocketIp)
}
const socket = new WebsocketClientConnection()
const address = 'wss://' + remotePeerDescriptor.websocket!.host + ':' +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
toDhtAddressRaw
} from '@streamr/dht'
import path from 'path'
import { MetricsContext, until } from '@streamr/utils'
import { ipv4ToNumber, MetricsContext, until } from '@streamr/utils'

describe('StreamrChallenger', () => {

Expand All @@ -28,7 +28,8 @@ describe('StreamrChallenger', () => {
host: '127.0.0.1',
port: 12323,
tls: false
}
},
ipAddress: ipv4ToNumber('127.0.0.1')
}
const sessionId = 'sessionId'
const rpcMethod = async (): Promise<HasSessionResponse> => {
Expand Down
3 changes: 2 additions & 1 deletion packages/dht/protos/DhtRpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ message PeerDescriptor {
ConnectivityMethod tcp = 4;
ConnectivityMethod websocket = 5;
optional uint32 region = 6;
optional uint32 ipAddress = 7;
uint32 ipAddress = 7;
optional bytes publicKey = 8;
// signature of fields 2-8
optional bytes signature = 9;
Expand Down Expand Up @@ -238,6 +238,7 @@ enum HandshakeError {
DUPLICATE_CONNECTION = 0;
INVALID_TARGET_PEER_DESCRIPTOR = 1;
UNSUPPORTED_PROTOCOL_VERSION = 2;
INVALID_IP_ADDRESS = 3;
}

// Wraps all messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import { ConnectivityResponse, HandshakeError, PeerDescriptor } from '../../../generated/packages/dht/protos/DhtRpc'
import { NatType, PortRange, TlsCertificate } from '../ConnectionManager'
import { ITransport } from '../../transport/ITransport'
import { ipv4ToNumber, Logger, wait } from '@streamr/utils'
import { ipv4ToNumber, Logger, numberToIpv4, wait } from '@streamr/utils'
import { attachConnectivityRequestHandler, DISABLE_CONNECTIVITY_PROBE } from '../connectivityRequestHandler'
import { WebsocketServerConnection } from './WebsocketServerConnection'
import { ConnectionType, IConnection } from '../IConnection'
Expand Down Expand Up @@ -138,6 +138,9 @@
} else if (targetPeerDescriptor && !areEqualPeerDescriptors(this.localPeerDescriptor!, targetPeerDescriptor)) {
rejectHandshake(pendingConnection, websocketServerConnection, handshaker, HandshakeError.INVALID_TARGET_PEER_DESCRIPTOR)
delFunc()
} else if (numberToIpv4(remotePeerDescriptor.ipAddress) !== (websocketServerConnection as WebsocketServerConnection).getRemoteIpAddress()) {

Check failure on line 141 in packages/dht/src/connection/websocket/WebsocketServerConnector.ts

View workflow job for this annotation

GitHub Actions / lint

This line has a length of 152. Maximum allowed is 150
rejectHandshake(pendingConnection, websocketServerConnection, handshaker, HandshakeError.INVALID_IP_ADDRESS)
delFunc()
} else {
acceptHandshake(handshaker, pendingConnection, websocketServerConnection)
}
Expand All @@ -148,6 +151,8 @@
rejectHandshake(pendingConnection, websocketServerConnection, handshaker, HandshakeError.UNSUPPORTED_PROTOCOL_VERSION)
} else if (targetPeerDescriptor && !areEqualPeerDescriptors(this.localPeerDescriptor!, targetPeerDescriptor)) {
rejectHandshake(pendingConnection, websocketServerConnection, handshaker, HandshakeError.INVALID_TARGET_PEER_DESCRIPTOR)
} else if (numberToIpv4(remotePeerDescriptor.ipAddress) !== (websocketServerConnection as WebsocketServerConnection).getRemoteIpAddress()) {

Check failure on line 154 in packages/dht/src/connection/websocket/WebsocketServerConnector.ts

View workflow job for this annotation

GitHub Actions / lint

This line has a length of 152. Maximum allowed is 150
rejectHandshake(pendingConnection, websocketServerConnection, handshaker, HandshakeError.INVALID_IP_ADDRESS)
} else if (this.options.onNewConnection(pendingConnection)) {
acceptHandshake(handshaker, pendingConnection, websocketServerConnection)
} else {
Expand Down
5 changes: 3 additions & 2 deletions packages/dht/test/integration/ConnectionManager.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Logger, MetricsContext, until, waitForEvent3 } from '@streamr/utils'
import { ipv4ToNumber, Logger, MetricsContext, until, waitForEvent3 } from '@streamr/utils'
import { MarkOptional } from 'ts-essentials'
import { ConnectionManager } from '../../src/connection/ConnectionManager'
import { DefaultConnectorFacade, DefaultConnectorFacadeOptions } from '../../src/connection/ConnectorFacade'
Expand Down Expand Up @@ -386,7 +386,8 @@ describe('ConnectionManager', () => {
// This is not the correct nodeId of peerDescriptor2
nodeId: toDhtAddressRaw(randomDhtAddress()),
type: NodeType.NODEJS,
websocket: peerDescriptor2.websocket
websocket: peerDescriptor2.websocket,
ipAddress: ipv4ToNumber('127.0.0.1')
},
body: {
oneofKind: 'rpcMessage',
Expand Down
7 changes: 2 additions & 5 deletions packages/dht/test/integration/RouteMessage.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { DhtNode, Events as DhtNodeEvents } from '../../src/dht/DhtNode'
import { Message, NodeType, PeerDescriptor, RouteMessageWrapper } from '../../generated/packages/dht/protos/DhtRpc'

Check failure on line 2 in packages/dht/test/integration/RouteMessage.test.ts

View workflow job for this annotation

GitHub Actions / lint

'NodeType' is defined but never used. Allowed unused vars must match /^_/u
import { RpcMessage } from '../../generated/packages/proto-rpc/protos/ProtoRpc'
import { Logger, runAndWaitForEvents3, until } from '@streamr/utils'
import { createMockConnectionDhtNode, createWrappedClosestPeersRequest } from '../utils/utils'
import { createMockConnectionDhtNode, createMockPeerDescriptor, createWrappedClosestPeersRequest } from '../utils/utils'

Check failure on line 5 in packages/dht/test/integration/RouteMessage.test.ts

View workflow job for this annotation

GitHub Actions / lint

'createMockPeerDescriptor' is defined but never used. Allowed unused vars must match /^_/u
import { Simulator } from '../../src/connection/simulator/Simulator'
import { v4 } from 'uuid'
import { Any } from '../../generated/google/protobuf/any'
import { RoutingMode } from '../../src/dht/routing/RoutingSession'
import { DhtAddress, randomDhtAddress, toDhtAddressRaw } from '../../src/identifiers'

Check failure on line 10 in packages/dht/test/integration/RouteMessage.test.ts

View workflow job for this annotation

GitHub Actions / lint

'toDhtAddressRaw' is defined but never used. Allowed unused vars must match /^_/u

const logger = new Logger(module)

Expand All @@ -27,10 +27,7 @@
simulator = new Simulator()
entryPoint = await createMockConnectionDhtNode(simulator, randomDhtAddress())

entryPointDescriptor = {
nodeId: toDhtAddressRaw(entryPoint.getNodeId()),
type: NodeType.NODEJS
}
entryPointDescriptor = entryPoint.getLocalPeerDescriptor()

sourceNode = await createMockConnectionDhtNode(simulator, randomDhtAddress())
destinationNode = await createMockConnectionDhtNode(simulator, randomDhtAddress())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { ITransport } from '../../src/transport/ITransport'
import * as Err from '../../src/helpers/errors'
import { SimulatorTransport } from '../../src/connection/simulator/SimulatorTransport'
import { DefaultConnectorFacade } from '../../src/connection/ConnectorFacade'
import { MetricsContext } from '@streamr/utils'
import { ipv4ToNumber, MetricsContext } from '@streamr/utils'
import { createMockPeerDescriptor } from '../utils/utils'

const createConnectionManager = (localPeerDescriptor: PeerDescriptor, transport: ITransport) => {
Expand Down Expand Up @@ -181,6 +181,7 @@ describe('WebRTC Connection Management', () => {
msg.targetDescriptor = {
nodeId: new Uint8Array([0, 0, 0, 0, 0]),
type: NodeType.NODEJS,
ipAddress: ipv4ToNumber('127.0.0.1')
}

await Promise.allSettled([
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { MetricsContext, until, waitForEvent3 } from '@streamr/utils'
import { ipv4ToNumber, MetricsContext, until, waitForEvent3 } from '@streamr/utils'

Check failure on line 1 in packages/dht/test/integration/WebsocketConnectionManagement.test.ts

View workflow job for this annotation

GitHub Actions / lint

'ipv4ToNumber' is defined but never used. Allowed unused vars must match /^_/u
import { ConnectionManager } from '../../src/connection/ConnectionManager'
import { DefaultConnectorFacade, DefaultConnectorFacadeOptions } from '../../src/connection/ConnectorFacade'
import { Simulator } from '../../src/connection/simulator/Simulator'
Expand All @@ -8,6 +8,7 @@
import { RpcMessage } from '../../generated/packages/proto-rpc/protos/ProtoRpc'
import { TransportEvents } from '../../src/transport/ITransport'
import { toNodeId } from '../../src/identifiers'
import { createMockPeerDescriptor } from '../utils/utils'

const SERVICE_ID = 'test'

Expand All @@ -28,23 +29,15 @@
let noWsServerManager: ConnectionManager
let biggerNoWsServerManager: ConnectionManager
const simulator = new Simulator()
const wsServerConnectorPeerDescriptor: PeerDescriptor = {
nodeId: new Uint8Array([2]),
type: NodeType.NODEJS,
const wsServerConnectorPeerDescriptor = createMockPeerDescriptor({
websocket: {
host: '127.0.0.1',
port: 12223,
tls: false
}
}
const noWsServerConnectorPeerDescriptor: PeerDescriptor = {
nodeId: new Uint8Array([1]),
type: NodeType.NODEJS,
}
const biggerNoWsServerConnectorPeerDescriptor: PeerDescriptor = {
nodeId: new Uint8Array([3]),
type: NodeType.NODEJS,
}
})
const noWsServerConnectorPeerDescriptor = createMockPeerDescriptor()
const biggerNoWsServerConnectorPeerDescriptor = createMockPeerDescriptor()

let connectorTransport1: SimulatorTransport
let connectorTransport2: SimulatorTransport
Expand Down Expand Up @@ -134,10 +127,7 @@
rpcMessage: RpcMessage.create()
},
messageId: 'mockerer',
targetDescriptor: {
nodeId: new Uint8Array([1, 2, 4]),
type: NodeType.NODEJS
}
targetDescriptor: createMockPeerDescriptor()
}

await Promise.allSettled([
Expand Down
5 changes: 3 additions & 2 deletions packages/dht/test/unit/DiscoverySession.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Multimap, wait } from '@streamr/utils'
import { ipv4ToNumber, Multimap, wait } from '@streamr/utils'
import sampleSize from 'lodash/sampleSize'
import { DhtNodeRpcRemote } from '../../src/dht/DhtNodeRpcRemote'
import { PeerManager, getDistance } from '../../src/dht/PeerManager'
Expand All @@ -17,7 +17,8 @@ const QUERY_BATCH_SIZE = 5 // the default value in DhtNode's options, not relev
const createPeerDescriptor = (nodeId: DhtAddress): PeerDescriptor => {
return {
nodeId: toDhtAddressRaw(nodeId),
type: NodeType.NODEJS
type: NodeType.NODEJS,
ipAddress: ipv4ToNumber('127.0.0.1')
}
}

Expand Down
4 changes: 2 additions & 2 deletions packages/dht/test/unit/PeerManager.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { until } from '@streamr/utils'
import { ipv4ToNumber, until } from '@streamr/utils'
import range from 'lodash/range'
import sample from 'lodash/sample'
import sampleSize from 'lodash/sampleSize'
Expand Down Expand Up @@ -36,7 +36,7 @@ const createPeerManager = (
createDhtNodeRpcRemote: (peerDescriptor: PeerDescriptor) => createDhtNodeRpcRemote(peerDescriptor, localPeerDescriptor, pingFailures),
hasConnection: () => false
} as any)
const contacts = nodeIds.map((n) => ({ nodeId: toDhtAddressRaw(n), type: NodeType.NODEJS }))
const contacts = nodeIds.map((n) => ({ nodeId: toDhtAddressRaw(n), type: NodeType.NODEJS, ipAddress: ipv4ToNumber('127.0.0.1') }))
for (const contact of contacts) {
manager.addContact(contact)
}
Expand Down
11 changes: 7 additions & 4 deletions packages/dht/test/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ import { v4 } from 'uuid'
import { getRandomRegion } from '../../src/connection/simulator/pings'
import { Empty } from '../../generated/google/protobuf/empty'
import { Any } from '../../generated/google/protobuf/any'
import { wait, until } from '@streamr/utils'
import { wait, until, ipv4ToNumber } from '@streamr/utils'
import { SimulatorTransport } from '../../src/connection/simulator/SimulatorTransport'
import { DhtAddress, randomDhtAddress, toDhtAddressRaw } from '../../src/identifiers'

export const createMockPeerDescriptor = (opts?: Partial<Omit<PeerDescriptor, 'nodeId'>>): PeerDescriptor => {
return {
nodeId: toDhtAddressRaw(randomDhtAddress()),
type: NodeType.NODEJS,
ipAddress: ipv4ToNumber('127.0.0.1'),
...opts
}
}
Expand All @@ -48,8 +49,8 @@ export const createMockRingNode = async (
const peerDescriptor: PeerDescriptor = {
nodeId: toDhtAddressRaw(nodeId ?? randomDhtAddress()),
type: NodeType.NODEJS,
region
//ipAddress: ipv4ToNumber(ipAddress)
region,
ipAddress: ipv4ToNumber('127.0.0.1')
}
const mockConnectionManager = new SimulatorTransport(peerDescriptor, simulator)
await mockConnectionManager.start()
Expand Down Expand Up @@ -82,7 +83,8 @@ export const createMockConnectionDhtNode = async (
const peerDescriptor: PeerDescriptor = {
nodeId: toDhtAddressRaw(nodeId ?? randomDhtAddress()),
type: NodeType.NODEJS,
region: getRandomRegion()
region: getRandomRegion(),
ipAddress: ipv4ToNumber('127.0.0.1')
}
const mockConnectionManager = new SimulatorTransport(peerDescriptor, simulator)
await mockConnectionManager.start()
Expand Down Expand Up @@ -114,6 +116,7 @@ export const createMockConnectionLayer1Node = async (
const descriptor: PeerDescriptor = {
nodeId: layer0Node.getLocalPeerDescriptor().nodeId,
type: NodeType.NODEJS,
ipAddress: ipv4ToNumber('127.0.0.1')
}
const node = new DhtNode({
peerDescriptor: descriptor,
Expand Down
4 changes: 3 additions & 1 deletion packages/node/bin/entry-point.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#!/usr/bin/env node
import { config as CHAIN_CONFIG } from '@streamr/config'
import { DhtAddress, DhtNode, NodeType, toDhtAddressRaw } from '@streamr/dht'
import { ipv4ToNumber } from '@streamr/utils'

const main = async () => {
const entryPoint = CHAIN_CONFIG.dev2.entryPoints[0]
const peerDescriptor = {
...entryPoint,
nodeId: toDhtAddressRaw(entryPoint.nodeId as DhtAddress),
type: NodeType.NODEJS // TODO remove this when NET-1070 done
type: NodeType.NODEJS, // TODO remove this when NET-1070 done
ipAddress: ipv4ToNumber(entryPoint.websocket.host)
}
const dhtNode = new DhtNode({
nodeId: entryPoint.nodeId as DhtAddress,
Expand Down
3 changes: 2 additions & 1 deletion packages/sdk/src/utils/utils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { DhtAddress, NodeType, PeerDescriptor, toDhtAddress, toDhtAddressRaw } from '@streamr/dht'
import {
LengthPrefixedFrameDecoder,
Logger, StreamID, TheGraphClient, composeAbortSignals, merge,
Logger, StreamID, TheGraphClient, composeAbortSignals, ipv4ToNumber, merge,
randomString, toEthereumAddress, toStreamID
} from '@streamr/utils'
import { ContractTransactionReceipt } from 'ethers'
Expand Down Expand Up @@ -130,6 +130,7 @@ export function peerDescriptorTranslator(json: NetworkPeerDescriptor): PeerDescr
...json,
nodeId: toDhtAddressRaw((json.nodeId ?? (json as any).id) as DhtAddress),
type,
ipAddress: ipv4ToNumber('127.0.0.1'), //????
websocket: json.websocket
}
if ((peerDescriptor as any).id !== undefined) {
Expand Down
5 changes: 3 additions & 2 deletions packages/sdk/test/end-to-end/publish-subscribe.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { config as CHAIN_CONFIG } from '@streamr/config'
import { DhtAddress, NodeType, toDhtAddressRaw } from '@streamr/dht'
import { createTestPrivateKey, createTestWallet } from '@streamr/test-utils'
import { createNetworkNode } from '@streamr/trackerless-network'
import { StreamID, toStreamPartID, until } from '@streamr/utils'
import { ipv4ToNumber, StreamID, toStreamPartID, until } from '@streamr/utils'
import { Wallet } from 'ethers'
import { Stream } from '../../src/Stream'
import { StreamrClient } from '../../src/StreamrClient'
Expand All @@ -18,7 +18,8 @@ async function startNetworkNodeAndListenForAtLeastOneMessage(streamId: StreamID)
const entryPoints = CHAIN_CONFIG.dev2.entryPoints.map((entryPoint) => ({
...entryPoint,
nodeId: toDhtAddressRaw(entryPoint.nodeId as DhtAddress),
type: NodeType.NODEJS
type: NodeType.NODEJS,
ipAddress: ipv4ToNumber('127.0.0.1')
}))
const networkNode = createNetworkNode({
layer0: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,16 @@ import {
StreamMessage
} from '../../generated/packages/trackerless-network/protos/NetworkRpc'
import { ContentDeliveryRpcClient } from '../../generated/packages/trackerless-network/protos/NetworkRpc.client'
import { createStreamMessage } from '../utils/utils'
import { createMockPeerDescriptor, createStreamMessage } from '../utils/utils'
import { randomUserId } from '@streamr/test-utils'

describe('ContentDeliveryRpcRemote', () => {
let mockServerRpc: ListeningRpcCommunicator
let clientRpc: ListeningRpcCommunicator
let rpcRemote: ContentDeliveryRpcRemote

const clientNode: PeerDescriptor = {
nodeId: new Uint8Array([1, 1, 1]),
type: NodeType.NODEJS
}
const serverNode: PeerDescriptor = {
nodeId: new Uint8Array([2, 2, 2]),
type: NodeType.NODEJS
}
const clientNode = createMockPeerDescriptor()
const serverNode = createMockPeerDescriptor()

let recvCounter: number

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,15 @@ import {
import {
HandshakeRpcClient,
} from '../../generated/packages/trackerless-network/protos/NetworkRpc.client'
import { createMockPeerDescriptor } from '../utils/utils'

describe('HandshakeRpcRemote', () => {
let mockServerRpc: ListeningRpcCommunicator
let clientRpc: ListeningRpcCommunicator
let rpcRemote: HandshakeRpcRemote

const clientNode: PeerDescriptor = {
nodeId: new Uint8Array([1, 1, 1]),
type: NodeType.NODEJS
}
const serverNode: PeerDescriptor = {
nodeId: new Uint8Array([2, 2, 2]),
type: NodeType.NODEJS
}
const clientNode = createMockPeerDescriptor()
const serverNode = createMockPeerDescriptor()

let simulator: Simulator
let mockConnectionManager1: SimulatorTransport
Expand Down
Loading
Loading