diff --git a/ddapm_test_agent/agent.py b/ddapm_test_agent/agent.py index 83c983b..35a837a 100644 --- a/ddapm_test_agent/agent.py +++ b/ddapm_test_agent/agent.py @@ -258,6 +258,18 @@ async def traces(self) -> TraceMap: _traces[trace_id].append(s) return _traces + # TODO: type signature + async def profiles(self) -> List[bytes]: + """Return the profiles stored by the agent in the order in which they + arrived. + """ + profiles: List[bytes] = [] + for req in reversed(self._requests): + profile = self._profile_from_request(req) + if profile: + profiles.append(profile) + return profiles + async def clear_trace_check_failures(self, request: Request) -> web.Response: """Clear traces by session token provided.""" token = request["session_token"] @@ -405,6 +417,20 @@ async def _traces_from_request(self, req: Request) -> List[List[Span]]: return self._decode_v07_traces(req) return [] + def _profile_from_request(self, req: Request) -> bytes | None: + """Return the profile from a profile request.""" + if req.match_info.handler == self.handle_v1_profiling: + profile = self._request_data(req) + resp = {} + for name, body in profile: + if name == "event" or name.endswith(".json"): + resp[name] = body + else: + # assume anything else is binary + resp[name] = base64.b64encode(body).decode() + return resp + return None + async def _traces_by_session(self, token: Optional[str]) -> List[Trace]: """Return the traces that belong to the given session token. @@ -440,6 +466,22 @@ async def _apmtelemetry_by_session(self, token: Optional[str]) -> List[Telemetry # TODO: Sort the events? return events + # TODO: types + async def _profiles_by_session(self, token: Optional[str]): + """Return the profiles that belong to the given session token. + + If token is None or if the token was used to manually start a session + with /session-start then return all traces that were sent since the last + /session-start request was made. + """ + # TODO: Dedup with profiles() + profiles: List[bytes] = [] + for req in self._requests_by_session(token): + profile = self._profile_from_request(req) + if profile: + profiles.append(profile) + return profiles + async def _tracerflares_by_session(self, token: Optional[str]) -> List[TracerFlareEvent]: """Return the tracer-flare events that belong to the given session token. @@ -901,6 +943,16 @@ async def handle_session_requests(self, request: Request) -> web.Response: ) return web.json_response(resp) + async def handle_session_profiles(self, request: Request) -> web.Response: + token = request["session_token"] + profiles = [] + try: + profiles = await self._profiles_by_session(token) + except NoSuchSessionException as e: + return web.HTTPNotFound(reason=str(e)) + + return web.json_response(profiles) + async def handle_test_traces(self, request: Request) -> web.Response: """Return requested traces as JSON. @@ -936,8 +988,34 @@ async def handle_test_apmtelemetry(self, request: Request) -> web.Response: events = await self.apmtelemetry() return web.json_response(data=events) + async def handle_test_profiles(self, request: Request) -> web.Response: + """Return requested profiles as JSON. + + The response is a JSON dictionary whose keys are names of parts of the + profile upload, and values are the contents of those parts. The "event" + info and "metrics.json" are encoded as JSON. All other + parts/attachments are treated as binary and base64-encoded. + """ + profiles = await self.profiles() + return web.json_response(data=profiles) + async def handle_v1_profiling(self, request: Request) -> web.Response: - await request.read() + reader = await request.multipart() + parts = [] + while not reader.at_eof(): + p = await reader.next() + if not p: + # why would p be None? + continue + if p.name == "event" or p.name.endswith(".json"): + body = await p.json() + else: + body = await p.read() + parts.append((p.name, body)) + # TODO: any more validation? + # TODO: headers? + + request["_testagent_data"] = parts self._requests.append(request) # TODO: valid response? return web.HTTPOk() @@ -1129,12 +1207,14 @@ def make_app( web.get("/test/session/tracerflares", agent.handle_session_tracerflares), web.get("/test/session/stats", agent.handle_session_tracestats), web.get("/test/session/requests", agent.handle_session_requests), + web.get("/test/session/profiles", agent.handle_session_profiles), web.post("/test/session/responses/config", agent.handle_v07_remoteconfig_create), web.post("/test/session/responses/config/path", agent.handle_v07_remoteconfig_path_create), web.put("/test/session/responses/config", agent.handle_v07_remoteconfig_put), web.put("/test/session/integrations", agent.handle_put_tested_integrations), web.get("/test/traces", agent.handle_test_traces), web.get("/test/apmtelemetry", agent.handle_test_apmtelemetry), + web.get("/test/profiles", agent.handle_test_profiles), # web.get("/test/benchmark", agent.handle_test_traces), web.get("/test/trace/analyze", agent.handle_trace_analyze), web.get("/test/trace_check/failures", agent.get_trace_check_failures),