custom/plugins/CrswCleverReachOfficial/src/Core/Infrastructure/TaskExecution/AsyncBatchStarter.php line 10

Open in your IDE?
  1. <?php
  2. namespace Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution;
  3. use Crsw\CleverReachOfficial\Core\Infrastructure\Serializer\Serializer;
  4. use Crsw\CleverReachOfficial\Core\Infrastructure\ServiceRegister;
  5. use Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\Interfaces\AsyncProcessService;
  6. use Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\Interfaces\Runnable;
  7. class AsyncBatchStarter implements Runnable
  8. {
  9.     /**
  10.      * Batch size.
  11.      *
  12.      * @var int
  13.      */
  14.     private $batchSize;
  15.     /**
  16.      * List of sub-batches.
  17.      *
  18.      * @var AsyncBatchStarter[]
  19.      */
  20.     private $subBatches = array();
  21.     /**
  22.      * List of runners.
  23.      *
  24.      * @var Runnable[]
  25.      */
  26.     private $runners = array();
  27.     /**
  28.      * Current add index.
  29.      *
  30.      * @var int
  31.      */
  32.     private $addIndex 0;
  33.     /**
  34.      * Instance of async process starter.
  35.      *
  36.      * @var AsyncProcessService
  37.      */
  38.     private $asyncProcessStarter;
  39.     /**
  40.      * @inheritDoc
  41.      */
  42.     public function serialize()
  43.     {
  44.         return Serializer::serialize(array($this->batchSize$this->subBatches$this->runners$this->addIndex));
  45.     }
  46.     /**
  47.      * @inheritDoc
  48.      */
  49.     public function unserialize($serialized)
  50.     {
  51.         list(
  52.             $this->batchSize$this->subBatches$this->runners$this->addIndex
  53.             ) =
  54.             Serializer::unserialize($serialized);
  55.     }
  56.     /**
  57.      * @inheritDoc
  58.      */
  59.     public static function fromArray(array $data)
  60.     {
  61.         $runners = array();
  62.         $subBatches = array();
  63.         foreach ($data['runners'] as $runner) {
  64.             $runners[] = Serializer::unserialize($runner);
  65.         }
  66.         foreach ($data['subBatches'] as $subBatch) {
  67.             $subBatches[] = Serializer::unserialize($subBatch);
  68.         }
  69.         $instance = new self($data['batchSize'], $runners);
  70.         $instance->subBatches $subBatches;
  71.         $instance->addIndex $data['addIndex'];
  72.         return $instance;
  73.     }
  74.     /**
  75.      * @inheritDoc
  76.      */
  77.     public function toArray()
  78.     {
  79.         $runners = array();
  80.         $subBatches = array();
  81.         foreach ($this->runners as $runner) {
  82.             $runners[] = Serializer::serialize($runner);
  83.         }
  84.         foreach ($this->subBatches as $subBatch) {
  85.             $subBatches[] = Serializer::serialize($subBatch);
  86.         }
  87.         return array(
  88.             'batchSize' => $this->batchSize,
  89.             'subBatches' => $subBatches,
  90.             'runners' => $runners,
  91.             'addIndex' => $this->addIndex,
  92.         );
  93.     }
  94.     /**
  95.      * AsyncBatchStarter constructor.
  96.      *
  97.      * @param int $batchSize
  98.      * @param Runnable[] $runners
  99.      */
  100.     public function __construct($batchSize, array $runners = array())
  101.     {
  102.         $this->batchSize $batchSize;
  103.         foreach ($runners as $runner) {
  104.             $this->addRunner($runner);
  105.         }
  106.     }
  107.     /**
  108.      * Add runnable to the batch
  109.      *
  110.      * @param Runnable $runner
  111.      */
  112.     public function addRunner(Runnable $runner)
  113.     {
  114.         if ($this->isCapacityFull()) {
  115.             $this->subBatches[$this->addIndex]->addRunner($runner);
  116.             $this->addIndex = ($this->addIndex 1) % $this->batchSize;
  117.             return;
  118.         }
  119.         if ($this->isRunnersCapacityFull()) {
  120.             $this->subBatches[] = new self($this->batchSize$this->runners);
  121.             $this->runners = array();
  122.         }
  123.         $this->runners[] = $runner;
  124.     }
  125.     /**
  126.      * @inheritDoc
  127.      *
  128.      * @throws \Crsw\CleverReachOfficial\Core\Infrastructure\TaskExecution\Exceptions\ProcessStarterSaveException
  129.      */
  130.     public function run()
  131.     {
  132.         foreach ($this->subBatches as $subBatch) {
  133.             $this->getAsyncProcessStarter()->start($subBatch);
  134.         }
  135.         foreach ($this->runners as $runner) {
  136.             $this->getAsyncProcessStarter()->start($runner);
  137.         }
  138.     }
  139.     /**
  140.      * Returns max number of nested sub-batch levels. No sub-batches will return 0, one sub-batch 1, sub-batch with
  141.      * sub-batch 2....
  142.      *
  143.      * @return int Max number of nested sub-batch levels
  144.      */
  145.     public function getMaxNestingLevels()
  146.     {
  147.         if (empty($this->subBatches)) {
  148.             return 0;
  149.         }
  150.         $maxLevel 0;
  151.         foreach ($this->subBatches as $subBatch) {
  152.             $subBatchMaxLevel $subBatch->getMaxNestingLevels();
  153.             if ($maxLevel $subBatchMaxLevel) {
  154.                 $maxLevel $subBatchMaxLevel;
  155.             }
  156.         }
  157.         return $maxLevel 1;
  158.     }
  159.     /**
  160.      * Calculates time required for whole batch with its sub-batches to run. Wait time calculation si based on HTTP
  161.      * request duration provided as method argument
  162.      *
  163.      * @param float $requestDuration Expected HTTP request duration in microseconds.
  164.      *
  165.      * @return float Wait period in micro seconds that is required for whole batch (with sub-batches) to run
  166.      */
  167.     public function getWaitTime($requestDuration)
  168.     {
  169.         // Without sub-batches all requests are started as soon as run method is done
  170.         if (empty($this->subBatches)) {
  171.             return 0;
  172.         }
  173.         $subBatchWaitTime $this->batchSize $this->getMaxNestingLevels() * $requestDuration;
  174.         $runnersStartupTime count($this->runners) * $requestDuration;
  175.         return $subBatchWaitTime $runnersStartupTime;
  176.     }
  177.     /**
  178.      * @inheritDoc
  179.      */
  180.     public function __toString()
  181.     {
  182.         $out implode(', '$this->subBatches);
  183.         $countOfRunners count($this->runners);
  184.         for ($i 0$i $countOfRunners$i++) {
  185.             $out .= empty($out) ? 'R' ', R';
  186.         }
  187.         return "B({$out})";
  188.     }
  189.     /**
  190.      * @return bool
  191.      *      True if current batch cant take any more runners nor create any more sub-batches itself; False otherwise
  192.      */
  193.     protected function isCapacityFull()
  194.     {
  195.         return $this->isRunnersCapacityFull() && $this->isSubBatchCapacityFull();
  196.     }
  197.     /**
  198.      * @return bool
  199.      *      True if current batch cant create any more sub-batches itself; False otherwise
  200.      */
  201.     protected function isSubBatchCapacityFull()
  202.     {
  203.         return count($this->subBatches) >= $this->batchSize;
  204.     }
  205.     /**
  206.      * @return bool
  207.      *      True if current batch cant take any more runners itself; False otherwise
  208.      */
  209.     protected function isRunnersCapacityFull()
  210.     {
  211.         return count($this->runners) >= $this->batchSize;
  212.     }
  213.     /**
  214.      * Gets instance of async process starter.
  215.      *
  216.      * @return AsyncProcessService
  217.      *   Instance of async process starter.
  218.      */
  219.     protected function getAsyncProcessStarter()
  220.     {
  221.         if ($this->asyncProcessStarter === null) {
  222.             $this->asyncProcessStarter ServiceRegister::getService(AsyncProcessService::CLASS_NAME);
  223.         }
  224.         return $this->asyncProcessStarter;
  225.     }
  226. }