Module: kamailio Branch: 5.0 Commit: 9abd1e002e0b3f7a884093e7e76efc8afdd54a17 URL: https://github.com/kamailio/kamailio/commit/9abd1e002e0b3f7a884093e7e76efc8a...
Author: Sebastian Damm damm@sipgate.de Committer: Sebastian Damm damm@sipgate.de Date: 2017-05-12T15:34:39+02:00
rabbitmq: don't create reply-to queue on publish
When using the rabbitmq_publish function, there is no need to create a reply to queue, because it will never be read. And since there is never a real consumer, so the queue will never be deleted. This will eventually cloak up the RabbitMQ server with millions of generic reply queues. This bug has been fixed in master already, so this is basically a backport.
---
Modified: src/modules/rabbitmq/rabbitmq.c
---
Diff: https://github.com/kamailio/kamailio/commit/9abd1e002e0b3f7a884093e7e76efc8a... Patch: https://github.com/kamailio/kamailio/commit/9abd1e002e0b3f7a884093e7e76efc8a...
---
diff --git a/src/modules/rabbitmq/rabbitmq.c b/src/modules/rabbitmq/rabbitmq.c index 4cd69c9..05fa750 100644 --- a/src/modules/rabbitmq/rabbitmq.c +++ b/src/modules/rabbitmq/rabbitmq.c @@ -175,7 +175,6 @@ static int rabbitmq_publish(struct sip_msg* msg, char* in_exchange, char* in_rou int reconnect_attempts = 0; int log_ret; str exchange, routingkey, messagebody, contenttype; - amqp_bytes_t reply_to_queue;
// sanity checks if (get_str_fparam(&exchange, msg, (fparam_t*)in_exchange) < 0) { @@ -231,44 +230,13 @@ static int rabbitmq_publish(struct sip_msg* msg, char* in_exchange, char* in_rou return RABBITMQ_ERR_CHANNEL; }
- // alloc queue - amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table); - if (log_on_amqp_error(amqp_get_rpc_reply(conn), "amqp_queue_declare()") != AMQP_RESPONSE_NORMAL) { - LM_ERR("FAIL: amqp_queue_declare()\n"); - amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS); - return RABBITMQ_ERR_QUEUE; - } - - // alloc bytes - reply_to_queue = amqp_bytes_malloc_dup(r->queue); - LM_DBG("%.*s\n", (int)reply_to_queue.len, (char*)reply_to_queue.bytes); - if (reply_to_queue.bytes == NULL) { - amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS); - amqp_bytes_free(reply_to_queue); - LM_ERR("Out of memory while copying queue name"); - return -1; - } - // alloc properties amqp_basic_properties_t props; props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | - AMQP_BASIC_REPLY_TO_FLAG | AMQP_BASIC_CORRELATION_ID_FLAG; props.content_type = amqp_cstring_bytes(contenttype.s); props.delivery_mode = 2; /* persistent delivery mode */ - props.reply_to = amqp_bytes_malloc_dup(reply_to_queue); - if (props.reply_to.bytes == NULL) { - // debug - LM_ERR("Out of memory while copying queue name"); - - // cleanup - amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS); - amqp_bytes_free(reply_to_queue); - - // error - return -1; - } props.correlation_id = amqp_cstring_bytes("1");
// publish @@ -285,7 +253,6 @@ static int rabbitmq_publish(struct sip_msg* msg, char* in_exchange, char* in_rou
// cleanup amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS); - amqp_bytes_free(reply_to_queue);
// error return RABBITMQ_ERR_PUBLISH; @@ -295,8 +262,6 @@ static int rabbitmq_publish(struct sip_msg* msg, char* in_exchange, char* in_rou LM_DBG("SUCCESS: amqp_basic_publish()\n");
// cleanup - amqp_bytes_free(props.reply_to); - amqp_bytes_free(reply_to_queue); amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
// success