Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
43.94% covered (danger)
43.94%
203 / 462
23.53% covered (danger)
23.53%
8 / 34
CRAP
0.00% covered (danger)
0.00%
0 / 1
JobQueueDB
43.94% covered (danger)
43.94%
203 / 462
23.53% covered (danger)
23.53%
8 / 34
1825.81
0.00% covered (danger)
0.00%
0 / 1
 __construct
42.86% covered (danger)
42.86%
3 / 7
0.00% covered (danger)
0.00%
0 / 1
6.99
 supportedOrders
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 optimalOrder
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 doIsEmpty
77.78% covered (warning)
77.78%
7 / 9
0.00% covered (danger)
0.00%
0 / 1
2.04
 doGetSize
76.92% covered (warning)
76.92%
10 / 13
0.00% covered (danger)
0.00%
0 / 1
3.11
 doGetAcquiredCount
77.78% covered (warning)
77.78%
14 / 18
0.00% covered (danger)
0.00%
0 / 1
4.18
 doGetAbandonedCount
0.00% covered (danger)
0.00%
0 / 21
0.00% covered (danger)
0.00%
0 / 1
20
 doBatchPush
100.00% covered (success)
100.00%
11 / 11
100.00% covered (success)
100.00%
1 / 1
1
 doBatchPushInternal
54.76% covered (warning)
54.76%
23 / 42
0.00% covered (danger)
0.00%
0 / 1
19.26
 doPop
76.47% covered (warning)
76.47%
13 / 17
0.00% covered (danger)
0.00%
0 / 1
6.47
 claimRandom
96.49% covered (success)
96.49%
55 / 57
0.00% covered (danger)
0.00%
0 / 1
9
 claimOldest
0.00% covered (danger)
0.00%
0 / 40
0.00% covered (danger)
0.00%
0 / 1
20
 doAck
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
12
 doDeduplicateRootJob
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
2
 doDelete
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
6
 doWaitForBackups
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 doFlushCaches
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 getAllQueuedJobs
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getAllAcquiredJobs
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 getAllAbandonedJobs
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
2
 getJobIterator
84.62% covered (warning)
84.62%
11 / 13
0.00% covered (danger)
0.00%
0 / 1
2.01
 getCoalesceLocationInternal
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
12
 doGetSiblingQueuesWithJobs
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
6
 doGetSiblingQueueSizes
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
6
 recycleAndDeleteStaleJobs
0.00% covered (danger)
0.00%
0 / 70
0.00% covered (danger)
0.00%
0 / 1
56
 insertFields
100.00% covered (success)
100.00%
12 / 12
100.00% covered (success)
100.00%
1 / 1
1
 getReplicaDB
33.33% covered (danger)
33.33%
1 / 3
0.00% covered (danger)
0.00%
0 / 1
3.19
 getPrimaryDB
33.33% covered (danger)
33.33%
1 / 3
0.00% covered (danger)
0.00%
0 / 1
3.19
 getDB
33.33% covered (danger)
33.33%
7 / 21
0.00% covered (danger)
0.00%
0 / 1
21.52
 getCacheKey
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
2
 makeBlob
66.67% covered (warning)
66.67%
2 / 3
0.00% covered (danger)
0.00%
0 / 1
2.15
 jobFromRow
77.78% covered (warning)
77.78%
7 / 9
0.00% covered (danger)
0.00%
0 / 1
3.10
 getDBException
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 selectFields
100.00% covered (success)
100.00%
13 / 13
100.00% covered (success)
100.00%
1 / 1
1
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20use MediaWiki\MediaWikiServices;
21use Wikimedia\Rdbms\DBConnectionError;
22use Wikimedia\Rdbms\DBError;
23use Wikimedia\Rdbms\IDatabase;
24use Wikimedia\Rdbms\IMaintainableDatabase;
25use Wikimedia\Rdbms\IReadableDatabase;
26use Wikimedia\Rdbms\RawSQLValue;
27use Wikimedia\Rdbms\SelectQueryBuilder;
28use Wikimedia\Rdbms\ServerInfo;
29use Wikimedia\ScopedCallback;
30
31/**
32 * Database-backed job queue storage.
33 *
34 * @since 1.21
35 * @ingroup JobQueue
36 */
37class JobQueueDB extends JobQueue {
38    /* seconds to cache info without re-validating */
39    private const CACHE_TTL_SHORT = 30;
40    /* seconds a job can live once claimed */
41    private const MAX_AGE_PRUNE = 7 * 24 * 3600;
42    /**
43     * Used for job_random, the highest safe 32-bit signed integer.
44     * Equivalent to `( 2 ** 31 ) - 1` on 64-bit.
45     */
46    private const MAX_JOB_RANDOM = 2_147_483_647;
47    /* maximum number of rows to skip */
48    private const MAX_OFFSET = 255;
49
50    /** @var IMaintainableDatabase|DBError|null */
51    protected $conn;
52
53    /** @var array|null Server configuration array */
54    protected $server;
55    /** @var string|null Name of an external DB cluster or null for the local DB cluster */
56    protected $cluster;
57
58    /**
59     * Additional parameters include:
60     *   - server  : Server configuration array for Database::factory. Overrides "cluster".
61     *   - cluster : The name of an external cluster registered via LBFactory.
62     *               If not specified, the primary DB cluster for the wiki will be used.
63     *               This can be overridden with a custom cluster so that DB handles will
64     *               be retrieved via LBFactory::getExternalLB() and getConnection().
65     * @param array $params
66     */
67    protected function __construct( array $params ) {
68        parent::__construct( $params );
69
70        if ( isset( $params['server'] ) ) {
71            $this->server = $params['server'];
72            // Always use autocommit mode, even if DBO_TRX is configured
73            $this->server['flags'] ??= 0;
74            $this->server['flags'] &= ~( IDatabase::DBO_TRX | IDatabase::DBO_DEFAULT );
75        } elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
76            $this->cluster = $params['cluster'];
77        }
78    }
79
80    protected function supportedOrders() {
81        return [ 'random', 'timestamp', 'fifo' ];
82    }
83
84    protected function optimalOrder() {
85        return 'random';
86    }
87
88    /**
89     * @see JobQueue::doIsEmpty()
90     * @return bool
91     */
92    protected function doIsEmpty() {
93        $dbr = $this->getReplicaDB();
94        try {
95            // unclaimed job
96            $found = (bool)$dbr->newSelectQueryBuilder()
97                ->select( '1' )
98                ->from( 'job' )
99                ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
100                ->caller( __METHOD__ )->fetchField();
101        } catch ( DBError $e ) {
102            throw $this->getDBException( $e );
103        }
104
105        return !$found;
106    }
107
108    /**
109     * @see JobQueue::doGetSize()
110     * @return int
111     */
112    protected function doGetSize() {
113        $key = $this->getCacheKey( 'size' );
114
115        $size = $this->wanCache->get( $key );
116        if ( is_int( $size ) ) {
117            return $size;
118        }
119
120        $dbr = $this->getReplicaDB();
121        try {
122            $size = $dbr->newSelectQueryBuilder()
123                ->from( 'job' )
124                ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
125                ->caller( __METHOD__ )->fetchRowCount();
126        } catch ( DBError $e ) {
127            throw $this->getDBException( $e );
128        }
129        $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );
130
131        return $size;
132    }
133
134    /**
135     * @see JobQueue::doGetAcquiredCount()
136     * @return int
137     */
138    protected function doGetAcquiredCount() {
139        if ( $this->claimTTL <= 0 ) {
140            return 0; // no acknowledgements
141        }
142
143        $key = $this->getCacheKey( 'acquiredcount' );
144
145        $count = $this->wanCache->get( $key );
146        if ( is_int( $count ) ) {
147            return $count;
148        }
149
150        $dbr = $this->getReplicaDB();
151        try {
152            $count = $dbr->newSelectQueryBuilder()
153                ->from( 'job' )
154                ->where( [
155                    'job_cmd' => $this->type,
156                    $dbr->expr( 'job_token', '!=', '' ),
157                ] )
158                ->caller( __METHOD__ )->fetchRowCount();
159        } catch ( DBError $e ) {
160            throw $this->getDBException( $e );
161        }
162        $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
163
164        return $count;
165    }
166
167    /**
168     * @see JobQueue::doGetAbandonedCount()
169     * @return int
170     * @throws JobQueueConnectionError
171     * @throws JobQueueError
172     */
173    protected function doGetAbandonedCount() {
174        if ( $this->claimTTL <= 0 ) {
175            return 0; // no acknowledgements
176        }
177
178        $key = $this->getCacheKey( 'abandonedcount' );
179
180        $count = $this->wanCache->get( $key );
181        if ( is_int( $count ) ) {
182            return $count;
183        }
184
185        $dbr = $this->getReplicaDB();
186        try {
187            $count = $dbr->newSelectQueryBuilder()
188                ->from( 'job' )
189                ->where(
190                    [
191                        'job_cmd' => $this->type,
192                        $dbr->expr( 'job_token', '!=', '' ),
193                        $dbr->expr( 'job_attempts', '>=', $this->maxTries ),
194                    ]
195                )
196                ->caller( __METHOD__ )->fetchRowCount();
197        } catch ( DBError $e ) {
198            throw $this->getDBException( $e );
199        }
200
201        $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
202
203        return $count;
204    }
205
206    /**
207     * @see JobQueue::doBatchPush()
208     * @param IJobSpecification[] $jobs
209     * @param int $flags
210     * @throws DBError|Exception
211     * @return void
212     */
213    protected function doBatchPush( array $jobs, $flags ) {
214        // Silence expectations related to getting a primary DB, as we have to get a primary DB to insert the job.
215        $transactionProfiler = Profiler::instance()->getTransactionProfiler();
216        $scope = $transactionProfiler->silenceForScope();
217        $dbw = $this->getPrimaryDB();
218        ScopedCallback::consume( $scope );
219        // In general, there will be two cases here:
220        // a) sqlite; DB connection is probably a regular round-aware handle.
221        // If the connection is busy with a transaction, then defer the job writes
222        // until right before the main round commit step. Any errors that bubble
223        // up will rollback the main commit round.
224        // b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
225        // No transaction is active nor will be started by writes, so enqueue the jobs
226        // now so that any errors will show up immediately as the interface expects. Any
227        // errors that bubble up will rollback the main commit round.
228        $fname = __METHOD__;
229        $dbw->onTransactionPreCommitOrIdle(
230            function ( IDatabase $dbw ) use ( $jobs, $flags, $fname ) {
231                $this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
232            },
233            $fname
234        );
235    }
236
237    /**
238     * This function should *not* be called outside of JobQueueDB
239     *
240     * @suppress SecurityCheck-SQLInjection Bug in phan-taint-check handling bulk inserts
241     * @param IDatabase $dbw
242     * @param IJobSpecification[] $jobs
243     * @param int $flags
244     * @param string $method
245     * @throws DBError
246     * @return void
247     */
248    public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
249        if ( $jobs === [] ) {
250            return;
251        }
252
253        $rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
254        $rowList = []; // list of jobs for jobs that are not de-duplicated
255        foreach ( $jobs as $job ) {
256            $row = $this->insertFields( $job, $dbw );
257            if ( $job->ignoreDuplicates() ) {
258                $rowSet[$row['job_sha1']] = $row;
259            } else {
260                $rowList[] = $row;
261            }
262        }
263
264        if ( $flags & self::QOS_ATOMIC ) {
265            $dbw->startAtomic( $method ); // wrap all the job additions in one transaction
266        }
267        try {
268            // Strip out any duplicate jobs that are already in the queue...
269            if ( count( $rowSet ) ) {
270                $res = $dbw->newSelectQueryBuilder()
271                    ->select( 'job_sha1' )
272                    ->from( 'job' )
273                    ->where(
274                        [
275                            // No job_type condition since it's part of the job_sha1 hash
276                            'job_sha1' => array_map( 'strval', array_keys( $rowSet ) ),
277                            'job_token' => '' // unclaimed
278                        ]
279                    )
280                    ->caller( $method )->fetchResultSet();
281                foreach ( $res as $row ) {
282                    wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
283                    unset( $rowSet[$row->job_sha1] ); // already enqueued
284                }
285            }
286            // Build the full list of job rows to insert
287            $rows = array_merge( $rowList, array_values( $rowSet ) );
288            // Silence expectations related to inserting to the job table, because we have to perform the inserts to
289            // track the job.
290            $transactionProfiler = Profiler::instance()->getTransactionProfiler();
291            $scope = $transactionProfiler->silenceForScope();
292            // Insert the job rows in chunks to avoid replica DB lag...
293            foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
294                $dbw->newInsertQueryBuilder()
295                    ->insertInto( 'job' )
296                    ->rows( $rowBatch )
297                    ->caller( $method )->execute();
298            }
299            ScopedCallback::consume( $scope );
300            $this->incrStats( 'inserts', $this->type, count( $rows ) );
301            $this->incrStats( 'dupe_inserts', $this->type,
302                count( $rowSet ) + count( $rowList ) - count( $rows )
303            );
304        } catch ( DBError $e ) {
305            throw $this->getDBException( $e );
306        }
307        if ( $flags & self::QOS_ATOMIC ) {
308            $dbw->endAtomic( $method );
309        }
310    }
311
312    /**
313     * @see JobQueue::doPop()
314     * @return RunnableJob|false
315     */
316    protected function doPop() {
317        $job = false; // job popped off
318        try {
319            $uuid = wfRandomString( 32 ); // pop attempt
320            do { // retry when our row is invalid or deleted as a duplicate
321                // Try to reserve a row in the DB...
322                if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
323                    $row = $this->claimOldest( $uuid );
324                } else { // random first
325                    $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
326                    $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
327                    $row = $this->claimRandom( $uuid, $rand, $gte );
328                }
329                // Check if we found a row to reserve...
330                if ( !$row ) {
331                    break; // nothing to do
332                }
333                $this->incrStats( 'pops', $this->type );
334
335                // Get the job object from the row...
336                $job = $this->jobFromRow( $row );
337                break; // done
338            } while ( true );
339
340            if ( !$job || mt_rand( 0, 9 ) == 0 ) {
341                // Handled jobs that need to be recycled/deleted;
342                // any recycled jobs will be picked up next attempt
343                $this->recycleAndDeleteStaleJobs();
344            }
345        } catch ( DBError $e ) {
346            throw $this->getDBException( $e );
347        }
348
349        return $job;
350    }
351
352    /**
353     * Reserve a row with a single UPDATE without holding row locks over RTTs...
354     *
355     * @param string $uuid 32 char hex string
356     * @param int $rand Random unsigned integer (31 bits)
357     * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
358     * @return stdClass|false Row|false
359     */
360    protected function claimRandom( $uuid, $rand, $gte ) {
361        $dbw = $this->getPrimaryDB();
362        // Check cache to see if the queue has <= OFFSET items
363        $tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) );
364
365        $invertedDirection = false; // whether one job_random direction was already scanned
366        // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
367        // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
368        // not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
369        // be used here with MySQL.
370        do {
371            if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
372                // For small queues, using OFFSET will overshoot and return no rows more often.
373                // Instead, this uses job_random to pick a row (possibly checking both directions).
374                $row = $dbw->newSelectQueryBuilder()
375                    ->select( self::selectFields() )
376                    ->from( 'job' )
377                    ->where(
378                        [
379                            'job_cmd' => $this->type,
380                            'job_token' => '', // unclaimed
381                            $dbw->expr( 'job_random', $gte ? '>=' : '<=', $rand )
382                        ]
383                    )
384                    ->orderBy(
385                        'job_random',
386                        $gte ? SelectQueryBuilder::SORT_ASC : SelectQueryBuilder::SORT_DESC
387                    )
388                    ->caller( __METHOD__ )->fetchRow();
389                if ( !$row && !$invertedDirection ) {
390                    $gte = !$gte;
391                    $invertedDirection = true;
392                    continue; // try the other direction
393                }
394            } else { // table *may* have >= MAX_OFFSET rows
395                // T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
396                // in MySQL if there are many rows for some reason. This uses a small OFFSET
397                // instead of job_random for reducing excess claim retries.
398                $row = $dbw->newSelectQueryBuilder()
399                    ->select( self::selectFields() )
400                    ->from( 'job' )
401                    ->where(
402                        [
403                            'job_cmd' => $this->type,
404                            'job_token' => '', // unclaimed
405                        ]
406                    )
407                    ->offset( mt_rand( 0, self::MAX_OFFSET ) )
408                    ->caller( __METHOD__ )->fetchRow();
409                if ( !$row ) {
410                    $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
411                    $this->wanCache->set( $this->getCacheKey( 'small' ), 1, 30 );
412                    continue; // use job_random
413                }
414            }
415
416            if ( !$row ) {
417                break;
418            }
419
420            $dbw->newUpdateQueryBuilder()
421                ->update( 'job' ) // update by PK
422                ->set( [
423                    'job_token' => $uuid,
424                    'job_token_timestamp' => $dbw->timestamp(),
425                    'job_attempts' => new RawSQLValue( 'job_attempts+1' ),
426                ] )
427                ->where( [
428                    'job_cmd' => $this->type,
429                    'job_id' => $row->job_id,
430                    'job_token' => ''
431                ] )
432                ->caller( __METHOD__ )->execute();
433
434            // This might get raced out by another runner when claiming the previously
435            // selected row. The use of job_random should minimize this problem, however.
436            if ( !$dbw->affectedRows() ) {
437                $row = false; // raced out
438            }
439        } while ( !$row );
440
441        return $row;
442    }
443
444    /**
445     * Reserve a row with a single UPDATE without holding row locks over RTTs...
446     *
447     * @param string $uuid 32 char hex string
448     * @return stdClass|false Row|false
449     */
450    protected function claimOldest( $uuid ) {
451        $dbw = $this->getPrimaryDB();
452
453        $row = false; // the row acquired
454        do {
455            if ( $dbw->getType() === 'mysql' ) {
456                // Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
457                // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
458                // Postgres has no such limitation. However, MySQL offers an
459                // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
460                $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
461                    "SET " .
462                        "job_token = {$dbw->addQuotes( $uuid ) }" .
463                        "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}" .
464                        "job_attempts = job_attempts+1 " .
465                    "WHERE ( " .
466                        "job_cmd = {$dbw->addQuotes( $this->type )} " .
467                        "AND job_token = {$dbw->addQuotes( '' )} " .
468                    ") ORDER BY job_id ASC LIMIT 1",
469                    __METHOD__
470                );
471            } else {
472                // Use a subquery to find the job, within an UPDATE to claim it.
473                // This uses as much of the DB wrapper functions as possible.
474                $qb = $dbw->newSelectQueryBuilder()
475                    ->select( 'job_id' )
476                    ->from( 'job' )
477                    ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
478                    ->orderBy( 'job_id', SelectQueryBuilder::SORT_ASC )
479                    ->limit( 1 );
480
481                $dbw->newUpdateQueryBuilder()
482                    ->update( 'job' )
483                    ->set( [
484                        'job_token' => $uuid,
485                        'job_token_timestamp' => $dbw->timestamp(),
486                        'job_attempts' => new RawSQLValue( 'job_attempts+1' ),
487                    ] )
488                    ->where( [ 'job_id' => new RawSQLValue( '(' . $qb->getSQL() . ')' ) ] )
489                    ->caller( __METHOD__ )->execute();
490            }
491
492            if ( !$dbw->affectedRows() ) {
493                break;
494            }
495
496            // Fetch any row that we just reserved...
497            $row = $dbw->newSelectQueryBuilder()
498                ->select( self::selectFields() )
499                ->from( 'job' )
500                ->where( [ 'job_cmd' => $this->type, 'job_token' => $uuid ] )
501                ->caller( __METHOD__ )->fetchRow();
502            if ( !$row ) { // raced out by duplicate job removal
503                wfDebug( "Row deleted as duplicate by another process." );
504            }
505        } while ( !$row );
506
507        return $row;
508    }
509
510    /**
511     * @see JobQueue::doAck()
512     * @param RunnableJob $job
513     * @throws JobQueueConnectionError
514     * @throws JobQueueError
515     */
516    protected function doAck( RunnableJob $job ) {
517        $id = $job->getMetadata( 'id' );
518        if ( $id === null ) {
519            throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no ID." );
520        }
521
522        $dbw = $this->getPrimaryDB();
523        try {
524            // Delete a row with a single DELETE without holding row locks over RTTs...
525            $dbw->newDeleteQueryBuilder()
526                ->deleteFrom( 'job' )
527                ->where( [ 'job_cmd' => $this->type, 'job_id' => $id ] )
528                ->caller( __METHOD__ )->execute();
529
530            $this->incrStats( 'acks', $this->type );
531        } catch ( DBError $e ) {
532            throw $this->getDBException( $e );
533        }
534    }
535
536    /**
537     * @see JobQueue::doDeduplicateRootJob()
538     * @param IJobSpecification $job
539     * @throws JobQueueConnectionError
540     * @return bool
541     */
542    protected function doDeduplicateRootJob( IJobSpecification $job ) {
543        // Callers should call JobQueueGroup::push() before this method so that if the
544        // insert fails, the de-duplication registration will be aborted. Since the insert
545        // is deferred till "transaction idle", do the same here, so that the ordering is
546        // maintained. Having only the de-duplication registration succeed would cause
547        // jobs to become no-ops without any actual jobs that made them redundant.
548        $dbw = $this->getPrimaryDB();
549        $dbw->onTransactionCommitOrIdle(
550            function () use ( $job ) {
551                parent::doDeduplicateRootJob( $job );
552            },
553            __METHOD__
554        );
555
556        return true;
557    }
558
559    /**
560     * @see JobQueue::doDelete()
561     * @return bool
562     */
563    protected function doDelete() {
564        $dbw = $this->getPrimaryDB();
565        try {
566            $dbw->newDeleteQueryBuilder()
567                ->deleteFrom( 'job' )
568                ->where( [ 'job_cmd' => $this->type ] )
569                ->caller( __METHOD__ )->execute();
570        } catch ( DBError $e ) {
571            throw $this->getDBException( $e );
572        }
573
574        return true;
575    }
576
577    /**
578     * @see JobQueue::doWaitForBackups()
579     * @return void
580     */
581    protected function doWaitForBackups() {
582        if ( $this->server ) {
583            return; // not using LBFactory instance
584        }
585
586        $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
587        $lbFactory->waitForReplication();
588    }
589
590    /**
591     * @return void
592     */
593    protected function doFlushCaches() {
594        foreach ( [ 'size', 'acquiredcount' ] as $type ) {
595            $this->wanCache->delete( $this->getCacheKey( $type ) );
596        }
597    }
598
599    /**
600     * @see JobQueue::getAllQueuedJobs()
601     * @return Iterator<RunnableJob>
602     */
603    public function getAllQueuedJobs() {
604        return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] );
605    }
606
607    /**
608     * @see JobQueue::getAllAcquiredJobs()
609     * @return Iterator<RunnableJob>
610     */
611    public function getAllAcquiredJobs() {
612        $dbr = $this->getReplicaDB();
613        return $this->getJobIterator( [ 'job_cmd' => $this->getType(), $dbr->expr( 'job_token', '>', '' ) ] );
614    }
615
616    /**
617     * @see JobQueue::getAllAbandonedJobs()
618     * @return Iterator<RunnableJob>
619     */
620    public function getAllAbandonedJobs() {
621        $dbr = $this->getReplicaDB();
622        return $this->getJobIterator( [
623            'job_cmd' => $this->getType(),
624            $dbr->expr( 'job_token', '>', '' ),
625            $dbr->expr( 'job_attempts', '>=', intval( $this->maxTries ) ),
626        ] );
627    }
628
629    /**
630     * @param array $conds Query conditions
631     * @return Iterator<RunnableJob>
632     */
633    protected function getJobIterator( array $conds ) {
634        $dbr = $this->getReplicaDB();
635        $qb = $dbr->newSelectQueryBuilder()
636            ->select( self::selectFields() )
637            ->from( 'job' )
638            ->where( $conds );
639        try {
640            return new MappedIterator(
641                $qb->caller( __METHOD__ )->fetchResultSet(),
642                function ( $row ) {
643                    return $this->jobFromRow( $row );
644                }
645            );
646        } catch ( DBError $e ) {
647            throw $this->getDBException( $e );
648        }
649    }
650
651    public function getCoalesceLocationInternal() {
652        if ( $this->server ) {
653            return null; // not using the LBFactory instance
654        }
655
656        return is_string( $this->cluster )
657            ? "DBCluster:{$this->cluster}:{$this->domain}"
658            : "LBFactory:{$this->domain}";
659    }
660
661    protected function doGetSiblingQueuesWithJobs( array $types ) {
662        $dbr = $this->getReplicaDB();
663        // @note: this does not check whether the jobs are claimed or not.
664        // This is useful so JobQueueGroup::pop() also sees queues that only
665        // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
666        // failed jobs so that they can be popped again for that edge case.
667        $res = $dbr->newSelectQueryBuilder()
668            ->select( 'job_cmd' )
669            ->distinct()
670            ->from( 'job' )
671            ->where( [ 'job_cmd' => $types ] )
672            ->caller( __METHOD__ )->fetchResultSet();
673
674        $types = [];
675        foreach ( $res as $row ) {
676            $types[] = $row->job_cmd;
677        }
678
679        return $types;
680    }
681
682    protected function doGetSiblingQueueSizes( array $types ) {
683        $dbr = $this->getReplicaDB();
684
685        $res = $dbr->newSelectQueryBuilder()
686            ->select( [ 'job_cmd', 'count' => 'COUNT(*)' ] )
687            ->from( 'job' )
688            ->where( [ 'job_cmd' => $types ] )
689            ->groupBy( 'job_cmd' )
690            ->caller( __METHOD__ )->fetchResultSet();
691
692        $sizes = [];
693        foreach ( $res as $row ) {
694            $sizes[$row->job_cmd] = (int)$row->count;
695        }
696
697        return $sizes;
698    }
699
700    /**
701     * Recycle or destroy any jobs that have been claimed for too long
702     *
703     * @return int Number of jobs recycled/deleted
704     */
705    public function recycleAndDeleteStaleJobs() {
706        $now = time();
707        $count = 0; // affected rows
708        $dbw = $this->getPrimaryDB();
709
710        try {
711            if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
712                return $count; // already in progress
713            }
714
715            // Remove claims on jobs acquired for too long if enabled...
716            if ( $this->claimTTL > 0 ) {
717                $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
718                // Get the IDs of jobs that have be claimed but not finished after too long.
719                // These jobs can be recycled into the queue by expiring the claim. Selecting
720                // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
721                $res = $dbw->newSelectQueryBuilder()
722                    ->select( 'job_id' )
723                    ->from( 'job' )
724                    ->where(
725                        [
726                            'job_cmd' => $this->type,
727                            $dbw->expr( 'job_token', '!=', '' ), // was acquired
728                            $dbw->expr( 'job_token_timestamp', '<', $claimCutoff ), // stale
729                            $dbw->expr( 'job_attempts', '<', $this->maxTries ), // retries left
730                        ]
731                    )
732                    ->caller( __METHOD__ )->fetchResultSet();
733                $ids = array_map(
734                    static function ( $o ) {
735                        return $o->job_id;
736                    }, iterator_to_array( $res )
737                );
738                if ( count( $ids ) ) {
739                    // Reset job_token for these jobs so that other runners will pick them up.
740                    // Set the timestamp to the current time, as it is useful to now that the job
741                    // was already tried before (the timestamp becomes the "released" time).
742                    $dbw->newUpdateQueryBuilder()
743                        ->update( 'job' )
744                        ->set( [
745                            'job_token' => '',
746                            'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
747                        ] )
748                        ->where( [
749                            'job_id' => $ids,
750                            $dbw->expr( 'job_token', '!=', '' ),
751                        ] )
752                        ->caller( __METHOD__ )->execute();
753
754                    $affected = $dbw->affectedRows();
755                    $count += $affected;
756                    $this->incrStats( 'recycles', $this->type, $affected );
757                }
758            }
759
760            // Just destroy any stale jobs...
761            $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
762            $qb = $dbw->newSelectQueryBuilder()
763                ->select( 'job_id' )
764                ->from( 'job' )
765                ->where(
766                    [
767                        'job_cmd' => $this->type,
768                        $dbw->expr( 'job_token', '!=', '' ), // was acquired
769                        $dbw->expr( 'job_token_timestamp', '<', $pruneCutoff ) // stale
770                    ]
771                );
772            if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
773                $qb->andWhere( $dbw->expr( 'job_attempts', '>=', $this->maxTries ) );
774            }
775            // Get the IDs of jobs that are considered stale and should be removed. Selecting
776            // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
777            $res = $qb->caller( __METHOD__ )->fetchResultSet();
778            $ids = array_map(
779                static function ( $o ) {
780                    return $o->job_id;
781                }, iterator_to_array( $res )
782            );
783            if ( count( $ids ) ) {
784                $dbw->newDeleteQueryBuilder()
785                    ->deleteFrom( 'job' )
786                    ->where( [ 'job_id' => $ids ] )
787                    ->caller( __METHOD__ )->execute();
788                $affected = $dbw->affectedRows();
789                $count += $affected;
790                $this->incrStats( 'abandons', $this->type, $affected );
791            }
792
793            $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
794        } catch ( DBError $e ) {
795            throw $this->getDBException( $e );
796        }
797
798        return $count;
799    }
800
801    /**
802     * @param IJobSpecification $job
803     * @param IReadableDatabase $db
804     * @return array
805     */
806    protected function insertFields( IJobSpecification $job, IReadableDatabase $db ) {
807        return [
808            // Fields that describe the nature of the job
809            'job_cmd' => $job->getType(),
810            'job_namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL,
811            'job_title' => $job->getParams()['title'] ?? '',
812            'job_params' => self::makeBlob( $job->getParams() ),
813            // Additional job metadata
814            'job_timestamp' => $db->timestamp(),
815            'job_sha1' => Wikimedia\base_convert(
816                sha1( serialize( $job->getDeduplicationInfo() ) ),
817                16, 36, 31
818            ),
819            'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
820        ];
821    }
822
823    /**
824     * @throws JobQueueConnectionError
825     * @return IDatabase
826     */
827    protected function getReplicaDB() {
828        try {
829            return $this->getDB( DB_REPLICA );
830        } catch ( DBConnectionError $e ) {
831            throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
832        }
833    }
834
835    /**
836     * @throws JobQueueConnectionError
837     * @return IMaintainableDatabase
838     * @since 1.37
839     */
840    protected function getPrimaryDB() {
841        try {
842            return $this->getDB( DB_PRIMARY );
843        } catch ( DBConnectionError $e ) {
844            throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
845        }
846    }
847
848    /**
849     * @param int $index (DB_REPLICA/DB_PRIMARY)
850     * @return IMaintainableDatabase
851     */
852    protected function getDB( $index ) {
853        if ( $this->server ) {
854            if ( $this->conn instanceof IDatabase ) {
855                return $this->conn;
856            } elseif ( $this->conn instanceof DBError ) {
857                throw $this->conn;
858            }
859
860            try {
861                $this->conn = MediaWikiServices::getInstance()->getDatabaseFactory()->create(
862                    $this->server['type'],
863                    $this->server
864                );
865            } catch ( DBError $e ) {
866                $this->conn = $e;
867                throw $e;
868            }
869
870            return $this->conn;
871        } else {
872            $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
873            $lb = is_string( $this->cluster )
874                ? $lbFactory->getExternalLB( $this->cluster )
875                : $lbFactory->getMainLB( $this->domain );
876
877            if ( $lb->getServerType( ServerInfo::WRITER_INDEX ) !== 'sqlite' ) {
878                // Keep a separate connection to avoid contention and deadlocks;
879                // However, SQLite has the opposite behavior due to DB-level locking.
880                $flags = $lb::CONN_TRX_AUTOCOMMIT;
881            } else {
882                // Jobs insertion will be deferred until the PRESEND stage to reduce contention.
883                $flags = 0;
884            }
885
886            return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
887        }
888    }
889
890    /**
891     * @param string $property
892     * @return string
893     */
894    private function getCacheKey( $property ) {
895        $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
896
897        return $this->wanCache->makeGlobalKey(
898            'jobqueue',
899            $this->domain,
900            $cluster,
901            $this->type,
902            $property
903        );
904    }
905
906    /**
907     * @param array|false $params
908     * @return string
909     */
910    protected static function makeBlob( $params ) {
911        if ( $params !== false ) {
912            return serialize( $params );
913        } else {
914            return '';
915        }
916    }
917
918    /**
919     * @param stdClass $row
920     * @return RunnableJob
921     */
922    protected function jobFromRow( $row ) {
923        $params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : [];
924        if ( !is_array( $params ) ) { // this shouldn't happen
925            throw new UnexpectedValueException(
926                "Could not unserialize job with ID '{$row->job_id}'." );
927        }
928
929        $params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
930        $job = $this->factoryJob( $row->job_cmd, $params );
931        $job->setMetadata( 'id', $row->job_id );
932        $job->setMetadata( 'timestamp', $row->job_timestamp );
933
934        return $job;
935    }
936
937    /**
938     * @param DBError $e
939     * @return JobQueueError
940     */
941    protected function getDBException( DBError $e ) {
942        return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
943    }
944
945    /**
946     * Return the list of job fields that should be selected.
947     * @since 1.23
948     * @return array
949     */
950    public static function selectFields() {
951        return [
952            'job_id',
953            'job_cmd',
954            'job_namespace',
955            'job_title',
956            'job_timestamp',
957            'job_params',
958            'job_random',
959            'job_attempts',
960            'job_token',
961            'job_token_timestamp',
962            'job_sha1',
963        ];
964    }
965}