Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 364 |
|
0.00% |
0 / 64 |
CRAP | |
0.00% |
0 / 1 |
TransactionManager | |
0.00% |
0 / 364 |
|
0.00% |
0 / 64 |
21756 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
trxLevel | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
6 | |||
getTrxId | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
consumeTrxId | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
trxTimestamp | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
6 | |||
trxStatus | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
setTrxStatusToOk | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
setTrxStatusToNone | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
assertTransactionStatus | |
0.00% |
0 / 14 |
|
0.00% |
0 / 1 |
20 | |||
assertSessionStatus | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
6 | |||
setTransactionError | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
setTrxStatusIgnoredCause | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
sessionStatus | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
6 | |||
setSessionError | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
clearSessionError | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
calculateLastTrxApplyTime | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
2 | |||
pendingWriteCallers | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
6 | |||
updateTrxWriteQueryReport | |
0.00% |
0 / 13 |
|
0.00% |
0 / 1 |
30 | |||
pendingWriteQueryDuration | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
20 | |||
flatAtomicSectionList | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
resetTrxAtomicLevels | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
explicitTrxActive | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
12 | |||
trxCheckBeforeClose | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
42 | |||
onCancelAtomicBeforeCriticalSection | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
12 | |||
currentAtomicSectionId | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
12 | |||
addToAtomicLevels | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
onBegin | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
20 | |||
onCommit | |
0.00% |
0 / 29 |
|
0.00% |
0 / 1 |
90 | |||
onEndAtomic | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
20 | |||
getPositionFromSectionId | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
20 | |||
cancelAtomic | |
0.00% |
0 / 21 |
|
0.00% |
0 / 1 |
20 | |||
popAtomicLevel | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
setAutomaticAtomic | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
turnOnAutomatic | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
nextSavePointId | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
6 | |||
writesPending | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
6 | |||
onDestruct | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
12 | |||
transactionWritingIn | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
6 | |||
transactionWritingOut | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
6 | |||
recordQueryCompletion | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
2 | |||
onTransactionResolution | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
addPostCommitOrIdleCallback | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
2 | |||
addPreCommitOrIdleCallback | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
2 | |||
setTransactionListener | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
setTrxEndCallbackSuppression | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
reassignCallbacksForSection | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
56 | |||
modifyCallbacksForCancel | |
0.00% |
0 / 19 |
|
0.00% |
0 / 1 |
12 | |||
consumeEndCallbacks | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
2 | |||
runOnTransactionPreCommitCallbacks | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
12 | |||
clearPreEndCallbacks | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
clearEndCallbacks | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
writesOrCallbacksPending | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
30 | |||
pendingWriteAndCallbackCallers | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
12 | |||
pendingPreCommitCallbackCallers | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
isEndCallbacksSuppressed | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getRecurringCallbacks | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
countPostCommitOrIdleCallbacks | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
onBeginInCriticalSection | |
0.00% |
0 / 14 |
|
0.00% |
0 / 1 |
2 | |||
onRollbackInCriticalSection | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
2 | |||
onCommitInCriticalSection | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
6 | |||
onSessionLoss | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
onEndAtomicInCriticalSection | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
onFlushSnapshot | |
0.00% |
0 / 25 |
|
0.00% |
0 / 1 |
56 | |||
onGetScopedLockAndFlush | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
6 |
1 | <?php |
2 | /** |
3 | * This program is free software; you can redistribute it and/or modify |
4 | * it under the terms of the GNU General Public License as published by |
5 | * the Free Software Foundation; either version 2 of the License, or |
6 | * (at your option) any later version. |
7 | * |
8 | * This program is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | * GNU General Public License for more details. |
12 | * |
13 | * You should have received a copy of the GNU General Public License along |
14 | * with this program; if not, write to the Free Software Foundation, Inc., |
15 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
16 | * http://www.gnu.org/copyleft/gpl.html |
17 | * |
18 | * @file |
19 | */ |
20 | namespace Wikimedia\Rdbms; |
21 | |
22 | use Psr\Log\LoggerInterface; |
23 | use Psr\Log\NullLogger; |
24 | use RuntimeException; |
25 | use Throwable; |
26 | use UnexpectedValueException; |
27 | |
28 | /** |
29 | * @ingroup Database |
30 | * @internal This class should not be used outside of Database |
31 | */ |
32 | class TransactionManager { |
33 | /** Transaction is in a error state requiring a full or savepoint rollback */ |
34 | public const STATUS_TRX_ERROR = 1; |
35 | /** Transaction is active and in a normal state */ |
36 | public const STATUS_TRX_OK = 2; |
37 | /** No transaction is active */ |
38 | public const STATUS_TRX_NONE = 3; |
39 | |
40 | /** Session is in a error state requiring a reset */ |
41 | public const STATUS_SESS_ERROR = 1; |
42 | /** Session is in a normal state */ |
43 | public const STATUS_SESS_OK = 2; |
44 | |
45 | /** @var float Guess of how many seconds it takes to replicate a small insert */ |
46 | private const TINY_WRITE_SEC = 0.010; |
47 | /** @var float Consider a write slow if it took more than this many seconds */ |
48 | private const SLOW_WRITE_SEC = 0.500; |
49 | /** Assume an insert of this many rows or less should be fast to replicate */ |
50 | private const SMALL_WRITE_ROWS = 100; |
51 | |
52 | /** @var string Prefix to the atomic section counter used to make savepoint IDs */ |
53 | private const SAVEPOINT_PREFIX = 'wikimedia_rdbms_atomic'; |
54 | |
55 | /** @var ?TransactionIdentifier Application-side ID of the active (server-side) transaction */ |
56 | private $trxId; |
57 | /** @var ?float UNIX timestamp of BEGIN for the last transaction */ |
58 | private $trxTimestamp = null; |
59 | /** @var ?float Round trip time estimate for queries during the last transaction */ |
60 | private $trxRoundTripDelay = null; |
61 | /** @var int STATUS_TRX_* constant indicating the transaction lifecycle state */ |
62 | private $trxStatus = self::STATUS_TRX_NONE; |
63 | /** @var ?Throwable The cause of any unresolved transaction lifecycle state error */ |
64 | private $trxStatusCause; |
65 | /** @var ?array Details of any unresolved statement-rollback error within a transaction */ |
66 | private $trxStatusIgnoredCause; |
67 | |
68 | /** @var ?Throwable The cause of any unresolved session state loss error */ |
69 | private $sessionError; |
70 | |
71 | /** @var string[] Write query callers of the last transaction */ |
72 | private $trxWriteCallers = []; |
73 | /** @var float Seconds spent in write queries for the last transaction */ |
74 | private $trxWriteDuration = 0.0; |
75 | /** @var int Number of write queries for the last transaction */ |
76 | private $trxWriteQueryCount = 0; |
77 | /** @var int Number of rows affected by write queries for the last transaction */ |
78 | private $trxWriteAffectedRows = 0; |
79 | /** @var float Like trxWriteQueryCount but excludes lock-bound, easy to replicate, queries */ |
80 | private $trxWriteAdjDuration = 0.0; |
81 | /** @var int Number of write queries counted in trxWriteAdjDuration */ |
82 | private $trxWriteAdjQueryCount = 0; |
83 | |
84 | /** @var array Pending atomic sections; list of (name, unique ID, savepoint ID) */ |
85 | private $trxAtomicLevels = []; |
86 | /** @var bool Whether the last transaction was started implicitly due to DBO_TRX */ |
87 | private $trxAutomatic = false; |
88 | /** @var bool Whether the last transaction was started implicitly by startAtomic() */ |
89 | private $trxAutomaticAtomic = false; |
90 | |
91 | /** @var ?string Name of the function that started the last transaction */ |
92 | private $trxFname = null; |
93 | /** @var int Counter for atomic savepoint identifiers for the last transaction */ |
94 | private $trxAtomicCounter = 0; |
95 | |
96 | /** |
97 | * @var array[] Pending postcommit callbacks; list of (callable, method name, atomic section id) |
98 | * @phan-var array<array{0:callable,1:string,2:AtomicSectionIdentifier|null}> |
99 | */ |
100 | private $trxPostCommitOrIdleCallbacks = []; |
101 | /** |
102 | * @var array[] Pending precommit callbacks; list of (callable, method name, atomic section id) |
103 | * @phan-var array<array{0:callable,1:string,2:AtomicSectionIdentifier|null}> |
104 | */ |
105 | private $trxPreCommitOrIdleCallbacks = []; |
106 | /** |
107 | * @var array[] Pending post-trx callbacks; list of (callable, method name, atomic section id) |
108 | * @phan-var array<array{0:callable,1:string,2:AtomicSectionIdentifier|null}> |
109 | */ |
110 | private $trxEndCallbacks = []; |
111 | /** @var callable[] Listener callbacks; map of (name => callable) */ |
112 | private $trxRecurringCallbacks = []; |
113 | /** @var bool Whether to suppress triggering of transaction end callbacks */ |
114 | private $trxEndCallbacksSuppressed = false; |
115 | |
116 | /** @var LoggerInterface */ |
117 | private $logger; |
118 | /** @var TransactionProfiler */ |
119 | private $profiler; |
120 | |
121 | public function __construct( ?LoggerInterface $logger = null, $profiler = null ) { |
122 | $this->logger = $logger ?? new NullLogger(); |
123 | $this->profiler = $profiler ?? new TransactionProfiler(); |
124 | } |
125 | |
126 | public function trxLevel() { |
127 | return $this->trxId ? 1 : 0; |
128 | } |
129 | |
130 | /** |
131 | * Get the application-side transaction identifier instance |
132 | * |
133 | * @return ?TransactionIdentifier Token for the active transaction; null if there isn't one |
134 | */ |
135 | public function getTrxId() { |
136 | return $this->trxId; |
137 | } |
138 | |
139 | /** |
140 | * Reset the application-side transaction identifier instance and return the old one |
141 | * |
142 | * @return ?TransactionIdentifier The old transaction token; null if there wasn't one |
143 | */ |
144 | private function consumeTrxId() { |
145 | $old = $this->trxId; |
146 | $this->trxId = null; |
147 | |
148 | return $old; |
149 | } |
150 | |
151 | public function trxTimestamp(): ?float { |
152 | return $this->trxLevel() ? $this->trxTimestamp : null; |
153 | } |
154 | |
155 | /** |
156 | * @return int One of the STATUS_TRX_* class constants |
157 | */ |
158 | public function trxStatus(): int { |
159 | return $this->trxStatus; |
160 | } |
161 | |
162 | public function setTrxStatusToOk() { |
163 | $this->trxStatus = self::STATUS_TRX_OK; |
164 | $this->trxStatusCause = null; |
165 | $this->trxStatusIgnoredCause = null; |
166 | } |
167 | |
168 | public function setTrxStatusToNone() { |
169 | $this->trxStatus = self::STATUS_TRX_NONE; |
170 | $this->trxStatusCause = null; |
171 | $this->trxStatusIgnoredCause = null; |
172 | } |
173 | |
174 | public function assertTransactionStatus( IDatabase $db, $deprecationLogger, $fname ) { |
175 | if ( $this->trxStatus === self::STATUS_TRX_ERROR ) { |
176 | throw new DBTransactionStateError( |
177 | $db, |
178 | "Cannot execute query from $fname while transaction status is ERROR", |
179 | [], |
180 | $this->trxStatusCause |
181 | ); |
182 | } elseif ( $this->trxStatus === self::STATUS_TRX_OK && $this->trxStatusIgnoredCause ) { |
183 | [ $iLastError, $iLastErrno, $iFname ] = $this->trxStatusIgnoredCause; |
184 | $deprecationLogger( |
185 | "Caller from $fname ignored an error originally raised from $iFname: " . |
186 | "[$iLastErrno] $iLastError" |
187 | ); |
188 | $this->trxStatusIgnoredCause = null; |
189 | } |
190 | } |
191 | |
192 | public function assertSessionStatus( IDatabase $db, $fname ) { |
193 | if ( $this->sessionError ) { |
194 | throw new DBSessionStateError( |
195 | $db, |
196 | "Cannot execute query from $fname while session status is ERROR", |
197 | [], |
198 | $this->sessionError |
199 | ); |
200 | } |
201 | } |
202 | |
203 | /** |
204 | * Mark the transaction as requiring rollback (STATUS_TRX_ERROR) due to an error |
205 | */ |
206 | public function setTransactionError( Throwable $trxError ) { |
207 | if ( $this->trxStatus !== self::STATUS_TRX_ERROR ) { |
208 | $this->trxStatus = self::STATUS_TRX_ERROR; |
209 | $this->trxStatusCause = $trxError; |
210 | } |
211 | } |
212 | |
213 | public function setTrxStatusIgnoredCause( ?array $trxStatusIgnoredCause ): void { |
214 | $this->trxStatusIgnoredCause = $trxStatusIgnoredCause; |
215 | } |
216 | |
217 | /** |
218 | * Get the status of the current session (ephemeral server-side state tied to the connection) |
219 | * |
220 | * @return int One of the STATUS_SESSION_* class constants |
221 | */ |
222 | public function sessionStatus() { |
223 | // Check if an unresolved error still exists |
224 | return ( $this->sessionError ) ? self::STATUS_SESS_ERROR : self::STATUS_SESS_OK; |
225 | } |
226 | |
227 | /** |
228 | * Flag the session as needing a reset due to an error, if not already flagged |
229 | */ |
230 | public function setSessionError( Throwable $sessionError ) { |
231 | $this->sessionError ??= $sessionError; |
232 | } |
233 | |
234 | /** |
235 | * Unflag the session as needing a reset due to an error |
236 | */ |
237 | public function clearSessionError() { |
238 | $this->sessionError = null; |
239 | } |
240 | |
241 | /** |
242 | * @param float $rtt |
243 | * @return float Time to apply writes to replicas based on trxWrite* fields |
244 | */ |
245 | private function calculateLastTrxApplyTime( float $rtt ) { |
246 | $rttAdjTotal = $this->trxWriteAdjQueryCount * $rtt; |
247 | $applyTime = max( $this->trxWriteAdjDuration - $rttAdjTotal, 0.0 ); |
248 | // For omitted queries, make them count as something at least |
249 | $omitted = $this->trxWriteQueryCount - $this->trxWriteAdjQueryCount; |
250 | $applyTime += self::TINY_WRITE_SEC * $omitted; |
251 | |
252 | return $applyTime; |
253 | } |
254 | |
255 | public function pendingWriteCallers() { |
256 | return $this->trxLevel() ? $this->trxWriteCallers : []; |
257 | } |
258 | |
259 | /** |
260 | * Update the estimated run-time of a query, not counting large row lock times |
261 | * |
262 | * LoadBalancer can be set to rollback transactions that will create huge replication |
263 | * lag. It bases this estimate off of pendingWriteQueryDuration(). Certain simple |
264 | * queries, like inserting a row can take a long time due to row locking. This method |
265 | * uses some simple heuristics to discount those cases. |
266 | * |
267 | * @param string $queryVerb action in the write query |
268 | * @param float $runtime Total runtime, including RTT |
269 | * @param int $affected Affected row count |
270 | * @param string $fname method name invoking the action |
271 | */ |
272 | public function updateTrxWriteQueryReport( $queryVerb, $runtime, $affected, $fname ) { |
273 | // Whether this is indicative of replica DB runtime (except for RBR or ws_repl) |
274 | $indicativeOfReplicaRuntime = true; |
275 | if ( $runtime > self::SLOW_WRITE_SEC ) { |
276 | // insert(), upsert(), replace() are fast unless bulky in size or blocked on locks |
277 | if ( $queryVerb === 'INSERT' ) { |
278 | $indicativeOfReplicaRuntime = $affected > self::SMALL_WRITE_ROWS; |
279 | } elseif ( $queryVerb === 'REPLACE' ) { |
280 | $indicativeOfReplicaRuntime = $affected > self::SMALL_WRITE_ROWS / 2; |
281 | } |
282 | } |
283 | |
284 | $this->trxWriteDuration += $runtime; |
285 | $this->trxWriteQueryCount++; |
286 | $this->trxWriteAffectedRows += $affected; |
287 | if ( $indicativeOfReplicaRuntime ) { |
288 | $this->trxWriteAdjDuration += $runtime; |
289 | $this->trxWriteAdjQueryCount++; |
290 | } |
291 | |
292 | $this->trxWriteCallers[] = $fname; |
293 | } |
294 | |
295 | public function pendingWriteQueryDuration( $type = IDatabase::ESTIMATE_TOTAL ) { |
296 | if ( !$this->trxLevel() ) { |
297 | return false; |
298 | } elseif ( !$this->trxWriteCallers ) { |
299 | return 0.0; |
300 | } |
301 | |
302 | if ( $type === IDatabase::ESTIMATE_DB_APPLY ) { |
303 | return $this->calculateLastTrxApplyTime( $this->trxRoundTripDelay ); |
304 | } |
305 | |
306 | return $this->trxWriteDuration; |
307 | } |
308 | |
309 | /** |
310 | * @return string |
311 | */ |
312 | private function flatAtomicSectionList() { |
313 | return array_reduce( $this->trxAtomicLevels, static function ( $accum, $v ) { |
314 | return $accum === null ? $v[0] : "$accum, " . $v[0]; |
315 | } ); |
316 | } |
317 | |
318 | public function resetTrxAtomicLevels() { |
319 | $this->trxAtomicLevels = []; |
320 | $this->trxAtomicCounter = 0; |
321 | } |
322 | |
323 | public function explicitTrxActive() { |
324 | return $this->trxLevel() && ( $this->trxAtomicLevels || !$this->trxAutomatic ); |
325 | } |
326 | |
327 | public function trxCheckBeforeClose( IDatabaseForOwner $db, $fname ) { |
328 | $error = null; |
329 | if ( $this->trxAtomicLevels ) { |
330 | // Cannot let incomplete atomic sections be committed |
331 | $levels = $this->flatAtomicSectionList(); |
332 | $error = "$fname: atomic sections $levels are still open"; |
333 | } elseif ( $this->trxAutomatic ) { |
334 | // Only the connection manager can commit non-empty DBO_TRX transactions |
335 | // (empty ones we can silently roll back) |
336 | if ( $db->writesOrCallbacksPending() ) { |
337 | $error = "$fname: " . |
338 | "expected mass rollback of all peer transactions (DBO_TRX set)"; |
339 | } |
340 | } else { |
341 | // Manual transactions should have been committed or rolled |
342 | // back, even if empty. |
343 | $error = "$fname: transaction is still open (from {$this->trxFname})"; |
344 | } |
345 | |
346 | if ( $this->trxEndCallbacksSuppressed && $error === null ) { |
347 | $error = "$fname: callbacks are suppressed; cannot properly commit"; |
348 | } |
349 | |
350 | return $error; |
351 | } |
352 | |
353 | public function onCancelAtomicBeforeCriticalSection( IDatabase $db, $fname ): void { |
354 | if ( !$this->trxLevel() || !$this->trxAtomicLevels ) { |
355 | throw new DBUnexpectedError( $db, "No atomic section is open (got $fname)" ); |
356 | } |
357 | } |
358 | |
359 | /** |
360 | * @return AtomicSectionIdentifier|null ID of the topmost atomic section level |
361 | */ |
362 | public function currentAtomicSectionId(): ?AtomicSectionIdentifier { |
363 | if ( $this->trxLevel() && $this->trxAtomicLevels ) { |
364 | $levelInfo = end( $this->trxAtomicLevels ); |
365 | |
366 | return $levelInfo[1]; |
367 | } |
368 | |
369 | return null; |
370 | } |
371 | |
372 | public function addToAtomicLevels( $fname, AtomicSectionIdentifier $sectionId, $savepointId ) { |
373 | $this->trxAtomicLevels[] = [ $fname, $sectionId, $savepointId ]; |
374 | $this->logger->debug( 'startAtomic: entering level ' . |
375 | ( count( $this->trxAtomicLevels ) - 1 ) . " ($fname)", [ 'db_log_category' => 'trx' ] ); |
376 | } |
377 | |
378 | public function onBegin( IDatabase $db, $fname ): void { |
379 | // Protect against mismatched atomic section, transaction nesting, and snapshot loss |
380 | if ( $this->trxLevel() ) { |
381 | if ( $this->trxAtomicLevels ) { |
382 | $levels = $this->flatAtomicSectionList(); |
383 | $msg = "$fname: got explicit BEGIN while atomic section(s) $levels are open"; |
384 | throw new DBUnexpectedError( $db, $msg ); |
385 | } elseif ( !$this->trxAutomatic ) { |
386 | $msg = "$fname: explicit transaction already active (from {$this->trxFname})"; |
387 | throw new DBUnexpectedError( $db, $msg ); |
388 | } else { |
389 | $msg = "$fname: implicit transaction already active (from {$this->trxFname})"; |
390 | throw new DBUnexpectedError( $db, $msg ); |
391 | } |
392 | } |
393 | } |
394 | |
395 | /** |
396 | * @param IDatabase $db |
397 | * @param string $fname |
398 | * @param string $flush one of IDatabase::FLUSHING_* values |
399 | * @return bool false if the commit should go aborted, true otherwise. |
400 | */ |
401 | public function onCommit( IDatabase $db, $fname, $flush ): bool { |
402 | if ( $this->trxLevel() && $this->trxAtomicLevels ) { |
403 | // There are still atomic sections open; this cannot be ignored |
404 | $levels = $this->flatAtomicSectionList(); |
405 | throw new DBUnexpectedError( |
406 | $db, |
407 | "$fname: got COMMIT while atomic sections $levels are still open" |
408 | ); |
409 | } |
410 | |
411 | if ( $flush === IDatabase::FLUSHING_INTERNAL || $flush === IDatabase::FLUSHING_ALL_PEERS ) { |
412 | if ( !$this->trxLevel() ) { |
413 | return false; // nothing to do |
414 | } elseif ( !$this->trxAutomatic ) { |
415 | throw new DBUnexpectedError( |
416 | $db, |
417 | "$fname: flushing an explicit transaction, getting out of sync" |
418 | ); |
419 | } |
420 | } elseif ( !$this->trxLevel() ) { |
421 | $this->logger->error( |
422 | "$fname: no transaction to commit, something got out of sync", |
423 | [ |
424 | 'exception' => new RuntimeException(), |
425 | 'db_log_category' => 'trx' |
426 | ] |
427 | ); |
428 | |
429 | return false; // nothing to do |
430 | } elseif ( $this->trxAutomatic ) { |
431 | throw new DBUnexpectedError( |
432 | $db, |
433 | "$fname: expected mass commit of all peer transactions (DBO_TRX set)" |
434 | ); |
435 | } |
436 | return true; |
437 | } |
438 | |
439 | public function onEndAtomic( IDatabase $db, $fname ): array { |
440 | if ( !$this->trxLevel() || !$this->trxAtomicLevels ) { |
441 | throw new DBUnexpectedError( $db, "No atomic section is open (got $fname)" ); |
442 | } |
443 | // Check if the current section matches $fname |
444 | $pos = count( $this->trxAtomicLevels ) - 1; |
445 | [ $savedFname, $sectionId, $savepointId ] = $this->trxAtomicLevels[$pos]; |
446 | $this->logger->debug( "endAtomic: leaving level $pos ($fname)", [ 'db_log_category' => 'trx' ] ); |
447 | |
448 | if ( $savedFname !== $fname ) { |
449 | throw new DBUnexpectedError( |
450 | $db, |
451 | "Invalid atomic section ended (got $fname but expected $savedFname)" |
452 | ); |
453 | } |
454 | |
455 | return [ $savepointId, $sectionId ]; |
456 | } |
457 | |
458 | public function getPositionFromSectionId( ?AtomicSectionIdentifier $sectionId = null ): ?int { |
459 | if ( $sectionId !== null ) { |
460 | // Find the (last) section with the given $sectionId |
461 | $pos = -1; |
462 | foreach ( $this->trxAtomicLevels as $i => [ , $asId, ] ) { |
463 | if ( $asId === $sectionId ) { |
464 | $pos = $i; |
465 | } |
466 | } |
467 | } else { |
468 | $pos = null; |
469 | } |
470 | |
471 | return $pos; |
472 | } |
473 | |
474 | public function cancelAtomic( $pos ) { |
475 | $excisedIds = []; |
476 | $excisedFnames = []; |
477 | $newTopSection = $this->currentAtomicSectionId(); |
478 | if ( $pos !== null ) { |
479 | // Remove all descendant sections and re-index the array |
480 | $len = count( $this->trxAtomicLevels ); |
481 | for ( $i = $pos + 1; $i < $len; ++$i ) { |
482 | $excisedFnames[] = $this->trxAtomicLevels[$i][0]; |
483 | $excisedIds[] = $this->trxAtomicLevels[$i][1]; |
484 | } |
485 | $this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos + 1 ); |
486 | $newTopSection = $this->currentAtomicSectionId(); |
487 | } |
488 | |
489 | // Check if the current section matches $fname |
490 | $pos = count( $this->trxAtomicLevels ) - 1; |
491 | [ $savedFname, $savedSectionId, $savepointId ] = $this->trxAtomicLevels[$pos]; |
492 | |
493 | if ( $excisedFnames ) { |
494 | $this->logger->debug( "cancelAtomic: canceling level $pos ($savedFname) " . |
495 | "and descendants " . implode( ', ', $excisedFnames ), |
496 | [ 'db_log_category' => 'trx' ] |
497 | ); |
498 | } else { |
499 | $this->logger->debug( "cancelAtomic: canceling level $pos ($savedFname)", |
500 | [ 'db_log_category' => 'trx' ] |
501 | ); |
502 | } |
503 | |
504 | return [ $savedFname, $excisedIds, $newTopSection, $savedSectionId, $savepointId ]; |
505 | } |
506 | |
507 | /** |
508 | * @return bool Whether no levels remain and transaction was started by a popped level |
509 | */ |
510 | public function popAtomicLevel() { |
511 | array_pop( $this->trxAtomicLevels ); |
512 | |
513 | return !$this->trxAtomicLevels && $this->trxAutomaticAtomic; |
514 | } |
515 | |
516 | public function setAutomaticAtomic( $value ) { |
517 | $this->trxAutomaticAtomic = $value; |
518 | } |
519 | |
520 | public function turnOnAutomatic() { |
521 | $this->trxAutomatic = true; |
522 | } |
523 | |
524 | public function nextSavePointId( IDatabase $db, $fname ) { |
525 | $savepointId = self::SAVEPOINT_PREFIX . ++$this->trxAtomicCounter; |
526 | if ( strlen( $savepointId ) > 30 ) { |
527 | // 30 == Oracle's identifier length limit (pre 12c) |
528 | // With a 22 character prefix, that puts the highest number at 99999999. |
529 | throw new DBUnexpectedError( |
530 | $db, |
531 | 'There have been an excessively large number of atomic sections in a transaction' |
532 | . " started by $this->trxFname (at $fname)" |
533 | ); |
534 | } |
535 | |
536 | return $savepointId; |
537 | } |
538 | |
539 | public function writesPending() { |
540 | return $this->trxLevel() && $this->trxWriteCallers; |
541 | } |
542 | |
543 | public function onDestruct() { |
544 | if ( $this->trxLevel() && $this->trxWriteCallers ) { |
545 | trigger_error( "Uncommitted DB writes (transaction from {$this->trxFname})" ); |
546 | } |
547 | } |
548 | |
549 | public function transactionWritingIn( $serverName, $domainId, float $startTime ) { |
550 | if ( !$this->trxWriteCallers ) { |
551 | $this->profiler->transactionWritingIn( |
552 | $serverName, |
553 | $domainId, |
554 | (string)$this->trxId, |
555 | $startTime |
556 | ); |
557 | } |
558 | } |
559 | |
560 | public function transactionWritingOut( IDatabase $db, $oldId ) { |
561 | if ( $this->trxWriteCallers ) { |
562 | $this->profiler->transactionWritingOut( |
563 | $db->getServerName(), |
564 | $db->getDomainID(), |
565 | $oldId, |
566 | $this->pendingWriteQueryDuration( IDatabase::ESTIMATE_TOTAL ), |
567 | $this->trxWriteAffectedRows |
568 | ); |
569 | } |
570 | } |
571 | |
572 | public function recordQueryCompletion( $sql, $startTime, $isPermWrite, $rowCount, $serverName ) { |
573 | $this->profiler->recordQueryCompletion( |
574 | $sql, |
575 | $startTime, |
576 | $isPermWrite, |
577 | $rowCount, |
578 | (string)$this->trxId, |
579 | $serverName |
580 | ); |
581 | } |
582 | |
583 | public function onTransactionResolution( IDatabase $db, callable $callback, $fname ) { |
584 | if ( !$this->trxLevel() ) { |
585 | throw new DBUnexpectedError( $db, "No transaction is active" ); |
586 | } |
587 | $this->trxEndCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ]; |
588 | } |
589 | |
590 | public function addPostCommitOrIdleCallback( callable $callback, $fname ) { |
591 | $this->trxPostCommitOrIdleCallbacks[] = [ |
592 | $callback, |
593 | $fname, |
594 | $this->currentAtomicSectionId() |
595 | ]; |
596 | } |
597 | |
598 | final public function addPreCommitOrIdleCallback( callable $callback, $fname ) { |
599 | $this->trxPreCommitOrIdleCallbacks[] = [ |
600 | $callback, |
601 | $fname, |
602 | $this->currentAtomicSectionId() |
603 | ]; |
604 | } |
605 | |
606 | public function setTransactionListener( $name, ?callable $callback = null ) { |
607 | if ( $callback ) { |
608 | $this->trxRecurringCallbacks[$name] = $callback; |
609 | } else { |
610 | unset( $this->trxRecurringCallbacks[$name] ); |
611 | } |
612 | } |
613 | |
614 | /** |
615 | * Whether to disable running of post-COMMIT/ROLLBACK callbacks |
616 | * @param bool $suppress |
617 | */ |
618 | public function setTrxEndCallbackSuppression( bool $suppress ) { |
619 | $this->trxEndCallbacksSuppressed = $suppress; |
620 | } |
621 | |
622 | /** |
623 | * Hoist callback ownership for callbacks in a section to a parent section. |
624 | * All callbacks should have an owner that is present in trxAtomicLevels. |
625 | * @param AtomicSectionIdentifier $old |
626 | * @param AtomicSectionIdentifier $new |
627 | */ |
628 | public function reassignCallbacksForSection( |
629 | AtomicSectionIdentifier $old, |
630 | AtomicSectionIdentifier $new |
631 | ) { |
632 | foreach ( $this->trxPreCommitOrIdleCallbacks as $key => $info ) { |
633 | if ( $info[2] === $old ) { |
634 | $this->trxPreCommitOrIdleCallbacks[$key][2] = $new; |
635 | } |
636 | } |
637 | foreach ( $this->trxPostCommitOrIdleCallbacks as $key => $info ) { |
638 | if ( $info[2] === $old ) { |
639 | $this->trxPostCommitOrIdleCallbacks[$key][2] = $new; |
640 | } |
641 | } |
642 | foreach ( $this->trxEndCallbacks as $key => $info ) { |
643 | if ( $info[2] === $old ) { |
644 | $this->trxEndCallbacks[$key][2] = $new; |
645 | } |
646 | } |
647 | } |
648 | |
649 | /** |
650 | * Update callbacks that were owned by cancelled atomic sections. |
651 | * |
652 | * Callbacks for "on commit" should never be run if they're owned by a |
653 | * section that won't be committed. |
654 | * |
655 | * Callbacks for "on resolution" need to reflect that the section was |
656 | * rolled back, even if the transaction as a whole commits successfully. |
657 | * |
658 | * Callbacks for "on section cancel" should already have been consumed, |
659 | * but errors during the cancellation itself can prevent that while still |
660 | * destroying the section. Hoist any such callbacks to the new top section, |
661 | * which we assume will itself have to be cancelled or rolled back to |
662 | * resolve the error. |
663 | * |
664 | * @param AtomicSectionIdentifier[] $excisedSectionsId Cancelled section IDs |
665 | * @param AtomicSectionIdentifier|null $newSectionId New top section ID |
666 | * @throws UnexpectedValueException |
667 | */ |
668 | public function modifyCallbacksForCancel( |
669 | array $excisedSectionsId, |
670 | ?AtomicSectionIdentifier $newSectionId = null |
671 | ) { |
672 | // Cancel the "on commit" callbacks owned by this savepoint |
673 | $this->trxPostCommitOrIdleCallbacks = array_filter( |
674 | $this->trxPostCommitOrIdleCallbacks, |
675 | static function ( $entry ) use ( $excisedSectionsId ) { |
676 | return !in_array( $entry[2], $excisedSectionsId, true ); |
677 | } |
678 | ); |
679 | $this->trxPreCommitOrIdleCallbacks = array_filter( |
680 | $this->trxPreCommitOrIdleCallbacks, |
681 | static function ( $entry ) use ( $excisedSectionsId ) { |
682 | return !in_array( $entry[2], $excisedSectionsId, true ); |
683 | } |
684 | ); |
685 | // Make "on resolution" callbacks owned by this savepoint to perceive a rollback |
686 | foreach ( $this->trxEndCallbacks as $key => $entry ) { |
687 | if ( in_array( $entry[2], $excisedSectionsId, true ) ) { |
688 | $callback = $entry[0]; |
689 | $this->trxEndCallbacks[$key][0] = static function () use ( $callback ) { |
690 | return $callback( IDatabase::TRIGGER_ROLLBACK ); |
691 | }; |
692 | // This "on resolution" callback no longer belongs to a section. |
693 | $this->trxEndCallbacks[$key][2] = null; |
694 | } |
695 | } |
696 | } |
697 | |
698 | public function consumeEndCallbacks(): array { |
699 | $callbackEntries = array_merge( |
700 | $this->trxPostCommitOrIdleCallbacks, |
701 | $this->trxEndCallbacks |
702 | ); |
703 | $this->trxPostCommitOrIdleCallbacks = []; // consumed (and recursion guard) |
704 | $this->trxEndCallbacks = []; // consumed (recursion guard) |
705 | |
706 | return $callbackEntries; |
707 | } |
708 | |
709 | /** |
710 | * Consume and run any "on transaction pre-commit" callbacks |
711 | * |
712 | * @return int Number of callbacks attempted |
713 | * @throws Throwable Any exception thrown by a callback |
714 | */ |
715 | public function runOnTransactionPreCommitCallbacks(): int { |
716 | $count = 0; |
717 | |
718 | // Drain the queues of transaction "precommit" callbacks until it is empty |
719 | do { |
720 | $callbackEntries = $this->trxPreCommitOrIdleCallbacks; |
721 | $this->trxPreCommitOrIdleCallbacks = []; // consumed (and recursion guard) |
722 | $count += count( $callbackEntries ); |
723 | foreach ( $callbackEntries as $entry ) { |
724 | try { |
725 | $entry[0](); |
726 | } catch ( Throwable $trxError ) { |
727 | $this->setTransactionError( $trxError ); |
728 | throw $trxError; |
729 | } |
730 | } |
731 | // @phan-suppress-next-line PhanImpossibleConditionInLoop |
732 | } while ( $this->trxPreCommitOrIdleCallbacks ); |
733 | |
734 | return $count; |
735 | } |
736 | |
737 | public function clearPreEndCallbacks() { |
738 | $this->trxPostCommitOrIdleCallbacks = []; |
739 | $this->trxPreCommitOrIdleCallbacks = []; |
740 | } |
741 | |
742 | public function clearEndCallbacks() { |
743 | $this->trxEndCallbacks = []; // don't copy |
744 | } |
745 | |
746 | public function writesOrCallbacksPending(): bool { |
747 | return $this->trxLevel() && ( |
748 | $this->trxWriteCallers || |
749 | $this->trxPostCommitOrIdleCallbacks || |
750 | $this->trxPreCommitOrIdleCallbacks || |
751 | $this->trxEndCallbacks |
752 | ); |
753 | } |
754 | |
755 | /** |
756 | * List the methods that have write queries or callbacks for the current transaction |
757 | * |
758 | * @return string[] |
759 | */ |
760 | public function pendingWriteAndCallbackCallers(): array { |
761 | $fnames = $this->pendingWriteCallers(); |
762 | foreach ( [ |
763 | $this->trxPostCommitOrIdleCallbacks, |
764 | $this->trxPreCommitOrIdleCallbacks, |
765 | $this->trxEndCallbacks |
766 | ] as $callbacks ) { |
767 | foreach ( $callbacks as $callback ) { |
768 | $fnames[] = $callback[1]; |
769 | } |
770 | } |
771 | |
772 | return $fnames; |
773 | } |
774 | |
775 | /** |
776 | * List the methods that have precommit callbacks for the current transaction |
777 | * |
778 | * @return string[] |
779 | */ |
780 | public function pendingPreCommitCallbackCallers(): array { |
781 | $fnames = $this->pendingWriteCallers(); |
782 | foreach ( $this->trxPreCommitOrIdleCallbacks as $callback ) { |
783 | $fnames[] = $callback[1]; |
784 | } |
785 | |
786 | return $fnames; |
787 | } |
788 | |
789 | public function isEndCallbacksSuppressed(): bool { |
790 | return $this->trxEndCallbacksSuppressed; |
791 | } |
792 | |
793 | public function getRecurringCallbacks() { |
794 | return $this->trxRecurringCallbacks; |
795 | } |
796 | |
797 | public function countPostCommitOrIdleCallbacks() { |
798 | return count( $this->trxPostCommitOrIdleCallbacks ); |
799 | } |
800 | |
801 | /** |
802 | * @param string $mode One of IDatabase::TRANSACTION_* values |
803 | * @param string $fname method name |
804 | * @param float $rtt Trivial query round-trip-delay |
805 | */ |
806 | public function onBeginInCriticalSection( $mode, $fname, $rtt ) { |
807 | $this->trxId = new TransactionIdentifier(); |
808 | $this->setTrxStatusToOk(); |
809 | $this->resetTrxAtomicLevels(); |
810 | $this->trxWriteCallers = []; |
811 | $this->trxWriteDuration = 0.0; |
812 | $this->trxWriteQueryCount = 0; |
813 | $this->trxWriteAffectedRows = 0; |
814 | $this->trxWriteAdjDuration = 0.0; |
815 | $this->trxWriteAdjQueryCount = 0; |
816 | // T147697: make explicitTrxActive() return true until begin() finishes. This way, |
817 | // no caller triggered by getApproximateLagStatus() will think its OK to muck around |
818 | // with the transaction just because startAtomic() has not yet finished updating the |
819 | // tracking fields (e.g. trxAtomicLevels). |
820 | $this->trxAutomatic = ( $mode === IDatabase::TRANSACTION_INTERNAL ); |
821 | $this->trxAutomaticAtomic = false; |
822 | $this->trxFname = $fname; |
823 | $this->trxTimestamp = microtime( true ); |
824 | $this->trxRoundTripDelay = $rtt; |
825 | } |
826 | |
827 | public function onRollbackInCriticalSection( IDatabase $db ) { |
828 | $oldTrxId = $this->consumeTrxId(); |
829 | $this->setTrxStatusToNone(); |
830 | $this->resetTrxAtomicLevels(); |
831 | $this->clearPreEndCallbacks(); |
832 | $this->transactionWritingOut( $db, (string)$oldTrxId ); |
833 | } |
834 | |
835 | public function onCommitInCriticalSection( IDatabase $db ) { |
836 | $lastWriteTime = null; |
837 | |
838 | $oldTrxId = $this->consumeTrxId(); |
839 | $this->setTrxStatusToNone(); |
840 | if ( $this->trxWriteCallers ) { |
841 | $lastWriteTime = microtime( true ); |
842 | $this->transactionWritingOut( $db, (string)$oldTrxId ); |
843 | } |
844 | |
845 | return $lastWriteTime; |
846 | } |
847 | |
848 | public function onSessionLoss( IDatabase $db ) { |
849 | $oldTrxId = $this->consumeTrxId(); |
850 | $this->clearPreEndCallbacks(); |
851 | $this->transactionWritingOut( $db, (string)$oldTrxId ); |
852 | } |
853 | |
854 | public function onEndAtomicInCriticalSection( $sectionId ) { |
855 | // Hoist callback ownership for callbacks in the section that just ended; |
856 | // all callbacks should have an owner that is present in trxAtomicLevels. |
857 | $currentSectionId = $this->currentAtomicSectionId(); |
858 | if ( $currentSectionId ) { |
859 | $this->reassignCallbacksForSection( $sectionId, $currentSectionId ); |
860 | } |
861 | } |
862 | |
863 | public function onFlushSnapshot( IDatabase $db, $fname, $flush, $trxRoundFname ) { |
864 | if ( $this->explicitTrxActive() ) { |
865 | // Committing this transaction would break callers that assume it is still open |
866 | throw new DBUnexpectedError( |
867 | $db, |
868 | "$fname: Cannot flush snapshot; " . |
869 | "explicit transaction '{$this->trxFname}' is still open" |
870 | ); |
871 | } elseif ( $this->writesOrCallbacksPending() ) { |
872 | // This only flushes transactions to clear snapshots, not to write data |
873 | $fnames = implode( ', ', $this->pendingWriteAndCallbackCallers() ); |
874 | throw new DBUnexpectedError( |
875 | $db, |
876 | "$fname: Cannot flush snapshot; " . |
877 | "writes from transaction {$this->trxFname} are still pending ($fnames)" |
878 | ); |
879 | } elseif ( |
880 | $this->trxLevel() && |
881 | $trxRoundFname !== null && |
882 | $flush !== IDatabase::FLUSHING_INTERNAL && |
883 | $flush !== IDatabase::FLUSHING_ALL_PEERS |
884 | ) { |
885 | $this->logger->warning( |
886 | "$fname: Expected mass snapshot flush of all peer transactions " . |
887 | "in the explicit transactions round '{$trxRoundFname}'", |
888 | [ |
889 | 'exception' => new RuntimeException(), |
890 | 'db_log_category' => 'trx' |
891 | ] |
892 | ); |
893 | } |
894 | } |
895 | |
896 | public function onGetScopedLockAndFlush( IDatabase $db, $fname ) { |
897 | if ( $this->writesOrCallbacksPending() ) { |
898 | // This only flushes transactions to clear snapshots, not to write data |
899 | $fnames = implode( ', ', $this->pendingWriteAndCallbackCallers() ); |
900 | throw new DBUnexpectedError( |
901 | $db, |
902 | "$fname: Cannot flush pre-lock snapshot; " . |
903 | "writes from transaction {$this->trxFname} are still pending ($fnames)" |
904 | ); |
905 | } |
906 | } |
907 | } |