From f0a0e469e819509a55deec5905b6262c6e84cc0c Mon Sep 17 00:00:00 2001 From: Seven Du Date: Mon, 29 Jul 2013 20:04:32 +0800 Subject: [PATCH 1/2] add ws --- src/mod/xml_int/mod_xml_rpc/ws.c | 771 +++++++++++++++++++++++++++++++ src/mod/xml_int/mod_xml_rpc/ws.h | 99 ++++ 2 files changed, 870 insertions(+) create mode 100644 src/mod/xml_int/mod_xml_rpc/ws.c create mode 100644 src/mod/xml_int/mod_xml_rpc/ws.h diff --git a/src/mod/xml_int/mod_xml_rpc/ws.c b/src/mod/xml_int/mod_xml_rpc/ws.c new file mode 100644 index 0000000000..35fb4c0f22 --- /dev/null +++ b/src/mod/xml_int/mod_xml_rpc/ws.c @@ -0,0 +1,771 @@ +#include "ws.h" +#include + +#ifndef _MSC_VER +#include +#endif + +#define SHA1_HASH_SIZE 20 +struct globals_s globals; + +#ifndef WSS_STANDALONE + +void init_ssl(void) +{ + SSL_library_init(); +} +void deinit_ssl(void) +{ + return; +} + +#else +static unsigned long pthreads_thread_id(void); +static void pthreads_locking_callback(int mode, int type, const char *file, int line); + +static pthread_mutex_t *lock_cs; +static long *lock_count; + + + +static void thread_setup(void) +{ + int i; + + lock_cs = OPENSSL_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t)); + lock_count = OPENSSL_malloc(CRYPTO_num_locks() * sizeof(long)); + + for (i = 0; i < CRYPTO_num_locks(); i++) { + lock_count[i] = 0; + pthread_mutex_init(&(lock_cs[i]), NULL); + } + + CRYPTO_set_id_callback(pthreads_thread_id); + CRYPTO_set_locking_callback(pthreads_locking_callback); +} + +static void thread_cleanup(void) +{ + int i; + + CRYPTO_set_locking_callback(NULL); + + for (i=0; i buflen - 1) { + cplen = buflen -1; + } else { + cplen = len; + } + + strncpy(buf, v, cplen); + *(buf+cplen) = '\0'; + return 1; + } + + } + } + return 0; +} + +static int b64encode(unsigned char *in, size_t ilen, unsigned char *out, size_t olen) +{ + int y=0,bytes=0; + size_t x=0; + unsigned int b=0,l=0; + + if(olen) { + } + + for(x=0;x= 6) { + out[bytes++] = c64[(b>>(l-=6))%64]; + if(++y!=72) { + continue; + } + //out[bytes++] = '\n'; + y=0; + } + } + + if (l > 0) { + out[bytes++] = c64[((b%16)<<(6-l))%64]; + } + if (l != 0) while (l < 6) { + out[bytes++] = '=', l += 2; + } + + return 0; +} + +#ifdef NO_OPENSSL +static void sha1_digest(char *digest, unsigned char *in) +{ + SHA1Context sha; + char *p; + int x; + + + SHA1Init(&sha); + SHA1Update(&sha, in, strlen(in)); + SHA1Final(&sha, digest); +} +#else + +static void sha1_digest(unsigned char *digest, char *in) +{ + SHA_CTX sha; + + SHA1_Init(&sha); + SHA1_Update(&sha, in, strlen(in)); + SHA1_Final(digest, &sha); + +} + +#endif + +int ws_handshake(wsh_t *wsh) +{ + char key[256] = ""; + char version[5] = ""; + char proto[256] = ""; + char uri[256] = ""; + char input[256] = ""; + unsigned char output[SHA1_HASH_SIZE] = ""; + char b64[256] = ""; + char respond[512] = ""; + issize_t bytes; + char *p, *e = 0; + + if (wsh->sock == ws_sock_invalid) { + return -3; + } + + while((bytes = ws_raw_read(wsh, wsh->buffer + wsh->datalen, wsh->buflen - wsh->datalen)) > 0) { + wsh->datalen += bytes; + if (strstr(wsh->buffer, "\r\n\r\n") || strstr(wsh->buffer, "\n\n")) { + break; + } + } + + if (bytes > sizeof(wsh->buffer)) { + goto err; + } + + *(wsh->buffer+bytes) = '\0'; + + if (strncasecmp(wsh->buffer, "GET ", 4)) { + goto err; + } + + p = wsh->buffer + 4; + + e = strchr(p, ' '); + if (!e) { + goto err; + } + + strncpy(uri, p, e-p); + + cheezy_get_var(wsh->buffer, "Sec-WebSocket-Key", key, sizeof(key)); + cheezy_get_var(wsh->buffer, "Sec-WebSocket-Version", version, sizeof(version)); + cheezy_get_var(wsh->buffer, "Sec-WebSocket-Protocol", proto, sizeof(proto)); + + if (!*key) { + goto err; + } + + snprintf(input, sizeof(input), "%s%s", key, WEBSOCKET_GUID); + sha1_digest(output, input); + b64encode((unsigned char *)output, SHA1_HASH_SIZE, (unsigned char *)b64, sizeof(b64)); + + snprintf(respond, sizeof(respond), + "HTTP/1.1 101 Switching Protocols\r\n" + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Accept: %s\r\n" + "Sec-WebSocket-Protocol: %s\r\n\r\n", + b64, + proto); + + + ws_raw_write(wsh, respond, strlen(respond)); + wsh->handshake = 1; + + return 0; + + err: + + snprintf(respond, sizeof(respond), "HTTP/1.1 400 Bad Request\r\n" + "Sec-WebSocket-Version: 13\r\n\r\n"); + + //printf("ERR:\n%s\n", respond); + + + ws_raw_write(wsh, respond, strlen(respond)); + + ws_close(wsh, WS_NONE); + + return -1; + +} + +issize_t ws_raw_read(wsh_t *wsh, void *data, size_t bytes) +{ + issize_t r; + int x = 0; + + if (wsh->ssl) { + do { + r = SSL_read(wsh->ssl, data, bytes); +#ifndef _MSC_VER + if (x++) usleep(10000); +#else + if (x++) Sleep(10); +#endif + } while (r == -1 && SSL_get_error(wsh->ssl, r) == SSL_ERROR_WANT_READ && x < 100); + + return r; + } + + do { + r = recv(wsh->sock, data, bytes, 0); +#ifndef _MSC_VER + if (x++) usleep(10000); +#else + if (x++) Sleep(10); +#endif + } while (r == -1 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK || + errno == 35 || errno == 730035 || errno == 2 || errno == 60) && x < 100); + + if (x >= 100) { + r = -1; + } + + return r; +} + +issize_t ws_raw_write(wsh_t *wsh, void *data, size_t bytes) +{ + size_t r; + + if (wsh->ssl) { + do { + r = SSL_write(wsh->ssl, data, bytes); + } while (r == -1 && SSL_get_error(wsh->ssl, r) == SSL_ERROR_WANT_WRITE); + + return r; + } + + do { + r = send(wsh->sock, data, bytes, 0); + } while (r == -1 && (errno == EAGAIN || errno == EINTR)); + + //if (r<0) { + //printf("wRITE FAIL: %s\n", strerror(errno)); + //} + + return r; +} + +#ifdef _MSC_VER +static int setup_socket(ws_socket_t sock) +{ + unsigned long v = 1; + + if (ioctlsocket(sock, FIONBIO, &v) == SOCKET_ERROR) { + return -1; + } + + return 0; + +} + +static int restore_socket(ws_socket_t sock) +{ + unsigned long v = 0; + + if (ioctlsocket(sock, FIONBIO, &v) == SOCKET_ERROR) { + return -1; + } + + return 0; + +} + +#else + +static int setup_socket(ws_socket_t sock) +{ + int flags = fcntl(sock, F_GETFL, 0); + return fcntl(sock, F_SETFL, flags | O_NONBLOCK); +} + +static int restore_socket(ws_socket_t sock) +{ + int flags = fcntl(sock, F_GETFL, 0); + + flags &= ~O_NONBLOCK; + + return fcntl(sock, F_SETFL, flags); + +} + +#endif + + + +int ws_init(wsh_t *wsh, ws_socket_t sock, SSL_CTX *ssl_ctx, int close_sock) +{ + memset(wsh, 0, sizeof(*wsh)); + wsh->sock = sock; + + if (!ssl_ctx) { + ssl_ctx = globals.ssl_ctx; + } + + if (close_sock) { + wsh->close_sock = 1; + } + + wsh->buflen = sizeof(wsh->buffer); + wsh->secure = ssl_ctx ? 1 : 0; + + setup_socket(sock); + + if (wsh->secure) { + int code; + int sanity = 500; + + wsh->ssl = SSL_new(ssl_ctx); + assert(wsh->ssl); + + SSL_set_fd(wsh->ssl, wsh->sock); + + do { + code = SSL_accept(wsh->ssl); + + if (code == 1) { + break; + } + + if (code == 0) { + return -1; + } + + if (code < 0) { + if (code == -1 && SSL_get_error(wsh->ssl, code) != SSL_ERROR_WANT_READ) { + return -1; + } + } +#ifndef _MSC_VER + usleep(10000); +#else + Sleep(10); +#endif + + } while (--sanity > 0); + + if (!sanity) { + return -1; + } + + } + + while (!wsh->down && !wsh->handshake) { + int r = ws_handshake(wsh); + + if (r < 0) { + wsh->down = 1; + return -1; + } + } + + if (wsh->down) { + return -1; + } + + return 0; +} + +void ws_destroy(wsh_t *wsh) +{ + + if (!wsh) { + return; + } + + if (!wsh->down) { + ws_close(wsh, WS_NONE); + } + + if (wsh->down > 1) { + return; + } + + wsh->down = 2; + + if (wsh->ssl) { + int code; + do { + code = SSL_shutdown(wsh->ssl); + } while (code == -1 && SSL_get_error(wsh->ssl, code) == SSL_ERROR_WANT_READ); + + SSL_free(wsh->ssl); + wsh->ssl = NULL; + } +} + +issize_t ws_close(wsh_t *wsh, int16_t reason) +{ + + if (wsh->down) { + return -1; + } + + wsh->down = 1; + + if (reason && wsh->sock != ws_sock_invalid) { + uint16_t *u16; + uint8_t fr[4] = {WSOC_CLOSE | 0x80, 2, 0}; + + u16 = (uint16_t *) &fr[2]; + *u16 = htons((int16_t)reason); + ws_raw_write(wsh, fr, 4); + } + + restore_socket(wsh->sock); + + if (wsh->close_sock) { + close(wsh->sock); + } + + wsh->sock = ws_sock_invalid; + + return reason * -1; + +} + +issize_t ws_read_frame(wsh_t *wsh, ws_opcode_t *oc, uint8_t **data) +{ + + issize_t need = 2; + char *maskp; + + again: + need = 2; + maskp = NULL; + *data = NULL; + + if (wsh->down) { + return -1; + } + + if (!wsh->handshake) { + return ws_close(wsh, WS_PROTO_ERR); + } + + if ((wsh->datalen = ws_raw_read(wsh, wsh->buffer, 14)) < need) { + if ((wsh->datalen += ws_raw_read(wsh, wsh->buffer + wsh->datalen, 14 - wsh->datalen)) < need) { + /* too small - protocol err */ + return ws_close(wsh, WS_PROTO_ERR); + } + } + + *oc = *wsh->buffer & 0xf; + + switch(*oc) { + case WSOC_CLOSE: + { + wsh->plen = wsh->buffer[1] & 0x7f; + *data = (uint8_t *) &wsh->buffer[2]; + return ws_close(wsh, 1000); + } + break; + case WSOC_CONTINUATION: + case WSOC_TEXT: + case WSOC_BINARY: + case WSOC_PING: + case WSOC_PONG: + { + //int fin = (wsh->buffer[0] >> 7) & 1; + int mask = (wsh->buffer[1] >> 7) & 1; + + if (mask) { + need += 4; + + if (need > wsh->datalen) { + /* too small - protocol err */ + *oc = WSOC_CLOSE; + return ws_close(wsh, WS_PROTO_ERR); + } + } + + wsh->plen = wsh->buffer[1] & 0x7f; + wsh->payload = &wsh->buffer[2]; + + if (wsh->plen == 127) { + uint64_t *u64; + + need += 8; + + if (need > wsh->datalen) { + /* too small - protocol err */ + *oc = WSOC_CLOSE; + return ws_close(wsh, WS_PROTO_ERR); + } + + u64 = (uint64_t *) wsh->payload; + wsh->payload += 8; + + wsh->plen = ntohl((u_long)*u64); + + } else if (wsh->plen == 126) { + uint16_t *u16; + + need += 2; + + if (need > wsh->datalen) { + /* too small - protocol err */ + *oc = WSOC_CLOSE; + return ws_close(wsh, WS_PROTO_ERR); + } + + u16 = (uint16_t *) wsh->payload; + wsh->payload += 2; + wsh->plen = ntohs(*u16); + } + + if (mask) { + maskp = (char *)wsh->payload; + wsh->payload += 4; + } + + need = (wsh->plen - (wsh->datalen - need)); + + if ((need + wsh->datalen) > (issize_t)wsh->buflen) { + /* too big - Ain't nobody got time fo' dat */ + *oc = WSOC_CLOSE; + return ws_close(wsh, WS_DATA_TOO_BIG); + } + + wsh->rplen = wsh->plen - need; + + while(need) { + issize_t r = ws_raw_read(wsh, wsh->payload + wsh->rplen, need); + + if (r < 1) { + /* invalid read - protocol err .. */ + *oc = WSOC_CLOSE; + return ws_close(wsh, WS_PROTO_ERR); + } + + wsh->datalen += r; + wsh->rplen += r; + need -= r; + } + + if (mask && maskp) { + issize_t i; + + for (i = 0; i < wsh->datalen; i++) { + wsh->payload[i] ^= maskp[i % 4]; + } + } + + + if (*oc == WSOC_PING) { + ws_write_frame(wsh, WSOC_PONG, wsh->payload, wsh->rplen); + goto again; + } + + + *(wsh->payload+wsh->rplen) = '\0'; + *data = (uint8_t *)wsh->payload; + + //printf("READ[%ld][%d]-----------------------------:\n[%s]\n-------------------------------\n", wsh->rplen, *oc, (char *)*data); + + + return wsh->rplen; + } + break; + default: + { + /* invalid op code - protocol err .. */ + *oc = WSOC_CLOSE; + return ws_close(wsh, WS_PROTO_ERR); + } + break; + } +} + +issize_t ws_feed_buf(wsh_t *wsh, void *data, size_t bytes) +{ + + if (bytes + wsh->wdatalen > wsh->buflen) { + return -1; + } + + memcpy(wsh->wbuffer + wsh->wdatalen, data, bytes); + + wsh->wdatalen += bytes; + + return bytes; +} + +issize_t ws_send_buf(wsh_t *wsh, ws_opcode_t oc) +{ + issize_t r = 0; + + if (!wsh->wdatalen) { + return -1; + } + + r = ws_write_frame(wsh, oc, wsh->wbuffer, wsh->wdatalen); + + wsh->wdatalen = 0; + + return r; +} + + +issize_t ws_write_frame(wsh_t *wsh, ws_opcode_t oc, void *data, size_t bytes) +{ + uint8_t hdr[14] = { 0 }; + size_t hlen = 2; + + if (wsh->down) { + return -1; + } + + //printf("WRITE[%ld]-----------------------------:\n[%s]\n-----------------------------------\n", bytes, (char *) data); + + hdr[0] = (uint8_t)(oc | 0x80); + + if (bytes < 126) { + hdr[1] = (uint8_t)bytes; + } else if (bytes < 0x10000) { + uint16_t *u16; + + hdr[1] = 126; + hlen += 2; + + u16 = (uint16_t *) &hdr[2]; + *u16 = htons((uint16_t) bytes); + + } else { + uint64_t *u64; + + hdr[1] = 127; + hlen += 8; + + u64 = (uint64_t *) &hdr[2]; + *u64 = htonl(bytes); + } + + if (ws_raw_write(wsh, (void *) &hdr[0], hlen) != (issize_t)hlen) { + return -1; + } + + if (ws_raw_write(wsh, data, bytes) != (issize_t)bytes) { + return -2; + } + + return bytes; +} + + diff --git a/src/mod/xml_int/mod_xml_rpc/ws.h b/src/mod/xml_int/mod_xml_rpc/ws.h new file mode 100644 index 0000000000..81368158b3 --- /dev/null +++ b/src/mod/xml_int/mod_xml_rpc/ws.h @@ -0,0 +1,99 @@ +#ifndef _WS_H +#define _WS_H + +//#define WSS_STANDALONE 1 + +#define WEBSOCKET_GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" +#define B64BUFFLEN 1024 + +#include +#ifndef _MSC_VER +#include +#include +#include +#else +#pragma warning(disable:4996) +#endif +#include +#include +#include +#include +#include +#include +#include +#include +//#include "sha1.h" +#include +#include + + +struct globals_s { + const SSL_METHOD *ssl_method; + SSL_CTX *ssl_ctx; + char cert[512]; + char key[512]; +}; + +extern struct globals_s globals; + +typedef int ws_socket_t; +#define ws_sock_invalid -1 + + +typedef enum { + WS_NONE = 0, + WS_NORMAL = 1000, + WS_PROTO_ERR = 1002, + WS_DATA_TOO_BIG = 1009 +} ws_cause_t; + +typedef enum { + WSOC_CONTINUATION = 0x0, + WSOC_TEXT = 0x1, + WSOC_BINARY = 0x2, + WSOC_CLOSE = 0x8, + WSOC_PING = 0x9, + WSOC_PONG = 0xA +} ws_opcode_t; + +typedef struct wsh_s { + ws_socket_t sock; + char buffer[65536]; + char wbuffer[65536]; + size_t buflen; + issize_t datalen; + issize_t wdatalen; + char *payload; + issize_t plen; + issize_t rplen; + SSL *ssl; + int handshake; + uint8_t down; + int secure; + uint8_t close_sock; +} wsh_t; + +issize_t ws_send_buf(wsh_t *wsh, ws_opcode_t oc); +issize_t ws_feed_buf(wsh_t *wsh, void *data, size_t bytes); + + +issize_t ws_raw_read(wsh_t *wsh, void *data, size_t bytes); +issize_t ws_raw_write(wsh_t *wsh, void *data, size_t bytes); +issize_t ws_read_frame(wsh_t *wsh, ws_opcode_t *oc, uint8_t **data); +issize_t ws_write_frame(wsh_t *wsh, ws_opcode_t oc, void *data, size_t bytes); +int ws_init(wsh_t *wsh, ws_socket_t sock, SSL_CTX *ssl_ctx, int close_sock); +issize_t ws_close(wsh_t *wsh, int16_t reason); +void ws_destroy(wsh_t *wsh); +void init_ssl(void); +void deinit_ssl(void); + + +#ifndef _MSC_VER +static inline uint64_t get_unaligned_uint64(const void *p) +{ + const struct { uint64_t d; } __attribute__((packed)) *pp = p; + return pp->d; +} +#endif + +#endif From 253880f586ba10f87a3a76a23ec7e4ab76bcd1df Mon Sep 17 00:00:00 2001 From: Seven Du Date: Mon, 29 Jul 2013 23:06:12 +0800 Subject: [PATCH 2/2] make websocket work with abyss --- src/mod/xml_int/mod_xml_rpc/Makefile | 4 +- src/mod/xml_int/mod_xml_rpc/mod_xml_rpc.c | 175 +++++++++++++++++++++- src/mod/xml_int/mod_xml_rpc/ws.c | 120 +++++++-------- src/mod/xml_int/mod_xml_rpc/ws.h | 9 +- 4 files changed, 245 insertions(+), 63 deletions(-) diff --git a/src/mod/xml_int/mod_xml_rpc/Makefile b/src/mod/xml_int/mod_xml_rpc/Makefile index cfc0276204..f0585dc324 100644 --- a/src/mod/xml_int/mod_xml_rpc/Makefile +++ b/src/mod/xml_int/mod_xml_rpc/Makefile @@ -60,10 +60,12 @@ $(XMLRPC_DIR)/src/xmlrpc_server_abyss.o\ $(XMLRPC_DIR)/src/xmlrpc_server_cgi.o\ $(XMLRPC_DIR)/src/xmlrpc_string.o\ $(XMLRPC_DIR)/src/xmlrpc_struct.o\ -$(XMLRPC_DIR)/lib/expat/xmltok/xmltok.o +$(XMLRPC_DIR)/lib/expat/xmltok/xmltok.o\ +ws.o LOCAL_CFLAGS = -w -I$(XMLRPC_DIR)/lib/expat/xmlparse -I$(XMLRPC_DIR)/lib/expat/xmltok -I$(XMLRPC_DIR) -I$(XMLRPC_DIR)/include LOCAL_CFLAGS+= -I$(XMLRPC_DIR)/lib/abyss/src -I$(XMLRPC_DIR)/lib/util/include -D_THREAD -D__EXTENSIONS__ +LOCAL_CFLAGS+= -I. -I../../../../libs/sofia-sip/libsofia-sip-ua/su include $(BASE)/build/modmake.rules diff --git a/src/mod/xml_int/mod_xml_rpc/mod_xml_rpc.c b/src/mod/xml_int/mod_xml_rpc/mod_xml_rpc.c index d25009f8fb..9b791b6c53 100644 --- a/src/mod/xml_int/mod_xml_rpc/mod_xml_rpc.c +++ b/src/mod/xml_int/mod_xml_rpc/mod_xml_rpc.c @@ -26,6 +26,7 @@ * Anthony Minessale II * John Wehle * Garmt Boekholt + * Seven Du * * mod_xml_rpc.c -- XML RPC * @@ -69,6 +70,7 @@ #include <../lib/abyss/src/token.h> #include <../lib/abyss/src/http.h> #include <../lib/abyss/src/session.h> +#include "ws.h" SWITCH_MODULE_LOAD_FUNCTION(mod_xml_rpc_load); SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_xml_rpc_shutdown); @@ -87,6 +89,7 @@ static struct { switch_bool_t virtual_host; TServer abyssServer; xmlrpc_registry *registryP; + switch_bool_t enable_websocket; } globals; SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_global_realm, globals.realm); @@ -126,6 +129,8 @@ static switch_status_t do_config(void) default_domain = val; } else if (!strcasecmp(var, "virtual-host")) { globals.virtual_host = switch_true(val); + } else if (!strcasecmp(var, "enable-websocket")) { + globals.enable_websocket = switch_true(val); } } } @@ -541,11 +546,160 @@ static abyss_bool http_directory_auth(TSession *r, char *domain_name) return rval; } +void stop_hook_event_handler(switch_event_t *event) { + char *json; + wsh_t *wsh = (TSession *)event->bind_user_data; + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "got websocket::stophook, closing\n"); + wsh->down++; +} + +void event_handler(switch_event_t *event) { + char *json; + wsh_t *wsh = (TSession *)event->bind_user_data; + switch_event_serialize_json(event, &json); + ws_write_frame(wsh, WSOC_TEXT, json, strlen(json)); + free(json); +} + +#define MAX_EVENT_BIND_SLOTS SWITCH_EVENT_ALL + +abyss_bool websocket_hook(TSession *r) +{ + wsh_t wsh; + int ret; + int i; + ws_opcode_t opcode; + uint8_t *data; + switch_event_node_t *nodes[MAX_EVENT_BIND_SLOTS]; + int node_count = 0; + char *p; + char *key = TableFind(&r->requestHeaderFields, "sec-websocket-key"); + char *version = TableFind(&r->requestHeaderFields, "sec-websocket-version"); + char *proto = TableFind(&r->requestHeaderFields, "sec-websocket-protocol"); + char *upgrade = TableFind(&r->requestHeaderFields, "connection"); + + if (!key || !version || !proto || !upgrade) return FALSE; + if (strncasecmp(upgrade, "Upgrade", 7) || strncasecmp(proto, "websocket", 9)) return FALSE; + + for (i = 0; i < r->requestHeaderFields.size; ++i) { + TTableItem * const fieldP = &r->requestHeaderFields.item[i]; + const char * const fieldValue = fieldP->value; + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "headers %s: %s\n", fieldP->name, fieldValue); + } + + ret = ws_init(&wsh, r, NULL, 0); + if (ret != 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "websocket error %d\n", ret); + return FALSE; + } + + while(!wsh.down && !wsh.handshake) { + ret = ws_handshake_kvp(&wsh, key, version, proto); + if (ret < 0) wsh.down = 1; + } + + if (ret != 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "handshake error %d\n", ret); + return FALSE; + } + + if (switch_event_bind_removable("websocket", SWITCH_EVENT_CUSTOM, "websocket::stophook", stop_hook_event_handler, &wsh, &nodes[node_count++]) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Can't bind!\n"); + node_count--; + } + + while (!wsh.down) { + int bytes = ws_read_frame(&wsh, &opcode, &data); + + if (bytes < 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%d %s\n", opcode, (char *)data); + switch_yield(1000); + continue; + } + + switch (opcode) { + case WSOC_CLOSE: + ws_close(&wsh, 1000); + break; + case WSOC_CONTINUATION: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "continue\n"); + continue; + case WSOC_TEXT: + p = data; + if (!p) continue; + if (!strncasecmp(data, "event ", 6)) { + switch_event_types_t type; + char *subclass; + + if (node_count == MAX_EVENT_BIND_SLOTS - 1) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "cannot subscribe more than %d events\n", node_count); + continue; + } + p += 6; + if (p = strchr(p, ' ')) p++; + if (!strncasecmp(p, "json ", 5)) { + p += 5; + } else if (!strncasecmp(p, "xml ", 4)) { + p += 4; + } else if (!strncasecmp(p, "plain ", 6)) { + p += 6; + } + if (!*p) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "missing event type in [%s]\n", data); + break; + } else { + } + if (subclass = strchr(p, ' ')) { + *subclass++ = '\0'; + if (!*subclass) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "missing subclass\n"); + continue; + } + } else { + subclass = SWITCH_EVENT_SUBCLASS_ANY; + } + + if (switch_name_event(p, &type) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unknown event %s\n", p); + continue; + } + + if (switch_event_bind_removable("websocket", type, subclass, event_handler, &wsh, &nodes[node_count++]) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Can't bind!\n"); + node_count--; + continue; + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Bind %s\n", data); + } + + } + break; + default: + break; + } + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "wsh.down = %d, node_count = %d\n", wsh.down, node_count); + + switch_yield(2000); + while (--node_count >= 0) switch_event_unbind(&nodes[node_count]); + + return FALSE; +} + abyss_bool auth_hook(TSession * r) { char *domain_name, *e; abyss_bool ret = FALSE; + if (globals.enable_websocket && !strncmp(r->requestInfo.uri, "/socket", 7)) { + // Chrome has no Authorization support yet + // https://code.google.com/p/chromium/issues/detail?id=123862 + return websocket_hook(r); + } + if (!strncmp(r->requestInfo.uri, "/domains/", 9)) { domain_name = strdup(r->requestInfo.uri + 9); switch_assert(domain_name); @@ -1059,7 +1213,8 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_xml_rpc_runtime) ServerAddHandler(&globals.abyssServer, auth_hook); ServerSetKeepaliveTimeout(&globals.abyssServer, 5); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Starting HTTP Port %d, DocRoot [%s]\n", globals.port, SWITCH_GLOBAL_dirs.htdocs_dir); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Starting HTTP Port %d, DocRoot [%s]%s\n", + globals.port, SWITCH_GLOBAL_dirs.htdocs_dir, globals.enable_websocket ? " with websocket." : ""); ServerRun(&globals.abyssServer); switch_yield(1000000); @@ -1069,10 +1224,28 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_xml_rpc_runtime) return SWITCH_STATUS_TERM; } +void stop_all_websockets() +{ + switch_event_t *event; + if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, "websocket::stophook") != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG,SWITCH_LOG_ERROR, "Failed to create event!\n"); + } + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "stop", "now"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "stopping all websockets ...\n"); + if (switch_event_fire(&event) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG,SWITCH_LOG_ERROR, "Failed to fire the event!\n"); + switch_event_destroy(&event); + return false; + } +} + /* upon module unload */ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_xml_rpc_shutdown) { + /* Cann't find a way to stop the websockets, use this for a workaround before finding the real one that works */ + stop_all_websockets(); + /* this makes the worker thread (ServerRun) stop */ ServerTerminate(&globals.abyssServer); diff --git a/src/mod/xml_int/mod_xml_rpc/ws.c b/src/mod/xml_int/mod_xml_rpc/ws.c index 35fb4c0f22..1ef76e3cc9 100644 --- a/src/mod/xml_int/mod_xml_rpc/ws.c +++ b/src/mod/xml_int/mod_xml_rpc/ws.c @@ -218,11 +218,8 @@ static void sha1_digest(unsigned char *digest, char *in) #endif -int ws_handshake(wsh_t *wsh) +int ws_handshake_kvp(wsh_t *wsh, char *key, char *version, char *proto) { - char key[256] = ""; - char version[5] = ""; - char proto[256] = ""; char uri[256] = ""; char input[256] = ""; unsigned char output[SHA1_HASH_SIZE] = ""; @@ -231,44 +228,14 @@ int ws_handshake(wsh_t *wsh) issize_t bytes; char *p, *e = 0; - if (wsh->sock == ws_sock_invalid) { + if (!wsh->tsession) { return -3; } - while((bytes = ws_raw_read(wsh, wsh->buffer + wsh->datalen, wsh->buflen - wsh->datalen)) > 0) { - wsh->datalen += bytes; - if (strstr(wsh->buffer, "\r\n\r\n") || strstr(wsh->buffer, "\n\n")) { - break; - } - } - - if (bytes > sizeof(wsh->buffer)) { + if (!*key || !*version || !*proto) { goto err; } - *(wsh->buffer+bytes) = '\0'; - - if (strncasecmp(wsh->buffer, "GET ", 4)) { - goto err; - } - - p = wsh->buffer + 4; - - e = strchr(p, ' '); - if (!e) { - goto err; - } - - strncpy(uri, p, e-p); - - cheezy_get_var(wsh->buffer, "Sec-WebSocket-Key", key, sizeof(key)); - cheezy_get_var(wsh->buffer, "Sec-WebSocket-Version", version, sizeof(version)); - cheezy_get_var(wsh->buffer, "Sec-WebSocket-Protocol", proto, sizeof(proto)); - - if (!*key) { - goto err; - } - snprintf(input, sizeof(input), "%s%s", key, WEBSOCKET_GUID); sha1_digest(output, input); b64encode((unsigned char *)output, SHA1_HASH_SIZE, (unsigned char *)b64, sizeof(b64)); @@ -282,7 +249,6 @@ int ws_handshake(wsh_t *wsh) b64, proto); - ws_raw_write(wsh, respond, strlen(respond)); wsh->handshake = 1; @@ -308,7 +274,9 @@ issize_t ws_raw_read(wsh_t *wsh, void *data, size_t bytes) { issize_t r; int x = 0; + TConn *conn = wsh->tsession->connP; +#if 0 if (wsh->ssl) { do { r = SSL_read(wsh->ssl, data, bytes); @@ -321,21 +289,50 @@ issize_t ws_raw_read(wsh_t *wsh, void *data, size_t bytes) return r; } - - do { - r = recv(wsh->sock, data, bytes, 0); -#ifndef _MSC_VER - if (x++) usleep(10000); -#else - if (x++) Sleep(10); #endif - } while (r == -1 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK || - errno == 35 || errno == 730035 || errno == 2 || errno == 60) && x < 100); - - if (x >= 100) { - r = -1; + + if (!wsh->handshake) { + r = wsh->tsession->connP->buffersize; + memcpy(data, conn->buffer.b, r); + printf("%s\n", conn->buffer.t); + ConnReadInit(conn); + return r; + } else { + const char *readError = NULL; + + // printf(" pos=%d size=%d need=%d\n", conn->bufferpos, conn->buffersize, bytes); + + r = conn->buffersize - conn->bufferpos; + + if (r < 0) { + printf("348 Read Error %d!\n", r); + return 0; + } else if (r == 0) { + ConnRead(conn, 2, NULL, NULL, &readError); + + if (readError) { + // printf("354 Read Error %s\n", readError); + xmlrpc_strfree(readError); + return 0; + } + + r = conn->buffersize - conn->bufferpos; + } + + if (r <= bytes) { + memcpy(data, conn->buffer.b + conn->bufferpos, r); + // ConnReadInit(conn); + conn->bufferpos = conn->buffersize; + ConnReadInit(conn); + return r; + } else { + memcpy(data, conn->buffer.b + conn->bufferpos, bytes); + conn->bufferpos += bytes; + return bytes; + } + } - + return r; } @@ -351,9 +348,11 @@ issize_t ws_raw_write(wsh_t *wsh, void *data, size_t bytes) return r; } - do { - r = send(wsh->sock, data, bytes, 0); - } while (r == -1 && (errno == EAGAIN || errno == EINTR)); + if (ConnWrite(wsh->tsession->connP, data, bytes)) { + return bytes; + } else { + return 0; + } //if (r<0) { //printf("wRITE FAIL: %s\n", strerror(errno)); @@ -408,11 +407,10 @@ static int restore_socket(ws_socket_t sock) #endif - -int ws_init(wsh_t *wsh, ws_socket_t sock, SSL_CTX *ssl_ctx, int close_sock) +int ws_init(wsh_t *wsh, ws_tsession_t *tsession, SSL_CTX *ssl_ctx, int close_sock) { memset(wsh, 0, sizeof(*wsh)); - wsh->sock = sock; + wsh->tsession = tsession; if (!ssl_ctx) { ssl_ctx = globals.ssl_ctx; @@ -425,7 +423,7 @@ int ws_init(wsh_t *wsh, ws_socket_t sock, SSL_CTX *ssl_ctx, int close_sock) wsh->buflen = sizeof(wsh->buffer); wsh->secure = ssl_ctx ? 1 : 0; - setup_socket(sock); + // setup_socket(sock); if (wsh->secure) { int code; @@ -466,6 +464,7 @@ int ws_init(wsh_t *wsh, ws_socket_t sock, SSL_CTX *ssl_ctx, int close_sock) } +/* while (!wsh->down && !wsh->handshake) { int r = ws_handshake(wsh); @@ -474,6 +473,7 @@ int ws_init(wsh_t *wsh, ws_socket_t sock, SSL_CTX *ssl_ctx, int close_sock) return -1; } } +*/ if (wsh->down) { return -1; @@ -560,8 +560,10 @@ issize_t ws_read_frame(wsh_t *wsh, ws_opcode_t *oc, uint8_t **data) } if ((wsh->datalen = ws_raw_read(wsh, wsh->buffer, 14)) < need) { - if ((wsh->datalen += ws_raw_read(wsh, wsh->buffer + wsh->datalen, 14 - wsh->datalen)) < need) { - /* too small - protocol err */ + while (!wsh->down && (wsh->datalen += ws_raw_read(wsh, wsh->buffer + wsh->datalen, 14 - wsh->datalen)) < need) ; + + if (0 && (wsh->datalen += ws_raw_read(wsh, wsh->buffer + wsh->datalen, 14 - wsh->datalen)) < need) { + /* too small - protocol err */ return ws_close(wsh, WS_PROTO_ERR); } } diff --git a/src/mod/xml_int/mod_xml_rpc/ws.h b/src/mod/xml_int/mod_xml_rpc/ws.h index 81368158b3..06fd3b2594 100644 --- a/src/mod/xml_int/mod_xml_rpc/ws.h +++ b/src/mod/xml_int/mod_xml_rpc/ws.h @@ -25,7 +25,10 @@ //#include "sha1.h" #include #include +#include <../lib/abyss/src/session.h> +#include <../lib/abyss/src/conn.h> +typedef TSession ws_tsession_t; struct globals_s { const SSL_METHOD *ssl_method; @@ -34,7 +37,7 @@ struct globals_s { char key[512]; }; -extern struct globals_s globals; +// extern struct globals_s globals; typedef int ws_socket_t; #define ws_sock_invalid -1 @@ -71,6 +74,7 @@ typedef struct wsh_s { uint8_t down; int secure; uint8_t close_sock; + ws_tsession_t *tsession; } wsh_t; issize_t ws_send_buf(wsh_t *wsh, ws_opcode_t oc); @@ -81,11 +85,12 @@ issize_t ws_raw_read(wsh_t *wsh, void *data, size_t bytes); issize_t ws_raw_write(wsh_t *wsh, void *data, size_t bytes); issize_t ws_read_frame(wsh_t *wsh, ws_opcode_t *oc, uint8_t **data); issize_t ws_write_frame(wsh_t *wsh, ws_opcode_t oc, void *data, size_t bytes); -int ws_init(wsh_t *wsh, ws_socket_t sock, SSL_CTX *ssl_ctx, int close_sock); +int ws_init(wsh_t *wsh, ws_tsession_t *tsession, SSL_CTX *ssl_ctx, int close_sock); issize_t ws_close(wsh_t *wsh, int16_t reason); void ws_destroy(wsh_t *wsh); void init_ssl(void); void deinit_ssl(void); +int ws_handshake_kvp(wsh_t *wsh, char *key, char *version, char *proto); #ifndef _MSC_VER