Skip to content

Commit

Permalink
improve counters/statistics
Browse files Browse the repository at this point in the history
Reworks the statistics code in a backward compatible way. A new options
called statistics is added and set to true by default.

It shows at every keep-alive sent what are the latest counters for
packet types and others.

The commit is longer than should as some ruff formating got in.
  • Loading branch information
thomas-mangin committed Dec 9, 2024
1 parent 11a15c9 commit d3af6fd
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 57 deletions.
3 changes: 3 additions & 0 deletions doc/man/exabgp.1
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ Default: false.
Controls logging of networking information (TCP/IP state, network
state etc.).
Default: true.
.It exabgp.log.statistics
Controls logging of packet statistics (counters).
Default: true.
.It exabgp.log.packets
Controls logging of BGP packets sent and received.
Default: false.
Expand Down
6 changes: 6 additions & 0 deletions src/exabgp/environment/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@
'value': 'true',
'help': 'report networking information (TCP/IP, network state,...)',
},
'statistics': {
'read': parsing.boolean,
'write': parsing.lower,
'value': 'true',
'help': 'report packet statistics',
},
'packets': {
'read': parsing.boolean,
'write': parsing.lower,
Expand Down
2 changes: 2 additions & 0 deletions src/exabgp/logger/option.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class option(object):
'processes': False,
'configuration': False,
'network': False,
'statistics': False,
'wire': False,
'message': False,
'rib': False,
Expand Down Expand Up @@ -67,6 +68,7 @@ def load(cls, env):
'processes': env.log.enable and (env.log.all or env.log.processes),
'configuration': env.log.enable and (env.log.all or env.log.configuration),
'network': env.log.enable and (env.log.all or env.log.network),
'statistics': env.log.enable and (env.log.all or env.log.statistics),
'wire': env.log.enable and (env.log.all or env.log.packets),
'message': env.log.enable and (env.log.all or env.log.message),
'rib': env.log.enable and (env.log.all or env.log.rib),
Expand Down
182 changes: 146 additions & 36 deletions src/exabgp/reactor/peer.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,27 @@ class Stop(Exception):
pass


# ======================================================================== Counter


class Stats(dict):
__format = {'complete': lambda t: 'time %s' % time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(t))}

def __init__(self, *args):
dict.__init__(self, args)
self.__changed = set()

def __setitem__(self, key, val):
dict.__setitem__(self, key, val)
self.__changed.add(key)

def changed_statistics(self):
for name in self.__changed:
formater = self.__format.get(name, lambda v: 'counter %s' % v)
yield 'statistics for %s %s' % (name, formater(self[name]))
self.__changed = set()


# ======================================================================== Peer
# Present a File like interface to socket.socket

Expand All @@ -78,12 +99,28 @@ def __init__(self, neighbor, reactor):

self.proto = None
self.fsm = FSM(self, FSM.IDLE)
self.stats = {
'fsm': self.fsm,
'creation': now,
'reset': now,
'complete': 0,
}
self.stats = Stats()
self.stats.update(
{
'fsm': self.fsm,
'creation': now, # when the peer was created
'reset': now, # time of last reset
'complete': 0, # when did the peer got established
'up': 0,
'down': 0,
'receive-open': 0,
'send-open': 0,
'receive-notification': 0,
'send-notification': 0,
'receive-update': 0,
'send-update': 0,
'receive-refresh': 0,
'send-refresh': 0,
'receive-keepalive': 0,
'send-keepalive': 0,
}
)

self.generator = None

# The peer should restart after a stop
Expand All @@ -106,15 +143,30 @@ def _close(self, message='', error=''):
if self.neighbor.api['neighbor-changes']:
self.reactor.processes.down(self.neighbor, message)
except ProcessError:
log.debug('could not send notification of neighbor close to API', self.connection.session())
log.debug(
'could not send notification of neighbor close to API',
self.connection.session(),
)
self.fsm.change(FSM.IDLE)

self.stats = {
'fsm': self.fsm,
'creation': self.stats['creation'],
'reset': time.time(),
'complete': 0,
}
self.stats.update(
{
'fsm': self.fsm,
'reset': time.time(),
'complete': 0,
'receive-open': 0,
'send-open': 0,
'receive-notification': 0,
'send-notification': 0,
'receive-update': 0,
'send-update': 0,
'receive-refresh': 0,
'send-refresh': 0,
'receive-keepalive': 0,
'send-keepalive': 0,
}
)

if self.proto:
try:
message = 'peer reset, message [{0}] error[{1}]'.format(message, error)
Expand Down Expand Up @@ -149,7 +201,11 @@ def _stop(self, message):
# logging

def me(self, message):
return 'peer %s ASN %-7s %s' % (self.neighbor['peer-address'], self.neighbor['peer-as'], message)
return 'peer %s ASN %-7s %s' % (
self.neighbor['peer-address'],
self.neighbor['peer-as'],
message,
)

# control

Expand All @@ -159,11 +215,23 @@ def stop(self):
self._restarted = False
self._delay.reset()
self.fsm.change(FSM.IDLE)
self.stats = {
'fsm': self.fsm,
'creation': self.stats['creation'],
'reset': time.time(),
}
self.stats.update(
{
'fsm': self.fsm,
'reset': time.time(),
'complete': 0,
'receive-open': 0,
'send-open': 0,
'receive-notification': 0,
'send-notification': 0,
'receive-update': 0,
'send-update': 0,
'receive-refresh': 0,
'send-refresh': 0,
'receive-keepalive': 0,
'send-keepalive': 0,
}
)
self.neighbor.rib.uncache()

def remove(self):
Expand Down Expand Up @@ -205,7 +273,10 @@ def handle_connection(self, connection):

# if the other side fails, we go back to idle
if self.fsm == FSM.ESTABLISHED:
log.debug('we already have a peer in state established for %s' % connection.name(), self.id())
log.debug(
'we already have a peer in state established for %s' % connection.name(),
self.id(),
)
return connection.notification(6, 7, 'could not accept the connection, already established')

# 6.8 The convention is to compare the BGP Identifiers of the peers
Expand Down Expand Up @@ -294,7 +365,11 @@ def _send_open(self):
def _read_open(self):
wait = getenv().bgp.openwait
opentimer = ReceiveTimer(
self.proto.connection.session, wait, 1, 1, 'waited for open too long, we do not like stuck in active'
self.proto.connection.session,
wait,
1,
1,
'waited for open too long, we do not like stuck in active',
)
# Only yield if we have not the open, otherwise the reactor can run the other connection
# which would be bad as we need to do the collission check without going to the other peer
Expand Down Expand Up @@ -384,8 +459,11 @@ def _main(self):
include_withdraw = False

# Announce to the process BGP is up
log.info('connected to %s with %s' % (self.id(), self.proto.connection.name()), 'reactor')
self.stats['up'] = self.stats.get('up', 0) + 1
log.info(
'connected to %s with %s' % (self.id(), self.proto.connection.name()),
'reactor',
)
self.stats['up'] += 1
if self.neighbor.api['neighbor-changes']:
try:
self.reactor.processes.up(self.neighbor)
Expand Down Expand Up @@ -438,6 +516,8 @@ def _main(self):
# we need and will send a keepalive
while send_ka() is None:
yield ACTION.NOW
for counter_line in self.stats.changed_statistics():
log.info(counter_line, 'statistics')

# Received update
if message.TYPE == Update.TYPE:
Expand All @@ -446,7 +526,10 @@ def _main(self):

for nlri in message.nlris:
self.neighbor.rib.incoming.update_cache(Change(nlri, message.attributes))
logfunc.debug(lazyformat(' UPDATE #%d nlri ' % number, nlri, str), self.id())
logfunc.debug(
lazyformat(' UPDATE #%d nlri ' % number, nlri, str),
self.id(),
)

elif message.TYPE == RouteRefresh.TYPE:
enhanced = message.reserved == RouteRefresh.request
Expand Down Expand Up @@ -551,7 +634,10 @@ def _run(self):
except NetworkError as network:
# we tried to connect once, it failed and it was not a manual request, we stop
if self.once and not self._teardown:
log.debug('only one attempt to connect is allowed, stopping the peer', self.id())
log.debug(
'only one attempt to connect is allowed, stopping the peer',
self.id(),
)
self.stop()

self._reset('closing connection', network)
Expand Down Expand Up @@ -579,10 +665,16 @@ def _run(self):
except Notification as notification:
# we tried to connect once, it failed and it was not a manual request, we stop
if self.once and not self._teardown:
log.debug('only one attempt to connect is allowed, stopping the peer', self.id())
log.debug(
'only one attempt to connect is allowed, stopping the peer',
self.id(),
)
self.stop()

self._reset('notification received (%d,%d)' % (notification.code, notification.subcode), notification)
self._reset(
'notification received (%d,%d)' % (notification.code, notification.subcode),
notification,
)
return

# RECEIVED a Message TYPE we did not expect
Expand Down Expand Up @@ -682,12 +774,30 @@ def tri(value):

capabilities = {
'asn4': (tri(self.neighbor['capability']['asn4']), tri(peer['asn4'])),
'route-refresh': (tri(self.neighbor['capability']['route-refresh']), tri(peer['route-refresh'])),
'multi-session': (tri(self.neighbor['capability']['multi-session']), tri(peer['multi-session'])),
'operational': (tri(self.neighbor['capability']['operational']), tri(peer['operational'])),
'add-path': (tri(self.neighbor['capability']['add-path']), tri(peer['add-path'])),
'extended-message': (tri(self.neighbor['capability']['extended-message']), tri(peer['extended-message'])),
'graceful-restart': (tri(self.neighbor['capability']['graceful-restart']), tri(peer['graceful-restart'])),
'route-refresh': (
tri(self.neighbor['capability']['route-refresh']),
tri(peer['route-refresh']),
),
'multi-session': (
tri(self.neighbor['capability']['multi-session']),
tri(peer['multi-session']),
),
'operational': (
tri(self.neighbor['capability']['operational']),
tri(peer['operational']),
),
'add-path': (
tri(self.neighbor['capability']['add-path']),
tri(peer['add-path']),
),
'extended-message': (
tri(self.neighbor['capability']['extended-message']),
tri(peer['extended-message']),
),
'graceful-restart': (
tri(self.neighbor['capability']['graceful-restart']),
tri(peer['graceful-restart']),
),
}

families = {}
Expand All @@ -706,16 +816,16 @@ def tri(value):
total_sent = 0
total_rcvd = 0
for message in ('open', 'notification', 'keepalive', 'update', 'refresh'):
sent = self.stats.get('send-%s' % message, 0)
rcvd = self.stats.get('receive-%s' % message, 0)
sent = self.stats['send-%s' % message]
rcvd = self.stats['receive-%s' % message]
total_sent += sent
total_rcvd += rcvd
messages[message] = (sent, rcvd)
messages['total'] = (total_sent, total_rcvd)

return {
'down': int(self.stats['reset'] - self.stats['creation']),
'duration': int(time.time() - self.stats['complete']) if self.stats['complete'] else 0,
'duration': (int(time.time() - self.stats['complete']) if self.stats['complete'] else 0),
'local-address': str(self.neighbor['local-address']),
'peer-address': str(self.neighbor['peer-address']),
'local-as': int(self.neighbor['local-as']),
Expand Down
Loading

0 comments on commit d3af6fd

Please sign in to comment.