83 $redisConf = [
'serializer' =>
'none' ];
84 foreach ( [
'connectTimeout',
'persistent',
'password' ] as $opt ) {
86 $redisConf[$opt] =
$params[$opt];
91 $this->servers =
$params[
'servers'];
92 foreach ( $this->servers as $key => $server ) {
93 $this->serverTagMap[is_int( $key ) ? $server : $key] = $server;
96 $this->automaticFailover =
$params[
'automaticFailover'] ??
true;
102 protected function doGet( $key, $flags = 0, &$casToken =
null ) {
113 $blob = $conn->get( $key );
114 if ( $blob !==
false ) {
116 $valueSize = strlen( $blob );
121 if ( $getToken && $value !==
false ) {
124 }
catch ( RedisException $e ) {
130 $this->
logRequest(
'get', $key, $conn->getServer(), $e );
132 $this->
updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, $valueSize ] ] );
137 protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
145 $valueSize = strlen( $serialized );
150 $result = $conn->setex( $key, $ttl, $serialized );
152 $result = $conn->set( $key, $serialized );
154 }
catch ( RedisException $e ) {
159 $this->
logRequest(
'set', $key, $conn->getServer(), $e );
161 $this->
updateOpStats( self::METRIC_OP_SET, [ $key => [ $valueSize, 0 ] ] );
175 $result = ( $conn->del( $key ) !== false );
176 }
catch ( RedisException $e ) {
181 $this->
logRequest(
'delete', $key, $conn->getServer(), $e );
192 foreach ( $keysByServer as $server => $batchKeys ) {
193 $conn = $connByServer[$server];
198 $conn->multi( Redis::PIPELINE );
199 foreach ( $batchKeys as $key ) {
202 $batchResult = $conn->exec();
203 if ( $batchResult ===
false ) {
204 $this->
logRequest(
'get', implode(
',', $batchKeys ), $server,
true );
208 foreach ( $batchResult as $i => $blob ) {
209 if ( $blob !==
false ) {
210 $blobsFound[$batchKeys[$i]] = $blob;
213 }
catch ( RedisException $e ) {
217 $this->
logRequest(
'get', implode(
',', $batchKeys ), $server, $e );
222 $valueSizesByKey = [];
223 foreach ( $keys as $key ) {
224 if ( array_key_exists( $key, $blobsFound ) ) {
225 $blob = $blobsFound[$key];
227 if ( $value !==
false ) {
228 $result[$key] = $value;
230 $valueSize = strlen( $blob );
234 $valueSizesByKey[$key] = [ 0, $valueSize ];
237 $this->
updateOpStats( self::METRIC_OP_GET, $valueSizesByKey );
242 protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
244 $op = $ttl ?
'setex' :
'set';
246 $keys = array_keys( $data );
247 $valueSizesByKey = [];
250 foreach ( $keysByServer as $server => $batchKeys ) {
251 $conn = $connByServer[$server];
256 $conn->multi( Redis::PIPELINE );
257 foreach ( $batchKeys as $key ) {
260 $conn->setex( $key, $ttl, $serialized );
262 $conn->set( $key, $serialized );
264 $valueSizesByKey[$key] = [ strlen( $serialized ), 0 ];
266 $batchResult = $conn->exec();
267 if ( $batchResult ===
false ) {
269 $this->
logRequest( $op, implode(
',', $batchKeys ), $server,
true );
273 $result = $result && !in_array(
false, $batchResult,
true );
274 }
catch ( RedisException $e ) {
279 $this->
logRequest( $op, implode(
',', $batchKeys ), $server, $e );
282 $this->
updateOpStats( self::METRIC_OP_SET, $valueSizesByKey );
289 foreach ( $keysByServer as $server => $batchKeys ) {
290 $conn = $connByServer[$server];
295 $conn->multi( Redis::PIPELINE );
296 foreach ( $batchKeys as $key ) {
299 $batchResult = $conn->exec();
300 if ( $batchResult ===
false ) {
302 $this->
logRequest(
'delete', implode(
',', $batchKeys ), $server,
true );
306 $result = $result && !in_array(
false, $batchResult,
true );
307 }
catch ( RedisException $e ) {
312 $this->
logRequest(
'delete', implode(
',', $batchKeys ), $server, $e );
315 $this->
updateOpStats( self::METRIC_OP_DELETE, array_values( $keys ) );
322 $op = ( $exptime == self::TTL_INDEFINITE )
324 : ( $relative ?
'expire' :
'expireAt' );
327 foreach ( $keysByServer as $server => $batchKeys ) {
328 $conn = $connByServer[$server];
332 $conn->multi( Redis::PIPELINE );
333 foreach ( $batchKeys as $key ) {
334 if ( $exptime == self::TTL_INDEFINITE ) {
335 $conn->persist( $key );
336 } elseif ( $relative ) {
342 $batchResult = $conn->exec();
343 if ( $batchResult ===
false ) {
345 $this->
logRequest( $op, implode(
',', $batchKeys ), $server,
true );
348 $result = in_array(
false, $batchResult,
true ) ? false : $result;
349 }
catch ( RedisException $e ) {
354 $this->
logRequest( $op, implode(
',', $batchKeys ), $server, $e );
357 $this->
updateOpStats( self::METRIC_OP_CHANGE_TTL, array_values( $keys ) );
362 protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
370 $valueSize = strlen( $serialized );
373 $result = $conn->set(
376 $ttl ? [
'nx',
'ex' => $ttl ] : [
'nx' ]
378 }
catch ( RedisException $e ) {
383 $this->
logRequest(
'add', $key, $conn->getServer(), $result );
385 $this->
updateOpStats( self::METRIC_OP_ADD, [ $key => [ $valueSize, 0 ] ] );
402 local ttl, step, init = unpack( ARGV )
403 if redis.call(
'exists', key ) == 1 then
404 return redis.call(
'incrBy', key, step )
407 redis.call(
'setex', key, ttl, init )
409 redis.call(
'set', key, init )
413 $result = $conn->luaEval( $script, [ $key, $ttl, $step, $init ], 1 );
414 }
catch ( RedisException $e ) {
418 $this->
logRequest(
'incrWithInit', $key, $conn->getServer(), $result );
431 if ( $exptime == self::TTL_INDEFINITE ) {
432 $result = $conn->persist( $key );
433 $this->
logRequest(
'persist', $key, $conn->getServer(), $result );
434 } elseif ( $relative ) {
436 $this->
logRequest(
'expire', $key, $conn->getServer(), $result );
439 $this->
logRequest(
'expireAt', $key, $conn->getServer(), $result );
441 }
catch ( RedisException $e ) {
446 $this->
updateOpStats( self::METRIC_OP_CHANGE_TTL, [ $key ] );
461 foreach ( $keys as $key ) {
462 $candidateTags = $this->getCandidateServerTagsForKey( $key );
466 while ( ( $tag = array_shift( $candidateTags ) ) !==
null ) {
467 $server = $this->serverTagMap[$tag];
469 if ( isset( $connByServer[$server] ) ) {
470 $conn = $connByServer[$server];
472 $conn = $this->redisPool->getConnection( $server, $this->logger );
481 if ( $this->automaticFailover && $candidateTags ) {
484 $info = $conn->info();
485 if ( ( $info[
'master_link_status'] ??
null ) ===
'down' ) {
492 }
catch ( RedisException $e ) {
494 $this->redisPool->handleError( $conn, $e );
499 $connByServer[$server] = $conn;
502 $keysByServer[$server][] = $key;
513 return [ $keysByServer, $connByServer,
$success ];
524 return reset( $connByServer ) ?:
null;
527 private function getCandidateServerTagsForKey(
string $key ): array {
528 $candidates = array_keys( $this->serverTagMap );
530 if ( count( $this->servers ) > 1 ) {
532 if ( !$this->automaticFailover ) {
533 $candidates = array_slice( $candidates, 0, 1 );
546 $this->logger->error(
"Redis error: $msg" );
560 $this->redisPool->handleError( $conn, $e );
571 public function logRequest( $op, $keys, $server, $e =
null ) {
572 $this->debug(
"$op($keys) on $server: " . ( $e ?
"failure" :
"success" ) );
577class_alias( RedisBagOStuff::class,
'RedisBagOStuff' );
array $params
The job parameters.
setLastError( $error)
This is actually implemented in the Job class.
A collection of static methods to play with arrays.
static consistentHashSort(&$array, $key, $separator="\000")
Sort the given array in a pseudo-random order which depends only on the given key and each element value in $array.
Helper class to handle automatically marking connections as reusable (via RAII pattern)
Helper class to manage Redis connections.
static singleton(array $options)