Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 364
0.00% covered (danger)
0.00%
0 / 64
CRAP
0.00% covered (danger)
0.00%
0 / 1
TransactionManager
0.00% covered (danger)
0.00%
0 / 364
0.00% covered (danger)
0.00%
0 / 64
21756
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 trxLevel
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 getTrxId
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 consumeTrxId
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 trxTimestamp
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 trxStatus
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 setTrxStatusToOk
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 setTrxStatusToNone
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 assertTransactionStatus
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
20
 assertSessionStatus
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 setTransactionError
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 setTrxStatusIgnoredCause
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 sessionStatus
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 setSessionError
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 clearSessionError
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 calculateLastTrxApplyTime
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 pendingWriteCallers
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 updateTrxWriteQueryReport
0.00% covered (danger)
0.00%
0 / 13
0.00% covered (danger)
0.00%
0 / 1
30
 pendingWriteQueryDuration
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 flatAtomicSectionList
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 resetTrxAtomicLevels
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 explicitTrxActive
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
12
 trxCheckBeforeClose
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
42
 onCancelAtomicBeforeCriticalSection
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
12
 currentAtomicSectionId
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 addToAtomicLevels
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 onBegin
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
20
 onCommit
0.00% covered (danger)
0.00%
0 / 29
0.00% covered (danger)
0.00%
0 / 1
90
 onEndAtomic
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
20
 getPositionFromSectionId
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 cancelAtomic
0.00% covered (danger)
0.00%
0 / 21
0.00% covered (danger)
0.00%
0 / 1
20
 popAtomicLevel
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 setAutomaticAtomic
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 turnOnAutomatic
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 nextSavePointId
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
6
 writesPending
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 onDestruct
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
12
 transactionWritingIn
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 transactionWritingOut
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
6
 recordQueryCompletion
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
2
 onTransactionResolution
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 addPostCommitOrIdleCallback
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 addPreCommitOrIdleCallback
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 setTransactionListener
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 setTrxEndCallbackSuppression
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 reassignCallbacksForSection
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
56
 modifyCallbacksForCancel
0.00% covered (danger)
0.00%
0 / 19
0.00% covered (danger)
0.00%
0 / 1
12
 consumeEndCallbacks
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
2
 runOnTransactionPreCommitCallbacks
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
12
 clearPreEndCallbacks
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 clearEndCallbacks
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 writesOrCallbacksPending
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
30
 pendingWriteAndCallbackCallers
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
12
 pendingPreCommitCallbackCallers
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 isEndCallbacksSuppressed
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getRecurringCallbacks
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 countPostCommitOrIdleCallbacks
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 onBeginInCriticalSection
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
2
 onRollbackInCriticalSection
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 onCommitInCriticalSection
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 onSessionLoss
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 onEndAtomicInCriticalSection
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 onFlushSnapshot
0.00% covered (danger)
0.00%
0 / 25
0.00% covered (danger)
0.00%
0 / 1
56
 onGetScopedLockAndFlush
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20namespace Wikimedia\Rdbms;
21
22use Psr\Log\LoggerInterface;
23use Psr\Log\NullLogger;
24use RuntimeException;
25use Throwable;
26use UnexpectedValueException;
27
28/**
29 * @ingroup Database
30 * @internal This class should not be used outside of Database
31 */
32class TransactionManager {
33    /** Transaction is in a error state requiring a full or savepoint rollback */
34    public const STATUS_TRX_ERROR = 1;
35    /** Transaction is active and in a normal state */
36    public const STATUS_TRX_OK = 2;
37    /** No transaction is active */
38    public const STATUS_TRX_NONE = 3;
39
40    /** Session is in a error state requiring a reset */
41    public const STATUS_SESS_ERROR = 1;
42    /** Session is in a normal state */
43    public const STATUS_SESS_OK = 2;
44
45    /** @var float Guess of how many seconds it takes to replicate a small insert */
46    private const TINY_WRITE_SEC = 0.010;
47    /** @var float Consider a write slow if it took more than this many seconds */
48    private const SLOW_WRITE_SEC = 0.500;
49    /** Assume an insert of this many rows or less should be fast to replicate */
50    private const SMALL_WRITE_ROWS = 100;
51
52    /** @var string Prefix to the atomic section counter used to make savepoint IDs */
53    private const SAVEPOINT_PREFIX = 'wikimedia_rdbms_atomic';
54
55    /** @var ?TransactionIdentifier Application-side ID of the active (server-side) transaction */
56    private $trxId;
57    /** @var ?float UNIX timestamp of BEGIN for the last transaction */
58    private $trxTimestamp = null;
59    /** @var ?float Round trip time estimate for queries during the last transaction */
60    private $trxRoundTripDelay = null;
61    /** @var int STATUS_TRX_* constant indicating the transaction lifecycle state */
62    private $trxStatus = self::STATUS_TRX_NONE;
63    /** @var ?Throwable The cause of any unresolved transaction lifecycle state error */
64    private $trxStatusCause;
65    /** @var ?array Details of any unresolved statement-rollback error within a transaction */
66    private $trxStatusIgnoredCause;
67
68    /** @var ?Throwable The cause of any unresolved session state loss error */
69    private $sessionError;
70
71    /** @var string[] Write query callers of the last transaction */
72    private $trxWriteCallers = [];
73    /** @var float Seconds spent in write queries for the last transaction */
74    private $trxWriteDuration = 0.0;
75    /** @var int Number of write queries for the last transaction */
76    private $trxWriteQueryCount = 0;
77    /** @var int Number of rows affected by write queries for the last transaction */
78    private $trxWriteAffectedRows = 0;
79    /** @var float Like trxWriteQueryCount but excludes lock-bound, easy to replicate, queries */
80    private $trxWriteAdjDuration = 0.0;
81    /** @var int Number of write queries counted in trxWriteAdjDuration */
82    private $trxWriteAdjQueryCount = 0;
83
84    /** @var array Pending atomic sections; list of (name, unique ID, savepoint ID) */
85    private $trxAtomicLevels = [];
86    /** @var bool Whether the last transaction was started implicitly due to DBO_TRX */
87    private $trxAutomatic = false;
88    /** @var bool Whether the last transaction was started implicitly by startAtomic() */
89    private $trxAutomaticAtomic = false;
90
91    /** @var ?string Name of the function that started the last transaction */
92    private $trxFname = null;
93    /** @var int Counter for atomic savepoint identifiers for the last transaction */
94    private $trxAtomicCounter = 0;
95
96    /**
97     * @var array[] Pending postcommit callbacks; list of (callable, method name, atomic section id)
98     * @phan-var array<array{0:callable,1:string,2:AtomicSectionIdentifier|null}>
99     */
100    private $trxPostCommitOrIdleCallbacks = [];
101    /**
102     * @var array[] Pending precommit callbacks; list of (callable, method name, atomic section id)
103     * @phan-var array<array{0:callable,1:string,2:AtomicSectionIdentifier|null}>
104     */
105    private $trxPreCommitOrIdleCallbacks = [];
106    /**
107     * @var array[] Pending post-trx callbacks; list of (callable, method name, atomic section id)
108     * @phan-var array<array{0:callable,1:string,2:AtomicSectionIdentifier|null}>
109     */
110    private $trxEndCallbacks = [];
111    /** @var callable[] Listener callbacks; map of (name => callable) */
112    private $trxRecurringCallbacks = [];
113    /** @var bool Whether to suppress triggering of transaction end callbacks */
114    private $trxEndCallbacksSuppressed = false;
115
116    /** @var LoggerInterface */
117    private $logger;
118    /** @var TransactionProfiler */
119    private $profiler;
120
121    public function __construct( ?LoggerInterface $logger = null, $profiler = null ) {
122        $this->logger = $logger ?? new NullLogger();
123        $this->profiler = $profiler ?? new TransactionProfiler();
124    }
125
126    public function trxLevel() {
127        return $this->trxId ? 1 : 0;
128    }
129
130    /**
131     * Get the application-side transaction identifier instance
132     *
133     * @return ?TransactionIdentifier Token for the active transaction; null if there isn't one
134     */
135    public function getTrxId() {
136        return $this->trxId;
137    }
138
139    /**
140     * Reset the application-side transaction identifier instance and return the old one
141     *
142     * @return ?TransactionIdentifier The old transaction token; null if there wasn't one
143     */
144    private function consumeTrxId() {
145        $old = $this->trxId;
146        $this->trxId = null;
147
148        return $old;
149    }
150
151    public function trxTimestamp(): ?float {
152        return $this->trxLevel() ? $this->trxTimestamp : null;
153    }
154
155    /**
156     * @return int One of the STATUS_TRX_* class constants
157     */
158    public function trxStatus(): int {
159        return $this->trxStatus;
160    }
161
162    public function setTrxStatusToOk() {
163        $this->trxStatus = self::STATUS_TRX_OK;
164        $this->trxStatusCause = null;
165        $this->trxStatusIgnoredCause = null;
166    }
167
168    public function setTrxStatusToNone() {
169        $this->trxStatus = self::STATUS_TRX_NONE;
170        $this->trxStatusCause = null;
171        $this->trxStatusIgnoredCause = null;
172    }
173
174    public function assertTransactionStatus( IDatabase $db, $deprecationLogger, $fname ) {
175        if ( $this->trxStatus === self::STATUS_TRX_ERROR ) {
176            throw new DBTransactionStateError(
177                $db,
178                "Cannot execute query from $fname while transaction status is ERROR",
179                [],
180                $this->trxStatusCause
181            );
182        } elseif ( $this->trxStatus === self::STATUS_TRX_OK && $this->trxStatusIgnoredCause ) {
183            [ $iLastError, $iLastErrno, $iFname ] = $this->trxStatusIgnoredCause;
184            $deprecationLogger(
185                "Caller from $fname ignored an error originally raised from $iFname" .
186                "[$iLastErrno$iLastError"
187            );
188            $this->trxStatusIgnoredCause = null;
189        }
190    }
191
192    public function assertSessionStatus( IDatabase $db, $fname ) {
193        if ( $this->sessionError ) {
194            throw new DBSessionStateError(
195                $db,
196                "Cannot execute query from $fname while session status is ERROR",
197                [],
198                $this->sessionError
199            );
200        }
201    }
202
203    /**
204     * Mark the transaction as requiring rollback (STATUS_TRX_ERROR) due to an error
205     */
206    public function setTransactionError( Throwable $trxError ) {
207        if ( $this->trxStatus !== self::STATUS_TRX_ERROR ) {
208            $this->trxStatus = self::STATUS_TRX_ERROR;
209            $this->trxStatusCause = $trxError;
210        }
211    }
212
213    public function setTrxStatusIgnoredCause( ?array $trxStatusIgnoredCause ): void {
214        $this->trxStatusIgnoredCause = $trxStatusIgnoredCause;
215    }
216
217    /**
218     * Get the status of the current session (ephemeral server-side state tied to the connection)
219     *
220     * @return int One of the STATUS_SESSION_* class constants
221     */
222    public function sessionStatus() {
223        // Check if an unresolved error still exists
224        return ( $this->sessionError ) ? self::STATUS_SESS_ERROR : self::STATUS_SESS_OK;
225    }
226
227    /**
228     * Flag the session as needing a reset due to an error, if not already flagged
229     */
230    public function setSessionError( Throwable $sessionError ) {
231        $this->sessionError ??= $sessionError;
232    }
233
234    /**
235     * Unflag the session as needing a reset due to an error
236     */
237    public function clearSessionError() {
238        $this->sessionError = null;
239    }
240
241    /**
242     * @param float $rtt
243     * @return float Time to apply writes to replicas based on trxWrite* fields
244     */
245    private function calculateLastTrxApplyTime( float $rtt ) {
246        $rttAdjTotal = $this->trxWriteAdjQueryCount * $rtt;
247        $applyTime = max( $this->trxWriteAdjDuration - $rttAdjTotal, 0.0 );
248        // For omitted queries, make them count as something at least
249        $omitted = $this->trxWriteQueryCount - $this->trxWriteAdjQueryCount;
250        $applyTime += self::TINY_WRITE_SEC * $omitted;
251
252        return $applyTime;
253    }
254
255    public function pendingWriteCallers() {
256        return $this->trxLevel() ? $this->trxWriteCallers : [];
257    }
258
259    /**
260     * Update the estimated run-time of a query, not counting large row lock times
261     *
262     * LoadBalancer can be set to rollback transactions that will create huge replication
263     * lag. It bases this estimate off of pendingWriteQueryDuration(). Certain simple
264     * queries, like inserting a row can take a long time due to row locking. This method
265     * uses some simple heuristics to discount those cases.
266     *
267     * @param string $queryVerb action in the write query
268     * @param float $runtime Total runtime, including RTT
269     * @param int $affected Affected row count
270     * @param string $fname method name invoking the action
271     */
272    public function updateTrxWriteQueryReport( $queryVerb, $runtime, $affected, $fname ) {
273        // Whether this is indicative of replica DB runtime (except for RBR or ws_repl)
274        $indicativeOfReplicaRuntime = true;
275        if ( $runtime > self::SLOW_WRITE_SEC ) {
276            // insert(), upsert(), replace() are fast unless bulky in size or blocked on locks
277            if ( $queryVerb === 'INSERT' ) {
278                $indicativeOfReplicaRuntime = $affected > self::SMALL_WRITE_ROWS;
279            } elseif ( $queryVerb === 'REPLACE' ) {
280                $indicativeOfReplicaRuntime = $affected > self::SMALL_WRITE_ROWS / 2;
281            }
282        }
283
284        $this->trxWriteDuration += $runtime;
285        $this->trxWriteQueryCount++;
286        $this->trxWriteAffectedRows += $affected;
287        if ( $indicativeOfReplicaRuntime ) {
288            $this->trxWriteAdjDuration += $runtime;
289            $this->trxWriteAdjQueryCount++;
290        }
291
292        $this->trxWriteCallers[] = $fname;
293    }
294
295    public function pendingWriteQueryDuration( $type = IDatabase::ESTIMATE_TOTAL ) {
296        if ( !$this->trxLevel() ) {
297            return false;
298        } elseif ( !$this->trxWriteCallers ) {
299            return 0.0;
300        }
301
302        if ( $type === IDatabase::ESTIMATE_DB_APPLY ) {
303            return $this->calculateLastTrxApplyTime( $this->trxRoundTripDelay );
304        }
305
306        return $this->trxWriteDuration;
307    }
308
309    /**
310     * @return string
311     */
312    private function flatAtomicSectionList() {
313        return array_reduce( $this->trxAtomicLevels, static function ( $accum, $v ) {
314            return $accum === null ? $v[0] : "$accum" . $v[0];
315        } );
316    }
317
318    public function resetTrxAtomicLevels() {
319        $this->trxAtomicLevels = [];
320        $this->trxAtomicCounter = 0;
321    }
322
323    public function explicitTrxActive() {
324        return $this->trxLevel() && ( $this->trxAtomicLevels || !$this->trxAutomatic );
325    }
326
327    public function trxCheckBeforeClose( IDatabaseForOwner $db, $fname ) {
328        $error = null;
329        if ( $this->trxAtomicLevels ) {
330            // Cannot let incomplete atomic sections be committed
331            $levels = $this->flatAtomicSectionList();
332            $error = "$fname: atomic sections $levels are still open";
333        } elseif ( $this->trxAutomatic ) {
334            // Only the connection manager can commit non-empty DBO_TRX transactions
335            // (empty ones we can silently roll back)
336            if ( $db->writesOrCallbacksPending() ) {
337                $error = "$fname" .
338                    "expected mass rollback of all peer transactions (DBO_TRX set)";
339            }
340        } else {
341            // Manual transactions should have been committed or rolled
342            // back, even if empty.
343            $error = "$fname: transaction is still open (from {$this->trxFname})";
344        }
345
346        if ( $this->trxEndCallbacksSuppressed && $error === null ) {
347            $error = "$fname: callbacks are suppressed; cannot properly commit";
348        }
349
350        return $error;
351    }
352
353    public function onCancelAtomicBeforeCriticalSection( IDatabase $db, $fname ): void {
354        if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
355            throw new DBUnexpectedError( $db, "No atomic section is open (got $fname)" );
356        }
357    }
358
359    /**
360     * @return AtomicSectionIdentifier|null ID of the topmost atomic section level
361     */
362    public function currentAtomicSectionId(): ?AtomicSectionIdentifier {
363        if ( $this->trxLevel() && $this->trxAtomicLevels ) {
364            $levelInfo = end( $this->trxAtomicLevels );
365
366            return $levelInfo[1];
367        }
368
369        return null;
370    }
371
372    public function addToAtomicLevels( $fname, AtomicSectionIdentifier $sectionId, $savepointId ) {
373        $this->trxAtomicLevels[] = [ $fname, $sectionId, $savepointId ];
374        $this->logger->debug( 'startAtomic: entering level ' .
375            ( count( $this->trxAtomicLevels ) - 1 ) . " ($fname)", [ 'db_log_category' => 'trx' ] );
376    }
377
378    public function onBegin( IDatabase $db, $fname ): void {
379        // Protect against mismatched atomic section, transaction nesting, and snapshot loss
380        if ( $this->trxLevel() ) {
381            if ( $this->trxAtomicLevels ) {
382                $levels = $this->flatAtomicSectionList();
383                $msg = "$fname: got explicit BEGIN while atomic section(s) $levels are open";
384                throw new DBUnexpectedError( $db, $msg );
385            } elseif ( !$this->trxAutomatic ) {
386                $msg = "$fname: explicit transaction already active (from {$this->trxFname})";
387                throw new DBUnexpectedError( $db, $msg );
388            } else {
389                $msg = "$fname: implicit transaction already active (from {$this->trxFname})";
390                throw new DBUnexpectedError( $db, $msg );
391            }
392        }
393    }
394
395    /**
396     * @param IDatabase $db
397     * @param string $fname
398     * @param string $flush one of IDatabase::FLUSHING_* values
399     * @return bool false if the commit should go aborted, true otherwise.
400     */
401    public function onCommit( IDatabase $db, $fname, $flush ): bool {
402        if ( $this->trxLevel() && $this->trxAtomicLevels ) {
403            // There are still atomic sections open; this cannot be ignored
404            $levels = $this->flatAtomicSectionList();
405            throw new DBUnexpectedError(
406                $db,
407                "$fname: got COMMIT while atomic sections $levels are still open"
408            );
409        }
410
411        if ( $flush === IDatabase::FLUSHING_INTERNAL || $flush === IDatabase::FLUSHING_ALL_PEERS ) {
412            if ( !$this->trxLevel() ) {
413                return false; // nothing to do
414            } elseif ( !$this->trxAutomatic ) {
415                throw new DBUnexpectedError(
416                    $db,
417                    "$fname: flushing an explicit transaction, getting out of sync"
418                );
419            }
420        } elseif ( !$this->trxLevel() ) {
421            $this->logger->error(
422                "$fname: no transaction to commit, something got out of sync",
423                [
424                    'exception' => new RuntimeException(),
425                    'db_log_category' => 'trx'
426                ]
427            );
428
429            return false; // nothing to do
430        } elseif ( $this->trxAutomatic ) {
431            throw new DBUnexpectedError(
432                $db,
433                "$fname: expected mass commit of all peer transactions (DBO_TRX set)"
434            );
435        }
436        return true;
437    }
438
439    public function onEndAtomic( IDatabase $db, $fname ): array {
440        if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
441            throw new DBUnexpectedError( $db, "No atomic section is open (got $fname)" );
442        }
443        // Check if the current section matches $fname
444        $pos = count( $this->trxAtomicLevels ) - 1;
445        [ $savedFname, $sectionId, $savepointId ] = $this->trxAtomicLevels[$pos];
446        $this->logger->debug( "endAtomic: leaving level $pos ($fname)", [ 'db_log_category' => 'trx' ] );
447
448        if ( $savedFname !== $fname ) {
449            throw new DBUnexpectedError(
450                $db,
451                "Invalid atomic section ended (got $fname but expected $savedFname)"
452            );
453        }
454
455        return [ $savepointId, $sectionId ];
456    }
457
458    public function getPositionFromSectionId( ?AtomicSectionIdentifier $sectionId = null ): ?int {
459        if ( $sectionId !== null ) {
460            // Find the (last) section with the given $sectionId
461            $pos = -1;
462            foreach ( $this->trxAtomicLevels as $i => [ , $asId, ] ) {
463                if ( $asId === $sectionId ) {
464                    $pos = $i;
465                }
466            }
467        } else {
468            $pos = null;
469        }
470
471        return $pos;
472    }
473
474    public function cancelAtomic( $pos ) {
475        $excisedIds = [];
476        $excisedFnames = [];
477        $newTopSection = $this->currentAtomicSectionId();
478        if ( $pos !== null ) {
479            // Remove all descendant sections and re-index the array
480            $len = count( $this->trxAtomicLevels );
481            for ( $i = $pos + 1; $i < $len; ++$i ) {
482                $excisedFnames[] = $this->trxAtomicLevels[$i][0];
483                $excisedIds[] = $this->trxAtomicLevels[$i][1];
484            }
485            $this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos + 1 );
486            $newTopSection = $this->currentAtomicSectionId();
487        }
488
489        // Check if the current section matches $fname
490        $pos = count( $this->trxAtomicLevels ) - 1;
491        [ $savedFname, $savedSectionId, $savepointId ] = $this->trxAtomicLevels[$pos];
492
493        if ( $excisedFnames ) {
494            $this->logger->debug( "cancelAtomic: canceling level $pos ($savedFname" .
495                "and descendants " . implode( ', ', $excisedFnames ),
496                [ 'db_log_category' => 'trx' ]
497            );
498        } else {
499            $this->logger->debug( "cancelAtomic: canceling level $pos ($savedFname)",
500                [ 'db_log_category' => 'trx' ]
501            );
502        }
503
504        return [ $savedFname, $excisedIds, $newTopSection, $savedSectionId, $savepointId ];
505    }
506
507    /**
508     * @return bool Whether no levels remain and transaction was started by a popped level
509     */
510    public function popAtomicLevel() {
511        array_pop( $this->trxAtomicLevels );
512
513        return !$this->trxAtomicLevels && $this->trxAutomaticAtomic;
514    }
515
516    public function setAutomaticAtomic( $value ) {
517        $this->trxAutomaticAtomic = $value;
518    }
519
520    public function turnOnAutomatic() {
521        $this->trxAutomatic = true;
522    }
523
524    public function nextSavePointId( IDatabase $db, $fname ) {
525        $savepointId = self::SAVEPOINT_PREFIX . ++$this->trxAtomicCounter;
526        if ( strlen( $savepointId ) > 30 ) {
527            // 30 == Oracle's identifier length limit (pre 12c)
528            // With a 22 character prefix, that puts the highest number at 99999999.
529            throw new DBUnexpectedError(
530                $db,
531                'There have been an excessively large number of atomic sections in a transaction'
532                . " started by $this->trxFname (at $fname)"
533            );
534        }
535
536        return $savepointId;
537    }
538
539    public function writesPending() {
540        return $this->trxLevel() && $this->trxWriteCallers;
541    }
542
543    public function onDestruct() {
544        if ( $this->trxLevel() && $this->trxWriteCallers ) {
545            trigger_error( "Uncommitted DB writes (transaction from {$this->trxFname})" );
546        }
547    }
548
549    public function transactionWritingIn( $serverName, $domainId, float $startTime ) {
550        if ( !$this->trxWriteCallers ) {
551            $this->profiler->transactionWritingIn(
552                $serverName,
553                $domainId,
554                (string)$this->trxId,
555                $startTime
556            );
557        }
558    }
559
560    public function transactionWritingOut( IDatabase $db, $oldId ) {
561        if ( $this->trxWriteCallers ) {
562            $this->profiler->transactionWritingOut(
563                $db->getServerName(),
564                $db->getDomainID(),
565                $oldId,
566                $this->pendingWriteQueryDuration( IDatabase::ESTIMATE_TOTAL ),
567                $this->trxWriteAffectedRows
568            );
569        }
570    }
571
572    public function recordQueryCompletion( $sql, $startTime, $isPermWrite, $rowCount, $serverName ) {
573        $this->profiler->recordQueryCompletion(
574            $sql,
575            $startTime,
576            $isPermWrite,
577            $rowCount,
578            (string)$this->trxId,
579            $serverName
580        );
581    }
582
583    public function onTransactionResolution( IDatabase $db, callable $callback, $fname ) {
584        if ( !$this->trxLevel() ) {
585            throw new DBUnexpectedError( $db, "No transaction is active" );
586        }
587        $this->trxEndCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
588    }
589
590    public function addPostCommitOrIdleCallback( callable $callback, $fname ) {
591        $this->trxPostCommitOrIdleCallbacks[] = [
592            $callback,
593            $fname,
594            $this->currentAtomicSectionId()
595        ];
596    }
597
598    final public function addPreCommitOrIdleCallback( callable $callback, $fname ) {
599        $this->trxPreCommitOrIdleCallbacks[] = [
600            $callback,
601            $fname,
602            $this->currentAtomicSectionId()
603        ];
604    }
605
606    public function setTransactionListener( $name, ?callable $callback = null ) {
607        if ( $callback ) {
608            $this->trxRecurringCallbacks[$name] = $callback;
609        } else {
610            unset( $this->trxRecurringCallbacks[$name] );
611        }
612    }
613
614    /**
615     * Whether to disable running of post-COMMIT/ROLLBACK callbacks
616     * @param bool $suppress
617     */
618    public function setTrxEndCallbackSuppression( bool $suppress ) {
619        $this->trxEndCallbacksSuppressed = $suppress;
620    }
621
622    /**
623     * Hoist callback ownership for callbacks in a section to a parent section.
624     * All callbacks should have an owner that is present in trxAtomicLevels.
625     * @param AtomicSectionIdentifier $old
626     * @param AtomicSectionIdentifier $new
627     */
628    public function reassignCallbacksForSection(
629        AtomicSectionIdentifier $old,
630        AtomicSectionIdentifier $new
631    ) {
632        foreach ( $this->trxPreCommitOrIdleCallbacks as $key => $info ) {
633            if ( $info[2] === $old ) {
634                $this->trxPreCommitOrIdleCallbacks[$key][2] = $new;
635            }
636        }
637        foreach ( $this->trxPostCommitOrIdleCallbacks as $key => $info ) {
638            if ( $info[2] === $old ) {
639                $this->trxPostCommitOrIdleCallbacks[$key][2] = $new;
640            }
641        }
642        foreach ( $this->trxEndCallbacks as $key => $info ) {
643            if ( $info[2] === $old ) {
644                $this->trxEndCallbacks[$key][2] = $new;
645            }
646        }
647    }
648
649    /**
650     * Update callbacks that were owned by cancelled atomic sections.
651     *
652     * Callbacks for "on commit" should never be run if they're owned by a
653     * section that won't be committed.
654     *
655     * Callbacks for "on resolution" need to reflect that the section was
656     * rolled back, even if the transaction as a whole commits successfully.
657     *
658     * Callbacks for "on section cancel" should already have been consumed,
659     * but errors during the cancellation itself can prevent that while still
660     * destroying the section. Hoist any such callbacks to the new top section,
661     * which we assume will itself have to be cancelled or rolled back to
662     * resolve the error.
663     *
664     * @param AtomicSectionIdentifier[] $excisedSectionsId Cancelled section IDs
665     * @param AtomicSectionIdentifier|null $newSectionId New top section ID
666     * @throws UnexpectedValueException
667     */
668    public function modifyCallbacksForCancel(
669        array $excisedSectionsId,
670        ?AtomicSectionIdentifier $newSectionId = null
671    ) {
672        // Cancel the "on commit" callbacks owned by this savepoint
673        $this->trxPostCommitOrIdleCallbacks = array_filter(
674            $this->trxPostCommitOrIdleCallbacks,
675            static function ( $entry ) use ( $excisedSectionsId ) {
676                return !in_array( $entry[2], $excisedSectionsId, true );
677            }
678        );
679        $this->trxPreCommitOrIdleCallbacks = array_filter(
680            $this->trxPreCommitOrIdleCallbacks,
681            static function ( $entry ) use ( $excisedSectionsId ) {
682                return !in_array( $entry[2], $excisedSectionsId, true );
683            }
684        );
685        // Make "on resolution" callbacks owned by this savepoint to perceive a rollback
686        foreach ( $this->trxEndCallbacks as $key => $entry ) {
687            if ( in_array( $entry[2], $excisedSectionsId, true ) ) {
688                $callback = $entry[0];
689                $this->trxEndCallbacks[$key][0] = static function () use ( $callback ) {
690                    return $callback( IDatabase::TRIGGER_ROLLBACK );
691                };
692                // This "on resolution" callback no longer belongs to a section.
693                $this->trxEndCallbacks[$key][2] = null;
694            }
695        }
696    }
697
698    public function consumeEndCallbacks(): array {
699        $callbackEntries = array_merge(
700            $this->trxPostCommitOrIdleCallbacks,
701            $this->trxEndCallbacks
702        );
703        $this->trxPostCommitOrIdleCallbacks = []; // consumed (and recursion guard)
704        $this->trxEndCallbacks = []; // consumed (recursion guard)
705
706        return $callbackEntries;
707    }
708
709    /**
710     * Consume and run any "on transaction pre-commit" callbacks
711     *
712     * @return int Number of callbacks attempted
713     * @throws Throwable Any exception thrown by a callback
714     */
715    public function runOnTransactionPreCommitCallbacks(): int {
716        $count = 0;
717
718        // Drain the queues of transaction "precommit" callbacks until it is empty
719        do {
720            $callbackEntries = $this->trxPreCommitOrIdleCallbacks;
721            $this->trxPreCommitOrIdleCallbacks = []; // consumed (and recursion guard)
722            $count += count( $callbackEntries );
723            foreach ( $callbackEntries as $entry ) {
724                try {
725                    $entry[0]();
726                } catch ( Throwable $trxError ) {
727                    $this->setTransactionError( $trxError );
728                    throw $trxError;
729                }
730            }
731            // @phan-suppress-next-line PhanImpossibleConditionInLoop
732        } while ( $this->trxPreCommitOrIdleCallbacks );
733
734        return $count;
735    }
736
737    public function clearPreEndCallbacks() {
738        $this->trxPostCommitOrIdleCallbacks = [];
739        $this->trxPreCommitOrIdleCallbacks = [];
740    }
741
742    public function clearEndCallbacks() {
743        $this->trxEndCallbacks = []; // don't copy
744    }
745
746    public function writesOrCallbacksPending(): bool {
747        return $this->trxLevel() && (
748                $this->trxWriteCallers ||
749                $this->trxPostCommitOrIdleCallbacks ||
750                $this->trxPreCommitOrIdleCallbacks ||
751                $this->trxEndCallbacks
752            );
753    }
754
755    /**
756     * List the methods that have write queries or callbacks for the current transaction
757     *
758     * @return string[]
759     */
760    public function pendingWriteAndCallbackCallers(): array {
761        $fnames = $this->pendingWriteCallers();
762        foreach ( [
763            $this->trxPostCommitOrIdleCallbacks,
764            $this->trxPreCommitOrIdleCallbacks,
765            $this->trxEndCallbacks
766        ] as $callbacks ) {
767            foreach ( $callbacks as $callback ) {
768                $fnames[] = $callback[1];
769            }
770        }
771
772        return $fnames;
773    }
774
775    /**
776     * List the methods that have precommit callbacks for the current transaction
777     *
778     * @return string[]
779     */
780    public function pendingPreCommitCallbackCallers(): array {
781        $fnames = $this->pendingWriteCallers();
782        foreach ( $this->trxPreCommitOrIdleCallbacks as $callback ) {
783            $fnames[] = $callback[1];
784        }
785
786        return $fnames;
787    }
788
789    public function isEndCallbacksSuppressed(): bool {
790        return $this->trxEndCallbacksSuppressed;
791    }
792
793    public function getRecurringCallbacks() {
794        return $this->trxRecurringCallbacks;
795    }
796
797    public function countPostCommitOrIdleCallbacks() {
798        return count( $this->trxPostCommitOrIdleCallbacks );
799    }
800
801    /**
802     * @param string $mode One of IDatabase::TRANSACTION_* values
803     * @param string $fname method name
804     * @param float $rtt Trivial query round-trip-delay
805     */
806    public function onBeginInCriticalSection( $mode, $fname, $rtt ) {
807        $this->trxId = new TransactionIdentifier();
808        $this->setTrxStatusToOk();
809        $this->resetTrxAtomicLevels();
810        $this->trxWriteCallers = [];
811        $this->trxWriteDuration = 0.0;
812        $this->trxWriteQueryCount = 0;
813        $this->trxWriteAffectedRows = 0;
814        $this->trxWriteAdjDuration = 0.0;
815        $this->trxWriteAdjQueryCount = 0;
816        // T147697: make explicitTrxActive() return true until begin() finishes. This way,
817        // no caller triggered by getApproximateLagStatus() will think its OK to muck around
818        // with the transaction just because startAtomic() has not yet finished updating the
819        // tracking fields (e.g. trxAtomicLevels).
820        $this->trxAutomatic = ( $mode === IDatabase::TRANSACTION_INTERNAL );
821        $this->trxAutomaticAtomic = false;
822        $this->trxFname = $fname;
823        $this->trxTimestamp = microtime( true );
824        $this->trxRoundTripDelay = $rtt;
825    }
826
827    public function onRollbackInCriticalSection( IDatabase $db ) {
828        $oldTrxId = $this->consumeTrxId();
829        $this->setTrxStatusToNone();
830        $this->resetTrxAtomicLevels();
831        $this->clearPreEndCallbacks();
832        $this->transactionWritingOut( $db, (string)$oldTrxId );
833    }
834
835    public function onCommitInCriticalSection( IDatabase $db ) {
836        $lastWriteTime = null;
837
838        $oldTrxId = $this->consumeTrxId();
839        $this->setTrxStatusToNone();
840        if ( $this->trxWriteCallers ) {
841            $lastWriteTime = microtime( true );
842            $this->transactionWritingOut( $db, (string)$oldTrxId );
843        }
844
845        return $lastWriteTime;
846    }
847
848    public function onSessionLoss( IDatabase $db ) {
849        $oldTrxId = $this->consumeTrxId();
850        $this->clearPreEndCallbacks();
851        $this->transactionWritingOut( $db, (string)$oldTrxId );
852    }
853
854    public function onEndAtomicInCriticalSection( $sectionId ) {
855        // Hoist callback ownership for callbacks in the section that just ended;
856        // all callbacks should have an owner that is present in trxAtomicLevels.
857        $currentSectionId = $this->currentAtomicSectionId();
858        if ( $currentSectionId ) {
859            $this->reassignCallbacksForSection( $sectionId, $currentSectionId );
860        }
861    }
862
863    public function onFlushSnapshot( IDatabase $db, $fname, $flush, $trxRoundFname ) {
864        if ( $this->explicitTrxActive() ) {
865            // Committing this transaction would break callers that assume it is still open
866            throw new DBUnexpectedError(
867                $db,
868                "$fname: Cannot flush snapshot; " .
869                "explicit transaction '{$this->trxFname}' is still open"
870            );
871        } elseif ( $this->writesOrCallbacksPending() ) {
872            // This only flushes transactions to clear snapshots, not to write data
873            $fnames = implode( ', ', $this->pendingWriteAndCallbackCallers() );
874            throw new DBUnexpectedError(
875                $db,
876                "$fname: Cannot flush snapshot; " .
877                "writes from transaction {$this->trxFname} are still pending ($fnames)"
878            );
879        } elseif (
880            $this->trxLevel() &&
881            $trxRoundFname !== null &&
882            $flush !== IDatabase::FLUSHING_INTERNAL &&
883            $flush !== IDatabase::FLUSHING_ALL_PEERS
884        ) {
885            $this->logger->warning(
886                "$fname: Expected mass snapshot flush of all peer transactions " .
887                "in the explicit transactions round '{$trxRoundFname}'",
888                [
889                    'exception' => new RuntimeException(),
890                    'db_log_category' => 'trx'
891                ]
892            );
893        }
894    }
895
896    public function onGetScopedLockAndFlush( IDatabase $db, $fname ) {
897        if ( $this->writesOrCallbacksPending() ) {
898            // This only flushes transactions to clear snapshots, not to write data
899            $fnames = implode( ', ', $this->pendingWriteAndCallbackCallers() );
900            throw new DBUnexpectedError(
901                $db,
902                "$fname: Cannot flush pre-lock snapshot; " .
903                "writes from transaction {$this->trxFname} are still pending ($fnames)"
904            );
905        }
906    }
907}