source: vendor/guzzlehttp/promises/src/EachPromise.php@ e3d4e0a

Last change on this file since e3d4e0a was e3d4e0a, checked in by Vlado 222039 <vlado.popovski@…>, 7 days ago

Upload project files

  • Property mode set to 100644
File size: 7.4 KB
RevLine 
<?php

declare(strict_types=1);

namespace GuzzleHttp\Promise;
6
/**
 * Represents a promise that iterates over many promises and invokes
 * side-effect functions in the process.
 *
 * @final
 */
class EachPromise implements PromisorInterface
{
    /**
     * Derived promises that have been started but have not yet stepped,
     * keyed by a unique, monotonically increasing counter. Set to null once
     * the aggregate promise settles.
     *
     * @var array|null
     */
    private $pending = [];

    /**
     * Next key to use in $pending.
     *
     * @var int
     */
    private $nextPendingIndex = 0;

    /** @var \Iterator|null */
    private $iterable;

    /** @var callable|int|null */
    private $concurrency;

    /** @var callable|null */
    private $onFulfilled;

    /** @var callable|null */
    private $onRejected;

    /** @var Promise|null */
    private $aggregate;

    /**
     * True while the iterator is being advanced; used to refuse re-entrant
     * advancement.
     *
     * @var bool|null
     */
    private $mutex;

    /**
     * Configuration hash can include the following key value pairs:
     *
     * - fulfilled: (callable) Invoked when a promise fulfills. The function
     *   is invoked with three arguments: the fulfillment value, the index
     *   position from the iterable list of the promise, and the aggregate
     *   promise that manages all of the promises. The aggregate promise may
     *   be resolved from within the callback to short-circuit the promise.
     * - rejected: (callable) Invoked when a promise is rejected. The
     *   function is invoked with three arguments: the rejection reason, the
     *   index position from the iterable list of the promise, and the
     *   aggregate promise that manages all of the promises. The aggregate
     *   promise may be resolved from within the callback to short-circuit
     *   the promise.
     * - concurrency: (integer|callable) Pass this configuration option to
     *   limit the allowed number of outstanding concurrently executing
     *   promises, creating a capped pool of promises. A callable is invoked
     *   with the current number of pending promises and returns the limit
     *   to apply; a returned limit of 0 disallows adding new promises. There
     *   is no limit by default, and a falsy value (such as the integer 0)
     *   also means "no limit".
     *
     * Options whose value is null are ignored.
     *
     * @param mixed $iterable Promises or values to iterate.
     * @param array $config   Configuration options
     */
    public function __construct($iterable, array $config = [])
    {
        $this->iterable = Create::iterFor($iterable);

        if (isset($config['concurrency'])) {
            $this->concurrency = $config['concurrency'];
        }

        if (isset($config['fulfilled'])) {
            $this->onFulfilled = $config['fulfilled'];
        }

        if (isset($config['rejected'])) {
            $this->onRejected = $config['rejected'];
        }
    }

    /**
     * Returns the aggregate promise, creating it and starting iteration on
     * the first call. Later calls return the same instance.
     *
     * Any \Throwable raised while rewinding the iterator or queueing the
     * first promises rejects the aggregate instead of propagating.
     *
     * @psalm-suppress InvalidNullableReturnType
     */
    public function promise(): PromiseInterface
    {
        if ($this->aggregate) {
            return $this->aggregate;
        }

        try {
            $this->createPromise();
            /** @psalm-assert Promise $this->aggregate */
            $this->iterable->rewind();
            $this->refillPending();
        } catch (\Throwable $e) {
            $this->aggregate->reject($e);
        }

        /**
         * @psalm-suppress NullableReturnStatement
         */
        return $this->aggregate;
    }

    /**
     * Creates the aggregate promise and registers the cleanup that releases
     * this object's references once the aggregate settles.
     */
    private function createPromise(): void
    {
        $this->mutex = false;
        // The closure is passed to the Promise constructor as its wait
        // function: it drives the pending promises to completion.
        $this->aggregate = new Promise(function (): void {
            if ($this->checkIfFinished()) {
                return;
            }
            reset($this->pending);
            // Consume a potentially fluctuating list of promises while
            // ensuring that indexes are maintained (precluding array_shift).
            while ($promise = current($this->pending)) {
                // Move the array cursor first: waiting may remove the
                // current entry or append new ones.
                next($this->pending);
                $promise->wait();
                if (Is::settled($this->aggregate)) {
                    return;
                }
            }
        });

        // Clear the references when the promise is resolved.
        $clearFn = function (): void {
            $this->iterable = $this->concurrency = $this->pending = null;
            $this->onFulfilled = $this->onRejected = null;
            $this->nextPendingIndex = 0;
        };

        // Run the cleanup on both fulfillment and rejection.
        $this->aggregate->then($clearFn, $clearFn);
    }

    /**
     * Queues promises from the iterator: all remaining ones when no
     * concurrency limit is set, otherwise only as many as the limit allows
     * on top of those already pending.
     */
    private function refillPending(): void
    {
        if (!$this->concurrency) {
            // Add all pending promises.
            while ($this->addPending() && $this->advanceIterator()) {
            }

            return;
        }

        // Add only up to N pending promises.
        $concurrency = is_callable($this->concurrency)
            ? ($this->concurrency)(count($this->pending))
            : $this->concurrency;
        $concurrency = max($concurrency - count($this->pending), 0);
        // Concurrency may be set to 0 to disallow new promises.
        if (!$concurrency) {
            return;
        }
        // Add the first pending promise.
        $this->addPending();
        // Note this is special handling for concurrency=1 so that we do
        // not advance the iterator after adding the first promise. This
        // helps work around issues with generators that might not have the
        // next value to yield until promise callbacks are called.
        while (--$concurrency
            && $this->advanceIterator()
            && $this->addPending()) {
        }
    }

    /**
     * Wraps the iterator's current value in a promise and tracks it as
     * pending, wiring the configured callbacks and the step logic to its
     * settlement.
     *
     * @return bool False when there is no iterator or it has no current
     *              element; true once a promise has been queued.
     */
    private function addPending(): bool
    {
        if (!$this->iterable || !$this->iterable->valid()) {
            return false;
        }

        $promise = Create::promiseFor($this->iterable->current());
        $key = $this->iterable->key();

        // Iterable keys may not be unique, so we use a counter to
        // guarantee uniqueness
        $idx = $this->nextPendingIndex++;

        $this->pending[$idx] = $promise->then(
            function ($value) use ($idx, $key): void {
                if ($this->onFulfilled) {
                    ($this->onFulfilled)(
                        $value,
                        $key,
                        $this->aggregate
                    );
                }
                $this->step($idx);
            },
            function ($reason) use ($idx, $key): void {
                if ($this->onRejected) {
                    ($this->onRejected)(
                        $reason,
                        $key,
                        $this->aggregate
                    );
                }
                $this->step($idx);
            }
        );

        return true;
    }

    /**
     * Moves the iterator to its next element unless an advance is already
     * in progress.
     *
     * @return bool True when the iterator was advanced; false when the lock
     *              was held or the iterator threw (in which case the
     *              aggregate is rejected with that \Throwable).
     */
    private function advanceIterator(): bool
    {
        // Place a lock on the iterator so that we ensure to not recurse,
        // preventing fatal generator errors.
        if ($this->mutex) {
            return false;
        }

        $this->mutex = true;

        try {
            $this->iterable->next();
            $this->mutex = false;

            return true;
        } catch (\Throwable $e) {
            $this->aggregate->reject($e);
            $this->mutex = false;

            return false;
        }
    }

    /**
     * Handles the settlement of the pending promise stored under $idx:
     * removes it, advances the iterator, and either finishes the aggregate
     * or queues more work.
     *
     * @param int $idx Key of the settled promise in $pending.
     */
    private function step(int $idx): void
    {
        // If the promise was already resolved, then ignore this step.
        if (Is::settled($this->aggregate)) {
            return;
        }

        unset($this->pending[$idx]);

        // Only refill pending promises if we are not locked, preventing the
        // EachPromise from recursively invoking the provided iterator, which
        // would cause a fatal error: "Cannot resume an already running
        // generator"
        if ($this->advanceIterator() && !$this->checkIfFinished()) {
            // Add more pending promises if possible.
            $this->refillPending();
        }
    }

    /**
     * Resolves the aggregate with null when nothing is pending and the
     * iterator has no further elements.
     *
     * @return bool True when the aggregate was resolved by this call.
     */
    private function checkIfFinished(): bool
    {
        if (!$this->pending && !$this->iterable->valid()) {
            // Resolve the promise if there's nothing left to do.
            $this->aggregate->resolve(null);

            return true;
        }

        return false;
    }
}
Note: See TracBrowser for help on using the repository browser.