From 22f60c0bd202d7a87104e5c7044f62b9877f040b Mon Sep 17 00:00:00 2001 From: Juozas Gaigalas Date: Mon, 10 Jul 2023 17:17:32 +0300 Subject: [PATCH] client-node wip --- src/cli/kubernetes/kubernetesConfig.ts | 24 ++- src/cli/shell/exec.ts | 1 + src/data/createKubeProxyConfig.ts | 45 ++++ src/data/fluxInformer.ts | 192 ++++++++++-------- src/data/informerKubeProxy.ts | 27 +++ src/data/informerPool.ts | 0 src/data/kubeConfigWatcher.ts | 114 +++++++++++ src/extension.ts | 5 +- src/index.d.ts | 1 + src/types/errorable.ts | 16 ++ .../dataProviders/clusterDataProvider.ts | 4 +- src/ui/treeviews/treeViews.ts | 4 +- src/utils/utils.ts | 5 + 13 files changed, 347 insertions(+), 91 deletions(-) create mode 100644 src/data/createKubeProxyConfig.ts create mode 100644 src/data/informerKubeProxy.ts create mode 100644 src/data/informerPool.ts create mode 100644 src/data/kubeConfigWatcher.ts diff --git a/src/cli/kubernetes/kubernetesConfig.ts b/src/cli/kubernetes/kubernetesConfig.ts index 3ded0e70..aa051124 100644 --- a/src/cli/kubernetes/kubernetesConfig.ts +++ b/src/cli/kubernetes/kubernetesConfig.ts @@ -3,13 +3,13 @@ import { window } from 'vscode'; import { shellCodeError } from 'cli/shell/exec'; import { setVSCodeContext, telemetry } from 'extension'; -import { Errorable, failed, succeeded } from 'types/errorable'; +import { Errorable, aresult, failed, succeeded } from 'types/errorable'; import { ContextId } from 'types/extensionIds'; import { KubernetesConfig, KubernetesContextWithCluster } from 'types/kubernetes/kubernetesConfig'; import { TelemetryError } from 'types/telemetryEventNames'; import { parseJson } from 'utils/jsonUtils'; -import { invokeKubectlCommand } from './kubernetesToolsKubectl'; import { clearSupportedResourceKinds } from './kubectlGet'; +import { invokeKubectlCommand } from './kubernetesToolsKubectl'; export let currentContextName: string; @@ -38,7 +38,7 @@ export async function getKubectlConfig(): Promise> { /** * Gets current kubectl context name. */ -export async function getCurrentContext(): Promise> { +export async function getCurrentContextName(): Promise> { const currentContextShellResult = await invokeKubectlCommand('config current-context'); if (currentContextShellResult?.code !== 0) { telemetry.sendError(TelemetryError.FAILED_TO_GET_CURRENT_KUBERNETES_CONTEXT); @@ -67,7 +67,7 @@ export async function getCurrentContext(): Promise> { * whether or not context was switched or didn't need it (current). */ export async function setCurrentContext(contextName: string): Promise { - const currentContextResult = await getCurrentContext(); + const currentContextResult = await getCurrentContextName(); if (succeeded(currentContextResult) && currentContextResult.result === contextName) { return { isChanged: false, @@ -136,3 +136,19 @@ export async function getClusterName(contextName: string): Promise { } } +export async function getCurrentContextWithCluster(): Promise { + const [contextName, contexts] = await Promise.all([ + aresult(getCurrentContextName()), + aresult(getContexts()), + ]); + + if(!contextName || !contexts) { + return; + } + + // const contexts = result(contextsResults); + const context = contexts?.find(c => c.name === contextName); + + return context; +} + diff --git a/src/cli/shell/exec.ts b/src/cli/shell/exec.ts index 66549455..f8d82a00 100644 --- a/src/cli/shell/exec.ts +++ b/src/cli/shell/exec.ts @@ -109,6 +109,7 @@ function execOpts({ cwd }: { cwd?: string; } = {}): shelljs.ExecOptions { cwd: cwd, env: env, async: true, + silent: true, }; return opts; } diff --git a/src/data/createKubeProxyConfig.ts b/src/data/createKubeProxyConfig.ts new file mode 100644 index 00000000..c7f1b49c --- /dev/null +++ b/src/data/createKubeProxyConfig.ts @@ -0,0 +1,45 @@ +import * as k8s from '@kubernetes/client-node'; +import { ActionOnInvalid } from '@kubernetes/client-node/dist/config_types'; +import { kubeConfigPath } from './kubeConfigWatcher'; + +function loadDefaultKubeConfig(): k8s.KubeConfig { + const kc = new k8s.KubeConfig(); + const opts = {onInvalidEntry: ActionOnInvalid.FILTER}; + const kcFilePath = kubeConfigPath(); + + if(kcFilePath) { + kc.loadFromFile(kcFilePath, opts); + } else { + kc.loadFromDefault(); + } + + return kc; +} + +export function createKubeProxyConfig(port: number): k8s.KubeConfig { + const kcDefault = loadDefaultKubeConfig(); + + const cluster = { + name: kcDefault.getCurrentCluster()?.name, + server: `http://127.0.0.1:${port}`, + }; + + const user = kcDefault.getCurrentUser(); + + const context = { + name: kcDefault.getCurrentContext(), + user: user?.name, + cluster: cluster.name, + }; + + const kc = new k8s.KubeConfig(); + kc.loadFromOptions({ + clusters: [cluster], + users: [user], + contexts: [context], + currentContext: context.name, + }); + + return kc; +} + diff --git a/src/data/fluxInformer.ts b/src/data/fluxInformer.ts index 88038675..2de72a1f 100644 --- a/src/data/fluxInformer.ts +++ b/src/data/fluxInformer.ts @@ -16,116 +16,146 @@ * 5. recreate informer as needed */ import * as k8s from '@kubernetes/client-node'; -import { KubernetesListObject, KubernetesObject } from 'types/kubernetes/kubernetesTypes'; +import { GitRepository } from 'types/flux/gitRepository'; +import { Kind, KubernetesListObject, KubernetesObject } from 'types/kubernetes/kubernetesTypes'; +import { createKubeProxyConfig } from './createKubeProxyConfig'; +import { initKubeProxy, kubeProxyPort } from './informerKubeProxy'; +import { initKubeConfigWatcher } from './kubeConfigWatcher'; -// export const fluxInformers: Record = {}; -function startKubeProxy(): number | false { - return 57375; + +type KindPlural = string; +type ApiGroup = string; +type ApiVersion = string; +type ApiEndpointParams = [KindPlural, ApiGroup, ApiVersion]; + +type InformerEventType = 'add' | 'update' | 'delete'; +type InformerEventFunc = (event: InformerEventType, obj: KubernetesObject)=> void; + +// TODO: lookup real paths +// TODO: loop for all Kind types to automate this +function getAPIPaths(kind: Kind): ApiEndpointParams { + const paths: Record = { + 'GitRepository': ['gitrepositories', 'source.toolkit.fluxcd.io', 'v1'], + } as Record; + + return paths[kind]; } +async function informerKeepAlive(informer: k8s.Informer) { + try { + await informer.start(); + } catch (err) { + console.error('flux resources informer unavailable:', err); + informer.stop(); + setTimeout(() => { + console.info('Restarting informer...'); + informer.start(); + // this needs to be recursive + }, 2000); -function createKubeProxyConfig(port: number): k8s.KubeConfig { - const kcDefault = new k8s.KubeConfig(); - kcDefault.loadFromDefault(); + return false; + } +} - const cluster = { - name: kcDefault.getCurrentCluster()?.name, - server: `http://127.0.0.1:${port}`, - }; +// registering an add function before informer start will fire for each existing object +// registering after the start wont fire for old objects +// autonomous is somehow simpler in this case - const user = kcDefault.getCurrentUser(); - const context = { - name: kcDefault.getCurrentContext(), - user: user?.name, - cluster: cluster.name, - }; +export let informer: k8s.Informer & k8s.ObjectCache | undefined; - const kc = new k8s.KubeConfig(); - kc.loadFromOptions({ - clusters: [cluster], - users: [user], - contexts: [context], - currentContext: context.name, - }); +export async function initFluxInformers(eventFn?: InformerEventFunc) { + await initKubeConfigWatcher(() => {}); + // initKubeProxy(); - return kc; -} + // await createFluxInformer(); + // setInterval(() => createFluxInformer(), 1000); -function getAPIPaths() { - return { - 'gitrepositories': ['source.toolkit.fluxcd.io', 'v1'], - }; + + // // DEBUG + // setInterval(() => { + // if(informer) { + // console.log('+Informer exists: ', Date().slice(19, 24), informer.list()); + // } else { + // console.log('!No Informer: ', Date().slice(19, 24)); + // } + // }, 1500); } +export async function createFluxInformer() { + // running already or no proxy + if(informer || !kubeProxyPort) { + return; + } -// will start a self-healing informer for each resource type and namespaces -export async function startFluxInformers( - sourceDataProvider: any, - workloadDataProvider: any, - templateDataProvider: any): Promise { - const port = startKubeProxy(); - if(!port) { - return false; + const kc = createKubeProxyConfig(kubeProxyPort); + console.log('starting informer...'); + + informer = await startInformer(Kind.GitRepository, kc); + if(!informer) { + console.log('failed to start informer'); + return; } - const kc = createKubeProxyConfig(port); + informer.on('error', (err: any) => { + console.error('informer error event', err); + if(informer) { + informer.stop(); + } + informer = undefined; + }); - const k8sCoreApi = kc.makeApiClient(k8s.CoreV1Api); - const k8sCustomApi = kc.makeApiClient(k8s.CustomObjectsApi); + console.log('informer started'); +} - const plural = 'gitrepositories'; - const [group, version] = getAPIPaths()[plural]; + +// will start a self-healing informer for each resource type and namespaces +async function startInformer(kind: Kind, kubeConfig: k8s.KubeConfig) { + // const k8sCoreApi = kc.makeApiClient(k8s.CoreV1Api); + const k8sCustomApi = kubeConfig.makeApiClient(k8s.CustomObjectsApi); + + const [plural, group, version] = getAPIPaths(kind); const listFn = async () => { const result = await k8sCustomApi.listClusterCustomObject(group, version, plural); - const kbody = result.body as KubernetesListObject; + const kbody = result.body as KubernetesListObject; return Promise.resolve({response: result.response, body: kbody}); }; - const informer = k8s.makeInformer( - kc, + const kinformer = k8s.makeInformer( + kubeConfig, `/apis/${group}/${version}/${plural}`, listFn, ); - - informer.on('add', (obj: KubernetesObject) => { - console.log(`Added: ${obj.metadata?.name}`); - sourceDataProvider.add(obj); - }); - informer.on('update', (obj: KubernetesObject) => { - console.log(`Updated: ${obj.metadata?.name}`); - sourceDataProvider.update(obj); - }); - informer.on('delete', (obj: KubernetesObject) => { - console.log(`Deleted: ${obj.metadata?.name}`); - sourceDataProvider.delete(obj); - }); - informer.on('error', (err: any) => { - console.error('ERRORed:', err); - // Restart informer after 5sec - setTimeout(() => { - console.log('Restarting informer...'); - informer.start(); - }, 2000); - }); try { - await informer.start(); + await kinformer.start(); + return kinformer; } catch (err) { - console.error('flux resources informer unavailable:', err); - informer.stop(); - setTimeout(() => { - console.info('Restarting informer...'); - informer.start(); - // this needs to be recursive - }, 2000); - - return false; + return undefined; } - const l = informer.list(); - console.log('list:', l); - - return true; } + + + +// console.log('begin informer watch!'); +// console.log('listing: ', informer.list()); + +// informer.on('add', (obj: KubernetesObject) => { +// console.log(`Added: ${obj.metadata?.name}`); +// }); + +// informer.on('update', (obj: KubernetesObject) => { +// console.log(`Updated: ${obj.metadata?.name}`); +// }); + +// informer.on('delete', (obj: KubernetesObject) => { +// console.log(`Deleted: ${obj.metadata?.name}`); +// }); + +// sourceDataProvider.add(obj); +// sourceDataProvider.update(obj); +// sourceDataProvider.delete(obj); + +// \ No newline at end of file diff --git a/src/data/informerKubeProxy.ts b/src/data/informerKubeProxy.ts new file mode 100644 index 00000000..66c0c85a --- /dev/null +++ b/src/data/informerKubeProxy.ts @@ -0,0 +1,27 @@ +export let kubeProxyPort: number | undefined; + +// export const fluxInformers: Record = {}; +export async function initKubeProxy() { + await startKubeProxy(); + setInterval(() => { + if(!kubeProxyPort) { + startKubeProxy(); + } + }, 1000); +} + + + +export async function startKubeProxy() { + kubeProxyPort = 57375; +} + +export function stopKubeProxy(): void { + kubeProxyPort = undefined; +} + + + + + + diff --git a/src/data/informerPool.ts b/src/data/informerPool.ts new file mode 100644 index 00000000..e69de29b diff --git a/src/data/kubeConfigWatcher.ts b/src/data/kubeConfigWatcher.ts new file mode 100644 index 00000000..4474a10c --- /dev/null +++ b/src/data/kubeConfigWatcher.ts @@ -0,0 +1,114 @@ + +import * as kubernetes from 'vscode-kubernetes-tools-api'; +import * as vscode from 'vscode'; +import { Utils } from 'vscode-uri'; +import deepEqual from 'lite-deep-equal'; +import { ConfigurationV1_1 as KubernetesToolsConfigurationV1_1} from 'vscode-kubernetes-tools-api/js/configuration/v1_1'; + + +// import { ConfigurationV1_1 } from 'vscode-kubernetes-tools-api/js/configuration/v1_1'; +import { get } from 'http'; +import { getContexts, getCurrentContextName, getCurrentContextWithCluster, getKubectlConfig } from 'cli/kubernetes/kubernetesConfig'; +import { getCurrentClusterInfo } from 'ui/treeviews/treeViews'; +import { aresult, failed, result, results } from 'types/errorable'; +import { KubernetesCluster, KubernetesContext, KubernetesContextWithCluster } from 'types/kubernetes/kubernetesConfig'; + + +// type KubeConfigUniqueParams = { +// contextName: string; +// clusterName: string; +// clusterServer: string; +// userName: string; +// }; + +let fsWacher: vscode.FileSystemWatcher | undefined; +let selectedKubeConfigPath: KubernetesToolsConfigurationV1_1.KubeconfigPath | undefined; +let contextWithCluster: KubernetesContextWithCluster | undefined; + +export function kubeConfigPath(): string | undefined { + if(selectedKubeConfigPath && selectedKubeConfigPath.pathType === 'host') { + return selectedKubeConfigPath.hostPath; + } +} + + +export async function initKubeConfigWatcher(changedFn: ()=> void) { + const configuration = await kubernetes.extension.configuration.v1_1; + if (!configuration.available) { + return; + } + + selectedKubeConfigPath = configuration.api.getKubeconfigPath(); + contextWithCluster = await getCurrentContextWithCluster(); + + await initKubeConfigPathWatcher(); + console.log('watching kubeconfigs'); +} + + +// when the user changes the KUBECONFIG path +// using the kubernetes tools extension +// let previousKubeConfigPath: k8s.ConfigurationV1_1.KubeconfigPath | undefined; + +async function initKubeConfigPathWatcher() { + const configuration = await kubernetes.extension.configuration.v1_1; + if (!configuration.available) { + return; + } + + configuration.api.onDidChangeKubeconfigPath(path => { + // fires twice + console.log('path changed!', path); + if(path !== selectedKubeConfigPath) { + selectedKubeConfigPath = path; + // TODO: recreate file watcher + + if(path.pathType === 'host') { + stopFsWatcher(); + startFsWatcher(path.hostPath); + } else { + // disable file watcher for WSL for now + console.error('WSL not yet supported'); + stopFsWatcher(); + } + } + }); + + + configuration.api.onDidChangeContext((context) => { + // current context is changed, do something with it + console.log('context changed!', context); + }); + + + +} + +function startFsWatcher(path: string) { + const uri = vscode.Uri.file(path); + const dirname = Utils.dirname(uri); + const basename = Utils.basename(uri); + // const data = await vscode.workspace.fs.readFile(uri); + // console.log(data); + + const watcher = vscode.workspace.createFileSystemWatcher(new vscode.RelativePattern(dirname, basename)); + watcher.onDidChange((e) => { + console.log('kubeconfig file changed!', e); + }); +} + +function stopFsWatcher() { + if(fsWacher) { + fsWacher.dispose(); + fsWacher = undefined; + } +} + + +async function initKubeConfigFileWatcher() { + const configuration = await kubernetes.extension.configuration.v1_1; + if (!configuration.available) { + return; + } + +} diff --git a/src/extension.ts b/src/extension.ts index 862e653e..5e050d2e 100644 --- a/src/extension.ts +++ b/src/extension.ts @@ -1,6 +1,6 @@ import { commands, ExtensionContext, ExtensionMode, window, workspace } from 'vscode'; -import { startFluxInformers } from 'data/fluxInformer'; +import { initFluxInformers } from 'data/fluxInformer'; import { checkFluxPrerequisites, checkWGEVersion } from './cli/checkVersions'; import { shell } from './cli/shell/exec'; import { registerCommands } from './commands/commands'; @@ -45,7 +45,8 @@ export async function activate(context: ExtensionContext) { // create gitops tree views createTreeViews(); - // startFluxInformers(sourceTreeViewProvider, workloadTreeViewProvider, templateTreeViewProvider); + // await startFluxInformers(sourceTreeViewProvider, workloadTreeViewProvider, templateTreeViewProvider); + await initFluxInformers(); // register gitops commands registerCommands(context); diff --git a/src/index.d.ts b/src/index.d.ts index 222d0a57..ab4ef6fb 100644 --- a/src/index.d.ts +++ b/src/index.d.ts @@ -1,2 +1,3 @@ declare module 'tinytim'; declare module 'shell-escape-tag'; +declare module 'lite-deep-equal'; diff --git a/src/types/errorable.ts b/src/types/errorable.ts index 9379c33c..afab1391 100644 --- a/src/types/errorable.ts +++ b/src/types/errorable.ts @@ -18,3 +18,19 @@ export function failed(e: Errorable): e is Failed { return !e.succeeded; } +export function result(e: Errorable): T | undefined { + if (succeeded(e)) { + return e.result; + } else { + return undefined; + } +} + +export async function aresult(e: Promise>): Promise { + const r = await e; + return result(r); +} + +export function results(es: Errorable[]): (T | undefined)[] { + return es.map(e => result(e)); +} diff --git a/src/ui/treeviews/dataProviders/clusterDataProvider.ts b/src/ui/treeviews/dataProviders/clusterDataProvider.ts index 26dc793e..bbe1965d 100644 --- a/src/ui/treeviews/dataProviders/clusterDataProvider.ts +++ b/src/ui/treeviews/dataProviders/clusterDataProvider.ts @@ -1,6 +1,6 @@ import { fluxTools } from 'cli/flux/fluxTools'; import { getFluxControllers } from 'cli/kubernetes/kubectlGet'; -import { getContexts, getCurrentContext } from 'cli/kubernetes/kubernetesConfig'; +import { getContexts, getCurrentContextName } from 'cli/kubernetes/kubernetesConfig'; import { setVSCodeContext } from 'extension'; import { failed } from 'types/errorable'; import { ContextId } from 'types/extensionIds'; @@ -52,7 +52,7 @@ export class ClusterDataProvider extends DataProvider { const [contextsResult, currentContextResult] = await Promise.all([ getContexts(), - getCurrentContext(), + getCurrentContextName(), ]); if (failed(contextsResult)) { diff --git a/src/ui/treeviews/treeViews.ts b/src/ui/treeviews/treeViews.ts index e9f81500..dca78a5c 100644 --- a/src/ui/treeviews/treeViews.ts +++ b/src/ui/treeviews/treeViews.ts @@ -13,7 +13,7 @@ import { ClusterNode } from './nodes/cluster/clusterNode'; import { TreeNode } from './nodes/treeNode'; import { detectClusterProvider } from 'cli/kubernetes/clusterProvider'; -import { getClusterName, getCurrentContext } from 'cli/kubernetes/kubernetesConfig'; +import { getClusterName, getCurrentContextName } from 'cli/kubernetes/kubernetesConfig'; import { ClusterInfo } from 'types/kubernetes/clusterProvider'; import { TemplateDataProvider } from './dataProviders/templateDataProvider'; @@ -148,7 +148,7 @@ export function refreshTemplatesTreeView(node?: TreeNode) { * 3. Detect cluster provider. */ export async function getCurrentClusterInfo(): Promise> { - const currentContextResult = await getCurrentContext(); + const currentContextResult = await getCurrentContextName(); if (failed(currentContextResult)) { const error = `Failed to get current context ${currentContextResult.error[0]}`; diff --git a/src/utils/utils.ts b/src/utils/utils.ts index 7a3e5428..2a1bf24a 100644 --- a/src/utils/utils.ts +++ b/src/utils/utils.ts @@ -6,3 +6,8 @@ export async function delay(ms: number) { return new Promise(resolve => setTimeout(resolve, ms)); } + +// small timestamp for debug +export function ts() { + return Date().slice(19, 24); +}