MediaWiki  master
MemcachedPeclBagOStuff.php
Go to the documentation of this file.
1 <?php
/** @var Memcached Main client; initialized with the synchronous-write options */
31  protected $syncClient;
/** @var Memcached|null Dedicated client for buffered writes; only created for persistent pools */
33  protected $asyncClient;
34 
/** @var bool Whether the non-buffering client is locked from use */
36  protected $syncClientIsBuffering = false;
/** @var bool Whether the non-buffering client should be flushed before use */
38  protected $hasUnflushedChanges = false;
39 
/** @var array Memcached options that disable non-blocking I/O and write buffering */
41  private static $OPTS_SYNC_WRITES = [
42  Memcached::OPT_NO_BLOCK => false, // async I/O (using TCP buffers)
43  Memcached::OPT_BUFFER_WRITES => false // libmemcached buffers
44  ];
/** @var array Memcached options that enable non-blocking I/O and write buffering */
46  private static $OPTS_ASYNC_WRITES = [
47  Memcached::OPT_NO_BLOCK => true, // async I/O (using TCP buffers)
48  Memcached::OPT_BUFFER_WRITES => true // libmemcached buffers
49  ];
50 
/**
 * Build the synchronous client and, for persistent pools, a dedicated
 * client for buffered writes.
 *
 * Parameters read here (on top of those consumed by the parent class):
 *   - compress_threshold: value written to the "memcached.compression_threshold"
 *       ini setting (default 1500)
 *   - connect_timeout: connection timeout (default 0.5; multiplied by 1000
 *       before being handed to the client -- presumably seconds, TODO confirm)
 *   - serializer: "php" or "igbinary" (default "php")
 *   - use_binary_protocol: whether to use the binary protocol (default false)
 *   - allow_tcp_nagle_delay: when false, TCP_NODELAY is enabled (default true)
 *   - persistent: whether to use named (shared) Memcached instances; no default
 *       is applied here, so the caller is expected to provide it
 *
 * @param array $params
 */
public function __construct( $params ) {
	parent::__construct( $params );

	// Fill in class-specific defaults without overriding caller-supplied values
	$classDefaults = [
		'compress_threshold' => 1500,
		'connect_timeout' => 0.5,
		'serializer' => 'php',
		'use_binary_protocol' => false,
		'allow_tcp_nagle_delay' => true
	];
	$params = $params + $classDefaults;

	if ( !$params['persistent'] ) {
		$mainClient = new Memcached();
		$bufferedClient = null;
	} else {
		// A pool ID names a Memcached object that is essentially shared, so it
		// must be unique per server/option combination; deriving it from the
		// whole parameter set keeps the configuration consistent on reuse.
		$poolId = md5( serialize( $params ) );
		$mainClient = new Memcached( "$poolId-sync" );
		// Separate instance so that buffering does not clobber the shared main one
		$bufferedClient = new Memcached( "$poolId-async" );
	}

	$this->initializeClient( $mainClient, $params, self::$OPTS_SYNC_WRITES );
	if ( $bufferedClient !== null ) {
		$this->initializeClient( $bufferedClient, $params, self::$OPTS_ASYNC_WRITES );
	}

	// Publish the main client and the optional dedicated buffered-write client
	$this->syncClient = $mainClient;
	$this->asyncClient = $bufferedClient;

	// The compression threshold is an undocumented php.ini option; it is set
	// globally for compatibility with the settings of the PHP client.
	ini_set( 'memcached.compression_threshold', $params['compress_threshold'] );
}
105 
/**
 * Initialize the client only if needed and reuse it otherwise.
 *
 * A client that already has a server list is left untouched. Otherwise the
 * given options are merged with defaults derived from $params, applied to
 * the client, and the server list from $params['servers'] is registered.
 *
 * @param Memcached $client Client to configure
 * @param array $params Constructor parameters, with class defaults applied
 * @param array $options Base client options; these win over the defaults set here
 * @throws RuntimeException If igbinary is requested but unavailable, if the
 *   options are rejected, or if the server list cannot be added
 */
private function initializeClient( Memcached $client, array $params, array $options ) {
	if ( $client->getServerList() ) {
		$this->logger->debug( __METHOD__ . ": pre-initialized client instance." );

		// Keep the persistent handle exactly as it is
		return;
	}

	$this->logger->debug( __METHOD__ . ": initializing new client instance." );

	$defaultOptions = [
		Memcached::OPT_NO_BLOCK => false,
		Memcached::OPT_BUFFER_WRITES => false,
		// Network protocol (ASCII or binary)
		Memcached::OPT_BINARY_PROTOCOL => $params['use_binary_protocol'],
		// Network timeouts
		Memcached::OPT_CONNECT_TIMEOUT => $params['connect_timeout'] * 1000,
		Memcached::OPT_SEND_TIMEOUT => $params['timeout'],
		Memcached::OPT_RECV_TIMEOUT => $params['timeout'],
		Memcached::OPT_POLL_TIMEOUT => $params['timeout'] / 1000,
		// Avoid pointless delay when sending/fetching large blobs
		Memcached::OPT_TCP_NODELAY => !$params['allow_tcp_nagle_delay'],
		// libketama mode, as recommended by the documentation
		Memcached::OPT_LIBKETAMA_COMPATIBLE => true
	];
	// Array union: entries already present in $options take precedence
	$options = $options + $defaultOptions;

	// Optional settings are only forwarded when the caller provided them
	$optionalSettings = [
		'retry_timeout' => Memcached::OPT_RETRY_TIMEOUT,
		'server_failure_limit' => Memcached::OPT_SERVER_FAILURE_LIMIT
	];
	foreach ( $optionalSettings as $paramName => $optionId ) {
		if ( isset( $params[$paramName] ) ) {
			$options[$optionId] = $params[$paramName];
		}
	}

	$serializerName = $params['serializer'];
	if ( $serializerName === 'php' ) {
		$options[Memcached::OPT_SERIALIZER] = Memcached::SERIALIZER_PHP;
	} elseif ( $serializerName === 'igbinary' ) {
		// @phan-suppress-next-line PhanImpossibleCondition
		if ( !Memcached::HAVE_IGBINARY ) {
			throw new RuntimeException(
				__CLASS__ . ': the igbinary extension is not available ' .
				'but igbinary serialization was requested.'
			);
		}
		$options[Memcached::OPT_SERIALIZER] = Memcached::SERIALIZER_IGBINARY;
	}

	$optionsAccepted = $client->setOptions( $options );
	if ( !$optionsAccepted ) {
		throw new RuntimeException(
			"Invalid options: " . json_encode( $options, JSON_PRETTY_PRINT )
		);
	}

	$serverList = [];
	foreach ( $params['servers'] as $address ) {
		$hasPort = preg_match( '/^\[(.+)\]:(\d+)$/', $address, $parts )
			|| preg_match( '/^([^:]+):(\d+)$/', $address, $parts );
		// Either "[ip]:port" / "host:port", or a bare address (ip or path) without a port
		$serverList[] = $hasPort
			? [ $parts[1], (int)$parts[2] ]
			: [ $address, false ];
	}

	$serversAdded = $client->addServers( $serverList );
	if ( !$serversAdded ) {
		throw new RuntimeException( "Failed to inject server address list" );
	}
}
179 
/**
 * Fetch a single item, optionally also retrieving its CAS token.
 *
 * @param string $key
 * @param int $flags Not consulted by this implementation
 * @param mixed &$casToken Pass self::PASS_BY_REF to request the token; set to
 *   the token on success and to null otherwise
 * @return mixed The stored value, or false if the lookup returned nothing
 */
protected function doGet( $key, $flags = 0, &$casToken = null ) {
	$wantsToken = ( $casToken === self::PASS_BY_REF );
	$casToken = null;

	$this->debug( "get($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );
	$client = $this->acquireSyncClient();

	// T257003: only require "gets" (instead of "get") when a CAS token is needed
	if ( !$wantsToken ) {
		return $this->checkResult( $key, $client->get( $routeKey ) );
	}

	$extended = $client->get( $routeKey, null, Memcached::GET_EXTENDED );
	$value = false;
	if ( is_array( $extended ) ) {
		$value = $extended['value'];
		$casToken = $extended['cas'];
	}

	return $this->checkResult( $key, $value );
}
207 
/**
 * Set an item.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime Expiry, normalized through fixExpiry()
 * @param int $flags Not consulted by this implementation
 * @return mixed True when stored or when the client reports "not stored";
 *   otherwise whatever checkResult() returns
 */
protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
	$this->debug( "set($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );
	$client = $this->acquireSyncClient();
	$expiry = $this->fixExpiry( $exptime );
	$stored = $client->set( $routeKey, $value, $expiry );

	if ( $stored === false && $client->getResultCode() === Memcached::RES_NOTSTORED ) {
		// "Not stored" is always used as the mcrouter response with AllAsyncRoute
		return true;
	}

	return $this->checkResult( $key, $stored );
}
220 
/**
 * Check and set an item.
 *
 * @param mixed $casToken Token previously obtained from doGet()
 * @param string $key
 * @param mixed $value
 * @param int $exptime Expiry, normalized through fixExpiry()
 * @param int $flags Not consulted by this implementation
 * @return mixed Result of the client "cas" call after passing through checkResult()
 */
protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
	$this->debug( "cas($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );
	$client = $this->acquireSyncClient();
	$expiry = $this->fixExpiry( $exptime );
	$swapped = $client->cas( $casToken, $routeKey, $value, $expiry );

	return $this->checkResult( $key, $swapped );
}
233 
/**
 * Delete an item.
 *
 * @param string $key
 * @param int $flags Not consulted by this implementation
 * @return mixed True when deleted or when the key was not found; otherwise
 *   whatever checkResult() returns
 */
protected function doDelete( $key, $flags = 0 ) {
	$this->debug( "delete($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );
	$client = $this->acquireSyncClient();
	$deleted = $client->delete( $routeKey );

	if ( $deleted === false && $client->getResultCode() === Memcached::RES_NOTFOUND ) {
		// "Not found" is counted as success in our interface
		return true;
	}

	return $this->checkResult( $key, $deleted );
}
246 
/**
 * Insert an item if it does not already exist.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime Expiry, normalized through fixExpiry()
 * @param int $flags Not consulted by this implementation
 * @return mixed Result of the client "add" call after passing through checkResult()
 */
protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
	$this->debug( "add($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );
	$client = $this->acquireSyncClient();
	$expiry = $this->fixExpiry( $exptime );
	$added = $client->add( $routeKey, $value, $expiry );

	return $this->checkResult( $key, $added );
}
259 
/**
 * Increase stored value of $key by $value while preserving its TTL.
 *
 * @param string $key
 * @param int $value Amount to add
 * @param int $flags Not consulted by this implementation
 * @return mixed Result of the client "increment" call after passing through checkResult()
 */
public function incr( $key, $value = 1, $flags = 0 ) {
	$this->debug( "incr($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );
	$client = $this->acquireSyncClient();
	$newValue = $client->increment( $routeKey, $value );

	return $this->checkResult( $key, $newValue );
}
268 
/**
 * Decrease stored value of $key by $value while preserving its TTL.
 *
 * @param string $key
 * @param int $value Amount to subtract
 * @param int $flags Not consulted by this implementation
 * @return mixed Result of the client "decrement" call after passing through checkResult()
 */
public function decr( $key, $value = 1, $flags = 0 ) {
	$this->debug( "decr($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );
	$client = $this->acquireSyncClient();
	$newValue = $client->decrement( $routeKey, $value );

	return $this->checkResult( $key, $newValue );
}
277 
/**
 * Estimate the serialized size of each value.
 *
 * The PECL driver does the serializing itself and will not reuse anything
 * prepared here, so only size estimates are produced.
 *
 * @param array $valueByKey Map of (key => value)
 * @return array List of estimated sizes, in the iteration order of $valueByKey
 */
public function setNewPreparedValues( array $valueByKey ) {
	// array_values() drops the keys so that the result is a plain list
	return array_map(
		function ( $value ) {
			return $this->guessSerialValueSize( $value );
		},
		array_values( $valueByKey )
	);
}
287 
/**
 * Check the return value from a client method call and take any necessary action.
 *
 * Any result other than false is returned untouched. For a false result, the
 * result code of the main (synchronous) client is inspected: "success" is
 * ignored, the expected miss/conflict codes are only written to the debug
 * log, and every other code is logged as an error and recorded via
 * setLastError().
 *
 * @param string|false $key Key used by the caller, or false if there is no
 *   single key (multi-key operations)
 * @param mixed $result Return value of the client call
 * @return mixed $result, unchanged
 */
protected function checkResult( $key, $result ) {
	if ( $result !== false ) {
		return $result;
	}

	$client = $this->syncClient;
	switch ( $client->getResultCode() ) {
		case Memcached::RES_SUCCESS:
			break;
		case Memcached::RES_DATA_EXISTS:
		case Memcached::RES_NOTSTORED:
		case Memcached::RES_NOTFOUND:
			$this->debug( "result: " . $client->getResultMessage() );
			break;
		default:
			$msg = $client->getResultMessage();
			$logCtx = [];
			if ( $key !== false ) {
				// getServerByKey() yields false when the server cannot be resolved;
				// indexing that would raise PHP notices in the middle of error handling
				$server = $client->getServerByKey( $key );
				$logCtx['memcached-server'] = is_array( $server )
					? "{$server['host']}:{$server['port']}"
					: '(unknown)';
				$logCtx['memcached-key'] = $key;
				$msg = "Memcached error for key \"{memcached-key}\" " .
					"on server \"{memcached-server}\": $msg";
			} else {
				$msg = "Memcached error: $msg";
			}
			$this->logger->error( $msg, $logCtx );
			$this->setLastError( BagOStuff::ERR_UNEXPECTED );
	}

	return $result;
}
330 
/**
 * Get an associative array containing the item for each of the keys that have items.
 *
 * A failed lookup is passed to checkResult(), so that it is logged and
 * recorded as the last error, and is then reported to the caller as an
 * empty array.
 *
 * @param string[] $keys
 * @param int $flags Not consulted by this implementation
 * @return array Map of (key => value) for the keys that were found
 */
protected function doGetMulti( array $keys, $flags = 0 ) {
	$this->debug( 'getMulti(' . implode( ', ', $keys ) . ')' );

	$routeKeys = [];
	foreach ( $keys as $key ) {
		$routeKeys[] = $this->validateKeyAndPrependRoute( $key );
	}

	// The PECL implementation uses multi-key "get"/"gets"; no need to pipeline.
	// T257003: avoid Memcached::GET_EXTENDED; no tokens are needed and that requires "gets"
	// https://github.com/libmemcached/libmemcached/blob/eda2becbec24363f56115fa5d16d38a2d1f54775/libmemcached/get.cc#L272
	// Keep a false return as-is here: coalescing it to [] would hide the failure
	// from checkResult() and make the error branch below unreachable.
	$resByRouteKey = $this->acquireSyncClient()->getMulti( $routeKeys );

	if ( is_array( $resByRouteKey ) ) {
		$res = [];
		foreach ( $resByRouteKey as $routeKey => $value ) {
			$res[$this->stripRouteFromKey( $routeKey )] = $value;
		}
	} else {
		$res = false;
	}

	$res = $this->checkResult( false, $res );

	// Callers always get an array, exactly as before
	return ( $res !== false ) ? $res : [];
}
355 
/**
 * Store several items at once.
 *
 * With self::WRITE_BACKGROUND in $flags the write goes through the buffered
 * (async) client; otherwise the synchronous client is used.
 *
 * @param array $data Map of (key => value)
 * @param int $exptime Expiry, normalized through fixExpiry()
 * @param int $flags Bit field; only self::WRITE_BACKGROUND is checked here
 * @return mixed Result of the client "setMulti" call after passing through checkResult()
 */
protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
	$this->debug( 'setMulti(' . implode( ', ', array_keys( $data ) ) . ')' );

	$exptime = $this->fixExpiry( $exptime );

	$valuesByRouteKey = [];
	foreach ( $data as $key => $value ) {
		$routeKey = $this->validateKeyAndPrependRoute( $key );
		$valuesByRouteKey[$routeKey] = $value;
	}

	// The PECL implementation is a naïve for-loop so use async I/O to pipeline;
	// https://github.com/php-memcached-dev/php-memcached/blob/master/php_memcached.c#L1852
	$inBackground = $this->fieldHasFlags( $flags, self::WRITE_BACKGROUND );
	if ( !$inBackground ) {
		$result = $this->acquireSyncClient()->setMulti( $valuesByRouteKey, $exptime );
	} else {
		$bufferedClient = $this->acquireAsyncClient();
		$result = $bufferedClient->setMulti( $valuesByRouteKey, $exptime );
		$this->releaseAsyncClient( $bufferedClient );
	}

	return $this->checkResult( false, $result );
}
377 
/**
 * Delete several items at once.
 *
 * With self::WRITE_BACKGROUND in $flags the deletes go through the buffered
 * (async) client; otherwise the synchronous client is used.
 *
 * @param string[] $keys
 * @param int $flags Bit field; only self::WRITE_BACKGROUND is checked here
 * @return mixed True if every per-key result is true or "not found";
 *   otherwise whatever checkResult() returns for false
 */
protected function doDeleteMulti( array $keys, $flags = 0 ) {
	$this->debug( 'deleteMulti(' . implode( ', ', $keys ) . ')' );

	// array_values() keeps this a plain list regardless of the keys of $keys
	$routeKeys = array_values( array_map(
		function ( $key ) {
			return $this->validateKeyAndPrependRoute( $key );
		},
		$keys
	) );

	// The PECL implementation is a naïve for-loop so use async I/O to pipeline;
	// https://github.com/php-memcached-dev/php-memcached/blob/7443d16d02fb73cdba2e90ae282446f80969229c/php_memcached.c#L1852
	$inBackground = $this->fieldHasFlags( $flags, self::WRITE_BACKGROUND );
	if ( !$inBackground ) {
		$codesByRouteKey = $this->acquireSyncClient()->deleteMulti( $routeKeys ) ?: [];
	} else {
		$bufferedClient = $this->acquireAsyncClient();
		$codesByRouteKey = $bufferedClient->deleteMulti( $routeKeys ) ?: [];
		$this->releaseAsyncClient( $bufferedClient );
	}

	$allDeleted = true;
	foreach ( $codesByRouteKey as $code ) {
		// "Not found" is counted as success in our interface
		if ( $code !== true && $code !== Memcached::RES_NOTFOUND ) {
			$allDeleted = false;
			break;
		}
	}

	return $this->checkResult( false, $allDeleted );
}
406 
/**
 * Change the expiry of an item through the client "touch" call.
 *
 * @param string $key
 * @param int $exptime Expiry, normalized through fixExpiry()
 * @param int $flags Not consulted by this implementation
 * @return mixed Result of the client "touch" call after passing through checkResult()
 */
protected function doChangeTTL( $key, $exptime, $flags ) {
	$this->debug( "touch($key)" );

	$routeKey = $this->validateKeyAndPrependRoute( $key );
	$client = $this->acquireSyncClient();
	$expiry = $this->fixExpiry( $exptime );
	$touched = $client->touch( $routeKey, $expiry );

	return $this->checkResult( $key, $touched );
}
415 
/**
 * Serialize a value with the serializer configured on the main client.
 *
 * @param mixed $value
 * @return string|int The integer itself for integers; a serialized blob otherwise
 * @throws UnexpectedValueException If the configured serializer is neither PHP nor igbinary
 */
protected function serialize( $value ) {
	// Integers are passed through as-is
	if ( is_int( $value ) ) {
		return $value;
	}

	$serializer = $this->syncClient->getOption( Memcached::OPT_SERIALIZER );

	if ( $serializer === Memcached::SERIALIZER_IGBINARY ) {
		return igbinary_serialize( $value );
	}
	if ( $serializer === Memcached::SERIALIZER_PHP ) {
		return serialize( $value );
	}

	throw new UnexpectedValueException( __METHOD__ . ": got serializer '$serializer'." );
}
430 
/**
 * Unserialize a value with the serializer configured on the main client.
 *
 * @param string|int $value
 * @return mixed The integer value for anything isInteger() accepts; the
 *   unserialized data otherwise
 * @throws UnexpectedValueException If the configured serializer is neither PHP nor igbinary
 */
protected function unserialize( $value ) {
	// Integer-like values are simply cast
	if ( $this->isInteger( $value ) ) {
		return (int)$value;
	}

	$serializer = $this->syncClient->getOption( Memcached::OPT_SERIALIZER );

	if ( $serializer === Memcached::SERIALIZER_IGBINARY ) {
		return igbinary_unserialize( $value );
	}
	if ( $serializer === Memcached::SERIALIZER_PHP ) {
		return unserialize( $value );
	}

	throw new UnexpectedValueException( __METHOD__ . ": got serializer '$serializer'." );
}
445 
/**
 * Get the main client for a synchronous operation, flushing any pending
 * buffered writes first.
 *
 * @return Memcached
 * @throws RuntimeException If the main client is currently borrowed for buffered writes
 */
private function acquireSyncClient() {
	if ( $this->syncClientIsBuffering ) {
		throw new RuntimeException( "The main (unbuffered I/O) client is locked" );
	}

	if ( !$this->hasUnflushedChanges ) {
		return $this->syncClient;
	}

	// Force a synchronous flush of async writes so that their changes are visible
	$clientsToFlush = [ $this->syncClient ];
	if ( $this->asyncClient ) {
		$clientsToFlush[] = $this->asyncClient;
	}
	foreach ( $clientsToFlush as $pendingClient ) {
		$pendingClient->fetch();
	}
	$this->hasUnflushedChanges = false;

	return $this->syncClient;
}
465 
/**
 * Get a client configured for buffered writes.
 *
 * The dedicated async client is returned when one exists. Otherwise the main
 * client is switched to the async-write options and marked as buffering
 * until releaseAsyncClient() is called.
 *
 * @return Memcached
 */
private function acquireAsyncClient() {
	if ( !$this->asyncClient ) {
		// No dedicated instance: make the main one buffer writes temporarily
		$this->syncClientIsBuffering = true;
		$this->syncClient->setOptions( self::$OPTS_ASYNC_WRITES );

		return $this->syncClient;
	}

	// Dedicated buffering instance
	return $this->asyncClient;
}
480 
/**
 * Hand back a client obtained from acquireAsyncClient().
 *
 * Records that there are unflushed changes and, when the main client was
 * the one borrowed, restores its synchronous-write options.
 *
 * @param Memcached $client Client returned by acquireAsyncClient()
 */
private function releaseAsyncClient( $client ) {
	$this->hasUnflushedChanges = true;

	if ( $this->asyncClient ) {
		// A dedicated buffering instance was used; nothing to restore
		return;
	}

	// This is the main instance; make it stop buffering writes again
	$client->setOptions( self::$OPTS_SYNC_WRITES );
	$this->syncClientIsBuffering = false;
}
493 }
MemcachedPeclBagOStuff\$hasUnflushedChanges
bool $hasUnflushedChanges
Whether the non-buffering client should be flushed before use.
Definition: MemcachedPeclBagOStuff.php:38
MediumSpecificBagOStuff\setLastError
setLastError( $err)
Set the "last error" registry.
Definition: MediumSpecificBagOStuff.php:758
MediumSpecificBagOStuff\isInteger
isInteger( $value)
Check if a value is an integer.
Definition: MediumSpecificBagOStuff.php:881
MemcachedPeclBagOStuff\doDeleteMulti
doDeleteMulti(array $keys, $flags=0)
Definition: MemcachedPeclBagOStuff.php:378
MemcachedPeclBagOStuff\incr
incr( $key, $value=1, $flags=0)
Increase stored value of $key by $value while preserving its TTL.
Definition: MemcachedPeclBagOStuff.php:260
MemcachedPeclBagOStuff\initializeClient
initializeClient(Memcached $client, array $params, array $options)
Initialize the client only if needed and reuse it otherwise.
Definition: MemcachedPeclBagOStuff.php:115
MemcachedPeclBagOStuff\acquireSyncClient
acquireSyncClient()
Definition: MemcachedPeclBagOStuff.php:449
MemcachedPeclBagOStuff\acquireAsyncClient
acquireAsyncClient()
Definition: MemcachedPeclBagOStuff.php:469
MediumSpecificBagOStuff\guessSerialValueSize
guessSerialValueSize( $value, $depth=0, &$loops=0)
Estimate the size of a variable once serialized.
Definition: MediumSpecificBagOStuff.php:999
MemcachedBagOStuff
Base class for memcached clients.
Definition: MemcachedBagOStuff.php:29
MediumSpecificBagOStuff\debug
debug( $text)
Definition: MediumSpecificBagOStuff.php:1064
MemcachedPeclBagOStuff\doAdd
doAdd( $key, $value, $exptime=0, $flags=0)
Insert an item if it does not already exist.
Definition: MemcachedPeclBagOStuff.php:247
MemcachedBagOStuff\stripRouteFromKey
stripRouteFromKey( $key)
Definition: MemcachedBagOStuff.php:130
MemcachedPeclBagOStuff\doChangeTTL
doChangeTTL( $key, $exptime, $flags)
Definition: MemcachedPeclBagOStuff.php:407
true
return true
Definition: router.php:90
MemcachedPeclBagOStuff\doSet
doSet( $key, $value, $exptime=0, $flags=0)
Set an item.
Definition: MemcachedPeclBagOStuff.php:208
MemcachedPeclBagOStuff\__construct
__construct( $params)
Available parameters are:
Definition: MemcachedPeclBagOStuff.php:67
MemcachedPeclBagOStuff\$OPTS_SYNC_WRITES
static array $OPTS_SYNC_WRITES
Memcached options.
Definition: MemcachedPeclBagOStuff.php:41
MemcachedPeclBagOStuff\checkResult
checkResult( $key, $result)
Check the return value from a client method call and take any necessary action.
Definition: MemcachedPeclBagOStuff.php:299
$res
$res
Definition: testCompression.php:57
MemcachedPeclBagOStuff\$asyncClient
Memcached|null $asyncClient
Definition: MemcachedPeclBagOStuff.php:33
MemcachedPeclBagOStuff\doGetMulti
doGetMulti(array $keys, $flags=0)
Get an associative array containing the item for each of the keys that have items.
Definition: MemcachedPeclBagOStuff.php:331
MemcachedPeclBagOStuff\unserialize
unserialize( $value)
Definition: MemcachedPeclBagOStuff.php:431
MemcachedPeclBagOStuff\doGet
doGet( $key, $flags=0, &$casToken=null)
Definition: MemcachedPeclBagOStuff.php:180
MemcachedPeclBagOStuff\serialize
serialize( $value)
Definition: MemcachedPeclBagOStuff.php:416
MemcachedBagOStuff\fixExpiry
fixExpiry( $exptime)
Definition: MemcachedBagOStuff.php:147
MemcachedPeclBagOStuff\decr
decr( $key, $value=1, $flags=0)
Decrease stored value of $key by $value while preserving its TTL.
Definition: MemcachedPeclBagOStuff.php:269
MemcachedPeclBagOStuff\doSetMulti
doSetMulti(array $data, $exptime=0, $flags=0)
Definition: MemcachedPeclBagOStuff.php:356
MemcachedBagOStuff\validateKeyAndPrependRoute
validateKeyAndPrependRoute( $key)
Definition: MemcachedBagOStuff.php:112
MemcachedPeclBagOStuff\$OPTS_ASYNC_WRITES
static array $OPTS_ASYNC_WRITES
Memcached options.
Definition: MemcachedPeclBagOStuff.php:46
MemcachedPeclBagOStuff
A wrapper class for the PECL memcached client.
Definition: MemcachedPeclBagOStuff.php:29
BagOStuff\fieldHasFlags
fieldHasFlags( $field, $flags)
Definition: BagOStuff.php:509
$keys
$keys
Definition: testCompression.php:72
MemcachedPeclBagOStuff\$syncClient
Memcached $syncClient
Definition: MemcachedPeclBagOStuff.php:31
MemcachedPeclBagOStuff\releaseAsyncClient
releaseAsyncClient( $client)
Definition: MemcachedPeclBagOStuff.php:484
MemcachedPeclBagOStuff\doDelete
doDelete( $key, $flags=0)
Delete an item.
Definition: MemcachedPeclBagOStuff.php:234
MemcachedPeclBagOStuff\doCas
doCas( $casToken, $key, $value, $exptime=0, $flags=0)
Check and set an item.
Definition: MemcachedPeclBagOStuff.php:221
MemcachedPeclBagOStuff\setNewPreparedValues
setNewPreparedValues(array $valueByKey)
Prepare values for storage and get their serialized sizes, or, estimate those sizes.
Definition: MemcachedPeclBagOStuff.php:278
MemcachedPeclBagOStuff\$syncClientIsBuffering
bool $syncClientIsBuffering
Whether the non-buffering client is locked from use.
Definition: MemcachedPeclBagOStuff.php:36