Module: kamailio
Branch: master
Commit: a850af45acc4a433706120110d1cb91cd885b33b
URL:
https://github.com/kamailio/kamailio/commit/a850af45acc4a433706120110d1cb91…
Author: Emmanuel Schmidbauer <emmanuel(a)getweave.com>
Committer: GitHub <noreply(a)github.com>
Date: 2017-02-09T16:06:40-05:00
Merge pull request #982 from kamailio/NSQ-child-process-rank
nsq: do not use PROC_SIPINIT rank for consumer workers
---
Modified: src/modules/nsq/nsq_mod.c
Modified: src/modules/nsq/nsq_mod.h
---
Diff:
https://github.com/kamailio/kamailio/commit/a850af45acc4a433706120110d1cb91…
Patch:
https://github.com/kamailio/kamailio/commit/a850af45acc4a433706120110d1cb91…
---
diff --git a/src/modules/nsq/nsq_mod.c b/src/modules/nsq/nsq_mod.c
index cf6c978..12a9d69 100644
--- a/src/modules/nsq/nsq_mod.c
+++ b/src/modules/nsq/nsq_mod.c
@@ -159,7 +159,6 @@ static int fire_init_event(int rank)
static int mod_init(void)
{
- int i;
startup_time = (int) time(NULL);
if (dbn_pua_mode == 1) {
@@ -208,26 +207,6 @@ static int mod_init(void)
register_procs(total_workers);
cfg_register_child(total_workers);
- if (pipe(nsq_cmd_pipe_fds) < 0) {
- LM_ERR("cmd pipe() failed\n");
- return -1;
- }
-
- nsq_worker_pipes_fds = (int*) shm_malloc(sizeof(int) * (dbn_consumer_workers) * 2 );
- nsq_worker_pipes = (int*) shm_malloc(sizeof(int) * dbn_consumer_workers);
- for (i=0; i < dbn_consumer_workers; i++) {
- nsq_worker_pipes_fds[i*2] = nsq_worker_pipes_fds[i*2+1] = -1;
- if (pipe(&nsq_worker_pipes_fds[i*2]) < 0) {
- LM_ERR("worker pipe(%d) failed\n", i);
- return -1;
- }
- }
-
- nsq_cmd_pipe = nsq_cmd_pipe_fds[1];
- for (i=0; i < dbn_consumer_workers; i++) {
- nsq_worker_pipes[i] = nsq_worker_pipes_fds[i*2+1];
- }
-
return 0;
}
@@ -240,25 +219,10 @@ int mod_register(char *path, int *dlflags, void *p1, void *p2)
return register_trans_mod(path, mod_trans);
}
-
-int set_non_blocking(int fd)
-{
- int flags;
-
- flags = fcntl(fd, F_GETFL);
- if (flags < 0)
- return flags;
- flags |= O_NONBLOCK;
- if (fcntl(fd, F_SETFL, flags) < 0)
- return -1;
-
- return 0;
-}
-
/**
*
*/
-int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel, int
max_in_flight)
+void nsq_consumer_worker_proc(char *topic, char *channel, int max_in_flight)
{
struct ev_loop *loop;
loop = ev_default_loop(0);
@@ -269,7 +233,6 @@ int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel,
int max_i
if (loop == NULL) {
LM_ERR("cannot get libev loop\n");
}
- set_non_blocking(cmd_pipe);
LM_DBG("NSQ Worker connecting to NSQ Topic [%s] and NSQ Channel [%s]\n",
topic, channel);
// setup the reader
@@ -285,7 +248,6 @@ int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel,
int max_i
}
nsq_run(loop);
- return 0;
}
/**
@@ -318,8 +280,10 @@ static int mod_child_init(int rank)
if (pid<0)
return -1; /* error */
if (pid==0){
- close(nsq_worker_pipes_fds[i*2+1]);
- return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], DEFAULT_TOPIC,
DEFAULT_CHANNEL, max_in_flight));
+ if (cfg_child_init()) return -1;
+ nsq_consumer_worker_proc(DEFAULT_TOPIC, DEFAULT_CHANNEL, max_in_flight);
+ LM_CRIT("nsq_consumer_worker_proc():: worker_process finished without
exit!\n");
+ exit(-1);
}
}
} else {
@@ -329,8 +293,10 @@ static int mod_child_init(int rank)
if (pid<0)
return -1; /* error */
if (pid==0){
- close(nsq_worker_pipes_fds[i*2+1]);
- return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], tc->topic,
tc->channel, max_in_flight));
+ if (cfg_child_init()) return -1;
+ nsq_consumer_worker_proc(tc->topic, tc->channel, max_in_flight);
+ LM_CRIT("nsq_consumer_worker_proc():: worker_process finished without
exit!\n");
+ exit(-1);
}
}
tc = tc->next;
@@ -367,6 +333,4 @@ static int mod_child_init(int rank)
*/
static void mod_destroy(void) {
free_tc_list(tc_list);
- shm_free(nsq_worker_pipes_fds);
- shm_free(nsq_worker_pipes);
}
diff --git a/src/modules/nsq/nsq_mod.h b/src/modules/nsq/nsq_mod.h
index bbbc32c..72323a4 100644
--- a/src/modules/nsq/nsq_mod.h
+++ b/src/modules/nsq/nsq_mod.h
@@ -72,10 +72,6 @@ char nsq_json_escape_char = '%';
int nsq_topic_channel_counter = 0;
int dbn_consumer_workers = DBN_DEFAULT_NO_WORKERS;
int startup_time = 0;
-int *nsq_worker_pipes_fds = NULL;
-int *nsq_worker_pipes = NULL;
-int nsq_cmd_pipe = 0;
-int nsq_cmd_pipe_fds[2] = {-1,-1};
/* database connection */
db1_con_t *nsq_pa_db = NULL;