Module: sip-router
Branch: master
Commit: 244d4d4729b295be999acb3a4ca4cf156a9bfbbf
URL:
http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=244d4d4…
Author: Andrei Pelinescu-Onciul <andrei(a)iptel.org>
Committer: Andrei Pelinescu-Onciul <andrei(a)iptel.org>
Date: Thu Sep 24 18:06:22 2009 +0200
mi_rpc: support for async mi commands
async mi commands can now be executed, if the underlying rpc
transport module supports it (e.g. xmlrpc).
---
modules/mi_rpc/mi_rpc_mod.c | 135 ++++++++++++++++++++++++++++++++++++++-----
1 files changed, 120 insertions(+), 15 deletions(-)
diff --git a/modules/mi_rpc/mi_rpc_mod.c b/modules/mi_rpc/mi_rpc_mod.c
index 9c68839..b678f25 100644
--- a/modules/mi_rpc/mi_rpc_mod.c
+++ b/modules/mi_rpc/mi_rpc_mod.c
@@ -93,7 +93,7 @@ struct mi_root *mi_rpc_read_params(rpc_t *rpc, void *ctx)
{
name.s = 0;
name.len = 0;
-
+
if(value.len>=2 && value.s[0]=='-' &&
value.s[1]=='-')
{
/* name */
@@ -256,12 +256,66 @@ static int mi_rpc_print_tree(rpc_t* rpc, void* ctx, struct mi_root
*tree,
return 0;
}
+
+
+/* structure used to pack the rpc dyn. ctx and the print mode */
+struct mi_rpc_handler_param{
+ rpc_delayed_ctx_t* dctx;
+ enum mi_rpc_print_mode mode;
+};
+
+/* send reply and close async context */
+static void mi_rpc_async_close(struct mi_root* mi_rpl,
+ struct mi_handler* mi_h,
+ int done)
+{
+ rpc_delayed_ctx_t* dctx;
+ rpc_t* rpc;
+ void* c;
+ enum mi_rpc_print_mode mode;
+
+ dctx=0;
+ if (done){
+ if (mi_h->param==0){
+ BUG("null param\n");
+ shm_free(mi_h);
+ goto error;
+ }
+ dctx=((struct mi_rpc_handler_param*)mi_h->param)->dctx;
+ if (dctx==0){
+ BUG("null dctx\n");
+ shm_free(mi_h->param);
+ shm_free(mi_h);
+ mi_h->param=0;
+ goto error;
+ }
+ mode=((struct mi_rpc_handler_param*)mi_h->param)->mode;
+ rpc=&dctx->rpc;
+ c=dctx->reply_ctx;
+
+ mi_rpc_print_tree(rpc, c, mi_rpl, mode);
+
+ rpc->delayed_ctx_close(dctx);
+ shm_free(mi_h->param);
+ mi_h->param=0;
+ shm_free(mi_h);
+ } /* else: no provisional support => do nothing */
+error:
+ if (mi_rpl)
+ free_mi_tree(mi_rpl);
+ return;
+}
+
+
+
static void rpc_mi_exec(rpc_t *rpc, void *ctx, enum mi_rpc_print_mode mode)
{
str cmd;
struct mi_cmd *mic;
struct mi_root *mi_req;
struct mi_root *mi_rpl;
+ struct mi_handler* mi_async_h;
+ struct mi_rpc_handler_param* mi_handler_param;
if (rpc->scan(ctx, "S", &cmd) < 1)
{
@@ -269,7 +323,11 @@ static void rpc_mi_exec(rpc_t *rpc, void *ctx, enum mi_rpc_print_mode
mode)
rpc->fault(ctx, 500, "command parameter missing");
return;
}
-
+
+ mi_async_h=0;
+ mi_req = 0;
+ mi_rpl=0;
+
mic = lookup_mi_cmd(cmd.s, cmd.len);
if(mic==0)
{
@@ -280,12 +338,15 @@ static void rpc_mi_exec(rpc_t *rpc, void *ctx, enum
mi_rpc_print_mode mode)
if (mic->flags&MI_ASYNC_RPL_FLAG)
{
- LM_ERR("async mi cmd support not implemented yet\n");
- rpc->fault(ctx, 500, "async my cmd not implemented yet");
- return;
+ if (rpc->capabilities==0 ||
+ !(rpc->capabilities(ctx) & RPC_DELAYED_REPLY))
+ {
+ rpc->fault(ctx, 500,
+ "this rpc transport does not support async mode");
+ return;
+ }
}
- mi_req = 0;
if(!(mic->flags&MI_NO_INPUT_FLAG))
{
mi_req = mi_rpc_read_params(rpc, ctx);
@@ -293,29 +354,73 @@ static void rpc_mi_exec(rpc_t *rpc, void *ctx, enum
mi_rpc_print_mode mode)
{
LM_ERR("cannot parse parameters\n");
rpc->fault(ctx, 500, "cannot parse parameters");
- return;
+ goto error;
}
+ if (mic->flags&MI_ASYNC_RPL_FLAG)
+ {
+ /* build mi async handler */
+ mi_handler_param=shm_malloc(sizeof(*mi_handler_param));
+ if (mi_handler_param==0){
+ rpc->fault(ctx, 500, "out of memory");
+ return;
+ }
+ mi_async_h=shm_malloc(sizeof(*mi_async_h));
+ if (mi_async_h==0){
+ shm_free(mi_handler_param);
+ mi_handler_param=0;
+ rpc->fault(ctx, 500, "out of memory");
+ return;
+ }
+ memset(mi_async_h, 0, sizeof(*mi_async_h));
+ mi_async_h->handler_f=mi_rpc_async_close;
+ mi_handler_param->mode=mode;
+ mi_handler_param->dctx=rpc->delayed_ctx_new(ctx);
+ if (mi_handler_param->dctx==0){
+ rpc->fault(ctx, 500, "internal error: async ctx"
+ " creation failed");
+ goto error;
+ }
+ /* switch context, since replies are not allowed anymore on the
+ original one */
+ rpc=&mi_handler_param->dctx->rpc;
+ ctx=mi_handler_param->dctx->reply_ctx;
+ mi_async_h->param=mi_handler_param;
+ }
+ mi_req->async_hdl=mi_async_h;
}
mi_rpl=run_mi_cmd(mic, mi_req);
if(mi_rpl == 0)
{
rpc->fault(ctx, 500, "execution failed");
- if (mi_req) free_mi_tree(mi_req);
- return;
+ goto error;
}
if (mi_rpl!=MI_ROOT_ASYNC_RPL)
{
mi_rpc_print_tree(rpc, ctx, mi_rpl, mode);
+ goto end;
+ }else if (mi_async_h==0){
+ /* async reply, but command not listed as async */
+ rpc->fault(ctx, 500, "bad mi command: unexpected async reply");
+ goto error;
+ }
+ mi_async_h=0; /* don't delete it */
+end:
+error:
+ if (mi_req)
+ free_mi_tree(mi_req);
+ if (mi_rpl && mi_rpl!=MI_ROOT_ASYNC_RPL)
free_mi_tree(mi_rpl);
- if (mi_req) free_mi_tree(mi_req);
- return;
+ if (mi_async_h){
+ if (mi_async_h->param){
+ if (((struct mi_rpc_handler_param*)mi_async_h->param)->dctx)
+ rpc->delayed_ctx_close(((struct mi_rpc_handler_param*)
+ mi_async_h->param)->dctx);
+ shm_free(mi_async_h->param);
+ }
+ shm_free(mi_async_h);
}
-
- /* async cmd -- not yet */
- rpc->fault(ctx, 500, "no async handling yet");
- if (mi_req) free_mi_tree(mi_req);
return;
}