MediaWiki  master
MemcachedPeclBagOStuff.php
Go to the documentation of this file.
1 <?php
24 use Wikimedia\ScopedCallback;
25 
/** @var Memcached PECL client instance; created and assigned by the constructor */
33  protected $client;
34 
/**
 * Build the PECL Memcached client from the given configuration.
 *
 * Class-specific parameters read here (each has a default unless noted):
 *   - compress_threshold: value written to the "memcached.compression_threshold"
 *       php.ini setting (default 1500).
 *   - connect_timeout: multiplied by 1000 before being used as
 *       Memcached::OPT_CONNECT_TIMEOUT (default 0.5).
 *   - timeout: used as-is for the send/receive timeouts and divided by 1000
 *       for the poll timeout (default 500000).
 *   - serializer: 'php' or 'igbinary' (default 'php').
 *   - use_binary_protocol: whether to use the binary protocol (default false).
 *   - allow_tcp_nagle_delay: when true, TCP_NODELAY is left off (default true).
 *   - persistent: whether to reuse a persistent client keyed by a hash of
 *       the parameters (default false).
 *   - servers: list of server addresses (no default; required when a new
 *       client has to be initialized).
 *   - retry_timeout, server_failure_limit: optional, passed to the client
 *       only when set.
 *
 * @param array $params
 */
public function __construct( $params ) {
	parent::__construct( $params );

	// Default class-specific parameters. "persistent" gets an explicit default
	// so that omitting it does not raise an undefined-key warning below.
	$params += [
		'compress_threshold' => 1500,
		'connect_timeout' => 0.5,
		'timeout' => 500000,
		'serializer' => 'php',
		'use_binary_protocol' => false,
		'allow_tcp_nagle_delay' => true,
		'persistent' => false
	];

	if ( $params['persistent'] ) {
		// The pool ID must be unique to the server/option combination.
		// The Memcached object is essentially shared for each pool ID.
		// We can only reuse a pool ID if we keep the config consistent.
		$connectionPoolId = md5( serialize( $params ) );
		$client = new Memcached( $connectionPoolId );
	} else {
		$client = new Memcached();
	}

	// Configure first and assign afterwards, so a failed initialization
	// never leaves a half-configured client on the instance.
	$this->initializeClient( $client, $params );

	$this->client = $client;
	// The compression threshold is an undocumented php.ini option for some
	// reason. There's probably not much harm in setting it globally, for
	// compatibility with the settings for the PHP client.
	ini_set( 'memcached.compression_threshold', $params['compress_threshold'] );
}
82 
/**
 * Apply options and the server list to a Memcached client, unless the
 * client already has servers (i.e. it is a reused persistent handle).
 *
 * @param Memcached $client Client to configure
 * @param array $params Constructor parameters with defaults applied
 * @throws RuntimeException If igbinary is requested but unavailable, if the
 *   options are rejected, or if the server list cannot be added
 */
private function initializeClient( Memcached $client, array $params ) {
	if ( $client->getServerList() ) {
		$this->logger->debug( __METHOD__ . ": pre-initialized client instance." );

		// A persistent handle keeps its earlier configuration; leave it alone
		return;
	}

	$this->logger->debug( __METHOD__ . ": initializing new client instance." );

	$clientOptions = [
		Memcached::OPT_NO_BLOCK => false,
		Memcached::OPT_BUFFER_WRITES => false,
		Memcached::OPT_NOREPLY => false,
		// Network protocol (ASCII or binary)
		Memcached::OPT_BINARY_PROTOCOL => $params['use_binary_protocol'],
		// Set various network timeouts
		Memcached::OPT_CONNECT_TIMEOUT => $params['connect_timeout'] * 1000,
		Memcached::OPT_SEND_TIMEOUT => $params['timeout'],
		Memcached::OPT_RECV_TIMEOUT => $params['timeout'],
		Memcached::OPT_POLL_TIMEOUT => $params['timeout'] / 1000,
		// Avoid pointless delay when sending/fetching large blobs
		Memcached::OPT_TCP_NODELAY => !$params['allow_tcp_nagle_delay'],
		// Set libketama mode since it's recommended by the documentation
		Memcached::OPT_LIBKETAMA_COMPATIBLE => true
	];

	// Optional tuning knobs are only forwarded when the caller supplied them
	if ( isset( $params['retry_timeout'] ) ) {
		$clientOptions[Memcached::OPT_RETRY_TIMEOUT] = $params['retry_timeout'];
	}
	if ( isset( $params['server_failure_limit'] ) ) {
		$clientOptions[Memcached::OPT_SERVER_FAILURE_LIMIT] = $params['server_failure_limit'];
	}

	$serializerName = $params['serializer'];
	if ( $serializerName === 'php' ) {
		$clientOptions[Memcached::OPT_SERIALIZER] = Memcached::SERIALIZER_PHP;
	} elseif ( $serializerName === 'igbinary' ) {
		// @phan-suppress-next-line PhanImpossibleCondition
		if ( !Memcached::HAVE_IGBINARY ) {
			throw new RuntimeException(
				__CLASS__ . ': the igbinary extension is not available ' .
				'but igbinary serialization was requested.'
			);
		}
		$clientOptions[Memcached::OPT_SERIALIZER] = Memcached::SERIALIZER_IGBINARY;
	}

	if ( !$client->setOptions( $clientOptions ) ) {
		throw new RuntimeException(
			"Invalid options: " . json_encode( $clientOptions, JSON_PRETTY_PRINT )
		);
	}

	$serverList = [];
	foreach ( $params['servers'] as $address ) {
		$parts = [];
		// Try the bracketed "[ip]:port" form first, then plain "host:port"
		if (
			preg_match( '/^\[(.+)\]:(\d+)$/', $address, $parts ) ||
			preg_match( '/^([^:]+):(\d+)$/', $address, $parts )
		) {
			$serverList[] = [ $parts[1], (int)$parts[2] ]; // (ip or path, port)
		} else {
			// No recognizable port suffix; pass the address through untouched
			$serverList[] = [ $address, false ]; // (ip or path, port)
		}
	}

	if ( !$client->addServers( $serverList ) ) {
		throw new RuntimeException( "Failed to inject server address list" );
	}
}
156 
/**
 * Switch the client into "no reply" mode for the duration of a scope.
 *
 * No-reply mode is enabled when $flags is exactly true or includes
 * self::WRITE_BACKGROUND. The returned callback turns it back off when it
 * is consumed or goes out of scope.
 *
 * @param int|bool $flags Write flags bitfield, or true to force no-reply mode
 * @return ScopedCallback|null Null when no-reply mode was not enabled
 */
private function noReplyScope( $flags ) {
	if ( $flags !== true && !( $flags & self::WRITE_BACKGROUND ) ) {
		return null;
	}

	// Bind the client to a local so the static closure can capture it
	// without needing $this.
	$client = $this->client;
	$client->setOption( Memcached::OPT_NOREPLY, true );

	return new ScopedCallback( static function () use ( $client ) {
		$client->setOption( Memcached::OPT_NOREPLY, false );
	} );
}
175 
/**
 * Fetch a single item, optionally returning its CAS token by reference.
 *
 * @param string $key
 * @param int $flags Unused by this implementation
 * @param mixed &$casToken Pass self::PASS_BY_REF to request the CAS token;
 *   always reset to null first and only filled in when the lookup succeeds
 * @return mixed The stored value, or false on a miss or failure
 */
protected function doGet( $key, $flags = 0, &$casToken = null ) {
	$wantsToken = ( $casToken === self::PASS_BY_REF );
	$casToken = null;

	$this->debug( "get($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );

	// T257003: only require "gets" (instead of "get") when a CAS token is needed
	if ( !$wantsToken ) {
		return $this->checkResult( $key, $this->client->get( $routeKey ) );
	}

	$extended = $this->client->get( $routeKey, null, Memcached::GET_EXTENDED );
	if ( !is_array( $extended ) ) {
		return $this->checkResult( $key, false );
	}

	$value = $extended['value'];
	$casToken = $extended['cas'];

	return $this->checkResult( $key, $value );
}
201 
/**
 * Store an item unconditionally.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime
 * @param int $flags Write flags; may enable no-reply mode for this call
 * @return bool
 */
protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
	$this->debug( "set($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );

	$scope = $this->noReplyScope( $flags );
	$ok = $this->client->set( $routeKey, $value, $this->fixExpiry( $exptime ) );
	ScopedCallback::consume( $scope );

	if ( !$ok && $this->client->getResultCode() === Memcached::RES_NOTSTORED ) {
		// "Not stored" is always used as the mcrouter response with AllAsyncRoute
		return true;
	}

	return $this->checkResult( $key, $ok );
}
216 
/**
 * Store an item only if the given CAS token still matches.
 *
 * @param mixed $casToken Token previously obtained from doGet()
 * @param string $key
 * @param mixed $value
 * @param int $exptime
 * @param int $flags Unused by this implementation
 * @return bool
 */
protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
	$this->debug( "cas($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );
	$expiry = $this->fixExpiry( $exptime );

	return $this->checkResult(
		$key,
		$this->client->cas( $casToken, $routeKey, $value, $expiry )
	);
}
229 
/**
 * Remove an item; a key that does not exist counts as success.
 *
 * @param string $key
 * @param int $flags Write flags; may enable no-reply mode for this call
 * @return bool
 */
protected function doDelete( $key, $flags = 0 ) {
	$this->debug( "delete($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );

	$scope = $this->noReplyScope( $flags );
	$deleted = $this->client->delete( $routeKey );
	ScopedCallback::consume( $scope );

	if ( !$deleted && $this->client->getResultCode() === Memcached::RES_NOTFOUND ) {
		// "Not found" is counted as success in our interface
		return true;
	}

	return $this->checkResult( $key, $deleted );
}
243 
/**
 * Store an item only if the key does not exist yet.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime
 * @param int $flags Write flags; may enable no-reply mode for this call
 * @return bool
 */
protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
	$this->debug( "add($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );

	$scope = $this->noReplyScope( $flags );
	$added = $this->client->add( $routeKey, $value, $this->fixExpiry( $exptime ) );
	ScopedCallback::consume( $scope );

	return $this->checkResult( $key, $added );
}
258 
/**
 * Increment a counter in no-reply mode, creating it first if necessary.
 *
 * The key is added with ($init - $step) and then incremented by $step, so a
 * freshly created counter ends up at $init.
 *
 * @param string $key
 * @param int $exptime
 * @param int $step
 * @param int $init
 * @return bool True if no error was registered since the watch point
 */
protected function doIncrWithInitAsync( $key, $exptime, $step, $init ) {
	$this->debug( "incrWithInit($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );
	$watchPoint = $this->watchErrors();

	$noReplyScope = $this->noReplyScope( true );

	$addResult = $this->client->add( $routeKey, $init - $step, $this->fixExpiry( $exptime ) );
	$this->checkResult( $key, $addResult );

	$incrResult = $this->client->increment( $routeKey, $step );
	$this->checkResult( $key, $incrResult );

	ScopedCallback::consume( $noReplyScope );

	return !$this->getLastError( $watchPoint );
}
270 
/**
 * Increment a counter and return its new value, creating it if missing.
 *
 * Tries increment, then add, then increment again (in case another writer
 * created the key in between). Stops at the first step that succeeds or
 * registers an error.
 *
 * @param string $key
 * @param int $exptime
 * @param int $step
 * @param int $init
 * @return int|bool New counter value, or false on failure
 */
protected function doIncrWithInitSync( $key, $exptime, $step, $init ) {
	$this->debug( "incrWithInit($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );
	$watchPoint = $this->watchErrors();

	$newValue = $this->checkResult( $key, $this->client->increment( $routeKey, $step ) );
	if ( $newValue !== false || $this->getLastError( $watchPoint ) ) {
		return $newValue;
	}

	// No key set; initialize
	$added = $this->checkResult(
		$key,
		$this->client->add( $routeKey, $init, $this->fixExpiry( $exptime ) )
	);
	$newValue = $added ? $init : false;
	if ( $newValue !== false || $this->getLastError( $watchPoint ) ) {
		return $newValue;
	}

	// Raced out initializing; increment
	return $this->checkResult( $key, $this->client->increment( $routeKey, $step ) );
}
290 
/**
 * Inspect the return value of a client call and record any error.
 *
 * Any result other than false is returned untouched. For a false result the
 * client's result code decides what happens: success is ignored, "data
 * exists" / "not stored" / "not found" are only written to the debug log,
 * and every other code is logged as an error and stored via setLastError().
 *
 * @param string|false $key Key the operation was for, or false if the
 *   operation was not tied to a single key
 * @param mixed $result Return value of the client method
 * @return mixed The given $result, unchanged
 */
protected function checkResult( $key, $result ) {
	static $statusByCode = [
		Memcached::RES_HOST_LOOKUP_FAILURE => self::ERR_UNREACHABLE,
		Memcached::RES_SERVER_MARKED_DEAD => self::ERR_UNREACHABLE,
		Memcached::RES_SERVER_TEMPORARILY_DISABLED => self::ERR_UNREACHABLE,
		Memcached::RES_UNKNOWN_READ_FAILURE => self::ERR_NO_RESPONSE,
		Memcached::RES_WRITE_FAILURE => self::ERR_NO_RESPONSE,
		Memcached::RES_PARTIAL_READ => self::ERR_NO_RESPONSE,
		// Hard-code values that only exist in recent versions of the PECL extension.
		// https://github.com/JetBrains/phpstorm-stubs/blob/master/memcached/memcached.php
		3 /* Memcached::RES_CONNECTION_FAILURE */ => self::ERR_UNREACHABLE,
		27 /* Memcached::RES_FAIL_UNIX_SOCKET */ => self::ERR_UNREACHABLE,
		6 /* Memcached::RES_READ_FAILURE */ => self::ERR_NO_RESPONSE
	];

	if ( $result !== false ) {
		return $result;
	}

	$client = $this->client;
	$code = $client->getResultCode();
	switch ( $code ) {
		case Memcached::RES_SUCCESS:
			break;
		case Memcached::RES_DATA_EXISTS:
		case Memcached::RES_NOTSTORED:
		case Memcached::RES_NOTFOUND:
			$this->debug( "result: " . $client->getResultMessage() );
			break;
		default:
			// Read the message before any further client call is made
			$msg = $client->getResultMessage();
			$logCtx = [];
			if ( $key !== false ) {
				// The server lookup can itself fail and return false,
				// so do not blindly dereference it as an array.
				$server = $client->getServerByKey( $key );
				$logCtx['memcached-server'] = is_array( $server )
					? "{$server['host']}:{$server['port']}"
					: '(unknown)';
				$logCtx['memcached-key'] = $key;
				$msg = "Memcached error for key \"{memcached-key}\" " .
					"on server \"{memcached-server}\": $msg";
			} else {
				$msg = "Memcached error: $msg";
			}
			$this->logger->error( $msg, $logCtx );
			$this->setLastError( $statusByCode[$code] ?? self::ERR_UNEXPECTED );
	}

	return $result;
}
348 
/**
 * Fetch several items in one client call.
 *
 * @param string[] $keys
 * @param int $flags Unused by this implementation
 * @return array Map of (key => value) for the items returned by the client;
 *   empty when the client call fails
 */
protected function doGetMulti( array $keys, $flags = 0 ) {
	$this->debug( 'getMulti(' . implode( ', ', $keys ) . ')' );

	$routeKeys = [];
	foreach ( $keys as $key ) {
		$routeKeys[] = $this->validateKeyAndPrependRoute( $key );
	}

	// The PECL implementation uses multi-key "get"/"gets"; no need to pipeline.
	// T257003: avoid Memcached::GET_EXTENDED; no tokens are needed and that requires "gets"
	// https://github.com/libmemcached/libmemcached/blob/eda2becbec24363f56115fa5d16d38a2d1f54775/libmemcached/get.cc#L272
	$valuesByRouteKey = $this->client->getMulti( $routeKeys );

	$valuesByKey = false;
	if ( is_array( $valuesByRouteKey ) ) {
		// Map the routed keys back to the keys the caller used
		$valuesByKey = [];
		foreach ( $valuesByRouteKey as $routeKey => $value ) {
			$valuesByKey[$this->stripRouteFromKey( $routeKey )] = $value;
		}
	}

	$checked = $this->checkResult( false, $valuesByKey );

	return $checked === false ? [] : $checked;
}
374 
/**
 * Store several items in one client call.
 *
 * @param array $data Map of (key => value)
 * @param int $exptime
 * @param int $flags Write flags; may enable no-reply mode for this call
 * @return bool
 */
protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
	$this->debug( 'setMulti(' . implode( ', ', array_keys( $data ) ) . ')' );

	$expiry = $this->fixExpiry( $exptime );

	$valuesByRouteKey = [];
	foreach ( $data as $key => $value ) {
		$routeKey = $this->validateKeyAndPrependRoute( $key );
		$valuesByRouteKey[$routeKey] = $value;
	}

	$scope = $this->noReplyScope( $flags );
	// Ignore "failed to set" warning from php-memcached 3.x (T251450)
	// phpcs:ignore Generic.PHP.NoSilencedErrors.Discouraged
	$stored = @$this->client->setMulti( $valuesByRouteKey, $expiry );
	ScopedCallback::consume( $scope );

	return $this->checkResult( false, $stored );
}
392 
/**
 * Remove several items in one client call; missing keys count as success.
 *
 * @param string[] $keys
 * @param int $flags Write flags; may enable no-reply mode for this call
 * @return bool
 */
protected function doDeleteMulti( array $keys, $flags = 0 ) {
	$this->debug( 'deleteMulti(' . implode( ', ', $keys ) . ')' );

	$routeKeys = [];
	foreach ( $keys as $key ) {
		$routeKeys[] = $this->validateKeyAndPrependRoute( $key );
	}

	$scope = $this->noReplyScope( $flags );
	// A falsy return value is treated as "no per-key results"
	$codes = $this->client->deleteMulti( $routeKeys ) ?: [];
	ScopedCallback::consume( $scope );

	$allDeleted = true;
	foreach ( $codes as $code ) {
		// "Not found" is counted as success in our interface
		$isSuccess = ( $code === true || $code === Memcached::RES_NOTFOUND );
		if ( !$isSuccess ) {
			$allDeleted = false;
		}
	}

	return $this->checkResult( false, $allDeleted );
}
415 
/**
 * Change the expiry of an existing item using the client's touch command.
 *
 * @param string $key
 * @param int $exptime
 * @param int $flags Unused; no-reply mode is deliberately not applied here
 * @return bool
 */
protected function doChangeTTL( $key, $exptime, $flags ) {
	$this->debug( "touch($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );
	$expiry = $this->fixExpiry( $exptime );

	// Avoid NO_REPLY due to libmemcached hang
	// https://phabricator.wikimedia.org/T310662#8031692
	$touched = $this->client->touch( $routeKey, $expiry );

	return $this->checkResult( $key, $touched );
}
426 
/**
 * Serialize a value using the serializer configured on the client.
 *
 * @param mixed $value
 * @return string|int Integers are returned as-is; anything else is returned
 *   in its serialized form
 * @throws UnexpectedValueException If the client reports a serializer other
 *   than PHP or igbinary
 */
protected function serialize( $value ) {
	// Integers are passed through unserialized
	if ( is_int( $value ) ) {
		return $value;
	}

	$serializer = $this->client->getOption( Memcached::OPT_SERIALIZER );

	if ( $serializer === Memcached::SERIALIZER_IGBINARY ) {
		return igbinary_serialize( $value );
	}
	if ( $serializer === Memcached::SERIALIZER_PHP ) {
		return serialize( $value );
	}

	throw new UnexpectedValueException( __METHOD__ . ": got serializer '$serializer'." );
}
441 
/**
 * Reverse of serialize(): decode a value with the client's serializer.
 *
 * @param string|int $value
 * @return mixed Integer-like input is cast to int; anything else is decoded
 * @throws UnexpectedValueException If the client reports a serializer other
 *   than PHP or igbinary
 */
protected function unserialize( $value ) {
	// Integer-like values were never serialized in the first place
	if ( $this->isInteger( $value ) ) {
		return (int)$value;
	}

	$serializer = $this->client->getOption( Memcached::OPT_SERIALIZER );

	if ( $serializer === Memcached::SERIALIZER_IGBINARY ) {
		return igbinary_unserialize( $value );
	}
	if ( $serializer === Memcached::SERIALIZER_PHP ) {
		return unserialize( $value );
	}

	throw new UnexpectedValueException( __METHOD__ . ": got serializer '$serializer'." );
}
456 }
getLastError( $watchPoint=0)
Get the "last error" registry.
Definition: BagOStuff.php:477
setLastError( $error)
Set the "last error" registry due to a problem encountered during an attempted operation.
Definition: BagOStuff.php:497
watchErrors()
Get a "watch point" token that can be used to get the "last error" to occur after now.
Definition: BagOStuff.php:455
int $lastError
BagOStuff:ERR_* constant of the last error that occurred.
Definition: BagOStuff.php:99
const PASS_BY_REF
Idiom for doGet() to return extra information by reference.
isInteger( $value)
Check if a value is an integer.
Base class for memcached clients.
A wrapper class for the PECL memcached client.
doAdd( $key, $value, $exptime=0, $flags=0)
Insert an item if it does not already exist.
doGet( $key, $flags=0, &$casToken=null)
Get an item.
doGetMulti(array $keys, $flags=0)
Get an associative array containing the item for each of the keys that have items.
doCas( $casToken, $key, $value, $exptime=0, $flags=0)
Set an item if the current CAS token matches the provided CAS token.
doSet( $key, $value, $exptime=0, $flags=0)
Set an item.
doChangeTTL( $key, $exptime, $flags)
doDeleteMulti(array $keys, $flags=0)
doSetMulti(array $data, $exptime=0, $flags=0)
checkResult( $key, $result)
Check the return value from a client method call and take any necessary action.
doDelete( $key, $flags=0)
Delete an item.
__construct( $params)
Available parameters are:
doIncrWithInitAsync( $key, $exptime, $step, $init)
doIncrWithInitSync( $key, $exptime, $step, $init)
const ERR_UNREACHABLE
Storage medium could not be reached to establish a connection.
const ERR_NO_RESPONSE
Storage medium failed to yield a complete response to an operation.
return true
Definition: router.php:90