4545 AfterPubSubConnectionInstantiationEvent ,
4646 AfterSingleConnectionInstantiationEvent ,
4747 ClientType ,
48- EventDispatcher ,
48+ EventDispatcher , AfterCommandExecutionEvent ,
4949)
5050from redis .exceptions import (
5151 ConnectionError ,
@@ -478,7 +478,8 @@ def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline":
478478 between the client and server.
479479 """
480480 return Pipeline (
481- self .connection_pool , self .response_callbacks , transaction , shard_hint
481+ self .connection_pool , self .response_callbacks , transaction , shard_hint ,
482+ event_dispatcher = self ._event_dispatcher
482483 )
483484
484485 def transaction (
@@ -662,16 +663,42 @@ def _execute_command(self, *args, **options):
662663 command_name = args [0 ]
663664 conn = self .connection or pool .get_connection ()
664665
666+ # Start timing for observability
667+ start_time = time .monotonic ()
668+
665669 if self ._single_connection_client :
666670 self .single_connection_lock .acquire ()
667671 try :
668- return conn .retry .call_with_retry (
672+ result = conn .retry .call_with_retry (
669673 lambda : self ._send_command_parse_response (
670674 conn , command_name , * args , ** options
671675 ),
672676 lambda _ : self ._close_connection (conn ),
673677 )
674678
679+ self ._event_dispatcher .dispatch (
680+ AfterCommandExecutionEvent (
681+ command_name = command_name ,
682+ duration_seconds = time .monotonic () - start_time ,
683+ server_address = conn .host ,
684+ server_port = conn .port ,
685+ db_namespace = str (conn .db ),
686+ )
687+ )
688+ return result
689+ except Exception as e :
690+ self ._event_dispatcher .dispatch (
691+ AfterCommandExecutionEvent (
692+ command_name = command_name ,
693+ duration_seconds = time .monotonic () - start_time ,
694+ server_address = conn .host ,
695+ server_port = conn .port ,
696+ db_namespace = str (conn .db ),
697+ error = e ,
698+ )
699+ )
700+ raise
701+
675702 finally :
676703 if conn and conn .should_reconnect ():
677704 self ._close_connection (conn )
@@ -1385,6 +1412,7 @@ def __init__(
13851412 response_callbacks ,
13861413 transaction ,
13871414 shard_hint ,
1415+ event_dispatcher : EventDispatcher
13881416 ):
13891417 self .connection_pool = connection_pool
13901418 self .connection : Optional [Connection ] = None
@@ -1395,6 +1423,7 @@ def __init__(
13951423 self .command_stack = []
13961424 self .scripts : Set [Script ] = set ()
13971425 self .explicit_transaction = False
1426+ self ._event_dispatcher = event_dispatcher
13981427
13991428 def __enter__ (self ) -> "Pipeline" :
14001429 return self
@@ -1501,12 +1530,41 @@ def immediate_execute_command(self, *args, **options):
15011530 conn = self .connection_pool .get_connection ()
15021531 self .connection = conn
15031532
1504- return conn .retry .call_with_retry (
1505- lambda : self ._send_command_parse_response (
1506- conn , command_name , * args , ** options
1507- ),
1508- lambda error : self ._disconnect_reset_raise_on_watching (conn , error ),
1509- )
1533+ # Start timing for observability
1534+ start_time = time .monotonic ()
1535+
1536+ try :
1537+ response = conn .retry .call_with_retry (
1538+ lambda : self ._send_command_parse_response (
1539+ conn , command_name , * args , ** options
1540+ ),
1541+ lambda error : self ._disconnect_reset_raise_on_watching (conn , error ),
1542+ )
1543+
1544+ self ._event_dispatcher .dispatch (
1545+ AfterCommandExecutionEvent (
1546+ command_name = command_name ,
1547+ duration_seconds = time .monotonic () - start_time ,
1548+ server_address = conn .host ,
1549+ server_port = conn .port ,
1550+ db_namespace = str (conn .db ),
1551+ )
1552+ )
1553+
1554+ return response
1555+ except Exception as e :
1556+ self ._event_dispatcher .dispatch (
1557+ AfterCommandExecutionEvent (
1558+ command_name = command_name ,
1559+ duration_seconds = time .monotonic () - start_time ,
1560+ server_address = conn .host ,
1561+ server_port = conn .port ,
1562+ db_namespace = str (conn .db ),
1563+ error = e ,
1564+ )
1565+ )
1566+ raise
1567+
15101568
15111569 def pipeline_execute_command (self , * args , ** options ) -> "Pipeline" :
15121570 """
@@ -1679,8 +1737,10 @@ def execute(self, raise_on_error: bool = True) -> List[Any]:
16791737 self .load_scripts ()
16801738 if self .transaction or self .explicit_transaction :
16811739 execute = self ._execute_transaction
1740+ operation_name = "MULTI"
16821741 else :
16831742 execute = self ._execute_pipeline
1743+ operation_name = "PIPELINE"
16841744
16851745 conn = self .connection
16861746 if not conn :
@@ -1689,11 +1749,40 @@ def execute(self, raise_on_error: bool = True) -> List[Any]:
16891749 # back to the pool after we're done
16901750 self .connection = conn
16911751
1752+ # Start timing for observability
1753+ start_time = time .monotonic ()
1754+
16921755 try :
1693- return conn .retry .call_with_retry (
1756+ response = conn .retry .call_with_retry (
16941757 lambda : execute (conn , stack , raise_on_error ),
16951758 lambda error : self ._disconnect_raise_on_watching (conn , error ),
16961759 )
1760+
1761+ self ._event_dispatcher .dispatch (
1762+ AfterCommandExecutionEvent (
1763+ command_name = operation_name ,
1764+ duration_seconds = time .monotonic () - start_time ,
1765+ server_address = conn .host ,
1766+ server_port = conn .port ,
1767+ db_namespace = str (conn .db ),
1768+ batch_size = len (stack ),
1769+ )
1770+ )
1771+ return response
1772+ except Exception as e :
1773+ self ._event_dispatcher .dispatch (
1774+ AfterCommandExecutionEvent (
1775+ command_name = operation_name ,
1776+ duration_seconds = time .monotonic () - start_time ,
1777+ server_address = conn .host ,
1778+ server_port = conn .port ,
1779+ db_namespace = str (conn .db ),
1780+ error = e ,
1781+ batch_size = len (stack ),
1782+ )
1783+ )
1784+ raise
1785+
16971786 finally :
16981787 # in reset() the connection is disconnected before returned to the pool if
16991788 # it is marked for reconnect.
0 commit comments