Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 364
0.00% covered (danger)
0.00%
0 / 64
CRAP
0.00% covered (danger)
0.00%
0 / 1
TransactionManager
0.00% covered (danger)
0.00%
0 / 364
0.00% covered (danger)
0.00%
0 / 64
21756
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 trxLevel
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 getTrxId
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 consumeTrxId
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 trxTimestamp
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 trxStatus
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 setTrxStatusToOk
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 setTrxStatusToNone
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 assertTransactionStatus
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
20
 assertSessionStatus
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 setTransactionError
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 setTrxStatusIgnoredCause
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 sessionStatus
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 setSessionError
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 clearSessionError
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 calculateLastTrxApplyTime
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 pendingWriteCallers
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 updateTrxWriteQueryReport
0.00% covered (danger)
0.00%
0 / 13
0.00% covered (danger)
0.00%
0 / 1
30
 pendingWriteQueryDuration
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 flatAtomicSectionList
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 resetTrxAtomicLevels
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 explicitTrxActive
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
12
 trxCheckBeforeClose
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
42
 onCancelAtomicBeforeCriticalSection
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
12
 currentAtomicSectionId
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 addToAtomicLevels
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 onBegin
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
20
 onCommit
0.00% covered (danger)
0.00%
0 / 29
0.00% covered (danger)
0.00%
0 / 1
90
 onEndAtomic
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
20
 getPositionFromSectionId
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 cancelAtomic
0.00% covered (danger)
0.00%
0 / 21
0.00% covered (danger)
0.00%
0 / 1
20
 popAtomicLevel
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 setAutomaticAtomic
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 turnOnAutomatic
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 nextSavePointId
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
6
 writesPending
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 onDestruct
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
12
 transactionWritingIn
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 transactionWritingOut
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
6
 recordQueryCompletion
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
2
 onTransactionResolution
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 addPostCommitOrIdleCallback
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 addPreCommitOrIdleCallback
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 setTransactionListener
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 setTrxEndCallbackSuppression
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 reassignCallbacksForSection
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
56
 modifyCallbacksForCancel
0.00% covered (danger)
0.00%
0 / 19
0.00% covered (danger)
0.00%
0 / 1
12
 consumeEndCallbacks
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
2
 runOnTransactionPreCommitCallbacks
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
12
 clearPreEndCallbacks
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 clearEndCallbacks
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 writesOrCallbacksPending
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
30
 pendingWriteAndCallbackCallers
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
12
 pendingPreCommitCallbackCallers
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 isEndCallbacksSuppressed
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getRecurringCallbacks
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 countPostCommitOrIdleCallbacks
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 onBeginInCriticalSection
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
2
 onRollbackInCriticalSection
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 onCommitInCriticalSection
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 onSessionLoss
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 onEndAtomicInCriticalSection
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 onFlushSnapshot
0.00% covered (danger)
0.00%
0 / 25
0.00% covered (danger)
0.00%
0 / 1
56
 onGetScopedLockAndFlush
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
1<?php
2/**
3 * @license GPL-2.0-or-later
4 * @file
5 */
6namespace Wikimedia\Rdbms;
7
8use Psr\Log\LoggerInterface;
9use Psr\Log\NullLogger;
10use RuntimeException;
11use Throwable;
12use UnexpectedValueException;
13
14/**
15 * @ingroup Database
16 * @internal This class should not be used outside of Database
17 */
18class TransactionManager {
19    /** Transaction is in a error state requiring a full or savepoint rollback */
20    public const STATUS_TRX_ERROR = 1;
21    /** Transaction is active and in a normal state */
22    public const STATUS_TRX_OK = 2;
23    /** No transaction is active */
24    public const STATUS_TRX_NONE = 3;
25
26    /** Session is in a error state requiring a reset */
27    public const STATUS_SESS_ERROR = 1;
28    /** Session is in a normal state */
29    public const STATUS_SESS_OK = 2;
30
31    /** @var float Guess of how many seconds it takes to replicate a small insert */
32    private const TINY_WRITE_SEC = 0.010;
33    /** @var float Consider a write slow if it took more than this many seconds */
34    private const SLOW_WRITE_SEC = 0.500;
35    /** Assume an insert of this many rows or less should be fast to replicate */
36    private const SMALL_WRITE_ROWS = 100;
37
38    /** @var string Prefix to the atomic section counter used to make savepoint IDs */
39    private const SAVEPOINT_PREFIX = 'wikimedia_rdbms_atomic';
40
41    /** @var ?TransactionIdentifier Application-side ID of the active (server-side) transaction */
42    private $trxId;
43    /** @var ?float UNIX timestamp of BEGIN for the last transaction */
44    private $trxTimestamp = null;
45    /** @var ?float Round trip time estimate for queries during the last transaction */
46    private $trxRoundTripDelay = null;
47    /** @var int STATUS_TRX_* constant indicating the transaction lifecycle state */
48    private $trxStatus = self::STATUS_TRX_NONE;
49    /** @var ?Throwable The cause of any unresolved transaction lifecycle state error */
50    private $trxStatusCause;
51    /** @var ?array Details of any unresolved statement-rollback error within a transaction */
52    private $trxStatusIgnoredCause;
53
54    /** @var ?Throwable The cause of any unresolved session state loss error */
55    private $sessionError;
56
57    /** @var string[] Write query callers of the last transaction */
58    private $trxWriteCallers = [];
59    /** @var float Seconds spent in write queries for the last transaction */
60    private $trxWriteDuration = 0.0;
61    /** @var int Number of write queries for the last transaction */
62    private $trxWriteQueryCount = 0;
63    /** @var int Number of rows affected by write queries for the last transaction */
64    private $trxWriteAffectedRows = 0;
65    /** @var float Like trxWriteQueryCount but excludes lock-bound, easy to replicate, queries */
66    private $trxWriteAdjDuration = 0.0;
67    /** @var int Number of write queries counted in trxWriteAdjDuration */
68    private $trxWriteAdjQueryCount = 0;
69
70    /** @var array Pending atomic sections; list of (name, unique ID, savepoint ID) */
71    private $trxAtomicLevels = [];
72    /** @var bool Whether the last transaction was started implicitly due to DBO_TRX */
73    private $trxAutomatic = false;
74    /** @var bool Whether the last transaction was started implicitly by startAtomic() */
75    private $trxAutomaticAtomic = false;
76
77    /** @var ?string Name of the function that started the last transaction */
78    private $trxFname = null;
79    /** @var int Counter for atomic savepoint identifiers for the last transaction */
80    private $trxAtomicCounter = 0;
81
82    /**
83     * @var array[] Pending postcommit callbacks; list of (callable, method name, atomic section id)
84     * @phan-var array<array{0:callable,1:string,2:AtomicSectionIdentifier|null}>
85     */
86    private $trxPostCommitOrIdleCallbacks = [];
87    /**
88     * @var array[] Pending precommit callbacks; list of (callable, method name, atomic section id)
89     * @phan-var array<array{0:callable,1:string,2:AtomicSectionIdentifier|null}>
90     */
91    private $trxPreCommitOrIdleCallbacks = [];
92    /**
93     * @var array[] Pending post-trx callbacks; list of (callable, method name, atomic section id)
94     * @phan-var array<array{0:callable,1:string,2:AtomicSectionIdentifier|null}>
95     */
96    private $trxEndCallbacks = [];
97    /** @var callable[] Listener callbacks; map of (name => callable) */
98    private $trxRecurringCallbacks = [];
99    /** @var bool Whether to suppress triggering of transaction end callbacks */
100    private $trxEndCallbacksSuppressed = false;
101
102    /** @var LoggerInterface */
103    private $logger;
104    /** @var TransactionProfiler */
105    private $profiler;
106
107    public function __construct( ?LoggerInterface $logger = null, ?TransactionProfiler $profiler = null ) {
108        $this->logger = $logger ?? new NullLogger();
109        $this->profiler = $profiler ?? new TransactionProfiler();
110    }
111
112    public function trxLevel(): int {
113        return $this->trxId ? 1 : 0;
114    }
115
116    /**
117     * Get the application-side transaction identifier instance
118     *
119     * @return ?TransactionIdentifier Token for the active transaction; null if there isn't one
120     */
121    public function getTrxId() {
122        return $this->trxId;
123    }
124
125    /**
126     * Reset the application-side transaction identifier instance and return the old one
127     *
128     * @return ?TransactionIdentifier The old transaction token; null if there wasn't one
129     */
130    private function consumeTrxId() {
131        $old = $this->trxId;
132        $this->trxId = null;
133
134        return $old;
135    }
136
137    public function trxTimestamp(): ?float {
138        return $this->trxLevel() ? $this->trxTimestamp : null;
139    }
140
141    /**
142     * @return int One of the STATUS_TRX_* class constants
143     */
144    public function trxStatus(): int {
145        return $this->trxStatus;
146    }
147
148    public function setTrxStatusToOk() {
149        $this->trxStatus = self::STATUS_TRX_OK;
150        $this->trxStatusCause = null;
151        $this->trxStatusIgnoredCause = null;
152    }
153
154    public function setTrxStatusToNone() {
155        $this->trxStatus = self::STATUS_TRX_NONE;
156        $this->trxStatusCause = null;
157        $this->trxStatusIgnoredCause = null;
158    }
159
160    /**
161     * @param IDatabase $db
162     * @param callable $deprecationLogger
163     * @param string $fname
164     */
165    public function assertTransactionStatus( IDatabase $db, $deprecationLogger, $fname ) {
166        if ( $this->trxStatus === self::STATUS_TRX_ERROR ) {
167            throw new DBTransactionStateError(
168                $db,
169                "Cannot execute query from $fname while transaction status is ERROR",
170                [],
171                $this->trxStatusCause
172            );
173        } elseif ( $this->trxStatus === self::STATUS_TRX_OK && $this->trxStatusIgnoredCause ) {
174            [ $iLastError, $iLastErrno, $iFname ] = $this->trxStatusIgnoredCause;
175            $deprecationLogger(
176                "Caller from $fname ignored an error originally raised from $iFname" .
177                "[$iLastErrno$iLastError"
178            );
179            $this->trxStatusIgnoredCause = null;
180        }
181    }
182
183    public function assertSessionStatus( IDatabase $db, string $fname ) {
184        if ( $this->sessionError ) {
185            throw new DBSessionStateError(
186                $db,
187                "Cannot execute query from $fname while session status is ERROR",
188                [],
189                $this->sessionError
190            );
191        }
192    }
193
194    /**
195     * Mark the transaction as requiring rollback (STATUS_TRX_ERROR) due to an error
196     */
197    public function setTransactionError( Throwable $trxError ) {
198        if ( $this->trxStatus !== self::STATUS_TRX_ERROR ) {
199            $this->trxStatus = self::STATUS_TRX_ERROR;
200            $this->trxStatusCause = $trxError;
201        }
202    }
203
204    public function setTrxStatusIgnoredCause( ?array $trxStatusIgnoredCause ): void {
205        $this->trxStatusIgnoredCause = $trxStatusIgnoredCause;
206    }
207
208    /**
209     * Get the status of the current session (ephemeral server-side state tied to the connection)
210     *
211     * @return int One of the STATUS_SESSION_* class constants
212     */
213    public function sessionStatus() {
214        // Check if an unresolved error still exists
215        return ( $this->sessionError ) ? self::STATUS_SESS_ERROR : self::STATUS_SESS_OK;
216    }
217
218    /**
219     * Flag the session as needing a reset due to an error, if not already flagged
220     */
221    public function setSessionError( Throwable $sessionError ) {
222        $this->sessionError ??= $sessionError;
223    }
224
225    /**
226     * Unflag the session as needing a reset due to an error
227     */
228    public function clearSessionError() {
229        $this->sessionError = null;
230    }
231
232    /**
233     * @param float $rtt
234     * @return float Time to apply writes to replicas based on trxWrite* fields
235     */
236    private function calculateLastTrxApplyTime( float $rtt ) {
237        $rttAdjTotal = $this->trxWriteAdjQueryCount * $rtt;
238        $applyTime = max( $this->trxWriteAdjDuration - $rttAdjTotal, 0.0 );
239        // For omitted queries, make them count as something at least
240        $omitted = $this->trxWriteQueryCount - $this->trxWriteAdjQueryCount;
241        $applyTime += self::TINY_WRITE_SEC * $omitted;
242
243        return $applyTime;
244    }
245
246    public function pendingWriteCallers(): array {
247        return $this->trxLevel() ? $this->trxWriteCallers : [];
248    }
249
250    /**
251     * Update the estimated run-time of a query, not counting large row lock times
252     *
253     * LoadBalancer can be set to rollback transactions that will create huge replication
254     * lag. It bases this estimate off of pendingWriteQueryDuration(). Certain simple
255     * queries, like inserting a row can take a long time due to row locking. This method
256     * uses some simple heuristics to discount those cases.
257     *
258     * @param string $queryVerb action in the write query
259     * @param float $runtime Total runtime, including RTT
260     * @param int $affected Affected row count
261     * @param string $fname method name invoking the action
262     */
263    public function updateTrxWriteQueryReport( $queryVerb, $runtime, $affected, $fname ) {
264        // Whether this is indicative of replica DB runtime (except for RBR or ws_repl)
265        $indicativeOfReplicaRuntime = true;
266        if ( $runtime > self::SLOW_WRITE_SEC ) {
267            // insert(), upsert(), replace() are fast unless bulky in size or blocked on locks
268            if ( $queryVerb === 'INSERT' ) {
269                $indicativeOfReplicaRuntime = $affected > self::SMALL_WRITE_ROWS;
270            } elseif ( $queryVerb === 'REPLACE' ) {
271                $indicativeOfReplicaRuntime = $affected > self::SMALL_WRITE_ROWS / 2;
272            }
273        }
274
275        $this->trxWriteDuration += $runtime;
276        $this->trxWriteQueryCount++;
277        $this->trxWriteAffectedRows += $affected;
278        if ( $indicativeOfReplicaRuntime ) {
279            $this->trxWriteAdjDuration += $runtime;
280            $this->trxWriteAdjQueryCount++;
281        }
282
283        $this->trxWriteCallers[] = $fname;
284    }
285
286    public function pendingWriteQueryDuration( string $type = IDatabase::ESTIMATE_TOTAL ): float|false {
287        if ( !$this->trxLevel() ) {
288            return false;
289        } elseif ( !$this->trxWriteCallers ) {
290            return 0.0;
291        }
292
293        if ( $type === IDatabase::ESTIMATE_DB_APPLY ) {
294            return $this->calculateLastTrxApplyTime( $this->trxRoundTripDelay );
295        }
296
297        return $this->trxWriteDuration;
298    }
299
300    /**
301     * @return string
302     */
303    private function flatAtomicSectionList() {
304        return array_reduce( $this->trxAtomicLevels, static function ( $accum, $v ) {
305            return $accum === null ? $v[0] : "$accum" . $v[0];
306        } );
307    }
308
309    public function resetTrxAtomicLevels() {
310        $this->trxAtomicLevels = [];
311        $this->trxAtomicCounter = 0;
312    }
313
314    public function explicitTrxActive(): bool {
315        return $this->trxLevel() && ( $this->trxAtomicLevels || !$this->trxAutomatic );
316    }
317
318    public function trxCheckBeforeClose( IDatabaseForOwner $db, string $fname ): ?string {
319        $error = null;
320        if ( $this->trxAtomicLevels ) {
321            // Cannot let incomplete atomic sections be committed
322            $levels = $this->flatAtomicSectionList();
323            $error = "$fname: atomic sections $levels are still open";
324        } elseif ( $this->trxAutomatic ) {
325            // Only the connection manager can commit non-empty DBO_TRX transactions
326            // (empty ones we can silently roll back)
327            if ( $db->writesOrCallbacksPending() ) {
328                $error = "$fname" .
329                    "expected mass rollback of all peer transactions (DBO_TRX set)";
330            }
331        } else {
332            // Manual transactions should have been committed or rolled
333            // back, even if empty.
334            $error = "$fname: transaction is still open (from {$this->trxFname})";
335        }
336
337        if ( $this->trxEndCallbacksSuppressed && $error === null ) {
338            $error = "$fname: callbacks are suppressed; cannot properly commit";
339        }
340
341        return $error;
342    }
343
344    public function onCancelAtomicBeforeCriticalSection( IDatabase $db, string $fname ): void {
345        if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
346            throw new DBUnexpectedError( $db, "No atomic section is open (got $fname)" );
347        }
348    }
349
350    /**
351     * @return AtomicSectionIdentifier|null ID of the topmost atomic section level
352     */
353    public function currentAtomicSectionId(): ?AtomicSectionIdentifier {
354        if ( $this->trxLevel() && $this->trxAtomicLevels ) {
355            $levelInfo = end( $this->trxAtomicLevels );
356
357            return $levelInfo[1];
358        }
359
360        return null;
361    }
362
363    public function addToAtomicLevels( string $fname, AtomicSectionIdentifier $sectionId, ?string $savepointId ) {
364        $this->trxAtomicLevels[] = [ $fname, $sectionId, $savepointId ];
365        $this->logger->debug( 'startAtomic: entering level ' .
366            ( count( $this->trxAtomicLevels ) - 1 ) . " ($fname)", [ 'db_log_category' => 'trx' ] );
367    }
368
369    public function onBegin( IDatabase $db, string $fname ): void {
370        // Protect against mismatched atomic section, transaction nesting, and snapshot loss
371        if ( $this->trxLevel() ) {
372            if ( $this->trxAtomicLevels ) {
373                $levels = $this->flatAtomicSectionList();
374                $msg = "$fname: got explicit BEGIN while atomic section(s) $levels are open";
375                throw new DBUnexpectedError( $db, $msg );
376            } elseif ( !$this->trxAutomatic ) {
377                $msg = "$fname: explicit transaction already active (from {$this->trxFname})";
378                throw new DBUnexpectedError( $db, $msg );
379            } else {
380                $msg = "$fname: implicit transaction already active (from {$this->trxFname})";
381                throw new DBUnexpectedError( $db, $msg );
382            }
383        }
384    }
385
386    /**
387     * @param IDatabase $db
388     * @param string $fname
389     * @param string $flush one of IDatabase::FLUSHING_* values
390     * @return bool false if the commit should go aborted, true otherwise.
391     */
392    public function onCommit( IDatabase $db, $fname, $flush ): bool {
393        if ( $this->trxLevel() && $this->trxAtomicLevels ) {
394            // There are still atomic sections open; this cannot be ignored
395            $levels = $this->flatAtomicSectionList();
396            throw new DBUnexpectedError(
397                $db,
398                "$fname: got COMMIT while atomic sections $levels are still open"
399            );
400        }
401
402        if ( $flush === IDatabase::FLUSHING_INTERNAL || $flush === IDatabase::FLUSHING_ALL_PEERS ) {
403            if ( !$this->trxLevel() ) {
404                return false; // nothing to do
405            } elseif ( !$this->trxAutomatic ) {
406                throw new DBUnexpectedError(
407                    $db,
408                    "$fname: flushing an explicit transaction, getting out of sync"
409                );
410            }
411        } elseif ( !$this->trxLevel() ) {
412            $this->logger->error(
413                "$fname: no transaction to commit, something got out of sync",
414                [
415                    'exception' => new RuntimeException(),
416                    'db_log_category' => 'trx'
417                ]
418            );
419
420            return false; // nothing to do
421        } elseif ( $this->trxAutomatic ) {
422            throw new DBUnexpectedError(
423                $db,
424                "$fname: expected mass commit of all peer transactions (DBO_TRX set)"
425            );
426        }
427        return true;
428    }
429
430    public function onEndAtomic( IDatabase $db, string $fname ): array {
431        if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
432            throw new DBUnexpectedError( $db, "No atomic section is open (got $fname)" );
433        }
434        // Check if the current section matches $fname
435        $pos = count( $this->trxAtomicLevels ) - 1;
436        [ $savedFname, $sectionId, $savepointId ] = $this->trxAtomicLevels[$pos];
437        $this->logger->debug( "endAtomic: leaving level $pos ($fname)", [ 'db_log_category' => 'trx' ] );
438
439        if ( $savedFname !== $fname ) {
440            throw new DBUnexpectedError(
441                $db,
442                "Invalid atomic section ended (got $fname but expected $savedFname)"
443            );
444        }
445
446        return [ $savepointId, $sectionId ];
447    }
448
449    public function getPositionFromSectionId( ?AtomicSectionIdentifier $sectionId = null ): ?int {
450        if ( $sectionId !== null ) {
451            // Find the (last) section with the given $sectionId
452            $pos = -1;
453            foreach ( $this->trxAtomicLevels as $i => [ , $asId, ] ) {
454                if ( $asId === $sectionId ) {
455                    $pos = $i;
456                }
457            }
458        } else {
459            $pos = null;
460        }
461
462        return $pos;
463    }
464
465    public function cancelAtomic( ?int $pos ): array {
466        $excisedIds = [];
467        $excisedFnames = [];
468        $newTopSection = $this->currentAtomicSectionId();
469        if ( $pos !== null ) {
470            // Remove all descendant sections and re-index the array
471            $len = count( $this->trxAtomicLevels );
472            for ( $i = $pos + 1; $i < $len; ++$i ) {
473                $excisedFnames[] = $this->trxAtomicLevels[$i][0];
474                $excisedIds[] = $this->trxAtomicLevels[$i][1];
475            }
476            $this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos + 1 );
477            $newTopSection = $this->currentAtomicSectionId();
478        }
479
480        // Check if the current section matches $fname
481        $pos = count( $this->trxAtomicLevels ) - 1;
482        [ $savedFname, $savedSectionId, $savepointId ] = $this->trxAtomicLevels[$pos];
483
484        if ( $excisedFnames ) {
485            $this->logger->debug( "cancelAtomic: canceling level $pos ($savedFname" .
486                "and descendants " . implode( ', ', $excisedFnames ),
487                [ 'db_log_category' => 'trx' ]
488            );
489        } else {
490            $this->logger->debug( "cancelAtomic: canceling level $pos ($savedFname)",
491                [ 'db_log_category' => 'trx' ]
492            );
493        }
494
495        return [ $savedFname, $excisedIds, $newTopSection, $savedSectionId, $savepointId ];
496    }
497
498    /**
499     * @return bool Whether no levels remain and transaction was started by a popped level
500     */
501    public function popAtomicLevel() {
502        array_pop( $this->trxAtomicLevels );
503
504        return !$this->trxAtomicLevels && $this->trxAutomaticAtomic;
505    }
506
507    public function setAutomaticAtomic( bool $value ) {
508        $this->trxAutomaticAtomic = $value;
509    }
510
511    public function turnOnAutomatic() {
512        $this->trxAutomatic = true;
513    }
514
515    public function nextSavePointId( IDatabase $db, string $fname ): string {
516        $savepointId = self::SAVEPOINT_PREFIX . ++$this->trxAtomicCounter;
517        if ( strlen( $savepointId ) > 30 ) {
518            // 30 == Oracle's identifier length limit (pre 12c)
519            // With a 22 character prefix, that puts the highest number at 99999999.
520            throw new DBUnexpectedError(
521                $db,
522                'There have been an excessively large number of atomic sections in a transaction'
523                . " started by $this->trxFname (at $fname)"
524            );
525        }
526
527        return $savepointId;
528    }
529
530    public function writesPending(): bool {
531        return $this->trxLevel() && $this->trxWriteCallers;
532    }
533
534    public function onDestruct() {
535        if ( $this->trxLevel() && $this->trxWriteCallers ) {
536            trigger_error( "Uncommitted DB writes (transaction from {$this->trxFname})" );
537        }
538    }
539
540    public function transactionWritingIn( string $serverName, ?string $domainId, float $startTime ) {
541        if ( !$this->trxWriteCallers ) {
542            $this->profiler->transactionWritingIn(
543                $serverName,
544                $domainId,
545                (string)$this->trxId,
546                $startTime
547            );
548        }
549    }
550
551    public function transactionWritingOut( IDatabase $db, string $oldId ) {
552        if ( $this->trxWriteCallers ) {
553            $this->profiler->transactionWritingOut(
554                $db->getServerName(),
555                $db->getDomainID(),
556                $oldId,
557                $this->pendingWriteQueryDuration( IDatabase::ESTIMATE_TOTAL ),
558                $this->trxWriteAffectedRows
559            );
560        }
561    }
562
563    /**
564     * @param string|GeneralizedSql $sql
565     * @param float $startTime
566     * @param bool $isPermWrite
567     * @param int|null $rowCount
568     * @param string|null $serverName
569     */
570    public function recordQueryCompletion( $sql, $startTime, $isPermWrite, $rowCount, $serverName ) {
571        $this->profiler->recordQueryCompletion(
572            $sql,
573            $startTime,
574            $isPermWrite,
575            $rowCount,
576            (string)$this->trxId,
577            $serverName
578        );
579    }
580
581    public function onTransactionResolution( IDatabase $db, callable $callback, string $fname ) {
582        if ( !$this->trxLevel() ) {
583            throw new DBUnexpectedError( $db, "No transaction is active" );
584        }
585        $this->trxEndCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
586    }
587
588    public function addPostCommitOrIdleCallback( callable $callback, string $fname ) {
589        $this->trxPostCommitOrIdleCallbacks[] = [
590            $callback,
591            $fname,
592            $this->currentAtomicSectionId()
593        ];
594    }
595
596    final public function addPreCommitOrIdleCallback( callable $callback, string $fname ) {
597        $this->trxPreCommitOrIdleCallbacks[] = [
598            $callback,
599            $fname,
600            $this->currentAtomicSectionId()
601        ];
602    }
603
604    public function setTransactionListener( string $name, ?callable $callback = null ) {
605        if ( $callback ) {
606            $this->trxRecurringCallbacks[$name] = $callback;
607        } else {
608            unset( $this->trxRecurringCallbacks[$name] );
609        }
610    }
611
612    /**
613     * Whether to disable running of post-COMMIT/ROLLBACK callbacks
614     * @param bool $suppress
615     */
616    public function setTrxEndCallbackSuppression( bool $suppress ) {
617        $this->trxEndCallbacksSuppressed = $suppress;
618    }
619
620    /**
621     * Hoist callback ownership for callbacks in a section to a parent section.
622     * All callbacks should have an owner that is present in trxAtomicLevels.
623     * @param AtomicSectionIdentifier $old
624     * @param AtomicSectionIdentifier $new
625     */
626    public function reassignCallbacksForSection(
627        AtomicSectionIdentifier $old,
628        AtomicSectionIdentifier $new
629    ) {
630        foreach ( $this->trxPreCommitOrIdleCallbacks as $key => $info ) {
631            if ( $info[2] === $old ) {
632                $this->trxPreCommitOrIdleCallbacks[$key][2] = $new;
633            }
634        }
635        foreach ( $this->trxPostCommitOrIdleCallbacks as $key => $info ) {
636            if ( $info[2] === $old ) {
637                $this->trxPostCommitOrIdleCallbacks[$key][2] = $new;
638            }
639        }
640        foreach ( $this->trxEndCallbacks as $key => $info ) {
641            if ( $info[2] === $old ) {
642                $this->trxEndCallbacks[$key][2] = $new;
643            }
644        }
645    }
646
647    /**
648     * Update callbacks that were owned by cancelled atomic sections.
649     *
650     * Callbacks for "on commit" should never be run if they're owned by a
651     * section that won't be committed.
652     *
653     * Callbacks for "on resolution" need to reflect that the section was
654     * rolled back, even if the transaction as a whole commits successfully.
655     *
656     * Callbacks for "on section cancel" should already have been consumed,
657     * but errors during the cancellation itself can prevent that while still
658     * destroying the section. Hoist any such callbacks to the new top section,
659     * which we assume will itself have to be cancelled or rolled back to
660     * resolve the error.
661     *
662     * @param AtomicSectionIdentifier[] $excisedSectionsId Cancelled section IDs
663     * @param AtomicSectionIdentifier|null $newSectionId New top section ID
664     * @throws UnexpectedValueException
665     */
666    public function modifyCallbacksForCancel(
667        array $excisedSectionsId,
668        ?AtomicSectionIdentifier $newSectionId = null
669    ) {
670        // Cancel the "on commit" callbacks owned by this savepoint
671        $this->trxPostCommitOrIdleCallbacks = array_filter(
672            $this->trxPostCommitOrIdleCallbacks,
673            static function ( $entry ) use ( $excisedSectionsId ) {
674                return !in_array( $entry[2], $excisedSectionsId, true );
675            }
676        );
677        $this->trxPreCommitOrIdleCallbacks = array_filter(
678            $this->trxPreCommitOrIdleCallbacks,
679            static function ( $entry ) use ( $excisedSectionsId ) {
680                return !in_array( $entry[2], $excisedSectionsId, true );
681            }
682        );
683        // Make "on resolution" callbacks owned by this savepoint to perceive a rollback
684        foreach ( $this->trxEndCallbacks as $key => $entry ) {
685            if ( in_array( $entry[2], $excisedSectionsId, true ) ) {
686                $callback = $entry[0];
687                $this->trxEndCallbacks[$key][0] = static function () use ( $callback ) {
688                    return $callback( IDatabase::TRIGGER_ROLLBACK );
689                };
690                // This "on resolution" callback no longer belongs to a section.
691                $this->trxEndCallbacks[$key][2] = null;
692            }
693        }
694    }
695
696    public function consumeEndCallbacks(): array {
697        $callbackEntries = array_merge(
698            $this->trxPostCommitOrIdleCallbacks,
699            $this->trxEndCallbacks
700        );
701        $this->trxPostCommitOrIdleCallbacks = []; // consumed (and recursion guard)
702        $this->trxEndCallbacks = []; // consumed (recursion guard)
703
704        return $callbackEntries;
705    }
706
707    /**
708     * Consume and run any "on transaction pre-commit" callbacks
709     *
710     * @return int Number of callbacks attempted
711     * @throws Throwable Any exception thrown by a callback
712     */
713    public function runOnTransactionPreCommitCallbacks(): int {
714        $count = 0;
715
716        // Drain the queues of transaction "precommit" callbacks until it is empty
717        do {
718            $callbackEntries = $this->trxPreCommitOrIdleCallbacks;
719            $this->trxPreCommitOrIdleCallbacks = []; // consumed (and recursion guard)
720            $count += count( $callbackEntries );
721            foreach ( $callbackEntries as $entry ) {
722                try {
723                    $entry[0]();
724                } catch ( Throwable $trxError ) {
725                    $this->setTransactionError( $trxError );
726                    throw $trxError;
727                }
728            }
729            // @phan-suppress-next-line PhanImpossibleConditionInLoop
730        } while ( $this->trxPreCommitOrIdleCallbacks );
731
732        return $count;
733    }
734
735    public function clearPreEndCallbacks() {
736        $this->trxPostCommitOrIdleCallbacks = [];
737        $this->trxPreCommitOrIdleCallbacks = [];
738    }
739
740    public function clearEndCallbacks() {
741        $this->trxEndCallbacks = []; // don't copy
742    }
743
744    public function writesOrCallbacksPending(): bool {
745        return $this->trxLevel() && (
746                $this->trxWriteCallers ||
747                $this->trxPostCommitOrIdleCallbacks ||
748                $this->trxPreCommitOrIdleCallbacks ||
749                $this->trxEndCallbacks
750            );
751    }
752
753    /**
754     * List the methods that have write queries or callbacks for the current transaction
755     *
756     * @return string[]
757     */
758    public function pendingWriteAndCallbackCallers(): array {
759        $fnames = $this->pendingWriteCallers();
760        foreach ( [
761            $this->trxPostCommitOrIdleCallbacks,
762            $this->trxPreCommitOrIdleCallbacks,
763            $this->trxEndCallbacks
764        ] as $callbacks ) {
765            foreach ( $callbacks as $callback ) {
766                $fnames[] = $callback[1];
767            }
768        }
769
770        return $fnames;
771    }
772
773    /**
774     * List the methods that have precommit callbacks for the current transaction
775     *
776     * @return string[]
777     */
778    public function pendingPreCommitCallbackCallers(): array {
779        $fnames = $this->pendingWriteCallers();
780        foreach ( $this->trxPreCommitOrIdleCallbacks as $callback ) {
781            $fnames[] = $callback[1];
782        }
783
784        return $fnames;
785    }
786
787    public function isEndCallbacksSuppressed(): bool {
788        return $this->trxEndCallbacksSuppressed;
789    }
790
791    public function getRecurringCallbacks(): array {
792        return $this->trxRecurringCallbacks;
793    }
794
795    public function countPostCommitOrIdleCallbacks(): int {
796        return count( $this->trxPostCommitOrIdleCallbacks );
797    }
798
799    /**
800     * @param string $mode One of IDatabase::TRANSACTION_* values
801     * @param string $fname method name
802     * @param float $rtt Trivial query round-trip-delay
803     */
804    public function onBeginInCriticalSection( $mode, $fname, $rtt ) {
805        $this->trxId = new TransactionIdentifier();
806        $this->setTrxStatusToOk();
807        $this->resetTrxAtomicLevels();
808        $this->trxWriteCallers = [];
809        $this->trxWriteDuration = 0.0;
810        $this->trxWriteQueryCount = 0;
811        $this->trxWriteAffectedRows = 0;
812        $this->trxWriteAdjDuration = 0.0;
813        $this->trxWriteAdjQueryCount = 0;
814        // T147697: make explicitTrxActive() return true until begin() finishes. This way,
815        // no caller triggered by getApproximateLagStatus() will think its OK to muck around
816        // with the transaction just because startAtomic() has not yet finished updating the
817        // tracking fields (e.g. trxAtomicLevels).
818        $this->trxAutomatic = ( $mode === IDatabase::TRANSACTION_INTERNAL );
819        $this->trxAutomaticAtomic = false;
820        $this->trxFname = $fname;
821        $this->trxTimestamp = microtime( true );
822        $this->trxRoundTripDelay = $rtt;
823    }
824
825    public function onRollbackInCriticalSection( IDatabase $db ) {
826        $oldTrxId = $this->consumeTrxId();
827        $this->setTrxStatusToNone();
828        $this->resetTrxAtomicLevels();
829        $this->clearPreEndCallbacks();
830        $this->transactionWritingOut( $db, (string)$oldTrxId );
831    }
832
833    public function onCommitInCriticalSection( IDatabase $db ): ?float {
834        $lastWriteTime = null;
835
836        $oldTrxId = $this->consumeTrxId();
837        $this->setTrxStatusToNone();
838        if ( $this->trxWriteCallers ) {
839            $lastWriteTime = microtime( true );
840            $this->transactionWritingOut( $db, (string)$oldTrxId );
841        }
842
843        return $lastWriteTime;
844    }
845
846    public function onSessionLoss( IDatabase $db ) {
847        $oldTrxId = $this->consumeTrxId();
848        $this->clearPreEndCallbacks();
849        $this->transactionWritingOut( $db, (string)$oldTrxId );
850    }
851
852    public function onEndAtomicInCriticalSection( AtomicSectionIdentifier $sectionId ) {
853        // Hoist callback ownership for callbacks in the section that just ended;
854        // all callbacks should have an owner that is present in trxAtomicLevels.
855        $currentSectionId = $this->currentAtomicSectionId();
856        if ( $currentSectionId ) {
857            $this->reassignCallbacksForSection( $sectionId, $currentSectionId );
858        }
859    }
860
861    public function onFlushSnapshot( IDatabase $db, string $fname, string $flush, ?string $trxRoundFname ) {
862        if ( $this->explicitTrxActive() ) {
863            // Committing this transaction would break callers that assume it is still open
864            throw new DBUnexpectedError(
865                $db,
866                "$fname: Cannot flush snapshot; " .
867                "explicit transaction '{$this->trxFname}' is still open"
868            );
869        } elseif ( $this->writesOrCallbacksPending() ) {
870            // This only flushes transactions to clear snapshots, not to write data
871            $fnames = implode( ', ', $this->pendingWriteAndCallbackCallers() );
872            throw new DBUnexpectedError(
873                $db,
874                "$fname: Cannot flush snapshot; " .
875                "writes from transaction {$this->trxFname} are still pending ($fnames)"
876            );
877        } elseif (
878            $this->trxLevel() &&
879            $trxRoundFname !== null &&
880            $flush !== IDatabase::FLUSHING_INTERNAL &&
881            $flush !== IDatabase::FLUSHING_ALL_PEERS
882        ) {
883            $this->logger->warning(
884                "$fname: Expected mass snapshot flush of all peer transactions " .
885                "in the explicit transactions round '{$trxRoundFname}'",
886                [
887                    'exception' => new RuntimeException(),
888                    'db_log_category' => 'trx'
889                ]
890            );
891        }
892    }
893
894    public function onGetScopedLockAndFlush( IDatabase $db, string $fname ) {
895        if ( $this->writesOrCallbacksPending() ) {
896            // This only flushes transactions to clear snapshots, not to write data
897            $fnames = implode( ', ', $this->pendingWriteAndCallbackCallers() );
898            throw new DBUnexpectedError(
899                $db,
900                "$fname: Cannot flush pre-lock snapshot; " .
901                "writes from transaction {$this->trxFname} are still pending ($fnames)"
902            );
903        }
904    }
905}