Skip to content

Commit

Permalink
Add support for iterator (#253)
Browse files Browse the repository at this point in the history
* Add support for iterator & streaming

Fix issue where Streaming responses wouldn't return a result to the
client

* simplify

* Cleanup

* Fix build

* Remove commented out code

* Fix issue with compiled library code

The check for (async) generators broke in the compiled library code

This is because the compiled code didn't actually create (native)
generator functions and so all functions were assumed to be generators.

The fix was to check the return value instead to check for (async)
iterator like props

* Remove unused imports

* Bump client library package.json

* Bump nodemon for client library

* Add onError checkResult & onSuccess support

to async generators

* Add support for onError/checkResult/onSuccess

for (synchronous) iterators

* Pass in the options object...

* Fix build

* Clean up

* Tweak useAsWaterfall fix

* Remove unused import from sample app

---------

Co-authored-by: Brett Beutell <[email protected]>
  • Loading branch information
flenter and brettimus authored Sep 18, 2024
1 parent cf308ff commit 21df2af
Show file tree
Hide file tree
Showing 12 changed files with 567 additions and 127 deletions.
4 changes: 2 additions & 2 deletions packages/client-library-otel/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"author": "Fiberplane<[email protected]>",
"type": "module",
"main": "dist/index.js",
"version": "0.2.0-beta.2",
"version": "0.3.0-beta.1",
"dependencies": {
"@opentelemetry/api": "~1.9.0",
"@opentelemetry/exporter-trace-otlp-http": "^0.52.1",
Expand All @@ -22,7 +22,7 @@
"@swc/core": "^1.5.22",
"@swc/plugin-transform-imports": "^2.0.4",
"hono": "^4.3.9",
"nodemon": "^3.1.4",
"nodemon": "^3.1.5",
"rimraf": "^6.0.1",
"tsc-alias": "^1.8.10",
"typescript": "^5.4.5"
Expand Down
54 changes: 54 additions & 0 deletions packages/client-library-otel/sample/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Hono } from "hono";
import { stream } from "hono/streaming";
import { instrument, measure } from "../src";

const app = new Hono();
Expand Down Expand Up @@ -48,4 +49,57 @@ app.get("/error", async () => {
await delayedError();
});

async function* rawRelaxedWelcome() {
await sleep(500);
yield "hello! ";
await sleep(500);
yield "Hono ";
await sleep(500);
yield "is ";
await sleep(500);
yield "awesome";
}

// This is an async generator function (and so returns an async iterator)
const generateRelaxedWelcome = measure("relaxedWelcome", rawRelaxedWelcome);

app.get("/stream", async (c) => {
c.header("Content-Type", "text/plain");
return stream(c, async (stream) => {
const result = generateRelaxedWelcome();

for await (const content of result) {
await stream.write(content);
}
});
});

const fibonacci = measure(
"fibonacci",
function* (arg: number): Generator<number> {
let a = 1;
let b = 1;
for (let i = 0; i < arg; i++) {
yield a;
[a, b] = [b, a + b];
}
},
);
// Example usage:
app.get("/fibonacci/:count", (c) => {
const count = Number.parseInt(c.req.param("count"), 10);

const result = fibonacci(count);
const values = Array.from(result);

return c.text(`Fibonacci sequence (${count} numbers): ${values.join(", ")}`);
});

app.get("/quick", async (c) => {
c.header("Content-Type", "text/plain");
return stream(c, async (stream) => {
stream.write("ok");
});
});

export default instrument(app);
31 changes: 20 additions & 11 deletions packages/client-library-otel/src/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
patchFetch,
patchWaitUntil,
} from "./patch";
import { PromiseStore } from "./promiseStore";
import { propagateFpxTraceId } from "./propagation";
import { isRouteInspectorRequest, respondWithRoutes } from "./routes";
import type { HonoLikeApp, HonoLikeEnv, HonoLikeFetch } from "./types";
Expand Down Expand Up @@ -148,10 +149,10 @@ export function instrument(app: HonoLikeApp, config?: FpxConfigOptions) {
endpoint,
});

const promiseStore = new PromiseStore();
// Enable tracing for waitUntil
const patched = executionContext && patchWaitUntil(executionContext);
const promises = patched?.promises ?? [];
const proxyExecutionCtx = patched?.proxyContext ?? executionContext;
const proxyExecutionCtx =
executionContext && patchWaitUntil(executionContext, promiseStore);

const activeContext = propagateFpxTraceId(request);

Expand Down Expand Up @@ -206,11 +207,19 @@ export function instrument(app: HonoLikeApp, config?: FpxConfigOptions) {
};
span.setAttributes(requestAttributes);
},
endSpanManually: true,
onSuccess: async (span, response) => {
const attributes = await getResponseAttributes(
(await response).clone(),
);
span.setAttributes(attributes);
span.addEvent("first-response");

const attributesResponse = response.clone();

const updateSpan = async (response: Response) => {
const attributes = await getResponseAttributes(response);
span.setAttributes(attributes);
span.end();
};

promiseStore.add(updateSpan(attributesResponse));
},
checkResult: async (result) => {
const r = await result;
Expand All @@ -224,14 +233,14 @@ export function instrument(app: HonoLikeApp, config?: FpxConfigOptions) {
);

try {
return await context.with(activeContext, async () => {
return await measuredFetch(newRequest, rawEnv, proxyExecutionCtx);
});
return await context.with(activeContext, () =>
measuredFetch(newRequest, rawEnv, proxyExecutionCtx),
);
} finally {
// Make sure all promises are resolved before sending data to the server
if (proxyExecutionCtx) {
proxyExecutionCtx.waitUntil(
Promise.allSettled(promises).finally(() => {
promiseStore.allSettled().finally(() => {
return provider.forceFlush();
}),
);
Expand Down
Loading

0 comments on commit 21df2af

Please sign in to comment.