From 3cbc22d5f12a3d85c1f6105eda18992670155a78 Mon Sep 17 00:00:00 2001 From: Shai Katz Date: Mon, 29 Mar 2021 16:25:07 +0300 Subject: [PATCH 1/3] Fix controller memory leak --- .../HostedServices/V1Alpha2Controller.cs | 1 + .../utils/KubernetesExtensions.cs | 27 ++++++++++--------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/src/crd-controller/HostedServices/V1Alpha2Controller.cs b/src/crd-controller/HostedServices/V1Alpha2Controller.cs index 3107217d..da1de6a2 100644 --- a/src/crd-controller/HostedServices/V1Alpha2Controller.cs +++ b/src/crd-controller/HostedServices/V1Alpha2Controller.cs @@ -53,6 +53,7 @@ private IDisposable ObserveKamusSecret(CancellationToken token) ApiVersion, "kamussecrets", token) + .Take(1) .SelectMany(x => Observable.FromAsync(async () => await HandleEvent(x.Item1, x.Item2)) ) diff --git a/src/crd-controller/utils/KubernetesExtensions.cs b/src/crd-controller/utils/KubernetesExtensions.cs index 10647dbc..d829392c 100644 --- a/src/crd-controller/utils/KubernetesExtensions.cs +++ b/src/crd-controller/utils/KubernetesExtensions.cs @@ -2,6 +2,7 @@ using System.Reactive.Linq; using System.Threading; using k8s; +using Microsoft.VisualBasic; namespace CustomResourceDescriptorController.utils { @@ -13,22 +14,24 @@ public static class KubernetesExtensions string version, string plural, CancellationToken cancellationToken - ) where TCRD : class + ) where TCRD : class { + Watcher watcher = null; return Observable.FromAsync(async () => - { - var subject = new System.Reactive.Subjects.Subject<(WatchEventType, TCRD)>(); - var path = $"apis/{group}/{version}/watch/{plural}"; - await kubernetes.WatchObjectAsync(path, - timeoutSeconds: int.MaxValue, - onEvent: (@type, @event) => subject.OnNext((@type, @event)), - onError: e => subject.OnError(e), - onClosed: () => subject.OnCompleted(), cancellationToken: cancellationToken); - return subject; - }) + { + var subject = new System.Reactive.Subjects.Subject<(WatchEventType, TCRD)>(); + var path = $"apis/{group}/{version}/watch/{plural}"; + watcher = await kubernetes.WatchObjectAsync(path, + timeoutSeconds: int.MaxValue, + onEvent: (@type, @event) => subject.OnNext((@type, @event)), + onError: e => subject.OnError(e), + onClosed: () => subject.OnCompleted(), cancellationToken: cancellationToken); + return subject; + }) .SelectMany(x => x) .Select(t => (t.Item1, t.Item2 as TCRD)) - .Where(t => t.Item2 != null); + .Where(t => t.Item2 != null) + .Finally(() => watcher?.Dispose()); } } } From 2596ec9b3363847a2960637c8bf19033d7f00109 Mon Sep 17 00:00:00 2001 From: Shai Katz Date: Mon, 29 Mar 2021 16:28:14 +0300 Subject: [PATCH 2/3] Bump version --- src/crd-controller/HostedServices/V1Alpha2Controller.cs | 1 + src/crd-controller/crd-controller.csproj | 2 +- src/crd-controller/utils/KubernetesExtensions.cs | 1 - src/decrypt-api/decrypt-api.csproj | 2 +- src/encrypt-api/encrypt-api.csproj | 2 +- 5 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/crd-controller/HostedServices/V1Alpha2Controller.cs b/src/crd-controller/HostedServices/V1Alpha2Controller.cs index da1de6a2..030c6a4d 100644 --- a/src/crd-controller/HostedServices/V1Alpha2Controller.cs +++ b/src/crd-controller/HostedServices/V1Alpha2Controller.cs @@ -70,6 +70,7 @@ private IDisposable ObserveKamusSecret(CancellationToken token) Environment.Exit(0); }); } + public Task StartAsync(CancellationToken token) { mSubscription = ObserveKamusSecret(token); diff --git a/src/crd-controller/crd-controller.csproj b/src/crd-controller/crd-controller.csproj index df194490..05c6e25f 100644 --- a/src/crd-controller/crd-controller.csproj +++ b/src/crd-controller/crd-controller.csproj @@ -7,7 +7,7 @@ - 0.9.0.7 + 0.9.0.8 diff --git a/src/crd-controller/utils/KubernetesExtensions.cs b/src/crd-controller/utils/KubernetesExtensions.cs index d829392c..f36ae9ac 100644 --- a/src/crd-controller/utils/KubernetesExtensions.cs +++ b/src/crd-controller/utils/KubernetesExtensions.cs @@ -2,7 +2,6 @@ using System.Reactive.Linq; using System.Threading; using k8s; -using Microsoft.VisualBasic; namespace CustomResourceDescriptorController.utils { diff --git a/src/decrypt-api/decrypt-api.csproj b/src/decrypt-api/decrypt-api.csproj index 9d81760a..aa2ae7c7 100644 --- a/src/decrypt-api/decrypt-api.csproj +++ b/src/decrypt-api/decrypt-api.csproj @@ -3,7 +3,7 @@ netcoreapp3.1 - 0.9.0.7 + 0.9.0.8 diff --git a/src/encrypt-api/encrypt-api.csproj b/src/encrypt-api/encrypt-api.csproj index 8e1eba6f..e789e663 100644 --- a/src/encrypt-api/encrypt-api.csproj +++ b/src/encrypt-api/encrypt-api.csproj @@ -3,7 +3,7 @@ netcoreapp3.1 - 0.9.0.7 + 0.9.0.8 From a5d8b283f42648b3a4c0937c3186432a368448e1 Mon Sep 17 00:00:00 2001 From: Shai Katz Date: Mon, 29 Mar 2021 16:45:52 +0300 Subject: [PATCH 3/3] First subscribe again, then dispose --- src/crd-controller/HostedServices/V1Alpha2Controller.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/crd-controller/HostedServices/V1Alpha2Controller.cs b/src/crd-controller/HostedServices/V1Alpha2Controller.cs index 030c6a4d..e71e0d22 100644 --- a/src/crd-controller/HostedServices/V1Alpha2Controller.cs +++ b/src/crd-controller/HostedServices/V1Alpha2Controller.cs @@ -76,8 +76,9 @@ public Task StartAsync(CancellationToken token) mSubscription = ObserveKamusSecret(token); Observable.Interval(TimeSpan.FromSeconds(mReconciliationIntervalInSeconds)).Subscribe((s) => { - mSubscription.Dispose(); + var oldSubscription = mSubscription; mSubscription = ObserveKamusSecret(token); + oldSubscription.Dispose(); }); mLogger.Information("Starting watch for KamusSecret V1Alpha2 events");