@@ -408,33 +408,23 @@ async def pub_task():
408408
409409
410410@pytest .mark .slow
411- @dfly_args ({"proactor_threads" : "4" , "migrate_connections" : False })
411+ @dfly_args ({"proactor_threads" : "4" })
412412async def test_pubsub_busy_connections (df_server : DflyInstance ):
413- sleep = 10
414-
415- idd = 0
413+ sleep = 60
416414
417415 async def sub_thread ():
418416 i = 0
419417
420418 async def sub_task ():
421419 nonlocal i
422- nonlocal idd
423420 sleep_task = asyncio .create_task (asyncio .sleep (sleep ))
424- j = idd
425- idd = idd + 1
426421 while not sleep_task .done ():
427422 client = df_server .client ()
428423 pubsub = client .pubsub ()
429- try :
430- await pubsub .subscribe ("channel" )
431- except Exception as e :
432- logging .info (f"ERRRRRRRROR { j } " )
433- pass
424+ await pubsub .subscribe ("channel" )
434425 # await pubsub.unsubscribe("channel")
435426 i = i + 1
436427 await client .close ()
437- logging .info (f"SUB DONE { j } " )
438428
439429 subs = [asyncio .create_task (sub_task ()) for _ in range (10 )]
440430 for s in subs :
@@ -443,32 +433,25 @@ async def sub_task():
443433
444434 async def pub_task ():
445435 pub = df_server .client ()
446- i = 0
447436 sleep_task = asyncio .create_task (asyncio .sleep (sleep ))
448437 while not sleep_task .done ():
449- # logging.info("before")
450438 await pub .publish ("channel" , f"message-{ i } " )
451- i = i + 1
452- # logging.info("after")
453- logging .info ("DONE" )
454439
455440 def run_in_thread ():
456441 loop = asyncio .new_event_loop ()
457442 asyncio .set_event_loop (loop )
458443 loop .run_until_complete (sub_thread ())
459444
460445 threads = []
461- for _ in range (1 ):
446+ for _ in range (10 ):
462447 thread = Thread (target = run_in_thread )
463448 thread .start ()
464449 threads .append (thread )
465450
466451 await pub_task ()
467452
468- logging .info ("==================" )
469453 for thread in threads :
470454 thread .join ()
471- logging .info ("==================" )
472455
473456
474457async def test_subscribers_with_active_publisher (df_server : DflyInstance , max_connections = 100 ):
0 commit comments