Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store uploaded profiles, add /test/profiles endpoint to retrieve them #186

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 81 additions & 1 deletion ddapm_test_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,18 @@ async def traces(self) -> TraceMap:
_traces[trace_id].append(s)
return _traces

# TODO: type signature
async def profiles(self) -> List[bytes]:
"""Return the profiles stored by the agent in the order in which they
arrived.
"""
profiles: List[bytes] = []
for req in reversed(self._requests):
profile = self._profile_from_request(req)
if profile:
profiles.append(profile)
return profiles

async def clear_trace_check_failures(self, request: Request) -> web.Response:
"""Clear traces by session token provided."""
token = request["session_token"]
Expand Down Expand Up @@ -405,6 +417,20 @@ async def _traces_from_request(self, req: Request) -> List[List[Span]]:
return self._decode_v07_traces(req)
return []

def _profile_from_request(self, req: Request) -> bytes | None:
"""Return the profile from a profile request."""
if req.match_info.handler == self.handle_v1_profiling:
profile = self._request_data(req)
resp = {}
for name, body in profile:
if name == "event" or name.endswith(".json"):
resp[name] = body
else:
# assume anything else is binary
resp[name] = base64.b64encode(body).decode()
return resp
return None

async def _traces_by_session(self, token: Optional[str]) -> List[Trace]:
"""Return the traces that belong to the given session token.

Expand Down Expand Up @@ -440,6 +466,22 @@ async def _apmtelemetry_by_session(self, token: Optional[str]) -> List[Telemetry
# TODO: Sort the events?
return events

# TODO: types
async def _profiles_by_session(self, token: Optional[str]):
"""Return the profiles that belong to the given session token.

If token is None or if the token was used to manually start a session
with /session-start then return all traces that were sent since the last
/session-start request was made.
"""
# TODO: Dedup with profiles()
profiles: List[bytes] = []
for req in self._requests_by_session(token):
profile = self._profile_from_request(req)
if profile:
profiles.append(profile)
return profiles

async def _tracerflares_by_session(self, token: Optional[str]) -> List[TracerFlareEvent]:
"""Return the tracer-flare events that belong to the given session token.

Expand Down Expand Up @@ -901,6 +943,16 @@ async def handle_session_requests(self, request: Request) -> web.Response:
)
return web.json_response(resp)

async def handle_session_profiles(self, request: Request) -> web.Response:
token = request["session_token"]
profiles = []
try:
profiles = await self._profiles_by_session(token)
except NoSuchSessionException as e:
return web.HTTPNotFound(reason=str(e))

return web.json_response(profiles)

async def handle_test_traces(self, request: Request) -> web.Response:
"""Return requested traces as JSON.

Expand Down Expand Up @@ -936,8 +988,34 @@ async def handle_test_apmtelemetry(self, request: Request) -> web.Response:
events = await self.apmtelemetry()
return web.json_response(data=events)

async def handle_test_profiles(self, request: Request) -> web.Response:
"""Return requested profiles as JSON.

The response is a JSON dictionary whose keys are names of parts of the
profile upload, and values are the contents of those parts. The "event"
info and "metrics.json" are encoded as JSON. All other
parts/attachments are treated as binary and base64-encoded.
"""
profiles = await self.profiles()
return web.json_response(data=profiles)

async def handle_v1_profiling(self, request: Request) -> web.Response:
await request.read()
reader = await request.multipart()
parts = []
while not reader.at_eof():
p = await reader.next()
if not p:
# why would p be None?
continue
if p.name == "event" or p.name.endswith(".json"):
body = await p.json()
else:
body = await p.read()
parts.append((p.name, body))
# TODO: any more validation?
# TODO: headers?

request["_testagent_data"] = parts
self._requests.append(request)
# TODO: valid response?
return web.HTTPOk()
Expand Down Expand Up @@ -1129,12 +1207,14 @@ def make_app(
web.get("/test/session/tracerflares", agent.handle_session_tracerflares),
web.get("/test/session/stats", agent.handle_session_tracestats),
web.get("/test/session/requests", agent.handle_session_requests),
web.get("/test/session/profiles", agent.handle_session_profiles),
web.post("/test/session/responses/config", agent.handle_v07_remoteconfig_create),
web.post("/test/session/responses/config/path", agent.handle_v07_remoteconfig_path_create),
web.put("/test/session/responses/config", agent.handle_v07_remoteconfig_put),
web.put("/test/session/integrations", agent.handle_put_tested_integrations),
web.get("/test/traces", agent.handle_test_traces),
web.get("/test/apmtelemetry", agent.handle_test_apmtelemetry),
web.get("/test/profiles", agent.handle_test_profiles),
# web.get("/test/benchmark", agent.handle_test_traces),
web.get("/test/trace/analyze", agent.handle_trace_analyze),
web.get("/test/trace_check/failures", agent.get_trace_check_failures),
Expand Down
Loading