Skip to content

Commit 6c99cd1

Browse files
committed
relay reconnect for push and pull
1 parent 4f48203 commit 6c99cd1

6 files changed

+331
-91
lines changed

ngx_live.c

Lines changed: 85 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,45 @@ ngx_live_put_stream(ngx_live_stream_t *st)
235235
++lcf->free_stream_count;
236236
}
237237

238+
ngx_relay_reconnect_t *
239+
ngx_live_get_relay_reconnect()
240+
{
241+
ngx_relay_reconnect_t *rc;
242+
ngx_live_conf_t *lcf;
243+
244+
lcf = (ngx_live_conf_t *) ngx_get_conf(ngx_cycle->conf_ctx,
245+
ngx_live_module);
246+
247+
rc = lcf->free_reconnect;
248+
if (rc == NULL) {
249+
rc = ngx_pcalloc(lcf->pool, sizeof(ngx_relay_reconnect_t));
250+
++lcf->alloc_reconnect_count;
251+
} else {
252+
lcf->free_reconnect = rc->next;
253+
--lcf->free_reconnect_count;
254+
ngx_memzero(rc, sizeof(ngx_relay_reconnect_t));
255+
}
256+
257+
return rc;
258+
}
259+
260+
void
261+
ngx_live_put_relay_reconnect(ngx_relay_reconnect_t *rc)
262+
{
263+
ngx_live_conf_t *lcf;
264+
265+
lcf = (ngx_live_conf_t *) ngx_get_conf(ngx_cycle->conf_ctx,
266+
ngx_live_module);
267+
268+
if (rc->reconnect.timer_set) {
269+
ngx_del_timer(&rc->reconnect);
270+
}
271+
272+
rc->next = lcf->free_reconnect;
273+
lcf->free_reconnect = rc;
274+
++lcf->free_reconnect_count;
275+
}
276+
238277
ngx_live_server_t *
239278
ngx_live_create_server(ngx_str_t *serverid)
240279
{
@@ -330,6 +369,7 @@ ngx_live_delete_stream(ngx_str_t *serverid, ngx_str_t *stream)
330369
{
331370
ngx_live_server_t **psrv;
332371
ngx_live_stream_t **pst, *st;
372+
ngx_relay_reconnect_t *rc;
333373

334374
psrv = ngx_live_find_server(serverid);
335375
if (*psrv == NULL) {
@@ -345,6 +385,19 @@ ngx_live_delete_stream(ngx_str_t *serverid, ngx_str_t *stream)
345385
}
346386

347387
st = *pst;
388+
389+
while (st->publish_reconnect) {
390+
rc = st->publish_reconnect;
391+
st->publish_reconnect = st->publish_reconnect->next;
392+
ngx_live_put_relay_reconnect(rc);
393+
}
394+
395+
while (st->play_reconnect) {
396+
rc = st->play_reconnect;
397+
st->play_reconnect = st->play_reconnect->next;
398+
ngx_live_put_relay_reconnect(rc);
399+
}
400+
348401
*pst = st->next;
349402
ngx_live_put_stream(st);
350403
--(*psrv)->n_stream;
@@ -405,63 +458,47 @@ ngx_live_delete_ctx(ngx_rtmp_session_t *s)
405458
}
406459
}
407460

408-
#if (NGX_DEBUG)
409-
static void
410-
ngx_live_print_stream(ngx_live_stream_t *st, size_t idx)
411-
{
412-
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
413-
"\t\t%z Stream(%p %s), next:%p", idx, st, st->name, st->next);
414-
}
415461

416-
static void
417-
ngx_live_print_server(ngx_live_server_t *srv, size_t idx)
462+
ngx_chain_t *
463+
ngx_live_state(ngx_http_request_t *r)
418464
{
419465
ngx_live_conf_t *lcf;
420-
ngx_live_stream_t *st;
421-
size_t i;
466+
ngx_chain_t *cl;
467+
ngx_buf_t *b;
468+
size_t len;
422469

423470
lcf = (ngx_live_conf_t *) ngx_get_conf(ngx_cycle->conf_ctx,
424471
ngx_live_module);
425472

426-
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
427-
"\t%z Server(%p %s) n_stream:%ui, deleted:%d, next:%p",
428-
idx, srv, srv->serverid, srv->n_stream, srv->deleted, srv->next);
429-
430-
for (i = 0; i < lcf->stream_buckets; ++i) {
431-
st = srv->streams[i];
432-
while (st) {
433-
ngx_live_print_stream(st, i);
434-
st = st->next;
435-
}
436-
}
437-
}
438-
#endif
439-
440-
void
441-
ngx_live_print()
442-
{
443-
#if (NGX_DEBUG)
444-
ngx_live_conf_t *lcf;
445-
ngx_live_server_t *srv;
446-
size_t i;
447-
448-
lcf = (ngx_live_conf_t *) ngx_get_conf(ngx_cycle->conf_ctx,
449-
ngx_live_module);
450473

451-
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
452-
"free server, alloc %ui, free %ui",
453-
lcf->alloc_server_count, lcf->free_server_count);
474+
len = sizeof("##########ngx live state##########\n") - 1
475+
+ sizeof("ngx_live nalloc server: \n") - 1 + NGX_OFF_T_LEN
476+
+ sizeof("ngx_live nfree server: \n") - 1 + NGX_OFF_T_LEN
477+
+ sizeof("ngx_live nalloc stream: \n") - 1 + NGX_OFF_T_LEN
478+
+ sizeof("ngx_live nfree stream: \n") - 1 + NGX_OFF_T_LEN
479+
+ sizeof("ngx_live nalloc reconnect: \n") - 1 + NGX_OFF_T_LEN
480+
+ sizeof("ngx_live nfree reconnect: \n") - 1 + NGX_OFF_T_LEN;
454481

455-
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
456-
"free stream, alloc %ui, free %ui",
457-
lcf->alloc_stream_count, lcf->free_stream_count);
482+
cl = ngx_alloc_chain_link(r->pool);
483+
if (cl == NULL) {
484+
return NULL;
485+
}
486+
cl->next = NULL;
458487

459-
for (i = 0; i < lcf->server_buckets; ++i) {
460-
srv = lcf->servers[i];
461-
while (srv) {
462-
ngx_live_print_server(srv, i);
463-
srv = srv->next;
464-
}
488+
b = ngx_create_temp_buf(r->pool, len);
489+
if (b == NULL) {
490+
return NULL;
465491
}
466-
#endif
492+
cl->buf = b;
493+
494+
b->last = ngx_snprintf(b->last, len,
495+
"##########ngx live state##########\n"
496+
"ngx_live nalloc server: %ui\nngx_live nfree server: %ui\n"
497+
"ngx_live nalloc stream: %ui\nngx_live nfree stream: %ui\n"
498+
"ngx_live nalloc reconnect: %ui\nngx_live nfree reconnect: %ui\n",
499+
lcf->alloc_server_count, lcf->free_server_count,
500+
lcf->alloc_stream_count, lcf->free_stream_count,
501+
lcf->alloc_reconnect_count, lcf->free_reconnect_count);
502+
503+
return cl;
467504
}

ngx_live.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,29 @@ typedef struct {
1818

1919
ngx_live_server_t *free_server;
2020
ngx_live_stream_t *free_stream;
21+
ngx_relay_reconnect_t *free_reconnect;
2122

2223
ngx_uint_t alloc_server_count;
2324
ngx_uint_t free_server_count;
2425

2526
ngx_uint_t alloc_stream_count;
2627
ngx_uint_t free_stream_count;
2728

29+
ngx_uint_t alloc_reconnect_count;
30+
ngx_uint_t free_reconnect_count;
31+
2832
ngx_pool_t *pool;
2933
} ngx_live_conf_t;
3034

3135

3236
extern ngx_module_t ngx_live_module;
3337

3438

39+
/*
40+
* paras:
41+
* r: http request to query status of rbuf
42+
*/
43+
ngx_chain_t *ngx_live_state(ngx_http_request_t *r);
44+
45+
3546
#endif

ngx_rtmp.h

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,8 +328,9 @@ typedef struct {
328328
#define NGX_LIVE_SERVERID_LEN 512
329329
#define NGX_LIVE_STREAM_LEN 512
330330

331-
typedef struct ngx_rtmp_core_ctx_s ngx_rtmp_core_ctx_t;
332-
typedef struct ngx_rtmp_live_ctx_s ngx_rtmp_live_ctx_t;
331+
typedef struct ngx_rtmp_core_ctx_s ngx_rtmp_core_ctx_t;
332+
typedef struct ngx_rtmp_live_ctx_s ngx_rtmp_live_ctx_t;
333+
typedef struct ngx_relay_reconnect_s ngx_relay_reconnect_t;
333334

334335
struct ngx_rtmp_core_ctx_s {
335336
ngx_rtmp_core_ctx_t *next;
@@ -338,6 +339,15 @@ struct ngx_rtmp_core_ctx_s {
338339
unsigned publishing:1;
339340
};
340341

342+
struct ngx_relay_reconnect_s {
343+
ngx_event_t reconnect;
344+
void *tag;
345+
void *data;
346+
ngx_live_stream_t *live_stream;
347+
348+
ngx_relay_reconnect_t *next;
349+
};
350+
341351
struct ngx_live_stream_s {
342352
u_char name[NGX_LIVE_STREAM_LEN];
343353

@@ -346,6 +356,9 @@ struct ngx_live_stream_s {
346356
ngx_rtmp_core_ctx_t *publish_ctx;
347357
ngx_rtmp_core_ctx_t *play_ctx;
348358

359+
ngx_relay_reconnect_t *publish_reconnect;
360+
ngx_relay_reconnect_t *play_reconnect;
361+
349362
ngx_live_stream_t *next;
350363

351364
/* for relay */
@@ -373,6 +386,9 @@ struct ngx_live_server_s {
373386
ngx_live_stream_t **streams;
374387
};
375388

389+
ngx_relay_reconnect_t *ngx_live_get_relay_reconnect();
390+
void ngx_live_put_relay_reconnect(ngx_relay_reconnect_t *rc);
391+
376392
ngx_live_server_t *ngx_live_create_server(ngx_str_t *serverid);
377393
ngx_live_server_t *ngx_live_fetch_server(ngx_str_t *serverid);
378394
void ngx_live_delete_server(ngx_str_t *serverid);

ngx_rtmp_cmd_module.c

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -505,8 +505,16 @@ ngx_rtmp_cmd_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v)
505505
return NGX_OK;
506506
}
507507

508+
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "closeStream");
509+
508510
s->closed = 1;
511+
512+
if (s->live_stream == NULL) { /* stream before publish or play status */
513+
return NGX_OK;
514+
}
515+
509516
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_core_module);
517+
510518
if (ctx->publishing) {
511519
--s->live_stream->publishers;
512520
} else {
@@ -541,8 +549,6 @@ ngx_rtmp_cmd_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v)
541549
}
542550
}
543551

544-
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "closeStream");
545-
546552
return NGX_OK;
547553
}
548554

0 commit comments

Comments
 (0)