freeswitch/libs/sofia-sip/libsofia-sip-ua/tport/tport_threadpool.c
Michael Jerris dd22cf6876 Wed Jun 3 12:25:19 CDT 2009 Pekka Pessi <first.last@nokia.com>
* tport_type_udp.c: added field names to tport_vtable_t initialization

Wed Jun  3 12:25:52 CDT 2009  Pekka Pessi <first.last@nokia.com>
  * tport_type_tcp.c: added field names to tport_vtable_t initialization

Wed Jun  3 12:29:13 CDT 2009  Pekka Pessi <first.last@nokia.com>
  * tport_threadpool.c: added field names to tport_vtable_t initialization

Wed Jun  3 12:29:41 CDT 2009  Pekka Pessi <first.last@nokia.com>
  * tport_type_connect.c: added field names to tport_vtable_t initialization

Wed Jun  3 12:30:01 CDT 2009  Pekka Pessi <first.last@nokia.com>
  * tport_type_stun.c: added field names to tport_vtable_t initialization

Wed Jun  3 12:30:17 CDT 2009  Pekka Pessi <first.last@nokia.com>
  * tport_type_sctp.c: added field names to tport_vtable_t initialization



git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@13956 d0543943-73ff-0310-b7d9-9358b9ac24b2
2009-06-25 18:43:54 +00:00

819 lines
20 KiB
C

/*
* This file is part of the Sofia-SIP package
*
* Copyright (C) 2006 Nokia Corporation.
*
* Contact: Pekka Pessi <pekka.pessi@nokia.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA
*
*/
/**@CFILE tport_threadpool.c Multithreading transport
*
* See tport.docs for more detailed description of tport interface.
*
* @author Pekka Pessi <Pekka.Pessi@nokia.com>
* @author Martti Mela <Martti.Mela@nokia.com>
*
* @date Created: Fri Mar 24 08:45:49 EET 2006 ppessi
*/
#include "config.h"
#undef HAVE_SIGCOMP
#define SU_ROOT_MAGIC_T struct tport_threadpool
#define SU_WAKEUP_ARG_T struct tport_s
#define SU_MSG_ARG_T union tport_su_msg_arg
#include "tport_internal.h"
#include <stdlib.h>
#include <time.h>
#include <assert.h>
#include <errno.h>
#include <limits.h>
#if HAVE_FUNC
#elif HAVE_FUNCTION
#define __func__ __FUNCTION__
#else
static char const __func__[] = "tport_threadpool";
#endif
/* ==== Thread pools =================================================== */
typedef struct threadpool threadpool_t;
typedef struct {
tport_primary_t tptp_primary;
threadpool_t *tptp_pool; /**< Worker threads */
unsigned tptp_poolsize;
} tport_threadpool_t;
struct threadpool
{
/* Shared */
su_clone_r thrp_clone;
tport_threadpool_t *thrp_tport;
int thrp_killing; /* Threadpool is being killed */
/* Private variables */
su_root_t *thrp_root;
int thrp_reg;
struct sigcomp_compartment *thrp_compartment;
su_msg_r thrp_rmsg;
/* Slave thread counters */
int thrp_r_sent;
int thrp_s_recv;
unsigned thrp_rcvd_msgs;
unsigned thrp_rcvd_bytes;
/* Master thread counters */
int thrp_s_sent;
int thrp_r_recv;
int thrp_yield;
};
typedef struct
{
threadpool_t *tpd_thrp;
int tpd_errorcode;
msg_t *tpd_msg;
su_time_t tpd_when;
unsigned tpd_mtu;
#if HAVE_SIGCOMP
struct sigcomp_compartment *tpd_cc;
#endif
struct sigcomp_udvm *tpd_udvm;
socklen_t tpd_namelen;
su_sockaddr_t tpd_name[1];
} thrp_udp_deliver_t;
union tport_su_msg_arg
{
threadpool_t *thrp;
thrp_udp_deliver_t thrp_udp_deliver[1];
};
int tport_threadpool_init_primary(tport_primary_t *,
tp_name_t tpn[1],
su_addrinfo_t *,
tagi_t const *,
char const **return_culprit);
static void tport_threadpool_deinit_primary(tport_primary_t *pri);
static int tport_thread_send(tport_t *tp,
msg_t *msg,
tp_name_t const *tpn,
struct sigcomp_compartment *cc,
unsigned mtu);
tport_vtable_t const tport_threadpool_vtable =
{
/* vtp_name */ "udp",
/* vtp_public */ tport_type_local,
/* vtp_pri_size */ sizeof (tport_threadpool_t),
/* vtp_init_primary */ tport_threadpool_init_primary,
/* vtp_deinit_primary */ tport_threadpool_deinit_primary,
/* vtp_wakeup_pri */ NULL,
/* vtp_connect */ NULL,
/* vtp_secondary_size */ 0, /* No secondary transports! */
/* vtp_init_secondary */ NULL,
/* vtp_deinit_secondary */ NULL,
/* vtp_shutdown */ NULL,
/* vtp_set_events */ NULL,
/* vtp_wakeup */ NULL,
/* vtp_recv */ tport_recv_dgram,
/* vtp_send */ tport_send_dgram,
/* vtp_deliver */ NULL,
/* vtp_prepare */ tport_thread_send,
/* vtp_keepalive */ NULL,
/* vtp_stun_response */ NULL,
/* vtp_next_secondary_timer*/ NULL,
/* vtp_secondary_timer */ NULL,
};
static int thrp_udp_init(su_root_t *, threadpool_t *);
static void thrp_udp_deinit(su_root_t *, threadpool_t *);
static int thrp_udp_event(threadpool_t *thrp,
su_wait_t *w,
tport_t *_tp);
static int thrp_udp_recv_deliver(threadpool_t *thrp,
tport_t const *tp,
thrp_udp_deliver_t *tpd,
int events);
static int thrp_udp_recv(threadpool_t *thrp, thrp_udp_deliver_t *tpd);
#if HAVE_SIGCOMP
static int thrp_udvm_decompress(threadpool_t *thrp,
thrp_udp_deliver_t *tpd);
#endif
static void thrp_udp_deliver(threadpool_t *thrp,
su_msg_r msg,
union tport_su_msg_arg *arg);
static void thrp_udp_deliver_report(threadpool_t *thrp,
su_msg_r m,
union tport_su_msg_arg *arg);
static void thrp_udp_send(threadpool_t *thrp,
su_msg_r msg,
union tport_su_msg_arg *arg);
static void thrp_udp_send_report(threadpool_t *thrp,
su_msg_r msg,
union tport_su_msg_arg *arg);
/** Launch threads in the tport pool. */
int tport_threadpool_init_primary(tport_primary_t *pri,
tp_name_t tpn[1],
su_addrinfo_t *ai,
tagi_t const *tags,
char const **return_culprit)
{
tport_threadpool_t *tptp = (tport_threadpool_t *)pri;
tport_t *tp = pri->pri_primary;
threadpool_t *thrp;
int i, N = tp->tp_params->tpp_thrpsize;
assert(ai->ai_socktype == SOCK_DGRAM);
if (tport_udp_init_primary(pri, tpn, ai, tags, return_culprit) < 0)
return -1;
if (N == 0)
return 0;
thrp = su_zalloc(tp->tp_home, (sizeof *thrp) * N);
if (!thrp)
return -1;
su_setblocking(tp->tp_socket, 0);
tptp->tptp_pool = thrp;
tptp->tptp_poolsize = N;
for (i = 0; i < N; i++) {
#if HAVE_SIGCOMP
if (tport_has_sigcomp(tp))
thrp[i].thrp_compartment = tport_primary_compartment(tp->tp_master);
#endif
thrp[i].thrp_tport = tptp;
if (su_clone_start(pri->pri_master->mr_root,
thrp[i].thrp_clone,
thrp + i,
thrp_udp_init,
thrp_udp_deinit) < 0)
goto error;
}
tp->tp_events = 0;
return 0;
error:
assert(!"tport_launch_threadpool");
return -1;
}
/** Kill threads in the tport pool.
*
* @note Executed by stack thread only.
*/
static
void tport_threadpool_deinit_primary(tport_primary_t *pri)
{
tport_threadpool_t *tptp = (tport_threadpool_t *)pri;
threadpool_t *thrp = tptp->tptp_pool;
int i, N = pri->tptp_poolsize;
if (!thrp)
return;
/* Prevent application from using these. */
for (i = 0; i < N; i++)
thrp[i].thrp_killing = 1;
/* Stop every task in the threadpool. */
for (i = 0; i < N; i++)
su_clone_wait(pri->pri_master->mr_root, thrp[i].thrp_clone);
su_free(pri->pri_home, tptp), tptp->tptp_pool = NULL;
tptp->tptp_poolsize = 0;
SU_DEBUG_3(("%s(%p): zapped threadpool\n", __func__, pri));
}
static int thrp_udp_init(su_root_t *root, threadpool_t *thrp)
{
tport_t *tp = thrp->thrp_tport->tptp_primary->pri_primary;
su_wait_t wait[1];
assert(tp);
thrp->thrp_root = root;
if (su_wait_create(wait, tp->tp_socket, SU_WAIT_IN | SU_WAIT_ERR) < 0)
return -1;
thrp->thrp_reg = su_root_register(root, wait, thrp_udp_event, tp, 0);
if (thrp->thrp_reg == -1)
return -1;
return 0;
}
static void thrp_udp_deinit(su_root_t *root, threadpool_t *thrp)
{
if (thrp->thrp_reg)
su_root_deregister(root, thrp->thrp_reg), thrp->thrp_reg = 0;
su_msg_destroy(thrp->thrp_rmsg);
}
su_inline void
thrp_yield(threadpool_t *thrp)
{
tport_t *tp = thrp->thrp_tport->tptp_primary->pri_primary;
su_root_eventmask(thrp->thrp_root, thrp->thrp_reg, tp->tp_socket, 0);
thrp->thrp_yield = 1;
}
su_inline void
thrp_gain(threadpool_t *thrp)
{
tport_t *tp = thrp->thrp_tport->tptp_primary->pri_primary;
int events = SU_WAIT_IN | SU_WAIT_ERR;
su_root_eventmask(thrp->thrp_root, thrp->thrp_reg, tp->tp_socket, events);
thrp->thrp_yield = 0;
}
static int thrp_udp_event(threadpool_t *thrp,
su_wait_t *w,
tport_t *tp)
{
#if HAVE_POLL
assert(w->fd == tp->tp_socket);
#endif
for (;;) {
thrp_udp_deliver_t *tpd;
int events;
if (!*thrp->thrp_rmsg) {
if (su_msg_create(thrp->thrp_rmsg,
su_root_parent(thrp->thrp_root),
su_root_task(thrp->thrp_root),
thrp_udp_deliver,
sizeof (*tpd)) == -1) {
SU_DEBUG_1(("thrp_udp_event(%p): su_msg_create(): %s\n", thrp,
strerror(errno)));
return 0;
}
}
tpd = su_msg_data(thrp->thrp_rmsg)->thrp_udp_deliver; assert(tpd);
tpd->tpd_thrp = thrp;
events = su_wait_events(w, tp->tp_socket);
if (!events)
return 0;
thrp_udp_recv_deliver(thrp, tp, tpd, events);
if (*thrp->thrp_rmsg) {
SU_DEBUG_7(("thrp_udp_event(%p): no msg sent\n", thrp));
tpd = su_msg_data(thrp->thrp_rmsg)->thrp_udp_deliver;
memset(tpd, 0, sizeof *tpd);
return 0;
}
if (thrp->thrp_yield || (thrp->thrp_s_sent - thrp->thrp_s_recv) > 0)
return 0;
su_wait(w, 1, 0);
}
}
static int thrp_udp_recv_deliver(threadpool_t *thrp,
tport_t const *tp,
thrp_udp_deliver_t *tpd,
int events)
{
unsigned qlen = thrp->thrp_r_sent - thrp->thrp_r_recv;
SU_DEBUG_7(("thrp_udp_event(%p): events%s%s%s%s for %p\n", thrp,
events & SU_WAIT_IN ? " IN" : "",
events & SU_WAIT_HUP ? " HUP" : "",
events & SU_WAIT_OUT ? " OUT" : "",
events & SU_WAIT_ERR ? " ERR" : "",
tpd));
if (events & SU_WAIT_ERR) {
tpd->tpd_errorcode = tport_udp_error(tp, tpd->tpd_name);
if (tpd->tpd_errorcode) {
if (thrp->thrp_yield)
su_msg_report(thrp->thrp_rmsg, thrp_udp_deliver_report);
tpd->tpd_when = su_now();
su_msg_send(thrp->thrp_rmsg);
thrp->thrp_r_sent++;
return 0;
}
}
if (events & SU_WAIT_IN) {
if (thrp_udp_recv(thrp, tpd) < 0) {
tpd->tpd_errorcode = su_errno();
assert(tpd->tpd_errorcode);
if (su_is_blocking(tpd->tpd_errorcode))
return 0;
}
else if (tpd->tpd_msg) {
int n = msg_extract(tpd->tpd_msg); (void)n;
thrp->thrp_rcvd_msgs++;
thrp->thrp_rcvd_bytes += msg_size(tpd->tpd_msg);
}
#if HAVE_SIGCOMP
if (tpd->tpd_udvm && !tpd->tpd_msg)
sigcomp_udvm_free(tpd->tpd_udvm), tpd->tpd_udvm = NULL;
#endif
assert(!tpd->tpd_msg || !tpd->tpd_errorcode);
if (tpd->tpd_msg || tpd->tpd_errorcode) {
if (qlen >= tp->tp_params->tpp_thrprqsize) {
SU_DEBUG_7(("tport recv queue %i: %u\n",
(int)(thrp - tp->tp_pri->tptp_pool), qlen));
thrp_yield(thrp);
}
if (qlen >= tp->tp_params->tpp_thrprqsize / 2)
su_msg_report(thrp->thrp_rmsg, thrp_udp_deliver_report);
tpd->tpd_when = su_now();
su_msg_send(thrp->thrp_rmsg);
thrp->thrp_r_sent++;
return 0;
}
}
return 0;
}
#include <pthread.h>
/** Mutex for reading from socket */
static pthread_mutex_t mutex[1] = { PTHREAD_MUTEX_INITIALIZER };
/** Receive a UDP packet by threadpool. */
static
int thrp_udp_recv(threadpool_t *thrp, thrp_udp_deliver_t *tpd)
{
tport_t const *tp = thrp->thrp_tport->pri_primary;
unsigned char sample[2];
int N;
int s = tp->tp_socket;
pthread_mutex_lock(mutex);
/* Simulate packet loss */
if (tp->tp_params->tpp_drop &&
su_randint(0, 1000) < tp->tp_params->tpp_drop) {
recv(s, sample, 1, 0);
pthread_mutex_unlock(mutex);
SU_DEBUG_3(("tport(%p): simulated packet loss!\n", tp));
return 0;
}
/* Peek for first two bytes in message:
determine if this is stun, sigcomp or sip
*/
N = recv(s, sample, sizeof sample, MSG_PEEK | MSG_TRUNC);
if (N < 0) {
if (su_is_blocking(su_errno()))
N = 0;
}
else if (N <= 1) {
SU_DEBUG_1(("%s(%p): runt of %u bytes\n", "thrp_udp_recv", thrp, N));
recv(s, sample, sizeof sample, 0);
N = 0;
}
#if !HAVE_MSG_TRUNC
else if ((N = su_getmsgsize(tp->tp_socket)) < 0)
;
#endif
else if ((sample[0] & 0xf8) == 0xf8) {
#if HAVE_SIGCOMP
if (thrp->thrp_compartment) {
struct sigcomp_buffer *input;
void *data;
int dlen;
tpd->tpd_udvm =
sigcomp_udvm_create_for_compartment(thrp->thrp_compartment);
input = sigcomp_udvm_input_buffer(tpd->tpd_udvm, N); assert(input);
data = input->b_data + input->b_avail;
dlen = input->b_size - input->b_avail;
if (dlen < N)
dlen = 0;
tpd->tpd_namelen = sizeof(tpd->tpd_name);
dlen = recvfrom(tp->tp_socket, data, dlen, 0,
&tpd->tpd_name->su_sa, &tpd->tpd_namelen);
SU_CANONIZE_SOCKADDR(tpd->tpd_name);
if (dlen < N) {
su_seterrno(EMSGSIZE); /* Protocol error */
N = -1;
} else if (dlen == -1)
N = -1;
else {
input->b_avail += dlen;
input->b_complete = 1;
pthread_mutex_unlock(mutex);
N = thrp_udvm_decompress(thrp, tpd);
if (N == -1)
/* Do not report decompression errors as ICMP errors */
memset(tpd->tpd_name, 0, tpd->tpd_namelen);
return N;
}
pthread_mutex_unlock(mutex);
return N;
}
#endif
recv(s, sample, 1, 0);
pthread_mutex_unlock(mutex);
/* XXX - send NACK ? */
su_seterrno(EBADMSG);
N = -1;
}
else {
/* receive as usual */
N = tport_recv_dgram_r(tp, &tpd->tpd_msg, N);
}
pthread_mutex_unlock(mutex);
return N;
}
#if HAVE_SIGCOMP
static
int thrp_udvm_decompress(threadpool_t *thrp, thrp_udp_deliver_t *tpd)
{
struct sigcomp_udvm *udvm = tpd->tpd_udvm;
struct sigcomp_buffer *output;
msg_iovec_t iovec[msg_n_fragments] = {{ 0 }};
su_addrinfo_t *ai;
tport_t *tp = thrp->thrp_tport->pri_primary;
size_t n, m, i, dlen;
int eos;
void *data;
ssize_t veclen;
output = sigcomp_udvm_output_buffer(udvm, -1);
if (sigcomp_udvm_decompress(udvm, output, NULL) < 0) {
int error = sigcomp_udvm_errno(udvm);
SU_DEBUG_3(("%s: UDVM error %d: %s\n", __func__,
error, sigcomp_udvm_strerror(udvm)));
su_seterrno(EREMOTEIO);
return -1;
}
data = output->b_data + output->b_used;
dlen = output->b_avail - output->b_used;
/* XXX - if a message is larger than default output size... */
eos = output->b_complete; assert(output->b_complete);
veclen = tport_recv_iovec(tp, &tpd->tpd_msg, iovec, dlen, eos);
if (veclen <= 0) {
n = -1;
} else {
for (i = 0, n = 0; i < veclen; i++) {
m = iovec[i].mv_len; assert(dlen >= n + m);
memcpy(iovec[i].mv_base, data + n, m);
n += m;
}
assert(dlen == n);
msg_recv_commit(tpd->tpd_msg, dlen, eos); /* Mark buffer as used */
/* Message address */
ai = msg_addrinfo(tpd->tpd_msg);
ai->ai_flags |= TP_AI_COMPRESSED;
ai->ai_family = tpd->tpd_name->su_sa.sa_family;
ai->ai_socktype = SOCK_DGRAM;
ai->ai_protocol = IPPROTO_UDP;
memcpy(ai->ai_addr, tpd->tpd_name, ai->ai_addrlen = tpd->tpd_namelen);
SU_DEBUG_9(("%s(%p): sigcomp msg sz = %d\n", __func__, tp, n));
}
return n;
}
#endif
/** Deliver message from threadpool to the stack
*
* @note Executed by stack thread only.
*/
static
void thrp_udp_deliver(su_root_magic_t *magic,
su_msg_r m,
union tport_su_msg_arg *arg)
{
thrp_udp_deliver_t *tpd = arg->thrp_udp_deliver;
threadpool_t *thrp = tpd->tpd_thrp;
tport_t *tp = thrp->thrp_tport->pri_primary;
su_time_t now = su_now();
assert(magic != thrp);
thrp->thrp_r_recv++;
if (thrp->thrp_killing) {
#if HAVE_SIGCOMP
sigcomp_udvm_free(tpd->tpd_udvm), tpd->tpd_udvm = NULL;
#endif
msg_destroy(tpd->tpd_msg);
return;
}
SU_DEBUG_7(("thrp_udp_deliver(%p): got %p delay %f\n",
thrp, tpd, 1000 * su_time_diff(now, tpd->tpd_when)));
if (tpd->tpd_errorcode)
tport_error_report(tp, tpd->tpd_errorcode, tpd->tpd_name);
else if (tpd->tpd_msg) {
tport_deliver(tp, tpd->tpd_msg, NULL, &tpd->tpd_udvm, tpd->tpd_when);
tp->tp_rlogged = NULL;
}
#if HAVE_SIGCOMP
if (tpd->tpd_udvm) {
sigcomp_udvm_free(tpd->tpd_udvm), tpd->tpd_udvm = NULL;
}
#endif
}
static
void thrp_udp_deliver_report(threadpool_t *thrp,
su_msg_r m,
union tport_su_msg_arg *arg)
{
if (thrp->thrp_yield) {
int qlen = thrp->thrp_r_sent - thrp->thrp_r_recv;
int qsize = thrp->thrp_tport->pri_params->tpp_thrprqsize;
if (qlen == 0 || qlen < qsize / 2)
thrp_gain(thrp);
}
}
/** Send a message to network using threadpool.
*
* @note Executed by stack thread only.
*/
static
int tport_thread_send(tport_t *tp,
msg_t *msg,
tp_name_t const *tpn,
struct sigcomp_compartment *cc,
unsigned mtu)
{
threadpool_t *thrp = tp->tp_pri->tptp_pool;
thrp_udp_deliver_t *tpd;
int i, N = tp->tp_pri->tptp_poolsize;
su_msg_r m;
unsigned totalqlen = 0;
unsigned qlen;
if (!tp->tp_pri->tptp_pool)
return tport_prepare_and_send(tp, msg, tpn, cc, mtu);
SU_DEBUG_9(("tport_thread_send()\n"));
if (thrp->thrp_killing)
return (su_seterrno(ECHILD)), -1;
qlen = totalqlen = thrp->thrp_s_sent - thrp->thrp_s_recv;
/* Select thread with shortest queue */
for (i = 1; i < N; i++) {
threadpool_t *other = tp->tp_pri->tptp_pool + i;
unsigned len = other->thrp_s_sent - other->thrp_s_recv;
if (len < qlen ||
(len == qlen && (other->thrp_s_sent - thrp->thrp_s_sent) < 0))
thrp = other, qlen = len;
totalqlen += len;
}
if (totalqlen >= N * tp->tp_params->tpp_qsize)
SU_DEBUG_3(("tport send queue: %u (shortest %u)\n", totalqlen, qlen));
if (su_msg_create(m,
su_clone_task(thrp->thrp_clone),
su_root_task(tp->tp_master->mr_root),
thrp_udp_send,
sizeof (*tpd)) != su_success) {
SU_DEBUG_1(("thrp_udp_event(%p): su_msg_create(): %s\n", thrp,
strerror(errno)));
return -1;
}
tpd = su_msg_data(m)->thrp_udp_deliver;
tpd->tpd_thrp = thrp;
tpd->tpd_when = su_now();
tpd->tpd_mtu = mtu;
tpd->tpd_msg = msg_ref_create(msg);
#if HAVE_SIGCOMP
tpd->tpd_cc = cc;
#endif
su_msg_report(m, thrp_udp_send_report);
if (su_msg_send(m) == su_success) {
thrp->thrp_s_sent++;
return 0;
}
msg_ref_destroy(msg);
return -1;
}
/** thrp_udp_send() is run by threadpool to send the message. */
static
void thrp_udp_send(threadpool_t *thrp,
su_msg_r m,
union tport_su_msg_arg *arg)
{
thrp_udp_deliver_t *tpd = arg->thrp_udp_deliver;
tport_t *tp = thrp->thrp_tport->pri_primary;
msg_t *msg = tpd->tpd_msg;
msg_iovec_t *iov, auto_iov[40], *iov0 = NULL;
int iovlen, iovused, n;
assert(thrp == tpd->tpd_thrp);
thrp->thrp_s_recv++;
{
double delay = 1000 * su_time_diff(su_now(), tpd->tpd_when);
if (delay > 100)
SU_DEBUG_3(("thrp_udp_deliver(%p): got %p delay %f\n", thrp, tpd, delay));
else
SU_DEBUG_7(("thrp_udp_deliver(%p): got %p delay %f\n", thrp, tpd, delay));
}
if (!msg) {
tpd->tpd_errorcode = EINVAL;
return;
}
/* Prepare message for sending - i.e., encode it */
if (msg_prepare(msg) < 0) {
tpd->tpd_errorcode = errno;
return;
}
if (tpd->tpd_mtu != 0 && msg_size(msg) > tpd->tpd_mtu) {
tpd->tpd_errorcode = EMSGSIZE;
return;
}
/* Use initially the I/O vector from stack */
iov = auto_iov, iovlen = sizeof(auto_iov)/sizeof(auto_iov[0]);
/* Get a iovec for message contents */
for (;;) {
iovused = msg_iovec(msg, iov, iovlen);
if (iovused <= iovlen)
break;
iov = iov0 = realloc(iov0, sizeof(*iov) * iovused);
iovlen = iovused;
if (iov0 == NULL) {
tpd->tpd_errorcode = errno;
return;
}
}
assert(iovused > 0);
tpd->tpd_when = su_now();
if (0)
;
#if HAVE_SIGCOMP
else if (tpd->tpd_cc) {
tport_sigcomp_t sc[1] = {{ NULL }};
n = tport_sigcomp_vsend(tp, msg, iov, iovused, tpd->tpd_cc, sc);
}
#endif
else
n = tport_send_dgram(tp, msg, iov, iovused);
if (n == -1)
tpd->tpd_errorcode = su_errno();
if (iov0)
free(iov0);
}
static
void thrp_udp_send_report(su_root_magic_t *magic,
su_msg_r msg,
union tport_su_msg_arg *arg)
{
thrp_udp_deliver_t *tpd = arg->thrp_udp_deliver;
threadpool_t *thrp = tpd->tpd_thrp;
tport_t *tp = thrp->thrp_tport->pri_primary;
assert(magic != thrp);
SU_DEBUG_7(("thrp_udp_send_report(%p): got %p delay %f\n",
thrp, tpd, 1000 * su_time_diff(su_now(), tpd->tpd_when)));
if (tp->tp_master->mr_log)
tport_log_msg(tp, tpd->tpd_msg, "sent", "to", tpd->tpd_when);
if (tpd->tpd_errorcode)
tport_error_report(tp, tpd->tpd_errorcode, msg_addr(tpd->tpd_msg));
msg_ref_destroy(tpd->tpd_msg);
}