storage: rework storage to use GAsyncQueue

This commit is contained in:
Kaian 2018-07-29 12:07:07 +02:00
parent b34f321cd1
commit 2087fa4cc2
6 changed files with 144 additions and 107 deletions

View File

@ -5,7 +5,7 @@ project(sngrep
set(PROJECT_NAME sngrep)
set(CMAKE_C_STANDARD 99)
add_compile_options(-Werror)
add_compile_options(-Werror -pg)
#add_compile_options(-Werror -Wall -pedantic -Wextra)
configure_file(

View File

@ -258,7 +258,7 @@ call_list_draw_header(ui_t *ui)
wattron(ui->win, A_REVERSE);
// Get configured sorting options
SStorageSortOpts sort = storage_sort_options();
StorageSortOpts sort = storage_sort_options();
// Draw columns titles
wattron(ui->win, A_BOLD | COLOR_PAIR(CP_DEF_ON_CYAN));
@ -374,7 +374,7 @@ call_list_draw_list(ui_t *ui)
// If autoscroll is enabled, select the last dialog
if (info->autoscroll) {
SStorageSortOpts sort = storage_sort_options();
StorageSortOpts sort = storage_sort_options();
if (sort.asc) {
call_list_move(ui, g_sequence_get_length(info->dcalls) - 1);
} else {
@ -568,7 +568,7 @@ call_list_handle_key(ui_t *ui, int key)
SipCallGroup *group;
int action = -1;
SipCall *call;
SStorageSortOpts sort;
StorageSortOpts sort;
// Sanity check, this should not happen
if (!(info = call_list_info(ui)))
@ -834,7 +834,7 @@ call_list_handle_menu_key(ui_t *ui, int key)
MENU *menu;
int i;
int action = -1;
SStorageSortOpts sort;
StorageSortOpts sort;
enum sip_attr_id id;
// Get panel information

View File

@ -81,9 +81,9 @@ main(int argc, char* argv[])
gchar *output_file = NULL;
gchar *config_file = NULL;
gchar *keyfile = NULL;
SStorageSortOpts storage_sopts = {};
SStorageMatchOpts storage_mopts = {};
SStorageCaptureOpts storage_copts = {};
StorageSortOpts storage_sopts = {};
StorageMatchOpts storage_mopts = {};
StorageCaptureOpts storage_copts = {};
CaptureManager *manager;
CaptureInput *input;
CaptureOutput *output;

View File

@ -301,7 +301,7 @@ packet_sip_parse(PacketParser *parser, Packet *packet, GByteArray *data)
packet_parser_next_dissector(parser, packet, data);
// Add data to storage
storage_check_sip_packet(packet);
storage_add_packet(packet);
return NULL;
}

View File

@ -39,56 +39,60 @@
* All parsed calls will be added to this list, only accesible from
* this awesome structure, so, keep it thread-safe.
*/
sip_call_list_t calls =
{0};
Storage storage = { };
gboolean
storage_init(SStorageCaptureOpts capture_options,
SStorageMatchOpts match_options,
SStorageSortOpts sort_options,
storage_init(StorageCaptureOpts capture_options,
StorageMatchOpts match_options,
StorageSortOpts sort_options,
GError **error)
{
GRegexCompileFlags cflags = G_REGEX_EXTENDED;
GRegexMatchFlags mflags = G_REGEX_MATCH_NEWLINE_CRLF;
calls.capture = capture_options;
calls.match = match_options;
calls.sort = sort_options;
storage.capture = capture_options;
storage.match = match_options;
storage.sort = sort_options;
// Store capture limit
calls.last_index = 0;
storage.last_index = 0;
// Validate match expression
if (calls.match.mexpr) {
if (storage.match.mexpr) {
// Case insensitive requested
if (calls.match.micase) {
if (storage.match.micase) {
cflags |= G_REGEX_CASELESS;
}
// Check the expresion is a compilable regexp
calls.match.mregex = g_regex_new(calls.match.mexpr, cflags, 0, error);
if (calls.match.mregex == NULL) {
storage.match.mregex = g_regex_new(storage.match.mexpr, cflags, mflags, error);
if (storage.match.mregex == NULL) {
return FALSE;
}
}
// Initialize storage packet queue
storage.pkt_queue = g_async_queue_new();
// Create a vector to store calls
calls.list = g_sequence_new(call_destroy);
calls.active = g_sequence_new(NULL);
storage.list = g_sequence_new(call_destroy);
storage.active = g_sequence_new(NULL);
// Create hash table for callid search
calls.callids = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, NULL);
storage.callids = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, NULL);
// Set default sorting field
if (sip_attr_from_name(setting_get_value(SETTING_CL_SORTFIELD)) >= 0) {
calls.sort.by = sip_attr_from_name(setting_get_value(SETTING_CL_SORTFIELD));
calls.sort.asc = (!strcmp(setting_get_value(SETTING_CL_SORTORDER), "asc"));
storage.sort.by = sip_attr_from_name(setting_get_value(SETTING_CL_SORTFIELD));
storage.sort.asc = (!strcmp(setting_get_value(SETTING_CL_SORTORDER), "asc"));
} else {
// Fallback to default sorting field
calls.sort.by = SIP_ATTR_CALLINDEX;
calls.sort.asc = true;
storage.sort.by = SIP_ATTR_CALLINDEX;
storage.sort.asc = true;
}
storage.running = TRUE;
storage.thread = g_thread_new(NULL, (GThreadFunc) storage_check_packet, NULL);
return TRUE;
}
@ -96,28 +100,54 @@ storage_init(SStorageCaptureOpts capture_options,
void
storage_deinit()
{
// Stop storage thread
storage.running = FALSE;
g_thread_join(storage.thread);
// Remove all calls
storage_calls_clear();
// Remove Call-id hash table
g_hash_table_destroy(calls.callids);
g_hash_table_destroy(storage.callids);
// Remove calls vector
g_sequence_free(calls.list);
g_sequence_free(calls.active);
g_sequence_free(storage.list);
g_sequence_free(storage.active);
}
SStorageCaptureOpts
//! Start capturing packets function
void storage_check_packet()
{
while (storage.running) {
Packet *packet = g_async_queue_timeout_pop(storage.pkt_queue, 500000);
if (packet) {
if (packet_has_type(packet, PACKET_SIP)) {
storage_check_sip_packet(packet);
} else if (packet_has_type(packet, PACKET_RTP)) {
storage_check_rtp_packet(packet);
}
}
}
}
void
storage_add_packet(Packet *packet)
{
g_async_queue_push(storage.pkt_queue, packet);
}
StorageCaptureOpts
storage_capture_options()
{
return calls.capture;
return storage.capture;
}
gint
storage_sorter(gconstpointer a, gconstpointer b, G_GNUC_UNUSED gpointer user_data)
{
const SipCall *calla = a, *callb = b;
int cmp = call_attr_compare(calla, callb, calls.sort.by);
return (calls.sort.asc) ? cmp : cmp * -1;
int cmp = call_attr_compare(calla, callb, storage.sort.by);
return (storage.sort.asc) ? cmp : cmp * -1;
}
SipMsg *
@ -129,7 +159,7 @@ storage_check_sip_packet(Packet *packet)
PacketSipData *sip_data = g_ptr_array_index(packet->proto, PACKET_SIP);
// Create a new message from this data
// FIXME Create a new message from this data
msg = msg_create();
msg->cseq = sip_data->cseq;
msg->sip_from = sip_data->from;
@ -142,30 +172,30 @@ storage_check_sip_packet(Packet *packet)
// Check if payload matches expression
if (!storage_check_match_expr(sip_data->payload))
goto skip_message;
return NULL;
// User requested only INVITE starting dialogs
if (calls.match.invite && msg->reqresp != SIP_METHOD_INVITE)
goto skip_message;
if (storage.match.invite && sip_data->reqresp != SIP_METHOD_INVITE)
return NULL;
// Only create a new call if the first msg
// is a request message in the following gorup
if (calls.match.complete && msg->reqresp > SIP_METHOD_MESSAGE)
goto skip_message;
if (storage.match.complete && sip_data->reqresp > SIP_METHOD_MESSAGE)
return NULL;
// Rotate call list if limit has been reached
if (calls.capture.limit == storage_calls_count())
if (storage.capture.limit == storage_calls_count())
storage_calls_rotate();
// Create the call if not found
if (!(call = call_create(sip_data->callid, sip_data->xcallid)))
goto skip_message;
return NULL;
// Add this Call-Id to hash table
g_hash_table_insert(calls.callids, call->callid, call);
g_hash_table_insert(storage.callids, call->callid, call);
// Set call index
call->index = ++calls.last_index;
call->index = ++storage.last_index;
// Mark this as a new call
newcall = true;
@ -196,35 +226,28 @@ storage_check_sip_packet(Packet *packet)
// Check if this call should be in active call list
if (call_is_active(call)) {
if (storage_call_is_active(call)) {
g_sequence_append(calls.active, call);
g_sequence_append(storage.active, call);
}
} else {
if (storage_call_is_active(call)) {
g_sequence_remove_data(calls.active, call);
g_sequence_remove_data(storage.active, call);
}
}
}
if (newcall) {
// Append this call to the call list
g_sequence_insert_sorted(calls.list, call, storage_sorter, NULL);
g_sequence_insert_sorted(storage.list, call, storage_sorter, NULL);
}
// Mark the list as changed
calls.changed = true;
storage.changed = true;
// Send this packet to all capture outputs
capture_manager_output_packet(capture_manager(), packet);
// Return the loaded message
return msg;
skip_message:
// Deallocate message memory
msg_destroy(msg);
return NULL;
}
rtp_stream_t *
@ -333,46 +356,46 @@ storage_check_rtp_packet(Packet *packet)
gboolean
storage_calls_changed()
{
gboolean changed = calls.changed;
calls.changed = false;
gboolean changed = storage.changed;
storage.changed = false;
return changed;
}
int
storage_calls_count()
{
return g_sequence_get_length(calls.list);
return g_sequence_get_length(storage.list);
}
GSequenceIter *
storage_calls_iterator()
{
return g_sequence_get_begin_iter(calls.list);
return g_sequence_get_begin_iter(storage.list);
}
gboolean
storage_call_is_active(SipCall *call)
{
return g_sequence_index(calls.active, call) != -1;
return g_sequence_index(storage.active, call) != -1;
}
GSequence *
storage_calls_vector()
{
return calls.list;
return storage.list;
}
GSequence *
storage_active_calls_vector()
{
return calls.active;
return storage.active;
}
sip_stats_t
storage_calls_stats()
{
sip_stats_t stats = {};
GSequenceIter *it = g_sequence_get_begin_iter(calls.list);
GSequenceIter *it = g_sequence_get_begin_iter(storage.list);
// Total number of calls without filtering
stats.total = g_sequence_iter_length(it);
@ -387,7 +410,7 @@ storage_calls_stats()
SipCall *
storage_find_by_callid(const char *callid)
{
return g_hash_table_lookup(calls.callids, callid);
return g_hash_table_lookup(storage.callids, callid);
}
void
@ -441,30 +464,30 @@ void
storage_calls_clear()
{
// Create again the callid hash table
g_hash_table_remove_all(calls.callids);
g_hash_table_remove_all(storage.callids);
// Remove all items from vector
g_sequence_remove_all(calls.list);
g_sequence_remove_all(calls.active);
g_sequence_remove_all(storage.list);
g_sequence_remove_all(storage.active);
}
void
storage_calls_clear_soft()
{
// Create again the callid hash table
g_hash_table_remove_all(calls.callids);
g_hash_table_remove_all(storage.callids);
// Repopulate list applying current filter
calls.list = g_sequence_copy(storage_calls_vector(), filter_check_call, NULL);
calls.active = g_sequence_copy(storage_active_calls_vector(), filter_check_call, NULL);
storage.list = g_sequence_copy(storage_calls_vector(), filter_check_call, NULL);
storage.active = g_sequence_copy(storage_active_calls_vector(), filter_check_call, NULL);
// Repopulate callids based on filtered list
SipCall *call;
GSequenceIter *it = g_sequence_get_begin_iter(calls.list);
GSequenceIter *it = g_sequence_get_begin_iter(storage.list);
for (; !g_sequence_iter_is_end(it); it = g_sequence_iter_next(it)) {
call = g_sequence_get(it);
g_hash_table_insert(calls.callids, call->callid, call);
g_hash_table_insert(storage.callids, call->callid, call);
}
}
@ -472,15 +495,15 @@ void
storage_calls_rotate()
{
SipCall *call;
GSequenceIter *it = g_sequence_get_begin_iter(calls.list);
GSequenceIter *it = g_sequence_get_begin_iter(storage.list);
for (; !g_sequence_iter_is_end(it); it = g_sequence_iter_next(it)) {
call = g_sequence_get(it);
if (!call->locked) {
// Remove from callids hash
g_hash_table_remove(calls.callids, call->callid);
g_hash_table_remove(storage.callids, call->callid);
// Remove first call from active and call lists
g_sequence_remove_data(calls.active, call);
g_sequence_remove_data(calls.list, call);
g_sequence_remove_data(storage.active, call);
g_sequence_remove_data(storage.list, call);
return;
}
}
@ -489,35 +512,35 @@ storage_calls_rotate()
const char *
storage_match_expr()
{
return calls.match.mexpr;
return storage.match.mexpr;
}
int
storage_check_match_expr(const char *payload)
{
// Everything matches when there is no match
if (calls.match.mexpr == NULL)
if (storage.match.mexpr == NULL)
return 1;
// Check if payload matches the given expresion
if (g_regex_match(calls.match.mregex, payload, 0, NULL)) {
return 0 == calls.match.minvert;
if (g_regex_match(storage.match.mregex, payload, 0, NULL)) {
return 0 == storage.match.minvert;
} else {
return 1 == calls.match.minvert;
return 1 == storage.match.minvert;
}
}
void
storage_set_sort_options(SStorageSortOpts sort)
storage_set_sort_options(StorageSortOpts sort)
{
calls.sort = sort;
g_sequence_sort(calls.list, storage_sorter, NULL);
storage.sort = sort;
g_sequence_sort(storage.list, storage_sorter, NULL);
}
SStorageSortOpts
StorageSortOpts
storage_sort_options()
{
return calls.sort;
return storage.sort;
}

View File

@ -36,16 +36,16 @@
#define MAX_SIP_PAYLOAD 10240
//! Shorter declaration of sip_call_list structure
typedef struct sip_call_list sip_call_list_t;
typedef struct _Storage Storage;
//! Shorter declaration of sip stats
typedef struct sip_stats sip_stats_t;
//! Shorter declaration of structs
typedef struct _SStorageSortOpts SStorageSortOpts;
typedef struct _SStorageMatchOpts SStorageMatchOpts;
typedef struct _SStorageCaptureOpts SStorageCaptureOpts;
typedef struct _StorageSortOpts StorageSortOpts;
typedef struct _StorageMatchOpts StorageMatchOpts;
typedef struct _StorageCaptureOpts StorageCaptureOpts;
struct _SStorageSortOpts
struct _StorageSortOpts
{
//! Sort call list by this attribute
enum sip_attr_id by;
@ -53,7 +53,7 @@ struct _SStorageSortOpts
gboolean asc;
};
struct _SStorageMatchOpts
struct _StorageMatchOpts
{
//! Only store dialogs starting with INVITE
gboolean invite;
@ -69,7 +69,7 @@ struct _SStorageMatchOpts
GRegex *mregex;
};
struct _SStorageCaptureOpts
struct _StorageCaptureOpts
{
//! Max number of calls in the list
guint limit;
@ -97,14 +97,14 @@ struct sip_stats
*
* This structure acts as header of calls list
*/
struct sip_call_list
struct _Storage
{
// Matching options
struct _SStorageMatchOpts match;
// Capture options
struct _SStorageCaptureOpts capture;
//! Matching options
StorageMatchOpts match;
//! Capture options
StorageCaptureOpts capture;
//! Sort call list following this options
struct _SStorageSortOpts sort;
StorageSortOpts sort;
//! List of all captured calls
GSequence *list;
//! List of active captured calls
@ -115,6 +115,12 @@ struct sip_call_list
int last_index;
//! Call-Ids hash table
GHashTable *callids;
//! Pending packets to be parsed queue
GAsyncQueue *pkt_queue;
//! Storage thread
GThread *thread;
//! Running thread flag
gboolean running;
};
/**
@ -122,9 +128,9 @@ struct sip_call_list
*
*/
gboolean
storage_init(SStorageCaptureOpts capture_options,
SStorageMatchOpts match_options,
SStorageSortOpts sort_options,
storage_init(StorageCaptureOpts capture_options,
StorageMatchOpts match_options,
StorageSortOpts sort_options,
GError **error);
/**
@ -133,9 +139,17 @@ storage_init(SStorageCaptureOpts capture_options,
void
storage_deinit();
SStorageCaptureOpts
void
storage_check_packet();
void
storage_add_packet(Packet *packet);
StorageCaptureOpts
storage_capture_options();
/**
* @brief Loads a new message from raw header/payload
*
@ -278,9 +292,9 @@ int
storage_check_match_expr(const char *payload);
void
storage_set_sort_options(SStorageSortOpts sort);
storage_set_sort_options(StorageSortOpts sort);
SStorageSortOpts
StorageSortOpts
storage_sort_options();
#endif