Module: sip-router Branch: mariusbucur/dmq Commit: ba31d863c9475bafc7d6073e3a6ebdd0b40e207a URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=ba31d863...
Author: Marius Bucur marius.bucur@1and1.ro Committer: Marius Bucur marius.bucur@1and1.ro Date: Wed Apr 6 19:30:31 2011 +0300
Added support for binding the dmq module from within another module.
Also finished the implementation of the dmq worker queues.
---
modules_k/dmq/bind_dmq.c |   17 ++++
modules_k/dmq/message.c  |   45 ++++++++++
modules_k/dmq/message.h  |    9 ++
modules_k/dmq/peer.c     |   94 +++++++++++++++++++++
modules_k/dmq/peer.h     |   38 +++++++++
modules_k/dmq/worker.c   |  201 ++++++++++++++++++++++++++++++++++++++++++++++
modules_k/dmq/worker.h   |   47 +++++++++++
7 files changed, 451 insertions(+), 0 deletions(-)

diff --git a/modules_k/dmq/bind_dmq.c b/modules_k/dmq/bind_dmq.c
new file mode 100644
index 0000000..6744753
--- /dev/null
+++ b/modules_k/dmq/bind_dmq.c
@@ -0,0 +1,17 @@
+#include "bind_dmq.h"
+#include "peer.h"
+
+/**
+ * Fill in the dmq API structure requested by another module.
+ *
+ * @param api structure to populate; only register_dmq_peer is set
+ * @return 0 on success, -1 if api is NULL
+ */
+int bind_dmq(dmq_api_t* api) {
+	if(!api) {
+		LM_ERR("invalid dmq api pointer\n");
+		return -1;
+	}
+	api->register_dmq_peer = register_dmq_peer;
+	return 0;
+}
\ No newline at end of file
diff --git a/modules_k/dmq/message.c b/modules_k/dmq/message.c
new file mode 100644
index 0000000..55f385c
--- /dev/null
+++ b/modules_k/dmq/message.c
@@ -0,0 +1,45 @@
+#include "../../parser/parse_to.h"
+#include "../../parser/parse_uri.h"
+#include "../../parser/parse_content.h"
+#include "../../parser/parse_from.h"
+#include "../../ut.h"
+#include "worker.h"
+#include "peer.h"
+#include "message.h"
+
+/**
+ * Dispatch an incoming dmq request to the peer named in its request URI.
+ *
+ * The peer id is the userinfo part of the request URI. When a registered
+ * peer matches, the message is queued as a job for one of the workers.
+ *
+ * @param msg  the received SIP request
+ * @param str1 only used for debug logging
+ * @param str2 only used for debug logging
+ * @return -1 if the URI cannot be parsed or the job cannot be queued,
+ *         0 otherwise (including when no peer matches)
+ */
+int handle_dmq_message(struct sip_msg* msg, char* str1, char* str2) {
+	dmq_peer_t* peer;
+	if ((parse_sip_msg_uri(msg) < 0) || (!msg->parsed_uri.user.s)) {
+		LM_ERR("cannot parse msg URI\n");
+		return -1;
+	}
+	LM_DBG("handle_dmq_message [%.*s %.*s] [%s %s]\n",
+		msg->first_line.u.request.method.len, msg->first_line.u.request.method.s,
+		msg->first_line.u.request.uri.len, msg->first_line.u.request.uri.s,
+		ZSW(str1), ZSW(str2));
+	/* the peer id is given as the userinfo part of the request URI */
+	peer = find_peer(msg->parsed_uri.user);
+	if(!peer) {
+		LM_DBG("no peer found for %.*s\n", msg->parsed_uri.user.len, msg->parsed_uri.user.s);
+		return 0;
+	}
+	LM_DBG("handle_dmq_message peer found: %.*s\n", msg->parsed_uri.user.len, msg->parsed_uri.user.s);
+	/* report a queueing failure to the caller instead of silently dropping the job */
+	if(add_dmq_job(msg, peer) < 0) {
+		LM_ERR("cannot queue dmq job for peer %.*s\n", msg->parsed_uri.user.len, msg->parsed_uri.user.s);
+		return -1;
+	}
+	return 0;
+}
\ No newline at end of file
diff --git a/modules_k/dmq/message.h b/modules_k/dmq/message.h
new file mode 100644
index 0000000..7e0cb95
--- /dev/null
+++ b/modules_k/dmq/message.h
@@ -0,0 +1,9 @@
+#ifndef DMQ_MESSAGE_H
+#define DMQ_MESSAGE_H
+
+#include "../../parser/msg_parser.h"
+
+/* Script-level handler for incoming dmq requests; see message.c. */
+int handle_dmq_message(struct sip_msg* msg, char* str1, char* str2);
+
+#endif
\ No newline at end of file
diff --git a/modules_k/dmq/peer.c b/modules_k/dmq/peer.c
new file mode 100644
index 0000000..1ec177f
--- /dev/null
+++ b/modules_k/dmq/peer.c
@@ -0,0 +1,94 @@
+#include "peer.h"
+
+/**
+ * Allocate an empty peer list in shared memory.
+ *
+ * @return the zeroed list, or NULL if the allocation failed
+ */
+dmq_peer_list_t* init_peer_list() {
+	dmq_peer_list_t* list = shm_malloc(sizeof(dmq_peer_list_t));
+	if(!list) {
+		LM_ERR("no more shm memory for the peer list\n");
+		return NULL;
+	}
+	memset(list, 0, sizeof(dmq_peer_list_t));
+	return list;
+}
+
+/**
+ * Look up a peer by id, ignoring case.
+ *
+ * Two ids match only when they have the same length; comparing just the
+ * shorter of the two lengths would let "ab" match "abc".
+ *
+ * @param peer_list list to search; NULL is treated as an empty list
+ * @param peer      peer whose peer_id is the search key
+ * @return the matching list entry, or NULL if there is none
+ */
+dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
+	dmq_peer_t* cur;
+	if(!peer_list || !peer) {
+		return NULL;
+	}
+	/* the cursor has to advance on every pass, otherwise a non-matching entry loops forever */
+	for(cur = peer_list->peers; cur; cur = cur->next) {
+		if(cur->peer_id.len == peer->peer_id.len &&
+				strncasecmp(cur->peer_id.s, peer->peer_id.s, cur->peer_id.len) == 0) {
+			return cur;
+		}
+	}
+	return NULL;
+}
+
+/**
+ * Prepend a copy of peer to the list.
+ *
+ * Only the dmq_peer_t structure itself is copied; the peer_id and
+ * description buffers are still the ones the caller passed in.
+ *
+ * @return 0 on success, -1 on invalid arguments or allocation failure
+ */
+int add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
+	dmq_peer_t* new_peer;
+	if(!peer_list || !peer) {
+		LM_ERR("invalid peer list or peer\n");
+		return -1;
+	}
+	new_peer = shm_malloc(sizeof(dmq_peer_t));
+	if(!new_peer) {
+		LM_ERR("no more shm memory for a new peer\n");
+		return -1;
+	}
+	*new_peer = *peer;
+	new_peer->next = peer_list->peers;
+	peer_list->peers = new_peer;
+	return 0;
+}
+
+/**
+ * Register a new dmq peer in the global peer list.
+ *
+ * @return 0 on success, -1 if a peer with the same id is already
+ *         registered or the peer could not be stored
+ */
+int register_dmq_peer(dmq_peer_t* peer) {
+	if(search_peer_list(peer_list, peer)) {
+		LM_ERR("peer already exists: %.*s %.*s\n", peer->peer_id.len, peer->peer_id.s,
+			peer->description.len, peer->description.s);
+		return -1;
+	}
+	return add_peer(peer_list, peer);
+}
+
+/**
+ * Find a registered peer by its id.
+ *
+ * @return the registered peer, or NULL if no peer has that id
+ */
+dmq_peer_t* find_peer(str peer_id) {
+	dmq_peer_t foo_peer;
+	/* only peer_id is read by the search; zero the rest of the temporary key */
+	memset(&foo_peer, 0, sizeof(foo_peer));
+	foo_peer.peer_id = peer_id;
+	return search_peer_list(peer_list, &foo_peer);
+}
\ No newline at end of file
diff --git a/modules_k/dmq/peer.h b/modules_k/dmq/peer.h
new file mode 100644
index 0000000..72c851f
--- /dev/null
+++ b/modules_k/dmq/peer.h
@@ -0,0 +1,38 @@
+#ifndef PEER_H
+#define PEER_H
+
+#include <string.h>
+#include <strings.h>
+#include <stdlib.h>
+#include "../../str.h"
+#include "../../mem/mem.h"
+#include "../../mem/shm_mem.h"
+#include "../../parser/msg_parser.h"
+
+/* callback a worker runs with the message queued for a peer */
+typedef int(*peer_callback_t)(struct sip_msg*);
+
+/* a registered peer; entries are chained through next */
+typedef struct dmq_peer {
+	str peer_id;
+	str description;
+	peer_callback_t callback;
+	struct dmq_peer* next;
+} dmq_peer_t;
+
+typedef struct dmq_peer_list {
+	dmq_peer_t* peers;
+	int count;
+} dmq_peer_list_t;
+
+extern dmq_peer_list_t* peer_list;
+
+dmq_peer_list_t* init_peer_list();
+dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer);
+typedef int (*register_dmq_peer_t)(dmq_peer_t*);
+
+int register_dmq_peer(dmq_peer_t* peer);
+dmq_peer_t* find_peer(str peer_id);
+
+
+#endif
\ No newline at end of file
diff --git a/modules_k/dmq/worker.c b/modules_k/dmq/worker.c
new file mode 100644
index 0000000..6eaae21
--- /dev/null
+++ b/modules_k/dmq/worker.c
@@ -0,0 +1,201 @@
+#include "dmq.h"
+#include "worker.h"
+
+/**
+ * Main loop of a dmq worker; it never returns.
+ *
+ * The worker blocks on its own lock until add_dmq_job() releases it,
+ * then runs and frees queued jobs until the queue is empty.
+ *
+ * @param id index of this worker in the workers array
+ */
+void worker_loop(int id) {
+	dmq_worker_t* worker = &workers[id];
+	dmq_job_t* current_job;
+	int ret_value;
+	for(;;) {
+		LM_DBG("dmq_worker [%d %d] getting lock\n", id, my_pid());
+		lock_get(&worker->lock);
+		LM_DBG("dmq_worker [%d %d] lock acquired\n", id, my_pid());
+		/* multiple lock_release calls might be performed, so remove from queue until empty */
+		do {
+			current_job = job_queue_pop(worker->queue);
+			/* job_queue_pop might return NULL if queue is empty */
+			if(current_job) {
+				ret_value = current_job->f(current_job->msg);
+				if(ret_value < 0) {
+					LM_ERR("running job failed\n");
+				}
+				shm_free(current_job);
+				worker->jobs_processed++;
+			}
+		} while(job_queue_size(worker->queue) > 0);
+	}
+}
+
+/**
+ * Queue msg as a job for peer and wake up the chosen worker.
+ *
+ * A worker with an empty queue is preferred; otherwise the job goes to
+ * the worker with the shortest queue.
+ *
+ * NOTE(review): only the msg pointer is stored, the message itself is not
+ * copied - confirm that it stays valid until a worker has run the job.
+ *
+ * @return 0 on success, -1 if there are no workers, the peer has no
+ *         callback, or the job could not be queued
+ */
+int add_dmq_job(struct sip_msg* msg, dmq_peer_t* peer) {
+	int i, found_available = 0;
+	dmq_job_t new_job;
+	dmq_worker_t* worker;
+	if(!peer || !peer->callback) {
+		LM_ERR("error in add_dmq_job no callback to run\n");
+		return -1;
+	}
+	memset(&new_job, 0, sizeof(new_job));
+	new_job.f = peer->callback;
+	new_job.msg = msg;
+	new_job.orig_peer = peer;
+	if(!num_workers) {
+		LM_ERR("error in add_dmq_job no workers spawned\n");
+		return -1;
+	}
+	/* initialize the worker with the first one */
+	worker = workers;
+	/* search for an available worker, or, if not possible, for the least busy one */
+	for(i = 0; i < num_workers; i++) {
+		if(job_queue_size(workers[i].queue) == 0) {
+			worker = &workers[i];
+			found_available = 1;
+			break;
+		} else if(job_queue_size(workers[i].queue) < job_queue_size(worker->queue)) {
+			worker = &workers[i];
+		}
+	}
+	if(!found_available) {
+		LM_DBG("no available worker found, passing job to the least busy one\n");
+	}
+	if(job_queue_push(worker->queue, &new_job) < 0) {
+		LM_ERR("error in add_dmq_job cannot queue the job\n");
+		return -1;
+	}
+	/* wake up the worker blocked in worker_loop() */
+	lock_release(&worker->lock);
+	return 0;
+}
+
+/**
+ * Reset a worker and give it an empty job queue.
+ *
+ * worker->queue is left NULL if the queue cannot be allocated.
+ */
+void init_worker(dmq_worker_t* worker) {
+	memset(worker, 0, sizeof(*worker));
+	lock_init(&worker->lock);
+	// acquire the lock for the first time - so that dmq_worker_loop blocks
+	lock_get(&worker->lock);
+	worker->queue = alloc_job_queue();
+}
+
+/**
+ * Allocate an empty job queue in shared memory.
+ *
+ * @return the new queue, or NULL if the allocation failed
+ */
+job_queue_t* alloc_job_queue() {
+	job_queue_t* queue = shm_malloc(sizeof(job_queue_t));
+	if(!queue) {
+		LM_ERR("no more shm memory for the job queue\n");
+		return NULL;
+	}
+	atomic_set(&queue->count, 0);
+	queue->front = NULL;
+	queue->back = NULL;
+	lock_init(&queue->lock);
+	return queue;
+}
+
+/**
+ * Free a job queue together with any jobs still waiting in it.
+ */
+void destroy_job_queue(job_queue_t* queue) {
+	dmq_job_t* job;
+	if(!queue) {
+		return;
+	}
+	/* jobs that were never run would leak if only the queue was freed */
+	while((job = job_queue_pop(queue)) != NULL) {
+		shm_free(job);
+	}
+	shm_free(queue);
+}
+
+/**
+ * @return the number of queued jobs; 0 for a NULL queue
+ */
+int job_queue_size(job_queue_t* queue) {
+	return queue ? atomic_get(&queue->count) : 0;
+}
+
+/**
+ * Append a copy of job to the back of the queue.
+ *
+ * @return 0 on success, -1 on invalid arguments or allocation failure
+ */
+int job_queue_push(job_queue_t* queue, dmq_job_t* job) {
+	dmq_job_t* newjob;
+	if(!queue || !job) {
+		return -1;
+	}
+	/* we need to copy the dmq_job into a newly created dmq_job in shm */
+	newjob = shm_malloc(sizeof(dmq_job_t));
+	if(!newjob) {
+		LM_ERR("no more shm memory for a new job\n");
+		return -1;
+	}
+	*newjob = *job;
+
+	lock_get(&queue->lock);
+	newjob->prev = NULL;
+	newjob->next = queue->back;
+	if(queue->back) {
+		queue->back->prev = newjob;
+	}
+	queue->back = newjob;
+	if(!queue->front) {
+		queue->front = newjob;
+	}
+	atomic_inc(&queue->count);
+	lock_release(&queue->lock);
+	return 0;
+}
+
+/**
+ * Remove the oldest job from the front of the queue.
+ *
+ * @return the job, which the caller must shm_free(), or NULL if the
+ *         queue is empty or NULL
+ */
+dmq_job_t* job_queue_pop(job_queue_t* queue) {
+	dmq_job_t* front;
+	if(!queue) {
+		return NULL;
+	}
+	lock_get(&queue->lock);
+	if(!queue->front) {
+		lock_release(&queue->lock);
+		return NULL;
+	}
+	front = queue->front;
+	if(front->prev) {
+		queue->front = front->prev;
+		front->prev->next = NULL;
+	} else {
+		queue->front = NULL;
+		queue->back = NULL;
+	}
+	atomic_dec(&queue->count);
+	lock_release(&queue->lock);
+	return front;
+}
\ No newline at end of file
diff --git a/modules_k/dmq/worker.h b/modules_k/dmq/worker.h
new file mode 100644
index 0000000..61eda09
--- /dev/null
+++ b/modules_k/dmq/worker.h
@@ -0,0 +1,47 @@
+#ifndef DMQ_WORKER_H
+#define DMQ_WORKER_H
+
+#include "peer.h"
+#include "../../locking.h"
+#include "../../atomic_ops.h"
+#include "../../parser/msg_parser.h"
+
+/* a unit of work: run f on msg; jobs are chained through next/prev while queued */
+typedef struct dmq_job {
+	peer_callback_t f;
+	struct sip_msg* msg;
+	dmq_peer_t* orig_peer;
+	struct dmq_job* next;
+	struct dmq_job* prev;
+} dmq_job_t;
+
+/* queue of jobs: pushed at back, popped at front; lock guards the links */
+typedef struct job_queue {
+	atomic_t count;
+	struct dmq_job* back;
+	struct dmq_job* front;
+	gen_lock_t lock;
+} job_queue_t;
+
+/* per-worker state; lock is what worker_loop() blocks on between jobs */
+struct dmq_worker {
+	job_queue_t* queue;
+	int jobs_processed;
+	gen_lock_t lock;
+	int pid;
+};
+
+typedef struct dmq_worker dmq_worker_t;
+
+void init_worker(dmq_worker_t* worker);
+int add_dmq_job(struct sip_msg*, dmq_peer_t*);
+void worker_loop(int id);
+
+job_queue_t* alloc_job_queue();
+void destroy_job_queue(job_queue_t* queue);
+/* returns 0 on success, -1 if the job could not be queued */
+int job_queue_push(job_queue_t* queue, dmq_job_t* job);
+dmq_job_t* job_queue_pop(job_queue_t* queue);
+int job_queue_size(job_queue_t* queue);
+
+#endif
\ No newline at end of file