Skip to content

Commit

Permalink
Removed ContainerAwareCommand in favor of Command with DI
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanvierkant committed Feb 21, 2019
1 parent ecc7fe1 commit 15811ba
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 35 deletions.
16 changes: 5 additions & 11 deletions Command/CleanUpCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
use Doctrine\ORM\EntityManager;
use JMS\JobQueueBundle\Entity\Job;
use JMS\JobQueueBundle\Entity\Repository\JobManager;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

class CleanUpCommand extends ContainerAwareCommand
class CleanUpCommand extends Command
{
protected static $defaultName = 'jms-job-queue:clean-up';

Expand Down Expand Up @@ -148,9 +148,7 @@ private function findExpiredJobs(EntityManager $em, InputInterface $input)
->setMaxResults(100)
->getResult();
};
foreach ($this->whileResults($succeededJobs) as $job) {
yield $job;
}
yield from $this->whileResults( $succeededJobs );

$finishedJobs = function(array $excludedIds) use ($em, $input) {
return $em->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.closedAt < :maxRetentionTime AND j.originalJob IS NULL AND j.id NOT IN (:excludedIds)")
Expand All @@ -159,9 +157,7 @@ private function findExpiredJobs(EntityManager $em, InputInterface $input)
->setMaxResults(100)
->getResult();
};
foreach ($this->whileResults($finishedJobs) as $job) {
yield $job;
}
yield from $this->whileResults( $finishedJobs );

$canceledJobs = function(array $excludedIds) use ($em, $input) {
return $em->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.state = :canceled AND j.createdAt < :maxRetentionTime AND j.originalJob IS NULL AND j.id NOT IN (:excludedIds)")
Expand All @@ -171,9 +167,7 @@ private function findExpiredJobs(EntityManager $em, InputInterface $input)
->setMaxResults(100)
->getResult();
};
foreach ($this->whileResults($canceledJobs) as $job) {
yield $job;
}
yield from $this->whileResults( $canceledJobs );
}

private function whileResults(callable $resultProducer)
Expand Down
4 changes: 2 additions & 2 deletions Command/MarkJobIncompleteCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
use Doctrine\Common\Persistence\ManagerRegistry;
use Doctrine\ORM\EntityManager;
use JMS\JobQueueBundle\Entity\Job;
use Symfony\Component\Console\Command\Command;
use JMS\JobQueueBundle\Entity\Repository\JobManager;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;

class MarkJobIncompleteCommand extends ContainerAwareCommand
class MarkJobIncompleteCommand extends Command
{
protected static $defaultName = 'jms-job-queue:mark-incomplete';

Expand Down
53 changes: 32 additions & 21 deletions Command/RunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@
use JMS\JobQueueBundle\Event\StateChangeEvent;
use JMS\JobQueueBundle\Exception\InvalidArgumentException;
use Symfony\Bridge\Doctrine\ManagerRegistry;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Process\Exception\ProcessFailedException;
use Symfony\Component\Process\Process;

class RunCommand extends \Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand
class RunCommand extends Command
{
protected static $defaultName = 'jms-job-queue:run';

Expand All @@ -48,7 +49,10 @@ class RunCommand extends \Symfony\Bundle\FrameworkBundle\Command\ContainerAwareC
/** @var ManagerRegistry */
private $registry;

/** @var EventDispatcher */
/** @var JobManager */
private $jobManager;

/** @var EventDispatcherInterface */
private $dispatcher;

/** @var array */
Expand All @@ -57,6 +61,23 @@ class RunCommand extends \Symfony\Bundle\FrameworkBundle\Command\ContainerAwareC
/** @var bool */
private $shouldShutdown = false;

/** @var array */
private $queueOptionsDefault;

/** @var array */
private $queueOptions;

public function __construct(ManagerRegistry $managerRegistry, JobManager $jobManager, EventDispatcherInterface $dispatcher, array $queueOptionsDefault, array $queueOptions)
{
parent::__construct();

$this->registry = $managerRegistry;
$this->jobManager = $jobManager;
$this->dispatcher = $dispatcher;
$this->queueOptionsDefault = $queueOptionsDefault;
$this->queueOptions = $queueOptions;
}

protected function configure()
{
$this
Expand All @@ -79,7 +100,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
}

if ($maxRuntime > 600) {
$maxRuntime += mt_rand(-120, 120);
$maxRuntime += random_int(-120, 120);
}

$maxJobs = (integer) $input->getOption('max-concurrent-jobs');
Expand Down Expand Up @@ -110,8 +131,6 @@ protected function execute(InputInterface $input, OutputInterface $output)
$this->env = $input->getOption('env');
$this->verbose = $input->getOption('verbose');
$this->output = $output;
$this->registry = $this->getContainer()->get('doctrine');
$this->dispatcher = $this->getContainer()->get('event_dispatcher');
$this->getEntityManager()->getConnection()->getConfiguration()->setSQLLogger(null);

if ($this->verbose) {
Expand All @@ -127,8 +146,8 @@ protected function execute(InputInterface $input, OutputInterface $output)
$idleTime,
$maxJobs,
$restrictedQueues,
$this->getContainer()->getParameter('jms_job_queue.queue_options_defaults'),
$this->getContainer()->getParameter('jms_job_queue.queue_options')
$this->queueOptionsDefault,
$this->queueOptions
);
}

Expand Down Expand Up @@ -161,7 +180,7 @@ private function runJobs($workerName, $startTime, $maxRuntime, $idleTime, $maxJo
$this->checkRunningJobs();
$this->startJobs($workerName, $idleTime, $maxJobs, $restrictedQueues, $queueOptionsDefaults, $queueOptions);

$waitTimeInMs = mt_rand(500, 1000);
$waitTimeInMs = random_int(500, 1000);
usleep($waitTimeInMs * 1E3);
}

Expand Down Expand Up @@ -194,7 +213,7 @@ private function startJobs($workerName, $idleTime, $maxJobs, array $restrictedQu
{
$excludedIds = array();
while (count($this->runningJobs) < $maxJobs) {
$pendingJob = $this->getJobManager()->findStartableJob(
$pendingJob = $this->jobManager->findStartableJob(
$workerName,
$excludedIds,
$this->getExcludedQueues($queueOptionsDefaults, $queueOptions, $maxJobs),
Expand Down Expand Up @@ -291,7 +310,7 @@ private function checkRunningJobs()
$data['process']->stop(5);

$this->output->writeln($data['job'].' terminated; maximum runtime exceeded.');
$this->getJobManager()->closeJob($data['job'], Job::STATE_TERMINATED);
$this->jobManager->closeJob($data['job'], Job::STATE_TERMINATED);
unset($this->runningJobs[$i]);

continue;
Expand Down Expand Up @@ -321,7 +340,7 @@ private function checkRunningJobs()
$data['job']->setRuntime(time() - $data['start_time']);

$newState = 0 === $data['process']->getExitCode() ? Job::STATE_FINISHED : Job::STATE_FAILED;
$this->getJobManager()->closeJob($data['job'], $newState);
$this->jobManager->closeJob($data['job'], $newState);
unset($this->runningJobs[$i]);
}

Expand All @@ -335,7 +354,7 @@ private function startJob(Job $job)
$newState = $event->getNewState();

if (Job::STATE_CANCELED === $newState) {
$this->getJobManager()->closeJob($job, Job::STATE_CANCELED);
$this->jobManager->closeJob($job, Job::STATE_CANCELED);

return;
}
Expand Down Expand Up @@ -428,12 +447,4 @@ private function getEntityManager(): EntityManager
{
return /** @var EntityManager */ $this->registry->getManagerForClass('JMSJobQueueBundle:Job');
}

/**
* @return JobManager
*/
private function getJobManager()
{
return $this->getContainer()->get('jms_job_queue.job_manager');
}
}
2 changes: 1 addition & 1 deletion Command/ScheduleCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
{
$maxRuntime = $input->getOption('max-runtime');
if ($maxRuntime > 300) {
$maxRuntime += mt_rand(0, (integer)($input->getOption('max-runtime') * 0.05));
$maxRuntime += random_int(0, (integer)($input->getOption('max-runtime') * 0.05));
}
if ($maxRuntime <= 0) {
throw new \RuntimeException('Max. runtime must be greater than zero.');
Expand Down
5 changes: 5 additions & 0 deletions Resources/config/console.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@

<service id="jms_job_queue.command.run" class="JMS\JobQueueBundle\Command\RunCommand">
<tag name="console.command" />
<argument type="service" id="doctrine" />
<argument type="service" id="jms_job_queue.job_manager" />
<argument type="service" id="event_dispatcher" />
<argument key="$queueOptionsDefault">%jms_job_queue.queue_options_defaults%</argument>
<argument key="$queueOptions">%jms_job_queue.queue_options%</argument>
</service>

<service id="jms_job_queue.command.schedule" class="JMS\JobQueueBundle\Command\ScheduleCommand">
Expand Down

0 comments on commit 15811ba

Please sign in to comment.