Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 391
0.00% covered (danger)
0.00%
0 / 66
CRAP
0.00% covered (danger)
0.00%
0 / 1
TransactionManager
0.00% covered (danger)
0.00%
0 / 391
0.00% covered (danger)
0.00%
0 / 66
25760
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 trxLevel
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 getTrxId
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 consumeTrxId
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 trxTimestamp
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 trxStatus
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 setTrxStatusToOk
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 setTrxStatusToNone
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 assertTransactionStatus
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
20
 assertSessionStatus
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 setTransactionError
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 setTrxStatusIgnoredCause
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 sessionStatus
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 setSessionError
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 clearSessionError
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 calculateLastTrxApplyTime
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 pendingWriteCallers
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 updateTrxWriteQueryReport
0.00% covered (danger)
0.00%
0 / 13
0.00% covered (danger)
0.00%
0 / 1
30
 pendingWriteQueryDuration
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 flatAtomicSectionList
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 resetTrxAtomicLevels
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 explicitTrxActive
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
12
 trxCheckBeforeClose
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
42
 onAtomicSectionCancel
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
12
 onCancelAtomicBeforeCriticalSection
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
12
 currentAtomicSectionId
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 addToAtomicLevels
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 onBegin
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
20
 onCommit
0.00% covered (danger)
0.00%
0 / 29
0.00% covered (danger)
0.00%
0 / 1
90
 onEndAtomic
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
20
 getPositionFromSectionId
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 cancelAtomic
0.00% covered (danger)
0.00%
0 / 21
0.00% covered (danger)
0.00%
0 / 1
20
 popAtomicLevel
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 setAutomaticAtomic
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 turnOnAutomatic
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 nextSavePointId
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
6
 writesPending
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 onDestruct
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
12
 transactionWritingIn
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 transactionWritingOut
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
6
 recordQueryCompletion
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
2
 onTransactionResolution
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 addPostCommitOrIdleCallback
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 addPreCommitOrIdleCallback
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 setTransactionListener
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 setTrxEndCallbackSuppression
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 reassignCallbacksForSection
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
90
 modifyCallbacksForCancel
0.00% covered (danger)
0.00%
0 / 22
0.00% covered (danger)
0.00%
0 / 1
30
 consumeEndCallbacks
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
6
 runOnAtomicSectionCancelCallbacks
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
20
 runOnTransactionPreCommitCallbacks
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
12
 clearPreEndCallbacks
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 clearEndCallbacks
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 writesOrCallbacksPending
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
42
 pendingWriteAndCallbackCallers
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
12
 pendingPreCommitCallbackCallers
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 isEndCallbacksSuppressed
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getRecurringCallbacks
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 countPostCommitOrIdleCallbacks
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 onBeginInCriticalSection
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
2
 onRollbackInCriticalSection
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 onCommitInCriticalSection
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 onSessionLoss
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 onEndAtomicInCriticalSection
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 onFlushSnapshot
0.00% covered (danger)
0.00%
0 / 24
0.00% covered (danger)
0.00%
0 / 1
56
 onGetScopedLockAndFlush
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20namespace Wikimedia\Rdbms;
21
22use Psr\Log\LoggerInterface;
23use Psr\Log\NullLogger;
24use RuntimeException;
25use Throwable;
26use UnexpectedValueException;
27
28/**
29 * @ingroup Database
30 * @internal This class should not be used outside of Database
31 */
32class TransactionManager {
33    /** Transaction is in an error state requiring a full or savepoint rollback */
34    public const STATUS_TRX_ERROR = 1;
35    /** Transaction is active and in a normal state */
36    public const STATUS_TRX_OK = 2;
37    /** No transaction is active */
38    public const STATUS_TRX_NONE = 3;
39
40    /** Session is in an error state requiring a reset */
41    public const STATUS_SESS_ERROR = 1;
42    /** Session is in a normal state */
43    public const STATUS_SESS_OK = 2;
44
45    /** @var float Guess of how many seconds it takes to replicate a small insert */
46    private const TINY_WRITE_SEC = 0.010;
47    /** @var float Consider a write slow if it took more than this many seconds */
48    private const SLOW_WRITE_SEC = 0.500;
49    /** Assume an insert of this many rows or less should be fast to replicate */
50    private const SMALL_WRITE_ROWS = 100;
51
52    /** @var string Prefix to the atomic section counter used to make savepoint IDs */
53    private const SAVEPOINT_PREFIX = 'wikimedia_rdbms_atomic';
54
55    /** @var ?TransactionIdentifier Application-side ID of the active (server-side) transaction */
56    private $trxId;
57    /** @var ?float UNIX timestamp of BEGIN for the last transaction */
58    private $trxTimestamp = null;
59    /** @var ?float Round trip time estimate for queries during the last transaction */
60    private $trxRoundTripDelay = null;
61    /** @var int STATUS_TRX_* constant indicating the transaction lifecycle state */
62    private $trxStatus = self::STATUS_TRX_NONE;
63    /** @var ?Throwable The cause of any unresolved transaction lifecycle state error */
64    private $trxStatusCause;
65    /** @var ?array Details of any unresolved statement-rollback error within a transaction */
66    private $trxStatusIgnoredCause;
67
68    /** @var ?Throwable The cause of any unresolved session state loss error */
69    private $sessionError;
70
71    /** @var string[] Write query callers of the last transaction */
72    private $trxWriteCallers = [];
73    /** @var float Seconds spent in write queries for the last transaction */
74    private $trxWriteDuration = 0.0;
75    /** @var int Number of write queries for the last transaction */
76    private $trxWriteQueryCount = 0;
77    /** @var int Number of rows affected by write queries for the last transaction */
78    private $trxWriteAffectedRows = 0;
79    /** @var float Like trxWriteDuration but excludes lock-bound, easy to replicate, queries */
80    private $trxWriteAdjDuration = 0.0;
81    /** @var int Number of write queries counted in trxWriteAdjDuration */
82    private $trxWriteAdjQueryCount = 0;
83
84    /** @var array Pending atomic sections; list of (name, unique ID, savepoint ID) */
85    private $trxAtomicLevels = [];
86    /** @var bool Whether the last transaction was started implicitly due to DBO_TRX */
87    private $trxAutomatic = false;
88    /** @var bool Whether the last transaction was started implicitly by startAtomic() */
89    private $trxAutomaticAtomic = false;
90
91    /** @var ?string Name of the function that started the last transaction */
92    private $trxFname = null;
93    /** @var int Counter for atomic savepoint identifiers for the last transaction */
94    private $trxAtomicCounter = 0;
95
96    /**
97     * @var array[] Pending postcommit callbacks; list of (callable, method name, atomic section id)
98     * @phan-var array<array{0:callable,1:string,2:AtomicSectionIdentifier|null}>
99     */
100    private $trxPostCommitOrIdleCallbacks = [];
101    /**
102     * @var array[] Pending precommit callbacks; list of (callable, method name, atomic section id)
103     * @phan-var array<array{0:callable,1:string,2:AtomicSectionIdentifier|null}>
104     */
105    private $trxPreCommitOrIdleCallbacks = [];
106    /**
107     * @var array[] Pending post-trx callbacks; list of (callable, method name, atomic section id)
108     * @phan-var array<array{0:callable,1:string,2:AtomicSectionIdentifier|null}>
109     */
110    private $trxEndCallbacks = [];
111    /**
112     * @var array[] Pending cancel callbacks; list of (callable, method name, atomic section id)
113     * @phan-var array<array{0:callable,1:string,2:AtomicSectionIdentifier|null}>
114     */
115    private $trxSectionCancelCallbacks = [];
116    /** @var callable[] Listener callbacks; map of (name => callable) */
117    private $trxRecurringCallbacks = [];
118    /** @var bool Whether to suppress triggering of transaction end callbacks */
119    private $trxEndCallbacksSuppressed = false;
120
121    /** @var LoggerInterface */
122    private $logger;
123    /** @var TransactionProfiler */
124    private $profiler;
125
126    public function __construct( LoggerInterface $logger = null, $profiler = null ) {
127        $this->logger = $logger ?? new NullLogger();
128        $this->profiler = $profiler ?? new TransactionProfiler();
129    }
130
131    public function trxLevel() {
132        return $this->trxId ? 1 : 0;
133    }
134
135    /**
136     * Get the application-side transaction identifier instance
137     *
138     * @return ?TransactionIdentifier Token for the active transaction; null if there isn't one
139     */
140    public function getTrxId() {
141        return $this->trxId;
142    }
143
144    /**
145     * Reset the application-side transaction identifier instance and return the old one
146     *
147     * @return ?TransactionIdentifier The old transaction token; null if there wasn't one
148     */
149    private function consumeTrxId() {
150        $old = $this->trxId;
151        $this->trxId = null;
152
153        return $old;
154    }
155
156    public function trxTimestamp(): ?float {
157        return $this->trxLevel() ? $this->trxTimestamp : null;
158    }
159
160    /**
161     * @return int One of the STATUS_TRX_* class constants
162     */
163    public function trxStatus(): int {
164        return $this->trxStatus;
165    }
166
167    public function setTrxStatusToOk() {
168        $this->trxStatus = self::STATUS_TRX_OK;
169        $this->trxStatusCause = null;
170        $this->trxStatusIgnoredCause = null;
171    }
172
173    public function setTrxStatusToNone() {
174        $this->trxStatus = self::STATUS_TRX_NONE;
175        $this->trxStatusCause = null;
176        $this->trxStatusIgnoredCause = null;
177    }
178
179    public function assertTransactionStatus( IDatabase $db, $deprecationLogger, $fname ) {
180        if ( $this->trxStatus < self::STATUS_TRX_OK ) {
181            throw new DBTransactionStateError(
182                $db,
183                "Cannot execute query from $fname while transaction status is ERROR",
184                [],
185                $this->trxStatusCause
186            );
187        } elseif ( $this->trxStatus === self::STATUS_TRX_OK && $this->trxStatusIgnoredCause ) {
188            [ $iLastError, $iLastErrno, $iFname ] = $this->trxStatusIgnoredCause;
189            call_user_func( $deprecationLogger,
190                "Caller from $fname ignored an error originally raised from $iFname" .
191                "[$iLastErrno$iLastError"
192            );
193            $this->trxStatusIgnoredCause = null;
194        }
195    }
196
197    public function assertSessionStatus( IDatabase $db, $fname ) {
198        if ( $this->sessionError ) {
199            throw new DBSessionStateError(
200                $db,
201                "Cannot execute query from $fname while session status is ERROR",
202                [],
203                $this->sessionError
204            );
205        }
206    }
207
208    /**
209     * Mark the transaction as requiring rollback (STATUS_TRX_ERROR) due to an error
210     *
211     * @param Throwable $trxError
212     */
213    public function setTransactionError( Throwable $trxError ) {
214        if ( $this->trxStatus > self::STATUS_TRX_ERROR ) {
215            $this->trxStatus = self::STATUS_TRX_ERROR;
216            $this->trxStatusCause = $trxError;
217        }
218    }
219
220    /**
221     * @param array|null $trxStatusIgnoredCause
222     */
223    public function setTrxStatusIgnoredCause( ?array $trxStatusIgnoredCause ): void {
224        $this->trxStatusIgnoredCause = $trxStatusIgnoredCause;
225    }
226
227    /**
228     * Get the status of the current session (ephemeral server-side state tied to the connection)
229     *
230     * @return int One of the STATUS_SESSION_* class constants
231     */
232    public function sessionStatus() {
233        // Check if an unresolved error still exists
234        return ( $this->sessionError ) ? self::STATUS_SESS_ERROR : self::STATUS_SESS_OK;
235    }
236
237    /**
238     * Flag the session as needing a reset due to an error, if not already flagged
239     *
240     * @param Throwable $sessionError
241     */
242    public function setSessionError( Throwable $sessionError ) {
243        $this->sessionError ??= $sessionError;
244    }
245
246    /**
247     * Unflag the session as needing a reset due to an error
248     */
249    public function clearSessionError() {
250        $this->sessionError = null;
251    }
252
253    /**
254     * @param float $rtt
255     * @return float Time to apply writes to replicas based on trxWrite* fields
256     */
257    private function calculateLastTrxApplyTime( float $rtt ) {
258        $rttAdjTotal = $this->trxWriteAdjQueryCount * $rtt;
259        $applyTime = max( $this->trxWriteAdjDuration - $rttAdjTotal, 0.0 );
260        // For omitted queries, make them count as something at least
261        $omitted = $this->trxWriteQueryCount - $this->trxWriteAdjQueryCount;
262        $applyTime += self::TINY_WRITE_SEC * $omitted;
263
264        return $applyTime;
265    }
266
267    public function pendingWriteCallers() {
268        return $this->trxLevel() ? $this->trxWriteCallers : [];
269    }
270
271    /**
272     * Update the estimated run-time of a query, not counting large row lock times
273     *
274     * LoadBalancer can be set to rollback transactions that will create huge replication
275     * lag. It bases this estimate off of pendingWriteQueryDuration(). Certain simple
276     * queries, like inserting a row can take a long time due to row locking. This method
277     * uses some simple heuristics to discount those cases.
278     *
279     * @param string $queryVerb action in the write query
280     * @param float $runtime Total runtime, including RTT
281     * @param int $affected Affected row count
282     * @param string $fname method name invoking the action
283     */
284    public function updateTrxWriteQueryReport( $queryVerb, $runtime, $affected, $fname ) {
285        // Whether this is indicative of replica DB runtime (except for RBR or ws_repl)
286        $indicativeOfReplicaRuntime = true;
287        if ( $runtime > self::SLOW_WRITE_SEC ) {
288            // insert(), upsert(), replace() are fast unless bulky in size or blocked on locks
289            if ( $queryVerb === 'INSERT' ) {
290                $indicativeOfReplicaRuntime = $affected > self::SMALL_WRITE_ROWS;
291            } elseif ( $queryVerb === 'REPLACE' ) {
292                $indicativeOfReplicaRuntime = $affected > self::SMALL_WRITE_ROWS / 2;
293            }
294        }
295
296        $this->trxWriteDuration += $runtime;
297        $this->trxWriteQueryCount += 1;
298        $this->trxWriteAffectedRows += $affected;
299        if ( $indicativeOfReplicaRuntime ) {
300            $this->trxWriteAdjDuration += $runtime;
301            $this->trxWriteAdjQueryCount += 1;
302        }
303
304        $this->trxWriteCallers[] = $fname;
305    }
306
307    public function pendingWriteQueryDuration( $type = IDatabase::ESTIMATE_TOTAL ) {
308        if ( !$this->trxLevel() ) {
309            return false;
310        } elseif ( !$this->trxWriteCallers ) {
311            return 0.0;
312        }
313
314        if ( $type === IDatabase::ESTIMATE_DB_APPLY ) {
315            return $this->calculateLastTrxApplyTime( $this->trxRoundTripDelay );
316        }
317
318        return $this->trxWriteDuration;
319    }
320
321    /**
322     * @return string
323     */
324    private function flatAtomicSectionList() {
325        return array_reduce( $this->trxAtomicLevels, static function ( $accum, $v ) {
326            return $accum === null ? $v[0] : "$accum" . $v[0];
327        } );
328    }
329
330    public function resetTrxAtomicLevels() {
331        $this->trxAtomicLevels = [];
332        $this->trxAtomicCounter = 0;
333    }
334
335    public function explicitTrxActive() {
336        return $this->trxLevel() && ( $this->trxAtomicLevels || !$this->trxAutomatic );
337    }
338
339    public function trxCheckBeforeClose( IDatabaseForOwner $db, $fname ) {
340        $error = null;
341        if ( $this->trxAtomicLevels ) {
342            // Cannot let incomplete atomic sections be committed
343            $levels = $this->flatAtomicSectionList();
344            $error = "$fname: atomic sections $levels are still open";
345        } elseif ( $this->trxAutomatic ) {
346            // Only the connection manager can commit non-empty DBO_TRX transactions
347            // (empty ones we can silently roll back)
348            if ( $db->writesOrCallbacksPending() ) {
349                $error = "$fname" .
350                    "expected mass rollback of all peer transactions (DBO_TRX set)";
351            }
352        } else {
353            // Manual transactions should have been committed or rolled
354            // back, even if empty.
355            $error = "$fname: transaction is still open (from {$this->trxFname})";
356        }
357
358        if ( $this->trxEndCallbacksSuppressed && $error === null ) {
359            $error = "$fname: callbacks are suppressed; cannot properly commit";
360        }
361
362        return $error;
363    }
364
365    public function onAtomicSectionCancel( IDatabase $db, $callback, $fname ): void {
366        if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
367            throw new DBUnexpectedError( $db, "No atomic section is open (got $fname)" );
368        }
369        $this->trxSectionCancelCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
370    }
371
372    public function onCancelAtomicBeforeCriticalSection( IDatabase $db, $fname ): void {
373        if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
374            throw new DBUnexpectedError( $db, "No atomic section is open (got $fname)" );
375        }
376    }
377
378    /**
379     * @return AtomicSectionIdentifier|null ID of the topmost atomic section level
380     */
381    public function currentAtomicSectionId(): ?AtomicSectionIdentifier {
382        if ( $this->trxLevel() && $this->trxAtomicLevels ) {
383            $levelInfo = end( $this->trxAtomicLevels );
384
385            return $levelInfo[1];
386        }
387
388        return null;
389    }
390
391    public function addToAtomicLevels( $fname, AtomicSectionIdentifier $sectionId, $savepointId ) {
392        $this->trxAtomicLevels[] = [ $fname, $sectionId, $savepointId ];
393        $this->logger->debug( 'startAtomic: entering level ' .
394            ( count( $this->trxAtomicLevels ) - 1 ) . " ($fname)", [ 'db_log_category' => 'trx' ] );
395    }
396
397    public function onBegin( IDatabase $db, $fname ): void {
398        // Protect against mismatched atomic section, transaction nesting, and snapshot loss
399        if ( $this->trxLevel() ) {
400            if ( $this->trxAtomicLevels ) {
401                $levels = $this->flatAtomicSectionList();
402                $msg = "$fname: got explicit BEGIN while atomic section(s) $levels are open";
403                throw new DBUnexpectedError( $db, $msg );
404            } elseif ( !$this->trxAutomatic ) {
405                $msg = "$fname: explicit transaction already active (from {$this->trxFname})";
406                throw new DBUnexpectedError( $db, $msg );
407            } else {
408                $msg = "$fname: implicit transaction already active (from {$this->trxFname})";
409                throw new DBUnexpectedError( $db, $msg );
410            }
411        }
412    }
413
414    /**
415     * @param IDatabase $db
416     * @param string $fname
417     * @param string $flush one of IDatabase::FLUSHING_* values
418     * @return bool false if the commit should go aborted, true otherwise.
419     */
420    public function onCommit( IDatabase $db, $fname, $flush ): bool {
421        if ( $this->trxLevel() && $this->trxAtomicLevels ) {
422            // There are still atomic sections open; this cannot be ignored
423            $levels = $this->flatAtomicSectionList();
424            throw new DBUnexpectedError(
425                $db,
426                "$fname: got COMMIT while atomic sections $levels are still open"
427            );
428        }
429
430        if ( $flush === IDatabase::FLUSHING_INTERNAL || $flush === IDatabase::FLUSHING_ALL_PEERS ) {
431            if ( !$this->trxLevel() ) {
432                return false; // nothing to do
433            } elseif ( !$this->trxAutomatic ) {
434                throw new DBUnexpectedError(
435                    $db,
436                    "$fname: flushing an explicit transaction, getting out of sync"
437                );
438            }
439        } elseif ( !$this->trxLevel() ) {
440            $this->logger->error(
441                "$fname: no transaction to commit, something got out of sync",
442                [
443                    'exception' => new RuntimeException(),
444                    'db_log_category' => 'trx'
445                ]
446            );
447
448            return false; // nothing to do
449        } elseif ( $this->trxAutomatic ) {
450            throw new DBUnexpectedError(
451                $db,
452                "$fname: expected mass commit of all peer transactions (DBO_TRX set)"
453            );
454        }
455        return true;
456    }
457
458    public function onEndAtomic( IDatabase $db, $fname ): array {
459        if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
460            throw new DBUnexpectedError( $db, "No atomic section is open (got $fname)" );
461        }
462        // Check if the current section matches $fname
463        $pos = count( $this->trxAtomicLevels ) - 1;
464        [ $savedFname, $sectionId, $savepointId ] = $this->trxAtomicLevels[$pos];
465        $this->logger->debug( "endAtomic: leaving level $pos ($fname)", [ 'db_log_category' => 'trx' ] );
466
467        if ( $savedFname !== $fname ) {
468            throw new DBUnexpectedError(
469                $db,
470                "Invalid atomic section ended (got $fname but expected $savedFname)"
471            );
472        }
473
474        return [ $savepointId, $sectionId ];
475    }
476
477    public function getPositionFromSectionId( AtomicSectionIdentifier $sectionId = null ): ?int {
478        if ( $sectionId !== null ) {
479            // Find the (last) section with the given $sectionId
480            $pos = -1;
481            foreach ( $this->trxAtomicLevels as $i => [ , $asId, ] ) {
482                if ( $asId === $sectionId ) {
483                    $pos = $i;
484                }
485            }
486        } else {
487            $pos = null;
488        }
489
490        return $pos;
491    }
492
493    public function cancelAtomic( $pos ) {
494        $excisedIds = [];
495        $excisedFnames = [];
496        $newTopSection = $this->currentAtomicSectionId();
497        if ( $pos !== null ) {
498            // Remove all descendant sections and re-index the array
499            $len = count( $this->trxAtomicLevels );
500            for ( $i = $pos + 1; $i < $len; ++$i ) {
501                $excisedFnames[] = $this->trxAtomicLevels[$i][0];
502                $excisedIds[] = $this->trxAtomicLevels[$i][1];
503            }
504            $this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos + 1 );
505            $newTopSection = $this->currentAtomicSectionId();
506        }
507
508        // Check if the current section matches $fname
509        $pos = count( $this->trxAtomicLevels ) - 1;
510        [ $savedFname, $savedSectionId, $savepointId ] = $this->trxAtomicLevels[$pos];
511
512        if ( $excisedFnames ) {
513            $this->logger->debug( "cancelAtomic: canceling level $pos ($savedFname" .
514                "and descendants " . implode( ', ', $excisedFnames ),
515                [ 'db_log_category' => 'trx' ]
516            );
517        } else {
518            $this->logger->debug( "cancelAtomic: canceling level $pos ($savedFname)",
519                [ 'db_log_category' => 'trx' ]
520            );
521        }
522
523        return [ $savedFname, $excisedIds, $newTopSection, $savedSectionId, $savepointId ];
524    }
525
526    /**
527     * @return bool Whether no levels remain and transaction was started by a popped level
528     */
529    public function popAtomicLevel() {
530        array_pop( $this->trxAtomicLevels );
531
532        return !$this->trxAtomicLevels && $this->trxAutomaticAtomic;
533    }
534
535    public function setAutomaticAtomic( $value ) {
536        $this->trxAutomaticAtomic = $value;
537    }
538
539    public function turnOnAutomatic() {
540        $this->trxAutomatic = true;
541    }
542
543    public function nextSavePointId( IDatabase $db, $fname ) {
544        $savepointId = self::SAVEPOINT_PREFIX . ++$this->trxAtomicCounter;
545        if ( strlen( $savepointId ) > 30 ) {
546            // 30 == Oracle's identifier length limit (pre 12c)
547            // With a 22 character prefix, that puts the highest number at 99999999.
548            throw new DBUnexpectedError(
549                $db,
550                'There have been an excessively large number of atomic sections in a transaction'
551                . " started by $this->trxFname (at $fname)"
552            );
553        }
554
555        return $savepointId;
556    }
557
558    public function writesPending() {
559        return $this->trxLevel() && $this->trxWriteCallers;
560    }
561
562    public function onDestruct() {
563        if ( $this->trxLevel() && $this->trxWriteCallers ) {
564            trigger_error( "Uncommitted DB writes (transaction from {$this->trxFname})" );
565        }
566    }
567
568    public function transactionWritingIn( $serverName, $domainId, float $startTime ) {
569        if ( !$this->trxWriteCallers ) {
570            $this->profiler->transactionWritingIn(
571                $serverName,
572                $domainId,
573                (string)$this->trxId,
574                $startTime
575            );
576        }
577    }
578
579    public function transactionWritingOut( IDatabase $db, $oldId ) {
580        if ( $this->trxWriteCallers ) {
581            $this->profiler->transactionWritingOut(
582                $db->getServerName(),
583                $db->getDomainID(),
584                $oldId,
585                $this->pendingWriteQueryDuration( IDatabase::ESTIMATE_TOTAL ),
586                $this->trxWriteAffectedRows
587            );
588        }
589    }
590
591    public function recordQueryCompletion( $sql, $startTime, $isPermWrite, $rowCount, $serverName ) {
592        $this->profiler->recordQueryCompletion(
593            $sql,
594            $startTime,
595            $isPermWrite,
596            $rowCount,
597            (string)$this->trxId,
598            $serverName
599        );
600    }
601
602    public function onTransactionResolution( IDatabase $db, callable $callback, $fname ) {
603        if ( !$this->trxLevel() ) {
604            throw new DBUnexpectedError( $db, "No transaction is active" );
605        }
606        $this->trxEndCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
607    }
608
609    public function addPostCommitOrIdleCallback( callable $callback, $fname ) {
610        $this->trxPostCommitOrIdleCallbacks[] = [
611            $callback,
612            $fname,
613            $this->currentAtomicSectionId()
614        ];
615    }
616
617    final public function addPreCommitOrIdleCallback( callable $callback, $fname ) {
618        $this->trxPreCommitOrIdleCallbacks[] = [
619            $callback,
620            $fname,
621            $this->currentAtomicSectionId()
622        ];
623    }
624
625    public function setTransactionListener( $name, callable $callback = null ) {
626        if ( $callback ) {
627            $this->trxRecurringCallbacks[$name] = $callback;
628        } else {
629            unset( $this->trxRecurringCallbacks[$name] );
630        }
631    }
632
633    /**
634     * Whether to disable running of post-COMMIT/ROLLBACK callbacks
635     * @param bool $suppress
636     */
637    public function setTrxEndCallbackSuppression( bool $suppress ) {
638        $this->trxEndCallbacksSuppressed = $suppress;
639    }
640
641    /**
642     * Hoist callback ownership for callbacks in a section to a parent section.
643     * All callbacks should have an owner that is present in trxAtomicLevels.
644     * @param AtomicSectionIdentifier $old
645     * @param AtomicSectionIdentifier $new
646     */
647    public function reassignCallbacksForSection(
648        AtomicSectionIdentifier $old,
649        AtomicSectionIdentifier $new
650    ) {
651        foreach ( $this->trxPreCommitOrIdleCallbacks as $key => $info ) {
652            if ( $info[2] === $old ) {
653                $this->trxPreCommitOrIdleCallbacks[$key][2] = $new;
654            }
655        }
656        foreach ( $this->trxPostCommitOrIdleCallbacks as $key => $info ) {
657            if ( $info[2] === $old ) {
658                $this->trxPostCommitOrIdleCallbacks[$key][2] = $new;
659            }
660        }
661        foreach ( $this->trxEndCallbacks as $key => $info ) {
662            if ( $info[2] === $old ) {
663                $this->trxEndCallbacks[$key][2] = $new;
664            }
665        }
666        foreach ( $this->trxSectionCancelCallbacks as $key => $info ) {
667            if ( $info[2] === $old ) {
668                $this->trxSectionCancelCallbacks[$key][2] = $new;
669            }
670        }
671    }
672
673    /**
674     * Update callbacks that were owned by cancelled atomic sections.
675     *
676     * Callbacks for "on commit" should never be run if they're owned by a
677     * section that won't be committed.
678     *
679     * Callbacks for "on resolution" need to reflect that the section was
680     * rolled back, even if the transaction as a whole commits successfully.
681     *
682     * Callbacks for "on section cancel" should already have been consumed,
683     * but errors during the cancellation itself can prevent that while still
684     * destroying the section. Hoist any such callbacks to the new top section,
685     * which we assume will itself have to be cancelled or rolled back to
686     * resolve the error.
687     *
688     * @param AtomicSectionIdentifier[] $excisedSectionsId Cancelled section IDs
689     * @param AtomicSectionIdentifier|null $newSectionId New top section ID
690     * @throws UnexpectedValueException
691     */
692    public function modifyCallbacksForCancel(
693        array $excisedSectionsId,
694        AtomicSectionIdentifier $newSectionId = null
695    ) {
696        // Cancel the "on commit" callbacks owned by this savepoint
697        $this->trxPostCommitOrIdleCallbacks = array_filter(
698            $this->trxPostCommitOrIdleCallbacks,
699            static function ( $entry ) use ( $excisedSectionsId ) {
700                return !in_array( $entry[2], $excisedSectionsId, true );
701            }
702        );
703        $this->trxPreCommitOrIdleCallbacks = array_filter(
704            $this->trxPreCommitOrIdleCallbacks,
705            static function ( $entry ) use ( $excisedSectionsId ) {
706                return !in_array( $entry[2], $excisedSectionsId, true );
707            }
708        );
709        // Make "on resolution" callbacks owned by this savepoint to perceive a rollback
710        foreach ( $this->trxEndCallbacks as $key => $entry ) {
711            if ( in_array( $entry[2], $excisedSectionsId, true ) ) {
712                $callback = $entry[0];
713                $this->trxEndCallbacks[$key][0] = static function ( $t, $db ) use ( $callback ) {
714                    return $callback( IDatabase::TRIGGER_ROLLBACK, $db );
715                };
716                // This "on resolution" callback no longer belongs to a section.
717                $this->trxEndCallbacks[$key][2] = null;
718            }
719        }
720        // Hoist callback ownership for section cancel callbacks to the new top section
721        foreach ( $this->trxSectionCancelCallbacks as $key => $entry ) {
722            if ( in_array( $entry[2], $excisedSectionsId, true ) ) {
723                $this->trxSectionCancelCallbacks[$key][2] = $newSectionId;
724            }
725        }
726    }
727
728    public function consumeEndCallbacks( $trigger ): array {
729        $callbackEntries = array_merge(
730            $this->trxPostCommitOrIdleCallbacks,
731            $this->trxEndCallbacks,
732            ( $trigger === IDatabase::TRIGGER_ROLLBACK )
733                ? $this->trxSectionCancelCallbacks
734                : [] // just consume them
735        );
736        $this->trxPostCommitOrIdleCallbacks = []; // consumed (and recursion guard)
737        $this->trxEndCallbacks = []; // consumed (recursion guard)
738        $this->trxSectionCancelCallbacks = []; // consumed (recursion guard)
739
740        return $callbackEntries;
741    }
742
743    /**
744     * Consume and run any relevant "on atomic section cancel" callbacks for the active transaction
745     *
746     * @param IDatabase $db
747     * @param int $trigger IDatabase::TRIGGER_* constant
748     * @param AtomicSectionIdentifier[] $sectionIds IDs of the sections that where just cancelled
749     * @throws Throwable Any exception thrown by a callback
750     */
751    public function runOnAtomicSectionCancelCallbacks( IDatabase $db, int $trigger, array $sectionIds ) {
752        // Drain the queue of matching "atomic section cancel" callbacks until there are none
753        $unrelatedCallbackEntries = [];
754        do {
755            $callbackEntries = $this->trxSectionCancelCallbacks;
756            $this->trxSectionCancelCallbacks = []; // consumed (recursion guard)
757            foreach ( $callbackEntries as $entry ) {
758                if ( in_array( $entry[2], $sectionIds, true ) ) {
759                    try {
760                        $entry[0]( $trigger, $db );
761                    } catch ( Throwable $trxError ) {
762                        $this->setTransactionError( $trxError );
763                        throw $trxError;
764                    }
765                } else {
766                    $unrelatedCallbackEntries[] = $entry;
767                }
768            }
769            // @phan-suppress-next-line PhanImpossibleConditionInLoop
770        } while ( $this->trxSectionCancelCallbacks );
771
772        $this->trxSectionCancelCallbacks = $unrelatedCallbackEntries;
773    }
774
775    /**
776     * Consume and run any "on transaction pre-commit" callbacks
777     *
778     * @param IDatabase $db
779     * @return int Number of callbacks attempted
780     * @throws Throwable Any exception thrown by a callback
781     */
782    public function runOnTransactionPreCommitCallbacks( IDatabase $db ): int {
783        $count = 0;
784
785        // Drain the queues of transaction "precommit" callbacks until it is empty
786        do {
787            $callbackEntries = $this->trxPreCommitOrIdleCallbacks;
788            $this->trxPreCommitOrIdleCallbacks = []; // consumed (and recursion guard)
789            $count += count( $callbackEntries );
790            foreach ( $callbackEntries as $entry ) {
791                try {
792                    $entry[0]( $db );
793                } catch ( Throwable $trxError ) {
794                    $this->setTransactionError( $trxError );
795                    throw $trxError;
796                }
797            }
798            // @phan-suppress-next-line PhanImpossibleConditionInLoop
799        } while ( $this->trxPreCommitOrIdleCallbacks );
800
801        return $count;
802    }
803
804    public function clearPreEndCallbacks() {
805        $this->trxPostCommitOrIdleCallbacks = [];
806        $this->trxPreCommitOrIdleCallbacks = [];
807    }
808
809    public function clearEndCallbacks() {
810        $this->trxEndCallbacks = []; // don't copy
811        $this->trxSectionCancelCallbacks = []; // don't copy
812    }
813
814    public function writesOrCallbacksPending(): bool {
815        return $this->trxLevel() && (
816                $this->trxWriteCallers ||
817                $this->trxPostCommitOrIdleCallbacks ||
818                $this->trxPreCommitOrIdleCallbacks ||
819                $this->trxEndCallbacks ||
820                $this->trxSectionCancelCallbacks
821            );
822    }
823
824    /**
825     * List the methods that have write queries or callbacks for the current transaction
826     *
827     * @return string[]
828     */
829    public function pendingWriteAndCallbackCallers(): array {
830        $fnames = $this->pendingWriteCallers();
831        foreach ( [
832            $this->trxPostCommitOrIdleCallbacks,
833            $this->trxPreCommitOrIdleCallbacks,
834            $this->trxEndCallbacks,
835            $this->trxSectionCancelCallbacks
836        ] as $callbacks ) {
837            foreach ( $callbacks as $callback ) {
838                $fnames[] = $callback[1];
839            }
840        }
841
842        return $fnames;
843    }
844
845    /**
846     * List the methods that have precommit callbacks for the current transaction
847     *
848     * @return string[]
849     */
850    public function pendingPreCommitCallbackCallers(): array {
851        $fnames = $this->pendingWriteCallers();
852        foreach ( $this->trxPreCommitOrIdleCallbacks as $callback ) {
853            $fnames[] = $callback[1];
854        }
855
856        return $fnames;
857    }
858
859    public function isEndCallbacksSuppressed(): bool {
860        return $this->trxEndCallbacksSuppressed;
861    }
862
863    public function getRecurringCallbacks() {
864        return $this->trxRecurringCallbacks;
865    }
866
867    public function countPostCommitOrIdleCallbacks() {
868        return count( $this->trxPostCommitOrIdleCallbacks );
869    }
870
871    /**
872     * @param string $mode One of IDatabase::TRANSACTION_* values
873     * @param string $fname method name
874     * @param float $rtt Trivial query round-trip-delay
875     */
876    public function onBeginInCriticalSection( $mode, $fname, $rtt ) {
877        $this->trxId = new TransactionIdentifier();
878        $this->setTrxStatusToOk();
879        $this->resetTrxAtomicLevels();
880        $this->trxWriteCallers = [];
881        $this->trxWriteDuration = 0.0;
882        $this->trxWriteQueryCount = 0;
883        $this->trxWriteAffectedRows = 0;
884        $this->trxWriteAdjDuration = 0.0;
885        $this->trxWriteAdjQueryCount = 0;
886        // T147697: make explicitTrxActive() return true until begin() finishes. This way,
887        // no caller triggered by getApproximateLagStatus() will think its OK to muck around
888        // with the transaction just because startAtomic() has not yet finished updating the
889        // tracking fields (e.g. trxAtomicLevels).
890        $this->trxAutomatic = ( $mode === IDatabase::TRANSACTION_INTERNAL );
891        $this->trxAutomaticAtomic = false;
892        $this->trxFname = $fname;
893        $this->trxTimestamp = microtime( true );
894        $this->trxRoundTripDelay = $rtt;
895    }
896
897    public function onRollbackInCriticalSection( IDatabase $db ) {
898        $oldTrxId = $this->consumeTrxId();
899        $this->setTrxStatusToNone();
900        $this->resetTrxAtomicLevels();
901        $this->clearPreEndCallbacks();
902        $this->transactionWritingOut( $db, (string)$oldTrxId );
903    }
904
905    public function onCommitInCriticalSection( IDatabase $db ) {
906        $lastWriteTime = null;
907
908        $oldTrxId = $this->consumeTrxId();
909        $this->setTrxStatusToNone();
910        if ( $this->trxWriteCallers ) {
911            $lastWriteTime = microtime( true );
912            $this->transactionWritingOut( $db, (string)$oldTrxId );
913        }
914
915        return $lastWriteTime;
916    }
917
918    public function onSessionLoss( IDatabase $db ) {
919        $oldTrxId = $this->consumeTrxId();
920        $this->clearPreEndCallbacks();
921        $this->transactionWritingOut( $db, (string)$oldTrxId );
922    }
923
924    public function onEndAtomicInCriticalSection( $sectionId ) {
925        // Hoist callback ownership for callbacks in the section that just ended;
926        // all callbacks should have an owner that is present in trxAtomicLevels.
927        $currentSectionId = $this->currentAtomicSectionId();
928        if ( $currentSectionId ) {
929            $this->reassignCallbacksForSection( $sectionId, $currentSectionId );
930        }
931    }
932
933    public function onFlushSnapshot( IDatabase $db, $fname, $flush, $trxRoundId ) {
934        if ( $this->explicitTrxActive() ) {
935            // Committing this transaction would break callers that assume it is still open
936            throw new DBUnexpectedError(
937                $db,
938                "$fname: Cannot flush snapshot; " .
939                "explicit transaction '{$this->trxFname}' is still open"
940            );
941        } elseif ( $this->writesOrCallbacksPending() ) {
942            // This only flushes transactions to clear snapshots, not to write data
943            $fnames = implode( ', ', $this->pendingWriteAndCallbackCallers() );
944            throw new DBUnexpectedError(
945                $db,
946                "$fname: Cannot flush snapshot; " .
947                "writes from transaction {$this->trxFname} are still pending ($fnames)"
948            );
949        } elseif (
950            $this->trxLevel() &&
951            $trxRoundId &&
952            $flush !== IDatabase::FLUSHING_INTERNAL &&
953            $flush !== IDatabase::FLUSHING_ALL_PEERS
954        ) {
955            $this->logger->warning(
956                "$fname: Expected mass snapshot flush of all peer transactions " .
957                "in the explicit transactions round '{$trxRoundId}'",
958                [
959                    'exception' => new RuntimeException(),
960                    'db_log_category' => 'trx'
961                ]
962            );
963        }
964    }
965
966    public function onGetScopedLockAndFlush( IDatabase $db, $fname ) {
967        if ( $this->writesOrCallbacksPending() ) {
968            // This only flushes transactions to clear snapshots, not to write data
969            $fnames = implode( ', ', $this->pendingWriteAndCallbackCallers() );
970            throw new DBUnexpectedError(
971                $db,
972                "$fname: Cannot flush pre-lock snapshot; " .
973                "writes from transaction {$this->trxFname} are still pending ($fnames)"
974            );
975        }
976    }
977}