MediaWiki master
MemcachedClient.php
Go to the documentation of this file.
1<?php
2// phpcs:ignoreFile -- It's an external lib and it isn't. Let's not bother.
69use Psr\Log\LoggerInterface;
70use Psr\Log\NullLogger;
71use Wikimedia\IPUtils;
73
81 // {{{ properties
82 // {{{ public
83
84 // {{{ constants
85 // {{{ flags
86
90 const SERIALIZED = 1;
91
95 const COMPRESSED = 2;
96
100 const INTVAL = 4;
101
102 // }}}
103
108
109 // }}}
110
117 public $stats;
118
119 // }}}
120 // {{{ private
121
129
136 public $_debug;
137
145
153
161
169
177
185
192 public $_servers;
193
200 public $_buckets;
201
209
216 public $_active;
217
225
233
238
243
245 public $_last_cmd_status = BagOStuff::ERR_NONE;
246
250 private $_logger;
251
252
253 // }}}
254 // }}}
255 // {{{ methods
256 // {{{ public functions
257 // {{{ memcached()
258
/**
 * Memcache initializer.
 *
 * @param array $args Configuration. Recognised keys (all optional):
 *   - servers: server list handed to set_servers()
 *   - debug: debug flag
 *   - compress_threshold: size in bytes from which values are compressed
 *   - persistent: whether to use persistent connections
 *   - timeout: stream timeout in microseconds (default 500000)
 *   - connect_timeout: connect timeout in seconds (default 0.1)
 *   - logger: logger used for debug and error output
 */
public function __construct( $args ) {
	// Server list and bucket state
	$this->set_servers( $args['servers'] ?? [] );

	// Diagnostics
	$this->_debug = $args['debug'] ?? false;
	$this->stats = [];

	// Compression is on by default but only effective when zlib is present
	// and a non-zero threshold is configured.
	$this->_compress_threshold = $args['compress_threshold'] ?? 0;
	$this->_persistent = $args['persistent'] ?? false;
	$this->_compress_enable = true;
	$this->_have_zlib = function_exists( 'gzcompress' );

	// No sockets opened and no hosts blacklisted yet
	$this->_cache_sock = [];
	$this->_host_dead = [];

	// Stream (read/write) timeout, split into seconds and microseconds
	$this->_timeout_seconds = 0;
	$this->_timeout_microseconds = $args['timeout'] ?? 500_000;

	// Connection establishment
	$this->_connect_timeout = $args['connect_timeout'] ?? 0.1;
	$this->_connect_attempts = 2;

	$this->_logger = $args['logger'] ?? new NullLogger();
}
284
285 // }}}
286
/**
 * Serialization hook: converts a value to its stored string form using
 * PHP's native serialize().
 *
 * @param mixed $value
 * @return string
 */
public function serialize( $value ) {
	$encoded = serialize( $value );

	return $encoded;
}
294
/**
 * Deserialization hook: the inverse of serialize(), using PHP's native
 * unserialize().
 *
 * @param string $value
 * @return mixed
 */
public function unserialize( $value ) {
	$decoded = unserialize( $value );

	return $decoded;
}
302
303 // {{{ add()
304
/**
 * Adds a key/value to the memcache server if one isn't already set with
 * that key.
 *
 * @param string|array $key Key, or array( hash, key )
 * @param mixed $val Value to store
 * @param int $exp Expiry value sent with the command (default 0)
 * @return bool Result of the underlying "add" storage command
 */
public function add( $key, $val, $exp = 0 ) {
	// Start from a clean status before delegating to the storage helper
	$this->_last_cmd_status = BagOStuff::ERR_NONE;
	$stored = $this->_set( 'add', $key, $val, $exp );

	return $stored;
}
324
325 // }}}
326 // {{{ decr()
327
/**
 * Decrease a value stored on the memcache server.
 *
 * @param string|array $key Key, or array( hash, key )
 * @param int $amt Amount to decrease by (default 1)
 * @return int|null New value, or null on failure
 */
public function decr( $key, $amt = 1 ) {
	// Start from a clean status before delegating to the shared helper
	$this->_last_cmd_status = BagOStuff::ERR_NONE;
	$newValue = $this->_incrdecr( 'decr', $key, $amt );

	return $newValue;
}
341
342 // }}}
343 // {{{ delete()
344
/**
 * Sends a "delete" command for a key.
 *
 * Failure paths now record a status in $_last_cmd_status, matching get(),
 * _set() and _incrdecr(); previously most of them returned false while
 * leaving the status at ERR_NONE.
 *
 * @param string|array $key Key, or array( hash, key )
 * @param int $time Value appended to the command after the key (default 0)
 * @return bool True if the server answered DELETED or NOT_FOUND
 */
public function delete( $key, $time = 0 ) {
	$this->_last_cmd_status = BagOStuff::ERR_NONE;

	if ( !$this->_active ) {
		$this->_last_cmd_status = BagOStuff::ERR_UNEXPECTED;

		return false;
	}

	$sock = $this->get_sock( $key );
	if ( !$sock ) {
		$this->_last_cmd_status = BagOStuff::ERR_UNREACHABLE;

		return false;
	}

	// For array( hash, key ) pairs only the second element goes on the wire
	$key = is_array( $key ) ? $key[1] : $key;

	if ( isset( $this->stats['delete'] ) ) {
		$this->stats['delete']++;
	} else {
		$this->stats['delete'] = 1;
	}
	$cmd = "delete $key $time\r\n";
	if ( !$this->_fwrite( $sock, $cmd ) ) {
		$this->_last_cmd_status = BagOStuff::ERR_NO_RESPONSE;

		return false;
	}
	$res = $this->_fgets( $sock );

	if ( $this->_debug ) {
		$this->_debugprint( sprintf( "MemCache: delete %s (%s)", $key, $res ) );
	}

	// A missing key counts as success: the key is gone either way
	if ( $res === "DELETED" || $res === "NOT_FOUND" ) {
		return true;
	}

	// No reply at all is reported differently from an unexpected reply
	$this->_last_cmd_status = ( $res === false )
		? BagOStuff::ERR_NO_RESPONSE
		: BagOStuff::ERR_UNEXPECTED;

	return false;
}
390
/**
 * Changes the TTL on a key from the server to $time.
 *
 * Failure paths now record a status in $_last_cmd_status, matching get(),
 * _set() and _incrdecr(). A NOT_FOUND reply still returns false without
 * flagging an error, as before.
 *
 * @param string|array $key Key, or array( hash, key )
 * @param int $time Value sent after the key in the "touch" command (default 0)
 * @return bool True if the server answered TOUCHED
 */
public function touch( $key, $time = 0 ) {
	$this->_last_cmd_status = BagOStuff::ERR_NONE;

	if ( !$this->_active ) {
		$this->_last_cmd_status = BagOStuff::ERR_UNEXPECTED;

		return false;
	}

	$sock = $this->get_sock( $key );
	if ( !$sock ) {
		$this->_last_cmd_status = BagOStuff::ERR_UNREACHABLE;

		return false;
	}

	// For array( hash, key ) pairs only the second element goes on the wire
	$key = is_array( $key ) ? $key[1] : $key;

	if ( isset( $this->stats['touch'] ) ) {
		$this->stats['touch']++;
	} else {
		$this->stats['touch'] = 1;
	}
	$cmd = "touch $key $time\r\n";
	if ( !$this->_fwrite( $sock, $cmd ) ) {
		$this->_last_cmd_status = BagOStuff::ERR_NO_RESPONSE;

		return false;
	}
	$res = $this->_fgets( $sock );

	if ( $this->_debug ) {
		$this->_debugprint( sprintf( "MemCache: touch %s (%s)", $key, $res ) );
	}

	if ( $res === "TOUCHED" ) {
		return true;
	}

	if ( $res === false ) {
		// The read failed or timed out
		$this->_last_cmd_status = BagOStuff::ERR_NO_RESPONSE;
	} elseif ( $res !== "NOT_FOUND" ) {
		// A missing key is a normal miss; anything else is unexpected
		$this->_last_cmd_status = BagOStuff::ERR_UNEXPECTED;
	}

	return false;
}
434
435 // }}}
436 // {{{ disconnect_all()
437
/**
 * Disconnects all connected sockets.
 */
public function disconnect_all() {
	foreach ( array_keys( $this->_cache_sock ) as $host ) {
		fclose( $this->_cache_sock[$host] );
	}

	// Forget the closed handles so later calls reconnect
	$this->_cache_sock = [];
}
448
449 // }}}
450 // {{{ enable_compress()
451
/**
 * Enable / Disable compression.
 *
 * @param bool $enable Whether values may be compressed when stored
 */
public function enable_compress( $enable ) {
	$this->_compress_enable = $enable;
}
460
461 // }}}
462 // {{{ forget_dead_hosts()
463
/**
 * Forget about all of the dead hosts, so that they are tried again on
 * the next request.
 */
public function forget_dead_hosts() {
	$this->_host_dead = [];
}
470
471 // }}}
472 // {{{ get()
473
/**
 * Retrieves the value stored under a key.
 *
 * @param string|array $key Key, or array( hash, key )
 * @param mixed &$casToken If this argument is passed at all, a "gets"
 *   command is sent and the CAS token from the reply is stored here
 * @return mixed The stored value, or false if it was not found or an error
 *   occurred (check $_last_cmd_status to tell the two apart)
 */
public function get( $key, &$casToken = null ) {
	// Only ask for a CAS token when the caller actually supplied the argument
	$getToken = ( func_num_args() >= 2 );

	$this->_last_cmd_status = BagOStuff::ERR_NONE;

	// For array( hash, key ) pairs only the second element goes on the wire.
	// Resolve it up front so the debug line below never interpolates an array.
	$realKey = is_array( $key ) ? $key[1] : $key;

	if ( $this->_debug ) {
		$this->_debugprint( "get($realKey)" );
	}

	if ( !is_array( $key ) && strval( $key ) === '' ) {
		$this->_last_cmd_status = BagOStuff::ERR_UNEXPECTED;
		$this->_debugprint( "Skipping key which equals to an empty string" );
		return false;
	}

	if ( !$this->_active ) {
		$this->_last_cmd_status = BagOStuff::ERR_UNEXPECTED;

		return false;
	}

	// Server selection needs the original key, including any explicit hash
	$sock = $this->get_sock( $key );

	if ( !$sock ) {
		$this->_last_cmd_status = BagOStuff::ERR_UNREACHABLE;

		return false;
	}

	if ( isset( $this->stats['get'] ) ) {
		$this->stats['get']++;
	} else {
		$this->stats['get'] = 1;
	}

	$cmd = $getToken ? "gets" : "get";
	$cmd .= " $realKey\r\n";
	if ( !$this->_fwrite( $sock, $cmd ) ) {
		$this->_last_cmd_status = BagOStuff::ERR_NO_RESPONSE;

		return false;
	}

	$val = array();
	if ( !$this->_load_items( $sock, $val, $casToken ) ) {
		$this->_last_cmd_status = BagOStuff::ERR_NO_RESPONSE;
	}

	if ( $this->_debug ) {
		foreach ( $val as $k => $v ) {
			$this->_debugprint(
				sprintf( "MemCache: sock %s got %s", $this->serialize( $sock ), $k ) );
		}
	}

	$value = false;
	if ( isset( $val[$realKey] ) ) {
		$value = $val[$realKey];
	}
	return $value;
}
544
545 // }}}
546 // {{{ get_multi()
547
/**
 * Get multiple keys from the server(s).
 *
 * Keys are grouped by the socket they map to, one "get" command is written
 * per socket, and only then are the replies read back.
 *
 * @param array $keys Keys to retrieve; each is a string or array( hash, key )
 * @return array Map of key => value for the keys that were found
 */
public function get_multi( $keys ) {
	$this->_last_cmd_status = BagOStuff::ERR_NONE;

	if ( !$this->_active ) {
		$this->_last_cmd_status = BagOStuff::ERR_UNEXPECTED;

		return [];
	}

	$this->stats['get_multi'] = isset( $this->stats['get_multi'] )
		? $this->stats['get_multi'] + 1
		: 1;

	// Group the wire keys by the socket that serves them
	$keysBySockId = [];
	$sockets = [];
	foreach ( $keys as $requested ) {
		$sock = $this->get_sock( $requested );
		if ( !$sock ) {
			$this->_last_cmd_status = BagOStuff::ERR_UNREACHABLE;
			continue;
		}
		$wireKey = is_array( $requested ) ? $requested[1] : $requested;
		$sockId = intval( $sock );

		if ( !isset( $keysBySockId[$sockId] ) ) {
			$keysBySockId[$sockId] = [];
			$sockets[] = $sock;
		}
		$keysBySockId[$sockId][] = $wireKey;
	}

	// Send out the requests
	$pending = [];
	foreach ( $sockets as $sock ) {
		$request = 'get ' . implode( ' ', $keysBySockId[intval( $sock )] ) . "\r\n";

		if ( !$this->_fwrite( $sock, $request ) ) {
			$this->_last_cmd_status = BagOStuff::ERR_NO_RESPONSE;
			continue;
		}
		$pending[] = $sock;
	}

	// Parse responses
	$found = [];
	foreach ( $pending as $sock ) {
		if ( !$this->_load_items( $sock, $found ) ) {
			$this->_last_cmd_status = BagOStuff::ERR_NO_RESPONSE;
		}
	}

	if ( $this->_debug ) {
		foreach ( array_keys( $found ) as $foundKey ) {
			$this->_debugprint( sprintf( "MemCache: got %s", $foundKey ) );
		}
	}

	return $found;
}
619
620 // }}}
621 // {{{ incr()
622
/**
 * Increments $key (optionally) by $amt.
 *
 * @param string|array $key Key, or array( hash, key )
 * @param int $amt Amount to increment by (default 1)
 * @return int|null New value, or null on failure
 */
public function incr( $key, $amt = 1 ) {
	$newValue = $this->_incrdecr( 'incr', $key, $amt );

	return $newValue;
}
636
637 // }}}
638 // {{{ replace()
639
/**
 * Overwrites an existing value for key; only works if key is already set.
 *
 * @param string|array $key Key, or array( hash, key )
 * @param mixed $value Value to store
 * @param int $exp Expiry value sent with the command (default 0)
 * @return bool Result of the underlying "replace" storage command
 */
public function replace( $key, $value, $exp = 0 ) {
	$stored = $this->_set( 'replace', $key, $value, $exp );

	return $stored;
}
656
657 // }}}
658 // {{{ run_command()
659
/**
 * Passes through $cmd to the memcache server connected by $sock and
 * returns the reply lines as an array.
 *
 * Reading stops after a line starting with "END", or after an empty line
 * or failed read; that final line is included in the result.
 *
 * @param resource $sock Socket to send the command on
 * @param string $cmd Command to run
 * @return array Reply lines; empty if there is no socket or the write failed
 */
public function run_command( $sock, $cmd ) {
	if ( !$sock || !$this->_fwrite( $sock, $cmd ) ) {
		return [];
	}

	$lines = [];
	do {
		$line = $this->_fgets( $sock );
		$lines[] = $line;
		$finished = preg_match( '/^END/', $line ) || strlen( $line ) == 0;
	} while ( !$finished );

	return $lines;
}
691
692 // }}}
693 // {{{ set()
694
/**
 * Stores a value under a key using the "set" storage command.
 *
 * @param string|array $key Key, or array( hash, key )
 * @param mixed $value Value to store
 * @param int $exp Expiry value sent with the command (default 0)
 * @return bool Result of the underlying storage command
 */
public function set( $key, $value, $exp = 0 ) {
	$stored = $this->_set( 'set', $key, $value, $exp );

	return $stored;
}
712
713 // }}}
714 // {{{ cas()
715
/**
 * Stores a value under a key using the "cas" storage command, sending
 * $casToken along so the server can check it against the current value.
 *
 * @param mixed $casToken Token previously obtained through get()
 * @param string|array $key Key, or array( hash, key )
 * @param mixed $value Value to store
 * @param int $exp Expiry value sent with the command (default 0)
 * @return bool Result of the underlying storage command
 */
public function cas( $casToken, $key, $value, $exp = 0 ) {
	$stored = $this->_set( 'cas', $key, $value, $exp, $casToken );

	return $stored;
}
734
735 // }}}
736 // {{{ set_compress_threshold()
737
/**
 * Set the compression threshold.
 *
 * @param int $thresh Size in bytes from which values are compressed;
 *   0 disables compression
 */
public function set_compress_threshold( $thresh ) {
	$this->_compress_threshold = $thresh;
}
746
747 // }}}
748 // {{{ set_debug()
749
/**
 * Set the debug flag.
 *
 * @param bool $dbg When truthy, operations emit debug output via the logger
 */
public function set_debug( $dbg ) {
	$this->_debug = $dbg;
}
759
760 // }}}
761 // {{{ set_servers()
762
/**
 * Set the server list to distribute key gets and puts between.
 *
 * Resets the bucket table so it is rebuilt on the next lookup. When exactly
 * one server is configured, its host string is remembered so lookups can
 * skip hashing.
 *
 * @param array $list Entries are "ip:port" strings or array( "ip:port", weight )
 */
public function set_servers( $list ) {
	$this->_servers = $list;
	$this->_active = count( $list );
	$this->_buckets = null;
	$this->_bucketcount = 0;

	$this->_single_sock = null;
	if ( $this->_active == 1 ) {
		// Take the only entry whatever its index is, and unwrap the
		// array( host, weight ) form: _single_sock is used as a host string.
		$only = reset( $list );
		$this->_single_sock = is_array( $only ) ? $only[0] : $only;
	}
}
780
/**
 * Sets the timeout for new connections.
 *
 * The values are applied as the stream timeout when a socket is opened.
 *
 * @param int $seconds Number of seconds
 * @param int $microseconds Number of microseconds
 */
public function set_timeout( $seconds, $microseconds ) {
	$this->_timeout_microseconds = $microseconds;
	$this->_timeout_seconds = $seconds;
}
791
792 // }}}
793 // }}}
794 // {{{ private methods
795 // {{{ _close_sock()
796
/**
 * Close the specified socket and drop it from the connection cache.
 *
 * @param resource $sock Socket to close
 */
function _close_sock( $sock ) {
	// Strict search; a miss yields false, which must not be used as a key
	$host = array_search( $sock, $this->_cache_sock, true );

	if ( is_resource( $sock ) ) {
		fclose( $sock );
	}

	if ( $host !== false ) {
		unset( $this->_cache_sock[$host] );
	}
}
809
810 // }}}
811 // {{{ _connect_sock()
812
/**
 * Connects $sock to $host, retrying up to $_connect_attempts times with the
 * configured connect timeout.
 *
 * On failure the host is marked dead and an error is logged.
 *
 * @param resource|false &$sock Receives the opened socket, or false on failure
 * @param string $host Host, optionally with a port
 * @return bool True if the connection was established
 */
function _connect_sock( &$sock, $host ) {
	// -1 is the documented "no port" default of fsockopen()/pfsockopen();
	// their $port parameter is a non-nullable int, so null must not be passed.
	$port = -1;
	$hostAndPort = IPUtils::splitHostAndPort( $host );
	if ( $hostAndPort ) {
		$ip = $hostAndPort[0];
		if ( $hostAndPort[1] ) {
			$port = $hostAndPort[1];
		}
	} else {
		$ip = $host;
	}
	$sock = false;
	$timeout = $this->_connect_timeout;
	$errno = $errstr = null;
	for ( $i = 0; !$sock && $i < $this->_connect_attempts; $i++ ) {
		if ( $this->_persistent == 1 ) {
			// phpcs:ignore Generic.PHP.NoSilencedErrors.Discouraged
			$sock = @pfsockopen( $ip, $port, $errno, $errstr, $timeout );
		} else {
			// phpcs:ignore Generic.PHP.NoSilencedErrors.Discouraged
			$sock = @fsockopen( $ip, $port, $errno, $errstr, $timeout );
		}
	}
	if ( !$sock ) {
		$this->_error_log( "Error connecting to $host: $errstr" );
		$this->_dead_host( $host );
		return false;
	}

	// Initialise timeout
	stream_set_timeout( $sock, $this->_timeout_seconds, $this->_timeout_microseconds );

	// If the connection was persistent, flush the read buffer in case there
	// was a previous incomplete request on this connection
	if ( $this->_persistent ) {
		$this->_flush_read_buffer( $sock );
	}
	return true;
}
860
861 // }}}
862 // {{{ _dead_sock()
863
/**
 * Marks the host behind a cached socket as dead (see _dead_host()).
 *
 * Does nothing when the socket is not in the connection cache, because
 * there is then no host name to mark.
 *
 * @param resource $sock Socket whose host should be marked dead
 */
function _dead_sock( $sock ) {
	// Strict search; a miss yields false, which is not a host name
	$host = array_search( $sock, $this->_cache_sock, true );
	if ( $host === false ) {
		return;
	}

	$this->_dead_host( $host );
}
875
/**
 * Marks a host as dead until 30-40 seconds in the future and drops its
 * cached socket.
 *
 * The retry time is recorded both under the bare IP/host name and under
 * the full host string.
 *
 * @param string $host Host, optionally with a port
 */
function _dead_host( $host ) {
	$parts = IPUtils::splitHostAndPort( $host );
	$ip = $parts ? $parts[0] : $host;

	// Random jitter of 0-10 seconds on top of the 30 second base
	$retryAt = time() + 30 + intval( rand( 0, 10 ) );
	$this->_host_dead[$ip] = $retryAt;
	$this->_host_dead[$host] = $this->_host_dead[$ip];

	unset( $this->_cache_sock[$host] );
}
890
891 // }}}
892 // {{{ get_sock()
893
/**
 * Returns a connected socket for the server that $key maps to.
 *
 * With a single configured server that server is always used. Otherwise the
 * key's hash selects a bucket; if that host cannot be reached the hash is
 * re-derived and another bucket is tried, up to 20 times.
 *
 * @param string|array $key Key, or array( hash, key ) to supply the hash
 * @return resource|false|null Socket, or a falsy value if none is available
 */
function get_sock( $key ) {
	if ( !$this->_active ) {
		return false;
	}

	if ( $this->_single_sock !== null ) {
		return $this->sock_to_host( $this->_single_sock );
	}

	$hash = is_array( $key ) ? intval( $key[0] ) : $this->_hashfunc( $key );

	// Build the bucket table lazily: a weighted server appears once per
	// unit of weight, an unweighted one exactly once.
	if ( $this->_buckets === null ) {
		$buckets = [];
		foreach ( $this->_servers as $server ) {
			if ( !is_array( $server ) ) {
				$buckets[] = $server;
				continue;
			}
			for ( $n = 0; $n < $server[1]; $n++ ) {
				$buckets[] = $server[0];
			}
		}
		$this->_buckets = $buckets;
		$this->_bucketcount = count( $buckets );
	}

	$realkey = is_array( $key ) ? $key[1] : $key;
	$attempt = 0;
	while ( $attempt < 20 ) {
		$candidate = $this->_buckets[$hash % $this->_bucketcount];
		$sock = $this->sock_to_host( $candidate );
		if ( $sock ) {
			return $sock;
		}
		// Host unavailable: derive a new hash and try another bucket
		$hash = $this->_hashfunc( $hash . $realkey );
		$attempt++;
	}

	return false;
}
939
940 // }}}
941 // {{{ _hashfunc()
942
/**
 * Creates a hash integer based on the $key.
 *
 * @param string $key Key to hash
 * @return int Hash value in [0, 0x7fffffff]
 */
function _hashfunc( $key ) {
	# Hash function must be in [0,0x7fffffff]
	# Take the first 8 hex digits (32 bits) of the MD5 hash and mask off
	# the top bit, leaving a non-negative 31-bit value.
	$digest = md5( $key );
	$leading = substr( $digest, 0, 8 );

	return hexdec( $leading ) & 0x7fffffff;
}
957
958 // }}}
959 // {{{ _incrdecr()
960
/**
 * Perform increment/decrement on $key.
 *
 * @param string $cmd Command to perform ("incr" or "decr")
 * @param string|array $key Key, or array( hash, key )
 * @param int $amt Amount to adjust by (default 1)
 * @return int|null New value, or null on failure; the reason is recorded
 *   in $_last_cmd_status
 */
function _incrdecr( $cmd, $key, $amt = 1 ) {
	$this->_last_cmd_status = BagOStuff::ERR_NONE;

	if ( !$this->_active ) {
		$this->_last_cmd_status = BagOStuff::ERR_UNEXPECTED;

		return null;
	}

	$sock = $this->get_sock( $key );
	if ( !$sock ) {
		$this->_last_cmd_status = BagOStuff::ERR_UNREACHABLE;

		return null;
	}

	// For array( hash, key ) pairs only the second element goes on the wire
	if ( is_array( $key ) ) {
		$key = $key[1];
	}

	$this->stats[$cmd] = isset( $this->stats[$cmd] ) ? $this->stats[$cmd] + 1 : 1;

	$request = "$cmd $key $amt\r\n";
	if ( !$this->_fwrite( $sock, $request ) ) {
		$this->_last_cmd_status = BagOStuff::ERR_NO_RESPONSE;

		return null;
	}

	$reply = $this->_fgets( $sock );
	if ( $this->_debug ) {
		$this->_debugprint( "$cmd($key): $reply" );
	}

	// A successful reply starts with the new numeric value
	$digits = [];
	if ( preg_match( '/^(\d+)/', $reply, $digits ) ) {
		return (int)$digits[1];
	}

	$this->_last_cmd_status = BagOStuff::ERR_NO_RESPONSE;

	return null;
}
1013
1014 // }}}
1015 // {{{ _load_items()
1016
/**
 * Load items into $ret from $sock.
 *
 * Reads "VALUE" blocks until the terminating "END" line and only then
 * decodes them (decompress, unserialize or integer cast, depending on the
 * flags stored with each item).
 *
 * @param resource $sock Socket to read from
 * @param array &$ret Receives key => decoded value for every item read
 * @param mixed &$casToken Receives the CAS token of the last item processed
 *   (null when the reply carried none)
 * @return bool True on success, false on a read or parse error
 */
function _load_items( $sock, &$ret, &$casToken = null ) {
	$pending = [];

	for ( ; ; ) {
		$header = $this->_fgets( $sock );

		if ( $header === false ) {
			/*
			 * If nothing can be read, something is wrong because we know exactly when
			 * to stop reading (right after "END") and we return right after that.
			 */
			return false;
		}

		if ( preg_match( '/^VALUE (\S+) (\d+) (\d+)(?: (\d+))?$/', $header, $m ) ) {
			/*
			 * Collect the raw block now and decode it later: everything up to
			 * "END" must be consumed first, so the connection is never left
			 * with unread output.
			 */
			$pending[] = [
				$m[1], // rkey
				$m[2], // flags
				$m[3], // len
				$m[4] ?? null, // casToken (appears with "gets" but not "get")
				$this->_fread( $sock, $m[3] + 2 ), // data, including trailing "\r\n"
			];
			continue;
		}

		if ( $header !== "END" ) {
			$this->_handle_error( $sock, 'Error parsing response from $1' );
			return false;
		}

		foreach ( $pending as $item ) {
			[ $rkey, $flags, /* length */, $casToken, $payload ] = $item;

			if ( $payload === false || !str_ends_with( $payload, "\r\n" ) ) {
				$this->_handle_error( $sock,
					'line ending missing from data block from $1' );
				return false;
			}
			$ret[$rkey] = substr( $payload, 0, -2 );

			if ( $this->_have_zlib && ( $flags & self::COMPRESSED ) ) {
				$ret[$rkey] = gzuncompress( $ret[$rkey] );
			}

			/*
			 * Unserializing is why decoding waits until "END" has been read:
			 * it can run outside code (__wakeup() on a stored object), and
			 * if that code read from memcached before this reply was fully
			 * consumed, the two exchanges would collide.
			 */
			if ( $flags & self::SERIALIZED ) {
				$ret[$rkey] = $this->unserialize( $ret[$rkey] );
			} elseif ( $flags & self::INTVAL ) {
				$ret[$rkey] = intval( $ret[$rkey] );
			}
		}

		return true;
	}
}
1095
1096 // }}}
1097 // {{{ _set()
1098
/**
 * Performs the requested storage operation to the memcache server.
 *
 * Integers are flagged INTVAL, non-scalar values are serialized, and values
 * at or above the compression threshold are compressed when that saves
 * enough space.
 *
 * @param string $cmd Storage command ("set", "add", "replace" or "cas")
 * @param string|array $key Key, or array( hash, key )
 * @param mixed $val Value to store
 * @param int $exp Expiry value sent with the command
 * @param mixed $casToken Token appended to the command when truthy
 * @return bool True if the value was stored; for "set", a NOT_STORED reply
 *   is also treated as success
 */
function _set( $cmd, $key, $val, $exp, $casToken = null ) {
	$this->_last_cmd_status = BagOStuff::ERR_NONE;

	if ( !$this->_active ) {
		$this->_last_cmd_status = BagOStuff::ERR_UNEXPECTED;

		return false;
	}

	$sock = $this->get_sock( $key );
	if ( !$sock ) {
		$this->_last_cmd_status = BagOStuff::ERR_UNREACHABLE;

		return false;
	}

	// For array( hash, key ) pairs only the second element goes on the wire,
	// as in get(), delete(), touch() and _incrdecr(). Without this the
	// command below would be built with the literal string "Array".
	$key = is_array( $key ) ? $key[1] : $key;

	if ( isset( $this->stats[$cmd] ) ) {
		$this->stats[$cmd]++;
	} else {
		$this->stats[$cmd] = 1;
	}

	$flags = 0;

	if ( is_int( $val ) ) {
		$flags |= self::INTVAL;
	} elseif ( !is_scalar( $val ) ) {
		$val = $this->serialize( $val );
		$flags |= self::SERIALIZED;
		if ( $this->_debug ) {
			$this->_debugprint( "client: serializing data as it is not scalar" );
		}
	}

	$len = strlen( $val );

	if ( $this->_have_zlib && $this->_compress_enable
		&& $this->_compress_threshold && $len >= $this->_compress_threshold
	) {
		$c_val = gzcompress( $val, 9 );
		$c_len = strlen( $c_val );

		// Keep the compressed form only if it saves the minimum fraction
		if ( $c_len < $len * ( 1 - self::COMPRESSION_SAVINGS ) ) {
			if ( $this->_debug ) {
				$this->_debugprint( sprintf( "client: compressing data; was %d bytes is now %d bytes", $len, $c_len ) );
			}
			$val = $c_val;
			$len = $c_len;
			$flags |= self::COMPRESSED;
		}
	}

	$command = "$cmd $key $flags $exp $len";
	if ( $casToken ) {
		$command .= " $casToken";
	}

	if ( !$this->_fwrite( $sock, "$command\r\n$val\r\n" ) ) {
		$this->_last_cmd_status = BagOStuff::ERR_NO_RESPONSE;

		return false;
	}

	$line = $this->_fgets( $sock );
	if ( $this->_debug ) {
		$this->_debugprint( sprintf( "%s %s (%s)", $cmd, $key, $line ) );
	}

	if ( $line === "STORED" ) {
		return true;
	} elseif ( $line === "NOT_STORED" && $cmd === "set" ) {
		// "Not stored" is always used as the mcrouter response with AllAsyncRoute
		return true;
	}

	if ( $line === false ) {
		$this->_last_cmd_status = BagOStuff::ERR_NO_RESPONSE;
	}

	return false;
}
1196
1197 // }}}
1198 // {{{ sock_to_host()
1199
/**
 * Returns the socket for the host, connecting if necessary.
 *
 * @param string $host Host, optionally with a port
 * @return resource|null Connected socket, or null if the host is currently
 *   marked dead or the connection attempt failed
 */
function sock_to_host( $host ) {
	// Reuse an open connection when there is one
	if ( isset( $this->_cache_sock[$host] ) ) {
		return $this->_cache_sock[$host];
	}

	$parts = IPUtils::splitHostAndPort( $host );
	$ip = $parts ? $parts[0] : $host;

	// Skip hosts still inside their dead period, whether they were recorded
	// under the full host string or the bare IP.
	$now = time();
	foreach ( [ $host, $ip ] as $name ) {
		if ( isset( $this->_host_dead[$name] ) && $this->_host_dead[$name] > $now ) {
			return null;
		}
	}

	$sock = null;
	if ( !$this->_connect_sock( $sock, $host ) ) {
		return null;
	}

	// Do not buffer writes
	stream_set_write_buffer( $sock, 0 );

	$this->_cache_sock[$host] = $sock;

	return $sock;
}
1238
/**
 * Sends a message to the logger at debug level.
 *
 * @param string $text Message to log
 */
function _debugprint( $text ) {
	$logger = $this->_logger;
	$logger->debug( $text );
}
1245
/**
 * Sends a message to the logger at error level, prefixed with
 * "Memcached error: ".
 *
 * @param string $text Message to log
 */
function _error_log( $text ) {
	$message = "Memcached error: $text";
	$this->_logger->error( $message );
}
1252
/**
 * Write to a stream, looping until the whole buffer has been sent.
 *
 * On a timeout or write error the socket's host is marked dead through
 * _handle_error().
 *
 * @param resource $sock Stream to write to
 * @param string $buf Data to write
 * @return bool True once every byte has been written, false on failure
 */
function _fwrite( $sock, $buf ) {
	$bytesWritten = 0;
	$bufSize = strlen( $buf );
	while ( $bytesWritten < $bufSize ) {
		// After a partial write, send only what is still outstanding;
		// resending the whole buffer would duplicate bytes already sent.
		$chunk = $bytesWritten === 0 ? $buf : substr( $buf, $bytesWritten );
		$result = fwrite( $sock, $chunk );
		$data = stream_get_meta_data( $sock );
		if ( $data['timed_out'] ) {
			$this->_handle_error( $sock, 'timeout writing to $1' );
			return false;
		}
		// Contrary to the documentation, fwrite() returns zero on error in PHP 5.3.
		if ( $result === false || $result === 0 ) {
			$this->_handle_error( $sock, 'error writing to $1' );
			return false;
		}
		$bytesWritten += $result;
	}

	return true;
}
1280
/**
 * Handle an I/O error: log it and mark the socket's host as dead.
 *
 * @param resource $sock Socket the error occurred on
 * @param string $msg Message in which the placeholder $1 is replaced by
 *   the peer name
 */
function _handle_error( $sock, $msg ) {
	// Prefer the peer name reported by the stream itself; fall back to the
	// host the socket is cached under, then to a fixed marker.
	$peer = stream_socket_get_name( $sock, true );
	if ( strval( $peer ) === '' ) {
		$cachedHost = array_search( $sock, $this->_cache_sock );
		$peer = ( $cachedHost === false ) ? '[unknown host]' : $cachedHost;
	}

	$text = str_replace( '$1', $peer, $msg );
	$this->_error_log( "$text" );
	$this->_dead_sock( $sock );
}
1299
/**
 * Read the specified number of bytes from a stream.
 *
 * On a timeout, read error or premature end of stream, the socket's host
 * is marked dead through _handle_error().
 *
 * @param resource $sock Stream to read from
 * @param int $len Number of bytes to read
 * @return string|false The bytes read, or false on failure
 */
function _fread( $sock, $len ) {
	$collected = '';
	$remaining = $len;

	while ( $remaining > 0 ) {
		$piece = fread( $sock, $remaining );
		$meta = stream_get_meta_data( $sock );

		// Decide which failure occurred, if any; timeouts take precedence
		$failure = null;
		if ( $meta['timed_out'] ) {
			$failure = 'timeout reading from $1';
		} elseif ( $piece === false ) {
			$failure = 'error reading buffer from $1';
		} elseif ( $piece === '' ) {
			// This will happen if the remote end of the socket is shut down
			$failure = 'unexpected end of file reading from $1';
		}

		if ( $failure !== null ) {
			$this->_handle_error( $sock, $failure );
			return false;
		}

		$remaining -= strlen( $piece );
		$collected .= $piece;
	}

	return $collected;
}
1331
/**
 * Read a line from a stream and strip its line ending.
 *
 * On a timeout, read error or missing line ending, the socket's host is
 * marked dead through _handle_error().
 *
 * @param resource $sock Stream to read from
 * @return string|false The line without its trailing "\r\n" or "\n", or
 *   false on failure
 */
function _fgets( $sock ) {
	$line = fgets( $sock );

	// fgets() may return a partial line if there is a select timeout after
	// a successful recv(), so we have to check for a timeout even if we
	// got a string response.
	$meta = stream_get_meta_data( $sock );
	if ( $meta['timed_out'] ) {
		$this->_handle_error( $sock, 'timeout reading line from $1' );
		return false;
	}
	if ( $line === false ) {
		$this->_handle_error( $sock, 'error reading line from $1' );
		return false;
	}

	// Try the two-byte ending first so "\r\n" is removed as a whole
	foreach ( [ "\r\n", "\n" ] as $eol ) {
		if ( str_ends_with( $line, $eol ) ) {
			return substr( $line, 0, -strlen( $eol ) );
		}
	}

	$this->_handle_error( $sock, 'line ending missing in response from $1' );
	return false;
}
1363
/**
 * Flush the read buffer of a stream.
 *
 * Reads and discards data for as long as a zero-timeout select reports the
 * stream as readable and it is not at end of file.
 *
 * @param resource $f Stream to drain
 */
function _flush_read_buffer( $f ) {
	if ( !$f ) {
		return;
	}

	while ( true ) {
		// stream_select() modifies its arguments, so rebuild them each pass
		$readable = [ $f ];
		$writable = null;
		$except = null;
		$ready = stream_select( $readable, $writable, $except, 0, 0 );

		if ( $ready != 1 || feof( $f ) ) {
			break;
		}

		fread( $f, 1024 );
	}
}
1384
1385 // }}}
1386 // }}}
1387 // }}}
1388}
memcached client class implemented using (p)fsockopen()
set_debug( $dbg)
Set the debug flag.
array $_cache_sock
Cached Sockets that are connected.
const SERIALIZED
Flag: indicates data is serialized.
const COMPRESSION_SAVINGS
Minimum savings to store data compressed.
_hashfunc( $key)
Creates a hash integer based on the $key.
set_compress_threshold( $thresh)
Set the compression threshold.
_fread( $sock, $len)
Read the specified number of bytes from a stream.
array $_servers
Array containing ip:port or array(ip:port, weight)
bool $_compress_enable
Do we want to use compression?
decr( $key, $amt=1)
Decrease a value stored on the memcache server.
const INTVAL
Flag: indicates data is an integer.
int $_compress_threshold
At how many bytes should we compress?
int $_timeout_seconds
Stream timeout in seconds.
disconnect_all()
Disconnects all connected sockets.
cas( $casToken, $key, $value, $exp=0)
Sets a key to a given value in the memcache if the current value still corresponds to a known,...
array $_buckets
Our bit buckets.
set_timeout( $seconds, $microseconds)
Sets the timeout for new connections.
_fgets( $sock)
Read a line from a stream.
incr( $key, $amt=1)
Increments $key (optionally) by $amt.
forget_dead_hosts()
Forget about all of the dead hosts.
_incrdecr( $cmd, $key, $amt=1)
Perform increment/decrement on $key.
$_connect_attempts
Number of connection attempts for each server.
add( $key, $val, $exp=0)
Adds a key/value to the memcache server if one isn't already set with that key.
_handle_error( $sock, $msg)
Handle an I/O error.
sock_to_host( $host)
Returns the socket for the host.
int $_last_cmd_status
BagOStuff::ERR_* constant of the last cache command.
$_connect_timeout
Connect timeout in seconds.
touch( $key, $time=0)
Changes the TTL on a key from the server to $time.
bool $_have_zlib
Is compression available?
int $_bucketcount
Total # of bit buckets we have.
_fwrite( $sock, $buf)
Write to a stream.
bool $_debug
Current debug status; 0 - none to 9 - profiling.
replace( $key, $value, $exp=0)
Overwrites an existing value for key; only works if key is already set.
set_servers( $list)
Set the server list to distribute key gets and puts between.
_load_items( $sock, &$ret, &$casToken=null)
Load items into $ret from $sock.
string $_single_sock
If only using one server; contains ip:port to connect to.
get_multi( $keys)
Get multiple keys from the server(s)
_close_sock( $sock)
Close the specified socket.
run_command( $sock, $cmd)
Passes through $cmd to the memcache server connected by $sock; returns output as an array (null array...
_set( $cmd, $key, $val, $exp, $casToken=null)
Performs the requested storage operation to the memcache server.
int $_timeout_microseconds
Stream timeout in microseconds.
__construct( $args)
Memcache initializer.
_flush_read_buffer( $f)
Flush the read buffer of a stream.
enable_compress( $enable)
Enable / Disable compression.
array $_host_dead
Dead hosts, assoc array, 'host'=>'unixtime when ok to check again'.
const COMPRESSED
Flag: indicates data is compressed.
get_sock( $key)
Returns a connected socket for the server that $key hashes to, or false if none is available.
_dead_sock( $sock)
Marks a host as dead until 30-40 seconds in the future.
_connect_sock(&$sock, $host)
Connects $sock to $host, timing out after $timeout.
array $stats
Command statistics.
bool $_persistent
Are we using persistent links?
Abstract class for any ephemeral data store.
Definition BagOStuff.php:73