[sr-dev] git:mariusbucur/dmq: temporarily changed the presence module to test dmq functionality

Marius Ovidiu Bucur marius at marius-bucur.ro
Wed Apr 6 19:12:44 CEST 2011


Module: sip-router
Branch: mariusbucur/dmq
Commit: 78eea80ffdaf09dd6d247455d77c47a64eacae48
URL:    http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=78eea80ffdaf09dd6d247455d77c47a64eacae48

Author: Marius Bucur <marius.bucur at 1and1.ro>
Committer: Marius Bucur <marius.bucur at 1and1.ro>
Date:   Wed Apr  6 20:12:00 2011 +0300

temporarily changed the presence module to test dmq functionality

---

 modules_k/dmq/bind_dmq.h      |   14 ++++++++
 modules_k/dmq/dmq.c           |   71 ++++++++++++++++++++++++-----------------
 modules_k/dmq/dmq.h           |   30 +++++++++++++++++
 modules_k/dmq/dmq_worker.h    |    8 -----
 modules_k/presence/presence.c |   29 +++++++++++++++++
 5 files changed, 115 insertions(+), 37 deletions(-)

diff --git a/modules_k/dmq/bind_dmq.h b/modules_k/dmq/bind_dmq.h
index e69de29..9c515a4 100644
--- a/modules_k/dmq/bind_dmq.h
+++ b/modules_k/dmq/bind_dmq.h
@@ -0,0 +1,14 @@
+#ifndef BIND_DMQ_H
+#define BIND_DMQ_H
+
+#include "peer.h"
+
+typedef struct dmq_api {
+	register_dmq_peer_t register_dmq_peer;
+} dmq_api_t;
+
+typedef int (*bind_dmq_f)(dmq_api_t* api);
+
+int bind_dmq(dmq_api_t* api);
+
+#endif
\ No newline at end of file
diff --git a/modules_k/dmq/dmq.c b/modules_k/dmq/dmq.c
index f5aae9a..90b0ac2 100644
--- a/modules_k/dmq/dmq.c
+++ b/modules_k/dmq/dmq.c
@@ -47,11 +47,11 @@
 #include "../../lib/kmi/mi.h"
 #include "../../lib/kcore/hash_func.h"
 #include "dmq.h"
+#include "peer.h"
 #include "bind_dmq.h"
-#include "dmq_worker.h"
+#include "worker.h"
 #include "../../mod_fix.h"
 
-static int mi_child_init(void);
 static int mod_init(void);
 static int child_init(int);
 static void destroy(void);
@@ -74,15 +74,18 @@ sl_api_t slb;
 
 /** module variables */
 dmq_worker_t* workers;
+dmq_peer_list_t* peer_list;
 
 /** module functions */
 static int mod_init(void);
 static int child_init(int);
 static void destroy(void);
-static int fixup_dmq(void** param, int param_no);
+static int handle_dmq_fixup(void** param, int param_no);
 
 static cmd_export_t cmds[] = {
-	{"handle_dmq_message",  (cmd_function)handle_dmq_message, 0, fixup_dmq, 0, 
+	{"handle_dmq_message",  (cmd_function)handle_dmq_message, 0, handle_dmq_fixup, 0, 
+		REQUEST_ROUTE},
+	{"bind_dmq",        (cmd_function)bind_dmq, 0, 0, 0,
 		REQUEST_ROUTE},
 	{0, 0, 0, 0, 0, 0}
 };
@@ -93,7 +96,6 @@ static param_export_t params[] = {
 };
 
 static mi_export_t mi_cmds[] = {
-	{"cleanup", 0, 0, 0, mi_child_init},
 	{0, 0, 0, 0, 0}
 };
 
@@ -117,7 +119,6 @@ struct module_exports exports = {
  * init module function
  */
 static int mod_init(void) {
-	int i = 0;
 	
 	if(register_mi_mod(exports.name, mi_cmds)!=0) {
 		LM_ERR("failed to register MI commands\n");
@@ -136,23 +137,21 @@ static int mod_init(void) {
 
 	/* load all TM stuff */
 	if(load_tm_api(&tmb)==-1) {
-		LM_ERR("Can't load tm functions. Module TM not loaded?\n");
+		LM_ERR("can't load tm functions. TM module probably not loaded\n");
 		return -1;
 	}
 	
-	/* fork worker processes */
+	/* load peer list - the list containing the module callbacks for dmq */
+	peer_list = init_peer_list();
+	
+	/* register worker processes */
+	register_procs(num_workers);
+	
+	/* allocate workers array */
 	workers = shm_malloc(num_workers * sizeof(*workers));
-	for(i = 0; i < num_workers; i++) {
-		int newpid = fork_process(PROC_NOCHLDINIT, "DMQ WORKER", 0);
-		if(newpid < 0) {
-			LM_ERR("failed to form process\n");
-			return -1;
-		} else if(newpid == 0) {
-			/* child */
-			// worker loop
-		} else {
-			workers[i].pid = newpid;
-		}
+	if(workers == NULL) {
+		LM_ERR("error in shm_malloc\n");
+		return -1;
 	}
 	
 	startup_time = (int) time(NULL);
@@ -163,26 +162,40 @@ static int mod_init(void) {
  * Initialize children
  */
 static int child_init(int rank) {
-	if (rank==PROC_INIT || rank==PROC_MAIN || rank==PROC_TCP_MAIN) {
+  	int i, newpid;
+	if (rank == PROC_MAIN) {
+		/* fork worker processes */
+		for(i = 0; i < num_workers; i++) {
+			init_worker(&workers[i]);
+			LM_DBG("starting worker process %d\n", i);
+			newpid = fork_process(PROC_NOCHLDINIT, "DMQ WORKER", 0);
+			if(newpid < 0) {
+				LM_ERR("failed to form process\n");
+				return -1;
+			} else if(newpid == 0) {
+				// child - this will loop forever
+				worker_loop(i);
+			} else {
+				workers[i].pid = newpid;
+			}
+		}
+		return 0;
+	}
+	if(rank == PROC_INIT || rank == PROC_TCP_MAIN) {
 		/* do nothing for the main process */
 		return 0;
 	}
 
 	pid = my_pid();
-	
-	if(library_mode)
-		return 0;
-
-	return 0;
-}
-
-static int mi_child_init(void) {
 	return 0;
 }
 
-
 /*
  * destroy function
  */
 static void destroy(void) {
 }
+
+static int handle_dmq_fixup(void** param, int param_no) {
+ 	return 0;
+}
\ No newline at end of file
diff --git a/modules_k/dmq/dmq.h b/modules_k/dmq/dmq.h
index d4b13de..3bd9c84 100644
--- a/modules_k/dmq/dmq.h
+++ b/modules_k/dmq/dmq.h
@@ -1,3 +1,33 @@
+#ifndef DMQ_H
+#define DMQ_H
+
+#include "../../dprint.h"
+#include "../../error.h"
+#include "../../sr_module.h"
+#include "bind_dmq.h"
+#include "peer.h"
+#include "worker.h"
 
 #define DEFAULT_NUM_WORKERS	2
+
+extern int num_workers;
+extern dmq_worker_t* workers;
+
+static inline int dmq_load_api(dmq_api_t* api) {
+	bind_dmq_f binddmq;
+	binddmq = (bind_dmq_f)find_export("bind_dmq", 0, 0);
+	if ( binddmq == 0) {
+		LM_ERR("cannot find bind_dmq\n");
+		return -1;
+	}
+	if (binddmq(api) < 0)
+	{
+		LM_ERR("cannot bind dmq api\n");
+		return -1;
+	}
+	return 0;
+}
+
 int handle_dmq_message(struct sip_msg* msg, char* str1 ,char* str2);
+
+#endif
\ No newline at end of file
diff --git a/modules_k/dmq/dmq_worker.h b/modules_k/dmq/dmq_worker.h
deleted file mode 100644
index e20b075..0000000
--- a/modules_k/dmq/dmq_worker.h
+++ /dev/null
@@ -1,8 +0,0 @@
-
-
-struct dmq_worker {
-	void* queue;
-	int pid;
-};
-
-typedef struct dmq_worker dmq_worker_t;
\ No newline at end of file
diff --git a/modules_k/presence/presence.c b/modules_k/presence/presence.c
index a1cb13f..e32e527 100644
--- a/modules_k/presence/presence.c
+++ b/modules_k/presence/presence.c
@@ -69,6 +69,7 @@
 #include "../../lib/kmi/mi.h"
 #include "../../lib/kcore/hash_func.h"
 #include "../pua/hash.h"
+#include "../dmq/dmq.h"
 #include "presence.h"
 #include "publish.h"
 #include "subscribe.h"
@@ -105,6 +106,9 @@ char* to_tag_pref = "10";
 struct tm_binds tmb;
 /* SL API structure */
 sl_api_t slb;
+/* dmq API structure */
+dmq_api_t dmq;
+register_dmq_peer_t register_dmq;
 
 /** module functions */
 
@@ -206,6 +210,22 @@ struct module_exports exports= {
 	child_init                  	/* per-child init function */
 };
 
+int dmq_callback(struct sip_msg* msg) {
+	LM_ERR("it worked - dmq module triggered the presence callback [%ld %d]\n", time(0), my_pid());
+	sleep(4);
+	return 0;
+}
+
+static void add_dmq_peer() {
+	dmq_peer_t presence_peer;
+	presence_peer.peer_id.s = "presence";
+	presence_peer.peer_id.len = 8;
+	presence_peer.description.s = "presence";
+	presence_peer.description.len = 8;
+	presence_peer.callback = dmq_callback;
+	register_dmq(&presence_peer);
+}
+
 /**
  * init module function
  */
@@ -268,6 +288,15 @@ static int mod_init(void)
 		return -1;
 	}
 	
+	if(dmq_load_api(&dmq) < 0) {
+		LM_ERR("cannot load dmq api\n");
+		return -1;
+	} else {
+		register_dmq = dmq.register_dmq_peer;
+		add_dmq_peer();
+		LM_DBG("presence-dmq loaded\n");
+	}
+	
 	if(db_url.s== NULL)
 	{
 		LM_ERR("database url not set!\n");




More information about the sr-dev mailing list