MediaWiki REL1_32
LoadBalancer.php
Go to the documentation of this file.
1<?php
22namespace Wikimedia\Rdbms;
23
24use Psr\Log\LoggerInterface;
25use Psr\Log\NullLogger;
26use Wikimedia\ScopedCallback;
31use UnexpectedValueException;
32use InvalidArgumentException;
33use RuntimeException;
34use Exception;
35
41class LoadBalancer implements ILoadBalancer {
43 private $loadMonitor;
47 private $srvCache;
49 private $wanCache;
51 private $profiler;
53 private $trxProfiler;
55 private $replLogger;
57 private $connLogger;
59 private $queryLogger;
61 private $perfLogger;
63 private $errorLogger;
66
68 private $localDomain;
69
71 private $conns;
72
74 private $servers;
76 private $loads;
78 private $groupLoads;
80 private $allowLagged;
82 private $waitTimeout;
88 private $maxLag = self::MAX_LAG_DEFAULT;
89
91 private $hostname;
93 private $cliMode;
95 private $agent;
96
98 private $tableAliases = [];
100 private $indexAliases = [];
103
107 private $readIndex;
109 private $waitForPos;
111 private $laggedReplicaMode = false;
113 private $allReplicasDownMode = false;
115 private $lastError = 'Unknown error';
117 private $readOnlyReason = false;
119 private $connsOpened = 0;
121 private $disabled = false;
123 private $connectionAttempted = false;
124
126 private $trxRoundId = false;
128 private $trxRoundStage = self::ROUND_CURSORY;
129
131 private $defaultGroup = null;
132
134 const CONN_HELD_WARN_THRESHOLD = 10;
135
137 const MAX_LAG_DEFAULT = 10;
139 const MAX_WAIT_DEFAULT = 10;
141 const TTL_CACHE_READONLY = 5;
142
143 const KEY_LOCAL = 'local';
144 const KEY_FOREIGN_FREE = 'foreignFree';
145 const KEY_FOREIGN_INUSE = 'foreignInUse';
146
147 const KEY_LOCAL_NOROUND = 'localAutoCommit';
148 const KEY_FOREIGN_FREE_NOROUND = 'foreignFreeAutoCommit';
149 const KEY_FOREIGN_INUSE_NOROUND = 'foreignInUseAutoCommit';
150
152 const ROUND_CURSORY = 'cursory';
154 const ROUND_FINALIZED = 'finalized';
156 const ROUND_APPROVED = 'approved';
158 const ROUND_COMMIT_CALLBACKS = 'commit-callbacks';
160 const ROUND_ROLLBACK_CALLBACKS = 'rollback-callbacks';
162 const ROUND_ERROR = 'error';
163
164 public function __construct( array $params ) {
165 if ( !isset( $params['servers'] ) ) {
166 throw new InvalidArgumentException( __CLASS__ . ': missing servers parameter' );
167 }
168 $this->servers = $params['servers'];
169 foreach ( $this->servers as $i => $server ) {
170 if ( $i == 0 ) {
171 $this->servers[$i]['master'] = true;
172 } else {
173 $this->servers[$i]['replica'] = true;
174 }
175 }
176
177 $localDomain = isset( $params['localDomain'] )
178 ? DatabaseDomain::newFromId( $params['localDomain'] )
181
182 $this->waitTimeout = $params['waitTimeout'] ?? self::MAX_WAIT_DEFAULT;
183
184 $this->readIndex = -1;
185 $this->conns = [
186 // Connection were transaction rounds may be applied
187 self::KEY_LOCAL => [],
188 self::KEY_FOREIGN_INUSE => [],
189 self::KEY_FOREIGN_FREE => [],
190 // Auto-committing counterpart connections that ignore transaction rounds
191 self::KEY_LOCAL_NOROUND => [],
192 self::KEY_FOREIGN_INUSE_NOROUND => [],
193 self::KEY_FOREIGN_FREE_NOROUND => []
194 ];
195 $this->loads = [];
196 $this->waitForPos = false;
197 $this->allowLagged = false;
198
199 if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
200 $this->readOnlyReason = $params['readOnlyReason'];
201 }
202
203 if ( isset( $params['maxLag'] ) ) {
204 $this->maxLag = $params['maxLag'];
205 }
206
207 if ( isset( $params['loadMonitor'] ) ) {
208 $this->loadMonitorConfig = $params['loadMonitor'];
209 } else {
210 $this->loadMonitorConfig = [ 'class' => 'LoadMonitorNull' ];
211 }
212 $this->loadMonitorConfig += [ 'lagWarnThreshold' => $this->maxLag ];
213
214 foreach ( $params['servers'] as $i => $server ) {
215 $this->loads[$i] = $server['load'];
216 if ( isset( $server['groupLoads'] ) ) {
217 foreach ( $server['groupLoads'] as $group => $ratio ) {
218 if ( !isset( $this->groupLoads[$group] ) ) {
219 $this->groupLoads[$group] = [];
220 }
221 $this->groupLoads[$group][$i] = $ratio;
222 }
223 }
224 }
225
226 if ( isset( $params['srvCache'] ) ) {
227 $this->srvCache = $params['srvCache'];
228 } else {
229 $this->srvCache = new EmptyBagOStuff();
230 }
231 if ( isset( $params['wanCache'] ) ) {
232 $this->wanCache = $params['wanCache'];
233 } else {
234 $this->wanCache = WANObjectCache::newEmpty();
235 }
236 $this->profiler = $params['profiler'] ?? null;
237 if ( isset( $params['trxProfiler'] ) ) {
238 $this->trxProfiler = $params['trxProfiler'];
239 } else {
240 $this->trxProfiler = new TransactionProfiler();
241 }
242
243 $this->errorLogger = $params['errorLogger'] ?? function ( Exception $e ) {
244 trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
245 };
246 $this->deprecationLogger = $params['deprecationLogger'] ?? function ( $msg ) {
247 trigger_error( $msg, E_USER_DEPRECATED );
248 };
249
250 foreach ( [ 'replLogger', 'connLogger', 'queryLogger', 'perfLogger' ] as $key ) {
251 $this->$key = $params[$key] ?? new NullLogger();
252 }
253
254 $this->hostname = $params['hostname'] ?? ( gethostname() ?: 'unknown' );
255 $this->cliMode = $params['cliMode'] ?? ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
256 $this->agent = $params['agent'] ?? '';
257
258 if ( isset( $params['chronologyCallback'] ) ) {
259 $this->chronologyCallback = $params['chronologyCallback'];
260 }
261
262 if ( isset( $params['roundStage'] ) ) {
263 if ( $params['roundStage'] === self::STAGE_POSTCOMMIT_CALLBACKS ) {
264 $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
265 } elseif ( $params['roundStage'] === self::STAGE_POSTROLLBACK_CALLBACKS ) {
266 $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
267 }
268 }
269
270 $this->defaultGroup = $params['defaultGroup'] ?? null;
271 }
272
273 public function getLocalDomainID() {
274 return $this->localDomain->getId();
275 }
276
277 public function resolveDomainID( $domain ) {
278 return ( $domain !== false ) ? (string)$domain : $this->getLocalDomainID();
279 }
280
286 private function getLoadMonitor() {
287 if ( !isset( $this->loadMonitor ) ) {
288 $compat = [
289 'LoadMonitor' => LoadMonitor::class,
290 'LoadMonitorNull' => LoadMonitorNull::class,
291 'LoadMonitorMySQL' => LoadMonitorMySQL::class,
292 ];
293
294 $class = $this->loadMonitorConfig['class'];
295 if ( isset( $compat[$class] ) ) {
296 $class = $compat[$class];
297 }
298
299 $this->loadMonitor = new $class(
300 $this, $this->srvCache, $this->wanCache, $this->loadMonitorConfig );
301 $this->loadMonitor->setLogger( $this->replLogger );
302 }
303
304 return $this->loadMonitor;
305 }
306
313 private function getRandomNonLagged( array $loads, $domain = false, $maxLag = INF ) {
314 $lags = $this->getLagTimes( $domain );
315
316 # Unset excessively lagged servers
317 foreach ( $lags as $i => $lag ) {
318 if ( $i != 0 ) {
319 # How much lag this server nominally is allowed to have
320 $maxServerLag = $this->servers[$i]['max lag'] ?? $this->maxLag; // default
321 # Constrain that futher by $maxLag argument
322 $maxServerLag = min( $maxServerLag, $maxLag );
323
324 $host = $this->getServerName( $i );
325 if ( $lag === false && !is_infinite( $maxServerLag ) ) {
326 $this->replLogger->error(
327 __METHOD__ .
328 ": server {host} is not replicating?", [ 'host' => $host ] );
329 unset( $loads[$i] );
330 } elseif ( $lag > $maxServerLag ) {
331 $this->replLogger->debug(
332 __METHOD__ .
333 ": server {host} has {lag} seconds of lag (>= {maxlag})",
334 [ 'host' => $host, 'lag' => $lag, 'maxlag' => $maxServerLag ]
335 );
336 unset( $loads[$i] );
337 }
338 }
339 }
340
341 # Find out if all the replica DBs with non-zero load are lagged
342 $sum = 0;
343 foreach ( $loads as $load ) {
344 $sum += $load;
345 }
346 if ( $sum == 0 ) {
347 # No appropriate DB servers except maybe the master and some replica DBs with zero load
348 # Do NOT use the master
349 # Instead, this function will return false, triggering read-only mode,
350 # and a lagged replica DB will be used instead.
351 return false;
352 }
353
354 if ( count( $loads ) == 0 ) {
355 return false;
356 }
357
358 # Return a random representative of the remainder
360 }
361
362 public function getReaderIndex( $group = false, $domain = false ) {
363 if ( count( $this->servers ) == 1 ) {
364 // Skip the load balancing if there's only one server
365 return $this->getWriterIndex();
366 } elseif ( $group === false && $this->readIndex >= 0 ) {
367 // Shortcut if the generic reader index was already cached
368 return $this->readIndex;
369 }
370
371 if ( $group !== false ) {
372 // Use the server weight array for this load group
373 if ( isset( $this->groupLoads[$group] ) ) {
374 $loads = $this->groupLoads[$group];
375 } else {
376 // No loads for this group, return false and the caller can use some other group
377 $this->connLogger->info( __METHOD__ . ": no loads for group $group" );
378
379 return false;
380 }
381 } else {
382 // Use the generic load group
384 }
385
386 // Scale the configured load ratios according to each server's load and state
387 $this->getLoadMonitor()->scaleLoads( $loads, $domain );
388
389 // Pick a server to use, accounting for weights, load, lag, and "waitForPos"
390 list( $i, $laggedReplicaMode ) = $this->pickReaderIndex( $loads, $domain );
391 if ( $i === false ) {
392 // Replica DB connection unsuccessful
393 return false;
394 }
395
396 if ( $this->waitForPos && $i != $this->getWriterIndex() ) {
397 // Before any data queries are run, wait for the server to catch up to the
398 // specified position. This is used to improve session consistency. Note that
399 // when LoadBalancer::waitFor() sets "waitForPos", the waiting triggers here,
400 // so update laggedReplicaMode as needed for consistency.
401 if ( !$this->doWait( $i ) ) {
402 $laggedReplicaMode = true;
403 }
404 }
405
406 if ( $this->readIndex <= 0 && $this->loads[$i] > 0 && $group === false ) {
407 // Cache the generic reader index for future ungrouped DB_REPLICA handles
408 $this->readIndex = $i;
409 // Record if the generic reader index is in "lagged replica DB" mode
410 if ( $laggedReplicaMode ) {
411 $this->laggedReplicaMode = true;
412 }
413 }
414
415 $serverName = $this->getServerName( $i );
416 $this->connLogger->debug( __METHOD__ . ": using server $serverName for group '$group'" );
417
418 return $i;
419 }
420
426 private function pickReaderIndex( array $loads, $domain = false ) {
427 if ( !count( $loads ) ) {
428 throw new InvalidArgumentException( "Empty server array given to LoadBalancer" );
429 }
430
432 $i = false;
434 $laggedReplicaMode = false;
435
436 // Quickly look through the available servers for a server that meets criteria...
437 $currentLoads = $loads;
438 while ( count( $currentLoads ) ) {
439 if ( $this->allowLagged || $laggedReplicaMode ) {
440 $i = ArrayUtils::pickRandom( $currentLoads );
441 } else {
442 $i = false;
443 if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
444 // "chronologyCallback" sets "waitForPos" for session consistency.
445 // This triggers doWait() after connect, so it's especially good to
446 // avoid lagged servers so as to avoid excessive delay in that method.
447 $ago = microtime( true ) - $this->waitForPos->asOfTime();
448 // Aim for <= 1 second of waiting (being too picky can backfire)
449 $i = $this->getRandomNonLagged( $currentLoads, $domain, $ago + 1 );
450 }
451 if ( $i === false ) {
452 // Any server with less lag than it's 'max lag' param is preferable
453 $i = $this->getRandomNonLagged( $currentLoads, $domain );
454 }
455 if ( $i === false && count( $currentLoads ) != 0 ) {
456 // All replica DBs lagged. Switch to read-only mode
457 $this->replLogger->error(
458 __METHOD__ . ": all replica DBs lagged. Switch to read-only mode" );
459 $i = ArrayUtils::pickRandom( $currentLoads );
460 $laggedReplicaMode = true;
461 }
462 }
463
464 if ( $i === false ) {
465 // pickRandom() returned false.
466 // This is permanent and means the configuration or the load monitor
467 // wants us to return false.
468 $this->connLogger->debug( __METHOD__ . ": pickRandom() returned false" );
469
470 return [ false, false ];
471 }
472
473 $serverName = $this->getServerName( $i );
474 $this->connLogger->debug( __METHOD__ . ": Using reader #$i: $serverName..." );
475
476 $conn = $this->openConnection( $i, $domain );
477 if ( !$conn ) {
478 $this->connLogger->warning( __METHOD__ . ": Failed connecting to $i/$domain" );
479 unset( $currentLoads[$i] ); // avoid this server next iteration
480 $i = false;
481 continue;
482 }
483
484 // Decrement reference counter, we are finished with this connection.
485 // It will be incremented for the caller later.
486 if ( $domain !== false ) {
487 $this->reuseConnection( $conn );
488 }
489
490 // Return this server
491 break;
492 }
493
494 // If all servers were down, quit now
495 if ( !count( $currentLoads ) ) {
496 $this->connLogger->error( __METHOD__ . ": all servers down" );
497 }
498
499 return [ $i, $laggedReplicaMode ];
500 }
501
502 public function waitFor( $pos ) {
503 $oldPos = $this->waitForPos;
504 try {
505 $this->waitForPos = $pos;
506 // If a generic reader connection was already established, then wait now
507 $i = $this->readIndex;
508 if ( $i > 0 ) {
509 if ( !$this->doWait( $i ) ) {
510 $this->laggedReplicaMode = true;
511 }
512 }
513 } finally {
514 // Restore the older position if it was higher since this is used for lag-protection
515 $this->setWaitForPositionIfHigher( $oldPos );
516 }
517 }
518
519 public function waitForOne( $pos, $timeout = null ) {
520 $oldPos = $this->waitForPos;
521 try {
522 $this->waitForPos = $pos;
523
524 $i = $this->readIndex;
525 if ( $i <= 0 ) {
526 // Pick a generic replica DB if there isn't one yet
527 $readLoads = $this->loads;
528 unset( $readLoads[$this->getWriterIndex()] ); // replica DBs only
529 $readLoads = array_filter( $readLoads ); // with non-zero load
530 $i = ArrayUtils::pickRandom( $readLoads );
531 }
532
533 if ( $i > 0 ) {
534 $ok = $this->doWait( $i, true, $timeout );
535 } else {
536 $ok = true; // no applicable loads
537 }
538 } finally {
539 # Restore the old position, as this is not used for lag-protection but for throttling
540 $this->waitForPos = $oldPos;
541 }
542
543 return $ok;
544 }
545
546 public function waitForAll( $pos, $timeout = null ) {
547 $timeout = $timeout ?: $this->waitTimeout;
548
549 $oldPos = $this->waitForPos;
550 try {
551 $this->waitForPos = $pos;
552 $serverCount = count( $this->servers );
553
554 $ok = true;
555 for ( $i = 1; $i < $serverCount; $i++ ) {
556 if ( $this->loads[$i] > 0 ) {
557 $start = microtime( true );
558 $ok = $this->doWait( $i, true, $timeout ) && $ok;
559 $timeout -= intval( microtime( true ) - $start );
560 if ( $timeout <= 0 ) {
561 break; // timeout reached
562 }
563 }
564 }
565 } finally {
566 # Restore the old position, as this is not used for lag-protection but for throttling
567 $this->waitForPos = $oldPos;
568 }
569
570 return $ok;
571 }
572
576 private function setWaitForPositionIfHigher( $pos ) {
577 if ( !$pos ) {
578 return;
579 }
580
581 if ( !$this->waitForPos || $pos->hasReached( $this->waitForPos ) ) {
582 $this->waitForPos = $pos;
583 }
584 }
585
586 public function getAnyOpenConnection( $i, $flags = 0 ) {
587 $autocommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
588 foreach ( $this->conns as $connsByServer ) {
589 if ( !isset( $connsByServer[$i] ) ) {
590 continue;
591 }
592
593 foreach ( $connsByServer[$i] as $conn ) {
594 if ( !$autocommit || $conn->getLBInfo( 'autoCommitOnly' ) ) {
595 return $conn;
596 }
597 }
598 }
599
600 return false;
601 }
602
610 protected function doWait( $index, $open = false, $timeout = null ) {
611 $timeout = max( 1, intval( $timeout ?: $this->waitTimeout ) );
612
613 // Check if we already know that the DB has reached this point
614 $server = $this->getServerName( $index );
615 $key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $server, 'v1' );
617 $knownReachedPos = $this->srvCache->get( $key );
618 if (
619 $knownReachedPos instanceof DBMasterPos &&
620 $knownReachedPos->hasReached( $this->waitForPos )
621 ) {
622 $this->replLogger->debug(
623 __METHOD__ .
624 ': replica DB {dbserver} known to be caught up (pos >= $knownReachedPos).',
625 [ 'dbserver' => $server ]
626 );
627 return true;
628 }
629
630 // Find a connection to wait on, creating one if needed and allowed
631 $close = false; // close the connection afterwards
632 $conn = $this->getAnyOpenConnection( $index );
633 if ( !$conn ) {
634 if ( !$open ) {
635 $this->replLogger->debug(
636 __METHOD__ . ': no connection open for {dbserver}',
637 [ 'dbserver' => $server ]
638 );
639
640 return false;
641 } else {
642 $conn = $this->openConnection( $index, self::DOMAIN_ANY );
643 if ( !$conn ) {
644 $this->replLogger->warning(
645 __METHOD__ . ': failed to connect to {dbserver}',
646 [ 'dbserver' => $server ]
647 );
648
649 return false;
650 }
651 // Avoid connection spam in waitForAll() when connections
652 // are made just for the sake of doing this lag check.
653 $close = true;
654 }
655 }
656
657 $this->replLogger->info(
658 __METHOD__ .
659 ': waiting for replica DB {dbserver} to catch up...',
660 [ 'dbserver' => $server ]
661 );
662
663 $result = $conn->masterPosWait( $this->waitForPos, $timeout );
664
665 if ( $result === null ) {
666 $this->replLogger->warning(
667 __METHOD__ . ': Errored out waiting on {host} pos {pos}',
668 [
669 'host' => $server,
670 'pos' => $this->waitForPos,
671 'trace' => ( new RuntimeException() )->getTraceAsString()
672 ]
673 );
674 $ok = false;
675 } elseif ( $result == -1 ) {
676 $this->replLogger->warning(
677 __METHOD__ . ': Timed out waiting on {host} pos {pos}',
678 [
679 'host' => $server,
680 'pos' => $this->waitForPos,
681 'trace' => ( new RuntimeException() )->getTraceAsString()
682 ]
683 );
684 $ok = false;
685 } else {
686 $this->replLogger->debug( __METHOD__ . ": done waiting" );
687 $ok = true;
688 // Remember that the DB reached this point
689 $this->srvCache->set( $key, $this->waitForPos, BagOStuff::TTL_DAY );
690 }
691
692 if ( $close ) {
693 $this->closeConnection( $conn );
694 }
695
696 return $ok;
697 }
698
699 public function getConnection( $i, $groups = [], $domain = false, $flags = 0 ) {
700 if ( $i === null || $i === false ) {
701 throw new InvalidArgumentException( 'Attempt to call ' . __METHOD__ .
702 ' with invalid server index' );
703 }
704
705 if ( $this->localDomain->equals( $domain ) || $domain === $this->localDomainIdAlias ) {
706 $domain = false; // local connection requested
707 }
708
709 if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) === self::CONN_TRX_AUTOCOMMIT ) {
710 // Assuming all servers are of the same type (or similar), which is overwhelmingly
711 // the case, use the master server information to get the attributes. The information
712 // for $i cannot be used since it might be DB_REPLICA, which might require connection
713 // attempts in order to be resolved into a real server index.
714 $attributes = $this->getServerAttributes( $this->getWriterIndex() );
715 if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
716 // Callers sometimes want to (a) escape REPEATABLE-READ stateness without locking
717 // rows (e.g. FOR UPDATE) or (b) make small commits during a larger transactions
718 // to reduce lock contention. None of these apply for sqlite and using separate
719 // connections just causes self-deadlocks.
720 $flags &= ~self::CONN_TRX_AUTOCOMMIT;
721 $this->connLogger->info( __METHOD__ .
722 ': ignoring CONN_TRX_AUTOCOMMIT to avoid deadlocks.' );
723 }
724 }
725
726 // Check one "group" per default: the generic pool
727 $defaultGroups = $this->defaultGroup ? [ $this->defaultGroup ] : [ false ];
728
729 $groups = ( $groups === false || $groups === [] )
730 ? $defaultGroups
731 : (array)$groups;
732
733 $masterOnly = ( $i == self::DB_MASTER || $i == $this->getWriterIndex() );
734 $oldConnsOpened = $this->connsOpened; // connections open now
735
736 if ( $i == self::DB_MASTER ) {
737 $i = $this->getWriterIndex();
738 } elseif ( $i == self::DB_REPLICA ) {
739 # Try to find an available server in any the query groups (in order)
740 foreach ( $groups as $group ) {
741 $groupIndex = $this->getReaderIndex( $group, $domain );
742 if ( $groupIndex !== false ) {
743 $i = $groupIndex;
744 break;
745 }
746 }
747 }
748
749 # Operation-based index
750 if ( $i == self::DB_REPLICA ) {
751 $this->lastError = 'Unknown error'; // reset error string
752 # Try the general server pool if $groups are unavailable.
753 $i = ( $groups === [ false ] )
754 ? false // don't bother with this if that is what was tried above
755 : $this->getReaderIndex( false, $domain );
756 # Couldn't find a working server in getReaderIndex()?
757 if ( $i === false ) {
758 $this->lastError = 'No working replica DB server: ' . $this->lastError;
759 // Throw an exception
760 $this->reportConnectionError();
761 return null; // not reached
762 }
763 }
764
765 # Now we have an explicit index into the servers array
766 $conn = $this->openConnection( $i, $domain, $flags );
767 if ( !$conn ) {
768 // Throw an exception
769 $this->reportConnectionError();
770 return null; // not reached
771 }
772
773 # Profile any new connections that happen
774 if ( $this->connsOpened > $oldConnsOpened ) {
775 $host = $conn->getServer();
776 $dbname = $conn->getDBname();
777 $this->trxProfiler->recordConnection( $host, $dbname, $masterOnly );
778 }
779
780 if ( $masterOnly ) {
781 # Make master-requested DB handles inherit any read-only mode setting
782 $conn->setLBInfo( 'readOnlyReason', $this->getReadOnlyReason( $domain, $conn ) );
783 }
784
785 return $conn;
786 }
787
788 public function reuseConnection( IDatabase $conn ) {
789 $serverIndex = $conn->getLBInfo( 'serverIndex' );
790 $refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
791 if ( $serverIndex === null || $refCount === null ) {
802 return;
803 } elseif ( $conn instanceof DBConnRef ) {
804 // DBConnRef already handles calling reuseConnection() and only passes the live
805 // Database instance to this method. Any caller passing in a DBConnRef is broken.
806 $this->connLogger->error(
807 __METHOD__ . ": got DBConnRef instance.\n" .
808 ( new RuntimeException() )->getTraceAsString() );
809
810 return;
811 }
812
813 if ( $this->disabled ) {
814 return; // DBConnRef handle probably survived longer than the LoadBalancer
815 }
816
817 if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
818 $connFreeKey = self::KEY_FOREIGN_FREE_NOROUND;
819 $connInUseKey = self::KEY_FOREIGN_INUSE_NOROUND;
820 } else {
821 $connFreeKey = self::KEY_FOREIGN_FREE;
822 $connInUseKey = self::KEY_FOREIGN_INUSE;
823 }
824
825 $domain = $conn->getDomainID();
826 if ( !isset( $this->conns[$connInUseKey][$serverIndex][$domain] ) ) {
827 throw new InvalidArgumentException( __METHOD__ .
828 ": connection $serverIndex/$domain not found; it may have already been freed." );
829 } elseif ( $this->conns[$connInUseKey][$serverIndex][$domain] !== $conn ) {
830 throw new InvalidArgumentException( __METHOD__ .
831 ": connection $serverIndex/$domain mismatched; it may have already been freed." );
832 }
833
834 $conn->setLBInfo( 'foreignPoolRefCount', --$refCount );
835 if ( $refCount <= 0 ) {
836 $this->conns[$connFreeKey][$serverIndex][$domain] = $conn;
837 unset( $this->conns[$connInUseKey][$serverIndex][$domain] );
838 if ( !$this->conns[$connInUseKey][$serverIndex] ) {
839 unset( $this->conns[$connInUseKey][$serverIndex] ); // clean up
840 }
841 $this->connLogger->debug( __METHOD__ . ": freed connection $serverIndex/$domain" );
842 } else {
843 $this->connLogger->debug( __METHOD__ .
844 ": reference count for $serverIndex/$domain reduced to $refCount" );
845 }
846 }
847
848 public function getConnectionRef( $db, $groups = [], $domain = false, $flags = 0 ) {
849 $domain = $this->resolveDomainID( $domain );
850
851 return new DBConnRef( $this, $this->getConnection( $db, $groups, $domain, $flags ) );
852 }
853
854 public function getLazyConnectionRef( $db, $groups = [], $domain = false, $flags = 0 ) {
855 $domain = $this->resolveDomainID( $domain );
856
857 return new DBConnRef( $this, [ $db, $groups, $domain, $flags ] );
858 }
859
860 public function getMaintenanceConnectionRef( $db, $groups = [], $domain = false, $flags = 0 ) {
861 $domain = $this->resolveDomainID( $domain );
862
863 return new MaintainableDBConnRef(
864 $this, $this->getConnection( $db, $groups, $domain, $flags ) );
865 }
866
867 public function openConnection( $i, $domain = false, $flags = 0 ) {
868 if ( $this->localDomain->equals( $domain ) || $domain === $this->localDomainIdAlias ) {
869 $domain = false; // local connection requested
870 }
871
872 if ( !$this->connectionAttempted && $this->chronologyCallback ) {
873 $this->connLogger->debug( __METHOD__ . ': calling initLB() before first connection.' );
874 // Load any "waitFor" positions before connecting so that doWait() is triggered
875 $this->connectionAttempted = true;
876 ( $this->chronologyCallback )( $this );
877 }
878
879 // Check if an auto-commit connection is being requested. If so, it will not reuse the
880 // main set of DB connections but rather its own pool since:
881 // a) those are usually set to implicitly use transaction rounds via DBO_TRX
882 // b) those must support the use of explicit transaction rounds via beginMasterChanges()
883 $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
884
885 if ( $domain !== false ) {
886 // Connection is to a foreign domain
887 $conn = $this->openForeignConnection( $i, $domain, $flags );
888 } else {
889 // Connection is to the local domain
890 $conn = $this->openLocalConnection( $i, $flags );
891 }
892
893 if ( $conn instanceof IDatabase && !$conn->isOpen() ) {
894 // Connection was made but later unrecoverably lost for some reason.
895 // Do not return a handle that will just throw exceptions on use,
896 // but let the calling code (e.g. getReaderIndex) try another server.
897 // See DatabaseMyslBase::ping() for how this can happen.
898 $this->errorConnection = $conn;
899 $conn = false;
900 }
901
902 if ( $autoCommit && $conn instanceof IDatabase ) {
903 if ( $conn->trxLevel() ) { // sanity
904 throw new DBUnexpectedError(
905 $conn,
906 __METHOD__ . ': CONN_TRX_AUTOCOMMIT handle has a transaction.'
907 );
908 }
909
910 $conn->clearFlag( $conn::DBO_TRX ); // auto-commit mode
911 }
912
913 return $conn;
914 }
915
928 private function openLocalConnection( $i, $flags = 0 ) {
929 $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
930
931 $connKey = $autoCommit ? self::KEY_LOCAL_NOROUND : self::KEY_LOCAL;
932 if ( isset( $this->conns[$connKey][$i][0] ) ) {
933 $conn = $this->conns[$connKey][$i][0];
934 } else {
935 if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
936 throw new InvalidArgumentException( "No server with index '$i'." );
937 }
938 // Open a new connection
939 $server = $this->servers[$i];
940 $server['serverIndex'] = $i;
941 $server['autoCommitOnly'] = $autoCommit;
942 $conn = $this->reallyOpenConnection( $server, $this->localDomain );
943 $host = $this->getServerName( $i );
944 if ( $conn->isOpen() ) {
945 $this->connLogger->debug(
946 __METHOD__ . ": connected to database $i at '$host'." );
947 $this->conns[$connKey][$i][0] = $conn;
948 } else {
949 $this->connLogger->warning(
950 __METHOD__ . ": failed to connect to database $i at '$host'." );
951 $this->errorConnection = $conn;
952 $conn = false;
953 }
954 }
955
956 // Final sanity check to make sure the right domain is selected
957 if (
958 $conn instanceof IDatabase &&
959 !$this->localDomain->isCompatible( $conn->getDomainID() )
960 ) {
961 throw new UnexpectedValueException(
962 "Got connection to '{$conn->getDomainID()}', " .
963 "but expected local domain ('{$this->localDomain}')." );
964 }
965
966 return $conn;
967 }
968
991 private function openForeignConnection( $i, $domain, $flags = 0 ) {
992 $domainInstance = DatabaseDomain::newFromId( $domain );
993 $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
994
995 if ( $autoCommit ) {
996 $connFreeKey = self::KEY_FOREIGN_FREE_NOROUND;
997 $connInUseKey = self::KEY_FOREIGN_INUSE_NOROUND;
998 } else {
999 $connFreeKey = self::KEY_FOREIGN_FREE;
1000 $connInUseKey = self::KEY_FOREIGN_INUSE;
1001 }
1002
1004 if ( isset( $this->conns[$connInUseKey][$i][$domain] ) ) {
1005 // Reuse an in-use connection for the same domain
1006 $conn = $this->conns[$connInUseKey][$i][$domain];
1007 $this->connLogger->debug( __METHOD__ . ": reusing connection $i/$domain" );
1008 } elseif ( isset( $this->conns[$connFreeKey][$i][$domain] ) ) {
1009 // Reuse a free connection for the same domain
1010 $conn = $this->conns[$connFreeKey][$i][$domain];
1011 unset( $this->conns[$connFreeKey][$i][$domain] );
1012 $this->conns[$connInUseKey][$i][$domain] = $conn;
1013 $this->connLogger->debug( __METHOD__ . ": reusing free connection $i/$domain" );
1014 } elseif ( !empty( $this->conns[$connFreeKey][$i] ) ) {
1015 // Reuse a free connection from another domain
1016 $conn = reset( $this->conns[$connFreeKey][$i] );
1017 $oldDomain = key( $this->conns[$connFreeKey][$i] );
1018 if ( $domainInstance->getDatabase() !== null ) {
1019 $conn->selectDomain( $domainInstance );
1020 } else {
1021 // Stay on the current database, but update the schema/prefix
1022 $conn->dbSchema( $domainInstance->getSchema() );
1023 $conn->tablePrefix( $domainInstance->getTablePrefix() );
1024 }
1025 unset( $this->conns[$connFreeKey][$i][$oldDomain] );
1026 // Note that if $domain is an empty string, getDomainID() might not match it
1027 $this->conns[$connInUseKey][$i][$conn->getDomainId()] = $conn;
1028 $this->connLogger->debug( __METHOD__ .
1029 ": reusing free connection from $oldDomain for $domain" );
1030 } else {
1031 if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
1032 throw new InvalidArgumentException( "No server with index '$i'." );
1033 }
1034 // Open a new connection
1035 $server = $this->servers[$i];
1036 $server['serverIndex'] = $i;
1037 $server['foreignPoolRefCount'] = 0;
1038 $server['foreign'] = true;
1039 $server['autoCommitOnly'] = $autoCommit;
1040 $conn = $this->reallyOpenConnection( $server, $domainInstance );
1041 if ( !$conn->isOpen() ) {
1042 $this->connLogger->warning( __METHOD__ . ": connection error for $i/$domain" );
1043 $this->errorConnection = $conn;
1044 $conn = false;
1045 } else {
1046 // Note that if $domain is an empty string, getDomainID() might not match it
1047 $this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
1048 $this->connLogger->debug( __METHOD__ . ": opened new connection for $i/$domain" );
1049 }
1050 }
1051
1052 if ( $conn instanceof IDatabase ) {
1053 // Final sanity check to make sure the right domain is selected
1054 if ( !$domainInstance->isCompatible( $conn->getDomainID() ) ) {
1055 throw new UnexpectedValueException(
1056 "Got connection to '{$conn->getDomainID()}', but expected '$domain'." );
1057 }
1058 // Increment reference count
1059 $refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
1060 $conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 );
1061 }
1062
1063 return $conn;
1064 }
1065
1066 public function getServerAttributes( $i ) {
1068 $this->getServerType( $i ),
1069 $this->servers[$i]['driver'] ?? null
1070 );
1071 }
1072
1080 private function isOpen( $index ) {
1081 if ( !is_int( $index ) ) {
1082 return false;
1083 }
1084
1085 return (bool)$this->getAnyOpenConnection( $index );
1086 }
1087
1099 protected function reallyOpenConnection( array $server, DatabaseDomain $domain ) {
1100 if ( $this->disabled ) {
1101 throw new DBAccessError();
1102 }
1103
1104 if ( $domain->getDatabase() === null ) {
1105 // The database domain does not specify a DB name and some database systems require a
1106 // valid DB specified on connection. The $server configuration array contains a default
1107 // DB name to use for connections in such cases.
1108 if ( $server['type'] === 'mysql' ) {
1109 // For MySQL, DATABASE and SCHEMA are synonyms, connections need not specify a DB,
1110 // and the DB name in $server might not exist due to legacy reasons (the default
1111 // domain used to ignore the local LB domain, even when mismatched).
1112 $server['dbname'] = null;
1113 }
1114 } else {
1115 $server['dbname'] = $domain->getDatabase();
1116 }
1117
1118 if ( $domain->getSchema() !== null ) {
1119 $server['schema'] = $domain->getSchema();
1120 }
1121
1122 // It is always possible to connect with any prefix, even the empty string
1123 $server['tablePrefix'] = $domain->getTablePrefix();
1124
1125 // Let the handle know what the cluster master is (e.g. "db1052")
1126 $masterName = $this->getServerName( $this->getWriterIndex() );
1127 $server['clusterMasterHost'] = $masterName;
1128
1129 // Log when many connection are made on requests
1130 if ( ++$this->connsOpened >= self::CONN_HELD_WARN_THRESHOLD ) {
1131 $this->perfLogger->warning( __METHOD__ . ": " .
1132 "{$this->connsOpened}+ connections made (master=$masterName)" );
1133 }
1134
1135 $server['srvCache'] = $this->srvCache;
1136 // Set loggers and profilers
1137 $server['connLogger'] = $this->connLogger;
1138 $server['queryLogger'] = $this->queryLogger;
1139 $server['errorLogger'] = $this->errorLogger;
1140 $server['deprecationLogger'] = $this->deprecationLogger;
1141 $server['profiler'] = $this->profiler;
1142 $server['trxProfiler'] = $this->trxProfiler;
1143 // Use the same agent and PHP mode for all DB handles
1144 $server['cliMode'] = $this->cliMode;
1145 $server['agent'] = $this->agent;
1146 // Use DBO_DEFAULT flags by default for LoadBalancer managed databases. Assume that the
1147 // application calls LoadBalancer::commitMasterChanges() before the PHP script completes.
1148 $server['flags'] = $server['flags'] ?? IDatabase::DBO_DEFAULT;
1149
1150 // Create a live connection object
1151 try {
1152 $db = Database::factory( $server['type'], $server );
1153 } catch ( DBConnectionError $e ) {
1154 // FIXME: This is probably the ugliest thing I have ever done to
1155 // PHP. I'm half-expecting it to segfault, just out of disgust. -- TS
1156 $db = $e->db;
1157 }
1158
1159 $db->setLBInfo( $server );
1160 $db->setLazyMasterHandle(
1161 $this->getLazyConnectionRef( self::DB_MASTER, [], $db->getDomainID() )
1162 );
1163 $db->setTableAliases( $this->tableAliases );
1164 $db->setIndexAliases( $this->indexAliases );
1165
1166 if ( $server['serverIndex'] === $this->getWriterIndex() ) {
1167 if ( $this->trxRoundId !== false ) {
1168 $this->applyTransactionRoundFlags( $db );
1169 }
1170 foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
1171 $db->setTransactionListener( $name, $callback );
1172 }
1173 }
1174
1175 return $db;
1176 }
1177
1181 private function reportConnectionError() {
1182 $conn = $this->errorConnection; // the connection which caused the error
1183 $context = [
1184 'method' => __METHOD__,
1185 'last_error' => $this->lastError,
1186 ];
1187
1188 if ( $conn instanceof IDatabase ) {
1189 $context['db_server'] = $conn->getServer();
1190 $this->connLogger->warning(
1191 __METHOD__ . ": connection error: {last_error} ({db_server})",
1192 $context
1193 );
1194
1195 throw new DBConnectionError( $conn, "{$this->lastError} ({$context['db_server']})" );
1196 } else {
1197 // No last connection, probably due to all servers being too busy
1198 $this->connLogger->error(
1199 __METHOD__ .
1200 ": LB failure with no last connection. Connection error: {last_error}",
1201 $context
1202 );
1203
1204 // If all servers were busy, "lastError" will contain something sensible
1205 throw new DBConnectionError( null, $this->lastError );
1206 }
1207 }
1208
1209 public function getWriterIndex() {
1210 return 0;
1211 }
1212
1213 public function haveIndex( $i ) {
1214 return array_key_exists( $i, $this->servers );
1215 }
1216
1217 public function isNonZeroLoad( $i ) {
1218 return array_key_exists( $i, $this->servers ) && $this->loads[$i] != 0;
1219 }
1220
1221 public function getServerCount() {
1222 return count( $this->servers );
1223 }
1224
1225 public function getServerName( $i ) {
1226 if ( isset( $this->servers[$i]['hostName'] ) ) {
1227 $name = $this->servers[$i]['hostName'];
1228 } elseif ( isset( $this->servers[$i]['host'] ) ) {
1229 $name = $this->servers[$i]['host'];
1230 } else {
1231 $name = '';
1232 }
1233
1234 return ( $name != '' ) ? $name : 'localhost';
1235 }
1236
1237 public function getServerInfo( $i ) {
1238 if ( isset( $this->servers[$i] ) ) {
1239 return $this->servers[$i];
1240 } else {
1241 return false;
1242 }
1243 }
1244
1245 public function getServerType( $i ) {
1246 return $this->servers[$i]['type'] ?? 'unknown';
1247 }
1248
1249 public function getMasterPos() {
1250 # If this entire request was served from a replica DB without opening a connection to the
1251 # master (however unlikely that may be), then we can fetch the position from the replica DB.
1252 $masterConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
1253 if ( !$masterConn ) {
1254 $serverCount = count( $this->servers );
1255 for ( $i = 1; $i < $serverCount; $i++ ) {
1256 $conn = $this->getAnyOpenConnection( $i );
1257 if ( $conn ) {
1258 return $conn->getReplicaPos();
1259 }
1260 }
1261 } else {
1262 return $masterConn->getMasterPos();
1263 }
1264
1265 return false;
1266 }
1267
1268 public function disable() {
1269 $this->closeAll();
1270 $this->disabled = true;
1271 }
1272
1273 public function closeAll() {
1274 $fname = __METHOD__;
1275 $this->forEachOpenConnection( function ( IDatabase $conn ) use ( $fname ) {
1276 $host = $conn->getServer();
1277 $this->connLogger->debug(
1278 $fname . ": closing connection to database '$host'." );
1279 $conn->close();
1280 } );
1281
1282 $this->conns = [
1283 self::KEY_LOCAL => [],
1284 self::KEY_FOREIGN_INUSE => [],
1285 self::KEY_FOREIGN_FREE => [],
1286 self::KEY_LOCAL_NOROUND => [],
1287 self::KEY_FOREIGN_INUSE_NOROUND => [],
1288 self::KEY_FOREIGN_FREE_NOROUND => []
1289 ];
1290 $this->connsOpened = 0;
1291 }
1292
1293 public function closeConnection( IDatabase $conn ) {
1294 $serverIndex = $conn->getLBInfo( 'serverIndex' );
1295 foreach ( $this->conns as $type => $connsByServer ) {
1296 if ( !isset( $connsByServer[$serverIndex] ) ) {
1297 continue;
1298 }
1299
1300 foreach ( $connsByServer[$serverIndex] as $i => $trackedConn ) {
1301 if ( $conn === $trackedConn ) {
1302 $host = $this->getServerName( $i );
1303 $this->connLogger->debug(
1304 __METHOD__ . ": closing connection to database $i at '$host'." );
1305 unset( $this->conns[$type][$serverIndex][$i] );
1307 break 2;
1308 }
1309 }
1310 }
1311
1312 $conn->close();
1313 }
1314
1315 public function commitAll( $fname = __METHOD__ ) {
1316 $this->commitMasterChanges( $fname );
1317 $this->flushMasterSnapshots( $fname );
1318 $this->flushReplicaSnapshots( $fname );
1319 }
1320
1321 public function finalizeMasterChanges() {
1322 $this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] );
1323
1324 $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1325 // Loop until callbacks stop adding callbacks on other connections
1326 $total = 0;
1327 do {
1328 $count = 0; // callbacks execution attempts
1329 $this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$count ) {
1330 // Run any pre-commit callbacks while leaving the post-commit ones suppressed.
1331 // Any error should cause all (peer) transactions to be rolled back together.
1332 $count += $conn->runOnTransactionPreCommitCallbacks();
1333 } );
1334 $total += $count;
1335 } while ( $count > 0 );
1336 // Defer post-commit callbacks until after COMMIT/ROLLBACK happens on all handles
1337 $this->forEachOpenMasterConnection( function ( Database $conn ) {
1338 $conn->setTrxEndCallbackSuppression( true );
1339 } );
1340 $this->trxRoundStage = self::ROUND_FINALIZED;
1341
1342 return $total;
1343 }
1344
1346 $this->assertTransactionRoundStage( self::ROUND_FINALIZED );
1347
1348 $limit = $options['maxWriteDuration'] ?? 0;
1349
1350 $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1351 $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $limit ) {
1352 // If atomic sections or explicit transactions are still open, some caller must have
1353 // caught an exception but failed to properly rollback any changes. Detect that and
1354 // throw and error (causing rollback).
1355 $conn->assertNoOpenTransactions();
1356 // Assert that the time to replicate the transaction will be sane.
1357 // If this fails, then all DB transactions will be rollback back together.
1358 $time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
1359 if ( $limit > 0 && $time > $limit ) {
1360 throw new DBTransactionSizeError(
1361 $conn,
1362 "Transaction spent $time second(s) in writes, exceeding the limit of $limit.",
1363 [ $time, $limit ]
1364 );
1365 }
1366 // If a connection sits idle while slow queries execute on another, that connection
1367 // may end up dropped before the commit round is reached. Ping servers to detect this.
1368 if ( $conn->writesOrCallbacksPending() && !$conn->ping() ) {
1369 throw new DBTransactionError(
1370 $conn,
1371 "A connection to the {$conn->getDBname()} database was lost before commit."
1372 );
1373 }
1374 } );
1375 $this->trxRoundStage = self::ROUND_APPROVED;
1376 }
1377
1378 public function beginMasterChanges( $fname = __METHOD__ ) {
1379 if ( $this->trxRoundId !== false ) {
1380 throw new DBTransactionError(
1381 null,
1382 "$fname: Transaction round '{$this->trxRoundId}' already started."
1383 );
1384 }
1385 $this->assertTransactionRoundStage( self::ROUND_CURSORY );
1386
1387 // Clear any empty transactions (no writes/callbacks) from the implicit round
1388 $this->flushMasterSnapshots( $fname );
1389
1390 $this->trxRoundId = $fname;
1391 $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1392 // Mark applicable handles as participating in this explicit transaction round.
1393 // For each of these handles, any writes and callbacks will be tied to a single
1394 // transaction. The (peer) handles will reject begin()/commit() calls unless they
1395 // are part of an en masse commit or an en masse rollback.
1396 $this->forEachOpenMasterConnection( function ( Database $conn ) {
1397 $this->applyTransactionRoundFlags( $conn );
1398 } );
1399 $this->trxRoundStage = self::ROUND_CURSORY;
1400 }
1401
1402 public function commitMasterChanges( $fname = __METHOD__ ) {
1403 $this->assertTransactionRoundStage( self::ROUND_APPROVED );
1404
1405 $failures = [];
1406
1408 $scope = $this->getScopedPHPBehaviorForCommit(); // try to ignore client aborts
1409
1410 $restore = ( $this->trxRoundId !== false );
1411 $this->trxRoundId = false;
1412 $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1413 // Commit any writes and clear any snapshots as well (callbacks require AUTOCOMMIT).
1414 // Note that callbacks should already be suppressed due to finalizeMasterChanges().
1416 function ( IDatabase $conn ) use ( $fname, &$failures ) {
1417 try {
1418 $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1419 } catch ( DBError $e ) {
1420 ( $this->errorLogger )( $e );
1421 $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
1422 }
1423 }
1424 );
1425 if ( $failures ) {
1426 throw new DBTransactionError(
1427 null,
1428 "$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
1429 );
1430 }
1431 if ( $restore ) {
1432 // Unmark handles as participating in this explicit transaction round
1433 $this->forEachOpenMasterConnection( function ( Database $conn ) {
1434 $this->undoTransactionRoundFlags( $conn );
1435 } );
1436 }
1437 $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
1438 }
1439
1441 if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1442 $type = IDatabase::TRIGGER_COMMIT;
1443 } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1444 $type = IDatabase::TRIGGER_ROLLBACK;
1445 } else {
1446 throw new DBTransactionError(
1447 null,
1448 "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1449 );
1450 }
1451
1452 $oldStage = $this->trxRoundStage;
1453 $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1454
1455 // Now that the COMMIT/ROLLBACK step is over, enable post-commit callback runs
1456 $this->forEachOpenMasterConnection( function ( Database $conn ) {
1457 $conn->setTrxEndCallbackSuppression( false );
1458 } );
1459
1460 $e = null; // first exception
1461 $fname = __METHOD__;
1462 // Loop until callbacks stop adding callbacks on other connections
1463 do {
1464 // Run any pending callbacks for each connection...
1465 $count = 0; // callback execution attempts
1467 function ( Database $conn ) use ( $type, &$e, &$count ) {
1468 if ( $conn->trxLevel() ) {
1469 return; // retry in the next iteration, after commit() is called
1470 }
1471 try {
1472 $count += $conn->runOnTransactionIdleCallbacks( $type );
1473 } catch ( Exception $ex ) {
1474 $e = $e ?: $ex;
1475 }
1476 }
1477 );
1478 // Clear out any active transactions left over from callbacks...
1479 $this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$e, $fname ) {
1480 if ( $conn->writesPending() ) {
1481 // A callback from another handle wrote to this one and DBO_TRX is set
1482 $this->queryLogger->warning( $fname . ": found writes pending." );
1483 $fnames = implode( ', ', $conn->pendingWriteAndCallbackCallers() );
1484 $this->queryLogger->warning(
1485 $fname . ": found writes pending ($fnames).",
1486 [
1487 'db_server' => $conn->getServer(),
1488 'db_name' => $conn->getDBname()
1489 ]
1490 );
1491 } elseif ( $conn->trxLevel() ) {
1492 // A callback from another handle read from this one and DBO_TRX is set,
1493 // which can easily happen if there is only one DB (no replicas)
1494 $this->queryLogger->debug( $fname . ": found empty transaction." );
1495 }
1496 try {
1497 $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1498 } catch ( Exception $ex ) {
1499 $e = $e ?: $ex;
1500 }
1501 } );
1502 } while ( $count > 0 );
1503
1504 $this->trxRoundStage = $oldStage;
1505
1506 return $e;
1507 }
1508
1510 if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
1511 $type = IDatabase::TRIGGER_COMMIT;
1512 } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
1513 $type = IDatabase::TRIGGER_ROLLBACK;
1514 } else {
1515 throw new DBTransactionError(
1516 null,
1517 "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
1518 );
1519 }
1520
1521 $e = null;
1522
1523 $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1524 $this->forEachOpenMasterConnection( function ( Database $conn ) use ( $type, &$e ) {
1525 try {
1527 } catch ( Exception $ex ) {
1528 $e = $e ?: $ex;
1529 }
1530 } );
1531 $this->trxRoundStage = self::ROUND_CURSORY;
1532
1533 return $e;
1534 }
1535
1536 public function rollbackMasterChanges( $fname = __METHOD__ ) {
1537 $restore = ( $this->trxRoundId !== false );
1538 $this->trxRoundId = false;
1539 $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
1540 $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $fname ) {
1541 $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
1542 } );
1543 if ( $restore ) {
1544 // Unmark handles as participating in this explicit transaction round
1545 $this->forEachOpenMasterConnection( function ( Database $conn ) {
1546 $this->undoTransactionRoundFlags( $conn );
1547 } );
1548 }
1549 $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
1550 }
1551
1555 private function assertTransactionRoundStage( $stage ) {
1556 $stages = (array)$stage;
1557
1558 if ( !in_array( $this->trxRoundStage, $stages, true ) ) {
1559 $stageList = implode(
1560 '/',
1561 array_map( function ( $v ) {
1562 return "'$v'";
1563 }, $stages )
1564 );
1565 throw new DBTransactionError(
1566 null,
1567 "Transaction round stage must be $stageList (not '{$this->trxRoundStage}')"
1568 );
1569 }
1570 }
1571
1581 private function applyTransactionRoundFlags( Database $conn ) {
1582 if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
1583 return; // transaction rounds do not apply to these connections
1584 }
1585
1586 if ( $conn->getFlag( $conn::DBO_DEFAULT ) ) {
1587 // DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
1588 // Force DBO_TRX even in CLI mode since a commit round is expected soon.
1589 $conn->setFlag( $conn::DBO_TRX, $conn::REMEMBER_PRIOR );
1590 }
1591
1592 if ( $conn->getFlag( $conn::DBO_TRX ) ) {
1593 $conn->setLBInfo( 'trxRoundId', $this->trxRoundId );
1594 }
1595 }
1596
1600 private function undoTransactionRoundFlags( Database $conn ) {
1601 if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
1602 return; // transaction rounds do not apply to these connections
1603 }
1604
1605 if ( $conn->getFlag( $conn::DBO_TRX ) ) {
1606 $conn->setLBInfo( 'trxRoundId', false );
1607 }
1608
1609 if ( $conn->getFlag( $conn::DBO_DEFAULT ) ) {
1610 $conn->restoreFlags( $conn::RESTORE_PRIOR );
1611 }
1612 }
1613
1614 public function flushReplicaSnapshots( $fname = __METHOD__ ) {
1615 $this->forEachOpenReplicaConnection( function ( IDatabase $conn ) use ( $fname ) {
1616 $conn->flushSnapshot( $fname );
1617 } );
1618 }
1619
1620 public function flushMasterSnapshots( $fname = __METHOD__ ) {
1621 $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $fname ) {
1622 $conn->flushSnapshot( $fname );
1623 } );
1624 }
1625
1630 public function getTransactionRoundStage() {
1631 return $this->trxRoundStage;
1632 }
1633
1634 public function hasMasterConnection() {
1635 return $this->isOpen( $this->getWriterIndex() );
1636 }
1637
1638 public function hasMasterChanges() {
1639 $pending = 0;
1640 $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$pending ) {
1641 $pending |= $conn->writesOrCallbacksPending();
1642 } );
1643
1644 return (bool)$pending;
1645 }
1646
1647 public function lastMasterChangeTimestamp() {
1648 $lastTime = false;
1649 $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$lastTime ) {
1650 $lastTime = max( $lastTime, $conn->lastDoneWrites() );
1651 } );
1652
1653 return $lastTime;
1654 }
1655
1656 public function hasOrMadeRecentMasterChanges( $age = null ) {
1657 $age = ( $age === null ) ? $this->waitTimeout : $age;
1658
1659 return ( $this->hasMasterChanges()
1660 || $this->lastMasterChangeTimestamp() > microtime( true ) - $age );
1661 }
1662
1663 public function pendingMasterChangeCallers() {
1664 $fnames = [];
1665 $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$fnames ) {
1666 $fnames = array_merge( $fnames, $conn->pendingWriteCallers() );
1667 } );
1668
1669 return $fnames;
1670 }
1671
1672 public function getLaggedReplicaMode( $domain = false ) {
1673 // No-op if there is only one DB (also avoids recursion)
1674 if ( !$this->laggedReplicaMode && $this->getServerCount() > 1 ) {
1675 try {
1676 // See if laggedReplicaMode gets set
1677 $conn = $this->getConnection( self::DB_REPLICA, false, $domain );
1678 $this->reuseConnection( $conn );
1679 } catch ( DBConnectionError $e ) {
1680 // Avoid expensive re-connect attempts and failures
1681 $this->allReplicasDownMode = true;
1682 $this->laggedReplicaMode = true;
1683 }
1684 }
1685
1687 }
1688
1689 public function laggedReplicaUsed() {
1691 }
1692
1698 public function laggedSlaveUsed() {
1699 return $this->laggedReplicaUsed();
1700 }
1701
1702 public function getReadOnlyReason( $domain = false, IDatabase $conn = null ) {
1703 if ( $this->readOnlyReason !== false ) {
1704 return $this->readOnlyReason;
1705 } elseif ( $this->getLaggedReplicaMode( $domain ) ) {
1706 if ( $this->allReplicasDownMode ) {
1707 return 'The database has been automatically locked ' .
1708 'until the replica database servers become available';
1709 } else {
1710 return 'The database has been automatically locked ' .
1711 'while the replica database servers catch up to the master.';
1712 }
1713 } elseif ( $this->masterRunningReadOnly( $domain, $conn ) ) {
1714 return 'The database master is running in read-only mode.';
1715 }
1716
1717 return false;
1718 }
1719
1725 private function masterRunningReadOnly( $domain, IDatabase $conn = null ) {
1727 $masterServer = $this->getServerName( $this->getWriterIndex() );
1728
1729 return (bool)$cache->getWithSetCallback(
1730 $cache->makeGlobalKey( __CLASS__, 'server-read-only', $masterServer ),
1731 self::TTL_CACHE_READONLY,
1732 function () use ( $domain, $conn ) {
1733 $old = $this->trxProfiler->setSilenced( true );
1734 try {
1735 $dbw = $conn ?: $this->getConnection( self::DB_MASTER, [], $domain );
1736 $readOnly = (int)$dbw->serverIsReadOnly();
1737 if ( !$conn ) {
1738 $this->reuseConnection( $dbw );
1739 }
1740 } catch ( DBError $e ) {
1741 $readOnly = 0;
1742 }
1743 $this->trxProfiler->setSilenced( $old );
1744 return $readOnly;
1745 },
1746 [ 'pcTTL' => $cache::TTL_PROC_LONG, 'busyValue' => 0 ]
1747 );
1748 }
1749
1750 public function allowLagged( $mode = null ) {
1751 if ( $mode === null ) {
1752 return $this->allowLagged;
1753 }
1754 $this->allowLagged = $mode;
1755
1756 return $this->allowLagged;
1757 }
1758
1759 public function pingAll() {
1760 $success = true;
1761 $this->forEachOpenConnection( function ( IDatabase $conn ) use ( &$success ) {
1762 if ( !$conn->ping() ) {
1763 $success = false;
1764 }
1765 } );
1766
1767 return $success;
1768 }
1769
1770 public function forEachOpenConnection( $callback, array $params = [] ) {
1771 foreach ( $this->conns as $connsByServer ) {
1772 foreach ( $connsByServer as $serverConns ) {
1773 foreach ( $serverConns as $conn ) {
1774 $callback( $conn, ...$params );
1775 }
1776 }
1777 }
1778 }
1779
1780 public function forEachOpenMasterConnection( $callback, array $params = [] ) {
1781 $masterIndex = $this->getWriterIndex();
1782 foreach ( $this->conns as $connsByServer ) {
1783 if ( isset( $connsByServer[$masterIndex] ) ) {
1785 foreach ( $connsByServer[$masterIndex] as $conn ) {
1786 $callback( $conn, ...$params );
1787 }
1788 }
1789 }
1790 }
1791
1792 public function forEachOpenReplicaConnection( $callback, array $params = [] ) {
1793 foreach ( $this->conns as $connsByServer ) {
1794 foreach ( $connsByServer as $i => $serverConns ) {
1795 if ( $i === $this->getWriterIndex() ) {
1796 continue; // skip master
1797 }
1798 foreach ( $serverConns as $conn ) {
1799 $callback( $conn, ...$params );
1800 }
1801 }
1802 }
1803 }
1804
1805 public function getMaxLag( $domain = false ) {
1806 $maxLag = -1;
1807 $host = '';
1808 $maxIndex = 0;
1809
1810 if ( $this->getServerCount() <= 1 ) {
1811 return [ $host, $maxLag, $maxIndex ]; // no replication = no lag
1812 }
1813
1814 $lagTimes = $this->getLagTimes( $domain );
1815 foreach ( $lagTimes as $i => $lag ) {
1816 if ( $this->loads[$i] > 0 && $lag > $maxLag ) {
1817 $maxLag = $lag;
1818 $host = $this->servers[$i]['host'];
1819 $maxIndex = $i;
1820 }
1821 }
1822
1823 return [ $host, $maxLag, $maxIndex ];
1824 }
1825
1826 public function getLagTimes( $domain = false ) {
1827 if ( $this->getServerCount() <= 1 ) {
1828 return [ $this->getWriterIndex() => 0 ]; // no replication = no lag
1829 }
1830
1831 $knownLagTimes = []; // map of (server index => 0 seconds)
1832 $indexesWithLag = [];
1833 foreach ( $this->servers as $i => $server ) {
1834 if ( empty( $server['is static'] ) ) {
1835 $indexesWithLag[] = $i; // DB server might have replication lag
1836 } else {
1837 $knownLagTimes[$i] = 0; // DB server is a non-replicating and read-only archive
1838 }
1839 }
1840
1841 return $this->getLoadMonitor()->getLagTimes( $indexesWithLag, $domain ) + $knownLagTimes;
1842 }
1843
1844 public function safeGetLag( IDatabase $conn ) {
1845 if ( $this->getServerCount() <= 1 ) {
1846 return 0;
1847 } else {
1848 return $conn->getLag();
1849 }
1850 }
1851
1858 public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = null ) {
1859 $timeout = max( 1, $timeout ?: $this->waitTimeout );
1860
1861 if ( $this->getServerCount() <= 1 || !$conn->getLBInfo( 'replica' ) ) {
1862 return true; // server is not a replica DB
1863 }
1864
1865 if ( !$pos ) {
1866 // Get the current master position, opening a connection if needed
1867 $masterConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
1868 if ( $masterConn ) {
1869 $pos = $masterConn->getMasterPos();
1870 } else {
1871 $masterConn = $this->openConnection( $this->getWriterIndex(), self::DOMAIN_ANY );
1872 $pos = $masterConn->getMasterPos();
1873 $this->closeConnection( $masterConn );
1874 }
1875 }
1876
1877 if ( $pos instanceof DBMasterPos ) {
1878 $result = $conn->masterPosWait( $pos, $timeout );
1879 if ( $result == -1 || is_null( $result ) ) {
1880 $msg = __METHOD__ . ': timed out waiting on {host} pos {pos}';
1881 $this->replLogger->warning( $msg, [
1882 'host' => $conn->getServer(),
1883 'pos' => $pos,
1884 'trace' => ( new RuntimeException() )->getTraceAsString()
1885 ] );
1886 $ok = false;
1887 } else {
1888 $this->replLogger->debug( __METHOD__ . ': done waiting' );
1889 $ok = true;
1890 }
1891 } else {
1892 $ok = false; // something is misconfigured
1893 $this->replLogger->error(
1894 __METHOD__ . ': could not get master pos for {host}',
1895 [
1896 'host' => $conn->getServer(),
1897 'trace' => ( new RuntimeException() )->getTraceAsString()
1898 ]
1899 );
1900 }
1901
1902 return $ok;
1903 }
1904
1905 public function setTransactionListener( $name, callable $callback = null ) {
1906 if ( $callback ) {
1907 $this->trxRecurringCallbacks[$name] = $callback;
1908 } else {
1909 unset( $this->trxRecurringCallbacks[$name] );
1910 }
1912 function ( IDatabase $conn ) use ( $name, $callback ) {
1913 $conn->setTransactionListener( $name, $callback );
1914 }
1915 );
1916 }
1917
1918 public function setTableAliases( array $aliases ) {
1919 $this->tableAliases = $aliases;
1920 }
1921
1922 public function setIndexAliases( array $aliases ) {
1923 $this->indexAliases = $aliases;
1924 }
1925
1926 public function setDomainPrefix( $prefix ) {
1927 // Find connections to explicit foreign domains still marked as in-use...
1928 $domainsInUse = [];
1929 $this->forEachOpenConnection( function ( IDatabase $conn ) use ( &$domainsInUse ) {
1930 // Once reuseConnection() is called on a handle, its reference count goes from 1 to 0.
1931 // Until then, it is still in use by the caller (explicitly or via DBConnRef scope).
1932 if ( $conn->getLBInfo( 'foreignPoolRefCount' ) > 0 ) {
1933 $domainsInUse[] = $conn->getDomainID();
1934 }
1935 } );
1936
1937 // Do not switch connections to explicit foreign domains unless marked as safe
1938 if ( $domainsInUse ) {
1939 $domains = implode( ', ', $domainsInUse );
1940 throw new DBUnexpectedError( null,
1941 "Foreign domain connections are still in use ($domains)." );
1942 }
1943
1944 $this->setLocalDomain( new DatabaseDomain(
1945 $this->localDomain->getDatabase(),
1946 $this->localDomain->getSchema(),
1947 $prefix
1948 ) );
1949
1950 // Update the prefix for all local connections...
1951 $this->forEachOpenConnection( function ( IDatabase $db ) use ( $prefix ) {
1952 if ( !$db->getLBInfo( 'foreign' ) ) {
1953 $db->tablePrefix( $prefix );
1954 }
1955 } );
1956 }
1957
1958 public function redefineLocalDomain( $domain ) {
1959 $this->closeAll();
1960
1961 $this->setLocalDomain( DatabaseDomain::newFromId( $domain ) );
1962 }
1963
1967 private function setLocalDomain( DatabaseDomain $domain ) {
1968 $this->localDomain = $domain;
1969 // In case a caller assumes that the domain ID is simply <db>-<prefix>, which is almost
1970 // always true, gracefully handle the case when they fail to account for escaping.
1971 if ( $this->localDomain->getTablePrefix() != '' ) {
1972 $this->localDomainIdAlias =
1973 $this->localDomain->getDatabase() . '-' . $this->localDomain->getTablePrefix();
1974 } else {
1975 $this->localDomainIdAlias = $this->localDomain->getDatabase();
1976 }
1977 }
1978
1985 final protected function getScopedPHPBehaviorForCommit() {
1986 if ( PHP_SAPI != 'cli' ) { // https://bugs.php.net/bug.php?id=47540
1987 $old = ignore_user_abort( true ); // avoid half-finished operations
1988 return new ScopedCallback( function () use ( $old ) {
1989 ignore_user_abort( $old );
1990 } );
1991 }
1992
1993 return null;
1994 }
1995
1996 function __destruct() {
1997 // Avoid connection leaks for sanity
1998 $this->disable();
1999 }
2000}
2001
2005class_alias( LoadBalancer::class, 'LoadBalancer' );
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
if(defined( 'MW_SETUP_CALLBACK')) $fname
Customization point after all loading (constants, functions, classes, DefaultSettings,...
Definition Setup.php:121
A collection of static methods to play with arrays.
static pickRandom( $weights)
Given an array of non-normalised probabilities, this function will select an element and return the a...
Class representing a cache/ephemeral data store.
Definition BagOStuff.php:58
A BagOStuff object with no objects in it.
Multi-datacenter aware caching interface.
Exception class for attempted DB access.
Helper class to handle automatically marking connections as reusable (via RAII pattern) as well handl...
Definition DBConnRef.php:15
Database error base class.
Definition DBError.php:30
Class to handle database/prefix specification for IDatabase domains.
Relational database abstraction object.
Definition Database.php:48
runOnTransactionPreCommitCallbacks()
Actually consume and run any "on transaction pre-commit" callbacks.
runTransactionListenerCallbacks( $trigger)
Actually run any "transaction listener" callbacks.
restoreFlags( $state=self::RESTORE_PRIOR)
Restore the flags to their prior state before the last setFlag/clearFlag call.
Definition Database.php:809
setTrxEndCallbackSuppression( $suppress)
Whether to disable running of post-COMMIT/ROLLBACK callbacks.
static factory( $dbType, $p=[], $connect=self::NEW_CONNECTED)
Construct a Database subclass instance given a database type and parameters.
Definition Database.php:426
getLBInfo( $name=null)
Get properties passed down from the server info array of the load balancer.
Definition Database.php:629
trxLevel()
Gets the current transaction level.
Definition Database.php:579
setFlag( $flag, $remember=self::REMEMBER_NOTHING)
Set a flag for this connection.
Definition Database.php:787
static attributesFromType( $dbType, $driver=null)
Definition Database.php:481
setLBInfo( $name, $value=null)
Set the LB info array, or a member of it.
Definition Database.php:641
getFlag( $flag)
Returns a boolean whether the flag $flag is set for this connection.
Definition Database.php:822
Database connection, tracking, load balancing, and transaction manager for a cluster.
array[] $servers
Map of (server index => server config array)
masterRunningReadOnly( $domain, IDatabase $conn=null)
undoTransactionRoundFlags(Database $conn)
object string $profiler
Class name or object With profileIn/profileOut methods.
approveMasterChanges(array $options)
Perform all pre-commit checks for things like replication safety.
bool $allowLagged
Whether to disregard replica DB lag as a factor in replica DB selection.
getAnyOpenConnection( $i, $flags=0)
Get any open connection to a given server index, local or foreign.
callable $errorLogger
Exception logger.
int $waitTimeout
Seconds to spend waiting on replica DB lag to resolve.
bool DBMasterPos $waitForPos
False if not set.
callable $deprecationLogger
Deprecation logger.
flushReplicaSnapshots( $fname=__METHOD__)
Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshots.
setLocalDomain(DatabaseDomain $domain)
getLocalDomainID()
Get the local (and default) database domain ID of connection handles.
string $trxRoundStage
Stage of the current transaction round in the transaction round life-cycle.
getServerType( $i)
Get DB type of the server with the specified index.
flushMasterSnapshots( $fname=__METHOD__)
Commit all master DB transactions so as to flush any REPEATABLE-READ or SSI snapshots.
setTableAliases(array $aliases)
Make certain table names use their own database, schema, and table prefix when passed into SQL querie...
applyTransactionRoundFlags(Database $conn)
Make all DB servers with DBO_DEFAULT/DBO_TRX set join the transaction round.
commitAll( $fname=__METHOD__)
Commit transactions on all open connections.
string $agent
Agent name for query profiling.
waitFor( $pos)
Set the master wait position.
getMasterPos()
Get the current master position for chronology control purposes.
setDomainPrefix( $prefix)
Set a new table prefix for the existing local domain ID for testing.
finalizeMasterChanges()
Run pre-commit callbacks and defer execution of post-commit callbacks.
bool $laggedReplicaMode
Whether the generic reader fell back to a lagged replica DB.
forEachOpenMasterConnection( $callback, array $params=[])
Call a function with each open connection object to a master.
redefineLocalDomain( $domain)
Close all connection and redefine the local domain for testing or schema creation.
array[] $trxRecurringCallbacks
Map of (name => callable)
pickReaderIndex(array $loads, $domain=false)
getLazyConnectionRef( $db, $groups=[], $domain=false, $flags=0)
Get a database connection handle reference without connecting yet.
rollbackMasterChanges( $fname=__METHOD__)
Issue ROLLBACK only on master, only if queries were done on connection.
openForeignConnection( $i, $domain, $flags=0)
Open a connection to a foreign DB, or return one if it is already open.
laggedReplicaUsed()
Checks whether the database for generic connections this request was both:
doWait( $index, $open=false, $timeout=null)
Wait for a given replica DB to catch up to the master pos stored in $this.
Database[][][] $conns
Map of (connection category => server index => IDatabase[])
getConnectionRef( $db, $groups=[], $domain=false, $flags=0)
Get a database connection handle reference.
string[] $indexAliases
Map of (index alias => index)
isOpen( $index)
Test if the specified index represents an open connection.
array $loadMonitorConfig
The LoadMonitor configuration.
array[] $groupLoads
Map of (group => server index => weight)
isNonZeroLoad( $i)
Returns true if the specified index is valid and has non-zero load.
runMasterTransactionIdleCallbacks()
Consume and run all pending post-COMMIT/ROLLBACK callbacks and commit dangling transactions.
getMaintenanceConnectionRef( $db, $groups=[], $domain=false, $flags=0)
Get a maintenance database connection handle reference for migrations and schema changes.
getLoadMonitor()
Get a LoadMonitor instance.
setTransactionListener( $name, callable $callback=null)
Set a callback via IDatabase::setTransactionListener() on all current and future master connections o...
forEachOpenReplicaConnection( $callback, array $params=[])
Call a function with each open replica DB connection object.
lastMasterChangeTimestamp()
Get the timestamp of the latest write query done by this thread.
beginMasterChanges( $fname=__METHOD__)
Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
reuseConnection(IDatabase $conn)
Mark a foreign connection as being available for reuse under a different DB domain.
closeAll()
Close all open connections.
getLagTimes( $domain=false)
Get an estimate of replication lag (in seconds) for each server.
string bool $trxRoundId
String if a requested DBO_TRX transaction round is active.
safeWaitForMasterPos(IDatabase $conn, $pos=false, $timeout=null)
closeConnection(IDatabase $conn)
Close a connection.
runMasterTransactionListenerCallbacks()
Run all recurring post-COMMIT/ROLLBACK listener callbacks.
getReadOnlyReason( $domain=false, IDatabase $conn=null)
getRandomNonLagged(array $loads, $domain=false, $maxLag=INF)
bool $cliMode
Whether this PHP instance is for a CLI script.
safeGetLag(IDatabase $conn)
Get the lag in seconds for a given connection, or zero if this load balancer does not have replicatio...
openConnection( $i, $domain=false, $flags=0)
Open a connection to the server given by the specified index.
hasOrMadeRecentMasterChanges( $age=null)
Check if this load balancer object had any recent or still pending writes issued against it by this P...
string $hostname
Current server name.
disable()
Disable this load balancer.
getReaderIndex( $group=false, $domain=false)
Get the index of the reader connection, which may be a replica DB.
DatabaseDomain $localDomain
Local Domain ID and default for selectDB() calls.
float[] $loads
Map of (server index => weight)
setIndexAliases(array $aliases)
Convert certain index names to alternative names before querying the DB.
getConnection( $i, $groups=[], $domain=false, $flags=0)
Get a connection handle by server index.
TransactionProfiler $trxProfiler
getScopedPHPBehaviorForCommit()
Make PHP ignore user aborts/disconnects until the returned value leaves scope.
getServerName( $i)
Get the host name or IP address of the server with the specified index.
commitMasterChanges( $fname=__METHOD__)
Issue COMMIT on all open master connections to flush changes and view snapshots.
getLaggedReplicaMode( $domain=false)
forEachOpenConnection( $callback, array $params=[])
Call a function with each open connection object.
haveIndex( $i)
Returns true if the specified index is a valid server index.
reallyOpenConnection(array $server, DatabaseDomain $domain)
Open a new network connection to a server (uncached)
int $connsOpened
Total connections opened.
pendingMasterChangeCallers()
Get the list of callers that have pending master changes.
bool $connectionAttempted
Whether any connection has been attempted yet.
hasMasterChanges()
Whether there are pending changes or callbacks in a transaction by this thread.
openLocalConnection( $i, $flags=0)
Open a connection to a local DB, or return one if it is already open.
callable null $chronologyCallback
Callback to run before the first connection attempt.
getMaxLag( $domain=false)
Get the hostname and lag time of the most-lagged replica DB.
Database $errorConnection
DB connection object that caused a problem.
int $readIndex
The generic (not query grouped) replica DB index (of $mServers)
bool $allReplicasDownMode
Whether the generic reader fell back to a lagged replica DB.
string $lastError
The last DB selection or connection error.
string $localDomainIdAlias
Alternate ID string for the domain instead of DatabaseDomain::getId()
getServerInfo( $i)
Return the server info structure for a given index, or false if the index is invalid.
allowLagged( $mode=null)
Disables/enables lag checks.
getServerCount()
Get the number of defined servers (not the number of open connections)
string bool $readOnlyReason
Reason the LB is read-only or false if not.
waitForOne( $pos, $timeout=null)
Set the master wait position and wait for a "generic" replica DB to catch up to it.
waitForAll( $pos, $timeout=null)
Set the master wait position and wait for ALL replica DBs to catch up to it.
__construct(array $params)
Construct a manager of IDatabase connection objects.
Helper class to handle automatically marking connections as reusable (via RAII pattern) as well handl...
Helper class that detects high-contention DB queries via profiling calls.
deferred txt A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user For updating the view updating the linked to tables after a etc PHP does not yet have any way to tell the server to actually return and disconnect while still running these but it might have such a feature in the future We handle these by creating a deferred update object and putting those objects on a global list
Definition deferred.txt:11
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
see documentation in includes Linker php for Linker::makeImageLink & $time
Definition hooks.txt:1841
The index of the header message $result[1]=The index of the body text message $result[2 through n]=Parameters passed to body text message. Please note the header message cannot receive/use parameters. 'ImgAuthModifyHeaders':Executed just before a file is streamed to a user via img_auth.php, allowing headers to be modified beforehand. $title:LinkTarget object & $headers:HTTP headers(name=> value, names are case insensitive). Two headers get special handling:If-Modified-Since(value must be a valid HTTP date) and Range(must be of the form "bytes=(\d*-\d*)") will be honored when streaming the file. 'ImportHandleLogItemXMLTag':When parsing a XML tag in a log item. Return false to stop further processing of the tag $reader:XMLReader object $logInfo:Array of information 'ImportHandlePageXMLTag':When parsing a XML tag in a page. Return false to stop further processing of the tag $reader:XMLReader object & $pageInfo:Array of information 'ImportHandleRevisionXMLTag':When parsing a XML tag in a page revision. Return false to stop further processing of the tag $reader:XMLReader object $pageInfo:Array of page information $revisionInfo:Array of revision information 'ImportHandleToplevelXMLTag':When parsing a top level XML tag. Return false to stop further processing of the tag $reader:XMLReader object 'ImportHandleUnknownUser':When a user doesn 't exist locally, this hook is called to give extensions an opportunity to auto-create it. If the auto-creation is successful, return false. $name:User name 'ImportHandleUploadXMLTag':When parsing a XML tag in a file upload. Return false to stop further processing of the tag $reader:XMLReader object $revisionInfo:Array of information 'ImportLogInterwikiLink':Hook to change the interwiki link used in log entries and edit summaries for transwiki imports. & $fullInterwikiPrefix:Interwiki prefix, may contain colons. & $pageTitle:String that contains page title. 'ImportSources':Called when reading from the $wgImportSources configuration variable. Can be used to lazy-load the import sources list. & $importSources:The value of $wgImportSources. Modify as necessary. See the comment in DefaultSettings.php for the detail of how to structure this array. 'InfoAction':When building information to display on the action=info page. $context:IContextSource object & $pageInfo:Array of information 'InitializeArticleMaybeRedirect':MediaWiki check to see if title is a redirect. & $title:Title object for the current page & $request:WebRequest & $ignoreRedirect:boolean to skip redirect check & $target:Title/string of redirect target & $article:Article object 'InternalParseBeforeLinks':during Parser 's internalParse method before links but after nowiki/noinclude/includeonly/onlyinclude and other processings. & $parser:Parser object & $text:string containing partially parsed text & $stripState:Parser 's internal StripState object 'InternalParseBeforeSanitize':during Parser 's internalParse method just before the parser removes unwanted/dangerous HTML tags and after nowiki/noinclude/includeonly/onlyinclude and other processings. Ideal for syntax-extensions after template/parser function execution which respect nowiki and HTML-comments. & $parser:Parser object & $text:string containing partially parsed text & $stripState:Parser 's internal StripState object 'InterwikiLoadPrefix':When resolving if a given prefix is an interwiki or not. Return true without providing an interwiki to continue interwiki search. $prefix:interwiki prefix we are looking for. & $iwData:output array describing the interwiki with keys iw_url, iw_local, iw_trans and optionally iw_api and iw_wikiid. 'InvalidateEmailComplete':Called after a user 's email has been invalidated successfully. $user:user(object) whose email is being invalidated 'IRCLineURL':When constructing the URL to use in an IRC notification. Callee may modify $url and $query, URL will be constructed as $url . $query & $url:URL to index.php & $query:Query string $rc:RecentChange object that triggered url generation 'IsFileCacheable':Override the result of Article::isFileCacheable()(if true) & $article:article(object) being checked 'IsTrustedProxy':Override the result of IP::isTrustedProxy() & $ip:IP being check & $result:Change this value to override the result of IP::isTrustedProxy() 'IsUploadAllowedFromUrl':Override the result of UploadFromUrl::isAllowedUrl() $url:URL used to upload from & $allowed:Boolean indicating if uploading is allowed for given URL 'isValidEmailAddr':Override the result of Sanitizer::validateEmail(), for instance to return false if the domain name doesn 't match your organization. $addr:The e-mail address entered by the user & $result:Set this and return false to override the internal checks 'isValidPassword':Override the result of User::isValidPassword() $password:The password entered by the user & $result:Set this and return false to override the internal checks $user:User the password is being validated for 'Language::getMessagesFileName':$code:The language code or the language we 're looking for a messages file for & $file:The messages file path, you can override this to change the location. 'LanguageGetMagic':DEPRECATED since 1.16! Use $magicWords in a file listed in $wgExtensionMessagesFiles instead. Use this to define synonyms of magic words depending of the language & $magicExtensions:associative array of magic words synonyms $lang:language code(string) 'LanguageGetNamespaces':Provide custom ordering for namespaces or remove namespaces. Do not use this hook to add namespaces. Use CanonicalNamespaces for that. & $namespaces:Array of namespaces indexed by their numbers 'LanguageGetSpecialPageAliases':DEPRECATED! Use $specialPageAliases in a file listed in $wgExtensionMessagesFiles instead. Use to define aliases of special pages names depending of the language & $specialPageAliases:associative array of magic words synonyms $lang:language code(string) 'LanguageGetTranslatedLanguageNames':Provide translated language names. & $names:array of language code=> language name $code:language of the preferred translations 'LanguageLinks':Manipulate a page 's language links. This is called in various places to allow extensions to define the effective language links for a page. $title:The page 's Title. & $links:Array with elements of the form "language:title" in the order that they will be output. & $linkFlags:Associative array mapping prefixed links to arrays of flags. Currently unused, but planned to provide support for marking individual language links in the UI, e.g. for featured articles. 'LanguageSelector':Hook to change the language selector available on a page. $out:The output page. $cssClassName:CSS class name of the language selector. 'LinkBegin':DEPRECATED since 1.28! Use HtmlPageLinkRendererBegin instead. Used when generating internal and interwiki links in Linker::link(), before processing starts. Return false to skip default processing and return $ret. See documentation for Linker::link() for details on the expected meanings of parameters. $skin:the Skin object $target:the Title that the link is pointing to & $html:the contents that the< a > tag should have(raw HTML) $result
Definition hooks.txt:2042
This code would result in ircNotify being run twice when an article is and once for brion Hooks can return three possible true was required This is the default since MediaWiki *some string
Definition hooks.txt:181
null means default in associative array with keys and values unescaped Should be merged with default with a value of false meaning to suppress the attribute in associative array with keys and values unescaped & $options
Definition hooks.txt:2050
either a unescaped string or a HtmlArmor object after in associative array form externallinks including delete and has completed for all link tables whether this was an auto creation use $formDescriptor instead default is conds Array Extra conditions for the No matching items in log is displayed if loglist is empty msgKey Array If you want a nice box with a set this to the key of the message First element is the message key
Definition hooks.txt:2214
do that in ParserLimitReportFormat instead use this to modify the parameters of the image all existing parser cache entries will be invalid To avoid you ll need to handle that somehow(e.g. with the RejectParserCacheValue hook) because MediaWiki won 't do it for you. & $defaults also a ContextSource after deleting those rows but within the same transaction you ll probably need to make sure the header is varied on and they can depend only on the ResourceLoaderContext $context
Definition hooks.txt:2885
Allows to change the fields on the form that will be generated $name
Definition hooks.txt:302
processing should stop and the error should be shown to the user * false
Definition hooks.txt:187
returning false will NOT prevent logging $e
Definition hooks.txt:2226
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback function
Definition injection.txt:30
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition injection.txt:37
An object representing a master or replica DB position in a replicated setup.
hasReached(DBMasterPos $pos)
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:38
lastDoneWrites()
Returns the last time the connection may have been used for write queries.
getDomainID()
Return the currently selected domain ID.
assertNoOpenTransactions()
Assert that all explicit transactions or atomic sections have been closed.
getServer()
Get the server hostname or IP address.
isOpen()
Is a connection to the database open?
setTransactionListener( $name, callable $callback=null)
Run a callback after each time any transaction commits or rolls back.
setLBInfo( $name, $value=null)
Set the LB info array, or a member of it.
getLBInfo( $name=null)
Get properties passed down from the server info array of the load balancer.
flushSnapshot( $fname=__METHOD__)
Commit any transaction but error out if writes or callbacks are pending.
pendingWriteQueryDuration( $type=self::ESTIMATE_TOTAL)
Get the time spend running write queries for this transaction.
getLag()
Get the amount of replication lag for this database server.
rollback( $fname=__METHOD__, $flush='')
Rollback a transaction previously started using begin().
close()
Close the database connection.
ping(&$rtt=null)
Ping the server and try to reconnect if it there is no connection.
masterPosWait(DBMasterPos $pos, $timeout)
Wait for the replica DB to catch up to a given master position.
commit( $fname=__METHOD__, $flush='')
Commits a transaction previously started using begin().
writesOrCallbacksPending()
Whether there is a transaction open with either possible write queries or unresolved pre-commit/commi...
pendingWriteCallers()
Get the list of method names that did write queries for this transaction.
Database cluster connection, tracking, load balancing, and transaction manager interface.
An interface for database load monitoring.
$cache
Definition mcc.php:33
storage can be distributed across multiple servers
Definition memcached.txt:33
The wiki should then use memcached to cache various data To use multiple just add more items to the array To increase the weight of a make its entry a array("192.168.0.1:11211", 2))
$params