3 Commits
2.1.0 ... 2.2.2

Author SHA1 Message Date
Björn Ritzl
97cca427d7 Reduced thread name length (#39)
pthread_setname_np() restricts thread name length to 16 characters.
2021-03-18 20:41:03 +01:00
Björn Ritzl
cba11de5f1 Run DNS lookup in a separate thread (#37)
* Moved connection pool dial to a thread
* Make sure to join the thread when done
* Cleanup in AppInitialize. Early exit if pool creation failed.
* Only check connection timeout for emscripten
2021-03-17 12:59:01 +01:00
Björn Ritzl
4e67c4dfaa Add the port to the url for html5 builds (#35)
Fixes #34
2021-03-10 11:56:26 +01:00
2 changed files with 53 additions and 28 deletions

View File

@@ -8,6 +8,7 @@
#include "script_util.h" #include "script_util.h"
#include <dmsdk/dlib/connection_pool.h> #include <dmsdk/dlib/connection_pool.h>
#include <dmsdk/dlib/dns.h> #include <dmsdk/dlib/dns.h>
#include <dmsdk/dlib/thread.h>
#include <dmsdk/dlib/sslsocket.h> #include <dmsdk/dlib/sslsocket.h>
#include <ctype.h> // isprint et al #include <ctype.h> // isprint et al
@@ -193,6 +194,7 @@ static WebsocketConnection* CreateConnection(const char* url)
conn->m_Status = RESULT_OK; conn->m_Status = RESULT_OK;
conn->m_HasHandshakeData = 0; conn->m_HasHandshakeData = 0;
conn->m_HandshakeResponse = 0; conn->m_HandshakeResponse = 0;
conn->m_ConnectionThread = 0;
#if defined(HAVE_WSLAY) #if defined(HAVE_WSLAY)
conn->m_Ctx = 0; conn->m_Ctx = 0;
@@ -232,6 +234,13 @@ static void DestroyConnection(WebsocketConnection* conn)
free((void*)conn->m_Buffer); free((void*)conn->m_Buffer);
if (conn->m_ConnectionThread)
{
dmThread::Join(conn->m_ConnectionThread);
conn->m_ConnectionThread = 0;
}
delete conn; delete conn;
DebugLog(2, "DestroyConnection: %p", conn); DebugLog(2, "DestroyConnection: %p", conn);
} }
@@ -520,6 +529,7 @@ static dmExtension::Result AppInitialize(dmExtension::AppParams* params)
g_Websocket.m_Connections.SetCapacity(4); g_Websocket.m_Connections.SetCapacity(4);
g_Websocket.m_Channel = 0; g_Websocket.m_Channel = 0;
g_Websocket.m_Pool = 0; g_Websocket.m_Pool = 0;
g_Websocket.m_Initialized = 0;
dmConnectionPool::Params pool_params; dmConnectionPool::Params pool_params;
pool_params.m_MaxConnections = dmConfigFile::GetInt(params->m_ConfigFile, "websocket.max_connections", 2); pool_params.m_MaxConnections = dmConfigFile::GetInt(params->m_ConfigFile, "websocket.max_connections", 2);
@@ -532,6 +542,7 @@ static dmExtension::Result AppInitialize(dmExtension::AppParams* params)
if (dmConnectionPool::RESULT_OK != result) if (dmConnectionPool::RESULT_OK != result)
{ {
dmLogError("Failed to create connection pool: %d", result); dmLogError("Failed to create connection pool: %d", result);
return dmExtension::RESULT_INIT_ERROR;
} }
// We can do without the channel, it will then fallback to the dmSocket::GetHostname (as opposed to dmDNS::GetHostname) // We can do without the channel, it will then fallback to the dmSocket::GetHostname (as opposed to dmDNS::GetHostname)
@@ -540,23 +551,11 @@ static dmExtension::Result AppInitialize(dmExtension::AppParams* params)
if (dmDNS::RESULT_OK != dns_result) if (dmDNS::RESULT_OK != dns_result)
{ {
dmLogError("Failed to create connection pool: %d", dns_result); dmLogError("Failed to create DNS channel: %d", dns_result);
} }
#endif #endif
g_Websocket.m_Initialized = 1; g_Websocket.m_Initialized = 1;
if (!g_Websocket.m_Pool)
{
if (!g_Websocket.m_Pool)
{
dmLogInfo("pool is null!");
dmConnectionPool::Delete(g_Websocket.m_Pool);
}
dmLogInfo("%s extension not initialized", MODULE_NAME);
g_Websocket.m_Initialized = 0;
}
return dmExtension::RESULT_OK; return dmExtension::RESULT_OK;
} }
@@ -622,6 +621,19 @@ static bool CheckConnectTimeout(WebsocketConnection* conn)
return t >= conn->m_ConnectTimeout; return t >= conn->m_ConnectTimeout;
} }
static void ConnectionWorker(void* _conn)
{
WebsocketConnection* conn = (WebsocketConnection*)_conn;
dmSocket::Result sr;
dmConnectionPool::Result pool_result = dmConnectionPool::Dial(g_Websocket.m_Pool, conn->m_Url.m_Hostname, conn->m_Url.m_Port, g_Websocket.m_Channel, conn->m_SSL, g_Websocket.m_Timeout, &conn->m_Connection, &sr);
if (dmConnectionPool::RESULT_OK != pool_result)
{
CLOSE_CONN("Failed to open connection: %s", dmSocket::ResultToString(sr));
return;
}
SetState(conn, STATE_HANDSHAKE_WRITE);
}
static dmExtension::Result OnUpdate(dmExtension::Params* params) static dmExtension::Result OnUpdate(dmExtension::Params* params)
{ {
uint32_t size = g_Websocket.m_Connections.Size(); uint32_t size = g_Websocket.m_Connections.Size();
@@ -739,6 +751,13 @@ static dmExtension::Result OnUpdate(dmExtension::Params* params)
continue; continue;
} }
if (conn->m_ConnectionThread)
{
dmThread::Join(conn->m_ConnectionThread);
conn->m_ConnectionThread = 0;
}
conn->m_Socket = dmConnectionPool::GetSocket(g_Websocket.m_Pool, conn->m_Connection);
conn->m_SSLSocket = dmConnectionPool::GetSSLSocket(g_Websocket.m_Pool, conn->m_Connection);
Result result = SendClientHandshake(conn); Result result = SendClientHandshake(conn);
if (RESULT_WOULDBLOCK == result) if (RESULT_WOULDBLOCK == result)
{ {
@@ -763,11 +782,23 @@ static dmExtension::Result OnUpdate(dmExtension::Params* params)
#if defined(__EMSCRIPTEN__) #if defined(__EMSCRIPTEN__)
char uri_buffer[dmURI::MAX_URI_LEN]; char uri_buffer[dmURI::MAX_URI_LEN];
const char* uri; const char* uri;
if (conn->m_Url.m_Path[0] != '\0') { bool no_path = conn->m_Url.m_Path[0] == '\0';
dmSnPrintf(uri_buffer, sizeof(uri_buffer), "%s://%s%s", conn->m_Url.m_Scheme, conn->m_Url.m_Hostname, conn->m_Url.m_Path); bool no_port = conn->m_Url.m_Port == -1;
} else { if (no_path && no_port)
{
dmSnPrintf(uri_buffer, sizeof(uri_buffer), "%s://%s", conn->m_Url.m_Scheme, conn->m_Url.m_Hostname); dmSnPrintf(uri_buffer, sizeof(uri_buffer), "%s://%s", conn->m_Url.m_Scheme, conn->m_Url.m_Hostname);
} }
else if (no_port)
{
dmSnPrintf(uri_buffer, sizeof(uri_buffer), "%s://%s%s", conn->m_Url.m_Scheme, conn->m_Url.m_Hostname, conn->m_Url.m_Path);
}
else if (no_path)
{
dmSnPrintf(uri_buffer, sizeof(uri_buffer), "%s://%s:%d", conn->m_Url.m_Scheme, conn->m_Url.m_Hostname, conn->m_Url.m_Port);
}
else {
dmSnPrintf(uri_buffer, sizeof(uri_buffer), "%s://%s:%d%s", conn->m_Url.m_Scheme, conn->m_Url.m_Hostname, conn->m_Url.m_Port, conn->m_Url.m_Path);
}
uri = uri_buffer; uri = uri_buffer;
EmscriptenWebSocketCreateAttributes ws_attrs = { EmscriptenWebSocketCreateAttributes ws_attrs = {
@@ -787,28 +818,20 @@ static dmExtension::Result OnUpdate(dmExtension::Params* params)
emscripten_websocket_set_onerror_callback(ws, conn, Emscripten_WebSocketOnError); emscripten_websocket_set_onerror_callback(ws, conn, Emscripten_WebSocketOnError);
emscripten_websocket_set_onclose_callback(ws, conn, Emscripten_WebSocketOnClose); emscripten_websocket_set_onclose_callback(ws, conn, Emscripten_WebSocketOnClose);
emscripten_websocket_set_onmessage_callback(ws, conn, Emscripten_WebSocketOnMessage); emscripten_websocket_set_onmessage_callback(ws, conn, Emscripten_WebSocketOnMessage);
SetState(conn, STATE_CONNECTING);
#else #else
dmSocket::Result sr; conn->m_ConnectionThread = dmThread::New((dmThread::ThreadStart)ConnectionWorker, 0x80000, conn, "WSConnect");
int timeout = g_Websocket.m_Timeout;
dmConnectionPool::Result pool_result = dmConnectionPool::Dial(g_Websocket.m_Pool, conn->m_Url.m_Hostname, conn->m_Url.m_Port, g_Websocket.m_Channel, conn->m_SSL, timeout, &conn->m_Connection, &sr);
if (dmConnectionPool::RESULT_OK != pool_result)
{
CLOSE_CONN("Failed to open connection: %s", dmSocket::ResultToString(sr));
continue;
}
conn->m_Socket = dmConnectionPool::GetSocket(g_Websocket.m_Pool, conn->m_Connection);
conn->m_SSLSocket = dmConnectionPool::GetSSLSocket(g_Websocket.m_Pool, conn->m_Connection);
SetState(conn, STATE_HANDSHAKE_WRITE);
#endif #endif
SetState(conn, STATE_CONNECTING);
} }
else if (STATE_CONNECTING == conn->m_State) else if (STATE_CONNECTING == conn->m_State)
{ {
#if defined(__EMSCRIPTEN__)
if (CheckConnectTimeout(conn)) if (CheckConnectTimeout(conn))
{ {
CLOSE_CONN("Connect sequence timed out"); CLOSE_CONN("Connect sequence timed out");
continue; continue;
} }
#endif
} }
} }

View File

@@ -24,6 +24,7 @@
#include <dmsdk/dlib/dns.h> #include <dmsdk/dlib/dns.h>
#include <dmsdk/dlib/uri.h> #include <dmsdk/dlib/uri.h>
#include <dmsdk/dlib/array.h> #include <dmsdk/dlib/array.h>
#include <dmsdk/dlib/thread.h>
namespace dmCrypt namespace dmCrypt
{ {
@@ -118,6 +119,7 @@ namespace dmWebsocket
dmConnectionPool::HConnection m_Connection; dmConnectionPool::HConnection m_Connection;
dmSocket::Socket m_Socket; dmSocket::Socket m_Socket;
dmSSLSocket::Socket m_SSLSocket; dmSSLSocket::Socket m_SSLSocket;
dmThread::Thread m_ConnectionThread;
dmArray<Message> m_Messages; // lengths of the messages in the data buffer dmArray<Message> m_Messages; // lengths of the messages in the data buffer
uint64_t m_ConnectTimeout; uint64_t m_ConnectTimeout;
uint8_t m_Key[16]; uint8_t m_Key[16];