glib: move all threads to GSources for GMainLoop

This commit is contained in:
Kaian 2019-04-06 13:49:52 +02:00
parent 3250582e16
commit 9357b211ea
14 changed files with 492 additions and 359 deletions

View File

@ -54,6 +54,7 @@ set(SOURCES
src/ncurses/dialog.c src/ncurses/dialog.c
src/ncurses/keybinding.c src/ncurses/keybinding.c
src/ncurses/scrollbar.c src/ncurses/scrollbar.c
src/glib/gasyncqueuesource.c
src/filter.c src/filter.c
src/group.c src/group.c
src/main.c src/main.c
@ -111,7 +112,7 @@ pkg_check_modules(CURSES REQUIRED ncursesw menuw panelw formw)
include_directories(${CURSES_INCLUDE_DIRS}) include_directories(${CURSES_INCLUDE_DIRS})
target_link_libraries(sngrep ${CURSES_LIBRARIES}) target_link_libraries(sngrep ${CURSES_LIBRARIES})
pkg_check_modules(GLIB REQUIRED glib-2.0>=2.44) pkg_check_modules(GLIB REQUIRED glib-2.0>=2.44 gobject-2.0>=2.44)
include_directories(${GLIB_INCLUDE_DIRS}) include_directories(${GLIB_INCLUDE_DIRS})
target_link_libraries(sngrep ${GLIB_LIBRARIES}) target_link_libraries(sngrep ${GLIB_LIBRARIES})

View File

@ -8,6 +8,7 @@ Build-Depends: dh-autoreconf,
gnutls-dev, gnutls-dev,
libgcrypt-dev, libgcrypt-dev,
libglib2.0-dev (>= 2.44), libglib2.0-dev (>= 2.44),
gobject2.0-dev,
libpulse-dev, libpulse-dev,
libsndfile1-dev, libsndfile1-dev,
libbcg729-dev (>= 1.0.4), libbcg729-dev (>= 1.0.4),

View File

@ -29,12 +29,39 @@
#include "config.h" #include "config.h"
#include <glib.h> #include <glib.h>
#include "glib/gasyncqueuesource.h"
#include "setting.h" #include "setting.h"
#include "storage.h" #include "storage.h"
#include "capture.h" #include "capture.h"
static CaptureManager *manager; static CaptureManager *manager;
static gboolean
capture_manager_parse_packet(Packet *packet, G_GNUC_UNUSED gpointer user_data)
{
// Initialize parser dissector to first one
PacketParser *parser = packet->parser;
parser->current = parser->dissector_tree;
// Request initial dissector parsing
GByteArray *data = packet->data;
data = packet_parser_next_dissector(parser, packet, data);
// Free not parsed packet data
if (data != NULL) {
g_byte_array_free(data, TRUE);
packet_free(packet);
return TRUE;
}
// Add data to storage
if (storage_check_packet(packet) == NULL) {
packet_free(packet);
}
return TRUE;
}
CaptureManager * CaptureManager *
capture_manager_new() capture_manager_new()
{ {
@ -42,14 +69,20 @@ capture_manager_new()
manager->queue = g_async_queue_new(); manager->queue = g_async_queue_new();
manager->paused = FALSE; manager->paused = FALSE;
manager->running = FALSE;
#ifdef WITH_SSL #ifdef WITH_SSL
// Parse TLS Server setting // Parse TLS Server setting
manager->tlsserver = address_from_str(setting_get_value(SETTING_CAPTURE_TLSSERVER)); manager->tlsserver = address_from_str(setting_get_value(SETTING_CAPTURE_TLSSERVER));
#endif #endif
g_rec_mutex_init(&manager->lock); manager->loop = g_main_loop_new(
g_main_context_new(),
FALSE
);
GSource * source = g_async_queue_source_new(manager->queue, NULL);
g_source_set_callback(source, (GSourceFunc) capture_manager_parse_packet, manager, NULL);
g_source_attach(source, NULL);
return manager; return manager;
} }
@ -74,7 +107,6 @@ capture_manager_free(CaptureManager *manager)
g_slist_free(manager->inputs); g_slist_free(manager->inputs);
g_slist_free(manager->outputs); g_slist_free(manager->outputs);
g_rec_mutex_clear(&manager->lock);
g_async_queue_unref(manager->queue); g_async_queue_unref(manager->queue);
g_free(manager->filter); g_free(manager->filter);
g_free(manager); g_free(manager);
@ -87,60 +119,21 @@ capture_manager()
} }
static gpointer static gpointer
capture_manager_parser_thread(CaptureManager *manager) capture_manager_thread(CaptureManager *manager)
{ {
while (manager->running) { g_main_loop_run(manager->loop);
Packet *packet = g_async_queue_timeout_pop(manager->queue, 500000);
if (packet != NULL) {
// Initialize parser dissector to first one
PacketParser *parser = packet->parser;
parser->current = parser->dissector_tree;
// Request initial dissector parsing
GByteArray *data = packet->data;
data = packet_parser_next_dissector(parser, packet, data);
// Free not parsed packet data
if (data != NULL) {
g_byte_array_free(data, TRUE);
packet_free(packet);
} else {
// Add data to storage
storage_add_packet(packet);
}
}
}
return NULL; return NULL;
} }
void void
capture_manager_start(CaptureManager *manager) capture_manager_start(CaptureManager *manager)
{ {
// Start Parser thread manager->thread = g_thread_new(NULL, (GThreadFunc) capture_manager_thread, manager);
manager->running = TRUE;
manager->thread = g_thread_new(NULL, (GThreadFunc) capture_manager_parser_thread, manager);
// Start all captures threads
for (GSList *le = manager->inputs; le != NULL; le = le->next) {
CaptureInput *input = le->data;
input->running = TRUE;
input->thread = g_thread_new(NULL, (GThreadFunc) input->start, input);
}
} }
void void
capture_manager_stop(CaptureManager *manager) capture_manager_stop(CaptureManager *manager)
{ {
// Stop all capture inputs
for (GSList *le = manager->inputs; le != NULL; le = le->next) {
CaptureInput *input = le->data;
if (input->stop) {
input->stop(input);
}
g_thread_join(input->thread);
}
// Close all capture outputs // Close all capture outputs
for (GSList *le = manager->outputs; le != NULL; le = le->next) { for (GSList *le = manager->outputs; le != NULL; le = le->next) {
CaptureOutput *output = le->data; CaptureOutput *output = le->data;
@ -149,8 +142,8 @@ capture_manager_stop(CaptureManager *manager)
} }
} }
// Stop parser thread // Stop manager thread
manager->running = FALSE; g_main_loop_quit(manager->loop);
g_thread_join(manager->thread); g_thread_join(manager->thread);
} }
@ -189,6 +182,7 @@ void
capture_manager_add_input(CaptureManager *manager, CaptureInput *input) capture_manager_add_input(CaptureManager *manager, CaptureInput *input)
{ {
input->manager = manager; input->manager = manager;
g_source_attach(input->source, g_main_loop_get_context(manager->loop));
manager->inputs = g_slist_append(manager->inputs, input); manager->inputs = g_slist_append(manager->inputs, input);
} }
@ -199,20 +193,6 @@ capture_manager_add_output(CaptureManager *manager, CaptureOutput *output)
manager->outputs = g_slist_append(manager->outputs, output); manager->outputs = g_slist_append(manager->outputs, output);
} }
void
capture_lock(CaptureManager *manager)
{
// Avoid parsing more packet
g_rec_mutex_lock(&manager->lock);
}
void
capture_unlock(CaptureManager *manager)
{
// Allow parsing more packets
g_rec_mutex_unlock(&manager->lock);
}
void void
capture_manager_output_packet(CaptureManager *manager, Packet *packet) capture_manager_output_packet(CaptureManager *manager, Packet *packet)
{ {
@ -224,19 +204,6 @@ capture_manager_output_packet(CaptureManager *manager, Packet *packet)
} }
} }
gboolean
capture_is_running(CaptureManager *manager)
{
// Check if all capture inputs are running
for (GSList *l = manager->inputs; l != NULL; l = l->next) {
CaptureInput *input = l->data;
if (input->running == TRUE) {
return TRUE;
}
}
return FALSE;
}
const gchar * const gchar *
capture_status_desc(CaptureManager *manager) capture_status_desc(CaptureManager *manager)
{ {
@ -247,7 +214,7 @@ capture_status_desc(CaptureManager *manager)
if (input->mode == CAPTURE_MODE_OFFLINE) { if (input->mode == CAPTURE_MODE_OFFLINE) {
offline++; offline++;
if (input->running) { if (!g_source_is_destroyed(input->source)) {
loading++; loading++;
} }
} else { } else {

View File

@ -76,8 +76,6 @@ typedef void (*CaptureOutputFreeFunc)(CaptureOutput *);
*/ */
struct _CaptureManager struct _CaptureManager
{ {
//! Running flag
gboolean running;
//! Key file for TLS decrypt //! Key file for TLS decrypt
const gchar *keyfile; const gchar *keyfile;
//! capture filter expression text //! capture filter expression text
@ -92,10 +90,10 @@ struct _CaptureManager
GSList *outputs; GSList *outputs;
//! Packet waiting to be processed //! Packet waiting to be processed
GAsyncQueue *queue; GAsyncQueue *queue;
//! Capture Lock. Avoid parsing and handling data at the same time //! Packet main loop thread
GRecMutex lock;
//! Packet parser thread
GThread *thread; GThread *thread;
//! Capture Main loop
GMainLoop *loop;
}; };
struct _CaptureInput struct _CaptureInput
@ -107,13 +105,11 @@ struct _CaptureInput
//! Are captured packets life //! Are captured packets life
enum capture_mode mode; enum capture_mode mode;
//! Source string //! Source string
const gchar *source; const gchar *sourcestr;
//! Thread that runs capture callback //! Source of events for this input
GThread *thread; GSource *source;
//! Private capture input data //! Private capture input data
void *priv; gpointer priv;
//! Flag to check if capture is running
gboolean running;
//! Each packet type private data //! Each packet type private data
PacketParser *parser; PacketParser *parser;
@ -136,7 +132,7 @@ struct _CaptureOutput
//! Manager owner of this capture input //! Manager owner of this capture input
CaptureManager *manager; CaptureManager *manager;
//! Private capture output data //! Private capture output data
void *priv; gpointer priv;
//! Dump packet function //! Dump packet function
CaptureOuptutWriteFunc write; CaptureOuptutWriteFunc write;
@ -233,32 +229,12 @@ capture_manager_add_input(CaptureManager *manager, CaptureInput *input);
void void
capture_manager_add_output(CaptureManager *manager, CaptureOutput *output); capture_manager_add_output(CaptureManager *manager, CaptureOutput *output);
/**
* @brief Avoid parsing more packets
*/
void
capture_lock(CaptureManager *manager);
/**
* @brief Allow parsing more packets
*/
void
capture_unlock(CaptureManager *manager);
/** /**
* @brief Store the given packet in call outputs * @brief Store the given packet in call outputs
*/ */
void void
capture_manager_output_packet(CaptureManager *manager, Packet *packet); capture_manager_output_packet(CaptureManager *manager, Packet *packet);
/**
* @brief Determine if any of capture inputs is running
*
* @return TRUE if any capture input is running, FALSE if all are stopped
*/
gboolean
capture_is_running(CaptureManager *manager);
/** /**
* @brief Return a string representing current capture status * @brief Return a string representing current capture status
*/ */

View File

@ -166,7 +166,7 @@ capture_input_hep(const gchar *url, GError **error)
// Create a new structure to handle this capture source // Create a new structure to handle this capture source
input = g_malloc0(sizeof(CaptureInput)); input = g_malloc0(sizeof(CaptureInput));
input->source = g_strdup_printf("L:%s", hep->url.port); input->sourcestr = g_strdup_printf("L:%s", hep->url.port);
input->priv = hep; input->priv = hep;
input->tech = CAPTURE_TECH_HEP; input->tech = CAPTURE_TECH_HEP;
input->mode = CAPTURE_MODE_ONLINE; input->mode = CAPTURE_MODE_ONLINE;

View File

@ -35,6 +35,7 @@
#include <netdb.h> #include <netdb.h>
#include <string.h> #include <string.h>
#include <pcap/sll.h> #include <pcap/sll.h>
#include <glib-unix.h>
#include "glib-extra.h" #include "glib-extra.h"
#include "capture.h" #include "capture.h"
#include "capture_hep.h" #include "capture_hep.h"
@ -60,6 +61,25 @@ capture_input_pcap_free(CaptureInput *input)
g_free(input); g_free(input);
} }
static gboolean
capture_input_pcap_read_packet(G_GNUC_UNUSED gint fd,
G_GNUC_UNUSED GIOCondition condition, CaptureInput *input)
{
// Capture pcap information
CapturePcap *pcap = input->priv;
// Get next packet from this input
struct pcap_pkthdr header;
const guchar *data = pcap_next(pcap->handle, &header);
if (data != NULL) {
// Parse received data
capture_pcap_parse_packet((guchar *) input, &header, data);
}
return data != NULL;
}
CaptureInput * CaptureInput *
capture_input_pcap_online(const gchar *dev, GError **error) capture_input_pcap_online(const gchar *dev, GError **error)
{ {
@ -104,7 +124,7 @@ capture_input_pcap_online(const gchar *dev, GError **error)
// Create a new structure to handle this capture source // Create a new structure to handle this capture source
CaptureInput *input = g_malloc0(sizeof(CaptureInput)); CaptureInput *input = g_malloc0(sizeof(CaptureInput));
input->source = dev; input->sourcestr = dev;
input->priv = pcap; input->priv = pcap;
input->tech = CAPTURE_TECH_PCAP; input->tech = CAPTURE_TECH_PCAP;
input->mode = CAPTURE_MODE_ONLINE; input->mode = CAPTURE_MODE_ONLINE;
@ -118,6 +138,19 @@ capture_input_pcap_online(const gchar *dev, GError **error)
packet_parser_dissector_init(parser, parser->dissector_tree, PACKET_LINK); packet_parser_dissector_init(parser, parser->dissector_tree, PACKET_LINK);
input->parser = parser; input->parser = parser;
// Create GSource for main loop
input->source = g_unix_fd_source_new(
pcap_fileno(pcap->handle),
G_IO_IN | G_IO_ERR | G_IO_HUP
);
g_source_set_callback(
input->source,
(GSourceFunc) capture_input_pcap_read_packet,
input,
(GDestroyNotify) capture_input_pcap_stop
);
return input; return input;
} }
@ -162,7 +195,7 @@ capture_input_pcap_offline(const gchar *infile, GError **error)
// Create a new structure to handle this capture source // Create a new structure to handle this capture source
CaptureInput *input = g_malloc0(sizeof(CaptureInput)); CaptureInput *input = g_malloc0(sizeof(CaptureInput));
input->source = infile; input->sourcestr = infile;
input->priv = pcap; input->priv = pcap;
input->tech = CAPTURE_TECH_PCAP; input->tech = CAPTURE_TECH_PCAP;
input->mode = CAPTURE_MODE_OFFLINE; input->mode = CAPTURE_MODE_OFFLINE;
@ -176,6 +209,19 @@ capture_input_pcap_offline(const gchar *infile, GError **error)
packet_parser_dissector_init(parser, parser->dissector_tree, PACKET_LINK); packet_parser_dissector_init(parser, parser->dissector_tree, PACKET_LINK);
input->parser = parser; input->parser = parser;
// Create GSource for main loop
input->source = g_unix_fd_source_new(
pcap_get_selectable_fd(pcap->handle),
G_IO_IN | G_IO_ERR | G_IO_HUP
);
g_source_set_callback(
input->source,
(GSourceFunc) capture_input_pcap_read_packet,
input,
(GDestroyNotify) capture_input_pcap_stop
);
return input; return input;
} }
@ -188,12 +234,6 @@ capture_input_pcap_start(CaptureInput *input)
// Parse available packets // Parse available packets
pcap_loop(pcap->handle, -1, capture_pcap_parse_packet, (u_char *) input); pcap_loop(pcap->handle, -1, capture_pcap_parse_packet, (u_char *) input);
// Close input file in offline mode
if (input->mode == CAPTURE_MODE_OFFLINE) {
// Mark as finished reading packets
input->running = FALSE;
}
return NULL; return NULL;
} }
@ -209,12 +249,7 @@ capture_input_pcap_stop(CaptureInput *input)
if (input->mode == CAPTURE_MODE_OFFLINE) { if (input->mode == CAPTURE_MODE_OFFLINE) {
pcap_close(pcap->handle); pcap_close(pcap->handle);
} else {
pcap_breakloop(pcap->handle);
} }
// Mark as finished reading packets
input->running = FALSE;
} }
gboolean gboolean
@ -385,7 +420,7 @@ capture_input_pcap_file(CaptureManager *manager)
CaptureInput *input = manager->inputs->data; CaptureInput *input = manager->inputs->data;
if (input->tech == CAPTURE_TECH_PCAP && input->mode == CAPTURE_MODE_OFFLINE) if (input->tech == CAPTURE_TECH_PCAP && input->mode == CAPTURE_MODE_OFFLINE)
return input->source; return input->sourcestr;
return NULL; return NULL;
} }
@ -398,7 +433,7 @@ capture_input_pcap_device(CaptureManager *manager)
CaptureInput *input = manager->inputs->data; CaptureInput *input = manager->inputs->data;
if (input->tech == CAPTURE_TECH_PCAP && input->mode == CAPTURE_MODE_ONLINE) if (input->tech == CAPTURE_TECH_PCAP && input->mode == CAPTURE_MODE_ONLINE)
return input->source; return input->sourcestr;
return NULL; return NULL;
} }

View File

@ -258,10 +258,10 @@ packet_ip_new()
PacketDissector *proto = g_malloc0(sizeof(PacketDissector)); PacketDissector *proto = g_malloc0(sizeof(PacketDissector));
proto->id = PACKET_IP; proto->id = PACKET_IP;
proto->init = packet_ip_init; proto->init = packet_ip_init;
proto->deinit = packet_ip_deinit;
proto->dissect = packet_ip_parse; proto->dissect = packet_ip_parse;
proto->free = packet_ip_free; proto->free = packet_ip_free;
proto->subdissectors = g_slist_append(proto->subdissectors, GUINT_TO_POINTER(PACKET_UDP)); proto->subdissectors = g_slist_append(proto->subdissectors, GUINT_TO_POINTER(PACKET_UDP));
proto->deinit = packet_ip_deinit;
proto->subdissectors = g_slist_append(proto->subdissectors, GUINT_TO_POINTER(PACKET_TCP)); proto->subdissectors = g_slist_append(proto->subdissectors, GUINT_TO_POINTER(PACKET_TCP));
return proto; return proto;
} }

View File

@ -0,0 +1,123 @@
/**************************************************************************
**
** sngrep - SIP Messages flow viewer
**
** Copyright (C) 2013-2018 Ivan Alonso (Kaian)
** Copyright (C) 2013-2018 Irontec SL. All rights reserved.
**
** This program is free software: you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation, either version 3 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program. If not, see <http://www.gnu.org/licenses/>.
**
****************************************************************************/
/**
* @file gasyncqueuesource.c
* @author Ivan Alonso [aka Kaian] <kaian@irontec.com>
*
* @brief Functions to GSource for GAsyncQueue
*
* This GSource is implemented based on the Custom GSource tutorial available at
* https://developer.gnome.org/gnome-devel-demos/stable/custom-gsource.c.html.en
*/
#include <glib.h>
#include <glib-object.h>
#include "gasyncqueuesource.h"
static gboolean
g_async_queue_source_prepare(GSource *source, G_GNUC_UNUSED gint *timeout)
{
GAsyncQueueSource *g_async_queue_source = (GAsyncQueueSource *) source;
return (g_async_queue_length(g_async_queue_source->queue) > 0);
}
static gboolean
g_async_queue_source_dispatch(GSource *source, GSourceFunc callback, gpointer user_data)
{
GAsyncQueueSource *g_async_queue_source = (GAsyncQueueSource *) source;
gpointer message;
GAsyncQueueSourceFunc func = (GAsyncQueueSourceFunc) callback;
/* Pop a message off the queue. */
message = g_async_queue_try_pop(g_async_queue_source->queue);
/* If there was no message, bail. */
if (message == NULL) {
/* Keep the source around to handle the next message. */
return TRUE;
}
/* @func may be %NULL if no callback was specified.
* If so, drop the message. */
if (func == NULL) {
if (g_async_queue_source->destroy != NULL) {
g_async_queue_source->destroy(message);
}
/* Keep the source around to consume the next message. */
return TRUE;
}
return func(message, user_data);
}
static void
g_async_queue_source_finalize(GSource *source)
{
GAsyncQueueSource *g_async_queue_source = (GAsyncQueueSource *) source;
g_async_queue_unref(g_async_queue_source->queue);
}
static gboolean
g_async_queue_source_closure_callback(gpointer message, gpointer user_data)
{
GClosure *closure = user_data;
GValue param_value = G_VALUE_INIT;
GValue return_value = G_VALUE_INIT;
gboolean retval;
/* The invoked function is responsible for freeing @message. */
g_value_init(&return_value, G_TYPE_BOOLEAN);
g_value_init(&param_value, G_TYPE_POINTER);
g_value_set_pointer(&param_value, message);
g_closure_invoke(closure, &return_value, 1, &param_value, NULL);
retval = g_value_get_boolean(&return_value);
g_value_unset(&param_value);
g_value_unset(&return_value);
return retval;
}
static GSourceFuncs g_async_queue_source_funcs =
{
g_async_queue_source_prepare,
NULL, /* check */
g_async_queue_source_dispatch,
g_async_queue_source_finalize,
(GSourceFunc) g_async_queue_source_closure_callback,
NULL,
};
GSource *
g_async_queue_source_new(GAsyncQueue *queue, GDestroyNotify destroy)
{
g_return_val_if_fail (queue != NULL, NULL);
GAsyncQueueSource *source = (GAsyncQueueSource *) g_source_new(
&g_async_queue_source_funcs,
sizeof(GAsyncQueueSource)
);
source->queue = g_async_queue_ref(queue);
source->destroy = destroy;
return (GSource *) source;
}

View File

@ -0,0 +1,54 @@
/**************************************************************************
**
** sngrep - SIP Messages flow viewer
**
** Copyright (C) 2013-2018 Ivan Alonso (Kaian)
** Copyright (C) 2013-2018 Irontec SL. All rights reserved.
**
** This program is free software: you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation, either version 3 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program. If not, see <http://www.gnu.org/licenses/>.
**
****************************************************************************/
/**
* @file gasyncqueuesource.h
* @author Ivan Alonso [aka Kaian] <kaian@irontec.com>
*
* @brief Functions to GSource for GAsyncQueue
*
* This GSource is implemented based on the Custom GSource tutorial available at
* https://developer.gnome.org/gnome-devel-demos/stable/custom-gsource.c.html.en
*/
#ifndef SNGREP_GASYNCQUEUESOURCE_H
#define SNGREP_GASYNCQUEUESOURCE_H
#include <glib.h>
typedef struct _GAsyncQueueSource GAsyncQueueSource;
typedef gboolean (*GAsyncQueueSourceFunc)(gpointer message, gpointer user_data);
struct _GAsyncQueueSource
{
GSource parent;
GAsyncQueue *queue;
GDestroyNotify destroy;
};
GSource *
g_async_queue_source_new(GAsyncQueue *queue, GDestroyNotify destroy);
#endif //SNGREP_GASYNCQUEUESOURCE_H

View File

@ -69,6 +69,18 @@ print_version_info()
PACKAGE, VERSION); PACKAGE, VERSION);
} }
static gboolean
print_storage_count(GMainLoop *loop)
{
setbuf(stdout, NULL);
g_print("\rDialog count: %d", storage_calls_count());
if (storage_pending_packets() == 0) {
g_print("\n");
g_main_loop_quit(loop);
}
return TRUE;
}
/** /**
* @brief Main function logic * @brief Main function logic
* *
@ -368,6 +380,9 @@ main(int argc, char *argv[])
#endif #endif
/***************************** Main Logic *****************************/ /***************************** Main Logic *****************************/
// Create main loop for default context
GMainLoop *main_loop = g_main_loop_new(NULL, FALSE);
// Initialize SIP Messages Storage // Initialize SIP Messages Storage
if (!storage_init(storage_copts, storage_mopts, storage_sopts, &error)) { if (!storage_init(storage_copts, storage_mopts, storage_sopts, &error)) {
g_printerr("Failed to initialize storage: %s\n", error->message); g_printerr("Failed to initialize storage: %s\n", error->message);
@ -379,26 +394,19 @@ main(int argc, char *argv[])
if (!no_interface) { if (!no_interface) {
// Initialize interface // Initialize interface
if (!ncurses_init(&error)) { if (!ncurses_init(main_loop, &error)) {
g_printerr("error: %s\n", error->message); g_printerr("error: %s\n", error->message);
return 1; return 1;
} }
// This is a blocking call.
// Create the first panel and wait for user input
ncurses_create_window(WINDOW_CALL_LIST);
ncurses_wait_for_input();
} else { } else {
setbuf(stdout, NULL); if (!quiet) {
while (capture_is_running(capture) || storage_pending_packets() >= 0) { g_timeout_add(500, (GSourceFunc) print_storage_count, main_loop);
if (!quiet)
g_print("\rDialog count: %d", storage_calls_count());
g_usleep(500 * 1000);
} }
if (!quiet)
g_print("\rDialog count: %d\n", storage_calls_count());
} }
/************************* Application Main Loop *************************/
g_main_loop_run(main_loop);
// Capture stop // Capture stop
capture_manager_stop(capture); capture_manager_stop(capture);

View File

@ -34,6 +34,7 @@
#include <math.h> #include <math.h>
#include <stdlib.h> #include <stdlib.h>
#include <locale.h> #include <locale.h>
#include <glib-unix.h>
#include "glib-extra.h" #include "glib-extra.h"
#include "setting.h" #include "setting.h"
#include "manager.h" #include "manager.h"
@ -61,101 +62,6 @@
*/ */
static GPtrArray *windows; static GPtrArray *windows;
gboolean
ncurses_init(GError **error)
{
gshort bg, fg;
const gchar *term;
// Set Locale
setlocale(LC_CTYPE, "");
// Initialize curses
if (!initscr()) {
g_set_error(error,
NCURSES_ERROR,
NCURSES_ERROR_INIT,
"Unable to initialize ncurses mode.");
return FALSE;
}
// Check if user wants a black background
if (setting_has_value(SETTING_BACKGROUND, "dark")) {
assume_default_colors(COLOR_WHITE, COLOR_BLACK);
} else {
use_default_colors();
}
// Enable Colors
start_color();
cbreak();
// Dont write user input on screen
noecho();
// Hide the cursor
curs_set(0);
// Only delay ESC Sequences 25 ms (we dont want Escape sequences)
ESCDELAY = 25;
// Redefine some keys
term = getenv("TERM");
if (term
&& (!strcmp(term, "xterm") || !strcmp(term, "xterm-color") || !strcmp(term, "vt220"))) {
define_key("\033[H", KEY_HOME);
define_key("\033[F", KEY_END);
define_key("\033OP", KEY_F(1));
define_key("\033OQ", KEY_F(2));
define_key("\033OR", KEY_F(3));
define_key("\033OS", KEY_F(4));
define_key("\033[11~", KEY_F(1));
define_key("\033[12~", KEY_F(2));
define_key("\033[13~", KEY_F(3));
define_key("\033[14~", KEY_F(4));
define_key("\033[17;2~", KEY_F(18));
}
if (setting_has_value(SETTING_BACKGROUND, "dark")) {
fg = COLOR_WHITE;
bg = COLOR_BLACK;
} else {
fg = COLOR_DEFAULT;
bg = COLOR_DEFAULT;
}
// Initialize colorpairs
init_pair(CP_CYAN_ON_DEF, COLOR_CYAN, bg);
init_pair(CP_YELLOW_ON_DEF, COLOR_YELLOW, bg);
init_pair(CP_MAGENTA_ON_DEF, COLOR_MAGENTA, bg);
init_pair(CP_GREEN_ON_DEF, COLOR_GREEN, bg);
init_pair(CP_RED_ON_DEF, COLOR_RED, bg);
init_pair(CP_BLUE_ON_DEF, COLOR_BLUE, bg);
init_pair(CP_WHITE_ON_DEF, COLOR_WHITE, bg);
init_pair(CP_DEF_ON_CYAN, fg, COLOR_CYAN);
init_pair(CP_DEF_ON_BLUE, fg, COLOR_BLUE);
init_pair(CP_WHITE_ON_BLUE, COLOR_WHITE, COLOR_BLUE);
init_pair(CP_BLACK_ON_BLUE, COLOR_BLACK, COLOR_BLUE);
init_pair(CP_BLACK_ON_CYAN, COLOR_BLACK, COLOR_CYAN);
init_pair(CP_WHITE_ON_CYAN, COLOR_WHITE, COLOR_CYAN);
init_pair(CP_YELLOW_ON_CYAN, COLOR_YELLOW, COLOR_CYAN);
init_pair(CP_BLUE_ON_CYAN, COLOR_BLUE, COLOR_CYAN);
init_pair(CP_BLUE_ON_WHITE, COLOR_BLUE, COLOR_WHITE);
init_pair(CP_CYAN_ON_WHITE, COLOR_CYAN, COLOR_WHITE);
init_pair(CP_CYAN_ON_BLACK, COLOR_CYAN, COLOR_BLACK);
// Initialize windows stack
windows = g_ptr_array_new();
return TRUE;
}
void
ncurses_deinit()
{
// Clear screen before leaving
refresh();
// End ncurses mode
endwin();
}
Window * Window *
ncurses_create_window(enum WindowTypes type) ncurses_create_window(enum WindowTypes type)
{ {
@ -256,80 +162,88 @@ ncurses_find_by_type(enum WindowTypes type)
return window; return window;
} }
int static gboolean
ncurses_wait_for_input() ncurses_refresh_screen(GMainLoop *loop)
{ {
Window *ui; PANEL *panel = panel_below(NULL);
WINDOW *win;
PANEL *panel;
// While there are still panels // While there are still panels
while ((panel = panel_below(NULL))) { if (panel) {
// Get panel interface structure // Get panel interface structure
ui = ncurses_find_by_panel(panel); Window *ui = ncurses_find_by_panel(panel);
// Set character input timeout 200 ms
halfdelay(REFRESHTHSECS);
// Avoid parsing any packet while UI is being drawn
capture_lock(capture_manager());
// Query the interface if it needs to be redrawn // Query the interface if it needs to be redrawn
if (window_redraw(ui)) { if (window_redraw(ui)) {
// Redraw this panel // Redraw this panel
if (window_draw(ui) != 0) { if (window_draw(ui) != 0) {
ncurses_destroy_window(ui); ncurses_destroy_window(ui);
capture_unlock(capture_manager()); return TRUE;
continue;
} }
} }
capture_unlock(capture_manager());
// Update panel stack // Update panel stack
update_panels(); update_panels();
doupdate(); doupdate();
return TRUE;
// Get topmost panel } else {
panel = panel_below(NULL); g_main_loop_quit(loop);
return FALSE;
// Enable key input on current panel
win = panel_window(panel);
keypad(win, TRUE);
// Get pressed key
int c = wgetch(win);
// Timeout, no key pressed
if (c == ERR)
continue;
capture_lock(capture_manager());
// Handle received key
int hld = KEY_NOT_HANDLED;
while (hld != KEY_HANDLED) {
// Check if current panel has custom bindings for that key
hld = window_handle_key(ui, c);
if (hld == KEY_HANDLED) {
// Panel handled this key
continue;
} else if (hld == KEY_PROPAGATED) {
// Destroy current panel
ncurses_destroy_window(ui);
// Try to handle this key with the previous panel
ui = ncurses_find_by_panel(panel_below(NULL));
} else if (hld == KEY_DESTROY) {
ncurses_destroy_window(ui);
break;
} else {
// Key not handled by UI nor propagated. Use default handler
hld = ncurses_default_keyhandler(ui, c);
}
}
capture_unlock(capture_manager());
} }
return 0; }
static gboolean
ncurses_read_input(G_GNUC_UNUSED gint fd, G_GNUC_UNUSED GIOCondition condition, GMainLoop *loop)
{
PANEL *panel = panel_below(NULL);
g_return_val_if_fail(panel != NULL, FALSE);
// Get panel interface structure
Window *ui = ncurses_find_by_panel(panel);
g_return_val_if_fail(ui != NULL, FALSE);
// Enable key input on current panel
WINDOW *win = panel_window(panel);
g_return_val_if_fail(win != NULL, FALSE);
// Set window keyread in non-blocking mode
wtimeout(win, 0);
keypad(win, TRUE);
// Get pressed key
int c = wgetch(win);
// No key pressed
if (c == ERR)
return TRUE;
// Handle received key
int hld = KEY_NOT_HANDLED;
while (hld != KEY_HANDLED) {
// Check if current panel has custom bindings for that key
hld = window_handle_key(ui, c);
if (hld == KEY_HANDLED) {
// Panel handled this key
continue;
} else if (hld == KEY_PROPAGATED) {
// Destroy current panel
ncurses_destroy_window(ui);
// Try to handle this key with the previous panel
ui = ncurses_find_by_panel(panel_below(NULL));
} else if (hld == KEY_DESTROY) {
ncurses_destroy_window(ui);
break;
} else {
// Key not handled by UI nor propagated. Use default handler
hld = ncurses_default_keyhandler(ui, c);
}
}
// Force screen redraw with each keystroke
ncurses_refresh_screen(loop);
return TRUE;
} }
int int
@ -550,3 +464,109 @@ draw_message_pos(WINDOW *win, Message *msg, int starting)
return line - starting; return line - starting;
} }
gboolean
ncurses_init(GMainLoop *loop, GError **error)
{
gshort bg, fg;
const gchar *term;
// Set Locale
setlocale(LC_CTYPE, "");
// Initialize curses
if (!initscr()) {
g_set_error(error,
NCURSES_ERROR,
NCURSES_ERROR_INIT,
"Unable to initialize ncurses mode.");
return FALSE;
}
// Check if user wants a black background
if (setting_has_value(SETTING_BACKGROUND, "dark")) {
assume_default_colors(COLOR_WHITE, COLOR_BLACK);
} else {
use_default_colors();
}
// Enable Colors
start_color();
cbreak();
// Dont write user input on screen
noecho();
// Hide the cursor
curs_set(0);
// Only delay ESC Sequences 25 ms (we dont want Escape sequences)
ESCDELAY = 25;
// Redefine some keys
term = getenv("TERM");
if (term
&& (!strcmp(term, "xterm") || !strcmp(term, "xterm-color") || !strcmp(term, "vt220"))) {
define_key("\033[H", KEY_HOME);
define_key("\033[F", KEY_END);
define_key("\033OP", KEY_F(1));
define_key("\033OQ", KEY_F(2));
define_key("\033OR", KEY_F(3));
define_key("\033OS", KEY_F(4));
define_key("\033[11~", KEY_F(1));
define_key("\033[12~", KEY_F(2));
define_key("\033[13~", KEY_F(3));
define_key("\033[14~", KEY_F(4));
define_key("\033[17;2~", KEY_F(18));
}
if (setting_has_value(SETTING_BACKGROUND, "dark")) {
fg = COLOR_WHITE;
bg = COLOR_BLACK;
} else {
fg = COLOR_DEFAULT;
bg = COLOR_DEFAULT;
}
// Initialize colorpairs
init_pair(CP_CYAN_ON_DEF, COLOR_CYAN, bg);
init_pair(CP_YELLOW_ON_DEF, COLOR_YELLOW, bg);
init_pair(CP_MAGENTA_ON_DEF, COLOR_MAGENTA, bg);
init_pair(CP_GREEN_ON_DEF, COLOR_GREEN, bg);
init_pair(CP_RED_ON_DEF, COLOR_RED, bg);
init_pair(CP_BLUE_ON_DEF, COLOR_BLUE, bg);
init_pair(CP_WHITE_ON_DEF, COLOR_WHITE, bg);
init_pair(CP_DEF_ON_CYAN, fg, COLOR_CYAN);
init_pair(CP_DEF_ON_BLUE, fg, COLOR_BLUE);
init_pair(CP_WHITE_ON_BLUE, COLOR_WHITE, COLOR_BLUE);
init_pair(CP_BLACK_ON_BLUE, COLOR_BLACK, COLOR_BLUE);
init_pair(CP_BLACK_ON_CYAN, COLOR_BLACK, COLOR_CYAN);
init_pair(CP_WHITE_ON_CYAN, COLOR_WHITE, COLOR_CYAN);
init_pair(CP_YELLOW_ON_CYAN, COLOR_YELLOW, COLOR_CYAN);
init_pair(CP_BLUE_ON_CYAN, COLOR_BLUE, COLOR_CYAN);
init_pair(CP_BLUE_ON_WHITE, COLOR_BLUE, COLOR_WHITE);
init_pair(CP_CYAN_ON_WHITE, COLOR_CYAN, COLOR_WHITE);
init_pair(CP_CYAN_ON_BLACK, COLOR_CYAN, COLOR_BLACK);
// Initialize windows stack
windows = g_ptr_array_new();
// Create the first displayed window
ncurses_create_window(WINDOW_CALL_LIST);
// Source for reading events from stdin
GSource *source = g_unix_fd_source_new(fileno(stdin), G_IO_IN | G_IO_ERR | G_IO_HUP);
g_source_set_callback(source, (GSourceFunc) ncurses_read_input, loop, NULL);
g_source_attach(source, NULL);
// Refresh screen every 200 ms
g_timeout_add(200, (GSourceFunc) ncurses_refresh_screen, loop);
return TRUE;
}
void
ncurses_deinit()
{
// Clear screen before leaving
refresh();
// End ncurses mode
endwin();
}

View File

@ -63,7 +63,7 @@ enum ncurses_errors
* @return TRUE on initialization success, FALSE otherwise * @return TRUE on initialization success, FALSE otherwise
*/ */
gboolean gboolean
ncurses_init(GError **error); ncurses_init(GMainLoop *loop, GError **error);
/** /**
* @brief Stops ncurses mode * @brief Stops ncurses mode
@ -99,16 +99,6 @@ ncurses_find_by_panel(PANEL *panel);
Window * Window *
ncurses_find_by_type(enum WindowTypes type); ncurses_find_by_type(enum WindowTypes type);
/**
* @brief Wait for user input in topmost panel
*
* This function manages all user input in all panel types and
* redraws the panel using its own draw function
*
*/
int
ncurses_wait_for_input();
/** /**
* @brief Default handler for keys * @brief Default handler for keys
* *

View File

@ -37,9 +37,9 @@
* | | | * | | |
* | +--------------------------+ * | +--------------------------+
* | +--------------------------+ * | +--------------------------+
* +--->| | <----------- You are here. * +--->| Storage | <----------- You are here.
* | Storage | * |--------------------------|----+
* +--->| |----+ * +--->| Parser | |
* Packet | +--------------------------+ | Capture * Packet | +--------------------------+ | Capture
* Queue | +--------------------------+ | Output * Queue | +--------------------------+ | Output
* | | | | * | | | |
@ -62,12 +62,6 @@
*/ */
Storage storage = { 0 }; Storage storage = { 0 };
void
storage_add_packet(Packet *packet)
{
g_async_queue_push(storage.pkt_queue, packet);
}
static gint static gint
storage_sorter(const Call **a, const Call **b) storage_sorter(const Call **a, const Call **b)
{ {
@ -450,31 +444,17 @@ storage_check_rtcp_packet(Packet *packet)
} }
//! Start capturing packets function //! Start capturing packets function
static gpointer gpointer
storage_check_packet() storage_check_packet(Packet *packet)
{ {
while (storage.running) { if (packet_has_type(packet, PACKET_SIP)) {
return storage_check_sip_packet(packet);
Packet *packet = g_async_queue_timeout_pop(storage.pkt_queue, 500000); } else if (packet_has_type(packet, PACKET_RTP)) {
if (packet) { return storage_check_rtp_packet(packet);
if (packet_has_type(packet, PACKET_SIP)) { } else if (packet_has_type(packet, PACKET_RTCP)) {
if (storage_check_sip_packet(packet) == NULL) { return storage_check_rtcp_packet(packet);
packet_free(packet);
}
} else if (packet_has_type(packet, PACKET_RTP)) {
if (storage_check_rtp_packet(packet) == NULL) {
packet_free(packet);
}
} else if (packet_has_type(packet, PACKET_RTCP)) {
if (storage_check_rtcp_packet(packet) == NULL) {
packet_free(packet);
}
}
}
} }
return packet;
return NULL;
} }
gboolean gboolean
@ -507,9 +487,6 @@ storage_init(StorageCaptureOpts capture_options,
} }
} }
// Initialize storage packet queue
storage.pkt_queue = g_async_queue_new();
// Create a vector to store calls // Create a vector to store calls
storage.calls = g_ptr_array_new_with_free_func(call_destroy); storage.calls = g_ptr_array_new_with_free_func(call_destroy);
@ -527,25 +504,19 @@ storage_init(StorageCaptureOpts capture_options,
storage.sort.asc = TRUE; storage.sort.asc = TRUE;
} }
storage.running = TRUE;
storage.thread = g_thread_new(NULL, (GThreadFunc) storage_check_packet, NULL);
return TRUE; return TRUE;
} }
gint gint
storage_pending_packets() storage_pending_packets()
{ {
return g_async_queue_length(storage.pkt_queue); CaptureManager *manager = capture_manager();
return g_async_queue_length(manager->queue);
} }
void void
storage_deinit() storage_deinit()
{ {
// Stop storage thread
storage.running = FALSE;
g_thread_join(storage.thread);
// Remove all calls // Remove all calls
storage_calls_clear(); storage_calls_clear();
// Remove Call-id hash table // Remove Call-id hash table

View File

@ -115,14 +115,13 @@ struct _Storage
GHashTable *callids; GHashTable *callids;
//! Streams hash table //! Streams hash table
GHashTable *streams; GHashTable *streams;
//! Pending packets to be parsed queue
GAsyncQueue *pkt_queue;
//! Storage thread //! Storage thread
GThread *thread; guint source;
//! Running thread flag
gboolean running;
}; };
gpointer
storage_check_packet(Packet *packet);
/** /**
* @brief Initialize SIP Storage structures * @brief Initialize SIP Storage structures
* *
@ -143,18 +142,6 @@ storage_init(StorageCaptureOpts capture_options,
void void
storage_deinit(); storage_deinit();
/**
* @brief Add a new packet to storage queue
*
* This function must be used to include a new packet into the storage from
* capture threads. Storage will periodically check the internal queue for
* new packets to be stored or discarded.
*
* @param packet Packet structure pointer with fully captured data
*/
void
storage_add_packet(Packet *packet);
/** /**
* @brief Return if the call list has changed * @brief Return if the call list has changed
* *