Skip to content

Commit

Permalink
client-node wip
Browse files Browse the repository at this point in the history
  • Loading branch information
juozasg authored and Kingdon Barrett committed Jul 20, 2023
1 parent 3d75a6f commit 22f60c0
Show file tree
Hide file tree
Showing 13 changed files with 347 additions and 91 deletions.
24 changes: 20 additions & 4 deletions src/cli/kubernetes/kubernetesConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ import { window } from 'vscode';

import { shellCodeError } from 'cli/shell/exec';
import { setVSCodeContext, telemetry } from 'extension';
import { Errorable, failed, succeeded } from 'types/errorable';
import { Errorable, aresult, failed, succeeded } from 'types/errorable';
import { ContextId } from 'types/extensionIds';
import { KubernetesConfig, KubernetesContextWithCluster } from 'types/kubernetes/kubernetesConfig';
import { TelemetryError } from 'types/telemetryEventNames';
import { parseJson } from 'utils/jsonUtils';
import { invokeKubectlCommand } from './kubernetesToolsKubectl';
import { clearSupportedResourceKinds } from './kubectlGet';
import { invokeKubectlCommand } from './kubernetesToolsKubectl';

export let currentContextName: string;

Expand Down Expand Up @@ -38,7 +38,7 @@ export async function getKubectlConfig(): Promise<Errorable<KubernetesConfig>> {
/**
* Gets current kubectl context name.
*/
export async function getCurrentContext(): Promise<Errorable<string>> {
export async function getCurrentContextName(): Promise<Errorable<string>> {
const currentContextShellResult = await invokeKubectlCommand('config current-context');
if (currentContextShellResult?.code !== 0) {
telemetry.sendError(TelemetryError.FAILED_TO_GET_CURRENT_KUBERNETES_CONTEXT);
Expand Down Expand Up @@ -67,7 +67,7 @@ export async function getCurrentContext(): Promise<Errorable<string>> {
* whether or not context was switched or didn't need it (current).
*/
export async function setCurrentContext(contextName: string): Promise<undefined | { isChanged: boolean; }> {
const currentContextResult = await getCurrentContext();
const currentContextResult = await getCurrentContextName();
if (succeeded(currentContextResult) && currentContextResult.result === contextName) {
return {
isChanged: false,
Expand Down Expand Up @@ -136,3 +136,19 @@ export async function getClusterName(contextName: string): Promise<string> {
}
}

export async function getCurrentContextWithCluster(): Promise<KubernetesContextWithCluster | undefined> {
const [contextName, contexts] = await Promise.all([
aresult(getCurrentContextName()),
aresult(getContexts()),
]);

if(!contextName || !contexts) {
return;
}

// const contexts = result(contextsResults);
const context = contexts?.find(c => c.name === contextName);

return context;
}

1 change: 1 addition & 0 deletions src/cli/shell/exec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ function execOpts({ cwd }: { cwd?: string; } = {}): shelljs.ExecOptions {
cwd: cwd,
env: env,
async: true,
silent: true,
};
return opts;
}
Expand Down
45 changes: 45 additions & 0 deletions src/data/createKubeProxyConfig.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import * as k8s from '@kubernetes/client-node';
import { ActionOnInvalid } from '@kubernetes/client-node/dist/config_types';
import { kubeConfigPath } from './kubeConfigWatcher';

function loadDefaultKubeConfig(): k8s.KubeConfig {
const kc = new k8s.KubeConfig();
const opts = {onInvalidEntry: ActionOnInvalid.FILTER};
const kcFilePath = kubeConfigPath();

if(kcFilePath) {
kc.loadFromFile(kcFilePath, opts);
} else {
kc.loadFromDefault();
}

return kc;
}

export function createKubeProxyConfig(port: number): k8s.KubeConfig {
const kcDefault = loadDefaultKubeConfig();

const cluster = {
name: kcDefault.getCurrentCluster()?.name,
server: `http://127.0.0.1:${port}`,
};

const user = kcDefault.getCurrentUser();

const context = {
name: kcDefault.getCurrentContext(),
user: user?.name,
cluster: cluster.name,
};

const kc = new k8s.KubeConfig();
kc.loadFromOptions({
clusters: [cluster],
users: [user],
contexts: [context],
currentContext: context.name,
});

return kc;
}

192 changes: 111 additions & 81 deletions src/data/fluxInformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,116 +16,146 @@
* 5. recreate informer as needed
*/
import * as k8s from '@kubernetes/client-node';
import { KubernetesListObject, KubernetesObject } from 'types/kubernetes/kubernetesTypes';
import { GitRepository } from 'types/flux/gitRepository';
import { Kind, KubernetesListObject, KubernetesObject } from 'types/kubernetes/kubernetesTypes';
import { createKubeProxyConfig } from './createKubeProxyConfig';
import { initKubeProxy, kubeProxyPort } from './informerKubeProxy';
import { initKubeConfigWatcher } from './kubeConfigWatcher';

// export const fluxInformers: Record<string, FluxInformer> = {};

function startKubeProxy(): number | false {
return 57375;

type KindPlural = string;
type ApiGroup = string;
type ApiVersion = string;
type ApiEndpointParams = [KindPlural, ApiGroup, ApiVersion];

type InformerEventType = 'add' | 'update' | 'delete';
type InformerEventFunc = (event: InformerEventType, obj: KubernetesObject)=> void;

// TODO: lookup real paths
// TODO: loop for all Kind types to automate this
function getAPIPaths(kind: Kind): ApiEndpointParams {
const paths: Record<Kind, ApiEndpointParams> = {
'GitRepository': ['gitrepositories', 'source.toolkit.fluxcd.io', 'v1'],
} as Record<Kind, ApiEndpointParams>;

return paths[kind];
}

async function informerKeepAlive<T extends KubernetesObject>(informer: k8s.Informer<T>) {
try {
await informer.start();
} catch (err) {
console.error('flux resources informer unavailable:', err);
informer.stop();
setTimeout(() => {
console.info('Restarting informer...');
informer.start();
// this needs to be recursive
}, 2000);

function createKubeProxyConfig(port: number): k8s.KubeConfig {
const kcDefault = new k8s.KubeConfig();
kcDefault.loadFromDefault();
return false;
}
}

const cluster = {
name: kcDefault.getCurrentCluster()?.name,
server: `http://127.0.0.1:${port}`,
};
// registering an add function before informer start will fire for each existing object
// registering after the start wont fire for old objects
// autonomous is somehow simpler in this case

const user = kcDefault.getCurrentUser();

const context = {
name: kcDefault.getCurrentContext(),
user: user?.name,
cluster: cluster.name,
};
export let informer: k8s.Informer<GitRepository> & k8s.ObjectCache<GitRepository> | undefined;

const kc = new k8s.KubeConfig();
kc.loadFromOptions({
clusters: [cluster],
users: [user],
contexts: [context],
currentContext: context.name,
});
export async function initFluxInformers(eventFn?: InformerEventFunc) {
await initKubeConfigWatcher(() => {});
// initKubeProxy();

return kc;
}
// await createFluxInformer();
// setInterval(() => createFluxInformer(), 1000);

function getAPIPaths() {
return {
'gitrepositories': ['source.toolkit.fluxcd.io', 'v1'],
};

// // DEBUG
// setInterval(() => {
// if(informer) {
// console.log('+Informer exists: ', Date().slice(19, 24), informer.list());
// } else {
// console.log('!No Informer: ', Date().slice(19, 24));
// }
// }, 1500);
}

export async function createFluxInformer() {
// running already or no proxy
if(informer || !kubeProxyPort) {
return;
}

// will start a self-healing informer for each resource type and namespaces
export async function startFluxInformers(
sourceDataProvider: any,
workloadDataProvider: any,
templateDataProvider: any): Promise<boolean> {
const port = startKubeProxy();
if(!port) {
return false;
const kc = createKubeProxyConfig(kubeProxyPort);
console.log('starting informer...');

informer = await startInformer(Kind.GitRepository, kc);
if(!informer) {
console.log('failed to start informer');
return;
}

const kc = createKubeProxyConfig(port);
informer.on('error', (err: any) => {
console.error('informer error event', err);
if(informer) {
informer.stop();
}
informer = undefined;
});

const k8sCoreApi = kc.makeApiClient(k8s.CoreV1Api);
const k8sCustomApi = kc.makeApiClient(k8s.CustomObjectsApi);
console.log('informer started');
}

const plural = 'gitrepositories';
const [group, version] = getAPIPaths()[plural];

// will start a self-healing informer for each resource type and namespaces
async function startInformer(kind: Kind, kubeConfig: k8s.KubeConfig) {
// const k8sCoreApi = kc.makeApiClient(k8s.CoreV1Api);
const k8sCustomApi = kubeConfig.makeApiClient(k8s.CustomObjectsApi);

const [plural, group, version] = getAPIPaths(kind);

const listFn = async () => {
const result = await k8sCustomApi.listClusterCustomObject(group, version, plural);
const kbody = result.body as KubernetesListObject<KubernetesObject>;
const kbody = result.body as KubernetesListObject<GitRepository>;
return Promise.resolve({response: result.response, body: kbody});
};

const informer = k8s.makeInformer(
kc,
const kinformer = k8s.makeInformer(
kubeConfig,
`/apis/${group}/${version}/${plural}`,
listFn,
);


informer.on('add', (obj: KubernetesObject) => {
console.log(`Added: ${obj.metadata?.name}`);
sourceDataProvider.add(obj);
});
informer.on('update', (obj: KubernetesObject) => {
console.log(`Updated: ${obj.metadata?.name}`);
sourceDataProvider.update(obj);
});
informer.on('delete', (obj: KubernetesObject) => {
console.log(`Deleted: ${obj.metadata?.name}`);
sourceDataProvider.delete(obj);
});
informer.on('error', (err: any) => {
console.error('ERRORed:', err);
// Restart informer after 5sec
setTimeout(() => {
console.log('Restarting informer...');
informer.start();
}, 2000);
});
try {
await informer.start();
await kinformer.start();
return kinformer;
} catch (err) {
console.error('flux resources informer unavailable:', err);
informer.stop();
setTimeout(() => {
console.info('Restarting informer...');
informer.start();
// this needs to be recursive
}, 2000);

return false;
return undefined;
}
const l = informer.list();
console.log('list:', l);

return true;
}



// console.log('begin informer watch!');
// console.log('listing: ', informer.list());

// informer.on('add', (obj: KubernetesObject) => {
// console.log(`Added: ${obj.metadata?.name}`);
// });

// informer.on('update', (obj: KubernetesObject) => {
// console.log(`Updated: ${obj.metadata?.name}`);
// });

// informer.on('delete', (obj: KubernetesObject) => {
// console.log(`Deleted: ${obj.metadata?.name}`);
// });

// sourceDataProvider.add(obj);
// sourceDataProvider.update(obj);
// sourceDataProvider.delete(obj);

//

Check warning on line 161 in src/data/fluxInformer.ts

View workflow job for this annotation

GitHub Actions / e2e-testing

Newline required at end of file but not found
27 changes: 27 additions & 0 deletions src/data/informerKubeProxy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
export let kubeProxyPort: number | undefined;

// export const fluxInformers: Record<string, FluxInformer> = {};
export async function initKubeProxy() {
await startKubeProxy();
setInterval(() => {
if(!kubeProxyPort) {
startKubeProxy();
}
}, 1000);
}



export async function startKubeProxy() {
kubeProxyPort = 57375;
}

export function stopKubeProxy(): void {
kubeProxyPort = undefined;
}






Empty file added src/data/informerPool.ts
Empty file.
Loading

0 comments on commit 22f60c0

Please sign in to comment.