Module: sip-router Branch: master Commit: 5db7f650ad0c5dd81ba5d8b670075816f9fe4f65 URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=5db7f650...
Author: Daniel-Constantin Mierla miconda@gmail.com Committer: Daniel-Constantin Mierla miconda@gmail.com Date: Tue Mar 13 14:03:52 2012 +0100
db_postgres: implemented custom REPLACE command for DB API v1
- So far, REPLACE is implemented as an UPDATE; if the number of affected rows is 0, an INSERT follows.
- Locks are used to ensure there is no race between the update and insert commands.
- The lock to use is selected based on the values of the update key, thus ensuring that operations on the same record are done under a mutex.
- If the number of columns used for the update key is 0, a straight insert without locking is done.
---
modules/db_postgres/km_db_postgres.c | 1 + modules/db_postgres/km_db_postgres.h | 4 + modules/db_postgres/km_dbase.c | 120 ++++++++++++++++++++++++++++++++++ modules/db_postgres/km_dbase.h | 5 ++ modules/db_postgres/pg_mod.c | 5 ++ 5 files changed, 135 insertions(+), 0 deletions(-)
diff --git a/modules/db_postgres/km_db_postgres.c b/modules/db_postgres/km_db_postgres.c index 5914e5f..9292319 100644 --- a/modules/db_postgres/km_db_postgres.c +++ b/modules/db_postgres/km_db_postgres.c @@ -93,6 +93,7 @@ int db_postgres_bind_api(db_func_t *dbb) dbb->insert = db_postgres_insert; dbb->delete = db_postgres_delete; dbb->update = db_postgres_update; + dbb->replace = db_postgres_replace; dbb->affected_rows = db_postgres_affected_rows;
return 0; diff --git a/modules/db_postgres/km_db_postgres.h b/modules/db_postgres/km_db_postgres.h index 8959417..ae69d2f 100644 --- a/modules/db_postgres/km_db_postgres.h +++ b/modules/db_postgres/km_db_postgres.h @@ -36,4 +36,8 @@ int db_postgres_bind_api(db_func_t *dbb);
int km_postgres_mod_init(void);
+int pg_init_lock_set(int sz); + +void pg_destroy_lock_set(void); + #endif /* _KM_DB_POSTGRES_H */ diff --git a/modules/db_postgres/km_dbase.c b/modules/db_postgres/km_dbase.c index a7b9546..216ca05 100644 --- a/modules/db_postgres/km_dbase.c +++ b/modules/db_postgres/km_dbase.c @@ -75,12 +75,50 @@ #include "../../lib/srdb1/db.h" #include "../../lib/srdb1/db_ut.h" #include "../../lib/srdb1/db_query.h" +#include "../../locking.h" +#include "../../hashes.h" #include "km_dbase.h" #include "km_pg_con.h" #include "km_val.h" #include "km_res.h" #include "pg_mod.h"
+static gen_lock_set_t *_pg_lock_set = NULL; +static unsigned int _pg_lock_size = 0; + +/*! + * \brief init lock set used to implement SQL REPLACE via UPDATE/INSERT + * \param sz power of two to compute the lock set size + * \return 0 on success, -1 on error + */ +int pg_init_lock_set(int sz) +{ + if(sz>0 && sz<=10) + { + _pg_lock_size = 1<<sz; + } else { + _pg_lock_size = 1<<4; + } + _pg_lock_set = lock_set_alloc(_pg_lock_size); + if(_pg_lock_set==NULL || lock_set_init(_pg_lock_set)==NULL) + { + LM_ERR("cannot initiate lock set\n"); + return -1; + } + return 0; +} + +void pg_destroy_lock_set(void) +{ + if(_pg_lock_set!=NULL) + { + lock_set_destroy(_pg_lock_set); + lock_set_dealloc(_pg_lock_set); + _pg_lock_set = NULL; + _pg_lock_size = 0; + } +} + static void db_postgres_free_query(const db1_con_t* _con);
@@ -587,3 +625,85 @@ int db_postgres_use_table(db1_con_t* _con, const str* _t) { return db_use_table(_con, _t); } + + +/*! + * \brief SQL REPLACE implementation + * \param _h structure representing database connection + * \param _k key names + * \param _v values of the keys + * \param _n number of key=value pairs + * \param _un number of keys to build the unique key, starting from first + * \param _m mode - first update, then insert, or first insert, then update + * \return 0 on success, negative on failure + */ +int db_postgres_replace(const db1_con_t* _h, const db_key_t* _k, + const db_val_t* _v, const int _n, const int _un, const int _m) +{ + unsigned int pos = 0; + int i; + + if(_un > _n) + { + LM_ERR("number of columns for unique key is too high\n"); + return -1; + } + + if(_un > 0) + { + for(i=0; i<_un; i++) + { + if(!VAL_NULL(&_v[i])) + { + switch(VAL_TYPE(&_v[i])) + { + case DB1_INT: + pos += VAL_UINT(&_v[i]); + break; + case DB1_STR: + pos += get_hash1_raw((VAL_STR(&_v[i])).s, + (VAL_STR(&_v[i])).len); + break; + case DB1_STRING: + pos += get_hash1_raw(VAL_STRING(&_v[i]), + strlen(VAL_STRING(&_v[i]))); + break; + default: + break; + } + } + } + pos &= (_pg_lock_size-1); + lock_set_get(_pg_lock_set, pos); + if(db_postgres_update(_h, _k, 0, _v, _k + _un, + _v + _un, _un, _n -_un)< 0) + { + LM_ERR("update failed\n"); + lock_set_release(_pg_lock_set, pos); + return -1; + } + + if (db_postgres_affected_rows(_h) <= 0) + { + if(db_postgres_insert(_h, _k, _v, _n)< 0) + { + LM_ERR("insert failed\n"); + lock_set_release(_pg_lock_set, pos); + return -1; + } + LM_DBG("inserted new record in database table\n"); + } else { + LM_DBG("updated record in database table\n"); + } + lock_set_release(_pg_lock_set, pos); + } else { + if(db_postgres_insert(_h, _k, _v, _n)< 0) + { + LM_ERR("direct insert failed\n"); + return -1; + } + LM_DBG("directly inserted new record in database table\n"); + } + return 0; +} + diff --git a/modules/db_postgres/km_dbase.h 
b/modules/db_postgres/km_dbase.h index 7c15e8d..bfdd5b1 100644 --- a/modules/db_postgres/km_dbase.h +++ b/modules/db_postgres/km_dbase.h @@ -113,5 +113,10 @@ int db_postgres_affected_rows(const db1_con_t* _h); */ int db_postgres_use_table(db1_con_t* _h, const str* _t);
+/* + * Replace a row in table (via update/insert) + */ +int db_postgres_replace(const db1_con_t* _h, const db_key_t* _k, + const db_val_t* _v, const int _n, const int _un, const int _m);
#endif /* KM_DBASE_H */ diff --git a/modules/db_postgres/pg_mod.c b/modules/db_postgres/pg_mod.c index c560c9d..57dd2fd 100644 --- a/modules/db_postgres/pg_mod.c +++ b/modules/db_postgres/pg_mod.c @@ -59,6 +59,8 @@ int pg_connect_timeout = 0; /* Default is unlimited */ int pg_retries = 2; /* How many times should the module try re-execute failed commands. * 0 disables reconnecting */
+int pg_lockset = 4; + /* * Postgres module interface */ @@ -88,6 +90,7 @@ static cmd_export_t cmds[] = { */ static param_export_t params[] = { {"retries", PARAM_INT, &pg_retries }, + {"lockset", PARAM_INT, &pg_lockset }, {0, 0, 0} };
@@ -547,6 +550,8 @@ static int pg_mod_init(void) } return -1; #endif /* PG_TEST */ + if(pg_init_lock_set(pg_lockset)<0) + return -1; return km_postgres_mod_init(); }