Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
53.15% covered (warning)
53.15%
245 / 461
23.53% covered (danger)
23.53%
8 / 34
CRAP
0.00% covered (danger)
0.00%
0 / 1
JobQueueDB
53.26% covered (warning)
53.26%
245 / 460
23.53% covered (danger)
23.53%
8 / 34
1099.72
0.00% covered (danger)
0.00%
0 / 1
 __construct
42.86% covered (danger)
42.86%
3 / 7
0.00% covered (danger)
0.00%
0 / 1
6.99
 supportedOrders
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 optimalOrder
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 doIsEmpty
77.78% covered (warning)
77.78%
7 / 9
0.00% covered (danger)
0.00%
0 / 1
2.04
 doGetSize
76.92% covered (warning)
76.92%
10 / 13
0.00% covered (danger)
0.00%
0 / 1
3.11
 doGetAcquiredCount
77.78% covered (warning)
77.78%
14 / 18
0.00% covered (danger)
0.00%
0 / 1
4.18
 doGetAbandonedCount
0.00% covered (danger)
0.00%
0 / 21
0.00% covered (danger)
0.00%
0 / 1
20
 doBatchPush
100.00% covered (success)
100.00%
9 / 9
100.00% covered (success)
100.00%
1 / 1
1
 doBatchPushInternal
54.76% covered (warning)
54.76%
23 / 42
0.00% covered (danger)
0.00%
0 / 1
19.26
 doPop
76.47% covered (warning)
76.47%
13 / 17
0.00% covered (danger)
0.00%
0 / 1
6.47
 claimRandom
96.49% covered (success)
96.49%
55 / 57
0.00% covered (danger)
0.00%
0 / 1
9
 claimOldest
0.00% covered (danger)
0.00%
0 / 40
0.00% covered (danger)
0.00%
0 / 1
20
 doAck
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
12
 doDeduplicateRootJob
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
2
 doDelete
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
6
 doWaitForBackups
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 doFlushCaches
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 getAllQueuedJobs
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getAllAcquiredJobs
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 getAllAbandonedJobs
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
2
 getJobIterator
84.62% covered (warning)
84.62%
11 / 13
0.00% covered (danger)
0.00%
0 / 1
2.01
 getCoalesceLocationInternal
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
12
 doGetSiblingQueuesWithJobs
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
6
 doGetSiblingQueueSizes
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
6
 recycleAndDeleteStaleJobs
62.86% covered (warning)
62.86%
44 / 70
0.00% covered (danger)
0.00%
0 / 1
9.51
 insertFields
100.00% covered (success)
100.00%
12 / 12
100.00% covered (success)
100.00%
1 / 1
1
 getReplicaDB
33.33% covered (danger)
33.33%
1 / 3
0.00% covered (danger)
0.00%
0 / 1
3.19
 getPrimaryDB
33.33% covered (danger)
33.33%
1 / 3
0.00% covered (danger)
0.00%
0 / 1
3.19
 getDB
33.33% covered (danger)
33.33%
7 / 21
0.00% covered (danger)
0.00%
0 / 1
21.52
 getCacheKey
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
2
 makeBlob
66.67% covered (warning)
66.67%
2 / 3
0.00% covered (danger)
0.00%
0 / 1
2.15
 jobFromRow
77.78% covered (warning)
77.78%
7 / 9
0.00% covered (danger)
0.00%
0 / 1
3.10
 getDBException
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 selectFields
100.00% covered (success)
100.00%
13 / 13
100.00% covered (success)
100.00%
1 / 1
1
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20
21namespace MediaWiki\JobQueue;
22
23use MappedIterator;
24use MediaWiki\JobQueue\Exceptions\JobQueueConnectionError;
25use MediaWiki\JobQueue\Exceptions\JobQueueError;
26use MediaWiki\MediaWikiServices;
27use Profiler;
28use stdClass;
29use UnexpectedValueException;
30use Wikimedia\Rdbms\DBConnectionError;
31use Wikimedia\Rdbms\DBError;
32use Wikimedia\Rdbms\IDatabase;
33use Wikimedia\Rdbms\IMaintainableDatabase;
34use Wikimedia\Rdbms\IReadableDatabase;
35use Wikimedia\Rdbms\RawSQLValue;
36use Wikimedia\Rdbms\SelectQueryBuilder;
37use Wikimedia\Rdbms\ServerInfo;
38use Wikimedia\ScopedCallback;
39
40/**
41 * Database-backed job queue storage.
42 *
43 * @since 1.21
44 * @ingroup JobQueue
45 */
46class JobQueueDB extends JobQueue {
47    /* seconds to cache info without re-validating */
48    private const CACHE_TTL_SHORT = 30;
49    /* seconds a job can live once claimed */
50    private const MAX_AGE_PRUNE = 7 * 24 * 3600;
51    /**
52     * Used for job_random, the highest safe 32-bit signed integer.
53     * Equivalent to `( 2 ** 31 ) - 1` on 64-bit.
54     */
55    private const MAX_JOB_RANDOM = 2_147_483_647;
56    /* maximum number of rows to skip */
57    private const MAX_OFFSET = 255;
58
59    /** @var IMaintainableDatabase|DBError|null */
60    protected $conn;
61
62    /** @var array|null Server configuration array */
63    protected $server;
64    /** @var string|null Name of an external DB cluster or null for the local DB cluster */
65    protected $cluster;
66
67    /**
68     * Additional parameters include:
69     *   - server  : Server configuration array for Database::factory. Overrides "cluster".
70     *   - cluster : The name of an external cluster registered via LBFactory.
71     *               If not specified, the primary DB cluster for the wiki will be used.
72     *               This can be overridden with a custom cluster so that DB handles will
73     *               be retrieved via LBFactory::getExternalLB() and getConnection().
74     */
75    protected function __construct( array $params ) {
76        parent::__construct( $params );
77
78        if ( isset( $params['server'] ) ) {
79            $this->server = $params['server'];
80            // Always use autocommit mode, even if DBO_TRX is configured
81            $this->server['flags'] ??= 0;
82            $this->server['flags'] &= ~( IDatabase::DBO_TRX | IDatabase::DBO_DEFAULT );
83        } elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
84            $this->cluster = $params['cluster'];
85        }
86    }
87
88    protected function supportedOrders() {
89        return [ 'random', 'timestamp', 'fifo' ];
90    }
91
92    protected function optimalOrder() {
93        return 'random';
94    }
95
96    /**
97     * @see JobQueue::doIsEmpty()
98     * @return bool
99     */
100    protected function doIsEmpty() {
101        $dbr = $this->getReplicaDB();
102        try {
103            // unclaimed job
104            $found = (bool)$dbr->newSelectQueryBuilder()
105                ->select( '1' )
106                ->from( 'job' )
107                ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
108                ->caller( __METHOD__ )->fetchField();
109        } catch ( DBError $e ) {
110            throw $this->getDBException( $e );
111        }
112
113        return !$found;
114    }
115
116    /**
117     * @see JobQueue::doGetSize()
118     * @return int
119     */
120    protected function doGetSize() {
121        $key = $this->getCacheKey( 'size' );
122
123        $size = $this->wanCache->get( $key );
124        if ( is_int( $size ) ) {
125            return $size;
126        }
127
128        $dbr = $this->getReplicaDB();
129        try {
130            $size = $dbr->newSelectQueryBuilder()
131                ->from( 'job' )
132                ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
133                ->caller( __METHOD__ )->fetchRowCount();
134        } catch ( DBError $e ) {
135            throw $this->getDBException( $e );
136        }
137        $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );
138
139        return $size;
140    }
141
142    /**
143     * @see JobQueue::doGetAcquiredCount()
144     * @return int
145     */
146    protected function doGetAcquiredCount() {
147        if ( $this->claimTTL <= 0 ) {
148            return 0; // no acknowledgements
149        }
150
151        $key = $this->getCacheKey( 'acquiredcount' );
152
153        $count = $this->wanCache->get( $key );
154        if ( is_int( $count ) ) {
155            return $count;
156        }
157
158        $dbr = $this->getReplicaDB();
159        try {
160            $count = $dbr->newSelectQueryBuilder()
161                ->from( 'job' )
162                ->where( [
163                    'job_cmd' => $this->type,
164                    $dbr->expr( 'job_token', '!=', '' ),
165                ] )
166                ->caller( __METHOD__ )->fetchRowCount();
167        } catch ( DBError $e ) {
168            throw $this->getDBException( $e );
169        }
170        $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
171
172        return $count;
173    }
174
175    /**
176     * @see JobQueue::doGetAbandonedCount()
177     * @return int
178     * @throws JobQueueConnectionError
179     * @throws JobQueueError
180     */
181    protected function doGetAbandonedCount() {
182        if ( $this->claimTTL <= 0 ) {
183            return 0; // no acknowledgements
184        }
185
186        $key = $this->getCacheKey( 'abandonedcount' );
187
188        $count = $this->wanCache->get( $key );
189        if ( is_int( $count ) ) {
190            return $count;
191        }
192
193        $dbr = $this->getReplicaDB();
194        try {
195            $count = $dbr->newSelectQueryBuilder()
196                ->from( 'job' )
197                ->where(
198                    [
199                        'job_cmd' => $this->type,
200                        $dbr->expr( 'job_token', '!=', '' ),
201                        $dbr->expr( 'job_attempts', '>=', $this->maxTries ),
202                    ]
203                )
204                ->caller( __METHOD__ )->fetchRowCount();
205        } catch ( DBError $e ) {
206            throw $this->getDBException( $e );
207        }
208
209        $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
210
211        return $count;
212    }
213
214    /**
215     * @see JobQueue::doBatchPush()
216     * @param IJobSpecification[] $jobs
217     * @param int $flags
218     * @throws DBError|\Exception
219     * @return void
220     */
221    protected function doBatchPush( array $jobs, $flags ) {
222        // Silence expectations related to getting a primary DB, as we have to get a primary DB to insert the job.
223        $transactionProfiler = Profiler::instance()->getTransactionProfiler();
224        $scope = $transactionProfiler->silenceForScope();
225        $dbw = $this->getPrimaryDB();
226        ScopedCallback::consume( $scope );
227        // In general, there will be two cases here:
228        // a) sqlite; DB connection is probably a regular round-aware handle.
229        // If the connection is busy with a transaction, then defer the job writes
230        // until right before the main round commit step. Any errors that bubble
231        // up will rollback the main commit round.
232        // b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
233        // No transaction is active nor will be started by writes, so enqueue the jobs
234        // now so that any errors will show up immediately as the interface expects. Any
235        // errors that bubble up will rollback the main commit round.
236        $fname = __METHOD__;
237        $dbw->onTransactionPreCommitOrIdle(
238            fn () => $this->doBatchPushInternal( $dbw, $jobs, $flags, $fname ),
239            $fname
240        );
241    }
242
243    /**
244     * This function should *not* be called outside of JobQueueDB
245     *
246     * @suppress SecurityCheck-SQLInjection Bug in phan-taint-check handling bulk inserts
247     * @param IDatabase $dbw
248     * @param IJobSpecification[] $jobs
249     * @param int $flags
250     * @param string $method
251     * @throws DBError
252     * @return void
253     */
254    public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
255        if ( $jobs === [] ) {
256            return;
257        }
258
259        $rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
260        $rowList = []; // list of jobs for jobs that are not de-duplicated
261        foreach ( $jobs as $job ) {
262            $row = $this->insertFields( $job, $dbw );
263            if ( $job->ignoreDuplicates() ) {
264                $rowSet[$row['job_sha1']] = $row;
265            } else {
266                $rowList[] = $row;
267            }
268        }
269
270        if ( $flags & self::QOS_ATOMIC ) {
271            $dbw->startAtomic( $method ); // wrap all the job additions in one transaction
272        }
273        try {
274            // Strip out any duplicate jobs that are already in the queue...
275            if ( count( $rowSet ) ) {
276                $res = $dbw->newSelectQueryBuilder()
277                    ->select( 'job_sha1' )
278                    ->from( 'job' )
279                    ->where(
280                        [
281                            // No job_type condition since it's part of the job_sha1 hash
282                            'job_sha1' => array_map( 'strval', array_keys( $rowSet ) ),
283                            'job_token' => '' // unclaimed
284                        ]
285                    )
286                    ->caller( $method )->fetchResultSet();
287                foreach ( $res as $row ) {
288                    wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
289                    unset( $rowSet[$row->job_sha1] ); // already enqueued
290                }
291            }
292            // Build the full list of job rows to insert
293            $rows = array_merge( $rowList, array_values( $rowSet ) );
294            // Silence expectations related to inserting to the job table, because we have to perform the inserts to
295            // track the job.
296            $transactionProfiler = Profiler::instance()->getTransactionProfiler();
297            $scope = $transactionProfiler->silenceForScope();
298            // Insert the job rows in chunks to avoid replica DB lag...
299            foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
300                $dbw->newInsertQueryBuilder()
301                    ->insertInto( 'job' )
302                    ->rows( $rowBatch )
303                    ->caller( $method )->execute();
304            }
305            ScopedCallback::consume( $scope );
306            $this->incrStats( 'inserts', $this->type, count( $rows ) );
307            $this->incrStats( 'dupe_inserts', $this->type,
308                count( $rowSet ) + count( $rowList ) - count( $rows )
309            );
310        } catch ( DBError $e ) {
311            throw $this->getDBException( $e );
312        }
313        if ( $flags & self::QOS_ATOMIC ) {
314            $dbw->endAtomic( $method );
315        }
316    }
317
318    /**
319     * @see JobQueue::doPop()
320     * @return RunnableJob|false
321     */
322    protected function doPop() {
323        $job = false; // job popped off
324        try {
325            $uuid = wfRandomString( 32 ); // pop attempt
326            do { // retry when our row is invalid or deleted as a duplicate
327                // Try to reserve a row in the DB...
328                if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
329                    $row = $this->claimOldest( $uuid );
330                } else { // random first
331                    $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
332                    $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
333                    $row = $this->claimRandom( $uuid, $rand, $gte );
334                }
335                // Check if we found a row to reserve...
336                if ( !$row ) {
337                    break; // nothing to do
338                }
339                $this->incrStats( 'pops', $this->type );
340
341                // Get the job object from the row...
342                $job = $this->jobFromRow( $row );
343                break; // done
344            } while ( true );
345
346            if ( !$job || mt_rand( 0, 9 ) == 0 ) {
347                // Handled jobs that need to be recycled/deleted;
348                // any recycled jobs will be picked up next attempt
349                $this->recycleAndDeleteStaleJobs();
350            }
351        } catch ( DBError $e ) {
352            throw $this->getDBException( $e );
353        }
354
355        return $job;
356    }
357
358    /**
359     * Reserve a row with a single UPDATE without holding row locks over RTTs...
360     *
361     * @param string $uuid 32 char hex string
362     * @param int $rand Random unsigned integer (31 bits)
363     * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
364     * @return stdClass|false Row|false
365     */
366    protected function claimRandom( $uuid, $rand, $gte ) {
367        $dbw = $this->getPrimaryDB();
368        // Check cache to see if the queue has <= OFFSET items
369        $tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) );
370
371        $invertedDirection = false; // whether one job_random direction was already scanned
372        // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
373        // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
374        // not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
375        // be used here with MySQL.
376        do {
377            if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
378                // For small queues, using OFFSET will overshoot and return no rows more often.
379                // Instead, this uses job_random to pick a row (possibly checking both directions).
380                $row = $dbw->newSelectQueryBuilder()
381                    ->select( self::selectFields() )
382                    ->from( 'job' )
383                    ->where(
384                        [
385                            'job_cmd' => $this->type,
386                            'job_token' => '', // unclaimed
387                            $dbw->expr( 'job_random', $gte ? '>=' : '<=', $rand )
388                        ]
389                    )
390                    ->orderBy(
391                        'job_random',
392                        $gte ? SelectQueryBuilder::SORT_ASC : SelectQueryBuilder::SORT_DESC
393                    )
394                    ->caller( __METHOD__ )->fetchRow();
395                if ( !$row && !$invertedDirection ) {
396                    $gte = !$gte;
397                    $invertedDirection = true;
398                    continue; // try the other direction
399                }
400            } else { // table *may* have >= MAX_OFFSET rows
401                // T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
402                // in MySQL if there are many rows for some reason. This uses a small OFFSET
403                // instead of job_random for reducing excess claim retries.
404                $row = $dbw->newSelectQueryBuilder()
405                    ->select( self::selectFields() )
406                    ->from( 'job' )
407                    ->where(
408                        [
409                            'job_cmd' => $this->type,
410                            'job_token' => '', // unclaimed
411                        ]
412                    )
413                    ->offset( mt_rand( 0, self::MAX_OFFSET ) )
414                    ->caller( __METHOD__ )->fetchRow();
415                if ( !$row ) {
416                    $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
417                    $this->wanCache->set( $this->getCacheKey( 'small' ), 1, 30 );
418                    continue; // use job_random
419                }
420            }
421
422            if ( !$row ) {
423                break;
424            }
425
426            $dbw->newUpdateQueryBuilder()
427                ->update( 'job' ) // update by PK
428                ->set( [
429                    'job_token' => $uuid,
430                    'job_token_timestamp' => $dbw->timestamp(),
431                    'job_attempts' => new RawSQLValue( 'job_attempts+1' ),
432                ] )
433                ->where( [
434                    'job_cmd' => $this->type,
435                    'job_id' => $row->job_id,
436                    'job_token' => ''
437                ] )
438                ->caller( __METHOD__ )->execute();
439
440            // This might get raced out by another runner when claiming the previously
441            // selected row. The use of job_random should minimize this problem, however.
442            if ( !$dbw->affectedRows() ) {
443                $row = false; // raced out
444            }
445        } while ( !$row );
446
447        return $row;
448    }
449
450    /**
451     * Reserve a row with a single UPDATE without holding row locks over RTTs...
452     *
453     * @param string $uuid 32 char hex string
454     * @return stdClass|false Row|false
455     */
456    protected function claimOldest( $uuid ) {
457        $dbw = $this->getPrimaryDB();
458
459        $row = false; // the row acquired
460        do {
461            if ( $dbw->getType() === 'mysql' ) {
462                // Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
463                // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
464                // Postgres has no such limitation. However, MySQL offers an
465                // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
466                $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
467                    "SET " .
468                        "job_token = {$dbw->addQuotes( $uuid ) }" .
469                        "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}" .
470                        "job_attempts = job_attempts+1 " .
471                    "WHERE ( " .
472                        "job_cmd = {$dbw->addQuotes( $this->type )} " .
473                        "AND job_token = {$dbw->addQuotes( '' )} " .
474                    ") ORDER BY job_id ASC LIMIT 1",
475                    __METHOD__
476                );
477            } else {
478                // Use a subquery to find the job, within an UPDATE to claim it.
479                // This uses as much of the DB wrapper functions as possible.
480                $qb = $dbw->newSelectQueryBuilder()
481                    ->select( 'job_id' )
482                    ->from( 'job' )
483                    ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
484                    ->orderBy( 'job_id', SelectQueryBuilder::SORT_ASC )
485                    ->limit( 1 );
486
487                $dbw->newUpdateQueryBuilder()
488                    ->update( 'job' )
489                    ->set( [
490                        'job_token' => $uuid,
491                        'job_token_timestamp' => $dbw->timestamp(),
492                        'job_attempts' => new RawSQLValue( 'job_attempts+1' ),
493                    ] )
494                    ->where( [ 'job_id' => new RawSQLValue( '(' . $qb->getSQL() . ')' ) ] )
495                    ->caller( __METHOD__ )->execute();
496            }
497
498            if ( !$dbw->affectedRows() ) {
499                break;
500            }
501
502            // Fetch any row that we just reserved...
503            $row = $dbw->newSelectQueryBuilder()
504                ->select( self::selectFields() )
505                ->from( 'job' )
506                ->where( [ 'job_cmd' => $this->type, 'job_token' => $uuid ] )
507                ->caller( __METHOD__ )->fetchRow();
508            if ( !$row ) { // raced out by duplicate job removal
509                wfDebug( "Row deleted as duplicate by another process." );
510            }
511        } while ( !$row );
512
513        return $row;
514    }
515
516    /**
517     * @see JobQueue::doAck()
518     * @param RunnableJob $job
519     * @throws JobQueueConnectionError
520     * @throws JobQueueError
521     */
522    protected function doAck( RunnableJob $job ) {
523        $id = $job->getMetadata( 'id' );
524        if ( $id === null ) {
525            throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no ID." );
526        }
527
528        $dbw = $this->getPrimaryDB();
529        try {
530            // Delete a row with a single DELETE without holding row locks over RTTs...
531            $dbw->newDeleteQueryBuilder()
532                ->deleteFrom( 'job' )
533                ->where( [ 'job_cmd' => $this->type, 'job_id' => $id ] )
534                ->caller( __METHOD__ )->execute();
535
536            $this->incrStats( 'acks', $this->type );
537        } catch ( DBError $e ) {
538            throw $this->getDBException( $e );
539        }
540    }
541
542    /**
543     * @see JobQueue::doDeduplicateRootJob()
544     * @param IJobSpecification $job
545     * @throws JobQueueConnectionError
546     * @return bool
547     */
548    protected function doDeduplicateRootJob( IJobSpecification $job ) {
549        // Callers should call JobQueueGroup::push() before this method so that if the
550        // insert fails, the de-duplication registration will be aborted. Since the insert
551        // is deferred till "transaction idle", do the same here, so that the ordering is
552        // maintained. Having only the de-duplication registration succeed would cause
553        // jobs to become no-ops without any actual jobs that made them redundant.
554        $dbw = $this->getPrimaryDB();
555        $dbw->onTransactionCommitOrIdle(
556            function () use ( $job ) {
557                parent::doDeduplicateRootJob( $job );
558            },
559            __METHOD__
560        );
561
562        return true;
563    }
564
565    /**
566     * @see JobQueue::doDelete()
567     * @return bool
568     */
569    protected function doDelete() {
570        $dbw = $this->getPrimaryDB();
571        try {
572            $dbw->newDeleteQueryBuilder()
573                ->deleteFrom( 'job' )
574                ->where( [ 'job_cmd' => $this->type ] )
575                ->caller( __METHOD__ )->execute();
576        } catch ( DBError $e ) {
577            throw $this->getDBException( $e );
578        }
579
580        return true;
581    }
582
583    /**
584     * @see JobQueue::doWaitForBackups()
585     * @return void
586     */
587    protected function doWaitForBackups() {
588        if ( $this->server ) {
589            return; // not using LBFactory instance
590        }
591
592        $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
593        $lbFactory->waitForReplication();
594    }
595
596    /**
597     * @return void
598     */
599    protected function doFlushCaches() {
600        foreach ( [ 'size', 'acquiredcount' ] as $type ) {
601            $this->wanCache->delete( $this->getCacheKey( $type ) );
602        }
603    }
604
605    /**
606     * @see JobQueue::getAllQueuedJobs()
607     * @return \Iterator<RunnableJob>
608     */
609    public function getAllQueuedJobs() {
610        return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] );
611    }
612
613    /**
614     * @see JobQueue::getAllAcquiredJobs()
615     * @return \Iterator<RunnableJob>
616     */
617    public function getAllAcquiredJobs() {
618        $dbr = $this->getReplicaDB();
619        return $this->getJobIterator( [ 'job_cmd' => $this->getType(), $dbr->expr( 'job_token', '>', '' ) ] );
620    }
621
622    /**
623     * @see JobQueue::getAllAbandonedJobs()
624     * @return \Iterator<RunnableJob>
625     */
626    public function getAllAbandonedJobs() {
627        $dbr = $this->getReplicaDB();
628        return $this->getJobIterator( [
629            'job_cmd' => $this->getType(),
630            $dbr->expr( 'job_token', '>', '' ),
631            $dbr->expr( 'job_attempts', '>=', intval( $this->maxTries ) ),
632        ] );
633    }
634
635    /**
636     * @param array $conds Query conditions
637     * @return \Iterator<RunnableJob>
638     */
639    protected function getJobIterator( array $conds ) {
640        $dbr = $this->getReplicaDB();
641        $qb = $dbr->newSelectQueryBuilder()
642            ->select( self::selectFields() )
643            ->from( 'job' )
644            ->where( $conds );
645        try {
646            return new MappedIterator(
647                $qb->caller( __METHOD__ )->fetchResultSet(),
648                function ( $row ) {
649                    return $this->jobFromRow( $row );
650                }
651            );
652        } catch ( DBError $e ) {
653            throw $this->getDBException( $e );
654        }
655    }
656
657    public function getCoalesceLocationInternal() {
658        if ( $this->server ) {
659            return null; // not using the LBFactory instance
660        }
661
662        return is_string( $this->cluster )
663            ? "DBCluster:{$this->cluster}:{$this->domain}"
664            : "LBFactory:{$this->domain}";
665    }
666
667    protected function doGetSiblingQueuesWithJobs( array $types ) {
668        $dbr = $this->getReplicaDB();
669        // @note: this does not check whether the jobs are claimed or not.
670        // This is useful so JobQueueGroup::pop() also sees queues that only
671        // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
672        // failed jobs so that they can be popped again for that edge case.
673        $res = $dbr->newSelectQueryBuilder()
674            ->select( 'job_cmd' )
675            ->distinct()
676            ->from( 'job' )
677            ->where( [ 'job_cmd' => $types ] )
678            ->caller( __METHOD__ )->fetchResultSet();
679
680        $types = [];
681        foreach ( $res as $row ) {
682            $types[] = $row->job_cmd;
683        }
684
685        return $types;
686    }
687
688    protected function doGetSiblingQueueSizes( array $types ) {
689        $dbr = $this->getReplicaDB();
690
691        $res = $dbr->newSelectQueryBuilder()
692            ->select( [ 'job_cmd', 'count' => 'COUNT(*)' ] )
693            ->from( 'job' )
694            ->where( [ 'job_cmd' => $types ] )
695            ->groupBy( 'job_cmd' )
696            ->caller( __METHOD__ )->fetchResultSet();
697
698        $sizes = [];
699        foreach ( $res as $row ) {
700            $sizes[$row->job_cmd] = (int)$row->count;
701        }
702
703        return $sizes;
704    }
705
706    /**
707     * Recycle or destroy any jobs that have been claimed for too long
708     *
709     * @return int Number of jobs recycled/deleted
710     */
711    public function recycleAndDeleteStaleJobs() {
712        $now = time();
713        $count = 0; // affected rows
714        $dbw = $this->getPrimaryDB();
715
716        try {
717            if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
718                return $count; // already in progress
719            }
720
721            // Remove claims on jobs acquired for too long if enabled...
722            if ( $this->claimTTL > 0 ) {
723                $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
724                // Get the IDs of jobs that have be claimed but not finished after too long.
725                // These jobs can be recycled into the queue by expiring the claim. Selecting
726                // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
727                $res = $dbw->newSelectQueryBuilder()
728                    ->select( 'job_id' )
729                    ->from( 'job' )
730                    ->where(
731                        [
732                            'job_cmd' => $this->type,
733                            $dbw->expr( 'job_token', '!=', '' ), // was acquired
734                            $dbw->expr( 'job_token_timestamp', '<', $claimCutoff ), // stale
735                            $dbw->expr( 'job_attempts', '<', $this->maxTries ), // retries left
736                        ]
737                    )
738                    ->caller( __METHOD__ )->fetchResultSet();
739                $ids = array_map(
740                    static function ( $o ) {
741                        return $o->job_id;
742                    }, iterator_to_array( $res )
743                );
744                if ( count( $ids ) ) {
745                    // Reset job_token for these jobs so that other runners will pick them up.
746                    // Set the timestamp to the current time, as it is useful to now that the job
747                    // was already tried before (the timestamp becomes the "released" time).
748                    $dbw->newUpdateQueryBuilder()
749                        ->update( 'job' )
750                        ->set( [
751                            'job_token' => '',
752                            'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
753                        ] )
754                        ->where( [
755                            'job_id' => $ids,
756                            $dbw->expr( 'job_token', '!=', '' ),
757                        ] )
758                        ->caller( __METHOD__ )->execute();
759
760                    $affected = $dbw->affectedRows();
761                    $count += $affected;
762                    $this->incrStats( 'recycles', $this->type, $affected );
763                }
764            }
765
766            // Just destroy any stale jobs...
767            $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
768            $qb = $dbw->newSelectQueryBuilder()
769                ->select( 'job_id' )
770                ->from( 'job' )
771                ->where(
772                    [
773                        'job_cmd' => $this->type,
774                        $dbw->expr( 'job_token', '!=', '' ), // was acquired
775                        $dbw->expr( 'job_token_timestamp', '<', $pruneCutoff ) // stale
776                    ]
777                );
778            if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
779                $qb->andWhere( $dbw->expr( 'job_attempts', '>=', $this->maxTries ) );
780            }
781            // Get the IDs of jobs that are considered stale and should be removed. Selecting
782            // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
783            $res = $qb->caller( __METHOD__ )->fetchResultSet();
784            $ids = array_map(
785                static function ( $o ) {
786                    return $o->job_id;
787                }, iterator_to_array( $res )
788            );
789            if ( count( $ids ) ) {
790                $dbw->newDeleteQueryBuilder()
791                    ->deleteFrom( 'job' )
792                    ->where( [ 'job_id' => $ids ] )
793                    ->caller( __METHOD__ )->execute();
794                $affected = $dbw->affectedRows();
795                $count += $affected;
796                $this->incrStats( 'abandons', $this->type, $affected );
797            }
798
799            $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
800        } catch ( DBError $e ) {
801            throw $this->getDBException( $e );
802        }
803
804        return $count;
805    }
806
807    /**
808     * @param IJobSpecification $job
809     * @param IReadableDatabase $db
810     * @return array
811     */
812    protected function insertFields( IJobSpecification $job, IReadableDatabase $db ) {
813        return [
814            // Fields that describe the nature of the job
815            'job_cmd' => $job->getType(),
816            'job_namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL,
817            'job_title' => $job->getParams()['title'] ?? '',
818            'job_params' => self::makeBlob( $job->getParams() ),
819            // Additional job metadata
820            'job_timestamp' => $db->timestamp(),
821            'job_sha1' => \Wikimedia\base_convert(
822                sha1( serialize( $job->getDeduplicationInfo() ) ),
823                16, 36, 31
824            ),
825            'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
826        ];
827    }
828
829    /**
830     * @throws JobQueueConnectionError
831     * @return IDatabase
832     */
833    protected function getReplicaDB() {
834        try {
835            return $this->getDB( DB_REPLICA );
836        } catch ( DBConnectionError $e ) {
837            throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
838        }
839    }
840
841    /**
842     * @throws JobQueueConnectionError
843     * @return IMaintainableDatabase
844     * @since 1.37
845     */
846    protected function getPrimaryDB() {
847        try {
848            return $this->getDB( DB_PRIMARY );
849        } catch ( DBConnectionError $e ) {
850            throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
851        }
852    }
853
854    /**
855     * @param int $index (DB_REPLICA/DB_PRIMARY)
856     * @return IMaintainableDatabase
857     */
858    protected function getDB( $index ) {
859        if ( $this->server ) {
860            if ( $this->conn instanceof IDatabase ) {
861                return $this->conn;
862            } elseif ( $this->conn instanceof DBError ) {
863                throw $this->conn;
864            }
865
866            try {
867                $this->conn = MediaWikiServices::getInstance()->getDatabaseFactory()->create(
868                    $this->server['type'],
869                    $this->server
870                );
871            } catch ( DBError $e ) {
872                $this->conn = $e;
873                throw $e;
874            }
875
876            return $this->conn;
877        } else {
878            $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
879            $lb = is_string( $this->cluster )
880                ? $lbFactory->getExternalLB( $this->cluster )
881                : $lbFactory->getMainLB( $this->domain );
882
883            if ( $lb->getServerType( ServerInfo::WRITER_INDEX ) !== 'sqlite' ) {
884                // Keep a separate connection to avoid contention and deadlocks;
885                // However, SQLite has the opposite behavior due to DB-level locking.
886                $flags = $lb::CONN_TRX_AUTOCOMMIT;
887            } else {
888                // Jobs insertion will be deferred until the PRESEND stage to reduce contention.
889                $flags = 0;
890            }
891
892            return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
893        }
894    }
895
896    /**
897     * @param string $property
898     * @return string
899     */
900    private function getCacheKey( $property ) {
901        $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
902
903        return $this->wanCache->makeGlobalKey(
904            'jobqueue',
905            $this->domain,
906            $cluster,
907            $this->type,
908            $property
909        );
910    }
911
912    /**
913     * @param array|false $params
914     * @return string
915     */
916    protected static function makeBlob( $params ) {
917        if ( $params !== false ) {
918            return serialize( $params );
919        } else {
920            return '';
921        }
922    }
923
924    /**
925     * @param stdClass $row
926     * @return RunnableJob
927     */
928    protected function jobFromRow( $row ) {
929        $params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : [];
930        if ( !is_array( $params ) ) { // this shouldn't happen
931            throw new UnexpectedValueException(
932                "Could not unserialize job with ID '{$row->job_id}'." );
933        }
934
935        $params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
936        $job = $this->factoryJob( $row->job_cmd, $params );
937        $job->setMetadata( 'id', $row->job_id );
938        $job->setMetadata( 'timestamp', $row->job_timestamp );
939
940        return $job;
941    }
942
943    /**
944     * @param DBError $e
945     * @return JobQueueError
946     */
947    protected function getDBException( DBError $e ) {
948        return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
949    }
950
951    /**
952     * Return the list of job fields that should be selected.
953     * @since 1.23
954     * @return array
955     */
956    public static function selectFields() {
957        return [
958            'job_id',
959            'job_cmd',
960            'job_namespace',
961            'job_title',
962            'job_timestamp',
963            'job_params',
964            'job_random',
965            'job_attempts',
966            'job_token',
967            'job_token_timestamp',
968            'job_sha1',
969        ];
970    }
971}
972
973/** @deprecated class alias since 1.44 */
974class_alias( JobQueueDB::class, 'JobQueueDB' );