From cba11de5f1eaa28edab0695cc39a1e5d7536c16e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Ritzl?= Date: Wed, 17 Mar 2021 12:59:01 +0100 Subject: [PATCH] Run DNS lookup in a separate thread (#37) * Moved connection pool dial to a thread * Make sure to join the thread when done * Cleanup in AppInitialize. Early exit if pool creation failed. * Only check connection timeout for emscripten --- websocket/src/websocket.cpp | 61 ++++++++++++++++++++++--------------- websocket/src/websocket.h | 2 ++ 2 files changed, 38 insertions(+), 25 deletions(-) diff --git a/websocket/src/websocket.cpp b/websocket/src/websocket.cpp index 1330337..c24f27a 100644 --- a/websocket/src/websocket.cpp +++ b/websocket/src/websocket.cpp @@ -8,6 +8,7 @@ #include "script_util.h" #include #include +#include #include #include // isprint et al @@ -193,6 +194,7 @@ static WebsocketConnection* CreateConnection(const char* url) conn->m_Status = RESULT_OK; conn->m_HasHandshakeData = 0; conn->m_HandshakeResponse = 0; + conn->m_ConnectionThread = 0; #if defined(HAVE_WSLAY) conn->m_Ctx = 0; @@ -232,6 +234,13 @@ static void DestroyConnection(WebsocketConnection* conn) free((void*)conn->m_Buffer); + + if (conn->m_ConnectionThread) + { + dmThread::Join(conn->m_ConnectionThread); + conn->m_ConnectionThread = 0; + } + delete conn; DebugLog(2, "DestroyConnection: %p", conn); } @@ -520,6 +529,7 @@ static dmExtension::Result AppInitialize(dmExtension::AppParams* params) g_Websocket.m_Connections.SetCapacity(4); g_Websocket.m_Channel = 0; g_Websocket.m_Pool = 0; + g_Websocket.m_Initialized = 0; dmConnectionPool::Params pool_params; pool_params.m_MaxConnections = dmConfigFile::GetInt(params->m_ConfigFile, "websocket.max_connections", 2); @@ -532,6 +542,7 @@ static dmExtension::Result AppInitialize(dmExtension::AppParams* params) if (dmConnectionPool::RESULT_OK != result) { dmLogError("Failed to create connection pool: %d", result); + return dmExtension::RESULT_INIT_ERROR; } // We can do without the channel, it will then fallback to the dmSocket::GetHostname (as opposed to dmDNS::GetHostname) @@ -540,23 +551,11 @@ static dmExtension::Result AppInitialize(dmExtension::AppParams* params) if (dmDNS::RESULT_OK != dns_result) { - dmLogError("Failed to create connection pool: %d", dns_result); + dmLogError("Failed to create DNS channel: %d", dns_result); } #endif g_Websocket.m_Initialized = 1; - if (!g_Websocket.m_Pool) - { - if (!g_Websocket.m_Pool) - { - dmLogInfo("pool is null!"); - dmConnectionPool::Delete(g_Websocket.m_Pool); - } - - dmLogInfo("%s extension not initialized", MODULE_NAME); - g_Websocket.m_Initialized = 0; - } - return dmExtension::RESULT_OK; } @@ -622,6 +621,19 @@ static bool CheckConnectTimeout(WebsocketConnection* conn) return t >= conn->m_ConnectTimeout; } +static void ConnectionWorker(void* _conn) +{ + WebsocketConnection* conn = (WebsocketConnection*)_conn; + dmSocket::Result sr; + dmConnectionPool::Result pool_result = dmConnectionPool::Dial(g_Websocket.m_Pool, conn->m_Url.m_Hostname, conn->m_Url.m_Port, g_Websocket.m_Channel, conn->m_SSL, g_Websocket.m_Timeout, &conn->m_Connection, &sr); + if (dmConnectionPool::RESULT_OK != pool_result) + { + CLOSE_CONN("Failed to open connection: %s", dmSocket::ResultToString(sr)); + return; + } + SetState(conn, STATE_HANDSHAKE_WRITE); +} + static dmExtension::Result OnUpdate(dmExtension::Params* params) { uint32_t size = g_Websocket.m_Connections.Size(); @@ -739,6 +751,13 @@ static dmExtension::Result OnUpdate(dmExtension::Params* params) continue; } + if (conn->m_ConnectionThread) + { + dmThread::Join(conn->m_ConnectionThread); + conn->m_ConnectionThread = 0; + } + conn->m_Socket = dmConnectionPool::GetSocket(g_Websocket.m_Pool, conn->m_Connection); + conn->m_SSLSocket = dmConnectionPool::GetSSLSocket(g_Websocket.m_Pool, conn->m_Connection); Result result = SendClientHandshake(conn); if (RESULT_WOULDBLOCK == result) { @@ -799,28 +818,20 @@ static dmExtension::Result OnUpdate(dmExtension::Params* params) emscripten_websocket_set_onerror_callback(ws, conn, Emscripten_WebSocketOnError); emscripten_websocket_set_onclose_callback(ws, conn, Emscripten_WebSocketOnClose); emscripten_websocket_set_onmessage_callback(ws, conn, Emscripten_WebSocketOnMessage); - SetState(conn, STATE_CONNECTING); #else - dmSocket::Result sr; - int timeout = g_Websocket.m_Timeout; - dmConnectionPool::Result pool_result = dmConnectionPool::Dial(g_Websocket.m_Pool, conn->m_Url.m_Hostname, conn->m_Url.m_Port, g_Websocket.m_Channel, conn->m_SSL, timeout, &conn->m_Connection, &sr); - if (dmConnectionPool::RESULT_OK != pool_result) - { - CLOSE_CONN("Failed to open connection: %s", dmSocket::ResultToString(sr)); - continue; - } - conn->m_Socket = dmConnectionPool::GetSocket(g_Websocket.m_Pool, conn->m_Connection); - conn->m_SSLSocket = dmConnectionPool::GetSSLSocket(g_Websocket.m_Pool, conn->m_Connection); - SetState(conn, STATE_HANDSHAKE_WRITE); + conn->m_ConnectionThread = dmThread::New((dmThread::ThreadStart)ConnectionWorker, 0x80000, conn, "WebSocketConnectionThread"); #endif + SetState(conn, STATE_CONNECTING); } else if (STATE_CONNECTING == conn->m_State) { +#if defined(__EMSCRIPTEN__) if (CheckConnectTimeout(conn)) { CLOSE_CONN("Connect sequence timed out"); continue; } +#endif } } diff --git a/websocket/src/websocket.h b/websocket/src/websocket.h index 37766ad..4589d27 100644 --- a/websocket/src/websocket.h +++ b/websocket/src/websocket.h @@ -24,6 +24,7 @@ #include #include #include +#include namespace dmCrypt { @@ -118,6 +119,7 @@ namespace dmWebsocket dmConnectionPool::HConnection m_Connection; dmSocket::Socket m_Socket; dmSSLSocket::Socket m_SSLSocket; + dmThread::Thread m_ConnectionThread; dmArray m_Messages; // lengths of the messages in the data buffer uint64_t m_ConnectTimeout; uint8_t m_Key[16];