source: vendor/guzzlehttp/guzzle/src/Pool.php@ f9c482b

Last change on this file since f9c482b was f9c482b, checked in by Vlado 222039 <vlado.popovski@…>, 7 days ago

Upload new project files

  • Property mode set to 100644
File size: 4.6 KB
Line 
1<?php
2
3namespace GuzzleHttp;
4
5use GuzzleHttp\Promise as P;
6use GuzzleHttp\Promise\EachPromise;
7use GuzzleHttp\Promise\PromiseInterface;
8use GuzzleHttp\Promise\PromisorInterface;
9use Psr\Http\Message\RequestInterface;
10
11/**
12 * Sends an iterator of requests concurrently using a capped pool size.
13 *
14 * The pool will read from an iterator until it is cancelled or until the
15 * iterator is consumed. When a request is yielded, the request is sent after
16 * applying the "request_options" request options (if provided in the ctor).
17 *
18 * When a function is yielded by the iterator, the function is provided the
19 * "request_options" array that should be merged on top of any existing
20 * options, and the function MUST then return a wait-able promise.
21 *
22 * @final
23 */
24class Pool implements PromisorInterface
25{
26 /**
27 * @var EachPromise
28 */
29 private $each;
30
31 /**
32 * @param ClientInterface $client Client used to send the requests.
33 * @param array|\Iterator $requests Requests or functions that return
34 * requests to send concurrently.
35 * @param array $config Associative array of options
36 * - concurrency: (int) Maximum number of requests to send concurrently
37 * - options: Array of request options to apply to each request.
38 * - fulfilled: (callable) Function to invoke when a request completes.
39 * - rejected: (callable) Function to invoke when a request is rejected.
40 */
41 public function __construct(ClientInterface $client, $requests, array $config = [])
42 {
43 if (!isset($config['concurrency'])) {
44 $config['concurrency'] = 25;
45 }
46
47 if (isset($config['options'])) {
48 $opts = $config['options'];
49 unset($config['options']);
50 } else {
51 $opts = [];
52 }
53
54 $iterable = P\Create::iterFor($requests);
55 $requests = static function () use ($iterable, $client, $opts) {
56 foreach ($iterable as $key => $rfn) {
57 if ($rfn instanceof RequestInterface) {
58 yield $key => $client->sendAsync($rfn, $opts);
59 } elseif (\is_callable($rfn)) {
60 yield $key => $rfn($opts);
61 } else {
62 throw new \InvalidArgumentException('Each value yielded by the iterator must be a Psr7\Http\Message\RequestInterface or a callable that returns a promise that fulfills with a Psr7\Message\Http\ResponseInterface object.');
63 }
64 }
65 };
66
67 $this->each = new EachPromise($requests(), $config);
68 }
69
70 /**
71 * Get promise
72 */
73 public function promise(): PromiseInterface
74 {
75 return $this->each->promise();
76 }
77
78 /**
79 * Sends multiple requests concurrently and returns an array of responses
80 * and exceptions that uses the same ordering as the provided requests.
81 *
82 * IMPORTANT: This method keeps every request and response in memory, and
83 * as such, is NOT recommended when sending a large number or an
84 * indeterminate number of requests concurrently.
85 *
86 * @param ClientInterface $client Client used to send the requests
87 * @param array|\Iterator $requests Requests to send concurrently.
88 * @param array $options Passes through the options available in
89 * {@see \GuzzleHttp\Pool::__construct}
90 *
91 * @return array Returns an array containing the response or an exception
92 * in the same order that the requests were sent.
93 *
94 * @throws \InvalidArgumentException if the event format is incorrect.
95 */
96 public static function batch(ClientInterface $client, $requests, array $options = []): array
97 {
98 $res = [];
99 self::cmpCallback($options, 'fulfilled', $res);
100 self::cmpCallback($options, 'rejected', $res);
101 $pool = new static($client, $requests, $options);
102 $pool->promise()->wait();
103 \ksort($res);
104
105 return $res;
106 }
107
108 /**
109 * Execute callback(s)
110 */
111 private static function cmpCallback(array &$options, string $name, array &$results): void
112 {
113 if (!isset($options[$name])) {
114 $options[$name] = static function ($v, $k) use (&$results) {
115 $results[$k] = $v;
116 };
117 } else {
118 $currentFn = $options[$name];
119 $options[$name] = static function ($v, $k) use (&$results, $currentFn) {
120 $currentFn($v, $k);
121 $results[$k] = $v;
122 };
123 }
124 }
125}
Note: See TracBrowser for help on using the repository browser.