Patch from Rob Charlton to use rpc:call instead of spawn and to make the registered process argument to handlecall optional

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@11542 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Andrew Thompson 2009-01-28 19:26:37 +00:00
parent 3a47504e2e
commit a6717df8d0
4 changed files with 103 additions and 14 deletions

View File

@ -122,6 +122,24 @@ void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *ta
ei_encode_switch_event_headers(ebuf, event);
}
/* function to make rpc call to remote node to retrieve a pid -
calls module:function(Ref). The response comes back as
{rex, {Ref, Pid}}
*/
int ei_pid_from_rpc(struct ei_cnode_s *ec, int sockfd, erlang_ref *ref, char *module, char *function)
{
ei_x_buff buf;
ei_x_new(&buf);
ei_x_encode_list_header(&buf, 1);
ei_init_ref(ec, ref);
ei_x_encode_ref(&buf, ref);
ei_x_encode_empty_list(&buf);
ei_rpc_to(ec, sockfd, module, function, buf.buff, buf.index);
ei_x_free(&buf);
return 0;
}
/* function to spawn a process on a remote node */
int ei_spawn(struct ei_cnode_s *ec, int sockfd, erlang_ref *ref, char *module, char *function, int argc, char **argv)

View File

@ -37,6 +37,8 @@
static char *MARKER = "1";
static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf);
static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj)
{
switch_bool_t r = SWITCH_TRUE;
@ -519,15 +521,18 @@ static switch_status_t handle_msg_bind(listener_t *listener, erlang_msg *msg, ei
return SWITCH_STATUS_SUCCESS;
}
/* {handlecall,<uuid>,<handler process registered name>} */
static switch_status_t handle_msg_handlecall(listener_t *listener, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
/* {handlecall,<uuid>,<handler process registered name>}
or
{handlecall,<uuid>} to send messages back to the sender
*/
static switch_status_t handle_msg_handlecall(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
{
char reg_name[MAXATOMLEN];
char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
if (arity != 3 ||
ei_decode_string_or_binary(buf->buff, &buf->index, SWITCH_UUID_FORMATTED_LENGTH, uuid_str) ||
ei_decode_atom(buf->buff, &buf->index, reg_name)) {
if (arity < 2 || arity > 3 ||
(arity==3 && ei_decode_atom(buf->buff, &buf->index, reg_name)) ||
ei_decode_string_or_binary(buf->buff, &buf->index, SWITCH_UUID_FORMATTED_LENGTH, uuid_str)) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
@ -535,7 +540,8 @@ static switch_status_t handle_msg_handlecall(listener_t *listener, int arity, ei
switch_core_session_t *session;
if (!switch_strlen_zero(uuid_str) && (session = switch_core_session_locate(uuid_str))) {
/* create a new session list element and attach it to this listener */
if (attach_call_to_registered_process(listener, reg_name, session)) {
if ((arity==2 && attach_call_to_pid(listener, &msg->from, session)) ||
(arity==3 && attach_call_to_registered_process(listener, reg_name, session))) {
ei_x_encode_atom(rbuf, "ok");
} else {
ei_x_encode_tuple_header(rbuf, 2);
@ -551,6 +557,28 @@ static switch_status_t handle_msg_handlecall(listener_t *listener, int arity, ei
return SWITCH_STATUS_SUCCESS;
}
/* catch the response to ei_rpc_to (which comes back as {rex, {Ref, Pid}}
The {Ref,Pid} bit can be handled by handle_ref_tuple
*/
static switch_status_t handle_msg_rpcresponse(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff *buf, ei_x_buff *rbuf)
{
int type, size, arity2, tmpindex;
ei_get_type(buf->buff, &buf->index, &type, &size);
switch(type) {
case ERL_SMALL_TUPLE_EXT :
case ERL_LARGE_TUPLE_EXT :
tmpindex = buf->index;
ei_decode_tuple_header(buf->buff, &tmpindex, &arity2);
return handle_ref_tuple(listener,msg,buf,rbuf);
default:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Unknown rpc response\n");
break;
}
/* no reply */
return SWITCH_STATUS_FALSE;
}
static switch_status_t handle_msg_tuple(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf)
{
char tupletag[MAXATOMLEN];
@ -583,7 +611,9 @@ static switch_status_t handle_msg_tuple(listener_t *listener, erlang_msg *msg, e
} else if (!strncmp(tupletag, "bind", MAXATOMLEN)) {
ret = handle_msg_bind(listener,msg,buf,rbuf);
} else if (!strncmp(tupletag, "handlecall", MAXATOMLEN)) {
ret = handle_msg_handlecall(listener,arity,buf,rbuf);
ret = handle_msg_handlecall(listener,msg,arity,buf,rbuf);
} else if (!strncmp(tupletag, "rex", MAXATOMLEN)) {
ret = handle_msg_rpcresponse(listener,msg,arity,buf,rbuf);
} else {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
@ -701,7 +731,8 @@ static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg *msg, e
switch_core_hash_insert(listener->spawn_pid_hash, hash, pid);
}
return SWITCH_STATUS_SUCCESS;
/* no reply */
return SWITCH_STATUS_FALSE;
}

View File

@ -187,8 +187,8 @@ static void event_handler(switch_event_t *event)
l = lp;
lp = lp->next;
/* test all of the sessions attached to this event in case
one of them should receive it as well
/* test all of the sessions attached to this listener in case
one of them should receive the event as well
*/
send_event_to_attached_sessions(l,event);
@ -684,9 +684,10 @@ static void listener_main_loop(listener_t *listener)
ei_x_buff rbuf;
ei_x_new_with_version(&rbuf);
switch_mutex_lock(listener->sock_mutex);
/* do we need the mutex when reading? */
/*switch_mutex_lock(listener->sock_mutex);*/
status = ei_xreceive_msg_tmo(listener->sockfd, &msg, &buf, 100);
switch_mutex_unlock(listener->sock_mutex);
/*switch_mutex_unlock(listener->sock_mutex);*/
switch(status) {
case ERL_TICK :
@ -780,7 +781,9 @@ static switch_bool_t check_inbound_acl(listener_t* listener)
ei_x_encode_atom(&rbuf, "error");
ei_x_encode_atom(&rbuf, "acldeny");
switch_mutex_lock(listener->sock_mutex);
ei_send(listener->sockfd, &msg.from, rbuf.buff, rbuf.index);
switch_mutex_unlock(listener->sock_mutex);
#ifdef EI_DEBUG
ei_x_print_msg(&rbuf, &msg.from, 1);
#endif
@ -1026,6 +1029,33 @@ session_elem_t* attach_call_to_registered_process(listener_t* listener, char* re
return session_element;
}
session_elem_t* attach_call_to_pid(listener_t* listener, erlang_pid* pid, switch_core_session_t *session)
{
/* create a session list element */
session_elem_t* session_element = NULL;
if (!(session_element = switch_core_alloc(switch_core_session_get_pool(session), sizeof(*session_element)))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to allocate session element\n");
}
else {
if (SWITCH_STATUS_SUCCESS != switch_core_session_read_lock(session)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to get session read lock\n");
}
else {
session_element->session = session;
session_element->process.type = ERLANG_PID;
memcpy(&session_element->process.pid, pid, sizeof(erlang_pid));
switch_set_flag(session_element, LFLAG_SESSION_ALIVE);
switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT);
switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session));
switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session));
/* attach the session to the listener */
add_session_elem_to_listener(listener,session_element);
ei_link(listener, ei_self(listener->ec), pid);
}
}
return session_element;
}
session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *module, char *function, switch_core_session_t *session)
{
@ -1039,7 +1069,7 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to get session read lock\n");
}
else {
char *argv[1], hash[100];
char hash[100];
int i = 0;
session_element->session = session;
erlang_pid *pid;
@ -1061,13 +1091,20 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul
ei_x_encode_atom(&rbuf, "new_pid");
ei_x_encode_ref(&rbuf, &ref);
ei_x_encode_pid(&rbuf, ei_self(listener->ec));
/* should lock with mutex? */
ei_reg_send(listener->ec, listener->sockfd, module, rbuf.buff, rbuf.index);
#ifdef EI_DEBUG
ei_x_print_reg_msg(&rbuf, module, 1);
#endif
ei_x_free(&rbuf);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "rpc call: %s:%s(Ref)\n", module, function);
/* should lock with mutex? */
ei_pid_from_rpc(listener->ec, listener->sockfd, &ref, module, function);
/*
char *argv[1];
ei_spawn(listener->ec, listener->sockfd, &ref, module, function, 0, argv);
*/
}
ei_hash_ref(&ref, hash);
@ -1091,7 +1128,8 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul
switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT);
switch_clear_flag(session_element, LFLAG_WAITING_FOR_PID);
ei_link(listener, ei_self(listener->ec), pid);
/* this hangs because it can never get hold of the socket mutex */
ei_link(listener, ei_self(listener->ec), pid);
}
}
return session_element;

View File

@ -192,6 +192,7 @@ int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff
void ei_link(listener_t *listener, erlang_pid *from, erlang_pid *to);
void ei_encode_switch_event_headers(ei_x_buff *ebuf, switch_event_t *event);
void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *tag);
int ei_pid_from_rpc(struct ei_cnode_s *ec, int sockfd, erlang_ref *ref, char *module, char *function);
int ei_spawn(struct ei_cnode_s *ec, int sockfd, erlang_ref *ref, char *module, char *function, int argc, char **argv);
void ei_init_ref(struct ei_cnode_s *ec, erlang_ref *ref);
void ei_x_print_reg_msg(ei_x_buff *buf, char *dest, int send);
@ -215,6 +216,7 @@ switch_status_t initialise_ei(struct ei_cnode_s *ec);
/* mod_erlang_event.c */
session_elem_t* attach_call_to_registered_process(listener_t* listener, char* reg_name, switch_core_session_t *session);
session_elem_t* attach_call_to_pid(listener_t* listener, erlang_pid* pid, switch_core_session_t *session);
session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *module, char *function, switch_core_session_t *session);
/* For Emacs: