Hi,

I have tried the fix and it worked.  I tried it both with and without the append_branch() call, and both cases worked.  Is this what you would expect?

Thanks,

Peter

On Thu, 2012-03-29 at 11:01 +0200, Miklos Tirpak wrote:
Hi,

I think the problem was in the check made by t_continue() which verifies
whether or not a new branch was added: the check ignored the pending
"blind UAC", i.e. the new branch added by the subsequent t_suspend().
This explains why the first t_continue() killed the transaction, and why
it did so only after the second t_suspend(). Thanks a lot for the logs!

I have committed a fix into master 
(9ae149ba25ee6467da1d95dd435995b9a59166a3), could you please try it?

Miklos

On 03/28/2012 04:52 PM, Peter Dunkley wrote:
> Hello again,
>
> I took a quick look at the code in dset.c:append_branch(). It appears
> to update the global variable nr_branches, but the check in
> t_suspend.c:t_continue() is against t->nr_of_outgoings. There are no
> calls into the tm module in the PRESENCE_DISTRIBUTE and PRESENCE_ENQUEUE
> routes except for t_suspend(), so I suspect that the append_branch()
> call had no effect because the updated nr_branches value was never
> propagated to t->nr_of_outgoings. Is that the expected behaviour?
>
> Does this make sense? Is there a simple work-around for this?
>
> Thanks,
>
> Peter
>
> On Wed, 2012-03-28 at 15:30 +0100, Peter Dunkley wrote:
>> Hi,
>>
>> The append_branch() hasn't helped.
>>
>> Having looked a bit more closely, I think what I said at the end of my
>> last email was not quite right.
>>
>> The t_continue() that kills the transaction is actually the first one.
>> However, it only kills the transaction (causing a 500 to be sent)
>> after the transaction has been successfully suspended for the second
>> time. What this means is that the second t_continue() still manages to
>> resume the transaction (despite it having previously been killed), but
>> at that point the presence APIs go wrong because the transaction can
>> no longer be replied to statefully. Here is some log content:
>>
>>     1: Mar 28 12:02:28 pd-laptop-linux kamailio[16424]: INFO:
>>     <script>: presence: Found queued transaction [57508:831770136]
>>     2: Mar 28 12:02:28 pd-laptop-linux kamailio[16424]: INFO:
>>     <script>: PUBLISH: route[PRESENCE_DISTRIBUTE]
>>     3: Mar 28 12:02:28 pd-laptop-linux kamailio[16424]: INFO:
>>     <script>: Adding to queue: presenceWorker4
>>     4: Mar 28 12:02:28 pd-laptop-linux kamailio[16424]: WARNING:
>>     <script>: PUBLISH: route[PRESENCE_ENQUEUE]
>>     5: Mar 28 12:02:28 pd-laptop-linux kamailio[16424]: INFO:
>>     <script>: Suspended transaction for PUBLISH [57508:831770136]
>>     6: Mar 28 12:02:28 pd-laptop-linux kamailio[16424]: ERROR: tm
>>     [t_suspend.c:223]: branch == t->nr_of_outgoings
>>     7: Mar 28 12:02:28 pd-laptop-linux kamailio[16470]: ERROR: pua
>>     [pua_db.c:905]: no rows deleted
>>     8: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: WARNING:
>>     <script>: presenceWorker4: found queued transaction [57508:831770136]
>>     9: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: WARNING:
>>     <script>: PUBLISH: route[PRESENCE]: presenceWorker4
>>     10: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: ERROR: tm
>>     [t_reply.c:591]: ERROR: _reply_light: can't generate 200 reply
>>     when a final 500 was sent out
>>     11: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: ERROR: sl
>>     [sl.c:270]: failed to reply stateful (tm)
>>     12: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: ERROR:
>>     presence [presentity.c:154]: sending reply
>>     13: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: ERROR:
>>     presence [presentity.c:400]: sending 200OK
>>     14: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: ERROR:
>>     presence [publish.c:487]: when updating presentity
>>     15: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: ERROR: tm
>>     [t_reply.c:591]: ERROR: _reply_light: can't generate 500 reply
>>     when a final 500 was sent out
>>     16: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: ERROR: sl
>>     [sl.c:270]: failed to reply stateful (tm)
>>     17: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: ERROR:
>>     presence [utils_func.c:146]: sending 500 Server Internal Error reply
>>     18: Mar 28 12:02:28 pd-laptop-linux kamailio[16412]: ERROR:
>>     presence [publish.c:517]: failed to send error reply
>>
>> The scenario here is that pua_usrloc sends a PUBLISH to the local
>> Kamailio instance. Line 7 is the pua module handling the 500 response
>> sent by t_continue(). Lines 10 through 18 are presence getting the
>> PUBLISH, handling it properly (updating the DB and so on), but then
>> failing to respond, because a 500 has already been sent for the
>> transaction.
>>
>> Thanks,
>>
>> Peter
>>
>> On Wed, 2012-03-28 at 14:44 +0100, Peter Dunkley wrote:
>>> Hi,
>>>
>>> I am not relaying or replying to messages directly here - except in
>>> the error case. I am using t_suspend()/t_continue() along with
>>> the presence and RLS APIs. So what I have is the following:
>>>
>>>     #!substdef "!PRESENCE_PROCESS_SLEEP!100000!g"
>>>     modparam("mqueue","mqueue","name=presence")
>>>     modparam("rtimer","timer","name=presenceMaster;interval=PRESENCE_PROCESS_SLEEPu;mode=1;")
>>>     modparam("rtimer","exec","timer=presenceMaster;route=PRESENCE_MASTER_PROCESS")
>>>     modparam("mqueue","mqueue","name=presenceWorker0")
>>>     modparam("rtimer","timer","name=presenceWorker0;interval=1u;mode=1;")
>>>     modparam("rtimer","exec","timer=presenceWorker0;route=PRESENCE_WORKER_PROCESS")
>>>     modparam("mqueue","mqueue","name=presenceWorker1")
>>>     modparam("rtimer","timer","name=presenceWorker1;interval=1u;mode=1;")
>>>     modparam("rtimer","exec","timer=presenceWorker1;route=PRESENCE_WORKER_PROCESS")
>>>     ...
>>>     modparam("mqueue","mqueue","name=presenceWorkern")
>>>     modparam("rtimer","timer","name=presenceWorkern;interval=1u;mode=1;")
>>>     modparam("rtimer","exec","timer=presenceWorkern;route=PRESENCE_WORKER_PROCESS")
>>>     ...
>>>     route {
>>>     	...
>>>         # Some logic to determine this is a presence request (in-dialog or out-of-dialog)
>>>         $var(queue) = "presence";
>>>     	route(PRESENCE_ENQUEUE);
>>>     	...
>>>     }
>>>     ...
>>>     route[PRESENCE_ENQUEUE] {
>>>                     xlog("L_WARN","$rm: route[PRESENCE_ENQUEUE]\n");
>>>
>>>                     if (!t_suspend()) {
>>>                                     t_reply("500","Server Internal Error");
>>>                                     xlog("L_ERR","Failed to suspend transaction for $rm\n");
>>>                                     exit;
>>>                     }
>>>
>>>                     xlog("L_INFO","Suspended transaction for $rm [$T(id_index):$T(id_label)]\n");
>>>
>>>                     if (!mq_add("$var(queue)","$T(id_index)","$T(id_label)")) {
>>>                                     t_reply("500","Server Internal Error");
>>>                                     xlog("L_ERR","Failed to queue transaction for $rm [$T(id_index):$T(id_label)] on $var(queue)\n");
>>>                                     exit;
>>>                     }
>>>
>>>                     exit;
>>>     }
>>>
>>>     route[PRESENCE_MASTER_PROCESS] {
>>>                     xlog("L_INFO","Running PRESENCE_MASTER_PROCESS\n");
>>>
>>>                     while (mq_fetch("presence")) {
>>>                                     $var(id_index) = (int) $mqk(presence);
>>>                                     $var(id_label) = (int) $mqv(presence);
>>>                                     xlog("L_INFO","presence: Found queued transaction [$var(id_index):$var(id_label)]\n");
>>>                                     t_continue("$var(id_index)","$var(id_label)","PRESENCE_DISTRIBUTE");
>>>                     }
>>>     }
>>>
>>>     route[PRESENCE_DISTRIBUTE] {
>>>                     xlog("L_WARN","$rm: route[PRESENCE_DISTRIBUTE]\n");
>>>
>>>                     # Some algorithm to distribute traffic across queues...
>>>     	# Perhaps on request type (so we have a NOTIFIER that
>>>     	# sends NOTIFY requests in order), perhaps on some form
>>>     	# of hash...    I am experimenting with this...
>>>                     $var(queue) = "presenceWorker" + $var(queue_number);
>>>                     xlog("L_INFO","Adding to queue: $var(queue)\n");
>>>                     route(PRESENCE_ENQUEUE);
>>>     }
>>>
>>>     route[PRESENCE_WORKER_PROCESS] {
>>>                     lock("pres");
>>>                     $var(pres) = $shv(pres);
>>>                     $shv(pres) = $shv(pres) + 1;
>>>                     unlock("pres");
>>>
>>>                     $var(queue) = "presenceWorker" + $var(pres);
>>>                     xlog("L_WARN","Starting process: $var(queue) (pid: $pp)\n");
>>>
>>>                     while (1) {
>>>                                     while (mq_fetch($var(queue))) {
>>>                                                     $var(id_index) = (int) $mqk($var(queue));
>>>                                                     $var(id_label) = (int) $mqv($var(queue));
>>>                                                     xlog("L_WARN","$var(queue): found queued transaction [$var(id_index):$var(id_label)]\n");
>>>                                                     t_continue("$var(id_index)","$var(id_label)","PRESENCE");
>>>                                     }
>>>                                     usleep(PRESENCE_PROCESS_SLEEP);
>>>                     }
>>>     }
>>>
>>>     route[PRESENCE] {
>>>                     xlog("L_WARN","$rm: route[PRESENCE]: $var(queue)\n");
>>>
>>>                     if (is_method("NOTIFY")) {
>>>                                     xlog("L_INFO","Sending NOTIFY to RLS\n");
>>>                                     rls_handle_notify();
>>>                     } else if (is_method("PUBLISH")) {
>>>                                     xlog("L_INFO","Sending PUBLISH to Presence\n");
>>>                                     handle_publish();
>>>                     } else if (is_method("SUBSCRIBE")) {
>>>                                     xlog("L_INFO","Sending SUBSCRIBE to RLS\n");
>>>                                     $var(ret_code) = rls_handle_subscribe();
>>>                                     if ($var(ret_code) == 10) {
>>>                                                     xlog("L_INFO","    SUBSCRIBE not for RLS - sending to Presence\n");
>>>                                                     handle_subscribe();
>>>                                     }
>>>                     } else {
>>>                                     xlog("L_ERR","Received non-(NOTIFY|PUBLISH|SUBSCRIBE) request from presence queue\n");
>>>                                     t_reply("500","Server Internal Error");
>>>                     }
>>>     	exit;
>>>     }
>>>     ...
>>>
>>>
>>> Previously, I had just a single queue, "presence", which all of the
>>> presence worker processes took requests from. This meant that
>>> t_suspend()/t_continue() was used just once, and this worked (the
>>> presence/RLS APIs responded to the requests statefully). The reason
>>> for doing this in the first place is that I was getting problems with
>>> back-end RLS traffic all using a single TCP connection, which meant
>>> all the back-end presence requests were being handled by the same
>>> Kamailio process and caused a bottleneck (using UDP causes a
>>> different set of problems under load and isn't really an option).
>>> Although the queue is a FIFO, the fact that different processes could
>>> take different amounts of time meant that things were happening out
>>> of order (Klaus and Anca have had a discussion about just this kind
>>> of issue with presence on the mailing list recently), and this is
>>> causing me problems.
>>>
>>> What I have now (above) is presence requests being pulled from the
>>> TCP buffer and suspended as quickly as possible. A presenceMaster
>>> process then dequeues the request (continues it), performs some
>>> analysis to determine which worker should deal with it, and then
>>> suspends it again, queuing it for the right worker. All of this works
>>> up until the t_continue() for the worker (in the
>>> PRESENCE_WORKER_PROCESS) is called. At this point the transaction is
>>> killed.
>>>
>>> What I can't understand is why the first t_suspend()/t_continue()
>>> works here, but the second fails. My previous version of this (with
>>> the single queue and single t_suspend()/t_continue() call) worked
>>> fine, but it seems that the sequence of t_suspend(), t_continue(),
>>> t_suspend(), t_continue() - with no changes to or handling of the
>>> request in-between - fails.
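>>>
>>> In other words, stripped of the queuing and worker logic, the failing
>>> sequence is essentially the following (a condensed sketch of my config
>>> above; the route and queue names here are just placeholders):
>>>
>>>     route[STAGE_ONE] {
>>>             # request route: suspend the transaction and hand its id
>>>             # over to another process via a queue
>>>             t_suspend();
>>>             mq_add("stage_two", "$T(id_index)", "$T(id_label)");
>>>             exit;
>>>     }
>>>
>>>     # a timer process picks it up and runs:
>>>     #     t_continue("$var(id_index)", "$var(id_label)", "STAGE_TWO");
>>>     route[STAGE_TWO] {
>>>             # nothing is done to the request here, it is simply
>>>             # suspended again and queued for the next process
>>>             t_suspend();
>>>             mq_add("stage_three", "$T(id_index)", "$T(id_label)");
>>>             exit;
>>>     }
>>>
>>>     # another timer process runs:
>>>     #     t_continue("$var(id_index)", "$var(id_label)", "STAGE_THREE");
>>>     route[STAGE_THREE] {
>>>             # by the time this runs the transaction has already been
>>>             # killed and a 500 sent, so stateful replies fail here
>>>             t_reply("200", "OK");
>>>     }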
>>>
>>> Thanks,
>>>
>>> Peter
>>>
>>>
>>> On Wed, 2012-03-28 at 15:13 +0200, Miklos Tirpak wrote:
>>>> Peter,
>>>>
>>>> t_suspend() and t_continue() should work multiple times as long as they
>>>> are executed sequentially after each other, i.e. there cannot be two
>>>> branches suspended at the same time.
>>>>
>>>> The error you get means to me that t_continue() executed the specified
>>>> route block, but in that route the request was neither replied to nor
>>>> was a new branch added. Hence, the transaction is hanging in memory and
>>>> the module sees no pending branch that could return a reply later.
>>>>
>>>> Make sure that in the route block executed by t_continue() there is
>>>> either a t_reply() or you append a new branch and forward it with
>>>> t_relay() (or append a new branch and call t_suspend() again). I think
>>>> you also need to handle the failure of t_relay() and explicitly call
>>>> t_reply() when t_relay() fails in this route.
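>>>>
>>>> For example, the route block executed by t_continue() could look
>>>> something like this (just a sketch, the route name is a placeholder):
>>>>
>>>>      route[RESUME_ROUTE] {
>>>>              # executed by t_continue(); before returning it must either
>>>>              # send a final reply or create a new outgoing branch
>>>>              append_branch();
>>>>              if (!t_relay()) {
>>>>                      # t_relay() failed: reply explicitly, otherwise the
>>>>                      # transaction is left without any pending branch
>>>>                      t_reply("500", "Server Internal Error");
>>>>              }
>>>>              exit;
>>>>      }
>>>>
>>>> ...or, instead of t_relay(), append a new branch and call t_suspend()
>>>> again to hand the request over to another process.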
>>>>
>>>> Regards,
>>>> Miklos
>>>>
>>>> On 03/28/2012 02:21 PM, Daniel-Constantin Mierla wrote:
>>>> >  Hello,
>>>> >
>>>> >  I have only used it once and didn't look much deeper into the code.
>>>> >
>>>> >  Maybe Miklos (cc-ed) can give more details faster; afaik he is the
>>>> >  developer of that piece of code.
>>>> >
>>>> >  Cheers,
>>>> >  Daniel
>>>> >
>>>> >  On 3/28/12 1:13 PM, Peter Dunkley wrote:
>>>> >>  Hi,
>>>> >>
>>>> >>  I am trying to use t_suspend()/t_continue() multiple times on the same
>>>> >>  transaction. Calling t_suspend() more than once works, but the second
>>>> >>  time I call t_continue() the transaction is killed and a 500 response
>>>> >>  is sent. It is the "if (branch == t->nr_of_outgoings)" check from the
>>>> >>  code fragment below (from t_suspend.c:t_continue()) that results in
>>>> >>  the transaction being killed - you can see the debug/error line I
>>>> >>  added to determine this in the fragment.
>>>> >>
>>>> >>  Is using t_suspend()/t_continue() multiple times something that should
>>>> >>  work?
>>>> >>
>>>> >>  Thanks,
>>>> >>
>>>> >>  Peter
>>>> >>
>>>> >>  if (t->uas.status < 200) {
>>>> >>          /* No final reply has been sent yet.
>>>> >>           * Check whether or not there is any pending branch.
>>>> >>           */
>>>> >>          for (branch = 0;
>>>> >>                  branch < t->nr_of_outgoings;
>>>> >>                  branch++
>>>> >>          ) {
>>>> >>                  if ((t->uac[branch].request.buffer != NULL)
>>>> >>                          && (t->uac[branch].last_received < 200)
>>>> >>                  )
>>>> >>                          break;
>>>> >>          }
>>>> >>
>>>> >>          if (branch == t->nr_of_outgoings) {
>>>> >>                  /* There is not any open branch so there is
>>>> >>                   * no chance that a final response will be received. */
>>>> >>                  ret = 0;
>>>> >>                  LM_ERR("branch == t->nr_of_outgoings\n");
>>>> >>                  goto kill_trans;
>>>> >>          }
>>>> >>  }
>>>> >>
>>>> >>  --
>>>> >>  Peter Dunkley
>>>> >>  Technical Director
>>>> >>  Crocodile RCS Ltd
>>>> >>
>>>> >>
>>>> >>
>>>> >
>>>> >  --
>>>> >  Daniel-Constantin Mierla
>>>> >  Kamailio Advanced Training, April 23-26, 2012, Berlin, Germany
>>>> >  http://www.asipto.com/index.php/kamailio-advanced-training/
>>>> >
>>>
>>>
>>
> --
> Peter Dunkley
> Technical Director
> Crocodile RCS Ltd
>

-- 
Peter Dunkley
Technical Director
Crocodile RCS Ltd