#### Type Of Change
- [X] New feature (non-breaking change which adds new functionality)

#### Checklist:
- [ ] PR should be backported to stable branches
- [X] Tested changes locally

#### Description
- new ms_timer parameter to enable a millisecond-precision timer
- new async_ms_route and async_ms_sleep functions that take the delay in milliseconds as a parameter
- implementation: each async_ms_sleep adds an entry to a linked list sorted by expiry time. The list is checked every ms_timer ms for expired entries, and all expired entries are pushed for execution on a pool of async workers.

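
The list handling described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code from the patch: the names `ms_item_t`, `ms_list_t`, `ms_list_insert` and `ms_list_pop_expired` are hypothetical, the `id` field stands in for the suspended task, and the lock that protects the shared list in the module is left out.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical types for illustration; not the async module's structures. */
typedef struct ms_item {
	uint64_t due_ms;      /* absolute expiry time in milliseconds */
	int id;               /* stand-in for the suspended task */
	struct ms_item *next;
} ms_item_t;

typedef struct ms_list {
	ms_item_t *head;      /* earliest expiry first */
} ms_list_t;

/* Insert an item so the list stays sorted by expiry time.
 * Items with the same expiry keep their insertion order. */
static void ms_list_insert(ms_list_t *l, ms_item_t *it)
{
	ms_item_t **pp = &l->head;

	while(*pp != NULL && (*pp)->due_ms <= it->due_ms)
		pp = &(*pp)->next;
	it->next = *pp;
	*pp = it;
}

/* Detach every item with due_ms <= now_ms and return them as a chain,
 * or NULL if nothing has expired. Because the list is sorted, the scan
 * stops at the first item that is not yet due. */
static ms_item_t *ms_list_pop_expired(ms_list_t *l, uint64_t now_ms)
{
	ms_item_t *expired = l->head;
	ms_item_t *last = NULL;
	ms_item_t *it = l->head;

	while(it != NULL && it->due_ms <= now_ms) {
		last = it;
		it = it->next;
	}
	if(last == NULL)
		return NULL;
	last->next = NULL;
	l->head = it;
	return expired;
}
```

A periodic timer callback would call `ms_list_pop_expired()` with the current time and hand each returned item to a worker. Keeping the list sorted makes the periodic check cheap, at the cost of a linear scan on insert.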
This allows easy implementation of request traffic shaping with an external leaky bucket microservice, assuring a maximum rate with no dropped requests by introducing delay.
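
As an illustration of the shaping idea, here is one way such a service could compute the delay for each request. It is only a sketch: the pull request does not describe the microservice, so `shaper_t` and `shaper_delay_ms` are hypothetical names, and the fixed-spacing policy is an assumption.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical shaper state; the PR does not specify the external service. */
typedef struct shaper {
	uint64_t interval_ms;  /* minimum spacing between released requests */
	uint64_t next_free_ms; /* earliest time the next request may be released */
} shaper_t;

/* Return how many milliseconds a request arriving at now_ms should be
 * delayed so that releases are at least interval_ms apart. No request is
 * rejected; excess traffic is simply pushed further into the future. */
static uint64_t shaper_delay_ms(shaper_t *s, uint64_t now_ms)
{
	uint64_t release = (s->next_free_ms > now_ms) ? s->next_free_ms : now_ms;

	s->next_free_ms = release + s->interval_ms;
	return release - now_ms;
}
```

The returned value is what would be passed as the millisecond argument of the sleep. A real service would likely also cap the queue depth so that the delay cannot grow without bound.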
You can view, comment on, or merge this pull request online at:
https://github.com/kamailio/kamailio/pull/2016
-- Commit Summary --
* async: added support for millisecond resolution sleep
-- File Changes --
M src/modules/async/async_mod.c (134)
M src/modules/async/async_sleep.c (210)
M src/modules/async/async_sleep.h (8)
M src/modules/async/doc/async_admin.xml (113)
-- Patch Links --
https://github.com/kamailio/kamailio/pull/2016.patch
https://github.com/kamailio/kamailio/pull/2016.diff
henningw commented on this pull request.
Thank you for the pull request! Generally, it all looks fine. I have a few questions regarding the memory management in the error cases (I only read through the code and did not test it).
+			sizeof(struct async_ms_list));
+	if(_async_ms_list == NULL) {
+		LM_ERR("no more shm\n");
+		return -1;
+	}
+	memset(_async_ms_list, 0, sizeof(struct async_ms_list));
+	if(lock_init(&_async_ms_list->lock) == 0) {
+		LM_ERR("cannot init lock \n");
+		shm_free(_async_ms_list);
+		_async_ms_list = 0;
+		return -1;
+	}
+	return 0;
+}
+
+int async_destroy_ms_timer_list(void)
Isn't it necessary to shm_free the allocated _async_ms_list here as well?
+		return -1;
+	}
+	dsize = sizeof(async_task_t) + sizeof(async_task_param_t) + sizeof(async_ms_item_t);
+
+	at = (async_task_t *)shm_malloc(dsize);
+	if(at == NULL) {
+		LM_ERR("no more shm memory\n");
+		return -1;
+	}
+	memset(at, 0, dsize);
+	at->param = (char *)at + sizeof(async_task_t);
+	atp = (async_task_param_t *)at->param;
+	ai = (async_ms_item_t *) ((char *)at + sizeof(async_task_t) + sizeof(async_task_param_t));
+	ai->at = at;
+
+	if(cbname && cbname->len>=ASYNC_CBNAME_SIZE-1) {
Is a shm_free for "at" missing here?
+		return -1;
+	}
+	memset(at, 0, dsize);
+	at->param = (char *)at + sizeof(async_task_t);
+	atp = (async_task_param_t *)at->param;
+	ai = (async_ms_item_t *) ((char *)at + sizeof(async_task_t) + sizeof(async_task_param_t));
+	ai->at = at;
+
+	if(cbname && cbname->len>=ASYNC_CBNAME_SIZE-1) {
+		LM_ERR("callback name is too long: %.*s\n", cbname->len, cbname->s);
+		return -1;
+	}
+
+	t = tmb.t_gett();
+	if(t == NULL || t == T_UNDEFINED) {
+		if(tmb.t_newtran(msg) < 0) {
Same as above
+	ai = (async_ms_item_t *) ((char *)at + sizeof(async_task_t) + sizeof(async_task_param_t));
+	ai->at = at;
+
+	if(cbname && cbname->len>=ASYNC_CBNAME_SIZE-1) {
+		LM_ERR("callback name is too long: %.*s\n", cbname->len, cbname->s);
+		return -1;
+	}
+
+	t = tmb.t_gett();
+	if(t == NULL || t == T_UNDEFINED) {
+		if(tmb.t_newtran(msg) < 0) {
+			LM_ERR("cannot create the transaction\n");
+			return -1;
+		}
+		t = tmb.t_gett();
+		if(t == NULL || t == T_UNDEFINED) {
Same as above
+	t = tmb.t_gett();
+	if(t == NULL || t == T_UNDEFINED) {
+		if(tmb.t_newtran(msg) < 0) {
+			LM_ERR("cannot create the transaction\n");
+			return -1;
+		}
+		t = tmb.t_gett();
+		if(t == NULL || t == T_UNDEFINED) {
+			LM_ERR("cannot lookup the transaction\n");
+			return -1;
+		}
+	}
+
+	if(tmb.t_suspend(msg, &tindex, &tlabel) < 0) {
+		LM_ERR("failed to suspend the processing\n");
+		shm_free(ai);
Here you free the "ai" structure. Where is this memory actually allocated? And again, what about "at"?
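
For context on this question: in the quoted hunk, "ai" is computed as an offset into the single block returned for "at", so it is not a separate allocation. The sketch below mirrors that layout with plain `malloc`/`free` standing in for `shm_malloc`/`shm_free`. The type and function names are hypothetical simplifications, and it only illustrates that the block should be released through its base pointer.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Simplified, hypothetical stand-ins for the three structures that the
 * quoted code packs into one allocation. */
typedef struct task { void *param; } task_t;
typedef struct task_param { int tindex; int tlabel; } task_param_t;
typedef struct ms_item { task_t *at; } ms_item_t;

/* Allocate one block and carve it into task, task_param and ms_item.
 * Only the returned base pointer is a valid argument for free(). The
 * sizes used here keep each part suitably aligned on common platforms. */
static task_t *task_alloc(task_param_t **atp, ms_item_t **ai)
{
	size_t dsize = sizeof(task_t) + sizeof(task_param_t) + sizeof(ms_item_t);
	task_t *at = (task_t *)malloc(dsize);

	if(at == NULL)
		return NULL;
	memset(at, 0, dsize);
	at->param = (char *)at + sizeof(task_t);
	*atp = (task_param_t *)at->param;
	*ai = (ms_item_t *)((char *)at + sizeof(task_t) + sizeof(task_param_t));
	(*ai)->at = at; /* back-pointer to the start of the block */
	return at;
}

/* Release the whole block through the base pointer stored in the item.
 * Passing ai itself to free() would hand the allocator an interior
 * pointer, which is undefined behavior. */
static void task_free(ms_item_t *ai)
{
	free(ai->at);
}
```

With this layout, an error path that holds only "ai" can still release everything via the back-pointer, and no separate free for "at" is needed afterwards.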
urtho commented on this pull request.
+			sizeof(struct async_ms_list));
+	if(_async_ms_list == NULL) {
+		LM_ERR("no more shm\n");
+		return -1;
+	}
+	memset(_async_ms_list, 0, sizeof(struct async_ms_list));
+	if(lock_init(&_async_ms_list->lock) == 0) {
+		LM_ERR("cannot init lock \n");
+		shm_free(_async_ms_list);
+		_async_ms_list = 0;
+		return -1;
+	}
+	return 0;
+}
+
+int async_destroy_ms_timer_list(void)
Good catch. Fixing now.
urtho commented on this pull request.
+		return -1;
+	}
+	dsize = sizeof(async_task_t) + sizeof(async_task_param_t) + sizeof(async_ms_item_t);
+
+	at = (async_task_t *)shm_malloc(dsize);
+	if(at == NULL) {
+		LM_ERR("no more shm memory\n");
+		return -1;
+	}
+	memset(at, 0, dsize);
+	at->param = (char *)at + sizeof(async_task_t);
+	atp = (async_task_param_t *)at->param;
+	ai = (async_ms_item_t *) ((char *)at + sizeof(async_task_t) + sizeof(async_task_param_t));
+	ai->at = at;
+
+	if(cbname && cbname->len>=ASYNC_CBNAME_SIZE-1) {
The allocation is done too early. Moving it past all the input and transaction validation.
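
The reordering described here, validating first and allocating last, can be sketched as below. This is not the updated patch: `job_t`, `job_create`, `job_destroy` and `CBNAME_SIZE` are hypothetical, `calloc` stands in for the shared-memory allocator, and the `live_jobs` counter exists only to make leaks visible in the example.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define CBNAME_SIZE 64 /* hypothetical limit, for illustration only */

typedef struct job {
	int ms;
	char cbname[CBNAME_SIZE];
} job_t;

static int live_jobs = 0; /* counts outstanding allocations */

/* Create a job. All checks that can fail run before the allocation, so
 * every early return is leak-free without any cleanup code. */
static int job_create(int ms, const char *cbname, job_t **out)
{
	size_t len;
	job_t *j;

	/* 1. validate inputs; nothing is allocated yet */
	if(out == NULL || ms <= 0)
		return -1;
	len = (cbname != NULL) ? strlen(cbname) : 0;
	if(len >= CBNAME_SIZE - 1)
		return -1;

	/* 2. allocate only after validation has passed */
	j = (job_t *)calloc(1, sizeof(job_t));
	if(j == NULL)
		return -1;
	live_jobs++;

	j->ms = ms;
	if(cbname != NULL)
		memcpy(j->cbname, cbname, len); /* calloc left the terminator in place */
	*out = j;
	return 0;
}

static void job_destroy(job_t *j)
{
	if(j != NULL) {
		free(j);
		live_jobs--;
	}
}
```

Any step that can still fail after the allocation needs its own cleanup before returning, which is why moving the allocation as late as possible keeps the error paths short.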
urtho commented on this pull request.
+		return -1;
+	}
+	memset(at, 0, dsize);
+	at->param = (char *)at + sizeof(async_task_t);
+	atp = (async_task_param_t *)at->param;
+	ai = (async_ms_item_t *) ((char *)at + sizeof(async_task_t) + sizeof(async_task_param_t));
+	ai->at = at;
+
+	if(cbname && cbname->len>=ASYNC_CBNAME_SIZE-1) {
+		LM_ERR("callback name is too long: %.*s\n", cbname->len, cbname->s);
+		return -1;
+	}
+
+	t = tmb.t_gett();
+	if(t == NULL || t == T_UNDEFINED) {
+		if(tmb.t_newtran(msg) < 0) {
The allocation is done too early. Moving it past all the input and transaction validation.
urtho commented on this pull request.
+	ai = (async_ms_item_t *) ((char *)at + sizeof(async_task_t) + sizeof(async_task_param_t));
+	ai->at = at;
+
+	if(cbname && cbname->len>=ASYNC_CBNAME_SIZE-1) {
+		LM_ERR("callback name is too long: %.*s\n", cbname->len, cbname->s);
+		return -1;
+	}
+
+	t = tmb.t_gett();
+	if(t == NULL || t == T_UNDEFINED) {
+		if(tmb.t_newtran(msg) < 0) {
+			LM_ERR("cannot create the transaction\n");
+			return -1;
+		}
+		t = tmb.t_gett();
+		if(t == NULL || t == T_UNDEFINED) {
The allocation is done too early. Moving it past all the input and transaction validation.
urtho commented on this pull request.
+	t = tmb.t_gett();
+	if(t == NULL || t == T_UNDEFINED) {
+		if(tmb.t_newtran(msg) < 0) {
+			LM_ERR("cannot create the transaction\n");
+			return -1;
+		}
+		t = tmb.t_gett();
+		if(t == NULL || t == T_UNDEFINED) {
+			LM_ERR("cannot lookup the transaction\n");
+			return -1;
+		}
+	}
+
+	if(tmb.t_suspend(msg, &tindex, &tlabel) < 0) {
+		LM_ERR("failed to suspend the processing\n");
+		shm_free(ai);
The allocation is done too early. Moving it past all the input and transaction validation.
I have all the fixes. Should I open a new pull request, or push the changes to this one with the same commit message?
Thanks, just push the changes to this pull request.
@urtho pushed 1 commit.
96c8fe5a8e66a70649d69bc98f210050c59df4bd async: added support for millisecond resolution sleep
Merged #2016 into master.
Merged. If there are further changes, they can be done directly in master.