Skip to content

Commit 9ed19e6

Browse files
Add: Consistent redundant stream logging
Unify redundant packet logging with regular packet reporting. Use global session ID for redundant checking across st40/st41 sessions, similar to timestamp validation to make things as similar to eachoder as possible. Change API to report out-of-order packets in user port statistics. Note: Out-of-order per port tracking not implemented for st20.
1 parent fdb0acd commit 9ed19e6

File tree

7 files changed

+177
-98
lines changed

7 files changed

+177
-98
lines changed

include/st_api.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,8 @@ struct st_rx_port_stats {
271271
uint64_t incomplete_frames;
272272
/** Total number of received packets which are not valid. */
273273
uint64_t err_packets;
274+
/** Total number of out-of-order packets received */
275+
uint64_t out_of_order_packets;
274276
};
275277

276278
/**

lib/src/mt_util.h

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -115,20 +115,6 @@ static inline bool mt_seq32_greater(uint32_t a, uint32_t b) {
115115
return (diff & 0x80000000u) == 0 && diff != 0;
116116
}
117117

118-
static inline bool st_rx_seq_redundant_drop(uint16_t new_id, int* sessions_redundant,
119-
enum mtl_session_port s_port, int num_port) {
120-
for (int i = MTL_SESSION_PORT_P; i < num_port; i++) {
121-
if (i == s_port) continue;
122-
123-
uint16_t old_id = sessions_redundant[i];
124-
if (mt_seq16_greater(old_id, new_id)) {
125-
return true;
126-
}
127-
}
128-
129-
return false;
130-
}
131-
132118
struct rte_mbuf* mt_build_pad(struct mtl_main_impl* impl, struct rte_mempool* mempool,
133119
enum mtl_port port, uint16_t ether_type, uint16_t len);
134120

@@ -346,4 +332,4 @@ static inline void mt_stat_u64_update(struct mt_stat_u64* stat, uint64_t new) {
346332
stat->cnt++;
347333
}
348334

349-
#endif
335+
#endif

lib/src/st2110/st_header.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -690,6 +690,7 @@ struct st_rx_video_session_impl {
690690
int stat_pkts_enqueue_fallback; /* for pkt lcore */
691691
int stat_pkts_offset_dropped;
692692
int stat_pkts_out_of_order;
693+
int stat_pkts_out_of_order_per_port[MTL_SESSION_PORT_MAX];
693694
int stat_pkts_redundant_dropped;
694695
int stat_pkts_wrong_pt_dropped;
695696
int stat_pkts_wrong_ssrc_dropped;
@@ -1016,6 +1017,7 @@ struct st_rx_audio_session_impl {
10161017
uint32_t st30_pkt_size; /* size for each pkt which include the header */
10171018
int st30_total_pkts; /* total pkts in one frame */
10181019
int st30_pkt_idx; /* pkt index in current frame */
1020+
int session_seq_id; /* global session seq id to track continuity across redundant */
10191021
int latest_seq_id[MTL_SESSION_PORT_MAX]; /* latest seq id */
10201022

10211023
uint32_t first_pkt_rtp_ts; /* rtp time stamp for the first pkt */
@@ -1033,6 +1035,7 @@ struct st_rx_audio_session_impl {
10331035
int stat_pkts_dropped;
10341036
int stat_pkts_redundant;
10351037
int stat_pkts_out_of_order;
1038+
int stat_pkts_out_of_order_per_port[MTL_SESSION_PORT_MAX];
10361039
int stat_slot_get_frame_fail;
10371040
int stat_pkts_wrong_pt_dropped;
10381041
int stat_pkts_wrong_ssrc_dropped;
@@ -1182,7 +1185,7 @@ struct st_rx_ancillary_session_impl {
11821185

11831186
uint16_t st40_dst_port[MTL_SESSION_PORT_MAX]; /* udp port */
11841187
bool mcast_joined[MTL_SESSION_PORT_MAX];
1185-
1188+
int session_seq_id; /* global session seq id to track continuity across redundant */
11861189
int latest_seq_id[MTL_SESSION_PORT_MAX]; /* latest seq id */
11871190

11881191
struct mt_rtcp_rx* rtcp_rx[MTL_SESSION_PORT_MAX];
@@ -1193,6 +1196,7 @@ struct st_rx_ancillary_session_impl {
11931196
int stat_pkts_dropped;
11941197
int stat_pkts_redundant;
11951198
int stat_pkts_out_of_order;
1199+
int stat_pkts_out_of_order_per_port[MTL_SESSION_PORT_MAX];
11961200
int stat_pkts_enqueue_fail;
11971201
int stat_pkts_wrong_pt_dropped;
11981202
int stat_pkts_wrong_ssrc_dropped;
@@ -1354,7 +1358,7 @@ struct st_rx_fastmetadata_session_impl {
13541358

13551359
uint16_t st41_dst_port[MTL_SESSION_PORT_MAX]; /* udp port */
13561360
bool mcast_joined[MTL_SESSION_PORT_MAX];
1357-
1361+
int session_seq_id; /* global session seq id to track continuity across redundant */
13581362
int latest_seq_id[MTL_SESSION_PORT_MAX]; /* latest seq id */
13591363

13601364
struct mt_rtcp_rx* rtcp_rx[MTL_SESSION_PORT_MAX];
@@ -1364,6 +1368,7 @@ struct st_rx_fastmetadata_session_impl {
13641368
rte_atomic32_t stat_frames_received;
13651369
int stat_pkts_redundant;
13661370
int stat_pkts_out_of_order;
1371+
int stat_pkts_out_of_order_per_port[MTL_SESSION_PORT_MAX];
13671372
int stat_pkts_enqueue_fail;
13681373
int stat_pkts_wrong_pt_dropped;
13691374
int stat_pkts_wrong_ssrc_dropped;

lib/src/st2110/st_rx_ancillary_session.c

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -131,37 +131,46 @@ static int rx_ancillary_session_handle_pkt(struct mtl_main_impl* impl,
131131
}
132132
/* 0b00: progressive or not specified, do nothing */
133133

134-
if (unlikely(s->latest_seq_id[MTL_PORT_P] == -1))
135-
s->latest_seq_id[MTL_PORT_P] = seq_id - 1;
136-
if (unlikely(s->latest_seq_id[MTL_PORT_R] == -1))
137-
s->latest_seq_id[MTL_PORT_R] = seq_id - 1;
134+
if (unlikely(s->latest_seq_id[s_port] == -1)) s->latest_seq_id[s_port] = seq_id - 1;
135+
if (unlikely(s->session_seq_id == -1)) s->session_seq_id = seq_id - 1;
138136
if (unlikely(s->tmstamp == -1)) s->tmstamp = tmstamp - 1;
139137

138+
/* not a big deal as long as stream is continous */
140139
if (seq_id != (uint16_t)(s->latest_seq_id[s_port] + 1)) {
141140
dbg("%s(%d,%d), non-continuous seq now %u last %d\n", __func__, s->idx, s_port,
142141
seq_id, s->latest_seq_id[s_port]);
143-
ST_SESSION_STAT_INC(s, port_user_stats.common, stat_pkts_out_of_order);
142+
s->port_user_stats.common.port[s_port].out_of_order_packets++;
143+
s->stat_pkts_out_of_order_per_port[s_port]++;
144144
}
145+
s->latest_seq_id[s_port] = seq_id;
145146

146147
/* in ancillary we assume packet is redundant when the seq_id is old (it's possible to
147148
get multiple packets with the same timestamp)) */
148149
if ((mt_seq32_greater(s->tmstamp, tmstamp)) ||
149-
st_rx_seq_redundant_drop(seq_id, s->latest_seq_id, s_port, s->ops.num_port)) {
150-
if (mt_seq32_greater(s->tmstamp, tmstamp)) {
151-
dbg("%s(%d,%d), drop as pkt tmstamp %u is old\n", __func__, s->idx, s_port,
152-
tmstamp);
150+
!mt_seq16_greater(seq_id, s->session_seq_id)) {
151+
if (!mt_seq16_greater(seq_id, s->session_seq_id)) {
152+
dbg("%s(%d,%d), redundant seq now %u session last %d\n", __func__, s->idx, s_port,
153+
seq_id, s->session_seq_id);
153154
} else {
154-
dbg("%s(%d,%d), drop as pkt seq %d is old\n", __func__, s->idx, s_port, seq_id);
155+
dbg("%s(%d,%d), redundant tmstamp now %u session last %u\n", __func__, s->idx,
156+
s_port, tmstamp, s->tmstamp);
155157
}
156158

157159
ST_SESSION_STAT_INC(s, port_user_stats, stat_pkts_redundant);
158-
s->latest_seq_id[s_port] = seq_id;
159160
return -EIO;
160161
}
161162

163+
/* hole in seq id packets going into the session check if the seq_id of the session is
164+
* consistent */
165+
if (seq_id != (uint16_t)(s->session_seq_id + 1)) {
166+
dbg("%s(%d,%d), session seq_id %u out of order %d\n", __func__, s->idx, s_port,
167+
seq_id, s->session_seq_id);
168+
s->stat_pkts_out_of_order++;
169+
ST_SESSION_STAT_INC(s, port_user_stats.common, stat_pkts_out_of_order);
170+
}
171+
162172
/* update seq id */
163-
s->latest_seq_id[s_port] = seq_id;
164-
s->tmstamp = tmstamp;
173+
s->session_seq_id = seq_id;
165174

166175
/* enqueue to packet ring to let app to handle */
167176
int ret = rte_ring_sp_enqueue(s->packet_ring, (void*)mbuf);
@@ -417,6 +426,7 @@ static int rx_ancillary_session_attach(struct mtl_main_impl* impl,
417426
s->st40_dst_port[i] = (ops->udp_port[i]) ? (ops->udp_port[i]) : (30000 + idx * 2);
418427
}
419428

429+
s->session_seq_id = -1;
420430
s->latest_seq_id[MTL_SESSION_PORT_P] = -1;
421431
s->latest_seq_id[MTL_SESSION_PORT_R] = -1;
422432
s->tmstamp = -1;
@@ -462,23 +472,32 @@ static void rx_ancillary_session_stat(struct st_rx_ancillary_session_impl* s) {
462472

463473
rte_atomic32_set(&s->stat_frames_received, 0);
464474

465-
notice("RX_ANC_SESSION(%d:%s): fps %f frames %d pkts %d\n", idx, s->ops_name, framerate,
466-
frames_received, s->stat_pkts_received);
467-
s->stat_pkts_received = 0;
468-
s->stat_last_time = cur_time_ns;
469-
470475
if (s->stat_pkts_redundant) {
471-
notice("RX_ANC_SESSION(%d): redundant pkts %d\n", idx, s->stat_pkts_redundant);
476+
notice("RX_ANC_SESSION(%d:%s): fps %f frames %d pkts %d (redundant %d)\n", idx,
477+
s->ops_name, framerate, frames_received, s->stat_pkts_received,
478+
s->stat_pkts_redundant);
472479
s->stat_pkts_redundant = 0;
480+
} else {
481+
notice("RX_ANC_SESSION(%d:%s): fps %f frames %d pkts %d\n", idx, s->ops_name,
482+
framerate, frames_received, s->stat_pkts_received);
473483
}
484+
s->stat_pkts_received = 0;
485+
s->stat_last_time = cur_time_ns;
486+
474487
if (s->stat_pkts_dropped) {
475488
notice("RX_ANC_SESSION(%d): dropped pkts %d\n", idx, s->stat_pkts_dropped);
476489
s->stat_pkts_dropped = 0;
477490
}
478491
if (s->stat_pkts_out_of_order) {
479-
warn("RX_ANC_SESSION(%d): out of order pkts %d\n", idx, s->stat_pkts_out_of_order);
492+
warn("RX_ANC_SESSION(%d): out of order pkts %d (%d:%d)\n", idx,
493+
s->stat_pkts_out_of_order,
494+
s->stat_pkts_out_of_order_per_port[MTL_SESSION_PORT_P],
495+
s->stat_pkts_out_of_order_per_port[MTL_SESSION_PORT_R]);
480496
s->stat_pkts_out_of_order = 0;
497+
s->stat_pkts_out_of_order_per_port[MTL_SESSION_PORT_P] = 0;
498+
s->stat_pkts_out_of_order_per_port[MTL_SESSION_PORT_R] = 0;
481499
}
500+
482501
if (s->stat_pkts_wrong_pt_dropped) {
483502
notice("RX_ANC_SESSION(%d): wrong hdr payload_type dropped pkts %d\n", idx,
484503
s->stat_pkts_wrong_pt_dropped);
@@ -546,6 +565,8 @@ static int rx_ancillary_session_update_src(struct mtl_main_impl* impl,
546565
s->st40_dst_port[i] = (ops->udp_port[i]) ? (ops->udp_port[i]) : (30000 + idx * 2);
547566
}
548567
/* reset seq id */
568+
569+
s->session_seq_id = -1;
549570
s->latest_seq_id[MTL_SESSION_PORT_P] = -1;
550571
s->latest_seq_id[MTL_SESSION_PORT_R] = -1;
551572
s->tmstamp = -1;

lib/src/st2110/st_rx_audio_session.c

Lines changed: 66 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -292,32 +292,42 @@ static int rx_audio_session_handle_frame_pkt(struct mtl_main_impl* impl,
292292
s->first_pkt_rtp_ts = tmstamp;
293293
}
294294

295+
if (unlikely(s->latest_seq_id[s_port] == -1)) s->latest_seq_id[s_port] = seq_id - 1;
296+
if (unlikely(s->session_seq_id == -1)) s->session_seq_id = seq_id - 1;
295297
if (unlikely(s->tmstamp == -1)) s->tmstamp = tmstamp - 1;
296298

297-
/* all packets need to have increasing tmstamp */
299+
/* redundant stream seq_id out of order is not a big deal as long as stream is continous
300+
*/
301+
if (seq_id != (uint16_t)(s->latest_seq_id[s_port] + 1)) {
302+
dbg("%s(%d,%d), non-continuous seq now %u last %d\n", __func__, s->idx, s_port,
303+
seq_id, s->latest_seq_id[s_port]);
304+
s->port_user_stats.common.port[s_port].out_of_order_packets++;
305+
s->stat_pkts_out_of_order_per_port[s_port]++;
306+
}
307+
s->latest_seq_id[s_port] = seq_id;
308+
309+
/* all packets need to have increasing timestamp */
298310
if (!mt_seq32_greater(tmstamp, s->tmstamp)) {
299311
dbg("%s(%d,%d), drop as pkt seq_id %u (%u) or tmstamp %u (%ld) is old\n", __func__,
300312
s->idx, s_port, seq_id, s->latest_seq_id[s_port], tmstamp, s->tmstamp);
313+
s->stat_pkts_redundant++;
301314
ST_SESSION_STAT_INC(s, port_user_stats, stat_pkts_redundant);
302-
s->latest_seq_id[s_port] = seq_id;
303315
return -EIO;
304316
}
305317
s->tmstamp = tmstamp;
306318

307-
if (unlikely(s->latest_seq_id[MTL_PORT_P] == -1))
308-
s->latest_seq_id[MTL_PORT_P] = seq_id - 1;
309-
if (unlikely(s->latest_seq_id[MTL_PORT_R] == -1))
310-
s->latest_seq_id[MTL_PORT_R] = seq_id - 1;
311-
312-
if (seq_id != (uint16_t)(s->latest_seq_id[s_port] + 1)) {
313-
dbg("%s(%d,%d), non-continuous seq now %u last %d\n", __func__, s->idx, s_port,
314-
seq_id, s->latest_seq_id[s_port]);
319+
/* hole in seq id packets going into the session check if the seq_id of the session is
320+
* consistent */
321+
if (seq_id != (uint16_t)(s->session_seq_id + 1)) {
322+
dbg("%s(%d,%d), session seq_id %u out of order %d\n", __func__, s->idx, s_port,
323+
seq_id, s->session_seq_id);
324+
s->stat_pkts_out_of_order++;
315325
ST_SESSION_STAT_INC(s, port_user_stats.common, stat_pkts_out_of_order);
316326
}
317327

318328
/* The package is accepted and goes into the frame */
319-
/* update seq id */
320-
s->latest_seq_id[s_port] = seq_id;
329+
330+
s->session_seq_id = seq_id;
321331

322332
if (!s->st30_cur_frame) {
323333
s->st30_cur_frame = rx_audio_session_get_frame(s);
@@ -423,6 +433,7 @@ static int rx_audio_session_handle_rtp_pkt(struct mtl_main_impl* impl,
423433

424434
uint16_t seq_id = ntohs(rtp->seq_number);
425435
uint8_t payload_type = rtp->payload_type;
436+
uint32_t tmstamp = ntohl(rtp->tmstamp);
426437

427438
if (ops->payload_type && (payload_type != ops->payload_type)) {
428439
dbg("%s(%d,%d), get payload_type %u but expect %u\n", __func__, s->idx, s_port,
@@ -441,23 +452,40 @@ static int rx_audio_session_handle_rtp_pkt(struct mtl_main_impl* impl,
441452
}
442453

443454
if (unlikely(s->latest_seq_id[s_port] == -1)) s->latest_seq_id[s_port] = seq_id - 1;
455+
if (unlikely(s->session_seq_id == -1)) s->session_seq_id = seq_id - 1;
456+
if (unlikely(s->tmstamp == -1)) s->tmstamp = tmstamp - 1;
457+
458+
/* redundant stream seq_id out of order is not a big deal as long as stream is continous
459+
*/
444460
if (seq_id != (uint16_t)(s->latest_seq_id[s_port] + 1)) {
445461
dbg("%s(%d,%d), non-continuous seq now %u last %d\n", __func__, s->idx, s_port,
446462
seq_id, s->latest_seq_id[s_port]);
447-
ST_SESSION_STAT_INC(s, port_user_stats.common, stat_pkts_out_of_order);
463+
s->port_user_stats.common.port[s_port].out_of_order_packets++;
464+
s->stat_pkts_out_of_order_per_port[s_port]++;
448465
}
466+
s->latest_seq_id[s_port] = seq_id;
449467

450-
/* Only works for redundant streams basicly drop packets that are old */
451-
if (st_rx_seq_redundant_drop(seq_id, s->latest_seq_id, s_port, s->ops.num_port)) {
452-
dbg("%s(%d,%d), drop as pkt seq %d is old\n", __func__, s->idx, s_port, seq_id);
453-
468+
/* all packets need to have increasing timestamp */
469+
if (!mt_seq32_greater(tmstamp, s->tmstamp)) {
470+
dbg("%s(%d,%d), drop as pkt seq_id %u (%u) or tmstamp %u (%ld) is old\n", __func__,
471+
s->idx, s_port, seq_id, s->latest_seq_id[s_port], tmstamp, s->tmstamp);
472+
s->stat_pkts_redundant++;
454473
ST_SESSION_STAT_INC(s, port_user_stats, stat_pkts_redundant);
455-
s->latest_seq_id[s_port] = seq_id;
456474
return -EIO;
457475
}
476+
s->tmstamp = tmstamp;
458477

459-
/* update seq id */
460-
s->latest_seq_id[s_port] = seq_id;
478+
/* hole in seq id packets going into the session check if the seq_id of the session is
479+
* consistent */
480+
if (seq_id != (uint16_t)(s->session_seq_id + 1)) {
481+
dbg("%s(%d,%d), session seq_id %u out of order %d\n", __func__, s->idx, s_port,
482+
seq_id, s->session_seq_id);
483+
s->stat_pkts_out_of_order++;
484+
ST_SESSION_STAT_INC(s, port_user_stats.common, stat_pkts_out_of_order);
485+
}
486+
487+
/* The package is accepted and goes into the frame */
488+
s->session_seq_id = seq_id;
461489

462490
/* enqueue the packet ring to app */
463491
int ret = rte_ring_sp_enqueue(s->st30_rtps_ring, (void*)mbuf);
@@ -605,10 +633,6 @@ static int rx_audio_session_tasklet(struct st_rx_audio_session_impl* s) {
605633
rv = mt_rxq_burst(s->rxq[s_port], &mbuf[0], ST_RX_AUDIO_BURST_SIZE);
606634
if (!rv) continue;
607635

608-
if (s_port == 1) {
609-
dbg("DEBUG");
610-
}
611-
612636
rx_audio_session_handle_mbuf(&s->priv[s_port], &mbuf[0], rv);
613637
rte_pktmbuf_free_bulk(&mbuf[0], rv);
614638
if (s->enable_timing_parser && s->tp) {
@@ -821,6 +845,7 @@ static int rx_audio_session_attach(struct mtl_main_impl* impl,
821845
s->st30_pkt_idx = 0;
822846
s->st30_frame_size = ops->framebuff_size;
823847

848+
s->session_seq_id = -1;
824849
s->latest_seq_id[MTL_SESSION_PORT_P] = -1;
825850
s->latest_seq_id[MTL_SESSION_PORT_R] = -1;
826851
s->tmstamp = -1;
@@ -897,21 +922,28 @@ static void rx_audio_session_stat(struct st_rx_audio_sessions_mgr* mgr,
897922

898923
rte_atomic32_set(&s->stat_frames_received, 0);
899924

900-
notice("RX_AUDIO_SESSION(%d,%d:%s): fps %f frames %d pkts %d\n", m_idx, idx,
901-
s->ops_name, framerate, frames_received, s->stat_pkts_received);
902-
s->stat_pkts_received = 0;
903-
s->stat_last_time = cur_time_ns;
904-
905925
if (s->stat_pkts_redundant) {
906-
notice("RX_AUDIO_SESSION(%d,%d): redundant pkts %d\n", m_idx, idx,
926+
notice("RX_AUDIO_SESSION(%d,%d:%s): fps %f frames %d pkts %d (redundant %d)\n", m_idx,
927+
idx, s->ops_name, framerate, frames_received, s->stat_pkts_received,
907928
s->stat_pkts_redundant);
908929
s->stat_pkts_redundant = 0;
930+
} else {
931+
info("RX_AUDIO_SESSION(%d,%d:%s): fps %f frames %d pkts %d\n", m_idx, idx,
932+
s->ops_name, framerate, frames_received, s->stat_pkts_received);
909933
}
934+
935+
s->stat_pkts_received = 0;
936+
s->stat_last_time = cur_time_ns;
910937
if (s->stat_pkts_out_of_order) {
911-
warn("RX_AUDIO_SESSION(%d,%d): out of order pkts %d\n", m_idx, idx,
912-
s->stat_pkts_out_of_order);
938+
warn("RX_AUDIO_SESSION(%d): out of order pkts %d (%d:%d)\n", idx,
939+
s->stat_pkts_out_of_order,
940+
s->stat_pkts_out_of_order_per_port[MTL_SESSION_PORT_P],
941+
s->stat_pkts_out_of_order_per_port[MTL_SESSION_PORT_R]);
913942
s->stat_pkts_out_of_order = 0;
943+
s->stat_pkts_out_of_order_per_port[MTL_SESSION_PORT_P] = 0;
944+
s->stat_pkts_out_of_order_per_port[MTL_SESSION_PORT_R] = 0;
914945
}
946+
915947
if (s->stat_pkts_dropped) {
916948
notice("RX_AUDIO_SESSION(%d,%d): dropped pkts %d\n", m_idx, idx,
917949
s->stat_pkts_dropped);
@@ -989,6 +1021,8 @@ static int rx_audio_session_update_src(struct mtl_main_impl* impl,
9891021
s->st30_dst_port[i] = (ops->udp_port[i]) ? (ops->udp_port[i]) : (20000 + idx * 2);
9901022
}
9911023
/* reset seq id */
1024+
1025+
s->session_seq_id = -1;
9921026
s->latest_seq_id[MTL_SESSION_PORT_P] = -1;
9931027
s->latest_seq_id[MTL_SESSION_PORT_R] = -1;
9941028
s->tmstamp = -1;

0 commit comments

Comments
 (0)