[sr-dev] git:master: mi_rpc: support for async mi commands

Andrei Pelinescu-Onciul andrei at iptel.org
Thu Sep 24 18:21:55 CEST 2009


Module: sip-router
Branch: master
Commit: 244d4d4729b295be999acb3a4ca4cf156a9bfbbf
URL:    http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=244d4d4729b295be999acb3a4ca4cf156a9bfbbf

Author: Andrei Pelinescu-Onciul <andrei at iptel.org>
Committer: Andrei Pelinescu-Onciul <andrei at iptel.org>
Date:   Thu Sep 24 18:06:22 2009 +0200

mi_rpc: support for async mi commands

async mi commands can now be executed, if the underlying rpc
transport module supports it (e.g. xmlrpc).

---

 modules/mi_rpc/mi_rpc_mod.c |  135 ++++++++++++++++++++++++++++++++++++++-----
 1 files changed, 120 insertions(+), 15 deletions(-)

diff --git a/modules/mi_rpc/mi_rpc_mod.c b/modules/mi_rpc/mi_rpc_mod.c
index 9c68839..b678f25 100644
--- a/modules/mi_rpc/mi_rpc_mod.c
+++ b/modules/mi_rpc/mi_rpc_mod.c
@@ -93,7 +93,7 @@ struct mi_root *mi_rpc_read_params(rpc_t *rpc, void *ctx)
 	{
 		name.s   = 0;
 		name.len = 0;
-		
+
 		if(value.len>=2 && value.s[0]=='-' && value.s[1]=='-')
 		{
 			/* name */
@@ -256,12 +256,66 @@ static int mi_rpc_print_tree(rpc_t* rpc, void* ctx, struct mi_root *tree,
 	return 0;
 }
 
+
+
+/* structure used to pack the rpc dyn. ctx and the print mode */
+struct mi_rpc_handler_param{
+	rpc_delayed_ctx_t* dctx;
+	enum mi_rpc_print_mode mode;
+};
+
+/* send reply and close async context */
+static void mi_rpc_async_close(struct mi_root* mi_rpl,
+									struct mi_handler* mi_h,
+									int done)
+{
+	rpc_delayed_ctx_t* dctx;
+	rpc_t* rpc;
+	void* c;
+	enum mi_rpc_print_mode mode;
+	
+	dctx=0;
+	if (done){
+		if (mi_h->param==0){
+			BUG("null param\n");
+			shm_free(mi_h);
+			goto error;
+		}
+		dctx=((struct mi_rpc_handler_param*)mi_h->param)->dctx;
+		if (dctx==0){
+			BUG("null dctx\n");
+			shm_free(mi_h->param);
+			shm_free(mi_h);
+			mi_h->param=0;
+			goto error;
+		}
+		mode=((struct mi_rpc_handler_param*)mi_h->param)->mode;
+		rpc=&dctx->rpc;
+		c=dctx->reply_ctx;
+		
+		mi_rpc_print_tree(rpc, c, mi_rpl, mode);
+		
+		rpc->delayed_ctx_close(dctx);
+		shm_free(mi_h->param);
+		mi_h->param=0;
+		shm_free(mi_h);
+	} /* else: no provisional support => do nothing */
+error:
+	if (mi_rpl)
+		free_mi_tree(mi_rpl);
+	return;
+}
+
+
+
 static void rpc_mi_exec(rpc_t *rpc, void *ctx, enum mi_rpc_print_mode mode)
 {
 	str cmd;
 	struct mi_cmd *mic;
 	struct mi_root *mi_req;
 	struct mi_root *mi_rpl;
+	struct mi_handler* mi_async_h;
+	struct mi_rpc_handler_param* mi_handler_param;
 
 	if (rpc->scan(ctx, "S", &cmd) < 1)
 	{
@@ -269,7 +323,11 @@ static void rpc_mi_exec(rpc_t *rpc, void *ctx, enum mi_rpc_print_mode mode)
 		rpc->fault(ctx, 500, "command parameter missing");
 		return;
 	}
-
+	
+	mi_async_h=0;
+	mi_req = 0;
+	mi_rpl=0;
+	
 	mic = lookup_mi_cmd(cmd.s, cmd.len);
 	if(mic==0)
 	{
@@ -280,12 +338,15 @@ static void rpc_mi_exec(rpc_t *rpc, void *ctx, enum mi_rpc_print_mode mode)
 
 	if (mic->flags&MI_ASYNC_RPL_FLAG)
 	{
-		LM_ERR("async mi cmd support not implemented yet\n");
-		rpc->fault(ctx, 500, "async my cmd not implemented yet");
-		return;
+		if (rpc->capabilities==0 ||
+				!(rpc->capabilities(ctx) & RPC_DELAYED_REPLY))
+		{
+			rpc->fault(ctx, 500,
+							"this rpc transport does not support async mode");
+			return;
+		}
 	}
 
-	mi_req = 0;
 	if(!(mic->flags&MI_NO_INPUT_FLAG))
 	{
 		mi_req = mi_rpc_read_params(rpc, ctx);
@@ -293,29 +354,73 @@ static void rpc_mi_exec(rpc_t *rpc, void *ctx, enum mi_rpc_print_mode mode)
 		{
 			LM_ERR("cannot parse parameters\n");
 			rpc->fault(ctx, 500, "cannot parse parameters");
-			return;
+			goto error;
 		}
+		if (mic->flags&MI_ASYNC_RPL_FLAG)
+		{
+			/* build mi async handler */
+			mi_handler_param=shm_malloc(sizeof(*mi_handler_param));
+			if (mi_handler_param==0){
+				rpc->fault(ctx, 500, "out of memory");
+				return;
+			}
+			mi_async_h=shm_malloc(sizeof(*mi_async_h));
+			if (mi_async_h==0){
+				shm_free(mi_handler_param);
+				mi_handler_param=0;
+				rpc->fault(ctx, 500, "out of memory");
+				return;
+			}
+			memset(mi_async_h, 0, sizeof(*mi_async_h));
+			mi_async_h->handler_f=mi_rpc_async_close;
+			mi_handler_param->mode=mode;
+			mi_handler_param->dctx=rpc->delayed_ctx_new(ctx);
+			if (mi_handler_param->dctx==0){
+				rpc->fault(ctx, 500, "internal error: async ctx"
+										" creation failed");
+				goto error;
+			}
+			/* switch context, since replies are not allowed anymore on the
+			   original one */
+			rpc=&mi_handler_param->dctx->rpc;
+			ctx=mi_handler_param->dctx->reply_ctx;
+			mi_async_h->param=mi_handler_param;
+		}
+		mi_req->async_hdl=mi_async_h;
 	}
 	mi_rpl=run_mi_cmd(mic, mi_req);
 
 	if(mi_rpl == 0)
 	{
 		rpc->fault(ctx, 500, "execution failed");
-		if (mi_req) free_mi_tree(mi_req);
-		return;
+		goto error;
 	}
 
 	if (mi_rpl!=MI_ROOT_ASYNC_RPL)
 	{
 		mi_rpc_print_tree(rpc, ctx, mi_rpl, mode);
+		goto end;
+	}else if (mi_async_h==0){
+		/* async reply, but command not listed as async */
+		rpc->fault(ctx, 500, "bad mi command: unexpected async reply");
+		goto error;
+	}
+	mi_async_h=0; /* don't delete it */
+end:
+error:
+	if (mi_req)
+		free_mi_tree(mi_req);
+	if (mi_rpl && mi_rpl!=MI_ROOT_ASYNC_RPL)
 		free_mi_tree(mi_rpl);
-		if (mi_req) free_mi_tree(mi_req);
-		return;
+	if (mi_async_h){
+		if (mi_async_h->param){
+			if (((struct mi_rpc_handler_param*)mi_async_h->param)->dctx)
+				rpc->delayed_ctx_close(((struct mi_rpc_handler_param*)
+											mi_async_h->param)->dctx);
+			shm_free(mi_async_h->param);
+		}
+		shm_free(mi_async_h);
 	}
-
-	/* async cmd -- not yet */
-	rpc->fault(ctx, 500, "no async handling yet");
-	if (mi_req) free_mi_tree(mi_req);
 	return;
 }
 




More information about the sr-dev mailing list