MediaWiki  master
MemcachedClient.php
Go to the documentation of this file.
1 <?php
2 // phpcs:ignoreFile -- It's an external lib and it isn't. Let's not bother.
69 use Psr\Log\LoggerInterface;
70 use Psr\Log\NullLogger;
71 use Wikimedia\AtEase\AtEase;
72 use Wikimedia\IPUtils;
74 
75 // {{{ class MemcachedClient
83  // {{{ properties
84  // {{{ public
85 
86  // {{{ constants
87  // {{{ flags
88 
92  const SERIALIZED = 1;
93 
97  const COMPRESSED = 2;
98 
102  const INTVAL = 4;
103 
104  // }}}
105 
109  const COMPRESSION_SAVINGS = 0.20;
110 
111  // }}}
112 
119  public $stats;
120 
121  // }}}
122  // {{{ private
123 
130  public $_cache_sock;
131 
138  public $_debug;
139 
146  public $_host_dead;
147 
154  public $_have_zlib;
155 
163 
171 
178  public $_persistent;
179 
187 
194  public $_servers;
195 
202  public $_buckets;
203 
211 
218  public $_active;
219 
227 
235 
240 
245 
248 
252  private $_logger;
253 
254 
255  // }}}
256  // }}}
257  // {{{ methods
258  // {{{ public functions
259  // {{{ memcached()
260 
266  public function __construct( $args ) {
267  $this->set_servers( $args['servers'] ?? array() );
268  $this->_debug = $args['debug'] ?? false;
269  $this->stats = array();
270  $this->_compress_threshold = $args['compress_threshold'] ?? 0;
271  $this->_persistent = $args['persistent'] ?? false;
272  $this->_compress_enable = true;
273  $this->_have_zlib = function_exists( 'gzcompress' );
274 
275  $this->_cache_sock = array();
276  $this->_host_dead = array();
277 
278  $this->_timeout_seconds = 0;
279  $this->_timeout_microseconds = $args['timeout'] ?? 500000;
280 
281  $this->_connect_timeout = $args['connect_timeout'] ?? 0.1;
282  $this->_connect_attempts = 2;
283 
284  $this->_logger = $args['logger'] ?? new NullLogger();
285  }
286 
287  // }}}
288 
293  public function serialize( $value ) {
294  return serialize( $value );
295  }
296 
301  public function unserialize( $value ) {
302  return unserialize( $value );
303  }
304 
305  // {{{ add()
306 
321  public function add( $key, $val, $exp = 0 ) {
322  $this->_last_cmd_status = self::ERR_NONE;
323 
324  return $this->_set( 'add', $key, $val, $exp );
325  }
326 
327  // }}}
328  // {{{ decr()
329 
338  public function decr( $key, $amt = 1 ) {
339  $this->_last_cmd_status = self::ERR_NONE;
340 
341  return $this->_incrdecr( 'decr', $key, $amt );
342  }
343 
344  // }}}
345  // {{{ delete()
346 
355  public function delete( $key, $time = 0 ) {
356  $this->_last_cmd_status = self::ERR_NONE;
357 
358  if ( !$this->_active ) {
359  return false;
360  }
361 
362  $sock = $this->get_sock( $key );
363  if ( !$sock ) {
364  return false;
365  }
366 
367  $key = is_array( $key ) ? $key[1] : $key;
368 
369  if ( isset( $this->stats['delete'] ) ) {
370  $this->stats['delete']++;
371  } else {
372  $this->stats['delete'] = 1;
373  }
374  $cmd = "delete $key $time\r\n";
375  if ( !$this->_fwrite( $sock, $cmd ) ) {
376  return false;
377  }
378  $res = $this->_fgets( $sock );
379 
380  if ( $this->_debug ) {
381  $this->_debugprint( sprintf( "MemCache: delete %s (%s)", $key, $res ) );
382  }
383 
384  if ( $res == "DELETED" || $res == "NOT_FOUND" ) {
385  return true;
386  }
387 
388  $this->_last_cmd_status = self::ERR_UNEXPECTED;
389 
390  return false;
391  }
392 
401  public function touch( $key, $time = 0 ) {
402  $this->_last_cmd_status = self::ERR_NONE;
403 
404  if ( !$this->_active ) {
405  return false;
406  }
407 
408  $sock = $this->get_sock( $key );
409  if ( !$sock ) {
410  return false;
411  }
412 
413  $key = is_array( $key ) ? $key[1] : $key;
414 
415  if ( isset( $this->stats['touch'] ) ) {
416  $this->stats['touch']++;
417  } else {
418  $this->stats['touch'] = 1;
419  }
420  $cmd = "touch $key $time\r\n";
421  if ( !$this->_fwrite( $sock, $cmd ) ) {
422  return false;
423  }
424  $res = $this->_fgets( $sock );
425 
426  if ( $this->_debug ) {
427  $this->_debugprint( sprintf( "MemCache: touch %s (%s)", $key, $res ) );
428  }
429 
430  if ( $res == "TOUCHED" ) {
431  return true;
432  }
433 
434  return false;
435  }
436 
437  // }}}
438  // {{{ disconnect_all()
439 
443  public function disconnect_all() {
444  foreach ( $this->_cache_sock as $sock ) {
445  fclose( $sock );
446  }
447 
448  $this->_cache_sock = array();
449  }
450 
451  // }}}
452  // {{{ enable_compress()
453 
459  public function enable_compress( $enable ) {
460  $this->_compress_enable = $enable;
461  }
462 
463  // }}}
464  // {{{ forget_dead_hosts()
465 
469  public function forget_dead_hosts() {
470  $this->_host_dead = array();
471  }
472 
473  // }}}
474  // {{{ get()
475 
484  public function get( $key, &$casToken = null ) {
485  $getToken = ( func_num_args() >= 2 );
486 
487  $this->_last_cmd_status = self::ERR_NONE;
488 
489  if ( $this->_debug ) {
490  $this->_debugprint( "get($key)" );
491  }
492 
493  if ( !is_array( $key ) && strval( $key ) === '' ) {
494  $this->_last_cmd_status = self::ERR_UNEXPECTED;
495  $this->_debugprint( "Skipping key which equals to an empty string" );
496  return false;
497  }
498 
499  if ( !$this->_active ) {
500  $this->_last_cmd_status = self::ERR_UNEXPECTED;
501 
502  return false;
503  }
504 
505  $sock = $this->get_sock( $key );
506 
507  if ( !$sock ) {
508  $this->_last_cmd_status = self::ERR_UNREACHABLE;
509 
510  return false;
511  }
512 
513  $key = is_array( $key ) ? $key[1] : $key;
514  if ( isset( $this->stats['get'] ) ) {
515  $this->stats['get']++;
516  } else {
517  $this->stats['get'] = 1;
518  }
519 
520  $cmd = $getToken ? "gets" : "get";
521  $cmd .= " $key\r\n";
522  if ( !$this->_fwrite( $sock, $cmd ) ) {
523  $this->_last_cmd_status = self::ERR_NO_RESPONSE;
524 
525  return false;
526  }
527 
528  $val = array();
529  if ( !$this->_load_items( $sock, $val, $casToken ) ) {
530  $this->_last_cmd_status = self::ERR_NO_RESPONSE;
531  }
532 
533  if ( $this->_debug ) {
534  foreach ( $val as $k => $v ) {
535  $this->_debugprint(
536  sprintf( "MemCache: sock %s got %s", $this->serialize( $sock ), $k ) );
537  }
538  }
539 
540  $value = false;
541  if ( isset( $val[$key] ) ) {
542  $value = $val[$key];
543  }
544  return $value;
545  }
546 
547  // }}}
548  // {{{ get_multi()
549 
557  public function get_multi( $keys ) {
558  $this->_last_cmd_status = self::ERR_NONE;
559 
560  if ( !$this->_active ) {
561  $this->_last_cmd_status = self::ERR_UNEXPECTED;
562 
563  return array();
564  }
565 
566  if ( isset( $this->stats['get_multi'] ) ) {
567  $this->stats['get_multi']++;
568  } else {
569  $this->stats['get_multi'] = 1;
570  }
571  $sock_keys = array();
572  $socks = array();
573  foreach ( $keys as $key ) {
574  $sock = $this->get_sock( $key );
575  if ( !$sock ) {
576  $this->_last_cmd_status = self::ERR_UNREACHABLE;
577  continue;
578  }
579  $key = is_array( $key ) ? $key[1] : $key;
580  $sockValue = intval( $sock );
581 
582  if ( !isset( $sock_keys[$sockValue] ) ) {
583  $sock_keys[$sockValue] = array();
584  $socks[] = $sock;
585  }
586  $sock_keys[$sockValue][] = $key;
587  }
588 
589  $gather = array();
590  // Send out the requests
591  foreach ( $socks as $sock ) {
592  $cmd = 'get';
593  foreach ( $sock_keys[intval( $sock )] as $key ) {
594  $cmd .= ' ' . $key;
595  }
596  $cmd .= "\r\n";
597 
598  if ( $this->_fwrite( $sock, $cmd ) ) {
599  $gather[] = $sock;
600  } else {
601  $this->_last_cmd_status = self::ERR_NO_RESPONSE;
602  }
603  }
604 
605  // Parse responses
606  $val = array();
607  foreach ( $gather as $sock ) {
608  if ( !$this->_load_items( $sock, $val ) ) {
609  $this->_last_cmd_status = self::ERR_NO_RESPONSE;
610  }
611  }
612 
613  if ( $this->_debug ) {
614  foreach ( $val as $k => $v ) {
615  $this->_debugprint( sprintf( "MemCache: got %s", $k ) );
616  }
617  }
618 
619  return $val;
620  }
621 
622  // }}}
623  // {{{ incr()
624 
635  public function incr( $key, $amt = 1 ) {
636  return $this->_incrdecr( 'incr', $key, $amt );
637  }
638 
639  // }}}
640  // {{{ replace()
641 
655  public function replace( $key, $value, $exp = 0 ) {
656  return $this->_set( 'replace', $key, $value, $exp );
657  }
658 
659  // }}}
660  // {{{ run_command()
661 
671  public function run_command( $sock, $cmd ) {
672  if ( !$sock ) {
673  return array();
674  }
675 
676  if ( !$this->_fwrite( $sock, $cmd ) ) {
677  return array();
678  }
679 
680  $ret = array();
681  while ( true ) {
682  $res = $this->_fgets( $sock );
683  $ret[] = $res;
684  if ( preg_match( '/^END/', $res ) ) {
685  break;
686  }
687  if ( strlen( $res ) == 0 ) {
688  break;
689  }
690  }
691  return $ret;
692  }
693 
694  // }}}
695  // {{{ set()
696 
711  public function set( $key, $value, $exp = 0 ) {
712  return $this->_set( 'set', $key, $value, $exp );
713  }
714 
715  // }}}
716  // {{{ cas()
717 
733  public function cas( $casToken, $key, $value, $exp = 0 ) {
734  return $this->_set( 'cas', $key, $value, $exp, $casToken );
735  }
736 
737  // }}}
738  // {{{ set_compress_threshold()
739 
745  public function set_compress_threshold( $thresh ) {
746  $this->_compress_threshold = $thresh;
747  }
748 
749  // }}}
750  // {{{ set_debug()
751 
758  public function set_debug( $dbg ) {
759  $this->_debug = $dbg;
760  }
761 
762  // }}}
763  // {{{ set_servers()
764 
771  public function set_servers( $list ) {
772  $this->_servers = $list;
773  $this->_active = count( $list );
774  $this->_buckets = null;
775  $this->_bucketcount = 0;
776 
777  $this->_single_sock = null;
778  if ( $this->_active == 1 ) {
779  $this->_single_sock = $this->_servers[0];
780  }
781  }
782 
789  public function set_timeout( $seconds, $microseconds ) {
790  $this->_timeout_seconds = $seconds;
791  $this->_timeout_microseconds = $microseconds;
792  }
793 
794  // }}}
795  // }}}
796  // {{{ private methods
797  // {{{ _close_sock()
798 
806  function _close_sock( $sock ) {
807  $host = array_search( $sock, $this->_cache_sock );
808  fclose( $this->_cache_sock[$host] );
809  unset( $this->_cache_sock[$host] );
810  }
811 
812  // }}}
813  // {{{ _connect_sock()
814 
824  function _connect_sock( &$sock, $host ) {
825  $port = null;
826  $hostAndPort = IPUtils::splitHostAndPort( $host );
827  if ( $hostAndPort ) {
828  $ip = $hostAndPort[0];
829  if ( $hostAndPort[1] ) {
830  $port = $hostAndPort[1];
831  }
832  } else {
833  $ip = $host;
834  }
835  $sock = false;
836  $timeout = $this->_connect_timeout;
837  $errno = $errstr = null;
838  for ( $i = 0; !$sock && $i < $this->_connect_attempts; $i++ ) {
839  AtEase::suppressWarnings();
840  if ( $this->_persistent == 1 ) {
841  $sock = pfsockopen( $ip, $port, $errno, $errstr, $timeout );
842  } else {
843  $sock = fsockopen( $ip, $port, $errno, $errstr, $timeout );
844  }
845  AtEase::restoreWarnings();
846  }
847  if ( !$sock ) {
848  $this->_error_log( "Error connecting to $host: $errstr" );
849  $this->_dead_host( $host );
850  return false;
851  }
852 
853  // Initialise timeout
854  stream_set_timeout( $sock, $this->_timeout_seconds, $this->_timeout_microseconds );
855 
856  // If the connection was persistent, flush the read buffer in case there
857  // was a previous incomplete request on this connection
858  if ( $this->_persistent ) {
859  $this->_flush_read_buffer( $sock );
860  }
861  return true;
862  }
863 
864  // }}}
865  // {{{ _dead_sock()
866 
874  function _dead_sock( $sock ) {
875  $host = array_search( $sock, $this->_cache_sock );
876  $this->_dead_host( $host );
877  }
878 
882  function _dead_host( $host ) {
883  $hostAndPort = IPUtils::splitHostAndPort( $host );
884  if ( $hostAndPort ) {
885  $ip = $hostAndPort[0];
886  } else {
887  $ip = $host;
888  }
889  $this->_host_dead[$ip] = time() + 30 + intval( rand( 0, 10 ) );
890  $this->_host_dead[$host] = $this->_host_dead[$ip];
891  unset( $this->_cache_sock[$host] );
892  }
893 
894  // }}}
895  // {{{ get_sock()
896 
905  function get_sock( $key ) {
906  if ( !$this->_active ) {
907  return false;
908  }
909 
910  if ( $this->_single_sock !== null ) {
911  return $this->sock_to_host( $this->_single_sock );
912  }
913 
914  $hv = is_array( $key ) ? intval( $key[0] ) : $this->_hashfunc( $key );
915  if ( $this->_buckets === null ) {
916  $bu = array();
917  foreach ( $this->_servers as $v ) {
918  if ( is_array( $v ) ) {
919  for ( $i = 0; $i < $v[1]; $i++ ) {
920  $bu[] = $v[0];
921  }
922  } else {
923  $bu[] = $v;
924  }
925  }
926  $this->_buckets = $bu;
927  $this->_bucketcount = count( $bu );
928  }
929 
930  $realkey = is_array( $key ) ? $key[1] : $key;
931  for ( $tries = 0; $tries < 20; $tries++ ) {
932  $host = $this->_buckets[$hv % $this->_bucketcount];
933  $sock = $this->sock_to_host( $host );
934  if ( $sock ) {
935  return $sock;
936  }
937  $hv = $this->_hashfunc( $hv . $realkey );
938  }
939 
940  return false;
941  }
942 
943  // }}}
944  // {{{ _hashfunc()
945 
954  function _hashfunc( $key ) {
955  # Hash function must be in [0,0x7ffffff]
956  # We take the first 31 bits of the MD5 hash, which unlike the hash
957  # function used in a previous version of this client, works
958  return hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
959  }
960 
961  // }}}
962  // {{{ _incrdecr()
963 
974  function _incrdecr( $cmd, $key, $amt = 1 ) {
975  $this->_last_cmd_status = self::ERR_NONE;
976 
977  if ( !$this->_active ) {
978  $this->_last_cmd_status = self::ERR_UNEXPECTED;
979 
980  return null;
981  }
982 
983  $sock = $this->get_sock( $key );
984  if ( !$sock ) {
985  $this->_last_cmd_status = self::ERR_UNREACHABLE;
986 
987  return null;
988  }
989 
990  $key = is_array( $key ) ? $key[1] : $key;
991  if ( isset( $this->stats[$cmd] ) ) {
992  $this->stats[$cmd]++;
993  } else {
994  $this->stats[$cmd] = 1;
995  }
996  if ( !$this->_fwrite( $sock, "$cmd $key $amt\r\n" ) ) {
997  $this->_last_cmd_status = self::ERR_NO_RESPONSE;
998 
999  return null;
1000  }
1001 
1002  $line = $this->_fgets( $sock );
1003  if ( $this->_debug ) {
1004  $this->_debugprint( "$cmd($key): $line" );
1005  }
1006 
1007  $match = array();
1008  if ( !preg_match( '/^(\d+)/', $line, $match ) ) {
1009  $this->_last_cmd_status = self::ERR_NO_RESPONSE;
1010 
1011  return null;
1012  }
1013 
1014  return (int)$match[1];
1015  }
1016 
1017  // }}}
1018  // {{{ _load_items()
1019 
1030  function _load_items( $sock, &$ret, &$casToken = null ) {
1031  $results = array();
1032 
1033  while ( 1 ) {
1034  $decl = $this->_fgets( $sock );
1035 
1036  if ( $decl === false ) {
1037  /*
1038  * If nothing can be read, something is wrong because we know exactly when
1039  * to stop reading (right after "END") and we return right after that.
1040  */
1041  return false;
1042  } elseif ( preg_match( '/^VALUE (\S+) (\d+) (\d+)(?: (\d+))?$/', $decl, $match ) ) {
1043  /*
1044  * Read all data returned. This can be either one or multiple values.
1045  * Save all that data (in an array) to be processed later: we'll first
1046  * want to continue reading until "END" before doing anything else,
1047  * to make sure that we don't leave our client in a state where it's
1048  * output is not yet fully read.
1049  */
1050  $results[] = array(
1051  $match[1], // rkey
1052  $match[2], // flags
1053  $match[3], // len
1054  $match[4] ?? null, // casToken (appears with "gets" but not "get")
1055  $this->_fread( $sock, $match[3] + 2 ), // data
1056  );
1057  } elseif ( $decl == "END" ) {
1062  foreach ( $results as [ $rkey, $flags, /* length */, $casToken, $data ] ) {
1063  if ( $data === false || substr( $data, -2 ) !== "\r\n" ) {
1064  $this->_handle_error( $sock,
1065  'line ending missing from data block from $1' );
1066  return false;
1067  }
1068  $data = substr( $data, 0, -2 );
1069  $ret[$rkey] = $data;
1070 
1071  if ( $this->_have_zlib && $flags & self::COMPRESSED ) {
1072  $ret[$rkey] = gzuncompress( $ret[$rkey] );
1073  }
1074 
1075  /*
1076  * This unserialize is the exact reason that we only want to
1077  * process data after having read until "END" (instead of doing
1078  * this right away): "unserialize" can trigger outside code:
1079  * in the event that $ret[$rkey] is a serialized object,
1080  * unserializing it will trigger __wakeup() if present. If that
1081  * function attempted to read from memcached (while we did not
1082  * yet read "END"), these 2 calls would collide.
1083  */
1084  if ( $flags & self::SERIALIZED ) {
1085  $ret[$rkey] = $this->unserialize( $ret[$rkey] );
1086  } elseif ( $flags & self::INTVAL ) {
1087  $ret[$rkey] = intval( $ret[$rkey] );
1088  }
1089  }
1090 
1091  return true;
1092  } else {
1093  $this->_handle_error( $sock, 'Error parsing response from $1' );
1094  return false;
1095  }
1096  }
1097  }
1098 
1099  // }}}
1100  // {{{ _set()
1101 
1118  function _set( $cmd, $key, $val, $exp, $casToken = null ) {
1119  $this->_last_cmd_status = self::ERR_NONE;
1120 
1121  if ( !$this->_active ) {
1122  $this->_last_cmd_status = self::ERR_UNEXPECTED;
1123 
1124  return false;
1125  }
1126 
1127  $sock = $this->get_sock( $key );
1128  if ( !$sock ) {
1129  $this->_last_cmd_status = self::ERR_UNREACHABLE;
1130 
1131  return false;
1132  }
1133 
1134  if ( isset( $this->stats[$cmd] ) ) {
1135  $this->stats[$cmd]++;
1136  } else {
1137  $this->stats[$cmd] = 1;
1138  }
1139 
1140  $flags = 0;
1141 
1142  if ( is_int( $val ) ) {
1143  $flags |= self::INTVAL;
1144  } elseif ( !is_scalar( $val ) ) {
1145  $val = $this->serialize( $val );
1146  $flags |= self::SERIALIZED;
1147  if ( $this->_debug ) {
1148  $this->_debugprint( "client: serializing data as it is not scalar" );
1149  }
1150  }
1151 
1152  $len = strlen( $val );
1153 
1154  if ( $this->_have_zlib && $this->_compress_enable
1155  && $this->_compress_threshold && $len >= $this->_compress_threshold
1156  ) {
1157  $c_val = gzcompress( $val, 9 );
1158  $c_len = strlen( $c_val );
1159 
1160  if ( $c_len < $len * ( 1 - self::COMPRESSION_SAVINGS ) ) {
1161  if ( $this->_debug ) {
1162  $this->_debugprint( sprintf( "client: compressing data; was %d bytes is now %d bytes", $len, $c_len ) );
1163  }
1164  $val = $c_val;
1165  $len = $c_len;
1166  $flags |= self::COMPRESSED;
1167  }
1168  }
1169 
1170  $command = "$cmd $key $flags $exp $len";
1171  if ( $casToken ) {
1172  $command .= " $casToken";
1173  }
1174 
1175  if ( !$this->_fwrite( $sock, "$command\r\n$val\r\n" ) ) {
1176  $this->_last_cmd_status = self::ERR_NO_RESPONSE;
1177 
1178  return false;
1179  }
1180 
1181  $line = $this->_fgets( $sock );
1182  if ( $this->_debug ) {
1183  $this->_debugprint( sprintf( "%s %s (%s)", $cmd, $key, $line ) );
1184  }
1185 
1186  if ( $line === "STORED" ) {
1187  return true;
1188  } elseif ( $line === "NOT_STORED" && $cmd === "set" ) {
1189  // "Not stored" is always used as the mcrouter response with AllAsyncRoute
1190  return true;
1191  }
1192 
1193  if ( $line === false ) {
1194  $this->_last_cmd_status = self::ERR_NO_RESPONSE;
1195  }
1196 
1197  return false;
1198  }
1199 
1200  // }}}
1201  // {{{ sock_to_host()
1202 
1211  function sock_to_host( $host ) {
1212  if ( isset( $this->_cache_sock[$host] ) ) {
1213  return $this->_cache_sock[$host];
1214  }
1215 
1216  $sock = null;
1217  $now = time();
1218  $hostAndPort = IPUtils::splitHostAndPort( $host );
1219  if ( $hostAndPort ) {
1220  $ip = $hostAndPort[0];
1221  } else {
1222  $ip = $host;
1223  }
1224  if ( isset( $this->_host_dead[$host] ) && $this->_host_dead[$host] > $now ||
1225  isset( $this->_host_dead[$ip] ) && $this->_host_dead[$ip] > $now
1226  ) {
1227  return null;
1228  }
1229 
1230  if ( !$this->_connect_sock( $sock, $host ) ) {
1231  return null;
1232  }
1233 
1234  // Do not buffer writes
1235  stream_set_write_buffer( $sock, 0 );
1236 
1237  $this->_cache_sock[$host] = $sock;
1238 
1239  return $this->_cache_sock[$host];
1240  }
1241 
1245  function _debugprint( $text ) {
1246  $this->_logger->debug( $text );
1247  }
1248 
1252  function _error_log( $text ) {
1253  $this->_logger->error( "Memcached error: $text" );
1254  }
1255 
1263  function _fwrite( $sock, $buf ) {
1264  $bytesWritten = 0;
1265  $bufSize = strlen( $buf );
1266  while ( $bytesWritten < $bufSize ) {
1267  $result = fwrite( $sock, $buf );
1268  $data = stream_get_meta_data( $sock );
1269  if ( $data['timed_out'] ) {
1270  $this->_handle_error( $sock, 'timeout writing to $1' );
1271  return false;
1272  }
1273  // Contrary to the documentation, fwrite() returns zero on error in PHP 5.3.
1274  if ( $result === false || $result === 0 ) {
1275  $this->_handle_error( $sock, 'error writing to $1' );
1276  return false;
1277  }
1278  $bytesWritten += $result;
1279  }
1280 
1281  return true;
1282  }
1283 
1290  function _handle_error( $sock, $msg ) {
1291  $peer = stream_socket_get_name( $sock, true );
1292  if ( strval( $peer ) === '' ) {
1293  $peer = array_search( $sock, $this->_cache_sock );
1294  if ( $peer === false ) {
1295  $peer = '[unknown host]';
1296  }
1297  }
1298  $msg = str_replace( '$1', $peer, $msg );
1299  $this->_error_log( "$msg" );
1300  $this->_dead_sock( $sock );
1301  }
1302 
1311  function _fread( $sock, $len ) {
1312  $buf = '';
1313  while ( $len > 0 ) {
1314  $result = fread( $sock, $len );
1315  $data = stream_get_meta_data( $sock );
1316  if ( $data['timed_out'] ) {
1317  $this->_handle_error( $sock, 'timeout reading from $1' );
1318  return false;
1319  }
1320  if ( $result === false ) {
1321  $this->_handle_error( $sock, 'error reading buffer from $1' );
1322  return false;
1323  }
1324  if ( $result === '' ) {
1325  // This will happen if the remote end of the socket is shut down
1326  $this->_handle_error( $sock, 'unexpected end of file reading from $1' );
1327  return false;
1328  }
1329  $len -= strlen( $result );
1330  $buf .= $result;
1331  }
1332  return $buf;
1333  }
1334 
1342  function _fgets( $sock ) {
1343  $result = fgets( $sock );
1344  // fgets() may return a partial line if there is a select timeout after
1345  // a successful recv(), so we have to check for a timeout even if we
1346  // got a string response.
1347  $data = stream_get_meta_data( $sock );
1348  if ( $data['timed_out'] ) {
1349  $this->_handle_error( $sock, 'timeout reading line from $1' );
1350  return false;
1351  }
1352  if ( $result === false ) {
1353  $this->_handle_error( $sock, 'error reading line from $1' );
1354  return false;
1355  }
1356  if ( substr( $result, -2 ) === "\r\n" ) {
1357  $result = substr( $result, 0, -2 );
1358  } elseif ( substr( $result, -1 ) === "\n" ) {
1359  $result = substr( $result, 0, -1 );
1360  } else {
1361  $this->_handle_error( $sock, 'line ending missing in response from $1' );
1362  return false;
1363  }
1364  return $result;
1365  }
1366 
1371  function _flush_read_buffer( $f ) {
1372  if ( !$f ) {
1373  return;
1374  }
1375  $r = array( $f );
1376  $w = null;
1377  $e = null;
1378  $n = stream_select( $r, $w, $e, 0, 0 );
1379  while ( $n == 1 && !feof( $f ) ) {
1380  fread( $f, 1024 );
1381  $r = array( $f );
1382  $w = null;
1383  $e = null;
1384  $n = stream_select( $r, $w, $e, 0, 0 );
1385  }
1386  }
1387 
1388  // }}}
1389  // }}}
1390  // }}}
1391 }
1392 
1393 // }}}
memcached client class implemented using (p)fsockopen()
set_debug( $dbg)
Set the debug flag.
array $_cache_sock
Cached Sockets that are connected.
const SERIALIZED
Flag: indicates data is serialized.
const COMPRESSION_SAVINGS
Minimum savings to store data compressed.
_hashfunc( $key)
Creates a hash integer based on the $key.
set_compress_threshold( $thresh)
Set the compression threshold.
_fread( $sock, $len)
Read the specified number of bytes from a stream.
array $_servers
Array containing ip:port or array(ip:port, weight)
bool $_compress_enable
Do we want to use compression?
decr( $key, $amt=1)
Decrease a value stored on the memcache server.
const INTVAL
Flag: indicates data is an integer.
int $_compress_threshold
At how many bytes should we compress?
int $_timeout_seconds
Stream timeout in seconds.
disconnect_all()
Disconnects all connected sockets.
cas( $casToken, $key, $value, $exp=0)
Sets a key to a given value in the memcache if the current value still corresponds to a known,...
array $_buckets
Our bit buckets.
set_timeout( $seconds, $microseconds)
Sets the timeout for new connections.
_fgets( $sock)
Read a line from a stream.
incr( $key, $amt=1)
Increments $key (optionally) by $amt.
forget_dead_hosts()
Forget about all of the dead hosts.
_incrdecr( $cmd, $key, $amt=1)
Perform increment/decrement on $key.
$_connect_attempts
Number of connection attempts for each server.
add( $key, $val, $exp=0)
Adds a key/value to the memcache server if one isn't already set with that key.
_handle_error( $sock, $msg)
Handle an I/O error.
sock_to_host( $host)
Returns the socket for the host.
int $_last_cmd_status
StorageAwareness:ERR_* constant of the last cache command.
$_connect_timeout
Connect timeout in seconds.
touch( $key, $time=0)
Changes the TTL on a key from the server to $time.
bool $_have_zlib
Is compression available?
int $_bucketcount
Total # of bit buckets we have.
_fwrite( $sock, $buf)
Write to a stream.
bool $_debug
Current debug status; 0 - none to 9 - profiling.
replace( $key, $value, $exp=0)
Overwrites an existing value for key; only works if key is already set.
set_servers( $list)
Set the server list to distribute key gets and puts between.
_load_items( $sock, &$ret, &$casToken=null)
Load items into $ret from $sock.
string $_single_sock
If only using one server; contains ip:port to connect to.
get_multi( $keys)
Get multiple keys from the server(s)
_close_sock( $sock)
Close the specified socket.
run_command( $sock, $cmd)
Passes through $cmd to the memcache server connected by $sock; returns output as an array (null array...
_set( $cmd, $key, $val, $exp, $casToken=null)
Performs the requested storage operation to the memcache server.
int $_timeout_microseconds
Stream timeout in microseconds.
__construct( $args)
Memcache initializer.
_flush_read_buffer( $f)
Flush the read buffer of a stream.
enable_compress( $enable)
Enable / Disable compression.
array $_host_dead
Dead hosts, assoc array, 'host'=>'unixtime when ok to check again'.
const COMPRESSED
Flag: indicates data is compressed.
get_sock( $key)
get_sock
_dead_sock( $sock)
Marks a host as dead until 30-40 seconds in the future.
_connect_sock(&$sock, $host)
Connects $sock to $host, timing out after $timeout.
array $stats
Command statistics.
bool $_persistent
Are we using persistent links?
Generic interface providing error code and quality-of-service constants for object stores.
const ERR_UNREACHABLE
Storage medium could not be reached to establish a connection.
const ERR_UNEXPECTED
Storage medium operation failed due to usage limitations or an I/O error.
const ERR_NO_RESPONSE
Storage medium failed to yield a complete response to an operation.