[sr-dev] Using t_suspend()/t_continue() multiple times on the same transaction
Miklos Tirpak
miklos at iptel.org
Wed Mar 28 16:20:27 CEST 2012
On 03/28/2012 03:44 PM, Peter Dunkley wrote:
> Hi,
>
> I am not relaying or replying to messages directly here - except in the
> error case. I am using the t_suspend()/t_continue() along with the
> presence and RLS APIs. So what I have is the following:
>
> #!substdef "!PRESENCE_PROCESS_SLEEP!100000!g"
> modparam("mqueue","mqueue","name=presence")
> modparam("rtimer","timer","name=presenceMaster;interval=PRESENCE_PROCESS_SLEEPu;mode=1;")
> modparam("rtimer","exec","timer=presenceMaster;route=PRESENCE_MASTER_PROCESS")
> modparam("mqueue","mqueue","name=presenceWorker0")
> modparam("rtimer","timer","name=presenceWorker0;interval=1u;mode=1;")
> modparam("rtimer","exec","timer=presenceWorker0;route=PRESENCE_WORKER_PROCESS")
> modparam("mqueue","mqueue","name=presenceWorker1")
> modparam("rtimer","timer","name=presenceWorker1;interval=1u;mode=1;")
> modparam("rtimer","exec","timer=presenceWorker1;route=PRESENCE_WORKER_PROCESS")
> ...
> modparam("mqueue","mqueue","name=presenceWorkern")
> modparam("rtimer","timer","name=presenceWorkern;interval=1u;mode=1;")
> modparam("rtimer","exec","timer=presenceWorkern;route=PRESENCE_WORKER_PROCESS")
> ...
> route {
> ...
> # Some logic to determine this is a presence request (within or without dialog)
> $var(queue) = "presence";
> route(PRESENCE_ENQUEUE);
> ...
> }
> ...
> route[PRESENCE_ENQUEUE] {
>     xlog("L_WARN","$rm: route[PRESENCE_ENQUEUE]\n");
>
>     if (!t_suspend()) {
>         t_reply("500","Server Internal Error");
>         xlog("L_ERR","Failed to suspend transaction for $rm\n");
>         exit;
>     }
>
>     xlog("L_INFO","Suspended transaction for $rm [$T(id_index):$T(id_label)]\n");
>
>     if (!mq_add("$var(queue)","$T(id_index)","$T(id_label)")) {
>         t_reply("500","Server Internal Error");
>         xlog("L_ERR","Failed to queue transaction for $rm [$T(id_index):$T(id_label)] on $var(queue)\n");
>         exit;
>     }
>
>     exit;
> }
>
> route[PRESENCE_MASTER_PROCESS] {
>     xlog("L_INFO","Running PRESENCE_MASTER_PROCESS\n");
>
>     while (mq_fetch("presence")) {
>         $var(id_index) = (int) $mqk(presence);
>         $var(id_label) = (int) $mqv(presence);
>         xlog("L_INFO","presence: Found queued transaction [$var(id_index):$var(id_label)]\n");
>         t_continue("$var(id_index)","$var(id_label)","PRESENCE_DISTRIBUTE");
>     }
> }
>
> route[PRESENCE_DISTRIBUTE] {
>     xlog("L_WARN","$rm: route[PRESENCE_DISTRIBUTE]\n");
>
>     # Some algorithm to distribute traffic across queues...
>     # Perhaps on request type (so we have a NOTIFIER that
>     # sends NOTIFY requests in order), perhaps on some form
>     # of hash... I am experimenting with this...
>     $var(queue) = "presenceWorker" + $var(queue_number);
>     xlog("L_INFO","Adding to queue: $var(queue)\n");
Please try to add a new branch here explicitly with:
append_branch();
Regards,
Miklos
>     route(PRESENCE_ENQUEUE);
> }
>
> route[PRESENCE_WORKER_PROCESS] {
>     lock("pres");
>     $var(pres) = $shv(pres);
>     $shv(pres) = $shv(pres) + 1;
>     unlock("pres");
>
>     $var(queue) = "presenceWorker" + $var(pres);
>     xlog("L_WARN","Starting process: $var(queue) (pid: $pp)\n");
>
>     while (1) {
>         while (mq_fetch($var(queue))) {
>             $var(id_index) = (int) $mqk($var(queue));
>             $var(id_label) = (int) $mqv($var(queue));
>             xlog("L_WARN","$var(queue): found queued transaction [$var(id_index):$var(id_label)]\n");
>             t_continue("$var(id_index)","$var(id_label)","PRESENCE");
>         }
>         usleep(PRESENCE_PROCESS_SLEEP);
>     }
> }
>
> route[PRESENCE] {
>     xlog("L_WARN","$rm: route[PRESENCE]: $var(queue)\n");
>
>     if (is_method("NOTIFY")) {
>         xlog("L_INFO","Sending NOTIFY to RLS\n");
>         rls_handle_notify();
>     } else if (is_method("PUBLISH")) {
>         xlog("L_INFO","Sending PUBLISH to Presence\n");
>         handle_publish();
>     } else if (is_method("SUBSCRIBE")) {
>         xlog("L_INFO","Sending SUBSCRIBE to RLS\n");
>         $var(ret_code) = rls_handle_subscribe();
>         if ($var(ret_code) == 10) {
>             xlog("L_INFO"," SUBSCRIBE not for RLS - sending to Presence\n");
>             handle_subscribe();
>         }
>     } else {
>         xlog("L_ERR","Received non-(NOTIFY|PUBLISH|SUBSCRIBE) request from presence queue\n");
>         t_reply("500","Server Internal Error");
>     }
>     exit;
> }
> ...
>
>
> Previously, I had just a single queue "presence" which all of the
> presence worker processes took requests from. This meant that
> t_suspend()/t_continue() was used just once and this worked (the
> presence/RLS APIs respond to the requests statefully). The reason for
> doing this in the first place is that I was getting problems with
> back-end RLS traffic all using a single TCP connection, which meant all
> the back-end presence requests were being handled by the same Kamailio
> process, which caused a bottleneck (using UDP causes a different set of
> problems under load and isn't really an option). Although the queue is a
> FIFO, the fact that different processes could take different amounts of
> time means that things were happening out of order (Klaus and Anca have
> had a discussion about just this kind of issue with presence on the
> mailing list recently) and this is causing me problems.
>
> What I have now (above) is presence requests being pulled from the TCP
> buffer and suspended as quickly as possible. A presenceMaster process
> then dequeues the request (continues it), performs some analysis to
> determine which worker should deal with it, and then suspends it again,
> queuing it for the right worker. All of this works up until the
> t_continue() for the worker (in the PRESENCE_WORKER_PROCESS) is called.
> At this point the transaction is killed.
>
> What I can't understand is why the first t_suspend()/t_continue() works
> here, but the second fails. My previous version of this (with the single
> queue and single t_suspend()/t_continue() call) worked fine, but it
> seems that the sequence of t_suspend(), t_continue(), t_suspend(),
> t_continue() - with no changes to or handling of the request in-between
> - fails.
>
> Thanks,
>
> Peter
>
>
> On Wed, 2012-03-28 at 15:13 +0200, Miklos Tirpak wrote:
>> Peter,
>>
>> t_suspend() and t_continue() should work multiple times as long as they
>> are executed sequentially after each other, i.e. there cannot be two
>> branches suspended at the same time.
>>
>> The error you get means to me that t_continue() executed the specified
>> route block, but in that route the request was neither replied to nor
>> was a new branch added. Hence, the transaction is hanging in memory and
>> the module sees no pending branch that could return a reply later.
>>
>> Make sure that in the route block executed by t_continue() there is
>> either a t_reply() or you append a new branch and forward it with
>> t_relay() (or append a new branch and call t_suspend() again). I think
>> you also need to handle the failure of t_relay() and explicitly call
>> t_reply() when t_relay() fails in this route.
>>
>> Regards,
>> Miklos
>>
>> On 03/28/2012 02:21 PM, Daniel-Constantin Mierla wrote:
>> > Hello,
>> >
>> > I have used it only once and didn't look much deeper into the code.
>> >
>> > Maybe Miklos (cc-ed) can give more details faster; afaik he is the
>> > developer of that piece.
>> >
>> > Cheers,
>> > Daniel
>> >
>> > On 3/28/12 1:13 PM, Peter Dunkley wrote:
>> >> Hi,
>> >>
>> >> I am trying to use t_suspend()/t_continue() multiple times on the same
>> >> transaction. Calling t_suspend() more than once works, but the second
>> >> time I call t_continue() the transaction is killed and a 500 response
>> >> is sent. It is the "if (branch == t->nr_of_outgoings)" check from the
>> >> code fragment below (from t_suspend.c:t_continue()) that results in
>> >> the transaction being killed - you can see the debug/error line I
>> >> added to determine this in the fragment.
>> >>
>> >> Is using t_suspend()/t_continue() multiple times something that should
>> >> work?
>> >>
>> >> Thanks,
>> >>
>> >> Peter
>> >>
>> >> if (t->uas.status < 200) {
>> >>     /* No final reply has been sent yet.
>> >>      * Check whether or not there is any pending branch.
>> >>      */
>> >>     for (branch = 0;
>> >>          branch < t->nr_of_outgoings;
>> >>          branch++
>> >>     ) {
>> >>         if ((t->uac[branch].request.buffer != NULL)
>> >>             && (t->uac[branch].last_received < 200)
>> >>         )
>> >>             break;
>> >>     }
>> >>
>> >>     if (branch == t->nr_of_outgoings) {
>> >>         /* There is not any open branch so there is
>> >>          * no chance that a final response will be received. */
>> >>         ret = 0;
>> >>         LM_ERR("branch == t->nr_of_outgoings\n");
>> >>         goto kill_trans;
>> >>     }
>> >> }
>> >>
>> >> --
>> >> Peter Dunkley
>> >> Technical Director
>> >> Crocodile RCS Ltd
>> >>
>> >>
>> >>
>> >> _______________________________________________
>> >> sr-dev mailing list
>> >> sr-dev at lists.sip-router.org <mailto:sr-dev at lists.sip-router.org>
>> >> http://lists.sip-router.org/cgi-bin/mailman/listinfo/sr-dev
>> >
>> > --
>> > Daniel-Constantin Mierla
>> > Kamailio Advanced Training, April 23-26, 2012, Berlin, Germany
>> > http://www.asipto.com/index.php/kamailio-advanced-training/
>> >
>
> --
> Peter Dunkley
> Technical Director
> Crocodile RCS Ltd
>
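
For reference, the check quoted from t_suspend.c:t_continue() can be tried
out in isolation. The following is only a rough sketch: the struct
definitions are cut-down, hypothetical stand-ins that keep just the fields
the quoted fragment touches, and MAX_BRANCHES and the function name
passes_pending_branch_check() are made up for the illustration. It is not
the tm module's code.

```c
#include <stddef.h>

/* Hypothetical stand-ins for the tm structures: only the fields
 * used by the quoted fragment are modelled here. */
#define MAX_BRANCHES 4 /* illustrative size, not the tm value */

struct retr_buf { char *buffer; };
struct ua_client { struct retr_buf request; int last_received; };
struct ua_server { int status; };

struct cell {
    struct ua_server uas;
    struct ua_client uac[MAX_BRANCHES];
    int nr_of_outgoings;
};

/* Mirrors the quoted condition.
 * Returns 0 where the fragment would "goto kill_trans",
 * and 1 where the fragment would let the transaction carry on. */
static int passes_pending_branch_check(const struct cell *t)
{
    int branch;

    if (t->uas.status >= 200)
        return 1; /* a final reply was already sent; the block is skipped */

    for (branch = 0; branch < t->nr_of_outgoings; branch++) {
        /* A branch counts as pending only if it has a request
         * buffer AND has not yet received a final reply. */
        if (t->uac[branch].request.buffer != NULL
                && t->uac[branch].last_received < 200)
            break;
    }

    return branch != t->nr_of_outgoings;
}
```

With a zero-initialised struct cell the function returns 0, and it keeps
returning 0 after nr_of_outgoings is raised to 1 as long as that branch has
no request buffer. Only a branch with a non-NULL buffer and last_received
below 200 makes it return 1. So, going by the quoted condition alone, a
branch without a request buffer never counts as pending, whatever else has
been done to the transaction.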