MediaWiki REL1_30
LoadBalancer.php
Go to the documentation of this file.
1<?php
23namespace Wikimedia\Rdbms;
24
25use Psr\Log\LoggerInterface;
26use Psr\Log\NullLogger;
27use Wikimedia\ScopedCallback;
28use BagOStuff;
31use ArrayUtils;
32use InvalidArgumentException;
33use RuntimeException;
34use Exception;
35
41class LoadBalancer implements ILoadBalancer {
43 private $mServers;
45 private $mConns;
47 private $mLoads;
49 private $mGroupLoads;
57 private $tableAliases = [];
58
60 private $loadMonitor;
62 private $chronProt;
64 private $srvCache;
66 private $wanCache;
68 protected $profiler;
70 protected $trxProfiler;
72 protected $replLogger;
74 protected $connLogger;
76 protected $queryLogger;
78 protected $perfLogger;
79
83 private $mReadIndex;
85 private $mWaitForPos;
87 private $laggedReplicaMode = false;
89 private $allReplicasDownMode = false;
91 private $mLastError = 'Unknown error';
93 private $readOnlyReason = false;
95 private $connsOpened = 0;
97 private $trxRoundId = false;
105 private $host;
107 protected $cliMode;
109 protected $agent;
110
113
115 private $disabled = false;
117 private $chronProtInitialized = false;
118
120 const CONN_HELD_WARN_THRESHOLD = 10;
121
123 const MAX_LAG_DEFAULT = 10;
125 const TTL_CACHE_READONLY = 5;
126
127 const KEY_LOCAL = 'local';
128 const KEY_FOREIGN_FREE = 'foreignFree';
129 const KEY_FOREIGN_INUSE = 'foreignInUse';
130
131 const KEY_LOCAL_NOROUND = 'localAutoCommit';
132 const KEY_FOREIGN_FREE_NOROUND = 'foreignFreeAutoCommit';
133 const KEY_FOREIGN_INUSE_NOROUND = 'foreignInUseAutoCommit';
134
135 public function __construct( array $params ) {
136 if ( !isset( $params['servers'] ) ) {
137 throw new InvalidArgumentException( __CLASS__ . ': missing servers parameter' );
138 }
139 $this->mServers = $params['servers'];
140 foreach ( $this->mServers as $i => $server ) {
141 if ( $i == 0 ) {
142 $this->mServers[$i]['master'] = true;
143 } else {
144 $this->mServers[$i]['replica'] = true;
145 }
146 }
147
148 $this->localDomain = isset( $params['localDomain'] )
149 ? DatabaseDomain::newFromId( $params['localDomain'] )
151 // In case a caller assumes that the domain ID is simply <db>-<prefix>, which is almost
152 // always true, gracefully handle the case when they fail to account for escaping.
153 if ( $this->localDomain->getTablePrefix() != '' ) {
154 $this->localDomainIdAlias =
155 $this->localDomain->getDatabase() . '-' . $this->localDomain->getTablePrefix();
156 } else {
157 $this->localDomainIdAlias = $this->localDomain->getDatabase();
158 }
159
160 $this->mWaitTimeout = isset( $params['waitTimeout'] ) ? $params['waitTimeout'] : 10;
161
162 $this->mReadIndex = -1;
163 $this->mConns = [
164 // Connection were transaction rounds may be applied
165 self::KEY_LOCAL => [],
166 self::KEY_FOREIGN_INUSE => [],
167 self::KEY_FOREIGN_FREE => [],
168 // Auto-committing counterpart connections that ignore transaction rounds
169 self::KEY_LOCAL_NOROUND => [],
170 self::KEY_FOREIGN_INUSE_NOROUND => [],
171 self::KEY_FOREIGN_FREE_NOROUND => []
172 ];
173 $this->mLoads = [];
174 $this->mWaitForPos = false;
175 $this->mAllowLagged = false;
176
177 if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
178 $this->readOnlyReason = $params['readOnlyReason'];
179 }
180
181 if ( isset( $params['loadMonitor'] ) ) {
182 $this->loadMonitorConfig = $params['loadMonitor'];
183 } else {
184 $this->loadMonitorConfig = [ 'class' => 'LoadMonitorNull' ];
185 }
186
187 foreach ( $params['servers'] as $i => $server ) {
188 $this->mLoads[$i] = $server['load'];
189 if ( isset( $server['groupLoads'] ) ) {
190 foreach ( $server['groupLoads'] as $group => $ratio ) {
191 if ( !isset( $this->mGroupLoads[$group] ) ) {
192 $this->mGroupLoads[$group] = [];
193 }
194 $this->mGroupLoads[$group][$i] = $ratio;
195 }
196 }
197 }
198
199 if ( isset( $params['srvCache'] ) ) {
200 $this->srvCache = $params['srvCache'];
201 } else {
202 $this->srvCache = new EmptyBagOStuff();
203 }
204 if ( isset( $params['wanCache'] ) ) {
205 $this->wanCache = $params['wanCache'];
206 } else {
207 $this->wanCache = WANObjectCache::newEmpty();
208 }
209 $this->profiler = isset( $params['profiler'] ) ? $params['profiler'] : null;
210 if ( isset( $params['trxProfiler'] ) ) {
211 $this->trxProfiler = $params['trxProfiler'];
212 } else {
213 $this->trxProfiler = new TransactionProfiler();
214 }
215
216 $this->errorLogger = isset( $params['errorLogger'] )
217 ? $params['errorLogger']
218 : function ( Exception $e ) {
219 trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
220 };
221
222 foreach ( [ 'replLogger', 'connLogger', 'queryLogger', 'perfLogger' ] as $key ) {
223 $this->$key = isset( $params[$key] ) ? $params[$key] : new NullLogger();
224 }
225
226 $this->host = isset( $params['hostname'] )
227 ? $params['hostname']
228 : ( gethostname() ?: 'unknown' );
229 $this->cliMode = isset( $params['cliMode'] ) ? $params['cliMode'] : PHP_SAPI === 'cli';
230 $this->agent = isset( $params['agent'] ) ? $params['agent'] : '';
231
232 if ( isset( $params['chronologyProtector'] ) ) {
233 $this->chronProt = $params['chronologyProtector'];
234 }
235 }
236
242 private function getLoadMonitor() {
243 if ( !isset( $this->loadMonitor ) ) {
244 $compat = [
245 'LoadMonitor' => LoadMonitor::class,
246 'LoadMonitorNull' => LoadMonitorNull::class,
247 'LoadMonitorMySQL' => LoadMonitorMySQL::class,
248 ];
249
250 $class = $this->loadMonitorConfig['class'];
251 if ( isset( $compat[$class] ) ) {
252 $class = $compat[$class];
253 }
254
255 $this->loadMonitor = new $class(
256 $this, $this->srvCache, $this->wanCache, $this->loadMonitorConfig );
257 $this->loadMonitor->setLogger( $this->replLogger );
258 }
259
260 return $this->loadMonitor;
261 }
262
269 private function getRandomNonLagged( array $loads, $domain = false, $maxLag = INF ) {
270 $lags = $this->getLagTimes( $domain );
271
272 # Unset excessively lagged servers
273 foreach ( $lags as $i => $lag ) {
274 if ( $i != 0 ) {
275 # How much lag this server nominally is allowed to have
276 $maxServerLag = isset( $this->mServers[$i]['max lag'] )
277 ? $this->mServers[$i]['max lag']
278 : self::MAX_LAG_DEFAULT; // default
279 # Constrain that futher by $maxLag argument
280 $maxServerLag = min( $maxServerLag, $maxLag );
281
282 $host = $this->getServerName( $i );
283 if ( $lag === false && !is_infinite( $maxServerLag ) ) {
284 $this->replLogger->error(
285 "Server {host} is not replicating?", [ 'host' => $host ] );
286 unset( $loads[$i] );
287 } elseif ( $lag > $maxServerLag ) {
288 $this->replLogger->debug(
289 __METHOD__ .
290 ": server {host} has {lag} seconds of lag (>= {maxlag})",
291 [ 'host' => $host, 'lag' => $lag, 'maxlag' => $maxServerLag ]
292 );
293 unset( $loads[$i] );
294 }
295 }
296 }
297
298 # Find out if all the replica DBs with non-zero load are lagged
299 $sum = 0;
300 foreach ( $loads as $load ) {
301 $sum += $load;
302 }
303 if ( $sum == 0 ) {
304 # No appropriate DB servers except maybe the master and some replica DBs with zero load
305 # Do NOT use the master
306 # Instead, this function will return false, triggering read-only mode,
307 # and a lagged replica DB will be used instead.
308 return false;
309 }
310
311 if ( count( $loads ) == 0 ) {
312 return false;
313 }
314
315 # Return a random representative of the remainder
316 return ArrayUtils::pickRandom( $loads );
317 }
318
319 public function getReaderIndex( $group = false, $domain = false ) {
320 if ( count( $this->mServers ) == 1 ) {
321 // Skip the load balancing if there's only one server
322 return $this->getWriterIndex();
323 } elseif ( $group === false && $this->mReadIndex >= 0 ) {
324 // Shortcut if the generic reader index was already cached
325 return $this->mReadIndex;
326 }
327
328 if ( $group !== false ) {
329 // Use the server weight array for this load group
330 if ( isset( $this->mGroupLoads[$group] ) ) {
331 $loads = $this->mGroupLoads[$group];
332 } else {
333 // No loads for this group, return false and the caller can use some other group
334 $this->connLogger->info( __METHOD__ . ": no loads for group $group" );
335
336 return false;
337 }
338 } else {
339 // Use the generic load group
340 $loads = $this->mLoads;
341 }
342
343 // Scale the configured load ratios according to each server's load and state
344 $this->getLoadMonitor()->scaleLoads( $loads, $domain );
345
346 // Pick a server to use, accounting for weights, load, lag, and mWaitForPos
347 list( $i, $laggedReplicaMode ) = $this->pickReaderIndex( $loads, $domain );
348 if ( $i === false ) {
349 // Replica DB connection unsuccessful
350 return false;
351 }
352
353 if ( $this->mWaitForPos && $i != $this->getWriterIndex() ) {
354 // Before any data queries are run, wait for the server to catch up to the
355 // specified position. This is used to improve session consistency. Note that
356 // when LoadBalancer::waitFor() sets mWaitForPos, the waiting triggers here,
357 // so update laggedReplicaMode as needed for consistency.
358 if ( !$this->doWait( $i ) ) {
359 $laggedReplicaMode = true;
360 }
361 }
362
363 if ( $this->mReadIndex <= 0 && $this->mLoads[$i] > 0 && $group === false ) {
364 // Cache the generic reader index for future ungrouped DB_REPLICA handles
365 $this->mReadIndex = $i;
366 // Record if the generic reader index is in "lagged replica DB" mode
367 if ( $laggedReplicaMode ) {
368 $this->laggedReplicaMode = true;
369 }
370 }
371
372 $serverName = $this->getServerName( $i );
373 $this->connLogger->debug( __METHOD__ . ": using server $serverName for group '$group'" );
374
375 return $i;
376 }
377
383 private function pickReaderIndex( array $loads, $domain = false ) {
384 if ( !count( $loads ) ) {
385 throw new InvalidArgumentException( "Empty server array given to LoadBalancer" );
386 }
387
389 $i = false;
391 $laggedReplicaMode = false;
392
393 // Quickly look through the available servers for a server that meets criteria...
394 $currentLoads = $loads;
395 while ( count( $currentLoads ) ) {
396 if ( $this->mAllowLagged || $laggedReplicaMode ) {
397 $i = ArrayUtils::pickRandom( $currentLoads );
398 } else {
399 $i = false;
400 if ( $this->mWaitForPos && $this->mWaitForPos->asOfTime() ) {
401 // ChronologyProtecter sets mWaitForPos for session consistency.
402 // This triggers doWait() after connect, so it's especially good to
403 // avoid lagged servers so as to avoid excessive delay in that method.
404 $ago = microtime( true ) - $this->mWaitForPos->asOfTime();
405 // Aim for <= 1 second of waiting (being too picky can backfire)
406 $i = $this->getRandomNonLagged( $currentLoads, $domain, $ago + 1 );
407 }
408 if ( $i === false ) {
409 // Any server with less lag than it's 'max lag' param is preferable
410 $i = $this->getRandomNonLagged( $currentLoads, $domain );
411 }
412 if ( $i === false && count( $currentLoads ) != 0 ) {
413 // All replica DBs lagged. Switch to read-only mode
414 $this->replLogger->error( "All replica DBs lagged. Switch to read-only mode" );
415 $i = ArrayUtils::pickRandom( $currentLoads );
416 $laggedReplicaMode = true;
417 }
418 }
419
420 if ( $i === false ) {
421 // pickRandom() returned false.
422 // This is permanent and means the configuration or the load monitor
423 // wants us to return false.
424 $this->connLogger->debug( __METHOD__ . ": pickRandom() returned false" );
425
426 return [ false, false ];
427 }
428
429 $serverName = $this->getServerName( $i );
430 $this->connLogger->debug( __METHOD__ . ": Using reader #$i: $serverName..." );
431
432 $conn = $this->openConnection( $i, $domain );
433 if ( !$conn ) {
434 $this->connLogger->warning( __METHOD__ . ": Failed connecting to $i/$domain" );
435 unset( $currentLoads[$i] ); // avoid this server next iteration
436 $i = false;
437 continue;
438 }
439
440 // Decrement reference counter, we are finished with this connection.
441 // It will be incremented for the caller later.
442 if ( $domain !== false ) {
443 $this->reuseConnection( $conn );
444 }
445
446 // Return this server
447 break;
448 }
449
450 // If all servers were down, quit now
451 if ( !count( $currentLoads ) ) {
452 $this->connLogger->error( "All servers down" );
453 }
454
455 return [ $i, $laggedReplicaMode ];
456 }
457
458 public function waitFor( $pos ) {
459 $oldPos = $this->mWaitForPos;
460 try {
461 $this->mWaitForPos = $pos;
462 // If a generic reader connection was already established, then wait now
464 if ( $i > 0 ) {
465 if ( !$this->doWait( $i ) ) {
466 $this->laggedReplicaMode = true;
467 }
468 }
469 } finally {
470 // Restore the older position if it was higher since this is used for lag-protection
471 $this->setWaitForPositionIfHigher( $oldPos );
472 }
473 }
474
475 public function waitForOne( $pos, $timeout = null ) {
476 $oldPos = $this->mWaitForPos;
477 try {
478 $this->mWaitForPos = $pos;
479
481 if ( $i <= 0 ) {
482 // Pick a generic replica DB if there isn't one yet
483 $readLoads = $this->mLoads;
484 unset( $readLoads[$this->getWriterIndex()] ); // replica DBs only
485 $readLoads = array_filter( $readLoads ); // with non-zero load
486 $i = ArrayUtils::pickRandom( $readLoads );
487 }
488
489 if ( $i > 0 ) {
490 $ok = $this->doWait( $i, true, $timeout );
491 } else {
492 $ok = true; // no applicable loads
493 }
494 } finally {
495 # Restore the old position, as this is not used for lag-protection but for throttling
496 $this->mWaitForPos = $oldPos;
497 }
498
499 return $ok;
500 }
501
502 public function waitForAll( $pos, $timeout = null ) {
503 $oldPos = $this->mWaitForPos;
504 try {
505 $this->mWaitForPos = $pos;
506 $serverCount = count( $this->mServers );
507
508 $ok = true;
509 for ( $i = 1; $i < $serverCount; $i++ ) {
510 if ( $this->mLoads[$i] > 0 ) {
511 $ok = $this->doWait( $i, true, $timeout ) && $ok;
512 }
513 }
514 } finally {
515 # Restore the old position, as this is not used for lag-protection but for throttling
516 $this->mWaitForPos = $oldPos;
517 }
518
519 return $ok;
520 }
521
525 private function setWaitForPositionIfHigher( $pos ) {
526 if ( !$pos ) {
527 return;
528 }
529
530 if ( !$this->mWaitForPos || $pos->hasReached( $this->mWaitForPos ) ) {
531 $this->mWaitForPos = $pos;
532 }
533 }
534
539 public function getAnyOpenConnection( $i ) {
540 foreach ( $this->mConns as $connsByServer ) {
541 if ( !empty( $connsByServer[$i] ) ) {
543 $serverConns = $connsByServer[$i];
544
545 return reset( $serverConns );
546 }
547 }
548
549 return false;
550 }
551
559 protected function doWait( $index, $open = false, $timeout = null ) {
560 $close = false; // close the connection afterwards
561
562 // Check if we already know that the DB has reached this point
563 $server = $this->getServerName( $index );
564 $key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $server, 'v1' );
566 $knownReachedPos = $this->srvCache->get( $key );
567 if (
568 $knownReachedPos instanceof DBMasterPos &&
569 $knownReachedPos->hasReached( $this->mWaitForPos )
570 ) {
571 $this->replLogger->debug( __METHOD__ .
572 ": replica DB $server known to be caught up (pos >= $knownReachedPos)." );
573 return true;
574 }
575
576 // Find a connection to wait on, creating one if needed and allowed
577 $conn = $this->getAnyOpenConnection( $index );
578 if ( !$conn ) {
579 if ( !$open ) {
580 $this->replLogger->debug( __METHOD__ . ": no connection open for $server" );
581
582 return false;
583 } else {
584 $conn = $this->openConnection( $index, self::DOMAIN_ANY );
585 if ( !$conn ) {
586 $this->replLogger->warning( __METHOD__ . ": failed to connect to $server" );
587
588 return false;
589 }
590 // Avoid connection spam in waitForAll() when connections
591 // are made just for the sake of doing this lag check.
592 $close = true;
593 }
594 }
595
596 $this->replLogger->info( __METHOD__ . ": Waiting for replica DB $server to catch up..." );
597 $timeout = $timeout ?: $this->mWaitTimeout;
598 $result = $conn->masterPosWait( $this->mWaitForPos, $timeout );
599
600 if ( $result == -1 || is_null( $result ) ) {
601 // Timed out waiting for replica DB, use master instead
602 $this->replLogger->warning(
603 __METHOD__ . ": Timed out waiting on {host} pos {$this->mWaitForPos}",
604 [ 'host' => $server ]
605 );
606 $ok = false;
607 } else {
608 $this->replLogger->info( __METHOD__ . ": Done" );
609 $ok = true;
610 // Remember that the DB reached this point
611 $this->srvCache->set( $key, $this->mWaitForPos, BagOStuff::TTL_DAY );
612 }
613
614 if ( $close ) {
615 $this->closeConnection( $conn );
616 }
617
618 return $ok;
619 }
620
621 public function getConnection( $i, $groups = [], $domain = false, $flags = 0 ) {
622 if ( $i === null || $i === false ) {
623 throw new InvalidArgumentException( 'Attempt to call ' . __METHOD__ .
624 ' with invalid server index' );
625 }
626
627 if ( $this->localDomain->equals( $domain ) || $domain === $this->localDomainIdAlias ) {
628 $domain = false; // local connection requested
629 }
630
631 $groups = ( $groups === false || $groups === [] )
632 ? [ false ] // check one "group": the generic pool
633 : (array)$groups;
634
635 $masterOnly = ( $i == self::DB_MASTER || $i == $this->getWriterIndex() );
636 $oldConnsOpened = $this->connsOpened; // connections open now
637
638 if ( $i == self::DB_MASTER ) {
639 $i = $this->getWriterIndex();
640 } else {
641 # Try to find an available server in any the query groups (in order)
642 foreach ( $groups as $group ) {
643 $groupIndex = $this->getReaderIndex( $group, $domain );
644 if ( $groupIndex !== false ) {
645 $i = $groupIndex;
646 break;
647 }
648 }
649 }
650
651 # Operation-based index
652 if ( $i == self::DB_REPLICA ) {
653 $this->mLastError = 'Unknown error'; // reset error string
654 # Try the general server pool if $groups are unavailable.
655 $i = ( $groups === [ false ] )
656 ? false // don't bother with this if that is what was tried above
657 : $this->getReaderIndex( false, $domain );
658 # Couldn't find a working server in getReaderIndex()?
659 if ( $i === false ) {
660 $this->mLastError = 'No working replica DB server: ' . $this->mLastError;
661 // Throw an exception
662 $this->reportConnectionError();
663 return null; // not reached
664 }
665 }
666
667 # Now we have an explicit index into the servers array
668 $conn = $this->openConnection( $i, $domain, $flags );
669 if ( !$conn ) {
670 // Throw an exception
671 $this->reportConnectionError();
672 return null; // not reached
673 }
674
675 # Profile any new connections that happen
676 if ( $this->connsOpened > $oldConnsOpened ) {
677 $host = $conn->getServer();
678 $dbname = $conn->getDBname();
679 $this->trxProfiler->recordConnection( $host, $dbname, $masterOnly );
680 }
681
682 if ( $masterOnly ) {
683 # Make master-requested DB handles inherit any read-only mode setting
684 $conn->setLBInfo( 'readOnlyReason', $this->getReadOnlyReason( $domain, $conn ) );
685 }
686
687 return $conn;
688 }
689
690 public function reuseConnection( $conn ) {
691 $serverIndex = $conn->getLBInfo( 'serverIndex' );
692 $refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
693 if ( $serverIndex === null || $refCount === null ) {
704 return;
705 } elseif ( $conn instanceof DBConnRef ) {
706 // DBConnRef already handles calling reuseConnection() and only passes the live
707 // Database instance to this method. Any caller passing in a DBConnRef is broken.
708 $this->connLogger->error( __METHOD__ . ": got DBConnRef instance.\n" .
709 ( new RuntimeException() )->getTraceAsString() );
710
711 return;
712 }
713
714 if ( $this->disabled ) {
715 return; // DBConnRef handle probably survived longer than the LoadBalancer
716 }
717
718 if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
719 $connFreeKey = self::KEY_FOREIGN_FREE_NOROUND;
720 $connInUseKey = self::KEY_FOREIGN_INUSE_NOROUND;
721 } else {
722 $connFreeKey = self::KEY_FOREIGN_FREE;
723 $connInUseKey = self::KEY_FOREIGN_INUSE;
724 }
725
726 $domain = $conn->getDomainID();
727 if ( !isset( $this->mConns[$connInUseKey][$serverIndex][$domain] ) ) {
728 throw new InvalidArgumentException( __METHOD__ .
729 ": connection $serverIndex/$domain not found; it may have already been freed." );
730 } elseif ( $this->mConns[$connInUseKey][$serverIndex][$domain] !== $conn ) {
731 throw new InvalidArgumentException( __METHOD__ .
732 ": connection $serverIndex/$domain mismatched; it may have already been freed." );
733 }
734
735 $conn->setLBInfo( 'foreignPoolRefCount', --$refCount );
736 if ( $refCount <= 0 ) {
737 $this->mConns[$connFreeKey][$serverIndex][$domain] = $conn;
738 unset( $this->mConns[$connInUseKey][$serverIndex][$domain] );
739 if ( !$this->mConns[$connInUseKey][$serverIndex] ) {
740 unset( $this->mConns[$connInUseKey][$serverIndex] ); // clean up
741 }
742 $this->connLogger->debug( __METHOD__ . ": freed connection $serverIndex/$domain" );
743 } else {
744 $this->connLogger->debug( __METHOD__ .
745 ": reference count for $serverIndex/$domain reduced to $refCount" );
746 }
747 }
748
749 public function getConnectionRef( $db, $groups = [], $domain = false, $flags = 0 ) {
750 $domain = ( $domain !== false ) ? $domain : $this->localDomain;
751
752 return new DBConnRef( $this, $this->getConnection( $db, $groups, $domain, $flags ) );
753 }
754
755 public function getLazyConnectionRef( $db, $groups = [], $domain = false, $flags = 0 ) {
756 $domain = ( $domain !== false ) ? $domain : $this->localDomain;
757
758 return new DBConnRef( $this, [ $db, $groups, $domain, $flags ] );
759 }
760
761 public function getMaintenanceConnectionRef( $db, $groups = [], $domain = false, $flags = 0 ) {
762 $domain = ( $domain !== false ) ? $domain : $this->localDomain;
763
764 return new MaintainableDBConnRef(
765 $this, $this->getConnection( $db, $groups, $domain, $flags ) );
766 }
767
768 public function openConnection( $i, $domain = false, $flags = 0 ) {
769 if ( $this->localDomain->equals( $domain ) || $domain === $this->localDomainIdAlias ) {
770 $domain = false; // local connection requested
771 }
772
773 if ( !$this->chronProtInitialized && $this->chronProt ) {
774 $this->connLogger->debug( __METHOD__ . ': calling initLB() before first connection.' );
775 // Load CP positions before connecting so that doWait() triggers later if needed
776 $this->chronProtInitialized = true;
777 $this->chronProt->initLB( $this );
778 }
779
780 // Check if an auto-commit connection is being requested. If so, it will not reuse the
781 // main set of DB connections but rather its own pool since:
782 // a) those are usually set to implicitly use transaction rounds via DBO_TRX
783 // b) those must support the use of explicit transaction rounds via beginMasterChanges()
784 $autoCommit = ( ( $flags & self::CONN_TRX_AUTO ) == self::CONN_TRX_AUTO );
785
786 if ( $domain !== false ) {
787 // Connection is to a foreign domain
788 $conn = $this->openForeignConnection( $i, $domain, $flags );
789 } else {
790 // Connection is to the local domain
791 $connKey = $autoCommit ? self::KEY_LOCAL_NOROUND : self::KEY_LOCAL;
792 if ( isset( $this->mConns[$connKey][$i][0] ) ) {
793 $conn = $this->mConns[$connKey][$i][0];
794 } else {
795 if ( !isset( $this->mServers[$i] ) || !is_array( $this->mServers[$i] ) ) {
796 throw new InvalidArgumentException( "No server with index '$i'." );
797 }
798 // Open a new connection
799 $server = $this->mServers[$i];
800 $server['serverIndex'] = $i;
801 $server['autoCommitOnly'] = $autoCommit;
802 $conn = $this->reallyOpenConnection( $server, false );
803 $host = $this->getServerName( $i );
804 if ( $conn->isOpen() ) {
805 $this->connLogger->debug( "Connected to database $i at '$host'." );
806 $this->mConns[$connKey][$i][0] = $conn;
807 } else {
808 $this->connLogger->warning( "Failed to connect to database $i at '$host'." );
809 $this->errorConnection = $conn;
810 $conn = false;
811 }
812 }
813 }
814
815 if ( $conn instanceof IDatabase && !$conn->isOpen() ) {
816 // Connection was made but later unrecoverably lost for some reason.
817 // Do not return a handle that will just throw exceptions on use,
818 // but let the calling code (e.g. getReaderIndex) try another server.
819 // See DatabaseMyslBase::ping() for how this can happen.
820 $this->errorConnection = $conn;
821 $conn = false;
822 }
823
824 if ( $autoCommit && $conn instanceof IDatabase ) {
825 $conn->clearFlag( $conn::DBO_TRX ); // auto-commit mode
826 }
827
828 return $conn;
829 }
830
852 private function openForeignConnection( $i, $domain, $flags = 0 ) {
853 $domainInstance = DatabaseDomain::newFromId( $domain );
854 $dbName = $domainInstance->getDatabase();
855 $prefix = $domainInstance->getTablePrefix();
856 $autoCommit = ( ( $flags & self::CONN_TRX_AUTO ) == self::CONN_TRX_AUTO );
857
858 if ( $autoCommit ) {
859 $connFreeKey = self::KEY_FOREIGN_FREE_NOROUND;
860 $connInUseKey = self::KEY_FOREIGN_INUSE_NOROUND;
861 } else {
862 $connFreeKey = self::KEY_FOREIGN_FREE;
863 $connInUseKey = self::KEY_FOREIGN_INUSE;
864 }
865
866 if ( isset( $this->mConns[$connInUseKey][$i][$domain] ) ) {
867 // Reuse an in-use connection for the same domain
868 $conn = $this->mConns[$connInUseKey][$i][$domain];
869 $this->connLogger->debug( __METHOD__ . ": reusing connection $i/$domain" );
870 } elseif ( isset( $this->mConns[$connFreeKey][$i][$domain] ) ) {
871 // Reuse a free connection for the same domain
872 $conn = $this->mConns[$connFreeKey][$i][$domain];
873 unset( $this->mConns[$connFreeKey][$i][$domain] );
874 $this->mConns[$connInUseKey][$i][$domain] = $conn;
875 $this->connLogger->debug( __METHOD__ . ": reusing free connection $i/$domain" );
876 } elseif ( !empty( $this->mConns[$connFreeKey][$i] ) ) {
877 // Reuse a free connection from another domain
878 $conn = reset( $this->mConns[$connFreeKey][$i] );
879 $oldDomain = key( $this->mConns[$connFreeKey][$i] );
880 // The empty string as a DB name means "don't care".
881 // DatabaseMysqlBase::open() already handle this on connection.
882 if ( strlen( $dbName ) && !$conn->selectDB( $dbName ) ) {
883 $this->mLastError = "Error selecting database '$dbName' on server " .
884 $conn->getServer() . " from client host {$this->host}";
885 $this->errorConnection = $conn;
886 $conn = false;
887 } else {
888 $conn->tablePrefix( $prefix );
889 unset( $this->mConns[$connFreeKey][$i][$oldDomain] );
890 $this->mConns[$connInUseKey][$i][$domain] = $conn;
891 $this->connLogger->debug( __METHOD__ .
892 ": reusing free connection from $oldDomain for $domain" );
893 }
894 } else {
895 if ( !isset( $this->mServers[$i] ) || !is_array( $this->mServers[$i] ) ) {
896 throw new InvalidArgumentException( "No server with index '$i'." );
897 }
898 // Open a new connection
899 $server = $this->mServers[$i];
900 $server['serverIndex'] = $i;
901 $server['foreignPoolRefCount'] = 0;
902 $server['foreign'] = true;
903 $server['autoCommitOnly'] = $autoCommit;
904 $conn = $this->reallyOpenConnection( $server, $dbName );
905 if ( !$conn->isOpen() ) {
906 $this->connLogger->warning( __METHOD__ . ": connection error for $i/$domain" );
907 $this->errorConnection = $conn;
908 $conn = false;
909 } else {
910 $conn->tablePrefix( $prefix );
911 $this->mConns[$connInUseKey][$i][$domain] = $conn;
912 $this->connLogger->debug( __METHOD__ . ": opened new connection for $i/$domain" );
913 }
914 }
915
916 // Increment reference count
917 if ( $conn instanceof IDatabase ) {
918 $refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
919 $conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 );
920 }
921
922 return $conn;
923 }
924
932 private function isOpen( $index ) {
933 if ( !is_integer( $index ) ) {
934 return false;
935 }
936
937 return (bool)$this->getAnyOpenConnection( $index );
938 }
939
951 protected function reallyOpenConnection( array $server, $dbNameOverride = false ) {
952 if ( $this->disabled ) {
953 throw new DBAccessError();
954 }
955
956 if ( $dbNameOverride !== false ) {
957 $server['dbname'] = $dbNameOverride;
958 }
959
960 // Let the handle know what the cluster master is (e.g. "db1052")
961 $masterName = $this->getServerName( $this->getWriterIndex() );
962 $server['clusterMasterHost'] = $masterName;
963
964 // Log when many connection are made on requests
965 if ( ++$this->connsOpened >= self::CONN_HELD_WARN_THRESHOLD ) {
966 $this->perfLogger->warning( __METHOD__ . ": " .
967 "{$this->connsOpened}+ connections made (master=$masterName)" );
968 }
969
970 $server['srvCache'] = $this->srvCache;
971 // Set loggers and profilers
972 $server['connLogger'] = $this->connLogger;
973 $server['queryLogger'] = $this->queryLogger;
974 $server['errorLogger'] = $this->errorLogger;
975 $server['profiler'] = $this->profiler;
976 $server['trxProfiler'] = $this->trxProfiler;
977 // Use the same agent and PHP mode for all DB handles
978 $server['cliMode'] = $this->cliMode;
979 $server['agent'] = $this->agent;
980 // Use DBO_DEFAULT flags by default for LoadBalancer managed databases. Assume that the
981 // application calls LoadBalancer::commitMasterChanges() before the PHP script completes.
982 $server['flags'] = isset( $server['flags'] ) ? $server['flags'] : IDatabase::DBO_DEFAULT;
983
984 // Create a live connection object
985 try {
986 $db = Database::factory( $server['type'], $server );
987 } catch ( DBConnectionError $e ) {
988 // FIXME: This is probably the ugliest thing I have ever done to
989 // PHP. I'm half-expecting it to segfault, just out of disgust. -- TS
990 $db = $e->db;
991 }
992
993 $db->setLBInfo( $server );
994 $db->setLazyMasterHandle(
995 $this->getLazyConnectionRef( self::DB_MASTER, [], $db->getDomainID() )
996 );
997 $db->setTableAliases( $this->tableAliases );
998
999 if ( $server['serverIndex'] === $this->getWriterIndex() ) {
1000 if ( $this->trxRoundId !== false ) {
1001 $this->applyTransactionRoundFlags( $db );
1002 }
1003 foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
1004 $db->setTransactionListener( $name, $callback );
1005 }
1006 }
1007
1008 return $db;
1009 }
1010
1014 private function reportConnectionError() {
1015 $conn = $this->errorConnection; // the connection which caused the error
1016 $context = [
1017 'method' => __METHOD__,
1018 'last_error' => $this->mLastError,
1019 ];
1020
1021 if ( $conn instanceof IDatabase ) {
1022 $context['db_server'] = $conn->getServer();
1023 $this->connLogger->warning(
1024 "Connection error: {last_error} ({db_server})",
1025 $context
1026 );
1027
1028 // throws DBConnectionError
1029 $conn->reportConnectionError( "{$this->mLastError} ({$context['db_server']})" );
1030 } else {
1031 // No last connection, probably due to all servers being too busy
1032 $this->connLogger->error(
1033 "LB failure with no last connection. Connection error: {last_error}",
1034 $context
1035 );
1036
1037 // If all servers were busy, mLastError will contain something sensible
1038 throw new DBConnectionError( null, $this->mLastError );
1039 }
1040 }
1041
1042 public function getWriterIndex() {
1043 return 0;
1044 }
1045
1046 public function haveIndex( $i ) {
1047 return array_key_exists( $i, $this->mServers );
1048 }
1049
1050 public function isNonZeroLoad( $i ) {
1051 return array_key_exists( $i, $this->mServers ) && $this->mLoads[$i] != 0;
1052 }
1053
1054 public function getServerCount() {
1055 return count( $this->mServers );
1056 }
1057
1058 public function getServerName( $i ) {
1059 if ( isset( $this->mServers[$i]['hostName'] ) ) {
1060 $name = $this->mServers[$i]['hostName'];
1061 } elseif ( isset( $this->mServers[$i]['host'] ) ) {
1062 $name = $this->mServers[$i]['host'];
1063 } else {
1064 $name = '';
1065 }
1066
1067 return ( $name != '' ) ? $name : 'localhost';
1068 }
1069
1070 public function getServerType( $i ) {
1071 return isset( $this->mServers[$i]['type'] ) ? $this->mServers[$i]['type'] : 'unknown';
1072 }
1073
1077 public function getServerInfo( $i ) {
1078 wfDeprecated( __METHOD__, '1.30' );
1079 if ( isset( $this->mServers[$i] ) ) {
1080 return $this->mServers[$i];
1081 } else {
1082 return false;
1083 }
1084 }
1085
1089 public function setServerInfo( $i, array $serverInfo ) {
1090 wfDeprecated( __METHOD__, '1.30' );
1091 $this->mServers[$i] = $serverInfo;
1092 }
1093
1094 public function getMasterPos() {
1095 # If this entire request was served from a replica DB without opening a connection to the
1096 # master (however unlikely that may be), then we can fetch the position from the replica DB.
1097 $masterConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
1098 if ( !$masterConn ) {
1099 $serverCount = count( $this->mServers );
1100 for ( $i = 1; $i < $serverCount; $i++ ) {
1101 $conn = $this->getAnyOpenConnection( $i );
1102 if ( $conn ) {
1103 return $conn->getReplicaPos();
1104 }
1105 }
1106 } else {
1107 return $masterConn->getMasterPos();
1108 }
1109
1110 return false;
1111 }
1112
1113 public function disable() {
1114 $this->closeAll();
1115 $this->disabled = true;
1116 }
1117
1118 public function closeAll() {
1119 $this->forEachOpenConnection( function ( IDatabase $conn ) {
1120 $host = $conn->getServer();
1121 $this->connLogger->debug( "Closing connection to database '$host'." );
1122 $conn->close();
1123 } );
1124
1125 $this->mConns = [
1126 self::KEY_LOCAL => [],
1127 self::KEY_FOREIGN_INUSE => [],
1128 self::KEY_FOREIGN_FREE => [],
1129 self::KEY_LOCAL_NOROUND => [],
1130 self::KEY_FOREIGN_INUSE_NOROUND => [],
1131 self::KEY_FOREIGN_FREE_NOROUND => []
1132 ];
1133 $this->connsOpened = 0;
1134 }
1135
1136 public function closeConnection( IDatabase $conn ) {
1137 $serverIndex = $conn->getLBInfo( 'serverIndex' ); // second index level of mConns
1138 foreach ( $this->mConns as $type => $connsByServer ) {
1139 if ( !isset( $connsByServer[$serverIndex] ) ) {
1140 continue;
1141 }
1142
1143 foreach ( $connsByServer[$serverIndex] as $i => $trackedConn ) {
1144 if ( $conn === $trackedConn ) {
1145 $host = $this->getServerName( $i );
1146 $this->connLogger->debug( "Closing connection to database $i at '$host'." );
1147 unset( $this->mConns[$type][$serverIndex][$i] );
1149 break 2;
1150 }
1151 }
1152 }
1153
1154 $conn->close();
1155 }
1156
1157 public function commitAll( $fname = __METHOD__ ) {
1158 $failures = [];
1159
1160 $restore = ( $this->trxRoundId !== false );
1161 $this->trxRoundId = false;
1162 $this->forEachOpenConnection(
1163 function ( IDatabase $conn ) use ( $fname, $restore, &$failures ) {
1164 try {
1165 $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1166 } catch ( DBError $e ) {
1167 call_user_func( $this->errorLogger, $e );
1168 $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
1169 }
1170 if ( $restore && $conn->getLBInfo( 'master' ) ) {
1171 $this->undoTransactionRoundFlags( $conn );
1172 }
1173 }
1174 );
1175
1176 if ( $failures ) {
1177 throw new DBExpectedError(
1178 null,
1179 "Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
1180 );
1181 }
1182 }
1183
1184 public function finalizeMasterChanges() {
1185 $this->forEachOpenMasterConnection( function ( Database $conn ) {
1186 // Any error should cause all DB transactions to be rolled back together
1187 $conn->setTrxEndCallbackSuppression( false );
1189 // Defer post-commit callbacks until COMMIT finishes for all DBs
1190 $conn->setTrxEndCallbackSuppression( true );
1191 } );
1192 }
1193
1194 public function approveMasterChanges( array $options ) {
1195 $limit = isset( $options['maxWriteDuration'] ) ? $options['maxWriteDuration'] : 0;
1196 $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $limit ) {
1197 // If atomic sections or explicit transactions are still open, some caller must have
1198 // caught an exception but failed to properly rollback any changes. Detect that and
1199 // throw and error (causing rollback).
1200 if ( $conn->explicitTrxActive() ) {
1201 throw new DBTransactionError(
1202 $conn,
1203 "Explicit transaction still active. A caller may have caught an error."
1204 );
1205 }
1206 // Assert that the time to replicate the transaction will be sane.
1207 // If this fails, then all DB transactions will be rollback back together.
1208 $time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
1209 if ( $limit > 0 && $time > $limit ) {
1210 throw new DBTransactionSizeError(
1211 $conn,
1212 "Transaction spent $time second(s) in writes, exceeding the limit of $limit.",
1213 [ $time, $limit ]
1214 );
1215 }
1216 // If a connection sits idle while slow queries execute on another, that connection
1217 // may end up dropped before the commit round is reached. Ping servers to detect this.
1218 if ( $conn->writesOrCallbacksPending() && !$conn->ping() ) {
1219 throw new DBTransactionError(
1220 $conn,
1221 "A connection to the {$conn->getDBname()} database was lost before commit."
1222 );
1223 }
1224 } );
1225 }
1226
1227 public function beginMasterChanges( $fname = __METHOD__ ) {
1228 if ( $this->trxRoundId !== false ) {
1229 throw new DBTransactionError(
1230 null,
1231 "$fname: Transaction round '{$this->trxRoundId}' already started."
1232 );
1233 }
1234 $this->trxRoundId = $fname;
1235
1236 $failures = [];
1238 function ( Database $conn ) use ( $fname, &$failures ) {
1239 $conn->setTrxEndCallbackSuppression( true );
1240 try {
1241 $conn->flushSnapshot( $fname );
1242 } catch ( DBError $e ) {
1243 call_user_func( $this->errorLogger, $e );
1244 $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
1245 }
1246 $conn->setTrxEndCallbackSuppression( false );
1247 $this->applyTransactionRoundFlags( $conn );
1248 }
1249 );
1250
1251 if ( $failures ) {
1252 throw new DBExpectedError(
1253 null,
1254 "$fname: Flush failed on server(s) " . implode( "\n", array_unique( $failures ) )
1255 );
1256 }
1257 }
1258
1259 public function commitMasterChanges( $fname = __METHOD__ ) {
1260 $failures = [];
1261
1263 $scope = $this->getScopedPHPBehaviorForCommit(); // try to ignore client aborts
1264
1265 $restore = ( $this->trxRoundId !== false );
1266 $this->trxRoundId = false;
1268 function ( IDatabase $conn ) use ( $fname, $restore, &$failures ) {
1269 try {
1270 if ( $conn->writesOrCallbacksPending() ) {
1271 $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1272 } elseif ( $restore ) {
1273 $conn->flushSnapshot( $fname );
1274 }
1275 } catch ( DBError $e ) {
1276 call_user_func( $this->errorLogger, $e );
1277 $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
1278 }
1279 if ( $restore ) {
1280 $this->undoTransactionRoundFlags( $conn );
1281 }
1282 }
1283 );
1284
1285 if ( $failures ) {
1286 throw new DBExpectedError(
1287 null,
1288 "$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
1289 );
1290 }
1291 }
1292
1294 $e = null; // first exception
1295 $this->forEachOpenMasterConnection( function ( Database $conn ) use ( $type, &$e ) {
1296 $conn->setTrxEndCallbackSuppression( false );
1297 if ( $conn->writesOrCallbacksPending() ) {
1298 // This happens if onTransactionIdle() callbacks leave callbacks on *another* DB
1299 // (which finished its callbacks already). Warn and recover in this case. Let the
1300 // callbacks run in the final commitMasterChanges() in LBFactory::shutdown().
1301 $this->queryLogger->info( __METHOD__ . ": found writes/callbacks pending." );
1302 return;
1303 } elseif ( $conn->trxLevel() ) {
1304 // This happens for single-DB setups where DB_REPLICA uses the master DB,
1305 // thus leaving an implicit read-only transaction open at this point. It
1306 // also happens if onTransactionIdle() callbacks leave implicit transactions
1307 // open on *other* DBs (which is slightly improper). Let these COMMIT on the
1308 // next call to commitMasterChanges(), possibly in LBFactory::shutdown().
1309 return;
1310 }
1311 try {
1312 $conn->runOnTransactionIdleCallbacks( $type );
1313 } catch ( Exception $ex ) {
1314 $e = $e ?: $ex;
1315 }
1316 try {
1318 } catch ( Exception $ex ) {
1319 $e = $e ?: $ex;
1320 }
1321 } );
1322
1323 return $e;
1324 }
1325
1326 public function rollbackMasterChanges( $fname = __METHOD__ ) {
1327 $restore = ( $this->trxRoundId !== false );
1328 $this->trxRoundId = false;
1330 function ( IDatabase $conn ) use ( $fname, $restore ) {
1331 if ( $conn->writesOrCallbacksPending() ) {
1332 $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
1333 }
1334 if ( $restore ) {
1335 $this->undoTransactionRoundFlags( $conn );
1336 }
1337 }
1338 );
1339 }
1340
1342 $this->forEachOpenMasterConnection( function ( Database $conn ) {
1343 $conn->setTrxEndCallbackSuppression( true );
1344 } );
1345 }
1346
1350 private function applyTransactionRoundFlags( IDatabase $conn ) {
1351 if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
1352 return; // transaction rounds do not apply to these connections
1353 }
1354
1355 if ( $conn->getFlag( $conn::DBO_DEFAULT ) ) {
1356 // DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
1357 // Force DBO_TRX even in CLI mode since a commit round is expected soon.
1358 $conn->setFlag( $conn::DBO_TRX, $conn::REMEMBER_PRIOR );
1359 // If config has explicitly requested DBO_TRX be either on or off by not
1360 // setting DBO_DEFAULT, then respect that. Forcing no transactions is useful
1361 // for things like blob stores (ExternalStore) which want auto-commit mode.
1362 }
1363 }
1364
1368 private function undoTransactionRoundFlags( IDatabase $conn ) {
1369 if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
1370 return; // transaction rounds do not apply to these connections
1371 }
1372
1373 if ( $conn->getFlag( $conn::DBO_DEFAULT ) ) {
1374 $conn->restoreFlags( $conn::RESTORE_PRIOR );
1375 }
1376 }
1377
1378 public function flushReplicaSnapshots( $fname = __METHOD__ ) {
1379 $this->forEachOpenReplicaConnection( function ( IDatabase $conn ) {
1380 $conn->flushSnapshot( __METHOD__ );
1381 } );
1382 }
1383
1384 public function hasMasterConnection() {
1385 return $this->isOpen( $this->getWriterIndex() );
1386 }
1387
1388 public function hasMasterChanges() {
1389 $pending = 0;
1390 $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$pending ) {
1391 $pending |= $conn->writesOrCallbacksPending();
1392 } );
1393
1394 return (bool)$pending;
1395 }
1396
1397 public function lastMasterChangeTimestamp() {
1398 $lastTime = false;
1399 $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$lastTime ) {
1400 $lastTime = max( $lastTime, $conn->lastDoneWrites() );
1401 } );
1402
1403 return $lastTime;
1404 }
1405
1406 public function hasOrMadeRecentMasterChanges( $age = null ) {
1407 $age = ( $age === null ) ? $this->mWaitTimeout : $age;
1408
1409 return ( $this->hasMasterChanges()
1410 || $this->lastMasterChangeTimestamp() > microtime( true ) - $age );
1411 }
1412
1413 public function pendingMasterChangeCallers() {
1414 $fnames = [];
1415 $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$fnames ) {
1416 $fnames = array_merge( $fnames, $conn->pendingWriteCallers() );
1417 } );
1418
1419 return $fnames;
1420 }
1421
1422 public function getLaggedReplicaMode( $domain = false ) {
1423 // No-op if there is only one DB (also avoids recursion)
1424 if ( !$this->laggedReplicaMode && $this->getServerCount() > 1 ) {
1425 try {
1426 // See if laggedReplicaMode gets set
1427 $conn = $this->getConnection( self::DB_REPLICA, false, $domain );
1428 $this->reuseConnection( $conn );
1429 } catch ( DBConnectionError $e ) {
1430 // Avoid expensive re-connect attempts and failures
1431 $this->allReplicasDownMode = true;
1432 $this->laggedReplicaMode = true;
1433 }
1434 }
1435
1437 }
1438
1444 public function getLaggedSlaveMode( $domain = false ) {
1445 return $this->getLaggedReplicaMode( $domain );
1446 }
1447
1448 public function laggedReplicaUsed() {
1450 }
1451
1457 public function laggedSlaveUsed() {
1458 return $this->laggedReplicaUsed();
1459 }
1460
1461 public function getReadOnlyReason( $domain = false, IDatabase $conn = null ) {
1462 if ( $this->readOnlyReason !== false ) {
1463 return $this->readOnlyReason;
1464 } elseif ( $this->getLaggedReplicaMode( $domain ) ) {
1465 if ( $this->allReplicasDownMode ) {
1466 return 'The database has been automatically locked ' .
1467 'until the replica database servers become available';
1468 } else {
1469 return 'The database has been automatically locked ' .
1470 'while the replica database servers catch up to the master.';
1471 }
1472 } elseif ( $this->masterRunningReadOnly( $domain, $conn ) ) {
1473 return 'The database master is running in read-only mode.';
1474 }
1475
1476 return false;
1477 }
1478
1484 private function masterRunningReadOnly( $domain, IDatabase $conn = null ) {
1486 $masterServer = $this->getServerName( $this->getWriterIndex() );
1487
1488 return (bool)$cache->getWithSetCallback(
1489 $cache->makeGlobalKey( __CLASS__, 'server-read-only', $masterServer ),
1490 self::TTL_CACHE_READONLY,
1491 function () use ( $domain, $conn ) {
1492 $old = $this->trxProfiler->setSilenced( true );
1493 try {
1494 $dbw = $conn ?: $this->getConnection( self::DB_MASTER, [], $domain );
1495 $readOnly = (int)$dbw->serverIsReadOnly();
1496 if ( !$conn ) {
1497 $this->reuseConnection( $dbw );
1498 }
1499 } catch ( DBError $e ) {
1500 $readOnly = 0;
1501 }
1502 $this->trxProfiler->setSilenced( $old );
1503 return $readOnly;
1504 },
1505 [ 'pcTTL' => $cache::TTL_PROC_LONG, 'busyValue' => 0 ]
1506 );
1507 }
1508
1509 public function allowLagged( $mode = null ) {
1510 if ( $mode === null ) {
1511 return $this->mAllowLagged;
1512 }
1513 $this->mAllowLagged = $mode;
1514
1515 return $this->mAllowLagged;
1516 }
1517
1518 public function pingAll() {
1519 $success = true;
1520 $this->forEachOpenConnection( function ( IDatabase $conn ) use ( &$success ) {
1521 if ( !$conn->ping() ) {
1522 $success = false;
1523 }
1524 } );
1525
1526 return $success;
1527 }
1528
1529 public function forEachOpenConnection( $callback, array $params = [] ) {
1530 foreach ( $this->mConns as $connsByServer ) {
1531 foreach ( $connsByServer as $serverConns ) {
1532 foreach ( $serverConns as $conn ) {
1533 $mergedParams = array_merge( [ $conn ], $params );
1534 call_user_func_array( $callback, $mergedParams );
1535 }
1536 }
1537 }
1538 }
1539
1540 public function forEachOpenMasterConnection( $callback, array $params = [] ) {
1541 $masterIndex = $this->getWriterIndex();
1542 foreach ( $this->mConns as $connsByServer ) {
1543 if ( isset( $connsByServer[$masterIndex] ) ) {
1545 foreach ( $connsByServer[$masterIndex] as $conn ) {
1546 $mergedParams = array_merge( [ $conn ], $params );
1547 call_user_func_array( $callback, $mergedParams );
1548 }
1549 }
1550 }
1551 }
1552
1553 public function forEachOpenReplicaConnection( $callback, array $params = [] ) {
1554 foreach ( $this->mConns as $connsByServer ) {
1555 foreach ( $connsByServer as $i => $serverConns ) {
1556 if ( $i === $this->getWriterIndex() ) {
1557 continue; // skip master
1558 }
1559 foreach ( $serverConns as $conn ) {
1560 $mergedParams = array_merge( [ $conn ], $params );
1561 call_user_func_array( $callback, $mergedParams );
1562 }
1563 }
1564 }
1565 }
1566
1567 public function getMaxLag( $domain = false ) {
1568 $maxLag = -1;
1569 $host = '';
1570 $maxIndex = 0;
1571
1572 if ( $this->getServerCount() <= 1 ) {
1573 return [ $host, $maxLag, $maxIndex ]; // no replication = no lag
1574 }
1575
1576 $lagTimes = $this->getLagTimes( $domain );
1577 foreach ( $lagTimes as $i => $lag ) {
1578 if ( $this->mLoads[$i] > 0 && $lag > $maxLag ) {
1579 $maxLag = $lag;
1580 $host = $this->mServers[$i]['host'];
1581 $maxIndex = $i;
1582 }
1583 }
1584
1585 return [ $host, $maxLag, $maxIndex ];
1586 }
1587
1588 public function getLagTimes( $domain = false ) {
1589 if ( $this->getServerCount() <= 1 ) {
1590 return [ $this->getWriterIndex() => 0 ]; // no replication = no lag
1591 }
1592
1593 $knownLagTimes = []; // map of (server index => 0 seconds)
1594 $indexesWithLag = [];
1595 foreach ( $this->mServers as $i => $server ) {
1596 if ( empty( $server['is static'] ) ) {
1597 $indexesWithLag[] = $i; // DB server might have replication lag
1598 } else {
1599 $knownLagTimes[$i] = 0; // DB server is a non-replicating and read-only archive
1600 }
1601 }
1602
1603 return $this->getLoadMonitor()->getLagTimes( $indexesWithLag, $domain ) + $knownLagTimes;
1604 }
1605
1606 public function safeGetLag( IDatabase $conn ) {
1607 if ( $this->getServerCount() <= 1 ) {
1608 return 0;
1609 } else {
1610 return $conn->getLag();
1611 }
1612 }
1613
1620 public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = 10 ) {
1621 if ( $this->getServerCount() <= 1 || !$conn->getLBInfo( 'replica' ) ) {
1622 return true; // server is not a replica DB
1623 }
1624
1625 if ( !$pos ) {
1626 // Get the current master position, opening a connection if needed
1627 $masterConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
1628 if ( $masterConn ) {
1629 $pos = $masterConn->getMasterPos();
1630 } else {
1631 $masterConn = $this->openConnection( $this->getWriterIndex(), self::DOMAIN_ANY );
1632 $pos = $masterConn->getMasterPos();
1633 $this->closeConnection( $masterConn );
1634 }
1635 }
1636
1637 if ( $pos instanceof DBMasterPos ) {
1638 $result = $conn->masterPosWait( $pos, $timeout );
1639 if ( $result == -1 || is_null( $result ) ) {
1640 $msg = __METHOD__ . ": Timed out waiting on {$conn->getServer()} pos {$pos}";
1641 $this->replLogger->warning( "$msg" );
1642 $ok = false;
1643 } else {
1644 $this->replLogger->info( __METHOD__ . ": Done" );
1645 $ok = true;
1646 }
1647 } else {
1648 $ok = false; // something is misconfigured
1649 $this->replLogger->error( "Could not get master pos for {$conn->getServer()}." );
1650 }
1651
1652 return $ok;
1653 }
1654
1655 public function setTransactionListener( $name, callable $callback = null ) {
1656 if ( $callback ) {
1657 $this->trxRecurringCallbacks[$name] = $callback;
1658 } else {
1659 unset( $this->trxRecurringCallbacks[$name] );
1660 }
1662 function ( IDatabase $conn ) use ( $name, $callback ) {
1663 $conn->setTransactionListener( $name, $callback );
1664 }
1665 );
1666 }
1667
1668 public function setTableAliases( array $aliases ) {
1669 $this->tableAliases = $aliases;
1670 }
1671
1672 public function setDomainPrefix( $prefix ) {
1673 // Find connections to explicit foreign domains still marked as in-use...
1674 $domainsInUse = [];
1675 $this->forEachOpenConnection( function ( IDatabase $conn ) use ( &$domainsInUse ) {
1676 // Once reuseConnection() is called on a handle, its reference count goes from 1 to 0.
1677 // Until then, it is still in use by the caller (explicitly or via DBConnRef scope).
1678 if ( $conn->getLBInfo( 'foreignPoolRefCount' ) > 0 ) {
1679 $domainsInUse[] = $conn->getDomainID();
1680 }
1681 } );
1682
1683 // Do not switch connections to explicit foreign domains unless marked as safe
1684 if ( $domainsInUse ) {
1685 $domains = implode( ', ', $domainsInUse );
1686 throw new DBUnexpectedError( null,
1687 "Foreign domain connections are still in use ($domains)." );
1688 }
1689
1690 $this->localDomain = new DatabaseDomain(
1691 $this->localDomain->getDatabase(),
1692 null,
1693 $prefix
1694 );
1695
1696 $this->forEachOpenConnection( function ( IDatabase $db ) use ( $prefix ) {
1697 $db->tablePrefix( $prefix );
1698 } );
1699 }
1700
1707 final protected function getScopedPHPBehaviorForCommit() {
1708 if ( PHP_SAPI != 'cli' ) { // https://bugs.php.net/bug.php?id=47540
1709 $old = ignore_user_abort( true ); // avoid half-finished operations
1710 return new ScopedCallback( function () use ( $old ) {
1711 ignore_user_abort( $old );
1712 } );
1713 }
1714
1715 return null;
1716 }
1717
1718 function __destruct() {
1719 // Avoid connection leaks for sanity
1720 $this->disable();
1721 }
1722}
1723
1724class_alias( LoadBalancer::class, 'LoadBalancer' );
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Throws a warning that $function is deprecated.
if(!defined( 'MEDIAWIKI')) $fname
This file is not a valid entry point, perform no further processing unless MEDIAWIKI is defined.
Definition Setup.php:36
A collection of static methods to play with arrays.
static pickRandom( $weights)
Given an array of non-normalised probabilities, this function will select an element and return the a...
interface is intended to be more or less compatible with the PHP memcached client.
Definition BagOStuff.php:47
A BagOStuff object with no objects in it.
Multi-datacenter aware caching interface.
Class for ensuring a consistent ordering of events as seen by the user, despite replication.
Exception class for attempted DB access.
Helper class to handle automatically marking connections as reusable (via RAII pattern) as well handl...
Definition DBConnRef.php:15
Database error base class.
Definition DBError.php:30
Base class for the more common types of database errors.
Class to handle database/prefix specification for IDatabase domains.
Relational database abstraction object.
Definition Database.php:45
static factory( $dbType, $p=[])
Construct a Database subclass instance given a database type and parameters.
Definition Database.php:338
flushSnapshot( $fname=__METHOD__)
Commit any transaction but error out if writes or callbacks are pending.
runOnTransactionPreCommitCallbacks()
Actually run and consume any "on transaction pre-commit" callbacks.
runTransactionListenerCallbacks( $trigger)
Actually run any "transaction listener" callbacks.
writesOrCallbacksPending()
Returns true if there is a transaction open with possible write queries or transaction pre-commit/idl...
Definition Database.php:558
setTrxEndCallbackSuppression( $suppress)
Whether to disable running of post-COMMIT/ROLLBACK callbacks.
trxLevel()
Gets the current transaction level.
Definition Database.php:472
Database connection, tracking, load balancing, and transaction manager for a cluster.
masterRunningReadOnly( $domain, IDatabase $conn=null)
object string $profiler
Class name or object With profileIn/profileOut methods.
approveMasterChanges(array $options)
Perform all pre-commit checks for things like replication safety.
string $mLastError
The last DB selection or connection error.
callable $errorLogger
Exception logger.
runMasterPostTrxCallbacks( $type)
Issue all pending post-COMMIT/ROLLBACK callbacks.
string $host
Current server name.
flushReplicaSnapshots( $fname=__METHOD__)
Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshot.
reuseConnection( $conn)
Mark a foreign connection as being available for reuse under a different DB domain.
int $mReadIndex
The generic (not query grouped) replica DB index (of $mServers)
getServerType( $i)
Get DB type of the server with the specified index.
setTableAliases(array $aliases)
Make certain table names use their own database, schema, and table prefix when passed into SQL querie...
commitAll( $fname=__METHOD__)
Commit transactions on all open connections.
string $agent
Agent name for query profiling.
waitFor( $pos)
Set the master wait position.
getMasterPos()
Get the current master position for chronology control purposes.
setDomainPrefix( $prefix)
Set a new table prefix for the existing local domain ID for testing.
finalizeMasterChanges()
Perform all pre-commit callbacks that remain part of the atomic transactions and disable any post-com...
bool $laggedReplicaMode
Whether the generic reader fell back to a lagged replica DB.
forEachOpenMasterConnection( $callback, array $params=[])
Call a function with each open connection object to a master.
array[] $trxRecurringCallbacks
Map of (name => callable)
pickReaderIndex(array $loads, $domain=false)
getLazyConnectionRef( $db, $groups=[], $domain=false, $flags=0)
Get a database connection handle reference without connecting yet.
rollbackMasterChanges( $fname=__METHOD__)
Issue ROLLBACK only on master, only if queries were done on connection.
openForeignConnection( $i, $domain, $flags=0)
Open a connection to a foreign DB, or return one if it is already open.
laggedReplicaUsed()
Checks whether the database for generic connections this request was both:
doWait( $index, $open=false, $timeout=null)
Wait for a given replica DB to catch up to the master pos stored in $this.
undoTransactionRoundFlags(IDatabase $conn)
getConnectionRef( $db, $groups=[], $domain=false, $flags=0)
Get a database connection handle reference.
isOpen( $index)
Test if the specified index represents an open connection.
array $loadMonitorConfig
The LoadMonitor configuration.
isNonZeroLoad( $i)
Returns true if the specified index is valid and has non-zero load.
float[] $mLoads
Map of (server index => weight)
getMaintenanceConnectionRef( $db, $groups=[], $domain=false, $flags=0)
Get a maintenance database connection handle reference for migrations and schema changes.
bool DBMasterPos $mWaitForPos
False if not set.
getLoadMonitor()
Get a LoadMonitor instance.
setTransactionListener( $name, callable $callback=null)
Set a callback via IDatabase::setTransactionListener() on all current and future master connections o...
forEachOpenReplicaConnection( $callback, array $params=[])
Call a function with each open replica DB connection object.
lastMasterChangeTimestamp()
Get the timestamp of the latest write query done by this thread.
beginMasterChanges( $fname=__METHOD__)
Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
array[] $mGroupLoads
Map of (group => server index => weight)
closeAll()
Close all open connections.
getLagTimes( $domain=false)
Get an estimate of replication lag (in seconds) for each server.
string bool $trxRoundId
String if a requested DBO_TRX transaction round is active.
closeConnection(IDatabase $conn)
Close a connection.
getReadOnlyReason( $domain=false, IDatabase $conn=null)
getRandomNonLagged(array $loads, $domain=false, $maxLag=INF)
bool $cliMode
Whether this PHP instance is for a CLI script.
safeGetLag(IDatabase $conn)
Get the lag in seconds for a given connection, or zero if this load balancer does not have replicatio...
openConnection( $i, $domain=false, $flags=0)
Open a connection to the server given by the specified index.
hasOrMadeRecentMasterChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
suppressTransactionEndCallbacks()
Suppress all pending post-COMMIT/ROLLBACK callbacks.
disable()
Disable this load balancer.
getReaderIndex( $group=false, $domain=false)
Get the index of the reader connection, which may be a replica DB.
reallyOpenConnection(array $server, $dbNameOverride=false)
Really opens a connection.
DatabaseDomain $localDomain
Local Domain ID and default for selectDB() calls.
safeWaitForMasterPos(IDatabase $conn, $pos=false, $timeout=10)
getConnection( $i, $groups=[], $domain=false, $flags=0)
Get a connection by index.
TransactionProfiler $trxProfiler
getScopedPHPBehaviorForCommit()
Make PHP ignore user aborts/disconnects until the returned value leaves scope.
getServerName( $i)
Get the host name or IP address of the server with the specified index.
commitMasterChanges( $fname=__METHOD__)
Issue COMMIT on all master connections where writes where done.
getLaggedReplicaMode( $domain=false)
forEachOpenConnection( $callback, array $params=[])
Call a function with each open connection object.
haveIndex( $i)
Returns true if the specified index is a valid server index.
bool $mAllowLagged
Whether to disregard replica DB lag as a factor in replica DB selection.
int $connsOpened
Total connections opened.
applyTransactionRoundFlags(IDatabase $conn)
pendingMasterChangeCallers()
Get the list of callers that have pending master changes.
hasMasterChanges()
Determine if there are pending changes in a transaction by this thread.
getMaxLag( $domain=false)
Get the hostname and lag time of the most-lagged replica DB.
Database $errorConnection
DB connection object that caused a problem.
bool $allReplicasDownMode
Whether the generic reader fell back to a lagged replica DB.
ChronologyProtector null $chronProt
string $localDomainIdAlias
Alternate ID string for the domain instead of DatabaseDomain::getId()
setServerInfo( $i, array $serverInfo)
allowLagged( $mode=null)
Disables/enables lag checks.
getServerCount()
Get the number of defined servers (not the number of open connections)
string bool $readOnlyReason
Reason the LB is read-only or false if not.
Database[][][] $mConns
Map of (connection category => server index => IDatabase[])
waitForOne( $pos, $timeout=null)
Set the master wait position and wait for a "generic" replica DB to catch up to it.
waitForAll( $pos, $timeout=null)
Set the master wait position and wait for ALL replica DBs to catch up to it.
array[] $mServers
Map of (server index => server config array)
int $mWaitTimeout
Seconds to spend waiting on replica DB lag to resolve.
__construct(array $params)
Construct a manager of IDatabase connection objects.
Helper class to handle automatically marking connections as reusable (via RAII pattern) as well handl...
Helper class that detects high-contention DB queries via profiling calls.
deferred txt A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user For updating the view updating the linked to tables after a etc PHP does not yet have any way to tell the server to actually return and disconnect while still running these but it might have such a feature in the future We handle these by creating a deferred update object and putting those objects on a global list
Definition deferred.txt:11
when a variable name is used in a function
Definition design.txt:94
design txt This is a brief overview of the new design More thorough and up to date information is available on the documentation wiki at etc Handles the details of getting and saving to the user table of the and dealing with sessions and cookies OutputPage Encapsulates the entire HTML page that will be sent in response to any server request It is used by calling its functions to add in any and then calling but I prefer the flexibility This should also do the output encoding The system allocates a global one in $wgOut Title Represents the title of an and does all the work of translating among various forms such as plain database key
Definition design.txt:26
the array() calling protocol came about after MediaWiki 1.4rc1.
see documentation in includes Linker php for Linker::makeImageLink & $time
Definition hooks.txt:1778
null means default in associative array with keys and values unescaped Should be merged with default with a value of false meaning to suppress the attribute in associative array with keys and values unescaped & $options
Definition hooks.txt:1971
do that in ParserLimitReportFormat instead use this to modify the parameters of the image all existing parser cache entries will be invalid To avoid you ll need to handle that somehow(e.g. with the RejectParserCacheValue hook) because MediaWiki won 't do it for you. & $defaults also a ContextSource after deleting those rows but within the same transaction you ll probably need to make sure the header is varied on and they can depend only on the ResourceLoaderContext $context
Definition hooks.txt:2780
it s the revision text itself In either if gzip is the revision text is gzipped $flags
Definition hooks.txt:2805
Allows to change the fields on the form that will be generated $name
Definition hooks.txt:302
processing should stop and the error should be shown to the user * false
Definition hooks.txt:187
returning false will NOT prevent logging $e
Definition hooks.txt:2146
An object representing a master or replica DB position in a replicated setup.
hasReached(DBMasterPos $pos)
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:40
lastDoneWrites()
Returns the last time the connection may have been used for write queries.
getServer()
Get the server hostname or IP address.
isOpen()
Is a connection to the database open?
setTransactionListener( $name, callable $callback=null)
Run a callback each time any transaction commits or rolls back.
getLBInfo( $name=null)
Get properties passed down from the server info array of the load balancer.
restoreFlags( $state=self::RESTORE_PRIOR)
Restore the flags to their prior state before the last setFlag/clearFlag call.
flushSnapshot( $fname=__METHOD__)
Commit any transaction but error out if writes or callbacks are pending.
getFlag( $flag)
Returns a boolean whether the flag $flag is set for this connection.
pendingWriteQueryDuration( $type=self::ESTIMATE_TOTAL)
Get the time spend running write queries for this transaction.
getLag()
Get replica DB lag.
close()
Closes a database connection.
tablePrefix( $prefix=null)
Get/set the table prefix.
ping(&$rtt=null)
Ping the server and try to reconnect if it there is no connection.
masterPosWait(DBMasterPos $pos, $timeout)
Wait for the replica DB to catch up to a given master position.
setFlag( $flag, $remember=self::REMEMBER_NOTHING)
Set a flag for this connection.
commit( $fname=__METHOD__, $flush='')
Commits a transaction previously started using begin().
writesOrCallbacksPending()
Returns true if there is a transaction open with possible write queries or transaction pre-commit/idl...
pendingWriteCallers()
Get the list of method names that did write queries for this transaction.
Database cluster connection, tracking, load balancing, and transaction manager interface.
An interface for database load monitoring.
$cache
Definition mcc.php:33
$params