[sr-dev] git:master: presence(k): option to fetch records from db in chunks

Daniel-Constantin Mierla miconda at gmail.com
Thu Dec 1 09:22:00 CET 2011


Module: sip-router
Branch: master
Commit: 5931a413c2e7be4aa0c8ec57f777c1f1805dc86e
URL:    http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=5931a413c2e7be4aa0c8ec57f777c1f1805dc86e

Author: Daniel-Constantin Mierla <miconda at gmail.com>
Committer: Daniel-Constantin Mierla <miconda at gmail.com>
Date:   Thu Dec  1 09:20:01 2011 +0100

presence(k): option to fetch records from db in chunks

- can cope with a large number of db records
- new parameter fetch_rows (default value 500): number of rows fetched per chunk

---

 modules_k/presence/presence.c   |    2 +
 modules_k/presence/presentity.c |   96 +++++++++++++++++++++------------------
 modules_k/presence/subscribe.c  |   42 +++--------------
 3 files changed, 60 insertions(+), 80 deletions(-)

diff --git a/modules_k/presence/presence.c b/modules_k/presence/presence.c
index b0ec644..d3db179 100644
--- a/modules_k/presence/presence.c
+++ b/modules_k/presence/presence.c
@@ -90,6 +90,7 @@ str presentity_table= str_init("presentity");
 str active_watchers_table = str_init("active_watchers");
 str watchers_table= str_init("watchers");
 
+int pres_fetch_rows = 500;
 int library_mode= 0;
 str server_address= {0, 0};
 evlist_t* EvList= NULL;
@@ -179,6 +180,7 @@ static param_export_t params[]={
 	{ "enable_sphere_check",    INT_PARAM, &sphere_enable},
 	{ "timeout_rm_subs",        INT_PARAM, &timeout_rm_subs},
 	{ "send_fast_notify",	    INT_PARAM, &send_fast_notify},
+	{ "fetch_rows",             INT_PARAM, &pres_fetch_rows},
     {0,0,0}
 };
 
diff --git a/modules_k/presence/presentity.c b/modules_k/presence/presentity.c
index 75c3cfc..0c194f2 100644
--- a/modules_k/presence/presentity.c
+++ b/modules_k/presence/presentity.c
@@ -54,6 +54,8 @@ xmlNodePtr xmlNodeGetNodeByName(xmlNodePtr node, const char *name,
 static str pu_200_rpl  = str_init("OK");
 static str pu_412_rpl  = str_init("Conditional request failed");
 
+extern int pres_fetch_rows;
+
 #define ETAG_LEN  128
 
 char* generate_ETag(int publ_count)
@@ -717,8 +719,8 @@ int pres_htable_restore(void)
 		goto error;
 	}
 	static str query_str = str_init("username");
-	if (pa_dbf.query (pa_db, 0, 0, 0,result_cols,0, n_result_cols,
-				&query_str, &result) < 0)
+	if (db_fetch_query(&pa_dbf, pres_fetch_rows, pa_db, 0, 0, 0, result_cols,
+				0, n_result_cols, &query_str, &result) < 0)
 	{
 		LM_ERR("querying presentity\n");
 		goto error;
@@ -731,58 +733,62 @@ int pres_htable_restore(void)
 		pa_dbf.free_result(pa_db, result);
 		return 0;
 	}
-		
-	for(i= 0; i< result->n; i++)
-	{
-		row = &result->rows[i];
-		row_vals = ROW_VALUES(row);
 
-		if(row_vals[expires_col].val.int_val< (int)time(NULL))
-			continue;
-		
-		sphere= NULL;
-		user.s= (char*)row_vals[user_col].val.string_val;
-		user.len= strlen(user.s);
-		domain.s= (char*)row_vals[domain_col].val.string_val;
-		domain.len= strlen(domain.s);
-		ev_str.s= (char*)row_vals[event_col].val.string_val;
-		ev_str.len= strlen(ev_str.s);
-
-		if(event_parser(ev_str.s, ev_str.len, &ev)< 0)
+	do {
+		for(i= 0; i< result->n; i++)
 		{
-			LM_ERR("parsing event\n");
+			row = &result->rows[i];
+			row_vals = ROW_VALUES(row);
+
+			if(row_vals[expires_col].val.int_val< (int)time(NULL))
+				continue;
+		
+			sphere= NULL;
+			user.s= (char*)row_vals[user_col].val.string_val;
+			user.len= strlen(user.s);
+			domain.s= (char*)row_vals[domain_col].val.string_val;
+			domain.len= strlen(domain.s);
+			ev_str.s= (char*)row_vals[event_col].val.string_val;
+			ev_str.len= strlen(ev_str.s);
+
+			if(event_parser(ev_str.s, ev_str.len, &ev)< 0)
+			{
+				LM_ERR("parsing event\n");
+				free_event_params(ev.params.list, PKG_MEM_TYPE);
+				goto error;
+			}
+			event= ev.type;
 			free_event_params(ev.params.list, PKG_MEM_TYPE);
-			goto error;
-		}
-		event= ev.type;
-		free_event_params(ev.params.list, PKG_MEM_TYPE);
 
-		if(uandd_to_uri(user, domain, &uri)< 0)
-		{
-			LM_ERR("constructing uri\n");
-			goto error;
-		}
-		/* insert in hash_table*/
+			if(uandd_to_uri(user, domain, &uri)< 0)
+			{
+				LM_ERR("constructing uri\n");
+				goto error;
+			}
+			/* insert in hash_table*/
 	
-		if(sphere_enable && event== EVENT_PRESENCE )
-		{
-			body.s= (char*)row_vals[body_col].val.string_val;
-			body.len= strlen(body.s);
-			sphere= extract_sphere(body);
-		}
+			if(sphere_enable && event== EVENT_PRESENCE )
+			{
+				body.s= (char*)row_vals[body_col].val.string_val;
+				body.len= strlen(body.s);
+				sphere= extract_sphere(body);
+			}
 
-		if(insert_phtable(&uri, event, sphere)< 0)
-		{
-			LM_ERR("inserting record in presentity hash table");
-			pkg_free(uri.s);
+			if(insert_phtable(&uri, event, sphere)< 0)
+			{
+				LM_ERR("inserting record in presentity hash table");
+				pkg_free(uri.s);
+				if(sphere)
+					pkg_free(sphere);
+				goto error;
+			}
 			if(sphere)
 				pkg_free(sphere);
-			goto error;
+			pkg_free(uri.s);
 		}
-		if(sphere)
-			pkg_free(sphere);
-		pkg_free(uri.s);
-	}
+	} while((db_fetch_next(&pa_dbf, pres_fetch_rows, pa_db, &result)==1)
+			&& (RES_ROW_N(result)>0));
+
 	pa_dbf.free_result(pa_db, result);
 
 	return 0;
diff --git a/modules_k/presence/subscribe.c b/modules_k/presence/subscribe.c
index 3227cc7..b29fa34 100644
--- a/modules_k/presence/subscribe.c
+++ b/modules_k/presence/subscribe.c
@@ -46,8 +46,6 @@
 #include "notify.h"
 #include "../pua/hash.h"
 
-#define ACTW_FETCH_SIZE  128
-
 int get_stored_info(struct sip_msg* msg, subs_t* subs, int* error_ret,
 		str* reply_str);
 int get_database_info(struct sip_msg* msg, subs_t* subs, int* error_ret,
@@ -61,6 +59,7 @@ static str pu_400_rpl  = str_init("Bad request");
 static str pu_500_rpl  = str_init("Server Internal Error");
 static str pu_489_rpl  = str_init("Bad Event");
 
+extern int pres_fetch_rows;
 
 int send_2XX_reply(struct sip_msg * msg, int reply_code, int lexpire,
 		str* local_contact)
@@ -1837,27 +1836,11 @@ int restore_db_subs(void)
 	}
 
 	/* select the whole table and all the columns */
-	if (DB_CAPABILITY(pa_dbf, DB_CAP_FETCH)) 
+	if (db_fetch_query(&pa_dbf, pres_fetch_rows, pa_db, 0, 0, 0, result_cols,
+				0, n_result_cols, 0, &result) < 0)
 	{
-		if(pa_dbf.query(pa_db,0,0,0,result_cols, 0,
-		n_result_cols, 0, 0) < 0) 
-		{
-			LM_ERR("Error while querying (fetch) database\n");
-			return -1;
-		}
-		if(pa_dbf.fetch_result(pa_db,&result,ACTW_FETCH_SIZE)<0)
-		{
-			LM_ERR("fetching rows failed\n");
-			return -1;
-		}
-	} else 
-	{
-		if (pa_dbf.query (pa_db, 0, 0, 0,result_cols,0, n_result_cols,
-					0, &result) < 0)
-		{
-			LM_ERR("querying presentity\n");
-			goto error;
-		}
+		LM_ERR("querying presentity\n");
+		goto error;
 	}
 
 	nr_rows = RES_ROW_N(result);
@@ -1983,19 +1966,8 @@ int restore_db_subs(void)
 			}
 		}
 
-		/* any more data to be fetched ?*/
-		if (DB_CAPABILITY(pa_dbf, DB_CAP_FETCH)) {
-		    if (pa_dbf.fetch_result( pa_db, &result,
-					     ACTW_FETCH_SIZE ) < 0) {
-			LM_ERR("fetching more rows failed\n");
-			goto error;
-		    }
-		    nr_rows = RES_ROW_N(result);
-		} else {
-		    nr_rows = 0;
-		}
-
-	}while (nr_rows>0);
+	} while((db_fetch_next(&pa_dbf, pres_fetch_rows, pa_db, &result)==1)
+			&& (RES_ROW_N(result)>0));
 
 	pa_dbf.free_result(pa_db, result);
 




More information about the sr-dev mailing list