MediaWiki  master
RedisBagOStuff.php
Go to the documentation of this file.
1 <?php
35  protected $redisPool;
37  protected $servers;
39  protected $serverTagMap;
41  protected $automaticFailover;
42 
71  public function __construct( $params ) {
72  parent::__construct( $params );
73  $redisConf = [ 'serializer' => 'none' ]; // manage that in this class
74  foreach ( [ 'connectTimeout', 'persistent', 'password' ] as $opt ) {
75  if ( isset( $params[$opt] ) ) {
76  $redisConf[$opt] = $params[$opt];
77  }
78  }
79  $this->redisPool = RedisConnectionPool::singleton( $redisConf );
80 
81  $this->servers = $params['servers'];
82  foreach ( $this->servers as $key => $server ) {
83  $this->serverTagMap[is_int( $key ) ? $server : $key] = $server;
84  }
85 
86  $this->automaticFailover = $params['automaticFailover'] ?? true;
87 
88  // ...and uses rdb snapshots (redis.conf default)
90  }
91 
92  protected function doGet( $key, $flags = 0, &$casToken = null ) {
93  $getToken = ( $casToken === self::PASS_BY_REF );
94  $casToken = null;
95 
96  $conn = $this->getConnection( $key );
97  if ( !$conn ) {
98  return false;
99  }
100 
101  $e = null;
102  try {
103  $blob = $conn->get( $key );
104  if ( $blob !== false ) {
105  $value = $this->unserialize( $blob );
106  $valueSize = strlen( $blob );
107  } else {
108  $value = false;
109  $valueSize = false;
110  }
111  if ( $getToken && $value !== false ) {
112  $casToken = $blob;
113  }
114  } catch ( RedisException $e ) {
115  $value = false;
116  $valueSize = false;
117  $this->handleException( $conn, $e );
118  }
119 
120  $this->logRequest( 'get', $key, $conn->getServer(), $e );
121 
122  $this->updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, $valueSize ] ] );
123 
124  return $value;
125  }
126 
127  protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
128  $conn = $this->getConnection( $key );
129  if ( !$conn ) {
130  return false;
131  }
132 
133  $ttl = $this->getExpirationAsTTL( $exptime );
134  $serialized = $this->getSerialized( $value, $key );
135  $valueSize = strlen( $serialized );
136 
137  $e = null;
138  try {
139  if ( $ttl ) {
140  $result = $conn->setex( $key, $ttl, $serialized );
141  } else {
142  $result = $conn->set( $key, $serialized );
143  }
144  } catch ( RedisException $e ) {
145  $result = false;
146  $this->handleException( $conn, $e );
147  }
148 
149  $this->logRequest( 'set', $key, $conn->getServer(), $e );
150 
151  $this->updateOpStats( self::METRIC_OP_SET, [ $key => [ $valueSize, 0 ] ] );
152 
153  return $result;
154  }
155 
156  protected function doDelete( $key, $flags = 0 ) {
157  $conn = $this->getConnection( $key );
158  if ( !$conn ) {
159  return false;
160  }
161 
162  $e = null;
163  try {
164  // Note that redis does not return false if the key was not there
165  $result = ( $conn->del( $key ) !== false );
166  } catch ( RedisException $e ) {
167  $result = false;
168  $this->handleException( $conn, $e );
169  }
170 
171  $this->logRequest( 'delete', $key, $conn->getServer(), $e );
172 
173  $this->updateOpStats( self::METRIC_OP_DELETE, [ $key ] );
174 
175  return $result;
176  }
177 
178  protected function doGetMulti( array $keys, $flags = 0 ) {
179  $blobsFound = [];
180 
181  [ $keysByServer, $connByServer ] = $this->getConnectionsForKeys( $keys );
182  foreach ( $keysByServer as $server => $batchKeys ) {
183  $conn = $connByServer[$server];
184 
185  $e = null;
186  try {
187  // Avoid mget() to reduce CPU hogging from a single request
188  $conn->multi( Redis::PIPELINE );
189  foreach ( $batchKeys as $key ) {
190  $conn->get( $key );
191  }
192  $batchResult = $conn->exec();
193  if ( $batchResult === false ) {
194  $this->logRequest( 'get', implode( ',', $batchKeys ), $server, true );
195  continue;
196  }
197 
198  foreach ( $batchResult as $i => $blob ) {
199  if ( $blob !== false ) {
200  $blobsFound[$batchKeys[$i]] = $blob;
201  }
202  }
203  } catch ( RedisException $e ) {
204  $this->handleException( $conn, $e );
205  }
206 
207  $this->logRequest( 'get', implode( ',', $batchKeys ), $server, $e );
208  }
209 
210  // Preserve the order of $keys
211  $result = [];
212  $valueSizesByKey = [];
213  foreach ( $keys as $key ) {
214  if ( array_key_exists( $key, $blobsFound ) ) {
215  $blob = $blobsFound[$key];
216  $value = $this->unserialize( $blob );
217  if ( $value !== false ) {
218  $result[$key] = $value;
219  }
220  $valueSize = strlen( $blob );
221  } else {
222  $valueSize = false;
223  }
224  $valueSizesByKey[$key] = [ 0, $valueSize ];
225  }
226 
227  $this->updateOpStats( self::METRIC_OP_GET, $valueSizesByKey );
228 
229  return $result;
230  }
231 
232  protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
233  $ttl = $this->getExpirationAsTTL( $exptime );
234  $op = $ttl ? 'setex' : 'set';
235 
236  $keys = array_keys( $data );
237  $valueSizesByKey = [];
238 
239  [ $keysByServer, $connByServer, $result ] = $this->getConnectionsForKeys( $keys );
240  foreach ( $keysByServer as $server => $batchKeys ) {
241  $conn = $connByServer[$server];
242 
243  $e = null;
244  try {
245  // Avoid mset() to reduce CPU hogging from a single request
246  $conn->multi( Redis::PIPELINE );
247  foreach ( $batchKeys as $key ) {
248  $serialized = $this->getSerialized( $data[$key], $key );
249  if ( $ttl ) {
250  $conn->setex( $key, $ttl, $serialized );
251  } else {
252  $conn->set( $key, $serialized );
253  }
254  $valueSizesByKey[$key] = [ strlen( $serialized ), 0 ];
255  }
256  $batchResult = $conn->exec();
257  if ( $batchResult === false ) {
258  $result = false;
259  $this->logRequest( $op, implode( ',', $batchKeys ), $server, true );
260  continue;
261  }
262 
263  $result = $result && !in_array( false, $batchResult, true );
264  } catch ( RedisException $e ) {
265  $this->handleException( $conn, $e );
266  $result = false;
267  }
268 
269  $this->logRequest( $op, implode( ',', $batchKeys ), $server, $e );
270  }
271 
272  $this->updateOpStats( self::METRIC_OP_SET, $valueSizesByKey );
273 
274  return $result;
275  }
276 
277  protected function doDeleteMulti( array $keys, $flags = 0 ) {
278  [ $keysByServer, $connByServer, $result ] = $this->getConnectionsForKeys( $keys );
279  foreach ( $keysByServer as $server => $batchKeys ) {
280  $conn = $connByServer[$server];
281 
282  $e = null;
283  try {
284  // Avoid delete() with array to reduce CPU hogging from a single request
285  $conn->multi( Redis::PIPELINE );
286  foreach ( $batchKeys as $key ) {
287  $conn->del( $key );
288  }
289  $batchResult = $conn->exec();
290  if ( $batchResult === false ) {
291  $result = false;
292  $this->logRequest( 'delete', implode( ',', $batchKeys ), $server, true );
293  continue;
294  }
295  // Note that redis does not return false if the key was not there
296  $result = $result && !in_array( false, $batchResult, true );
297  } catch ( RedisException $e ) {
298  $this->handleException( $conn, $e );
299  $result = false;
300  }
301 
302  $this->logRequest( 'delete', implode( ',', $batchKeys ), $server, $e );
303  }
304 
305  $this->updateOpStats( self::METRIC_OP_DELETE, array_values( $keys ) );
306 
307  return $result;
308  }
309 
310  public function doChangeTTLMulti( array $keys, $exptime, $flags = 0 ) {
311  $relative = $this->isRelativeExpiration( $exptime );
312  $op = ( $exptime == self::TTL_INDEFINITE )
313  ? 'persist'
314  : ( $relative ? 'expire' : 'expireAt' );
315 
316  [ $keysByServer, $connByServer, $result ] = $this->getConnectionsForKeys( $keys );
317  foreach ( $keysByServer as $server => $batchKeys ) {
318  $conn = $connByServer[$server];
319 
320  $e = null;
321  try {
322  $conn->multi( Redis::PIPELINE );
323  foreach ( $batchKeys as $key ) {
324  if ( $exptime == self::TTL_INDEFINITE ) {
325  $conn->persist( $key );
326  } elseif ( $relative ) {
327  $conn->expire( $key, $this->getExpirationAsTTL( $exptime ) );
328  } else {
329  $conn->expireAt( $key, $this->getExpirationAsTimestamp( $exptime ) );
330  }
331  }
332  $batchResult = $conn->exec();
333  if ( $batchResult === false ) {
334  $result = false;
335  $this->logRequest( $op, implode( ',', $batchKeys ), $server, true );
336  continue;
337  }
338  $result = in_array( false, $batchResult, true ) ? false : $result;
339  } catch ( RedisException $e ) {
340  $this->handleException( $conn, $e );
341  $result = false;
342  }
343 
344  $this->logRequest( $op, implode( ',', $batchKeys ), $server, $e );
345  }
346 
347  $this->updateOpStats( self::METRIC_OP_CHANGE_TTL, array_values( $keys ) );
348 
349  return $result;
350  }
351 
352  protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
353  $conn = $this->getConnection( $key );
354  if ( !$conn ) {
355  return false;
356  }
357 
358  $ttl = $this->getExpirationAsTTL( $exptime );
359  $serialized = $this->getSerialized( $value, $key );
360  $valueSize = strlen( $serialized );
361 
362  try {
363  $result = $conn->set(
364  $key,
365  $serialized,
366  $ttl ? [ 'nx', 'ex' => $ttl ] : [ 'nx' ]
367  );
368  } catch ( RedisException $e ) {
369  $result = false;
370  $this->handleException( $conn, $e );
371  }
372 
373  $this->logRequest( 'add', $key, $conn->getServer(), $result );
374 
375  $this->updateOpStats( self::METRIC_OP_ADD, [ $key => [ $valueSize, 0 ] ] );
376 
377  return $result;
378  }
379 
380  protected function doIncrWithInit( $key, $exptime, $step, $init, $flags ) {
381  $conn = $this->getConnection( $key );
382  if ( !$conn ) {
383  return false;
384  }
385 
386  $ttl = $this->getExpirationAsTTL( $exptime );
387  try {
388  static $script =
390 <<<LUA
391  local key = KEYS[1]
392  local ttl, step, init = unpack( ARGV )
393  if redis.call( 'exists', key ) == 1 then
394  return redis.call( 'incrBy', key, step )
395  end
396  if 1 * ttl ~= 0 then
397  redis.call( 'setex', key, ttl, init )
398  else
399  redis.call( 'set', key, init )
400  end
401  return 1 * init
402 LUA;
403  $result = $conn->luaEval( $script, [ $key, $ttl, $step, $init ], 1 );
404  } catch ( RedisException $e ) {
405  $result = false;
406  $this->handleException( $conn, $e );
407  }
408  $this->logRequest( 'incrWithInit', $key, $conn->getServer(), $result );
409 
410  return $result;
411  }
412 
413  protected function doChangeTTL( $key, $exptime, $flags ) {
414  $conn = $this->getConnection( $key );
415  if ( !$conn ) {
416  return false;
417  }
418 
419  $relative = $this->isRelativeExpiration( $exptime );
420  try {
421  if ( $exptime == self::TTL_INDEFINITE ) {
422  $result = $conn->persist( $key );
423  $this->logRequest( 'persist', $key, $conn->getServer(), $result );
424  } elseif ( $relative ) {
425  $result = $conn->expire( $key, $this->getExpirationAsTTL( $exptime ) );
426  $this->logRequest( 'expire', $key, $conn->getServer(), $result );
427  } else {
428  $result = $conn->expireAt( $key, $this->getExpirationAsTimestamp( $exptime ) );
429  $this->logRequest( 'expireAt', $key, $conn->getServer(), $result );
430  }
431  } catch ( RedisException $e ) {
432  $result = false;
433  $this->handleException( $conn, $e );
434  }
435 
436  $this->updateOpStats( self::METRIC_OP_CHANGE_TTL, [ $key ] );
437 
438  return $result;
439  }
440 
446  protected function getConnectionsForKeys( array $keys ) {
447  $keysByServer = [];
448  $connByServer = [];
449  $success = true;
450  foreach ( $keys as $key ) {
451  $candidateTags = $this->getCandidateServerTagsForKey( $key );
452 
453  $conn = null;
454  // Find a suitable server for this key...
455  while ( ( $tag = array_shift( $candidateTags ) ) !== null ) {
456  $server = $this->serverTagMap[$tag];
457  // Reuse connection handles for keys mapping to the same server
458  if ( isset( $connByServer[$server] ) ) {
459  $conn = $connByServer[$server];
460  } else {
461  $conn = $this->redisPool->getConnection( $server, $this->logger );
462  if ( !$conn ) {
463  continue;
464  }
465  // If automatic failover is enabled, check that the server's link
466  // to its master (if any) is up -- but only if there are other
467  // viable candidates left to consider. Also, getMasterLinkStatus()
468  // does not work with twemproxy, though $candidates will be empty
469  // by now in such cases.
470  if ( $this->automaticFailover && $candidateTags ) {
471  try {
473  $info = $conn->info();
474  if ( ( $info['master_link_status'] ?? null ) === 'down' ) {
475  // If the master cannot be reached, fail-over to the next server.
476  // If masters are in data-center A, and replica DBs in data-center B,
477  // this helps avoid the case were fail-over happens in A but not
478  // to the corresponding server in B (e.g. read/write mismatch).
479  continue;
480  }
481  } catch ( RedisException $e ) {
482  // Server is not accepting commands
483  $this->redisPool->handleError( $conn, $e );
484  continue;
485  }
486  }
487  // Use this connection handle
488  $connByServer[$server] = $conn;
489  }
490  // Use this server for this key
491  $keysByServer[$server][] = $key;
492  break;
493  }
494 
495  if ( !$conn ) {
496  // No suitable server found for this key
497  $success = false;
499  }
500  }
501 
502  return [ $keysByServer, $connByServer, $success ];
503  }
504 
509  protected function getConnection( $key ) {
510  [ , $connByServer ] = $this->getConnectionsForKeys( [ $key ] );
511 
512  return reset( $connByServer ) ?: null;
513  }
514 
515  private function getCandidateServerTagsForKey( string $key ): array {
516  $candidates = array_keys( $this->serverTagMap );
517 
518  if ( count( $this->servers ) > 1 ) {
519  ArrayUtils::consistentHashSort( $candidates, $key, '/' );
520  if ( !$this->automaticFailover ) {
521  $candidates = array_slice( $candidates, 0, 1 );
522  }
523  }
524 
525  return $candidates;
526  }
527 
532  protected function logError( $msg ) {
533  $this->logger->error( "Redis error: $msg" );
534  }
535 
544  protected function handleException( RedisConnRef $conn, RedisException $e ) {
545  $this->setLastError( BagOStuff::ERR_UNEXPECTED );
546  $this->redisPool->handleError( $conn, $e );
547  }
548 
556  public function logRequest( $op, $keys, $server, $e = null ) {
557  $this->debug( "$op($keys) on $server: " . ( $e ? "failure" : "success" ) );
558  }
559 }
$success
static consistentHashSort(&$array, $key, $separator="\000")
Sort the given array in a pseudo-random order which depends only on the given key and each element va...
Definition: ArrayUtils.php:49
setLastError( $error)
Set the "last error" registry due to a problem encountered during an attempted operation.
Definition: BagOStuff.php:497
Storage medium specific cache for storing items (e.g.
const PASS_BY_REF
Idiom for doGet() to return extra information by reference.
getExpirationAsTimestamp( $exptime)
Convert an optionally relative timestamp to an absolute time.
getSerialized( $value, $key)
Get the serialized form a value, logging a warning if it involves custom classes.
updateOpStats(string $op, array $keyInfo)
getExpirationAsTTL( $exptime)
Convert an optionally absolute expiry time to a relative time.
Redis-based caching module for redis server >= 2.6.12 and phpredis >= 2.2.4.
RedisConnectionPool $redisPool
handleException(RedisConnRef $conn, RedisException $e)
The redis extension throws an exception in response to various read, write and protocol errors.
array $servers
List of server names.
doSetMulti(array $data, $exptime=0, $flags=0)
doChangeTTL( $key, $exptime, $flags)
logRequest( $op, $keys, $server, $e=null)
Send information about a single request to the debug log.
doIncrWithInit( $key, $exptime, $step, $init, $flags)
doDelete( $key, $flags=0)
Delete an item.
getConnectionsForKeys(array $keys)
logError( $msg)
Log a fatal error.
__construct( $params)
Construct a RedisBagOStuff object.
doGet( $key, $flags=0, &$casToken=null)
Get an item.
array $serverTagMap
Map of (tag => server name)
doGetMulti(array $keys, $flags=0)
Get an associative array containing the item for each of the keys that have items.
doChangeTTLMulti(array $keys, $exptime, $flags=0)
doAdd( $key, $value, $exptime=0, $flags=0)
Insert an item if it does not already exist.
doSet( $key, $value, $exptime=0, $flags=0)
Set an item.
doDeleteMulti(array $keys, $flags=0)
Helper class to handle automatically marking connections as reusable (via RAII pattern)
static singleton(array $options)
const ERR_UNREACHABLE
Storage medium could not be reached to establish a connection.
const ATTR_DURABILITY
Durability of writes; see QOS_DURABILITY_* (higher means stronger)
const ERR_UNEXPECTED
Storage medium operation failed due to usage limitations or an I/O error.
const QOS_DURABILITY_DISK
Data is saved to disk and writes do not usually block on fsync()