[sr-dev] git:mariusbucur/dmq: added dmq notification peer changes

Marius Ovidiu Bucur marius at marius-bucur.ro
Fri Apr 8 18:33:37 CEST 2011


Module: sip-router
Branch: mariusbucur/dmq
Commit: 04fa5b387dd48f3751671b87e321438119f34d96
URL:    http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=04fa5b387dd48f3751671b87e321438119f34d96

Author: Marius Bucur <marius.bucur at 1and1.ro>
Committer: Marius Bucur <marius.bucur at 1and1.ro>
Date:   Fri Apr  8 19:33:11 2011 +0300

added dmq notification peer changes

---

 modules_k/dmq/dmq.c    |   14 ++++++++++++--
 modules_k/dmq/dmq.h    |    1 +
 modules_k/dmq/peer.c   |   11 +++++++++++
 modules_k/dmq/peer.h   |    1 +
 modules_k/dmq/worker.c |    3 ++-
 5 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/modules_k/dmq/dmq.c b/modules_k/dmq/dmq.c
index 90b0ac2..7920361 100644
--- a/modules_k/dmq/dmq.c
+++ b/modules_k/dmq/dmq.c
@@ -50,6 +50,7 @@
 #include "peer.h"
 #include "bind_dmq.h"
 #include "worker.h"
+#include "notification_peer.h"
 #include "../../mod_fix.h"
 
 static int mod_init(void);
@@ -75,6 +76,8 @@ sl_api_t slb;
 /** module variables */
 dmq_worker_t* workers;
 dmq_peer_list_t* peer_list;
+// the dmq module is a peer itself for receiving notifications regarding nodes
+dmq_peer_t dmq_notification_peer;
 
 /** module functions */
 static int mod_init(void);
@@ -144,7 +147,7 @@ static int mod_init(void) {
 	/* load peer list - the list containing the module callbacks for dmq */
 	peer_list = init_peer_list();
 	
-	/* register worker processes */
+	/* register worker processes - add one because of the ping process */
 	register_procs(num_workers);
 	
 	/* allocate workers array */
@@ -154,12 +157,14 @@ static int mod_init(void) {
 		return -1;
 	}
 	
+	/* add first dmq peer - the dmq module itself to receive peer notify messages */
+	
 	startup_time = (int) time(NULL);
 	return 0;
 }
 
 /**
- * Initialize children
+ * initialize children
  */
 static int child_init(int rank) {
   	int i, newpid;
@@ -179,6 +184,11 @@ static int child_init(int rank) {
 				workers[i].pid = newpid;
 			}
 		}
+		/**
+		 * add the dmq notification peer.
+		 * the dmq is a peer itself so that it can receive node notifications
+		 */
+		add_notification_peer();
 		return 0;
 	}
 	if(rank == PROC_INIT || rank == PROC_TCP_MAIN) {
diff --git a/modules_k/dmq/dmq.h b/modules_k/dmq/dmq.h
index 3bd9c84..6d0aac9 100644
--- a/modules_k/dmq/dmq.h
+++ b/modules_k/dmq/dmq.h
@@ -12,6 +12,7 @@
 
 extern int num_workers;
 extern dmq_worker_t* workers;
+extern dmq_peer_t dmq_notification_peer;
 
 static inline int dmq_load_api(dmq_api_t* api) {
 	bind_dmq_f binddmq;
diff --git a/modules_k/dmq/peer.c b/modules_k/dmq/peer.c
index 1ec177f..4215289 100644
--- a/modules_k/dmq/peer.c
+++ b/modules_k/dmq/peer.c
@@ -3,6 +3,7 @@
 dmq_peer_list_t* init_peer_list() {
 	dmq_peer_list_t* peer_list = shm_malloc(sizeof(dmq_peer_list_t));
 	memset(peer_list, 0, sizeof(dmq_peer_list_t));
+	lock_init(&peer_list->lock);
 	return peer_list;
 }
 
@@ -15,6 +16,7 @@ dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
 		if(strncasecmp(cur->peer_id.s, peer->peer_id.s, len) == 0) {
 			return cur;
 		}
+		cur = cur->next;
 	}
 	return 0;
 }
@@ -22,17 +24,26 @@ dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
 void add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
 	dmq_peer_t* new_peer = shm_malloc(sizeof(dmq_peer_t));
 	*new_peer = *peer;
+	/* copy both str's into shm: peer_id first, then description */
+	new_peer->peer_id.s = shm_malloc(peer->peer_id.len);
+	memcpy(new_peer->peer_id.s, peer->peer_id.s, peer->peer_id.len);
+	new_peer->description.s = shm_malloc(peer->description.len);
+	memcpy(new_peer->description.s, peer->description.s, peer->description.len);
 	new_peer->next = peer_list->peers;
 	peer_list->peers = new_peer;
 }
 
 int register_dmq_peer(dmq_peer_t* peer) {
+	lock_get(&peer_list->lock);
 	if(search_peer_list(peer_list, peer)) {
 		LM_ERR("peer already exists: %.*s %.*s\n", peer->peer_id.len, peer->peer_id.s,
 		       peer->description.len, peer->description.s);
+		/* release the list lock on the duplicate-peer path as well */
+		lock_release(&peer_list->lock);
 		return -1;
 	}
 	add_peer(peer_list, peer);
+	lock_release(&peer_list->lock);
 	return 0;
 }
 
diff --git a/modules_k/dmq/peer.h b/modules_k/dmq/peer.h
index 72c851f..b9a09ee 100644
--- a/modules_k/dmq/peer.h
+++ b/modules_k/dmq/peer.h
@@ -18,6 +18,7 @@ typedef struct dmq_peer {
 } dmq_peer_t;
 
 typedef struct dmq_peer_list {
+	gen_lock_t lock;
 	dmq_peer_t* peers;
 	int count;
 } dmq_peer_list_t;
diff --git a/modules_k/dmq/worker.c b/modules_k/dmq/worker.c
index 6eaae21..3a3005c 100644
--- a/modules_k/dmq/worker.c
+++ b/modules_k/dmq/worker.c
@@ -49,7 +49,8 @@ int add_dmq_job(struct sip_msg* msg, dmq_peer_t* peer) {
 		}
 	}
 	if(!found_available) {
-		LM_DBG("no available worker found, passing job to the least busy one\n");
+		LM_DBG("no available worker found, passing job to the least busy one [%d %d]\n",
+		       worker->pid, job_queue_size(worker->queue));
 	}
 	job_queue_push(worker->queue, &new_job);
 	lock_release(&worker->lock);




More information about the sr-dev mailing list