diff --git a/src/include/switch_core.h b/src/include/switch_core.h index 153e582037..3f9eddb584 100644 --- a/src/include/switch_core.h +++ b/src/include/switch_core.h @@ -1987,6 +1987,7 @@ typedef struct { switch_cache_db_native_handle_t native_handle; time_t last_used; switch_mutex_t *mutex; + switch_mutex_t *io_mutex; switch_memory_pool_t *pool; int32_t flags; unsigned long hash; diff --git a/src/switch_core_db.c b/src/switch_core_db.c index d6e3f08bf3..893626b8e1 100644 --- a/src/switch_core_db.c +++ b/src/switch_core_db.c @@ -86,7 +86,7 @@ SWITCH_DECLARE(const char *) switch_core_db_errmsg(switch_core_db_t *db) SWITCH_DECLARE(int) switch_core_db_exec(switch_core_db_t *db, const char *sql, switch_core_db_callback_func_t callback, void *data, char **errmsg) { int ret = 0; - int sane = 100; + int sane = 300; char *err = NULL; while (--sane > 0) { diff --git a/src/switch_core_sqldb.c b/src/switch_core_sqldb.c index 6c746006fb..3593916979 100644 --- a/src/switch_core_sqldb.c +++ b/src/switch_core_sqldb.c @@ -35,28 +35,6 @@ #include #include "private/switch_core_pvt.h" - -#define SWITCH_CORE_DB "core" -/*! - \brief Open the default system database -*/ -SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line) -{ - switch_cache_db_connection_options_t options = { {0} }; - - if (runtime.odbc_dsn && runtime.odbc_user && runtime.odbc_pass) { - options.odbc_options.dsn = runtime.odbc_dsn; - options.odbc_options.user = runtime.odbc_user; - options.odbc_options.pass = runtime.odbc_pass; - - return _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line); - } else { - options.core_db_options.db_path = SWITCH_CORE_DB; - return _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_CORE_DB, &options, file, func, line); - } -} - - static struct { switch_cache_db_handle_t *event_db; switch_queue_t *sql_queue[2]; @@ -65,10 +43,39 @@ static struct { switch_thread_t *thread; int thread_running; switch_bool_t manage; + switch_mutex_t *io_mutex; + switch_mutex_t *dbh_mutex; + switch_hash_t *dbh_hash; } sql_manager; -static switch_mutex_t *dbh_mutex = NULL; -static switch_hash_t *dbh_hash = NULL; + +#define SWITCH_CORE_DB "core" +/*! + \brief Open the default system database +*/ +SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line) +{ + switch_cache_db_connection_options_t options = { {0} }; + switch_status_t r; + + if (runtime.odbc_dsn && runtime.odbc_user && runtime.odbc_pass) { + options.odbc_options.dsn = runtime.odbc_dsn; + options.odbc_options.user = runtime.odbc_user; + options.odbc_options.pass = runtime.odbc_pass; + + r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line); + } else { + options.core_db_options.db_path = SWITCH_CORE_DB; + r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_CORE_DB, &options, file, func, line); + } + + if (r == SWITCH_STATUS_SUCCESS && !(*dbh)->io_mutex) { + (*dbh)->io_mutex = sql_manager.io_mutex; + } + + return r; +} + #define SQL_CACHE_TIMEOUT 300 @@ -81,11 +88,11 @@ static void sql_close(time_t prune) int locked = 0; char *key; - switch_mutex_lock(dbh_mutex); + switch_mutex_lock(sql_manager.dbh_mutex); top: locked = 0; - for (hi = switch_hash_first(NULL, dbh_hash); hi; hi = switch_hash_next(hi)) { + for (hi = switch_hash_first(NULL, sql_manager.dbh_hash); hi; hi = switch_hash_next(hi)) { switch_hash_this(hi, &var, NULL, &val); key = (char *) var; @@ -117,7 +124,7 @@ static void sql_close(time_t prune) break; } - switch_core_hash_delete(dbh_hash, key); + switch_core_hash_delete(sql_manager.dbh_hash, key); switch_mutex_unlock(dbh->mutex); switch_core_destroy_memory_pool(&dbh->pool); goto top; @@ -133,7 +140,7 @@ static void sql_close(time_t prune) goto top; } - switch_mutex_unlock(dbh_mutex); + switch_mutex_unlock(sql_manager.dbh_mutex); } @@ -149,7 +156,7 @@ SWITCH_DECLARE(void) switch_cache_db_release_db_handle(switch_cache_db_handle_t SWITCH_DECLARE(void) switch_cache_db_destroy_db_handle(switch_cache_db_handle_t **dbh) { if (dbh && *dbh) { - switch_mutex_lock(dbh_mutex); + switch_mutex_lock(sql_manager.dbh_mutex); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Deleting DB connection %s\n", (*dbh)->name); switch ((*dbh)->type) { @@ -167,11 +174,11 @@ SWITCH_DECLARE(void) switch_cache_db_destroy_db_handle(switch_cache_db_handle_t } - switch_core_hash_delete(dbh_hash, (*dbh)->name); + switch_core_hash_delete(sql_manager.dbh_hash, (*dbh)->name); switch_mutex_unlock((*dbh)->mutex); switch_core_destroy_memory_pool(&(*dbh)->pool); *dbh = NULL; - switch_mutex_unlock(dbh_mutex); + switch_mutex_unlock(sql_manager.dbh_mutex); } } @@ -185,9 +192,9 @@ SWITCH_DECLARE(void) switch_cache_db_detach(void) switch_cache_db_handle_t *dbh = NULL; snprintf(thread_str, sizeof(thread_str) - 1, "%lu", (unsigned long)(intptr_t)switch_thread_self()); - switch_mutex_lock(dbh_mutex); + switch_mutex_lock(sql_manager.dbh_mutex); - for (hi = switch_hash_first(NULL, dbh_hash); hi; hi = switch_hash_next(hi)) { + for (hi = switch_hash_first(NULL, sql_manager.dbh_hash); hi; hi = switch_hash_next(hi)) { switch_hash_this(hi, &var, NULL, &val); key = (char *) var; if ((dbh = (switch_cache_db_handle_t *) val)) { @@ -202,7 +209,7 @@ SWITCH_DECLARE(void) switch_cache_db_detach(void) } } - switch_mutex_unlock(dbh_mutex); + switch_mutex_unlock(sql_manager.dbh_mutex); } SWITCH_DECLARE(switch_status_t)_switch_cache_db_get_db_handle(switch_cache_db_handle_t **dbh, @@ -246,8 +253,8 @@ SWITCH_DECLARE(switch_status_t)_switch_cache_db_get_db_handle(switch_cache_db_ha snprintf(thread_str, sizeof(thread_str) - 1, "%s;thread=\"%lu\"", db_str, (unsigned long)(intptr_t)self); snprintf(db_callsite_str, sizeof(db_callsite_str) - 1, "%s:%d", file, line); - switch_mutex_lock(dbh_mutex); - if ((new_dbh = switch_core_hash_find(dbh_hash, thread_str))) { + switch_mutex_lock(sql_manager.dbh_mutex); + if ((new_dbh = switch_core_hash_find(sql_manager.dbh_hash, thread_str))) { switch_set_string(new_dbh->last_user, db_callsite_str); switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_DEBUG10, "Reuse Cached DB handle %s [%s]\n", thread_str, switch_cache_db_type_name(new_dbh->type)); @@ -260,7 +267,7 @@ SWITCH_DECLARE(switch_status_t)_switch_cache_db_get_db_handle(switch_cache_db_ha hash = switch_ci_hashfunc_default(db_str, &hlen); - for (hi = switch_hash_first(NULL, dbh_hash); hi; hi = switch_hash_next(hi)) { + for (hi = switch_hash_first(NULL, sql_manager.dbh_hash); hi; hi = switch_hash_next(hi)) { switch_hash_this(hi, &var, NULL, &val); key = (char *) var; @@ -337,14 +344,14 @@ SWITCH_DECLARE(switch_status_t)_switch_cache_db_get_db_handle(switch_cache_db_ha switch_set_string(new_dbh->creator, db_callsite_str); switch_mutex_lock(new_dbh->mutex); - switch_core_hash_insert(dbh_hash, new_dbh->name, new_dbh); + switch_core_hash_insert(sql_manager.dbh_hash, new_dbh->name, new_dbh); } end: if (new_dbh) new_dbh->last_used = switch_epoch_time_now(NULL); - switch_mutex_unlock(dbh_mutex); + switch_mutex_unlock(sql_manager.dbh_mutex); *dbh = new_dbh; @@ -357,6 +364,10 @@ static switch_status_t switch_cache_db_execute_sql_real(switch_cache_db_handle_t switch_status_t status = SWITCH_STATUS_FALSE; char *errmsg = NULL; + if (dbh->io_mutex) { + switch_mutex_lock(dbh->io_mutex); + } + if (err) *err = NULL; switch (dbh->type) { @@ -386,6 +397,11 @@ static switch_status_t switch_cache_db_execute_sql_real(switch_cache_db_handle_t free(errmsg); } } + + + if (dbh->io_mutex) { + switch_mutex_unlock(dbh->io_mutex); + } return status; } @@ -456,6 +472,9 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql(switch_cache_db_hand { switch_status_t status = SWITCH_STATUS_FALSE; + if (dbh->io_mutex) { + switch_mutex_lock(dbh->io_mutex); + } switch (dbh->type) { default: @@ -465,6 +484,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql(switch_cache_db_hand break; } + if (dbh->io_mutex) { + switch_mutex_unlock(dbh->io_mutex); + } + return status; } @@ -475,6 +498,10 @@ SWITCH_DECLARE(char *) switch_cache_db_execute_sql2str(switch_cache_db_handle_t switch_status_t status = SWITCH_STATUS_FALSE; + if (dbh->io_mutex) { + switch_mutex_lock(dbh->io_mutex); + } + switch (dbh->type) { case SCDB_TYPE_CORE_DB: { @@ -522,6 +549,10 @@ SWITCH_DECLARE(char *) switch_cache_db_execute_sql2str(switch_cache_db_handle_t end: + if (dbh->io_mutex) { + switch_mutex_unlock(dbh->io_mutex); + } + return status == SWITCH_STATUS_SUCCESS ? str : NULL; } @@ -537,6 +568,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute(switch_cache_ retries = 1000; } + if (dbh->io_mutex) { + switch_mutex_lock(dbh->io_mutex); + } + while (retries > 0) { switch_cache_db_execute_sql_real(dbh, sql, &errmsg); if (errmsg) { @@ -554,6 +589,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute(switch_cache_ } } + if (dbh->io_mutex) { + switch_mutex_unlock(dbh->io_mutex); + } + return status; } @@ -571,6 +610,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute_trans(switch_ retries = 1000; } + if (dbh->io_mutex) { + switch_mutex_lock(dbh->io_mutex); + } + again: while (begin_retries > 0) { @@ -629,6 +672,11 @@ done: switch_cache_db_execute_sql_real(dbh, "COMMIT", NULL); + if (dbh->io_mutex) { + switch_mutex_unlock(dbh->io_mutex); + } + + return status; } @@ -641,6 +689,9 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cach if (err) *err = NULL; + if (dbh->io_mutex) { + switch_mutex_lock(dbh->io_mutex); + } switch (dbh->type) { @@ -661,6 +712,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cach break; } + if (dbh->io_mutex) { + switch_mutex_unlock(dbh->io_mutex); + } + return status; } @@ -668,6 +723,11 @@ SWITCH_DECLARE(void) switch_cache_db_test_reactive(switch_cache_db_handle_t *dbh { char *errmsg; + + if (dbh->io_mutex) { + switch_mutex_lock(dbh->io_mutex); + } + switch (dbh->type) { case SCDB_TYPE_ODBC: { @@ -707,17 +767,22 @@ SWITCH_DECLARE(void) switch_cache_db_test_reactive(switch_cache_db_handle_t *dbh } break; } + + + if (dbh->io_mutex) { + switch_mutex_unlock(dbh->io_mutex); + } } -#define SQLLEN 1024 * 64 +#define SQLLEN 1024 * 1024 static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t * thread, void *obj) { void *pop; uint32_t itterations = 0; uint8_t trans = 0, nothing_in_queue = 0; - uint32_t target = 50000; + uint32_t target = 100000; switch_size_t len = 0, sql_len = SQLLEN; char *tmp, *sqlbuf = (char *) malloc(sql_len); char *sql; @@ -1175,8 +1240,10 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_ sql_manager.memory_pool = pool; sql_manager.manage = manage; - switch_mutex_init(&dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool); - switch_core_hash_init(&dbh_hash, sql_manager.memory_pool); + switch_mutex_init(&sql_manager.dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool); + switch_mutex_init(&sql_manager.io_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool); + + switch_core_hash_init(&sql_manager.dbh_hash, sql_manager.memory_pool); top: @@ -1329,7 +1396,7 @@ void switch_core_sqldb_stop(void) sql_close(0); - switch_core_hash_destroy(&dbh_hash); + switch_core_hash_destroy(&sql_manager.dbh_hash); } @@ -1347,9 +1414,9 @@ SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream) char *pos1 = NULL; char *pos2 = NULL; - switch_mutex_lock(dbh_mutex); + switch_mutex_lock(sql_manager.dbh_mutex); - for (hi = switch_hash_first(NULL, dbh_hash); hi; hi = switch_hash_next(hi)) { + for (hi = switch_hash_first(NULL, sql_manager.dbh_hash); hi; hi = switch_hash_next(hi)) { switch_hash_this(hi, &var, NULL, &val); key = (char *) var; @@ -1385,7 +1452,7 @@ SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream) dbh->last_user); } } - switch_mutex_unlock(dbh_mutex); + switch_mutex_unlock(sql_manager.dbh_mutex); } /* For Emacs: