Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
57.18% |
502 / 878 |
|
34.18% |
27 / 79 |
CRAP | |
0.00% |
0 / 1 |
LoadBalancer | |
57.18% |
502 / 878 |
|
34.18% |
27 / 79 |
8260.08 | |
0.00% |
0 / 1 |
__construct | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
configure | |
86.67% |
39 / 45 |
|
0.00% |
0 / 1 |
13.40 | |||
newTrackedConnectionsArray | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
1 | |||
getClusterName | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getLocalDomainID | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
resolveDomainID | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
resolveDomainInstance | |
100.00% |
13 / 13 |
|
100.00% |
1 / 1 |
6 | |||
resolveGroups | |
73.33% |
11 / 15 |
|
0.00% |
0 / 1 |
14.73 | |||
sanitizeConnectionFlags | |
77.78% |
7 / 9 |
|
0.00% |
0 / 1 |
4.18 | |||
enforceConnectionFlags | |
50.00% |
4 / 8 |
|
0.00% |
0 / 1 |
6.00 | |||
getRandomNonLagged | |
45.45% |
10 / 22 |
|
0.00% |
0 / 1 |
14.95 | |||
getReaderIndex | |
78.26% |
18 / 23 |
|
0.00% |
0 / 1 |
8.66 | |||
getExistingReaderIndex | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
pickReaderIndex | |
55.56% |
15 / 27 |
|
0.00% |
0 / 1 |
18.78 | |||
waitForAll | |
0.00% |
0 / 24 |
|
0.00% |
0 / 1 |
72 | |||
serverHasLoadInAnyGroup | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
12 | |||
getAnyOpenConnection | |
93.33% |
14 / 15 |
|
0.00% |
0 / 1 |
7.01 | |||
pickAnyOpenConnection | |
45.45% |
5 / 11 |
|
0.00% |
0 / 1 |
6.60 | |||
awaitSessionPrimaryPos | |
7.14% |
3 / 42 |
|
0.00% |
0 / 1 |
127.29 | |||
getConnection | |
66.67% |
6 / 9 |
|
0.00% |
0 / 1 |
4.59 | |||
getConnectionInternal | |
93.33% |
14 / 15 |
|
0.00% |
0 / 1 |
6.01 | |||
getServerConnection | |
73.68% |
14 / 19 |
|
0.00% |
0 / 1 |
7.89 | |||
getConnectionRef | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
getMaintenanceConnectionRef | |
55.56% |
5 / 9 |
|
0.00% |
0 / 1 |
5.40 | |||
reuseOrOpenConnectionForNewRef | |
92.68% |
38 / 41 |
|
0.00% |
0 / 1 |
15.09 | |||
assertConnectionDomain | |
25.00% |
1 / 4 |
|
0.00% |
0 / 1 |
3.69 | |||
getServerAttributes | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
1 | |||
reallyOpenConnection | |
78.79% |
52 / 66 |
|
0.00% |
0 / 1 |
14.61 | |||
loadSessionPrimaryPos | |
71.43% |
5 / 7 |
|
0.00% |
0 / 1 |
6.84 | |||
reportConnectionError | |
60.61% |
20 / 33 |
|
0.00% |
0 / 1 |
10.00 | |||
getServerCount | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
hasReplicaServers | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
hasStreamingReplicaServers | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getServerName | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getServerInfo | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getServerType | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getPrimaryPos | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
12 | |||
reconfigure | |
89.47% |
17 / 19 |
|
0.00% |
0 / 1 |
8.07 | |||
disable | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
closeAll | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
2 | |||
closeConnection | |
0.00% |
0 / 23 |
|
0.00% |
0 / 1 |
42 | |||
finalizePrimaryChanges | |
100.00% |
13 / 13 |
|
100.00% |
1 / 1 |
3 | |||
approvePrimaryChanges | |
28.57% |
10 / 35 |
|
0.00% |
0 / 1 |
46.44 | |||
beginPrimaryChanges | |
69.23% |
9 / 13 |
|
0.00% |
0 / 1 |
3.26 | |||
commitPrimaryChanges | |
61.11% |
11 / 18 |
|
0.00% |
0 / 1 |
6.47 | |||
runPrimaryTransactionIdleCallbacks | |
52.63% |
20 / 38 |
|
0.00% |
0 / 1 |
20.63 | |||
runPrimaryTransactionListenerCallbacks | |
60.00% |
9 / 15 |
|
0.00% |
0 / 1 |
5.02 | |||
rollbackPrimaryChanges | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
12 | |||
flushPrimarySessions | |
80.00% |
4 / 5 |
|
0.00% |
0 / 1 |
3.07 | |||
assertTransactionRoundStage | |
16.67% |
2 / 12 |
|
0.00% |
0 / 1 |
4.31 | |||
syncConnectionRoundState | |
100.00% |
14 / 14 |
|
100.00% |
1 / 1 |
7 | |||
flushReplicaSnapshots | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
5 | |||
flushPrimarySnapshots | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
2 | |||
hasPrimaryConnection | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
hasPrimaryChanges | |
75.00% |
3 / 4 |
|
0.00% |
0 / 1 |
3.14 | |||
lastPrimaryChangeTimestamp | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
hasOrMadeRecentPrimaryChanges | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
pendingPrimaryChangeCallers | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
2 | |||
explicitTrxActive | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
12 | |||
setLaggedReplicaMode | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
laggedReplicaUsed | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getReadOnlyReason | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
12 | |||
isPrimaryRunningReadOnly | |
92.31% |
24 / 26 |
|
0.00% |
0 / 1 |
4.01 | |||
pingAll | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
12 | |||
getOpenConnections | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
4 | |||
getOpenPrimaryConnections | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
3 | |||
getMaxLag | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
30 | |||
getLagTimes | |
90.91% |
20 / 22 |
|
0.00% |
0 / 1 |
4.01 | |||
waitForPrimaryPos | |
0.00% |
0 / 27 |
|
0.00% |
0 / 1 |
72 | |||
setTransactionListener | |
80.00% |
4 / 5 |
|
0.00% |
0 / 1 |
3.07 | |||
setTableAliases | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
setIndexAliases | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
setDomainAliases | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
setLocalDomainPrefix | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
12 | |||
redefineLocalDomain | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
setTempTablesOnlyMode | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
6 | |||
stringifyConn | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
fieldHasBit | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getConnLogContext | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
2 | |||
setMockTime | n/a |
0 / 0 |
n/a |
0 / 0 |
1 |
1 | <?php |
2 | /** |
3 | * This program is free software; you can redistribute it and/or modify |
4 | * it under the terms of the GNU General Public License as published by |
5 | * the Free Software Foundation; either version 2 of the License, or |
6 | * (at your option) any later version. |
7 | * |
8 | * This program is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | * GNU General Public License for more details. |
12 | * |
13 | * You should have received a copy of the GNU General Public License along |
14 | * with this program; if not, write to the Free Software Foundation, Inc., |
15 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
16 | * http://www.gnu.org/copyleft/gpl.html |
17 | * |
18 | * @file |
19 | */ |
20 | namespace Wikimedia\Rdbms; |
21 | |
22 | use ArrayUtils; |
23 | use InvalidArgumentException; |
24 | use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface; |
25 | use LogicException; |
26 | use Psr\Log\LoggerInterface; |
27 | use Psr\Log\NullLogger; |
28 | use RuntimeException; |
29 | use Throwable; |
30 | use UnexpectedValueException; |
31 | use Wikimedia\ObjectCache\BagOStuff; |
32 | use Wikimedia\ObjectCache\EmptyBagOStuff; |
33 | use Wikimedia\ObjectCache\WANObjectCache; |
34 | use Wikimedia\ScopedCallback; |
35 | use Wikimedia\Stats\NullStatsdDataFactory; |
36 | |
37 | /** |
38 | * @see ILoadBalancer |
39 | * @ingroup Database |
40 | */ |
41 | class LoadBalancer implements ILoadBalancerForOwner { |
42 | /** @var ILoadMonitor */ |
43 | private $loadMonitor; |
44 | /** @var BagOStuff */ |
45 | private $srvCache; |
46 | /** @var WANObjectCache */ |
47 | private $wanCache; |
48 | /** @var DatabaseFactory */ |
49 | private $databaseFactory; |
50 | |
51 | /** @var TransactionProfiler */ |
52 | private $trxProfiler; |
53 | /** @var StatsdDataFactoryInterface */ |
54 | private $statsd; |
55 | /** @var LoggerInterface */ |
56 | private $logger; |
57 | /** @var callable Exception logger */ |
58 | private $errorLogger; |
59 | /** @var DatabaseDomain Local DB domain ID and default for new connections */ |
60 | private $localDomain; |
61 | /** @var bool Whether this PHP instance is for a CLI script */ |
62 | private $cliMode; |
63 | |
64 | /** @var array<string,array<int,Database[]>> Map of (connection pool => server index => Database[]) */ |
65 | private $conns; |
66 | |
67 | /** @var string The name of the DB cluster */ |
68 | private $clusterName; |
69 | /** @var ServerInfo */ |
70 | private $serverInfo; |
71 | /** @var array<string,array<int,int|float>> Map of (group => server index => weight) */ |
72 | private $groupLoads; |
73 | /** @var string|null Default query group to use with getConnection() */ |
74 | private $defaultGroup; |
75 | |
76 | /** @var array[] Map of (table => (dbname, schema, prefix) map) */ |
77 | private $tableAliases = []; |
78 | /** @var string[] Map of (index alias => index) */ |
79 | private $indexAliases = []; |
80 | /** @var DatabaseDomain[]|string[] Map of (domain alias => DB domain) */ |
81 | private $domainAliases = []; |
82 | /** @var callable[] Map of (name => callable) */ |
83 | private $trxRecurringCallbacks = []; |
84 | /** @var bool[] Map of (domain => whether to use "temp tables only" mode) */ |
85 | private $tempTablesOnlyMode = []; |
86 | |
87 | /** @var string|null Active explicit transaction round owner or null if none */ |
88 | private $trxRoundFname = null; |
89 | /** @var string Stage of the current transaction round in the transaction round life-cycle */ |
90 | private $trxRoundStage = self::ROUND_CURSORY; |
91 | /** @var int[] The group replica server indexes keyed by group */ |
92 | private $readIndexByGroup = []; |
93 | /** @var DBPrimaryPos|null Replication sync position or null if not set */ |
94 | private $waitForPos; |
95 | /** @var bool Whether a lagged replica DB was used */ |
96 | private $laggedReplicaMode = false; |
97 | /** @var string|false Reason this instance is read-only or false if not */ |
98 | private $readOnlyReason = false; |
99 | /** @var int Total number of new connections ever made with this instance */ |
100 | private $connectionCounter = 0; |
101 | /** @var bool */ |
102 | private $disabled = false; |
103 | private ?ChronologyProtector $chronologyProtector = null; |
104 | /** @var bool Whether the session consistency callback already executed */ |
105 | private $chronologyProtectorCalled = false; |
106 | |
107 | /** @var Database|null The last connection handle that caused a problem */ |
108 | private $lastErrorConn; |
109 | |
110 | /** @var DatabaseDomain[] Map of (domain ID => domain instance) */ |
111 | private $nonLocalDomainCache = []; |
112 | |
113 | /** |
114 | * @var int Modification counter for invalidating connections held by |
115 | * DBConnRef instances. This is bumped by reconfigure(). |
116 | */ |
117 | private $modcount = 0; |
118 | |
119 | /** The "server index" LB info key; see {@link IDatabase::getLBInfo()} */ |
120 | private const INFO_SERVER_INDEX = 'serverIndex'; |
121 | /** The "connection category" LB info key; see {@link IDatabase::getLBInfo()} */ |
122 | private const INFO_CONN_CATEGORY = 'connCategory'; |
123 | |
124 | /** Warn when this many connections are held */ |
125 | private const CONN_HELD_WARN_THRESHOLD = 10; |
126 | |
127 | /** Default 'waitTimeout' when unspecified */ |
128 | private const MAX_WAIT_DEFAULT = 10; |
129 | /** Seconds to cache primary DB server read-only status */ |
130 | private const TTL_CACHE_READONLY = 5; |
131 | |
132 | /** A category of connections that are tracked and transaction round aware */ |
133 | private const CATEGORY_ROUND = 'round'; |
134 | /** A category of connections that are tracked and in autocommit-mode */ |
135 | private const CATEGORY_AUTOCOMMIT = 'auto-commit'; |
136 | /** A category of connections that are untracked and in gauge-mode */ |
137 | private const CATEGORY_GAUGE = 'gauge'; |
138 | |
139 | /** Transaction round, explicit or implicit, has not finished writing */ |
140 | private const ROUND_CURSORY = 'cursory'; |
141 | /** Transaction round writes are complete and ready for pre-commit checks */ |
142 | private const ROUND_FINALIZED = 'finalized'; |
143 | /** Transaction round passed final pre-commit checks */ |
144 | private const ROUND_APPROVED = 'approved'; |
145 | /** Transaction round was committed and post-commit callbacks must be run */ |
146 | private const ROUND_COMMIT_CALLBACKS = 'commit-callbacks'; |
147 | /** Transaction round was rolled back and post-rollback callbacks must be run */ |
148 | private const ROUND_ROLLBACK_CALLBACKS = 'rollback-callbacks'; |
149 | /** Transaction round encountered an error */ |
150 | private const ROUND_ERROR = 'error'; |
151 | |
152 | /** @var int Idiom for getExistingReaderIndex() meaning "no index selected" */ |
153 | private const READER_INDEX_NONE = -1; |
154 | |
/**
 * Build a load balancer from a database configuration array.
 *
 * @param array $params Configuration map; passed unchanged to configure()
 */
public function __construct( array $params ) {
	$this->configure( $params );

	// Begin with an empty pool for each tracked connection category
	$this->conns = self::newTrackedConnectionsArray();
}
160 | |
/**
 * Initialize the instance fields from a configuration array.
 *
 * Sets the local domain, the server list and per-group load weights, the cluster name,
 * the read-only reason, caches, loggers, profilers, the load monitor, the optional
 * ChronologyProtector, the initial transaction round stage, the CLI mode flag and the
 * default query group.
 *
 * @param array $params A database configuration array, see $wgLBFactoryConf.
 *
 * @return void
 */
protected function configure( array $params ): void {
	if ( isset( $params['localDomain'] ) ) {
		$this->localDomain = DatabaseDomain::newFromId( $params['localDomain'] );
	} else {
		$this->localDomain = DatabaseDomain::newUnspecified();
	}

	// Register each server and collect its weight in every load group it belongs to
	$this->serverInfo = new ServerInfo();
	$this->groupLoads = [ self::GROUP_GENERIC => [] ];
	$serverMaps = $this->serverInfo->normalizeServerMaps( $params['servers'] ?? [] );
	foreach ( $serverMaps as $index => $serverMap ) {
		$this->serverInfo->addServer( $index, $serverMap );
		foreach ( $serverMap['groupLoads'] as $groupName => $groupWeight ) {
			$this->groupLoads[$groupName][$index] = $groupWeight;
		}
		$this->groupLoads[self::GROUP_GENERIC][$index] = $serverMap['load'];
	}

	// Without an explicit cluster name, use the name of the current primary server
	if ( isset( $params['clusterName'] ) ) {
		$this->clusterName = $params['clusterName'];
	} else {
		$this->clusterName = $this->serverInfo->getServerName( ServerInfo::WRITER_INDEX );
	}

	// Only a string is accepted as a read-only reason; anything else is ignored
	if ( is_string( $params['readOnlyReason'] ?? null ) ) {
		$this->readOnlyReason = $params['readOnlyReason'];
	}

	$this->srvCache = $params['srvCache'] ?? new EmptyBagOStuff();
	$this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();

	// Note: this parameter is normally absent. It is injectable for testing purposes only.
	$this->databaseFactory = $params['databaseFactory'] ?? new DatabaseFactory( $params );

	// The fallback exception logger raises a PHP user-level warning
	$this->errorLogger = $params['errorLogger'] ?? static function ( Throwable $e ) {
		trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
	};
	$this->logger = $params['logger'] ?? new NullLogger();

	$this->trxProfiler = $params['trxProfiler'] ?? new TransactionProfiler();
	$this->statsd = $params['statsdDataFactory'] ?? new NullStatsdDataFactory();

	// Set up LoadMonitor; unqualified legacy class names are mapped to the namespaced classes
	$loadMonitorConfig = $params['loadMonitor'] ?? [ 'class' => LoadMonitorNull::class ];
	$legacyClassNames = [
		'LoadMonitor' => LoadMonitor::class,
		'LoadMonitorNull' => LoadMonitorNull::class
	];
	$monitorClass = $loadMonitorConfig['class'];
	// @phan-suppress-next-line PhanImpossibleCondition
	if ( isset( $legacyClassNames[$monitorClass] ) ) {
		$monitorClass = $legacyClassNames[$monitorClass];
	}
	$this->loadMonitor = new $monitorClass(
		$this, $this->srvCache, $this->wanCache, $loadMonitorConfig );
	$this->loadMonitor->setLogger( $this->logger );
	$this->loadMonitor->setStatsdDataFactory( $this->statsd );

	if ( isset( $params['chronologyProtector'] ) ) {
		$this->chronologyProtector = $params['chronologyProtector'];
	}

	// Only the two post-commit/post-rollback callback stages can be preset; any other
	// value leaves the round stage at its initial value
	$requestedStage = $params['roundStage'] ?? null;
	if ( $requestedStage === self::STAGE_POSTCOMMIT_CALLBACKS ) {
		$this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
	} elseif ( $requestedStage === self::STAGE_POSTROLLBACK_CALLBACKS ) {
		$this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
	}

	$this->cliMode = $params['cliMode'] ?? in_array( PHP_SAPI, [ 'cli', 'phpdbg' ], true );

	// A default group without any configured loads falls back to the generic group
	$requestedGroup = $params['defaultGroup'] ?? self::GROUP_GENERIC;
	if ( isset( $this->groupLoads[$requestedGroup] ) ) {
		$this->defaultGroup = $requestedGroup;
	} else {
		$this->defaultGroup = self::GROUP_GENERIC;
	}
}
235 | |
/**
 * Create the empty connection pool structure, one server-indexed list per tracked category.
 *
 * @return array<string,array> Map of (connection category => empty list)
 */
private static function newTrackedConnectionsArray() {
	// Note that CATEGORY_GAUGE connections are untracked
	$pools = [];
	$pools[self::CATEGORY_ROUND] = [];
	$pools[self::CATEGORY_AUTOCOMMIT] = [];

	return $pools;
}
243 | |
/**
 * Get the name of the DB cluster as set by configure().
 *
 * @return string
 */
public function getClusterName(): string {
	$name = $this->clusterName;

	return $name;
}
247 | |
/**
 * Get the ID of the local DB domain.
 *
 * @return string
 */
public function getLocalDomainID(): string {
	$localDomain = $this->localDomain;

	return $localDomain->getId();
}
251 | |
/**
 * Resolve a domain specifier into a domain ID string.
 *
 * @param DatabaseDomain|string|false $domain Domain specifier; see resolveDomainInstance()
 * @return string
 */
public function resolveDomainID( $domain ): string {
	$instance = $this->resolveDomainInstance( $domain );

	return $instance->getId();
}
255 | |
/**
 * Resolve a domain specifier into a DatabaseDomain instance.
 *
 * Resolution order: an instance is returned as-is; false or the local domain ID maps to
 * the local domain; a configured alias maps to its target domain; anything else is parsed
 * as a domain ID.
 *
 * @param DatabaseDomain|string|false $domain
 * @return DatabaseDomain
 */
final protected function resolveDomainInstance( $domain ): DatabaseDomain {
	if ( $domain instanceof DatabaseDomain ) {
		// Already a domain instance
		return $domain;
	}

	if ( $domain === false || $domain === $this->localDomain->getId() ) {
		return $this->localDomain;
	}

	if ( isset( $this->domainAliases[$domain] ) ) {
		// Store the instance form back into the alias map before returning it
		$aliasTarget = DatabaseDomain::newFromId( $this->domainAliases[$domain] );
		$this->domainAliases[$domain] = $aliasTarget;

		return $aliasTarget;
	}

	if ( !isset( $this->nonLocalDomainCache[$domain] ) ) {
		// The whole cache is replaced, so it only ever holds the most recent domain
		$this->nonLocalDomainCache = [ $domain => DatabaseDomain::newFromId( $domain ) ];
	}

	return $this->nonLocalDomainCache[$domain];
}
280 | |
/**
 * Pick the first requested query group that has configured loads.
 *
 * The default group is used when no group is requested, when none of the requested
 * groups has configured loads, or when $groups is neither a string nor an array.
 *
 * @param string[]|string|false $groups Query group(s) in preference order, [], or false
 * @param int $i Specific server index or DB_PRIMARY/DB_REPLICA
 * @return string Query group
 * @throws LogicException If query groups are combined with a server index above zero
 */
private function resolveGroups( $groups, $i ) {
	$noneRequested = ( $groups === [] || $groups === false );

	// If a specific replica server was specified, then $groups makes no sense
	if ( $i > 0 && !$noneRequested ) {
		$list = implode( ', ', (array)$groups );
		throw new LogicException( "Query group(s) ($list) given with server index (#$i)" );
	}

	if ( $noneRequested || $groups === $this->defaultGroup ) {
		return $this->defaultGroup;
	}

	// Treat a single group name as a one-element preference list
	if ( is_string( $groups ) ) {
		$candidates = [ $groups ];
	} elseif ( is_array( $groups ) ) {
		$candidates = $groups;
	} else {
		$candidates = [];
	}

	foreach ( $candidates as $candidate ) {
		if ( isset( $this->groupLoads[$candidate] ) ) {
			return $candidate;
		}
	}

	return $this->defaultGroup;
}
313 | |
/**
 * Sanitize connection flags provided by a call to getConnection()
 *
 * CONN_TRX_AUTOCOMMIT is removed when the primary server's attributes report DB-level
 * locking, or when "temp tables only" mode is set for the domain. All other bits are
 * returned untouched.
 *
 * @param int $flags Bitfield of class CONN_* constants
 * @param string $domain Database domain
 * @return int Sanitized bitfield
 */
protected function sanitizeConnectionFlags( $flags, $domain ) {
	if ( !self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT ) ) {
		// Nothing to sanitize
		return $flags;
	}

	// Callers use CONN_TRX_AUTOCOMMIT to bypass REPEATABLE-READ staleness without
	// resorting to row locks (e.g. FOR UPDATE) or to make small out-of-band commits
	// during larger transactions. This is useful for avoiding lock contention.
	// Servers are assumed to all be of the same (or a similar) type, so the primary
	// server's attributes are consulted. The requested index cannot be used since it
	// might be DB_REPLICA, which might need connection attempts to become a real index.
	$primaryAttributes = $this->getServerAttributes( ServerInfo::WRITER_INDEX );
	if ( $primaryAttributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
		// The RDBMS does not support concurrent writes (e.g. SQLite), so attempts
		// to use separate connections would just cause self-deadlocks. Note that
		// REPEATABLE-READ staleness is not an issue since DB-level locking means
		// that transactions are Strict Serializable anyway.
		$flags &= ~self::CONN_TRX_AUTOCOMMIT;
		$dbType = $this->serverInfo->getServerType( ServerInfo::WRITER_INDEX );
		$this->logger->info( __METHOD__ . ": CONN_TRX_AUTOCOMMIT disallowed ($dbType)" );

		return $flags;
	}

	if ( isset( $this->tempTablesOnlyMode[$domain] ) ) {
		// T202116: integration tests are active and queries should all be using
		// temporary clone tables (via prefix). Such tables are not visible across
		// different connections nor can there be REPEATABLE-READ snapshot staleness,
		// so use the same connection for everything.
		$flags &= ~self::CONN_TRX_AUTOCOMMIT;
	}

	return $flags;
}
350 | |
/**
 * Put a handle into auto-commit mode when the connection flags ask for it.
 *
 * Applies when either CONN_TRX_AUTOCOMMIT or CONN_UNTRACKED_GAUGE is set; otherwise the
 * handle is left alone.
 *
 * @param IDatabase $conn
 * @param int $flags Bitfield of class CONN_* constants
 * @throws DBUnexpectedError If auto-commit mode is needed but the handle has a transaction
 */
private function enforceConnectionFlags( IDatabase $conn, $flags ) {
	// Handles with open transactions are avoided for gauge connections too, since they
	// might be subject to REPEATABLE-READ snapshots, which could affect the lag estimate
	// query.
	$needsAutoCommit = self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT )
		|| self::fieldHasBit( $flags, self::CONN_UNTRACKED_GAUGE );
	if ( !$needsAutoCommit ) {
		return;
	}

	if ( $conn->trxLevel() ) {
		throw new DBUnexpectedError(
			$conn,
			'Handle requested with autocommit-mode yet it has a transaction'
		);
	}

	// Auto-commit mode
	$conn->clearFlag( $conn::DBO_TRX );
}
373 | |
/**
 * Pick a random server from a load group after discarding excessively lagged replicas.
 *
 * The writer index is never discarded because of lag. A replica is discarded when its lag
 * is unknown (false) while a finite lag limit applies, or when its lag is strictly greater
 * than the effective limit.
 *
 * @param array<int,int|float> $loads Map of (server index => weight) for a load group
 * @param int|float $sessionLagLimit Additional maximum lag threshold imposed by the session;
 *  use INF if none applies. Servers will count as lagged if their lag exceeds either this
 *  value or the configured "max lag" value.
 * @return int|false Index of a non-lagged server with non-zero weight; false if none
 */
private function getRandomNonLagged( array $loads, $sessionLagLimit = INF ) {
	$lags = $this->getLagTimes();

	// Unset excessively lagged servers from the load group
	foreach ( $lags as $i => $lag ) {
		if ( $i === ServerInfo::WRITER_INDEX ) {
			continue;
		}

		// How much lag counts as "excessive" for this server: the stricter of the
		// server's own limit and the limit imposed by the session
		$maxServerLag = min( $this->serverInfo->getServerMaxLag( $i ), $sessionLagLimit );

		$srvName = $this->serverInfo->getServerName( $i );
		if ( $lag === false && !is_infinite( $maxServerLag ) ) {
			$this->logger->debug(
				__METHOD__ . ": server {db_server} is not replicating?",
				[ 'db_server' => $srvName ]
			);
			unset( $loads[$i] );
		} elseif ( $lag > $maxServerLag ) {
			// The comparison is strict, so the message says ">" rather than ">="
			$this->logger->debug(
				__METHOD__ .
					": server {db_server} has {lag} seconds of lag (> {maxlag})",
				[ 'db_server' => $srvName, 'lag' => $lag, 'maxlag' => $maxServerLag ]
			);
			unset( $loads[$i] );
		}
	}

	if ( array_sum( $loads ) == 0 ) {
		// All the replicas with non-zero weight are lagged and the primary has zero load.
		// Inform the caller so that it can switch to read-only mode and use a lagged replica.
		return false;
	}

	// Return a server index based on weighted random selection
	return ArrayUtils::pickRandom( $loads );
}
419 | |
/**
 * Get the server index to use for DB_REPLICA reads in the given query group.
 *
 * The first index chosen for a group is remembered and returned by later calls.
 *
 * @param string|false $group Query group; any non-string value selects the generic group
 * @return int|false Server index, or false if the group has no loads or no server
 *  could be picked
 * @throws UnexpectedValueException If the picked server index is negative
 */
public function getReaderIndex( $group = false ) {
	if ( !is_string( $group ) ) {
		$group = self::GROUP_GENERIC;
	}

	if ( !$this->serverInfo->hasReplicaServers() ) {
		// There is only one possible server to use (the primary)
		return ServerInfo::WRITER_INDEX;
	}

	$existingIndex = $this->getExistingReaderIndex( $group );
	if ( $existingIndex !== self::READER_INDEX_NONE ) {
		// A reader index was already selected for this query group. Keep using it,
		// since any session replication position was already waited on and any
		// active transaction will be reused (e.g. for point-in-time snapshots).
		return $existingIndex;
	}

	// Get the server weight array for this load group
	$groupWeights = $this->groupLoads[$group] ?? [];
	if ( !$groupWeights ) {
		$this->logger->info( __METHOD__ . ": no loads for group $group" );

		return false;
	}

	// Load any session replication positions, before any connection attempts,
	// since reading them afterwards can only cause more delay due to possibly
	// seeing even higher replication positions (e.g. from concurrent requests).
	$this->loadSessionPrimaryPos();

	// Scale the configured load weights according to each server's load/state.
	// This can sometimes trigger server connections due to cache regeneration.
	$this->loadMonitor->scaleLoads( $groupWeights );

	// Pick a server, accounting for load weights, lag, and session consistency
	$readerIndex = $this->pickReaderIndex( $groupWeights );
	if ( $readerIndex === false ) {
		// Connection attempts failed
		return false;
	}

	// If data seen by queries is expected to reflect writes from a prior transaction,
	// then wait for the chosen server to apply those changes. This is used to improve
	// session consistency.
	$caughtUp = $this->awaitSessionPrimaryPos( $readerIndex );
	if ( !$caughtUp ) {
		// Data will be outdated compared to what was expected
		$this->setLaggedReplicaMode();
	}

	// Keep using this server for DB_REPLICA handles for this group
	if ( $readerIndex < 0 ) {
		throw new UnexpectedValueException( "Cannot set a negative read server index" );
	}
	$this->readIndexByGroup[$group] = $readerIndex;

	$serverName = $this->getServerName( $readerIndex );
	$this->logger->debug( __METHOD__ . ": using server $serverName for group '$group'" );

	return $readerIndex;
}
478 | |
/**
 * Get the server index already chosen for DB_REPLICA connections for the given query group
 *
 * @param string $group Query group name used as the lookup key
 * @return int Specific server index or LoadBalancer::READER_INDEX_NONE if none was chosen
 */
protected function getExistingReaderIndex( $group ) {
	return isset( $this->readIndexByGroup[$group] )
		? $this->readIndexByGroup[$group]
		: self::READER_INDEX_NONE;
}
488 | |
/**
 * Pick a server that is reachable and preferably non-lagged based on the load weights
 *
 * Candidates are tried one at a time. A server that cannot be connected to is removed
 * from the candidate list and another one is picked, until a connection succeeds or no
 * candidates remain. A successful connection is requested through getServerConnection();
 * the original documentation states it stays open in the pool for reuse.
 *
 * @param array<int,int|float> $loads Map of (server index => weight) for a load group
 * @return int|false Server index to use as the load group reader index; false on failure
 * @throws InvalidArgumentException If $loads is empty
 */
private function pickReaderIndex( array $loads ) {
	if ( $loads === [] ) {
		throw new InvalidArgumentException( "Load group array is empty" );
	}

	$i = false;
	// Quickly look through the available servers for a server that meets criteria...
	$currentLoads = $loads;
	while ( count( $currentLoads ) ) {
		if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
			$this->logger->debug( __METHOD__ . ": session has replication position" );
			// ChronologyProtector::getSessionPrimaryPos called in loadSessionPrimaryPos()
			// sets "waitForPos" for session consistency. This triggers doWait() after
			// connect, so it's especially good to avoid lagged servers so as to avoid
			// excessive delay in that method.
			$ago = microtime( true ) - $this->waitForPos->asOfTime();
			// Aim for <= 1 second of waiting (being too picky can backfire)
			$i = $this->getRandomNonLagged( $currentLoads, $ago + 1 );
		} else {
			// Any server with less lag than its 'max lag' param is preferable
			$i = $this->getRandomNonLagged( $currentLoads );
		}

		if ( $i === false && count( $currentLoads ) ) {
			$this->setLaggedReplicaMode();
			// All replica DBs lagged, just pick anything.
			$i = ArrayUtils::pickRandom( $currentLoads );
		}

		if ( $i === false ) {
			// pickRandom() returned false.
			// This is permanent and means the configuration or LoadMonitor
			// wants us to return false.
			$this->logger->debug( __METHOD__ . ": no suitable server found" );

			return false;
		}

		$serverName = $this->getServerName( $i );
		$this->logger->debug( __METHOD__ . ": connecting to $serverName..." );

		// Get a connection to this server without triggering complementary connections
		// to other servers (due to things like lag or read-only checks). We want to avoid
		// the risk of overhead and recursion here.
		$conn = $this->getServerConnection( $i, self::DOMAIN_ANY, self::CONN_SILENCE_ERRORS );
		if ( !$conn ) {
			$this->logger->warning( __METHOD__ . ": failed connecting to $serverName" );
			unset( $currentLoads[$i] ); // avoid this server next iteration
			// Forget the unreachable server; otherwise its index would be returned as
			// the reader index when every remaining candidate also fails
			$i = false;
			continue;
		}

		// Return this server
		break;
	}

	// If all servers were down, report it; $i is false in that case
	if ( $currentLoads === [] ) {
		$this->logger->error( __METHOD__ . ": all servers down" );
	}

	return $i;
}
558 | |
/**
 * Wait for every streaming replica that has load in some group to reach a position.
 *
 * The time budget is shared: whole seconds spent waiting on one replica are subtracted
 * from the budget given to the next one. The previous "waitForPos" value is restored
 * before returning, including when an exception is thrown.
 *
 * @param DBPrimaryPos $pos Replication position to wait for
 * @param int|null $timeout Total seconds to wait; any falsy value selects MAX_WAIT_DEFAULT
 * @return bool False if the wait counted as failed: more than one replica did not catch
 *  up, or any replica did not catch up while there are fewer than 5 servers
 */
public function waitForAll( DBPrimaryPos $pos, $timeout = null ) {
	$timeout = $timeout ?: self::MAX_WAIT_DEFAULT;
	// Track the shrinking budget separately so that the requested timeout can be logged
	// NOTE(review): the budget can reach zero or go negative when earlier replicas use it
	// up; how awaitSessionPrimaryPos() treats such values is not visible here - confirm
	$remaining = $timeout;

	$oldPos = $this->waitForPos;
	try {
		$this->waitForPos = $pos;

		$failedReplicas = [];
		foreach ( $this->serverInfo->getStreamingReplicaIndexes() as $i ) {
			if ( $this->serverHasLoadInAnyGroup( $i ) ) {
				$start = microtime( true );
				$ok = $this->awaitSessionPrimaryPos( $i, $remaining );
				if ( !$ok ) {
					$failedReplicas[] = $this->getServerName( $i );
				}
				// Only whole elapsed seconds are deducted
				$remaining -= intval( microtime( true ) - $start );
			}
		}

		// Stop spamming logs when only one replica is lagging and we have 5+ replicas.
		// Mediawiki automatically stops sending queries to the lagged one.
		$failed = $failedReplicas && ( count( $failedReplicas ) > 1 || $this->getServerCount() < 5 );
		if ( $failed ) {
			$this->logger->error(
				"Timed out waiting for replication to reach {raw_pos}",
				[
					'raw_pos' => $pos->__toString(),
					'failed_hosts' => $failedReplicas,
					// Report the timeout that was applied, not the leftover budget
					'timeout' => $timeout,
					'exception' => new RuntimeException()
				]
			);
		}

		return !$failed;
	} finally {
		// Restore the old position; this is used for throttling, not lag-protection
		$this->waitForPos = $oldPos;
	}
}
599 | |
/**
 * Check whether a server has a positive load weight in at least one load group.
 *
 * @param int $i Specific server index
 * @return bool
 */
private function serverHasLoadInAnyGroup( $i ) {
	foreach ( $this->groupLoads as $weightsByServer ) {
		// A server missing from a group counts as having zero weight there
		$weight = $weightsByServer[$i] ?? 0;
		if ( $weight > 0 ) {
			return true;
		}
	}

	return false;
}
613 | |
/**
 * Find an already-open pooled connection without opening a new one
 *
 * @param int $i Specific server index, or DB_PRIMARY/DB_REPLICA; DB_REPLICA matches
 *  pooled connections to any server
 * @param int $flags Bit field of CONN_* constants to enforce on the handle found
 * @return IDatabaseForOwner|false An open connection, or false if the pools have none
 */
public function getAnyOpenConnection( $i, $flags = 0 ) {
	if ( $i === self::DB_PRIMARY ) {
		$i = ServerInfo::WRITER_INDEX;
	}
	$anyServer = ( $i === self::DB_REPLICA );

	foreach ( $this->conns as $type => $poolConnsByServer ) {
		if ( $anyServer ) {
			// Consider all existing connections to any server
			$candidates = $poolConnsByServer;
		} elseif ( isset( $poolConnsByServer[$i] ) ) {
			// Consider all existing connections to a specific server
			$candidates = [ $i => $poolConnsByServer[$i] ];
		} else {
			$candidates = [];
		}

		$found = $this->pickAnyOpenConnection( $candidates );
		if ( $found ) {
			$this->logger->debug( __METHOD__ . ": found '$type' connection to #$i." );
			$this->enforceConnectionFlags( $found, $flags );

			return $found;
		}
	}

	return false;
}
641 | |
/**
 * Return the first open handle found in a set of pooled connections
 *
 * Handles that report no open connection are logged and skipped.
 *
 * @param Database[][] $connsByServer Map of (server index => array of DB handles)
 * @return IDatabaseForOwner|false An appropriate open connection or false if none found
 */
private function pickAnyOpenConnection( array $connsByServer ) {
	foreach ( $connsByServer as $serverIndex => $handles ) {
		foreach ( $handles as $handle ) {
			if ( $handle->isOpen() ) {
				return $handle;
			}

			// Some sort of error occurred on this handle; keep looking
			$this->logger->warning(
				__METHOD__ .
				": pooled DB handle for {db_server} (#$serverIndex) has no open connection.",
				$this->getConnLogContext( $handle )
			);
		}
	}

	return false;
}
663 | |
/**
 * Wait for a given replica DB to catch up to the primary DB pos stored in "waitForPos"
 *
 * The last position each replica is known to have reached is kept in the server cache
 * so that repeated waits for an older position can return without touching the DB.
 *
 * @see loadSessionPrimaryPos()
 *
 * @param int $index Specific server index
 * @param int|null $timeout Max seconds to wait; falsy means self::MAX_WAIT_DEFAULT and
 *  the effective value is never less than one second
 * @return bool Success
 */
private function awaitSessionPrimaryPos( $index, $timeout = null ) {
	$timeout = max( 1, intval( $timeout ?: self::MAX_WAIT_DEFAULT ) );

	// Nothing to wait for, or the server is the primary itself
	if ( !$this->waitForPos || $index === ServerInfo::WRITER_INDEX ) {
		return true;
	}

	$srvName = $this->getServerName( $index );

	// Check if we already know that the DB has reached this point
	$key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $srvName, 'v2' );

	$knownReachedPos = null;
	$position = $this->srvCache->get( $key );
	if ( is_array( $position ) ) {
		$class = $position['_type_'] ?? null;
		// The class name comes from cached data; only use it for a static call when it
		// really names a DBPrimaryPos implementation. Anything else is a cache miss.
		if ( is_string( $class ) && is_a( $class, DBPrimaryPos::class, true ) ) {
			/** @var DBPrimaryPos $knownReachedPos */
			$knownReachedPos = $class::newFromArray( $position );
		}
	}
	if (
		$knownReachedPos instanceof DBPrimaryPos &&
		$knownReachedPos->hasReached( $this->waitForPos )
	) {
		$this->logger->debug(
			__METHOD__ .
			": replica DB {db_server} known to be caught up (pos >= $knownReachedPos).",
			[ 'db_server' => $srvName ]
		);

		return true;
	}

	$close = false; // close the connection afterwards
	$flags = self::CONN_SILENCE_ERRORS;
	// Check if there is an existing connection that can be used
	$conn = $this->getAnyOpenConnection( $index, $flags );
	if ( !$conn ) {
		// Get a connection to this server without triggering complementary connections
		// to other servers (due to things like lag or read-only checks). We want to avoid
		// the risk of overhead and recursion here.
		$conn = $this->getServerConnection( $index, self::DOMAIN_ANY, $flags );
		if ( !$conn ) {
			$this->logger->warning(
				__METHOD__ . ': failed to connect to {db_server}',
				[ 'db_server' => $srvName ]
			);

			return false;
		}
		// Avoid connection spam in waitForAll() when connections
		// are made just for the sake of doing this lag check.
		$close = true;
	}

	try {
		$this->logger->info(
			__METHOD__ .
			': waiting for replica DB {db_server} to catch up...',
			$this->getConnLogContext( $conn )
		);

		$result = $conn->primaryPosWait( $this->waitForPos, $timeout );

		$ok = ( $result !== null && $result != -1 );
		if ( $ok ) {
			// Remember that the DB reached this point
			$this->srvCache->set( $key, $this->waitForPos->toArray(), BagOStuff::TTL_DAY );
		}
	} finally {
		// Release the temporary connection even if the wait itself threw
		if ( $close ) {
			$this->closeConnection( $conn );
		}
	}

	return $ok;
}
748 | |
/**
 * Get a lazy connection reference for a server index or DB_PRIMARY/DB_REPLICA
 *
 * No connection is opened here; the arguments are stored in a DBConnRef together with
 * the current modification counter.
 *
 * @param int $i Specific server index, or DB_PRIMARY/DB_REPLICA
 * @param string[]|string $groups Query group(s)
 * @param string|false $domain DB domain ID, passed through resolveDomainID()
 * @param int $flags Bit field of CONN_* constants; CONN_SILENCE_ERRORS is rejected
 * @return DBConnRef
 * @throws UnexpectedValueException If CONN_SILENCE_ERRORS is set in $flags
 */
public function getConnection( $i, $groups = [], $domain = false, $flags = 0 ) {
	$silenceRequested = self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS );
	if ( $silenceRequested ) {
		throw new UnexpectedValueException(
			__METHOD__ . ' got CONN_SILENCE_ERRORS; connection is already deferred'
		);
	}

	$resolvedDomain = $this->resolveDomainID( $domain );

	// Both the symbolic constant and the literal writer index mean "primary"
	if ( $i === self::DB_PRIMARY || $i === ServerInfo::WRITER_INDEX ) {
		$role = self::DB_PRIMARY;
	} else {
		$role = self::DB_REPLICA;
	}

	$params = [ $i, $groups, $resolvedDomain, $flags ];

	return new DBConnRef( $this, $params, $role, $this->modcount );
}
763 | |
/**
 * Get a live connection handle for a server index or DB_PRIMARY/DB_REPLICA
 *
 * @param int $i Specific server index, or DB_PRIMARY/DB_REPLICA
 * @param string[]|string $groups Query group(s), passed through resolveGroups()
 * @param string|false $domain DB domain ID, passed through resolveDomainID()
 * @param int $flags Bit field of CONN_* constants, passed through sanitizeConnectionFlags()
 * @return IDatabase
 * @throws UnexpectedValueException If $i is a specific index the server info does not know
 */
public function getConnectionInternal( $i, $groups = [], $domain = false, $flags = 0 ): IDatabase {
	$domain = $this->resolveDomainID( $domain );
	$group = $this->resolveGroups( $groups, $i );
	$flags = $this->sanitizeConnectionFlags( $flags, $domain );
	// If given DB_PRIMARY/DB_REPLICA, resolve it to a specific server index. Resolving
	// DB_REPLICA might trigger getServerConnection() calls due to the getReaderIndex()
	// connectivity checks or LoadMonitor::scaleLoads() server state cache regeneration.
	// The use of getServerConnection() instead of getConnection() avoids infinite loops.
	$serverIndex = $i;
	if ( $i === self::DB_PRIMARY ) {
		$serverIndex = ServerInfo::WRITER_INDEX;
	} elseif ( $i === self::DB_REPLICA ) {
		$groupIndex = $this->getReaderIndex( $group );
		if ( $groupIndex !== false ) {
			// Group connection succeeded
			$serverIndex = $groupIndex;
		}
		// Still the negative DB_REPLICA placeholder: no reader could be picked
		if ( $serverIndex < 0 ) {
			$this->reportConnectionError( 'could not connect to any replica DB server' );
		}
	} elseif ( !$this->serverInfo->hasServerIndex( $i ) ) {
		throw new UnexpectedValueException( "Invalid server index #$i" );
	}

	// Get an open connection to that server (might trigger a new connection)
	return $this->getServerConnection( $serverIndex, $domain, $flags );
}
790 | |
/**
 * Get a live connection handle to one specific server
 *
 * @param int $i Specific server index
 * @param string $domain DB domain ID, parsed with DatabaseDomain::newFromId()
 * @param int $flags Bit field of CONN_* constants
 * @return IDatabaseForOwner|false The handle; false on failure when CONN_SILENCE_ERRORS
 *  is set (without that flag, failures go through reportConnectionError())
 */
public function getServerConnection( $i, $domain, $flags = 0 ) {
	$silenced = self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS );
	$domainInstance = DatabaseDomain::newFromId( $domain );
	// Snapshot the counter so that a connection opened by this call can be detected
	$connectionsBefore = $this->connectionCounter;
	// Get an open connection to this server (might trigger a new connection)
	$handle = $this->reuseOrOpenConnectionForNewRef( $i, $domainInstance, $flags );

	if ( !( $handle instanceof IDatabaseForOwner ) ) {
		// The connection attempt failed; throw unless the caller asked for silence
		if ( !$silenced ) {
			$this->reportConnectionError();
		}

		return false;
	}

	// Profile any new connections caused by this method
	if ( $this->connectionCounter > $connectionsBefore ) {
		$serverName = $handle->getServerName();
		$dbName = $handle->getDBname();
		$isPrimaryWithReplicas =
			( $i === ServerInfo::WRITER_INDEX && $this->hasStreamingReplicaServers() );
		$this->trxProfiler->recordConnection( $serverName, $dbName, $isPrimaryWithReplicas );
	}

	if ( $handle->isOpen() ) {
		return $handle;
	}

	// Connection was made but later unrecoverably lost for some reason.
	// Do not return a handle that will just throw exceptions on use, but
	// let the calling code, e.g. getReaderIndex(), try another server.
	$this->lastErrorConn = $handle;
	if ( !$silenced ) {
		$this->reportConnectionError();
	}

	return false;
}
828 | |
/**
 * Deprecated alias that emits a deprecation notice and forwards to getConnection()
 *
 * @deprecated since 1.39.
 */
public function getConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ): DBConnRef {
	wfDeprecated( __METHOD__, '1.39' );
	$ref = $this->getConnection( $i, $groups, $domain, $flags );

	// @phan-suppress-next-line PhanTypeMismatchReturnSuperType to be removed soon
	return $ref;
}
837 | |
/**
 * Get a lazy connection reference for maintenance use
 *
 * @param int $i Specific server index, or DB_PRIMARY/DB_REPLICA
 * @param string[]|string $groups Query group(s)
 * @param string|false $domain DB domain ID, passed through resolveDomainID()
 * @param int $flags Bit field of CONN_* constants; CONN_SILENCE_ERRORS is rejected
 * @return DBConnRef
 * @throws UnexpectedValueException If CONN_SILENCE_ERRORS is set in $flags
 */
public function getMaintenanceConnectionRef(
	$i,
	$groups = [],
	$domain = false,
	$flags = 0
): DBConnRef {
	$silenceRequested = self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS );
	if ( $silenceRequested ) {
		throw new UnexpectedValueException(
			__METHOD__ . ' CONN_SILENCE_ERRORS is not supported'
		);
	}

	$resolvedDomain = $this->resolveDomainID( $domain );

	// Both the symbolic constant and the literal writer index mean "primary"
	if ( $i === self::DB_PRIMARY || $i === ServerInfo::WRITER_INDEX ) {
		$role = self::DB_PRIMARY;
	} else {
		$role = self::DB_REPLICA;
	}

	$params = [ $i, $groups, $resolvedDomain, $flags ];

	return new DBConnRef( $this, $params, $role, $this->modcount );
}
857 | |
/**
 * Get a live connection handle to the given domain
 *
 * This will reuse an existing tracked connection when possible. In some cases, this
 * involves switching the DB domain of an existing handle in order to reuse it. If no
 * existing handles can be reused, then a new connection will be made.
 *
 * @param int $i Specific server index
 * @param DatabaseDomain $domain Database domain ID required by the reference
 * @param int $flags Bit field of class CONN_* constants
 * @return IDatabaseForOwner|null Database handle, or null if a new connection failed to open
 * @throws DBError When database selection fails
 * @throws InvalidArgumentException When the server index is invalid
 * @throws UnexpectedValueException When the DB domain of the connection is corrupted
 * @throws DBAccessError If disable() was called
 */
private function reuseOrOpenConnectionForNewRef( $i, DatabaseDomain $domain, $flags = 0 ) {
	// Figure out which connection pool to use based on the flags
	if ( self::fieldHasBit( $flags, self::CONN_UNTRACKED_GAUGE ) ) {
		// Use low timeouts, use autocommit mode, ignore transaction rounds
		$category = self::CATEGORY_GAUGE;
	} elseif ( self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT ) ) {
		// Use autocommit mode, ignore transaction rounds
		$category = self::CATEGORY_AUTOCOMMIT;
	} else {
		// Respect DBO_DEFAULT, respect transaction rounds
		$category = self::CATEGORY_ROUND;
	}

	$conn = null;
	// Reuse a free connection in the pool from any domain if possible. There should only
	// be one connection in this pool unless either:
	//  - a) IDatabase::databasesAreIndependent() returns true (e.g. postgres) and two
	//       or more database domains have been used during the load balancer's lifetime
	//  - b) Two or more nested function calls used getConnection() on different domains.
	foreach ( ( $this->conns[$category][$i] ?? [] ) as $poolConn ) {
		// Check if any required DB domain changes for the new reference are possible
		// Calling selectDomain() would trigger a reconnect, which will break if a
		// transaction is active or if there is any other meaningful session state.
		$isShareable = !(
			$poolConn->databasesAreIndependent() &&
			$domain->getDatabase() !== null &&
			$domain->getDatabase() !== $poolConn->getDBname()
		);
		if ( $isShareable ) {
			$conn = $poolConn;
			// Make any required DB domain changes for the new reference
			if ( !$domain->isUnspecified() ) {
				$conn->selectDomain( $domain );
			}
			$this->logger->debug( __METHOD__ . ": reusing connection for $i/$domain" );
			break;
		}
	}

	// If necessary, try to open a new connection and add it to the pool
	if ( !$conn ) {
		$conn = $this->reallyOpenConnection(
			$i,
			$domain,
			[ self::INFO_CONN_CATEGORY => $category ]
		);
		if ( $conn->isOpen() ) {
			// Make Database::isReadOnly() respect server-side and configuration-based
			// read-only mode. Note that replica handles are always seen as read-only
			// in Database::isReadOnly() and Database::assertIsWritablePrimary().
			if ( $i === ServerInfo::WRITER_INDEX ) {
				if ( $this->readOnlyReason !== false ) {
					$readOnlyReason = $this->readOnlyReason;
				} elseif ( $this->isPrimaryRunningReadOnly( $conn ) ) {
					$readOnlyReason = 'The primary database server is running in read-only mode.';
				} else {
					$readOnlyReason = false;
				}
				$conn->setLBInfo( $conn::LB_READ_ONLY_REASON, $readOnlyReason );
			}
			// Connection obtained; check if it belongs to a tracked connection category
			if ( isset( $this->conns[$category] ) ) {
				// Track this connection for future reuse
				$this->conns[$category][$i][] = $conn;
			}
		} else {
			// Keep the failed handle around so reportConnectionError() can describe it
			$this->logger->warning( __METHOD__ . ": connection error for $i/$domain" );
			$this->lastErrorConn = $conn;
			$conn = null;
		}
	}

	if ( $conn instanceof IDatabaseForOwner ) {
		// Check to make sure that the right domain is selected
		$this->assertConnectionDomain( $conn, $domain );
		// Check to make sure that the CONN_* flags are respected
		$this->enforceConnectionFlags( $conn, $flags );
	}

	return $conn;
}
955 | |
/**
 * Verify that a handle is on a domain compatible with the one that was requested
 *
 * @param Database $conn
 * @param DatabaseDomain $domain
 * @throws UnexpectedValueException If the handle's domain ID is not compatible with $domain
 */
private function assertConnectionDomain( Database $conn, DatabaseDomain $domain ) {
	if ( $domain->isCompatible( $conn->getDomainID() ) ) {
		return;
	}

	throw new UnexpectedValueException(
		"Got connection to '{$conn->getDomainID()}', but expected one for '{$domain}'"
	);
}
970 | |
/**
 * Get the attributes the database factory reports for a server's type and driver
 *
 * @param int $i Specific server index
 * @return array Result of DatabaseFactory::attributesFromType() for that server
 */
public function getServerAttributes( $i ) {
	$type = $this->getServerType( $i );
	$driver = $this->serverInfo->getServerDriver( $i );

	return $this->databaseFactory->attributesFromType( $type, $driver );
}
977 | |
/**
 * Build a new Database handle for a server and try to connect it (no pool lookup)
 *
 * A Database object is returned whether or not the connection attempt succeeded,
 * so callers have to check isOpen() on the result.
 *
 * @param int $i Specific server index
 * @param DatabaseDomain $domain Domain the connection is for, possibly unspecified
 * @param array $lbInfo Additional information for setLBInfo()
 * @return Database
 * @throws DBAccessError
 * @throws InvalidArgumentException
 */
protected function reallyOpenConnection( $i, DatabaseDomain $domain, array $lbInfo ) {
	if ( $this->disabled ) {
		throw new DBAccessError();
	}

	$isWriter = ( $i === ServerInfo::WRITER_INDEX );
	$server = $this->serverInfo->getServerInfoStrict( $i );

	if ( $lbInfo[self::INFO_CONN_CATEGORY] !== self::CATEGORY_GAUGE ) {
		// Use implicit transactions unless explicitly configured otherwise
		$server['flags'] ??= IDatabase::DBO_DEFAULT;
	} else {
		// Use low connection/read timeouts for connection used for gauging server health.
		// Gauge information should be cached and used to avoid outages. Indefinite hanging
		// while gauging servers would do the opposite.
		$server['connectTimeout'] = min( 1, $server['connectTimeout'] ?? INF );
		$server['receiveTimeout'] = min( 1, $server['receiveTimeout'] ?? INF );
		// Avoid implicit transactions and avoid any SET query for session variables during
		// Database::open(). If a server becomes slow, every extra query can cause significant
		// delays, even with low connect/receive timeouts.
		$server['flags'] ??= 0;
		$server['flags'] &= ~IDatabase::DBO_DEFAULT;
		$server['flags'] |= IDatabase::DBO_GAUGE;
	}

	if ( !empty( $server['is static'] ) ) {
		$topologyRole = IDatabase::ROLE_STATIC_CLONE;
	} elseif ( $isWriter ) {
		$topologyRole = IDatabase::ROLE_STREAMING_MASTER;
	} else {
		$topologyRole = IDatabase::ROLE_STREAMING_REPLICA;
	}

	// Use the database specified in $domain (null means "none or entrypoint DB");
	// fallback to the $server default if the RDBMs is an embedded library using a
	// file on disk since there would be nothing to access to without a DB/file name.
	if ( $this->getServerAttributes( $i )[Database::ATTR_DB_IS_FILE] ) {
		$dbName = $domain->getDatabase() ?? $server['dbname'] ?? null;
	} else {
		$dbName = $domain->getDatabase();
	}

	$overrides = [
		// Basic replication role information
		'topologyRole' => $topologyRole,
		'dbname' => $dbName,
		// Override the $server default schema with that of $domain if specified
		'schema' => $domain->getSchema(),
		// Use the table prefix specified in $domain
		'tablePrefix' => $domain->getTablePrefix(),
		'srvCache' => $this->srvCache,
		'logger' => $this->logger,
		'errorLogger' => $this->errorLogger,
		'trxProfiler' => $this->trxProfiler,
		'lbInfo' => [ self::INFO_SERVER_INDEX => $i ] + $lbInfo
	];
	$conn = $this->databaseFactory->create(
		$server['type'],
		array_merge( $server, $overrides ),
		Database::NEW_UNCONNECTED
	);

	// Set alternative table/index names before any queries can be issued
	$conn->setTableAliases( $this->tableAliases );
	$conn->setIndexAliases( $this->indexAliases );
	// Account for any active transaction round and listeners
	$this->syncConnectionRoundState( $conn );
	if ( $isWriter ) {
		foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
			$conn->setTransactionListener( $name, $callback );
		}
	}

	// Make the connection handle live
	try {
		$conn->initConnection();
		// Only counted when initConnection() did not throw
		++$this->connectionCounter;
	} catch ( DBConnectionError $e ) {
		$this->lastErrorConn = $conn;
		// ignore; let the DB handle the logging
	}

	if ( !$conn->isOpen() ) {
		$this->logger->warning(
			__METHOD__ . ": connection error for $i/{db_domain}",
			[ 'db_domain' => $domain->getId() ]
		);
	} else {
		$this->logger->debug( __METHOD__ . ": opened new connection for $i/$domain" );
	}

	// Log when many connection are made during a single request/script
	$heldCount = 0;
	foreach ( $this->conns as $poolConnsByServer ) {
		$heldCount += array_sum( array_map( 'count', $poolConnsByServer ) );
	}
	if ( $heldCount >= self::CONN_HELD_WARN_THRESHOLD ) {
		$context = $this->getConnLogContext(
			$conn,
			[
				'connections' => $heldCount,
				'primarydb' => $this->serverInfo->getPrimaryServerName(),
				'db_domain' => $domain->getId()
			]
		);
		$this->logger->warning(
			__METHOD__ . ": {connections}+ connections made (primary={primarydb})",
			$context
		);
	}

	$this->assertConnectionDomain( $conn, $domain );

	return $conn;
}
1098 | |
/**
 * Make sure that any "waitForPos" replication positions are loaded and available
 *
 * The chronology protector is consulted at most once per load balancer instance. Its
 * position replaces "waitForPos" only when none is set yet or when the new position
 * has reached the current one.
 *
 * @see awaitSessionPrimaryPos()
 */
private function loadSessionPrimaryPos() {
	if ( $this->chronologyProtectorCalled || !$this->chronologyProtector ) {
		return;
	}

	$this->chronologyProtectorCalled = true;
	$sessionPos = $this->chronologyProtector->getSessionPrimaryPos( $this );
	$this->logger->debug( __METHOD__ . ': executed chronology callback.' );

	if ( !$sessionPos ) {
		return;
	}

	if ( !$this->waitForPos || $sessionPos->hasReached( $this->waitForPos ) ) {
		$this->waitForPos = $sessionPos;
	}
}
1120 | |
/**
 * Throw a DBConnectionError describing the most recent connection failure
 *
 * When $extraLbError is given, the failure is also logged: as a warning if a failed
 * handle is on record in $this->lastErrorConn, as an error otherwise.
 *
 * @param string $extraLbError Separate load balancer error
 * @throws DBConnectionError
 * @return never
 */
private function reportConnectionError( $extraLbError = '' ) {
	$failedConn = $this->lastErrorConn;

	if ( !( $failedConn instanceof IDatabaseForOwner ) ) {
		// No failed handle on record; all that can be reported is the LB-level message
		$exception = new DBConnectionError(
			null,
			$extraLbError ?: 'could not connect to the DB server'
		);

		if ( $extraLbError ) {
			$this->logger->error(
				__METHOD__ . ": $extraLbError",
				[
					'method' => __METHOD__,
					'last_error' => '(last connection error missing)'
				]
			);
		}

		throw $exception;
	}

	$srvName = $failedConn->getServerName();
	$lastDbError = $failedConn->lastError() ?: 'unknown error';

	if ( $extraLbError ) {
		$message = "{$extraLbError}; {$lastDbError} ({$srvName})";
	} else {
		$message = "{$lastDbError} ({$srvName})";
	}
	$exception = new DBConnectionError( $failedConn, $message );

	if ( $extraLbError ) {
		$context = $this->getConnLogContext(
			$failedConn,
			[
				'method' => __METHOD__,
				'last_error' => $lastDbError
			]
		);
		$this->logger->warning(
			__METHOD__ . ": $extraLbError; {last_error} ({db_server})",
			$context
		);
	}

	throw $exception;
}
1169 | |
/**
 * Get the number of servers, as reported by $this->serverInfo
 *
 * @return int
 */
public function getServerCount() {
	$count = $this->serverInfo->getServerCount();

	return $count;
}
1173 | |
/**
 * Whether there are any replica servers, as reported by $this->serverInfo
 *
 * @return bool
 */
public function hasReplicaServers() {
	$hasReplicas = $this->serverInfo->hasReplicaServers();

	return $hasReplicas;
}
1177 | |
/**
 * Whether there are any streaming replica servers, as reported by $this->serverInfo
 *
 * @return bool
 */
public function hasStreamingReplicaServers() {
	$hasStreamingReplicas = $this->serverInfo->hasStreamingReplicaServers();

	return $hasStreamingReplicas;
}
1181 | |
/**
 * Get the name of a server, as reported by $this->serverInfo
 *
 * @param int $i Specific server index
 * @return string
 */
public function getServerName( $i ): string {
	$name = $this->serverInfo->getServerName( $i );

	return $name;
}
1185 | |
/**
 * Get the configuration of a server, as reported by $this->serverInfo
 *
 * @param int $i Specific server index
 * @return mixed Whatever ServerInfo::getServerInfo() returns for that index
 */
public function getServerInfo( $i ) {
	$info = $this->serverInfo->getServerInfo( $i );

	return $info;
}
1189 | |
/**
 * Get the database type of a server, as reported by $this->serverInfo
 *
 * @param int $i Specific server index
 * @return mixed Whatever ServerInfo::getServerType() returns for that index
 */
public function getServerType( $i ) {
	$type = $this->serverInfo->getServerType( $i );

	return $type;
}
1193 | |
/**
 * Get the current replication position of the primary server
 *
 * An already-open connection to the primary is used when there is one. Otherwise a
 * connection is opened just for this call and closed again afterwards.
 *
 * @return mixed Whatever getPrimaryPos() returns on the primary's handle
 * @throws DBConnectionError If no connection to the primary could be made
 */
public function getPrimaryPos() {
	$conn = $this->getAnyOpenConnection( ServerInfo::WRITER_INDEX );
	if ( $conn ) {
		return $conn->getPrimaryPos();
	}

	// Go straight to getServerConnection(): the second parameter of
	// getConnectionInternal() is the query group list, not the flags, and its declared
	// IDatabase return type leaves no room for the "false" of a silenced failure.
	$conn = $this->getServerConnection(
		ServerInfo::WRITER_INDEX,
		$this->resolveDomainID( false ),
		self::CONN_SILENCE_ERRORS
	);
	if ( !$conn ) {
		$this->reportConnectionError();
	}

	try {
		return $conn->getPrimaryPos();
	} finally {
		// This connection was opened only for the position lookup
		$this->closeConnection( $conn );
	}
}
1216 | |
/**
 * Apply updated configuration.
 *
 * This only unregisters servers that were removed in the new configuration.
 * It does not register new servers nor update the group load weights.
 *
 * This invalidates any open connections. However, existing connections may continue to be
 * used while they are in an active transaction. In that case, the old connection will be
 * discarded on the first operation after the transaction is complete. The next operation
 * will use a new connection based on the new configuration.
 *
 * @internal for use by LBFactory::reconfigure()
 *
 * @see DBConnRef::ensureConnection()
 * @see LBFactory::reconfigure()
 *
 * @param array $params A database configuration array, see $wgLBFactoryConf.
 *
 * @return void
 */
public function reconfigure( array $params ) {
	$serverConfigs = $params['servers'];
	$depooled = false;

	$newIndexByOldIndex = $this->serverInfo->reconfigureServers( $serverConfigs );
	foreach ( $newIndexByOldIndex as $oldIndex => $newIndex ) {
		if ( $newIndex === null ) {
			// Server no longer exists in the new config
			$depooled = true;
			// Note that if the primary server is depooled and a replica server promoted
			// to new primary, then DB_PRIMARY handles will fail with server index errors
			foreach ( array_keys( $this->groupLoads ) as $group ) {
				unset( $this->groupLoads[$group][$oldIndex] );
			}
			continue;
		}

		// Server still exists in the new config
		$newWeightByGroup = $serverConfigs[$newIndex]['groupLoads'] ?? [];
		$newWeightByGroup[ILoadBalancer::GROUP_GENERIC] = $serverConfigs[$newIndex]['load'];
		// Check if the server was removed from any load groups
		foreach ( $this->groupLoads as $group => $weightByIndex ) {
			$wasInGroup = isset( $weightByIndex[$oldIndex] );
			if ( $wasInGroup && !isset( $newWeightByGroup[$group] ) ) {
				// Server no longer in this load group in the new config
				$depooled = true;
				unset( $this->groupLoads[$group][$oldIndex] );
			}
		}
	}

	if ( !$depooled ) {
		return;
	}

	$this->logger->info( 'Reconfiguring dbs!' );
	// Unpin DB_REPLICA connection groups from server indexes
	$this->readIndexByGroup = [];
	// We could close all connection here, but some may be in the middle of a
	// transaction. So instead, we leave it to DBConnRef to close the connection
	// when it detects that the modcount has changed and no transaction is open.
	$this->conns = self::newTrackedConnectionsArray();
	// Bump modification counter to invalidate the connections held by DBConnRef
	// instances. This will cause the next call to a method on the DBConnRef
	// to get a new connection from getConnectionInternal()
	$this->modcount++;
}
1284 | |
/**
 * Close all open connections and mark this load balancer as disabled
 *
 * Once disabled, reallyOpenConnection() throws DBAccessError instead of connecting.
 *
 * @param string $fname Caller name, passed on to closeAll()
 */
public function disable( $fname = __METHOD__ ) {
	// Close first, then set the flag
	$this->closeAll( $fname );

	$this->disabled = true;
}
1289 | |
/**
 * Close every open connection and reset the tracked connection pools
 *
 * @param string $fname Caller name, passed on to each handle's close()
 */
public function closeAll( $fname = __METHOD__ ) {
	// Held in a local only so that it stays alive until this method returns
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = ScopedCallback::newScopedIgnoreUserAbort();

	foreach ( $this->getOpenConnections() as $openConn ) {
		$openConn->close( $fname );
	}

	// Start over with empty pools
	$this->conns = self::newTrackedConnectionsArray();
}
1299 | |
/**
 * Close a connection and drop it from the tracked connection pools
 *
 * Using this function makes sure the LoadBalancer knows the connection is closed.
 * If you use $conn->close() directly, the load balancer won't update its state.
 *
 * @param IDatabaseForOwner $conn Handle to close; must not be a DBConnRef
 * @throws RuntimeException If given a DBConnRef
 * @throws UnexpectedValueException If the handle carries no server index
 */
private function closeConnection( IDatabaseForOwner $conn ) {
	if ( $conn instanceof DBConnRef ) {
		// Avoid calling close() but still leaving the handle in the pool
		throw new RuntimeException( 'Cannot close DBConnRef instance; it must be shareable' );
	}

	$domain = $conn->getDomainID();
	$serverIndex = $conn->getLBInfo( self::INFO_SERVER_INDEX );
	if ( $serverIndex === null ) {
		throw new UnexpectedValueException( "Handle on '$domain' missing server index" );
	}

	$srvName = $this->serverInfo->getServerName( $serverIndex );

	// Remove the handle from every pool that tracks it for this server
	$wasTracked = false;
	foreach ( $this->conns as $type => $poolConnsByServer ) {
		$serverConns = $poolConnsByServer[$serverIndex] ?? [];
		$position = array_search( $conn, $serverConns, true );
		if ( $position === false ) {
			continue;
		}
		unset( $this->conns[$type][$serverIndex][$position] );
		$wasTracked = true;
	}

	if ( !$wasTracked ) {
		$this->logger->warning(
			__METHOD__ .
			": orphaned connection to database {$this->stringifyConn( $conn )} at '$srvName'."
		);
	}

	$this->logger->debug(
		__METHOD__ .
		": closing connection to database {$this->stringifyConn( $conn )} at '$srvName'."
	);

	$conn->close( __METHOD__ );
}
1343 | |
/**
 * Run pre-commit callbacks on all open primary connections and finalize the round
 *
 * The round stage is set to ROUND_ERROR while this runs and only becomes
 * ROUND_FINALIZED if every step completes.
 *
 * @param string $fname Caller name (not used in this method body)
 * @return int Total number of callback execution attempts
 */
public function finalizePrimaryChanges( $fname = __METHOD__ ) {
	$this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] );
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = ScopedCallback::newScopedIgnoreUserAbort();

	// "failed" until proven otherwise
	$this->trxRoundStage = self::ROUND_ERROR;

	// Loop until callbacks stop adding callbacks on other connections
	$totalAttempts = 0;
	do {
		$attemptsThisPass = 0;
		foreach ( $this->getOpenPrimaryConnections() as $primaryConn ) {
			// Run any pre-commit callbacks while leaving the post-commit ones suppressed.
			// Any error should cause all (peer) transactions to be rolled back together.
			$attemptsThisPass += $primaryConn->runOnTransactionPreCommitCallbacks();
		}
		$totalAttempts += $attemptsThisPass;
	} while ( $attemptsThisPass > 0 );

	// Defer post-commit callbacks until after COMMIT/ROLLBACK happens on all handles
	foreach ( $this->getOpenPrimaryConnections() as $primaryConn ) {
		$primaryConn->setTrxEndCallbackSuppression( true );
	}

	$this->trxRoundStage = self::ROUND_FINALIZED;

	return $totalAttempts;
}
1369 | |
1370 | public function approvePrimaryChanges( int $maxWriteDuration, $fname = __METHOD__ ) { |
1371 | $this->assertTransactionRoundStage( self::ROUND_FINALIZED ); |
1372 | /** @noinspection PhpUnusedLocalVariableInspection */ |
1373 | $scope = ScopedCallback::newScopedIgnoreUserAbort(); |
1374 | |
1375 | $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise |
1376 | foreach ( $this->getOpenPrimaryConnections() as $conn ) { |
1377 | // Any atomic sections should have been closed by now and there definitely should |
1378 | // not be any open transactions started by begin() from callers outside Database. |
1379 | if ( $conn->explicitTrxActive() ) { |
1380 | throw new DBTransactionError( |
1381 | $conn, |
1382 | "Explicit transaction still active; a caller might have failed to call " . |
1383 | "endAtomic() or cancelAtomic()." |
1384 | ); |
1385 | } |
1386 | // Assert that the time to replicate the transaction will be reasonable. |
1387 | // If this fails, then all DB transactions will be rolled back together. |
1388 | $time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY ); |
1389 | if ( $maxWriteDuration > 0 ) { |
1390 | if ( $time > $maxWriteDuration ) { |
1391 | $humanTimeSec = round( $time, 3 ); |
1392 | throw new DBTransactionSizeError( |
1393 | $conn, |
1394 | "Transaction spent {time}s in writes, exceeding the {$maxWriteDuration}s limit", |
1395 | // Message parameters for: transaction-duration-limit-exceeded |
1396 | [ $time, $maxWriteDuration ], |
1397 | null, |
1398 | [ 'time' => $humanTimeSec ] |
1399 | ); |
1400 | } elseif ( $time > 0 ) { |
1401 | $timeMs = $time * 1000; |
1402 | $humanTimeMs = round( $timeMs, $timeMs > 1 ? 0 : 3 ); |
1403 | $this->logger->debug( |
1404 | "Transaction spent {time_ms}ms in writes, under the {$maxWriteDuration}s limit", |
1405 | [ 'time_ms' => $humanTimeMs ] |
1406 | ); |
1407 | } |
1408 | } |
1409 | // If a connection sits idle for too long it might be dropped, causing transaction |
1410 | // writes and session locks to be lost. Ping all the server connections before making |
1411 | // any attempt to commit the transactions belonging to the active transaction round. |
1412 | if ( $conn->writesOrCallbacksPending() || $conn->sessionLocksPending() ) { |
1413 | if ( !$conn->ping() ) { |
1414 | throw new DBTransactionError( |
1415 | $conn, |
1416 | "Pre-commit ping failed on server {$conn->getServerName()}" |
1417 | ); |
1418 | } |
1419 | } |
1420 | } |
1421 | $this->trxRoundStage = self::ROUND_APPROVED; |
1422 | } |
1423 | |
1424 | public function beginPrimaryChanges( $fname = __METHOD__ ) { |
1425 | if ( $this->trxRoundFname !== null ) { |
1426 | throw new DBTransactionError( |
1427 | null, |
1428 | "Transaction round '{$this->trxRoundFname}' already started" |
1429 | ); |
1430 | } |
1431 | $this->assertTransactionRoundStage( self::ROUND_CURSORY ); |
1432 | /** @noinspection PhpUnusedLocalVariableInspection */ |
1433 | $scope = ScopedCallback::newScopedIgnoreUserAbort(); |
1434 | |
1435 | // Clear any empty transactions (no writes/callbacks) from the implicit round |
1436 | $this->flushPrimarySnapshots( $fname ); |
1437 | |
1438 | $this->trxRoundFname = $fname; |
1439 | $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise |
1440 | // Mark applicable handles as participating in this explicit transaction round. |
1441 | // For each of these handles, any writes and callbacks will be tied to a single |
1442 | // transaction. The (peer) handles will reject begin()/commit() calls unless they |
1443 | // are part of an en masse commit or an en masse rollback. |
1444 | foreach ( $this->getOpenConnections() as $conn ) { |
1445 | $this->syncConnectionRoundState( $conn ); |
1446 | } |
1447 | $this->trxRoundStage = self::ROUND_CURSORY; |
1448 | } |
1449 | |
1450 | public function commitPrimaryChanges( $fname = __METHOD__ ) { |
1451 | $this->assertTransactionRoundStage( self::ROUND_APPROVED ); |
1452 | /** @noinspection PhpUnusedLocalVariableInspection */ |
1453 | $scope = ScopedCallback::newScopedIgnoreUserAbort(); |
1454 | |
1455 | $failures = []; |
1456 | |
1457 | $this->trxRoundFname = null; |
1458 | $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise |
1459 | // Commit any writes and clear any snapshots as well (callbacks require AUTOCOMMIT). |
1460 | // Note that callbacks should already be suppressed due to finalizePrimaryChanges(). |
1461 | foreach ( $this->getOpenPrimaryConnections() as $conn ) { |
1462 | try { |
1463 | $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS ); |
1464 | } catch ( DBError $e ) { |
1465 | ( $this->errorLogger )( $e ); |
1466 | $failures[] = "{$conn->getServerName()}: {$e->getMessage()}"; |
1467 | } |
1468 | } |
1469 | if ( $failures ) { |
1470 | throw new DBTransactionError( |
1471 | null, |
1472 | "Commit failed on server(s) " . implode( "\n", array_unique( $failures ) ) |
1473 | ); |
1474 | } |
1475 | // Unmark handles as participating in this explicit transaction round |
1476 | foreach ( $this->getOpenConnections() as $conn ) { |
1477 | $this->syncConnectionRoundState( $conn ); |
1478 | } |
1479 | $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS; |
1480 | } |
1481 | |
1482 | public function runPrimaryTransactionIdleCallbacks( $fname = __METHOD__ ) { |
1483 | if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) { |
1484 | $type = IDatabase::TRIGGER_COMMIT; |
1485 | } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) { |
1486 | $type = IDatabase::TRIGGER_ROLLBACK; |
1487 | } else { |
1488 | throw new DBTransactionError( |
1489 | null, |
1490 | "Transaction should be in the callback stage (not '{$this->trxRoundStage}')" |
1491 | ); |
1492 | } |
1493 | /** @noinspection PhpUnusedLocalVariableInspection */ |
1494 | $scope = ScopedCallback::newScopedIgnoreUserAbort(); |
1495 | |
1496 | $oldStage = $this->trxRoundStage; |
1497 | $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise |
1498 | |
1499 | // Now that the COMMIT/ROLLBACK step is over, enable post-commit callback runs |
1500 | foreach ( $this->getOpenPrimaryConnections() as $conn ) { |
1501 | $conn->setTrxEndCallbackSuppression( false ); |
1502 | } |
1503 | |
1504 | $errors = []; |
1505 | $fname = __METHOD__; |
1506 | // Loop until callbacks stop adding callbacks on other connections |
1507 | do { |
1508 | // Run any pending callbacks for each connection... |
1509 | $count = 0; // callback execution attempts |
1510 | foreach ( $this->getOpenPrimaryConnections() as $conn ) { |
1511 | if ( $conn->trxLevel() ) { |
1512 | continue; // retry in the next iteration, after commit() is called |
1513 | } |
1514 | $count += $conn->runOnTransactionIdleCallbacks( $type, $errors ); |
1515 | } |
1516 | // Clear out any active transactions left over from callbacks... |
1517 | foreach ( $this->getOpenPrimaryConnections() as $conn ) { |
1518 | if ( $conn->writesPending() ) { |
1519 | // A callback from another handle wrote to this one and DBO_TRX is set |
1520 | $fnames = implode( ', ', $conn->pendingWriteCallers() ); |
1521 | $this->logger->info( |
1522 | "$fname: found writes pending ($fnames).", |
1523 | $this->getConnLogContext( |
1524 | $conn, |
1525 | [ 'exception' => new RuntimeException() ] |
1526 | ) |
1527 | ); |
1528 | } elseif ( $conn->trxLevel() ) { |
1529 | // A callback from another handle read from this one and DBO_TRX is set, |
1530 | // which can easily happen if there is only one DB (no replicas) |
1531 | $this->logger->debug( "$fname: found empty transaction." ); |
1532 | } |
1533 | try { |
1534 | $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS ); |
1535 | } catch ( DBError $ex ) { |
1536 | $errors[] = $ex; |
1537 | } |
1538 | } |
1539 | } while ( $count > 0 ); |
1540 | |
1541 | $this->trxRoundStage = $oldStage; |
1542 | |
1543 | return $errors[0] ?? null; |
1544 | } |
1545 | |
1546 | public function runPrimaryTransactionListenerCallbacks( $fname = __METHOD__ ) { |
1547 | if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) { |
1548 | $type = IDatabase::TRIGGER_COMMIT; |
1549 | } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) { |
1550 | $type = IDatabase::TRIGGER_ROLLBACK; |
1551 | } else { |
1552 | throw new DBTransactionError( |
1553 | null, |
1554 | "Transaction should be in the callback stage (not '{$this->trxRoundStage}')" |
1555 | ); |
1556 | } |
1557 | /** @noinspection PhpUnusedLocalVariableInspection */ |
1558 | $scope = ScopedCallback::newScopedIgnoreUserAbort(); |
1559 | |
1560 | $errors = []; |
1561 | $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise |
1562 | foreach ( $this->getOpenPrimaryConnections() as $conn ) { |
1563 | $conn->runTransactionListenerCallbacks( $type, $errors ); |
1564 | } |
1565 | $this->trxRoundStage = self::ROUND_CURSORY; |
1566 | |
1567 | return $errors[0] ?? null; |
1568 | } |
1569 | |
1570 | public function rollbackPrimaryChanges( $fname = __METHOD__ ) { |
1571 | /** @noinspection PhpUnusedLocalVariableInspection */ |
1572 | $scope = ScopedCallback::newScopedIgnoreUserAbort(); |
1573 | |
1574 | $this->trxRoundFname = null; |
1575 | $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise |
1576 | foreach ( $this->getOpenPrimaryConnections() as $conn ) { |
1577 | $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS ); |
1578 | } |
1579 | // Unmark handles as participating in this explicit transaction round |
1580 | foreach ( $this->getOpenConnections() as $conn ) { |
1581 | $this->syncConnectionRoundState( $conn ); |
1582 | } |
1583 | $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS; |
1584 | } |
1585 | |
1586 | public function flushPrimarySessions( $fname = __METHOD__ ) { |
1587 | $this->assertTransactionRoundStage( [ self::ROUND_CURSORY ] ); |
1588 | if ( $this->hasPrimaryChanges() ) { |
1589 | // Any transaction should have been rolled back beforehand |
1590 | throw new DBTransactionError( null, "Cannot reset session while writes are pending" ); |
1591 | } |
1592 | |
1593 | foreach ( $this->getOpenPrimaryConnections() as $conn ) { |
1594 | $conn->flushSession( $fname, $conn::FLUSHING_ALL_PEERS ); |
1595 | } |
1596 | } |
1597 | |
1598 | /** |
1599 | * @param string|string[] $stage |
1600 | * @throws DBTransactionError |
1601 | */ |
1602 | private function assertTransactionRoundStage( $stage ) { |
1603 | $stages = (array)$stage; |
1604 | |
1605 | if ( !in_array( $this->trxRoundStage, $stages, true ) ) { |
1606 | $stageList = implode( |
1607 | '/', |
1608 | array_map( static function ( $v ) { |
1609 | return "'$v'"; |
1610 | }, $stages ) |
1611 | ); |
1612 | throw new DBTransactionError( |
1613 | null, |
1614 | "Transaction round stage must be $stageList (not '{$this->trxRoundStage}')" |
1615 | ); |
1616 | } |
1617 | } |
1618 | |
1619 | /** |
1620 | * Make connections with DBO_DEFAULT/DBO_TRX set join any active transaction round |
1621 | * |
1622 | * Some servers may have neither flag enabled, meaning that they opt out of such |
1623 | * transaction rounds and remain in auto-commit mode. Such behavior might be desired |
1624 | * when a DB server is used for something like simple key/value storage. |
1625 | * |
1626 | * @param Database $conn |
1627 | */ |
1628 | private function syncConnectionRoundState( Database $conn ) { |
1629 | if ( $conn->getLBInfo( self::INFO_CONN_CATEGORY ) !== self::CATEGORY_ROUND ) { |
1630 | return; // transaction rounds do not apply to these connections |
1631 | } |
1632 | |
1633 | if ( $this->trxRoundFname !== null ) { |
1634 | // Explicit transaction round |
1635 | $trxRoundLevel = 1; |
1636 | } else { |
1637 | // Implicit auto-commit round (cli mode) or implicit transaction round (web mode) |
1638 | $trxRoundLevel = ( $this->cliMode ? 0 : 1 ); |
1639 | } |
1640 | |
1641 | // CATEGORY_ROUND DB handles with DBO_DEFAULT are considered "round aware" and the |
1642 | // load balancer will take over the logic of when DBO_TRX is set for such DB handles. |
1643 | // DBO_TRX will be set only when a transaction round is active (explicit or implicit). |
1644 | if ( $conn->getFlag( $conn::DBO_DEFAULT ) ) { |
1645 | if ( $trxRoundLevel ) { |
1646 | // Wrap queries in a transaction joined to the active transaction round |
1647 | $conn->setFlag( $conn::DBO_TRX ); |
1648 | } else { |
1649 | // Do not wrap queries in a transaction (no active transaction round) |
1650 | $conn->clearFlag( $conn::DBO_TRX ); |
1651 | } |
1652 | } |
1653 | |
1654 | // Note that DBO_TRX is normally only set above or by DatabaseFactory applying |
1655 | // DBO_DEFAULT. However, integration tests might directly force DBO_TRX in the |
1656 | // server configuration arrays. Such handles will still be flushed during calls |
1657 | // to {@link LoadBalancer::commitPrimaryChanges()}. |
1658 | if ( $conn->getFlag( $conn::DBO_TRX ) ) { |
1659 | // DB handle is participating in the active transaction round |
1660 | $conn->setLBInfo( $conn::LB_TRX_ROUND_LEVEL, $trxRoundLevel ); |
1661 | $conn->setLBInfo( $conn::LB_TRX_ROUND_FNAME, $this->trxRoundFname ); |
1662 | } else { |
1663 | // DB handle is not participating in any transaction rounds |
1664 | $conn->setLBInfo( $conn::LB_TRX_ROUND_LEVEL, 0 ); |
1665 | $conn->setLBInfo( $conn::LB_TRX_ROUND_FNAME, null ); |
1666 | } |
1667 | } |
1668 | |
1669 | public function flushReplicaSnapshots( $fname = __METHOD__ ) { |
1670 | foreach ( $this->conns as $poolConnsByServer ) { |
1671 | foreach ( $poolConnsByServer as $serverIndex => $serverConns ) { |
1672 | if ( $serverIndex === ServerInfo::WRITER_INDEX ) { |
1673 | continue; // skip primary |
1674 | } |
1675 | foreach ( $serverConns as $conn ) { |
1676 | $conn->flushSnapshot( $fname ); |
1677 | } |
1678 | } |
1679 | } |
1680 | } |
1681 | |
1682 | public function flushPrimarySnapshots( $fname = __METHOD__ ) { |
1683 | foreach ( $this->getOpenPrimaryConnections() as $conn ) { |
1684 | $conn->flushSnapshot( $fname ); |
1685 | } |
1686 | } |
1687 | |
1688 | public function hasPrimaryConnection() { |
1689 | return (bool)$this->getAnyOpenConnection( ServerInfo::WRITER_INDEX ); |
1690 | } |
1691 | |
1692 | public function hasPrimaryChanges() { |
1693 | foreach ( $this->getOpenPrimaryConnections() as $conn ) { |
1694 | if ( $conn->writesOrCallbacksPending() ) { |
1695 | return true; |
1696 | } |
1697 | } |
1698 | |
1699 | return false; |
1700 | } |
1701 | |
1702 | public function lastPrimaryChangeTimestamp() { |
1703 | $lastTime = null; |
1704 | foreach ( $this->getOpenPrimaryConnections() as $conn ) { |
1705 | $lastTime = max( $lastTime, $conn->lastDoneWrites() ); |
1706 | } |
1707 | |
1708 | return $lastTime; |
1709 | } |
1710 | |
1711 | public function hasOrMadeRecentPrimaryChanges( $age = null ) { |
1712 | $age ??= self::MAX_WAIT_DEFAULT; |
1713 | |
1714 | return ( $this->hasPrimaryChanges() |
1715 | || $this->lastPrimaryChangeTimestamp() > microtime( true ) - $age ); |
1716 | } |
1717 | |
1718 | public function pendingPrimaryChangeCallers() { |
1719 | $fnames = []; |
1720 | foreach ( $this->getOpenPrimaryConnections() as $conn ) { |
1721 | $fnames = array_merge( $fnames, $conn->pendingWriteCallers() ); |
1722 | } |
1723 | |
1724 | return $fnames; |
1725 | } |
1726 | |
1727 | public function explicitTrxActive() { |
1728 | foreach ( $this->getOpenPrimaryConnections() as $conn ) { |
1729 | if ( $conn->explicitTrxActive() ) { |
1730 | return true; |
1731 | } |
1732 | } |
1733 | return false; |
1734 | } |
1735 | |
1736 | private function setLaggedReplicaMode(): void { |
1737 | $this->laggedReplicaMode = true; |
1738 | $this->logger->warning( __METHOD__ . ": setting lagged replica mode" ); |
1739 | } |
1740 | |
1741 | public function laggedReplicaUsed() { |
1742 | return $this->laggedReplicaMode; |
1743 | } |
1744 | |
1745 | public function getReadOnlyReason() { |
1746 | if ( $this->readOnlyReason !== false ) { |
1747 | return $this->readOnlyReason; |
1748 | } elseif ( $this->isPrimaryRunningReadOnly() ) { |
1749 | return 'The primary database server is running in read-only mode.'; |
1750 | } |
1751 | |
1752 | return false; |
1753 | } |
1754 | |
1755 | /** |
1756 | * @note This method suppresses DBError exceptions in order to avoid severe downtime |
1757 | * @param IDatabaseForOwner|null $conn Recently acquired primary connection; null if not applicable |
1758 | * @return bool Whether the entire primary DB server or the local domain DB is read-only |
1759 | */ |
1760 | private function isPrimaryRunningReadOnly( ?IDatabaseForOwner $conn = null ) { |
1761 | // Context will often be HTTP GET/HEAD; heavily cache the results |
1762 | return (bool)$this->wanCache->getWithSetCallback( |
1763 | // Note that table prefixes are not related to server-side read-only mode |
1764 | $this->wanCache->makeGlobalKey( |
1765 | 'rdbms-server-readonly', |
1766 | $this->serverInfo->getPrimaryServerName() |
1767 | ), |
1768 | self::TTL_CACHE_READONLY, |
1769 | function ( $oldValue ) use ( $conn ) { |
1770 | $scope = $this->trxProfiler->silenceForScope(); |
1771 | $conn ??= $this->getServerConnection( |
1772 | ServerInfo::WRITER_INDEX, |
1773 | self::DOMAIN_ANY, |
1774 | self::CONN_SILENCE_ERRORS |
1775 | ); |
1776 | if ( $conn ) { |
1777 | try { |
1778 | $value = (int)$conn->serverIsReadOnly(); |
1779 | } catch ( DBError $e ) { |
1780 | $value = is_int( $oldValue ) ? $oldValue : 0; |
1781 | } |
1782 | } else { |
1783 | $value = 0; |
1784 | } |
1785 | ScopedCallback::consume( $scope ); |
1786 | |
1787 | return $value; |
1788 | }, |
1789 | [ |
1790 | 'busyValue' => 0, |
1791 | 'pcTTL' => WANObjectCache::TTL_PROC_LONG |
1792 | ] |
1793 | ); |
1794 | } |
1795 | |
1796 | public function pingAll() { |
1797 | $success = true; |
1798 | foreach ( $this->getOpenConnections() as $conn ) { |
1799 | if ( !$conn->ping() ) { |
1800 | $success = false; |
1801 | } |
1802 | } |
1803 | |
1804 | return $success; |
1805 | } |
1806 | |
1807 | /** |
1808 | * Get all open connections |
1809 | * @return \Generator|Database[] |
1810 | */ |
1811 | private function getOpenConnections() { |
1812 | foreach ( $this->conns as $poolConnsByServer ) { |
1813 | foreach ( $poolConnsByServer as $serverConns ) { |
1814 | foreach ( $serverConns as $conn ) { |
1815 | yield $conn; |
1816 | } |
1817 | } |
1818 | } |
1819 | } |
1820 | |
1821 | /** |
1822 | * Get all open primary connections |
1823 | * @return \Generator|Database[] |
1824 | */ |
1825 | private function getOpenPrimaryConnections() { |
1826 | foreach ( $this->conns as $poolConnsByServer ) { |
1827 | /** @var IDatabaseForOwner $conn */ |
1828 | foreach ( ( $poolConnsByServer[ServerInfo::WRITER_INDEX] ?? [] ) as $conn ) { |
1829 | yield $conn; |
1830 | } |
1831 | } |
1832 | } |
1833 | |
1834 | public function getMaxLag() { |
1835 | $host = ''; |
1836 | $maxLag = -1; |
1837 | $maxIndex = 0; |
1838 | |
1839 | if ( $this->serverInfo->hasReplicaServers() ) { |
1840 | $lagTimes = $this->getLagTimes(); |
1841 | foreach ( $lagTimes as $i => $lag ) { |
1842 | // Allowing the value to be unset due to stale cache (T361824) |
1843 | $load = $this->groupLoads[self::GROUP_GENERIC][$i] ?? 0; |
1844 | if ( $load > 0 && $lag > $maxLag ) { |
1845 | $maxLag = $lag; |
1846 | $host = $this->serverInfo->getServerInfoStrict( $i, 'host' ); |
1847 | $maxIndex = $i; |
1848 | } |
1849 | } |
1850 | } |
1851 | |
1852 | return [ $host, $maxLag, $maxIndex ]; |
1853 | } |
1854 | |
1855 | public function getLagTimes() { |
1856 | if ( !$this->hasReplicaServers() ) { |
1857 | return [ ServerInfo::WRITER_INDEX => 0 ]; // no replication = no lag |
1858 | } |
1859 | $fname = __METHOD__; |
1860 | return $this->wanCache->getWithSetCallback( |
1861 | $this->wanCache->makeGlobalKey( 'rdbms-lags', $this->getClusterName() ), |
1862 | // Add jitter to avoid stampede |
1863 | 10 + mt_rand( 1, 10 ), |
1864 | function () use ( $fname ) { |
1865 | $lags = []; |
1866 | foreach ( $this->serverInfo->getStreamingReplicaIndexes() as $i ) { |
1867 | $conn = $this->getServerConnection( |
1868 | $i, |
1869 | self::DOMAIN_ANY, |
1870 | self::CONN_SILENCE_ERRORS | self::CONN_UNTRACKED_GAUGE |
1871 | ); |
1872 | if ( $conn ) { |
1873 | $lags[$i] = $conn->getLag(); |
1874 | $conn->close( $fname ); |
1875 | } else { |
1876 | $lags[$i] = false; |
1877 | } |
1878 | } |
1879 | return $lags; |
1880 | }, |
1881 | [ 'lockTSE' => 30 ] |
1882 | ); |
1883 | } |
1884 | |
1885 | public function waitForPrimaryPos( IDatabase $conn ) { |
1886 | if ( $conn->getLBInfo( self::INFO_SERVER_INDEX ) === ServerInfo::WRITER_INDEX ) { |
1887 | return true; // not a replica DB server |
1888 | } |
1889 | |
1890 | // Get the current primary DB position, opening a connection only if needed |
1891 | $flags = self::CONN_SILENCE_ERRORS; |
1892 | $primaryConn = $this->getAnyOpenConnection( ServerInfo::WRITER_INDEX, $flags ); |
1893 | if ( $primaryConn ) { |
1894 | $pos = $primaryConn->getPrimaryPos(); |
1895 | } else { |
1896 | $primaryConn = $this->getServerConnection( ServerInfo::WRITER_INDEX, self::DOMAIN_ANY, $flags ); |
1897 | if ( !$primaryConn ) { |
1898 | throw new DBReplicationWaitError( |
1899 | null, |
1900 | "Could not obtain a primary database connection to get the position" |
1901 | ); |
1902 | } |
1903 | $pos = $primaryConn->getPrimaryPos(); |
1904 | $this->closeConnection( $primaryConn ); |
1905 | } |
1906 | |
1907 | if ( $pos instanceof DBPrimaryPos && $conn instanceof IDatabaseForOwner ) { |
1908 | $this->logger->debug( __METHOD__ . ': waiting' ); |
1909 | $result = $conn->primaryPosWait( $pos, self::MAX_WAIT_DEFAULT ); |
1910 | $ok = ( $result !== null && $result != -1 ); |
1911 | if ( $ok ) { |
1912 | $this->logger->debug( __METHOD__ . ': done waiting (success)' ); |
1913 | } else { |
1914 | $this->logger->debug( __METHOD__ . ': done waiting (failure)' ); |
1915 | } |
1916 | } else { |
1917 | $ok = false; // something is misconfigured |
1918 | $this->logger->error( |
1919 | __METHOD__ . ': could not get primary pos for {db_server}', |
1920 | $this->getConnLogContext( $conn, [ 'exception' => new RuntimeException() ] ) |
1921 | ); |
1922 | } |
1923 | |
1924 | return $ok; |
1925 | } |
1926 | |
1927 | public function setTransactionListener( $name, ?callable $callback = null ) { |
1928 | if ( $callback ) { |
1929 | $this->trxRecurringCallbacks[$name] = $callback; |
1930 | } else { |
1931 | unset( $this->trxRecurringCallbacks[$name] ); |
1932 | } |
1933 | foreach ( $this->getOpenPrimaryConnections() as $conn ) { |
1934 | $conn->setTransactionListener( $name, $callback ); |
1935 | } |
1936 | } |
1937 | |
1938 | public function setTableAliases( array $aliases ) { |
1939 | $this->tableAliases = $aliases; |
1940 | } |
1941 | |
1942 | public function setIndexAliases( array $aliases ) { |
1943 | $this->indexAliases = $aliases; |
1944 | } |
1945 | |
1946 | public function setDomainAliases( array $aliases ) { |
1947 | $this->domainAliases = $aliases; |
1948 | } |
1949 | |
1950 | public function setLocalDomainPrefix( $prefix ) { |
1951 | $oldLocalDomain = $this->localDomain; |
1952 | $this->localDomain = new DatabaseDomain( |
1953 | $this->localDomain->getDatabase(), |
1954 | $this->localDomain->getSchema(), |
1955 | $prefix |
1956 | ); |
1957 | |
1958 | // Update the prefix for existing connections. |
1959 | // Existing DBConnRef handles will not be affected. |
1960 | foreach ( $this->getOpenConnections() as $conn ) { |
1961 | if ( $oldLocalDomain->equals( $conn->getDomainID() ) ) { |
1962 | $conn->tablePrefix( $prefix ); |
1963 | } |
1964 | } |
1965 | } |
1966 | |
1967 | public function redefineLocalDomain( $domain ) { |
1968 | $this->closeAll( __METHOD__ ); |
1969 | $this->localDomain = DatabaseDomain::newFromId( $domain ); |
1970 | } |
1971 | |
1972 | public function setTempTablesOnlyMode( $value, $domain ) { |
1973 | $old = $this->tempTablesOnlyMode[$domain] ?? false; |
1974 | if ( $value ) { |
1975 | $this->tempTablesOnlyMode[$domain] = true; |
1976 | } else { |
1977 | unset( $this->tempTablesOnlyMode[$domain] ); |
1978 | } |
1979 | |
1980 | return $old; |
1981 | } |
1982 | |
1983 | /** |
1984 | * @param IDatabase $conn |
1985 | * @return string Description of a connection handle for log messages |
1986 | * @throws InvalidArgumentException |
1987 | */ |
1988 | private function stringifyConn( IDatabase $conn ) { |
1989 | return $conn->getLBInfo( self::INFO_SERVER_INDEX ) . '/' . $conn->getDomainID(); |
1990 | } |
1991 | |
1992 | /** |
1993 | * @param int $flags A bitfield of flags |
1994 | * @param int $bit Bit flag constant |
1995 | * @return bool Whether the bit field has the specified bit flag set |
1996 | */ |
1997 | private function fieldHasBit( int $flags, int $bit ) { |
1998 | return ( ( $flags & $bit ) === $bit ); |
1999 | } |
2000 | |
2001 | /** |
2002 | * Create a log context to pass to PSR-3 logger functions. |
2003 | * |
2004 | * @param IDatabase $conn |
2005 | * @param array $extras Additional data to add to context |
2006 | * @return array |
2007 | */ |
2008 | protected function getConnLogContext( IDatabase $conn, array $extras = [] ) { |
2009 | return array_merge( |
2010 | [ |
2011 | 'db_server' => $conn->getServerName(), |
2012 | 'db_domain' => $conn->getDomainID() |
2013 | ], |
2014 | $extras |
2015 | ); |
2016 | } |
2017 | |
2018 | /** |
2019 | * @param float|null &$time Mock UNIX timestamp for testing |
2020 | * @internal |
2021 | * @codeCoverageIgnore |
2022 | */ |
2023 | public function setMockTime( &$time ) { |
2024 | $this->loadMonitor->setMockTime( $time ); |
2025 | } |
2026 | } |