MediaWiki REL1_39
SqlBagOStuff.php
Go to the documentation of this file.
1<?php
24use Wikimedia\AtEase\AtEase;
34use Wikimedia\ScopedCallback;
35use Wikimedia\Timestamp\ConvertibleTimestamp;
36
/** @var callable|null Value of the 'loadBalancerCallback' constructor parameter, if given */
protected $loadBalancerCallback;
/** @var mixed Load balancer handle; not assigned in the visible code (TODO confirm type) */
protected $loadBalancer;
/** @var string|null Value of the 'dbDomain' parameter; required with 'loadBalancerCallback' */
protected $dbDomain;
/** @var bool Whether connections are obtained via the load balancer rather than a server list */
protected $useLB = false;

/** @var array Server configuration entries from 'servers'/'server', indexed by shard index */
protected $serverInfos = [];
/** @var string[] Server tags indexed by shard index ("#<index>" when the config key was not a string) */
protected $serverTags = [];
/** @var float|int Timestamp of the last garbage collection run or enqueue */
protected $lastGarbageCollect = 0;
/** @var int Garbage collection is attempted on roughly one in this many writes; 0 disables it */
protected $purgePeriod = 10;
/** @var int Row limit passed to each occasional garbage collection run */
protected $purgeLimit = 100;
/** @var int Number of table shards; table names get a numeric suffix when this exceeds 1 */
protected $numTableShards = 1;
/** @var int Value of the 'writeBatchSize' parameter (usage not visible here) */
protected $writeBatchSize = 100;
/** @var string Base name of the cache table */
protected $tableName = 'objectcache';
/** @var bool Value of the 'replicaOnly' parameter */
protected $replicaOnly;
/** @var bool Whether writes use tombstones and 'modtoken' qualified upserts ('multiPrimaryMode') */
protected $multiPrimaryMode;

/** @var array Connection cache; presumably keyed by shard index (TODO confirm) */
protected $conns;
/** @var float[] Time of the last connection failure, keyed by shard index */
protected $connFailureTimes = [];
/** @var Exception[] Last connection failure error, keyed by shard index */
protected $connFailureErrors = [];

/** @var bool Whether the zlib extension is loaded */
private $hasZlib;

/** Margin (seconds) below "now" that a newly written expiry may not fall under */
private const SAFE_CLOCK_BOUND_SEC = 15;
/** Purge delay in seconds (usage not visible here) */
private const SAFE_PURGE_DELAY_SEC = 3600;
/** Serialized value stored in tombstone rows */
private const TOMB_SERIAL = '';
/** Relative expiry used when writing tombstone rows */
private const TOMB_EXPTIME = -self::SAFE_CLOCK_BOUND_SEC;
/** Minimum number of seconds between garbage collection runs */
private const GC_DELAY_SEC = 1;

/** Index of the serialized value in a fetched blob tuple */
private const BLOB_VALUE = 0;
/** Index of the UNIX expiry in a fetched blob tuple */
private const BLOB_EXPIRY = 1;
/** Index of the CAS token in a fetched blob tuple */
private const BLOB_CASTOKEN = 2;

/** Timestamp stored in `exptime` for keys that never expire */
private const INF_TIMESTAMP_PLACEHOLDER = '99991231235959';
113
/**
 * Create the store from a configuration array.
 *
 * Exactly one connection source must be configured: either a direct server
 * list ('servers' or 'server') or a 'loadBalancerCallback' together with
 * 'dbDomain'. Optional keys read here: 'purgePeriod', 'purgeLimit',
 * 'tableName', 'shards', 'writeBatchSize', 'replicaOnly', 'multiPrimaryMode'.
 *
 * @param array $params
 * @throws InvalidArgumentException If no connection source is configured, or
 *   'loadBalancerCallback' is given without 'dbDomain'
 */
public function __construct( $params ) {
	parent::__construct( $params );

	$hasServerList = isset( $params['servers'] ) || isset( $params['server'] );
	$hasLbCallback = isset( $params['loadBalancerCallback'] );

	if ( !$hasServerList && !$hasLbCallback ) {
		throw new InvalidArgumentException(
			__METHOD__ . " requires 'server', 'servers', or 'loadBalancerCallback'"
		);
	}

	if ( $hasServerList ) {
		// Direct list of servers; object data is horizontally partitioned via key hash
		$serverList = $params['servers'] ?? [ $params['server'] ];
		$position = 0;
		foreach ( $serverList as $tag => $info ) {
			$this->serverInfos[$position] = $info;
			// Integer-indexed arrays are accepted for b/c and get a synthetic tag
			$this->serverTags[$position] = is_string( $tag ) ? $tag : "#$position";
			$position++;
		}
	} else {
		$this->loadBalancerCallback = $params['loadBalancerCallback'];
		if ( !isset( $params['dbDomain'] ) ) {
			throw new InvalidArgumentException(
				__METHOD__ . ": 'dbDomain' is required if 'loadBalancerCallback' is given"
			);
		}
		$this->dbDomain = $params['dbDomain'];
		$this->useLB = true;
	}

	// Integer-valued tunables, falling back to the class defaults
	$this->purgePeriod = intval( $params['purgePeriod'] ?? $this->purgePeriod );
	$this->purgeLimit = intval( $params['purgeLimit'] ?? $this->purgeLimit );
	$this->numTableShards = intval( $params['shards'] ?? $this->numTableShards );
	$this->writeBatchSize = intval( $params['writeBatchSize'] ?? $this->writeBatchSize );

	$this->tableName = $params['tableName'] ?? $this->tableName;
	$this->replicaOnly = $params['replicaOnly'] ?? false;
	$this->multiPrimaryMode = $params['multiPrimaryMode'] ?? false;

	$this->attrMap[self::ATTR_DURABILITY] = self::QOS_DURABILITY_RDBMS;
	$this->attrMap[self::ATTR_EMULATION] = self::QOS_EMULATION_SQL;

	$this->hasZlib = extension_loaded( 'zlib' );
}
184
/**
 * Fetch a single key and unserialize its value.
 *
 * @param string $key
 * @param int $flags
 * @param mixed &$casToken Pass self::PASS_BY_REF to request a CAS token; set to the
 *   token on a successful read, null otherwise
 * @return mixed The value, or false if the key is missing or cannot be unserialized
 */
protected function doGet( $key, $flags = 0, &$casToken = null ) {
	$wantToken = ( $casToken === self::PASS_BY_REF );
	$casToken = null;

	$result = false;
	$valueSize = false;

	$blob = $this->fetchBlobs( [ $key ], $wantToken )[$key];
	if ( $blob ) {
		$serial = $blob[self::BLOB_VALUE];
		$result = $this->unserialize( $serial );
		// Only hand out a token for values that could actually be decoded
		if ( $wantToken && $result !== false ) {
			$casToken = $blob[self::BLOB_CASTOKEN];
		}
		$valueSize = strlen( $serial );
	}

	$this->updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, $valueSize ] ] );

	return $result;
}
205
/**
 * Unconditionally store a value under a key.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime
 * @param int $flags
 * @return bool Whether the write succeeded
 */
protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
	$writer = [ $this, 'modifyTableSpecificBlobsForSet' ];
	$argsByKey = [ $key => [ $value, $exptime ] ];

	return $this->modifyBlobs( $writer, $this->getCurrentTime(), $argsByKey, $flags );
}
216
/**
 * Remove a key.
 *
 * @param string $key
 * @param int $flags
 * @return bool Whether the write succeeded
 */
protected function doDelete( $key, $flags = 0 ) {
	$writer = [ $this, 'modifyTableSpecificBlobsForDelete' ];
	// The delete callback takes no per-key arguments
	$argsByKey = [ $key => [] ];

	return $this->modifyBlobs( $writer, $this->getCurrentTime(), $argsByKey, $flags );
}
227
/**
 * Store a value only if no live row exists for the key.
 *
 * The key lock is held (through $scope) until this method returns.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime
 * @param int $flags
 * @return bool False if the lock could not be acquired or the write did not happen
 */
protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
	$mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
	if ( $mtime !== null ) {
		$argsByKey = [ $key => [ $value, $exptime ] ];

		return $this->modifyBlobs(
			[ $this, 'modifyTableSpecificBlobsForAdd' ],
			$mtime,
			$argsByKey,
			$flags
		);
	}

	// Timeout or I/O error during lock acquisition
	return false;
}
242
/**
 * Store a value only if the key's current CAS token matches the given one.
 *
 * The key lock is held (through $scope) until this method returns.
 *
 * @param mixed $casToken Token obtained from an earlier read
 * @param string $key
 * @param mixed $value
 * @param int $exptime
 * @param int $flags
 * @return bool False if the lock could not be acquired or the write did not happen
 */
protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
	$mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
	if ( $mtime !== null ) {
		$argsByKey = [ $key => [ $value, $exptime, $casToken ] ];

		return $this->modifyBlobs(
			[ $this, 'modifyTableSpecificBlobsForCas' ],
			$mtime,
			$argsByKey,
			$flags
		);
	}

	// Timeout or I/O error during lock acquisition
	return false;
}
257
/**
 * Change the expiry of an existing key.
 *
 * @param string $key
 * @param int $exptime
 * @param int $flags
 * @return bool Whether the write succeeded
 */
protected function doChangeTTL( $key, $exptime, $flags ) {
	$writer = [ $this, 'modifyTableSpecificBlobsForChangeTTL' ];
	$argsByKey = [ $key => [ $exptime ] ];

	return $this->modifyBlobs( $writer, $this->getCurrentTime(), $argsByKey, $flags );
}
268
/**
 * Increment a key by $step, or initialise it to $init when no live row exists.
 *
 * With self::WRITE_BACKGROUND set in $flags the asynchronous callback is used,
 * which reports true rather than the new value.
 *
 * @param string $key
 * @param int $exptime
 * @param int $step
 * @param int $init
 * @param int $flags
 * @return int|bool Per-key result from the write callback, or false on failure
 */
protected function doIncrWithInit( $key, $exptime, $step, $init, $flags ) {
	$mtime = $this->getCurrentTime();

	$callback = ( $flags & self::WRITE_BACKGROUND )
		? [ $this, 'modifyTableSpecificBlobsForIncrInitAsync' ]
		: [ $this, 'modifyTableSpecificBlobsForIncrInit' ];

	$ok = $this->modifyBlobs(
		$callback,
		$mtime,
		[ $key => [ $step, $init, $exptime ] ],
		$flags,
		$resByKey
	);
	if ( !$ok ) {
		return false;
	}

	return $resByKey[$key];
}
288
/**
 * Increase the integer value of an existing key.
 *
 * @param string $key
 * @param int $value Amount to add
 * @param int $flags
 * @return int|bool New value, or false on failure
 */
public function incr( $key, $value = 1, $flags = 0 ) {
	$delta = $value;

	return $this->doIncr( $key, $delta, $flags );
}
292
/**
 * Decrease the integer value of an existing key.
 *
 * @param string $key
 * @param int $value Amount to subtract
 * @param int $flags
 * @return int|bool New value, or false on failure
 */
public function decr( $key, $value = 1, $flags = 0 ) {
	$delta = -$value;

	return $this->doIncr( $key, $delta, $flags );
}
296
/**
 * Add a (possibly negative) amount to an existing integer key under a key lock.
 *
 * The result is clamped at zero and the existing expiry is preserved.
 *
 * @param string $key
 * @param int $value Signed amount to add
 * @param int $flags
 * @return int|bool New value; false if the lock failed, the key is missing,
 *   the stored value is not an integer, or the write failed
 */
private function doIncr( $key, $value = 1, $flags = 0 ) {
	$mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
	if ( $mtime === null ) {
		// Timeout or I/O error during lock acquisition
		return false;
	}

	$result = false;
	$blob = $this->fetchBlobs( [ $key ] )[$key];
	if ( !$blob ) {
		$this->logger->debug( __METHOD__ . ": $key does not exists" );
	} elseif ( !$this->isInteger( $blob[self::BLOB_VALUE] ) ) {
		$this->logger->warning( __METHOD__ . ": $key is a non-integer" );
	} else {
		$newValue = max( (int)$blob[self::BLOB_VALUE] + (int)$value, 0 );
		$written = $this->modifyBlobs(
			[ $this, 'modifyTableSpecificBlobsForSet' ],
			$mtime,
			// Preserve the old expiry timestamp
			[ $key => [ $newValue, $blob[self::BLOB_EXPIRY] ] ],
			$flags
		);
		if ( $written ) {
			$result = $newValue;
		}
	}

	$metric = ( $value >= 0 ) ? self::METRIC_OP_INCR : self::METRIC_OP_DECR;
	$this->updateOpStats( $metric, [ $key ] );

	return $result;
}
329
/**
 * Fetch several keys at once.
 *
 * @param string[] $keys
 * @param int $flags
 * @return array Map of (key => value) for keys that exist and unserialize to a non-false value
 */
protected function doGetMulti( array $keys, $flags = 0 ) {
	$valuesByKey = [];
	$statsByKey = [];

	$blobsByKey = $this->fetchBlobs( $keys );
	foreach ( $keys as $key ) {
		$blob = $blobsByKey[$key];
		$size = false;
		if ( $blob ) {
			$serial = $blob[self::BLOB_VALUE];
			$decoded = $this->unserialize( $serial );
			if ( $decoded !== false ) {
				$valuesByKey[$key] = $decoded;
			}
			$size = strlen( $serial );
		}
		$statsByKey[$key] = [ 0, $size ];
	}

	$this->updateOpStats( self::METRIC_OP_GET, $statsByKey );

	return $valuesByKey;
}
354
/**
 * Unconditionally store several values with a shared expiry.
 *
 * @param array $data Map of (key => value)
 * @param int $exptime
 * @param int $flags
 * @return bool Whether every key was written
 */
protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
	$mtime = $this->getCurrentTime();

	// Pair every value with the common expiry, keeping the key order
	$argsByKey = [];
	foreach ( $data as $key => $value ) {
		$argsByKey[$key] = [ $value, $exptime ];
	}

	return $this->modifyBlobs(
		[ $this, 'modifyTableSpecificBlobsForSet' ],
		$mtime,
		$argsByKey,
		$flags
	);
}
370
/**
 * Remove several keys.
 *
 * @param string[] $keys
 * @param int $flags
 * @return bool Whether every key was processed successfully
 */
protected function doDeleteMulti( array $keys, $flags = 0 ) {
	$writer = [ $this, 'modifyTableSpecificBlobsForDelete' ];
	// The delete callback takes no per-key arguments
	$argsByKey = array_fill_keys( $keys, [] );

	return $this->modifyBlobs( $writer, $this->getCurrentTime(), $argsByKey, $flags );
}
381
/**
 * Change the expiry of several existing keys to a shared value.
 *
 * @param string[] $keys
 * @param int $exptime
 * @param int $flags
 * @return bool Whether every key was updated
 */
public function doChangeTTLMulti( array $keys, $exptime, $flags = 0 ) {
	$writer = [ $this, 'modifyTableSpecificBlobsForChangeTTL' ];
	$argsByKey = array_fill_keys( $keys, [ $exptime ] );

	return $this->modifyBlobs( $writer, $this->getCurrentTime(), $argsByKey, $flags );
}
392
/**
 * Get a database connection for the given shard.
 *
 * @param int $shardIndex
 * @return mixed Connection handle from the load balancer or the server list
 * @throws UnexpectedValueException If no server is configured for the index
 */
private function getConnection( $shardIndex ) {
	if ( $this->useLB ) {
		return $this->getConnectionViaLoadBalancer();
	}

	// Don't keep timing out trying to connect if the server is down:
	// re-throw the remembered error for 60 seconds after a failure
	$lastError = $this->connFailureErrors[$shardIndex] ?? null;
	if ( $lastError !== null ) {
		$sinceFailure = $this->getCurrentTime() - $this->connFailureTimes[$shardIndex];
		if ( $sinceFailure < 60 ) {
			throw $lastError;
		}
	}

	if ( !isset( $this->serverInfos[$shardIndex] ) ) {
		throw new UnexpectedValueException( "Invalid server index #$shardIndex" );
	}

	return $this->getConnectionFromServerInfo( $shardIndex, $this->serverInfos[$shardIndex] );
}
423
/**
 * Work out which server shard and which table hold a key.
 *
 * @param string $key
 * @return array (shard index, table name)
 */
private function getKeyLocation( $key ) {
	// Load balancer setups and single-server lists always use shard 0
	$shardIndex = 0;
	if ( !$this->useLB && count( $this->serverTags ) != 1 ) {
		// Striped array of database servers: take the first tag in consistent-hash order
		$rankedTags = $this->serverTags;
		ArrayUtils::consistentHashSort( $rankedTags, $key );
		reset( $rankedTags );
		$shardIndex = key( $rankedTags );
	}

	$tableIndex = null;
	if ( $this->numTableShards > 1 ) {
		// Non-negative 31-bit hash derived from the first 8 hex digits of the MD5
		$hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
		$tableIndex = $hash % $this->numTableShards;
	}

	return [ $shardIndex, $this->getTableNameByShard( $tableIndex ) ];
}
454
/**
 * Get the table name for a table shard index.
 *
 * @param int|null $index Table shard index, or null for the unsharded table
 * @return string Base table name, with a zero-padded numeric suffix when sharded
 */
private function getTableNameByShard( $index ) {
	if ( $index === null || $this->numTableShards <= 1 ) {
		return $this->tableName;
	}

	// Pad to the width of the largest possible shard index
	$width = strlen( (string)( $this->numTableShards - 1 ) );
	$suffix = sprintf( "%0{$width}d", $index );

	return $this->tableName . $suffix;
}
469
/**
 * Read the live (non-expired) rows for a set of keys.
 *
 * Keys are grouped by shard and table, queried once per table, and then decoded
 * into tuples indexed by the self::BLOB_* constants. Database errors are passed
 * to handleDBError(); affected keys are reported as null.
 *
 * @param string[] $keys
 * @param bool $getCasToken Whether to compute CAS tokens for the rows
 * @return array Order-preserved map of (key => blob tuple, or null if not found/failed)
 */
private function fetchBlobs( array $keys, bool $getCasToken = false ) {
	// Held until return so the profiler stays silenced for the whole read
	$silenceScope = $this->silenceTransactionProfiler();

	// Initialize order-preserved per-key results; set values for live keys below
	$dataByKey = array_fill_keys( $keys, null );

	$readTime = (int)$this->getCurrentTime();
	$keysByTableByShard = [];
	foreach ( $keys as $key ) {
		list( $shardIndex, $partitionTable ) = $this->getKeyLocation( $key );
		$keysByTableByShard[$shardIndex][$partitionTable][] = $key;
	}

	foreach ( $keysByTableByShard as $shardIndex => $serverKeys ) {
		try {
			$db = $this->getConnection( $shardIndex );
			foreach ( $serverKeys as $partitionTable => $tableKeys ) {
				$res = $db->newSelectQueryBuilder()
					->select(
						$getCasToken
						? $this->addCasTokenFields( $db, [ 'keyname', 'value', 'exptime' ] )
						: [ 'keyname', 'value', 'exptime' ] )
					->from( $partitionTable )
					->where( $this->buildExistenceConditions( $db, $tableKeys, $readTime ) )
					->caller( __METHOD__ )
					->fetchResultSet();
				foreach ( $res as $row ) {
					// Remember where the row came from for the decoding pass
					$row->shardIndex = $shardIndex;
					$row->tableName = $partitionTable;
					$dataByKey[$row->keyname] = $row;
				}
			}
		} catch ( DBError $e ) {
			$this->handleDBError( $e, $shardIndex );
		}
	}

	foreach ( $keys as $key ) {
		$row = $dataByKey[$key] ?? null;
		if ( !$row ) {
			continue;
		}

		$this->debug( __METHOD__ . ": retrieved $key; expiry time is {$row->exptime}" );
		try {
			$db = $this->getConnection( $row->shardIndex );
			$dataByKey[$key] = [
				self::BLOB_VALUE => $this->dbDecodeSerialValue( $db, $row->value ),
				self::BLOB_EXPIRY => $this->decodeDbExpiry( $db, $row->exptime ),
				self::BLOB_CASTOKEN => $getCasToken
					? $this->getCasTokenFromRow( $db, $row )
					: null
			];
		} catch ( DBError $e ) {
			// Never leak the raw row object: callers index results as arrays
			$dataByKey[$key] = null;
			$this->handleDBError( $e, $row->shardIndex );
		}
	}

	return $dataByKey;
}
536
/**
 * Run a table-level write callback for a set of keys, grouped by shard and table.
 *
 * @param callable $tableWriteCallback Invoked as ( $db, $table, $mtime, $argsByKey, &$resByKey )
 * @param float $mtime UNIX modification timestamp
 * @param array $argsByKey Map of (key => callback-specific argument list)
 * @param int $flags
 * @param array &$resByKey Order-preserved map of (key => result); entries start as false
 *   and are set by the callback on success
 * @return bool Whether no key was left with a false result
 */
private function modifyBlobs(
	callable $tableWriteCallback,
	float $mtime,
	array $argsByKey,
	int $flags,
	&$resByKey = []
) {
	// Initialize order-preserved per-key results; callbacks mark successful results
	$resByKey = array_fill_keys( array_keys( $argsByKey ), false );

	// Held until return so the profiler stays silenced for the whole write
	$silenceScope = $this->silenceTransactionProfiler();

	$grouped = [];
	foreach ( $argsByKey as $key => $args ) {
		[ $shardIndex, $partitionTable ] = $this->getKeyLocation( $key );
		$grouped[$shardIndex][$partitionTable][$key] = $args;
	}

	$touchedShards = [];
	foreach ( $grouped as $shardIndex => $argsByKeyByTable ) {
		foreach ( $argsByKeyByTable as $table => $tableArgsByKey ) {
			try {
				$db = $this->getConnection( $shardIndex );
				$touchedShards[] = $shardIndex;
				$tableWriteCallback( $db, $table, $mtime, $tableArgsByKey, $resByKey );
			} catch ( DBError $e ) {
				$this->handleDBError( $e, $shardIndex );
			}
		}
	}

	// The outcome is decided before garbage collection gets a chance to run
	$allSucceeded = !in_array( false, $resByKey, true );

	foreach ( $touchedShards as $shardIndex ) {
		try {
			$this->occasionallyGarbageCollect( $this->getConnection( $shardIndex ) );
		} catch ( DBError $e ) {
			$this->handleDBError( $e, $shardIndex );
		}
	}

	return $allSucceeded;
}
597
/**
 * Write callback: unconditionally store values in one table.
 *
 * @param IDatabase $db
 * @param string $ptable Table name
 * @param float $mtime UNIX modification timestamp
 * @param array $argsByKey Map of (key => (value, exptime))
 * @param array &$resByKey Map of (key => result); written keys are set to true
 */
private function modifyTableSpecificBlobsForSet(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	$sizesByKey = [];
	$modToken = $this->makeTimestampedModificationToken( $mtime, $db );
	$nowUnix = (int)$mtime;

	if ( !$this->multiPrimaryMode ) {
		// T288998: use REPLACE, if possible, to avoid cluttering the binlogs
		$rows = [];
		foreach ( $argsByKey as $key => [ $value, $exptime ] ) {
			$expiry = $this->makeNewKeyExpiry( $exptime, $nowUnix );
			$serial = $this->getSerialized( $value, $key );
			$rows[] = $this->buildUpsertRow( $db, $key, $serial, $expiry, $modToken );

			$sizesByKey[$key] = [ strlen( $serial ), 0 ];
		}
		$db->replace( $ptable, 'keyname', $rows, __METHOD__ );
		foreach ( array_keys( $argsByKey ) as $key ) {
			$resByKey[$key] = true;
		}
	} else {
		// @TODO: use multi-row upsert() with VALUES() once supported in Database
		foreach ( $argsByKey as $key => [ $value, $exptime ] ) {
			$serial = $this->getSerialized( $value, $key );
			$expiry = $this->makeNewKeyExpiry( $exptime, $nowUnix );
			$db->upsert(
				$ptable,
				$this->buildUpsertRow( $db, $key, $serial, $expiry, $modToken ),
				[ [ 'keyname' ] ],
				$this->buildUpsertSetForOverwrite( $db, $serial, $expiry, $modToken ),
				__METHOD__
			);
			$resByKey[$key] = true;

			$sizesByKey[$key] = [ strlen( $serial ), 0 ];
		}
	}

	$this->updateOpStats( self::METRIC_OP_SET, $sizesByKey );
}
658
/**
 * Write callback: remove keys from one table.
 *
 * In multi-primary mode the rows are overwritten with tombstones instead of
 * being deleted.
 *
 * @param IDatabase $db
 * @param string $ptable Table name
 * @param float $mtime UNIX modification timestamp
 * @param array $argsByKey Map of (key => empty argument list)
 * @param array &$resByKey Map of (key => result); every key is set to true
 */
private function modifyTableSpecificBlobsForDelete(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	$keys = array_keys( $argsByKey );

	if ( !$this->multiPrimaryMode ) {
		// Just purge the keys since there is only one primary (e.g. "source of truth")
		$db->delete( $ptable, [ 'keyname' => $keys ], __METHOD__ );
	} else {
		// Tombstone keys in order to respect eventual consistency
		$modToken = $this->makeTimestampedModificationToken( $mtime, $db );
		$expiry = $this->makeNewKeyExpiry( self::TOMB_EXPTIME, (int)$mtime );
		$tombstones = [];
		foreach ( $keys as $key ) {
			$tombstones[] = $this->buildUpsertRow( $db, $key, self::TOMB_SERIAL, $expiry, $modToken );
		}
		$db->upsert(
			$ptable,
			$tombstones,
			[ [ 'keyname' ] ],
			$this->buildUpsertSetForOverwrite( $db, self::TOMB_SERIAL, $expiry, $modToken ),
			__METHOD__
		);
	}

	foreach ( $keys as $key ) {
		$resByKey[$key] = true;
	}

	$this->updateOpStats( self::METRIC_OP_DELETE, $keys );
}
708
/**
 * Write callback: store values in one table for keys that have no live row.
 *
 * @param IDatabase $db
 * @param string $ptable Table name
 * @param float $mtime UNIX modification timestamp
 * @param array $argsByKey Map of (key => (value, exptime))
 * @param array &$resByKey Map of (key => result); added keys are set to true
 */
private function modifyTableSpecificBlobsForAdd(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	$sizesByKey = [];
	$nowUnix = (int)$mtime;

	$modToken = $this->makeTimestampedModificationToken( $mtime, $db );

	// This check must happen outside the write query to respect eventual consistency
	$liveKeys = $db->newSelectQueryBuilder()
		->select( 'keyname' )
		->from( $ptable )
		->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), $nowUnix ) )
		->caller( __METHOD__ )
		->fetchFieldValues();
	$isLive = array_fill_keys( $liveKeys, true );

	// @TODO: use multi-row upsert() with VALUES() once supported in Database
	foreach ( $argsByKey as $key => [ $value, $exptime ] ) {
		if ( !isset( $isLive[$key] ) ) {
			$serial = $this->getSerialized( $value, $key );
			$expiry = $this->makeNewKeyExpiry( $exptime, $nowUnix );
			$db->upsert(
				$ptable,
				$this->buildUpsertRow( $db, $key, $serial, $expiry, $modToken ),
				[ [ 'keyname' ] ],
				$this->buildUpsertSetForOverwrite( $db, $serial, $expiry, $modToken ),
				__METHOD__
			);
			$resByKey[$key] = true;

			$sizesByKey[$key] = [ strlen( $serial ), 0 ];
		} else {
			$this->logger->debug( __METHOD__ . ": $key already exists" );
		}
	}

	$this->updateOpStats( self::METRIC_OP_ADD, $sizesByKey );
}
772
/**
 * Write callback: store values in one table for keys whose current CAS token
 * matches the caller-supplied token.
 *
 * @param IDatabase $db
 * @param string $ptable Table name
 * @param float $mtime UNIX modification timestamp
 * @param array $argsByKey Map of (key => (value, exptime, CAS token))
 * @param array &$resByKey Map of (key => result); written keys are set to true
 */
private function modifyTableSpecificBlobsForCas(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	$valueSizesByKey = [];

	$mt = $this->makeTimestampedModificationToken( $mtime, $db );

	// This check must happen outside the write query to respect eventual consistency
	$res = $db->newSelectQueryBuilder()
		->select( $this->addCasTokenFields( $db, [ 'keyname' ] ) )
		->from( $ptable )
		->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) )
		->caller( __METHOD__ )
		->fetchResultSet();

	$curTokensByKey = [];
	foreach ( $res as $row ) {
		$curTokensByKey[$row->keyname] = $this->getCasTokenFromRow( $db, $row );
	}

	// @TODO: use multi-row upsert() with VALUES() once supported in Database
	foreach ( $argsByKey as $key => list( $value, $exptime, $casToken ) ) {
		$curToken = $curTokensByKey[$key] ?? null;
		if ( $curToken === null ) {
			// No live row was found for this key
			$this->logger->debug( __METHOD__ . ": $key does not exists" );
			continue;
		}

		if ( $curToken !== $casToken ) {
			// The row changed since the caller read it
			$this->logger->debug( __METHOD__ . ": $key does not have a matching token" );
			continue;
		}

		$serialValue = $this->getSerialized( $value, $key );
		$expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
		$db->upsert(
			$ptable,
			$this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ),
			[ [ 'keyname' ] ],
			$this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ),
			__METHOD__
		);
		$resByKey[$key] = true;

		$valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
	}

	$this->updateOpStats( self::METRIC_OP_CAS, $valueSizesByKey );
}
846
/**
 * Write callback: change the expiry of live keys in one table.
 *
 * In multi-primary mode each live row is rewritten through an upsert and only
 * the keys found are marked successful. Otherwise keys are updated in batches
 * sharing the same new expiry, and keys are marked successful only if the
 * number of affected rows equals the number of keys.
 *
 * @param IDatabase $db
 * @param string $ptable Table name
 * @param float $mtime UNIX modification timestamp
 * @param array $argsByKey Map of (key => (exptime))
 * @param array &$resByKey Map of (key => result); updated keys are set to true
 */
private function modifyTableSpecificBlobsForChangeTTL(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	if ( $this->multiPrimaryMode ) {
		$mt = $this->makeTimestampedModificationToken( $mtime, $db );

		$res = $db->newSelectQueryBuilder()
			->select( [ 'keyname', 'value' ] )
			->from( $ptable )
			->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) )
			->caller( __METHOD__ )
			->fetchResultSet();
		// @TODO: use multi-row upsert() with VALUES() once supported in Database
		foreach ( $res as $curRow ) {
			$key = $curRow->keyname;
			// Re-write the current value together with the new expiry
			$serialValue = $this->dbDecodeSerialValue( $db, $curRow->value );
			list( $exptime ) = $argsByKey[$key];
			$expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );

			$db->upsert(
				$ptable,
				$this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ),
				[ [ 'keyname' ] ],
				$this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ),
				__METHOD__
			);
			$resByKey[$key] = true;
		}
	} else {
		// Group keys by resulting expiry so each group needs a single UPDATE
		$keysBatchesByExpiry = [];
		foreach ( $argsByKey as $key => list( $exptime ) ) {
			$expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
			$keysBatchesByExpiry[$expiry][] = $key;
		}

		$existingCount = 0;
		foreach ( $keysBatchesByExpiry as $expiry => $keyBatch ) {
			$db->update(
				$ptable,
				[ 'exptime' => $this->encodeDbExpiry( $db, $expiry ) ],
				$this->buildExistenceConditions( $db, $keyBatch, (int)$mtime ),
				__METHOD__
			);
			$existingCount += $db->affectedRows();
		}
		// All-or-nothing: the affected row count cannot tell which keys were missing
		if ( $existingCount === count( $argsByKey ) ) {
			foreach ( $argsByKey as $key => $args ) {
				$resByKey[$key] = true;
			}
		}
	}

	$this->updateOpStats( self::METRIC_OP_CHANGE_TTL, array_keys( $argsByKey ) );
}
924
/**
 * Write callback: increment-or-initialise keys in one table and report the
 * resulting integer values.
 *
 * @param IDatabase $db
 * @param string $ptable Table name
 * @param float $mtime UNIX modification timestamp
 * @param array $argsByKey Map of (key => (step, init, exptime))
 * @param array &$resByKey Map of (key => result); set to the new integer value on success
 */
private function modifyTableSpecificBlobsForIncrInit(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	$nowUnix = (int)$mtime;

	foreach ( $argsByKey as $key => [ $step, $init, $exptime ] ) {
		$modToken = $this->makeTimestampedModificationToken( $mtime, $db );
		$expiry = $this->makeNewKeyExpiry( $exptime, $nowUnix );

		// Use a transaction so that changes from other threads are not visible due to
		// "consistent reads". This way, the exact post-increment value can be returned.
		// The "live key exists" check can go inside the write query and remain safe for
		// replication since the TTL for such keys is either indefinite or very short.
		$sectionId = $db->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
		try {
			$db->upsert(
				$ptable,
				$this->buildUpsertRow( $db, $key, $init, $expiry, $modToken ),
				[ [ 'keyname' ] ],
				$this->buildIncrUpsertSet( $db, $step, $init, $expiry, $modToken, $nowUnix ),
				__METHOD__
			);
			$changedRows = $db->affectedRows();
			$current = $db->newSelectQueryBuilder()
				->select( 'value' )
				->from( $ptable )
				->where( [ 'keyname' => $key ] )
				->caller( __METHOD__ )
				->fetchRow();
		} catch ( Exception $e ) {
			$db->cancelAtomic( __METHOD__, $sectionId );
			throw $e;
		}
		$db->endAtomic( __METHOD__ );

		if ( $changedRows && $current !== false ) {
			$serial = $this->dbDecodeSerialValue( $db, $current->value );
			if ( $this->isInteger( $serial ) ) {
				$resByKey[$key] = (int)$serial;
			} else {
				$this->logger->warning( __METHOD__ . ": got non-integer $key value" );
			}
		} else {
			$this->logger->warning( __METHOD__ . ": failed to set new $key value" );
		}
	}

	$this->updateOpStats( self::METRIC_OP_INCR, array_keys( $argsByKey ) );
}
997
/**
 * Write callback: increment-or-initialise keys in one table without reading
 * back the resulting values.
 *
 * @param IDatabase $db
 * @param string $ptable Table name
 * @param float $mtime UNIX modification timestamp
 * @param array $argsByKey Map of (key => (step, init, exptime))
 * @param array &$resByKey Map of (key => result); set to true when rows were affected
 */
private function modifyTableSpecificBlobsForIncrInitAsync(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	$nowUnix = (int)$mtime;

	foreach ( $argsByKey as $key => [ $step, $init, $exptime ] ) {
		$modToken = $this->makeTimestampedModificationToken( $mtime, $db );
		$expiry = $this->makeNewKeyExpiry( $exptime, $nowUnix );
		$db->upsert(
			$ptable,
			$this->buildUpsertRow( $db, $key, $init, $expiry, $modToken ),
			[ [ 'keyname' ] ],
			$this->buildIncrUpsertSet( $db, $step, $init, $expiry, $modToken, $nowUnix ),
			__METHOD__
		);
		if ( $db->affectedRows() ) {
			$resByKey[$key] = true;
		} else {
			$this->logger->warning( __METHOD__ . ": failed to set new $key value" );
		}
	}
}
1033
/**
 * Compute the expiry timestamp to store for a newly written row.
 *
 * @param int $exptime Caller-supplied expiry, resolved through getExpirationAsTimestamp()
 * @param int $nowTsUnix Current UNIX timestamp
 * @return int self::TTL_INDEFINITE, or a timestamp no lower than
 *   $nowTsUnix - self::SAFE_CLOCK_BOUND_SEC
 */
private function makeNewKeyExpiry( $exptime, int $nowTsUnix ) {
	$expiry = $this->getExpirationAsTimestamp( $exptime );
	if ( $expiry === self::TTL_INDEFINITE ) {
		return $expiry;
	}

	// Eventual consistency requires the preservation of recently modified keys.
	// Do not create rows with `exptime` fields so low that they might get garbage
	// collected before being replicated.
	$floor = $nowTsUnix - self::SAFE_CLOCK_BOUND_SEC;

	return max( $expiry, $floor );
}
1050
/**
 * Lock a key without waiting and derive a modification timestamp from the lock time.
 *
 * @param string $key
 * @param ScopedCallback|null &$scope Set to a callback that unlocks the key when it
 *   goes out of scope; untouched if the lock was not acquired
 * @return float|null UNIX timestamp with microsecond precision, or null if locking failed
 */
private function newLockingWriteSectionModificationTimestamp( $key, &$scope ) {
	if ( $this->lock( $key, 0 ) ) {
		$scope = new ScopedCallback( function () use ( $key ) {
			$this->unlock( $key );
		} );

		$lockTime = $this->locks[$key][self::LOCK_TIME];

		// sprintf is used to adjust precision
		return (float)sprintf( '%.6F', $lockTime );
	}

	return null;
}
1081
/**
 * Build the 17-character modification token for a write.
 *
 * The token is 13 base-36 characters encoding 35 bits of UNIX seconds followed by
 * a 32 bit server ID, then 4 base-36 characters encoding the microseconds.
 *
 * @param float $mtime UNIX modification timestamp
 * @param IDatabase $db Connection handling the write
 * @return string
 * @throws RuntimeException If the token does not come out 17 characters long
 */
private function makeTimestampedModificationToken( float $mtime, IDatabase $db ) {
	// We have reserved space for upto 6 digits in the microsecond portion of the token.
	// This is for future use only (maybe CAS tokens) and not currently used.
	// It is currently populated by the microsecond portion returned by microtime,
	// which generally has fewer than 6 digits of meaningful precision but can still be useful
	// in debugging (to see the token continuously change even during rapid testing).
	$seconds = (int)$mtime;
	$microseconds = explode( '.', sprintf( '%.6F', $mtime ) )[1];

	$id = $db->getTopologyBasedServerId() ?? sprintf( '%u', crc32( $db->getServerName() ) );

	// 35 bit integral seconds portion of UNIX timestamp
	$secondsBits = str_pad( base_convert( (string)$seconds, 10, 2 ), 35, '0', STR_PAD_LEFT );
	// 32 bit ID of the primary database server handling the write
	$serverBits = str_pad( base_convert( $id, 10, 2 ), 32, '0', STR_PAD_LEFT );
	// 67 bit integral portion of UNIX timestamp, qualified
	$qualifiedSeconds = \Wikimedia\base_convert( $secondsBits . $serverBits, 2, 36, 13 );
	// 20 bit fractional portion of UNIX timestamp, as integral microseconds
	$fraction = str_pad( base_convert( $microseconds, 10, 36 ), 4, '0', STR_PAD_LEFT );

	$token = $qualifiedSeconds . $fraction;
	if ( strlen( $token ) !== 17 ) {
		throw new RuntimeException( "Modification timestamp overflow detected" );
	}

	return $token;
}
1123
/**
 * Build WHERE conditions matching the live (non-expired) rows of the given keys.
 *
 * @param IDatabase $db
 * @param string|string[] $keys
 * @param int $time UNIX timestamp that rows must not have expired before
 * @return array
 */
private function buildExistenceConditions( IDatabase $db, $keys, int $time ) {
	$quotedCutoff = $db->addQuotes( $db->timestamp( $time ) );

	// Note that tombstones always have past expiration dates
	$conds = [ 'keyname' => $keys ];
	$conds[] = 'exptime >= ' . $quotedCutoff;

	return $conds;
}
1139
/**
 * Build the row to insert for a key.
 *
 * @param IDatabase $db
 * @param string $key
 * @param string|int $serialValue
 * @param int $expiry UNIX expiry timestamp or self::TTL_INDEFINITE
 * @param string $mt Modification token; only stored in multi-primary mode
 * @return array Map of (column => value)
 */
private function buildUpsertRow(
	IDatabase $db,
	$key,
	$serialValue,
	int $expiry,
	string $mt
) {
	$encodedValue = $this->dbEncodeSerialValue( $db, $serialValue );
	$encodedExpiry = $this->encodeDbExpiry( $db, $expiry );

	$row = [ 'keyname' => $key, 'value' => $encodedValue, 'exptime' => $encodedExpiry ];
	if ( $this->multiPrimaryMode ) {
		$row['modtoken'] = $mt;
	}

	return $row;
}
1168
/**
 * Build the SET clauses used when an upsert hits an existing row and should
 * overwrite it.
 *
 * @param IDatabase $db
 * @param string|int $serialValue
 * @param int $expiry UNIX expiry timestamp or self::TTL_INDEFINITE
 * @param string $mt Modification token
 * @return string[] List of "column=expression" SQL fragments
 */
private function buildUpsertSetForOverwrite(
	IDatabase $db,
	$serialValue,
	int $expiry,
	string $mt
) {
	$expressionsByColumn = [
		'value' => $db->addQuotes( $this->dbEncodeSerialValue( $db, $serialValue ) ),
		'exptime' => $db->addQuotes( $this->encodeDbExpiry( $db, $expiry ) )
	];

	$assignments = [];
	if ( !$this->multiPrimaryMode ) {
		foreach ( $expressionsByColumn as $column => $expression ) {
			$assignments[] = "{$column}={$expression}";
		}

		return $assignments;
	}

	// The query might take a while to replicate, during which newer values might get
	// written. Qualify the query so that it does not override such values. Note that
	// duplicate tokens generated around the same time for a key should still result
	// in convergence given the use of server_id in modtoken (providing a total order
	// among primary DB servers) and MySQL binlog ordering (providing a total order
	// for writes replicating from a given primary DB server).
	$expressionsByColumn['modtoken'] = $db->addQuotes( $mt );
	foreach ( $expressionsByColumn as $column => $expression ) {
		// Only overwrite when the new token prefix is not older than the stored one
		$isNotOlder = $db->addQuotes( substr( $mt, 0, 13 ) ) . ' >= ' .
			$db->buildSubString( 'modtoken', 1, 13 );
		$rhs = $db->conditional( $isNotOlder, $expression, $column );
		$assignments[] = "{$column}=" . trim( $rhs );
	}

	return $assignments;
}
1215
/**
 * Build the SET clauses used when an increment-or-initialise upsert hits an
 * existing row.
 *
 * Live rows get their value increased by $step and keep their expiry (and token);
 * expired rows are reset to $init with the new expiry (and token).
 *
 * @param IDatabase $db
 * @param int $step
 * @param int $init
 * @param int $expiry UNIX expiry timestamp or self::TTL_INDEFINITE
 * @param string $mt Modification token; only used in multi-primary mode
 * @param int $mtUnixTs UNIX timestamp deciding whether a row counts as expired
 * @return string[] List of "column=expression" SQL fragments
 */
private function buildIncrUpsertSet(
	IDatabase $db,
	int $step,
	int $init,
	int $expiry,
	string $mt,
	int $mtUnixTs
) {
	// Map of (column => (SQL for non-expired key rows, SQL for expired key rows))
	$expressionsByColumn = [];
	$expressionsByColumn['value'] = [
		$db->buildIntegerCast( 'value' ) . " + {$db->addQuotes( $step )}",
		$db->addQuotes( $this->dbEncodeSerialValue( $db, $init ) )
	];
	$expressionsByColumn['exptime'] = [
		'exptime',
		$db->addQuotes( $this->encodeDbExpiry( $db, $expiry ) )
	];
	if ( $this->multiPrimaryMode ) {
		$expressionsByColumn['modtoken'] = [ 'modtoken', $db->addQuotes( $mt ) ];
	}

	$assignments = [];
	foreach ( $expressionsByColumn as $column => [ $liveExpression, $expiredExpression ] ) {
		$isLive = 'exptime >= ' . $db->addQuotes( $db->timestamp( $mtUnixTs ) );
		$rhs = $db->conditional( $isLive, $liveExpression, $expiredExpression );
		$assignments[] = "{$column}=" . trim( $rhs );
	}

	return $assignments;
}
1262
/**
 * Convert an expiry into the database timestamp format.
 *
 * @param IDatabase $db
 * @param int $expiry UNIX expiry timestamp or self::TTL_INDEFINITE
 * @return string
 */
private function encodeDbExpiry( IDatabase $db, int $expiry ) {
	if ( $expiry === self::TTL_INDEFINITE ) {
		// Use the maximum timestamp that the column can store
		return $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER );
	}

	// Convert the absolute timestamp into the DB timestamp format
	return $db->timestamp( $expiry );
}
1275
/**
 * Convert a database `exptime` value back into an expiry.
 *
 * @param IDatabase $db
 * @param string $dbExpiry
 * @return int UNIX expiry timestamp, or self::TTL_INDEFINITE for the placeholder value
 */
private function decodeDbExpiry( IDatabase $db, string $dbExpiry ) {
	$neverExpires = $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER );
	if ( $dbExpiry === $neverExpires ) {
		return self::TTL_INDEFINITE;
	}

	return (int)ConvertibleTimestamp::convert( TS_UNIX, $dbExpiry );
}
1286
/**
 * Encode a serialized value for storage in the `value` column.
 *
 * @param IDatabase $db
 * @param string|int $serialValue
 * @return mixed The integer as a string, or the result of IDatabase::encodeBlob()
 */
private function dbEncodeSerialValue( IDatabase $db, $serialValue ) {
	if ( is_int( $serialValue ) ) {
		// Integers are stored as plain digits rather than blobs
		return (string)$serialValue;
	}

	return $db->encodeBlob( $serialValue );
}
1295
/**
 * Decode a `value` column value into a serialized value.
 *
 * @param IDatabase $db
 * @param mixed $blob
 * @return mixed An integer when isInteger() accepts the input, otherwise the
 *   result of IDatabase::decodeBlob()
 */
private function dbDecodeSerialValue( IDatabase $db, $blob ) {
	if ( $this->isInteger( $blob ) ) {
		return (int)$blob;
	}

	return $db->decodeBlob( $blob );
}
1304
/**
 * Extend a SELECT field list with what is needed to obtain a CAS token.
 *
 * For 'mysql' and 'postgres' a `castoken` expression is computed by the database;
 * for other types the `value` and `exptime` columns are selected so the token can
 * be computed afterwards.
 *
 * @param IDatabase $db
 * @param string[] $fields
 * @return string[]
 */
private function addCasTokenFields( IDatabase $db, array $fields ) {
	// SQL hash expression per database type that can compute the token itself
	$hashExpressionByType = [
		'mysql' => 'SHA1(value)',
		'postgres' => 'md5(value)'
	];

	$type = $db->getType();
	if ( isset( $hashExpressionByType[$type] ) ) {
		$fields['castoken'] = $db->buildConcat( [
			$hashExpressionByType[$type],
			$db->addQuotes( '@' ),
			'exptime'
		] );

		return $fields;
	}

	foreach ( [ 'value', 'exptime' ] as $required ) {
		if ( !in_array( $required, $fields, true ) ) {
			$fields[] = $required;
		}
	}

	return $fields;
}
1338
/**
 * Get the CAS token for a fetched row.
 *
 * @param IDatabase $db
 * @param stdClass $row Row with a `castoken` field, or with `value` and `exptime` fields
 * @return string
 */
private function getCasTokenFromRow( IDatabase $db, stdClass $row ) {
	if ( isset( $row->castoken ) ) {
		// The database already computed the token
		return $row->castoken;
	}

	$valueHash = sha1( $this->dbDecodeSerialValue( $db, $row->value ) );
	$token = $valueHash . '@' . $row->exptime;
	$this->logger->debug( __METHOD__ . ": application computed hash for CAS token" );

	return $token;
}
1356
/**
 * Randomly purge expired rows from a server after a write.
 *
 * A purge is attempted on roughly one in every $this->purgePeriod calls and never
 * within self::GC_DELAY_SEC seconds of the previous one. If an async handler is
 * set, the purge is handed to it instead of running inline.
 *
 * @param IDatabase $db
 */
private function occasionallyGarbageCollect( IDatabase $db ) {
	if (
		// Random purging is enabled
		$this->purgePeriod &&
		// Only purge on one in every $this->purgePeriod writes
		mt_rand( 0, $this->purgePeriod - 1 ) == 0 &&
		// Avoid repeating the delete within a few seconds
		( $this->getCurrentTime() - $this->lastGarbageCollect ) > self::GC_DELAY_SEC
	) {
		$garbageCollector = function () use ( $db ) {
			$this->deleteServerObjectsExpiringBefore(
				$db,
				(int)$this->getCurrentTime(),
				$this->purgeLimit
			);
			// Record completion with the same clock the throttle check reads,
			// so the comparison stays meaningful when that clock is overridden
			$this->lastGarbageCollect = $this->getCurrentTime();
		};
		if ( $this->asyncHandler ) {
			$this->lastGarbageCollect = $this->getCurrentTime(); // avoid duplicate enqueues
			( $this->asyncHandler )( $garbageCollector );
		} else {
			$garbageCollector();
		}
	}
}
1386
/**
 * Delete every object whose expiry is before the current time.
 */
public function expireAll() {
	$now = (int)$this->getCurrentTime();
	$this->deleteObjectsExpiringBefore( $now );
}
1390
/**
 * Delete all objects expiring before a certain date.
 *
 * Either the single server matching $tag is purged, or every server in a
 * random order. A DBError on one server is handled and does not stop the
 * remaining servers from being purged.
 *
 * @param string|int $timestamp Cutoff, passed on to the per-server purge
 * @param callable|null $progress Optional callback; passed on to the per-server
 *  purge, which calls it with a completion percentage
 * @param int|float $limit Key deletion limit, shared across servers [default: INF]
 * @param string|null $tag Tag of the only server to purge, or null for all servers
 * @return bool False if a DBError was caught for any server, true otherwise
 * @throws InvalidArgumentException Via getShardServerIndexForTag() for an unknown tag
 */
public function deleteObjectsExpiringBefore(
	$timestamp,
	callable $progress = null,
	$limit = INF,
	string $tag = null
) {
	// Only held so that the returned scope object lives until this method returns
	$silenceScope = $this->silenceTransactionProfiler();

	if ( $tag !== null ) {
		// Purge one server only, to support concurrent purging in large wiki farms (T282761).
		$shardIndexes = [ $this->getShardServerIndexForTag( $tag ) ];
	} else {
		$shardIndexes = $this->getShardServerIndexes();
		shuffle( $shardIndexes );
	}

	$ok = true;
	$numServers = count( $shardIndexes );

	// Running total, updated by reference by each per-server purge
	$keysDeletedCount = 0;
	foreach ( $shardIndexes as $numServersDone => $shardIndex ) {
		try {
			$db = $this->getConnection( $shardIndex );
			$this->deleteServerObjectsExpiringBefore(
				$db,
				$timestamp,
				$limit,
				$keysDeletedCount,
				[ 'fn' => $progress, 'serversDone' => $numServersDone, 'serversTotal' => $numServers ]
			);
		} catch ( DBError $e ) {
			$this->handleDBError( $e, $shardIndex );
			$ok = false;
		}
	}

	return $ok;
}
1430
/**
 * Delete rows on one server whose expiry is before a cutoff.
 *
 * Table shards are visited in random order. On each shard, rows are selected
 * in ascending expiry order in batches and deleted by key. Every shard runs at
 * least one batch; further batches on that shard only run while the previous
 * batch found rows and the running total is still below $limit.
 *
 * @param IDatabase $db
 * @param string|int $timestamp Cutoff; converted with ConvertibleTimestamp::convert()
 * @param int|float $limit Key deletion limit; also caps the batch size
 * @param int &$keysDeletedCount Running total of deleted rows, incremented in place
 * @param array|null $progress Optional map with the keys "fn" (callable or null),
 *  "serversDone" and "serversTotal"; "fn" is called with a completion percentage
 * @throws DBError
 */
private function deleteServerObjectsExpiringBefore(
	IDatabase $db,
	$timestamp,
	$limit,
	&$keysDeletedCount = 0,
	array $progress = null
) {
	$cutoffUnix = ConvertibleTimestamp::convert( TS_UNIX, $timestamp );
	if ( $this->multiPrimaryMode ) {
		// Eventual consistency requires the preservation of any key that was recently
		// modified. The key must exist on this database server long enough for the server
		// to receive, via replication, all writes to the key with lower timestamps. Such
		// writes should be no-ops since the existing key value should "win". If the network
		// partitions between datacenters A and B for 30 minutes, the database servers in
		// each datacenter will see an initial burst of writes with "old" timestamps via
		// replication. This might include writes with lower timestamps than the existing
		// key value. Therefore, clock skew and replication delay are both factors.
		$cutoffUnixSafe = (int)$this->getCurrentTime() - self::SAFE_PURGE_DELAY_SEC;
		$cutoffUnix = min( $cutoffUnix, $cutoffUnixSafe );
	}
	$tableIndexes = range( 0, $this->numTableShards - 1 );
	shuffle( $tableIndexes );

	$batchSize = min( $this->writeBatchSize, $limit );

	foreach ( $tableIndexes as $numShardsDone => $tableIndex ) {
		// The oldest expiry of a row we have deleted on this shard
		// (the first row that we deleted)
		$minExpUnix = null;
		// The most recent expiry time so far, from a row we have deleted on this shard
		$maxExp = null;
		// Size of the time range we'll delete, in seconds (for progress estimate)
		$totalSeconds = null;

		do {
			// Next batch of expired rows, resuming at the last expiry already seen
			$res = $db->newSelectQueryBuilder()
				->select( [ 'keyname', 'exptime' ] )
				->from( $this->getTableNameByShard( $tableIndex ) )
				->where(
					array_merge(
						[ 'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ) ],
						$maxExp ? [ 'exptime >= ' . $db->addQuotes( $maxExp ) ] : []
					)
				)
				->orderBy( 'exptime', SelectQueryBuilder::SORT_ASC )
				->limit( $batchSize )
				->caller( __METHOD__ )
				->fetchResultSet();

			if ( $res->numRows() ) {
				$row = $res->current();
				if ( $minExpUnix === null ) {
					$minExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $row->exptime );
					$totalSeconds = max( $cutoffUnix - $minExpUnix, 1 );
				}

				$keys = [];
				foreach ( $res as $row ) {
					$keys[] = $row->keyname;
					$maxExp = $row->exptime;
				}

				// The expiry condition is repeated so that a key whose expiry changed
				// since the SELECT no longer matches
				$db->delete(
					$this->getTableNameByShard( $tableIndex ),
					[
						'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ),
						'keyname' => $keys
					],
					__METHOD__
				);
				$keysDeletedCount += $db->affectedRows();
			}

			if ( $progress && is_callable( $progress['fn'] ) ) {
				if ( $totalSeconds ) {
					$maxExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $maxExp );
					$remainingSeconds = $cutoffUnix - $maxExpUnix;
					$processedSeconds = max( $totalSeconds - $remainingSeconds, 0 );
					// For example, if we've done 1.5 table shard, and are thus half-way on the
					// 2nd of perhaps 5 tables on this server, then this might be:
					// `( 1 + ( 43200 / 86400 ) ) / 5 = 0.3`, or 30% done, of tables on this server.
					$tablesDoneRatio =
						( $numShardsDone + ( $processedSeconds / $totalSeconds ) ) / $this->numTableShards;
				} else {
					// Nothing was found on this shard so far
					$tablesDoneRatio = 1;
				}

				// For example, if we're 30% done on the last of 10 servers, then this might be:
				// `( 9 / 10 ) + ( 0.3 / 10 ) = 0.93`, or 93% done, overall.
				$overallRatio = ( $progress['serversDone'] / $progress['serversTotal'] ) +
					( $tablesDoneRatio / $progress['serversTotal'] );
				( $progress['fn'] )( $overallRatio * 100 );
			}
		} while ( $res->numRows() && $keysDeletedCount < $limit );
	}
}
1536
/**
 * Delete the content of every shard table on every server.
 *
 * Stops at the first server that raises a DBError.
 *
 * @return bool False if a DBError was caught, true otherwise
 */
public function deleteAll() {
	// Only held so that the returned scope object lives until this method returns
	$silenceScope = $this->silenceTransactionProfiler();

	foreach ( $this->getShardServerIndexes() as $shardIndex ) {
		try {
			$conn = $this->getConnection( $shardIndex );
			$tableIndex = 0;
			while ( $tableIndex < $this->numTableShards ) {
				$conn->delete( $this->getTableNameByShard( $tableIndex ), '*', __METHOD__ );
				++$tableIndex;
			}
		} catch ( DBError $e ) {
			$this->handleDBError( $e, $shardIndex );

			return false;
		}
	}

	return true;
}
1559
/**
 * Acquire a named database lock for a key on the server that owns the key.
 *
 * @param string $key
 * @param int $timeout Passed to the connection's lock() as its timeout
 * @param int $exptime Not used by this implementation
 * @return mixed|null Result of lock() with the LOCK_TIMESTAMP flag (presumably
 *  a UNIX timestamp on success), or null if a DBError was caught
 */
public function doLock( $key, $timeout = 6, $exptime = 6 ) {
	// Only held so that the returned scope object lives until this method returns
	$silenceScope = $this->silenceTransactionProfiler();

	[ $shardIndex ] = $this->getKeyLocation( $key );

	try {
		$conn = $this->getConnection( $shardIndex );

		return $conn->lock( $key, __METHOD__, $timeout, $conn::LOCK_TIMESTAMP );
	} catch ( DBError $e ) {
		$this->handleDBError( $e, $shardIndex );
		$this->logger->warning(
			__METHOD__ . ' failed due to I/O error for {key}.',
			[ 'key' => $key ]
		);
	}

	return null;
}
1580
/**
 * Release the named database lock for a key on the server that owns the key.
 *
 * @param string $key
 * @return mixed Result of the connection's unlock(), or false if a DBError was caught
 */
public function doUnlock( $key ) {
	// Only held so that the returned scope object lives until this method returns
	$silenceScope = $this->silenceTransactionProfiler();

	[ $shardIndex ] = $this->getKeyLocation( $key );

	try {
		return $this->getConnection( $shardIndex )->unlock( $key, __METHOD__ );
	} catch ( DBError $e ) {
		$this->handleDBError( $e, $shardIndex );
	}

	return false;
}
1597
/**
 * Make a cache key for the given keyspace and components.
 *
 * Spaces become underscores and colons in components are percent-encoded.
 * A component that would not fit in the remaining length budget is replaced
 * by "#" plus its MD5 hash; if the budget is still exceeded at the end, all
 * components are collapsed into a single hashed "BagOStuff-long-key" key.
 *
 * @param string $keyspace
 * @param array $components
 * @return string
 */
public function makeKeyInternal( $keyspace, $components ) {
	// The 'objectcache' schema stores keys as varchar(255). The budget of 205
	// leaves headroom for custom prefixes added by callers such as WANObjectCache;
	// the keyspace and one separator per component are subtracted up front.
	$keyspace = strtr( $keyspace, ' ', '_' );
	$budget = 205 - strlen( $keyspace ) - count( $components );

	foreach ( $components as $i => $part ) {
		$part = strtr( $part, [
			' ' => '_', // Avoid unnecessary misses from pre-1.35 code
			':' => '%3A',
		] );

		// A hashed component takes 33 characters: "#" plus the 32 of the MD5
		if ( $budget > 33 && strlen( $part ) > $budget ) {
			$part = '#' . md5( $part );
		}

		$budget -= strlen( $part );
		$components[$i] = $part;
	}

	$joined = implode( ':', $components );

	return $budget < 0
		? $keyspace . ':BagOStuff-long-key:##' . md5( $joined )
		: $keyspace . ':' . $joined;
}
1623
/**
 * Serialize a value for storage.
 *
 * Integers are returned unchanged. Anything else goes through PHP's
 * serialize() and is then deflated when $this->hasZlib is set.
 *
 * @param mixed $value
 * @return int|string
 */
protected function serialize( $value ) {
	if ( is_int( $value ) ) {
		return $value;
	}

	$blob = serialize( $value );

	// On typical message and page data, compression can provide a 3X storage savings
	return $this->hasZlib ? gzdeflate( $blob ) : $blob;
}
1637
/**
 * Restore a value from its stored serialized form.
 *
 * The tombstone marker yields false and integer-like values yield integers.
 * Otherwise, when $this->hasZlib is set, the value is inflated first; if
 * inflating fails, the value is unserialized as it is.
 *
 * @param mixed $value
 * @return mixed
 */
protected function unserialize( $value ) {
	if ( $value === self::TOMB_SERIAL ) {
		return false; // tombstone
	}

	if ( $this->isInteger( $value ) ) {
		return (int)$value;
	}

	if ( !$this->hasZlib ) {
		return unserialize( $value );
	}

	// gzinflate() warns on input that is not deflated; that case is handled below
	AtEase::suppressWarnings();
	$inflated = gzinflate( $value );
	AtEase::restoreWarnings();

	return unserialize( $inflated !== false ? $inflated : $value );
}
1659
/**
 * Get the load balancer, creating it through $this->loadBalancerCallback on
 * first use and reusing that instance afterwards.
 *
 * @return ILoadBalancer
 */
private function getLoadBalancer(): ILoadBalancer {
	if ( $this->loadBalancer ) {
		return $this->loadBalancer;
	}

	$this->loadBalancer = ( $this->loadBalancerCallback )();

	return $this->loadBalancer;
}
1666
/**
 * Get a connection handle for $this->dbDomain from the load balancer.
 *
 * @return mixed Handle returned by getMaintenanceConnectionRef(), after
 *  ensureConnection() has been called on it
 */
private function getConnectionViaLoadBalancer() {
	$lb = $this->getLoadBalancer();
	$writerAttribs = $lb->getServerAttributes( $lb->getWriterIndex() );

	if ( $writerAttribs[Database::ATTR_DB_LEVEL_LOCKING] ) {
		// Use the main connection to avoid transaction deadlocks
		$conn = $lb->getMaintenanceConnectionRef( DB_PRIMARY, [], $this->dbDomain );
	} else {
		// If the RDBMS has row/table/page level locking, then use separate auto-commit
		// connection to avoid needless contention and deadlocks.
		$serverIndex = $this->replicaOnly ? DB_REPLICA : DB_PRIMARY;
		$conn = $lb->getMaintenanceConnectionRef(
			$serverIndex,
			[],
			$this->dbDomain,
			$lb::CONN_TRX_AUTOCOMMIT
		);
	}

	// Make sure any errors are thrown now while we can more easily handle them
	$conn->ensureConnection();

	return $conn;
}
1692
/**
 * Get the connection for a server index, creating and caching it in
 * $this->conns on first use.
 *
 * @param int $shardIndex Server index
 * @param array $server Server config; "type" is required, "flags" is optional
 * @return mixed Handle created by Database::factory()
 */
private function getConnectionFromServerInfo( $shardIndex, array $server ) {
	if ( isset( $this->conns[$shardIndex] ) ) {
		return $this->conns[$shardIndex];
	}

	$overrides = [
		// Make sure the handle uses autocommit mode
		'flags' => ( $server['flags'] ?? 0 ) & ~IDatabase::DBO_TRX,
		'connLogger' => $this->logger,
		'queryLogger' => $this->logger
	];
	$conn = Database::factory( $server['type'], array_merge( $server, $overrides ) );

	// Automatically create the objectcache table for sqlite as needed
	if ( $conn->getType() === 'sqlite' ) {
		$this->initSqliteDatabase( $conn );
	}
	$this->conns[$shardIndex] = $conn;

	return $conn;
}
1724
/**
 * Handle a DBError raised while using the given server.
 *
 * Connection errors mark the server as down when servers are configured
 * directly (no load balancer). The error is always recorded and logged.
 *
 * @param DBError $exception
 * @param int $shardIndex Server index
 */
private function handleDBError( DBError $exception, $shardIndex ) {
	$isConnError = $exception instanceof DBConnectionError;
	if ( $isConnError && !$this->useLB ) {
		$this->markServerDown( $exception, $shardIndex );
	}

	$this->setAndLogDBError( $exception );
}
1737
/**
 * Log a DBError and set the last error code: ERR_UNREACHABLE for connection
 * errors, ERR_UNEXPECTED for anything else.
 *
 * @param DBError $e
 */
private function setAndLogDBError( DBError $e ) {
	$this->logger->error( "DBError: {$e->getMessage()}", [ 'exception' => $e ] );

	$isConnError = $e instanceof DBConnectionError;
	$this->setLastError( $isConnError ? self::ERR_UNREACHABLE : self::ERR_UNEXPECTED );
	$this->logger->warning(
		__METHOD__ . ( $isConnError ? ": ignoring connection error" : ": ignoring query error" )
	);
}
1751
/**
 * Drop the cached connection for a server and record the failure time and
 * error, unless a failure recorded less than 60 seconds ago is still in place.
 *
 * @param DBError $exception
 * @param int $shardIndex Server index
 */
private function markServerDown( DBError $exception, $shardIndex ) {
	unset( $this->conns[$shardIndex] ); // bug T103435

	$now = $this->getCurrentTime();
	$lastFailure = $this->connFailureTimes[$shardIndex] ?? null;
	if ( $lastFailure !== null ) {
		if ( $now - $lastFailure < 60 ) {
			$this->logger->debug( __METHOD__ . ": Server #$shardIndex already down" );

			return;
		}
		// The previous record is stale; clear it before storing the new one
		unset( $this->connFailureTimes[$shardIndex] );
		unset( $this->connFailureErrors[$shardIndex] );
	}

	$this->logger->info( __METHOD__ . ": Server #$shardIndex down until " . ( $now + 60 ) );
	$this->connFailureTimes[$shardIndex] = $now;
	$this->connFailureErrors[$shardIndex] = $exception;
}
1775
/**
 * Create the 'objectcache' table and its exptime index on an SQLite
 * database, unless the table already exists.
 *
 * @param IMaintainableDatabase $db
 * @throws DBError Rethrown after rolling back if table or index creation fails
 */
private function initSqliteDatabase( IMaintainableDatabase $db ) {
	if ( $db->tableExists( 'objectcache', __METHOD__ ) ) {
		return;
	}

	// Use one table for SQLite; sharding does not seem to have much benefit
	$db->query( "PRAGMA journal_mode=WAL", __METHOD__ ); // this is permanent
	$db->startAtomic( __METHOD__ ); // atomic DDL
	try {
		$encTable = $db->tableName( 'objectcache' );
		$encExptimeIndex = $db->addIdentifierQuotes( $db->tablePrefix() . 'exptime' );

		$createTableSql = implode( "\n", [
			"CREATE TABLE $encTable (",
			" keyname BLOB NOT NULL default '' PRIMARY KEY,",
			" value BLOB,",
			" exptime BLOB NOT NULL",
			")"
		] );
		$db->query( $createTableSql, __METHOD__ );

		$createIndexSql = "CREATE INDEX $encExptimeIndex ON $encTable (exptime)";
		$db->query( $createIndexSql, __METHOD__ );

		$db->endAtomic( __METHOD__ );
	} catch ( DBError $e ) {
		$db->rollback( __METHOD__ );
		throw $e;
	}
}
1805
/**
 * Create the shard tables on all databases.
 *
 * Only servers of type "mysql" or "postgres" are handled; each shard table is
 * created with the structure of the base 'objectcache' table.
 *
 * @throws DBError Not caught here; raised by the connection or a failed query
 */
public function createTables() {
	foreach ( $this->getShardServerIndexes() as $shardIndex ) {
		$db = $this->getConnection( $shardIndex );
		$type = $db->getType();
		if ( !in_array( $type, [ 'mysql', 'postgres' ], true ) ) {
			continue;
		}

		for ( $i = 0; $i < $this->numTableShards; $i++ ) {
			$encBaseTable = $db->tableName( 'objectcache' );
			$encShardTable = $db->tableName( $this->getTableNameByShard( $i ) );
			if ( $type === 'postgres' ) {
				// PostgreSQL only accepts LIKE inside the parenthesized column list;
				// INCLUDING ALL also copies defaults, constraints and indexes
				$sql = "CREATE TABLE $encShardTable (LIKE $encBaseTable INCLUDING ALL)";
			} else {
				$sql = "CREATE TABLE $encShardTable LIKE $encBaseTable";
			}
			$db->query( $sql, __METHOD__ );
		}
	}
}
1833
/**
 * Get the list of server indexes to operate on.
 *
 * @return int[] [ 0 ] when a LoadBalancer is used; otherwise the keys of
 *  $this->serverTags (the striped array of database servers)
 */
private function getShardServerIndexes() {
	return $this->useLB ? [ 0 ] : array_keys( $this->serverTags );
}
1848
/**
 * Get the index of the first server whose tag equals the given tag.
 *
 * @param string $tag
 * @return int Server index
 * @throws InvalidArgumentException If no tags are configured or the tag is unknown
 */
private function getShardServerIndexForTag( string $tag ) {
	if ( !$this->serverTags ) {
		throw new InvalidArgumentException( "Given a tag but no tags are configured" );
	}

	// Strict search; yields the key of the first matching tag, or false
	$serverShardIndex = array_search( $tag, $this->serverTags, true );
	if ( $serverShardIndex === false ) {
		throw new InvalidArgumentException( "Unknown server tag: $tag" );
	}

	return $serverShardIndex;
}
1865
/**
 * Silence the transaction profiler for as long as the returned object is
 * held, when the profiler is relevant to this backend.
 *
 * @return mixed|null Result of silenceForScope(), or null when servers are
 *  configured directly via $this->serverInfos
 */
private function silenceTransactionProfiler() {
	if ( !$this->serverInfos ) {
		return Profiler::instance()->getTransactionProfiler()->silenceForScope();
	}

	return null; // no TransactionProfiler injected anyway
}
1877}
serialize()
unserialize( $serialized)
if(!defined('MW_SETUP_CALLBACK'))
The persistent session ID (if any) loaded at startup.
Definition WebStart.php:82
static consistentHashSort(&$array, $key, $separator="\000")
Sort the given array in a pseudo-random order which depends only on the given key and each element va...
string $keyspace
Default keyspace; used by makeKey()
Storage medium specific cache for storing items (e.g.
getExpirationAsTimestamp( $exptime)
Convert an optionally relative timestamp to an absolute time.
getSerialized( $value, $key)
Get the serialized form of a value, using any applicable prepared value.
unlock( $key)
Release an advisory lock on a key string.
updateOpStats(string $op, array $keyInfo)
isInteger( $value)
Check if a value is an integer.
lock( $key, $timeout=6, $exptime=6, $rclass='')
static instance()
Singleton.
Definition Profiler.php:69
RDBMS-based caching module.
bool $multiPrimaryMode
Whether multi-primary mode is enabled.
deleteObjectsExpiringBefore( $timestamp, callable $progress=null, $limit=INF, string $tag=null)
Delete all objects expiring before a certain date.
ILoadBalancer null $loadBalancer
doDelete( $key, $flags=0)
Delete an item.
float $lastGarbageCollect
UNIX timestamp.
string[] $serverTags
(server index => tag/host name)
doSetMulti(array $data, $exptime=0, $flags=0)
createTables()
Create the shard tables on all databases.
serialize( $value)
int $purgePeriod
Average number of writes required to trigger garbage collection.
doChangeTTL( $key, $exptime, $flags)
incr( $key, $value=1, $flags=0)
Increase stored value of $key by $value while preserving its TTL.
unserialize( $value)
array[] $serverInfos
(server index => server config)
deleteAll()
Delete content of shard tables in every server.
callable null $loadBalancerCallback
Injected function which returns a LoadBalancer.
doDeleteMulti(array $keys, $flags=0)
int $numTableShards
Number of table shards to use on each server.
int $purgeLimit
Max expired rows to purge during randomized garbage collection.
__construct( $params)
Create a new backend instance from parameters injected by ObjectCache::newFromParams()
doGet( $key, $flags=0, &$casToken=null)
Get an item.
doAdd( $key, $value, $exptime=0, $flags=0)
Insert an item if it does not already exist.
bool $replicaOnly
Whether to use replicas instead of primaries (if using LoadBalancer)
doSet( $key, $value, $exptime=0, $flags=0)
Set an item.
decr( $key, $value=1, $flags=0)
Decrease stored value of $key by $value while preserving its TTL.
makeKeyInternal( $keyspace, $components)
Make a cache key for the given keyspace and components.
doLock( $key, $timeout=6, $exptime=6)
Exception[] $connFailureErrors
Map of (shard index => Exception)
doIncrWithInit( $key, $exptime, $step, $init, $flags)
IMaintainableDatabase[] $conns
Map of (shard index => DB handle)
string $tableName
doGetMulti(array $keys, $flags=0)
Get an associative array containing the item for each of the keys that have items.
bool $useLB
Whether to use the LoadBalancer.
doCas( $casToken, $key, $value, $exptime=0, $flags=0)
Set an item if the current CAS token matches the provided CAS token.
string false null $dbDomain
DB name used for keys using the LoadBalancer.
doChangeTTLMulti(array $keys, $exptime, $flags=0)
float[] $connFailureTimes
Map of (shard index => UNIX timestamps)
Database error base class.
Definition DBError.php:31
static factory( $type, $params=[], $connect=self::NEW_CONNECTED)
Construct a Database subclass instance given a database type and parameters.
Definition Database.php:379
Note that none of the methods in this class are stable to override.
addQuotes( $s)
Escape and quote a raw value string for use in a SQL query.
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:39
rollback( $fname=__METHOD__, $flush=self::FLUSHING_ONE)
Rollback a transaction previously started using begin()
unlock( $lockName, $method)
Release a lock.
endAtomic( $fname=__METHOD__)
Ends an atomic section of SQL statements.
lock( $lockName, $method, $timeout=5, $flags=0)
Acquire a named lock.
getTopologyBasedServerId()
Get a non-recycled ID that uniquely identifies this server within the replication topology.
affectedRows()
Get the number of rows affected by the last write query.
delete( $table, $conds, $fname=__METHOD__)
Delete all rows in a table that match a condition.
update( $table, $set, $conds, $fname=__METHOD__, $options=[])
Update all rows in a table that match a given condition.
upsert( $table, array $rows, $uniqueKeys, array $set, $fname=__METHOD__)
Upsert row(s) into a table, in the provided order, while updating conflicting rows.
getType()
Get the RDBMS type of the server (e.g.
newSelectQueryBuilder()
Create an empty SelectQueryBuilder which can be used to run queries against this connection.
tablePrefix( $prefix=null)
Get/set the table prefix.
query( $sql, $fname=__METHOD__, $flags=0)
Run an SQL query statement and return the result.
replace( $table, $uniqueKeys, $rows, $fname=__METHOD__)
Insert row(s) into a table, in the provided order, while deleting conflicting rows.
startAtomic( $fname=__METHOD__, $cancelable=self::ATOMIC_NOT_CANCELABLE)
Begin an atomic section of SQL statements.
getServerName()
Get the readable name for the server.
cancelAtomic( $fname=__METHOD__, AtomicSectionIdentifier $sectionId=null)
Cancel an atomic section of SQL statements.
Create and track the database connections and transactions for a given database cluster.
Advanced database interface for IDatabase handles that include maintenance methods.
tableExists( $table, $fname=__METHOD__)
Query whether a given table exists.
conditional( $cond, $caseTrueExpression, $caseFalseExpression)
Returns an SQL expression for a simple conditional.
tableName( $name, $format='quoted')
Format a table name ready for use in constructing an SQL query.
buildSubString( $input, $startPosition, $length=null)
Build a SUBSTRING function.
addIdentifierQuotes( $s)
Escape a SQL identifier (e.g.
timestamp( $ts=0)
Convert a timestamp in one of the formats accepted by ConvertibleTimestamp to the format used for ins...
buildConcat( $stringList)
Build a concatenation list to feed into a SQL query.
if( $line===false) $args
Definition mcc.php:124
This program is free software; you can redistribute it and/or modify it under the terms of the GNU Ge...
const DB_REPLICA
Definition defines.php:26
const DB_PRIMARY
Definition defines.php:28