MediaWiki master
RedisBagOStuff.php
Go to the documentation of this file.
1<?php
23namespace Wikimedia\ObjectCache;
24
25use ArrayUtils;
26use Exception;
27use Redis;
29use RedisConnRef;
30use RedisException;
31
/** @var RedisConnectionPool Connection pool; assigned from RedisConnectionPool::singleton() in the constructor */
44 protected $redisPool;
/** @var array List of server names */
46 protected $servers;
/** @var array Map of (tag => server name) */
48 protected $serverTagMap;
51
/**
 * Construct a RedisBagOStuff object.
 *
 * Keys of $params that are read here:
 *   - servers: array of server names. An entry with a string key uses that key
 *     as the server's tag; an entry with an integer key uses the server name
 *     itself as the tag.
 *   - connectTimeout, persistent, password: copied into the connection pool
 *     configuration when present.
 *   - automaticFailover: whether more than one candidate server may be tried
 *     for a key. Defaults to true.
 *
 * NOTE(review): this listing has gaps in its line numbering, so statements of
 * the original method may be missing from this view -- confirm against the
 * upstream file before changing the code.
 *
 * @param array $params
 */
81 public function __construct( $params ) {
82 parent::__construct( $params );
83 $redisConf = [ 'serializer' => 'none' ]; // manage that in this class
84 foreach ( [ 'connectTimeout', 'persistent', 'password' ] as $opt ) {
85 if ( isset( $params[$opt] ) ) {
86 $redisConf[$opt] = $params[$opt];
87 }
88 }
89 $this->redisPool = RedisConnectionPool::singleton( $redisConf );
90
91 $this->servers = $params['servers'];
// Build the tag => server map used when picking candidate servers for a key
92 foreach ( $this->servers as $key => $server ) {
93 $this->serverTagMap[is_int( $key ) ? $server : $key] = $server;
94 }
95
96 $this->automaticFailover = $params['automaticFailover'] ?? true;
97
98 // ...and uses rdb snapshots (redis.conf default)
100 }
101
/**
 * Get an item.
 *
 * @param string $key
 * @param int $flags Not consulted by this method
 * @param mixed &$casToken When passed in as self::PASS_BY_REF, receives the raw
 *   stored blob if a value was found; it is reset to null in every other case
 * @return mixed The unserialized value, or false if nothing usable was fetched
 */
protected function doGet( $key, $flags = 0, &$casToken = null ) {
	$wantsToken = ( $casToken === self::PASS_BY_REF );
	$casToken = null;

	$conn = $this->getConnection( $key );
	if ( !$conn ) {
		return false;
	}

	// Start from the "miss" state; only a returned blob changes it
	$value = false;
	$valueSize = false;

	$e = null;
	try {
		$blob = $conn->get( $key );
		if ( $blob !== false ) {
			$value = $this->unserialize( $blob );
			$valueSize = strlen( $blob );
			if ( $wantsToken && $value !== false ) {
				$casToken = $blob;
			}
		}
	} catch ( RedisException $e ) {
		$value = false;
		$valueSize = false;
		$this->handleException( $conn, $e );
	}

	$this->logRequest( 'get', $key, $conn->getServer(), $e );

	$sizes = [ 0, $valueSize ];
	$this->updateOpStats( self::METRIC_OP_GET, [ $key => $sizes ] );

	return $value;
}
136
/**
 * Set an item.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime Expiry; converted to a TTL via getExpirationAsTTL()
 * @param int $flags Not consulted by this method
 * @return mixed Result of the redis set()/setex() call; false when no
 *   connection is available or a RedisException was caught
 */
protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
	$conn = $this->getConnection( $key );
	if ( !$conn ) {
		return false;
	}

	$ttl = $this->getExpirationAsTTL( $exptime );
	$serialized = $this->getSerialized( $value, $key );

	$e = null;
	try {
		// A falsy TTL means "no expiry", which needs plain set()
		$result = $ttl
			? $conn->setex( $key, $ttl, $serialized )
			: $conn->set( $key, $serialized );
	} catch ( RedisException $e ) {
		$result = false;
		$this->handleException( $conn, $e );
	}

	$this->logRequest( 'set', $key, $conn->getServer(), $e );

	$sizes = [ strlen( $serialized ), 0 ];
	$this->updateOpStats( self::METRIC_OP_SET, [ $key => $sizes ] );

	return $result;
}
165
/**
 * Delete an item.
 *
 * @param string $key
 * @param int $flags Not consulted by this method
 * @return bool False when no connection is available, when del() returns
 *   false, or when a RedisException was caught; true otherwise
 */
protected function doDelete( $key, $flags = 0 ) {
	$conn = $this->getConnection( $key );
	if ( !$conn ) {
		return false;
	}

	$e = null;
	try {
		// Note that redis does not return false if the key was not there
		$reply = $conn->del( $key );
		$result = ( $reply !== false );
	} catch ( RedisException $e ) {
		$result = false;
		$this->handleException( $conn, $e );
	}

	$server = $conn->getServer();
	$this->logRequest( 'delete', $key, $server, $e );

	$this->updateOpStats( self::METRIC_OP_DELETE, [ $key ] );

	return $result;
}
187
/**
 * Get an associative array containing the item for each of the keys that have items.
 *
 * Keys are grouped per server and fetched with one pipelined batch per server.
 *
 * @param string[] $keys
 * @param int $flags Not consulted by this method
 * @return array Map of (key => unserialized value), in the order of $keys
 */
protected function doGetMulti( array $keys, $flags = 0 ) {
	$blobsByKey = [];

	[ $keysByServer, $connByServer ] = $this->getConnectionsForKeys( $keys );
	foreach ( $keysByServer as $server => $serverKeys ) {
		$conn = $connByServer[$server];
		$keyList = implode( ',', $serverKeys );

		$e = null;
		try {
			// Avoid mget() to reduce CPU hogging from a single request
			$conn->multi( Redis::PIPELINE );
			foreach ( $serverKeys as $key ) {
				$conn->get( $key );
			}
			$replies = $conn->exec();
			if ( $replies === false ) {
				// The whole batch failed; log it as a failure and move on
				$this->logRequest( 'get', $keyList, $server, true );
				continue;
			}

			// Replies come back in the same order the commands were queued
			foreach ( $replies as $pos => $blob ) {
				if ( $blob === false ) {
					continue;
				}
				$blobsByKey[$serverKeys[$pos]] = $blob;
			}
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );
		}

		$this->logRequest( 'get', $keyList, $server, $e );
	}

	// Preserve the order of $keys
	$result = [];
	$valueSizesByKey = [];
	foreach ( $keys as $key ) {
		$valueSize = false;
		if ( array_key_exists( $key, $blobsByKey ) ) {
			$blob = $blobsByKey[$key];
			$valueSize = strlen( $blob );
			$value = $this->unserialize( $blob );
			if ( $value !== false ) {
				$result[$key] = $value;
			}
		}
		$valueSizesByKey[$key] = [ 0, $valueSize ];
	}

	$this->updateOpStats( self::METRIC_OP_GET, $valueSizesByKey );

	return $result;
}
241
/**
 * Store several items, using one pipelined batch per server.
 *
 * @param array $data Map of (key => value)
 * @param int $exptime Expiry; converted to a TTL via getExpirationAsTTL()
 * @param int $flags Not consulted by this method
 * @return bool False if any key had no usable server, any batch failed, or
 *   any individual reply was false
 */
protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
	$ttl = $this->getExpirationAsTTL( $exptime );
	// Name of the redis command, used for logging only
	$op = 'set';
	if ( $ttl ) {
		$op = 'setex';
	}

	$valueSizesByKey = [];

	[ $keysByServer, $connByServer, $result ] =
		$this->getConnectionsForKeys( array_keys( $data ) );
	foreach ( $keysByServer as $server => $serverKeys ) {
		$conn = $connByServer[$server];
		$keyList = implode( ',', $serverKeys );

		$e = null;
		try {
			// Avoid mset() to reduce CPU hogging from a single request
			$conn->multi( Redis::PIPELINE );
			foreach ( $serverKeys as $key ) {
				$serialized = $this->getSerialized( $data[$key], $key );
				if ( $ttl ) {
					$conn->setex( $key, $ttl, $serialized );
				} else {
					$conn->set( $key, $serialized );
				}
				$valueSizesByKey[$key] = [ strlen( $serialized ), 0 ];
			}
			$replies = $conn->exec();
			if ( $replies === false ) {
				$result = false;
				$this->logRequest( $op, $keyList, $server, true );
				continue;
			}

			$allStored = !in_array( false, $replies, true );
			$result = $result && $allStored;
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );
			$result = false;
		}

		$this->logRequest( $op, $keyList, $server, $e );
	}

	$this->updateOpStats( self::METRIC_OP_SET, $valueSizesByKey );

	return $result;
}
286
/**
 * Delete several items, using one pipelined batch per server.
 *
 * @param string[] $keys
 * @param int $flags Not consulted by this method
 * @return bool False if any key had no usable server, any batch failed, or
 *   any individual reply was false
 */
protected function doDeleteMulti( array $keys, $flags = 0 ) {
	[ $keysByServer, $connByServer, $result ] = $this->getConnectionsForKeys( $keys );
	foreach ( $keysByServer as $server => $serverKeys ) {
		$conn = $connByServer[$server];
		$keyList = implode( ',', $serverKeys );

		$e = null;
		try {
			// Avoid delete() with array to reduce CPU hogging from a single request
			$conn->multi( Redis::PIPELINE );
			foreach ( $serverKeys as $key ) {
				$conn->del( $key );
			}
			$replies = $conn->exec();
			if ( $replies === false ) {
				$result = false;
				$this->logRequest( 'delete', $keyList, $server, true );
				continue;
			}
			// Note that redis does not return false if the key was not there
			$noneFailed = !in_array( false, $replies, true );
			$result = $result && $noneFailed;
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );
			$result = false;
		}

		$this->logRequest( 'delete', $keyList, $server, $e );
	}

	$this->updateOpStats( self::METRIC_OP_DELETE, array_values( $keys ) );

	return $result;
}
319
/**
 * Change the expiry of several items, using one pipelined batch per server.
 *
 * Depending on $exptime the redis command is persist (indefinite TTL),
 * expire (relative expiry) or expireAt (any other expiry).
 *
 * @param string[] $keys
 * @param int $exptime
 * @param int $flags Not consulted by this method
 * @return bool False if any key had no usable server, any batch failed, or
 *   any individual reply was false
 */
public function doChangeTTLMulti( array $keys, $exptime, $flags = 0 ) {
	$relative = $this->isRelativeExpiration( $exptime );
	if ( $exptime == self::TTL_INDEFINITE ) {
		$op = 'persist';
	} elseif ( $relative ) {
		$op = 'expire';
	} else {
		$op = 'expireAt';
	}

	[ $keysByServer, $connByServer, $result ] = $this->getConnectionsForKeys( $keys );
	foreach ( $keysByServer as $server => $serverKeys ) {
		$conn = $connByServer[$server];
		$keyList = implode( ',', $serverKeys );

		$e = null;
		try {
			$conn->multi( Redis::PIPELINE );
			foreach ( $serverKeys as $key ) {
				// $op was derived from the same conditions, so dispatch on it
				switch ( $op ) {
					case 'persist':
						$conn->persist( $key );
						break;
					case 'expire':
						$conn->expire( $key, $this->getExpirationAsTTL( $exptime ) );
						break;
					default:
						$conn->expireAt( $key, $this->getExpirationAsTimestamp( $exptime ) );
						break;
				}
			}
			$replies = $conn->exec();
			if ( $replies === false ) {
				$result = false;
				$this->logRequest( $op, $keyList, $server, true );
				continue;
			}
			if ( in_array( false, $replies, true ) ) {
				$result = false;
			}
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );
			$result = false;
		}

		$this->logRequest( $op, $keyList, $server, $e );
	}

	$this->updateOpStats( self::METRIC_OP_CHANGE_TTL, array_values( $keys ) );

	return $result;
}
361
/**
 * Insert an item if it does not already exist.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime Expiry; converted to a TTL via getExpirationAsTTL()
 * @param int $flags Not consulted by this method
 * @return mixed Result of the redis set() call made with the 'nx' option;
 *   false when no connection is available or a RedisException was caught
 */
protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
	$conn = $this->getConnection( $key );
	if ( !$conn ) {
		return false;
	}

	$ttl = $this->getExpirationAsTTL( $exptime );
	$serialized = $this->getSerialized( $value, $key );
	$valueSize = strlen( $serialized );

	$e = null;
	try {
		// 'nx' makes redis store the value only if the key is absent
		$result = $conn->set(
			$key,
			$serialized,
			$ttl ? [ 'nx', 'ex' => $ttl ] : [ 'nx' ]
		);
	} catch ( RedisException $e ) {
		$result = false;
		$this->handleException( $conn, $e );
	}

	// logRequest() treats a truthy 4th argument as a failure, so pass the
	// caught exception (or null), not $result, exactly as doSet() does.
	$this->logRequest( 'add', $key, $conn->getServer(), $e );

	$this->updateOpStats( self::METRIC_OP_ADD, [ $key => [ $valueSize, 0 ] ] );

	return $result;
}
389
/**
 * Increase the value of a key by $step, or set it to $init if it does not exist.
 *
 * The check and the write happen inside a single Lua script evaluated on the
 * redis server.
 *
 * @param string $key
 * @param int $exptime Expiry applied when the key is created; converted to a
 *   TTL via getExpirationAsTTL()
 * @param int $step Amount to add when the key already exists
 * @param int $init Value stored when the key does not exist
 * @param int $flags Not consulted by this method
 * @return mixed Result of the script evaluation; false when no connection is
 *   available or a RedisException was caught
 */
protected function doIncrWithInit( $key, $exptime, $step, $init, $flags ) {
	$conn = $this->getConnection( $key );
	if ( !$conn ) {
		return false;
	}

	$ttl = $this->getExpirationAsTTL( $exptime );

	$e = null;
	try {
		static $script =
		/** @lang Lua */
<<<LUA
		local key = KEYS[1]
		local ttl, step, init = unpack( ARGV )
		if redis.call( 'exists', key ) == 1 then
			return redis.call( 'incrBy', key, step )
		end
		if 1 * ttl ~= 0 then
			redis.call( 'setex', key, ttl, init )
		else
			redis.call( 'set', key, init )
		end
		return 1 * init
LUA;
		$result = $conn->luaEval( $script, [ $key, $ttl, $step, $init ], 1 );
	} catch ( RedisException $e ) {
		$result = false;
		$this->handleException( $conn, $e );
	}

	// logRequest() treats a truthy 4th argument as a failure. Passing $result
	// would log every non-zero counter value as a failure, so pass the caught
	// exception (or null) instead.
	$this->logRequest( 'incrWithInit', $key, $conn->getServer(), $e );

	return $result;
}
422
/**
 * Change the expiry of a single item.
 *
 * Depending on $exptime the redis command is persist (indefinite TTL),
 * expire (relative expiry) or expireAt (any other expiry).
 *
 * @param string $key
 * @param int $exptime
 * @param int $flags Not consulted by this method
 * @return mixed Result of the redis call; false when no connection is
 *   available or a RedisException was caught
 */
protected function doChangeTTL( $key, $exptime, $flags ) {
	$conn = $this->getConnection( $key );
	if ( !$conn ) {
		return false;
	}

	$relative = $this->isRelativeExpiration( $exptime );
	// Command name for logging, named the same way as in doChangeTTLMulti()
	if ( $exptime == self::TTL_INDEFINITE ) {
		$op = 'persist';
	} elseif ( $relative ) {
		$op = 'expire';
	} else {
		$op = 'expireAt';
	}

	$e = null;
	try {
		if ( $exptime == self::TTL_INDEFINITE ) {
			$result = $conn->persist( $key );
		} elseif ( $relative ) {
			$result = $conn->expire( $key, $this->getExpirationAsTTL( $exptime ) );
		} else {
			$result = $conn->expireAt( $key, $this->getExpirationAsTimestamp( $exptime ) );
		}
	} catch ( RedisException $e ) {
		$result = false;
		$this->handleException( $conn, $e );
	}

	// Log once, after the try/catch, so requests that threw are logged too.
	// logRequest() treats a truthy 4th argument as a failure, so pass the
	// caught exception (or null) rather than $result.
	$this->logRequest( $op, $key, $conn->getServer(), $e );

	$this->updateOpStats( self::METRIC_OP_CHANGE_TTL, [ $key ] );

	return $result;
}
450
/**
 * Pick a server and a connection handle for each of the given keys.
 *
 * Candidate server tags for a key are tried in the order returned by
 * getCandidateServerTagsForKey() until one yields a usable connection.
 * Connection handles are shared between keys that map to the same server.
 *
 * NOTE(review): this listing has gaps in its line numbering inside this
 * method, so statements of the original may be missing from this view --
 * confirm against the upstream file before changing the code.
 *
 * @param string[] $keys
 * @return array Tuple of (map of server => list of keys, map of server =>
 *   connection handle, bool that is false if some key had no usable server)
 */
457 protected function getConnectionsForKeys( array $keys ) {
458 $keysByServer = [];
459 $connByServer = [];
460 $success = true;
461 foreach ( $keys as $key ) {
462 $candidateTags = $this->getCandidateServerTagsForKey( $key );
463
464 $conn = null;
465 // Find a suitable server for this key...
466 while ( ( $tag = array_shift( $candidateTags ) ) !== null ) {
467 $server = $this->serverTagMap[$tag];
468 // Reuse connection handles for keys mapping to the same server
469 if ( isset( $connByServer[$server] ) ) {
470 $conn = $connByServer[$server];
471 } else {
472 $conn = $this->redisPool->getConnection( $server, $this->logger );
473 if ( !$conn ) {
// No connection to this server; try the next candidate
474 continue;
475 }
476 // If automatic failover is enabled, check that the server's link
477 // to its master (if any) is up -- but only if there are other
478 // viable candidates left to consider. Also, getMasterLinkStatus()
479 // does not work with twemproxy, though $candidateTags will be empty
480 // by now in such cases.
481 if ( $this->automaticFailover && $candidateTags ) {
482 try {
484 $info = $conn->info();
485 if ( ( $info['master_link_status'] ?? null ) === 'down' ) {
486 // If the master cannot be reached, fail-over to the next server.
487 // If masters are in data-center A, and replica DBs in data-center B,
488 // this helps avoid the case where fail-over happens in A but not
489 // to the corresponding server in B (e.g. read/write mismatch).
490 continue;
491 }
492 } catch ( RedisException $e ) {
493 // Server is not accepting commands
494 $this->redisPool->handleError( $conn, $e );
495 continue;
496 }
497 }
498 // Use this connection handle
499 $connByServer[$server] = $conn;
500 }
501 // Use this server for this key
502 $keysByServer[$server][] = $key;
503 break;
504 }
505
506 if ( !$conn ) {
507 // No suitable server found for this key
508 $success = false;
510 }
511 }
512
513 return [ $keysByServer, $connByServer, $success ];
514 }
515
/**
 * Get the connection handle to use for a single key.
 *
 * @param string $key
 * @return mixed The first connection handle returned by
 *   getConnectionsForKeys() for this key, or null if there is none
 */
protected function getConnection( $key ) {
	$lookup = $this->getConnectionsForKeys( [ $key ] );
	$connByServer = $lookup[1];

	// reset() yields false on an empty array; normalise that to null
	$first = reset( $connByServer );

	return $first ?: null;
}
526
/**
 * List the server tags that may be used for a key, in order of preference.
 *
 * With a single configured server the tag list is returned as is. Otherwise
 * the tags are ordered by ArrayUtils::consistentHashSort() for the key, and
 * only the first one is kept when automatic failover is disabled.
 *
 * @param string $key
 * @return array List of server tags
 */
private function getCandidateServerTagsForKey( string $key ): array {
	$tags = array_keys( $this->serverTagMap );

	if ( count( $this->servers ) <= 1 ) {
		return $tags;
	}

	ArrayUtils::consistentHashSort( $tags, $key, '/' );

	return $this->automaticFailover
		? $tags
		: array_slice( $tags, 0, 1 );
}
539
/**
 * Write a message to the logger at error level, prefixed with "Redis error: ".
 *
 * @param string $msg
 */
protected function logError( $msg ) {
	$text = 'Redis error: ' . $msg;
	$this->logger->error( $text );
}
548
/**
 * The redis extension throws an exception in response to various read, write
 * and protocol errors. This hands the exception, together with the connection
 * it occurred on, to the connection pool's handleError().
 *
 * NOTE(review): this listing has a gap in its line numbering inside this
 * method, so a statement of the original may be missing from this view --
 * confirm against the upstream file before changing the code.
 *
 * @param RedisConnRef $conn
 * @param RedisException $e
 */
558 protected function handleException( RedisConnRef $conn, RedisException $e ) {
560 $this->redisPool->handleError( $conn, $e );
561 }
562
/**
 * Send information about a single request to the debug log.
 *
 * @param string $op Name of the operation
 * @param string $keys Key, or comma-separated list of keys
 * @param string $server
 * @param mixed $e Any truthy value (such as a caught exception or true) marks
 *   the request as a failure; a falsy value marks it as a success
 */
public function logRequest( $op, $keys, $server, $e = null ) {
	$outcome = $e ? "failure" : "success";
	$this->debug( "{$op}({$keys}) on {$server}: {$outcome}" );
}
574}
575
577class_alias( RedisBagOStuff::class, 'RedisBagOStuff' );
array $params
The job parameters.
setLastError( $error)
This is actually implemented in the Job class.
A collection of static methods to play with arrays.
static consistentHashSort(&$array, $key, $separator="\000")
Sort the given array in a pseudo-random order which depends only on the given key and each element va...
Helper class to handle automatically marking connections as reusable (via RAII pattern)
Helper class to manage Redis connections.
static singleton(array $options)
setLastError( $error)
Set the "last error" registry due to a problem encountered during an attempted operation.
Storage medium specific cache for storing items (e.g.
getSerialized( $value, $key)
Get the serialized form of a value, logging a warning if it involves custom classes.
getExpirationAsTTL( $exptime)
Convert an optionally absolute expiry time to a relative time.
const PASS_BY_REF
Idiom for doGet() to return extra information by reference.
getExpirationAsTimestamp( $exptime)
Convert an optionally relative timestamp to an absolute time.
Redis-based caching module for redis server >= 2.6.12 and phpredis >= 2.2.4.
doDelete( $key, $flags=0)
Delete an item.
doChangeTTL( $key, $exptime, $flags)
array $serverTagMap
Map of (tag => server name)
doAdd( $key, $value, $exptime=0, $flags=0)
Insert an item if it does not already exist.
doIncrWithInit( $key, $exptime, $step, $init, $flags)
doSetMulti(array $data, $exptime=0, $flags=0)
array $servers
List of server names.
doGetMulti(array $keys, $flags=0)
Get an associative array containing the item for each of the keys that have items.
doGet( $key, $flags=0, &$casToken=null)
Get an item.
doChangeTTLMulti(array $keys, $exptime, $flags=0)
handleException(RedisConnRef $conn, RedisException $e)
The redis extension throws an exception in response to various read, write and protocol errors.
__construct( $params)
Construct a RedisBagOStuff object.
doSet( $key, $value, $exptime=0, $flags=0)
Set an item.
logRequest( $op, $keys, $server, $e=null)
Send information about a single request to the debug log.
const ERR_UNREACHABLE
Storage medium could not be reached to establish a connection.
const ATTR_DURABILITY
Durability of writes; see QOS_DURABILITY_* (higher means stronger)
const ERR_UNEXPECTED
Storage medium operation failed due to usage limitations or an I/O error.
const QOS_DURABILITY_DISK
Data is saved to disk and writes do not usually block on fsync()