MediaWiki master
MemcachedClient.php
Go to the documentation of this file.
1<?php
2// phpcs:ignoreFile -- It's an external lib and it isn't. Let's not bother.
69use Psr\Log\LoggerInterface;
70use Psr\Log\NullLogger;
71use Wikimedia\AtEase\AtEase;
72use Wikimedia\IPUtils;
74
82 // {{{ properties
83 // {{{ public
84
85 // {{{ constants
86 // {{{ flags
87
91 const SERIALIZED = 1;
92
96 const COMPRESSED = 2;
97
101 const INTVAL = 4;
102
103 // }}}
104
109
110 // }}}
111
118 public $stats;
119
120 // }}}
121 // {{{ private
122
130
137 public $_debug;
138
146
154
162
170
178
186
193 public $_servers;
194
201 public $_buckets;
202
210
217 public $_active;
218
226
234
239
244
247
251 private $_logger;
252
253
254 // }}}
255 // }}}
256 // {{{ methods
257 // {{{ public functions
258 // {{{ memcached()
259
265 public function __construct( $args ) {
266 $this->set_servers( $args['servers'] ?? array() );
267 $this->_debug = $args['debug'] ?? false;
268 $this->stats = array();
269 $this->_compress_threshold = $args['compress_threshold'] ?? 0;
270 $this->_persistent = $args['persistent'] ?? false;
271 $this->_compress_enable = true;
272 $this->_have_zlib = function_exists( 'gzcompress' );
273
274 $this->_cache_sock = array();
275 $this->_host_dead = array();
276
277 $this->_timeout_seconds = 0;
278 $this->_timeout_microseconds = $args['timeout'] ?? 500_000;
279
280 $this->_connect_timeout = $args['connect_timeout'] ?? 0.1;
281 $this->_connect_attempts = 2;
282
283 $this->_logger = $args['logger'] ?? new NullLogger();
284 }
285
286 // }}}
287
292 public function serialize( $value ) {
293 return serialize( $value );
294 }
295
300 public function unserialize( $value ) {
301 return unserialize( $value );
302 }
303
304 // {{{ add()
305
320 public function add( $key, $val, $exp = 0 ) {
321 $this->_last_cmd_status = self::ERR_NONE;
322
323 return $this->_set( 'add', $key, $val, $exp );
324 }
325
326 // }}}
327 // {{{ decr()
328
337 public function decr( $key, $amt = 1 ) {
338 $this->_last_cmd_status = self::ERR_NONE;
339
340 return $this->_incrdecr( 'decr', $key, $amt );
341 }
342
343 // }}}
344 // {{{ delete()
345
354 public function delete( $key, $time = 0 ) {
355 $this->_last_cmd_status = self::ERR_NONE;
356
357 if ( !$this->_active ) {
358 return false;
359 }
360
361 $sock = $this->get_sock( $key );
362 if ( !$sock ) {
363 return false;
364 }
365
366 $key = is_array( $key ) ? $key[1] : $key;
367
368 if ( isset( $this->stats['delete'] ) ) {
369 $this->stats['delete']++;
370 } else {
371 $this->stats['delete'] = 1;
372 }
373 $cmd = "delete $key $time\r\n";
374 if ( !$this->_fwrite( $sock, $cmd ) ) {
375 return false;
376 }
377 $res = $this->_fgets( $sock );
378
379 if ( $this->_debug ) {
380 $this->_debugprint( sprintf( "MemCache: delete %s (%s)", $key, $res ) );
381 }
382
383 if ( $res == "DELETED" || $res == "NOT_FOUND" ) {
384 return true;
385 }
386
387 $this->_last_cmd_status = self::ERR_UNEXPECTED;
388
389 return false;
390 }
391
400 public function touch( $key, $time = 0 ) {
401 $this->_last_cmd_status = self::ERR_NONE;
402
403 if ( !$this->_active ) {
404 return false;
405 }
406
407 $sock = $this->get_sock( $key );
408 if ( !$sock ) {
409 return false;
410 }
411
412 $key = is_array( $key ) ? $key[1] : $key;
413
414 if ( isset( $this->stats['touch'] ) ) {
415 $this->stats['touch']++;
416 } else {
417 $this->stats['touch'] = 1;
418 }
419 $cmd = "touch $key $time\r\n";
420 if ( !$this->_fwrite( $sock, $cmd ) ) {
421 return false;
422 }
423 $res = $this->_fgets( $sock );
424
425 if ( $this->_debug ) {
426 $this->_debugprint( sprintf( "MemCache: touch %s (%s)", $key, $res ) );
427 }
428
429 if ( $res == "TOUCHED" ) {
430 return true;
431 }
432
433 return false;
434 }
435
436 // }}}
437 // {{{ disconnect_all()
438
442 public function disconnect_all() {
443 foreach ( $this->_cache_sock as $sock ) {
444 fclose( $sock );
445 }
446
447 $this->_cache_sock = array();
448 }
449
450 // }}}
451 // {{{ enable_compress()
452
458 public function enable_compress( $enable ) {
459 $this->_compress_enable = $enable;
460 }
461
462 // }}}
463 // {{{ forget_dead_hosts()
464
468 public function forget_dead_hosts() {
469 $this->_host_dead = array();
470 }
471
472 // }}}
473 // {{{ get()
474
483 public function get( $key, &$casToken = null ) {
484 $getToken = ( func_num_args() >= 2 );
485
486 $this->_last_cmd_status = self::ERR_NONE;
487
488 if ( $this->_debug ) {
489 $this->_debugprint( "get($key)" );
490 }
491
492 if ( !is_array( $key ) && strval( $key ) === '' ) {
493 $this->_last_cmd_status = self::ERR_UNEXPECTED;
494 $this->_debugprint( "Skipping key which equals to an empty string" );
495 return false;
496 }
497
498 if ( !$this->_active ) {
499 $this->_last_cmd_status = self::ERR_UNEXPECTED;
500
501 return false;
502 }
503
504 $sock = $this->get_sock( $key );
505
506 if ( !$sock ) {
507 $this->_last_cmd_status = self::ERR_UNREACHABLE;
508
509 return false;
510 }
511
512 $key = is_array( $key ) ? $key[1] : $key;
513 if ( isset( $this->stats['get'] ) ) {
514 $this->stats['get']++;
515 } else {
516 $this->stats['get'] = 1;
517 }
518
519 $cmd = $getToken ? "gets" : "get";
520 $cmd .= " $key\r\n";
521 if ( !$this->_fwrite( $sock, $cmd ) ) {
522 $this->_last_cmd_status = self::ERR_NO_RESPONSE;
523
524 return false;
525 }
526
527 $val = array();
528 if ( !$this->_load_items( $sock, $val, $casToken ) ) {
529 $this->_last_cmd_status = self::ERR_NO_RESPONSE;
530 }
531
532 if ( $this->_debug ) {
533 foreach ( $val as $k => $v ) {
534 $this->_debugprint(
535 sprintf( "MemCache: sock %s got %s", $this->serialize( $sock ), $k ) );
536 }
537 }
538
539 $value = false;
540 if ( isset( $val[$key] ) ) {
541 $value = $val[$key];
542 }
543 return $value;
544 }
545
546 // }}}
547 // {{{ get_multi()
548
556 public function get_multi( $keys ) {
557 $this->_last_cmd_status = self::ERR_NONE;
558
559 if ( !$this->_active ) {
560 $this->_last_cmd_status = self::ERR_UNEXPECTED;
561
562 return array();
563 }
564
565 if ( isset( $this->stats['get_multi'] ) ) {
566 $this->stats['get_multi']++;
567 } else {
568 $this->stats['get_multi'] = 1;
569 }
570 $sock_keys = array();
571 $socks = array();
572 foreach ( $keys as $key ) {
573 $sock = $this->get_sock( $key );
574 if ( !$sock ) {
575 $this->_last_cmd_status = self::ERR_UNREACHABLE;
576 continue;
577 }
578 $key = is_array( $key ) ? $key[1] : $key;
579 $sockValue = intval( $sock );
580
581 if ( !isset( $sock_keys[$sockValue] ) ) {
582 $sock_keys[$sockValue] = array();
583 $socks[] = $sock;
584 }
585 $sock_keys[$sockValue][] = $key;
586 }
587
588 $gather = array();
589 // Send out the requests
590 foreach ( $socks as $sock ) {
591 $cmd = 'get';
592 foreach ( $sock_keys[intval( $sock )] as $key ) {
593 $cmd .= ' ' . $key;
594 }
595 $cmd .= "\r\n";
596
597 if ( $this->_fwrite( $sock, $cmd ) ) {
598 $gather[] = $sock;
599 } else {
600 $this->_last_cmd_status = self::ERR_NO_RESPONSE;
601 }
602 }
603
604 // Parse responses
605 $val = array();
606 foreach ( $gather as $sock ) {
607 if ( !$this->_load_items( $sock, $val ) ) {
608 $this->_last_cmd_status = self::ERR_NO_RESPONSE;
609 }
610 }
611
612 if ( $this->_debug ) {
613 foreach ( $val as $k => $v ) {
614 $this->_debugprint( sprintf( "MemCache: got %s", $k ) );
615 }
616 }
617
618 return $val;
619 }
620
621 // }}}
622 // {{{ incr()
623
634 public function incr( $key, $amt = 1 ) {
635 return $this->_incrdecr( 'incr', $key, $amt );
636 }
637
638 // }}}
639 // {{{ replace()
640
654 public function replace( $key, $value, $exp = 0 ) {
655 return $this->_set( 'replace', $key, $value, $exp );
656 }
657
658 // }}}
659 // {{{ run_command()
660
670 public function run_command( $sock, $cmd ) {
671 if ( !$sock ) {
672 return array();
673 }
674
675 if ( !$this->_fwrite( $sock, $cmd ) ) {
676 return array();
677 }
678
679 $ret = array();
680 while ( true ) {
681 $res = $this->_fgets( $sock );
682 $ret[] = $res;
683 if ( preg_match( '/^END/', $res ) ) {
684 break;
685 }
686 if ( strlen( $res ) == 0 ) {
687 break;
688 }
689 }
690 return $ret;
691 }
692
693 // }}}
694 // {{{ set()
695
710 public function set( $key, $value, $exp = 0 ) {
711 return $this->_set( 'set', $key, $value, $exp );
712 }
713
714 // }}}
715 // {{{ cas()
716
732 public function cas( $casToken, $key, $value, $exp = 0 ) {
733 return $this->_set( 'cas', $key, $value, $exp, $casToken );
734 }
735
736 // }}}
737 // {{{ set_compress_threshold()
738
744 public function set_compress_threshold( $thresh ) {
745 $this->_compress_threshold = $thresh;
746 }
747
748 // }}}
749 // {{{ set_debug()
750
757 public function set_debug( $dbg ) {
758 $this->_debug = $dbg;
759 }
760
761 // }}}
762 // {{{ set_servers()
763
770 public function set_servers( $list ) {
771 $this->_servers = $list;
772 $this->_active = count( $list );
773 $this->_buckets = null;
774 $this->_bucketcount = 0;
775
776 $this->_single_sock = null;
777 if ( $this->_active == 1 ) {
778 $this->_single_sock = $this->_servers[0];
779 }
780 }
781
788 public function set_timeout( $seconds, $microseconds ) {
789 $this->_timeout_seconds = $seconds;
790 $this->_timeout_microseconds = $microseconds;
791 }
792
793 // }}}
794 // }}}
795 // {{{ private methods
796 // {{{ _close_sock()
797
805 function _close_sock( $sock ) {
806 $host = array_search( $sock, $this->_cache_sock );
807 fclose( $this->_cache_sock[$host] );
808 unset( $this->_cache_sock[$host] );
809 }
810
811 // }}}
812 // {{{ _connect_sock()
813
823 function _connect_sock( &$sock, $host ) {
824 $port = null;
825 $hostAndPort = IPUtils::splitHostAndPort( $host );
826 if ( $hostAndPort ) {
827 $ip = $hostAndPort[0];
828 if ( $hostAndPort[1] ) {
829 $port = $hostAndPort[1];
830 }
831 } else {
832 $ip = $host;
833 }
834 $sock = false;
835 $timeout = $this->_connect_timeout;
836 $errno = $errstr = null;
837 for ( $i = 0; !$sock && $i < $this->_connect_attempts; $i++ ) {
838 AtEase::suppressWarnings();
839 if ( $this->_persistent == 1 ) {
840 $sock = pfsockopen( $ip, $port, $errno, $errstr, $timeout );
841 } else {
842 $sock = fsockopen( $ip, $port, $errno, $errstr, $timeout );
843 }
844 AtEase::restoreWarnings();
845 }
846 if ( !$sock ) {
847 $this->_error_log( "Error connecting to $host: $errstr" );
848 $this->_dead_host( $host );
849 return false;
850 }
851
852 // Initialise timeout
853 stream_set_timeout( $sock, $this->_timeout_seconds, $this->_timeout_microseconds );
854
855 // If the connection was persistent, flush the read buffer in case there
856 // was a previous incomplete request on this connection
857 if ( $this->_persistent ) {
858 $this->_flush_read_buffer( $sock );
859 }
860 return true;
861 }
862
863 // }}}
864 // {{{ _dead_sock()
865
873 function _dead_sock( $sock ) {
874 $host = array_search( $sock, $this->_cache_sock );
875 $this->_dead_host( $host );
876 }
877
881 function _dead_host( $host ) {
882 $hostAndPort = IPUtils::splitHostAndPort( $host );
883 if ( $hostAndPort ) {
884 $ip = $hostAndPort[0];
885 } else {
886 $ip = $host;
887 }
888 $this->_host_dead[$ip] = time() + 30 + intval( rand( 0, 10 ) );
889 $this->_host_dead[$host] = $this->_host_dead[$ip];
890 unset( $this->_cache_sock[$host] );
891 }
892
893 // }}}
894 // {{{ get_sock()
895
904 function get_sock( $key ) {
905 if ( !$this->_active ) {
906 return false;
907 }
908
909 if ( $this->_single_sock !== null ) {
910 return $this->sock_to_host( $this->_single_sock );
911 }
912
913 $hv = is_array( $key ) ? intval( $key[0] ) : $this->_hashfunc( $key );
914 if ( $this->_buckets === null ) {
915 $bu = array();
916 foreach ( $this->_servers as $v ) {
917 if ( is_array( $v ) ) {
918 for ( $i = 0; $i < $v[1]; $i++ ) {
919 $bu[] = $v[0];
920 }
921 } else {
922 $bu[] = $v;
923 }
924 }
925 $this->_buckets = $bu;
926 $this->_bucketcount = count( $bu );
927 }
928
929 $realkey = is_array( $key ) ? $key[1] : $key;
930 for ( $tries = 0; $tries < 20; $tries++ ) {
931 $host = $this->_buckets[$hv % $this->_bucketcount];
932 $sock = $this->sock_to_host( $host );
933 if ( $sock ) {
934 return $sock;
935 }
936 $hv = $this->_hashfunc( $hv . $realkey );
937 }
938
939 return false;
940 }
941
942 // }}}
943 // {{{ _hashfunc()
944
953 function _hashfunc( $key ) {
954 # Hash function must be in [0,0x7ffffff]
955 # We take the first 31 bits of the MD5 hash, which unlike the hash
956 # function used in a previous version of this client, works
957 return hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
958 }
959
960 // }}}
961 // {{{ _incrdecr()
962
973 function _incrdecr( $cmd, $key, $amt = 1 ) {
974 $this->_last_cmd_status = self::ERR_NONE;
975
976 if ( !$this->_active ) {
977 $this->_last_cmd_status = self::ERR_UNEXPECTED;
978
979 return null;
980 }
981
982 $sock = $this->get_sock( $key );
983 if ( !$sock ) {
984 $this->_last_cmd_status = self::ERR_UNREACHABLE;
985
986 return null;
987 }
988
989 $key = is_array( $key ) ? $key[1] : $key;
990 if ( isset( $this->stats[$cmd] ) ) {
991 $this->stats[$cmd]++;
992 } else {
993 $this->stats[$cmd] = 1;
994 }
995 if ( !$this->_fwrite( $sock, "$cmd $key $amt\r\n" ) ) {
996 $this->_last_cmd_status = self::ERR_NO_RESPONSE;
997
998 return null;
999 }
1000
1001 $line = $this->_fgets( $sock );
1002 if ( $this->_debug ) {
1003 $this->_debugprint( "$cmd($key): $line" );
1004 }
1005
1006 $match = array();
1007 if ( !preg_match( '/^(\d+)/', $line, $match ) ) {
1008 $this->_last_cmd_status = self::ERR_NO_RESPONSE;
1009
1010 return null;
1011 }
1012
1013 return (int)$match[1];
1014 }
1015
1016 // }}}
1017 // {{{ _load_items()
1018
1029 function _load_items( $sock, &$ret, &$casToken = null ) {
1030 $results = array();
1031
1032 while ( 1 ) {
1033 $decl = $this->_fgets( $sock );
1034
1035 if ( $decl === false ) {
1036 /*
1037 * If nothing can be read, something is wrong because we know exactly when
1038 * to stop reading (right after "END") and we return right after that.
1039 */
1040 return false;
1041 } elseif ( preg_match( '/^VALUE (\S+) (\d+) (\d+)(?: (\d+))?$/', $decl, $match ) ) {
1042 /*
1043 * Read all data returned. This can be either one or multiple values.
1044 * Save all that data (in an array) to be processed later: we'll first
1045 * want to continue reading until "END" before doing anything else,
1046 * to make sure that we don't leave our client in a state where it's
1047 * output is not yet fully read.
1048 */
1049 $results[] = array(
1050 $match[1], // rkey
1051 $match[2], // flags
1052 $match[3], // len
1053 $match[4] ?? null, // casToken (appears with "gets" but not "get")
1054 $this->_fread( $sock, $match[3] + 2 ), // data
1055 );
1056 } elseif ( $decl == "END" ) {
1061 foreach ( $results as [ $rkey, $flags, /* length */, $casToken, $data ] ) {
1062 if ( $data === false || substr( $data, -2 ) !== "\r\n" ) {
1063 $this->_handle_error( $sock,
1064 'line ending missing from data block from $1' );
1065 return false;
1066 }
1067 $data = substr( $data, 0, -2 );
1068 $ret[$rkey] = $data;
1069
1070 if ( $this->_have_zlib && $flags & self::COMPRESSED ) {
1071 $ret[$rkey] = gzuncompress( $ret[$rkey] );
1072 }
1073
1074 /*
1075 * This unserialize is the exact reason that we only want to
1076 * process data after having read until "END" (instead of doing
1077 * this right away): "unserialize" can trigger outside code:
1078 * in the event that $ret[$rkey] is a serialized object,
1079 * unserializing it will trigger __wakeup() if present. If that
1080 * function attempted to read from memcached (while we did not
1081 * yet read "END"), these 2 calls would collide.
1082 */
1083 if ( $flags & self::SERIALIZED ) {
1084 $ret[$rkey] = $this->unserialize( $ret[$rkey] );
1085 } elseif ( $flags & self::INTVAL ) {
1086 $ret[$rkey] = intval( $ret[$rkey] );
1087 }
1088 }
1089
1090 return true;
1091 } else {
1092 $this->_handle_error( $sock, 'Error parsing response from $1' );
1093 return false;
1094 }
1095 }
1096 }
1097
1098 // }}}
1099 // {{{ _set()
1100
1117 function _set( $cmd, $key, $val, $exp, $casToken = null ) {
1118 $this->_last_cmd_status = self::ERR_NONE;
1119
1120 if ( !$this->_active ) {
1121 $this->_last_cmd_status = self::ERR_UNEXPECTED;
1122
1123 return false;
1124 }
1125
1126 $sock = $this->get_sock( $key );
1127 if ( !$sock ) {
1128 $this->_last_cmd_status = self::ERR_UNREACHABLE;
1129
1130 return false;
1131 }
1132
1133 if ( isset( $this->stats[$cmd] ) ) {
1134 $this->stats[$cmd]++;
1135 } else {
1136 $this->stats[$cmd] = 1;
1137 }
1138
1139 $flags = 0;
1140
1141 if ( is_int( $val ) ) {
1142 $flags |= self::INTVAL;
1143 } elseif ( !is_scalar( $val ) ) {
1144 $val = $this->serialize( $val );
1145 $flags |= self::SERIALIZED;
1146 if ( $this->_debug ) {
1147 $this->_debugprint( "client: serializing data as it is not scalar" );
1148 }
1149 }
1150
1151 $len = strlen( $val );
1152
1153 if ( $this->_have_zlib && $this->_compress_enable
1154 && $this->_compress_threshold && $len >= $this->_compress_threshold
1155 ) {
1156 $c_val = gzcompress( $val, 9 );
1157 $c_len = strlen( $c_val );
1158
1159 if ( $c_len < $len * ( 1 - self::COMPRESSION_SAVINGS ) ) {
1160 if ( $this->_debug ) {
1161 $this->_debugprint( sprintf( "client: compressing data; was %d bytes is now %d bytes", $len, $c_len ) );
1162 }
1163 $val = $c_val;
1164 $len = $c_len;
1165 $flags |= self::COMPRESSED;
1166 }
1167 }
1168
1169 $command = "$cmd $key $flags $exp $len";
1170 if ( $casToken ) {
1171 $command .= " $casToken";
1172 }
1173
1174 if ( !$this->_fwrite( $sock, "$command\r\n$val\r\n" ) ) {
1175 $this->_last_cmd_status = self::ERR_NO_RESPONSE;
1176
1177 return false;
1178 }
1179
1180 $line = $this->_fgets( $sock );
1181 if ( $this->_debug ) {
1182 $this->_debugprint( sprintf( "%s %s (%s)", $cmd, $key, $line ) );
1183 }
1184
1185 if ( $line === "STORED" ) {
1186 return true;
1187 } elseif ( $line === "NOT_STORED" && $cmd === "set" ) {
1188 // "Not stored" is always used as the mcrouter response with AllAsyncRoute
1189 return true;
1190 }
1191
1192 if ( $line === false ) {
1193 $this->_last_cmd_status = self::ERR_NO_RESPONSE;
1194 }
1195
1196 return false;
1197 }
1198
1199 // }}}
1200 // {{{ sock_to_host()
1201
1210 function sock_to_host( $host ) {
1211 if ( isset( $this->_cache_sock[$host] ) ) {
1212 return $this->_cache_sock[$host];
1213 }
1214
1215 $sock = null;
1216 $now = time();
1217 $hostAndPort = IPUtils::splitHostAndPort( $host );
1218 if ( $hostAndPort ) {
1219 $ip = $hostAndPort[0];
1220 } else {
1221 $ip = $host;
1222 }
1223 if ( isset( $this->_host_dead[$host] ) && $this->_host_dead[$host] > $now ||
1224 isset( $this->_host_dead[$ip] ) && $this->_host_dead[$ip] > $now
1225 ) {
1226 return null;
1227 }
1228
1229 if ( !$this->_connect_sock( $sock, $host ) ) {
1230 return null;
1231 }
1232
1233 // Do not buffer writes
1234 stream_set_write_buffer( $sock, 0 );
1235
1236 $this->_cache_sock[$host] = $sock;
1237
1238 return $this->_cache_sock[$host];
1239 }
1240
1244 function _debugprint( $text ) {
1245 $this->_logger->debug( $text );
1246 }
1247
1251 function _error_log( $text ) {
1252 $this->_logger->error( "Memcached error: $text" );
1253 }
1254
1262 function _fwrite( $sock, $buf ) {
1263 $bytesWritten = 0;
1264 $bufSize = strlen( $buf );
1265 while ( $bytesWritten < $bufSize ) {
1266 $result = fwrite( $sock, $buf );
1267 $data = stream_get_meta_data( $sock );
1268 if ( $data['timed_out'] ) {
1269 $this->_handle_error( $sock, 'timeout writing to $1' );
1270 return false;
1271 }
1272 // Contrary to the documentation, fwrite() returns zero on error in PHP 5.3.
1273 if ( $result === false || $result === 0 ) {
1274 $this->_handle_error( $sock, 'error writing to $1' );
1275 return false;
1276 }
1277 $bytesWritten += $result;
1278 }
1279
1280 return true;
1281 }
1282
1289 function _handle_error( $sock, $msg ) {
1290 $peer = stream_socket_get_name( $sock, true );
1291 if ( strval( $peer ) === '' ) {
1292 $peer = array_search( $sock, $this->_cache_sock );
1293 if ( $peer === false ) {
1294 $peer = '[unknown host]';
1295 }
1296 }
1297 $msg = str_replace( '$1', $peer, $msg );
1298 $this->_error_log( "$msg" );
1299 $this->_dead_sock( $sock );
1300 }
1301
1310 function _fread( $sock, $len ) {
1311 $buf = '';
1312 while ( $len > 0 ) {
1313 $result = fread( $sock, $len );
1314 $data = stream_get_meta_data( $sock );
1315 if ( $data['timed_out'] ) {
1316 $this->_handle_error( $sock, 'timeout reading from $1' );
1317 return false;
1318 }
1319 if ( $result === false ) {
1320 $this->_handle_error( $sock, 'error reading buffer from $1' );
1321 return false;
1322 }
1323 if ( $result === '' ) {
1324 // This will happen if the remote end of the socket is shut down
1325 $this->_handle_error( $sock, 'unexpected end of file reading from $1' );
1326 return false;
1327 }
1328 $len -= strlen( $result );
1329 $buf .= $result;
1330 }
1331 return $buf;
1332 }
1333
1341 function _fgets( $sock ) {
1342 $result = fgets( $sock );
1343 // fgets() may return a partial line if there is a select timeout after
1344 // a successful recv(), so we have to check for a timeout even if we
1345 // got a string response.
1346 $data = stream_get_meta_data( $sock );
1347 if ( $data['timed_out'] ) {
1348 $this->_handle_error( $sock, 'timeout reading line from $1' );
1349 return false;
1350 }
1351 if ( $result === false ) {
1352 $this->_handle_error( $sock, 'error reading line from $1' );
1353 return false;
1354 }
1355 if ( substr( $result, -2 ) === "\r\n" ) {
1356 $result = substr( $result, 0, -2 );
1357 } elseif ( substr( $result, -1 ) === "\n" ) {
1358 $result = substr( $result, 0, -1 );
1359 } else {
1360 $this->_handle_error( $sock, 'line ending missing in response from $1' );
1361 return false;
1362 }
1363 return $result;
1364 }
1365
1370 function _flush_read_buffer( $f ) {
1371 if ( !$f ) {
1372 return;
1373 }
1374 $r = array( $f );
1375 $w = null;
1376 $e = null;
1377 $n = stream_select( $r, $w, $e, 0, 0 );
1378 while ( $n == 1 && !feof( $f ) ) {
1379 fread( $f, 1024 );
1380 $r = array( $f );
1381 $w = null;
1382 $e = null;
1383 $n = stream_select( $r, $w, $e, 0, 0 );
1384 }
1385 }
1386
1387 // }}}
1388 // }}}
1389 // }}}
1390}
memcached client class implemented using (p)fsockopen()
set_debug( $dbg)
Set the debug flag.
array $_cache_sock
Cached Sockets that are connected.
const SERIALIZED
Flag: indicates data is serialized.
const COMPRESSION_SAVINGS
Minimum savings to store data compressed.
_hashfunc( $key)
Creates a hash integer based on the $key.
set_compress_threshold( $thresh)
Set the compression threshold.
_fread( $sock, $len)
Read the specified number of bytes from a stream.
array $_servers
Array containing ip:port or array(ip:port, weight)
bool $_compress_enable
Do we want to use compression?
decr( $key, $amt=1)
Decrease a value stored on the memcache server.
const INTVAL
Flag: indicates data is an integer.
int $_compress_threshold
At how many bytes should we compress?
int $_timeout_seconds
Stream timeout in seconds.
disconnect_all()
Disconnects all connected sockets.
cas( $casToken, $key, $value, $exp=0)
Sets a key to a given value in the memcache if the current value still corresponds to a known,...
array $_buckets
Our bit buckets.
set_timeout( $seconds, $microseconds)
Sets the timeout for new connections.
_fgets( $sock)
Read a line from a stream.
incr( $key, $amt=1)
Increments $key (optionally) by $amt.
forget_dead_hosts()
Forget about all of the dead hosts.
_incrdecr( $cmd, $key, $amt=1)
Perform increment/decrement on $key.
$_connect_attempts
Number of connection attempts for each server.
add( $key, $val, $exp=0)
Adds a key/value to the memcache server if one isn't already set with that key.
_handle_error( $sock, $msg)
Handle an I/O error.
sock_to_host( $host)
Returns the socket for the host.
int $_last_cmd_status
StorageAwareness:ERR_* constant of the last cache command.
$_connect_timeout
Connect timeout in seconds.
touch( $key, $time=0)
Changes the TTL on a key from the server to $time.
bool $_have_zlib
Is compression available?
int $_bucketcount
Total # of bit buckets we have.
_fwrite( $sock, $buf)
Write to a stream.
bool $_debug
Current debug status; 0 - none to 9 - profiling.
replace( $key, $value, $exp=0)
Overwrites an existing value for key; only works if key is already set.
set_servers( $list)
Set the server list to distribute key gets and puts between.
_load_items( $sock, &$ret, &$casToken=null)
Load items into $ret from $sock.
string $_single_sock
If only using one server; contains ip:port to connect to.
get_multi( $keys)
Get multiple keys from the server(s)
_close_sock( $sock)
Close the specified socket.
run_command( $sock, $cmd)
Passes through $cmd to the memcache server connected by $sock; returns output as an array (null array...
_set( $cmd, $key, $val, $exp, $casToken=null)
Performs the requested storage operation to the memcache server.
int $_timeout_microseconds
Stream timeout in microseconds.
__construct( $args)
Memcache initializer.
_flush_read_buffer( $f)
Flush the read buffer of a stream.
enable_compress( $enable)
Enable / Disable compression.
array $_host_dead
Dead hosts, assoc array, 'host'=>'unixtime when ok to check again'.
const COMPRESSED
Flag: indicates data is compressed.
get_sock( $key)
get_sock
_dead_sock( $sock)
Marks a host as dead until 30-40 seconds in the future.
_connect_sock(&$sock, $host)
Connects $sock to $host, timing out after $timeout.
array $stats
Command statistics.
bool $_persistent
Are we using persistent links?
Generic interface providing error code and quality-of-service constants for object stores.
const ERR_UNREACHABLE
Storage medium could not be reached to establish a connection.
const ERR_UNEXPECTED
Storage medium operation failed due to usage limitations or an I/O error.
const ERR_NO_RESPONSE
Storage medium failed to yield a complete response to an operation.