Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 118 additions & 44 deletions source/qcommon/net.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,77 @@ static bool net_initialized = false;
static int numIP;
static uint8_t localIP[MAX_IPS][4];

#define STEAM_SLOT_MAX 256

struct steam_pkt_recieved_s {
struct steam_pkt_recieved_s *next;
uint32_t handle; // HSteamNetConnection (peer connection)
uint64_t steam_handle; // peer SteamID
size_t recvSize;
uint8_t buffered[];
};

struct steam_slot_s {
uint32_t handle;
struct steam_pkt_recieved_s *head;
struct steam_pkt_recieved_s *tail;
};

static struct steam_slot_s steam_slots[STEAM_SLOT_MAX];
static size_t num_steam_slots = 0;

static void __SDR_DisconnectMessages( void *self, struct steam_rpc_pkt_s *pkt )
{
struct p2p_disconnect_req_s *rpc = &pkt->p2p_disconnect;
for( size_t i = 0; i < num_steam_slots; i++ ) {
if( steam_slots[i].handle != rpc->handle)
continue;
struct steam_pkt_recieved_s *p = steam_slots[i].head;
while( p ) {
struct steam_pkt_recieved_s *next = p->next;
free( p );
p = next;
}
steam_slots[i] = steam_slots[--num_steam_slots];
return;
}
}

static void __SDR_OnRecvMessages( void *self, struct steam_evt_pkt_s *pkt )
{
struct recv_messages_evt_s *evt = &pkt->recv_messages;
struct steam_slot_s *slot = NULL; //__SDR_GetOrCreateSlot( evt->handle );
for( size_t i = 0; i < num_steam_slots; i++ ) {
if( steam_slots[i].handle == evt->handle ) {
slot = &steam_slots[i];
}
}
if( slot == NULL ) {
if( num_steam_slots >= STEAM_SLOT_MAX )
return;
slot = &steam_slots[num_steam_slots++];
slot->handle = evt->handle;
slot->head = NULL;
slot->tail = NULL;
}
size_t offset = 0;
for( int i = 0; i < evt->count; i++ ) {
size_t isz = (size_t)evt->messageinfo[i].count;
struct steam_pkt_recieved_s *p = malloc( sizeof( struct steam_pkt_recieved_s ) + isz );
p->next = NULL;
p->handle = evt->handle;
p->steam_handle = evt->steamID;
p->recvSize = isz;
memcpy( p->buffered, evt->buffer + offset, isz );
if( slot->tail )
slot->tail->next = p;
else
slot->head = p;
slot->tail = p;
offset += isz;
}
}

/*
=============================================================================
PRIVATE FUNCTIONS
Expand Down Expand Up @@ -339,48 +410,38 @@ static bool NET_SocketMakeNonBlocking( socket_handle_t handle )
return true;
}

struct SDR_GetPacket_Self {
msg_t* message;
netadr_t* addr;
int result;
};
static void NET_SDR_ConsumePacket(void* self, struct steam_rpc_pkt_s* rpc) {
struct SDR_GetPacket_Self* netRecieve = self;
msg_t* message = netRecieve->message;
netadr_t* address = netRecieve->addr;
struct recv_messages_recv_s *recv = &rpc->recv_messages_recv;
if(recv->count == 0) {
message->cursize = 0;
netRecieve->result = 0;
return;
}

int size = recv->messageinfo[0].count;
if (size > message->maxsize)
size = message->maxsize;
message->cursize = size;
static int NET_SDR_GetPacket( const socket_t *socket, netadr_t *address, msg_t *message )
{
assert( socket && socket->open && socket->type == SOCKET_SDR );

memcpy(message->data, recv->buffer, size);
NET_InitAddress(address, NA_SDR);
address->address.steamid = recv->steamID;
netRecieve->result = 1;
}
STEAMSHIM_dispatch();

static int NET_SDR_GetPacket( const socket_t *socket, netadr_t *address, msg_t *message ) {
assert(socket && socket->open && socket->type == SOCKET_SDR);
struct recv_messages_req_s req;
if(socket->server) {
req.cmd = RPC_SRV_P2P_RECV_MESSAGES;
} else {
req.cmd = RPC_P2P_RECV_MESSAGES;
struct steam_slot_s *slot = NULL;
for( size_t i = 0; i < num_steam_slots; i++ ) {
if( steam_slots[i].handle == socket->steam_handle ) {
slot = &steam_slots[i];
break;
}
}
req.handle = socket->steam_handle;
if( !slot || !slot->head )
return 0;

uint32_t syncIndex;
struct SDR_GetPacket_Self self = {message, address, -1};
STEAMSHIM_sendRPC(&req, sizeof req, &self, NET_SDR_ConsumePacket, &syncIndex);
STEAMSHIM_waitDispatchSync(syncIndex);
return self.result;
struct steam_pkt_recieved_s *p = slot->head;
slot->head = p->next;
if( !slot->head )
slot->tail = NULL;

size_t sz = p->recvSize;
if( sz > message->maxsize )
sz = message->maxsize;
memcpy( message->data, p->buffered, sz );
message->cursize = sz;
message->readcount = 0;
NET_InitAddress( address, NA_SDR );
address->address.steamid = p->steam_handle;

free( p );
return 1;
}

/*
Expand Down Expand Up @@ -576,13 +637,10 @@ static void NET_SDR_CloseSocket( socket_t *socket ) {
return;

struct p2p_disconnect_req_s req;
if(socket->server) {
req.cmd = RPC_SRV_P2P_DISCONNECT;
} else {
req.cmd = RPC_P2P_DISCONNECT;
}
req.cmd = socket->server ? RPC_SRV_P2P_DISCONNECT : RPC_P2P_DISCONNECT;
req.handle = socket->handle;
STEAMSHIM_sendRPC(&req, sizeof req, NULL, NULL, NULL);
STEAMSHIM_sendRPC( &req, sizeof req, NULL, __SDR_DisconnectMessages, NULL );

socket->open = false;
socket->handle = 0;
}
Expand Down Expand Up @@ -2094,6 +2152,10 @@ void NET_Init( void )

GetLocalAddress();

num_steam_slots = 0;
STEAMSHIM_subscribeEvent( EVT_P2P_RECV_MESSAGES, NULL, __SDR_OnRecvMessages );
STEAMSHIM_subscribeEvent( EVT_SRV_P2P_RECV_MESSAGES, NULL, __SDR_OnRecvMessages );

net_initialized = true;
}

Expand All @@ -2109,5 +2171,17 @@ void NET_Shutdown( void )

Sys_NET_Shutdown();

STEAMSHIM_unsubscribeEvent( EVT_P2P_RECV_MESSAGES, __SDR_OnRecvMessages );
STEAMSHIM_unsubscribeEvent( EVT_SRV_P2P_RECV_MESSAGES, __SDR_OnRecvMessages );
for( size_t i = 0; i < num_steam_slots; i++ ) {
struct steam_pkt_recieved_s *p = steam_slots[i].head;
while( p ) {
struct steam_pkt_recieved_s *next = p->next;
free( p );
p = next;
}
}
num_steam_slots = 0;

net_initialized = false;
}
118 changes: 76 additions & 42 deletions source/steamshim/src/child/child.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,47 @@ static ISteamUGC *GSteamUGC = NULL;
ServerBrowser *GServerBrowser = NULL;
static time_t time_since_last_pump = 0;

// Tracked SDR connections. Populated when SteamNetConnectionStatusChangedCallback_t
// reports Connected, removed on ClosedByPeer / ProblemDetectedLocally / None.
// Drained every loop iteration to push messages as events to the parent.
#define MAX_TRACKED_CONNS 256
static HSteamNetConnection g_serverConns[MAX_TRACKED_CONNS];
static int g_numServerConns = 0;
static HSteamNetConnection g_clientConns[MAX_TRACKED_CONNS];
static int g_numClientConns = 0;

static void handleSteamConnectionStatusChanged( steam_cmd_s cmd, SteamNetConnectionStatusChangedCallback_t *pCallback )
{
HSteamNetConnection *arr = ( cmd == EVT_SRV_P2P_CONNECTION_CHANGED ) ? g_serverConns : g_clientConns;
int *n = ( cmd == EVT_SRV_P2P_CONNECTION_CHANGED ) ? &g_numServerConns : &g_numClientConns;
const HSteamNetConnection h = pCallback->m_hConn;
switch( pCallback->m_info.m_eState ) {
case k_ESteamNetworkingConnectionState_Connected: {
bool found = false;
for( int i = 0; i < *n; i++ ) {
if( arr[i] == h ) {
found = true;
break;
}
}
if( !found && *n < MAX_TRACKED_CONNS )
arr[(*n)++] = h;
break;
}
case k_ESteamNetworkingConnectionState_ClosedByPeer:
case k_ESteamNetworkingConnectionState_ProblemDetectedLocally:
case k_ESteamNetworkingConnectionState_None:
for( int i = 0; i < *n; i++ ) {
if( arr[i] == h ) {
arr[i] = arr[--(*n)];
break;
}
}
break;
default:
break;
}

struct p2p_net_connection_changed_evt_s evt;
evt.cmd = cmd;
evt.hConn = pCallback->m_hConn;
Expand All @@ -77,37 +116,50 @@ static void handleSteamConnectionStatusChanged( steam_cmd_s cmd, SteamNetConnect
write_packet( GPipeWrite, &evt, sizeof( p2p_net_connection_changed_evt_s ) );
}

static void handleRecvMessageRPC( const steam_rpc_shim_common_s *req, SteamNetworkingMessage_t **msgs, int num_messages, uint64_t steamID, uint32_t handle )
static void _drainSocket( steam_cmd_s evtCmd, ISteamNetworkingSockets *sockets, HSteamNetConnection h )
{
if(num_messages == -1) {
recv_messages_recv_s recv;
recv.steamID = steamID;
recv.handle = handle;
recv.count = 0;
prepared_rpc_packet( req, &recv );
write_packet( GPipeWrite, &recv, sizeof( struct recv_messages_recv_s ));
printf("invalid handle for req: %d\n", req->cmd);
SteamNetworkingMessage_t *msgs[SDR_MAX_REQUESTED_PACKETS];
const int n = sockets->ReceiveMessagesOnConnection( h, msgs, SDR_MAX_REQUESTED_PACKETS );
if( n <= 0 )
return;
}

SteamNetConnectionInfo_t info;
sockets->GetConnectionInfo( h, &info );
const uint64_t steamid = info.m_identityRemote.GetSteamID().ConvertToUint64();

size_t total = 0;
for( int i = 0; i < num_messages; i++ ) {
for( int i = 0; i < n; i++ )
total += msgs[i]->GetSize();
}
auto recv = (struct recv_messages_recv_s *)malloc( sizeof( struct recv_messages_recv_s ) + total );
recv->steamID = steamID;
recv->handle = handle;
recv->count = num_messages;

auto evt = (struct recv_messages_evt_s *)malloc( sizeof( struct recv_messages_evt_s ) + total );
evt->cmd = evtCmd;
evt->steamID = steamid;
evt->handle = h;
evt->count = n;
size_t offset = 0;
for( int i = 0; i < n; i++ ) {
const size_t sz = msgs[i]->GetSize();
evt->messageinfo[i].count = (int)sz;
memcpy( evt->buffer + offset, msgs[i]->GetData(), sz );
offset += sz;
msgs[i]->Release();
}
write_packet( GPipeWrite, evt, sizeof( struct recv_messages_evt_s ) + total );
free( evt );
}

for( int i = 0; i < num_messages; i++ ) {
recv->messageinfo[i].count = msgs[i]->GetSize();
memcpy( recv->buffer + offset, msgs[i]->GetData(), msgs[i]->GetSize() );
offset += msgs[i]->GetSize();
static void drainAllConnections( void )
{
if( GRunServer ) {
ISteamNetworkingSockets *s = SteamGameServerNetworkingSockets();
for( int i = 0; i < g_numServerConns; i++ )
_drainSocket( EVT_SRV_P2P_RECV_MESSAGES, s, g_serverConns[i] );
}
if( GRunClient ) {
ISteamNetworkingSockets *s = SteamNetworkingSockets();
for( int i = 0; i < g_numClientConns; i++ )
_drainSocket( EVT_P2P_RECV_MESSAGES, s, g_clientConns[i] );
}
prepared_rpc_packet( req, recv );
write_packet( GPipeWrite, recv, sizeof( struct recv_messages_recv_s ) + total );
free( recv );
}

static void processEVT( steam_evt_pkt_s *req, size_t size )
Expand Down Expand Up @@ -331,25 +383,6 @@ static void processRPC( steam_rpc_pkt_s *req, size_t size )
write_packet( GPipeWrite, &recv, sizeof( steam_rpc_shim_common_s ) );
break;
}
case RPC_SRV_P2P_RECV_MESSAGES: {
SteamNetworkingMessage_t *msgs[32];
SteamNetConnectionInfo_t info;
SteamGameServerNetworkingSockets()->GetConnectionInfo( req->recv_messages.handle, &info );
const uint64_t steamid = info.m_identityRemote.GetSteamID().ConvertToUint64();
const int n = SteamGameServerNetworkingSockets()->ReceiveMessagesOnConnection( req->recv_messages.handle, msgs, 1 );
handleRecvMessageRPC( &req->common, msgs, n, steamid, req->recv_messages.handle );
break;
}
case RPC_P2P_RECV_MESSAGES: {
SteamNetworkingMessage_t *msgs[32];
SteamNetConnectionInfo_t info;
SteamNetworkingSockets()->GetConnectionInfo( req->recv_messages.handle, &info );
const uint64_t steamid = info.m_identityRemote.GetSteamID().ConvertToUint64();

const int n = SteamNetworkingSockets()->ReceiveMessagesOnConnection( req->recv_messages.handle, msgs, 1 );
handleRecvMessageRPC( &req->common, msgs, n, steamid, req->recv_messages.handle );
break;
}
case RPC_GETVOICE: {
uint32 size;

Expand Down Expand Up @@ -615,6 +648,7 @@ static void processCommands()

processSteamServerDispatch();
processSteamDispatch();
drainAllConnections();
continue_processing:

if( packet.size > STEAM_PACKED_RESERVE_SIZE - sizeof( uint32_t ) ) {
Expand Down
4 changes: 4 additions & 0 deletions source/steamshim/src/mod_steam.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ DECLARE_TYPEDEF_METHOD( int, STEAMSHIM_dispatch );
DECLARE_TYPEDEF_METHOD( int, STEAMSHIM_sendRPC, void *req, uint32_t size, void *self, STEAMSHIM_rpc_handle rpc, uint32_t *syncIndex );
DECLARE_TYPEDEF_METHOD( int, STEAMSHIM_sendEVT, void *packet, uint32_t size );
DECLARE_TYPEDEF_METHOD( int, STEAMSHIM_waitDispatchSync, uint32_t syncIndex ); // wait on the dispatch loop does not trigger steam callbacks
DECLARE_TYPEDEF_METHOD( uint32_t, STEAMSHIM_currentSync ); // sync index of the most recently processed RPC
DECLARE_TYPEDEF_METHOD( void, STEAMSHIM_subscribeEvent, uint32_t id, void *self, STEAMSHIM_evt_handle evt );
DECLARE_TYPEDEF_METHOD( void, STEAMSHIM_unsubscribeEvent, uint32_t id, STEAMSHIM_evt_handle evt );
DECLARE_TYPEDEF_METHOD( bool, STEAMSHIM_active );
Expand All @@ -30,6 +31,7 @@ struct steam_import_s {
STEAMSHIM_sendRPCFn STEAMSHIM_sendRPC;
STEAMSHIM_sendEVTFn STEAMSHIM_sendEVT;
STEAMSHIM_waitDispatchSyncFn STEAMSHIM_waitDispatchSync;
STEAMSHIM_currentSyncFn STEAMSHIM_currentSync;
STEAMSHIM_subscribeEventFn STEAMSHIM_subscribeEvent;
STEAMSHIM_unsubscribeEventFn STEAMSHIM_unsubscribeEvent;
STEAMSHIM_activeFn STEAMSHIM_active;
Expand All @@ -39,6 +41,7 @@ struct steam_import_s {
STEAMSHIM_sendRPC, \
STEAMSHIM_sendEVT, \
STEAMSHIM_waitDispatchSync, \
STEAMSHIM_currentSync, \
STEAMSHIM_subscribeEvent, \
STEAMSHIM_unsubscribeEvent, \
STEAMSHIM_active \
Expand All @@ -54,6 +57,7 @@ int STEAMSHIM_dispatch() { return steam_import.STEAMSHIM_dispatch();}
int STEAMSHIM_sendRPC( void *req, uint32_t size, void *self, STEAMSHIM_rpc_handle rpc, uint32_t *syncIndex ) { return steam_import.STEAMSHIM_sendRPC(req, size, self, rpc, syncIndex);}
int STEAMSHIM_sendEVT( void *packet, uint32_t size) { return steam_import.STEAMSHIM_sendEVT(packet, size);}
int STEAMSHIM_waitDispatchSync( uint32_t syncIndex ){ return steam_import.STEAMSHIM_waitDispatchSync(syncIndex);} // wait on the dispatch loop
uint32_t STEAMSHIM_currentSync(){ return steam_import.STEAMSHIM_currentSync();}
void STEAMSHIM_subscribeEvent( uint32_t id, void *self, STEAMSHIM_evt_handle evt ){ return steam_import.STEAMSHIM_subscribeEvent(id, self, evt);} // wait on the dispatch loop
void STEAMSHIM_unsubscribeEvent( uint32_t id, STEAMSHIM_evt_handle evt){ return steam_import.STEAMSHIM_unsubscribeEvent(id, evt);} // wait on the dispatch loop
bool STEAMSHIM_active(){ return steam_import.STEAMSHIM_active();} // wait on the dispatch loop
Expand Down
Loading
Loading