[sr-dev] git:mariusbucur/dmq: added support for binding the dmq module within another module.

Marius Ovidiu Bucur marius at marius-bucur.ro
Wed Apr 6 18:32:26 CEST 2011


Module: sip-router
Branch: mariusbucur/dmq
Commit: ba31d863c9475bafc7d6073e3a6ebdd0b40e207a
URL:    http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=ba31d863c9475bafc7d6073e3a6ebdd0b40e207a

Author: Marius Bucur <marius.bucur at 1and1.ro>
Committer: Marius Bucur <marius.bucur at 1and1.ro>
Date:   Wed Apr  6 19:30:31 2011 +0300

added support for binding the dmq module within another module.

also, finished the implementation for the dmq worker queues

---

 modules_k/dmq/bind_dmq.c |    7 +++
 modules_k/dmq/message.c  |   29 +++++++++++
 modules_k/dmq/message.h  |    2 +
 modules_k/dmq/peer.c     |   43 ++++++++++++++++
 modules_k/dmq/peer.h     |   35 +++++++++++++
 modules_k/dmq/worker.c   |  120 ++++++++++++++++++++++++++++++++++++++++++++++
 modules_k/dmq/worker.h   |   43 ++++++++++++++++
 7 files changed, 279 insertions(+), 0 deletions(-)

diff --git a/modules_k/dmq/bind_dmq.c b/modules_k/dmq/bind_dmq.c
new file mode 100644
index 0000000..6744753
--- /dev/null
+++ b/modules_k/dmq/bind_dmq.c
@@ -0,0 +1,7 @@
+#include "bind_dmq.h"
+#include "peer.h"
+
+/**
+ * Fill in the DMQ API structure handed over by another module so that
+ * module can register itself as a DMQ peer.
+ *
+ * @param api structure to populate; must not be NULL
+ * @return 0 on success, -1 when api is NULL
+ */
+int bind_dmq(dmq_api_t* api) {
+	if(!api) {
+		LM_ERR("cannot bind dmq api: NULL api structure\n");
+		return -1;
+	}
+	api->register_dmq_peer = register_dmq_peer;
+	return 0;
+}
\ No newline at end of file
diff --git a/modules_k/dmq/message.c b/modules_k/dmq/message.c
new file mode 100644
index 0000000..55f385c
--- /dev/null
+++ b/modules_k/dmq/message.c
@@ -0,0 +1,29 @@
+#include "../../parser/parse_to.h"
+#include "../../parser/parse_uri.h" 
+#include "../../parser/parse_content.h"
+#include "../../parser/parse_from.h"
+#include "../../ut.h"
+#include "worker.h"
+#include "peer.h"
+#include "message.h"
+
+/**
+ * Dispatch an incoming request to the DMQ peer named in its request URI.
+ *
+ * The user part of the request URI is taken as the peer id. When a peer
+ * with that id is registered, a job running the peer's callback is queued
+ * on one of the workers.
+ *
+ * NOTE(review): msg is handed to the worker by pointer, not copied; confirm
+ * that it stays valid until the worker has run the callback.
+ *
+ * @param msg  request being processed
+ * @param str1 only used in the debug log
+ * @param str2 only used in the debug log
+ * @return 0 when the job was queued or no matching peer exists,
+ *         -1 when the URI cannot be parsed or the job cannot be queued
+ */
+int handle_dmq_message(struct sip_msg* msg, char* str1, char* str2) {
+	dmq_peer_t* peer;
+	if ((parse_sip_msg_uri(msg) < 0) || (!msg->parsed_uri.user.s)) {
+		LM_ERR("cannot parse msg URI\n");
+		return -1;
+	}
+	LM_DBG("handle_dmq_message [%.*s %.*s] [%s %s]\n",
+	       msg->first_line.u.request.method.len, msg->first_line.u.request.method.s,
+	       msg->first_line.u.request.uri.len, msg->first_line.u.request.uri.s,
+	       ZSW(str1), ZSW(str2));
+	/* the peer id is given as the userinfo part of the request URI */
+	peer = find_peer(msg->parsed_uri.user);
+	if(!peer) {
+		LM_DBG("no peer found for %.*s\n", msg->parsed_uri.user.len, msg->parsed_uri.user.s);
+		return 0;
+	}
+	LM_DBG("handle_dmq_message peer found: %.*s\n", msg->parsed_uri.user.len, msg->parsed_uri.user.s);
+	/* do not report success when no worker could take the job */
+	if(add_dmq_job(msg, peer) < 0) {
+		LM_ERR("failed to queue dmq job for peer %.*s\n",
+		       msg->parsed_uri.user.len, msg->parsed_uri.user.s);
+		return -1;
+	}
+	return 0;
+}
\ No newline at end of file
diff --git a/modules_k/dmq/message.h b/modules_k/dmq/message.h
new file mode 100644
index 0000000..7e0cb95
--- /dev/null
+++ b/modules_k/dmq/message.h
@@ -0,0 +1,2 @@
+
+int handle_dmq_message(struct sip_msg*, char*, char*);
\ No newline at end of file
diff --git a/modules_k/dmq/peer.c b/modules_k/dmq/peer.c
new file mode 100644
index 0000000..1ec177f
--- /dev/null
+++ b/modules_k/dmq/peer.c
@@ -0,0 +1,43 @@
+#include "peer.h"
+
+/**
+ * Allocate an empty peer list in shared memory.
+ *
+ * @return the zero-initialized list, or NULL when shared memory is exhausted
+ */
+dmq_peer_list_t* init_peer_list(void) {
+	dmq_peer_list_t* list = shm_malloc(sizeof(*list));
+	if(!list) {
+		LM_ERR("no more shm memory for the dmq peer list\n");
+		return NULL;
+	}
+	memset(list, 0, sizeof(*list));
+	return list;
+}
+
+/**
+ * Look up a peer by id (case-insensitive).
+ *
+ * Two ids match only when they have the same length and the same
+ * characters ignoring case, so "ab" does not match "abc".
+ *
+ * @param peer_list list to search; NULL is treated as empty
+ * @param peer      only peer->peer_id is read
+ * @return the matching list entry, or NULL when there is none
+ */
+dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
+	dmq_peer_t* cur;
+	if(!peer_list || !peer) {
+		return NULL;
+	}
+	/* advance through the list; without cur = cur->next the loop never ends */
+	for(cur = peer_list->peers; cur; cur = cur->next) {
+		if(cur->peer_id.len == peer->peer_id.len
+				&& strncasecmp(cur->peer_id.s, peer->peer_id.s, cur->peer_id.len) == 0) {
+			return cur;
+		}
+	}
+	return NULL;
+}
+
+void add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
+	dmq_peer_t* new_peer = shm_malloc(sizeof(dmq_peer_t));
+	*new_peer = *peer;
+	new_peer->next = peer_list->peers;
+	peer_list->peers = new_peer;
+}
+
+int register_dmq_peer(dmq_peer_t* peer) {
+	if(search_peer_list(peer_list, peer)) {
+		LM_ERR("peer already exists: %.*s %.*s\n", peer->peer_id.len, peer->peer_id.s,
+		       peer->description.len, peer->description.s);
+		return -1;
+	}
+	add_peer(peer_list, peer);
+	return 0;
+}
+
+/**
+ * Find a registered peer by its id.
+ *
+ * Wraps the id in a temporary peer so the generic list search can be used.
+ *
+ * @return the peer from the global list, or 0 when no peer matches
+ */
+dmq_peer_t* find_peer(str peer_id) {
+	dmq_peer_t probe = { .peer_id = peer_id };
+	return search_peer_list(peer_list, &probe);
+}
\ No newline at end of file
diff --git a/modules_k/dmq/peer.h b/modules_k/dmq/peer.h
new file mode 100644
index 0000000..72c851f
--- /dev/null
+++ b/modules_k/dmq/peer.h
@@ -0,0 +1,35 @@
+#ifndef PEER_H
+#define PEER_H
+
+#include <string.h>
+#include <stdlib.h>
+#include "../../str.h"
+#include "../../mem/mem.h"
+#include "../../mem/shm_mem.h"
+#include "../../parser/msg_parser.h"
+
+/* Callback supplied by a peer. The worker loop calls it with the queued
+ * SIP message and logs an error when it returns a negative value. */
+typedef int(*peer_callback_t)(struct sip_msg*);
+
+/* One registered DMQ peer; entries form a singly linked list. */
+typedef struct dmq_peer {
+	/* identifier; matched case-insensitively against the user part of the
+	 * request URI of incoming messages */
+	str peer_id;
+	/* free-form text, only printed in log messages in this module */
+	str description;
+	/* function run by a worker for each message addressed to this peer */
+	peer_callback_t callback;
+	/* next entry in the list, NULL at the end */
+	struct dmq_peer* next;
+} dmq_peer_t;
+
+/* Head of the list of registered peers. */
+typedef struct dmq_peer_list {
+	/* most recently added peer first; NULL when the list is empty */
+	dmq_peer_t* peers;
+	/* NOTE(review): zeroed at init and not updated by the list code in
+	 * peer.c - presumably meant to hold the number of peers; confirm */
+	int count;
+} dmq_peer_list_t;
+
+/* global peer list used by register_dmq_peer() and find_peer();
+ * defined outside of peer.c */
+extern dmq_peer_list_t* peer_list;
+
+/* allocate a zeroed peer list in shared memory */
+dmq_peer_list_t* init_peer_list();
+/* return the entry of peer_list whose peer_id matches peer->peer_id, or 0 */
+dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer);
+/* signature of register_dmq_peer(), as exported through the dmq api */
+typedef int (*register_dmq_peer_t)(dmq_peer_t*);
+
+/* add a copy of peer to the global list; -1 if the id is already present */
+int register_dmq_peer(dmq_peer_t* peer);
+/* look up a peer in the global list by id; 0 when not found */
+dmq_peer_t* find_peer(str peer_id);
+
+
+#endif
\ No newline at end of file
diff --git a/modules_k/dmq/worker.c b/modules_k/dmq/worker.c
new file mode 100644
index 0000000..6eaae21
--- /dev/null
+++ b/modules_k/dmq/worker.c
@@ -0,0 +1,120 @@
+#include "dmq.h"
+#include "worker.h"
+
+/**
+ * Main loop of a DMQ worker; never returns.
+ *
+ * The worker's lock is used as a wake-up signal: init_worker() leaves it
+ * acquired, so lock_get() below blocks until add_dmq_job() releases it
+ * after queueing a job. Each wake-up drains the whole queue.
+ *
+ * @param id index of this worker in the global workers array
+ */
+void worker_loop(int id) {
+	dmq_worker_t* worker = &workers[id];
+	dmq_job_t* current_job;
+	int ret_value;
+	for(;;) {
+		LM_DBG("dmq_worker [%d %d] getting lock\n", id, my_pid());
+		/* sleep until a producer signals that work is available */
+		lock_get(&worker->lock);
+		LM_DBG("dmq_worker [%d %d] lock acquired\n", id, my_pid());
+		/* multiple lock_release calls might be performed, so remove from queue until empty */
+		do {
+			current_job = job_queue_pop(worker->queue);
+			/* job_queue_pop might return NULL if queue is empty */
+			if(current_job) {
+				/* run the peer callback on the queued message */
+				ret_value = current_job->f(current_job->msg);
+				if(ret_value < 0) {
+					LM_ERR("running job failed\n");
+				}
+				/* the job was allocated in shm by job_queue_push() */
+				shm_free(current_job);
+				worker->jobs_processed++;
+			}
+		} while(job_queue_size(worker->queue) > 0);
+	}
+}
+
+/**
+ * Queue a job that runs the peer's callback on msg and wake up the worker
+ * chosen for it.
+ *
+ * The first worker with an empty queue is preferred; if every worker has
+ * pending jobs, the one with the shortest queue seen during the scan is
+ * used.
+ *
+ * @param msg  message passed to the callback
+ * @param peer peer whose callback is run
+ * @return 0 once the job was handed to a worker, -1 when no workers exist
+ */
+int add_dmq_job(struct sip_msg* msg, dmq_peer_t* peer) {
+	dmq_job_t job = { .f = peer->callback, .msg = msg, .orig_peer = peer };
+	dmq_worker_t* target;
+	int idx;
+	int idle_found = 0;
+
+	if(!num_workers) {
+		LM_ERR("error in add_dmq_job no workers spawned\n");
+		return -1;
+	}
+	/* start from the first worker and look for a better candidate */
+	target = workers;
+	for(idx = 0; idx < num_workers && !idle_found; idx++) {
+		dmq_worker_t* candidate = &workers[idx];
+		if(job_queue_size(candidate->queue) == 0) {
+			/* an idle worker ends the search */
+			target = candidate;
+			idle_found = 1;
+		} else if(job_queue_size(candidate->queue) < job_queue_size(target->queue)) {
+			target = candidate;
+		}
+	}
+	if(!idle_found) {
+		LM_DBG("no available worker found, passing job to the least busy one\n");
+	}
+	job_queue_push(target->queue, &job);
+	/* releasing the worker lock unblocks worker_loop() */
+	lock_release(&target->lock);
+	return 0;
+}
+
+/**
+ * Prepare a worker slot: clear it, set up its lock and give it an empty
+ * job queue.
+ */
+void init_worker(dmq_worker_t* worker) {
+	job_queue_t* jobs;
+
+	memset(worker, 0, sizeof(dmq_worker_t));
+	lock_init(&worker->lock);
+	/* take the lock right away so that the first lock_get() in
+	 * worker_loop() blocks until a job is added */
+	lock_get(&worker->lock);
+	jobs = alloc_job_queue();
+	worker->queue = jobs;
+}
+
+/**
+ * Allocate an empty job queue in shared memory.
+ *
+ * @return the new queue, or NULL when shared memory is exhausted
+ */
+job_queue_t* alloc_job_queue(void) {
+	job_queue_t* queue = shm_malloc(sizeof(*queue));
+	if(!queue) {
+		LM_ERR("no more shm memory for the dmq job queue\n");
+		return NULL;
+	}
+	atomic_set(&queue->count, 0);
+	queue->front = NULL;
+	queue->back = NULL;
+	lock_init(&queue->lock);
+	return queue;
+}
+
+/**
+ * Release a job queue together with any jobs still waiting in it.
+ *
+ * The caller must make sure nobody else uses the queue any more; the
+ * queue lock is not taken here.
+ *
+ * @param queue queue to free; NULL is ignored
+ */
+void destroy_job_queue(job_queue_t* queue) {
+	dmq_job_t* job;
+	dmq_job_t* newer;
+	if(!queue) {
+		return;
+	}
+	/* walk from the oldest job towards the back; prev points at the
+	 * job pushed right after the current one */
+	for(job = queue->front; job; job = newer) {
+		newer = job->prev;
+		shm_free(job);
+	}
+	lock_destroy(&queue->lock);
+	shm_free(queue);
+}
+
+/**
+ * Number of jobs currently in the queue, read from the atomic counter
+ * without taking the queue lock.
+ */
+int job_queue_size(job_queue_t* queue) {
+	int pending = atomic_get(&queue->count);
+	return pending;
+}
+
+/**
+ * Append a copy of job to the back of the queue.
+ *
+ * The job is copied into shared memory; the copy is released by whoever
+ * pops it. When shared memory is exhausted the job is dropped and an
+ * error is logged, because the void return type cannot report failure.
+ *
+ * @param queue queue to append to
+ * @param job   job to copy; its next/prev fields are ignored
+ */
+void job_queue_push(job_queue_t* queue, dmq_job_t* job) {
+	/* we need to copy the dmq_job into a newly created dmq_job in shm */
+	dmq_job_t* newjob = shm_malloc(sizeof(*newjob));
+	if(!newjob) {
+		LM_ERR("no more shm memory, dropping dmq job\n");
+		return;
+	}
+	*newjob = *job;
+
+	lock_get(&queue->lock);
+	/* the new job becomes the back; next links it to the previous back */
+	newjob->prev = NULL;
+	newjob->next = queue->back;
+	if(queue->back) {
+		queue->back->prev = newjob;
+	}
+	queue->back = newjob;
+	if(!queue->front) {
+		/* first job in an empty queue is also the front */
+		queue->front = newjob;
+	}
+	atomic_inc(&queue->count);
+	lock_release(&queue->lock);
+}
+/**
+ * Detach and return the oldest job of the queue.
+ *
+ * @return the job, now owned by the caller, or NULL when the queue is empty
+ */
+dmq_job_t* job_queue_pop(job_queue_t* queue) {
+	dmq_job_t* oldest;
+
+	lock_get(&queue->lock);
+	oldest = queue->front;
+	if(oldest) {
+		if(oldest->prev) {
+			/* the job pushed right after it becomes the new front */
+			queue->front = oldest->prev;
+			queue->front->next = NULL;
+		} else {
+			/* that was the only job */
+			queue->front = NULL;
+			queue->back = NULL;
+		}
+		atomic_dec(&queue->count);
+	}
+	lock_release(&queue->lock);
+	return oldest;
+}
\ No newline at end of file
diff --git a/modules_k/dmq/worker.h b/modules_k/dmq/worker.h
new file mode 100644
index 0000000..61eda09
--- /dev/null
+++ b/modules_k/dmq/worker.h
@@ -0,0 +1,43 @@
+#ifndef DMQ_WORKER_H
+#define DMQ_WORKER_H
+
+#include "peer.h"
+#include "../../locking.h"
+#include "../../atomic_ops.h"
+#include "../../parser/msg_parser.h"
+
+/* One unit of work for a worker; node of the doubly linked job queue. */
+typedef struct dmq_job {
+	/* callback to run, taken from the peer the message was addressed to */
+	peer_callback_t f;
+	/* message handed to the callback */
+	struct sip_msg* msg;
+	/* peer the job was created for */
+	dmq_peer_t* orig_peer;
+	/* job pushed just before this one (towards the front), or NULL */
+	struct dmq_job* next;
+	/* job pushed just after this one (towards the back), or NULL */
+	struct dmq_job* prev;
+} dmq_job_t;
+
+/* FIFO of jobs: pushed at the back, popped from the front. */
+typedef struct job_queue {
+	/* number of queued jobs; updated under lock, read without it */
+	atomic_t count;
+	/* most recently pushed job, NULL when empty */
+	struct dmq_job* back;
+	/* oldest job, the next one to be popped, NULL when empty */
+	struct dmq_job* front;
+	/* protects front, back and the links between jobs */
+	gen_lock_t lock;
+} job_queue_t;
+
+/* State of one worker. */
+struct dmq_worker {
+	/* jobs waiting to be run by this worker */
+	job_queue_t* queue;
+	/* incremented by worker_loop() after each job it has run */
+	int jobs_processed;
+	/* wake-up signal: held after init_worker(), released by add_dmq_job()
+	 * to unblock worker_loop() */
+	gen_lock_t lock;
+	/* NOTE(review): not written in worker.c - presumably the pid of the
+	 * worker process; confirm against the code that spawns workers */
+	int pid;
+};
+
+typedef struct dmq_worker dmq_worker_t;
+
+/* clear the worker, take its lock and allocate its job queue */
+void init_worker(dmq_worker_t* worker);
+/* queue the peer callback for msg on an idle or the least busy worker;
+ * returns -1 when no workers exist */
+int add_dmq_job(struct sip_msg*, dmq_peer_t*);
+/* endless loop running the jobs of worker number id */
+void worker_loop(int id);
+
+/* allocate an empty queue in shared memory */
+job_queue_t* alloc_job_queue();
+/* free the queue structure */
+void destroy_job_queue(job_queue_t* queue);
+/* append a shared-memory copy of job to the back of the queue */
+void job_queue_push(job_queue_t* queue, dmq_job_t* job);
+/* remove and return the oldest job, or NULL when the queue is empty */
+dmq_job_t* job_queue_pop(job_queue_t* queue);
+/* current number of queued jobs */
+int job_queue_size(job_queue_t* queue);
+
+#endif
\ No newline at end of file




More information about the sr-dev mailing list