[sr-dev] ASYNC module question

Daniel-Constantin Mierla miconda at gmail.com
Tue Mar 20 11:16:49 CET 2012


Hello,

On 3/16/12 12:33 PM, Peter Dunkley wrote:
> Hi,
>
> As part of some work to get the maximum performance out of Kamailio 
> RLS and Presence I would like to be able to hand-off processing of 
> presence requests (NOTIFY, PUBLISH, and SUBSCRIBE) to some worker 
> processes.  This way, even if large numbers of presence requests are 
> received on a single TCP connection, all that the Kamailio process 
> managing that connection has to do is to de-packetise the requests and 
> hand them off.
>
> I have been looking at the ASYNC module and it doesn't seem to do 
> quite the right thing for this.  The issue is the sleep parameter.  I 
> want my worker processes to process presence requests continuously 
> from a queue, not suspend them for a second or more and then process 
> them.  Is there a simple change to ASYNC to get it to do this?
>
> I currently do what I need in a quite different way (see config 
> fragment below), but I think using ASYNC would be cleaner if it did 
> what I needed...
>
>     ...
>     # ----- mqueue params -----
>     modparam("mqueue", "mqueue", "name=presence")
>
>     # ----- rtimer params -----
>     modparam("rtimer", "timer", "name=presence0;interval=1;mode=1;")
>     modparam("rtimer", "exec", "timer=presence0;route=PRESENCE_PROCESS")
>     modparam("rtimer", "timer", "name=presence1;interval=1;mode=1;")
>     modparam("rtimer", "exec", "timer=presence1;route=PRESENCE_PROCESS")
>     ...
>     modparam("rtimer", "timer", "name=presence7;interval=1;mode=1;")
>     modparam("rtimer", "exec", "timer=presence7;route=PRESENCE_PROCESS")
>     ...
>              if (@event != "presence" && @event != "presence.winfo") {
>                      $var(ev_type) = @event;
>                      xlog("L_INFO", "  $rm $var(ev_type) not a presence message for myself\n");
>                      return;
>              }
>
>              if (!t_suspend()) {
>                      t_reply("500", "Server Internal Error");
>                      xlog("L_ERR", "Failed to suspend transaction for $rm\n");
>                      exit;
>              }
>
>              xlog("L_INFO", "Suspended transaction for $rm [$T(id_index):$T(id_label)]\n");
>
>              if (!mq_add("presence", "$T(id_index)", "$T(id_label)")) {
>                      t_reply("500", "Server Internal Error");
>                      xlog("L_ERR", "Failed to queue transaction for $rm [$T(id_index):$T(id_label)]\n");
>                      exit;
>              }
>              exit;
>     ...
>     route[PRESENCE_PROCESS] {
>              lock("pres");
>              $var(pres) = $shv(pres);
>              $shv(pres) = $shv(pres) + 1;
>              unlock("pres");
>
>              xlog("L_WARN", "Starting presence de-queue process $var(pres) (pid: $pp)\n");
>
>              while (1) {
>                      while (mq_fetch("presence")) {
>                              $var(id_index) = (int) $mqk(presence);
>                              $var(id_label) = (int) $mqv(presence);
>                              xlog("L_INFO", "Found queued presence transaction [$var(id_index):$var(id_label)]\n");
>                              t_continue("$var(id_index)", "$var(id_label)", "PRESENCE");
>                      }
>                      usleep(100000);
>              }
>     }
>
>     route[PRESENCE] {
>              xlog("L_INFO", "$rm: route[PRESENCE] process $var(pres)\n");
>
>              if (is_method("NOTIFY")) {
>                      xlog("L_INFO", "Sending NOTIFY to RLS\n");
>                      rls_handle_notify();
>              } else if (is_method("PUBLISH")) {
>                      xlog("L_INFO", "Sending PUBLISH to Presence\n");
>                      handle_publish();
>              } else if (is_method("SUBSCRIBE")) {
>                      xlog("L_INFO", "Sending SUBSCRIBE to RLS\n");
>                      $var(ret_code) = rls_handle_subscribe();
>                      if ($var(ret_code) == 10) {
>                              xlog("L_INFO", "  SUBSCRIBE not for RLS - sending to Presence\n");
>                              handle_subscribe();
>                      }
>              } else {
>                      xlog("L_ERR", "Received non-(NOTIFY|PUBLISH|SUBSCRIBE) request from presence queue\n");
>                      t_reply("500", "Server Internal Error");
>              }
>              exit;
>     }
>
>
Just for the record, rtimer can now do microsecond timers, which
should lower the waiting time in your example.
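
As a rough, untested sketch of that variant: each timer tick could drain
the queue once instead of looping forever. This assumes the 'u' suffix
on the rtimer interval selects microseconds; the route name
PRESENCE_DRAIN and the 10 ms interval are only illustrative.

    # assumption: interval ending in 'u' is in microseconds (10000u = 10 ms)
    modparam("rtimer", "timer", "name=presence0;interval=10000u;mode=1;")
    modparam("rtimer", "exec", "timer=presence0;route=PRESENCE_DRAIN")

    # hypothetical route name, run once per timer tick
    route[PRESENCE_DRAIN] {
            # one pass per tick: resume every transaction queued so far
            while (mq_fetch("presence")) {
                    $var(id_index) = (int) $mqk(presence);
                    $var(id_label) = (int) $mqv(presence);
                    t_continue("$var(id_index)", "$var(id_label)", "PRESENCE");
            }
    }

With this shape the usleep() in the route is no longer needed, since the
timer interval sets the polling delay.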

It would also be good to have an 'immediate' execution of a route in the
async module, in effect embedding the config functionality above: a list
of tasks in shared memory (id_index, id_label) and a pool of workers
checking and consuming it. That would make the config simpler for such
needs. Otherwise, the functionality can already be achieved almost
completely via config alone.

Cheers,
Daniel

-- 
Daniel-Constantin Mierla
Kamailio Advanced Training, April 23-26, 2012, Berlin, Germany
http://www.asipto.com/index.php/kamailio-advanced-training/
