diff --git a/src/FetchOpaqueInterceptor.ts b/src/FetchOpaqueInterceptor.ts new file mode 100644 index 0000000..2a8564c --- /dev/null +++ b/src/FetchOpaqueInterceptor.ts @@ -0,0 +1,41 @@ +// const { AsyncLocalStorage } = require('node:async_hooks'); +import { AsyncLocalStorage } from 'node:async_hooks'; +import symbols from './symbols.js'; +import { Dispatcher } from 'undici'; + +// const RedirectHandler = require('../handler/redirect-handler') + +export interface FetchOpaque { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + [symbols.kRequestId]: number; + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + [symbols.kRequestStartTime]: number; + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + [symbols.kEnableRequestTiming]: number; +} + +// const internalOpaque = { +// [symbols.kRequestId]: requestId, +// [symbols.kRequestStartTime]: requestStartTime, +// [symbols.kEnableRequestTiming]: !!(init.timing ?? true), +// [symbols.kRequestTiming]: timing, +// // [symbols.kRequestOriginalOpaque]: originalOpaque, +// }; + +export interface OpaqueInterceptorOptions { + opaqueLocalStorage: AsyncLocalStorage; +} + +export function fetchOpaqueInterceptor(opts: OpaqueInterceptorOptions) { + const opaqueLocalStorage = opts?.opaqueLocalStorage; + return (dispatch: Dispatcher['dispatch']): Dispatcher['dispatch'] => { + return function redirectInterceptor(opts: Dispatcher.DispatchOptions, handler: Dispatcher.DispatchHandlers) { + const opaque = opaqueLocalStorage?.getStore(); + (handler as any).opaque = opaque; + return dispatch(opts, handler); + }; + }; +} diff --git a/src/HttpAgent.ts b/src/HttpAgent.ts index e26eecd..5b38121 100644 --- a/src/HttpAgent.ts +++ b/src/HttpAgent.ts @@ -8,12 +8,12 @@ import { export type CheckAddressFunction = (ip: string, family: number | string, hostname: string) => boolean; -export type HttpAgentOptions = { +export interface HttpAgentOptions extends Agent.Options { lookup?: LookupFunction; checkAddress?: CheckAddressFunction; connect?: buildConnector.BuildOptions, allowH2?: boolean; -}; +} class IllegalAddressError extends Error { hostname: string; @@ -36,9 +36,10 @@ export class HttpAgent extends Agent { constructor(options: HttpAgentOptions) { /* eslint node/prefer-promises/dns: off*/ - const _lookup = options.lookup ?? dns.lookup; - const lookup: LookupFunction = (hostname, dnsOptions, callback) => { - _lookup(hostname, dnsOptions, (err, ...args: any[]) => { + const { lookup = dns.lookup, ...baseOpts } = options; + + const lookupFunction: LookupFunction = (hostname, dnsOptions, callback) => { + lookup(hostname, dnsOptions, (err, ...args: any[]) => { // address will be array on Node.js >= 20 const address = args[0]; const family = args[1]; @@ -63,7 +64,8 @@ export class HttpAgent extends Agent { }); }; super({ - connect: { ...options.connect, lookup, allowH2: options.allowH2 }, + ...baseOpts, + connect: { ...options.connect, lookup: lookupFunction, allowH2: options.allowH2 }, }); this.#checkAddress = options.checkAddress; } diff --git a/src/HttpClient.ts b/src/HttpClient.ts index a0b44bf..f05d5b5 100644 --- a/src/HttpClient.ts +++ b/src/HttpClient.ts @@ -37,7 +37,7 @@ import { HttpAgent, CheckAddressFunction } from './HttpAgent.js'; import type { IncomingHttpHeaders } from './IncomingHttpHeaders.js'; import { RequestURL, RequestOptions, HttpMethod, RequestMeta } from './Request.js'; import { RawResponseWithMeta, HttpClientResponse, SocketInfo } from './Response.js'; -import { parseJSON, digestAuthHeader, globalId, performanceTime, isReadable } from './utils.js'; +import { parseJSON, digestAuthHeader, globalId, performanceTime, isReadable, updateSocketInfo } from './utils.js'; import symbols from './symbols.js'; import { initDiagnosticsChannel } from './diagnosticsChannel.js'; import { HttpClientConnectTimeoutError, HttpClientRequestTimeoutError } from './HttpClientError.js'; @@ -47,7 +47,28 @@ type UndiciRequestOption = Exists[1]>; type PropertyShouldBe = Omit & { [P in K]: V }; type IUndiciRequestOption = PropertyShouldBe; -const PROTO_RE = /^https?:\/\//i; +export const PROTO_RE = /^https?:\/\//i; + +export interface UnidiciTimingInfo { + startTime: number; + redirectStartTime: number; + redirectEndTime: number; + postRedirectStartTime: number; + finalServiceWorkerStartTime: number; + finalNetworkResponseStartTime: number; + finalNetworkRequestStartTime: number; + endTime: number; + encodedBodySize: number; + decodedBodySize: number; + finalConnectionTimingInfo: { + domainLookupStartTime: number; + domainLookupEndTime: number; + connectionStartTime: number; + connectionEndTime: number; + secureConnectionStartTime: number; + // ALPNNegotiatedProtocol: undefined + }; +} function noop() { // noop @@ -137,9 +158,11 @@ export type RequestContext = { requestStartTime?: number; }; -const channels = { +export const channels = { request: diagnosticsChannel.channel('urllib:request'), response: diagnosticsChannel.channel('urllib:response'), + fetchRequest: diagnosticsChannel.channel('urllib:fetch:request'), + fetchResponse: diagnosticsChannel.channel('urllib:fetch:response'), }; export type RequestDiagnosticsMessage = { @@ -631,7 +654,7 @@ export class HttpClient extends EventEmitter { } res.rt = performanceTime(requestStartTime); // get real socket info from internalOpaque - this.#updateSocketInfo(socketInfo, internalOpaque); + updateSocketInfo(socketInfo, internalOpaque); const clientResponse: HttpClientResponse = { opaque: originalOpaque, @@ -707,7 +730,7 @@ export class HttpClient extends EventEmitter { res.requestUrls.push(requestUrl.href); } res.rt = performanceTime(requestStartTime); - this.#updateSocketInfo(socketInfo, internalOpaque, rawError); + updateSocketInfo(socketInfo, internalOpaque, rawError); channels.response.publish({ request: reqMeta, @@ -729,40 +752,4 @@ export class HttpClient extends EventEmitter { throw err; } } - - #updateSocketInfo(socketInfo: SocketInfo, internalOpaque: any, err?: any) { - const socket = internalOpaque[symbols.kRequestSocket] ?? err?.[symbols.kErrorSocket]; - if (socket) { - socketInfo.id = socket[symbols.kSocketId]; - socketInfo.handledRequests = socket[symbols.kHandledRequests]; - socketInfo.handledResponses = socket[symbols.kHandledResponses]; - if (socket[symbols.kSocketLocalAddress]) { - socketInfo.localAddress = socket[symbols.kSocketLocalAddress]; - socketInfo.localPort = socket[symbols.kSocketLocalPort]; - } - if (socket.remoteAddress) { - socketInfo.remoteAddress = socket.remoteAddress; - socketInfo.remotePort = socket.remotePort; - socketInfo.remoteFamily = socket.remoteFamily; - } - socketInfo.bytesRead = socket.bytesRead; - socketInfo.bytesWritten = socket.bytesWritten; - if (socket[symbols.kSocketConnectErrorTime]) { - socketInfo.connectErrorTime = socket[symbols.kSocketConnectErrorTime]; - if (Array.isArray(socket.autoSelectFamilyAttemptedAddresses)) { - socketInfo.attemptedRemoteAddresses = socket.autoSelectFamilyAttemptedAddresses; - } - socketInfo.connectProtocol = socket[symbols.kSocketConnectProtocol]; - socketInfo.connectHost = socket[symbols.kSocketConnectHost]; - socketInfo.connectPort = socket[symbols.kSocketConnectPort]; - } - if (socket[symbols.kSocketConnectedTime]) { - socketInfo.connectedTime = socket[symbols.kSocketConnectedTime]; - } - if (socket[symbols.kSocketRequestEndTime]) { - socketInfo.lastRequestEndTime = socket[symbols.kSocketRequestEndTime]; - } - socket[symbols.kSocketRequestEndTime] = new Date(); - } - } } diff --git a/src/Request.ts b/src/Request.ts index 44f61e9..3f83c44 100644 --- a/src/Request.ts +++ b/src/Request.ts @@ -3,6 +3,7 @@ import type { EventEmitter } from 'node:events'; import type { Dispatcher } from 'undici'; import type { IncomingHttpHeaders } from './IncomingHttpHeaders.js'; import type { HttpClientResponse } from './Response.js'; +import { Request } from 'undici'; export type HttpMethod = Dispatcher.HttpMethod; @@ -161,3 +162,8 @@ export type RequestMeta = { ctx?: unknown; retries: number; }; + +export type FetchMeta = { + requestId: number; + request: Request, +}; diff --git a/src/fetch.ts b/src/fetch.ts new file mode 100644 index 0000000..f923f55 --- /dev/null +++ b/src/fetch.ts @@ -0,0 +1,267 @@ +import { + fetch as UndiciFetch, + RequestInfo, + RequestInit, + Request, + Response, + Agent, + getGlobalDispatcher, + Pool, Dispatcher, + // interceptors, + // Request, + // HeadersInit, +} from 'undici'; +// eslint-disable-next-line @typescript-eslint/ban-ts-comment +// @ts-ignore +import undiciSymbols from 'undici/lib/core/symbols.js'; +// eslint-disable-next-line @typescript-eslint/ban-ts-comment +// @ts-ignore +import undiciFetchSymbols from 'undici/lib/web/fetch/symbols.js'; +import { + channels, + ClientOptions, + PoolStat, RequestDiagnosticsMessage, ResponseDiagnosticsMessage, UnidiciTimingInfo, + // RequestDiagnosticsMessage, +} from './HttpClient.js'; +import { + HttpAgent, HttpAgentOptions, + // CheckAddressFunction, +} from './HttpAgent.js'; +import { initDiagnosticsChannel } from './diagnosticsChannel.js'; +import { convertHeader, globalId, performanceTime, updateSocketInfo } from './utils.js'; +import symbols from './symbols.js'; +import { + FetchMeta, HttpMethod, RequestMeta, + // RequestMeta, +} from './Request.js'; +import { AsyncLocalStorage } from 'node:async_hooks'; +import { FetchOpaque, fetchOpaqueInterceptor } from './FetchOpaqueInterceptor.js'; +import { RawResponseWithMeta, SocketInfo } from './Response.js'; +import { IncomingHttpHeaders } from './IncomingHttpHeaders.js'; + +export interface UrllibRequestInit extends RequestInit { + // checkAddress?: CheckAddressFunction; + // default is true + timing?: boolean; +} + +export type FetchDiagnosticsMessage = { + fetch: FetchMeta; +}; + +export type FetchResponseDiagnosticsMessage = { + fetch: FetchMeta; + timingInfo?: UnidiciTimingInfo; + response?: Response; + error?: Error; +}; + +export class FetchFactory { + static #dispatcher: Agent; + static #opaqueLocalStorage = new AsyncLocalStorage(); + + static getDispatcher() { + return FetchFactory.#dispatcher ?? getGlobalDispatcher(); + } + + static setDispatcher(dispatcher: Agent) { + FetchFactory.#dispatcher = dispatcher; + } + + static setClientOptions(clientOptions: ClientOptions) { + let dispatcherOption: Agent.Options = { + interceptors: { + Agent: [ + fetchOpaqueInterceptor({ + opaqueLocalStorage: FetchFactory.#opaqueLocalStorage, + }), + ], + Client: [], + }, + }; + let dispatcherClazz: new (options: Agent.Options) => Agent = Agent; + if (clientOptions?.lookup || clientOptions?.checkAddress) { + dispatcherOption = { + ...dispatcherOption, + lookup: clientOptions.lookup, + checkAddress: clientOptions.checkAddress, + connect: clientOptions.connect, + allowH2: clientOptions.allowH2, + } as HttpAgentOptions; + dispatcherClazz = HttpAgent as unknown as new (options: Agent.Options) => Agent; + } else if (clientOptions?.connect) { + dispatcherOption = { + ...dispatcherOption, + connect: clientOptions.connect, + allowH2: clientOptions.allowH2, + } as HttpAgentOptions; + dispatcherClazz = Agent; + } else if (clientOptions?.allowH2) { + // Support HTTP2 + dispatcherOption = { + ...dispatcherOption, + allowH2: clientOptions.allowH2, + } as HttpAgentOptions; + dispatcherClazz = Agent; + } + FetchFactory.#dispatcher = new dispatcherClazz(dispatcherOption); + initDiagnosticsChannel(); + } + + static getDispatcherPoolStats() { + const agent = FetchFactory.getDispatcher(); + // origin => Pool Instance + const clients: Map> | undefined = Reflect.get(agent, undiciSymbols.kClients); + const poolStatsMap: Record = {}; + if (!clients) { + return poolStatsMap; + } + for (const [ key, ref ] of clients) { + const pool = typeof ref.deref === 'function' ? ref.deref() : ref as unknown as Pool; + const stats = pool?.stats; + if (!stats) continue; + poolStatsMap[key] = { + connected: stats.connected, + free: stats.free, + pending: stats.pending, + queued: stats.queued, + running: stats.running, + size: stats.size, + } satisfies PoolStat; + } + return poolStatsMap; + } + + static async fetch(input: RequestInfo, init?: UrllibRequestInit): Promise { + const requestStartTime = performance.now(); + init = init ?? {}; + init.dispatcher = init.dispatcher ?? FetchFactory.#dispatcher; + const request = new Request(input, init); + const requestId = globalId('HttpClientRequest'); + // https://developer.chrome.com/docs/devtools/network/reference/?utm_source=devtools#timing-explanation + const timing = { + // socket assigned + queuing: 0, + // dns lookup time + // dnslookup: 0, + // socket connected + connected: 0, + // request headers sent + requestHeadersSent: 0, + // request sent, including headers and body + requestSent: 0, + // Time to first byte (TTFB), the response headers have been received + waiting: 0, + // the response body and trailers have been received + contentDownload: 0, + }; + + // using opaque to diagnostics channel, binding request and socket + const internalOpaque = { + [symbols.kRequestId]: requestId, + [symbols.kRequestStartTime]: requestStartTime, + [symbols.kEnableRequestTiming]: !!(init.timing ?? true), + [symbols.kRequestTiming]: timing, + // [symbols.kRequestOriginalOpaque]: originalOpaque, + }; + const reqMeta: RequestMeta = { + requestId, + url: request.url, + args: { + method: request.method as HttpMethod, + type: request.method as HttpMethod, + data: request.body, + headers: convertHeader(request.headers), + }, + retries: 0, + }; + const fetchMeta: FetchMeta = { + requestId, + request, + }; + // const request = new Request(input, { + // + // }); + const socketInfo: SocketInfo = { + id: 0, + localAddress: '', + localPort: 0, + remoteAddress: '', + remotePort: 0, + remoteFamily: '', + bytesWritten: 0, + bytesRead: 0, + handledRequests: 0, + handledResponses: 0, + }; + channels.request.publish({ + request: reqMeta, + } as RequestDiagnosticsMessage); + channels.fetchRequest.publish({ + fetch: fetchMeta, + } as FetchDiagnosticsMessage); + + let res: Response; + // keep urllib createCallbackResponse style + const resHeaders: IncomingHttpHeaders = {}; + const urllibResponse = { + status: -1, + statusCode: -1, + statusText: '', + statusMessage: '', + headers: resHeaders, + size: 0, + aborted: false, + rt: 0, + keepAliveSocket: true, + requestUrls: [ + request.url, + ], + timing, + socket: socketInfo, + retries: 0, + socketErrorRetries: 0, + } as any as RawResponseWithMeta; + try { + await FetchFactory.#opaqueLocalStorage.run(internalOpaque, async () => { + res = await UndiciFetch(input, init); + }); + } catch (e: any) { + channels.response.publish({ + fetch: fetchMeta, + error: e, + } as FetchResponseDiagnosticsMessage); + channels.fetchResponse.publish({ + request: reqMeta, + response: urllibResponse, + error: e, + } as ResponseDiagnosticsMessage); + throw e; + } + + // get unidici internal response + const state = Reflect.get(res!, undiciFetchSymbols.kState) as Dispatcher.ResponseData; + updateSocketInfo(socketInfo, internalOpaque /* , rawError */); + + urllibResponse.headers = convertHeader(res!.headers); + urllibResponse.status = urllibResponse.statusCode = res!.status; + urllibResponse!.statusMessage = res!.statusText; + if (urllibResponse.headers['content-length']) { + urllibResponse.size = parseInt(urllibResponse.headers['content-length']); + } + urllibResponse.rt = performanceTime(requestStartTime); + + channels.fetchResponse.publish({ + fetch: fetchMeta, + timingInfo: (state as any).timingInfo, + response: res!, + } as FetchResponseDiagnosticsMessage); + channels.response.publish({ + request: reqMeta, + response: urllibResponse, + } as ResponseDiagnosticsMessage); + return res!; + } +} + +export const fetch = FetchFactory.fetch; diff --git a/src/index.ts b/src/index.ts index b56b152..06a495d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -39,6 +39,8 @@ export async function curl(url: RequestURL, options?: RequestOptions) { export { MockAgent, ProxyAgent, Agent, Dispatcher, setGlobalDispatcher, getGlobalDispatcher, + Request, RequestInfo, RequestInit, + Response, } from 'undici'; // HttpClient2 is keep compatible with urllib@2 HttpClient2 export { @@ -60,6 +62,7 @@ export { IncomingHttpHeaders, } from './IncomingHttpHeaders.js'; export * from './HttpClientError.js'; +export { FetchFactory, fetch } from './fetch.js'; export default { request, diff --git a/src/utils.ts b/src/utils.ts index 533871e..f1c0510 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -2,6 +2,9 @@ import { randomBytes, createHash } from 'node:crypto'; import { Readable } from 'node:stream'; import { performance } from 'node:perf_hooks'; import type { FixJSONCtlChars } from './Request.js'; +import { SocketInfo } from './Response.js'; +import symbols from './symbols.js'; +import { IncomingHttpHeaders } from './IncomingHttpHeaders.js'; const JSONCtlCharsMap: Record = { '"': '\\"', // \u0022 @@ -151,3 +154,54 @@ export function isReadable(stream: any) { && typeof stream._read === 'function' && typeof stream._readableState === 'object'; } + +export function updateSocketInfo(socketInfo: SocketInfo, internalOpaque: any, err?: any) { + const socket = internalOpaque[symbols.kRequestSocket] ?? err?.[symbols.kErrorSocket]; + if (socket) { + socketInfo.id = socket[symbols.kSocketId]; + socketInfo.handledRequests = socket[symbols.kHandledRequests]; + socketInfo.handledResponses = socket[symbols.kHandledResponses]; + if (socket[symbols.kSocketLocalAddress]) { + socketInfo.localAddress = socket[symbols.kSocketLocalAddress]; + socketInfo.localPort = socket[symbols.kSocketLocalPort]; + } + if (socket.remoteAddress) { + socketInfo.remoteAddress = socket.remoteAddress; + socketInfo.remotePort = socket.remotePort; + socketInfo.remoteFamily = socket.remoteFamily; + } + socketInfo.bytesRead = socket.bytesRead; + socketInfo.bytesWritten = socket.bytesWritten; + if (socket[symbols.kSocketConnectErrorTime]) { + socketInfo.connectErrorTime = socket[symbols.kSocketConnectErrorTime]; + if (Array.isArray(socket.autoSelectFamilyAttemptedAddresses)) { + socketInfo.attemptedRemoteAddresses = socket.autoSelectFamilyAttemptedAddresses; + } + socketInfo.connectProtocol = socket[symbols.kSocketConnectProtocol]; + socketInfo.connectHost = socket[symbols.kSocketConnectHost]; + socketInfo.connectPort = socket[symbols.kSocketConnectPort]; + } + if (socket[symbols.kSocketConnectedTime]) { + socketInfo.connectedTime = socket[symbols.kSocketConnectedTime]; + } + if (socket[symbols.kSocketRequestEndTime]) { + socketInfo.lastRequestEndTime = socket[symbols.kSocketRequestEndTime]; + } + socket[symbols.kSocketRequestEndTime] = new Date(); + } +} + +export function convertHeader(headers: Headers): IncomingHttpHeaders { + const res: IncomingHttpHeaders = {}; + for (const [ key, value ] of headers.entries()) { + if (res[key]) { + if (!Array.isArray(res[key])) { + res[key] = [ res[key] ]; + } + res[key].push(value); + } else { + res[key] = value; + } + } + return res; +} diff --git a/test/fetch.test.ts b/test/fetch.test.ts new file mode 100644 index 0000000..71427eb --- /dev/null +++ b/test/fetch.test.ts @@ -0,0 +1,57 @@ +import assert from 'node:assert/strict'; +import { describe, it, beforeAll, afterAll } from 'vitest'; +import { startServer } from './fixtures/server.js'; +import { fetch, FetchDiagnosticsMessage, FetchFactory, FetchResponseDiagnosticsMessage } from '../src/fetch.js'; +import { RequestDiagnosticsMessage, ResponseDiagnosticsMessage } from '../src/HttpClient.js'; +import diagnosticsChannel from 'node:diagnostics_channel'; + +describe('fetch.test.ts', () => { + let close: any; + let _url: string; + beforeAll(async () => { + const { closeServer, url } = await startServer(); + close = closeServer; + _url = url; + }); + + afterAll(async () => { + await close(); + }); + + + it('fetch should work', async () => { + let requestDiagnosticsMessage: RequestDiagnosticsMessage; + let responseDiagnosticsMessage: ResponseDiagnosticsMessage; + let fetchDiagnosticsMessage: FetchDiagnosticsMessage; + let fetchResponseDiagnosticsMessage: FetchResponseDiagnosticsMessage; + diagnosticsChannel.subscribe('urllib:request', msg => { + requestDiagnosticsMessage = msg as RequestDiagnosticsMessage; + }); + diagnosticsChannel.subscribe('urllib:response', msg => { + responseDiagnosticsMessage = msg as ResponseDiagnosticsMessage; + }); + diagnosticsChannel.subscribe('urllib:fetch:request', msg => { + fetchDiagnosticsMessage = msg as FetchDiagnosticsMessage; + }); + diagnosticsChannel.subscribe('urllib:fetch:response', msg => { + fetchResponseDiagnosticsMessage = msg as FetchResponseDiagnosticsMessage; + }); + FetchFactory.setClientOptions({}); + + const response = await fetch(`${_url}html`); + + assert(response); + assert(requestDiagnosticsMessage!.request); + assert(responseDiagnosticsMessage!.request); + assert(responseDiagnosticsMessage!.response); + + assert(fetchDiagnosticsMessage!.fetch); + assert(fetchResponseDiagnosticsMessage!.fetch); + assert(fetchResponseDiagnosticsMessage!.response); + assert(fetchResponseDiagnosticsMessage!.timingInfo); + + const stats = FetchFactory.getDispatcherPoolStats(); + assert(stats); + assert(Object.keys(stats).length > 0); + }); +});