Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 391 |
|
0.00% |
0 / 66 |
CRAP | |
0.00% |
0 / 1 |
TransactionManager | |
0.00% |
0 / 391 |
|
0.00% |
0 / 66 |
25760 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
trxLevel | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
6 | |||
getTrxId | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
consumeTrxId | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
trxTimestamp | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
6 | |||
trxStatus | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
setTrxStatusToOk | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
setTrxStatusToNone | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
assertTransactionStatus | |
0.00% |
0 / 14 |
|
0.00% |
0 / 1 |
20 | |||
assertSessionStatus | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
6 | |||
setTransactionError | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
setTrxStatusIgnoredCause | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
sessionStatus | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
6 | |||
setSessionError | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
clearSessionError | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
calculateLastTrxApplyTime | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
2 | |||
pendingWriteCallers | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
6 | |||
updateTrxWriteQueryReport | |
0.00% |
0 / 13 |
|
0.00% |
0 / 1 |
30 | |||
pendingWriteQueryDuration | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
20 | |||
flatAtomicSectionList | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
resetTrxAtomicLevels | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
explicitTrxActive | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
12 | |||
trxCheckBeforeClose | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
42 | |||
onAtomicSectionCancel | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
12 | |||
onCancelAtomicBeforeCriticalSection | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
12 | |||
currentAtomicSectionId | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
12 | |||
addToAtomicLevels | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
onBegin | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
20 | |||
onCommit | |
0.00% |
0 / 29 |
|
0.00% |
0 / 1 |
90 | |||
onEndAtomic | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
20 | |||
getPositionFromSectionId | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
20 | |||
cancelAtomic | |
0.00% |
0 / 21 |
|
0.00% |
0 / 1 |
20 | |||
popAtomicLevel | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
setAutomaticAtomic | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
turnOnAutomatic | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
nextSavePointId | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
6 | |||
writesPending | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
6 | |||
onDestruct | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
12 | |||
transactionWritingIn | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
6 | |||
transactionWritingOut | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
6 | |||
recordQueryCompletion | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
2 | |||
onTransactionResolution | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
addPostCommitOrIdleCallback | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
2 | |||
addPreCommitOrIdleCallback | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
2 | |||
setTransactionListener | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
setTrxEndCallbackSuppression | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
reassignCallbacksForSection | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
90 | |||
modifyCallbacksForCancel | |
0.00% |
0 / 22 |
|
0.00% |
0 / 1 |
30 | |||
consumeEndCallbacks | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
6 | |||
runOnAtomicSectionCancelCallbacks | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
20 | |||
runOnTransactionPreCommitCallbacks | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
12 | |||
clearPreEndCallbacks | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
clearEndCallbacks | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
writesOrCallbacksPending | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
42 | |||
pendingWriteAndCallbackCallers | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
12 | |||
pendingPreCommitCallbackCallers | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
isEndCallbacksSuppressed | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getRecurringCallbacks | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
countPostCommitOrIdleCallbacks | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
onBeginInCriticalSection | |
0.00% |
0 / 14 |
|
0.00% |
0 / 1 |
2 | |||
onRollbackInCriticalSection | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
2 | |||
onCommitInCriticalSection | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
6 | |||
onSessionLoss | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
onEndAtomicInCriticalSection | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
onFlushSnapshot | |
0.00% |
0 / 24 |
|
0.00% |
0 / 1 |
56 | |||
onGetScopedLockAndFlush | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
6 |
1 | <?php |
2 | /** |
3 | * This program is free software; you can redistribute it and/or modify |
4 | * it under the terms of the GNU General Public License as published by |
5 | * the Free Software Foundation; either version 2 of the License, or |
6 | * (at your option) any later version. |
7 | * |
8 | * This program is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | * GNU General Public License for more details. |
12 | * |
13 | * You should have received a copy of the GNU General Public License along |
14 | * with this program; if not, write to the Free Software Foundation, Inc., |
15 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
16 | * http://www.gnu.org/copyleft/gpl.html |
17 | * |
18 | * @file |
19 | */ |
20 | namespace Wikimedia\Rdbms; |
21 | |
22 | use Psr\Log\LoggerInterface; |
23 | use Psr\Log\NullLogger; |
24 | use RuntimeException; |
25 | use Throwable; |
26 | use UnexpectedValueException; |
27 | |
28 | /** |
29 | * @ingroup Database |
30 | * @internal This class should not be used outside of Database |
31 | */ |
32 | class TransactionManager { |
33 | /** Transaction is in a error state requiring a full or savepoint rollback */ |
34 | public const STATUS_TRX_ERROR = 1; |
35 | /** Transaction is active and in a normal state */ |
36 | public const STATUS_TRX_OK = 2; |
37 | /** No transaction is active */ |
38 | public const STATUS_TRX_NONE = 3; |
39 | |
40 | /** Session is in a error state requiring a reset */ |
41 | public const STATUS_SESS_ERROR = 1; |
42 | /** Session is in a normal state */ |
43 | public const STATUS_SESS_OK = 2; |
44 | |
45 | /** @var float Guess of how many seconds it takes to replicate a small insert */ |
46 | private const TINY_WRITE_SEC = 0.010; |
47 | /** @var float Consider a write slow if it took more than this many seconds */ |
48 | private const SLOW_WRITE_SEC = 0.500; |
49 | /** Assume an insert of this many rows or less should be fast to replicate */ |
50 | private const SMALL_WRITE_ROWS = 100; |
51 | |
52 | /** @var string Prefix to the atomic section counter used to make savepoint IDs */ |
53 | private const SAVEPOINT_PREFIX = 'wikimedia_rdbms_atomic'; |
54 | |
55 | /** @var ?TransactionIdentifier Application-side ID of the active (server-side) transaction */ |
56 | private $trxId; |
57 | /** @var ?float UNIX timestamp of BEGIN for the last transaction */ |
58 | private $trxTimestamp = null; |
59 | /** @var ?float Round trip time estimate for queries during the last transaction */ |
60 | private $trxRoundTripDelay = null; |
61 | /** @var int STATUS_TRX_* constant indicating the transaction lifecycle state */ |
62 | private $trxStatus = self::STATUS_TRX_NONE; |
63 | /** @var ?Throwable The cause of any unresolved transaction lifecycle state error */ |
64 | private $trxStatusCause; |
65 | /** @var ?array Details of any unresolved statement-rollback error within a transaction */ |
66 | private $trxStatusIgnoredCause; |
67 | |
68 | /** @var ?Throwable The cause of any unresolved session state loss error */ |
69 | private $sessionError; |
70 | |
71 | /** @var string[] Write query callers of the last transaction */ |
72 | private $trxWriteCallers = []; |
73 | /** @var float Seconds spent in write queries for the last transaction */ |
74 | private $trxWriteDuration = 0.0; |
75 | /** @var int Number of write queries for the last transaction */ |
76 | private $trxWriteQueryCount = 0; |
77 | /** @var int Number of rows affected by write queries for the last transaction */ |
78 | private $trxWriteAffectedRows = 0; |
79 | /** @var float Like trxWriteQueryCount but excludes lock-bound, easy to replicate, queries */ |
80 | private $trxWriteAdjDuration = 0.0; |
81 | /** @var int Number of write queries counted in trxWriteAdjDuration */ |
82 | private $trxWriteAdjQueryCount = 0; |
83 | |
84 | /** @var array Pending atomic sections; list of (name, unique ID, savepoint ID) */ |
85 | private $trxAtomicLevels = []; |
86 | /** @var bool Whether the last transaction was started implicitly due to DBO_TRX */ |
87 | private $trxAutomatic = false; |
88 | /** @var bool Whether the last transaction was started implicitly by startAtomic() */ |
89 | private $trxAutomaticAtomic = false; |
90 | |
91 | /** @var ?string Name of the function that started the last transaction */ |
92 | private $trxFname = null; |
93 | /** @var int Counter for atomic savepoint identifiers for the last transaction */ |
94 | private $trxAtomicCounter = 0; |
95 | |
96 | /** |
97 | * @var array[] Pending postcommit callbacks; list of (callable, method name, atomic section id) |
98 | * @phan-var array<array{0:callable,1:string,2:AtomicSectionIdentifier|null}> |
99 | */ |
100 | private $trxPostCommitOrIdleCallbacks = []; |
101 | /** |
102 | * @var array[] Pending precommit callbacks; list of (callable, method name, atomic section id) |
103 | * @phan-var array<array{0:callable,1:string,2:AtomicSectionIdentifier|null}> |
104 | */ |
105 | private $trxPreCommitOrIdleCallbacks = []; |
106 | /** |
107 | * @var array[] Pending post-trx callbacks; list of (callable, method name, atomic section id) |
108 | * @phan-var array<array{0:callable,1:string,2:AtomicSectionIdentifier|null}> |
109 | */ |
110 | private $trxEndCallbacks = []; |
111 | /** |
112 | * @var array[] Pending cancel callbacks; list of (callable, method name, atomic section id) |
113 | * @phan-var array<array{0:callable,1:string,2:AtomicSectionIdentifier|null}> |
114 | */ |
115 | private $trxSectionCancelCallbacks = []; |
116 | /** @var callable[] Listener callbacks; map of (name => callable) */ |
117 | private $trxRecurringCallbacks = []; |
118 | /** @var bool Whether to suppress triggering of transaction end callbacks */ |
119 | private $trxEndCallbacksSuppressed = false; |
120 | |
121 | /** @var LoggerInterface */ |
122 | private $logger; |
123 | /** @var TransactionProfiler */ |
124 | private $profiler; |
125 | |
126 | public function __construct( LoggerInterface $logger = null, $profiler = null ) { |
127 | $this->logger = $logger ?? new NullLogger(); |
128 | $this->profiler = $profiler ?? new TransactionProfiler(); |
129 | } |
130 | |
131 | public function trxLevel() { |
132 | return $this->trxId ? 1 : 0; |
133 | } |
134 | |
135 | /** |
136 | * Get the application-side transaction identifier instance |
137 | * |
138 | * @return ?TransactionIdentifier Token for the active transaction; null if there isn't one |
139 | */ |
140 | public function getTrxId() { |
141 | return $this->trxId; |
142 | } |
143 | |
144 | /** |
145 | * Reset the application-side transaction identifier instance and return the old one |
146 | * |
147 | * @return ?TransactionIdentifier The old transaction token; null if there wasn't one |
148 | */ |
149 | private function consumeTrxId() { |
150 | $old = $this->trxId; |
151 | $this->trxId = null; |
152 | |
153 | return $old; |
154 | } |
155 | |
156 | public function trxTimestamp(): ?float { |
157 | return $this->trxLevel() ? $this->trxTimestamp : null; |
158 | } |
159 | |
160 | /** |
161 | * @return int One of the STATUS_TRX_* class constants |
162 | */ |
163 | public function trxStatus(): int { |
164 | return $this->trxStatus; |
165 | } |
166 | |
167 | public function setTrxStatusToOk() { |
168 | $this->trxStatus = self::STATUS_TRX_OK; |
169 | $this->trxStatusCause = null; |
170 | $this->trxStatusIgnoredCause = null; |
171 | } |
172 | |
173 | public function setTrxStatusToNone() { |
174 | $this->trxStatus = self::STATUS_TRX_NONE; |
175 | $this->trxStatusCause = null; |
176 | $this->trxStatusIgnoredCause = null; |
177 | } |
178 | |
179 | public function assertTransactionStatus( IDatabase $db, $deprecationLogger, $fname ) { |
180 | if ( $this->trxStatus < self::STATUS_TRX_OK ) { |
181 | throw new DBTransactionStateError( |
182 | $db, |
183 | "Cannot execute query from $fname while transaction status is ERROR", |
184 | [], |
185 | $this->trxStatusCause |
186 | ); |
187 | } elseif ( $this->trxStatus === self::STATUS_TRX_OK && $this->trxStatusIgnoredCause ) { |
188 | [ $iLastError, $iLastErrno, $iFname ] = $this->trxStatusIgnoredCause; |
189 | call_user_func( $deprecationLogger, |
190 | "Caller from $fname ignored an error originally raised from $iFname: " . |
191 | "[$iLastErrno] $iLastError" |
192 | ); |
193 | $this->trxStatusIgnoredCause = null; |
194 | } |
195 | } |
196 | |
197 | public function assertSessionStatus( IDatabase $db, $fname ) { |
198 | if ( $this->sessionError ) { |
199 | throw new DBSessionStateError( |
200 | $db, |
201 | "Cannot execute query from $fname while session status is ERROR", |
202 | [], |
203 | $this->sessionError |
204 | ); |
205 | } |
206 | } |
207 | |
208 | /** |
209 | * Mark the transaction as requiring rollback (STATUS_TRX_ERROR) due to an error |
210 | * |
211 | * @param Throwable $trxError |
212 | */ |
213 | public function setTransactionError( Throwable $trxError ) { |
214 | if ( $this->trxStatus > self::STATUS_TRX_ERROR ) { |
215 | $this->trxStatus = self::STATUS_TRX_ERROR; |
216 | $this->trxStatusCause = $trxError; |
217 | } |
218 | } |
219 | |
220 | /** |
221 | * @param array|null $trxStatusIgnoredCause |
222 | */ |
223 | public function setTrxStatusIgnoredCause( ?array $trxStatusIgnoredCause ): void { |
224 | $this->trxStatusIgnoredCause = $trxStatusIgnoredCause; |
225 | } |
226 | |
227 | /** |
228 | * Get the status of the current session (ephemeral server-side state tied to the connection) |
229 | * |
230 | * @return int One of the STATUS_SESSION_* class constants |
231 | */ |
232 | public function sessionStatus() { |
233 | // Check if an unresolved error still exists |
234 | return ( $this->sessionError ) ? self::STATUS_SESS_ERROR : self::STATUS_SESS_OK; |
235 | } |
236 | |
237 | /** |
238 | * Flag the session as needing a reset due to an error, if not already flagged |
239 | * |
240 | * @param Throwable $sessionError |
241 | */ |
242 | public function setSessionError( Throwable $sessionError ) { |
243 | $this->sessionError ??= $sessionError; |
244 | } |
245 | |
246 | /** |
247 | * Unflag the session as needing a reset due to an error |
248 | */ |
249 | public function clearSessionError() { |
250 | $this->sessionError = null; |
251 | } |
252 | |
253 | /** |
254 | * @param float $rtt |
255 | * @return float Time to apply writes to replicas based on trxWrite* fields |
256 | */ |
257 | private function calculateLastTrxApplyTime( float $rtt ) { |
258 | $rttAdjTotal = $this->trxWriteAdjQueryCount * $rtt; |
259 | $applyTime = max( $this->trxWriteAdjDuration - $rttAdjTotal, 0.0 ); |
260 | // For omitted queries, make them count as something at least |
261 | $omitted = $this->trxWriteQueryCount - $this->trxWriteAdjQueryCount; |
262 | $applyTime += self::TINY_WRITE_SEC * $omitted; |
263 | |
264 | return $applyTime; |
265 | } |
266 | |
267 | public function pendingWriteCallers() { |
268 | return $this->trxLevel() ? $this->trxWriteCallers : []; |
269 | } |
270 | |
271 | /** |
272 | * Update the estimated run-time of a query, not counting large row lock times |
273 | * |
274 | * LoadBalancer can be set to rollback transactions that will create huge replication |
275 | * lag. It bases this estimate off of pendingWriteQueryDuration(). Certain simple |
276 | * queries, like inserting a row can take a long time due to row locking. This method |
277 | * uses some simple heuristics to discount those cases. |
278 | * |
279 | * @param string $queryVerb action in the write query |
280 | * @param float $runtime Total runtime, including RTT |
281 | * @param int $affected Affected row count |
282 | * @param string $fname method name invoking the action |
283 | */ |
284 | public function updateTrxWriteQueryReport( $queryVerb, $runtime, $affected, $fname ) { |
285 | // Whether this is indicative of replica DB runtime (except for RBR or ws_repl) |
286 | $indicativeOfReplicaRuntime = true; |
287 | if ( $runtime > self::SLOW_WRITE_SEC ) { |
288 | // insert(), upsert(), replace() are fast unless bulky in size or blocked on locks |
289 | if ( $queryVerb === 'INSERT' ) { |
290 | $indicativeOfReplicaRuntime = $affected > self::SMALL_WRITE_ROWS; |
291 | } elseif ( $queryVerb === 'REPLACE' ) { |
292 | $indicativeOfReplicaRuntime = $affected > self::SMALL_WRITE_ROWS / 2; |
293 | } |
294 | } |
295 | |
296 | $this->trxWriteDuration += $runtime; |
297 | $this->trxWriteQueryCount += 1; |
298 | $this->trxWriteAffectedRows += $affected; |
299 | if ( $indicativeOfReplicaRuntime ) { |
300 | $this->trxWriteAdjDuration += $runtime; |
301 | $this->trxWriteAdjQueryCount += 1; |
302 | } |
303 | |
304 | $this->trxWriteCallers[] = $fname; |
305 | } |
306 | |
307 | public function pendingWriteQueryDuration( $type = IDatabase::ESTIMATE_TOTAL ) { |
308 | if ( !$this->trxLevel() ) { |
309 | return false; |
310 | } elseif ( !$this->trxWriteCallers ) { |
311 | return 0.0; |
312 | } |
313 | |
314 | if ( $type === IDatabase::ESTIMATE_DB_APPLY ) { |
315 | return $this->calculateLastTrxApplyTime( $this->trxRoundTripDelay ); |
316 | } |
317 | |
318 | return $this->trxWriteDuration; |
319 | } |
320 | |
321 | /** |
322 | * @return string |
323 | */ |
324 | private function flatAtomicSectionList() { |
325 | return array_reduce( $this->trxAtomicLevels, static function ( $accum, $v ) { |
326 | return $accum === null ? $v[0] : "$accum, " . $v[0]; |
327 | } ); |
328 | } |
329 | |
330 | public function resetTrxAtomicLevels() { |
331 | $this->trxAtomicLevels = []; |
332 | $this->trxAtomicCounter = 0; |
333 | } |
334 | |
335 | public function explicitTrxActive() { |
336 | return $this->trxLevel() && ( $this->trxAtomicLevels || !$this->trxAutomatic ); |
337 | } |
338 | |
339 | public function trxCheckBeforeClose( IDatabaseForOwner $db, $fname ) { |
340 | $error = null; |
341 | if ( $this->trxAtomicLevels ) { |
342 | // Cannot let incomplete atomic sections be committed |
343 | $levels = $this->flatAtomicSectionList(); |
344 | $error = "$fname: atomic sections $levels are still open"; |
345 | } elseif ( $this->trxAutomatic ) { |
346 | // Only the connection manager can commit non-empty DBO_TRX transactions |
347 | // (empty ones we can silently roll back) |
348 | if ( $db->writesOrCallbacksPending() ) { |
349 | $error = "$fname: " . |
350 | "expected mass rollback of all peer transactions (DBO_TRX set)"; |
351 | } |
352 | } else { |
353 | // Manual transactions should have been committed or rolled |
354 | // back, even if empty. |
355 | $error = "$fname: transaction is still open (from {$this->trxFname})"; |
356 | } |
357 | |
358 | if ( $this->trxEndCallbacksSuppressed && $error === null ) { |
359 | $error = "$fname: callbacks are suppressed; cannot properly commit"; |
360 | } |
361 | |
362 | return $error; |
363 | } |
364 | |
365 | public function onAtomicSectionCancel( IDatabase $db, $callback, $fname ): void { |
366 | if ( !$this->trxLevel() || !$this->trxAtomicLevels ) { |
367 | throw new DBUnexpectedError( $db, "No atomic section is open (got $fname)" ); |
368 | } |
369 | $this->trxSectionCancelCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ]; |
370 | } |
371 | |
372 | public function onCancelAtomicBeforeCriticalSection( IDatabase $db, $fname ): void { |
373 | if ( !$this->trxLevel() || !$this->trxAtomicLevels ) { |
374 | throw new DBUnexpectedError( $db, "No atomic section is open (got $fname)" ); |
375 | } |
376 | } |
377 | |
378 | /** |
379 | * @return AtomicSectionIdentifier|null ID of the topmost atomic section level |
380 | */ |
381 | public function currentAtomicSectionId(): ?AtomicSectionIdentifier { |
382 | if ( $this->trxLevel() && $this->trxAtomicLevels ) { |
383 | $levelInfo = end( $this->trxAtomicLevels ); |
384 | |
385 | return $levelInfo[1]; |
386 | } |
387 | |
388 | return null; |
389 | } |
390 | |
391 | public function addToAtomicLevels( $fname, AtomicSectionIdentifier $sectionId, $savepointId ) { |
392 | $this->trxAtomicLevels[] = [ $fname, $sectionId, $savepointId ]; |
393 | $this->logger->debug( 'startAtomic: entering level ' . |
394 | ( count( $this->trxAtomicLevels ) - 1 ) . " ($fname)", [ 'db_log_category' => 'trx' ] ); |
395 | } |
396 | |
397 | public function onBegin( IDatabase $db, $fname ): void { |
398 | // Protect against mismatched atomic section, transaction nesting, and snapshot loss |
399 | if ( $this->trxLevel() ) { |
400 | if ( $this->trxAtomicLevels ) { |
401 | $levels = $this->flatAtomicSectionList(); |
402 | $msg = "$fname: got explicit BEGIN while atomic section(s) $levels are open"; |
403 | throw new DBUnexpectedError( $db, $msg ); |
404 | } elseif ( !$this->trxAutomatic ) { |
405 | $msg = "$fname: explicit transaction already active (from {$this->trxFname})"; |
406 | throw new DBUnexpectedError( $db, $msg ); |
407 | } else { |
408 | $msg = "$fname: implicit transaction already active (from {$this->trxFname})"; |
409 | throw new DBUnexpectedError( $db, $msg ); |
410 | } |
411 | } |
412 | } |
413 | |
414 | /** |
415 | * @param IDatabase $db |
416 | * @param string $fname |
417 | * @param string $flush one of IDatabase::FLUSHING_* values |
418 | * @return bool false if the commit should go aborted, true otherwise. |
419 | */ |
420 | public function onCommit( IDatabase $db, $fname, $flush ): bool { |
421 | if ( $this->trxLevel() && $this->trxAtomicLevels ) { |
422 | // There are still atomic sections open; this cannot be ignored |
423 | $levels = $this->flatAtomicSectionList(); |
424 | throw new DBUnexpectedError( |
425 | $db, |
426 | "$fname: got COMMIT while atomic sections $levels are still open" |
427 | ); |
428 | } |
429 | |
430 | if ( $flush === IDatabase::FLUSHING_INTERNAL || $flush === IDatabase::FLUSHING_ALL_PEERS ) { |
431 | if ( !$this->trxLevel() ) { |
432 | return false; // nothing to do |
433 | } elseif ( !$this->trxAutomatic ) { |
434 | throw new DBUnexpectedError( |
435 | $db, |
436 | "$fname: flushing an explicit transaction, getting out of sync" |
437 | ); |
438 | } |
439 | } elseif ( !$this->trxLevel() ) { |
440 | $this->logger->error( |
441 | "$fname: no transaction to commit, something got out of sync", |
442 | [ |
443 | 'exception' => new RuntimeException(), |
444 | 'db_log_category' => 'trx' |
445 | ] |
446 | ); |
447 | |
448 | return false; // nothing to do |
449 | } elseif ( $this->trxAutomatic ) { |
450 | throw new DBUnexpectedError( |
451 | $db, |
452 | "$fname: expected mass commit of all peer transactions (DBO_TRX set)" |
453 | ); |
454 | } |
455 | return true; |
456 | } |
457 | |
458 | public function onEndAtomic( IDatabase $db, $fname ): array { |
459 | if ( !$this->trxLevel() || !$this->trxAtomicLevels ) { |
460 | throw new DBUnexpectedError( $db, "No atomic section is open (got $fname)" ); |
461 | } |
462 | // Check if the current section matches $fname |
463 | $pos = count( $this->trxAtomicLevels ) - 1; |
464 | [ $savedFname, $sectionId, $savepointId ] = $this->trxAtomicLevels[$pos]; |
465 | $this->logger->debug( "endAtomic: leaving level $pos ($fname)", [ 'db_log_category' => 'trx' ] ); |
466 | |
467 | if ( $savedFname !== $fname ) { |
468 | throw new DBUnexpectedError( |
469 | $db, |
470 | "Invalid atomic section ended (got $fname but expected $savedFname)" |
471 | ); |
472 | } |
473 | |
474 | return [ $savepointId, $sectionId ]; |
475 | } |
476 | |
477 | public function getPositionFromSectionId( AtomicSectionIdentifier $sectionId = null ): ?int { |
478 | if ( $sectionId !== null ) { |
479 | // Find the (last) section with the given $sectionId |
480 | $pos = -1; |
481 | foreach ( $this->trxAtomicLevels as $i => [ , $asId, ] ) { |
482 | if ( $asId === $sectionId ) { |
483 | $pos = $i; |
484 | } |
485 | } |
486 | } else { |
487 | $pos = null; |
488 | } |
489 | |
490 | return $pos; |
491 | } |
492 | |
493 | public function cancelAtomic( $pos ) { |
494 | $excisedIds = []; |
495 | $excisedFnames = []; |
496 | $newTopSection = $this->currentAtomicSectionId(); |
497 | if ( $pos !== null ) { |
498 | // Remove all descendant sections and re-index the array |
499 | $len = count( $this->trxAtomicLevels ); |
500 | for ( $i = $pos + 1; $i < $len; ++$i ) { |
501 | $excisedFnames[] = $this->trxAtomicLevels[$i][0]; |
502 | $excisedIds[] = $this->trxAtomicLevels[$i][1]; |
503 | } |
504 | $this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos + 1 ); |
505 | $newTopSection = $this->currentAtomicSectionId(); |
506 | } |
507 | |
508 | // Check if the current section matches $fname |
509 | $pos = count( $this->trxAtomicLevels ) - 1; |
510 | [ $savedFname, $savedSectionId, $savepointId ] = $this->trxAtomicLevels[$pos]; |
511 | |
512 | if ( $excisedFnames ) { |
513 | $this->logger->debug( "cancelAtomic: canceling level $pos ($savedFname) " . |
514 | "and descendants " . implode( ', ', $excisedFnames ), |
515 | [ 'db_log_category' => 'trx' ] |
516 | ); |
517 | } else { |
518 | $this->logger->debug( "cancelAtomic: canceling level $pos ($savedFname)", |
519 | [ 'db_log_category' => 'trx' ] |
520 | ); |
521 | } |
522 | |
523 | return [ $savedFname, $excisedIds, $newTopSection, $savedSectionId, $savepointId ]; |
524 | } |
525 | |
526 | /** |
527 | * @return bool Whether no levels remain and transaction was started by a popped level |
528 | */ |
529 | public function popAtomicLevel() { |
530 | array_pop( $this->trxAtomicLevels ); |
531 | |
532 | return !$this->trxAtomicLevels && $this->trxAutomaticAtomic; |
533 | } |
534 | |
535 | public function setAutomaticAtomic( $value ) { |
536 | $this->trxAutomaticAtomic = $value; |
537 | } |
538 | |
539 | public function turnOnAutomatic() { |
540 | $this->trxAutomatic = true; |
541 | } |
542 | |
543 | public function nextSavePointId( IDatabase $db, $fname ) { |
544 | $savepointId = self::SAVEPOINT_PREFIX . ++$this->trxAtomicCounter; |
545 | if ( strlen( $savepointId ) > 30 ) { |
546 | // 30 == Oracle's identifier length limit (pre 12c) |
547 | // With a 22 character prefix, that puts the highest number at 99999999. |
548 | throw new DBUnexpectedError( |
549 | $db, |
550 | 'There have been an excessively large number of atomic sections in a transaction' |
551 | . " started by $this->trxFname (at $fname)" |
552 | ); |
553 | } |
554 | |
555 | return $savepointId; |
556 | } |
557 | |
558 | public function writesPending() { |
559 | return $this->trxLevel() && $this->trxWriteCallers; |
560 | } |
561 | |
562 | public function onDestruct() { |
563 | if ( $this->trxLevel() && $this->trxWriteCallers ) { |
564 | trigger_error( "Uncommitted DB writes (transaction from {$this->trxFname})" ); |
565 | } |
566 | } |
567 | |
568 | public function transactionWritingIn( $serverName, $domainId, float $startTime ) { |
569 | if ( !$this->trxWriteCallers ) { |
570 | $this->profiler->transactionWritingIn( |
571 | $serverName, |
572 | $domainId, |
573 | (string)$this->trxId, |
574 | $startTime |
575 | ); |
576 | } |
577 | } |
578 | |
579 | public function transactionWritingOut( IDatabase $db, $oldId ) { |
580 | if ( $this->trxWriteCallers ) { |
581 | $this->profiler->transactionWritingOut( |
582 | $db->getServerName(), |
583 | $db->getDomainID(), |
584 | $oldId, |
585 | $this->pendingWriteQueryDuration( IDatabase::ESTIMATE_TOTAL ), |
586 | $this->trxWriteAffectedRows |
587 | ); |
588 | } |
589 | } |
590 | |
591 | public function recordQueryCompletion( $sql, $startTime, $isPermWrite, $rowCount, $serverName ) { |
592 | $this->profiler->recordQueryCompletion( |
593 | $sql, |
594 | $startTime, |
595 | $isPermWrite, |
596 | $rowCount, |
597 | (string)$this->trxId, |
598 | $serverName |
599 | ); |
600 | } |
601 | |
602 | public function onTransactionResolution( IDatabase $db, callable $callback, $fname ) { |
603 | if ( !$this->trxLevel() ) { |
604 | throw new DBUnexpectedError( $db, "No transaction is active" ); |
605 | } |
606 | $this->trxEndCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ]; |
607 | } |
608 | |
609 | public function addPostCommitOrIdleCallback( callable $callback, $fname ) { |
610 | $this->trxPostCommitOrIdleCallbacks[] = [ |
611 | $callback, |
612 | $fname, |
613 | $this->currentAtomicSectionId() |
614 | ]; |
615 | } |
616 | |
617 | final public function addPreCommitOrIdleCallback( callable $callback, $fname ) { |
618 | $this->trxPreCommitOrIdleCallbacks[] = [ |
619 | $callback, |
620 | $fname, |
621 | $this->currentAtomicSectionId() |
622 | ]; |
623 | } |
624 | |
625 | public function setTransactionListener( $name, callable $callback = null ) { |
626 | if ( $callback ) { |
627 | $this->trxRecurringCallbacks[$name] = $callback; |
628 | } else { |
629 | unset( $this->trxRecurringCallbacks[$name] ); |
630 | } |
631 | } |
632 | |
633 | /** |
634 | * Whether to disable running of post-COMMIT/ROLLBACK callbacks |
635 | * @param bool $suppress |
636 | */ |
637 | public function setTrxEndCallbackSuppression( bool $suppress ) { |
638 | $this->trxEndCallbacksSuppressed = $suppress; |
639 | } |
640 | |
641 | /** |
642 | * Hoist callback ownership for callbacks in a section to a parent section. |
643 | * All callbacks should have an owner that is present in trxAtomicLevels. |
644 | * @param AtomicSectionIdentifier $old |
645 | * @param AtomicSectionIdentifier $new |
646 | */ |
647 | public function reassignCallbacksForSection( |
648 | AtomicSectionIdentifier $old, |
649 | AtomicSectionIdentifier $new |
650 | ) { |
651 | foreach ( $this->trxPreCommitOrIdleCallbacks as $key => $info ) { |
652 | if ( $info[2] === $old ) { |
653 | $this->trxPreCommitOrIdleCallbacks[$key][2] = $new; |
654 | } |
655 | } |
656 | foreach ( $this->trxPostCommitOrIdleCallbacks as $key => $info ) { |
657 | if ( $info[2] === $old ) { |
658 | $this->trxPostCommitOrIdleCallbacks[$key][2] = $new; |
659 | } |
660 | } |
661 | foreach ( $this->trxEndCallbacks as $key => $info ) { |
662 | if ( $info[2] === $old ) { |
663 | $this->trxEndCallbacks[$key][2] = $new; |
664 | } |
665 | } |
666 | foreach ( $this->trxSectionCancelCallbacks as $key => $info ) { |
667 | if ( $info[2] === $old ) { |
668 | $this->trxSectionCancelCallbacks[$key][2] = $new; |
669 | } |
670 | } |
671 | } |
672 | |
673 | /** |
674 | * Update callbacks that were owned by cancelled atomic sections. |
675 | * |
676 | * Callbacks for "on commit" should never be run if they're owned by a |
677 | * section that won't be committed. |
678 | * |
679 | * Callbacks for "on resolution" need to reflect that the section was |
680 | * rolled back, even if the transaction as a whole commits successfully. |
681 | * |
682 | * Callbacks for "on section cancel" should already have been consumed, |
683 | * but errors during the cancellation itself can prevent that while still |
684 | * destroying the section. Hoist any such callbacks to the new top section, |
685 | * which we assume will itself have to be cancelled or rolled back to |
686 | * resolve the error. |
687 | * |
688 | * @param AtomicSectionIdentifier[] $excisedSectionsId Cancelled section IDs |
689 | * @param AtomicSectionIdentifier|null $newSectionId New top section ID |
690 | * @throws UnexpectedValueException |
691 | */ |
692 | public function modifyCallbacksForCancel( |
693 | array $excisedSectionsId, |
694 | AtomicSectionIdentifier $newSectionId = null |
695 | ) { |
696 | // Cancel the "on commit" callbacks owned by this savepoint |
697 | $this->trxPostCommitOrIdleCallbacks = array_filter( |
698 | $this->trxPostCommitOrIdleCallbacks, |
699 | static function ( $entry ) use ( $excisedSectionsId ) { |
700 | return !in_array( $entry[2], $excisedSectionsId, true ); |
701 | } |
702 | ); |
703 | $this->trxPreCommitOrIdleCallbacks = array_filter( |
704 | $this->trxPreCommitOrIdleCallbacks, |
705 | static function ( $entry ) use ( $excisedSectionsId ) { |
706 | return !in_array( $entry[2], $excisedSectionsId, true ); |
707 | } |
708 | ); |
709 | // Make "on resolution" callbacks owned by this savepoint to perceive a rollback |
710 | foreach ( $this->trxEndCallbacks as $key => $entry ) { |
711 | if ( in_array( $entry[2], $excisedSectionsId, true ) ) { |
712 | $callback = $entry[0]; |
713 | $this->trxEndCallbacks[$key][0] = static function ( $t, $db ) use ( $callback ) { |
714 | return $callback( IDatabase::TRIGGER_ROLLBACK, $db ); |
715 | }; |
716 | // This "on resolution" callback no longer belongs to a section. |
717 | $this->trxEndCallbacks[$key][2] = null; |
718 | } |
719 | } |
720 | // Hoist callback ownership for section cancel callbacks to the new top section |
721 | foreach ( $this->trxSectionCancelCallbacks as $key => $entry ) { |
722 | if ( in_array( $entry[2], $excisedSectionsId, true ) ) { |
723 | $this->trxSectionCancelCallbacks[$key][2] = $newSectionId; |
724 | } |
725 | } |
726 | } |
727 | |
728 | public function consumeEndCallbacks( $trigger ): array { |
729 | $callbackEntries = array_merge( |
730 | $this->trxPostCommitOrIdleCallbacks, |
731 | $this->trxEndCallbacks, |
732 | ( $trigger === IDatabase::TRIGGER_ROLLBACK ) |
733 | ? $this->trxSectionCancelCallbacks |
734 | : [] // just consume them |
735 | ); |
736 | $this->trxPostCommitOrIdleCallbacks = []; // consumed (and recursion guard) |
737 | $this->trxEndCallbacks = []; // consumed (recursion guard) |
738 | $this->trxSectionCancelCallbacks = []; // consumed (recursion guard) |
739 | |
740 | return $callbackEntries; |
741 | } |
742 | |
743 | /** |
744 | * Consume and run any relevant "on atomic section cancel" callbacks for the active transaction |
745 | * |
746 | * @param IDatabase $db |
747 | * @param int $trigger IDatabase::TRIGGER_* constant |
748 | * @param AtomicSectionIdentifier[] $sectionIds IDs of the sections that where just cancelled |
749 | * @throws Throwable Any exception thrown by a callback |
750 | */ |
751 | public function runOnAtomicSectionCancelCallbacks( IDatabase $db, int $trigger, array $sectionIds ) { |
752 | // Drain the queue of matching "atomic section cancel" callbacks until there are none |
753 | $unrelatedCallbackEntries = []; |
754 | do { |
755 | $callbackEntries = $this->trxSectionCancelCallbacks; |
756 | $this->trxSectionCancelCallbacks = []; // consumed (recursion guard) |
757 | foreach ( $callbackEntries as $entry ) { |
758 | if ( in_array( $entry[2], $sectionIds, true ) ) { |
759 | try { |
760 | $entry[0]( $trigger, $db ); |
761 | } catch ( Throwable $trxError ) { |
762 | $this->setTransactionError( $trxError ); |
763 | throw $trxError; |
764 | } |
765 | } else { |
766 | $unrelatedCallbackEntries[] = $entry; |
767 | } |
768 | } |
769 | // @phan-suppress-next-line PhanImpossibleConditionInLoop |
770 | } while ( $this->trxSectionCancelCallbacks ); |
771 | |
772 | $this->trxSectionCancelCallbacks = $unrelatedCallbackEntries; |
773 | } |
774 | |
775 | /** |
776 | * Consume and run any "on transaction pre-commit" callbacks |
777 | * |
778 | * @param IDatabase $db |
779 | * @return int Number of callbacks attempted |
780 | * @throws Throwable Any exception thrown by a callback |
781 | */ |
782 | public function runOnTransactionPreCommitCallbacks( IDatabase $db ): int { |
783 | $count = 0; |
784 | |
785 | // Drain the queues of transaction "precommit" callbacks until it is empty |
786 | do { |
787 | $callbackEntries = $this->trxPreCommitOrIdleCallbacks; |
788 | $this->trxPreCommitOrIdleCallbacks = []; // consumed (and recursion guard) |
789 | $count += count( $callbackEntries ); |
790 | foreach ( $callbackEntries as $entry ) { |
791 | try { |
792 | $entry[0]( $db ); |
793 | } catch ( Throwable $trxError ) { |
794 | $this->setTransactionError( $trxError ); |
795 | throw $trxError; |
796 | } |
797 | } |
798 | // @phan-suppress-next-line PhanImpossibleConditionInLoop |
799 | } while ( $this->trxPreCommitOrIdleCallbacks ); |
800 | |
801 | return $count; |
802 | } |
803 | |
804 | public function clearPreEndCallbacks() { |
805 | $this->trxPostCommitOrIdleCallbacks = []; |
806 | $this->trxPreCommitOrIdleCallbacks = []; |
807 | } |
808 | |
809 | public function clearEndCallbacks() { |
810 | $this->trxEndCallbacks = []; // don't copy |
811 | $this->trxSectionCancelCallbacks = []; // don't copy |
812 | } |
813 | |
814 | public function writesOrCallbacksPending(): bool { |
815 | return $this->trxLevel() && ( |
816 | $this->trxWriteCallers || |
817 | $this->trxPostCommitOrIdleCallbacks || |
818 | $this->trxPreCommitOrIdleCallbacks || |
819 | $this->trxEndCallbacks || |
820 | $this->trxSectionCancelCallbacks |
821 | ); |
822 | } |
823 | |
824 | /** |
825 | * List the methods that have write queries or callbacks for the current transaction |
826 | * |
827 | * @return string[] |
828 | */ |
829 | public function pendingWriteAndCallbackCallers(): array { |
830 | $fnames = $this->pendingWriteCallers(); |
831 | foreach ( [ |
832 | $this->trxPostCommitOrIdleCallbacks, |
833 | $this->trxPreCommitOrIdleCallbacks, |
834 | $this->trxEndCallbacks, |
835 | $this->trxSectionCancelCallbacks |
836 | ] as $callbacks ) { |
837 | foreach ( $callbacks as $callback ) { |
838 | $fnames[] = $callback[1]; |
839 | } |
840 | } |
841 | |
842 | return $fnames; |
843 | } |
844 | |
845 | /** |
846 | * List the methods that have precommit callbacks for the current transaction |
847 | * |
848 | * @return string[] |
849 | */ |
850 | public function pendingPreCommitCallbackCallers(): array { |
851 | $fnames = $this->pendingWriteCallers(); |
852 | foreach ( $this->trxPreCommitOrIdleCallbacks as $callback ) { |
853 | $fnames[] = $callback[1]; |
854 | } |
855 | |
856 | return $fnames; |
857 | } |
858 | |
859 | public function isEndCallbacksSuppressed(): bool { |
860 | return $this->trxEndCallbacksSuppressed; |
861 | } |
862 | |
863 | public function getRecurringCallbacks() { |
864 | return $this->trxRecurringCallbacks; |
865 | } |
866 | |
867 | public function countPostCommitOrIdleCallbacks() { |
868 | return count( $this->trxPostCommitOrIdleCallbacks ); |
869 | } |
870 | |
871 | /** |
872 | * @param string $mode One of IDatabase::TRANSACTION_* values |
873 | * @param string $fname method name |
874 | * @param float $rtt Trivial query round-trip-delay |
875 | */ |
876 | public function onBeginInCriticalSection( $mode, $fname, $rtt ) { |
877 | $this->trxId = new TransactionIdentifier(); |
878 | $this->setTrxStatusToOk(); |
879 | $this->resetTrxAtomicLevels(); |
880 | $this->trxWriteCallers = []; |
881 | $this->trxWriteDuration = 0.0; |
882 | $this->trxWriteQueryCount = 0; |
883 | $this->trxWriteAffectedRows = 0; |
884 | $this->trxWriteAdjDuration = 0.0; |
885 | $this->trxWriteAdjQueryCount = 0; |
886 | // T147697: make explicitTrxActive() return true until begin() finishes. This way, |
887 | // no caller triggered by getApproximateLagStatus() will think its OK to muck around |
888 | // with the transaction just because startAtomic() has not yet finished updating the |
889 | // tracking fields (e.g. trxAtomicLevels). |
890 | $this->trxAutomatic = ( $mode === IDatabase::TRANSACTION_INTERNAL ); |
891 | $this->trxAutomaticAtomic = false; |
892 | $this->trxFname = $fname; |
893 | $this->trxTimestamp = microtime( true ); |
894 | $this->trxRoundTripDelay = $rtt; |
895 | } |
896 | |
897 | public function onRollbackInCriticalSection( IDatabase $db ) { |
898 | $oldTrxId = $this->consumeTrxId(); |
899 | $this->setTrxStatusToNone(); |
900 | $this->resetTrxAtomicLevels(); |
901 | $this->clearPreEndCallbacks(); |
902 | $this->transactionWritingOut( $db, (string)$oldTrxId ); |
903 | } |
904 | |
905 | public function onCommitInCriticalSection( IDatabase $db ) { |
906 | $lastWriteTime = null; |
907 | |
908 | $oldTrxId = $this->consumeTrxId(); |
909 | $this->setTrxStatusToNone(); |
910 | if ( $this->trxWriteCallers ) { |
911 | $lastWriteTime = microtime( true ); |
912 | $this->transactionWritingOut( $db, (string)$oldTrxId ); |
913 | } |
914 | |
915 | return $lastWriteTime; |
916 | } |
917 | |
918 | public function onSessionLoss( IDatabase $db ) { |
919 | $oldTrxId = $this->consumeTrxId(); |
920 | $this->clearPreEndCallbacks(); |
921 | $this->transactionWritingOut( $db, (string)$oldTrxId ); |
922 | } |
923 | |
924 | public function onEndAtomicInCriticalSection( $sectionId ) { |
925 | // Hoist callback ownership for callbacks in the section that just ended; |
926 | // all callbacks should have an owner that is present in trxAtomicLevels. |
927 | $currentSectionId = $this->currentAtomicSectionId(); |
928 | if ( $currentSectionId ) { |
929 | $this->reassignCallbacksForSection( $sectionId, $currentSectionId ); |
930 | } |
931 | } |
932 | |
933 | public function onFlushSnapshot( IDatabase $db, $fname, $flush, $trxRoundId ) { |
934 | if ( $this->explicitTrxActive() ) { |
935 | // Committing this transaction would break callers that assume it is still open |
936 | throw new DBUnexpectedError( |
937 | $db, |
938 | "$fname: Cannot flush snapshot; " . |
939 | "explicit transaction '{$this->trxFname}' is still open" |
940 | ); |
941 | } elseif ( $this->writesOrCallbacksPending() ) { |
942 | // This only flushes transactions to clear snapshots, not to write data |
943 | $fnames = implode( ', ', $this->pendingWriteAndCallbackCallers() ); |
944 | throw new DBUnexpectedError( |
945 | $db, |
946 | "$fname: Cannot flush snapshot; " . |
947 | "writes from transaction {$this->trxFname} are still pending ($fnames)" |
948 | ); |
949 | } elseif ( |
950 | $this->trxLevel() && |
951 | $trxRoundId && |
952 | $flush !== IDatabase::FLUSHING_INTERNAL && |
953 | $flush !== IDatabase::FLUSHING_ALL_PEERS |
954 | ) { |
955 | $this->logger->warning( |
956 | "$fname: Expected mass snapshot flush of all peer transactions " . |
957 | "in the explicit transactions round '{$trxRoundId}'", |
958 | [ |
959 | 'exception' => new RuntimeException(), |
960 | 'db_log_category' => 'trx' |
961 | ] |
962 | ); |
963 | } |
964 | } |
965 | |
966 | public function onGetScopedLockAndFlush( IDatabase $db, $fname ) { |
967 | if ( $this->writesOrCallbacksPending() ) { |
968 | // This only flushes transactions to clear snapshots, not to write data |
969 | $fnames = implode( ', ', $this->pendingWriteAndCallbackCallers() ); |
970 | throw new DBUnexpectedError( |
971 | $db, |
972 | "$fname: Cannot flush pre-lock snapshot; " . |
973 | "writes from transaction {$this->trxFname} are still pending ($fnames)" |
974 | ); |
975 | } |
976 | } |
977 | } |