69use Psr\Log\LoggerInterface;
70use Psr\Log\NullLogger;
71use Wikimedia\AtEase\AtEase;
267 $this->_debug = $args[
'debug'] ??
false;
268 $this->stats = array();
269 $this->_compress_threshold = $args[
'compress_threshold'] ?? 0;
270 $this->_persistent = $args[
'persistent'] ??
false;
271 $this->_compress_enable =
true;
272 $this->_have_zlib = function_exists(
'gzcompress' );
274 $this->_cache_sock = array();
275 $this->_host_dead = array();
277 $this->_timeout_seconds = 0;
278 $this->_timeout_microseconds = $args[
'timeout'] ?? 500_000;
280 $this->_connect_timeout = $args[
'connect_timeout'] ?? 0.1;
281 $this->_connect_attempts = 2;
283 $this->_logger = $args[
'logger'] ??
new NullLogger();
320 public function add( $key, $val, $exp = 0 ) {
321 $this->_last_cmd_status = BagOStuff::ERR_NONE;
323 return $this->
_set(
'add', $key, $val, $exp );
337 public function decr( $key, $amt = 1 ) {
338 $this->_last_cmd_status = BagOStuff::ERR_NONE;
340 return $this->
_incrdecr(
'decr', $key, $amt );
354 public function delete( $key, $time = 0 ) {
355 $this->_last_cmd_status = BagOStuff::ERR_NONE;
357 if ( !$this->_active ) {
366 $key = is_array( $key ) ? $key[1] : $key;
368 if ( isset( $this->stats[
'delete'] ) ) {
369 $this->stats[
'delete']++;
371 $this->stats[
'delete'] = 1;
373 $cmd =
"delete $key $time\r\n";
374 if ( !$this->
_fwrite( $sock, $cmd ) ) {
377 $res = $this->
_fgets( $sock );
379 if ( $this->_debug ) {
380 $this->
_debugprint( sprintf(
"MemCache: delete %s (%s)", $key, $res ) );
383 if ( $res ==
"DELETED" || $res ==
"NOT_FOUND" ) {
387 $this->_last_cmd_status = BagOStuff::ERR_UNEXPECTED;
400 public function touch( $key, $time = 0 ) {
401 $this->_last_cmd_status = BagOStuff::ERR_NONE;
403 if ( !$this->_active ) {
412 $key = is_array( $key ) ? $key[1] : $key;
414 if ( isset( $this->stats[
'touch'] ) ) {
415 $this->stats[
'touch']++;
417 $this->stats[
'touch'] = 1;
419 $cmd =
"touch $key $time\r\n";
420 if ( !$this->
_fwrite( $sock, $cmd ) ) {
423 $res = $this->
_fgets( $sock );
425 if ( $this->_debug ) {
426 $this->
_debugprint( sprintf(
"MemCache: touch %s (%s)", $key, $res ) );
429 if ( $res ==
"TOUCHED" ) {
443 foreach ( $this->_cache_sock as $sock ) {
447 $this->_cache_sock = array();
459 $this->_compress_enable = $enable;
469 $this->_host_dead = array();
483 public function get( $key, &$casToken = null ) {
484 $getToken = ( func_num_args() >= 2 );
486 $this->_last_cmd_status = BagOStuff::ERR_NONE;
488 if ( $this->_debug ) {
492 if ( !is_array( $key ) && strval( $key ) ===
'' ) {
493 $this->_last_cmd_status = BagOStuff::ERR_UNEXPECTED;
494 $this->
_debugprint(
"Skipping key which equals to an empty string" );
498 if ( !$this->_active ) {
499 $this->_last_cmd_status = BagOStuff::ERR_UNEXPECTED;
507 $this->_last_cmd_status = BagOStuff::ERR_UNREACHABLE;
512 $key = is_array( $key ) ? $key[1] : $key;
513 if ( isset( $this->stats[
'get'] ) ) {
514 $this->stats[
'get']++;
516 $this->stats[
'get'] = 1;
519 $cmd = $getToken ?
"gets" :
"get";
521 if ( !$this->
_fwrite( $sock, $cmd ) ) {
522 $this->_last_cmd_status = BagOStuff::ERR_NO_RESPONSE;
528 if ( !$this->
_load_items( $sock, $val, $casToken ) ) {
529 $this->_last_cmd_status = BagOStuff::ERR_NO_RESPONSE;
532 if ( $this->_debug ) {
533 foreach ( $val as $k => $v ) {
535 sprintf(
"MemCache: sock %s got %s", $this->
serialize( $sock ), $k ) );
540 if ( isset( $val[$key] ) ) {
557 $this->_last_cmd_status = BagOStuff::ERR_NONE;
559 if ( !$this->_active ) {
560 $this->_last_cmd_status = BagOStuff::ERR_UNEXPECTED;
565 if ( isset( $this->stats[
'get_multi'] ) ) {
566 $this->stats[
'get_multi']++;
568 $this->stats[
'get_multi'] = 1;
570 $sock_keys = array();
572 foreach ( $keys as $key ) {
575 $this->_last_cmd_status = BagOStuff::ERR_UNREACHABLE;
578 $key = is_array( $key ) ? $key[1] : $key;
579 $sockValue = intval( $sock );
581 if ( !isset( $sock_keys[$sockValue] ) ) {
582 $sock_keys[$sockValue] = array();
585 $sock_keys[$sockValue][] = $key;
590 foreach ( $socks as $sock ) {
592 foreach ( $sock_keys[intval( $sock )] as $key ) {
597 if ( $this->
_fwrite( $sock, $cmd ) ) {
600 $this->_last_cmd_status = BagOStuff::ERR_NO_RESPONSE;
606 foreach ( $gather as $sock ) {
608 $this->_last_cmd_status = BagOStuff::ERR_NO_RESPONSE;
612 if ( $this->_debug ) {
613 foreach ( $val as $k => $v ) {
614 $this->
_debugprint( sprintf(
"MemCache: got %s", $k ) );
634 public function incr( $key, $amt = 1 ) {
635 return $this->
_incrdecr(
'incr', $key, $amt );
654 public function replace( $key, $value, $exp = 0 ) {
655 return $this->
_set(
'replace', $key, $value, $exp );
675 if ( !$this->
_fwrite( $sock, $cmd ) ) {
681 $res = $this->
_fgets( $sock );
683 if ( preg_match(
'/^END/', $res ) ) {
686 if ( strlen( $res ) == 0 ) {
710 public function set( $key, $value, $exp = 0 ) {
711 return $this->
_set(
'set', $key, $value, $exp );
732 public function cas( $casToken, $key, $value, $exp = 0 ) {
733 return $this->
_set(
'cas', $key, $value, $exp, $casToken );
745 $this->_compress_threshold = $thresh;
758 $this->_debug = $dbg;
771 $this->_servers = $list;
772 $this->_active = count( $list );
773 $this->_buckets =
null;
774 $this->_bucketcount = 0;
776 $this->_single_sock =
null;
777 if ( $this->_active == 1 ) {
778 $this->_single_sock = $this->_servers[0];
789 $this->_timeout_seconds = $seconds;
790 $this->_timeout_microseconds = $microseconds;
806 $host = array_search( $sock, $this->_cache_sock );
807 fclose( $this->_cache_sock[$host] );
808 unset( $this->_cache_sock[$host] );
825 $hostAndPort = IPUtils::splitHostAndPort( $host );
826 if ( $hostAndPort ) {
827 $ip = $hostAndPort[0];
828 if ( $hostAndPort[1] ) {
829 $port = $hostAndPort[1];
835 $timeout = $this->_connect_timeout;
836 $errno = $errstr =
null;
837 for ( $i = 0; !$sock && $i < $this->_connect_attempts; $i++ ) {
838 AtEase::suppressWarnings();
839 if ( $this->_persistent == 1 ) {
840 $sock = pfsockopen( $ip, $port, $errno, $errstr, $timeout );
842 $sock = fsockopen( $ip, $port, $errno, $errstr, $timeout );
844 AtEase::restoreWarnings();
847 $this->
_error_log(
"Error connecting to $host: $errstr" );
853 stream_set_timeout( $sock, $this->_timeout_seconds, $this->_timeout_microseconds );
857 if ( $this->_persistent ) {
874 $host = array_search( $sock, $this->_cache_sock );
882 $hostAndPort = IPUtils::splitHostAndPort( $host );
883 if ( $hostAndPort ) {
884 $ip = $hostAndPort[0];
888 $this->_host_dead[$ip] = time() + 30 + intval( rand( 0, 10 ) );
889 $this->_host_dead[$host] = $this->_host_dead[$ip];
890 unset( $this->_cache_sock[$host] );
905 if ( !$this->_active ) {
909 if ( $this->_single_sock !==
null ) {
913 $hv = is_array( $key ) ? intval( $key[0] ) : $this->
_hashfunc( $key );
914 if ( $this->_buckets ===
null ) {
916 foreach ( $this->_servers as $v ) {
917 if ( is_array( $v ) ) {
918 for ( $i = 0; $i < $v[1]; $i++ ) {
925 $this->_buckets = $bu;
926 $this->_bucketcount = count( $bu );
929 $realkey = is_array( $key ) ? $key[1] : $key;
930 for ( $tries = 0; $tries < 20; $tries++ ) {
931 $host = $this->_buckets[$hv % $this->_bucketcount];
936 $hv = $this->
_hashfunc( $hv . $realkey );
954 # Hash function must be in [0,0x7ffffff]
955 # We take the first 31 bits of the MD5 hash, which unlike the hash
956 # function used in a previous version of this client, works
957 return hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
974 $this->_last_cmd_status = BagOStuff::ERR_NONE;
976 if ( !$this->_active ) {
977 $this->_last_cmd_status = BagOStuff::ERR_UNEXPECTED;
984 $this->_last_cmd_status = BagOStuff::ERR_UNREACHABLE;
989 $key = is_array( $key ) ? $key[1] : $key;
990 if ( isset( $this->stats[$cmd] ) ) {
991 $this->stats[$cmd]++;
993 $this->stats[$cmd] = 1;
995 if ( !$this->
_fwrite( $sock,
"$cmd $key $amt\r\n" ) ) {
996 $this->_last_cmd_status = BagOStuff::ERR_NO_RESPONSE;
1001 $line = $this->
_fgets( $sock );
1002 if ( $this->_debug ) {
1007 if ( !preg_match(
'/^(\d+)/', $line, $match ) ) {
1008 $this->_last_cmd_status = BagOStuff::ERR_NO_RESPONSE;
1013 return (
int)$match[1];
1033 $decl = $this->
_fgets( $sock );
1035 if ( $decl ===
false ) {
1041 } elseif ( preg_match(
'/^VALUE (\S+) (\d+) (\d+)(?: (\d+))?$/', $decl, $match ) ) {
1054 $this->
_fread( $sock, $match[3] + 2 ),
1056 } elseif ( $decl ==
"END" ) {
1061 foreach ( $results as [ $rkey, $flags, , $casToken, $data ] ) {
1062 if ( $data ===
false || substr( $data, -2 ) !==
"\r\n" ) {
1064 'line ending missing from data block from $1' );
1067 $data = substr( $data, 0, -2 );
1068 $ret[$rkey] = $data;
1070 if ( $this->_have_zlib && $flags & self::COMPRESSED ) {
1071 $ret[$rkey] = gzuncompress( $ret[$rkey] );
1083 if ( $flags & self::SERIALIZED ) {
1085 } elseif ( $flags & self::INTVAL ) {
1086 $ret[$rkey] = intval( $ret[$rkey] );
1092 $this->
_handle_error( $sock,
'Error parsing response from $1' );
1117 function _set( $cmd, $key, $val, $exp, $casToken =
null ) {
1118 $this->_last_cmd_status = BagOStuff::ERR_NONE;
1120 if ( !$this->_active ) {
1121 $this->_last_cmd_status = BagOStuff::ERR_UNEXPECTED;
1128 $this->_last_cmd_status = BagOStuff::ERR_UNREACHABLE;
1133 if ( isset( $this->stats[$cmd] ) ) {
1134 $this->stats[$cmd]++;
1136 $this->stats[$cmd] = 1;
1141 if ( is_int( $val ) ) {
1142 $flags |= self::INTVAL;
1143 } elseif ( !is_scalar( $val ) ) {
1145 $flags |= self::SERIALIZED;
1146 if ( $this->_debug ) {
1147 $this->
_debugprint(
"client: serializing data as it is not scalar" );
1151 $len = strlen( $val );
1153 if ( $this->_have_zlib && $this->_compress_enable
1154 && $this->_compress_threshold && $len >= $this->_compress_threshold
1156 $c_val = gzcompress( $val, 9 );
1157 $c_len = strlen( $c_val );
1159 if ( $c_len < $len * ( 1 - self::COMPRESSION_SAVINGS ) ) {
1160 if ( $this->_debug ) {
1161 $this->
_debugprint( sprintf(
"client: compressing data; was %d bytes is now %d bytes", $len, $c_len ) );
1165 $flags |= self::COMPRESSED;
1169 $command =
"$cmd $key $flags $exp $len";
1171 $command .=
" $casToken";
1174 if ( !$this->
_fwrite( $sock,
"$command\r\n$val\r\n" ) ) {
1175 $this->_last_cmd_status = BagOStuff::ERR_NO_RESPONSE;
1180 $line = $this->
_fgets( $sock );
1181 if ( $this->_debug ) {
1182 $this->
_debugprint( sprintf(
"%s %s (%s)", $cmd, $key, $line ) );
1185 if ( $line ===
"STORED" ) {
1187 } elseif ( $line ===
"NOT_STORED" && $cmd ===
"set" ) {
1192 if ( $line ===
false ) {
1193 $this->_last_cmd_status = BagOStuff::ERR_NO_RESPONSE;
1211 if ( isset( $this->_cache_sock[$host] ) ) {
1212 return $this->_cache_sock[$host];
1217 $hostAndPort = IPUtils::splitHostAndPort( $host );
1218 if ( $hostAndPort ) {
1219 $ip = $hostAndPort[0];
1223 if ( isset( $this->_host_dead[$host] ) && $this->_host_dead[$host] > $now ||
1224 isset( $this->_host_dead[$ip] ) && $this->_host_dead[$ip] > $now
1234 stream_set_write_buffer( $sock, 0 );
1236 $this->_cache_sock[$host] = $sock;
1238 return $this->_cache_sock[$host];
1245 $this->_logger->debug( $text );
1252 $this->_logger->error(
"Memcached error: $text" );
1264 $bufSize = strlen( $buf );
1265 while ( $bytesWritten < $bufSize ) {
1266 $result = fwrite( $sock, $buf );
1267 $data = stream_get_meta_data( $sock );
1268 if ( $data[
'timed_out'] ) {
1273 if ( $result ===
false || $result === 0 ) {
1277 $bytesWritten += $result;
1290 $peer = stream_socket_get_name( $sock,
true );
1291 if ( strval( $peer ) ===
'' ) {
1292 $peer = array_search( $sock, $this->_cache_sock );
1293 if ( $peer ===
false ) {
1294 $peer =
'[unknown host]';
1297 $msg = str_replace(
'$1', $peer, $msg );
1312 while ( $len > 0 ) {
1313 $result = fread( $sock, $len );
1314 $data = stream_get_meta_data( $sock );
1315 if ( $data[
'timed_out'] ) {
1319 if ( $result ===
false ) {
1320 $this->
_handle_error( $sock,
'error reading buffer from $1' );
1323 if ( $result ===
'' ) {
1325 $this->
_handle_error( $sock,
'unexpected end of file reading from $1' );
1328 $len -= strlen( $result );
1342 $result = fgets( $sock );
1346 $data = stream_get_meta_data( $sock );
1347 if ( $data[
'timed_out'] ) {
1348 $this->
_handle_error( $sock,
'timeout reading line from $1' );
1351 if ( $result ===
false ) {
1352 $this->
_handle_error( $sock,
'error reading line from $1' );
1355 if ( substr( $result, -2 ) ===
"\r\n" ) {
1356 $result = substr( $result, 0, -2 );
1357 } elseif ( substr( $result, -1 ) ===
"\n" ) {
1358 $result = substr( $result, 0, -1 );
1360 $this->
_handle_error( $sock,
'line ending missing in response from $1' );
1377 $n = stream_select( $r, $w, $e, 0, 0 );
1378 while ( $n == 1 && !feof( $f ) ) {
1383 $n = stream_select( $r, $w, $e, 0, 0 );
memcached client class implemented using (p)fsockopen()
set_debug( $dbg)
Set the debug flag.
array $_cache_sock
Cached Sockets that are connected.
const SERIALIZED
Flag: indicates data is serialized.
const COMPRESSION_SAVINGS
Minimum savings to store data compressed.
_hashfunc( $key)
Creates a hash integer based on the $key.
set_compress_threshold( $thresh)
Set the compression threshold.
_fread( $sock, $len)
Read the specified number of bytes from a stream.
array $_servers
Array containing ip:port or array(ip:port, weight)
bool $_compress_enable
Do we want to use compression?
decr( $key, $amt=1)
Decrease a value stored on the memcache server.
const INTVAL
Flag: indicates data is an integer.
int $_compress_threshold
At how many bytes should we compress?
int $_timeout_seconds
Stream timeout in seconds.
disconnect_all()
Disconnects all connected sockets.
cas( $casToken, $key, $value, $exp=0)
Sets a key to a given value in the memcache if the current value still corresponds to a known,...
array $_buckets
Our bit buckets.
set_timeout( $seconds, $microseconds)
Sets the timeout for new connections.
_fgets( $sock)
Read a line from a stream.
incr( $key, $amt=1)
Increments $key (optionally) by $amt.
forget_dead_hosts()
Forget about all of the dead hosts.
_incrdecr( $cmd, $key, $amt=1)
Perform increment/decrement on $key.
$_connect_attempts
Number of connection attempts for each server.
add( $key, $val, $exp=0)
Adds a key/value to the memcache server if one isn't already set with that key.
_handle_error( $sock, $msg)
Handle an I/O error.
sock_to_host( $host)
Returns the socket for the host.
int $_last_cmd_status
BagOStuff:ERR_* constant of the last cache command.
$_connect_timeout
Connect timeout in seconds.
touch( $key, $time=0)
Changes the TTL on a key from the server to $time.
bool $_have_zlib
Is compression available?
int $_bucketcount
Total # of bit buckets we have.
_fwrite( $sock, $buf)
Write to a stream.
bool $_debug
Current debug status; 0 - none to 9 - profiling.
replace( $key, $value, $exp=0)
Overwrites an existing value for key; only works if key is already set.
set_servers( $list)
Set the server list to distribute key gets and puts between.
_load_items( $sock, &$ret, &$casToken=null)
Load items into $ret from $sock.
string $_single_sock
If only using one server; contains ip:port to connect to.
get_multi( $keys)
Get multiple keys from the server(s)
_close_sock( $sock)
Close the specified socket.
run_command( $sock, $cmd)
Passes through $cmd to the memcache server connected by $sock; returns output as an array (null array...
_set( $cmd, $key, $val, $exp, $casToken=null)
Performs the requested storage operation to the memcache server.
int $_timeout_microseconds
Stream timeout in microseconds.
__construct( $args)
Memcache initializer.
_flush_read_buffer( $f)
Flush the read buffer of a stream.
enable_compress( $enable)
Enable / Disable compression.
array $_host_dead
Dead hosts, assoc array, 'host'=>'unixtime when ok to check again'.
const COMPRESSED
Flag: indicates data is compressed.
_dead_sock( $sock)
Marks a host as dead until 30-40 seconds in the future.
_connect_sock(&$sock, $host)
Connects $sock to $host, timing out after $timeout.
array $stats
Command statistics.
bool $_persistent
Are we using persistent links?