custom/plugins/CrswCleverReachOfficial/src/Core/BusinessLogic/Scheduler/ScheduleCheckTask.php line 27

Open in your IDE?
  1. <?php
  2. namespace Crsw\CleverReachOfficial\Core\BusinessLogic\Scheduler;
  3. use Crsw\CleverReachOfficial\Core\BusinessLogic\Scheduler\Exceptions\ScheduleSaveException;
  4. use Crsw\CleverReachOfficial\Core\BusinessLogic\Scheduler\Interfaces\Schedulable;
  5. use Crsw\CleverReachOfficial\Core\BusinessLogic\Scheduler\Interfaces\ScheduleRepositoryInterface;
  6. use Crsw\CleverReachOfficial\Core\BusinessLogic\Scheduler\Models\Schedule;
  7. use Crsw\CleverReachOfficial\Core\Infrastructure\Logger\Logger;
  8. use Crsw\CleverReachOfficial\Core\Infrastructure\ORM\Exceptions\QueryFilterInvalidParamException;
  9. use Crsw\CleverReachOfficial\Core\Infrastructure\ORM\Exceptions\RepositoryNotRegisteredException;
  10. use Crsw\CleverReachOfficial\Core\Infrastructure\ORM\QueryFilter\Operators;
  11. use Crsw\CleverReachOfficial\Core\Infrastructure\ORM\QueryFilter\QueryFilter;
  12. use Crsw\CleverReachOfficial\Core\Infrastructure\ORM\RepositoryRegistry;
  13. use Crsw\CleverReachOfficial\Core\Infrastructure\ServiceRegister;
  14. use Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\Exceptions\QueueStorageUnavailableException;
  15. use Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\QueueItem;
  16. use Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\QueueService;
  17. use Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\Task;
  18. use Crsw\CleverReachOfficial\Core\Infrastructure\Utility\TimeProvider;
  19. /**
  20.  * Class ScheduleCheckTask
  21.  *
  22.  * @package Crsw\CleverReachOfficial\Core\BusinessLogic\Scheduler
  23.  */
  24. class ScheduleCheckTask extends Task
  25. {
  26.     /**
  27.      * @var ScheduleRepositoryInterface
  28.      */
  29.     private $repository;
  30.     /**
  31.      * @inheritdoc
  32.      */
  33.     public function isArchivable()
  34.     {
  35.         return false;
  36.     }
  37.     /**
  38.      * Runs task logic.
  39.      *
  40.      * @throws RepositoryNotRegisteredException
  41.      * @throws QueryFilterInvalidParamException
  42.      * @throws ScheduleSaveException
  43.      */
  44.     public function execute()
  45.     {
  46.         /** @var QueueService $queueService */
  47.         $queueService ServiceRegister::getService(QueueService::CLASS_NAME);
  48.         foreach ($this->getSchedules() as $schedule) {
  49.             try {
  50.                 if ($schedule->isRecurring()) {
  51.                     $lastUpdateTimestamp $schedule->getLastUpdateTimestamp();
  52.                     $schedule->setNextSchedule();
  53.                     $schedule->setLastUpdateTimestamp($this->now()->getTimestamp());
  54.                     $this->getScheduleRepository()->saveWithCondition(
  55.                         $schedule,
  56.                         array('lastUpdateTimestamp' => $lastUpdateTimestamp)
  57.                     );
  58.                 } else {
  59.                     $this->getScheduleRepository()->delete($schedule);
  60.                 }
  61.                 $task $schedule->getTask();
  62.                 if (!($task instanceof Schedulable)) {
  63.                     Logger::logError("Cannot schedule task that is not schedulable: [{$task->getClassName()}].");
  64.                     continue;
  65.                 }
  66.                 if (
  67.                     !$task->canHaveMultipleQueuedInstances() &&
  68.                     $this->isAlreadyEnqueued($schedule->getTaskType(), $schedule->getContext())
  69.                 ) {
  70.                     Logger::logInfo("Scheduled task [{$task->getClassName()}] already enqueued.");
  71.                     continue;
  72.                 }
  73.                 $queueService->enqueue($schedule->getQueueName(), $task$schedule->getContext());
  74.             } catch (QueueStorageUnavailableException $ex) {
  75.                 Logger::logError(
  76.                     'Failed to enqueue task for schedule:' $schedule->getId(),
  77.                     'Core',
  78.                     array('trace' => $ex->getTraceAsString())
  79.                 );
  80.             }
  81.         }
  82.         $this->reportProgress(100);
  83.     }
  84.     /**
  85.      * Returns an array of Schedules that are due for execution
  86.      *
  87.      * @return Schedule[]
  88.      *
  89.      * @throws QueryFilterInvalidParamException
  90.      * @throws RepositoryNotRegisteredException
  91.      */
  92.     private function getSchedules()
  93.     {
  94.         $queryFilter = new QueryFilter();
  95.         $queryFilter->where('nextSchedule'Operators::LESS_OR_EQUAL_THAN$this->now());
  96.         $queryFilter->where('isEnabled'Operators::EQUALStrue);
  97.         $queryFilter->orderBy('nextSchedule'QueryFilter::ORDER_ASC);
  98.         $queryFilter->setLimit(1000);
  99.         return $this->getScheduleRepository()->select($queryFilter);
  100.     }
  101.     /**
  102.      * Returns current date and time
  103.      *
  104.      * @return \DateTime
  105.      */
  106.     protected function now()
  107.     {
  108.         /** @var TimeProvider $timeProvider */
  109.         $timeProvider ServiceRegister::getService(TimeProvider::CLASS_NAME);
  110.         return $timeProvider->getCurrentLocalTime();
  111.     }
  112.     /**
  113.      * Returns repository instance
  114.      *
  115.      * @return ScheduleRepositoryInterface
  116.      * @throws RepositoryNotRegisteredException
  117.      */
  118.     private function getScheduleRepository()
  119.     {
  120.         if ($this->repository === null) {
  121.             /** @var ScheduleRepositoryInterface $repository */
  122.             $this->repository RepositoryRegistry::getRepository(Schedule::getClassName());
  123.         }
  124.         return $this->repository;
  125.     }
  126.     private function isAlreadyEnqueued($taskType$context)
  127.     {
  128.         $result false;
  129.         $lastTask $this->getQueueService()->findLatestByType($taskType$context);
  130.         if ($lastTask && in_array($lastTask->getStatus(), array(QueueItem::QUEUEDQueueItem::IN_PROGRESS), true)) {
  131.             $result true;
  132.         }
  133.         return $result;
  134.     }
  135.     /**
  136.      * Retrieves queue service instance.
  137.      *
  138.      * @return QueueService | object Queue Service instance.
  139.      */
  140.     private function getQueueService()
  141.     {
  142.         return ServiceRegister::getService(QueueService::CLASS_NAME);
  143.     }
  144. }