Module: kamailio Branch: master Commit: f1f0dbbbde0bfbe1162deca4f70c3c3969bf99d0 URL: https://github.com/kamailio/kamailio/commit/f1f0dbbbde0bfbe1162deca4f70c3c39...
Author: Luis Azedo luis@2600hz.com Committer: Luis Azedo luis@2600hz.com Date: 2017-03-06T12:06:48Z
kazoo : add consistent worker binding option
---
Modified: src/modules/kazoo/kz_amqp.c Modified: src/modules/kazoo/kz_amqp.h
---
Diff: https://github.com/kamailio/kamailio/commit/f1f0dbbbde0bfbe1162deca4f70c3c39... Patch: https://github.com/kamailio/kamailio/commit/f1f0dbbbde0bfbe1162deca4f70c3c39...
---
diff --git a/src/modules/kazoo/kz_amqp.c b/src/modules/kazoo/kz_amqp.c index a962d99..05e8407 100644 --- a/src/modules/kazoo/kz_amqp.c +++ b/src/modules/kazoo/kz_amqp.c @@ -1551,6 +1551,7 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload) int no_ack = 1; int federate = 0; int wait_for_consumer_ack = 1; + int consistent_worker = 0;
json_obj_ptr json_obj = NULL; struct json_object* tmpObj = NULL; @@ -1607,6 +1608,11 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload) federate = json_object_get_int(tmpObj); }
+ tmpObj = kz_json_get_object(json_obj, "consistent-worker"); + if(tmpObj != NULL) { + consistent_worker = json_object_get_int(tmpObj); + } + kz_amqp_bind_ptr bind = kz_amqp_bind_alloc(&exchange_s, &exchange_type_s, &queue_s, &routing_key_s); if(bind == NULL) { LM_ERR("Could not allocate bind struct\n"); @@ -1620,6 +1626,7 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload) bind->no_ack = no_ack; bind->wait_for_consumer_ack = wait_for_consumer_ack; bind->federate = federate; + bind->consistent_worker = consistent_worker;
kz_amqp_binding_ptr binding = shm_malloc(sizeof(kz_amqp_binding)); @@ -2571,6 +2578,7 @@ void kz_amqp_send_worker_event(int _kz_server_id, amqp_envelope_t* envelope, kz_ kz_amqp_consumer_delivery_ptr ptr = NULL; str* message_id = NULL; int idx = envelope->channel-1; + int worker = 0;
json_obj_ptr json_obj = kz_json_parse((char*)envelope->message.body.bytes); if (json_obj == NULL) { @@ -2617,12 +2625,21 @@ void kz_amqp_send_worker_event(int _kz_server_id, amqp_envelope_t* envelope, kz_ ptr->event_subkey = kz_amqp_bytes_dup(bind->event_subkey); }
- consumer++; - if(consumer >= dbk_consumer_workers) { - consumer = 0; - } + if(bind && bind->consistent_worker) { + str rk; + rk.s = (char*)envelope->routing_key.bytes; + rk.len = (int)envelope->routing_key.len; + worker = core_hash(&rk, NULL, dbk_consumer_workers); + LM_DBG("computed worker for %.*s is %d\n", rk.len, rk.s, worker); + } else { + consumer++; + if(consumer >= dbk_consumer_workers) { + consumer = 0; + } + worker = consumer; + }
- if (write(kz_worker_pipes[consumer], &ptr, sizeof(ptr)) != sizeof(ptr)) { + if (write(kz_worker_pipes[worker], &ptr, sizeof(ptr)) != sizeof(ptr)) { LM_ERR("failed to send payload to consumer %d : %s\nPayload %s\n", consumer, strerror(errno), ptr->payload); goto error; } diff --git a/src/modules/kazoo/kz_amqp.h b/src/modules/kazoo/kz_amqp.h index 6aa36b8..7c5e6e8 100644 --- a/src/modules/kazoo/kz_amqp.h +++ b/src/modules/kazoo/kz_amqp.h @@ -183,6 +183,7 @@ typedef struct { amqp_boolean_t no_ack; amqp_boolean_t wait_for_consumer_ack; amqp_boolean_t federate; + amqp_boolean_t consistent_worker; } kz_amqp_bind, *kz_amqp_bind_ptr;
typedef struct {