[sr-dev] git:master:f1f0dbbb: kazoo : add consistent worker binding option

Luis Azedo luis at 2600hz.com
Mon Mar 6 13:07:18 CET 2017


Module: kamailio
Branch: master
Commit: f1f0dbbbde0bfbe1162deca4f70c3c3969bf99d0
URL: https://github.com/kamailio/kamailio/commit/f1f0dbbbde0bfbe1162deca4f70c3c3969bf99d0

Author: Luis Azedo <luis at 2600hz.com>
Committer: Luis Azedo <luis at 2600hz.com>
Date: 2017-03-06T12:06:48Z

kazoo : add consistent worker binding option

---

Modified: src/modules/kazoo/kz_amqp.c
Modified: src/modules/kazoo/kz_amqp.h

---

Diff:  https://github.com/kamailio/kamailio/commit/f1f0dbbbde0bfbe1162deca4f70c3c3969bf99d0.diff
Patch: https://github.com/kamailio/kamailio/commit/f1f0dbbbde0bfbe1162deca4f70c3c3969bf99d0.patch

---

diff --git a/src/modules/kazoo/kz_amqp.c b/src/modules/kazoo/kz_amqp.c
index a962d99..05e8407 100644
--- a/src/modules/kazoo/kz_amqp.c
+++ b/src/modules/kazoo/kz_amqp.c
@@ -1551,6 +1551,7 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
 	int no_ack = 1;
 	int federate = 0;
 	int wait_for_consumer_ack = 1;
+    int consistent_worker = 0;
 
     json_obj_ptr json_obj = NULL;
 	struct json_object* tmpObj = NULL;
@@ -1607,6 +1608,11 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
     	federate = json_object_get_int(tmpObj);
     }
 
+    tmpObj = kz_json_get_object(json_obj, "consistent-worker");
+    if(tmpObj != NULL) {
+        consistent_worker = json_object_get_int(tmpObj);
+    }
+    
 	kz_amqp_bind_ptr bind = kz_amqp_bind_alloc(&exchange_s, &exchange_type_s, &queue_s, &routing_key_s);
 	if(bind == NULL) {
 		LM_ERR("Could not allocate bind struct\n");
@@ -1620,6 +1626,7 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
 	bind->no_ack = no_ack;
 	bind->wait_for_consumer_ack = wait_for_consumer_ack;
 	bind->federate = federate;
+    bind->consistent_worker = consistent_worker;
 
 
 	kz_amqp_binding_ptr binding = shm_malloc(sizeof(kz_amqp_binding));
@@ -2571,6 +2578,7 @@ void kz_amqp_send_worker_event(int _kz_server_id, amqp_envelope_t* envelope, kz_
     kz_amqp_consumer_delivery_ptr ptr = NULL;
     str* message_id = NULL;
     int idx = envelope->channel-1;
+    int worker = 0;
 
     json_obj_ptr json_obj = kz_json_parse((char*)envelope->message.body.bytes);
     if (json_obj == NULL) {
@@ -2617,12 +2625,21 @@ void kz_amqp_send_worker_event(int _kz_server_id, amqp_envelope_t* envelope, kz_
 		ptr->event_subkey = kz_amqp_bytes_dup(bind->event_subkey);
 	}
 
-	consumer++;
-	if(consumer >= dbk_consumer_workers) {
-		consumer = 0;
-	}
+    if(bind && bind->consistent_worker) {
+        str rk;
+        rk.s = (char*)envelope->routing_key.bytes;
+        rk.len = (int)envelope->routing_key.len;
+        worker = core_hash(&rk, NULL, dbk_consumer_workers);
+        LM_DBG("computed worker for %.*s is %d\n", rk.len, rk.s, worker);
+    } else {
+        consumer++;
+        if(consumer >= dbk_consumer_workers) {
+            consumer = 0;
+        }
+        worker = consumer;
+    }
 
-	if (write(kz_worker_pipes[consumer], &ptr, sizeof(ptr)) != sizeof(ptr)) {
+	if (write(kz_worker_pipes[worker], &ptr, sizeof(ptr)) != sizeof(ptr)) {
 		LM_ERR("failed to send payload to consumer %d : %s\nPayload %s\n", consumer, strerror(errno), ptr->payload);
 		goto error;
 	}
diff --git a/src/modules/kazoo/kz_amqp.h b/src/modules/kazoo/kz_amqp.h
index 6aa36b8..7c5e6e8 100644
--- a/src/modules/kazoo/kz_amqp.h
+++ b/src/modules/kazoo/kz_amqp.h
@@ -183,6 +183,7 @@ typedef struct {
 	amqp_boolean_t no_ack;
 	amqp_boolean_t wait_for_consumer_ack;
 	amqp_boolean_t federate;
+    amqp_boolean_t consistent_worker;
 } kz_amqp_bind, *kz_amqp_bind_ptr;
 
 typedef struct {




More information about the sr-dev mailing list