MediaWiki  1.28.0
LoadBalancer.php
Go to the documentation of this file.
1 <?php
25 
31 class LoadBalancer implements ILoadBalancer {
/** @var array[] Server configuration arrays from $params['servers'], keyed by server index */
private $mServers;
/**
 * @var array[] Tracked connection handles, keyed by pool name ('local', 'foreignUsed',
 *  'foreignFree'), then server index, then 0 (local pool) or domain ID (foreign pools)
 */
private $mConns;
/** @var array Map of (server index => configured 'load' value) */
private $mLoads;
/** @var array[] Map of (query group => (server index => load ratio)) */
private $mGroupLoads;
/** @var bool When true, getReaderIndex() picks a server without filtering on lag */
private $mAllowLagged;
/** @var object|bool The handle of the last failed/lost connection, or false if none */
private $mErrorConnection = false;
/** @var int Default timeout handed to masterPosWait(); presumably in seconds */
private $mWaitTimeout;
/** @var array Table aliases pushed to every new handle via setTableAliases() */
private $tableAliases = [];

/** @var object|null Load monitor instance, lazily built by getLoadMonitor() */
private $loadMonitor;
/** @var array Load monitor settings; the 'class' key names the class to instantiate */
private $loadMonitorConfig;
/** @var BagOStuff Server-local cache (defaults to EmptyBagOStuff) */
private $srvCache;
/** @var BagOStuff Shared cache (defaults to EmptyBagOStuff) */
private $memCache;
/** @var WANObjectCache WAN cache (defaults to WANObjectCache::newEmpty()) */
private $wanCache;
/** @var object|null Profiler handed to every new handle, if one was supplied */
protected $profiler;
/** @var TransactionProfiler Records new connections and is handed to every new handle */
protected $trxProfiler;
/** @var \Psr\Log\LoggerInterface Logger for replication/lag messages */
protected $replLogger;
/** @var \Psr\Log\LoggerInterface Logger for connection messages */
protected $connLogger;
/** @var \Psr\Log\LoggerInterface Logger handed to every new handle as 'queryLogger' */
protected $queryLogger;
/** @var \Psr\Log\LoggerInterface Logger for the "many connections made" warning */
protected $perfLogger;

/** @var int Index of the generic reader chosen by getReaderIndex(); -1 until one is chosen */
private $mReadIndex;
/** @var object|bool Master position that replica DBs should wait for, or false */
private $mWaitForPos;
/** @var bool Whether the generic reader was chosen although all candidates were lagged */
private $laggedReplicaMode = false;
/** @var bool Not written in the visible part of this file; presumably set when no replica DB is usable */
private $allReplicasDownMode = false;
/** @var string Description of the most recent connection problem */
private $mLastError = 'Unknown error';
/** @var string|bool Explicit read-only reason from the configuration, or false */
private $readOnlyReason = false;
/** @var int Number of connections opened by reallyOpenConnection() since the last closeAll() */
private $connsOpened = 0;
/** @var string|bool Name of the active transaction round, or false if none is active */
private $trxRoundId = false;
/** @var callable[] Map of (name => callback) registered as listeners on new master handles */
private $trxRecurringCallbacks = [];
/** @var DatabaseDomain Domain that counts as "local" for this load balancer */
private $localDomain;
/** @var string Unescaped "<db>-<prefix>" (or plain "<db>") alias that also counts as local */
private $localDomainIdAlias;
/** @var string Name of the client host, used in error messages */
private $host;
/** @var bool Whether PHP runs in CLI mode; handed to every new handle */
protected $cliMode;
/** @var string Agent name handed to every new handle */
protected $agent;

/** @var callable Receives exceptions that are logged rather than re-thrown immediately */
private $errorLogger;

/** @var bool Set by disable(); no new connections may be opened afterwards */
private $disabled = false;

/** Warn on the perf log once this many connections have been opened */
const CONN_HELD_WARN_THRESHOLD = 10;

/** Lag limit applied to servers that have no 'max lag' setting */
const MAX_LAG_DEFAULT = 10;
/** Not used in the visible part of this file; presumably a cache TTL for read-only state */
const TTL_CACHE_READONLY = 5;
114 
/**
 * Build a load balancer from a configuration array.
 *
 * Keys read from $params:
 *  - servers: (required) list of server configuration arrays; each needs a 'load' entry
 *    and may carry 'groupLoads' (query group => ratio)
 *  - localDomain: ID of the domain that counts as "local"
 *  - waitTimeout: default replication wait timeout (defaults to 10)
 *  - readOnlyReason: string reason for read-only mode; ignored unless it is a string
 *  - loadMonitor: load monitor settings including a 'class' key (defaults to LoadMonitorNull)
 *  - srvCache, memCache, wanCache: cache objects (empty caches by default)
 *  - profiler, trxProfiler: profiling objects
 *  - errorLogger: callable taking an Exception (defaults to raising an E_USER_WARNING)
 *  - replLogger, connLogger, queryLogger, perfLogger: PSR-3 loggers (NullLogger by default)
 *  - hostname: client host name (defaults to gethostname(), then 'unknown')
 *  - cliMode: whether PHP runs in CLI mode (defaults to a PHP_SAPI check)
 *  - agent: agent name handed to DB handles (defaults to '')
 *
 * @param array $params
 * @throws InvalidArgumentException If the 'servers' key is missing
 */
public function __construct( array $params ) {
	if ( !isset( $params['servers'] ) ) {
		throw new InvalidArgumentException( __CLASS__ . ': missing servers parameter' );
	}
	$this->mServers = $params['servers'];

	// NOTE(review): the fallback branch was missing from this listing (the statement had no
	// ":" arm and no terminator). An unspecified domain is the upstream behavior as far as
	// can be told; confirm against DatabaseDomain.
	$this->localDomain = isset( $params['localDomain'] )
		? DatabaseDomain::newFromId( $params['localDomain'] )
		: DatabaseDomain::newUnspecified();
	// In case a caller assumes that the domain ID is simply <db>-<prefix>, which is almost
	// always true, gracefully handle the case when they fail to account for escaping.
	if ( $this->localDomain->getTablePrefix() != '' ) {
		$this->localDomainIdAlias =
			$this->localDomain->getDatabase() . '-' . $this->localDomain->getTablePrefix();
	} else {
		$this->localDomainIdAlias = $this->localDomain->getDatabase();
	}

	$this->mWaitTimeout = isset( $params['waitTimeout'] ) ? $params['waitTimeout'] : 10;

	$this->mReadIndex = -1;
	$this->mConns = [
		'local' => [],
		'foreignUsed' => [],
		'foreignFree' => []
	];
	$this->mLoads = [];
	// Start from an empty map so that group lookups never operate on null
	$this->mGroupLoads = [];
	$this->mWaitForPos = false;
	$this->mErrorConnection = false;
	$this->mAllowLagged = false;

	if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
		$this->readOnlyReason = $params['readOnlyReason'];
	}

	if ( isset( $params['loadMonitor'] ) ) {
		$this->loadMonitorConfig = $params['loadMonitor'];
	} else {
		$this->loadMonitorConfig = [ 'class' => 'LoadMonitorNull' ];
	}

	// Index the generic loads and the per-group loads by server index
	foreach ( $params['servers'] as $i => $server ) {
		$this->mLoads[$i] = $server['load'];
		if ( isset( $server['groupLoads'] ) ) {
			foreach ( $server['groupLoads'] as $group => $ratio ) {
				if ( !isset( $this->mGroupLoads[$group] ) ) {
					$this->mGroupLoads[$group] = [];
				}
				$this->mGroupLoads[$group][$i] = $ratio;
			}
		}
	}

	if ( isset( $params['srvCache'] ) ) {
		$this->srvCache = $params['srvCache'];
	} else {
		$this->srvCache = new EmptyBagOStuff();
	}
	if ( isset( $params['memCache'] ) ) {
		$this->memCache = $params['memCache'];
	} else {
		$this->memCache = new EmptyBagOStuff();
	}
	if ( isset( $params['wanCache'] ) ) {
		$this->wanCache = $params['wanCache'];
	} else {
		$this->wanCache = WANObjectCache::newEmpty();
	}
	$this->profiler = isset( $params['profiler'] ) ? $params['profiler'] : null;
	if ( isset( $params['trxProfiler'] ) ) {
		$this->trxProfiler = $params['trxProfiler'];
	} else {
		$this->trxProfiler = new TransactionProfiler();
	}

	$this->errorLogger = isset( $params['errorLogger'] )
		? $params['errorLogger']
		: function ( Exception $e ) {
			trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
		};

	foreach ( [ 'replLogger', 'connLogger', 'queryLogger', 'perfLogger' ] as $key ) {
		$this->$key = isset( $params[$key] ) ? $params[$key] : new \Psr\Log\NullLogger();
	}

	$this->host = isset( $params['hostname'] )
		? $params['hostname']
		: ( gethostname() ?: 'unknown' );
	$this->cliMode = isset( $params['cliMode'] ) ? $params['cliMode'] : PHP_SAPI === 'cli';
	$this->agent = isset( $params['agent'] ) ? $params['agent'] : '';
}
206 
/**
 * Return the load monitor, instantiating it on first use.
 *
 * The class named by the 'class' key of the load monitor configuration is constructed
 * with this load balancer, both caches and the configuration array, and is given the
 * replication logger.
 *
 * @return object The load monitor instance
 */
private function getLoadMonitor() {
	if ( isset( $this->loadMonitor ) ) {
		return $this->loadMonitor;
	}

	$monitorClass = $this->loadMonitorConfig['class'];
	$this->loadMonitor = new $monitorClass(
		$this,
		$this->srvCache,
		$this->memCache,
		$this->loadMonitorConfig
	);
	$this->loadMonitor->setLogger( $this->replLogger );

	return $this->loadMonitor;
}
222 
/**
 * Pick a random server from $loads after discarding servers that lag too much.
 *
 * Server #0 is never discarded by the lag filter. Every other server reported by
 * getLagTimes() is dropped when it does not seem to replicate at all, or when its lag
 * exceeds the smaller of its own 'max lag' setting and $maxLag.
 *
 * @param array $loads Map of (server index => load)
 * @param string|bool $domain Domain handed to getLagTimes()
 * @param int|float $maxLag Extra upper bound on the tolerated lag (INF for none)
 * @return bool|int|string Chosen server index, or false if the remaining loads sum to zero
 */
private function getRandomNonLagged( array $loads, $domain = false, $maxLag = INF ) {
	foreach ( $this->getLagTimes( $domain ) as $i => $lag ) {
		if ( $i == 0 ) {
			continue; // server #0 is exempt from the lag filter
		}

		// Nominal limit for this server, further constrained by the $maxLag argument
		$lagLimit = self::MAX_LAG_DEFAULT;
		if ( isset( $this->mServers[$i]['max lag'] ) ) {
			$lagLimit = $this->mServers[$i]['max lag'];
		}
		$lagLimit = min( $lagLimit, $maxLag );

		$host = $this->getServerName( $i );
		if ( $lag === false && !is_infinite( $lagLimit ) ) {
			$this->replLogger->error( "Server $host (#$i) is not replicating?" );
			unset( $loads[$i] );
		} elseif ( $lag > $lagLimit ) {
			$this->replLogger->warning( "Server $host (#$i) has >= $lag seconds of lag" );
			unset( $loads[$i] );
		}
	}

	// If nothing with a non-zero load is left (this also covers an empty list), do NOT fall
	// back to the master here: returning false lets the caller switch to read-only mode and
	// use a lagged replica DB instead.
	if ( array_sum( $loads ) == 0 || count( $loads ) == 0 ) {
		return false;
	}

	// Return a random representative of the remainder
	return ArrayUtils::pickRandom( $loads );
}
273 
/**
 * Choose the index of a server to read from and make sure it can be connected to.
 *
 * With a single configured server the writer index is returned. For the generic group
 * (false), an already chosen reader index is reused. Otherwise candidates are tried until
 * a connection succeeds; if every candidate is too lagged, one is picked anyway and, for
 * the generic group, lagged replica mode is recorded.
 *
 * @param string|bool $group Query group, or false for the generic reader
 * @param string|bool $domain Domain ID, or false for the local domain
 * @return bool|int|string Server index, or false if no server could be selected
 * @throws InvalidArgumentException If the load array for the request is empty
 */
public function getReaderIndex( $group = false, $domain = false ) {
	if ( count( $this->mServers ) == 1 ) {
		// Skip the load balancing if there's only one server
		return $this->getWriterIndex();
	}
	if ( $group === false && $this->mReadIndex >= 0 ) {
		// Shortcut if generic reader exists already
		return $this->mReadIndex;
	}

	// Find the relevant load array
	if ( $group === false ) {
		$nonErrorLoads = $this->mLoads;
	} elseif ( isset( $this->mGroupLoads[$group] ) ) {
		$nonErrorLoads = $this->mGroupLoads[$group];
	} else {
		// No loads for this group, return false and the caller can use some other group
		$this->connLogger->info( __METHOD__ . ": no loads for group $group" );

		return false;
	}

	if ( count( $nonErrorLoads ) == 0 ) {
		throw new InvalidArgumentException( "Empty server array given to LoadBalancer" );
	}

	// Scale the configured load ratios according to the dynamic load if supported
	$this->getLoadMonitor()->scaleLoads( $nonErrorLoads, $domain );

	$pickedWhileLagged = false;
	// No server found yet
	$i = false;
	// Try the remaining candidates until one of them accepts a connection
	$candidates = $nonErrorLoads;
	while ( count( $candidates ) ) {
		if ( $this->mAllowLagged || $pickedWhileLagged ) {
			$i = ArrayUtils::pickRandom( $candidates );
		} else {
			$i = false;
			if ( $this->mWaitForPos && $this->mWaitForPos->asOfTime() ) {
				// ChronologyProtecter causes mWaitForPos to be set via sessions.
				// This triggers doWait() after connect, so it's especially good to
				// avoid lagged servers so as to avoid just blocking in that method.
				$ago = microtime( true ) - $this->mWaitForPos->asOfTime();
				// Aim for <= 1 second of waiting (being too picky can backfire)
				$i = $this->getRandomNonLagged( $candidates, $domain, $ago + 1 );
			}
			if ( $i === false ) {
				// Any server with less lag than its 'max lag' param is preferable
				$i = $this->getRandomNonLagged( $candidates, $domain );
			}
			if ( $i === false && count( $candidates ) != 0 ) {
				// All replica DBs lagged. Switch to read-only mode
				$this->replLogger->error( "All replica DBs lagged. Switch to read-only mode" );
				$i = ArrayUtils::pickRandom( $candidates );
				$pickedWhileLagged = true;
			}
		}

		if ( $i === false ) {
			// pickRandom() returned false. This is permanent and means the configuration
			// or the load monitor wants us to return false.
			$this->connLogger->debug( __METHOD__ . ": pickRandom() returned false" );

			return false;
		}

		$serverName = $this->getServerName( $i );
		$this->connLogger->debug( __METHOD__ . ": Using reader #$i: $serverName..." );

		$conn = $this->openConnection( $i, $domain );
		if ( $conn ) {
			// Decrement reference counter, we are finished with this connection.
			// It will be incremented for the caller later.
			if ( $domain !== false ) {
				$this->reuseConnection( $conn );
			}

			// Use this server
			break;
		}

		$this->connLogger->warning( __METHOD__ . ": Failed connecting to $i/$domain" );
		unset( $nonErrorLoads[$i] );
		unset( $candidates[$i] );
		$i = false;
	}

	// Every server turned out to be unreachable
	if ( count( $nonErrorLoads ) == 0 ) {
		$this->connLogger->error( "All servers down" );
	}

	if ( $i === false ) {
		return $i;
	}

	// Replica DB connection successful.
	// Wait for the session master pos for a short time.
	if ( $this->mWaitForPos && $i > 0 ) {
		$this->doWait( $i );
	}
	if ( $this->mReadIndex <= 0 && $this->mLoads[$i] > 0 && $group === false ) {
		$this->mReadIndex = $i;
		// Record if the generic reader index is in "lagged replica DB" mode
		if ( $pickedWhileLagged ) {
			$this->laggedReplicaMode = true;
		}
	}
	$serverName = $this->getServerName( $i );
	$this->connLogger->debug(
		__METHOD__ . ": using server $serverName for group '$group'" );

	return $i;
}
392 
/**
 * Remember $pos as the position to wait for and, if a generic replica DB reader has
 * already been chosen, wait on it right away.
 *
 * Lagged replica mode is switched on when that wait does not succeed.
 *
 * @param object|bool $pos Master position to wait for
 */
public function waitFor( $pos ) {
	$this->mWaitForPos = $pos;

	$readerIndex = $this->mReadIndex;
	if ( $readerIndex > 0 && !$this->doWait( $readerIndex ) ) {
		$this->laggedReplicaMode = true;
	}
}
403 
/**
 * Remember $pos as the position to wait for and wait for a single replica DB to reach it.
 *
 * The generic reader is used if one was chosen already; otherwise a random replica DB with
 * a non-zero load is picked. A connection is opened for the wait if needed.
 *
 * @param object|bool $pos Master position to wait for
 * @param int|null $timeout Wait timeout; the configured default is used if null
 * @return bool True if the position was reached or no replica DB was applicable
 */
public function waitForOne( $pos, $timeout = null ) {
	$this->mWaitForPos = $pos;

	$index = $this->mReadIndex;
	if ( $index <= 0 ) {
		// No generic replica DB yet: choose among replica DBs that have a non-zero load
		$candidates = $this->mLoads;
		unset( $candidates[$this->getWriterIndex()] );
		$candidates = array_filter( $candidates );
		$index = ArrayUtils::pickRandom( $candidates );
	}

	if ( $index > 0 ) {
		return $this->doWait( $index, true, $timeout );
	}

	return true; // no applicable loads
}
424 
/**
 * Remember $pos as the position to wait for and wait for every server after index 0
 * that has a non-zero load to reach it.
 *
 * All such servers are waited on even after one of them has failed.
 *
 * @param object|bool $pos Master position to wait for
 * @param int|null $timeout Wait timeout per server; the configured default is used if null
 * @return bool True if every wait succeeded
 */
public function waitForAll( $pos, $timeout = null ) {
	$this->mWaitForPos = $pos;

	$allReached = true;
	$total = count( $this->mServers );
	for ( $index = 1; $index < $total; $index++ ) {
		if ( $this->mLoads[$index] <= 0 ) {
			continue;
		}
		if ( !$this->doWait( $index, true, $timeout ) ) {
			$allReached = false;
		}
	}

	return $allReached;
}
438 
/**
 * Return any tracked connection handle for the given server index.
 *
 * The pools are searched in the order in which they are stored; the first handle of the
 * first pool that has an entry for the server is returned.
 *
 * @param int $i Server index
 * @return object|bool A connection handle, or false if none is tracked for that server
 */
public function getAnyOpenConnection( $i ) {
	foreach ( $this->mConns as $pool ) {
		if ( empty( $pool[$i] ) ) {
			continue;
		}

		return reset( $pool[$i] );
	}

	return false;
}
448 
/**
 * Wait for the given server to reach the position stored in mWaitForPos.
 *
 * A cached "last known position" is checked first. Otherwise the wait runs on an existing
 * connection, or on a temporary one (closed again afterwards) if $open is true.
 *
 * @param int $index Server index
 * @param bool $open Whether a connection may be opened when none is open yet
 * @param int|null $timeout Wait timeout; the configured default is used if empty
 * @return bool True if the server is known to have reached the position
 */
protected function doWait( $index, $open = false, $timeout = null ) {
	$server = $this->getServerName( $index );

	// Check if we already know that the DB has reached this point
	$key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $server );
	$knownReachedPos = $this->srvCache->get( $key );
	if ( $knownReachedPos && $knownReachedPos->hasReached( $this->mWaitForPos ) ) {
		$this->replLogger->debug( __METHOD__ .
			": replica DB $server known to be caught up (pos >= $knownReachedPos)." );
		return true;
	}

	// Find a connection to wait on, creating one if needed and allowed
	$temporary = false; // whether to close the connection afterwards
	$conn = $this->getAnyOpenConnection( $index );
	if ( !$conn ) {
		if ( !$open ) {
			$this->replLogger->debug( __METHOD__ . ": no connection open for $server" );

			return false;
		}

		$conn = $this->openConnection( $index, self::DOMAIN_ANY );
		if ( !$conn ) {
			$this->replLogger->warning( __METHOD__ . ": failed to connect to $server" );

			return false;
		}
		// Avoid connection spam in waitForAll() when connections
		// are made just for the sake of doing this lag check.
		$temporary = true;
	}

	$this->replLogger->info( __METHOD__ . ": Waiting for replica DB $server to catch up..." );
	$timeout = $timeout ?: $this->mWaitTimeout;
	$result = $conn->masterPosWait( $this->mWaitForPos, $timeout );

	$ok = !( $result == -1 || is_null( $result ) );
	if ( $ok ) {
		$this->replLogger->info( __METHOD__ . ": Done" );
		// Remember that the DB reached this point
		$this->srvCache->set( $key, $this->mWaitForPos, BagOStuff::TTL_DAY );
	} else {
		// Timed out waiting for replica DB, use master instead
		$this->replLogger->warning(
			__METHOD__ . ": Timed out waiting on $server pos {$this->mWaitForPos}" );
	}

	if ( $temporary ) {
		$this->closeConnection( $conn );
	}

	return $ok;
}
512 
/**
 * Get a connection handle by server index or by role (DB_MASTER / DB_REPLICA).
 *
 * For anything but DB_MASTER the query groups are tried in order via getReaderIndex(); a
 * DB_REPLICA request then falls back to the generic pool. Handles requested for the master
 * get the current read-only reason attached.
 *
 * @param int $i Server index, or DB_MASTER/DB_REPLICA
 * @param array|string|bool $groups Query group(s); false or [] means the generic pool
 * @param string|bool $domain Domain ID, or false for the local domain
 * @return object Connection handle
 * @throws InvalidArgumentException If $i is null or false
 */
public function getConnection( $i, $groups = [], $domain = false ) {
	if ( $i === null || $i === false ) {
		throw new InvalidArgumentException( 'Attempt to call ' . __METHOD__ .
			' with invalid server index' );
	}

	if ( $this->localDomain->equals( $domain ) || $domain === $this->localDomainIdAlias ) {
		$domain = false; // local connection requested
	}

	if ( $groups === false || $groups === [] ) {
		$groups = [ false ]; // check one "group": the generic pool
	} else {
		$groups = (array)$groups;
	}

	$masterOnly = ( $i == self::DB_MASTER || $i == $this->getWriterIndex() );
	$oldConnsOpened = $this->connsOpened; // connections open now

	if ( $i == self::DB_MASTER ) {
		$i = $this->getWriterIndex();
	} else {
		// Try to find an available server in any of the query groups (in order)
		foreach ( $groups as $group ) {
			$groupIndex = $this->getReaderIndex( $group, $domain );
			if ( $groupIndex === false ) {
				continue;
			}
			$i = $groupIndex;
			break;
		}
	}

	// Operation-based index
	if ( $i == self::DB_REPLICA ) {
		$this->mLastError = 'Unknown error'; // reset error string
		// Try the general server pool if $groups are unavailable, unless the generic
		// pool is exactly what was already tried above.
		if ( $groups === [ false ] ) {
			$i = false;
		} else {
			$i = $this->getReaderIndex( false, $domain );
		}
		// Couldn't find a working server in getReaderIndex()?
		if ( $i === false ) {
			$this->mLastError = 'No working replica DB server: ' . $this->mLastError;
			// Throw an exception
			$this->reportConnectionError();
			return null; // not reached
		}
	}

	// Now we have an explicit index into the servers array
	$conn = $this->openConnection( $i, $domain );
	if ( !$conn ) {
		// Throw an exception
		$this->reportConnectionError();
		return null; // not reached
	}

	// Profile any new connections that happen
	if ( $this->connsOpened > $oldConnsOpened ) {
		$this->trxProfiler->recordConnection(
			$conn->getServer(),
			$conn->getDBname(),
			$masterOnly
		);
	}

	if ( $masterOnly ) {
		// Make master-requested DB handles inherit any read-only mode setting
		$conn->setLBInfo( 'readOnlyReason', $this->getReadOnlyReason( $domain, $conn ) );
	}

	return $conn;
}
590 
/**
 * Release one reference to a foreign connection so that it can be reused later.
 *
 * Handles that lack the 'serverIndex' or 'foreignPoolRefCount' info are ignored. Once the
 * reference count drops to zero the handle moves from the 'foreignUsed' pool to the
 * 'foreignFree' pool.
 *
 * @param object $conn Live connection handle (not a DBConnRef)
 * @throws InvalidArgumentException If the handle is not the one tracked as in use
 */
public function reuseConnection( $conn ) {
	$serverIndex = $conn->getLBInfo( 'serverIndex' );
	$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
	if ( $serverIndex === null || $refCount === null ) {
		return;
	}

	if ( $conn instanceof DBConnRef ) {
		// DBConnRef already handles calling reuseConnection() and only passes the live
		// Database instance to this method. Any caller passing in a DBConnRef is broken.
		$this->connLogger->error( __METHOD__ . ": got DBConnRef instance.\n" .
			( new RuntimeException() )->getTraceAsString() );

		return;
	}

	if ( $this->disabled ) {
		return; // DBConnRef handle probably survived longer than the LoadBalancer
	}

	$domain = $conn->getDomainID();
	if ( !isset( $this->mConns['foreignUsed'][$serverIndex][$domain] ) ) {
		throw new InvalidArgumentException( __METHOD__ .
			": connection $serverIndex/$domain not found; it may have already been freed." );
	}
	if ( $this->mConns['foreignUsed'][$serverIndex][$domain] !== $conn ) {
		throw new InvalidArgumentException( __METHOD__ .
			": connection $serverIndex/$domain mismatched; it may have already been freed." );
	}

	$refCount--;
	$conn->setLBInfo( 'foreignPoolRefCount', $refCount );
	if ( $refCount > 0 ) {
		$this->connLogger->debug( __METHOD__ .
			": reference count for $serverIndex/$domain reduced to $refCount" );

		return;
	}

	// Last reference gone: move the handle over to the free pool
	$this->mConns['foreignFree'][$serverIndex][$domain] = $conn;
	unset( $this->mConns['foreignUsed'][$serverIndex][$domain] );
	if ( !$this->mConns['foreignUsed'][$serverIndex] ) {
		unset( $this->mConns[ 'foreignUsed' ][$serverIndex] ); // clean up
	}
	$this->connLogger->debug( __METHOD__ . ": freed connection $serverIndex/$domain" );
}
640 
/**
 * Get a connection via getConnection() and wrap it in a DBConnRef.
 *
 * @param int $db Server index, or DB_MASTER/DB_REPLICA
 * @param array|string|bool $groups Query group(s)
 * @param string|bool $domain Domain ID; the local domain is used if false
 * @return DBConnRef
 */
public function getConnectionRef( $db, $groups = [], $domain = false ) {
	if ( $domain === false ) {
		$domain = $this->localDomain;
	}
	$conn = $this->getConnection( $db, $groups, $domain );

	return new DBConnRef( $this, $conn );
}
646 
/**
 * Build a DBConnRef from the connection parameters alone, without calling
 * getConnection() here.
 *
 * @param int $db Server index, or DB_MASTER/DB_REPLICA
 * @param array|string|bool $groups Query group(s)
 * @param string|bool $domain Domain ID; the local domain is used if false
 * @return DBConnRef
 */
public function getLazyConnectionRef( $db, $groups = [], $domain = false ) {
	if ( $domain === false ) {
		$domain = $this->localDomain;
	}
	$connParams = [ $db, $groups, $domain ];

	return new DBConnRef( $this, $connParams );
}
652 
/**
 * Open (or reuse) a connection to the server with the given index.
 *
 * Requests for the local domain share one handle per server; other domains go through
 * the foreign connection pools.
 *
 * @param int $i Server index
 * @param string|bool $domain Domain ID, or false for the local domain
 * @return object|bool Connection handle, or false if no usable connection was obtained
 * @throws InvalidArgumentException If no server is configured under index $i
 */
public function openConnection( $i, $domain = false ) {
	if ( $this->localDomain->equals( $domain ) || $domain === $this->localDomainIdAlias ) {
		$domain = false; // local connection requested
	}

	if ( $domain !== false ) {
		$conn = $this->openForeignConnection( $i, $domain );
	} elseif ( isset( $this->mConns['local'][$i][0] ) ) {
		$conn = $this->mConns['local'][$i][0];
	} else {
		if ( !isset( $this->mServers[$i] ) || !is_array( $this->mServers[$i] ) ) {
			throw new InvalidArgumentException( "No server with index '$i'." );
		}
		// Open a new connection
		$server = $this->mServers[$i];
		$server['serverIndex'] = $i;
		$conn = $this->reallyOpenConnection( $server, false );
		$serverName = $this->getServerName( $i );
		if ( !$conn->isOpen() ) {
			$this->connLogger->warning( "Failed to connect to database $i at '$serverName'." );
			$this->mErrorConnection = $conn;
			$conn = false;
		} else {
			$this->connLogger->debug( "Connected to database $i at '$serverName'." );
			$this->mConns['local'][$i][0] = $conn;
		}
	}

	// A connection may have been made earlier and then unrecoverably lost. Do not return a
	// handle that will just throw exceptions on use, but let the calling code (e.g.
	// getReaderIndex) try another server.
	if ( $conn && !$conn->isOpen() ) {
		$this->mErrorConnection = $conn;
		$conn = false;
	}

	return $conn;
}
700 
/**
 * Get a connection to a foreign domain on the given server and take a reference to it.
 *
 * In order of preference: a handle already in use for the domain, a free handle for the
 * domain, a free handle of another domain on the same server (switched to the requested
 * database and table prefix), and finally a brand new connection.
 *
 * @param int $i Server index
 * @param string $domain Domain ID
 * @return object|bool Connection handle, or false on failure
 * @throws InvalidArgumentException If no server is configured under index $i
 */
private function openForeignConnection( $i, $domain ) {
	$domainInstance = DatabaseDomain::newFromId( $domain );
	$dbName = $domainInstance->getDatabase();
	$prefix = $domainInstance->getTablePrefix();

	if ( isset( $this->mConns['foreignUsed'][$i][$domain] ) ) {
		// Reuse an already-used connection
		$conn = $this->mConns['foreignUsed'][$i][$domain];
		$this->connLogger->debug( __METHOD__ . ": reusing connection $i/$domain" );
	} elseif ( isset( $this->mConns['foreignFree'][$i][$domain] ) ) {
		// Reuse a free connection for the same domain
		$conn = $this->mConns['foreignFree'][$i][$domain];
		unset( $this->mConns['foreignFree'][$i][$domain] );
		$this->mConns['foreignUsed'][$i][$domain] = $conn;
		$this->connLogger->debug( __METHOD__ . ": reusing free connection $i/$domain" );
	} elseif ( !empty( $this->mConns['foreignFree'][$i] ) ) {
		// Reuse a connection from another domain
		$conn = reset( $this->mConns['foreignFree'][$i] );
		$oldDomain = key( $this->mConns['foreignFree'][$i] );
		// The empty string as a DB name means "don't care", so only a non-empty name
		// requires a successful database switch.
		if ( !strlen( $dbName ) || $conn->selectDB( $dbName ) ) {
			$conn->tablePrefix( $prefix );
			unset( $this->mConns['foreignFree'][$i][$oldDomain] );
			$this->mConns['foreignUsed'][$i][$domain] = $conn;
			$this->connLogger->debug( __METHOD__ .
				": reusing free connection from $oldDomain for $domain" );
		} else {
			$this->mLastError = "Error selecting database '$dbName' on server " .
				$conn->getServer() . " from client host {$this->host}";
			$this->mErrorConnection = $conn;
			$conn = false;
		}
	} else {
		if ( !isset( $this->mServers[$i] ) || !is_array( $this->mServers[$i] ) ) {
			throw new InvalidArgumentException( "No server with index '$i'." );
		}
		// Open a new connection
		$server = $this->mServers[$i];
		$server['serverIndex'] = $i;
		$server['foreignPoolRefCount'] = 0;
		$server['foreign'] = true;
		$conn = $this->reallyOpenConnection( $server, $dbName );
		if ( $conn->isOpen() ) {
			$conn->tablePrefix( $prefix );
			$this->mConns['foreignUsed'][$i][$domain] = $conn;
			$this->connLogger->debug( __METHOD__ . ": opened new connection for $i/$domain" );
		} else {
			$this->connLogger->warning( __METHOD__ . ": connection error for $i/$domain" );
			$this->mErrorConnection = $conn;
			$conn = false;
		}
	}

	if ( !$conn ) {
		return $conn;
	}

	// Increment reference count
	$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
	$conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 );

	return $conn;
}
783 
/**
 * Tell whether a connection handle is tracked for the given server index.
 *
 * @param mixed $index Server index; anything that is not an integer yields false
 * @return bool
 */
private function isOpen( $index ) {
	return is_integer( $index ) && (bool)$this->getAnyOpenConnection( $index );
}
798 
/**
 * Create a database handle for a server configuration array.
 *
 * The configuration is extended with the cluster master name, the cache, loggers,
 * profilers, CLI mode, agent and default flags before it is handed to Database::factory().
 * If the factory throws a DBConnectionError, the handle carried by that exception is used.
 *
 * @param array $server Server configuration including 'type' and 'serverIndex'
 * @param string|bool $dbNameOverride Database name to use instead of the configured one
 * @return object The database handle (it may not be open)
 * @throws DBAccessError If this load balancer has been disabled
 */
protected function reallyOpenConnection( array $server, $dbNameOverride = false ) {
	if ( $this->disabled ) {
		throw new DBAccessError();
	}

	if ( $dbNameOverride !== false ) {
		$server['dbname'] = $dbNameOverride;
	}

	// Let the handle know what the cluster master is (e.g. "db1052")
	$masterName = $this->getServerName( $this->getWriterIndex() );
	$server['clusterMasterHost'] = $masterName;

	// Log when many connections are made on requests
	$this->connsOpened++;
	if ( $this->connsOpened >= self::CONN_HELD_WARN_THRESHOLD ) {
		$this->perfLogger->warning( __METHOD__ . ": " .
			"{$this->connsOpened}+ connections made (master=$masterName)" );
	}

	$server['srvCache'] = $this->srvCache;
	// Set loggers and profilers
	$server['connLogger'] = $this->connLogger;
	$server['queryLogger'] = $this->queryLogger;
	$server['errorLogger'] = $this->errorLogger;
	$server['profiler'] = $this->profiler;
	$server['trxProfiler'] = $this->trxProfiler;
	// Use the same agent and PHP mode for all DB handles
	$server['cliMode'] = $this->cliMode;
	$server['agent'] = $this->agent;
	// Use DBO_DEFAULT flags by default for LoadBalancer managed databases. Assume that the
	// application calls LoadBalancer::commitMasterChanges() before the PHP script completes.
	if ( !isset( $server['flags'] ) ) {
		$server['flags'] = IDatabase::DBO_DEFAULT;
	}

	// Create a live connection object
	try {
		$handle = Database::factory( $server['type'], $server );
	} catch ( DBConnectionError $e ) {
		// The exception carries the handle that failed to connect; keep using it so that
		// callers can inspect it and report the error.
		$handle = $e->db;
	}

	$handle->setLBInfo( $server );
	$lazyMaster = $this->getLazyConnectionRef( self::DB_MASTER, [], $handle->getDomainID() );
	$handle->setLazyMasterHandle( $lazyMaster );
	$handle->setTableAliases( $this->tableAliases );

	if ( $server['serverIndex'] !== $this->getWriterIndex() ) {
		return $handle;
	}

	// Master handles take part in transaction rounds and get the recurring listeners
	if ( $this->trxRoundId !== false ) {
		$this->applyTransactionRoundFlags( $handle );
	}
	foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
		$handle->setTransactionListener( $name, $callback );
	}

	return $handle;
}
869 
/**
 * Log the last connection error and raise it.
 *
 * If a failed connection handle was recorded, the handle is asked to report the error;
 * otherwise a DBConnectionError is thrown directly with the last error message.
 *
 * @throws DBConnectionError
 */
private function reportConnectionError() {
	$failedConn = $this->mErrorConnection; // the connection which caused the error
	$context = [
		'method' => __METHOD__,
		'last_error' => $this->mLastError,
	];

	if ( is_object( $failedConn ) ) {
		$context['db_server'] = $failedConn->getServer();
		$this->connLogger->warning(
			"Connection error: {last_error} ({db_server})",
			$context
		);

		// throws DBConnectionError
		$failedConn->reportConnectionError( "{$this->mLastError} ({$context['db_server']})" );

		return;
	}

	// No last connection, probably due to all servers being too busy
	$this->connLogger->error(
		"LB failure with no last connection. Connection error: {last_error}",
		$context
	);

	// If all servers were busy, mLastError will contain something sensible
	throw new DBConnectionError( null, $this->mLastError );
}
900 
/**
 * Get the index of the server that writes go to.
 *
 * @return int Always 0
 */
public function getWriterIndex() {
	$masterIndex = 0;

	return $masterIndex;
}
904 
/**
 * Tell whether a server is configured under the given index.
 *
 * @param int|string $i Server index
 * @return bool
 */
public function haveIndex( $i ) {
	$known = array_key_exists( $i, $this->mServers );

	return $known;
}
908 
/**
 * Tell whether a server is configured under the given index with a load other than zero.
 *
 * @param int|string $i Server index
 * @return bool
 */
public function isNonZeroLoad( $i ) {
	if ( !array_key_exists( $i, $this->mServers ) ) {
		return false;
	}

	return $this->mLoads[$i] != 0;
}
912 
/**
 * Get the number of configured servers.
 *
 * @return int
 */
public function getServerCount() {
	$total = count( $this->mServers );

	return $total;
}
916 
/**
 * Get the display name of a server.
 *
 * The 'hostName' setting takes precedence over 'host'; 'localhost' is returned when the
 * chosen setting is empty or neither is set.
 *
 * @param int|string $i Server index
 * @return string
 */
public function getServerName( $i ) {
	$name = '';
	foreach ( [ 'hostName', 'host' ] as $field ) {
		if ( isset( $this->mServers[$i][$field] ) ) {
			$name = $this->mServers[$i][$field];
			break; // the first setting that exists wins, even if it is empty
		}
	}

	if ( $name != '' ) {
		return $name;
	}

	return 'localhost';
}
928 
/**
 * Get the configuration array of a server.
 *
 * @param int|string $i Server index
 * @return array|bool The configuration, or false if nothing is set under that index
 */
public function getServerInfo( $i ) {
	return isset( $this->mServers[$i] ) ? $this->mServers[$i] : false;
}
936 
/**
 * Set or replace the configuration array of a server.
 *
 * Only the server list is touched; the load maps built by the constructor are not updated.
 *
 * @param int|string $i Server index
 * @param array $serverInfo New configuration for that server
 */
public function setServerInfo( $i, array $serverInfo ) {
	$this->mServers[$i] = $serverInfo;
}
940 
/**
 * Get the current master position.
 *
 * The position is read from a tracked master connection if there is one. Otherwise it is
 * taken from the first server after index 0 that has a tracked connection; this covers a
 * request that was served entirely from a replica DB.
 *
 * @return mixed The position, or false if no connection is tracked at all
 */
public function getMasterPos() {
	$masterConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
	if ( $masterConn ) {
		return $masterConn->getMasterPos();
	}

	$total = count( $this->mServers );
	for ( $index = 1; $index < $total; $index++ ) {
		$replicaConn = $this->getAnyOpenConnection( $index );
		if ( $replicaConn ) {
			return $replicaConn->getReplicaPos();
		}
	}

	return false;
}
959 
/**
 * Close all connections and mark this load balancer as disabled.
 *
 * Once disabled, reallyOpenConnection() throws DBAccessError and reuseConnection()
 * returns without doing anything.
 */
public function disable() {
	// Close first: the flag is only set after the existing handles are gone
	$this->closeAll();

	$this->disabled = true;
}
964 
/**
 * Close every open connection, empty all connection pools and reset the counter of
 * opened connections.
 */
public function closeAll() {
	$closeHandle = function ( IDatabase $conn ) {
		$host = $conn->getServer();
		$this->connLogger->debug( "Closing connection to database '$host'." );
		$conn->close();
	};
	$this->forEachOpenConnection( $closeHandle );

	$this->mConns = [
		'local' => [],
		'foreignFree' => [],
		'foreignUsed' => [],
	];
	$this->connsOpened = 0;
}
979 
/**
 * Close a single connection and stop tracking it.
 *
 * The handle is looked up in every pool under its own 'serverIndex' info. If it is found,
 * it is removed from that pool and the counter of opened connections is decremented. The
 * handle is closed in any case.
 *
 * @param IDatabase $conn Handle to close
 */
public function closeConnection( IDatabase $conn ) {
	$serverIndex = $conn->getLBInfo( 'serverIndex' ); // second index level of mConns
	foreach ( $this->mConns as $type => $connsByServer ) {
		if ( !isset( $connsByServer[$serverIndex] ) ) {
			continue;
		}

		// The inner key is 0 for the local pool and a domain ID for the foreign pools;
		// it is NOT a server index, so the host must be resolved from $serverIndex.
		foreach ( $connsByServer[$serverIndex] as $key => $trackedConn ) {
			if ( $conn === $trackedConn ) {
				$host = $this->getServerName( $serverIndex );
				$this->connLogger->debug(
					"Closing connection to database $serverIndex at '$host'." );
				unset( $this->mConns[$type][$serverIndex][$key] );
				// Keep the counter in step with the handles that are still held, the
				// same way closeAll() resets it to zero.
				--$this->connsOpened;
				break 2;
			}
		}
	}

	$conn->close();
}
1000 
/**
 * Commit on every open connection and end any active transaction round.
 *
 * Commit failures are handed to the error logger and collected, so every connection is
 * attempted before an error is raised. If a round was active, the round flags are undone
 * on handles whose 'master' info is set.
 *
 * @param string $fname Caller name passed on to commit()
 * @throws DBExpectedError If the commit failed on at least one server
 */
public function commitAll( $fname = __METHOD__ ) {
	$failures = [];

	$restore = ( $this->trxRoundId !== false );
	$this->trxRoundId = false;

	$commitHandle = function ( IDatabase $conn ) use ( $fname, $restore, &$failures ) {
		try {
			$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
		} catch ( DBError $e ) {
			call_user_func( $this->errorLogger, $e );
			$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
		}
		if ( $restore && $conn->getLBInfo( 'master' ) ) {
			$this->undoTransactionRoundFlags( $conn );
		}
	};
	$this->forEachOpenConnection( $commitHandle );

	if ( !$failures ) {
		return;
	}

	throw new DBExpectedError(
		null,
		"Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
	);
}
1027 
/**
 * Run the pre-commit callbacks on every open master connection.
 *
 * Callback suppression is lifted while the pre-commit callbacks run and is switched back
 * on afterwards, so that post-commit callbacks stay deferred.
 */
public function finalizeMasterChanges() {
	$this->forEachOpenMasterConnection( function ( Database $conn ) {
		// Any error should cause all DB transactions to be rolled back together
		$conn->setTrxEndCallbackSuppression( false );
		// NOTE(review): this call was missing from the listing, which left the two
		// suppression toggles back to back with nothing in between; confirm the method
		// name against the Database class.
		$conn->runOnTransactionPreCommitCallbacks();
		// Defer post-commit callbacks until COMMIT finishes for all DBs
		$conn->setTrxEndCallbackSuppression( true );
	} );
}
1037 
/**
 * Check every open master connection before the commit round.
 *
 * A connection fails the check when an explicit transaction is still active, when its
 * pending write time exceeds the 'maxWriteDuration' option (if that is greater than zero),
 * or when it has pending writes or callbacks but no longer answers a ping.
 *
 * @param array $options May contain 'maxWriteDuration' (0 or absent means no limit)
 * @throws DBTransactionError
 */
public function approveMasterChanges( array $options ) {
	$limit = 0;
	if ( isset( $options['maxWriteDuration'] ) ) {
		$limit = $options['maxWriteDuration'];
	}

	$check = function ( IDatabase $conn ) use ( $limit ) {
		// If atomic sections or explicit transactions are still open, some caller must have
		// caught an exception but failed to properly rollback any changes. Detect that and
		// throw an error (causing rollback).
		if ( $conn->explicitTrxActive() ) {
			throw new DBTransactionError(
				$conn,
				"Explicit transaction still active. A caller may have caught an error."
			);
		}

		// Assert that the time to replicate the transaction will be sane.
		// If this fails, then all DB transactions will be rolled back together.
		$time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
		if ( $limit > 0 && $time > $limit ) {
			throw new DBTransactionSizeError(
				$conn,
				"Transaction spent $time second(s) in writes, exceeding the $limit limit.",
				[ $time, $limit ]
			);
		}

		// If a connection sits idle while slow queries execute on another, that connection
		// may end up dropped before the commit round is reached. Ping servers to detect this.
		if ( $conn->writesOrCallbacksPending() && !$conn->ping() ) {
			throw new DBTransactionError(
				$conn,
				"A connection to the {$conn->getDBname()} database was lost before commit."
			);
		}
	};

	$this->forEachOpenMasterConnection( $check );
}
1070 
/**
 * Start a transaction round named after the caller.
 *
 * Flushes any snapshot held by each open master connection and then applies the
 * transaction round flags to it. Flush errors are logged and collected so that
 * every connection is processed before anything is thrown.
 *
 * @param string $fname Caller name; becomes the transaction round ID
 * @throws DBTransactionError If a transaction round is already active
 * @throws DBExpectedError If the snapshot flush failed on any server
 */
public function beginMasterChanges( $fname = __METHOD__ ) {
	if ( $this->trxRoundId !== false ) {
		throw new DBTransactionError(
			null,
			"$fname: Transaction round '{$this->trxRoundId}' already started."
		);
	}
	$this->trxRoundId = $fname;

	$failures = [];
	$this->forEachOpenMasterConnection(
		function ( Database $conn ) use ( $fname, &$failures ) {
			// Keep transaction-end callbacks from firing while the snapshot is flushed
			$conn->setTrxEndCallbackSuppression( true );
			try {
				$conn->flushSnapshot( $fname );
			} catch ( DBError $e ) {
				call_user_func( $this->errorLogger, $e );
				$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
			}
			$conn->setTrxEndCallbackSuppression( false );
			$this->applyTransactionRoundFlags( $conn );
		}
	);

	if ( $failures ) {
		throw new DBExpectedError(
			null,
			"$fname: Flush failed on server(s) " . implode( "\n", array_unique( $failures ) )
		);
	}
}
1102 
/**
 * End the current transaction round (if any) and commit on all open master connections.
 *
 * A connection with writes or callbacks pending is committed; otherwise, when a
 * round was active, only its snapshot is flushed. When a round was active the
 * transaction round flags are undone on each connection afterwards. Errors are
 * logged and collected so that every connection is processed before throwing.
 *
 * @param string $fname Caller name
 * @throws DBExpectedError If commit or flush failed on any server
 */
public function commitMasterChanges( $fname = __METHOD__ ) {
	$failures = [];

	// Held in a local only so that it stays alive until this method returns
	$scope = $this->getScopedPHPBehaviorForCommit(); // try to ignore client aborts

	$restore = ( $this->trxRoundId !== false );
	$this->trxRoundId = false;
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( $fname, $restore, &$failures ) {
			try {
				if ( $conn->writesOrCallbacksPending() ) {
					$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
				} elseif ( $restore ) {
					$conn->flushSnapshot( $fname );
				}
			} catch ( DBError $e ) {
				call_user_func( $this->errorLogger, $e );
				$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
			}
			if ( $restore ) {
				$this->undoTransactionRoundFlags( $conn );
			}
		}
	);

	if ( $failures ) {
		throw new DBExpectedError(
			null,
			"$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
		);
	}
}
1136 
/**
 * Run the post-transaction callbacks on every open master connection.
 *
 * For each connection, callback suppression is lifted first. Connections that still
 * have writes/callbacks pending, or an open transaction, are skipped. Otherwise the
 * "on transaction idle" callbacks run, followed by the transaction listener callbacks.
 * Exceptions do not stop the iteration; only the first one is kept.
 *
 * @param mixed $type Trigger value handed through to both callback runners
 * @return Exception|null The first exception caught, or null if there was none
 */
public function runMasterPostTrxCallbacks( $type ) {
	$e = null; // first exception
	$this->forEachOpenMasterConnection( function ( Database $conn ) use ( $type, &$e ) {
		$conn->setTrxEndCallbackSuppression( false );
		if ( $conn->writesOrCallbacksPending() ) {
			// This happens if onTransactionIdle() callbacks leave callbacks on *another* DB
			// (which finished its callbacks already). Warn and recover in this case. Let the
			// callbacks run in the final commitMasterChanges() in LBFactory::shutdown().
			$this->queryLogger->error( __METHOD__ . ": found writes/callbacks pending." );
			return;
		} elseif ( $conn->trxLevel() ) {
			// This happens for single-DB setups where DB_REPLICA uses the master DB,
			// thus leaving an implicit read-only transaction open at this point. It
			// also happens if onTransactionIdle() callbacks leave implicit transactions
			// open on *other* DBs (which is slightly improper). Let these COMMIT on the
			// next call to commitMasterChanges(), possibly in LBFactory::shutdown().
			return;
		}
		try {
			$conn->runOnTransactionIdleCallbacks( $type );
		} catch ( Exception $ex ) {
			$e = $e ?: $ex;
		}
		try {
			$conn->runTransactionListenerCallbacks( $type );
		} catch ( Exception $ex ) {
			$e = $e ?: $ex;
		}
	} );

	return $e;
}
1169 
/**
 * End the current transaction round (if any) and roll back on open master connections.
 *
 * ROLLBACK is only issued on connections that have writes or callbacks pending.
 * When a round was active, the transaction round flags are undone on every
 * open master connection.
 *
 * @param string $fname Caller name
 */
public function rollbackMasterChanges( $fname = __METHOD__ ) {
	$restore = ( $this->trxRoundId !== false );
	$this->trxRoundId = false;
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( $fname, $restore ) {
			if ( $conn->writesOrCallbacksPending() ) {
				$conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
			}
			if ( $restore ) {
				$this->undoTransactionRoundFlags( $conn );
			}
		}
	);
}
1184 
/**
 * Suppress all pending post-COMMIT/ROLLBACK callbacks on every open master connection.
 */
public function suppressTransactionEndCallbacks() {
	$this->forEachOpenMasterConnection( function ( Database $conn ) {
		$conn->setTrxEndCallbackSuppression( true );
	} );
}
1190 
/**
 * Turn on DBO_TRX for a connection taking part in a transaction round.
 *
 * @param IDatabase $conn
 */
private function applyTransactionRoundFlags( IDatabase $conn ) {
	if ( !$conn->getFlag( $conn::DBO_DEFAULT ) ) {
		// Config explicitly asked for DBO_TRX to be on or off by leaving DBO_DEFAULT
		// unset, so respect that. Forcing no transactions is useful for things like
		// blob stores (ExternalStore) which want auto-commit mode.
		return;
	}

	// With DBO_DEFAULT, DBO_TRX is controlled entirely by CLI mode presence.
	// Force DBO_TRX even in CLI mode since a commit round is expected soon.
	$conn->setFlag( $conn::DBO_TRX, $conn::REMEMBER_PRIOR );
}
1204 
/**
 * Restore the flags a connection had before applyTransactionRoundFlags() touched it.
 *
 * @param IDatabase $conn
 */
private function undoTransactionRoundFlags( IDatabase $conn ) {
	if ( !$conn->getFlag( $conn::DBO_DEFAULT ) ) {
		// Flags are only ever changed for DBO_DEFAULT connections
		return;
	}

	$conn->restoreFlags( $conn::RESTORE_PRIOR );
}
1213 
/**
 * Flush the snapshot of every open replica DB connection.
 *
 * @param string $fname Caller name, handed to each connection's flushSnapshot()
 */
public function flushReplicaSnapshots( $fname = __METHOD__ ) {
	// Pass the caller's name through; __METHOD__ inside the closure would name the closure
	$this->forEachOpenReplicaConnection( function ( IDatabase $conn ) use ( $fname ) {
		$conn->flushSnapshot( $fname );
	} );
}
1219 
/**
 * Tell whether the writer (master) server index is reported as open by isOpen().
 *
 * @return mixed Whatever isOpen() returns for the writer index
 */
public function hasMasterConnection() {
	$writerIndex = $this->getWriterIndex();

	return $this->isOpen( $writerIndex );
}
1223 
/**
 * Tell whether any open master connection has writes or callbacks pending.
 *
 * @return bool
 */
public function hasMasterChanges() {
	$anyPending = 0;
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( &$anyPending ) {
			// Every connection is asked, even after one has reported pending work
			$anyPending = $anyPending | $conn->writesOrCallbacksPending();
		}
	);

	return (bool)$anyPending;
}
1232 
/**
 * Get the most recent lastDoneWrites() value across all open master connections.
 *
 * @return mixed The largest value seen, or false when no master connection is open
 */
public function lastMasterChangeTimestamp() {
	$latest = false;
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( &$latest ) {
			$latest = max( $latest, $conn->lastDoneWrites() );
		}
	);

	return $latest;
}
1241 
/**
 * Tell whether master changes are pending now or were made within the last $age seconds.
 *
 * @param float|null $age Look-back window in seconds; null means the wait timeout
 * @return bool
 */
public function hasOrMadeRecentMasterChanges( $age = null ) {
	if ( $age === null ) {
		$age = $this->mWaitTimeout;
	}

	if ( $this->hasMasterChanges() ) {
		return true;
	}

	return $this->lastMasterChangeTimestamp() > microtime( true ) - $age;
}
1248 
/**
 * Collect the pending write callers reported by every open master connection.
 *
 * @return array Merged pendingWriteCallers() lists of all open master connections
 */
public function pendingMasterChangeCallers() {
	$callers = [];
	$collect = function ( IDatabase $conn ) use ( &$callers ) {
		$callers = array_merge( $callers, $conn->pendingWriteCallers() );
	};
	$this->forEachOpenMasterConnection( $collect );

	return $callers;
}
1257 
/**
 * Get the lagged-replica flag, probing for a replica connection first if needed.
 *
 * @param string|bool $domain Domain handed to getConnection(); false by default
 * @return bool Current value of the lagged-replica flag
 */
public function getLaggedReplicaMode( $domain = false ) {
	// Nothing to probe if the flag is already set or there is only one DB
	// (the latter also avoids recursion)
	if ( $this->laggedReplicaMode || $this->getServerCount() <= 1 ) {
		return $this->laggedReplicaMode;
	}

	try {
		// Fetching a replica connection is what may set laggedReplicaMode
		$probe = $this->getConnection( self::DB_REPLICA, false, $domain );
		$this->reuseConnection( $probe );
	} catch ( DBConnectionError $e ) {
		// Avoid expensive re-connect attempts and failures
		$this->allReplicasDownMode = true;
		$this->laggedReplicaMode = true;
	}

	return $this->laggedReplicaMode;
}
1274 
/**
 * Alias that forwards to getLaggedReplicaMode().
 *
 * @param string|bool $domain Forwarded unchanged
 * @return bool
 */
public function getLaggedSlaveMode( $domain = false ) {
	$mode = $this->getLaggedReplicaMode( $domain );

	return $mode;
}
1283 
/**
 * Read the lagged-replica flag as it stands, without probing any connection.
 *
 * @return bool
 */
public function laggedReplicaUsed() {
	$flag = $this->laggedReplicaMode;

	return $flag;
}
1287 
/**
 * Alias that forwards to laggedReplicaUsed().
 *
 * @return bool
 */
public function laggedSlaveUsed() {
	$used = $this->laggedReplicaUsed();

	return $used;
}
1296 
/**
 * Get the reason the database should be treated as read-only, if there is one.
 *
 * Checked in order: an explicitly set reason, lagged/unreachable replicas, and
 * finally whether the master itself runs in read-only mode.
 *
 * @param string|bool $domain Domain to check; false by default
 * @param IDatabase|null $conn Master connection to use for the read-only check, if any
 * @return string|bool Reason text, or false when not read-only
 */
public function getReadOnlyReason( $domain = false, IDatabase $conn = null ) {
	if ( $this->readOnlyReason !== false ) {
		return $this->readOnlyReason;
	}

	if ( $this->getLaggedReplicaMode( $domain ) ) {
		$lockedPrefix = 'The database has been automatically locked ';

		return $this->allReplicasDownMode
			? $lockedPrefix . 'until the replica database servers become available'
			: $lockedPrefix . 'while the replica database servers catch up to the master.';
	}

	if ( $this->masterRunningReadOnly( $domain, $conn ) ) {
		return 'The database master is running in read-only mode.';
	}

	return false;
}
1314 
/**
 * Tell whether the master server reports itself as read-only.
 *
 * The answer is cached for TTL_CACHE_READONLY seconds under a global key built from
 * the master server name. A DBError while checking counts as "not read-only".
 *
 * @param string|bool $domain Domain used when a master connection has to be fetched
 * @param IDatabase|null $conn Master connection to use instead of fetching one
 * @return bool
 */
private function masterRunningReadOnly( $domain, IDatabase $conn = null ) {
	$cache = $this->wanCache;
	$masterServer = $this->getServerName( $this->getWriterIndex() );

	return (bool)$cache->getWithSetCallback(
		$cache->makeGlobalKey( __CLASS__, 'server-read-only', $masterServer ),
		self::TTL_CACHE_READONLY,
		function () use ( $domain, $conn ) {
			// Silence the transaction profiler while checking; restored below
			$old = $this->trxProfiler->setSilenced( true );
			try {
				$dbw = $conn ?: $this->getConnection( self::DB_MASTER, [], $domain );
				$readOnly = (int)$dbw->serverIsReadOnly();
				if ( !$conn ) {
					// Only hand back a connection that was fetched here
					$this->reuseConnection( $dbw );
				}
			} catch ( DBError $e ) {
				$readOnly = 0;
			}
			$this->trxProfiler->setSilenced( $old );
			return $readOnly;
		},
		[ 'pcTTL' => $cache::TTL_PROC_LONG, 'busyValue' => 0 ]
	);
}
1344 
/**
 * Get the allow-lagged setting, optionally replacing it first.
 *
 * @param mixed $mode New value to store; null leaves the setting untouched
 * @return mixed The setting after any update
 */
public function allowLagged( $mode = null ) {
	if ( $mode !== null ) {
		$this->mAllowLagged = $mode;
	}

	return $this->mAllowLagged;
}
1353 
/**
 * Ping every open connection.
 *
 * @return bool False if at least one ping failed, true otherwise
 */
public function pingAll() {
	$allAlive = true;
	$this->forEachOpenConnection(
		function ( IDatabase $conn ) use ( &$allAlive ) {
			// Ping first so that every connection is pinged even after a failure
			$allAlive = $conn->ping() && $allAlive;
		}
	);

	return $allAlive;
}
1364 
/**
 * Invoke a callback once for every tracked connection object.
 *
 * @param callable $callback Called with the connection followed by $params
 * @param array $params Extra arguments appended after the connection
 */
public function forEachOpenConnection( $callback, array $params = [] ) {
	foreach ( $this->mConns as $byServer ) {
		foreach ( $byServer as $connections ) {
			foreach ( $connections as $conn ) {
				call_user_func_array( $callback, array_merge( [ $conn ], $params ) );
			}
		}
	}
}
1375 
/**
 * Invoke a callback once for every tracked connection to the writer (master) index.
 *
 * @param callable $callback Called with the connection followed by $params
 * @param array $params Extra arguments appended after the connection
 */
public function forEachOpenMasterConnection( $callback, array $params = [] ) {
	$writerIndex = $this->getWriterIndex();
	foreach ( $this->mConns as $byServer ) {
		if ( !isset( $byServer[$writerIndex] ) ) {
			continue; // no master connections of this kind
		}
		foreach ( $byServer[$writerIndex] as $conn ) {
			call_user_func_array( $callback, array_merge( [ $conn ], $params ) );
		}
	}
}
1388 
/**
 * Invoke a callback once for every tracked connection that is not to the writer index.
 *
 * @param callable $callback Called with the connection followed by $params
 * @param array $params Extra arguments appended after the connection
 */
public function forEachOpenReplicaConnection( $callback, array $params = [] ) {
	foreach ( $this->mConns as $byServer ) {
		foreach ( $byServer as $serverIndex => $connections ) {
			// Anything but the master index counts as a replica
			if ( $serverIndex !== $this->getWriterIndex() ) {
				foreach ( $connections as $conn ) {
					call_user_func_array( $callback, array_merge( [ $conn ], $params ) );
				}
			}
		}
	}
}
1402 
/**
 * Find the server with the highest lag among those with a positive load weight.
 *
 * @param string|bool $domain Domain handed to getLagTimes(); false by default
 * @return array [ host name, lag, server index ]; [ '', -1, 0 ] when there is only
 *   one server or no server qualifies
 */
public function getMaxLag( $domain = false ) {
	$worst = [ '', -1, 0 ];

	if ( $this->getServerCount() <= 1 ) {
		return $worst; // no replication = no lag
	}

	foreach ( $this->getLagTimes( $domain ) as $index => $lag ) {
		if ( $this->mLoads[$index] > 0 && $lag > $worst[1] ) {
			$worst = [ $this->mServers[$index]['host'], $lag, $index ];
		}
	}

	return $worst;
}
1423 
/**
 * Get the lag of each server, keyed by server index.
 *
 * Servers flagged 'is static' are reported with zero lag; all others are looked
 * up through the load monitor.
 *
 * @param string|bool $domain Domain handed to the load monitor; false by default
 * @return array Map of (server index => lag)
 */
public function getLagTimes( $domain = false ) {
	if ( $this->getServerCount() <= 1 ) {
		return [ $this->getWriterIndex() => 0 ]; // no replication = no lag
	}

	$staticLags = []; // static servers, each mapped to 0
	$replicating = []; // indexes of servers that might have replication lag
	foreach ( $this->mServers as $index => $info ) {
		if ( !empty( $info['is static'] ) ) {
			// Non-replicating, read-only archive
			$staticLags[$index] = 0;
		} else {
			$replicating[] = $index;
		}
	}

	$measured = $this->getLoadMonitor()->getLagTimes( $replicating, $domain );

	return $measured + $staticLags;
}
1441 
/**
 * Get the lag of a connection, or zero when there is only one server.
 *
 * @param IDatabase $conn
 * @return mixed 0 without replication, otherwise whatever $conn->getLag() returns
 */
public function safeGetLag( IDatabase $conn ) {
	return ( $this->getServerCount() <= 1 ) ? 0 : $conn->getLag();
}
1449 
/**
 * Wait for a replica DB connection to catch up to a master position.
 *
 * @param IDatabase $conn Connection expected to be a replica DB
 * @param DBMasterPos|bool $pos Position to wait for; the current master position
 *   is looked up when this is falsy
 * @param int $timeout Timeout handed to masterPosWait()
 * @return bool True if $conn is not a replica or the wait completed; false on
 *   timeout or when no master position could be determined
 */
public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = 10 ) {
	$isReplica = $this->getServerCount() > 1 && $conn->getLBInfo( 'replica' );
	if ( !$isReplica ) {
		return true; // server is not a replica DB
	}

	if ( !$pos ) {
		// Get the current master position, opening a connection if needed
		$openMaster = $this->getAnyOpenConnection( $this->getWriterIndex() );
		if ( $openMaster ) {
			$pos = $openMaster->getMasterPos();
		} else {
			// Temporary connection, closed again once the position is read
			$tempMaster = $this->openConnection( $this->getWriterIndex(), self::DOMAIN_ANY );
			$pos = $tempMaster->getMasterPos();
			$this->closeConnection( $tempMaster );
		}
	}

	if ( !( $pos instanceof DBMasterPos ) ) {
		// something is misconfigured
		$this->replLogger->error( "Could not get master pos for {$conn->getServer()}." );

		return false;
	}

	$result = $conn->masterPosWait( $pos, $timeout );
	if ( $result == -1 || is_null( $result ) ) {
		$msg = __METHOD__ . ": Timed out waiting on {$conn->getServer()} pos {$pos}";
		$this->replLogger->warning( "$msg" );

		return false;
	}

	$this->replLogger->info( __METHOD__ . ": Done" );

	return true;
}
1484 
/**
 * Register or remove a named transaction listener.
 *
 * The callback is remembered (or forgotten) under $name and the same call is
 * forwarded to every currently open master connection.
 *
 * @param string $name Listener name
 * @param callable|null $callback Callback to register; null removes the listener
 */
public function setTransactionListener( $name, callable $callback = null ) {
	if ( $callback ) {
		$this->trxRecurringCallbacks[$name] = $callback;
	} else {
		unset( $this->trxRecurringCallbacks[$name] );
	}
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( $name, $callback ) {
			$conn->setTransactionListener( $name, $callback );
		}
	);
}
1497 
/**
 * Replace the stored table alias map.
 *
 * @param array $aliases New alias map; overwrites any previous one
 */
public function setTableAliases( array $aliases ) {
	$this->tableAliases = $aliases;
}
1501 
/**
 * Set a new table prefix for the local domain and all open connections.
 *
 * The local domain keeps its database name; only the prefix changes. The
 * "<db>-<prefix>" domain ID alias is recomputed the same way the constructor does,
 * so it does not keep pointing at the old prefix.
 *
 * @param string $prefix New table prefix
 * @throws DBUnexpectedError If foreign domain connections are still in use
 */
public function setDomainPrefix( $prefix ) {
	if ( $this->mConns['foreignUsed'] ) {
		// Do not switch connections to explicit foreign domains unless marked as free
		$domains = [];
		foreach ( $this->mConns['foreignUsed'] as $connsByDomain ) {
			$domains = array_merge( $domains, array_keys( $connsByDomain ) );
		}
		$domains = implode( ', ', $domains );
		throw new DBUnexpectedError( null,
			"Foreign domain connections are still in use ($domains)." );
	}

	$this->localDomain = new DatabaseDomain(
		$this->localDomain->getDatabase(),
		null,
		$prefix
	);
	// Keep the alias derived from the local domain in step with the new prefix
	if ( $this->localDomain->getTablePrefix() != '' ) {
		$this->localDomainIdAlias =
			$this->localDomain->getDatabase() . '-' . $this->localDomain->getTablePrefix();
	} else {
		$this->localDomainIdAlias = $this->localDomain->getDatabase();
	}

	$this->forEachOpenConnection( function ( IDatabase $db ) use ( $prefix ) {
		$db->tablePrefix( $prefix );
	} );
}
1524 
/**
 * Make PHP ignore user aborts until the returned object is released.
 *
 * @return ScopedCallback|null Object whose callback restores the previous
 *   ignore_user_abort() setting; null when running under the CLI SAPI
 */
final protected function getScopedPHPBehaviorForCommit() {
	if ( PHP_SAPI == 'cli' ) { // http://bugs.php.net/bug.php?id=47540
		return null;
	}

	$previous = ignore_user_abort( true ); // avoid half-finished operations

	return new ScopedCallback( function () use ( $previous ) {
		ignore_user_abort( $previous );
	} );
}
1541 
/**
 * Disable this load balancer on destruction.
 */
function __destruct() {
	// Avoid leaking connections
	$this->disable();
}
1546 }
explicitTrxActive()
lastDoneWrites()
Returns the last time the connection may have been used for write queries.
commitAll($fname=__METHOD__)
Commit transactions on all open connections.
array[] $trxRecurringCallbacks
Map of (name => callable)
Database error base class.
Definition: DBError.php:26
the array() calling protocol came about after MediaWiki 1.4rc1.
safeGetLag(IDatabase $conn)
Get the lag in seconds for a given connection, or zero if this load balancer does not have replicatio...
integer $mWaitTimeout
Seconds to spend waiting on replica DB lag to resolve.
getAnyOpenConnection($i)
Get any open connection to a given server index, local or foreign Returns false if there is no connec...
static newUnspecified()
writesOrCallbacksPending()
Returns true if there is a transaction open with possible write queries or transaction pre-commit/idl...
Definition: Database.php:526
bool IDatabase $mErrorConnection
Database connection that caused a problem.
$context
Definition: load.php:50
static factory($dbType, $p=[])
Construct a Database subclass instance given a database type and parameters.
Definition: Database.php:325
$success
trxLevel()
Gets the current transaction level.
Definition: Database.php:440
reuseConnection($conn)
Mark a foreign connection as being available for reuse under a different DB name or prefix...
processing should stop and the error should be shown to the user * false
Definition: hooks.txt:189
ILoadMonitor $loadMonitor
getServerCount()
Get the number of defined servers (not the number of open connections)
array[] $mServers
Map of (server index => server config array)
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
getLoadMonitor()
Get a LoadMonitor instance.
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException'returning false will NOT prevent logging $e
Definition: hooks.txt:2102
rollbackMasterChanges($fname=__METHOD__)
Issue ROLLBACK only on master, only if queries were done on connection.
LoggerInterface $replLogger
runTransactionListenerCallbacks($trigger)
Actually run any "transaction listener" callbacks.
Definition: Database.php:2615
close()
Closes a database connection.
finalizeMasterChanges()
Perform all pre-commit callbacks that remain part of the atomic transactions and disable any post-com...
object string $profiler
Class name or object With profileIn/profileOut methods.
setTableAliases(array $aliases)
Make certain table names use their own database, schema, and table prefix when passed into SQL querie...
reallyOpenConnection(array $server, $dbNameOverride=false)
Really opens a connection.
An object representing a master or replica DB position in a replicated setup.
Definition: DBMasterPos.php:7
bool $cliMode
Whether this PHP instance is for a CLI script.
getReaderIndex($group=false, $domain=false)
Get the index of the reader connection, which may be a replica DB This takes into account load ratios...
openConnection($i, $domain=false)
array[] $mGroupLoads
Map of (group => server index => weight)
float[] $mLoads
Map of (server index => weight)
bool $allReplicasDownMode
Whether connecting to a generic replica DB failed, so the replica DBs are treated as unavailable.
const DB_MASTER
Definition: defines.php:23
forEachOpenConnection($callback, array $params=[])
Call a function with each open connection object.
getLazyConnectionRef($db, $groups=[], $domain=false)
Get a database connection handle reference without connecting yet.
masterPosWait(DBMasterPos $pos, $timeout)
Wait for the replica DB to catch up to a given master position.
boolean $disabled
__construct(array $params)
Construct a manager of IDatabase connection objects.
array[] $mConns
Map of (local/foreignUsed/foreignFree => server index => IDatabase array)
The index of the header message $result[1]=The index of the body text message $result[2 through n]=Parameters passed to body text message.Please note the header message cannot receive/use parameters. 'ImportHandleLogItemXMLTag':When parsing a XML tag in a log item.Return false to stop further processing of the tag $reader:XMLReader object $logInfo:Array of information 'ImportHandlePageXMLTag':When parsing a XML tag in a page.Return false to stop further processing of the tag $reader:XMLReader object &$pageInfo:Array of information 'ImportHandleRevisionXMLTag':When parsing a XML tag in a page revision.Return false to stop further processing of the tag $reader:XMLReader object $pageInfo:Array of page information $revisionInfo:Array of revision information 'ImportHandleToplevelXMLTag':When parsing a top level XML tag.Return false to stop further processing of the tag $reader:XMLReader object 'ImportHandleUploadXMLTag':When parsing a XML tag in a file upload.Return false to stop further processing of the tag $reader:XMLReader object $revisionInfo:Array of information 'ImportLogInterwikiLink':Hook to change the interwiki link used in log entries and edit summaries for transwiki imports.&$fullInterwikiPrefix:Interwiki prefix, may contain colons.&$pageTitle:String that contains page title. 'ImportSources':Called when reading from the $wgImportSources configuration variable.Can be used to lazy-load the import sources list.&$importSources:The value of $wgImportSources.Modify as necessary.See the comment in DefaultSettings.php for the detail of how to structure this array. 
'InfoAction':When building information to display on the action=info page.$context:IContextSource object &$pageInfo:Array of information 'InitializeArticleMaybeRedirect':MediaWiki check to see if title is a redirect.&$title:Title object for the current page &$request:WebRequest &$ignoreRedirect:boolean to skip redirect check &$target:Title/string of redirect target &$article:Article object 'InternalParseBeforeLinks':during Parser's internalParse method before links but after nowiki/noinclude/includeonly/onlyinclude and other processings.&$parser:Parser object &$text:string containing partially parsed text &$stripState:Parser's internal StripState object 'InternalParseBeforeSanitize':during Parser's internalParse method just before the parser removes unwanted/dangerous HTML tags and after nowiki/noinclude/includeonly/onlyinclude and other processings.Ideal for syntax-extensions after template/parser function execution which respect nowiki and HTML-comments.&$parser:Parser object &$text:string containing partially parsed text &$stripState:Parser's internal StripState object 'InterwikiLoadPrefix':When resolving if a given prefix is an interwiki or not.Return true without providing an interwiki to continue interwiki search.$prefix:interwiki prefix we are looking for.&$iwData:output array describing the interwiki with keys iw_url, iw_local, iw_trans and optionally iw_api and iw_wikiid. 
'InvalidateEmailComplete':Called after a user's email has been invalidated successfully.$user:user(object) whose email is being invalidated 'IRCLineURL':When constructing the URL to use in an IRC notification.Callee may modify $url and $query, URL will be constructed as $url.$query &$url:URL to index.php &$query:Query string $rc:RecentChange object that triggered url generation 'IsFileCacheable':Override the result of Article::isFileCacheable()(if true) &$article:article(object) being checked 'IsTrustedProxy':Override the result of IP::isTrustedProxy() &$ip:IP being check &$result:Change this value to override the result of IP::isTrustedProxy() 'IsUploadAllowedFromUrl':Override the result of UploadFromUrl::isAllowedUrl() $url:URL used to upload from &$allowed:Boolean indicating if uploading is allowed for given URL 'isValidEmailAddr':Override the result of Sanitizer::validateEmail(), for instance to return false if the domain name doesn't match your organization.$addr:The e-mail address entered by the user &$result:Set this and return false to override the internal checks 'isValidPassword':Override the result of User::isValidPassword() $password:The password entered by the user &$result:Set this and return false to override the internal checks $user:User the password is being validated for 'Language::getMessagesFileName':$code:The language code or the language we're looking for a messages file for &$file:The messages file path, you can override this to change the location. 
'LanguageGetMagic':DEPRECATED!Use $magicWords in a file listed in $wgExtensionMessagesFiles instead.Use this to define synonyms of magic words depending of the language &$magicExtensions:associative array of magic words synonyms $lang:language code(string) 'LanguageGetNamespaces':Provide custom ordering for namespaces or remove namespaces.Do not use this hook to add namespaces.Use CanonicalNamespaces for that.&$namespaces:Array of namespaces indexed by their numbers 'LanguageGetSpecialPageAliases':DEPRECATED!Use $specialPageAliases in a file listed in $wgExtensionMessagesFiles instead.Use to define aliases of special pages names depending of the language &$specialPageAliases:associative array of magic words synonyms $lang:language code(string) 'LanguageGetTranslatedLanguageNames':Provide translated language names.&$names:array of language code=> language name $code:language of the preferred translations 'LanguageLinks':Manipulate a page's language links.This is called in various places to allow extensions to define the effective language links for a page.$title:The page's Title.&$links:Associative array mapping language codes to prefixed links of the form"language:title".&$linkFlags:Associative array mapping prefixed links to arrays of flags.Currently unused, but planned to provide support for marking individual language links in the UI, e.g.for featured articles. 'LanguageSelector':Hook to change the language selector available on a page.$out:The output page.$cssClassName:CSS class name of the language selector. 'LinkBegin':DEPRECATED!Use HtmlPageLinkRendererBegin instead.Used when generating internal and interwiki links in Linker::link(), before processing starts.Return false to skip default processing and return $ret.See documentation for Linker::link() for details on the expected meanings of parameters.$skin:the Skin object $target:the Title that the link is pointing to &$html:the contents that the< a > tag should have(raw HTML) $result
Definition: hooks.txt:1934
doWait($index, $open=false, $timeout=null)
Wait for a given replica DB to catch up to the master pos stored in $this.
getLagTimes($domain=false)
Get an estimate of replication lag (in seconds) for each server.
Class to handle database/prefix specification for IDatabase domains.
BagOStuff $srvCache
WANObjectCache $wanCache
static newEmpty()
Get an instance that wraps EmptyBagOStuff.
getRandomNonLagged(array $loads, $domain=false, $maxLag=INF)
getServerInfo($i)
Return the server info structure for a given index, or false if the index is invalid.
array $loadMonitorConfig
The LoadMonitor configuration.
string $mLastError
The last DB selection or connection error.
setServerInfo($i, array $serverInfo)
Sets the server info structure for the given index.
Base class for the more common types of database errors.
design txt This is a brief overview of the new design More thorough and up to date information is available on the documentation wiki at etc Handles the details of getting and saving to the user table of the and dealing with sessions and cookies OutputPage Encapsulates the entire HTML page that will be sent in response to any server request It is used by calling its functions to add in any and then calling but I prefer the flexibility This should also do the output encoding The system allocates a global one in $wgOut Title Represents the title of an and does all the work of translating among various forms such as plain database key
Definition: design.txt:25
setFlag($flag, $remember=self::REMEMBER_NOTHING)
Set a flag for this connection.
getReadOnlyReason($domain=false, IDatabase $conn=null)
getLag()
Get replica DB lag.
Database cluster connection, tracking, load balancing, and transaction manager interface.
Helper class that detects high-contention DB queries via profiling calls.
this hook is for auditing only RecentChangesLinked and Watchlist RecentChangesLinked and Watchlist e g Watchlist removed from all revisions and log entries to which it was applied This gives extensions a chance to take it off their books as the deletion has already been partly carried out by this point or something similar the user will be unable to create the tag set and then return false from the hook function Ensure you consume the ChangeTagAfterDelete hook to carry out custom deletion actions as context called by AbstractContent::getParserOutput May be used to override the normal model specific rendering of page content as context as context $options
Definition: hooks.txt:1046
waitForOne($pos, $timeout=null)
Set the master wait position and wait for a "generic" replica DB to catch up to it.
allowLagged($mode=null)
Disables/enables lag checks.
hasMasterChanges()
Determine if there are pending changes in a transaction by this thread.
getScopedPHPBehaviorForCommit()
Make PHP ignore user aborts/disconnects until the returned value leaves scope.
Exception class for attempted DB access.
integer $connsOpened
Total connections opened.
setDomainPrefix($prefix)
Set a new table prefix for the existing local domain ID for testing.
$cache
Definition: mcc.php:33
$params
writesOrCallbacksPending()
Returns true if there is a transaction open with possible write queries or transaction pre-commit/idl...
flushReplicaSnapshots($fname=__METHOD__)
Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshot.
Helper class to handle automatically marking connections as reusable (via RAII pattern) as well handl...
Definition: DBConnRef.php:10
getMaxLag($domain=false)
Get the hostname and lag time of the most-lagged replica DB.
A BagOStuff object with no objects in it.
LoggerInterface $connLogger
disable()
Disable this load balancer.
getFlag($flag)
Returns a boolean whether the flag $flag is set for this connection.
beginMasterChanges($fname=__METHOD__)
Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
string $agent
Agent name for query profiling.
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
approveMasterChanges(array $options)
Perform all pre-commit checks for things like replication safety.
hasOrMadeRecentMasterChanges($age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
pendingWriteQueryDuration($type=self::ESTIMATE_TOTAL)
Get the time spent running write queries for this transaction.
forEachOpenMasterConnection($callback, array $params=[])
Call a function with each open connection object to a master.
const DBO_TRX
Definition: defines.php:9
commit($fname=__METHOD__, $flush= '')
Commits a transaction previously started using begin().
runOnTransactionIdleCallbacks($trigger)
Actually run and consume any "on transaction idle/resolution" callbacks.
Definition: Database.php:2535
flushSnapshot($fname=__METHOD__)
Commit any transaction but error out if writes or callbacks are pending.
Definition: Database.php:2848
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
const DBO_DEFAULT
Definition: defines.php:10
restoreFlags($state=self::RESTORE_PRIOR)
Restore the flags to their prior state before the last setFlag/clearFlag call.
suppressTransactionEndCallbacks()
Suppress all pending post-COMMIT/ROLLBACK callbacks.
if(!defined( 'MEDIAWIKI')) $fname
This file is not a valid entry point, perform no further processing unless MEDIAWIKI is defined...
Definition: Setup.php:36
setTransactionListener($name, callable $callback=null)
Set a callback via IDatabase::setTransactionListener() on all current and future master connections o...
getLaggedReplicaMode($domain=false)
closeAll()
Close all open connections.
pendingWriteCallers()
Get the list of method names that did write queries for this transaction.
string $localDomainIdAlias
Alternate ID string for the domain instead of DatabaseDomain::getId()
masterRunningReadOnly($domain, IDatabase $conn=null)
getServer()
Get the server hostname or IP address.
getConnection($i, $groups=[], $domain=false)
static newFromId($domain)
LoggerInterface $perfLogger
tablePrefix($prefix=null)
Get/set the table prefix.
closeConnection(IDatabase $conn)
Close a connection.
haveIndex($i)
Returns true if the specified index is a valid server index.
getLBInfo($name=null)
Get properties passed down from the server info array of the load balancer.
this hook is for auditing only RecentChangesLinked and Watchlist RecentChangesLinked and Watchlist e g Watchlist removed from all revisions and log entries to which it was applied This gives extensions a chance to take it off their books as the deletion has already been partly carried out by this point or something similar the user will be unable to create the tag set and then return false from the hook function Ensure you consume the ChangeTagAfterDelete hook to carry out custom deletion actions as context called by AbstractContent::getParserOutput May be used to override the normal model specific rendering of page content as context as context the output can only depend on parameters provided to this hook not on global state indicating whether full HTML should be generated If generation of HTML may be but other information should still be present in the ParserOutput object to manipulate or replace but no entry for that model exists in $wgContentHandlers if desired whether it is OK to use $contentModel on $title Handler functions that modify $ok should generally return false to prevent further hooks from further modifying $ok inclusive $limit
Definition: hooks.txt:1046
setTrxEndCallbackSuppression($suppress)
Whether to disable running of post-COMMIT/ROLLBACK callbacks.
Definition: Database.php:2522
isOpen($index)
Test if the specified index represents an open connection.
string $host
Current server name.
string bool $trxRoundId
String if a requested DBO_TRX transaction round is active.
static pickRandom($weights)
Given an array of non-normalised probabilities, this function will select an element and return the a...
Definition: ArrayUtils.php:66
pendingMasterChangeCallers()
Get the list of callers that have pending master changes.
bool $laggedReplicaMode
Whether the generic reader fell back to a lagged replica DB.
isNonZeroLoad($i)
Returns true if the specified index is valid and has non-zero load.
waitFor($pos)
Set the master wait position. If a DB_REPLICA connection has been opened already, waits. Otherwise sets...
integer $mReadIndex
The generic (not query grouped) replica DB index (of $mServers)
callable $errorLogger
Exception logger.
getMasterPos()
Get the current master position for chronology control purposes.
const DB_REPLICA
Definition: defines.php:22
openForeignConnection($i, $domain)
Open a connection to a foreign DB, or return one if it is already open.
DatabaseDomain $localDomain
Local Domain ID and default for selectDB() calls.
runOnTransactionPreCommitCallbacks()
Actually run and consume any "on transaction pre-commit" callbacks.
Definition: Database.php:2585
string bool $readOnlyReason
Reason the LB is read-only or false if not.
bool $mAllowLagged
Whether to disregard replica DB lag as a factor in replica DB selection.
commitMasterChanges($fname=__METHOD__)
Issue COMMIT on all master connections where writes were done.
rollback($fname=__METHOD__, $flush= '')
Rollback a transaction previously started using begin().
bool DBMasterPos $mWaitForPos
False if not set.
getServerName($i)
Get the host name or IP address of the server with the specified index. Prefer a readable name if avai...
ping(&$rtt=null)
Ping the server and try to reconnect if there is no connection.
getConnectionRef($db, $groups=[], $domain=false)
Get a database connection handle reference.
forEachOpenReplicaConnection($callback, array $params=[])
Call a function with each open replica DB connection object.
getLaggedSlaveMode($domain=false)
undoTransactionRoundFlags(IDatabase $conn)
waitForAll($pos, $timeout=null)
Set the master wait position and wait for ALL replica DBs to catch up to it.
BagOStuff $memCache
setTransactionListener($name, callable $callback=null)
Run a callback each time any transaction commits or rolls back.
do that in ParserLimitReportFormat instead use this to modify the parameters of the image and a DIV can begin in one section and end in another Make sure your code can handle that case gracefully See the EditSectionClearerLink extension for an example zero but section is usually empty its values are the globals values before the output is cached one of or reset my talk my contributions etc etc otherwise the built in rate limiting checks are if enabled allows for interception of redirect as a string mapping parameter names to values & $type
Definition: hooks.txt:2491
TransactionProfiler $trxProfiler
see documentation in includes Linker php for Linker::makeImageLink & $time
Definition: hooks.txt:1749
Basic database interface for live and lazy-loaded relational database handles.
Definition: IDatabase.php:34
applyTransactionRoundFlags(IDatabase $conn)
LoggerInterface $queryLogger
runMasterPostTrxCallbacks($type)
Issue all pending post-COMMIT/ROLLBACK callbacks.
safeWaitForMasterPos(IDatabase $conn, $pos=false, $timeout=10)
Wait for a replica DB to reach a specified master position.
lastMasterChangeTimestamp()
Get the timestamp of the latest write query done by this thread.
flushSnapshot($fname=__METHOD__)
Commit any transaction but error out if writes or callbacks are pending.
Allows to change the fields on the form that will be generated $name
Definition: hooks.txt:300