MediaWiki  master
LoadBalancer.php
Go to the documentation of this file.
1 <?php
22 namespace Wikimedia\Rdbms;
23 
36 
42 class LoadBalancer implements ILoadBalancer {
44  private $loadMonitor;
48  private $srvCache;
50  private $wanCache;
52  private $profiler;
54  private $trxProfiler;
56  private $replLogger;
58  private $connLogger;
60  private $queryLogger;
62  private $perfLogger;
64  private $errorLogger;
67 
69  private $localDomain;
70 
72  private $conns;
73 
75  private $servers;
77  private $groupLoads;
79  private $allowLagged;
81  private $waitTimeout;
87  private $maxLag;
89  private $defaultGroup;
90 
92  private $hostname;
94  private $cliMode;
96  private $agent;
97 
99  private $tableAliases = [];
101  private $indexAliases = [];
104 
108  private $readIndexByGroup = [];
110  private $waitForPos;
112  private $laggedReplicaMode = false;
114  private $lastError = 'Unknown error';
116  private $readOnlyReason = false;
118  private $connectionCounter = 0;
120  private $disabled = false;
122  private $connectionAttempted = false;
123 
125  private $ownerId;
127  private $trxRoundId = false;
129  private $trxRoundStage = self::ROUND_CURSORY;
130 
132  const CONN_HELD_WARN_THRESHOLD = 10;
133 
135  const MAX_LAG_DEFAULT = 6;
137  const MAX_WAIT_DEFAULT = 10;
139  const TTL_CACHE_READONLY = 5;
140 
141  const KEY_LOCAL = 'local';
142  const KEY_FOREIGN_FREE = 'foreignFree';
143  const KEY_FOREIGN_INUSE = 'foreignInUse';
144 
145  const KEY_LOCAL_NOROUND = 'localAutoCommit';
146  const KEY_FOREIGN_FREE_NOROUND = 'foreignFreeAutoCommit';
147  const KEY_FOREIGN_INUSE_NOROUND = 'foreignInUseAutoCommit';
148 
150  const ROUND_CURSORY = 'cursory';
152  const ROUND_FINALIZED = 'finalized';
154  const ROUND_APPROVED = 'approved';
156  const ROUND_COMMIT_CALLBACKS = 'commit-callbacks';
158  const ROUND_ROLLBACK_CALLBACKS = 'rollback-callbacks';
160  const ROUND_ERROR = 'error';
161 
162  public function __construct( array $params ) {
163  if ( !isset( $params['servers'] ) || !count( $params['servers'] ) ) {
164  throw new InvalidArgumentException( 'Missing or empty "servers" parameter' );
165  }
166 
167  $listKey = -1;
168  $this->servers = [];
169  $this->groupLoads = [ self::GROUP_GENERIC => [] ];
170  foreach ( $params['servers'] as $i => $server ) {
171  if ( ++$listKey !== $i ) {
172  throw new UnexpectedValueException( 'List expected for "servers" parameter' );
173  }
174  if ( $i == 0 ) {
175  $server['master'] = true;
176  } else {
177  $server['replica'] = true;
178  }
179  $this->servers[$i] = $server;
180  foreach ( ( $server['groupLoads'] ?? [] ) as $group => $ratio ) {
181  $this->groupLoads[$group][$i] = $ratio;
182  }
183  $this->groupLoads[self::GROUP_GENERIC][$i] = $server['load'];
184  }
185 
186  $localDomain = isset( $params['localDomain'] )
187  ? DatabaseDomain::newFromId( $params['localDomain'] )
189  $this->setLocalDomain( $localDomain );
190 
191  $this->waitTimeout = $params['waitTimeout'] ?? self::MAX_WAIT_DEFAULT;
192 
193  $this->conns = self::newTrackedConnectionsArray();
194  $this->waitForPos = false;
195  $this->allowLagged = false;
196 
197  if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
198  $this->readOnlyReason = $params['readOnlyReason'];
199  }
200 
201  $this->maxLag = $params['maxLag'] ?? self::MAX_LAG_DEFAULT;
202 
203  $this->loadMonitorConfig = $params['loadMonitor'] ?? [ 'class' => 'LoadMonitorNull' ];
204  $this->loadMonitorConfig += [ 'lagWarnThreshold' => $this->maxLag ];
205 
206  $this->srvCache = $params['srvCache'] ?? new EmptyBagOStuff();
207  $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
208  $this->profiler = $params['profiler'] ?? null;
209  $this->trxProfiler = $params['trxProfiler'] ?? new TransactionProfiler();
210 
211  $this->errorLogger = $params['errorLogger'] ?? function ( Exception $e ) {
212  trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
213  };
214  $this->deprecationLogger = $params['deprecationLogger'] ?? function ( $msg ) {
215  trigger_error( $msg, E_USER_DEPRECATED );
216  };
217 
218  foreach ( [ 'replLogger', 'connLogger', 'queryLogger', 'perfLogger' ] as $key ) {
219  $this->$key = $params[$key] ?? new NullLogger();
220  }
221 
222  $this->hostname = $params['hostname'] ?? ( gethostname() ?: 'unknown' );
223  $this->cliMode = $params['cliMode'] ?? ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
224  $this->agent = $params['agent'] ?? '';
225 
226  if ( isset( $params['chronologyCallback'] ) ) {
227  $this->chronologyCallback = $params['chronologyCallback'];
228  }
229 
230  if ( isset( $params['roundStage'] ) ) {
231  if ( $params['roundStage'] === self::STAGE_POSTCOMMIT_CALLBACKS ) {
232  $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
233  } elseif ( $params['roundStage'] === self::STAGE_POSTROLLBACK_CALLBACKS ) {
234  $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
235  }
236  }
237 
238  $group = $params['defaultGroup'] ?? self::GROUP_GENERIC;
239  $this->defaultGroup = isset( $this->groupLoads[$group] ) ? $group : self::GROUP_GENERIC;
240 
241  $this->ownerId = $params['ownerId'] ?? null;
242  }
243 
244  private static function newTrackedConnectionsArray() {
245  return [
246  // Connection were transaction rounds may be applied
247  self::KEY_LOCAL => [],
248  self::KEY_FOREIGN_INUSE => [],
249  self::KEY_FOREIGN_FREE => [],
250  // Auto-committing counterpart connections that ignore transaction rounds
251  self::KEY_LOCAL_NOROUND => [],
252  self::KEY_FOREIGN_INUSE_NOROUND => [],
253  self::KEY_FOREIGN_FREE_NOROUND => []
254  ];
255  }
256 
257  public function getLocalDomainID() {
258  return $this->localDomain->getId();
259  }
260 
261  public function resolveDomainID( $domain ) {
262  if ( $domain === $this->localDomainIdAlias || $domain === false ) {
263  // Local connection requested via some backwards-compatibility domain alias
264  return $this->getLocalDomainID();
265  }
266 
267  return (string)$domain;
268  }
269 
277  private function resolveGroups( $groups, $i ) {
278  // If a specific replica server was specified, then $groups makes no sense
279  if ( $i > 0 && $groups !== [] && $groups !== false ) {
280  $list = implode( ', ', (array)$groups );
281  throw new LogicException( "Query group(s) ($list) given with server index (#$i)" );
282  }
283 
284  if ( $groups === [] || $groups === false || $groups === $this->defaultGroup ) {
285  $resolvedGroups = [ $this->defaultGroup ]; // common case
286  } elseif ( is_string( $groups ) && isset( $this->groupLoads[$groups] ) ) {
287  $resolvedGroups = [ $groups, $this->defaultGroup ];
288  } elseif ( is_array( $groups ) ) {
289  $resolvedGroups = array_keys( array_flip( $groups ) + [ self::GROUP_GENERIC => 1 ] );
290  } else {
291  $resolvedGroups = [ $this->defaultGroup ];
292  }
293 
294  return $resolvedGroups;
295  }
296 
302  private function sanitizeConnectionFlags( $flags, $i ) {
303  // Whether an outside caller is explicitly requesting the master database server
304  if ( $i === self::DB_MASTER || $i === $this->getWriterIndex() ) {
305  $flags |= self::CONN_INTENT_WRITABLE;
306  }
307 
308  if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT ) {
309  // Callers use CONN_TRX_AUTOCOMMIT to bypass REPEATABLE-READ staleness without
310  // resorting to row locks (e.g. FOR UPDATE) or to make small out-of-band commits
311  // during larger transactions. This is useful for avoiding lock contention.
312 
313  // Master DB server attributes (should match those of the replica DB servers)
314  $attributes = $this->getServerAttributes( $this->getWriterIndex() );
315  if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
316  // The RDBMS does not support concurrent writes (e.g. SQLite), so attempts
317  // to use separate connections would just cause self-deadlocks. Note that
318  // REPEATABLE-READ staleness is not an issue since DB-level locking means
319  // that transactions are Strict Serializable anyway.
320  $flags &= ~self::CONN_TRX_AUTOCOMMIT;
321  $type = $this->getServerType( $this->getWriterIndex() );
322  $this->connLogger->info( __METHOD__ . ": CONN_TRX_AUTOCOMMIT disallowed ($type)" );
323  }
324  }
325 
326  return $flags;
327  }
328 
334  private function enforceConnectionFlags( IDatabase $conn, $flags ) {
335  if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT ) {
336  if ( $conn->trxLevel() ) { // sanity
337  throw new DBUnexpectedError(
338  $conn,
339  'Handle requested with CONN_TRX_AUTOCOMMIT yet it has a transaction'
340  );
341  }
342 
343  $conn->clearFlag( $conn::DBO_TRX ); // auto-commit mode
344  }
345  }
346 
352  private function getLoadMonitor() {
353  if ( !isset( $this->loadMonitor ) ) {
354  $compat = [
355  'LoadMonitor' => LoadMonitor::class,
356  'LoadMonitorNull' => LoadMonitorNull::class,
357  'LoadMonitorMySQL' => LoadMonitorMySQL::class,
358  ];
359 
360  $class = $this->loadMonitorConfig['class'];
361  if ( isset( $compat[$class] ) ) {
362  $class = $compat[$class];
363  }
364 
365  $this->loadMonitor = new $class(
366  $this, $this->srvCache, $this->wanCache, $this->loadMonitorConfig );
367  $this->loadMonitor->setLogger( $this->replLogger );
368  }
369 
370  return $this->loadMonitor;
371  }
372 
379  private function getRandomNonLagged( array $loads, $domain = false, $maxLag = INF ) {
380  $lags = $this->getLagTimes( $domain );
381 
382  # Unset excessively lagged servers
383  foreach ( $lags as $i => $lag ) {
384  if ( $i !== $this->getWriterIndex() ) {
385  # How much lag this server nominally is allowed to have
386  $maxServerLag = $this->servers[$i]['max lag'] ?? $this->maxLag; // default
387  # Constrain that futher by $maxLag argument
388  $maxServerLag = min( $maxServerLag, $maxLag );
389 
390  $host = $this->getServerName( $i );
391  if ( $lag === false && !is_infinite( $maxServerLag ) ) {
392  $this->replLogger->debug(
393  __METHOD__ .
394  ": server {host} is not replicating?", [ 'host' => $host ] );
395  unset( $loads[$i] );
396  } elseif ( $lag > $maxServerLag ) {
397  $this->replLogger->debug(
398  __METHOD__ .
399  ": server {host} has {lag} seconds of lag (>= {maxlag})",
400  [ 'host' => $host, 'lag' => $lag, 'maxlag' => $maxServerLag ]
401  );
402  unset( $loads[$i] );
403  }
404  }
405  }
406 
407  # Find out if all the replica DBs with non-zero load are lagged
408  $sum = 0;
409  foreach ( $loads as $load ) {
410  $sum += $load;
411  }
412  if ( $sum == 0 ) {
413  # No appropriate DB servers except maybe the master and some replica DBs with zero load
414  # Do NOT use the master
415  # Instead, this function will return false, triggering read-only mode,
416  # and a lagged replica DB will be used instead.
417  return false;
418  }
419 
420  if ( count( $loads ) == 0 ) {
421  return false;
422  }
423 
424  # Return a random representative of the remainder
425  return ArrayUtils::pickRandom( $loads );
426  }
427 
436  private function getConnectionIndex( $i, array $groups, $domain ) {
437  if ( $i === self::DB_MASTER ) {
438  $i = $this->getWriterIndex();
439  } elseif ( $i === self::DB_REPLICA ) {
440  foreach ( $groups as $group ) {
441  $groupIndex = $this->getReaderIndex( $group, $domain );
442  if ( $groupIndex !== false ) {
443  $i = $groupIndex; // group connection succeeded
444  break;
445  }
446  }
447  } elseif ( !isset( $this->servers[$i] ) ) {
448  throw new UnexpectedValueException( "Invalid server index index #$i" );
449  }
450 
451  if ( $i === self::DB_REPLICA ) {
452  $this->lastError = 'Unknown error'; // set here in case of worse failure
453  $this->lastError = 'No working replica DB server: ' . $this->lastError;
454  $this->reportConnectionError();
455  return null; // unreachable due to exception
456  }
457 
458  return $i;
459  }
460 
461  public function getReaderIndex( $group = false, $domain = false ) {
462  if ( $this->getServerCount() == 1 ) {
463  // Skip the load balancing if there's only one server
464  return $this->getWriterIndex();
465  }
466 
467  $group = is_string( $group ) ? $group : self::GROUP_GENERIC;
468 
469  $index = $this->getExistingReaderIndex( $group );
470  if ( $index >= 0 ) {
471  // A reader index was already selected and "waitForPos" was handled
472  return $index;
473  }
474 
475  // Use the server weight array for this load group
476  if ( isset( $this->groupLoads[$group] ) ) {
477  $loads = $this->groupLoads[$group];
478  } else {
479  $this->connLogger->info( __METHOD__ . ": no loads for group $group" );
480 
481  return false;
482  }
483 
484  // Scale the configured load ratios according to each server's load and state
485  $this->getLoadMonitor()->scaleLoads( $loads, $domain );
486 
487  // Pick a server to use, accounting for weights, load, lag, and "waitForPos"
488  $this->lazyLoadReplicationPositions(); // optimizes server candidate selection
489  list( $i, $laggedReplicaMode ) = $this->pickReaderIndex( $loads, $domain );
490  if ( $i === false ) {
491  // DB connection unsuccessful
492  return false;
493  }
494 
495  // If data seen by queries is expected to reflect the transactions committed as of
496  // or after a given replication position then wait for the DB to apply those changes
497  if ( $this->waitForPos && $i !== $this->getWriterIndex() && !$this->doWait( $i ) ) {
498  // Data will be outdated compared to what was expected
499  $laggedReplicaMode = true;
500  }
501 
502  // Cache the reader index for future DB_REPLICA handles
503  $this->setExistingReaderIndex( $group, $i );
504  // Record whether the generic reader index is in "lagged replica DB" mode
505  if ( $group === self::GROUP_GENERIC && $laggedReplicaMode ) {
506  $this->laggedReplicaMode = true;
507  }
508 
509  $serverName = $this->getServerName( $i );
510  $this->connLogger->debug( __METHOD__ . ": using server $serverName for group '$group'" );
511 
512  return $i;
513  }
514 
521  protected function getExistingReaderIndex( $group ) {
522  return $this->readIndexByGroup[$group] ?? -1;
523  }
524 
531  private function setExistingReaderIndex( $group, $index ) {
532  if ( $index < 0 ) {
533  throw new UnexpectedValueException( "Cannot set a negative read server index" );
534  }
535  $this->readIndexByGroup[$group] = $index;
536  }
537 
543  private function pickReaderIndex( array $loads, $domain = false ) {
544  if ( $loads === [] ) {
545  throw new InvalidArgumentException( "Server configuration array is empty" );
546  }
547 
549  $i = false;
551  $laggedReplicaMode = false;
552 
553  // Quickly look through the available servers for a server that meets criteria...
554  $currentLoads = $loads;
555  while ( count( $currentLoads ) ) {
556  if ( $this->allowLagged || $laggedReplicaMode ) {
557  $i = ArrayUtils::pickRandom( $currentLoads );
558  } else {
559  $i = false;
560  if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
561  $this->replLogger->debug( __METHOD__ . ": replication positions detected" );
562  // "chronologyCallback" sets "waitForPos" for session consistency.
563  // This triggers doWait() after connect, so it's especially good to
564  // avoid lagged servers so as to avoid excessive delay in that method.
565  $ago = microtime( true ) - $this->waitForPos->asOfTime();
566  // Aim for <= 1 second of waiting (being too picky can backfire)
567  $i = $this->getRandomNonLagged( $currentLoads, $domain, $ago + 1 );
568  }
569  if ( $i === false ) {
570  // Any server with less lag than it's 'max lag' param is preferable
571  $i = $this->getRandomNonLagged( $currentLoads, $domain );
572  }
573  if ( $i === false && count( $currentLoads ) ) {
574  // All replica DBs lagged. Switch to read-only mode
575  $this->replLogger->error(
576  __METHOD__ . ": all replica DBs lagged. Switch to read-only mode" );
577  $i = ArrayUtils::pickRandom( $currentLoads );
578  $laggedReplicaMode = true;
579  }
580  }
581 
582  if ( $i === false ) {
583  // pickRandom() returned false.
584  // This is permanent and means the configuration or the load monitor
585  // wants us to return false.
586  $this->connLogger->debug( __METHOD__ . ": pickRandom() returned false" );
587 
588  return [ false, false ];
589  }
590 
591  $serverName = $this->getServerName( $i );
592  $this->connLogger->debug( __METHOD__ . ": Using reader #$i: $serverName..." );
593 
594  // Get a connection to this server without triggering other server connections
595  $flags = self::CONN_SILENCE_ERRORS;
596  $conn = $this->getServerConnection( $i, $domain, $flags );
597  if ( !$conn ) {
598  $this->connLogger->warning( __METHOD__ . ": Failed connecting to $i/$domain" );
599  unset( $currentLoads[$i] ); // avoid this server next iteration
600  $i = false;
601  continue;
602  }
603 
604  // Decrement reference counter, we are finished with this connection.
605  // It will be incremented for the caller later.
606  if ( $domain !== false ) {
607  $this->reuseConnection( $conn );
608  }
609 
610  // Return this server
611  break;
612  }
613 
614  // If all servers were down, quit now
615  if ( $currentLoads === [] ) {
616  $this->connLogger->error( __METHOD__ . ": all servers down" );
617  }
618 
619  return [ $i, $laggedReplicaMode ];
620  }
621 
622  public function waitFor( $pos ) {
623  $oldPos = $this->waitForPos;
624  try {
625  $this->waitForPos = $pos;
626  // If a generic reader connection was already established, then wait now
627  $i = $this->getExistingReaderIndex( self::GROUP_GENERIC );
628  if ( $i > 0 && !$this->doWait( $i ) ) {
629  $this->laggedReplicaMode = true;
630  }
631  // Otherwise, wait until a connection is established in getReaderIndex()
632  } finally {
633  // Restore the older position if it was higher since this is used for lag-protection
634  $this->setWaitForPositionIfHigher( $oldPos );
635  }
636  }
637 
638  public function waitForOne( $pos, $timeout = null ) {
639  $oldPos = $this->waitForPos;
640  try {
641  $this->waitForPos = $pos;
642 
643  $i = $this->getExistingReaderIndex( self::GROUP_GENERIC );
644  if ( $i <= 0 ) {
645  // Pick a generic replica DB if there isn't one yet
646  $readLoads = $this->groupLoads[self::GROUP_GENERIC];
647  unset( $readLoads[$this->getWriterIndex()] ); // replica DBs only
648  $readLoads = array_filter( $readLoads ); // with non-zero load
649  $i = ArrayUtils::pickRandom( $readLoads );
650  }
651 
652  if ( $i > 0 ) {
653  $ok = $this->doWait( $i, true, $timeout );
654  } else {
655  $ok = true; // no applicable loads
656  }
657  } finally {
658  // Restore the old position; this is used for throttling, not lag-protection
659  $this->waitForPos = $oldPos;
660  }
661 
662  return $ok;
663  }
664 
665  public function waitForAll( $pos, $timeout = null ) {
666  $timeout = $timeout ?: $this->waitTimeout;
667 
668  $oldPos = $this->waitForPos;
669  try {
670  $this->waitForPos = $pos;
671  $serverCount = $this->getServerCount();
672 
673  $ok = true;
674  for ( $i = 1; $i < $serverCount; $i++ ) {
675  if ( $this->serverHasLoadInAnyGroup( $i ) ) {
676  $start = microtime( true );
677  $ok = $this->doWait( $i, true, $timeout ) && $ok;
678  $timeout -= intval( microtime( true ) - $start );
679  if ( $timeout <= 0 ) {
680  break; // timeout reached
681  }
682  }
683  }
684  } finally {
685  // Restore the old position; this is used for throttling, not lag-protection
686  $this->waitForPos = $oldPos;
687  }
688 
689  return $ok;
690  }
691 
696  private function serverHasLoadInAnyGroup( $i ) {
697  foreach ( $this->groupLoads as $loadsByIndex ) {
698  if ( ( $loadsByIndex[$i] ?? 0 ) > 0 ) {
699  return true;
700  }
701  }
702 
703  return false;
704  }
705 
709  private function setWaitForPositionIfHigher( $pos ) {
710  if ( !$pos ) {
711  return;
712  }
713 
714  if ( !$this->waitForPos || $pos->hasReached( $this->waitForPos ) ) {
715  $this->waitForPos = $pos;
716  }
717  }
718 
719  public function getAnyOpenConnection( $i, $flags = 0 ) {
720  $i = ( $i === self::DB_MASTER ) ? $this->getWriterIndex() : $i;
721  // Connection handles required to be in auto-commit mode use a separate connection
722  // pool since the main pool is effected by implicit and explicit transaction rounds
723  $autocommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
724 
725  $conn = false;
726  foreach ( $this->conns as $connsByServer ) {
727  // Get the connection array server indexes to inspect
728  if ( $i === self::DB_REPLICA ) {
729  $indexes = array_keys( $connsByServer );
730  } else {
731  $indexes = isset( $connsByServer[$i] ) ? [ $i ] : [];
732  }
733 
734  foreach ( $indexes as $index ) {
735  $conn = $this->pickAnyOpenConnection( $connsByServer[$index], $autocommit );
736  if ( $conn ) {
737  break;
738  }
739  }
740  }
741 
742  if ( $conn ) {
743  $this->enforceConnectionFlags( $conn, $flags );
744  }
745 
746  return $conn;
747  }
748 
754  private function pickAnyOpenConnection( $candidateConns, $autocommit ) {
755  $conn = false;
756 
757  foreach ( $candidateConns as $candidateConn ) {
758  if ( !$candidateConn->isOpen() ) {
759  continue; // some sort of error occured?
760  } elseif (
761  $autocommit &&
762  (
763  // Connection is transaction round aware
764  !$candidateConn->getLBInfo( 'autoCommitOnly' ) ||
765  // Some sort of error left a transaction open?
766  $candidateConn->trxLevel()
767  )
768  ) {
769  continue; // some sort of error left a transaction open?
770  }
771 
772  $conn = $candidateConn;
773  }
774 
775  return $conn;
776  }
777 
785  protected function doWait( $index, $open = false, $timeout = null ) {
786  $timeout = max( 1, intval( $timeout ?: $this->waitTimeout ) );
787 
788  // Check if we already know that the DB has reached this point
789  $server = $this->getServerName( $index );
790  $key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $server, 'v1' );
792  $knownReachedPos = $this->srvCache->get( $key );
793  if (
794  $knownReachedPos instanceof DBMasterPos &&
795  $knownReachedPos->hasReached( $this->waitForPos )
796  ) {
797  $this->replLogger->debug(
798  __METHOD__ .
799  ': replica DB {dbserver} known to be caught up (pos >= $knownReachedPos).',
800  [ 'dbserver' => $server ]
801  );
802  return true;
803  }
804 
805  // Find a connection to wait on, creating one if needed and allowed
806  $close = false; // close the connection afterwards
807  $flags = self::CONN_SILENCE_ERRORS;
808  $conn = $this->getAnyOpenConnection( $index, $flags );
809  if ( !$conn ) {
810  if ( !$open ) {
811  $this->replLogger->debug(
812  __METHOD__ . ': no connection open for {dbserver}',
813  [ 'dbserver' => $server ]
814  );
815 
816  return false;
817  }
818  // Get a connection to this server without triggering other server connections
819  $conn = $this->getServerConnection( $index, self::DOMAIN_ANY, $flags );
820  if ( !$conn ) {
821  $this->replLogger->warning(
822  __METHOD__ . ': failed to connect to {dbserver}',
823  [ 'dbserver' => $server ]
824  );
825 
826  return false;
827  }
828  // Avoid connection spam in waitForAll() when connections
829  // are made just for the sake of doing this lag check.
830  $close = true;
831  }
832 
833  $this->replLogger->info(
834  __METHOD__ .
835  ': waiting for replica DB {dbserver} to catch up...',
836  [ 'dbserver' => $server ]
837  );
838 
839  $result = $conn->masterPosWait( $this->waitForPos, $timeout );
840 
841  if ( $result === null ) {
842  $this->replLogger->warning(
843  __METHOD__ . ': Errored out waiting on {host} pos {pos}',
844  [
845  'host' => $server,
846  'pos' => $this->waitForPos,
847  'trace' => ( new RuntimeException() )->getTraceAsString()
848  ]
849  );
850  $ok = false;
851  } elseif ( $result == -1 ) {
852  $this->replLogger->warning(
853  __METHOD__ . ': Timed out waiting on {host} pos {pos}',
854  [
855  'host' => $server,
856  'pos' => $this->waitForPos,
857  'trace' => ( new RuntimeException() )->getTraceAsString()
858  ]
859  );
860  $ok = false;
861  } else {
862  $this->replLogger->debug( __METHOD__ . ": done waiting" );
863  $ok = true;
864  // Remember that the DB reached this point
865  $this->srvCache->set( $key, $this->waitForPos, BagOStuff::TTL_DAY );
866  }
867 
868  if ( $close ) {
869  $this->closeConnection( $conn );
870  }
871 
872  return $ok;
873  }
874 
875  public function getConnection( $i, $groups = [], $domain = false, $flags = 0 ) {
876  $domain = $this->resolveDomainID( $domain );
877  $groups = $this->resolveGroups( $groups, $i );
878  $flags = $this->sanitizeConnectionFlags( $flags, $i );
879  // If given DB_MASTER/DB_REPLICA, resolve it to a specific server index. Resolving
880  // DB_REPLICA might trigger getServerConnection() calls due to the getReaderIndex()
881  // connectivity checks or LoadMonitor::scaleLoads() server state cache regeneration.
882  // The use of getServerConnection() instead of getConnection() avoids infinite loops.
883  $serverIndex = $this->getConnectionIndex( $i, $groups, $domain );
884  // Get an open connection to that server (might trigger a new connection)
885  $conn = $this->getServerConnection( $serverIndex, $domain, $flags );
886  // Set master DB handles as read-only if there is high replication lag
887  if ( $serverIndex === $this->getWriterIndex() && $this->getLaggedReplicaMode( $domain ) ) {
888  $reason = ( $this->getExistingReaderIndex( self::GROUP_GENERIC ) >= 0 )
889  ? 'The database is read-only until replication lag decreases.'
890  : 'The database is read-only until replica database servers becomes reachable.';
891  $conn->setLBInfo( 'readOnlyReason', $reason );
892  }
893 
894  return $conn;
895  }
896 
904  public function getServerConnection( $i, $domain, $flags = 0 ) {
905  // Number of connections made before getting the server index and handle
906  $priorConnectionsMade = $this->connectionCounter;
907  // Get an open connection to this server (might trigger a new connection)
908  $conn = $this->localDomain->equals( $domain )
909  ? $this->getLocalConnection( $i, $flags )
910  : $this->getForeignConnection( $i, $domain, $flags );
911  // Throw an error or otherwise bail out if the connection attempt failed
912  if ( !( $conn instanceof IDatabase ) ) {
913  if ( ( $flags & self::CONN_SILENCE_ERRORS ) != self::CONN_SILENCE_ERRORS ) {
914  $this->reportConnectionError();
915  }
916 
917  return false;
918  }
919 
920  // Profile any new connections caused by this method
921  if ( $this->connectionCounter > $priorConnectionsMade ) {
922  $this->trxProfiler->recordConnection(
923  $conn->getServer(),
924  $conn->getDBname(),
925  ( ( $flags & self::CONN_INTENT_WRITABLE ) == self::CONN_INTENT_WRITABLE )
926  );
927  }
928 
929  if ( !$conn->isOpen() ) {
930  $this->errorConnection = $conn;
931  // Connection was made but later unrecoverably lost for some reason.
932  // Do not return a handle that will just throw exceptions on use, but
933  // let the calling code, e.g. getReaderIndex(), try another server.
934  return false;
935  }
936 
937  // Make sure that flags like CONN_TRX_AUTOCOMMIT are respected by this handle
938  $this->enforceConnectionFlags( $conn, $flags );
939  // Set master DB handles as read-only if the load balancer is configured as read-only
940  // or the master database server is running in server-side read-only mode. Note that
941  // replica DB handles are always read-only via Database::assertIsWritableMaster().
942  // Read-only mode due to replication lag is *avoided* here to avoid recursion.
943  if ( $conn->getLBInfo( 'serverIndex' ) === $this->getWriterIndex() ) {
944  if ( $this->readOnlyReason !== false ) {
945  $conn->setLBInfo( 'readOnlyReason', $this->readOnlyReason );
946  } elseif ( $this->masterRunningReadOnly( $domain, $conn ) ) {
947  $conn->setLBInfo(
948  'readOnlyReason',
949  'The master database server is running in read-only mode.'
950  );
951  }
952  }
953 
954  return $conn;
955  }
956 
957  public function reuseConnection( IDatabase $conn ) {
958  $serverIndex = $conn->getLBInfo( 'serverIndex' );
959  $refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
960  if ( $serverIndex === null || $refCount === null ) {
971  return;
972  } elseif ( $conn instanceof DBConnRef ) {
973  // DBConnRef already handles calling reuseConnection() and only passes the live
974  // Database instance to this method. Any caller passing in a DBConnRef is broken.
975  $this->connLogger->error(
976  __METHOD__ . ": got DBConnRef instance.\n" .
977  ( new LogicException() )->getTraceAsString() );
978 
979  return;
980  }
981 
982  if ( $this->disabled ) {
983  return; // DBConnRef handle probably survived longer than the LoadBalancer
984  }
985 
986  if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
987  $connFreeKey = self::KEY_FOREIGN_FREE_NOROUND;
988  $connInUseKey = self::KEY_FOREIGN_INUSE_NOROUND;
989  } else {
990  $connFreeKey = self::KEY_FOREIGN_FREE;
991  $connInUseKey = self::KEY_FOREIGN_INUSE;
992  }
993 
994  $domain = $conn->getDomainID();
995  if ( !isset( $this->conns[$connInUseKey][$serverIndex][$domain] ) ) {
996  throw new InvalidArgumentException(
997  "Connection $serverIndex/$domain not found; it may have already been freed" );
998  } elseif ( $this->conns[$connInUseKey][$serverIndex][$domain] !== $conn ) {
999  throw new InvalidArgumentException(
1000  "Connection $serverIndex/$domain mismatched; it may have already been freed" );
1001  }
1002 
1003  $conn->setLBInfo( 'foreignPoolRefCount', --$refCount );
1004  if ( $refCount <= 0 ) {
1005  $this->conns[$connFreeKey][$serverIndex][$domain] = $conn;
1006  unset( $this->conns[$connInUseKey][$serverIndex][$domain] );
1007  if ( !$this->conns[$connInUseKey][$serverIndex] ) {
1008  unset( $this->conns[$connInUseKey][$serverIndex] ); // clean up
1009  }
1010  $this->connLogger->debug( __METHOD__ . ": freed connection $serverIndex/$domain" );
1011  } else {
1012  $this->connLogger->debug( __METHOD__ .
1013  ": reference count for $serverIndex/$domain reduced to $refCount" );
1014  }
1015  }
1016 
1017  public function getConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
1018  $domain = $this->resolveDomainID( $domain );
1019  $role = $this->getRoleFromIndex( $i );
1020 
1021  return new DBConnRef( $this, $this->getConnection( $i, $groups, $domain, $flags ), $role );
1022  }
1023 
1024  public function getLazyConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
1025  $domain = $this->resolveDomainID( $domain );
1026  $role = $this->getRoleFromIndex( $i );
1027 
1028  return new DBConnRef( $this, [ $i, $groups, $domain, $flags ], $role );
1029  }
1030 
1031  public function getMaintenanceConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
1032  $domain = $this->resolveDomainID( $domain );
1033  $role = $this->getRoleFromIndex( $i );
1034 
1035  return new MaintainableDBConnRef(
1036  $this, $this->getConnection( $i, $groups, $domain, $flags ), $role );
1037  }
1038 
1043  private function getRoleFromIndex( $i ) {
1044  return ( $i === self::DB_MASTER || $i === $this->getWriterIndex() )
1045  ? self::DB_MASTER
1046  : self::DB_REPLICA;
1047  }
1048 
1056  public function openConnection( $i, $domain = false, $flags = 0 ) {
1057  return $this->getConnection( $i, [], $domain, $flags | self::CONN_SILENCE_ERRORS );
1058  }
1059 
1074  private function getLocalConnection( $i, $flags = 0 ) {
1075  // Connection handles required to be in auto-commit mode use a separate connection
1076  // pool since the main pool is effected by implicit and explicit transaction rounds
1077  $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
1078 
1079  $connKey = $autoCommit ? self::KEY_LOCAL_NOROUND : self::KEY_LOCAL;
1080  if ( isset( $this->conns[$connKey][$i][0] ) ) {
1081  $conn = $this->conns[$connKey][$i][0];
1082  } else {
1083  // Open a new connection
1084  $server = $this->getServerInfoStrict( $i );
1085  $server['serverIndex'] = $i;
1086  $server['autoCommitOnly'] = $autoCommit;
1087  $conn = $this->reallyOpenConnection( $server, $this->localDomain );
1088  $host = $this->getServerName( $i );
1089  if ( $conn->isOpen() ) {
1090  $this->connLogger->debug(
1091  __METHOD__ . ": connected to database $i at '$host'." );
1092  $this->conns[$connKey][$i][0] = $conn;
1093  } else {
1094  $this->connLogger->warning(
1095  __METHOD__ . ": failed to connect to database $i at '$host'." );
1096  $this->errorConnection = $conn;
1097  $conn = false;
1098  }
1099  }
1100 
1101  // Final sanity check to make sure the right domain is selected
1102  if (
1103  $conn instanceof IDatabase &&
1104  !$this->localDomain->isCompatible( $conn->getDomainID() )
1105  ) {
1106  throw new UnexpectedValueException(
1107  "Got connection to '{$conn->getDomainID()}', " .
1108  "but expected local domain ('{$this->localDomain}')" );
1109  }
1110 
1111  return $conn;
1112  }
1113 
1138  private function getForeignConnection( $i, $domain, $flags = 0 ) {
1139  $domainInstance = DatabaseDomain::newFromId( $domain );
1140  // Connection handles required to be in auto-commit mode use a separate connection
1141  // pool since the main pool is effected by implicit and explicit transaction rounds
1142  $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
1143 
1144  if ( $autoCommit ) {
1145  $connFreeKey = self::KEY_FOREIGN_FREE_NOROUND;
1146  $connInUseKey = self::KEY_FOREIGN_INUSE_NOROUND;
1147  } else {
1148  $connFreeKey = self::KEY_FOREIGN_FREE;
1149  $connInUseKey = self::KEY_FOREIGN_INUSE;
1150  }
1151 
1153  $conn = null;
1154 
1155  if ( isset( $this->conns[$connInUseKey][$i][$domain] ) ) {
1156  // Reuse an in-use connection for the same domain
1157  $conn = $this->conns[$connInUseKey][$i][$domain];
1158  $this->connLogger->debug( __METHOD__ . ": reusing connection $i/$domain" );
1159  } elseif ( isset( $this->conns[$connFreeKey][$i][$domain] ) ) {
1160  // Reuse a free connection for the same domain
1161  $conn = $this->conns[$connFreeKey][$i][$domain];
1162  unset( $this->conns[$connFreeKey][$i][$domain] );
1163  $this->conns[$connInUseKey][$i][$domain] = $conn;
1164  $this->connLogger->debug( __METHOD__ . ": reusing free connection $i/$domain" );
1165  } elseif ( !empty( $this->conns[$connFreeKey][$i] ) ) {
1166  // Reuse a free connection from another domain if possible
1167  foreach ( $this->conns[$connFreeKey][$i] as $oldDomain => $conn ) {
1168  if ( $domainInstance->getDatabase() !== null ) {
1169  // Check if changing the database will require a new connection.
1170  // In that case, leave the connection handle alone and keep looking.
1171  // This prevents connections from being closed mid-transaction and can
1172  // also avoid overhead if the same database will later be requested.
1173  if (
1174  $conn->databasesAreIndependent() &&
1175  $conn->getDBname() !== $domainInstance->getDatabase()
1176  ) {
1177  continue;
1178  }
1179  // Select the new database, schema, and prefix
1180  $conn->selectDomain( $domainInstance );
1181  } else {
1182  // Stay on the current database, but update the schema/prefix
1183  $conn->dbSchema( $domainInstance->getSchema() );
1184  $conn->tablePrefix( $domainInstance->getTablePrefix() );
1185  }
1186  unset( $this->conns[$connFreeKey][$i][$oldDomain] );
1187  // Note that if $domain is an empty string, getDomainID() might not match it
1188  $this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
1189  $this->connLogger->debug( __METHOD__ .
1190  ": reusing free connection from $oldDomain for $domain" );
1191  break;
1192  }
1193  }
1194 
1195  if ( !$conn ) {
1196  // Open a new connection
1197  $server = $this->getServerInfoStrict( $i );
1198  $server['serverIndex'] = $i;
1199  $server['foreignPoolRefCount'] = 0;
1200  $server['foreign'] = true;
1201  $server['autoCommitOnly'] = $autoCommit;
1202  $conn = $this->reallyOpenConnection( $server, $domainInstance );
1203  if ( !$conn->isOpen() ) {
1204  $this->connLogger->warning( __METHOD__ . ": connection error for $i/$domain" );
1205  $this->errorConnection = $conn;
1206  $conn = false;
1207  } else {
1208  // Note that if $domain is an empty string, getDomainID() might not match it
1209  $this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
1210  $this->connLogger->debug( __METHOD__ . ": opened new connection for $i/$domain" );
1211  }
1212  }
1213 
1214  if ( $conn instanceof IDatabase ) {
1215  // Final sanity check to make sure the right domain is selected
1216  if ( !$domainInstance->isCompatible( $conn->getDomainID() ) ) {
1217  throw new UnexpectedValueException(
1218  "Got connection to '{$conn->getDomainID()}', but expected '$domain'" );
1219  }
1220  // Increment reference count
1221  $refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
1222  $conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 );
1223  }
1224 
1225  return $conn;
1226  }
1227 
1228  public function getServerAttributes( $i ) {
1230  $this->getServerType( $i ),
1231  $this->servers[$i]['driver'] ?? null
1232  );
1233  }
1234 
1241  private function isOpen( $index ) {
1242  return (bool)$this->getAnyOpenConnection( $index );
1243  }
1244 
1256  protected function reallyOpenConnection( array $server, DatabaseDomain $domain ) {
1257  if ( $this->disabled ) {
1258  throw new DBAccessError();
1259  }
1260 
1261  if ( $domain->getDatabase() === null ) {
1262  // The database domain does not specify a DB name and some database systems require a
1263  // valid DB specified on connection. The $server configuration array contains a default
1264  // DB name to use for connections in such cases.
1265  if ( $server['type'] === 'mysql' ) {
1266  // For MySQL, DATABASE and SCHEMA are synonyms, connections need not specify a DB,
1267  // and the DB name in $server might not exist due to legacy reasons (the default
1268  // domain used to ignore the local LB domain, even when mismatched).
1269  $server['dbname'] = null;
1270  }
1271  } else {
1272  $server['dbname'] = $domain->getDatabase();
1273  }
1274 
1275  if ( $domain->getSchema() !== null ) {
1276  $server['schema'] = $domain->getSchema();
1277  }
1278 
1279  // It is always possible to connect with any prefix, even the empty string
1280  $server['tablePrefix'] = $domain->getTablePrefix();
1281 
1282  // Let the handle know what the cluster master is (e.g. "db1052")
1283  $masterName = $this->getServerName( $this->getWriterIndex() );
1284  $server['clusterMasterHost'] = $masterName;
1285 
1286  $server['srvCache'] = $this->srvCache;
1287  // Set loggers and profilers
1288  $server['connLogger'] = $this->connLogger;
1289  $server['queryLogger'] = $this->queryLogger;
1290  $server['errorLogger'] = $this->errorLogger;
1291  $server['deprecationLogger'] = $this->deprecationLogger;
1292  $server['profiler'] = $this->profiler;
1293  $server['trxProfiler'] = $this->trxProfiler;
1294  // Use the same agent and PHP mode for all DB handles
1295  $server['cliMode'] = $this->cliMode;
1296  $server['agent'] = $this->agent;
1297  // Use DBO_DEFAULT flags by default for LoadBalancer managed databases. Assume that the
1298  // application calls LoadBalancer::commitMasterChanges() before the PHP script completes.
1299  $server['flags'] = $server['flags'] ?? IDatabase::DBO_DEFAULT;
1300 
1301  // Create a live connection object
1302  try {
1303  $db = Database::factory( $server['type'], $server );
1304  // Log when many connection are made on requests
1306  $currentConnCount = $this->getCurrentConnectionCount();
1307  if ( $currentConnCount >= self::CONN_HELD_WARN_THRESHOLD ) {
1308  $this->perfLogger->warning(
1309  __METHOD__ . ": {connections}+ connections made (master={masterdb})",
1310  [ 'connections' => $currentConnCount, 'masterdb' => $masterName ]
1311  );
1312  }
1313  } catch ( DBConnectionError $e ) {
1314  // FIXME: This is probably the ugliest thing I have ever done to
1315  // PHP. I'm half-expecting it to segfault, just out of disgust. -- TS
1316  $db = $e->db;
1317  }
1318 
1319  $db->setLBInfo( $server );
1320  $db->setLazyMasterHandle(
1321  $this->getLazyConnectionRef( self::DB_MASTER, [], $db->getDomainID() )
1322  );
1323  $db->setTableAliases( $this->tableAliases );
1324  $db->setIndexAliases( $this->indexAliases );
1325 
1326  if ( $server['serverIndex'] === $this->getWriterIndex() ) {
1327  if ( $this->trxRoundId !== false ) {
1328  $this->applyTransactionRoundFlags( $db );
1329  }
1330  foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
1331  $db->setTransactionListener( $name, $callback );
1332  }
1333  }
1334 
1335  $this->lazyLoadReplicationPositions(); // session consistency
1336 
1337  return $db;
1338  }
1339 
1343  private function lazyLoadReplicationPositions() {
1344  if ( !$this->connectionAttempted && $this->chronologyCallback ) {
1345  $this->connectionAttempted = true;
1346  ( $this->chronologyCallback )( $this ); // generally calls waitFor()
1347  $this->connLogger->debug( __METHOD__ . ': executed chronology callback.' );
1348  }
1349  }
1350 
1354  private function reportConnectionError() {
1355  $conn = $this->errorConnection; // the connection which caused the error
1356  $context = [
1357  'method' => __METHOD__,
1358  'last_error' => $this->lastError,
1359  ];
1360 
1361  if ( $conn instanceof IDatabase ) {
1362  $context['db_server'] = $conn->getServer();
1363  $this->connLogger->warning(
1364  __METHOD__ . ": connection error: {last_error} ({db_server})",
1365  $context
1366  );
1367 
1368  throw new DBConnectionError( $conn, "{$this->lastError} ({$context['db_server']})" );
1369  } else {
1370  // No last connection, probably due to all servers being too busy
1371  $this->connLogger->error(
1372  __METHOD__ .
1373  ": LB failure with no last connection. Connection error: {last_error}",
1374  $context
1375  );
1376 
1377  // If all servers were busy, "lastError" will contain something sensible
1378  throw new DBConnectionError( null, $this->lastError );
1379  }
1380  }
1381 
1382  public function getWriterIndex() {
1383  return 0;
1384  }
1385 
1393  public function haveIndex( $i ) {
1394  return array_key_exists( $i, $this->servers );
1395  }
1396 
1404  public function isNonZeroLoad( $i ) {
1405  return ( isset( $this->servers[$i] ) && $this->groupLoads[self::GROUP_GENERIC][$i] > 0 );
1406  }
1407 
1408  public function getServerCount() {
1409  return count( $this->servers );
1410  }
1411 
1412  public function hasReplicaServers() {
1413  return ( $this->getServerCount() > 1 );
1414  }
1415 
1416  public function hasStreamingReplicaServers() {
1417  foreach ( $this->servers as $i => $server ) {
1418  if ( $i !== $this->getWriterIndex() && empty( $server['is static'] ) ) {
1419  return true;
1420  }
1421  }
1422 
1423  return false;
1424  }
1425 
1426  public function getServerName( $i ) {
1427  $name = $this->servers[$i]['hostName'] ?? ( $this->servers[$i]['host'] ?? '' );
1428 
1429  return ( $name != '' ) ? $name : 'localhost';
1430  }
1431 
1432  public function getServerInfo( $i ) {
1433  return $this->servers[$i] ?? false;
1434  }
1435 
1436  public function getServerType( $i ) {
1437  return $this->servers[$i]['type'] ?? 'unknown';
1438  }
1439 
1440  public function getMasterPos() {
1441  $index = $this->getWriterIndex();
1442 
1443  $conn = $this->getAnyOpenConnection( $index );
1444  if ( $conn ) {
1445  return $conn->getMasterPos();
1446  }
1447 
1448  $conn = $this->getConnection( $index, self::CONN_SILENCE_ERRORS );
1449  if ( !$conn ) {
1450  $this->reportConnectionError();
1451  return null; // unreachable due to exception
1452  }
1453 
1454  try {
1455  $pos = $conn->getMasterPos();
1456  } finally {
1457  $this->closeConnection( $conn );
1458  }
1459 
1460  return $pos;
1461  }
1462 
1463  public function getReplicaResumePos() {
1464  // Get the position of any existing master server connection
1465  $masterConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
1466  if ( $masterConn ) {
1467  return $masterConn->getMasterPos();
1468  }
1469 
1470  // Get the highest position of any existing replica server connection
1471  $highestPos = false;
1472  $serverCount = $this->getServerCount();
1473  for ( $i = 1; $i < $serverCount; $i++ ) {
1474  if ( !empty( $this->servers[$i]['is static'] ) ) {
1475  continue; // server does not use replication
1476  }
1477 
1478  $conn = $this->getAnyOpenConnection( $i );
1479  $pos = $conn ? $conn->getReplicaPos() : false;
1480  if ( !$pos ) {
1481  continue; // no open connection or could not get position
1482  }
1483 
1484  $highestPos = $highestPos ?: $pos;
1485  if ( $pos->hasReached( $highestPos ) ) {
1486  $highestPos = $pos;
1487  }
1488  }
1489 
1490  return $highestPos;
1491  }
1492 
1493  public function disable() {
1494  $this->closeAll();
1495  $this->disabled = true;
1496  }
1497 
1498  public function closeAll() {
1499  $fname = __METHOD__;
1500  $this->forEachOpenConnection( function ( IDatabase $conn ) use ( $fname ) {
1501  $host = $conn->getServer();
1502  $this->connLogger->debug(
1503  $fname . ": closing connection to database '$host'." );
1504  $conn->close();
1505  } );
1506 
1507  $this->conns = self::newTrackedConnectionsArray();
1508  }
1509 
1510  public function closeConnection( IDatabase $conn ) {
1511  if ( $conn instanceof DBConnRef ) {
1512  // Avoid calling close() but still leaving the handle in the pool
1513  throw new RuntimeException( 'Cannot close DBConnRef instance; it must be shareable' );
1514  }
1515 
1516  $serverIndex = $conn->getLBInfo( 'serverIndex' );
1517  foreach ( $this->conns as $type => $connsByServer ) {
1518  if ( !isset( $connsByServer[$serverIndex] ) ) {
1519  continue;
1520  }
1521 
1522  foreach ( $connsByServer[$serverIndex] as $i => $trackedConn ) {
1523  if ( $conn === $trackedConn ) {
1524  $host = $this->getServerName( $i );
1525  $this->connLogger->debug(
1526  __METHOD__ . ": closing connection to database $i at '$host'." );
1527  unset( $this->conns[$type][$serverIndex][$i] );
1528  break 2;
1529  }
1530  }
1531  }
1532 
1533  $conn->close();
1534  }
1535 
1536  public function commitAll( $fname = __METHOD__, $owner = null ) {
1537  $this->commitMasterChanges( $fname, $owner );
1538  $this->flushMasterSnapshots( $fname );
1539  $this->flushReplicaSnapshots( $fname );
1540  }
1541 
1542  public function finalizeMasterChanges( $fname = __METHOD__, $owner = null ) {
1543  $this->assertOwnership( $fname, $owner );
1544  $this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] );
1545 
1546  $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1547  // Loop until callbacks stop adding callbacks on other connections
1548  $total = 0;
1549  do {
1550  $count = 0; // callbacks execution attempts
1551  $this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$count ) {
1552  // Run any pre-commit callbacks while leaving the post-commit ones suppressed.
1553  // Any error should cause all (peer) transactions to be rolled back together.
1554  $count += $conn->runOnTransactionPreCommitCallbacks();
1555  } );
1556  $total += $count;
1557  } while ( $count > 0 );
1558  // Defer post-commit callbacks until after COMMIT/ROLLBACK happens on all handles
1559  $this->forEachOpenMasterConnection( function ( Database $conn ) {
1560  $conn->setTrxEndCallbackSuppression( true );
1561  } );
1562  $this->trxRoundStage = self::ROUND_FINALIZED;
1563 
1564  return $total;
1565  }
1566 
1567  public function approveMasterChanges( array $options, $fname = __METHOD__, $owner = null ) {
1568  $this->assertOwnership( $fname, $owner );
1569  $this->assertTransactionRoundStage( self::ROUND_FINALIZED );
1570 
1571  $limit = $options['maxWriteDuration'] ?? 0;
1572 
1573  $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1574  $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $limit ) {
1575  // If atomic sections or explicit transactions are still open, some caller must have
1576  // caught an exception but failed to properly rollback any changes. Detect that and
1577  // throw and error (causing rollback).
1578  $conn->assertNoOpenTransactions();
1579  // Assert that the time to replicate the transaction will be sane.
1580  // If this fails, then all DB transactions will be rollback back together.
1581  $time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
1582  if ( $limit > 0 && $time > $limit ) {
1583  throw new DBTransactionSizeError(
1584  $conn,
1585  "Transaction spent $time second(s) in writes, exceeding the limit of $limit",
1586  [ $time, $limit ]
1587  );
1588  }
1589  // If a connection sits idle while slow queries execute on another, that connection
1590  // may end up dropped before the commit round is reached. Ping servers to detect this.
1591  if ( $conn->writesOrCallbacksPending() && !$conn->ping() ) {
1592  throw new DBTransactionError(
1593  $conn,
1594  "A connection to the {$conn->getDBname()} database was lost before commit"
1595  );
1596  }
1597  } );
1598  $this->trxRoundStage = self::ROUND_APPROVED;
1599  }
1600 
1601  public function beginMasterChanges( $fname = __METHOD__, $owner = null ) {
1602  $this->assertOwnership( $fname, $owner );
1603  if ( $this->trxRoundId !== false ) {
1604  throw new DBTransactionError(
1605  null,
1606  "$fname: Transaction round '{$this->trxRoundId}' already started"
1607  );
1608  }
1609  $this->assertTransactionRoundStage( self::ROUND_CURSORY );
1610 
1611  // Clear any empty transactions (no writes/callbacks) from the implicit round
1612  $this->flushMasterSnapshots( $fname );
1613 
1614  $this->trxRoundId = $fname;
1615  $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1616  // Mark applicable handles as participating in this explicit transaction round.
1617  // For each of these handles, any writes and callbacks will be tied to a single
1618  // transaction. The (peer) handles will reject begin()/commit() calls unless they
1619  // are part of an en masse commit or an en masse rollback.
1620  $this->forEachOpenMasterConnection( function ( Database $conn ) {
1621  $this->applyTransactionRoundFlags( $conn );
1622  } );
1623  $this->trxRoundStage = self::ROUND_CURSORY;
1624  }
1625 
1626  public function commitMasterChanges( $fname = __METHOD__, $owner = null ) {
1627  $this->assertOwnership( $fname, $owner );
1628  $this->assertTransactionRoundStage( self::ROUND_APPROVED );
1629 
1630  $failures = [];
1631 
1633  $scope = ScopedCallback::newScopedIgnoreUserAbort(); // try to ignore client aborts
1634 
1635  $restore = ( $this->trxRoundId !== false );
1636  $this->trxRoundId = false;
1637  $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1638  // Commit any writes and clear any snapshots as well (callbacks require AUTOCOMMIT).
1639  // Note that callbacks should already be suppressed due to finalizeMasterChanges().
1641  function ( IDatabase $conn ) use ( $fname, &$failures ) {
1642  try {
1643  $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1644  } catch ( DBError $e ) {
1645  ( $this->errorLogger )( $e );
1646  $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
1647  }
1648  }
1649  );
1650  if ( $failures ) {
1651  throw new DBTransactionError(
1652  null,
1653  "$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
1654  );
1655  }
1656  if ( $restore ) {
1657  // Unmark handles as participating in this explicit transaction round
1658  $this->forEachOpenMasterConnection( function ( Database $conn ) {
1659  $this->undoTransactionRoundFlags( $conn );
1660  } );
1661  }
1662  $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
1663  }
1664 
1665  public function runMasterTransactionIdleCallbacks( $fname = __METHOD__, $owner = null ) {
1666  $this->assertOwnership( $fname, $owner );
1667  if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1668  $type = IDatabase::TRIGGER_COMMIT;
1669  } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1670  $type = IDatabase::TRIGGER_ROLLBACK;
1671  } else {
1672  throw new DBTransactionError(
1673  null,
1674  "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1675  );
1676  }
1677 
1678  $oldStage = $this->trxRoundStage;
1679  $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1680 
1681  // Now that the COMMIT/ROLLBACK step is over, enable post-commit callback runs
1682  $this->forEachOpenMasterConnection( function ( Database $conn ) {
1683  $conn->setTrxEndCallbackSuppression( false );
1684  } );
1685 
1686  $e = null; // first exception
1687  $fname = __METHOD__;
1688  // Loop until callbacks stop adding callbacks on other connections
1689  do {
1690  // Run any pending callbacks for each connection...
1691  $count = 0; // callback execution attempts
1693  function ( Database $conn ) use ( $type, &$e, &$count ) {
1694  if ( $conn->trxLevel() ) {
1695  return; // retry in the next iteration, after commit() is called
1696  }
1697  try {
1698  $count += $conn->runOnTransactionIdleCallbacks( $type );
1699  } catch ( Exception $ex ) {
1700  $e = $e ?: $ex;
1701  }
1702  }
1703  );
1704  // Clear out any active transactions left over from callbacks...
1705  $this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$e, $fname ) {
1706  if ( $conn->writesPending() ) {
1707  // A callback from another handle wrote to this one and DBO_TRX is set
1708  $this->queryLogger->warning( $fname . ": found writes pending." );
1709  $fnames = implode( ', ', $conn->pendingWriteAndCallbackCallers() );
1710  $this->queryLogger->warning(
1711  $fname . ": found writes pending ($fnames).",
1712  [
1713  'db_server' => $conn->getServer(),
1714  'db_name' => $conn->getDBname()
1715  ]
1716  );
1717  } elseif ( $conn->trxLevel() ) {
1718  // A callback from another handle read from this one and DBO_TRX is set,
1719  // which can easily happen if there is only one DB (no replicas)
1720  $this->queryLogger->debug( $fname . ": found empty transaction." );
1721  }
1722  try {
1723  $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1724  } catch ( Exception $ex ) {
1725  $e = $e ?: $ex;
1726  }
1727  } );
1728  } while ( $count > 0 );
1729 
1730  $this->trxRoundStage = $oldStage;
1731 
1732  return $e;
1733  }
1734 
1735  public function runMasterTransactionListenerCallbacks( $fname = __METHOD__, $owner = null ) {
1736  $this->assertOwnership( $fname, $owner );
1737  if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1738  $type = IDatabase::TRIGGER_COMMIT;
1739  } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1740  $type = IDatabase::TRIGGER_ROLLBACK;
1741  } else {
1742  throw new DBTransactionError(
1743  null,
1744  "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1745  );
1746  }
1747 
1748  $e = null;
1749 
1750  $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1751  $this->forEachOpenMasterConnection( function ( Database $conn ) use ( $type, &$e ) {
1752  try {
1754  } catch ( Exception $ex ) {
1755  $e = $e ?: $ex;
1756  }
1757  } );
1758  $this->trxRoundStage = self::ROUND_CURSORY;
1759 
1760  return $e;
1761  }
1762 
1763  public function rollbackMasterChanges( $fname = __METHOD__, $owner = null ) {
1764  $this->assertOwnership( $fname, $owner );
1765 
1766  $restore = ( $this->trxRoundId !== false );
1767  $this->trxRoundId = false;
1768  $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1769  $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $fname ) {
1770  $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
1771  } );
1772  if ( $restore ) {
1773  // Unmark handles as participating in this explicit transaction round
1774  $this->forEachOpenMasterConnection( function ( Database $conn ) {
1775  $this->undoTransactionRoundFlags( $conn );
1776  } );
1777  }
1778  $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
1779  }
1780 
1785  private function assertTransactionRoundStage( $stage ) {
1786  $stages = (array)$stage;
1787 
1788  if ( !in_array( $this->trxRoundStage, $stages, true ) ) {
1789  $stageList = implode(
1790  '/',
1791  array_map( function ( $v ) {
1792  return "'$v'";
1793  }, $stages )
1794  );
1795  throw new DBTransactionError(
1796  null,
1797  "Transaction round stage must be $stageList (not '{$this->trxRoundStage}')"
1798  );
1799  }
1800  }
1801 
1807  private function assertOwnership( $fname, $owner ) {
1808  if ( $this->ownerId !== null && $owner !== $this->ownerId ) {
1809  throw new DBTransactionError(
1810  null,
1811  "$fname: LoadBalancer is owned by LBFactory #{$this->ownerId} (got '$owner')."
1812  );
1813  }
1814  }
1815 
1825  private function applyTransactionRoundFlags( Database $conn ) {
1826  if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
1827  return; // transaction rounds do not apply to these connections
1828  }
1829 
1830  if ( $conn->getFlag( $conn::DBO_DEFAULT ) ) {
1831  // DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
1832  // Force DBO_TRX even in CLI mode since a commit round is expected soon.
1833  $conn->setFlag( $conn::DBO_TRX, $conn::REMEMBER_PRIOR );
1834  }
1835 
1836  if ( $conn->getFlag( $conn::DBO_TRX ) ) {
1837  $conn->setLBInfo( 'trxRoundId', $this->trxRoundId );
1838  }
1839  }
1840 
1844  private function undoTransactionRoundFlags( Database $conn ) {
1845  if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
1846  return; // transaction rounds do not apply to these connections
1847  }
1848 
1849  if ( $conn->getFlag( $conn::DBO_TRX ) ) {
1850  $conn->setLBInfo( 'trxRoundId', null ); // remove the round ID
1851  }
1852 
1853  if ( $conn->getFlag( $conn::DBO_DEFAULT ) ) {
1854  $conn->restoreFlags( $conn::RESTORE_PRIOR );
1855  }
1856  }
1857 
1858  public function flushReplicaSnapshots( $fname = __METHOD__ ) {
1859  $this->forEachOpenReplicaConnection( function ( IDatabase $conn ) use ( $fname ) {
1860  $conn->flushSnapshot( $fname );
1861  } );
1862  }
1863 
1864  public function flushMasterSnapshots( $fname = __METHOD__ ) {
1865  $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $fname ) {
1866  $conn->flushSnapshot( $fname );
1867  } );
1868  }
1869 
1874  public function getTransactionRoundStage() {
1875  return $this->trxRoundStage;
1876  }
1877 
1878  public function hasMasterConnection() {
1879  return $this->isOpen( $this->getWriterIndex() );
1880  }
1881 
1882  public function hasMasterChanges() {
1883  $pending = 0;
1884  $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$pending ) {
1885  $pending |= $conn->writesOrCallbacksPending();
1886  } );
1887 
1888  return (bool)$pending;
1889  }
1890 
1891  public function lastMasterChangeTimestamp() {
1892  $lastTime = false;
1893  $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$lastTime ) {
1894  $lastTime = max( $lastTime, $conn->lastDoneWrites() );
1895  } );
1896 
1897  return $lastTime;
1898  }
1899 
1900  public function hasOrMadeRecentMasterChanges( $age = null ) {
1901  $age = ( $age === null ) ? $this->waitTimeout : $age;
1902 
1903  return ( $this->hasMasterChanges()
1904  || $this->lastMasterChangeTimestamp() > microtime( true ) - $age );
1905  }
1906 
1907  public function pendingMasterChangeCallers() {
1908  $fnames = [];
1909  $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$fnames ) {
1910  $fnames = array_merge( $fnames, $conn->pendingWriteCallers() );
1911  } );
1912 
1913  return $fnames;
1914  }
1915 
1916  public function getLaggedReplicaMode( $domain = false ) {
1917  if ( $this->laggedReplicaMode ) {
1918  return true; // stay in lagged replica mode
1919  }
1920 
1921  if ( $this->hasStreamingReplicaServers() ) {
1922  try {
1923  // Set "laggedReplicaMode"
1924  $this->getReaderIndex( self::GROUP_GENERIC, $domain );
1925  } catch ( DBConnectionError $e ) {
1926  // Sanity: avoid expensive re-connect attempts and failures
1927  $this->laggedReplicaMode = true;
1928  }
1929  }
1930 
1931  return $this->laggedReplicaMode;
1932  }
1933 
1934  public function laggedReplicaUsed() {
1935  return $this->laggedReplicaMode;
1936  }
1937 
1938  public function getReadOnlyReason( $domain = false, IDatabase $conn = null ) {
1939  if ( $this->readOnlyReason !== false ) {
1940  return $this->readOnlyReason;
1941  } elseif ( $this->masterRunningReadOnly( $domain, $conn ) ) {
1942  return 'The master database server is running in read-only mode.';
1943  } elseif ( $this->getLaggedReplicaMode( $domain ) ) {
1944  return ( $this->getExistingReaderIndex( self::GROUP_GENERIC ) >= 0 )
1945  ? 'The database is read-only until replication lag decreases.'
1946  : 'The database is read-only until a replica database server becomes reachable.';
1947  }
1948 
1949  return false;
1950  }
1951 
1957  private function masterRunningReadOnly( $domain, IDatabase $conn = null ) {
1959  $masterServer = $this->getServerName( $this->getWriterIndex() );
1960 
1961  return (bool)$cache->getWithSetCallback(
1962  $cache->makeGlobalKey( __CLASS__, 'server-read-only', $masterServer ),
1963  self::TTL_CACHE_READONLY,
1964  function () use ( $domain, $conn ) {
1965  $old = $this->trxProfiler->setSilenced( true );
1966  try {
1967  $index = $this->getWriterIndex();
1968  $dbw = $conn ?: $this->getServerConnection( $index, $domain );
1969  $readOnly = (int)$dbw->serverIsReadOnly();
1970  if ( !$conn ) {
1971  $this->reuseConnection( $dbw );
1972  }
1973  } catch ( DBError $e ) {
1974  $readOnly = 0;
1975  }
1976  $this->trxProfiler->setSilenced( $old );
1977 
1978  return $readOnly;
1979  },
1980  [ 'pcTTL' => $cache::TTL_PROC_LONG, 'busyValue' => 0 ]
1981  );
1982  }
1983 
1984  public function allowLagged( $mode = null ) {
1985  if ( $mode === null ) {
1986  return $this->allowLagged;
1987  }
1988  $this->allowLagged = $mode;
1989 
1990  return $this->allowLagged;
1991  }
1992 
1993  public function pingAll() {
1994  $success = true;
1995  $this->forEachOpenConnection( function ( IDatabase $conn ) use ( &$success ) {
1996  if ( !$conn->ping() ) {
1997  $success = false;
1998  }
1999  } );
2000 
2001  return $success;
2002  }
2003 
2004  public function forEachOpenConnection( $callback, array $params = [] ) {
2005  foreach ( $this->conns as $connsByServer ) {
2006  foreach ( $connsByServer as $serverConns ) {
2007  foreach ( $serverConns as $conn ) {
2008  $callback( $conn, ...$params );
2009  }
2010  }
2011  }
2012  }
2013 
2014  public function forEachOpenMasterConnection( $callback, array $params = [] ) {
2015  $masterIndex = $this->getWriterIndex();
2016  foreach ( $this->conns as $connsByServer ) {
2017  if ( isset( $connsByServer[$masterIndex] ) ) {
2019  foreach ( $connsByServer[$masterIndex] as $conn ) {
2020  $callback( $conn, ...$params );
2021  }
2022  }
2023  }
2024  }
2025 
2026  public function forEachOpenReplicaConnection( $callback, array $params = [] ) {
2027  foreach ( $this->conns as $connsByServer ) {
2028  foreach ( $connsByServer as $i => $serverConns ) {
2029  if ( $i === $this->getWriterIndex() ) {
2030  continue; // skip master
2031  }
2032  foreach ( $serverConns as $conn ) {
2033  $callback( $conn, ...$params );
2034  }
2035  }
2036  }
2037  }
2038 
2042  private function getCurrentConnectionCount() {
2043  $count = 0;
2044  foreach ( $this->conns as $connsByServer ) {
2045  foreach ( $connsByServer as $serverConns ) {
2046  $count += count( $serverConns );
2047  }
2048  }
2049 
2050  return $count;
2051  }
2052 
2053  public function getMaxLag( $domain = false ) {
2054  $host = '';
2055  $maxLag = -1;
2056  $maxIndex = 0;
2057 
2058  if ( $this->hasReplicaServers() ) {
2059  $lagTimes = $this->getLagTimes( $domain );
2060  foreach ( $lagTimes as $i => $lag ) {
2061  if ( $this->groupLoads[self::GROUP_GENERIC][$i] > 0 && $lag > $maxLag ) {
2062  $maxLag = $lag;
2063  $host = $this->getServerInfoStrict( $i, 'host' );
2064  $maxIndex = $i;
2065  }
2066  }
2067  }
2068 
2069  return [ $host, $maxLag, $maxIndex ];
2070  }
2071 
2072  public function getLagTimes( $domain = false ) {
2073  if ( !$this->hasReplicaServers() ) {
2074  return [ $this->getWriterIndex() => 0 ]; // no replication = no lag
2075  }
2076 
2077  $knownLagTimes = []; // map of (server index => 0 seconds)
2078  $indexesWithLag = [];
2079  foreach ( $this->servers as $i => $server ) {
2080  if ( empty( $server['is static'] ) ) {
2081  $indexesWithLag[] = $i; // DB server might have replication lag
2082  } else {
2083  $knownLagTimes[$i] = 0; // DB server is a non-replicating and read-only archive
2084  }
2085  }
2086 
2087  return $this->getLoadMonitor()->getLagTimes( $indexesWithLag, $domain ) + $knownLagTimes;
2088  }
2089 
2105  public function safeGetLag( IDatabase $conn ) {
2106  if ( $conn->getLBInfo( 'is static' ) ) {
2107  return 0; // static dataset
2108  } elseif ( $conn->getLBInfo( 'serverIndex' ) == $this->getWriterIndex() ) {
2109  return 0; // this is the master
2110  }
2111 
2112  return $conn->getLag();
2113  }
2114 
2115  public function waitForMasterPos( IDatabase $conn, $pos = false, $timeout = null ) {
2116  $timeout = max( 1, $timeout ?: $this->waitTimeout );
2117 
2118  if ( $this->getServerCount() <= 1 || !$conn->getLBInfo( 'replica' ) ) {
2119  return true; // server is not a replica DB
2120  }
2121 
2122  if ( !$pos ) {
2123  // Get the current master position, opening a connection if needed
2124  $index = $this->getWriterIndex();
2125  $flags = self::CONN_SILENCE_ERRORS;
2126  $masterConn = $this->getAnyOpenConnection( $index, $flags );
2127  if ( $masterConn ) {
2128  $pos = $masterConn->getMasterPos();
2129  } else {
2130  $masterConn = $this->getServerConnection( $index, self::DOMAIN_ANY, $flags );
2131  if ( !$masterConn ) {
2132  throw new DBReplicationWaitError(
2133  null,
2134  "Could not obtain a master database connection to get the position"
2135  );
2136  }
2137  $pos = $masterConn->getMasterPos();
2138  $this->closeConnection( $masterConn );
2139  }
2140  }
2141 
2142  if ( $pos instanceof DBMasterPos ) {
2143  $start = microtime( true );
2144  $result = $conn->masterPosWait( $pos, $timeout );
2145  $seconds = max( microtime( true ) - $start, 0 );
2146  if ( $result == -1 || is_null( $result ) ) {
2147  $msg = __METHOD__ . ': timed out waiting on {host} pos {pos} [{seconds}s]';
2148  $this->replLogger->warning( $msg, [
2149  'host' => $conn->getServer(),
2150  'pos' => $pos,
2151  'seconds' => round( $seconds, 6 ),
2152  'trace' => ( new RuntimeException() )->getTraceAsString()
2153  ] );
2154  $ok = false;
2155  } else {
2156  $this->replLogger->debug( __METHOD__ . ': done waiting' );
2157  $ok = true;
2158  }
2159  } else {
2160  $ok = false; // something is misconfigured
2161  $this->replLogger->error(
2162  __METHOD__ . ': could not get master pos for {host}',
2163  [
2164  'host' => $conn->getServer(),
2165  'trace' => ( new RuntimeException() )->getTraceAsString()
2166  ]
2167  );
2168  }
2169 
2170  return $ok;
2171  }
2172 
2185  public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = null ) {
2186  return $this->waitForMasterPos( $conn, $pos, $timeout );
2187  }
2188 
2189  public function setTransactionListener( $name, callable $callback = null ) {
2190  if ( $callback ) {
2191  $this->trxRecurringCallbacks[$name] = $callback;
2192  } else {
2193  unset( $this->trxRecurringCallbacks[$name] );
2194  }
2196  function ( IDatabase $conn ) use ( $name, $callback ) {
2197  $conn->setTransactionListener( $name, $callback );
2198  }
2199  );
2200  }
2201 
2202  public function setTableAliases( array $aliases ) {
2203  $this->tableAliases = $aliases;
2204  }
2205 
2206  public function setIndexAliases( array $aliases ) {
2207  $this->indexAliases = $aliases;
2208  }
2209 
2210  public function setLocalDomainPrefix( $prefix ) {
2211  // Find connections to explicit foreign domains still marked as in-use...
2212  $domainsInUse = [];
2213  $this->forEachOpenConnection( function ( IDatabase $conn ) use ( &$domainsInUse ) {
2214  // Once reuseConnection() is called on a handle, its reference count goes from 1 to 0.
2215  // Until then, it is still in use by the caller (explicitly or via DBConnRef scope).
2216  if ( $conn->getLBInfo( 'foreignPoolRefCount' ) > 0 ) {
2217  $domainsInUse[] = $conn->getDomainID();
2218  }
2219  } );
2220 
2221  // Do not switch connections to explicit foreign domains unless marked as safe
2222  if ( $domainsInUse ) {
2223  $domains = implode( ', ', $domainsInUse );
2224  throw new DBUnexpectedError( null,
2225  "Foreign domain connections are still in use ($domains)" );
2226  }
2227 
2228  $this->setLocalDomain( new DatabaseDomain(
2229  $this->localDomain->getDatabase(),
2230  $this->localDomain->getSchema(),
2231  $prefix
2232  ) );
2233 
2234  // Update the prefix for all local connections...
2235  $this->forEachOpenConnection( function ( IDatabase $db ) use ( $prefix ) {
2236  if ( !$db->getLBInfo( 'foreign' ) ) {
2237  $db->tablePrefix( $prefix );
2238  }
2239  } );
2240  }
2241 
2242  public function redefineLocalDomain( $domain ) {
2243  $this->closeAll();
2244 
2245  $this->setLocalDomain( DatabaseDomain::newFromId( $domain ) );
2246  }
2247 
2251  private function setLocalDomain( DatabaseDomain $domain ) {
2252  $this->localDomain = $domain;
2253  // In case a caller assumes that the domain ID is simply <db>-<prefix>, which is almost
2254  // always true, gracefully handle the case when they fail to account for escaping.
2255  if ( $this->localDomain->getTablePrefix() != '' ) {
2256  $this->localDomainIdAlias =
2257  $this->localDomain->getDatabase() . '-' . $this->localDomain->getTablePrefix();
2258  } else {
2259  $this->localDomainIdAlias = $this->localDomain->getDatabase();
2260  }
2261  }
2262 
2269  private function getServerInfoStrict( $i, $field = null ) {
2270  if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
2271  throw new InvalidArgumentException( "No server with index '$i'" );
2272  }
2273 
2274  if ( $field !== null ) {
2275  if ( !array_key_exists( $field, $this->servers[$i] ) ) {
2276  throw new InvalidArgumentException( "No field '$field' in server index '$i'" );
2277  }
2278 
2279  return $this->servers[$i][$field];
2280  }
2281 
2282  return $this->servers[$i];
2283  }
2284 
2285  function __destruct() {
2286  // Avoid connection leaks for sanity
2287  $this->disable();
2288  }
2289 }
2290 
2294 class_alias( LoadBalancer::class, 'LoadBalancer' );
Helper class that detects high-contention DB queries via profiling calls.
getLocalConnection( $i, $flags=0)
Open a connection to a local DB, or return one if it is already open.
flushReplicaSnapshots( $fname=__METHOD__)
Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshots.
setExistingReaderIndex( $group, $index)
Set the server index chosen by the load balancer for use with the given query group.
Database $errorConnection
Connection handle that caused a problem.
runMasterTransactionListenerCallbacks( $fname=__METHOD__, $owner=null)
Run all recurring post-COMMIT/ROLLBACK listener callbacks.
callable $deprecationLogger
Deprecation logger.
getWriterIndex()
Get the server index of the master server.
close()
Close the database connection.
pendingWriteCallers()
Get the list of method names that did write queries for this transaction.
deferred txt A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user For updating the view updating the linked to tables after a etc PHP does not yet have any way to tell the server to actually return and disconnect while still running these but it might have such a feature in the future We handle these by creating a deferred update object and putting those objects on a global list
Definition: deferred.txt:11
enforceConnectionFlags(IDatabase $conn, $flags)
masterRunningReadOnly( $domain, IDatabase $conn=null)
array [] $groupLoads
Map of (group => server index => weight)
getConnectionRef( $i, $groups=[], $domain=false, $flags=0)
Get a live database handle reference for a real or virtual (DB_MASTER/DB_REPLICA) server index...
getLocalDomainID()
Get the local (and default) database domain ID of connection handles.
closeConnection(IDatabase $conn)
Close a connection.
setTransactionListener( $name, callable $callback=null)
Set a callback via IDatabase::setTransactionListener() on all current and future master connections o...
string bool $readOnlyReason
Reason this instance is read-only or false if not.
getFlag( $flag)
Returns a boolean whether the flag $flag is set for this connection.
Definition: Database.php:782
pickAnyOpenConnection( $candidateConns, $autocommit)
getConnectionIndex( $i, array $groups, $domain)
Get the server index to use for a specified server index and query group list.
$success
forEachOpenReplicaConnection( $callback, array $params=[])
Call a function with each open replica DB connection object.
processing should stop and the error should be shown to the user * false
Definition: hooks.txt:187
allowLagged( $mode=null)
Disables/enables lag checks.
getLBInfo( $name=null)
Get properties passed down from the server info array of the load balancer.
setLBInfo( $nameOrArray, $value=null)
Set the entire array or a particular key of the managing load balancer info array.
Definition: Database.php:593
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2147
getReaderIndex( $group=false, $domain=false)
Get the server index of the reader connection for a given group.
getAnyOpenConnection( $i, $flags=0)
Get any open connection to a given server index, local or foreign.
applyTransactionRoundFlags(Database $conn)
Make all DB servers with DBO_DEFAULT/DBO_TRX set join the transaction round.
runMasterTransactionIdleCallbacks( $fname=__METHOD__, $owner=null)
Consume and run all pending post-COMMIT/ROLLBACK callbacks and commit dangling transactions.
string null $defaultGroup
Default query group to use with getConnection()
setLBInfo( $nameOrArray, $value=null)
Set the entire array or a particular key of the managing load balancer info array.
TransactionProfiler $trxProfiler
getServer()
Get the server hostname or IP address.
Definition: Database.php:2392
safeGetLag(IDatabase $conn)
Get the lag in seconds for a given connection, or zero if this load balancer does not have replicatio...
resolveGroups( $groups, $i)
Resolve $groups into a list of query groups defining as having database servers.
lazyLoadReplicationPositions()
Make sure that any "waitForPos" positions are loaded and available to doWait()
setTrxEndCallbackSuppression( $suppress)
Whether to disable running of post-COMMIT/ROLLBACK callbacks.
Definition: Database.php:3558
rollbackMasterChanges( $fname=__METHOD__, $owner=null)
Issue ROLLBACK only on master, only if queries were done on connection.
getReadOnlyReason( $domain=false, IDatabase $conn=null)
commit( $fname=__METHOD__, $flush=self::FLUSHING_ONE)
Commits a transaction previously started using begin().
hasReached(DBMasterPos $pos)
__construct(array $params)
Construct a manager of IDatabase connection objects.
getServerType( $i)
Get DB type of the server with the specified index.
bool $connectionAttempted
Whether any connection has been attempted yet.
trxLevel()
Gets the current transaction level.
Definition: Database.php:527
lastMasterChangeTimestamp()
Get the timestamp of the latest write query done by this thread.
getLag()
Get the amount of replication lag for this database server.
setLocalDomain(DatabaseDomain $domain)
hasOrMadeRecentMasterChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
flushSnapshot( $fname=__METHOD__, $flush=self::FLUSHING_ONE)
Commit any transaction but error out if writes or callbacks are pending.
Helper class used for automatically marking an IDatabase connection as reusable (once it no longer ma...
Definition: DBConnRef.php:29
bool $allowLagged
Whether to disregard replica DB lag as a factor in replica DB selection.
trxLevel()
Gets the current transaction level.
getLBInfo( $name=null)
Get properties passed down from the server info array of the load balancer.
Definition: Database.php:581
hasStreamingReplicaServers()
Whether any replica servers use streaming replication from the master server.
getDomainID()
Return the currently selected domain ID.
see documentation in includes Linker php for Linker::makeImageLink & $time
Definition: hooks.txt:1781
sanitizeConnectionFlags( $flags, $i)
const DB_MASTER
Definition: defines.php:26
waitForMasterPos(IDatabase $conn, $pos=false, $timeout=null)
Wait for a replica DB to reach a specified master position.
array $loadMonitorConfig
The LoadMonitor configuration.
flushMasterSnapshots( $fname=__METHOD__)
Commit all master DB transactions so as to flush any REPEATABLE-READ or SSI snapshots.
int $maxLag
Amount of replication lag, in seconds, that is considered "high".
The index of the header message $result[1]=The index of the body text message $result[2 through n]=Parameters passed to body text message. Please note the header message cannot receive/use parameters. 'ImgAuthModifyHeaders':Executed just before a file is streamed to a user via img_auth.php, allowing headers to be modified beforehand. $title:LinkTarget object & $headers:HTTP headers(name=> value, names are case insensitive). Two headers get special handling:If-Modified-Since(value must be a valid HTTP date) and Range(must be of the form "bytes=(\*-\*)") will be honored when streaming the file. 'ImportHandleLogItemXMLTag':When parsing a XML tag in a log item. Return false to stop further processing of the tag $reader:XMLReader object $logInfo:Array of information 'ImportHandlePageXMLTag':When parsing a XML tag in a page. Return false to stop further processing of the tag $reader:XMLReader object & $pageInfo:Array of information 'ImportHandleRevisionXMLTag':When parsing a XML tag in a page revision. Return false to stop further processing of the tag $reader:XMLReader object $pageInfo:Array of page information $revisionInfo:Array of revision information 'ImportHandleToplevelXMLTag':When parsing a top level XML tag. Return false to stop further processing of the tag $reader:XMLReader object 'ImportHandleUnknownUser':When a user doesn 't exist locally, this hook is called to give extensions an opportunity to auto-create it. If the auto-creation is successful, return false. $name:User name 'ImportHandleUploadXMLTag':When parsing a XML tag in a file upload. Return false to stop further processing of the tag $reader:XMLReader object $revisionInfo:Array of information 'ImportLogInterwikiLink':Hook to change the interwiki link used in log entries and edit summaries for transwiki imports. & $fullInterwikiPrefix:Interwiki prefix, may contain colons. & $pageTitle:String that contains page title. 'ImportSources':Called when reading from the $wgImportSources configuration variable. Can be used to lazy-load the import sources list. & $importSources:The value of $wgImportSources. Modify as necessary. See the comment in DefaultSettings.php for the detail of how to structure this array. 'InfoAction':When building information to display on the action=info page. $context:IContextSource object & $pageInfo:Array of information 'InitializeArticleMaybeRedirect':MediaWiki check to see if title is a redirect. & $title:Title object for the current page & $request:WebRequest & $ignoreRedirect:boolean to skip redirect check & $target:Title/string of redirect target & $article:Article object 'InternalParseBeforeLinks':during Parser 's internalParse method before links but after nowiki/noinclude/includeonly/onlyinclude and other processings. & $parser:Parser object & $text:string containing partially parsed text & $stripState:Parser 's internal StripState object 'InternalParseBeforeSanitize':during Parser 's internalParse method just before the parser removes unwanted/dangerous HTML tags and after nowiki/noinclude/includeonly/onlyinclude and other processings. Ideal for syntax-extensions after template/parser function execution which respect nowiki and HTML-comments. & $parser:Parser object & $text:string containing partially parsed text & $stripState:Parser 's internal StripState object 'InterwikiLoadPrefix':When resolving if a given prefix is an interwiki or not. Return true without providing an interwiki to continue interwiki search. $prefix:interwiki prefix we are looking for. & $iwData:output array describing the interwiki with keys iw_url, iw_local, iw_trans and optionally iw_api and iw_wikiid. 'InvalidateEmailComplete':Called after a user 's email has been invalidated successfully. $user:user(object) whose email is being invalidated 'IRCLineURL':When constructing the URL to use in an IRC notification. Callee may modify $url and $query, URL will be constructed as $url . $query & $url:URL to index.php & $query:Query string $rc:RecentChange object that triggered url generation 'IsFileCacheable':Override the result of Article::isFileCacheable()(if true) & $article:article(object) being checked 'IsTrustedProxy':Override the result of IP::isTrustedProxy() & $ip:IP being check & $result:Change this value to override the result of IP::isTrustedProxy() 'IsUploadAllowedFromUrl':Override the result of UploadFromUrl::isAllowedUrl() $url:URL used to upload from & $allowed:Boolean indicating if uploading is allowed for given URL 'isValidEmailAddr':Override the result of Sanitizer::validateEmail(), for instance to return false if the domain name doesn 't match your organization. $addr:The e-mail address entered by the user & $result:Set this and return false to override the internal checks 'isValidPassword':Override the result of User::isValidPassword() $password:The password entered by the user & $result:Set this and return false to override the internal checks $user:User the password is being validated for 'Language::getMessagesFileName':$code:The language code or the language we 're looking for a messages file for & $file:The messages file path, you can override this to change the location. 'LanguageGetNamespaces':Provide custom ordering for namespaces or remove namespaces. Do not use this hook to add namespaces. Use CanonicalNamespaces for that. & $namespaces:Array of namespaces indexed by their numbers 'LanguageGetTranslatedLanguageNames':Provide translated language names. & $names:array of language code=> language name $code:language of the preferred translations 'LanguageLinks':Manipulate a page 's language links. This is called in various places to allow extensions to define the effective language links for a page. $title:The page 's Title. & $links:Array with elements of the form "language:title" in the order that they will be output. & $linkFlags:Associative array mapping prefixed links to arrays of flags. Currently unused, but planned to provide support for marking individual language links in the UI, e.g. for featured articles. 'LanguageSelector':Hook to change the language selector available on a page. $out:The output page. $cssClassName:CSS class name of the language selector. 'LinkBegin':DEPRECATED since 1.28! Use HtmlPageLinkRendererBegin instead. Used when generating internal and interwiki links in Linker::link(), before processing starts. Return false to skip default processing and return $ret. See documentation for Linker::link() for details on the expected meanings of parameters. $skin:the Skin object $target:the Title that the link is pointing to & $html:the contents that the< a > tag should have(raw HTML) $result
Definition: hooks.txt:1970
pendingWriteQueryDuration( $type=self::ESTIMATE_TOTAL)
Get the time spend running write queries for this transaction.
string [] $indexAliases
Map of (index alias => index)
writesOrCallbacksPending()
Whether there is a transaction open with either possible write queries or unresolved pre-commit/commi...
commitMasterChanges( $fname=__METHOD__, $owner=null)
Issue COMMIT on all open master connections to flush changes and view snapshots.
getDBname()
Get the current DB name.
Definition: Database.php:2388
getRandomNonLagged(array $loads, $domain=false, $maxLag=INF)
rollback( $fname=__METHOD__, $flush=self::FLUSHING_ONE)
Rollback a transaction previously started using begin().
static newEmpty()
Get an instance that wraps EmptyBagOStuff.
setLocalDomainPrefix( $prefix)
Set a new table prefix for the existing local domain ID for testing.
assertNoOpenTransactions()
Assert that all explicit transactions or atomic sections have been closed.
lastDoneWrites()
Returns the last time the connection may have been used for write queries.
getServerCount()
Get the number of servers defined in configuration.
getExistingReaderIndex( $group)
Get the server index chosen by the load balancer for use with the given query group.
disable()
Disable this load balancer.
string $agent
Agent name for query profiling.
getLazyConnectionRef( $i, $groups=[], $domain=false, $flags=0)
Get a database handle reference for a real or virtual (DB_MASTER/DB_REPLICA) server index...
An object representing a master or replica DB position in a replicated setup.
Definition: DBMasterPos.php:12
forEachOpenConnection( $callback, array $params=[])
Call a function with each open connection object.
forEachOpenMasterConnection( $callback, array $params=[])
Call a function with each open connection object to a master.
haveIndex( $i)
Returns true if the specified index is a valid server index.
string bool $trxRoundId
Explicit DBO_TRX transaction round active or false if none.
waitFor( $pos)
Set the master position to reach before the next generic group DB handle query.
setTableAliases(array $aliases)
Make certain table names use their own database, schema, and table prefix when passed into SQL querie...
bool DBMasterPos $waitForPos
Replication sync position or false if not set.
laggedReplicaUsed()
Checks whether the database for generic connections this request was both:
mixed $profiler
Class name or object With profileIn/profileOut methods.
getServerName( $i)
Get the host name or IP address of the server with the specified index.
string $trxRoundStage
Stage of the current transaction round in the transaction round life-cycle.
runOnTransactionIdleCallbacks( $trigger)
Actually consume and run any "on transaction idle/resolution" callbacks.
Definition: Database.php:3572
commitAll( $fname=__METHOD__, $owner=null)
Commit transactions on all open connections.
$cache
Definition: mcc.php:33
$params
null means default in associative array with keys and values unescaped Should be merged with default with a value of false meaning to suppress the attribute in associative array with keys and values unescaped & $options
Definition: hooks.txt:1972
undoTransactionRoundFlags(Database $conn)
runTransactionListenerCallbacks( $trigger)
Actually run any "transaction listener" callbacks.
Definition: Database.php:3716
this hook is for auditing only or null if authentication failed before getting that far or null if we can t even determine that When $user is not null
Definition: hooks.txt:767
getMaxLag( $domain=false)
Get the hostname and lag time of the most-lagged replica server.
setFlag( $flag, $remember=self::REMEMBER_NOTHING)
Set a flag for this connection.
Definition: Database.php:739
hasMasterChanges()
Whether there are pending changes or callbacks in a transaction by this thread.
restoreFlags( $state=self::RESTORE_PRIOR)
Restore the flags to their prior state before the last setFlag/clearFlag call.
Definition: Database.php:769
waitForOne( $pos, $timeout=null)
Set the master wait position and wait for a generic replica DB to catch up to it. ...
hasReplicaServers()
Whether there are any replica servers configured.
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
setTransactionListener( $name, callable $callback=null)
Run a callback after each time any transaction commits or rolls back.
if(defined( 'MW_SETUP_CALLBACK')) $fname
Customization point after all loading (constants, functions, classes, DefaultSettings, LocalSettings).
Definition: Setup.php:131
runOnTransactionPreCommitCallbacks()
Actually consume and run any "on transaction pre-commit" callbacks.
Definition: Database.php:3641
getLaggedReplicaMode( $domain=false)
const DBO_TRX
Definition: defines.php:12
assertOwnership( $fname, $owner)
getForeignConnection( $i, $domain, $flags=0)
Open a connection to a foreign DB, or return one if it is already open.
int $waitTimeout
Seconds to spend waiting on replica DB lag to resolve.
beginMasterChanges( $fname=__METHOD__, $owner=null)
Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
getLoadMonitor()
Get a LoadMonitor instance.
masterPosWait(DBMasterPos $pos, $timeout)
Wait for the replica DB to catch up to a given master position.
approveMasterChanges(array $options, $fname=__METHOD__, $owner=null)
Perform all pre-commit checks for things like replication safety.
reallyOpenConnection(array $server, DatabaseDomain $domain)
Open a new network connection to a server (uncached)
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
const DBO_DEFAULT
Definition: defines.php:13
getMasterPos()
Get the current master replication position.
Database cluster connection, tracking, load balancing, and transaction manager interface.
Class to handle database/prefix specification for IDatabase domains.
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:38
pickReaderIndex(array $loads, $domain=false)
LoggerInterface $queryLogger
setIndexAliases(array $aliases)
Convert certain index names to alternative names before querying the DB.
waitForAll( $pos, $timeout=null)
Set the master wait position and wait for ALL replica DBs to catch up to it.
int [] $readIndexByGroup
The group replica server indexes keyed by group.
you have access to all of the normal MediaWiki so you can get a DB use the etc For full docs on the Maintenance class
Definition: maintenance.txt:52
Relational database abstraction object.
Definition: Database.php:49
static attributesFromType( $dbType, $driver=null)
Definition: Database.php:429
getConnection( $i, $groups=[], $domain=false, $flags=0)
Get a live handle for a real or virtual (DB_MASTER/DB_REPLICA) server index.
callable null $chronologyCallback
Callback to run before the first connection attempt.
pendingWriteAndCallbackCallers()
List the methods that have write queries or callbacks for the current transaction.
Definition: Database.php:710
commit( $fname=__METHOD__, $flush=self::FLUSHING_ONE)
Commits a transaction previously started using begin().
Definition: Database.php:4027
getLagTimes( $domain=false)
Get an estimate of replication lag (in seconds) for each server.
bool $laggedReplicaMode
Whether the generic reader fell back to a lagged replica DB.
array [] $trxRecurringCallbacks
Map of (name => callable)
array [] $servers
Map of (server index => server config array)
do that in ParserLimitReportFormat instead use this to modify the parameters of the image all existing parser cache entries will be invalid To avoid you ll need to handle that somehow(e.g. with the RejectParserCacheValue hook) because MediaWiki won 't do it for you. & $defaults also a ContextSource after deleting those rows but within the same transaction you ll probably need to make sure the header is varied on and they can depend only on the ResourceLoaderContext $context
Definition: hooks.txt:2621
getServer()
Get the server hostname or IP address.
storage can be distributed across multiple servers
Definition: memcached.txt:33
Exception class for attempted DB access.
safeWaitForMasterPos(IDatabase $conn, $pos=false, $timeout=null)
Wait for a replica DB to reach a specified master position.
Database [][][] $conns
Map of (connection category => server index => IDatabase[])
Helper class to handle automatically marking connections as reusable (via RAII pattern) as well handl...
getMaintenanceConnectionRef( $i, $groups=[], $domain=false, $flags=0)
Get a live database handle for a real or virtual (DB_MASTER/DB_REPLICA) server index that can be used...
bool $cliMode
Whether this PHP instance is for a CLI script.
pendingMasterChangeCallers()
Get the list of callers that have pending master changes.
string $hostname
Current server name.
Exception class for replica DB wait errors.
static pickRandom( $weights)
Given an array of non-normalised probabilities, this function will select an element and return the a...
Definition: ArrayUtils.php:66
DatabaseDomain $localDomain
Local DB domain ID and default for selectDB() calls.
Allows to change the fields on the form that will be generated $name
Definition: hooks.txt:271
getServerInfoStrict( $i, $field=null)
string $localDomainIdAlias
Alternate local DB domain instead of DatabaseDomain::getId()
isNonZeroLoad( $i)
Returns true if the specified index is valid and has non-zero load.
const DB_REPLICA
Definition: defines.php:25
Database connection, tracking, load balancing, and transaction manager for a cluster.
int null $ownerId
Integer ID of the managing LBFactory instance or null if none.
closeAll()
Close all open connections.
redefineLocalDomain( $domain)
Close all connection and redefine the local domain for testing or schema creation.
openConnection( $i, $domain=false, $flags=0)
getServerInfo( $i)
Return the server info structure for a given index or false if the index is invalid.
callable $errorLogger
Exception logger.
doWait( $index, $open=false, $timeout=null)
Wait for a given replica DB to catch up to the master pos stored in "waitForPos". ...
getServerConnection( $i, $domain, $flags=0)
static factory( $type, $params=[], $connect=self::NEW_CONNECTED)
Construct a Database subclass instance given a database type and parameters.
Definition: Database.php:368
tablePrefix( $prefix=null)
Get/set the table prefix.
reuseConnection(IDatabase $conn)
Mark a live handle as being available for reuse under a different database domain.
getReplicaResumePos()
Get the highest DB replication position for chronology control purposes.
clearFlag( $flag, $remember=self::REMEMBER_NOTHING)
Clear a flag for this connection.
string $lastError
The last DB selection or connection error.
Database error base class.
Definition: DBError.php:30
isOpen( $index)
Test if the specified index represents an open connection.
ping(&$rtt=null)
Ping the server and try to reconnect if it there is no connection.
int $connectionCounter
Total number of new connections ever made with this instance.
finalizeMasterChanges( $fname=__METHOD__, $owner=null)
Run pre-commit callbacks and defer execution of post-commit callbacks.