[sr-dev] git:master: modules_k/rls: Use database row/table locking where supported in DB only mode

Peter Dunkley peter.dunkley at crocodile-rcs.com
Tue Aug 21 16:41:49 CEST 2012


Module: sip-router
Branch: master
Commit: 10327c61d35e034f31c49a27f11f81ec82c22055
URL:    http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=10327c61d35e034f31c49a27f11f81ec82c22055

Author: Peter Dunkley <peter.dunkley at crocodile-rcs.com>
Committer: Peter Dunkley <peter.dunkley at crocodile-rcs.com>
Date:   Tue Aug 21 15:41:27 2012 +0100

modules_k/rls: Use database row/table locking where supported in DB only mode

- Under load, many DB deadlocks occur when using
  (start|end)_transaction() with multiple presence processes and/or
  servers.
- Without using (start|end)_transaction(), multiple processes/servers
  overwrite each other's changes.
- Using row locking (where possible) and table locking (where
  required) fixes these problems.
- IMPORTANT NOTE: DB-only, multi-process/multi-server presence will
  only work properly under high load when using a database driver that
  supports transactions and locking (currently only db_postgres).

---

 modules_k/rls/notify.c          |   10 ++++----
 modules_k/rls/resource_notify.c |   40 +++++++++++++++++++++++++++++------
 modules_k/rls/rls_db.c          |   37 +++++++++++++++++++++++++++++---
 modules_k/rls/subscribe.c       |   44 +++++++++++++++++++++++++++++++++++++++
 4 files changed, 115 insertions(+), 16 deletions(-)

diff --git a/modules_k/rls/notify.c b/modules_k/rls/notify.c
index c8eb230..f0ec0aa 100644
--- a/modules_k/rls/notify.c
+++ b/modules_k/rls/notify.c
@@ -101,10 +101,10 @@ int send_full_notify(subs_t* subs, xmlNodePtr rl_node, str* rl_uri,
 	int len_est;
 	res_param_t param;
 	int resource_added = 0; /* Flag to indicate that we have added at least one resource */
-	multipart_body = NULL;
+	multipart_body=NULL;
+	db_query_f query_fn = rlpres_dbf.query_lock ? rlpres_dbf.query_lock : rlpres_dbf.query;
 
 	LM_DBG("start\n");
-	/* query in alfabetical order */
 	
 	if(CONSTR_RLSUBS_DID(subs, &rlsubs_did)<0)
 	{
@@ -136,15 +136,15 @@ int send_full_notify(subs_t* subs, xmlNodePtr rl_node, str* rl_uri,
 
 	if (dbmode == RLS_DB_ONLY && rlpres_dbf.start_transaction)
 	{
-		if (rlpres_dbf.start_transaction(rlpres_db) < 0)
+		if (rlpres_dbf.start_transaction(rlpres_db, DB_LOCKING_WRITE) < 0)
 		{
 			LM_ERR("in start_transaction\n");
 			goto error;
 		}
 	}
 
-	if(rlpres_dbf.query(rlpres_db, query_cols, 0, query_vals, result_cols,
-					1, n_result_cols, &str_resource_uri_col, &result )< 0)
+	if(query_fn(rlpres_db, query_cols, 0, query_vals, result_cols,
+					1, n_result_cols, NULL, &result )< 0)
 	{
 		LM_ERR("in sql query\n");
 		goto error;
diff --git a/modules_k/rls/resource_notify.c b/modules_k/rls/resource_notify.c
index 8937131..1b9b47c 100644
--- a/modules_k/rls/resource_notify.c
+++ b/modules_k/rls/resource_notify.c
@@ -203,6 +203,15 @@ static void send_notifies(db1_res_t *result, int did_col, int resource_uri_col,
 		ERR_MEM(PKG_MEM_STR);
 	}
 
+	if (dbmode == RLS_DB_ONLY && rls_dbf.start_transaction)
+	{
+		if (rls_dbf.start_transaction(rls_db, DB_LOCKING_WRITE) < 0)
+		{
+			LM_ERR("in start_transaction\n");
+			goto error;
+		}
+	}
+
 	LM_DBG("found %d records with updated state\n", result->n);
 	for(i= 0; i< result->n; i++)
 	{
@@ -420,9 +429,17 @@ static void send_notifies(db1_res_t *result, int did_col, int resource_uri_col,
 		dialog= NULL;
 	}
 
-	
-error:
 done:
+	if (dbmode == RLS_DB_ONLY && rls_dbf.end_transaction)
+	{
+		if (rls_dbf.end_transaction(rls_db) < 0)
+		{
+			LM_ERR("in end_transaction\n");
+			goto error;
+		}
+	}
+
+error:
 	if(bstr.s)
 		pkg_free(bstr.s);
 
@@ -430,6 +447,13 @@ done:
 		pkg_free(buf);
 	if(dialog)
 		pkg_free(dialog);
+
+	if (dbmode == RLS_DB_ONLY && rls_dbf.abort_transaction)
+	{
+		if (rls_dbf.abort_transaction(rls_db) < 0)
+			LM_ERR("in abort_transaction\n");
+	}
+
 	return;
 }
 
@@ -769,7 +793,7 @@ int rls_handle_notify(struct sip_msg* msg, char* c1, char* c2)
 
 	if (dbmode == RLS_DB_ONLY && rlpres_dbf.start_transaction)
 	{
-		if (rlpres_dbf.start_transaction(rlpres_db) < 0)
+		if (rlpres_dbf.start_transaction(rlpres_db, DB_LOCKING_WRITE) < 0)
 		{
 			LM_ERR("in start_transaction\n");
 			goto error;
@@ -883,6 +907,7 @@ static void timer_send_full_state_notifies(int round)
 	xmlDocPtr doc = NULL;
 	xmlNodePtr service_node = NULL;
 	int now = (int)time(NULL);
+	db_query_f query_fn = rls_dbf.query_lock ? rls_dbf.query_lock : rls_dbf.query;
 
 	query_cols[0] = &str_updated_col;
 	query_vals[0].type = DB1_INT;
@@ -925,7 +950,7 @@ static void timer_send_full_state_notifies(int round)
 
 	if (dbmode == RLS_DB_ONLY && rls_dbf.start_transaction)
 	{
-		if (rls_dbf.start_transaction(rls_db) < 0)
+		if (rls_dbf.start_transaction(rls_db, DB_LOCKING_WRITE) < 0)
 		{
 			LM_ERR("in start_transaction\n");
 			goto done;
@@ -933,7 +958,7 @@ static void timer_send_full_state_notifies(int round)
 	}
 
 	/* Step 1: Find rls_watchers that require full-state notification */
-	if (rls_dbf.query(rls_db, query_cols, 0, query_vals, result_cols,
+	if (query_fn(rls_db, query_cols, 0, query_vals, result_cols,
 				1, n_result_cols, 0, &result) < 0)
 	{
 		LM_ERR("in sql query\n");
@@ -1051,6 +1076,7 @@ static void timer_send_update_notifies(int round)
 		pres_state_col, content_type_col;
 	int n_result_cols= 0;
 	db1_res_t *result= NULL;
+	db_query_f query_fn = rlpres_dbf.query_lock ? rlpres_dbf.query_lock : rlpres_dbf.query;
 
 	query_cols[0]= &str_updated_col;
 	query_vals[0].type = DB1_INT;
@@ -1080,14 +1106,14 @@ static void timer_send_update_notifies(int round)
 
 	if (dbmode == RLS_DB_ONLY && rlpres_dbf.start_transaction)
 	{
-		if (rlpres_dbf.start_transaction(rlpres_db) < 0)
+		if (rlpres_dbf.start_transaction(rlpres_db, DB_LOCKING_WRITE) < 0)
 		{
 			LM_ERR("in start_transaction\n");
 			goto done;
 		}
 	}
 
-	if(rlpres_dbf.query(rlpres_db, query_cols, 0, query_vals, result_cols,
+	if(query_fn(rlpres_db, query_cols, 0, query_vals, result_cols,
 					1, n_result_cols, &str_rlsubs_did_col, &result)< 0)
 	{
 		LM_ERR("in sql query\n");
diff --git a/modules_k/rls/rls_db.c b/modules_k/rls/rls_db.c
index 97ac7d6..ee406de 100644
--- a/modules_k/rls/rls_db.c
+++ b/modules_k/rls/rls_db.c
@@ -124,6 +124,7 @@ int delete_expired_subs_rlsdb( void )
 	int i;
 	subs_t subs;
 	str rlsubs_did = {0, 0};
+	db_query_f query_fn = rls_dbf.query_lock ? rls_dbf.query_lock : rls_dbf.query;
 
 	if(rls_db == NULL)
 	{
@@ -148,7 +149,16 @@ int delete_expired_subs_rlsdb( void )
 	result_cols[r_to_tag_col=n_result_cols++] = &str_to_tag_col;
 	result_cols[r_from_tag_col=n_result_cols++] = &str_from_tag_col;
 
-	if(rls_dbf.query(rls_db, query_cols, query_ops, query_vals, result_cols, 
+	if (rls_dbf.start_transaction)
+	{
+		if (rls_dbf.start_transaction(rls_db, DB_LOCKING_WRITE) < 0)
+		{
+			LM_ERR("in start_transaction\n");
+			goto error;
+		}
+	}
+
+	if(query_fn(rls_db, query_cols, query_ops, query_vals, result_cols, 
 				n_query_cols, n_result_cols, 0, &result )< 0)
 	{
 		LM_ERR("Can't query db\n");
@@ -213,11 +223,28 @@ int delete_expired_subs_rlsdb( void )
 	}
 
 	rls_dbf.free_result(rls_db, result);
+
+	if (rls_dbf.end_transaction)
+	{
+		if (rls_dbf.end_transaction(rls_db) < 0)
+		{
+			LM_ERR("in end_transaction\n");
+			goto error;
+		}
+	}
+
 	return 1;
 
 error:
 	if (result) rls_dbf.free_result(rls_db, result);
 	if (rlsubs_did.s) pkg_free(rlsubs_did.s);
+
+	if (rls_dbf.abort_transaction)
+	{
+		if (rls_dbf.abort_transaction(rls_db) < 0)
+			LM_ERR("in abort_transaction\n");
+	}
+
 	return -1;
 }
 
@@ -718,7 +745,8 @@ int get_dialog_subscribe_rlsdb(subs_t *subs)
 	int nr_rows;
 	int r_remote_cseq, r_local_cseq, r_version;
 	char *r_pres_uri, *r_record_route;
-
+	db_query_f query_fn = rls_dbf.query_lock ? rls_dbf.query_lock : rls_dbf.query;
+	
 	if(rls_db == NULL)
 	{
 		LM_ERR("null database connection\n");
@@ -761,7 +789,7 @@ int get_dialog_subscribe_rlsdb(subs_t *subs)
 	result_cols[version_col = n_result_cols++] = &str_version_col;
 	result_cols[rroute_col = n_result_cols++] = &str_record_route_col;
 
-	if(rls_dbf.query(rls_db, query_cols, 0, query_vals, result_cols, 
+	if(query_fn(rls_db, query_cols, 0, query_vals, result_cols, 
 			n_query_cols, n_result_cols, 0, &result )< 0)
 	{
 		LM_ERR("Can't query db\n");
@@ -865,6 +893,7 @@ subs_t *get_dialog_notify_rlsdb(str callid, str to_tag, str from_tag)
 	subs_t *dest;
 	event_t parsed_event;
 	str ev_sname;
+	db_query_f query_fn = rls_dbf.query_lock ? rls_dbf.query_lock : rls_dbf.query;
 
 	if(rls_db == NULL)
 	{
@@ -919,7 +948,7 @@ subs_t *get_dialog_notify_rlsdb(str callid, str to_tag, str from_tag)
 	result_cols[r_version_col=n_result_cols++] = &str_version_col;
 	result_cols[r_expires_col=n_result_cols++] = &str_expires_col;
 
-	if(rls_dbf.query(rls_db, query_cols, 0, query_vals, result_cols, 
+	if(query_fn(rls_db, query_cols, 0, query_vals, result_cols, 
 				n_query_cols, n_result_cols, 0, &result )< 0)
 	{
 		LM_ERR("Can't query db\n");
diff --git a/modules_k/rls/subscribe.c b/modules_k/rls/subscribe.c
index f719754..c770cbf 100644
--- a/modules_k/rls/subscribe.c
+++ b/modules_k/rls/subscribe.c
@@ -639,6 +639,15 @@ int rls_handle_subscribe(struct sip_msg* msg, str watcher_user, str watcher_doma
 		/* search if a stored dialog */
 		if ( dbmode == RLS_DB_ONLY )
 		{
+			if (rls_dbf.start_transaction)
+			{
+				if (rls_dbf.start_transaction(rls_db, DB_LOCKING_WRITE) < 0)
+				{
+					LM_ERR("in start_transaction\n");
+					goto error;
+				}
+			}
+
 			rt = get_dialog_subscribe_rlsdb(&subs);
 
 			if (rt <= 0)
@@ -646,6 +655,16 @@ int rls_handle_subscribe(struct sip_msg* msg, str watcher_user, str watcher_doma
 				LM_DBG("subscription dialog not found for <%.*s@%.*s>\n",
 						subs.watcher_user.len, subs.watcher_user.s,
 						subs.watcher_domain.len, subs.watcher_domain.s);
+
+				if (rls_dbf.end_transaction)
+				{
+					if (rls_dbf.end_transaction(rls_db) < 0)
+					{
+						LM_ERR("in end_transaction\n");
+						goto error;
+					}
+				}
+
 				goto forpresence;
 			}
 			else if(rt>=400)
@@ -657,6 +676,16 @@ int rls_handle_subscribe(struct sip_msg* msg, str watcher_user, str watcher_doma
 					LM_ERR("while sending reply\n");
 					goto error;
 				}
+
+				if (rls_dbf.end_transaction)
+				{
+					if (rls_dbf.end_transaction(rls_db) < 0)
+					{
+						LM_ERR("in end_transaction\n");
+						goto error;
+					}
+				}
+
 				ret = 0;
 				goto stop;
 			}
@@ -670,6 +699,15 @@ int rls_handle_subscribe(struct sip_msg* msg, str watcher_user, str watcher_doma
 				LM_ERR("while updating resource list subscription\n");
 				goto error;
 			}
+
+			if (rls_dbf.end_transaction)
+			{
+				if (rls_dbf.end_transaction(rls_db) < 0)
+				{
+					LM_ERR("in end_transaction\n");
+					goto error;
+				}
+			}
 		}
 		else
 		{
@@ -795,6 +833,12 @@ error:
 	if (rlsubs_did.s != NULL)
 		pkg_free(rlsubs_did.s);
 
+	if (rls_dbf.abort_transaction)
+	{
+		if (rls_dbf.abort_transaction(rls_db) < 0)
+			LM_ERR("in abort_transaction\n");
+	}
+	
 	return err_ret;
 }
 




More information about the sr-dev mailing list