MediaWiki master
RedisBagOStuff.php
Go to the documentation of this file.
1<?php
35 protected $redisPool;
37 protected $servers;
39 protected $serverTagMap;
42
71 public function __construct( $params ) {
72 parent::__construct( $params );
73 $redisConf = [ 'serializer' => 'none' ]; // manage that in this class
74 foreach ( [ 'connectTimeout', 'persistent', 'password' ] as $opt ) {
75 if ( isset( $params[$opt] ) ) {
76 $redisConf[$opt] = $params[$opt];
77 }
78 }
79 $this->redisPool = RedisConnectionPool::singleton( $redisConf );
80
81 $this->servers = $params['servers'];
82 foreach ( $this->servers as $key => $server ) {
83 $this->serverTagMap[is_int( $key ) ? $server : $key] = $server;
84 }
85
86 $this->automaticFailover = $params['automaticFailover'] ?? true;
87
88 // ...and uses rdb snapshots (redis.conf default)
90 }
91
92 protected function doGet( $key, $flags = 0, &$casToken = null ) {
93 $getToken = ( $casToken === self::PASS_BY_REF );
94 $casToken = null;
95
96 $conn = $this->getConnection( $key );
97 if ( !$conn ) {
98 return false;
99 }
100
101 $e = null;
102 try {
103 $blob = $conn->get( $key );
104 if ( $blob !== false ) {
105 $value = $this->unserialize( $blob );
106 $valueSize = strlen( $blob );
107 } else {
108 $value = false;
109 $valueSize = false;
110 }
111 if ( $getToken && $value !== false ) {
112 $casToken = $blob;
113 }
114 } catch ( RedisException $e ) {
115 $value = false;
116 $valueSize = false;
117 $this->handleException( $conn, $e );
118 }
119
120 $this->logRequest( 'get', $key, $conn->getServer(), $e );
121
122 $this->updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, $valueSize ] ] );
123
124 return $value;
125 }
126
127 protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
128 $conn = $this->getConnection( $key );
129 if ( !$conn ) {
130 return false;
131 }
132
133 $ttl = $this->getExpirationAsTTL( $exptime );
134 $serialized = $this->getSerialized( $value, $key );
135 $valueSize = strlen( $serialized );
136
137 $e = null;
138 try {
139 if ( $ttl ) {
140 $result = $conn->setex( $key, $ttl, $serialized );
141 } else {
142 $result = $conn->set( $key, $serialized );
143 }
144 } catch ( RedisException $e ) {
145 $result = false;
146 $this->handleException( $conn, $e );
147 }
148
149 $this->logRequest( 'set', $key, $conn->getServer(), $e );
150
151 $this->updateOpStats( self::METRIC_OP_SET, [ $key => [ $valueSize, 0 ] ] );
152
153 return $result;
154 }
155
156 protected function doDelete( $key, $flags = 0 ) {
157 $conn = $this->getConnection( $key );
158 if ( !$conn ) {
159 return false;
160 }
161
162 $e = null;
163 try {
164 // Note that redis does not return false if the key was not there
165 $result = ( $conn->del( $key ) !== false );
166 } catch ( RedisException $e ) {
167 $result = false;
168 $this->handleException( $conn, $e );
169 }
170
171 $this->logRequest( 'delete', $key, $conn->getServer(), $e );
172
173 $this->updateOpStats( self::METRIC_OP_DELETE, [ $key ] );
174
175 return $result;
176 }
177
178 protected function doGetMulti( array $keys, $flags = 0 ) {
179 $blobsFound = [];
180
181 [ $keysByServer, $connByServer ] = $this->getConnectionsForKeys( $keys );
182 foreach ( $keysByServer as $server => $batchKeys ) {
183 $conn = $connByServer[$server];
184
185 $e = null;
186 try {
187 // Avoid mget() to reduce CPU hogging from a single request
188 $conn->multi( Redis::PIPELINE );
189 foreach ( $batchKeys as $key ) {
190 $conn->get( $key );
191 }
192 $batchResult = $conn->exec();
193 if ( $batchResult === false ) {
194 $this->logRequest( 'get', implode( ',', $batchKeys ), $server, true );
195 continue;
196 }
197
198 foreach ( $batchResult as $i => $blob ) {
199 if ( $blob !== false ) {
200 $blobsFound[$batchKeys[$i]] = $blob;
201 }
202 }
203 } catch ( RedisException $e ) {
204 $this->handleException( $conn, $e );
205 }
206
207 $this->logRequest( 'get', implode( ',', $batchKeys ), $server, $e );
208 }
209
210 // Preserve the order of $keys
211 $result = [];
212 $valueSizesByKey = [];
213 foreach ( $keys as $key ) {
214 if ( array_key_exists( $key, $blobsFound ) ) {
215 $blob = $blobsFound[$key];
216 $value = $this->unserialize( $blob );
217 if ( $value !== false ) {
218 $result[$key] = $value;
219 }
220 $valueSize = strlen( $blob );
221 } else {
222 $valueSize = false;
223 }
224 $valueSizesByKey[$key] = [ 0, $valueSize ];
225 }
226
227 $this->updateOpStats( self::METRIC_OP_GET, $valueSizesByKey );
228
229 return $result;
230 }
231
232 protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
233 $ttl = $this->getExpirationAsTTL( $exptime );
234 $op = $ttl ? 'setex' : 'set';
235
236 $keys = array_keys( $data );
237 $valueSizesByKey = [];
238
239 [ $keysByServer, $connByServer, $result ] = $this->getConnectionsForKeys( $keys );
240 foreach ( $keysByServer as $server => $batchKeys ) {
241 $conn = $connByServer[$server];
242
243 $e = null;
244 try {
245 // Avoid mset() to reduce CPU hogging from a single request
246 $conn->multi( Redis::PIPELINE );
247 foreach ( $batchKeys as $key ) {
248 $serialized = $this->getSerialized( $data[$key], $key );
249 if ( $ttl ) {
250 $conn->setex( $key, $ttl, $serialized );
251 } else {
252 $conn->set( $key, $serialized );
253 }
254 $valueSizesByKey[$key] = [ strlen( $serialized ), 0 ];
255 }
256 $batchResult = $conn->exec();
257 if ( $batchResult === false ) {
258 $result = false;
259 $this->logRequest( $op, implode( ',', $batchKeys ), $server, true );
260 continue;
261 }
262
263 $result = $result && !in_array( false, $batchResult, true );
264 } catch ( RedisException $e ) {
265 $this->handleException( $conn, $e );
266 $result = false;
267 }
268
269 $this->logRequest( $op, implode( ',', $batchKeys ), $server, $e );
270 }
271
272 $this->updateOpStats( self::METRIC_OP_SET, $valueSizesByKey );
273
274 return $result;
275 }
276
277 protected function doDeleteMulti( array $keys, $flags = 0 ) {
278 [ $keysByServer, $connByServer, $result ] = $this->getConnectionsForKeys( $keys );
279 foreach ( $keysByServer as $server => $batchKeys ) {
280 $conn = $connByServer[$server];
281
282 $e = null;
283 try {
284 // Avoid delete() with array to reduce CPU hogging from a single request
285 $conn->multi( Redis::PIPELINE );
286 foreach ( $batchKeys as $key ) {
287 $conn->del( $key );
288 }
289 $batchResult = $conn->exec();
290 if ( $batchResult === false ) {
291 $result = false;
292 $this->logRequest( 'delete', implode( ',', $batchKeys ), $server, true );
293 continue;
294 }
295 // Note that redis does not return false if the key was not there
296 $result = $result && !in_array( false, $batchResult, true );
297 } catch ( RedisException $e ) {
298 $this->handleException( $conn, $e );
299 $result = false;
300 }
301
302 $this->logRequest( 'delete', implode( ',', $batchKeys ), $server, $e );
303 }
304
305 $this->updateOpStats( self::METRIC_OP_DELETE, array_values( $keys ) );
306
307 return $result;
308 }
309
310 public function doChangeTTLMulti( array $keys, $exptime, $flags = 0 ) {
311 $relative = $this->isRelativeExpiration( $exptime );
312 $op = ( $exptime == self::TTL_INDEFINITE )
313 ? 'persist'
314 : ( $relative ? 'expire' : 'expireAt' );
315
316 [ $keysByServer, $connByServer, $result ] = $this->getConnectionsForKeys( $keys );
317 foreach ( $keysByServer as $server => $batchKeys ) {
318 $conn = $connByServer[$server];
319
320 $e = null;
321 try {
322 $conn->multi( Redis::PIPELINE );
323 foreach ( $batchKeys as $key ) {
324 if ( $exptime == self::TTL_INDEFINITE ) {
325 $conn->persist( $key );
326 } elseif ( $relative ) {
327 $conn->expire( $key, $this->getExpirationAsTTL( $exptime ) );
328 } else {
329 $conn->expireAt( $key, $this->getExpirationAsTimestamp( $exptime ) );
330 }
331 }
332 $batchResult = $conn->exec();
333 if ( $batchResult === false ) {
334 $result = false;
335 $this->logRequest( $op, implode( ',', $batchKeys ), $server, true );
336 continue;
337 }
338 $result = in_array( false, $batchResult, true ) ? false : $result;
339 } catch ( RedisException $e ) {
340 $this->handleException( $conn, $e );
341 $result = false;
342 }
343
344 $this->logRequest( $op, implode( ',', $batchKeys ), $server, $e );
345 }
346
347 $this->updateOpStats( self::METRIC_OP_CHANGE_TTL, array_values( $keys ) );
348
349 return $result;
350 }
351
352 protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
353 $conn = $this->getConnection( $key );
354 if ( !$conn ) {
355 return false;
356 }
357
358 $ttl = $this->getExpirationAsTTL( $exptime );
359 $serialized = $this->getSerialized( $value, $key );
360 $valueSize = strlen( $serialized );
361
362 try {
363 $result = $conn->set(
364 $key,
365 $serialized,
366 $ttl ? [ 'nx', 'ex' => $ttl ] : [ 'nx' ]
367 );
368 } catch ( RedisException $e ) {
369 $result = false;
370 $this->handleException( $conn, $e );
371 }
372
373 $this->logRequest( 'add', $key, $conn->getServer(), $result );
374
375 $this->updateOpStats( self::METRIC_OP_ADD, [ $key => [ $valueSize, 0 ] ] );
376
377 return $result;
378 }
379
380 protected function doIncrWithInit( $key, $exptime, $step, $init, $flags ) {
381 $conn = $this->getConnection( $key );
382 if ( !$conn ) {
383 return false;
384 }
385
386 $ttl = $this->getExpirationAsTTL( $exptime );
387 try {
388 static $script =
390<<<LUA
391 local key = KEYS[1]
392 local ttl, step, init = unpack( ARGV )
393 if redis.call( 'exists', key ) == 1 then
394 return redis.call( 'incrBy', key, step )
395 end
396 if 1 * ttl ~= 0 then
397 redis.call( 'setex', key, ttl, init )
398 else
399 redis.call( 'set', key, init )
400 end
401 return 1 * init
402LUA;
403 $result = $conn->luaEval( $script, [ $key, $ttl, $step, $init ], 1 );
404 } catch ( RedisException $e ) {
405 $result = false;
406 $this->handleException( $conn, $e );
407 }
408 $this->logRequest( 'incrWithInit', $key, $conn->getServer(), $result );
409
410 return $result;
411 }
412
413 protected function doChangeTTL( $key, $exptime, $flags ) {
414 $conn = $this->getConnection( $key );
415 if ( !$conn ) {
416 return false;
417 }
418
419 $relative = $this->isRelativeExpiration( $exptime );
420 try {
421 if ( $exptime == self::TTL_INDEFINITE ) {
422 $result = $conn->persist( $key );
423 $this->logRequest( 'persist', $key, $conn->getServer(), $result );
424 } elseif ( $relative ) {
425 $result = $conn->expire( $key, $this->getExpirationAsTTL( $exptime ) );
426 $this->logRequest( 'expire', $key, $conn->getServer(), $result );
427 } else {
428 $result = $conn->expireAt( $key, $this->getExpirationAsTimestamp( $exptime ) );
429 $this->logRequest( 'expireAt', $key, $conn->getServer(), $result );
430 }
431 } catch ( RedisException $e ) {
432 $result = false;
433 $this->handleException( $conn, $e );
434 }
435
436 $this->updateOpStats( self::METRIC_OP_CHANGE_TTL, [ $key ] );
437
438 return $result;
439 }
440
446 protected function getConnectionsForKeys( array $keys ) {
447 $keysByServer = [];
448 $connByServer = [];
449 $success = true;
450 foreach ( $keys as $key ) {
451 $candidateTags = $this->getCandidateServerTagsForKey( $key );
452
453 $conn = null;
454 // Find a suitable server for this key...
455 while ( ( $tag = array_shift( $candidateTags ) ) !== null ) {
456 $server = $this->serverTagMap[$tag];
457 // Reuse connection handles for keys mapping to the same server
458 if ( isset( $connByServer[$server] ) ) {
459 $conn = $connByServer[$server];
460 } else {
461 $conn = $this->redisPool->getConnection( $server, $this->logger );
462 if ( !$conn ) {
463 continue;
464 }
465 // If automatic failover is enabled, check that the server's link
466 // to its master (if any) is up -- but only if there are other
467 // viable candidates left to consider. Also, getMasterLinkStatus()
468 // does not work with twemproxy, though $candidates will be empty
469 // by now in such cases.
470 if ( $this->automaticFailover && $candidateTags ) {
471 try {
473 $info = $conn->info();
474 if ( ( $info['master_link_status'] ?? null ) === 'down' ) {
475 // If the master cannot be reached, fail-over to the next server.
476 // If masters are in data-center A, and replica DBs in data-center B,
477 // this helps avoid the case were fail-over happens in A but not
478 // to the corresponding server in B (e.g. read/write mismatch).
479 continue;
480 }
481 } catch ( RedisException $e ) {
482 // Server is not accepting commands
483 $this->redisPool->handleError( $conn, $e );
484 continue;
485 }
486 }
487 // Use this connection handle
488 $connByServer[$server] = $conn;
489 }
490 // Use this server for this key
491 $keysByServer[$server][] = $key;
492 break;
493 }
494
495 if ( !$conn ) {
496 // No suitable server found for this key
497 $success = false;
499 }
500 }
501
502 return [ $keysByServer, $connByServer, $success ];
503 }
504
509 protected function getConnection( $key ) {
510 [ , $connByServer ] = $this->getConnectionsForKeys( [ $key ] );
511
512 return reset( $connByServer ) ?: null;
513 }
514
515 private function getCandidateServerTagsForKey( string $key ): array {
516 $candidates = array_keys( $this->serverTagMap );
517
518 if ( count( $this->servers ) > 1 ) {
519 ArrayUtils::consistentHashSort( $candidates, $key, '/' );
520 if ( !$this->automaticFailover ) {
521 $candidates = array_slice( $candidates, 0, 1 );
522 }
523 }
524
525 return $candidates;
526 }
527
532 protected function logError( $msg ) {
533 $this->logger->error( "Redis error: $msg" );
534 }
535
544 protected function handleException( RedisConnRef $conn, RedisException $e ) {
546 $this->redisPool->handleError( $conn, $e );
547 }
548
556 public function logRequest( $op, $keys, $server, $e = null ) {
557 $this->debug( "$op($keys) on $server: " . ( $e ? "failure" : "success" ) );
558 }
559}
array $params
The job parameters.
setLastError( $error)
This is actually implemented in the Job class.
setLastError( $error)
Set the "last error" registry due to a problem encountered during an attempted operation.
Storage medium specific cache for storing items (e.g.
const PASS_BY_REF
Idiom for doGet() to return extra information by reference.
getExpirationAsTimestamp( $exptime)
Convert an optionally relative timestamp to an absolute time.
getSerialized( $value, $key)
Get the serialized form a value, logging a warning if it involves custom classes.
updateOpStats(string $op, array $keyInfo)
getExpirationAsTTL( $exptime)
Convert an optionally absolute expiry time to a relative time.
Redis-based caching module for redis server >= 2.6.12 and phpredis >= 2.2.4.
RedisConnectionPool $redisPool
handleException(RedisConnRef $conn, RedisException $e)
The redis extension throws an exception in response to various read, write and protocol errors.
array $servers
List of server names.
doSetMulti(array $data, $exptime=0, $flags=0)
doChangeTTL( $key, $exptime, $flags)
logRequest( $op, $keys, $server, $e=null)
Send information about a single request to the debug log.
doIncrWithInit( $key, $exptime, $step, $init, $flags)
doDelete( $key, $flags=0)
Delete an item.
getConnectionsForKeys(array $keys)
logError( $msg)
Log a fatal error.
__construct( $params)
Construct a RedisBagOStuff object.
doGet( $key, $flags=0, &$casToken=null)
Get an item.
array $serverTagMap
Map of (tag => server name)
doGetMulti(array $keys, $flags=0)
Get an associative array containing the item for each of the keys that have items.
doChangeTTLMulti(array $keys, $exptime, $flags=0)
doAdd( $key, $value, $exptime=0, $flags=0)
Insert an item if it does not already exist.
doSet( $key, $value, $exptime=0, $flags=0)
Set an item.
doDeleteMulti(array $keys, $flags=0)
Helper class to handle automatically marking connections as reusable (via RAII pattern)
Helper class to manage Redis connections.
const ERR_UNREACHABLE
Storage medium could not be reached to establish a connection.
const ATTR_DURABILITY
Durability of writes; see QOS_DURABILITY_* (higher means stronger)
const ERR_UNEXPECTED
Storage medium operation failed due to usage limitations or an I/O error.
const QOS_DURABILITY_DISK
Data is saved to disk and writes do not usually block on fsync()