MediaWiki  1.28.1
LoadBalancer.php
Go to the documentation of this file.
1 <?php
25 
31 class LoadBalancer implements ILoadBalancer {
/** @var array[] Server configuration arrays keyed by server index (from $params['servers']) */
private $mServers;
/** @var array[] Connection pools: 'local', 'foreignUsed' and 'foreignFree', each keyed by server index */
private $mConns;
/** @var array Map of (server index => configured 'load' value) */
private $mLoads;
/** @var array[] Map of (query group => server index => load ratio), built from 'groupLoads' */
private $mGroupLoads;
/** @var bool When true, getReaderIndex() picks among all candidates without any lag filtering */
private $mAllowLagged;
/** @var int Default timeout handed to masterPosWait() when the caller gives none */
private $mWaitTimeout;
/** @var array Configuration for the load monitor; its 'class' entry names the class to instantiate */
private $loadMonitorConfig;
/** @var array Table aliases applied to each new handle through setTableAliases() */
private $tableAliases = [];

/** @var object|null Load monitor instance, created on first use by getLoadMonitor() */
private $loadMonitor;
/** @var object Cache used for last-known replication positions; also passed to the load monitor and handles */
private $srvCache;
/** @var object Cache passed to the load monitor */
private $memCache;
/** @var object WAN cache (set from $params['wanCache'] or an empty instance) */
private $wanCache;
/** @var object|null Profiler passed to each handle; null when not configured */
protected $profiler;
/** @var object Transaction profiler; records new connections and is passed to each handle */
protected $trxProfiler;
/** @var object Logger for replication/lag messages */
protected $replLogger;
/** @var object Logger for connection messages */
protected $connLogger;
/** @var object Logger passed to each handle as 'queryLogger' */
protected $queryLogger;
/** @var object Logger for the "many connections" performance warning */
protected $perfLogger;

/** @var object|bool The last handle that failed to open or was found closed; false if none */
private $mErrorConnection;
/** @var int Generic (group-less) reader index into $mServers; -1 until one is chosen */
private $mReadIndex;
/** @var object|bool Replication position to wait for, or false when there is none */
private $mWaitForPos;
/** @var bool Set once the generic reader was picked while all candidates were lagged, or a wait failed */
private $laggedReplicaMode = false;
/** @var bool NOTE(review): not used in the visible part of the file; presumably "no replica reachable" */
private $allReplicasDownMode = false;
/** @var string Description of the most recent connection problem */
private $mLastError = 'Unknown error';
/** @var string|bool Configured read-only reason, or false when none was given */
private $readOnlyReason = false;
/** @var int Number of connections opened through reallyOpenConnection(); reset by closeAll() */
private $connsOpened = 0;
/** @var string|bool Name of the active transaction round, or false outside of a round */
private $trxRoundId = false;
/** @var array Map of (name => callback) registered as transaction listeners on new master handles */
private $trxRecurringCallbacks = [];
/** @var object Domain treated as "local" for this load balancer */
private $localDomain;
/** @var string Unescaped "<db>-<prefix>" (or just "<db>") form of the local domain ID */
private $localDomainIdAlias;
/** @var string Name of this client host; used in error messages */
private $host;
/** @var bool Whether PHP runs in CLI mode; passed to each handle */
protected $cliMode;
/** @var string Agent name; passed to each handle */
protected $agent;

/** @var callable Receives exceptions that are logged rather than rethrown */
private $errorLogger;

/** @var bool Set by disable(); afterwards no new connection may be opened */
private $disabled = false;

/** Connection count at which reallyOpenConnection() starts logging a warning */
const CONN_HELD_WARN_THRESHOLD = 10;

/** Lag allowance, in seconds, for servers that have no 'max lag' setting */
const MAX_LAG_DEFAULT = 10;
/** NOTE(review): not used in the visible part of the file; presumably a cache TTL in seconds */
const TTL_CACHE_READONLY = 5;
114 
/**
 * Set up the load balancer from a configuration array.
 *
 * @param array $params Recognised keys ('servers' is required, the rest are optional):
 *  - servers: list of server configuration arrays; each needs a 'load' value and
 *    may have a 'groupLoads' map of (query group => load ratio)
 *  - localDomain: ID of the domain that counts as "local"
 *  - waitTimeout: default replication wait timeout (defaults to 10)
 *  - readOnlyReason: string explaining why the cluster is read-only
 *  - loadMonitor: load monitor configuration array with a 'class' entry
 *  - srvCache, memCache, wanCache: cache instances
 *  - profiler, trxProfiler: profiler instances
 *  - errorLogger: callable that receives an Exception
 *  - replLogger, connLogger, queryLogger, perfLogger: logger instances
 *  - hostname: name of this client host
 *  - cliMode: whether PHP runs in CLI mode
 *  - agent: agent name handed to every handle
 * @throws InvalidArgumentException If 'servers' is missing
 */
public function __construct( array $params ) {
	if ( !isset( $params['servers'] ) ) {
		throw new InvalidArgumentException( __CLASS__ . ': missing servers parameter' );
	}
	$this->mServers = $params['servers'];

	// Without an explicit local domain, fall back to an unspecified one.
	// NOTE(review): the fallback branch was missing from this copy of the file and was
	// restored as DatabaseDomain::newUnspecified() -- confirm against the DatabaseDomain class.
	$this->localDomain = isset( $params['localDomain'] )
		? DatabaseDomain::newFromId( $params['localDomain'] )
		: DatabaseDomain::newUnspecified();
	// In case a caller assumes that the domain ID is simply <db>-<prefix>, which is almost
	// always true, gracefully handle the case when they fail to account for escaping.
	if ( $this->localDomain->getTablePrefix() != '' ) {
		$this->localDomainIdAlias =
			$this->localDomain->getDatabase() . '-' . $this->localDomain->getTablePrefix();
	} else {
		$this->localDomainIdAlias = $this->localDomain->getDatabase();
	}

	$this->mWaitTimeout = isset( $params['waitTimeout'] ) ? $params['waitTimeout'] : 10;

	$this->mReadIndex = -1;
	$this->mConns = [
		'local' => [],
		'foreignUsed' => [],
		'foreignFree' => []
	];
	$this->mLoads = [];
	// Start from an empty map so that readers never see null when no server has group loads
	$this->mGroupLoads = [];
	$this->mWaitForPos = false;
	$this->mErrorConnection = false;
	$this->mAllowLagged = false;

	if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
		$this->readOnlyReason = $params['readOnlyReason'];
	}

	if ( isset( $params['loadMonitor'] ) ) {
		$this->loadMonitorConfig = $params['loadMonitor'];
	} else {
		$this->loadMonitorConfig = [ 'class' => 'LoadMonitorNull' ];
	}

	// Index the generic and per-group load ratios by server index
	foreach ( $params['servers'] as $i => $server ) {
		$this->mLoads[$i] = $server['load'];
		if ( isset( $server['groupLoads'] ) ) {
			foreach ( $server['groupLoads'] as $group => $ratio ) {
				if ( !isset( $this->mGroupLoads[$group] ) ) {
					$this->mGroupLoads[$group] = [];
				}
				$this->mGroupLoads[$group][$i] = $ratio;
			}
		}
	}

	if ( isset( $params['srvCache'] ) ) {
		$this->srvCache = $params['srvCache'];
	} else {
		$this->srvCache = new EmptyBagOStuff();
	}
	if ( isset( $params['memCache'] ) ) {
		$this->memCache = $params['memCache'];
	} else {
		$this->memCache = new EmptyBagOStuff();
	}
	if ( isset( $params['wanCache'] ) ) {
		$this->wanCache = $params['wanCache'];
	} else {
		$this->wanCache = WANObjectCache::newEmpty();
	}
	$this->profiler = isset( $params['profiler'] ) ? $params['profiler'] : null;
	if ( isset( $params['trxProfiler'] ) ) {
		$this->trxProfiler = $params['trxProfiler'];
	} else {
		$this->trxProfiler = new TransactionProfiler();
	}

	// By default, surface logged exceptions as PHP warnings
	$this->errorLogger = isset( $params['errorLogger'] )
		? $params['errorLogger']
		: function ( Exception $e ) {
			trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
		};

	foreach ( [ 'replLogger', 'connLogger', 'queryLogger', 'perfLogger' ] as $key ) {
		$this->$key = isset( $params[$key] ) ? $params[$key] : new \Psr\Log\NullLogger();
	}

	$this->host = isset( $params['hostname'] )
		? $params['hostname']
		: ( gethostname() ?: 'unknown' );
	$this->cliMode = isset( $params['cliMode'] ) ? $params['cliMode'] : PHP_SAPI === 'cli';
	$this->agent = isset( $params['agent'] ) ? $params['agent'] : '';
}
206 
/**
 * Get the load monitor, creating it from the configured class on first use.
 *
 * @return object The load monitor instance
 */
private function getLoadMonitor() {
	if ( isset( $this->loadMonitor ) ) {
		return $this->loadMonitor;
	}

	// Instantiate the configured class and give it the replication logger
	$monitorClass = $this->loadMonitorConfig['class'];
	$this->loadMonitor = new $monitorClass(
		$this,
		$this->srvCache,
		$this->memCache,
		$this->loadMonitorConfig
	);
	$this->loadMonitor->setLogger( $this->replLogger );

	return $this->loadMonitor;
}
222 
/**
 * Pick a random server, weighted by load, among those that are not excessively lagged.
 *
 * @param array $loads Map of (server index => load) to choose from
 * @param string|bool $domain Domain passed on to getLagTimes()
 * @param int|float $maxLag Additional upper bound on acceptable lag; each server is
 *  also bound by its own 'max lag' setting (or MAX_LAG_DEFAULT)
 * @return bool|int|string Chosen server index, or false if no server with a
 *  non-zero load is left after filtering
 */
private function getRandomNonLagged( array $loads, $domain = false, $maxLag = INF ) {
	$lags = $this->getLagTimes( $domain );

	# Unset excessively lagged servers; index 0 is never removed here
	foreach ( $lags as $i => $lag ) {
		if ( $i == 0 ) {
			continue;
		}

		# How much lag this server nominally is allowed to have
		$maxServerLag = isset( $this->mServers[$i]['max lag'] )
			? $this->mServers[$i]['max lag']
			: self::MAX_LAG_DEFAULT;
		# Constrain that further by the $maxLag argument
		$maxServerLag = min( $maxServerLag, $maxLag );

		$host = $this->getServerName( $i );
		if ( $lag === false && !is_infinite( $maxServerLag ) ) {
			$this->replLogger->error( "Server $host (#$i) is not replicating?" );
			unset( $loads[$i] );
		} elseif ( $lag > $maxServerLag ) {
			$this->replLogger->warning( "Server $host (#$i) has >= $lag seconds of lag" );
			unset( $loads[$i] );
		}
	}

	# A zero sum means nothing usable is left: the list is empty or holds only zero-load
	# entries (possibly the master). Do NOT use the master; returning false lets the caller
	# fall back to a lagged replica DB and read-only mode. This also covers the empty list,
	# so no separate count() check is needed.
	if ( array_sum( $loads ) == 0 ) {
		return false;
	}

	# Return a random representative of the remainder
	return ArrayUtils::pickRandom( $loads );
}
273 
/**
 * Choose the index of a server to read from, connecting to it in the process.
 *
 * With a single configured server this is always the writer index. For the generic
 * group (false), a previously chosen reader index is reused.
 *
 * @param string|bool $group Query group, or false for the generic pool
 * @param string|bool $domain Domain ID, or false for the local domain
 * @return bool|int|string Server index, or false if there are no loads for the
 *  group or no usable server could be found
 * @throws InvalidArgumentException If the applicable load array is empty
 */
public function getReaderIndex( $group = false, $domain = false ) {
	# No balancing to do when there is just one server
	if ( count( $this->mServers ) == 1 ) {
		return $this->getWriterIndex();
	}
	# Reuse the generic reader if one was already chosen
	if ( $group === false && $this->mReadIndex >= 0 ) {
		return $this->mReadIndex;
	}

	# Select the load array to work with
	if ( $group === false ) {
		$liveLoads = $this->mLoads;
	} elseif ( isset( $this->mGroupLoads[$group] ) ) {
		$liveLoads = $this->mGroupLoads[$group];
	} else {
		# Nothing configured for this group; the caller may try another group
		$this->connLogger->info( __METHOD__ . ": no loads for group $group" );

		return false;
	}

	if ( count( $liveLoads ) === 0 ) {
		throw new InvalidArgumentException( "Empty server array given to LoadBalancer" );
	}

	# Let the load monitor rescale the configured ratios
	$this->getLoadMonitor()->scaleLoads( $liveLoads, $domain );

	$allLagged = false;
	$index = false; // no server chosen yet
	$untried = $liveLoads;
	while ( count( $untried ) ) {
		if ( $this->mAllowLagged || $allLagged ) {
			# Lag is not a criterion: any remaining server will do
			$index = ArrayUtils::pickRandom( $untried );
		} else {
			$index = false;
			if ( $this->mWaitForPos && $this->mWaitForPos->asOfTime() ) {
				# A position to wait for is set, and doWait() runs after connecting,
				# so prefer servers that should need at most ~1 second of waiting
				# (being too picky can backfire).
				$elapsed = microtime( true ) - $this->mWaitForPos->asOfTime();
				$index = $this->getRandomNonLagged( $untried, $domain, $elapsed + 1 );
			}
			if ( $index === false ) {
				# Otherwise accept any server within its own 'max lag' allowance
				$index = $this->getRandomNonLagged( $untried, $domain );
			}
			if ( $index === false && count( $untried ) != 0 ) {
				# Everything is lagged: take any server and remember that fact
				$this->replLogger->error( "All replica DBs lagged. Switch to read-only mode" );
				$index = ArrayUtils::pickRandom( $untried );
				$allLagged = true;
			}
		}

		if ( $index === false ) {
			# pickRandom() gave up; this is permanent, so stop trying
			$this->connLogger->debug( __METHOD__ . ": pickRandom() returned false" );

			return false;
		}

		$serverName = $this->getServerName( $index );
		$this->connLogger->debug( __METHOD__ . ": Using reader #{$index}: {$serverName}..." );

		$conn = $this->openConnection( $index, $domain );
		if ( !$conn ) {
			# Drop the failed server and try one of the others
			$this->connLogger->warning( __METHOD__ . ": Failed connecting to {$index}/{$domain}" );
			unset( $liveLoads[$index] );
			unset( $untried[$index] );
			$index = false;
			continue;
		}

		// Release the reference taken by openConnection(); the caller will
		// take its own reference later.
		if ( $domain !== false ) {
			$this->reuseConnection( $conn );
		}

		# Found a working server
		break;
	}

	# Every candidate failed to connect
	if ( count( $liveLoads ) === 0 ) {
		$this->connLogger->error( "All servers down" );
	}

	if ( $index !== false ) {
		# Connected; wait briefly for the requested position on non-writer servers
		if ( $this->mWaitForPos && $index > 0 ) {
			$this->doWait( $index );
		}
		if ( $this->mReadIndex <= 0 && $this->mLoads[$index] > 0 && $group === false ) {
			$this->mReadIndex = $index;
			# Remember whether the generic reader was picked while everything was lagged
			if ( $allLagged ) {
				$this->laggedReplicaMode = true;
			}
		}
		$serverName = $this->getServerName( $index );
		$this->connLogger->debug(
			__METHOD__ . ": using server {$serverName} for group '{$group}'" );
	}

	return $index;
}
392 
/**
 * Record a replication position to wait for and, if a generic reader is already
 * chosen, wait for it on that server.
 *
 * A failed wait switches the load balancer into lagged-replica mode.
 *
 * @param object|bool $pos Position to wait for
 */
public function waitFor( $pos ) {
	$this->mWaitForPos = $pos;

	$readerIndex = $this->mReadIndex;
	// Only wait on an already-chosen, non-writer reader
	if ( $readerIndex > 0 && !$this->doWait( $readerIndex ) ) {
		$this->laggedReplicaMode = true;
	}
}
406 
/**
 * Record a replication position and wait for it on a single replica DB.
 *
 * Uses the generic reader if one is chosen, otherwise a random non-writer server
 * with a non-zero load. A connection is opened for the wait if necessary.
 *
 * @param object|bool $pos Position to wait for
 * @param int|null $timeout Timeout for the wait; null uses the default
 * @return bool Result of the wait, or true when there is no applicable server
 */
public function waitForOne( $pos, $timeout = null ) {
	$this->mWaitForPos = $pos;

	$index = $this->mReadIndex;
	if ( $index <= 0 ) {
		// No generic reader yet: choose among replica DBs that carry load
		$candidates = $this->mLoads;
		unset( $candidates[$this->getWriterIndex()] );
		$index = ArrayUtils::pickRandom( array_filter( $candidates ) );
	}

	if ( $index > 0 ) {
		return $this->doWait( $index, true, $timeout );
	}

	return true; // no applicable loads
}
427 
/**
 * Record a replication position and wait for it on every server after index 0
 * that has a positive load.
 *
 * @param object|bool $pos Position to wait for
 * @param int|null $timeout Timeout for each wait; null uses the default
 * @return bool True only if every wait succeeded
 */
public function waitForAll( $pos, $timeout = null ) {
	$this->mWaitForPos = $pos;

	$allCaughtUp = true;
	$total = count( $this->mServers );
	for ( $index = 1; $index < $total; ++$index ) {
		if ( !( $this->mLoads[$index] > 0 ) ) {
			continue; // zero-load servers are not waited on
		}
		// Keep waiting on the remaining servers even after one failure
		if ( !$this->doWait( $index, true, $timeout ) ) {
			$allCaughtUp = false;
		}
	}

	return $allCaughtUp;
}
441 
/**
 * Find any tracked connection to the given server, in any pool.
 *
 * @param int|string $i Server index
 * @return object|bool First tracked handle for that server, or false if none
 */
public function getAnyOpenConnection( $i ) {
	foreach ( $this->mConns as $pool ) {
		if ( empty( $pool[$i] ) ) {
			continue;
		}

		return reset( $pool[$i] );
	}

	return false;
}
455 
/**
 * Wait for the given server to reach the recorded replication position.
 *
 * @param int|string $index Server index
 * @param bool $open Whether a connection may be opened just for this wait
 * @param int|null $timeout Wait timeout; a falsy value uses the configured default
 * @return bool True if the server reached (or is known to have reached) the
 *  position; false on timeout or when no connection is available
 */
protected function doWait( $index, $open = false, $timeout = null ) {
	// Skip the wait if the cache already shows the server got this far
	$serverName = $this->getServerName( $index );
	$cacheKey = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $serverName );
	$cachedPos = $this->srvCache->get( $cacheKey );
	if ( $cachedPos && $cachedPos->hasReached( $this->mWaitForPos ) ) {
		$this->replLogger->debug( __METHOD__ .
			": replica DB {$serverName} known to be caught up (pos >= {$cachedPos})." );
		return true;
	}

	// Use an existing connection, or open a temporary one if that is allowed
	$isTemporary = false;
	$conn = $this->getAnyOpenConnection( $index );
	if ( !$conn ) {
		if ( !$open ) {
			$this->replLogger->debug( __METHOD__ . ": no connection open for {$serverName}" );

			return false;
		}

		$conn = $this->openConnection( $index, self::DOMAIN_ANY );
		if ( !$conn ) {
			$this->replLogger->warning( __METHOD__ . ": failed to connect to {$serverName}" );

			return false;
		}
		// This connection exists only for the lag check, so close it afterwards
		// to avoid connection spam in waitForAll().
		$isTemporary = true;
	}

	$this->replLogger->info( __METHOD__ . ": Waiting for replica DB {$serverName} to catch up..." );
	$timeout = $timeout ?: $this->mWaitTimeout;
	$result = $conn->masterPosWait( $this->mWaitForPos, $timeout );

	$caughtUp = !( $result == -1 || $result === null );
	if ( $caughtUp ) {
		$this->replLogger->info( __METHOD__ . ": Done" );
		// Cache the position so later waits on it can return immediately
		$this->srvCache->set( $cacheKey, $this->mWaitForPos, BagOStuff::TTL_DAY );
	} else {
		// The wait timed out
		$msg = __METHOD__ . ": Timed out waiting on {$serverName} pos {$this->mWaitForPos}";
		$this->replLogger->warning( "$msg" );
	}

	if ( $isTemporary ) {
		$this->closeConnection( $conn );
	}

	return $caughtUp;
}
519 
/**
 * Get a database handle for a server index or an operation-based index.
 *
 * @param int|string $i Explicit server index, or DB_MASTER / DB_REPLICA
 * @param array|string|bool $groups Query group(s) to try in order; false or []
 *  means only the generic pool
 * @param string|bool $domain Domain ID, or false for the local domain
 * @return object Database handle; connection failures are reported through
 *  reportConnectionError(), which throws
 * @throws InvalidArgumentException If $i is null or false
 */
public function getConnection( $i, $groups = [], $domain = false ) {
	if ( $i === null || $i === false ) {
		throw new InvalidArgumentException( 'Attempt to call ' . __METHOD__ .
			' with invalid server index' );
	}

	// The local domain, under either spelling, means a local connection
	$wantsLocal = $this->localDomain->equals( $domain )
		|| $domain === $this->localDomainIdAlias;
	if ( $wantsLocal ) {
		$domain = false;
	}

	if ( $groups === false || $groups === [] ) {
		$groups = [ false ]; // a single "group": the generic pool
	} else {
		$groups = (array)$groups;
	}

	$masterOnly = ( $i == self::DB_MASTER || $i == $this->getWriterIndex() );
	$connsBefore = $this->connsOpened; // used below to detect a new connection

	if ( $i == self::DB_MASTER ) {
		$i = $this->getWriterIndex();
	} else {
		# Take the first query group (in order) that yields a reader
		foreach ( $groups as $group ) {
			$candidate = $this->getReaderIndex( $group, $domain );
			if ( $candidate !== false ) {
				$i = $candidate;
				break;
			}
		}
	}

	# Still an operation-based index: the groups gave nothing
	if ( $i == self::DB_REPLICA ) {
		$this->mLastError = 'Unknown error'; // reset error string
		if ( $groups === [ false ] ) {
			# The generic pool is what was just tried above
			$i = false;
		} else {
			# Fall back to the generic pool
			$i = $this->getReaderIndex( false, $domain );
		}
		if ( $i === false ) {
			$this->mLastError = 'No working replica DB server: ' . $this->mLastError;
			// Throw an exception
			$this->reportConnectionError();
			return null; // not reached
		}
	}

	# $i is now an explicit index into the servers array
	$conn = $this->openConnection( $i, $domain );
	if ( !$conn ) {
		// Throw an exception
		$this->reportConnectionError();
		return null; // not reached
	}

	# Tell the transaction profiler about connections opened by this call
	if ( $this->connsOpened > $connsBefore ) {
		$this->trxProfiler->recordConnection(
			$conn->getServer(),
			$conn->getDBname(),
			$masterOnly
		);
	}

	if ( $masterOnly ) {
		# Master-requested handles inherit any read-only mode setting
		$conn->setLBInfo( 'readOnlyReason', $this->getReadOnlyReason( $domain, $conn ) );
	}

	return $conn;
}
597 
/**
 * Release one reference to a foreign connection, moving it to the free pool
 * once no references remain.
 *
 * Handles that carry no 'serverIndex' or 'foreignPoolRefCount' info are ignored.
 *
 * @param object $conn Database handle
 * @throws InvalidArgumentException If the handle is not the one tracked as in use
 *  for its server and domain
 */
public function reuseConnection( $conn ) {
	$serverIndex = $conn->getLBInfo( 'serverIndex' );
	$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );

	if ( $serverIndex === null || $refCount === null ) {
		return; // no reference-count info on this handle: nothing to release
	}
	if ( $conn instanceof DBConnRef ) {
		// DBConnRef releases its own connection and passes only the live handle here,
		// so a caller that passes a DBConnRef is broken. Log it with a stack trace.
		$this->connLogger->error( __METHOD__ . ": got DBConnRef instance.\n" .
			( new RuntimeException() )->getTraceAsString() );

		return;
	}
	if ( $this->disabled ) {
		return; // DBConnRef handle probably survived longer than the LoadBalancer
	}

	$domain = $conn->getDomainID();
	if ( !isset( $this->mConns['foreignUsed'][$serverIndex][$domain] ) ) {
		throw new InvalidArgumentException( __METHOD__ .
			": connection $serverIndex/$domain not found; it may have already been freed." );
	}
	if ( $this->mConns['foreignUsed'][$serverIndex][$domain] !== $conn ) {
		throw new InvalidArgumentException( __METHOD__ .
			": connection $serverIndex/$domain mismatched; it may have already been freed." );
	}

	--$refCount;
	$conn->setLBInfo( 'foreignPoolRefCount', $refCount );
	if ( $refCount > 0 ) {
		$this->connLogger->debug( __METHOD__ .
			": reference count for $serverIndex/$domain reduced to $refCount" );

		return;
	}

	// Last reference gone: move the handle from the used pool to the free pool
	$this->mConns['foreignFree'][$serverIndex][$domain] = $conn;
	unset( $this->mConns['foreignUsed'][$serverIndex][$domain] );
	if ( !$this->mConns['foreignUsed'][$serverIndex] ) {
		unset( $this->mConns[ 'foreignUsed' ][$serverIndex] ); // clean up
	}
	$this->connLogger->debug( __METHOD__ . ": freed connection $serverIndex/$domain" );
}
647 
/**
 * Get a connection wrapped in a DBConnRef.
 *
 * @param int|string $db Server index, or DB_MASTER / DB_REPLICA
 * @param array|string|bool $groups Query group(s)
 * @param string|object|bool $domain Domain, or false for the local domain
 * @return DBConnRef
 */
public function getConnectionRef( $db, $groups = [], $domain = false ) {
	if ( $domain === false ) {
		$domain = $this->localDomain;
	}
	$conn = $this->getConnection( $db, $groups, $domain );

	return new DBConnRef( $this, $conn );
}
653 
/**
 * Get a DBConnRef built from connection parameters rather than a live handle.
 *
 * @param int|string $db Server index, or DB_MASTER / DB_REPLICA
 * @param array|string|bool $groups Query group(s)
 * @param string|object|bool $domain Domain, or false for the local domain
 * @return DBConnRef
 */
public function getLazyConnectionRef( $db, $groups = [], $domain = false ) {
	if ( $domain === false ) {
		$domain = $this->localDomain;
	}
	// The parameters are stored in place of a handle
	$connParams = [ $db, $groups, $domain ];

	return new DBConnRef( $this, $connParams );
}
659 
/**
 * Open (or reuse) a connection to the server with the given index.
 *
 * @param int|string $i Server index
 * @param string|bool $domain Domain ID, or false for the local domain
 * @return object|bool Open database handle, or false on failure (the failed
 *  handle is then kept for error reporting)
 * @throws InvalidArgumentException If no server has the given index
 */
public function openConnection( $i, $domain = false ) {
	$wantsLocal = $this->localDomain->equals( $domain )
		|| $domain === $this->localDomainIdAlias;
	if ( $wantsLocal ) {
		$domain = false; // local connection requested
	}

	if ( $domain !== false ) {
		$conn = $this->openForeignConnection( $i, $domain );
	} elseif ( isset( $this->mConns['local'][$i][0] ) ) {
		// A local connection to this server already exists
		$conn = $this->mConns['local'][$i][0];
	} else {
		if ( !isset( $this->mServers[$i] ) || !is_array( $this->mServers[$i] ) ) {
			throw new InvalidArgumentException( "No server with index '$i'." );
		}
		// Open a new local connection
		$serverConfig = $this->mServers[$i];
		$serverConfig['serverIndex'] = $i;
		$conn = $this->reallyOpenConnection( $serverConfig, false );
		$serverName = $this->getServerName( $i );
		if ( !$conn->isOpen() ) {
			$this->connLogger->warning( "Failed to connect to database $i at '$serverName'." );
			$this->mErrorConnection = $conn;
			$conn = false;
		} else {
			$this->connLogger->debug( "Connected to database $i at '$serverName'." );
			$this->mConns['local'][$i][0] = $conn;
		}
	}

	// A tracked handle may no longer be open. Return false instead of a handle that
	// would only throw on use, so callers such as getReaderIndex() can try another server.
	if ( $conn && !$conn->isOpen() ) {
		$this->mErrorConnection = $conn;
		$conn = false;
	}

	return $conn;
}
707 
/**
 * Open (or reuse) a connection to a foreign domain and take a reference to it.
 *
 * Tried in order: a handle already in use for this server and domain, a free handle
 * for the same domain, a free handle for another domain (switched over), and finally
 * a brand new connection.
 *
 * @param int|string $i Server index
 * @param string $domain Domain ID
 * @return object|bool Database handle, or false on failure
 * @throws InvalidArgumentException If a new connection is needed and no server has
 *  the given index
 */
private function openForeignConnection( $i, $domain ) {
	$domainInfo = DatabaseDomain::newFromId( $domain );
	$dbName = $domainInfo->getDatabase();
	$prefix = $domainInfo->getTablePrefix();

	if ( isset( $this->mConns['foreignUsed'][$i][$domain] ) ) {
		// Share the handle that is already in use
		$conn = $this->mConns['foreignUsed'][$i][$domain];
		$this->connLogger->debug( __METHOD__ . ": reusing connection $i/$domain" );
	} elseif ( isset( $this->mConns['foreignFree'][$i][$domain] ) ) {
		// Take a free handle for the same domain
		$conn = $this->mConns['foreignFree'][$i][$domain];
		unset( $this->mConns['foreignFree'][$i][$domain] );
		$this->mConns['foreignUsed'][$i][$domain] = $conn;
		$this->connLogger->debug( __METHOD__ . ": reusing free connection $i/$domain" );
	} elseif ( !empty( $this->mConns['foreignFree'][$i] ) ) {
		// Take a free handle of another domain and switch it over
		$conn = reset( $this->mConns['foreignFree'][$i] );
		$previousDomain = key( $this->mConns['foreignFree'][$i] );
		// An empty database name means "don't care", so no database is selected then
		if ( strlen( $dbName ) && !$conn->selectDB( $dbName ) ) {
			$this->mLastError = "Error selecting database '$dbName' on server " .
				$conn->getServer() . " from client host {$this->host}";
			$this->mErrorConnection = $conn;
			$conn = false;
		} else {
			$conn->tablePrefix( $prefix );
			unset( $this->mConns['foreignFree'][$i][$previousDomain] );
			$this->mConns['foreignUsed'][$i][$domain] = $conn;
			$this->connLogger->debug( __METHOD__ .
				": reusing free connection from $previousDomain for $domain" );
		}
	} else {
		if ( !isset( $this->mServers[$i] ) || !is_array( $this->mServers[$i] ) ) {
			throw new InvalidArgumentException( "No server with index '$i'." );
		}
		// Nothing reusable: open a new connection with a zero reference count
		$serverConfig = $this->mServers[$i];
		$serverConfig['serverIndex'] = $i;
		$serverConfig['foreignPoolRefCount'] = 0;
		$serverConfig['foreign'] = true;
		$conn = $this->reallyOpenConnection( $serverConfig, $dbName );
		if ( $conn->isOpen() ) {
			$conn->tablePrefix( $prefix );
			$this->mConns['foreignUsed'][$i][$domain] = $conn;
			$this->connLogger->debug( __METHOD__ . ": opened new connection for $i/$domain" );
		} else {
			$this->connLogger->warning( __METHOD__ . ": connection error for $i/$domain" );
			$this->mErrorConnection = $conn;
			$conn = false;
		}
	}

	// Count this caller as a user of the handle
	if ( $conn ) {
		$currentRefs = $conn->getLBInfo( 'foreignPoolRefCount' );
		$conn->setLBInfo( 'foreignPoolRefCount', $currentRefs + 1 );
	}

	return $conn;
}
790 
/**
 * Check whether any connection to the given server is tracked.
 *
 * @param mixed $index Server index; anything but an integer yields false
 * @return bool
 */
private function isOpen( $index ) {
	return is_int( $index ) && (bool)$this->getAnyOpenConnection( $index );
}
805 
/**
 * Create a database handle from a server configuration array.
 *
 * A handle is returned even when the connection attempt fails; callers check isOpen().
 *
 * @param array $server Server configuration; must include 'type' and 'serverIndex'
 * @param string|bool $dbNameOverride Database name to use instead of the configured
 *  one, or false to keep the configured one
 * @return object Database handle
 * @throws DBAccessError If the load balancer has been disabled
 */
protected function reallyOpenConnection( array $server, $dbNameOverride = false ) {
	if ( $this->disabled ) {
		throw new DBAccessError();
	}

	if ( $dbNameOverride !== false ) {
		$server['dbname'] = $dbNameOverride;
	}

	// Name of the cluster master, made known to the handle below
	$masterName = $this->getServerName( $this->getWriterIndex() );

	// Warn once a request has made many connections
	$this->connsOpened++;
	if ( $this->connsOpened >= self::CONN_HELD_WARN_THRESHOLD ) {
		$this->perfLogger->warning( __METHOD__ . ": " .
			"{$this->connsOpened}+ connections made (master=$masterName)" );
	}

	// Settings shared by every handle of this load balancer
	$shared = [
		'clusterMasterHost' => $masterName,
		'srvCache' => $this->srvCache,
		'connLogger' => $this->connLogger,
		'queryLogger' => $this->queryLogger,
		'errorLogger' => $this->errorLogger,
		'profiler' => $this->profiler,
		'trxProfiler' => $this->trxProfiler,
		'cliMode' => $this->cliMode,
		'agent' => $this->agent,
	];
	foreach ( $shared as $name => $value ) {
		$server[$name] = $value;
	}
	// Fall back to DBO_DEFAULT when the server configuration sets no flags
	if ( !isset( $server['flags'] ) ) {
		$server['flags'] = IDatabase::DBO_DEFAULT;
	}

	// Build the handle; on connection failure, take the handle carried by the exception
	try {
		$db = Database::factory( $server['type'], $server );
	} catch ( DBConnectionError $e ) {
		$db = $e->db;
	}

	$db->setLBInfo( $server );
	$db->setLazyMasterHandle(
		$this->getLazyConnectionRef( self::DB_MASTER, [], $db->getDomainID() )
	);
	$db->setTableAliases( $this->tableAliases );

	// Master handles join the active transaction round and get the recurring listeners
	if ( $server['serverIndex'] === $this->getWriterIndex() ) {
		if ( $this->trxRoundId !== false ) {
			$this->applyTransactionRoundFlags( $db );
		}
		foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
			$db->setTransactionListener( $name, $callback );
		}
	}

	return $db;
}
876 
/**
 * Log the last connection error and raise it as an exception.
 *
 * @throws DBConnectionError Thrown directly when no failed handle is recorded;
 *  otherwise the failed handle's own reportConnectionError() is called to throw it
 */
private function reportConnectionError() {
	$failedConn = $this->mErrorConnection; // the connection which caused the error
	$context = [
		'method' => __METHOD__,
		'last_error' => $this->mLastError,
	];

	if ( is_object( $failedConn ) ) {
		$context['db_server'] = $failedConn->getServer();
		$this->connLogger->warning(
			"Connection error: {last_error} ({db_server})",
			$context
		);

		// throws DBConnectionError
		$failedConn->reportConnectionError( "{$this->mLastError} ({$context['db_server']})" );

		return;
	}

	// No failed handle was recorded, so only mLastError describes the problem
	$this->connLogger->error(
		"LB failure with no last connection. Connection error: {last_error}",
		$context
	);

	throw new DBConnectionError( null, $this->mLastError );
}
907 
/**
 * Get the index of the server that takes writes.
 *
 * @return int Always 0
 */
public function getWriterIndex() {
	return 0;
}
911 
/**
 * Check whether a server with the given index is configured.
 *
 * @param int|string $i Server index
 * @return bool
 */
public function haveIndex( $i ) {
	$known = array_key_exists( $i, $this->mServers );

	return $known;
}
915 
/**
 * Check whether the given server is configured and has a non-zero load.
 *
 * @param int|string $i Server index
 * @return bool
 */
public function isNonZeroLoad( $i ) {
	if ( !array_key_exists( $i, $this->mServers ) ) {
		return false;
	}

	return $this->mLoads[$i] != 0;
}
919 
/**
 * Get the number of configured servers.
 *
 * @return int
 */
public function getServerCount() {
	$total = count( $this->mServers );

	return $total;
}
923 
/**
 * Get the host name of a server.
 *
 * The 'hostName' setting takes precedence over 'host' whenever it is set.
 *
 * @param int|string $i Server index
 * @return string Host name, or 'localhost' when none is configured or it is empty
 */
public function getServerName( $i ) {
	foreach ( [ 'hostName', 'host' ] as $field ) {
		if ( isset( $this->mServers[$i][$field] ) ) {
			// The first field that is set decides, even if its value is empty
			$name = $this->mServers[$i][$field];

			return ( $name != '' ) ? $name : 'localhost';
		}
	}

	return 'localhost';
}
935 
/**
 * Get the configuration array of a server.
 *
 * @param int|string $i Server index
 * @return array|bool Server configuration, or false if the index is not set
 */
public function getServerInfo( $i ) {
	return isset( $this->mServers[$i] ) ? $this->mServers[$i] : false;
}
943 
/**
 * Set or replace the configuration array of a server.
 *
 * Only the stored configuration changes; the load arrays built by the
 * constructor are not touched here.
 *
 * @param int|string $i Server index
 * @param array $serverInfo New server configuration
 */
public function setServerInfo( $i, array $serverInfo ) {
	$this->mServers[$i] = $serverInfo;
}
947 
/**
 * Get the current replication position from an already-tracked connection.
 *
 * Uses the writer's handle if one is tracked; otherwise the replica position of
 * the first other server that has a tracked handle.
 *
 * @return object|bool Position, or false if no suitable connection is tracked
 */
public function getMasterPos() {
	$masterConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
	if ( $masterConn ) {
		return $masterConn->getMasterPos();
	}

	# No master connection was opened; use the position of a connected replica DB instead
	$total = count( $this->mServers );
	for ( $index = 1; $index < $total; ++$index ) {
		$replicaConn = $this->getAnyOpenConnection( $index );
		if ( $replicaConn ) {
			return $replicaConn->getReplicaPos();
		}
	}

	return false;
}
966 
/**
 * Close all connections and refuse to open new ones from now on.
 */
public function disable() {
	// Close first, then set the flag that reallyOpenConnection() checks
	$this->closeAll();
	$this->disabled = true;
}
971 
/**
 * Close every open connection and reset the connection pools and counter.
 */
public function closeAll() {
	$closeOne = function ( IDatabase $conn ) {
		$host = $conn->getServer();
		$this->connLogger->debug( "Closing connection to database '$host'." );
		$conn->close();
	};
	$this->forEachOpenConnection( $closeOne );

	// Forget all handles
	$this->mConns = [
		'local' => [],
		'foreignFree' => [],
		'foreignUsed' => [],
	];
	$this->connsOpened = 0;
}
986 
/**
 * Close a single connection and stop tracking it.
 *
 * The handle is closed even if this load balancer was not tracking it.
 *
 * @param IDatabase $conn Handle to close
 */
public function closeConnection( IDatabase $conn ) {
	$serverIndex = $conn->getLBInfo( 'serverIndex' ); // second index level of mConns
	foreach ( $this->mConns as $type => $connsByServer ) {
		if ( !isset( $connsByServer[$serverIndex] ) ) {
			continue;
		}

		// The third index level is 0 for local handles and the domain ID for foreign
		// ones, so it must not be used as a server index.
		foreach ( $connsByServer[$serverIndex] as $i => $trackedConn ) {
			if ( $conn === $trackedConn ) {
				$host = $this->getServerName( $serverIndex );
				$this->connLogger->debug(
					"Closing connection to database $serverIndex at '$host'." );
				unset( $this->mConns[$type][$serverIndex][$i] );
				// Keep the counter in step with the pools, as closeAll() does
				--$this->connsOpened;
				break 2;
			}
		}
	}

	$conn->close();
}
1007 
/**
 * Commit on every open connection and end any active transaction round.
 *
 * Commit errors are passed to the error logger and collected, so that all
 * connections are attempted before anything is thrown.
 *
 * @param string $fname Caller name
 * @throws DBExpectedError If the commit failed on one or more servers
 */
public function commitAll( $fname = __METHOD__ ) {
	$failures = [];

	// End the round up front; remember whether one was active
	$hadRound = ( $this->trxRoundId !== false );
	$this->trxRoundId = false;

	$commitOne = function ( IDatabase $conn ) use ( $fname, $hadRound, &$failures ) {
		try {
			$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
		} catch ( DBError $e ) {
			call_user_func( $this->errorLogger, $e );
			$failures[] = $conn->getServer() . ': ' . $e->getMessage();
		}
		// Master handles had round flags applied; take them off again
		if ( $hadRound && $conn->getLBInfo( 'master' ) ) {
			$this->undoTransactionRoundFlags( $conn );
		}
	};
	$this->forEachOpenConnection( $commitOne );

	if ( $failures ) {
		$summary = implode( "\n", array_unique( $failures ) );
		throw new DBExpectedError( null, "Commit failed on server(s) " . $summary );
	}
}
1034 
/**
 * Run the pre-commit callbacks of every open master connection.
 *
 * Callback suppression is lifted only while the pre-commit callbacks run and is
 * switched back on afterwards.
 *
 * NOTE(review): the call between the two suppression toggles was missing from this
 * copy of the file, which left the method without effect. It was restored as
 * Database::runOnTransactionPreCommitCallbacks() -- confirm against the Database class.
 */
public function finalizeMasterChanges() {
	$this->forEachOpenMasterConnection( function ( Database $conn ) {
		// Any error should cause all DB transactions to be rolled back together
		$conn->setTrxEndCallbackSuppression( false );
		$conn->runOnTransactionPreCommitCallbacks();
		// Defer post-commit callbacks until COMMIT finishes for all DBs
		$conn->setTrxEndCallbackSuppression( true );
	} );
}
1044 
/**
 * Check that every open master connection is in a state that is safe to commit.
 *
 * @param array $options Supported keys:
 *  - maxWriteDuration: limit on the estimated write duration; 0 or unset means no limit
 * @throws DBTransactionError If an explicit transaction is still active, or a
 *  connection with pending writes or callbacks no longer answers a ping
 * @throws DBTransactionSizeError If the estimated write duration exceeds the limit
 */
public function approveMasterChanges( array $options ) {
	$limit = 0;
	if ( isset( $options['maxWriteDuration'] ) ) {
		$limit = $options['maxWriteDuration'];
	}

	$checkOne = function ( IDatabase $conn ) use ( $limit ) {
		// An explicit transaction that is still open suggests that a caller caught
		// an exception without rolling back; throw so that everything is rolled back.
		if ( $conn->explicitTrxActive() ) {
			throw new DBTransactionError(
				$conn,
				"Explicit transaction still active. A caller may have caught an error."
			);
		}

		// Refuse transactions whose writes are estimated to take too long to apply
		$time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
		if ( $limit > 0 && $time > $limit ) {
			throw new DBTransactionSizeError(
				$conn,
				"Transaction spent {$time} second(s) in writes, exceeding the {$limit} limit.",
				[ $time, $limit ]
			);
		}

		// A connection that sat idle may have been dropped; ping it to find out
		// before the commit round starts.
		if ( $conn->writesOrCallbacksPending() && !$conn->ping() ) {
			throw new DBTransactionError(
				$conn,
				"A connection to the {$conn->getDBname()} database was lost before commit."
			);
		}
	};
	$this->forEachOpenMasterConnection( $checkOne );
}
1077 
/**
 * Start a transaction round: flush the snapshot of every open master connection
 * and apply the transaction round flags to it.
 *
 * @param string $fname Caller name; also used as the ID of the new round
 * @throws DBTransactionError If a transaction round is already in progress
 * @throws DBExpectedError If flushing failed on one or more servers
 */
public function beginMasterChanges( $fname = __METHOD__ ) {
	if ( $this->trxRoundId !== false ) {
		throw new DBTransactionError(
			null,
			"$fname: Transaction round '{$this->trxRoundId}' already started."
		);
	}
	$this->trxRoundId = $fname;

	$failures = [];
	$this->forEachOpenMasterConnection(
		function ( Database $conn ) use ( $fname, &$failures ) {
			// Keep transaction-end callbacks quiet while the snapshot is flushed
			$conn->setTrxEndCallbackSuppression( true );
			try {
				$conn->flushSnapshot( $fname );
			} catch ( DBError $e ) {
				// Log and remember the failure, but keep processing the other servers
				call_user_func( $this->errorLogger, $e );
				$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
			}
			$conn->setTrxEndCallbackSuppression( false );
			$this->applyTransactionRoundFlags( $conn );
		}
	);

	if ( $failures ) {
		throw new DBExpectedError(
			null,
			"$fname: Flush failed on server(s) " . implode( "\n", array_unique( $failures ) )
		);
	}
}
1109 
/**
 * End the current transaction round (if any) and commit every open master
 * connection that has pending writes or callbacks.
 *
 * @param string $fname Caller name
 * @throws DBExpectedError If the commit/flush failed on one or more servers
 */
public function commitMasterChanges( $fname = __METHOD__ ) {
	$failures = [];

	// Held only for its lifetime: client aborts are ignored until it leaves scope
	$scope = $this->getScopedPHPBehaviorForCommit(); // try to ignore client aborts

	// Only undo the round flags if a round was actually in progress
	$restore = ( $this->trxRoundId !== false );
	$this->trxRoundId = false;
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( $fname, $restore, &$failures ) {
			try {
				if ( $conn->writesOrCallbacksPending() ) {
					$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
				} elseif ( $restore ) {
					$conn->flushSnapshot( $fname );
				}
			} catch ( DBError $e ) {
				// Log and remember the failure, but keep processing the other servers
				call_user_func( $this->errorLogger, $e );
				$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
			}
			if ( $restore ) {
				$this->undoTransactionRoundFlags( $conn );
			}
		}
	);

	if ( $failures ) {
		throw new DBExpectedError(
			null,
			"$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
		);
	}
}
1143 
/**
 * Run the post-transaction callbacks (idle callbacks, then transaction listeners)
 * on every open master connection that has no transaction left open.
 *
 * @param int $type Trigger value passed through to the callbacks
 * @return Exception|null The first exception thrown by a callback, if any
 */
public function runMasterPostTrxCallbacks( $type ) {
	$e = null; // first exception
	$this->forEachOpenMasterConnection( function ( Database $conn ) use ( $type, &$e ) {
		$conn->setTrxEndCallbackSuppression( false );
		if ( $conn->writesOrCallbacksPending() ) {
			// This happens if onTransactionIdle() callbacks leave callbacks on *another* DB
			// (which finished its callbacks already). Warn and recover in this case. Let the
			// callbacks run in the final commitMasterChanges() in LBFactory::shutdown().
			$this->queryLogger->error( __METHOD__ . ": found writes/callbacks pending." );
			return;
		} elseif ( $conn->trxLevel() ) {
			// This happens for single-DB setups where DB_REPLICA uses the master DB,
			// thus leaving an implicit read-only transaction open at this point. It
			// also happens if onTransactionIdle() callbacks leave implicit transactions
			// open on *other* DBs (which is slightly improper). Let these COMMIT on the
			// next call to commitMasterChanges(), possibly in LBFactory::shutdown().
			return;
		}
		// Each stage is guarded separately so a failure in one still lets the other run
		try {
			$conn->runOnTransactionIdleCallbacks( $type );
		} catch ( Exception $ex ) {
			$e = $e ?: $ex;
		}
		try {
			$conn->runTransactionListenerCallbacks( $type );
		} catch ( Exception $ex ) {
			$e = $e ?: $ex;
		}
	} );

	return $e;
}
1176 
/**
 * End the current transaction round (if any) and roll back every open master
 * connection that has pending writes or callbacks.
 *
 * @param string $fname Caller name
 */
public function rollbackMasterChanges( $fname = __METHOD__ ) {
	// Only undo the round flags if a round was actually in progress
	$restore = ( $this->trxRoundId !== false );
	$this->trxRoundId = false;
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( $fname, $restore ) {
			if ( $conn->writesOrCallbacksPending() ) {
				$conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
			}
			if ( $restore ) {
				$this->undoTransactionRoundFlags( $conn );
			}
		}
	);
}
1191 
/**
 * Turn on transaction-end callback suppression for every open master connection.
 */
public function suppressTransactionEndCallbacks() {
	$this->forEachOpenMasterConnection( function ( Database $conn ) {
		$conn->setTrxEndCallbackSuppression( true );
	} );
}
1197 
/**
 * Force DBO_TRX on a connection for the duration of a transaction round,
 * remembering the prior flags so they can be restored afterwards.
 *
 * @param IDatabase $conn
 */
private function applyTransactionRoundFlags( IDatabase $conn ) {
	if ( !$conn->getFlag( $conn::DBO_DEFAULT ) ) {
		// Config explicitly requested DBO_TRX be either on or off by not setting
		// DBO_DEFAULT, so respect that. Forcing no transactions is useful for things
		// like blob stores (ExternalStore) which want auto-commit mode.
		return;
	}

	// With DBO_DEFAULT, DBO_TRX is controlled entirely by CLI mode presence.
	// Force DBO_TRX even in CLI mode since a commit round is expected soon.
	$conn->setFlag( $conn::DBO_TRX, $conn::REMEMBER_PRIOR );
}
1211 
/**
 * Restore the flags a connection had before applyTransactionRoundFlags() changed them.
 *
 * @param IDatabase $conn
 */
private function undoTransactionRoundFlags( IDatabase $conn ) {
	if ( !$conn->getFlag( $conn::DBO_DEFAULT ) ) {
		return; // flags were never touched for this connection
	}

	$conn->restoreFlags( $conn::RESTORE_PRIOR );
}
1220 
/**
 * Flush the snapshot of every open replica DB connection.
 *
 * @param string $fname Caller name, passed on to each flushSnapshot() call
 */
public function flushReplicaSnapshots( $fname = __METHOD__ ) {
	// Pass the caller's name through; __METHOD__ inside the closure would name the
	// closure itself rather than the method that asked for the flush.
	$this->forEachOpenReplicaConnection( function ( IDatabase $conn ) use ( $fname ) {
		$conn->flushSnapshot( $fname );
	} );
}
1226 
/**
 * Check whether a connection to the writer (master) index is open.
 *
 * @return bool
 */
public function hasMasterConnection() {
	$writerIndex = $this->getWriterIndex();

	return $this->isOpen( $writerIndex );
}
1230 
/**
 * Check whether any open master connection has pending writes or callbacks.
 *
 * @return bool
 */
public function hasMasterChanges() {
	$anyPending = false;
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( &$anyPending ) {
			// Every connection is queried, even after one reported pending work
			if ( $conn->writesOrCallbacksPending() ) {
				$anyPending = true;
			}
		}
	);

	return $anyPending;
}
1239 
/**
 * Get the greatest lastDoneWrites() value across all open master connections.
 *
 * @return float|bool The largest value found; false if there is no open master connection
 */
public function lastMasterChangeTimestamp() {
	$latest = false;
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( &$latest ) {
			$written = $conn->lastDoneWrites();
			$latest = max( $latest, $written );
		}
	);

	return $latest;
}
1248 
/**
 * Check whether there are pending master changes, or whether the last master
 * write happened within the past $age seconds.
 *
 * @param float|null $age Age in seconds; defaults to the configured wait timeout
 * @return bool
 */
public function hasOrMadeRecentMasterChanges( $age = null ) {
	if ( $age === null ) {
		$age = $this->mWaitTimeout;
	}

	if ( $this->hasMasterChanges() ) {
		return true;
	}

	return $this->lastMasterChangeTimestamp() > microtime( true ) - $age;
}
1255 
/**
 * Collect the pending write callers reported by all open master connections.
 *
 * @return array Merged result of pendingWriteCallers() from each connection
 */
public function pendingMasterChangeCallers() {
	$callers = [];
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( &$callers ) {
			$pendingHere = $conn->pendingWriteCallers();
			$callers = array_merge( $callers, $pendingHere );
		}
	);

	return $callers;
}
1264 
/**
 * Report whether lagged-replica mode is in effect, probing for a replica
 * connection first if the mode is not already set.
 *
 * @param string|bool $domain Domain ID, or false for the current domain
 * @return bool
 */
public function getLaggedReplicaMode( $domain = false ) {
	// Nothing to probe if the mode is already set, or if there is only one DB
	// (the latter also avoids recursion)
	if ( $this->laggedReplicaMode || $this->getServerCount() <= 1 ) {
		return $this->laggedReplicaMode;
	}

	try {
		// See if laggedReplicaMode gets set while obtaining a replica connection
		$replica = $this->getConnection( self::DB_REPLICA, false, $domain );
		$this->reuseConnection( $replica );
	} catch ( DBConnectionError $e ) {
		// Avoid expensive re-connect attempts and failures
		$this->allReplicasDownMode = true;
		$this->laggedReplicaMode = true;
	}

	return $this->laggedReplicaMode;
}
1281 
/**
 * Alias of getLaggedReplicaMode() under the older "slave" name.
 *
 * @param string|bool $domain Domain ID, or false for the current domain
 * @return bool
 */
public function getLaggedSlaveMode( $domain = false ) {
	$lagged = $this->getLaggedReplicaMode( $domain );

	return $lagged;
}
1290 
/**
 * Report the current lagged-replica mode flag without probing any connection.
 *
 * @return bool
 */
public function laggedReplicaUsed() {
	$flag = $this->laggedReplicaMode;

	return $flag;
}
1294 
/**
 * Alias of laggedReplicaUsed() under the older "slave" name.
 *
 * @return bool
 */
public function laggedSlaveUsed() {
	$used = $this->laggedReplicaUsed();

	return $used;
}
1303 
/**
 * Get the reason the database should be treated as read-only, if any.
 *
 * Checks, in order: an explicitly set reason, lagged-replica mode, and
 * whether the master itself is running read-only.
 *
 * @param string|bool $domain Domain ID, or false for the current domain
 * @param IDatabase|null $conn Master connection to use for the read-only check, if any
 * @return string|bool Reason text, or false if not read-only
 */
public function getReadOnlyReason( $domain = false, IDatabase $conn = null ) {
	if ( $this->readOnlyReason !== false ) {
		return $this->readOnlyReason;
	}

	if ( $this->getLaggedReplicaMode( $domain ) ) {
		// The message depends on whether the replicas are down or merely lagged
		return $this->allReplicasDownMode
			? 'The database has been automatically locked ' .
				'until the replica database servers become available'
			: 'The database has been automatically locked ' .
				'while the replica database servers catch up to the master.';
	}

	if ( $this->masterRunningReadOnly( $domain, $conn ) ) {
		return 'The database master is running in read-only mode.';
	}

	return false;
}
1321 
/**
 * Check whether the master server reports itself as read-only, caching the
 * answer for TTL_CACHE_READONLY seconds under a key derived from the server name.
 *
 * @param string|bool $domain Domain ID, or false for the current domain
 * @param IDatabase|null $conn Master connection to query; one is obtained if omitted
 * @return bool
 */
private function masterRunningReadOnly( $domain, IDatabase $conn = null ) {
	$cache = $this->wanCache;
	$masterServer = $this->getServerName( $this->getWriterIndex() );

	return (bool)$cache->getWithSetCallback(
		$cache->makeGlobalKey( __CLASS__, 'server-read-only', $masterServer ),
		self::TTL_CACHE_READONLY,
		function () use ( $domain, $conn ) {
			// Silence the transaction profiler while this check runs
			$old = $this->trxProfiler->setSilenced( true );
			try {
				$dbw = $conn ?: $this->getConnection( self::DB_MASTER, [], $domain );
				$readOnly = (int)$dbw->serverIsReadOnly();
				if ( !$conn ) {
					// Only hand back connections that were obtained here
					$this->reuseConnection( $dbw );
				}
			} catch ( DBError $e ) {
				// On DB errors, report the master as not read-only
				$readOnly = 0;
			}
			$this->trxProfiler->setSilenced( $old );
			return $readOnly;
		},
		[ 'pcTTL' => $cache::TTL_PROC_LONG, 'busyValue' => 0 ]
	);
}
1351 
/**
 * Get, and optionally set, the allow-lagged mode.
 *
 * @param bool|null $mode New value, or null to leave the setting unchanged
 * @return bool The value in effect after the call
 */
public function allowLagged( $mode = null ) {
	if ( $mode !== null ) {
		$this->mAllowLagged = $mode;
	}

	return $this->mAllowLagged;
}
1360 
/**
 * Ping every open connection.
 *
 * @return bool True only if every open connection answered the ping
 */
public function pingAll() {
	$allAlive = true;
	$this->forEachOpenConnection(
		function ( IDatabase $conn ) use ( &$allAlive ) {
			// Every connection is pinged, even after an earlier one failed
			$alive = $conn->ping();
			if ( !$alive ) {
				$allAlive = false;
			}
		}
	);

	return $allAlive;
}
1371 
/**
 * Invoke a callback once per open connection.
 *
 * @param callable $callback Called with the connection followed by $params
 * @param array $params Extra arguments appended after the connection
 */
public function forEachOpenConnection( $callback, array $params = [] ) {
	foreach ( $this->mConns as $byServer ) {
		foreach ( $byServer as $connList ) {
			foreach ( $connList as $conn ) {
				call_user_func_array( $callback, array_merge( [ $conn ], $params ) );
			}
		}
	}
}
1382 
/**
 * Invoke a callback once per open connection to the writer (master) index.
 *
 * @param callable $callback Called with the connection followed by $params
 * @param array $params Extra arguments appended after the connection
 */
public function forEachOpenMasterConnection( $callback, array $params = [] ) {
	$writerIndex = $this->getWriterIndex();
	foreach ( $this->mConns as $byServer ) {
		if ( !isset( $byServer[$writerIndex] ) ) {
			continue; // no master connections in this group
		}
		foreach ( $byServer[$writerIndex] as $conn ) {
			call_user_func_array( $callback, array_merge( [ $conn ], $params ) );
		}
	}
}
1395 
/**
 * Invoke a callback once per open connection to any non-master server index.
 *
 * @param callable $callback Called with the connection followed by $params
 * @param array $params Extra arguments appended after the connection
 */
public function forEachOpenReplicaConnection( $callback, array $params = [] ) {
	foreach ( $this->mConns as $byServer ) {
		foreach ( $byServer as $serverIndex => $connList ) {
			if ( $serverIndex !== $this->getWriterIndex() ) {
				// Only servers other than the master are visited
				foreach ( $connList as $conn ) {
					call_user_func_array( $callback, array_merge( [ $conn ], $params ) );
				}
			}
		}
	}
}
1409 
/**
 * Find the server with the highest lag among those with a positive load weight.
 *
 * @param string|bool $domain Domain ID, or false for the current domain
 * @return array (host, lag, server index); ('', -1, 0) if nothing qualified
 */
public function getMaxLag( $domain = false ) {
	$worstHost = '';
	$worstLag = -1;
	$worstIndex = 0;

	// no replication = no lag
	if ( $this->getServerCount() > 1 ) {
		foreach ( $this->getLagTimes( $domain ) as $i => $lag ) {
			// Servers with zero load weight are not considered
			if ( $this->mLoads[$i] > 0 && $lag > $worstLag ) {
				$worstLag = $lag;
				$worstHost = $this->mServers[$i]['host'];
				$worstIndex = $i;
			}
		}
	}

	return [ $worstHost, $worstLag, $worstIndex ];
}
1430 
/**
 * Get the lag time for each server index.
 *
 * Servers flagged 'is static' are reported with zero lag without being measured;
 * the rest are looked up through the load monitor.
 *
 * @param string|bool $domain Domain ID, or false for the current domain
 * @return array Map of (server index => lag)
 */
public function getLagTimes( $domain = false ) {
	if ( $this->getServerCount() <= 1 ) {
		return [ $this->getWriterIndex() => 0 ]; // no replication = no lag
	}

	$staticLagTimes = []; // map of (server index => 0 seconds)
	$measurable = []; // server indexes that might have replication lag
	foreach ( $this->mServers as $i => $server ) {
		if ( !empty( $server['is static'] ) ) {
			// DB server is a non-replicating and read-only archive
			$staticLagTimes[$i] = 0;
		} else {
			$measurable[] = $i;
		}
	}

	$measured = $this->getLoadMonitor()->getLagTimes( $measurable, $domain );

	return $measured + $staticLagTimes;
}
1448 
/**
 * Get the lag of a connection, or zero when there is at most one server.
 *
 * @param IDatabase $conn
 * @return int|bool Zero without replication, else whatever $conn->getLag() returns
 */
public function safeGetLag( IDatabase $conn ) {
	return ( $this->getServerCount() <= 1 ) ? 0 : $conn->getLag();
}
1456 
/**
 * Wait for a replica DB connection to reach a master position.
 *
 * @param IDatabase $conn Replica DB connection
 * @param DBMasterPos|bool $pos Position to wait for; false to use the current master position
 * @param int $timeout Timeout in seconds
 * @return bool True on success, or if $conn is not a replica DB; false on timeout
 *   or if no master position could be determined
 */
public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = 10 ) {
	if ( $this->getServerCount() <= 1 || !$conn->getLBInfo( 'replica' ) ) {
		return true; // server is not a replica DB
	}

	if ( !$pos ) {
		// Get the current master position, opening a connection if needed
		$masterConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
		$openedHere = false;
		if ( !$masterConn ) {
			$masterConn = $this->openConnection( $this->getWriterIndex(), self::DOMAIN_ANY );
			$openedHere = true;
		}
		$pos = $masterConn->getMasterPos();
		if ( $openedHere ) {
			// Only close a connection that was opened just for this lookup
			$this->closeConnection( $masterConn );
		}
	}

	if ( !( $pos instanceof DBMasterPos ) ) {
		// something is misconfigured
		$this->replLogger->error( "Could not get master pos for {$conn->getServer()}." );

		return false;
	}

	$result = $conn->masterPosWait( $pos, $timeout );
	if ( $result == -1 || is_null( $result ) ) {
		$msg = __METHOD__ . ": Timed out waiting on {$conn->getServer()} pos {$pos}";
		$this->replLogger->warning( "$msg" );

		return false;
	}

	$this->replLogger->info( __METHOD__ . ": Done" );

	return true;
}
1496 
/**
 * Register or remove a named transaction listener, both in this object's
 * callback map and on every currently open master connection.
 *
 * @param string $name Listener name
 * @param callable|null $callback Callback to register, or null to remove the listener
 */
public function setTransactionListener( $name, callable $callback = null ) {
	if ( $callback ) {
		$this->trxRecurringCallbacks[$name] = $callback;
	} else {
		unset( $this->trxRecurringCallbacks[$name] );
	}
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( $name, $callback ) {
			$conn->setTransactionListener( $name, $callback );
		}
	);
}
1509 
/**
 * Replace the stored table alias map.
 *
 * @param array $aliases New alias map; overwrites any previous one
 */
public function setTableAliases( array $aliases ) {
	$this->tableAliases = $aliases;
}
1513 
/**
 * Change the table prefix of the local domain and of every open connection.
 *
 * @param string $prefix New table prefix
 * @throws DBUnexpectedError If foreign domain connections are still in use
 */
public function setDomainPrefix( $prefix ) {
	if ( $this->mConns['foreignUsed'] ) {
		// Do not switch connections to explicit foreign domains unless marked as free
		$domains = [];
		foreach ( $this->mConns['foreignUsed'] as $connsByDomain ) {
			$domains = array_merge( $domains, array_keys( $connsByDomain ) );
		}
		$domains = implode( ', ', $domains );
		throw new DBUnexpectedError( null,
			"Foreign domain connections are still in use ($domains)." );
	}

	$this->localDomain = new DatabaseDomain(
		$this->localDomain->getDatabase(),
		null,
		$prefix
	);
	// Keep the <db>-<prefix> alias in step with the new prefix, using the same
	// rule the constructor applies; otherwise it would still name the old prefix.
	if ( $this->localDomain->getTablePrefix() != '' ) {
		$this->localDomainIdAlias =
			$this->localDomain->getDatabase() . '-' . $this->localDomain->getTablePrefix();
	} else {
		$this->localDomainIdAlias = $this->localDomain->getDatabase();
	}

	$this->forEachOpenConnection( function ( IDatabase $db ) use ( $prefix ) {
		$db->tablePrefix( $prefix );
	} );
}
1536 
/**
 * Make PHP ignore user aborts until the returned object is released.
 *
 * @return ScopedCallback|null Null in CLI mode, where nothing is changed
 */
final protected function getScopedPHPBehaviorForCommit() {
	if ( PHP_SAPI == 'cli' ) { // http://bugs.php.net/bug.php?id=47540
		return null;
	}

	$previous = ignore_user_abort( true ); // avoid half-finished operations

	// The callback puts the earlier setting back
	return new ScopedCallback( function () use ( $previous ) {
		ignore_user_abort( $previous );
	} );
}
1553 
/**
 * Disable this load balancer when the object is destroyed.
 */
public function __destruct() {
	// Avoid connection leaks for sanity
	$this->disable();
}
1558 }
explicitTrxActive()
lastDoneWrites()
Returns the last time the connection may have been used for write queries.
commitAll($fname=__METHOD__)
Commit transactions on all open connections.
array[] $trxRecurringCallbacks
Map of (name => callable)
Database error base class.
Definition: DBError.php:26
the array() calling protocol came about after MediaWiki 1.4rc1.
safeGetLag(IDatabase $conn)
Get the lag in seconds for a given connection, or zero if this load balancer does not have replicatio...
integer $mWaitTimeout
Seconds to spend waiting on replica DB lag to resolve.
getAnyOpenConnection($i)
static newUnspecified()
writesOrCallbacksPending()
Returns true if there is a transaction open with possible write queries or transaction pre-commit/idl...
Definition: Database.php:526
bool IDatabase $mErrorConnection
Database connection that caused a problem.
$context
Definition: load.php:50
static factory($dbType, $p=[])
Construct a Database subclass instance given a database type and parameters.
Definition: Database.php:325
$success
trxLevel()
Gets the current transaction level.
Definition: Database.php:440
reuseConnection($conn)
Mark a foreign connection as being available for reuse under a different DB name or prefix...
processing should stop and the error should be shown to the user * false
Definition: hooks.txt:189
ILoadMonitor $loadMonitor
getServerCount()
Get the number of defined servers (not the number of open connections)
array[] $mServers
Map of (server index => server config array)
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
getLoadMonitor()
Get a LoadMonitor instance.
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException'returning false will NOT prevent logging $e
Definition: hooks.txt:2102
rollbackMasterChanges($fname=__METHOD__)
Issue ROLLBACK only on master, only if queries were done on connection.
LoggerInterface $replLogger
runTransactionListenerCallbacks($trigger)
Actually run any "transaction listener" callbacks.
Definition: Database.php:2629
close()
Closes a database connection.
finalizeMasterChanges()
Perform all pre-commit callbacks that remain part of the atomic transactions and disable any post-com...
object string $profiler
Class name or object With profileIn/profileOut methods.
setTableAliases(array $aliases)
Make certain table names use their own database, schema, and table prefix when passed into SQL querie...
reallyOpenConnection(array $server, $dbNameOverride=false)
Really opens a connection.
An object representing a master or replica DB position in a replicated setup.
Definition: DBMasterPos.php:7
bool $cliMode
Whether this PHP instance is for a CLI script.
getReaderIndex($group=false, $domain=false)
Get the index of the reader connection, which may be a replica DB This takes into account load ratios...
openConnection($i, $domain=false)
array[] $mGroupLoads
Map of (group => server index => weight)
float[] $mLoads
Map of (server index => weight)
bool $allReplicasDownMode
Whether the generic reader fell back to a lagged replica DB.
const DB_MASTER
Definition: defines.php:23
forEachOpenConnection($callback, array $params=[])
Call a function with each open connection object.
getLazyConnectionRef($db, $groups=[], $domain=false)
Get a database connection handle reference without connecting yet.
masterPosWait(DBMasterPos $pos, $timeout)
Wait for the replica DB to catch up to a given master position.
boolean $disabled
__construct(array $params)
Construct a manager of IDatabase connection objects.
The index of the header message $result[1]=The index of the body text message $result[2 through n]=Parameters passed to body text message.Please note the header message cannot receive/use parameters. 'ImportHandleLogItemXMLTag':When parsing a XML tag in a log item.Return false to stop further processing of the tag $reader:XMLReader object $logInfo:Array of information 'ImportHandlePageXMLTag':When parsing a XML tag in a page.Return false to stop further processing of the tag $reader:XMLReader object &$pageInfo:Array of information 'ImportHandleRevisionXMLTag':When parsing a XML tag in a page revision.Return false to stop further processing of the tag $reader:XMLReader object $pageInfo:Array of page information $revisionInfo:Array of revision information 'ImportHandleToplevelXMLTag':When parsing a top level XML tag.Return false to stop further processing of the tag $reader:XMLReader object 'ImportHandleUploadXMLTag':When parsing a XML tag in a file upload.Return false to stop further processing of the tag $reader:XMLReader object $revisionInfo:Array of information 'ImportLogInterwikiLink':Hook to change the interwiki link used in log entries and edit summaries for transwiki imports.&$fullInterwikiPrefix:Interwiki prefix, may contain colons.&$pageTitle:String that contains page title. 'ImportSources':Called when reading from the $wgImportSources configuration variable.Can be used to lazy-load the import sources list.&$importSources:The value of $wgImportSources.Modify as necessary.See the comment in DefaultSettings.php for the detail of how to structure this array. 
'InfoAction':When building information to display on the action=info page.$context:IContextSource object &$pageInfo:Array of information 'InitializeArticleMaybeRedirect':MediaWiki check to see if title is a redirect.&$title:Title object for the current page &$request:WebRequest &$ignoreRedirect:boolean to skip redirect check &$target:Title/string of redirect target &$article:Article object 'InternalParseBeforeLinks':during Parser's internalParse method before links but after nowiki/noinclude/includeonly/onlyinclude and other processings.&$parser:Parser object &$text:string containing partially parsed text &$stripState:Parser's internal StripState object 'InternalParseBeforeSanitize':during Parser's internalParse method just before the parser removes unwanted/dangerous HTML tags and after nowiki/noinclude/includeonly/onlyinclude and other processings.Ideal for syntax-extensions after template/parser function execution which respect nowiki and HTML-comments.&$parser:Parser object &$text:string containing partially parsed text &$stripState:Parser's internal StripState object 'InterwikiLoadPrefix':When resolving if a given prefix is an interwiki or not.Return true without providing an interwiki to continue interwiki search.$prefix:interwiki prefix we are looking for.&$iwData:output array describing the interwiki with keys iw_url, iw_local, iw_trans and optionally iw_api and iw_wikiid. 
'InvalidateEmailComplete':Called after a user's email has been invalidated successfully.$user:user(object) whose email is being invalidated 'IRCLineURL':When constructing the URL to use in an IRC notification.Callee may modify $url and $query, URL will be constructed as $url.$query &$url:URL to index.php &$query:Query string $rc:RecentChange object that triggered url generation 'IsFileCacheable':Override the result of Article::isFileCacheable()(if true) &$article:article(object) being checked 'IsTrustedProxy':Override the result of IP::isTrustedProxy() &$ip:IP being check &$result:Change this value to override the result of IP::isTrustedProxy() 'IsUploadAllowedFromUrl':Override the result of UploadFromUrl::isAllowedUrl() $url:URL used to upload from &$allowed:Boolean indicating if uploading is allowed for given URL 'isValidEmailAddr':Override the result of Sanitizer::validateEmail(), for instance to return false if the domain name doesn't match your organization.$addr:The e-mail address entered by the user &$result:Set this and return false to override the internal checks 'isValidPassword':Override the result of User::isValidPassword() $password:The password entered by the user &$result:Set this and return false to override the internal checks $user:User the password is being validated for 'Language::getMessagesFileName':$code:The language code or the language we're looking for a messages file for &$file:The messages file path, you can override this to change the location. 
'LanguageGetMagic':DEPRECATED!Use $magicWords in a file listed in $wgExtensionMessagesFiles instead.Use this to define synonyms of magic words depending of the language &$magicExtensions:associative array of magic words synonyms $lang:language code(string) 'LanguageGetNamespaces':Provide custom ordering for namespaces or remove namespaces.Do not use this hook to add namespaces.Use CanonicalNamespaces for that.&$namespaces:Array of namespaces indexed by their numbers 'LanguageGetSpecialPageAliases':DEPRECATED!Use $specialPageAliases in a file listed in $wgExtensionMessagesFiles instead.Use to define aliases of special pages names depending of the language &$specialPageAliases:associative array of magic words synonyms $lang:language code(string) 'LanguageGetTranslatedLanguageNames':Provide translated language names.&$names:array of language code=> language name $code:language of the preferred translations 'LanguageLinks':Manipulate a page's language links.This is called in various places to allow extensions to define the effective language links for a page.$title:The page's Title.&$links:Associative array mapping language codes to prefixed links of the form"language:title".&$linkFlags:Associative array mapping prefixed links to arrays of flags.Currently unused, but planned to provide support for marking individual language links in the UI, e.g.for featured articles. 'LanguageSelector':Hook to change the language selector available on a page.$out:The output page.$cssClassName:CSS class name of the language selector. 'LinkBegin':DEPRECATED!Use HtmlPageLinkRendererBegin instead.Used when generating internal and interwiki links in Linker::link(), before processing starts.Return false to skip default processing and return $ret.See documentation for Linker::link() for details on the expected meanings of parameters.$skin:the Skin object $target:the Title that the link is pointing to &$html:the contents that the< a > tag should have(raw HTML) $result
Definition: hooks.txt:1934
doWait($index, $open=false, $timeout=null)
Wait for a given replica DB to catch up to the master pos stored in $this.
getLagTimes($domain=false)
Get an estimate of replication lag (in seconds) for each server.
Class to handle database/prefix specification for IDatabase domains.
BagOStuff $srvCache
WANObjectCache $wanCache
static newEmpty()
Get an instance that wraps EmptyBagOStuff.
getRandomNonLagged(array $loads, $domain=false, $maxLag=INF)
getServerInfo($i)
Return the server info structure for a given index, or false if the index is invalid.
array $loadMonitorConfig
The LoadMonitor configuration.
string $mLastError
The last DB selection or connection error.
setServerInfo($i, array $serverInfo)
Sets the server info structure for the given index.
Base class for the more common types of database errors.
design txt This is a brief overview of the new design More thorough and up to date information is available on the documentation wiki at etc Handles the details of getting and saving to the user table of the and dealing with sessions and cookies OutputPage Encapsulates the entire HTML page that will be sent in response to any server request It is used by calling its functions to add in any and then calling but I prefer the flexibility This should also do the output encoding The system allocates a global one in $wgOut Title Represents the title of an and does all the work of translating among various forms such as plain database key
Definition: design.txt:25
setFlag($flag, $remember=self::REMEMBER_NOTHING)
Set a flag for this connection.
getReadOnlyReason($domain=false, IDatabase $conn=null)
getLag()
Get replica DB lag.
Database cluster connection, tracking, load balancing, and transaction manager interface.
Helper class that detects high-contention DB queries via profiling calls.
this hook is for auditing only RecentChangesLinked and Watchlist RecentChangesLinked and Watchlist e g Watchlist removed from all revisions and log entries to which it was applied This gives extensions a chance to take it off their books as the deletion has already been partly carried out by this point or something similar the user will be unable to create the tag set and then return false from the hook function Ensure you consume the ChangeTagAfterDelete hook to carry out custom deletion actions as context called by AbstractContent::getParserOutput May be used to override the normal model specific rendering of page content as context as context $options
Definition: hooks.txt:1046
waitForOne($pos, $timeout=null)
Set the master wait position and wait for a "generic" replica DB to catch up to it.
allowLagged($mode=null)
Disables/enables lag checks.
hasMasterChanges()
Determine if there are pending changes in a transaction by this thread.
getScopedPHPBehaviorForCommit()
Make PHP ignore user aborts/disconnects until the returned value leaves scope.
Exception class for attempted DB access.
integer $connsOpened
Total connections opened.
setDomainPrefix($prefix)
Set a new table prefix for the existing local domain ID for testing.
$cache
Definition: mcc.php:33
$params
writesOrCallbacksPending()
Returns true if there is a transaction open with possible write queries or transaction pre-commit/idl...
flushReplicaSnapshots($fname=__METHOD__)
Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshot.
Helper class to handle automatically marking connections as reusable (via RAII pattern) as well handl...
Definition: DBConnRef.php:10
getMaxLag($domain=false)
Get the hostname and lag time of the most-lagged replica DB.
A BagOStuff object with no objects in it.
LoggerInterface $connLogger
disable()
Disable this load balancer.
getFlag($flag)
Returns a boolean whether the flag $flag is set for this connection.
beginMasterChanges($fname=__METHOD__)
Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
string $agent
Agent name for query profiling.
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
approveMasterChanges(array $options)
Perform all pre-commit checks for things like replication safety.
hasOrMadeRecentMasterChanges($age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
pendingWriteQueryDuration($type=self::ESTIMATE_TOTAL)
Get the time spend running write queries for this transaction.
forEachOpenMasterConnection($callback, array $params=[])
Call a function with each open connection object to a master.
const DBO_TRX
Definition: defines.php:9
commit($fname=__METHOD__, $flush= '')
Commits a transaction previously started using begin().
runOnTransactionIdleCallbacks($trigger)
Actually run and consume any "on transaction idle/resolution" callbacks.
Definition: Database.php:2549
flushSnapshot($fname=__METHOD__)
Commit any transaction but error out if writes or callbacks are pending.
Definition: Database.php:2862
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
const DBO_DEFAULT
Definition: defines.php:10
restoreFlags($state=self::RESTORE_PRIOR)
Restore the flags to their prior state before the last setFlag/clearFlag call.
suppressTransactionEndCallbacks()
Suppress all pending post-COMMIT/ROLLBACK callbacks.
if(!defined( 'MEDIAWIKI')) $fname
This file is not a valid entry point, perform no further processing unless MEDIAWIKI is defined...
Definition: Setup.php:36
setTransactionListener($name, callable $callback=null)
Set a callback via IDatabase::setTransactionListener() on all current and future master connections o...
getLaggedReplicaMode($domain=false)
closeAll()
Close all open connections.
pendingWriteCallers()
Get the list of method names that did write queries for this transaction.
string $localDomainIdAlias
Alternate ID string for the domain instead of DatabaseDomain::getId()
masterRunningReadOnly($domain, IDatabase $conn=null)
getServer()
Get the server hostname or IP address.
getConnection($i, $groups=[], $domain=false)
static newFromId($domain)
LoggerInterface $perfLogger
tablePrefix($prefix=null)
Get/set the table prefix.
closeConnection(IDatabase $conn)
Close a connection.
haveIndex($i)
Returns true if the specified index is a valid server index.
getLBInfo($name=null)
Get properties passed down from the server info array of the load balancer.
this hook is for auditing only RecentChangesLinked and Watchlist RecentChangesLinked and Watchlist e g Watchlist removed from all revisions and log entries to which it was applied This gives extensions a chance to take it off their books as the deletion has already been partly carried out by this point or something similar the user will be unable to create the tag set and then return false from the hook function Ensure you consume the ChangeTagAfterDelete hook to carry out custom deletion actions as context called by AbstractContent::getParserOutput May be used to override the normal model specific rendering of page content as context as context the output can only depend on parameters provided to this hook not on global state indicating whether full HTML should be generated If generation of HTML may be but other information should still be present in the ParserOutput object to manipulate or replace but no entry for that model exists in $wgContentHandlers if desired whether it is OK to use $contentModel on $title Handler functions that modify $ok should generally return false to prevent further hooks from further modifying $ok inclusive $limit
Definition: hooks.txt:1046
setTrxEndCallbackSuppression($suppress)
Whether to disable running of post-COMMIT/ROLLBACK callbacks.
Definition: Database.php:2536
isOpen($index)
Test if the specified index represents an open connection.
string $host
Current server name.
string bool $trxRoundId
String if a requested DBO_TRX transaction round is active.
static pickRandom($weights)
Given an array of non-normalised probabilities, this function will select an element and return the a...
Definition: ArrayUtils.php:66
pendingMasterChangeCallers()
Get the list of callers that have pending master changes.
bool $laggedReplicaMode
Whether the generic reader fell back to a lagged replica DB.
isNonZeroLoad($i)
Returns true if the specified index is valid and has non-zero load.
integer $mReadIndex
The generic (not query grouped) replica DB index (of $mServers)
callable $errorLogger
Exception logger.
getMasterPos()
Get the current master position for chronology control purposes.
const DB_REPLICA
Definition: defines.php:22
openForeignConnection($i, $domain)
Open a connection to a foreign DB, or return one if it is already open.
DatabaseDomain $localDomain
Local Domain ID and default for selectDB() calls.
runOnTransactionPreCommitCallbacks()
Actually run and consume any "on transaction pre-commit" callbacks.
Definition: Database.php:2599
string bool $readOnlyReason
Reason the LB is read-only or false if not.
bool $mAllowLagged
Whether to disregard replica DB lag as a factor in replica DB selection.
commitMasterChanges($fname=__METHOD__)
Issue COMMIT on all master connections where writes were done.
rollback($fname=__METHOD__, $flush= '')
Rollback a transaction previously started using begin().
bool DBMasterPos $mWaitForPos
False if not set.
getServerName($i)
Get the host name or IP address of the server with the specified index. Prefer a readable name if avai...
ping(&$rtt=null)
Ping the server and try to reconnect if there is no connection.
getConnectionRef($db, $groups=[], $domain=false)
Get a database connection handle reference.
forEachOpenReplicaConnection($callback, array $params=[])
Call a function with each open replica DB connection object.
getLaggedSlaveMode($domain=false)
undoTransactionRoundFlags(IDatabase $conn)
waitForAll($pos, $timeout=null)
Set the master wait position and wait for ALL replica DBs to catch up to it.
BagOStuff $memCache
IDatabase[][] $mConns
Map of (local/foreignUsed/foreignFree => server index => IDatabase array)
setTransactionListener($name, callable $callback=null)
Run a callback each time any transaction commits or rolls back.
do that in ParserLimitReportFormat instead use this to modify the parameters of the image and a DIV can begin in one section and end in another Make sure your code can handle that case gracefully See the EditSectionClearerLink extension for an example zero but section is usually empty its values are the globals values before the output is cached one of or reset my talk my contributions etc etc otherwise the built in rate limiting checks are if enabled allows for interception of redirect as a string mapping parameter names to values & $type
Definition: hooks.txt:2491
TransactionProfiler $trxProfiler
see documentation in includes Linker php for Linker::makeImageLink & $time
Definition: hooks.txt:1749
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:34
applyTransactionRoundFlags(IDatabase $conn)
LoggerInterface $queryLogger
runMasterPostTrxCallbacks($type)
Issue all pending post-COMMIT/ROLLBACK callbacks.
safeWaitForMasterPos(IDatabase $conn, $pos=false, $timeout=10)
lastMasterChangeTimestamp()
Get the timestamp of the latest write query done by this thread.
flushSnapshot($fname=__METHOD__)
Commit any transaction but error out if writes or callbacks are pending.
Allows to change the fields on the form that will be generated $name
Definition: hooks.txt:300