[sr-dev] git:master: modules/db_cassandra: Added support for queries without condition

Anca Vamanu anca.vamanu at 1and1.ro
Mon Jan 23 17:43:59 CET 2012


Module: sip-router
Branch: master
Commit: 13464e8d055363e820522e566c24cde043d2e00f
URL:    http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=13464e8d055363e820522e566c24cde043d2e00f

Author: Anca Vamanu <anca.vamanu at 1and1.ro>
Committer: Anca Vamanu <anca.vamanu at 1and1.ro>
Date:   Mon Jan 23 18:43:21 2012 +0200

modules/db_cassandra: Added support for queries without condition

	Now it is possible to fetch all rows in a table.
	Tested with the domain module.

---

 modules/db_cassandra/README                     |    2 +-
 modules/db_cassandra/dbcassa_base.cpp           |   96 +++++++++++++++--------
 modules/db_cassandra/doc/db_cassandra_admin.xml |    2 +-
 3 files changed, 66 insertions(+), 34 deletions(-)

diff --git a/modules/db_cassandra/README b/modules/db_cassandra/README
index c4bf97b..be27201 100644
--- a/modules/db_cassandra/README
+++ b/modules/db_cassandra/README
@@ -84,7 +84,7 @@ Chapter 1. Admin Guide
    Router servers, with more proxies and registration servers accessing
    the same location database. This was actually the main use case we had
    in mind when implementing this module. Please NOTE that it has only
-   been tested with the usrloc and auth_db modules.
+   been tested with the usrloc, auth_db and domain modules.
 
    You can find a configuration file example for this usage in the module
    - kamailio_cassa.cfg.
diff --git a/modules/db_cassandra/dbcassa_base.cpp b/modules/db_cassandra/dbcassa_base.cpp
index aa93474..bd7f057 100644
--- a/modules/db_cassandra/dbcassa_base.cpp
+++ b/modules/db_cassandra/dbcassa_base.cpp
@@ -427,36 +427,39 @@ dbcassa_column_p cassa_search_col(dbcassa_table_p tbc, db_key_t col_name)
 	return 0;
 }
 
-typedef std::auto_ptr<std::vector<oac::ColumnOrSuperColumn> >  ColumnVecPtr;
+typedef std::vector<oac::ColumnOrSuperColumn>  ColumnVec;
+typedef std::auto_ptr<ColumnVec>  ColumnVecPtr;
 
 ColumnVecPtr cassa_translate_query(const db1_con_t* _h, const db_key_t* _k,
-		const db_val_t* _v, const db_key_t* _c, int _n, int _nc)
+		const db_val_t* _v, const db_key_t* _c, int _n, int _nc, int* ret_rows_no)
 {
 	char row_key[cassa_max_key_len];
 	char sec_key[cassa_max_key_len];
-	int key_len, seckey_len = 0;
+	int key_len=0, seckey_len = 0;
 	int no_kc, no_sec_kc;
 	dbcassa_table_p tbc;
 
 	/** Lock table schema and construct primary and secondary key **/
-	tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
-	if(!tbc) {
-		LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
-		return ColumnVecPtr(NULL);
-	}
-	cassa_constr_key(_k, _v, _n, tbc->key_len, tbc->key, &no_kc, row_key);
+	if(_k) {
+		tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
+		if(!tbc) {
+			LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
+			return ColumnVecPtr(NULL);
+		}
+		cassa_constr_key(_k, _v, _n, tbc->key_len, tbc->key, &no_kc, row_key);
 
-	if(no_kc != tbc->key_len) {/* was not able to construct the whole key */
-		LM_ERR("Query not supported - key not provided\n");
-		dbcassa_lock_release(tbc);
-		return ColumnVecPtr(NULL);
-	}
-	key_len = tbc->key_len;
+		if(no_kc != tbc->key_len) {/* was not able to construct the whole key */
+			LM_ERR("Query not supported - key not provided\n");
+			dbcassa_lock_release(tbc);
+			return ColumnVecPtr(NULL);
+		}
+		key_len = tbc->key_len;
 
-	cassa_constr_key(_k, _v, _n, tbc->seckey_len, tbc->sec_key, &no_sec_kc, sec_key);
-	seckey_len = tbc->seckey_len;
+		cassa_constr_key(_k, _v, _n, tbc->seckey_len, tbc->sec_key, &no_sec_kc, sec_key);
+		seckey_len = tbc->seckey_len;
 
-	dbcassa_lock_release(tbc);
+		dbcassa_lock_release(tbc);
+	}
 
 	try {
 		oac::SlicePredicate sp;
@@ -514,7 +517,42 @@ ColumnVecPtr cassa_translate_query(const db1_con_t* _h, const db_key_t* _k,
 		do {
 			if(CON_CASSA(_h)->con) {
 				try {
-					CON_CASSA(_h)->con->get_slice(*cassa_result, row_key, cparent, sp, oac::ConsistencyLevel::ONE);
+
+					if(_k) {
+						CON_CASSA(_h)->con->get_slice(*cassa_result, row_key, cparent, sp, oac::ConsistencyLevel::ONE);
+						*ret_rows_no = 1;
+					} else {
+						oac::KeyRange keyRange;
+						keyRange.start_key = "";
+						keyRange.start_key = "";
+						std::vector<oac::KeySlice> key_slice_vect;
+						keyRange.__isset.start_key = 1;
+						keyRange.__isset.end_key = 1;
+						ColumnVec::iterator it = cassa_result->begin();
+
+						/* get in a loop 100 records at a time */
+						int rows_no =0;
+						while(1) {
+							CON_CASSA(_h)->con->get_range_slices(key_slice_vect, cparent, sp, keyRange, oac::ConsistencyLevel::ONE);
+							/* construct cassa_result */
+							LM_DBG("Retuned %d key slices\n", key_slice_vect.size());
+							for(unsigned int i = 0; i< key_slice_vect.size(); i++) {
+								if(key_slice_vect[i].columns.size()==0) {
+									continue;
+								}
+								cassa_result->insert(it, key_slice_vect[i].columns.begin(), key_slice_vect[i].columns.end());
+								it = cassa_result->begin();
+								row_slices[rows_no][0] = cassa_result->size();
+								row_slices[rows_no][1] = 0;
+								rows_no++;
+							}
+							if(key_slice_vect.size() < (unsigned int)keyRange.count)
+								break;
+						}
+
+						*ret_rows_no = rows_no;
+					}
+
 					return cassa_result;
 				} catch (const att::TTransportException &tx) {
 					LM_ERR("Failed to query: %s\n", tx.what());
@@ -571,7 +609,7 @@ int db_cassa_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op,
 
 	/** Construct and send the query to Cassandra Cluster **/
 
-	cassa_result = cassa_translate_query(_h, _k, _v, _c, _n, _nc);
+	cassa_result = cassa_translate_query(_h, _k, _v, _c, _n, _nc, &rows_no);
 
 	if(cassa_result.get() == NULL) {
 		LM_ERR("Failed to query Cassandra cluster\n");
@@ -626,6 +664,7 @@ int db_cassa_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op,
 		LM_DBG("RES_NAMES(%p)[%d]=[%.*s]\n", RES_NAMES(db_res)[col], col,
 				RES_NAMES(db_res)[col]->len, RES_NAMES(db_res)[col]->s);
 	}
+	/* TODO  if all columns asked - take from table schema */
 	seckey_len = tbc->seckey_len;
 	dbcassa_lock_release(tbc);
 
@@ -636,16 +675,11 @@ int db_cassa_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op,
 	}
 
 	/* Initialize the row_slices vector for the case with one column and no secondary key */
-	row_slices[0][0]= cassa_result->size();
-	row_slices[0][1]= 0;
-
-	if(seckey_len) { /* if the table has a secondary key defined */
-/* if the secondary key was computed */
-/*		if (no_sec_kc == seckey_len) { 
-			rows_no = 1;
-			row_slices[0][1] = strlen(sec_key) + 1;
-		}
-		else */ {
+	if(rows_no == 1) {
+		row_slices[0][0]= cassa_result->size();
+		row_slices[0][1]= 0;
+
+		if(seckey_len) { /* if the table has a secondary key defined */
 			/* pass through the result once to see how many rows there are */
 			rows_no = cassa_result_separate_rows(*cassa_result);
 			if(rows_no < 0) {
@@ -654,8 +688,6 @@ int db_cassa_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op,
 			}
 		}
 	}
-	else
-		rows_no = 1;
 
 	RES_ROW_N(db_res) = rows_no;
 
diff --git a/modules/db_cassandra/doc/db_cassandra_admin.xml b/modules/db_cassandra/doc/db_cassandra_admin.xml
index f6566b8..1a59500 100644
--- a/modules/db_cassandra/doc/db_cassandra_admin.xml
+++ b/modules/db_cassandra/doc/db_cassandra_admin.xml
@@ -40,7 +40,7 @@
 		servers, with more proxies and registration servers accessing the same location
 		database. This was actually the main use case we had in mind when implementing
 		this module. Please NOTE that it has only been tested with the
-		<emphasis>usrloc</emphasis> and <emphasis>auth_db</emphasis> modules.
+		<emphasis>usrloc</emphasis>, <emphasis>auth_db</emphasis> and <emphasis>domain</emphasis> modules.
 	</para>
 	<para>
 		You can find a configuration file example for this usage in the module - kamailio_cassa.cfg.




More information about the sr-dev mailing list