Thanks Daniel,
I'll stick with my complex configuration file for now, but I might take a look at ASYNC sometime later if get a chance.
Thanks again,
Peter
On Tue, 2012-03-20 at 11:16 +0100, Daniel-Constantin Mierla wrote:
Hello,
On 3/16/12 12:33 PM, Peter Dunkley wrote:
Hi,
As part of some work to get the maximum performance out of Kamailio RLS and Presence I would like to be able to hand-off processing of presence requests (NOTIFY, PUBLISH, and SUBSCRIBE) to some worker processes. This way, even if large numbers of presence requests are received on a single TCP connection, all that the Kamailio process managing that connection has to do is to de-packetise the requests and hand them off.
I have been looking at the ASYNC module and it doesn't seem to do quite the right thing for this. The issue is the sleep parameter. I want my worker processes to process presence requests continuously from a queue, not suspend them for a second or more and then process them. Is there a simple change to ASYNC to get it to do this?
I currently do what I need in a quite different way (see config fragment below), but I think using ASYNC would be cleaner if it did what I needed...
... # ----- mqueue params ----- modparam("mqueue", "mqueue", "name=presence") # ----- rtimer params ----- modparam("rtimer", "timer", "name=presence0;interval=1;mode=1;") modparam("rtimer", "exec", "timer=presence0;route=PRESENCE_PROCESS") modparam("rtimer", "timer", "name=presence1;interval=1;mode=1;") modparam("rtimer", "exec", "timer=presence1;route=PRESENCE_PROCESS") ... modparam("rtimer", "timer", "name=presence7;interval=1;mode=1;") modparam("rtimer", "exec", "timer=presence7;route=PRESENCE_PROCESS") ... if (@event!="presence" && @event!="presence.winfo") { $var(ev_type) = @event; xlog("L_INFO", " $rm $var(ev_type) not a presence message for myself\n"); return; } if (!t_suspend()) { t_reply("500", "Server Internal Error"); xlog("L_ERR", "Failed to suspend transaction for $rm\n"); exit; } xlog("L_INFO", "Suspended transaction for $rm [$T(id_index):$T(id_label)]\n"); if (!mq_add("presence", "$T(id_index)", "$T(id_label)")) { t_reply("500", "Server Internal Error"); xlog("L_ERR", "Failed to queue transaction for $rm [$T(id_index):$T(id_label)]\n"); exit; } exit; ... route[PRESENCE_PROCESS] { lock("pres"); $var(pres) = $shv(pres); $shv(pres) = $shv(pres) + 1; unlock("pres"); xlog("L_WARN", "Starting presence de-queue process $var(pres) (pid: $pp)\n"); while (1) { while (mq_fetch("presence")) { $var(id_index) = (int) $mqk(presence); $var(id_label) = (int) $mqv(presence); xlog("L_INFO", "Found queued presence transaction [$var(id_index):$var(id_label)]\n"); t_continue("$var(id_index)", "$var(id_label)", "PRESENCE"); } usleep(100000); } } route[PRESENCE] { xlog("L_INFO", "$rm: route[PRESENCE] process $var(pres)\n"); if (is_method("NOTIFY")) { xlog("L_INFO", "Sending NOTIFY to RLS\n"); rls_handle_notify(); } else if (is_method("PUBLISH")) { xlog("L_INFO", "Sending PUBLISH to Presence\n"); handle_publish(); } else if (is_method("SUBSCRIBE")) { xlog("L_INFO", "Sending SUBSCRIBE to RLS\n"); $var(ret_code) = rls_handle_subscribe(); if ($var(ret_code) == 10) { xlog("L_INFO", " SUBSCRIBE not for RLS - sending to Presence\n"); handle_subscribe(); } } else { xlog("L_ERR", "Received non-(NOTIFY|SUBSCRIBE) request from presence queue\n"); t_reply("500", "Server Internal Error"); } exit; }
just for the records, rtimer can do now micro-second timers -- that should lower waiting time for your example.
It would be good to have also an 'immediate' execution of a route in async module, kind of embedding the above config functionality -- having a list of tasks in shared memory (id_index, id_label) and a pool of workers checking and consuming it -- it will make config simpler for such needs, otherwise, the functionality can be achieved right now pretty much completely just via config.
Cheers, Daniel
-- Daniel-Constantin Mierla Kamailio Advanced Training, April 23-26, 2012, Berlin, Germany http://www.asipto.com/index.php/kamailio-advanced-training/