Skip to content

Commit 013caa5

Browse files
refactor: add pod and pods fluent interface implementation (#1298)
Signed-off-by: Jeromy Cannon <[email protected]>
1 parent 938a956 commit 013caa5

File tree

9 files changed

+468
-435
lines changed

9 files changed

+468
-435
lines changed

src/core/kube/k8_client.ts

Lines changed: 35 additions & 329 deletions
Large diffs are not rendered by default.

src/core/kube/k8_client/k8_client_clusters.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import {type Clusters} from '../clusters.js';
55
import {type KubeConfig} from '@kubernetes/client-node';
66
import {IllegalArgumentError} from '../../errors.js';
77

8-
export default class K8ClientClusters implements Clusters {
8+
export class K8ClientClusters implements Clusters {
99
public constructor(private readonly kubeConfig: KubeConfig) {
1010
if (!kubeConfig) {
1111
throw new IllegalArgumentError('kubeConfig must not be null or undefined');

src/core/kube/k8_client/k8_client_config_maps.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import {ResourceType} from '../resource_type.js';
1414
import {ResourceOperation} from '../resource_operation.js';
1515
import {KubeApiResponse} from '../kube_api_response.js';
1616

17-
export default class K8ClientConfigMaps implements ConfigMaps {
17+
export class K8ClientConfigMaps implements ConfigMaps {
1818
public constructor(private readonly kubeClient: CoreV1Api) {}
1919

2020
public async create(

src/core/kube/k8_client/k8_client_contexts.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import {type Contexts} from '../contexts.js';
55
import {type KubeConfig, CoreV1Api} from '@kubernetes/client-node';
66
import {NamespaceName} from '../namespace_name.js';
77

8-
export default class K8ClientContexts implements Contexts {
8+
export class K8ClientContexts implements Contexts {
99
public constructor(private readonly kubeConfig: KubeConfig) {}
1010

1111
public list(): string[] {
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/**
2+
* SPDX-License-Identifier: Apache-2.0
3+
*/
4+
import {MissingArgumentError, SoloError} from '../../errors.js';
5+
6+
/**
7+
* The abstract K8 Client Filter adds the `filterItem` method to the class that extends it.
8+
*/
9+
export abstract class K8ClientFilter {
10+
/**
11+
* Apply filters to metadata
12+
* @param items - list of items
13+
* @param [filters] - an object with metadata fields and value
14+
* @returns a list of items that match the filters
15+
* @throws MissingArgumentError - filters are required
16+
*/
17+
private applyMetadataFilter(items: (object | any)[], filters: Record<string, string> = {}) {
18+
if (!filters) throw new MissingArgumentError('filters are required');
19+
20+
const matched = [];
21+
const filterMap = new Map(Object.entries(filters));
22+
for (const item of items) {
23+
// match all filters
24+
let foundMatch = true;
25+
for (const entry of filterMap.entries()) {
26+
const field = entry[0];
27+
const value = entry[1];
28+
29+
if (item.metadata[field] !== value) {
30+
foundMatch = false;
31+
break;
32+
}
33+
}
34+
35+
if (foundMatch) {
36+
matched.push(item);
37+
}
38+
}
39+
40+
return matched;
41+
}
42+
43+
/**
44+
* Filter a single item using metadata filter
45+
* @param items - list of items
46+
* @param [filters] - an object with metadata fields and value
47+
* @throws SoloError - multiple items found with filters
48+
* @throws MissingArgumentError - filters are required
49+
*/
50+
protected filterItem(items: (object | any)[], filters: Record<string, string> = {}) {
51+
const filtered = this.applyMetadataFilter(items, filters);
52+
if (filtered.length > 1) throw new SoloError('multiple items found with filters', {filters});
53+
return filtered[0];
54+
}
55+
}
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/**
2+
* SPDX-License-Identifier: Apache-2.0
3+
*/
4+
import {type Pod} from '../pod.js';
5+
import {type ExtendedNetServer} from '../../../types/index.js';
6+
import {type PodRef} from '../pod_ref.js';
7+
import {SoloError} from '../../errors.js';
8+
import {sleep} from '../../helpers.js';
9+
import {Duration} from '../../time/duration.js';
10+
import {StatusCodes} from 'http-status-codes';
11+
import {SoloLogger} from '../../logging.js';
12+
import {container} from 'tsyringe-neo';
13+
import {type KubeConfig, type CoreV1Api, PortForward} from '@kubernetes/client-node';
14+
import {type Pods} from '../pods.js';
15+
import * as constants from '../../constants.js';
16+
import net from 'net';
17+
18+
export class K8ClientPod implements Pod {
19+
private readonly logger: SoloLogger;
20+
21+
constructor(
22+
private readonly podRef: PodRef,
23+
private readonly pods: Pods,
24+
private readonly kubeClient: CoreV1Api,
25+
private readonly kubeConfig: KubeConfig,
26+
) {
27+
this.logger = container.resolve(SoloLogger);
28+
}
29+
30+
public async killPod(): Promise<void> {
31+
try {
32+
const result = await this.kubeClient.deleteNamespacedPod(
33+
this.podRef.podName.name,
34+
this.podRef.namespaceName.name,
35+
undefined,
36+
undefined,
37+
1,
38+
);
39+
40+
if (result.response.statusCode !== StatusCodes.OK) {
41+
throw new SoloError(
42+
`Failed to delete pod ${this.podRef.podName.name} in namespace ${this.podRef.namespaceName.name}: statusCode: ${result.response.statusCode}`,
43+
);
44+
}
45+
46+
let podExists = true;
47+
while (podExists) {
48+
const pod = await this.pods.readByName(this.podRef);
49+
50+
if (!pod?.metadata?.deletionTimestamp) {
51+
podExists = false;
52+
} else {
53+
await sleep(Duration.ofSeconds(1));
54+
}
55+
}
56+
} catch (e) {
57+
const errorMessage = `Failed to delete pod ${this.podRef.podName.name} in namespace ${this.podRef.namespaceName.name}: ${e.message}`;
58+
59+
if (e.body?.code === StatusCodes.NOT_FOUND || e.response?.body?.code === StatusCodes.NOT_FOUND) {
60+
this.logger.info(`Pod not found: ${errorMessage}`, e);
61+
return;
62+
}
63+
64+
this.logger.error(errorMessage, e);
65+
throw new SoloError(errorMessage, e);
66+
}
67+
}
68+
69+
public async portForward(localPort: number, podPort: number): Promise<ExtendedNetServer> {
70+
try {
71+
this.logger.debug(
72+
`Creating port-forwarder for ${this.podRef.podName.name}:${podPort} -> ${constants.LOCAL_HOST}:${localPort}`,
73+
);
74+
75+
const ns = this.podRef.namespaceName;
76+
const forwarder = new PortForward(this.kubeConfig, false);
77+
78+
const server = (await net.createServer(socket => {
79+
forwarder.portForward(ns.name, this.podRef.podName.name, [podPort], socket, null, socket, 3);
80+
})) as ExtendedNetServer;
81+
82+
// add info for logging
83+
server.info = `${this.podRef.podName.name}:${podPort} -> ${constants.LOCAL_HOST}:${localPort}`;
84+
server.localPort = localPort;
85+
this.logger.debug(`Starting port-forwarder [${server.info}]`);
86+
return server.listen(localPort, constants.LOCAL_HOST);
87+
} catch (e) {
88+
const message = `failed to start port-forwarder [${this.podRef.podName.name}:${podPort} -> ${constants.LOCAL_HOST}:${localPort}]: ${e.message}`;
89+
this.logger.error(message, e);
90+
throw new SoloError(message, e);
91+
}
92+
}
93+
94+
public async stopPortForward(server: ExtendedNetServer, maxAttempts: number, timeout: number): Promise<void> {
95+
if (!server) {
96+
return;
97+
}
98+
99+
this.logger.debug(`Stopping port-forwarder [${server.info}]`);
100+
101+
// try to close the websocket server
102+
await new Promise<void>((resolve, reject) => {
103+
server.close(e => {
104+
if (e) {
105+
if (e.message?.includes('Server is not running')) {
106+
this.logger.debug(`Server not running, port-forwarder [${server.info}]`);
107+
resolve();
108+
} else {
109+
this.logger.debug(`Failed to stop port-forwarder [${server.info}]: ${e.message}`, e);
110+
reject(e);
111+
}
112+
} else {
113+
this.logger.debug(`Stopped port-forwarder [${server.info}]`);
114+
resolve();
115+
}
116+
});
117+
});
118+
119+
// test to see if the port has been closed or if it is still open
120+
let attempts = 0;
121+
while (attempts < maxAttempts) {
122+
let hasError = 0;
123+
attempts++;
124+
125+
try {
126+
const isPortOpen = await new Promise(resolve => {
127+
const testServer = net
128+
.createServer()
129+
.once('error', err => {
130+
if (err) {
131+
resolve(false);
132+
}
133+
})
134+
.once('listening', () => {
135+
testServer
136+
.once('close', () => {
137+
hasError++;
138+
if (hasError > 1) {
139+
resolve(false);
140+
} else {
141+
resolve(true);
142+
}
143+
})
144+
.close();
145+
})
146+
.listen(server.localPort, '0.0.0.0');
147+
});
148+
if (isPortOpen) {
149+
return;
150+
}
151+
} catch {
152+
return;
153+
}
154+
await sleep(Duration.ofMillis(timeout));
155+
}
156+
if (attempts >= maxAttempts) {
157+
throw new SoloError(`failed to stop port-forwarder [${server.info}]`);
158+
}
159+
}
160+
}

0 commit comments

Comments
 (0)