Use the Emscripten websocket library instead of POSIX socket emulation (#33)

* Switched to direct javascript websockets

* Skip the null termination on text messages when pushing

* Moved Emscripten callbacks and some other PR improvements

* STATE_CONNECT -> STATE_CREATE

* Changed disconnect logic

* Review feedback
This commit is contained in:
Björn Ritzl
2021-02-18 22:13:53 +01:00
committed by GitHub
parent b3afb9a276
commit efe9115413
6 changed files with 724 additions and 88 deletions

View File

@@ -0,0 +1,42 @@
#include "websocket.h"
#if defined(__EMSCRIPTEN__)
namespace dmWebsocket
{
EM_BOOL Emscripten_WebSocketOnOpen(int eventType, const EmscriptenWebSocketOpenEvent *websocketEvent, void *userData) {
DebugLog(1, "WebSocket OnOpen");
WebsocketConnection* conn = (WebsocketConnection*)userData;
SetState(conn, STATE_CONNECTED);
HandleCallback(conn, EVENT_CONNECTED, 0, 0);
return EM_TRUE;
}
EM_BOOL Emscripten_WebSocketOnError(int eventType, const EmscriptenWebSocketErrorEvent *websocketEvent, void *userData) {
DebugLog(1, "WebSocket OnError");
WebsocketConnection* conn = (WebsocketConnection*)userData;
conn->m_Status = RESULT_ERROR;
SetState(conn, STATE_DISCONNECTED);
return EM_TRUE;
}
EM_BOOL Emscripten_WebSocketOnClose(int eventType, const EmscriptenWebSocketCloseEvent *websocketEvent, void *userData) {
DebugLog(1, "WebSocket OnClose");
WebsocketConnection* conn = (WebsocketConnection*)userData;
PushMessage(conn, MESSAGE_TYPE_CLOSE, 0, 0);
SetState(conn, STATE_DISCONNECTED);
return EM_TRUE;
}
EM_BOOL Emscripten_WebSocketOnMessage(int eventType, const EmscriptenWebSocketMessageEvent *websocketEvent, void *userData) {
DebugLog(1, "WebSocket OnMessage");
WebsocketConnection* conn = (WebsocketConnection*)userData;
int length = websocketEvent->numBytes;
if (websocketEvent->isText)
{
length--;
}
PushMessage(conn, MESSAGE_TYPE_NORMAL, length, websocketEvent->data);
return EM_TRUE;
}
} // namespace
#endif // __EMSCRIPTEN__

View File

@@ -13,6 +13,7 @@
#if defined(__EMSCRIPTEN__)
#include <emscripten/emscripten.h> // for EM_ASM
#include <emscripten/websocket.h>
#endif
#if defined(WIN32)
@@ -53,10 +54,12 @@ const char* ResultToString(Result err)
const char* StateToString(State err)
{
switch(err) {
STRING_CASE(STATE_CREATE);
STRING_CASE(STATE_CONNECTING);
STRING_CASE(STATE_HANDSHAKE_WRITE);
STRING_CASE(STATE_HANDSHAKE_READ);
STRING_CASE(STATE_CONNECTED);
STRING_CASE(STATE_DISCONNECTING);
STRING_CASE(STATE_DISCONNECTED);
default: return "Unknown error";
};
@@ -136,7 +139,7 @@ void DebugPrint(int level, const char* msg, const void* _bytes, uint32_t num_byt
CloseConnection(conn);
static void SetState(WebsocketConnection* conn, State state)
void SetState(WebsocketConnection* conn, State state)
{
State prev_state = conn->m_State;
if (prev_state != state)
@@ -166,8 +169,6 @@ Result SetStatus(WebsocketConnection* conn, Result status, const char* format, .
// ***************************************************************************************************
// LUA functions
static WebsocketConnection* CreateConnection(const char* url)
{
WebsocketConnection* conn = new WebsocketConnection;
@@ -183,7 +184,7 @@ static WebsocketConnection* CreateConnection(const char* url)
strcpy(conn->m_Url.m_Scheme, "wss");
conn->m_SSL = strcmp(conn->m_Url.m_Scheme, "wss") == 0 ? 1 : 0;
conn->m_State = STATE_CONNECTING;
conn->m_State = STATE_CREATE;
conn->m_Callback = 0;
conn->m_Connection = 0;
@@ -196,6 +197,9 @@ static WebsocketConnection* CreateConnection(const char* url)
#if defined(HAVE_WSLAY)
conn->m_Ctx = 0;
#endif
#if defined(__EMSCRIPTEN__)
conn->m_WS = 0;
#endif
return conn;
}
@@ -214,10 +218,9 @@ static void DestroyConnection(WebsocketConnection* conn)
dmScript::DestroyCallback(conn->m_Callback);
#if defined(__EMSCRIPTEN__)
if (conn->m_Socket != dmSocket::INVALID_SOCKET_HANDLE) {
// We would normally do a shutdown() first, but Emscripten returns ENOSYS
//dmSocket::Shutdown(conn->m_Socket, dmSocket::SHUTDOWNTYPE_READWRITE);
dmSocket::Delete(conn->m_Socket);
if (conn->m_WS)
{
emscripten_websocket_delete(conn->m_WS);
}
#else
if (conn->m_Connection)
@@ -239,21 +242,32 @@ static void CloseConnection(WebsocketConnection* conn)
// we want it to send this message in the polling
if (conn->m_State == STATE_CONNECTED) {
#if defined(HAVE_WSLAY)
// close the connection and immediately transition to the DISCONNECTED
// state
WSL_Close(conn->m_Ctx);
SetState(conn, STATE_DISCONNECTED);
#else
// start disconnecting by closing the WebSocket through the JS API
// we transition to the DISCONNECTED state when we receive the
// Emscripten callback that the connection has closed
emscripten_websocket_close(conn->m_WS, 1000, "CloseConnection");
SetState(conn, STATE_DISCONNECTING);
#endif
}
SetState(conn, STATE_DISCONNECTED);
}
static int FindConnection(WebsocketConnection* conn)
static bool IsConnectionValid(WebsocketConnection* conn)
{
for (int i = 0; i < g_Websocket.m_Connections.Size(); ++i )
if (conn)
{
if (g_Websocket.m_Connections[i] == conn)
return i;
for (int i = 0; i < g_Websocket.m_Connections.Size(); ++i )
{
if (g_Websocket.m_Connections[i] == conn)
return true;
}
}
return -1;
return false;
}
/*#
@@ -306,8 +320,7 @@ static int LuaDisconnect(lua_State* L)
WebsocketConnection* conn = (WebsocketConnection*)lua_touserdata(L, 1);
int i = FindConnection(conn);
if (i != -1)
if (IsConnectionValid(conn))
{
CloseConnection(conn);
}
@@ -326,8 +339,7 @@ static int LuaSend(lua_State* L)
WebsocketConnection* conn = (WebsocketConnection*)lua_touserdata(L, 1);
int i = FindConnection(conn);
if (i == -1)
if (!IsConnectionValid(conn))
return DM_LUA_ERROR("Invalid connection");
if (conn->m_State != STATE_CONNECTED)
@@ -346,9 +358,17 @@ static int LuaSend(lua_State* L)
wslay_event_queue_msg(conn->m_Ctx, &msg); // it makes a copy of the data
#else
dmSocket::Result sr = Send(conn, string, string_length, 0);
if (dmSocket::RESULT_OK != sr)
EMSCRIPTEN_RESULT result;
int write_mode = dmScript::CheckTableNumber(L, 3, "type", DATA_TYPE_BINARY);
if (write_mode == DATA_TYPE_BINARY)
{
result = emscripten_websocket_send_binary(conn->m_WS, (void*)string, string_length);
}
else
{
result = emscripten_websocket_send_utf8_text(conn->m_WS, string);
}
if (result)
{
CLOSE_CONN("Failed to send on websocket");
}
@@ -357,7 +377,7 @@ static int LuaSend(lua_State* L)
return 0;
}
static void HandleCallback(WebsocketConnection* conn, int event, int msg_offset, int msg_length)
void HandleCallback(WebsocketConnection* conn, int event, int msg_offset, int msg_length)
{
if (!dmScript::IsCallbackValid(conn->m_Callback))
return;
@@ -524,14 +544,6 @@ static dmExtension::Result AppInitialize(dmExtension::AppParams* params)
}
#endif
#if defined(__EMSCRIPTEN__)
// avoid mixed content warning if trying to access wss resource from http page
// If not using this, we get EHOSTUNREACH
EM_ASM({
Module["websocket"].url = window["location"]["protocol"].replace("http", "ws") + "//";
});
#endif
g_Websocket.m_Initialized = 1;
if (!g_Websocket.m_Pool)
{
@@ -561,7 +573,6 @@ static dmExtension::Result Initialize(dmExtension::Params* params)
static dmExtension::Result AppFinalize(dmExtension::AppParams* params)
{
dmConnectionPool::Shutdown(g_Websocket.m_Pool, dmSocket::SHUTDOWNTYPE_READWRITE);
return dmExtension::RESULT_OK;
}
@@ -587,17 +598,13 @@ Result PushMessage(WebsocketConnection* conn, MessageType type, int length, cons
msg.m_Length = length;
conn->m_Messages.Push(msg);
// No need to copy itself (html5)
if (buffer != (const uint8_t*)conn->m_Buffer)
if ((conn->m_BufferSize + length) >= conn->m_BufferCapacity)
{
if ((conn->m_BufferSize + length) >= conn->m_BufferCapacity)
{
conn->m_BufferCapacity = conn->m_BufferSize + length + 1;
conn->m_Buffer = (char*)realloc(conn->m_Buffer, conn->m_BufferCapacity);
}
// append to the end of the buffer
memcpy(conn->m_Buffer + conn->m_BufferSize, buffer, length);
conn->m_BufferCapacity = conn->m_BufferSize + length + 1;
conn->m_Buffer = (char*)realloc(conn->m_Buffer, conn->m_BufferCapacity);
}
// append to the end of the buffer
memcpy(conn->m_Buffer + conn->m_BufferSize, buffer, length);
conn->m_BufferSize += length;
conn->m_Buffer[conn->m_BufferCapacity-1] = 0;
@@ -647,23 +654,6 @@ static dmExtension::Result OnUpdate(dmExtension::Params* params)
CLOSE_CONN("Websocket closing for %s (%s)", conn->m_Url.m_Hostname, WSL_ResultToString(r));
continue;
}
#else
int recv_bytes = 0;
dmSocket::Result sr = Receive(conn, conn->m_Buffer, conn->m_BufferCapacity-1, &recv_bytes);
if( sr == dmSocket::RESULT_WOULDBLOCK )
{
continue;
}
if (dmSocket::RESULT_OK == sr)
{
PushMessage(conn, MESSAGE_TYPE_NORMAL, recv_bytes, (const uint8_t*)conn->m_Buffer);
}
else
{
CLOSE_CONN("Websocket failed to receive data %s", dmSocket::ResultToString(sr));
continue;
}
#endif
uint32_t offset = 0;
@@ -735,9 +725,9 @@ static dmExtension::Result OnUpdate(dmExtension::Params* params)
dmSocket::SetReceiveTimeout(conn->m_Socket, 1000);
if (conn->m_SSLSocket)
dmSSLSocket::SetReceiveTimeout(conn->m_SSLSocket, 1000);
#endif
dmSocket::SetBlocking(conn->m_Socket, false);
dmSocket::SetBlocking(conn->m_Socket, false);
#endif
SetState(conn, STATE_CONNECTED);
HandleCallback(conn, EVENT_CONNECTED, 0, 0);
}
@@ -762,7 +752,7 @@ static dmExtension::Result OnUpdate(dmExtension::Params* params)
SetState(conn, STATE_HANDSHAKE_READ);
}
else if (STATE_CONNECTING == conn->m_State)
else if (STATE_CREATE == conn->m_State)
{
if (CheckConnectTimeout(conn))
{
@@ -771,40 +761,33 @@ static dmExtension::Result OnUpdate(dmExtension::Params* params)
}
#if defined(__EMSCRIPTEN__)
conn->m_SSLSocket = dmSSLSocket::INVALID_SOCKET_HANDLE;
EM_ASM({
// https://emscripten.org/docs/porting/networking.html#emulated-posix-tcp-sockets-over-websockets
Module["websocket"]["subprotocol"] = $0 ? UTF8ToString($0) : null;
}, conn->m_Protocol);
char uri_buffer[dmURI::MAX_URI_LEN];
const char* uri;
if (conn->m_Url.m_Path[0] != '\0') {
dmSnPrintf(uri_buffer, sizeof(uri_buffer), "%s%s", conn->m_Url.m_Hostname, conn->m_Url.m_Path);
uri = uri_buffer;
dmSnPrintf(uri_buffer, sizeof(uri_buffer), "%s://%s%s", conn->m_Url.m_Scheme, conn->m_Url.m_Hostname, conn->m_Url.m_Path);
} else {
uri = conn->m_Url.m_Hostname;
dmSnPrintf(uri_buffer, sizeof(uri_buffer), "%s://%s", conn->m_Url.m_Scheme, conn->m_Url.m_Hostname);
}
uri = uri_buffer;
dmSocket::Address address;
dmSocket::Result sr = dmSocket::GetHostByName(uri, &address, true, false);
if (dmSocket::RESULT_OK != sr) {
CLOSE_CONN("Failed to get address from host name '%s': %s", uri, dmSocket::ResultToString(sr));
EmscriptenWebSocketCreateAttributes ws_attrs = {
uri,
conn->m_Protocol,
EM_TRUE
};
EMSCRIPTEN_WEBSOCKET_T ws = emscripten_websocket_new(&ws_attrs);
if (ws < 0)
{
CLOSE_CONN("Failed to connect to '%s:%d': %d", conn->m_Url.m_Hostname, (int)conn->m_Url.m_Port, ws);
continue;
}
conn->m_WS = ws;
sr = dmSocket::New(address.m_family, dmSocket::TYPE_STREAM, dmSocket::PROTOCOL_TCP, &conn->m_Socket);
if (dmSocket::RESULT_OK != sr) {
CLOSE_CONN("Failed to create socket for '%s': %s", conn->m_Url.m_Hostname, dmSocket::ResultToString(sr));
continue;
}
sr = dmSocket::Connect(conn->m_Socket, address, conn->m_Url.m_Port);
if (dmSocket::RESULT_OK != sr) {
CLOSE_CONN("Failed to connect to '%s:%d': %s", conn->m_Url.m_Hostname, (int)conn->m_Url.m_Port, dmSocket::ResultToString(sr));
continue;
}
emscripten_websocket_set_onopen_callback(ws, conn, Emscripten_WebSocketOnOpen);
emscripten_websocket_set_onerror_callback(ws, conn, Emscripten_WebSocketOnError);
emscripten_websocket_set_onclose_callback(ws, conn, Emscripten_WebSocketOnClose);
emscripten_websocket_set_onmessage_callback(ws, conn, Emscripten_WebSocketOnMessage);
SetState(conn, STATE_CONNECTING);
#else
dmSocket::Result sr;
int timeout = g_Websocket.m_Timeout;
@@ -816,9 +799,16 @@ static dmExtension::Result OnUpdate(dmExtension::Params* params)
}
conn->m_Socket = dmConnectionPool::GetSocket(g_Websocket.m_Pool, conn->m_Connection);
conn->m_SSLSocket = dmConnectionPool::GetSSLSocket(g_Websocket.m_Pool, conn->m_Connection);
#endif
SetState(conn, STATE_HANDSHAKE_WRITE);
#endif
}
else if (STATE_CONNECTING == conn->m_State)
{
if (CheckConnectTimeout(conn))
{
CLOSE_CONN("Connect sequence timed out");
continue;
}
}
}

View File

@@ -13,6 +13,10 @@
#if defined(HAVE_WSLAY)
#include <wslay/wslay.h>
#endif
#if defined(__EMSCRIPTEN__)
#include "emscripten/websocket.h"
#endif
#include <dmsdk/dlib/connection_pool.h>
@@ -35,10 +39,12 @@ namespace dmWebsocket
enum State
{
STATE_CREATE,
STATE_CONNECTING,
STATE_HANDSHAKE_WRITE,
STATE_HANDSHAKE_READ,
STATE_CONNECTED,
STATE_DISCONNECTING,
STATE_DISCONNECTED,
};
@@ -104,6 +110,9 @@ namespace dmWebsocket
dmScript::LuaCallbackInfo* m_Callback;
#if defined(HAVE_WSLAY)
wslay_event_context_ptr m_Ctx;
#endif
#if defined(__EMSCRIPTEN__)
EMSCRIPTEN_WEBSOCKET_T m_WS;
#endif
dmURI::Parts m_Url;
dmConnectionPool::HConnection m_Connection;
@@ -132,6 +141,9 @@ namespace dmWebsocket
Result SetStatus(WebsocketConnection* conn, Result status, const char* fmt, ...);
#endif
// Set socket state
void SetState(WebsocketConnection* conn, State state);
// Communication
dmSocket::Result Send(WebsocketConnection* conn, const char* buffer, int length, int* out_sent_bytes);
dmSocket::Result Receive(WebsocketConnection* conn, void* buffer, int length, int* received_bytes);
@@ -142,6 +154,9 @@ namespace dmWebsocket
Result ReceiveHeaders(WebsocketConnection* conn);
Result VerifyHeaders(WebsocketConnection* conn);
// Callback to Lua
void HandleCallback(WebsocketConnection* conn, int event, int msg_offset, int msg_length);
// Messages
Result PushMessage(WebsocketConnection* conn, MessageType type, int length, const uint8_t* msg);
@@ -157,6 +172,12 @@ namespace dmWebsocket
int WSL_GenmaskCallback(wslay_event_context_ptr ctx, uint8_t *buf, size_t len, void *user_data);
const char* WSL_ResultToString(int err);
#endif
#if defined(__EMSCRIPTEN__)
EM_BOOL Emscripten_WebSocketOnOpen(int eventType, const EmscriptenWebSocketOpenEvent *websocketEvent, void *userData);
EM_BOOL Emscripten_WebSocketOnError(int eventType, const EmscriptenWebSocketErrorEvent *websocketEvent, void *userData);
EM_BOOL Emscripten_WebSocketOnClose(int eventType, const EmscriptenWebSocketCloseEvent *websocketEvent, void *userData);
EM_BOOL Emscripten_WebSocketOnMessage(int eventType, const EmscriptenWebSocketMessageEvent *websocketEvent, void *userData);
#endif
// Random numbers (PCG)
typedef struct { uint64_t state; uint64_t inc; } pcg32_random_t;