[sr-dev] git:andrei/rpc_async: xmlrpc(s): basic support for delayed replies

Andrei Pelinescu-Onciul andrei at iptel.org
Thu Sep 3 11:27:19 CEST 2009


Module: sip-router
Branch: andrei/rpc_async
Commit: 4dc222c20c93899a35046c1559a94618fb01e84d
URL:    http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=4dc222c20c93899a35046c1559a94618fb01e84d

Author: Andrei Pelinescu-Onciul <andrei at iptel.org>
Committer: Andrei Pelinescu-Onciul <andrei at iptel.org>
Date:   Thu Jul 30 15:45:11 2009 +0200

xmlrpc(s): basic support for delayed replies

Support for delaying replies with a few caveats:
- a special delayed reply context must be created first (via the
  new rpc hooks).
- a function using this context does not have any access to the
  original rpc message (so if it needs any parameters from the
  original rpc request it must pass them somehow to the function
  that will use the delayed reply context).
- a delayed reply context can be used _only_ from one process (it is
  not possible to add part of the reply from one process and
  another part from another process).
- when finished, the delayed reply context _must_ be closed (using
  the new rpc hook). This must be done from the same process in
  which the delayed reply context was used.

---

 modules_s/xmlrpc/xmlrpc.c |  140 ++++++++++++++++++++++++++++++++++++++++++++-
 1 files changed, 139 insertions(+), 1 deletions(-)

diff --git a/modules_s/xmlrpc/xmlrpc.c b/modules_s/xmlrpc/xmlrpc.c
index c140626..f386a88 100644
--- a/modules_s/xmlrpc/xmlrpc.c
+++ b/modules_s/xmlrpc/xmlrpc.c
@@ -60,6 +60,7 @@
 #include "../../action.h" /* run_actions */
 #include "../../script_cb.h" /* exec_*_script_cb */
 #include "../../route.h" /* route_get */
+#include "../../sip_msg_clone.h" /* sip_msg_shm_clone */
 #include "http.h"
 
 /** @addtogroup xmlrpc
@@ -316,6 +317,8 @@ typedef struct rpc_ctx {
 							  received */
 	struct xmlrpc_reply reply;  /**< XML-RPC reply to be sent to the client */
 	struct rpc_struct* structs; /**< Structures to be added to the reply */
+	int msg_shm_block_size; /**< non-zero for delayed reply contexts with
+								shm cloned msgs */
 	int reply_sent;             /**< The flag is set after a reply is sent,
 								   this prevents a single reply being sent
 								   twice */
@@ -330,6 +333,11 @@ typedef struct rpc_ctx {
 } rpc_ctx_t;
 
 
+/* Flags private to the xmlrpc module, kept in rpc_ctx_t.flags next to the
+ * generic rpc flags (e.g. RET_ARRAY), for which the first 8 bits are reserved */
+#define XMLRPC_DELAYED_CTX_F	(1<<8)	/* ctx is a delayed reply context */
+#define XMLRPC_DELAYED_REPLY_F	(1<<9)	/* a delayed ctx was created from ctx */
+
 /** The structure represents a XML-RPC document structure.
  *
  * This is the data structure that represents XML-RPC structures that are sent
@@ -424,6 +432,9 @@ struct module_exports exports = {
 #define ESC_AMP "&amp;"
 
 
+static void clean_context(rpc_ctx_t* ctx);
+
+
 /** Adds arbitrary text to the XML-RPC reply being constructed, special
  * characters < and & will be escaped.
  *
@@ -606,6 +617,21 @@ static int init_xmlrpc_reply(struct xmlrpc_reply* reply)
 }
 
 
+
+/* prepares the reply of a delayed reply context the first time it is used */
+static int fix_delayed_reply_ctx(rpc_ctx_t* ctx)
+{
+	if (!(ctx->flags & XMLRPC_DELAYED_CTX_F) || (ctx->reply.buf.s!=0))
+		return 0; /* not a delayed ctx, or its reply buffer already exists */
+	if (init_xmlrpc_reply(&ctx->reply) <0)
+		return -1;
+	add_xmlrpc_reply(&ctx->reply, &success_prefix);
+	return (ctx->flags & RET_ARRAY) ?
+				add_xmlrpc_reply(&ctx->reply, &array_prefix) : 0;
+}
+
+
+
 /** Free all memory used by the XML-RPC reply structure. */
 static void clean_xmlrpc_reply(struct xmlrpc_reply* reply)
 {
@@ -990,6 +1016,7 @@ static int rpc_add(rpc_ctx_t* ctx, char* fmt, ...)
 	struct xmlrpc_reply* reply;
 	struct rpc_struct* p;
 
+	fix_delayed_reply_ctx(ctx);
 	va_start(ap, fmt);
 	reply = &ctx->reply;
 
@@ -1479,6 +1506,7 @@ static int rpc_printf(rpc_ctx_t* ctx, char* fmt, ...)
 	str s;
 	struct xmlrpc_reply* reply;
 
+	fix_delayed_reply_ctx(ctx);
 	reply = &ctx->reply;
 	buf = (char*)pkg_malloc(RPC_BUF_SIZE);
 	if (!buf) {
@@ -1746,6 +1774,111 @@ static int rpc_struct_scan(struct rpc_struct* s, char* fmt, ...)
 }
 
 
+/** Reports the optional RPC features implemented by the xmlrpc driver. */
+static rpc_capabilities_t rpc_capabilities(rpc_ctx_t* ctx)
+{
+	rpc_capabilities_t caps = RPC_DELAYED_REPLY; /* ctx is not consulted */
+	return caps;
+}
+
+
+/** Creates a new "delayed reply" context.
+ * The request is cloned into shm and a reply context referring to the clone
+ * is allocated, also in shm; ctx is flagged so that dispatch_rpc() will not
+ * send a reply by itself.
+ * @param ctx - context of the rpc request being processed.
+ * @return 0 - already replied, message clone failed or no more memory;
+ *         !=0 pointer to the special delayed ctx.
+ * Note1: build the reply using the reply context of the returned ctx and
+ *  call rpc_delayed_ctx_close() when finished.
+ * Note2: adding pieces to the reply in different processes is not supported.
+ */
+static struct rpc_delayed_ctx* rpc_delayed_ctx_new(rpc_ctx_t* ctx)
+{
+	struct rpc_delayed_ctx* dctx;
+	rpc_ctx_t* reply_ctx;
+	struct sip_msg* cloned_msg;
+	int hdr_size;
+	int size;
+	int msg_len;
+	
+	/* no delayed reply if already replied */
+	if (ctx->reply_sent)
+		return 0;
+	/* clone the sip msg; msg_len is kept below as the shm block size */
+	cloned_msg=sip_msg_shm_clone(ctx->msg, &msg_len, 1);
+	if (cloned_msg==0)
+		return 0;
+	/* the delayed ctx and its reply ctx live in one shm block */
+	hdr_size=ROUND_POINTER(sizeof(*dctx));
+	size=hdr_size+sizeof(rpc_ctx_t);
+	dctx=shm_malloc(size);
+	if (dctx==0){
+		/* nothing else was allocated, only the clone must be dropped */
+		shm_free(cloned_msg);
+		return 0;
+	}
+	memset(dctx, 0, size);
+	dctx->rpc=func_param;
+	dctx->reply_ctx=(char*)dctx+hdr_size;
+	reply_ctx=dctx->reply_ctx;
+	reply_ctx->msg=cloned_msg;
+	reply_ctx->msg_shm_block_size=msg_len;
+	reply_ctx->flags=ctx->flags | XMLRPC_DELAYED_CTX_F;
+	/* mark the original ctx: its reply is now in the hands of dctx */
+	ctx->flags |= XMLRPC_DELAYED_REPLY_F;
+	return dctx;
+}
+
+
+
+/** Closes a "delayed reply" context and sends the reply if none was sent yet.
+ * All the resources of the context are freed, dctx is invalid afterwards.
+ * See the notes from rpc_delayed_ctx_new().
+ */
+static void rpc_delayed_ctx_close(struct rpc_delayed_ctx* dctx)
+{
+	rpc_ctx_t* r_ctx;
+	struct hdr_field* hdr;
+	
+	r_ctx=dctx->reply_ctx;
+	if (unlikely(!(r_ctx->flags & XMLRPC_DELAYED_CTX_F))){
+		BUG("reply ctx not marked as async/delayed\n");
+		goto error;
+	}
+	if (fix_delayed_reply_ctx(r_ctx)<0)
+		goto error;
+	if (!r_ctx->reply_sent){
+		rpc_send(r_ctx);
+	}
+error:
+	clean_context(r_ctx);
+	/* collect possible garbage (e.g. generated by structures) */
+	collect_garbage();
+	/* free added lumps (rpc_send adds a body lump) */
+	del_nonshm_lump( &(r_ctx->msg->add_rm) );
+	del_nonshm_lump( &(r_ctx->msg->body_lumps) );
+	del_nonshm_lump_rpl( &(r_ctx->msg->reply_lump) );
+	/* free the parsed header structures that are not part of the shm clone */
+	for( hdr=r_ctx->msg->headers ; hdr ; hdr=hdr->next ) {
+		if ( hdr->parsed && hdr_allocs_parse(hdr) &&
+		(hdr->parsed<(void*)r_ctx->msg ||
+		hdr->parsed>=(void*)((char*)r_ctx->msg+r_ctx->msg_shm_block_size))) {
+			/* parsed points outside the shm block of the cloned msg (block
+			 * size is in bytes, hence the char* cast) -> free it as pkg */
+			DBG("DBG:free_faked_req: removing hdr->parsed %d\n",
+					hdr->type);
+			clean_hdr_field(hdr);
+			hdr->parsed = 0;
+		}
+	}
+	shm_free(r_ctx->msg);
+	r_ctx->msg=0;
+	dctx->reply_ctx=0;
+	shm_free(dctx);
+}
+
+
 /** Starts parsing XML-RPC document, get the name of the method to be called
  * and position the cursor at the first parameter in the document.
  */
@@ -1832,6 +1965,7 @@ static void close_doc(rpc_ctx_t* ctx)
 static int init_context(rpc_ctx_t* ctx, sip_msg_t* msg)
 {
 	ctx->msg = msg;
+	ctx->msg_shm_block_size=0;
 	ctx->method = 0;
 	ctx->reply_sent = 0;
 	ctx->act_param = 0;
@@ -2066,7 +2200,7 @@ static int dispatch_rpc(sip_msg_t* msg, char* s1, char* s2)
 
  skip:
 	     /* The function may have sent the reply itself */
-	if (!ctx.reply_sent) {
+	if (!ctx.reply_sent && !(ctx.flags&XMLRPC_DELAYED_REPLY_F)) {
 		ret = rpc_send(&ctx);
 	}
 	clean_context(&ctx);
@@ -2212,6 +2346,10 @@ static int mod_init(void)
 	func_param.struct_add = (rpc_struct_add_f)rpc_struct_add;
 	func_param.struct_scan = (rpc_struct_scan_f)rpc_struct_scan;
 	func_param.struct_printf = (rpc_struct_printf_f)rpc_struct_printf;
+	func_param.capabilities = (rpc_capabilities_f)rpc_capabilities;
+	func_param.delayed_ctx_new = (rpc_delayed_ctx_new_f)rpc_delayed_ctx_new;
+	func_param.delayed_ctx_close =
+		(rpc_delayed_ctx_close_f)rpc_delayed_ctx_close;
 	register_select_table(xmlrpc_sel);
 	
 	/* register non-sip hooks */




More information about the sr-dev mailing list