Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
56.46% |
498 / 882 |
|
32.93% |
27 / 82 |
CRAP | |
0.00% |
0 / 1 |
LoadBalancer | |
56.46% |
498 / 882 |
|
32.93% |
27 / 82 |
8932.80 | |
0.00% |
0 / 1 |
__construct | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
configure | |
88.64% |
39 / 44 |
|
0.00% |
0 / 1 |
12.21 | |||
newTrackedConnectionsArray | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
1 | |||
getClusterName | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getLocalDomainID | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
resolveDomainID | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
resolveDomainInstance | |
100.00% |
13 / 13 |
|
100.00% |
1 / 1 |
6 | |||
resolveGroups | |
73.33% |
11 / 15 |
|
0.00% |
0 / 1 |
14.73 | |||
sanitizeConnectionFlags | |
81.82% |
9 / 11 |
|
0.00% |
0 / 1 |
6.22 | |||
enforceConnectionFlags | |
50.00% |
4 / 8 |
|
0.00% |
0 / 1 |
6.00 | |||
getRandomNonLagged | |
45.45% |
10 / 22 |
|
0.00% |
0 / 1 |
14.95 | |||
getReaderIndex | |
78.26% |
18 / 23 |
|
0.00% |
0 / 1 |
8.66 | |||
getExistingReaderIndex | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
pickReaderIndex | |
55.56% |
15 / 27 |
|
0.00% |
0 / 1 |
18.78 | |||
waitForAll | |
0.00% |
0 / 24 |
|
0.00% |
0 / 1 |
72 | |||
serverHasLoadInAnyGroup | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
12 | |||
getAnyOpenConnection | |
93.33% |
14 / 15 |
|
0.00% |
0 / 1 |
7.01 | |||
pickAnyOpenConnection | |
45.45% |
5 / 11 |
|
0.00% |
0 / 1 |
6.60 | |||
awaitSessionPrimaryPos | |
7.14% |
3 / 42 |
|
0.00% |
0 / 1 |
127.29 | |||
getConnection | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getConnectionInternal | |
93.33% |
14 / 15 |
|
0.00% |
0 / 1 |
6.01 | |||
getServerConnection | |
73.68% |
14 / 19 |
|
0.00% |
0 / 1 |
6.66 | |||
reuseConnection | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getConnectionRef | |
66.67% |
6 / 9 |
|
0.00% |
0 / 1 |
4.59 | |||
getMaintenanceConnectionRef | |
55.56% |
5 / 9 |
|
0.00% |
0 / 1 |
5.40 | |||
reuseOrOpenConnectionForNewRef | |
92.68% |
38 / 41 |
|
0.00% |
0 / 1 |
15.09 | |||
assertConnectionDomain | |
25.00% |
1 / 4 |
|
0.00% |
0 / 1 |
3.69 | |||
getServerAttributes | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
1 | |||
reallyOpenConnection | |
79.10% |
53 / 67 |
|
0.00% |
0 / 1 |
15.79 | |||
loadSessionPrimaryPos | |
71.43% |
5 / 7 |
|
0.00% |
0 / 1 |
6.84 | |||
reportConnectionError | |
60.61% |
20 / 33 |
|
0.00% |
0 / 1 |
10.00 | |||
getWriterIndex | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getServerCount | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
hasReplicaServers | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
hasStreamingReplicaServers | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getServerName | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getServerInfo | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getServerType | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getPrimaryPos | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
12 | |||
reconfigure | |
89.47% |
17 / 19 |
|
0.00% |
0 / 1 |
8.07 | |||
disable | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
closeAll | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
2 | |||
closeConnection | |
0.00% |
0 / 23 |
|
0.00% |
0 / 1 |
42 | |||
finalizePrimaryChanges | |
100.00% |
13 / 13 |
|
100.00% |
1 / 1 |
3 | |||
approvePrimaryChanges | |
28.57% |
10 / 35 |
|
0.00% |
0 / 1 |
46.44 | |||
beginPrimaryChanges | |
69.23% |
9 / 13 |
|
0.00% |
0 / 1 |
3.26 | |||
commitPrimaryChanges | |
65.00% |
13 / 20 |
|
0.00% |
0 / 1 |
7.54 | |||
runPrimaryTransactionIdleCallbacks | |
52.63% |
20 / 38 |
|
0.00% |
0 / 1 |
20.63 | |||
runPrimaryTransactionListenerCallbacks | |
60.00% |
9 / 15 |
|
0.00% |
0 / 1 |
5.02 | |||
rollbackPrimaryChanges | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
20 | |||
flushPrimarySessions | |
80.00% |
4 / 5 |
|
0.00% |
0 / 1 |
3.07 | |||
assertTransactionRoundStage | |
16.67% |
2 / 12 |
|
0.00% |
0 / 1 |
4.31 | |||
applyTransactionRoundFlags | |
83.33% |
5 / 6 |
|
0.00% |
0 / 1 |
4.07 | |||
undoTransactionRoundFlags | |
83.33% |
5 / 6 |
|
0.00% |
0 / 1 |
4.07 | |||
flushReplicaSnapshots | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
30 | |||
flushPrimarySnapshots | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
2 | |||
hasPrimaryConnection | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
hasPrimaryChanges | |
75.00% |
3 / 4 |
|
0.00% |
0 / 1 |
3.14 | |||
lastPrimaryChangeTimestamp | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
hasOrMadeRecentPrimaryChanges | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
pendingPrimaryChangeCallers | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
2 | |||
explicitTrxActive | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
12 | |||
setLaggedReplicaMode | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
laggedReplicaUsed | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getReadOnlyReason | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
12 | |||
isPrimaryRunningReadOnly | |
92.31% |
24 / 26 |
|
0.00% |
0 / 1 |
4.01 | |||
pingAll | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
12 | |||
getOpenConnections | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
4 | |||
getOpenPrimaryConnections | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
3 | |||
getMaxLag | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
30 | |||
getLagTimes | |
90.48% |
19 / 21 |
|
0.00% |
0 / 1 |
4.01 | |||
waitForPrimaryPos | |
0.00% |
0 / 27 |
|
0.00% |
0 / 1 |
56 | |||
setTransactionListener | |
80.00% |
4 / 5 |
|
0.00% |
0 / 1 |
3.07 | |||
setTableAliases | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
setIndexAliases | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
setDomainAliases | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
setLocalDomainPrefix | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
12 | |||
redefineLocalDomain | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
setTempTablesOnlyMode | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
6 | |||
stringifyConn | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
fieldHasBit | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getConnLogContext | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
2 | |||
setMockTime | n/a |
0 / 0 |
n/a |
0 / 0 |
1 |
1 | <?php |
2 | /** |
3 | * This program is free software; you can redistribute it and/or modify |
4 | * it under the terms of the GNU General Public License as published by |
5 | * the Free Software Foundation; either version 2 of the License, or |
6 | * (at your option) any later version. |
7 | * |
8 | * This program is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | * GNU General Public License for more details. |
12 | * |
13 | * You should have received a copy of the GNU General Public License along |
14 | * with this program; if not, write to the Free Software Foundation, Inc., |
15 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
16 | * http://www.gnu.org/copyleft/gpl.html |
17 | * |
18 | * @file |
19 | */ |
20 | namespace Wikimedia\Rdbms; |
21 | |
22 | use ArrayUtils; |
23 | use BagOStuff; |
24 | use EmptyBagOStuff; |
25 | use InvalidArgumentException; |
26 | use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface; |
27 | use LogicException; |
28 | use NullStatsdDataFactory; |
29 | use Psr\Log\LoggerInterface; |
30 | use Psr\Log\NullLogger; |
31 | use RuntimeException; |
32 | use Throwable; |
33 | use UnexpectedValueException; |
34 | use WANObjectCache; |
35 | use Wikimedia\ScopedCallback; |
36 | |
37 | /** |
38 | * @see ILoadBalancer |
39 | * @ingroup Database |
40 | */ |
41 | class LoadBalancer implements ILoadBalancerForOwner { |
42 | /** @var ILoadMonitor */ |
43 | private $loadMonitor; |
44 | /** @var BagOStuff */ |
45 | private $srvCache; |
46 | /** @var WANObjectCache */ |
47 | private $wanCache; |
48 | /** @var DatabaseFactory */ |
49 | private $databaseFactory; |
50 | |
51 | /** @var TransactionProfiler */ |
52 | private $trxProfiler; |
53 | /** @var StatsdDataFactoryInterface */ |
54 | private $statsd; |
55 | /** @var LoggerInterface */ |
56 | private $logger; |
57 | /** @var callable Exception logger */ |
58 | private $errorLogger; |
59 | /** @var DatabaseDomain Local DB domain ID and default for new connections */ |
60 | private $localDomain; |
61 | |
62 | /** @var array<string,array<int,Database[]>> Map of (connection pool => server index => Database[]) */ |
63 | private $conns; |
64 | |
65 | /** @var string|null The name of the DB cluster */ |
66 | private $clusterName; |
67 | /** @var ServerInfo */ |
68 | private $serverInfo; |
69 | /** @var array[] Map of (group => server index => weight) */ |
70 | private $groupLoads; |
71 | /** @var string|null Default query group to use with getConnection() */ |
72 | private $defaultGroup; |
73 | |
74 | /** @var array[] Map of (table => (dbname, schema, prefix) map) */ |
75 | private $tableAliases = []; |
76 | /** @var string[] Map of (index alias => index) */ |
77 | private $indexAliases = []; |
78 | /** @var DatabaseDomain[]|string[] Map of (domain alias => DB domain) */ |
79 | private $domainAliases = []; |
80 | /** @var callable[] Map of (name => callable) */ |
81 | private $trxRecurringCallbacks = []; |
82 | /** @var bool[] Map of (domain => whether to use "temp tables only" mode) */ |
83 | private $tempTablesOnlyMode = []; |
84 | |
85 | /** @var string|false Explicit DBO_TRX transaction round active or false if none */ |
86 | private $trxRoundId = false; |
87 | /** @var string Stage of the current transaction round in the transaction round life-cycle */ |
88 | private $trxRoundStage = self::ROUND_CURSORY; |
89 | /** @var int[] The group replica server indexes keyed by group */ |
90 | private $readIndexByGroup = []; |
91 | /** @var DBPrimaryPos|null Replication sync position or null if not set */ |
92 | private $waitForPos; |
93 | /** @var bool Whether a lagged replica DB was used */ |
94 | private $laggedReplicaMode = false; |
95 | /** @var string|false Reason this instance is read-only or false if not */ |
96 | private $readOnlyReason = false; |
97 | /** @var int Total number of new connections ever made with this instance */ |
98 | private $connectionCounter = 0; |
99 | /** @var bool */ |
100 | private $disabled = false; |
101 | private ?ChronologyProtector $chronologyProtector = null; |
102 | /** @var bool Whether the session consistency callback already executed */ |
103 | private $chronologyProtectorCalled = false; |
104 | |
105 | /** @var Database|null The last connection handle that caused a problem */ |
106 | private $lastErrorConn; |
107 | |
108 | /** @var DatabaseDomain[] Map of (domain ID => domain instance) */ |
109 | private $nonLocalDomainCache = []; |
110 | |
111 | /** |
112 | * @var int Modification counter for invalidating connections held by |
113 | * DBConnRef instances. This is bumped by reconfigure(). |
114 | */ |
115 | private $modcount = 0; |
116 | |
117 | /** The "server index" LB info key; see {@link IDatabase::getLBInfo()} */ |
118 | private const INFO_SERVER_INDEX = 'serverIndex'; |
119 | /** The "connection category" LB info key; see {@link IDatabase::getLBInfo()} */ |
120 | private const INFO_CONN_CATEGORY = 'connCategory'; |
121 | |
122 | /** |
123 | * Default 'maxLag' when unspecified |
124 | * @internal Only for use within LoadBalancer/LoadMonitor |
125 | */ |
126 | public const MAX_LAG_DEFAULT = ServerInfo::MAX_LAG_DEFAULT; |
127 | |
128 | /** Warn when this many connections are held */ |
129 | private const CONN_HELD_WARN_THRESHOLD = 10; |
130 | |
131 | /** Default 'waitTimeout' when unspecified */ |
132 | private const MAX_WAIT_DEFAULT = 10; |
133 | /** Seconds to cache primary DB server read-only status */ |
134 | private const TTL_CACHE_READONLY = 5; |
135 | |
136 | /** A category of connections that are tracked and transaction round aware */ |
137 | private const CATEGORY_ROUND = 'round'; |
138 | /** A category of connections that are tracked and in autocommit-mode */ |
139 | private const CATEGORY_AUTOCOMMIT = 'auto-commit'; |
140 | /** A category of connections that are untracked and in gauge-mode */ |
141 | private const CATEGORY_GAUGE = 'gauge'; |
142 | |
143 | /** Transaction round, explicit or implicit, has not finished writing */ |
144 | private const ROUND_CURSORY = 'cursory'; |
145 | /** Transaction round writes are complete and ready for pre-commit checks */ |
146 | private const ROUND_FINALIZED = 'finalized'; |
147 | /** Transaction round passed final pre-commit checks */ |
148 | private const ROUND_APPROVED = 'approved'; |
149 | /** Transaction round was committed and post-commit callbacks must be run */ |
150 | private const ROUND_COMMIT_CALLBACKS = 'commit-callbacks'; |
151 | /** Transaction round was rolled back and post-rollback callbacks must be run */ |
152 | private const ROUND_ROLLBACK_CALLBACKS = 'rollback-callbacks'; |
153 | /** Transaction round encountered an error */ |
154 | private const ROUND_ERROR = 'error'; |
155 | |
156 | /** @var int Idiom for getExistingReaderIndex() meaning "no index selected" */ |
157 | private const READER_INDEX_NONE = -1; |
158 | |
/**
 * Build a load balancer from a configuration map
 *
 * @param array $params Configuration map; passed as-is to configure()
 */
public function __construct( array $params ) {
	// Apply the configuration first, then start out with empty tracked connection pools
	$this->configure( $params );
	$this->conns = self::newTrackedConnectionsArray();
}
164 | |
/**
 * Initialize the state of this instance from a configuration map
 *
 * Keys read here: localDomain, servers, readOnlyReason, srvCache, wanCache,
 * databaseFactory, errorLogger, logger, clusterName, trxProfiler, statsdDataFactory,
 * loadMonitor, chronologyProtector, roundStage, and defaultGroup. Each of them has
 * a fallback that is used when the key is absent.
 *
 * @param array $params A database configuration array, see $wgLBFactoryConf.
 *
 * @return void
 */
protected function configure( array $params ): void {
	if ( isset( $params['localDomain'] ) ) {
		$this->localDomain = DatabaseDomain::newFromId( $params['localDomain'] );
	} else {
		$this->localDomain = DatabaseDomain::newUnspecified();
	}

	// Register each server and collect its per-group weights. Every server also gets
	// an entry in the generic group, based on its 'load' value.
	$this->serverInfo = new ServerInfo();
	$this->groupLoads = [ self::GROUP_GENERIC => [] ];
	$serverMaps = $this->serverInfo->normalizeServerMaps( $params['servers'] ?? [] );
	foreach ( $serverMaps as $index => $serverMap ) {
		$this->serverInfo->addServer( $index, $serverMap );
		foreach ( $serverMap['groupLoads'] as $groupName => $groupWeight ) {
			$this->groupLoads[$groupName][$index] = $groupWeight;
		}
		$this->groupLoads[self::GROUP_GENERIC][$index] = $serverMap['load'];
	}

	// Only a string value is accepted as the read-only reason
	$readOnlyReason = $params['readOnlyReason'] ?? null;
	if ( is_string( $readOnlyReason ) ) {
		$this->readOnlyReason = $readOnlyReason;
	}

	$this->srvCache = $params['srvCache'] ?? new EmptyBagOStuff();
	$this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();

	// Note: this parameter is normally absent. It is injectable for testing purposes only.
	$this->databaseFactory = $params['databaseFactory'] ?? new DatabaseFactory( $params );

	// Without an injected error logger, exceptions are reported as PHP warnings
	$this->errorLogger = $params['errorLogger'] ?? static function ( Throwable $e ) {
		trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
	};
	$this->logger = $params['logger'] ?? new NullLogger();

	$this->clusterName = $params['clusterName'] ?? null;
	$this->trxProfiler = $params['trxProfiler'] ?? new TransactionProfiler();
	$this->statsd = $params['statsdDataFactory'] ?? new NullStatsdDataFactory();

	// Set up LoadMonitor; short legacy class names are mapped to the real classes
	$monitorConfig = ( $params['loadMonitor'] ?? [ 'class' => 'LoadMonitorNull' ] )
		+ [ 'maxConnCount' => 500 ];
	$legacyClassNames = [
		'LoadMonitor' => LoadMonitor::class,
		'LoadMonitorNull' => LoadMonitorNull::class
	];
	$monitorClass = $monitorConfig['class'];
	$monitorClass = $legacyClassNames[$monitorClass] ?? $monitorClass;
	$this->loadMonitor = new $monitorClass(
		$this, $this->srvCache, $this->wanCache, $monitorConfig );
	$this->loadMonitor->setLogger( $this->logger );
	$this->loadMonitor->setStatsdDataFactory( $this->statsd );

	if ( isset( $params['chronologyProtector'] ) ) {
		$this->chronologyProtector = $params['chronologyProtector'];
	}

	// Optionally start out in one of the post-commit/post-rollback callback stages;
	// any other value leaves the current stage untouched
	if ( isset( $params['roundStage'] ) ) {
		$requestedStage = $params['roundStage'];
		if ( $requestedStage === self::STAGE_POSTCOMMIT_CALLBACKS ) {
			$this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
		} elseif ( $requestedStage === self::STAGE_POSTROLLBACK_CALLBACKS ) {
			$this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
		}
	}

	// The default group must be one that has loads defined, else the generic group is used
	$preferredGroup = $params['defaultGroup'] ?? self::GROUP_GENERIC;
	if ( isset( $this->groupLoads[$preferredGroup] ) ) {
		$this->defaultGroup = $preferredGroup;
	} else {
		$this->defaultGroup = self::GROUP_GENERIC;
	}
}
235 | |
/**
 * Build an empty connection pool map covering the tracked connection categories
 *
 * @return array[] Map of (tracked connection category => empty array)
 */
private static function newTrackedConnectionsArray() {
	// CATEGORY_GAUGE is left out on purpose since such connections are untracked
	return array_fill_keys( [ self::CATEGORY_ROUND, self::CATEGORY_AUTOCOMMIT ], [] );
}
243 | |
/**
 * Get the configured cluster name, or the name of the writer server if none was configured
 *
 * @return string
 */
public function getClusterName(): string {
	if ( $this->clusterName !== null ) {
		return $this->clusterName;
	}

	// No explicit name; use the current primary server name instead
	return $this->getServerName( ServerInfo::WRITER_INDEX );
}
248 | |
/**
 * Get the ID of the local DB domain of this instance
 *
 * @return string
 */
public function getLocalDomainID(): string {
	$localDomain = $this->localDomain;

	return $localDomain->getId();
}
252 | |
/**
 * Resolve a domain reference into a domain ID
 *
 * @param DatabaseDomain|string|false $domain Anything accepted by resolveDomainInstance()
 * @return string ID of the resolved domain
 */
public function resolveDomainID( $domain ): string {
	$domainInstance = $this->resolveDomainInstance( $domain );

	return $domainInstance->getId();
}
256 | |
/**
 * Resolve a domain reference into a DatabaseDomain instance
 *
 * @param DatabaseDomain|string|false $domain Domain instance, domain ID, domain alias,
 *  or false for the local domain
 * @return DatabaseDomain
 */
final protected function resolveDomainInstance( $domain ): DatabaseDomain {
	if ( $domain instanceof DatabaseDomain ) {
		// Already a domain instance; nothing to resolve
		return $domain;
	}

	if ( $domain === false || $domain === $this->localDomain->getId() ) {
		return $this->localDomain;
	}

	if ( isset( $this->domainAliases[$domain] ) ) {
		// Store the instance form of the alias target back into the alias map
		$aliasedDomain = DatabaseDomain::newFromId( $this->domainAliases[$domain] );
		$this->domainAliases[$domain] = $aliasedDomain;

		return $aliasedDomain;
	}

	if ( isset( $this->nonLocalDomainCache[$domain] ) ) {
		return $this->nonLocalDomainCache[$domain];
	}

	// Cache miss. Note that the whole cache array is replaced here, so it only
	// ever holds the most recently resolved non-local domain.
	$resolvedDomain = DatabaseDomain::newFromId( $domain );
	$this->nonLocalDomainCache = [ $domain => $resolvedDomain ];

	return $resolvedDomain;
}
281 | |
/**
 * Get the first group in $groups with assigned servers, falling back to the default group
 *
 * @param string[]|string|false $groups Query group(s) in preference order, [], or false
 * @param int $i Specific server index or DB_PRIMARY/DB_REPLICA
 * @return string Query group
 * @throws LogicException If query groups are given together with a positive server index
 */
private function resolveGroups( $groups, $i ) {
	$noGroupsGiven = ( $groups === [] || $groups === false );

	// If a specific replica server was specified, then $groups makes no sense
	if ( $i > 0 && !$noGroupsGiven ) {
		$list = implode( ', ', (array)$groups );
		throw new LogicException( "Query group(s) ($list) given with server index (#$i)" );
	}

	if ( $noGroupsGiven || $groups === $this->defaultGroup ) {
		return $this->defaultGroup;
	}

	if ( is_string( $groups ) ) {
		// A single group name; only usable if it has loads defined
		return isset( $this->groupLoads[$groups] ) ? $groups : $this->defaultGroup;
	}

	if ( is_array( $groups ) ) {
		// Use the first listed group that has loads defined
		foreach ( $groups as $candidate ) {
			if ( isset( $this->groupLoads[$candidate] ) ) {
				return $candidate;
			}
		}
	}

	return $this->defaultGroup;
}
314 | |
/**
 * Sanitize connection flags provided by a call to getConnection()
 *
 * @param int $flags Bitfield of class CONN_* constants
 * @param int $i Specific server index or DB_PRIMARY/DB_REPLICA
 * @param string $domain Database domain
 * @return int Sanitized bitfield
 */
private function sanitizeConnectionFlags( $flags, $i, $domain ) {
	// An outside caller explicitly asking for the primary server intends to write
	if ( in_array( $i, [ self::DB_PRIMARY, ServerInfo::WRITER_INDEX ], true ) ) {
		$flags |= self::CONN_INTENT_WRITABLE;
	}

	if ( !self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT ) ) {
		// Nothing else needs to be adjusted
		return $flags;
	}

	// Callers use CONN_TRX_AUTOCOMMIT to bypass REPEATABLE-READ staleness without
	// resorting to row locks (e.g. FOR UPDATE) or to make small out-of-band commits
	// during larger transactions, which helps to avoid lock contention.
	// The servers are assumed to all be of the same (or a similar) type, so the
	// attributes of the primary server are used. Those of $i cannot be used since
	// it might be DB_REPLICA, which can require connection attempts to resolve.
	$attributes = $this->getServerAttributes( ServerInfo::WRITER_INDEX );
	if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
		// The RDBMS does not support concurrent writes (e.g. SQLite), so separate
		// connections would just self-deadlock. REPEATABLE-READ staleness is not a
		// concern here since DB-level locking makes transactions Strict Serializable.
		$flags &= ~self::CONN_TRX_AUTOCOMMIT;
		$type = $this->serverInfo->getServerType( ServerInfo::WRITER_INDEX );
		$this->logger->info( __METHOD__ . ": CONN_TRX_AUTOCOMMIT disallowed ($type)" );
	} elseif ( isset( $this->tempTablesOnlyMode[$domain] ) ) {
		// T202116: integration tests are active and all queries should be using
		// temporary clone tables (via prefix). Such tables are not visible across
		// connections and cannot have REPEATABLE-READ snapshot staleness, so the
		// same connection is used for everything.
		$flags &= ~self::CONN_TRX_AUTOCOMMIT;
	}

	return $flags;
}
357 | |
/**
 * Put a handle into auto-commit mode if the given flags call for it
 *
 * @param IDatabase $conn
 * @param int $flags Bitfield of class CONN_* constants
 * @throws DBUnexpectedError If auto-commit mode is called for but a transaction is open
 */
private function enforceConnectionFlags( IDatabase $conn, $flags ) {
	// Gauge handles are treated like auto-commit ones, since handles with open
	// transactions might be subject to REPEATABLE-READ snapshots, which could
	// affect the lag estimate query.
	$needsAutoCommit = self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT )
		|| self::fieldHasBit( $flags, self::CONN_UNTRACKED_GAUGE );
	if ( !$needsAutoCommit ) {
		return;
	}

	if ( $conn->trxLevel() ) {
		throw new DBUnexpectedError(
			$conn,
			'Handle requested with autocommit-mode yet it has a transaction'
		);
	}

	// Switch to auto-commit mode
	$conn->clearFlag( $conn::DBO_TRX );
}
380 | |
/**
 * Pick a random server index from the load map after dropping the overly lagged servers
 *
 * @param array $loads Map of (server index => weight)
 * @param int|float $maxLag Restrict the maximum allowed lag to this many seconds, or INF for no max
 * @return int|string|false Result of ArrayUtils::pickRandom() on the remaining servers,
 *  or false if the remaining weights sum to zero
 */
private function getRandomNonLagged( array $loads, $maxLag = INF ) {
	// Drop excessively lagged servers from the candidates
	foreach ( $this->getLagTimes() as $index => $lag ) {
		if ( $index === ServerInfo::WRITER_INDEX ) {
			// The writer server is never dropped here
			continue;
		}

		// Lag nominally allowed for this server, further constrained by $maxLag
		$lagLimit = min( $this->serverInfo->getServerMaxLag( $index ), $maxLag );
		$srvName = $this->serverInfo->getServerName( $index );

		if ( $lag === false && !is_infinite( $lagLimit ) ) {
			$this->logger->debug(
				__METHOD__ . ": server {db_server} is not replicating?",
				[ 'db_server' => $srvName ]
			);
			unset( $loads[$index] );
		} elseif ( $lag > $lagLimit ) {
			$this->logger->debug(
				__METHOD__ .
					": server {db_server} has {lag} seconds of lag (>= {maxlag})",
				[ 'db_server' => $srvName, 'lag' => $lag, 'maxlag' => $lagLimit ]
			);
			unset( $loads[$index] );
		}
	}

	if ( array_sum( $loads ) == 0 ) {
		// All the replicas with non-zero load are lagged and the primary has zero load.
		// Tell the caller, so that it can switch to read-only mode and use a lagged replica.
		return false;
	}

	// Any of the remaining servers will do
	return ArrayUtils::pickRandom( $loads );
}
424 | |
/**
 * Get the server index to use for DB_REPLICA connections of a query group
 *
 * Once an index is chosen for a group, it is remembered and returned by later calls.
 *
 * @param string|false $group Query group; any non-string value means the generic group
 * @return int|false Server index, or false if the group has no loads or no server
 *  could be picked
 * @throws UnexpectedValueException If a negative server index was picked
 */
public function getReaderIndex( $group = false ) {
	if ( !is_string( $group ) ) {
		$group = self::GROUP_GENERIC;
	}

	if ( !$this->serverInfo->hasReplicaServers() ) {
		// The primary is the only server that could be used
		return ServerInfo::WRITER_INDEX;
	}

	$existingIndex = $this->getExistingReaderIndex( $group );
	if ( $existingIndex !== self::READER_INDEX_NONE ) {
		// Stick to the reader index already selected for this query group, since any
		// session replication position was already waited on and any active
		// transaction will be reused (e.g. for point-in-time snapshots).
		return $existingIndex;
	}

	// Server weights for this load group
	$loads = $this->groupLoads[$group] ?? [];
	if ( !$loads ) {
		$this->logger->info( __METHOD__ . ": no loads for group $group" );

		return false;
	}

	// Session replication positions are loaded before any connection attempts,
	// since reading them afterwards can only cause more delay due to possibly
	// seeing even higher replication positions (e.g. from concurrent requests).
	$this->loadSessionPrimaryPos();

	// Scale the configured load ratios according to each server's load/state.
	// This can sometimes trigger server connections due to cache regeneration.
	$this->loadMonitor->scaleLoads( $loads );

	// Pick a server, accounting for weight, load, lag, and session consistency
	$pickedIndex = $this->pickReaderIndex( $loads );
	if ( $pickedIndex === false ) {
		// Connection attempts failed
		return false;
	}

	// If data seen by queries is expected to reflect writes from a prior transaction,
	// then wait for the chosen server to apply those changes (session consistency)
	$caughtUp = $this->awaitSessionPrimaryPos( $pickedIndex );
	if ( !$caughtUp ) {
		// Data will be outdated compared to what was expected
		$this->setLaggedReplicaMode();
	}

	// Keep using this server for DB_REPLICA handles for this group
	if ( $pickedIndex < 0 ) {
		throw new UnexpectedValueException( "Cannot set a negative read server index" );
	}
	$this->readIndexByGroup[$group] = $pickedIndex;

	$serverName = $this->getServerName( $pickedIndex );
	$this->logger->debug( __METHOD__ . ": using server $serverName for group '$group'" );

	return $pickedIndex;
}
483 | |
/**
 * Get the server index already chosen for DB_REPLICA connections of the given query group
 *
 * @param string $group Query group
 * @return int Specific server index or LoadBalancer::READER_INDEX_NONE if none was chosen
 */
protected function getExistingReaderIndex( $group ) {
	if ( isset( $this->readIndexByGroup[$group] ) ) {
		return $this->readIndexByGroup[$group];
	}

	return self::READER_INDEX_NONE;
}
493 | |
/**
 * Pick a server that is reachable, preferably non-lagged, and return its server index
 *
 * This will leave the server connection open within the pool for reuse
 *
 * Servers are tried one at a time. A server that cannot be connected to is removed
 * from the candidates and another one is picked, until a connection succeeds or no
 * candidates remain.
 *
 * @param array $loads List of server weights
 * @return int|false Reader index, or false if no server could be picked or if all
 *  connection attempts failed
 * @throws InvalidArgumentException If $loads is empty
 */
private function pickReaderIndex( array $loads ) {
	if ( $loads === [] ) {
		throw new InvalidArgumentException( "Server configuration array is empty" );
	}

	// Quickly look through the available servers for a server that meets criteria...
	$currentLoads = $loads;
	$i = false;
	while ( count( $currentLoads ) ) {
		if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
			$this->logger->debug( __METHOD__ . ": session has replication position" );
			// ChronologyProtector::getSessionPrimaryPos called in $this->loadSessionPrimaryPos()
			// sets "waitForPos" for session consistency.
			// This triggers doWait() after connect, so it's especially good to
			// avoid lagged servers so as to avoid excessive delay in that method.
			$ago = microtime( true ) - $this->waitForPos->asOfTime();
			// Aim for <= 1 second of waiting (being too picky can backfire)
			$i = $this->getRandomNonLagged( $currentLoads, $ago + 1 );
		} else {
			// Any server with less lag than its 'max lag' param is preferable
			$i = $this->getRandomNonLagged( $currentLoads );
		}

		if ( $i === false && count( $currentLoads ) ) {
			$this->setLaggedReplicaMode();
			// All replica DBs lagged, just pick anything.
			$i = ArrayUtils::pickRandom( $currentLoads );
		}

		if ( $i === false ) {
			// pickRandom() returned false.
			// This is permanent and means the configuration or LoadMonitor
			// wants us to return false.
			$this->logger->debug( __METHOD__ . ": no suitable server found" );
			return false;
		}

		$serverName = $this->getServerName( $i );
		$this->logger->debug( __METHOD__ . ": connecting to $serverName..." );

		// Get a connection to this server without triggering complementary connections
		// to other servers (due to things like lag or read-only checks). We want to avoid
		// the risk of overhead and recursion here.
		$conn = $this->getServerConnection( $i, self::DOMAIN_ANY, self::CONN_SILENCE_ERRORS );
		if ( !$conn ) {
			$this->logger->warning( __METHOD__ . ": failed connecting to $serverName" );
			unset( $currentLoads[$i] ); // avoid this server next iteration
			// Forget the failed index. Otherwise, if this was the last candidate, the
			// index of an unreachable server would be returned as if it were usable.
			$i = false;
			continue;
		}

		// Return this server
		break;
	}

	// If all servers were down, $i is false at this point
	if ( $currentLoads === [] ) {
		$this->logger->error( __METHOD__ . ": all servers down" );
	}

	return $i;
}
563 | |
/**
 * Wait for the streaming replicas that have load in some group to reach a position
 *
 * The "waitForPos" position of this instance is swapped out for the duration of the
 * call and always restored afterwards.
 *
 * @param DBPrimaryPos $pos Replication position to wait for
 * @param int|null $timeout Seconds to wait overall; a falsy value means MAX_WAIT_DEFAULT
 * @return bool False if the wait failed on more than one replica, or on one replica
 *  while there are fewer than 5 servers; true otherwise
 */
public function waitForAll( DBPrimaryPos $pos, $timeout = null ) {
	$timeout = $timeout ?: self::MAX_WAIT_DEFAULT;

	$previousPos = $this->waitForPos;
	try {
		$this->waitForPos = $pos;

		$laggingServers = [];
		foreach ( $this->serverInfo->getStreamingReplicaIndexes() as $index ) {
			if ( !$this->serverHasLoadInAnyGroup( $index ) ) {
				continue;
			}

			$startTime = microtime( true );
			if ( !$this->awaitSessionPrimaryPos( $index, $timeout ) ) {
				$laggingServers[] = $this->getServerName( $index );
			}
			// Deduct the whole seconds spent on this server from the remaining budget
			$timeout -= intval( microtime( true ) - $startTime );
		}

		// Stop spamming logs when only one replica is lagging and we have 5+ replicas.
		// Mediawiki automatically stops sending queries to the lagged one.
		$failed = $laggingServers &&
			( count( $laggingServers ) > 1 || $this->getServerCount() < 5 );
		if ( $failed ) {
			$this->logger->error(
				"Timed out waiting for replication to reach {raw_pos}",
				[
					'raw_pos' => $pos->__toString(),
					'failed_hosts' => $laggingServers,
					'timeout' => $timeout,
					'exception' => new RuntimeException()
				]
			);
		}

		return !$failed;
	} finally {
		// Restore the old position; this is used for throttling, not lag-protection
		$this->waitForPos = $previousPos;
	}
}
604 | |
/**
 * Check whether a server has a positive weight in at least one query group
 *
 * @param int $i Specific server index
 * @return bool
 */
private function serverHasLoadInAnyGroup( $i ) {
	$hasLoad = false;
	foreach ( $this->groupLoads as $groupWeights ) {
		$weight = $groupWeights[$i] ?? 0;
		if ( $weight > 0 ) {
			$hasLoad = true;
			break;
		}
	}

	return $hasLoad;
}
618 | |
/**
 * Find an already-open pooled connection without opening a new one
 *
 * @param int $i Server index, DB_PRIMARY (mapped to the writer index), or DB_REPLICA
 *  (meaning a connection to any server is acceptable)
 * @param int $flags Bit field of CONN_* constants, enforced on the handle that is found
 * @return IDatabase|false An open connection or false if none is pooled
 */
public function getAnyOpenConnection( $i, $flags = 0 ) {
	if ( $i === self::DB_PRIMARY ) {
		$i = ServerInfo::WRITER_INDEX;
	}
	$anyServerIsFine = ( $i === self::DB_REPLICA );

	foreach ( $this->conns as $type => $poolConnsByServer ) {
		if ( $anyServerIsFine ) {
			// Consider all existing connections to any server
			$candidates = $poolConnsByServer;
		} elseif ( isset( $poolConnsByServer[$i] ) ) {
			// Consider all existing connections to a specific server
			$candidates = [ $i => $poolConnsByServer[$i] ];
		} else {
			$candidates = [];
		}

		$conn = $this->pickAnyOpenConnection( $candidates );
		if ( $conn ) {
			$this->logger->debug( __METHOD__ . ": found '$type' connection to #$i." );
			$this->enforceConnectionFlags( $conn, $flags );

			return $conn;
		}
	}

	return false;
}
646 | |
/**
 * Return the first handle in the given map that reports an open connection
 *
 * Handles that are not open are skipped and a warning is logged for each of them.
 *
 * @param Database[][] $connsByServer Map of (server index => array of DB handles)
 * @return IDatabase|false An appropriate open connection or false if none found
 */
private function pickAnyOpenConnection( array $connsByServer ) {
	foreach ( $connsByServer as $i => $handles ) {
		foreach ( $handles as $handle ) {
			if ( $handle->isOpen() ) {
				return $handle;
			}
			// Some sort of error occurred on this handle? Keep looking.
			$this->logger->warning(
				__METHOD__ .
					": pooled DB handle for {db_server} (#$i) has no open connection.",
				$this->getConnLogContext( $handle )
			);
		}
	}

	return false;
}
668 | |
/**
 * Wait for a given replica DB to catch up to the primary DB pos stored in "waitForPos"
 *
 * The last position each server is known to have reached is kept in the server cache,
 * so that repeated waits for an already-reached position need no connection at all.
 *
 * @see loadSessionPrimaryPos()
 *
 * @param int $index Specific server index
 * @param int|null $timeout Max seconds to wait (at least 1 is always used); a falsy
 *  value means self::MAX_WAIT_DEFAULT
 * @return bool Success
 */
private function awaitSessionPrimaryPos( $index, $timeout = null ) {
	$timeout = max( 1, intval( $timeout ?: self::MAX_WAIT_DEFAULT ) );

	if ( !$this->waitForPos || $index === ServerInfo::WRITER_INDEX ) {
		return true;
	}

	$srvName = $this->getServerName( $index );

	// Check if we already know that the DB has reached this point
	$key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $srvName, 'v2' );

	$position = $this->srvCache->get( $key );
	/** @var DBPrimaryPos|null $knownReachedPos */
	$knownReachedPos = null;
	if ( is_array( $position ) ) {
		// The class name comes from the cache; ignore entries that are malformed or that
		// name something other than a position class instead of failing on them.
		$class = $position['_type_'] ?? null;
		if ( is_string( $class ) && is_a( $class, DBPrimaryPos::class, true ) ) {
			$knownReachedPos = $class::newFromArray( $position );
		}
	}
	if (
		$knownReachedPos instanceof DBPrimaryPos &&
		$knownReachedPos->hasReached( $this->waitForPos )
	) {
		$this->logger->debug(
			__METHOD__ .
				": replica DB {db_server} known to be caught up (pos >= $knownReachedPos).",
			[ 'db_server' => $srvName ]
		);

		return true;
	}

	$close = false; // close the connection afterwards
	$flags = self::CONN_SILENCE_ERRORS;
	// Check if there is an existing connection that can be used
	$conn = $this->getAnyOpenConnection( $index, $flags );
	if ( !$conn ) {
		// Get a connection to this server without triggering complementary connections
		// to other servers (due to things like lag or read-only checks). We want to avoid
		// the risk of overhead and recursion here.
		$conn = $this->getServerConnection( $index, self::DOMAIN_ANY, $flags );
		if ( !$conn ) {
			$this->logger->warning(
				__METHOD__ . ': failed to connect to {db_server}',
				[ 'db_server' => $srvName ]
			);

			return false;
		}
		// Avoid connection spam in waitForAll() when connections
		// are made just for the sake of doing this lag check.
		$close = true;
	}

	$this->logger->info(
		__METHOD__ .
			': waiting for replica DB {db_server} to catch up...',
		$this->getConnLogContext( $conn )
	);

	try {
		$result = $conn->primaryPosWait( $this->waitForPos, $timeout );

		$ok = ( $result !== null && $result != -1 );
		if ( $ok ) {
			// Remember that the DB reached this point
			$this->srvCache->set( $key, $this->waitForPos->toArray(), BagOStuff::TTL_DAY );
		}
	} finally {
		// Release the check-only connection even if the wait or the cache write threw
		if ( $close ) {
			$this->closeConnection( $conn );
		}
	}

	return $ok;
}
753 | |
/**
 * Get a connection handle; this simply delegates to getConnectionRef()
 *
 * @param int $i Server index or DB_PRIMARY/DB_REPLICA
 * @param string[]|string $groups Query group(s)
 * @param string|false $domain DB domain ID or false for the local domain
 * @param int $flags Bit field of CONN_* constants
 * @return IDatabase
 */
public function getConnection( $i, $groups = [], $domain = false, $flags = 0 ) {
	$ref = $this->getConnectionRef( $i, $groups, $domain, $flags );

	return $ref;
}
757 | |
/**
 * Resolve a server index and get a live connection handle to that server
 *
 * @param int $i Specific server index or DB_PRIMARY/DB_REPLICA
 * @param string[]|string $groups Query group(s), resolved through resolveGroups()
 * @param string|false $domain DB domain ID, resolved through resolveDomainID()
 * @param int $flags Bit field of CONN_* constants
 * @return IDatabase
 * @throws UnexpectedValueException If $i is a specific index that is not a known server
 */
public function getConnectionInternal( $i, $groups = [], $domain = false, $flags = 0 ): IDatabase {
	$domain = $this->resolveDomainID( $domain );
	$group = $this->resolveGroups( $groups, $i );
	$flags = $this->sanitizeConnectionFlags( $flags, $i, $domain );
	// If given DB_PRIMARY/DB_REPLICA, resolve it to a specific server index. Resolving
	// DB_REPLICA might trigger getServerConnection() calls due to the getReaderIndex()
	// connectivity checks or LoadMonitor::scaleLoads() server state cache regeneration.
	// The use of getServerConnection() instead of getConnection() avoids infinite loops.
	$serverIndex = $i;
	if ( $i === self::DB_PRIMARY ) {
		$serverIndex = ServerInfo::WRITER_INDEX;
	} elseif ( $i === self::DB_REPLICA ) {
		$groupIndex = $this->getReaderIndex( $group );
		if ( $groupIndex !== false ) {
			// Group connection succeeded
			$serverIndex = $groupIndex;
		}
		// Still holding the DB_REPLICA placeholder means no reader could be picked
		if ( $serverIndex < 0 ) {
			$this->reportConnectionError( 'could not connect to any replica DB server' );
		}
	} elseif ( !$this->serverInfo->hasServerIndex( $i ) ) {
		throw new UnexpectedValueException( "Invalid server index #$i" );
	}
	// Get an open connection to that server (might trigger a new connection)
	return $this->getServerConnection( $serverIndex, $domain, $flags );
}
784 | |
/**
 * Get a live connection handle to a specific server index
 *
 * @param int $i Specific server index
 * @param string $domain DB domain ID
 * @param int $flags Bit field of CONN_* constants
 * @return IDatabase|false The handle; false on failure when CONN_SILENCE_ERRORS is set
 *  (without that flag, failures go through reportConnectionError() instead)
 */
public function getServerConnection( $i, $domain, $flags = 0 ) {
	$domainInstance = DatabaseDomain::newFromId( $domain );
	// Snapshot the counter so that a connection opened by this call can be detected
	$connectionsBefore = $this->connectionCounter;
	// Get an open connection to this server (might trigger a new connection)
	$conn = $this->reuseOrOpenConnectionForNewRef( $i, $domainInstance, $flags );

	$usable = ( $conn instanceof IDatabase );

	// Profile any new connections caused by this method
	if ( $usable && $this->connectionCounter > $connectionsBefore ) {
		$this->trxProfiler->recordConnection(
			$conn->getServerName(),
			$conn->getDBname(),
			self::fieldHasBit( $flags, self::CONN_INTENT_WRITABLE )
		);
	}

	if ( $usable && !$conn->isOpen() ) {
		// Connection was made but later unrecoverably lost for some reason.
		// Do not return a handle that will just throw exceptions on use, but
		// let the calling code, e.g. getReaderIndex(), try another server.
		$this->lastErrorConn = $conn;
		$usable = false;
	}

	if ( $usable ) {
		return $conn;
	}

	// Throw an error or otherwise bail out since the connection attempt failed
	if ( !self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
		$this->reportConnectionError();
	}

	return false;
}
822 | |
/**
 * Accepts a connection handle and does nothing with it
 *
 * @param IDatabase $conn
 */
public function reuseConnection( IDatabase $conn ) {
	// Intentionally empty: this method is a no-op
}
826 | |
/**
 * Build a DBConnRef for the given connection parameters
 *
 * @param int $i Server index or DB_PRIMARY/DB_REPLICA
 * @param string[]|string $groups Query group(s)
 * @param string|false $domain DB domain ID, resolved through resolveDomainID()
 * @param int $flags Bit field of CONN_* constants; CONN_SILENCE_ERRORS is rejected
 * @return IDatabase
 * @throws UnexpectedValueException If CONN_SILENCE_ERRORS is set in $flags
 */
public function getConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ): IDatabase {
	$silenced = self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS );
	if ( $silenced ) {
		throw new UnexpectedValueException(
			__METHOD__ . ' got CONN_SILENCE_ERRORS; connection is already deferred'
		);
	}

	$domain = $this->resolveDomainID( $domain );
	// Both the DB_PRIMARY placeholder and the explicit writer index mean the primary role
	$wantsPrimary = in_array( $i, [ self::DB_PRIMARY, ServerInfo::WRITER_INDEX ], true );
	$role = $wantsPrimary ? self::DB_PRIMARY : self::DB_REPLICA;
	$params = [ $i, $groups, $domain, $flags ];

	return new DBConnRef( $this, $params, $role, $this->modcount );
}
841 | |
/**
 * Build a DBConnRef for the given connection parameters, typed as DBConnRef
 *
 * @param int $i Server index or DB_PRIMARY/DB_REPLICA
 * @param string[]|string $groups Query group(s)
 * @param string|false $domain DB domain ID, resolved through resolveDomainID()
 * @param int $flags Bit field of CONN_* constants; CONN_SILENCE_ERRORS is rejected
 * @return DBConnRef
 * @throws UnexpectedValueException If CONN_SILENCE_ERRORS is set in $flags
 */
public function getMaintenanceConnectionRef(
	$i,
	$groups = [],
	$domain = false,
	$flags = 0
): DBConnRef {
	$silenced = self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS );
	if ( $silenced ) {
		throw new UnexpectedValueException(
			__METHOD__ . ' CONN_SILENCE_ERRORS is not supported'
		);
	}

	$domain = $this->resolveDomainID( $domain );
	// Both the DB_PRIMARY placeholder and the explicit writer index mean the primary role
	$wantsPrimary = in_array( $i, [ self::DB_PRIMARY, ServerInfo::WRITER_INDEX ], true );
	$role = $wantsPrimary ? self::DB_PRIMARY : self::DB_REPLICA;
	$params = [ $i, $groups, $domain, $flags ];

	return new DBConnRef( $this, $params, $role, $this->modcount );
}
861 | |
/**
 * Get a live connection handle to the given domain
 *
 * A tracked connection from the matching pool is reused when one can be shared, which
 * may involve switching the DB domain of that handle. Otherwise a new connection is
 * opened and, if its category is tracked, added to the pool.
 *
 * @param int $i Specific server index
 * @param DatabaseDomain $domain Database domain ID required by the reference
 * @param int $flags Bit field of class CONN_* constants
 * @return IDatabase|null Database or null on error
 * @throws DBError When database selection fails
 * @throws InvalidArgumentException When the server index is invalid
 * @throws UnexpectedValueException When the DB domain of the connection is corrupted
 * @throws DBAccessError If disable() was called
 */
private function reuseOrOpenConnectionForNewRef( $i, DatabaseDomain $domain, $flags = 0 ) {
	// Figure out which connection pool to use based on the flags
	if ( $this->fieldHasBit( $flags, self::CONN_UNTRACKED_GAUGE ) ) {
		// Use low timeouts, use autocommit mode, ignore transaction rounds
		$category = self::CATEGORY_GAUGE;
	} elseif ( self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT ) ) {
		// Use autocommit mode, ignore transaction rounds
		$category = self::CATEGORY_AUTOCOMMIT;
	} else {
		// Respect DBO_DEFAULT, respect transaction rounds
		$category = self::CATEGORY_ROUND;
	}

	$conn = null;
	// Reuse a free connection in the pool from any domain if possible. There should only
	// be one connection in this pool unless either:
	//  - a) IDatabase::databasesAreIndependent() returns true (e.g. postgres) and two
	//       or more database domains have been used during the load balancer's lifetime
	//  - b) Two or more nested function calls used getConnection() on different domains.
	foreach ( ( $this->conns[$category][$i] ?? [] ) as $pooled ) {
		// Check if any required DB domain changes for the new reference are possible.
		// Calling selectDomain() would trigger a reconnect, which will break if a
		// transaction is active or if there is any other meaningful session state.
		$isShareable =
			!$pooled->databasesAreIndependent() ||
			$domain->getDatabase() === null ||
			$domain->getDatabase() === $pooled->getDBname();
		if ( !$isShareable ) {
			continue;
		}

		$conn = $pooled;
		// Make any required DB domain changes for the new reference
		if ( !$domain->isUnspecified() ) {
			$conn->selectDomain( $domain );
		}
		$this->logger->debug( __METHOD__ . ": reusing connection for $i/$domain" );
		break;
	}

	// If necessary, try to open a new connection and add it to the pool
	if ( !$conn ) {
		$conn = $this->reallyOpenConnection(
			$i,
			$domain,
			[ self::INFO_CONN_CATEGORY => $category ]
		);
		if ( !$conn->isOpen() ) {
			$this->logger->warning( __METHOD__ . ": connection error for $i/$domain" );
			$this->lastErrorConn = $conn;
			$conn = null;
		} else {
			// Make Database::isReadOnly() respect server-side and configuration-based
			// read-only mode. Note that replica handles are always seen as read-only
			// in Database::isReadOnly() and Database::assertIsWritablePrimary().
			if ( $i === ServerInfo::WRITER_INDEX ) {
				$reason = $this->readOnlyReason;
				// Only probe the server when the configuration does not already say so
				if ( $reason === false && $this->isPrimaryRunningReadOnly( $conn ) ) {
					$reason = 'The primary database server is running in read-only mode.';
				}
				$conn->setLBInfo( $conn::LB_READ_ONLY_REASON, $reason );
			}
			// Connection obtained; check if it belongs to a tracked connection category
			if ( isset( $this->conns[$category] ) ) {
				// Track this connection for future reuse
				$this->conns[$category][$i][] = $conn;
			}
		}
	}

	if ( $conn instanceof IDatabase ) {
		// Check to make sure that the right domain is selected
		$this->assertConnectionDomain( $conn, $domain );
		// Check to make sure that the CONN_* flags are respected
		$this->enforceConnectionFlags( $conn, $flags );
	}

	return $conn;
}
959 | |
/**
 * Sanity check to make sure that the right domain is selected
 *
 * @param Database $conn
 * @param DatabaseDomain $domain Domain the handle is expected to be compatible with
 * @throws UnexpectedValueException If the handle's domain ID is not compatible
 */
private function assertConnectionDomain( Database $conn, DatabaseDomain $domain ) {
	if ( $domain->isCompatible( $conn->getDomainID() ) ) {
		return;
	}

	throw new UnexpectedValueException(
		"Got connection to '{$conn->getDomainID()}', but expected one for '{$domain}'"
	);
}
974 | |
/**
 * Get the database attributes for a server, looked up by its type and driver
 *
 * @param int $i Specific server index
 * @return array Result of the database factory's attributesFromType()
 */
public function getServerAttributes( $i ) {
	$type = $this->getServerType( $i );
	$driver = $this->serverInfo->getServerDriver( $i );

	return $this->databaseFactory->attributesFromType( $type, $driver );
}
981 | |
/**
 * Open a new network connection to a server (uncached)
 *
 * Returns a Database object whether or not the connection was successful.
 *
 * @param int $i Specific server index
 * @param DatabaseDomain $domain Domain the connection is for, possibly unspecified
 * @param array $lbInfo Additional information for setLBInfo(); the
 *  self::INFO_CONN_CATEGORY entry is read here
 * @return Database
 * @throws DBAccessError If the load balancer is disabled
 * @throws InvalidArgumentException
 */
protected function reallyOpenConnection( $i, DatabaseDomain $domain, array $lbInfo ) {
	if ( $this->disabled ) {
		throw new DBAccessError();
	}

	$server = $this->serverInfo->getServerInfoStrict( $i );
	$forGauging = ( $lbInfo[self::INFO_CONN_CATEGORY] === self::CATEGORY_GAUGE );
	if ( $forGauging ) {
		// Use low connection/read timeouts for connection used for gauging server health.
		// Gauge information should be cached and used to avoid outages. Indefinite hanging
		// while gauging servers would do the opposite.
		foreach ( [ 'connectTimeout', 'receiveTimeout' ] as $timeoutKey ) {
			$server[$timeoutKey] = min( 1, $server[$timeoutKey] ?? INF );
		}
		// Avoid implicit transactions and avoid any SET query for session variables during
		// Database::open(). If a server becomes slow, every extra query can cause significant
		// delays, even with low connect/receive timeouts.
		$gaugeFlags = ( $server['flags'] ?? 0 ) & ~IDatabase::DBO_DEFAULT;
		$server['flags'] = $gaugeFlags | IDatabase::DBO_GAUGE;
	} else {
		// Use implicit transactions unless explicitly configured otherwise
		$server['flags'] ??= IDatabase::DBO_DEFAULT;
	}

	$isWriter = ( $i === ServerInfo::WRITER_INDEX );
	if ( !empty( $server['is static'] ) ) {
		$topologyRole = IDatabase::ROLE_STATIC_CLONE;
	} elseif ( $isWriter ) {
		$topologyRole = IDatabase::ROLE_STREAMING_MASTER;
	} else {
		$topologyRole = IDatabase::ROLE_STREAMING_REPLICA;
	}

	// Use the database specified in $domain (null means "none or entrypoint DB");
	// fallback to the $server default if the RDBMs is an embedded library using a
	// file on disk since there would be nothing to access to without a DB/file name.
	if ( $this->getServerAttributes( $i )[Database::ATTR_DB_IS_FILE] ) {
		$dbName = $domain->getDatabase() ?? $server['dbname'] ?? null;
	} else {
		$dbName = $domain->getDatabase();
	}

	$overrides = [
		// Basic replication role information
		'topologyRole' => $topologyRole,
		'dbname' => $dbName,
		// Override the $server default schema with that of $domain if specified
		'schema' => $domain->getSchema(),
		// Use the table prefix specified in $domain
		'tablePrefix' => $domain->getTablePrefix(),
		'srvCache' => $this->srvCache,
		'logger' => $this->logger,
		'errorLogger' => $this->errorLogger,
		'trxProfiler' => $this->trxProfiler,
		'lbInfo' => [ self::INFO_SERVER_INDEX => $i ] + $lbInfo
	];
	$conn = $this->databaseFactory->create(
		$server['type'],
		array_merge( $server, $overrides ),
		Database::NEW_UNCONNECTED
	);
	// Set alternative table/index names before any queries can be issued
	$conn->setTableAliases( $this->tableAliases );
	$conn->setIndexAliases( $this->indexAliases );
	// Account for any active transaction round and listeners
	if ( $isWriter ) {
		if ( $this->trxRoundId !== false ) {
			$this->applyTransactionRoundFlags( $conn );
		}
		foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
			$conn->setTransactionListener( $name, $callback );
		}
	}

	// Make the connection handle live
	try {
		$conn->initConnection();
		// Only successful attempts are counted
		++$this->connectionCounter;
	} catch ( DBConnectionError $e ) {
		$this->lastErrorConn = $conn;
		// ignore; let the DB handle the logging
	}

	if ( $conn->isOpen() ) {
		$this->logger->debug( __METHOD__ . ": opened new connection for $i/$domain" );
	} else {
		$this->logger->warning(
			__METHOD__ . ": connection error for $i/{db_domain}",
			[ 'db_domain' => $domain->getId() ]
		);
	}

	// Log when many connection are made during a single request/script
	$count = 0;
	foreach ( $this->conns as $poolConnsByServer ) {
		$count += array_sum( array_map( 'count', $poolConnsByServer ) );
	}
	if ( $count >= self::CONN_HELD_WARN_THRESHOLD ) {
		$this->logger->warning(
			__METHOD__ . ": {connections}+ connections made (primary={primarydb})",
			$this->getConnLogContext(
				$conn,
				[
					'connections' => $count,
					'primarydb' => $this->serverInfo->getPrimaryServerName(),
					'db_domain' => $domain->getId()
				]
			)
		);
	}

	$this->assertConnectionDomain( $conn, $domain );

	return $conn;
}
1104 | |
/**
 * Make sure that any "waitForPos" replication positions are loaded and available
 *
 * Each load balancer cluster has up to one replication position for the session.
 * These are used when data read by queries is expected to reflect writes caused
 * by a prior request/script from the same client.
 *
 * The chronology protector is consulted at most once per instance of this class.
 *
 * @see awaitSessionPrimaryPos()
 */
private function loadSessionPrimaryPos() {
	if ( $this->chronologyProtectorCalled || !$this->chronologyProtector ) {
		return;
	}

	$this->chronologyProtectorCalled = true;
	$sessionPos = $this->chronologyProtector->getSessionPrimaryPos( $this );
	$this->logger->debug( __METHOD__ . ': executed chronology callback.' );

	// Adopt the session position unless a position is already set that it has not reached
	if ( $sessionPos && ( !$this->waitForPos || $sessionPos->hasReached( $this->waitForPos ) ) ) {
		$this->waitForPos = $sessionPos;
	}
}
1126 | |
/**
 * Throw a connection error built from the last failed connection handle, if any
 *
 * When $extraLbError is given it is also logged: as a warning when a failed handle is
 * available, or as an error when it is not.
 *
 * @param string $extraLbError Separate load balancer error
 * @throws DBConnectionError
 * @return never
 */
private function reportConnectionError( $extraLbError = '' ) {
	$failedConn = $this->lastErrorConn;

	if ( !( $failedConn instanceof IDatabase ) ) {
		// No handle to blame; report the load balancer error alone
		$exception = new DBConnectionError(
			null,
			$extraLbError ?: 'could not connect to the DB server'
		);

		if ( $extraLbError ) {
			$this->logger->error(
				__METHOD__ . ": $extraLbError",
				[
					'method' => __METHOD__,
					'last_error' => '(last connection error missing)'
				]
			);
		}

		throw $exception;
	}

	$srvName = $failedConn->getServerName();
	$lastDbError = $failedConn->lastError() ?: 'unknown error';
	$dbDetails = "{$lastDbError} ({$srvName})";

	$exception = new DBConnectionError(
		$failedConn,
		$extraLbError ? "{$extraLbError}; {$dbDetails}" : $dbDetails
	);

	if ( $extraLbError ) {
		$this->logger->warning(
			__METHOD__ . ": $extraLbError; {last_error} ({db_server})",
			$this->getConnLogContext(
				$failedConn,
				[
					'method' => __METHOD__,
					'last_error' => $lastDbError
				]
			)
		);
	}

	throw $exception;
}
1175 | |
/**
 * @return int The writer server index, ServerInfo::WRITER_INDEX
 */
public function getWriterIndex() {
	$writerIndex = ServerInfo::WRITER_INDEX;

	return $writerIndex;
}
1179 | |
/**
 * @return int Server count as reported by the server info holder
 */
public function getServerCount() {
	$serverCount = $this->serverInfo->getServerCount();

	return $serverCount;
}
1183 | |
/**
 * @return bool Whether the server info holder reports any replica servers
 */
public function hasReplicaServers() {
	$hasReplicas = $this->serverInfo->hasReplicaServers();

	return $hasReplicas;
}
1187 | |
/**
 * @return bool Whether the server info holder reports any streaming replica servers
 */
public function hasStreamingReplicaServers() {
	$hasStreamingReplicas = $this->serverInfo->hasStreamingReplicaServers();

	return $hasStreamingReplicas;
}
1191 | |
/**
 * @param int $i Specific server index
 * @return string Server name as reported by the server info holder
 */
public function getServerName( $i ): string {
	$name = $this->serverInfo->getServerName( $i );

	return $name;
}
1195 | |
/**
 * @param int $i Specific server index
 * @return mixed Whatever the server info holder returns for this index
 */
public function getServerInfo( $i ) {
	$info = $this->serverInfo->getServerInfo( $i );

	return $info;
}
1199 | |
/**
 * @param int $i Specific server index
 * @return mixed Server type as reported by the server info holder
 */
public function getServerType( $i ) {
	$type = $this->serverInfo->getServerType( $i );

	return $type;
}
1203 | |
/**
 * Get the current replication position of the primary (writer) server
 *
 * An already-open connection to the writer is used when there is one. Otherwise a
 * connection is opened just for this query and closed again afterwards.
 *
 * @return mixed Result of getPrimaryPos() on the writer connection
 * @throws DBConnectionError If no connection to the writer could be obtained
 */
public function getPrimaryPos() {
	$conn = $this->getAnyOpenConnection( ServerInfo::WRITER_INDEX );
	if ( $conn ) {
		return $conn->getPrimaryPos();
	}

	// Ask for the server directly with errors silenced. getConnectionInternal() takes
	// the flags as its fourth argument (its second one is the query groups) and is
	// declared to return IDatabase, so it can never produce the false checked below.
	$conn = $this->getServerConnection(
		ServerInfo::WRITER_INDEX,
		self::DOMAIN_ANY,
		self::CONN_SILENCE_ERRORS
	);
	if ( !$conn ) {
		$this->reportConnectionError();
	}

	try {
		return $conn->getPrimaryPos();
	} finally {
		// This connection was made only for the position check
		$this->closeConnection( $conn );
	}
}
1222 | |
/**
 * Apply updated configuration.
 *
 * This only unregisters servers that were removed in the new configuration.
 * It does not register new servers nor update the group load weights.
 *
 * This invalidates any open connections. However, existing connections may continue to be
 * used while they are in an active transaction. In that case, the old connection will be
 * discarded on the first operation after the transaction is complete. The next operation
 * will use a new connection based on the new configuration.
 *
 * @internal for use by LBFactory::reconfigure()
 *
 * @see DBConnRef::ensureConnection()
 * @see LBFactory::reconfigure()
 *
 * @param array $params A database configuration array, see $wgLBFactoryConf.
 *
 * @return void
 */
public function reconfigure( array $params ) {
	$paramServers = $params['servers'];
	$newIndexByServerIndex = $this->serverInfo->reconfigureServers( $paramServers );

	$anyServerDepooled = false;
	foreach ( $newIndexByServerIndex as $i => $ni ) {
		if ( $ni === null ) {
			// Server no longer exists in the new config
			$anyServerDepooled = true;
			// Note that if the primary server is depooled and a replica server promoted
			// to new primary, then DB_PRIMARY handles will fail with server index errors
			foreach ( array_keys( $this->groupLoads ) as $group ) {
				unset( $this->groupLoads[$group][$i] );
			}
			continue;
		}

		// Server still exists in the new config
		$newWeightByGroup = $paramServers[$ni]['groupLoads'] ?? [];
		$newWeightByGroup[ILoadBalancer::GROUP_GENERIC] = $paramServers[$ni]['load'];
		// Check if the server was removed from any load groups
		foreach ( $this->groupLoads as $group => $weightByIndex ) {
			if ( !isset( $weightByIndex[$i] ) || isset( $newWeightByGroup[$group] ) ) {
				continue;
			}
			// Server no longer in this load group in the new config
			$anyServerDepooled = true;
			unset( $this->groupLoads[$group][$i] );
		}
	}

	if ( !$anyServerDepooled ) {
		return;
	}

	$this->logger->info( 'Reconfiguring dbs!' );
	// Unpin DB_REPLICA connection groups from server indexes
	$this->readIndexByGroup = [];
	// We could close all connection here, but some may be in the middle of a
	// transaction. So instead, we leave it to DBConnRef to close the connection
	// when it detects that the modcount has changed and no transaction is open.
	$this->conns = self::newTrackedConnectionsArray();
	// Bump modification counter to invalidate the connections held by DBConnRef
	// instances. This will cause the next call to a method on the DBConnRef
	// to get a new connection from getConnectionInternal()
	$this->modcount++;
}
1290 | |
/**
 * Close all connections and mark this load balancer as disabled
 *
 * Once disabled, reallyOpenConnection() throws DBAccessError instead of connecting.
 *
 * @param string $fname Caller name, passed on to closeAll()
 */
public function disable( $fname = __METHOD__ ) {
	// Close first, then flip the flag
	$this->closeAll( $fname );

	$this->disabled = true;
}
1295 | |
/**
 * Close every open connection and reset the tracked connection pools
 *
 * @param string $fname Caller name, passed on to each handle's close()
 */
public function closeAll( $fname = __METHOD__ ) {
	// Held only for its scope: user aborts are ignored until this method returns
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = ScopedCallback::newScopedIgnoreUserAbort();

	foreach ( $this->getOpenConnections() as $openConn ) {
		$openConn->close( $fname );
	}

	// Start over with empty pools
	$this->conns = self::newTrackedConnectionsArray();
}
1305 | |
/**
 * Close a connection
 *
 * Using this function makes sure the LoadBalancer knows the connection is closed.
 * If you use $conn->close() directly, the load balancer won't update its state.
 *
 * @param IDatabase $conn
 * @throws RuntimeException If given a DBConnRef
 * @throws UnexpectedValueException If the handle carries no server index
 */
private function closeConnection( IDatabase $conn ) {
	if ( $conn instanceof DBConnRef ) {
		// Avoid calling close() but still leaving the handle in the pool
		throw new RuntimeException( 'Cannot close DBConnRef instance; it must be shareable' );
	}

	$domain = $conn->getDomainID();
	$serverIndex = $conn->getLBInfo( self::INFO_SERVER_INDEX );
	if ( $serverIndex === null ) {
		throw new UnexpectedValueException( "Handle on '$domain' missing server index" );
	}

	$srvName = $this->serverInfo->getServerName( $serverIndex );

	// Drop the handle from every pool that tracks it
	$wasTracked = false;
	foreach ( $this->conns as $type => $poolConnsByServer ) {
		$serverConns = $poolConnsByServer[$serverIndex] ?? [];
		$position = array_search( $conn, $serverConns, true );
		if ( $position === false ) {
			continue;
		}
		$wasTracked = true;
		unset( $this->conns[$type][$serverIndex][$position] );
	}

	if ( !$wasTracked ) {
		$this->logger->warning(
			__METHOD__ .
				": orphaned connection to database {$this->stringifyConn( $conn )} at '$srvName'."
		);
	}

	$this->logger->debug(
		__METHOD__ .
			": closing connection to database {$this->stringifyConn( $conn )} at '$srvName'."
	);

	$conn->close( __METHOD__ );
}
1351 | |
/**
 * Run pre-commit callbacks on all open primary connections and finalize the round
 *
 * The round stage is set to ROUND_ERROR while the callbacks run and only becomes
 * ROUND_FINALIZED once they have all completed.
 *
 * @param string $fname Caller name
 * @return int Total number of callback execution attempts
 */
public function finalizePrimaryChanges( $fname = __METHOD__ ) {
	$this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] );
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = ScopedCallback::newScopedIgnoreUserAbort();

	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
	// Loop until callbacks stop adding callbacks on other connections
	$total = 0;
	for ( ;; ) {
		$attempts = 0; // callbacks execution attempts in this pass
		foreach ( $this->getOpenPrimaryConnections() as $conn ) {
			// Run any pre-commit callbacks while leaving the post-commit ones suppressed.
			// Any error should cause all (peer) transactions to be rolled back together.
			$attempts += $conn->runOnTransactionPreCommitCallbacks();
		}
		$total += $attempts;
		if ( !( $attempts > 0 ) ) {
			break;
		}
	}
	// Defer post-commit callbacks until after COMMIT/ROLLBACK happens on all handles
	foreach ( $this->getOpenPrimaryConnections() as $conn ) {
		$conn->setTrxEndCallbackSuppression( true );
	}
	$this->trxRoundStage = self::ROUND_FINALIZED;

	return $total;
}
1377 | |
/**
 * Check that all open primary connections are ready to have their changes committed
 *
 * The round stage is set to ROUND_ERROR while the checks run and only becomes
 * ROUND_APPROVED once every connection has passed.
 *
 * @param int $maxWriteDuration Max seconds of pending write time allowed per connection;
 *  the limit is only applied when this is greater than zero
 * @param string $fname Caller name
 * @throws DBTransactionError If an explicit transaction is still active or a ping fails
 * @throws DBTransactionSizeError If the pending write time exceeds $maxWriteDuration
 */
public function approvePrimaryChanges( int $maxWriteDuration, $fname = __METHOD__ ) {
	$this->assertTransactionRoundStage( self::ROUND_FINALIZED );
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = ScopedCallback::newScopedIgnoreUserAbort();

	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
	$limitEnabled = ( $maxWriteDuration > 0 );
	foreach ( $this->getOpenPrimaryConnections() as $conn ) {
		// Any atomic sections should have been closed by now and there definitely should
		// not be any open transactions started by begin() from callers outside Database.
		if ( $conn->explicitTrxActive() ) {
			throw new DBTransactionError(
				$conn,
				"Explicit transaction still active; a caller might have failed to call " .
					"endAtomic() or cancelAtomic()."
			);
		}
		// Assert that the time to replicate the transaction will be reasonable.
		// If this fails, then all DB transactions will be rolled back together.
		$time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
		if ( $limitEnabled && $time > $maxWriteDuration ) {
			$humanTimeSec = round( $time, 3 );
			throw new DBTransactionSizeError(
				$conn,
				"Transaction spent {time}s in writes, exceeding the {$maxWriteDuration}s limit",
				// Message parameters for: transaction-duration-limit-exceeded
				[ $time, $maxWriteDuration ],
				null,
				[ 'time' => $humanTimeSec ]
			);
		}
		if ( $limitEnabled && $time > 0 ) {
			$timeMs = $time * 1000;
			// Whole milliseconds above 1ms, three decimals below that
			$humanTimeMs = round( $timeMs, $timeMs > 1 ? 0 : 3 );
			$this->logger->debug(
				"Transaction spent {time_ms}ms in writes, under the {$maxWriteDuration}s limit",
				[ 'time_ms' => $humanTimeMs ]
			);
		}
		// If a connection sits idle for too long it might be dropped, causing transaction
		// writes and session locks to be lost. Ping all the server connections before making
		// any attempt to commit the transactions belonging to the active transaction round.
		$hasPendingState = $conn->writesOrCallbacksPending() || $conn->sessionLocksPending();
		if ( $hasPendingState && !$conn->ping() ) {
			throw new DBTransactionError(
				$conn,
				"Pre-commit ping failed on server {$conn->getServerName()}"
			);
		}
	}
	$this->trxRoundStage = self::ROUND_APPROVED;
}
1431 | |
/**
 * Start an explicit transaction round named after the caller.
 *
 * The round must be in the ROUND_CURSORY stage and no round ID may be set yet. The
 * stage is ROUND_ERROR while the handles are being flagged and returns to
 * ROUND_CURSORY once that has succeeded.
 *
 * @param string $fname Caller name; becomes the ID of the new round
 * @throws DBTransactionError If a round was already started or the stage is wrong
 */
public function beginPrimaryChanges( $fname = __METHOD__ ) {
	$activeRoundId = $this->trxRoundId;
	if ( $activeRoundId !== false ) {
		throw new DBTransactionError(
			null,
			"Transaction round '{$this->trxRoundId}' already started"
		);
	}
	$this->assertTransactionRoundStage( self::ROUND_CURSORY );
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = ScopedCallback::newScopedIgnoreUserAbort();

	// Drop empty transactions (no writes/callbacks) left over from the implicit round
	$this->flushPrimarySnapshots( $fname );

	$this->trxRoundId = $fname;
	// Treat the round as failed until all handles have been flagged
	$this->trxRoundStage = self::ROUND_ERROR;
	// Flag the applicable handles as participants of this explicit round. Writes and
	// callbacks on each such handle are tied to a single transaction, and the (peer)
	// handles reject begin()/commit() calls that are not part of an en masse commit
	// or an en masse rollback.
	foreach ( $this->getOpenPrimaryConnections() as $primaryConn ) {
		$this->applyTransactionRoundFlags( $primaryConn );
	}
	$this->trxRoundStage = self::ROUND_CURSORY;
}
1457 | |
/**
 * Commit the transactions of all open primary handles.
 *
 * The round must be in the ROUND_APPROVED stage. DBError failures of individual
 * handles are passed to the error logger and collected, so that a commit is still
 * attempted on every handle; they are then reported together in one exception.
 * On success the stage becomes ROUND_COMMIT_CALLBACKS.
 *
 * @param string $fname Caller name
 * @throws DBTransactionError If the stage is wrong or any handle failed to commit
 */
public function commitPrimaryChanges( $fname = __METHOD__ ) {
	$this->assertTransactionRoundStage( self::ROUND_APPROVED );
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = ScopedCallback::newScopedIgnoreUserAbort();

	$failures = [];

	// Remember whether an explicit round was active before clearing its ID
	$wasExplicitRound = ( $this->trxRoundId !== false );
	$this->trxRoundId = false;
	// Treat the round as failed until all of the commits have gone through
	$this->trxRoundStage = self::ROUND_ERROR;
	// Commit writes and clear snapshots too (callbacks require AUTOCOMMIT). Callbacks
	// should already be suppressed at this point due to finalizePrimaryChanges().
	foreach ( $this->getOpenPrimaryConnections() as $conn ) {
		try {
			$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
		} catch ( DBError $e ) {
			( $this->errorLogger )( $e );
			$failures[] = $conn->getServerName() . ': ' . $e->getMessage();
		}
	}
	if ( $failures !== [] ) {
		$failureList = implode( "\n", array_unique( $failures ) );
		throw new DBTransactionError(
			null,
			"Commit failed on server(s) " . $failureList
		);
	}
	if ( $wasExplicitRound ) {
		// The handles no longer participate in the explicit round
		foreach ( $this->getOpenPrimaryConnections() as $conn ) {
			$this->undoTransactionRoundFlags( $conn );
		}
	}
	$this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
}
1492 | |
/**
 * Run the pending transaction-idle callbacks of all open primary handles.
 *
 * The round must be in ROUND_COMMIT_CALLBACKS or ROUND_ROLLBACK_CALLBACKS; that stage
 * selects the trigger type passed to the callbacks and is restored before returning.
 * Errors are collected rather than thrown.
 *
 * @param string $fname Caller name; replaced by the name of this method before use
 * @return mixed|null First entry of the collected error list; null if there is none
 * @throws DBTransactionError If the round is not in one of the callback stages
 */
public function runPrimaryTransactionIdleCallbacks( $fname = __METHOD__ ) {
	$oldStage = $this->trxRoundStage;
	if ( $oldStage === self::ROUND_COMMIT_CALLBACKS ) {
		$type = IDatabase::TRIGGER_COMMIT;
	} elseif ( $oldStage === self::ROUND_ROLLBACK_CALLBACKS ) {
		$type = IDatabase::TRIGGER_ROLLBACK;
	} else {
		throw new DBTransactionError(
			null,
			"Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
		);
	}
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = ScopedCallback::newScopedIgnoreUserAbort();

	// Treat the round as failed until all of the callbacks have been attempted
	$this->trxRoundStage = self::ROUND_ERROR;

	// The COMMIT/ROLLBACK step is over, so post-commit callbacks may run again
	foreach ( $this->getOpenPrimaryConnections() as $conn ) {
		$conn->setTrxEndCallbackSuppression( false );
	}

	$errors = [];
	// From here on, commits and log entries are attributed to this method
	$fname = __METHOD__;
	// Repeat for as long as callbacks keep adding callbacks on other connections
	do {
		$count = 0; // callback execution attempts in this pass
		// First, run the pending callbacks of each handle that is not in a transaction;
		// the other handles are retried in the next pass, after commit() was called
		foreach ( $this->getOpenPrimaryConnections() as $conn ) {
			if ( !$conn->trxLevel() ) {
				$count += $conn->runOnTransactionIdleCallbacks( $type, $errors );
			}
		}
		// Then, clear out any transactions that the callbacks left active
		foreach ( $this->getOpenPrimaryConnections() as $conn ) {
			if ( $conn->writesPending() ) {
				// A callback from another handle wrote to this one and DBO_TRX is set
				$fnames = implode( ', ', $conn->pendingWriteAndCallbackCallers() );
				$logContext = $this->getConnLogContext(
					$conn,
					[ 'exception' => new RuntimeException() ]
				);
				$this->logger->warning( "$fname: found writes pending ($fnames).", $logContext );
			} elseif ( $conn->trxLevel() ) {
				// A callback from another handle read from this one and DBO_TRX is set,
				// which can easily happen if there is only one DB (no replicas)
				$this->logger->debug( "$fname: found empty transaction." );
			}
			try {
				$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
			} catch ( DBError $ex ) {
				$errors[] = $ex;
			}
		}
	} while ( $count > 0 );

	$this->trxRoundStage = $oldStage;

	return $errors[0] ?? null;
}
1556 | |
/**
 * Run the transaction listener callbacks of all open primary handles.
 *
 * The round must be in ROUND_COMMIT_CALLBACKS or ROUND_ROLLBACK_CALLBACKS; that stage
 * selects the trigger type passed to the callbacks. Afterwards the round is put back
 * into the ROUND_CURSORY stage. Errors are collected rather than thrown.
 *
 * @param string $fname Caller name (not used by this method)
 * @return mixed|null First entry of the collected error list; null if there is none
 * @throws DBTransactionError If the round is not in one of the callback stages
 */
public function runPrimaryTransactionListenerCallbacks( $fname = __METHOD__ ) {
	$stage = $this->trxRoundStage;
	if ( $stage === self::ROUND_COMMIT_CALLBACKS ) {
		$type = IDatabase::TRIGGER_COMMIT;
	} elseif ( $stage === self::ROUND_ROLLBACK_CALLBACKS ) {
		$type = IDatabase::TRIGGER_ROLLBACK;
	} else {
		throw new DBTransactionError(
			null,
			"Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
		);
	}
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = ScopedCallback::newScopedIgnoreUserAbort();

	$errors = [];
	// Treat the round as failed until all of the listeners have been run
	$this->trxRoundStage = self::ROUND_ERROR;
	foreach ( $this->getOpenPrimaryConnections() as $primaryConn ) {
		$primaryConn->runTransactionListenerCallbacks( $type, $errors );
	}
	$this->trxRoundStage = self::ROUND_CURSORY;

	return $errors[0] ?? null;
}
1580 | |
/**
 * Roll back the transactions of all open primary handles.
 *
 * No particular round stage is required. The round ID is cleared and, on success,
 * the stage becomes ROUND_ROLLBACK_CALLBACKS.
 *
 * @param string $fname Caller name
 */
public function rollbackPrimaryChanges( $fname = __METHOD__ ) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = ScopedCallback::newScopedIgnoreUserAbort();

	// Remember whether an explicit round was active before clearing its ID
	$wasExplicitRound = ( $this->trxRoundId !== false );
	$this->trxRoundId = false;
	// Treat the round as failed until all of the rollbacks have gone through
	$this->trxRoundStage = self::ROUND_ERROR;
	foreach ( $this->getOpenPrimaryConnections() as $primaryConn ) {
		$primaryConn->rollback( $fname, $primaryConn::FLUSHING_ALL_PEERS );
	}
	if ( $wasExplicitRound ) {
		// The handles no longer participate in the explicit round
		foreach ( $this->getOpenPrimaryConnections() as $primaryConn ) {
			$this->undoTransactionRoundFlags( $primaryConn );
		}
	}
	$this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
}
1599 | |
/**
 * Flush the session of every open primary handle.
 *
 * @param string $fname Caller name
 * @throws DBTransactionError If the round is not in the ROUND_CURSORY stage or if
 *  writes or callbacks are still pending on a primary handle
 */
public function flushPrimarySessions( $fname = __METHOD__ ) {
	$this->assertTransactionRoundStage( [ self::ROUND_CURSORY ] );
	$writesPending = $this->hasPrimaryChanges();
	if ( $writesPending ) {
		// Callers are expected to have rolled back any transaction beforehand
		throw new DBTransactionError( null, "Cannot reset session while writes are pending" );
	}

	foreach ( $this->getOpenPrimaryConnections() as $primaryConn ) {
		$primaryConn->flushSession( $fname, $primaryConn::FLUSHING_ALL_PEERS );
	}
}
1611 | |
/**
 * Require the transaction round to be in one of the given stages.
 *
 * @param string|string[] $stage Accepted stage, or a list of accepted stages
 * @throws DBTransactionError If the current stage is not among the accepted ones
 */
private function assertTransactionRoundStage( $stage ) {
	$stages = (array)$stage;

	if ( in_array( $this->trxRoundStage, $stages, true ) ) {
		return;
	}

	// Quote each accepted stage and join them with slashes for the error message
	$quoted = [];
	foreach ( $stages as $v ) {
		$quoted[] = "'$v'";
	}
	$stageList = implode( '/', $quoted );

	throw new DBTransactionError(
		null,
		"Transaction round stage must be $stageList (not '{$this->trxRoundStage}')"
	);
}
1632 | |
/**
 * Make a handle with DBO_DEFAULT/DBO_TRX set join the transaction round
 *
 * A server may have neither flag enabled, in which case it opts out of transaction
 * rounds and stays in auto-commit mode. That can be desirable when a DB server is
 * used for something like simple key/value storage.
 *
 * @param Database $conn
 */
private function applyTransactionRoundFlags( Database $conn ) {
	$category = $conn->getLBInfo( self::INFO_CONN_CATEGORY );
	if ( $category !== self::CATEGORY_ROUND ) {
		// Connections of other categories never take part in transaction rounds
		return;
	}

	$usesDefaultFlag = $conn->getFlag( $conn::DBO_DEFAULT );
	if ( $usesDefaultFlag ) {
		// With DBO_DEFAULT, DBO_TRX depends entirely on whether CLI mode is active.
		// Force DBO_TRX even in CLI mode since a commit round is expected soon.
		$conn->setFlag( $conn::DBO_TRX, $conn::REMEMBER_PRIOR );
	}

	$usesTrxFlag = $conn->getFlag( $conn::DBO_TRX );
	if ( $usesTrxFlag ) {
		$conn->setLBInfo( $conn::LB_TRX_ROUND_ID, $this->trxRoundId );
	}
}
1657 | |
/**
 * Remove the round ID from a handle and restore the flags it had before the round
 *
 * @param Database $conn
 */
private function undoTransactionRoundFlags( Database $conn ) {
	$category = $conn->getLBInfo( self::INFO_CONN_CATEGORY );
	if ( $category !== self::CATEGORY_ROUND ) {
		// Connections of other categories never take part in transaction rounds
		return;
	}

	$usesTrxFlag = $conn->getFlag( $conn::DBO_TRX );
	if ( $usesTrxFlag ) {
		// Passing null removes the round ID
		$conn->setLBInfo( $conn::LB_TRX_ROUND_ID, null );
	}

	$usesDefaultFlag = $conn->getFlag( $conn::DBO_DEFAULT );
	if ( $usesDefaultFlag ) {
		$conn->restoreFlags( $conn::RESTORE_PRIOR );
	}
}
1674 | |
/**
 * Flush the snapshot of every tracked connection that is not on the writer index.
 *
 * @param string $fname Caller name
 */
public function flushReplicaSnapshots( $fname = __METHOD__ ) {
	foreach ( $this->conns as $connsByServer ) {
		foreach ( $connsByServer as $index => $handles ) {
			// Only handles of non-primary servers are of interest here
			if ( $index !== ServerInfo::WRITER_INDEX ) {
				foreach ( $handles as $replicaConn ) {
					$replicaConn->flushSnapshot( $fname );
				}
			}
		}
	}
}
1687 | |
/**
 * Flush the snapshot of every open primary handle.
 *
 * @param string $fname Caller name
 */
public function flushPrimarySnapshots( $fname = __METHOD__ ) {
	foreach ( $this->getOpenPrimaryConnections() as $primaryConn ) {
		$primaryConn->flushSnapshot( $fname );
	}
}
1693 | |
/**
 * @return bool Whether getAnyOpenConnection() finds a handle for the writer index
 */
public function hasPrimaryConnection() {
	$primaryConn = $this->getAnyOpenConnection( ServerInfo::WRITER_INDEX );

	return (bool)$primaryConn;
}
1697 | |
/**
 * @return bool Whether any open primary handle has writes or callbacks pending
 */
public function hasPrimaryChanges() {
	$pending = false;
	foreach ( $this->getOpenPrimaryConnections() as $primaryConn ) {
		if ( $primaryConn->writesOrCallbacksPending() ) {
			// One such handle is enough; there is no need to check the rest
			$pending = true;
			break;
		}
	}

	return $pending;
}
1707 | |
/**
 * Get the greatest lastDoneWrites() value among the open primary handles.
 *
 * @return mixed The greatest value found; false if there is no open primary handle
 */
public function lastPrimaryChangeTimestamp() {
	$latest = false;
	foreach ( $this->getOpenPrimaryConnections() as $primaryConn ) {
		$doneAt = $primaryConn->lastDoneWrites();
		$latest = max( $latest, $doneAt );
	}

	return $latest;
}
1716 | |
/**
 * Check for pending primary changes, or for ones that were made recently.
 *
 * @param float|int|null $age How many seconds back still counts as recent;
 *  null means self::MAX_WAIT_DEFAULT
 * @return bool
 */
public function hasOrMadeRecentPrimaryChanges( $age = null ) {
	if ( $age === null ) {
		$age = self::MAX_WAIT_DEFAULT;
	}

	if ( $this->hasPrimaryChanges() ) {
		return true;
	}

	$lastChange = $this->lastPrimaryChangeTimestamp();

	return ( $lastChange > microtime( true ) - $age );
}
1723 | |
/**
 * Collect the pending write callers of all open primary handles into one list.
 *
 * @return array Merged results of pendingWriteCallers() of each handle
 */
public function pendingPrimaryChangeCallers() {
	$callerLists = [];
	foreach ( $this->getOpenPrimaryConnections() as $primaryConn ) {
		$callerLists[] = $primaryConn->pendingWriteCallers();
	}

	// The leading empty array keeps the call valid when there are no handles at all
	return array_merge( [], ...$callerLists );
}
1732 | |
/**
 * @return bool Whether any open primary handle reports an active explicit transaction
 */
public function explicitTrxActive() {
	$active = false;
	foreach ( $this->getOpenPrimaryConnections() as $primaryConn ) {
		if ( $primaryConn->explicitTrxActive() ) {
			// One such handle is enough; there is no need to check the rest
			$active = true;
			break;
		}
	}

	return $active;
}
1741 | |
/**
 * Turn on the lagged replica mode flag and log a warning about it.
 */
private function setLaggedReplicaMode(): void {
	$this->laggedReplicaMode = true;

	$message = __METHOD__ . ": setting lagged replica mode";
	$this->logger->warning( $message );
}
1746 | |
/**
 * @return mixed Current value of the lagged replica mode flag
 */
public function laggedReplicaUsed() {
	$mode = $this->laggedReplicaMode;

	return $mode;
}
1750 | |
/**
 * Get the reason for which writes are currently not possible, if any.
 *
 * @param string|false $domain Not used by this method
 * @return string|false The configured read-only reason if there is one, else a fixed
 *  message if the primary server is found to be read-only, else false
 */
public function getReadOnlyReason( $domain = false ) {
	// The configured reason takes precedence over the state of the server
	if ( $this->readOnlyReason !== false ) {
		return $this->readOnlyReason;
	}

	if ( $this->isPrimaryRunningReadOnly() ) {
		return 'The primary database server is running in read-only mode.';
	}

	return false;
}
1760 | |
/**
 * Check whether the primary DB server is read-only, using a cached result if possible.
 *
 * The result is cached under a global key that is based on the primary server name.
 * When the value has to be regenerated, a missing connection counts as "not read-only"
 * and a DBError from the server check falls back to the old cached value (if that
 * is an integer) or to "not read-only".
 *
 * @note This method suppresses DBError exceptions in order to avoid severe downtime
 * @param IDatabase|null $conn Recently acquired primary connection; null if not applicable
 * @return bool Whether the entire primary DB server or the local domain DB is read-only
 */
private function isPrimaryRunningReadOnly( ?IDatabase $conn = null ) {
	// Context will often be HTTP GET/HEAD; heavily cache the results
	return (bool)$this->wanCache->getWithSetCallback(
		// Note that table prefixes are not related to server-side read-only mode
		$this->wanCache->makeGlobalKey(
			'rdbms-server-readonly',
			$this->serverInfo->getPrimaryServerName()
		),
		self::TTL_CACHE_READONLY,
		function ( $oldValue ) use ( $conn ) {
			// Keep the transaction profiler quiet for the duration of this check
			$scope = $this->trxProfiler->silenceForScope();
			// Without a connection from the caller, try to get one without raising errors
			$conn ??= $this->getServerConnection(
				ServerInfo::WRITER_INDEX,
				self::DOMAIN_ANY,
				self::CONN_SILENCE_ERRORS
			);
			if ( $conn ) {
				try {
					$value = (int)$conn->serverIsReadOnly();
				} catch ( DBError $e ) {
					// Stick to the old cached value when the server cannot be checked
					$value = is_int( $oldValue ) ? $oldValue : 0;
				}
			} else {
				$value = 0;
			}
			ScopedCallback::consume( $scope );

			return $value;
		},
		[
			'busyValue' => 0,
			'pcTTL' => WANObjectCache::TTL_PROC_LONG
		]
	);
}
1801 | |
/**
 * Ping every open connection.
 *
 * @return bool True if no connection failed to ping; every connection is pinged
 *  even after a failure
 */
public function pingAll() {
	$failed = 0;
	foreach ( $this->getOpenConnections() as $openConn ) {
		if ( !$openConn->ping() ) {
			$failed++;
		}
	}

	return ( $failed === 0 );
}
1812 | |
/**
 * Iterate over all of the tracked connection handles, whatever their server index
 *
 * @return \Generator|Database[]
 */
private function getOpenConnections() {
	foreach ( $this->conns as $connsByServer ) {
		foreach ( $connsByServer as $handles ) {
			foreach ( $handles as $handle ) {
				yield $handle;
			}
		}
	}
}
1826 | |
/**
 * Iterate over the tracked connection handles stored under the writer index
 *
 * @return \Generator|Database[]
 */
private function getOpenPrimaryConnections() {
	foreach ( $this->conns as $connsByServer ) {
		if ( !isset( $connsByServer[ServerInfo::WRITER_INDEX] ) ) {
			// No primary handles are tracked in this group of connections
			continue;
		}
		/** @var IDatabase $primaryConn */
		foreach ( $connsByServer[ServerInfo::WRITER_INDEX] as $primaryConn ) {
			yield $primaryConn;
		}
	}
}
1839 | |
/**
 * Find the most lagged server among those with load in the generic group.
 *
 * @return array (host, lag, server index) of the most lagged server; ('', -1, 0) if
 *  there are no replica servers or if no server qualifies
 */
public function getMaxLag() {
	$worst = [ '', -1, 0 ];

	if ( !$this->serverInfo->hasReplicaServers() ) {
		return $worst;
	}

	foreach ( $this->getLagTimes() as $i => $lag ) {
		// Allowing the value to be unset due to stale cache (T361824)
		$load = $this->groupLoads[self::GROUP_GENERIC][$i] ?? 0;
		// Index 1 holds the greatest lag seen so far
		if ( $load > 0 && $lag > $worst[1] ) {
			$worst = [
				$this->serverInfo->getServerInfoStrict( $i, 'host' ),
				$lag,
				$i
			];
		}
	}

	return $worst;
}
1860 | |
/**
 * Get the lag of each streaming replica server, using a cached result if possible.
 *
 * @return array Map of server index to lag; a server that could not be connected to
 *  has false as its lag. Without replica servers, the only entry is the writer
 *  index with a lag of 0.
 */
public function getLagTimes() {
	if ( !$this->hasReplicaServers() ) {
		// There can be no lag when there is no replication
		return [ ServerInfo::WRITER_INDEX => 0 ];
	}

	$cacheKey = $this->wanCache->makeGlobalKey( 'rdbms-lags', $this->clusterName ?? '' );
	// Add jitter to avoid stampede
	$ttl = 10 + mt_rand( 1, 10 );

	return $this->wanCache->getWithSetCallback(
		$cacheKey,
		$ttl,
		function () {
			$connFlags = self::CONN_SILENCE_ERRORS | self::CONN_UNTRACKED_GAUGE;
			$lags = [];
			foreach ( $this->serverInfo->getStreamingReplicaIndexes() as $i ) {
				$conn = $this->getServerConnection( $i, self::DOMAIN_ANY, $connFlags );
				if ( !$conn ) {
					$lags[$i] = false;
					continue;
				}
				$lags[$i] = $conn->getLag();
				$conn->close();
			}
			return $lags;
		},
		[ 'lockTSE' => 30 ]
	);
}
1889 | |
/**
 * Wait for a replica connection to catch up to the current primary position.
 *
 * @param IDatabase $conn Connection that should catch up
 * @return bool True if the connection is on the writer index or if the wait succeeded;
 *  false if the wait failed or if no primary position could be determined
 * @throws DBReplicationWaitError If no primary connection could be obtained
 */
public function waitForPrimaryPos( IDatabase $conn ) {
	if ( $conn->getLBInfo( self::INFO_SERVER_INDEX ) === ServerInfo::WRITER_INDEX ) {
		return true; // not a replica DB server
	}

	// Get the current primary DB position, opening a connection only if needed
	$flags = self::CONN_SILENCE_ERRORS;
	$primaryConn = $this->getAnyOpenConnection( ServerInfo::WRITER_INDEX, $flags );
	$openedHere = false;
	if ( !$primaryConn ) {
		$primaryConn = $this->getServerConnection( ServerInfo::WRITER_INDEX, self::DOMAIN_ANY, $flags );
		if ( !$primaryConn ) {
			throw new DBReplicationWaitError(
				null,
				"Could not obtain a primary database connection to get the position"
			);
		}
		$openedHere = true;
	}
	$pos = $primaryConn->getPrimaryPos();
	if ( $openedHere ) {
		// The connection was only opened to read the position
		$this->closeConnection( $primaryConn );
	}

	if ( !( $pos instanceof DBPrimaryPos ) ) {
		// Something is misconfigured
		$this->logger->error(
			__METHOD__ . ': could not get primary pos for {db_server}',
			$this->getConnLogContext( $conn, [ 'exception' => new RuntimeException() ] )
		);

		return false;
	}

	$this->logger->debug( __METHOD__ . ': waiting' );
	$result = $conn->primaryPosWait( $pos, self::MAX_WAIT_DEFAULT );
	// Both null and -1 count as a failed wait
	$ok = ( $result !== null && $result != -1 );
	$this->logger->debug(
		__METHOD__ . ( $ok ? ': done waiting (success)' : ': done waiting (failure)' )
	);

	return $ok;
}
1931 | |
/**
 * Set or remove a named transaction listener.
 *
 * The change is recorded on this object and also passed on to every primary
 * connection that is open at the time of the call.
 *
 * @param string $name Name of the listener
 * @param callable|null $callback The listener; null or another empty value removes
 *  the listener that is registered under the name
 */
public function setTransactionListener( $name, ?callable $callback = null ) {
	if ( $callback ) {
		$this->trxRecurringCallbacks[$name] = $callback;
	} else {
		unset( $this->trxRecurringCallbacks[$name] );
	}
	foreach ( $this->getOpenPrimaryConnections() as $conn ) {
		$conn->setTransactionListener( $name, $callback );
	}
}
1942 | |
/**
 * Replace the stored table aliases.
 *
 * @param array $aliases New table aliases
 */
public function setTableAliases( array $aliases ) {
	$tableAliases = $aliases;
	$this->tableAliases = $tableAliases;
}
1946 | |
/**
 * Replace the stored index aliases.
 *
 * @param array $aliases New index aliases
 */
public function setIndexAliases( array $aliases ) {
	$indexAliases = $aliases;
	$this->indexAliases = $indexAliases;
}
1950 | |
/**
 * Replace the stored domain aliases.
 *
 * @param array $aliases New domain aliases
 */
public function setDomainAliases( array $aliases ) {
	$domainAliases = $aliases;
	$this->domainAliases = $domainAliases;
}
1954 | |
/**
 * Change the table prefix of the local domain.
 *
 * The database and schema of the local domain stay the same. Open connections that
 * are on the old local domain get the new prefix as well.
 *
 * @param string $prefix New table prefix
 */
public function setLocalDomainPrefix( $prefix ) {
	$previousDomain = $this->localDomain;
	$database = $previousDomain->getDatabase();
	$schema = $previousDomain->getSchema();
	$this->localDomain = new DatabaseDomain( $database, $schema, $prefix );

	// Update the prefix for existing connections.
	// Existing DBConnRef handles will not be affected.
	foreach ( $this->getOpenConnections() as $openConn ) {
		$onPreviousDomain = $previousDomain->equals( $openConn->getDomainID() );
		if ( $onPreviousDomain ) {
			$openConn->tablePrefix( $prefix );
		}
	}
}
1971 | |
/**
 * Close all connections and then switch the local domain to the given one.
 *
 * @param string $domain ID of the new local domain, as understood by
 *  DatabaseDomain::newFromId()
 */
public function redefineLocalDomain( $domain ) {
	// The connections are closed first, while the old local domain is still set
	$this->closeAll( __METHOD__ );

	$newDomain = DatabaseDomain::newFromId( $domain );
	$this->localDomain = $newDomain;
}
1976 | |
/**
 * Turn the "temporary tables only" mode of a domain on or off.
 *
 * @param mixed $value Any truthy value turns the mode on; anything else turns it off
 * @param string $domain Domain that the mode applies to
 * @return mixed Previously stored mode of the domain; false if none was stored
 */
public function setTempTablesOnlyMode( $value, $domain ) {
	$previous = $this->tempTablesOnlyMode[$domain] ?? false;

	if ( !$value ) {
		// The mode is off for any domain that has no entry
		unset( $this->tempTablesOnlyMode[$domain] );
	} else {
		$this->tempTablesOnlyMode[$domain] = true;
	}

	return $previous;
}
1987 | |
/**
 * Describe a connection handle as "<server index>/<domain ID>"
 *
 * @param IDatabase $conn
 * @return string Description of a connection handle for log messages
 * @throws InvalidArgumentException
 */
private function stringifyConn( IDatabase $conn ) {
	$serverIndex = $conn->getLBInfo( self::INFO_SERVER_INDEX );
	$domainId = $conn->getDomainID();

	return $serverIndex . '/' . $domainId;
}
1996 | |
/**
 * Check whether all of the bits of a flag constant are set in a bit field
 *
 * @param int $flags A bitfield of flags
 * @param int $bit Bit flag constant
 * @return bool Whether the bit field has the specified bit flag set
 */
private function fieldHasBit( int $flags, int $bit ) {
	$common = $flags & $bit;

	return ( $common === $bit );
}
2005 | |
/**
 * Build the context array that goes along with a PSR-3 log message about a connection.
 *
 * The server name and domain ID of the connection are always included. The extra
 * entries are merged in afterwards with array_merge().
 *
 * @param IDatabase $conn
 * @param array $extras Additional data to add to context
 * @return array
 */
protected function getConnLogContext( IDatabase $conn, array $extras = [] ) {
	$context = [
		'db_server' => $conn->getServerName(),
		'db_domain' => $conn->getDomainID()
	];

	return array_merge( $context, $extras );
}
2022 | |
/**
 * Pass a mock timestamp on to the load monitor
 *
 * @param float|null &$time Mock UNIX timestamp for testing
 * @internal
 * @codeCoverageIgnore
 */
public function setMockTime( &$time ) {
	$monitor = $this->loadMonitor;
	$monitor->setMockTime( $time );
}
2031 | } |