Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 391 |
|
0.00% |
0 / 66 |
CRAP | |
0.00% |
0 / 1 |
TransactionManager | |
0.00% |
0 / 391 |
|
0.00% |
0 / 66 |
25760 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
trxLevel | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
6 | |||
newTrxId | |
0.00% |
0 / 18 |
|
0.00% |
0 / 1 |
2 | |||
getTrxId | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
consumeTrxId | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
2 | |||
trxTimestamp | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
6 | |||
trxStatus | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
setTrxStatusToOk | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
setTrxStatusToNone | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
assertTransactionStatus | |
0.00% |
0 / 14 |
|
0.00% |
0 / 1 |
20 | |||
assertSessionStatus | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
6 | |||
setTransactionError | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
setTrxStatusIgnoredCause | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
sessionStatus | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
6 | |||
setSessionError | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
clearSessionError | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
calculateLastTrxApplyTime | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
2 | |||
pendingWriteCallers | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
6 | |||
updateTrxWriteQueryReport | |
0.00% |
0 / 13 |
|
0.00% |
0 / 1 |
30 | |||
pendingWriteQueryDuration | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
20 | |||
flatAtomicSectionList | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
resetTrxAtomicLevels | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
explicitTrxActive | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
12 | |||
trxCheckBeforeClose | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
42 | |||
onAtomicSectionCancel | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
12 | |||
onCancelAtomicBeforeCriticalSection | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
12 | |||
currentAtomicSectionId | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
12 | |||
addToAtomicLevels | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
onBeginTransaction | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
12 | |||
onCommit | |
0.00% |
0 / 29 |
|
0.00% |
0 / 1 |
90 | |||
onEndAtomic | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
20 | |||
getPositionFromSectionId | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
20 | |||
cancelAtomic | |
0.00% |
0 / 21 |
|
0.00% |
0 / 1 |
20 | |||
popAtomicLevel | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
isClean | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
6 | |||
setAutomaticAtomic | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
turnOnAutomatic | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
nextSavePointId | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
6 | |||
writesPending | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
6 | |||
onDestruct | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
12 | |||
transactionWritingIn | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
12 | |||
transactionWritingOut | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
6 | |||
recordQueryCompletion | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
2 | |||
onTransactionResolution | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
addPostCommitOrIdleCallback | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
2 | |||
addPreCommitOrIdleCallback | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
2 | |||
setTransactionListener | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
setTrxEndCallbackSuppression | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
reassignCallbacksForSection | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
90 | |||
modifyCallbacksForCancel | |
0.00% |
0 / 22 |
|
0.00% |
0 / 1 |
30 | |||
consumeEndCallbacks | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
6 | |||
runOnAtomicSectionCancelCallbacks | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
20 | |||
runOnTransactionPreCommitCallbacks | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
12 | |||
clearPreEndCallbacks | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
clearEndCallbacks | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
writesOrCallbacksPending | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
42 | |||
pendingWriteAndCallbackCallers | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
12 | |||
pendingPreCommitCallbackCallers | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
isEndCallbacksSuppressed | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getRecurringCallbacks | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
countPostCommitOrIdleCallbacks | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
onRollback | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
2 | |||
onCommitInCriticalSection | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
6 | |||
onEndAtomicInCriticalSection | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
onFlushSnapshot | |
0.00% |
0 / 24 |
|
0.00% |
0 / 1 |
56 | |||
onGetScopedLockAndFlush | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
6 |
1 | <?php |
2 | /** |
3 | * This program is free software; you can redistribute it and/or modify |
4 | * it under the terms of the GNU General Public License as published by |
5 | * the Free Software Foundation; either version 2 of the License, or |
6 | * (at your option) any later version. |
7 | * |
8 | * This program is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | * GNU General Public License for more details. |
12 | * |
13 | * You should have received a copy of the GNU General Public License along |
14 | * with this program; if not, write to the Free Software Foundation, Inc., |
15 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
16 | * http://www.gnu.org/copyleft/gpl.html |
17 | * |
18 | * @file |
19 | */ |
20 | namespace Wikimedia\Rdbms; |
21 | |
22 | use Psr\Log\LoggerInterface; |
23 | use Psr\Log\NullLogger; |
24 | use RuntimeException; |
25 | use Throwable; |
26 | use UnexpectedValueException; |
27 | |
28 | /** |
29 | * @ingroup Database |
30 | * @internal This class should not be used outside of Database |
31 | */ |
32 | class TransactionManager { |
33 | /** @var int Transaction is in an error state requiring a full or savepoint rollback */ |
34 | public const STATUS_TRX_ERROR = 1; |
35 | /** @var int Transaction is active and in a normal state */ |
36 | public const STATUS_TRX_OK = 2; |
37 | /** @var int No transaction is active */ |
38 | public const STATUS_TRX_NONE = 3; |
39 | |
40 | /** Session is in an error state requiring a reset */ |
41 | public const STATUS_SESS_ERROR = 1; |
42 | /** Session is in a normal state */ |
43 | public const STATUS_SESS_OK = 2; |
44 | |
45 | /** @var float Guess of how many seconds it takes to replicate a small insert */ |
46 | private const TINY_WRITE_SEC = 0.010; |
47 | /** @var float Consider a write slow if it took more than this many seconds */ |
48 | private const SLOW_WRITE_SEC = 0.500; |
49 | /** @var int Assume an insert of this many rows or less should be fast to replicate */ |
50 | private const SMALL_WRITE_ROWS = 100; |
51 | |
52 | /** @var string Prefix to the atomic section counter used to make savepoint IDs */ |
53 | private const SAVEPOINT_PREFIX = 'wikimedia_rdbms_atomic'; |
54 | |
55 | /** @var TransactionIdentifier|null Application-side ID of the active transaction; null if none */ |
56 | private $trxId; |
57 | /** @var float|null UNIX timestamp at the time of BEGIN for the last transaction */ |
58 | private $trxTimestamp = null; |
59 | /** @var float|null Round trip time estimate for queries during this transaction */ |
60 | private $trxRoundTripDelay = null; |
61 | /** @var int Transaction status */ |
62 | private $trxStatus = self::STATUS_TRX_NONE; |
63 | /** @var Throwable|null The cause of any unresolved transaction state error, or, null */ |
64 | private $trxStatusCause; |
65 | /** @var array|null Details of the last statement-rollback error for the last transaction */ |
66 | private $trxStatusIgnoredCause; |
67 | |
68 | /** @var Throwable|null The cause of any unresolved session state loss error, or, null */ |
69 | private $sessionError; |
70 | |
71 | /** @var string[] Write query callers of the current transaction */ |
72 | private $trxWriteCallers = []; |
73 | /** @var float Seconds spent in write queries for the current transaction */ |
74 | private $trxWriteDuration = 0.0; |
75 | /** @var int Number of write queries for the current transaction */ |
76 | private $trxWriteQueryCount = 0; |
77 | /** @var int Number of rows affected by write queries for the current transaction */ |
78 | private $trxWriteAffectedRows = 0; |
79 | /** @var float Like trxWriteDuration but excludes lock-bound, easy to replicate, queries */ |
80 | private $trxWriteAdjDuration = 0.0; |
81 | /** @var int Number of write queries counted in trxWriteAdjDuration */ |
82 | private $trxWriteAdjQueryCount = 0; |
83 | |
84 | /** @var array List of (name, unique ID, savepoint ID) for each active atomic section level */ |
85 | private $trxAtomicLevels = []; |
86 | /** @var bool Whether the current transaction was started implicitly due to DBO_TRX */ |
87 | private $trxAutomatic = false; |
88 | /** @var bool Whether the current transaction was started implicitly by startAtomic() */ |
89 | private $trxAutomaticAtomic = false; |
90 | |
91 | /** @var string|null Name of the function that started the last transaction */ |
92 | private $trxFname = null; |
93 | /** @var bool Whether possible write queries were done in the last transaction started */ |
94 | private $trxDoneWrites = false; |
95 | /** @var int Counter for atomic savepoint identifiers (reset with each transaction) */ |
96 | private $trxAtomicCounter = 0; |
97 | |
98 | /** @var array[] List of (callable, method name, atomic section id) */ |
99 | private $trxPostCommitOrIdleCallbacks = []; |
100 | /** @var array[] List of (callable, method name, atomic section id) */ |
101 | private $trxPreCommitOrIdleCallbacks = []; |
102 | /** |
103 | * @var array[] List of (callable, method name, atomic section id) |
104 | * @phan-var array<array{0:callable,1:string,2:AtomicSectionIdentifier|null}> |
105 | */ |
106 | private $trxEndCallbacks = []; |
107 | /** @var array[] List of (callable, method name, atomic section id) */ |
108 | private $trxSectionCancelCallbacks = []; |
109 | /** @var callable[] Map of (name => callable) */ |
110 | private $trxRecurringCallbacks = []; |
111 | /** @var bool Whether to suppress triggering of transaction end callbacks */ |
112 | private $trxEndCallbacksSuppressed = false; |
113 | |
114 | /** @var LoggerInterface */ |
115 | private $logger; |
116 | /** @var TransactionProfiler */ |
117 | private $profiler; |
118 | |
/**
 * @param LoggerInterface|null $logger Logger for debug/error messages; a NullLogger is
 *   used when omitted
 * @param TransactionProfiler|null $profiler Profiler to notify; a new TransactionProfiler
 *   is created when omitted
 */
public function __construct( ?LoggerInterface $logger = null, $profiler = null ) {
	// The nullable type is spelled out explicitly; relying on "= null" to make a typed
	// parameter nullable is deprecated as of PHP 8.4.
	$this->logger = $logger ?? new NullLogger();
	$this->profiler = $profiler ?? new TransactionProfiler();
}
123 | |
/**
 * Get the transaction level as tracked by this manager
 *
 * @return int 1 if a transaction identifier is currently set, 0 otherwise
 */
public function trxLevel() {
	if ( $this->trxId ) {
		return 1;
	}

	return 0;
}
127 | |
/**
 * Assign a fresh transaction identifier and reset all per-transaction tracking fields
 *
 * TODO: This should be removed once all usages have been migrated here
 *
 * @param string $mode One of IDatabase::TRANSACTION_* values
 * @param string $fname method name
 * @param float $rtt Trivial query round-trip-delay
 */
public function newTrxId( $mode, $fname, $rtt ) {
	// Identity of the new transaction
	$this->trxId = new TransactionIdentifier();
	$this->trxFname = $fname;

	// The status starts out clean
	$this->trxStatus = self::STATUS_TRX_OK;
	$this->trxStatusCause = null;
	$this->trxStatusIgnoredCause = null;

	// No writes have been seen yet
	$this->trxDoneWrites = false;
	$this->trxWriteCallers = [];
	$this->trxWriteDuration = 0.0;
	$this->trxWriteQueryCount = 0;
	$this->trxWriteAffectedRows = 0;
	$this->trxWriteAdjDuration = 0.0;
	$this->trxWriteAdjQueryCount = 0;

	// No atomic sections are open and savepoint numbering restarts
	$this->trxAtomicLevels = [];
	$this->trxAtomicCounter = 0;

	// T147697: make explicitTrxActive() return true until begin() finishes. This way,
	// no caller triggered by getApproximateLagStatus() will think its OK to muck around
	// with the transaction just because startAtomic() has not yet finished updating the
	// tracking fields (e.g. trxAtomicLevels).
	$this->trxAutomatic = ( $mode === IDatabase::TRANSACTION_INTERNAL );
	$this->trxAutomaticAtomic = false;

	// Timing information
	$this->trxTimestamp = microtime( true );
	$this->trxRoundTripDelay = $rtt;
}
158 | |
/**
 * Get the application-side identifier object of the active transaction
 *
 * @return TransactionIdentifier|null Token for the active transaction; null if there
 *   isn't one
 */
public function getTrxId() {
	return $this->trxId;
}
167 | |
/**
 * Clear the application-side transaction identifier and hand back the previous one
 *
 * The atomic savepoint counter is reset to zero as well.
 *
 * This will become private soon.
 * @return TransactionIdentifier|null The old transaction token; null if there wasn't one
 */
public function consumeTrxId() {
	$previousId = $this->trxId;

	$this->trxAtomicCounter = 0;
	$this->trxId = null;

	return $previousId;
}
181 | |
/**
 * @return float|null The timestamp recorded when the transaction identifier was assigned;
 *   null if no transaction is active
 */
public function trxTimestamp(): ?float {
	if ( !$this->trxLevel() ) {
		return null;
	}

	return $this->trxTimestamp;
}
185 | |
/**
 * Get the tracked status of the transaction
 *
 * @return int One of the STATUS_TRX_* class constants
 */
public function trxStatus(): int {
	return $this->trxStatus;
}
192 | |
/**
 * Mark the transaction as STATUS_TRX_OK and forget any recorded error causes
 */
public function setTrxStatusToOk() {
	$this->trxStatusIgnoredCause = null;
	$this->trxStatusCause = null;
	$this->trxStatus = self::STATUS_TRX_OK;
}
198 | |
/**
 * Mark the transaction as STATUS_TRX_NONE and forget any recorded error causes
 */
public function setTrxStatusToNone() {
	$this->trxStatusIgnoredCause = null;
	$this->trxStatusCause = null;
	$this->trxStatus = self::STATUS_TRX_NONE;
}
204 | |
/**
 * Refuse to proceed while the transaction is in the error state, and report any
 * previously ignored statement error through the given deprecation logger
 *
 * @param IDatabase $db Handle passed to the thrown exception
 * @param callable $deprecationLogger Invoked with a single message string
 * @param string $fname Name of the calling method
 * @throws DBTransactionStateError If the status is below STATUS_TRX_OK
 */
public function assertTransactionStatus( IDatabase $db, $deprecationLogger, $fname ) {
	if ( $this->trxStatus < self::STATUS_TRX_OK ) {
		throw new DBTransactionStateError(
			$db,
			"Cannot execute query from $fname while transaction status is ERROR",
			[],
			$this->trxStatusCause
		);
	}

	if ( $this->trxStatus !== self::STATUS_TRX_OK || !$this->trxStatusIgnoredCause ) {
		// Nothing was ignored, so there is nothing to report
		return;
	}

	[ $iLastError, $iLastErrno, $iFname ] = $this->trxStatusIgnoredCause;
	$message = "Caller from $fname ignored an error originally raised from $iFname: " .
		"[$iLastErrno] $iLastError";
	call_user_func( $deprecationLogger, $message );
	// Only report each ignored error once
	$this->trxStatusIgnoredCause = null;
}
222 | |
/**
 * Refuse to proceed while an unresolved session error is recorded
 *
 * @param IDatabase $db Handle passed to the thrown exception
 * @param string $fname Name of the calling method
 * @throws DBSessionStateError If a session error is currently set
 */
public function assertSessionStatus( IDatabase $db, $fname ) {
	if ( !$this->sessionError ) {
		return;
	}

	throw new DBSessionStateError(
		$db,
		"Cannot execute query from $fname while session status is ERROR",
		[],
		$this->sessionError
	);
}
233 | |
/**
 * Flag the transaction as STATUS_TRX_ERROR (requiring rollback) because of an error
 *
 * Nothing changes if the status is already STATUS_TRX_ERROR, so the first recorded
 * cause is kept.
 *
 * @param Throwable $trxError
 */
public function setTransactionError( Throwable $trxError ) {
	if ( $this->trxStatus <= self::STATUS_TRX_ERROR ) {
		return;
	}

	$this->trxStatusCause = $trxError;
	$this->trxStatus = self::STATUS_TRX_ERROR;
}
245 | |
/**
 * Record (or clear, with null) the details of a statement error that was ignored
 *
 * @param array|null $trxStatusIgnoredCause
 */
public function setTrxStatusIgnoredCause( ?array $trxStatusIgnoredCause ): void {
	$this->trxStatusIgnoredCause = $trxStatusIgnoredCause;
}
252 | |
/**
 * Get the status of the current session (ephemeral server-side state tied to the connection)
 *
 * @return int One of the STATUS_SESS_* class constants
 */
public function sessionStatus() {
	// An unresolved error means the session is still in the error state
	if ( $this->sessionError ) {
		return self::STATUS_SESS_ERROR;
	}

	return self::STATUS_SESS_OK;
}
262 | |
/**
 * Flag the session as needing a reset due to an error, unless it is flagged already
 *
 * @param Throwable $sessionError
 */
public function setSessionError( Throwable $sessionError ) {
	if ( $this->sessionError === null ) {
		$this->sessionError = $sessionError;
	}
}
271 | |
/**
 * Remove the flag that marks the session as needing a reset due to an error
 */
public function clearSessionError() {
	$this->sessionError = null;
}
278 | |
/**
 * Estimate the replica apply time from the trxWrite* counters
 *
 * @param float $rtt Round-trip time subtracted once per counted ("adjusted") write query
 * @return float Time to apply writes to replicas based on trxWrite* fields
 */
private function calculateLastTrxApplyTime( float $rtt ) {
	// Queries left out of the adjusted counters still count as something at least
	$omittedCount = $this->trxWriteQueryCount - $this->trxWriteAdjQueryCount;
	// Remove the network overhead from the adjusted duration
	$withoutRtt = $this->trxWriteAdjDuration - ( $this->trxWriteAdjQueryCount * $rtt );

	return max( $withoutRtt, 0 ) + ( self::TINY_WRITE_SEC * $omittedCount );
}
292 | |
/**
 * @return string[] Recorded write query callers; empty if no transaction is active
 */
public function pendingWriteCallers() {
	if ( !$this->trxLevel() ) {
		return [];
	}

	return $this->trxWriteCallers;
}
296 | |
/**
 * Update the estimated run-time of a query, not counting large row lock times
 *
 * LoadBalancer can be set to rollback transactions that will create huge replication
 * lag. It bases this estimate off of pendingWriteQueryDuration(). Certain simple
 * queries, like inserting a row can take a long time due to row locking. This method
 * uses some simple heuristics to discount those cases.
 *
 * @param string $queryVerb action in the write query
 * @param float $runtime Total runtime, including RTT
 * @param int $affected Affected row count
 * @param string $fname method name invoking the action
 */
public function updateTrxWriteQueryReport( $queryVerb, $runtime, $affected, $fname ) {
	$slow = $runtime > self::SLOW_WRITE_SEC;

	// Whether this is indicative of replica DB runtime (except for RBR or ws_repl).
	// insert(), upsert(), replace() are fast unless bulky in size or blocked on locks
	if ( $slow && $queryVerb === 'INSERT' ) {
		$countsTowardApplyTime = $affected > self::SMALL_WRITE_ROWS;
	} elseif ( $slow && $queryVerb === 'REPLACE' ) {
		$countsTowardApplyTime = $affected > self::SMALL_WRITE_ROWS / 2;
	} else {
		$countsTowardApplyTime = true;
	}

	// Raw totals always include the query
	$this->trxWriteCallers[] = $fname;
	$this->trxWriteDuration += $runtime;
	$this->trxWriteAffectedRows += $affected;
	$this->trxWriteQueryCount++;

	// Adjusted totals only include queries deemed indicative
	if ( $countsTowardApplyTime ) {
		$this->trxWriteAdjDuration += $runtime;
		$this->trxWriteAdjQueryCount++;
	}
}
332 | |
/**
 * Get the time spent in write queries for the active transaction
 *
 * @param string $type IDatabase::ESTIMATE_* constant; ESTIMATE_DB_APPLY selects the
 *   adjusted apply-time estimate, anything else the raw total
 * @return float|false False if no transaction is active; 0.0 if no writes were flagged
 */
public function pendingWriteQueryDuration( $type = IDatabase::ESTIMATE_TOTAL ) {
	if ( !$this->trxLevel() ) {
		return false;
	}

	if ( !$this->trxDoneWrites ) {
		return 0.0;
	}

	return ( $type === IDatabase::ESTIMATE_DB_APPLY )
		? $this->calculateLastTrxApplyTime( $this->trxRoundTripDelay )
		: $this->trxWriteDuration;
}
346 | |
/**
 * Build a human-readable list of the names of all open atomic sections
 *
 * Each entry of trxAtomicLevels holds the section name at index 0; the names are
 * joined with ", " in order. An empty list yields an empty string rather than null,
 * so the result always matches the documented return type.
 *
 * @return string
 */
private function flatAtomicSectionList() {
	return implode( ', ', array_column( $this->trxAtomicLevels, 0 ) );
}
355 | |
/**
 * Forget all tracked atomic section levels
 */
public function resetTrxAtomicLevels() {
	$this->trxAtomicLevels = [];
}
359 | |
/**
 * @return bool Whether a transaction is active and either has open atomic sections
 *   or is not flagged as automatic
 */
public function explicitTrxActive() {
	if ( !$this->trxLevel() ) {
		return false;
	}

	return $this->trxAtomicLevels || !$this->trxAutomatic;
}
363 | |
/**
 * Check whether the tracked transaction state allows the connection to be closed
 *
 * @param IDatabase $db Handle asked whether writes or callbacks are pending
 * @param string $fname Name of the calling method
 * @return string|null Description of the problem; null if none was found
 */
public function trxCheckBeforeClose( IDatabase $db, $fname ) {
	if ( $this->trxAtomicLevels ) {
		// Cannot let incomplete atomic sections be committed
		$levels = $this->flatAtomicSectionList();

		return "$fname: atomic sections $levels are still open";
	}

	if ( !$this->trxAutomatic ) {
		// Manual transactions should have been committed or rolled
		// back, even if empty.
		return "$fname: transaction is still open (from {$this->trxFname})";
	}

	// Only the connection manager can commit non-empty DBO_TRX transactions
	// (empty ones we can silently roll back)
	if ( $db->writesOrCallbacksPending() ) {
		return "$fname: " .
			"expected mass rollback of all peer transactions (DBO_TRX set)";
	}

	if ( $this->trxEndCallbacksSuppressed ) {
		return "$fname: callbacks are suppressed; cannot properly commit";
	}

	return null;
}
389 | |
/**
 * Queue a callback owned by the current (topmost) atomic section
 *
 * @param IDatabase $db Handle passed to the thrown exception
 * @param callable $callback
 * @param string $fname Name of the calling method
 * @throws DBUnexpectedError If no transaction or no atomic section is open
 */
public function onAtomicSectionCancel( IDatabase $db, $callback, $fname ): void {
	if ( !( $this->trxLevel() && $this->trxAtomicLevels ) ) {
		throw new DBUnexpectedError( $db, "No atomic section is open (got $fname)" );
	}

	$ownerId = $this->currentAtomicSectionId();
	$this->trxSectionCancelCallbacks[] = [ $callback, $fname, $ownerId ];
}
396 | |
/**
 * Verify that an atomic section is open before one gets cancelled
 *
 * @param IDatabase $db Handle passed to the thrown exception
 * @param string $fname Name of the calling method
 * @throws DBUnexpectedError If no transaction or no atomic section is open
 */
public function onCancelAtomicBeforeCriticalSection( IDatabase $db, $fname ): void {
	if ( $this->trxLevel() && $this->trxAtomicLevels ) {
		return;
	}

	throw new DBUnexpectedError( $db, "No atomic section is open (got $fname)" );
}
402 | |
/**
 * @return AtomicSectionIdentifier|null ID of the topmost atomic section level; null if
 *   no transaction is active or no section is open
 */
public function currentAtomicSectionId(): ?AtomicSectionIdentifier {
	if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
		return null;
	}

	// Each level is (name, section ID, savepoint ID); the last one is the topmost
	$topLevel = end( $this->trxAtomicLevels );

	return $topLevel[1];
}
415 | |
/**
 * Push a new atomic section level and log a debug message about entering it
 *
 * @param string $fname Name of the section
 * @param AtomicSectionIdentifier $sectionId
 * @param string|null $savepointId Savepoint ID tied to the section
 */
public function addToAtomicLevels( $fname, AtomicSectionIdentifier $sectionId, $savepointId ) {
	// The new entry lands at the index equal to the current size of the list
	$level = count( $this->trxAtomicLevels );
	$this->trxAtomicLevels[] = [ $fname, $sectionId, $savepointId ];

	$this->logger->debug(
		'startAtomic: entering level ' . $level . " ($fname)",
		[ 'db_log_category' => 'trx' ]
	);
}
421 | |
/**
 * Reject an explicit BEGIN; this method always throws, with a message that depends
 * on the tracked transaction state
 *
 * @param IDatabase $db Handle passed to the thrown exception
 * @param string $fname Name of the calling method
 * @throws DBUnexpectedError
 */
public function onBeginTransaction( IDatabase $db, $fname ): void {
	// @phan-suppress-previous-line PhanPluginNeverReturnMethod
	if ( $this->trxAtomicLevels ) {
		$levels = $this->flatAtomicSectionList();
		$msg = "$fname: got explicit BEGIN while atomic section(s) $levels are open";
	} elseif ( $this->trxAutomatic ) {
		$msg = "$fname: implicit transaction already active (from {$this->trxFname})";
	} else {
		$msg = "$fname: explicit transaction already active (from {$this->trxFname})";
	}

	throw new DBUnexpectedError( $db, $msg );
}
436 | |
/**
 * Validate a COMMIT request against the tracked transaction state
 *
 * @param IDatabase $db Handle passed to any thrown exception
 * @param string $fname
 * @param string $flush one of IDatabase::FLUSHING_* values
 * @return bool false if the commit should go aborted, true otherwise.
 * @throws DBUnexpectedError If atomic sections are still open, or the kind of commit
 *   does not match the kind of transaction
 */
public function onCommit( IDatabase $db, $fname, $flush ): bool {
	$trxActive = (bool)$this->trxLevel();

	if ( $trxActive && $this->trxAtomicLevels ) {
		// There are still atomic sections open; this cannot be ignored
		$levels = $this->flatAtomicSectionList();
		throw new DBUnexpectedError(
			$db,
			"$fname: got COMMIT while atomic sections $levels are still open"
		);
	}

	$isFlush = in_array(
		$flush,
		[ IDatabase::FLUSHING_INTERNAL, IDatabase::FLUSHING_ALL_PEERS ],
		true
	);

	if ( !$trxActive ) {
		if ( !$isFlush ) {
			// Only a non-flush commit without a transaction is worth an error entry
			$this->logger->error(
				"$fname: no transaction to commit, something got out of sync",
				[
					'exception' => new RuntimeException(),
					'db_log_category' => 'trx'
				]
			);
		}

		return false; // nothing to do
	}

	if ( $isFlush && !$this->trxAutomatic ) {
		throw new DBUnexpectedError(
			$db,
			"$fname: flushing an explicit transaction, getting out of sync"
		);
	}

	if ( !$isFlush && $this->trxAutomatic ) {
		throw new DBUnexpectedError(
			$db,
			"$fname: expected mass commit of all peer transactions (DBO_TRX set)"
		);
	}

	return true;
}
480 | |
/**
 * Verify that the topmost atomic section is the one named $fname and return its IDs
 *
 * The section is not removed from the list here.
 *
 * @param IDatabase $db Handle passed to any thrown exception
 * @param string $fname Name of the section being ended
 * @return array (savepoint ID, section ID) of the topmost section
 * @throws DBUnexpectedError If no section is open or the topmost one has another name
 */
public function onEndAtomic( IDatabase $db, $fname ): array {
	if ( !( $this->trxLevel() && $this->trxAtomicLevels ) ) {
		throw new DBUnexpectedError( $db, "No atomic section is open (got $fname)" );
	}

	// Look at the topmost section, stored as (name, section ID, savepoint ID)
	$pos = count( $this->trxAtomicLevels ) - 1;
	$topLevel = $this->trxAtomicLevels[$pos];
	$this->logger->debug( "endAtomic: leaving level $pos ($fname)", [ 'db_log_category' => 'trx' ] );

	$savedFname = $topLevel[0];
	if ( $savedFname !== $fname ) {
		throw new DBUnexpectedError(
			$db,
			"Invalid atomic section ended (got $fname but expected $savedFname)"
		);
	}

	return [ $topLevel[2], $topLevel[1] ];
}
499 | |
/**
 * Find the position of an atomic section within the list of open levels
 *
 * @param AtomicSectionIdentifier|null $sectionId Section to look for
 * @return int|null Null if $sectionId is null; otherwise the index of the last level
 *   with that section ID, or -1 if no level has it
 */
public function getPositionFromSectionId( ?AtomicSectionIdentifier $sectionId = null ): ?int {
	// The nullable type is explicit; implicit nullability via "= null" is deprecated
	// as of PHP 8.4.
	if ( $sectionId === null ) {
		return null;
	}

	// Find the (last) section with the given $sectionId
	$pos = -1;
	foreach ( $this->trxAtomicLevels as $i => [ , $asId, ] ) {
		if ( $asId === $sectionId ) {
			$pos = $i;
		}
	}

	return $pos;
}
515 | |
/**
 * Drop all atomic section levels above the given position and describe what is left
 *
 * The section that ends up topmost is reported but not removed here.
 *
 * @param int|null $pos Index of the section being cancelled; null leaves the list as is
 * @return array (name of the topmost section, IDs of the removed descendant sections,
 *   ID of the topmost section from currentAtomicSectionId(), section ID stored in the
 *   topmost entry, savepoint ID stored in the topmost entry)
 */
public function cancelAtomic( $pos ) {
	$excisedIds = [];
	$excisedFnames = [];

	if ( $pos !== null ) {
		// Remove all descendant sections and re-index the array
		$len = count( $this->trxAtomicLevels );
		for ( $i = $pos + 1; $i < $len; ++$i ) {
			$excisedFnames[] = $this->trxAtomicLevels[$i][0];
			$excisedIds[] = $this->trxAtomicLevels[$i][1];
		}
		$this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos + 1 );
	}
	// Evaluated after any truncation, so this reflects the remaining levels
	$newTopSection = $this->currentAtomicSectionId();

	// Read the details of the section that is now topmost
	$topPos = count( $this->trxAtomicLevels ) - 1;
	[ $savedFname, $savedSectionId, $savepointId ] = $this->trxAtomicLevels[$topPos];

	$message = "cancelAtomic: canceling level $topPos ($savedFname)";
	if ( $excisedFnames ) {
		$message .= " " . "and descendants " . implode( ', ', $excisedFnames );
	}
	$this->logger->debug( $message, [ 'db_log_category' => 'trx' ] );

	return [ $savedFname, $excisedIds, $newTopSection, $savedSectionId, $savepointId ];
}
548 | |
/**
 * Remove the topmost atomic section level, if there is one
 */
public function popAtomicLevel() {
	array_pop( $this->trxAtomicLevels );
}
552 | |
/**
 * @return bool Whether no atomic section is open and the automatic-atomic flag is set
 */
public function isClean() {
	return $this->trxAutomaticAtomic && !$this->trxAtomicLevels;
}
556 | |
/**
 * Set the flag telling whether the current transaction was started implicitly by
 * startAtomic()
 *
 * @param bool $value
 */
public function setAutomaticAtomic( $value ) {
	$this->trxAutomaticAtomic = $value;
}
560 | |
/**
 * Flag the current transaction as automatic
 */
public function turnOnAutomatic() {
	$this->trxAutomatic = true;
}
564 | |
/**
 * Advance the atomic section counter and build the next savepoint identifier from it
 *
 * @param IDatabase $db Handle passed to the thrown exception
 * @param string $fname Name of the calling method
 * @return string SAVEPOINT_PREFIX followed by the new counter value
 * @throws DBUnexpectedError If the identifier would be longer than 30 characters
 */
public function nextSavePointId( IDatabase $db, $fname ) {
	$this->trxAtomicCounter++;
	$savepointId = self::SAVEPOINT_PREFIX . $this->trxAtomicCounter;

	// 30 == Oracle's identifier length limit (pre 12c)
	// With a 22 character prefix, that puts the highest number at 99999999.
	if ( strlen( $savepointId ) <= 30 ) {
		return $savepointId;
	}

	throw new DBUnexpectedError(
		$db,
		'There have been an excessively large number of atomic sections in a transaction'
		. " started by $this->trxFname (at $fname)"
	);
}
579 | |
/**
 * @return bool Whether a transaction is active and has been flagged as having writes
 */
public function writesPending() {
	if ( !$this->trxLevel() ) {
		return false;
	}

	return $this->trxDoneWrites;
}
583 | |
/**
 * Raise a PHP notice via trigger_error() if a transaction flagged as having writes
 * is still active
 */
public function onDestruct() {
	if ( !$this->trxLevel() || !$this->trxDoneWrites ) {
		return;
	}

	trigger_error( "Uncommitted DB writes (transaction from {$this->trxFname})" );
}
589 | |
/**
 * Flag the active transaction as having done writes and notify the profiler
 *
 * Does nothing if no transaction is active or the flag is already set.
 *
 * @param string $serverName
 * @param string $domainId
 */
public function transactionWritingIn( $serverName, $domainId ) {
	if ( !$this->trxLevel() || $this->trxDoneWrites ) {
		return;
	}

	$this->trxDoneWrites = true;
	$this->profiler->transactionWritingIn( $serverName, $domainId, (string)$this->trxId );
}
600 | |
/**
 * Tell the profiler that a transaction with writes has ended
 *
 * Does nothing unless the transaction was flagged as having done writes.
 *
 * @param IDatabase $db Handle providing the server name and domain ID
 * @param string $oldId Transaction ID to report to the profiler
 */
public function transactionWritingOut( IDatabase $db, $oldId ) {
	if ( !$this->trxDoneWrites ) {
		return;
	}

	$serverName = $db->getServerName();
	$domainId = $db->getDomainID();
	$writeTime = $this->pendingWriteQueryDuration( IDatabase::ESTIMATE_TOTAL );

	$this->profiler->transactionWritingOut(
		$serverName,
		$domainId,
		$oldId,
		$writeTime,
		$this->trxWriteAffectedRows
	);
}
612 | |
/**
 * Forward the details of a completed query to the profiler, adding the string form
 * of the current transaction ID
 *
 * @param mixed $sql Passed through to the profiler unchanged
 * @param mixed $startTime Passed through to the profiler unchanged
 * @param mixed $isPermWrite Passed through to the profiler unchanged
 * @param mixed $rowCount Passed through to the profiler unchanged
 * @param mixed $serverName Passed through to the profiler unchanged
 */
public function recordQueryCompletion( $sql, $startTime, $isPermWrite, $rowCount, $serverName ) {
	$trxIdString = (string)$this->trxId;

	$this->profiler->recordQueryCompletion(
		$sql,
		$startTime,
		$isPermWrite,
		$rowCount,
		$trxIdString,
		$serverName
	);
}
623 | |
/**
 * Queue a transaction end callback owned by the current atomic section (if any)
 *
 * @param IDatabase $db Handle passed to the thrown exception
 * @param callable $callback
 * @param string $fname Name of the calling method
 * @throws DBUnexpectedError If no transaction is active
 */
public function onTransactionResolution( IDatabase $db, callable $callback, $fname ) {
	if ( !$this->trxLevel() ) {
		throw new DBUnexpectedError( $db, "No transaction is active" );
	}

	$ownerId = $this->currentAtomicSectionId();
	$this->trxEndCallbacks[] = [ $callback, $fname, $ownerId ];
}
630 | |
/**
 * Queue a post-commit-or-idle callback owned by the current atomic section (if any)
 *
 * @param callable $callback
 * @param string $fname Name of the calling method
 */
public function addPostCommitOrIdleCallback( callable $callback, $fname = __METHOD__ ) {
	$ownerId = $this->currentAtomicSectionId();
	$this->trxPostCommitOrIdleCallbacks[] = [ $callback, $fname, $ownerId ];
}
638 | |
/**
 * Queue a pre-commit-or-idle callback owned by the current atomic section (if any)
 *
 * @param callable $callback
 * @param string $fname Name of the calling method
 */
final public function addPreCommitOrIdleCallback( callable $callback, $fname = __METHOD__ ) {
	$ownerId = $this->currentAtomicSectionId();
	$this->trxPreCommitOrIdleCallbacks[] = [ $callback, $fname, $ownerId ];
}
646 | |
/**
 * Register or remove a named recurring transaction callback
 *
 * @param string $name Key under which the callback is stored
 * @param callable|null $callback Callback to store; null (or omitted) removes any
 *   callback stored under $name
 */
public function setTransactionListener( $name, ?callable $callback = null ) {
	// The nullable type is explicit; implicit nullability via "= null" is deprecated
	// as of PHP 8.4.
	if ( $callback ) {
		$this->trxRecurringCallbacks[$name] = $callback;
	} else {
		unset( $this->trxRecurringCallbacks[$name] );
	}
}
654 | |
/**
 * Set whether running of post-COMMIT/ROLLBACK callbacks is disabled
 *
 * @param bool $suppress
 */
public function setTrxEndCallbackSuppression( bool $suppress ) {
	$this->trxEndCallbacksSuppressed = $suppress;
}
662 | |
/**
 * Hoist callback ownership for callbacks in a section to a parent section.
 * All callbacks should have an owner that is present in trxAtomicLevels.
 *
 * Every entry of the four callback lists whose owner is $old gets $new as owner.
 *
 * @param AtomicSectionIdentifier $old
 * @param AtomicSectionIdentifier $new
 */
public function reassignCallbacksForSection(
	AtomicSectionIdentifier $old,
	AtomicSectionIdentifier $new
) {
	// Returns a copy of the list with the owner (index 2) swapped where it is $old;
	// array keys are left untouched
	$rehome = static function ( array $callbacks ) use ( $old, $new ) {
		foreach ( $callbacks as $key => $info ) {
			if ( $info[2] === $old ) {
				$callbacks[$key][2] = $new;
			}
		}

		return $callbacks;
	};

	$this->trxPreCommitOrIdleCallbacks = $rehome( $this->trxPreCommitOrIdleCallbacks );
	$this->trxPostCommitOrIdleCallbacks = $rehome( $this->trxPostCommitOrIdleCallbacks );
	$this->trxEndCallbacks = $rehome( $this->trxEndCallbacks );
	$this->trxSectionCancelCallbacks = $rehome( $this->trxSectionCancelCallbacks );
}
694 | |
/**
 * Update callbacks that were owned by cancelled atomic sections.
 *
 * Callbacks for "on commit" should never be run if they're owned by a
 * section that won't be committed, so they are dropped from both the
 * post-commit and the pre-commit queues.
 *
 * Callbacks for "on resolution" need to reflect that the section was
 * rolled back, even if the transaction as a whole commits successfully.
 * They are wrapped so that they always receive IDatabase::TRIGGER_ROLLBACK,
 * and their owner is cleared.
 *
 * Callbacks for "on section cancel" should already have been consumed,
 * but errors during the cancellation itself can prevent that while still
 * destroying the section. Any such callbacks are hoisted to the new top
 * section, which we assume will itself have to be cancelled or rolled back
 * to resolve the error.
 *
 * @param IDatabase $db Handle passed to the wrapped "on resolution" callbacks
 * @param AtomicSectionIdentifier[] $sectionIds IDs of the cancelled sections
 * @param AtomicSectionIdentifier|null $newSectionId New top section ID, if any
 */
public function modifyCallbacksForCancel(
	IDatabase $db,
	array $sectionIds,
	?AtomicSectionIdentifier $newSectionId = null
) {
	// Keeps only the entries whose owner (index 2) is not a cancelled section.
	// array_filter() preserves the original keys of the surviving entries.
	$notOwnedByCancelledSection = static function ( $entry ) use ( $sectionIds ) {
		return !in_array( $entry[2], $sectionIds, true );
	};

	// Cancel the "on commit" callbacks owned by the cancelled sections
	$this->trxPostCommitOrIdleCallbacks = array_filter(
		$this->trxPostCommitOrIdleCallbacks,
		$notOwnedByCancelledSection
	);
	$this->trxPreCommitOrIdleCallbacks = array_filter(
		$this->trxPreCommitOrIdleCallbacks,
		$notOwnedByCancelledSection
	);

	// Make "on resolution" callbacks owned by the cancelled sections perceive a rollback
	foreach ( $this->trxEndCallbacks as $key => $entry ) {
		if ( in_array( $entry[2], $sectionIds, true ) ) {
			$callback = $entry[0];
			// The wrapper ignores whatever trigger it is later invoked with
			$this->trxEndCallbacks[$key][0] = static function () use ( $callback, $db ) {
				return $callback( IDatabase::TRIGGER_ROLLBACK, $db );
			};
			// This "on resolution" callback no longer belongs to a section.
			$this->trxEndCallbacks[$key][2] = null;
		}
	}

	// Hoist callback ownership for section cancel callbacks to the new top section
	foreach ( $this->trxSectionCancelCallbacks as $key => $entry ) {
		if ( in_array( $entry[2], $sectionIds, true ) ) {
			$this->trxSectionCancelCallbacks[$key][2] = $newSectionId;
		}
	}
}
751 | |
/**
 * Empty the post-transaction callback queues and return the entries to run.
 *
 * The result holds the "post-commit or idle" entries followed by the
 * "on resolution" entries. The "section cancel" entries are appended only
 * when $trigger is IDatabase::TRIGGER_ROLLBACK; for any other trigger they
 * are discarded without being returned. All three queues are empty afterwards.
 *
 * @param int $trigger IDatabase::TRIGGER_* constant
 * @return array[] Callback entries, re-indexed by array_merge()
 */
public function consumeEndCallbacks( $trigger ): array {
	$sectionCancelEntries = [];
	if ( $trigger === IDatabase::TRIGGER_ROLLBACK ) {
		$sectionCancelEntries = $this->trxSectionCancelCallbacks;
	}

	$entriesToRun = array_merge(
		$this->trxPostCommitOrIdleCallbacks,
		$this->trxEndCallbacks,
		$sectionCancelEntries
	);

	// Mark everything as consumed; this also guards against recursion
	$this->trxPostCommitOrIdleCallbacks = [];
	$this->trxEndCallbacks = [];
	$this->trxSectionCancelCallbacks = [];

	return $entriesToRun;
}
766 | |
/**
 * Consume and run any relevant "on atomic section cancel" callbacks for the active transaction
 *
 * Entries owned by one of the given sections are invoked with ( $trigger, $db ).
 * Entries owned by other sections are set aside and written back to the queue
 * once it has been drained.
 *
 * NOTE: if a callback throws, the exception is recorded through
 * setTransactionError() and re-thrown before the write-back happens, so the
 * entries that were set aside are not restored to the queue in that case.
 *
 * @param IDatabase $db
 * @param int $trigger IDatabase::TRIGGER_* constant
 * @param AtomicSectionIdentifier[] $sectionIds IDs of the sections that were just cancelled
 * @throws Throwable Any exception thrown by a callback
 */
public function runOnAtomicSectionCancelCallbacks( IDatabase $db, int $trigger, array $sectionIds ) {
	$entriesToKeep = [];

	// Keep draining: a callback may queue further section cancel callbacks
	do {
		$batch = $this->trxSectionCancelCallbacks;
		$this->trxSectionCancelCallbacks = []; // consumed (recursion guard)
		foreach ( $batch as $entry ) {
			if ( !in_array( $entry[2], $sectionIds, true ) ) {
				// Owned by a section that was not cancelled; run it some other time
				$entriesToKeep[] = $entry;
				continue;
			}
			try {
				// @phan-suppress-next-line PhanUndeclaredInvokeInCallable
				$entry[0]( $trigger, $db );
			} catch ( Throwable $trxError ) {
				$this->setTransactionError( $trxError );
				throw $trxError;
			}
		}
		// @phan-suppress-next-line PhanImpossibleConditionInLoop
	} while ( $this->trxSectionCancelCallbacks );

	$this->trxSectionCancelCallbacks = $entriesToKeep;
}
799 | |
/**
 * Consume and run any "on transaction pre-commit" callbacks
 *
 * The queue is emptied before each batch runs, and the loop repeats for as
 * long as the callbacks themselves queue further pre-commit callbacks. Each
 * callback is invoked with $db as its only argument. A thrown exception is
 * recorded through setTransactionError() and then re-thrown.
 *
 * @param IDatabase $db
 * @return int Number of callbacks attempted
 * @throws Throwable Any exception thrown by a callback
 */
public function runOnTransactionPreCommitCallbacks( IDatabase $db ): int {
	$attempted = 0;

	do {
		$batch = $this->trxPreCommitOrIdleCallbacks;
		$this->trxPreCommitOrIdleCallbacks = []; // consumed (and recursion guard)
		// The whole batch is counted up front, before any callback runs
		$attempted += count( $batch );
		foreach ( $batch as $entry ) {
			try {
				// @phan-suppress-next-line PhanUndeclaredInvokeInCallable
				$entry[0]( $db );
			} catch ( Throwable $trxError ) {
				$this->setTransactionError( $trxError );
				throw $trxError;
			}
		}
		// @phan-suppress-next-line PhanImpossibleConditionInLoop
	} while ( $this->trxPreCommitOrIdleCallbacks );

	return $attempted;
}
829 | |
/**
 * Discard all queued "pre-commit or idle" and "post-commit or idle" callbacks
 * without running them.
 */
public function clearPreEndCallbacks() {
	$this->trxPreCommitOrIdleCallbacks = [];
	$this->trxPostCommitOrIdleCallbacks = [];
}
834 | |
/**
 * Discard all queued "on resolution" and "on section cancel" callbacks
 * without running them.
 */
public function clearEndCallbacks() {
	$this->trxSectionCancelCallbacks = []; // don't copy
	$this->trxEndCallbacks = []; // don't copy
}
839 | |
/**
 * Check whether the current transaction has writes or queued callbacks.
 *
 * @return bool False when trxLevel() is falsy; otherwise true if writes were
 *  done or if any of the four callback queues is non-empty
 */
public function writesOrCallbacksPending(): bool {
	if ( !$this->trxLevel() ) {
		return false;
	}

	if ( $this->trxDoneWrites ) {
		return true;
	}

	return $this->trxPostCommitOrIdleCallbacks
		|| $this->trxPreCommitOrIdleCallbacks
		|| $this->trxEndCallbacks
		|| $this->trxSectionCancelCallbacks;
}
849 | |
/**
 * List the methods that have write queries or callbacks for the current transaction
 *
 * The list starts with the result of pendingWriteCallers(), followed by the
 * caller name (index 1) of every entry in the post-commit, pre-commit,
 * "on resolution" and "on section cancel" queues, in that order.
 *
 * @return string[]
 */
public function pendingWriteAndCallbackCallers(): array {
	$callers = $this->pendingWriteCallers();

	$queuedEntries = array_merge(
		$this->trxPostCommitOrIdleCallbacks,
		$this->trxPreCommitOrIdleCallbacks,
		$this->trxEndCallbacks,
		$this->trxSectionCancelCallbacks
	);
	foreach ( $queuedEntries as $entry ) {
		$callers[] = $entry[1];
	}

	return $callers;
}
870 | |
/**
 * List the methods that have write queries or precommit callbacks for the
 * current transaction
 *
 * The list starts with the result of pendingWriteCallers() and is followed by
 * the caller name (index 1) of every queued "pre-commit or idle" callback.
 *
 * @return string[]
 */
public function pendingPreCommitCallbackCallers(): array {
	$callers = $this->pendingWriteCallers();

	foreach ( $this->trxPreCommitOrIdleCallbacks as $entry ) {
		$callers[] = $entry[1];
	}

	return $callers;
}
884 | |
/**
 * Report the flag that was last set through setTrxEndCallbackSuppression().
 *
 * @return bool Current value of the suppression flag
 */
public function isEndCallbacksSuppressed(): bool {
	return $this->trxEndCallbacksSuppressed;
}
888 | |
/**
 * Get the registry of recurring transaction callbacks.
 *
 * @return callable[] Callbacks keyed by the name they were registered under
 *  via setTransactionListener()
 */
public function getRecurringCallbacks() {
	return $this->trxRecurringCallbacks;
}
892 | |
/**
 * Count the queued "post-commit or idle" callbacks.
 *
 * @return int Number of entries currently in the queue
 */
public function countPostCommitOrIdleCallbacks() {
	return count( $this->trxPostCommitOrIdleCallbacks );
}
896 | |
/**
 * Update the transaction bookkeeping for a rollback.
 *
 * In order: consumes the transaction ID, sets the transaction status to
 * "none", resets the atomic levels, discards the queued pre-commit and
 * post-commit callbacks, and finally calls transactionWritingOut() with the
 * consumed transaction ID. The "on resolution" and "on section cancel"
 * queues are not touched here.
 *
 * @param IDatabase $db Handle forwarded to transactionWritingOut()
 */
public function onRollback( IDatabase $db ) {
	$endedTrxId = (string)$this->consumeTrxId();

	$this->setTrxStatusToNone();
	$this->resetTrxAtomicLevels();
	$this->clearPreEndCallbacks();

	$this->transactionWritingOut( $db, $endedTrxId );
}
904 | |
/**
 * Update the transaction bookkeeping for a commit.
 *
 * Consumes the transaction ID and sets the transaction status to "none".
 * If the transaction did writes, the current time is captured and
 * transactionWritingOut() is called with the consumed transaction ID.
 *
 * @param IDatabase $db Handle forwarded to transactionWritingOut()
 * @return float|null UNIX timestamp (with microseconds) taken when the
 *  transaction did writes; null if it did no writes
 */
public function onCommitInCriticalSection( IDatabase $db ) {
	$endedTrxId = $this->consumeTrxId();
	$this->setTrxStatusToNone();

	if ( !$this->trxDoneWrites ) {
		return null;
	}

	$writeTime = microtime( true );
	$this->transactionWritingOut( $db, (string)$endedTrxId );

	return $writeTime;
}
915 | |
/**
 * Hoist callback ownership for callbacks in the section that just ended.
 *
 * All callbacks should have an owner that is present in trxAtomicLevels, so
 * callbacks owned by $sectionId are handed to the section reported by
 * currentAtomicSectionId(). Nothing happens when that method reports no
 * current section.
 *
 * @param AtomicSectionIdentifier $sectionId ID of the section that just ended
 */
public function onEndAtomicInCriticalSection( $sectionId ) {
	$parentSectionId = $this->currentAtomicSectionId();
	if ( !$parentSectionId ) {
		return;
	}

	$this->reassignCallbacksForSection( $sectionId, $parentSectionId );
}
924 | |
/**
 * Check that flushing the snapshot is acceptable in the current state.
 *
 * Throws when an explicit transaction is active, or when writes or callbacks
 * are still pending. Otherwise a warning is logged if a transaction is open,
 * a transaction round ID was given, and $flush is neither
 * IDatabase::FLUSHING_INTERNAL nor IDatabase::FLUSHING_ALL_PEERS.
 *
 * @param IDatabase $db Handle attached to any thrown error
 * @param string $fname Caller name, used as the prefix of the messages
 * @param string $flush IDatabase::FLUSHING_* constant (NOTE(review): type not
 *  visible here; confirm against the interface)
 * @param string|false|null $trxRoundId Transaction round ID; falsy if none
 * @throws DBUnexpectedError
 */
public function onFlushSnapshot( IDatabase $db, $fname, $flush, $trxRoundId ) {
	if ( $this->explicitTrxActive() ) {
		// Committing this transaction would break callers that assume it is still open
		throw new DBUnexpectedError(
			$db,
			"$fname: Cannot flush snapshot; " .
			"explicit transaction '{$this->trxFname}' is still open"
		);
	}

	if ( $this->writesOrCallbacksPending() ) {
		// This only flushes transactions to clear snapshots, not to write data
		$fnames = implode( ', ', $this->pendingWriteAndCallbackCallers() );
		throw new DBUnexpectedError(
			$db,
			"$fname: Cannot flush snapshot; " .
			"writes from transaction {$this->trxFname} are still pending ($fnames)"
		);
	}

	$unexpectedSingleFlush = $this->trxLevel()
		&& $trxRoundId
		&& !( $flush === IDatabase::FLUSHING_INTERNAL || $flush === IDatabase::FLUSHING_ALL_PEERS );
	if ( $unexpectedSingleFlush ) {
		// The exception object is only created to carry a backtrace in the log context
		$logContext = [
			'exception' => new RuntimeException(),
			'db_log_category' => 'trx'
		];
		$this->logger->warning(
			"$fname: Expected mass snapshot flush of all peer transactions " .
			"in the explicit transactions round '{$trxRoundId}'",
			$logContext
		);
	}
}
957 | |
/**
 * Check that the pre-lock snapshot can be flushed.
 *
 * Throws when the current transaction still has writes or callbacks pending;
 * does nothing otherwise.
 *
 * @param IDatabase $db Handle attached to any thrown error
 * @param string $fname Caller name, used as the prefix of the message
 * @throws DBUnexpectedError
 */
public function onGetScopedLockAndFlush( IDatabase $db, $fname ) {
	if ( !$this->writesOrCallbacksPending() ) {
		return;
	}

	// This only flushes transactions to clear snapshots, not to write data
	$fnames = implode( ', ', $this->pendingWriteAndCallbackCallers() );
	throw new DBUnexpectedError(
		$db,
		"$fname: Cannot flush pre-lock snapshot; " .
		"writes from transaction {$this->trxFname} are still pending ($fnames)"
	);
}
969 | } |