Hi,
As part of some work to get the maximum performance out of Kamailio RLS and Presence I would like to be able to hand-off processing of presence requests (NOTIFY, PUBLISH, and SUBSCRIBE) to some worker processes. This way, even if large numbers of presence requests are received on a single TCP connection, all that the Kamailio process managing that connection has to do is to de-packetise the requests and hand them off.
I have been looking at the ASYNC module and it doesn't seem to do quite the right thing for this. The issue is the sleep parameter. I want my worker processes to process presence requests continuously from a queue, not suspend them for a second or more and then process them. Is there a simple change to ASYNC to get it to do this?
I currently do what I need in a quite different way (see config fragment below), but I think using ASYNC would be cleaner if it did what I needed...
... # ----- mqueue params ----- modparam("mqueue", "mqueue", "name=presence")
# ----- rtimer params ----- modparam("rtimer", "timer", "name=presence0;interval=1;mode=1;") modparam("rtimer", "exec", "timer=presence0;route=PRESENCE_PROCESS") modparam("rtimer", "timer", "name=presence1;interval=1;mode=1;") modparam("rtimer", "exec", "timer=presence1;route=PRESENCE_PROCESS") ... modparam("rtimer", "timer", "name=presence7;interval=1;mode=1;") modparam("rtimer", "exec", "timer=presence7;route=PRESENCE_PROCESS") ... if (@event!="presence" && @event!="presence.winfo") { $var(ev_type) = @event; xlog("L_INFO", " $rm $var(ev_type) not a presence message for myself\n"); return; }
if (!t_suspend()) { t_reply("500", "Server Internal Error"); xlog("L_ERR", "Failed to suspend transaction for $rm\n"); exit; }
xlog("L_INFO", "Suspended transaction for $rm [$T(id_index):$T(id_label)]\n");
if (!mq_add("presence", "$T(id_index)", "$T(id_label)")) { t_reply("500", "Server Internal Error"); xlog("L_ERR", "Failed to queue transaction for $rm [$T(id_index):$T(id_label)]\n"); exit; } exit; ... route[PRESENCE_PROCESS] { lock("pres"); $var(pres) = $shv(pres); $shv(pres) = $shv(pres) + 1; unlock("pres");
xlog("L_WARN", "Starting presence de-queue process $var(pres) (pid: $pp)\n");
while (1) { while (mq_fetch("presence")) { $var(id_index) = (int) $mqk(presence); $var(id_label) = (int) $mqv(presence); xlog("L_INFO", "Found queued presence transaction [$var(id_index):$var(id_label)]\n"); t_continue("$var(id_index)", "$var(id_label)", "PRESENCE"); } usleep(100000); } }
route[PRESENCE] { xlog("L_INFO", "$rm: route[PRESENCE] process $var(pres)\n");
if (is_method("NOTIFY")) { xlog("L_INFO", "Sending NOTIFY to RLS\n"); rls_handle_notify(); } else if (is_method("PUBLISH")) { xlog("L_INFO", "Sending PUBLISH to Presence\n"); handle_publish(); } else if (is_method("SUBSCRIBE")) { xlog("L_INFO", "Sending SUBSCRIBE to RLS\n"); $var(ret_code) = rls_handle_subscribe(); if ($var(ret_code) == 10) { xlog("L_INFO", " SUBSCRIBE not for RLS - sending to Presence\n"); handle_subscribe(); } } else { xlog("L_ERR", "Received non-(NOTIFY|SUBSCRIBE) request from presence queue\n"); t_reply("500", "Server Internal Error"); } exit; }
Thanks,
Peter
Hello,
On 3/16/12 12:33 PM, Peter Dunkley wrote:
Hi,
As part of some work to get the maximum performance out of Kamailio RLS and Presence I would like to be able to hand-off processing of presence requests (NOTIFY, PUBLISH, and SUBSCRIBE) to some worker processes. This way, even if large numbers of presence requests are received on a single TCP connection, all that the Kamailio process managing that connection has to do is to de-packetise the requests and hand them off.
I have been looking at the ASYNC module and it doesn't seem to do quite the right thing for this. The issue is the sleep parameter. I want my worker processes to process presence requests continuously from a queue, not suspend them for a second or more and then process them. Is there a simple change to ASYNC to get it to do this?
I currently do what I need in a quite different way (see config fragment below), but I think using ASYNC would be cleaner if it did what I needed...
... # ----- mqueue params ----- modparam("mqueue", "mqueue", "name=presence") # ----- rtimer params ----- modparam("rtimer", "timer", "name=presence0;interval=1;mode=1;") modparam("rtimer", "exec", "timer=presence0;route=PRESENCE_PROCESS") modparam("rtimer", "timer", "name=presence1;interval=1;mode=1;") modparam("rtimer", "exec", "timer=presence1;route=PRESENCE_PROCESS") ... modparam("rtimer", "timer", "name=presence7;interval=1;mode=1;") modparam("rtimer", "exec", "timer=presence7;route=PRESENCE_PROCESS") ... if (@event!="presence"&& @event!="presence.winfo") { $var(ev_type) = @event; xlog("L_INFO", " $rm $var(ev_type) not a presence message for myself\n"); return; } if (!t_suspend()) { t_reply("500", "Server Internal Error"); xlog("L_ERR", "Failed to suspend transaction for $rm\n"); exit; } xlog("L_INFO", "Suspended transaction for $rm [$T(id_index):$T(id_label)]\n"); if (!mq_add("presence", "$T(id_index)", "$T(id_label)")) { t_reply("500", "Server Internal Error"); xlog("L_ERR", "Failed to queue transaction for $rm [$T(id_index):$T(id_label)]\n"); exit; } exit; ... route[PRESENCE_PROCESS] { lock("pres"); $var(pres) = $shv(pres); $shv(pres) = $shv(pres) + 1; unlock("pres"); xlog("L_WARN", "Starting presence de-queue process $var(pres) (pid: $pp)\n"); while (1) { while (mq_fetch("presence")) { $var(id_index) = (int) $mqk(presence); $var(id_label) = (int) $mqv(presence); xlog("L_INFO", "Found queued presence transaction [$var(id_index):$var(id_label)]\n"); t_continue("$var(id_index)", "$var(id_label)", "PRESENCE"); } usleep(100000); } } route[PRESENCE] { xlog("L_INFO", "$rm: route[PRESENCE] process $var(pres)\n"); if (is_method("NOTIFY")) { xlog("L_INFO", "Sending NOTIFY to RLS\n"); rls_handle_notify(); } else if (is_method("PUBLISH")) { xlog("L_INFO", "Sending PUBLISH to Presence\n"); handle_publish(); } else if (is_method("SUBSCRIBE")) { xlog("L_INFO", "Sending SUBSCRIBE to RLS\n"); $var(ret_code) = rls_handle_subscribe(); if ($var(ret_code) == 10) { xlog("L_INFO", " SUBSCRIBE not for RLS - sending to Presence\n"); handle_subscribe(); } } else { xlog("L_ERR", "Received non-(NOTIFY|SUBSCRIBE) request from presence queue\n"); t_reply("500", "Server Internal Error"); } exit; }
just for the records, rtimer can do now micro-second timers -- that should lower waiting time for your example.
It would be good to have also an 'immediate' execution of a route in async module, kind of embedding the above config functionality -- having a list of tasks in shared memory (id_index, id_label) and a pool of workers checking and consuming it -- it will make config simpler for such needs, otherwise, the functionality can be achieved right now pretty much completely just via config.
Cheers, Daniel
Thanks Daniel,
I'll stick with my complex configuration file for now, but I might take a look at ASYNC sometime later if get a chance.
Thanks again,
Peter
On Tue, 2012-03-20 at 11:16 +0100, Daniel-Constantin Mierla wrote:
Hello,
On 3/16/12 12:33 PM, Peter Dunkley wrote:
Hi,
As part of some work to get the maximum performance out of Kamailio RLS and Presence I would like to be able to hand-off processing of presence requests (NOTIFY, PUBLISH, and SUBSCRIBE) to some worker processes. This way, even if large numbers of presence requests are received on a single TCP connection, all that the Kamailio process managing that connection has to do is to de-packetise the requests and hand them off.
I have been looking at the ASYNC module and it doesn't seem to do quite the right thing for this. The issue is the sleep parameter. I want my worker processes to process presence requests continuously from a queue, not suspend them for a second or more and then process them. Is there a simple change to ASYNC to get it to do this?
I currently do what I need in a quite different way (see config fragment below), but I think using ASYNC would be cleaner if it did what I needed...
... # ----- mqueue params ----- modparam("mqueue", "mqueue", "name=presence") # ----- rtimer params ----- modparam("rtimer", "timer", "name=presence0;interval=1;mode=1;") modparam("rtimer", "exec", "timer=presence0;route=PRESENCE_PROCESS") modparam("rtimer", "timer", "name=presence1;interval=1;mode=1;") modparam("rtimer", "exec", "timer=presence1;route=PRESENCE_PROCESS") ... modparam("rtimer", "timer", "name=presence7;interval=1;mode=1;") modparam("rtimer", "exec", "timer=presence7;route=PRESENCE_PROCESS") ... if (@event!="presence" && @event!="presence.winfo") { $var(ev_type) = @event; xlog("L_INFO", " $rm $var(ev_type) not a presence message for myself\n"); return; } if (!t_suspend()) { t_reply("500", "Server Internal Error"); xlog("L_ERR", "Failed to suspend transaction for $rm\n"); exit; } xlog("L_INFO", "Suspended transaction for $rm [$T(id_index):$T(id_label)]\n"); if (!mq_add("presence", "$T(id_index)", "$T(id_label)")) { t_reply("500", "Server Internal Error"); xlog("L_ERR", "Failed to queue transaction for $rm [$T(id_index):$T(id_label)]\n"); exit; } exit; ... route[PRESENCE_PROCESS] { lock("pres"); $var(pres) = $shv(pres); $shv(pres) = $shv(pres) + 1; unlock("pres"); xlog("L_WARN", "Starting presence de-queue process $var(pres) (pid: $pp)\n"); while (1) { while (mq_fetch("presence")) { $var(id_index) = (int) $mqk(presence); $var(id_label) = (int) $mqv(presence); xlog("L_INFO", "Found queued presence transaction [$var(id_index):$var(id_label)]\n"); t_continue("$var(id_index)", "$var(id_label)", "PRESENCE"); } usleep(100000); } } route[PRESENCE] { xlog("L_INFO", "$rm: route[PRESENCE] process $var(pres)\n"); if (is_method("NOTIFY")) { xlog("L_INFO", "Sending NOTIFY to RLS\n"); rls_handle_notify(); } else if (is_method("PUBLISH")) { xlog("L_INFO", "Sending PUBLISH to Presence\n"); handle_publish(); } else if (is_method("SUBSCRIBE")) { xlog("L_INFO", "Sending SUBSCRIBE to RLS\n"); $var(ret_code) = rls_handle_subscribe(); if ($var(ret_code) == 10) { xlog("L_INFO", " SUBSCRIBE not for RLS - sending to Presence\n"); handle_subscribe(); } } else { xlog("L_ERR", "Received non-(NOTIFY|SUBSCRIBE) request from presence queue\n"); t_reply("500", "Server Internal Error"); } exit; }
just for the records, rtimer can do now micro-second timers -- that should lower waiting time for your example.
It would be good to have also an 'immediate' execution of a route in async module, kind of embedding the above config functionality -- having a list of tasks in shared memory (id_index, id_label) and a pool of workers checking and consuming it -- it will make config simpler for such needs, otherwise, the functionality can be achieved right now pretty much completely just via config.
Cheers, Daniel
-- Daniel-Constantin Mierla Kamailio Advanced Training, April 23-26, 2012, Berlin, Germany http://www.asipto.com/index.php/kamailio-advanced-training/