From 91dbf2f9b0e7d623bfa492b71232f47029776148 Mon Sep 17 00:00:00 2001 From: Mr_Goldberg Date: Fri, 1 May 2020 22:56:53 -0400 Subject: [PATCH] Fixed some networking api related threading --- dll/steam_networking.h | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/dll/steam_networking.h b/dll/steam_networking.h index 180214d..816a34c 100644 --- a/dll/steam_networking.h +++ b/dll/steam_networking.h @@ -72,7 +72,8 @@ public ISteamNetworking class RunEveryRunCB *run_every_runcb; std::recursive_mutex messages_mutex; - std::vector messages; + std::list messages; + std::list unprocessed_messages; std::recursive_mutex connections_edit_mutex; std::vector connections; @@ -131,6 +132,17 @@ void remove_connection(CSteamID id) } } } + + { + auto msg = std::begin(unprocessed_messages); + while (msg != std::end(unprocessed_messages)) { + if (msg->source_id() == id.ConvertToUint64()) { + msg = messages.erase(msg); + } else { + ++msg; + } + } + } } SNetSocket_t create_connection_socket(CSteamID target, int nVirtualPort, uint32 nIP, uint16 nPort, SNetListenSocket_t id=0, enum steam_socket_connection_status status=SOCKET_CONNECTING, SNetSocket_t other_id=0) @@ -817,9 +829,10 @@ void RunCallbacks() { std::lock_guard lock(messages_mutex); - for (auto &msg : messages) { - CSteamID source_id((uint64)msg.source_id()); - if (!msg.network().processed()) { + { + auto msg = std::begin(unprocessed_messages); + while (msg != std::end(unprocessed_messages)) { + CSteamID source_id((uint64)msg->source_id()); if (!connection_exists(source_id)) { if (new_connection_times.find(source_id) == new_connection_times.end()) { new_connections_to_call_cb.push(source_id); @@ -827,11 +840,13 @@ void RunCallbacks() } } else { struct Steam_Networking_Connection *conn = get_or_create_connection(source_id); - conn->open_channels.insert(msg.network().channel()); + conn->open_channels.insert(msg->network().channel()); } - msg.mutable_network()->set_processed(true); - msg.mutable_network()->set_time_processed(current_time); + msg->mutable_network()->set_processed(true); + msg->mutable_network()->set_time_processed(current_time); + messages.push_back(*msg); + msg = unprocessed_messages.erase(msg); } } @@ -897,12 +912,12 @@ void Callback(Common_Message *msg) }PRINT_DEBUG("\n"); #endif - std::lock_guard lock(messages_mutex); if (msg->network().type() == Network::DATA) { - messages.push_back(Common_Message(*msg)); + unprocessed_messages.push_back(Common_Message(*msg)); } if (msg->network().type() == Network::NEW_CONNECTION) { + std::lock_guard lock(messages_mutex); auto msg_temp = std::begin(messages); while (msg_temp != std::end(messages)) { //only delete processed to handle unreliable message arriving at the same time.