31 use Wikimedia\ScopedCallback;
32 use Wikimedia\WaitConditionLoop;
107 parent::__construct(
$params );
112 if ( isset(
$params[
'servers'] ) ) {
113 $this->serverInfos = [];
114 $this->serverTags = [];
117 foreach (
$params[
'servers']
as $tag => $info ) {
118 $this->serverInfos[$index] = $info;
119 if ( is_string( $tag ) ) {
120 $this->serverTags[$index] = $tag;
122 $this->serverTags[$index] = $info[
'host'] ??
"#$index";
126 } elseif ( isset(
$params[
'server'] ) ) {
127 $this->serverInfos = [
$params[
'server'] ];
128 $this->numServers =
count( $this->serverInfos );
131 $this->serverInfos =
false;
132 $this->numServers = 1;
135 if ( isset(
$params[
'purgePeriod'] ) ) {
136 $this->purgePeriod = intval(
$params[
'purgePeriod'] );
138 if ( isset(
$params[
'tableName'] ) ) {
141 if ( isset(
$params[
'shards'] ) ) {
142 $this->shards = intval(
$params[
'shards'] );
144 if ( isset(
$params[
'syncTimeout'] ) ) {
145 $this->syncTimeout =
$params[
'syncTimeout'];
147 $this->replicaOnly = !empty(
$params[
'slaveOnly'] );
157 protected function getDB( $serverIndex ) {
158 if ( !isset( $this->conns[$serverIndex] ) ) {
159 if ( $serverIndex >= $this->numServers ) {
160 throw new MWException( __METHOD__ .
": Invalid server index \"$serverIndex\"" );
163 # Don't keep timing out trying to connect for each call if the DB is down
164 if ( isset( $this->connFailureErrors[$serverIndex] )
165 && ( time() - $this->connFailureTimes[$serverIndex] ) < 60
167 throw $this->connFailureErrors[$serverIndex];
170 if ( $this->serverInfos ) {
172 $info = $this->serverInfos[$serverIndex];
173 $type = $info[
'type'] ??
'mysql';
174 $host = $info[
'host'] ??
'[unknown]';
175 $this->logger->debug( __CLASS__ .
": connecting to $host" );
176 $db = Database::factory(
$type, $info );
180 $lb = MediaWikiServices::getInstance()->getDBLoadBalancer();
182 if ( $lb->getServerType( $lb->getWriterIndex() ) !==
'sqlite' ) {
184 $db = $lb->getConnection( $index, [],
false, $lb::CONN_TRX_AUTOCOMMIT );
188 $db = $lb->getConnection( $index );
192 $this->logger->debug( sprintf(
"Connection %s will be used for SqlBagOStuff", $db ) );
193 $this->conns[$serverIndex] = $db;
196 return $this->conns[$serverIndex];
205 if ( $this->shards > 1 ) {
206 $hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
211 if ( $this->numServers > 1 ) {
214 reset( $sortedServers );
215 $serverIndex =
key( $sortedServers );
228 if ( $this->shards > 1 ) {
229 $decimals = strlen( $this->shards - 1 );
231 sprintf(
"%0{$decimals}d", $index );
237 protected function doGet( $key, $flags = 0, &$casToken =
null ) {
241 if ( array_key_exists( $key, $blobs ) ) {
242 $blob = $blobs[$key];
257 foreach ( $blobs
as $key =>
$blob ) {
270 $keysByTable[$serverIndex][
$tableName][] = $key;
276 foreach ( $keysByTable
as $serverIndex => $serverKeys ) {
278 $db = $this->
getDB( $serverIndex );
281 [
'keyname',
'value',
'exptime' ],
282 [
'keyname' => $tableKeys ],
287 $db->trxLevel() ? [
'LOCK IN SHARE MODE' ] : []
289 if (
$res ===
false ) {
292 foreach (
$res as $row ) {
293 $row->serverIndex = $serverIndex;
295 $dataRows[$row->keyname] = $row;
304 if ( isset( $dataRows[$key] ) ) {
305 $row = $dataRows[$key];
306 $this->
debug(
"get: retrieved data; expiry time is " . $row->exptime );
309 $db = $this->
getDB( $row->serverIndex );
310 if ( $this->
isExpired( $db, $row->exptime ) ) {
311 $this->
debug(
"get: key has expired" );
313 $values[$key] = $db->decodeBlob( $row->value );
319 $this->
debug(
'get: no matching rows' );
327 return $this->
insertMulti( $data, $expiry, $flags,
true );
334 $keysByTable[$serverIndex][
$tableName][] = $key;
340 $exptime = (int)$expiry;
342 foreach ( $keysByTable
as $serverIndex => $serverKeys ) {
345 $db = $this->
getDB( $serverIndex );
352 if ( $exptime < 0 ) {
356 if ( $exptime == 0 ) {
360 $encExpiry = $db->timestamp( $exptime );
364 foreach ( $tableKeys
as $key ) {
367 'value' => $db->encodeBlob( $this->
serialize( $data[$key] ) ),
368 'exptime' => $encExpiry,
387 if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) {
394 public function set( $key,
$value, $exptime = 0, $flags = 0 ) {
400 public function add( $key,
$value, $exptime = 0, $flags = 0 ) {
406 protected function cas( $casToken, $key,
$value, $exptime = 0, $flags = 0 ) {
411 $db = $this->
getDB( $serverIndex );
412 $exptime = intval( $exptime );
414 if ( $exptime < 0 ) {
418 if ( $exptime == 0 ) {
422 $encExpiry = $db->timestamp( $exptime );
430 'value' => $db->encodeBlob( $this->serialize(
$value ) ),
431 'exptime' => $encExpiry
435 'value' => $db->encodeBlob( $casToken )
445 return (
bool)$db->affectedRows();
452 $keysByTable[$serverIndex][
$tableName][] = $key;
457 foreach ( $keysByTable
as $serverIndex => $serverKeys ) {
460 $db = $this->
getDB( $serverIndex );
469 $db->delete(
$tableName, [
'keyname' => $tableKeys ], __METHOD__ );
478 if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) {
485 public function delete( $key, $flags = 0 ) {
491 public function incr( $key, $step = 1 ) {
496 $db = $this->
getDB( $serverIndex );
497 $step = intval( $step );
498 $row = $db->selectRow(
500 [
'value',
'exptime' ],
501 [
'keyname' => $key ],
505 if ( $row ===
false ) {
509 $db->delete(
$tableName, [
'keyname' => $key ], __METHOD__ );
510 if ( $this->
isExpired( $db, $row->exptime ) ) {
515 $oldValue = intval( $this->
unserialize( $db->decodeBlob( $row->value ) ) );
516 $newValue = $oldValue + $step;
521 'value' => $db->encodeBlob( $this->serialize( $newValue ) ),
522 'exptime' => $row->exptime
528 if ( $db->affectedRows() == 0 ) {
540 public function merge( $key, callable $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
541 $ok = $this->
mergeViaCas( $key, $callback, $exptime, $attempts, $flags );
542 if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) {
549 public function changeTTL( $key, $exptime = 0, $flags = 0 ) {
554 $db = $this->
getDB( $serverIndex );
555 if ( $exptime == 0 ) {
562 [
'exptime' => $timestamp ],
563 [
'keyname' => $key,
'exptime > ' . $db->addQuotes( $db->timestamp( time() ) ) ],
566 if ( $db->affectedRows() == 0 ) {
567 $exists = (bool)$db->selectField(
570 [
'keyname' => $key,
'exptime' => $timestamp ],
599 if ( time() > 0x7fffffff ) {
600 return $db->timestamp( 1 << 62 );
602 return $db->timestamp( 0x7fffffff );
607 if ( !$this->purgePeriod || $this->replicaOnly ) {
612 if ( $this->purgePeriod !== 1 && mt_rand( 0, $this->purgePeriod - 1 ) ) {
617 if ( $now > ( $this->lastExpireAll + 1 ) ) {
618 $this->lastExpireAll = $now;
638 $db = $this->
getDB( $serverIndex );
639 $dbTimestamp = $db->timestamp( $timestamp );
640 $totalSeconds =
false;
641 $baseConds = [
'exptime < ' . $db->addQuotes( $dbTimestamp ) ];
646 if ( $maxExpTime !==
false ) {
647 $conds[] =
'exptime >= ' . $db->addQuotes( $maxExpTime );
651 [
'keyname',
'exptime' ],
654 [
'LIMIT' => 100,
'ORDER BY' =>
'exptime' ] );
659 $row =
$rows->current();
660 $minExpTime = $row->exptime;
661 if ( $totalSeconds ===
false ) {
666 $keys[] = $row->keyname;
667 $maxExpTime = $row->exptime;
673 'exptime >= ' . $db->addQuotes( $minExpTime ),
674 'exptime < ' . $db->addQuotes( $dbTimestamp ),
679 if ( $progressCallback ) {
680 if ( intval( $totalSeconds ) === 0 ) {
683 $remainingSeconds =
wfTimestamp( TS_UNIX, $timestamp )
685 if ( $remainingSeconds > $totalSeconds ) {
686 $totalSeconds = $remainingSeconds;
688 $processedSeconds = $totalSeconds - $remainingSeconds;
689 $percent = ( $i + $processedSeconds / $totalSeconds )
690 / $this->shards * 100;
693 + ( $serverIndex / $this->numServers * 100 );
694 call_user_func( $progressCallback, $percent );
716 $db = $this->
getDB( $serverIndex );
739 if ( function_exists(
'gzdeflate' ) ) {
740 return gzdeflate( $serial );
752 if ( function_exists(
'gzinflate' ) ) {
753 Wikimedia\suppressWarnings();
754 $decomp = gzinflate( $serial );
755 Wikimedia\restoreWarnings();
757 if ( $decomp !==
false ) {
801 $this->logger->error(
"DBError: {$exception->getMessage()}" );
804 $this->logger->debug( __METHOD__ .
": ignoring connection error" );
807 $this->logger->debug( __METHOD__ .
": ignoring query error" );
818 unset( $this->conns[$serverIndex] );
820 if ( isset( $this->connFailureTimes[$serverIndex] ) ) {
821 if ( time() - $this->connFailureTimes[$serverIndex] >= 60 ) {
822 unset( $this->connFailureTimes[$serverIndex] );
823 unset( $this->connFailureErrors[$serverIndex] );
825 $this->logger->debug( __METHOD__ .
": Server #$serverIndex already down" );
830 $this->logger->info( __METHOD__ .
": Server #$serverIndex down until " . ( $now + 60 ) );
831 $this->connFailureTimes[$serverIndex] = $now;
832 $this->connFailureErrors[$serverIndex] = $exception;
840 $db = $this->
getDB( $serverIndex );
841 if ( $db->getType() !==
'mysql' ) {
842 throw new MWException( __METHOD__ .
' is not supported on this DB server' );
847 'CREATE TABLE ' . $db->tableName( $this->getTableNameByShard( $i ) ) .
848 ' LIKE ' . $db->tableName(
'objectcache' ),
867 $lb = MediaWikiServices::getInstance()->getDBLoadBalancer();
868 if ( $lb->getServerCount() <= 1 ) {
874 $masterPos = $lb->getMasterPos();
879 $loop =
new WaitConditionLoop(
880 function ()
use ( $lb, $masterPos ) {
881 return $lb->waitForAll( $masterPos, 1 );
887 return ( $loop->invoke() === $loop::CONDITION_REACHED );
903 $oldSilenced = $trxProfiler->setSilenced(
true );
904 return new ScopedCallback(
function ()
use ( $trxProfiler, $oldSilenced ) {
905 $trxProfiler->setSilenced( $oldSilenced );