[sr-dev] git:master:29aec680: kazoo: add consistent worker key

lazedo luis.azedo at factorlusitano.com
Fri Apr 14 06:49:07 CEST 2017


Module: kamailio
Branch: master
Commit: 29aec6800099a3b1d7a2b2c7e6d4b3865f0bc6e6
URL: https://github.com/kamailio/kamailio/commit/29aec6800099a3b1d7a2b2c7e6d4b3865f0bc6e6

Author: lazedo <luis.azedo at factorlusitano.com>
Committer: lazedo <luis.azedo at factorlusitano.com>
Date: 2017-04-14T05:47:52+01:00

kazoo: add consistent worker key

---

Modified: src/modules/kazoo/kz_amqp.c
Modified: src/modules/kazoo/kz_amqp.h

---

Diff:  https://github.com/kamailio/kamailio/commit/29aec6800099a3b1d7a2b2c7e6d4b3865f0bc6e6.diff
Patch: https://github.com/kamailio/kamailio/commit/29aec6800099a3b1d7a2b2c7e6d4b3865f0bc6e6.patch

---

diff --git a/src/modules/kazoo/kz_amqp.c b/src/modules/kazoo/kz_amqp.c
index f3a7c2c..d759490 100644
--- a/src/modules/kazoo/kz_amqp.c
+++ b/src/modules/kazoo/kz_amqp.c
@@ -384,6 +384,8 @@ void kz_amqp_free_bind(kz_amqp_bind_ptr bind)
 		kz_amqp_bytes_free(bind->event_key);
 	if(bind->event_subkey.bytes)
 		kz_amqp_bytes_free(bind->event_subkey);
+	if(bind->consistent_worker_key)
+		shm_free(bind->consistent_worker_key);
 	shm_free(bind);
 }
 
@@ -1771,6 +1773,7 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
 	int no_ack = 1;
 	int federate = 0;
 	int consistent_worker = 0;
+	str* consistent_worker_key = NULL;
 	int wait_for_consumer_ack = 1;
 	kz_amqp_queue_ptr queue = NULL;
 	kz_amqp_exchange_ptr exchange = NULL;
@@ -1825,6 +1828,11 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
 		consistent_worker = json_object_get_int(tmpObj);
 	}
 
+	tmpObj = kz_json_get_object(json_obj, "consistent-worker-key");
+	if(tmpObj != NULL) {
+		consistent_worker_key = kz_str_dup_from_char((char*)json_object_get_string(tmpObj));
+	}
+
 	tmpObj = kz_json_get_object(json_obj, "exchange-bindings");
 	if(tmpObj != NULL) {
 		exchange_binding = kz_amqp_exchange_binding_from_json(tmpObj);
@@ -1853,6 +1861,7 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
 	bind->wait_for_consumer_ack = wait_for_consumer_ack;
 	bind->federate = federate;
 	bind->consistent_worker = consistent_worker;
+	bind->consistent_worker_key = consistent_worker_key;
 
 
 	kz_amqp_binding_ptr binding = shm_malloc(sizeof(kz_amqp_binding));
@@ -2866,6 +2875,8 @@ void kz_amqp_send_worker_event(kz_amqp_server_ptr server_ptr, amqp_envelope_t* e
     char buffer[100];
     kz_amqp_cmd_ptr cmd = NULL;
     kz_amqp_consumer_delivery_ptr ptr = NULL;
+    json_obj_ptr json_obj = NULL;
+    json_object* JObj = NULL;
     str* message_id = NULL;
     int idx = envelope->channel-1;
     int worker = 0;
@@ -2878,7 +2889,7 @@ void kz_amqp_send_worker_event(kz_amqp_server_ptr server_ptr, amqp_envelope_t* e
     }
     memset(json_data, 0, msg_size + 1);
     memcpy(json_data, (char*)envelope->message.body.bytes, msg_size);
-    json_obj_ptr json_obj = kz_json_parse(json_data);
+    json_obj = kz_json_parse(json_data);
     pkg_free(json_data);
     if (json_obj == NULL) {
     	LM_ERR("error parsing json body\n");
@@ -2888,7 +2899,7 @@ void kz_amqp_send_worker_event(kz_amqp_server_ptr server_ptr, amqp_envelope_t* e
     json_object_object_add(json_obj, BLF_JSON_BROKER_ZONE, json_object_new_string(server_ptr->zone->zone));
 
 
-    json_object* JObj = kz_json_get_object(json_obj, BLF_JSON_SERVERID);
+    JObj = kz_json_get_object(json_obj, BLF_JSON_SERVERID);
     if(JObj != NULL) {
         const char* _kz_server_id_str = json_object_get_string(JObj);
         sprintf(buffer, "consumer://%d/%s", _kz_server_id, _kz_server_id_str);
@@ -2932,8 +2943,14 @@ void kz_amqp_send_worker_event(kz_amqp_server_ptr server_ptr, amqp_envelope_t* e
 
     if(bind && bind->consistent_worker) {
         str rk;
-        rk.s = (char*)envelope->routing_key.bytes;
-        rk.len = (int)envelope->routing_key.len;
+    	if(bind->consistent_worker_key != NULL &&
+    			(JObj = kz_json_get_object(json_obj, bind->consistent_worker_key->s)) != NULL) {
+    		rk.s = (char*)json_object_get_string(JObj);
+    		rk.len = strlen(rk.s);
+    	} else {
+    		rk.s = (char*)envelope->routing_key.bytes;
+    		rk.len = (int)envelope->routing_key.len;
+    	}
         worker = core_hash(&rk, NULL, dbk_consumer_workers);
         LM_DBG("computed worker for %.*s is %d\n", rk.len, rk.s, worker);
     } else {
diff --git a/src/modules/kazoo/kz_amqp.h b/src/modules/kazoo/kz_amqp.h
index c412156..b4a88ff 100644
--- a/src/modules/kazoo/kz_amqp.h
+++ b/src/modules/kazoo/kz_amqp.h
@@ -217,6 +217,7 @@ typedef struct {
 	amqp_boolean_t wait_for_consumer_ack;
 	amqp_boolean_t federate;
     amqp_boolean_t consistent_worker;
+    str* consistent_worker_key;
 } kz_amqp_bind, *kz_amqp_bind_ptr;
 
 typedef struct {




More information about the sr-dev mailing list