Skip to content

Commit

Permalink
APIGOV-28913 - Fix for event catchup with harvester in poll mode (#840)
Browse files Browse the repository at this point in the history
* APIGOV-28913 - Fix for event catchup with harvester in poll mode

* APIGOV-28913 - update healthcheck status for poll client

* APIGOV-28913 - fix race condition

* APIGOV-28913 - fix to add crd and ard with create/update resource
  • Loading branch information
vivekschauhan authored Oct 2, 2024
1 parent 5f992f8 commit 09c97ff
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 31 deletions.
21 changes: 10 additions & 11 deletions pkg/agent/poller/client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package poller

import (
"fmt"
"sync"
"time"

Expand All @@ -27,6 +26,7 @@ type PollClient struct {
newPollManager newPollExecutorFunc
harvesterConfig harvesterConfig
mutex sync.RWMutex
initialized bool
}

type harvesterConfig struct {
Expand Down Expand Up @@ -74,23 +74,23 @@ func (p *PollClient) Start() error {
p.handlers...,
)

poller := p.newPollManager(
p.poller = p.newPollManager(
p.interval,
withOnStop(p.onClientStop),
withHarvester(p.harvesterConfig),
)
p.poller = poller

p.mutex.Unlock()

listenCh := p.listener.Listen()

p.poller.RegisterWatch(eventCh, eventErrorCh)

if p.onStreamConnection != nil {
p.onStreamConnection()
}

p.mutex.Lock()
p.initialized = true
p.mutex.Unlock()

select {
case err := <-listenCh:
return err
Expand All @@ -103,11 +103,10 @@ func (p *PollClient) Start() error {
func (p *PollClient) Status() error {
p.mutex.RLock()
defer p.mutex.RUnlock()
if p.poller == nil || p.listener == nil {
return fmt.Errorf("harvester polling client is not ready")
}
if ok := p.poller.Status(); !ok {
return errors.ErrHarvesterConnection
if p.initialized {
if ok := p.poller.Status(); !ok {
return errors.ErrHarvesterConnection
}
}

return nil
Expand Down
28 changes: 19 additions & 9 deletions pkg/agent/poller/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,20 @@ func (m *pollExecutor) RegisterWatch(eventChan chan *proto.Event, errChan chan e
return
}

if err := m.harvester.EventCatchUp(m.topicSelfLink, eventChan); err != nil {
m.logger.WithError(err).Error("harvester returned an error when syncing events")
m.onHarvesterErr()
go func() {
m.Stop()
errChan <- err
}()
return
}

m.lock.Lock()
m.isReady = true
m.lock.Unlock()

go func() {
err := m.sync(m.topicSelfLink, eventChan)
m.Stop()
Expand All @@ -79,15 +93,7 @@ func (m *pollExecutor) RegisterWatch(eventChan chan *proto.Event, errChan chan e

func (m *pollExecutor) sync(topicSelfLink string, eventChan chan *proto.Event) error {
m.logger.Trace("sync events")
if err := m.harvester.EventCatchUp(topicSelfLink, eventChan); err != nil {
m.logger.WithError(err).Error("harvester returned an error when syncing events")
m.onHarvesterErr()
return err
}

m.lock.Lock()
m.isReady = true
m.lock.Unlock()
for {
select {
case <-m.ctx.Done():
Expand Down Expand Up @@ -150,5 +156,9 @@ func (m *pollExecutor) Stop() {
func (m *pollExecutor) Status() bool {
m.lock.RLock()
defer m.lock.RUnlock()
return m.ctx.Err() == nil && m.isReady
if m.ctx.Err() != nil {
return false
}

return m.isReady
}
39 changes: 28 additions & 11 deletions pkg/apic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,24 +754,37 @@ func (c *ServiceClient) CreateResource(url string, bts []byte) (*apiv1.ResourceI
return ri, err
}

// updateORCreateResourceInstance
func (c *ServiceClient) updateSpecORCreateResourceInstance(data *apiv1.ResourceInstance) (*apiv1.ResourceInstance, error) {
// default to post
url := c.createAPIServerURL(data.GetKindLink())
method := coreapi.POST
func (c *ServiceClient) getCachedResource(data *apiv1.ResourceInstance) (*apiv1.ResourceInstance, error) {
switch data.Kind {
case management.AccessRequestDefinitionGVK().Kind:
return c.caches.GetAccessRequestDefinitionByName(data.Name)
case management.CredentialRequestDefinitionGVK().Kind:
return c.caches.GetCredentialRequestDefinitionByName(data.Name)
case management.APIServiceInstanceGVK().Kind:
return c.caches.GetAPIServiceInstanceByName(data.Name)
}
return nil, nil
}

// check if the KIND and ID combo have an item in the cache
var existingRI *apiv1.ResourceInstance
var err error
func (c *ServiceClient) addResourceToCache(data *apiv1.ResourceInstance) {
switch data.Kind {
case management.AccessRequestDefinitionGVK().Kind:
existingRI, err = c.caches.GetAccessRequestDefinitionByName(data.Name)
c.caches.AddAccessRequestDefinition(data)
case management.CredentialRequestDefinitionGVK().Kind:
existingRI, err = c.caches.GetCredentialRequestDefinitionByName(data.Name)
c.caches.AddCredentialRequestDefinition(data)
case management.APIServiceInstanceGVK().Kind:
existingRI, err = c.caches.GetAPIServiceInstanceByName(data.Name)
c.caches.AddAPIServiceInstance(data)
}
}

// updateORCreateResourceInstance
func (c *ServiceClient) updateSpecORCreateResourceInstance(data *apiv1.ResourceInstance) (*apiv1.ResourceInstance, error) {
// default to post
url := c.createAPIServerURL(data.GetKindLink())
method := coreapi.POST

// check if the KIND and ID combo have an item in the cache
existingRI, err := c.getCachedResource(data)
updateRI := true
updateAgentDetails := true

Expand Down Expand Up @@ -839,6 +852,10 @@ func (c *ServiceClient) updateSpecORCreateResourceInstance(data *apiv1.ResourceI
}

}

if existingRI == nil {
c.addResourceToCache(newRI)
}
return newRI, err
}

Expand Down

0 comments on commit 09c97ff

Please sign in to comment.