MediaWiki REL1_28
LoadBalancer.php
Go to the documentation of this file.
1<?php
23use Psr\Log\LoggerInterface;
24use Wikimedia\ScopedCallback;
25
31class LoadBalancer implements ILoadBalancer {
/** @var array[] Server config arrays taken from $params['servers'], keyed by server index */
33 private $mServers;
/** @var array[] Tracked handles: 'local'/'foreignUsed'/'foreignFree' => server index => key => handle */
35 private $mConns;
/** @var array Map of (server index => load) built from each server's 'load' setting */
37 private $mLoads;
/** @var array[] Map of (query group => server index => load ratio) from each server's 'groupLoads' */
39 private $mGroupLoads;
/** @var array Table aliases handed to every new handle via setTableAliases() */
47 private $tableAliases = [];
48
/** @var object|null Load monitor instance; created lazily by getLoadMonitor() */
50 private $loadMonitor;
/** @var object Cache from $params['srvCache'] (EmptyBagOStuff if not given) */
52 private $srvCache;
/** @var object Cache from $params['memCache'] (EmptyBagOStuff if not given) */
54 private $memCache;
/** @var object Cache from $params['wanCache'] (WANObjectCache::newEmpty() if not given) */
56 private $wanCache;
/** @var mixed Profiler from $params['profiler'], or null; passed to new handles */
58 protected $profiler;
/** @var object Transaction profiler; records new connections in getConnection() */
60 protected $trxProfiler;
/** @var LoggerInterface Logger used for replication/lag messages */
62 protected $replLogger;
/** @var LoggerInterface Logger used for connection messages */
64 protected $connLogger;
/** @var LoggerInterface Logger passed to new handles as 'queryLogger' */
66 protected $queryLogger;
/** @var LoggerInterface Logger used for the "many connections made" warning */
68 protected $perfLogger;
69
/** @var int Index of the generic reader server; -1 until one has been chosen */
73 private $mReadIndex;
/** @var object|bool Replication position that readers should wait for, or false */
75 private $mWaitForPos;
/** @var bool Set when the generic reader was picked in lagged mode or failed to catch up */
77 private $laggedReplicaMode = false;
/** @var bool Not referenced in the visible part of this file — TODO confirm its use */
79 private $allReplicasDownMode = false;
/** @var string Last error message; reported by reportConnectionError() */
81 private $mLastError = 'Unknown error';
/** @var string|bool Read-only reason from $params['readOnlyReason'], or false */
83 private $readOnlyReason = false;
/** @var int Count of connections opened through reallyOpenConnection() */
85 private $connsOpened = 0;
/** @var string|bool Name of the active transaction round, or false if none */
87 private $trxRoundId = false;
/** @var DatabaseDomain Local domain built from $params['localDomain'] */
91 private $localDomain;
/** @var string Host name of this client ($params['hostname'], gethostname() or 'unknown') */
95 private $host;
/** @var bool Whether running in CLI mode; passed to new handles */
97 protected $cliMode;
/** @var string Agent name passed to new handles */
99 protected $agent;
100
103
/** @var bool Set by disable(); once true, no new connections may be opened */
105 private $disabled = false;
106
/** Number of opened connections at which a performance warning is logged */
108 const CONN_HELD_WARN_THRESHOLD = 10;
109
/** Default 'max lag' for a server when its config does not set one (log messages report lag in seconds) */
111 const MAX_LAG_DEFAULT = 10;
/** Not referenced in the visible part of this file; presumably a cache TTL for read-only checks — TODO confirm */
113 const TTL_CACHE_READONLY = 5;
114
/**
 * Set up the balancer from a configuration array.
 *
 * Keys read from $params by this method:
 *  - servers: (required) list of server config arrays; each must have 'load'
 *    and may have 'groupLoads' (map of query group => load ratio)
 *  - localDomain: domain ID parsed with DatabaseDomain::newFromId()
 *  - waitTimeout: fallback timeout used by doWait() (default 10)
 *  - readOnlyReason: only honoured when it is a string
 *  - loadMonitor: config array with a 'class' key (default LoadMonitorNull)
 *  - srvCache, memCache, wanCache: cache objects (empty caches by default)
 *  - profiler, trxProfiler: profiling objects
 *  - errorLogger: callable taking an Exception (default raises E_USER_WARNING)
 *  - replLogger, connLogger, queryLogger, perfLogger: loggers (NullLogger by default)
 *  - hostname, cliMode, agent: client information passed to DB handles
 *
 * @param array $params
 * @throws InvalidArgumentException If $params['servers'] is not set
 */
115 public function __construct( array $params ) {
116 if ( !isset( $params['servers'] ) ) {
117 throw new InvalidArgumentException( __CLASS__ . ': missing servers parameter' );
118 }
119 $this->mServers = $params['servers'];
120
121 $this->localDomain = isset( $params['localDomain'] )
122 ? DatabaseDomain::newFromId( $params['localDomain'] )
// NOTE(review): the ":" branch and terminating ";" of this ternary are absent from this
// listing (its numbering skips 123). Restore the default domain from the canonical file.
124 // In case a caller assumes that the domain ID is simply <db>-<prefix>, which is almost
125 // always true, gracefully handle the case when they fail to account for escaping.
126 if ( $this->localDomain->getTablePrefix() != '' ) {
127 $this->localDomainIdAlias =
128 $this->localDomain->getDatabase() . '-' . $this->localDomain->getTablePrefix();
129 } else {
130 $this->localDomainIdAlias = $this->localDomain->getDatabase();
131 }
132
133 $this->mWaitTimeout = isset( $params['waitTimeout'] ) ? $params['waitTimeout'] : 10;
134
// No generic reader has been picked yet, and no handles are tracked
135 $this->mReadIndex = -1;
136 $this->mConns = [
137 'local' => [],
138 'foreignUsed' => [],
139 'foreignFree' => []
140 ];
141 $this->mLoads = [];
142 $this->mWaitForPos = false;
143 $this->mErrorConnection = false;
144 $this->mAllowLagged = false;
145
// Non-string values (e.g. true) are ignored, leaving the default of false
146 if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
147 $this->readOnlyReason = $params['readOnlyReason'];
148 }
149
150 if ( isset( $params['loadMonitor'] ) ) {
151 $this->loadMonitorConfig = $params['loadMonitor'];
152 } else {
153 $this->loadMonitorConfig = [ 'class' => 'LoadMonitorNull' ];
154 }
155
// Index the generic loads and the per-group loads by server index
156 foreach ( $params['servers'] as $i => $server ) {
157 $this->mLoads[$i] = $server['load'];
158 if ( isset( $server['groupLoads'] ) ) {
159 foreach ( $server['groupLoads'] as $group => $ratio ) {
160 if ( !isset( $this->mGroupLoads[$group] ) ) {
161 $this->mGroupLoads[$group] = [];
162 }
163 $this->mGroupLoads[$group][$i] = $ratio;
164 }
165 }
166 }
167
168 if ( isset( $params['srvCache'] ) ) {
169 $this->srvCache = $params['srvCache'];
170 } else {
171 $this->srvCache = new EmptyBagOStuff();
172 }
173 if ( isset( $params['memCache'] ) ) {
174 $this->memCache = $params['memCache'];
175 } else {
176 $this->memCache = new EmptyBagOStuff();
177 }
178 if ( isset( $params['wanCache'] ) ) {
179 $this->wanCache = $params['wanCache'];
180 } else {
181 $this->wanCache = WANObjectCache::newEmpty();
182 }
183 $this->profiler = isset( $params['profiler'] ) ? $params['profiler'] : null;
184 if ( isset( $params['trxProfiler'] ) ) {
185 $this->trxProfiler = $params['trxProfiler'];
186 } else {
187 $this->trxProfiler = new TransactionProfiler();
188 }
189
// Without an explicit error logger, exceptions are surfaced as PHP warnings
190 $this->errorLogger = isset( $params['errorLogger'] )
191 ? $params['errorLogger']
192 : function ( Exception $e ) {
193 trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
194 };
195
// Each logger key maps onto the property of the same name
196 foreach ( [ 'replLogger', 'connLogger', 'queryLogger', 'perfLogger' ] as $key ) {
197 $this->$key = isset( $params[$key] ) ? $params[$key] : new \Psr\Log\NullLogger();
198 }
199
200 $this->host = isset( $params['hostname'] )
201 ? $params['hostname']
202 : ( gethostname() ?: 'unknown' );
203 $this->cliMode = isset( $params['cliMode'] ) ? $params['cliMode'] : PHP_SAPI === 'cli';
204 $this->agent = isset( $params['agent'] ) ? $params['agent'] : '';
205 }
206
/**
 * Return the load monitor, instantiating it on first use.
 *
 * The class named by the 'class' key of the load monitor config is constructed
 * with this balancer, both caches and the config itself, and is then given the
 * replication logger.
 *
 * @return object The load monitor instance
 */
private function getLoadMonitor() {
	if ( isset( $this->loadMonitor ) ) {
		return $this->loadMonitor;
	}

	$monitorClass = $this->loadMonitorConfig['class'];
	$this->loadMonitor = new $monitorClass(
		$this,
		$this->srvCache,
		$this->memCache,
		$this->loadMonitorConfig
	);
	$this->loadMonitor->setLogger( $this->replLogger );

	return $this->loadMonitor;
}
222
/**
 * Pick a random server from $loads after discarding replicas that lag too much.
 *
 * Index 0 is never discarded here. Every other server reported by getLagTimes()
 * is removed when its lag exceeds the smaller of its own 'max lag' setting
 * (MAX_LAG_DEFAULT if unset) and $maxLag, or when it reports a lag of false
 * (not replicating) while that limit is finite.
 *
 * @param array $loads Map of (server index => load)
 * @param string|bool $domain Domain passed through to getLagTimes()
 * @param int|float $maxLag Additional upper bound on acceptable lag
 * @return int|string|bool Server index, or false if the remaining loads sum to zero
 */
private function getRandomNonLagged( array $loads, $domain = false, $maxLag = INF ) {
	$lags = $this->getLagTimes( $domain );

	# Unset excessively lagged servers
	foreach ( $lags as $i => $lag ) {
		if ( $i == 0 ) {
			continue;
		}

		# How much lag this server nominally is allowed to have
		$maxServerLag = isset( $this->mServers[$i]['max lag'] )
			? $this->mServers[$i]['max lag']
			: self::MAX_LAG_DEFAULT; // default
		# Constrain that further by $maxLag argument
		$maxServerLag = min( $maxServerLag, $maxLag );

		$host = $this->getServerName( $i );
		if ( $lag === false && !is_infinite( $maxServerLag ) ) {
			$this->replLogger->error( "Server $host (#$i) is not replicating?" );
			unset( $loads[$i] );
		} elseif ( $lag > $maxServerLag ) {
			$this->replLogger->warning( "Server $host (#$i) has >= $lag seconds of lag" );
			unset( $loads[$i] );
		}
	}

	# Find out if all the replica DBs with non-zero load are lagged.
	# An empty $loads also sums to zero, so no separate emptiness check is needed.
	if ( array_sum( $loads ) == 0 ) {
		# No appropriate DB servers except maybe the master and some replica DBs with zero load
		# Do NOT use the master
		# Instead, this function will return false, triggering read-only mode,
		# and a lagged replica DB will be used instead.
		return false;
	}

	# Return a random representative of the remainder
	return ArrayUtils::pickRandom( $loads );
}
273
/**
 * Choose the server index to read from for a query group.
 *
 * With a single configured server the writer index is returned. For the generic
 * group (false), a previously chosen reader index is reused. Otherwise servers are
 * tried at random, weighted by load, until one can be connected to; servers that
 * fail to connect are dropped from the candidate set and another is tried.
 *
 * @param string|bool $group Query group, or false for the generic reader
 * @param string|bool $domain Domain ID, or false for the local domain
 * @return int|bool Server index, or false if the group has no loads or no server could be used
 * @throws InvalidArgumentException If the selected load array is empty
 */
274 public function getReaderIndex( $group = false, $domain = false ) {
275 if ( count( $this->mServers ) == 1 ) {
276 # Skip the load balancing if there's only one server
277 return $this->getWriterIndex();
278 } elseif ( $group === false && $this->mReadIndex >= 0 ) {
279 # Shortcut if generic reader exists already
280 return $this->mReadIndex;
281 }
282
283 # Find the relevant load array
284 if ( $group !== false ) {
285 if ( isset( $this->mGroupLoads[$group] ) ) {
286 $nonErrorLoads = $this->mGroupLoads[$group];
287 } else {
288 # No loads for this group, return false and the caller can use some other group
289 $this->connLogger->info( __METHOD__ . ": no loads for group $group" );
290
291 return false;
292 }
293 } else {
294 $nonErrorLoads = $this->mLoads;
295 }
296
297 if ( !count( $nonErrorLoads ) ) {
298 throw new InvalidArgumentException( "Empty server array given to LoadBalancer" );
299 }
300
301 # Scale the configured load ratios according to the dynamic load if supported
302 $this->getLoadMonitor()->scaleLoads( $nonErrorLoads, $domain );
303
304 $laggedReplicaMode = false;
305
306 # No server found yet
307 $i = false;
308 # First try quickly looking through the available servers for a server that
309 # meets our criteria
310 $currentLoads = $nonErrorLoads;
311 while ( count( $currentLoads ) ) {
312 if ( $this->mAllowLagged || $laggedReplicaMode ) {
313 $i = ArrayUtils::pickRandom( $currentLoads );
314 } else {
315 $i = false;
316 if ( $this->mWaitForPos && $this->mWaitForPos->asOfTime() ) {
317 # ChronologyProtector causes mWaitForPos to be set via sessions.
318 # This triggers doWait() after connect, so it's especially good to
319 # avoid lagged servers so as to avoid just blocking in that method.
320 $ago = microtime( true ) - $this->mWaitForPos->asOfTime();
321 # Aim for <= 1 second of waiting (being too picky can backfire)
322 $i = $this->getRandomNonLagged( $currentLoads, $domain, $ago + 1 );
323 }
324 if ( $i === false ) {
325 # Any server with less lag than its 'max lag' param is preferable
326 $i = $this->getRandomNonLagged( $currentLoads, $domain );
327 }
328 if ( $i === false && count( $currentLoads ) != 0 ) {
329 # All replica DBs lagged. Switch to read-only mode
330 $this->replLogger->error( "All replica DBs lagged. Switch to read-only mode" );
331 $i = ArrayUtils::pickRandom( $currentLoads );
332 $laggedReplicaMode = true;
333 }
334 }
335
336 if ( $i === false ) {
337 # pickRandom() returned false
338 # This is permanent and means the configuration or the load monitor
339 # wants us to return false.
340 $this->connLogger->debug( __METHOD__ . ": pickRandom() returned false" );
341
342 return false;
343 }
344
345 $serverName = $this->getServerName( $i );
346 $this->connLogger->debug( __METHOD__ . ": Using reader #$i: $serverName..." );
347
348 $conn = $this->openConnection( $i, $domain );
349 if ( !$conn ) {
350 $this->connLogger->warning( __METHOD__ . ": Failed connecting to $i/$domain" );
// Drop the failed server from both sets so that the next pass cannot pick it again
351 unset( $nonErrorLoads[$i] );
352 unset( $currentLoads[$i] );
353 $i = false;
354 continue;
355 }
356
357 // Decrement reference counter, we are finished with this connection.
358 // It will be incremented for the caller later.
359 if ( $domain !== false ) {
360 $this->reuseConnection( $conn );
361 }
362
363 # Return this server
364 break;
365 }
366
367 # If all servers were down, quit now
368 if ( !count( $nonErrorLoads ) ) {
369 $this->connLogger->error( "All servers down" );
370 }
371
372 if ( $i !== false ) {
373 # Replica DB connection successful.
374 # Wait for the session master pos for a short time.
375 if ( $this->mWaitForPos && $i > 0 ) {
376 $this->doWait( $i );
377 }
// Only a generic-group pick with non-zero generic load is remembered as the reader index
378 if ( $this->mReadIndex <= 0 && $this->mLoads[$i] > 0 && $group === false ) {
379 $this->mReadIndex = $i;
380 # Record if the generic reader index is in "lagged replica DB" mode
381 if ( $laggedReplicaMode ) {
382 $this->laggedReplicaMode = true;
383 }
384 }
385 $serverName = $this->getServerName( $i );
386 $this->connLogger->debug(
387 __METHOD__ . ": using server $serverName for group '$group'" );
388 }
389
390 return $i;
391 }
392
/**
 * Record a replication position to wait for and, when a generic replica reader
 * has already been chosen, wait on it right away.
 *
 * Only an already-open connection is used for the wait; if the wait does not
 * succeed, the balancer is flagged as being in lagged-replica mode.
 *
 * @param object|bool $pos Replication position to wait for
 */
public function waitFor( $pos ) {
	$this->mWaitForPos = $pos;

	$readerIndex = $this->mReadIndex;
	if ( $readerIndex > 0 && !$this->doWait( $readerIndex ) ) {
		$this->laggedReplicaMode = true;
	}
}
406
/**
 * Record a replication position and wait for a single replica to reach it.
 *
 * The generic reader is used if one has been chosen; otherwise a random replica
 * with non-zero load is picked. A connection is opened for the wait if needed.
 *
 * @param object|bool $pos Replication position to wait for
 * @param int|null $timeout Max seconds to wait; falsy values use the configured default
 * @return bool Result of the wait, or true when there is no applicable replica
 */
public function waitForOne( $pos, $timeout = null ) {
	$this->mWaitForPos = $pos;

	$index = $this->mReadIndex;
	if ( $index <= 0 ) {
		// No generic replica yet: choose among replicas (not the writer) with non-zero load
		$candidates = $this->mLoads;
		unset( $candidates[$this->getWriterIndex()] );
		$candidates = array_filter( $candidates );
		$index = ArrayUtils::pickRandom( $candidates );
	}

	if ( $index > 0 ) {
		return $this->doWait( $index, true, $timeout );
	}

	return true; // no applicable loads
}
427
/**
 * Record a replication position and wait for every replica with positive load
 * to reach it.
 *
 * All such servers (indexes 1 and up) are waited on even after one has failed.
 *
 * @param object|bool $pos Replication position to wait for
 * @param int|null $timeout Max seconds to wait per server; falsy values use the default
 * @return bool True only if every wait succeeded
 */
public function waitForAll( $pos, $timeout = null ) {
	$this->mWaitForPos = $pos;

	$allCaughtUp = true;
	$total = count( $this->mServers );
	for ( $index = 1; $index < $total; $index++ ) {
		if ( !( $this->mLoads[$index] > 0 ) ) {
			continue;
		}
		if ( !$this->doWait( $index, true, $timeout ) ) {
			$allCaughtUp = false;
		}
	}

	return $allCaughtUp;
}
441
/**
 * Find any tracked connection handle for the given server index.
 *
 * Each connection pool is checked in turn; the first handle of the first
 * non-empty pool entry for that server is returned.
 *
 * @param int $i Server index
 * @return object|bool Connection handle, or false if none is tracked
 */
public function getAnyOpenConnection( $i ) {
	foreach ( $this->mConns as $pool ) {
		if ( empty( $pool[$i] ) ) {
			continue;
		}

		return reset( $pool[$i] );
	}

	return false;
}
455
/**
 * Wait for a server to reach the position stored in mWaitForPos.
 *
 * The server cache is consulted first for a position the server is already known
 * to have reached. Otherwise a tracked connection is used, or a new one is opened
 * when $open is true; a connection opened here is closed again before returning.
 *
 * @param int $index Server index
 * @param bool $open Whether a connection may be opened if none is tracked
 * @param int|null $timeout Max seconds to wait; falsy values use the configured default
 * @return bool True if the position was reached, false on timeout or missing connection
 */
protected function doWait( $index, $open = false, $timeout = null ) {
	$serverName = $this->getServerName( $index );
	$cacheKey = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $serverName );

	// Check if we already know that the DB has reached this point
	$cachedPos = $this->srvCache->get( $cacheKey );
	if ( $cachedPos && $cachedPos->hasReached( $this->mWaitForPos ) ) {
		$this->replLogger->debug( __METHOD__ .
			": replica DB $serverName known to be caught up (pos >= $cachedPos)." );

		return true;
	}

	// Find a connection to wait on, creating one if needed and allowed
	$openedHere = false;
	$handle = $this->getAnyOpenConnection( $index );
	if ( !$handle ) {
		if ( !$open ) {
			$this->replLogger->debug( __METHOD__ . ": no connection open for $serverName" );

			return false;
		}

		$handle = $this->openConnection( $index, self::DOMAIN_ANY );
		if ( !$handle ) {
			$this->replLogger->warning( __METHOD__ . ": failed to connect to $serverName" );

			return false;
		}
		// Avoid connection spam in waitForAll() when connections
		// are made just for the sake of doing this lag check.
		$openedHere = true;
	}

	$this->replLogger->info(
		__METHOD__ . ": Waiting for replica DB $serverName to catch up..." );
	$waitSeconds = $timeout ?: $this->mWaitTimeout;
	$result = $handle->masterPosWait( $this->mWaitForPos, $waitSeconds );

	$caughtUp = !( $result == -1 || is_null( $result ) );
	if ( $caughtUp ) {
		$this->replLogger->info( __METHOD__ . ": Done" );
		// Remember that the DB reached this point
		$this->srvCache->set( $cacheKey, $this->mWaitForPos, BagOStuff::TTL_DAY );
	} else {
		// Timed out waiting for replica DB, use master instead
		$this->replLogger->warning(
			__METHOD__ . ": Timed out waiting on $serverName pos {$this->mWaitForPos}" );
	}

	if ( $openedHere ) {
		$this->closeConnection( $handle );
	}

	return $caughtUp;
}
519
/**
 * Get a connection handle for a server index, DB_MASTER or DB_REPLICA.
 *
 * DB_MASTER resolves to the writer index. DB_REPLICA resolves through the given
 * query groups in order, falling back to the generic pool. An explicit server
 * index is used as given unless query groups were explicitly requested, in which
 * case the first group that yields a reader wins. Previously the generic pool was
 * consulted for every non-master index, so an explicit index was silently replaced
 * by the generic reader.
 *
 * @param int $i Server index, or self::DB_MASTER / self::DB_REPLICA
 * @param array|string|bool $groups Query group(s); false or [] means the generic pool
 * @param string|bool $domain Domain ID, or false for the local domain
 * @return object Connection handle
 * @throws InvalidArgumentException If $i is null or false
 */
public function getConnection( $i, $groups = [], $domain = false ) {
	if ( $i === null || $i === false ) {
		throw new InvalidArgumentException( 'Attempt to call ' . __METHOD__ .
			' with invalid server index' );
	}

	if ( $this->localDomain->equals( $domain ) || $domain === $this->localDomainIdAlias ) {
		$domain = false; // local connection requested
	}

	$groups = ( $groups === false || $groups === [] )
		? [ false ] // check one "group": the generic pool
		: (array)$groups;

	$masterOnly = ( $i == self::DB_MASTER || $i == $this->getWriterIndex() );
	$oldConnsOpened = $this->connsOpened; // connections open now

	if ( $i == self::DB_MASTER ) {
		$i = $this->getWriterIndex();
	} elseif ( $i == self::DB_REPLICA || $groups !== [ false ] ) {
		# Try to find an available server in any of the query groups (in order).
		# An explicit index with no explicit groups is left untouched.
		foreach ( $groups as $group ) {
			$groupIndex = $this->getReaderIndex( $group, $domain );
			if ( $groupIndex !== false ) {
				$i = $groupIndex;
				break;
			}
		}
	}

	# Operation-based index
	if ( $i == self::DB_REPLICA ) {
		$this->mLastError = 'Unknown error'; // reset error string
		# Try the general server pool if $groups are unavailable.
		$i = ( $groups === [ false ] )
			? false // don't bother with this if that is what was tried above
			: $this->getReaderIndex( false, $domain );
		# Couldn't find a working server in getReaderIndex()?
		if ( $i === false ) {
			$this->mLastError = 'No working replica DB server: ' . $this->mLastError;
			// Throw an exception
			$this->reportConnectionError();
			return null; // not reached
		}
	}

	# Now we have an explicit index into the servers array
	$conn = $this->openConnection( $i, $domain );
	if ( !$conn ) {
		// Throw an exception
		$this->reportConnectionError();
		return null; // not reached
	}

	# Profile any new connections that happen
	if ( $this->connsOpened > $oldConnsOpened ) {
		$host = $conn->getServer();
		$dbname = $conn->getDBname();
		$this->trxProfiler->recordConnection( $host, $dbname, $masterOnly );
	}

	if ( $masterOnly ) {
		# Make master-requested DB handles inherit any read-only mode setting
		$conn->setLBInfo( 'readOnlyReason', $this->getReadOnlyReason( $domain, $conn ) );
	}

	return $conn;
}
597
/**
 * Release one reference to a foreign-domain connection.
 *
 * Handles without 'serverIndex' or 'foreignPoolRefCount' LB info are ignored, as
 * are DBConnRef wrappers (which are logged as a caller error). When the reference
 * count drops to zero the handle moves from the "used" pool to the "free" pool.
 *
 * @param object $conn Connection handle previously obtained from this balancer
 * @throws InvalidArgumentException If the handle is not the tracked one for its server/domain
 */
public function reuseConnection( $conn ) {
	$serverIndex = $conn->getLBInfo( 'serverIndex' );
	$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
	if ( $serverIndex === null || $refCount === null ) {
		return;
	}

	if ( $conn instanceof DBConnRef ) {
		// DBConnRef already handles calling reuseConnection() and only passes the live
		// Database instance to this method. Any caller passing in a DBConnRef is broken.
		$trace = ( new RuntimeException() )->getTraceAsString();
		$this->connLogger->error( __METHOD__ . ": got DBConnRef instance.\n" . $trace );

		return;
	}

	if ( $this->disabled ) {
		return; // DBConnRef handle probably survived longer than the LoadBalancer
	}

	$domain = $conn->getDomainID();
	if ( !isset( $this->mConns['foreignUsed'][$serverIndex][$domain] ) ) {
		throw new InvalidArgumentException( __METHOD__ .
			": connection $serverIndex/$domain not found; it may have already been freed." );
	}
	if ( $this->mConns['foreignUsed'][$serverIndex][$domain] !== $conn ) {
		throw new InvalidArgumentException( __METHOD__ .
			": connection $serverIndex/$domain mismatched; it may have already been freed." );
	}

	$refCount--;
	$conn->setLBInfo( 'foreignPoolRefCount', $refCount );
	if ( $refCount > 0 ) {
		$this->connLogger->debug( __METHOD__ .
			": reference count for $serverIndex/$domain reduced to $refCount" );

		return;
	}

	// Last reference released: move the handle to the free pool
	$this->mConns['foreignFree'][$serverIndex][$domain] = $conn;
	unset( $this->mConns['foreignUsed'][$serverIndex][$domain] );
	if ( !$this->mConns['foreignUsed'][$serverIndex] ) {
		unset( $this->mConns[ 'foreignUsed' ][$serverIndex] ); // clean up
	}
	$this->connLogger->debug( __METHOD__ . ": freed connection $serverIndex/$domain" );
}
647
/**
 * Get a connection wrapped in a DBConnRef.
 *
 * The connection is obtained immediately via getConnection(); a $domain of false
 * is replaced by the local domain first.
 *
 * @param int $db Server index, or DB_MASTER/DB_REPLICA
 * @param array|string|bool $groups Query group(s)
 * @param string|bool $domain Domain ID, or false for the local domain
 * @return DBConnRef
 */
public function getConnectionRef( $db, $groups = [], $domain = false ) {
	if ( $domain === false ) {
		$domain = $this->localDomain;
	}
	$conn = $this->getConnection( $db, $groups, $domain );

	return new DBConnRef( $this, $conn );
}
653
/**
 * Get a DBConnRef built from connection parameters rather than a live handle.
 *
 * No connection is requested here; the DBConnRef receives the ( $db, $groups,
 * $domain ) triple, with a $domain of false replaced by the local domain.
 *
 * @param int $db Server index, or DB_MASTER/DB_REPLICA
 * @param array|string|bool $groups Query group(s)
 * @param string|bool $domain Domain ID, or false for the local domain
 * @return DBConnRef
 */
public function getLazyConnectionRef( $db, $groups = [], $domain = false ) {
	if ( $domain === false ) {
		$domain = $this->localDomain;
	}
	$connParams = [ $db, $groups, $domain ];

	return new DBConnRef( $this, $connParams );
}
659
/**
 * Open (or reuse) a connection to the server with the given index.
 *
 * Local-domain requests reuse the tracked local handle for the server if one
 * exists. Foreign-domain requests are delegated to openForeignConnection().
 * A handle that is not open is recorded as the error connection, and false is
 * returned instead.
 *
 * @param int $i Server index
 * @param string|bool $domain Domain ID, or false for the local domain
 * @return object|bool Connection handle, or false on failure
 * @throws InvalidArgumentException If no server is configured at index $i
 */
public function openConnection( $i, $domain = false ) {
	$wantsLocal = $this->localDomain->equals( $domain )
		|| $domain === $this->localDomainIdAlias;
	if ( $wantsLocal ) {
		$domain = false; // local connection requested
	}

	if ( $domain !== false ) {
		$handle = $this->openForeignConnection( $i, $domain );
	} elseif ( isset( $this->mConns['local'][$i][0] ) ) {
		$handle = $this->mConns['local'][$i][0];
	} else {
		if ( !isset( $this->mServers[$i] ) || !is_array( $this->mServers[$i] ) ) {
			throw new InvalidArgumentException( "No server with index '$i'." );
		}
		// Open a new connection
		$serverInfo = $this->mServers[$i];
		$serverInfo['serverIndex'] = $i;
		$handle = $this->reallyOpenConnection( $serverInfo, false );
		$serverName = $this->getServerName( $i );
		if ( !$handle->isOpen() ) {
			$this->connLogger->warning( "Failed to connect to database $i at '$serverName'." );
			$this->mErrorConnection = $handle;
			$handle = false;
		} else {
			$this->connLogger->debug( "Connected to database $i at '$serverName'." );
			$this->mConns['local'][$i][0] = $handle;
		}
	}

	if ( $handle && !$handle->isOpen() ) {
		// Connection was made but later unrecoverably lost for some reason.
		// Do not return a handle that will just throw exceptions on use,
		// but let the calling code (e.g. getReaderIndex) try another server.
		$this->mErrorConnection = $handle;
		$handle = false;
	}

	return $handle;
}
707
/**
 * Get a connection to a foreign domain on the given server and take a reference to it.
 *
 * In order of preference this reuses: a handle already in use for the same domain,
 * a free handle for the same domain, a free handle for another domain on the same
 * server (re-pointed at the requested DB and table prefix), or a new connection.
 * On success the handle's 'foreignPoolRefCount' is incremented, so each call must
 * be balanced by a reuseConnection() call.
 *
 * @param int $i Server index
 * @param string $domain Domain ID
 * @return object|bool Connection handle, or false on failure
 * @throws InvalidArgumentException If no server is configured at index $i
 */
728 private function openForeignConnection( $i, $domain ) {
729 $domainInstance = DatabaseDomain::newFromId( $domain );
730 $dbName = $domainInstance->getDatabase();
731 $prefix = $domainInstance->getTablePrefix();
732
733 if ( isset( $this->mConns['foreignUsed'][$i][$domain] ) ) {
734 // Reuse an already-used connection
735 $conn = $this->mConns['foreignUsed'][$i][$domain];
736 $this->connLogger->debug( __METHOD__ . ": reusing connection $i/$domain" );
737 } elseif ( isset( $this->mConns['foreignFree'][$i][$domain] ) ) {
738 // Reuse a free connection for the same domain
739 $conn = $this->mConns['foreignFree'][$i][$domain];
740 unset( $this->mConns['foreignFree'][$i][$domain] );
741 $this->mConns['foreignUsed'][$i][$domain] = $conn;
742 $this->connLogger->debug( __METHOD__ . ": reusing free connection $i/$domain" );
743 } elseif ( !empty( $this->mConns['foreignFree'][$i] ) ) {
744 // Reuse a connection from another domain
745 $conn = reset( $this->mConns['foreignFree'][$i] );
746 $oldDomain = key( $this->mConns['foreignFree'][$i] );
747 // The empty string as a DB name means "don't care".
748 // DatabaseMysqlBase::open() already handles this on connection.
749 if ( strlen( $dbName ) && !$conn->selectDB( $dbName ) ) {
750 $this->mLastError = "Error selecting database '$dbName' on server " .
751 $conn->getServer() . " from client host {$this->host}";
752 $this->mErrorConnection = $conn;
753 $conn = false;
754 } else {
// Re-key the handle under the new domain and move it to the "used" pool
755 $conn->tablePrefix( $prefix );
756 unset( $this->mConns['foreignFree'][$i][$oldDomain] );
757 $this->mConns['foreignUsed'][$i][$domain] = $conn;
758 $this->connLogger->debug( __METHOD__ .
759 ": reusing free connection from $oldDomain for $domain" );
760 }
761 } else {
762 if ( !isset( $this->mServers[$i] ) || !is_array( $this->mServers[$i] ) ) {
763 throw new InvalidArgumentException( "No server with index '$i'." );
764 }
765 // Open a new connection
766 $server = $this->mServers[$i];
767 $server['serverIndex'] = $i;
768 $server['foreignPoolRefCount'] = 0;
769 $server['foreign'] = true;
770 $conn = $this->reallyOpenConnection( $server, $dbName );
771 if ( !$conn->isOpen() ) {
772 $this->connLogger->warning( __METHOD__ . ": connection error for $i/$domain" );
773 $this->mErrorConnection = $conn;
774 $conn = false;
775 } else {
776 $conn->tablePrefix( $prefix );
777 $this->mConns['foreignUsed'][$i][$domain] = $conn;
778 $this->connLogger->debug( __METHOD__ . ": opened new connection for $i/$domain" );
779 }
780 }
781
782 // Increment reference count
783 if ( $conn ) {
784 $refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
785 $conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 );
786 }
787
788 return $conn;
789 }
790
/**
 * Check whether any connection is tracked for a server index.
 *
 * @param mixed $index Server index; anything other than an integer yields false
 * @return bool
 */
private function isOpen( $index ) {
	return is_integer( $index ) && (bool)$this->getAnyOpenConnection( $index );
}
805
/**
 * Create a database handle from a server config array.
 *
 * The config is augmented with the cluster master host name, caches, loggers,
 * profilers, CLI mode, agent and default flags before being passed to
 * Database::factory(). If the factory throws DBConnectionError, the handle carried
 * by the exception is used instead, so the returned handle may not be open.
 *
 * @param array $server Server config; must contain 'type' and 'serverIndex'
 * @param string|bool $dbNameOverride Database name to use instead of $server['dbname'], or false
 * @return object Database handle (callers check isOpen())
 * @throws DBAccessError If this balancer has been disabled
 */
817 protected function reallyOpenConnection( array $server, $dbNameOverride = false ) {
818 if ( $this->disabled ) {
819 throw new DBAccessError();
820 }
821
822 if ( $dbNameOverride !== false ) {
823 $server['dbname'] = $dbNameOverride;
824 }
825
826 // Let the handle know what the cluster master is (e.g. "db1052")
827 $masterName = $this->getServerName( $this->getWriterIndex() );
828 $server['clusterMasterHost'] = $masterName;
829
830 // Log when many connections are made on requests
831 if ( ++$this->connsOpened >= self::CONN_HELD_WARN_THRESHOLD ) {
832 $this->perfLogger->warning( __METHOD__ . ": " .
833 "{$this->connsOpened}+ connections made (master=$masterName)" );
834 }
835
836 $server['srvCache'] = $this->srvCache;
837 // Set loggers and profilers
838 $server['connLogger'] = $this->connLogger;
839 $server['queryLogger'] = $this->queryLogger;
840 $server['errorLogger'] = $this->errorLogger;
841 $server['profiler'] = $this->profiler;
842 $server['trxProfiler'] = $this->trxProfiler;
843 // Use the same agent and PHP mode for all DB handles
844 $server['cliMode'] = $this->cliMode;
845 $server['agent'] = $this->agent;
846 // Use DBO_DEFAULT flags by default for LoadBalancer managed databases. Assume that the
847 // application calls LoadBalancer::commitMasterChanges() before the PHP script completes.
848 $server['flags'] = isset( $server['flags'] ) ? $server['flags'] : IDatabase::DBO_DEFAULT;
849
850 // Create a live connection object
851 try {
852 $db = Database::factory( $server['type'], $server );
853 } catch ( DBConnectionError $e ) {
854 // FIXME: This is probably the ugliest thing I have ever done to
855 // PHP. I'm half-expecting it to segfault, just out of disgust. -- TS
856 $db = $e->db;
857 }
858
859 $db->setLBInfo( $server );
// Give the handle a lazy reference to the master for its own domain
860 $db->setLazyMasterHandle(
861 $this->getLazyConnectionRef( self::DB_MASTER, [], $db->getDomainID() )
862 );
863 $db->setTableAliases( $this->tableAliases );
864
// Master handles join any active transaction round and receive the recurring listeners
865 if ( $server['serverIndex'] === $this->getWriterIndex() ) {
866 if ( $this->trxRoundId !== false ) {
867 $this->applyTransactionRoundFlags( $db );
868 }
869 foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
870 $db->setTransactionListener( $name, $callback );
871 }
872 }
873
874 return $db;
875 }
876
/**
 * Log the most recent connection failure and raise it as an exception.
 *
 * The log context is passed to the logger so that the {last_error} and
 * {db_server} placeholders in the messages are filled in.
 *
 * @throws DBConnectionError Thrown directly when no failed handle was recorded;
 *  otherwise the failed handle's own reportConnectionError() is expected to throw
 */
private function reportConnectionError() {
	$conn = $this->mErrorConnection; // the connection which caused the error
	$context = [
		'method' => __METHOD__,
		'last_error' => $this->mLastError,
	];

	if ( !is_object( $conn ) ) {
		// No last connection, probably due to all servers being too busy
		$this->connLogger->error(
			"LB failure with no last connection. Connection error: {last_error}",
			$context
		);

		// If all servers were busy, mLastError will contain something sensible
		throw new DBConnectionError( null, $this->mLastError );
	}

	$context['db_server'] = $conn->getServer();
	$this->connLogger->warning(
		"Connection error: {last_error} ({db_server})",
		$context
	);

	// throws DBConnectionError
	$conn->reportConnectionError( "{$this->mLastError} ({$context['db_server']})" );
}
907
/**
 * Get the index of the server used for writes.
 *
 * @return int Always 0 in this implementation
 */
908 public function getWriterIndex() {
909 return 0;
910 }
911
/**
 * Check whether a server is configured at the given index.
 *
 * @param int $i Server index
 * @return bool
 */
912 public function haveIndex( $i ) {
913 return array_key_exists( $i, $this->mServers );
914 }
915
/**
 * Check whether a server is configured at the given index and has a generic
 * load other than zero.
 *
 * @param int $i Server index
 * @return bool
 */
public function isNonZeroLoad( $i ) {
	if ( !array_key_exists( $i, $this->mServers ) ) {
		return false;
	}

	return $this->mLoads[$i] != 0;
}
919
/**
 * Get the number of configured servers.
 *
 * @return int
 */
920 public function getServerCount() {
921 return count( $this->mServers );
922 }
923
/**
 * Get a display name for a server.
 *
 * The server's 'hostName' setting is preferred over 'host'; if neither is set,
 * or the chosen value is empty, 'localhost' is returned.
 *
 * @param int $i Server index
 * @return string
 */
public function getServerName( $i ) {
	$name = '';
	foreach ( [ 'hostName', 'host' ] as $field ) {
		if ( isset( $this->mServers[$i][$field] ) ) {
			$name = $this->mServers[$i][$field];
			break;
		}
	}

	if ( $name != '' ) {
		return $name;
	}

	return 'localhost';
}
935
/**
 * Get the configuration stored for a server.
 *
 * @param int $i Server index
 * @return array|bool Server config, or false if nothing is set at that index
 */
public function getServerInfo( $i ) {
	return isset( $this->mServers[$i] ) ? $this->mServers[$i] : false;
}
943
/**
 * Replace the configuration stored for a server.
 *
 * Only the server config array is updated; the load tables built in the
 * constructor are not touched by this method.
 *
 * @param int $i Server index
 * @param array $serverInfo New server config
 */
944 public function setServerInfo( $i, array $serverInfo ) {
945 $this->mServers[$i] = $serverInfo;
946 }
947
/**
 * Get a replication position from an already-tracked connection.
 *
 * A tracked master connection is asked for its master position. If there is none,
 * the first tracked replica connection (indexes 1 and up) is asked for its replica
 * position instead. No connection is opened by this method.
 *
 * @return object|bool Position object, or false if no connection is tracked
 */
public function getMasterPos() {
	$master = $this->getAnyOpenConnection( $this->getWriterIndex() );
	if ( $master ) {
		return $master->getMasterPos();
	}

	# If this entire request was served from a replica DB without opening a connection to
	# the master (however unlikely that may be), then we can fetch the position from the
	# replica DB.
	$total = count( $this->mServers );
	for ( $index = 1; $index < $total; $index++ ) {
		$replica = $this->getAnyOpenConnection( $index );
		if ( $replica ) {
			return $replica->getReplicaPos();
		}
	}

	return false;
}
966
/**
 * Close all connections and mark this balancer as disabled.
 *
 * Once disabled, reallyOpenConnection() throws DBAccessError and
 * reuseConnection() becomes a no-op.
 */
967 public function disable() {
968 $this->closeAll();
969 $this->disabled = true;
970 }
971
/**
 * Close every open connection and reset the connection pools and the
 * opened-connection counter.
 */
public function closeAll() {
	$closeHandle = function ( IDatabase $conn ) {
		$this->connLogger->debug( "Closing connection to database '{$conn->getServer()}'." );
		$conn->close();
	};
	$this->forEachOpenConnection( $closeHandle );

	$this->connsOpened = 0;
	$this->mConns = [
		'local' => [],
		'foreignFree' => [],
		'foreignUsed' => [],
	];
}
986
/**
 * Close a single connection and stop tracking it.
 *
 * The handle is removed from whichever pool holds it for its server index and the
 * opened-connection counter is decremented. The handle is closed even if it was
 * not tracked.
 *
 * @param IDatabase $conn Handle to close
 */
public function closeConnection( IDatabase $conn ) {
	$serverIndex = $conn->getLBInfo( 'serverIndex' ); // second index level of mConns
	foreach ( $this->mConns as $type => $connsByServer ) {
		if ( !isset( $connsByServer[$serverIndex] ) ) {
			continue;
		}

		foreach ( $connsByServer[$serverIndex] as $i => $trackedConn ) {
			if ( $conn === $trackedConn ) {
				// $i is the key inside the server's pool (0 or a domain ID), not a
				// server index, so the host name must be looked up by $serverIndex.
				$host = $this->getServerName( $serverIndex );
				$this->connLogger->debug(
					"Closing connection to database $serverIndex at '$host'." );
				unset( $this->mConns[$type][$serverIndex][$i] );
				--$this->connsOpened;
				break 2;
			}
		}
	}

	$conn->close();
}
1007
/**
 * Commit transactions on all open connections and end any transaction round.
 *
 * A failure on one connection is passed to the error logger and collected; the
 * remaining connections are still committed. If a round was active, master
 * handles have their transaction round flags undone.
 *
 * @param string $fname Caller name
 * @throws DBExpectedError If any commit failed
 */
public function commitAll( $fname = __METHOD__ ) {
	$hadRound = ( $this->trxRoundId !== false );
	$this->trxRoundId = false;

	$errors = [];
	$commitOne = function ( IDatabase $conn ) use ( $fname, $hadRound, &$errors ) {
		try {
			$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
		} catch ( DBError $e ) {
			call_user_func( $this->errorLogger, $e );
			$errors[] = $conn->getServer() . ': ' . $e->getMessage();
		}
		if ( $hadRound && $conn->getLBInfo( 'master' ) ) {
			$this->undoTransactionRoundFlags( $conn );
		}
	};
	$this->forEachOpenConnection( $commitOne );

	if ( $errors ) {
		$summary = implode( "\n", array_unique( $errors ) );
		throw new DBExpectedError( null, "Commit failed on server(s) " . $summary );
	}
}
1034
/**
 * Prepare all open master connections for the commit step of a transaction round.
 *
 * For each open master handle, transaction-end callback suppression is switched
 * off and then back on, so that post-commit callbacks stay deferred until every
 * DB has committed.
 */
1035 public function finalizeMasterChanges() {
1036 $this->forEachOpenMasterConnection( function ( Database $conn ) {
1037 // Any error should cause all DB transactions to be rolled back together
1038 $conn->setTrxEndCallbackSuppression( false );
// NOTE(review): this listing's numbering skips 1039, so a statement between these two
// calls appears to be missing — presumably the pre-commit callbacks run here. Verify
// against the canonical file.
1040 // Defer post-commit callbacks until COMMIT finishes for all DBs
1041 $conn->setTrxEndCallbackSuppression( true );
1042 } );
1043 }
1044
/**
 * Check that pending changes on all open master connections are safe to commit.
 *
 * The check fails when an explicit transaction is still active, when the estimated
 * write duration exceeds the 'maxWriteDuration' option, or when a connection with
 * pending writes or callbacks no longer responds to ping().
 *
 * @param array $options Supported keys:
 *  - maxWriteDuration: limit on time spent in writes; 0 or unset disables the check
 * @throws DBTransactionError
 * @throws DBTransactionSizeError
 */
public function approveMasterChanges( array $options ) {
	$limit = isset( $options['maxWriteDuration'] ) ? $options['maxWriteDuration'] : 0;
	$this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $limit ) {
		// If atomic sections or explicit transactions are still open, some caller must have
		// caught an exception but failed to properly rollback any changes. Detect that and
		// throw an error (causing rollback).
		if ( $conn->explicitTrxActive() ) {
			throw new DBTransactionError(
				$conn,
				"Explicit transaction still active. A caller may have caught an error."
			);
		}
		// Assert that the time to replicate the transaction will be sane.
		// If this fails, then all DB transactions will be rolled back together.
		$time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
		if ( $limit > 0 && $time > $limit ) {
			throw new DBTransactionSizeError(
				$conn,
				"Transaction spent $time second(s) in writes, exceeding the $limit limit.",
				[ $time, $limit ]
			);
		}
		// If a connection sits idle while slow queries execute on another, that connection
		// may end up dropped before the commit round is reached. Ping servers to detect this.
		if ( $conn->writesOrCallbacksPending() && !$conn->ping() ) {
			throw new DBTransactionError(
				$conn,
				"A connection to the {$conn->getDBname()} database was lost before commit."
			);
		}
	} );
}
1077
/**
 * Start a transaction round on all open master connections.
 *
 * Each open master handle has its snapshot flushed, with transaction-end callbacks
 * suppressed for the duration of the flush, and then has the transaction round
 * flags applied. Flush failures are passed to the error logger and collected, so
 * every handle is processed before an exception is thrown.
 *
 * @param string $fname Caller name; also used as the round ID
 * @throws DBTransactionError If a transaction round is already active
 * @throws DBExpectedError If any flush failed
 */
public function beginMasterChanges( $fname = __METHOD__ ) {
	if ( $this->trxRoundId !== false ) {
		throw new DBTransactionError(
			null,
			"$fname: Transaction round '{$this->trxRoundId}' already started."
		);
	}
	$this->trxRoundId = $fname;

	$failures = [];
	$this->forEachOpenMasterConnection(
		function ( Database $conn ) use ( $fname, &$failures ) {
			$conn->setTrxEndCallbackSuppression( true );
			try {
				$conn->flushSnapshot( $fname );
			} catch ( DBError $e ) {
				call_user_func( $this->errorLogger, $e );
				$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
			}
			$conn->setTrxEndCallbackSuppression( false );
			$this->applyTransactionRoundFlags( $conn );
		}
	);

	if ( $failures ) {
		throw new DBExpectedError(
			null,
			"$fname: Flush failed on server(s) " . implode( "\n", array_unique( $failures ) )
		);
	}
}
1109
/**
 * Commit pending changes on all open master connections and end the transaction round.
 *
 * Handles with pending writes or callbacks are committed. When a round was active,
 * other handles only have their snapshot flushed, and every handle has its
 * transaction round flags undone. Failures are passed to the error logger and
 * collected, so every handle is processed before an exception is thrown.
 *
 * @param string $fname Caller name
 * @throws DBExpectedError If any commit or flush failed
 */
public function commitMasterChanges( $fname = __METHOD__ ) {
	$failures = [];

	// Held only for its lifetime: the commit behaviour stays in effect until this
	// method returns and $scope is released.
	$scope = $this->getScopedPHPBehaviorForCommit(); // try to ignore client aborts

	$restore = ( $this->trxRoundId !== false );
	$this->trxRoundId = false;
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( $fname, $restore, &$failures ) {
			try {
				if ( $conn->writesOrCallbacksPending() ) {
					$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
				} elseif ( $restore ) {
					$conn->flushSnapshot( $fname );
				}
			} catch ( DBError $e ) {
				call_user_func( $this->errorLogger, $e );
				$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
			}
			if ( $restore ) {
				$this->undoTransactionRoundFlags( $conn );
			}
		}
	);

	if ( $failures ) {
		throw new DBExpectedError(
			null,
			"$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
		);
	}
}
1143
/**
 * Issue all pending post-COMMIT/ROLLBACK callbacks on open master connections.
 *
 * Callback suppression is lifted on every connection first. Connections that
 * still have writes/callbacks pending, or that still have a transaction open,
 * are skipped and left for a later commit round.
 *
 * @param mixed $type Trigger type forwarded to the per-connection callback runners
 * @return Exception|null The first exception thrown by any callback, if any
 */
public function runMasterPostTrxCallbacks( $type ) {
	// __METHOD__ would resolve to the closure name inside the callback below
	$fname = __METHOD__;
	$e = null; // first exception
	$this->forEachOpenMasterConnection(
		function ( Database $conn ) use ( $type, $fname, &$e ) {
			$conn->setTrxEndCallbackSuppression( false );
			if ( $conn->writesOrCallbacksPending() ) {
				// This happens if onTransactionIdle() callbacks leave callbacks on *another* DB
				// (which finished its callbacks already). Warn and recover in this case. Let the
				// callbacks run in the final commitMasterChanges() in LBFactory::shutdown().
				$this->queryLogger->error( $fname . ": found writes/callbacks pending." );
				return;
			} elseif ( $conn->trxLevel() ) {
				// This happens for single-DB setups where DB_REPLICA uses the master DB,
				// thus leaving an implicit read-only transaction open at this point. It
				// also happens if onTransactionIdle() callbacks leave implicit transactions
				// open on *other* DBs (which is slightly improper). Let these COMMIT on the
				// next call to commitMasterChanges(), possibly in LBFactory::shutdown().
				return;
			}
			try {
				$conn->runOnTransactionIdleCallbacks( $type );
			} catch ( Exception $ex ) {
				$e = $e ?: $ex;
			}
			try {
				$conn->runTransactionListenerCallbacks( $type );
			} catch ( Exception $ex ) {
				$e = $e ?: $ex;
			}
		}
	);

	return $e;
}
1176
/**
 * Issue ROLLBACK on open master connections that have writes or callbacks
 * pending, and end the current transaction round (if any).
 *
 * When a round was active, the transaction round flags are undone on every
 * open master connection, whether or not it was rolled back.
 *
 * @param string $fname Caller name
 */
public function rollbackMasterChanges( $fname = __METHOD__ ) {
	$restore = ( $this->trxRoundId !== false );
	$this->trxRoundId = false;
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( $fname, $restore ) {
			if ( $conn->writesOrCallbacksPending() ) {
				$conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
			}
			if ( $restore ) {
				$this->undoTransactionRoundFlags( $conn );
			}
		}
	);
}
1191
/**
 * Suppress post-COMMIT/ROLLBACK callbacks on all open master connections.
 */
public function suppressTransactionEndCallbacks() {
	$this->forEachOpenMasterConnection( function ( Database $conn ) {
		$conn->setTrxEndCallbackSuppression( true );
	} );
}
1197
/**
 * Turn on DBO_TRX for a connection that uses DBO_DEFAULT, remembering the
 * prior flag state so that it can be restored afterwards.
 *
 * @param IDatabase $conn
 */
private function applyTransactionRoundFlags( IDatabase $conn ) {
	if ( !$conn->getFlag( $conn::DBO_DEFAULT ) ) {
		// Config has explicitly requested DBO_TRX be either on or off by not
		// setting DBO_DEFAULT, so respect that. Forcing no transactions is useful
		// for things like blob stores (ExternalStore) which want auto-commit mode.
		return;
	}

	// DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
	// Force DBO_TRX even in CLI mode since a commit round is expected soon.
	$conn->setFlag( $conn::DBO_TRX, $conn::REMEMBER_PRIOR );
}
1211
/**
 * Restore the prior flag state of a connection that uses DBO_DEFAULT.
 *
 * @param IDatabase $conn
 */
private function undoTransactionRoundFlags( IDatabase $conn ) {
	$usesDefaultFlags = $conn->getFlag( $conn::DBO_DEFAULT );
	if ( !$usesDefaultFlags ) {
		return; // flags of such connections are never touched by a round
	}

	$conn->restoreFlags( $conn::RESTORE_PRIOR );
}
1220
/**
 * Flush the snapshot of every open replica DB connection.
 *
 * @param string $fname Caller name, passed on to IDatabase::flushSnapshot()
 */
public function flushReplicaSnapshots( $fname = __METHOD__ ) {
	// Pass the caller name through; __METHOD__ inside the closure would name the closure
	$this->forEachOpenReplicaConnection( function ( IDatabase $conn ) use ( $fname ) {
		$conn->flushSnapshot( $fname );
	} );
}
1226
/**
 * Check whether the writer (master) server index has an open connection.
 *
 * @return bool Result of isOpen() for the writer index
 */
public function hasMasterConnection() {
	$writerIndex = $this->getWriterIndex();

	return $this->isOpen( $writerIndex );
}
1230
/**
 * Check whether any open master connection has writes or callbacks pending.
 *
 * @return bool
 */
public function hasMasterChanges() {
	$anyPending = false;
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( &$anyPending ) {
			// Every connection is queried, even after a positive result
			if ( $conn->writesOrCallbacksPending() ) {
				$anyPending = true;
			}
		}
	);

	return $anyPending;
}
1239
/**
 * Get the greatest lastDoneWrites() value among the open master connections.
 *
 * @return mixed False if there are no open master connections; otherwise the
 *  maximum of false and the values reported by the connections
 */
public function lastMasterChangeTimestamp() {
	$latest = false;
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( &$latest ) {
			$doneAt = $conn->lastDoneWrites();
			$latest = max( $latest, $doneAt );
		}
	);

	return $latest;
}
1248
/**
 * Check whether there are pending master changes, or whether the last master
 * change happened less than $age seconds ago.
 *
 * @param float|null $age Seconds to look back; null means $this->mWaitTimeout
 * @return bool
 */
public function hasOrMadeRecentMasterChanges( $age = null ) {
	if ( $age === null ) {
		$age = $this->mWaitTimeout;
	}

	if ( $this->hasMasterChanges() ) {
		return true;
	}

	$lastChange = $this->lastMasterChangeTimestamp();

	return $lastChange > microtime( true ) - $age;
}
1255
/**
 * Collect the pending write callers of all open master connections.
 *
 * @return array Merged results of IDatabase::pendingWriteCallers()
 */
public function pendingMasterChangeCallers() {
	$callers = [];
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( &$callers ) {
			$connCallers = $conn->pendingWriteCallers();
			$callers = array_merge( $callers, $connCallers );
		}
	);

	return $callers;
}
1264
/**
 * Get the lagged replica mode flag, probing for a replica DB connection first
 * if the flag is not set yet and more than one server is defined.
 *
 * @param string|bool $domain Domain passed on to getConnection()
 * @return bool The value of $this->laggedReplicaMode after the probe
 */
public function getLaggedReplicaMode( $domain = false ) {
	// No-op if the flag is already set or there is only one DB (also avoids recursion)
	if ( $this->laggedReplicaMode || $this->getServerCount() <= 1 ) {
		return $this->laggedReplicaMode;
	}

	try {
		// See if laggedReplicaMode gets set
		$probe = $this->getConnection( self::DB_REPLICA, false, $domain );
		$this->reuseConnection( $probe );
	} catch ( DBConnectionError $e ) {
		// Avoid expensive re-connect attempts and failures
		$this->allReplicasDownMode = true;
		$this->laggedReplicaMode = true;
	}

	return $this->laggedReplicaMode;
}
1281
/**
 * Alias of getLaggedReplicaMode() under the older "slave" name.
 *
 * @param string|bool $domain
 * @return bool
 */
public function getLaggedSlaveMode( $domain = false ) {
	$lagged = $this->getLaggedReplicaMode( $domain );

	return $lagged;
}
1290
/**
 * Get the current lagged replica mode flag without probing any connection.
 *
 * @return bool
 */
public function laggedReplicaUsed() {
	$lagged = $this->laggedReplicaMode;

	return $lagged;
}
1294
/**
 * Alias of laggedReplicaUsed() under the older "slave" name.
 *
 * @return bool
 */
public function laggedSlaveUsed() {
	$lagged = $this->laggedReplicaUsed();

	return $lagged;
}
1303
/**
 * Get the reason the load balancer is read-only, if it is.
 *
 * Checked in order: an explicitly set read-only reason, lagged replica mode
 * (including all replicas being down), and the master itself being read-only.
 *
 * @param string|bool $domain Domain passed on to the lag and master checks
 * @param IDatabase|null $conn Master connection to use for the master check
 * @return string|bool Reason text, or false if not read-only
 */
public function getReadOnlyReason( $domain = false, IDatabase $conn = null ) {
	if ( $this->readOnlyReason !== false ) {
		return $this->readOnlyReason;
	}

	if ( $this->getLaggedReplicaMode( $domain ) ) {
		$suffix = $this->allReplicasDownMode
			? 'until the replica database servers become available'
			: 'while the replica database servers catch up to the master.';

		return 'The database has been automatically locked ' . $suffix;
	}

	if ( $this->masterRunningReadOnly( $domain, $conn ) ) {
		return 'The database master is running in read-only mode.';
	}

	return false;
}
1321
/**
 * Check whether the master server reports itself as read-only.
 *
 * The result is cached in the WAN cache under a key derived from the master
 * server name, for self::TTL_CACHE_READONLY seconds. The transaction profiler
 * is silenced while the check runs. A DBError during the check is treated as
 * "not read-only".
 *
 * @param string|bool $domain Domain passed on to getConnection()
 * @param IDatabase|null $conn Master connection to use instead of getting one
 * @return bool
 */
private function masterRunningReadOnly( $domain, IDatabase $conn = null ) {
	$cache = $this->wanCache;
	$masterServer = $this->getServerName( $this->getWriterIndex() );

	return (bool)$cache->getWithSetCallback(
		$cache->makeGlobalKey( __CLASS__, 'server-read-only', $masterServer ),
		self::TTL_CACHE_READONLY,
		function () use ( $domain, $conn ) {
			$old = $this->trxProfiler->setSilenced( true );
			try {
				$dbw = $conn ?: $this->getConnection( self::DB_MASTER, [], $domain );
				$readOnly = (int)$dbw->serverIsReadOnly();
				if ( !$conn ) {
					// Only hand back connections that were obtained here
					$this->reuseConnection( $dbw );
				}
			} catch ( DBError $e ) {
				$readOnly = 0;
			} catch ( Exception $e ) {
				// Do not leave the profiler silenced when an unexpected error escapes
				$this->trxProfiler->setSilenced( $old );
				throw $e;
			}
			$this->trxProfiler->setSilenced( $old );
			return $readOnly;
		},
		[ 'pcTTL' => $cache::TTL_PROC_LONG, 'busyValue' => 0 ]
	);
}
1351
/**
 * Get, and optionally first set, the "allow lagged" setting.
 *
 * @param bool|null $mode New value, or null to leave the setting unchanged
 * @return bool The value of $this->mAllowLagged after any update
 */
public function allowLagged( $mode = null ) {
	if ( $mode !== null ) {
		$this->mAllowLagged = $mode;
	}

	return $this->mAllowLagged;
}
1360
/**
 * Ping every open connection.
 *
 * @return bool True if no ping failed (also when there are no connections)
 */
public function pingAll() {
	$allAlive = true;
	$this->forEachOpenConnection(
		function ( IDatabase $conn ) use ( &$allAlive ) {
			// ping() comes first so that every connection is pinged regardless
			$allAlive = $conn->ping() && $allAlive;
		}
	);

	return $allAlive;
}
1371
/**
 * Call a function with each open connection object.
 *
 * @param callable $callback Called with the connection followed by $params
 * @param array $params Additional arguments for the callback
 */
public function forEachOpenConnection( $callback, array $params = [] ) {
	foreach ( $this->mConns as $pool ) {
		foreach ( $pool as $connsOfServer ) {
			foreach ( $connsOfServer as $db ) {
				call_user_func_array( $callback, array_merge( [ $db ], $params ) );
			}
		}
	}
}
1382
/**
 * Call a function with each open connection object to the master.
 *
 * @param callable $callback Called with the connection followed by $params
 * @param array $params Additional arguments for the callback
 */
public function forEachOpenMasterConnection( $callback, array $params = [] ) {
	$writerIndex = $this->getWriterIndex();
	foreach ( $this->mConns as $pool ) {
		if ( !isset( $pool[$writerIndex] ) ) {
			continue; // no master connections in this pool
		}
		/** @var IDatabase $db */
		foreach ( $pool[$writerIndex] as $db ) {
			call_user_func_array( $callback, array_merge( [ $db ], $params ) );
		}
	}
}
1395
/**
 * Call a function with each open replica DB connection object.
 *
 * @param callable $callback Called with the connection followed by $params
 * @param array $params Additional arguments for the callback
 */
public function forEachOpenReplicaConnection( $callback, array $params = [] ) {
	// Looked up once rather than for every server of every pool
	$masterIndex = $this->getWriterIndex();
	foreach ( $this->mConns as $connsByServer ) {
		foreach ( $connsByServer as $i => $serverConns ) {
			if ( $i === $masterIndex ) {
				continue; // skip master
			}
			foreach ( $serverConns as $conn ) {
				$mergedParams = array_merge( [ $conn ], $params );
				call_user_func_array( $callback, $mergedParams );
			}
		}
	}
}
1409
/**
 * Get the host name, lag time and index of the most-lagged server among those
 * with a load greater than zero.
 *
 * @param string|bool $domain Domain passed on to getLagTimes()
 * @return array ( host, max lag, server index ); ( '', -1, 0 ) if there is at
 *  most one server or no server qualifies
 */
public function getMaxLag( $domain = false ) {
	$worstHost = '';
	$worstLag = -1;
	$worstIndex = 0;

	// No replication = no lag
	if ( $this->getServerCount() > 1 ) {
		foreach ( $this->getLagTimes( $domain ) as $index => $lag ) {
			if ( $this->mLoads[$index] > 0 && $lag > $worstLag ) {
				$worstHost = $this->mServers[$index]['host'];
				$worstLag = $lag;
				$worstIndex = $index;
			}
		}
	}

	return [ $worstHost, $worstLag, $worstIndex ];
}
1430
/**
 * Get the replication lag for each server.
 *
 * Servers flagged as 'is static' are reported with a lag of 0; the others are
 * looked up through the load monitor.
 *
 * @param string|bool $domain Domain passed on to the load monitor
 * @return array Map of (server index => lag)
 */
public function getLagTimes( $domain = false ) {
	if ( $this->getServerCount() <= 1 ) {
		return [ $this->getWriterIndex() => 0 ]; // no replication = no lag
	}

	$staticLagTimes = []; // map of (server index => 0 seconds)
	$replicatingIndexes = [];
	foreach ( $this->mServers as $index => $info ) {
		if ( !empty( $info['is static'] ) ) {
			// DB server is a non-replicating and read-only archive
			$staticLagTimes[$index] = 0;
		} else {
			// DB server might have replication lag
			$replicatingIndexes[] = $index;
		}
	}

	$measured = $this->getLoadMonitor()->getLagTimes( $replicatingIndexes, $domain );

	return $measured + $staticLagTimes;
}
1448
/**
 * Get the lag of a connection, or zero if at most one server is defined.
 *
 * @param IDatabase $conn
 * @return mixed 0, or whatever $conn->getLag() returns
 */
public function safeGetLag( IDatabase $conn ) {
	$hasReplication = $this->getServerCount() > 1;

	return $hasReplication ? $conn->getLag() : 0;
}
1456
/**
 * Wait for a replica DB connection to reach a given master position.
 *
 * Returns true right away when at most one server is defined or when $conn
 * is not flagged as a replica. If no position is given, the current master
 * position is fetched, opening (and then closing) a master connection when
 * none is open.
 *
 * @param IDatabase $conn Replica DB connection that should catch up
 * @param DBMasterPos|bool $pos Master position, or false to use the current one
 * @param int $timeout Timeout passed on to IDatabase::masterPosWait()
 * @return bool False on timeout or if no master position could be obtained
 */
public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = 10 ) {
	if ( $this->getServerCount() <= 1 || !$conn->getLBInfo( 'replica' ) ) {
		return true; // server is not a replica DB
	}

	if ( !$pos ) {
		// Get the current master position, opening a connection if needed
		$masterConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
		if ( $masterConn ) {
			$pos = $masterConn->getMasterPos();
		} else {
			$masterConn = $this->openConnection( $this->getWriterIndex(), self::DOMAIN_ANY );
			// NOTE(review): openConnection() is understood to return false when the
			// connection attempt fails; in that case $pos stays unset and the
			// "could not get master pos" branch below reports the problem.
			if ( $masterConn ) {
				$pos = $masterConn->getMasterPos();
				$this->closeConnection( $masterConn );
			}
		}
	}

	if ( $pos instanceof DBMasterPos ) {
		$result = $conn->masterPosWait( $pos, $timeout );
		if ( $result == -1 || is_null( $result ) ) {
			$msg = __METHOD__ . ": Timed out waiting on {$conn->getServer()} pos {$pos}";
			$this->replLogger->warning( "$msg" );
			$ok = false;
		} else {
			$this->replLogger->info( __METHOD__ . ": Done" );
			$ok = true;
		}
	} else {
		$ok = false; // something is misconfigured
		$this->replLogger->error( "Could not get master pos for {$conn->getServer()}." );
	}

	return $ok;
}
1496
/**
 * Set or clear a named transaction listener callback.
 *
 * The callback is recorded in $this->trxRecurringCallbacks (or removed from
 * it when $callback is null) and the change is applied to every master
 * connection that is currently open.
 *
 * @param string $name Callback name
 * @param callable|null $callback Listener, or null to remove the one under $name
 */
public function setTransactionListener( $name, callable $callback = null ) {
	if ( $callback ) {
		$this->trxRecurringCallbacks[$name] = $callback;
	} else {
		unset( $this->trxRecurringCallbacks[$name] );
	}
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( $name, $callback ) {
			$conn->setTransactionListener( $name, $callback );
		}
	);
}
1509
/**
 * Replace the table alias map held by this load balancer.
 *
 * @param array $aliases New alias map; stored as-is in $this->tableAliases
 */
public function setTableAliases( array $aliases ) {
	$this->tableAliases = $aliases;
}
1513
/**
 * Set a new table prefix for the existing local domain.
 *
 * The local domain is rebuilt from the current database name and the new
 * prefix, the "<db>-<prefix>" alias ID is refreshed to match, and the prefix
 * is applied to every open connection.
 *
 * @param string $prefix New table prefix
 * @throws DBUnexpectedError If foreign domain connections are still in use
 */
public function setDomainPrefix( $prefix ) {
	if ( $this->mConns['foreignUsed'] ) {
		// Do not switch connections to explicit foreign domains unless marked as free
		$domains = [];
		foreach ( $this->mConns['foreignUsed'] as $connsByDomain ) {
			$domains = array_merge( $domains, array_keys( $connsByDomain ) );
		}
		$domains = implode( ', ', $domains );
		throw new DBUnexpectedError( null,
			"Foreign domain connections are still in use ($domains)." );
	}

	$this->localDomain = new DatabaseDomain(
		$this->localDomain->getDatabase(),
		null,
		$prefix
	);
	// Keep the alias ID in sync with the new prefix, the same way the
	// constructor derives it; otherwise it would still name the old prefix.
	if ( $this->localDomain->getTablePrefix() != '' ) {
		$this->localDomainIdAlias =
			$this->localDomain->getDatabase() . '-' . $this->localDomain->getTablePrefix();
	} else {
		$this->localDomainIdAlias = $this->localDomain->getDatabase();
	}

	$this->forEachOpenConnection( function ( IDatabase $db ) use ( $prefix ) {
		$db->tablePrefix( $prefix );
	} );
}
1536
/**
 * Make PHP ignore user aborts until the returned object leaves scope.
 *
 * Nothing is changed when running under the CLI SAPI.
 *
 * @return ScopedCallback|null Callback object that restores the prior
 *  ignore_user_abort() setting, or null under the CLI SAPI
 */
final protected function getScopedPHPBehaviorForCommit() {
	if ( PHP_SAPI == 'cli' ) {
		return null; // http://bugs.php.net/bug.php?id=47540
	}

	// Avoid half-finished operations
	$priorSetting = ignore_user_abort( true );
	$restorer = function () use ( $priorSetting ) {
		ignore_user_abort( $priorSetting );
	};

	return new ScopedCallback( $restorer );
}
1553
/**
 * Disable this load balancer when the object is destroyed.
 */
public function __destruct() {
	// Avoid connection leaks for sanity
	$this->disable();
}
1558}
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
if(!defined( 'MEDIAWIKI')) $fname
This file is not a valid entry point, perform no further processing unless MEDIAWIKI is defined.
Definition Setup.php:36
if($IP===false)
Definition WebStart.php:59
static pickRandom( $weights)
Given an array of non-normalised probabilities, this function will select an element and return the a...
interface is intended to be more or less compatible with the PHP memcached client.
Definition BagOStuff.php:47
Exception class for attempted DB access.
Helper class to handle automatically marking connections as reusable (via RAII pattern) as well handl...
Definition DBConnRef.php:10
Database error base class.
Definition DBError.php:26
Base class for the more common types of database errors.
Class to handle database/prefix specification for IDatabase domains.
static newFromId( $domain)
static newUnspecified()
Relational database abstraction object.
Definition Database.php:36
flushSnapshot( $fname=__METHOD__)
Commit any transaction but error out if writes or callbacks are pending.
writesOrCallbacksPending()
Returns true if there is a transaction open with possible write queries or transaction pre-commit/idl...
Definition Database.php:526
runOnTransactionPreCommitCallbacks()
Actually run and consume any "on transaction pre-commit" callbacks.
setTrxEndCallbackSuppression( $suppress)
Whether to disable running of post-COMMIT/ROLLBACK callbacks.
trxLevel()
Gets the current transaction level.
Definition Database.php:440
runTransactionListenerCallbacks( $trigger)
Actually run any "transaction listener" callbacks.
A BagOStuff object with no objects in it.
Database connection, tracking, load balancing, and transaction manager for a cluster.
isNonZeroLoad( $i)
Returns true if the specified index is valid and has non-zero load.
integer $connsOpened
Total connections opened.
pendingMasterChangeCallers()
Get the list of callers that have pending master changes.
getRandomNonLagged(array $loads, $domain=false, $maxLag=INF)
undoTransactionRoundFlags(IDatabase $conn)
getConnection( $i, $groups=[], $domain=false)
bool IDatabase $mErrorConnection
Database connection that caused a problem.
string bool $trxRoundId
String if a requested DBO_TRX transaction round is active.
integer $mWaitTimeout
Seconds to spend waiting on replica DB lag to resolve.
doWait( $index, $open=false, $timeout=null)
Wait for a given replica DB to catch up to the master pos stored in $this.
getMaxLag( $domain=false)
Get the hostname and lag time of the most-lagged replica DB.
isOpen( $index)
Test if the specified index represents an open connection.
float[] $mLoads
Map of (server index => weight)
getLagTimes( $domain=false)
Get an estimate of replication lag (in seconds) for each server.
safeWaitForMasterPos(IDatabase $conn, $pos=false, $timeout=10)
masterRunningReadOnly( $domain, IDatabase $conn=null)
array[] $mServers
Map of (server index => server config array)
openConnection( $i, $domain=false)
waitForOne( $pos, $timeout=null)
Set the master wait position and wait for a "generic" replica DB to catch up to it.
boolean $disabled
string $agent
Agent name for query profiling.
approveMasterChanges(array $options)
Perform all pre-commit checks for things like replication safety.
commitAll( $fname=__METHOD__)
Commit transactions on all open connections.
setDomainPrefix( $prefix)
Set a new table prefix for the existing local domain ID for testing.
getLaggedReplicaMode( $domain=false)
forEachOpenReplicaConnection( $callback, array $params=[])
Call a function with each open replica DB connection object.
allowLagged( $mode=null)
Disables/enables lag checks.
array[] $mGroupLoads
Map of (group => server index => weight)
getServerInfo( $i)
Return the server info structure for a given index, or false if the index is invalid.
getLaggedSlaveMode( $domain=false)
LoggerInterface $perfLogger
closeAll()
Close all open connections.
getReadOnlyReason( $domain=false, IDatabase $conn=null)
LoggerInterface $queryLogger
array[] $trxRecurringCallbacks
Map of (name => callable)
lastMasterChangeTimestamp()
Get the timestamp of the latest write query done by this thread.
WANObjectCache $wanCache
array $loadMonitorConfig
The LoadMonitor configuration.
disable()
Disable this load balancer.
forEachOpenConnection( $callback, array $params=[])
Call a function with each open connection object.
hasMasterChanges()
Determine if there are pending changes in a transaction by this thread.
closeConnection(IDatabase $conn)
Close a connection.
getServerCount()
Get the number of defined servers (not the number of open connections)
IDatabase[][] $mConns
Map of (local/foreignUsed/foreignFree => server index => IDatabase array)
DatabaseDomain $localDomain
Local Domain ID and default for selectDB() calls.
finalizeMasterChanges()
Perform all pre-commit callbacks that remain part of the atomic transactions and disable any post-com...
ILoadMonitor $loadMonitor
rollbackMasterChanges( $fname=__METHOD__)
Issue ROLLBACK only on master, only if queries were done on connection.
__construct(array $params)
Construct a manager of IDatabase connection objects.
getServerName( $i)
Get the host name or IP address of the server with the specified index Prefer a readable name if avai...
runMasterPostTrxCallbacks( $type)
Issue all pending post-COMMIT/ROLLBACK callbacks.
BagOStuff $memCache
getAnyOpenConnection( $i)
callable $errorLogger
Exception logger.
string $localDomainIdAlias
Alternate ID string for the domain instead of DatabaseDomain::getId()
bool DBMasterPos $mWaitForPos
False if not set.
bool $allReplicasDownMode
Whether connecting to a replica DB failed, so that all replica DBs are treated as down.
setServerInfo( $i, array $serverInfo)
Sets the server info structure for the given index.
string $host
Current server name.
openForeignConnection( $i, $domain)
Open a connection to a foreign DB, or return one if it is already open.
getLoadMonitor()
Get a LoadMonitor instance.
waitForAll( $pos, $timeout=null)
Set the master wait position and wait for ALL replica DBs to catch up to it.
flushReplicaSnapshots( $fname=__METHOD__)
Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshot.
applyTransactionRoundFlags(IDatabase $conn)
bool $cliMode
Whether this PHP instance is for a CLI script.
hasOrMadeRecentMasterChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
setTableAliases(array $aliases)
Make certain table names use their own database, schema, and table prefix when passed into SQL querie...
reallyOpenConnection(array $server, $dbNameOverride=false)
Really opens a connection.
setTransactionListener( $name, callable $callback=null)
Set a callback via IDatabase::setTransactionListener() on all current and future master connections o...
string bool $readOnlyReason
Reason the LB is read-only or false if not.
getScopedPHPBehaviorForCommit()
Make PHP ignore user aborts/disconnects until the returned value leaves scope.
string $mLastError
The last DB selection or connection error.
TransactionProfiler $trxProfiler
bool $laggedReplicaMode
Whether the generic reader fell back to a lagged replica DB.
beginMasterChanges( $fname=__METHOD__)
Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
getMasterPos()
Get the current master position for chronology control purposes.
getReaderIndex( $group=false, $domain=false)
Get the index of the reader connection, which may be a replica DB This takes into account load ratios...
suppressTransactionEndCallbacks()
Suppress all pending post-COMMIT/ROLLBACK callbacks.
BagOStuff $srvCache
haveIndex( $i)
Returns true if the specified index is a valid server index.
reuseConnection( $conn)
Mark a foreign connection as being available for reuse under a different DB name or prefix.
safeGetLag(IDatabase $conn)
Get the lag in seconds for a given connection, or zero if this load balancer does not have replicatio...
integer $mReadIndex
The generic (not query grouped) replica DB index (of $mServers)
commitMasterChanges( $fname=__METHOD__)
Issue COMMIT on all master connections where writes were done.
object string $profiler
Class name or object With profileIn/profileOut methods.
forEachOpenMasterConnection( $callback, array $params=[])
Call a function with each open connection object to a master.
bool $mAllowLagged
Whether to disregard replica DB lag as a factor in replica DB selection.
getConnectionRef( $db, $groups=[], $domain=false)
Get a database connection handle reference.
LoggerInterface $replLogger
LoggerInterface $connLogger
getLazyConnectionRef( $db, $groups=[], $domain=false)
Get a database connection handle reference without connecting yet.
Helper class that detects high-contention DB queries via profiling calls.
Multi-datacenter aware caching interface.
when a variable name is used in a function
Definition design.txt:94
design txt This is a brief overview of the new design More thorough and up to date information is available on the documentation wiki at etc Handles the details of getting and saving to the user table of the and dealing with sessions and cookies OutputPage Encapsulates the entire HTML page that will be sent in response to any server request It is used by calling its functions to add in any and then calling but I prefer the flexibility This should also do the output encoding The system allocates a global one in $wgOut Title Represents the title of an and does all the work of translating among various forms such as plain database key
Definition design.txt:26
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
the array() calling protocol came about after MediaWiki 1.4rc1.
namespace are movable Hooks may change this value to override the return value of MWNamespace::isMovable(). 'NewDifferenceEngine' do that in ParserLimitReportFormat instead use this to modify the parameters of the image and a DIV can begin in one section and end in another Make sure your code can handle that case gracefully See the EditSectionClearerLink extension for an example zero but section is usually empty its values are the globals values before the output is cached one of or reset my talk my contributions etc etc otherwise the built in rate limiting checks are if enabled allows for interception of redirect as a string mapping parameter names to values & $type
Definition hooks.txt:2568
see documentation in includes Linker php for Linker::makeImageLink & $time
Definition hooks.txt:1752
The index of the header message $result[1]=The index of the body text message $result[2 through n]=Parameters passed to body text message. Please note the header message cannot receive/use parameters. 'ImportHandleLogItemXMLTag':When parsing a XML tag in a log item. Return false to stop further processing of the tag $reader:XMLReader object $logInfo:Array of information 'ImportHandlePageXMLTag':When parsing a XML tag in a page. Return false to stop further processing of the tag $reader:XMLReader object & $pageInfo:Array of information 'ImportHandleRevisionXMLTag':When parsing a XML tag in a page revision. Return false to stop further processing of the tag $reader:XMLReader object $pageInfo:Array of page information $revisionInfo:Array of revision information 'ImportHandleToplevelXMLTag':When parsing a top level XML tag. Return false to stop further processing of the tag $reader:XMLReader object 'ImportHandleUploadXMLTag':When parsing a XML tag in a file upload. Return false to stop further processing of the tag $reader:XMLReader object $revisionInfo:Array of information 'ImportLogInterwikiLink':Hook to change the interwiki link used in log entries and edit summaries for transwiki imports. & $fullInterwikiPrefix:Interwiki prefix, may contain colons. & $pageTitle:String that contains page title. 'ImportSources':Called when reading from the $wgImportSources configuration variable. Can be used to lazy-load the import sources list. & $importSources:The value of $wgImportSources. Modify as necessary. See the comment in DefaultSettings.php for the detail of how to structure this array. 'InfoAction':When building information to display on the action=info page. $context:IContextSource object & $pageInfo:Array of information 'InitializeArticleMaybeRedirect':MediaWiki check to see if title is a redirect. 
& $title:Title object for the current page & $request:WebRequest & $ignoreRedirect:boolean to skip redirect check & $target:Title/string of redirect target & $article:Article object 'InternalParseBeforeLinks':during Parser 's internalParse method before links but after nowiki/noinclude/includeonly/onlyinclude and other processings. & $parser:Parser object & $text:string containing partially parsed text & $stripState:Parser 's internal StripState object 'InternalParseBeforeSanitize':during Parser 's internalParse method just before the parser removes unwanted/dangerous HTML tags and after nowiki/noinclude/includeonly/onlyinclude and other processings. Ideal for syntax-extensions after template/parser function execution which respect nowiki and HTML-comments. & $parser:Parser object & $text:string containing partially parsed text & $stripState:Parser 's internal StripState object 'InterwikiLoadPrefix':When resolving if a given prefix is an interwiki or not. Return true without providing an interwiki to continue interwiki search. $prefix:interwiki prefix we are looking for. & $iwData:output array describing the interwiki with keys iw_url, iw_local, iw_trans and optionally iw_api and iw_wikiid. 'InvalidateEmailComplete':Called after a user 's email has been invalidated successfully. $user:user(object) whose email is being invalidated 'IRCLineURL':When constructing the URL to use in an IRC notification. Callee may modify $url and $query, URL will be constructed as $url . 
$query & $url:URL to index.php & $query:Query string $rc:RecentChange object that triggered url generation 'IsFileCacheable':Override the result of Article::isFileCacheable()(if true) & $article:article(object) being checked 'IsTrustedProxy':Override the result of IP::isTrustedProxy() & $ip:IP being check & $result:Change this value to override the result of IP::isTrustedProxy() 'IsUploadAllowedFromUrl':Override the result of UploadFromUrl::isAllowedUrl() $url:URL used to upload from & $allowed:Boolean indicating if uploading is allowed for given URL 'isValidEmailAddr':Override the result of Sanitizer::validateEmail(), for instance to return false if the domain name doesn 't match your organization. $addr:The e-mail address entered by the user & $result:Set this and return false to override the internal checks 'isValidPassword':Override the result of User::isValidPassword() $password:The password entered by the user & $result:Set this and return false to override the internal checks $user:User the password is being validated for 'Language::getMessagesFileName':$code:The language code or the language we 're looking for a messages file for & $file:The messages file path, you can override this to change the location. 'LanguageGetMagic':DEPRECATED! Use $magicWords in a file listed in $wgExtensionMessagesFiles instead. Use this to define synonyms of magic words depending of the language & $magicExtensions:associative array of magic words synonyms $lang:language code(string) 'LanguageGetNamespaces':Provide custom ordering for namespaces or remove namespaces. Do not use this hook to add namespaces. Use CanonicalNamespaces for that. & $namespaces:Array of namespaces indexed by their numbers 'LanguageGetSpecialPageAliases':DEPRECATED! Use $specialPageAliases in a file listed in $wgExtensionMessagesFiles instead. 
Use to define aliases of special pages names depending of the language & $specialPageAliases:associative array of magic words synonyms $lang:language code(string) 'LanguageGetTranslatedLanguageNames':Provide translated language names. & $names:array of language code=> language name $code:language of the preferred translations 'LanguageLinks':Manipulate a page 's language links. This is called in various places to allow extensions to define the effective language links for a page. $title:The page 's Title. & $links:Associative array mapping language codes to prefixed links of the form "language:title". & $linkFlags:Associative array mapping prefixed links to arrays of flags. Currently unused, but planned to provide support for marking individual language links in the UI, e.g. for featured articles. 'LanguageSelector':Hook to change the language selector available on a page. $out:The output page. $cssClassName:CSS class name of the language selector. 'LinkBegin':DEPRECATED! Use HtmlPageLinkRendererBegin instead. Used when generating internal and interwiki links in Linker::link(), before processing starts. Return false to skip default processing and return $ret. See documentation for Linker::link() for details on the expected meanings of parameters. $skin:the Skin object $target:the Title that the link is pointing to & $html:the contents that the< a > tag should have(raw HTML) $result
Definition hooks.txt:1937
this hook is for auditing only RecentChangesLinked and Watchlist RecentChangesLinked and Watchlist e g Watchlist removed from all revisions and log entries to which it was applied This gives extensions a chance to take it off their books as the deletion has already been partly carried out by this point or something similar the user will be unable to create the tag set and then return false from the hook function Ensure you consume the ChangeTagAfterDelete hook to carry out custom deletion actions as context called by AbstractContent::getParserOutput May be used to override the normal model specific rendering of page content as context as context $options
Definition hooks.txt:1096
this hook is for auditing only RecentChangesLinked and Watchlist RecentChangesLinked and Watchlist e g Watchlist removed from all revisions and log entries to which it was applied This gives extensions a chance to take it off their books as the deletion has already been partly carried out by this point or something similar the user will be unable to create the tag set and then return false from the hook function Ensure you consume the ChangeTagAfterDelete hook to carry out custom deletion actions as context called by AbstractContent::getParserOutput May be used to override the normal model specific rendering of page content as context as context the output can only depend on parameters provided to this hook not on global state indicating whether full HTML should be generated If generation of HTML may be but other information should still be present in the ParserOutput object to manipulate or replace but no entry for that model exists in $wgContentHandlers if desired whether it is OK to use $contentModel on $title Handler functions that modify $ok should generally return false to prevent further hooks from further modifying $ok inclusive $limit
Definition hooks.txt:1135
Allows to change the fields on the form that will be generated $name
Definition hooks.txt:304
processing should stop and the error should be shown to the user * false
Definition hooks.txt:189
returning false will NOT prevent logging $e
Definition hooks.txt:2110
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition injection.txt:37
An object representing a master or replica DB position in a replicated setup.
Basic database interface for live and lazy-loaded relational database handles.
Definition IDatabase.php:34
getLBInfo( $name=null)
Get properties passed down from the server info array of the load balancer.
explicitTrxActive()
close()
Closes a database connection.
ping(&$rtt=null)
Ping the server and try to reconnect if there is no connection.
flushSnapshot( $fname=__METHOD__)
Commit any transaction but error out if writes or callbacks are pending.
restoreFlags( $state=self::RESTORE_PRIOR)
Restore the flags to their prior state before the last setFlag/clearFlag call.
pendingWriteQueryDuration( $type=self::ESTIMATE_TOTAL)
Get the time spent running write queries for this transaction.
pendingWriteCallers()
Get the list of method names that did write queries for this transaction.
writesOrCallbacksPending()
Returns true if there is a transaction open with possible write queries or transaction pre-commit/idl...
getServer()
Get the server hostname or IP address.
setTransactionListener( $name, callable $callback=null)
Run a callback each time any transaction commits or rolls back.
lastDoneWrites()
Returns the last time the connection may have been used for write queries.
tablePrefix( $prefix=null)
Get/set the table prefix.
getLag()
Get replica DB lag.
getFlag( $flag)
Returns a boolean whether the flag $flag is set for this connection.
setFlag( $flag, $remember=self::REMEMBER_NOTHING)
Set a flag for this connection.
commit( $fname=__METHOD__, $flush='')
Commits a transaction previously started using begin().
masterPosWait(DBMasterPos $pos, $timeout)
Wait for the replica DB to catch up to a given master position.
Database cluster connection, tracking, load balancing, and transaction manager interface.
An interface for database load monitoring.
$context
Definition load.php:50
$cache
Definition mcc.php:33
$params