Issue 8: Handle the websocket close event

This commit is contained in:
JCash 2020-09-27 16:54:22 +02:00
parent 18a768774f
commit bd8569f49a
6 changed files with 114 additions and 56 deletions

View File

@ -10,3 +10,13 @@ We recommend using a link to a zip file of a [specific release](https://github.c
## API reference ## API reference
https://defold.com/extension-websocket/api/ https://defold.com/extension-websocket/api/
## Debugging
In order to make it easier to debug this extension, we provide a `game.project` setting `websocket.debug`.
Set it to:
* `0` to disable debugging (i.e. no debug output).
* `1` to display state changes.
* `2` to display the messages sent and received.

View File

@ -20,10 +20,13 @@ local function websocket_callback(self, conn, data)
print("Connected " .. conn) print("Connected " .. conn)
-- self.connection = conn -- self.connection = conn
elseif data.event == websocket.EVENT_ERROR then elseif data.event == websocket.EVENT_ERROR then
print("Error:", data.error) print("Error:", data.message)
elseif data.event == websocket.EVENT_MESSAGE then elseif data.event == websocket.EVENT_MESSAGE then
print("Receiving: '" .. tostring(data.message) .. "'") print("Receiving: '" .. tostring(data.message) .. "'")
end end
elseif data.event == websocket.EVENT_DISCONNECTED then
print("Disconnected: '" .. tostring(data.message) .. "'")
end
end end
function init(self) function init(self)

View File

@ -56,7 +56,9 @@ dmSocket::Result Receive(WebsocketConnection* conn, void* buffer, int length, in
sr = dmSocket::Receive(conn->m_Socket, buffer, length, received_bytes); sr = dmSocket::Receive(conn->m_Socket, buffer, length, received_bytes);
int num_bytes = received_bytes ? (uint32_t)*received_bytes : 0; int num_bytes = received_bytes ? (uint32_t)*received_bytes : 0;
if (sr == dmSocket::RESULT_OK && num_bytes > 0) if (sr == dmSocket::RESULT_OK && num_bytes == 0)
return dmSocket::RESULT_CONNABORTED;
if (sr == dmSocket::RESULT_OK)
DebugPrint(2, "Received bytes:", buffer, num_bytes); DebugPrint(2, "Received bytes:", buffer, num_bytes);
return sr; return sr;

View File

@ -133,6 +133,8 @@ Result SetStatus(WebsocketConnection* conn, Result status, const char* format, .
conn->m_BufferSize = vsnprintf(conn->m_Buffer, conn->m_BufferCapacity, format, lst); conn->m_BufferSize = vsnprintf(conn->m_Buffer, conn->m_BufferCapacity, format, lst);
va_end(lst); va_end(lst);
conn->m_Status = status; conn->m_Status = status;
DebugLog(1, "STATUS: '%s' len: %u", conn->m_Buffer, conn->m_BufferSize);
} }
return status; return status;
} }
@ -337,14 +339,8 @@ static void HandleCallback(WebsocketConnection* conn, int event, int msg_offset,
lua_pushinteger(L, event); lua_pushinteger(L, event);
lua_setfield(L, -2, "event"); lua_setfield(L, -2, "event");
if (EVENT_ERROR == event) {
lua_pushlstring(L, conn->m_Buffer, conn->m_BufferSize);
lua_setfield(L, -2, "error");
}
else if (EVENT_MESSAGE == event) {
lua_pushlstring(L, conn->m_Buffer + msg_offset, msg_length); lua_pushlstring(L, conn->m_Buffer + msg_offset, msg_length);
lua_setfield(L, -2, "message"); lua_setfield(L, -2, "message");
}
dmScript::PCall(L, 3, 0); dmScript::PCall(L, 3, 0);
@ -464,11 +460,34 @@ static dmExtension::Result Finalize(dmExtension::Params* params)
return dmExtension::RESULT_OK; return dmExtension::RESULT_OK;
} }
Result PushMessage(WebsocketConnection* conn, int length) Result PushMessage(WebsocketConnection* conn, MessageType type, int length, const uint8_t* buffer)
{ {
if (conn->m_Messages.Full()) if (conn->m_Messages.Full())
conn->m_Messages.OffsetCapacity(4); conn->m_Messages.OffsetCapacity(4);
conn->m_Messages.Push(length);
Message msg;
msg.m_Type = (uint32_t)type;
msg.m_Length = length;
conn->m_Messages.Push(msg);
// No need to copy itself (html5)
if (buffer != (const uint8_t*)conn->m_Buffer)
{
if ((conn->m_BufferSize + length) >= conn->m_BufferCapacity)
{
conn->m_BufferCapacity = conn->m_BufferSize + length + 1;
conn->m_Buffer = (char*)realloc(conn->m_Buffer, conn->m_BufferCapacity);
}
// append to the end of the buffer
memcpy(conn->m_Buffer + conn->m_BufferSize, buffer, length);
}
conn->m_BufferSize += length;
conn->m_Buffer[conn->m_BufferCapacity-1] = 0;
// Instead of printing from the incoming buffer, we print from our own, to make sure it looks ok
DebugPrint(2, __FUNCTION__, conn->m_Buffer+conn->m_BufferSize-length, length);
return dmWebsocket::RESULT_OK; return dmWebsocket::RESULT_OK;
} }
@ -484,10 +503,13 @@ static dmExtension::Result OnUpdate(dmExtension::Params* params)
{ {
if (RESULT_OK != conn->m_Status) if (RESULT_OK != conn->m_Status)
{ {
HandleCallback(conn, EVENT_ERROR, 0, 0); HandleCallback(conn, EVENT_ERROR, 0, conn->m_BufferSize);
}
HandleCallback(conn, EVENT_DISCONNECTED, 0, 0); HandleCallback(conn, EVENT_DISCONNECTED, 0, 0);
}
else
{
HandleCallback(conn, EVENT_DISCONNECTED, 0, conn->m_BufferSize);
}
g_Websocket.m_Connections.EraseSwap(i); g_Websocket.m_Connections.EraseSwap(i);
--i; --i;
@ -503,12 +525,6 @@ static dmExtension::Result OnUpdate(dmExtension::Params* params)
CLOSE_CONN("Websocket closing for %s (%s)", conn->m_Url.m_Hostname, WSL_ResultToString(r)); CLOSE_CONN("Websocket closing for %s (%s)", conn->m_Url.m_Hostname, WSL_ResultToString(r));
continue; continue;
} }
r = WSL_WantsExit(conn->m_Ctx);
if (0 != r)
{
CLOSE_CONN("Websocket received close event for %s", conn->m_Url.m_Hostname);
continue;
}
#else #else
int recv_bytes = 0; int recv_bytes = 0;
dmSocket::Result sr = Receive(conn, conn->m_Buffer, conn->m_BufferCapacity-1, &recv_bytes); dmSocket::Result sr = Receive(conn, conn->m_Buffer, conn->m_BufferCapacity-1, &recv_bytes);
@ -519,9 +535,7 @@ static dmExtension::Result OnUpdate(dmExtension::Params* params)
if (dmSocket::RESULT_OK == sr) if (dmSocket::RESULT_OK == sr)
{ {
PushMessage(conn, recv_bytes); PushMessage(conn, MESSAGE_TYPE_NORMAL, recv_bytes, (const uint8_t*)conn->m_Buffer);
conn->m_BufferSize += recv_bytes;
conn->m_Buffer[conn->m_BufferCapacity-1] = 0;
} }
else else
{ {
@ -531,14 +545,32 @@ static dmExtension::Result OnUpdate(dmExtension::Params* params)
#endif #endif
uint32_t offset = 0; uint32_t offset = 0;
bool close_received = false;
for (uint32_t i = 0; i < conn->m_Messages.Size(); ++i) for (uint32_t i = 0; i < conn->m_Messages.Size(); ++i)
{ {
uint32_t length = conn->m_Messages[i]; const Message& msg = conn->m_Messages[i];
HandleCallback(conn, EVENT_MESSAGE, offset, length);
offset += length; if (EVENT_DISCONNECTED == msg.m_Type)
} {
conn->m_Status = RESULT_OK;
CloseConnection(conn);
// Put the message at the front of the buffer
conn->m_Messages.SetSize(0); conn->m_Messages.SetSize(0);
conn->m_BufferSize = 0; conn->m_BufferSize = 0;
PushMessage(conn, MESSAGE_TYPE_CLOSE, msg.m_Length, (const uint8_t*)conn->m_Buffer+offset);
close_received = true;
break;
}
HandleCallback(conn, EVENT_MESSAGE, offset, msg.m_Length);
offset += msg.m_Length;
}
if (!close_received) // saving the close message for next step
{
conn->m_Messages.SetSize(0);
conn->m_BufferSize = 0;
}
} }
else if (STATE_HANDSHAKE_READ == conn->m_State) else if (STATE_HANDSHAKE_READ == conn->m_State)
{ {

View File

@ -60,6 +60,18 @@ namespace dmWebsocket
EVENT_ERROR, EVENT_ERROR,
}; };
enum MessageType
{
MESSAGE_TYPE_NORMAL = 0,
MESSAGE_TYPE_CLOSE = 1,
};
struct Message
{
uint32_t m_Length:30;
uint32_t m_Type:2;
};
struct WebsocketConnection struct WebsocketConnection
{ {
dmScript::LuaCallbackInfo* m_Callback; dmScript::LuaCallbackInfo* m_Callback;
@ -70,7 +82,7 @@ namespace dmWebsocket
dmConnectionPool::HConnection m_Connection; dmConnectionPool::HConnection m_Connection;
dmSocket::Socket m_Socket; dmSocket::Socket m_Socket;
dmSSLSocket::Socket m_SSLSocket; dmSSLSocket::Socket m_SSLSocket;
dmArray<uint32_t> m_Messages; // lengths of the messages in the data buffer dmArray<Message> m_Messages; // lengths of the messages in the data buffer
uint8_t m_Key[16]; uint8_t m_Key[16];
State m_State; State m_State;
char* m_Buffer; char* m_Buffer;
@ -100,7 +112,7 @@ namespace dmWebsocket
Result VerifyHeaders(WebsocketConnection* conn); Result VerifyHeaders(WebsocketConnection* conn);
// Messages // Messages
Result PushMessage(WebsocketConnection* conn, int length); Result PushMessage(WebsocketConnection* conn, MessageType type, int length, const uint8_t* msg);
#if defined(HAVE_WSLAY) #if defined(HAVE_WSLAY)
// Wslay callbacks // Wslay callbacks
@ -108,7 +120,6 @@ namespace dmWebsocket
void WSL_Exit(wslay_event_context_ptr ctx); void WSL_Exit(wslay_event_context_ptr ctx);
int WSL_Close(wslay_event_context_ptr ctx); int WSL_Close(wslay_event_context_ptr ctx);
int WSL_Poll(wslay_event_context_ptr ctx); int WSL_Poll(wslay_event_context_ptr ctx);
int WSL_WantsExit(wslay_event_context_ptr ctx);
ssize_t WSL_RecvCallback(wslay_event_context_ptr ctx, uint8_t *buf, size_t len, int flags, void *user_data); ssize_t WSL_RecvCallback(wslay_event_context_ptr ctx, uint8_t *buf, size_t len, int flags, void *user_data);
ssize_t WSL_SendCallback(wslay_event_context_ptr ctx, const uint8_t *data, size_t len, int flags, void *user_data); ssize_t WSL_SendCallback(wslay_event_context_ptr ctx, const uint8_t *data, size_t len, int flags, void *user_data);
void WSL_OnMsgRecvCallback(wslay_event_context_ptr ctx, const struct wslay_event_on_msg_recv_arg *arg, void *user_data); void WSL_OnMsgRecvCallback(wslay_event_context_ptr ctx, const struct wslay_event_on_msg_recv_arg *arg, void *user_data);

View File

@ -54,8 +54,8 @@ void WSL_Exit(wslay_event_context_ptr ctx)
int WSL_Close(wslay_event_context_ptr ctx) int WSL_Close(wslay_event_context_ptr ctx)
{ {
const char* reason = "Client wants to close"; const char* reason = "";
wslay_event_queue_close(ctx, 0, (const uint8_t*)reason, strlen(reason)); wslay_event_queue_close(ctx, WSLAY_CODE_NORMAL_CLOSURE, (const uint8_t*)reason, 0);
return 0; return 0;
} }
@ -68,14 +68,6 @@ int WSL_Poll(wslay_event_context_ptr ctx)
return r; return r;
} }
int WSL_WantsExit(wslay_event_context_ptr ctx)
{
if ((wslay_event_get_close_sent(ctx) && wslay_event_get_close_received(ctx))) {
return 1;
}
return 0;
}
ssize_t WSL_RecvCallback(wslay_event_context_ptr ctx, uint8_t *buf, size_t len, int flags, void *user_data) ssize_t WSL_RecvCallback(wslay_event_context_ptr ctx, uint8_t *buf, size_t len, int flags, void *user_data)
{ {
WebsocketConnection* conn = (WebsocketConnection*)user_data; WebsocketConnection* conn = (WebsocketConnection*)user_data;
@ -132,21 +124,29 @@ void WSL_OnMsgRecvCallback(wslay_event_context_ptr ctx, const struct wslay_event
WebsocketConnection* conn = (WebsocketConnection*)user_data; WebsocketConnection* conn = (WebsocketConnection*)user_data;
if (arg->opcode == WSLAY_TEXT_FRAME || arg->opcode == WSLAY_BINARY_FRAME) if (arg->opcode == WSLAY_TEXT_FRAME || arg->opcode == WSLAY_BINARY_FRAME)
{ {
if ((conn->m_BufferSize + arg->msg_length) >= conn->m_BufferCapacity) PushMessage(conn, MESSAGE_TYPE_NORMAL, arg->msg_length, arg->msg);
{
conn->m_BufferCapacity = conn->m_BufferSize + arg->msg_length + 1;
conn->m_Buffer = (char*)realloc(conn->m_Buffer, conn->m_BufferCapacity);
}
// append to the end of the buffer
memcpy(conn->m_Buffer + conn->m_BufferSize, arg->msg, arg->msg_length);
conn->m_BufferSize += arg->msg_length;
PushMessage(conn, arg->msg_length);
DebugPrint(2, __FUNCTION__, conn->m_Buffer+conn->m_BufferSize-arg->msg_length, arg->msg_length);
} else if (arg->opcode == WSLAY_CONNECTION_CLOSE) } else if (arg->opcode == WSLAY_CONNECTION_CLOSE)
{ {
// TODO: Store the reason // The first two bytes is the close code
const uint8_t* reason = (const uint8_t*)"";
size_t len = arg->msg_length;
if (arg->msg_length > 2)
{
reason = arg->msg + 2;
len -= 2;
}
char buffer[1024];
len = dmSnPrintf(buffer, sizeof(buffer), "Server closing (%u). Reason: '%s'", wslay_event_get_status_code_received(ctx), reason);
PushMessage(conn, MESSAGE_TYPE_CLOSE, len, (const uint8_t*)buffer);
if (!wslay_event_get_close_sent(ctx))
{
wslay_event_queue_close(ctx, arg->status_code, (const uint8_t*)buffer, len);
}
DebugLog(1, "%s", buffer);
} }
} }