83 $redisConf = [
'serializer' =>
'none' ];
84 foreach ( [
'connectTimeout',
'persistent',
'password' ] as $opt ) {
86 $redisConf[$opt] =
$params[$opt];
91 $this->servers =
$params[
'servers'];
92 foreach ( $this->servers as $key => $server ) {
93 $this->serverTagMap[is_int( $key ) ? $server : $key] = $server;
96 $this->automaticFailover =
$params[
'automaticFailover'] ??
true;
102 protected function doGet( $key, $flags = 0, &$casToken =
null ) {
113 $blob = $conn->get( $key );
114 if ( $blob !==
false ) {
116 $valueSize = strlen( $blob );
121 if ( $getToken && $value !==
false ) {
124 }
catch ( RedisException $e ) {
130 $this->
logRequest(
'get', $key, $conn->getServer(), $e );
132 $this->
updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, $valueSize ] ] );
137 protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
145 $valueSize = strlen( $serialized );
150 $result = $conn->setex( $key, $ttl, $serialized );
152 $result = $conn->set( $key, $serialized );
154 }
catch ( RedisException $e ) {
159 $this->
logRequest(
'set', $key, $conn->getServer(), $e );
161 $this->
updateOpStats( self::METRIC_OP_SET, [ $key => [ $valueSize, 0 ] ] );
175 $result = ( $conn->del( $key ) !== false );
176 }
catch ( RedisException $e ) {
181 $this->
logRequest(
'delete', $key, $conn->getServer(), $e );
192 foreach ( $keysByServer as $server => $batchKeys ) {
193 $conn = $connByServer[$server];
198 $conn->multi( Redis::PIPELINE );
199 foreach ( $batchKeys as $key ) {
202 $batchResult = $conn->exec();
203 if ( $batchResult ===
false ) {
204 $this->
logRequest(
'get', implode(
',', $batchKeys ), $server,
true );
208 foreach ( $batchResult as $i => $blob ) {
209 if ( $blob !==
false ) {
210 $blobsFound[$batchKeys[$i]] = $blob;
213 }
catch ( RedisException $e ) {
217 $this->
logRequest(
'get', implode(
',', $batchKeys ), $server, $e );
222 $valueSizesByKey = [];
223 foreach ( $keys as $key ) {
224 if ( array_key_exists( $key, $blobsFound ) ) {
225 $blob = $blobsFound[$key];
227 if ( $value !==
false ) {
228 $result[$key] = $value;
230 $valueSize = strlen( $blob );
234 $valueSizesByKey[$key] = [ 0, $valueSize ];
237 $this->
updateOpStats( self::METRIC_OP_GET, $valueSizesByKey );
242 protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
244 $op = $ttl ?
'setex' :
'set';
246 $keys = array_keys( $data );
247 $valueSizesByKey = [];
250 foreach ( $keysByServer as $server => $batchKeys ) {
251 $conn = $connByServer[$server];
256 $conn->multi( Redis::PIPELINE );
257 foreach ( $batchKeys as $key ) {
260 $conn->setex( $key, $ttl, $serialized );
262 $conn->set( $key, $serialized );
264 $valueSizesByKey[$key] = [ strlen( $serialized ), 0 ];
266 $batchResult = $conn->exec();
267 if ( $batchResult ===
false ) {
269 $this->
logRequest( $op, implode(
',', $batchKeys ), $server,
true );
273 $result = $result && !in_array(
false, $batchResult,
true );
274 }
catch ( RedisException $e ) {
279 $this->
logRequest( $op, implode(
',', $batchKeys ), $server, $e );
282 $this->
updateOpStats( self::METRIC_OP_SET, $valueSizesByKey );
289 foreach ( $keysByServer as $server => $batchKeys ) {
290 $conn = $connByServer[$server];
295 $conn->multi( Redis::PIPELINE );
296 foreach ( $batchKeys as $key ) {
299 $batchResult = $conn->exec();
300 if ( $batchResult ===
false ) {
302 $this->
logRequest(
'delete', implode(
',', $batchKeys ), $server,
true );
306 $result = $result && !in_array(
false, $batchResult,
true );
307 }
catch ( RedisException $e ) {
312 $this->
logRequest(
'delete', implode(
',', $batchKeys ), $server, $e );
315 $this->
updateOpStats( self::METRIC_OP_DELETE, array_values( $keys ) );
322 $op = ( $exptime == self::TTL_INDEFINITE )
324 : ( $relative ?
'expire' :
'expireAt' );
327 foreach ( $keysByServer as $server => $batchKeys ) {
328 $conn = $connByServer[$server];
332 $conn->multi( Redis::PIPELINE );
333 foreach ( $batchKeys as $key ) {
334 if ( $exptime == self::TTL_INDEFINITE ) {
335 $conn->persist( $key );
336 } elseif ( $relative ) {
342 $batchResult = $conn->exec();
343 if ( $batchResult ===
false ) {
345 $this->
logRequest( $op, implode(
',', $batchKeys ), $server,
true );
348 $result = in_array(
false, $batchResult,
true ) ? false : $result;
349 }
catch ( RedisException $e ) {
354 $this->
logRequest( $op, implode(
',', $batchKeys ), $server, $e );
357 $this->
updateOpStats( self::METRIC_OP_CHANGE_TTL, array_values( $keys ) );
362 protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
370 $valueSize = strlen( $serialized );
373 $result = $conn->set(
376 $ttl ? [
'nx',
'ex' => $ttl ] : [
'nx' ]
378 }
catch ( RedisException $e ) {
383 $this->
logRequest(
'add', $key, $conn->getServer(), $result );
385 $this->
updateOpStats( self::METRIC_OP_ADD, [ $key => [ $valueSize, 0 ] ] );
402 local ttl, step, init = unpack( ARGV )
403 if redis.call(
'exists', key ) == 1 then
404 return redis.call(
'incrBy', key, step )
407 redis.call(
'setex', key, ttl, init )
409 redis.call(
'set', key, init )
413 $result = $conn->luaEval( $script, [ $key, $ttl, $step, $init ], 1 );
414 }
catch ( RedisException $e ) {
418 $this->
logRequest(
'incrWithInit', $key, $conn->getServer(), $result );
431 if ( $exptime == self::TTL_INDEFINITE ) {
432 $result = $conn->persist( $key );
433 $this->
logRequest(
'persist', $key, $conn->getServer(), $result );
434 } elseif ( $relative ) {
436 $this->
logRequest(
'expire', $key, $conn->getServer(), $result );
439 $this->
logRequest(
'expireAt', $key, $conn->getServer(), $result );
441 }
catch ( RedisException $e ) {
446 $this->
updateOpStats( self::METRIC_OP_CHANGE_TTL, [ $key ] );
461 foreach ( $keys as $key ) {
462 $candidateTags = $this->getCandidateServerTagsForKey( $key );
467 while ( ( $tag = array_shift( $candidateTags ) ) !==
null ) {
468 $server = $this->serverTagMap[$tag];
470 if ( isset( $connByServer[$server] ) ) {
471 $conn = $connByServer[$server];
473 $conn = $this->redisPool->getConnection( $server, $this->logger );
482 if ( $this->automaticFailover && $candidateTags ) {
485 $info = $conn->info();
486 if ( ( $info[
'master_link_status'] ??
null ) ===
'down' ) {
493 }
catch ( RedisException $e ) {
495 $this->redisPool->handleError( $conn, $e );
500 $connByServer[$server] = $conn;
503 $keysByServer[$server][] = $key;
514 return [ $keysByServer, $connByServer,
$success ];
525 return reset( $connByServer ) ?:
null;
528 private function getCandidateServerTagsForKey(
string $key ): array {
529 $candidates = array_keys( $this->serverTagMap );
531 if ( count( $this->servers ) > 1 ) {
533 if ( !$this->automaticFailover ) {
534 $candidates = array_slice( $candidates, 0, 1 );
547 $this->logger->error(
"Redis error: $msg" );
561 $this->redisPool->handleError( $conn, $e );
572 public function logRequest( $op, $keys, $server, $e =
null ) {
573 $this->debug(
"$op($keys) on $server: " . ( $e ?
"failure" :
"success" ) );
578class_alias( RedisBagOStuff::class,
'RedisBagOStuff' );
array $params
The job parameters.
setLastError( $error)
This is actually implemented in the Job class.
A collection of static methods to play with arrays.
static consistentHashSort(&$array, $key, $separator="\000")
Sort the given array in a pseudo-random order which depends only on the given key and each element va...