[sr-dev] git:master:67db972a: kazoo : fix, send timeout callback to consumer process

Luis Azedo luis.azedo at factorlusitano.com
Wed Jul 1 17:00:15 CEST 2015


Module: kamailio
Branch: master
Commit: 67db972a129a8f34ea7406618593df4eaf846a1b
URL: https://github.com/kamailio/kamailio/commit/67db972a129a8f34ea7406618593df4eaf846a1b

Author: Luis Azedo <luis at 2600hz.com>
Committer: Luis Azedo <luis.azedo at factorlusitano.com>
Date: 2015-07-01T15:59:57+01:00

kazoo : fix, send timeout callback to consumer process

---

Modified: modules/kazoo/kz_amqp.c

---

Diff:  https://github.com/kamailio/kamailio/commit/67db972a129a8f34ea7406618593df4eaf846a1b.diff
Patch: https://github.com/kamailio/kamailio/commit/67db972a129a8f34ea7406618593df4eaf846a1b.patch

---

diff --git a/modules/kazoo/kz_amqp.c b/modules/kazoo/kz_amqp.c
index 96227c0..06e1599 100644
--- a/modules/kazoo/kz_amqp.c
+++ b/modules/kazoo/kz_amqp.c
@@ -2034,7 +2034,32 @@ void kz_amqp_cb_error(kz_amqp_cmd_ptr cmd)
 	int n = route_get(&main_rt, cmd->err_route);
 	struct action *a = main_rt.rlist[n];
 	tmb.t_continue(cmd->t_hash, cmd->t_label, a);
-	kz_amqp_free_pipe_cmd(cmd);
+}
+
+int kz_send_worker_error_event(kz_amqp_cmd_ptr cmd)
+{
+	cmd->return_code = -1;
+	kz_amqp_consumer_delivery_ptr ptr = (kz_amqp_consumer_delivery_ptr) shm_malloc(sizeof(kz_amqp_consumer_delivery));
+	if(ptr == NULL) {
+		LM_ERR("NO MORE SHARED MEMORY!");
+		return 0;
+	}
+	memset(ptr, 0, sizeof(kz_amqp_consumer_delivery));
+	ptr->cmd = cmd;
+
+	consumer++;
+	if(consumer >= dbk_consumer_processes) {
+		consumer = 0;
+	}
+
+	if (write(kz_worker_pipes[consumer], &ptr, sizeof(ptr)) != sizeof(ptr)) {
+		LM_ERR("failed to send payload to consumer %d : %s\nPayload %s\n", consumer, strerror(errno), cmd->payload);
+		kz_amqp_free_consumer_delivery(ptr);
+		return 0;
+	}
+
+	return 1;
+
 }
 
 void kz_amqp_cmd_timeout_cb(int fd, short event, void *arg)
@@ -2047,7 +2072,7 @@ void kz_amqp_cmd_timeout_cb(int fd, short event, void *arg)
 				, retrieved_cmd ->message_id->len, retrieved_cmd ->message_id->s
 				);
 		if(retrieved_cmd->type == KZ_AMQP_CMD_ASYNC_CALL) {
-			kz_amqp_cb_error(retrieved_cmd);
+			kz_send_worker_error_event(retrieved_cmd);
 		} else {
 			retrieved_cmd->return_code = -1;
 			lock_release(&retrieved_cmd->lock);
@@ -2577,6 +2602,9 @@ void kz_amqp_send_worker_event(int _kz_server_id, amqp_envelope_t* envelope, kz_
     	}
 		if(idx < dbk_channels) {
 			cmd = kz_cmd_retrieve(message_id);
+			if(cmd)
+				cmd->return_code = AMQP_RESPONSE_NORMAL;
+
 			/*
 			if(cmd != NULL) {
 				cmd->return_code = 0;
@@ -2791,8 +2819,13 @@ void kz_amqp_consumer_worker_cb(int fd, short event, void *arg)
 	LM_DBG("consumer %d received payload %s\n", my_pid(), cmd->payload);
 
 	if(cmd->cmd) {
-		kz_amqp_set_last_result(cmd->payload);
-		kz_amqp_cb_ok(cmd->cmd);
+		if(cmd->cmd->return_code == AMQP_RESPONSE_NORMAL) {
+			kz_amqp_set_last_result(cmd->payload);
+			kz_amqp_cb_ok(cmd->cmd);
+		} else {
+			kz_amqp_reset_last_result();
+			kz_amqp_cb_error(cmd->cmd);
+		}
 	} else {
 		kz_amqp_consumer_event(cmd->payload, cmd->event_key, cmd->event_subkey);
 	}




More information about the sr-dev mailing list