[sr-dev] Using t_suspend()/t_continue() multiple times on the same transaction

Peter Dunkley peter.dunkley at crocodile-rcs.com
Wed Mar 28 16:52:27 CEST 2012


Hello again,

I took a quick look at the code in dset.c:append_branch().  This appears
to update the global variable nr_branches, but the check in
t_suspend.c:t_continue() is against t->nr_of_outgoings.  There are no
calls into the tm module in the PRESENCE_DISTRIBUTE and PRESENCE_ENQUEUE
routes except for t_suspend(), so I suspect that the call to
append_branch() had no effect because nothing copied the updated
nr_branches value into t->nr_of_outgoings.  Is that the intended behaviour?
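
To make the suspected mismatch concrete, here is a toy model in C.  It is
a sketch under stated assumptions, not the real tm or core code: the
struct layouts, MAX_BRANCHES_MODEL and every *_model name are
hypothetical, and append_branch_model() reproduces only the effect
described above (the global counter grows, the transaction is not
touched).  The loop has the same shape as the check quoted further down
this thread.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_BRANCHES_MODEL 4 /* hypothetical limit for this toy model */

/* Toy stand-in for the global branch counter that append_branch() updates. */
static unsigned int nr_branches = 0;

/* Toy stand-ins for the tm structures; only the fields used by the
 * check quoted in this thread are modelled. */
struct uac_model {
    const char *request_buffer; /* models t->uac[i].request.buffer */
    int last_received;          /* models t->uac[i].last_received */
};

struct cell_model {
    int uas_status;      /* models t->uas.status */
    int nr_of_outgoings; /* models t->nr_of_outgoings */
    struct uac_model uac[MAX_BRANCHES_MODEL];
};

/* Assumption: models only the observed effect of append_branch(),
 * i.e. the global counter grows and no transaction is modified. */
static int append_branch_model(void)
{
    if (nr_branches >= MAX_BRANCHES_MODEL)
        return -1;
    nr_branches++;
    return 1;
}

/* Returns 1 if the transaction would pass the pending-branch check,
 * 0 if the check would jump to kill_trans. */
static int survives_pending_check(const struct cell_model *t)
{
    int branch;

    if (t->uas_status >= 200)
        return 1; /* a final reply was already sent, so the check is skipped */

    for (branch = 0; branch < t->nr_of_outgoings; branch++) {
        if (t->uac[branch].request_buffer != NULL
                && t->uac[branch].last_received < 200)
            break; /* found a branch that may still get a final reply */
    }
    return branch != t->nr_of_outgoings;
}

/* Runs the scenario: append a branch from "script", then run the check
 * against a transaction that has no outgoing branches of its own. */
static int check_after_append(void)
{
    struct cell_model t = { 0 }; /* no final reply, no outgoing branches */

    nr_branches = 0;
    append_branch_model();
    assert(nr_branches == 1); /* the global counter did change */
    return survives_pending_check(&t);
}
```

In this model check_after_append() returns 0: the loop never looks at
nr_branches, so the appended branch is invisible to the check and the
transaction would still be killed.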

Does this make sense?  Is there a simple work-around for this?

Thanks,

Peter

On Wed, 2012-03-28 at 15:30 +0100, Peter Dunkley wrote:

> Hi,
> 
> The append_branch() hasn't helped.
> 
> Having looked a bit more closely, I think what I said at the end of my
> last email was not quite right.
> 
> The t_continue() that kills the transaction is actually the first one.
> However, it only kills the transaction (causing a 500 to be sent)
> after it has been successfully suspended.  What this means is that the
> second t_continue() seems to manage to resume the transaction (despite
> it having previously been killed), but at this point the presence APIs
> go wrong because the transaction cannot be statefully replied to.
> Here is some log content:
> 
>         1: Mar 28 12:02:28 pd-laptop-linux kamailio[16424]: INFO:
>         <script>: presence: Found queued transaction [57508:831770136]
>         2: Mar 28 12:02:28 pd-laptop-linux kamailio[16424]: INFO:
>         <script>: PUBLISH: route[PRESENCE_DISTRIBUTE]
>         3: Mar 28 12:02:28 pd-laptop-linux kamailio[16424]: INFO:
>         <script>: Adding to queue: presenceWorker4
>         4: Mar 28 12:02:28 pd-laptop-linux kamailio[16424]: WARNING:
>         <script>: PUBLISH: route[PRESENCE_ENQUEUE]
>         5: Mar 28 12:02:28 pd-laptop-linux kamailio[16424]: INFO:
>         <script>: Suspended transaction for PUBLISH [57508:831770136]
>         6: Mar 28 12:02:28 pd-laptop-linux kamailio[16424]: ERROR: tm
>         [t_suspend.c:223]: branch == t->nr_of_outgoings
>         7: Mar 28 12:02:28 pd-laptop-linux kamailio[16470]: ERROR: pua
>         [pua_db.c:905]: no rows deleted
>         8: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: WARNING:
>         <script>: presenceWorker4: found queued transaction
>         [57508:831770136]
>         9: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: WARNING:
>         <script>: PUBLISH: route[PRESENCE]: presenceWorker4
>         10: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: ERROR: tm
>         [t_reply.c:591]: ERROR: _reply_light: can't generate 200 reply
>         when a final 500 was sent out
>         11: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: ERROR: sl
>         [sl.c:270]: failed to reply stateful (tm)
>         12: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: ERROR:
>         presence [presentity.c:154]: sending reply
>         13: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: ERROR:
>         presence [presentity.c:400]: sending 200OK
>         14: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: ERROR:
>         presence [publish.c:487]: when updating presentity
>         15: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: ERROR: tm
>         [t_reply.c:591]: ERROR: _reply_light: can't generate 500 reply
>         when a final 500 was sent out
>         16: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: ERROR: sl
>         [sl.c:270]: failed to reply stateful (tm)
>         17: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: ERROR:
>         presence [utils_func.c:146]: sending 500 Server Internal Error
>         reply
>         18: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: ERROR:
>         presence [publish.c:517]: failed to send error reply
>         
> 
> The scenario here is that pua_usrloc sends a PUBLISH to the local
> Kamailio instance.  Line 7 is the pua module handling the 500 response
> sent by t_continue().  Lines 10 through 18 are presence getting the
> PUBLISH, handling it properly (updating the DB and so on), but then
> failing to respond, because a 500 has already been sent for the
> transaction.
> 
> Thanks,
> 
> Peter
> 
> On Wed, 2012-03-28 at 14:44 +0100, Peter Dunkley wrote:
> 
> > Hi,
> > 
> > I am not relaying or replying to messages directly here - except in
> > the error case.  I am using the t_suspend()/t_continue() along with
> > the presence and RLS APIs.  So what I have is the following:
> > 
> > 
> >         #!substdef "!PRESENCE_PROCESS_SLEEP!100000!g"
> >         modparam("mqueue", "mqueue", "name=presence")
> >         modparam("rtimer", "timer", "name=presenceMaster;interval=PRESENCE_PROCESS_SLEEPu;mode=1;")
> >         modparam("rtimer", "exec", "timer=presenceMaster;route=PRESENCE_MASTER_PROCESS")
> >         modparam("mqueue", "mqueue", "name=presenceWorker0")
> >         modparam("rtimer", "timer", "name=presenceWorker0;interval=1u;mode=1;")
> >         modparam("rtimer", "exec", "timer=presenceWorker0;route=PRESENCE_WORKER_PROCESS")
> >         modparam("mqueue", "mqueue", "name=presenceWorker1")
> >         modparam("rtimer", "timer", "name=presenceWorker1;interval=1u;mode=1;")
> >         modparam("rtimer", "exec", "timer=presenceWorker1;route=PRESENCE_WORKER_PROCESS")
> >         ...
> >         modparam("mqueue", "mqueue", "name=presenceWorkern")
> >         modparam("rtimer", "timer", "name=presenceWorkern;interval=1u;mode=1;")
> >         modparam("rtimer", "exec", "timer=presenceWorkern;route=PRESENCE_WORKER_PROCESS")
> >         ...
> >         route {
> >         	...
> >         	# Some logic to determine this is a presence request (within or without dialog)
> >         	$var(queue) = "presence";
> >         	route(PRESENCE_ENQUEUE);
> >         	...
> >         }
> >         ...
> >         route[PRESENCE_ENQUEUE] {
> >                 xlog("L_WARN", "$rm: route[PRESENCE_ENQUEUE]\n");
> >         
> >                 if (!t_suspend()) {
> >                         t_reply("500", "Server Internal Error");
> >                         xlog("L_ERR", "Failed to suspend transaction for $rm\n");
> >                         exit;
> >                 }
> >         
> >                 xlog("L_INFO", "Suspended transaction for $rm [$T(id_index):$T(id_label)]\n");
> >         
> >                 if (!mq_add("$var(queue)", "$T(id_index)", "$T(id_label)")) {
> >                         t_reply("500", "Server Internal Error");
> >                         xlog("L_ERR", "Failed to queue transaction for $rm [$T(id_index):$T(id_label)] on $var(queue)\n");
> >                         exit;
> >                 }
> >         
> >                 exit;
> >         }
> >         
> >         route[PRESENCE_MASTER_PROCESS] {
> >                 xlog("L_INFO", "Running PRESENCE_MASTER_PROCESS\n");
> >         
> >                 while (mq_fetch("presence")) {
> >                         $var(id_index) = (int) $mqk(presence);
> >                         $var(id_label) = (int) $mqv(presence);
> >                         xlog("L_INFO", "presence: Found queued transaction [$var(id_index):$var(id_label)]\n");
> >                         t_continue("$var(id_index)", "$var(id_label)", "PRESENCE_DISTRIBUTE");
> >                 }
> >         }
> >         
> >         route[PRESENCE_DISTRIBUTE] {
> >                 xlog("L_WARN", "$rm: route[PRESENCE_DISTRIBUTE]\n");
> >         
> >                 # Some algorithm to distribute traffic across queues...
> >         	# Perhaps on request type (so we have a NOTIFIER that
> >         	# sends NOTIFY requests in order), perhaps on some form
> >         	# of hash...  I am experimenting with this...
> >                 $var(queue) = "presenceWorker" + $var(queue_number);
> >                 xlog("L_INFO", "Adding to queue: $var(queue)\n");
> >                 route(PRESENCE_ENQUEUE);
> >         }
> >         
> >         route[PRESENCE_WORKER_PROCESS] {
> >                 lock("pres");
> >                 $var(pres) = $shv(pres);
> >                 $shv(pres) = $shv(pres) + 1;
> >                 unlock("pres");
> >         
> >                 $var(queue) = "presenceWorker" + $var(pres);
> >                 xlog("L_WARN", "Starting process: $var(queue) (pid: $pp)\n");
> >         
> >                 while (1) {
> >                         while (mq_fetch($var(queue))) {
> >                                 $var(id_index) = (int) $mqk($var(queue));
> >                                 $var(id_label) = (int) $mqv($var(queue));
> >                                 xlog("L_WARN", "$var(queue): found queued transaction [$var(id_index):$var(id_label)]\n");
> >                                 t_continue("$var(id_index)", "$var(id_label)", "PRESENCE");
> >                         }
> >                         usleep(PRESENCE_PROCESS_SLEEP);
> >                 }
> >         }
> >         
> >         route[PRESENCE] {
> >                 xlog("L_WARN", "$rm: route[PRESENCE]: $var(queue)\n");
> >         
> >                 if (is_method("NOTIFY")) {
> >                         xlog("L_INFO", "Sending NOTIFY to RLS\n");
> >                         rls_handle_notify();
> >                 } else if (is_method("PUBLISH")) {
> >                         xlog("L_INFO", "Sending PUBLISH to Presence\n");
> >                         handle_publish();
> >                 } else if (is_method("SUBSCRIBE")) {
> >                         xlog("L_INFO", "Sending SUBSCRIBE to RLS\n");
> >                         $var(ret_code) = rls_handle_subscribe();
> >                         if ($var(ret_code) == 10) {
> >                                 xlog("L_INFO", "  SUBSCRIBE not for RLS - sending to Presence\n");
> >                                 handle_subscribe();
> >                         }
> >                 } else {
> >                         xlog("L_ERR", "Received non-(NOTIFY|PUBLISH|SUBSCRIBE) request from presence queue\n");
> >                         t_reply("500", "Server Internal Error");
> >                 }
> >         	exit;
> >         }
> >         ...
> >         
> > 
> > Previously, I had just a single queue "presence" which all of the
> > presence worker processes took requests from.  This meant that
> > t_suspend()/t_continue() was used just once and this worked (the
> > presence/RLS APIs respond to the requests statefully).  The reason
> > for doing this in the first place is that I was getting problems
> > with back-end RLS traffic all using a single TCP connection, which
> > meant all the back-end presence requests were being handled by the
> > same Kamailio process, which caused a bottleneck (using UDP causes a
> > different set of problems under load and isn't really an option).
> > Although the queue is a FIFO the fact that different processes could
> > take different amounts of time means that things were happening out
> > of order (Klaus and Anca have had a discussion about just this kind
> > of issue with presence on the mailing list recently) and this is
> > causing me problems.
> > 
> > What I have now (above) is presence requests being pulled from the
> > TCP buffer and suspended as quickly as possible.  A presenceMaster
> > process then dequeues the request (continues it), performs some
> > analysis to determine which worker should deal with it, and then
> > suspends it again queuing it for the right worker.  All of this
> > works up until the t_continue() for the worker (in the
> > PRESENCE_WORKER_PROCESS) is called.  At this point the transaction
> > is killed.
> > 
> > What I can't understand is why the first t_suspend()/t_continue()
> > works here, but the second fails.  My previous version of this (with
> > the single queue and single t_suspend()/t_continue() call) worked
> > fine, but it seems that the sequence of t_suspend(), t_continue(),
> > t_suspend(), t_continue() - with no changes to or handling of the
> > request in-between - fails.
> > 
> > Thanks,
> > 
> > Peter
> > 
> > 
> > On Wed, 2012-03-28 at 15:13 +0200, Miklos Tirpak wrote: 
> > 
> > > Peter,
> > > 
> > > t_suspend() and t_continue() should work multiple times as long as they 
> > > are executed sequentially after each other, i.e. there cannot be two 
> > > branches suspended at the same time.
> > > 
> > > The error you get means to me that t_continue() executed the specified 
> > > > route block, but in that route, the request was neither replied to nor 
> > > > was a new branch added. Hence, the transaction is hanging in memory and 
> > > the module sees no pending branch that could return a reply later.
> > > 
> > > Make sure that in the route block executed by t_continue() there is 
> > > either a t_reply() or you append a new branch and forward it with 
> > > t_relay() (or append a new branch and call t_suspend() again). I think 
> > > you also need to handle the failure of t_relay() and explicitly call 
> > > t_reply() when t_relay() fails in this route.
> > > 
> > > Regards,
> > > Miklos
> > > 
> > > On 03/28/2012 02:21 PM, Daniel-Constantin Mierla wrote:
> > > > Hello,
> > > >
> > > > > I have used it only once and haven't looked much deeper into the code.
> > > > >
> > > > > Maybe Miklos (cc-ed) can give more details faster; afaik he is the
> > > > > developer of that piece.
> > > >
> > > > Cheers,
> > > > Daniel
> > > >
> > > > On 3/28/12 1:13 PM, Peter Dunkley wrote:
> > > >> Hi,
> > > >>
> > > >> I am trying to use t_suspend()/t_continue() multiple times on the same
> > > >> transaction. Calling t_suspend() more than once works, but the second
> > > >> time I call t_continue() the transaction is killed and a 500 response
> > > >> is sent. It is the "if (branch == t->nr_of_outgoings)" check from the
> > > >> code fragment below (from t_suspend.c:t_continue()) that results in
> > > >> the transaction being killed - you can see the debug/error line I
> > > >> added to determine this in the fragment.
> > > >>
> > > >> Is using t_suspend()/t_continue() multiple times something that should
> > > >> work?
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Peter
> > > >>
> > > > >> if (t->uas.status < 200) {
> > > > >>     /* No final reply has been sent yet.
> > > > >>      * Check whether or not there is any pending branch.
> > > > >>      */
> > > > >>     for ( branch = 0;
> > > > >>           branch < t->nr_of_outgoings;
> > > > >>           branch++
> > > > >>     ) {
> > > > >>         if ((t->uac[branch].request.buffer != NULL)
> > > > >>             && (t->uac[branch].last_received < 200)
> > > > >>         )
> > > > >>             break;
> > > > >>     }
> > > > >>
> > > > >>     if (branch == t->nr_of_outgoings) {
> > > > >>         /* There is not any open branch so there is
> > > > >>          * no chance that a final response will be received. */
> > > > >>         ret = 0;
> > > > >>         LM_ERR("branch == t->nr_of_outgoings\n");
> > > > >>         goto kill_trans;
> > > > >>     }
> > > > >> }
> > > >>
> > > >> --
> > > >> Peter Dunkley
> > > >> Technical Director
> > > >> Crocodile RCS Ltd
> > > >>
> > > >>
> > > >>
> > > >> _______________________________________________
> > > >> sr-dev mailing list
> > > >> sr-dev at lists.sip-router.org
> > > >> http://lists.sip-router.org/cgi-bin/mailman/listinfo/sr-dev
> > > >
> > > > --
> > > > Daniel-Constantin Mierla
> > > > Kamailio Advanced Training, April 23-26, 2012, Berlin, Germany
> > > > http://www.asipto.com/index.php/kamailio-advanced-training/
> > > >
> > 

-- 
Peter Dunkley
Technical Director
Crocodile RCS Ltd