1
0
mirror of https://github.com/pumpitupdev/pumptools.git synced 2025-02-26 06:18:14 +01:00

prinet-proxy: Fix shutdown getting stuck when thread is waiting to receive

This commit is contained in:
icex2 2021-01-24 12:28:01 +01:00
parent a81e886f0f
commit b14751a362
3 changed files with 49 additions and 22 deletions

View File

@ -1,4 +1,5 @@
#include <pthread.h> #include <pthread.h>
#include <stdatomic.h>
#include "pumpnet/prinet-proxy/client.h" #include "pumpnet/prinet-proxy/client.h"
@ -14,11 +15,13 @@ struct pumpnet_prinet_proxy_client_connection {
static const uint32_t _pumpnet_prinet_proxy_client_recv_length_timeout_ms = 500; static const uint32_t _pumpnet_prinet_proxy_client_recv_length_timeout_ms = 500;
static atomic_bool _pumpnet_prinet_proxy_client_recv_block = true;
static bool _pumpnet_prinet_proxy_client_recv_packet_length(struct pumpnet_prinet_proxy_client_connection* connection, uint32_t* length) static bool _pumpnet_prinet_proxy_client_recv_packet_length(struct pumpnet_prinet_proxy_client_connection* connection, uint32_t* length)
{ {
uint32_t packet_length; uint32_t packet_length;
while (true) { while (_pumpnet_prinet_proxy_client_recv_block) {
ssize_t read; ssize_t read;
pthread_mutex_lock(&connection->mutex); pthread_mutex_lock(&connection->mutex);
@ -39,12 +42,15 @@ static bool _pumpnet_prinet_proxy_client_recv_packet_length(struct pumpnet_prine
return false; return false;
} }
break; *length = packet_length;
return true;
} }
*length = packet_length; log_warn("Recv block disabled, exiting with error");
return true; // thread blocking disabled externally -> shutdown signaled
return false;
} }
struct pumpnet_prinet_proxy_client_connection* pumpnet_prinet_proxy_client_connection_alloc() struct pumpnet_prinet_proxy_client_connection* pumpnet_prinet_proxy_client_connection_alloc()
@ -135,6 +141,13 @@ void pumpnet_prinet_proxy_client_connection_free(struct pumpnet_prinet_proxy_cli
util_xfree((void**) connection); util_xfree((void**) connection);
} }
void pumpnet_prinet_proxy_client_exit_recv_request_blocking()
{
log_debug("Disable recv blocking");
_pumpnet_prinet_proxy_client_recv_block = false;
}
struct pumpnet_prinet_proxy_packet* pumpnet_prinet_proxy_client_recv_request(struct pumpnet_prinet_proxy_client_connection* connection) struct pumpnet_prinet_proxy_packet* pumpnet_prinet_proxy_client_recv_request(struct pumpnet_prinet_proxy_client_connection* connection)
{ {
log_assert(connection); log_assert(connection);
@ -147,8 +160,6 @@ struct pumpnet_prinet_proxy_packet* pumpnet_prinet_proxy_client_recv_request(str
return NULL; return NULL;
} }
pthread_mutex_lock(&connection->mutex);
log_debug("Received length source request (%X): %d", connection->handle, packet_length); log_debug("Received length source request (%X): %d", connection->handle, packet_length);
struct pumpnet_prinet_proxy_packet* packet = util_xmalloc(packet_length); struct pumpnet_prinet_proxy_packet* packet = util_xmalloc(packet_length);
@ -158,14 +169,18 @@ struct pumpnet_prinet_proxy_packet* pumpnet_prinet_proxy_client_recv_request(str
size_t read_pos = 0; size_t read_pos = 0;
while (true) { while (_pumpnet_prinet_proxy_client_recv_block) {
size_t remaining_size = packet->length - sizeof(uint32_t) - read_pos; size_t remaining_size = packet->length - sizeof(uint32_t) - read_pos;
pthread_mutex_lock(&connection->mutex);
ssize_t read = util_sock_tcp_recv_block( ssize_t read = util_sock_tcp_recv_block(
connection->handle, connection->handle,
((uint8_t*) packet) + sizeof(uint32_t) + read_pos, ((uint8_t*) packet) + sizeof(uint32_t) + read_pos,
remaining_size); remaining_size);
pthread_mutex_unlock(&connection->mutex);
if (read == 0) { if (read == 0) {
log_error("Unexpected remote close and no data"); log_error("Unexpected remote close and no data");
util_xfree((void**) &packet); util_xfree((void**) &packet);
@ -185,7 +200,11 @@ struct pumpnet_prinet_proxy_packet* pumpnet_prinet_proxy_client_recv_request(str
} }
} }
pthread_mutex_unlock(&connection->mutex); if (!_pumpnet_prinet_proxy_client_recv_block) {
log_warn("Recv block disabled, exiting with error");
util_xfree((void**) &packet);
return NULL;
}
return packet; return packet;
} }

View File

@ -19,6 +19,8 @@ void pumpnet_prinet_proxy_client_connection_close(struct pumpnet_prinet_proxy_cl
void pumpnet_prinet_proxy_client_connection_free(struct pumpnet_prinet_proxy_client_connection** connection); void pumpnet_prinet_proxy_client_connection_free(struct pumpnet_prinet_proxy_client_connection** connection);
void pumpnet_prinet_proxy_client_exit_recv_request_blocking();
struct pumpnet_prinet_proxy_packet* pumpnet_prinet_proxy_client_recv_request(struct pumpnet_prinet_proxy_client_connection* connection); struct pumpnet_prinet_proxy_packet* pumpnet_prinet_proxy_client_recv_request(struct pumpnet_prinet_proxy_client_connection* connection);
bool pumpnet_prinet_proxy_client_send_response(struct pumpnet_prinet_proxy_client_connection* connection, const struct pumpnet_prinet_proxy_packet* packet); bool pumpnet_prinet_proxy_client_send_response(struct pumpnet_prinet_proxy_client_connection* connection, const struct pumpnet_prinet_proxy_packet* packet);

View File

@ -1,4 +1,5 @@
#include <signal.h> #include <signal.h>
#include <stdatomic.h>
#include <stdbool.h> #include <stdbool.h>
#include "pumpnet/lib/prinet.h" #include "pumpnet/lib/prinet.h"
@ -24,7 +25,8 @@ static const size_t PUMPNET_MAX_RESP_SIZE = 1024 * 1024;
static const uint32_t KEEPALIVE_POLL_MS = 30000; static const uint32_t KEEPALIVE_POLL_MS = 30000;
static int _source_sock; static int _source_sock;
struct pumpnet_prinet_proxy_client_connection* _source_con; static struct pumpnet_prinet_proxy_client_connection* _source_con;
static atomic_bool _run_loop;
/* Compiled binary data from data folder. Symbols are defined by compiler */ /* Compiled binary data from data folder. Symbols are defined by compiler */
extern const uint8_t _binary_prime_private_key_start[]; extern const uint8_t _binary_prime_private_key_start[];
@ -36,18 +38,9 @@ static void _sigint_handler(int sig)
{ {
log_info("SIGINT, exiting"); log_info("SIGINT, exiting");
pumpnet_prinet_proxy_keepalive_shutdown(); pumpnet_prinet_proxy_client_exit_recv_request_blocking();
pumpnet_prinet_proxy_client_connection_close(_source_con); _run_loop = false;
pumpnet_prinet_proxy_client_connection_free(&_source_con);
util_sock_tcp_close(_source_sock);
sec_prinet_finit();
pumpnet_lib_prinet_shutdown();
exit(EXIT_SUCCESS);
} }
int main(int argc, char** argv) int main(int argc, char** argv)
@ -114,7 +107,9 @@ int main(int argc, char** argv)
// i guess the devs never expected to see more than a few hundred clients // i guess the devs never expected to see more than a few hundred clients
// active at the same time~ // active at the same time~
while (true) { _run_loop = true;
while (_run_loop) {
log_debug("Waiting for incoming connection..."); log_debug("Waiting for incoming connection...");
if (!pumpnet_prinet_proxy_client_connection_accept(_source_sock, _source_con)) { if (!pumpnet_prinet_proxy_client_connection_accept(_source_sock, _source_con)) {
@ -203,8 +198,19 @@ int main(int argc, char** argv)
if (source_resp != NULL) { if (source_resp != NULL) {
util_xfree((void**) &source_resp); util_xfree((void**) &source_resp);
} }
} while (inner_loop); } while (inner_loop && _run_loop);
pumpnet_prinet_proxy_client_connection_close(_source_con); pumpnet_prinet_proxy_client_connection_close(_source_con);
} }
pumpnet_prinet_proxy_keepalive_shutdown();
pumpnet_prinet_proxy_client_connection_close(_source_con);
pumpnet_prinet_proxy_client_connection_free(&_source_con);
util_sock_tcp_close(_source_sock);
sec_prinet_finit();
pumpnet_lib_prinet_shutdown();
} }