Utils.php
8.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
<?php
namespace GuzzleHttp\Promise;
final class Utils
{
/**
* Get the global task queue used for promise resolution.
*
* This task queue MUST be run in an event loop in order for promises to be
* settled asynchronously. It will be automatically run when synchronously
* waiting on a promise.
*
* <code>
* while ($eventLoop->isRunning()) {
* GuzzleHttp\Promise\Utils::queue()->run();
* }
* </code>
*
* @param TaskQueueInterface $assign Optionally specify a new queue instance.
*
* @return TaskQueueInterface
*/
public static function queue(TaskQueueInterface $assign = null)
{
static $queue;
if ($assign) {
$queue = $assign;
} elseif (!$queue) {
$queue = new TaskQueue();
}
return $queue;
}
/**
* Adds a function to run in the task queue when it is next `run()` and
* returns a promise that is fulfilled or rejected with the result.
*
* @param callable $task Task function to run.
*
* @return PromiseInterface
*/
public static function task(callable $task)
{
$queue = self::queue();
$promise = new Promise([$queue, 'run']);
$queue->add(function () use ($task, $promise) {
try {
$promise->resolve($task());
} catch (\Throwable $e) {
$promise->reject($e);
} catch (\Exception $e) {
$promise->reject($e);
}
});
return $promise;
}
/**
* Synchronously waits on a promise to resolve and returns an inspection
* state array.
*
* Returns a state associative array containing a "state" key mapping to a
* valid promise state. If the state of the promise is "fulfilled", the
* array will contain a "value" key mapping to the fulfilled value of the
* promise. If the promise is rejected, the array will contain a "reason"
* key mapping to the rejection reason of the promise.
*
* @param PromiseInterface $promise Promise or value.
*
* @return array
*/
public static function inspect(PromiseInterface $promise)
{
try {
return [
'state' => PromiseInterface::FULFILLED,
'value' => $promise->wait()
];
} catch (RejectionException $e) {
return ['state' => PromiseInterface::REJECTED, 'reason' => $e->getReason()];
} catch (\Throwable $e) {
return ['state' => PromiseInterface::REJECTED, 'reason' => $e];
} catch (\Exception $e) {
return ['state' => PromiseInterface::REJECTED, 'reason' => $e];
}
}
/**
* Waits on all of the provided promises, but does not unwrap rejected
* promises as thrown exception.
*
* Returns an array of inspection state arrays.
*
* @see inspect for the inspection state array format.
*
* @param PromiseInterface[] $promises Traversable of promises to wait upon.
*
* @return array
*/
public static function inspectAll($promises)
{
$results = [];
foreach ($promises as $key => $promise) {
$results[$key] = inspect($promise);
}
return $results;
}
/**
* Waits on all of the provided promises and returns the fulfilled values.
*
* Returns an array that contains the value of each promise (in the same
* order the promises were provided). An exception is thrown if any of the
* promises are rejected.
*
* @param iterable<PromiseInterface> $promises Iterable of PromiseInterface objects to wait on.
*
* @return array
*
* @throws \Exception on error
* @throws \Throwable on error in PHP >=7
*/
public static function unwrap($promises)
{
$results = [];
foreach ($promises as $key => $promise) {
$results[$key] = $promise->wait();
}
return $results;
}
/**
* Given an array of promises, return a promise that is fulfilled when all
* the items in the array are fulfilled.
*
* The promise's fulfillment value is an array with fulfillment values at
* respective positions to the original array. If any promise in the array
* rejects, the returned promise is rejected with the rejection reason.
*
* @param mixed $promises Promises or values.
* @param bool $recursive If true, resolves new promises that might have been added to the stack during its own resolution.
*
* @return PromiseInterface
*/
public static function all($promises, $recursive = false)
{
$results = [];
$promise = Each::of(
$promises,
function ($value, $idx) use (&$results) {
$results[$idx] = $value;
},
function ($reason, $idx, Promise $aggregate) {
$aggregate->reject($reason);
}
)->then(function () use (&$results) {
ksort($results);
return $results;
});
if (true === $recursive) {
$promise = $promise->then(function ($results) use ($recursive, &$promises) {
foreach ($promises as $promise) {
if (Is::pending($promise)) {
return self::all($promises, $recursive);
}
}
return $results;
});
}
return $promise;
}
/**
* Initiate a competitive race between multiple promises or values (values
* will become immediately fulfilled promises).
*
* When count amount of promises have been fulfilled, the returned promise
* is fulfilled with an array that contains the fulfillment values of the
* winners in order of resolution.
*
* This promise is rejected with a {@see AggregateException} if the number
* of fulfilled promises is less than the desired $count.
*
* @param int $count Total number of promises.
* @param mixed $promises Promises or values.
*
* @return PromiseInterface
*/
public static function some($count, $promises)
{
$results = [];
$rejections = [];
return Each::of(
$promises,
function ($value, $idx, PromiseInterface $p) use (&$results, $count) {
if (Is::settled($p)) {
return;
}
$results[$idx] = $value;
if (count($results) >= $count) {
$p->resolve(null);
}
},
function ($reason) use (&$rejections) {
$rejections[] = $reason;
}
)->then(
function () use (&$results, &$rejections, $count) {
if (count($results) !== $count) {
throw new AggregateException(
'Not enough promises to fulfill count',
$rejections
);
}
ksort($results);
return array_values($results);
}
);
}
/**
* Like some(), with 1 as count. However, if the promise fulfills, the
* fulfillment value is not an array of 1 but the value directly.
*
* @param mixed $promises Promises or values.
*
* @return PromiseInterface
*/
public static function any($promises)
{
return self::some(1, $promises)->then(function ($values) {
return $values[0];
});
}
/**
* Returns a promise that is fulfilled when all of the provided promises have
* been fulfilled or rejected.
*
* The returned promise is fulfilled with an array of inspection state arrays.
*
* @see inspect for the inspection state array format.
*
* @param mixed $promises Promises or values.
*
* @return PromiseInterface
*/
public static function settle($promises)
{
$results = [];
return Each::of(
$promises,
function ($value, $idx) use (&$results) {
$results[$idx] = ['state' => PromiseInterface::FULFILLED, 'value' => $value];
},
function ($reason, $idx) use (&$results) {
$results[$idx] = ['state' => PromiseInterface::REJECTED, 'reason' => $reason];
}
)->then(function () use (&$results) {
ksort($results);
return $results;
});
}
}