Module: sip-router
Branch: andrei/rpc_async
Commit: 4dc222c20c93899a35046c1559a94618fb01e84d
URL:
http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=4dc222c…
Author: Andrei Pelinescu-Onciul <andrei(a)iptel.org>
Committer: Andrei Pelinescu-Onciul <andrei(a)iptel.org>
Date: Thu Jul 30 15:45:11 2009 +0200
xmlrpc(s): basic support for delayed replies
Support for delaying replies with a few caveats:
- a special delayed reply context must be created first (via the
new rpc hooks).
- a function using this context does not have any access to the
original rpc message (so if it needs any parameters from the
original rpc request it must pass them somehow to the function
that will use the delayed reply context).
- a delayed reply context can be used _only_ from one process (is
not possible to add part of the reply from one process and
another part from another process).
- when finished the delayed reply context _must_ be closed (using
the new rpc hook). This must be done from the same process in
which the delayed reply context was used.
---
modules_s/xmlrpc/xmlrpc.c | 140 ++++++++++++++++++++++++++++++++++++++++++++-
1 files changed, 139 insertions(+), 1 deletions(-)
diff --git a/modules_s/xmlrpc/xmlrpc.c b/modules_s/xmlrpc/xmlrpc.c
index c140626..f386a88 100644
--- a/modules_s/xmlrpc/xmlrpc.c
+++ b/modules_s/xmlrpc/xmlrpc.c
@@ -60,6 +60,7 @@
#include "../../action.h" /* run_actions */
#include "../../script_cb.h" /* exec_*_script_cb */
#include "../../route.h" /* route_get */
+#include "../../sip_msg_clone.h" /* sip_msg_shm_clone */
#include "http.h"
/** @addtogroup xmlrpc
@@ -316,6 +317,8 @@ typedef struct rpc_ctx {
received */
struct xmlrpc_reply reply; /**< XML-RPC reply to be sent to the client */
struct rpc_struct* structs; /**< Structures to be added to the reply */
+ int msg_shm_block_size; /**< non-zero for delayed reply contexts with
+ shm cloned msgs */
int reply_sent; /**< The flag is set after a reply is sent,
this prevents a single reply being sent
twice */
@@ -330,6 +333,11 @@ typedef struct rpc_ctx {
} rpc_ctx_t;
+/* extra rpc_ctx_t flags */
+/* first 8 bits reserved for rpc flags (e.g. RET_ARRAY) */
+#define XMLRPC_DELAYED_CTX_F 256
+#define XMLRPC_DELAYED_REPLY_F 512
+
/** The structure represents a XML-RPC document structure.
*
* This is the data structure that represents XML-RPC structures that are sent
@@ -424,6 +432,9 @@ struct module_exports exports = {
#define ESC_AMP "&"
+static void clean_context(rpc_ctx_t* ctx);
+
+
/** Adds arbitrary text to the XML-RPC reply being constructed, special
* characters < and & will be escaped.
*
@@ -606,6 +617,21 @@ static int init_xmlrpc_reply(struct xmlrpc_reply* reply)
}
+
+/* if this a delayed reply context, and it's never been use before, fix it */
+static int fix_delayed_reply_ctx(rpc_ctx_t* ctx)
+{
+ if ((ctx->flags & XMLRPC_DELAYED_CTX_F) && (ctx->reply.buf.s==0)){
+ if (init_xmlrpc_reply(&ctx->reply) <0) return -1;
+ add_xmlrpc_reply(&ctx->reply, &success_prefix);
+ if (ctx->flags & RET_ARRAY)
+ return add_xmlrpc_reply(&ctx->reply, &array_prefix);
+ }
+ return 0;
+}
+
+
+
/** Free all memory used by the XML-RPC reply structure. */
static void clean_xmlrpc_reply(struct xmlrpc_reply* reply)
{
@@ -990,6 +1016,7 @@ static int rpc_add(rpc_ctx_t* ctx, char* fmt, ...)
struct xmlrpc_reply* reply;
struct rpc_struct* p;
+ fix_delayed_reply_ctx(ctx);
va_start(ap, fmt);
reply = &ctx->reply;
@@ -1479,6 +1506,7 @@ static int rpc_printf(rpc_ctx_t* ctx, char* fmt, ...)
str s;
struct xmlrpc_reply* reply;
+ fix_delayed_reply_ctx(ctx);
reply = &ctx->reply;
buf = (char*)pkg_malloc(RPC_BUF_SIZE);
if (!buf) {
@@ -1746,6 +1774,111 @@ static int rpc_struct_scan(struct rpc_struct* s, char* fmt, ...)
}
+/** Returns the RPC capabilities supported by the xmlrpc driver.
+ */
+static rpc_capabilities_t rpc_capabilities(rpc_ctx_t* ctx)
+{
+ return RPC_DELAYED_REPLY;
+}
+
+
+/** Returns a new "delayed reply" context.
+ * Creates a new delayed reply context in shm and returns it.
+ * @return 0 - not supported, already replied, or no more memory;
+ * !=0 pointer to the special delayed ctx.
+ * Note1: one should use the returned ctx reply context to build a reply and
+ * when finished call rpc_delayed_ctx_close().
+ * Note2: adding pieces to the reply in different processes is not supported.
+ */
+static struct rpc_delayed_ctx* rpc_delayed_ctx_new(rpc_ctx_t* ctx)
+{
+ struct rpc_delayed_ctx* ret;
+ int size;
+ rpc_ctx_t* r_ctx;
+ struct sip_msg* shm_msg;
+ int len;
+
+ ret=0;
+ shm_msg=0;
+
+ if (ctx->reply_sent)
+ return 0; /* no delayed reply if already replied */
+ /* clone the sip msg */
+ shm_msg=sip_msg_shm_clone(ctx->msg, &len, 1);
+ if (shm_msg==0)
+ goto error;
+
+ /* alloc into one block */
+ size=ROUND_POINTER(sizeof(*ret))+sizeof(rpc_ctx_t);
+ if ((ret=shm_malloc(size))==0)
+ goto error;
+ memset(ret, 0, size);
+ ret->rpc=func_param;
+ ret->reply_ctx=(char*)ret+ROUND_POINTER(sizeof(*ret));
+ r_ctx=ret->reply_ctx;
+ r_ctx->flags=ctx->flags | XMLRPC_DELAYED_CTX_F;
+ ctx->flags |= XMLRPC_DELAYED_REPLY_F;
+ r_ctx->msg=shm_msg;
+ r_ctx->msg_shm_block_size=len;
+
+ return ret;
+error:
+ if (shm_msg)
+ shm_free(shm_msg);
+ if (ret)
+ shm_free(ret);
+ return 0;
+}
+
+
+
+/** Closes a "delayed reply" context and sends the reply.
+ * If no reply has been sent the reply will be built and sent automatically.
+ * See the notes from rpc_new_delayed_ctx()
+ */
+static void rpc_delayed_ctx_close(struct rpc_delayed_ctx* dctx)
+{
+ rpc_ctx_t* r_ctx;
+ struct hdr_field* hdr;
+
+ r_ctx=dctx->reply_ctx;
+ if (unlikely(!(r_ctx->flags & XMLRPC_DELAYED_CTX_F))){
+ BUG("reply ctx not marked as async/delayed\n");
+ goto error;
+ }
+ if (fix_delayed_reply_ctx(r_ctx)<0)
+ goto error;
+ if (!r_ctx->reply_sent){
+ rpc_send(r_ctx);
+ }
+error:
+ clean_context(r_ctx);
+ /* collect possible garbage (e.g. generated by structures) */
+ collect_garbage();
+ /* free added lumps (rpc_send adds a body lump) */
+ del_nonshm_lump( &(r_ctx->msg->add_rm) );
+ del_nonshm_lump( &(r_ctx->msg->body_lumps) );
+ del_nonshm_lump_rpl( &(r_ctx->msg->reply_lump) );
+ /* free header's parsed structures that were added by failure handlers */
+ for( hdr=r_ctx->msg->headers ; hdr ; hdr=hdr->next ) {
+ if ( hdr->parsed && hdr_allocs_parse(hdr) &&
+ (hdr->parsed<(void*)r_ctx->msg ||
+ hdr->parsed>=(void*)(r_ctx->msg+r_ctx->msg_shm_block_size))) {
+ /* header parsed filed doesn't point inside uas.request memory
+ * chunck -> it was added by failure funcs.-> free it as pkg */
+ DBG("DBG:free_faked_req: removing hdr->parsed %d\n",
+ hdr->type);
+ clean_hdr_field(hdr);
+ hdr->parsed = 0;
+ }
+ }
+ shm_free(r_ctx->msg);
+ r_ctx->msg=0;
+ dctx->reply_ctx=0;
+ shm_free(dctx);
+}
+
+
/** Starts parsing XML-RPC document, get the name of the method to be called
* and position the cursor at the first parameter in the document.
*/
@@ -1832,6 +1965,7 @@ static void close_doc(rpc_ctx_t* ctx)
static int init_context(rpc_ctx_t* ctx, sip_msg_t* msg)
{
ctx->msg = msg;
+ ctx->msg_shm_block_size=0;
ctx->method = 0;
ctx->reply_sent = 0;
ctx->act_param = 0;
@@ -2066,7 +2200,7 @@ static int dispatch_rpc(sip_msg_t* msg, char* s1, char* s2)
skip:
/* The function may have sent the reply itself */
- if (!ctx.reply_sent) {
+ if (!ctx.reply_sent && !(ctx.flags&XMLRPC_DELAYED_REPLY_F)) {
ret = rpc_send(&ctx);
}
clean_context(&ctx);
@@ -2212,6 +2346,10 @@ static int mod_init(void)
func_param.struct_add = (rpc_struct_add_f)rpc_struct_add;
func_param.struct_scan = (rpc_struct_scan_f)rpc_struct_scan;
func_param.struct_printf = (rpc_struct_printf_f)rpc_struct_printf;
+ func_param.capabilities = (rpc_capabilities_f)rpc_capabilities;
+ func_param.delayed_ctx_new = (rpc_delayed_ctx_new_f)rpc_delayed_ctx_new;
+ func_param.delayed_ctx_close =
+ (rpc_delayed_ctx_close_f)rpc_delayed_ctx_close;
register_select_table(xmlrpc_sel);
/* register non-sip hooks */