MediaWiki REL1_41
MemcachedClient.php
Go to the documentation of this file.
1<?php
2// phpcs:ignoreFile -- It's an external lib and it isn't. Let's not bother.
69use Psr\Log\LoggerInterface;
70use Psr\Log\NullLogger;
71use Wikimedia\AtEase\AtEase;
72use Wikimedia\IPUtils;
74
75// {{{ class MemcachedClient
83 // {{{ properties
84 // {{{ public
85
86 // {{{ constants
87 // {{{ flags
88
92 const SERIALIZED = 1;
93
97 const COMPRESSED = 2;
98
102 const INTVAL = 4;
103
104 // }}}
105
110
111 // }}}
112
119 public $stats;
120
121 // }}}
122 // {{{ private
123
131
138 public $_debug;
139
147
155
163
171
179
187
194 public $_servers;
195
202 public $_buckets;
203
211
218 public $_active;
219
227
235
240
245
248
252 private $_logger;
253
254
255 // }}}
256 // }}}
257 // {{{ methods
258 // {{{ public functions
259 // {{{ memcached()
260
266 public function __construct( $args ) {
267 $this->set_servers( $args['servers'] ?? array() );
268 $this->_debug = $args['debug'] ?? false;
269 $this->stats = array();
270 $this->_compress_threshold = $args['compress_threshold'] ?? 0;
271 $this->_persistent = $args['persistent'] ?? false;
272 $this->_compress_enable = true;
273 $this->_have_zlib = function_exists( 'gzcompress' );
274
275 $this->_cache_sock = array();
276 $this->_host_dead = array();
277
278 $this->_timeout_seconds = 0;
279 $this->_timeout_microseconds = $args['timeout'] ?? 500000;
280
281 $this->_connect_timeout = $args['connect_timeout'] ?? 0.1;
282 $this->_connect_attempts = 2;
283
284 $this->_logger = $args['logger'] ?? new NullLogger();
285 }
286
287 // }}}
288
293 public function serialize( $value ) {
294 return serialize( $value );
295 }
296
301 public function unserialize( $value ) {
302 return unserialize( $value );
303 }
304
305 // {{{ add()
306
321 public function add( $key, $val, $exp = 0 ) {
322 $this->_last_cmd_status = self::ERR_NONE;
323
324 return $this->_set( 'add', $key, $val, $exp );
325 }
326
327 // }}}
328 // {{{ decr()
329
338 public function decr( $key, $amt = 1 ) {
339 $this->_last_cmd_status = self::ERR_NONE;
340
341 return $this->_incrdecr( 'decr', $key, $amt );
342 }
343
344 // }}}
345 // {{{ delete()
346
355 public function delete( $key, $time = 0 ) {
356 $this->_last_cmd_status = self::ERR_NONE;
357
358 if ( !$this->_active ) {
359 return false;
360 }
361
362 $sock = $this->get_sock( $key );
363 if ( !$sock ) {
364 return false;
365 }
366
367 $key = is_array( $key ) ? $key[1] : $key;
368
369 if ( isset( $this->stats['delete'] ) ) {
370 $this->stats['delete']++;
371 } else {
372 $this->stats['delete'] = 1;
373 }
374 $cmd = "delete $key $time\r\n";
375 if ( !$this->_fwrite( $sock, $cmd ) ) {
376 return false;
377 }
378 $res = $this->_fgets( $sock );
379
380 if ( $this->_debug ) {
381 $this->_debugprint( sprintf( "MemCache: delete %s (%s)", $key, $res ) );
382 }
383
384 if ( $res == "DELETED" || $res == "NOT_FOUND" ) {
385 return true;
386 }
387
388 $this->_last_cmd_status = self::ERR_UNEXPECTED;
389
390 return false;
391 }
392
401 public function touch( $key, $time = 0 ) {
402 $this->_last_cmd_status = self::ERR_NONE;
403
404 if ( !$this->_active ) {
405 return false;
406 }
407
408 $sock = $this->get_sock( $key );
409 if ( !$sock ) {
410 return false;
411 }
412
413 $key = is_array( $key ) ? $key[1] : $key;
414
415 if ( isset( $this->stats['touch'] ) ) {
416 $this->stats['touch']++;
417 } else {
418 $this->stats['touch'] = 1;
419 }
420 $cmd = "touch $key $time\r\n";
421 if ( !$this->_fwrite( $sock, $cmd ) ) {
422 return false;
423 }
424 $res = $this->_fgets( $sock );
425
426 if ( $this->_debug ) {
427 $this->_debugprint( sprintf( "MemCache: touch %s (%s)", $key, $res ) );
428 }
429
430 if ( $res == "TOUCHED" ) {
431 return true;
432 }
433
434 return false;
435 }
436
437 // }}}
438 // {{{ disconnect_all()
439
443 public function disconnect_all() {
444 foreach ( $this->_cache_sock as $sock ) {
445 fclose( $sock );
446 }
447
448 $this->_cache_sock = array();
449 }
450
451 // }}}
452 // {{{ enable_compress()
453
459 public function enable_compress( $enable ) {
460 $this->_compress_enable = $enable;
461 }
462
463 // }}}
464 // {{{ forget_dead_hosts()
465
469 public function forget_dead_hosts() {
470 $this->_host_dead = array();
471 }
472
473 // }}}
474 // {{{ get()
475
484 public function get( $key, &$casToken = null ) {
485 $getToken = ( func_num_args() >= 2 );
486
487 $this->_last_cmd_status = self::ERR_NONE;
488
489 if ( $this->_debug ) {
490 $this->_debugprint( "get($key)" );
491 }
492
493 if ( !is_array( $key ) && strval( $key ) === '' ) {
494 $this->_last_cmd_status = self::ERR_UNEXPECTED;
495 $this->_debugprint( "Skipping key which equals to an empty string" );
496 return false;
497 }
498
499 if ( !$this->_active ) {
500 $this->_last_cmd_status = self::ERR_UNEXPECTED;
501
502 return false;
503 }
504
505 $sock = $this->get_sock( $key );
506
507 if ( !$sock ) {
508 $this->_last_cmd_status = self::ERR_UNREACHABLE;
509
510 return false;
511 }
512
513 $key = is_array( $key ) ? $key[1] : $key;
514 if ( isset( $this->stats['get'] ) ) {
515 $this->stats['get']++;
516 } else {
517 $this->stats['get'] = 1;
518 }
519
520 $cmd = $getToken ? "gets" : "get";
521 $cmd .= " $key\r\n";
522 if ( !$this->_fwrite( $sock, $cmd ) ) {
523 $this->_last_cmd_status = self::ERR_NO_RESPONSE;
524
525 return false;
526 }
527
528 $val = array();
529 if ( !$this->_load_items( $sock, $val, $casToken ) ) {
530 $this->_last_cmd_status = self::ERR_NO_RESPONSE;
531 }
532
533 if ( $this->_debug ) {
534 foreach ( $val as $k => $v ) {
535 $this->_debugprint(
536 sprintf( "MemCache: sock %s got %s", $this->serialize( $sock ), $k ) );
537 }
538 }
539
540 $value = false;
541 if ( isset( $val[$key] ) ) {
542 $value = $val[$key];
543 }
544 return $value;
545 }
546
547 // }}}
548 // {{{ get_multi()
549
557 public function get_multi( $keys ) {
558 $this->_last_cmd_status = self::ERR_NONE;
559
560 if ( !$this->_active ) {
561 $this->_last_cmd_status = self::ERR_UNEXPECTED;
562
563 return array();
564 }
565
566 if ( isset( $this->stats['get_multi'] ) ) {
567 $this->stats['get_multi']++;
568 } else {
569 $this->stats['get_multi'] = 1;
570 }
571 $sock_keys = array();
572 $socks = array();
573 foreach ( $keys as $key ) {
574 $sock = $this->get_sock( $key );
575 if ( !$sock ) {
576 $this->_last_cmd_status = self::ERR_UNREACHABLE;
577 continue;
578 }
579 $key = is_array( $key ) ? $key[1] : $key;
580 $sockValue = intval( $sock );
581
582 if ( !isset( $sock_keys[$sockValue] ) ) {
583 $sock_keys[$sockValue] = array();
584 $socks[] = $sock;
585 }
586 $sock_keys[$sockValue][] = $key;
587 }
588
589 $gather = array();
590 // Send out the requests
591 foreach ( $socks as $sock ) {
592 $cmd = 'get';
593 foreach ( $sock_keys[intval( $sock )] as $key ) {
594 $cmd .= ' ' . $key;
595 }
596 $cmd .= "\r\n";
597
598 if ( $this->_fwrite( $sock, $cmd ) ) {
599 $gather[] = $sock;
600 } else {
601 $this->_last_cmd_status = self::ERR_NO_RESPONSE;
602 }
603 }
604
605 // Parse responses
606 $val = array();
607 foreach ( $gather as $sock ) {
608 if ( !$this->_load_items( $sock, $val ) ) {
609 $this->_last_cmd_status = self::ERR_NO_RESPONSE;
610 }
611 }
612
613 if ( $this->_debug ) {
614 foreach ( $val as $k => $v ) {
615 $this->_debugprint( sprintf( "MemCache: got %s", $k ) );
616 }
617 }
618
619 return $val;
620 }
621
622 // }}}
623 // {{{ incr()
624
635 public function incr( $key, $amt = 1 ) {
636 return $this->_incrdecr( 'incr', $key, $amt );
637 }
638
639 // }}}
640 // {{{ replace()
641
655 public function replace( $key, $value, $exp = 0 ) {
656 return $this->_set( 'replace', $key, $value, $exp );
657 }
658
659 // }}}
660 // {{{ run_command()
661
671 public function run_command( $sock, $cmd ) {
672 if ( !$sock ) {
673 return array();
674 }
675
676 if ( !$this->_fwrite( $sock, $cmd ) ) {
677 return array();
678 }
679
680 $ret = array();
681 while ( true ) {
682 $res = $this->_fgets( $sock );
683 $ret[] = $res;
684 if ( preg_match( '/^END/', $res ) ) {
685 break;
686 }
687 if ( strlen( $res ) == 0 ) {
688 break;
689 }
690 }
691 return $ret;
692 }
693
694 // }}}
695 // {{{ set()
696
711 public function set( $key, $value, $exp = 0 ) {
712 return $this->_set( 'set', $key, $value, $exp );
713 }
714
715 // }}}
716 // {{{ cas()
717
733 public function cas( $casToken, $key, $value, $exp = 0 ) {
734 return $this->_set( 'cas', $key, $value, $exp, $casToken );
735 }
736
737 // }}}
738 // {{{ set_compress_threshold()
739
745 public function set_compress_threshold( $thresh ) {
746 $this->_compress_threshold = $thresh;
747 }
748
749 // }}}
750 // {{{ set_debug()
751
758 public function set_debug( $dbg ) {
759 $this->_debug = $dbg;
760 }
761
762 // }}}
763 // {{{ set_servers()
764
771 public function set_servers( $list ) {
772 $this->_servers = $list;
773 $this->_active = count( $list );
774 $this->_buckets = null;
775 $this->_bucketcount = 0;
776
777 $this->_single_sock = null;
778 if ( $this->_active == 1 ) {
779 $this->_single_sock = $this->_servers[0];
780 }
781 }
782
789 public function set_timeout( $seconds, $microseconds ) {
790 $this->_timeout_seconds = $seconds;
791 $this->_timeout_microseconds = $microseconds;
792 }
793
794 // }}}
795 // }}}
796 // {{{ private methods
797 // {{{ _close_sock()
798
806 function _close_sock( $sock ) {
807 $host = array_search( $sock, $this->_cache_sock );
808 fclose( $this->_cache_sock[$host] );
809 unset( $this->_cache_sock[$host] );
810 }
811
812 // }}}
813 // {{{ _connect_sock()
814
824 function _connect_sock( &$sock, $host ) {
825 $port = null;
826 $hostAndPort = IPUtils::splitHostAndPort( $host );
827 if ( $hostAndPort ) {
828 $ip = $hostAndPort[0];
829 if ( $hostAndPort[1] ) {
830 $port = $hostAndPort[1];
831 }
832 } else {
833 $ip = $host;
834 }
835 $sock = false;
836 $timeout = $this->_connect_timeout;
837 $errno = $errstr = null;
838 for ( $i = 0; !$sock && $i < $this->_connect_attempts; $i++ ) {
839 AtEase::suppressWarnings();
840 if ( $this->_persistent == 1 ) {
841 $sock = pfsockopen( $ip, $port, $errno, $errstr, $timeout );
842 } else {
843 $sock = fsockopen( $ip, $port, $errno, $errstr, $timeout );
844 }
845 AtEase::restoreWarnings();
846 }
847 if ( !$sock ) {
848 $this->_error_log( "Error connecting to $host: $errstr" );
849 $this->_dead_host( $host );
850 return false;
851 }
852
853 // Initialise timeout
854 stream_set_timeout( $sock, $this->_timeout_seconds, $this->_timeout_microseconds );
855
856 // If the connection was persistent, flush the read buffer in case there
857 // was a previous incomplete request on this connection
858 if ( $this->_persistent ) {
859 $this->_flush_read_buffer( $sock );
860 }
861 return true;
862 }
863
864 // }}}
865 // {{{ _dead_sock()
866
874 function _dead_sock( $sock ) {
875 $host = array_search( $sock, $this->_cache_sock );
876 $this->_dead_host( $host );
877 }
878
882 function _dead_host( $host ) {
883 $hostAndPort = IPUtils::splitHostAndPort( $host );
884 if ( $hostAndPort ) {
885 $ip = $hostAndPort[0];
886 } else {
887 $ip = $host;
888 }
889 $this->_host_dead[$ip] = time() + 30 + intval( rand( 0, 10 ) );
890 $this->_host_dead[$host] = $this->_host_dead[$ip];
891 unset( $this->_cache_sock[$host] );
892 }
893
894 // }}}
895 // {{{ get_sock()
896
905 function get_sock( $key ) {
906 if ( !$this->_active ) {
907 return false;
908 }
909
910 if ( $this->_single_sock !== null ) {
911 return $this->sock_to_host( $this->_single_sock );
912 }
913
914 $hv = is_array( $key ) ? intval( $key[0] ) : $this->_hashfunc( $key );
915 if ( $this->_buckets === null ) {
916 $bu = array();
917 foreach ( $this->_servers as $v ) {
918 if ( is_array( $v ) ) {
919 for ( $i = 0; $i < $v[1]; $i++ ) {
920 $bu[] = $v[0];
921 }
922 } else {
923 $bu[] = $v;
924 }
925 }
926 $this->_buckets = $bu;
927 $this->_bucketcount = count( $bu );
928 }
929
930 $realkey = is_array( $key ) ? $key[1] : $key;
931 for ( $tries = 0; $tries < 20; $tries++ ) {
932 $host = $this->_buckets[$hv % $this->_bucketcount];
933 $sock = $this->sock_to_host( $host );
934 if ( $sock ) {
935 return $sock;
936 }
937 $hv = $this->_hashfunc( $hv . $realkey );
938 }
939
940 return false;
941 }
942
943 // }}}
944 // {{{ _hashfunc()
945
954 function _hashfunc( $key ) {
955 # Hash function must be in [0,0x7ffffff]
956 # We take the first 31 bits of the MD5 hash, which unlike the hash
957 # function used in a previous version of this client, works
958 return hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
959 }
960
961 // }}}
962 // {{{ _incrdecr()
963
974 function _incrdecr( $cmd, $key, $amt = 1 ) {
975 $this->_last_cmd_status = self::ERR_NONE;
976
977 if ( !$this->_active ) {
978 $this->_last_cmd_status = self::ERR_UNEXPECTED;
979
980 return null;
981 }
982
983 $sock = $this->get_sock( $key );
984 if ( !$sock ) {
985 $this->_last_cmd_status = self::ERR_UNREACHABLE;
986
987 return null;
988 }
989
990 $key = is_array( $key ) ? $key[1] : $key;
991 if ( isset( $this->stats[$cmd] ) ) {
992 $this->stats[$cmd]++;
993 } else {
994 $this->stats[$cmd] = 1;
995 }
996 if ( !$this->_fwrite( $sock, "$cmd $key $amt\r\n" ) ) {
997 $this->_last_cmd_status = self::ERR_NO_RESPONSE;
998
999 return null;
1000 }
1001
1002 $line = $this->_fgets( $sock );
1003 if ( $this->_debug ) {
1004 $this->_debugprint( "$cmd($key): $line" );
1005 }
1006
1007 $match = array();
1008 if ( !preg_match( '/^(\d+)/', $line, $match ) ) {
1009 $this->_last_cmd_status = self::ERR_NO_RESPONSE;
1010
1011 return null;
1012 }
1013
1014 return (int)$match[1];
1015 }
1016
1017 // }}}
1018 // {{{ _load_items()
1019
1030 function _load_items( $sock, &$ret, &$casToken = null ) {
1031 $results = array();
1032
1033 while ( 1 ) {
1034 $decl = $this->_fgets( $sock );
1035
1036 if ( $decl === false ) {
1037 /*
1038 * If nothing can be read, something is wrong because we know exactly when
1039 * to stop reading (right after "END") and we return right after that.
1040 */
1041 return false;
1042 } elseif ( preg_match( '/^VALUE (\S+) (\d+) (\d+)(?: (\d+))?$/', $decl, $match ) ) {
1043 /*
1044 * Read all data returned. This can be either one or multiple values.
1045 * Save all that data (in an array) to be processed later: we'll first
1046 * want to continue reading until "END" before doing anything else,
1047 * to make sure that we don't leave our client in a state where it's
1048 * output is not yet fully read.
1049 */
1050 $results[] = array(
1051 $match[1], // rkey
1052 $match[2], // flags
1053 $match[3], // len
1054 $match[4] ?? null, // casToken (appears with "gets" but not "get")
1055 $this->_fread( $sock, $match[3] + 2 ), // data
1056 );
1057 } elseif ( $decl == "END" ) {
1062 foreach ( $results as [ $rkey, $flags, /* length */, $casToken, $data ] ) {
1063 if ( $data === false || substr( $data, -2 ) !== "\r\n" ) {
1064 $this->_handle_error( $sock,
1065 'line ending missing from data block from $1' );
1066 return false;
1067 }
1068 $data = substr( $data, 0, -2 );
1069 $ret[$rkey] = $data;
1070
1071 if ( $this->_have_zlib && $flags & self::COMPRESSED ) {
1072 $ret[$rkey] = gzuncompress( $ret[$rkey] );
1073 }
1074
1075 /*
1076 * This unserialize is the exact reason that we only want to
1077 * process data after having read until "END" (instead of doing
1078 * this right away): "unserialize" can trigger outside code:
1079 * in the event that $ret[$rkey] is a serialized object,
1080 * unserializing it will trigger __wakeup() if present. If that
1081 * function attempted to read from memcached (while we did not
1082 * yet read "END"), these 2 calls would collide.
1083 */
1084 if ( $flags & self::SERIALIZED ) {
1085 $ret[$rkey] = $this->unserialize( $ret[$rkey] );
1086 } elseif ( $flags & self::INTVAL ) {
1087 $ret[$rkey] = intval( $ret[$rkey] );
1088 }
1089 }
1090
1091 return true;
1092 } else {
1093 $this->_handle_error( $sock, 'Error parsing response from $1' );
1094 return false;
1095 }
1096 }
1097 }
1098
1099 // }}}
1100 // {{{ _set()
1101
1118 function _set( $cmd, $key, $val, $exp, $casToken = null ) {
1119 $this->_last_cmd_status = self::ERR_NONE;
1120
1121 if ( !$this->_active ) {
1122 $this->_last_cmd_status = self::ERR_UNEXPECTED;
1123
1124 return false;
1125 }
1126
1127 $sock = $this->get_sock( $key );
1128 if ( !$sock ) {
1129 $this->_last_cmd_status = self::ERR_UNREACHABLE;
1130
1131 return false;
1132 }
1133
1134 if ( isset( $this->stats[$cmd] ) ) {
1135 $this->stats[$cmd]++;
1136 } else {
1137 $this->stats[$cmd] = 1;
1138 }
1139
1140 $flags = 0;
1141
1142 if ( is_int( $val ) ) {
1143 $flags |= self::INTVAL;
1144 } elseif ( !is_scalar( $val ) ) {
1145 $val = $this->serialize( $val );
1146 $flags |= self::SERIALIZED;
1147 if ( $this->_debug ) {
1148 $this->_debugprint( "client: serializing data as it is not scalar" );
1149 }
1150 }
1151
1152 $len = strlen( $val );
1153
1154 if ( $this->_have_zlib && $this->_compress_enable
1155 && $this->_compress_threshold && $len >= $this->_compress_threshold
1156 ) {
1157 $c_val = gzcompress( $val, 9 );
1158 $c_len = strlen( $c_val );
1159
1160 if ( $c_len < $len * ( 1 - self::COMPRESSION_SAVINGS ) ) {
1161 if ( $this->_debug ) {
1162 $this->_debugprint( sprintf( "client: compressing data; was %d bytes is now %d bytes", $len, $c_len ) );
1163 }
1164 $val = $c_val;
1165 $len = $c_len;
1166 $flags |= self::COMPRESSED;
1167 }
1168 }
1169
1170 $command = "$cmd $key $flags $exp $len";
1171 if ( $casToken ) {
1172 $command .= " $casToken";
1173 }
1174
1175 if ( !$this->_fwrite( $sock, "$command\r\n$val\r\n" ) ) {
1176 $this->_last_cmd_status = self::ERR_NO_RESPONSE;
1177
1178 return false;
1179 }
1180
1181 $line = $this->_fgets( $sock );
1182 if ( $this->_debug ) {
1183 $this->_debugprint( sprintf( "%s %s (%s)", $cmd, $key, $line ) );
1184 }
1185
1186 if ( $line === "STORED" ) {
1187 return true;
1188 } elseif ( $line === "NOT_STORED" && $cmd === "set" ) {
1189 // "Not stored" is always used as the mcrouter response with AllAsyncRoute
1190 return true;
1191 }
1192
1193 if ( $line === false ) {
1194 $this->_last_cmd_status = self::ERR_NO_RESPONSE;
1195 }
1196
1197 return false;
1198 }
1199
1200 // }}}
1201 // {{{ sock_to_host()
1202
1211 function sock_to_host( $host ) {
1212 if ( isset( $this->_cache_sock[$host] ) ) {
1213 return $this->_cache_sock[$host];
1214 }
1215
1216 $sock = null;
1217 $now = time();
1218 $hostAndPort = IPUtils::splitHostAndPort( $host );
1219 if ( $hostAndPort ) {
1220 $ip = $hostAndPort[0];
1221 } else {
1222 $ip = $host;
1223 }
1224 if ( isset( $this->_host_dead[$host] ) && $this->_host_dead[$host] > $now ||
1225 isset( $this->_host_dead[$ip] ) && $this->_host_dead[$ip] > $now
1226 ) {
1227 return null;
1228 }
1229
1230 if ( !$this->_connect_sock( $sock, $host ) ) {
1231 return null;
1232 }
1233
1234 // Do not buffer writes
1235 stream_set_write_buffer( $sock, 0 );
1236
1237 $this->_cache_sock[$host] = $sock;
1238
1239 return $this->_cache_sock[$host];
1240 }
1241
1245 function _debugprint( $text ) {
1246 $this->_logger->debug( $text );
1247 }
1248
1252 function _error_log( $text ) {
1253 $this->_logger->error( "Memcached error: $text" );
1254 }
1255
1263 function _fwrite( $sock, $buf ) {
1264 $bytesWritten = 0;
1265 $bufSize = strlen( $buf );
1266 while ( $bytesWritten < $bufSize ) {
1267 $result = fwrite( $sock, $buf );
1268 $data = stream_get_meta_data( $sock );
1269 if ( $data['timed_out'] ) {
1270 $this->_handle_error( $sock, 'timeout writing to $1' );
1271 return false;
1272 }
1273 // Contrary to the documentation, fwrite() returns zero on error in PHP 5.3.
1274 if ( $result === false || $result === 0 ) {
1275 $this->_handle_error( $sock, 'error writing to $1' );
1276 return false;
1277 }
1278 $bytesWritten += $result;
1279 }
1280
1281 return true;
1282 }
1283
1290 function _handle_error( $sock, $msg ) {
1291 $peer = stream_socket_get_name( $sock, true );
1292 if ( strval( $peer ) === '' ) {
1293 $peer = array_search( $sock, $this->_cache_sock );
1294 if ( $peer === false ) {
1295 $peer = '[unknown host]';
1296 }
1297 }
1298 $msg = str_replace( '$1', $peer, $msg );
1299 $this->_error_log( "$msg" );
1300 $this->_dead_sock( $sock );
1301 }
1302
1311 function _fread( $sock, $len ) {
1312 $buf = '';
1313 while ( $len > 0 ) {
1314 $result = fread( $sock, $len );
1315 $data = stream_get_meta_data( $sock );
1316 if ( $data['timed_out'] ) {
1317 $this->_handle_error( $sock, 'timeout reading from $1' );
1318 return false;
1319 }
1320 if ( $result === false ) {
1321 $this->_handle_error( $sock, 'error reading buffer from $1' );
1322 return false;
1323 }
1324 if ( $result === '' ) {
1325 // This will happen if the remote end of the socket is shut down
1326 $this->_handle_error( $sock, 'unexpected end of file reading from $1' );
1327 return false;
1328 }
1329 $len -= strlen( $result );
1330 $buf .= $result;
1331 }
1332 return $buf;
1333 }
1334
1342 function _fgets( $sock ) {
1343 $result = fgets( $sock );
1344 // fgets() may return a partial line if there is a select timeout after
1345 // a successful recv(), so we have to check for a timeout even if we
1346 // got a string response.
1347 $data = stream_get_meta_data( $sock );
1348 if ( $data['timed_out'] ) {
1349 $this->_handle_error( $sock, 'timeout reading line from $1' );
1350 return false;
1351 }
1352 if ( $result === false ) {
1353 $this->_handle_error( $sock, 'error reading line from $1' );
1354 return false;
1355 }
1356 if ( substr( $result, -2 ) === "\r\n" ) {
1357 $result = substr( $result, 0, -2 );
1358 } elseif ( substr( $result, -1 ) === "\n" ) {
1359 $result = substr( $result, 0, -1 );
1360 } else {
1361 $this->_handle_error( $sock, 'line ending missing in response from $1' );
1362 return false;
1363 }
1364 return $result;
1365 }
1366
1371 function _flush_read_buffer( $f ) {
1372 if ( !$f ) {
1373 return;
1374 }
1375 $r = array( $f );
1376 $w = null;
1377 $e = null;
1378 $n = stream_select( $r, $w, $e, 0, 0 );
1379 while ( $n == 1 && !feof( $f ) ) {
1380 fread( $f, 1024 );
1381 $r = array( $f );
1382 $w = null;
1383 $e = null;
1384 $n = stream_select( $r, $w, $e, 0, 0 );
1385 }
1386 }
1387
1388 // }}}
1389 // }}}
1390 // }}}
1391}
1392
1393// }}}
memcached client class implemented using (p)fsockopen()
set_debug( $dbg)
Set the debug flag.
array $_cache_sock
Cached Sockets that are connected.
const SERIALIZED
Flag: indicates data is serialized.
const COMPRESSION_SAVINGS
Minimum savings to store data compressed.
_hashfunc( $key)
Creates a hash integer based on the $key.
set_compress_threshold( $thresh)
Set the compression threshold.
_fread( $sock, $len)
Read the specified number of bytes from a stream.
array $_servers
Array containing ip:port or array(ip:port, weight)
bool $_compress_enable
Do we want to use compression?
decr( $key, $amt=1)
Decrease a value stored on the memcache server.
const INTVAL
Flag: indicates data is an integer.
int $_compress_threshold
At how many bytes should we compress?
int $_timeout_seconds
Stream timeout in seconds.
disconnect_all()
Disconnects all connected sockets.
cas( $casToken, $key, $value, $exp=0)
Sets a key to a given value in the memcache if the current value still corresponds to a known,...
array $_buckets
Our bit buckets.
set_timeout( $seconds, $microseconds)
Sets the timeout for new connections.
_fgets( $sock)
Read a line from a stream.
incr( $key, $amt=1)
Increments $key (optionally) by $amt.
forget_dead_hosts()
Forget about all of the dead hosts.
_incrdecr( $cmd, $key, $amt=1)
Perform increment/decrement on $key.
$_connect_attempts
Number of connection attempts for each server.
add( $key, $val, $exp=0)
Adds a key/value to the memcache server if one isn't already set with that key.
_handle_error( $sock, $msg)
Handle an I/O error.
sock_to_host( $host)
Returns the socket for the host.
int $_last_cmd_status
StorageAwareness:ERR_* constant of the last cache command.
$_connect_timeout
Connect timeout in seconds.
touch( $key, $time=0)
Changes the TTL on a key from the server to $time.
bool $_have_zlib
Is compression available?
int $_bucketcount
Total # of bit buckets we have.
_fwrite( $sock, $buf)
Write to a stream.
bool $_debug
Current debug status; 0 - none to 9 - profiling.
replace( $key, $value, $exp=0)
Overwrites an existing value for key; only works if key is already set.
set_servers( $list)
Set the server list to distribute key gets and puts between.
_load_items( $sock, &$ret, &$casToken=null)
Load items into $ret from $sock.
string $_single_sock
If only using one server; contains ip:port to connect to.
get_multi( $keys)
Get multiple keys from the server(s)
_close_sock( $sock)
Close the specified socket.
run_command( $sock, $cmd)
Passes through $cmd to the memcache server connected by $sock; returns output as an array (null array...
_set( $cmd, $key, $val, $exp, $casToken=null)
Performs the requested storage operation to the memcache server.
int $_timeout_microseconds
Stream timeout in microseconds.
__construct( $args)
Memcache initializer.
_flush_read_buffer( $f)
Flush the read buffer of a stream.
enable_compress( $enable)
Enable / Disable compression.
array $_host_dead
Dead hosts, assoc array, 'host'=>'unixtime when ok to check again'.
const COMPRESSED
Flag: indicates data is compressed.
get_sock( $key)
get_sock
_dead_sock( $sock)
Marks a host as dead until 30-40 seconds in the future.
_connect_sock(&$sock, $host)
Connects $sock to $host, timing out after $timeout.
array $stats
Command statistics.
bool $_persistent
Are we using persistent links?
Generic interface providing error code and quality-of-service constants for object stores.
const ERR_UNREACHABLE
Storage medium could not be reached to establish a connection.
const ERR_UNEXPECTED
Storage medium operation failed due to usage limitations or an I/O error.
const ERR_NO_RESPONSE
Storage medium failed to yield a complete response to an operation.