<!-- Kamailio Pull Request Template -->
<!--
IMPORTANT:
  - for detailed contributing guidelines, read:
    https://github.com/kamailio/kamailio/blob/master/.github/CONTRIBUTING.md
  - pull requests must be done to master branch, unless they are backports
    of fixes from master branch to a stable branch
  - backports to stable branches must be done with 'git cherry-pick -x ...'
  - code is contributed under BSD for core and main components (tm, sl, auth, tls)
  - code is contributed GPLv2 or a compatible license for the other components
  - GPL code is contributed with OpenSSL licensing exception
-->
#### Pre-Submission Checklist
<!-- Go over all points below, and after creating the PR, tick all the checkboxes that apply -->
<!-- All points should be verified, otherwise, read the CONTRIBUTING guidelines from above-->
<!-- If you're unsure about any of these, don't hesitate to ask on sr-dev mailing list -->
- [x] Commit message has the format required by CONTRIBUTING guide
- [x] Commits are split per component (core, individual modules, libs, utils, ...)
- [x] Each component has a single commit (if not, squash them into one commit)
- [x] No commits to README files for modules (changes must be done to docbook files in `doc/` subfolder, the README file is autogenerated)

#### Type Of Change
- [ ] Small bug fix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds new functionality)
- [ ] Breaking change (fix or feature that would change existing functionality)

#### Checklist:
<!-- Go over all points below, and after creating the PR, tick the checkboxes that apply -->
- [ ] PR should be backported to stable branches
- [x] Tested changes locally
- [ ] Related to issue #XXXX (replace XXXX with an open issue number)

#### Description
<!-- Describe your changes in detail -->
Module to produce and send messages to a Kafka server using the librdkafka library.
Part of the README file is copied below:
Chapter 1. Admin Guide

Table of Contents

1. Overview
2. Dependencies
   2.1. Kamailio Modules
   2.2. External Libraries or Applications
   2.3. Parameters
        2.3.1. brokers (string)
        2.3.2. configuration (string)
        2.3.3. topic (string)
   2.4. Functions
        2.4.1. kafka_send(topic, msg)
   2.5. RPC Commands
        2.5.1. kafka.stats
        2.5.2. kafka.stats_topic
1. Overview
This module produces and sends messages to a Kafka server.
2. Dependencies
2.1. Kamailio Modules
The following modules must be loaded before this module:
* none.
2.2. External Libraries or Applications
The following libraries or applications must be installed before running Kamailio with this module loaded:
* librdkafka: the Apache Kafka C/C++ client library. https://github.com/edenhill/librdkafka
2.3. Parameters
2.3.1. brokers (string)
Specifies a list of brokers separated by commas.
From librdkafka documentation:
brokerlist is a ,-separated list of brokers in the format: <broker1>,<broker2>,
Where each broker is in either the host or URL based format:
* <host>[:<port>]
* <proto>://<host>[:port]
<proto> is either PLAINTEXT, SSL, SASL, SASL_PLAINTEXT
The two formats can be mixed but ultimately the value of the security.protocol config property decides what brokers are allowed.
This parameter is mandatory. There is no default value.
Example 1.1. Set brokers parameter
...
modparam("kafka", "brokers", "localhost:9092")
modparam("kafka", "brokers", "broker1:10000,broker2")
modparam("kafka", "brokers", "SSL://broker3:9000,ssl://broker2")
...
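To make the two broker formats above concrete, here is a minimal sketch in C that splits a single broker entry into protocol, host and port. It is an illustration only: the module hands the broker list to librdkafka, which does its own parsing, and the `broker_t` type and `parse_broker` function are hypothetical names that do not appear in the pull request. The sketch assumes host names or IPv4 addresses (no bracketed IPv6 literals).

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical container for one parsed broker entry. */
typedef struct {
    char proto[32];  /* empty when no "<proto>://" prefix is present */
    char host[128];
    int port;        /* 0 when no port is given */
} broker_t;

/*
 * Parse "<host>[:<port>]" or "<proto>://<host>[:<port>]".
 * Returns 0 on success, -1 on malformed input.
 */
static int parse_broker(const char *entry, broker_t *out)
{
    assert(entry != NULL && out != NULL);
    memset(out, 0, sizeof(*out)); /* also guarantees NUL termination */

    const char *host = entry;
    const char *sep = strstr(entry, "://");
    if (sep != NULL) {
        size_t plen = (size_t)(sep - entry);
        if (plen == 0 || plen >= sizeof(out->proto))
            return -1;
        memcpy(out->proto, entry, plen);
        host = sep + 3;
    }

    /* The port, if any, follows the last ':' of the host part. */
    const char *colon = strrchr(host, ':');
    size_t hlen = colon ? (size_t)(colon - host) : strlen(host);
    if (hlen == 0 || hlen >= sizeof(out->host))
        return -1;
    memcpy(out->host, host, hlen);

    if (colon != NULL) {
        char *end = NULL;
        long port = strtol(colon + 1, &end, 10);
        if (end == colon + 1 || *end != '\0' || port < 1 || port > 65535)
            return -1;
        out->port = (int)port;
    }
    return 0;
}
```

With this sketch, "SSL://broker3:9000" yields protocol "SSL", host "broker3" and port 9000, while "broker2" yields an empty protocol and port 0, leaving the default port to the client library.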
2.3.2. configuration (string)
Specifies a set of general properties.
Each configuration property follows the name = value pattern. Configuration properties are separated by semicolons (;).
This parameter is optional, but if it exists it can be configured only once.
Example 1.2. Set configuration parameter
...
modparam("kafka", "configuration", "topic.metadata.refresh.interval.ms=20000;queue.buffering.max.messages=1000000;metadata.request.timeout.ms=90000")

modparam("kafka", "configuration", "topic.metadata.refresh.interval.ms=20000;queue.buffering.max.messages=500000;debug=all;metadata.request.timeout.ms=900000")
...
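As an illustration of the name = value pattern with semicolon separators, the following C sketch splits such a string into pairs. It is not the parser used in kfk.c; `prop_t`, `parse_props` and the size limits are assumptions made for this example, and whitespace around names and values is not trimmed.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_PROPS 16
#define MAX_LEN 128

/* One parsed "name=value" pair (hypothetical type, not from the module). */
typedef struct {
    char name[MAX_LEN];
    char value[MAX_LEN];
} prop_t;

/*
 * Split "n1=v1;n2=v2" into props[].
 * Returns the number of pairs parsed, or -1 on malformed input
 * (missing '=', empty name, overlong token, or too many pairs).
 */
static int parse_props(const char *spec, prop_t *props, size_t max_props)
{
    assert(spec != NULL && props != NULL);

    size_t count = 0;
    const char *p = spec;

    while (*p != '\0') {
        /* Length of the current token, up to the next ';' or the end. */
        size_t tok_len = strcspn(p, ";");

        if (tok_len > 0) { /* empty tokens (e.g. a trailing ';') are skipped */
            const char *eq = memchr(p, '=', tok_len);
            if (eq == NULL || eq == p || count >= max_props)
                return -1;

            size_t name_len = (size_t)(eq - p);
            size_t value_len = tok_len - name_len - 1;
            if (name_len >= MAX_LEN || value_len >= MAX_LEN)
                return -1;

            memcpy(props[count].name, p, name_len);
            props[count].name[name_len] = '\0';
            memcpy(props[count].value, eq + 1, value_len);
            props[count].value[value_len] = '\0';
            count++;
        }

        p += tok_len;
        if (*p == ';')
            p++;
    }
    return (int)count;
}
```

In a real module each pair would then be handed to the client library (librdkafka provides `rd_kafka_conf_set` for this purpose); the sketch stops at the splitting step.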
2.3.3. topic (string)
Specifies a topic name and a set of topic properties.
The topic defined in topic parameter has to already exist in Kafka servers.
Topic properties are given as a list of attribute = value pairs separated by semicolons.
The name attribute indicates the topic name. It is mandatory. Other attributes are names of topic properties and are optional.
This parameter is optional. Each topic needs a topic parameter so several topic parameters are allowed.
Example 1.3. Set topic parameter
...
modparam("kafka", "topic", "name=my_topic;request.required.acks=0;request.timeout.ms=10000")
modparam("kafka", "topic", "name=second_topic;request.required.acks=0;request.timeout.ms=10000")
modparam("kafka", "topic", "name=third_topic")
...
2.4. Functions
2.4.1. kafka_send(topic, msg)
Send a message to a specific topic via Kafka server.
Parameters:
* topic: (string) name of the topic. It is mandatory.
* msg: (string) message to send. It is mandatory.
Available via KEMI framework as kafka.send.
Example 1.4. kafka_send usage
...
# Send "test message" to topic "my_topic"
kafka_send("my_topic", "test message");
...
2.5. RPC Commands
2.5.1. kafka.stats
Show statistics about the total number of sent messages and the number of messages that failed to be delivered.
Example 1.5. kafka.stats usage
...
kamcmd kafka.stats
Total messages: 26 Errors: 0
...
2.5.2. kafka.stats_topic
Show statistics about sent messages and messages that failed to be delivered for a specific topic.
Parameter: topic (string) name of the topic. Required.
Example 1.6. kafka.stats_topic usage
...
# Show statistics for my_topic.
kamcmd kafka.stats_topic "my_topic"
Topic: my_topic Total messages: 17 Errors: 0
...
You can view, comment on, or merge this pull request online at:
https://github.com/kamailio/kamailio/pull/2112
-- Commit Summary --
* kafka: module to produce and send messages to a Kafka server
-- File Changes --
A src/modules/kafka/Makefile (23)
A src/modules/kafka/doc/Makefile (4)
A src/modules/kafka/doc/kafka.xml (41)
A src/modules/kafka/doc/kafka_admin.xml (221)
A src/modules/kafka/kafka_mod.c (335)
A src/modules/kafka/kfk.c (1175)
A src/modules/kafka/kfk.h (96)
-- Patch Links --
https://github.com/kamailio/kamailio/pull/2112.patch
https://github.com/kamailio/kamailio/pull/2112.diff
henningw commented on this pull request.
Thank you for the pull request! It generally looks fine: well documented, and doxygen docs were also added to many functions, great.
I've added a few comments/remarks, nothing serious but would be great if you could have a look.
break;
+
+    case LOG_CRIT:
+        LM_CRIT("RDKAFKA fac: %s : %s : %s\n",
+                fac, rk ? rd_kafka_name(rk) : NULL,
+                buf);
+        break;
+
+    case LOG_ERR:
+        LM_ERR("RDKAFKA fac: %s : %s : %s\n",
+                fac, rk ? rd_kafka_name(rk) : NULL,
+                buf);
+        break;
+
+    case LOG_WARNING:
+    case LOG_NOTICE:
There is also a LM_NOTICE - might be good to add it as well, in case of somebody extending the module later on.
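The mapping under discussion can be sketched as a small standalone C function. This is not the module's `kfk_logger`: it only returns the name of the Kamailio log macro that each syslog level could be routed to, with LOG_NOTICE given its own case as suggested. The function name `kamailio_macro_for` is hypothetical, and the choices for LOG_EMERG, LOG_ALERT and unknown levels are assumptions, since those cases are not visible in the quoted hunk.

```c
#include <syslog.h>

/*
 * Return the name of the Kamailio log macro a syslog level would be
 * routed to. Levels come from <syslog.h> (0 == LOG_EMERG .. 7 == LOG_DEBUG).
 */
static const char *kamailio_macro_for(int level)
{
    switch (level) {
    case LOG_EMERG:   /* assumption: folded into the alert level */
    case LOG_ALERT:   return "LM_ALERT";
    case LOG_CRIT:    return "LM_CRIT";
    case LOG_ERR:     return "LM_ERR";
    case LOG_WARNING: return "LM_WARN";
    case LOG_NOTICE:  return "LM_NOTICE"; /* separate case, as suggested */
    case LOG_INFO:    return "LM_INFO";
    case LOG_DEBUG:   return "LM_DBG";
    default:          return "LM_ERR";    /* assumption: unknown levels are errors */
    }
}
```

Keeping one case per level makes it easy to adjust a single mapping later without touching the others.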
- */
+int kfk_init(char *brokers)
+{
+    LM_DBG("Initializing Kafka\n");
+
+    if (brokers == NULL) {
+        LM_ERR("brokers parameter not set\n");
+        return -1;
+    }
+
+    /*
+     * Create Kafka client configuration place-holder
+     */
+    rk_conf = rd_kafka_conf_new();
+
+    /* Log level range 0-7 7 == LOG_DEBUG */
Consider removing this debugging code before merging into git master.
- LM_DBG("Initializing statistics\n");
+
+    stats_lock = lock_alloc();
+    if (!stats_lock) {
+        LM_ERR("Cannot allocate stats lock\n");
+        return -1;
+    }
+
+    if(lock_init(stats_lock) == NULL) {
+        LM_ERR("cannot init stats lock\n");
+        lock_dealloc(stats_lock);
+        stats_lock = NULL;
+        return -1;
+    }
+
+    stats_general = shm_malloc(sizeof(kfk_stats_t));
Where do you shm_free this stats_general memory? Or is it freed together with the other elements?
<para>
+            <emphasis>none</emphasis>.
+            </para>
+            </listitem>
+        </itemizedlist>
+        </para>
+    </section>
+    <section>
+        <title>External Libraries or Applications</title>
+        <para>
+        The following libraries or applications must be installed before running
+        &kamailio; with this module loaded:
+        <itemizedlist>
+            <listitem>
+            <para>
+            <emphasis>librdkafka</emphasis>: the Apache Kafka C/C++ client library.
Do you require a specific version of the library?
<title>Set <varname>topic</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("kafka", "topic", "name=my_topic;request.required.acks=0;request.timeout.ms=10000")
+modparam("kafka", "topic", "name=second_topic;request.required.acks=0;request.timeout.ms=10000")
+modparam("kafka", "topic", "name=third_topic")
+...
+        </programlisting>
+        </example>
+    </section>
+    </section>
+    <section>
+    <title>Functions</title>
+    <section id="kafka.f.kafka_send">
+        <title>
+        <function moreinfo="none">kafka_send(topic, msg)</function>
It would be good to add a sentence about the return code of the function, e.g. that it returns -1 for all failures.
- shared among every Kamailio process.
+ */
+static kfk_stats_t *stats_general;
+
+/* Static functions. */
+static void kfk_conf_free(kfk_conf_t *kconf);
+static void kfk_topic_free(kfk_topic_t *ktopic);
+static int kfk_conf_configure();
+static int kfk_topic_list_configure();
+static int kfk_topic_exist(str *topic_name);
+static rd_kafka_topic_t* kfk_topic_get(str *topic_name);
+static int kfk_stats_add(const char *topic, rd_kafka_resp_err_t err);
+static void kfk_stats_topic_free(kfk_stats_t *st_topic);
+
+/**
+ * \name Logging_macros from /usr/include/sys/syslog.h
Wouldn't it make sense to just include the mentioned file? Just wondering.
+/**
+ * \name Logging_macros from /usr/include/sys/syslog.h
+ */
+/*@{*/
+#define LOG_EMERG   0 /* system is unusable */
+#define LOG_ALERT   1 /* action must be taken immediately */
+#define LOG_CRIT    2 /* critical conditions */
+#define LOG_ERR     3 /* error conditions */
+#define LOG_WARNING 4 /* warning conditions */
+#define LOG_NOTICE  5 /* normal but significant condition */
+#define LOG_INFO    6 /* informational */
+#define LOG_DEBUG   7 /* debug-level messages */
+/*@}*/
+
+/**
+ * \brief Kafka logger callback (optional)
You mention that this callback is optional, but you always set it. Maybe adapt the comment.
@vhernando pushed 2 commits.
f5539642dfda861e884283da3adb83338979e7b6 kafka: associate LOG_NOTICE to LM_NOTICE in kfk_logger
a58e2f8bf7912f8c1d756cda346e0665edce9122 kafka: delete some debug code
vhernando commented on this pull request.
- LM_DBG("Initializing statistics\n");
+
+    stats_lock = lock_alloc();
+    if (!stats_lock) {
+        LM_ERR("Cannot allocate stats lock\n");
+        return -1;
+    }
+
+    if(lock_init(stats_lock) == NULL) {
+        LM_ERR("cannot init stats lock\n");
+        lock_dealloc(stats_lock);
+        stats_lock = NULL;
+        return -1;
+    }
+
+    stats_general = shm_malloc(sizeof(kfk_stats_t));
Hi Henning! The stats_general memory is freed in the kfk_stats_close function, node by node, by kfk_stats_topic_free.
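The node-by-node release described here follows the usual pattern for freeing a singly linked list. The sketch below shows that pattern in plain C under stated assumptions: it uses `malloc`/`free` instead of Kamailio's shared memory allocator, and `stats_node_t`, `stats_node_new` and `stats_list_free` are hypothetical names, not the `kfk_stats_t` layout or the functions from kfk.c.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical statistics node: the head holds the general counters,
 * the following nodes hold per-topic counters. */
typedef struct stats_node {
    char *topic_name;        /* NULL for the general (head) node */
    unsigned long total;
    unsigned long error;
    struct stats_node *next;
} stats_node_t;

/* Allocate a zeroed node, copying the topic name if one is given. */
static stats_node_t *stats_node_new(const char *topic)
{
    stats_node_t *node = calloc(1, sizeof(*node));
    if (node == NULL)
        return NULL;
    if (topic != NULL) {
        size_t len = strlen(topic) + 1;
        node->topic_name = malloc(len);
        if (node->topic_name == NULL) {
            free(node);
            return NULL;
        }
        memcpy(node->topic_name, topic, len);
    }
    return node;
}

/* Free the whole list node by node; returns how many nodes were freed. */
static size_t stats_list_free(stats_node_t *head)
{
    size_t freed = 0;
    while (head != NULL) {
        stats_node_t *next = head->next; /* save the link before freeing */
        free(head->topic_name);          /* free(NULL) is a no-op */
        free(head);
        head = next;
        freed++;
    }
    return freed;
}
```

Because the head node is part of the list, releasing the list this way also releases the general statistics entry, so no separate free call is needed for it.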
vhernando commented on this pull request.
<para>
+            <emphasis>none</emphasis>.
+            </para>
+            </listitem>
+        </itemizedlist>
+        </para>
+    </section>
+    <section>
+        <title>External Libraries or Applications</title>
+        <para>
+        The following libraries or applications must be installed before running
+        &kamailio; with this module loaded:
+        <itemizedlist>
+            <listitem>
+            <para>
+            <emphasis>librdkafka</emphasis>: the Apache Kafka C/C++ client library.
I tested the module against librdkafka version 1.1.0. The current stable version is 1.2.1 (https://github.com/edenhill/librdkafka/releases). It should also work with some older releases.
@vhernando pushed 1 commit.
3052e563d80dd96de880bc68eed8f06bf6f6e753 kafka: delete optional word in kfk_logger function
@vhernando pushed 1 commit.
95dd5245ce2499f97ade9cf3f24b2ee52690880c kafka: get log levels from syslog.h header
@vhernando pushed 1 commit.
fa9228b45b29e29b807d03e267d2cb328b882b3d kafka: comment about error return code for kafka_send function
henningw commented on this pull request.
- LM_DBG("Initializing statistics\n");
+
+    stats_lock = lock_alloc();
+    if (!stats_lock) {
+        LM_ERR("Cannot allocate stats lock\n");
+        return -1;
+    }
+
+    if(lock_init(stats_lock) == NULL) {
+        LM_ERR("cannot init stats lock\n");
+        lock_dealloc(stats_lock);
+        stats_lock = NULL;
+        return -1;
+    }
+
+    stats_general = shm_malloc(sizeof(kfk_stats_t));
Thank you
henningw commented on this pull request.
<para>
+            <emphasis>none</emphasis>.
+            </para>
+            </listitem>
+        </itemizedlist>
+        </para>
+    </section>
+    <section>
+        <title>External Libraries or Applications</title>
+        <para>
+        The following libraries or applications must be installed before running
+        &kamailio; with this module loaded:
+        <itemizedlist>
+            <listitem>
+            <para>
+            <emphasis>librdkafka</emphasis>: the Apache Kafka C/C++ client library.
Ok, it would probably be good to add a sentence about this to the README, to help our packagers and users of the module.
@vhernando pushed 1 commit.
4ae09ec3b1348df7c93c4a0e0f25e65acb223b49 kafka: comment about suitable versions of librdkafka library
Thanks for the changes. I think you can merge it to git master now. If there are more changes or comments from other developers, they can be handled directly in master.
Merged #2112 into master.