Module: kamailio
Branch: master
Commit: f27df5650cf0e5ac17d7ff25db67b27ad33cc6e3
URL:
https://github.com/kamailio/kamailio/commit/f27df5650cf0e5ac17d7ff25db67b27…
Author: Emmanuel Schmidbauer <emmanuel(a)getweave.com>
Committer: GitHub <noreply(a)github.com>
Date: 2016-12-08T13:44:33-07:00
Merge pull request #884 from kamailio/nsq-max-in-flight
nsq: use max_in_flight value
---
Modified: src/modules/nsq/nsq_mod.c
---
Diff:
https://github.com/kamailio/kamailio/commit/f27df5650cf0e5ac17d7ff25db67b27…
Patch:
https://github.com/kamailio/kamailio/commit/f27df5650cf0e5ac17d7ff25db67b27…
---
diff --git a/src/modules/nsq/nsq_mod.c b/src/modules/nsq/nsq_mod.c
index 61d6334..6c23603 100644
--- a/src/modules/nsq/nsq_mod.c
+++ b/src/modules/nsq/nsq_mod.c
@@ -263,7 +263,7 @@ int set_non_blocking(int fd)
/**
*
*/
-int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel)
+int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel, int
max_in_flight)
{
struct ev_loop *loop;
loop = ev_default_loop(0);
@@ -279,6 +279,7 @@ int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char
*channel)
LM_DBG("NSQ Worker connecting to NSQ Topic [%s] and NSQ Channel [%s]\n",
topic, channel);
// setup the reader
rdr = new_nsq_reader(loop, topic, channel, (void *)ctx, NULL, NULL, NULL,
nsq_message_handler);
+ rdr->max_in_flight = max_in_flight;
if (consumer_use_nsqd == 0) {
snprintf(address, 128, "%.*s", nsq_lookupd_address.len,
nsq_lookupd_address.s);
@@ -300,6 +301,11 @@ static int mod_child_init(int rank)
int pid;
int i;
int workers = dbn_consumer_workers / nsq_topic_channel_counter;
+ int max_in_flight = 1;
+
+ if (nsq_max_in_flight > 1) {
+ max_in_flight = nsq_max_in_flight;
+ }
fire_init_event(rank);
@@ -318,7 +324,7 @@ static int mod_child_init(int rank)
return -1; /* error */
if (pid==0){
close(nsq_worker_pipes_fds[i*2+1]);
- return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], DEFAULT_TOPIC,
DEFAULT_CHANNEL));
+ return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], DEFAULT_TOPIC,
DEFAULT_CHANNEL, max_in_flight));
}
}
} else {
@@ -329,7 +335,7 @@ static int mod_child_init(int rank)
return -1; /* error */
if (pid==0){
close(nsq_worker_pipes_fds[i*2+1]);
- return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], tc->topic,
tc->channel));
+ return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], tc->topic,
tc->channel, max_in_flight));
}
}
tc = tc->next;