Skip to content

Commit

Permalink
Merge pull request #257 from jzding/config-map-4.12
Browse files Browse the repository at this point in the history
[release-4.12] Bug OCPBUGS-16706: Add configmap as storage for persisting subscribers
  • Loading branch information
openshift-merge-robot authored Jul 26, 2023
2 parents ce7a22d + ec44707 commit 23ad6e2
Show file tree
Hide file tree
Showing 1,179 changed files with 117,370 additions and 14,259 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,5 @@ sub.json

build/
bin/

pkg/storage/kubernetes/*.json

9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,11 @@ fmt-code: ## Run go fmt against code.
go fmt ./...

vet: ## Run go vet against code.
go vet ./...
go vet ./...

generate-api:
hack/verify-codegen.sh
rm -rf github.com

install-tools:
hack/install-kubebuilder-tools.sh
69 changes: 55 additions & 14 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
Expand All @@ -25,6 +27,9 @@ import (
"syscall"
"time"

"github.com/redhat-cne/sdk-go/pkg/subscriber"
v1pubs "github.com/redhat-cne/sdk-go/v1/pubsub"

"github.com/redhat-cne/sdk-go/pkg/types"

"github.com/prometheus/client_golang/prometheus/collectors"
Expand All @@ -33,6 +38,8 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/redhat-cne/cloud-event-proxy/pkg/localmetrics"
storageClient "github.com/redhat-cne/cloud-event-proxy/pkg/storage/kubernetes"

log "github.com/sirupsen/logrus"

"github.com/prometheus/client_golang/prometheus/promhttp"
Expand All @@ -45,7 +52,6 @@ import (
v1amqp "github.com/redhat-cne/sdk-go/v1/amqp"
v1event "github.com/redhat-cne/sdk-go/v1/event"
v1http "github.com/redhat-cne/sdk-go/v1/http"
v1pubs "github.com/redhat-cne/sdk-go/v1/pubsub"
)

var (
Expand All @@ -61,6 +67,8 @@ var (
httpEventPublisher string
pluginHandler plugins.Handler
amqInitTimeout = 3 * time.Minute
nodeName string
namespace string
)

func main() {
Expand All @@ -84,7 +92,9 @@ func main() {
prometheus.Unregister(collectors.NewGoCollector())

nodeIP := os.Getenv("NODE_IP")
nodeName := os.Getenv("NODE_NAME")
nodeName = os.Getenv("NODE_NAME")
namespace = os.Getenv("NAME_SPACE")

transportHost = common.SanitizeTransportHost(transportHost, nodeIP, nodeName)
eventPublishers := updateHTTPPublishers(nodeIP, nodeName, httpEventPublisher)

Expand All @@ -94,20 +104,36 @@ func main() {
if parsedTransportHost.Err != nil {
log.Errorf("error parsing transport host, data will written to log %s", parsedTransportHost.Err.Error())
}

scConfig = &common.SCConfiguration{
EventInCh: make(chan *channel.DataChan, channelBufferSize),
EventOutCh: make(chan *channel.DataChan, channelBufferSize),
StatusCh: make(chan *channel.StatusChan, statusChannelBufferSize),
CloseCh: make(chan struct{}),
APIPort: apiPort,
APIPath: apiPath,
PubSubAPI: v1pubs.GetAPIInstance(storePath),
StorePath: storePath,
PubSubAPI: v1pubs.GetAPIInstance(storePath),
BaseURL: nil,
TransportHost: parsedTransportHost,
StorageType: storageClient.EmptyDir,
}
/****/

// Use kubeconfig to create client config.
client, err := storageClient.NewClient()
if err != nil {
log.Infof("error fetching client, storage defaulted to emptyDir{} %s", err.Error())
} else {
scConfig.K8sClient = client
}
if namespace != "" && nodeName != "" && scConfig.TransportHost.Type == common.HTTP {
// if consumer doesn't pass namespace then this will default to empty dir
if e := client.InitConfigMap(scConfig.StorePath, nodeName, namespace); e != nil {
log.Errorf("failed to initlialize configmap, subcription will be stored in empty dir %s", e.Error())
} else {
scConfig.StorageType = storageClient.ConfigMap
}
}
metricServer(metricsAddr)
wg := sync.WaitGroup{}
sigCh := make(chan os.Signal, 1)
Expand Down Expand Up @@ -150,7 +176,7 @@ func main() {
}

/* Enable pub/sub services */
_, err := common.StartPubSubService(scConfig)
_, err = common.StartPubSubService(scConfig)
if err != nil {
log.Fatal("pub/sub service API failed to start.")
}
Expand Down Expand Up @@ -186,7 +212,7 @@ func metricServer(address string) {
}, 5*time.Second, scConfig.CloseCh)
}

// ProcessOutChannel this process the out channel;data put out by amqp
// ProcessOutChannel this process the out channel;data put out by transport
func ProcessOutChannel(wg *sync.WaitGroup, scConfig *common.SCConfiguration) {
// Send back the acknowledgement to publisher
defer wg.Done()
Expand Down Expand Up @@ -257,15 +283,30 @@ func ProcessOutChannel(wg *sync.WaitGroup, scConfig *common.SCConfiguration) {
log.Errorf("failed to receive status request to address %s", d.Address)
localmetrics.UpdateStatusAckCount(d.Address, localmetrics.FAILED)
}
} else if d.Type == channel.SUBSCRIBER {
if d.Status == channel.SUCCESS {
} else if d.Type == channel.SUBSCRIBER { // these data are provided by HTTP transport
if scConfig.StorageType != storageClient.ConfigMap {
continue
}
if d.Status == channel.SUCCESS && d.Data != nil {
var obj subscriber.Subscriber
if err := json.Unmarshal(d.Data.Data(), &obj); err != nil {
log.Infof("data is not subscriber object ignoring processing")
continue
}
log.Infof("subscriber processed for %s", d.Address)
if err := scConfig.K8sClient.UpdateConfigMap(context.Background(), []subscriber.Subscriber{obj}, nodeName, namespace); err != nil {
log.Errorf("failed to update subscription in configmap %s", err.Error())
} else {
log.Infof("subscriber saved in configmap %s", obj.String())
}
} else if d.Status == channel.DELETE {
scConfig.EventInCh <- &channel.DataChan{
ClientID: d.ClientID,
Data: d.Data,
Status: channel.DELETE,
Type: channel.SUBSCRIBER,
var obj subscriber.Subscriber
obj.Action = channel.DELETE
obj.ClientID = d.ClientID
if err := scConfig.K8sClient.UpdateConfigMap(context.Background(), []subscriber.Subscriber{obj}, nodeName, namespace); err != nil {
log.Errorf("failed to delete subscription in configmap %s", err.Error())
} else {
log.Infof("deleted subscription %s ", obj.ClientID.String())
}
}
}
Expand Down Expand Up @@ -354,7 +395,7 @@ func enableHTTPTransport(wg *sync.WaitGroup, eventPublishers []string) bool {
var httpServer *v1http.HTTP
httpServer, err := pluginHandler.LoadHTTPPlugin(wg, scConfig, nil, nil)
if err != nil {
log.Warnf(" failied to load http plugin for tansport %s", err.Error())
log.Warnf("failed to load http plugin for tansport %s", err.Error())
scConfig.PubSubAPI.DisableTransport()
return false
}
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ COPY . .

RUN hack/build-example-go.sh

FROM openshift/origin-base AS bin
FROM registry.ci.openshift.org/ocp/4.12:base AS bin
COPY --from=builder /go/src/github.com/redhat-cne/cloud-event-proxy/build/cloud-event-consumer /

LABEL io.k8s.display-name="Cloud Event Proxy Sample Consumer" \
Expand Down
4 changes: 4 additions & 0 deletions examples/consumer/deployment/namespace.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,9 @@ kind: Namespace
metadata:
name: cloud-events
labels:
security.openshift.io/scc.podSecurityLabelSync: "false"
pod-security.kubernetes.io/audit: "privileged"
pod-security.kubernetes.io/enforce: "privileged"
pod-security.kubernetes.io/warn: "privileged"
name: cloud-events
openshift.io/cluster-monitoring: "true"
13 changes: 12 additions & 1 deletion examples/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,20 @@ RETRY:
}
}()
}

log.Info("waiting for events")
wg.Wait()
deleteAllSubscriptions()
time.Sleep(3 * time.Second)
}

func deleteAllSubscriptions() int {
var status int
deleteURL := &types.URI{URL: url.URL{Scheme: "http",
Host: apiAddr,
Path: fmt.Sprintf("%s%s", apiPath, "subscriptions")}}
rc := restclient.New()
status = rc.Delete(deleteURL)
return status
}

func createSubscription(resourceAddress string) (sub pubsub.PubSub, err error) {
Expand Down
2 changes: 1 addition & 1 deletion examples/manifests/consumer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ spec:
- "--store-path=/store"
#- "--transport-host=amqp://amq-router.$(AMQP_NAMESPACE).svc.cluster.local"
- "--transport-host=consumer-events-subscription-service.cloud-events.svc.cluster.local:9043"
- "--http-event-publishers=ptp-event-publisher-service.openshift-ptp.svc.cluster.local:9043"
- "--http-event-publishers=ptp-event-publisher-service-NODE_NAME.openshift-ptp.svc.cluster.local:9043"
- "--api-port=8089"
env:
- name: AMQP_NAMESPACE
Expand Down
4 changes: 4 additions & 0 deletions examples/manifests/namespace.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,9 @@ kind: Namespace
metadata:
name: cloud-events
labels:
security.openshift.io/scc.podSecurityLabelSync: "false"
pod-security.kubernetes.io/audit: "privileged"
pod-security.kubernetes.io/enforce: "privileged"
pod-security.kubernetes.io/warn: "privileged"
name: cloud-events
openshift.io/cluster-monitoring: "true"
20 changes: 20 additions & 0 deletions examples/simplehttp/external-service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
apiVersion: "v1"
kind: "Service"
metadata:
name: "event-consumer-external"
spec:
type: ClusterIP
ports:
- port: 27017
targetPort: 27017
---
apiVersion: "v1"
kind: "Endpoints"
metadata:
name: "event-consumer-external"
subsets:
- addresses:
- ip: 10.22.18.119 #change this to your laptop ip address
ports:
- port: 27017
37 changes: 14 additions & 23 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,38 @@ module github.com/redhat-cne/cloud-event-proxy
go 1.19

require (
github.com/blang/semver v3.5.1+incompatible
github.com/cloudevents/sdk-go/v2 v2.13.0
github.com/fsnotify/fsnotify v1.6.0
github.com/golang/glog v1.0.0
github.com/google/uuid v1.3.0
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.22.1
github.com/onsi/gomega v1.23.0
github.com/prometheus/client_golang v1.14.0
github.com/redhat-cne/rest-api v0.1.1-0.20230405200502-edf9a81fade8
github.com/redhat-cne/sdk-go v0.1.1-0.20230405191318-0f443c518882
github.com/redhat-cne/rest-api v0.1.1-0.20230417132116-46717e183c05
github.com/redhat-cne/sdk-go v0.1.1-0.20230417130643-d8a16493413b
github.com/sirupsen/logrus v1.9.0
github.com/stretchr/testify v1.8.1
golang.org/x/net v0.7.0
k8s.io/api v0.24.2
k8s.io/api v0.26.0
k8s.io/apiextensions-apiserver v0.24.2
k8s.io/apimachinery v0.24.2
k8s.io/apimachinery v0.26.3
k8s.io/client-go v0.24.2
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
k8s.io/utils v0.0.0-20221107191617-1a15be271d1d
sigs.k8s.io/controller-runtime v0.12.3
)

require (
github.com/Azure/go-amqp v0.17.5 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cloudevents/sdk-go/protocol/amqp/v2 v2.13.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful v2.9.5+incompatible // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/go-logr/logr v1.2.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/swag v0.19.14 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
Expand All @@ -53,7 +52,6 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo/v2 v2.3.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
Expand All @@ -74,16 +72,9 @@ require (
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.60.1 // indirect
k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42 // indirect
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
k8s.io/klog/v2 v2.80.1 // indirect
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 // indirect
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)

// Manually pinned to kubernetes-1.24.0
replace (
k8s.io/api => k8s.io/api v0.24.0
k8s.io/apimachinery => k8s.io/apimachinery v0.24.0
k8s.io/client-go => k8s.io/client-go v0.24.0
)
Loading

0 comments on commit 23ad6e2

Please sign in to comment.