Module: sip-router Branch: mariusbucur/dmq Commit: 78eea80ffdaf09dd6d247455d77c47a64eacae48 URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=78eea80ffdaf09dd6d247455d77c47a64eacae48
Author: Marius Bucur <marius.bucur@1and1.ro> Committer: Marius Bucur <marius.bucur@1and1.ro> Date: Wed Apr 6 20:12:00 2011 +0300
temporarily changed the presence module to test dmq functionality
---
modules_k/dmq/bind_dmq.h | 14 ++++++++ modules_k/dmq/dmq.c | 71 ++++++++++++++++++++++++----------------- modules_k/dmq/dmq.h | 30 +++++++++++++++++ modules_k/dmq/dmq_worker.h | 8 ----- modules_k/presence/presence.c | 29 +++++++++++++++++ 5 files changed, 115 insertions(+), 37 deletions(-)
diff --git a/modules_k/dmq/bind_dmq.h b/modules_k/dmq/bind_dmq.h index e69de29..9c515a4 100644 --- a/modules_k/dmq/bind_dmq.h +++ b/modules_k/dmq/bind_dmq.h @@ -0,0 +1,14 @@ +#ifndef BIND_DMQ_H +#define BIND_DMQ_H + +#include "peer.h" + +typedef struct dmq_api { + register_dmq_peer_t register_dmq_peer; +} dmq_api_t; + +typedef int (*bind_dmq_f)(dmq_api_t* api); + +int bind_dmq(dmq_api_t* api); + +#endif \ No newline at end of file diff --git a/modules_k/dmq/dmq.c b/modules_k/dmq/dmq.c index f5aae9a..90b0ac2 100644 --- a/modules_k/dmq/dmq.c +++ b/modules_k/dmq/dmq.c @@ -47,11 +47,11 @@ #include "../../lib/kmi/mi.h" #include "../../lib/kcore/hash_func.h" #include "dmq.h" +#include "peer.h" #include "bind_dmq.h" -#include "dmq_worker.h" +#include "worker.h" #include "../../mod_fix.h"
-static int mi_child_init(void); static int mod_init(void); static int child_init(int); static void destroy(void); @@ -74,15 +74,18 @@ sl_api_t slb;
/** module variables */ dmq_worker_t* workers; +dmq_peer_list_t* peer_list;
/** module functions */ static int mod_init(void); static int child_init(int); static void destroy(void); -static int fixup_dmq(void** param, int param_no); +static int handle_dmq_fixup(void** param, int param_no);
static cmd_export_t cmds[] = { - {"handle_dmq_message", (cmd_function)handle_dmq_message, 0, fixup_dmq, 0, + {"handle_dmq_message", (cmd_function)handle_dmq_message, 0, handle_dmq_fixup, 0, + REQUEST_ROUTE}, + {"bind_dmq", (cmd_function)bind_dmq, 0, 0, 0, REQUEST_ROUTE}, {0, 0, 0, 0, 0, 0} }; @@ -93,7 +96,6 @@ static param_export_t params[] = { };
static mi_export_t mi_cmds[] = { - {"cleanup", 0, 0, 0, mi_child_init}, {0, 0, 0, 0, 0} };
@@ -117,7 +119,6 @@ struct module_exports exports = { * init module function */ static int mod_init(void) { - int i = 0; if(register_mi_mod(exports.name, mi_cmds)!=0) { LM_ERR("failed to register MI commands\n"); @@ -136,23 +137,21 @@ static int mod_init(void) {
/* load all TM stuff */ if(load_tm_api(&tmb)==-1) { - LM_ERR("Can't load tm functions. Module TM not loaded?\n"); + LM_ERR("can't load tm functions. TM module probably not loaded\n"); return -1; } - /* fork worker processes */ + /* load peer list - the list containing the module callbacks for dmq */ + peer_list = init_peer_list(); + + /* register worker processes */ + register_procs(num_workers); + + /* allocate workers array */ workers = shm_malloc(num_workers * sizeof(*workers)); - for(i = 0; i < num_workers; i++) { - int newpid = fork_process(PROC_NOCHLDINIT, "DMQ WORKER", 0); - if(newpid < 0) { - LM_ERR("failed to form process\n"); - return -1; - } else if(newpid == 0) { - /* child */ - // worker loop - } else { - workers[i].pid = newpid; - } + if(workers == NULL) { + LM_ERR("error in shm_malloc\n"); + return -1; } startup_time = (int) time(NULL); @@ -163,26 +162,40 @@ static int mod_init(void) { * Initialize children */ static int child_init(int rank) { - if (rank==PROC_INIT || rank==PROC_MAIN || rank==PROC_TCP_MAIN) { + int i, newpid; + if (rank == PROC_MAIN) { + /* fork worker processes */ + for(i = 0; i < num_workers; i++) { + init_worker(&workers[i]); + LM_DBG("starting worker process %d\n", i); + newpid = fork_process(PROC_NOCHLDINIT, "DMQ WORKER", 0); + if(newpid < 0) { + LM_ERR("failed to form process\n"); + return -1; + } else if(newpid == 0) { + // child - this will loop forever + worker_loop(i); + } else { + workers[i].pid = newpid; + } + } + return 0; + } + if(rank == PROC_INIT || rank == PROC_TCP_MAIN) { /* do nothing for the main process */ return 0; }
pid = my_pid(); - - if(library_mode) - return 0; - - return 0; -} - -static int mi_child_init(void) { return 0; }
- /* * destroy function */ static void destroy(void) { } + +static int handle_dmq_fixup(void** param, int param_no) { + return 0; +} \ No newline at end of file diff --git a/modules_k/dmq/dmq.h b/modules_k/dmq/dmq.h index d4b13de..3bd9c84 100644 --- a/modules_k/dmq/dmq.h +++ b/modules_k/dmq/dmq.h @@ -1,3 +1,33 @@ +#ifndef DMQ_H +#define DMQ_H + +#include "../../dprint.h" +#include "../../error.h" +#include "../../sr_module.h" +#include "bind_dmq.h" +#include "peer.h" +#include "worker.h"
#define DEFAULT_NUM_WORKERS 2 + +extern int num_workers; +extern dmq_worker_t* workers; + +static inline int dmq_load_api(dmq_api_t* api) { + bind_dmq_f binddmq; + binddmq = (bind_dmq_f)find_export("bind_dmq", 0, 0); + if ( binddmq == 0) { + LM_ERR("cannot find bind_dmq\n"); + return -1; + } + if (binddmq(api) < 0) + { + LM_ERR("cannot bind dmq api\n"); + return -1; + } + return 0; +} + int handle_dmq_message(struct sip_msg* msg, char* str1 ,char* str2); + +#endif \ No newline at end of file diff --git a/modules_k/dmq/dmq_worker.h b/modules_k/dmq/dmq_worker.h deleted file mode 100644 index e20b075..0000000 --- a/modules_k/dmq/dmq_worker.h +++ /dev/null @@ -1,8 +0,0 @@ - - -struct dmq_worker { - void* queue; - int pid; -}; - -typedef struct dmq_worker dmq_worker_t; \ No newline at end of file diff --git a/modules_k/presence/presence.c b/modules_k/presence/presence.c index a1cb13f..e32e527 100644 --- a/modules_k/presence/presence.c +++ b/modules_k/presence/presence.c @@ -69,6 +69,7 @@ #include "../../lib/kmi/mi.h" #include "../../lib/kcore/hash_func.h" #include "../pua/hash.h" +#include "../dmq/dmq.h" #include "presence.h" #include "publish.h" #include "subscribe.h" @@ -105,6 +106,9 @@ char* to_tag_pref = "10"; struct tm_binds tmb; /* SL API structure */ sl_api_t slb; +/* dmq API structure */ +dmq_api_t dmq; +register_dmq_peer_t register_dmq;
/** module functions */
@@ -206,6 +210,22 @@ struct module_exports exports= { child_init /* per-child init function */ };
+int dmq_callback(struct sip_msg* msg) { + LM_ERR("it worked - dmq module triggered the presence callback [%ld %d]\n", time(0), my_pid()); + sleep(4); + return 0; +} + +static void add_dmq_peer() { + dmq_peer_t presence_peer; + presence_peer.peer_id.s = "presence"; + presence_peer.peer_id.len = 8; + presence_peer.description.s = "presence"; + presence_peer.description.len = 8; + presence_peer.callback = dmq_callback; + register_dmq(&presence_peer); +} + /** * init module function */ @@ -268,6 +288,15 @@ static int mod_init(void) return -1; } + if(dmq_load_api(&dmq) < 0) { + LM_ERR("cannot load dmq api\n"); + return -1; + } else { + register_dmq = dmq.register_dmq_peer; + add_dmq_peer(); + LM_DBG("presence-dmq loaded\n"); + } + if(db_url.s== NULL) { LM_ERR("database url not set!\n");