MediaWiki REL1_40
SqlBagOStuff.php
Go to the documentation of this file.
1<?php
25use Wikimedia\AtEase\AtEase;
35use Wikimedia\ScopedCallback;
36use Wikimedia\Timestamp\ConvertibleTimestamp;
37
55 protected $loadBalancer;
57 protected $dbDomain;
59 protected $useLB = false;
60
62 protected $serverInfos = [];
64 protected $serverTags = [];
66 protected $lastGarbageCollect = 0;
68 protected $purgePeriod = 10;
70 protected $purgeLimit = 100;
72 protected $numTableShards = 1;
74 protected $writeBatchSize = 100;
76 protected $tableName = 'objectcache';
78 protected $replicaOnly;
81
83 protected $conns;
85 protected $connFailureTimes = [];
87 protected $connFailureErrors = [];
88
90 private $hasZlib;
91
93 private const SAFE_CLOCK_BOUND_SEC = 15;
95 private const SAFE_PURGE_DELAY_SEC = 3600;
97 private const TOMB_SERIAL = '';
99 private const TOMB_EXPTIME = -self::SAFE_CLOCK_BOUND_SEC;
101 private const GC_DELAY_SEC = 1;
102
103 private const BLOB_VALUE = 0;
104 private const BLOB_EXPIRY = 1;
105 private const BLOB_CASTOKEN = 2;
106
113 private const INF_TIMESTAMP_PLACEHOLDER = '99991231235959';
114
144 public function __construct( $params ) {
145 parent::__construct( $params );
146
147 if ( isset( $params['servers'] ) || isset( $params['server'] ) ) {
148 // Configuration uses a direct list of servers.
149 // Object data is horizontally partitioned via key hash.
150 $index = 0;
151 foreach ( ( $params['servers'] ?? [ $params['server'] ] ) as $tag => $info ) {
152 $this->serverInfos[$index] = $info;
153 // Allow integer-indexes arrays for b/c
154 $this->serverTags[$index] = is_string( $tag ) ? $tag : "#$index";
155 ++$index;
156 }
157 } elseif ( isset( $params['loadBalancerCallback'] ) ) {
158 $this->loadBalancerCallback = $params['loadBalancerCallback'];
159 if ( !isset( $params['dbDomain'] ) ) {
160 throw new InvalidArgumentException(
161 __METHOD__ . ": 'dbDomain' is required if 'loadBalancerCallback' is given"
162 );
163 }
164 $this->dbDomain = $params['dbDomain'];
165 $this->useLB = true;
166 } else {
167 throw new InvalidArgumentException(
168 __METHOD__ . " requires 'server', 'servers', or 'loadBalancerCallback'"
169 );
170 }
171
172 $this->purgePeriod = intval( $params['purgePeriod'] ?? $this->purgePeriod );
173 $this->purgeLimit = intval( $params['purgeLimit'] ?? $this->purgeLimit );
174 $this->tableName = $params['tableName'] ?? $this->tableName;
175 $this->numTableShards = intval( $params['shards'] ?? $this->numTableShards );
176 $this->writeBatchSize = intval( $params['writeBatchSize'] ?? $this->writeBatchSize );
177 $this->replicaOnly = $params['replicaOnly'] ?? false;
178 $this->multiPrimaryMode = $params['multiPrimaryMode'] ?? false;
179
180 $this->attrMap[self::ATTR_DURABILITY] = self::QOS_DURABILITY_RDBMS;
181 $this->attrMap[self::ATTR_EMULATION] = self::QOS_EMULATION_SQL;
182
183 $this->hasZlib = extension_loaded( 'zlib' );
184 }
185
186 protected function doGet( $key, $flags = 0, &$casToken = null ) {
187 $getToken = ( $casToken === self::PASS_BY_REF );
188 $casToken = null;
189
190 $data = $this->fetchBlobs( [ $key ], $getToken )[$key];
191 if ( $data ) {
192 $result = $this->unserialize( $data[self::BLOB_VALUE] );
193 if ( $getToken && $result !== false ) {
194 $casToken = $data[self::BLOB_CASTOKEN];
195 }
196 $valueSize = strlen( $data[self::BLOB_VALUE] );
197 } else {
198 $result = false;
199 $valueSize = false;
200 }
201
202 $this->updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, $valueSize ] ] );
203
204 return $result;
205 }
206
207 protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
208 $mtime = $this->getCurrentTime();
209
210 return $this->modifyBlobs(
211 [ $this, 'modifyTableSpecificBlobsForSet' ],
212 $mtime,
213 [ $key => [ $value, $exptime ] ]
214 );
215 }
216
217 protected function doDelete( $key, $flags = 0 ) {
218 $mtime = $this->getCurrentTime();
219
220 return $this->modifyBlobs(
221 [ $this, 'modifyTableSpecificBlobsForDelete' ],
222 $mtime,
223 [ $key => [] ]
224 );
225 }
226
227 protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
228 $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
229 if ( $mtime === null ) {
230 // Timeout or I/O error during lock acquisition
231 return false;
232 }
233
234 return $this->modifyBlobs(
235 [ $this, 'modifyTableSpecificBlobsForAdd' ],
236 $mtime,
237 [ $key => [ $value, $exptime ] ]
238 );
239 }
240
241 protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
242 $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
243 if ( $mtime === null ) {
244 // Timeout or I/O error during lock acquisition
245 return false;
246 }
247
248 return $this->modifyBlobs(
249 [ $this, 'modifyTableSpecificBlobsForCas' ],
250 $mtime,
251 [ $key => [ $value, $exptime, $casToken ] ]
252 );
253 }
254
255 protected function doChangeTTL( $key, $exptime, $flags ) {
256 $mtime = $this->getCurrentTime();
257
258 return $this->modifyBlobs(
259 [ $this, 'modifyTableSpecificBlobsForChangeTTL' ],
260 $mtime,
261 [ $key => [ $exptime ] ]
262 );
263 }
264
265 protected function doIncrWithInit( $key, $exptime, $step, $init, $flags ) {
266 $mtime = $this->getCurrentTime();
267
268 if ( $flags & self::WRITE_BACKGROUND ) {
269 $callback = [ $this, 'modifyTableSpecificBlobsForIncrInitAsync' ];
270 } else {
271 $callback = [ $this, 'modifyTableSpecificBlobsForIncrInit' ];
272 }
273
274 $result = $this->modifyBlobs(
275 $callback,
276 $mtime,
277 [ $key => [ $step, $init, $exptime ] ],
278 $resByKey
279 ) ? $resByKey[$key] : false;
280
281 return $result;
282 }
283
284 protected function doGetMulti( array $keys, $flags = 0 ) {
285 $result = [];
286 $valueSizeByKey = [];
287
288 $dataByKey = $this->fetchBlobs( $keys );
289 foreach ( $keys as $key ) {
290 $data = $dataByKey[$key];
291 if ( $data ) {
292 $serialValue = $data[self::BLOB_VALUE];
293 $value = $this->unserialize( $serialValue );
294 if ( $value !== false ) {
295 $result[$key] = $value;
296 }
297 $valueSize = strlen( $serialValue );
298 } else {
299 $valueSize = false;
300 }
301 $valueSizeByKey[$key] = [ 0, $valueSize ];
302 }
303
304 $this->updateOpStats( self::METRIC_OP_GET, $valueSizeByKey );
305
306 return $result;
307 }
308
309 protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
310 $mtime = $this->getCurrentTime();
311
312 return $this->modifyBlobs(
313 [ $this, 'modifyTableSpecificBlobsForSet' ],
314 $mtime,
315 array_map(
316 static function ( $value ) use ( $exptime ) {
317 return [ $value, $exptime ];
318 },
319 $data
320 )
321 );
322 }
323
324 protected function doDeleteMulti( array $keys, $flags = 0 ) {
325 $mtime = $this->getCurrentTime();
326
327 return $this->modifyBlobs(
328 [ $this, 'modifyTableSpecificBlobsForDelete' ],
329 $mtime,
330 array_fill_keys( $keys, [] )
331 );
332 }
333
334 public function doChangeTTLMulti( array $keys, $exptime, $flags = 0 ) {
335 $mtime = $this->getCurrentTime();
336
337 return $this->modifyBlobs(
338 [ $this, 'modifyTableSpecificBlobsForChangeTTL' ],
339 $mtime,
340 array_fill_keys( $keys, [ $exptime ] )
341 );
342 }
343
352 private function getConnection( $shardIndex ) {
353 if ( $this->useLB ) {
354 return $this->getConnectionViaLoadBalancer();
355 }
356
357 // Don't keep timing out trying to connect if the server is down
358 if (
359 isset( $this->connFailureErrors[$shardIndex] ) &&
360 ( $this->getCurrentTime() - $this->connFailureTimes[$shardIndex] ) < 60
361 ) {
362 throw $this->connFailureErrors[$shardIndex];
363 }
364
365 if ( isset( $this->serverInfos[$shardIndex] ) ) {
366 $server = $this->serverInfos[$shardIndex];
367 $conn = $this->getConnectionFromServerInfo( $shardIndex, $server );
368 } else {
369 throw new UnexpectedValueException( "Invalid server index #$shardIndex" );
370 }
371
372 return $conn;
373 }
374
380 private function getKeyLocation( $key ) {
381 if ( $this->useLB ) {
382 // LoadBalancer based configuration
383 $shardIndex = 0;
384 } else {
385 // Striped array of database servers
386 if ( count( $this->serverTags ) == 1 ) {
387 $shardIndex = 0; // short-circuit
388 } else {
389 $sortedServers = $this->serverTags;
390 ArrayUtils::consistentHashSort( $sortedServers, $key );
391 $shardIndex = array_key_first( $sortedServers );
392 }
393 }
394
395 if ( $this->numTableShards > 1 ) {
396 $hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
397 $tableIndex = $hash % $this->numTableShards;
398 } else {
399 $tableIndex = null;
400 }
401
402 return [ $shardIndex, $this->getTableNameByShard( $tableIndex ) ];
403 }
404
410 private function getTableNameByShard( $index ) {
411 if ( $index !== null && $this->numTableShards > 1 ) {
412 $decimals = strlen( (string)( $this->numTableShards - 1 ) );
413
414 return $this->tableName . sprintf( "%0{$decimals}d", $index );
415 }
416
417 return $this->tableName;
418 }
419
425 private function fetchBlobs( array $keys, bool $getCasToken = false ) {
427 $silenceScope = $this->silenceTransactionProfiler();
428
429 // Initialize order-preserved per-key results; set values for live keys below
430 $dataByKey = array_fill_keys( $keys, null );
431
432 $readTime = (int)$this->getCurrentTime();
433 $keysByTableByShard = [];
434 foreach ( $keys as $key ) {
435 [ $shardIndex, $partitionTable ] = $this->getKeyLocation( $key );
436 $keysByTableByShard[$shardIndex][$partitionTable][] = $key;
437 }
438
439 foreach ( $keysByTableByShard as $shardIndex => $serverKeys ) {
440 try {
441 $db = $this->getConnection( $shardIndex );
442 foreach ( $serverKeys as $partitionTable => $tableKeys ) {
444 ->select(
445 $getCasToken
446 ? $this->addCasTokenFields( $db, [ 'keyname', 'value', 'exptime' ] )
447 : [ 'keyname', 'value', 'exptime' ] )
448 ->from( $partitionTable )
449 ->where( $this->buildExistenceConditions( $db, $tableKeys, $readTime ) )
450 ->caller( __METHOD__ )
451 ->fetchResultSet();
452 foreach ( $res as $row ) {
453 $row->shardIndex = $shardIndex;
454 $row->tableName = $partitionTable;
455 $dataByKey[$row->keyname] = $row;
456 }
457 }
458 } catch ( DBError $e ) {
459 $this->handleDBError( $e, $shardIndex );
460 }
461 }
462
463 foreach ( $keys as $key ) {
464 $row = $dataByKey[$key] ?? null;
465 if ( !$row ) {
466 continue;
467 }
468
469 $this->debug( __METHOD__ . ": retrieved $key; expiry time is {$row->exptime}" );
470 try {
471 $db = $this->getConnection( $row->shardIndex );
472 $dataByKey[$key] = [
473 self::BLOB_VALUE => $this->dbDecodeSerialValue( $db, $row->value ),
474 self::BLOB_EXPIRY => $this->decodeDbExpiry( $db, $row->exptime ),
475 self::BLOB_CASTOKEN => $getCasToken
476 ? $this->getCasTokenFromRow( $db, $row )
477 : null
478 ];
479 } catch ( DBQueryError $e ) {
480 $this->handleDBError( $e, $row->shardIndex );
481 }
482 }
483
484 return $dataByKey;
485 }
486
500 private function modifyBlobs(
501 callable $tableWriteCallback,
502 float $mtime,
503 array $argsByKey,
504 &$resByKey = []
505 ) {
506 // Initialize order-preserved per-key results; callbacks mark successful results
507 $resByKey = array_fill_keys( array_keys( $argsByKey ), false );
508
510 $silenceScope = $this->silenceTransactionProfiler();
511
512 $argsByKeyByTableByShard = [];
513 foreach ( $argsByKey as $key => $args ) {
514 [ $shardIndex, $partitionTable ] = $this->getKeyLocation( $key );
515 $argsByKeyByTableByShard[$shardIndex][$partitionTable][$key] = $args;
516 }
517
518 $shardIndexesAffected = [];
519 foreach ( $argsByKeyByTableByShard as $shardIndex => $argsByKeyByTables ) {
520 foreach ( $argsByKeyByTables as $table => $ptKeyArgs ) {
521 try {
522 $db = $this->getConnection( $shardIndex );
523 $shardIndexesAffected[] = $shardIndex;
524 $tableWriteCallback( $db, $table, $mtime, $ptKeyArgs, $resByKey );
525 } catch ( DBError $e ) {
526 $this->handleDBError( $e, $shardIndex );
527 continue;
528 }
529 }
530 }
531
532 $success = !in_array( false, $resByKey, true );
533
534 foreach ( $shardIndexesAffected as $shardIndex ) {
535 try {
536 $db = $this->getConnection( $shardIndex );
537 $this->occasionallyGarbageCollect( $db );
538 } catch ( DBError $e ) {
539 $this->handleDBError( $e, $shardIndex );
540 }
541 }
542
543 return $success;
544 }
545
561 private function modifyTableSpecificBlobsForSet(
562 IDatabase $db,
563 string $ptable,
564 float $mtime,
565 array $argsByKey,
566 array &$resByKey
567 ) {
568 $valueSizesByKey = [];
569
570 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
571
572 $rows = [];
573 foreach ( $argsByKey as $key => [ $value, $exptime ] ) {
574 $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
575 $serialValue = $this->getSerialized( $value, $key );
576 $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
577
578 $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
579 }
580
581 if ( $this->multiPrimaryMode ) {
582 $db->upsert(
583 $ptable,
584 $rows,
585 [ [ 'keyname' ] ],
586 $this->buildMultiUpsertSetForOverwrite( $db, $mt ),
587 __METHOD__
588 );
589 } else {
590 // T288998: use REPLACE, if possible, to avoid cluttering the binlogs
591 $db->replace( $ptable, 'keyname', $rows, __METHOD__ );
592 }
593
594 foreach ( $argsByKey as $key => $unused ) {
595 $resByKey[$key] = true;
596 }
597
598 $this->updateOpStats( self::METRIC_OP_SET, $valueSizesByKey );
599 }
600
617 private function modifyTableSpecificBlobsForDelete(
618 IDatabase $db,
619 string $ptable,
620 float $mtime,
621 array $argsByKey,
622 array &$resByKey
623 ) {
624 if ( $this->multiPrimaryMode ) {
625 // Tombstone keys in order to respect eventual consistency
626 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
627 $expiry = $this->makeNewKeyExpiry( self::TOMB_EXPTIME, (int)$mtime );
628 $rows = [];
629 foreach ( $argsByKey as $key => $arg ) {
630 $rows[] = $this->buildUpsertRow( $db, $key, self::TOMB_SERIAL, $expiry, $mt );
631 }
632 $db->upsert(
633 $ptable,
634 $rows,
635 [ [ 'keyname' ] ],
636 $this->buildMultiUpsertSetForOverwrite( $db, $mt ),
637 __METHOD__
638 );
639 } else {
640 // Just purge the keys since there is only one primary (e.g. "source of truth")
641 $db->delete( $ptable, [ 'keyname' => array_keys( $argsByKey ) ], __METHOD__ );
642 }
643
644 foreach ( $argsByKey as $key => $arg ) {
645 $resByKey[$key] = true;
646 }
647
648 $this->updateOpStats( self::METRIC_OP_DELETE, array_keys( $argsByKey ) );
649 }
650
671 private function modifyTableSpecificBlobsForAdd(
672 IDatabase $db,
673 string $ptable,
674 float $mtime,
675 array $argsByKey,
676 array &$resByKey
677 ) {
678 $valueSizesByKey = [];
679
680 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
681
682 // This check must happen outside the write query to respect eventual consistency
683 $existingKeys = $db->newSelectQueryBuilder()
684 ->select( 'keyname' )
685 ->from( $ptable )
686 ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) )
687 ->caller( __METHOD__ )
688 ->fetchFieldValues();
689 $existingByKey = array_fill_keys( $existingKeys, true );
690
691 $rows = [];
692 foreach ( $argsByKey as $key => [ $value, $exptime ] ) {
693 if ( isset( $existingByKey[$key] ) ) {
694 $this->logger->debug( __METHOD__ . ": $key already exists" );
695 continue;
696 }
697
698 $serialValue = $this->getSerialized( $value, $key );
699 $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
700 $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
701
702 $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
703 }
704
705 $db->upsert(
706 $ptable,
707 $rows,
708 [ [ 'keyname' ] ],
709 $this->buildMultiUpsertSetForOverwrite( $db, $mt ),
710 __METHOD__
711 );
712
713 foreach ( $argsByKey as $key => $unused ) {
714 $resByKey[$key] = !isset( $existingByKey[$key] );
715 }
716
717 $this->updateOpStats( self::METRIC_OP_ADD, $valueSizesByKey );
718 }
719
740 private function modifyTableSpecificBlobsForCas(
741 IDatabase $db,
742 string $ptable,
743 float $mtime,
744 array $argsByKey,
745 array &$resByKey
746 ) {
747 $valueSizesByKey = [];
748
749 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
750
751 // This check must happen outside the write query to respect eventual consistency
753 ->select( $this->addCasTokenFields( $db, [ 'keyname' ] ) )
754 ->from( $ptable )
755 ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) )
756 ->caller( __METHOD__ )
757 ->fetchResultSet();
758
759 $curTokensByKey = [];
760 foreach ( $res as $row ) {
761 $curTokensByKey[$row->keyname] = $this->getCasTokenFromRow( $db, $row );
762 }
763
764 $rows = [];
765 $nonMatchingByKey = [];
766 foreach ( $argsByKey as $key => [ $value, $exptime, $casToken ] ) {
767 $curToken = $curTokensByKey[$key] ?? null;
768 if ( $curToken === null ) {
769 $nonMatchingByKey[$key] = true;
770 $this->logger->debug( __METHOD__ . ": $key does not exists" );
771 continue;
772 }
773
774 if ( $curToken !== $casToken ) {
775 $nonMatchingByKey[$key] = true;
776 $this->logger->debug( __METHOD__ . ": $key does not have a matching token" );
777 continue;
778 }
779
780 $serialValue = $this->getSerialized( $value, $key );
781 $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
782 $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
783
784 $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
785 }
786
787 $db->upsert(
788 $ptable,
789 $rows,
790 [ [ 'keyname' ] ],
791 $this->buildMultiUpsertSetForOverwrite( $db, $mt ),
792 __METHOD__
793 );
794
795 foreach ( $argsByKey as $key => $unused ) {
796 $resByKey[$key] = !isset( $nonMatchingByKey[$key] );
797 }
798
799 $this->updateOpStats( self::METRIC_OP_CAS, $valueSizesByKey );
800 }
801
821 private function modifyTableSpecificBlobsForChangeTTL(
822 IDatabase $db,
823 string $ptable,
824 float $mtime,
825 array $argsByKey,
826 array &$resByKey
827 ) {
828 if ( $this->multiPrimaryMode ) {
829 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
830
832 ->select( [ 'keyname', 'value' ] )
833 ->from( $ptable )
834 ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) )
835 ->caller( __METHOD__ )
836 ->fetchResultSet();
837
838 $rows = [];
839 $existingKeys = [];
840 foreach ( $res as $curRow ) {
841 $key = $curRow->keyname;
842 $existingKeys[$key] = true;
843 $serialValue = $this->dbDecodeSerialValue( $db, $curRow->value );
844 [ $exptime ] = $argsByKey[$key];
845 $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
846 $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
847 }
848
849 $db->upsert(
850 $ptable,
851 $rows,
852 [ [ 'keyname' ] ],
853 $this->buildMultiUpsertSetForOverwrite( $db, $mt ),
854 __METHOD__
855 );
856
857 foreach ( $argsByKey as $key => $unused ) {
858 $resByKey[$key] = isset( $existingKeys[$key] );
859 }
860 } else {
861 $keysBatchesByExpiry = [];
862 foreach ( $argsByKey as $key => [ $exptime ] ) {
863 $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
864 $keysBatchesByExpiry[$expiry][] = $key;
865 }
866
867 $existingCount = 0;
868 foreach ( $keysBatchesByExpiry as $expiry => $keyBatch ) {
869 $db->update(
870 $ptable,
871 [ 'exptime' => $this->encodeDbExpiry( $db, $expiry ) ],
872 $this->buildExistenceConditions( $db, $keyBatch, (int)$mtime ),
873 __METHOD__
874 );
875 $existingCount += $db->affectedRows();
876 }
877 if ( $existingCount === count( $argsByKey ) ) {
878 foreach ( $argsByKey as $key => $args ) {
879 $resByKey[$key] = true;
880 }
881 }
882 }
883
884 $this->updateOpStats( self::METRIC_OP_CHANGE_TTL, array_keys( $argsByKey ) );
885 }
886
906 private function modifyTableSpecificBlobsForIncrInit(
907 IDatabase $db,
908 string $ptable,
909 float $mtime,
910 array $argsByKey,
911 array &$resByKey
912 ) {
913 foreach ( $argsByKey as $key => [ $step, $init, $exptime ] ) {
914 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
915 $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
916
917 // Use a transaction so that changes from other threads are not visible due to
918 // "consistent reads". This way, the exact post-increment value can be returned.
919 // The "live key exists" check can go inside the write query and remain safe for
920 // replication since the TTL for such keys is either indefinite or very short.
921 $atomic = $db->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
922 try {
923 $db->upsert(
924 $ptable,
925 $this->buildUpsertRow( $db, $key, $init, $expiry, $mt ),
926 [ [ 'keyname' ] ],
927 $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $mt, (int)$mtime ),
928 __METHOD__
929 );
930 $affectedCount = $db->affectedRows();
931 $row = $db->newSelectQueryBuilder()
932 ->select( 'value' )
933 ->from( $ptable )
934 ->where( [ 'keyname' => $key ] )
935 ->caller( __METHOD__ )
936 ->fetchRow();
937 } catch ( Exception $e ) {
938 $db->cancelAtomic( __METHOD__, $atomic );
939 throw $e;
940 }
941 $db->endAtomic( __METHOD__ );
942
943 if ( !$affectedCount || $row === false ) {
944 $this->logger->warning( __METHOD__ . ": failed to set new $key value" );
945 continue;
946 }
947
948 $serialValue = $this->dbDecodeSerialValue( $db, $row->value );
949 if ( !$this->isInteger( $serialValue ) ) {
950 $this->logger->warning( __METHOD__ . ": got non-integer $key value" );
951 continue;
952 }
953
954 $resByKey[$key] = (int)$serialValue;
955 }
956
957 $this->updateOpStats( self::METRIC_OP_INCR, array_keys( $argsByKey ) );
958 }
959
971 private function modifyTableSpecificBlobsForIncrInitAsync(
972 IDatabase $db,
973 string $ptable,
974 float $mtime,
975 array $argsByKey,
976 array &$resByKey
977 ) {
978 foreach ( $argsByKey as $key => [ $step, $init, $exptime ] ) {
979 $mt = $this->makeTimestampedModificationToken( $mtime, $db );
980 $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
981 $db->upsert(
982 $ptable,
983 $this->buildUpsertRow( $db, $key, $init, $expiry, $mt ),
984 [ [ 'keyname' ] ],
985 $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $mt, (int)$mtime ),
986 __METHOD__
987 );
988 if ( !$db->affectedRows() ) {
989 $this->logger->warning( __METHOD__ . ": failed to set new $key value" );
990 } else {
991 $resByKey[$key] = true;
992 }
993 }
994 }
995
1001 private function makeNewKeyExpiry( $exptime, int $nowTsUnix ) {
1002 $expiry = $this->getExpirationAsTimestamp( $exptime );
1003 // Eventual consistency requires the preservation of recently modified keys.
1004 // Do not create rows with `exptime` fields so low that they might get garbage
1005 // collected before being replicated.
1006 if ( $expiry !== self::TTL_INDEFINITE ) {
1007 $expiry = max( $expiry, $nowTsUnix - self::SAFE_CLOCK_BOUND_SEC );
1008 }
1009
1010 return $expiry;
1011 }
1012
1031 private function newLockingWriteSectionModificationTimestamp( $key, &$scope ) {
1032 if ( !$this->lock( $key, 0 ) ) {
1033 return null;
1034 }
1035
1036 $scope = new ScopedCallback( function () use ( $key ) {
1037 $this->unlock( $key );
1038 } );
1039
1040 // sprintf is used to adjust precision
1041 return (float)sprintf( '%.6F', $this->locks[$key][self::LOCK_TIME] );
1042 }
1043
1053 private function makeTimestampedModificationToken( float $mtime, IDatabase $db ) {
1054 // We have reserved space for upto 6 digits in the microsecond portion of the token.
1055 // This is for future use only (maybe CAS tokens) and not currently used.
1056 // It is currently populated by the microsecond portion returned by microtime,
1057 // which generally has fewer than 6 digits of meaningful precision but can still be useful
1058 // in debugging (to see the token continuously change even during rapid testing).
1059 $seconds = (int)$mtime;
1060 [ , $microseconds ] = explode( '.', sprintf( '%.6F', $mtime ) );
1061
1062 $id = $db->getTopologyBasedServerId() ?? sprintf( '%u', crc32( $db->getServerName() ) );
1063
1064 $token = implode( '', [
1065 // 67 bit integral portion of UNIX timestamp, qualified
1066 \Wikimedia\base_convert(
1067 // 35 bit integral seconds portion of UNIX timestamp
1068 str_pad( base_convert( (string)$seconds, 10, 2 ), 35, '0', STR_PAD_LEFT ) .
1069 // 32 bit ID of the primary database server handling the write
1070 str_pad( base_convert( $id, 10, 2 ), 32, '0', STR_PAD_LEFT ),
1071 2,
1072 36,
1073 13
1074 ),
1075 // 20 bit fractional portion of UNIX timestamp, as integral microseconds
1076 str_pad( base_convert( $microseconds, 10, 36 ), 4, '0', STR_PAD_LEFT )
1077 ] );
1078
1079 if ( strlen( $token ) !== 17 ) {
1080 throw new RuntimeException( "Modification timestamp overflow detected" );
1081 }
1082
1083 return $token;
1084 }
1085
1094 private function buildExistenceConditions( IDatabase $db, $keys, int $time ) {
1095 // Note that tombstones always have past expiration dates
1096 return [
1097 'keyname' => $keys,
1098 'exptime >= ' . $db->addQuotes( $db->timestamp( $time ) )
1099 ];
1100 }
1101
1112 private function buildUpsertRow(
1113 IDatabase $db,
1114 $key,
1115 $serialValue,
1116 int $expiry,
1117 string $mt
1118 ) {
1119 $row = [
1120 'keyname' => $key,
1121 'value' => $this->dbEncodeSerialValue( $db, $serialValue ),
1122 'exptime' => $this->encodeDbExpiry( $db, $expiry )
1123 ];
1124 if ( $this->multiPrimaryMode ) {
1125 $row['modtoken'] = $mt;
1126 }
1127
1128 return $row;
1129 }
1130
1138 private function buildMultiUpsertSetForOverwrite( IDatabase $db, string $mt ) {
1139 $expressionsByColumn = [
1140 'value' => $db->buildExcludedValue( 'value' ),
1141 'exptime' => $db->buildExcludedValue( 'exptime' )
1142 ];
1143
1144 $set = [];
1145 if ( $this->multiPrimaryMode ) {
1146 // The query might take a while to replicate, during which newer values might get
1147 // written. Qualify the query so that it does not override such values. Note that
1148 // duplicate tokens generated around the same time for a key should still result
1149 // in convergence given the use of server_id in modtoken (providing a total order
1150 // among primary DB servers) and MySQL binlog ordering (providing a total order
1151 // for writes replicating from a given primary DB server).
1152 $expressionsByColumn['modtoken'] = $db->addQuotes( $mt );
1153 foreach ( $expressionsByColumn as $column => $updateExpression ) {
1154 $rhs = $db->conditional(
1155 $db->addQuotes( substr( $mt, 0, 13 ) ) . ' >= ' .
1156 $db->buildSubString( 'modtoken', 1, 13 ),
1157 $updateExpression,
1158 $column
1159 );
1160 $set[] = "{$column}=" . trim( $rhs );
1161 }
1162 } else {
1163 foreach ( $expressionsByColumn as $column => $updateExpression ) {
1164 $set[] = "{$column}={$updateExpression}";
1165 }
1166 }
1167
1168 return $set;
1169 }
1170
1182 private function buildIncrUpsertSet(
1183 IDatabase $db,
1184 int $step,
1185 int $init,
1186 int $expiry,
1187 string $mt,
1188 int $mtUnixTs
1189 ) {
1190 // Map of (column => (SQL for non-expired key rows, SQL for expired key rows))
1191 $expressionsByColumn = [
1192 'value' => [
1193 $db->buildIntegerCast( 'value' ) . " + {$db->addQuotes( $step )}",
1194 $db->addQuotes( $this->dbEncodeSerialValue( $db, $init ) )
1195 ],
1196 'exptime' => [
1197 'exptime',
1198 $db->addQuotes( $this->encodeDbExpiry( $db, $expiry ) )
1199 ]
1200 ];
1201 if ( $this->multiPrimaryMode ) {
1202 $expressionsByColumn['modtoken'] = [ 'modtoken', $db->addQuotes( $mt ) ];
1203 }
1204
1205 $set = [];
1206 foreach ( $expressionsByColumn as $column => [ $updateExpression, $initExpression ] ) {
1207 $rhs = $db->conditional(
1208 'exptime >= ' . $db->addQuotes( $db->timestamp( $mtUnixTs ) ),
1209 $updateExpression,
1210 $initExpression
1211 );
1212 $set[] = "{$column}=" . trim( $rhs );
1213 }
1214
1215 return $set;
1216 }
1217
1223 private function encodeDbExpiry( IDatabase $db, int $expiry ) {
1224 return ( $expiry === self::TTL_INDEFINITE )
1225 // Use the maximum timestamp that the column can store
1226 ? $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER )
1227 // Convert the absolute timestamp into the DB timestamp format
1228 : $db->timestamp( $expiry );
1229 }
1230
1236 private function decodeDbExpiry( IDatabase $db, string $dbExpiry ) {
1237 return ( $dbExpiry === $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER ) )
1238 ? self::TTL_INDEFINITE
1239 : (int)ConvertibleTimestamp::convert( TS_UNIX, $dbExpiry );
1240 }
1241
1247 private function dbEncodeSerialValue( IDatabase $db, $serialValue ) {
1248 return is_int( $serialValue ) ? (string)$serialValue : $db->encodeBlob( $serialValue );
1249 }
1250
1256 private function dbDecodeSerialValue( IDatabase $db, $blob ) {
1257 return $this->isInteger( $blob ) ? (int)$blob : $db->decodeBlob( $blob );
1258 }
1259
1267 private function addCasTokenFields( IDatabase $db, array $fields ) {
1268 $type = $db->getType();
1269
1270 if ( $type === 'mysql' ) {
1271 $fields['castoken'] = $db->buildConcat( [
1272 'SHA1(value)',
1273 $db->addQuotes( '@' ),
1274 'exptime'
1275 ] );
1276 } elseif ( $type === 'postgres' ) {
1277 $fields['castoken'] = $db->buildConcat( [
1278 'md5(value)',
1279 $db->addQuotes( '@' ),
1280 'exptime'
1281 ] );
1282 } else {
1283 if ( !in_array( 'value', $fields, true ) ) {
1284 $fields[] = 'value';
1285 }
1286 if ( !in_array( 'exptime', $fields, true ) ) {
1287 $fields[] = 'exptime';
1288 }
1289 }
1290
1291 return $fields;
1292 }
1293
1301 private function getCasTokenFromRow( IDatabase $db, stdClass $row ) {
1302 if ( isset( $row->castoken ) ) {
1303 $token = $row->castoken;
1304 } else {
1305 $token = sha1( $this->dbDecodeSerialValue( $db, $row->value ) ) . '@' . $row->exptime;
1306 $this->logger->debug( __METHOD__ . ": application computed hash for CAS token" );
1307 }
1308
1309 return $token;
1310 }
1311
1316 private function occasionallyGarbageCollect( IDatabase $db ) {
1317 if (
1318 // Random purging is enabled
1319 $this->purgePeriod &&
1320 // Only purge on one in every $this->purgePeriod writes
1321 mt_rand( 0, $this->purgePeriod - 1 ) == 0 &&
1322 // Avoid repeating the delete within a few seconds
1323 ( $this->getCurrentTime() - $this->lastGarbageCollect ) > self::GC_DELAY_SEC
1324 ) {
1325 $garbageCollector = function () use ( $db ) {
1327 $silenceScope = $this->silenceTransactionProfiler();
1328 $this->deleteServerObjectsExpiringBefore(
1329 $db,
1330 (int)$this->getCurrentTime(),
1331 $this->purgeLimit
1332 );
1333 $this->lastGarbageCollect = time();
1334 };
1335 if ( $this->asyncHandler ) {
1336 $this->lastGarbageCollect = $this->getCurrentTime(); // avoid duplicate enqueues
1337 ( $this->asyncHandler )( $garbageCollector );
1338 } else {
1339 $garbageCollector();
1340 }
1341 }
1342 }
1343
1344 public function expireAll() {
1345 $this->deleteObjectsExpiringBefore( (int)$this->getCurrentTime() );
1346 }
1347
1349 $timestamp,
1350 callable $progress = null,
1351 $limit = INF,
1352 string $tag = null
1353 ) {
1355 $silenceScope = $this->silenceTransactionProfiler();
1356
1357 if ( $tag !== null ) {
1358 // Purge one server only, to support concurrent purging in large wiki farms (T282761).
1359 $shardIndexes = [ $this->getShardServerIndexForTag( $tag ) ];
1360 } else {
1361 $shardIndexes = $this->getShardServerIndexes();
1362 shuffle( $shardIndexes );
1363 }
1364
1365 $ok = true;
1366 $numServers = count( $shardIndexes );
1367
1368 $keysDeletedCount = 0;
1369 foreach ( $shardIndexes as $numServersDone => $shardIndex ) {
1370 try {
1371 $db = $this->getConnection( $shardIndex );
1372 $this->deleteServerObjectsExpiringBefore(
1373 $db,
1374 $timestamp,
1375 $limit,
1376 $keysDeletedCount,
1377 [ 'fn' => $progress, 'serversDone' => $numServersDone, 'serversTotal' => $numServers ]
1378 );
1379 } catch ( DBError $e ) {
1380 $this->handleDBError( $e, $shardIndex );
1381 $ok = false;
1382 }
1383 }
1384
1385 return $ok;
1386 }
1387
1397 private function deleteServerObjectsExpiringBefore(
1398 IDatabase $db,
1399 $timestamp,
1400 $limit,
1401 &$keysDeletedCount = 0,
1402 array $progress = null
1403 ) {
1404 $cutoffUnix = ConvertibleTimestamp::convert( TS_UNIX, $timestamp );
1405 if ( $this->multiPrimaryMode ) {
1406 // Eventual consistency requires the preservation of any key that was recently
1407 // modified. The key must exist on this database server long enough for the server
1408 // to receive, via replication, all writes to the key with lower timestamps. Such
1409 // writes should be no-ops since the existing key value should "win". If the network
1410 // partitions between datacenters A and B for 30 minutes, the database servers in
1411 // each datacenter will see an initial burst of writes with "old" timestamps via
1412 // replication. This might include writes with lower timestamps that the existing
1413 // key value. Therefore, clock skew and replication delay are both factors.
1414 $cutoffUnixSafe = (int)$this->getCurrentTime() - self::SAFE_PURGE_DELAY_SEC;
1415 $cutoffUnix = min( $cutoffUnix, $cutoffUnixSafe );
1416 }
1417 $tableIndexes = range( 0, $this->numTableShards - 1 );
1418 shuffle( $tableIndexes );
1419
1420 $batchSize = min( $this->writeBatchSize, $limit );
1421
1422 foreach ( $tableIndexes as $numShardsDone => $tableIndex ) {
1423 // The oldest expiry of a row we have deleted on this shard
1424 // (the first row that we deleted)
1425 $minExpUnix = null;
1426 // The most recent expiry time so far, from a row we have deleted on this shard
1427 $maxExp = null;
1428 // Size of the time range we'll delete, in seconds (for progress estimate)
1429 $totalSeconds = null;
1430
1431 do {
1433 ->select( [ 'keyname', 'exptime' ] )
1434 ->from( $this->getTableNameByShard( $tableIndex ) )
1435 ->where(
1436 array_merge(
1437 [ 'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ) ],
1438 $maxExp ? [ 'exptime >= ' . $db->addQuotes( $maxExp ) ] : []
1439 )
1440 )
1441 ->orderBy( 'exptime', SelectQueryBuilder::SORT_ASC )
1442 ->limit( $batchSize )
1443 ->caller( __METHOD__ )
1444 ->fetchResultSet();
1445
1446 if ( $res->numRows() ) {
1447 $row = $res->current();
1448 if ( $minExpUnix === null ) {
1449 $minExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $row->exptime );
1450 $totalSeconds = max( $cutoffUnix - $minExpUnix, 1 );
1451 }
1452
1453 $keys = [];
1454 foreach ( $res as $row ) {
1455 $keys[] = $row->keyname;
1456 $maxExp = $row->exptime;
1457 }
1458
1459 $db->delete(
1460 $this->getTableNameByShard( $tableIndex ),
1461 [
1462 'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ),
1463 'keyname' => $keys
1464 ],
1465 __METHOD__
1466 );
1467 $keysDeletedCount += $db->affectedRows();
1468 }
1469
1470 if ( $progress && is_callable( $progress['fn'] ) ) {
1471 if ( $totalSeconds ) {
1472 $maxExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $maxExp );
1473 $remainingSeconds = $cutoffUnix - $maxExpUnix;
1474 $processedSeconds = max( $totalSeconds - $remainingSeconds, 0 );
1475 // For example, if we've done 1.5 table shard, and are thus half-way on the
1476 // 2nd of perhaps 5 tables on this server, then this might be:
1477 // `( 1 + ( 43200 / 86400 ) ) / 5 = 0.3`, or 30% done, of tables on this server.
1478 $tablesDoneRatio =
1479 ( $numShardsDone + ( $processedSeconds / $totalSeconds ) ) / $this->numTableShards;
1480 } else {
1481 $tablesDoneRatio = 1;
1482 }
1483
1484 // For example, if we're 30% done on the last of 10 servers, then this might be:
1485 // `( 9 / 10 ) + ( 0.3 / 10 ) = 0.93`, or 93% done, overall.
1486 $overallRatio = ( $progress['serversDone'] / $progress['serversTotal'] ) +
1487 ( $tablesDoneRatio / $progress['serversTotal'] );
1488 ( $progress['fn'] )( $overallRatio * 100 );
1489 }
1490 } while ( $res->numRows() && $keysDeletedCount < $limit );
1491 }
1492 }
1493
1499 public function deleteAll() {
1501 $silenceScope = $this->silenceTransactionProfiler();
1502 foreach ( $this->getShardServerIndexes() as $shardIndex ) {
1503 try {
1504 $db = $this->getConnection( $shardIndex );
1505 for ( $i = 0; $i < $this->numTableShards; $i++ ) {
1506 $db->delete( $this->getTableNameByShard( $i ), '*', __METHOD__ );
1507 }
1508 } catch ( DBError $e ) {
1509 $this->handleDBError( $e, $shardIndex );
1510 return false;
1511 }
1512 }
1513 return true;
1514 }
1515
1516 public function doLock( $key, $timeout = 6, $exptime = 6 ) {
1518 $silenceScope = $this->silenceTransactionProfiler();
1519
1520 $lockTsUnix = null;
1521
1522 [ $shardIndex ] = $this->getKeyLocation( $key );
1523 try {
1524 $db = $this->getConnection( $shardIndex );
1525 $lockTsUnix = $db->lock( $key, __METHOD__, $timeout, $db::LOCK_TIMESTAMP );
1526 } catch ( DBError $e ) {
1527 $this->handleDBError( $e, $shardIndex );
1528 $this->logger->warning(
1529 __METHOD__ . ' failed due to I/O error for {key}.',
1530 [ 'key' => $key ]
1531 );
1532 }
1533
1534 return $lockTsUnix;
1535 }
1536
1537 public function doUnlock( $key ) {
1539 $silenceScope = $this->silenceTransactionProfiler();
1540
1541 [ $shardIndex ] = $this->getKeyLocation( $key );
1542
1543 try {
1544 $db = $this->getConnection( $shardIndex );
1545 $released = $db->unlock( $key, __METHOD__ );
1546 } catch ( DBError $e ) {
1547 $this->handleDBError( $e, $shardIndex );
1548 $released = false;
1549 }
1550
1551 return $released;
1552 }
1553
1554 protected function makeKeyInternal( $keyspace, $components ) {
1555 // SQL schema for 'objectcache' specifies keys as varchar(255). From that,
1556 // subtract the number of characters we need for the keyspace and for
1557 // the separator character needed for each argument. To handle some
1558 // custom prefixes used by thing like WANObjectCache, limit to 205.
1559 $keyspace = strtr( $keyspace, ' ', '_' );
1560 $charsLeft = 205 - strlen( $keyspace ) - count( $components );
1561 foreach ( $components as &$component ) {
1562 $component = strtr( $component, [
1563 ' ' => '_', // Avoid unnecessary misses from pre-1.35 code
1564 ':' => '%3A',
1565 ] );
1566
1567 // 33 = 32 characters for the MD5 + 1 for the '#' prefix.
1568 if ( $charsLeft > 33 && strlen( $component ) > $charsLeft ) {
1569 $component = '#' . md5( $component );
1570 }
1571 $charsLeft -= strlen( $component );
1572 }
1573
1574 if ( $charsLeft < 0 ) {
1575 return $keyspace . ':BagOStuff-long-key:##' . md5( implode( ':', $components ) );
1576 }
1577 return $keyspace . ':' . implode( ':', $components );
1578 }
1579
1580 protected function serialize( $value ) {
1581 if ( is_int( $value ) ) {
1582 return $value;
1583 }
1584
1585 $serial = serialize( $value );
1586 if ( $this->hasZlib ) {
1587 // On typical message and page data, this can provide a 3X storage savings
1588 $serial = gzdeflate( $serial );
1589 }
1590
1591 return $serial;
1592 }
1593
1594 protected function unserialize( $value ) {
1595 if ( $value === self::TOMB_SERIAL ) {
1596 return false; // tombstone
1597 }
1598
1599 if ( $this->isInteger( $value ) ) {
1600 return (int)$value;
1601 }
1602
1603 if ( $this->hasZlib ) {
1604 AtEase::suppressWarnings();
1605 $decompressed = gzinflate( $value );
1606 AtEase::restoreWarnings();
1607
1608 if ( $decompressed !== false ) {
1609 $value = $decompressed;
1610 }
1611 }
1612
1613 return unserialize( $value );
1614 }
1615
1616 private function getLoadBalancer(): ILoadBalancer {
1617 if ( !$this->loadBalancer ) {
1618 $this->loadBalancer = ( $this->loadBalancerCallback )();
1619 }
1620 return $this->loadBalancer;
1621 }
1622
1627 private function getConnectionViaLoadBalancer() {
1628 $lb = $this->getLoadBalancer();
1629
1630 if ( $lb->getServerAttributes( $lb->getWriterIndex() )[Database::ATTR_DB_LEVEL_LOCKING] ) {
1631 // Use the main connection to avoid transaction deadlocks
1632 $conn = $lb->getMaintenanceConnectionRef( DB_PRIMARY, [], $this->dbDomain );
1633 } else {
1634 // If the RDBMS has row/table/page level locking, then use separate auto-commit
1635 // connection to avoid needless contention and deadlocks.
1636 $conn = $lb->getMaintenanceConnectionRef(
1637 $this->replicaOnly ? DB_REPLICA : DB_PRIMARY,
1638 [],
1639 $this->dbDomain,
1640 $lb::CONN_TRX_AUTOCOMMIT
1641 );
1642 }
1643
1644 // Make sure any errors are thrown now while we can more easily handle them
1645 $conn->ensureConnection();
1646 return $conn;
1647 }
1648
1655 private function getConnectionFromServerInfo( $shardIndex, array $server ) {
1656 if ( !isset( $this->conns[$shardIndex] ) ) {
1658 $dbFactory = MediaWikiServices::getInstance()->getDatabaseFactory();
1659 $conn = $dbFactory->create(
1660 $server['type'],
1661 array_merge(
1662 $server,
1663 [
1664 // Make sure the handle uses autocommit mode
1665 'flags' => ( $server['flags'] ?? 0 ) & ~IDatabase::DBO_TRX,
1666 'logger' => $this->logger,
1667 ]
1668 )
1669 );
1670 // Automatically create the objectcache table for sqlite as needed
1671 if ( $conn->getType() === 'sqlite' ) {
1672 $this->initSqliteDatabase( $conn );
1673 }
1674 $this->conns[$shardIndex] = $conn;
1675 }
1676
1677 // @phan-suppress-next-line PhanTypeMismatchReturnNullable False positive
1678 return $this->conns[$shardIndex];
1679 }
1680
1687 private function handleDBError( DBError $exception, $shardIndex ) {
1688 if ( !$this->useLB && $exception instanceof DBConnectionError ) {
1689 $this->markServerDown( $exception, $shardIndex );
1690 }
1691 $this->setAndLogDBError( $exception );
1692 }
1693
1697 private function setAndLogDBError( DBError $e ) {
1698 $this->logger->error( "DBError: {$e->getMessage()}", [ 'exception' => $e ] );
1699 if ( $e instanceof DBConnectionError ) {
1700 $this->setLastError( self::ERR_UNREACHABLE );
1701 $this->logger->warning( __METHOD__ . ": ignoring connection error" );
1702 } else {
1703 $this->setLastError( self::ERR_UNEXPECTED );
1704 $this->logger->warning( __METHOD__ . ": ignoring query error" );
1705 }
1706 }
1707
1714 private function markServerDown( DBError $exception, $shardIndex ) {
1715 unset( $this->conns[$shardIndex] ); // bug T103435
1716
1717 $now = $this->getCurrentTime();
1718 if ( isset( $this->connFailureTimes[$shardIndex] ) ) {
1719 if ( $now - $this->connFailureTimes[$shardIndex] >= 60 ) {
1720 unset( $this->connFailureTimes[$shardIndex] );
1721 unset( $this->connFailureErrors[$shardIndex] );
1722 } else {
1723 $this->logger->debug( __METHOD__ . ": Server #$shardIndex already down" );
1724 return;
1725 }
1726 }
1727 $this->logger->info( __METHOD__ . ": Server #$shardIndex down until " . ( $now + 60 ) );
1728 $this->connFailureTimes[$shardIndex] = $now;
1729 $this->connFailureErrors[$shardIndex] = $exception;
1730 }
1731
1736 private function initSqliteDatabase( IMaintainableDatabase $db ) {
1737 if ( $db->tableExists( 'objectcache', __METHOD__ ) ) {
1738 return;
1739 }
1740 // Use one table for SQLite; sharding does not seem to have much benefit
1741 $db->query( "PRAGMA journal_mode=WAL", __METHOD__ ); // this is permanent
1742 $db->startAtomic( __METHOD__ ); // atomic DDL
1743 try {
1744 $encTable = $db->tableName( 'objectcache' );
1745 $encExptimeIndex = $db->addIdentifierQuotes( $db->tablePrefix() . 'exptime' );
1746 $db->query(
1747 "CREATE TABLE $encTable (\n" .
1748 " keyname BLOB NOT NULL default '' PRIMARY KEY,\n" .
1749 " value BLOB,\n" .
1750 " exptime BLOB NOT NULL\n" .
1751 ")",
1752 __METHOD__
1753 );
1754 $db->query( "CREATE INDEX $encExptimeIndex ON $encTable (exptime)", __METHOD__ );
1755 $db->endAtomic( __METHOD__ );
1756 } catch ( DBError $e ) {
1757 $db->rollback( __METHOD__ );
1758 throw $e;
1759 }
1760 }
1761
1777 public function createTables() {
1778 foreach ( $this->getShardServerIndexes() as $shardIndex ) {
1779 $db = $this->getConnection( $shardIndex );
1780 if ( in_array( $db->getType(), [ 'mysql', 'postgres' ], true ) ) {
1781 for ( $i = 0; $i < $this->numTableShards; $i++ ) {
1782 $encBaseTable = $db->tableName( 'objectcache' );
1783 $encShardTable = $db->tableName( $this->getTableNameByShard( $i ) );
1784 $db->query( "CREATE TABLE $encShardTable LIKE $encBaseTable", __METHOD__ );
1785 }
1786 }
1787 }
1788 }
1789
1793 private function getShardServerIndexes() {
1794 if ( $this->useLB ) {
1795 // LoadBalancer based configuration
1796 $shardIndexes = [ 0 ];
1797 } else {
1798 // Striped array of database servers
1799 $shardIndexes = array_keys( $this->serverTags );
1800 }
1801
1802 return $shardIndexes;
1803 }
1804
1810 private function getShardServerIndexForTag( string $tag ) {
1811 if ( !$this->serverTags ) {
1812 throw new InvalidArgumentException( "Given a tag but no tags are configured" );
1813 }
1814 foreach ( $this->serverTags as $serverShardIndex => $serverTag ) {
1815 if ( $tag === $serverTag ) {
1816 return $serverShardIndex;
1817 }
1818 }
1819 throw new InvalidArgumentException( "Unknown server tag: $tag" );
1820 }
1821
1827 private function silenceTransactionProfiler() {
1828 if ( $this->serverInfos ) {
1829 return null; // no TransactionProfiler injected anyway
1830 }
1831 return Profiler::instance()->getTransactionProfiler()->silenceForScope();
1832 }
1833}
if(!defined('MW_SETUP_CALLBACK'))
The persistent session ID (if any) loaded at startup.
Definition WebStart.php:88
static consistentHashSort(&$array, $key, $separator="\000")
Sort the given array in a pseudo-random order which depends only on the given key and each element va...
string $keyspace
Default keyspace; used by makeKey()
Service locator for MediaWiki core services.
Storage medium specific cache for storing items (e.g.
getExpirationAsTimestamp( $exptime)
Convert an optionally relative timestamp to an absolute time.
getSerialized( $value, $key)
Get the serialized form a value, logging a warning if it involves custom classes.
unlock( $key)
Release an advisory lock on a key string.
updateOpStats(string $op, array $keyInfo)
isInteger( $value)
Check if a value is an integer.
lock( $key, $timeout=6, $exptime=6, $rclass='')
static instance()
Singleton.
Definition Profiler.php:108
RDBMS-based caching module.
bool $multiPrimaryMode
Whether multi-primary mode is enabled.
deleteObjectsExpiringBefore( $timestamp, callable $progress=null, $limit=INF, string $tag=null)
Delete all objects expiring before a certain date.
ILoadBalancer null $loadBalancer
doDelete( $key, $flags=0)
Delete an item.
float $lastGarbageCollect
UNIX timestamp.
string[] $serverTags
(server index => tag/host name)
doSetMulti(array $data, $exptime=0, $flags=0)
createTables()
Create the shard tables on all databases.
serialize( $value)
int $purgePeriod
Average number of writes required to trigger garbage collection.
doChangeTTL( $key, $exptime, $flags)
unserialize( $value)
array[] $serverInfos
(server index => server config)
deleteAll()
Delete content of shard tables in every server.
callable null $loadBalancerCallback
Injected function which returns a LoadBalancer.
doDeleteMulti(array $keys, $flags=0)
int $numTableShards
Number of table shards to use on each server.
int $purgeLimit
Max expired rows to purge during randomized garbage collection.
__construct( $params)
Create a new backend instance from parameters injected by ObjectCache::newFromParams()
doGet( $key, $flags=0, &$casToken=null)
Get an item.
doAdd( $key, $value, $exptime=0, $flags=0)
Insert an item if it does not already exist.
bool $replicaOnly
Whether to use replicas instead of primaries (if using LoadBalancer)
doSet( $key, $value, $exptime=0, $flags=0)
Set an item.
makeKeyInternal( $keyspace, $components)
Make a cache key for the given keyspace and components.
doLock( $key, $timeout=6, $exptime=6)
Exception[] $connFailureErrors
Map of (shard index => Exception)
doIncrWithInit( $key, $exptime, $step, $init, $flags)
IMaintainableDatabase[] $conns
Map of (shard index => DB handle)
string $tableName
doGetMulti(array $keys, $flags=0)
Get an associative array containing the item for each of the keys that have items.
bool $useLB
Whether to use the LoadBalancer.
doCas( $casToken, $key, $value, $exptime=0, $flags=0)
Set an item if the current CAS token matches the provided CAS token.
string false null $dbDomain
DB name used for keys using the LoadBalancer.
doChangeTTLMulti(array $keys, $exptime, $flags=0)
float[] $connFailureTimes
Map of (shard index => UNIX timestamps)
Database error base class.
Definition DBError.php:31
A query builder for SELECT queries with a fluent interface.
addQuotes( $s)
Escape and quote a raw value string for use in a SQL query.
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:36
rollback( $fname=__METHOD__, $flush=self::FLUSHING_ONE)
Rollback a transaction previously started using begin()
unlock( $lockName, $method)
Release a lock.
endAtomic( $fname=__METHOD__)
Ends an atomic section of SQL statements.
lock( $lockName, $method, $timeout=5, $flags=0)
Acquire a named lock.
getTopologyBasedServerId()
Get a non-recycled ID that uniquely identifies this server within the replication topology.
affectedRows()
Get the number of rows affected by the last attempted query statement.
delete( $table, $conds, $fname=__METHOD__)
Delete all rows in a table that match a condition.
update( $table, $set, $conds, $fname=__METHOD__, $options=[])
Update all rows in a table that match a given condition.
upsert( $table, array $rows, $uniqueKeys, array $set, $fname=__METHOD__)
Upsert row(s) into a table, in the provided order, while updating conflicting rows.
query( $sql, $fname=__METHOD__, $flags=0)
Run an SQL query statement and return the result.
replace( $table, $uniqueKeys, $rows, $fname=__METHOD__)
Insert row(s) into a table, in the provided order, while deleting conflicting rows.
startAtomic( $fname=__METHOD__, $cancelable=self::ATOMIC_NOT_CANCELABLE)
Begin an atomic section of SQL statements.
cancelAtomic( $fname=__METHOD__, AtomicSectionIdentifier $sectionId=null)
Cancel an atomic section of SQL statements.
This class is a delegate to ILBFactory for a given database cluster.
Advanced database interface for IDatabase handles that include maintenance methods.
tableExists( $table, $fname=__METHOD__)
Query whether a given table exists.
getType()
Get the RDBMS type of the server (e.g.
newSelectQueryBuilder()
Create an empty SelectQueryBuilder which can be used to run queries against this connection.
tablePrefix( $prefix=null)
Get/set the table prefix.
getServerName()
Get the readable name for the server.
buildExcludedValue( $column)
Build a reference to a column value from the conflicting proposed upsert() row.
conditional( $cond, $caseTrueExpression, $caseFalseExpression)
Returns an SQL expression for a simple conditional.
tableName( $name, $format='quoted')
Format a table name ready for use in constructing an SQL query.
buildSubString( $input, $startPosition, $length=null)
Build a SUBSTRING function.
addIdentifierQuotes( $s)
Escape a SQL identifier (e.g.
timestamp( $ts=0)
Convert a timestamp in one of the formats accepted by ConvertibleTimestamp to the format used for ins...
buildConcat( $stringList)
Build a concatenation list to feed into a SQL query.
This program is free software; you can redistribute it and/or modify it under the terms of the GNU Ge...
const DB_REPLICA
Definition defines.php:26
const DB_PRIMARY
Definition defines.php:28