Skip to content

Commit

Permalink
TCPListener support for IPv6
Browse files Browse the repository at this point in the history
The TCPListener can now listen to an IPv4 or IPv6 address!

Added a test case for it as well but it is unfortunately disabled since
the CI environment currently does not support IPv6 (GitHub Actions do
not do IPv6 for docker containers).
  • Loading branch information
plajjan committed Aug 14, 2023
1 parent b788939 commit 8f3d8f1
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 26 deletions.
7 changes: 5 additions & 2 deletions base/src/net.act
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,11 @@ actor TCPConnection(cap: TCPConnectCap, address: str, port: int, on_connect: act
NotImplemented

def _connect_timeout():
_state = 0
_consecutive_connect_errors += 1
on_error(self, "Connection attempt failed due to timeout")
def _on_close(c):
on_error(self, "Connection attempt failed due to timeout")
close(_on_close)

def reconnect():
close(_connect)
Expand Down Expand Up @@ -187,7 +190,7 @@ actor TCPConnection(cap: TCPConnectCap, address: str, port: int, on_connect: act
bytes_in=_bytes_in,
bytes_out=_bytes_out)

# TODO: get rid of this, but how to call _on_connect4 directly?
# TODO: is it possible to directly call e.g. _on_connect4 from a non-CPSed C function? how?
var _fun_oncon4: action() -> None = _on_connect4
var _fun_oncon6: action() -> None = _on_connect6
var _fun_on_tcp_error: action(int, int, str) -> None = _on_tcp_error
Expand Down
26 changes: 22 additions & 4 deletions base/src/net.ext.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ void on_connect4(uv_connect_t *connect_req, int status) {
return;
}
$action f = self->_fun_oncon4;
//$action f = self->$class->_on_connect4;
f->$class->__asyn__(f, self);
}

Expand Down Expand Up @@ -433,8 +434,21 @@ void on_new_connection(uv_stream_t *server, int status) {
uv_tcp_init(get_uv_loop(), server);
server->data = (void *)self;
int r;
struct sockaddr_in addr;
r = uv_ip4_addr(fromB_str(self->address), from$int(self->port), &addr);
struct sockaddr_in addr4;
struct sockaddr_in6 addr6;
if (inet_pton(AF_INET, fromB_str(self->address), &(addr4.sin_addr)) == 1) {
r = uv_ip4_addr(fromB_str(self->address), from$int(self->port), &addr4);
} else if (inet_pton(AF_INET6, fromB_str(self->address), &(addr6.sin6_addr)) == 1) {
r = uv_ip6_addr(fromB_str(self->address), from$int(self->port), &addr6);
} else {
char errmsg[1024] = "Address is not an IPv4 or IPv6 address: ";
asprintf(errmsg + strlen(errmsg), "%s", fromB_str(self->address));
log_warn(errmsg);
$action2 f = self->on_error;
f->$class->__asyn__(f, self, to$str(errmsg));
// NOTE: free() here if do manual memory management in I/O one day
return $R_CONT(c$cont, B_None);
}
if (r != 0) {
char errmsg[1024] = "Unable to parse address: ";
uv_strerror_r(r, errmsg + strlen(errmsg), sizeof(errmsg)-strlen(errmsg));
Expand All @@ -445,7 +459,11 @@ void on_new_connection(uv_stream_t *server, int status) {
return $R_CONT(c$cont, B_None);
}

r = uv_tcp_bind(server, (const struct sockaddr*)&addr, 0);
if (inet_pton(AF_INET, fromB_str(self->address), &(addr4.sin_addr)) == 1) {
r = uv_tcp_bind(server, (const struct sockaddr *)&addr4, 0);
} else if (inet_pton(AF_INET6, fromB_str(self->address), &(addr6.sin6_addr)) == 1) {
r = uv_tcp_bind(server, (const struct sockaddr6 *)&addr6, 0);
}
if (r != 0) {
char errmsg[1024] = "Error in TCP bind: ";
uv_strerror_r(r, errmsg + strlen(errmsg), sizeof(errmsg)-strlen(errmsg));
Expand All @@ -456,7 +474,7 @@ void on_new_connection(uv_stream_t *server, int status) {
return $R_CONT(c$cont, B_None);
}

r = uv_listen((uv_stream_t*) server, 1024, on_new_connection);
r = uv_listen((uv_stream_t *)server, 1024, on_new_connection);
if (r != 0) {
char errmsg[1024] = "Error in TCP listen: ";
uv_strerror_r(r, errmsg + strlen(errmsg), sizeof(errmsg)-strlen(errmsg));
Expand Down
73 changes: 53 additions & 20 deletions test/stdlib_auto/test_net_tcp.act
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ actor Server(conn):
print("There was an error:", error, " from:", c)


actor Listener(env, port):
actor Listener(env, address, port):
def on_lsock_error(l, error):
print("There was an error with the TCPListener socket:", error)

Expand All @@ -36,31 +36,20 @@ actor Listener(env, port):
c.cb_install(s.on_receive, s.on_error)

listen_cap = net.TCPListenCap(net.TCPCap(net.NetCap(env.cap)))
server = net.TCPListener(listen_cap, "0.0.0.0", port, on_lsock_error, on_server_accept)
server = net.TCPListener(listen_cap, address, port, on_lsock_error, on_server_accept)


actor Client(rts_monitor, env: Env, port: int):
actor Client(name, rts_monitor, env: Env, port: int, on_success):
var connection = 0
def on_connect(c):
connection += 1
print("Client connected, TCP handles", get_tcp_handles(rts_monitor))
print("Client %s connected to %s:%d, TCP handles: %s" % (name, c.remote_address(), port, str(get_tcp_handles(rts_monitor))))
await async c.write(b"PING")

def on_close(c):
print("closed connection", connection)
if connection == 2:
after 0.001: check_io()

def check_io():
tcp_handles = get_tcp_handles(rts_monitor)
print("IO check...", tcp_handles)
if len(tcp_handles) < 2:
# Only TCP listening socket left...
print("TCP client properly cleaned from IO loop, exiting...")
await async env.exit(0)
else:
print("TCP client not properly cleaned from IO loop, waiting...")
after 0.8: check_io()
on_success()

def on_receive(c, data):
print("Client RECV", data)
Expand Down Expand Up @@ -95,8 +84,52 @@ actor main(env):

after 2: timeout_error()

port = random.randint(10000, 20000)
print("Using port", port)
rts_monitor = acton.rts.Monitor(env)
l = Listener(env, port)
c = Client(rts_monitor, env, port)

port4 = random.randint(10000, 20000)
port6 = port4+1
print("Using ports, IPv4: %d IPv6: %d" % (port4, port6))

require_v4 = True
require_v6 = False

if require_v4:
l4 = Listener(env, "127.0.0.1", port4)
if require_v6:
l6 = Listener(env, "::1", port6)

var v4_connected = False
var v6_connected = False
var ds_connected = False

def _on_v4_connect():
v4_connected = True
if not require_v6 or v6_connected:
after 0.1: check_io()

def _on_v6_connect():
print("v6 connected")
v6_connected = True
if not require_v4 or v4_connected:
after 0.1: check_io()

def check_io():
tcp_handles = get_tcp_handles(rts_monitor)
print("IO check...", tcp_handles)
expected_tcp_handles = 0
if require_v4:
expected_tcp_handles += 1
if require_v6:
expected_tcp_handles += 1
if len(tcp_handles) <= expected_tcp_handles:
# Only TCP listening sockets left...
print("TCP client properly cleaned from IO loop, exiting...")
await async env.exit(0)
else:
print("TCP client not properly cleaned from IO loop, waiting...")
after 0.8: check_io()

if require_v4:
c4 = Client("c4", rts_monitor, env, port4, _on_v4_connect)
if require_v6:
c6 = Client("c6", rts_monitor, env, port6, _on_v6_connect)

0 comments on commit 8f3d8f1

Please sign in to comment.