Skip to content

Commit

Permalink
Move cleanup code to remove a matcher into a function removeMatcher()
Browse files Browse the repository at this point in the history
Change getState() to use $eventProcessors array instead of those in $waitingForNextEvent
  • Loading branch information
lucasnetau committed Jun 18, 2021
1 parent d5fda7b commit 1174db6
Showing 1 changed file with 45 additions and 26 deletions.
71 changes: 45 additions & 26 deletions src/CorrelationEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public function handle(IEvent $event)
{
/** Record stat of matcher completing */
$this->incrStat('completed_matcher', get_class($matcher));
unset($this->eventProcessors[spl_object_hash($matcher)]);
$this->removeMatcher($matcher);
unset($matcher);
}
}
Expand All @@ -253,7 +253,7 @@ public function handle(IEvent $event)
$matcher->fire();
/** Record stat of matcher timeout */
$this->incrStat('completed_matcher_timeout', get_class($matcher));
unset($this->eventProcessors[spl_object_hash($matcher)]);
$this->removeMatcher($matcher);
unset($matcher);
}

Expand All @@ -270,7 +270,7 @@ public function handle(IEvent $event)
* @return IEventMatcher
* @throws RuntimeException;
*/
public function constructMatcher(string $className)
public function constructMatcher(string $className): IEventMatcher
{
if (is_a($className, IEventMatcher::class, true))
{
Expand Down Expand Up @@ -316,6 +316,15 @@ public function handleEmit($data)
}
}

/**
* Remove matcher from our state tables
* @param IEventMatcher $matcher
*/
protected function removeMatcher(IEventMatcher $matcher) {
$this->clearWatchForEvents($matcher);
unset($this->eventProcessors[spl_object_hash($matcher)]);
}

/**
* Keep note that the state machine $matcher is waiting for events $events
* @param IEventMatcher $matcher
Expand All @@ -336,10 +345,11 @@ public function addWatchForEvents(IEventMatcher $matcher, array $events)
*/
public function removeWatchForEvents(IEventMatcher $matcher, array $events)
{
$matcherHash = spl_object_hash($matcher);
foreach($events as $eventName)
{
unset($this->waitingForNextEvent[$eventName][spl_object_hash($matcher)]);
if (0 == count($this->waitingForNextEvent[$eventName]))
unset($this->waitingForNextEvent[$eventName][$matcherHash]);
if (0 === count($this->waitingForNextEvent[$eventName]))
{
unset($this->waitingForNextEvent[$eventName]);
}
Expand All @@ -351,6 +361,21 @@ public function removeWatchForEvents(IEventMatcher $matcher, array $events)
}
}

/**
* Remove record that $matcher is waiting for any events
* @param IEventMatcher $matcher
*/
public function clearWatchForEvents(IEventMatcher $matcher) {
$matcherHash = spl_object_hash($matcher);
$events = [];
foreach(array_keys($this->waitingForNextEvent) as $eventName) {
if (array_key_exists($matcherHash, $this->waitingForNextEvent[$eventName])) {
$events[] = $eventName;
}
}
$this->removeWatchForEvents($matcher, $events);
}

/**
* Add timeout will add or remove a timeout for the matcher passed in.
* @param IEventMatcher $matcher
Expand Down Expand Up @@ -446,12 +471,11 @@ public function checkTimeouts(DateTimeInterface $time): int
$triggered++;
if ($matcher->isTimedOut())
{
/** Remove all references if the matcher is complete */
$this->removeWatchForEvents($matcher, $matcher->nextAcceptedEvents());
/** Remove all references if the matcher is timed out */
$this->removeTimeout($matcher);
/** Record stat of matcher timeout */
$this->incrStat('matcher_timeout', get_class($matcher));
unset($this->eventProcessors[spl_object_hash($matcher)]);
$this->removeMatcher($matcher);
unset($matcher);
}
else
Expand Down Expand Up @@ -484,29 +508,24 @@ public function getState() : array
$state['events'] = [];
$state['statistics'] = $this->statistics;
$state['load'] = $this->calcLoad();
foreach($this->waitingForNextEvent as $matchers)
/** @var IEventMatcher $matcher */
foreach($this->eventProcessors as $matcher)
{
foreach($matchers as $matcher)
if ($matcher->complete()) {
continue; //Don't save the matcher if it is complete
}

foreach($matcher->getEventChain() as $event)
{
/** @var IEventMatcher $matcher */
$matcher_hash = spl_object_hash($matcher);
if (isset($state['matchers'][$matcher_hash]))
$event_hash = spl_object_hash($event);
if (isset($state['events'][$event_hash]))
{
continue;
}

foreach($matcher->getEventChain() as $event)
{
$event_hash = spl_object_hash($event);
if (isset($state['events'][$event_hash]))
{
continue;
}
$state['events'][$event_hash] = serialize($event);
}

$state['matchers'][] = serialize($matcher);
$state['events'][$event_hash] = serialize($event);
}

$state['matchers'][] = serialize($matcher);
}
return $state;
}
Expand Down Expand Up @@ -662,7 +681,7 @@ public function calcLoad() : array

$this->flushOldEps();

/** @var array $shiftedArray Shift the counter so that the current time modulus is the last item in the array */
/** Shift the counter so that the current time modulus is the last item in the array */
$shiftedArray = array_merge(array_slice($this->epsCounter, $index+1), array_slice($this->epsCounter, 0, $index+1));

return [
Expand Down

0 comments on commit 1174db6

Please sign in to comment.