[sr-dev] ASYNC module question

Peter Dunkley peter.dunkley at crocodile-rcs.com
Fri Mar 16 12:33:15 CET 2012


Hi,

As part of some work to get the maximum performance out of Kamailio RLS
and Presence I would like to be able to hand-off processing of presence
requests (NOTIFY, PUBLISH, and SUBSCRIBE) to some worker processes.
This way, even if large numbers of presence requests are received on a
single TCP connection, all that the Kamailio process managing that
connection has to do is to de-packetise the requests and hand them off.

I have been looking at the ASYNC module and it doesn't seem to do quite
the right thing for this.  The issue is the sleep parameter.  I want my
worker processes to process presence requests continuously from a queue,
not suspend them for a second or more and then process them.  Is there a
simple change to ASYNC to get it to do this?

I currently do what I need in a quite different way (see config fragment
below), but I think using ASYNC would be cleaner if it did what I
needed...

        ...
        # ----- mqueue params -----
        modparam("mqueue", "mqueue", "name=presence")
        
        # ----- rtimer params -----
        modparam("rtimer", "timer", "name=presence0;interval=1;mode=1;")
        modparam("rtimer", "exec", "timer=presence0;route=PRESENCE_PROCESS")
        modparam("rtimer", "timer", "name=presence1;interval=1;mode=1;")
        modparam("rtimer", "exec", "timer=presence1;route=PRESENCE_PROCESS")
        ...
        modparam("rtimer", "timer", "name=presence7;interval=1;mode=1;")
        modparam("rtimer", "exec", "timer=presence7;route=PRESENCE_PROCESS")
        ...
                if (@event!="presence" && @event!="presence.winfo") {
                        $var(ev_type) = @event;
                        xlog("L_INFO", "  $rm $var(ev_type) not a presence message for myself\n");
                        return;
                }
        
                if (!t_suspend()) {
                        t_reply("500", "Server Internal Error");
                        xlog("L_ERR", "Failed to suspend transaction for $rm\n");
                        exit;
                }
        
                xlog("L_INFO", "Suspended transaction for $rm [$T(id_index):$T(id_label)]\n");
        
                if (!mq_add("presence", "$T(id_index)", "$T(id_label)")) {
                        t_reply("500", "Server Internal Error");
                        xlog("L_ERR", "Failed to queue transaction for $rm [$T(id_index):$T(id_label)]\n");
                        exit;
                }
                exit;
        ...
        route[PRESENCE_PROCESS] {
                lock("pres");
                $var(pres) = $shv(pres);
                $shv(pres) = $shv(pres) + 1;
                unlock("pres");
        
                xlog("L_WARN", "Starting presence de-queue process $var(pres) (pid: $pp)\n");
        
                while (1) {
                        while (mq_fetch("presence")) {
                                $var(id_index) = (int) $mqk(presence);
                                $var(id_label) = (int) $mqv(presence);
                                xlog("L_INFO", "Found queued presence transaction [$var(id_index):$var(id_label)]\n");
                                t_continue("$var(id_index)", "$var(id_label)", "PRESENCE");
                        }
                        usleep(100000);
                }
        }
        
        route[PRESENCE] {
                xlog("L_INFO", "$rm: route[PRESENCE] process $var(pres)\n");
        
                if (is_method("NOTIFY")) {
                        xlog("L_INFO", "Sending NOTIFY to RLS\n");
                        rls_handle_notify();
                } else if (is_method("PUBLISH")) {
                        xlog("L_INFO", "Sending PUBLISH to Presence\n");
                        handle_publish();
                } else if (is_method("SUBSCRIBE")) {
                        xlog("L_INFO", "Sending SUBSCRIBE to RLS\n");
                        $var(ret_code) = rls_handle_subscribe();
                        if ($var(ret_code) == 10) {
                                xlog("L_INFO", "  SUBSCRIBE not for RLS - sending to Presence\n");
                                handle_subscribe();
                        }
                } else {
                        xlog("L_ERR", "Received non-(NOTIFY|SUBSCRIBE) request from presence queue\n");
                        t_reply("500", "Server Internal Error");
                }
                exit;
        }


Thanks,

Peter

-- 
Peter Dunkley
Technical Director
Crocodile RCS Ltd
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.sip-router.org/pipermail/sr-dev/attachments/20120316/c703ac91/attachment.htm>


More information about the sr-dev mailing list