[sr-dev] git:alexh/dialog-sync-wip: dialog: DMQ-sync dialogs with peers on startup

Alex Hermann alex at speakup.nl
Thu Sep 4 13:21:35 CEST 2014


Module: sip-router
Branch: alexh/dialog-sync-wip
Commit: 144737c482d39ae276e8a09f7dea01c5fa685bc6
URL:    http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=144737c482d39ae276e8a09f7dea01c5fa685bc6

Author: Alex Hermann <alex at speakup.nl>
Committer: Alex Hermann <alex at speakup.nl>
Date:   Tue Aug 26 16:26:03 2014 +0200

dialog: DMQ-sync dialogs with peers on startup

Use DMQ's init_callback() to request the peers to send all dialogs.

---

 modules/dialog/dlg_dmq.c        |   75 ++++++++++++++++++++++++++++++++++++++-
 modules/dialog/dlg_dmq.h        |    1 +
 modules/dmq/dmq.c               |    9 ++++-
 modules/dmq/notification_peer.c |   19 ++++++++--
 modules/dmq/notification_peer.h |    1 +
 5 files changed, 99 insertions(+), 6 deletions(-)

diff --git a/modules/dialog/dlg_dmq.c b/modules/dialog/dlg_dmq.c
index f659a70..bd43da7 100644
--- a/modules/dialog/dlg_dmq.c
+++ b/modules/dialog/dlg_dmq.c
@@ -35,6 +35,10 @@ dmq_api_t dlg_dmqb;
 dmq_peer_t* dlg_dmq_peer = NULL;
 dmq_resp_cback_t dlg_dmq_resp_callback = {&dlg_dmq_resp_callback_f, 0};
 
+int dmq_send_all_dlgs();
+int dlg_dmq_request_sync();
+
+
 /**
 * @brief add notification peer
 */
@@ -51,7 +55,7 @@ int dlg_dmq_initialize()
 	}
 
 	not_peer.callback = dlg_dmq_handle_msg;
-	not_peer.init_callback = NULL;
+	not_peer.init_callback = dlg_dmq_request_sync;
 	not_peer.description.s = "dialog";
 	not_peer.description.len = 6;
 	not_peer.peer_id.s = "dialog";
@@ -288,6 +292,10 @@ int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp)
 			unref++;
 			break;
 
+		case DLG_DMQ_SYNC:
+			dmq_send_all_dlgs();
+			break;
+
 		case DLG_DMQ_NONE:
 			break;
 	}
@@ -314,6 +322,46 @@ error:
 }
 
 
+int dlg_dmq_request_sync() {
+	srjson_doc_t jdoc;
+
+	LM_DBG("requesting sync from dmq peers\n");
+
+	srjson_InitDoc(&jdoc, NULL);
+
+	jdoc.root = srjson_CreateObject(&jdoc);
+	if(jdoc.root==NULL) {
+		LM_ERR("cannot create json root\n");
+		goto error;
+	}
+
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "action", DLG_DMQ_SYNC);
+	jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
+	if(jdoc.buf.s==NULL) {
+		LM_ERR("unable to serialize data\n");
+		goto error;
+	}
+	jdoc.buf.len = strlen(jdoc.buf.s);
+	LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
+	if (dlg_dmq_broadcast(&jdoc.buf)!=0) {
+		goto error;
+	}
+
+	jdoc.free_fn(jdoc.buf.s);
+	jdoc.buf.s = NULL;
+	srjson_DestroyDoc(&jdoc);
+	return 0;
+
+error:
+	if(jdoc.buf.s!=NULL) {
+		jdoc.free_fn(jdoc.buf.s);
+		jdoc.buf.s = NULL;
+	}
+	srjson_DestroyDoc(&jdoc);
+	return -1;
+}
+
+
 int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock) {
 
 	srjson_doc_t jdoc, prof_jdoc;
@@ -391,6 +439,7 @@ int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needl
 			break;
 
 		case DLG_DMQ_NONE:
+		case DLG_DMQ_SYNC:
 			break;
 	}
 	if (needlock)
@@ -422,6 +471,30 @@ error:
 }
 
 
+int dmq_send_all_dlgs() {
+	int index;
+	dlg_entry_t entry;
+	dlg_cell_t *dlg;
+
+	LM_DBG("sending all dialogs \n");
+
+	for(index = 0; index< d_table->size; index++){
+		/* lock the whole entry */
+		entry = (d_table->entries)[index];
+		dlg_lock( d_table, &entry);
+
+		for(dlg = entry.first; dlg != NULL; dlg = dlg->next){
+			dlg->iflags &= ~DLG_IFLAG_DMQ_SYNC;
+			dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 0);
+		}
+
+		dlg_unlock( d_table, &entry);
+	}
+
+	return 0;
+}
+
+
 /**
 * @brief dmq response callback
 */
diff --git a/modules/dialog/dlg_dmq.h b/modules/dialog/dlg_dmq.h
index ac38010..bd13757 100644
--- a/modules/dialog/dlg_dmq.h
+++ b/modules/dialog/dlg_dmq.h
@@ -38,6 +38,7 @@ typedef enum {
 	DLG_DMQ_UPDATE,
 	DLG_DMQ_STATE,
 	DLG_DMQ_RM,
+	DLG_DMQ_SYNC,
 } dlg_dmq_action_t;
 
 int dlg_dmq_initialize();
diff --git a/modules/dmq/dmq.c b/modules/dmq/dmq.c
index 30405f1..515ba49 100644
--- a/modules/dmq/dmq.c
+++ b/modules/dmq/dmq.c
@@ -229,7 +229,14 @@ static int mod_init(void)
 		LM_ERR("error in shm_malloc\n");
 		return -1;
 	}
-	
+
+	dmq_init_callback_done = shm_malloc(sizeof(int));
+	if (!dmq_init_callback_done) {
+		LM_ERR("no more shm\n");
+		return -1;
+	}
+	*dmq_init_callback_done = 0;
+
 	/**
 	 * add the dmq notification peer.
 	 * the dmq is a peer itself so that it can receive node notifications
diff --git a/modules/dmq/notification_peer.c b/modules/dmq/notification_peer.c
index e7704f0..75c1386 100644
--- a/modules/dmq/notification_peer.c
+++ b/modules/dmq/notification_peer.c
@@ -29,6 +29,9 @@
 str notification_content_type = str_init("text/plain");
 dmq_resp_cback_t notification_callback = {&notification_resp_callback_f, 0};
 
+int *dmq_init_callback_done;
+
+
 /**
  * @brief add notification peer
  */
@@ -186,7 +189,6 @@ int run_init_callbacks() {
  */
 int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp)
 {
-	static int firstrun = 1;
 	int nodes_recv;
 	str* response_body = NULL;
 	int maxforwards = 0;
@@ -223,9 +225,9 @@ int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp)
 				&notification_callback, maxforwards, &notification_content_type);
 	}
 	pkg_free(response_body);
-	if (firstrun) {
+	if (!*dmq_init_callback_done) {
+		*dmq_init_callback_done = 1;
 		run_init_callbacks();
-		firstrun = 0;
 	}
 	return 0;
 error:
@@ -312,8 +314,17 @@ int notification_resp_callback_f(struct sip_msg* msg, int code,
 		dmq_node_t* node, void* param)
 {
 	int ret;
+	int nodes_recv;
+
 	LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param);
-	if(code == 408) {
+	if(code == 200) {
+		nodes_recv = extract_node_list(node_list, msg);
+		LM_DBG("received %d new or changed nodes\n", nodes_recv);
+		if (!*dmq_init_callback_done) {
+			*dmq_init_callback_done = 1;
+			run_init_callbacks();
+		}
+	} else if(code == 408) {
 		/* deleting node - the server did not respond */
 		LM_ERR("deleting server %.*s because of failed request\n", STR_FMT(&node->orig_uri));
 		if (STR_EQ(node->orig_uri, dmq_notification_address)) {
diff --git a/modules/dmq/notification_peer.h b/modules/dmq/notification_peer.h
index 72df4ec..ff9871d 100644
--- a/modules/dmq/notification_peer.h
+++ b/modules/dmq/notification_peer.h
@@ -34,6 +34,7 @@
 #include "dmq_funcs.h"
 
 extern str notification_content_type;
+extern int *dmq_init_callback_done;
 
 int add_notification_peer();
 int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp);




More information about the sr-dev mailing list