-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathrmc_pub_packet.c
115 lines (89 loc) · 3.46 KB
/
rmc_pub_packet.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// Copyright (C) 2018, Jaguar Land Rover
// This program is licensed under the terms and conditions of the
// Mozilla Public License, version 2.0. The full text of the
// Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
//
// Author: Magnus Feuer ([email protected])
#include "rmc_internal.h"
#include "rmc_log.h"
#include <string.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
int rmc_pub_queue_packet(rmc_pub_context_t* ctx,
void* payload,
payload_len_t payload_len,
uint8_t announce_flag)
{
pub_packet_t *pack;
if (!ctx || !payload)
return EINVAL;
if (payload_len > RMC_MAX_PAYLOAD) {
RMC_LOG_ERROR("Oversized packet [%d] bytes. Max size[%d]", payload_len, RMC_MAX_PAYLOAD);
return EMSGSIZE;
}
// Is outbound traffic suspended due to too many packets being in
// flight toward subscribers?
if (rmc_pub_traffic_suspended(ctx) == EBUSY)
return EBUSY;
ctx->traffic_suspended = 0;
pack = pub_next_queued_packet(&ctx->pub_ctx);
// If this is an announcement, force PID to 0.
// If this is a regular packet, let pub.c figure out the next PID to use.
if (announce_flag)
pub_queue_no_acknowledge_packet(&ctx->pub_ctx, payload, payload_len, user_data_nil());
else
pub_queue_packet(&ctx->pub_ctx, payload, payload_len, user_data_nil());
if (ctx->conn_vec.poll_modify) {
// Did we already have a packet pending for send prior
// to queueing the lastest packet? If so, old action
// was POLLWRITE, if not, it was 0.
(*ctx->conn_vec.poll_modify)(ctx->user_data,
ctx->mcast_send_descriptor,
RMC_MULTICAST_INDEX,
(pack?RMC_POLLWRITE:0),
RMC_POLLWRITE);
}
// Is it time to suspend traffic?
if (ctx->traffic_suspend_threshold > 0 &&
pub_get_unacknowledged_packet_count(&ctx->pub_ctx) >= ctx->traffic_suspend_threshold) {
RMC_LOG_INFO("Suspending traffic");
ctx->traffic_suspended = 1;
}
return 0;
}
uint32_t rmc_pub_queue_length(rmc_pub_context_t* ctx)
{
if (!ctx)
return 0;
return pub_queue_size(&ctx->pub_ctx);
}
int rmc_pub_traffic_suspended(rmc_pub_context_t* ctx)
{
if (!ctx)
return EINVAL;
return ctx->traffic_suspended?EBUSY:0;
}
static void _payload_free_generic(void* payload, payload_len_t payload_len, user_data_t user_data)
{
free(payload);
}
int rmc_pub_packet_ack(rmc_pub_context_t* ctx, rmc_connection_t* conn, packet_id_t pid)
{
pub_packet_ack(&ctx->subscribers[conn->connection_index],
pid,
// packet free function invoked when the last subscriber acks the packet.
ctx->payload_free?ctx->payload_free:_payload_free_generic);
// If we currently have suspended traffic,
// check if we are to resume it.
if (ctx->traffic_suspended) {
if (pub_get_unacknowledged_packet_count(&ctx->pub_ctx) <= ctx->traffic_resume_threshold) {
RMC_LOG_INFO("Resuming traffic");
ctx->traffic_suspended = 0;
} else
RMC_LOG_DEBUG("Still suspended unacked[%d] threshold[%d]",
pub_get_unacknowledged_packet_count(&ctx->pub_ctx),
ctx->traffic_resume_threshold);
}
return 0;
}