Skip to content

Commit 574816a

Browse files
authored
Merge pull request #1048 from snyk/chore/ACC-3063-add-typings-for-primus
chore: ACC-3064 add typings for Primus
2 parents 968719d + a508cb7 commit 574816a

27 files changed

+432
-174
lines changed

lib/broker-workload/requestFiltering.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { getFilterConfig } from '../hybrid-sdk/client/config/filters';
2+
import { WebSocketConnection } from '../hybrid-sdk/client/types/client';
23
import { LOADEDFILTERSET } from '../hybrid-sdk/common/types/filter';
34

45
export const filterRequest = (payload, options, websocketConnectionHandler) => {
@@ -28,7 +29,7 @@ export const filterRequest = (payload, options, websocketConnectionHandler) => {
2829
export const filterClientRequest = (
2930
payload,
3031
options,
31-
websocketConnectionHandler,
32+
websocketConnectionHandler: WebSocketConnection,
3233
) => {
3334
let filterResponse;
3435
if (

lib/broker-workload/websocketRequests.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,17 @@ import {
2424
incrementHttpRequestsTotal,
2525
} from '../hybrid-sdk/common/utils/metrics';
2626
import { maskToken, hashToken } from '../hybrid-sdk/common/utils/token';
27+
import { WebSocketServer } from '../hybrid-sdk/server/types/socket';
28+
import { WebSocketConnection } from '../hybrid-sdk/client/types/client';
2729

2830
export class BrokerWorkload extends Workload<WorkloadType.remoteServer> {
2931
options;
3032
connectionIdentifier: string;
31-
websocketConnectionHandler;
33+
websocketConnectionHandler: WebSocketServer | WebSocketConnection;
3234
constructor(
3335
connectionIdentifier: string,
3436
options,
35-
websocketConnectionHandler,
37+
websocketConnectionHandler: WebSocketServer | WebSocketConnection,
3638
) {
3739
super('broker', WorkloadType['remote-server']);
3840
this.options = options;

lib/hybrid-sdk/client/socket.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,10 +158,10 @@ export const createWebSocket = (
158158
},
159159
};
160160
}
161-
const websocket: WebSocketConnection = new Socket(
161+
const websocket = new Socket(
162162
localClientOps.config.brokerServerUrlForSocket,
163163
socketSettings,
164-
);
164+
) as unknown as WebSocketConnection;
165165
websocket.socketVersion = 1;
166166
websocket.socketType = 'client';
167167
if (localClientOps.config.universalBrokerEnabled) {
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1-
export const identifyHandler = (serverData, websocket) => {
1+
import { WebSocketConnection } from '../types/client';
2+
3+
export const identifyHandler = (serverData, websocket: WebSocketConnection) => {
24
websocket.capabilities = serverData.capabilities;
35
};

lib/hybrid-sdk/client/socketHandlers/requestHandler.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
import { forwardWebSocketRequest } from '../../common/connectionToWorkloadInterface/forwardWebsocketRequest';
22
import { RequestPayload } from '../../common/types/http';
33
import { LoadedClientOpts } from '../../common/types/options';
4+
import { WebSocketConnection } from '../types/client';
5+
import { WebSocketServer } from '../../server/types/socket';
46

57
let initializedReqHandler: (
68
webSocketIdentifier: string,
79
) => (payload: RequestPayload, emit: any) => void;
810

9-
export const initRequestHandler = (websocket, clientOps: LoadedClientOpts) => {
11+
export const initRequestHandler = (
12+
websocket: WebSocketServer | WebSocketConnection,
13+
clientOps: LoadedClientOpts,
14+
) => {
1015
initializedReqHandler = forwardWebSocketRequest(clientOps, websocket);
1116
};
1217

lib/hybrid-sdk/client/types/client.ts

Lines changed: 20 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { FiltersType } from '../../common/types/filter';
22
import { CheckResult } from '../checks/types';
3+
import { Primus } from 'primus';
34

45
export interface HookResults {
56
preflightCheckResults?: CheckResult[];
@@ -49,42 +50,30 @@ export interface ConnectionMetadata {
4950
serverId?: string;
5051
}
5152

52-
export interface WebSocketConnection {
53-
options: {
54-
reconnect: any;
55-
ping: number;
56-
pong: number;
57-
timeout: number;
58-
transport: any;
59-
queueSize: any;
60-
stategy: any;
61-
};
62-
transport: any;
63-
socketVersion?: any;
64-
socketType?: string;
65-
identifier?: string;
53+
export interface WebSocketConnection
54+
extends Pick<Primus, 'destroy' | 'emit' | 'end' | 'on'> {
55+
capabilities?: string[];
6656
clientConfig?: any;
67-
role: Role;
6857
friendlyName?: string;
69-
supportedIntegrationType: string;
58+
identifier?: string;
59+
role: Role;
7060
serverId: string;
71-
url: any;
72-
latency: any;
61+
socketType: 'client';
62+
socketVersion?: number;
63+
supportedIntegrationType: string;
64+
timeoutHandlerId?: NodeJS.Timeout;
65+
66+
// Added by primus, but specific to the client
7367
socket: any;
74-
destroy: any;
75-
send: any;
76-
end: any;
77-
open: any;
78-
emit: any;
79-
capabilities?: any;
80-
on: (string, any) => any;
81-
readyState: any;
82-
timeoutHandlerId?: any;
83-
}
84-
// export interface WebSocketConnection {
85-
// websocket: Connection;
86-
// }
68+
readyState: number;
69+
transport: {
70+
extraHeaders?: Record<string, string>;
71+
};
72+
url: URL;
8773

74+
// Added by primus-emitter plugin
75+
send: (event: string, ...args: any[]) => void;
76+
}
8877
export interface ValidationResult {
8978
connectionName: string;
9079
validated: boolean;
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import primus from 'primus';
2+
import { WebSocketConnection } from '../types/client';
23

3-
export const isWebsocketConnOpen = (io) => {
4+
export const isWebsocketConnOpen = (io: WebSocketConnection) => {
45
return io.readyState === primus.Spark.OPEN;
56
};
Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,43 @@
11
import { RequestPayload } from '../types/http';
2-
import { WebSocketConnection } from '../../client/types/client';
32
import { LoadedClientOpts, LoadedServerOpts } from '../types/options';
43
import { BrokerWorkload } from '../../../broker-workload/websocketRequests';
54
import {
65
Workload,
76
WorkloadType,
87
RemoteServerWorkloadRuntimeParams,
98
} from '../../workloadFactory';
9+
import { WebSocketServer } from '../../server/types/socket';
10+
import { WebSocketConnection } from '../../client/types/client';
11+
import { HybridResponse } from '../../responseSenders';
1012

1113
export const forwardWebSocketRequest = (
1214
options: LoadedClientOpts | LoadedServerOpts,
13-
websocketConnectionHandler: WebSocketConnection,
15+
websocketConnectionHandler: WebSocketServer | WebSocketConnection,
1416
) => {
1517
// 1. Request coming in over websocket conn (logged)
1618
// 2. Filter for rule match (log and block if no match)
1719
// 3. Relay over HTTP conn (logged)
1820
// 4. Get response over HTTP conn (logged)
1921
// 5. Send response over websocket conn
2022

21-
return (connectionIdentifier) => async (payload: RequestPayload, emit) => {
22-
const workloadName = options.config.remoteWorkloadName;
23-
const workloadModulePath = options.config.remoteWorkloadModulePath;
24-
const workload = (await Workload.instantiate(
25-
workloadName,
26-
workloadModulePath,
27-
WorkloadType.remoteServer,
28-
{ connectionIdentifier, options, websocketConnectionHandler },
29-
)) as BrokerWorkload;
23+
return (connectionIdentifier) =>
24+
async (
25+
payload: RequestPayload,
26+
emit: (response: HybridResponse) => void,
27+
) => {
28+
const workloadName = options.config.remoteWorkloadName;
29+
const workloadModulePath = options.config.remoteWorkloadModulePath;
30+
const workload = (await Workload.instantiate(
31+
workloadName,
32+
workloadModulePath,
33+
WorkloadType.remoteServer,
34+
{ connectionIdentifier, options, websocketConnectionHandler },
35+
)) as BrokerWorkload;
3036

31-
const data: RemoteServerWorkloadRuntimeParams = {
32-
payload,
33-
websocketHandler: emit,
37+
const data: RemoteServerWorkloadRuntimeParams = {
38+
payload,
39+
websocketHandler: emit,
40+
};
41+
await workload.handler(data);
3442
};
35-
await workload.handler(data);
36-
};
3743
};

lib/hybrid-sdk/responseSenders.ts

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import { logError, logResponse } from '../logs/log';
66
import { isJson } from './common/utils/json';
77
import { replaceUrlPartialChunk } from './common/utils/replace-vars';
88
import { RequestMetadata } from './types';
9+
import { WebSocketConnection } from './client/types/client';
10+
import { WebSocketServer } from './server/types/socket';
911

1012
export interface HybridResponse {
1113
status: number;
@@ -16,16 +18,16 @@ export interface HybridResponse {
1618
}
1719
export class HybridResponseHandler {
1820
connectionIdentifier;
19-
websocketConnectionHandler;
21+
websocketConnectionHandler: WebSocketServer | WebSocketConnection;
2022
logContext;
2123
config;
2224
requestMetadata: RequestMetadata;
23-
websocketResponseHandler;
25+
websocketResponseHandler: (response: HybridResponse) => void;
2426
responseHandler;
2527
constructor(
2628
requestMetadata: RequestMetadata,
27-
websocketConnectionHandler,
28-
websocketResponseHandler,
29+
websocketConnectionHandler: WebSocketServer | WebSocketConnection,
30+
websocketResponseHandler: (response: HybridResponse) => void,
2931
config,
3032
logContext,
3133
) {
@@ -38,9 +40,9 @@ export class HybridResponseHandler {
3840
// WebsocketResponseHandler provided means WS response expected
3941
// header x-broker-ws-response:true used on server side
4042
if (
41-
this.websocketConnectionHandler?.capabilities?.includes(
42-
'receive-post-streams',
43-
) &&
43+
(
44+
this.websocketConnectionHandler as { capabilities?: string[] }
45+
)?.capabilities?.includes('receive-post-streams') &&
4446
!this.websocketResponseHandler
4547
) {
4648
// Response Traffic over HTTP Post
@@ -107,9 +109,10 @@ export class HybridResponseHandler {
107109
this.logContext,
108110
this.config,
109111
this.connectionIdentifier,
110-
this.websocketConnectionHandler?.serverId ?? '',
112+
(this.websocketConnectionHandler as { serverId?: string })?.serverId ??
113+
'',
111114
this.requestMetadata.requestId,
112-
this.websocketConnectionHandler.role,
115+
(this.websocketConnectionHandler as { role?: string })?.role ?? '',
113116
);
114117
if (streamingRequestData) {
115118
// POST Streaming

lib/hybrid-sdk/server/socket.ts

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import Primus from 'primus';
22
import Emitter from 'primus-emitter';
33
import { LoadedServerOpts } from '../common/types/options';
4-
import { SocketHandler } from './types/socket';
4+
import { WebSocketServer, SocketHandler } from './types/socket';
55
import { handleIoError } from './socketHandlers/errorHandler';
66
import { handleSocketConnection } from './socketHandlers/connectionHandler';
77
import { initConnectionHandler } from './socketHandlers/initHandlers';
@@ -47,17 +47,20 @@ const socket = ({ server, loadedServerOpts }): SocketHandler => {
4747
compression: Boolean(loadedServerOpts.config.socketUseCompression) || false,
4848
};
4949

50-
const websocket = new Primus(server, ioConfig);
50+
const websocket = new Primus(server, ioConfig) as WebSocketServer;
5151
if (loadedServerOpts.config.BROKER_SERVER_MANDATORY_AUTH_ENABLED) {
5252
websocket.authorize(async (req, done) => {
5353
const connectionIdentifier = req.uri.pathname
5454
.replaceAll(/^\/primus\/([^/]+)\//g, '$1')
5555
.toLowerCase();
5656
const maskedToken = maskToken(connectionIdentifier);
57-
const authHeader =
58-
req.headers['Authorization'] ?? req.headers['authorization'];
59-
const brokerClientId = req.headers['x-snyk-broker-client-id'] ?? null;
60-
const role = req.headers['x-snyk-broker-client-role'] ?? null;
57+
const authHeader = (req.headers['Authorization'] ??
58+
req.headers['authorization']) as string | undefined;
59+
const brokerClientId =
60+
(req.headers['x-snyk-broker-client-id'] as string | undefined) ?? null;
61+
const role =
62+
(req.headers['x-snyk-broker-client-role'] as string | undefined) ??
63+
null;
6164
if (
6265
(!authHeader ||
6366
!authHeader.toLowerCase().startsWith('bearer') ||
@@ -93,8 +96,8 @@ const socket = ({ server, loadedServerOpts }): SocketHandler => {
9396
);
9497
// deepcode ignore Ssrf: request URL comes from the filter response, with the origin url being injected by the filtered version
9598
const credsCheckResponse = await validateBrokerClientCredentials(
96-
authHeader,
97-
brokerClientId,
99+
authHeader!,
100+
brokerClientId!,
98101
connectionIdentifier,
99102
);
100103
if (!credsCheckResponse) {
@@ -121,9 +124,9 @@ const socket = ({ server, loadedServerOpts }): SocketHandler => {
121124
const currentClient: ClientSocket = {
122125
socketType: 'server',
123126
socketVersion: 1,
124-
brokerClientId: brokerClientId,
127+
brokerClientId: brokerClientId!,
125128
brokerAppClientId: brokerAppClientId,
126-
role: role ?? Role.primary,
129+
role: (role ?? Role.primary) as Role,
127130
credsValidationTime: nowDate,
128131
};
129132
const connections = getSocketConnections();

0 commit comments

Comments
 (0)