Module: kamailio
Branch: master
Commit: 741bb148ddf4311679cfa6e379fa8bdbc8fac5e5
URL:
https://github.com/kamailio/kamailio/commit/741bb148ddf4311679cfa6e379fa8bdbc8fac5e5
Author: Stefan Mititelu <stefan.mititelu(a)net2phone.com>
Committer: Daniel-Constantin Mierla <miconda(a)gmail.com>
Date: 2024-09-18T10:01:10+02:00
kafka: add modparam init_without_kafka
---
Modified: src/modules/kafka/kafka_mod.c
Modified: src/modules/kafka/kfk.c
---
Diff:
https://github.com/kamailio/kamailio/commit/741bb148ddf4311679cfa6e379fa8bdbc8fac5e5.diff
Patch:
https://github.com/kamailio/kamailio/commit/741bb148ddf4311679cfa6e379fa8bdbc8fac5e5.patch
---
diff --git a/src/modules/kafka/kafka_mod.c b/src/modules/kafka/kafka_mod.c
index 26369d6ca31..e88bd37384b 100644
--- a/src/modules/kafka/kafka_mod.c
+++ b/src/modules/kafka/kafka_mod.c
@@ -64,6 +64,8 @@ static int w_kafka_send_key(
/*
* Variables and functions to deal with module parameters.
*/
+int child_init_ok = 0;
+int init_without_kafka = 0;
char *brokers_param = NULL; /**< List of brokers. */
static int kafka_conf_param(modparam_t type, void *val);
static int kafka_topic_param(modparam_t type, void *val);
@@ -84,7 +86,7 @@ static param_export_t params[] = {{"brokers", PARAM_STRING,
&brokers_param},
{"configuration", PARAM_STRING | USE_FUNC_PARAM,
(void *)kafka_conf_param},
{"topic", PARAM_STRING | USE_FUNC_PARAM, (void *)kafka_topic_param},
- {0, 0, 0}};
+ {"init_without_kafka", PARAM_INT, &init_without_kafka}, {0, 0, 0}};
/**
* \brief Kafka :: Module interface
@@ -125,9 +127,15 @@ static int child_init(int rank)
if(rank == PROC_INIT || rank == PROC_TCP_MAIN)
return 0;
+ child_init_ok = 1;
if(kfk_init(brokers_param)) {
- LM_ERR("Failed to initialize Kafka\n");
- return -1;
+ child_init_ok = 0;
+ if(init_without_kafka) {
+ LM_ERR("Failed to initialize Kafka - continue\n");
+ } else {
+ LM_ERR("Failed to initialize Kafka\n");
+ return -1;
+ }
}
return 0;
}
diff --git a/src/modules/kafka/kfk.c b/src/modules/kafka/kfk.c
index c6dc8398da2..76a5fea75d9 100644
--- a/src/modules/kafka/kfk.c
+++ b/src/modules/kafka/kfk.c
@@ -36,6 +36,9 @@
#include "../../core/mem/shm_mem.h"
#include "../../core/locking.h"
+extern int child_init_ok;
+extern int init_without_kafka;
+
/**
* \brief data type for a configuration property.
*/
@@ -587,7 +590,9 @@ static int kfk_topic_configure(kfk_topic_t *ktopic)
}
int topic_found = kfk_topic_exist(ktopic->topic_name);
- if(topic_found == -1) {
+ if(init_without_kafka) {
+ ;
+ } else if(topic_found == -1) {
LM_ERR("Failed to search for topic %.*s in cluster\n",
ktopic->topic_name->len, ktopic->topic_name->s);
goto error;
@@ -828,6 +833,12 @@ int kfk_message_send(str *topic_name, str *message, str *key)
/* Get topic from name. */
rd_kafka_topic_t *rkt = kfk_topic_get(topic_name);
+ if(!child_init_ok) {
+ LM_ERR("kafka module is unusable: child init NOT ok! Skip sending "
+ "message, message lost!");
+ return -1;
+ }
+
if(!rkt) {
LM_ERR("Topic not found: %.*s\n", topic_name->len, topic_name->s);
return -1;