Module: kamailio
Branch: master
Commit: a544b14e0bdc331820a505517a52477879734e7c
URL:
https://github.com/kamailio/kamailio/commit/a544b14e0bdc331820a505517a52477879734e7c
Author: Stefan Mititelu <stefan.mititelu(a)net2phone.com>
Committer: Daniel-Constantin Mierla <miconda(a)gmail.com>
Date: 2024-09-18T10:01:10+02:00
kafka: add modparam log_without_overflow
---
Modified: src/modules/kafka/kafka_mod.c
Modified: src/modules/kafka/kfk.c
---
Diff:
https://github.com/kamailio/kamailio/commit/a544b14e0bdc331820a505517a52477879734e7c.diff
Patch:
https://github.com/kamailio/kamailio/commit/a544b14e0bdc331820a505517a52477879734e7c.patch
---
diff --git a/src/modules/kafka/kafka_mod.c b/src/modules/kafka/kafka_mod.c
index e88bd37384b..89f3fcd3c42 100644
--- a/src/modules/kafka/kafka_mod.c
+++ b/src/modules/kafka/kafka_mod.c
@@ -66,6 +66,7 @@ static int w_kafka_send_key(
*/
int child_init_ok = 0;
int init_without_kafka = 0;
+int log_without_overflow = 0;
char *brokers_param = NULL; /**< List of brokers. */
static int kafka_conf_param(modparam_t type, void *val);
static int kafka_topic_param(modparam_t type, void *val);
@@ -86,7 +87,8 @@ static param_export_t params[] = {{"brokers", PARAM_STRING,
&brokers_param},
{"configuration", PARAM_STRING | USE_FUNC_PARAM,
(void *)kafka_conf_param},
{"topic", PARAM_STRING | USE_FUNC_PARAM, (void *)kafka_topic_param},
- {"init_without_kafka", PARAM_INT, &init_without_kafka}, {0, 0, 0}};
+ {"init_without_kafka", PARAM_INT, &init_without_kafka},
+ {"log_without_overflow", PARAM_INT, &log_without_overflow}, {0, 0, 0}};
/**
* \brief Kafka :: Module interface
diff --git a/src/modules/kafka/kfk.c b/src/modules/kafka/kfk.c
index 76a5fea75d9..68014b8b4fb 100644
--- a/src/modules/kafka/kfk.c
+++ b/src/modules/kafka/kfk.c
@@ -38,6 +38,7 @@
extern int child_init_ok;
extern int init_without_kafka;
+extern int log_without_overflow;
/**
* \brief data type for a configuration property.
@@ -124,6 +125,12 @@ static void kfk_logger(
const rd_kafka_t *rk, int level, const char *fac, const char *buf)
{
+ if(log_without_overflow && strstr(buf, "Connection refused") != NULL)
{
+ // libkafka will keep retrying to connect if kafka server is down
+ // FIX: ignore these types of errors not to get overflowed
+ return;
+ }
+
switch(level) {
case LOG_EMERG:
LM_NPRL("RDKAFKA fac: %s : %s : %s\n", fac,
@@ -190,8 +197,14 @@ static void kfk_msg_delivered(
kfk_stats_add(topic_name, rkmessage->err);
if(rkmessage->err) {
- LM_ERR("RDKAFKA Message delivery failed: %s\n",
- rd_kafka_err2str(rkmessage->err));
+ if(log_without_overflow) {
+ // libkafka will log all undelivered msgs as ERR
+ // FIX: ignore these types of errors not to get overflowed; check stats instead
+ ;
+ } else {
+ LM_ERR("RDKAFKA Message delivery failed: %s\n",
+ rd_kafka_err2str(rkmessage->err));
+ }
} else {
LM_DBG("RDKAFKA Message delivered (%zd bytes, offset %" PRId64 ",
"
"partition %" PRId32 "): %.*s\n",
@@ -865,7 +878,11 @@ int kfk_message_send(str *topic_name, str *message, str *key)
NULL)
== -1) {
rd_kafka_resp_err_t err = rd_kafka_last_error();
- LM_ERR("Error sending message: %s\n", rd_kafka_err2str(err));
+ if(!log_without_overflow) {
+ LM_ERR("Error sending message: %s\n", rd_kafka_err2str(err));
+ } else {
+ return 0;
+ }
return -1;
}