I have made progress with the pua_rpc module. The remaining problem is that rpc->fault does not produce a proper fault document when its argument is a delayed ctx. In the publish() function below, rpc->fault calls work fine up to the point where the delayed ctx is created:
dctx = rpc->delayed_ctx_new(c);
After that, for example, this piece of code
if (ret == 418) { /* rpc->add(c, "ds", 500, "Wrong ETag"); */ rpc->fault(c, 500, "Wrong ETag"); rpc->delayed_ctx_close(dctx); }
produces
HTTP/1.1 200 OK. Via: SIP/2.0/TCP 127.0.0.1:52280. Server: OpenXg SIP Proxy (4.3.0-0 (i386/linux)). Content-Length: 108. . <?xml version="1.0"?> <methodResponse> <params> <param> <value></value> </param> </params>
That is, the failure response document is empty. If I make the rpc->add call instead, I get
HTTP/1.1 200 OK. Via: SIP/2.0/TCP 127.0.0.1:52204. Server: OpenXg SIP Proxy (4.3.0-0 (i386/linux)). Content-Length: 151. . <?xml version="1.0"?> <methodResponse> <params> <param> <value><int>500</int> <string>Wrong ETag</string> </value> </param> </params> </methodResponse>
Any ideas how to make rpc->fault work on a delayed ctx?
-- juha
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <time.h>
#include "../../sr_module.h" #include "../../parser/parse_expires.h" #include "../../dprint.h" #include "../../mem/shm_mem.h" #include "../../parser/msg_parser.h" #include "../../str.h" #include "../../mem/mem.h" #include "../../pt.h" #include "../../rpc_lookup.h" #include "../../modules/tm/tm_load.h" #include "../../lib/kcore/cmpapi.h" #include "../pua/pua_bind.h"
/* Module version marker; the macro is defined outside this file
 * (presumably in sr_module.h -- TODO confirm). */
MODULE_VERSION
/* Forward declaration of the module init function defined further below;
 * needed because the exports structure references it before its definition. */
static int mod_init(void);
/* Pointer to the PUA send_publish function; assigned in mod_init() from the
 * bound PUA API and called by publish(). */
send_publish_t pua_send_publish;
/*
 * Help text for the publish RPC command (referenced from the pua_rpc[]
 * export table). The array is terminated by a 0 entry.
 */
static const char* publish_doc[2] = {
	"sends publish request and waits for the final reply, using a list "
	"of string parameters: presentity uri, expires, event package, "
	"content type, id, etag, outbound proxy, extra headers, "
	" and body (optional)",
	0
};
/**
 * PUA callback registered in mod_init() for MI_ASYN_PUBLISH publishes.
 *
 * Takes the delayed RPC context stored in hentity->cb_param, adds the reply
 * status code and reason phrase to the RPC reply and, for a 200 reply, also
 * the SIP-ETag value and the Expires value. The delayed context is closed
 * before returning on every path that got hold of it.
 *
 * @param hentity  presentity record; its cb_param must hold the
 *                 rpc_delayed_ctx_t created by publish(). cb_param is reset
 *                 to NULL once the context has been taken over.
 * @param reply    final reply, or FAKED_REPLY when no real reply exists
 *                 (reported as 408 Request Timeout).
 * @return 0 on success, -1 on NULL arguments or when a 200 reply lacks a
 *         parsed Expires header or a SIP-ETag header.
 */
static int publish_callback(ua_pres_t* hentity, struct sip_msg* reply)
{
	rpc_delayed_ctx_t* dctx;
	void* c;
	rpc_t* rpc;
	struct hdr_field* hdr = NULL;
	int statuscode;
	int expires;
	str etag;
	str reason = {0, 0};

	LM_DBG("running callback\n");

	if (reply == NULL || hentity == NULL || hentity->cb_param == NULL) {
		LM_ERR("NULL reply or hentity parameter\n");
		return -1;
	}

	/* cb_param is known to be non-NULL here, so no further check of dctx
	 * is needed; clear it so the same context is not picked up twice */
	dctx = (rpc_delayed_ctx_t *)(hentity->cb_param);
	hentity->cb_param = NULL;
	rpc = &dctx->rpc;
	c = dctx->reply_ctx;

	if (reply == FAKED_REPLY) {
		/* FAKED_REPLY is a marker value, not a message: do not dereference */
		statuscode = 408;
		reason.s = "Request Timeout";
		reason.len = strlen(reason.s);
	} else {
		statuscode = reply->first_line.u.reply.statuscode;
		reason = reply->first_line.u.reply.reason;
	}
	rpc->add(c, "dS", statuscode, &reason);

	if (statuscode == 200) {
		/* statuscode can only be 200 for a real reply, so reply is a
		 * valid message here; still make sure Expires is present and
		 * parsed before dereferencing it */
		if (reply->expires == NULL || reply->expires->parsed == NULL) {
			LM_ERR("Expires header field not found or not parsed\n");
			rpc->delayed_ctx_close(dctx);
			return -1;
		}
		expires = ((exp_body_t*)reply->expires->parsed)->val;
		LM_DBG("expires = %d\n", expires);

		/* look for the SIP-ETag header; hdr is NULL if none matches */
		for (hdr = reply->headers; hdr != NULL; hdr = hdr->next) {
			if (cmp_hdrname_strzn(&hdr->name, "SIP-ETag", 8) == 0) {
				break;
			}
		}
		if (hdr == NULL) {
			LM_ERR("SIP-ETag header field not found\n");
			rpc->delayed_ctx_close(dctx);
			return -1;
		}
		etag = hdr->body;
		LM_DBG("SIP-Etag = %.*s\n", etag.len, etag.s);
		rpc->add(c, "Sd", &etag, expires);
	}

	rpc->delayed_ctx_close(dctx);

	return 0;
}
/**
 * RPC command handler: sends a PUBLISH request through the PUA module and
 * leaves the RPC reply to be completed later via a delayed context.
 *
 * Parameters are read from the RPC request as strings, in this order:
 * presentity uri, expires, event package, content type, id, etag,
 * outbound proxy, extra headers and an optional body. For id, etag,
 * outbound proxy and extra headers, the single character "." means
 * "not given". A missing body is only accepted when content type is ".".
 *
 * Faults are reported with rpc->fault(): 600 when the transport has no
 * delayed-reply support, 400 for bad parameters, 500 or the code derived by
 * err2reason_phrase() when sending fails.
 *
 * @param rpc  RPC function table of the transport handling the request.
 * @param c    RPC context of the request.
 */
static void publish(rpc_t* rpc, void* c)
{
	str pres_uri, expires, event, content_type, id, etag, outbound_proxy,
		extra_headers, body;
	str exp_digits;
	rpc_delayed_ctx_t* dctx;
	int exp, sign, ret, err_ret, sip_error;
	unsigned int exp_abs;
	char err_buf[MAX_REASON_LEN];
	struct sip_uri uri;
	publ_info_t publ;

	body.s = 0;
	body.len = 0;
	dctx = 0;

	LM_DBG("rpc publishing ...\n");

	if ((rpc->capabilities == 0) ||
			!(rpc->capabilities(c) & RPC_DELAYED_REPLY)) {
		rpc->fault(c, 600, "Reply wait/async mode not supported"
				" by this rpc transport");
		return;
	}

	/* eight mandatory strings; everything after '*' (the body) is optional */
	ret = rpc->scan(c, "SSSSSSSS*S", &pres_uri, &expires, &event,
			&content_type, &id, &etag, &outbound_proxy,
			&extra_headers, &body);
	if (ret < 8) {
		rpc->fault(c, 400, "too few parameters (%d)", ret);
		return;
	}

	if (parse_uri(pres_uri.s, pres_uri.len, &uri) < 0) {
		LM_ERR("bad resentity uri\n");
		/* str values carry an explicit length; do not rely on a
		 * terminating NUL when formatting them */
		rpc->fault(c, 400, "Invalid presentity uri '%.*s'",
				pres_uri.len, pres_uri.s);
		return;
	}
	LM_DBG("presentity uri '%.*s'\n", pres_uri.len, pres_uri.s);

	/* parse an optionally negative expires value; work on a copy so the
	 * fault message can show the parameter exactly as it was received */
	exp_digits = expires;
	sign = 1;
	if (exp_digits.len > 0 && exp_digits.s[0] == '-') {
		sign = -1;
		exp_digits.s++;
		exp_digits.len--;
	}
	if (str2int(&exp_digits, &exp_abs) < 0) {
		LM_ERR("invalid expires parameter\n" );
		rpc->fault(c, 400, "Invalid expires '%.*s'",
				expires.len, expires.s);
		return;
	}
	exp = (int)exp_abs * sign;
	LM_DBG("expires '%d'\n", exp);

	LM_DBG("event '%.*s'\n", event.len, event.s);

	LM_DBG("content type '%.*s'\n", content_type.len, content_type.s);

	LM_DBG("id '%.*s'\n", id.len, id.s);

	LM_DBG("ETag '%.*s'\n", etag.len, etag.s);

	LM_DBG("outbound_proxy '%.*s'\n", outbound_proxy.len, outbound_proxy.s);

	LM_DBG("extra headers '%.*s'\n", extra_headers.len, extra_headers.s);

	if (body.len > 0) {
		LM_DBG("body '%.*s'\n", body.len, body.s);
	}

	if ((body.s == 0) &&
			(content_type.len != 1 || content_type.s[0] != '.')) {
		LM_ERR("body is missing, but content type is not .\n");
		rpc->fault(c, 400, "Body is missing");
		return;
	}

	memset(&publ, 0, sizeof(publ_info_t));
	publ.pres_uri = &pres_uri;

	publ.expires = exp;

	publ.event = get_event_flag(&event);
	if (publ.event < 0) {
		LM_ERR("unknown event '%.*s'\n", event.len, event.s);
		rpc->fault(c, 400, "Unknown event");
		return;
	}

	if (content_type.len != 1) {
		publ.content_type = content_type;
	}

	/* "." stands for "parameter not given" */
	if (!((id.len == 1) && (id.s[0] == '.'))) {
		publ.id = id;
	}

	if (!((etag.len == 1) && (etag.s[0] == '.'))) {
		publ.etag = &etag;
	}

	if (!((outbound_proxy.len == 1) && (outbound_proxy.s[0] == '.'))) {
		publ.outbound_proxy = &outbound_proxy;
	}

	if (!((extra_headers.len == 1) && (extra_headers.s[0] == '.'))) {
		publ.extra_headers = &extra_headers;
	}

	if (body.s != 0) {
		publ.body = &body;
	}

	dctx = rpc->delayed_ctx_new(c);
	if (dctx == 0) {
		LM_ERR("internal error: failed to create context\n");
		rpc->fault(c, 500, "internal error: failed to create context");
		return;
	}
	/* handed to publish_callback() through the presentity's cb_param */
	publ.cb_param = dctx;
	publ.source_flag = MI_ASYN_PUBLISH;

	ret = pua_send_publish(&publ);
	LM_DBG("pua_send_publish returned %d\n", ret);

	/* NOTE(review): this reads dctx after pua_send_publish(), during which
	 * the callback may already have closed it -- confirm that the RPC
	 * transport keeps dctx readable (with reply_ctx == 0) after close. */
	if (dctx->reply_ctx != 0) {
		/* callback was not executed or its execution failed */
		rpc = &dctx->rpc;
		c = dctx->reply_ctx;
	} else {
		return;
	}

	if (ret < 0) {
		LM_ERR("pua_send_publish failed\n");
		err_ret = err2reason_phrase(ret, &sip_error, err_buf,
				sizeof(err_buf), "RPC/PUBLISH") ;
		if (err_ret > 0 ) {
			rpc->fault(c, sip_error, "%s", err_buf);
		} else {
			rpc->fault(c, 500, "RPC/PUBLISH error");
		}
		rpc->delayed_ctx_close(dctx);
	}

	if (ret == 418) {
		/* rpc->add(c, "ds", 500, "Wrong ETag"); */
		rpc->fault(c, 500, "Wrong ETag");
		rpc->delayed_ctx_close(dctx);
	}

	return;
}
/*
 * RPC commands exported by this module: "pua.publish" is handled by
 * publish() and documented by publish_doc. The table ends with an
 * all-zero entry.
 */
rpc_export_t pua_rpc[] = {
	{"pua.publish", publish, publish_doc, 0},
	{0, 0, 0, 0}
};
/** module exports */ struct module_exports exports= { "pua_rpc", 0, pua_rpc, 0, mod_init, 0, 0, 0, 0 }; /** * init module function */ static int mod_init(void) { bind_pua_t bind_pua; pua_api_t pua;
LM_DBG("initializing\n"); bind_pua= (bind_pua_t)find_export("bind_pua", 1,0);
if (!bind_pua) { LM_ERR("can't find pua\n"); return -1; } if (bind_pua(&pua) < 0) { LM_ERR("can't bind pua\n"); return -1; }
if (pua.send_publish == NULL) { LM_ERR("could not import send_publish\n"); return -1; } pua_send_publish = pua.send_publish;
if (pua.register_puacb(MI_ASYN_PUBLISH, publish_callback, NULL) < 0) { LM_ERR("could not register callback\n"); return -1; }
return 0; }