Skip to content

Commit fdfe958

Browse files
authored
feat(core): support config.httpclient.interceptors for fetch/safeFetch tracer injection (#5771)
- Add `config.httpclient.interceptors` support in fetch_factory.js by composing interceptors via `dispatcher.compose(...)` after `setClientOptions()` (since `setClientOptions()` resets dispatcher). - Apply the same behavior to `safeFetch` (SSRF factory). - Add test cases. **Why** - Fix tracer plugins being overridden by `fetch()` lazy initialization since 3.32.0
1 parent 6307e97 commit fdfe958

File tree

7 files changed

+245
-6
lines changed

7 files changed

+245
-6
lines changed

‎index.d.ts‎

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,10 @@ declare module 'egg' {
295295
maxFreeSockets?: number;
296296
}
297297

298+
type Dispatcher = FetchFactory['getDispatcher'] extends () => infer R
299+
? R
300+
: never;
301+
298302
/** HttpClient config */
299303
export interface HttpClientConfig extends HttpClientBaseConfig {
300304
/** http.Agent */
@@ -319,8 +323,8 @@ declare module 'egg' {
319323
allowH2?: boolean;
320324
/** Custom lookup function for DNS resolution */
321325
lookup?: LookupFunction;
326+
interceptors?: Parameters<Dispatcher['compose']>;
322327
}
323-
324328
export interface EggAppConfig {
325329
workerStartTimeout: number;
326330
baseDir: string;

‎lib/core/fetch_factory.js‎

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ const debug = require('util').debuglog('egg:lib:core:fetch_factory');
33
const mainNodejsVersion = parseInt(process.versions.node.split('.')[0]);
44
let FetchFactory;
55
let fetch;
6-
let fetchInitialized = false;
6+
// Track initialization per app instance by storing a WeakMap
7+
const fetchInitializedMap = new WeakMap();
78
let safeFetch;
89
let ssrfFetchFactory;
910

@@ -14,15 +15,24 @@ if (mainNodejsVersion >= 20) {
1415
FetchFactory = urllib4.FetchFactory;
1516
debug('urllib4 enable');
1617

17-
18-
fetch = function fetch(url, init) {
19-
if (!fetchInitialized) {
18+
fetch = function(url, init) {
19+
if (!fetchInitializedMap.get(this)) {
2020
const clientOptions = {};
2121
if (this.config.httpclient?.lookup) {
2222
clientOptions.lookup = this.config.httpclient.lookup;
2323
}
2424
FetchFactory.setClientOptions(clientOptions);
25-
fetchInitialized = true;
25+
26+
// Support custom interceptors via dispatcher.compose
27+
// Must be set after setClientOptions because setClientOptions resets dispatcher
28+
// interceptors is an array of interceptor functions that follow undici's dispatcher API(undici have not supported clientOptions.interceptors natively yet)
29+
if (this.config.httpclient?.interceptors) {
30+
const interceptors = this.config.httpclient.interceptors;
31+
const originalDispatcher = FetchFactory.getDispatcher();
32+
FetchFactory.setDispatcher(originalDispatcher.compose(interceptors));
33+
}
34+
35+
fetchInitializedMap.set(this, true);
2636
}
2737
return FetchFactory.fetch(url, init);
2838
};
@@ -41,6 +51,12 @@ if (mainNodejsVersion >= 20) {
4151
}
4252
ssrfFetchFactory = new FetchFactory();
4353
ssrfFetchFactory.setClientOptions(clientOptions);
54+
55+
if (this.config.httpclient?.interceptors) {
56+
const interceptors = this.config.httpclient.interceptors;
57+
const originalDispatcher = ssrfFetchFactory.getDispatcher();
58+
ssrfFetchFactory.setDispatcher(originalDispatcher.compose(interceptors));
59+
}
4460
}
4561
return ssrfFetchFactory.fetch(url, init);
4662
};
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
const assert = require('assert');
2+
3+
const TRACE_ID = Symbol('TRACE_ID');
4+
const RPC_ID = Symbol('RPC_ID');
5+
6+
// Simple Tracer implementation
7+
class Tracer {
8+
constructor(traceId, rpcId = '0') {
9+
this.traceId = traceId;
10+
this._rpcId = rpcId;
11+
this._rpcIdSeq = 0;
12+
}
13+
14+
get rpcId() {
15+
return this._rpcId;
16+
}
17+
18+
get rpcIdPlus() {
19+
return `${this._rpcId}.${++this._rpcIdSeq}`;
20+
}
21+
}
22+
23+
module.exports = class TracerApp {
24+
constructor(app) {
25+
this.app = app;
26+
assert(app.config);
27+
// Expose Tracer class for testing
28+
app.Tracer = Tracer;
29+
}
30+
31+
configWillLoad() {
32+
// Setup tracer interceptor using interceptors config
33+
this.app.config.httpclient = this.app.config.httpclient || {};
34+
if (!this.app.FetchFactory) {
35+
return;
36+
}
37+
const tracerConfig = this.app.config.tracer;
38+
const HTTP_HEADER_TRACE_ID_KEY = tracerConfig.HTTP_HEADER_TRACE_ID_KEY.toLowerCase();
39+
const HTTP_HEADER_RPC_ID_KEY = tracerConfig.HTTP_HEADER_RPC_ID_KEY.toLowerCase();
40+
41+
this.app.config.httpclient.interceptors = [
42+
dispatch => {
43+
const app = this.app;
44+
return async function tracerInterceptor(opts, handler) {
45+
const tracer = app.currentContext?.tracer;
46+
let traceId;
47+
let rpcId;
48+
49+
try {
50+
if (tracer) {
51+
traceId = opts.headers[HTTP_HEADER_TRACE_ID_KEY] = tracer.traceId;
52+
rpcId = opts.headers[HTTP_HEADER_RPC_ID_KEY] = tracer.rpcIdPlus;
53+
}
54+
} catch (e) {
55+
e.message = '[egg-tracelog] set tracer header failed: ' + e.message;
56+
app.logger.warn(e);
57+
}
58+
59+
try {
60+
return await dispatch(opts, handler);
61+
} finally {
62+
const opaque = handler.opaque;
63+
if (opaque) {
64+
opaque[TRACE_ID] = traceId;
65+
opaque[RPC_ID] = rpcId;
66+
}
67+
}
68+
};
69+
},
70+
];
71+
}
72+
};
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
module.exports = app => {
2+
app.get('/test', async ctx => {
3+
// Mock a tracer on the context using the Tracer class
4+
ctx.tracer = new app.Tracer('test-trace-id-123', '0');
5+
6+
// Store the current context so fetch can access it
7+
app.currentContext = ctx;
8+
9+
// Make a fetch request
10+
const response = await app.fetch(ctx.query.url);
11+
12+
const traceId = response.headers.get('x-trace-id');
13+
if (traceId) ctx.set('x-trace-id', traceId);
14+
const rpcId = response.headers.get('x-rpc-id');
15+
if (rpcId) ctx.set('x-rpc-id', rpcId);
16+
17+
ctx.body = {
18+
status: response.status,
19+
ok: response.ok,
20+
};
21+
});
22+
};
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
exports.keys = 'test key';
2+
3+
exports.tracer = {
4+
HTTP_HEADER_TRACE_ID_KEY: 'x-trace-id',
5+
HTTP_HEADER_RPC_ID_KEY: 'x-rpc-id',
6+
};
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"name": "fetch-tracer"
3+
}

‎test/lib/core/fetch_tracer.test.js‎

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
const assert = require('node:assert');
2+
const http = require('http');
3+
const utils = require('../../utils');
4+
5+
describe('test/lib/core/fetch_tracer.test.js', () => {
6+
const version = utils.getNodeVersion();
7+
if (version < 20) return;
8+
9+
let app;
10+
let mockServer;
11+
12+
before(async () => {
13+
// Create a mock server to capture headers
14+
mockServer = http.createServer((req, res) => {
15+
const headers = {
16+
'Content-Type': 'application/json',
17+
};
18+
if (req.headers['x-trace-id']) {
19+
headers['x-trace-id'] = req.headers['x-trace-id'];
20+
}
21+
if (req.headers['x-rpc-id']) {
22+
headers['x-rpc-id'] = req.headers['x-rpc-id'];
23+
}
24+
25+
res.writeHead(200, headers);
26+
res.end(JSON.stringify({ ok: true }));
27+
});
28+
29+
await new Promise(resolve => {
30+
mockServer.listen(0, '127.0.0.1', resolve);
31+
});
32+
33+
app = utils.app('apps/fetch-tracer');
34+
await app.ready();
35+
});
36+
37+
after(() => {
38+
if (mockServer?.listening) {
39+
mockServer.close();
40+
}
41+
});
42+
43+
it('should add tracer headers when fetch is called', async () => {
44+
const port = mockServer.address().port;
45+
const targetUrl = `http://127.0.0.1:${port}/mock`;
46+
47+
const response = await app.httpRequest()
48+
.get('/test')
49+
.query({ url: targetUrl })
50+
.expect(200);
51+
52+
assert.strictEqual(response.body.status, 200);
53+
assert.strictEqual(response.body.ok, true);
54+
55+
// Verify tracer headers were added with incremented rpcId
56+
assert.strictEqual(response.headers['x-trace-id'], 'test-trace-id-123');
57+
assert.strictEqual(response.headers['x-rpc-id'], '0.1'); // rpcIdPlus increments from 0
58+
});
59+
60+
it('should work when tracer is not set', async () => {
61+
// Clear currentContext
62+
app.currentContext = null;
63+
64+
const port = mockServer.address().port;
65+
const targetUrl = `http://127.0.0.1:${port}/mock`;
66+
67+
const response = await app.fetch(targetUrl);
68+
69+
assert.strictEqual(response.status, 200);
70+
71+
// Verify no tracer headers when tracer is not set
72+
assert.strictEqual(response.headers.get('x-trace-id'), null);
73+
assert.strictEqual(response.headers.get('x-rpc-id'), null);
74+
});
75+
76+
77+
it('should handle fetch before configDidLoad completes', async () => {
78+
// Test that lazy initialization preserves interceptors set in configDidLoad
79+
const port = mockServer.address().port;
80+
const targetUrl = `http://127.0.0.1:${port}/mock`;
81+
82+
const ctx = app.mockContext();
83+
ctx.tracer = new app.Tracer('early-trace-id', '0.1');
84+
app.currentContext = ctx;
85+
86+
const response = await app.fetch(targetUrl);
87+
assert.strictEqual(response.status, 200);
88+
assert.strictEqual(response.headers.get('x-trace-id'), 'early-trace-id');
89+
assert.strictEqual(response.headers.get('x-rpc-id'), '0.1.1'); // rpcIdPlus increments from 0.1
90+
});
91+
92+
it('should increment rpcId on multiple fetch calls', async () => {
93+
// Test that rpcId increments properly on each fetch
94+
const port = mockServer.address().port;
95+
const targetUrl = `http://127.0.0.1:${port}/mock`;
96+
97+
const ctx = app.mockContext();
98+
ctx.tracer = new app.Tracer('multi-trace-id', '0');
99+
app.currentContext = ctx;
100+
101+
// First fetch
102+
let response = await app.fetch(targetUrl);
103+
assert.strictEqual(response.headers.get('x-trace-id'), 'multi-trace-id');
104+
assert.strictEqual(response.headers.get('x-rpc-id'), '0.1');
105+
106+
// Second fetch
107+
response = await app.fetch(targetUrl);
108+
assert.strictEqual(response.headers.get('x-trace-id'), 'multi-trace-id');
109+
assert.strictEqual(response.headers.get('x-rpc-id'), '0.2');
110+
111+
// Third fetch
112+
response = await app.fetch(targetUrl);
113+
assert.strictEqual(response.headers.get('x-trace-id'), 'multi-trace-id');
114+
assert.strictEqual(response.headers.get('x-rpc-id'), '0.3');
115+
});
116+
});

0 commit comments

Comments
 (0)