MediaWiki REL1_33
LoadBalancer.php
Go to the documentation of this file.
1<?php
22namespace Wikimedia\Rdbms;
23
24use Psr\Log\LoggerInterface;
25use Psr\Log\NullLogger;
26use Wikimedia\ScopedCallback;
31use UnexpectedValueException;
32use InvalidArgumentException;
33use RuntimeException;
34use Exception;
35
41class LoadBalancer implements ILoadBalancer {
/** @var object|null Load monitor instance; created on demand by getLoadMonitor() */
private $loadMonitor;
/** @var callable|null Invoked once, with this instance, before the first connection attempt */
private $chronologyCallback;
/** @var object Server-local cache passed to the load monitor and to DB handles */
private $srvCache;
/** @var object WAN cache passed to the load monitor */
private $wanCache;
/** @var object|null Profiler taken from the "profiler" constructor parameter */
private $profiler;
/** @var object Transaction profiler used to record newly opened connections */
private $trxProfiler;
/** @var LoggerInterface Replication-related logger */
private $replLogger;
/** @var LoggerInterface Connection-related logger */
private $connLogger;
/** @var LoggerInterface Query logger handed to DB handles */
private $queryLogger;
/** @var LoggerInterface Logger for performance warnings (e.g. many connections) */
private $perfLogger;
/** @var callable Exception logger; receives the Exception instance */
private $errorLogger;
/** @var callable Deprecation logger; receives the message string */
private $deprecationLogger;

/** @var DatabaseDomain Local DB domain; default for connections when no domain is given */
private $localDomain;
/**
 * @var string|null Alternate spelling of the local domain ID accepted by
 *  getConnection()/openConnection(); NOTE(review): not assigned in the code visible here
 */
private $localDomainIdAlias;

/** @var array[] Map of (connection pool key => server index => domain or 0 => DB handle) */
private $conns;

/** @var array[] Map of (server index => server config array) */
private $servers;
/** @var array Map of (server index => generic load weight) */
private $loads;
/** @var array[]|null Map of (group => server index => load weight); filled by the constructor */
private $groupLoads;
/** @var bool Whether lagged replica DBs may be picked without any lag filtering */
private $allowLagged;
/** @var int Seconds to spend waiting on replica DB lag to resolve */
private $waitTimeout;
/** @var array Load monitor configuration ("class", "lagWarnThreshold", ...) */
private $loadMonitorConfig;
/** @var int|float Default lag limit for replica DBs that have no 'max lag' server setting */
private $maxLag;

/** @var string Current server name */
private $hostname;
/** @var bool Whether this PHP instance is for a CLI script */
private $cliMode;
/** @var string Agent name for query profiling */
private $agent;

/** @var array Table aliases; NOTE(review): not used in the code visible here */
private $tableAliases = [];
/** @var array Index aliases; NOTE(review): not used in the code visible here */
private $indexAliases = [];

/** @var object|null DB handle of the most recent failed or lost connection */
private $errorConnection;
/** @var int Cached generic (ungrouped) reader index; -1 until one has been chosen */
private $readIndex;
/** @var bool|DBMasterPos Replication position to wait for, or false if there is none */
private $waitForPos;
/** @var bool Whether the generic reader was picked although it is lagged */
private $laggedReplicaMode = false;
/** @var bool NOTE(review): presumably set when no replica DB is reachable; not used in view */
private $allReplicasDownMode = false;
/** @var string Description of the last connection-level error */
private $lastError = 'Unknown error';
/** @var string|bool Reason this instance is read-only, or false if it is not */
private $readOnlyReason = false;
/** @var int Total number of connections opened so far */
private $connsOpened = 0;
/** @var bool Whether this instance is disabled; new connections are then refused */
private $disabled = false;
/** @var bool Whether any connection has been attempted yet */
private $connectionAttempted = false;

/** @var string|bool NOTE(review): presumably the active transaction round ID; not used in view */
private $trxRoundId = false;
/** @var string Current transaction round stage (one of the ROUND_* constants) */
private $trxRoundStage = self::ROUND_CURSORY;

/** @var string|null Query group tried by default when the caller names none */
private $defaultGroup = null;

/** Connection count at which reallyOpenConnection() starts logging performance warnings */
const CONN_HELD_WARN_THRESHOLD = 10;

/** Default 'max lag' when the "maxLag" parameter is unspecified */
const MAX_LAG_DEFAULT = 6;
/** Default wait timeout when the "waitTimeout" parameter is unspecified */
const MAX_WAIT_DEFAULT = 10;
/** NOTE(review): presumably the TTL, in seconds, for cached read-only checks; not used in view */
const TTL_CACHE_READONLY = 5;

// Pool keys for connections that take part in transaction rounds
const KEY_LOCAL = 'local';
const KEY_FOREIGN_FREE = 'foreignFree';
const KEY_FOREIGN_INUSE = 'foreignInUse';

// Pool keys for the auto-commit counterparts that ignore transaction rounds
const KEY_LOCAL_NOROUND = 'localAutoCommit';
const KEY_FOREIGN_FREE_NOROUND = 'foreignFreeAutoCommit';
const KEY_FOREIGN_INUSE_NOROUND = 'foreignInUseAutoCommit';

// Values of $trxRoundStage
const ROUND_CURSORY = 'cursory';
const ROUND_FINALIZED = 'finalized';
const ROUND_APPROVED = 'approved';
const ROUND_COMMIT_CALLBACKS = 'commit-callbacks';
const ROUND_ROLLBACK_CALLBACKS = 'rollback-callbacks';
const ROUND_ERROR = 'error';
163
/**
 * Set up the balancer from a parameter map.
 *
 * @param array $params Recognised keys:
 *  - servers : Required. List of server config arrays; index 0 is flagged as the master and
 *     every other index as a replica. Each entry must carry a "load" weight and may carry
 *     a "groupLoads" map of (group => weight).
 *  - localDomain : Local domain ID, parsed with DatabaseDomain::newFromId()
 *  - waitTimeout : Replication wait timeout [default: self::MAX_WAIT_DEFAULT]
 *  - readOnlyReason : Read-only reason; only used when it is a string
 *  - maxLag : Default lag limit for replica DBs [default: self::MAX_LAG_DEFAULT]
 *  - loadMonitor : Load monitor config array with a "class" key [default: LoadMonitorNull]
 *  - srvCache, wanCache, profiler, trxProfiler : Cache and profiler instances
 *  - errorLogger, deprecationLogger : Callbacks taking an Exception / a message string
 *  - replLogger, connLogger, queryLogger, perfLogger : PSR-3 loggers [default: NullLogger]
 *  - hostname, cliMode, agent : Environment information
 *  - chronologyCallback : Callback run before the first connection attempt
 *  - roundStage : STAGE_POSTCOMMIT_* / STAGE_POSTROLLBACK_* value to start the round in
 *  - defaultGroup : Query group used when callers name none
 * @throws InvalidArgumentException If the "servers" key is missing
 */
public function __construct( array $params ) {
	if ( !isset( $params['servers'] ) ) {
		throw new InvalidArgumentException( __CLASS__ . ': missing servers parameter' );
	}
	$this->servers = $params['servers'];
	foreach ( $this->servers as $i => $server ) {
		if ( $i == 0 ) {
			$this->servers[$i]['master'] = true;
		} else {
			$this->servers[$i]['replica'] = true;
		}
	}

	// NOTE(review): the else-branch and the assignment were missing from this listing and
	// were reconstructed; confirm that DatabaseDomain::newUnspecified() and
	// setLocalDomain() exist with these semantics.
	$localDomain = isset( $params['localDomain'] )
		? DatabaseDomain::newFromId( $params['localDomain'] )
		: DatabaseDomain::newUnspecified();
	$this->setLocalDomain( $localDomain );

	$this->waitTimeout = $params['waitTimeout'] ?? self::MAX_WAIT_DEFAULT;

	$this->readIndex = -1;
	$this->conns = [
		// Connections where transaction rounds may be applied
		self::KEY_LOCAL => [],
		self::KEY_FOREIGN_INUSE => [],
		self::KEY_FOREIGN_FREE => [],
		// Auto-committing counterpart connections that ignore transaction rounds
		self::KEY_LOCAL_NOROUND => [],
		self::KEY_FOREIGN_INUSE_NOROUND => [],
		self::KEY_FOREIGN_FREE_NOROUND => []
	];
	$this->loads = [];
	$this->waitForPos = false;
	$this->allowLagged = false;

	if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
		$this->readOnlyReason = $params['readOnlyReason'];
	}

	$this->maxLag = $params['maxLag'] ?? self::MAX_LAG_DEFAULT;

	$this->loadMonitorConfig = $params['loadMonitor'] ?? [ 'class' => 'LoadMonitorNull' ];
	// Only fills in "lagWarnThreshold" when the caller did not configure one
	$this->loadMonitorConfig += [ 'lagWarnThreshold' => $this->maxLag ];

	foreach ( $params['servers'] as $i => $server ) {
		$this->loads[$i] = $server['load'];
		if ( isset( $server['groupLoads'] ) ) {
			foreach ( $server['groupLoads'] as $group => $ratio ) {
				if ( !isset( $this->groupLoads[$group] ) ) {
					$this->groupLoads[$group] = [];
				}
				$this->groupLoads[$group][$i] = $ratio;
			}
		}
	}

	$this->srvCache = $params['srvCache'] ?? new EmptyBagOStuff();
	$this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
	$this->profiler = $params['profiler'] ?? null;
	$this->trxProfiler = $params['trxProfiler'] ?? new TransactionProfiler();

	$this->errorLogger = $params['errorLogger'] ?? function ( Exception $e ) {
		trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
	};
	$this->deprecationLogger = $params['deprecationLogger'] ?? function ( $msg ) {
		trigger_error( $msg, E_USER_DEPRECATED );
	};

	foreach ( [ 'replLogger', 'connLogger', 'queryLogger', 'perfLogger' ] as $key ) {
		$this->$key = $params[$key] ?? new NullLogger();
	}

	$this->hostname = $params['hostname'] ?? ( gethostname() ?: 'unknown' );
	$this->cliMode = $params['cliMode'] ?? ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
	$this->agent = $params['agent'] ?? '';

	if ( isset( $params['chronologyCallback'] ) ) {
		$this->chronologyCallback = $params['chronologyCallback'];
	}

	if ( isset( $params['roundStage'] ) ) {
		// Any other "roundStage" value leaves the stage at ROUND_CURSORY
		if ( $params['roundStage'] === self::STAGE_POSTCOMMIT_CALLBACKS ) {
			$this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
		} elseif ( $params['roundStage'] === self::STAGE_POSTROLLBACK_CALLBACKS ) {
			$this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
		}
	}

	$this->defaultGroup = $params['defaultGroup'] ?? null;
}
254
/**
 * Get the ID of the local database domain.
 *
 * @return string Result of DatabaseDomain::getId() on the local domain
 */
public function getLocalDomainID() {
	$localDomainId = $this->localDomain->getId();

	return $localDomainId;
}
258
/**
 * Turn a caller-supplied domain into a concrete domain ID string.
 *
 * @param string|bool $domain Domain ID, or false for the local domain
 * @return string The given domain cast to string, or the local domain ID
 */
public function resolveDomainID( $domain ) {
	if ( $domain === false ) {
		return $this->getLocalDomainID();
	}

	return (string)$domain;
}
262
/**
 * Get the load monitor, building it from the configured class on first use.
 *
 * @return object Instance of the class named by the "class" load monitor setting
 */
private function getLoadMonitor() {
	if ( isset( $this->loadMonitor ) ) {
		return $this->loadMonitor;
	}

	// Short class names accepted for backwards compatibility
	$legacyNames = [
		'LoadMonitor' => LoadMonitor::class,
		'LoadMonitorNull' => LoadMonitorNull::class,
		'LoadMonitorMySQL' => LoadMonitorMySQL::class,
	];

	$configured = $this->loadMonitorConfig['class'];
	$monitorClass = $legacyNames[$configured] ?? $configured;

	$monitor = new $monitorClass(
		$this, $this->srvCache, $this->wanCache, $this->loadMonitorConfig );
	$this->loadMonitor = $monitor;
	$this->loadMonitor->setLogger( $this->replLogger );

	return $this->loadMonitor;
}
288
/**
 * Pick a random server from $loads after dropping replica DBs that lag too much.
 *
 * Index 0 (the master) is never dropped. Every other index is dropped when its lag is
 * unknown (false) while a finite limit applies, or when its lag exceeds the smaller of
 * its own 'max lag' setting (falling back to the instance default) and $maxLag.
 *
 * @param array $loads Map of (server index => load weight)
 * @param string|bool $domain Domain passed on to getLagTimes()
 * @param int|float $maxLag Additional upper bound on tolerated lag, in seconds [default: INF]
 * @return bool|int|string Index chosen by ArrayUtils::pickRandom(), or false when the
 *  remaining servers have a combined load of zero
 */
private function getRandomNonLagged( array $loads, $domain = false, $maxLag = INF ) {
	$lags = $this->getLagTimes( $domain );

	# Unset excessively lagged servers
	foreach ( $lags as $i => $lag ) {
		if ( $i != 0 ) {
			# How much lag this server nominally is allowed to have
			$maxServerLag = $this->servers[$i]['max lag'] ?? $this->maxLag; // default
			# Constrain that further by $maxLag argument
			$maxServerLag = min( $maxServerLag, $maxLag );

			$host = $this->getServerName( $i );
			if ( $lag === false && !is_infinite( $maxServerLag ) ) {
				$this->replLogger->debug(
					__METHOD__ .
					": server {host} is not replicating?", [ 'host' => $host ] );
				unset( $loads[$i] );
			} elseif ( $lag > $maxServerLag ) {
				$this->replLogger->debug(
					__METHOD__ .
					": server {host} has {lag} seconds of lag (>= {maxlag})",
					[ 'host' => $host, 'lag' => $lag, 'maxlag' => $maxServerLag ]
				);
				unset( $loads[$i] );
			}
		}
	}

	# Find out if all the replica DBs with non-zero load are lagged.
	# An empty $loads array also sums to zero, so no separate emptiness check is needed.
	if ( array_sum( $loads ) == 0 ) {
		# No appropriate DB servers except maybe the master and some replica DBs with zero load
		# Do NOT use the master
		# Instead, this function will return false, triggering read-only mode,
		# and a lagged replica DB will be used instead.
		return false;
	}

	# Return a random representative of the remainder
	return ArrayUtils::pickRandom( $loads );
}
343
/**
 * Get the index of the server to use for reads of the given query group.
 *
 * With a single configured server this is always the writer index. For the generic
 * group (false) the chosen index is cached and returned on later calls.
 *
 * @param string|bool $group Query group, or false for the generic group
 * @param string|bool $domain Domain ID, or false for the local domain
 * @return bool|int|string Server index, or false if the group has no loads or no
 *  server could be connected to
 */
public function getReaderIndex( $group = false, $domain = false ) {
	if ( count( $this->servers ) == 1 ) {
		// Skip the load balancing if there's only one server
		return $this->getWriterIndex();
	} elseif ( $group === false && $this->readIndex >= 0 ) {
		// Shortcut if the generic reader index was already cached
		return $this->readIndex;
	}

	if ( $group !== false ) {
		// Use the server weight array for this load group
		if ( isset( $this->groupLoads[$group] ) ) {
			$loads = $this->groupLoads[$group];
		} else {
			// No loads for this group, return false and the caller can use some other group
			$this->connLogger->info( __METHOD__ . ": no loads for group $group" );

			return false;
		}
	} else {
		// Use the generic load group
		$loads = $this->loads;
	}

	// Scale the configured load ratios according to each server's load and state
	$this->getLoadMonitor()->scaleLoads( $loads, $domain );

	// Pick a server to use, accounting for weights, load, lag, and "waitForPos"
	list( $i, $laggedReplicaMode ) = $this->pickReaderIndex( $loads, $domain );
	if ( $i === false ) {
		// Replica DB connection unsuccessful
		return false;
	}

	if ( $this->waitForPos && $i != $this->getWriterIndex() ) {
		// Before any data queries are run, wait for the server to catch up to the
		// specified position. This is used to improve session consistency. Note that
		// when LoadBalancer::waitFor() sets "waitForPos", the waiting triggers here,
		// so update laggedReplicaMode as needed for consistency.
		if ( !$this->doWait( $i ) ) {
			$laggedReplicaMode = true;
		}
	}

	if ( $this->readIndex <= 0 && $this->loads[$i] > 0 && $group === false ) {
		// Cache the generic reader index for future ungrouped DB_REPLICA handles
		$this->readIndex = $i;
		// Record if the generic reader index is in "lagged replica DB" mode
		if ( $laggedReplicaMode ) {
			$this->laggedReplicaMode = true;
		}
	}

	$serverName = $this->getServerName( $i );
	$this->connLogger->debug( __METHOD__ . ": using server $serverName for group '$group'" );

	return $i;
}
402
/**
 * Choose a reader from $loads and make sure a connection to it can be opened.
 *
 * Servers that cannot be connected to are removed from the candidate set and the
 * choice is retried until a server works or no candidates remain.
 *
 * @param array $loads Map of (server index => load weight); must not be empty
 * @param string|bool $domain Domain ID, or false for the local domain
 * @return array (server index or false, whether a lagged server was knowingly picked)
 * @throws InvalidArgumentException If $loads is empty
 */
private function pickReaderIndex( array $loads, $domain = false ) {
	if ( $loads === [] ) {
		throw new InvalidArgumentException( "Empty server array given to LoadBalancer" );
	}

	$i = false;
	$laggedReplicaMode = false;

	// Work on a copy so that failed servers can be struck off between attempts
	$candidates = $loads;
	while ( $candidates !== [] ) {
		if ( $this->allowLagged || $laggedReplicaMode ) {
			// Lag does not matter (any more): take any candidate
			$i = ArrayUtils::pickRandom( $candidates );
		} else {
			$i = false;
			if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
				// A position with a known timestamp is pending, so doWait() will run
				// after connecting. Prefer servers whose lag keeps that wait to about
				// one second; being stricter than that can backfire.
				$ago = microtime( true ) - $this->waitForPos->asOfTime();
				$i = $this->getRandomNonLagged( $candidates, $domain, $ago + 1 );
			}
			if ( $i === false ) {
				// Fall back to any server within its own 'max lag' limit
				$i = $this->getRandomNonLagged( $candidates, $domain );
			}
			if ( $i === false && $candidates !== [] ) {
				// Every replica DB is lagged: pick one anyway and flag read-only mode
				$this->replLogger->error(
					__METHOD__ . ": all replica DBs lagged. Switch to read-only mode" );
				$i = ArrayUtils::pickRandom( $candidates );
				$laggedReplicaMode = true;
			}
		}

		if ( $i === false ) {
			// pickRandom() gave up. This is permanent: the configuration or the load
			// monitor wants the caller to get false.
			$this->connLogger->debug( __METHOD__ . ": pickRandom() returned false" );

			return [ false, false ];
		}

		$serverName = $this->getServerName( $i );
		$this->connLogger->debug( __METHOD__ . ": Using reader #$i: $serverName..." );

		$conn = $this->openConnection( $i, $domain );
		if ( $conn ) {
			// Foreign handles are reference counted; release the reference taken by
			// this probe, since the caller will take its own later.
			if ( $domain !== false ) {
				$this->reuseConnection( $conn );
			}

			// This server works, so stop looking
			break;
		}

		$this->connLogger->warning( __METHOD__ . ": Failed connecting to $i/$domain" );
		unset( $candidates[$i] ); // avoid this server next iteration
		$i = false;
	}

	// Running out of candidates means that every server was down
	if ( $candidates === [] ) {
		$this->connLogger->error( __METHOD__ . ": all servers down" );
	}

	return [ $i, $laggedReplicaMode ];
}
483
/**
 * Set the position to wait for and wait on the generic reader if one is already chosen.
 *
 * Afterwards the previous position is put back if it is at least as far along as the
 * new one, since the stored position is used for lag protection.
 *
 * @param DBMasterPos|bool $pos Position to wait for
 */
public function waitFor( $pos ) {
	$previousPos = $this->waitForPos;
	try {
		$this->waitForPos = $pos;
		// Only wait right now when a generic replica reader was already picked
		$readerIndex = $this->readIndex;
		if ( $readerIndex > 0 ) {
			if ( !$this->doWait( $readerIndex ) ) {
				$this->laggedReplicaMode = true;
			}
		}
	} finally {
		$this->setWaitForPositionIfHigher( $previousPos );
	}
}
498
/**
 * Wait for a single replica DB to reach the given position.
 *
 * Uses the cached generic reader when there is one, otherwise a random replica DB with
 * non-zero load. The stored wait position is restored afterwards in every case.
 *
 * @param DBMasterPos|bool $pos Position to wait for
 * @param int|null $timeout Timeout passed to doWait(); null for the default
 * @return bool Result of doWait(), or true when no replica DB qualifies
 */
public function waitForOne( $pos, $timeout = null ) {
	$previousPos = $this->waitForPos;
	try {
		$this->waitForPos = $pos;

		$index = $this->readIndex;
		if ( $index <= 0 ) {
			// No generic replica DB yet: choose among replicas that carry load
			$replicaLoads = $this->loads;
			unset( $replicaLoads[$this->getWriterIndex()] );
			$index = ArrayUtils::pickRandom( array_filter( $replicaLoads ) );
		}

		// Without an applicable replica DB there is nothing to wait on
		$ok = ( $index > 0 ) ? $this->doWait( $index, true, $timeout ) : true;
	} finally {
		# This wait is for throttling, not lag protection, so always restore the position
		$this->waitForPos = $previousPos;
	}

	return $ok;
}
525
/**
 * Wait for every replica DB with non-zero load to reach the given position.
 *
 * The timeout is a budget shared by all servers: whole seconds spent on one server are
 * deducted before the next, and the loop stops once nothing is left.
 *
 * @param DBMasterPos|bool $pos Position to wait for
 * @param int|null $timeout Total timeout; a falsy value selects the instance wait timeout
 * @return bool True only if every server that was waited on reported success
 */
public function waitForAll( $pos, $timeout = null ) {
	$timeout = $timeout ?: $this->waitTimeout;

	$previousPos = $this->waitForPos;
	try {
		$this->waitForPos = $pos;

		$ok = true;
		$serverCount = count( $this->servers );
		// Index 0 is the master, so start at the first replica DB
		for ( $index = 1; $index < $serverCount; $index++ ) {
			if ( !( $this->loads[$index] > 0 ) ) {
				continue;
			}

			$start = microtime( true );
			if ( !$this->doWait( $index, true, $timeout ) ) {
				$ok = false;
			}
			$timeout -= intval( microtime( true ) - $start );
			if ( $timeout <= 0 ) {
				break; // timeout reached
			}
		}
	} finally {
		# This wait is for throttling, not lag protection, so always restore the position
		$this->waitForPos = $previousPos;
	}

	return $ok;
}
552
/**
 * Adopt $pos as the wait position unless the current one is further along.
 *
 * @param DBMasterPos|bool $pos Candidate position; a falsy value is ignored
 */
private function setWaitForPositionIfHigher( $pos ) {
	if ( $pos && ( !$this->waitForPos || $pos->hasReached( $this->waitForPos ) ) ) {
		$this->waitForPos = $pos;
	}
}
565
/**
 * Find an already open connection to a server, across all connection pools.
 *
 * @param int $i Server index, DB_MASTER, or DB_REPLICA (meaning any server index)
 * @param int $flags Bitfield; with CONN_TRX_AUTOCOMMIT only handles flagged as
 *  "autoCommitOnly" qualify
 * @return IDatabase|bool An open handle, or false if there is none
 */
public function getAnyOpenConnection( $i, $flags = 0 ) {
	if ( $i === self::DB_MASTER ) {
		$i = $this->getWriterIndex();
	}
	$autocommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );

	foreach ( $this->conns as $connsByServer ) {
		if ( $i === self::DB_REPLICA ) {
			// Any server will do, so look at all of them
			$serverIndexes = array_keys( $connsByServer );
		} elseif ( isset( $connsByServer[$i] ) ) {
			$serverIndexes = [ $i ];
		} else {
			$serverIndexes = [];
		}

		foreach ( $serverIndexes as $serverIndex ) {
			foreach ( $connsByServer[$serverIndex] as $candidate ) {
				// Closed handles (e.g. after some error) are skipped
				if (
					$candidate->isOpen() &&
					( !$autocommit || $candidate->getLBInfo( 'autoCommitOnly' ) )
				) {
					return $candidate;
				}
			}
		}
	}

	return false;
}
591
/**
 * Wait for a given replica DB to catch up to the stored wait position.
 *
 * A position that a server is known to have reached is remembered in the server cache,
 * so that later calls for the same or an older position return without a query.
 *
 * @param int $index Server index
 * @param bool $open Whether to open a connection if none is open yet; such a connection
 *  is closed again before returning
 * @param int|null $timeout Max seconds to wait (at least 1); a falsy value selects the
 *  instance wait timeout
 * @return bool True if the position was reached; false on timeout, on error, or when no
 *  connection was available
 */
protected function doWait( $index, $open = false, $timeout = null ) {
	$timeout = max( 1, intval( $timeout ?: $this->waitTimeout ) );

	// Check if we already know that the DB has reached this point
	$server = $this->getServerName( $index );
	$key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $server, 'v1' );
	/** @var DBMasterPos $knownReachedPos */
	$knownReachedPos = $this->srvCache->get( $key );
	if (
		$knownReachedPos instanceof DBMasterPos &&
		$knownReachedPos->hasReached( $this->waitForPos )
	) {
		// The position goes in the context array: the message is single-quoted, so a
		// variable written inside it would be logged as literal text.
		$this->replLogger->debug(
			__METHOD__ .
			': replica DB {dbserver} known to be caught up (pos >= {pos}).',
			[ 'dbserver' => $server, 'pos' => $knownReachedPos ]
		);
		return true;
	}

	// Find a connection to wait on, creating one if needed and allowed
	$close = false; // close the connection afterwards
	$conn = $this->getAnyOpenConnection( $index );
	if ( !$conn ) {
		if ( !$open ) {
			$this->replLogger->debug(
				__METHOD__ . ': no connection open for {dbserver}',
				[ 'dbserver' => $server ]
			);

			return false;
		} else {
			$conn = $this->openConnection( $index, self::DOMAIN_ANY );
			if ( !$conn ) {
				$this->replLogger->warning(
					__METHOD__ . ': failed to connect to {dbserver}',
					[ 'dbserver' => $server ]
				);

				return false;
			}
			// Avoid connection spam in waitForAll() when connections
			// are made just for the sake of doing this lag check.
			$close = true;
		}
	}

	$this->replLogger->info(
		__METHOD__ .
		': waiting for replica DB {dbserver} to catch up...',
		[ 'dbserver' => $server ]
	);

	$result = $conn->masterPosWait( $this->waitForPos, $timeout );

	if ( $result === null ) {
		$this->replLogger->warning(
			__METHOD__ . ': Errored out waiting on {host} pos {pos}',
			[
				'host' => $server,
				'pos' => $this->waitForPos,
				'trace' => ( new RuntimeException() )->getTraceAsString()
			]
		);
		$ok = false;
	} elseif ( $result == -1 ) {
		$this->replLogger->warning(
			__METHOD__ . ': Timed out waiting on {host} pos {pos}',
			[
				'host' => $server,
				'pos' => $this->waitForPos,
				'trace' => ( new RuntimeException() )->getTraceAsString()
			]
		);
		$ok = false;
	} else {
		$this->replLogger->debug( __METHOD__ . ": done waiting" );
		$ok = true;
		// Remember that the DB reached this point
		$this->srvCache->set( $key, $this->waitForPos, BagOStuff::TTL_DAY );
	}

	if ( $close ) {
		$this->closeConnection( $conn );
	}

	return $ok;
}
687
/**
 * Get a connection handle by server index, DB_MASTER, or DB_REPLICA.
 *
 * @param int $i Server index, DB_MASTER, or DB_REPLICA
 * @param array|string|bool $groups Query group(s) to try in order; false or [] selects
 *  the default group if one is configured, else the generic group
 * @param string|bool $domain Domain ID, or false for the local domain
 * @param int $flags Bitfield of CONN_* constants
 * @return IDatabase A handle; failures are reported through reportConnectionError()
 * @throws InvalidArgumentException If $i is null or false
 */
public function getConnection( $i, $groups = [], $domain = false, $flags = 0 ) {
	if ( $i === null || $i === false ) {
		throw new InvalidArgumentException( 'Attempt to call ' . __METHOD__ .
			' with invalid server index' );
	}

	$isLocalDomain =
		$this->localDomain->equals( $domain ) || $domain === $this->localDomainIdAlias;
	if ( $isLocalDomain ) {
		$domain = false; // local connection requested
	}

	$wantsAutoCommit =
		( ( $flags & self::CONN_TRX_AUTOCOMMIT ) === self::CONN_TRX_AUTOCOMMIT );
	if ( $wantsAutoCommit ) {
		// The attributes are read from the master entry on the assumption that all
		// servers are of the same (or a similar) type. $i itself cannot be used because
		// it may be DB_REPLICA, which takes connection attempts to resolve.
		$attributes = $this->getServerAttributes( $this->getWriterIndex() );
		if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
			// With database-level locking (e.g. sqlite) a separate auto-commit
			// connection gives none of the usual benefits (escaping REPEATABLE-READ
			// snapshots, small commits inside bigger transactions) and just
			// self-deadlocks, so drop the flag.
			$flags &= ~self::CONN_TRX_AUTOCOMMIT;
			$this->connLogger->info( __METHOD__ .
				': ignoring CONN_TRX_AUTOCOMMIT to avoid deadlocks.' );
		}
	}

	if ( $groups === false || $groups === [] ) {
		// By default only one "group" is checked: the default or generic pool
		$groups = $this->defaultGroup ? [ $this->defaultGroup ] : [ false ];
	} else {
		$groups = (array)$groups;
	}

	$masterOnly = ( $i == self::DB_MASTER || $i == $this->getWriterIndex() );
	$connsOpenedBefore = $this->connsOpened;

	if ( $i == self::DB_MASTER ) {
		$i = $this->getWriterIndex();
	} elseif ( $i == self::DB_REPLICA ) {
		# Take the first query group (in order) that yields a usable server
		foreach ( $groups as $group ) {
			$groupIndex = $this->getReaderIndex( $group, $domain );
			if ( $groupIndex !== false ) {
				$i = $groupIndex;
				break;
			}
		}
	}

	# Still unresolved: none of the requested groups had a server
	if ( $i == self::DB_REPLICA ) {
		$this->lastError = 'Unknown error'; // reset error string
		# Fall back to the generic pool, unless that is exactly what was just tried
		if ( $groups === [ false ] ) {
			$i = false;
		} else {
			$i = $this->getReaderIndex( false, $domain );
		}
		if ( $i === false ) {
			$this->lastError = 'No working replica DB server: ' . $this->lastError;
			// Throw an exception
			$this->reportConnectionError();
			return null; // not reached
		}
	}

	# $i is now an explicit index into the servers array
	$conn = $this->openConnection( $i, $domain, $flags );
	if ( !$conn ) {
		// Throw an exception
		$this->reportConnectionError();
		return null; // not reached
	}

	# Profile the connection if this call actually opened a new one
	if ( $this->connsOpened > $connsOpenedBefore ) {
		$this->trxProfiler->recordConnection(
			$conn->getServer(), $conn->getDBname(), $masterOnly );
	}

	if ( $masterOnly ) {
		# Handles requested for the master inherit any read-only mode setting
		$conn->setLBInfo( 'readOnlyReason', $this->getReadOnlyReason( $domain, $conn ) );
	}

	return $conn;
}
776
/**
 * Release one reference to a foreign-domain connection.
 *
 * When the reference count drops to zero the handle is moved from the in-use pool to
 * the free pool. Handles without "serverIndex" or "foreignPoolRefCount" info are ignored.
 *
 * @param IDatabase $conn Live handle obtained from this load balancer
 * @throws InvalidArgumentException If the handle is not the one tracked as in use for
 *  its server and domain
 */
public function reuseConnection( IDatabase $conn ) {
	$serverIndex = $conn->getLBInfo( 'serverIndex' );
	$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
	if ( $serverIndex === null || $refCount === null ) {
		// Not a reference counted handle, so there is nothing to release
		return;
	}
	if ( $conn instanceof DBConnRef ) {
		// DBConnRef calls this method itself with the live Database instance, so a
		// caller that passes the DBConnRef wrapper is broken.
		$this->connLogger->error(
			__METHOD__ . ": got DBConnRef instance.\n" .
			( new RuntimeException() )->getTraceAsString() );

		return;
	}

	if ( $this->disabled ) {
		return; // DBConnRef handle probably survived longer than the LoadBalancer
	}

	$autoCommitOnly = $conn->getLBInfo( 'autoCommitOnly' );
	$connFreeKey = $autoCommitOnly ? self::KEY_FOREIGN_FREE_NOROUND : self::KEY_FOREIGN_FREE;
	$connInUseKey = $autoCommitOnly ? self::KEY_FOREIGN_INUSE_NOROUND : self::KEY_FOREIGN_INUSE;

	$domain = $conn->getDomainID();
	if ( !isset( $this->conns[$connInUseKey][$serverIndex][$domain] ) ) {
		throw new InvalidArgumentException( __METHOD__ .
			": connection $serverIndex/$domain not found; it may have already been freed." );
	}
	if ( $this->conns[$connInUseKey][$serverIndex][$domain] !== $conn ) {
		throw new InvalidArgumentException( __METHOD__ .
			": connection $serverIndex/$domain mismatched; it may have already been freed." );
	}

	$conn->setLBInfo( 'foreignPoolRefCount', --$refCount );
	if ( $refCount > 0 ) {
		$this->connLogger->debug( __METHOD__ .
			": reference count for $serverIndex/$domain reduced to $refCount" );

		return;
	}

	// Last reference released: hand the handle over to the free pool
	$this->conns[$connFreeKey][$serverIndex][$domain] = $conn;
	unset( $this->conns[$connInUseKey][$serverIndex][$domain] );
	if ( !$this->conns[$connInUseKey][$serverIndex] ) {
		unset( $this->conns[$connInUseKey][$serverIndex] ); // clean up
	}
	$this->connLogger->debug( __METHOD__ . ": freed connection $serverIndex/$domain" );
}
836
/**
 * Get a connection wrapped in a DBConnRef.
 *
 * @param int $i Server index, DB_MASTER, or DB_REPLICA
 * @param array|string|bool $groups Query group(s)
 * @param string|bool $domain Domain ID, or false for the local domain
 * @param int $flags Bitfield of CONN_* constants
 * @return DBConnRef
 */
public function getConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
	$domain = $this->resolveDomainID( $domain );
	$role = $this->getRoleFromIndex( $i );
	$conn = $this->getConnection( $i, $groups, $domain, $flags );

	return new DBConnRef( $this, $conn, $role );
}
843
/**
 * Get a DBConnRef that carries the connection parameters instead of a live handle.
 *
 * No connection is opened by this method itself.
 *
 * @param int $i Server index, DB_MASTER, or DB_REPLICA
 * @param array|string|bool $groups Query group(s)
 * @param string|bool $domain Domain ID, or false for the local domain
 * @param int $flags Bitfield of CONN_* constants
 * @return DBConnRef
 */
public function getLazyConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
	$domain = $this->resolveDomainID( $domain );
	$role = $this->getRoleFromIndex( $i );
	$connParams = [ $i, $groups, $domain, $flags ];

	return new DBConnRef( $this, $connParams, $role );
}
850
/**
 * Get a connection wrapped in a MaintainableDBConnRef.
 *
 * @param int $i Server index, DB_MASTER, or DB_REPLICA
 * @param array|string|bool $groups Query group(s)
 * @param string|bool $domain Domain ID, or false for the local domain
 * @param int $flags Bitfield of CONN_* constants
 * @return MaintainableDBConnRef
 */
public function getMaintenanceConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
	$domain = $this->resolveDomainID( $domain );
	$role = $this->getRoleFromIndex( $i );
	$conn = $this->getConnection( $i, $groups, $domain, $flags );

	return new MaintainableDBConnRef( $this, $conn, $role );
}
858
/**
 * Map a requested server index to the role handed to connection reference wrappers.
 *
 * @param int $i Server index, DB_MASTER, or DB_REPLICA
 * @return int DB_MASTER when $i is DB_MASTER or the writer index, else DB_REPLICA
 */
private function getRoleFromIndex( $i ) {
	return ( $i === self::DB_MASTER || $i === $this->getWriterIndex() )
		? self::DB_MASTER
		: self::DB_REPLICA;
}
868
/**
 * Open, or reuse, a connection to one specific server.
 *
 * On the first call the chronology callback (if any) is run before connecting.
 *
 * @param int $i Server index
 * @param string|bool $domain Domain ID, or false for the local domain
 * @param int $flags Bitfield of CONN_* constants
 * @return IDatabase|bool The handle, or false if connecting failed or the connection
 *  was lost
 * @throws DBUnexpectedError If a CONN_TRX_AUTOCOMMIT handle has a transaction open
 */
public function openConnection( $i, $domain = false, $flags = 0 ) {
	$isLocalDomain =
		$this->localDomain->equals( $domain ) || $domain === $this->localDomainIdAlias;
	if ( $isLocalDomain ) {
		$domain = false; // local connection requested
	}

	if ( !$this->connectionAttempted && $this->chronologyCallback ) {
		$this->connLogger->debug( __METHOD__ . ': calling initLB() before first connection.' );
		// Loading "waitFor" positions has to happen before connecting, so that
		// doWait() gets triggered. The flag is set first so that this runs only once.
		$this->connectionAttempted = true;
		( $this->chronologyCallback )( $this );
	}

	// Auto-commit connections live in their own pools rather than the main set, since
	// handles in the main set:
	// a) are usually set to implicitly use transaction rounds via DBO_TRX
	// b) must support the use of explicit transaction rounds via beginMasterChanges()
	$autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );

	$conn = ( $domain !== false )
		? $this->openForeignConnection( $i, $domain, $flags )
		: $this->openLocalConnection( $i, $flags );

	if ( $conn instanceof IDatabase && !$conn->isOpen() ) {
		// The connection was made but later lost for good (see DatabaseMysqlBase::ping()
		// for how this can happen). Returning it would only lead to exceptions on use;
		// returning false lets the caller (e.g. getReaderIndex) try another server.
		$this->errorConnection = $conn;
		$conn = false;
	}

	if ( $autoCommit && $conn instanceof IDatabase ) {
		if ( $conn->trxLevel() ) { // sanity
			throw new DBUnexpectedError(
				$conn,
				__METHOD__ . ': CONN_TRX_AUTOCOMMIT handle has a transaction.'
			);
		}

		$conn->clearFlag( $conn::DBO_TRX ); // auto-commit mode
	}

	return $conn;
}
917
/**
 * Open, or reuse, the connection to a server for the local domain.
 *
 * Each pool holds at most one local handle per server, stored under key 0.
 *
 * @param int $i Server index
 * @param int $flags Bitfield of CONN_* constants
 * @return IDatabase|bool The handle, or false if connecting failed
 * @throws InvalidArgumentException If there is no server with index $i
 * @throws UnexpectedValueException If the handle is on a domain incompatible with the
 *  local one
 */
private function openLocalConnection( $i, $flags = 0 ) {
	$autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
	$poolKey = $autoCommit ? self::KEY_LOCAL_NOROUND : self::KEY_LOCAL;

	if ( isset( $this->conns[$poolKey][$i][0] ) ) {
		$conn = $this->conns[$poolKey][$i][0];
	} else {
		if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
			throw new InvalidArgumentException( "No server with index '$i'." );
		}
		// Nothing pooled yet, so connect using the server config plus LB bookkeeping
		$serverInfo = $this->servers[$i];
		$serverInfo['serverIndex'] = $i;
		$serverInfo['autoCommitOnly'] = $autoCommit;
		$conn = $this->reallyOpenConnection( $serverInfo, $this->localDomain );
		$host = $this->getServerName( $i );
		if ( !$conn->isOpen() ) {
			$this->connLogger->warning(
				__METHOD__ . ": failed to connect to database $i at '$host'." );
			$this->errorConnection = $conn;
			$conn = false;
		} else {
			$this->connLogger->debug(
				__METHOD__ . ": connected to database $i at '$host'." );
			$this->conns[$poolKey][$i][0] = $conn;
		}
	}

	// Final sanity check to make sure the right domain is selected
	if ( $conn instanceof IDatabase ) {
		if ( !$this->localDomain->isCompatible( $conn->getDomainID() ) ) {
			throw new UnexpectedValueException(
				"Got connection to '{$conn->getDomainID()}', " .
				"but expected local domain ('{$this->localDomain}')." );
		}
	}

	return $conn;
}
970
/**
 * Open, or reuse, a reference counted connection to a server for a foreign domain.
 *
 * Candidates are tried in this order: the in-use handle for the same domain, a free
 * handle for the same domain, a free handle for another domain that can be switched
 * over, and finally a newly opened connection. Each successful call adds one reference
 * to the handle; reuseConnection() releases it.
 *
 * @param int $i Server index
 * @param string $domain Domain ID to open
 * @param int $flags Bitfield of CONN_* constants
 * @return IDatabase|bool The handle, or false if connecting failed
 * @throws InvalidArgumentException If there is no server with index $i
 * @throws UnexpectedValueException If the handle ends up on an incompatible domain
 */
private function openForeignConnection( $i, $domain, $flags = 0 ) {
	$domainInstance = DatabaseDomain::newFromId( $domain );
	$autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );

	if ( $autoCommit ) {
		$connFreeKey = self::KEY_FOREIGN_FREE_NOROUND;
		$connInUseKey = self::KEY_FOREIGN_INUSE_NOROUND;
	} else {
		$connFreeKey = self::KEY_FOREIGN_FREE;
		$connInUseKey = self::KEY_FOREIGN_INUSE;
	}

	/** @var Database|null|bool $conn */
	$conn = null;

	if ( isset( $this->conns[$connInUseKey][$i][$domain] ) ) {
		// Reuse an in-use connection for the same domain
		$conn = $this->conns[$connInUseKey][$i][$domain];
		$this->connLogger->debug( __METHOD__ . ": reusing connection $i/$domain" );
	} elseif ( isset( $this->conns[$connFreeKey][$i][$domain] ) ) {
		// Reuse a free connection for the same domain
		$conn = $this->conns[$connFreeKey][$i][$domain];
		unset( $this->conns[$connFreeKey][$i][$domain] );
		$this->conns[$connInUseKey][$i][$domain] = $conn;
		$this->connLogger->debug( __METHOD__ . ": reusing free connection $i/$domain" );
	} elseif ( !empty( $this->conns[$connFreeKey][$i] ) ) {
		// Reuse a free connection from another domain if possible.
		// A separate loop variable is used so that $conn stays null unless a handle
		// is really taken over; otherwise a skipped handle would be treated as the
		// result and no new connection would be opened.
		foreach ( $this->conns[$connFreeKey][$i] as $oldDomain => $freeConn ) {
			if ( $domainInstance->getDatabase() !== null ) {
				// Check if changing the database will require a new connection.
				// In that case, leave the connection handle alone and keep looking.
				// This prevents connections from being closed mid-transaction and can
				// also avoid overhead if the same database will later be requested.
				if (
					$freeConn->databasesAreIndependent() &&
					$freeConn->getDBname() !== $domainInstance->getDatabase()
				) {
					continue;
				}
				// Select the new database, schema, and prefix
				$conn = $freeConn;
				$conn->selectDomain( $domainInstance );
			} else {
				// Stay on the current database, but update the schema/prefix
				$conn = $freeConn;
				$conn->dbSchema( $domainInstance->getSchema() );
				$conn->tablePrefix( $domainInstance->getTablePrefix() );
			}
			unset( $this->conns[$connFreeKey][$i][$oldDomain] );
			// Note that if $domain is an empty string, getDomainID() might not match it
			$this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
			$this->connLogger->debug( __METHOD__ .
				": reusing free connection from $oldDomain for $domain" );
			break;
		}
	}

	if ( !$conn ) {
		if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
			throw new InvalidArgumentException( "No server with index '$i'." );
		}
		// Open a new connection
		$server = $this->servers[$i];
		$server['serverIndex'] = $i;
		$server['foreignPoolRefCount'] = 0;
		$server['foreign'] = true;
		$server['autoCommitOnly'] = $autoCommit;
		$conn = $this->reallyOpenConnection( $server, $domainInstance );
		if ( !$conn->isOpen() ) {
			$this->connLogger->warning( __METHOD__ . ": connection error for $i/$domain" );
			$this->errorConnection = $conn;
			$conn = false;
		} else {
			// Note that if $domain is an empty string, getDomainID() might not match it
			$this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
			$this->connLogger->debug( __METHOD__ . ": opened new connection for $i/$domain" );
		}
	}

	if ( $conn instanceof IDatabase ) {
		// Final sanity check to make sure the right domain is selected
		if ( !$domainInstance->isCompatible( $conn->getDomainID() ) ) {
			throw new UnexpectedValueException(
				"Got connection to '{$conn->getDomainID()}', but expected '$domain'." );
		}
		// Increment reference count
		$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
		$conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 );
	}

	return $conn;
}
1083
/**
 * Get the static attributes of the database type used by a server.
 *
 * The lookup uses only the server's configured type and optional "driver" setting.
 * NOTE(review): the call to Database::attributesFromType() was missing from this
 * listing and was reconstructed; confirm the helper name.
 *
 * @param int $i Server index
 * @return array Attribute map, keyed by Database::ATTR_* constants
 */
public function getServerAttributes( $i ) {
	return Database::attributesFromType(
		$this->getServerType( $i ),
		$this->servers[$i]['driver'] ?? null
	);
}
1090
/**
 * Check whether any connection to the given server index is currently open.
 *
 * @param mixed $index Server index; anything that is not an integer yields false
 * @return bool
 */
private function isOpen( $index ) {
	// Only integer indexes are looked up; everything else is "not open"
	return is_int( $index ) && (bool)$this->getAnyOpenConnection( $index );
}
1105
/**
 * Build a database handle for a server config array and a database domain.
 *
 * The server array is augmented with the domain (DB name, schema, prefix), the
 * cluster master name, and this load balancer's loggers, profilers, cache and
 * agent before being handed to Database::factory().
 *
 * @param array $server Server config; 'type' and 'serverIndex' are read here
 * @param DatabaseDomain $domain Domain to select on the new handle
 * @return Database Handle from Database::factory(), or the handle carried by the
 *   DBConnectionError when the factory call failed
 * @throws DBAccessError If this load balancer has been disabled
 */
protected function reallyOpenConnection( array $server, DatabaseDomain $domain ) {
	if ( $this->disabled ) {
		throw new DBAccessError();
	}

	$dbName = $domain->getDatabase();
	if ( $dbName !== null ) {
		$server['dbname'] = $dbName;
	} elseif ( $server['type'] === 'mysql' ) {
		// The domain names no database. Other DB systems keep the default DB name from
		// the server config, since they may require a valid DB on connection. For MySQL,
		// DATABASE and SCHEMA are synonyms, connections need not specify a DB, and the
		// configured DB name might not exist for legacy reasons (the default domain used
		// to ignore the local LB domain, even when mismatched).
		$server['dbname'] = null;
	}

	$schema = $domain->getSchema();
	if ( $schema !== null ) {
		$server['schema'] = $schema;
	}

	// Any table prefix is acceptable at connection time, including the empty string
	$server['tablePrefix'] = $domain->getTablePrefix();

	// Tell the handle which host is the cluster master (e.g. "db1052")
	$masterName = $this->getServerName( $this->getWriterIndex() );
	$server['clusterMasterHost'] = $masterName;

	// Warn once a request has opened many connections
	++$this->connsOpened;
	if ( $this->connsOpened >= self::CONN_HELD_WARN_THRESHOLD ) {
		$this->perfLogger->warning( __METHOD__ . ": " .
			"{$this->connsOpened}+ connections made (master=$masterName)" );
	}

	// Share the cache, loggers, profilers, agent and PHP mode with every DB handle
	$server = array_replace( $server, [
		'srvCache' => $this->srvCache,
		'connLogger' => $this->connLogger,
		'queryLogger' => $this->queryLogger,
		'errorLogger' => $this->errorLogger,
		'deprecationLogger' => $this->deprecationLogger,
		'profiler' => $this->profiler,
		'trxProfiler' => $this->trxProfiler,
		'cliMode' => $this->cliMode,
		'agent' => $this->agent,
	] );
	// LoadBalancer-managed handles default to DBO_DEFAULT. This assumes the application
	// calls LoadBalancer::commitMasterChanges() before the PHP script completes.
	if ( !isset( $server['flags'] ) ) {
		$server['flags'] = IDatabase::DBO_DEFAULT;
	}

	// Create the live connection object
	try {
		$db = Database::factory( $server['type'], $server );
	} catch ( DBConnectionError $e ) {
		// Long-standing hack: on connection failure, keep using the (unconnected)
		// handle that the exception carries so that callers can inspect it.
		$db = $e->db;
	}

	$db->setLBInfo( $server );
	$db->setLazyMasterHandle(
		$this->getLazyConnectionRef( self::DB_MASTER, [], $db->getDomainID() )
	);
	$db->setTableAliases( $this->tableAliases );
	$db->setIndexAliases( $this->indexAliases );

	// Master handles join any active transaction round and get the recurring listeners
	if ( $server['serverIndex'] === $this->getWriterIndex() ) {
		if ( $this->trxRoundId !== false ) {
			$this->applyTransactionRoundFlags( $db );
		}
		foreach ( $this->trxRecurringCallbacks as $listenerName => $listener ) {
			$db->setTransactionListener( $listenerName, $listener );
		}
	}

	return $db;
}
1195
/**
 * Log the most recent connection failure and throw it as a DBConnectionError.
 *
 * @throws DBConnectionError Always
 */
private function reportConnectionError() {
	$failedConn = $this->errorConnection; // the connection which caused the error
	$context = [
		'method' => __METHOD__,
		'last_error' => $this->lastError,
	];

	if ( !( $failedConn instanceof IDatabase ) ) {
		// No last connection, probably due to all servers being too busy
		$this->connLogger->error(
			__METHOD__ .
			": LB failure with no last connection. Connection error: {last_error}",
			$context
		);

		// If all servers were busy, "lastError" will contain something sensible
		throw new DBConnectionError( null, $this->lastError );
	}

	$dbServer = $failedConn->getServer();
	$context['db_server'] = $dbServer;
	$this->connLogger->warning(
		__METHOD__ . ": connection error: {last_error} ({db_server})",
		$context
	);

	throw new DBConnectionError( $failedConn, "{$this->lastError} ({$dbServer})" );
}
1226
/**
 * Get the index of the server treated as the master (writer).
 *
 * @return int Always 0 in this implementation
 */
public function getWriterIndex() {
	return 0;
}
1230
/**
 * Check whether a server index is present in the server configuration.
 *
 * @param int $i Server index
 * @return bool
 */
public function haveIndex( $i ) {
	$known = array_key_exists( $i, $this->servers );

	return $known;
}
1234
/**
 * Check whether a server index is configured and has a non-zero load.
 *
 * @param int $i Server index
 * @return bool
 */
public function isNonZeroLoad( $i ) {
	if ( !array_key_exists( $i, $this->servers ) ) {
		return false;
	}

	return $this->loads[$i] != 0;
}
1238
/**
 * Get the number of configured servers.
 *
 * @return int
 */
public function getServerCount() {
	$total = count( $this->servers );

	return $total;
}
1242
/**
 * Get the display name of a server: its 'hostName' entry, else its 'host'
 * entry, else 'localhost' when neither yields a non-empty value.
 *
 * @param int $i Server index
 * @return string
 */
public function getServerName( $i ) {
	$hostName = $this->servers[$i]['hostName'] ?? ( $this->servers[$i]['host'] ?? '' );
	if ( $hostName == '' ) {
		return 'localhost';
	}

	return $hostName;
}
1248
/**
 * Get the configuration array of a server.
 *
 * @param int $i Server index
 * @return array|bool The server config, or false if it is not set
 */
public function getServerInfo( $i ) {
	return isset( $this->servers[$i] ) ? $this->servers[$i] : false;
}
1252
/**
 * Get the DB type of the server with the specified index.
 *
 * @param int $i Server index
 * @return string The configured 'type', or 'unknown' if it is not set
 */
public function getServerType( $i ) {
	return isset( $this->servers[$i]['type'] ) ? $this->servers[$i]['type'] : 'unknown';
}
1256
/**
 * Get the current master position for chronology control purposes.
 *
 * Uses an already-open master connection when there is one. Otherwise the
 * position is read from the first replica DB that has an open connection.
 *
 * @return mixed The position from getMasterPos()/getReplicaPos(), or false if
 *   no connection is open at all
 */
public function getMasterPos() {
	$masterConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
	if ( $masterConn ) {
		return $masterConn->getMasterPos();
	}

	// The entire request was served from replica DBs without opening a connection to
	// the master (however unlikely that may be), so fetch the position from a replica.
	$serverCount = count( $this->servers );
	for ( $index = 1; $index < $serverCount; $index++ ) {
		$replicaConn = $this->getAnyOpenConnection( $index );
		if ( $replicaConn ) {
			return $replicaConn->getReplicaPos();
		}
	}

	return false;
}
1275
/**
 * Close every open connection and mark this load balancer as disabled.
 *
 * Once disabled, reallyOpenConnection() refuses to open new handles.
 */
public function disable() {
	// Close first, then flip the flag
	$this->closeAll();
	$this->disabled = true;
}
1280
/**
 * Close all open connections, empty every connection pool, and reset the
 * opened-connection counter.
 */
public function closeAll() {
	$method = __METHOD__;
	$this->forEachOpenConnection( function ( IDatabase $conn ) use ( $method ) {
		$server = $conn->getServer();
		$this->connLogger->debug(
			$method . ": closing connection to database '$server'." );
		$conn->close();
	} );

	// Start over with empty pools, in the same key order as before
	$this->conns = array_fill_keys(
		[
			self::KEY_LOCAL,
			self::KEY_FOREIGN_INUSE,
			self::KEY_FOREIGN_FREE,
			self::KEY_LOCAL_NOROUND,
			self::KEY_FOREIGN_INUSE_NOROUND,
			self::KEY_FOREIGN_FREE_NOROUND
		],
		[]
	);
	$this->connsOpened = 0;
}
1300
/**
 * Close a connection and stop tracking it in the connection pools.
 *
 * @param IDatabase $conn A raw handle (not a DBConnRef wrapper)
 * @throws RuntimeException If a DBConnRef is given
 */
public function closeConnection( IDatabase $conn ) {
	if ( $conn instanceof DBConnRef ) {
		// Avoid calling close() but still leaving the handle in the pool
		throw new RuntimeException( __METHOD__ . ': got DBConnRef instance.' );
	}

	$serverIndex = $conn->getLBInfo( 'serverIndex' );
	foreach ( $this->conns as $type => $connsByServer ) {
		if ( !isset( $connsByServer[$serverIndex] ) ) {
			continue;
		}

		// The inner key identifies the handle within the server's pool. It is not a
		// server index, so the host name must be resolved from $serverIndex.
		foreach ( $connsByServer[$serverIndex] as $connKey => $trackedConn ) {
			if ( $conn === $trackedConn ) {
				$host = $this->getServerName( $serverIndex );
				$this->connLogger->debug(
					__METHOD__ . ": closing connection to database $connKey at '$host'." );
				unset( $this->conns[$type][$serverIndex][$connKey] );
				// Keep the opened-connection counter in step with the pools
				--$this->connsOpened;
				break 2;
			}
		}
	}

	$conn->close();
}
1327
/**
 * Commit master changes, then flush the snapshots of all master and replica
 * DB connections.
 *
 * @param string $fname Caller name
 */
public function commitAll( $fname = __METHOD__ ) {
	// Writes first, then the read snapshots (masters before replicas)
	$this->commitMasterChanges( $fname );
	$this->flushMasterSnapshots( $fname );
	$this->flushReplicaSnapshots( $fname );
}
1333
/**
 * Run pre-commit callbacks and defer execution of post-commit callbacks.
 *
 * The round must be in the "cursory" or "finalized" stage. On success it ends
 * in the "finalized" stage; if a callback throws, it is left in the error stage.
 *
 * @return int Number of pre-commit callback execution attempts
 * @throws DBTransactionError If the round is in the wrong stage
 */
public function finalizeMasterChanges() {
	$this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] );

	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
	$attemptsTotal = 0;
	// Keep going until callbacks stop queueing callbacks on other connections
	do {
		$attempts = 0; // callback execution attempts in this pass
		$this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$attempts ) {
			// Run pre-commit callbacks only; post-commit ones stay suppressed. Any error
			// should cause all (peer) transactions to be rolled back together.
			$attempts += $conn->runOnTransactionPreCommitCallbacks();
		} );
		$attemptsTotal += $attempts;
	} while ( $attempts > 0 );
	// Hold back post-commit callbacks until COMMIT/ROLLBACK happened on all handles
	$this->forEachOpenMasterConnection( function ( Database $conn ) {
		$conn->setTrxEndCallbackSuppression( true );
	} );
	$this->trxRoundStage = self::ROUND_FINALIZED;

	return $attemptsTotal;
}
1357
/**
 * Perform all pre-commit checks for things like replication safety.
 *
 * The round must be in the "finalized" stage. On success it ends in the
 * "approved" stage; if a check throws, it is left in the error stage.
 *
 * @param array $options Supports:
 *   - maxWriteDuration: maximum estimated write duration in seconds (0 or unset = no limit)
 * @throws DBTransactionSizeError If a handle's pending write time exceeds the limit
 * @throws DBTransactionError If the stage is wrong or a handle with pending work was lost
 */
public function approveMasterChanges( array $options ) {
	$this->assertTransactionRoundStage( self::ROUND_FINALIZED );

	$limit = $options['maxWriteDuration'] ?? 0;

	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
	$this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $limit ) {
		// If atomic sections or explicit transactions are still open, some caller must have
		// caught an exception but failed to properly rollback any changes. Detect that and
		// throw an error (causing rollback).
		$conn->assertNoOpenTransactions();
		// Assert that the time to replicate the transaction will be sane.
		// If this fails, then all DB transactions will be rolled back together.
		$time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
		if ( $limit > 0 && $time > $limit ) {
			throw new DBTransactionSizeError(
				$conn,
				"Transaction spent $time second(s) in writes, exceeding the limit of $limit.",
				[ $time, $limit ]
			);
		}
		// If a connection sits idle while slow queries execute on another, that connection
		// may end up dropped before the commit round is reached. Ping servers to detect this.
		if ( $conn->writesOrCallbacksPending() && !$conn->ping() ) {
			throw new DBTransactionError(
				$conn,
				"A connection to the {$conn->getDBname()} database was lost before commit."
			);
		}
	} );
	$this->trxRoundStage = self::ROUND_APPROVED;
}
1390
/**
 * Start an explicit transaction round named after the caller.
 *
 * @param string $fname Caller name; becomes the round ID
 * @throws DBTransactionError If a round is already active or the stage is not "cursory"
 */
public function beginMasterChanges( $fname = __METHOD__ ) {
	$activeRound = $this->trxRoundId;
	if ( $activeRound !== false ) {
		throw new DBTransactionError(
			null,
			"$fname: Transaction round '{$activeRound}' already started."
		);
	}
	$this->assertTransactionRoundStage( self::ROUND_CURSORY );

	// Drop any empty transactions (no writes/callbacks) left from the implicit round
	$this->flushMasterSnapshots( $fname );

	$this->trxRoundId = $fname;
	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
	// Flag the applicable handles as members of this explicit round. Each such handle
	// ties its writes and callbacks to a single transaction, and the (peer) handles
	// reject begin()/commit() calls unless they belong to an en masse commit or an
	// en masse rollback.
	$this->forEachOpenMasterConnection( function ( Database $handle ) {
		$this->applyTransactionRoundFlags( $handle );
	} );
	$this->trxRoundStage = self::ROUND_CURSORY;
}
1414
/**
 * Issue COMMIT on all open master connections as one round.
 *
 * The round must be in the "approved" stage. On success it ends in the
 * "commit-callbacks" stage; if any commit fails, it is left in the error stage.
 *
 * @param string $fname Caller name
 * @throws DBTransactionError If the stage is wrong or any handle failed to commit
 */
public function commitMasterChanges( $fname = __METHOD__ ) {
	$this->assertTransactionRoundStage( self::ROUND_APPROVED );

	$failures = [];

	// Held only for its lifetime: tries to ignore client aborts until this method returns
	$scope = ScopedCallback::newScopedIgnoreUserAbort();

	$restore = ( $this->trxRoundId !== false );
	$this->trxRoundId = false;
	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
	// Commit any writes and clear any snapshots as well (callbacks require AUTOCOMMIT).
	// Note that callbacks should already be suppressed due to finalizeMasterChanges().
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( $fname, &$failures ) {
			try {
				$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
			} catch ( DBError $e ) {
				// Log and remember the failure, but still try the remaining handles
				( $this->errorLogger )( $e );
				$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
			}
		}
	);
	if ( $failures ) {
		throw new DBTransactionError(
			null,
			"$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
		);
	}
	if ( $restore ) {
		// Unmark handles as participating in this explicit transaction round
		$this->forEachOpenMasterConnection( function ( Database $conn ) {
			$this->undoTransactionRoundFlags( $conn );
		} );
	}
	$this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
}
1452
/**
 * Run the deferred post-COMMIT/ROLLBACK callbacks on all open master connections.
 *
 * The round must be in the "commit-callbacks" or "rollback-callbacks" stage, and
 * that stage is restored before returning. Exceptions thrown by callbacks or by
 * the follow-up commits are caught; only the first one is kept.
 *
 * @return Exception|null The first exception caught, if any
 * @throws DBTransactionError If the round is not in a callback stage
 */
public function runMasterTransactionIdleCallbacks() {
	if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
		$type = IDatabase::TRIGGER_COMMIT;
	} elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
		$type = IDatabase::TRIGGER_ROLLBACK;
	} else {
		throw new DBTransactionError(
			null,
			"Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
		);
	}

	$oldStage = $this->trxRoundStage;
	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise

	// Now that the COMMIT/ROLLBACK step is over, enable post-commit callback runs
	$this->forEachOpenMasterConnection( function ( Database $conn ) {
		$conn->setTrxEndCallbackSuppression( false );
	} );

	$e = null; // first exception
	$fname = __METHOD__;
	// Loop until callbacks stop adding callbacks on other connections
	do {
		// Run any pending callbacks for each connection...
		$count = 0; // callback execution attempts
		$this->forEachOpenMasterConnection(
			function ( Database $conn ) use ( $type, &$e, &$count ) {
				if ( $conn->trxLevel() ) {
					return; // retry in the next iteration, after commit() is called
				}
				try {
					$count += $conn->runOnTransactionIdleCallbacks( $type );
				} catch ( Exception $ex ) {
					$e = $e ?: $ex;
				}
			}
		);
		// Clear out any active transactions left over from callbacks...
		$this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$e, $fname ) {
			if ( $conn->writesPending() ) {
				// A callback from another handle wrote to this one and DBO_TRX is set.
				// Logged once, with the responsible callers and the server/DB as context.
				$fnames = implode( ', ', $conn->pendingWriteAndCallbackCallers() );
				$this->queryLogger->warning(
					$fname . ": found writes pending ($fnames).",
					[
						'db_server' => $conn->getServer(),
						'db_name' => $conn->getDBname()
					]
				);
			} elseif ( $conn->trxLevel() ) {
				// A callback from another handle read from this one and DBO_TRX is set,
				// which can easily happen if there is only one DB (no replicas)
				$this->queryLogger->debug( $fname . ": found empty transaction." );
			}
			try {
				$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
			} catch ( Exception $ex ) {
				$e = $e ?: $ex;
			}
		} );
	} while ( $count > 0 );

	$this->trxRoundStage = $oldStage;

	return $e;
}
1521
/**
 * Run the transaction listener callbacks on all open master connections.
 *
 * The round must be in the "commit-callbacks" or "rollback-callbacks" stage. It
 * ends in the "cursory" stage. Exceptions thrown by listeners are caught; only
 * the first one is kept.
 *
 * @return Exception|null The first exception caught, if any
 * @throws DBTransactionError If the round is not in a callback stage
 */
public function runMasterTransactionListenerCallbacks() {
	if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
		$type = IDatabase::TRIGGER_COMMIT;
	} elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
		$type = IDatabase::TRIGGER_ROLLBACK;
	} else {
		throw new DBTransactionError(
			null,
			"Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
		);
	}

	$e = null; // first exception

	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
	$this->forEachOpenMasterConnection( function ( Database $conn ) use ( $type, &$e ) {
		try {
			$conn->runTransactionListenerCallbacks( $type );
		} catch ( Exception $ex ) {
			$e = $e ?: $ex;
		}
	} );
	$this->trxRoundStage = self::ROUND_CURSORY;

	return $e;
}
1548
/**
 * Issue ROLLBACK on all open master connections and end the current round.
 *
 * On success the round ends in the "rollback-callbacks" stage; if a rollback
 * throws, it is left in the error stage.
 *
 * @param string $fname Caller name
 */
public function rollbackMasterChanges( $fname = __METHOD__ ) {
	$hadExplicitRound = ( $this->trxRoundId !== false );
	$this->trxRoundId = false;
	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
	$this->forEachOpenMasterConnection( function ( IDatabase $handle ) use ( $fname ) {
		$handle->rollback( $fname, $handle::FLUSHING_ALL_PEERS );
	} );
	if ( $hadExplicitRound ) {
		// The explicit round is over, so the handles no longer participate in it
		$this->forEachOpenMasterConnection( function ( Database $handle ) {
			$this->undoTransactionRoundFlags( $handle );
		} );
	}
	$this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
}
1564
/**
 * Require the transaction round to be in one of the given stages.
 *
 * @param string|string[] $stage Allowed stage or list of allowed stages
 * @throws DBTransactionError If the current stage is not allowed
 */
private function assertTransactionRoundStage( $stage ) {
	$allowed = (array)$stage;
	if ( in_array( $this->trxRoundStage, $allowed, true ) ) {
		return;
	}

	// Build a quoted, slash-separated list for the error message
	$quoted = [];
	foreach ( $allowed as $name ) {
		$quoted[] = "'$name'";
	}
	$stageList = implode( '/', $quoted );

	throw new DBTransactionError(
		null,
		"Transaction round stage must be $stageList (not '{$this->trxRoundStage}')"
	);
}
1584
/**
 * Make a handle with DBO_DEFAULT/DBO_TRX set join the current transaction round.
 *
 * Handles flagged 'autoCommitOnly' are left untouched.
 *
 * @param Database $conn
 */
private function applyTransactionRoundFlags( Database $conn ) {
	$autoCommitOnly = $conn->getLBInfo( 'autoCommitOnly' );
	if ( $autoCommitOnly ) {
		return; // transaction rounds do not apply to these connections
	}

	$usesDefaultFlags = $conn->getFlag( $conn::DBO_DEFAULT );
	if ( $usesDefaultFlags ) {
		// With DBO_DEFAULT, DBO_TRX is driven purely by CLI mode presence.
		// Turn DBO_TRX on even in CLI mode since a commit round is expected soon.
		$conn->setFlag( $conn::DBO_TRX, $conn::REMEMBER_PRIOR );
	}

	$inTrxMode = $conn->getFlag( $conn::DBO_TRX );
	if ( $inTrxMode ) {
		$conn->setLBInfo( 'trxRoundId', $this->trxRoundId );
	}
}
1609
/**
 * Take a handle out of the transaction round: clear its round ID and restore
 * the flags it had before the round.
 *
 * Handles flagged 'autoCommitOnly' are left untouched.
 *
 * @param Database $conn
 */
private function undoTransactionRoundFlags( Database $conn ) {
	$autoCommitOnly = $conn->getLBInfo( 'autoCommitOnly' );
	if ( $autoCommitOnly ) {
		return; // transaction rounds do not apply to these connections
	}

	$inTrxMode = $conn->getFlag( $conn::DBO_TRX );
	if ( $inTrxMode ) {
		$conn->setLBInfo( 'trxRoundId', false );
	}

	$usesDefaultFlags = $conn->getFlag( $conn::DBO_DEFAULT );
	if ( $usesDefaultFlags ) {
		$conn->restoreFlags( $conn::RESTORE_PRIOR );
	}
}
1626
/**
 * Flush the snapshot of every open replica DB connection.
 *
 * @param string $fname Caller name
 */
public function flushReplicaSnapshots( $fname = __METHOD__ ) {
	$flush = function ( IDatabase $handle ) use ( $fname ) {
		$handle->flushSnapshot( $fname );
	};
	$this->forEachOpenReplicaConnection( $flush );
}
1632
/**
 * Flush the snapshot of every open master connection.
 *
 * @param string $fname Caller name
 */
public function flushMasterSnapshots( $fname = __METHOD__ ) {
	$flush = function ( IDatabase $handle ) use ( $fname ) {
		$handle->flushSnapshot( $fname );
	};
	$this->forEachOpenMasterConnection( $flush );
}
1638
/**
 * Get the stage of the current transaction round in its life-cycle.
 *
 * @return string One of the ROUND_* stage values
 */
public function getTransactionRoundStage() {
	$stage = $this->trxRoundStage;

	return $stage;
}
1646
/**
 * Check whether a connection to the master server is currently open.
 *
 * @return bool
 */
public function hasMasterConnection() {
	$masterIndex = $this->getWriterIndex();

	return $this->isOpen( $masterIndex );
}
1650
/**
 * Check whether any open master connection has pending writes or callbacks.
 *
 * @return bool
 */
public function hasMasterChanges() {
	$anyPending = false;
	$this->forEachOpenMasterConnection( function ( IDatabase $handle ) use ( &$anyPending ) {
		// Query every handle, then fold its answer into the overall result
		$anyPending = $handle->writesOrCallbacksPending() || $anyPending;
	} );

	return $anyPending;
}
1659
/**
 * Get the largest lastDoneWrites() value among the open master connections.
 *
 * @return mixed The maximum value found, or false if there are no open master connections
 */
public function lastMasterChangeTimestamp() {
	$latest = false;
	$this->forEachOpenMasterConnection( function ( IDatabase $handle ) use ( &$latest ) {
		$latest = max( $latest, $handle->lastDoneWrites() );
	} );

	return $latest;
}
1668
/**
 * Check whether master changes are pending, or were made within the last
 * $age seconds.
 *
 * @param float|null $age Window in seconds; null means the configured wait timeout
 * @return bool
 */
public function hasOrMadeRecentMasterChanges( $age = null ) {
	$age = $age ?? $this->waitTimeout;

	if ( $this->hasMasterChanges() ) {
		return true;
	}

	return $this->lastMasterChangeTimestamp() > microtime( true ) - $age;
}
1675
/**
 * Collect the pending write callers reported by all open master connections.
 *
 * @return array Merged results of pendingWriteCallers() from each handle
 */
public function pendingMasterChangeCallers() {
	$callers = [];
	$this->forEachOpenMasterConnection( function ( IDatabase $handle ) use ( &$callers ) {
		$callers = array_merge( $callers, $handle->pendingWriteCallers() );
	} );

	return $callers;
}
1684
/**
 * Check whether the load balancer is in lagged-replica mode.
 *
 * If the mode is not set yet and there is more than one server, a replica
 * connection is requested (and released) so the mode can be determined. A
 * connection failure puts the load balancer into both "all replicas down" and
 * "lagged replica" mode.
 *
 * @param string|bool $domain Domain passed through to getConnection()
 * @return bool Whether lagged-replica mode is set
 */
public function getLaggedReplicaMode( $domain = false ) {
	// No-op if there is only one DB (also avoids recursion)
	if ( !$this->laggedReplicaMode && $this->getServerCount() > 1 ) {
		try {
			// See if laggedReplicaMode gets set
			$conn = $this->getConnection( self::DB_REPLICA, false, $domain );
			$this->reuseConnection( $conn );
		} catch ( DBConnectionError $e ) {
			// Avoid expensive re-connect attempts and failures
			$this->allReplicasDownMode = true;
			$this->laggedReplicaMode = true;
		}
	}

	return $this->laggedReplicaMode;
}
1701
/**
 * Check whether lagged-replica mode is currently set, without opening any
 * connection to find out.
 *
 * @return bool
 */
public function laggedReplicaUsed() {
	return $this->laggedReplicaMode;
}
1705
/**
 * Alias of laggedReplicaUsed() under its older name.
 *
 * @return bool
 */
public function laggedSlaveUsed() {
	$used = $this->laggedReplicaUsed();

	return $used;
}
1714
/**
 * Get the reason why the database should be treated as read-only, if any.
 *
 * Checked in order: the configured read-only reason, lagged-replica mode, and
 * finally whether the master server itself runs in read-only mode.
 *
 * @param string|bool $domain Domain passed through to the checks
 * @param IDatabase|null $conn Master handle to use for the server check, if any
 * @return string|bool Reason text, or false if not read-only
 */
public function getReadOnlyReason( $domain = false, IDatabase $conn = null ) {
	if ( $this->readOnlyReason !== false ) {
		return $this->readOnlyReason;
	}

	if ( $this->getLaggedReplicaMode( $domain ) ) {
		return $this->allReplicasDownMode
			? 'The database has been automatically locked ' .
				'until the replica database servers become available'
			: 'The database has been automatically locked ' .
				'while the replica database servers catch up to the master.';
	}

	if ( $this->masterRunningReadOnly( $domain, $conn ) ) {
		return 'The database master is running in read-only mode.';
	}

	return false;
}
1732
/**
 * Check whether the master server reports itself as read-only.
 *
 * The answer is cached in the WAN cache under a key derived from the master
 * server name, for TTL_CACHE_READONLY seconds. Database errors while checking
 * are treated as "not read-only".
 *
 * @param string|bool $domain Domain used when a master connection has to be fetched
 * @param IDatabase|null $conn Master handle to use instead of fetching one
 * @return bool
 */
private function masterRunningReadOnly( $domain, IDatabase $conn = null ) {
	$cache = $this->wanCache;
	$masterServer = $this->getServerName( $this->getWriterIndex() );

	return (bool)$cache->getWithSetCallback(
		$cache->makeGlobalKey( __CLASS__, 'server-read-only', $masterServer ),
		self::TTL_CACHE_READONLY,
		function () use ( $domain, $conn ) {
			$old = $this->trxProfiler->setSilenced( true );
			try {
				$dbw = $conn ?: $this->getConnection( self::DB_MASTER, [], $domain );
				$readOnly = (int)$dbw->serverIsReadOnly();
				if ( !$conn ) {
					// Only release handles that were fetched here
					$this->reuseConnection( $dbw );
				}
			} catch ( DBError $e ) {
				$readOnly = 0;
			} finally {
				// Restore the profiler state even if a non-DBError exception escapes
				$this->trxProfiler->setSilenced( $old );
			}

			return $readOnly;
		},
		[ 'pcTTL' => $cache::TTL_PROC_LONG, 'busyValue' => 0 ]
	);
}
1762
/**
 * Get, and optionally set, whether replica DB lag is disregarded as a factor
 * in replica DB selection.
 *
 * @param bool|null $mode New value, or null to leave the setting unchanged
 * @return bool The setting after any update
 */
public function allowLagged( $mode = null ) {
	if ( $mode !== null ) {
		$this->allowLagged = $mode;
	}

	return $this->allowLagged;
}
1771
/**
 * Ping every open connection.
 *
 * @return bool True if all pings succeeded (or no connection is open)
 */
public function pingAll() {
	$allAlive = true;
	$this->forEachOpenConnection( function ( IDatabase $handle ) use ( &$allAlive ) {
		// Every handle is pinged, even after an earlier failure
		$allAlive = $handle->ping() && $allAlive;
	} );

	return $allAlive;
}
1782
/**
 * Call a function with each tracked connection object.
 *
 * @param callable $callback Invoked as $callback( $conn, ...$params )
 * @param array $params Extra arguments appended to each call
 */
public function forEachOpenConnection( $callback, array $params = [] ) {
	// Pools are nested as: pool type => server index => handles
	foreach ( $this->conns as $pool ) {
		foreach ( $pool as $handles ) {
			foreach ( $handles as $handle ) {
				$callback( $handle, ...$params );
			}
		}
	}
}
1792
/**
 * Call a function with each tracked connection object to the master.
 *
 * @param callable $callback Invoked as $callback( $conn, ...$params )
 * @param array $params Extra arguments appended to each call
 */
public function forEachOpenMasterConnection( $callback, array $params = [] ) {
	$writerIndex = $this->getWriterIndex();
	foreach ( $this->conns as $pool ) {
		// Pools without an entry for the master contribute nothing
		foreach ( $pool[$writerIndex] ?? [] as $handle ) {
			$callback( $handle, ...$params );
		}
	}
}
1804
/**
 * Call a function with each tracked connection object to a server other than
 * the master.
 *
 * @param callable $callback Invoked as $callback( $conn, ...$params )
 * @param array $params Extra arguments appended to each call
 */
public function forEachOpenReplicaConnection( $callback, array $params = [] ) {
	$writerIndex = $this->getWriterIndex();
	foreach ( $this->conns as $pool ) {
		foreach ( $pool as $serverIndex => $handles ) {
			if ( $serverIndex !== $writerIndex ) {
				foreach ( $handles as $handle ) {
					$callback( $handle, ...$params );
				}
			}
		}
	}
}
1817
/**
 * Find the server with a positive load that has the highest replication lag.
 *
 * @param string|bool $domain Domain passed through to getLagTimes()
 * @return array ( host, max lag, server index ); ( '', -1, 0 ) when there is at
 *   most one server or no loaded server reported lag above -1
 */
public function getMaxLag( $domain = false ) {
	$worstLag = -1;
	$worstHost = '';
	$worstIndex = 0;

	// Only look at lag times when replication is in play
	if ( $this->getServerCount() > 1 ) {
		foreach ( $this->getLagTimes( $domain ) as $index => $lag ) {
			if ( $this->loads[$index] > 0 && $lag > $worstLag ) {
				$worstLag = $lag;
				$worstHost = $this->servers[$index]['host'];
				$worstIndex = $index;
			}
		}
	}

	return [ $worstHost, $worstLag, $worstIndex ];
}
1838
/**
 * Get the lag time of each server.
 *
 * Servers flagged 'is static' are reported with a lag of 0; the rest are
 * looked up through the load monitor.
 *
 * @param string|bool $domain Domain passed through to the load monitor
 * @return array Map of (server index => lag)
 */
public function getLagTimes( $domain = false ) {
	if ( $this->getServerCount() <= 1 ) {
		return [ $this->getWriterIndex() => 0 ]; // no replication = no lag
	}

	$staticLagTimes = []; // map of (server index => 0 seconds)
	$laggableIndexes = [];
	foreach ( $this->servers as $index => $info ) {
		$isStatic = !empty( $info['is static'] );
		if ( $isStatic ) {
			// Non-replicating, read-only archive server
			$staticLagTimes[$index] = 0;
		} else {
			// This server might have replication lag
			$laggableIndexes[] = $index;
		}
	}

	$monitoredLagTimes = $this->getLoadMonitor()->getLagTimes( $laggableIndexes, $domain );

	return $monitoredLagTimes + $staticLagTimes;
}
1856
/**
 * Get the lag of a connection, skipping the query when there is at most one
 * server (no replication).
 *
 * @param IDatabase $conn
 * @return mixed 0 without replication, else the result of $conn->getLag()
 */
public function safeGetLag( IDatabase $conn ) {
	return ( $this->getServerCount() <= 1 ) ? 0 : $conn->getLag();
}
1864
/**
 * Wait for a replica DB connection to reach a master position.
 *
 * Returns true right away when there is at most one server or when the handle
 * is not flagged as a replica.
 *
 * @param IDatabase $conn Replica DB handle
 * @param DBMasterPos|bool $pos Position to wait for; false to use the current master position
 * @param int|null $timeout Seconds to wait; falls back to the configured wait
 *   timeout, with a minimum of 1
 * @return bool Whether the position was reached in time
 * @throws DBReplicationWaitError If a master connection is needed but cannot be obtained
 */
public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = null ) {
	$timeout = max( 1, $timeout ?: $this->waitTimeout );

	if ( $this->getServerCount() <= 1 || !$conn->getLBInfo( 'replica' ) ) {
		return true; // server is not a replica DB
	}

	if ( !$pos ) {
		// Look up the current master position, opening a connection if needed
		$openMasterConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
		if ( $openMasterConn ) {
			$pos = $openMasterConn->getMasterPos();
		} else {
			$tempMasterConn = $this->openConnection( $this->getWriterIndex(), self::DOMAIN_ANY );
			if ( !$tempMasterConn ) {
				throw new DBReplicationWaitError(
					null,
					"Could not obtain a master database connection to get the position"
				);
			}
			$pos = $tempMasterConn->getMasterPos();
			// This connection was opened only for the lookup
			$this->closeConnection( $tempMasterConn );
		}
	}

	if ( !( $pos instanceof DBMasterPos ) ) {
		// something is misconfigured
		$this->replLogger->error(
			__METHOD__ . ': could not get master pos for {host}',
			[
				'host' => $conn->getServer(),
				'trace' => ( new RuntimeException() )->getTraceAsString()
			]
		);

		return false;
	}

	$waitResult = $conn->masterPosWait( $pos, $timeout );
	if ( $waitResult === null || $waitResult == -1 ) {
		$msg = __METHOD__ . ': timed out waiting on {host} pos {pos}';
		$this->replLogger->warning( $msg, [
			'host' => $conn->getServer(),
			'pos' => $pos,
			'trace' => ( new RuntimeException() )->getTraceAsString()
		] );

		return false;
	}

	$this->replLogger->debug( __METHOD__ . ': done waiting' );

	return true;
}
1917
/**
 * Register or remove a named transaction listener.
 *
 * The listener is stored so that master handles opened later receive it, and
 * it is applied to all currently open master connections.
 *
 * @param string $name Listener name
 * @param callable|null $callback Listener, or null to remove it
 */
public function setTransactionListener( $name, callable $callback = null ) {
	if ( $callback ) {
		$this->trxRecurringCallbacks[$name] = $callback;
	} else {
		unset( $this->trxRecurringCallbacks[$name] );
	}
	$this->forEachOpenMasterConnection(
		function ( IDatabase $conn ) use ( $name, $callback ) {
			$conn->setTransactionListener( $name, $callback );
		}
	);
}
1930
/**
 * Set the table aliases that are handed to connection handles opened afterwards.
 *
 * @param array $aliases
 */
public function setTableAliases( array $aliases ) {
	// Stored only; reallyOpenConnection() passes them to each new handle
	$this->tableAliases = $aliases;
}
1934
/**
 * Set the index aliases that are handed to connection handles opened afterwards.
 *
 * @param array $aliases
 */
public function setIndexAliases( array $aliases ) {
	// Stored only; reallyOpenConnection() passes them to each new handle
	$this->indexAliases = $aliases;
}
1938
/**
 * Alias of setLocalDomainPrefix().
 *
 * @param string $prefix Table prefix
 */
public function setDomainPrefix( $prefix ) {
	// Same behavior under another name
	$this->setLocalDomainPrefix( $prefix );
}
1946
/**
 * Change the table prefix of the local domain and of all open local connections.
 *
 * @param string $prefix New table prefix
 * @throws DBUnexpectedError If any foreign domain connection is still in use
 */
public function setLocalDomainPrefix( $prefix ) {
	// Collect the domains of foreign connections that callers still hold...
	$busyDomains = [];
	$this->forEachOpenConnection( function ( IDatabase $handle ) use ( &$busyDomains ) {
		// A handle's reference count drops from 1 to 0 once reuseConnection() is called
		// on it. Until then, the caller still uses it (explicitly or via DBConnRef scope).
		if ( $handle->getLBInfo( 'foreignPoolRefCount' ) > 0 ) {
			$busyDomains[] = $handle->getDomainID();
		}
	} );

	// Refuse to switch while explicit foreign domain connections are in use
	if ( $busyDomains ) {
		$domains = implode( ', ', $busyDomains );
		throw new DBUnexpectedError( null,
			"Foreign domain connections are still in use ($domains)." );
	}

	$current = $this->localDomain;
	$this->setLocalDomain(
		new DatabaseDomain( $current->getDatabase(), $current->getSchema(), $prefix )
	);

	// Apply the new prefix to every local (non-foreign) connection...
	$this->forEachOpenConnection( function ( IDatabase $handle ) use ( $prefix ) {
		if ( !$handle->getLBInfo( 'foreign' ) ) {
			$handle->tablePrefix( $prefix );
		}
	} );
}
1978
/**
 * Close all connections and redefine the local domain.
 *
 * @param string $domain Domain ID parsed with DatabaseDomain::newFromId()
 */
public function redefineLocalDomain( $domain ) {
	$this->closeAll();

	$newDomain = DatabaseDomain::newFromId( $domain );
	$this->setLocalDomain( $newDomain );
}
1984
/**
 * Set the local domain and derive its "<db>-<prefix>" style ID alias.
 *
 * @param DatabaseDomain $domain
 */
private function setLocalDomain( DatabaseDomain $domain ) {
	$this->localDomain = $domain;

	$database = $domain->getDatabase();
	$prefix = $domain->getTablePrefix();
	// Callers may assume that the domain ID is simply <db>-<prefix>, which is almost
	// always true. Keep an alias in that form so such callers are handled gracefully
	// when they fail to account for escaping.
	$this->localDomainIdAlias = ( $prefix != '' )
		? $database . '-' . $prefix
		: $database;
}
1999
/**
 * Close all connections when the load balancer is destroyed.
 */
public function __destruct() {
	// Avoid connection leaks for sanity
	$this->disable();
}
2004}
2005
2009class_alias( LoadBalancer::class, 'LoadBalancer' );
Apache License, Version 2.0, January 2004. TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION. 1. Definitions. "License" shall mean the terms and conditions for use...
if(defined( 'MW_SETUP_CALLBACK')) $fname
Customization point after all loading (constants, functions, classes, DefaultSettings,...
Definition Setup.php:123
A collection of static methods to play with arrays.
static pickRandom( $weights)
Given an array of non-normalised probabilities, this function will select an element and return the a...
Class representing a cache/ephemeral data store.
Definition BagOStuff.php:58
A BagOStuff object with no objects in it.
Multi-datacenter aware caching interface.
Exception class for attempted DB access.
Helper class to handle automatically marking connections as reusable (via RAII pattern) as well handl...
Definition DBConnRef.php:14
Database error base class.
Definition DBError.php:30
Exception class for replica DB wait errors.
Class to handle database/prefix specification for IDatabase domains.
Relational database abstraction object.
Definition Database.php:49
runOnTransactionPreCommitCallbacks()
Actually consume and run any "on transaction pre-commit" callbacks.
runTransactionListenerCallbacks( $trigger)
Actually run any "transaction listener" callbacks.
restoreFlags( $state=self::RESTORE_PRIOR)
Restore the flags to their prior state before the last setFlag/clearFlag call.
Definition Database.php:827
setTrxEndCallbackSuppression( $suppress)
Whether to disable running of post-COMMIT/ROLLBACK callbacks.
static factory( $dbType, $p=[], $connect=self::NEW_CONNECTED)
Construct a Database subclass instance given a database type and parameters.
Definition Database.php:437
getLBInfo( $name=null)
Get properties passed down from the server info array of the load balancer.
Definition Database.php:645
trxLevel()
Gets the current transaction level.
Definition Database.php:591
setFlag( $flag, $remember=self::REMEMBER_NOTHING)
Set a flag for this connection.
Definition Database.php:805
static attributesFromType( $dbType, $driver=null)
Definition Database.php:492
setLBInfo( $name, $value=null)
Set the LB info array, or a member of it.
Definition Database.php:657
getFlag( $flag)
Returns a boolean whether the flag $flag is set for this connection.
Definition Database.php:840
Database connection, tracking, load balancing, and transaction manager for a cluster.
array[] $servers
Map of (server index => server config array)
masterRunningReadOnly( $domain, IDatabase $conn=null)
undoTransactionRoundFlags(Database $conn)
approveMasterChanges(array $options)
Perform all pre-commit checks for things like replication safety.
bool $allowLagged
Whether to disregard replica DB lag as a factor in replica DB selection.
mixed $profiler
Class name or object with profileIn/profileOut methods.
getAnyOpenConnection( $i, $flags=0)
Get any open connection to a given server index, local or foreign.
callable $errorLogger
Exception logger.
int $waitTimeout
Seconds to spend waiting on replica DB lag to resolve.
bool|DBMasterPos $waitForPos
False if not set.
callable $deprecationLogger
Deprecation logger.
flushReplicaSnapshots( $fname=__METHOD__)
Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshots.
setLocalDomain(DatabaseDomain $domain)
getLocalDomainID()
Get the local (and default) database domain ID of connection handles.
getConnectionRef( $i, $groups=[], $domain=false, $flags=0)
Get a database connection handle reference.
string $trxRoundStage
Stage of the current transaction round in the transaction round life-cycle.
getServerType( $i)
Get DB type of the server with the specified index.
flushMasterSnapshots( $fname=__METHOD__)
Commit all master DB transactions so as to flush any REPEATABLE-READ or SSI snapshots.
setTableAliases(array $aliases)
Make certain table names use their own database, schema, and table prefix when passed into SQL querie...
applyTransactionRoundFlags(Database $conn)
Make all DB servers with DBO_DEFAULT/DBO_TRX set join the transaction round.
commitAll( $fname=__METHOD__)
Commit transactions on all open connections.
string $agent
Agent name for query profiling.
waitFor( $pos)
Set the master wait position.
getMasterPos()
Get the current master position for chronology control purposes.
finalizeMasterChanges()
Run pre-commit callbacks and defer execution of post-commit callbacks.
bool $laggedReplicaMode
Whether the generic reader fell back to a lagged replica DB.
forEachOpenMasterConnection( $callback, array $params=[])
Call a function with each open connection object to a master.
redefineLocalDomain( $domain)
Close all connections and redefine the local domain for testing or schema creation.
array[] $trxRecurringCallbacks
Map of (name => callable)
pickReaderIndex(array $loads, $domain=false)
rollbackMasterChanges( $fname=__METHOD__)
Issue ROLLBACK only on master, only if queries were done on connection.
openForeignConnection( $i, $domain, $flags=0)
Open a connection to a foreign DB, or return one if it is already open.
laggedReplicaUsed()
Checks whether the database for generic connections this request was both:
doWait( $index, $open=false, $timeout=null)
Wait for a given replica DB to catch up to the master pos stored in $this.
Database[][][] $conns
Map of (connection category => server index => IDatabase[])
string[] $indexAliases
Map of (index alias => index)
isOpen( $index)
Test if the specified index represents an open connection.
array $loadMonitorConfig
The LoadMonitor configuration.
array[] $groupLoads
Map of (group => server index => weight)
isNonZeroLoad( $i)
Returns true if the specified index is valid and has non-zero load.
runMasterTransactionIdleCallbacks()
Consume and run all pending post-COMMIT/ROLLBACK callbacks and commit dangling transactions.
getLoadMonitor()
Get a LoadMonitor instance.
setTransactionListener( $name, callable $callback=null)
Set a callback via IDatabase::setTransactionListener() on all current and future master connections o...
forEachOpenReplicaConnection( $callback, array $params=[])
Call a function with each open replica DB connection object.
lastMasterChangeTimestamp()
Get the timestamp of the latest write query done by this thread.
beginMasterChanges( $fname=__METHOD__)
Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
reuseConnection(IDatabase $conn)
Mark a foreign connection as being available for reuse under a different DB domain.
closeAll()
Close all open connections.
getLagTimes( $domain=false)
Get an estimate of replication lag (in seconds) for each server.
string|bool $trxRoundId
String if a requested DBO_TRX transaction round is active.
safeWaitForMasterPos(IDatabase $conn, $pos=false, $timeout=null)
Wait for a replica DB to reach a specified master position.
closeConnection(IDatabase $conn)
Close a connection.
runMasterTransactionListenerCallbacks()
Run all recurring post-COMMIT/ROLLBACK listener callbacks.
getLazyConnectionRef( $i, $groups=[], $domain=false, $flags=0)
Get a database connection handle reference without connecting yet.
getReadOnlyReason( $domain=false, IDatabase $conn=null)
getRandomNonLagged(array $loads, $domain=false, $maxLag=INF)
setLocalDomainPrefix( $prefix)
Set a new table prefix for the existing local domain ID for testing.
bool $cliMode
Whether this PHP instance is for a CLI script.
getMaintenanceConnectionRef( $i, $groups=[], $domain=false, $flags=0)
Get a maintenance database connection handle reference for migrations and schema changes.
safeGetLag(IDatabase $conn)
Get the lag in seconds for a given connection, or zero if this load balancer does not have replicatio...
openConnection( $i, $domain=false, $flags=0)
Open a connection to the server given by the specified index.
hasOrMadeRecentMasterChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
string $hostname
Current server name.
disable()
Disable this load balancer.
getReaderIndex( $group=false, $domain=false)
Get the index of the reader connection, which may be a replica DB.
DatabaseDomain $localDomain
Local Domain ID and default for selectDB() calls.
float[] $loads
Map of (server index => weight)
setIndexAliases(array $aliases)
Convert certain index names to alternative names before querying the DB.
getConnection( $i, $groups=[], $domain=false, $flags=0)
Get a connection handle by server index.
TransactionProfiler $trxProfiler
getServerName( $i)
Get the host name or IP address of the server with the specified index.
commitMasterChanges( $fname=__METHOD__)
Issue COMMIT on all open master connections to flush changes and view snapshots.
getLaggedReplicaMode( $domain=false)
forEachOpenConnection( $callback, array $params=[])
Call a function with each open connection object.
haveIndex( $i)
Returns true if the specified index is a valid server index.
reallyOpenConnection(array $server, DatabaseDomain $domain)
Open a new network connection to a server (uncached)
int $connsOpened
Total connections opened.
pendingMasterChangeCallers()
Get the list of callers that have pending master changes.
bool $connectionAttempted
Whether any connection has been attempted yet.
hasMasterChanges()
Whether there are pending changes or callbacks in a transaction by this thread.
openLocalConnection( $i, $flags=0)
Open a connection to a local DB, or return one if it is already open.
callable|null $chronologyCallback
Callback to run before the first connection attempt.
getMaxLag( $domain=false)
Get the hostname and lag time of the most-lagged replica DB.
Database $errorConnection
DB connection object that caused a problem.
int $readIndex
The generic (not query grouped) replica DB index (of $servers)
bool $allReplicasDownMode
Whether all replica DBs were found to be down.
string $lastError
The last DB selection or connection error.
string $localDomainIdAlias
Alternate ID string for the domain instead of DatabaseDomain::getId()
getServerInfo( $i)
Return the server info structure for a given index, or false if the index is invalid.
allowLagged( $mode=null)
Disables/enables lag checks.
getServerCount()
Get the number of defined servers (not the number of open connections)
string|bool $readOnlyReason
Reason the LB is read-only or false if not.
waitForOne( $pos, $timeout=null)
Set the master wait position and wait for a "generic" replica DB to catch up to it.
waitForAll( $pos, $timeout=null)
Set the master wait position and wait for ALL replica DBs to catch up to it.
__construct(array $params)
Construct a manager of IDatabase connection objects.
Helper class to handle automatically marking connections as reusable (via RAII pattern) as well handl...
Helper class that detects high-contention DB queries via profiling calls.
deferred txt A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user For updating the view updating the linked to tables after a etc PHP does not yet have any way to tell the server to actually return and disconnect while still running these but it might have such a feature in the future We handle these by creating a deferred update object and putting those objects on a global list
Definition deferred.txt:11
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
see documentation in includes Linker php for Linker::makeImageLink & $time
Definition hooks.txt:1802
The index of the header message $result[1]=The index of the body text message $result[2 through n]=Parameters passed to body text message. Please note the header message cannot receive/use parameters. 'ImgAuthModifyHeaders':Executed just before a file is streamed to a user via img_auth.php, allowing headers to be modified beforehand. $title:LinkTarget object & $headers:HTTP headers(name=> value, names are case insensitive). Two headers get special handling:If-Modified-Since(value must be a valid HTTP date) and Range(must be of the form "bytes=(\d*-\d*)") will be honored when streaming the file. 'ImportHandleLogItemXMLTag':When parsing a XML tag in a log item. Return false to stop further processing of the tag $reader:XMLReader object $logInfo:Array of information 'ImportHandlePageXMLTag':When parsing a XML tag in a page. Return false to stop further processing of the tag $reader:XMLReader object & $pageInfo:Array of information 'ImportHandleRevisionXMLTag':When parsing a XML tag in a page revision. Return false to stop further processing of the tag $reader:XMLReader object $pageInfo:Array of page information $revisionInfo:Array of revision information 'ImportHandleToplevelXMLTag':When parsing a top level XML tag. Return false to stop further processing of the tag $reader:XMLReader object 'ImportHandleUnknownUser':When a user doesn 't exist locally, this hook is called to give extensions an opportunity to auto-create it. If the auto-creation is successful, return false. $name:User name 'ImportHandleUploadXMLTag':When parsing a XML tag in a file upload. Return false to stop further processing of the tag $reader:XMLReader object $revisionInfo:Array of information 'ImportLogInterwikiLink':Hook to change the interwiki link used in log entries and edit summaries for transwiki imports. & $fullInterwikiPrefix:Interwiki prefix, may contain colons. & $pageTitle:String that contains page title. 
'ImportSources':Called when reading from the $wgImportSources configuration variable. Can be used to lazy-load the import sources list. & $importSources:The value of $wgImportSources. Modify as necessary. See the comment in DefaultSettings.php for the detail of how to structure this array. 'InfoAction':When building information to display on the action=info page. $context:IContextSource object & $pageInfo:Array of information 'InitializeArticleMaybeRedirect':MediaWiki check to see if title is a redirect. & $title:Title object for the current page & $request:WebRequest & $ignoreRedirect:boolean to skip redirect check & $target:Title/string of redirect target & $article:Article object 'InternalParseBeforeLinks':during Parser 's internalParse method before links but after nowiki/noinclude/includeonly/onlyinclude and other processings. & $parser:Parser object & $text:string containing partially parsed text & $stripState:Parser 's internal StripState object 'InternalParseBeforeSanitize':during Parser 's internalParse method just before the parser removes unwanted/dangerous HTML tags and after nowiki/noinclude/includeonly/onlyinclude and other processings. Ideal for syntax-extensions after template/parser function execution which respect nowiki and HTML-comments. & $parser:Parser object & $text:string containing partially parsed text & $stripState:Parser 's internal StripState object 'InterwikiLoadPrefix':When resolving if a given prefix is an interwiki or not. Return true without providing an interwiki to continue interwiki search. $prefix:interwiki prefix we are looking for. & $iwData:output array describing the interwiki with keys iw_url, iw_local, iw_trans and optionally iw_api and iw_wikiid. 'InvalidateEmailComplete':Called after a user 's email has been invalidated successfully. $user:user(object) whose email is being invalidated 'IRCLineURL':When constructing the URL to use in an IRC notification. 
Callee may modify $url and $query, URL will be constructed as $url . $query & $url:URL to index.php & $query:Query string $rc:RecentChange object that triggered url generation 'IsFileCacheable':Override the result of Article::isFileCacheable()(if true) & $article:article(object) being checked 'IsTrustedProxy':Override the result of IP::isTrustedProxy() & $ip:IP being check & $result:Change this value to override the result of IP::isTrustedProxy() 'IsUploadAllowedFromUrl':Override the result of UploadFromUrl::isAllowedUrl() $url:URL used to upload from & $allowed:Boolean indicating if uploading is allowed for given URL 'isValidEmailAddr':Override the result of Sanitizer::validateEmail(), for instance to return false if the domain name doesn 't match your organization. $addr:The e-mail address entered by the user & $result:Set this and return false to override the internal checks 'isValidPassword':Override the result of User::isValidPassword() $password:The password entered by the user & $result:Set this and return false to override the internal checks $user:User the password is being validated for 'Language::getMessagesFileName':$code:The language code or the language we 're looking for a messages file for & $file:The messages file path, you can override this to change the location. 'LanguageGetNamespaces':Provide custom ordering for namespaces or remove namespaces. Do not use this hook to add namespaces. Use CanonicalNamespaces for that. & $namespaces:Array of namespaces indexed by their numbers 'LanguageGetTranslatedLanguageNames':Provide translated language names. & $names:array of language code=> language name $code:language of the preferred translations 'LanguageLinks':Manipulate a page 's language links. This is called in various places to allow extensions to define the effective language links for a page. $title:The page 's Title. & $links:Array with elements of the form "language:title" in the order that they will be output. 
& $linkFlags:Associative array mapping prefixed links to arrays of flags. Currently unused, but planned to provide support for marking individual language links in the UI, e.g. for featured articles. 'LanguageSelector':Hook to change the language selector available on a page. $out:The output page. $cssClassName:CSS class name of the language selector. 'LinkBegin':DEPRECATED since 1.28! Use HtmlPageLinkRendererBegin instead. Used when generating internal and interwiki links in Linker::link(), before processing starts. Return false to skip default processing and return $ret. See documentation for Linker::link() for details on the expected meanings of parameters. $skin:the Skin object $target:the Title that the link is pointing to & $html:the contents that the< a > tag should have(raw HTML) $result
Definition hooks.txt:1991
This code would result in ircNotify being run twice when an article is and once for brion Hooks can return three possible true was required This is the default since MediaWiki *some string
Definition hooks.txt:181
null means default in associative array with keys and values unescaped Should be merged with default with a value of false meaning to suppress the attribute in associative array with keys and values unescaped & $options
Definition hooks.txt:1999
do that in ParserLimitReportFormat instead use this to modify the parameters of the image all existing parser cache entries will be invalid To avoid you ll need to handle that somehow(e.g. with the RejectParserCacheValue hook) because MediaWiki won 't do it for you. & $defaults also a ContextSource after deleting those rows but within the same transaction you ll probably need to make sure the header is varied on and they can depend only on the ResourceLoaderContext $context
Definition hooks.txt:2848
this hook is for auditing only or null if authentication failed before getting that far or null if we can t even determine that When $user is not null
Definition hooks.txt:783
Allows to change the fields on the form that will be generated $name
Definition hooks.txt:271
processing should stop and the error should be shown to the user * false
Definition hooks.txt:187
returning false will NOT prevent logging $e
Definition hooks.txt:2175
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback function
Definition injection.txt:30
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition injection.txt:37
An object representing a master or replica DB position in a replicated setup.
hasReached(DBMasterPos $pos)
Basic database interface for live and lazy-loaded relational database handles.
Definition IDatabase.php:38
lastDoneWrites()
Returns the last time the connection may have been used for write queries.
getDomainID()
Return the currently selected domain ID.
assertNoOpenTransactions()
Assert that all explicit transactions or atomic sections have been closed.
getServer()
Get the server hostname or IP address.
isOpen()
Is a connection to the database open?
setTransactionListener( $name, callable $callback=null)
Run a callback after each time any transaction commits or rolls back.
setLBInfo( $name, $value=null)
Set the LB info array, or a member of it.
getLBInfo( $name=null)
Get properties passed down from the server info array of the load balancer.
flushSnapshot( $fname=__METHOD__)
Commit any transaction but error out if writes or callbacks are pending.
pendingWriteQueryDuration( $type=self::ESTIMATE_TOTAL)
Get the time spent running write queries for this transaction.
getLag()
Get the amount of replication lag for this database server.
rollback( $fname=__METHOD__, $flush='')
Rollback a transaction previously started using begin().
close()
Close the database connection.
ping(&$rtt=null)
Ping the server and try to reconnect if there is no connection.
masterPosWait(DBMasterPos $pos, $timeout)
Wait for the replica DB to catch up to a given master position.
commit( $fname=__METHOD__, $flush='')
Commits a transaction previously started using begin().
writesOrCallbacksPending()
Whether there is a transaction open with either possible write queries or unresolved pre-commit/commi...
pendingWriteCallers()
Get the list of method names that did write queries for this transaction.
Database cluster connection, tracking, load balancing, and transaction manager interface.
An interface for database load monitoring.
$cache
Definition mcc.php:33
storage can be distributed across multiple servers
Definition memcached.txt:33
The wiki should then use memcached to cache various data To use multiple just add more items to the array To increase the weight of a make its entry a array("192.168.0.1:11211", 2))
const DB_REPLICA
Definition defines.php:25
const DB_MASTER
Definition defines.php:26
$params