MediaWiki master
RedisBagOStuff.php
Go to the documentation of this file.
1<?php
20namespace Wikimedia\ObjectCache;
21
22use ArrayUtils;
23use Exception;
24use Redis;
25use RedisException;
26
/** @var RedisConnectionPool Pool that hands out connections to the configured servers */
protected $redisPool;
/** @var array List of server names */
protected $servers;
/**
 * @var array Map of (tag => server name). Defaults to an empty map so that lookups
 *  stay well-defined even when no servers are configured.
 */
protected $serverTagMap = [];
/** @var bool Whether keys may fail over to other candidate servers; assigned in the constructor */
protected $automaticFailover;
51
/**
 * Construct a RedisBagOStuff object.
 *
 * Keys of $params read directly by this constructor ($params is also forwarded
 * to the parent constructor):
 *   - servers: array of server names. Entries with integer keys are tagged with the
 *     server name itself; entries with string keys use that key as the tag.
 *   - connectTimeout, persistent, password, prefix: forwarded to the connection
 *     pool configuration when set.
 *   - automaticFailover: whether other candidate servers may be tried for a key.
 *     Defaults to true.
 *
 * @param array $params
 */
public function __construct( $params ) {
	parent::__construct( $params );

	// Serialization is managed by this class, so disable it in the client
	$redisConf = [ 'serializer' => 'none' ];
	foreach ( [ 'connectTimeout', 'persistent', 'password', 'prefix' ] as $opt ) {
		if ( isset( $params[$opt] ) ) {
			$redisConf[$opt] = $params[$opt];
		}
	}
	$this->redisPool = RedisConnectionPool::singleton( $redisConf );

	$this->servers = $params['servers'];
	// Always start from an empty map: with an empty server list the loop below never
	// runs, and a null map would break array_keys() in getCandidateServerTagsForKey()
	$this->serverTagMap = [];
	foreach ( $this->servers as $key => $server ) {
		$this->serverTagMap[is_int( $key ) ? $server : $key] = $server;
	}

	$this->automaticFailover = $params['automaticFailover'] ?? true;

	// ...and uses rdb snapshots (redis.conf default)
	// NOTE(review): assumes the parent class exposes the $attrMap QoS map — confirm
	$this->attrMap[self::ATTR_DURABILITY] = self::QOS_DURABILITY_DISK;
}
104
/**
 * Get an item.
 *
 * @param string $key
 * @param int $flags Not used by this implementation
 * @param mixed &$casToken Pass self::PASS_BY_REF to request a CAS token. The argument is
 *  always reset to null first; it receives the raw stored blob only when a value was found.
 * @return mixed The unserialized value; false on a miss, when no connection could be
 *  obtained, or when the Redis client threw
 */
protected function doGet( $key, $flags = 0, &$casToken = null ) {
	$wantsToken = ( $casToken === self::PASS_BY_REF );
	$casToken = null;

	$conn = $this->getConnection( $key );
	if ( !$conn ) {
		return false;
	}

	$e = null;
	try {
		$blob = $conn->get( $key );
		$found = ( $blob !== false );
		$value = $found ? $this->unserialize( $blob ) : false;
		$valueSize = $found ? strlen( $blob ) : false;
		// Only hand out a token for blobs that produced a usable value
		if ( $wantsToken && $value !== false ) {
			$casToken = $blob;
		}
	} catch ( RedisException $e ) {
		$value = false;
		$valueSize = false;
		$this->handleException( $conn, $e );
	}

	$this->logRequest( 'get', $key, $conn->getServer(), $e );

	// Second tuple element is the size of the fetched blob (false when nothing was read)
	$this->updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, $valueSize ] ] );

	return $value;
}
139
/**
 * Set an item.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime Expiry, converted to a relative TTL via getExpirationAsTTL()
 * @param int $flags Not used by this implementation
 * @return bool|mixed The reply of the Redis client, or false when no connection could
 *  be obtained or the client threw
 */
protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
	$conn = $this->getConnection( $key );
	if ( !$conn ) {
		return false;
	}

	$ttl = $this->getExpirationAsTTL( $exptime );
	$serialized = $this->getSerialized( $value, $key );
	$valueSize = strlen( $serialized );

	$e = null;
	try {
		// A falsy TTL means "no expiry", which needs the plain SET command
		$result = $ttl
			? $conn->setex( $key, $ttl, $serialized )
			: $conn->set( $key, $serialized );
	} catch ( RedisException $e ) {
		$result = false;
		$this->handleException( $conn, $e );
	}

	$this->logRequest( 'set', $key, $conn->getServer(), $e );

	// First tuple element is the size of the serialized payload
	$this->updateOpStats( self::METRIC_OP_SET, [ $key => [ $valueSize, 0 ] ] );

	return $result;
}
168
/**
 * Delete an item.
 *
 * @param string $key
 * @param int $flags Not used by this implementation
 * @return bool False when no connection could be obtained, when the client replied
 *  with false, or when it threw; true otherwise
 */
protected function doDelete( $key, $flags = 0 ) {
	$conn = $this->getConnection( $key );
	if ( !$conn ) {
		return false;
	}

	$e = null;
	try {
		$reply = $conn->del( $key );
		// Note that redis does not return false if the key was not there
		$result = ( $reply !== false );
	} catch ( RedisException $e ) {
		$result = false;
		$this->handleException( $conn, $e );
	}

	$this->logRequest( 'delete', $key, $conn->getServer(), $e );

	$this->updateOpStats( self::METRIC_OP_DELETE, [ $key ] );

	return $result;
}
190
/**
 * Get an associative array containing the item for each of the keys that have items.
 *
 * Keys are grouped by server and each group is fetched as one pipelined batch.
 * Keys for which no server connection was available are treated as misses.
 *
 * @param string[] $keys
 * @param int $flags Not used by this implementation
 * @return array Map of (key => value) for the keys that were found, in the order of $keys
 */
protected function doGetMulti( array $keys, $flags = 0 ) {
	$blobsByKey = [];

	[ $keysByServer, $connByServer ] = $this->getConnectionsForKeys( $keys );
	foreach ( $keysByServer as $server => $batchKeys ) {
		$conn = $connByServer[$server];
		$keyList = implode( ',', $batchKeys );

		$e = null;
		try {
			// Avoid mget() to reduce CPU hogging from a single request
			$conn->multi( Redis::PIPELINE );
			foreach ( $batchKeys as $batchKey ) {
				$conn->get( $batchKey );
			}
			$replies = $conn->exec();
			if ( $replies === false ) {
				// The whole batch failed; record it and move on to the next server
				$this->logRequest( 'get', $keyList, $server, true );
				continue;
			}

			// Replies come back in the same order the commands were queued
			foreach ( $replies as $pos => $reply ) {
				if ( $reply !== false ) {
					$blobsByKey[$batchKeys[$pos]] = $reply;
				}
			}
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );
		}

		$this->logRequest( 'get', $keyList, $server, $e );
	}

	// Preserve the order of $keys
	$result = [];
	$valueSizesByKey = [];
	foreach ( $keys as $key ) {
		if ( !array_key_exists( $key, $blobsByKey ) ) {
			$valueSizesByKey[$key] = [ 0, false ];
			continue;
		}

		$blob = $blobsByKey[$key];
		$value = $this->unserialize( $blob );
		if ( $value !== false ) {
			$result[$key] = $value;
		}
		$valueSizesByKey[$key] = [ 0, strlen( $blob ) ];
	}

	$this->updateOpStats( self::METRIC_OP_GET, $valueSizesByKey );

	return $result;
}
244
/**
 * Set several items, using one pipelined batch per server.
 *
 * @param array $data Map of (key => value)
 * @param int $exptime Expiry, converted to a relative TTL via getExpirationAsTTL()
 * @param int $flags Not used by this implementation
 * @return bool True only if every key had a usable server and no command in any
 *  batch replied with false or threw
 */
protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
	$ttl = $this->getExpirationAsTTL( $exptime );
	// Command name used for logging only
	$op = $ttl ? 'setex' : 'set';

	$valueSizesByKey = [];

	// $result starts as the "all keys had a server" flag
	[ $keysByServer, $connByServer, $result ] =
		$this->getConnectionsForKeys( array_keys( $data ) );
	foreach ( $keysByServer as $server => $batchKeys ) {
		$conn = $connByServer[$server];
		$keyList = implode( ',', $batchKeys );

		$e = null;
		try {
			// Avoid mset() to reduce CPU hogging from a single request
			$conn->multi( Redis::PIPELINE );
			foreach ( $batchKeys as $batchKey ) {
				$serialized = $this->getSerialized( $data[$batchKey], $batchKey );
				if ( $ttl ) {
					$conn->setex( $batchKey, $ttl, $serialized );
				} else {
					$conn->set( $batchKey, $serialized );
				}
				$valueSizesByKey[$batchKey] = [ strlen( $serialized ), 0 ];
			}
			$replies = $conn->exec();
			if ( $replies === false ) {
				// The whole batch failed; record it and move on to the next server
				$result = false;
				$this->logRequest( $op, $keyList, $server, true );
				continue;
			}

			// Any individual false reply marks the overall operation as failed
			if ( in_array( false, $replies, true ) ) {
				$result = false;
			}
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );
			$result = false;
		}

		$this->logRequest( $op, $keyList, $server, $e );
	}

	$this->updateOpStats( self::METRIC_OP_SET, $valueSizesByKey );

	return $result;
}
289
/**
 * Delete several items, using one pipelined batch per server.
 *
 * @param string[] $keys
 * @param int $flags Not used by this implementation
 * @return bool True only if every key had a usable server and no command in any
 *  batch replied with false or threw
 */
protected function doDeleteMulti( array $keys, $flags = 0 ) {
	// $result starts as the "all keys had a server" flag
	[ $keysByServer, $connByServer, $result ] = $this->getConnectionsForKeys( $keys );
	foreach ( $keysByServer as $server => $batchKeys ) {
		$conn = $connByServer[$server];
		$keyList = implode( ',', $batchKeys );

		$e = null;
		try {
			// Avoid delete() with array to reduce CPU hogging from a single request
			$conn->multi( Redis::PIPELINE );
			foreach ( $batchKeys as $batchKey ) {
				$conn->del( $batchKey );
			}
			$replies = $conn->exec();
			if ( $replies === false ) {
				// The whole batch failed; record it and move on to the next server
				$result = false;
				$this->logRequest( 'delete', $keyList, $server, true );
				continue;
			}
			// Note that redis does not return false if the key was not there
			if ( in_array( false, $replies, true ) ) {
				$result = false;
			}
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );
			$result = false;
		}

		$this->logRequest( 'delete', $keyList, $server, $e );
	}

	$this->updateOpStats( self::METRIC_OP_DELETE, array_values( $keys ) );

	return $result;
}
322
/**
 * Change the expiry of several items, using one pipelined batch per server.
 *
 * Depending on $exptime the command issued per key is "persist" (TTL_INDEFINITE),
 * "expire" (relative expiry) or "expireAt" (absolute expiry).
 *
 * @param string[] $keys
 * @param int $exptime
 * @param int $flags Not used by this implementation
 * @return bool True only if every key had a usable server and no command in any
 *  batch replied with false or threw
 */
public function doChangeTTLMulti( array $keys, $exptime, $flags = 0 ) {
	$relative = $this->isRelativeExpiration( $exptime );
	if ( $exptime == self::TTL_INDEFINITE ) {
		$op = 'persist';
	} elseif ( $relative ) {
		$op = 'expire';
	} else {
		$op = 'expireAt';
	}

	// $result starts as the "all keys had a server" flag
	[ $keysByServer, $connByServer, $result ] = $this->getConnectionsForKeys( $keys );
	foreach ( $keysByServer as $server => $batchKeys ) {
		$conn = $connByServer[$server];
		$keyList = implode( ',', $batchKeys );

		$e = null;
		try {
			$conn->multi( Redis::PIPELINE );
			foreach ( $batchKeys as $batchKey ) {
				// $op was derived from the same conditions, so it selects the command
				if ( $op === 'persist' ) {
					$conn->persist( $batchKey );
				} elseif ( $op === 'expire' ) {
					$conn->expire( $batchKey, $this->getExpirationAsTTL( $exptime ) );
				} else {
					$conn->expireAt( $batchKey, $this->getExpirationAsTimestamp( $exptime ) );
				}
			}
			$replies = $conn->exec();
			if ( $replies === false ) {
				// The whole batch failed; record it and move on to the next server
				$result = false;
				$this->logRequest( $op, $keyList, $server, true );
				continue;
			}
			// Any individual false reply marks the overall operation as failed
			$result = $result && !in_array( false, $replies, true );
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );
			$result = false;
		}

		$this->logRequest( $op, $keyList, $server, $e );
	}

	$this->updateOpStats( self::METRIC_OP_CHANGE_TTL, array_values( $keys ) );

	return $result;
}
364
/**
 * Insert an item if it does not already exist.
 *
 * Implemented as a single SET with the "nx" option (plus "ex" when a TTL applies).
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime Expiry, converted to a relative TTL via getExpirationAsTTL()
 * @param int $flags Not used by this implementation
 * @return bool|mixed The reply of the Redis client, or false when no connection could
 *  be obtained or the client threw
 */
protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
	$conn = $this->getConnection( $key );
	if ( !$conn ) {
		return false;
	}

	$ttl = $this->getExpirationAsTTL( $exptime );
	$serialized = $this->getSerialized( $value, $key );
	$valueSize = strlen( $serialized );

	$e = null;
	try {
		$result = $conn->set(
			$key,
			$serialized,
			$ttl ? [ 'nx', 'ex' => $ttl ] : [ 'nx' ]
		);
	} catch ( RedisException $e ) {
		$result = false;
		$this->handleException( $conn, $e );
	}

	// logRequest() treats a truthy last argument as a failure indicator, so pass the
	// caught exception (or null), not $result, which would invert the logged outcome
	$this->logRequest( 'add', $key, $conn->getServer(), $e );

	// First tuple element is the size of the serialized payload
	$this->updateOpStats( self::METRIC_OP_ADD, [ $key => [ $valueSize, 0 ] ] );

	return $result;
}
392
/**
 * Increment a counter, creating it with an initial value if it does not exist.
 *
 * The check-and-update runs as one server-side Lua script: if the key exists it is
 * incremented by $step; otherwise it is set to $init (with the TTL when non-zero)
 * and $init is returned.
 *
 * @param string $key
 * @param int $exptime Expiry, converted to a relative TTL via getExpirationAsTTL()
 * @param int $step Amount to add when the key already exists
 * @param int $init Value to store when the key does not exist yet
 * @param int $flags Not used by this implementation
 * @return int|bool The script's reply (new counter value), or false when no connection
 *  could be obtained or the client threw
 */
protected function doIncrWithInit( $key, $exptime, $step, $init, $flags ) {
	$conn = $this->getConnection( $key );
	if ( !$conn ) {
		return false;
	}

	$ttl = $this->getExpirationAsTTL( $exptime );

	static $script =
<<<LUA
	local key = KEYS[1]
	local ttl, step, init = unpack( ARGV )
	if redis.call( 'exists', key ) == 1 then
		return redis.call( 'incrBy', key, step )
	end
	if 1 * ttl ~= 0 then
		redis.call( 'setex', key, ttl, init )
	else
		redis.call( 'set', key, init )
	end
	return 1 * init
LUA;

	$e = null;
	try {
		// One key, followed by the script arguments (ttl, step, init)
		$result = $conn->luaEval( $script, [ $key, $ttl, $step, $init ], 1 );
	} catch ( RedisException $e ) {
		$result = false;
		$this->handleException( $conn, $e );
	}

	// logRequest() treats a truthy last argument as a failure indicator, so pass the
	// caught exception (or null); passing the numeric result would log "failure"
	$this->logRequest( 'incrWithInit', $key, $conn->getServer(), $e );

	return $result;
}
425
/**
 * Change the expiry of a single item.
 *
 * Depending on $exptime the command issued is "persist" (TTL_INDEFINITE),
 * "expire" (relative expiry) or "expireAt" (absolute expiry).
 *
 * @param string $key
 * @param int $exptime
 * @param int $flags Not used by this implementation
 * @return bool|mixed The reply of the Redis client, or false when no connection could
 *  be obtained or the client threw
 */
protected function doChangeTTL( $key, $exptime, $flags ) {
	$conn = $this->getConnection( $key );
	if ( !$conn ) {
		return false;
	}

	$relative = $this->isRelativeExpiration( $exptime );
	if ( $exptime == self::TTL_INDEFINITE ) {
		$op = 'persist';
	} elseif ( $relative ) {
		$op = 'expire';
	} else {
		$op = 'expireAt';
	}

	$e = null;
	try {
		if ( $op === 'persist' ) {
			$result = $conn->persist( $key );
		} elseif ( $op === 'expire' ) {
			$result = $conn->expire( $key, $this->getExpirationAsTTL( $exptime ) );
		} else {
			$result = $conn->expireAt( $key, $this->getExpirationAsTimestamp( $exptime ) );
		}
	} catch ( RedisException $e ) {
		$result = false;
		$this->handleException( $conn, $e );
	}

	// Log once, after the attempt, so that exceptions are recorded too. logRequest()
	// treats a truthy last argument as a failure indicator, hence $e rather than $result.
	$this->logRequest( $op, $key, $conn->getServer(), $e );

	$this->updateOpStats( self::METRIC_OP_CHANGE_TTL, [ $key ] );

	return $result;
}
453
/**
 * Pick a server and connection handle for each of the given keys.
 *
 * For every key the candidate server tags are tried in order until a usable
 * connection is found. Connection handles are shared between keys that map to the
 * same server. When no candidate works for a key, the key is left out of the
 * returned maps, the success flag is cleared and the last error is set to
 * ERR_UNREACHABLE.
 *
 * @param string[] $keys
 * @return array Tuple of (map of server name => list of keys,
 *  map of server name => connection handle, bool whether every key got a server)
 */
protected function getConnectionsForKeys( array $keys ) {
	$keysByServer = [];
	$connByServer = [];
	$success = true;
	foreach ( $keys as $key ) {
		$candidateTags = $this->getCandidateServerTagsForKey( $key );

		$conn = null;
		// Find a suitable server for this key...
		// phpcs:ignore Generic.CodeAnalysis.AssignmentInCondition.FoundInWhileCondition
		while ( ( $tag = array_shift( $candidateTags ) ) !== null ) {
			$server = $this->serverTagMap[$tag];
			// Reuse connection handles for keys mapping to the same server
			if ( isset( $connByServer[$server] ) ) {
				$conn = $connByServer[$server];
			} else {
				$conn = $this->redisPool->getConnection( $server, $this->logger );
				if ( !$conn ) {
					continue;
				}
				// If automatic failover is enabled, check that the server's link
				// to its master (if any) is up -- but only if there are other
				// viable candidates left to consider. Also, getMasterLinkStatus()
				// does not work with twemproxy, though $candidates will be empty
				// by now in such cases.
				if ( $this->automaticFailover && $candidateTags ) {
					try {
						$info = $conn->info();
						if ( ( $info['master_link_status'] ?? null ) === 'down' ) {
							// If the master cannot be reached, fail-over to the next server.
							// If masters are in data-center A, and replica DBs in data-center B,
							// this helps avoid the case where fail-over happens in A but not
							// to the corresponding server in B (e.g. read/write mismatch).
							continue;
						}
					} catch ( RedisException $e ) {
						// Server is not accepting commands
						$this->redisPool->handleError( $conn, $e );
						continue;
					}
				}
				// Use this connection handle
				$connByServer[$server] = $conn;
			}
			// Use this server for this key
			$keysByServer[$server][] = $key;
			break;
		}

		if ( !$conn ) {
			// No suitable server found for this key
			$success = false;
			// Callers bail out with a bare false in this case, so record the reason
			$this->setLastError( BagOStuff::ERR_UNREACHABLE );
		}
	}

	return [ $keysByServer, $connByServer, $success ];
}
519
/**
 * Get a connection handle for the server that should hold the given key.
 *
 * @param string $key
 * @return RedisConnRef|null The connection handle, or null if no server was usable
 */
protected function getConnection( $key ) {
	// Only the connection map (second tuple element) is of interest here
	$connByServer = $this->getConnectionsForKeys( [ $key ] )[1];
	$conn = reset( $connByServer );

	// reset() yields false for an empty map; normalize that to null
	return $conn ? $conn : null;
}
530
/**
 * List the server tags to try for a key, in order of preference.
 *
 * With a single configured server the tag list is returned as-is. Otherwise the
 * tags are ordered by ArrayUtils::consistentHashSort() for the key, and only the
 * first one is kept when automatic failover is disabled.
 *
 * @param string $key
 * @return array List of server tags
 */
private function getCandidateServerTagsForKey( string $key ): array {
	$tags = array_keys( $this->serverTagMap );

	if ( count( $this->servers ) <= 1 ) {
		return $tags;
	}

	ArrayUtils::consistentHashSort( $tags, $key, '/' );

	return $this->automaticFailover ? $tags : array_slice( $tags, 0, 1 );
}
543
/**
 * Write a message to the logger at error level, prefixed with "Redis error: ".
 *
 * @param string $msg
 */
protected function logError( $msg ) {
	$this->logger->error( 'Redis error: ' . $msg );
}
552
/**
 * React to an exception thrown by the Redis client during an operation.
 *
 * Records ERR_UNEXPECTED as the last error of this store and hands the
 * connection and exception over to the connection pool's error handler.
 *
 * @param RedisConnRef $conn Connection on which the operation was attempted
 * @param RedisException $e The exception that was caught
 */
protected function handleException( RedisConnRef $conn, RedisException $e ) {
	// Flag the failure on this store first...
	$this->setLastError( BagOStuff::ERR_UNEXPECTED );
	// ...then let the pool deal with the affected connection
	$this->redisPool->handleError( $conn, $e );
}
566
/**
 * Send information about a single request to the debug log.
 *
 * @param string $op Name of the operation
 * @param string $keys Key, or comma-separated list of keys, the request was for
 * @param string $server Server the request was sent to
 * @param Exception|bool|null $e Failure indicator: any truthy value (an exception
 *  or true) is logged as "failure", anything falsy as "success"
 */
public function logRequest( $op, $keys, $server, $e = null ) {
	$outcome = $e ? "failure" : "success";
	$this->debug( "{$op}({$keys}) on {$server}: " . $outcome );
}
578}
579
// Register the un-namespaced name 'RedisBagOStuff' as an alias of the namespaced class,
// presumably for backwards compatibility with code predating the namespace — confirm.
581class_alias( RedisBagOStuff::class, 'RedisBagOStuff' );
A collection of static methods to play with arrays.
setLastError( $error)
Set the "last error" registry due to a problem encountered during an attempted operation.
const ATTR_DURABILITY
Key in getQoS() for durability of storage writes.
const QOS_DURABILITY_DISK
Storage survives on disk on a best-effort basis (e.g.
const ERR_UNEXPECTED
Storage operation failed due to usage limitations or an I/O error.
const ERR_UNREACHABLE
Storage operation could not establish a connection.
Helper class that implements most of BagOStuff for a backend.
getSerialized( $value, $key)
Get the serialized form of a value, logging a warning if it involves custom classes.
getExpirationAsTTL( $exptime)
Convert an optionally absolute expiry time to a relative time.
const PASS_BY_REF
Idiom for doGet() to return extra information by reference.
getExpirationAsTimestamp( $exptime)
Convert an optionally relative timestamp to an absolute time.
doDelete( $key, $flags=0)
Delete an item.
doChangeTTL( $key, $exptime, $flags)
array $serverTagMap
Map of (tag => server name)
doAdd( $key, $value, $exptime=0, $flags=0)
Insert an item if it does not already exist.
doIncrWithInit( $key, $exptime, $step, $init, $flags)
doSetMulti(array $data, $exptime=0, $flags=0)
array $servers
List of server names.
doGetMulti(array $keys, $flags=0)
Get an associative array containing the item for each of the keys that have items.
doGet( $key, $flags=0, &$casToken=null)
Get an item.
doChangeTTLMulti(array $keys, $exptime, $flags=0)
handleException(RedisConnRef $conn, RedisException $e)
The redis extension throws an exception in response to various read, write and protocol errors.
__construct( $params)
Construct a RedisBagOStuff object.
doSet( $key, $value, $exptime=0, $flags=0)
Set an item.
logRequest( $op, $keys, $server, $e=null)
Send information about a single request to the debug log.
Wrapper class for Redis connections that automatically reuses connections (via RAII pattern)
Manage one or more Redis client connections.