MediaWiki REL1_37
SqlBagOStuff.php
Go to the documentation of this file.
1<?php
24use Wikimedia\AtEase\AtEase;
25use Wikimedia\ObjectFactory;
34use Wikimedia\ScopedCallback;
35use Wikimedia\Timestamp\ConvertibleTimestamp;
36use Wikimedia\WaitConditionLoop;
37
54 protected $localKeyLb;
56 protected $globalKeyLb;
57
59 protected $serverInfos = [];
61 protected $serverTags = [];
63 protected $lastGarbageCollect = 0;
65 protected $purgePeriod = 10;
67 protected $purgeLimit = 100;
69 protected $numTableShards = 1;
71 protected $writeBatchSize = 100;
73 protected $tableName = 'objectcache';
75 protected $replicaOnly;
78
80 protected $conns;
82 protected $connFailureTimes = [];
84 protected $connFailureErrors = [];
85
86 private const SHARD_LOCAL = 'local';
87 private const SHARD_GLOBAL = 'global';
88
90 private const SAFE_CLOCK_BOUND_SEC = 15;
92 private const SAFE_PURGE_DELAY_SEC = 3600;
94 private const TOMB_SERIAL = '';
98 private const GC_DELAY_SEC = 1;
99
100 private const BLOB_VALUE = 0;
101 private const BLOB_EXPIRY = 1;
102 private const BLOB_CASTOKEN = 2;
103
110 private const INF_TIMESTAMP_PLACEHOLDER = '99991231235959';
111
159 public function __construct( $params ) {
160 parent::__construct( $params );
161
162 $dbType = null;
163 if ( isset( $params['servers'] ) || isset( $params['server'] ) ) {
164 // Configuration uses a direct list of servers.
165 // Object data is horizontally partitioned via key hash.
166 $index = 0;
167 foreach ( ( $params['servers'] ?? [ $params['server'] ] ) as $tag => $info ) {
168 $this->serverInfos[$index] = $info;
169 // Allow integer-indexes arrays for b/c
170 $this->serverTags[$index] = is_string( $tag ) ? $tag : "#$index";
171 $dbType = $info['type'];
172 ++$index;
173 }
174 } else {
175 // Configuration uses the servers defined in LoadBalancer instances.
176 // Object data is vertically partitioned via global vs local keys.
177 if ( isset( $params['globalKeyLB'] ) ) {
178 $this->globalKeyLb = ( $params['globalKeyLB'] instanceof ILoadBalancer )
179 ? $params['globalKeyLB']
180 : ObjectFactory::getObjectFromSpec( $params['globalKeyLB'] );
181 }
182 if ( isset( $params['localKeyLB'] ) ) {
183 $this->localKeyLb = ( $params['localKeyLB'] instanceof ILoadBalancer )
184 ? $params['localKeyLB']
185 : ObjectFactory::getObjectFromSpec( $params['localKeyLB'] );
186 } else {
187 $this->localKeyLb = $this->globalKeyLb;
188 }
189 // When using LoadBalancer instances, one *must* be defined for local keys
190 if ( !$this->localKeyLb ) {
191 throw new InvalidArgumentException(
192 "Config requires 'server', 'servers', or 'localKeyLB'/'globalKeyLB'"
193 );
194 }
195 }
196
197 $this->purgePeriod = intval( $params['purgePeriod'] ?? $this->purgePeriod );
198 $this->purgeLimit = intval( $params['purgeLimit'] ?? $this->purgeLimit );
199 $this->tableName = $params['tableName'] ?? $this->tableName;
200 $this->numTableShards = intval( $params['shards'] ?? $this->numTableShards );
201 $this->writeBatchSize = intval( $params['writeBatchSize'] ?? $this->writeBatchSize );
202 $this->replicaOnly = $params['replicaOnly'] ?? false;
203
204 if ( $params['multiPrimaryMode'] ?? false ) {
205 if ( $dbType !== 'mysql' ) {
206 throw new InvalidArgumentException( "Multi-primary mode only supports MySQL" );
207 }
208
209 $this->multiPrimaryModeType = $dbType;
210 }
211
214 }
215
216 protected function doGet( $key, $flags = 0, &$casToken = null ) {
217 $getToken = ( $casToken === self::PASS_BY_REF );
218 $casToken = null;
219
220 $data = $this->fetchBlobs( [ $key ], $getToken )[$key];
221 if ( $data ) {
222 $result = $this->unserialize( $data[self::BLOB_VALUE] );
223 if ( $getToken && $result !== false ) {
224 $casToken = $data[self::BLOB_CASTOKEN];
225 }
226 $valueSize = strlen( $data[self::BLOB_VALUE] );
227 } else {
228 $result = false;
229 $valueSize = false;
230 }
231
232 $this->updateOpStats( self::METRIC_OP_GET, [ $key => [ null, $valueSize ] ] );
233
234 return $result;
235 }
236
237 protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
238 $mtime = $this->getCurrentTime();
239
240 return $this->modifyBlobs(
241 [ $this, 'modifyTableSpecificBlobsForSet' ],
242 $mtime,
243 [ $key => [ $value, $exptime ] ],
244 $flags
245 );
246 }
247
248 protected function doDelete( $key, $flags = 0 ) {
249 $mtime = $this->getCurrentTime();
250
251 return $this->modifyBlobs(
252 [ $this, 'modifyTableSpecificBlobsForDelete' ],
253 $mtime,
254 [ $key => [] ],
255 $flags
256 );
257 }
258
259 protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
260 $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
261 if ( $mtime === null ) {
262 // Timeout or I/O error during lock acquisition
263 return false;
264 }
265
266 return $this->modifyBlobs(
267 [ $this, 'modifyTableSpecificBlobsForAdd' ],
268 $mtime,
269 [ $key => [ $value, $exptime ] ],
270 $flags
271 );
272 }
273
274 protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
275 $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
276 if ( $mtime === null ) {
277 // Timeout or I/O error during lock acquisition
278 return false;
279 }
280
281 return $this->modifyBlobs(
282 [ $this, 'modifyTableSpecificBlobsForCas' ],
283 $mtime,
284 [ $key => [ $value, $exptime, $casToken ] ],
285 $flags
286 );
287 }
288
289 protected function doChangeTTL( $key, $exptime, $flags ) {
290 $mtime = $this->getCurrentTime();
291
292 return $this->modifyBlobs(
293 [ $this, 'modifyTableSpecificBlobsForChangeTTL' ],
294 $mtime,
295 [ $key => [ $exptime ] ],
296 $flags
297 );
298 }
299
300 public function incrWithInit( $key, $exptime, $value = 1, $init = null, $flags = 0 ) {
301 $value = (int)$value;
302 $init = is_int( $init ) ? $init : $value;
303
304 $mtime = $this->getCurrentTime();
305
306 $result = $this->modifyBlobs(
307 [ $this, 'modifyTableSpecificBlobsForIncrInit' ],
308 $mtime,
309 [ $key => [ $value, $init, $exptime ] ],
310 $flags,
311 $resByKey
312 ) ? $resByKey[$key] : false;
313
314 return $result;
315 }
316
317 public function incr( $key, $value = 1, $flags = 0 ) {
318 return $this->doIncr( $key, $value, $flags );
319 }
320
321 public function decr( $key, $value = 1, $flags = 0 ) {
322 return $this->doIncr( $key, -$value, $flags );
323 }
324
325 private function doIncr( $key, $value = 1, $flags = 0 ) {
326 $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
327 if ( $mtime === null ) {
328 // Timeout or I/O error during lock acquisition
329 return false;
330 }
331
332 $data = $this->fetchBlobs( [ $key ] )[$key];
333 if ( $data ) {
334 $serialValue = $data[self::BLOB_VALUE];
335 if ( $this->isInteger( $serialValue ) ) {
336 $newValue = max( (int)$serialValue + (int)$value, 0 );
337 $result = $this->modifyBlobs(
338 [ $this, 'modifyTableSpecificBlobsForSet' ],
339 $mtime,
340 // Preserve the old expiry timestamp
341 [ $key => [ $newValue, $data[self::BLOB_EXPIRY] ] ],
342 $flags
343 ) ? $newValue : false;
344 } else {
345 $result = false;
346 $this->logger->warning( __METHOD__ . ": $key is a non-integer" );
347 }
348 } else {
349 $result = false;
350 $this->logger->debug( __METHOD__ . ": $key does not exists" );
351 }
352
353 return $result;
354 }
355
356 protected function doGetMulti( array $keys, $flags = 0 ) {
357 $result = [];
358 $valueSizeByKey = [];
359
360 $dataByKey = $this->fetchBlobs( $keys );
361 foreach ( $keys as $key ) {
362 $data = $dataByKey[$key];
363 if ( $data ) {
364 $serialValue = $data[self::BLOB_VALUE];
365 $value = $this->unserialize( $serialValue );
366 if ( $value !== false ) {
367 $result[$key] = $value;
368 }
369 $valueSize = strlen( $serialValue );
370 } else {
371 $valueSize = false;
372 }
373 $valueSizeByKey[$key] = [ null, $valueSize ];
374 }
375
376 $this->updateOpStats( self::METRIC_OP_GET, $valueSizeByKey );
377
378 return $result;
379 }
380
381 protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
382 $mtime = $this->getCurrentTime();
383
384 return $this->modifyBlobs(
385 [ $this, 'modifyTableSpecificBlobsForSet' ],
386 $mtime,
387 array_map(
388 static function ( $value ) use ( $exptime ) {
389 return [ $value, $exptime ];
390 },
391 $data
392 ),
393 $flags
394 );
395 }
396
397 protected function doDeleteMulti( array $keys, $flags = 0 ) {
398 $mtime = $this->getCurrentTime();
399
400 return $this->modifyBlobs(
401 [ $this, 'modifyTableSpecificBlobsForDelete' ],
402 $mtime,
403 array_fill_keys( $keys, [] ),
404 $flags
405 );
406 }
407
408 public function doChangeTTLMulti( array $keys, $exptime, $flags = 0 ) {
409 $mtime = $this->getCurrentTime();
410
411 return $this->modifyBlobs(
412 [ $this, 'modifyTableSpecificBlobsForChangeTTL' ],
413 $mtime,
414 array_fill_keys( $keys, [ $exptime ] ),
415 $flags
416 );
417 }
418
427 private function getConnection( $shardIndex ) {
428 // Don't keep timing out trying to connect if the server is down
429 if (
430 isset( $this->connFailureErrors[$shardIndex] ) &&
431 ( $this->getCurrentTime() - $this->connFailureTimes[$shardIndex] ) < 60
432 ) {
433 throw $this->connFailureErrors[$shardIndex];
434 }
435
436 if ( $shardIndex === self::SHARD_LOCAL ) {
437 $conn = $this->getConnectionViaLoadBalancer( $shardIndex );
438 } elseif ( $shardIndex === self::SHARD_GLOBAL ) {
439 $conn = $this->getConnectionViaLoadBalancer( $shardIndex );
440 } elseif ( is_int( $shardIndex ) ) {
441 if ( isset( $this->serverInfos[$shardIndex] ) ) {
442 $server = $this->serverInfos[$shardIndex];
443 $conn = $this->getConnectionFromServerInfo( $shardIndex, $server );
444 } else {
445 throw new UnexpectedValueException( "Invalid server index #$shardIndex" );
446 }
447 } else {
448 throw new UnexpectedValueException( "Invalid server index '$shardIndex'" );
449 }
450
451 return $conn;
452 }
453
459 private function getKeyLocation( $key ) {
460 if ( $this->serverTags ) {
461 // Striped array of database servers
462 if ( count( $this->serverTags ) == 1 ) {
463 $shardIndex = 0; // short-circuit
464 } else {
465 $sortedServers = $this->serverTags;
466 ArrayUtils::consistentHashSort( $sortedServers, $key );
467 reset( $sortedServers );
468 $shardIndex = key( $sortedServers );
469 }
470 } else {
471 // LoadBalancer based configuration
472 $shardIndex = ( strpos( $key, 'global:' ) === 0 && $this->globalKeyLb )
473 ? self::SHARD_GLOBAL
475 }
476
477 if ( $this->numTableShards > 1 ) {
478 $hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
479 $tableIndex = $hash % $this->numTableShards;
480 } else {
481 $tableIndex = null;
482 }
483
484 return [ $shardIndex, $this->getTableNameByShard( $tableIndex ) ];
485 }
486
492 private function getTableNameByShard( $index ) {
493 if ( $index !== null && $this->numTableShards > 1 ) {
494 $decimals = strlen( $this->numTableShards - 1 );
495
496 return $this->tableName . sprintf( "%0{$decimals}d", $index );
497 }
498
499 return $this->tableName;
500 }
501
507 private function fetchBlobs( array $keys, bool $getCasToken = false ) {
509 $silenceScope = $this->silenceTransactionProfiler();
510
511 // Initialize order-preserved per-key results; set values for live keys below
512 $dataByKey = array_fill_keys( $keys, null );
513
514 $readTime = (int)$this->getCurrentTime();
515 $keysByTableByShard = [];
516 foreach ( $keys as $key ) {
517 list( $shardIndex, $partitionTable ) = $this->getKeyLocation( $key );
518 $keysByTableByShard[$shardIndex][$partitionTable][] = $key;
519 }
520
521 foreach ( $keysByTableByShard as $shardIndex => $serverKeys ) {
522 try {
523 $db = $this->getConnection( $shardIndex );
524 foreach ( $serverKeys as $partitionTable => $tableKeys ) {
525 $res = $db->select(
526 $partitionTable,
527 $getCasToken
528 ? $this->addCasTokenFields( $db, [ 'keyname', 'value', 'exptime' ] )
529 : [ 'keyname', 'value', 'exptime' ],
530 $this->buildExistenceConditions( $db, $tableKeys, $readTime ),
531 __METHOD__
532 );
533 foreach ( $res as $row ) {
534 $row->shardIndex = $shardIndex;
535 $row->tableName = $partitionTable;
536 $dataByKey[$row->keyname] = $row;
537 }
538 }
539 } catch ( DBError $e ) {
540 $this->handleDBError( $e, $shardIndex );
541 }
542 }
543
544 foreach ( $keys as $key ) {
545 $row = $dataByKey[$key] ?? null;
546 if ( !$row ) {
547 continue;
548 }
549
550 $this->debug( __METHOD__ . ": retrieved $key; expiry time is {$row->exptime}" );
551 try {
552 $db = $this->getConnection( $row->shardIndex );
553 $dataByKey[$key] = [
554 self::BLOB_VALUE => $this->dbDecodeSerialValue( $db, $row->value ),
555 self::BLOB_EXPIRY => $this->decodeDbExpiry( $db, $row->exptime ),
556 self::BLOB_CASTOKEN => $getCasToken
557 ? $this->getCasTokenFromRow( $db, $row )
558 : null
559 ];
560 } catch ( DBQueryError $e ) {
561 $this->handleDBError( $e, $row->shardIndex );
562 }
563 }
564
565 return $dataByKey;
566 }
567
582 private function modifyBlobs(
583 callable $tableWriteCallback,
584 float $mtime,
585 array $argsByKey,
586 int $flags,
587 &$resByKey = []
588 ) {
589 // Initialize order-preserved per-key results; callbacks mark successful results
590 $resByKey = array_fill_keys( array_keys( $argsByKey ), false );
591
593 $silenceScope = $this->silenceTransactionProfiler();
594
595 $argsByKeyByTableByShard = [];
596 foreach ( $argsByKey as $key => $args ) {
597 list( $shardIndex, $partitionTable ) = $this->getKeyLocation( $key );
598 $argsByKeyByTableByShard[$shardIndex][$partitionTable][$key] = $args;
599 }
600
601 $shardIndexesAffected = [];
602 foreach ( $argsByKeyByTableByShard as $shardIndex => $argsByKeyByTables ) {
603 foreach ( $argsByKeyByTables as $table => $ptKeyArgs ) {
604 try {
605 $db = $this->getConnection( $shardIndex );
606 $shardIndexesAffected[] = $shardIndex;
607 $tableWriteCallback( $db, $table, $mtime, $ptKeyArgs, $resByKey );
608 } catch ( DBError $e ) {
609 $this->handleDBError( $e, $shardIndex );
610 continue;
611 }
612 }
613 }
614
615 $success = !in_array( false, $resByKey, true );
616
617 if ( $this->fieldHasFlags( $flags, self::WRITE_SYNC ) ) {
618 foreach ( $shardIndexesAffected as $shardIndex ) {
619 if ( !$this->waitForReplication( $shardIndex ) ) {
620 $success = false;
621 }
622 }
623 }
624
625 foreach ( $shardIndexesAffected as $shardIndex ) {
626 try {
627 $db = $this->getConnection( $shardIndex );
628 $this->occasionallyGarbageCollect( $db );
629 } catch ( DBError $e ) {
630 $this->handleDBError( $e, $shardIndex );
631 }
632 }
633
634 return $success;
635 }
636
653 IDatabase $db,
654 string $ptable,
655 float $mtime,
656 array $argsByKey,
657 array &$resByKey
658 ) {
659 $valueSizesByKey = [];
660
661 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
662
663 if ( $this->multiPrimaryModeType !== null ) {
664 // @TODO: use multi-row upsert() with VALUES() once supported in Database
665 foreach ( $argsByKey as $key => list( $value, $exptime ) ) {
666 $serialValue = $this->getSerialized( $value, $key );
667 $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
668 $db->upsert(
669 $ptable,
670 $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ),
671 [ [ 'keyname' ] ],
672 $this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ),
673 __METHOD__
674 );
675 $resByKey[$key] = true;
676
677 $valueSizesByKey[$key] = [ strlen( $serialValue ), null ];
678 }
679 } else {
680 // T288998: use REPLACE, if possible, to avoid cluttering the binlogs
681 $rows = [];
682 foreach ( $argsByKey as $key => list( $value, $exptime ) ) {
683 $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
684 $serialValue = $this->getSerialized( $value, $key );
685 $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
686
687 $valueSizesByKey[$key] = [ strlen( $serialValue ), null ];
688 }
689 $db->replace( $ptable, 'keyname', $rows, __METHOD__ );
690 foreach ( $argsByKey as $key => $unused ) {
691 $resByKey[$key] = true;
692 }
693 }
694
695 $this->updateOpStats( self::METRIC_OP_SET, $valueSizesByKey );
696 }
697
715 IDatabase $db,
716 string $ptable,
717 float $mtime,
718 array $argsByKey,
719 array &$resByKey
720 ) {
721 if ( $this->isMultiPrimaryModeEnabled() ) {
722 // Tombstone keys in order to respect eventual consistency
723 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
724 $expiry = $this->makeNewKeyExpiry( self::TOMB_EXPTIME, (int)$mtime );
725 $rows = [];
726 foreach ( $argsByKey as $key => $arg ) {
727 $rows[] = $this->buildUpsertRow( $db, $key, self::TOMB_SERIAL, $expiry, $mt );
728 }
729 $db->upsert(
730 $ptable,
731 $rows,
732 [ [ 'keyname' ] ],
733 $this->buildUpsertSetForOverwrite( $db, self::TOMB_SERIAL, $expiry, $mt ),
734 __METHOD__
735 );
736 } else {
737 // Just purge the keys since there is only one primary (e.g. "source of truth")
738 $db->delete( $ptable, [ 'keyname' => array_keys( $argsByKey ) ], __METHOD__ );
739 }
740
741 foreach ( $argsByKey as $key => $arg ) {
742 $resByKey[$key] = true;
743 }
744
745 $this->updateOpStats( self::METRIC_OP_DELETE, array_keys( $argsByKey ) );
746 }
747
769 IDatabase $db,
770 string $ptable,
771 float $mtime,
772 array $argsByKey,
773 array &$resByKey
774 ) {
775 $valueSizesByKey = [];
776
777 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
778
779 // This check must happen outside the write query to respect eventual consistency
780 $existingKeys = $db->selectFieldValues(
781 $ptable,
782 'keyname',
783 $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ),
784 __METHOD__
785 );
786 $existingByKey = array_fill_keys( $existingKeys, true );
787
788 // @TODO: use multi-row upsert() with VALUES() once supported in Database
789 foreach ( $argsByKey as $key => list( $value, $exptime ) ) {
790 if ( isset( $existingByKey[$key] ) ) {
791 $this->logger->debug( __METHOD__ . ": $key already exists" );
792 continue;
793 }
794
795 $serialValue = $this->getSerialized( $value, $key );
796 $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
797 $db->upsert(
798 $ptable,
799 $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ),
800 [ [ 'keyname' ] ],
801 $this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ),
802 __METHOD__
803 );
804 $resByKey[$key] = true;
805
806 $valueSizesByKey[$key] = [ strlen( $serialValue ), null ];
807 }
808
809 $this->updateOpStats( self::METRIC_OP_ADD, $valueSizesByKey );
810 }
811
833 IDatabase $db,
834 string $ptable,
835 float $mtime,
836 array $argsByKey,
837 array &$resByKey
838 ) {
839 $valueSizesByKey = [];
840
841 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
842
843 // This check must happen outside the write query to respect eventual consistency
844 $res = $db->select(
845 $ptable,
846 $this->addCasTokenFields( $db, [ 'keyname' ] ),
847 $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ),
848 __METHOD__
849 );
850
851 $curTokensByKey = [];
852 foreach ( $res as $row ) {
853 $curTokensByKey[$row->keyname] = $this->getCasTokenFromRow( $db, $row );
854 }
855
856 // @TODO: use multi-row upsert() with VALUES() once supported in Database
857 foreach ( $argsByKey as $key => list( $value, $exptime, $casToken ) ) {
858 $curToken = $curTokensByKey[$key] ?? null;
859 if ( $curToken === null ) {
860 $this->logger->debug( __METHOD__ . ": $key does not exists" );
861 continue;
862 }
863
864 if ( $curToken !== $casToken ) {
865 $this->logger->debug( __METHOD__ . ": $key does not have a matching token" );
866 continue;
867 }
868
869 $serialValue = $this->getSerialized( $value, $key );
870 $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
871 $db->upsert(
872 $ptable,
873 $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ),
874 [ [ 'keyname' ] ],
875 $this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ),
876 __METHOD__
877 );
878 $resByKey[$key] = true;
879
880 $valueSizesByKey[$key] = [ strlen( $serialValue ), null ];
881 }
882
883 $this->updateOpStats( self::METRIC_OP_CAS, $valueSizesByKey );
884 }
885
906 IDatabase $db,
907 string $ptable,
908 float $mtime,
909 array $argsByKey,
910 array &$resByKey
911 ) {
912 if ( $this->isMultiPrimaryModeEnabled() ) {
913 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
914
915 $res = $db->select(
916 $ptable,
917 [ 'keyname', 'value' ],
918 $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ),
919 __METHOD__
920 );
921 // @TODO: use multi-row upsert() with VALUES() once supported in Database
922 foreach ( $res as $curRow ) {
923 $key = $curRow->keyname;
924 $serialValue = $this->dbDecodeSerialValue( $db, $curRow->value );
925 list( $exptime ) = $argsByKey[$key];
926 $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
927
928 $db->upsert(
929 $ptable,
930 $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ),
931 [ [ 'keyname' ] ],
932 $this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ),
933 __METHOD__
934 );
935 $resByKey[$key] = true;
936 }
937 } else {
938 $keysBatchesByExpiry = [];
939 foreach ( $argsByKey as $key => list( $exptime ) ) {
940 $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
941 $keysBatchesByExpiry[$expiry][] = $key;
942 }
943
944 $existingCount = 0;
945 foreach ( $keysBatchesByExpiry as $expiry => $keyBatch ) {
946 $db->update(
947 $ptable,
948 [ 'exptime' => $this->encodeDbExpiry( $db, $expiry ) ],
949 $this->buildExistenceConditions( $db, $keyBatch, (int)$mtime ),
950 __METHOD__
951 );
952 $existingCount += $db->affectedRows();
953 }
954 if ( $existingCount === count( $argsByKey ) ) {
955 foreach ( $argsByKey as $key => $args ) {
956 $resByKey[$key] = true;
957 }
958 }
959 }
960
961 $this->updateOpStats( self::METRIC_OP_CHANGE_TTL, array_keys( $argsByKey ) );
962 }
963
984 IDatabase $db,
985 string $ptable,
986 float $mtime,
987 array $argsByKey,
988 array &$resByKey
989 ) {
990 foreach ( $argsByKey as $key => list( $step, $init, $exptime ) ) {
991 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
992 $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
993
994 // Use a transaction so that changes from other threads are not visible due to
995 // "consistent reads". This way, the exact post-increment value can be returned.
996 // The "live key exists" check can go inside the write query and remain safe for
997 // replication since the TTL for such keys is either indefinite or very short.
998 $db->startAtomic( __METHOD__ );
999 $db->upsert(
1000 $ptable,
1001 $this->buildUpsertRow( $db, $key, $init, $expiry, $mt ),
1002 [ [ 'keyname' ] ],
1003 $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $mt, (int)$mtime ),
1004 __METHOD__
1005 );
1006 $affectedCount = $db->affectedRows();
1007 $row = $db->selectRow( $ptable, 'value', [ 'keyname' => $key ], __METHOD__ );
1008 $db->endAtomic( __METHOD__ );
1009
1010 if ( !$affectedCount || $row === false ) {
1011 $this->logger->warning( __METHOD__ . ": failed to set new $key value" );
1012 continue;
1013 }
1014
1015 $serialValue = $this->dbDecodeSerialValue( $db, $row->value );
1016 if ( !$this->isInteger( $serialValue ) ) {
1017 $this->logger->warning( __METHOD__ . ": got non-integer $key value" );
1018 continue;
1019 }
1020
1021 $resByKey[$key] = (int)$serialValue;
1022 }
1023
1024 $this->updateOpStats( self::METRIC_OP_INCR, array_keys( $argsByKey ) );
1025 }
1026
1032 private function makeNewKeyExpiry( $exptime, int $nowTsUnix ) {
1033 $expiry = $this->getExpirationAsTimestamp( $exptime );
1034 // Eventual consistency requires the preservation of recently modified keys.
1035 // Do not create rows with `exptime` fields so low that they might get garbage
1036 // collected before being replicated.
1037 if ( $expiry !== self::TTL_INDEFINITE ) {
1038 $expiry = max( $expiry, $nowTsUnix - self::SAFE_CLOCK_BOUND_SEC );
1039 }
1040
1041 return $expiry;
1042 }
1043
1062 private function newLockingWriteSectionModificationTimestamp( $key, &$scope ) {
1063 if ( !$this->lock( $key, 0 ) ) {
1064 return null;
1065 }
1066
1067 $scope = new ScopedCallback( function () use ( $key ) {
1068 $this->unlock( $key );
1069 } );
1070
1071 return sprintf( '%.6f', $this->locks[$key][self::LOCK_TIME] );
1072 }
1073
1083 private function makeTimestampedModificationToken( $mtime, IDatabase $db ) {
1084 // We have reserved space for upto 6 digits in the microsecond portion of the token.
1085 // This is for future use only (maybe CAS tokens) and not currently used.
1086 // It is currently populated by the microsecond portion returned by microtime,
1087 // which generally has fewer than 6 digits of meaningful precision but can still be useful
1088 // in debugging (to see the token continuously change even during rapid testing).
1089 $seconds = (int)$mtime;
1090 list( , $microseconds ) = explode( '.', sprintf( '%.6f', $mtime ) );
1091
1092 $id = $db->getTopologyBasedServerId() ?? sprintf( '%u', crc32( $db->getServerName() ) );
1093
1094 $token = implode( '', [
1095 // 67 bit integral portion of UNIX timestamp, qualified
1096 \Wikimedia\base_convert(
1097 // 35 bit integral seconds portion of UNIX timestamp
1098 str_pad( base_convert( $seconds, 10, 2 ), 35, '0', STR_PAD_LEFT ) .
1099 // 32 bit ID of the primary database server handling the write
1100 str_pad( base_convert( $id, 10, 2 ), 32, '0', STR_PAD_LEFT ),
1101 2,
1102 36,
1103 13
1104 ),
1105 // 20 bit fractional portion of UNIX timestamp, as integral microseconds
1106 str_pad( base_convert( $microseconds, 10, 36 ), 4, '0', STR_PAD_LEFT )
1107 ] );
1108
1109 if ( strlen( $token ) !== 17 ) {
1110 throw new RuntimeException( "Modification timestamp overflow detected" );
1111 }
1112
1113 return $token;
1114 }
1115
1124 private function buildExistenceConditions( IDatabase $db, $keys, string $time ) {
1125 // Note that tombstones always have past expiration dates
1126 return [
1127 'keyname' => $keys,
1128 'exptime >= ' . $db->addQuotes( $db->timestamp( (int)$time ) )
1129 ];
1130 }
1131
1142 private function buildUpsertRow(
1143 IDatabase $db,
1144 $key,
1145 $serialValue,
1146 int $expiry,
1147 string $mt
1148 ) {
1149 $row = [
1150 'keyname' => $key,
1151 'value' => $this->dbEncodeSerialValue( $db, $serialValue ),
1152 'exptime' => $this->encodeDbExpiry( $db, $expiry )
1153 ];
1154 if ( $this->isMultiPrimaryModeEnabled() ) {
1155 $row['modtoken'] = $mt;
1156 }
1157
1158 return $row;
1159 }
1160
1171 IDatabase $db,
1172 $serialValue,
1173 int $expiry,
1174 string $mt
1175 ) {
1176 $expressionsByColumn = [
1177 'value' => $db->addQuotes( $this->dbEncodeSerialValue( $db, $serialValue ) ),
1178 'exptime' => $db->addQuotes( $this->encodeDbExpiry( $db, $expiry ) )
1179 ];
1180
1181 $set = [];
1182 if ( $this->isMultiPrimaryModeEnabled() ) {
1183 // The query might take a while to replicate, during which newer values might get
1184 // written. Qualify the query so that it does not override such values. Note that
1185 // duplicate tokens generated around the same time for a key should still result
1186 // in convergence given the use of server_id in modtoken (providing a total order
1187 // among primary DB servers) and MySQL binlog ordering (providing a total order
1188 // for writes replicating from a given primary DB server).
1189 $expressionsByColumn['modtoken'] = $db->addQuotes( $mt );
1190 foreach ( $expressionsByColumn as $column => $updateExpression ) {
1191 $rhs = $db->conditional(
1192 $db->addQuotes( substr( $mt, 0, 13 ) ) . ' >= SUBSTR(modtoken,0,13)',
1193 $updateExpression,
1194 $column
1195 );
1196 $set[] = "{$column}=" . trim( $rhs );
1197 }
1198 } else {
1199 foreach ( $expressionsByColumn as $column => $updateExpression ) {
1200 $set[] = "{$column}={$updateExpression}";
1201 }
1202 }
1203
1204 return $set;
1205 }
1206
1218 private function buildIncrUpsertSet(
1219 IDatabase $db,
1220 int $step,
1221 int $init,
1222 int $expiry,
1223 string $mt,
1224 int $mtUnixTs
1225 ) {
1226 // Map of (column => (SQL for non-expired key rows, SQL for expired key rows))
1227 $expressionsByColumn = [
1228 'value' => [
1229 $db->buildIntegerCast( 'value' ) . " + {$db->addQuotes( $step )}",
1230 $db->addQuotes( $this->dbEncodeSerialValue( $db, $init ) )
1231 ],
1232 'exptime' => [
1233 'exptime',
1234 $db->addQuotes( $this->encodeDbExpiry( $db, $expiry ) )
1235 ]
1236 ];
1237 if ( $this->isMultiPrimaryModeEnabled() ) {
1238 $expressionsByColumn['modtoken'] = [ 'modtoken', $db->addQuotes( $mt ) ];
1239 }
1240
1241 $set = [];
1242 foreach ( $expressionsByColumn as $column => list( $updateExpression, $initExpression ) ) {
1243 $rhs = $db->conditional(
1244 'exptime >= ' . $db->addQuotes( $db->timestamp( $mtUnixTs ) ),
1245 $updateExpression,
1246 $initExpression
1247 );
1248 $set[] = "{$column}=" . trim( $rhs );
1249 }
1250
1251 return $set;
1252 }
1253
1259 private function encodeDbExpiry( IDatabase $db, int $expiry ) {
1260 return ( $expiry === self::TTL_INDEFINITE )
1261 // Use the maximum timestamp that the column can store
1262 ? $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER )
1263 // Convert the absolute timestamp into the DB timestamp format
1264 : $db->timestamp( $expiry );
1265 }
1266
1272 private function decodeDbExpiry( IDatabase $db, string $dbExpiry ) {
1273 return ( $dbExpiry === $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER ) )
1274 ? self::TTL_INDEFINITE
1275 : (int)ConvertibleTimestamp::convert( TS_UNIX, $dbExpiry );
1276 }
1277
1283 private function dbEncodeSerialValue( IDatabase $db, $serialValue ) {
1284 return is_int( $serialValue ) ? $serialValue : $db->encodeBlob( $serialValue );
1285 }
1286
1292 private function dbDecodeSerialValue( IDatabase $db, $blob ) {
1293 return $this->isInteger( $blob ) ? (int)$blob : $db->decodeBlob( $blob );
1294 }
1295
1303 private function addCasTokenFields( IDatabase $db, array $fields ) {
1304 $type = $db->getType();
1305
1306 if ( $type === 'mysql' ) {
1307 $fields['castoken'] = $db->buildConcat( [
1308 'SHA1(value)',
1309 $db->addQuotes( '@' ),
1310 'exptime'
1311 ] );
1312 } elseif ( $type === 'postgres' ) {
1313 $fields['castoken'] = $db->buildConcat( [
1314 'md5(value)',
1315 $db->addQuotes( '@' ),
1316 'exptime'
1317 ] );
1318 } else {
1319 if ( !in_array( 'value', $fields, true ) ) {
1320 $fields[] = 'value';
1321 }
1322 if ( !in_array( 'exptime', $fields, true ) ) {
1323 $fields[] = 'exptime';
1324 }
1325 }
1326
1327 return $fields;
1328 }
1329
1337 private function getCasTokenFromRow( IDatabase $db, stdClass $row ) {
1338 if ( isset( $row->castoken ) ) {
1339 $token = $row->castoken;
1340 } else {
1341 $token = sha1( $this->dbDecodeSerialValue( $db, $row->value ) ) . '@' . $row->exptime;
1342 $this->logger->debug( __METHOD__ . ": application computed hash for CAS token" );
1343 }
1344
1345 return $token;
1346 }
1347
1352 private function occasionallyGarbageCollect( IDatabase $db ) {
1353 if (
1354 // Random purging is enabled
1355 $this->purgePeriod &&
1356 // Only purge on one in every $this->purgePeriod writes
1357 mt_rand( 0, $this->purgePeriod - 1 ) == 0 &&
1358 // Avoid repeating the delete within a few seconds
1359 ( $this->getCurrentTime() - $this->lastGarbageCollect ) > self::GC_DELAY_SEC
1360 ) {
1361 $garbageCollector = function () use ( $db ) {
1363 $db,
1364 (int)$this->getCurrentTime(),
1365 $this->purgeLimit
1366 );
1367 $this->lastGarbageCollect = time();
1368 };
1369 if ( $this->asyncHandler ) {
1370 $this->lastGarbageCollect = $this->getCurrentTime(); // avoid duplicate enqueues
1371 ( $this->asyncHandler )( $garbageCollector );
1372 } else {
1373 $garbageCollector();
1374 }
1375 }
1376 }
1377
1378 public function expireAll() {
1379 $this->deleteObjectsExpiringBefore( (int)$this->getCurrentTime() );
1380 }
1381
1383 $timestamp,
1384 callable $progress = null,
1385 $limit = INF,
1386 string $tag = null
1387 ) {
1389 $silenceScope = $this->silenceTransactionProfiler();
1390
1391 if ( $tag !== null ) {
1392 // Purge one server only, to support concurrent purging in large wiki farms (T282761).
1393 $shardIndexes = [ $this->getShardServerIndexForTag( $tag ) ];
1394 } else {
1395 $shardIndexes = $this->getShardServerIndexes();
1396 shuffle( $shardIndexes );
1397 }
1398
1399 $ok = true;
1400 $numServers = count( $shardIndexes );
1401
1402 $keysDeletedCount = 0;
1403 foreach ( $shardIndexes as $numServersDone => $shardIndex ) {
1404 try {
1405 $db = $this->getConnection( $shardIndex );
1407 $db,
1408 $timestamp,
1409 $limit,
1410 $keysDeletedCount,
1411 [ 'fn' => $progress, 'serversDone' => $numServersDone, 'serversTotal' => $numServers ]
1412 );
1413 } catch ( DBError $e ) {
1414 $this->handleDBError( $e, $shardIndex );
1415 $ok = false;
1416 }
1417 }
1418
1419 return $ok;
1420 }
1421
1431 IDatabase $db,
1432 $timestamp,
1433 $limit,
1434 &$keysDeletedCount = 0,
1435 array $progress = null
1436 ) {
1437 $cutoffUnix = ConvertibleTimestamp::convert( TS_UNIX, $timestamp );
1438 if ( $this->isMultiPrimaryModeEnabled() ) {
1439 // Eventual consistency requires the preservation of any key that was recently
1440 // modified. The key must exist on this database server long enough for the server
1441 // to receive, via replication, all writes to the key with lower timestamps. Such
1442 // writes should be no-ops since the existing key value should "win". If the network
1443 // partitions between datacenters A and B for 30 minutes, the database servers in
1444 // each datacenter will see an initial burst of writes with "old" timestamps via
1445 // replication. This might include writes with lower timestamps that the existing
1446 // key value. Therefore, clock skew and replication delay are both factors.
1447 $cutoffUnixSafe = (int)$this->getCurrentTime() - self::SAFE_PURGE_DELAY_SEC;
1448 $cutoffUnix = min( $cutoffUnix, $cutoffUnixSafe );
1449 }
1450 $tableIndexes = range( 0, $this->numTableShards - 1 );
1451 shuffle( $tableIndexes );
1452
1453 $batchSize = min( $this->writeBatchSize, $limit );
1454
1455 foreach ( $tableIndexes as $numShardsDone => $tableIndex ) {
1456 // The oldest expiry of a row we have deleted on this shard
1457 // (the first row that we deleted)
1458 $minExpUnix = null;
1459 // The most recent expiry time so far, from a row we have deleted on this shard
1460 $maxExp = null;
1461 // Size of the time range we'll delete, in seconds (for progress estimate)
1462 $totalSeconds = null;
1463
1464 do {
1465 $res = $db->select(
1466 $this->getTableNameByShard( $tableIndex ),
1467 [ 'keyname', 'exptime' ],
1468 array_merge(
1469 [ 'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ) ],
1470 $maxExp ? [ 'exptime >= ' . $db->addQuotes( $maxExp ) ] : []
1471 ),
1472 __METHOD__,
1473 [ 'LIMIT' => $batchSize, 'ORDER BY' => 'exptime ASC' ]
1474 );
1475
1476 if ( $res->numRows() ) {
1477 $row = $res->current();
1478 if ( $minExpUnix === null ) {
1479 $minExpUnix = ConvertibleTimestamp::convert( TS_UNIX, $row->exptime );
1480 $totalSeconds = max( $cutoffUnix - $minExpUnix, 1 );
1481 }
1482
1483 $keys = [];
1484 foreach ( $res as $row ) {
1485 $keys[] = $row->keyname;
1486 $maxExp = $row->exptime;
1487 }
1488
1489 $db->delete(
1490 $this->getTableNameByShard( $tableIndex ),
1491 [
1492 'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ),
1493 'keyname' => $keys
1494 ],
1495 __METHOD__
1496 );
1497 $keysDeletedCount += $db->affectedRows();
1498 }
1499
1500 if ( $progress && is_callable( $progress['fn'] ) ) {
1501 if ( $totalSeconds ) {
1502 $maxExpUnix = ConvertibleTimestamp::convert( TS_UNIX, $maxExp );
1503 $remainingSeconds = $cutoffUnix - $maxExpUnix;
1504 $processedSeconds = max( $totalSeconds - $remainingSeconds, 0 );
1505 // For example, if we've done 1.5 table shard, and are thus half-way on the
1506 // 2nd of perhaps 5 tables on this server, then this might be:
1507 // `( 1 + ( 43200 / 86400 ) ) / 5 = 0.3`, or 30% done, of tables on this server.
1508 $tablesDoneRatio =
1509 ( $numShardsDone + ( $processedSeconds / $totalSeconds ) ) / $this->numTableShards;
1510 } else {
1511 $tablesDoneRatio = 1;
1512 }
1513
1514 // For example, if we're 30% done on the last of 10 servers, then this might be:
1515 // `( 9 / 10 ) + ( 0.3 / 10 ) = 0.93`, or 93% done, overall.
1516 $overallRatio = ( $progress['serversDone'] / $progress['serversTotal'] ) +
1517 ( $tablesDoneRatio / $progress['serversTotal'] );
1518 ( $progress['fn'] )( $overallRatio * 100 );
1519 }
1520 } while ( $res->numRows() && $keysDeletedCount < $limit );
1521 }
1522 }
1523
1529 public function deleteAll() {
1531 $silenceScope = $this->silenceTransactionProfiler();
1532 foreach ( $this->getShardServerIndexes() as $shardIndex ) {
1533 $db = null; // in case of connection failure
1534 try {
1535 $db = $this->getConnection( $shardIndex );
1536 for ( $i = 0; $i < $this->numTableShards; $i++ ) {
1537 $db->delete( $this->getTableNameByShard( $i ), '*', __METHOD__ );
1538 }
1539 } catch ( DBError $e ) {
1540 $this->handleDBError( $e, $shardIndex );
1541 return false;
1542 }
1543 }
1544 return true;
1545 }
1546
1547 public function doLock( $key, $timeout = 6, $exptime = 6 ) {
1549 $silenceScope = $this->silenceTransactionProfiler();
1550
1551 $lockTsUnix = null;
1552
1553 list( $shardIndex ) = $this->getKeyLocation( $key );
1554 try {
1555 $db = $this->getConnection( $shardIndex );
1556 $lockTsUnix = $db->lock( $key, __METHOD__, $timeout, $db::LOCK_TIMESTAMP );
1557 } catch ( DBError $e ) {
1558 $this->handleDBError( $e, $shardIndex );
1559 $this->logger->warning(
1560 __METHOD__ . ' failed due to I/O error for {key}.',
1561 [ 'key' => $key ]
1562 );
1563 }
1564
1565 return $lockTsUnix;
1566 }
1567
1568 public function doUnlock( $key ) {
1570 $silenceScope = $this->silenceTransactionProfiler();
1571
1572 list( $shardIndex ) = $this->getKeyLocation( $key );
1573
1574 try {
1575 $db = $this->getConnection( $shardIndex );
1576 $released = $db->unlock( $key, __METHOD__ );
1577 } catch ( DBError $e ) {
1578 $this->handleDBError( $e, $shardIndex );
1579 $released = false;
1580 }
1581
1582 return $released;
1583 }
1584
1585 public function makeKeyInternal( $keyspace, $components ) {
1586 // SQL schema for 'objectcache' specifies keys as varchar(255). From that,
1587 // subtract the number of characters we need for the keyspace and for
1588 // the separator character needed for each argument. To handle some
1589 // custom prefixes used by thing like WANObjectCache, limit to 205.
1590 $keyspace = strtr( $keyspace, ' ', '_' );
1591 $charsLeft = 205 - strlen( $keyspace ) - count( $components );
1592 foreach ( $components as &$component ) {
1593 $component = strtr( $component, [
1594 ' ' => '_', // Avoid unnecessary misses from pre-1.35 code
1595 ':' => '%3A',
1596 ] );
1597
1598 // 33 = 32 characters for the MD5 + 1 for the '#' prefix.
1599 if ( $charsLeft > 33 && strlen( $component ) > $charsLeft ) {
1600 $component = '#' . md5( $component );
1601 }
1602 $charsLeft -= strlen( $component );
1603 }
1604
1605 if ( $charsLeft < 0 ) {
1606 return $keyspace . ':BagOStuff-long-key:##' . md5( implode( ':', $components ) );
1607 }
1608 return $keyspace . ':' . implode( ':', $components );
1609 }
1610
1611 protected function serialize( $value ) {
1612 if ( is_int( $value ) ) {
1613 return $value;
1614 }
1615
1616 $serial = serialize( $value );
1617 if ( function_exists( 'gzdeflate' ) ) {
1618 // On typical message and page data, this can provide a 3X storage savings
1619 $serial = gzdeflate( $serial );
1620 }
1621
1622 return $serial;
1623 }
1624
1625 protected function unserialize( $value ) {
1626 if ( $value === self::TOMB_SERIAL ) {
1627 return false; // tombstone
1628 }
1629
1630 if ( $this->isInteger( $value ) ) {
1631 return (int)$value;
1632 }
1633
1634 if ( function_exists( 'gzinflate' ) ) {
1635 AtEase::suppressWarnings();
1636 $decompressed = gzinflate( $value );
1637 AtEase::restoreWarnings();
1638
1639 if ( $decompressed !== false ) {
1640 $value = $decompressed;
1641 }
1642 }
1643
1644 return unserialize( $value );
1645 }
1646
1652 private function getConnectionViaLoadBalancer( $shardIndex ) {
1653 $lb = ( $shardIndex === self::SHARD_LOCAL ) ? $this->localKeyLb : $this->globalKeyLb;
1654 if ( $lb->getServerAttributes( $lb->getWriterIndex() )[Database::ATTR_DB_LEVEL_LOCKING] ) {
1655 // Use the main connection to avoid transaction deadlocks
1657 } else {
1658 // If the RDBMs has row/table/page level locking, then use separate auto-commit
1659 // connection to avoid needless contention and deadlocks.
1660 $conn = $lb->getMaintenanceConnectionRef(
1661 $this->replicaOnly ? DB_REPLICA : DB_PRIMARY, [],
1662 false,
1663 $lb::CONN_TRX_AUTOCOMMIT
1664 );
1665 }
1666
1667 return $conn;
1668 }
1669
1676 private function getConnectionFromServerInfo( $shardIndex, array $server ) {
1677 if ( !isset( $this->conns[$shardIndex] ) ) {
1679 $conn = Database::factory(
1680 $server['type'],
1681 array_merge(
1682 $server,
1683 [
1684 // Make sure the handle uses autocommit mode
1685 'flags' => ( $server['flags'] ?? 0 ) & ~IDatabase::DBO_TRX,
1686 'connLogger' => $this->logger,
1687 'queryLogger' => $this->logger
1688 ]
1689 )
1690 );
1691 // Automatically create the objectcache table for sqlite as needed
1692 if ( $conn->getType() === 'sqlite' ) {
1693 if ( !$conn->tableExists( 'objectcache', __METHOD__ ) ) {
1694 $this->initSqliteDatabase( $conn );
1695 }
1696 }
1697 $this->conns[$shardIndex] = $conn;
1698 }
1699
1700 return $this->conns[$shardIndex];
1701 }
1702
1709 private function handleDBError( DBError $exception, $shardIndex ) {
1710 if ( $exception instanceof DBConnectionError ) {
1711 $this->markServerDown( $exception, $shardIndex );
1712 }
1713
1714 $this->setAndLogDBError( $exception );
1715 }
1716
1720 private function setAndLogDBError( DBError $exception ) {
1721 $this->logger->error( "DBError: {$exception->getMessage()}" );
1722 if ( $exception instanceof DBConnectionError ) {
1723 $this->setLastError( self::ERR_UNREACHABLE );
1724 $this->logger->warning( __METHOD__ . ": ignoring connection error" );
1725 } else {
1726 $this->setLastError( self::ERR_UNEXPECTED );
1727 $this->logger->warning( __METHOD__ . ": ignoring query error" );
1728 }
1729 }
1730
1737 private function markServerDown( DBError $exception, $shardIndex ) {
1738 unset( $this->conns[$shardIndex] ); // bug T103435
1739
1740 $now = $this->getCurrentTime();
1741 if ( isset( $this->connFailureTimes[$shardIndex] ) ) {
1742 if ( $now - $this->connFailureTimes[$shardIndex] >= 60 ) {
1743 unset( $this->connFailureTimes[$shardIndex] );
1744 unset( $this->connFailureErrors[$shardIndex] );
1745 } else {
1746 $this->logger->debug( __METHOD__ . ": Server #$shardIndex already down" );
1747 return;
1748 }
1749 }
1750 $this->logger->info( __METHOD__ . ": Server #$shardIndex down until " . ( $now + 60 ) );
1751 $this->connFailureTimes[$shardIndex] = $now;
1752 $this->connFailureErrors[$shardIndex] = $exception;
1753 }
1754
1760 if ( $db->tableExists( 'objectcache', __METHOD__ ) ) {
1761 return;
1762 }
1763 // Use one table for SQLite; sharding does not seem to have much benefit
1764 $db->query( "PRAGMA journal_mode=WAL", __METHOD__ ); // this is permanent
1765 $db->startAtomic( __METHOD__ ); // atomic DDL
1766 try {
1767 $encTable = $db->tableName( 'objectcache' );
1768 $encExptimeIndex = $db->addIdentifierQuotes( $db->tablePrefix() . 'exptime' );
1769 $db->query(
1770 "CREATE TABLE $encTable (\n" .
1771 " keyname BLOB NOT NULL default '' PRIMARY KEY,\n" .
1772 " value BLOB,\n" .
1773 " exptime BLOB NOT NULL\n" .
1774 ")",
1775 __METHOD__
1776 );
1777 $db->query( "CREATE INDEX $encExptimeIndex ON $encTable (exptime)", __METHOD__ );
1778 $db->endAtomic( __METHOD__ );
1779 } catch ( DBError $e ) {
1780 $db->rollback( __METHOD__ );
1781 throw $e;
1782 }
1783 }
1784
1788 public function createTables() {
1789 foreach ( $this->getShardServerIndexes() as $shardIndex ) {
1790 $db = $this->getConnection( $shardIndex );
1791 if ( in_array( $db->getType(), [ 'mysql', 'postgres' ], true ) ) {
1792 for ( $i = 0; $i < $this->numTableShards; $i++ ) {
1793 $encBaseTable = $db->tableName( 'objectcache' );
1794 $encShardTable = $db->tableName( $this->getTableNameByShard( $i ) );
1795 $db->query( "CREATE TABLE $encShardTable LIKE $encBaseTable", __METHOD__ );
1796 }
1797 }
1798 }
1799 }
1800
1804 private function getShardServerIndexes() {
1805 if ( $this->serverTags ) {
1806 // Striped array of database servers
1807 $shardIndexes = array_keys( $this->serverTags );
1808 } else {
1809 // LoadBalancer based configuration
1810 $shardIndexes = [];
1811 if ( $this->localKeyLb ) {
1812 $shardIndexes[] = self::SHARD_LOCAL;
1813 }
1814 if ( $this->globalKeyLb ) {
1815 $shardIndexes[] = self::SHARD_GLOBAL;
1816 }
1817 }
1818
1819 return $shardIndexes;
1820 }
1821
1827 private function getShardServerIndexForTag( string $tag ) {
1828 if ( !$this->serverTags ) {
1829 throw new InvalidArgumentException( "Given a tag but no tags are configured" );
1830 }
1831 foreach ( $this->serverTags as $serverShardIndex => $serverTag ) {
1832 if ( $tag === $serverTag ) {
1833 return $serverShardIndex;
1834 }
1835 }
1836 throw new InvalidArgumentException( "Unknown server tag: $tag" );
1837 }
1838
1842 private function isMultiPrimaryModeEnabled() {
1843 return ( $this->multiPrimaryModeType !== null );
1844 }
1845
1852 private function waitForReplication( $shardIndex ) {
1853 if ( is_int( $shardIndex ) ) {
1854 return true; // striped only, no LoadBalancer
1855 }
1856
1857 $lb = ( $shardIndex === self::SHARD_LOCAL ) ? $this->localKeyLb : $this->globalKeyLb;
1858 if ( !$lb->hasStreamingReplicaServers() ) {
1859 return true;
1860 }
1861
1862 try {
1863 // Wait for any replica DBs to catch up
1864 $masterPos = $lb->getPrimaryPos();
1865 if ( !$masterPos ) {
1866 return true; // not applicable
1867 }
1868
1869 $loop = new WaitConditionLoop(
1870 static function () use ( $lb, $masterPos ) {
1871 return $lb->waitForAll( $masterPos, 1 );
1872 },
1875 );
1876
1877 return ( $loop->invoke() === $loop::CONDITION_REACHED );
1878 } catch ( DBError $e ) {
1879 $this->setAndLogDBError( $e );
1880
1881 return false;
1882 }
1883 }
1884
1890 private function silenceTransactionProfiler() {
1891 if ( $this->serverInfos ) {
1892 return null; // no TransactionProfiler injected anyway
1893 }
1894 return Profiler::instance()->getTransactionProfiler()->silenceForScope();
1895 }
1896}
serialize()
string $keyspace
Default keyspace; used by makeKey()
fieldHasFlags( $field, $flags)
callable null $asyncHandler
Definition BagOStuff.php:92
Storage medium specific cache for storing items (e.g.
getExpirationAsTimestamp( $exptime)
Convert an optionally relative timestamp to an absolute time.
getSerialized( $value, $key)
Get the serialized form a value, using any applicable prepared value.
unlock( $key)
Release an advisory lock on a key string.
updateOpStats(string $op, array $keyInfo)
isInteger( $value)
Check if a value is an integer.
lock( $key, $timeout=6, $exptime=6, $rclass='')
setLastError( $err)
Set the "last error" registry.
RDBMS-based caching module.
deleteObjectsExpiringBefore( $timestamp, callable $progress=null, $limit=INF, string $tag=null)
Delete all objects expiring before a certain date.
decodeDbExpiry(IDatabase $db, string $dbExpiry)
modifyTableSpecificBlobsForChangeTTL(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Update the TTL for keys belonging to a partition table on the the given server.
doDelete( $key, $flags=0)
Delete an item.
buildUpsertSetForOverwrite(IDatabase $db, $serialValue, int $expiry, string $mt)
SET array for handling key overwrites when a live or stale key exists.
const INF_TIMESTAMP_PLACEHOLDER
Placeholder timestamp to use for TTL_INDEFINITE that can be stored in all RDBMs types.
string[] $serverTags
(server index => tag/host name)
doSetMulti(array $data, $exptime=0, $flags=0)
makeNewKeyExpiry( $exptime, int $nowTsUnix)
buildUpsertRow(IDatabase $db, $key, $serialValue, int $expiry, string $mt)
INSERT array for handling key writes/overwrites when no live nor stale key exists.
createTables()
Create the shard tables on all databases (e.g.
encodeDbExpiry(IDatabase $db, int $expiry)
addCasTokenFields(IDatabase $db, array $fields)
Either append a 'castoken' field or append the fields needed to compute the CAS token.
serialize( $value)
deleteServerObjectsExpiringBefore(IDatabase $db, $timestamp, $limit, &$keysDeletedCount=0, array $progress=null)
modifyTableSpecificBlobsForIncrInit(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Either increment a counter key, if it exists, or initialize it, otherwise.
modifyBlobs(callable $tableWriteCallback, float $mtime, array $argsByKey, int $flags, &$resByKey=[])
int $purgePeriod
Average number of writes required to trigger garbage collection.
getConnectionViaLoadBalancer( $shardIndex)
doChangeTTL( $key, $exptime, $flags)
incr( $key, $value=1, $flags=0)
Increase stored value of $key by $value while preserving its TTL.
unserialize( $value)
waitForReplication( $shardIndex)
Wait for replica DBs to catch up to the primary DB.
array[] $serverInfos
(server index => server config)
fetchBlobs(array $keys, bool $getCasToken=false)
modifyTableSpecificBlobsForCas(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Insert key/value pairs belonging to a partition table on the the given server.
deleteAll()
Delete content of shard tables in every server.
doDeleteMulti(array $keys, $flags=0)
getConnectionFromServerInfo( $shardIndex, array $server)
string null $multiPrimaryModeType
Multi-primary mode DB type ("mysql",...); null if not enabled.
int $numTableShards
Number of table shards to use on each server.
int $purgeLimit
Max expired rows to purge during randomized garbage collection.
__construct( $params)
Create a new backend instance from configuration.
int $lastGarbageCollect
UNIX timestamp.
markServerDown(DBError $exception, $shardIndex)
Mark a server down due to a DBConnectionError exception.
doIncr( $key, $value=1, $flags=0)
modifyTableSpecificBlobsForAdd(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Insert key/value pairs belonging to a partition table on the the given server.
const SAFE_PURGE_DELAY_SEC
A number of seconds well above any expected clock skew and replication lag.
doGet( $key, $flags=0, &$casToken=null)
doAdd( $key, $value, $exptime=0, $flags=0)
Insert an item if it does not already exist.
const TOMB_SERIAL
Distinct string for tombstones stored in the "serialized" value column.
bool $replicaOnly
Whether to use replicas instead of primaries (if using LoadBalancer)
handleDBError(DBError $exception, $shardIndex)
Handle a DBError which occurred during a read operation.
doSet( $key, $value, $exptime=0, $flags=0)
Set an item.
modifyTableSpecificBlobsForDelete(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Purge/tombstone key/value pairs belonging to a partition table on the the given server.
dbEncodeSerialValue(IDatabase $db, $serialValue)
decr( $key, $value=1, $flags=0)
Decrease stored value of $key by $value while preserving its TTL.
ILoadBalancer null $localKeyLb
getCasTokenFromRow(IDatabase $db, stdClass $row)
Get a CAS token from a SELECT result row.
silenceTransactionProfiler()
Silence the transaction profiler until the return value falls out of scope.
makeKeyInternal( $keyspace, $components)
Make a cache key for the given keyspace and components.
getConnection( $shardIndex)
Get a connection to the specified database.
doLock( $key, $timeout=6, $exptime=6)
ILoadBalancer null $globalKeyLb
Exception[] $connFailureErrors
Map of (shard index => Exception)
setAndLogDBError(DBError $exception)
const TOMB_EXPTIME
Relative seconds-to-live to use for tombstones.
IMaintainableDatabase[] $conns
Map of (shard index => DB handle)
newLockingWriteSectionModificationTimestamp( $key, &$scope)
Get a scoped lock and modification timestamp for a critical section of reads/writes.
getShardServerIndexForTag(string $tag)
string $tableName
modifyTableSpecificBlobsForSet(IDatabase $db, string $ptable, float $mtime, array $argsByKey, array &$resByKey)
Set key/value pairs belonging to a partition table on the the given server.
getKeyLocation( $key)
Get the server index and table name for a given key.
getTableNameByShard( $index)
Get the table name for a given shard index.
const SAFE_CLOCK_BOUND_SEC
A number of seconds well above any expected clock skew.
incrWithInit( $key, $exptime, $value=1, $init=null, $flags=0)
Increase the value of the given key (no TTL change) if it exists or create it otherwise.
const GC_DELAY_SEC
How many seconds must pass before triggering a garbage collection.
initSqliteDatabase(IMaintainableDatabase $db)
dbDecodeSerialValue(IDatabase $db, $blob)
doGetMulti(array $keys, $flags=0)
Get an associative array containing the item for each of the keys that have items.
doCas( $casToken, $key, $value, $exptime=0, $flags=0)
Check and set an item.
occasionallyGarbageCollect(IDatabase $db)
buildExistenceConditions(IDatabase $db, $keys, string $time)
WHERE conditions that check for existence and liveness of keys.
buildIncrUpsertSet(IDatabase $db, int $step, int $init, int $expiry, string $mt, int $mtUnixTs)
SET array for handling key overwrites when a live or stale key exists.
doChangeTTLMulti(array $keys, $exptime, $flags=0)
float[] $connFailureTimes
Map of (shard index => UNIX timestamps)
makeTimestampedModificationToken( $mtime, IDatabase $db)
Make a modtoken column value with the original time and source database server of a write.
Database error base class @newable.
Definition DBError.php:32
Relational database abstraction object.
Definition Database.php:52
const ATTR_EMULATION
Emulation/fallback mode; see QOS_EMULATION_*; higher is better.
const QOS_DURABILITY_RDBMS
Data is saved to disk and writes usually block on fsync(), like a standard RDBMS.
const ATTR_DURABILITY
Durability of writes; see QOS_DURABILITY_* (higher means stronger)
const QOS_EMULATION_SQL
Fallback disk-based SQL store.
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:38
rollback( $fname=__METHOD__, $flush=self::FLUSHING_ONE)
Rollback a transaction previously started using begin()
selectRow( $table, $vars, $conds, $fname=__METHOD__, $options=[], $join_conds=[])
Wrapper to IDatabase::select() that only fetches one row (via LIMIT)
addIdentifierQuotes( $s)
Escape a SQL identifier (e.g.
endAtomic( $fname=__METHOD__)
Ends an atomic section of SQL statements.
select( $table, $vars, $conds='', $fname=__METHOD__, $options=[], $join_conds=[])
Execute a SELECT query constructed using the various parameters provided.
tableExists( $table, $fname=__METHOD__)
Query whether a given table exists.
getTopologyBasedServerId()
Get a non-recycled ID that uniquely identifies this server within the replication topology.
affectedRows()
Get the number of rows affected by the last write query.
buildConcat( $stringList)
Build a concatenation list to feed into a SQL query.
delete( $table, $conds, $fname=__METHOD__)
Delete all rows in a table that match a condition.
update( $table, $set, $conds, $fname=__METHOD__, $options=[])
Update all rows in a table that match a given condition.
addQuotes( $s)
Escape and quote a raw value string for use in a SQL query.
timestamp( $ts=0)
Convert a timestamp in one of the formats accepted by ConvertibleTimestamp to the format used for ins...
upsert( $table, array $rows, $uniqueKeys, array $set, $fname=__METHOD__)
Upsert the given row(s) into a table.
getType()
Get the RDBMS type of the server (e.g.
conditional( $cond, $caseTrueExpression, $caseFalseExpression)
Returns an SQL expression for a simple conditional.
tablePrefix( $prefix=null)
Get/set the table prefix.
decodeBlob( $b)
Some DBMSs return a special placeholder object representing blob fields in result objects.
query( $sql, $fname=__METHOD__, $flags=0)
Run an SQL query and return the result.
encodeBlob( $b)
Some DBMSs have a special format for inserting into blob fields, they don't allow simple quoted strin...
replace( $table, $uniqueKeys, $rows, $fname=__METHOD__)
Insert row(s) into a table, deleting all conflicting rows beforehand.
selectFieldValues( $table, $var, $cond='', $fname=__METHOD__, $options=[], $join_conds=[])
A SELECT wrapper which returns a list of single field values from result rows.
startAtomic( $fname=__METHOD__, $cancelable=self::ATOMIC_NOT_CANCELABLE)
Begin an atomic section of SQL statements.
getServerName()
Get the readable name for the server.
Database cluster connection, tracking, load balancing, and transaction manager interface.
getPrimaryPos()
Get the current primary replication position.
getMaintenanceConnectionRef( $i, $groups=[], $domain=false, $flags=0)
Get a live database handle, suitable for migrations and schema changes, for a server index.
Advanced database interface for IDatabase handles that include maintenance methods.
tableName( $name, $format='quoted')
Format a table name ready for use in constructing an SQL query.
if( $line===false) $args
Definition mcc.php:124
This program is free software; you can redistribute it and/or modify it under the terms of the GNU Ge...
const DB_REPLICA
Definition defines.php:25
const DB_PRIMARY
Definition defines.php:27