Skip to content

Commit bf11ad3

Browse files
Optimize memory usage during broadcasts (#1233)
1 parent f49d65a commit bf11ad3

14 files changed

+366
-248
lines changed

setup.cfg

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ packages = find:
2525
python_requires = >=3.6
2626
install_requires =
2727
bidict >= 0.21.0
28-
python-engineio >= 4.3.0
28+
python-engineio >= 4.7.0
2929

3030
[options.packages.find]
3131
where = src

src/socketio/asyncio_manager.py

+38-9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import asyncio
22

3+
from engineio import packet as eio_packet
4+
from socketio import packet
35
from .base_manager import BaseManager
46

57

@@ -17,18 +19,45 @@ async def emit(self, event, data, namespace, room=None, skip_sid=None,
1719
"""
1820
if namespace not in self.rooms:
1921
return
20-
tasks = []
22+
if isinstance(data, tuple):
23+
# tuples are expanded to multiple arguments, everything else is
24+
# sent as a single argument
25+
data = list(data)
26+
elif data is not None:
27+
data = [data]
28+
else:
29+
data = []
2130
if not isinstance(skip_sid, list):
2231
skip_sid = [skip_sid]
23-
for sid, eio_sid in self.get_participants(namespace, room):
24-
if sid not in skip_sid:
25-
if callback is not None:
32+
tasks = []
33+
if not callback:
34+
# when callbacks aren't used the packets sent to each recipient are
35+
# identical, so they can be generated once and reused
36+
pkt = self.server.packet_class(
37+
packet.EVENT, namespace=namespace, data=[event] + data)
38+
encoded_packet = pkt.encode()
39+
if not isinstance(encoded_packet, list):
40+
encoded_packet = [encoded_packet]
41+
eio_pkt = [eio_packet.Packet(eio_packet.MESSAGE, p)
42+
for p in encoded_packet]
43+
for sid, eio_sid in self.get_participants(namespace, room):
44+
if sid not in skip_sid:
45+
for p in eio_pkt:
46+
tasks.append(asyncio.create_task(
47+
self.server._send_eio_packet(eio_sid, p)))
48+
else:
49+
# callbacks are used, so each recipient must be sent a packet that
50+
# contains a unique callback id
51+
# note that callbacks when addressing a group of people are
52+
# implemented but not tested or supported
53+
for sid, eio_sid in self.get_participants(namespace, room):
54+
if sid not in skip_sid: # pragma: no branch
2655
id = self._generate_ack_id(sid, callback)
27-
else:
28-
id = None
29-
tasks.append(asyncio.create_task(
30-
self.server._emit_internal(eio_sid, event, data,
31-
namespace, id)))
56+
pkt = self.server.packet_class(
57+
packet.EVENT, namespace=namespace, data=[event] + data,
58+
id=id)
59+
tasks.append(asyncio.create_task(
60+
self.server._send_packet(eio_sid, pkt)))
3261
if tasks == []: # pragma: no cover
3362
return
3463
await asyncio.wait(tasks)

src/socketio/asyncio_server.py

+4-13
Original file line numberDiff line numberDiff line change
@@ -424,19 +424,6 @@ async def sleep(self, seconds=0):
424424
"""
425425
return await self.eio.sleep(seconds)
426426

427-
async def _emit_internal(self, sid, event, data, namespace=None, id=None):
428-
"""Send a message to a client."""
429-
# tuples are expanded to multiple arguments, everything else is sent
430-
# as a single argument
431-
if isinstance(data, tuple):
432-
data = list(data)
433-
elif data is not None:
434-
data = [data]
435-
else:
436-
data = []
437-
await self._send_packet(sid, self.packet_class(
438-
packet.EVENT, namespace=namespace, data=[event] + data, id=id))
439-
440427
async def _send_packet(self, eio_sid, pkt):
441428
"""Send a Socket.IO packet to a client."""
442429
encoded_packet = pkt.encode()
@@ -446,6 +433,10 @@ async def _send_packet(self, eio_sid, pkt):
446433
else:
447434
await self.eio.send(eio_sid, encoded_packet)
448435

436+
async def _send_eio_packet(self, eio_sid, eio_pkt):
437+
"""Send a raw Engine.IO packet to a client."""
438+
await self.eio.send_packet(eio_sid, eio_pkt)
439+
449440
async def _handle_connect(self, eio_sid, namespace, data):
450441
"""Handle a client connection request."""
451442
namespace = namespace or '/'

src/socketio/base_manager.py

+35-6
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
import logging
33

44
from bidict import bidict, ValueDuplicationError
5+
from engineio import packet as eio_packet
6+
from socketio import packet
57

68
default_logger = logging.getLogger('socketio')
79

@@ -161,15 +163,42 @@ def emit(self, event, data, namespace, room=None, skip_sid=None,
161163
connected to the namespace."""
162164
if namespace not in self.rooms:
163165
return
166+
if isinstance(data, tuple):
167+
# tuples are expanded to multiple arguments, everything else is
168+
# sent as a single argument
169+
data = list(data)
170+
elif data is not None:
171+
data = [data]
172+
else:
173+
data = []
164174
if not isinstance(skip_sid, list):
165175
skip_sid = [skip_sid]
166-
for sid, eio_sid in self.get_participants(namespace, room):
167-
if sid not in skip_sid:
168-
if callback is not None:
176+
if not callback:
177+
# when callbacks aren't used the packets sent to each recipient are
178+
# identical, so they can be generated once and reused
179+
pkt = self.server.packet_class(
180+
packet.EVENT, namespace=namespace, data=[event] + data)
181+
encoded_packet = pkt.encode()
182+
if not isinstance(encoded_packet, list):
183+
encoded_packet = [encoded_packet]
184+
eio_pkt = [eio_packet.Packet(eio_packet.MESSAGE, p)
185+
for p in encoded_packet]
186+
for sid, eio_sid in self.get_participants(namespace, room):
187+
if sid not in skip_sid:
188+
for p in eio_pkt:
189+
self.server._send_eio_packet(eio_sid, p)
190+
else:
191+
# callbacks are used, so each recipient must be sent a packet that
192+
# contains a unique callback id
193+
# note that callbacks when addressing a group of people are
194+
# implemented but not tested or supported
195+
for sid, eio_sid in self.get_participants(namespace, room):
196+
if sid not in skip_sid: # pragma: no branch
169197
id = self._generate_ack_id(sid, callback)
170-
else:
171-
id = None
172-
self.server._emit_internal(eio_sid, event, data, namespace, id)
198+
pkt = self.server.packet_class(
199+
packet.EVENT, namespace=namespace, data=[event] + data,
200+
id=id)
201+
self.server._send_packet(eio_sid, pkt)
173202

174203
def trigger_callback(self, sid, id, data):
175204
"""Invoke an application callback."""

src/socketio/server.py

+6-15
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ def on(self, event, handler=None, namespace=None):
185185
Example usage::
186186
187187
# as a decorator:
188-
@socket_io.on('connect', namespace='/chat')
188+
@sio.on('connect', namespace='/chat')
189189
def connect_handler(sid, environ):
190190
print('Connection request')
191191
if environ['REMOTE_ADDR'] in blacklisted:
@@ -194,7 +194,7 @@ def connect_handler(sid, environ):
194194
# as a method:
195195
def message_handler(sid, msg):
196196
print('Received message: ', msg)
197-
eio.send(sid, 'response')
197+
sio.send(sid, 'response')
198198
socket_io.on('message', namespace='/chat', handler=message_handler)
199199
200200
The handler function receives the ``sid`` (session ID) for the
@@ -633,19 +633,6 @@ def sleep(self, seconds=0):
633633
"""
634634
return self.eio.sleep(seconds)
635635

636-
def _emit_internal(self, eio_sid, event, data, namespace=None, id=None):
637-
"""Send a message to a client."""
638-
# tuples are expanded to multiple arguments, everything else is sent
639-
# as a single argument
640-
if isinstance(data, tuple):
641-
data = list(data)
642-
elif data is not None:
643-
data = [data]
644-
else:
645-
data = []
646-
self._send_packet(eio_sid, self.packet_class(
647-
packet.EVENT, namespace=namespace, data=[event] + data, id=id))
648-
649636
def _send_packet(self, eio_sid, pkt):
650637
"""Send a Socket.IO packet to a client."""
651638
encoded_packet = pkt.encode()
@@ -655,6 +642,10 @@ def _send_packet(self, eio_sid, pkt):
655642
else:
656643
self.eio.send(eio_sid, encoded_packet)
657644

645+
def _send_eio_packet(self, eio_sid, eio_pkt):
646+
"""Send a raw Engine.IO packet to a client."""
647+
self.eio.send_packet(eio_sid, eio_pkt)
648+
658649
def _handle_connect(self, eio_sid, namespace, data):
659650
"""Handle a client connection request."""
660651
namespace = namespace or '/'

0 commit comments

Comments
 (0)