MediaWiki  master
MemcachedPeclBagOStuff.php
Go to the documentation of this file.
1 <?php
/** @var Memcached Main client; configured for synchronous (unbuffered) writes by the constructor */
31  protected $syncClient;
/** @var Memcached|null Dedicated client for buffered writes; null unless persistent connections are used */
33  protected $asyncClient;
34 
/** @var bool Whether the non-buffering client is locked from use */
36  protected $syncClientIsBuffering = false;
/** @var bool Whether the non-buffering client should be flushed before use */
38  protected $hasUnflushedChanges = false;
39 
/** @var array Memcached options applied to a client that must write synchronously */
41  private static $OPTS_SYNC_WRITES = [
42  Memcached::OPT_NO_BLOCK => false, // async I/O (using TCP buffers)
43  Memcached::OPT_BUFFER_WRITES => false // libmemcached buffers
44  ];
/** @var array Memcached options applied to a client that may buffer its writes */
46  private static $OPTS_ASYNC_WRITES = [
47  Memcached::OPT_NO_BLOCK => true, // async I/O (using TCP buffers)
48  Memcached::OPT_BUFFER_WRITES => true // libmemcached buffers
49  ];
50 
/**
 * Build the Memcached client(s) used by this cache backend.
 *
 * Class-specific parameters (defaults are applied here when a key is absent):
 *   - persistent: if truthy, use pooled Memcached handles identified by a hash of
 *     $params, plus a dedicated handle for buffered writes (default: false)
 *   - compress_threshold: value written to the 'memcached.compression_threshold'
 *     ini setting (default: 1500)
 *   - connect_timeout: multiplied by 1000 for Memcached::OPT_CONNECT_TIMEOUT
 *     (default: 0.5; presumably seconds -- confirm)
 *   - timeout: used as-is for the send/receive timeouts and divided by 1000 for
 *     the poll timeout (default: 500000; presumably microseconds -- confirm)
 *   - serializer: 'php' or 'igbinary' (default: 'php')
 *   - use_binary_protocol: value for Memcached::OPT_BINARY_PROTOCOL (default: false)
 *   - allow_tcp_nagle_delay: when false, Memcached::OPT_TCP_NODELAY is enabled
 *     (default: true)
 *
 * @param array $params
 */
public function __construct( $params ) {
	parent::__construct( $params );

	// Default class-specific parameters
	$params += [
		// Without this default, omitting 'persistent' triggered an undefined index
		// notice/warning below. The pool ID is only computed when the caller enabled
		// persistence explicitly, so existing pool IDs are unaffected.
		'persistent' => false,
		'compress_threshold' => 1500,
		'connect_timeout' => 0.5,
		'timeout' => 500000,
		'serializer' => 'php',
		'use_binary_protocol' => false,
		'allow_tcp_nagle_delay' => true
	];

	if ( $params['persistent'] ) {
		// The pool ID must be unique to the server/option combination.
		// The Memcached object is essentially shared for each pool ID.
		// We can only reuse a pool ID if we keep the config consistent.
		$connectionPoolId = md5( serialize( $params ) );
		$syncClient = new Memcached( "$connectionPoolId-sync" );
		// Avoid clobbering the main thread-shared Memcached instance
		$asyncClient = new Memcached( "$connectionPoolId-async" );
	} else {
		$syncClient = new Memcached();
		$asyncClient = null;
	}

	$this->initializeClient( $syncClient, $params, self::$OPTS_SYNC_WRITES );
	if ( $asyncClient ) {
		$this->initializeClient( $asyncClient, $params, self::$OPTS_ASYNC_WRITES );
	}

	// Set the main client and any dedicated one for buffered writes
	$this->syncClient = $syncClient;
	$this->asyncClient = $asyncClient;
	// The compression threshold is an undocumented php.ini option for some
	// reason. There's probably not much harm in setting it globally, for
	// compatibility with the settings for the PHP client.
	ini_set( 'memcached.compression_threshold', $params['compress_threshold'] );
}
106 
/**
 * Initialize the client only if needed and reuse it otherwise.
 *
 * A client that already reports a server list is left untouched. Otherwise the
 * options are applied and the servers from $params['servers'] are registered.
 *
 * @param Memcached $client Client instance to set up
 * @param array $params Constructor parameters (with class defaults applied)
 * @param array $options Base Memcached options; these win over the defaults added here
 * @throws RuntimeException If igbinary is requested but unavailable, if the
 *  options are rejected, or if the server list cannot be added
 */
private function initializeClient( Memcached $client, array $params, array $options ) {
	$alreadyConfigured = $client->getServerList();
	if ( $alreadyConfigured ) {
		$this->logger->debug( __METHOD__ . ": pre-initialized client instance." );

		// Preserve the persistent handle as it is
		return;
	}

	$this->logger->debug( __METHOD__ . ": initializing new client instance." );

	// Array union: keys already present in $options are not overwritten
	$defaults = [
		Memcached::OPT_NO_BLOCK => false,
		Memcached::OPT_BUFFER_WRITES => false,
		// Network protocol (ASCII or binary)
		Memcached::OPT_BINARY_PROTOCOL => $params['use_binary_protocol'],
		// Set various network timeouts
		Memcached::OPT_CONNECT_TIMEOUT => $params['connect_timeout'] * 1000,
		Memcached::OPT_SEND_TIMEOUT => $params['timeout'],
		Memcached::OPT_RECV_TIMEOUT => $params['timeout'],
		Memcached::OPT_POLL_TIMEOUT => $params['timeout'] / 1000,
		// Avoid pointless delay when sending/fetching large blobs
		Memcached::OPT_TCP_NODELAY => !$params['allow_tcp_nagle_delay'],
		// Set libketama mode since it's recommended by the documentation
		Memcached::OPT_LIBKETAMA_COMPATIBLE => true
	];
	$options = $options + $defaults;

	// Optional tuning knobs are only passed along when the caller supplied them
	if ( isset( $params['retry_timeout'] ) ) {
		$options[Memcached::OPT_RETRY_TIMEOUT] = $params['retry_timeout'];
	}
	if ( isset( $params['server_failure_limit'] ) ) {
		$options[Memcached::OPT_SERVER_FAILURE_LIMIT] = $params['server_failure_limit'];
	}

	$serializerName = $params['serializer'];
	if ( $serializerName === 'igbinary' ) {
		// @phan-suppress-next-line PhanImpossibleCondition
		if ( !Memcached::HAVE_IGBINARY ) {
			throw new RuntimeException(
				__CLASS__ . ': the igbinary extension is not available ' .
				'but igbinary serialization was requested.'
			);
		}
		$options[Memcached::OPT_SERIALIZER] = Memcached::SERIALIZER_IGBINARY;
	} elseif ( $serializerName === 'php' ) {
		$options[Memcached::OPT_SERIALIZER] = Memcached::SERIALIZER_PHP;
	}

	$optionsAccepted = $client->setOptions( $options );
	if ( !$optionsAccepted ) {
		throw new RuntimeException(
			"Invalid options: " . json_encode( $options, JSON_PRETTY_PRINT )
		);
	}

	$serverList = [];
	foreach ( $params['servers'] as $address ) {
		if (
			// "[address]:port" form
			preg_match( '/^\[(.+)\]:(\d+)$/', $address, $parts ) ||
			// "address:port" form
			preg_match( '/^([^:]+):(\d+)$/', $address, $parts )
		) {
			$serverList[] = [ $parts[1], (int)$parts[2] ]; // (ip or path, port)
		} else {
			// No port recognised; pass the whole string as the address
			$serverList[] = [ $address, false ]; // (ip or path, port)
		}
	}

	if ( !$client->addServers( $serverList ) ) {
		throw new RuntimeException( "Failed to inject server address list" );
	}
}
180 
/**
 * Fetch an item, optionally also obtaining its CAS token.
 *
 * @param string $key
 * @param int $flags Not used by this implementation
 * @param mixed &$casToken Pass self::PASS_BY_REF to request a CAS token; it is
 *  set to the token on success and to null otherwise
 * @return mixed The value, or false on failure/miss (as returned by checkResult())
 */
protected function doGet( $key, $flags = 0, &$casToken = null ) {
	$wantsToken = ( $casToken === self::PASS_BY_REF );
	$casToken = null;

	$this->debug( "get($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );

	$client = $this->acquireSyncClient();

	// T257003: only require "gets" (instead of "get") when a CAS token is needed
	if ( !$wantsToken ) {
		return $this->checkResult( $key, $client->get( $routeKey ) );
	}

	$extended = $client->get( $routeKey, null, Memcached::GET_EXTENDED );
	$value = false;
	if ( is_array( $extended ) ) {
		$value = $extended['value'];
		$casToken = $extended['cas'];
	}

	return $this->checkResult( $key, $value );
}
207 
/**
 * Set an item.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime Expiry, normalized through fixExpiry()
 * @param int $flags Not used by this implementation
 * @return bool Success; a "not stored" response is reported as success
 */
protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
	$this->debug( "set($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );
	$client = $this->acquireSyncClient();
	$stored = $client->set( $routeKey, $value, $this->fixExpiry( $exptime ) );

	if ( $stored === false && $client->getResultCode() === Memcached::RES_NOTSTORED ) {
		// "Not stored" is always used as the mcrouter response with AllAsyncRoute
		return true;
	}

	return $this->checkResult( $key, $stored );
}
220 
/**
 * Check and set an item.
 *
 * @param mixed $casToken Token previously obtained for this key
 * @param string $key
 * @param mixed $value
 * @param int $exptime Expiry, normalized through fixExpiry()
 * @param int $flags Not used by this implementation
 * @return bool Result of the client call after checkResult()
 */
protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
	$this->debug( "cas($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );
	$client = $this->acquireSyncClient();
	$expiry = $this->fixExpiry( $exptime );
	$swapped = $client->cas( $casToken, $routeKey, $value, $expiry );

	return $this->checkResult( $key, $swapped );
}
233 
/**
 * Delete an item.
 *
 * @param string $key
 * @param int $flags Not used by this implementation
 * @return bool Success; a missing key is reported as success
 */
protected function doDelete( $key, $flags = 0 ) {
	$this->debug( "delete($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );
	$client = $this->acquireSyncClient();
	$deleted = $client->delete( $routeKey );

	if ( $deleted === false && $client->getResultCode() === Memcached::RES_NOTFOUND ) {
		// "Not found" is counted as success in our interface
		return true;
	}

	return $this->checkResult( $key, $deleted );
}
246 
/**
 * Insert an item if it does not already exist.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime Expiry, normalized through fixExpiry()
 * @param int $flags Not used by this implementation
 * @return bool Result of the client call after checkResult()
 */
protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
	$this->debug( "add($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );
	$client = $this->acquireSyncClient();
	$expiry = $this->fixExpiry( $exptime );
	$added = $client->add( $routeKey, $value, $expiry );

	return $this->checkResult( $key, $added );
}
259 
/**
 * Increase stored value of $key by $value while preserving its TTL.
 *
 * @param string $key
 * @param int $value Amount to add
 * @param int $flags Not used by this implementation
 * @return int|bool Result of the client call after checkResult(); false on failure
 */
public function incr( $key, $value = 1, $flags = 0 ) {
	$this->debug( "incr($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );
	$client = $this->acquireSyncClient();
	$newValue = $client->increment( $routeKey, $value );

	return $this->checkResult( $key, $newValue );
}
268 
/**
 * Decrease stored value of $key by $value while preserving its TTL.
 *
 * @param string $key
 * @param int $value Amount to subtract
 * @param int $flags Not used by this implementation
 * @return int|bool Result of the client call after checkResult(); false on failure
 */
public function decr( $key, $value = 1, $flags = 0 ) {
	$this->debug( "decr($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );
	$client = $this->acquireSyncClient();
	$newValue = $client->decrement( $routeKey, $value );

	return $this->checkResult( $key, $newValue );
}
277 
/**
 * Increment a counter, creating it with $init if it does not exist yet.
 *
 * The sequence is: increment; if that fails without a registered error, add the
 * initial value; if the add fails without a registered error, increment again.
 *
 * @param string $key
 * @param int $exptime Expiry for a newly created counter, normalized through fixExpiry()
 * @param int $step Amount to increment by
 * @param int $init Value stored when the key has to be created
 * @param int $flags Not used by this implementation
 * @return int|bool The new value, or false on failure
 */
protected function doIncrWithInit( $key, $exptime, $step, $init, $flags ) {
	$this->debug( "incrWithInit($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );

	$client = $this->acquireSyncClient();
	// Errors registered after this point are attributed to the calls below
	$watchPoint = $this->watchErrors();

	$newValue = $this->checkResult( $key, $client->increment( $routeKey, $step ) );
	if ( $newValue !== false || $this->getLastError( $watchPoint ) ) {
		// Either the increment worked or it failed with a real error
		return $newValue;
	}

	// No key set; initialize
	$added = $this->checkResult(
		$key,
		$client->add( $routeKey, $init, $this->fixExpiry( $exptime ) )
	);
	$newValue = $added ? $init : false;
	if ( $newValue !== false || $this->getLastError( $watchPoint ) ) {
		return $newValue;
	}

	// Raced out initializing; increment
	return $this->checkResult( $key, $client->increment( $routeKey, $step ) );
}
300 
/**
 * Estimate the serialized size of each given value.
 *
 * The PECL driver does the serializing and will not reuse anything from here,
 * so only size guesses are produced.
 *
 * @param array $valueByKey Map of (key => value)
 * @return int[] List of estimated sizes, in the iteration order of $valueByKey
 */
public function setNewPreparedValues( array $valueByKey ) {
	return array_map(
		function ( $value ) {
			return $this->guessSerialValueSize( $value );
		},
		// Drop the keys so that the result is a plain list
		array_values( $valueByKey )
	);
}
310 
/**
 * Check the return value from a client method call and take any necessary action.
 *
 * A non-false result is passed through untouched. For a false result, the
 * result code of the sync client is inspected: benign codes are only written to
 * the debug log, anything else is logged as an error and recorded through
 * setLastError().
 *
 * NOTE(review): the result code is always read from $this->syncClient, even when
 * the preceding operation ran on the dedicated async client -- confirm this is intended.
 *
 * @param string|false $key The key used by the caller, or false if not applicable
 * @param mixed $result The return value of the client call
 * @return mixed The given $result
 */
protected function checkResult( $key, $result ) {
	if ( $result !== false ) {
		return $result;
	}

	// Map of (Memcached result code => error constant of this class)
	static $statusByCode = [
		Memcached::RES_HOST_LOOKUP_FAILURE => self::ERR_UNREACHABLE,
		Memcached::RES_SERVER_MARKED_DEAD => self::ERR_UNREACHABLE,
		Memcached::RES_SERVER_TEMPORARILY_DISABLED => self::ERR_UNREACHABLE,
		Memcached::RES_UNKNOWN_READ_FAILURE => self::ERR_NO_RESPONSE,
		Memcached::RES_WRITE_FAILURE => self::ERR_NO_RESPONSE,
		Memcached::RES_PARTIAL_READ => self::ERR_NO_RESPONSE,
		// Hard-code values that only exist in recent versions of the PECL extension.
		// https://github.com/JetBrains/phpstorm-stubs/blob/master/memcached/memcached.php
		3 /* Memcached::RES_CONNECTION_FAILURE */ => self::ERR_UNREACHABLE,
		27 /* Memcached::RES_FAIL_UNIX_SOCKET */ => self::ERR_UNREACHABLE,
		6 /* Memcached::RES_READ_FAILURE */ => self::ERR_NO_RESPONSE
	];

	$client = $this->syncClient;
	$code = $client->getResultCode();

	// Loose comparisons below mirror the semantics of a switch statement
	if ( $code == Memcached::RES_SUCCESS ) {
		return $result;
	}

	$benignCodes = [
		Memcached::RES_DATA_EXISTS,
		Memcached::RES_NOTSTORED,
		Memcached::RES_NOTFOUND
	];
	if ( in_array( $code, $benignCodes ) ) {
		$this->debug( "result: " . $client->getResultMessage() );

		return $result;
	}

	// Anything else is a genuine error: log it and record the error category
	$msg = $client->getResultMessage();
	$logCtx = [];
	if ( $key === false ) {
		$msg = "Memcached error: $msg";
	} else {
		$server = $client->getServerByKey( $key );
		$logCtx['memcached-server'] = "{$server['host']}:{$server['port']}";
		$logCtx['memcached-key'] = $key;
		$msg = "Memcached error for key \"{memcached-key}\" " .
			"on server \"{memcached-server}\": $msg";
	}
	$this->logger->error( $msg, $logCtx );
	$this->setLastError( $statusByCode[$code] ?? self::ERR_UNEXPECTED );

	return $result;
}
368 
/**
 * Get an associative array containing the item for each of the keys that have items.
 *
 * @param string[] $keys
 * @param int $flags Not used by this implementation
 * @return array Map of (key => value) for the keys that were found; empty on failure
 */
protected function doGetMulti( array $keys, $flags = 0 ) {
	$this->debug( 'getMulti(' . implode( ', ', $keys ) . ')' );

	$routeKeys = [];
	foreach ( $keys as $key ) {
		$routeKeys[] = $this->validateKeyAndPrependRoute( $key );
	}

	// The PECL implementation uses multi-key "get"/"gets"; no need to pipeline.
	// T257003: avoid Memcached::GET_EXTENDED; no tokens are needed and that requires "gets"
	// https://github.com/libmemcached/libmemcached/blob/eda2becbec24363f56115fa5d16d38a2d1f54775/libmemcached/get.cc#L272
	$resByRouteKey = $this->acquireSyncClient()->getMulti( $routeKeys );

	if ( !is_array( $resByRouteKey ) ) {
		// Previously a failed call was coerced to [] before being checked, so
		// checkResult() never saw the failure and no error was logged or
		// registered. Let it inspect the result code, then keep returning an
		// array as before.
		$this->checkResult( false, false );

		return [];
	}

	$res = [];
	foreach ( $resByRouteKey as $routeKey => $value ) {
		$res[$this->stripRouteFromKey( $routeKey )] = $value;
	}

	return $this->checkResult( false, $res );
}
393 
/**
 * Set several items at once.
 *
 * With self::WRITE_BACKGROUND in $flags the write goes through the buffering
 * (async) client; otherwise the sync client is used.
 *
 * @param array $data Map of (key => value)
 * @param int $exptime Expiry, normalized through fixExpiry()
 * @param int $flags Bitfield; self::WRITE_BACKGROUND is honoured
 * @return bool Result of the client call after checkResult()
 */
protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
	$this->debug( 'setMulti(' . implode( ', ', array_keys( $data ) ) . ')' );

	$exptime = $this->fixExpiry( $exptime );
	$dataByRouteKey = [];
	foreach ( $data as $key => $value ) {
		$dataByRouteKey[$this->validateKeyAndPrependRoute( $key )] = $value;
	}

	// The PECL implementation is a naïve for-loop so use async I/O to pipeline;
	// https://github.com/php-memcached-dev/php-memcached/blob/master/php_memcached.c#L1852
	if ( $this->fieldHasFlags( $flags, self::WRITE_BACKGROUND ) ) {
		$client = $this->acquireAsyncClient();
		try {
			// Ignore "failed to set" warning from php-memcached 3.x (T251450)
			// phpcs:ignore Generic.PHP.NoSilencedErrors.Discouraged
			$result = @$client->setMulti( $dataByRouteKey, $exptime );
		} finally {
			// Always hand the client back: if the sync client was borrowed for
			// buffering and the call throws, it would otherwise stay locked and
			// every later acquireSyncClient() call would fail.
			$this->releaseAsyncClient( $client );
		}
	} else {
		$client = $this->acquireSyncClient();
		// Ignore "failed to set" warning from php-memcached 3.x (T251450)
		// phpcs:ignore Generic.PHP.NoSilencedErrors.Discouraged
		$result = @$client->setMulti( $dataByRouteKey, $exptime );
	}

	return $this->checkResult( false, $result );
}
420 
/**
 * Delete several items at once.
 *
 * With self::WRITE_BACKGROUND in $flags the deletes go through the buffering
 * (async) client; otherwise the sync client is used.
 *
 * @param string[] $keys
 * @param int $flags Bitfield; self::WRITE_BACKGROUND is honoured
 * @return bool Whether every per-key result was true or "not found"
 */
protected function doDeleteMulti( array $keys, $flags = 0 ) {
	$this->debug( 'deleteMulti(' . implode( ', ', $keys ) . ')' );

	$routeKeys = [];
	foreach ( $keys as $key ) {
		$routeKeys[] = $this->validateKeyAndPrependRoute( $key );
	}

	// The PECL implementation is a naïve for-loop so use async I/O to pipeline;
	// https://github.com/php-memcached-dev/php-memcached/blob/7443d16d02fb73cdba2e90ae282446f80969229c/php_memcached.c#L1852
	if ( $this->fieldHasFlags( $flags, self::WRITE_BACKGROUND ) ) {
		$client = $this->acquireAsyncClient();
		try {
			$resultArray = $client->deleteMulti( $routeKeys ) ?: [];
		} finally {
			// Always hand the client back: if the sync client was borrowed for
			// buffering and the call throws, it would otherwise stay locked and
			// every later acquireSyncClient() call would fail.
			$this->releaseAsyncClient( $client );
		}
	} else {
		$resultArray = $this->acquireSyncClient()->deleteMulti( $routeKeys ) ?: [];
	}

	$result = true;
	foreach ( $resultArray as $code ) {
		if ( !in_array( $code, [ true, Memcached::RES_NOTFOUND ], true ) ) {
			// "Not found" is counted as success in our interface
			$result = false;
		}
	}

	return $this->checkResult( false, $result );
}
449 
/**
 * Change the expiry of an existing item via the client's "touch" operation.
 *
 * @param string $key
 * @param int $exptime New expiry, normalized through fixExpiry()
 * @param int $flags Not used by this implementation
 * @return bool Result of the client call after checkResult()
 */
protected function doChangeTTL( $key, $exptime, $flags ) {
	$this->debug( "touch($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );
	$client = $this->acquireSyncClient();
	$expiry = $this->fixExpiry( $exptime );
	$touched = $client->touch( $routeKey, $expiry );

	return $this->checkResult( $key, $touched );
}
458 
/**
 * Serialize a value using the serializer configured on the sync client.
 *
 * @param mixed $value
 * @return string|int The serialized blob; integers are returned unchanged
 * @throws UnexpectedValueException If the configured serializer is neither PHP nor igbinary
 */
protected function serialize( $value ) {
	if ( is_int( $value ) ) {
		// Integers are passed through as-is
		return $value;
	}

	$serializer = $this->syncClient->getOption( Memcached::OPT_SERIALIZER );

	if ( $serializer === Memcached::SERIALIZER_IGBINARY ) {
		return igbinary_serialize( $value );
	}
	if ( $serializer === Memcached::SERIALIZER_PHP ) {
		return serialize( $value );
	}

	throw new UnexpectedValueException( __METHOD__ . ": got serializer '$serializer'." );
}
473 
/**
 * Unserialize a value using the serializer configured on the sync client.
 *
 * @param string|int $value
 * @return mixed The original value; integer-like input is returned as an int
 * @throws UnexpectedValueException If the configured serializer is neither PHP nor igbinary
 */
protected function unserialize( $value ) {
	if ( $this->isInteger( $value ) ) {
		// Integers are stored without serialization
		return (int)$value;
	}

	$serializer = $this->syncClient->getOption( Memcached::OPT_SERIALIZER );

	if ( $serializer === Memcached::SERIALIZER_IGBINARY ) {
		return igbinary_unserialize( $value );
	}
	if ( $serializer === Memcached::SERIALIZER_PHP ) {
		return unserialize( $value );
	}

	throw new UnexpectedValueException( __METHOD__ . ": got serializer '$serializer'." );
}
488 
/**
 * Get the main client, flushing any pending buffered writes first.
 *
 * @return Memcached
 * @throws RuntimeException If the main client is currently in use for buffered writes
 */
private function acquireSyncClient() {
	if ( $this->syncClientIsBuffering ) {
		throw new RuntimeException( "The main (unbuffered I/O) client is locked" );
	}

	if ( !$this->hasUnflushedChanges ) {
		return $this->syncClient;
	}

	// Force a synchronous flush of async writes so that their changes are visible
	$this->syncClient->fetch();
	if ( $this->asyncClient ) {
		$this->asyncClient->fetch();
	}
	$this->hasUnflushedChanges = false;

	return $this->syncClient;
}
508 
/**
 * Get a client suitable for buffered writes.
 *
 * Returns the dedicated async client when there is one. Otherwise the main
 * client is switched to the async write options and marked as locked until
 * releaseAsyncClient() is called.
 *
 * @return Memcached
 */
private function acquireAsyncClient() {
	if ( !$this->asyncClient ) {
		// Modify the main instance to temporarily buffer writes
		$this->syncClientIsBuffering = true;
		$this->syncClient->setOptions( self::$OPTS_ASYNC_WRITES );

		return $this->syncClient;
	}

	// Dedicated buffering instance
	return $this->asyncClient;
}
523 
/**
 * Hand back a client obtained from acquireAsyncClient().
 *
 * Marks this object as having unflushed changes and, when the main client was
 * borrowed for buffering, restores its synchronous write options and unlocks it.
 *
 * @param Memcached $client The client returned by acquireAsyncClient()
 */
private function releaseAsyncClient( $client ) {
	$this->hasUnflushedChanges = true;

	if ( $this->asyncClient ) {
		// A dedicated buffering instance was used; nothing to restore
		return;
	}

	// This is the main instance; make it stop buffering writes again
	$client->setOptions( self::$OPTS_SYNC_WRITES );
	$this->syncClientIsBuffering = false;
}
536 }
getLastError( $watchPoint=0)
Get the "last error" registry.
Definition: BagOStuff.php:494
setLastError( $error)
Set the "last error" registry due to a problem encountered during an attempted operation.
Definition: BagOStuff.php:514
watchErrors()
Get a "watch point" token that can be used to get the "last error" to occur after now.
Definition: BagOStuff.php:472
fieldHasFlags( $field, $flags)
Definition: BagOStuff.php:622
guessSerialValueSize( $value, $depth=0, &$loops=0)
Estimate the size of a variable once serialized.
isInteger( $value)
Check if a value is an integer.
Base class for memcached clients.
A wrapper class for the PECL memcached client.
doAdd( $key, $value, $exptime=0, $flags=0)
Insert an item if it does not already exist.
bool $syncClientIsBuffering
Whether the non-buffering client is locked from use.
doGet( $key, $flags=0, &$casToken=null)
doGetMulti(array $keys, $flags=0)
Get an associative array containing the item for each of the keys that have items.
doCas( $casToken, $key, $value, $exptime=0, $flags=0)
Check and set an item.
doSet( $key, $value, $exptime=0, $flags=0)
Set an item.
incr( $key, $value=1, $flags=0)
Increase stored value of $key by $value while preserving its TTL.
doChangeTTL( $key, $exptime, $flags)
doDeleteMulti(array $keys, $flags=0)
decr( $key, $value=1, $flags=0)
Decrease stored value of $key by $value while preserving its TTL.
setNewPreparedValues(array $valueByKey)
Make a "generic" reversible cache key from the given components.
initializeClient(Memcached $client, array $params, array $options)
Initialize the client only if needed and reuse it otherwise.
doIncrWithInit( $key, $exptime, $step, $init, $flags)
bool $hasUnflushedChanges
Whether the non-buffering client should be flushed before use.
static array $OPTS_SYNC_WRITES
Memcached options.
doSetMulti(array $data, $exptime=0, $flags=0)
static array $OPTS_ASYNC_WRITES
Memcached options.
checkResult( $key, $result)
Check the return value from a client method call and take any necessary action.
doDelete( $key, $flags=0)
Delete an item.
__construct( $params)
Available parameters are:
const ERR_UNREACHABLE
Storage medium could not be reached to establish a connection.
const ERR_NO_RESPONSE
Storage medium failed to yield a complete response to an operation.
return true
Definition: router.php:90