Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 146 |
|
0.00% |
0 / 13 |
CRAP | |
0.00% |
0 / 1 |
PoolCounterRedis | |
0.00% |
0 / 145 |
|
0.00% |
0 / 13 |
1560 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
12 | |||
getConnection | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
30 | |||
acquireForMe | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
acquireForAnyone | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
release | |
0.00% |
0 / 34 |
|
0.00% |
0 / 1 |
30 | |||
waitForSlotOrNotif | |
0.00% |
0 / 36 |
|
0.00% |
0 / 1 |
132 | |||
initAndPopPoolSlotList | |
0.00% |
0 / 17 |
|
0.00% |
0 / 1 |
2 | |||
registerAcquisitionTime | |
0.00% |
0 / 15 |
|
0.00% |
0 / 1 |
2 | |||
getSlotListKey | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getSlotRTimeSetKey | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getWaitSetKey | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getWakeupListKey | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
releaseAll | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
30 |
1 | <?php |
2 | /** |
3 | * This program is free software; you can redistribute it and/or modify |
4 | * it under the terms of the GNU General Public License as published by |
5 | * the Free Software Foundation; either version 2 of the License, or |
6 | * (at your option) any later version. |
7 | * |
8 | * This program is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | * GNU General Public License for more details. |
12 | * |
13 | * You should have received a copy of the GNU General Public License along |
14 | * with this program; if not, write to the Free Software Foundation, Inc., |
15 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
16 | * http://www.gnu.org/copyleft/gpl.html |
17 | * |
18 | * @file |
19 | */ |
20 | |
21 | namespace MediaWiki\PoolCounter; |
22 | |
23 | use ArrayUtils; |
24 | use Exception; |
25 | use HashRing; |
26 | use MediaWiki\Status\Status; |
27 | use RedisException; |
28 | use Wikimedia\ObjectCache\RedisConnectionPool; |
29 | use Wikimedia\ObjectCache\RedisConnRef; |
30 | |
31 | /** |
32 | * Version of PoolCounter that uses Redis |
33 | * |
34 | * There are four main redis keys used to track each pool counter key: |
35 | * - poolcounter:l-slots-* : A list of available slot IDs for a pool. |
36 | * - poolcounter:z-renewtime-* : A sorted set of (slot ID, UNIX timestamp as score) |
37 | * used for tracking the next time a slot should be |
38 | * released. This is -1 when a slot is created, and is |
39 | * set when released (expired), locked, and unlocked. |
40 | * - poolcounter:z-wait-* : A sorted set of (slot ID, UNIX timestamp as score) |
41 | * used for tracking waiting processes (and wait time). |
42 | * - poolcounter:l-wakeup-* : A list pushed to for the sake of waking up processes |
43 | * when a any process in the pool finishes (lasts for 1ms). |
44 | * |
45 | * For a given pool key, all the redis keys start off non-existing and are deleted if not |
46 | * used for a while to prevent garbage from building up on the server. They are atomically |
47 | * re-initialized as needed. The "z-renewtime" key is used for detecting sessions which got |
48 | * slots but then disappeared. Stale entries from there have their timestamp updated and the |
49 | * corresponding slots freed up. The "z-wait" key is used for detecting processes registered |
50 | * as waiting but that disappeared. Stale entries from there are deleted and the corresponding |
51 | * slots are freed up. The worker count is included in all the redis key names as it does not |
52 | * vary within each $wgPoolCounterConf type and doing so handles configuration changes. |
53 | * |
54 | * This class requires Redis 2.6 as it makes use Lua scripts for fast atomic operations. |
55 | * Also this should be on a server plenty of RAM for the working set to avoid evictions. |
56 | * Evictions could temporarily allow wait queues to double in size or temporarily cause |
57 | * pools to appear as full when they are not. Using volatile-ttl and bumping memory-samples |
58 | * in redis.conf can be helpful otherwise. |
59 | * |
60 | * @since 1.23 |
61 | */ |
62 | class PoolCounterRedis extends PoolCounter { |
63 | /** @var HashRing */ |
64 | protected $ring; |
65 | /** @var RedisConnectionPool */ |
66 | protected $pool; |
67 | /** @var array (server label => host) map */ |
68 | protected $serversByLabel; |
69 | /** @var string SHA-1 of the key */ |
70 | protected $keySha1; |
71 | /** @var int TTL for locks to expire (work should finish in this time) */ |
72 | protected $lockTTL; |
73 | /** @var RedisConnRef */ |
74 | protected $conn; |
75 | /** @var string|null Pool slot value */ |
76 | protected $slot; |
77 | /** @var int|null AWAKE_* constant */ |
78 | protected $onRelease; |
79 | /** @var string Unique string to identify this process */ |
80 | protected $session; |
81 | /** @var float|null UNIX timestamp */ |
82 | protected $slotTime; |
83 | |
84 | private const AWAKE_ONE = 1; // wake-up if when a slot can be taken from an existing process |
85 | private const AWAKE_ALL = 2; // wake-up if an existing process finishes and wake up such others |
86 | |
87 | /** @var PoolCounterRedis[] List of active PoolCounterRedis objects in this script */ |
88 | protected static $active = null; |
89 | |
90 | public function __construct( $conf, $type, $key ) { |
91 | parent::__construct( $conf, $type, $key ); |
92 | |
93 | $this->serversByLabel = $conf['servers']; |
94 | |
95 | $serverLabels = array_keys( $conf['servers'] ); |
96 | $this->ring = new HashRing( array_fill_keys( $serverLabels, 10 ) ); |
97 | |
98 | $conf['redisConfig']['serializer'] = 'none'; // for use with Lua |
99 | $this->pool = RedisConnectionPool::singleton( $conf['redisConfig'] ); |
100 | |
101 | $this->keySha1 = sha1( $this->key ); |
102 | $met = ini_get( 'max_execution_time' ); // usually 0 in CLI mode |
103 | $this->lockTTL = $met ? 2 * (int)$met : 3600; |
104 | |
105 | if ( self::$active === null ) { |
106 | self::$active = []; |
107 | register_shutdown_function( [ __CLASS__, 'releaseAll' ] ); |
108 | } |
109 | } |
110 | |
111 | /** |
112 | * @return Status Uses RediConnRef as value on success |
113 | */ |
114 | protected function getConnection() { |
115 | if ( !isset( $this->conn ) ) { |
116 | $conn = false; |
117 | $servers = $this->ring->getLocations( $this->key, 3 ); |
118 | ArrayUtils::consistentHashSort( $servers, $this->key ); |
119 | foreach ( $servers as $server ) { |
120 | $conn = $this->pool->getConnection( $this->serversByLabel[$server], $this->logger ); |
121 | if ( $conn ) { |
122 | break; |
123 | } |
124 | } |
125 | if ( !$conn ) { |
126 | return Status::newFatal( 'pool-servererror', implode( ', ', $servers ) ); |
127 | } |
128 | $this->conn = $conn; |
129 | } |
130 | return Status::newGood( $this->conn ); |
131 | } |
132 | |
133 | public function acquireForMe( $timeout = null ) { |
134 | $status = $this->precheckAcquire(); |
135 | if ( !$status->isGood() ) { |
136 | return $status; |
137 | } |
138 | |
139 | return $this->waitForSlotOrNotif( self::AWAKE_ONE, $timeout ); |
140 | } |
141 | |
142 | public function acquireForAnyone( $timeout = null ) { |
143 | $status = $this->precheckAcquire(); |
144 | if ( !$status->isGood() ) { |
145 | return $status; |
146 | } |
147 | |
148 | return $this->waitForSlotOrNotif( self::AWAKE_ALL, $timeout ); |
149 | } |
150 | |
151 | public function release() { |
152 | if ( $this->slot === null ) { |
153 | return Status::newGood( PoolCounter::NOT_LOCKED ); // not locked |
154 | } |
155 | |
156 | $status = $this->getConnection(); |
157 | if ( !$status->isOK() ) { |
158 | return $status; |
159 | } |
160 | /** @var RedisConnRef $conn */ |
161 | $conn = $status->value; |
162 | '@phan-var RedisConnRef $conn'; |
163 | |
164 | // phpcs:disable Generic.Files.LineLength |
165 | static $script = |
166 | /** @lang Lua */ |
167 | <<<LUA |
168 | local kSlots,kSlotsNextRelease,kWakeup,kWaiting = unpack(KEYS) |
169 | local rMaxWorkers,rExpiry,rSlot,rSlotTime,rAwakeAll,rTime = unpack(ARGV) |
170 | -- Add the slots back to the list (if rSlot is "w" then it is not a slot). |
171 | -- Treat the list as expired if the "next release" time sorted-set is missing. |
172 | if rSlot ~= 'w' and redis.call('exists',kSlotsNextRelease) == 1 then |
173 | if 1*redis.call('zScore',kSlotsNextRelease,rSlot) ~= (rSlotTime + rExpiry) then |
174 | -- Slot lock expired and was released already |
175 | elseif redis.call('lLen',kSlots) >= 1*rMaxWorkers then |
176 | -- Slots somehow got out of sync; reset the list |
177 | redis.call('del',kSlots,kSlotsNextRelease) |
178 | elseif redis.call('lLen',kSlots) == (1*rMaxWorkers - 1) and redis.call('zCard',kWaiting) == 0 then |
179 | -- Slot list will be made full; clear it to save space (it re-inits as needed) |
180 | -- since nothing is waiting on being unblocked by a push to the list |
181 | redis.call('del',kSlots,kSlotsNextRelease) |
182 | else |
183 | -- Add slot back to pool and update the "next release" time |
184 | redis.call('rPush',kSlots,rSlot) |
185 | redis.call('zAdd',kSlotsNextRelease,rTime + 30,rSlot) |
186 | -- Always keep renewing the expiry on use |
187 | redis.call('expireAt',kSlots,math.ceil(rTime + rExpiry)) |
188 | redis.call('expireAt',kSlotsNextRelease,math.ceil(rTime + rExpiry)) |
189 | end |
190 | end |
191 | -- Update an ephemeral list to wake up other clients that can |
192 | -- reuse any cached work from this process. Only do this if no |
193 | -- slots are currently free (e.g. clients could be waiting). |
194 | if 1*rAwakeAll == 1 then |
195 | local count = redis.call('zCard',kWaiting) |
196 | for i = 1,count do |
197 | redis.call('rPush',kWakeup,'w') |
198 | end |
199 | redis.call('pexpire',kWakeup,1) |
200 | end |
201 | return 1 |
202 | LUA; |
203 | // phpcs:enable |
204 | |
205 | try { |
206 | $conn->luaEval( |
207 | $script, |
208 | [ |
209 | $this->getSlotListKey(), |
210 | $this->getSlotRTimeSetKey(), |
211 | $this->getWakeupListKey(), |
212 | $this->getWaitSetKey(), |
213 | $this->workers, |
214 | $this->lockTTL, |
215 | $this->slot, |
216 | $this->slotTime, // used for CAS-style check |
217 | ( $this->onRelease === self::AWAKE_ALL ) ? 1 : 0, |
218 | microtime( true ), |
219 | ], |
220 | 4 # number of first argument(s) that are keys |
221 | ); |
222 | } catch ( RedisException $e ) { |
223 | return Status::newFatal( 'pool-error-unknown', $e->getMessage() ); |
224 | } |
225 | |
226 | $this->slot = null; |
227 | $this->slotTime = null; |
228 | $this->onRelease = null; |
229 | unset( self::$active[$this->session] ); |
230 | |
231 | $this->onRelease(); |
232 | |
233 | return Status::newGood( PoolCounter::RELEASED ); |
234 | } |
235 | |
236 | /** |
237 | * @param int $doWakeup AWAKE_* constant |
238 | * @param int|float|null $timeout |
239 | * @return Status |
240 | */ |
241 | protected function waitForSlotOrNotif( $doWakeup, $timeout = null ) { |
242 | if ( $this->slot !== null ) { |
243 | return Status::newGood( PoolCounter::LOCK_HELD ); // already acquired |
244 | } |
245 | |
246 | $status = $this->getConnection(); |
247 | if ( !$status->isOK() ) { |
248 | return $status; |
249 | } |
250 | /** @var RedisConnRef $conn */ |
251 | $conn = $status->value; |
252 | '@phan-var RedisConnRef $conn'; |
253 | |
254 | $now = microtime( true ); |
255 | $timeout ??= $this->timeout; |
256 | try { |
257 | $slot = $this->initAndPopPoolSlotList( $conn, $now ); |
258 | if ( ctype_digit( $slot ) ) { |
259 | // Pool slot acquired by this process |
260 | $slotTime = $now; |
261 | } elseif ( $slot === 'QUEUE_FULL' ) { |
262 | // Too many processes are waiting for pooled processes to finish |
263 | return Status::newGood( PoolCounter::QUEUE_FULL ); |
264 | } elseif ( $slot === 'QUEUE_WAIT' ) { |
265 | // This process is now registered as waiting |
266 | $keys = |
267 | ( $doWakeup == self::AWAKE_ALL ) // Wait for an open slot or wake-up signal (preferring the latter) |
268 | ? [ $this->getWakeupListKey(), $this->getSlotListKey() ] // Just wait for an actual pool slot |
269 | : [ $this->getSlotListKey() ]; |
270 | |
271 | $res = $conn->blPop( $keys, $timeout ); |
272 | if ( $res === [] ) { |
273 | $conn->zRem( $this->getWaitSetKey(), $this->session ); // no longer waiting |
274 | return Status::newGood( PoolCounter::TIMEOUT ); |
275 | } |
276 | |
277 | $slot = $res[1]; // pool slot or "w" for wake-up notifications |
278 | $slotTime = microtime( true ); // last microtime() was a few RTTs ago |
279 | // Unregister this process as waiting and bump slot "next release" time |
280 | $this->registerAcquisitionTime( $conn, $slot, $slotTime ); |
281 | } else { |
282 | return Status::newFatal( 'pool-error-unknown', "Server gave slot '$slot'." ); |
283 | } |
284 | } catch ( RedisException $e ) { |
285 | return Status::newFatal( 'pool-error-unknown', $e->getMessage() ); |
286 | } |
287 | |
288 | if ( $slot !== 'w' ) { |
289 | $this->slot = $slot; |
290 | $this->slotTime = $slotTime; |
291 | $this->onRelease = $doWakeup; |
292 | self::$active[$this->session] = $this; |
293 | } |
294 | |
295 | $this->onAcquire(); |
296 | |
297 | return Status::newGood( $slot === 'w' ? PoolCounter::DONE : PoolCounter::LOCKED ); |
298 | } |
299 | |
300 | /** |
301 | * @param RedisConnRef $conn |
302 | * @param float $now UNIX timestamp |
303 | * @return string|bool False on failure |
304 | */ |
305 | protected function initAndPopPoolSlotList( RedisConnRef $conn, $now ) { |
306 | static $script = /** @lang Lua */ <<<LUA |
307 | local kSlots,kSlotsNextRelease,kSlotWaits = unpack(KEYS) |
308 | local rMaxWorkers,rMaxQueue,rTimeout,rExpiry,rSess,rTime = unpack(ARGV) |
309 | -- Initialize if the "next release" time sorted-set is empty. The slot key |
310 | -- itself is empty if all slots are busy or when nothing is initialized. |
311 | -- If the list is empty but the set is not, then it is the latter case. |
312 | -- If the list exists but not the set, then reset everything. |
313 | if redis.call('exists',kSlotsNextRelease) == 0 then |
314 | redis.call('del',kSlots) |
315 | for i = 1,1*rMaxWorkers do |
316 | redis.call('rPush',kSlots,i) |
317 | redis.call('zAdd',kSlotsNextRelease,-1,i) |
318 | end |
319 | -- Otherwise do maintenance to clean up after network partitions |
320 | else |
321 | -- Find stale slot locks and add free them (avoid duplicates) |
322 | local staleLocks = redis.call('zRangeByScore',kSlotsNextRelease,0,rTime) |
323 | for k,slot in ipairs(staleLocks) do |
324 | redis.call('lRem',kSlots,0,slot) |
325 | redis.call('rPush',kSlots,slot) |
326 | redis.call('zAdd',kSlotsNextRelease,rTime + 30,slot) |
327 | end |
328 | -- Find stale wait slot entries and remove them |
329 | redis.call('zRemRangeByScore',kSlotWaits,0,rTime - 2*rTimeout) |
330 | end |
331 | local slot |
332 | -- Try to acquire a slot if possible now |
333 | if redis.call('lLen',kSlots) > 0 then |
334 | slot = redis.call('lPop',kSlots) |
335 | -- Update the slot "next release" time |
336 | redis.call('zAdd',kSlotsNextRelease,rTime + rExpiry,slot) |
337 | elseif redis.call('zCard',kSlotWaits) >= 1*rMaxQueue then |
338 | slot = 'QUEUE_FULL' |
339 | else |
340 | slot = 'QUEUE_WAIT' |
341 | -- Register this process as waiting |
342 | redis.call('zAdd',kSlotWaits,rTime,rSess) |
343 | redis.call('expireAt',kSlotWaits,math.ceil(rTime + 2*rTimeout)) |
344 | end |
345 | -- Always keep renewing the expiry on use |
346 | redis.call('expireAt',kSlots,math.ceil(rTime + rExpiry)) |
347 | redis.call('expireAt',kSlotsNextRelease,math.ceil(rTime + rExpiry)) |
348 | return slot |
349 | LUA; |
350 | return $conn->luaEval( |
351 | $script, |
352 | [ |
353 | $this->getSlotListKey(), |
354 | $this->getSlotRTimeSetKey(), |
355 | $this->getWaitSetKey(), |
356 | $this->workers, |
357 | $this->maxqueue, |
358 | $this->timeout, |
359 | $this->lockTTL, |
360 | $this->session, |
361 | $now, |
362 | ], |
363 | 3 # number of first argument(s) that are keys |
364 | ); |
365 | } |
366 | |
367 | /** |
368 | * @param RedisConnRef $conn |
369 | * @param string $slot |
370 | * @param float $now |
371 | * @return int|bool False on failure |
372 | */ |
373 | protected function registerAcquisitionTime( RedisConnRef $conn, $slot, $now ) { |
374 | static $script = /** @lang Lua */ <<<LUA |
375 | local kSlots,kSlotsNextRelease,kSlotWaits = unpack(KEYS) |
376 | local rSlot,rExpiry,rSess,rTime = unpack(ARGV) |
377 | -- If rSlot is 'w' then the client was told to wake up but got no slot |
378 | if rSlot ~= 'w' then |
379 | -- Update the slot "next release" time |
380 | redis.call('zAdd',kSlotsNextRelease,rTime + rExpiry,rSlot) |
381 | -- Always keep renewing the expiry on use |
382 | redis.call('expireAt',kSlots,math.ceil(rTime + rExpiry)) |
383 | redis.call('expireAt',kSlotsNextRelease,math.ceil(rTime + rExpiry)) |
384 | end |
385 | -- Unregister this process as waiting |
386 | redis.call('zRem',kSlotWaits,rSess) |
387 | return 1 |
388 | LUA; |
389 | return $conn->luaEval( |
390 | $script, |
391 | [ |
392 | $this->getSlotListKey(), |
393 | $this->getSlotRTimeSetKey(), |
394 | $this->getWaitSetKey(), |
395 | $slot, |
396 | $this->lockTTL, |
397 | $this->session, |
398 | $now, |
399 | ], |
400 | 3 # number of first argument(s) that are keys |
401 | ); |
402 | } |
403 | |
404 | /** |
405 | * @return string |
406 | */ |
407 | protected function getSlotListKey() { |
408 | return "poolcounter:l-slots-{$this->keySha1}-{$this->workers}"; |
409 | } |
410 | |
411 | /** |
412 | * @return string |
413 | */ |
414 | protected function getSlotRTimeSetKey() { |
415 | return "poolcounter:z-renewtime-{$this->keySha1}-{$this->workers}"; |
416 | } |
417 | |
418 | /** |
419 | * @return string |
420 | */ |
421 | protected function getWaitSetKey() { |
422 | return "poolcounter:z-wait-{$this->keySha1}-{$this->workers}"; |
423 | } |
424 | |
425 | /** |
426 | * @return string |
427 | */ |
428 | protected function getWakeupListKey() { |
429 | return "poolcounter:l-wakeup-{$this->keySha1}-{$this->workers}"; |
430 | } |
431 | |
432 | /** |
433 | * Try to make sure that locks get released (even with exceptions and fatals) |
434 | */ |
435 | public static function releaseAll() { |
436 | $e = null; |
437 | foreach ( self::$active as $poolCounter ) { |
438 | try { |
439 | if ( $poolCounter->slot !== null ) { |
440 | $poolCounter->release(); |
441 | } |
442 | } catch ( Exception $e ) { |
443 | } |
444 | } |
445 | if ( $e ) { |
446 | throw $e; |
447 | } |
448 | } |
449 | } |
450 | |
451 | /** @deprecated class alias since 1.42 */ |
452 | class_alias( PoolCounterRedis::class, 'PoolCounterRedis' ); |