diff --git a/base/rts/rts.c b/base/rts/rts.c index 889d422e8..193503e02 100644 --- a/base/rts/rts.c +++ b/base/rts/rts.c @@ -522,6 +522,9 @@ struct $ConstContG_class $ConstContG_methods = { // else immediately return false. bool ADD_waiting($Actor a, B_Msg m) { bool did_add = false; + + assert(m != NULL); + spinlock_lock(&m->$wait_lock); if (!FROZEN(m)) { a->$next = m->$waiting; @@ -1588,6 +1591,7 @@ void wt_work_cb(uv_check_t *ev) { } m->$cont = r.cont; B_Msg x = (B_Msg)r.value; + assert(x != NULL); if (ADD_waiting(current, x)) { // x->cont is a proper $Cont: x is still being processed so current was added to x->waiting rtsd_printf("## AWAIT actor %ld : %s", current->$globkey, current->$class->$GCINFO); current->$waitsfor = x; diff --git a/base/src/http.act b/base/src/http.act new file mode 100644 index 000000000..7b070a71c --- /dev/null +++ b/base/src/http.act @@ -0,0 +1,517 @@ +import acton.rts +import json +import logging +import net +import time + +responses = { + 100: b"Continue", + 101: b"Switching Protocols", + 102: b"Processing", + 103: b"Early Hints", + 200: b"OK", + 201: b"Created", + 202: b"Accepted", + 203: b"Non-Authoritative Information", + 204: b"No Content", + 205: b"Reset Content", + 206: b"Partial Content", + 207: b"Multi-Status", + 208: b"Already Reported", + 226: b"IM Used", + 300: b"Multiple Choices", + 301: b"Moved Permanently", + 302: b"Found", + 303: b"See Other", + 304: b"Not Modified", + 305: b"Use Proxy", + 306: b"(Unused)", + 307: b"Temporary Redirect", + 308: b"Permanent Redirect", + 400: b"Bad Request", + 401: b"Unauthorized", + 402: b"Payment Required", + 403: b"Forbidden", + 404: b"Not Found", + 405: b"Method Not Allowed", + 406: b"Not Acceptable", + 407: b"Proxy Authentication Required", + 408: b"Request Timeout", + 409: b"Conflict", + 410: b"Gone", + 411: b"Length Required", + 412: b"Precondition Failed", + 413: b"Content Too Large", + 414: b"URI Too Long", + 415: b"Unsupported Media Type", + 416: b"Range Not Satisfiable", + 417: b"Expectation Failed", + 418: b"(Unused)", + 421: b"Misdirected Request", + 422: b"Unprocessable Content", + 423: b"Locked", + 424: b"Failed Dependency", + 425: b"Too Early", + 426: b"Upgrade Required", + 428: b"Precondition Required", + 429: b"Too Many Requests", + 431: b"Request Header Fields Too Large", + 451: b"Unavailable For Legal Reasons", + 500: b"Internal Server Error", + 501: b"Not Implemented", + 502: b"Bad Gateway", + 503: b"Service Unavailable", + 504: b"Gateway Timeout", + 505: b"HTTP Version Not Supported", + 506: b"Variant Also Negotiates", + 507: b"Insufficient Storage", + 508: b"Loop Detected", + 510: b"Not Extended (OBSOLETED)", + 511: b"Network Authentication Required" +} + +def build_request(host: str, method: bytes, path: bytes, version: bytes, headers: dict[str, str], body: bytes) -> bytes: + r = [ method + b" " + path + b" HTTP/" + version ] + lheaders: dict[str, str] = {} + for k in headers: + lheaders[k.lower()] = k + + if "host" not in lheaders: + headers["Host"] = host + if "user-agent" not in lheaders: + headers["User-Agent"] = "Acton HTTP Client" + if "accept" not in lheaders: + headers["Accept"] = "*/*" +# if "accept-encoding" not in lheaders: +# headers["Accept-Encoding"] = "gzip, deflate" + if "connection" not in lheaders: + headers["Connection"] = "keep-alive" + if "content-length" not in lheaders: + headers["Content-Length"] = str(len(body)) + + for k, v in headers.items(): + r.append(k.encode() + b": " + v.encode()) + + r.append(b"\r\n") + res = b"\r\n".join(r) + if len(body) > 0: + res += body + return res + + +def build_response(version: bytes, status: int, headers: dict[str, str], body: str): + b = body.encode() + # TODO: Add Connection? + status_line: bytes = b"HTTP/" + version + b" " + str(status).encode() + if status in responses: + status_line += b" " + responses[status] + + r = [ status_line ] + if "server" not in headers: + headers["Server"] = "Acton HTTP Server" + if "content-type" not in headers: + headers["Content-type"] = "text/html; charset=utf-8" + if "date" not in headers: + headers["Date"] = time.now().str_rfc1123() + + for k, v in headers.items(): + if k.lower() == "content-length": + # Disregard content-length, we'll compute it from body length + continue + r.append(k.encode() + b": " + v.encode()) + + r.append(b"Content-Length: " + str(len(b)).encode()) + r.append(b"\r\n") + res = b"\r\n".join(r) + if len(b) > 0: + res += b + + return res + + +class Request(object): + @property + method: str + @property + path: str + @property + version: bytes + @property + headers: dict[str, str] + @property + body: bytes + + def __init__(self, method: str, path: str, version: bytes, headers: dict[str, str], body: bytes): + self.method = method + self.path = path + self.version = version + self.headers = headers + self.body = body + + def __str__(self): + return "" + + +class Response(object): + @property + method: str + @property + version: bytes + @property + status: int + @property + headers: dict[str, str] + @property + body: bytes + + def __init__(self, method: str, version: bytes, status: int, headers: dict[str, str], body: bytes) -> None: + self.version = version + self.status = status + self.headers = headers + self.body = body + + def __str__(self) -> str: + return "" + + def to_json(self): + return json.decode(self.body.decode()) + + + +# TODO: separate main part into parse_message() +def parse_response(i: bytes) -> (?Response, bytes): + rs = i.split(b"\r\n\r\n", 1) + if len(rs) == 1: + # Not enough data + return None, i + else: + qlines = rs[0].split(b"\r\n", None) + start_line = qlines[0].rstrip(b"\r\n") + slparts = start_line.split(b" ", None) + verparts = slparts[0].split(b"/", 1) + if len(verparts) != 2: + # invalid request + # TODO: actually HTTP 0.9 might only have 2 parts, but we don't support that + return None, b"" + version = verparts[1] + if version != b"1.1" and version != b"1.0": + return None, b"" + + status = int(slparts[1].decode()) + + # Parse headers + headers : dict[str, str] = {} + for hline in qlines[1:]: + hv = hline.split(b":", 1) + if len(hv) == 1: + # TODO: silently ignore or explicitly throw error or something? + pass + else: + hname = hv[0].decode().strip(" ").lower() + headers[hname] = hv[1].decode().strip(" ") + + # TODO: why do we have to init body & rest here? seems we segfault otherwise... + body = b"" + rest = b"" + if "content-length" in headers: + blen = int(headers["content-length"].strip(" ")) + if len(rs[1]) >= blen: + body = rs[1][:blen] + rest = rs[1][blen:] + else: + return None, i + else: + body = b"" + rest = rs[1] + # TODO: sanity check slparts components first to avoid weird things when decoding + r = Response("GABBA", version, status, headers, body) + return r, rest + + +def parse_request(i: bytes) -> (?Request, bytes): + qs = i.split(b"\r\n\r\n", 1) + if len(qs) == 1: + # Not enough data + return None, i + else: + qlines = qs[0].split(b"\r\n", None) + start_line = qlines[0].rstrip(b"\r\n") + slparts = start_line.split(b" ", None) + verparts = slparts[2].split(b"/", 1) + if len(verparts) != 2: + # invalid request + # TODO: actually HTTP 0.9 might only have 2 parts, but we don't support that + return None, b"" + version = verparts[1] + if version != b"1.1" and version != b"1.0": + return None, b"" + + # Parse headers + hs : dict[str, str] = {} + for hline in qlines[1:]: + hv = hline.split(b":", 1) + if len(hv) == 1: + # TODO: silently ignore or explicitly throw error or something? + pass + else: + hname = hv[0].decode().lower() + hs[hname] = hv[1].decode() + + # TODO: why do we have to init body & rest here? seems we segfault otherwise... + body = b"" + rest = b"" + if "content-length" in hs: + blen = int(hs["content-length"].strip(" ")) + if len(qs[1]) >= blen: + body = qs[1][:blen] + rest = qs[1][blen:] + else: + return None, i + else: + body = b"" + rest = qs[1] + # TODO: sanity check slparts components first to avoid weird things when decoding + req = Request(slparts[0].decode(), slparts[1].decode(), version, hs, body) + return req, rest + +actor Server(conn: net.TCPListenConnection, on_accept: action(Server) -> None, log_handler: ?logging.Handler): + """Server serves a single client connection""" + _log = logging.Logger(None) + var on_request_cb: ?action(Server, Request, action(int, dict[str, str], str) -> None) -> None = None + var on_error_cb: ?action(Server, str) -> None = None + var version: ?bytes = None + var buf = b"" + var close_connection: bool = True + var query_count: u64 = 0 + var response_count: u64 = 0 + var outstanding_requests: dict[u64, Response] = {} + + def cb_install(new_on_request: action(Server, Request, action(int, dict[str, str], str) -> None) -> None, new_on_error: action(Server, str) -> None): + on_request_cb = new_on_request + on_error_cb = new_on_error + if buf != b"": + req, buf = parse_request(buf) + + def on_tcp_receive(tcp_conn, data: bytes) -> None: + # TODO: do we really need a buf? + if on_request_cb is None: + buf += data + return None + else: + if buf != b"": + data = buf + data + + req, buf = parse_request(data) + if req is not None: + if version is None: + version = req.version + elif version is not None and version != req.version: + _log.debug("Version mismatch", None) + conn.close() + + if version is not None and version == b"1.0": + if "connection" in req.headers: + if req.headers["connection"] == "close": + _log.debug("HTTP 1.0 with connection: close, closing connection...", None) + close_connection = True + else: + _log.debug("HTTP 1.0 with connection header, not closing connection...", None) + _log.trace("connection header", {"connection": req.headers["connection"]}) + else: + close_connection = True + elif version is not None and version == b"1.1": + if "connection" in req.headers and req.headers["connection"] == "close": + _log.debug("HTTP 1.1, closing connection...", None) + close_connection = True + else: + close_connection = False + + query_count += 1 + def respond(status_code: int, headers: dict[str, str], body: str): + _log.trace("Going to respond with query id", {"query_count": query_count}) + if query_count == response_count + 1: + # In order, send response immediately + _log.trace("Sending response immediately", None) + send_response(status_code, body) + response_count += 1 + else: + # Buffer up response + _log.trace("Buffering response", None) + # TODO: actually implement buffering? + #outstanding_requests[query_count] = Response("GABBA", version, status_code, headers, body.encode()) + + if on_request_cb is not None: + response = on_request_cb(self, req, respond) + if response is not None: + print("Sending response immediately") + else: + print("Async response") + else: + print("No on_request callback set") + + def on_tcp_error(conn, error): + print("There was an error:", error, " from:", conn) + + def close(): + conn.close() + + def send_response(status_code: int, data: str): + if version is not None: + res = build_response(version, status_code, {}, data) + conn.write(res) + + if close_connection: + conn.close() + + +# TODO: change port to u16, when u16 has a sub-type relationship to int +actor Listener(cap: net.TCPListenCap, address: str, port: int, on_listen_error: action(net.TCPListener, str) -> None, on_accept: action(Server) -> None, log_handler: ?logging.Handler): + """HTTP Server Listener + + The Listener binds to a listening socket and instantiates a http.Server + actor instance per incoming connection. + + Plain TCP is the currently only supported transport. + """ + _log = logging.Logger(log_handler) + var bufs = [] + def on_tcp_listener_error(listener, error): + _log.notice("There was an error with the TCPListener socket", {"error": error}) + + def on_tcp_listener_accept(conn): + s = Server(conn, on_accept, log_handler) + await async on_accept(s) + await async conn.cb_install(s.on_tcp_receive, s.on_tcp_error) + + l = net.TCPListener(cap, address, port, on_tcp_listener_error, on_tcp_listener_accept) + + +# TODO: change port to u16, when u16 has a sub-type relationship to int +# TODO: default schema="https" +# TODO: default port=None +# TODO: default tls_verify=True +actor Client(cap: net.TCPConnectCap, scheme: str, address: str, port: ?int, tls_verify: bool, on_connect: action(Client) -> None, on_error: action(Client, str) -> None, log_handler: ?logging.Handler): + """HTTP(S) Client + + scheme is either 'http' or 'https', the default is 'https' + port is optional, if not provided, it will be inferred from the scheme where http=80 and https=443 + """ + _log = logging.Logger(log_handler) + var _on_response: list[(bytes, action(Client, Response) -> None)] = [] + var version: ?bytes = None + var buf = b"" + var close_connection: bool = True + var tcp_conn: ?net.TCPConnection = None + var tls_conn: ?net.TLSConnection = None + + def _connect(): + if scheme == "http": + _log.verbose("Using http scheme and port 80", None) + tcp_port = port if port is not None else 80 + tcp_conn = net.TCPConnection(cap, address, tcp_port, _on_tcp_connect, _on_tcp_receive, _on_tcp_error) + elif scheme == "https": + _log.verbose("Using https scheme and port 443", None) + tls_port = port if port is not None else 443 + tls_conn = net.TLSConnection(cap, address, tls_port, _on_tls_connect, _on_tls_receive, _on_tls_error, tls_verify) + else: + raise ValueError("Only http and https schemes are supported. Unsupported scheme: %s" % scheme) + + def _on_conn_connect(): + # If there are outstanding requests, it probably means we were + for r in _on_response: + _conn_write(r.0) + await async on_connect(self) + + def _on_tcp_connect(conn: net.TCPConnection) -> None: + _on_conn_connect() + + def _on_tls_connect(conn: net.TLSConnection) -> None: + _on_conn_connect() + + def _on_tcp_receive(conn: net.TCPConnection, data: bytes) -> None: + _on_con_receive(data) + + def _on_tls_receive(conn: net.TLSConnection, data: bytes) -> None: + _on_con_receive(data) + + def _on_con_receive(data: bytes) -> None: + buf += data + _log.trace("Received data", {"bytes": len(data)}) + + while True: + r, buf = parse_response(buf) + if r is not None: + if "connection" in r.headers and r.headers["connection"] == "close": + close_connection = True + _conn_close() + _log.debug("Closing TCP connection due to header: Connection: close", None) + _connect() + if len(_on_response) == 0: + _log.notice("Data received with no on_response callback set", None) + break + outreq = _on_response[0] + del _on_response[0] + cb = outreq.1 + + await async cb(self, r) + else: + break + + def _on_tcp_error(conn: net.TCPConnection, error: str) -> None: + _on_con_error(error) + + def _on_tls_error(conn: net.TLSConnection, error: str) -> None: + _on_con_error(error) + + def _on_con_error(error: str) -> None: + on_error(self, error) + + def _conn_close() -> None: + if tcp_conn is not None: + def _noop(c): + pass + tcp_conn.close(_noop) + elif tls_conn is not None: + def _noop(c): + pass + tls_conn.close(_noop) + + def _conn_write(data: bytes) -> None: + _log.trace("Sending data", {"data": data}) + if tcp_conn is not None: + tcp_conn.write(data) + elif tls_conn is not None: + tls_conn.write(data) + + # HTTP methods + def get(path: str, on_response: action(Client, Response) -> None): + req = build_request(address, b"GET", path.encode(), b"1.1", {}, b"") + _log.debug("Sending request", {"method": "GET", "path": path}) + _on_response.append((req, on_response)) + _conn_write(req) + + def head(path: str, on_response: action(Client, Response) -> None): + req = build_request(address, b"HEAD", path.encode(), b"1.1", {}, b"") + _log.debug("Sending request", {"method": "HEAD", "path": path}) + _on_response.append((req, on_response)) + _conn_write(req) + + def post(path: str, body: bytes, on_response: action(Client, Response) -> None): + req = build_request(address, b"POST", path.encode(), b"1.1", {}, body) + _log.debug("Sending request", {"method": "POST", "path": path}) + _on_response.append((req, on_response)) + _conn_write(req) + + def put(path: str, body: bytes, on_response: action(Client, Response) -> None): + req = build_request(address, b"PUT", path.encode(), b"1.1", {}, body) + _log.debug("Sending request", {"method": "PUT", "path": path}) + _on_response.append((req, on_response)) + _conn_write(req) + + def delete(path: str, on_response: action(Client, Response) -> None): + req = build_request(address, b"DELETE", path.encode(), b"1.1", {}, b"") + _log.debug("Sending request", {"method": "DELETE", "path": path}) + _on_response.append((req, on_response)) + _conn_write(req) + + _connect() diff --git a/builder/build.zig b/builder/build.zig index 14edb36db..93e084245 100644 --- a/builder/build.zig +++ b/builder/build.zig @@ -43,7 +43,7 @@ fn dotsToRoot(allocator: std.mem.Allocator, cwd: []const u8) []const u8 { } pub fn build(b: *std.build.Builder) void { - const buildroot_path = b.build_root.handle.realpathAlloc(b.allocator, ".") catch @panic("ASDF"); + const buildroot_path = b.build_root.handle.realpathAlloc(b.allocator, ".") catch @panic("ASD"); const dots_to_root = dotsToRoot(b.allocator, buildroot_path); defer b.allocator.free(dots_to_root); print("Acton Project Builder\nBuilding in {s}\n", .{buildroot_path}); diff --git a/test/stdlib/test_http.act b/test/stdlib/test_http.act new file mode 100644 index 000000000..c8e52a6aa --- /dev/null +++ b/test/stdlib/test_http.act @@ -0,0 +1,114 @@ + +import http +import logging +import net + +#actor HttpGetter(connect_auth, host, batch_size): +# var count = 0 +# var outstanding = 0 +# var reconnects = 0 +# +# def request_some(conn, n): +# for i in range(0, n, 1): +# conn.get("/" + str(i), on_response) +# outstanding += 1 +# +# def on_response(conn: http.Client, resp: http.Response): +# count += 1 +# outstanding -= 1 +## if (count+outstanding) < 1234: +## request_some(conn, 1) +# +# def on_connect(conn): +# print("Connected") +# if reconnects == 0: +# request_some(conn, batch_size) +# reconnects += 1 +# +# +# def on_error(conn, err): +# print("Error: " + err) +# +# def get_stats(): +# return "Count: " + str(count) + " outstanding: " + str(outstanding) +# +# print("Connecting to http://" + host) +# client = http.Client(connect_auth, host, 7000, on_connect, on_error) +# +# +actor main(env): + + log_handler = logging.Handler("HTTPtest") + log_handler.add_sink(logging.StdoutSink()) + log = logging.Logger(log_handler) +# def test_http_request_parser(query, parsed) -> bool: +# # Go through query byte by byte, and feed it to the parser one more +# # byte at a time until we get a complete request +# for i in range(0, len(query), 1): +# partial_request = query[0:i+1] +# req, rest = http.parse_request(partial_request) +# if req is not None: +# if str(parsed) == str(req): +# return True +# else: +# print("Expected: " + str(parsed) + " got: " + str(req)) +# return False +# print("No request found") +# return False +# +# def run_test_http_request_parser(): +# a = b"\r\n\r\n" +# qs = a.split(b"\r\n\r\n", 1) +# tests = [ +# (query = b"GET / HTTP/1.1\r\nHost: 127.0.0.1:8000\r\nUser-Agent: curl/7.85.0\r\nAccept: */*\r\n\r\n", +# parsed = http.Request("GET", "/", b"1.1", {"Host": "127.0.0.1:8000", "User-Agent": "curl/7.85.0", "Accept": "*/*"}, b"") +# ) +# ] +# all_good = True +# for t in tests: +# if not test_http_request_parser(t.query, t.parsed): +# all_good = False +# return all_good +# +# +## host = env.argv[1] + tcpccap = net.TCPConnectCap(net.TCPCap(net.NetCap(env.cap))) +## +## var workers = [] +## for i in range(0, 1, 1): +## print("Starting worker " + str(i)) +## workers.append(HttpGetter(connect_auth, host, 20)) +## +## def print_stats(): +## for w in workers: +## print(w.get_stats()) +## after 1: print_stats() +## print_stats() +## +## def _exit(): +## print_stats() +## env.exit(0) +## +### after 5: _exit() +# +# if not run_test_http_request_parser(): +# await async env.exit(1) + + def _on_http_connect(c): + c.get("/products/1", _on_http_receive) + + def _on_http_receive(c, data): + print("YAY, got something") + print(data) + print(data.status) + try: + print(data.to_json()) + except: + print("Failed to parse json") + + def _on_http_error(c, errmsg): + print(errmsg) + + print("asdf") + hc = http.Client(tcpccap, "https", "dummyjson.com", None, True, _on_http_connect, _on_http_error, log_handler) + hc2 = http.Client(tcpccap, "http", "neverssl.com", None, True, _on_http_connect, _on_http_error, log_handler)