From 2087fa4cc2a3297a9645f853273e03bc5cb371ad Mon Sep 17 00:00:00 2001 From: Kaian Date: Sun, 29 Jul 2018 12:07:07 +0200 Subject: [PATCH] storage: rework storage to use GAsyncQueue --- CMakeLists.txt | 2 +- src/curses/screens/ui_call_list.c | 8 +- src/main.c | 6 +- src/packet/dissectors/packet_sip.c | 2 +- src/storage.c | 181 ++++++++++++++++------------- src/storage.h | 52 ++++++--- 6 files changed, 144 insertions(+), 107 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 08eec1e..4027a10 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,7 +5,7 @@ project(sngrep set(PROJECT_NAME sngrep) set(CMAKE_C_STANDARD 99) -add_compile_options(-Werror) +add_compile_options(-Werror -pg) #add_compile_options(-Werror -Wall -pedantic -Wextra) configure_file( diff --git a/src/curses/screens/ui_call_list.c b/src/curses/screens/ui_call_list.c index aaeb437..ce42afe 100644 --- a/src/curses/screens/ui_call_list.c +++ b/src/curses/screens/ui_call_list.c @@ -258,7 +258,7 @@ call_list_draw_header(ui_t *ui) wattron(ui->win, A_REVERSE); // Get configured sorting options - SStorageSortOpts sort = storage_sort_options(); + StorageSortOpts sort = storage_sort_options(); // Draw columns titles wattron(ui->win, A_BOLD | COLOR_PAIR(CP_DEF_ON_CYAN)); @@ -374,7 +374,7 @@ call_list_draw_list(ui_t *ui) // If autoscroll is enabled, select the last dialog if (info->autoscroll) { - SStorageSortOpts sort = storage_sort_options(); + StorageSortOpts sort = storage_sort_options(); if (sort.asc) { call_list_move(ui, g_sequence_get_length(info->dcalls) - 1); } else { @@ -568,7 +568,7 @@ call_list_handle_key(ui_t *ui, int key) SipCallGroup *group; int action = -1; SipCall *call; - SStorageSortOpts sort; + StorageSortOpts sort; // Sanity check, this should not happen if (!(info = call_list_info(ui))) @@ -834,7 +834,7 @@ call_list_handle_menu_key(ui_t *ui, int key) MENU *menu; int i; int action = -1; - SStorageSortOpts sort; + StorageSortOpts sort; enum sip_attr_id id; // Get panel information diff --git a/src/main.c b/src/main.c index 3c8816e..f0938f8 100644 --- a/src/main.c +++ b/src/main.c @@ -81,9 +81,9 @@ main(int argc, char* argv[]) gchar *output_file = NULL; gchar *config_file = NULL; gchar *keyfile = NULL; - SStorageSortOpts storage_sopts = {}; - SStorageMatchOpts storage_mopts = {}; - SStorageCaptureOpts storage_copts = {}; + StorageSortOpts storage_sopts = {}; + StorageMatchOpts storage_mopts = {}; + StorageCaptureOpts storage_copts = {}; CaptureManager *manager; CaptureInput *input; CaptureOutput *output; diff --git a/src/packet/dissectors/packet_sip.c b/src/packet/dissectors/packet_sip.c index c349951..62a6ef5 100644 --- a/src/packet/dissectors/packet_sip.c +++ b/src/packet/dissectors/packet_sip.c @@ -301,7 +301,7 @@ packet_sip_parse(PacketParser *parser, Packet *packet, GByteArray *data) packet_parser_next_dissector(parser, packet, data); // Add data to storage - storage_check_sip_packet(packet); + storage_add_packet(packet); return NULL; } diff --git a/src/storage.c b/src/storage.c index b89cd5f..fed4413 100644 --- a/src/storage.c +++ b/src/storage.c @@ -39,56 +39,60 @@ * All parsed calls will be added to this list, only accesible from * this awesome structure, so, keep it thread-safe. */ -sip_call_list_t calls = - {0}; +Storage storage = { }; gboolean -storage_init(SStorageCaptureOpts capture_options, - SStorageMatchOpts match_options, - SStorageSortOpts sort_options, +storage_init(StorageCaptureOpts capture_options, + StorageMatchOpts match_options, + StorageSortOpts sort_options, GError **error) { GRegexCompileFlags cflags = G_REGEX_EXTENDED; GRegexMatchFlags mflags = G_REGEX_MATCH_NEWLINE_CRLF; - calls.capture = capture_options; - calls.match = match_options; - calls.sort = sort_options; + storage.capture = capture_options; + storage.match = match_options; + storage.sort = sort_options; // Store capture limit - calls.last_index = 0; + storage.last_index = 0; // Validate match expression - if (calls.match.mexpr) { + if (storage.match.mexpr) { // Case insensitive requested - if (calls.match.micase) { + if (storage.match.micase) { cflags |= G_REGEX_CASELESS; } // Check the expresion is a compilable regexp - calls.match.mregex = g_regex_new(calls.match.mexpr, cflags, 0, error); - if (calls.match.mregex == NULL) { + storage.match.mregex = g_regex_new(storage.match.mexpr, cflags, mflags, error); + if (storage.match.mregex == NULL) { return FALSE; } } + // Initialize storage packet queue + storage.pkt_queue = g_async_queue_new(); + // Create a vector to store calls - calls.list = g_sequence_new(call_destroy); - calls.active = g_sequence_new(NULL); + storage.list = g_sequence_new(call_destroy); + storage.active = g_sequence_new(NULL); // Create hash table for callid search - calls.callids = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, NULL); + storage.callids = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, NULL); // Set default sorting field if (sip_attr_from_name(setting_get_value(SETTING_CL_SORTFIELD)) >= 0) { - calls.sort.by = sip_attr_from_name(setting_get_value(SETTING_CL_SORTFIELD)); - calls.sort.asc = (!strcmp(setting_get_value(SETTING_CL_SORTORDER), "asc")); + storage.sort.by = sip_attr_from_name(setting_get_value(SETTING_CL_SORTFIELD)); + storage.sort.asc = (!strcmp(setting_get_value(SETTING_CL_SORTORDER), "asc")); } else { // Fallback to default sorting field - calls.sort.by = SIP_ATTR_CALLINDEX; - calls.sort.asc = true; + storage.sort.by = SIP_ATTR_CALLINDEX; + storage.sort.asc = true; } + storage.running = TRUE; + storage.thread = g_thread_new(NULL, (GThreadFunc) storage_check_packet, NULL); return TRUE; } @@ -96,28 +100,54 @@ storage_init(SStorageCaptureOpts capture_options, void storage_deinit() { + // Stop storage thread + storage.running = FALSE; + g_thread_join(storage.thread); + // Remove all calls storage_calls_clear(); // Remove Call-id hash table - g_hash_table_destroy(calls.callids); + g_hash_table_destroy(storage.callids); // Remove calls vector - g_sequence_free(calls.list); - g_sequence_free(calls.active); + g_sequence_free(storage.list); + g_sequence_free(storage.active); } -SStorageCaptureOpts +//! Start capturing packets function +void storage_check_packet() +{ + while (storage.running) { + + Packet *packet = g_async_queue_timeout_pop(storage.pkt_queue, 500000); + if (packet) { + if (packet_has_type(packet, PACKET_SIP)) { + storage_check_sip_packet(packet); + } else if (packet_has_type(packet, PACKET_RTP)) { + storage_check_rtp_packet(packet); + } + } + } +} + +void +storage_add_packet(Packet *packet) +{ + g_async_queue_push(storage.pkt_queue, packet); +} + +StorageCaptureOpts storage_capture_options() { - return calls.capture; + return storage.capture; } gint storage_sorter(gconstpointer a, gconstpointer b, G_GNUC_UNUSED gpointer user_data) { const SipCall *calla = a, *callb = b; - int cmp = call_attr_compare(calla, callb, calls.sort.by); - return (calls.sort.asc) ? cmp : cmp * -1; + int cmp = call_attr_compare(calla, callb, storage.sort.by); + return (storage.sort.asc) ? cmp : cmp * -1; } SipMsg * @@ -129,7 +159,7 @@ storage_check_sip_packet(Packet *packet) PacketSipData *sip_data = g_ptr_array_index(packet->proto, PACKET_SIP); - // Create a new message from this data + // FIXME Create a new message from this data msg = msg_create(); msg->cseq = sip_data->cseq; msg->sip_from = sip_data->from; @@ -142,30 +172,30 @@ storage_check_sip_packet(Packet *packet) // Check if payload matches expression if (!storage_check_match_expr(sip_data->payload)) - goto skip_message; + return NULL; // User requested only INVITE starting dialogs - if (calls.match.invite && msg->reqresp != SIP_METHOD_INVITE) - goto skip_message; + if (storage.match.invite && sip_data->reqresp != SIP_METHOD_INVITE) + return NULL; // Only create a new call if the first msg // is a request message in the following gorup - if (calls.match.complete && msg->reqresp > SIP_METHOD_MESSAGE) - goto skip_message; + if (storage.match.complete && sip_data->reqresp > SIP_METHOD_MESSAGE) + return NULL; // Rotate call list if limit has been reached - if (calls.capture.limit == storage_calls_count()) + if (storage.capture.limit == storage_calls_count()) storage_calls_rotate(); // Create the call if not found if (!(call = call_create(sip_data->callid, sip_data->xcallid))) - goto skip_message; + return NULL; // Add this Call-Id to hash table - g_hash_table_insert(calls.callids, call->callid, call); + g_hash_table_insert(storage.callids, call->callid, call); // Set call index - call->index = ++calls.last_index; + call->index = ++storage.last_index; // Mark this as a new call newcall = true; @@ -196,35 +226,28 @@ storage_check_sip_packet(Packet *packet) // Check if this call should be in active call list if (call_is_active(call)) { if (storage_call_is_active(call)) { - g_sequence_append(calls.active, call); + g_sequence_append(storage.active, call); } } else { if (storage_call_is_active(call)) { - g_sequence_remove_data(calls.active, call); + g_sequence_remove_data(storage.active, call); } } } if (newcall) { // Append this call to the call list - g_sequence_insert_sorted(calls.list, call, storage_sorter, NULL); + g_sequence_insert_sorted(storage.list, call, storage_sorter, NULL); } // Mark the list as changed - calls.changed = true; + storage.changed = true; // Send this packet to all capture outputs capture_manager_output_packet(capture_manager(), packet); - // Return the loaded message return msg; - -skip_message: - // Deallocate message memory - msg_destroy(msg); - return NULL; - } rtp_stream_t * @@ -333,46 +356,46 @@ storage_check_rtp_packet(Packet *packet) gboolean storage_calls_changed() { - gboolean changed = calls.changed; - calls.changed = false; + gboolean changed = storage.changed; + storage.changed = false; return changed; } int storage_calls_count() { - return g_sequence_get_length(calls.list); + return g_sequence_get_length(storage.list); } GSequenceIter * storage_calls_iterator() { - return g_sequence_get_begin_iter(calls.list); + return g_sequence_get_begin_iter(storage.list); } gboolean storage_call_is_active(SipCall *call) { - return g_sequence_index(calls.active, call) != -1; + return g_sequence_index(storage.active, call) != -1; } GSequence * storage_calls_vector() { - return calls.list; + return storage.list; } GSequence * storage_active_calls_vector() { - return calls.active; + return storage.active; } sip_stats_t storage_calls_stats() { sip_stats_t stats = {}; - GSequenceIter *it = g_sequence_get_begin_iter(calls.list); + GSequenceIter *it = g_sequence_get_begin_iter(storage.list); // Total number of calls without filtering stats.total = g_sequence_iter_length(it); @@ -387,7 +410,7 @@ storage_calls_stats() SipCall * storage_find_by_callid(const char *callid) { - return g_hash_table_lookup(calls.callids, callid); + return g_hash_table_lookup(storage.callids, callid); } void @@ -441,30 +464,30 @@ void storage_calls_clear() { // Create again the callid hash table - g_hash_table_remove_all(calls.callids); + g_hash_table_remove_all(storage.callids); // Remove all items from vector - g_sequence_remove_all(calls.list); - g_sequence_remove_all(calls.active); + g_sequence_remove_all(storage.list); + g_sequence_remove_all(storage.active); } void storage_calls_clear_soft() { // Create again the callid hash table - g_hash_table_remove_all(calls.callids); + g_hash_table_remove_all(storage.callids); // Repopulate list applying current filter - calls.list = g_sequence_copy(storage_calls_vector(), filter_check_call, NULL); - calls.active = g_sequence_copy(storage_active_calls_vector(), filter_check_call, NULL); + storage.list = g_sequence_copy(storage_calls_vector(), filter_check_call, NULL); + storage.active = g_sequence_copy(storage_active_calls_vector(), filter_check_call, NULL); // Repopulate callids based on filtered list SipCall *call; - GSequenceIter *it = g_sequence_get_begin_iter(calls.list); + GSequenceIter *it = g_sequence_get_begin_iter(storage.list); for (; !g_sequence_iter_is_end(it); it = g_sequence_iter_next(it)) { call = g_sequence_get(it); - g_hash_table_insert(calls.callids, call->callid, call); + g_hash_table_insert(storage.callids, call->callid, call); } } @@ -472,15 +495,15 @@ void storage_calls_rotate() { SipCall *call; - GSequenceIter *it = g_sequence_get_begin_iter(calls.list); + GSequenceIter *it = g_sequence_get_begin_iter(storage.list); for (; !g_sequence_iter_is_end(it); it = g_sequence_iter_next(it)) { call = g_sequence_get(it); if (!call->locked) { // Remove from callids hash - g_hash_table_remove(calls.callids, call->callid); + g_hash_table_remove(storage.callids, call->callid); // Remove first call from active and call lists - g_sequence_remove_data(calls.active, call); - g_sequence_remove_data(calls.list, call); + g_sequence_remove_data(storage.active, call); + g_sequence_remove_data(storage.list, call); return; } } @@ -489,35 +512,35 @@ storage_calls_rotate() const char * storage_match_expr() { - return calls.match.mexpr; + return storage.match.mexpr; } int storage_check_match_expr(const char *payload) { // Everything matches when there is no match - if (calls.match.mexpr == NULL) + if (storage.match.mexpr == NULL) return 1; // Check if payload matches the given expresion - if (g_regex_match(calls.match.mregex, payload, 0, NULL)) { - return 0 == calls.match.minvert; + if (g_regex_match(storage.match.mregex, payload, 0, NULL)) { + return 0 == storage.match.minvert; } else { - return 1 == calls.match.minvert; + return 1 == storage.match.minvert; } } void -storage_set_sort_options(SStorageSortOpts sort) +storage_set_sort_options(StorageSortOpts sort) { - calls.sort = sort; - g_sequence_sort(calls.list, storage_sorter, NULL); + storage.sort = sort; + g_sequence_sort(storage.list, storage_sorter, NULL); } -SStorageSortOpts +StorageSortOpts storage_sort_options() { - return calls.sort; + return storage.sort; } diff --git a/src/storage.h b/src/storage.h index b4f50c1..8019b2b 100644 --- a/src/storage.h +++ b/src/storage.h @@ -36,16 +36,16 @@ #define MAX_SIP_PAYLOAD 10240 //! Shorter declaration of sip_call_list structure -typedef struct sip_call_list sip_call_list_t; +typedef struct _Storage Storage; //! Shorter declaration of sip stats typedef struct sip_stats sip_stats_t; //! Shorter declaration of structs -typedef struct _SStorageSortOpts SStorageSortOpts; -typedef struct _SStorageMatchOpts SStorageMatchOpts; -typedef struct _SStorageCaptureOpts SStorageCaptureOpts; +typedef struct _StorageSortOpts StorageSortOpts; +typedef struct _StorageMatchOpts StorageMatchOpts; +typedef struct _StorageCaptureOpts StorageCaptureOpts; -struct _SStorageSortOpts +struct _StorageSortOpts { //! Sort call list by this attribute enum sip_attr_id by; @@ -53,7 +53,7 @@ struct _SStorageSortOpts gboolean asc; }; -struct _SStorageMatchOpts +struct _StorageMatchOpts { //! Only store dialogs starting with INVITE gboolean invite; @@ -69,7 +69,7 @@ struct _SStorageMatchOpts GRegex *mregex; }; -struct _SStorageCaptureOpts +struct _StorageCaptureOpts { //! Max number of calls in the list guint limit; @@ -97,14 +97,14 @@ struct sip_stats * * This structure acts as header of calls list */ -struct sip_call_list +struct _Storage { - // Matching options - struct _SStorageMatchOpts match; - // Capture options - struct _SStorageCaptureOpts capture; + //! Matching options + StorageMatchOpts match; + //! Capture options + StorageCaptureOpts capture; //! Sort call list following this options - struct _SStorageSortOpts sort; + StorageSortOpts sort; //! List of all captured calls GSequence *list; //! List of active captured calls @@ -115,6 +115,12 @@ struct sip_call_list int last_index; //! Call-Ids hash table GHashTable *callids; + //! Pending packets to be parsed queue + GAsyncQueue *pkt_queue; + //! Storage thread + GThread *thread; + //! Running thread flag + gboolean running; }; /** @@ -122,9 +128,9 @@ struct sip_call_list * */ gboolean -storage_init(SStorageCaptureOpts capture_options, - SStorageMatchOpts match_options, - SStorageSortOpts sort_options, +storage_init(StorageCaptureOpts capture_options, + StorageMatchOpts match_options, + StorageSortOpts sort_options, GError **error); /** @@ -133,9 +139,17 @@ storage_init(SStorageCaptureOpts capture_options, void storage_deinit(); -SStorageCaptureOpts +void +storage_check_packet(); + +void +storage_add_packet(Packet *packet); + +StorageCaptureOpts storage_capture_options(); + + /** * @brief Loads a new message from raw header/payload * @@ -278,9 +292,9 @@ int storage_check_match_expr(const char *payload); void -storage_set_sort_options(SStorageSortOpts sort); +storage_set_sort_options(StorageSortOpts sort); -SStorageSortOpts +StorageSortOpts storage_sort_options(); #endif