[sr-dev] git:master: dmq: add the following config functions:

Charles Chance charles.chance at sipcentric.com
Wed Sep 10 02:20:02 CEST 2014

Module: sip-router
Branch: master
Commit: d99a9c7f47613078f9d8ab42959e954fc58518f7
URL:    http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=d99a9c7f47613078f9d8ab42959e954fc58518f7

Author: Charles Chance <charles.chance at sipcentric.com>
Committer: Charles Chance <charles.chance at sipcentric.com>
Date:   Tue Sep  9 23:42:35 2014 +0100

dmq: add the following config functions:

- dmq_t_replicate(): Replicates the current SIP message to all active DMQ nodes. Includes built-in loop detection, which can be skipped with an optional parameter.
- dmq_is_from_node(): Checks whether the current message has come from another DMQ node, based on its source IP.


 modules/dmq/dmq.c       |    6 ++++
 modules/dmq/dmq_funcs.c |   73 +++++++++++++++++++++++++++++++++++++++++++++++
 modules/dmq/dmq_funcs.h |    2 +
 3 files changed, 81 insertions(+), 0 deletions(-)

diff --git a/modules/dmq/dmq.c b/modules/dmq/dmq.c
index 30405f1..06d9cfb 100644
--- a/modules/dmq/dmq.c
+++ b/modules/dmq/dmq.c
@@ -97,6 +97,12 @@ static cmd_export_t cmds[] = {
         {"dmq_bcast_message", (cmd_function)cfg_dmq_bcast_message, 3, bcast_dmq_fixup, 0,
+	{"dmq_t_replicate",  (cmd_function)cfg_dmq_t_replicate, 0, 0, 0,
+        {"dmq_t_replicate",  (cmd_function)cfg_dmq_t_replicate, 1, fixup_spve_null, 0,
+                REQUEST_ROUTE},
+        {"dmq_is_from_node",  (cmd_function)cfg_dmq_is_from_node, 0, 0, 0,
+                REQUEST_ROUTE},
         {"bind_dmq",        (cmd_function)bind_dmq,       0, 0,              0},
 	{0, 0, 0, 0, 0, 0}
diff --git a/modules/dmq/dmq_funcs.c b/modules/dmq/dmq_funcs.c
index 6b09b2a..3c9c8c3 100644
--- a/modules/dmq/dmq_funcs.c
+++ b/modules/dmq/dmq_funcs.c
@@ -114,6 +114,30 @@ int build_uri_str(str* username, struct sip_uri* uri, str* from)
 	return 0;
+/* Checks if the request (sip_msg_t* msg) comes from another DMQ node based on source IP. */
+int is_from_remote_node(sip_msg_t* msg)
+	ip_addr_t* ip;
+        dmq_node_t* node;
+        int result = -1;
+	ip = &msg->rcv.src_ip;
+	lock_get(&node_list->lock);
+	node = node_list->nodes;
+	while(node) {
+		if (!node->local && ip_addr_cmp(ip, &node->ip_address)) {
+			result = 1;
+			goto done;
+		}
+		node = node->next;
+        }
+        lock_release(&node_list->lock);
+	return result;
  * @brief broadcast a dmq message
@@ -340,6 +364,55 @@ error:
 	return -1;
+ * @brief config file function for replicating SIP message to all nodes (wraps t_replicate)
+ */
+int cfg_dmq_t_replicate(struct sip_msg* msg, char* s)
+	dmq_node_t* node;
+	int i = 0;
+	/* avoid loops - do not replicate if message has come from another node
+	 * (override if optional parameter is set)
+	 */
+	if ((!s || (get_int_fparam(&i, msg, (fparam_t*)s)==0 && !i))
+			&& (is_from_remote_node(msg) > 0)) {
+		LM_DBG("message is from another node - skipping replication\n");
+		return -1;
+	}
+	lock_get(&node_list->lock);
+	node = node_list->nodes;
+	while(node) {
+		/* we do not send the message to the following:
+		 *   - ourself
+		 *   - any inactive nodes
+		 */
+                if(node->local || node->status != DMQ_NODE_ACTIVE) {
+                        LM_DBG("skipping node %.*s\n", STR_FMT(&node->orig_uri));
+                        node = node->next;
+			continue;
+		}
+		if(tmb.t_replicate(msg, &node->orig_uri) < 0) {
+			LM_ERR("error calling t_replicate\n");
+			goto error;
+		}
+		node = node->next;
+	}
+	lock_release(&node_list->lock);
+	return 0;
+	lock_release(&node_list->lock);
+	return -1;
+ * @brief config file function to check if received message is from another DMQ node based on source IP
+ */
+int cfg_dmq_is_from_node(struct sip_msg* msg)
+	return is_from_remote_node(msg);
  * @brief pings the servers in the nodelist
diff --git a/modules/dmq/dmq_funcs.h b/modules/dmq/dmq_funcs.h
index a138788..55f67ac 100644
--- a/modules/dmq/dmq_funcs.h
+++ b/modules/dmq/dmq_funcs.h
@@ -50,6 +50,8 @@ int cfg_dmq_send_message(struct sip_msg* msg, char* peer, char* to,
 		char* body, char* content_type);
 int cfg_dmq_bcast_message(struct sip_msg* msg, char* peer, char* body, 
 		char* content_type);
+int cfg_dmq_t_replicate(struct sip_msg* msg, char* s);
+int cfg_dmq_is_from_node(struct sip_msg* msg);
 dmq_peer_t* register_dmq_peer(dmq_peer_t* peer);
 int dmq_send_message(dmq_peer_t* peer, str* body, dmq_node_t* node,
 		dmq_resp_cback_t* resp_cback, int max_forwards, str* content_type);

More information about the sr-dev mailing list