69 use Psr\Log\LoggerInterface;
70 use Psr\Log\NullLogger;
263 $this->_debug =
$args[
'debug'] ??
false;
264 $this->stats =
array();
265 $this->_compress_threshold =
$args[
'compress_threshold'] ?? 0;
266 $this->_persistent =
$args[
'persistent'] ??
false;
267 $this->_compress_enable =
true;
268 $this->_have_zlib = function_exists(
'gzcompress' );
270 $this->_cache_sock =
array();
271 $this->_host_dead =
array();
273 $this->_timeout_seconds = 0;
274 $this->_timeout_microseconds =
$args[
'timeout'] ?? 500000;
276 $this->_connect_timeout =
$args[
'connect_timeout'] ?? 0.1;
277 $this->_connect_attempts = 2;
279 $this->_logger =
$args[
'logger'] ??
new NullLogger();
299 public function add( $key, $val, $exp = 0 ) {
# Thin wrapper: forwards $key, $val and $exp to _set() with the 'add'
# command and returns whatever _set() returns.
300 return $this->
_set(
'add', $key, $val, $exp );
314 public function decr( $key, $amt = 1 ) {
# Thin wrapper: forwards $key and $amt (default 1) to _incrdecr() with the
# 'decr' command and returns whatever _incrdecr() returns.
315 return $this->
_incrdecr(
'decr', $key, $amt );
329 public function delete( $key,
$time = 0 ) {
330 if ( !$this->_active ) {
335 if ( !is_resource( $sock ) ) {
339 $key = is_array( $key ) ? $key[1] : $key;
341 if ( isset( $this->stats[
'delete'] ) ) {
342 $this->stats[
'delete']++;
344 $this->stats[
'delete'] = 1;
346 $cmd =
"delete $key $time\r\n";
347 if ( !$this->
_fwrite( $sock, $cmd ) ) {
352 if ( $this->_debug ) {
353 $this->
_debugprint( sprintf(
"MemCache: delete %s (%s)", $key,
$res ) );
356 if (
$res ==
"DELETED" ||
$res ==
"NOT_FOUND" ) {
372 if ( !$this->_active ) {
377 if ( !is_resource( $sock ) ) {
381 $key = is_array( $key ) ? $key[1] : $key;
383 if ( isset( $this->stats[
'touch'] ) ) {
384 $this->stats[
'touch']++;
386 $this->stats[
'touch'] = 1;
388 $cmd =
"touch $key $time\r\n";
389 if ( !$this->
_fwrite( $sock, $cmd ) ) {
394 if ( $this->_debug ) {
395 $this->
_debugprint( sprintf(
"MemCache: touch %s (%s)", $key,
$res ) );
398 if (
$res ==
"TOUCHED" ) {
410 public function lock( $key, $timeout = 0 ) {
431 foreach ( $this->_cache_sock
as $sock ) {
435 $this->_cache_sock =
array();
447 $this->_compress_enable = $enable;
457 $this->_host_dead =
array();
471 public function get( $key, &$casToken = null ) {
472 if ( $this->_debug ) {
476 if ( !is_array( $key ) && strval( $key ) ===
'' ) {
477 $this->
_debugprint(
"Skipping key which equals to an empty string" );
481 if ( !$this->_active ) {
487 if ( !is_resource( $sock ) ) {
491 $key = is_array( $key ) ? $key[1] : $key;
492 if ( isset( $this->stats[
'get'] ) ) {
493 $this->stats[
'get']++;
495 $this->stats[
'get'] = 1;
498 $cmd =
"gets $key\r\n";
499 if ( !$this->
_fwrite( $sock, $cmd ) ) {
506 if ( $this->_debug ) {
507 foreach ( $val
as $k => $v ) {
513 if ( isset( $val[$key] ) ) {
530 if ( !$this->_active ) {
534 if ( isset( $this->stats[
'get_multi'] ) ) {
535 $this->stats[
'get_multi']++;
537 $this->stats[
'get_multi'] = 1;
539 $sock_keys =
array();
543 if ( !is_resource( $sock ) ) {
546 $key = is_array( $key ) ? $key[1] : $key;
# Look the bucket up under intval( $sock ), the same offset used to create,
# append to and iterate $sock_keys in this block. Using the raw resource as
# an offset makes PHP emit a "used as offset, casting to integer" diagnostic.
547 if ( !isset( $sock_keys[intval( $sock )] ) ) {
548 $sock_keys[intval( $sock )] =
array();
551 $sock_keys[intval( $sock )][] = $key;
556 foreach ( $socks
as $sock ) {
558 foreach ( $sock_keys[intval( $sock )]
as $key ) {
563 if ( $this->
_fwrite( $sock, $cmd ) ) {
570 foreach ( $gather
as $sock ) {
574 if ( $this->_debug ) {
575 foreach ( $val
as $k => $v ) {
576 $this->
_debugprint( sprintf(
"MemCache: got %s", $k ) );
596 public function incr( $key, $amt = 1 ) {
# Thin wrapper: forwards $key and $amt (default 1) to _incrdecr() with the
# 'incr' command and returns whatever _incrdecr() returns.
597 return $this->
_incrdecr(
'incr', $key, $amt );
617 return $this->
_set(
'replace', $key,
$value, $exp );
633 if ( !is_resource( $sock ) ) {
637 if ( !$this->
_fwrite( $sock, $cmd ) ) {
645 if ( preg_match(
'/^END/',
$res ) ) {
648 if ( strlen(
$res ) == 0 ) {
672 public function set( $key,
$value, $exp = 0 ) {
# Thin wrapper: forwards $key, $value and $exp to _set() with the 'set'
# command and returns whatever _set() returns.
673 return $this->
_set(
'set', $key,
$value, $exp );
694 public function cas( $casToken, $key,
$value, $exp = 0 ) {
# Thin wrapper: forwards to _set() with the 'cas' command. Note the argument
# order changes: $casToken is first here but is passed last to _set().
695 return $this->
_set(
'cas', $key,
$value, $exp, $casToken );
707 $this->_compress_threshold = $thresh;
720 $this->_debug = $dbg;
733 $this->_servers = $list;
734 $this->_active =
count( $list );
735 $this->_buckets =
null;
736 $this->_bucketcount = 0;
738 $this->_single_sock =
null;
739 if ( $this->_active == 1 ) {
740 $this->_single_sock = $this->_servers[0];
751 $this->_timeout_seconds = $seconds;
752 $this->_timeout_microseconds = $microseconds;
768 $host = array_search( $sock, $this->_cache_sock );
769 fclose( $this->_cache_sock[$host] );
770 unset( $this->_cache_sock[$host] );
786 list( $ip, $port ) = preg_split(
'/:(?=\d)/', $host );
789 $errno = $errstr =
null;
791 Wikimedia\suppressWarnings();
# Test the flag by truthiness, exactly as the later `if ( $this->_persistent )`
# check does, so both decisions agree for every configured value.
792 if ( $this->_persistent ) {
793 $sock = pfsockopen( $ip, $port, $errno, $errstr, $timeout );
795 $sock = fsockopen( $ip, $port, $errno, $errstr, $timeout );
797 Wikimedia\restoreWarnings();
800 $this->
_error_log(
"Error connecting to $host: $errstr" );
806 stream_set_timeout( $sock, $this->_timeout_seconds, $this->_timeout_microseconds );
810 if ( $this->_persistent ) {
827 $host = array_search( $sock, $this->_cache_sock );
835 $ip = explode(
':', $host )[0];
836 $this->_host_dead[$ip] = time() + 30 + intval( rand( 0, 10 ) );
837 $this->_host_dead[$host] = $this->_host_dead[$ip];
838 unset( $this->_cache_sock[$host] );
853 if ( !$this->_active ) {
857 if ( $this->_single_sock !==
null ) {
861 $hv = is_array( $key ) ? intval( $key[0] ) : $this->
_hashfunc( $key );
862 if ( $this->_buckets ===
null ) {
864 foreach ( $this->_servers
as $v ) {
865 if ( is_array( $v ) ) {
866 for ( $i = 0; $i < $v[1]; $i++ ) {
873 $this->_buckets = $bu;
874 $this->_bucketcount =
count( $bu );
877 $realkey = is_array( $key ) ? $key[1] : $key;
878 for ( $tries = 0; $tries < 20; $tries++ ) {
881 if ( is_resource( $sock ) ) {
884 $hv = $this->
_hashfunc( $hv . $realkey );
902 # Hash value must be in [0,0x7fffffff]
903 # We take the first 32 bits of the MD5 hash and clear the top bit, which unlike the hash
904 # function used in a previous version of this client, works
905 return hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
922 if ( !$this->_active ) {
927 if ( !is_resource( $sock ) ) {
931 $key = is_array( $key ) ? $key[1] : $key;
932 if ( isset( $this->stats[$cmd] ) ) {
933 $this->stats[$cmd]++;
935 $this->stats[$cmd] = 1;
937 if ( !$this->
_fwrite( $sock,
"$cmd $key $amt\r\n" ) ) {
943 if ( !preg_match(
'/^(\d+)/',
$line, $match ) ) {
966 $decl = $this->
_fgets( $sock );
968 if ( $decl ===
false ) {
974 } elseif ( preg_match(
'/^VALUE (\S+) (\d+) (\d+) (\d+)$/', $decl, $match ) ) {
987 $this->
_fread( $sock, $match[3] + 2 ),
989 } elseif ( $decl ==
"END" ) {
990 if (
count( $results ) == 0 ) {
998 foreach ( $results
as $vars ) {
999 list( $rkey, $flags, $len, $casToken, $data ) =
$vars;
1001 if ( $data ===
false || substr( $data, -2 ) !==
"\r\n" ) {
1003 'line ending missing from data block from $1' );
1006 $data = substr( $data, 0, -2 );
1007 $ret[$rkey] = $data;
1009 if ( $this->_have_zlib && $flags & self::COMPRESSED ) {
1010 $ret[$rkey] = gzuncompress(
$ret[$rkey] );
1022 if ( $flags & self::SERIALIZED ) {
1024 } elseif ( $flags & self::INTVAL ) {
1025 $ret[$rkey] = intval(
$ret[$rkey] );
1031 $this->
_handle_error( $sock,
'Error parsing response from $1' );
1056 function _set( $cmd, $key, $val, $exp, $casToken =
null ) {
1057 if ( !$this->_active ) {
1062 if ( !is_resource( $sock ) ) {
1066 if ( isset( $this->stats[$cmd] ) ) {
1067 $this->stats[$cmd]++;
1069 $this->stats[$cmd] = 1;
1074 if ( is_int( $val ) ) {
1076 } elseif ( !is_scalar( $val ) ) {
1079 if ( $this->_debug ) {
1080 $this->
_debugprint( sprintf(
"client: serializing data as it is not scalar" ) );
1084 $len = strlen( $val );
1086 if ( $this->_have_zlib && $this->_compress_enable
1087 && $this->_compress_threshold && $len >= $this->_compress_threshold
1089 $c_val = gzcompress( $val, 9 );
1090 $c_len = strlen( $c_val );
1092 if ( $c_len < $len * ( 1 - self::COMPRESSION_SAVINGS ) ) {
1093 if ( $this->_debug ) {
1094 $this->
_debugprint( sprintf(
"client: compressing data; was %d bytes is now %d bytes", $len, $c_len ) );
1102 $command =
"$cmd $key $flags $exp $len";
1107 if ( !$this->
_fwrite( $sock,
"$command\r\n$val\r\n" ) ) {
1113 if ( $this->_debug ) {
1116 if (
$line ===
"STORED" ) {
1118 } elseif (
$line ===
"NOT_STORED" && $cmd ===
"set" ) {
1138 if ( isset( $this->_cache_sock[$host] ) ) {
1139 return $this->_cache_sock[$host];
1144 list( $ip, ) = explode(
':', $host );
1145 if ( isset( $this->_host_dead[$host] ) && $this->_host_dead[$host] > $now ||
1146 isset( $this->_host_dead[$ip] ) && $this->_host_dead[$ip] > $now
1156 stream_set_write_buffer( $sock, 0 );
1158 $this->_cache_sock[$host] = $sock;
1160 return $this->_cache_sock[$host];
1167 $this->_logger->debug( $text );
1174 $this->_logger->error(
"Memcached error: $text" );
1186 $bufSize = strlen( $buf );
1187 while ( $bytesWritten < $bufSize ) {
# Send only the part of the buffer that has not been written yet. fwrite()
# on a network stream may accept fewer bytes than requested; passing the
# whole buffer again would resend its beginning. When $bytesWritten is 0
# this is identical to writing $buf.
1188 $result = fwrite( $sock, substr( $buf, $bytesWritten ) );
1189 $data = stream_get_meta_data( $sock );
1190 if ( $data[
'timed_out'] ) {
1212 $peer = stream_socket_get_name( $sock,
true );
1213 if ( strval( $peer ) ===
'' ) {
1214 $peer = array_search( $sock, $this->_cache_sock );
1215 if ( $peer ===
false ) {
1216 $peer =
'[unknown host]';
1219 $msg = str_replace(
'$1', $peer, $msg );
1234 while ( $len > 0 ) {
1235 $result = fread( $sock, $len );
1236 $data = stream_get_meta_data( $sock );
1237 if ( $data[
'timed_out'] ) {
1242 $this->
_handle_error( $sock,
'error reading buffer from $1' );
1247 $this->
_handle_error( $sock,
'unexpected end of file reading from $1' );
1268 $data = stream_get_meta_data( $sock );
1269 if ( $data[
'timed_out'] ) {
1270 $this->
_handle_error( $sock,
'timeout reading line from $1' );
1274 $this->
_handle_error( $sock,
'error reading line from $1' );
1277 if ( substr(
$result, -2 ) ===
"\r\n" ) {
1279 } elseif ( substr(
$result, -1 ) ===
"\n" ) {
1282 $this->
_handle_error( $sock,
'line ending missing in response from $1' );
1293 if ( !is_resource( $f ) ) {
1299 $n = stream_select( $r, $w,
$e, 0, 0 );
1300 while ( $n == 1 && !feof( $f ) ) {
1305 $n = stream_select( $r, $w,
$e, 0, 0 );