Skip to content

Commit

Permalink
Improvements (#4)
Browse files Browse the repository at this point in the history
* authentication + fixed topic support
  • Loading branch information
kingster authored Apr 30, 2022
1 parent ae643cb commit 7c3beaf
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 17 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ MODCFLAGS = -Wall -Werror
MODLDFLAGS = -lssl

CXX = g++
CXXFLAGS = -fPIC -g -ggdb -I/usr/include `pkg-config --cflags freeswitch` $(MODCFLAGS) -std=c++0x
CXXFLAGS = -fPIC -g -ggdb -I/usr/include `pkg-config --cflags freeswitch` $(MODCFLAGS) -std=c++17 -fpermissive -o2
LDFLAGS = `pkg-config --libs freeswitch` -lrdkafka -lz -lpthread -lrt $(MODLDFLAGS)

.PHONY: all
Expand All @@ -30,4 +30,4 @@ install: $(MODNAME)

.PHONY: release
release: $(MODNAME)
distribution/make-deb.sh
distribution/make-deb.sh
4 changes: 2 additions & 2 deletions distribution/make-deb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ FS_VERSION=$(dpkg -s libfreeswitch-dev | grep "Version" | cut -d ' ' -f 2)

sed -i "s/_VERSION_/$VERSION/g" $BUILD_ROOT/DEBIAN/control
sed -i "s/_FSVERSION_/$FS_VERSION/g" $BUILD_ROOT/DEBIAN/control
dpkg-deb --build $BUILD_ROOT freeswitch-mod-event-kafka.deb
dpkg-deb -Zgzip --build $BUILD_ROOT freeswitch-mod-event-kafka.deb

rm -rf $BUILD_ROOT
rm -rf $BUILD_ROOT
5 changes: 4 additions & 1 deletion event_kafka.conf.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
<configuration name="event_kafka.conf" description="Kafka Event Configuration">
<settings>
<param name="bootstrap-servers" value="localhost:9092"/>
<param name="topic-prefix" value="topic_name"/>
<param name="topic-prefix" value="topic_prefix"/>
<param name="topic" value="" /> <!-- set either topic-prefix or topic, incase both are defiend topic value would be used. -->
<param name="username" value="" /> <!-- set it only if you have sasl enabled on your kafka cluster -->
<param name="password" value="" />
<param name="buffer-size" value="16" />
</settings>
</configuration>
62 changes: 51 additions & 11 deletions mod_event_kafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,19 @@

namespace mod_event_kafka {

template <typename T, std::size_t Muliplier = 1>
T* malloc_new()
{
return static_cast<T*>(std::malloc(sizeof(T) * Muliplier));
}

static switch_xml_config_item_t instructions[] = {
SWITCH_CONFIG_ITEM("bootstrap-servers", SWITCH_CONFIG_STRING, CONFIG_RELOADABLE, &globals.brokers,
"localhost:9092", NULL, "bootstrap-servers", "Kafka Bootstrap Brokers"),
SWITCH_CONFIG_ITEM("username", SWITCH_CONFIG_STRING, CONFIG_RELOADABLE, &globals.username, "", NULL, "username", "Username"),
SWITCH_CONFIG_ITEM("password", SWITCH_CONFIG_STRING, CONFIG_RELOADABLE, &globals.password, "", NULL, "password", "Password"),
SWITCH_CONFIG_ITEM("topic", SWITCH_CONFIG_STRING, CONFIG_RELOADABLE, &globals.topic,
"", NULL, "topic", "Kafka Topic"),
SWITCH_CONFIG_ITEM("topic-prefix", SWITCH_CONFIG_STRING, CONFIG_RELOADABLE, &globals.topic_prefix,
"fs", NULL, "topic-prefix", "Kafka Topic Prefix"),
SWITCH_CONFIG_ITEM("buffer-size", SWITCH_CONFIG_INT, CONFIG_RELOADABLE, &globals.buffer_size,
Expand All @@ -57,7 +67,7 @@ namespace mod_event_kafka {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Could not open event_kafka.conf\n");
return SWITCH_STATUS_FALSE;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "event_kafka.conf loaded [brokers: %s, prefix: %s, buffer-size: %d]", globals.brokers, globals.topic_prefix, globals.buffer_size);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "event_kafka.conf loaded [brokers: %s, topic/topic_prefix: %s/%s, username: %s, buffer-size: %d]", globals.brokers, globals.topic, globals.topic_prefix, globals.username, globals.buffer_size);
}
return SWITCH_STATUS_SUCCESS;
}
Expand Down Expand Up @@ -91,32 +101,62 @@ namespace mod_event_kafka {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, errstr);
}

if (globals.username && globals.username[0] != '\0') {
//username is set, set authentication params
if (rd_kafka_conf_set(conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, errstr);
}

if (rd_kafka_conf_set(conf, "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, errstr);
}

if (rd_kafka_conf_set(conf, "sasl.username", globals.username, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, errstr);
}

if (rd_kafka_conf_set(conf, "sasl.password", globals.password, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, errstr);
}
}

rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!producer) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create new producer: %s \n", errstr);
}

std::string topic_str = std::string(globals.topic_prefix) + "_" + std::string(switch_core_get_switchname());
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "KafkaEventPublisher Topic : %s \n", topic_str.c_str());
//build topic if not defined
if (globals.topic && globals.topic[0] == '\0') {
std::string topic_str = std::string(globals.topic_prefix) + "_" + std::string(switch_core_get_switchname());
strcpy(globals.topic, topic_str.c_str());
}

switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "KafkaEventPublisher Topic : %s \n", globals.topic);

rd_kafka_topic_conf_t *tconf = rd_kafka_topic_conf_new();
rd_kafka_topic_conf_set(tconf, "message.timeout.ms", "30000", NULL, 0);

topic = rd_kafka_topic_new(producer, topic_str.c_str(), NULL);
topic = rd_kafka_topic_new(producer, globals.topic, NULL);
if (!topic) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create topic %s object: %s \n", topic_str.c_str(), rd_kafka_err2str(rd_kafka_last_error()));
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create topic %s object: %s \n", globals.topic, rd_kafka_err2str(rd_kafka_last_error()));
}

_initialized = 1;
_initialized = true;
}

void PublishEvent(switch_event_t *event) {

char *uuid = switch_event_get_header(event, "Channel-Call-UUID");
char *event_json = (char*)malloc(sizeof(char));
switch_event_serialize_json(event, &event_json);
char *event_json = malloc_new<char>();

const switch_status_t json_status = switch_event_serialize_json(event, &event_json);
if (json_status == SWITCH_STATUS_FALSE) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "json serialization failed in switch\n");
std::free(event_json);
return;
}

if(_initialized){
int resp = send(event_json, uuid ,0);
Expand All @@ -130,7 +170,7 @@ namespace mod_event_kafka {
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "PublishEvent without active KafkaPublisher\n %s \n",event_json);
delete uuid;
delete event_json;
std::free(event_json);
}
}

Expand Down Expand Up @@ -195,7 +235,7 @@ namespace mod_event_kafka {
}

int max_retry_limit = 3;
bool _initialized = 0;
bool _initialized = false;

rd_kafka_t *producer;
rd_kafka_topic_t *topic;
Expand Down Expand Up @@ -269,7 +309,7 @@ namespace mod_event_kafka {
//*****************************//
SWITCH_MODULE_LOAD_FUNCTION(mod_event_kafka_load) {
try {
module.reset(new KafkaModule(module_interface, pool));
module = std::make_unique<KafkaModule>(module_interface, pool);
return SWITCH_STATUS_SUCCESS;
} catch(...) { // Exceptions must not propogate to C caller
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error loading Kafka Event module\n");
Expand Down
5 changes: 4 additions & 1 deletion mod_event_kafka.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ namespace mod_event_kafka {
static struct {
char *brokers;
char *topic_prefix;
char *topic;
char *username;
char *password;
int buffer_size;
} globals;

Expand All @@ -21,4 +24,4 @@ namespace mod_event_kafka {
};
};

#endif // MOD_EVENT_KAFKA_H
#endif // MOD_EVENT_KAFKA_H

0 comments on commit 7c3beaf

Please sign in to comment.