MediaWiki  master
MemcachedPeclBagOStuff.php
Go to the documentation of this file.
1 <?php
/** @var Memcached Main client; configured with the synchronous write options */
protected $syncClient;
/** @var Memcached|null Dedicated client for buffered writes; null for non-persistent setups */
protected $asyncClient;

/** @var bool Whether the non-buffering client is locked from use */
protected $syncClientIsBuffering = false;
/** @var bool Whether the non-buffering client should be flushed before use */
protected $hasUnflushedChanges = false;
39 
/** @var array Memcached options that switch off both forms of write buffering */
private static $OPTS_SYNC_WRITES = [
	// async I/O (using TCP buffers)
	Memcached::OPT_NO_BLOCK => false,
	// libmemcached buffers
	Memcached::OPT_BUFFER_WRITES => false,
];
/** @var array Memcached options that switch on both forms of write buffering */
private static $OPTS_ASYNC_WRITES = [
	// async I/O (using TCP buffers)
	Memcached::OPT_NO_BLOCK => true,
	// libmemcached buffers
	Memcached::OPT_BUFFER_WRITES => true,
];
50 
/**
 * Build the Memcached client(s) backing this store.
 *
 * Keys of $params given a default here when absent:
 *   - compress_threshold:    1500; written to the memcached.compression_threshold ini setting
 *   - connect_timeout:       0.5
 *   - serializer:            'php'
 *   - use_binary_protocol:   false
 *   - allow_tcp_nagle_delay: true
 *
 * $params['persistent'] decides whether named (pooled) Memcached instances are
 * used, in which case a second instance dedicated to buffered writes is created.
 *
 * @param array $params
 */
public function __construct( $params ) {
	parent::__construct( $params );

	// Fill in class-specific defaults for anything the caller left out
	$params += [
		'compress_threshold' => 1500,
		'connect_timeout' => 0.5,
		'serializer' => 'php',
		'use_binary_protocol' => false,
		'allow_tcp_nagle_delay' => true,
	];

	if ( !$params['persistent'] ) {
		$mainClient = new Memcached();
		$bufferedClient = null;
	} else {
		// The pool ID must be unique to the server/option combination.
		// The Memcached object is essentially shared for each pool ID.
		// We can only reuse a pool ID if we keep the config consistent.
		$poolId = md5( serialize( $params ) );
		$mainClient = new Memcached( "$poolId-sync" );
		// Avoid clobbering the main thread-shared Memcached instance
		$bufferedClient = new Memcached( "$poolId-async" );
	}

	$this->initializeClient( $mainClient, $params, self::$OPTS_SYNC_WRITES );
	if ( $bufferedClient !== null ) {
		$this->initializeClient( $bufferedClient, $params, self::$OPTS_ASYNC_WRITES );
	}

	// Remember the main client and, if there is one, the dedicated buffered-write client
	$this->syncClient = $mainClient;
	$this->asyncClient = $bufferedClient;

	// The compression threshold is an undocumented php.ini option for some
	// reason. There's probably not much harm in setting it globally, for
	// compatibility with the settings for the PHP client.
	ini_set( 'memcached.compression_threshold', $params['compress_threshold'] );
}
105 
/**
 * Initialize the client only if needed and reuse it otherwise.
 *
 * A client that already reports a server list is left untouched. Otherwise the
 * options are applied and the servers from $params['servers'] are registered.
 *
 * @param Memcached $client
 * @param array $params Constructor parameters (defaults already merged in)
 * @param array $options Base Memcached options; entries given here take
 *  precedence over the defaults filled in below
 * @throws RuntimeException If igbinary is requested but unavailable, or the
 *  client rejects the options or the server list
 */
private function initializeClient( Memcached $client, array $params, array $options ) {
	if ( $client->getServerList() ) {
		$this->logger->debug( __METHOD__ . ": pre-initialized client instance." );

		return; // preserve persistent handle
	}

	$this->logger->debug( __METHOD__ . ": initializing new client instance." );

	// Array union: anything the caller already put in $options is kept as-is
	$options += [
		Memcached::OPT_NO_BLOCK => false,
		Memcached::OPT_BUFFER_WRITES => false,
		// Network protocol (ASCII or binary)
		Memcached::OPT_BINARY_PROTOCOL => $params['use_binary_protocol'],
		// Set various network timeouts
		Memcached::OPT_CONNECT_TIMEOUT => $params['connect_timeout'] * 1000,
		Memcached::OPT_SEND_TIMEOUT => $params['timeout'],
		Memcached::OPT_RECV_TIMEOUT => $params['timeout'],
		Memcached::OPT_POLL_TIMEOUT => $params['timeout'] / 1000,
		// Avoid pointless delay when sending/fetching large blobs
		Memcached::OPT_TCP_NODELAY => !$params['allow_tcp_nagle_delay'],
		// Set libketama mode since it's recommended by the documentation
		Memcached::OPT_LIBKETAMA_COMPATIBLE => true,
	];

	// Optional tunables are forwarded only when the caller supplied them
	$optionalSettings = [
		'retry_timeout' => Memcached::OPT_RETRY_TIMEOUT,
		'server_failure_limit' => Memcached::OPT_SERVER_FAILURE_LIMIT,
	];
	foreach ( $optionalSettings as $paramName => $optionId ) {
		if ( isset( $params[$paramName] ) ) {
			$options[$optionId] = $params[$paramName];
		}
	}

	$serializerName = $params['serializer'];
	if ( $serializerName === 'igbinary' ) {
		if ( !Memcached::HAVE_IGBINARY ) {
			throw new RuntimeException(
				__CLASS__ . ': the igbinary extension is not available but igbinary serialization was requested.'
			);
		}
		$options[Memcached::OPT_SERIALIZER] = Memcached::SERIALIZER_IGBINARY;
	} elseif ( $serializerName === 'php' ) {
		$options[Memcached::OPT_SERIALIZER] = Memcached::SERIALIZER_PHP;
	}

	if ( !$client->setOptions( $options ) ) {
		throw new RuntimeException(
			"Invalid options: " . json_encode( $options, JSON_PRETTY_PRINT )
		);
	}

	// Turn each configured address into a (host, port) pair
	$serverList = [];
	foreach ( $params['servers'] as $address ) {
		$hasPort =
			// "[host]:port" form
			preg_match( '/^\[(.+)\]:(\d+)$/', $address, $parts ) ||
			// "host:port" form
			preg_match( '/^([^:]+):(\d+)$/', $address, $parts );

		$serverList[] = $hasPort
			? [ $parts[1], (int)$parts[2] ] // (ip or path, port)
			: [ $address, false ]; // no port given; pass the address through whole
	}

	if ( !$client->addServers( $serverList ) ) {
		throw new RuntimeException( "Failed to inject server address list" );
	}
}
178 
/**
 * Fetch a single value, together with its CAS token, through the main client.
 *
 * @param string $key
 * @param int $flags Incoming value is not consulted by this implementation
 * @param mixed|null &$casToken Receives the CAS token reported by the client
 *  (null when the extended lookup yields no entry)
 * @return mixed Result of checkResult() for the fetched value; false when nothing was fetched
 */
protected function doGet( $key, $flags = 0, &$casToken = null ) {
	$this->debug( "get($key)" );

	$client = $this->acquireSyncClient();

	if ( !defined( Memcached::class . '::GET_EXTENDED' ) ) {
		// Without GET_EXTENDED the client fills the CAS token through the third argument
		$value = $client->get( $this->validateKeyEncoding( $key ), null, $casToken );

		return $this->checkResult( $key, $value );
	}

	// v3.0.0: the extended form returns the value and the CAS token together
	$getFlags = Memcached::GET_EXTENDED;
	$entry = $client->get( $this->validateKeyEncoding( $key ), null, $getFlags );

	$value = false;
	$casToken = null;
	if ( is_array( $entry ) ) {
		$value = $entry['value'];
		$casToken = $entry['cas'];
	}

	return $this->checkResult( $key, $value );
}
200 
/**
 * Store a value under a key through the main client.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime Expiry, normalized through fixExpiry() before use
 * @param int $flags Not consulted by this implementation
 * @return bool
 */
protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
	$this->debug( "set($key)" );

	$client = $this->acquireSyncClient();
	$stored = $client->set(
		$this->validateKeyEncoding( $key ),
		$value,
		$this->fixExpiry( $exptime )
	);

	if ( $stored === false && $client->getResultCode() === Memcached::RES_NOTSTORED ) {
		// "Not stored" is always used as the mcrouter response with AllAsyncRoute
		return true;
	}

	return $this->checkResult( $key, $stored );
}
216 
/**
 * Store a value only if the entry still matches the given CAS token.
 *
 * @param mixed $casToken Token obtained from a previous fetch
 * @param string $key
 * @param mixed $value
 * @param int $exptime Expiry, normalized through fixExpiry() before use
 * @param int $flags Not consulted by this implementation
 * @return bool
 */
protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
	$this->debug( "cas($key)" );

	// Acquire the client first, then prepare the arguments, as the call itself would
	$client = $this->acquireSyncClient();
	$safeKey = $this->validateKeyEncoding( $key );
	$ttl = $this->fixExpiry( $exptime );

	$swapped = $client->cas( $casToken, $safeKey, $value, $ttl );

	return $this->checkResult( $key, $swapped );
}
228 
/**
 * Remove a key through the main client.
 *
 * @param string $key
 * @param int $flags Not consulted by this implementation
 * @return bool True on success, including when the key did not exist
 */
protected function doDelete( $key, $flags = 0 ) {
	$this->debug( "delete($key)" );

	$client = $this->acquireSyncClient();
	$deleted = $client->delete( $this->validateKeyEncoding( $key ) );

	if ( $deleted === false && $client->getResultCode() === Memcached::RES_NOTFOUND ) {
		// "Not found" is counted as success in our interface
		return true;
	}

	return $this->checkResult( $key, $deleted );
}
240 
/**
 * Issue the client's add() operation for a key through the main client.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime Expiry, normalized through fixExpiry() before use
 * @param int $flags Not consulted by this implementation
 * @return bool
 */
protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
	$this->debug( "add($key)" );

	// Acquire the client first, then prepare the arguments, as the call itself would
	$client = $this->acquireSyncClient();
	$safeKey = $this->validateKeyEncoding( $key );
	$ttl = $this->fixExpiry( $exptime );

	$added = $client->add( $safeKey, $value, $ttl );

	return $this->checkResult( $key, $added );
}
252 
/**
 * Increase the numeric value stored under a key.
 *
 * The key is passed through validateKeyEncoding() before it reaches the
 * server, matching the get/set/cas/delete/add operations of this class.
 *
 * @param string $key
 * @param int $value Amount to add
 * @param int $flags Not consulted by this implementation
 * @return int|bool The client's result as passed through checkResult(); false on failure
 */
public function incr( $key, $value = 1, $flags = 0 ) {
	$this->debug( "incr($key)" );

	$result = $this->acquireSyncClient()->increment(
		$this->validateKeyEncoding( $key ),
		$value
	);

	return $this->checkResult( $key, $result );
}
260 
/**
 * Decrease the numeric value stored under a key.
 *
 * The key is passed through validateKeyEncoding() before it reaches the
 * server, matching the get/set/cas/delete/add operations of this class.
 *
 * @param string $key
 * @param int $value Amount to subtract
 * @param int $flags Not consulted by this implementation
 * @return int|bool The client's result as passed through checkResult(); false on failure
 */
public function decr( $key, $value = 1, $flags = 0 ) {
	$this->debug( "decr($key)" );

	$result = $this->acquireSyncClient()->decrement(
		$this->validateKeyEncoding( $key ),
		$value
	);

	return $this->checkResult( $key, $result );
}
268 
/**
 * Check the return value from a client method call and take any necessary action.
 *
 * Any value other than false is handed back untouched. For false, the main
 * client's result code decides what gets logged: nothing for RES_SUCCESS, a
 * debug line for the "data exists" / "not stored" / "not found" codes, and an
 * error entry for everything else.
 *
 * @param string|bool $key The key involved, or false when there is no single key
 * @param mixed $result The value returned by the client call
 * @return mixed $result, unchanged
 */
protected function checkResult( $key, $result ) {
	if ( $result !== false ) {
		return $result;
	}

	$client = $this->syncClient;
	switch ( $client->getResultCode() ) {
		case Memcached::RES_SUCCESS:
			break;
		case Memcached::RES_DATA_EXISTS:
		case Memcached::RES_NOTSTORED:
		case Memcached::RES_NOTFOUND:
			$this->debug( "result: " . $client->getResultMessage() );
			break;
		default:
			$msg = $client->getResultMessage();
			$logCtx = [];
			if ( $key !== false ) {
				// The server lookup can itself fail and return false; do not
				// index into a non-array while already handling an error
				$server = $client->getServerByKey( $key );
				$logCtx['memcached-server'] = is_array( $server )
					? "{$server['host']}:{$server['port']}"
					: 'unknown';
				$logCtx['memcached-key'] = $key;
				$msg = "Memcached error for key \"{memcached-key}\" " .
					"on server \"{memcached-server}\": $msg";
			} else {
				$msg = "Memcached error: $msg";
			}
			$this->logger->error( $msg, $logCtx );
	}

	return $result;
}
311 
/**
 * Fetch several keys at once through the main client.
 *
 * @param string[] $keys
 * @param int $flags Not consulted by this implementation
 * @return array Values found, indexed by key; empty when nothing was returned
 */
protected function doGetMulti( array $keys, $flags = 0 ) {
	$this->debug( 'getMulti(' . implode( ', ', $keys ) . ')' );

	foreach ( $keys as $key ) {
		$this->validateKeyEncoding( $key );
	}

	// The PECL implementation uses "gets" which works as well as a pipeline
	$result = $this->acquireSyncClient()->getMulti( $keys );

	// Hand the raw return value to checkResult(): turning false into [] before
	// the call would make it return early and never look at the result code
	$result = $this->checkResult( false, $result );

	return $result ?: [];
}
324 
/**
 * Store several key/value pairs at once.
 *
 * With the WRITE_BACKGROUND flag the write goes through the buffering client;
 * otherwise the main client is used.
 *
 * @param array $data Map of key to value
 * @param int $exptime Expiry, normalized through fixExpiry() before use
 * @param int $flags Bitfield; only WRITE_BACKGROUND is consulted here
 * @return bool
 */
protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
	$this->debug( 'setMulti(' . implode( ', ', array_keys( $data ) ) . ')' );

	$exptime = $this->fixExpiry( $exptime );
	foreach ( $data as $key => $unused ) {
		$this->validateKeyEncoding( $key );
	}

	// The PECL implementation is a naïve for-loop so use async I/O to pipeline;
	// https://github.com/php-memcached-dev/php-memcached/blob/master/php_memcached.c#L1852
	$inBackground = $this->fieldHasFlags( $flags, self::WRITE_BACKGROUND );
	if ( !$inBackground ) {
		$stored = $this->acquireSyncClient()->setMulti( $data, $exptime );
	} else {
		$bufferingClient = $this->acquireAsyncClient();
		$stored = $bufferingClient->setMulti( $data, $exptime );
		$this->releaseAsyncClient( $bufferingClient );
	}

	return $this->checkResult( false, $stored );
}
345 
/**
 * Remove several keys at once.
 *
 * With the WRITE_BACKGROUND flag the deletes go through the buffering client;
 * otherwise the main client is used.
 *
 * @param string[] $keys
 * @param int $flags Bitfield; only WRITE_BACKGROUND is consulted here
 * @return bool False if any per-key outcome is neither true nor RES_NOTFOUND
 */
protected function doDeleteMulti( array $keys, $flags = 0 ) {
	$this->debug( 'deleteMulti(' . implode( ', ', $keys ) . ')' );

	foreach ( $keys as $key ) {
		$this->validateKeyEncoding( $key );
	}

	// The PECL implementation is a naïve for-loop so use async I/O to pipeline;
	// https://github.com/php-memcached-dev/php-memcached/blob/7443d16d02fb73cdba2e90ae282446f80969229c/php_memcached.c#L1852
	$inBackground = $this->fieldHasFlags( $flags, self::WRITE_BACKGROUND );
	if ( !$inBackground ) {
		$outcomes = $this->acquireSyncClient()->deleteMulti( $keys ) ?: [];
	} else {
		$bufferingClient = $this->acquireAsyncClient();
		$outcomes = $bufferingClient->deleteMulti( $keys ) ?: [];
		$this->releaseAsyncClient( $bufferingClient );
	}

	$allDeleted = true;
	foreach ( $outcomes as $outcome ) {
		// "Not found" is counted as success in our interface
		if ( $outcome !== true && $outcome !== Memcached::RES_NOTFOUND ) {
			$allDeleted = false;
			break;
		}
	}

	return $this->checkResult( false, $allDeleted );
}
373 
/**
 * Change the expiry of an existing key via the client's touch() operation.
 *
 * The key is passed through validateKeyEncoding() before it reaches the
 * server, matching the get/set/cas/delete/add operations of this class.
 *
 * @param string $key
 * @param int $exptime Expiry, normalized through fixExpiry() before use
 * @param int $flags Not consulted by this implementation
 * @return bool
 */
protected function doChangeTTL( $key, $exptime, $flags ) {
	$this->debug( "touch($key)" );

	$result = $this->acquireSyncClient()->touch(
		$this->validateKeyEncoding( $key ),
		$this->fixExpiry( $exptime )
	);

	return $this->checkResult( $key, $result );
}
381 
/**
 * Encode a value using the serializer configured on the main client.
 *
 * @param mixed $value
 * @return string|int Integers are returned as-is; anything else is serialized
 * @throws UnexpectedValueException If the configured serializer is neither PHP nor igbinary
 */
protected function serialize( $value ) {
	if ( is_int( $value ) ) {
		return $value;
	}

	$serializer = $this->syncClient->getOption( Memcached::OPT_SERIALIZER );

	if ( $serializer === Memcached::SERIALIZER_PHP ) {
		return serialize( $value );
	}

	if ( $serializer === Memcached::SERIALIZER_IGBINARY ) {
		return igbinary_serialize( $value );
	}

	throw new UnexpectedValueException( __METHOD__ . ": got serializer '$serializer'." );
}
396 
/**
 * Decode a value using the serializer configured on the main client.
 *
 * @param string|int $value
 * @return mixed Values accepted by isInteger() are cast to int; anything else is unserialized
 * @throws UnexpectedValueException If the configured serializer is neither PHP nor igbinary
 */
protected function unserialize( $value ) {
	if ( $this->isInteger( $value ) ) {
		return (int)$value;
	}

	$serializer = $this->syncClient->getOption( Memcached::OPT_SERIALIZER );

	if ( $serializer === Memcached::SERIALIZER_PHP ) {
		return unserialize( $value );
	}

	if ( $serializer === Memcached::SERIALIZER_IGBINARY ) {
		return igbinary_unserialize( $value );
	}

	throw new UnexpectedValueException( __METHOD__ . ": got serializer '$serializer'." );
}
411 
/**
 * Get the main client, flushing pending buffered writes first when needed.
 *
 * @return Memcached
 * @throws RuntimeException If the main client is currently being used for buffered writes
 */
private function acquireSyncClient() {
	if ( $this->syncClientIsBuffering ) {
		throw new RuntimeException( "The main (unbuffered I/O) client is locked" );
	}

	if ( !$this->hasUnflushedChanges ) {
		return $this->syncClient;
	}

	// Force a synchronous flush of async writes so that their changes are visible
	$this->syncClient->fetch();
	if ( $this->asyncClient ) {
		$this->asyncClient->fetch();
	}
	$this->hasUnflushedChanges = false;

	return $this->syncClient;
}
431 
/**
 * Get a client set up for buffered writes.
 *
 * Returns the dedicated buffering instance when there is one. Otherwise the
 * main client is switched to the async write options and marked as buffering,
 * which makes acquireSyncClient() throw until releaseAsyncClient() is called.
 *
 * @return Memcached
 */
private function acquireAsyncClient() {
	if ( !$this->asyncClient ) {
		// Modify the main instance to temporarily buffer writes
		$this->syncClientIsBuffering = true;
		$this->syncClient->setOptions( self::$OPTS_ASYNC_WRITES );

		return $this->syncClient;
	}

	return $this->asyncClient; // dedicated buffering instance
}
446 
/**
 * Finish a buffered-write section started with acquireAsyncClient().
 *
 * Always records that there are unflushed changes. When no dedicated
 * buffering instance exists, the given client is put back on the synchronous
 * write options and the buffering mark is cleared.
 *
 * @param Memcached $client The client handed out by acquireAsyncClient()
 */
private function releaseAsyncClient( $client ) {
	$this->hasUnflushedChanges = true;

	if ( $this->asyncClient ) {
		// The dedicated instance stays in buffering mode
		return;
	}

	// This is the main instance; make it stop buffering writes again
	$client->setOptions( self::$OPTS_SYNC_WRITES );
	$this->syncClientIsBuffering = false;
}
459 }
checkResult( $key, $result)
Check the return value from a client method call and take any necessary action.
decr( $key, $value=1, $flags=0)
A wrapper class for the PECL memcached client.
doCas( $casToken, $key, $value, $exptime=0, $flags=0)
bool $syncClientIsBuffering
Whether the non-buffering client is locked from use.
validateKeyEncoding( $key)
Ensure that a key is safe to use (contains no control characters and no characters above the ASCII range).
initializeClient(Memcached $client, array $params, array $options)
Initialize the client only if needed and reuse it otherwise.
doSet( $key, $value, $exptime=0, $flags=0)
bool $hasUnflushedChanges
Whether the non-buffering client should be flushed before use.
doDeleteMulti(array $keys, $flags=0)
static array $OPTS_ASYNC_WRITES
Memcached options.
static array $OPTS_SYNC_WRITES
Memcached options.
Base class for memcached clients.
doGetMulti(array $keys, $flags=0)
setLastError( $err)
Set the "last error" registry.
doSetMulti(array $data, $exptime=0, $flags=0)
incr( $key, $value=1, $flags=0)
isInteger( $value)
Check if a value is an integer.
__construct( $params)
Available parameters are:
doAdd( $key, $value, $exptime=0, $flags=0)
return true
Definition: router.php:92
doGet( $key, $flags=0, &$casToken=null)
fieldHasFlags( $field, $flags)
Definition: BagOStuff.php:493
doChangeTTL( $key, $exptime, $flags)