Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 456
0.00% covered (danger)
0.00%
0 / 34
CRAP
0.00% covered (danger)
0.00%
0 / 1
JobQueueDB
0.00% covered (danger)
0.00%
0 / 456
0.00% covered (danger)
0.00%
0 / 34
9900
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 supportedOrders
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 optimalOrder
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 doIsEmpty
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
6
 doGetSize
0.00% covered (danger)
0.00%
0 / 13
0.00% covered (danger)
0.00%
0 / 1
12
 doGetAcquiredCount
0.00% covered (danger)
0.00%
0 / 18
0.00% covered (danger)
0.00%
0 / 1
20
 doGetAbandonedCount
0.00% covered (danger)
0.00%
0 / 21
0.00% covered (danger)
0.00%
0 / 1
20
 doBatchPush
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
2
 doBatchPushInternal
0.00% covered (danger)
0.00%
0 / 39
0.00% covered (danger)
0.00%
0 / 1
110
 doPop
0.00% covered (danger)
0.00%
0 / 17
0.00% covered (danger)
0.00%
0 / 1
42
 claimRandom
0.00% covered (danger)
0.00%
0 / 57
0.00% covered (danger)
0.00%
0 / 1
90
 claimOldest
0.00% covered (danger)
0.00%
0 / 40
0.00% covered (danger)
0.00%
0 / 1
20
 doAck
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
12
 doDeduplicateRootJob
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
2
 doDelete
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
6
 doWaitForBackups
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 doFlushCaches
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 getAllQueuedJobs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getAllAcquiredJobs
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 getAllAbandonedJobs
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
2
 getJobIterator
0.00% covered (danger)
0.00%
0 / 13
0.00% covered (danger)
0.00%
0 / 1
6
 getCoalesceLocationInternal
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
12
 doGetSiblingQueuesWithJobs
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
6
 doGetSiblingQueueSizes
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
6
 recycleAndDeleteStaleJobs
0.00% covered (danger)
0.00%
0 / 70
0.00% covered (danger)
0.00%
0 / 1
56
 insertFields
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
2
 getReplicaDB
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 getPrimaryDB
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 getDB
0.00% covered (danger)
0.00%
0 / 21
0.00% covered (danger)
0.00%
0 / 1
56
 getCacheKey
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
6
 makeBlob
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 jobFromRow
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
12
 getDBException
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 selectFields
0.00% covered (danger)
0.00%
0 / 13
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20use MediaWiki\MediaWikiServices;
21use Wikimedia\Rdbms\DBConnectionError;
22use Wikimedia\Rdbms\DBError;
23use Wikimedia\Rdbms\IDatabase;
24use Wikimedia\Rdbms\IMaintainableDatabase;
25use Wikimedia\Rdbms\IReadableDatabase;
26use Wikimedia\Rdbms\RawSQLValue;
27use Wikimedia\Rdbms\SelectQueryBuilder;
28use Wikimedia\Rdbms\ServerInfo;
29
30/**
31 * Database-backed job queue storage.
32 *
33 * @since 1.21
34 * @ingroup JobQueue
35 */
36class JobQueueDB extends JobQueue {
37    /* seconds to cache info without re-validating */
38    private const CACHE_TTL_SHORT = 30;
39    /* seconds a job can live once claimed */
40    private const MAX_AGE_PRUNE = 7 * 24 * 3600;
41    /**
42     * Used for job_random, the highest safe 32-bit signed integer.
43     * Equivalent to `( 2 ** 31 ) - 1` on 64-bit.
44     */
45    private const MAX_JOB_RANDOM = 2_147_483_647;
46    /* maximum number of rows to skip */
47    private const MAX_OFFSET = 255;
48
49    /** @var IMaintainableDatabase|DBError|null */
50    protected $conn;
51
52    /** @var array|null Server configuration array */
53    protected $server;
54    /** @var string|null Name of an external DB cluster or null for the local DB cluster */
55    protected $cluster;
56
57    /**
58     * Additional parameters include:
59     *   - server  : Server configuration array for Database::factory. Overrides "cluster".
60     *   - cluster : The name of an external cluster registered via LBFactory.
61     *               If not specified, the primary DB cluster for the wiki will be used.
62     *               This can be overridden with a custom cluster so that DB handles will
63     *               be retrieved via LBFactory::getExternalLB() and getConnection().
64     * @param array $params
65     */
66    protected function __construct( array $params ) {
67        parent::__construct( $params );
68
69        if ( isset( $params['server'] ) ) {
70            $this->server = $params['server'];
71            // Always use autocommit mode, even if DBO_TRX is configured
72            $this->server['flags'] ??= 0;
73            $this->server['flags'] &= ~( IDatabase::DBO_TRX | IDatabase::DBO_DEFAULT );
74        } elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
75            $this->cluster = $params['cluster'];
76        }
77    }
78
79    protected function supportedOrders() {
80        return [ 'random', 'timestamp', 'fifo' ];
81    }
82
83    protected function optimalOrder() {
84        return 'random';
85    }
86
87    /**
88     * @see JobQueue::doIsEmpty()
89     * @return bool
90     */
91    protected function doIsEmpty() {
92        $dbr = $this->getReplicaDB();
93        try {
94            // unclaimed job
95            $found = (bool)$dbr->newSelectQueryBuilder()
96                ->select( '1' )
97                ->from( 'job' )
98                ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
99                ->caller( __METHOD__ )->fetchField();
100        } catch ( DBError $e ) {
101            throw $this->getDBException( $e );
102        }
103
104        return !$found;
105    }
106
107    /**
108     * @see JobQueue::doGetSize()
109     * @return int
110     */
111    protected function doGetSize() {
112        $key = $this->getCacheKey( 'size' );
113
114        $size = $this->wanCache->get( $key );
115        if ( is_int( $size ) ) {
116            return $size;
117        }
118
119        $dbr = $this->getReplicaDB();
120        try {
121            $size = $dbr->newSelectQueryBuilder()
122                ->from( 'job' )
123                ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
124                ->caller( __METHOD__ )->fetchRowCount();
125        } catch ( DBError $e ) {
126            throw $this->getDBException( $e );
127        }
128        $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );
129
130        return $size;
131    }
132
133    /**
134     * @see JobQueue::doGetAcquiredCount()
135     * @return int
136     */
137    protected function doGetAcquiredCount() {
138        if ( $this->claimTTL <= 0 ) {
139            return 0; // no acknowledgements
140        }
141
142        $key = $this->getCacheKey( 'acquiredcount' );
143
144        $count = $this->wanCache->get( $key );
145        if ( is_int( $count ) ) {
146            return $count;
147        }
148
149        $dbr = $this->getReplicaDB();
150        try {
151            $count = $dbr->newSelectQueryBuilder()
152                ->from( 'job' )
153                ->where( [
154                    'job_cmd' => $this->type,
155                    $dbr->expr( 'job_token', '!=', '' ),
156                ] )
157                ->caller( __METHOD__ )->fetchRowCount();
158        } catch ( DBError $e ) {
159            throw $this->getDBException( $e );
160        }
161        $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
162
163        return $count;
164    }
165
166    /**
167     * @see JobQueue::doGetAbandonedCount()
168     * @return int
169     * @throws JobQueueConnectionError
170     * @throws JobQueueError
171     */
172    protected function doGetAbandonedCount() {
173        if ( $this->claimTTL <= 0 ) {
174            return 0; // no acknowledgements
175        }
176
177        $key = $this->getCacheKey( 'abandonedcount' );
178
179        $count = $this->wanCache->get( $key );
180        if ( is_int( $count ) ) {
181            return $count;
182        }
183
184        $dbr = $this->getReplicaDB();
185        try {
186            $count = $dbr->newSelectQueryBuilder()
187                ->from( 'job' )
188                ->where(
189                    [
190                        'job_cmd' => $this->type,
191                        $dbr->expr( 'job_token', '!=', '' ),
192                        $dbr->expr( 'job_attempts', '>=', $this->maxTries ),
193                    ]
194                )
195                ->caller( __METHOD__ )->fetchRowCount();
196        } catch ( DBError $e ) {
197            throw $this->getDBException( $e );
198        }
199
200        $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
201
202        return $count;
203    }
204
205    /**
206     * @see JobQueue::doBatchPush()
207     * @param IJobSpecification[] $jobs
208     * @param int $flags
209     * @throws DBError|Exception
210     * @return void
211     */
212    protected function doBatchPush( array $jobs, $flags ) {
213        $dbw = $this->getPrimaryDB();
214        // In general, there will be two cases here:
215        // a) sqlite; DB connection is probably a regular round-aware handle.
216        // If the connection is busy with a transaction, then defer the job writes
217        // until right before the main round commit step. Any errors that bubble
218        // up will rollback the main commit round.
219        // b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
220        // No transaction is active nor will be started by writes, so enqueue the jobs
221        // now so that any errors will show up immediately as the interface expects. Any
222        // errors that bubble up will rollback the main commit round.
223        $fname = __METHOD__;
224        $dbw->onTransactionPreCommitOrIdle(
225            function ( IDatabase $dbw ) use ( $jobs, $flags, $fname ) {
226                $this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
227            },
228            $fname
229        );
230    }
231
232    /**
233     * This function should *not* be called outside of JobQueueDB
234     *
235     * @suppress SecurityCheck-SQLInjection Bug in phan-taint-check handling bulk inserts
236     * @param IDatabase $dbw
237     * @param IJobSpecification[] $jobs
238     * @param int $flags
239     * @param string $method
240     * @throws DBError
241     * @return void
242     */
243    public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
244        if ( $jobs === [] ) {
245            return;
246        }
247
248        $rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
249        $rowList = []; // list of jobs for jobs that are not de-duplicated
250        foreach ( $jobs as $job ) {
251            $row = $this->insertFields( $job, $dbw );
252            if ( $job->ignoreDuplicates() ) {
253                $rowSet[$row['job_sha1']] = $row;
254            } else {
255                $rowList[] = $row;
256            }
257        }
258
259        if ( $flags & self::QOS_ATOMIC ) {
260            $dbw->startAtomic( $method ); // wrap all the job additions in one transaction
261        }
262        try {
263            // Strip out any duplicate jobs that are already in the queue...
264            if ( count( $rowSet ) ) {
265                $res = $dbw->newSelectQueryBuilder()
266                    ->select( 'job_sha1' )
267                    ->from( 'job' )
268                    ->where(
269                        [
270                            // No job_type condition since it's part of the job_sha1 hash
271                            'job_sha1' => array_map( 'strval', array_keys( $rowSet ) ),
272                            'job_token' => '' // unclaimed
273                        ]
274                    )
275                    ->caller( $method )->fetchResultSet();
276                foreach ( $res as $row ) {
277                    wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
278                    unset( $rowSet[$row->job_sha1] ); // already enqueued
279                }
280            }
281            // Build the full list of job rows to insert
282            $rows = array_merge( $rowList, array_values( $rowSet ) );
283            // Insert the job rows in chunks to avoid replica DB lag...
284            foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
285                $dbw->newInsertQueryBuilder()
286                    ->insertInto( 'job' )
287                    ->rows( $rowBatch )
288                    ->caller( $method )->execute();
289            }
290            $this->incrStats( 'inserts', $this->type, count( $rows ) );
291            $this->incrStats( 'dupe_inserts', $this->type,
292                count( $rowSet ) + count( $rowList ) - count( $rows )
293            );
294        } catch ( DBError $e ) {
295            throw $this->getDBException( $e );
296        }
297        if ( $flags & self::QOS_ATOMIC ) {
298            $dbw->endAtomic( $method );
299        }
300    }
301
302    /**
303     * @see JobQueue::doPop()
304     * @return RunnableJob|false
305     */
306    protected function doPop() {
307        $job = false; // job popped off
308        try {
309            $uuid = wfRandomString( 32 ); // pop attempt
310            do { // retry when our row is invalid or deleted as a duplicate
311                // Try to reserve a row in the DB...
312                if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
313                    $row = $this->claimOldest( $uuid );
314                } else { // random first
315                    $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
316                    $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
317                    $row = $this->claimRandom( $uuid, $rand, $gte );
318                }
319                // Check if we found a row to reserve...
320                if ( !$row ) {
321                    break; // nothing to do
322                }
323                $this->incrStats( 'pops', $this->type );
324
325                // Get the job object from the row...
326                $job = $this->jobFromRow( $row );
327                break; // done
328            } while ( true );
329
330            if ( !$job || mt_rand( 0, 9 ) == 0 ) {
331                // Handled jobs that need to be recycled/deleted;
332                // any recycled jobs will be picked up next attempt
333                $this->recycleAndDeleteStaleJobs();
334            }
335        } catch ( DBError $e ) {
336            throw $this->getDBException( $e );
337        }
338
339        return $job;
340    }
341
342    /**
343     * Reserve a row with a single UPDATE without holding row locks over RTTs...
344     *
345     * @param string $uuid 32 char hex string
346     * @param int $rand Random unsigned integer (31 bits)
347     * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
348     * @return stdClass|false Row|false
349     */
350    protected function claimRandom( $uuid, $rand, $gte ) {
351        $dbw = $this->getPrimaryDB();
352        // Check cache to see if the queue has <= OFFSET items
353        $tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) );
354
355        $invertedDirection = false; // whether one job_random direction was already scanned
356        // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
357        // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
358        // not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
359        // be used here with MySQL.
360        do {
361            if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
362                // For small queues, using OFFSET will overshoot and return no rows more often.
363                // Instead, this uses job_random to pick a row (possibly checking both directions).
364                $row = $dbw->newSelectQueryBuilder()
365                    ->select( self::selectFields() )
366                    ->from( 'job' )
367                    ->where(
368                        [
369                            'job_cmd' => $this->type,
370                            'job_token' => '', // unclaimed
371                            $dbw->expr( 'job_random', $gte ? '>=' : '<=', $rand )
372                        ]
373                    )
374                    ->orderBy(
375                        'job_random',
376                        $gte ? SelectQueryBuilder::SORT_ASC : SelectQueryBuilder::SORT_DESC
377                    )
378                    ->caller( __METHOD__ )->fetchRow();
379                if ( !$row && !$invertedDirection ) {
380                    $gte = !$gte;
381                    $invertedDirection = true;
382                    continue; // try the other direction
383                }
384            } else { // table *may* have >= MAX_OFFSET rows
385                // T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
386                // in MySQL if there are many rows for some reason. This uses a small OFFSET
387                // instead of job_random for reducing excess claim retries.
388                $row = $dbw->newSelectQueryBuilder()
389                    ->select( self::selectFields() )
390                    ->from( 'job' )
391                    ->where(
392                        [
393                            'job_cmd' => $this->type,
394                            'job_token' => '', // unclaimed
395                        ]
396                    )
397                    ->offset( mt_rand( 0, self::MAX_OFFSET ) )
398                    ->caller( __METHOD__ )->fetchRow();
399                if ( !$row ) {
400                    $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
401                    $this->wanCache->set( $this->getCacheKey( 'small' ), 1, 30 );
402                    continue; // use job_random
403                }
404            }
405
406            if ( !$row ) {
407                break;
408            }
409
410            $dbw->newUpdateQueryBuilder()
411                ->update( 'job' ) // update by PK
412                ->set( [
413                    'job_token' => $uuid,
414                    'job_token_timestamp' => $dbw->timestamp(),
415                    'job_attempts' => new RawSQLValue( 'job_attempts+1' ),
416                ] )
417                ->where( [
418                    'job_cmd' => $this->type,
419                    'job_id' => $row->job_id,
420                    'job_token' => ''
421                ] )
422                ->caller( __METHOD__ )->execute();
423
424            // This might get raced out by another runner when claiming the previously
425            // selected row. The use of job_random should minimize this problem, however.
426            if ( !$dbw->affectedRows() ) {
427                $row = false; // raced out
428            }
429        } while ( !$row );
430
431        return $row;
432    }
433
434    /**
435     * Reserve a row with a single UPDATE without holding row locks over RTTs...
436     *
437     * @param string $uuid 32 char hex string
438     * @return stdClass|false Row|false
439     */
440    protected function claimOldest( $uuid ) {
441        $dbw = $this->getPrimaryDB();
442
443        $row = false; // the row acquired
444        do {
445            if ( $dbw->getType() === 'mysql' ) {
446                // Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
447                // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
448                // Postgres has no such limitation. However, MySQL offers an
449                // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
450                $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
451                    "SET " .
452                        "job_token = {$dbw->addQuotes( $uuid ) }" .
453                        "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}" .
454                        "job_attempts = job_attempts+1 " .
455                    "WHERE ( " .
456                        "job_cmd = {$dbw->addQuotes( $this->type )} " .
457                        "AND job_token = {$dbw->addQuotes( '' )} " .
458                    ") ORDER BY job_id ASC LIMIT 1",
459                    __METHOD__
460                );
461            } else {
462                // Use a subquery to find the job, within an UPDATE to claim it.
463                // This uses as much of the DB wrapper functions as possible.
464                $qb = $dbw->newSelectQueryBuilder()
465                    ->select( 'job_id' )
466                    ->from( 'job' )
467                    ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
468                    ->orderBy( 'job_id', SelectQueryBuilder::SORT_ASC )
469                    ->limit( 1 );
470
471                $dbw->newUpdateQueryBuilder()
472                    ->update( 'job' )
473                    ->set( [
474                        'job_token' => $uuid,
475                        'job_token_timestamp' => $dbw->timestamp(),
476                        'job_attempts' => new RawSQLValue( 'job_attempts+1' ),
477                    ] )
478                    ->where( [ 'job_id' => new RawSQLValue( '(' . $qb->getSQL() . ')' ) ] )
479                    ->caller( __METHOD__ )->execute();
480            }
481
482            if ( !$dbw->affectedRows() ) {
483                break;
484            }
485
486            // Fetch any row that we just reserved...
487            $row = $dbw->newSelectQueryBuilder()
488                ->select( self::selectFields() )
489                ->from( 'job' )
490                ->where( [ 'job_cmd' => $this->type, 'job_token' => $uuid ] )
491                ->caller( __METHOD__ )->fetchRow();
492            if ( !$row ) { // raced out by duplicate job removal
493                wfDebug( "Row deleted as duplicate by another process." );
494            }
495        } while ( !$row );
496
497        return $row;
498    }
499
500    /**
501     * @see JobQueue::doAck()
502     * @param RunnableJob $job
503     * @throws JobQueueConnectionError
504     * @throws JobQueueError
505     */
506    protected function doAck( RunnableJob $job ) {
507        $id = $job->getMetadata( 'id' );
508        if ( $id === null ) {
509            throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no ID." );
510        }
511
512        $dbw = $this->getPrimaryDB();
513        try {
514            // Delete a row with a single DELETE without holding row locks over RTTs...
515            $dbw->newDeleteQueryBuilder()
516                ->deleteFrom( 'job' )
517                ->where( [ 'job_cmd' => $this->type, 'job_id' => $id ] )
518                ->caller( __METHOD__ )->execute();
519
520            $this->incrStats( 'acks', $this->type );
521        } catch ( DBError $e ) {
522            throw $this->getDBException( $e );
523        }
524    }
525
526    /**
527     * @see JobQueue::doDeduplicateRootJob()
528     * @param IJobSpecification $job
529     * @throws JobQueueConnectionError
530     * @return bool
531     */
532    protected function doDeduplicateRootJob( IJobSpecification $job ) {
533        // Callers should call JobQueueGroup::push() before this method so that if the
534        // insert fails, the de-duplication registration will be aborted. Since the insert
535        // is deferred till "transaction idle", do the same here, so that the ordering is
536        // maintained. Having only the de-duplication registration succeed would cause
537        // jobs to become no-ops without any actual jobs that made them redundant.
538        $dbw = $this->getPrimaryDB();
539        $dbw->onTransactionCommitOrIdle(
540            function () use ( $job ) {
541                parent::doDeduplicateRootJob( $job );
542            },
543            __METHOD__
544        );
545
546        return true;
547    }
548
549    /**
550     * @see JobQueue::doDelete()
551     * @return bool
552     */
553    protected function doDelete() {
554        $dbw = $this->getPrimaryDB();
555        try {
556            $dbw->newDeleteQueryBuilder()
557                ->deleteFrom( 'job' )
558                ->where( [ 'job_cmd' => $this->type ] )
559                ->caller( __METHOD__ )->execute();
560        } catch ( DBError $e ) {
561            throw $this->getDBException( $e );
562        }
563
564        return true;
565    }
566
567    /**
568     * @see JobQueue::doWaitForBackups()
569     * @return void
570     */
571    protected function doWaitForBackups() {
572        if ( $this->server ) {
573            return; // not using LBFactory instance
574        }
575
576        $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
577        $lbFactory->waitForReplication();
578    }
579
580    /**
581     * @return void
582     */
583    protected function doFlushCaches() {
584        foreach ( [ 'size', 'acquiredcount' ] as $type ) {
585            $this->wanCache->delete( $this->getCacheKey( $type ) );
586        }
587    }
588
589    /**
590     * @see JobQueue::getAllQueuedJobs()
591     * @return Iterator<RunnableJob>
592     */
593    public function getAllQueuedJobs() {
594        return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] );
595    }
596
597    /**
598     * @see JobQueue::getAllAcquiredJobs()
599     * @return Iterator<RunnableJob>
600     */
601    public function getAllAcquiredJobs() {
602        $dbr = $this->getReplicaDB();
603        return $this->getJobIterator( [ 'job_cmd' => $this->getType(), $dbr->expr( 'job_token', '>', '' ) ] );
604    }
605
606    /**
607     * @see JobQueue::getAllAbandonedJobs()
608     * @return Iterator<RunnableJob>
609     */
610    public function getAllAbandonedJobs() {
611        $dbr = $this->getReplicaDB();
612        return $this->getJobIterator( [
613            'job_cmd' => $this->getType(),
614            $dbr->expr( 'job_token', '>', '' ),
615            $dbr->expr( 'job_attempts', '>=', intval( $this->maxTries ) ),
616        ] );
617    }
618
619    /**
620     * @param array $conds Query conditions
621     * @return Iterator<RunnableJob>
622     */
623    protected function getJobIterator( array $conds ) {
624        $dbr = $this->getReplicaDB();
625        $qb = $dbr->newSelectQueryBuilder()
626            ->select( self::selectFields() )
627            ->from( 'job' )
628            ->where( $conds );
629        try {
630            return new MappedIterator(
631                $qb->caller( __METHOD__ )->fetchResultSet(),
632                function ( $row ) {
633                    return $this->jobFromRow( $row );
634                }
635            );
636        } catch ( DBError $e ) {
637            throw $this->getDBException( $e );
638        }
639    }
640
641    public function getCoalesceLocationInternal() {
642        if ( $this->server ) {
643            return null; // not using the LBFactory instance
644        }
645
646        return is_string( $this->cluster )
647            ? "DBCluster:{$this->cluster}:{$this->domain}"
648            : "LBFactory:{$this->domain}";
649    }
650
651    protected function doGetSiblingQueuesWithJobs( array $types ) {
652        $dbr = $this->getReplicaDB();
653        // @note: this does not check whether the jobs are claimed or not.
654        // This is useful so JobQueueGroup::pop() also sees queues that only
655        // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
656        // failed jobs so that they can be popped again for that edge case.
657        $res = $dbr->newSelectQueryBuilder()
658            ->select( 'job_cmd' )
659            ->distinct()
660            ->from( 'job' )
661            ->where( [ 'job_cmd' => $types ] )
662            ->caller( __METHOD__ )->fetchResultSet();
663
664        $types = [];
665        foreach ( $res as $row ) {
666            $types[] = $row->job_cmd;
667        }
668
669        return $types;
670    }
671
672    protected function doGetSiblingQueueSizes( array $types ) {
673        $dbr = $this->getReplicaDB();
674
675        $res = $dbr->newSelectQueryBuilder()
676            ->select( [ 'job_cmd', 'count' => 'COUNT(*)' ] )
677            ->from( 'job' )
678            ->where( [ 'job_cmd' => $types ] )
679            ->groupBy( 'job_cmd' )
680            ->caller( __METHOD__ )->fetchResultSet();
681
682        $sizes = [];
683        foreach ( $res as $row ) {
684            $sizes[$row->job_cmd] = (int)$row->count;
685        }
686
687        return $sizes;
688    }
689
690    /**
691     * Recycle or destroy any jobs that have been claimed for too long
692     *
693     * @return int Number of jobs recycled/deleted
694     */
695    public function recycleAndDeleteStaleJobs() {
696        $now = time();
697        $count = 0; // affected rows
698        $dbw = $this->getPrimaryDB();
699
700        try {
701            if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
702                return $count; // already in progress
703            }
704
705            // Remove claims on jobs acquired for too long if enabled...
706            if ( $this->claimTTL > 0 ) {
707                $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
708                // Get the IDs of jobs that have be claimed but not finished after too long.
709                // These jobs can be recycled into the queue by expiring the claim. Selecting
710                // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
711                $res = $dbw->newSelectQueryBuilder()
712                    ->select( 'job_id' )
713                    ->from( 'job' )
714                    ->where(
715                        [
716                            'job_cmd' => $this->type,
717                            $dbw->expr( 'job_token', '!=', '' ), // was acquired
718                            $dbw->expr( 'job_token_timestamp', '<', $claimCutoff ), // stale
719                            $dbw->expr( 'job_attempts', '<', $this->maxTries ), // retries left
720                        ]
721                    )
722                    ->caller( __METHOD__ )->fetchResultSet();
723                $ids = array_map(
724                    static function ( $o ) {
725                        return $o->job_id;
726                    }, iterator_to_array( $res )
727                );
728                if ( count( $ids ) ) {
729                    // Reset job_token for these jobs so that other runners will pick them up.
730                    // Set the timestamp to the current time, as it is useful to now that the job
731                    // was already tried before (the timestamp becomes the "released" time).
732                    $dbw->newUpdateQueryBuilder()
733                        ->update( 'job' )
734                        ->set( [
735                            'job_token' => '',
736                            'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
737                        ] )
738                        ->where( [
739                            'job_id' => $ids,
740                            $dbw->expr( 'job_token', '!=', '' ),
741                        ] )
742                        ->caller( __METHOD__ )->execute();
743
744                    $affected = $dbw->affectedRows();
745                    $count += $affected;
746                    $this->incrStats( 'recycles', $this->type, $affected );
747                }
748            }
749
750            // Just destroy any stale jobs...
751            $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
752            $qb = $dbw->newSelectQueryBuilder()
753                ->select( 'job_id' )
754                ->from( 'job' )
755                ->where(
756                    [
757                        'job_cmd' => $this->type,
758                        $dbw->expr( 'job_token', '!=', '' ), // was acquired
759                        $dbw->expr( 'job_token_timestamp', '<', $pruneCutoff ) // stale
760                    ]
761                );
762            if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
763                $qb->andWhere( $dbw->expr( 'job_attempts', '>=', $this->maxTries ) );
764            }
765            // Get the IDs of jobs that are considered stale and should be removed. Selecting
766            // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
767            $res = $qb->caller( __METHOD__ )->fetchResultSet();
768            $ids = array_map(
769                static function ( $o ) {
770                    return $o->job_id;
771                }, iterator_to_array( $res )
772            );
773            if ( count( $ids ) ) {
774                $dbw->newDeleteQueryBuilder()
775                    ->deleteFrom( 'job' )
776                    ->where( [ 'job_id' => $ids ] )
777                    ->caller( __METHOD__ )->execute();
778                $affected = $dbw->affectedRows();
779                $count += $affected;
780                $this->incrStats( 'abandons', $this->type, $affected );
781            }
782
783            $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
784        } catch ( DBError $e ) {
785            throw $this->getDBException( $e );
786        }
787
788        return $count;
789    }
790
791    /**
792     * @param IJobSpecification $job
793     * @param IReadableDatabase $db
794     * @return array
795     */
796    protected function insertFields( IJobSpecification $job, IReadableDatabase $db ) {
797        return [
798            // Fields that describe the nature of the job
799            'job_cmd' => $job->getType(),
800            'job_namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL,
801            'job_title' => $job->getParams()['title'] ?? '',
802            'job_params' => self::makeBlob( $job->getParams() ),
803            // Additional job metadata
804            'job_timestamp' => $db->timestamp(),
805            'job_sha1' => Wikimedia\base_convert(
806                sha1( serialize( $job->getDeduplicationInfo() ) ),
807                16, 36, 31
808            ),
809            'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
810        ];
811    }
812
813    /**
814     * @throws JobQueueConnectionError
815     * @return IDatabase
816     */
817    protected function getReplicaDB() {
818        try {
819            return $this->getDB( DB_REPLICA );
820        } catch ( DBConnectionError $e ) {
821            throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
822        }
823    }
824
825    /**
826     * @throws JobQueueConnectionError
827     * @return IMaintainableDatabase
828     * @since 1.37
829     */
830    protected function getPrimaryDB() {
831        try {
832            return $this->getDB( DB_PRIMARY );
833        } catch ( DBConnectionError $e ) {
834            throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
835        }
836    }
837
838    /**
839     * @param int $index (DB_REPLICA/DB_PRIMARY)
840     * @return IMaintainableDatabase
841     */
842    protected function getDB( $index ) {
843        if ( $this->server ) {
844            if ( $this->conn instanceof IDatabase ) {
845                return $this->conn;
846            } elseif ( $this->conn instanceof DBError ) {
847                throw $this->conn;
848            }
849
850            try {
851                $this->conn = MediaWikiServices::getInstance()->getDatabaseFactory()->create(
852                    $this->server['type'],
853                    $this->server
854                );
855            } catch ( DBError $e ) {
856                $this->conn = $e;
857                throw $e;
858            }
859
860            return $this->conn;
861        } else {
862            $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
863            $lb = is_string( $this->cluster )
864                ? $lbFactory->getExternalLB( $this->cluster )
865                : $lbFactory->getMainLB( $this->domain );
866
867            if ( $lb->getServerType( ServerInfo::WRITER_INDEX ) !== 'sqlite' ) {
868                // Keep a separate connection to avoid contention and deadlocks;
869                // However, SQLite has the opposite behavior due to DB-level locking.
870                $flags = $lb::CONN_TRX_AUTOCOMMIT;
871            } else {
872                // Jobs insertion will be deferred until the PRESEND stage to reduce contention.
873                $flags = 0;
874            }
875
876            return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
877        }
878    }
879
880    /**
881     * @param string $property
882     * @return string
883     */
884    private function getCacheKey( $property ) {
885        $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
886
887        return $this->wanCache->makeGlobalKey(
888            'jobqueue',
889            $this->domain,
890            $cluster,
891            $this->type,
892            $property
893        );
894    }
895
896    /**
897     * @param array|false $params
898     * @return string
899     */
900    protected static function makeBlob( $params ) {
901        if ( $params !== false ) {
902            return serialize( $params );
903        } else {
904            return '';
905        }
906    }
907
908    /**
909     * @param stdClass $row
910     * @return RunnableJob
911     */
912    protected function jobFromRow( $row ) {
913        $params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : [];
914        if ( !is_array( $params ) ) { // this shouldn't happen
915            throw new UnexpectedValueException(
916                "Could not unserialize job with ID '{$row->job_id}'." );
917        }
918
919        $params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
920        $job = $this->factoryJob( $row->job_cmd, $params );
921        $job->setMetadata( 'id', $row->job_id );
922        $job->setMetadata( 'timestamp', $row->job_timestamp );
923
924        return $job;
925    }
926
927    /**
928     * @param DBError $e
929     * @return JobQueueError
930     */
931    protected function getDBException( DBError $e ) {
932        return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
933    }
934
935    /**
936     * Return the list of job fields that should be selected.
937     * @since 1.23
938     * @return array
939     */
940    public static function selectFields() {
941        return [
942            'job_id',
943            'job_cmd',
944            'job_namespace',
945            'job_title',
946            'job_timestamp',
947            'job_params',
948            'job_random',
949            'job_attempts',
950            'job_token',
951            'job_token_timestamp',
952            'job_sha1',
953        ];
954    }
955}