Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 391
0.00% covered (danger)
0.00%
0 / 66
CRAP
0.00% covered (danger)
0.00%
0 / 1
TransactionManager
0.00% covered (danger)
0.00%
0 / 391
0.00% covered (danger)
0.00%
0 / 66
25760
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 trxLevel
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 newTrxId
0.00% covered (danger)
0.00%
0 / 18
0.00% covered (danger)
0.00%
0 / 1
2
 getTrxId
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 consumeTrxId
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
2
 trxTimestamp
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 trxStatus
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 setTrxStatusToOk
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 setTrxStatusToNone
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 assertTransactionStatus
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
20
 assertSessionStatus
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 setTransactionError
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 setTrxStatusIgnoredCause
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 sessionStatus
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 setSessionError
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 clearSessionError
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 calculateLastTrxApplyTime
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 pendingWriteCallers
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 updateTrxWriteQueryReport
0.00% covered (danger)
0.00%
0 / 13
0.00% covered (danger)
0.00%
0 / 1
30
 pendingWriteQueryDuration
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 flatAtomicSectionList
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 resetTrxAtomicLevels
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 explicitTrxActive
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
12
 trxCheckBeforeClose
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
42
 onAtomicSectionCancel
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
12
 onCancelAtomicBeforeCriticalSection
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
12
 currentAtomicSectionId
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 addToAtomicLevels
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 onBeginTransaction
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
12
 onCommit
0.00% covered (danger)
0.00%
0 / 29
0.00% covered (danger)
0.00%
0 / 1
90
 onEndAtomic
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
20
 getPositionFromSectionId
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 cancelAtomic
0.00% covered (danger)
0.00%
0 / 21
0.00% covered (danger)
0.00%
0 / 1
20
 popAtomicLevel
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 isClean
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 setAutomaticAtomic
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 turnOnAutomatic
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 nextSavePointId
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
6
 writesPending
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 onDestruct
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
12
 transactionWritingIn
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
12
 transactionWritingOut
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
6
 recordQueryCompletion
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
2
 onTransactionResolution
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 addPostCommitOrIdleCallback
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 addPreCommitOrIdleCallback
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 setTransactionListener
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 setTrxEndCallbackSuppression
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 reassignCallbacksForSection
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
90
 modifyCallbacksForCancel
0.00% covered (danger)
0.00%
0 / 22
0.00% covered (danger)
0.00%
0 / 1
30
 consumeEndCallbacks
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
6
 runOnAtomicSectionCancelCallbacks
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
20
 runOnTransactionPreCommitCallbacks
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
12
 clearPreEndCallbacks
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 clearEndCallbacks
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 writesOrCallbacksPending
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
42
 pendingWriteAndCallbackCallers
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
12
 pendingPreCommitCallbackCallers
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 isEndCallbacksSuppressed
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getRecurringCallbacks
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 countPostCommitOrIdleCallbacks
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 onRollback
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 onCommitInCriticalSection
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 onEndAtomicInCriticalSection
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 onFlushSnapshot
0.00% covered (danger)
0.00%
0 / 24
0.00% covered (danger)
0.00%
0 / 1
56
 onGetScopedLockAndFlush
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20namespace Wikimedia\Rdbms;
21
22use Psr\Log\LoggerInterface;
23use Psr\Log\NullLogger;
24use RuntimeException;
25use Throwable;
26use UnexpectedValueException;
27
28/**
29 * @ingroup Database
30 * @internal This class should not be used outside of Database
31 */
32class TransactionManager {
33    /** @var int Transaction is in a error state requiring a full or savepoint rollback */
34    public const STATUS_TRX_ERROR = 1;
35    /** @var int Transaction is active and in a normal state */
36    public const STATUS_TRX_OK = 2;
37    /** @var int No transaction is active */
38    public const STATUS_TRX_NONE = 3;
39
40    /** Session is in a error state requiring a reset */
41    public const STATUS_SESS_ERROR = 1;
42    /** Session is in a normal state */
43    public const STATUS_SESS_OK = 2;
44
45    /** @var float Guess of how many seconds it takes to replicate a small insert */
46    private const TINY_WRITE_SEC = 0.010;
47    /** @var float Consider a write slow if it took more than this many seconds */
48    private const SLOW_WRITE_SEC = 0.500;
49    /** @var int Assume an insert of this many rows or less should be fast to replicate */
50    private const SMALL_WRITE_ROWS = 100;
51
52    /** @var string Prefix to the atomic section counter used to make savepoint IDs */
53    private const SAVEPOINT_PREFIX = 'wikimedia_rdbms_atomic';
54
55    /** @var TransactionIdentifier|null Application-side ID of the active transaction; null if none */
56    private $trxId;
57    /** @var float|null UNIX timestamp at the time of BEGIN for the last transaction */
58    private $trxTimestamp = null;
59    /** @var float|null Round trip time estimate for queries during this transaction */
60    private $trxRoundTripDelay = null;
61    /** @var int Transaction status */
62    private $trxStatus = self::STATUS_TRX_NONE;
63    /** @var Throwable|null The cause of any unresolved transaction state error, or, null */
64    private $trxStatusCause;
65    /** @var array|null Details of the last statement-rollback error for the last transaction */
66    private $trxStatusIgnoredCause;
67
68    /** @var Throwable|null The cause of any unresolved session state loss error, or, null */
69    private $sessionError;
70
71    /** @var string[] Write query callers of the current transaction */
72    private $trxWriteCallers = [];
73    /** @var float Seconds spent in write queries for the current transaction */
74    private $trxWriteDuration = 0.0;
75    /** @var int Number of write queries for the current transaction */
76    private $trxWriteQueryCount = 0;
77    /** @var int Number of rows affected by write queries for the current transaction */
78    private $trxWriteAffectedRows = 0;
79    /** @var float Like trxWriteQueryCount but excludes lock-bound, easy to replicate, queries */
80    private $trxWriteAdjDuration = 0.0;
81    /** @var int Number of write queries counted in trxWriteAdjDuration */
82    private $trxWriteAdjQueryCount = 0;
83
84    /** @var array List of (name, unique ID, savepoint ID) for each active atomic section level */
85    private $trxAtomicLevels = [];
86    /** @var bool Whether the current transaction was started implicitly due to DBO_TRX */
87    private $trxAutomatic = false;
88    /** @var bool Whether the current transaction was started implicitly by startAtomic() */
89    private $trxAutomaticAtomic = false;
90
91    /** @var string|null Name of the function that start the last transaction */
92    private $trxFname = null;
93    /** @var bool Whether possible write queries were done in the last transaction started */
94    private $trxDoneWrites = false;
95    /** @var int Counter for atomic savepoint identifiers (reset with each transaction) */
96    private $trxAtomicCounter = 0;
97
98    /** @var array[] List of (callable, method name, atomic section id) */
99    private $trxPostCommitOrIdleCallbacks = [];
100    /** @var array[] List of (callable, method name, atomic section id) */
101    private $trxPreCommitOrIdleCallbacks = [];
102    /**
103     * @var array[] List of (callable, method name, atomic section id)
104     * @phan-var array<array{0:callable,1:string,2:AtomicSectionIdentifier|null}>
105     */
106    private $trxEndCallbacks = [];
107    /** @var array[] List of (callable, method name, atomic section id) */
108    private $trxSectionCancelCallbacks = [];
109    /** @var callable[] Map of (name => callable) */
110    private $trxRecurringCallbacks = [];
111    /** @var bool Whether to suppress triggering of transaction end callbacks */
112    private $trxEndCallbacksSuppressed = false;
113
114    /** @var LoggerInterface */
115    private $logger;
116    /** @var TransactionProfiler */
117    private $profiler;
118
119    public function __construct( LoggerInterface $logger = null, $profiler = null ) {
120        $this->logger = $logger ?? new NullLogger();
121        $this->profiler = $profiler ?? new TransactionProfiler();
122    }
123
124    public function trxLevel() {
125        return $this->trxId ? 1 : 0;
126    }
127
128    /**
129     * TODO: This should be removed once all usages have been migrated here
130     * @param string $mode One of IDatabase::TRANSACTION_* values
131     * @param string $fname method name
132     * @param float $rtt Trivial query round-trip-delay
133     */
134    public function newTrxId( $mode, $fname, $rtt ) {
135        $this->trxId = new TransactionIdentifier();
136        $this->trxStatus = self::STATUS_TRX_OK;
137        $this->trxStatusCause = null;
138        $this->trxStatusIgnoredCause = null;
139        $this->trxWriteDuration = 0.0;
140        $this->trxWriteQueryCount = 0;
141        $this->trxWriteAffectedRows = 0;
142        $this->trxWriteAdjDuration = 0.0;
143        $this->trxWriteAdjQueryCount = 0;
144        $this->trxWriteCallers = [];
145        $this->trxAtomicLevels = [];
146        // T147697: make explicitTrxActive() return true until begin() finishes. This way,
147        // no caller triggered by getApproximateLagStatus() will think its OK to muck around
148        // with the transaction just because startAtomic() has not yet finished updating the
149        // tracking fields (e.g. trxAtomicLevels).
150        $this->trxAutomatic = ( $mode === IDatabase::TRANSACTION_INTERNAL );
151        $this->trxAutomaticAtomic = false;
152        $this->trxFname = $fname;
153        $this->trxDoneWrites = false;
154        $this->trxAtomicCounter = 0;
155        $this->trxTimestamp = microtime( true );
156        $this->trxRoundTripDelay = $rtt;
157    }
158
159    /**
160     * Get the application-side transaction identifier instance
161     *
162     * @return TransactionIdentifier Token for the active transaction; null if there isn't one
163     */
164    public function getTrxId() {
165        return $this->trxId;
166    }
167
168    /**
169     * Reset the application-side transaction identifier instance and return the old one
170     *
171     * This will become private soon.
172     * @return TransactionIdentifier|null The old transaction token; null if there wasn't one
173     */
174    public function consumeTrxId() {
175        $old = $this->trxId;
176        $this->trxId = null;
177        $this->trxAtomicCounter = 0;
178
179        return $old;
180    }
181
182    public function trxTimestamp(): ?float {
183        return $this->trxLevel() ? $this->trxTimestamp : null;
184    }
185
186    /**
187     * @return int One of the STATUS_TRX_* class constants
188     */
189    public function trxStatus(): int {
190        return $this->trxStatus;
191    }
192
193    public function setTrxStatusToOk() {
194        $this->trxStatus = self::STATUS_TRX_OK;
195        $this->trxStatusCause = null;
196        $this->trxStatusIgnoredCause = null;
197    }
198
199    public function setTrxStatusToNone() {
200        $this->trxStatus = self::STATUS_TRX_NONE;
201        $this->trxStatusCause = null;
202        $this->trxStatusIgnoredCause = null;
203    }
204
205    public function assertTransactionStatus( IDatabase $db, $deprecationLogger, $fname ) {
206        if ( $this->trxStatus < self::STATUS_TRX_OK ) {
207            throw new DBTransactionStateError(
208                $db,
209                "Cannot execute query from $fname while transaction status is ERROR",
210                [],
211                $this->trxStatusCause
212            );
213        } elseif ( $this->trxStatus === self::STATUS_TRX_OK && $this->trxStatusIgnoredCause ) {
214            [ $iLastError, $iLastErrno, $iFname ] = $this->trxStatusIgnoredCause;
215            call_user_func( $deprecationLogger,
216                "Caller from $fname ignored an error originally raised from $iFname" .
217                "[$iLastErrno$iLastError"
218            );
219            $this->trxStatusIgnoredCause = null;
220        }
221    }
222
223    public function assertSessionStatus( IDatabase $db, $fname ) {
224        if ( $this->sessionError ) {
225            throw new DBSessionStateError(
226                $db,
227                "Cannot execute query from $fname while session status is ERROR",
228                [],
229                $this->sessionError
230            );
231        }
232    }
233
234    /**
235     * Mark the transaction as requiring rollback (STATUS_TRX_ERROR) due to an error
236     *
237     * @param Throwable $trxError
238     */
239    public function setTransactionError( Throwable $trxError ) {
240        if ( $this->trxStatus > self::STATUS_TRX_ERROR ) {
241            $this->trxStatus = self::STATUS_TRX_ERROR;
242            $this->trxStatusCause = $trxError;
243        }
244    }
245
246    /**
247     * @param array|null $trxStatusIgnoredCause
248     */
249    public function setTrxStatusIgnoredCause( ?array $trxStatusIgnoredCause ): void {
250        $this->trxStatusIgnoredCause = $trxStatusIgnoredCause;
251    }
252
253    /**
254     * Get the status of the current session (ephemeral server-side state tied to the connection)
255     *
256     * @return int One of the STATUS_SESSION_* class constants
257     */
258    public function sessionStatus() {
259        // Check if an unresolved error still exists
260        return ( $this->sessionError ) ? self::STATUS_SESS_ERROR : self::STATUS_SESS_OK;
261    }
262
263    /**
264     * Flag the session as needing a reset due to an error, if not already flagged
265     *
266     * @param Throwable $sessionError
267     */
268    public function setSessionError( Throwable $sessionError ) {
269        $this->sessionError ??= $sessionError;
270    }
271
272    /**
273     * Unflag the session as needing a reset due to an error
274     */
275    public function clearSessionError() {
276        $this->sessionError = null;
277    }
278
279    /**
280     * @param float $rtt
281     * @return float Time to apply writes to replicas based on trxWrite* fields
282     */
283    private function calculateLastTrxApplyTime( float $rtt ) {
284        $rttAdjTotal = $this->trxWriteAdjQueryCount * $rtt;
285        $applyTime = max( $this->trxWriteAdjDuration - $rttAdjTotal, 0 );
286        // For omitted queries, make them count as something at least
287        $omitted = $this->trxWriteQueryCount - $this->trxWriteAdjQueryCount;
288        $applyTime += self::TINY_WRITE_SEC * $omitted;
289
290        return $applyTime;
291    }
292
293    public function pendingWriteCallers() {
294        return $this->trxLevel() ? $this->trxWriteCallers : [];
295    }
296
297    /**
298     * Update the estimated run-time of a query, not counting large row lock times
299     *
300     * LoadBalancer can be set to rollback transactions that will create huge replication
301     * lag. It bases this estimate off of pendingWriteQueryDuration(). Certain simple
302     * queries, like inserting a row can take a long time due to row locking. This method
303     * uses some simple heuristics to discount those cases.
304     *
305     * @param string $queryVerb action in the write query
306     * @param float $runtime Total runtime, including RTT
307     * @param int $affected Affected row count
308     * @param string $fname method name invoking the action
309     */
310    public function updateTrxWriteQueryReport( $queryVerb, $runtime, $affected, $fname ) {
311        // Whether this is indicative of replica DB runtime (except for RBR or ws_repl)
312        $indicativeOfReplicaRuntime = true;
313        if ( $runtime > self::SLOW_WRITE_SEC ) {
314            // insert(), upsert(), replace() are fast unless bulky in size or blocked on locks
315            if ( $queryVerb === 'INSERT' ) {
316                $indicativeOfReplicaRuntime = $affected > self::SMALL_WRITE_ROWS;
317            } elseif ( $queryVerb === 'REPLACE' ) {
318                $indicativeOfReplicaRuntime = $affected > self::SMALL_WRITE_ROWS / 2;
319            }
320        }
321
322        $this->trxWriteDuration += $runtime;
323        $this->trxWriteQueryCount += 1;
324        $this->trxWriteAffectedRows += $affected;
325        if ( $indicativeOfReplicaRuntime ) {
326            $this->trxWriteAdjDuration += $runtime;
327            $this->trxWriteAdjQueryCount += 1;
328        }
329
330        $this->trxWriteCallers[] = $fname;
331    }
332
333    public function pendingWriteQueryDuration( $type = IDatabase::ESTIMATE_TOTAL ) {
334        if ( !$this->trxLevel() ) {
335            return false;
336        } elseif ( !$this->trxDoneWrites ) {
337            return 0.0;
338        }
339
340        if ( $type === IDatabase::ESTIMATE_DB_APPLY ) {
341            return $this->calculateLastTrxApplyTime( $this->trxRoundTripDelay );
342        }
343
344        return $this->trxWriteDuration;
345    }
346
347    /**
348     * @return string
349     */
350    private function flatAtomicSectionList() {
351        return array_reduce( $this->trxAtomicLevels, static function ( $accum, $v ) {
352            return $accum === null ? $v[0] : "$accum" . $v[0];
353        } );
354    }
355
356    public function resetTrxAtomicLevels() {
357        $this->trxAtomicLevels = [];
358    }
359
360    public function explicitTrxActive() {
361        return $this->trxLevel() && ( $this->trxAtomicLevels || !$this->trxAutomatic );
362    }
363
364    public function trxCheckBeforeClose( IDatabase $db, $fname ) {
365        $error = null;
366        if ( $this->trxAtomicLevels ) {
367            // Cannot let incomplete atomic sections be committed
368            $levels = $this->flatAtomicSectionList();
369            $error = "$fname: atomic sections $levels are still open";
370        } elseif ( $this->trxAutomatic ) {
371            // Only the connection manager can commit non-empty DBO_TRX transactions
372            // (empty ones we can silently roll back)
373            if ( $db->writesOrCallbacksPending() ) {
374                $error = "$fname" .
375                    "expected mass rollback of all peer transactions (DBO_TRX set)";
376            }
377        } else {
378            // Manual transactions should have been committed or rolled
379            // back, even if empty.
380            $error = "$fname: transaction is still open (from {$this->trxFname})";
381        }
382
383        if ( $this->trxEndCallbacksSuppressed && $error === null ) {
384            $error = "$fname: callbacks are suppressed; cannot properly commit";
385        }
386
387        return $error;
388    }
389
390    public function onAtomicSectionCancel( IDatabase $db, $callback, $fname ): void {
391        if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
392            throw new DBUnexpectedError( $db, "No atomic section is open (got $fname)" );
393        }
394        $this->trxSectionCancelCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
395    }
396
397    public function onCancelAtomicBeforeCriticalSection( IDatabase $db, $fname ): void {
398        if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
399            throw new DBUnexpectedError( $db, "No atomic section is open (got $fname)" );
400        }
401    }
402
403    /**
404     * @return AtomicSectionIdentifier|null ID of the topmost atomic section level
405     */
406    public function currentAtomicSectionId(): ?AtomicSectionIdentifier {
407        if ( $this->trxLevel() && $this->trxAtomicLevels ) {
408            $levelInfo = end( $this->trxAtomicLevels );
409
410            return $levelInfo[1];
411        }
412
413        return null;
414    }
415
416    public function addToAtomicLevels( $fname, AtomicSectionIdentifier $sectionId, $savepointId ) {
417        $this->trxAtomicLevels[] = [ $fname, $sectionId, $savepointId ];
418        $this->logger->debug( 'startAtomic: entering level ' .
419            ( count( $this->trxAtomicLevels ) - 1 ) . " ($fname)", [ 'db_log_category' => 'trx' ] );
420    }
421
422    public function onBeginTransaction( IDatabase $db, $fname ): void {
423        // @phan-suppress-previous-line PhanPluginNeverReturnMethod
424        if ( $this->trxAtomicLevels ) {
425            $levels = $this->flatAtomicSectionList();
426            $msg = "$fname: got explicit BEGIN while atomic section(s) $levels are open";
427            throw new DBUnexpectedError( $db, $msg );
428        } elseif ( !$this->trxAutomatic ) {
429            $msg = "$fname: explicit transaction already active (from {$this->trxFname})";
430            throw new DBUnexpectedError( $db, $msg );
431        } else {
432            $msg = "$fname: implicit transaction already active (from {$this->trxFname})";
433            throw new DBUnexpectedError( $db, $msg );
434        }
435    }
436
437    /**
438     * @param IDatabase $db
439     * @param string $fname
440     * @param string $flush one of IDatabase::FLUSHING_* values
441     * @return bool false if the commit should go aborted, true otherwise.
442     */
443    public function onCommit( IDatabase $db, $fname, $flush ): bool {
444        if ( $this->trxLevel() && $this->trxAtomicLevels ) {
445            // There are still atomic sections open; this cannot be ignored
446            $levels = $this->flatAtomicSectionList();
447            throw new DBUnexpectedError(
448                $db,
449                "$fname: got COMMIT while atomic sections $levels are still open"
450            );
451        }
452
453        if ( $flush === IDatabase::FLUSHING_INTERNAL || $flush === IDatabase::FLUSHING_ALL_PEERS ) {
454            if ( !$this->trxLevel() ) {
455                return false; // nothing to do
456            } elseif ( !$this->trxAutomatic ) {
457                throw new DBUnexpectedError(
458                    $db,
459                    "$fname: flushing an explicit transaction, getting out of sync"
460                );
461            }
462        } elseif ( !$this->trxLevel() ) {
463            $this->logger->error(
464                "$fname: no transaction to commit, something got out of sync",
465                [
466                    'exception' => new RuntimeException(),
467                    'db_log_category' => 'trx'
468                ]
469            );
470
471            return false; // nothing to do
472        } elseif ( $this->trxAutomatic ) {
473            throw new DBUnexpectedError(
474                $db,
475                "$fname: expected mass commit of all peer transactions (DBO_TRX set)"
476            );
477        }
478        return true;
479    }
480
481    public function onEndAtomic( IDatabase $db, $fname ): array {
482        if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
483            throw new DBUnexpectedError( $db, "No atomic section is open (got $fname)" );
484        }
485        // Check if the current section matches $fname
486        $pos = count( $this->trxAtomicLevels ) - 1;
487        [ $savedFname, $sectionId, $savepointId ] = $this->trxAtomicLevels[$pos];
488        $this->logger->debug( "endAtomic: leaving level $pos ($fname)", [ 'db_log_category' => 'trx' ] );
489
490        if ( $savedFname !== $fname ) {
491            throw new DBUnexpectedError(
492                $db,
493                "Invalid atomic section ended (got $fname but expected $savedFname)"
494            );
495        }
496
497        return [ $savepointId, $sectionId ];
498    }
499
500    public function getPositionFromSectionId( AtomicSectionIdentifier $sectionId = null ): ?int {
501        if ( $sectionId !== null ) {
502            // Find the (last) section with the given $sectionId
503            $pos = -1;
504            foreach ( $this->trxAtomicLevels as $i => [ , $asId, ] ) {
505                if ( $asId === $sectionId ) {
506                    $pos = $i;
507                }
508            }
509        } else {
510            $pos = null;
511        }
512
513        return $pos;
514    }
515
516    public function cancelAtomic( $pos ) {
517        $excisedIds = [];
518        $excisedFnames = [];
519        $newTopSection = $this->currentAtomicSectionId();
520        if ( $pos !== null ) {
521            // Remove all descendant sections and re-index the array
522            $len = count( $this->trxAtomicLevels );
523            for ( $i = $pos + 1; $i < $len; ++$i ) {
524                $excisedFnames[] = $this->trxAtomicLevels[$i][0];
525                $excisedIds[] = $this->trxAtomicLevels[$i][1];
526            }
527            $this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos + 1 );
528            $newTopSection = $this->currentAtomicSectionId();
529        }
530
531        // Check if the current section matches $fname
532        $pos = count( $this->trxAtomicLevels ) - 1;
533        [ $savedFname, $savedSectionId, $savepointId ] = $this->trxAtomicLevels[$pos];
534
535        if ( $excisedFnames ) {
536            $this->logger->debug( "cancelAtomic: canceling level $pos ($savedFname" .
537                "and descendants " . implode( ', ', $excisedFnames ),
538                [ 'db_log_category' => 'trx' ]
539            );
540        } else {
541            $this->logger->debug( "cancelAtomic: canceling level $pos ($savedFname)",
542                [ 'db_log_category' => 'trx' ]
543            );
544        }
545
546        return [ $savedFname, $excisedIds, $newTopSection, $savedSectionId, $savepointId ];
547    }
548
549    public function popAtomicLevel() {
550        array_pop( $this->trxAtomicLevels );
551    }
552
553    public function isClean() {
554        return !$this->trxAtomicLevels && $this->trxAutomaticAtomic;
555    }
556
557    public function setAutomaticAtomic( $value ) {
558        $this->trxAutomaticAtomic = $value;
559    }
560
561    public function turnOnAutomatic() {
562        $this->trxAutomatic = true;
563    }
564
565    public function nextSavePointId( IDatabase $db, $fname ) {
566        $savepointId = self::SAVEPOINT_PREFIX . ++$this->trxAtomicCounter;
567        if ( strlen( $savepointId ) > 30 ) {
568            // 30 == Oracle's identifier length limit (pre 12c)
569            // With a 22 character prefix, that puts the highest number at 99999999.
570            throw new DBUnexpectedError(
571                $db,
572                'There have been an excessively large number of atomic sections in a transaction'
573                . " started by $this->trxFname (at $fname)"
574            );
575        }
576
577        return $savepointId;
578    }
579
580    public function writesPending() {
581        return $this->trxLevel() && $this->trxDoneWrites;
582    }
583
584    public function onDestruct() {
585        if ( $this->trxLevel() && $this->trxDoneWrites ) {
586            trigger_error( "Uncommitted DB writes (transaction from {$this->trxFname})" );
587        }
588    }
589
590    public function transactionWritingIn( $serverName, $domainId ) {
591        if ( $this->trxLevel() && !$this->trxDoneWrites ) {
592            $this->trxDoneWrites = true;
593            $this->profiler->transactionWritingIn(
594                $serverName,
595                $domainId,
596                (string)$this->trxId
597            );
598        }
599    }
600
601    public function transactionWritingOut( IDatabase $db, $oldId ) {
602        if ( $this->trxDoneWrites ) {
603            $this->profiler->transactionWritingOut(
604                $db->getServerName(),
605                $db->getDomainID(),
606                $oldId,
607                $this->pendingWriteQueryDuration( IDatabase::ESTIMATE_TOTAL ),
608                $this->trxWriteAffectedRows
609            );
610        }
611    }
612
613    public function recordQueryCompletion( $sql, $startTime, $isPermWrite, $rowCount, $serverName ) {
614        $this->profiler->recordQueryCompletion(
615            $sql,
616            $startTime,
617            $isPermWrite,
618            $rowCount,
619            (string)$this->trxId,
620            $serverName
621        );
622    }
623
624    public function onTransactionResolution( IDatabase $db, callable $callback, $fname ) {
625        if ( !$this->trxLevel() ) {
626            throw new DBUnexpectedError( $db, "No transaction is active" );
627        }
628        $this->trxEndCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
629    }
630
631    public function addPostCommitOrIdleCallback( callable $callback, $fname = __METHOD__ ) {
632        $this->trxPostCommitOrIdleCallbacks[] = [
633            $callback,
634            $fname,
635            $this->currentAtomicSectionId()
636        ];
637    }
638
639    final public function addPreCommitOrIdleCallback( callable $callback, $fname = __METHOD__ ) {
640        $this->trxPreCommitOrIdleCallbacks[] = [
641            $callback,
642            $fname,
643            $this->currentAtomicSectionId()
644        ];
645    }
646
647    public function setTransactionListener( $name, callable $callback = null ) {
648        if ( $callback ) {
649            $this->trxRecurringCallbacks[$name] = $callback;
650        } else {
651            unset( $this->trxRecurringCallbacks[$name] );
652        }
653    }
654
655    /**
656     * Whether to disable running of post-COMMIT/ROLLBACK callbacks
657     * @param bool $suppress
658     */
659    public function setTrxEndCallbackSuppression( bool $suppress ) {
660        $this->trxEndCallbacksSuppressed = $suppress;
661    }
662
663    /**
664     * Hoist callback ownership for callbacks in a section to a parent section.
665     * All callbacks should have an owner that is present in trxAtomicLevels.
666     * @param AtomicSectionIdentifier $old
667     * @param AtomicSectionIdentifier $new
668     */
669    public function reassignCallbacksForSection(
670        AtomicSectionIdentifier $old,
671        AtomicSectionIdentifier $new
672    ) {
673        foreach ( $this->trxPreCommitOrIdleCallbacks as $key => $info ) {
674            if ( $info[2] === $old ) {
675                $this->trxPreCommitOrIdleCallbacks[$key][2] = $new;
676            }
677        }
678        foreach ( $this->trxPostCommitOrIdleCallbacks as $key => $info ) {
679            if ( $info[2] === $old ) {
680                $this->trxPostCommitOrIdleCallbacks[$key][2] = $new;
681            }
682        }
683        foreach ( $this->trxEndCallbacks as $key => $info ) {
684            if ( $info[2] === $old ) {
685                $this->trxEndCallbacks[$key][2] = $new;
686            }
687        }
688        foreach ( $this->trxSectionCancelCallbacks as $key => $info ) {
689            if ( $info[2] === $old ) {
690                $this->trxSectionCancelCallbacks[$key][2] = $new;
691            }
692        }
693    }
694
695    /**
696     * Update callbacks that were owned by cancelled atomic sections.
697     *
698     * Callbacks for "on commit" should never be run if they're owned by a
699     * section that won't be committed.
700     *
701     * Callbacks for "on resolution" need to reflect that the section was
702     * rolled back, even if the transaction as a whole commits successfully.
703     *
704     * Callbacks for "on section cancel" should already have been consumed,
705     * but errors during the cancellation itself can prevent that while still
706     * destroying the section. Hoist any such callbacks to the new top section,
707     * which we assume will itself have to be cancelled or rolled back to
708     * resolve the error.
709     *
710     * @param IDatabase $db
711     * @param AtomicSectionIdentifier[] $sectionIds ID of an actual savepoint
712     * @param AtomicSectionIdentifier|null $newSectionId New top section ID.
713     * @throws UnexpectedValueException
714     */
715    public function modifyCallbacksForCancel(
716        IDatabase $db,
717        array $sectionIds,
718        AtomicSectionIdentifier $newSectionId = null
719    ) {
720        // Cancel the "on commit" callbacks owned by this savepoint
721        $this->trxPostCommitOrIdleCallbacks = array_filter(
722            $this->trxPostCommitOrIdleCallbacks,
723            static function ( $entry ) use ( $sectionIds ) {
724                return !in_array( $entry[2], $sectionIds, true );
725            }
726        );
727        $this->trxPreCommitOrIdleCallbacks = array_filter(
728            $this->trxPreCommitOrIdleCallbacks,
729            static function ( $entry ) use ( $sectionIds ) {
730                return !in_array( $entry[2], $sectionIds, true );
731            }
732        );
733        // Make "on resolution" callbacks owned by this savepoint to perceive a rollback
734        foreach ( $this->trxEndCallbacks as $key => $entry ) {
735            if ( in_array( $entry[2], $sectionIds, true ) ) {
736                $callback = $entry[0];
737                $this->trxEndCallbacks[$key][0] = static function () use ( $callback, $db ) {
738                    return $callback( IDatabase::TRIGGER_ROLLBACK, $db );
739                };
740                // This "on resolution" callback no longer belongs to a section.
741                $this->trxEndCallbacks[$key][2] = null;
742            }
743        }
744        // Hoist callback ownership for section cancel callbacks to the new top section
745        foreach ( $this->trxSectionCancelCallbacks as $key => $entry ) {
746            if ( in_array( $entry[2], $sectionIds, true ) ) {
747                $this->trxSectionCancelCallbacks[$key][2] = $newSectionId;
748            }
749        }
750    }
751
752    public function consumeEndCallbacks( $trigger ): array {
753        $callbackEntries = array_merge(
754            $this->trxPostCommitOrIdleCallbacks,
755            $this->trxEndCallbacks,
756            ( $trigger === IDatabase::TRIGGER_ROLLBACK )
757                ? $this->trxSectionCancelCallbacks
758                : [] // just consume them
759        );
760        $this->trxPostCommitOrIdleCallbacks = []; // consumed (and recursion guard)
761        $this->trxEndCallbacks = []; // consumed (recursion guard)
762        $this->trxSectionCancelCallbacks = []; // consumed (recursion guard)
763
764        return $callbackEntries;
765    }
766
767    /**
768     * Consume and run any relevant "on atomic section cancel" callbacks for the active transaction
769     *
770     * @param IDatabase $db
771     * @param int $trigger IDatabase::TRIGGER_* constant
772     * @param AtomicSectionIdentifier[] $sectionIds IDs of the sections that where just cancelled
773     * @throws Throwable Any exception thrown by a callback
774     */
775    public function runOnAtomicSectionCancelCallbacks( IDatabase $db, int $trigger, array $sectionIds ) {
776        // Drain the queue of matching "atomic section cancel" callbacks until there are none
777        $unrelatedCallbackEntries = [];
778        do {
779            $callbackEntries = $this->trxSectionCancelCallbacks;
780            $this->trxSectionCancelCallbacks = []; // consumed (recursion guard)
781            foreach ( $callbackEntries as $entry ) {
782                if ( in_array( $entry[2], $sectionIds, true ) ) {
783                    try {
784                        // @phan-suppress-next-line PhanUndeclaredInvokeInCallable
785                        $entry[0]( $trigger, $db );
786                    } catch ( Throwable $trxError ) {
787                        $this->setTransactionError( $trxError );
788                        throw $trxError;
789                    }
790                } else {
791                    $unrelatedCallbackEntries[] = $entry;
792                }
793            }
794            // @phan-suppress-next-line PhanImpossibleConditionInLoop
795        } while ( $this->trxSectionCancelCallbacks );
796
797        $this->trxSectionCancelCallbacks = $unrelatedCallbackEntries;
798    }
799
800    /**
801     * Consume and run any "on transaction pre-commit" callbacks
802     *
803     * @param IDatabase $db
804     * @return int Number of callbacks attempted
805     * @throws Throwable Any exception thrown by a callback
806     */
807    public function runOnTransactionPreCommitCallbacks( IDatabase $db ): int {
808        $count = 0;
809
810        // Drain the queues of transaction "precommit" callbacks until it is empty
811        do {
812            $callbackEntries = $this->trxPreCommitOrIdleCallbacks;
813            $this->trxPreCommitOrIdleCallbacks = []; // consumed (and recursion guard)
814            $count += count( $callbackEntries );
815            foreach ( $callbackEntries as $entry ) {
816                try {
817                    // @phan-suppress-next-line PhanUndeclaredInvokeInCallable
818                    $entry[0]( $db );
819                } catch ( Throwable $trxError ) {
820                    $this->setTransactionError( $trxError );
821                    throw $trxError;
822                }
823            }
824            // @phan-suppress-next-line PhanImpossibleConditionInLoop
825        } while ( $this->trxPreCommitOrIdleCallbacks );
826
827        return $count;
828    }
829
830    public function clearPreEndCallbacks() {
831        $this->trxPostCommitOrIdleCallbacks = [];
832        $this->trxPreCommitOrIdleCallbacks = [];
833    }
834
835    public function clearEndCallbacks() {
836        $this->trxEndCallbacks = []; // don't copy
837        $this->trxSectionCancelCallbacks = []; // don't copy
838    }
839
840    public function writesOrCallbacksPending(): bool {
841        return $this->trxLevel() && (
842                $this->trxDoneWrites ||
843                $this->trxPostCommitOrIdleCallbacks ||
844                $this->trxPreCommitOrIdleCallbacks ||
845                $this->trxEndCallbacks ||
846                $this->trxSectionCancelCallbacks
847            );
848    }
849
850    /**
851     * List the methods that have write queries or callbacks for the current transaction
852     *
853     * @return string[]
854     */
855    public function pendingWriteAndCallbackCallers(): array {
856        $fnames = $this->pendingWriteCallers();
857        foreach ( [
858            $this->trxPostCommitOrIdleCallbacks,
859            $this->trxPreCommitOrIdleCallbacks,
860            $this->trxEndCallbacks,
861            $this->trxSectionCancelCallbacks
862        ] as $callbacks ) {
863            foreach ( $callbacks as $callback ) {
864                $fnames[] = $callback[1];
865            }
866        }
867
868        return $fnames;
869    }
870
871    /**
872     * List the methods that have precommit callbacks for the current transaction
873     *
874     * @return string[]
875     */
876    public function pendingPreCommitCallbackCallers(): array {
877        $fnames = $this->pendingWriteCallers();
878        foreach ( $this->trxPreCommitOrIdleCallbacks as $callback ) {
879            $fnames[] = $callback[1];
880        }
881
882        return $fnames;
883    }
884
885    public function isEndCallbacksSuppressed(): bool {
886        return $this->trxEndCallbacksSuppressed;
887    }
888
889    public function getRecurringCallbacks() {
890        return $this->trxRecurringCallbacks;
891    }
892
893    public function countPostCommitOrIdleCallbacks() {
894        return count( $this->trxPostCommitOrIdleCallbacks );
895    }
896
897    public function onRollback( IDatabase $db ) {
898        $oldTrxId = $this->consumeTrxId();
899        $this->setTrxStatusToNone();
900        $this->resetTrxAtomicLevels();
901        $this->clearPreEndCallbacks();
902        $this->transactionWritingOut( $db, (string)$oldTrxId );
903    }
904
905    public function onCommitInCriticalSection( IDatabase $db ) {
906        $lastWriteTime = null;
907        $oldTrxId = $this->consumeTrxId();
908        $this->setTrxStatusToNone();
909        if ( $this->trxDoneWrites ) {
910            $lastWriteTime = microtime( true );
911            $this->transactionWritingOut( $db, (string)$oldTrxId );
912        }
913        return $lastWriteTime;
914    }
915
916    public function onEndAtomicInCriticalSection( $sectionId ) {
917        // Hoist callback ownership for callbacks in the section that just ended;
918        // all callbacks should have an owner that is present in trxAtomicLevels.
919        $currentSectionId = $this->currentAtomicSectionId();
920        if ( $currentSectionId ) {
921            $this->reassignCallbacksForSection( $sectionId, $currentSectionId );
922        }
923    }
924
925    public function onFlushSnapshot( IDatabase $db, $fname, $flush, $trxRoundId ) {
926        if ( $this->explicitTrxActive() ) {
927            // Committing this transaction would break callers that assume it is still open
928            throw new DBUnexpectedError(
929                $db,
930                "$fname: Cannot flush snapshot; " .
931                "explicit transaction '{$this->trxFname}' is still open"
932            );
933        } elseif ( $this->writesOrCallbacksPending() ) {
934            // This only flushes transactions to clear snapshots, not to write data
935            $fnames = implode( ', ', $this->pendingWriteAndCallbackCallers() );
936            throw new DBUnexpectedError(
937                $db,
938                "$fname: Cannot flush snapshot; " .
939                "writes from transaction {$this->trxFname} are still pending ($fnames)"
940            );
941        } elseif (
942            $this->trxLevel() &&
943            $trxRoundId &&
944            $flush !== IDatabase::FLUSHING_INTERNAL &&
945            $flush !== IDatabase::FLUSHING_ALL_PEERS
946        ) {
947            $this->logger->warning(
948                "$fname: Expected mass snapshot flush of all peer transactions " .
949                "in the explicit transactions round '{$trxRoundId}'",
950                [
951                    'exception' => new RuntimeException(),
952                    'db_log_category' => 'trx'
953                ]
954            );
955        }
956    }
957
958    public function onGetScopedLockAndFlush( IDatabase $db, $fname ) {
959        if ( $this->writesOrCallbacksPending() ) {
960            // This only flushes transactions to clear snapshots, not to write data
961            $fnames = implode( ', ', $this->pendingWriteAndCallbackCallers() );
962            throw new DBUnexpectedError(
963                $db,
964                "$fname: Cannot flush pre-lock snapshot; " .
965                "writes from transaction {$this->trxFname} are still pending ($fnames)"
966            );
967        }
968    }
969}