Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 390
0.00% covered (danger)
0.00%
0 / 66
CRAP
0.00% covered (danger)
0.00%
0 / 1
TransactionManager
0.00% covered (danger)
0.00%
0 / 390
0.00% covered (danger)
0.00%
0 / 66
25440
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 trxLevel
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 newTrxId
0.00% covered (danger)
0.00%
0 / 17
0.00% covered (danger)
0.00%
0 / 1
2
 getTrxId
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 consumeTrxId
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
2
 trxTimestamp
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 trxStatus
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 setTrxStatusToOk
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 setTrxStatusToNone
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 assertTransactionStatus
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
20
 assertSessionStatus
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 setTransactionError
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 setTrxStatusIgnoredCause
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 sessionStatus
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 setSessionError
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 clearSessionError
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 calculateLastTrxApplyTime
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 pendingWriteCallers
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 updateTrxWriteQueryReport
0.00% covered (danger)
0.00%
0 / 13
0.00% covered (danger)
0.00%
0 / 1
30
 pendingWriteQueryDuration
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 flatAtomicSectionList
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 resetTrxAtomicLevels
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 explicitTrxActive
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
12
 trxCheckBeforeClose
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
42
 onAtomicSectionCancel
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
12
 onCancelAtomicBeforeCriticalSection
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
12
 currentAtomicSectionId
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 addToAtomicLevels
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 onBeginTransaction
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
12
 onCommit
0.00% covered (danger)
0.00%
0 / 29
0.00% covered (danger)
0.00%
0 / 1
90
 onEndAtomic
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
20
 getPositionFromSectionId
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 cancelAtomic
0.00% covered (danger)
0.00%
0 / 21
0.00% covered (danger)
0.00%
0 / 1
20
 popAtomicLevel
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 isClean
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 setAutomaticAtomic
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 turnOnAutomatic
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 nextSavePointId
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
6
 writesPending
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 onDestruct
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
12
 transactionWritingIn
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 transactionWritingOut
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
6
 recordQueryCompletion
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
2
 onTransactionResolution
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 addPostCommitOrIdleCallback
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 addPreCommitOrIdleCallback
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 setTransactionListener
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 setTrxEndCallbackSuppression
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 reassignCallbacksForSection
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
90
 modifyCallbacksForCancel
0.00% covered (danger)
0.00%
0 / 22
0.00% covered (danger)
0.00%
0 / 1
30
 consumeEndCallbacks
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
6
 runOnAtomicSectionCancelCallbacks
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
20
 runOnTransactionPreCommitCallbacks
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
12
 clearPreEndCallbacks
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 clearEndCallbacks
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 writesOrCallbacksPending
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
42
 pendingWriteAndCallbackCallers
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
12
 pendingPreCommitCallbackCallers
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 isEndCallbacksSuppressed
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getRecurringCallbacks
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 countPostCommitOrIdleCallbacks
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 onRollback
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 onCommitInCriticalSection
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 onEndAtomicInCriticalSection
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 onFlushSnapshot
0.00% covered (danger)
0.00%
0 / 24
0.00% covered (danger)
0.00%
0 / 1
56
 onGetScopedLockAndFlush
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20namespace Wikimedia\Rdbms;
21
22use Psr\Log\LoggerInterface;
23use Psr\Log\NullLogger;
24use RuntimeException;
25use Throwable;
26use UnexpectedValueException;
27
28/**
29 * @ingroup Database
30 * @internal This class should not be used outside of Database
31 */
32class TransactionManager {
33    /** Transaction is in an error state requiring a full or savepoint rollback */
34    public const STATUS_TRX_ERROR = 1;
35    /** Transaction is active and in a normal state */
36    public const STATUS_TRX_OK = 2;
37    /** No transaction is active */
38    public const STATUS_TRX_NONE = 3;
39
40    /** Session is in an error state requiring a reset */
41    public const STATUS_SESS_ERROR = 1;
42    /** Session is in a normal state */
43    public const STATUS_SESS_OK = 2;
44
45    /** @var float Guess of how many seconds it takes to replicate a small insert */
46    private const TINY_WRITE_SEC = 0.010;
47    /** @var float Consider a write slow if it took more than this many seconds */
48    private const SLOW_WRITE_SEC = 0.500;
49    /** Assume an insert of this many rows or less should be fast to replicate */
50    private const SMALL_WRITE_ROWS = 100;
51
52    /** @var string Prefix to the atomic section counter used to make savepoint IDs */
53    private const SAVEPOINT_PREFIX = 'wikimedia_rdbms_atomic';
54
55    /** @var TransactionIdentifier|null Application-side ID of the active transaction; null if none */
56    private $trxId;
57    /** @var float|null UNIX timestamp at the time of BEGIN for the last transaction */
58    private $trxTimestamp = null;
59    /** @var float|null Round trip time estimate for queries during this transaction */
60    private $trxRoundTripDelay = null;
61    /** @var int Transaction status */
62    private $trxStatus = self::STATUS_TRX_NONE;
63    /** @var Throwable|null The cause of any unresolved transaction state error, or, null */
64    private $trxStatusCause;
65    /** @var array|null Details of the last statement-rollback error for the last transaction */
66    private $trxStatusIgnoredCause;
67
68    /** @var Throwable|null The cause of any unresolved session state loss error, or, null */
69    private $sessionError;
70
71    /** @var string[] Write query callers of the current transaction */
72    private $trxWriteCallers = [];
73    /** @var float Seconds spent in write queries for the current transaction */
74    private $trxWriteDuration = 0.0;
75    /** @var int Number of write queries for the current transaction */
76    private $trxWriteQueryCount = 0;
77    /** @var int Number of rows affected by write queries for the current transaction */
78    private $trxWriteAffectedRows = 0;
79    /** @var float Like trxWriteDuration but excludes lock-bound, easy to replicate, queries */
80    private $trxWriteAdjDuration = 0.0;
81    /** @var int Number of write queries counted in trxWriteAdjDuration */
82    private $trxWriteAdjQueryCount = 0;
83
84    /** @var array List of (name, unique ID, savepoint ID) for each active atomic section level */
85    private $trxAtomicLevels = [];
86    /** @var bool Whether the current transaction was started implicitly due to DBO_TRX */
87    private $trxAutomatic = false;
88    /** @var bool Whether the current transaction was started implicitly by startAtomic() */
89    private $trxAutomaticAtomic = false;
90
91    /** @var string|null Name of the function that started the last transaction */
92    private $trxFname = null;
93    /** @var int Counter for atomic savepoint identifiers (reset with each transaction) */
94    private $trxAtomicCounter = 0;
95
96    /** @var array[] List of (callable, method name, atomic section id) */
97    private $trxPostCommitOrIdleCallbacks = [];
98    /** @var array[] List of (callable, method name, atomic section id) */
99    private $trxPreCommitOrIdleCallbacks = [];
100    /**
101     * @var array[] List of (callable, method name, atomic section id)
102     * @phan-var array<array{0:callable,1:string,2:AtomicSectionIdentifier|null}>
103     */
104    private $trxEndCallbacks = [];
105    /** @var array[] List of (callable, method name, atomic section id) */
106    private $trxSectionCancelCallbacks = [];
107    /** @var callable[] Map of (name => callable) */
108    private $trxRecurringCallbacks = [];
109    /** @var bool Whether to suppress triggering of transaction end callbacks */
110    private $trxEndCallbacksSuppressed = false;
111
112    /** @var LoggerInterface */
113    private $logger;
114    /** @var TransactionProfiler */
115    private $profiler;
116
117    public function __construct( LoggerInterface $logger = null, $profiler = null ) {
118        $this->logger = $logger ?? new NullLogger();
119        $this->profiler = $profiler ?? new TransactionProfiler();
120    }
121
122    public function trxLevel() {
123        return $this->trxId ? 1 : 0;
124    }
125
126    /**
127     * TODO: This should be removed once all usages have been migrated here
128     * @param string $mode One of IDatabase::TRANSACTION_* values
129     * @param string $fname method name
130     * @param float $rtt Trivial query round-trip-delay
131     */
132    public function newTrxId( $mode, $fname, $rtt ) {
133        $this->trxId = new TransactionIdentifier();
134        $this->trxStatus = self::STATUS_TRX_OK;
135        $this->trxStatusCause = null;
136        $this->trxStatusIgnoredCause = null;
137        $this->trxWriteDuration = 0.0;
138        $this->trxWriteQueryCount = 0;
139        $this->trxWriteAffectedRows = 0;
140        $this->trxWriteAdjDuration = 0.0;
141        $this->trxWriteAdjQueryCount = 0;
142        $this->trxWriteCallers = [];
143        $this->trxAtomicLevels = [];
144        // T147697: make explicitTrxActive() return true until begin() finishes. This way,
145        // no caller triggered by getApproximateLagStatus() will think its OK to muck around
146        // with the transaction just because startAtomic() has not yet finished updating the
147        // tracking fields (e.g. trxAtomicLevels).
148        $this->trxAutomatic = ( $mode === IDatabase::TRANSACTION_INTERNAL );
149        $this->trxAutomaticAtomic = false;
150        $this->trxFname = $fname;
151        $this->trxAtomicCounter = 0;
152        $this->trxTimestamp = microtime( true );
153        $this->trxRoundTripDelay = $rtt;
154    }
155
156    /**
157     * Get the application-side transaction identifier instance
158     *
159     * @return TransactionIdentifier Token for the active transaction; null if there isn't one
160     */
161    public function getTrxId() {
162        return $this->trxId;
163    }
164
165    /**
166     * Reset the application-side transaction identifier instance and return the old one
167     *
168     * This will become private soon.
169     * @return TransactionIdentifier|null The old transaction token; null if there wasn't one
170     */
171    public function consumeTrxId() {
172        $old = $this->trxId;
173        $this->trxId = null;
174        $this->trxAtomicCounter = 0;
175
176        return $old;
177    }
178
179    public function trxTimestamp(): ?float {
180        return $this->trxLevel() ? $this->trxTimestamp : null;
181    }
182
183    /**
184     * @return int One of the STATUS_TRX_* class constants
185     */
186    public function trxStatus(): int {
187        return $this->trxStatus;
188    }
189
190    public function setTrxStatusToOk() {
191        $this->trxStatus = self::STATUS_TRX_OK;
192        $this->trxStatusCause = null;
193        $this->trxStatusIgnoredCause = null;
194    }
195
196    public function setTrxStatusToNone() {
197        $this->trxStatus = self::STATUS_TRX_NONE;
198        $this->trxStatusCause = null;
199        $this->trxStatusIgnoredCause = null;
200    }
201
202    public function assertTransactionStatus( IDatabase $db, $deprecationLogger, $fname ) {
203        if ( $this->trxStatus < self::STATUS_TRX_OK ) {
204            throw new DBTransactionStateError(
205                $db,
206                "Cannot execute query from $fname while transaction status is ERROR",
207                [],
208                $this->trxStatusCause
209            );
210        } elseif ( $this->trxStatus === self::STATUS_TRX_OK && $this->trxStatusIgnoredCause ) {
211            [ $iLastError, $iLastErrno, $iFname ] = $this->trxStatusIgnoredCause;
212            call_user_func( $deprecationLogger,
213                "Caller from $fname ignored an error originally raised from $iFname" .
214                "[$iLastErrno$iLastError"
215            );
216            $this->trxStatusIgnoredCause = null;
217        }
218    }
219
220    public function assertSessionStatus( IDatabase $db, $fname ) {
221        if ( $this->sessionError ) {
222            throw new DBSessionStateError(
223                $db,
224                "Cannot execute query from $fname while session status is ERROR",
225                [],
226                $this->sessionError
227            );
228        }
229    }
230
231    /**
232     * Mark the transaction as requiring rollback (STATUS_TRX_ERROR) due to an error
233     *
234     * @param Throwable $trxError
235     */
236    public function setTransactionError( Throwable $trxError ) {
237        if ( $this->trxStatus > self::STATUS_TRX_ERROR ) {
238            $this->trxStatus = self::STATUS_TRX_ERROR;
239            $this->trxStatusCause = $trxError;
240        }
241    }
242
243    /**
244     * @param array|null $trxStatusIgnoredCause
245     */
246    public function setTrxStatusIgnoredCause( ?array $trxStatusIgnoredCause ): void {
247        $this->trxStatusIgnoredCause = $trxStatusIgnoredCause;
248    }
249
250    /**
251     * Get the status of the current session (ephemeral server-side state tied to the connection)
252     *
253     * @return int One of the STATUS_SESSION_* class constants
254     */
255    public function sessionStatus() {
256        // Check if an unresolved error still exists
257        return ( $this->sessionError ) ? self::STATUS_SESS_ERROR : self::STATUS_SESS_OK;
258    }
259
260    /**
261     * Flag the session as needing a reset due to an error, if not already flagged
262     *
263     * @param Throwable $sessionError
264     */
265    public function setSessionError( Throwable $sessionError ) {
266        $this->sessionError ??= $sessionError;
267    }
268
269    /**
270     * Unflag the session as needing a reset due to an error
271     */
272    public function clearSessionError() {
273        $this->sessionError = null;
274    }
275
276    /**
277     * @param float $rtt
278     * @return float Time to apply writes to replicas based on trxWrite* fields
279     */
280    private function calculateLastTrxApplyTime( float $rtt ) {
281        $rttAdjTotal = $this->trxWriteAdjQueryCount * $rtt;
282        $applyTime = max( $this->trxWriteAdjDuration - $rttAdjTotal, 0 );
283        // For omitted queries, make them count as something at least
284        $omitted = $this->trxWriteQueryCount - $this->trxWriteAdjQueryCount;
285        $applyTime += self::TINY_WRITE_SEC * $omitted;
286
287        return $applyTime;
288    }
289
290    public function pendingWriteCallers() {
291        return $this->trxLevel() ? $this->trxWriteCallers : [];
292    }
293
294    /**
295     * Update the estimated run-time of a query, not counting large row lock times
296     *
297     * LoadBalancer can be set to rollback transactions that will create huge replication
298     * lag. It bases this estimate off of pendingWriteQueryDuration(). Certain simple
299     * queries, like inserting a row can take a long time due to row locking. This method
300     * uses some simple heuristics to discount those cases.
301     *
302     * @param string $queryVerb action in the write query
303     * @param float $runtime Total runtime, including RTT
304     * @param int $affected Affected row count
305     * @param string $fname method name invoking the action
306     */
307    public function updateTrxWriteQueryReport( $queryVerb, $runtime, $affected, $fname ) {
308        // Whether this is indicative of replica DB runtime (except for RBR or ws_repl)
309        $indicativeOfReplicaRuntime = true;
310        if ( $runtime > self::SLOW_WRITE_SEC ) {
311            // insert(), upsert(), replace() are fast unless bulky in size or blocked on locks
312            if ( $queryVerb === 'INSERT' ) {
313                $indicativeOfReplicaRuntime = $affected > self::SMALL_WRITE_ROWS;
314            } elseif ( $queryVerb === 'REPLACE' ) {
315                $indicativeOfReplicaRuntime = $affected > self::SMALL_WRITE_ROWS / 2;
316            }
317        }
318
319        $this->trxWriteDuration += $runtime;
320        $this->trxWriteQueryCount += 1;
321        $this->trxWriteAffectedRows += $affected;
322        if ( $indicativeOfReplicaRuntime ) {
323            $this->trxWriteAdjDuration += $runtime;
324            $this->trxWriteAdjQueryCount += 1;
325        }
326
327        $this->trxWriteCallers[] = $fname;
328    }
329
330    public function pendingWriteQueryDuration( $type = IDatabase::ESTIMATE_TOTAL ) {
331        if ( !$this->trxLevel() ) {
332            return false;
333        } elseif ( !$this->trxWriteCallers ) {
334            return 0.0;
335        }
336
337        if ( $type === IDatabase::ESTIMATE_DB_APPLY ) {
338            return $this->calculateLastTrxApplyTime( $this->trxRoundTripDelay );
339        }
340
341        return $this->trxWriteDuration;
342    }
343
344    /**
345     * @return string
346     */
347    private function flatAtomicSectionList() {
348        return array_reduce( $this->trxAtomicLevels, static function ( $accum, $v ) {
349            return $accum === null ? $v[0] : "$accum" . $v[0];
350        } );
351    }
352
353    public function resetTrxAtomicLevels() {
354        $this->trxAtomicLevels = [];
355    }
356
357    public function explicitTrxActive() {
358        return $this->trxLevel() && ( $this->trxAtomicLevels || !$this->trxAutomatic );
359    }
360
361    public function trxCheckBeforeClose( IDatabaseForOwner $db, $fname ) {
362        $error = null;
363        if ( $this->trxAtomicLevels ) {
364            // Cannot let incomplete atomic sections be committed
365            $levels = $this->flatAtomicSectionList();
366            $error = "$fname: atomic sections $levels are still open";
367        } elseif ( $this->trxAutomatic ) {
368            // Only the connection manager can commit non-empty DBO_TRX transactions
369            // (empty ones we can silently roll back)
370            if ( $db->writesOrCallbacksPending() ) {
371                $error = "$fname" .
372                    "expected mass rollback of all peer transactions (DBO_TRX set)";
373            }
374        } else {
375            // Manual transactions should have been committed or rolled
376            // back, even if empty.
377            $error = "$fname: transaction is still open (from {$this->trxFname})";
378        }
379
380        if ( $this->trxEndCallbacksSuppressed && $error === null ) {
381            $error = "$fname: callbacks are suppressed; cannot properly commit";
382        }
383
384        return $error;
385    }
386
387    public function onAtomicSectionCancel( IDatabase $db, $callback, $fname ): void {
388        if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
389            throw new DBUnexpectedError( $db, "No atomic section is open (got $fname)" );
390        }
391        $this->trxSectionCancelCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
392    }
393
394    public function onCancelAtomicBeforeCriticalSection( IDatabase $db, $fname ): void {
395        if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
396            throw new DBUnexpectedError( $db, "No atomic section is open (got $fname)" );
397        }
398    }
399
400    /**
401     * @return AtomicSectionIdentifier|null ID of the topmost atomic section level
402     */
403    public function currentAtomicSectionId(): ?AtomicSectionIdentifier {
404        if ( $this->trxLevel() && $this->trxAtomicLevels ) {
405            $levelInfo = end( $this->trxAtomicLevels );
406
407            return $levelInfo[1];
408        }
409
410        return null;
411    }
412
413    public function addToAtomicLevels( $fname, AtomicSectionIdentifier $sectionId, $savepointId ) {
414        $this->trxAtomicLevels[] = [ $fname, $sectionId, $savepointId ];
415        $this->logger->debug( 'startAtomic: entering level ' .
416            ( count( $this->trxAtomicLevels ) - 1 ) . " ($fname)", [ 'db_log_category' => 'trx' ] );
417    }
418
419    public function onBeginTransaction( IDatabase $db, $fname ): void {
420        // @phan-suppress-previous-line PhanPluginNeverReturnMethod
421        if ( $this->trxAtomicLevels ) {
422            $levels = $this->flatAtomicSectionList();
423            $msg = "$fname: got explicit BEGIN while atomic section(s) $levels are open";
424            throw new DBUnexpectedError( $db, $msg );
425        } elseif ( !$this->trxAutomatic ) {
426            $msg = "$fname: explicit transaction already active (from {$this->trxFname})";
427            throw new DBUnexpectedError( $db, $msg );
428        } else {
429            $msg = "$fname: implicit transaction already active (from {$this->trxFname})";
430            throw new DBUnexpectedError( $db, $msg );
431        }
432    }
433
434    /**
435     * @param IDatabase $db
436     * @param string $fname
437     * @param string $flush one of IDatabase::FLUSHING_* values
438     * @return bool false if the commit should go aborted, true otherwise.
439     */
440    public function onCommit( IDatabase $db, $fname, $flush ): bool {
441        if ( $this->trxLevel() && $this->trxAtomicLevels ) {
442            // There are still atomic sections open; this cannot be ignored
443            $levels = $this->flatAtomicSectionList();
444            throw new DBUnexpectedError(
445                $db,
446                "$fname: got COMMIT while atomic sections $levels are still open"
447            );
448        }
449
450        if ( $flush === IDatabase::FLUSHING_INTERNAL || $flush === IDatabase::FLUSHING_ALL_PEERS ) {
451            if ( !$this->trxLevel() ) {
452                return false; // nothing to do
453            } elseif ( !$this->trxAutomatic ) {
454                throw new DBUnexpectedError(
455                    $db,
456                    "$fname: flushing an explicit transaction, getting out of sync"
457                );
458            }
459        } elseif ( !$this->trxLevel() ) {
460            $this->logger->error(
461                "$fname: no transaction to commit, something got out of sync",
462                [
463                    'exception' => new RuntimeException(),
464                    'db_log_category' => 'trx'
465                ]
466            );
467
468            return false; // nothing to do
469        } elseif ( $this->trxAutomatic ) {
470            throw new DBUnexpectedError(
471                $db,
472                "$fname: expected mass commit of all peer transactions (DBO_TRX set)"
473            );
474        }
475        return true;
476    }
477
478    public function onEndAtomic( IDatabase $db, $fname ): array {
479        if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
480            throw new DBUnexpectedError( $db, "No atomic section is open (got $fname)" );
481        }
482        // Check if the current section matches $fname
483        $pos = count( $this->trxAtomicLevels ) - 1;
484        [ $savedFname, $sectionId, $savepointId ] = $this->trxAtomicLevels[$pos];
485        $this->logger->debug( "endAtomic: leaving level $pos ($fname)", [ 'db_log_category' => 'trx' ] );
486
487        if ( $savedFname !== $fname ) {
488            throw new DBUnexpectedError(
489                $db,
490                "Invalid atomic section ended (got $fname but expected $savedFname)"
491            );
492        }
493
494        return [ $savepointId, $sectionId ];
495    }
496
497    public function getPositionFromSectionId( AtomicSectionIdentifier $sectionId = null ): ?int {
498        if ( $sectionId !== null ) {
499            // Find the (last) section with the given $sectionId
500            $pos = -1;
501            foreach ( $this->trxAtomicLevels as $i => [ , $asId, ] ) {
502                if ( $asId === $sectionId ) {
503                    $pos = $i;
504                }
505            }
506        } else {
507            $pos = null;
508        }
509
510        return $pos;
511    }
512
513    public function cancelAtomic( $pos ) {
514        $excisedIds = [];
515        $excisedFnames = [];
516        $newTopSection = $this->currentAtomicSectionId();
517        if ( $pos !== null ) {
518            // Remove all descendant sections and re-index the array
519            $len = count( $this->trxAtomicLevels );
520            for ( $i = $pos + 1; $i < $len; ++$i ) {
521                $excisedFnames[] = $this->trxAtomicLevels[$i][0];
522                $excisedIds[] = $this->trxAtomicLevels[$i][1];
523            }
524            $this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos + 1 );
525            $newTopSection = $this->currentAtomicSectionId();
526        }
527
528        // Check if the current section matches $fname
529        $pos = count( $this->trxAtomicLevels ) - 1;
530        [ $savedFname, $savedSectionId, $savepointId ] = $this->trxAtomicLevels[$pos];
531
532        if ( $excisedFnames ) {
533            $this->logger->debug( "cancelAtomic: canceling level $pos ($savedFname" .
534                "and descendants " . implode( ', ', $excisedFnames ),
535                [ 'db_log_category' => 'trx' ]
536            );
537        } else {
538            $this->logger->debug( "cancelAtomic: canceling level $pos ($savedFname)",
539                [ 'db_log_category' => 'trx' ]
540            );
541        }
542
543        return [ $savedFname, $excisedIds, $newTopSection, $savedSectionId, $savepointId ];
544    }
545
546    public function popAtomicLevel() {
547        array_pop( $this->trxAtomicLevels );
548    }
549
550    public function isClean() {
551        return !$this->trxAtomicLevels && $this->trxAutomaticAtomic;
552    }
553
554    public function setAutomaticAtomic( $value ) {
555        $this->trxAutomaticAtomic = $value;
556    }
557
558    public function turnOnAutomatic() {
559        $this->trxAutomatic = true;
560    }
561
562    public function nextSavePointId( IDatabase $db, $fname ) {
563        $savepointId = self::SAVEPOINT_PREFIX . ++$this->trxAtomicCounter;
564        if ( strlen( $savepointId ) > 30 ) {
565            // 30 == Oracle's identifier length limit (pre 12c)
566            // With a 22 character prefix, that puts the highest number at 99999999.
567            throw new DBUnexpectedError(
568                $db,
569                'There have been an excessively large number of atomic sections in a transaction'
570                . " started by $this->trxFname (at $fname)"
571            );
572        }
573
574        return $savepointId;
575    }
576
577    public function writesPending() {
578        return $this->trxLevel() && $this->trxWriteCallers;
579    }
580
581    public function onDestruct() {
582        if ( $this->trxLevel() && $this->trxWriteCallers ) {
583            trigger_error( "Uncommitted DB writes (transaction from {$this->trxFname})" );
584        }
585    }
586
587    public function transactionWritingIn( $serverName, $domainId, float $startTime ) {
588        if ( !$this->trxWriteCallers ) {
589            $this->profiler->transactionWritingIn(
590                $serverName,
591                $domainId,
592                (string)$this->trxId,
593                $startTime
594            );
595        }
596    }
597
598    public function transactionWritingOut( IDatabase $db, $oldId ) {
599        if ( $this->trxWriteCallers ) {
600            $this->profiler->transactionWritingOut(
601                $db->getServerName(),
602                $db->getDomainID(),
603                $oldId,
604                $this->pendingWriteQueryDuration( IDatabase::ESTIMATE_TOTAL ),
605                $this->trxWriteAffectedRows
606            );
607        }
608    }
609
610    public function recordQueryCompletion( $sql, $startTime, $isPermWrite, $rowCount, $serverName ) {
611        $this->profiler->recordQueryCompletion(
612            $sql,
613            $startTime,
614            $isPermWrite,
615            $rowCount,
616            (string)$this->trxId,
617            $serverName
618        );
619    }
620
621    public function onTransactionResolution( IDatabase $db, callable $callback, $fname ) {
622        if ( !$this->trxLevel() ) {
623            throw new DBUnexpectedError( $db, "No transaction is active" );
624        }
625        $this->trxEndCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
626    }
627
628    public function addPostCommitOrIdleCallback( callable $callback, $fname = __METHOD__ ) {
629        $this->trxPostCommitOrIdleCallbacks[] = [
630            $callback,
631            $fname,
632            $this->currentAtomicSectionId()
633        ];
634    }
635
636    final public function addPreCommitOrIdleCallback( callable $callback, $fname = __METHOD__ ) {
637        $this->trxPreCommitOrIdleCallbacks[] = [
638            $callback,
639            $fname,
640            $this->currentAtomicSectionId()
641        ];
642    }
643
644    public function setTransactionListener( $name, callable $callback = null ) {
645        if ( $callback ) {
646            $this->trxRecurringCallbacks[$name] = $callback;
647        } else {
648            unset( $this->trxRecurringCallbacks[$name] );
649        }
650    }
651
652    /**
653     * Whether to disable running of post-COMMIT/ROLLBACK callbacks
654     * @param bool $suppress
655     */
656    public function setTrxEndCallbackSuppression( bool $suppress ) {
657        $this->trxEndCallbacksSuppressed = $suppress;
658    }
659
660    /**
661     * Hoist callback ownership for callbacks in a section to a parent section.
662     * All callbacks should have an owner that is present in trxAtomicLevels.
663     * @param AtomicSectionIdentifier $old
664     * @param AtomicSectionIdentifier $new
665     */
666    public function reassignCallbacksForSection(
667        AtomicSectionIdentifier $old,
668        AtomicSectionIdentifier $new
669    ) {
670        foreach ( $this->trxPreCommitOrIdleCallbacks as $key => $info ) {
671            if ( $info[2] === $old ) {
672                $this->trxPreCommitOrIdleCallbacks[$key][2] = $new;
673            }
674        }
675        foreach ( $this->trxPostCommitOrIdleCallbacks as $key => $info ) {
676            if ( $info[2] === $old ) {
677                $this->trxPostCommitOrIdleCallbacks[$key][2] = $new;
678            }
679        }
680        foreach ( $this->trxEndCallbacks as $key => $info ) {
681            if ( $info[2] === $old ) {
682                $this->trxEndCallbacks[$key][2] = $new;
683            }
684        }
685        foreach ( $this->trxSectionCancelCallbacks as $key => $info ) {
686            if ( $info[2] === $old ) {
687                $this->trxSectionCancelCallbacks[$key][2] = $new;
688            }
689        }
690    }
691
692    /**
693     * Update callbacks that were owned by cancelled atomic sections.
694     *
695     * Callbacks for "on commit" should never be run if they're owned by a
696     * section that won't be committed.
697     *
698     * Callbacks for "on resolution" need to reflect that the section was
699     * rolled back, even if the transaction as a whole commits successfully.
700     *
701     * Callbacks for "on section cancel" should already have been consumed,
702     * but errors during the cancellation itself can prevent that while still
703     * destroying the section. Hoist any such callbacks to the new top section,
704     * which we assume will itself have to be cancelled or rolled back to
705     * resolve the error.
706     *
707     * @param AtomicSectionIdentifier[] $excisedSectionsId Cancelled section IDs
708     * @param AtomicSectionIdentifier|null $newSectionId New top section ID
709     * @throws UnexpectedValueException
710     */
711    public function modifyCallbacksForCancel(
712        array $excisedSectionsId,
713        AtomicSectionIdentifier $newSectionId = null
714    ) {
715        // Cancel the "on commit" callbacks owned by this savepoint
716        $this->trxPostCommitOrIdleCallbacks = array_filter(
717            $this->trxPostCommitOrIdleCallbacks,
718            static function ( $entry ) use ( $excisedSectionsId ) {
719                return !in_array( $entry[2], $excisedSectionsId, true );
720            }
721        );
722        $this->trxPreCommitOrIdleCallbacks = array_filter(
723            $this->trxPreCommitOrIdleCallbacks,
724            static function ( $entry ) use ( $excisedSectionsId ) {
725                return !in_array( $entry[2], $excisedSectionsId, true );
726            }
727        );
728        // Make "on resolution" callbacks owned by this savepoint to perceive a rollback
729        foreach ( $this->trxEndCallbacks as $key => $entry ) {
730            if ( in_array( $entry[2], $excisedSectionsId, true ) ) {
731                $callback = $entry[0];
732                $this->trxEndCallbacks[$key][0] = static function ( $t, $db ) use ( $callback ) {
733                    return $callback( IDatabase::TRIGGER_ROLLBACK, $db );
734                };
735                // This "on resolution" callback no longer belongs to a section.
736                $this->trxEndCallbacks[$key][2] = null;
737            }
738        }
739        // Hoist callback ownership for section cancel callbacks to the new top section
740        foreach ( $this->trxSectionCancelCallbacks as $key => $entry ) {
741            if ( in_array( $entry[2], $excisedSectionsId, true ) ) {
742                $this->trxSectionCancelCallbacks[$key][2] = $newSectionId;
743            }
744        }
745    }
746
747    public function consumeEndCallbacks( $trigger ): array {
748        $callbackEntries = array_merge(
749            $this->trxPostCommitOrIdleCallbacks,
750            $this->trxEndCallbacks,
751            ( $trigger === IDatabase::TRIGGER_ROLLBACK )
752                ? $this->trxSectionCancelCallbacks
753                : [] // just consume them
754        );
755        $this->trxPostCommitOrIdleCallbacks = []; // consumed (and recursion guard)
756        $this->trxEndCallbacks = []; // consumed (recursion guard)
757        $this->trxSectionCancelCallbacks = []; // consumed (recursion guard)
758
759        return $callbackEntries;
760    }
761
762    /**
763     * Consume and run any relevant "on atomic section cancel" callbacks for the active transaction
764     *
765     * @param IDatabase $db
766     * @param int $trigger IDatabase::TRIGGER_* constant
767     * @param AtomicSectionIdentifier[] $sectionIds IDs of the sections that where just cancelled
768     * @throws Throwable Any exception thrown by a callback
769     */
770    public function runOnAtomicSectionCancelCallbacks( IDatabase $db, int $trigger, array $sectionIds ) {
771        // Drain the queue of matching "atomic section cancel" callbacks until there are none
772        $unrelatedCallbackEntries = [];
773        do {
774            $callbackEntries = $this->trxSectionCancelCallbacks;
775            $this->trxSectionCancelCallbacks = []; // consumed (recursion guard)
776            foreach ( $callbackEntries as $entry ) {
777                if ( in_array( $entry[2], $sectionIds, true ) ) {
778                    try {
779                        // @phan-suppress-next-line PhanUndeclaredInvokeInCallable
780                        $entry[0]( $trigger, $db );
781                    } catch ( Throwable $trxError ) {
782                        $this->setTransactionError( $trxError );
783                        throw $trxError;
784                    }
785                } else {
786                    $unrelatedCallbackEntries[] = $entry;
787                }
788            }
789            // @phan-suppress-next-line PhanImpossibleConditionInLoop
790        } while ( $this->trxSectionCancelCallbacks );
791
792        $this->trxSectionCancelCallbacks = $unrelatedCallbackEntries;
793    }
794
795    /**
796     * Consume and run any "on transaction pre-commit" callbacks
797     *
798     * @param IDatabase $db
799     * @return int Number of callbacks attempted
800     * @throws Throwable Any exception thrown by a callback
801     */
802    public function runOnTransactionPreCommitCallbacks( IDatabase $db ): int {
803        $count = 0;
804
805        // Drain the queues of transaction "precommit" callbacks until it is empty
806        do {
807            $callbackEntries = $this->trxPreCommitOrIdleCallbacks;
808            $this->trxPreCommitOrIdleCallbacks = []; // consumed (and recursion guard)
809            $count += count( $callbackEntries );
810            foreach ( $callbackEntries as $entry ) {
811                try {
812                    // @phan-suppress-next-line PhanUndeclaredInvokeInCallable
813                    $entry[0]( $db );
814                } catch ( Throwable $trxError ) {
815                    $this->setTransactionError( $trxError );
816                    throw $trxError;
817                }
818            }
819            // @phan-suppress-next-line PhanImpossibleConditionInLoop
820        } while ( $this->trxPreCommitOrIdleCallbacks );
821
822        return $count;
823    }
824
825    public function clearPreEndCallbacks() {
826        $this->trxPostCommitOrIdleCallbacks = [];
827        $this->trxPreCommitOrIdleCallbacks = [];
828    }
829
830    public function clearEndCallbacks() {
831        $this->trxEndCallbacks = []; // don't copy
832        $this->trxSectionCancelCallbacks = []; // don't copy
833    }
834
835    public function writesOrCallbacksPending(): bool {
836        return $this->trxLevel() && (
837                $this->trxWriteCallers ||
838                $this->trxPostCommitOrIdleCallbacks ||
839                $this->trxPreCommitOrIdleCallbacks ||
840                $this->trxEndCallbacks ||
841                $this->trxSectionCancelCallbacks
842            );
843    }
844
845    /**
846     * List the methods that have write queries or callbacks for the current transaction
847     *
848     * @return string[]
849     */
850    public function pendingWriteAndCallbackCallers(): array {
851        $fnames = $this->pendingWriteCallers();
852        foreach ( [
853            $this->trxPostCommitOrIdleCallbacks,
854            $this->trxPreCommitOrIdleCallbacks,
855            $this->trxEndCallbacks,
856            $this->trxSectionCancelCallbacks
857        ] as $callbacks ) {
858            foreach ( $callbacks as $callback ) {
859                $fnames[] = $callback[1];
860            }
861        }
862
863        return $fnames;
864    }
865
866    /**
867     * List the methods that have precommit callbacks for the current transaction
868     *
869     * @return string[]
870     */
871    public function pendingPreCommitCallbackCallers(): array {
872        $fnames = $this->pendingWriteCallers();
873        foreach ( $this->trxPreCommitOrIdleCallbacks as $callback ) {
874            $fnames[] = $callback[1];
875        }
876
877        return $fnames;
878    }
879
880    public function isEndCallbacksSuppressed(): bool {
881        return $this->trxEndCallbacksSuppressed;
882    }
883
884    public function getRecurringCallbacks() {
885        return $this->trxRecurringCallbacks;
886    }
887
888    public function countPostCommitOrIdleCallbacks() {
889        return count( $this->trxPostCommitOrIdleCallbacks );
890    }
891
892    public function onRollback( IDatabase $db ) {
893        $oldTrxId = $this->consumeTrxId();
894        $this->setTrxStatusToNone();
895        $this->resetTrxAtomicLevels();
896        $this->clearPreEndCallbacks();
897        $this->transactionWritingOut( $db, (string)$oldTrxId );
898    }
899
900    public function onCommitInCriticalSection( IDatabase $db ) {
901        $lastWriteTime = null;
902        $oldTrxId = $this->consumeTrxId();
903        $this->setTrxStatusToNone();
904        if ( $this->trxWriteCallers ) {
905            $lastWriteTime = microtime( true );
906            $this->transactionWritingOut( $db, (string)$oldTrxId );
907        }
908        return $lastWriteTime;
909    }
910
911    public function onEndAtomicInCriticalSection( $sectionId ) {
912        // Hoist callback ownership for callbacks in the section that just ended;
913        // all callbacks should have an owner that is present in trxAtomicLevels.
914        $currentSectionId = $this->currentAtomicSectionId();
915        if ( $currentSectionId ) {
916            $this->reassignCallbacksForSection( $sectionId, $currentSectionId );
917        }
918    }
919
920    public function onFlushSnapshot( IDatabase $db, $fname, $flush, $trxRoundId ) {
921        if ( $this->explicitTrxActive() ) {
922            // Committing this transaction would break callers that assume it is still open
923            throw new DBUnexpectedError(
924                $db,
925                "$fname: Cannot flush snapshot; " .
926                "explicit transaction '{$this->trxFname}' is still open"
927            );
928        } elseif ( $this->writesOrCallbacksPending() ) {
929            // This only flushes transactions to clear snapshots, not to write data
930            $fnames = implode( ', ', $this->pendingWriteAndCallbackCallers() );
931            throw new DBUnexpectedError(
932                $db,
933                "$fname: Cannot flush snapshot; " .
934                "writes from transaction {$this->trxFname} are still pending ($fnames)"
935            );
936        } elseif (
937            $this->trxLevel() &&
938            $trxRoundId &&
939            $flush !== IDatabase::FLUSHING_INTERNAL &&
940            $flush !== IDatabase::FLUSHING_ALL_PEERS
941        ) {
942            $this->logger->warning(
943                "$fname: Expected mass snapshot flush of all peer transactions " .
944                "in the explicit transactions round '{$trxRoundId}'",
945                [
946                    'exception' => new RuntimeException(),
947                    'db_log_category' => 'trx'
948                ]
949            );
950        }
951    }
952
953    public function onGetScopedLockAndFlush( IDatabase $db, $fname ) {
954        if ( $this->writesOrCallbacksPending() ) {
955            // This only flushes transactions to clear snapshots, not to write data
956            $fnames = implode( ', ', $this->pendingWriteAndCallbackCallers() );
957            throw new DBUnexpectedError(
958                $db,
959                "$fname: Cannot flush pre-lock snapshot; " .
960                "writes from transaction {$this->trxFname} are still pending ($fnames)"
961            );
962        }
963    }
964}