Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-4.12] Bug OCPBUGS-16706: Add configmap as storage for persisting subscribers #257

Merged
merged 2 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,5 @@ sub.json

build/
bin/

pkg/storage/kubernetes/*.json

9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,11 @@ fmt-code: ## Run go fmt against code.
go fmt ./...

vet: ## Run go vet against code.
go vet ./...
go vet ./...

generate-api:
hack/verify-codegen.sh
rm -rf github.com

install-tools:
hack/install-kubebuilder-tools.sh
69 changes: 55 additions & 14 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
Expand All @@ -25,6 +27,9 @@ import (
"syscall"
"time"

"github.com/redhat-cne/sdk-go/pkg/subscriber"
v1pubs "github.com/redhat-cne/sdk-go/v1/pubsub"

"github.com/redhat-cne/sdk-go/pkg/types"

"github.com/prometheus/client_golang/prometheus/collectors"
Expand All @@ -33,6 +38,8 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/redhat-cne/cloud-event-proxy/pkg/localmetrics"
storageClient "github.com/redhat-cne/cloud-event-proxy/pkg/storage/kubernetes"

log "github.com/sirupsen/logrus"

"github.com/prometheus/client_golang/prometheus/promhttp"
Expand All @@ -45,7 +52,6 @@ import (
v1amqp "github.com/redhat-cne/sdk-go/v1/amqp"
v1event "github.com/redhat-cne/sdk-go/v1/event"
v1http "github.com/redhat-cne/sdk-go/v1/http"
v1pubs "github.com/redhat-cne/sdk-go/v1/pubsub"
)

var (
Expand All @@ -61,6 +67,8 @@ var (
httpEventPublisher string
pluginHandler plugins.Handler
amqInitTimeout = 3 * time.Minute
nodeName string
namespace string
)

func main() {
Expand All @@ -84,7 +92,9 @@ func main() {
prometheus.Unregister(collectors.NewGoCollector())

nodeIP := os.Getenv("NODE_IP")
nodeName := os.Getenv("NODE_NAME")
nodeName = os.Getenv("NODE_NAME")
namespace = os.Getenv("NAME_SPACE")

transportHost = common.SanitizeTransportHost(transportHost, nodeIP, nodeName)
eventPublishers := updateHTTPPublishers(nodeIP, nodeName, httpEventPublisher)

Expand All @@ -94,20 +104,36 @@ func main() {
if parsedTransportHost.Err != nil {
log.Errorf("error parsing transport host, data will written to log %s", parsedTransportHost.Err.Error())
}

scConfig = &common.SCConfiguration{
EventInCh: make(chan *channel.DataChan, channelBufferSize),
EventOutCh: make(chan *channel.DataChan, channelBufferSize),
StatusCh: make(chan *channel.StatusChan, statusChannelBufferSize),
CloseCh: make(chan struct{}),
APIPort: apiPort,
APIPath: apiPath,
PubSubAPI: v1pubs.GetAPIInstance(storePath),
StorePath: storePath,
PubSubAPI: v1pubs.GetAPIInstance(storePath),
BaseURL: nil,
TransportHost: parsedTransportHost,
StorageType: storageClient.EmptyDir,
}
/****/

// Use kubeconfig to create client config.
client, err := storageClient.NewClient()
if err != nil {
log.Infof("error fetching client, storage defaulted to emptyDir{} %s", err.Error())
} else {
scConfig.K8sClient = client
}
if namespace != "" && nodeName != "" && scConfig.TransportHost.Type == common.HTTP {
// if consumer doesn't pass namespace then this will default to empty dir
if e := client.InitConfigMap(scConfig.StorePath, nodeName, namespace); e != nil {
log.Errorf("failed to initlialize configmap, subcription will be stored in empty dir %s", e.Error())
} else {
scConfig.StorageType = storageClient.ConfigMap
}
}
metricServer(metricsAddr)
wg := sync.WaitGroup{}
sigCh := make(chan os.Signal, 1)
Expand Down Expand Up @@ -150,7 +176,7 @@ func main() {
}

/* Enable pub/sub services */
_, err := common.StartPubSubService(scConfig)
_, err = common.StartPubSubService(scConfig)
if err != nil {
log.Fatal("pub/sub service API failed to start.")
}
Expand Down Expand Up @@ -186,7 +212,7 @@ func metricServer(address string) {
}, 5*time.Second, scConfig.CloseCh)
}

// ProcessOutChannel this process the out channel;data put out by amqp
// ProcessOutChannel this process the out channel;data put out by transport
func ProcessOutChannel(wg *sync.WaitGroup, scConfig *common.SCConfiguration) {
// Send back the acknowledgement to publisher
defer wg.Done()
Expand Down Expand Up @@ -257,15 +283,30 @@ func ProcessOutChannel(wg *sync.WaitGroup, scConfig *common.SCConfiguration) {
log.Errorf("failed to receive status request to address %s", d.Address)
localmetrics.UpdateStatusAckCount(d.Address, localmetrics.FAILED)
}
} else if d.Type == channel.SUBSCRIBER {
if d.Status == channel.SUCCESS {
} else if d.Type == channel.SUBSCRIBER { // these data are provided by HTTP transport
if scConfig.StorageType != storageClient.ConfigMap {
continue
}
if d.Status == channel.SUCCESS && d.Data != nil {
var obj subscriber.Subscriber
if err := json.Unmarshal(d.Data.Data(), &obj); err != nil {
log.Infof("data is not subscriber object ignoring processing")
continue
}
log.Infof("subscriber processed for %s", d.Address)
if err := scConfig.K8sClient.UpdateConfigMap(context.Background(), []subscriber.Subscriber{obj}, nodeName, namespace); err != nil {
log.Errorf("failed to update subscription in configmap %s", err.Error())
} else {
log.Infof("subscriber saved in configmap %s", obj.String())
}
} else if d.Status == channel.DELETE {
scConfig.EventInCh <- &channel.DataChan{
ClientID: d.ClientID,
Data: d.Data,
Status: channel.DELETE,
Type: channel.SUBSCRIBER,
var obj subscriber.Subscriber
obj.Action = channel.DELETE
obj.ClientID = d.ClientID
if err := scConfig.K8sClient.UpdateConfigMap(context.Background(), []subscriber.Subscriber{obj}, nodeName, namespace); err != nil {
log.Errorf("failed to delete subscription in configmap %s", err.Error())
} else {
log.Infof("deleted subscription %s ", obj.ClientID.String())
}
}
}
Expand Down Expand Up @@ -354,7 +395,7 @@ func enableHTTPTransport(wg *sync.WaitGroup, eventPublishers []string) bool {
var httpServer *v1http.HTTP
httpServer, err := pluginHandler.LoadHTTPPlugin(wg, scConfig, nil, nil)
if err != nil {
log.Warnf(" failied to load http plugin for tansport %s", err.Error())
log.Warnf("failed to load http plugin for tansport %s", err.Error())
scConfig.PubSubAPI.DisableTransport()
return false
}
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ COPY . .

RUN hack/build-example-go.sh

FROM openshift/origin-base AS bin
FROM registry.ci.openshift.org/ocp/4.12:base AS bin
COPY --from=builder /go/src/github.com/redhat-cne/cloud-event-proxy/build/cloud-event-consumer /

LABEL io.k8s.display-name="Cloud Event Proxy Sample Consumer" \
Expand Down
4 changes: 4 additions & 0 deletions examples/consumer/deployment/namespace.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,9 @@ kind: Namespace
metadata:
name: cloud-events
labels:
security.openshift.io/scc.podSecurityLabelSync: "false"
pod-security.kubernetes.io/audit: "privileged"
pod-security.kubernetes.io/enforce: "privileged"
pod-security.kubernetes.io/warn: "privileged"
name: cloud-events
openshift.io/cluster-monitoring: "true"
13 changes: 12 additions & 1 deletion examples/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,20 @@ RETRY:
}
}()
}

log.Info("waiting for events")
wg.Wait()
deleteAllSubscriptions()
time.Sleep(3 * time.Second)
}

func deleteAllSubscriptions() int {
var status int
deleteURL := &types.URI{URL: url.URL{Scheme: "http",
Host: apiAddr,
Path: fmt.Sprintf("%s%s", apiPath, "subscriptions")}}
rc := restclient.New()
status = rc.Delete(deleteURL)
return status
}

func createSubscription(resourceAddress string) (sub pubsub.PubSub, err error) {
Expand Down
2 changes: 1 addition & 1 deletion examples/manifests/consumer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ spec:
- "--store-path=/store"
#- "--transport-host=amqp://amq-router.$(AMQP_NAMESPACE).svc.cluster.local"
- "--transport-host=consumer-events-subscription-service.cloud-events.svc.cluster.local:9043"
- "--http-event-publishers=ptp-event-publisher-service.openshift-ptp.svc.cluster.local:9043"
- "--http-event-publishers=ptp-event-publisher-service-NODE_NAME.openshift-ptp.svc.cluster.local:9043"
- "--api-port=8089"
env:
- name: AMQP_NAMESPACE
Expand Down
4 changes: 4 additions & 0 deletions examples/manifests/namespace.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,9 @@ kind: Namespace
metadata:
name: cloud-events
labels:
security.openshift.io/scc.podSecurityLabelSync: "false"
pod-security.kubernetes.io/audit: "privileged"
pod-security.kubernetes.io/enforce: "privileged"
pod-security.kubernetes.io/warn: "privileged"
name: cloud-events
openshift.io/cluster-monitoring: "true"
20 changes: 20 additions & 0 deletions examples/simplehttp/external-service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
apiVersion: "v1"
kind: "Service"
metadata:
name: "event-consumer-external"
spec:
type: ClusterIP
ports:
- port: 27017
targetPort: 27017
---
apiVersion: "v1"
kind: "Endpoints"
metadata:
name: "event-consumer-external"
subsets:
- addresses:
- ip: 10.22.18.119 #change this to your laptop ip address
ports:
- port: 27017
37 changes: 14 additions & 23 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,38 @@ module github.com/redhat-cne/cloud-event-proxy
go 1.19

require (
github.com/blang/semver v3.5.1+incompatible
github.com/cloudevents/sdk-go/v2 v2.13.0
github.com/fsnotify/fsnotify v1.6.0
github.com/golang/glog v1.0.0
github.com/google/uuid v1.3.0
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.22.1
github.com/onsi/gomega v1.23.0
github.com/prometheus/client_golang v1.14.0
github.com/redhat-cne/rest-api v0.1.1-0.20230405200502-edf9a81fade8
github.com/redhat-cne/sdk-go v0.1.1-0.20230405191318-0f443c518882
github.com/redhat-cne/rest-api v0.1.1-0.20230417132116-46717e183c05
github.com/redhat-cne/sdk-go v0.1.1-0.20230417130643-d8a16493413b
github.com/sirupsen/logrus v1.9.0
github.com/stretchr/testify v1.8.1
golang.org/x/net v0.7.0
k8s.io/api v0.24.2
k8s.io/api v0.26.0
k8s.io/apiextensions-apiserver v0.24.2
k8s.io/apimachinery v0.24.2
k8s.io/apimachinery v0.26.3
k8s.io/client-go v0.24.2
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
k8s.io/utils v0.0.0-20221107191617-1a15be271d1d
sigs.k8s.io/controller-runtime v0.12.3
)

require (
github.com/Azure/go-amqp v0.17.5 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cloudevents/sdk-go/protocol/amqp/v2 v2.13.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful v2.9.5+incompatible // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/go-logr/logr v1.2.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/swag v0.19.14 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
Expand All @@ -53,7 +52,6 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo/v2 v2.3.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
Expand All @@ -74,16 +72,9 @@ require (
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.60.1 // indirect
k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42 // indirect
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
k8s.io/klog/v2 v2.80.1 // indirect
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 // indirect
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)

// Manually pinned to kubernetes-1.24.0
replace (
k8s.io/api => k8s.io/api v0.24.0
k8s.io/apimachinery => k8s.io/apimachinery v0.24.0
k8s.io/client-go => k8s.io/client-go v0.24.0
)
Loading
Loading