MediaWiki  master
LoadBalancer.php
Go to the documentation of this file.
1 <?php
22 namespace Wikimedia\Rdbms;
23 
35 
41 class LoadBalancer implements ILoadBalancer {
43  private $loadMonitor;
47  private $srvCache;
49  private $wanCache;
51  private $profiler;
53  private $trxProfiler;
55  private $replLogger;
57  private $connLogger;
59  private $queryLogger;
61  private $perfLogger;
63  private $errorLogger;
66 
68  private $localDomain;
69 
71  private $conns;
72 
74  private $servers;
76  private $genericLoads;
78  private $groupLoads;
80  private $allowLagged;
82  private $waitTimeout;
88  private $maxLag;
89 
91  private $hostname;
93  private $cliMode;
95  private $agent;
96 
98  private $tableAliases = [];
100  private $indexAliases = [];
103 
107  private $genericReadIndex = -1;
109  private $readIndexByGroup = [];
111  private $waitForPos;
113  private $laggedReplicaMode = false;
115  private $allReplicasDownMode = false;
117  private $lastError = 'Unknown error';
119  private $readOnlyReason = false;
121  private $connsOpened = 0;
123  private $disabled = false;
125  private $connectionAttempted = false;
126 
128  private $ownerId;
130  private $trxRoundId = false;
132  private $trxRoundStage = self::ROUND_CURSORY;
133 
135  private $defaultGroup = null;
136 
138  const CONN_HELD_WARN_THRESHOLD = 10;
139 
141  const MAX_LAG_DEFAULT = 6;
143  const MAX_WAIT_DEFAULT = 10;
145  const TTL_CACHE_READONLY = 5;
146 
147  const KEY_LOCAL = 'local';
148  const KEY_FOREIGN_FREE = 'foreignFree';
149  const KEY_FOREIGN_INUSE = 'foreignInUse';
150 
151  const KEY_LOCAL_NOROUND = 'localAutoCommit';
152  const KEY_FOREIGN_FREE_NOROUND = 'foreignFreeAutoCommit';
153  const KEY_FOREIGN_INUSE_NOROUND = 'foreignInUseAutoCommit';
154 
156  const ROUND_CURSORY = 'cursory';
158  const ROUND_FINALIZED = 'finalized';
160  const ROUND_APPROVED = 'approved';
162  const ROUND_COMMIT_CALLBACKS = 'commit-callbacks';
164  const ROUND_ROLLBACK_CALLBACKS = 'rollback-callbacks';
166  const ROUND_ERROR = 'error';
167 
168  public function __construct( array $params ) {
169  if ( !isset( $params['servers'] ) ) {
170  throw new InvalidArgumentException( __CLASS__ . ': missing "servers" parameter' );
171  }
172  $this->servers = $params['servers'];
173  foreach ( $this->servers as $i => $server ) {
174  if ( $i == 0 ) {
175  $this->servers[$i]['master'] = true;
176  } else {
177  $this->servers[$i]['replica'] = true;
178  }
179  }
180 
181  $localDomain = isset( $params['localDomain'] )
182  ? DatabaseDomain::newFromId( $params['localDomain'] )
184  $this->setLocalDomain( $localDomain );
185 
186  $this->waitTimeout = $params['waitTimeout'] ?? self::MAX_WAIT_DEFAULT;
187 
188  $this->conns = [
189  // Connection were transaction rounds may be applied
190  self::KEY_LOCAL => [],
191  self::KEY_FOREIGN_INUSE => [],
192  self::KEY_FOREIGN_FREE => [],
193  // Auto-committing counterpart connections that ignore transaction rounds
194  self::KEY_LOCAL_NOROUND => [],
195  self::KEY_FOREIGN_INUSE_NOROUND => [],
196  self::KEY_FOREIGN_FREE_NOROUND => []
197  ];
198  $this->genericLoads = [];
199  $this->waitForPos = false;
200  $this->allowLagged = false;
201 
202  if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
203  $this->readOnlyReason = $params['readOnlyReason'];
204  }
205 
206  $this->maxLag = $params['maxLag'] ?? self::MAX_LAG_DEFAULT;
207 
208  $this->loadMonitorConfig = $params['loadMonitor'] ?? [ 'class' => 'LoadMonitorNull' ];
209  $this->loadMonitorConfig += [ 'lagWarnThreshold' => $this->maxLag ];
210 
211  foreach ( $params['servers'] as $i => $server ) {
212  $this->genericLoads[$i] = $server['load'];
213  if ( isset( $server['groupLoads'] ) ) {
214  foreach ( $server['groupLoads'] as $group => $ratio ) {
215  if ( !isset( $this->groupLoads[$group] ) ) {
216  $this->groupLoads[$group] = [];
217  }
218  $this->groupLoads[$group][$i] = $ratio;
219  }
220  }
221  }
222 
223  $this->srvCache = $params['srvCache'] ?? new EmptyBagOStuff();
224  $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
225  $this->profiler = $params['profiler'] ?? null;
226  $this->trxProfiler = $params['trxProfiler'] ?? new TransactionProfiler();
227 
228  $this->errorLogger = $params['errorLogger'] ?? function ( Exception $e ) {
229  trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
230  };
231  $this->deprecationLogger = $params['deprecationLogger'] ?? function ( $msg ) {
232  trigger_error( $msg, E_USER_DEPRECATED );
233  };
234 
235  foreach ( [ 'replLogger', 'connLogger', 'queryLogger', 'perfLogger' ] as $key ) {
236  $this->$key = $params[$key] ?? new NullLogger();
237  }
238 
239  $this->hostname = $params['hostname'] ?? ( gethostname() ?: 'unknown' );
240  $this->cliMode = $params['cliMode'] ?? ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
241  $this->agent = $params['agent'] ?? '';
242 
243  if ( isset( $params['chronologyCallback'] ) ) {
244  $this->chronologyCallback = $params['chronologyCallback'];
245  }
246 
247  if ( isset( $params['roundStage'] ) ) {
248  if ( $params['roundStage'] === self::STAGE_POSTCOMMIT_CALLBACKS ) {
249  $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
250  } elseif ( $params['roundStage'] === self::STAGE_POSTROLLBACK_CALLBACKS ) {
251  $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
252  }
253  }
254 
255  $this->defaultGroup = $params['defaultGroup'] ?? null;
256  $this->ownerId = $params['ownerId'] ?? null;
257  }
258 
259  public function getLocalDomainID() {
260  return $this->localDomain->getId();
261  }
262 
263  public function resolveDomainID( $domain ) {
264  if ( $domain === $this->localDomainIdAlias || $domain === false ) {
265  // Local connection requested via some backwards-compatibility domain alias
266  return $this->getLocalDomainID();
267  }
268 
269  return (string)$domain;
270  }
271 
276  private function sanitizeConnectionFlags( $flags ) {
277  if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) === self::CONN_TRX_AUTOCOMMIT ) {
278  // Assuming all servers are of the same type (or similar), which is overwhelmingly
279  // the case, use the master server information to get the attributes. The information
280  // for $i cannot be used since it might be DB_REPLICA, which might require connection
281  // attempts in order to be resolved into a real server index.
282  $attributes = $this->getServerAttributes( $this->getWriterIndex() );
283  if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
284  // Callers sometimes want to (a) escape REPEATABLE-READ stateness without locking
285  // rows (e.g. FOR UPDATE) or (b) make small commits during a larger transactions
286  // to reduce lock contention. None of these apply for sqlite and using separate
287  // connections just causes self-deadlocks.
288  $flags &= ~self::CONN_TRX_AUTOCOMMIT;
289  $this->connLogger->info( __METHOD__ .
290  ': ignoring CONN_TRX_AUTOCOMMIT to avoid deadlocks.' );
291  }
292  }
293 
294  return $flags;
295  }
296 
302  private function enforceConnectionFlags( IDatabase $conn, $flags ) {
303  if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT ) {
304  if ( $conn->trxLevel() ) { // sanity
305  throw new DBUnexpectedError(
306  $conn,
307  'Handle requested with CONN_TRX_AUTOCOMMIT yet it has a transaction'
308  );
309  }
310 
311  $conn->clearFlag( $conn::DBO_TRX ); // auto-commit mode
312  }
313  }
314 
320  private function getLoadMonitor() {
321  if ( !isset( $this->loadMonitor ) ) {
322  $compat = [
323  'LoadMonitor' => LoadMonitor::class,
324  'LoadMonitorNull' => LoadMonitorNull::class,
325  'LoadMonitorMySQL' => LoadMonitorMySQL::class,
326  ];
327 
328  $class = $this->loadMonitorConfig['class'];
329  if ( isset( $compat[$class] ) ) {
330  $class = $compat[$class];
331  }
332 
333  $this->loadMonitor = new $class(
334  $this, $this->srvCache, $this->wanCache, $this->loadMonitorConfig );
335  $this->loadMonitor->setLogger( $this->replLogger );
336  }
337 
338  return $this->loadMonitor;
339  }
340 
347  private function getRandomNonLagged( array $loads, $domain = false, $maxLag = INF ) {
348  $lags = $this->getLagTimes( $domain );
349 
350  # Unset excessively lagged servers
351  foreach ( $lags as $i => $lag ) {
352  if ( $i != 0 ) {
353  # How much lag this server nominally is allowed to have
354  $maxServerLag = $this->servers[$i]['max lag'] ?? $this->maxLag; // default
355  # Constrain that futher by $maxLag argument
356  $maxServerLag = min( $maxServerLag, $maxLag );
357 
358  $host = $this->getServerName( $i );
359  if ( $lag === false && !is_infinite( $maxServerLag ) ) {
360  $this->replLogger->debug(
361  __METHOD__ .
362  ": server {host} is not replicating?", [ 'host' => $host ] );
363  unset( $loads[$i] );
364  } elseif ( $lag > $maxServerLag ) {
365  $this->replLogger->debug(
366  __METHOD__ .
367  ": server {host} has {lag} seconds of lag (>= {maxlag})",
368  [ 'host' => $host, 'lag' => $lag, 'maxlag' => $maxServerLag ]
369  );
370  unset( $loads[$i] );
371  }
372  }
373  }
374 
375  # Find out if all the replica DBs with non-zero load are lagged
376  $sum = 0;
377  foreach ( $loads as $load ) {
378  $sum += $load;
379  }
380  if ( $sum == 0 ) {
381  # No appropriate DB servers except maybe the master and some replica DBs with zero load
382  # Do NOT use the master
383  # Instead, this function will return false, triggering read-only mode,
384  # and a lagged replica DB will be used instead.
385  return false;
386  }
387 
388  if ( count( $loads ) == 0 ) {
389  return false;
390  }
391 
392  # Return a random representative of the remainder
393  return ArrayUtils::pickRandom( $loads );
394  }
395 
402  private function getConnectionIndex( $i, $groups, $domain ) {
403  // Check one "group" per default: the generic pool
404  $defaultGroups = $this->defaultGroup ? [ $this->defaultGroup ] : [ false ];
405 
406  $groups = ( $groups === false || $groups === [] )
407  ? $defaultGroups
408  : (array)$groups;
409 
410  if ( $i === self::DB_MASTER ) {
411  $i = $this->getWriterIndex();
412  } elseif ( $i === self::DB_REPLICA ) {
413  # Try to find an available server in any the query groups (in order)
414  foreach ( $groups as $group ) {
415  $groupIndex = $this->getReaderIndex( $group, $domain );
416  if ( $groupIndex !== false ) {
417  $i = $groupIndex;
418  break;
419  }
420  }
421  }
422 
423  # Operation-based index
424  if ( $i === self::DB_REPLICA ) {
425  $this->lastError = 'Unknown error'; // reset error string
426  # Try the general server pool if $groups are unavailable.
427  $i = ( $groups === [ false ] )
428  ? false // don't bother with this if that is what was tried above
429  : $this->getReaderIndex( false, $domain );
430  # Couldn't find a working server in getReaderIndex()?
431  if ( $i === false ) {
432  $this->lastError = 'No working replica DB server: ' . $this->lastError;
433  // Throw an exception
434  $this->reportConnectionError();
435  return null; // unreachable due to exception
436  }
437  }
438 
439  return $i;
440  }
441 
442  public function getReaderIndex( $group = false, $domain = false ) {
443  if ( count( $this->servers ) == 1 ) {
444  // Skip the load balancing if there's only one server
445  return $this->getWriterIndex();
446  }
447 
448  $index = $this->getExistingReaderIndex( $group );
449  if ( $index >= 0 ) {
450  // A reader index was already selected and "waitForPos" was handled
451  return $index;
452  }
453 
454  if ( $group !== false ) {
455  // Use the server weight array for this load group
456  if ( isset( $this->groupLoads[$group] ) ) {
457  $loads = $this->groupLoads[$group];
458  } else {
459  // No loads for this group, return false and the caller can use some other group
460  $this->connLogger->info( __METHOD__ . ": no loads for group $group" );
461 
462  return false;
463  }
464  } else {
465  // Use the generic load group
466  $loads = $this->genericLoads;
467  }
468 
469  // Scale the configured load ratios according to each server's load and state
470  $this->getLoadMonitor()->scaleLoads( $loads, $domain );
471 
472  // Pick a server to use, accounting for weights, load, lag, and "waitForPos"
473  $this->lazyLoadReplicationPositions(); // optimizes server candidate selection
474  list( $i, $laggedReplicaMode ) = $this->pickReaderIndex( $loads, $domain );
475  if ( $i === false ) {
476  // DB connection unsuccessful
477  return false;
478  }
479 
480  // If data seen by queries is expected to reflect the transactions committed as of
481  // or after a given replication position then wait for the DB to apply those changes
482  if ( $this->waitForPos && $i != $this->getWriterIndex() && !$this->doWait( $i ) ) {
483  // Data will be outdated compared to what was expected
484  $laggedReplicaMode = true;
485  }
486 
487  // Cache the reader index for future DB_REPLICA handles
488  $this->setExistingReaderIndex( $group, $i );
489  // Record whether the generic reader index is in "lagged replica DB" mode
490  if ( $group === false && $laggedReplicaMode ) {
491  $this->laggedReplicaMode = true;
492  }
493 
494  $serverName = $this->getServerName( $i );
495  $this->connLogger->debug( __METHOD__ . ": using server $serverName for group '$group'" );
496 
497  return $i;
498  }
499 
506  protected function getExistingReaderIndex( $group ) {
507  if ( $group === false ) {
508  $index = $this->genericReadIndex;
509  } else {
510  $index = $this->readIndexByGroup[$group] ?? -1;
511  }
512 
513  return $index;
514  }
515 
522  private function setExistingReaderIndex( $group, $index ) {
523  if ( $index < 0 ) {
524  throw new UnexpectedValueException( "Cannot set a negative read server index" );
525  }
526 
527  if ( $group === false ) {
528  $this->genericReadIndex = $index;
529  } else {
530  $this->readIndexByGroup[$group] = $index;
531  }
532  }
533 
539  private function pickReaderIndex( array $loads, $domain = false ) {
540  if ( $loads === [] ) {
541  throw new InvalidArgumentException( "Server configuration array is empty" );
542  }
543 
545  $i = false;
547  $laggedReplicaMode = false;
548 
549  // Quickly look through the available servers for a server that meets criteria...
550  $currentLoads = $loads;
551  while ( count( $currentLoads ) ) {
552  if ( $this->allowLagged || $laggedReplicaMode ) {
553  $i = ArrayUtils::pickRandom( $currentLoads );
554  } else {
555  $i = false;
556  if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
557  $this->replLogger->debug( __METHOD__ . ": replication positions detected" );
558  // "chronologyCallback" sets "waitForPos" for session consistency.
559  // This triggers doWait() after connect, so it's especially good to
560  // avoid lagged servers so as to avoid excessive delay in that method.
561  $ago = microtime( true ) - $this->waitForPos->asOfTime();
562  // Aim for <= 1 second of waiting (being too picky can backfire)
563  $i = $this->getRandomNonLagged( $currentLoads, $domain, $ago + 1 );
564  }
565  if ( $i === false ) {
566  // Any server with less lag than it's 'max lag' param is preferable
567  $i = $this->getRandomNonLagged( $currentLoads, $domain );
568  }
569  if ( $i === false && count( $currentLoads ) != 0 ) {
570  // All replica DBs lagged. Switch to read-only mode
571  $this->replLogger->error(
572  __METHOD__ . ": all replica DBs lagged. Switch to read-only mode" );
573  $i = ArrayUtils::pickRandom( $currentLoads );
574  $laggedReplicaMode = true;
575  }
576  }
577 
578  if ( $i === false ) {
579  // pickRandom() returned false.
580  // This is permanent and means the configuration or the load monitor
581  // wants us to return false.
582  $this->connLogger->debug( __METHOD__ . ": pickRandom() returned false" );
583 
584  return [ false, false ];
585  }
586 
587  $serverName = $this->getServerName( $i );
588  $this->connLogger->debug( __METHOD__ . ": Using reader #$i: $serverName..." );
589 
590  $conn = $this->getConnection( $i, [], $domain, self::CONN_SILENCE_ERRORS );
591  if ( !$conn ) {
592  $this->connLogger->warning( __METHOD__ . ": Failed connecting to $i/$domain" );
593  unset( $currentLoads[$i] ); // avoid this server next iteration
594  $i = false;
595  continue;
596  }
597 
598  // Decrement reference counter, we are finished with this connection.
599  // It will be incremented for the caller later.
600  if ( $domain !== false ) {
601  $this->reuseConnection( $conn );
602  }
603 
604  // Return this server
605  break;
606  }
607 
608  // If all servers were down, quit now
609  if ( $currentLoads === [] ) {
610  $this->connLogger->error( __METHOD__ . ": all servers down" );
611  }
612 
613  return [ $i, $laggedReplicaMode ];
614  }
615 
616  public function waitFor( $pos ) {
617  $oldPos = $this->waitForPos;
618  try {
619  $this->waitForPos = $pos;
620  // If a generic reader connection was already established, then wait now
621  if ( $this->genericReadIndex > 0 && !$this->doWait( $this->genericReadIndex ) ) {
622  $this->laggedReplicaMode = true;
623  }
624  // Otherwise, wait until a connection is established in getReaderIndex()
625  } finally {
626  // Restore the older position if it was higher since this is used for lag-protection
627  $this->setWaitForPositionIfHigher( $oldPos );
628  }
629  }
630 
631  public function waitForOne( $pos, $timeout = null ) {
632  $oldPos = $this->waitForPos;
633  try {
634  $this->waitForPos = $pos;
635 
637  if ( $i <= 0 ) {
638  // Pick a generic replica DB if there isn't one yet
639  $readLoads = $this->genericLoads;
640  unset( $readLoads[$this->getWriterIndex()] ); // replica DBs only
641  $readLoads = array_filter( $readLoads ); // with non-zero load
642  $i = ArrayUtils::pickRandom( $readLoads );
643  }
644 
645  if ( $i > 0 ) {
646  $ok = $this->doWait( $i, true, $timeout );
647  } else {
648  $ok = true; // no applicable loads
649  }
650  } finally {
651  # Restore the old position, as this is not used for lag-protection but for throttling
652  $this->waitForPos = $oldPos;
653  }
654 
655  return $ok;
656  }
657 
658  public function waitForAll( $pos, $timeout = null ) {
659  $timeout = $timeout ?: $this->waitTimeout;
660 
661  $oldPos = $this->waitForPos;
662  try {
663  $this->waitForPos = $pos;
664  $serverCount = count( $this->servers );
665 
666  $ok = true;
667  for ( $i = 1; $i < $serverCount; $i++ ) {
668  if ( $this->genericLoads[$i] > 0 ) {
669  $start = microtime( true );
670  $ok = $this->doWait( $i, true, $timeout ) && $ok;
671  $timeout -= intval( microtime( true ) - $start );
672  if ( $timeout <= 0 ) {
673  break; // timeout reached
674  }
675  }
676  }
677  } finally {
678  # Restore the old position, as this is not used for lag-protection but for throttling
679  $this->waitForPos = $oldPos;
680  }
681 
682  return $ok;
683  }
684 
688  private function setWaitForPositionIfHigher( $pos ) {
689  if ( !$pos ) {
690  return;
691  }
692 
693  if ( !$this->waitForPos || $pos->hasReached( $this->waitForPos ) ) {
694  $this->waitForPos = $pos;
695  }
696  }
697 
698  public function getAnyOpenConnection( $i, $flags = 0 ) {
699  $i = ( $i === self::DB_MASTER ) ? $this->getWriterIndex() : $i;
700  $autocommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
701 
702  foreach ( $this->conns as $connsByServer ) {
703  if ( $i === self::DB_REPLICA ) {
704  $indexes = array_keys( $connsByServer );
705  } else {
706  $indexes = isset( $connsByServer[$i] ) ? [ $i ] : [];
707  }
708 
709  foreach ( $indexes as $index ) {
710  foreach ( $connsByServer[$index] as $conn ) {
711  if ( !$conn->isOpen() ) {
712  continue; // some sort of error occured?
713  }
714  if ( !$autocommit || $conn->getLBInfo( 'autoCommitOnly' ) ) {
715  return $conn;
716  }
717  }
718  }
719  }
720 
721  return false;
722  }
723 
731  protected function doWait( $index, $open = false, $timeout = null ) {
732  $timeout = max( 1, intval( $timeout ?: $this->waitTimeout ) );
733 
734  // Check if we already know that the DB has reached this point
735  $server = $this->getServerName( $index );
736  $key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $server, 'v1' );
738  $knownReachedPos = $this->srvCache->get( $key );
739  if (
740  $knownReachedPos instanceof DBMasterPos &&
741  $knownReachedPos->hasReached( $this->waitForPos )
742  ) {
743  $this->replLogger->debug(
744  __METHOD__ .
745  ': replica DB {dbserver} known to be caught up (pos >= $knownReachedPos).',
746  [ 'dbserver' => $server ]
747  );
748  return true;
749  }
750 
751  // Find a connection to wait on, creating one if needed and allowed
752  $close = false; // close the connection afterwards
753  $conn = $this->getAnyOpenConnection( $index );
754  if ( !$conn ) {
755  if ( !$open ) {
756  $this->replLogger->debug(
757  __METHOD__ . ': no connection open for {dbserver}',
758  [ 'dbserver' => $server ]
759  );
760 
761  return false;
762  }
763  // Open a temporary new connection in order to wait for replication
764  $conn = $this->getConnection( $index, [], self::DOMAIN_ANY, self::CONN_SILENCE_ERRORS );
765  if ( !$conn ) {
766  $this->replLogger->warning(
767  __METHOD__ . ': failed to connect to {dbserver}',
768  [ 'dbserver' => $server ]
769  );
770 
771  return false;
772  }
773  // Avoid connection spam in waitForAll() when connections
774  // are made just for the sake of doing this lag check.
775  $close = true;
776  }
777 
778  $this->replLogger->info(
779  __METHOD__ .
780  ': waiting for replica DB {dbserver} to catch up...',
781  [ 'dbserver' => $server ]
782  );
783 
784  $result = $conn->masterPosWait( $this->waitForPos, $timeout );
785 
786  if ( $result === null ) {
787  $this->replLogger->warning(
788  __METHOD__ . ': Errored out waiting on {host} pos {pos}',
789  [
790  'host' => $server,
791  'pos' => $this->waitForPos,
792  'trace' => ( new RuntimeException() )->getTraceAsString()
793  ]
794  );
795  $ok = false;
796  } elseif ( $result == -1 ) {
797  $this->replLogger->warning(
798  __METHOD__ . ': Timed out waiting on {host} pos {pos}',
799  [
800  'host' => $server,
801  'pos' => $this->waitForPos,
802  'trace' => ( new RuntimeException() )->getTraceAsString()
803  ]
804  );
805  $ok = false;
806  } else {
807  $this->replLogger->debug( __METHOD__ . ": done waiting" );
808  $ok = true;
809  // Remember that the DB reached this point
810  $this->srvCache->set( $key, $this->waitForPos, BagOStuff::TTL_DAY );
811  }
812 
813  if ( $close ) {
814  $this->closeConnection( $conn );
815  }
816 
817  return $ok;
818  }
819 
820  public function getConnection( $i, $groups = [], $domain = false, $flags = 0 ) {
821  if ( !is_int( $i ) ) {
822  throw new InvalidArgumentException( "Cannot connect without a server index" );
823  } elseif ( $groups && $i > 0 ) {
824  throw new InvalidArgumentException( "Got query groups with server index #$i" );
825  }
826 
827  $domain = $this->resolveDomainID( $domain );
828  $flags = $this->sanitizeConnectionFlags( $flags );
829  $masterOnly = ( $i == self::DB_MASTER || $i == $this->getWriterIndex() );
830 
831  // Number of connections made before getting the server index and handle
832  $priorConnectionsMade = $this->connsOpened;
833 
834  // Choose a server if $i is DB_MASTER/DB_REPLICA (might trigger new connections)
835  $serverIndex = $this->getConnectionIndex( $i, $groups, $domain );
836  // Get an open connection to that server (might trigger a new connection)
837  $conn = $this->localDomain->equals( $domain )
838  ? $this->getLocalConnection( $serverIndex, $flags )
839  : $this->getForeignConnection( $serverIndex, $domain, $flags );
840  // Throw an error or bail out if the connection attempt failed
841  if ( !( $conn instanceof IDatabase ) ) {
842  if ( ( $flags & self::CONN_SILENCE_ERRORS ) != self::CONN_SILENCE_ERRORS ) {
843  $this->reportConnectionError();
844  }
845 
846  return false;
847  }
848 
849  // Profile any new connections caused by this method
850  if ( $this->connsOpened > $priorConnectionsMade ) {
851  $host = $conn->getServer();
852  $dbname = $conn->getDBname();
853  $this->trxProfiler->recordConnection( $host, $dbname, $masterOnly );
854  }
855 
856  if ( !$conn->isOpen() ) {
857  // Connection was made but later unrecoverably lost for some reason.
858  // Do not return a handle that will just throw exceptions on use,
859  // but let the calling code (e.g. getReaderIndex) try another server.
860  $this->errorConnection = $conn;
861  return false;
862  }
863 
864  $this->enforceConnectionFlags( $conn, $flags );
865  if ( $serverIndex == $this->getWriterIndex() ) {
866  // If the load balancer is read-only, perhaps due to replication lag, then master
867  // DB handles will reflect that. Note that Database::assertIsWritableMaster() takes
868  // care of replica DB handles whereas getReadOnlyReason() would cause infinite loops.
869  $conn->setLBInfo( 'readOnlyReason', $this->getReadOnlyReason( $domain, $conn ) );
870  }
871 
872  return $conn;
873  }
874 
875  public function reuseConnection( IDatabase $conn ) {
876  $serverIndex = $conn->getLBInfo( 'serverIndex' );
877  $refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
878  if ( $serverIndex === null || $refCount === null ) {
889  return;
890  } elseif ( $conn instanceof DBConnRef ) {
891  // DBConnRef already handles calling reuseConnection() and only passes the live
892  // Database instance to this method. Any caller passing in a DBConnRef is broken.
893  $this->connLogger->error(
894  __METHOD__ . ": got DBConnRef instance.\n" .
895  ( new RuntimeException() )->getTraceAsString() );
896 
897  return;
898  }
899 
900  if ( $this->disabled ) {
901  return; // DBConnRef handle probably survived longer than the LoadBalancer
902  }
903 
904  if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
905  $connFreeKey = self::KEY_FOREIGN_FREE_NOROUND;
906  $connInUseKey = self::KEY_FOREIGN_INUSE_NOROUND;
907  } else {
908  $connFreeKey = self::KEY_FOREIGN_FREE;
909  $connInUseKey = self::KEY_FOREIGN_INUSE;
910  }
911 
912  $domain = $conn->getDomainID();
913  if ( !isset( $this->conns[$connInUseKey][$serverIndex][$domain] ) ) {
914  throw new InvalidArgumentException(
915  "Connection $serverIndex/$domain not found; it may have already been freed" );
916  } elseif ( $this->conns[$connInUseKey][$serverIndex][$domain] !== $conn ) {
917  throw new InvalidArgumentException(
918  "Connection $serverIndex/$domain mismatched; it may have already been freed" );
919  }
920 
921  $conn->setLBInfo( 'foreignPoolRefCount', --$refCount );
922  if ( $refCount <= 0 ) {
923  $this->conns[$connFreeKey][$serverIndex][$domain] = $conn;
924  unset( $this->conns[$connInUseKey][$serverIndex][$domain] );
925  if ( !$this->conns[$connInUseKey][$serverIndex] ) {
926  unset( $this->conns[$connInUseKey][$serverIndex] ); // clean up
927  }
928  $this->connLogger->debug( __METHOD__ . ": freed connection $serverIndex/$domain" );
929  } else {
930  $this->connLogger->debug( __METHOD__ .
931  ": reference count for $serverIndex/$domain reduced to $refCount" );
932  }
933  }
934 
935  public function getConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
936  $domain = $this->resolveDomainID( $domain );
937  $role = $this->getRoleFromIndex( $i );
938 
939  return new DBConnRef( $this, $this->getConnection( $i, $groups, $domain, $flags ), $role );
940  }
941 
942  public function getLazyConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
943  $domain = $this->resolveDomainID( $domain );
944  $role = $this->getRoleFromIndex( $i );
945 
946  return new DBConnRef( $this, [ $i, $groups, $domain, $flags ], $role );
947  }
948 
949  public function getMaintenanceConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
950  $domain = $this->resolveDomainID( $domain );
951  $role = $this->getRoleFromIndex( $i );
952 
953  return new MaintainableDBConnRef(
954  $this, $this->getConnection( $i, $groups, $domain, $flags ), $role );
955  }
956 
961  private function getRoleFromIndex( $i ) {
962  return ( $i === self::DB_MASTER || $i === $this->getWriterIndex() )
965  }
966 
974  public function openConnection( $i, $domain = false, $flags = 0 ) {
975  return $this->getConnection( $i, [], $domain, $flags | self::CONN_SILENCE_ERRORS );
976  }
977 
990  private function getLocalConnection( $i, $flags = 0 ) {
991  // Connection handles required to be in auto-commit mode use a separate connection
992  // pool since the main pool is effected by implicit and explicit transaction rounds
993  $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
994 
995  $connKey = $autoCommit ? self::KEY_LOCAL_NOROUND : self::KEY_LOCAL;
996  if ( isset( $this->conns[$connKey][$i][0] ) ) {
997  $conn = $this->conns[$connKey][$i][0];
998  } else {
999  if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
1000  throw new InvalidArgumentException( "No server with index '$i'" );
1001  }
1002  // Open a new connection
1003  $server = $this->servers[$i];
1004  $server['serverIndex'] = $i;
1005  $server['autoCommitOnly'] = $autoCommit;
1006  $conn = $this->reallyOpenConnection( $server, $this->localDomain );
1007  $host = $this->getServerName( $i );
1008  if ( $conn->isOpen() ) {
1009  $this->connLogger->debug(
1010  __METHOD__ . ": connected to database $i at '$host'." );
1011  $this->conns[$connKey][$i][0] = $conn;
1012  } else {
1013  $this->connLogger->warning(
1014  __METHOD__ . ": failed to connect to database $i at '$host'." );
1015  $this->errorConnection = $conn;
1016  $conn = false;
1017  }
1018  }
1019 
1020  // Final sanity check to make sure the right domain is selected
1021  if (
1022  $conn instanceof IDatabase &&
1023  !$this->localDomain->isCompatible( $conn->getDomainID() )
1024  ) {
1025  throw new UnexpectedValueException(
1026  "Got connection to '{$conn->getDomainID()}', " .
1027  "but expected local domain ('{$this->localDomain}')" );
1028  }
1029 
1030  return $conn;
1031  }
1032 
1055  private function getForeignConnection( $i, $domain, $flags = 0 ) {
1056  $domainInstance = DatabaseDomain::newFromId( $domain );
1057  // Connection handles required to be in auto-commit mode use a separate connection
1058  // pool since the main pool is effected by implicit and explicit transaction rounds
1059  $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
1060 
1061  if ( $autoCommit ) {
1062  $connFreeKey = self::KEY_FOREIGN_FREE_NOROUND;
1063  $connInUseKey = self::KEY_FOREIGN_INUSE_NOROUND;
1064  } else {
1065  $connFreeKey = self::KEY_FOREIGN_FREE;
1066  $connInUseKey = self::KEY_FOREIGN_INUSE;
1067  }
1068 
1070  $conn = null;
1071 
1072  if ( isset( $this->conns[$connInUseKey][$i][$domain] ) ) {
1073  // Reuse an in-use connection for the same domain
1074  $conn = $this->conns[$connInUseKey][$i][$domain];
1075  $this->connLogger->debug( __METHOD__ . ": reusing connection $i/$domain" );
1076  } elseif ( isset( $this->conns[$connFreeKey][$i][$domain] ) ) {
1077  // Reuse a free connection for the same domain
1078  $conn = $this->conns[$connFreeKey][$i][$domain];
1079  unset( $this->conns[$connFreeKey][$i][$domain] );
1080  $this->conns[$connInUseKey][$i][$domain] = $conn;
1081  $this->connLogger->debug( __METHOD__ . ": reusing free connection $i/$domain" );
1082  } elseif ( !empty( $this->conns[$connFreeKey][$i] ) ) {
1083  // Reuse a free connection from another domain if possible
1084  foreach ( $this->conns[$connFreeKey][$i] as $oldDomain => $conn ) {
1085  if ( $domainInstance->getDatabase() !== null ) {
1086  // Check if changing the database will require a new connection.
1087  // In that case, leave the connection handle alone and keep looking.
1088  // This prevents connections from being closed mid-transaction and can
1089  // also avoid overhead if the same database will later be requested.
1090  if (
1091  $conn->databasesAreIndependent() &&
1092  $conn->getDBname() !== $domainInstance->getDatabase()
1093  ) {
1094  continue;
1095  }
1096  // Select the new database, schema, and prefix
1097  $conn->selectDomain( $domainInstance );
1098  } else {
1099  // Stay on the current database, but update the schema/prefix
1100  $conn->dbSchema( $domainInstance->getSchema() );
1101  $conn->tablePrefix( $domainInstance->getTablePrefix() );
1102  }
1103  unset( $this->conns[$connFreeKey][$i][$oldDomain] );
1104  // Note that if $domain is an empty string, getDomainID() might not match it
1105  $this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
1106  $this->connLogger->debug( __METHOD__ .
1107  ": reusing free connection from $oldDomain for $domain" );
1108  break;
1109  }
1110  }
1111 
1112  if ( !$conn ) {
1113  if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
1114  throw new InvalidArgumentException( "No server with index '$i'" );
1115  }
1116  // Open a new connection
1117  $server = $this->servers[$i];
1118  $server['serverIndex'] = $i;
1119  $server['foreignPoolRefCount'] = 0;
1120  $server['foreign'] = true;
1121  $server['autoCommitOnly'] = $autoCommit;
1122  $conn = $this->reallyOpenConnection( $server, $domainInstance );
1123  if ( !$conn->isOpen() ) {
1124  $this->connLogger->warning( __METHOD__ . ": connection error for $i/$domain" );
1125  $this->errorConnection = $conn;
1126  $conn = false;
1127  } else {
1128  // Note that if $domain is an empty string, getDomainID() might not match it
1129  $this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
1130  $this->connLogger->debug( __METHOD__ . ": opened new connection for $i/$domain" );
1131  }
1132  }
1133 
1134  if ( $conn instanceof IDatabase ) {
1135  // Final sanity check to make sure the right domain is selected
1136  if ( !$domainInstance->isCompatible( $conn->getDomainID() ) ) {
1137  throw new UnexpectedValueException(
1138  "Got connection to '{$conn->getDomainID()}', but expected '$domain'" );
1139  }
1140  // Increment reference count
1141  $refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
1142  $conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 );
1143  }
1144 
1145  return $conn;
1146  }
1147 
1148  public function getServerAttributes( $i ) {
1150  $this->getServerType( $i ),
1151  $this->servers[$i]['driver'] ?? null
1152  );
1153  }
1154 
1162  private function isOpen( $index ) {
1163  if ( !is_int( $index ) ) {
1164  return false;
1165  }
1166 
1167  return (bool)$this->getAnyOpenConnection( $index );
1168  }
1169 
1181  protected function reallyOpenConnection( array $server, DatabaseDomain $domain ) {
1182  if ( $this->disabled ) {
1183  throw new DBAccessError();
1184  }
1185 
1186  if ( $domain->getDatabase() === null ) {
1187  // The database domain does not specify a DB name and some database systems require a
1188  // valid DB specified on connection. The $server configuration array contains a default
1189  // DB name to use for connections in such cases.
1190  if ( $server['type'] === 'mysql' ) {
1191  // For MySQL, DATABASE and SCHEMA are synonyms, connections need not specify a DB,
1192  // and the DB name in $server might not exist due to legacy reasons (the default
1193  // domain used to ignore the local LB domain, even when mismatched).
1194  $server['dbname'] = null;
1195  }
1196  } else {
1197  $server['dbname'] = $domain->getDatabase();
1198  }
1199 
1200  if ( $domain->getSchema() !== null ) {
1201  $server['schema'] = $domain->getSchema();
1202  }
1203 
1204  // It is always possible to connect with any prefix, even the empty string
1205  $server['tablePrefix'] = $domain->getTablePrefix();
1206 
1207  // Let the handle know what the cluster master is (e.g. "db1052")
1208  $masterName = $this->getServerName( $this->getWriterIndex() );
1209  $server['clusterMasterHost'] = $masterName;
1210 
1211  // Log when many connection are made on requests
1212  if ( ++$this->connsOpened >= self::CONN_HELD_WARN_THRESHOLD ) {
1213  $this->perfLogger->warning( __METHOD__ . ": " .
1214  "{$this->connsOpened}+ connections made (master=$masterName)" );
1215  }
1216 
1217  $server['srvCache'] = $this->srvCache;
1218  // Set loggers and profilers
1219  $server['connLogger'] = $this->connLogger;
1220  $server['queryLogger'] = $this->queryLogger;
1221  $server['errorLogger'] = $this->errorLogger;
1222  $server['deprecationLogger'] = $this->deprecationLogger;
1223  $server['profiler'] = $this->profiler;
1224  $server['trxProfiler'] = $this->trxProfiler;
1225  // Use the same agent and PHP mode for all DB handles
1226  $server['cliMode'] = $this->cliMode;
1227  $server['agent'] = $this->agent;
1228  // Use DBO_DEFAULT flags by default for LoadBalancer managed databases. Assume that the
1229  // application calls LoadBalancer::commitMasterChanges() before the PHP script completes.
1230  $server['flags'] = $server['flags'] ?? IDatabase::DBO_DEFAULT;
1231 
1232  // Create a live connection object
1233  try {
1234  $db = Database::factory( $server['type'], $server );
1235  } catch ( DBConnectionError $e ) {
1236  // FIXME: This is probably the ugliest thing I have ever done to
1237  // PHP. I'm half-expecting it to segfault, just out of disgust. -- TS
1238  $db = $e->db;
1239  }
1240 
1241  $db->setLBInfo( $server );
1242  $db->setLazyMasterHandle(
1243  $this->getLazyConnectionRef( self::DB_MASTER, [], $db->getDomainID() )
1244  );
1245  $db->setTableAliases( $this->tableAliases );
1246  $db->setIndexAliases( $this->indexAliases );
1247 
1248  if ( $server['serverIndex'] === $this->getWriterIndex() ) {
1249  if ( $this->trxRoundId !== false ) {
1250  $this->applyTransactionRoundFlags( $db );
1251  }
1252  foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
1253  $db->setTransactionListener( $name, $callback );
1254  }
1255  }
1256 
1257  $this->lazyLoadReplicationPositions(); // session consistency
1258 
1259  return $db;
1260  }
1261 
1265  private function lazyLoadReplicationPositions() {
1266  if ( !$this->connectionAttempted && $this->chronologyCallback ) {
1267  $this->connectionAttempted = true;
1268  ( $this->chronologyCallback )( $this ); // generally calls waitFor()
1269  $this->connLogger->debug( __METHOD__ . ': executed chronology callback.' );
1270  }
1271  }
1272 
1276  private function reportConnectionError() {
1277  $conn = $this->errorConnection; // the connection which caused the error
1278  $context = [
1279  'method' => __METHOD__,
1280  'last_error' => $this->lastError,
1281  ];
1282 
1283  if ( $conn instanceof IDatabase ) {
1284  $context['db_server'] = $conn->getServer();
1285  $this->connLogger->warning(
1286  __METHOD__ . ": connection error: {last_error} ({db_server})",
1287  $context
1288  );
1289 
1290  throw new DBConnectionError( $conn, "{$this->lastError} ({$context['db_server']})" );
1291  } else {
1292  // No last connection, probably due to all servers being too busy
1293  $this->connLogger->error(
1294  __METHOD__ .
1295  ": LB failure with no last connection. Connection error: {last_error}",
1296  $context
1297  );
1298 
1299  // If all servers were busy, "lastError" will contain something sensible
1300  throw new DBConnectionError( null, $this->lastError );
1301  }
1302  }
1303 
1304  public function getWriterIndex() {
1305  return 0;
1306  }
1307 
1308  public function haveIndex( $i ) {
1309  return array_key_exists( $i, $this->servers );
1310  }
1311 
1312  public function isNonZeroLoad( $i ) {
1313  return array_key_exists( $i, $this->servers ) && $this->genericLoads[$i] != 0;
1314  }
1315 
1316  public function getServerCount() {
1317  return count( $this->servers );
1318  }
1319 
1320  public function getServerName( $i ) {
1321  $name = $this->servers[$i]['hostName'] ?? $this->servers[$i]['host'] ?? '';
1322 
1323  return ( $name != '' ) ? $name : 'localhost';
1324  }
1325 
1326  public function getServerInfo( $i ) {
1327  return $this->servers[$i] ?? false;
1328  }
1329 
1330  public function getServerType( $i ) {
1331  return $this->servers[$i]['type'] ?? 'unknown';
1332  }
1333 
1334  public function getMasterPos() {
1335  # If this entire request was served from a replica DB without opening a connection to the
1336  # master (however unlikely that may be), then we can fetch the position from the replica DB.
1337  $masterConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
1338  if ( !$masterConn ) {
1339  $serverCount = count( $this->servers );
1340  for ( $i = 1; $i < $serverCount; $i++ ) {
1341  $conn = $this->getAnyOpenConnection( $i );
1342  if ( $conn ) {
1343  return $conn->getReplicaPos();
1344  }
1345  }
1346  } else {
1347  return $masterConn->getMasterPos();
1348  }
1349 
1350  return false;
1351  }
1352 
1353  public function disable() {
1354  $this->closeAll();
1355  $this->disabled = true;
1356  }
1357 
1358  public function closeAll() {
1359  $fname = __METHOD__;
1360  $this->forEachOpenConnection( function ( IDatabase $conn ) use ( $fname ) {
1361  $host = $conn->getServer();
1362  $this->connLogger->debug(
1363  $fname . ": closing connection to database '$host'." );
1364  $conn->close();
1365  } );
1366 
1367  $this->conns = [
1368  self::KEY_LOCAL => [],
1369  self::KEY_FOREIGN_INUSE => [],
1370  self::KEY_FOREIGN_FREE => [],
1371  self::KEY_LOCAL_NOROUND => [],
1372  self::KEY_FOREIGN_INUSE_NOROUND => [],
1373  self::KEY_FOREIGN_FREE_NOROUND => []
1374  ];
1375  $this->connsOpened = 0;
1376  }
1377 
1378  public function closeConnection( IDatabase $conn ) {
1379  if ( $conn instanceof DBConnRef ) {
1380  // Avoid calling close() but still leaving the handle in the pool
1381  throw new RuntimeException( 'Cannot close DBConnRef instance; it must be shareable' );
1382  }
1383 
1384  $serverIndex = $conn->getLBInfo( 'serverIndex' );
1385  foreach ( $this->conns as $type => $connsByServer ) {
1386  if ( !isset( $connsByServer[$serverIndex] ) ) {
1387  continue;
1388  }
1389 
1390  foreach ( $connsByServer[$serverIndex] as $i => $trackedConn ) {
1391  if ( $conn === $trackedConn ) {
1392  $host = $this->getServerName( $i );
1393  $this->connLogger->debug(
1394  __METHOD__ . ": closing connection to database $i at '$host'." );
1395  unset( $this->conns[$type][$serverIndex][$i] );
1397  break 2;
1398  }
1399  }
1400  }
1401 
1402  $conn->close();
1403  }
1404 
1405  public function commitAll( $fname = __METHOD__, $owner = null ) {
1406  $this->commitMasterChanges( $fname, $owner );
1407  $this->flushMasterSnapshots( $fname );
1408  $this->flushReplicaSnapshots( $fname );
1409  }
1410 
1411  public function finalizeMasterChanges( $fname = __METHOD__, $owner = null ) {
1412  $this->assertOwnership( $fname, $owner );
1413  $this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] );
1414 
1415  $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1416  // Loop until callbacks stop adding callbacks on other connections
1417  $total = 0;
1418  do {
1419  $count = 0; // callbacks execution attempts
1420  $this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$count ) {
1421  // Run any pre-commit callbacks while leaving the post-commit ones suppressed.
1422  // Any error should cause all (peer) transactions to be rolled back together.
1423  $count += $conn->runOnTransactionPreCommitCallbacks();
1424  } );
1425  $total += $count;
1426  } while ( $count > 0 );
1427  // Defer post-commit callbacks until after COMMIT/ROLLBACK happens on all handles
1428  $this->forEachOpenMasterConnection( function ( Database $conn ) {
1429  $conn->setTrxEndCallbackSuppression( true );
1430  } );
1431  $this->trxRoundStage = self::ROUND_FINALIZED;
1432 
1433  return $total;
1434  }
1435 
1436  public function approveMasterChanges( array $options, $fname = __METHOD__, $owner = null ) {
1437  $this->assertOwnership( $fname, $owner );
1438  $this->assertTransactionRoundStage( self::ROUND_FINALIZED );
1439 
1440  $limit = $options['maxWriteDuration'] ?? 0;
1441 
1442  $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1443  $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $limit ) {
1444  // If atomic sections or explicit transactions are still open, some caller must have
1445  // caught an exception but failed to properly rollback any changes. Detect that and
1446  // throw and error (causing rollback).
1447  $conn->assertNoOpenTransactions();
1448  // Assert that the time to replicate the transaction will be sane.
1449  // If this fails, then all DB transactions will be rollback back together.
1450  $time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
1451  if ( $limit > 0 && $time > $limit ) {
1452  throw new DBTransactionSizeError(
1453  $conn,
1454  "Transaction spent $time second(s) in writes, exceeding the limit of $limit",
1455  [ $time, $limit ]
1456  );
1457  }
1458  // If a connection sits idle while slow queries execute on another, that connection
1459  // may end up dropped before the commit round is reached. Ping servers to detect this.
1460  if ( $conn->writesOrCallbacksPending() && !$conn->ping() ) {
1461  throw new DBTransactionError(
1462  $conn,
1463  "A connection to the {$conn->getDBname()} database was lost before commit"
1464  );
1465  }
1466  } );
1467  $this->trxRoundStage = self::ROUND_APPROVED;
1468  }
1469 
1470  public function beginMasterChanges( $fname = __METHOD__, $owner = null ) {
1471  $this->assertOwnership( $fname, $owner );
1472  if ( $this->trxRoundId !== false ) {
1473  throw new DBTransactionError(
1474  null,
1475  "$fname: Transaction round '{$this->trxRoundId}' already started"
1476  );
1477  }
1478  $this->assertTransactionRoundStage( self::ROUND_CURSORY );
1479 
1480  // Clear any empty transactions (no writes/callbacks) from the implicit round
1481  $this->flushMasterSnapshots( $fname );
1482 
1483  $this->trxRoundId = $fname;
1484  $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1485  // Mark applicable handles as participating in this explicit transaction round.
1486  // For each of these handles, any writes and callbacks will be tied to a single
1487  // transaction. The (peer) handles will reject begin()/commit() calls unless they
1488  // are part of an en masse commit or an en masse rollback.
1489  $this->forEachOpenMasterConnection( function ( Database $conn ) {
1490  $this->applyTransactionRoundFlags( $conn );
1491  } );
1492  $this->trxRoundStage = self::ROUND_CURSORY;
1493  }
1494 
1495  public function commitMasterChanges( $fname = __METHOD__, $owner = null ) {
1496  $this->assertOwnership( $fname, $owner );
1497  $this->assertTransactionRoundStage( self::ROUND_APPROVED );
1498 
1499  $failures = [];
1500 
1502  $scope = ScopedCallback::newScopedIgnoreUserAbort(); // try to ignore client aborts
1503 
1504  $restore = ( $this->trxRoundId !== false );
1505  $this->trxRoundId = false;
1506  $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1507  // Commit any writes and clear any snapshots as well (callbacks require AUTOCOMMIT).
1508  // Note that callbacks should already be suppressed due to finalizeMasterChanges().
1510  function ( IDatabase $conn ) use ( $fname, &$failures ) {
1511  try {
1512  $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1513  } catch ( DBError $e ) {
1514  ( $this->errorLogger )( $e );
1515  $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
1516  }
1517  }
1518  );
1519  if ( $failures ) {
1520  throw new DBTransactionError(
1521  null,
1522  "$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
1523  );
1524  }
1525  if ( $restore ) {
1526  // Unmark handles as participating in this explicit transaction round
1527  $this->forEachOpenMasterConnection( function ( Database $conn ) {
1528  $this->undoTransactionRoundFlags( $conn );
1529  } );
1530  }
1531  $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
1532  }
1533 
1534  public function runMasterTransactionIdleCallbacks( $fname = __METHOD__, $owner = null ) {
1535  $this->assertOwnership( $fname, $owner );
1536  if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1537  $type = IDatabase::TRIGGER_COMMIT;
1538  } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1539  $type = IDatabase::TRIGGER_ROLLBACK;
1540  } else {
1541  throw new DBTransactionError(
1542  null,
1543  "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1544  );
1545  }
1546 
1547  $oldStage = $this->trxRoundStage;
1548  $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1549 
1550  // Now that the COMMIT/ROLLBACK step is over, enable post-commit callback runs
1551  $this->forEachOpenMasterConnection( function ( Database $conn ) {
1552  $conn->setTrxEndCallbackSuppression( false );
1553  } );
1554 
1555  $e = null; // first exception
1556  $fname = __METHOD__;
1557  // Loop until callbacks stop adding callbacks on other connections
1558  do {
1559  // Run any pending callbacks for each connection...
1560  $count = 0; // callback execution attempts
1562  function ( Database $conn ) use ( $type, &$e, &$count ) {
1563  if ( $conn->trxLevel() ) {
1564  return; // retry in the next iteration, after commit() is called
1565  }
1566  try {
1567  $count += $conn->runOnTransactionIdleCallbacks( $type );
1568  } catch ( Exception $ex ) {
1569  $e = $e ?: $ex;
1570  }
1571  }
1572  );
1573  // Clear out any active transactions left over from callbacks...
1574  $this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$e, $fname ) {
1575  if ( $conn->writesPending() ) {
1576  // A callback from another handle wrote to this one and DBO_TRX is set
1577  $this->queryLogger->warning( $fname . ": found writes pending." );
1578  $fnames = implode( ', ', $conn->pendingWriteAndCallbackCallers() );
1579  $this->queryLogger->warning(
1580  $fname . ": found writes pending ($fnames).",
1581  [
1582  'db_server' => $conn->getServer(),
1583  'db_name' => $conn->getDBname()
1584  ]
1585  );
1586  } elseif ( $conn->trxLevel() ) {
1587  // A callback from another handle read from this one and DBO_TRX is set,
1588  // which can easily happen if there is only one DB (no replicas)
1589  $this->queryLogger->debug( $fname . ": found empty transaction." );
1590  }
1591  try {
1592  $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1593  } catch ( Exception $ex ) {
1594  $e = $e ?: $ex;
1595  }
1596  } );
1597  } while ( $count > 0 );
1598 
1599  $this->trxRoundStage = $oldStage;
1600 
1601  return $e;
1602  }
1603 
1604  public function runMasterTransactionListenerCallbacks( $fname = __METHOD__, $owner = null ) {
1605  $this->assertOwnership( $fname, $owner );
1606  if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1607  $type = IDatabase::TRIGGER_COMMIT;
1608  } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1609  $type = IDatabase::TRIGGER_ROLLBACK;
1610  } else {
1611  throw new DBTransactionError(
1612  null,
1613  "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1614  );
1615  }
1616 
1617  $e = null;
1618 
1619  $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1620  $this->forEachOpenMasterConnection( function ( Database $conn ) use ( $type, &$e ) {
1621  try {
1623  } catch ( Exception $ex ) {
1624  $e = $e ?: $ex;
1625  }
1626  } );
1627  $this->trxRoundStage = self::ROUND_CURSORY;
1628 
1629  return $e;
1630  }
1631 
1632  public function rollbackMasterChanges( $fname = __METHOD__, $owner = null ) {
1633  $this->assertOwnership( $fname, $owner );
1634 
1635  $restore = ( $this->trxRoundId !== false );
1636  $this->trxRoundId = false;
1637  $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1638  $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $fname ) {
1639  $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
1640  } );
1641  if ( $restore ) {
1642  // Unmark handles as participating in this explicit transaction round
1643  $this->forEachOpenMasterConnection( function ( Database $conn ) {
1644  $this->undoTransactionRoundFlags( $conn );
1645  } );
1646  }
1647  $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
1648  }
1649 
1654  private function assertTransactionRoundStage( $stage ) {
1655  $stages = (array)$stage;
1656 
1657  if ( !in_array( $this->trxRoundStage, $stages, true ) ) {
1658  $stageList = implode(
1659  '/',
1660  array_map( function ( $v ) {
1661  return "'$v'";
1662  }, $stages )
1663  );
1664  throw new DBTransactionError(
1665  null,
1666  "Transaction round stage must be $stageList (not '{$this->trxRoundStage}')"
1667  );
1668  }
1669  }
1670 
1676  private function assertOwnership( $fname, $owner ) {
1677  if ( $this->ownerId !== null && $owner !== $this->ownerId ) {
1678  throw new DBTransactionError(
1679  null,
1680  "$fname: LoadBalancer is owned by LBFactory #{$this->ownerId} (got '$owner')."
1681  );
1682  }
1683  }
1684 
1694  private function applyTransactionRoundFlags( Database $conn ) {
1695  if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
1696  return; // transaction rounds do not apply to these connections
1697  }
1698 
1699  if ( $conn->getFlag( $conn::DBO_DEFAULT ) ) {
1700  // DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
1701  // Force DBO_TRX even in CLI mode since a commit round is expected soon.
1702  $conn->setFlag( $conn::DBO_TRX, $conn::REMEMBER_PRIOR );
1703  }
1704 
1705  if ( $conn->getFlag( $conn::DBO_TRX ) ) {
1706  $conn->setLBInfo( 'trxRoundId', $this->trxRoundId );
1707  }
1708  }
1709 
1713  private function undoTransactionRoundFlags( Database $conn ) {
1714  if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
1715  return; // transaction rounds do not apply to these connections
1716  }
1717 
1718  if ( $conn->getFlag( $conn::DBO_TRX ) ) {
1719  $conn->setLBInfo( 'trxRoundId', false );
1720  }
1721 
1722  if ( $conn->getFlag( $conn::DBO_DEFAULT ) ) {
1723  $conn->restoreFlags( $conn::RESTORE_PRIOR );
1724  }
1725  }
1726 
1727  public function flushReplicaSnapshots( $fname = __METHOD__ ) {
1728  $this->forEachOpenReplicaConnection( function ( IDatabase $conn ) use ( $fname ) {
1729  $conn->flushSnapshot( $fname );
1730  } );
1731  }
1732 
1733  public function flushMasterSnapshots( $fname = __METHOD__ ) {
1734  $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $fname ) {
1735  $conn->flushSnapshot( $fname );
1736  } );
1737  }
1738 
1743  public function getTransactionRoundStage() {
1744  return $this->trxRoundStage;
1745  }
1746 
1747  public function hasMasterConnection() {
1748  return $this->isOpen( $this->getWriterIndex() );
1749  }
1750 
1751  public function hasMasterChanges() {
1752  $pending = 0;
1753  $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$pending ) {
1754  $pending |= $conn->writesOrCallbacksPending();
1755  } );
1756 
1757  return (bool)$pending;
1758  }
1759 
1760  public function lastMasterChangeTimestamp() {
1761  $lastTime = false;
1762  $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$lastTime ) {
1763  $lastTime = max( $lastTime, $conn->lastDoneWrites() );
1764  } );
1765 
1766  return $lastTime;
1767  }
1768 
1769  public function hasOrMadeRecentMasterChanges( $age = null ) {
1770  $age = ( $age === null ) ? $this->waitTimeout : $age;
1771 
1772  return ( $this->hasMasterChanges()
1773  || $this->lastMasterChangeTimestamp() > microtime( true ) - $age );
1774  }
1775 
1776  public function pendingMasterChangeCallers() {
1777  $fnames = [];
1778  $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$fnames ) {
1779  $fnames = array_merge( $fnames, $conn->pendingWriteCallers() );
1780  } );
1781 
1782  return $fnames;
1783  }
1784 
1785  public function getLaggedReplicaMode( $domain = false ) {
1786  if (
1787  // Avoid recursion if there is only one DB
1788  $this->getServerCount() > 1 &&
1789  // Avoid recursion if the (non-zero load) master DB was picked for generic reads
1790  $this->genericReadIndex !== $this->getWriterIndex() &&
1791  // Stay in lagged replica mode during the load balancer instance lifetime
1792  !$this->laggedReplicaMode
1793  ) {
1794  try {
1795  // Calling this method will set "laggedReplicaMode" as needed
1796  $this->getReaderIndex( false, $domain );
1797  } catch ( DBConnectionError $e ) {
1798  // Avoid expensive re-connect attempts and failures
1799  $this->allReplicasDownMode = true;
1800  $this->laggedReplicaMode = true;
1801  }
1802  }
1803 
1804  return $this->laggedReplicaMode;
1805  }
1806 
1807  public function laggedReplicaUsed() {
1808  return $this->laggedReplicaMode;
1809  }
1810 
1816  public function laggedSlaveUsed() {
1817  return $this->laggedReplicaUsed();
1818  }
1819 
1820  public function getReadOnlyReason( $domain = false, IDatabase $conn = null ) {
1821  if ( $this->readOnlyReason !== false ) {
1822  return $this->readOnlyReason;
1823  } elseif ( $this->getLaggedReplicaMode( $domain ) ) {
1824  if ( $this->allReplicasDownMode ) {
1825  return 'The database has been automatically locked ' .
1826  'until the replica database servers become available';
1827  } else {
1828  return 'The database has been automatically locked ' .
1829  'while the replica database servers catch up to the master.';
1830  }
1831  } elseif ( $this->masterRunningReadOnly( $domain, $conn ) ) {
1832  return 'The database master is running in read-only mode.';
1833  }
1834 
1835  return false;
1836  }
1837 
1843  private function masterRunningReadOnly( $domain, IDatabase $conn = null ) {
1845  $masterServer = $this->getServerName( $this->getWriterIndex() );
1846 
1847  return (bool)$cache->getWithSetCallback(
1848  $cache->makeGlobalKey( __CLASS__, 'server-read-only', $masterServer ),
1849  self::TTL_CACHE_READONLY,
1850  function () use ( $domain, $conn ) {
1851  $old = $this->trxProfiler->setSilenced( true );
1852  try {
1853  $dbw = $conn ?: $this->getConnection( self::DB_MASTER, [], $domain );
1854  $readOnly = (int)$dbw->serverIsReadOnly();
1855  if ( !$conn ) {
1856  $this->reuseConnection( $dbw );
1857  }
1858  } catch ( DBError $e ) {
1859  $readOnly = 0;
1860  }
1861  $this->trxProfiler->setSilenced( $old );
1862  return $readOnly;
1863  },
1864  [ 'pcTTL' => $cache::TTL_PROC_LONG, 'busyValue' => 0 ]
1865  );
1866  }
1867 
1868  public function allowLagged( $mode = null ) {
1869  if ( $mode === null ) {
1870  return $this->allowLagged;
1871  }
1872  $this->allowLagged = $mode;
1873 
1874  return $this->allowLagged;
1875  }
1876 
1877  public function pingAll() {
1878  $success = true;
1879  $this->forEachOpenConnection( function ( IDatabase $conn ) use ( &$success ) {
1880  if ( !$conn->ping() ) {
1881  $success = false;
1882  }
1883  } );
1884 
1885  return $success;
1886  }
1887 
1888  public function forEachOpenConnection( $callback, array $params = [] ) {
1889  foreach ( $this->conns as $connsByServer ) {
1890  foreach ( $connsByServer as $serverConns ) {
1891  foreach ( $serverConns as $conn ) {
1892  $callback( $conn, ...$params );
1893  }
1894  }
1895  }
1896  }
1897 
1898  public function forEachOpenMasterConnection( $callback, array $params = [] ) {
1899  $masterIndex = $this->getWriterIndex();
1900  foreach ( $this->conns as $connsByServer ) {
1901  if ( isset( $connsByServer[$masterIndex] ) ) {
1903  foreach ( $connsByServer[$masterIndex] as $conn ) {
1904  $callback( $conn, ...$params );
1905  }
1906  }
1907  }
1908  }
1909 
1910  public function forEachOpenReplicaConnection( $callback, array $params = [] ) {
1911  foreach ( $this->conns as $connsByServer ) {
1912  foreach ( $connsByServer as $i => $serverConns ) {
1913  if ( $i === $this->getWriterIndex() ) {
1914  continue; // skip master
1915  }
1916  foreach ( $serverConns as $conn ) {
1917  $callback( $conn, ...$params );
1918  }
1919  }
1920  }
1921  }
1922 
1923  public function getMaxLag( $domain = false ) {
1924  $maxLag = -1;
1925  $host = '';
1926  $maxIndex = 0;
1927 
1928  if ( $this->getServerCount() <= 1 ) {
1929  return [ $host, $maxLag, $maxIndex ]; // no replication = no lag
1930  }
1931 
1932  $lagTimes = $this->getLagTimes( $domain );
1933  foreach ( $lagTimes as $i => $lag ) {
1934  if ( $this->genericLoads[$i] > 0 && $lag > $maxLag ) {
1935  $maxLag = $lag;
1936  $host = $this->servers[$i]['host'];
1937  $maxIndex = $i;
1938  }
1939  }
1940 
1941  return [ $host, $maxLag, $maxIndex ];
1942  }
1943 
1944  public function getLagTimes( $domain = false ) {
1945  if ( $this->getServerCount() <= 1 ) {
1946  return [ $this->getWriterIndex() => 0 ]; // no replication = no lag
1947  }
1948 
1949  $knownLagTimes = []; // map of (server index => 0 seconds)
1950  $indexesWithLag = [];
1951  foreach ( $this->servers as $i => $server ) {
1952  if ( empty( $server['is static'] ) ) {
1953  $indexesWithLag[] = $i; // DB server might have replication lag
1954  } else {
1955  $knownLagTimes[$i] = 0; // DB server is a non-replicating and read-only archive
1956  }
1957  }
1958 
1959  return $this->getLoadMonitor()->getLagTimes( $indexesWithLag, $domain ) + $knownLagTimes;
1960  }
1961 
1977  public function safeGetLag( IDatabase $conn ) {
1978  if ( $this->getServerCount() <= 1 ) {
1979  return 0;
1980  } else {
1981  return $conn->getLag();
1982  }
1983  }
1984 
1985  public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = null ) {
1986  $timeout = max( 1, $timeout ?: $this->waitTimeout );
1987 
1988  if ( $this->getServerCount() <= 1 || !$conn->getLBInfo( 'replica' ) ) {
1989  return true; // server is not a replica DB
1990  }
1991 
1992  if ( !$pos ) {
1993  // Get the current master position, opening a connection if needed
1994  $index = $this->getWriterIndex();
1995  $masterConn = $this->getAnyOpenConnection( $index );
1996  if ( $masterConn ) {
1997  $pos = $masterConn->getMasterPos();
1998  } else {
1999  $flags = self::CONN_SILENCE_ERRORS;
2000  $masterConn = $this->getConnection( $index, [], self::DOMAIN_ANY, $flags );
2001  if ( !$masterConn ) {
2002  throw new DBReplicationWaitError(
2003  null,
2004  "Could not obtain a master database connection to get the position"
2005  );
2006  }
2007  $pos = $masterConn->getMasterPos();
2008  $this->closeConnection( $masterConn );
2009  }
2010  }
2011 
2012  if ( $pos instanceof DBMasterPos ) {
2013  $start = microtime( true );
2014  $result = $conn->masterPosWait( $pos, $timeout );
2015  $seconds = max( microtime( true ) - $start, 0 );
2016  if ( $result == -1 || is_null( $result ) ) {
2017  $msg = __METHOD__ . ': timed out waiting on {host} pos {pos} [{seconds}s]';
2018  $this->replLogger->warning( $msg, [
2019  'host' => $conn->getServer(),
2020  'pos' => $pos,
2021  'seconds' => round( $seconds, 6 ),
2022  'trace' => ( new RuntimeException() )->getTraceAsString()
2023  ] );
2024  $ok = false;
2025  } else {
2026  $this->replLogger->debug( __METHOD__ . ': done waiting' );
2027  $ok = true;
2028  }
2029  } else {
2030  $ok = false; // something is misconfigured
2031  $this->replLogger->error(
2032  __METHOD__ . ': could not get master pos for {host}',
2033  [
2034  'host' => $conn->getServer(),
2035  'trace' => ( new RuntimeException() )->getTraceAsString()
2036  ]
2037  );
2038  }
2039 
2040  return $ok;
2041  }
2042 
2043  public function setTransactionListener( $name, callable $callback = null ) {
2044  if ( $callback ) {
2045  $this->trxRecurringCallbacks[$name] = $callback;
2046  } else {
2047  unset( $this->trxRecurringCallbacks[$name] );
2048  }
2050  function ( IDatabase $conn ) use ( $name, $callback ) {
2051  $conn->setTransactionListener( $name, $callback );
2052  }
2053  );
2054  }
2055 
2056  public function setTableAliases( array $aliases ) {
2057  $this->tableAliases = $aliases;
2058  }
2059 
2060  public function setIndexAliases( array $aliases ) {
2061  $this->indexAliases = $aliases;
2062  }
2063 
2068  public function setDomainPrefix( $prefix ) {
2069  $this->setLocalDomainPrefix( $prefix );
2070  }
2071 
2072  public function setLocalDomainPrefix( $prefix ) {
2073  // Find connections to explicit foreign domains still marked as in-use...
2074  $domainsInUse = [];
2075  $this->forEachOpenConnection( function ( IDatabase $conn ) use ( &$domainsInUse ) {
2076  // Once reuseConnection() is called on a handle, its reference count goes from 1 to 0.
2077  // Until then, it is still in use by the caller (explicitly or via DBConnRef scope).
2078  if ( $conn->getLBInfo( 'foreignPoolRefCount' ) > 0 ) {
2079  $domainsInUse[] = $conn->getDomainID();
2080  }
2081  } );
2082 
2083  // Do not switch connections to explicit foreign domains unless marked as safe
2084  if ( $domainsInUse ) {
2085  $domains = implode( ', ', $domainsInUse );
2086  throw new DBUnexpectedError( null,
2087  "Foreign domain connections are still in use ($domains)" );
2088  }
2089 
2090  $this->setLocalDomain( new DatabaseDomain(
2091  $this->localDomain->getDatabase(),
2092  $this->localDomain->getSchema(),
2093  $prefix
2094  ) );
2095 
2096  // Update the prefix for all local connections...
2097  $this->forEachOpenConnection( function ( IDatabase $db ) use ( $prefix ) {
2098  if ( !$db->getLBInfo( 'foreign' ) ) {
2099  $db->tablePrefix( $prefix );
2100  }
2101  } );
2102  }
2103 
2104  public function redefineLocalDomain( $domain ) {
2105  $this->closeAll();
2106 
2107  $this->setLocalDomain( DatabaseDomain::newFromId( $domain ) );
2108  }
2109 
2113  private function setLocalDomain( DatabaseDomain $domain ) {
2114  $this->localDomain = $domain;
2115  // In case a caller assumes that the domain ID is simply <db>-<prefix>, which is almost
2116  // always true, gracefully handle the case when they fail to account for escaping.
2117  if ( $this->localDomain->getTablePrefix() != '' ) {
2118  $this->localDomainIdAlias =
2119  $this->localDomain->getDatabase() . '-' . $this->localDomain->getTablePrefix();
2120  } else {
2121  $this->localDomainIdAlias = $this->localDomain->getDatabase();
2122  }
2123  }
2124 
2125  function __destruct() {
2126  // Avoid connection leaks for sanity
2127  $this->disable();
2128  }
2129 }
2130 
2134 class_alias( LoadBalancer::class, 'LoadBalancer' );
Helper class that detects high-contention DB queries via profiling calls.
getLocalConnection( $i, $flags=0)
Open a connection to a local DB, or return one if it is already open.
flushReplicaSnapshots( $fname=__METHOD__)
Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshots.
setExistingReaderIndex( $group, $index)
Set the server index chosen by the load balancer for use with the given query group.
Database $errorConnection
DB connection object that caused a problem.
runMasterTransactionListenerCallbacks( $fname=__METHOD__, $owner=null)
Run all recurring post-COMMIT/ROLLBACK listener callbacks.
callable $deprecationLogger
Deprecation logger.
close()
Close the database connection.
pendingWriteCallers()
Get the list of method names that did write queries for this transaction.
deferred txt A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user For updating the view updating the linked to tables after a etc PHP does not yet have any way to tell the server to actually return and disconnect while still running these but it might have such a feature in the future We handle these by creating a deferred update object and putting those objects on a global list
Definition: deferred.txt:11
enforceConnectionFlags(IDatabase $conn, $flags)
masterRunningReadOnly( $domain, IDatabase $conn=null)
array [] $groupLoads
Map of (group => server index => weight)
getConnectionRef( $i, $groups=[], $domain=false, $flags=0)
Get a database connection handle reference.
getLocalDomainID()
Get the local (and default) database domain ID of connection handles.
closeConnection(IDatabase $conn)
Close a connection.
setTransactionListener( $name, callable $callback=null)
Set a callback via IDatabase::setTransactionListener() on all current and future master connections o...
string bool $readOnlyReason
Reason the LB is read-only or false if not.
getFlag( $flag)
Returns a boolean whether the flag $flag is set for this connection.
Definition: Database.php:762
$success
forEachOpenReplicaConnection( $callback, array $params=[])
Call a function with each open replica DB connection object.
processing should stop and the error should be shown to the user * false
Definition: hooks.txt:187
allowLagged( $mode=null)
Disables/enables lag checks.
getLBInfo( $name=null)
Get properties passed down from the server info array of the load balancer.
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2159
getReaderIndex( $group=false, $domain=false)
Get the index of the reader connection, which may be a replica DB.
getAnyOpenConnection( $i, $flags=0)
Get any open connection to a given server index, local or foreign.
applyTransactionRoundFlags(Database $conn)
Make all DB servers with DBO_DEFAULT/DBO_TRX set join the transaction round.
runMasterTransactionIdleCallbacks( $fname=__METHOD__, $owner=null)
Consume and run all pending post-COMMIT/ROLLBACK callbacks and commit dangling transactions.
TransactionProfiler $trxProfiler
getServer()
Get the server hostname or IP address.
Definition: Database.php:2349
safeGetLag(IDatabase $conn)
Get the lag in seconds for a given connection, or zero if this load balancer does not have replicatio...
lazyLoadReplicationPositions()
Make sure that any "waitForPos" positions are loaded and available to doWait()
setTrxEndCallbackSuppression( $suppress)
Whether to disable running of post-COMMIT/ROLLBACK callbacks.
Definition: Database.php:3473
rollbackMasterChanges( $fname=__METHOD__, $owner=null)
Issue ROLLBACK only on master, only if queries were done on connection.
int $genericReadIndex
The generic (not query grouped) replica DB index.
getReadOnlyReason( $domain=false, IDatabase $conn=null)
hasReached(DBMasterPos $pos)
__construct(array $params)
Construct a manager of IDatabase connection objects.
getServerType( $i)
Get DB type of the server with the specified index.
bool $connectionAttempted
Whether any connection has been attempted yet.
trxLevel()
Gets the current transaction level.
Definition: Database.php:515
lastMasterChangeTimestamp()
Get the timestamp of the latest write query done by this thread.
getLag()
Get the amount of replication lag for this database server.
setLocalDomain(DatabaseDomain $domain)
hasOrMadeRecentMasterChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
static factory( $dbType, $p=[], $connect=self::NEW_CONNECTED)
Construct a Database subclass instance given a database type and parameters.
Definition: Database.php:361
Helper class to handle automatically marking connections as reusable (via RAII pattern) as well handl...
Definition: DBConnRef.php:14
bool $allowLagged
Whether to disregard replica DB lag as a factor in replica DB selection.
trxLevel()
Gets the current transaction level.
getLBInfo( $name=null)
Get properties passed down from the server info array of the load balancer.
Definition: Database.php:569
getDomainID()
Return the currently selected domain ID.
see documentation in includes Linker php for Linker::makeImageLink & $time
Definition: hooks.txt:1799
const DB_MASTER
Definition: defines.php:26
array $loadMonitorConfig
The LoadMonitor configuration.
rollback( $fname=__METHOD__, $flush='')
Rollback a transaction previously started using begin().
flushMasterSnapshots( $fname=__METHOD__)
Commit all master DB transactions so as to flush any REPEATABLE-READ or SSI snapshots.
The index of the header message $result[1]=The index of the body text message $result[2 through n]=Parameters passed to body text message. Please note the header message cannot receive/use parameters. 'ImportHandleLogItemXMLTag':When parsing a XML tag in a log item. Return false to stop further processing of the tag $reader:XMLReader object $logInfo:Array of information 'ImportHandlePageXMLTag':When parsing a XML tag in a page. Return false to stop further processing of the tag $reader:XMLReader object & $pageInfo:Array of information 'ImportHandleRevisionXMLTag':When parsing a XML tag in a page revision. Return false to stop further processing of the tag $reader:XMLReader object $pageInfo:Array of page information $revisionInfo:Array of revision information 'ImportHandleToplevelXMLTag':When parsing a top level XML tag. Return false to stop further processing of the tag $reader:XMLReader object 'ImportHandleUnknownUser':When a user doesn 't exist locally, this hook is called to give extensions an opportunity to auto-create it. If the auto-creation is successful, return false. $name:User name 'ImportHandleUploadXMLTag':When parsing a XML tag in a file upload. Return false to stop further processing of the tag $reader:XMLReader object $revisionInfo:Array of information 'ImportLogInterwikiLink':Hook to change the interwiki link used in log entries and edit summaries for transwiki imports. & $fullInterwikiPrefix:Interwiki prefix, may contain colons. & $pageTitle:String that contains page title. 'ImportSources':Called when reading from the $wgImportSources configuration variable. Can be used to lazy-load the import sources list. & $importSources:The value of $wgImportSources. Modify as necessary. See the comment in DefaultSettings.php for the detail of how to structure this array. 'InfoAction':When building information to display on the action=info page. $context:IContextSource object & $pageInfo:Array of information 'InitializeArticleMaybeRedirect':MediaWiki check to see if title is a redirect. & $title:Title object for the current page & $request:WebRequest & $ignoreRedirect:boolean to skip redirect check & $target:Title/string of redirect target & $article:Article object 'InternalParseBeforeLinks':during Parser 's internalParse method before links but after nowiki/noinclude/includeonly/onlyinclude and other processings. & $parser:Parser object & $text:string containing partially parsed text & $stripState:Parser 's internal StripState object 'InternalParseBeforeSanitize':during Parser 's internalParse method just before the parser removes unwanted/dangerous HTML tags and after nowiki/noinclude/includeonly/onlyinclude and other processings. Ideal for syntax-extensions after template/parser function execution which respect nowiki and HTML-comments. & $parser:Parser object & $text:string containing partially parsed text & $stripState:Parser 's internal StripState object 'InterwikiLoadPrefix':When resolving if a given prefix is an interwiki or not. Return true without providing an interwiki to continue interwiki search. $prefix:interwiki prefix we are looking for. & $iwData:output array describing the interwiki with keys iw_url, iw_local, iw_trans and optionally iw_api and iw_wikiid. 'InvalidateEmailComplete':Called after a user 's email has been invalidated successfully. $user:user(object) whose email is being invalidated 'IRCLineURL':When constructing the URL to use in an IRC notification. Callee may modify $url and $query, URL will be constructed as $url . $query & $url:URL to index.php & $query:Query string $rc:RecentChange object that triggered url generation 'IsFileCacheable':Override the result of Article::isFileCacheable()(if true) & $article:article(object) being checked 'IsTrustedProxy':Override the result of IP::isTrustedProxy() & $ip:IP being check & $result:Change this value to override the result of IP::isTrustedProxy() 'IsUploadAllowedFromUrl':Override the result of UploadFromUrl::isAllowedUrl() $url:URL used to upload from & $allowed:Boolean indicating if uploading is allowed for given URL 'isValidEmailAddr':Override the result of Sanitizer::validateEmail(), for instance to return false if the domain name doesn 't match your organization. $addr:The e-mail address entered by the user & $result:Set this and return false to override the internal checks 'isValidPassword':Override the result of User::isValidPassword() $password:The password entered by the user & $result:Set this and return false to override the internal checks $user:User the password is being validated for 'Language::getMessagesFileName':$code:The language code or the language we 're looking for a messages file for & $file:The messages file path, you can override this to change the location. 'LanguageGetNamespaces':Provide custom ordering for namespaces or remove namespaces. Do not use this hook to add namespaces. Use CanonicalNamespaces for that. & $namespaces:Array of namespaces indexed by their numbers 'LanguageGetTranslatedLanguageNames':Provide translated language names. & $names:array of language code=> language name $code:language of the preferred translations 'LanguageLinks':Manipulate a page 's language links. This is called in various places to allow extensions to define the effective language links for a page. $title:The page 's Title. & $links:Array with elements of the form "language:title" in the order that they will be output. & $linkFlags:Associative array mapping prefixed links to arrays of flags. Currently unused, but planned to provide support for marking individual language links in the UI, e.g. for featured articles. 'LanguageSelector':Hook to change the language selector available on a page. $out:The output page. $cssClassName:CSS class name of the language selector. 'LinkBegin':DEPRECATED since 1.28! Use HtmlPageLinkRendererBegin instead. Used when generating internal and interwiki links in Linker::link(), before processing starts. Return false to skip default processing and return $ret. See documentation for Linker::link() for details on the expected meanings of parameters. $skin:the Skin object $target:the Title that the link is pointing to & $html:the contents that the< a > tag should have(raw HTML) $result
Definition: hooks.txt:1980
pendingWriteQueryDuration( $type=self::ESTIMATE_TOTAL)
Get the time spend running write queries for this transaction.
string [] $indexAliases
Map of (index alias => index)
writesOrCallbacksPending()
Whether there is a transaction open with either possible write queries or unresolved pre-commit/commi...
setLBInfo( $name, $value=null)
Set the LB info array, or a member of it.
int $connsOpened
Total connections opened.
commitMasterChanges( $fname=__METHOD__, $owner=null)
Issue COMMIT on all open master connections to flush changes and view snapshots.
getDBname()
Get the current DB name.
Definition: Database.php:2345
getRandomNonLagged(array $loads, $domain=false, $maxLag=INF)
static newEmpty()
Get an instance that wraps EmptyBagOStuff.
setLocalDomainPrefix( $prefix)
Set a new table prefix for the existing local domain ID for testing.
assertNoOpenTransactions()
Assert that all explicit transactions or atomic sections have been closed.
lastDoneWrites()
Returns the last time the connection may have been used for write queries.
getServerCount()
Get the number of defined servers (not the number of open connections)
getExistingReaderIndex( $group)
Get the server index chosen by the load balancer for use with the given query group.
disable()
Disable this load balancer.
string $agent
Agent name for query profiling.
getLazyConnectionRef( $i, $groups=[], $domain=false, $flags=0)
Get a database connection handle reference without connecting yet.
An object representing a master or replica DB position in a replicated setup.
Definition: DBMasterPos.php:12
forEachOpenConnection( $callback, array $params=[])
Call a function with each open connection object.
forEachOpenMasterConnection( $callback, array $params=[])
Call a function with each open connection object to a master.
haveIndex( $i)
Returns true if the specified index is a valid server index.
string bool $trxRoundId
String if a requested DBO_TRX transaction round is active.
waitFor( $pos)
Set the master position to reach before the next generic group DB handle query.
setTableAliases(array $aliases)
Make certain table names use their own database, schema, and table prefix when passed into SQL querie...
bool DBMasterPos $waitForPos
Replication sync position or false if not set.
laggedReplicaUsed()
Checks whether the database for generic connections this request was both:
mixed $profiler
Class name or object With profileIn/profileOut methods.
getServerName( $i)
Get the host name or IP address of the server with the specified index.
string $trxRoundStage
Stage of the current transaction round in the transaction round life-cycle.
runOnTransactionIdleCallbacks( $trigger)
Actually consume and run any "on transaction idle/resolution" callbacks.
Definition: Database.php:3487
commitAll( $fname=__METHOD__, $owner=null)
Commit transactions on all open connections.
$cache
Definition: mcc.php:33
$params
null means default in associative array with keys and values unescaped Should be merged with default with a value of false meaning to suppress the attribute in associative array with keys and values unescaped & $options
Definition: hooks.txt:1982
undoTransactionRoundFlags(Database $conn)
runTransactionListenerCallbacks( $trigger)
Actually run any "transaction listener" callbacks.
Definition: Database.php:3583
this hook is for auditing only or null if authentication failed before getting that far or null if we can t even determine that When $user is not null
Definition: hooks.txt:780
getMaxLag( $domain=false)
Get the hostname and lag time of the most-lagged replica DB.
setFlag( $flag, $remember=self::REMEMBER_NOTHING)
Set a flag for this connection.
Definition: Database.php:727
hasMasterChanges()
Whether there are pending changes or callbacks in a transaction by this thread.
commit( $fname=__METHOD__, $flush='')
Commits a transaction previously started using begin().
restoreFlags( $state=self::RESTORE_PRIOR)
Restore the flags to their prior state before the last setFlag/clearFlag call.
Definition: Database.php:749
waitForOne( $pos, $timeout=null)
Set the master wait position and wait for a generic replica DB to catch up to it. ...
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
setTransactionListener( $name, callable $callback=null)
Run a callback after each time any transaction commits or rolls back.
if(defined( 'MW_SETUP_CALLBACK')) $fname
Customization point after all loading (constants, functions, classes, DefaultSettings, LocalSettings).
Definition: Setup.php:123
runOnTransactionPreCommitCallbacks()
Actually consume and run any "on transaction pre-commit" callbacks.
Definition: Database.php:3548
float [] $genericLoads
Map of (server index => weight)
getLaggedReplicaMode( $domain=false)
bool $allReplicasDownMode
Whether the generic reader fell back to a lagged replica DB.
const DBO_TRX
Definition: defines.php:12
assertOwnership( $fname, $owner)
getForeignConnection( $i, $domain, $flags=0)
Open a connection to a foreign DB, or return one if it is already open.
int $waitTimeout
Seconds to spend waiting on replica DB lag to resolve.
beginMasterChanges( $fname=__METHOD__, $owner=null)
Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
getLoadMonitor()
Get a LoadMonitor instance.
masterPosWait(DBMasterPos $pos, $timeout)
Wait for the replica DB to catch up to a given master position.
approveMasterChanges(array $options, $fname=__METHOD__, $owner=null)
Perform all pre-commit checks for things like replication safety.
reallyOpenConnection(array $server, DatabaseDomain $domain)
Open a new network connection to a server (uncached)
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
const DBO_DEFAULT
Definition: defines.php:13
getMasterPos()
Get the current master position for chronology control purposes.
Database cluster connection, tracking, load balancing, and transaction manager interface.
Class to handle database/prefix specification for IDatabase domains.
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:38
pickReaderIndex(array $loads, $domain=false)
LoggerInterface $queryLogger
setIndexAliases(array $aliases)
Convert certain index names to alternative names before querying the DB.
waitForAll( $pos, $timeout=null)
Set the master wait position and wait for ALL replica DBs to catch up to it.
int [] $readIndexByGroup
The group replica DB indexes keyed by group.
you have access to all of the normal MediaWiki so you can get a DB use the etc For full docs on the Maintenance class
Definition: maintenance.txt:52
Relational database abstraction object.
Definition: Database.php:48
static attributesFromType( $dbType, $driver=null)
Definition: Database.php:416
getConnection( $i, $groups=[], $domain=false, $flags=0)
Get a connection handle by server index.
callable null $chronologyCallback
Callback to run before the first connection attempt.
pendingWriteAndCallbackCallers()
List the methods that have write queries or callbacks for the current transaction.
Definition: Database.php:699
commit( $fname=__METHOD__, $flush=self::FLUSHING_ONE)
Commits a transaction previously started using begin().
Definition: Database.php:3882
getConnectionIndex( $i, $groups, $domain)
getLagTimes( $domain=false)
Get an estimate of replication lag (in seconds) for each server.
bool $laggedReplicaMode
Whether the generic reader fell back to a lagged replica DB.
array [] $trxRecurringCallbacks
Map of (name => callable)
array [] $servers
Map of (server index => server config array)
do that in ParserLimitReportFormat instead use this to modify the parameters of the image all existing parser cache entries will be invalid To avoid you ll need to handle that somehow(e.g. with the RejectParserCacheValue hook) because MediaWiki won 't do it for you. & $defaults also a ContextSource after deleting those rows but within the same transaction you ll probably need to make sure the header is varied on and they can depend only on the ResourceLoaderContext $context
Definition: hooks.txt:2633
getServer()
Get the server hostname or IP address.
storage can be distributed across multiple servers
Definition: memcached.txt:33
Exception class for attempted DB access.
safeWaitForMasterPos(IDatabase $conn, $pos=false, $timeout=null)
Wait for a replica DB to reach a specified master position.
Database [][][] $conns
Map of (connection category => server index => IDatabase[])
Helper class to handle automatically marking connections as reusable (via RAII pattern) as well handl...
getMaintenanceConnectionRef( $i, $groups=[], $domain=false, $flags=0)
Get a maintenance database connection handle reference for migrations and schema changes.
bool $cliMode
Whether this PHP instance is for a CLI script.
pendingMasterChangeCallers()
Get the list of callers that have pending master changes.
string $hostname
Current server name.
Exception class for replica DB wait errors.
static pickRandom( $weights)
Given an array of non-normalised probabilities, this function will select an element and return the a...
Definition: ArrayUtils.php:66
DatabaseDomain $localDomain
Local DB domain ID and default for selectDB() calls.
Allows to change the fields on the form that will be generated $name
Definition: hooks.txt:271
string $localDomainIdAlias
Alternate local DB domain instead of DatabaseDomain::getId()
isNonZeroLoad( $i)
Returns true if the specified index is valid and has non-zero load.
const DB_REPLICA
Definition: defines.php:25
Database connection, tracking, load balancing, and transaction manager for a cluster.
int null $ownerId
An integer ID of the managing LBFactory instance or null.
closeAll()
Close all open connections.
redefineLocalDomain( $domain)
Close all connection and redefine the local domain for testing or schema creation.
openConnection( $i, $domain=false, $flags=0)
getServerInfo( $i)
Return the server info structure for a given index, or false if the index is invalid.
callable $errorLogger
Exception logger.
setLBInfo( $name, $value=null)
Set the LB info array, or a member of it.
Definition: Database.php:581
doWait( $index, $open=false, $timeout=null)
Wait for a given replica DB to catch up to the master pos stored in $this.
tablePrefix( $prefix=null)
Get/set the table prefix.
reuseConnection(IDatabase $conn)
Mark a foreign connection as being available for reuse under a different DB domain.
flushSnapshot( $fname=__METHOD__)
Commit any transaction but error out if writes or callbacks are pending.
clearFlag( $flag, $remember=self::REMEMBER_NOTHING)
Clear a flag for this connection.
string $lastError
The last DB selection or connection error.
Database error base class.
Definition: DBError.php:30
isOpen( $index)
Test if the specified index represents an open connection.
ping(&$rtt=null)
Ping the server and try to reconnect if it there is no connection.
finalizeMasterChanges( $fname=__METHOD__, $owner=null)
Run pre-commit callbacks and defer execution of post-commit callbacks.