[sr-dev] Using t_suspend()/t_continue() multiple times on the same transaction

Miklos Tirpak miklos at iptel.org
Wed Mar 28 16:20:27 CEST 2012


On 03/28/2012 03:44 PM, Peter Dunkley wrote:
> Hi,
>
> I am not relaying or replying to messages directly here - except in the
> error case. I am using the t_suspend()/t_continue() along with the
> presence and RLS APIs. So what I have is the following:
>
>     #!substdef"!PRESENCE_PROCESS_SLEEP!100000!g"
>     modparam("mqueue","mqueue","name=presence")
>     modparam("rtimer","timer","name=presenceMaster;interval=PRESENCE_PROCESS_SLEEPu;mode=1;")
>     modparam("rtimer","exec","timer=presenceMaster;route=PRESENCE_MASTER_PROCESS")
>     modparam("mqueue","mqueue","name=presenceWorker0")
>     modparam("rtimer","timer","name=presenceWorker0;interval=1u;mode=1;")
>     modparam("rtimer","exec","timer=presenceWorker0;route=PRESENCE_WORKER_PROCESS")
>     modparam("mqueue","mqueue","name=presenceWorker1")
>     modparam("rtimer","timer","name=presenceWorker1;interval=1u;mode=1;")
>     modparam("rtimer","exec","timer=presenceWorker1;route=PRESENCE_WORKER_PROCESS")
>     ...
>     modparam("mqueue","mqueue","name=presenceWorkern")
>     modparam("rtimer","timer","name=presenceWorkern;interval=1u;mode=1;")
>     modparam("rtimer","exec","timer=presenceWorkern;route=PRESENCE_WORKER_PROCESS")
>     ...
>     route {
>     	...
>     	# Some logic to determine this is a presence request (within or without dialog)
>     	$var(queue) ="presence";
>     	route(PRESENCE_ENQUEUE);
>     	...
>     }
>     ...
>     route[PRESENCE_ENQUEUE] {
>                     xlog("L_WARN","$rm: route[PRESENCE_ENQUEUE]\n");
>
>                     if (!t_suspend()) {
>                                     t_reply("500","Server Internal Error");
>                                     xlog("L_ERR","Failed to suspend transaction for $rm\n");
>                                     exit;
>                     }
>
>                     xlog("L_INFO","Suspended transaction for $rm [$T(id_index):$T(id_label)]\n");
>
>                     if (!mq_add("$var(queue)","$T(id_index)","$T(id_label)")) {
>                                     t_reply("500","Server Internal Error");
>                                     xlog("L_ERR","Failed to queue transaction for $rm [$T(id_index):$T(id_label)] on $var(queue)\n");
>                                     exit;
>                     }
>
>                     exit;
>     }
>
>     route[PRESENCE_MASTER_PROCESS] {
>                     xlog("L_INFO","Running PRESENCE_MASTER_PROCESS\n");
>
>                     while (mq_fetch("presence")) {
>                                     $var(id_index) = (int) $mqk(presence);
>                                     $var(id_label) = (int) $mqv(presence);
>                                     xlog("L_INFO","presence: Found queued transaction [$var(id_index):$var(id_label)]\n");
>                                     t_continue("$var(id_index)","$var(id_label)","PRESENCE_DISTRIBUTE");
>                     }
>     }
>
>     route[PRESENCE_DISTRIBUTE] {
>                     xlog("L_WARN","$rm: route[PRESENCE_DISTRIBUTE]\n");
>
>                     # Some algorithm to distribute traffic across queues...
>     	# Perhaps on request type (so we have a NOTIFIER that
>     	# sends NOTIFY requests in order), perhaps on some form
>     	# of hash...    I am experimenting with this...
>                     $var(queue) ="presenceWorker"  + $var(queue_number);
>                     xlog("L_INFO","Adding to queue: $var(queue)\n");

Please try to add a new branch here explicitly with:

append_branch();

Regards,
Miklos

>                     route(PRESENCE_ENQUEUE);
>     }
>
>     route[PRESENCE_WORKER_PROCESS] {
>                     lock("pres");
>                     $var(pres) = $shv(pres);
>                     $shv(pres) = $shv(pres) + 1;
>                     unlock("pres");
>
>                     $var(queue) ="presenceWorker"  + $var(pres);
>                     xlog("L_WARN","Starting process: $var(queue) (pid: $pp)\n");
>
>                     while (1) {
>                                     while (mq_fetch($var(queue))) {
>                                                     $var(id_index) = (int) $mqk($var(queue));
>                                                     $var(id_label) = (int) $mqv($var(queue));
>                                                     xlog("L_WARN","$var(queue): found queued transaction [$var(id_index):$var(id_label)]\n");
>                                                     t_continue("$var(id_index)","$var(id_label)","PRESENCE");
>                                     }
>                                     usleep(PRESENCE_PROCESS_SLEEP);
>                     }
>     }
>
>     route[PRESENCE] {
>                     xlog("L_WARN","$rm: route[PRESENCE]: $var(queue)\n");
>
>                     if (is_method("NOTIFY")) {
>                                     xlog("L_INFO","Sending NOTIFY to RLS\n");
>                                     rls_handle_notify();
>                     } else if (is_method("PUBLISH")) {
>                                     xlog("L_INFO","Sending PUBLISH to Presence\n");
>                                     handle_publish();
>                     } else if (is_method("SUBSCRIBE")) {
>                                     xlog("L_INFO","Sending SUBSCRIBE to RLS\n");
>                                     $var(ret_code) = rls_handle_subscribe();
>                                     if ($var(ret_code) == 10) {
>                                                     xlog("L_INFO","    SUBSCRIBE not for RLS - sending to Presence\n");
>                                                     handle_subscribe();
>                                     }
>                     } else {
>                                     xlog("L_ERR","Received non-(NOTIFY|PUBLISH|SUBSCRIBE) request from presence queue\n");
>                                     t_reply("500","Server Internal Error");
>                     }
>     	exit;
>     }
>     ...
>
>
> Previously, I had just a single queue "presence" which all of the
> presence worker process took requests from. This meant that
> t_suspend()/t_continue() was used just once and this worked (the
> presence/RLS APIs respond to the requests statefully). The reason for
> doing this in the first place is that I was getting problems with
> back-end RLS traffic all using a single TCP connection, which meant all
> the back-end presence requests were being handled by the same Kamailio
> process, which caused a bottleneck (using UDP causes a different set of
> problems under load and isn't really an option). Although the queue is a
> FIFO the fact that different processes could take different amounts of
> time means that things were happening out of order (Klaus and Anca have
> had a discussion about just this kind of issue with presence on the
> mailing list recently) and this is causing me problems.
>
> What I have now (above) is presence requests being pulled from the TCP
> buffer and suspended as quickly as possible. A presenceMaster process
> then dequeues the request (continues it), performs some analysis to
> determine which worker should deal with it, and then suspends it again
> queuing it for the right worker. All of this works up until the
> t_continue() for the worker (in the PRESENCE_WORKER_PROCESS) is called.
> At this point the transaction is killed.
>
> What I can't understand is why the first t_suspend()/t_continue() works
> here, but the second fails. My previous version of this (with the single
> queue and single t_suspend()/t_continue() call) worked fine, but it
> seems that the sequence of t_suspend(), t_continue(), t_suspend(),
> t_continue() - with no changes to or handling of the request in-between
> - fails.
>
> Thanks,
>
> Peter
>
>
> On Wed, 2012-03-28 at 15:13 +0200, Miklos Tirpak wrote:
>> Peter,
>>
>> t_suspend() and t_continue() should work multiple times as long as they
>> are executed sequentially after each other, i.e. there cannot be two
>> branches suspended at the same time.
>>
>> The error you get means to me that t_continue() executed the specified
>> route block, but in that route, the request was neither replied nor a
>> new branch was added. Hence, the transaction is hanging in memory and
>> the module sees no pending branch that could return a reply later.
>>
>> Make sure that in the route block executed by t_continue() there is
>> either a t_reply() or you append a new branch and forward it with
>> t_relay() (or append a new branch and call t_suspend() again). I think
>> you also need to handle the failure of t_relay() and explicitly call
>> t_reply() when t_relay() fails in this route.
>>
>> Regards,
>> Miklos
>>
>> On 03/28/2012 02:21 PM, Daniel-Constantin Mierla wrote:
>> >  Hello,
>> >
>> >  I have been using it only once and didn't look much deeper into the code.
>> >
>> >  Maybe Miklos (cc-ed) can give more details faster; afaik he is the
>> >  developer of that piece.
>> >
>> >  Cheers,
>> >  Daniel
>> >
>> >  On 3/28/12 1:13 PM, Peter Dunkley wrote:
>> >>  Hi,
>> >>
>> >>  I am trying to use t_suspend()/t_continue() multiple times on the same
>> >>  transaction. Calling t_suspend() more than once works, but the second
>> >>  time I call t_continue() the transaction is killed and a 500 response
>> >>  is sent. It is the "if (branch == t->nr_of_outgoings)" check from the
>> >>  code fragment below (from t_suspend.c:t_continue()) that results in
>> >>  the transaction being killed - you can see the debug/error line I
>> >>  added to determine this in the fragment.
>> >>
>> >>  Is using t_suspend()/t_continue() multiple times something that should
>> >>  work?
>> >>
>> >>  Thanks,
>> >>
>> >>  Peter
>> >>
>> >>  if (t->uas.status<  200) {
>> >>  /* No final reply has been sent yet.
>> >>  * Check whether or not there is any pending branch.
>> >>  */
>> >>  for ( branch = 0;
>> >>  branch<  t->nr_of_outgoings;
>> >>  branch++
>> >>  ) {
>> >>  if ((t->uac[branch].request.buffer != NULL)
>> >>  &&  (t->uac[branch].last_received<  200)
>> >>  )
>> >>  break;
>> >>  }
>> >>
>> >>  if (branch == t->nr_of_outgoings) {
>> >>  /* There is not any open branch so there is
>> >>  * no chance that a final response will be received. */
>> >>  ret = 0;
>> >>  LM_ERR("branch == t->nr_of_outgoings\n");
>> >>  goto kill_trans;
>> >>  }
>> >>  }
>> >>
>> >>  --
>> >>  Peter Dunkley
>> >>  Technical Director
>> >>  Crocodile RCS Ltd
>> >>
>> >>
>> >>
>> >>  _______________________________________________
>> >>  sr-dev mailing list
>> >>  sr-dev at lists.sip-router.org  <mailto:sr-dev at lists.sip-router.org>
>> >>  http://lists.sip-router.org/cgi-bin/mailman/listinfo/sr-dev
>> >
>> >  --
>> >  Daniel-Constantin Mierla
>> >  Kamailio Advanced Training, April 23-26, 2012, Berlin, Germany
>> >  http://www.asipto.com/index.php/kamailio-advanced-training/
>> >
>
> --
> Peter Dunkley
> Technical Director
> Crocodile RCS Ltd
>



More information about the sr-dev mailing list