Skip to content

Commit

Permalink
Merge pull request #1979 from actonlang/fix-test-net-tcp-retry
Browse files Browse the repository at this point in the history
Fix test net tcp retry
  • Loading branch information
plajjan authored Nov 8, 2024
2 parents 0958368 + bdd21b7 commit 14faefd
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 62 deletions.
7 changes: 4 additions & 3 deletions base/src/http.act
Original file line number Diff line number Diff line change
Expand Up @@ -426,15 +426,16 @@ actor Listener(cap: net.TCPListenCap, address: str, port: int, on_accept: action
_log = logging.Logger(log_handler)
var bufs = []

def on_tcp_listener_error(listener, error):
_log.notice("There was an error with the TCPListener socket", {"error": error})
def on_tcp_listen(listener, error):
if error is not None:
_log.notice("There was an error with the TCPListener socket", {"error": error})

def on_tcp_listener_accept(conn):
s = Server(conn, on_accept, log_handler)
await async on_accept(s)
await async conn.cb_install(s.on_tcp_receive, s.on_tcp_error)

l = net.TCPListener(cap, address, port, on_tcp_listener_error, on_tcp_listener_accept)
l = net.TCPListener(cap, address, port, on_tcp_listen, on_tcp_listener_accept)


# TODO: change port to u16, when u16 has a sub-type relationship to int
Expand Down
4 changes: 2 additions & 2 deletions base/src/net.act
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def lookup_aaaa(cap: DNSCap, name: str, on_resolve: action(list[str]) -> None, o
_lookup_aaaa(name, on_resolve, on_error)


actor TCPConnection(cap: TCPConnectCap, address: str, port: int, on_connect: action(TCPConnection) -> None, on_receive: action(TCPConnection, bytes) -> None, on_error: action(TCPConnection, str) -> None, on_remote_close: ?action(TCPListenConnection) -> None, connect_timeout: float=10.0):
actor TCPConnection(cap: TCPConnectCap, address: str, port: int, on_connect: action(TCPConnection) -> None, on_receive: action(TCPConnection, bytes) -> None, on_error: action(TCPConnection, str) -> None, on_remote_close: ?action(TCPConnection) -> None, connect_timeout: float=10.0):
"""TCP IP Connection"""
var _a_res: list[str] = []
var _aaaa_res: list[str] = []
Expand Down Expand Up @@ -276,7 +276,7 @@ actor TCPListenConnection(cap: _TCPListenConnectCap, server_client: int):
NotImplemented
_init()

actor TCPListener(cap: TCPListenCap, address: str, port: int, on_error: action(TCPListener, str) -> None, on_accept: action(TCPListenConnection) -> None):
actor TCPListener(cap: TCPListenCap, address: str, port: int, on_listen: action(TCPListener, ?str) -> None, on_accept: action(TCPListenConnection) -> None):
"""TCP Listener"""
_stream = -1
def create_tcp_listen_connection(cap: _TCPListenConnectCap, client: int):
Expand Down
16 changes: 9 additions & 7 deletions base/src/net.ext.c
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ void on_new_connection(uv_stream_t *server, int status) {
char errmsg[1024] = "Error on new TCP client connection: ";
uv_strerror_r(status, errmsg + strlen(errmsg), sizeof(errmsg)-strlen(errmsg));
log_warn(errmsg);
$action2 f = ($action2)self->on_error;
$action2 f = ($action2)self->on_listen;
f->$class->__asyn__(f, self, to$str(errmsg));
// NOTE: free() here if do manual memory management in I/O one day
return;
Expand All @@ -416,7 +416,7 @@ void on_new_connection(uv_stream_t *server, int status) {
char errmsg[1024] = "Error in accepting TCP client connection: ";
uv_strerror_r(r, errmsg + strlen(errmsg), sizeof(errmsg)-strlen(errmsg));
log_warn(errmsg);
$action2 f = ($action2)self->on_error;
$action2 f = ($action2)self->on_listen;
f->$class->__asyn__(f, self, to$str(errmsg));
// NOTE: free() here if do manual memory management in I/O one day
return;
Expand All @@ -443,7 +443,7 @@ void on_new_connection(uv_stream_t *server, int status) {
} else {
B_str errmsg = $FORMAT("Address is not an IPv4 or IPv6 address: %s", fromB_str(self->address));
log_warn(fromB_str(errmsg));
$action2 f = ($action2)self->on_error;
$action2 f = ($action2)self->on_listen;
f->$class->__asyn__(f, self, errmsg);
// NOTE: free() here if do manual memory management in I/O one day
return $R_CONT(c$cont, B_None);
Expand All @@ -452,7 +452,7 @@ void on_new_connection(uv_stream_t *server, int status) {
char errmsg[1024] = "Unable to parse address: ";
uv_strerror_r(r, errmsg + strlen(errmsg), sizeof(errmsg)-strlen(errmsg));
log_warn(errmsg);
$action2 f = ($action2)self->on_error;
$action2 f = ($action2)self->on_listen;
f->$class->__asyn__(f, self, to$str(errmsg));
// NOTE: free() here if do manual memory management in I/O one day
return $R_CONT(c$cont, B_None);
Expand All @@ -467,7 +467,7 @@ void on_new_connection(uv_stream_t *server, int status) {
char errmsg[1024] = "Error in TCP bind: ";
uv_strerror_r(r, errmsg + strlen(errmsg), sizeof(errmsg)-strlen(errmsg));
log_warn(errmsg);
$action2 f = ($action2)self->on_error;
$action2 f = ($action2)self->on_listen;
f->$class->__asyn__(f, self, to$str(errmsg));
// NOTE: free() here if do manual memory management in I/O one day
return $R_CONT(c$cont, B_None);
Expand All @@ -478,18 +478,20 @@ void on_new_connection(uv_stream_t *server, int status) {
char errmsg[1024] = "Error in TCP listen: ";
uv_strerror_r(r, errmsg + strlen(errmsg), sizeof(errmsg)-strlen(errmsg));
log_warn(errmsg);
$action2 f = ($action2)self->on_error;
$action2 f = ($action2)self->on_listen;
f->$class->__asyn__(f, self, to$str(errmsg));
// NOTE: free() here if do manual memory management in I/O one day
return $R_CONT(c$cont, B_None);
}

$action2 f = ($action2)self->on_listen;
f->$class->__asyn__(f, self, B_None);
return $R_CONT(c$cont, B_None);
}

B_NoneType netQ_TCPListenerD___resume__ (netQ_TCPListener self) {
self->_stream = to$int(-1);
$action2 f = ($action2)self->on_error;
$action2 f = ($action2)self->on_listen;
f->$class->__asyn__(f, self, to$str("resume"));
return B_None;
}
Expand Down
33 changes: 17 additions & 16 deletions test/rts_db/ddb_test_server.act
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
import net

actor Tester(env, port):
var lsock = None
var _lsock = None
var i = 0

def on_listen_error(l, error):
print("Error with our listening socket on port" + str(port) + ": " + error)
if error == "resume":
pass
else:
print("Unhandled error:", error)
print("Attempting to re-establish listening socket")
lsock = init_listen()
def _on_listen(l, error):
if error is not None:
print("Error with our listening socket on port" + str(port) + ": " + error)
if error == "resume":
pass
else:
print("Unhandled error:", error)
print("Attempting to re-establish listening socket")
_lsock = _init_listen()

def on_server_accept(c):
c.cb_install(on_server_receive, on_server_error)
def _on_server_accept(c):
c.cb_install(_on_server_receive, _on_server_error)

def on_server_receive(c, data):
def _on_server_receive(c, data):
print("RECV", c, data.decode())
if data == b"GET":
response = str(i).encode()
Expand All @@ -27,18 +28,18 @@ actor Tester(env, port):
print("RECV", c, data.decode(), "RESPONSE:", b"OK")
c.write(b"OK")

def on_server_error(c, error):
def _on_server_error(c, error):
print("There was an error:", error, " from:", c)

def init_listen():
def _init_listen():
listen_cap = net.TCPListenCap(net.TCPCap(net.NetCap(env.cap)))
print("Starting to listen...")
s = net.TCPListener(listen_cap, "0.0.0.0", port, on_listen_error, on_server_accept)
s = net.TCPListener(listen_cap, "0.0.0.0", port, _on_listen, _on_server_accept)
print("NOW LISTENING ON", str(port))

return s

lsock = init_listen()
_lsock = _init_listen()


actor main(env):
Expand Down
75 changes: 41 additions & 34 deletions test/stdlib_tests/src/test_net_tcp.act
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ actor test_tcp_client_side_close(report_result, env, log_handler: logging.Handle
var port = random.randint(10000, 40000)
tcp_cap = net.TCPCap(net.NetCap(env.cap))

recv_buf: list[bytes] = []
var recv_buf: list[bytes] = []

var client = None

Expand All @@ -33,15 +33,18 @@ actor test_tcp_client_side_close(report_result, env, log_handler: logging.Handle
else:
report_result(True, None)

def on_listen_error(l, errmsg):
m = re.match(r"address already in use", errmsg)
if m is not None:
if retries > 0:
_start()
log.info("Retrying...")
return
log.error(errmsg)
report_result(False, AssertionError("listen error: " + errmsg))
def on_listen(l, error):
if error is not None:
m = re.match(r"address already in use", error)
if m is not None:
if retries > 0:
log.debug("address already in use (%d), retrying..." % port)
_start()
return
log.error(error)
report_result(False, AssertionError("listen error: " + error))
else:
_connect_client()

def on_listen_accept(s):
s.cb_install(on_server_receive, on_server_error, on_server_remote_close)
Expand All @@ -63,8 +66,7 @@ actor test_tcp_client_side_close(report_result, env, log_handler: logging.Handle
def _start():
retries -= 1
port = random.randint(10000, 40000)
server = net.TCPListener(net.TCPListenCap(tcp_cap), address, port, on_listen_error, on_listen_accept)
_connect_client()
server = net.TCPListener(net.TCPListenCap(tcp_cap), address, port, on_listen, on_listen_accept)
_start()

def _test_tcp_client_side_close(report_result: action(?bool, ?Exception) -> None , env: Env, log_handler: logging.Handler):
Expand All @@ -89,15 +91,18 @@ actor test_tcp_server_side_close(report_result, env, log_handler: logging.Handle
log.error(errmsg)
report_result(False, AssertionError("server error: " + errmsg))

def on_listen_error(l, errmsg):
m = re.match(r"address already in use", errmsg)
if m is not None:
if retries > 0:
_start()
log.info("Retrying...")
return
log.error(errmsg)
report_result(False, AssertionError("listen error: " + errmsg))
def on_listen(l, error):
if error is not None:
m = re.match(r"address already in use", error)
if m is not None:
if retries > 0:
_start()
log.info("address already in use, retrying...")
return
log.error(error)
report_result(False, AssertionError("listen error: " + error))
else:
_connect_client()

def on_listen_accept(s):
s.cb_install(on_server_receive, on_server_error, None)
Expand All @@ -110,28 +115,30 @@ actor test_tcp_server_side_close(report_result, env, log_handler: logging.Handle
def on_connect(c):
pass

def on_receive(c, payload):
recv_buf.append(payload)
def on_receive(c: net.TCPConnection, payload):
if client is not None and client is c:
recv_buf.append(payload)

def on_error(c, errmsg):
# Retry since server might not be up yet
after 0.1: _connect_client()

def on_remote_close(c):
try:
testing.assertEqual(test_payload, bytes([]).join(recv_buf), "Did not receive expected data before remote close event")
except AssertionError as ex:
report_result(False, ex)
except Exception as ex:
report_result(None, ex)
else:
report_result(True, None)
def on_remote_close(c: net.TCPConnection):
if client is not None and client is c:
try:
recv = bytes([]).join(recv_buf)
testing.assertEqual(test_payload, recv, "Did not receive expected data '%s' before remote close event, got '%s', retry: %d" % (str(test_payload), str(recv), retries))
except AssertionError as ex:
report_result(False, ex)
except Exception as ex:
report_result(None, ex)
else:
report_result(True, None)

def _start():
retries -= 1
port = random.randint(10000, 40000)
server = net.TCPListener(net.TCPListenCap(tcp_cap), address, port, on_listen_error, on_listen_accept)
_connect_client()
server = net.TCPListener(net.TCPListenCap(tcp_cap), address, port, on_listen, on_listen_accept)
_start()

def _test_tcp_server_side_close(report_result: action(?bool, ?Exception) -> None , env: Env, log_handler: logging.Handler):
Expand Down

0 comments on commit 14faefd

Please sign in to comment.