MediaWiki REL1_34
LoadBalancer.php
Go to the documentation of this file.
1<?php
22namespace Wikimedia\Rdbms;
23
24use Psr\Log\LoggerInterface;
25use Psr\Log\NullLogger;
26use Wikimedia\ScopedCallback;
27use BagOStuff;
30use ArrayUtils;
31use LogicException;
32use UnexpectedValueException;
33use InvalidArgumentException;
34use RuntimeException;
35use Exception;
36
42class LoadBalancer implements ILoadBalancer {
/** @var ILoadMonitor|null Load monitor instance; created on first use by getLoadMonitor() */
private $loadMonitor;
/** @var array Load monitor settings (has a "class" key); filled in by the constructor */
private $loadMonitorConfig;
/** @var BagOStuff Cache taken from the "srvCache" constructor parameter */
private $srvCache;
/** @var \WANObjectCache Cache taken from the "wanCache" constructor parameter */
private $wanCache;
/** @var mixed Value of the "profiler" constructor parameter, or null */
private $profiler;
/** @var TransactionProfiler Records new connections made via getServerConnection() */
private $trxProfiler;
/** @var LoggerInterface Replication/lag related log channel */
private $replLogger;
/** @var LoggerInterface Connection related log channel */
private $connLogger;
/** @var LoggerInterface NOTE(review): not used in this excerpt; presumably handed to DB handles */
private $queryLogger;
/** @var LoggerInterface NOTE(review): not used in this excerpt; presumably for performance logs */
private $perfLogger;
/** @var callable Callback that takes an Exception; defaults to raising an E_USER_WARNING */
private $errorLogger;
/** @var callable Callback that takes a message; defaults to raising an E_USER_DEPRECATED */
private $deprecationLogger;

/** @var DatabaseDomain Domain that counts as "local" for this load balancer */
private $localDomain;
/** @var string|null Alternative ID that resolveDomainID() also treats as the local domain */
private $localDomainIdAlias;

/** @var array[] Tracked handles, keyed by pool (KEY_* constant), then by server index */
private $conns;

/** @var array[] Server configuration maps, keyed by server index (0 is the master) */
private $servers;
/** @var array[] Map of (query group => (server index => load ratio)) */
private $groupLoads;
/** @var string Query group used when a caller does not name a usable one */
private $defaultGroup;
/** @var bool Whether pickReaderIndex() may pick servers without checking their lag */
private $allowLagged;
/** @var int Default number of seconds to wait for replication in doWait()/waitForAll() */
private $waitTimeout;
/** @var int Lag limit applied to servers that do not define their own "max lag" */
private $maxLag;

/** @var string Host name of this machine (or the "hostname" parameter) */
private $hostname;
/** @var bool Whether the process runs under the "cli" or "phpdbg" SAPI (or as configured) */
private $cliMode;
/** @var string Value of the "agent" constructor parameter */
private $agent;

/** @var array NOTE(review): not used in this excerpt; presumably table alias definitions */
private $tableAliases = [];
/** @var array NOTE(review): not used in this excerpt; presumably index alias definitions */
private $indexAliases = [];
/** @var callable|null Value of the "chronologyCallback" constructor parameter, if given */
private $chronologyCallback;
/** @var array Set of domains (as keys) for which CONN_TRX_AUTOCOMMIT is stripped from flags */
private $tempTablesOnlyMode = [];

/** @var string|bool NOTE(review): not used in this excerpt; presumably the active round name */
private $trxRoundId = false;
/** @var string One of the ROUND_* constants */
private $trxRoundStage = self::ROUND_CURSORY;
/** @var int[] Reader index already chosen for each query group */
private $readIndexByGroup = [];
/** @var DBMasterPos|bool Replication position that replicas should reach, or false */
private $waitForPos;
/** @var bool Whether the generic reader was found to be (or forced to be) lagged */
private $laggedReplicaMode = false;
/** @var string Description of the most recent failure */
private $lastError = 'Unknown error';
/** @var string|bool Configured read-only reason, or false if none */
private $readOnlyReason = false;
/** @var int Compared before/after a lookup in getServerConnection() to detect new connections */
private $connectionCounter = 0;
/** @var bool When set, reuseConnection() silently does nothing */
private $disabled = false;
/** @var bool NOTE(review): not used in this excerpt; presumably set once a connect was tried */
private $connectionAttempted = false;
/** @var IDatabase|bool|null Handle of the last connection that failed or was found closed */
private $errorConnection;

/** @var int ID of this instance */
private $id;
/** @var int|null Value of the "ownerId" constructor parameter, or null */
private $ownerId;
136
/** NOTE(review): not referenced in this excerpt; unit and purpose should be confirmed at the use site */
138 const CONN_HELD_WARN_THRESHOLD = 10;
139
/** Used for the maximum lag when the "maxLag" constructor parameter is absent */
141 const MAX_LAG_DEFAULT = 6;
/** Used for the replication wait timeout when the "waitTimeout" constructor parameter is absent */
143 const MAX_WAIT_DEFAULT = 10;
/** NOTE(review): not referenced in this excerpt; by its name a cache TTL for read-only state */
145 const TTL_CACHE_READONLY = 5;
146
/* Keys of the connection pools that take part in transaction rounds */
147 const KEY_LOCAL = 'local';
148 const KEY_FOREIGN_FREE = 'foreignFree';
149 const KEY_FOREIGN_INUSE = 'foreignInUse';
150
/* Keys of the auto-commit counterpart pools, which ignore transaction rounds */
151 const KEY_LOCAL_NOROUND = 'localAutoCommit';
152 const KEY_FOREIGN_FREE_NOROUND = 'foreignFreeAutoCommit';
153 const KEY_FOREIGN_INUSE_NOROUND = 'foreignInUseAutoCommit';
154
/* Possible values of the transaction round stage; ROUND_CURSORY is the initial one */
156 const ROUND_CURSORY = 'cursory';
158 const ROUND_FINALIZED = 'finalized';
160 const ROUND_APPROVED = 'approved';
162 const ROUND_COMMIT_CALLBACKS = 'commit-callbacks';
164 const ROUND_ROLLBACK_CALLBACKS = 'rollback-callbacks';
166 const ROUND_ERROR = 'error';
167
/**
 * Set up the server list, load ratios, caches, loggers and other settings.
 *
 * @param array $params Settings map. "servers" is required and must be a non-empty
 *  list (keys 0..n-1) of server maps that each have a "load" entry; index 0 becomes
 *  the master and all others replicas. Optional keys read here: localDomain,
 *  waitTimeout, readOnlyReason, maxLag, loadMonitor, srvCache, wanCache, profiler,
 *  trxProfiler, errorLogger, deprecationLogger, replLogger, connLogger, queryLogger,
 *  perfLogger, hostname, cliMode, agent, chronologyCallback, roundStage, defaultGroup
 *  and ownerId.
 * @throws InvalidArgumentException If "servers" is missing or empty
 * @throws UnexpectedValueException If "servers" is not a list
 */
public function __construct( array $params ) {
	if ( !isset( $params['servers'] ) || !count( $params['servers'] ) ) {
		throw new InvalidArgumentException( 'Missing or empty "servers" parameter' );
	}

	$listKey = -1;
	$this->servers = [];
	$this->groupLoads = [ self::GROUP_GENERIC => [] ];
	foreach ( $params['servers'] as $i => $server ) {
		// Keys must be exactly 0, 1, 2, ... in iteration order
		if ( ++$listKey !== $i ) {
			throw new UnexpectedValueException( 'List expected for "servers" parameter' );
		}
		if ( $i == 0 ) {
			$server['master'] = true;
		} else {
			$server['replica'] = true;
		}
		$this->servers[$i] = $server;
		foreach ( ( $server['groupLoads'] ?? [] ) as $group => $ratio ) {
			$this->groupLoads[$group][$i] = $ratio;
		}
		$this->groupLoads[self::GROUP_GENERIC][$i] = $server['load'];
	}

	// Fall back to an unspecified domain when the caller gives none
	$localDomain = isset( $params['localDomain'] )
		? DatabaseDomain::newFromId( $params['localDomain'] )
		: new DatabaseDomain( null, null, '' );
	// NOTE(review): setLocalDomain() lives outside this excerpt; it is expected to store
	// the domain and derive the alias that resolveDomainID() compares against
	$this->setLocalDomain( $localDomain );

	$this->waitTimeout = $params['waitTimeout'] ?? self::MAX_WAIT_DEFAULT;

	$this->conns = self::newTrackedConnectionsArray();
	$this->waitForPos = false;
	$this->allowLagged = false;

	if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
		$this->readOnlyReason = $params['readOnlyReason'];
	}

	$this->maxLag = $params['maxLag'] ?? self::MAX_LAG_DEFAULT;

	$this->loadMonitorConfig = $params['loadMonitor'] ?? [ 'class' => 'LoadMonitorNull' ];
	$this->loadMonitorConfig += [ 'lagWarnThreshold' => $this->maxLag ];

	// Fully qualified: these cache classes are global, not part of Wikimedia\Rdbms
	$this->srvCache = $params['srvCache'] ?? new \EmptyBagOStuff();
	$this->wanCache = $params['wanCache'] ?? \WANObjectCache::newEmpty();
	$this->profiler = $params['profiler'] ?? null;
	$this->trxProfiler = $params['trxProfiler'] ?? new TransactionProfiler();

	$this->errorLogger = $params['errorLogger'] ?? function ( Exception $e ) {
		trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
	};
	$this->deprecationLogger = $params['deprecationLogger'] ?? function ( $msg ) {
		trigger_error( $msg, E_USER_DEPRECATED );
	};
	foreach ( [ 'replLogger', 'connLogger', 'queryLogger', 'perfLogger' ] as $key ) {
		$this->$key = $params[$key] ?? new NullLogger();
	}

	$this->hostname = $params['hostname'] ?? ( gethostname() ?: 'unknown' );
	$this->cliMode = $params['cliMode'] ?? ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
	$this->agent = $params['agent'] ?? '';

	if ( isset( $params['chronologyCallback'] ) ) {
		$this->chronologyCallback = $params['chronologyCallback'];
	}

	if ( isset( $params['roundStage'] ) ) {
		if ( $params['roundStage'] === self::STAGE_POSTCOMMIT_CALLBACKS ) {
			$this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
		} elseif ( $params['roundStage'] === self::STAGE_POSTROLLBACK_CALLBACKS ) {
			$this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
		}
	}

	// Only honour "defaultGroup" if some server actually defines loads for it
	$group = $params['defaultGroup'] ?? self::GROUP_GENERIC;
	$this->defaultGroup = isset( $this->groupLoads[$group] ) ? $group : self::GROUP_GENERIC;

	// Start from a random number and count upwards so that each instance created in
	// this process gets its own ID ("$nextId = $nextId++" would never advance)
	static $nextId;
	$this->id = $nextId = ( is_int( $nextId ) ? $nextId + 1 : mt_rand() );
	$this->ownerId = $params['ownerId'] ?? null;
}
250
/**
 * Build the empty structure used to track connection handles.
 *
 * @return array[] Map of (pool key => empty array), one entry per KEY_* constant
 */
private static function newTrackedConnectionsArray() {
	$poolKeys = [
		// Pools whose connections can take part in transaction rounds
		self::KEY_LOCAL,
		self::KEY_FOREIGN_INUSE,
		self::KEY_FOREIGN_FREE,
		// Auto-committing counterpart pools that ignore transaction rounds
		self::KEY_LOCAL_NOROUND,
		self::KEY_FOREIGN_INUSE_NOROUND,
		self::KEY_FOREIGN_FREE_NOROUND
	];

	return array_fill_keys( $poolKeys, [] );
}
263
/**
 * Get the ID of the local domain.
 *
 * @return string Result of getId() on the local DatabaseDomain
 */
public function getLocalDomainID() {
	$localDomain = $this->localDomain;

	return $localDomain->getId();
}
267
/**
 * Turn a caller-supplied domain into a concrete domain ID string.
 *
 * @param string|bool $domain Domain ID, the local domain alias, or false for "local"
 * @return string The local domain ID for false/the alias, otherwise $domain as a string
 */
public function resolveDomainID( $domain ) {
	// Both false and the backwards-compatibility alias mean "the local domain"
	$meansLocal = ( $domain === false || $domain === $this->localDomainIdAlias );

	return $meansLocal ? $this->getLocalDomainID() : (string)$domain;
}
276
/**
 * Turn a caller-supplied query group argument into an ordered list of groups to try.
 *
 * @param string[]|string|bool $groups Query group(s); [] or false for none
 * @param int $i Requested server index or DB_* constant
 * @return string[] Groups in the order they should be tried
 * @throws LogicException If groups are given together with a positive server index
 */
private function resolveGroups( $groups, $i ) {
	$noneGiven = ( $groups === [] || $groups === false );

	// Naming a query group makes no sense once a specific server (index > 0) is requested
	if ( $i > 0 && !$noneGiven ) {
		$list = implode( ', ', (array)$groups );
		throw new LogicException( "Query group(s) ($list) given with server index (#$i)" );
	}

	if ( $noneGiven || $groups === $this->defaultGroup ) {
		// Common case
		return [ $this->defaultGroup ];
	}

	if ( is_string( $groups ) && isset( $this->groupLoads[$groups] ) ) {
		// A known group, with the default group as fallback
		return [ $groups, $this->defaultGroup ];
	}

	if ( is_array( $groups ) ) {
		// De-duplicated list, with the generic group appended if it is not already there.
		// NOTE(review): this branch falls back to GROUP_GENERIC while the others use
		// the default group; confirm that this is intended.
		return array_keys( array_flip( $groups ) + [ self::GROUP_GENERIC => 1 ] );
	}

	// Unknown group name or unsupported type
	return [ $this->defaultGroup ];
}
303
/**
 * Adjust connection flags to what this load balancer can honour.
 *
 * @param int $flags Bit field of CONN_* constants
 * @param int $i Requested server index or DB_* constant
 * @param string $domain Resolved domain ID
 * @return int Adjusted bit field
 */
private function sanitizeConnectionFlags( $flags, $i, $domain ) {
	// A request for DB_MASTER or the writer index implies the intent to write
	if ( $i === self::DB_MASTER || $i === $this->getWriterIndex() ) {
		$flags |= self::CONN_INTENT_WRITABLE;
	}

	if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) != self::CONN_TRX_AUTOCOMMIT ) {
		// Nothing more to sanitize
		return $flags;
	}

	// Callers use CONN_TRX_AUTOCOMMIT to bypass REPEATABLE-READ staleness without row
	// locks, or to make small out-of-band commits during larger transactions.
	// The master's attributes are consulted (they should match those of the replicas).
	$attributes = $this->getServerAttributes( $this->getWriterIndex() );
	if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
		// The RDBMS locks at the database level (e.g. SQLite), so a separate
		// connection would only deadlock against the main one
		$flags &= ~self::CONN_TRX_AUTOCOMMIT;
		$type = $this->getServerType( $this->getWriterIndex() );
		$this->connLogger->info( __METHOD__ . ": CONN_TRX_AUTOCOMMIT disallowed ($type)" );
	} elseif ( isset( $this->tempTablesOnlyMode[$domain] ) ) {
		// T202116: queries are expected to hit temporary clone tables, which are not
		// visible across connections, so everything must share one connection
		$flags &= ~self::CONN_TRX_AUTOCOMMIT;
	}

	return $flags;
}
342
/**
 * Make a handle comply with the given connection flags.
 *
 * @param IDatabase $conn Handle to adjust
 * @param int $flags Bit field of CONN_* constants
 * @throws DBUnexpectedError If auto-commit is requested while the handle is in a transaction
 */
private function enforceConnectionFlags( IDatabase $conn, $flags ) {
	$wantsAutoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
	if ( !$wantsAutoCommit ) {
		return;
	}

	if ( $conn->trxLevel() ) {
		// Sanity check
		throw new DBUnexpectedError(
			$conn,
			'Handle requested with CONN_TRX_AUTOCOMMIT yet it has a transaction'
		);
	}

	// Put the handle in auto-commit mode
	$conn->clearFlag( $conn::DBO_TRX );
}
360
/**
 * Get the load monitor, creating it from the configuration on first use.
 *
 * @return ILoadMonitor NOTE(review): the type is assumed from the class names used here
 */
private function getLoadMonitor() {
	if ( isset( $this->loadMonitor ) ) {
		return $this->loadMonitor;
	}

	// Short class names accepted for backwards compatibility
	$shortNames = [
		'LoadMonitor' => LoadMonitor::class,
		'LoadMonitorNull' => LoadMonitorNull::class,
		'LoadMonitorMySQL' => LoadMonitorMySQL::class,
	];

	$configured = $this->loadMonitorConfig['class'];
	$class = $shortNames[$configured] ?? $configured;

	$this->loadMonitor = new $class(
		$this,
		$this->srvCache,
		$this->wanCache,
		$this->loadMonitorConfig
	);
	$this->loadMonitor->setLogger( $this->replLogger );

	return $this->loadMonitor;
}
386
/**
 * Pick a random server, by load weight, among those that are not excessively lagged.
 *
 * @param array $loads Map of (server index => load)
 * @param string|bool $domain Domain passed on to getLagTimes()
 * @param float|int $maxLag Extra upper bound on the lag allowed for any server
 * @return int|string|bool Key picked by ArrayUtils::pickRandom(), or false if the
 *  remaining loads add up to zero
 */
private function getRandomNonLagged( array $loads, $domain = false, $maxLag = INF ) {
	$lags = $this->getLagTimes( $domain );

	// Drop the excessively lagged servers; the master is never dropped here
	foreach ( $lags as $i => $lag ) {
		if ( $i === $this->getWriterIndex() ) {
			continue;
		}

		// The server's own limit (or the default), further constrained by $maxLag
		$maxServerLag = min( $this->servers[$i]['max lag'] ?? $this->maxLag, $maxLag );

		$host = $this->getServerName( $i );
		if ( $lag === false && !is_infinite( $maxServerLag ) ) {
			$this->replLogger->debug(
				__METHOD__ .
				": server {host} is not replicating?", [ 'host' => $host ] );
			unset( $loads[$i] );
		} elseif ( $lag > $maxServerLag ) {
			$this->replLogger->debug(
				__METHOD__ .
				": server {host} has {lag} seconds of lag (> {maxlag})",
				[ 'host' => $host, 'lag' => $lag, 'maxlag' => $maxServerLag ]
			);
			unset( $loads[$i] );
		}
	}

	// If nothing with non-zero load is left (this includes an empty list), report
	// failure instead of using the master; the caller then falls back to a lagged
	// replica DB and read-only mode
	if ( array_sum( $loads ) == 0 ) {
		return false;
	}

	// Return a random representative of the remainder
	return ArrayUtils::pickRandom( $loads );
}
441
/**
 * Resolve DB_MASTER/DB_REPLICA or a literal index to a specific server index.
 *
 * @param int $i Server index or DB_MASTER/DB_REPLICA
 * @param string[] $groups Query groups to try, in order, for DB_REPLICA
 * @param string $domain Resolved domain ID
 * @return int|null Server index
 * @throws UnexpectedValueException If a literal index is not a configured server
 */
private function getConnectionIndex( $i, array $groups, $domain ) {
	if ( $i === self::DB_MASTER ) {
		$i = $this->getWriterIndex();
	} elseif ( $i === self::DB_REPLICA ) {
		// Take the first group for which a reader can be found
		foreach ( $groups as $candidateGroup ) {
			$readerIndex = $this->getReaderIndex( $candidateGroup, $domain );
			if ( $readerIndex !== false ) {
				$i = $readerIndex;
				break;
			}
		}
	} elseif ( !isset( $this->servers[$i] ) ) {
		throw new UnexpectedValueException( "Invalid server index index #$i" );
	}

	if ( $i !== self::DB_REPLICA ) {
		return $i;
	}

	// No group yielded a usable reader.
	// NOTE(review): the first assignment discards whatever error was recorded before,
	// so the reported message always ends in "Unknown error"; confirm this is wanted.
	$this->lastError = 'Unknown error'; // set here in case of worse failure
	$this->lastError = 'No working replica DB server: ' . $this->lastError;
	$this->reportConnectionError();

	return null; // unreachable due to exception
}
474
/**
 * Get the server index to use for reads in a query group, choosing one if needed.
 *
 * The choice is remembered per group, so later calls return the same index.
 *
 * @param string|bool $group Query group; non-strings mean the generic group
 * @param string|bool $domain Domain ID, or false
 * @return int|bool Server index, or false if the group has no loads or no server
 *  could be picked
 */
public function getReaderIndex( $group = false, $domain = false ) {
	if ( $this->getServerCount() == 1 ) {
		// With a single server there is nothing to balance
		return $this->getWriterIndex();
	}

	if ( !is_string( $group ) ) {
		$group = self::GROUP_GENERIC;
	}

	$known = $this->getExistingReaderIndex( $group );
	if ( $known >= 0 ) {
		// Already selected earlier; "waitForPos" was handled back then
		return $known;
	}

	if ( !isset( $this->groupLoads[$group] ) ) {
		$this->connLogger->info( __METHOD__ . ": no loads for group $group" );

		return false;
	}
	// Start from the configured weights for this group...
	$loads = $this->groupLoads[$group];
	// ...and let the load monitor scale them by current server state
	$this->getLoadMonitor()->scaleLoads( $loads, $domain );

	// Having the replication positions loaded improves candidate selection
	$this->lazyLoadReplicationPositions();
	list( $i, $laggedReplicaMode ) = $this->pickReaderIndex( $loads, $domain );
	if ( $i === false ) {
		// No server could be connected to
		return false;
	}

	// When reads must reflect a given replication position, wait for the replica to
	// reach it; if that fails, the data counts as outdated
	if ( $this->waitForPos && $i !== $this->getWriterIndex() && !$this->doWait( $i ) ) {
		$laggedReplicaMode = true;
	}

	// Remember the choice for future DB_REPLICA handles
	$this->setExistingReaderIndex( $group, $i );
	// Only the generic group decides whether "lagged replica DB" mode is on
	if ( $laggedReplicaMode && $group === self::GROUP_GENERIC ) {
		$this->laggedReplicaMode = true;
	}

	$serverName = $this->getServerName( $i );
	$this->connLogger->debug( __METHOD__ . ": using server $serverName for group '$group'" );

	return $i;
}
528
/**
 * Get the reader index previously chosen for a query group.
 *
 * @param string $group Query group
 * @return int Server index, or -1 if none was chosen yet
 */
protected function getExistingReaderIndex( $group ) {
	if ( isset( $this->readIndexByGroup[$group] ) ) {
		return $this->readIndexByGroup[$group];
	}

	return -1;
}
538
/**
 * Remember the reader index chosen for a query group.
 *
 * @param string $group Query group
 * @param int $index Server index
 * @throws UnexpectedValueException If $index is negative
 */
private function setExistingReaderIndex( $group, $index ) {
	$isNegative = ( $index < 0 );
	if ( $isNegative ) {
		throw new UnexpectedValueException( "Cannot set a negative read server index" );
	}

	$this->readIndexByGroup[$group] = $index;
}
551
/**
 * Pick a server for reads from the given loads and verify that it can be connected to.
 *
 * Servers that cannot be connected to are dropped from the candidates and another
 * pick is made, until a connection succeeds or no candidate is left.
 *
 * @param array $loads Map of (server index => load); must not be empty
 * @param string|bool $domain Domain to connect to, or false
 * @return array (server index or false, whether the pick was made in lagged replica mode)
 * @throws InvalidArgumentException If $loads is empty
 */
557 private function pickReaderIndex( array $loads, $domain = false ) {
558 if ( $loads === [] ) {
559 throw new InvalidArgumentException( "Server configuration array is empty" );
560 }
561
// Index of the picked server, or false while none is picked
563 $i = false;
// Whether the pick had to ignore replication lag
565 $laggedReplicaMode = false;
566
567 // Quickly look through the available servers for a server that meets criteria...
568 $currentLoads = $loads;
569 while ( count( $currentLoads ) ) {
570 if ( $this->allowLagged || $laggedReplicaMode ) {
// Lag is not a criterion (any more): pick purely by weight
571 $i = ArrayUtils::pickRandom( $currentLoads );
572 } else {
573 $i = false;
574 if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
575 $this->replLogger->debug( __METHOD__ . ": replication positions detected" );
576 // "chronologyCallback" sets "waitForPos" for session consistency.
577 // This triggers doWait() after connect, so it's especially good to
578 // avoid lagged servers so as to avoid excessive delay in that method.
579 $ago = microtime( true ) - $this->waitForPos->asOfTime();
580 // Aim for <= 1 second of waiting (being too picky can backfire)
581 $i = $this->getRandomNonLagged( $currentLoads, $domain, $ago + 1 );
582 }
583 if ( $i === false ) {
584 // Any server with less lag than its 'max lag' param is preferable
585 $i = $this->getRandomNonLagged( $currentLoads, $domain );
586 }
587 if ( $i === false && count( $currentLoads ) ) {
588 // All replica DBs lagged. Switch to read-only mode
589 $this->replLogger->error(
590 __METHOD__ . ": all replica DBs lagged. Switch to read-only mode" );
591 $i = ArrayUtils::pickRandom( $currentLoads );
592 $laggedReplicaMode = true;
593 }
594 }
595
596 if ( $i === false ) {
597 // pickRandom() returned false.
598 // This is permanent and means the configuration or the load monitor
599 // wants us to return false.
600 $this->connLogger->debug( __METHOD__ . ": pickRandom() returned false" );
601
602 return [ false, false ];
603 }
604
605 $serverName = $this->getServerName( $i );
606 $this->connLogger->debug( __METHOD__ . ": Using reader #$i: $serverName..." );
607
608 // Get a connection to this server without triggering other server connections
609 $conn = $this->getServerConnection( $i, $domain, self::CONN_SILENCE_ERRORS );
610 if ( !$conn ) {
611 $this->connLogger->warning( __METHOD__ . ": Failed connecting to $i/$domain" );
612 unset( $currentLoads[$i] ); // avoid this server next iteration
613 $i = false;
614 continue;
615 }
616
617 // Decrement reference counter, we are finished with this connection.
618 // It will be incremented for the caller later.
619 if ( $domain !== false ) {
620 $this->reuseConnection( $conn );
621 }
622
623 // Return this server
624 break;
625 }
626
627 // If all servers were down, quit now
// ($i is false at this point because each failed attempt resets it)
628 if ( $currentLoads === [] ) {
629 $this->connLogger->error( __METHOD__ . ": all servers down" );
630 }
631
632 return [ $i, $laggedReplicaMode ];
633 }
634
/**
 * Set the replication position that readers should reach, waiting right away if a
 * generic reader (other than index 0) has already been chosen.
 *
 * @param DBMasterPos|bool $pos Replication position
 */
public function waitFor( $pos ) {
	$previousPos = $this->waitForPos;
	try {
		$this->waitForPos = $pos;
		$genericIndex = $this->getExistingReaderIndex( self::GROUP_GENERIC );
		if ( $genericIndex > 0 ) {
			// A generic reader is already in use, so wait on it now
			$reached = $this->doWait( $genericIndex );
			if ( !$reached ) {
				$this->laggedReplicaMode = true;
			}
		}
		// Otherwise the wait happens once getReaderIndex() chooses a server
	} finally {
		// This is used for lag-protection, so keep the old position if it was higher
		$this->setWaitForPositionIfHigher( $previousPos );
	}
}
650
/**
 * Wait for one generic replica DB to reach the given replication position.
 *
 * @param DBMasterPos|bool $pos Replication position
 * @param int|null $timeout Maximum seconds to wait; null for the default
 * @return bool Whether the position was reached; true if no replica applies
 */
public function waitForOne( $pos, $timeout = null ) {
	$previousPos = $this->waitForPos;
	try {
		$this->waitForPos = $pos;

		$i = $this->getExistingReaderIndex( self::GROUP_GENERIC );
		if ( $i <= 0 ) {
			// No generic replica DB chosen yet: pick among the replicas (the master
			// is excluded) that have non-zero generic load
			$candidates = $this->groupLoads[self::GROUP_GENERIC];
			unset( $candidates[$this->getWriterIndex()] );
			$candidates = array_filter( $candidates );
			$i = ArrayUtils::pickRandom( $candidates );
		}

		// Without any applicable replica there is nothing to wait for
		$ok = ( $i > 0 ) ? $this->doWait( $i, true, $timeout ) : true;
	} finally {
		// This is used for throttling, not lag-protection, so restore the old position
		$this->waitForPos = $previousPos;
	}

	return $ok;
}
677
/**
 * Wait for every replica DB that has load in some group to reach the given position.
 *
 * @param DBMasterPos|bool $pos Replication position
 * @param int|null $timeout Total seconds to wait across servers; falsy for the default
 * @return bool Whether all servers waited on reached the position
 */
public function waitForAll( $pos, $timeout = null ) {
	$timeout = $timeout ?: $this->waitTimeout;

	$previousPos = $this->waitForPos;
	try {
		$this->waitForPos = $pos;
		$serverCount = $this->getServerCount();

		$ok = true;
		// Index 0 is skipped; only replicas are waited on
		for ( $i = 1; $i < $serverCount; $i++ ) {
			if ( !$this->serverHasLoadInAnyGroup( $i ) ) {
				continue;
			}

			$start = microtime( true );
			$ok = $this->doWait( $i, true, $timeout ) && $ok;
			// Charge the whole seconds spent against the remaining budget
			$timeout -= intval( microtime( true ) - $start );
			if ( $timeout <= 0 ) {
				break; // timeout reached
			}
		}
	} finally {
		// This is used for throttling, not lag-protection, so restore the old position
		$this->waitForPos = $previousPos;
	}

	return $ok;
}
704
/**
 * Check whether a server has a positive load in at least one query group.
 *
 * @param int $i Server index
 * @return bool
 */
private function serverHasLoadInAnyGroup( $i ) {
	foreach ( $this->groupLoads as $loadsByIndex ) {
		// A server that is absent from a group counts as having zero load there
		$load = $loadsByIndex[$i] ?? 0;
		if ( $load > 0 ) {
			return true;
		}
	}

	return false;
}
718
/**
 * Adopt the given position as "waitForPos" unless the current one is further along.
 *
 * @param DBMasterPos|bool $pos Candidate position; falsy values are ignored
 */
private function setWaitForPositionIfHigher( $pos ) {
	// Take $pos if nothing is set yet, or if $pos has reached the current position
	if ( $pos && ( !$this->waitForPos || $pos->hasReached( $this->waitForPos ) ) ) {
		$this->waitForPos = $pos;
	}
}
731
/**
 * Get an already open connection to a server, without opening a new one.
 *
 * @param int $i Server index, DB_MASTER, or DB_REPLICA (meaning any server index)
 * @param int $flags Bit field of CONN_* constants
 * @return IDatabase|bool An open handle, or false if none qualifies
 * @throws DBUnexpectedError Via enforceConnectionFlags() when auto-commit is requested
 *  but the handle found has a transaction
 */
public function getAnyOpenConnection( $i, $flags = 0 ) {
	$i = ( $i === self::DB_MASTER ) ? $this->getWriterIndex() : $i;
	// Connection handles required to be in auto-commit mode use a separate connection
	// pool since the main pool is affected by implicit and explicit transaction rounds
	$autocommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );

	$conn = false;
	foreach ( $this->conns as $connsByServer ) {
		// Get the connection array server indexes to inspect
		if ( $i === self::DB_REPLICA ) {
			$indexes = array_keys( $connsByServer );
		} else {
			$indexes = isset( $connsByServer[$i] ) ? [ $i ] : [];
		}

		foreach ( $indexes as $index ) {
			$conn = $this->pickAnyOpenConnection( $connsByServer[$index], $autocommit );
			if ( $conn ) {
				// Leave both loops; scanning the remaining pools could otherwise
				// overwrite the handle that was just found with false
				break 2;
			}
		}
	}

	if ( $conn ) {
		$this->enforceConnectionFlags( $conn, $flags );
	}

	return $conn;
}
761
/**
 * Pick a usable handle from a list of candidates.
 *
 * All candidates are inspected; if several qualify, the last one wins.
 *
 * @param IDatabase[] $candidateConns Handles to choose from
 * @param bool $autocommit Whether the handle must be an auto-commit-only one that is
 *  not inside a transaction
 * @return IDatabase|bool A qualifying handle, or false
 */
private function pickAnyOpenConnection( $candidateConns, $autocommit ) {
	$picked = false;

	foreach ( $candidateConns as $candidate ) {
		if ( !$candidate->isOpen() ) {
			// Some sort of error occurred?
			continue;
		}

		if ( $autocommit ) {
			if ( !$candidate->getLBInfo( 'autoCommitOnly' ) ) {
				// This handle is transaction round aware
				continue;
			}
			if ( $candidate->trxLevel() ) {
				// Some sort of error left a transaction open?
				continue;
			}
		}

		$picked = $candidate;
	}

	return $picked;
}
790
/**
 * Wait for a server to reach the "waitForPos" replication position.
 *
 * @param int $index Server index
 * @param bool $open Whether a connection may be opened if none is open already
 * @param int|null $timeout Maximum seconds to wait (at least 1); falsy for the default
 * @return bool Whether the position is known to be reached; false on timeout, on
 *  error, or when no connection is available
 */
protected function doWait( $index, $open = false, $timeout = null ) {
	$timeout = max( 1, intval( $timeout ?: $this->waitTimeout ) );

	// Check if we already know that the DB has reached this point
	$server = $this->getServerName( $index );
	$key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $server, 'v1' );
	$knownReachedPos = $this->srvCache->get( $key );
	if (
		$knownReachedPos instanceof DBMasterPos &&
		$knownReachedPos->hasReached( $this->waitForPos )
	) {
		// The position goes into the context array, since the message itself is
		// single-quoted and would not interpolate a variable
		$this->replLogger->debug(
			__METHOD__ .
			': replica DB {dbserver} known to be caught up (pos >= {pos}).',
			[ 'dbserver' => $server, 'pos' => $knownReachedPos ]
		);
		return true;
	}

	// Find a connection to wait on, creating one if needed and allowed
	$close = false; // close the connection afterwards
	$flags = self::CONN_SILENCE_ERRORS;
	$conn = $this->getAnyOpenConnection( $index, $flags );
	if ( !$conn ) {
		if ( !$open ) {
			$this->replLogger->debug(
				__METHOD__ . ': no connection open for {dbserver}',
				[ 'dbserver' => $server ]
			);

			return false;
		}
		// Get a connection to this server without triggering other server connections
		$conn = $this->getServerConnection( $index, self::DOMAIN_ANY, $flags );
		if ( !$conn ) {
			$this->replLogger->warning(
				__METHOD__ . ': failed to connect to {dbserver}',
				[ 'dbserver' => $server ]
			);

			return false;
		}
		// Avoid connection spam in waitForAll() when connections
		// are made just for the sake of doing this lag check.
		$close = true;
	}

	$this->replLogger->info(
		__METHOD__ .
		': waiting for replica DB {dbserver} to catch up...',
		[ 'dbserver' => $server ]
	);

	$result = $conn->masterPosWait( $this->waitForPos, $timeout );

	if ( $result === null ) {
		$this->replLogger->warning(
			__METHOD__ . ': Errored out waiting on {host} pos {pos}',
			[
				'host' => $server,
				'pos' => $this->waitForPos,
				'trace' => ( new RuntimeException() )->getTraceAsString()
			]
		);
		$ok = false;
	} elseif ( $result == -1 ) {
		$this->replLogger->warning(
			__METHOD__ . ': Timed out waiting on {host} pos {pos}',
			[
				'host' => $server,
				'pos' => $this->waitForPos,
				'trace' => ( new RuntimeException() )->getTraceAsString()
			]
		);
		$ok = false;
	} else {
		$this->replLogger->debug( __METHOD__ . ": done waiting" );
		$ok = true;
		// Remember that the DB reached this point
		$this->srvCache->set( $key, $this->waitForPos, BagOStuff::TTL_DAY );
	}

	if ( $close ) {
		$this->closeConnection( $conn );
	}

	return $ok;
}
887
/**
 * Get a connection handle for a server index or DB_MASTER/DB_REPLICA.
 *
 * @param int $i Server index or DB_MASTER/DB_REPLICA
 * @param string[]|string|bool $groups Query group(s); [] or false for none
 * @param string|bool $domain Domain ID, or false for the local domain
 * @param int $flags Bit field of CONN_* constants
 * @return IDatabase|bool Handle; false only if the connection failed and
 *  CONN_SILENCE_ERRORS was given
 */
public function getConnection( $i, $groups = [], $domain = false, $flags = 0 ) {
	$domain = $this->resolveDomainID( $domain );
	$groups = $this->resolveGroups( $groups, $i );
	$flags = $this->sanitizeConnectionFlags( $flags, $i, $domain );
	// If given DB_MASTER/DB_REPLICA, resolve it to a specific server index. Resolving
	// DB_REPLICA might trigger getServerConnection() calls due to the getReaderIndex()
	// connectivity checks or LoadMonitor::scaleLoads() server state cache regeneration.
	// The use of getServerConnection() instead of getConnection() avoids infinite loops.
	$serverIndex = $this->getConnectionIndex( $i, $groups, $domain );
	// Get an open connection to that server (might trigger a new connection)
	$conn = $this->getServerConnection( $serverIndex, $domain, $flags );
	// Set master DB handles as read-only if there is high replication lag.
	// $conn is false when the connection failed silently, so check it first.
	if (
		$conn &&
		$serverIndex === $this->getWriterIndex() &&
		$this->getLaggedReplicaMode( $domain ) &&
		!is_string( $conn->getLBInfo( 'readOnlyReason' ) )
	) {
		$reason = ( $this->getExistingReaderIndex( self::GROUP_GENERIC ) >= 0 )
			? 'The database is read-only until replication lag decreases.'
			: 'The database is read-only until replica database servers becomes reachable.';
		$conn->setLBInfo( 'readOnlyReason', $reason );
	}

	return $conn;
}
913
/**
 * Get a connection handle for a specific server index, opening one if needed.
 *
 * @param int $i Specific server index
 * @param string $domain Resolved domain ID
 * @param int $flags Bit field of CONN_* constants
 * @return IDatabase|bool Handle, or false if no usable connection could be obtained
 */
public function getServerConnection( $i, $domain, $flags = 0 ) {
	// Number of connections made before getting the server index and handle
	$priorConnectionsMade = $this->connectionCounter;
	// Get an open connection to this server (might trigger a new connection)
	$conn = $this->localDomain->equals( $domain )
		? $this->getLocalConnection( $i, $flags )
		: $this->getForeignConnection( $i, $domain, $flags );
	// Throw an error or otherwise bail out if the connection attempt failed
	if ( !( $conn instanceof IDatabase ) ) {
		if ( ( $flags & self::CONN_SILENCE_ERRORS ) != self::CONN_SILENCE_ERRORS ) {
			$this->reportConnectionError();
		}

		return false;
	}

	// Profile any new connections caused by this method
	if ( $this->connectionCounter > $priorConnectionsMade ) {
		$this->trxProfiler->recordConnection(
			$conn->getServer(),
			$conn->getDBname(),
			( ( $flags & self::CONN_INTENT_WRITABLE ) == self::CONN_INTENT_WRITABLE )
		);
	}

	if ( !$conn->isOpen() ) {
		$this->errorConnection = $conn;
		// Connection was made but later unrecoverably lost for some reason.
		// Do not return a handle that will just throw exceptions on use, but
		// let the calling code, e.g. getReaderIndex(), try another server.
		return false;
	}

	// Make sure that flags like CONN_TRX_AUTOCOMMIT are respected by this handle
	$this->enforceConnectionFlags( $conn, $flags );
	// Set master DB handles as read-only if the load balancer is configured as read-only
	// or the master database server is running in server-side read-only mode. Note that
	// replica DB handles are always read-only via Database::assertIsWritableMaster().
	// Read-only mode due to replication lag is *avoided* here to avoid recursion.
	if ( $i === $this->getWriterIndex() ) {
		if ( $this->readOnlyReason !== false ) {
			// The configured reason takes precedence
			$readOnlyReason = $this->readOnlyReason;
		} elseif ( $this->isMasterConnectionReadOnly( $conn, $flags ) ) {
			$readOnlyReason = 'The master database server is running in read-only mode.';
		} else {
			$readOnlyReason = false;
		}
		$conn->setLBInfo( 'readOnlyReason', $readOnlyReason );
	}

	return $conn;
}
973
/**
 * Release one reference to a foreign-domain connection so that it can be reused.
 *
 * Handles without "serverIndex"/"foreignPoolRefCount" info are ignored, as is every
 * call made while the load balancer is disabled.
 *
 * @param IDatabase $conn Live handle (not a DBConnRef wrapper)
 * @throws InvalidArgumentException If the handle is not the in-use one for its
 *  server index and domain
 */
public function reuseConnection( IDatabase $conn ) {
	$serverIndex = $conn->getLBInfo( 'serverIndex' );
	$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
	if ( $serverIndex === null || $refCount === null ) {
		// Non-foreign connection; there is no domain-use tracking to update
		return;
	}

	if ( $conn instanceof DBConnRef ) {
		// DBConnRef already calls reuseConnection() itself and passes only the live
		// Database instance. Any caller passing in a DBConnRef is broken.
		$this->connLogger->error(
			__METHOD__ . ": got DBConnRef instance.\n" .
			( new LogicException() )->getTraceAsString() );

		return;
	}

	if ( $this->disabled ) {
		// The DBConnRef handle probably survived longer than the LoadBalancer
		return;
	}

	$roundless = $conn->getLBInfo( 'autoCommitOnly' );
	$connFreeKey = $roundless ? self::KEY_FOREIGN_FREE_NOROUND : self::KEY_FOREIGN_FREE;
	$connInUseKey = $roundless ? self::KEY_FOREIGN_INUSE_NOROUND : self::KEY_FOREIGN_INUSE;

	$domain = $conn->getDomainID();
	$tracked = $this->conns[$connInUseKey][$serverIndex][$domain] ?? null;
	if ( $tracked === null ) {
		throw new InvalidArgumentException(
			"Connection $serverIndex/$domain not found; it may have already been freed" );
	}
	if ( $tracked !== $conn ) {
		throw new InvalidArgumentException(
			"Connection $serverIndex/$domain mismatched; it may have already been freed" );
	}

	$refCount--;
	$conn->setLBInfo( 'foreignPoolRefCount', $refCount );
	if ( $refCount > 0 ) {
		$this->connLogger->debug( __METHOD__ .
			": reference count for $serverIndex/$domain reduced to $refCount" );

		return;
	}

	// Last reference released: move the handle from the in-use pool to the free pool
	$this->conns[$connFreeKey][$serverIndex][$domain] = $conn;
	unset( $this->conns[$connInUseKey][$serverIndex][$domain] );
	if ( !$this->conns[$connInUseKey][$serverIndex] ) {
		unset( $this->conns[$connInUseKey][$serverIndex] ); // clean up
	}
	$this->connLogger->debug( __METHOD__ . ": freed connection $serverIndex/$domain" );
}
1023
/**
 * Get a connection handle wrapped in a DBConnRef.
 *
 * @param int $i Server index or DB_MASTER/DB_REPLICA
 * @param string[]|string|bool $groups Query group(s); [] or false for none
 * @param string|bool $domain Domain ID, or false for the local domain
 * @param int $flags Bit field of CONN_* constants
 * @return DBConnRef
 */
public function getConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
	$domain = $this->resolveDomainID( $domain );
	$role = $this->getRoleFromIndex( $i );
	$conn = $this->getConnection( $i, $groups, $domain, $flags );

	return new DBConnRef( $this, $conn, $role );
}
1030
/**
 * Get a DBConnRef that is given the connection parameters rather than a handle.
 *
 * @param int $i Server index or DB_MASTER/DB_REPLICA
 * @param string[]|string|bool $groups Query group(s); [] or false for none
 * @param string|bool $domain Domain ID, or false for the local domain
 * @param int $flags Bit field of CONN_* constants
 * @return DBConnRef
 */
public function getLazyConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
	$domain = $this->resolveDomainID( $domain );
	$role = $this->getRoleFromIndex( $i );
	// No connection is requested here; only the arguments are handed over
	$connParams = [ $i, $groups, $domain, $flags ];

	return new DBConnRef( $this, $connParams, $role );
}
1037
/**
 * Get a connection handle wrapped in a MaintainableDBConnRef.
 *
 * @param int $i Server index or DB_MASTER/DB_REPLICA
 * @param string[]|string|bool $groups Query group(s); [] or false for none
 * @param string|bool $domain Domain ID, or false for the local domain
 * @param int $flags Bit field of CONN_* constants
 * @return MaintainableDBConnRef
 */
public function getMaintenanceConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
	$domain = $this->resolveDomainID( $domain );
	$role = $this->getRoleFromIndex( $i );
	$conn = $this->getConnection( $i, $groups, $domain, $flags );

	return new MaintainableDBConnRef( $this, $conn, $role );
}
1045
/**
 * Get the role (master or replica) implied by a requested server index.
 *
 * @param int $i Server index or DB_MASTER/DB_REPLICA
 * @return int DB_MASTER if $i is DB_MASTER or the writer index, DB_REPLICA otherwise
 */
private function getRoleFromIndex( $i ) {
	return ( $i === self::DB_MASTER || $i === $this->getWriterIndex() )
		? self::DB_MASTER
		: self::DB_REPLICA;
}
1055
/**
 * Get a connection with no query groups and with connection errors silenced.
 *
 * @param int $i Server index or DB_MASTER/DB_REPLICA
 * @param string|bool $domain Domain ID, or false for the local domain
 * @param int $flags Bit field of CONN_* constants
 * @return IDatabase|bool Handle, or false on failure
 */
public function openConnection( $i, $domain = false, $flags = 0 ) {
	$silencedFlags = $flags | self::CONN_SILENCE_ERRORS;

	return $this->getConnection( $i, [], $domain, $silencedFlags );
}
1066
/**
 * Get (or open) the connection to a server for the local domain.
 *
 * @param int $i Server index
 * @param int $flags Bit field of CONN_* constants
 * @return IDatabase|bool Handle, or false if a new connection could not be opened
 * @throws UnexpectedValueException If the handle's domain is not compatible with the
 *  local domain
 */
private function getLocalConnection( $i, $flags = 0 ) {
	// Handles that must be in auto-commit mode live in their own pool, because the main
	// pool is affected by implicit and explicit transaction rounds
	$autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
	$connKey = self::KEY_LOCAL;
	if ( $autoCommit ) {
		$connKey = self::KEY_LOCAL_NOROUND;
	}

	if ( isset( $this->conns[$connKey][$i][0] ) ) {
		$conn = $this->conns[$connKey][$i][0];
	} else {
		// Nothing tracked for this server yet, so open a new connection
		$server = $this->getServerInfoStrict( $i );
		$server['serverIndex'] = $i;
		$server['autoCommitOnly'] = $autoCommit;
		$conn = $this->reallyOpenConnection( $server, $this->localDomain );
		$host = $this->getServerName( $i );
		if ( !$conn->isOpen() ) {
			$this->connLogger->warning(
				__METHOD__ . ": failed to connect to database $i at '$host'." );
			// Keep the failed handle around and report failure to the caller
			$this->errorConnection = $conn;
			$conn = false;
		} else {
			$this->connLogger->debug(
				__METHOD__ . ": connected to database $i at '$host'." );
			$this->conns[$connKey][$i][0] = $conn;
		}
	}

	// Final sanity check to make sure the right domain is selected
	$domainMismatch = (
		$conn instanceof IDatabase &&
		!$this->localDomain->isCompatible( $conn->getDomainID() )
	);
	if ( $domainMismatch ) {
		throw new UnexpectedValueException(
			"Got connection to '{$conn->getDomainID()}', " .
			"but expected local domain ('{$this->localDomain}')" );
	}

	return $conn;
}
1120
/**
 * Get (or open and track) a connection handle to a server for a foreign domain.
 *
 * Handles are kept in a "free" pool and an "in use" pool, each keyed by server index
 * and domain ID. Lookup order: an in-use handle for the domain, a free handle for the
 * domain, a free handle of another domain that can be switched over, and finally a
 * brand new connection. Every successful return increments the handle's
 * 'foreignPoolRefCount' LB info entry.
 *
 * On connection failure the failed handle is remembered in $this->errorConnection
 * and false is returned.
 *
 * @param int $i Server index
 * @param string $domain Domain ID to open
 * @param int $flags Bitfield of CONN_* flags; only CONN_TRX_AUTOCOMMIT is inspected here
 * @return Database|bool Live handle, or false on failure
 * @throws UnexpectedValueException If the handle is not on a compatible domain
 */
private function getForeignConnection( $i, $domain, $flags = 0 ) {
	$domainInstance = DatabaseDomain::newFromId( $domain );
	// Connection handles required to be in auto-commit mode use a separate connection
	// pool since the main pool is affected by implicit and explicit transaction rounds
	$autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );

	if ( $autoCommit ) {
		$connFreeKey = self::KEY_FOREIGN_FREE_NOROUND;
		$connInUseKey = self::KEY_FOREIGN_INUSE_NOROUND;
	} else {
		$connFreeKey = self::KEY_FOREIGN_FREE;
		$connInUseKey = self::KEY_FOREIGN_INUSE;
	}

	/** @var Database|bool|null $conn */
	$conn = null;

	if ( isset( $this->conns[$connInUseKey][$i][$domain] ) ) {
		// Reuse an in-use connection for the same domain
		$conn = $this->conns[$connInUseKey][$i][$domain];
		$this->connLogger->debug( __METHOD__ . ": reusing connection $i/$domain" );
	} elseif ( isset( $this->conns[$connFreeKey][$i][$domain] ) ) {
		// Reuse a free connection for the same domain
		$conn = $this->conns[$connFreeKey][$i][$domain];
		unset( $this->conns[$connFreeKey][$i][$domain] );
		$this->conns[$connInUseKey][$i][$domain] = $conn;
		$this->connLogger->debug( __METHOD__ . ": reusing free connection $i/$domain" );
	} elseif ( !empty( $this->conns[$connFreeKey][$i] ) ) {
		// Reuse a free connection from another domain if possible.
		// A dedicated loop variable is used so that $conn stays null when every
		// candidate is skipped; otherwise the last skipped handle would leak out of
		// the loop, suppress the "open a new connection" step below, and trip the
		// final domain sanity check.
		foreach ( $this->conns[$connFreeKey][$i] as $oldDomain => $candidate ) {
			if ( $domainInstance->getDatabase() !== null ) {
				// Check if changing the database will require a new connection.
				// In that case, leave the connection handle alone and keep looking.
				// This prevents connections from being closed mid-transaction and can
				// also avoid overhead if the same database will later be requested.
				if (
					$candidate->databasesAreIndependent() &&
					$candidate->getDBname() !== $domainInstance->getDatabase()
				) {
					continue;
				}
				// Select the new database, schema, and prefix
				$candidate->selectDomain( $domainInstance );
			} else {
				// Stay on the current database, but update the schema/prefix
				$candidate->dbSchema( $domainInstance->getSchema() );
				$candidate->tablePrefix( $domainInstance->getTablePrefix() );
			}
			$conn = $candidate;
			unset( $this->conns[$connFreeKey][$i][$oldDomain] );
			// Note that if $domain is an empty string, getDomainID() might not match it
			$this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
			$this->connLogger->debug( __METHOD__ .
				": reusing free connection from $oldDomain for $domain" );
			break;
		}
	}

	if ( !$conn ) {
		// Open a new connection
		$server = $this->getServerInfoStrict( $i );
		$server['serverIndex'] = $i;
		$server['foreignPoolRefCount'] = 0;
		$server['foreign'] = true;
		$server['autoCommitOnly'] = $autoCommit;
		$conn = $this->reallyOpenConnection( $server, $domainInstance );
		if ( !$conn->isOpen() ) {
			$this->connLogger->warning( __METHOD__ . ": connection error for $i/$domain" );
			$this->errorConnection = $conn;
			$conn = false;
		} else {
			// Note that if $domain is an empty string, getDomainID() might not match it
			$this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
			$this->connLogger->debug( __METHOD__ . ": opened new connection for $i/$domain" );
		}
	}

	if ( $conn instanceof IDatabase ) {
		// Final sanity check to make sure the right domain is selected
		if ( !$domainInstance->isCompatible( $conn->getDomainID() ) ) {
			throw new UnexpectedValueException(
				"Got connection to '{$conn->getDomainID()}', but expected '$domain'" );
		}
		// Increment reference count
		$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
		$conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 );
	}

	return $conn;
}
1234
/**
 * Get the database attributes for a server, derived from its configured type and
 * (optional) driver.
 *
 * @param int $i Server index
 * @return array Result of Database::attributesFromType()
 */
public function getServerAttributes( $i ) {
	return Database::attributesFromType(
		$this->getServerType( $i ),
		$this->servers[$i]['driver'] ?? null
	);
}
1241
/**
 * Check whether any open connection exists for a server index.
 *
 * @param int $index Server index
 * @return bool
 */
private function isOpen( $index ) {
	$anyConn = $this->getAnyOpenConnection( $index );

	return $anyConn ? true : false;
}
1251
/**
 * Create a Database handle for a server and attempt to connect it.
 *
 * The server configuration array is augmented with the domain components, the
 * shared loggers/profilers/cache, and ownership info before being handed to
 * Database::factory(). Connection errors from initConnection() are swallowed here;
 * callers are expected to check isOpen() on the returned handle.
 *
 * @param array $server Server configuration; must contain 'type' and 'serverIndex'
 * @param DatabaseDomain $domain Domain the connection is for
 * @return Database Handle, which may or may not be open
 * @throws DBAccessError If this LoadBalancer has been disabled
 */
protected function reallyOpenConnection( array $server, DatabaseDomain $domain ) {
	if ( $this->disabled ) {
		throw new DBAccessError();
	}

	$dbName = $domain->getDatabase();
	if ( $dbName !== null ) {
		$server['dbname'] = $dbName;
	} elseif ( $server['type'] === 'mysql' ) {
		// The database domain does not specify a DB name and some database systems
		// require a valid DB specified on connection; the $server configuration array
		// contains a default DB name to use in such cases. For MySQL, however, DATABASE
		// and SCHEMA are synonyms, connections need not specify a DB, and the DB name in
		// $server might not exist due to legacy reasons (the default domain used to
		// ignore the local LB domain, even when mismatched).
		$server['dbname'] = null;
	}

	$schema = $domain->getSchema();
	if ( $schema !== null ) {
		$server['schema'] = $schema;
	}

	// It is always possible to connect with any prefix, even the empty string
	$server['tablePrefix'] = $domain->getTablePrefix();

	// Let the handle know what the cluster master is (e.g. "db1052")
	$server['clusterMasterHost'] = $this->getServerName( $this->getWriterIndex() );

	$shared = [
		'srvCache' => $this->srvCache,
		// Loggers and profilers
		'connLogger' => $this->connLogger,
		'queryLogger' => $this->queryLogger,
		'errorLogger' => $this->errorLogger,
		'deprecationLogger' => $this->deprecationLogger,
		'profiler' => $this->profiler,
		'trxProfiler' => $this->trxProfiler,
		// Same agent and PHP mode for all DB handles
		'cliMode' => $this->cliMode,
		'agent' => $this->agent,
	];
	foreach ( $shared as $option => $value ) {
		$server[$option] = $value;
	}
	// Use DBO_DEFAULT flags by default for LoadBalancer managed databases. Assume that the
	// application calls LoadBalancer::commitMasterChanges() before the PHP script completes.
	if ( !isset( $server['flags'] ) ) {
		$server['flags'] = IDatabase::DBO_DEFAULT;
	}
	$server['ownerId'] = $this->id;

	// Create the (still unconnected) handle and configure it
	$conn = Database::factory( $server['type'], $server, Database::NEW_UNCONNECTED );
	$conn->setLBInfo( $server );
	$lazyMaster = $this->getLazyConnectionRef( self::DB_MASTER, [], $conn->getDomainID() );
	$conn->setLazyMasterHandle( $lazyMaster );
	$conn->setTableAliases( $this->tableAliases );
	$conn->setIndexAliases( $this->indexAliases );

	try {
		$conn->initConnection();
	} catch ( DBConnectionError $e ) {
		// ignore; let the DB handle the logging
	}

	$isWriter = ( $server['serverIndex'] === $this->getWriterIndex() );
	if ( $isWriter ) {
		if ( $this->trxRoundId !== false ) {
			$this->applyTransactionRoundFlags( $conn );
		}
		foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
			$conn->setTransactionListener( $name, $callback );
		}
	}

	$this->lazyLoadReplicationPositions(); // session consistency

	// Log when many connections are held at once
	$held = $this->getCurrentConnectionCount();
	if ( $held >= self::CONN_HELD_WARN_THRESHOLD ) {
		$logContext = [
			'connections' => $held,
			'dbserver' => $conn->getServer(),
			'masterdb' => $conn->getLBInfo( 'clusterMasterHost' )
		];
		$this->perfLogger->warning(
			__METHOD__ . ": {connections}+ connections made (master={masterdb})",
			$logContext
		);
	}

	return $conn;
}
1351
/**
 * Run the chronology callback, at most once, the first time a connection is attempted.
 *
 * Does nothing if no callback is set or if it already ran.
 */
private function lazyLoadReplicationPositions() {
	if ( $this->connectionAttempted || !$this->chronologyCallback ) {
		return;
	}

	$this->connectionAttempted = true;
	$callback = $this->chronologyCallback;
	$callback( $this ); // generally calls waitFor()
	$this->connLogger->debug( __METHOD__ . ': executed chronology callback.' );
}
1362
/**
 * Log the most recent connection failure and raise it as an exception.
 *
 * This method never returns normally.
 *
 * @throws DBConnectionError Always
 */
private function reportConnectionError() {
	$failedConn = $this->errorConnection; // the connection which caused the error
	$context = [
		'method' => __METHOD__,
		'last_error' => $this->lastError,
	];

	if ( !( $failedConn instanceof IDatabase ) ) {
		// No last connection, probably due to all servers being too busy
		$this->connLogger->error(
			__METHOD__ .
			": LB failure with no last connection. Connection error: {last_error}",
			$context
		);

		// If all servers were busy, "lastError" will contain something sensible
		throw new DBConnectionError( null, $this->lastError );
	}

	$context['db_server'] = $failedConn->getServer();
	$this->connLogger->warning(
		__METHOD__ . ": connection error: {last_error} ({db_server})",
		$context
	);

	throw new DBConnectionError( $failedConn, "{$this->lastError} ({$context['db_server']})" );
}
1393
/**
 * Get the index of the server treated as the writer (master).
 *
 * @return int Always 0 in this implementation
 */
public function getWriterIndex() {
	return 0;
}
1397
/**
 * Check whether a server index is present in the server configuration.
 *
 * @param int $i Server index
 * @return bool
 */
public function haveIndex( $i ) {
	$known = array_key_exists( $i, $this->servers );

	return $known;
}
1408
/**
 * Check whether a configured server has a positive load in the generic group.
 *
 * @param int $i Server index
 * @return bool False if the index is not configured or its generic load is not above zero
 */
public function isNonZeroLoad( $i ) {
	if ( !isset( $this->servers[$i] ) ) {
		return false;
	}

	return $this->groupLoads[self::GROUP_GENERIC][$i] > 0;
}
1419
/**
 * Get the number of configured servers.
 *
 * @return int
 */
public function getServerCount() {
	$total = count( $this->servers );

	return $total;
}
1423
/**
 * Check whether more than one server is configured.
 *
 * @return bool
 */
public function hasReplicaServers() {
	$total = $this->getServerCount();

	return $total > 1;
}
1427
/**
 * Check whether any non-writer server lacks the 'is static' configuration flag.
 *
 * @return bool
 */
public function hasStreamingReplicaServers() {
	$writerIndex = $this->getWriterIndex();
	foreach ( $this->servers as $index => $info ) {
		if ( $index === $writerIndex ) {
			continue;
		}
		if ( empty( $info['is static'] ) ) {
			return true;
		}
	}

	return false;
}
1437
/**
 * Get a display name for a server: 'hostName' if configured, else 'host',
 * else the literal 'localhost'.
 *
 * @param int $i Server index
 * @return string
 */
public function getServerName( $i ) {
	$info = $this->servers[$i] ?? [];
	$name = $info['hostName'] ?? ( $info['host'] ?? '' );

	if ( $name != '' ) {
		return $name;
	}

	return 'localhost';
}
1443
/**
 * Get the configuration array of a server.
 *
 * @param int $i Server index
 * @return array|bool The server entry, or false if there is none
 */
public function getServerInfo( $i ) {
	if ( isset( $this->servers[$i] ) ) {
		return $this->servers[$i];
	}

	return false;
}
1447
/**
 * Get the configured 'type' of a server.
 *
 * @param int $i Server index
 * @return string The type, or 'unknown' if none is configured
 */
public function getServerType( $i ) {
	if ( isset( $this->servers[$i]['type'] ) ) {
		return $this->servers[$i]['type'];
	}

	return 'unknown';
}
1451
/**
 * Get the current replication position of the writer (master) server.
 *
 * An already-open master connection is used if there is one. Otherwise a connection
 * is obtained just for this call and closed again afterwards.
 *
 * @return mixed Whatever getMasterPos() on the connection returns
 * @throws DBConnectionError If no master connection could be obtained
 */
public function getMasterPos() {
	$index = $this->getWriterIndex();

	$conn = $this->getAnyOpenConnection( $index );
	if ( $conn ) {
		return $conn->getMasterPos();
	}

	// CONN_SILENCE_ERRORS is a connection flag, so it must go in the $flags slot
	// (fourth argument), not in the $groups slot; otherwise errors are not silenced
	// and the failure branch below cannot be reached as intended.
	$conn = $this->getConnection( $index, [], false, self::CONN_SILENCE_ERRORS );
	if ( !$conn ) {
		$this->reportConnectionError();
		return null; // unreachable due to exception
	}

	try {
		$pos = $conn->getMasterPos();
	} finally {
		// The connection was obtained only for this lookup
		$this->closeConnection( $conn );
	}

	return $pos;
}
1474
/**
 * Get the position from which replicas could resume, using only open connections.
 *
 * If a master connection is open, its position is returned. Otherwise the
 * positions of open, non-static replica connections are compared and the one
 * that has reached all others is returned.
 *
 * @return mixed Position object, or false if none could be determined
 */
public function getReplicaResumePos() {
	// Prefer the position of any existing master server connection
	$masterConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
	if ( $masterConn ) {
		return $masterConn->getMasterPos();
	}

	// Otherwise take the highest position of any existing replica server connection
	$best = false;
	$total = $this->getServerCount();
	for ( $i = 1; $i < $total; $i++ ) {
		if ( !empty( $this->servers[$i]['is static'] ) ) {
			continue; // server does not use replication
		}

		$replicaConn = $this->getAnyOpenConnection( $i );
		$pos = $replicaConn ? $replicaConn->getReplicaPos() : false;
		if ( !$pos ) {
			continue; // no open connection or could not get position
		}

		if ( !$best ) {
			$best = $pos;
		}
		if ( $pos->hasReached( $best ) ) {
			$best = $pos;
		}
	}

	return $best;
}
1504
/**
 * Close all connections and mark this LoadBalancer as disabled.
 *
 * @param string $fname Caller name
 * @param int|null $owner ID of the calling instance, checked by assertOwnership()
 */
public function disable( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );
	// Tear down existing handles first, then flag the instance as disabled
	$this->closeAll( $fname, $owner );
	$this->disabled = true;
}
1510
/**
 * Close every tracked connection and reset the connection pools.
 *
 * @param string $fname Caller name
 * @param int|null $owner ID of the calling instance, checked by assertOwnership()
 */
public function closeAll( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );
	if ( $this->ownerId === null ) {
		// Never read; the scoped object is only held until this method returns
		$scope = ScopedCallback::newScopedIgnoreUserAbort();
	}

	$closeHandle = function ( IDatabase $conn ) use ( $fname ) {
		$host = $conn->getServer();
		$this->connLogger->debug( "$fname: closing connection to database '$host'." );
		$conn->close( $fname, $this->id );
	};
	$this->forEachOpenConnection( $closeHandle );

	$this->conns = self::newTrackedConnectionsArray();
}
1525
/**
 * Close a single connection and stop tracking it in the connection pools.
 *
 * @param IDatabase $conn Raw handle (not a DBConnRef wrapper)
 * @throws RuntimeException If given a DBConnRef
 */
public function closeConnection( IDatabase $conn ) {
	if ( $conn instanceof DBConnRef ) {
		// Avoid calling close() but still leaving the handle in the pool
		throw new RuntimeException( 'Cannot close DBConnRef instance; it must be shareable' );
	}

	$serverIndex = $conn->getLBInfo( 'serverIndex' );
	foreach ( $this->conns as $type => $connsByServer ) {
		if ( !isset( $connsByServer[$serverIndex] ) ) {
			continue;
		}

		// The keys at this level are pool slots (0 for local handles, domain IDs for
		// foreign ones), not server indexes, so the server index is used for the
		// host lookup and the log message.
		foreach ( $connsByServer[$serverIndex] as $slot => $trackedConn ) {
			if ( $conn === $trackedConn ) {
				$host = $this->getServerName( $serverIndex );
				$this->connLogger->debug(
					__METHOD__ . ": closing connection to database $serverIndex at '$host'." );
				unset( $this->conns[$type][$serverIndex][$slot] );
				break 2;
			}
		}
	}

	$conn->close( __METHOD__ );
}
1551
/**
 * Commit master changes, then flush snapshots on master and replica connections.
 *
 * @param string $fname Caller name
 * @param int|null $owner ID of the calling instance, passed through to each step
 */
public function commitAll( $fname = __METHOD__, $owner = null ) {
	// Writes first ...
	$this->commitMasterChanges( $fname, $owner );
	// ... then drop snapshots on both sides
	$this->flushMasterSnapshots( $fname, $owner );
	$this->flushReplicaSnapshots( $fname, $owner );
}
1557
/**
 * Run pre-commit callbacks on all open master connections and suppress
 * transaction-end callbacks until the commit/rollback step is over.
 *
 * The round stage is set to ROUND_ERROR while this runs and to ROUND_FINALIZED
 * on success.
 *
 * @param string $fname Caller name
 * @param int|null $owner ID of the calling instance, checked by assertOwnership()
 * @return int Total number of callback execution attempts
 */
public function finalizeMasterChanges( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );
	$this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] );
	if ( $this->ownerId === null ) {
		// Never read; the scoped object is only held until this method returns
		$scope = ScopedCallback::newScopedIgnoreUserAbort();
	}

	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise

	$attempts = 0; // callback execution attempts in the current pass
	$runPreCommit = function ( Database $conn ) use ( &$attempts ) {
		// Run any pre-commit callbacks while leaving the post-commit ones suppressed.
		// Any error should cause all (peer) transactions to be rolled back together.
		$attempts += $conn->runOnTransactionPreCommitCallbacks();
	};

	// Loop until callbacks stop adding callbacks on other connections
	$total = 0;
	do {
		$attempts = 0;
		$this->forEachOpenMasterConnection( $runPreCommit );
		$total += $attempts;
	} while ( $attempts > 0 );

	// Defer post-commit callbacks until after COMMIT/ROLLBACK happens on all handles
	$suppress = function ( Database $conn ) {
		$conn->setTrxEndCallbackSuppression( true );
	};
	$this->forEachOpenMasterConnection( $suppress );
	$this->trxRoundStage = self::ROUND_FINALIZED;

	return $total;
}
1586
/**
 * Perform pre-commit checks on all open master connections.
 *
 * The round stage is set to ROUND_ERROR while this runs and to ROUND_APPROVED
 * on success.
 *
 * @param array $options Supports 'maxWriteDuration' (limit on write time; 0 or unset disables it)
 * @param string $fname Caller name
 * @param int|null $owner ID of the calling instance, checked by assertOwnership()
 * @throws DBTransactionSizeError If the estimated write duration exceeds the limit
 * @throws DBTransactionError If a connection with pending work fails to ping
 */
public function approveMasterChanges( array $options, $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );
	$this->assertTransactionRoundStage( self::ROUND_FINALIZED );
	if ( $this->ownerId === null ) {
		// Never read; the scoped object is only held until this method returns
		$scope = ScopedCallback::newScopedIgnoreUserAbort();
	}

	$limit = $options['maxWriteDuration'] ?? 0;

	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise

	$check = function ( IDatabase $conn ) use ( $limit ) {
		// If atomic sections or explicit transactions are still open, some caller must have
		// caught an exception but failed to properly rollback any changes. Detect that and
		// throw an error (causing rollback).
		$conn->assertNoOpenTransactions();

		// Assert that the time to replicate the transaction will be sane.
		// If this fails, then all DB transactions will be rolled back together.
		$time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
		if ( $limit > 0 && $time > $limit ) {
			throw new DBTransactionSizeError(
				$conn,
				"Transaction spent $time second(s) in writes, exceeding the limit of $limit",
				[ $time, $limit ]
			);
		}

		// If a connection sits idle while slow queries execute on another, that connection
		// may end up dropped before the commit round is reached. Ping servers to detect this.
		if ( $conn->writesOrCallbacksPending() && !$conn->ping() ) {
			throw new DBTransactionError(
				$conn,
				"A connection to the {$conn->getDBname()} database was lost before commit"
			);
		}
	};
	$this->forEachOpenMasterConnection( $check );

	$this->trxRoundStage = self::ROUND_APPROVED;
}
1624
/**
 * Start an explicit transaction round named after the caller.
 *
 * @param string $fname Caller name; becomes the round ID
 * @param int|null $owner ID of the calling instance, checked by assertOwnership()
 * @throws DBTransactionError If a round is already active
 */
public function beginMasterChanges( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );
	$roundActive = ( $this->trxRoundId !== false );
	if ( $roundActive ) {
		throw new DBTransactionError(
			null,
			"$fname: Transaction round '{$this->trxRoundId}' already started"
		);
	}
	$this->assertTransactionRoundStage( self::ROUND_CURSORY );
	if ( $this->ownerId === null ) {
		// Never read; the scoped object is only held until this method returns
		$scope = ScopedCallback::newScopedIgnoreUserAbort();
	}

	// Clear any empty transactions (no writes/callbacks) from the implicit round
	$this->flushMasterSnapshots( $fname, $owner );

	$this->trxRoundId = $fname;
	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise

	// Mark applicable handles as participating in this explicit transaction round.
	// For each of these handles, any writes and callbacks will be tied to a single
	// transaction. The (peer) handles will reject begin()/commit() calls unless they
	// are part of an en masse commit or an en masse rollback.
	$markHandle = function ( Database $conn ) {
		$this->applyTransactionRoundFlags( $conn );
	};
	$this->forEachOpenMasterConnection( $markHandle );

	$this->trxRoundStage = self::ROUND_CURSORY;
}
1653
/**
 * Commit the transactions on all open master connections.
 *
 * Commit errors are logged and collected per connection so that every handle gets a
 * commit attempt; a single exception is thrown afterwards if any of them failed.
 * The round stage is set to ROUND_ERROR while this runs and to
 * ROUND_COMMIT_CALLBACKS on success.
 *
 * @param string $fname Caller name
 * @param int|null $owner ID of the calling instance, checked by assertOwnership()
 * @throws DBTransactionError If the commit failed on any connection
 */
public function commitMasterChanges( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );
	$this->assertTransactionRoundStage( self::ROUND_APPROVED );
	if ( $this->ownerId === null ) {
		// Never read; the scoped object is only held until this method returns
		$scope = ScopedCallback::newScopedIgnoreUserAbort();
	}

	$failures = [];

	$restore = ( $this->trxRoundId !== false );
	$this->trxRoundId = false;
	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
	// Commit any writes and clear any snapshots as well (callbacks require AUTOCOMMIT).
	// Note that callbacks should already be suppressed due to finalizeMasterChanges().
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( $fname, &$failures ) {
			try {
				$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
			} catch ( DBError $e ) {
				( $this->errorLogger )( $e );
				$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
			}
		}
	);
	if ( $failures ) {
		throw new DBTransactionError(
			null,
			"$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
		);
	}
	if ( $restore ) {
		// Unmark handles as participating in this explicit transaction round
		$this->forEachOpenMasterConnection( function ( Database $conn ) {
			$this->undoTransactionRoundFlags( $conn );
		} );
	}
	$this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
}
1693
/**
 * Run the pending post-commit/post-rollback callbacks on all open master connections.
 *
 * Must be called while the round is in the commit-callbacks or rollback-callbacks
 * stage; the stage is set to ROUND_ERROR while this runs and restored afterwards.
 * Exceptions from callbacks and from the follow-up commits are caught; only the
 * first one is kept and returned.
 *
 * @param string $fname Caller name (used for the ownership check only)
 * @param int|null $owner ID of the calling instance, checked by assertOwnership()
 * @return Exception|null The first exception encountered, or null
 * @throws DBTransactionError If the round is not in a callback stage
 */
public function runMasterTransactionIdleCallbacks( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );
	if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
		$type = IDatabase::TRIGGER_COMMIT;
	} elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
		$type = IDatabase::TRIGGER_ROLLBACK;
	} else {
		throw new DBTransactionError(
			null,
			"Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
		);
	}
	if ( $this->ownerId === null ) {
		// Never read; the scoped object is only held until this method returns
		$scope = ScopedCallback::newScopedIgnoreUserAbort();
	}

	$oldStage = $this->trxRoundStage;
	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise

	// Now that the COMMIT/ROLLBACK step is over, enable post-commit callback runs
	$this->forEachOpenMasterConnection( function ( Database $conn ) {
		$conn->setTrxEndCallbackSuppression( false );
	} );

	$e = null; // first exception
	// From here on, log messages and commits are attributed to this method
	$fname = __METHOD__;
	// Loop until callbacks stop adding callbacks on other connections
	do {
		// Run any pending callbacks for each connection...
		$count = 0; // callback execution attempts
		$this->forEachOpenMasterConnection(
			function ( Database $conn ) use ( $type, &$e, &$count ) {
				if ( $conn->trxLevel() ) {
					return; // retry in the next iteration, after commit() is called
				}
				try {
					$count += $conn->runOnTransactionIdleCallbacks( $type );
				} catch ( Exception $ex ) {
					$e = $e ?: $ex;
				}
			}
		);
		// Clear out any active transactions left over from callbacks...
		$this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$e, $fname ) {
			if ( $conn->writesPending() ) {
				// A callback from another handle wrote to this one and DBO_TRX is set
				$this->queryLogger->warning( $fname . ": found writes pending." );
				$fnames = implode( ', ', $conn->pendingWriteAndCallbackCallers() );
				$this->queryLogger->warning(
					"$fname: found writes pending ($fnames).",
					[
						'db_server' => $conn->getServer(),
						'db_name' => $conn->getDBname()
					]
				);
			} elseif ( $conn->trxLevel() ) {
				// A callback from another handle read from this one and DBO_TRX is set,
				// which can easily happen if there is only one DB (no replicas)
				$this->queryLogger->debug( "$fname: found empty transaction." );
			}
			try {
				$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
			} catch ( Exception $ex ) {
				$e = $e ?: $ex;
			}
		} );
	} while ( $count > 0 );

	$this->trxRoundStage = $oldStage;

	return $e;
}
1767
/**
 * Run the transaction listener callbacks on all open master connections.
 *
 * Must be called while the round is in the commit-callbacks or rollback-callbacks
 * stage; the stage is set to ROUND_ERROR while this runs and to ROUND_CURSORY
 * afterwards. Exceptions from listeners are caught; only the first is returned.
 *
 * @param string $fname Caller name (used for the ownership check only)
 * @param int|null $owner ID of the calling instance, checked by assertOwnership()
 * @return Exception|null The first exception encountered, or null
 * @throws DBTransactionError If the round is not in a callback stage
 */
public function runMasterTransactionListenerCallbacks( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );
	if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
		$type = IDatabase::TRIGGER_COMMIT;
	} elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
		$type = IDatabase::TRIGGER_ROLLBACK;
	} else {
		throw new DBTransactionError(
			null,
			"Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
		);
	}
	if ( $this->ownerId === null ) {
		// Never read; the scoped object is only held until this method returns
		$scope = ScopedCallback::newScopedIgnoreUserAbort();
	}

	$e = null; // first exception

	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
	$this->forEachOpenMasterConnection( function ( Database $conn ) use ( $type, &$e ) {
		try {
			$conn->runTransactionListenerCallbacks( $type );
		} catch ( Exception $ex ) {
			$e = $e ?: $ex;
		}
	} );
	$this->trxRoundStage = self::ROUND_CURSORY;

	return $e;
}
1799
/**
 * Roll back the transactions on all open master connections.
 *
 * The round stage is set to ROUND_ERROR while this runs and to
 * ROUND_ROLLBACK_CALLBACKS afterwards.
 *
 * @param string $fname Caller name
 * @param int|null $owner ID of the calling instance, checked by assertOwnership()
 */
public function rollbackMasterChanges( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );
	if ( $this->ownerId === null ) {
		// Never read; the scoped object is only held until this method returns
		$scope = ScopedCallback::newScopedIgnoreUserAbort();
	}

	$hadExplicitRound = ( $this->trxRoundId !== false );
	$this->trxRoundId = false;
	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise

	$rollbackHandle = function ( IDatabase $conn ) use ( $fname ) {
		$conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
	};
	$this->forEachOpenMasterConnection( $rollbackHandle );

	if ( $hadExplicitRound ) {
		// Unmark handles as participating in this explicit transaction round
		$unmarkHandle = function ( Database $conn ) {
			$this->undoTransactionRoundFlags( $conn );
		};
		$this->forEachOpenMasterConnection( $unmarkHandle );
	}

	$this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
}
1821
/**
 * Require the current transaction round stage to be one of the given stages.
 *
 * @param string|string[] $stage Acceptable stage or list of stages
 * @throws DBTransactionError If the current stage is not among them
 */
private function assertTransactionRoundStage( $stage ) {
	$stages = (array)$stage;
	if ( in_array( $this->trxRoundStage, $stages, true ) ) {
		return;
	}

	// Build a quoted, slash-separated list for the error message
	$quoted = [];
	foreach ( $stages as $v ) {
		$quoted[] = "'$v'";
	}
	$stageList = implode( '/', $quoted );

	throw new DBTransactionError(
		null,
		"Transaction round stage must be $stageList (not '{$this->trxRoundStage}')"
	);
}
1842
/**
 * Require that the caller is allowed to manage this LoadBalancer.
 *
 * Passes when no owner ID is set, or when $owner equals either the owner ID or
 * this instance's own ID.
 *
 * @param string $fname Caller name, used in the error message
 * @param int|null $owner ID supplied by the caller
 * @throws DBTransactionError On an ownership mismatch
 */
private function assertOwnership( $fname, $owner ) {
	if ( $this->ownerId === null ) {
		return; // unowned; anyone may call
	}
	if ( $owner === $this->ownerId || $owner === $this->id ) {
		return;
	}

	throw new DBTransactionError(
		null,
		"$fname: LoadBalancer is owned by ID '{$this->ownerId}' (got '$owner')."
	);
}
1863
/**
 * Mark a connection as participating in the current transaction round.
 *
 * Connections flagged 'autoCommitOnly' are left untouched.
 *
 * @param Database $conn
 */
private function applyTransactionRoundFlags( Database $conn ) {
	$autoCommitOnly = $conn->getLBInfo( 'autoCommitOnly' );
	if ( $autoCommitOnly ) {
		return; // transaction rounds do not apply to these connections
	}

	$usesDefaults = $conn->getFlag( $conn::DBO_DEFAULT );
	if ( $usesDefaults ) {
		// DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
		// Force DBO_TRX even in CLI mode since a commit round is expected soon.
		$conn->setFlag( $conn::DBO_TRX, $conn::REMEMBER_PRIOR );
	}

	// Only handles in DBO_TRX mode are tagged with the round ID
	if ( $conn->getFlag( $conn::DBO_TRX ) ) {
		$conn->setLBInfo( 'trxRoundId', $this->trxRoundId );
	}
}
1888
/**
 * Undo what applyTransactionRoundFlags() did to a connection.
 *
 * Connections flagged 'autoCommitOnly' are left untouched.
 *
 * @param Database $conn
 */
private function undoTransactionRoundFlags( Database $conn ) {
	$autoCommitOnly = $conn->getLBInfo( 'autoCommitOnly' );
	if ( $autoCommitOnly ) {
		return; // transaction rounds do not apply to these connections
	}

	$inTrxMode = $conn->getFlag( $conn::DBO_TRX );
	if ( $inTrxMode ) {
		$conn->setLBInfo( 'trxRoundId', null ); // remove the round ID
	}

	// Bring back the flags remembered when the round was applied
	if ( $conn->getFlag( $conn::DBO_DEFAULT ) ) {
		$conn->restoreFlags( $conn::RESTORE_PRIOR );
	}
}
1905
/**
 * Call flushSnapshot() on every open replica connection.
 *
 * @param string $fname Caller name
 * @param int|null $owner ID of the calling instance, checked by assertOwnership()
 */
public function flushReplicaSnapshots( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );

	$flush = function ( IDatabase $conn ) use ( $fname ) {
		$conn->flushSnapshot( $fname );
	};
	$this->forEachOpenReplicaConnection( $flush );
}
1912
/**
 * Call flushSnapshot() on every open master connection.
 *
 * @param string $fname Caller name
 * @param int|null $owner ID of the calling instance, checked by assertOwnership()
 */
public function flushMasterSnapshots( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );

	$flush = function ( IDatabase $conn ) use ( $fname ) {
		$conn->flushSnapshot( $fname );
	};
	$this->forEachOpenMasterConnection( $flush );
}
1919
/**
 * Get the current transaction round stage.
 *
 * @return string One of the ROUND_* stage values
 */
public function getTransactionRoundStage() {
	$stage = $this->trxRoundStage;

	return $stage;
}
1927
/**
 * Check whether a connection to the writer (master) server is open.
 *
 * @return bool
 */
public function hasMasterConnection() {
	$writerIndex = $this->getWriterIndex();

	return $this->isOpen( $writerIndex );
}
1931
/**
 * Check whether any open master connection has pending writes or callbacks.
 *
 * @return bool
 */
public function hasMasterChanges() {
	$pending = 0;
	$collect = function ( IDatabase $conn ) use ( &$pending ) {
		// Bitwise OR keeps the accumulator set once any handle reports pending work
		$pending = $pending | $conn->writesOrCallbacksPending();
	};
	$this->forEachOpenMasterConnection( $collect );

	return (bool)$pending;
}
1940
/**
 * Get the largest lastDoneWrites() value across the open master connections.
 *
 * @return mixed The maximum value, or false when there are no open master connections
 */
public function lastMasterChangeTimestamp() {
	$latest = false;
	$track = function ( IDatabase $conn ) use ( &$latest ) {
		$latest = max( $latest, $conn->lastDoneWrites() );
	};
	$this->forEachOpenMasterConnection( $track );

	return $latest;
}
1949
/**
 * Check whether master changes are pending, or were made within the last $age seconds.
 *
 * @param float|null $age Look-back window in seconds; defaults to the wait timeout
 * @return bool
 */
public function hasOrMadeRecentMasterChanges( $age = null ) {
	$age = $age ?? $this->waitTimeout;

	if ( $this->hasMasterChanges() ) {
		return true;
	}

	return $this->lastMasterChangeTimestamp() > microtime( true ) - $age;
}
1956
/**
 * Collect the pendingWriteCallers() lists of all open master connections.
 *
 * @return string[] Merged list of caller names
 */
public function pendingMasterChangeCallers() {
	$callers = [];
	$gather = function ( IDatabase $conn ) use ( &$callers ) {
		$callers = array_merge( $callers, $conn->pendingWriteCallers() );
	};
	$this->forEachOpenMasterConnection( $gather );

	return $callers;
}
1965
/**
 * Check whether this LoadBalancer is in "lagged replica" mode.
 *
 * Once the mode is set, it stays set. Otherwise, when streaming replicas are
 * configured, a reader index is picked first, which updates the flag as a side
 * effect.
 *
 * @param string|bool $domain Domain ID, or false for the current domain
 * @return bool
 */
public function getLaggedReplicaMode( $domain = false ) {
	if ( $this->laggedReplicaMode ) {
		return true; // stay in lagged replica mode
	}

	if ( $this->hasStreamingReplicaServers() ) {
		// This will set "laggedReplicaMode" as needed
		$this->getReaderIndex( self::GROUP_GENERIC, $domain );
	}

	return $this->laggedReplicaMode;
}
1978
/**
 * Check whether "lagged replica" mode has been set, without triggering any
 * reader index selection.
 *
 * @return bool
 */
public function laggedReplicaUsed() {
	return $this->laggedReplicaMode;
}
1982
/**
 * Get the reason the database should be treated as read-only, if any.
 *
 * Checked in order: the configured read-only reason, the master server running in
 * read-only mode, and lagged replica mode.
 *
 * @param string|bool $domain Domain ID, or false for the current domain
 * @return string|bool Reason text, or false if not read-only
 */
public function getReadOnlyReason( $domain = false ) {
	$domainInstance = DatabaseDomain::newFromId( $this->resolveDomainID( $domain ) );

	if ( $this->readOnlyReason !== false ) {
		return $this->readOnlyReason;
	}

	if ( $this->isMasterRunningReadOnly( $domainInstance ) ) {
		return 'The master database server is running in read-only mode.';
	}

	if ( $this->getLaggedReplicaMode( $domain ) ) {
		$haveReader = ( $this->getExistingReaderIndex( self::GROUP_GENERIC ) >= 0 );
		if ( $haveReader ) {
			return 'The database is read-only until replication lag decreases.';
		}

		return 'The database is read-only until a replica database server becomes reachable.';
	}

	return false;
}
1998
/**
 * Check whether the server behind a master connection reports itself as read-only,
 * caching the answer in the server-local cache.
 *
 * With CONN_REFRESH_READ_ONLY the server is always queried and the cache entry is
 * overwritten; otherwise the cached value is used when present. A DBError during
 * the query is treated as "not read-only".
 *
 * @param IDatabase $conn Master connection
 * @param int $flags Bitfield of CONN_* flags; only CONN_REFRESH_READ_ONLY is inspected
 * @return bool
 */
private function isMasterConnectionReadOnly( IDatabase $conn, $flags = 0 ) {
	// Note that table prefixes are not related to server-side read-only mode
	$cacheKey = $this->srvCache->makeGlobalKey(
		'rdbms-server-readonly',
		$conn->getServer(),
		$conn->getDBname(),
		$conn->dbSchema()
	);

	// Ask the server itself; errors count as "writable"
	$probe = function () use ( $conn ) {
		try {
			return (int)$conn->serverIsReadOnly();
		} catch ( DBError $e ) {
			return 0;
		}
	};

	$forceRefresh =
		( ( $flags & self::CONN_REFRESH_READ_ONLY ) == self::CONN_REFRESH_READ_ONLY );
	if ( $forceRefresh ) {
		$readOnly = $probe();
		$this->srvCache->set( $cacheKey, $readOnly, BagOStuff::TTL_PROC_SHORT );
	} else {
		$readOnly = $this->srvCache->getWithSetCallback(
			$cacheKey,
			BagOStuff::TTL_PROC_SHORT,
			$probe
		);
	}

	return (bool)$readOnly;
}
2036
/**
 * Check whether the master server for a domain is running in read-only mode,
 * caching the answer in the WAN cache for TTL_CACHE_READONLY seconds.
 *
 * A DBError while connecting or checking is treated as "not read-only".
 *
 * @param DatabaseDomain $domain
 * @return bool
 */
private function isMasterRunningReadOnly( DatabaseDomain $domain ) {
	// Context will often be HTTP GET/HEAD; heavily cache the results
	return (bool)$this->wanCache->getWithSetCallback(
		// Note that table prefixes are not related to server-side read-only mode
		$this->wanCache->makeGlobalKey(
			'rdbms-server-readonly',
			$this->getMasterServerName(),
			$domain->getDatabase(),
			$domain->getSchema()
		),
		self::TTL_CACHE_READONLY,
		function () use ( $domain ) {
			$old = $this->trxProfiler->setSilenced( true );
			try {
				$index = $this->getWriterIndex();
				// Reset the cache for isMasterConnectionReadOnly()
				$flags = self::CONN_REFRESH_READ_ONLY;
				$conn = $this->getServerConnection( $index, $domain->getId(), $flags );
				// Reuse the process cache set above
				$readOnly = (int)$this->isMasterConnectionReadOnly( $conn );
				$this->reuseConnection( $conn );
			} catch ( DBError $e ) {
				$readOnly = 0;
			} finally {
				// Restore the profiler state even if a non-DBError exception escapes,
				// so that it is never left silenced for the rest of the request
				$this->trxProfiler->setSilenced( $old );
			}

			return $readOnly;
		},
		[ 'pcTTL' => WANObjectCache::TTL_PROC_LONG, 'lockTSE' => 10, 'busyValue' => 0 ]
	);
}
2072
/**
 * Get, and optionally set, the "allow lagged" setting.
 *
 * @param bool|null $mode New value, or null to leave the setting unchanged
 * @return bool The value in effect after the call
 */
public function allowLagged( $mode = null ) {
	if ( $mode !== null ) {
		$this->allowLagged = $mode;
	}

	return $this->allowLagged;
}
2081
/**
 * Ping every tracked connection.
 *
 * All connections are pinged even after one of them fails.
 *
 * @return bool True if every ping succeeded (or there were no connections)
 */
public function pingAll() {
	$failed = 0;
	$pingHandle = function ( IDatabase $conn ) use ( &$failed ) {
		if ( !$conn->ping() ) {
			$failed++;
		}
	};
	$this->forEachOpenConnection( $pingHandle );

	return $failed === 0;
}
2092
/**
 * Invoke a callback for every tracked connection, in every pool.
 *
 * @param callable $callback Called as $callback( $conn, ...$params )
 * @param array $params Extra arguments passed after the connection
 */
public function forEachOpenConnection( $callback, array $params = [] ) {
	foreach ( $this->conns as $pool ) {
		foreach ( $pool as $handles ) {
			foreach ( $handles as $handle ) {
				$callback( $handle, ...$params );
			}
		}
	}
}
2102
/**
 * Invoke a callback for every tracked connection to the writer (master) server.
 *
 * @param callable $callback Called as $callback( $conn, ...$params )
 * @param array $params Extra arguments passed after the connection
 */
public function forEachOpenMasterConnection( $callback, array $params = [] ) {
	$writerIndex = $this->getWriterIndex();
	foreach ( $this->conns as $pool ) {
		if ( !isset( $pool[$writerIndex] ) ) {
			continue; // this pool has nothing for the master
		}
		foreach ( $pool[$writerIndex] as $handle ) {
			$callback( $handle, ...$params );
		}
	}
}
2114
/**
 * Call a function with each open replica DB connection object.
 *
 * Every handle whose server index differs from the writer (master) index is visited.
 *
 * @param callable $callback Invoked as $callback( $conn, ...$params )
 * @param array $params Extra arguments appended after the connection handle
 */
public function forEachOpenReplicaConnection( $callback, array $params = [] ) {
	// The writer index does not depend on the loop, so resolve it once up front
	// (same approach as forEachOpenMasterConnection())
	$masterIndex = $this->getWriterIndex();
	foreach ( $this->conns as $connsByServer ) {
		foreach ( $connsByServer as $i => $serverConns ) {
			if ( $i === $masterIndex ) {
				continue; // skip master
			}
			foreach ( $serverConns as $conn ) {
				$callback( $conn, ...$params );
			}
		}
	}
}
2127
/**
 * Count the connection handles currently tracked in $this->conns.
 *
 * @return int Number of handles across all connection categories and server indexes
 */
private function getCurrentConnectionCount() {
	$total = 0;
	foreach ( $this->conns as $handlesByServer ) {
		// Each entry is a list of handles for one server index; add up the list sizes
		$total += array_sum( array_map( 'count', $handlesByServer ) );
	}

	return $total;
}
2141
/**
 * Get the hostname and lag time of the most-lagged replica server.
 *
 * Only servers with a positive weight in the generic group are considered.
 *
 * @param string|bool $domain Passed through to getLagTimes()
 * @return array ( host, max lag, index of max lagged host ); the defaults
 *   ( '', -1, 0 ) are returned when no server qualifies
 */
public function getMaxLag( $domain = false ) {
	// Tuple layout: [ host, lag, server index ]
	$worst = [ '', -1, 0 ];

	if ( !$this->hasReplicaServers() ) {
		return $worst;
	}

	foreach ( $this->getLagTimes( $domain ) as $index => $lag ) {
		if ( $this->groupLoads[self::GROUP_GENERIC][$index] > 0 && $lag > $worst[1] ) {
			$worst = [ $this->getServerInfoStrict( $index, 'host' ), $lag, $index ];
		}
	}

	return $worst;
}
2160
/**
 * Get an estimate of replication lag (in seconds) for each server.
 *
 * @param string|bool $domain Passed through to the load monitor
 * @return array Map of (server index => lag); servers flagged 'is static' are reported as 0
 */
public function getLagTimes( $domain = false ) {
	if ( !$this->hasReplicaServers() ) {
		// Without replication there cannot be any lag
		return [ $this->getWriterIndex() => 0 ];
	}

	$staticLagTimes = []; // map of (server index => 0 seconds)
	$replicatingIndexes = [];
	foreach ( $this->servers as $index => $info ) {
		if ( !empty( $info['is static'] ) ) {
			// Non-replicating, read-only archive server
			$staticLagTimes[$index] = 0;
			continue;
		}
		// This server might have replication lag, so it must be measured
		$replicatingIndexes[] = $index;
	}

	$measuredLagTimes = $this->getLoadMonitor()->getLagTimes( $replicatingIndexes, $domain );

	return $measuredLagTimes + $staticLagTimes;
}
2178
/**
 * Get the lag in seconds for a given connection, short-circuiting to zero for
 * handles that cannot lag.
 *
 * @param IDatabase $conn
 * @return int|bool 0 for a static dataset or the master; otherwise whatever
 *   $conn->getLag() returns
 */
public function safeGetLag( IDatabase $conn ) {
	if (
		// static dataset
		$conn->getLBInfo( 'is static' ) ||
		// this is the master
		$conn->getLBInfo( 'serverIndex' ) == $this->getWriterIndex()
	) {
		return 0;
	}

	return $conn->getLag();
}
2203
/**
 * Wait for a replica DB to reach a specified master position.
 *
 * @param IDatabase $conn Replica DB handle to wait on
 * @param DBMasterPos|bool $pos Master position; if falsy, the current master position
 *   is looked up (opening a master connection if none is open)
 * @param int|null $timeout Seconds to wait; falls back to $this->waitTimeout and is
 *   never less than 1
 * @return bool True if the position was reached or $conn is not a replica DB; false on
 *   timeout or when no usable master position could be determined
 * @throws DBReplicationWaitError If no master connection could be obtained to read the position
 */
public function waitForMasterPos( IDatabase $conn, $pos = false, $timeout = null ) {
	// Nothing to wait for unless this handle is a replica DB in a multi-server setup
	if ( $this->getServerCount() <= 1 || !$conn->getLBInfo( 'replica' ) ) {
		return true;
	}

	$timeout = max( 1, $timeout ?: $this->waitTimeout );

	if ( !$pos ) {
		// Get the current master position, opening a connection if needed
		$masterIndex = $this->getWriterIndex();
		$openConn = $this->getAnyOpenConnection( $masterIndex, self::CONN_SILENCE_ERRORS );
		if ( $openConn ) {
			$pos = $openConn->getMasterPos();
		} else {
			$tempConn = $this->getServerConnection(
				$masterIndex,
				self::DOMAIN_ANY,
				self::CONN_SILENCE_ERRORS
			);
			if ( !$tempConn ) {
				throw new DBReplicationWaitError(
					null,
					"Could not obtain a master database connection to get the position"
				);
			}
			$pos = $tempConn->getMasterPos();
			// This handle was only obtained to read the position
			$this->closeConnection( $tempConn );
		}
	}

	if ( !( $pos instanceof DBMasterPos ) ) {
		// Something is misconfigured
		$this->replLogger->error(
			__METHOD__ . ': could not get master pos for {host}',
			[
				'host' => $conn->getServer(),
				'trace' => ( new RuntimeException() )->getTraceAsString()
			]
		);

		return false;
	}

	$startTime = microtime( true );
	$result = $conn->masterPosWait( $pos, $timeout );
	$seconds = max( microtime( true ) - $startTime, 0 );

	if ( $result === null || $result == -1 ) {
		$this->replLogger->warning(
			__METHOD__ . ': timed out waiting on {host} pos {pos} [{seconds}s]',
			[
				'host' => $conn->getServer(),
				'pos' => $pos,
				'seconds' => round( $seconds, 6 ),
				'trace' => ( new RuntimeException() )->getTraceAsString()
			]
		);

		return false;
	}

	$this->replLogger->debug( __METHOD__ . ': done waiting' );

	return true;
}
2261
/**
 * Wait for a replica DB to reach a specified master position.
 *
 * Thin wrapper that forwards all arguments to waitForMasterPos().
 *
 * @param IDatabase $conn Replica DB handle to wait on
 * @param DBMasterPos|bool $pos Master position, or false to use the current one
 * @param int|null $timeout Seconds to wait, or null for the default
 * @return bool Result of waitForMasterPos()
 */
public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = null ) {
	$reached = $this->waitForMasterPos( $conn, $pos, $timeout );

	return $reached;
}
2277
/**
 * Set (or clear) a named transaction listener callback.
 *
 * The callback is recorded in (or removed from) $this->trxRecurringCallbacks under
 * $name, and IDatabase::setTransactionListener() is then called with the same
 * arguments on every master connection that is currently open.
 *
 * @param string $name Callback name
 * @param callable|null $callback The callback to register, or null to unset it
 */
public function setTransactionListener( $name, callable $callback = null ) {
	if ( $callback ) {
		$this->trxRecurringCallbacks[$name] = $callback;
	} else {
		unset( $this->trxRecurringCallbacks[$name] );
	}

	// Propagate the change to the master handles that are already open
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( $name, $callback ) {
			$conn->setTransactionListener( $name, $callback );
		}
	);
}
2290
/**
 * Make certain table names use their own database, schema, and table prefix.
 *
 * This only stores the map on the load balancer, replacing any earlier one.
 *
 * @param array[] $aliases Map of (table => (dbname, schema, prefix) map)
 */
public function setTableAliases( array $aliases ) {
	$this->tableAliases = $aliases;
}
2294
/**
 * Convert certain index names to alternative names before querying the DB.
 *
 * This only stores the map on the load balancer, replacing any earlier one.
 *
 * @param string[] $aliases Map of (index alias => index)
 */
public function setIndexAliases( array $aliases ) {
	$this->indexAliases = $aliases;
}
2298
/**
 * Set a new table prefix for the existing local domain ID for testing.
 *
 * @param string $prefix New table prefix for the local domain
 * @throws DBUnexpectedError If connections to explicit foreign domains are still in use
 */
public function setLocalDomainPrefix( $prefix ) {
	// Collect the domains of foreign handles that callers have not released yet.
	// A handle stays "in use" (reference count above zero) until reuseConnection()
	// is called on it, either explicitly or via DBConnRef scope.
	$busyDomains = [];
	$this->forEachOpenConnection( function ( IDatabase $conn ) use ( &$busyDomains ) {
		$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
		if ( $refCount > 0 ) {
			$busyDomains[] = $conn->getDomainID();
		}
	} );

	// Refuse to switch while explicit foreign domain connections are still held
	if ( $busyDomains ) {
		$domains = implode( ', ', $busyDomains );
		throw new DBUnexpectedError( null,
			"Foreign domain connections are still in use ($domains)" );
	}

	// Keep the database and schema of the current local domain; swap only the prefix
	$current = $this->localDomain;
	$this->setLocalDomain(
		new DatabaseDomain( $current->getDatabase(), $current->getSchema(), $prefix )
	);

	// Apply the new prefix to all local (non-foreign) connections
	$this->forEachOpenConnection( function ( IDatabase $conn ) use ( $prefix ) {
		if ( $conn->getLBInfo( 'foreign' ) ) {
			return;
		}
		$conn->tablePrefix( $prefix );
	} );
}
2330
/**
 * Close all connections and redefine the local domain for testing or schema creation.
 *
 * @param string $domain Domain ID, parsed with DatabaseDomain::newFromId()
 */
public function redefineLocalDomain( $domain ) {
	// Connections are closed first, before the new domain ID is even parsed
	$this->closeAll( __METHOD__, $this->id );

	$newLocalDomain = DatabaseDomain::newFromId( $domain );
	$this->setLocalDomain( $newLocalDomain );
}
2336
/**
 * Indicate whether the tables on this domain are only temporary tables for testing.
 *
 * @param bool $value Whether to turn the mode on (truthy) or off (falsy) for $domain
 * @param string $domain Key into the (domain => bool) map
 * @return bool The value that was in effect for $domain before this call
 */
public function setTempTablesOnlyMode( $value, $domain ) {
	$previous = $this->tempTablesOnlyMode[$domain] ?? false;

	if ( !$value ) {
		// Absence from the map means "off", so entries are removed rather than set to false
		unset( $this->tempTablesOnlyMode[$domain] );
	} else {
		$this->tempTablesOnlyMode[$domain] = true;
	}

	return $previous;
}
2347
/**
 * Store the local domain and derive its "<db>-<prefix>" style alias.
 *
 * @param DatabaseDomain $domain
 */
private function setLocalDomain( DatabaseDomain $domain ) {
	$this->localDomain = $domain;

	// Callers sometimes assume that the domain ID is simply <db>-<prefix>, which is
	// almost always true. Keep an unescaped alias around so that such callers are
	// handled gracefully when they fail to account for escaping.
	$prefix = $domain->getTablePrefix();
	$this->localDomainIdAlias = ( $prefix != '' )
		? $domain->getDatabase() . '-' . $prefix
		: $domain->getDatabase();
}
2362
/**
 * Get the config array of a server, or a single field of it, failing loudly
 * when either is missing.
 *
 * @param int $i Server index
 * @param string|null $field Field name, or null to get the whole server array
 * @return array|mixed The server array, or the value of $field within it
 * @throws InvalidArgumentException If the server index or the field does not exist
 */
private function getServerInfoStrict( $i, $field = null ) {
	$info = $this->servers[$i] ?? null;
	if ( !is_array( $info ) ) {
		throw new InvalidArgumentException( "No server with index '$i'" );
	}

	if ( $field === null ) {
		return $info;
	}

	// array_key_exists() rather than isset(): a field explicitly set to null still counts
	if ( !array_key_exists( $field, $info ) ) {
		throw new InvalidArgumentException( "No field '$field' in server index '$i'" );
	}

	return $info[$field];
}
2384
/**
 * Get the name of the server at the writer (master) index.
 *
 * @return string Result of getServerName() for the writer index
 */
private function getMasterServerName() {
	$writerIndex = $this->getWriterIndex();

	return $this->getServerName( $writerIndex );
}
2391
/**
 * Disable this load balancer when the object is destroyed.
 *
 * Declared with explicit visibility, like every other method of the class.
 */
public function __destruct() {
	// Avoid connection leaks for sanity
	$this->disable( __METHOD__, $this->ownerId );
}
2396}
2397
// Also expose the namespaced class under the un-namespaced name 'LoadBalancer'
class_alias( LoadBalancer::class, 'LoadBalancer' );
A collection of static methods to play with arrays.
static pickRandom( $weights)
Given an array of non-normalised probabilities, this function will select an element and return the a...
Class representing a cache/ephemeral data store.
Definition BagOStuff.php:63
A BagOStuff object with no objects in it.
Multi-datacenter aware caching interface.
Exception class for attempted DB access.
Helper class used for automatically marking an IDatabase connection as reusable (once it no longer ma...
Definition DBConnRef.php:29
Database error base class.
Definition DBError.php:30
Exception class for replica DB wait errors.
Class to handle database/schema/prefix specifications for IDatabase.
Relational database abstraction object.
Definition Database.php:49
runOnTransactionPreCommitCallbacks()
Actually consume and run any "on transaction pre-commit" callbacks.
setLBInfo( $nameOrArray, $value=null)
Set the entire array or a particular key of the managing load balancer info array.
Definition Database.php:597
runTransactionListenerCallbacks( $trigger)
Actually run any "transaction listener" callbacks.
restoreFlags( $state=self::RESTORE_PRIOR)
Restore the flags to their prior state before the last setFlag/clearFlag call.
Definition Database.php:773
setTrxEndCallbackSuppression( $suppress)
Whether to disable running of post-COMMIT/ROLLBACK callbacks.
getLBInfo( $name=null)
Get properties passed down from the server info array of the load balancer.
Definition Database.php:585
trxLevel()
Gets the current transaction level.
Definition Database.php:531
setFlag( $flag, $remember=self::REMEMBER_NOTHING)
Set a flag for this connection.
Definition Database.php:743
static attributesFromType( $dbType, $driver=null)
Definition Database.php:433
getServer()
Get the server hostname or IP address.
static factory( $type, $params=[], $connect=self::NEW_CONNECTED)
Construct a Database subclass instance given a database type and parameters.
Definition Database.php:370
getFlag( $flag)
Returns a boolean whether the flag $flag is set for this connection.
Definition Database.php:786
Database connection, tracking, load balancing, and transaction manager for a cluster.
array[] $servers
Map of (server index => server config array)
undoTransactionRoundFlags(Database $conn)
bool $allowLagged
Whether to disregard replica DB lag as a factor in replica DB selection.
mixed $profiler
Class name or object With profileIn/profileOut methods.
getAnyOpenConnection( $i, $flags=0)
Get any open connection to a given server index, local or foreign.
callable $errorLogger
Exception logger.
IDatabase[][][] Database[][][] $conns
Map of (connection category => server index => IDatabase[])
int $waitTimeout
Seconds to spend waiting on replica DB lag to resolve.
bool DBMasterPos $waitForPos
Replication sync position or false if not set.
callable $deprecationLogger
Deprecation logger.
setLocalDomain(DatabaseDomain $domain)
$id
var int An identifier for this class instance
getLocalDomainID()
Get the local (and default) database domain ID of connection handles.
hasReplicaServers()
Whether there are any replica servers configured.
getConnectionRef( $i, $groups=[], $domain=false, $flags=0)
Get a live database handle reference for a real or virtual (DB_MASTER/DB_REPLICA) server index.
string $trxRoundStage
Stage of the current transaction round in the transaction round life-cycle.
getServerType( $i)
Get DB type of the server with the specified index.
hasStreamingReplicaServers()
Whether any replica servers use streaming replication from the master server.
setTableAliases(array $aliases)
Make certain table names use their own database, schema, and table prefix when passed into SQL querie...
flushMasterSnapshots( $fname=__METHOD__, $owner=null)
Commit all master DB transactions so as to flush any REPEATABLE-READ or SSI snapshots.
flushReplicaSnapshots( $fname=__METHOD__, $owner=null)
Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshots.
applyTransactionRoundFlags(Database $conn)
Make all DB servers with DBO_DEFAULT/DBO_TRX set join the transaction round.
string $agent
Agent name for query profiling.
waitFor( $pos)
Set the master position to reach before the next generic group DB handle query.
getMasterPos()
Get the current master replication position.
getForeignConnection( $i, $domain, $flags=0)
Open a connection to a foreign DB, or return one if it is already open.
beginMasterChanges( $fname=__METHOD__, $owner=null)
Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
pickAnyOpenConnection( $candidateConns, $autocommit)
getServerConnection( $i, $domain, $flags=0)
assertOwnership( $fname, $owner)
Assure that if this instance is owned, the caller is either the owner or is internal.
bool $laggedReplicaMode
Whether the generic reader fell back to a lagged replica DB.
forEachOpenMasterConnection( $callback, array $params=[])
Call a function with each open connection object to a master.
redefineLocalDomain( $domain)
Close all connections and redefine the local domain for testing or schema creation.
enforceConnectionFlags(IDatabase $conn, $flags)
pickReaderIndex(array $loads, $domain=false)
sanitizeConnectionFlags( $flags, $i, $domain)
runMasterTransactionListenerCallbacks( $fname=__METHOD__, $owner=null)
Run all recurring post-COMMIT/ROLLBACK listener callbacks.
laggedReplicaUsed()
Checks whether the database for generic connections this request was both:
doWait( $index, $open=false, $timeout=null)
Wait for a given replica DB to catch up to the master pos stored in "waitForPos".
int null $ownerId
Integer ID of the managing LBFactory instance or null if none.
string[] $indexAliases
Map of (index alias => index)
isOpen( $index)
Test if the specified index represents an open connection.
array $loadMonitorConfig
The LoadMonitor configuration.
array[] $groupLoads
Map of (group => server index => weight)
callable[] $trxRecurringCallbacks
Map of (name => callable)
commitAll( $fname=__METHOD__, $owner=null)
Commit transactions on all open connections.
string null $defaultGroup
Default query group to use with getConnection()
getConnectionIndex( $i, array $groups, $domain)
Get the server index to use for a specified server index and query group list.
isNonZeroLoad( $i)
Returns true if the specified index is valid and has non-zero load.
int[] $readIndexByGroup
The group replica server indexes keyed by group.
getLoadMonitor()
Get a LoadMonitor instance.
int $maxLag
Amount of replication lag, in seconds, that is considered "high".
setTransactionListener( $name, callable $callback=null)
Set a callback via IDatabase::setTransactionListener() on all current and future master connections o...
forEachOpenReplicaConnection( $callback, array $params=[])
Call a function with each open replica DB connection object.
lastMasterChangeTimestamp()
Get the timestamp of the latest write query done by this thread.
setTempTablesOnlyMode( $value, $domain)
Indicate whether the tables on this domain are only temporary tables for testing.
isMasterRunningReadOnly(DatabaseDomain $domain)
reuseConnection(IDatabase $conn)
Mark a live handle as being available for reuse under a different database domain.
getLagTimes( $domain=false)
Get an estimate of replication lag (in seconds) for each server.
string bool $trxRoundId
Explicit DBO_TRX transaction round active or false if none.
safeWaitForMasterPos(IDatabase $conn, $pos=false, $timeout=null)
Wait for a replica DB to reach a specified master position.
closeConnection(IDatabase $conn)
Close a connection.
int $connectionCounter
Total number of new connections ever made with this instance.
getExistingReaderIndex( $group)
Get the server index chosen by the load balancer for use with the given query group.
getLazyConnectionRef( $i, $groups=[], $domain=false, $flags=0)
Get a database handle reference for a real or virtual (DB_MASTER/DB_REPLICA) server index.
closeAll( $fname=__METHOD__, $owner=null)
Close all open connections.
getRandomNonLagged(array $loads, $domain=false, $maxLag=INF)
setLocalDomainPrefix( $prefix)
Set a new table prefix for the existing local domain ID for testing.
bool $cliMode
Whether this PHP instance is for a CLI script.
getMaintenanceConnectionRef( $i, $groups=[], $domain=false, $flags=0)
Get a live database handle for a real or virtual (DB_MASTER/DB_REPLICA) server index that can be used...
commitMasterChanges( $fname=__METHOD__, $owner=null)
Issue COMMIT on all open master connections to flush changes and view snapshots.
safeGetLag(IDatabase $conn)
Get the lag in seconds for a given connection, or zero if this load balancer does not have replicatio...
openConnection( $i, $domain=false, $flags=0)
hasOrMadeRecentMasterChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
isMasterConnectionReadOnly(IDatabase $conn, $flags=0)
string $hostname
Current server name.
getServerInfoStrict( $i, $field=null)
setExistingReaderIndex( $group, $index)
Set the server index chosen by the load balancer for use with the given query group.
getReaderIndex( $group=false, $domain=false)
Get the server index of the reader connection for a given group.
getLocalConnection( $i, $flags=0)
Open a connection to a local DB, or return one if it is already open.
waitForMasterPos(IDatabase $conn, $pos=false, $timeout=null)
Wait for a replica DB to reach a specified master position.
DatabaseDomain $localDomain
Local DB domain ID and default for selectDB() calls.
getReplicaResumePos()
Get the highest DB replication position for chronology control purposes.
setIndexAliases(array $aliases)
Convert certain index names to alternative names before querying the DB.
getConnection( $i, $groups=[], $domain=false, $flags=0)
Get a live handle for a real or virtual (DB_MASTER/DB_REPLICA) server index.
TransactionProfiler $trxProfiler
disable( $fname=__METHOD__, $owner=null)
Close all connections and disable this load balancer.
getServerName( $i)
Get the host name or IP address of the server with the specified index.
approveMasterChanges(array $options, $fname=__METHOD__, $owner=null)
Perform all pre-commit checks for things like replication safety.
getLaggedReplicaMode( $domain=false)
forEachOpenConnection( $callback, array $params=[])
Call a function with each open connection object.
haveIndex( $i)
Returns true if the specified index is a valid server index.
reallyOpenConnection(array $server, DatabaseDomain $domain)
Open a new network connection to a server (uncached)
getWriterIndex()
Get the server index of the master server.
pendingMasterChangeCallers()
Get the list of callers that have pending master changes.
bool $connectionAttempted
Whether any connection has been attempted yet.
hasMasterChanges()
Whether there are pending changes or callbacks in a transaction by this thread.
array[] $tableAliases
$aliases Map of (table => (dbname, schema, prefix) map)
callable null $chronologyCallback
Callback to run before the first connection attempt.
getMaxLag( $domain=false)
Get the hostname and lag time of the most-lagged replica server.
Database $errorConnection
Connection handle that caused a problem.
string $lastError
The last DB selection or connection error.
lazyLoadReplicationPositions()
Make sure that any "waitForPos" positions are loaded and available to doWait()
string $localDomainIdAlias
Alternate local DB domain instead of DatabaseDomain::getId()
finalizeMasterChanges( $fname=__METHOD__, $owner=null)
Run pre-commit callbacks and defer execution of post-commit callbacks.
getServerInfo( $i)
Return the server info structure for a given index or false if the index is invalid.
allowLagged( $mode=null)
Disables/enables lag checks.
getServerCount()
Get the number of servers defined in configuration.
runMasterTransactionIdleCallbacks( $fname=__METHOD__, $owner=null)
Consume and run all pending post-COMMIT/ROLLBACK callbacks and commit dangling transactions.
resolveGroups( $groups, $i)
Resolve $groups into a list of query groups defined as having database servers.
string bool $readOnlyReason
Reason this instance is read-only or false if not.
waitForOne( $pos, $timeout=null)
Set the master wait position and wait for a generic replica DB to catch up to it.
waitForAll( $pos, $timeout=null)
Set the master wait position and wait for ALL replica DBs to catch up to it.
bool[] $tempTablesOnlyMode
Map of (domain => whether to use "temp tables only" mode)
__construct(array $params)
Construct a manager of IDatabase connection objects.
rollbackMasterChanges( $fname=__METHOD__, $owner=null)
Issue ROLLBACK only on master, only if queries were done on connection.
Helper class to handle automatically marking connections as reusable (via RAII pattern) as well handl...
Helper class that detects high-contention DB queries via profiling calls.
An object representing a master or replica DB position in a replicated setup.
hasReached(DBMasterPos $pos)
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:38
flushSnapshot( $fname=__METHOD__, $flush=self::FLUSHING_ONE)
Commit any transaction but error out if writes or callbacks are pending.
rollback( $fname=__METHOD__, $flush=self::FLUSHING_ONE)
Rollback a transaction previously started using begin()
lastDoneWrites()
Get the last time the connection may have been used for a write query.
getDomainID()
Return the currently selected domain ID.
assertNoOpenTransactions()
Assert that all explicit transactions or atomic sections have been closed.
getServer()
Get the server hostname or IP address.
setLBInfo( $nameOrArray, $value=null)
Set the entire array or a particular key of the managing load balancer info array.
setTransactionListener( $name, callable $callback=null)
Run a callback after each time any transaction commits or rolls back.
getLBInfo( $name=null)
Get properties passed down from the server info array of the load balancer.
clearFlag( $flag, $remember=self::REMEMBER_NOTHING)
Clear a flag for this connection.
pendingWriteQueryDuration( $type=self::ESTIMATE_TOTAL)
Get the time spent running write queries for this transaction.
commit( $fname=__METHOD__, $flush=self::FLUSHING_ONE)
Commits a transaction previously started using begin()
getLag()
Get the amount of replication lag for this database server.
ping(&$rtt=null)
Ping the server and try to reconnect if there is no connection.
masterPosWait(DBMasterPos $pos, $timeout)
Wait for the replica DB to catch up to a given master position.
close( $fname=__METHOD__, $owner=null)
Close the database connection.
trxLevel()
Gets the current transaction level.
getDBname()
Get the current DB name.
writesOrCallbacksPending()
Whether there is a transaction open with either possible write queries or unresolved pre-commit/commi...
pendingWriteCallers()
Get the list of method names that did write queries for this transaction.
dbSchema( $schema=null)
Get/set the db schema.
Database cluster connection, tracking, load balancing, and transaction manager interface.
An interface for database load monitoring.
$context
Definition load.php:45
const DB_REPLICA
Definition defines.php:25
const DB_MASTER
Definition defines.php:26