MediaWiki master
RedisBagOStuff.php
Go to the documentation of this file.
1<?php
20namespace Wikimedia\ObjectCache;
21
22use ArrayUtils;
23use Exception;
24use Redis;
25use RedisException;
26
44 protected $redisPool;
46 protected $servers;
48 protected $serverTagMap;
51
81 public function __construct( $params ) {
82 parent::__construct( $params );
83 $redisConf = [ 'serializer' => 'none' ]; // manage that in this class
84 foreach ( [ 'connectTimeout', 'persistent', 'password' ] as $opt ) {
85 if ( isset( $params[$opt] ) ) {
86 $redisConf[$opt] = $params[$opt];
87 }
88 }
89 $this->redisPool = RedisConnectionPool::singleton( $redisConf );
90
91 $this->servers = $params['servers'];
92 foreach ( $this->servers as $key => $server ) {
93 $this->serverTagMap[is_int( $key ) ? $server : $key] = $server;
94 }
95
96 $this->automaticFailover = $params['automaticFailover'] ?? true;
97
98 // ...and uses rdb snapshots (redis.conf default)
100 }
101
102 protected function doGet( $key, $flags = 0, &$casToken = null ) {
103 $getToken = ( $casToken === self::PASS_BY_REF );
104 $casToken = null;
105
106 $conn = $this->getConnection( $key );
107 if ( !$conn ) {
108 return false;
109 }
110
111 $e = null;
112 try {
113 $blob = $conn->get( $key );
114 if ( $blob !== false ) {
115 $value = $this->unserialize( $blob );
116 $valueSize = strlen( $blob );
117 } else {
118 $value = false;
119 $valueSize = false;
120 }
121 if ( $getToken && $value !== false ) {
122 $casToken = $blob;
123 }
124 } catch ( RedisException $e ) {
125 $value = false;
126 $valueSize = false;
127 $this->handleException( $conn, $e );
128 }
129
130 $this->logRequest( 'get', $key, $conn->getServer(), $e );
131
132 $this->updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, $valueSize ] ] );
133
134 return $value;
135 }
136
137 protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
138 $conn = $this->getConnection( $key );
139 if ( !$conn ) {
140 return false;
141 }
142
143 $ttl = $this->getExpirationAsTTL( $exptime );
144 $serialized = $this->getSerialized( $value, $key );
145 $valueSize = strlen( $serialized );
146
147 $e = null;
148 try {
149 if ( $ttl ) {
150 $result = $conn->setex( $key, $ttl, $serialized );
151 } else {
152 $result = $conn->set( $key, $serialized );
153 }
154 } catch ( RedisException $e ) {
155 $result = false;
156 $this->handleException( $conn, $e );
157 }
158
159 $this->logRequest( 'set', $key, $conn->getServer(), $e );
160
161 $this->updateOpStats( self::METRIC_OP_SET, [ $key => [ $valueSize, 0 ] ] );
162
163 return $result;
164 }
165
166 protected function doDelete( $key, $flags = 0 ) {
167 $conn = $this->getConnection( $key );
168 if ( !$conn ) {
169 return false;
170 }
171
172 $e = null;
173 try {
174 // Note that redis does not return false if the key was not there
175 $result = ( $conn->del( $key ) !== false );
176 } catch ( RedisException $e ) {
177 $result = false;
178 $this->handleException( $conn, $e );
179 }
180
181 $this->logRequest( 'delete', $key, $conn->getServer(), $e );
182
183 $this->updateOpStats( self::METRIC_OP_DELETE, [ $key ] );
184
185 return $result;
186 }
187
188 protected function doGetMulti( array $keys, $flags = 0 ) {
189 $blobsFound = [];
190
191 [ $keysByServer, $connByServer ] = $this->getConnectionsForKeys( $keys );
192 foreach ( $keysByServer as $server => $batchKeys ) {
193 $conn = $connByServer[$server];
194
195 $e = null;
196 try {
197 // Avoid mget() to reduce CPU hogging from a single request
198 $conn->multi( Redis::PIPELINE );
199 foreach ( $batchKeys as $key ) {
200 $conn->get( $key );
201 }
202 $batchResult = $conn->exec();
203 if ( $batchResult === false ) {
204 $this->logRequest( 'get', implode( ',', $batchKeys ), $server, true );
205 continue;
206 }
207
208 foreach ( $batchResult as $i => $blob ) {
209 if ( $blob !== false ) {
210 $blobsFound[$batchKeys[$i]] = $blob;
211 }
212 }
213 } catch ( RedisException $e ) {
214 $this->handleException( $conn, $e );
215 }
216
217 $this->logRequest( 'get', implode( ',', $batchKeys ), $server, $e );
218 }
219
220 // Preserve the order of $keys
221 $result = [];
222 $valueSizesByKey = [];
223 foreach ( $keys as $key ) {
224 if ( array_key_exists( $key, $blobsFound ) ) {
225 $blob = $blobsFound[$key];
226 $value = $this->unserialize( $blob );
227 if ( $value !== false ) {
228 $result[$key] = $value;
229 }
230 $valueSize = strlen( $blob );
231 } else {
232 $valueSize = false;
233 }
234 $valueSizesByKey[$key] = [ 0, $valueSize ];
235 }
236
237 $this->updateOpStats( self::METRIC_OP_GET, $valueSizesByKey );
238
239 return $result;
240 }
241
242 protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
243 $ttl = $this->getExpirationAsTTL( $exptime );
244 $op = $ttl ? 'setex' : 'set';
245
246 $keys = array_keys( $data );
247 $valueSizesByKey = [];
248
249 [ $keysByServer, $connByServer, $result ] = $this->getConnectionsForKeys( $keys );
250 foreach ( $keysByServer as $server => $batchKeys ) {
251 $conn = $connByServer[$server];
252
253 $e = null;
254 try {
255 // Avoid mset() to reduce CPU hogging from a single request
256 $conn->multi( Redis::PIPELINE );
257 foreach ( $batchKeys as $key ) {
258 $serialized = $this->getSerialized( $data[$key], $key );
259 if ( $ttl ) {
260 $conn->setex( $key, $ttl, $serialized );
261 } else {
262 $conn->set( $key, $serialized );
263 }
264 $valueSizesByKey[$key] = [ strlen( $serialized ), 0 ];
265 }
266 $batchResult = $conn->exec();
267 if ( $batchResult === false ) {
268 $result = false;
269 $this->logRequest( $op, implode( ',', $batchKeys ), $server, true );
270 continue;
271 }
272
273 $result = $result && !in_array( false, $batchResult, true );
274 } catch ( RedisException $e ) {
275 $this->handleException( $conn, $e );
276 $result = false;
277 }
278
279 $this->logRequest( $op, implode( ',', $batchKeys ), $server, $e );
280 }
281
282 $this->updateOpStats( self::METRIC_OP_SET, $valueSizesByKey );
283
284 return $result;
285 }
286
287 protected function doDeleteMulti( array $keys, $flags = 0 ) {
288 [ $keysByServer, $connByServer, $result ] = $this->getConnectionsForKeys( $keys );
289 foreach ( $keysByServer as $server => $batchKeys ) {
290 $conn = $connByServer[$server];
291
292 $e = null;
293 try {
294 // Avoid delete() with array to reduce CPU hogging from a single request
295 $conn->multi( Redis::PIPELINE );
296 foreach ( $batchKeys as $key ) {
297 $conn->del( $key );
298 }
299 $batchResult = $conn->exec();
300 if ( $batchResult === false ) {
301 $result = false;
302 $this->logRequest( 'delete', implode( ',', $batchKeys ), $server, true );
303 continue;
304 }
305 // Note that redis does not return false if the key was not there
306 $result = $result && !in_array( false, $batchResult, true );
307 } catch ( RedisException $e ) {
308 $this->handleException( $conn, $e );
309 $result = false;
310 }
311
312 $this->logRequest( 'delete', implode( ',', $batchKeys ), $server, $e );
313 }
314
315 $this->updateOpStats( self::METRIC_OP_DELETE, array_values( $keys ) );
316
317 return $result;
318 }
319
320 public function doChangeTTLMulti( array $keys, $exptime, $flags = 0 ) {
321 $relative = $this->isRelativeExpiration( $exptime );
322 $op = ( $exptime == self::TTL_INDEFINITE )
323 ? 'persist'
324 : ( $relative ? 'expire' : 'expireAt' );
325
326 [ $keysByServer, $connByServer, $result ] = $this->getConnectionsForKeys( $keys );
327 foreach ( $keysByServer as $server => $batchKeys ) {
328 $conn = $connByServer[$server];
329
330 $e = null;
331 try {
332 $conn->multi( Redis::PIPELINE );
333 foreach ( $batchKeys as $key ) {
334 if ( $exptime == self::TTL_INDEFINITE ) {
335 $conn->persist( $key );
336 } elseif ( $relative ) {
337 $conn->expire( $key, $this->getExpirationAsTTL( $exptime ) );
338 } else {
339 $conn->expireAt( $key, $this->getExpirationAsTimestamp( $exptime ) );
340 }
341 }
342 $batchResult = $conn->exec();
343 if ( $batchResult === false ) {
344 $result = false;
345 $this->logRequest( $op, implode( ',', $batchKeys ), $server, true );
346 continue;
347 }
348 $result = in_array( false, $batchResult, true ) ? false : $result;
349 } catch ( RedisException $e ) {
350 $this->handleException( $conn, $e );
351 $result = false;
352 }
353
354 $this->logRequest( $op, implode( ',', $batchKeys ), $server, $e );
355 }
356
357 $this->updateOpStats( self::METRIC_OP_CHANGE_TTL, array_values( $keys ) );
358
359 return $result;
360 }
361
362 protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
363 $conn = $this->getConnection( $key );
364 if ( !$conn ) {
365 return false;
366 }
367
368 $ttl = $this->getExpirationAsTTL( $exptime );
369 $serialized = $this->getSerialized( $value, $key );
370 $valueSize = strlen( $serialized );
371
372 try {
373 $result = $conn->set(
374 $key,
375 $serialized,
376 $ttl ? [ 'nx', 'ex' => $ttl ] : [ 'nx' ]
377 );
378 } catch ( RedisException $e ) {
379 $result = false;
380 $this->handleException( $conn, $e );
381 }
382
383 $this->logRequest( 'add', $key, $conn->getServer(), $result );
384
385 $this->updateOpStats( self::METRIC_OP_ADD, [ $key => [ $valueSize, 0 ] ] );
386
387 return $result;
388 }
389
390 protected function doIncrWithInit( $key, $exptime, $step, $init, $flags ) {
391 $conn = $this->getConnection( $key );
392 if ( !$conn ) {
393 return false;
394 }
395
396 $ttl = $this->getExpirationAsTTL( $exptime );
397 try {
398 static $script =
400<<<LUA
401 local key = KEYS[1]
402 local ttl, step, init = unpack( ARGV )
403 if redis.call( 'exists', key ) == 1 then
404 return redis.call( 'incrBy', key, step )
405 end
406 if 1 * ttl ~= 0 then
407 redis.call( 'setex', key, ttl, init )
408 else
409 redis.call( 'set', key, init )
410 end
411 return 1 * init
412LUA;
413 $result = $conn->luaEval( $script, [ $key, $ttl, $step, $init ], 1 );
414 } catch ( RedisException $e ) {
415 $result = false;
416 $this->handleException( $conn, $e );
417 }
418 $this->logRequest( 'incrWithInit', $key, $conn->getServer(), $result );
419
420 return $result;
421 }
422
423 protected function doChangeTTL( $key, $exptime, $flags ) {
424 $conn = $this->getConnection( $key );
425 if ( !$conn ) {
426 return false;
427 }
428
429 $relative = $this->isRelativeExpiration( $exptime );
430 try {
431 if ( $exptime == self::TTL_INDEFINITE ) {
432 $result = $conn->persist( $key );
433 $this->logRequest( 'persist', $key, $conn->getServer(), $result );
434 } elseif ( $relative ) {
435 $result = $conn->expire( $key, $this->getExpirationAsTTL( $exptime ) );
436 $this->logRequest( 'expire', $key, $conn->getServer(), $result );
437 } else {
438 $result = $conn->expireAt( $key, $this->getExpirationAsTimestamp( $exptime ) );
439 $this->logRequest( 'expireAt', $key, $conn->getServer(), $result );
440 }
441 } catch ( RedisException $e ) {
442 $result = false;
443 $this->handleException( $conn, $e );
444 }
445
446 $this->updateOpStats( self::METRIC_OP_CHANGE_TTL, [ $key ] );
447
448 return $result;
449 }
450
457 protected function getConnectionsForKeys( array $keys ) {
458 $keysByServer = [];
459 $connByServer = [];
460 $success = true;
461 foreach ( $keys as $key ) {
462 $candidateTags = $this->getCandidateServerTagsForKey( $key );
463
464 $conn = null;
465 // Find a suitable server for this key...
466 // phpcs:ignore Generic.CodeAnalysis.AssignmentInCondition.FoundInWhileCondition
467 while ( ( $tag = array_shift( $candidateTags ) ) !== null ) {
468 $server = $this->serverTagMap[$tag];
469 // Reuse connection handles for keys mapping to the same server
470 if ( isset( $connByServer[$server] ) ) {
471 $conn = $connByServer[$server];
472 } else {
473 $conn = $this->redisPool->getConnection( $server, $this->logger );
474 if ( !$conn ) {
475 continue;
476 }
477 // If automatic failover is enabled, check that the server's link
478 // to its master (if any) is up -- but only if there are other
479 // viable candidates left to consider. Also, getMasterLinkStatus()
480 // does not work with twemproxy, though $candidates will be empty
481 // by now in such cases.
482 if ( $this->automaticFailover && $candidateTags ) {
483 try {
485 $info = $conn->info();
486 if ( ( $info['master_link_status'] ?? null ) === 'down' ) {
487 // If the master cannot be reached, fail-over to the next server.
488 // If masters are in data-center A, and replica DBs in data-center B,
489 // this helps avoid the case were fail-over happens in A but not
490 // to the corresponding server in B (e.g. read/write mismatch).
491 continue;
492 }
493 } catch ( RedisException $e ) {
494 // Server is not accepting commands
495 $this->redisPool->handleError( $conn, $e );
496 continue;
497 }
498 }
499 // Use this connection handle
500 $connByServer[$server] = $conn;
501 }
502 // Use this server for this key
503 $keysByServer[$server][] = $key;
504 break;
505 }
506
507 if ( !$conn ) {
508 // No suitable server found for this key
509 $success = false;
511 }
512 }
513
514 return [ $keysByServer, $connByServer, $success ];
515 }
516
522 protected function getConnection( $key ) {
523 [ , $connByServer ] = $this->getConnectionsForKeys( [ $key ] );
524
525 return reset( $connByServer ) ?: null;
526 }
527
528 private function getCandidateServerTagsForKey( string $key ): array {
529 $candidates = array_keys( $this->serverTagMap );
530
531 if ( count( $this->servers ) > 1 ) {
532 ArrayUtils::consistentHashSort( $candidates, $key, '/' );
533 if ( !$this->automaticFailover ) {
534 $candidates = array_slice( $candidates, 0, 1 );
535 }
536 }
537
538 return $candidates;
539 }
540
546 protected function logError( $msg ) {
547 $this->logger->error( "Redis error: $msg" );
548 }
549
559 protected function handleException( RedisConnRef $conn, RedisException $e ) {
561 $this->redisPool->handleError( $conn, $e );
562 }
563
572 public function logRequest( $op, $keys, $server, $e = null ) {
573 $this->debug( "$op($keys) on $server: " . ( $e ? "failure" : "success" ) );
574 }
575}
576
578class_alias( RedisBagOStuff::class, 'RedisBagOStuff' );
array $params
The job parameters.
setLastError( $error)
This is actually implemented in the Job class.
A collection of static methods to play with arrays.
static consistentHashSort(&$array, $key, $separator="\000")
Sort the given array in a pseudo-random order which depends only on the given key and each element va...
setLastError( $error)
Set the "last error" registry due to a problem encountered during an attempted operation.
Helper classs that implements most of BagOStuff for a backend.
getSerialized( $value, $key)
Get the serialized form a value, logging a warning if it involves custom classes.
getExpirationAsTTL( $exptime)
Convert an optionally absolute expiry time to a relative time.
const PASS_BY_REF
Idiom for doGet() to return extra information by reference.
getExpirationAsTimestamp( $exptime)
Convert an optionally relative timestamp to an absolute time.
doDelete( $key, $flags=0)
Delete an item.
doChangeTTL( $key, $exptime, $flags)
array $serverTagMap
Map of (tag => server name)
doAdd( $key, $value, $exptime=0, $flags=0)
Insert an item if it does not already exist.
doIncrWithInit( $key, $exptime, $step, $init, $flags)
doSetMulti(array $data, $exptime=0, $flags=0)
array $servers
List of server names.
doGetMulti(array $keys, $flags=0)
Get an associative array containing the item for each of the keys that have items.
doGet( $key, $flags=0, &$casToken=null)
Get an item.
doChangeTTLMulti(array $keys, $exptime, $flags=0)
handleException(RedisConnRef $conn, RedisException $e)
The redis extension throws an exception in response to various read, write and protocol errors.
__construct( $params)
Construct a RedisBagOStuff object.
doSet( $key, $value, $exptime=0, $flags=0)
Set an item.
logRequest( $op, $keys, $server, $e=null)
Send information about a single request to the debug log.
Wrapper class for Redis connections that automatically reuses connections (via RAII pattern)
Manage one or more Redis client connection.
const QOS_DURABILITY_DISK
Data is saved to disk and writes do not usually block on fsync()
const ERR_UNREACHABLE
Storage medium could not be reached to establish a connection.
const ATTR_DURABILITY
Durability of writes; see QOS_DURABILITY_* (higher means stronger)
const ERR_UNEXPECTED
Storage medium operation failed due to usage limitations or an I/O error.