custom/plugins/CrswCleverReachOfficial/src/Core/Infrastructure/TaskExecution/TaskRunner.php line 451

Open in your IDE?
  1. <?php
  2. namespace Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution;
  3. use Exception;
  4. use Crsw\CleverReachOfficial\Core\Infrastructure\Configuration\Configuration;
  5. use Crsw\CleverReachOfficial\Core\Infrastructure\Configuration\ConfigurationManager;
  6. use Crsw\CleverReachOfficial\Core\Infrastructure\Logger\Logger;
  7. use Crsw\CleverReachOfficial\Core\Infrastructure\ServiceRegister;
  8. use Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\Interfaces\TaskRunnerStatusStorage;
  9. use Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\Interfaces\TaskRunnerWakeup;
  10. use Crsw\CleverReachOfficial\Core\Infrastructure\Utility\TimeProvider;
  /**
   * Class TaskRunner.
   *
   * Drives a single task runner lifecycle: it refreshes its own alive timestamp, fails or re-queues
   * expired running queue items, starts the oldest queued items through an async batch starter and,
   * after a delay, clears the runner status and sends a wakeup signal.
   *
   * All collaborating services are resolved lazily through the ServiceRegister.
   *
   * @package Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution
   */
  class TaskRunner
  {
      /**
       * Fully qualified name of this class.
       */
      const CLASS_NAME = __CLASS__;
      /**
       * Automatic task runner wakeup delay in seconds.
       */
      const WAKEUP_DELAY = 5;
      /**
       * Defines minimal time in seconds between two consecutive alive since updates.
       */
      const TASK_RUNNER_KEEP_ALIVE_PERIOD = 2;
      /**
       * Runner guid.
       *
       * @var string
       */
      protected $guid;
      /**
       * Queue service, resolved lazily by getQueue().
       *
       * @var QueueService
       */
      private $queueService;
      /**
       * Task runner status storage, resolved lazily by getRunnerStorage().
       *
       * @var TaskRunnerStatusStorage
       */
      private $runnerStorage;
      /**
       * Configuration service, resolved lazily by getConfigurationService().
       *
       * @var Configuration
       */
      private $configurationService;
      /**
       * Time provider, resolved lazily by getTimeProvider().
       *
       * @var TimeProvider
       */
      private $timeProvider;
      /**
       * Task runner wakeup service, resolved lazily by getTaskWakeup().
       *
       * @var TaskRunnerWakeup
       */
      private $taskWakeup;
      /**
       * Configuration manager, resolved lazily by getConfigurationManager().
       *
       * @var ConfigurationManager Configuration manager instance.
       */
      private $configurationManager;
      /**
       * Timestamp at which the alive since value of this runner was last written by keepAlive().
       *
       * @var int
       */
      private $aliveSinceUpdatedAt = 0;
      /**
       * Sleep time in seconds with microsecond precision, as reported by the batch starter.
       *
       * @var float
       */
      private $batchSleepTime = 0.0;

      /**
       * Sets task runner guid.
       *
       * @param string $guid Runner guid to set.
       */
      public function setGuid($guid)
      {
          $this->guid = $guid;
      }

      /**
       * Starts task runner lifecycle.
       *
       * Any Exception raised during the lifecycle is caught and written to the debug log;
       * it is not rethrown.
       */
      public function run()
      {
          try {
              $this->keepAlive();

              if ($this->getConfigurationService()->isTaskRunnerHalted()) {
                  $this->logDebug(array('Message' => 'Task runner is currently halted.'));

                  return;
              }

              $this->logDebug(array('Message' => 'Task runner: lifecycle started.'));

              if ($this->isCurrentRunnerAlive()) {
                  $this->failOrRequeueExpiredTasks();
                  $this->startOldestQueuedItems();
              }

              $this->keepAlive();

              $this->wakeup();

              $this->logDebug(array('Message' => 'Task runner: lifecycle ended.'));
          } catch (Exception $ex) {
              $this->logDebug(
                  array(
                      'Message' => 'Fail to run task runner. Unexpected error occurred.',
                      'ExceptionMessage' => $ex->getMessage(),
                      'ExceptionTrace' => $ex->getTraceAsString(),
                  )
              );
          }
      }

      /**
       * Fails or re-queues expired tasks.
       *
       * An expired item that made progress since its last execution is re-queued so it can continue;
       * an expired item without new progress is failed.
       *
       * @throws \Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\Exceptions\QueueItemDeserializationException
       * @throws \Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\Exceptions\QueueStorageUnavailableException
       * @throws \Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\Exceptions\TaskRunnerStatusStorageUnavailableException
       */
      private function failOrRequeueExpiredTasks()
      {
          $this->logDebug(array('Message' => 'Task runner: expired tasks cleanup started.'));

          $runningItems = $this->getQueue()->findExpiredRunningItems();
          if (!$this->isCurrentRunnerAlive()) {
              return;
          }

          $this->keepAlive();

          foreach ($runningItems as $runningItem) {
              // The runner status is re-checked for every item before the item is touched.
              if ($this->isItemExpired($runningItem) && $this->isCurrentRunnerAlive()) {
                  $this->logMessageFor($runningItem, 'Task runner: Expired task detected.');
                  $this->getConfigurationManager()->setContext($runningItem->getContext());
                  if ($runningItem->getProgressBasePoints() > $runningItem->getLastExecutionProgressBasePoints()) {
                      $this->logMessageFor($runningItem, 'Task runner: Task requeue for execution continuation.');
                      $this->getQueue()->requeue($runningItem);
                  } else {
                      $runningItem->reconfigureTask();
                      $this->getQueue()->fail(
                          $runningItem,
                          sprintf(
                              'Task %s failed due to extended inactivity period.',
                              $this->getItemDescription($runningItem)
                          )
                      );
                  }
              }

              $this->keepAlive();
          }
      }

      /**
       * Starts oldest queue item from all queues respecting following list of criteria:
       *      - Queue must be without already running queue items
       *      - For one queue only one (oldest queued) item should be started
       *      - Number of running tasks must NOT be greater than maximal allowed by integration configuration.
       *
       * After the batch has run, the wait time reported by the batch starter is stored so that
       * getWakeupDelay() can add it to the wakeup delay.
       *
       * @throws \Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\Exceptions\ProcessStarterSaveException
       * @throws \Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\Exceptions\QueueItemDeserializationException
       * @throws \Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\Exceptions\TaskRunnerStatusStorageUnavailableException
       * @throws \Crsw\CleverReachOfficial\Core\Infrastructure\ORM\Exceptions\QueryFilterInvalidParamException
       */
      private function startOldestQueuedItems()
      {
          $this->keepAlive();

          $this->logDebug(array('Message' => 'Task runner: available task detection started.'));

          // Calculate how many queue items can be started
          $maxRunningTasks = $this->getConfigurationService()->getMaxStartedTasksLimit();
          $alreadyRunningItemsCount = $this->getQueue()->countItems(QueueItem::IN_PROGRESS);
          $numberOfAvailableSlots = $maxRunningTasks - $alreadyRunningItemsCount;
          if ($numberOfAvailableSlots <= 0) {
              $this->logDebug(array('Message' => 'Task runner: max number of active tasks reached.'));

              return;
          }

          $this->keepAlive();

          $items = $this->getQueue()->findOldestQueuedItems($numberOfAvailableSlots);

          $this->keepAlive();

          if (!$this->isCurrentRunnerAlive()) {
              return;
          }

          $asyncStarterBatchSize = $this->getConfigurationService()->getAsyncStarterBatchSize();
          $batchStarter = new AsyncBatchStarter($asyncStarterBatchSize);
          foreach ($items as $item) {
              $this->logMessageFor($item, 'Task runner: Adding task to a batch starter for async execution.');
              $batchStarter->addRunner(new QueueItemStarter($item->getId()));
          }

          $this->keepAlive();

          if (!$this->isCurrentRunnerAlive()) {
              return;
          }

          $this->logDebug(array('Message' => 'Task runner: Starting batch starter execution.'));
          $startTime = $this->getTimeProvider()->getMicroTimestamp();
          $batchStarter->run();
          $endTime = $this->getTimeProvider()->getMicroTimestamp();

          $this->keepAlive();

          // NOTE(review): a batch size of 0 would make this division fail; presumably the
          // configuration always yields a positive batch size - confirm.
          $averageRequestTime = ($endTime - $startTime) / $asyncStarterBatchSize;
          $this->batchSleepTime = $batchStarter->getWaitTime($averageRequestTime);

          $this->logDebug(
              array(
                  'Message' => 'Task runner: Batch starter execution finished.',
                  'ExecutionTime' => ($endTime - $startTime) . 's',
                  'AverageRequestTime' => $averageRequestTime . 's',
                  'StartedItems' => count($items),
              )
          );
      }

      /**
       * Executes wakeup on runner.
       *
       * Sleeps in one second steps for the wakeup delay, refreshing the alive timestamp after each
       * step, then stores a null runner status and sends the wakeup signal.
       *
       * @throws \Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\Exceptions\TaskRunnerStatusChangeException
       * @throws \Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\Exceptions\TaskRunnerStatusStorageUnavailableException
       */
      private function wakeup()
      {
          $this->logDebug(array('Message' => 'Task runner: starting self deactivation.'));

          // The delay is re-read on every iteration, exactly as in the loop condition.
          for ($i = 0; $i < $this->getWakeupDelay(); $i++) {
              $this->getTimeProvider()->sleep(1);
              $this->keepAlive();
          }

          $this->getRunnerStorage()->setStatus(TaskRunnerStatus::createNullStatus());

          $this->logDebug(array('Message' => 'Task runner: sending task runner wakeup signal.'));
          $this->getTaskWakeup()->wakeup();
      }

      /**
       * Updates alive since time stamp of the task runner.
       *
       * The update is skipped when less than TASK_RUNNER_KEEP_ALIVE_PERIOD seconds have passed
       * since the previous update made by this instance.
       *
       * @throws \Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\Exceptions\TaskRunnerStatusStorageUnavailableException
       */
      private function keepAlive()
      {
          $currentTime = $this->getTimeProvider()->getCurrentLocalTime()->getTimestamp();
          if (($currentTime - $this->aliveSinceUpdatedAt) < self::TASK_RUNNER_KEEP_ALIVE_PERIOD) {
              return;
          }

          $this->getConfigurationService()->setTaskRunnerStatus($this->guid, $currentTime);
          $this->aliveSinceUpdatedAt = $currentTime;
      }

      /**
       * Checks whether current runner is alive.
       *
       * The runner is alive when the stored status is not expired and carries this runner's guid.
       * A warning is logged for each of the two conditions that does not hold.
       *
       * @return bool TRUE if runner is alive; otherwise, FALSE.
       *
       * @throws \Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\Exceptions\TaskRunnerStatusStorageUnavailableException
       */
      private function isCurrentRunnerAlive()
      {
          $runnerStatus = $this->getRunnerStorage()->getStatus();
          $runnerExpired = $runnerStatus->isExpired();
          $runnerGuidIsCorrect = $this->guid === $runnerStatus->getGuid();

          if ($runnerExpired) {
              $this->logWarning(array('Message' => 'Task runner: Task runner started but it is expired.'));
          }

          if (!$runnerGuidIsCorrect) {
              $this->logWarning(array('Message' => 'Task runner: Task runner started but it is not active anymore.'));
          }

          return !$runnerExpired && $runnerGuidIsCorrect;
      }

      /**
       * Checks whether queue item is expired.
       *
       * An item is expired when its last update timestamp plus the maximal inactivity period of
       * its task lies before the current timestamp.
       *
       * @param QueueItem $item Queue item for checking.
       *
       * @return bool TRUE if queue item expired; otherwise, FALSE.
       *
       * @throws \Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\Exceptions\QueueItemDeserializationException
       */
      private function isItemExpired(QueueItem $item)
      {
          $currentTimestamp = $this->getTimeProvider()->getCurrentLocalTime()->getTimestamp();
          $maxTaskInactivityPeriod = $item->getTask()->getMaxInactivityPeriod();

          return ($item->getLastUpdateTimestamp() + $maxTaskInactivityPeriod) < $currentTimestamp;
      }

      /**
       * Gets queue item description in the form "<id>(<task type>)".
       *
       * @param QueueItem $item Queue item.
       *
       * @return string Description of queue item.
       *
       * @throws \Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\Exceptions\QueueItemDeserializationException
       */
      private function getItemDescription(QueueItem $item)
      {
          return "{$item->getId()}({$item->getTaskType()})";
      }

      /**
       * Gets queue service instance, resolving it from the service register on first use.
       *
       * @return QueueService Queue service instance.
       */
      private function getQueue()
      {
          if ($this->queueService === null) {
              $this->queueService = ServiceRegister::getService(QueueService::CLASS_NAME);
          }

          return $this->queueService;
      }

      /**
       * Gets task runner status storage instance, resolving it from the service register on first use.
       *
       * @return TaskRunnerStatusStorage Service instance.
       */
      private function getRunnerStorage()
      {
          if ($this->runnerStorage === null) {
              $this->runnerStorage = ServiceRegister::getService(TaskRunnerStatusStorage::CLASS_NAME);
          }

          return $this->runnerStorage;
      }

      /**
       * Retrieves configuration manager, resolving it from the service register on first use.
       *
       * @return \Crsw\CleverReachOfficial\Core\Infrastructure\Configuration\ConfigurationManager Configuration manager instance.
       */
      public function getConfigurationManager()
      {
          if ($this->configurationManager === null) {
              $this->configurationManager = ServiceRegister::getService(ConfigurationManager::CLASS_NAME);
          }

          return $this->configurationManager;
      }

      /**
       * Gets configuration service instance, resolving it from the service register on first use.
       *
       * @return Configuration Service instance.
       */
      private function getConfigurationService()
      {
          if ($this->configurationService === null) {
              $this->configurationService = ServiceRegister::getService(Configuration::CLASS_NAME);
          }

          return $this->configurationService;
      }

      /**
       * Gets time provider instance, resolving it from the service register on first use.
       *
       * @return TimeProvider Service instance.
       */
      private function getTimeProvider()
      {
          if ($this->timeProvider === null) {
              $this->timeProvider = ServiceRegister::getService(TimeProvider::CLASS_NAME);
          }

          return $this->timeProvider;
      }

      /**
       * Gets task runner wakeup service instance, resolving it from the service register on first use.
       *
       * @return TaskRunnerWakeup Service instance.
       */
      private function getTaskWakeup()
      {
          if ($this->taskWakeup === null) {
              $this->taskWakeup = ServiceRegister::getService(TaskRunnerWakeup::CLASS_NAME);
          }

          return $this->taskWakeup;
      }

      /**
       * Returns wakeup delay in seconds.
       *
       * Uses the configured wakeup delay when one is set, otherwise WAKEUP_DELAY, and adds the
       * batch sleep time rounded up to whole seconds.
       *
       * @return int|float Wakeup delay in seconds (ceil() yields a float, so the sum may be a float).
       */
      private function getWakeupDelay()
      {
          $configurationValue = $this->getConfigurationService()->getTaskRunnerWakeupDelay();

          $minimalSleepTime = $configurationValue !== null ? $configurationValue : self::WAKEUP_DELAY;

          return $minimalSleepTime + ceil($this->batchSleepTime);
      }

      /**
       * Logs message and queue item details.
       *
       * @param QueueItem $queueItem Queue item.
       * @param string $message Message to be logged.
       *
       * @throws \Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\Exceptions\QueueItemDeserializationException
       */
      private function logMessageFor(QueueItem $queueItem, $message)
      {
          $this->logDebug(
              array(
                  'RunnerGuid' => $this->guid,
                  'Message' => $message,
                  'TaskId' => $queueItem->getId(),
                  'TaskType' => $queueItem->getTaskType(),
                  'TaskRetries' => $queueItem->getRetries(),
                  'TaskProgressBasePoints' => $queueItem->getProgressBasePoints(),
                  'TaskLastExecutionProgressBasePoints' => $queueItem->getLastExecutionProgressBasePoints(),
              )
          );
      }

      /**
       * Helper method to encapsulate debug level logging.
       *
       * Adds the runner guid to the content before it is passed to the logger.
       *
       * @param array $debugContent Array of debug content for logging; must contain a 'Message' key.
       */
      private function logDebug(array $debugContent)
      {
          $debugContent['RunnerGuid'] = $this->guid;
          Logger::logDebug($debugContent['Message'], 'Core', $debugContent);
      }

      /**
       * Helper method to encapsulate warning level logging.
       *
       * Adds the runner guid to the content before it is passed to the logger.
       *
       * @param array $debugContent Array of debug content for logging; must contain a 'Message' key.
       */
      private function logWarning(array $debugContent)
      {
          $debugContent['RunnerGuid'] = $this->guid;
          Logger::logWarning($debugContent['Message'], 'Core', $debugContent);
      }
  }
  418. }