From c220c9457a14247c2a2ba15110edadd20fb2721c Mon Sep 17 00:00:00 2001 From: Kristian Larsson Date: Fri, 8 Nov 2024 12:07:44 +0100 Subject: [PATCH 1/3] Correct on_receive callback arg --- base/src/net.act | 2 +- test/stdlib_tests/src/test_net_tcp.act | 25 ++++++++++++++----------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/base/src/net.act b/base/src/net.act index 836f76cc1..74d64007c 100644 --- a/base/src/net.act +++ b/base/src/net.act @@ -58,7 +58,7 @@ def lookup_aaaa(cap: DNSCap, name: str, on_resolve: action(list[str]) -> None, o _lookup_aaaa(name, on_resolve, on_error) -actor TCPConnection(cap: TCPConnectCap, address: str, port: int, on_connect: action(TCPConnection) -> None, on_receive: action(TCPConnection, bytes) -> None, on_error: action(TCPConnection, str) -> None, on_remote_close: ?action(TCPListenConnection) -> None, connect_timeout: float=10.0): +actor TCPConnection(cap: TCPConnectCap, address: str, port: int, on_connect: action(TCPConnection) -> None, on_receive: action(TCPConnection, bytes) -> None, on_error: action(TCPConnection, str) -> None, on_remote_close: ?action(TCPConnection) -> None, connect_timeout: float=10.0): """TCP IP Connection""" var _a_res: list[str] = [] var _aaaa_res: list[str] = [] diff --git a/test/stdlib_tests/src/test_net_tcp.act b/test/stdlib_tests/src/test_net_tcp.act index 0e3a91c3e..efbeb2809 100644 --- a/test/stdlib_tests/src/test_net_tcp.act +++ b/test/stdlib_tests/src/test_net_tcp.act @@ -110,22 +110,25 @@ actor test_tcp_server_side_close(report_result, env, log_handler: logging.Handle def on_connect(c): pass - def on_receive(c, payload): - recv_buf.append(payload) + def on_receive(c: net.TCPConnection, payload): + if client is not None and client is c: + recv_buf.append(payload) def on_error(c, errmsg): # Retry since server might not be up yet after 0.1: _connect_client() - def on_remote_close(c): - try: - testing.assertEqual(test_payload, bytes([]).join(recv_buf), "Did not receive expected data before remote close event") - except AssertionError as ex: - report_result(False, ex) - except Exception as ex: - report_result(None, ex) - else: - report_result(True, None) + def on_remote_close(c: net.TCPConnection): + if client is not None and client is c: + try: + recv = bytes([]).join(recv_buf) + testing.assertEqual(test_payload, recv, "Did not receive expected data '%s' before remote close event, got '%s', retry: %d" % (str(test_payload), str(recv), retries)) + except AssertionError as ex: + report_result(False, ex) + except Exception as ex: + report_result(None, ex) + else: + report_result(True, None) def _start(): retries -= 1 From 8b11e2a9043015330de02d3ef91d2c901f18a2aa Mon Sep 17 00:00:00 2001 From: Kristian Larsson Date: Fri, 8 Nov 2024 12:08:27 +0100 Subject: [PATCH 2/3] Change on_listen callback to also signal success Sometimes we want to know if we succeeded in listening, so changing the on_error() callback to on_listen() and the error message is now an optional, so if it is None, it means all went well! --- base/src/http.act | 7 ++++--- base/src/net.act | 2 +- base/src/net.ext.c | 16 +++++++++------- test/rts_db/ddb_test_server.act | 33 +++++++++++++++++---------------- 4 files changed, 31 insertions(+), 27 deletions(-) diff --git a/base/src/http.act b/base/src/http.act index cced00d20..7cdae7c10 100644 --- a/base/src/http.act +++ b/base/src/http.act @@ -426,15 +426,16 @@ actor Listener(cap: net.TCPListenCap, address: str, port: int, on_accept: action _log = logging.Logger(log_handler) var bufs = [] - def on_tcp_listener_error(listener, error): - _log.notice("There was an error with the TCPListener socket", {"error": error}) + def on_tcp_listen(listener, error): + if error is not None: + _log.notice("There was an error with the TCPListener socket", {"error": error}) def on_tcp_listener_accept(conn): s = Server(conn, on_accept, log_handler) await async on_accept(s) await async conn.cb_install(s.on_tcp_receive, s.on_tcp_error) - l = net.TCPListener(cap, address, port, on_tcp_listener_error, on_tcp_listener_accept) + l = net.TCPListener(cap, address, port, on_tcp_listen, on_tcp_listener_accept) # TODO: change port to u16, when u16 has a sub-type relationship to int diff --git a/base/src/net.act b/base/src/net.act index 74d64007c..eefb120b1 100644 --- a/base/src/net.act +++ b/base/src/net.act @@ -276,7 +276,7 @@ actor TCPListenConnection(cap: _TCPListenConnectCap, server_client: int): NotImplemented _init() -actor TCPListener(cap: TCPListenCap, address: str, port: int, on_error: action(TCPListener, str) -> None, on_accept: action(TCPListenConnection) -> None): +actor TCPListener(cap: TCPListenCap, address: str, port: int, on_listen: action(TCPListener, ?str) -> None, on_accept: action(TCPListenConnection) -> None): """TCP Listener""" _stream = -1 def create_tcp_listen_connection(cap: _TCPListenConnectCap, client: int): diff --git a/base/src/net.ext.c b/base/src/net.ext.c index a3b99d36b..bb243ea91 100644 --- a/base/src/net.ext.c +++ b/base/src/net.ext.c @@ -403,7 +403,7 @@ void on_new_connection(uv_stream_t *server, int status) { char errmsg[1024] = "Error on new TCP client connection: "; uv_strerror_r(status, errmsg + strlen(errmsg), sizeof(errmsg)-strlen(errmsg)); log_warn(errmsg); - $action2 f = ($action2)self->on_error; + $action2 f = ($action2)self->on_listen; f->$class->__asyn__(f, self, to$str(errmsg)); // NOTE: free() here if do manual memory management in I/O one day return; @@ -416,7 +416,7 @@ void on_new_connection(uv_stream_t *server, int status) { char errmsg[1024] = "Error in accepting TCP client connection: "; uv_strerror_r(r, errmsg + strlen(errmsg), sizeof(errmsg)-strlen(errmsg)); log_warn(errmsg); - $action2 f = ($action2)self->on_error; + $action2 f = ($action2)self->on_listen; f->$class->__asyn__(f, self, to$str(errmsg)); // NOTE: free() here if do manual memory management in I/O one day return; @@ -443,7 +443,7 @@ void on_new_connection(uv_stream_t *server, int status) { } else { B_str errmsg = $FORMAT("Address is not an IPv4 or IPv6 address: %s", fromB_str(self->address)); log_warn(fromB_str(errmsg)); - $action2 f = ($action2)self->on_error; + $action2 f = ($action2)self->on_listen; f->$class->__asyn__(f, self, errmsg); // NOTE: free() here if do manual memory management in I/O one day return $R_CONT(c$cont, B_None); @@ -452,7 +452,7 @@ void on_new_connection(uv_stream_t *server, int status) { char errmsg[1024] = "Unable to parse address: "; uv_strerror_r(r, errmsg + strlen(errmsg), sizeof(errmsg)-strlen(errmsg)); log_warn(errmsg); - $action2 f = ($action2)self->on_error; + $action2 f = ($action2)self->on_listen; f->$class->__asyn__(f, self, to$str(errmsg)); // NOTE: free() here if do manual memory management in I/O one day return $R_CONT(c$cont, B_None); @@ -467,7 +467,7 @@ void on_new_connection(uv_stream_t *server, int status) { char errmsg[1024] = "Error in TCP bind: "; uv_strerror_r(r, errmsg + strlen(errmsg), sizeof(errmsg)-strlen(errmsg)); log_warn(errmsg); - $action2 f = ($action2)self->on_error; + $action2 f = ($action2)self->on_listen; f->$class->__asyn__(f, self, to$str(errmsg)); // NOTE: free() here if do manual memory management in I/O one day return $R_CONT(c$cont, B_None); @@ -478,18 +478,20 @@ void on_new_connection(uv_stream_t *server, int status) { char errmsg[1024] = "Error in TCP listen: "; uv_strerror_r(r, errmsg + strlen(errmsg), sizeof(errmsg)-strlen(errmsg)); log_warn(errmsg); - $action2 f = ($action2)self->on_error; + $action2 f = ($action2)self->on_listen; f->$class->__asyn__(f, self, to$str(errmsg)); // NOTE: free() here if do manual memory management in I/O one day return $R_CONT(c$cont, B_None); } + $action2 f = ($action2)self->on_listen; + f->$class->__asyn__(f, self, B_None); return $R_CONT(c$cont, B_None); } B_NoneType netQ_TCPListenerD___resume__ (netQ_TCPListener self) { self->_stream = to$int(-1); - $action2 f = ($action2)self->on_error; + $action2 f = ($action2)self->on_listen; f->$class->__asyn__(f, self, to$str("resume")); return B_None; } diff --git a/test/rts_db/ddb_test_server.act b/test/rts_db/ddb_test_server.act index 42a3fdc34..ea55f7b78 100644 --- a/test/rts_db/ddb_test_server.act +++ b/test/rts_db/ddb_test_server.act @@ -1,22 +1,23 @@ import net actor Tester(env, port): - var lsock = None + var _lsock = None var i = 0 - def on_listen_error(l, error): - print("Error with our listening socket on port" + str(port) + ": " + error) - if error == "resume": - pass - else: - print("Unhandled error:", error) - print("Attempting to re-establish listening socket") - lsock = init_listen() + def _on_listen(l, error): + if error is not None: + print("Error with our listening socket on port" + str(port) + ": " + error) + if error == "resume": + pass + else: + print("Unhandled error:", error) + print("Attempting to re-establish listening socket") + _lsock = _init_listen() - def on_server_accept(c): - c.cb_install(on_server_receive, on_server_error) + def _on_server_accept(c): + c.cb_install(_on_server_receive, _on_server_error) - def on_server_receive(c, data): + def _on_server_receive(c, data): print("RECV", c, data.decode()) if data == b"GET": response = str(i).encode() @@ -27,18 +28,18 @@ actor Tester(env, port): print("RECV", c, data.decode(), "RESPONSE:", b"OK") c.write(b"OK") - def on_server_error(c, error): + def _on_server_error(c, error): print("There was an error:", error, " from:", c) - def init_listen(): + def _init_listen(): listen_cap = net.TCPListenCap(net.TCPCap(net.NetCap(env.cap))) print("Starting to listen...") - s = net.TCPListener(listen_cap, "0.0.0.0", port, on_listen_error, on_server_accept) + s = net.TCPListener(listen_cap, "0.0.0.0", port, _on_listen, _on_server_accept) print("NOW LISTENING ON", str(port)) return s - lsock = init_listen() + _lsock = _init_listen() actor main(env): From bdd21b7a6a1c9c115d1fee206c666708f9ab57b2 Mon Sep 17 00:00:00 2001 From: Kristian Larsson Date: Fri, 8 Nov 2024 12:12:13 +0100 Subject: [PATCH 3/3] Robustify TCP tests We get some races since we start the client and server at the same time, but the server listener might fail if the port is already taken. We now avoid that race by checking if the listener starts up correctly and only then initiating the client. Also ignoring any possible received data from older clients (which really shouldn't be able to happen!). Overall, tests should be stable now. --- test/stdlib_tests/src/test_net_tcp.act | 50 ++++++++++++++------------ 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/test/stdlib_tests/src/test_net_tcp.act b/test/stdlib_tests/src/test_net_tcp.act index efbeb2809..f9a03fcf9 100644 --- a/test/stdlib_tests/src/test_net_tcp.act +++ b/test/stdlib_tests/src/test_net_tcp.act @@ -12,7 +12,7 @@ actor test_tcp_client_side_close(report_result, env, log_handler: logging.Handle var port = random.randint(10000, 40000) tcp_cap = net.TCPCap(net.NetCap(env.cap)) - recv_buf: list[bytes] = [] + var recv_buf: list[bytes] = [] var client = None @@ -33,15 +33,18 @@ actor test_tcp_client_side_close(report_result, env, log_handler: logging.Handle else: report_result(True, None) - def on_listen_error(l, errmsg): - m = re.match(r"address already in use", errmsg) - if m is not None: - if retries > 0: - _start() - log.info("Retrying...") - return - log.error(errmsg) - report_result(False, AssertionError("listen error: " + errmsg)) + def on_listen(l, error): + if error is not None: + m = re.match(r"address already in use", error) + if m is not None: + if retries > 0: + log.debug("address already in use (%d), retrying..." % port) + _start() + return + log.error(error) + report_result(False, AssertionError("listen error: " + error)) + else: + _connect_client() def on_listen_accept(s): s.cb_install(on_server_receive, on_server_error, on_server_remote_close) @@ -63,8 +66,7 @@ actor test_tcp_client_side_close(report_result, env, log_handler: logging.Handle def _start(): retries -= 1 port = random.randint(10000, 40000) - server = net.TCPListener(net.TCPListenCap(tcp_cap), address, port, on_listen_error, on_listen_accept) - _connect_client() + server = net.TCPListener(net.TCPListenCap(tcp_cap), address, port, on_listen, on_listen_accept) _start() def _test_tcp_client_side_close(report_result: action(?bool, ?Exception) -> None , env: Env, log_handler: logging.Handler): @@ -89,15 +91,18 @@ actor test_tcp_server_side_close(report_result, env, log_handler: logging.Handle log.error(errmsg) report_result(False, AssertionError("server error: " + errmsg)) - def on_listen_error(l, errmsg): - m = re.match(r"address already in use", errmsg) - if m is not None: - if retries > 0: - _start() - log.info("Retrying...") - return - log.error(errmsg) - report_result(False, AssertionError("listen error: " + errmsg)) + def on_listen(l, error): + if error is not None: + m = re.match(r"address already in use", error) + if m is not None: + if retries > 0: + _start() + log.info("address already in use, retrying...") + return + log.error(error) + report_result(False, AssertionError("listen error: " + error)) + else: + _connect_client() def on_listen_accept(s): s.cb_install(on_server_receive, on_server_error, None) @@ -133,8 +138,7 @@ actor test_tcp_server_side_close(report_result, env, log_handler: logging.Handle def _start(): retries -= 1 port = random.randint(10000, 40000) - server = net.TCPListener(net.TCPListenCap(tcp_cap), address, port, on_listen_error, on_listen_accept) - _connect_client() + server = net.TCPListener(net.TCPListenCap(tcp_cap), address, port, on_listen, on_listen_accept) _start() def _test_tcp_server_side_close(report_result: action(?bool, ?Exception) -> None , env: Env, log_handler: logging.Handler):