MediaWiki master
RedisBagOStuff.php
Go to the documentation of this file.
1<?php
2
3// @phan-file-suppress PhanTypeComparisonFromArray It's unclear whether exec() can return false
4
10
11use Exception;
12use Redis;
13use RedisException;
15
32 protected $redisPool;
34 protected $servers;
36 protected $serverTagMap;
39
72 public function __construct( $params ) {
73 parent::__construct( $params );
74 $redisConf = [ 'serializer' => 'none' ]; // manage that in this class
75 foreach ( [ 'connectTimeout', 'persistent', 'password', 'prefix' ] as $opt ) {
76 if ( isset( $params[$opt] ) ) {
77 $redisConf[$opt] = $params[$opt];
78 }
79 }
80 $this->redisPool = RedisConnectionPool::singleton( $redisConf );
81
82 $this->servers = $params['servers'];
83 foreach ( $this->servers as $key => $server ) {
84 $this->serverTagMap[is_int( $key ) ? $server : $key] = $server;
85 }
86
87 $this->automaticFailover = $params['automaticFailover'] ?? true;
88
89 // ...and uses rdb snapshots (redis.conf default)
91 }
92
94 protected function doGet( $key, $flags = 0, &$casToken = null ) {
95 $getToken = ( $casToken === self::PASS_BY_REF );
96 $casToken = null;
97
98 $conn = $this->getConnection( $key );
99 if ( !$conn ) {
100 return false;
101 }
102
103 $e = null;
104 try {
105 $blob = $conn->get( $key );
106 if ( $blob !== false ) {
107 $value = $this->unserialize( $blob );
108 $valueSize = strlen( $blob );
109 } else {
110 $value = false;
111 $valueSize = false;
112 }
113 if ( $getToken && $value !== false ) {
114 $casToken = $blob;
115 }
116 } catch ( RedisException $e ) {
117 $value = false;
118 $valueSize = false;
119 $this->handleException( $conn, $e );
120 }
121
122 $this->logRequest( 'get', $key, $conn->getServer(), $e );
123
124 $this->updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, $valueSize ] ] );
125
126 return $value;
127 }
128
130 protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
131 $conn = $this->getConnection( $key );
132 if ( !$conn ) {
133 return false;
134 }
135
136 $ttl = $this->getExpirationAsTTL( $exptime );
137 $serialized = $this->getSerialized( $value, $key );
138 $valueSize = strlen( $serialized );
139
140 $e = null;
141 try {
142 if ( $ttl ) {
143 $result = $conn->setex( $key, $ttl, $serialized );
144 } else {
145 $result = $conn->set( $key, $serialized );
146 }
147 } catch ( RedisException $e ) {
148 $result = false;
149 $this->handleException( $conn, $e );
150 }
151
152 $this->logRequest( 'set', $key, $conn->getServer(), $e );
153
154 $this->updateOpStats( self::METRIC_OP_SET, [ $key => [ $valueSize, 0 ] ] );
155
156 return $result;
157 }
158
160 protected function doDelete( $key, $flags = 0 ) {
161 $conn = $this->getConnection( $key );
162 if ( !$conn ) {
163 return false;
164 }
165
166 $e = null;
167 try {
168 // Note that redis does not return false if the key was not there
169 $result = ( $conn->del( $key ) !== false );
170 } catch ( RedisException $e ) {
171 $result = false;
172 $this->handleException( $conn, $e );
173 }
174
175 $this->logRequest( 'delete', $key, $conn->getServer(), $e );
176
177 $this->updateOpStats( self::METRIC_OP_DELETE, [ $key ] );
178
179 return $result;
180 }
181
183 protected function doGetMulti( array $keys, $flags = 0 ) {
184 $blobsFound = [];
185
186 [ $keysByServer, $connByServer ] = $this->getConnectionsForKeys( $keys );
187 foreach ( $keysByServer as $server => $batchKeys ) {
188 $conn = $connByServer[$server];
189
190 $e = null;
191 try {
192 // Avoid mget() to reduce CPU hogging from a single request
193 $conn->multi( Redis::PIPELINE );
194 foreach ( $batchKeys as $key ) {
195 $conn->get( $key );
196 }
197 $batchResult = $conn->exec();
198 if ( $batchResult === false ) {
199 $this->logRequest( 'get', implode( ',', $batchKeys ), $server, true );
200 continue;
201 }
202
203 foreach ( $batchResult as $i => $blob ) {
204 if ( $blob !== false ) {
205 $blobsFound[$batchKeys[$i]] = $blob;
206 }
207 }
208 } catch ( RedisException $e ) {
209 $this->handleException( $conn, $e );
210 }
211
212 $this->logRequest( 'get', implode( ',', $batchKeys ), $server, $e );
213 }
214
215 // Preserve the order of $keys
216 $result = [];
217 $valueSizesByKey = [];
218 foreach ( $keys as $key ) {
219 if ( array_key_exists( $key, $blobsFound ) ) {
220 $blob = $blobsFound[$key];
221 $value = $this->unserialize( $blob );
222 if ( $value !== false ) {
223 $result[$key] = $value;
224 }
225 $valueSize = strlen( $blob );
226 } else {
227 $valueSize = false;
228 }
229 $valueSizesByKey[$key] = [ 0, $valueSize ];
230 }
231
232 $this->updateOpStats( self::METRIC_OP_GET, $valueSizesByKey );
233
234 return $result;
235 }
236
238 protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
239 $ttl = $this->getExpirationAsTTL( $exptime );
240 $op = $ttl ? 'setex' : 'set';
241
242 $keys = array_keys( $data );
243 $valueSizesByKey = [];
244
245 [ $keysByServer, $connByServer, $result ] = $this->getConnectionsForKeys( $keys );
246 foreach ( $keysByServer as $server => $batchKeys ) {
247 $conn = $connByServer[$server];
248
249 $e = null;
250 try {
251 // Avoid mset() to reduce CPU hogging from a single request
252 $conn->multi( Redis::PIPELINE );
253 foreach ( $batchKeys as $key ) {
254 $serialized = $this->getSerialized( $data[$key], $key );
255 if ( $ttl ) {
256 $conn->setex( $key, $ttl, $serialized );
257 } else {
258 $conn->set( $key, $serialized );
259 }
260 $valueSizesByKey[$key] = [ strlen( $serialized ), 0 ];
261 }
262 $batchResult = $conn->exec();
263 if ( $batchResult === false ) {
264 $result = false;
265 $this->logRequest( $op, implode( ',', $batchKeys ), $server, true );
266 continue;
267 }
268
269 $result = $result && !in_array( false, $batchResult, true );
270 } catch ( RedisException $e ) {
271 $this->handleException( $conn, $e );
272 $result = false;
273 }
274
275 $this->logRequest( $op, implode( ',', $batchKeys ), $server, $e );
276 }
277
278 $this->updateOpStats( self::METRIC_OP_SET, $valueSizesByKey );
279
280 return $result;
281 }
282
284 protected function doDeleteMulti( array $keys, $flags = 0 ) {
285 [ $keysByServer, $connByServer, $result ] = $this->getConnectionsForKeys( $keys );
286 foreach ( $keysByServer as $server => $batchKeys ) {
287 $conn = $connByServer[$server];
288
289 $e = null;
290 try {
291 // Avoid delete() with array to reduce CPU hogging from a single request
292 $conn->multi( Redis::PIPELINE );
293 foreach ( $batchKeys as $key ) {
294 $conn->del( $key );
295 }
296 $batchResult = $conn->exec();
297 if ( $batchResult === false ) {
298 $result = false;
299 $this->logRequest( 'delete', implode( ',', $batchKeys ), $server, true );
300 continue;
301 }
302 // Note that redis does not return false if the key was not there
303 $result = $result && !in_array( false, $batchResult, true );
304 } catch ( RedisException $e ) {
305 $this->handleException( $conn, $e );
306 $result = false;
307 }
308
309 $this->logRequest( 'delete', implode( ',', $batchKeys ), $server, $e );
310 }
311
312 $this->updateOpStats( self::METRIC_OP_DELETE, array_values( $keys ) );
313
314 return $result;
315 }
316
318 public function doChangeTTLMulti( array $keys, $exptime, $flags = 0 ) {
319 $relative = $this->isRelativeExpiration( $exptime );
320 $op = ( $exptime == self::TTL_INDEFINITE )
321 ? 'persist'
322 : ( $relative ? 'expire' : 'expireAt' );
323
324 [ $keysByServer, $connByServer, $result ] = $this->getConnectionsForKeys( $keys );
325 foreach ( $keysByServer as $server => $batchKeys ) {
326 $conn = $connByServer[$server];
327
328 $e = null;
329 try {
330 $conn->multi( Redis::PIPELINE );
331 foreach ( $batchKeys as $key ) {
332 if ( $exptime == self::TTL_INDEFINITE ) {
333 $conn->persist( $key );
334 } elseif ( $relative ) {
335 $conn->expire( $key, $this->getExpirationAsTTL( $exptime ) );
336 } else {
337 $conn->expireAt( $key, $this->getExpirationAsTimestamp( $exptime ) );
338 }
339 }
340 $batchResult = $conn->exec();
341 if ( $batchResult === false ) {
342 $result = false;
343 $this->logRequest( $op, implode( ',', $batchKeys ), $server, true );
344 continue;
345 }
346 $result = in_array( false, $batchResult, true ) ? false : $result;
347 } catch ( RedisException $e ) {
348 $this->handleException( $conn, $e );
349 $result = false;
350 }
351
352 $this->logRequest( $op, implode( ',', $batchKeys ), $server, $e );
353 }
354
355 $this->updateOpStats( self::METRIC_OP_CHANGE_TTL, array_values( $keys ) );
356
357 return $result;
358 }
359
361 protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
362 $conn = $this->getConnection( $key );
363 if ( !$conn ) {
364 return false;
365 }
366
367 $ttl = $this->getExpirationAsTTL( $exptime );
368 $serialized = $this->getSerialized( $value, $key );
369 $valueSize = strlen( $serialized );
370
371 try {
372 $result = $conn->set(
373 $key,
374 $serialized,
375 $ttl ? [ 'nx', 'ex' => $ttl ] : [ 'nx' ]
376 );
377 } catch ( RedisException $e ) {
378 $result = false;
379 $this->handleException( $conn, $e );
380 }
381
382 $this->logRequest( 'add', $key, $conn->getServer(), $result );
383
384 $this->updateOpStats( self::METRIC_OP_ADD, [ $key => [ $valueSize, 0 ] ] );
385
386 return $result;
387 }
388
390 protected function doIncrWithInit( $key, $exptime, $step, $init, $flags ) {
391 $conn = $this->getConnection( $key );
392 if ( !$conn ) {
393 return false;
394 }
395
396 $ttl = $this->getExpirationAsTTL( $exptime );
397 try {
398 static $script =
400<<<LUA
401 local key = KEYS[1]
402 local ttl, step, init = unpack( ARGV )
403 if redis.call( 'exists', key ) == 1 then
404 return redis.call( 'incrBy', key, step )
405 end
406 if 1 * ttl ~= 0 then
407 redis.call( 'setex', key, ttl, init )
408 else
409 redis.call( 'set', key, init )
410 end
411 return 1 * init
412LUA;
413 $result = $conn->luaEval( $script, [ $key, $ttl, $step, $init ], 1 );
414 } catch ( RedisException $e ) {
415 $result = false;
416 $this->handleException( $conn, $e );
417 }
418 $this->logRequest( 'incrWithInit', $key, $conn->getServer(), $result );
419
420 return $result;
421 }
422
424 protected function doChangeTTL( $key, $exptime, $flags ) {
425 $conn = $this->getConnection( $key );
426 if ( !$conn ) {
427 return false;
428 }
429
430 $relative = $this->isRelativeExpiration( $exptime );
431 try {
432 if ( $exptime == self::TTL_INDEFINITE ) {
433 $result = $conn->persist( $key );
434 $this->logRequest( 'persist', $key, $conn->getServer(), $result );
435 } elseif ( $relative ) {
436 $result = $conn->expire( $key, $this->getExpirationAsTTL( $exptime ) );
437 $this->logRequest( 'expire', $key, $conn->getServer(), $result );
438 } else {
439 $result = $conn->expireAt( $key, $this->getExpirationAsTimestamp( $exptime ) );
440 $this->logRequest( 'expireAt', $key, $conn->getServer(), $result );
441 }
442 } catch ( RedisException $e ) {
443 $result = false;
444 $this->handleException( $conn, $e );
445 }
446
447 $this->updateOpStats( self::METRIC_OP_CHANGE_TTL, [ $key ] );
448
449 return $result;
450 }
451
458 protected function getConnectionsForKeys( array $keys ) {
459 $keysByServer = [];
460 $connByServer = [];
461 $success = true;
462 foreach ( $keys as $key ) {
463 $candidateTags = $this->getCandidateServerTagsForKey( $key );
464
465 $conn = null;
466 // Find a suitable server for this key...
467 // phpcs:ignore Generic.CodeAnalysis.AssignmentInCondition.FoundInWhileCondition
468 while ( ( $tag = array_shift( $candidateTags ) ) !== null ) {
469 $server = $this->serverTagMap[$tag];
470 // Reuse connection handles for keys mapping to the same server
471 if ( isset( $connByServer[$server] ) ) {
472 $conn = $connByServer[$server];
473 } else {
474 $conn = $this->redisPool->getConnection( $server, $this->logger );
475 if ( !$conn ) {
476 continue;
477 }
478 // If automatic failover is enabled, check that the server's link
479 // to its master (if any) is up -- but only if there are other
480 // viable candidates left to consider. Also, getMasterLinkStatus()
481 // does not work with twemproxy, though $candidates will be empty
482 // by now in such cases.
483 if ( $this->automaticFailover && $candidateTags ) {
484 try {
486 $info = $conn->info();
487 if ( ( $info['master_link_status'] ?? null ) === 'down' ) {
488 // If the master cannot be reached, fail-over to the next server.
489 // If masters are in data-center A, and replica DBs in data-center B,
490 // this helps avoid the case were fail-over happens in A but not
491 // to the corresponding server in B (e.g. read/write mismatch).
492 continue;
493 }
494 } catch ( RedisException $e ) {
495 // Server is not accepting commands
496 $this->redisPool->handleError( $conn, $e );
497 continue;
498 }
499 }
500 // Use this connection handle
501 $connByServer[$server] = $conn;
502 }
503 // Use this server for this key
504 $keysByServer[$server][] = $key;
505 break;
506 }
507
508 if ( !$conn ) {
509 // No suitable server found for this key
510 $success = false;
512 }
513 }
514
515 return [ $keysByServer, $connByServer, $success ];
516 }
517
523 protected function getConnection( $key ) {
524 [ , $connByServer ] = $this->getConnectionsForKeys( [ $key ] );
525
526 return reset( $connByServer ) ?: null;
527 }
528
529 private function getCandidateServerTagsForKey( string $key ): array {
530 $candidates = array_keys( $this->serverTagMap );
531
532 if ( count( $this->servers ) > 1 ) {
533 ArrayUtils::consistentHashSort( $candidates, $key, '/' );
534 if ( !$this->automaticFailover ) {
535 $candidates = array_slice( $candidates, 0, 1 );
536 }
537 }
538
539 return $candidates;
540 }
541
547 protected function logError( $msg ) {
548 $this->logger->error( "Redis error: $msg" );
549 }
550
560 protected function handleException( RedisConnRef $conn, RedisException $e ) {
561 $this->setLastError( BagOStuff::ERR_UNEXPECTED );
562 $this->redisPool->handleError( $conn, $e );
563 }
564
573 public function logRequest( $op, $keys, $server, $e = null ) {
574 $this->debug( "$op($keys) on $server: " . ( $e ? "failure" : "success" ) );
575 }
576}
577
579class_alias( RedisBagOStuff::class, 'RedisBagOStuff' );
A collection of static methods to play with arrays.
setLastError( $error)
Set the "last error" registry due to a problem encountered during an attempted operation.
const ATTR_DURABILITY
Key in getQoS() for durability of storage writes.
const QOS_DURABILITY_DISK
Storage survives on disk on a best-effort basis (e.g.
const ERR_UNEXPECTED
Storage operation failed due to usage limitations or an I/O error.
const ERR_UNREACHABLE
Storage operation could not establish a connection.
Helper classs that implements most of BagOStuff for a backend.
getSerialized( $value, $key)
Get the serialized form a value, logging a warning if it involves custom classes.
getExpirationAsTTL( $exptime)
Convert an optionally absolute expiry time to a relative time.
const PASS_BY_REF
Idiom for doGet() to return extra information by reference.
getExpirationAsTimestamp( $exptime)
Convert an optionally relative timestamp to an absolute time.
doDelete( $key, $flags=0)
Delete an item.bool True if the item was deleted or not found, false on failure
doChangeTTL( $key, $exptime, $flags)
bool
array $serverTagMap
Map of (tag => server name)
doAdd( $key, $value, $exptime=0, $flags=0)
Insert an item if it does not already exist.bool Success
doIncrWithInit( $key, $exptime, $step, $init, $flags)
int|bool New value or false on failure
doSetMulti(array $data, $exptime=0, $flags=0)
bool Success
array $servers
List of server names.
doGetMulti(array $keys, $flags=0)
Get an associative array containing the item for each of the keys that have items....
doGet( $key, $flags=0, &$casToken=null)
Get an item.The CAS token should be null if the key does not exist or the value is corruptmixed Retur...
doDeleteMulti(array $keys, $flags=0)
bool Success
doChangeTTLMulti(array $keys, $exptime, $flags=0)
bool Success
handleException(RedisConnRef $conn, RedisException $e)
The redis extension throws an exception in response to various read, write and protocol errors.
__construct( $params)
Construct a RedisBagOStuff object.
doSet( $key, $value, $exptime=0, $flags=0)
Set an item.bool Success
logRequest( $op, $keys, $server, $e=null)
Send information about a single request to the debug log.
Wrapper class for Redis connections that automatically reuses connections (via RAII pattern)
Manage one or more Redis client connection.