Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 474
0.00% covered (danger)
0.00%
0 / 35
CRAP
0.00% covered (danger)
0.00%
0 / 1
JobQueueDB
0.00% covered (danger)
0.00%
0 / 474
0.00% covered (danger)
0.00%
0 / 35
10302
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
20
 supportedOrders
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 optimalOrder
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 doIsEmpty
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
6
 doGetSize
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
12
 doGetAcquiredCount
0.00% covered (danger)
0.00%
0 / 19
0.00% covered (danger)
0.00%
0 / 1
20
 doGetAbandonedCount
0.00% covered (danger)
0.00%
0 / 22
0.00% covered (danger)
0.00%
0 / 1
20
 doBatchPush
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
2
 doBatchPushInternal
0.00% covered (danger)
0.00%
0 / 39
0.00% covered (danger)
0.00%
0 / 1
110
 doPop
0.00% covered (danger)
0.00%
0 / 19
0.00% covered (danger)
0.00%
0 / 1
42
 claimRandom
0.00% covered (danger)
0.00%
0 / 58
0.00% covered (danger)
0.00%
0 / 1
90
 claimOldest
0.00% covered (danger)
0.00%
0 / 41
0.00% covered (danger)
0.00%
0 / 1
20
 doAck
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
12
 doDeduplicateRootJob
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
2
 doDelete
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
6
 doWaitForBackups
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 doFlushCaches
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 getAllQueuedJobs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getAllAcquiredJobs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getAllAbandonedJobs
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 getJobIterator
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
6
 getCoalesceLocationInternal
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
12
 doGetSiblingQueuesWithJobs
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
6
 doGetSiblingQueueSizes
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
6
 recycleAndDeleteStaleJobs
0.00% covered (danger)
0.00%
0 / 71
0.00% covered (danger)
0.00%
0 / 1
56
 insertFields
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
2
 getReplicaDB
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 getPrimaryDB
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 getDB
0.00% covered (danger)
0.00%
0 / 21
0.00% covered (danger)
0.00%
0 / 1
56
 getScopedNoTrxFlag
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
6
 getCacheKey
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
6
 makeBlob
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 jobFromRow
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
12
 getDBException
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 selectFields
0.00% covered (danger)
0.00%
0 / 13
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20use MediaWiki\MediaWikiServices;
21use Wikimedia\Rdbms\DBConnectionError;
22use Wikimedia\Rdbms\DBError;
23use Wikimedia\Rdbms\IDatabase;
24use Wikimedia\Rdbms\IMaintainableDatabase;
25use Wikimedia\Rdbms\SelectQueryBuilder;
26use Wikimedia\ScopedCallback;
27
28/**
29 * Database-backed job queue storage.
30 *
31 * @since 1.21
32 * @ingroup JobQueue
33 */
34class JobQueueDB extends JobQueue {
35    /* seconds to cache info without re-validating */
36    private const CACHE_TTL_SHORT = 30;
37    /* seconds a job can live once claimed */
38    private const MAX_AGE_PRUNE = 7 * 24 * 3600;
39    /**
40     * Used for job_random, the highest safe 32-bit signed integer.
41     * Equivalent to `( 2 ** 31 ) - 1` on 64-bit.
42     */
43    private const MAX_JOB_RANDOM = 2_147_483_647;
44    /* maximum number of rows to skip */
45    private const MAX_OFFSET = 255;
46
47    /** @var IMaintainableDatabase|DBError|null */
48    protected $conn;
49
50    /** @var array|null Server configuration array */
51    protected $server;
52    /** @var string|null Name of an external DB cluster or null for the local DB cluster */
53    protected $cluster;
54
55    /**
56     * Additional parameters include:
57     *   - server  : Server configuration array for Database::factory. Overrides "cluster".
58     *   - cluster : The name of an external cluster registered via LBFactory.
59     *               If not specified, the primary DB cluster for the wiki will be used.
60     *               This can be overridden with a custom cluster so that DB handles will
61     *               be retrieved via LBFactory::getExternalLB() and getConnection().
62     * @param array $params
63     */
64    protected function __construct( array $params ) {
65        parent::__construct( $params );
66
67        if ( isset( $params['server'] ) ) {
68            $this->server = $params['server'];
69        } elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
70            $this->cluster = $params['cluster'];
71        }
72    }
73
74    protected function supportedOrders() {
75        return [ 'random', 'timestamp', 'fifo' ];
76    }
77
78    protected function optimalOrder() {
79        return 'random';
80    }
81
82    /**
83     * @see JobQueue::doIsEmpty()
84     * @return bool
85     */
86    protected function doIsEmpty() {
87        $dbr = $this->getReplicaDB();
88        /** @noinspection PhpUnusedLocalVariableInspection */
89        $scope = $this->getScopedNoTrxFlag( $dbr );
90        try {
91            // unclaimed job
92            $found = (bool)$dbr->newSelectQueryBuilder()
93                ->select( '1' )
94                ->from( 'job' )
95                ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
96                ->caller( __METHOD__ )->fetchField();
97        } catch ( DBError $e ) {
98            throw $this->getDBException( $e );
99        }
100
101        return !$found;
102    }
103
104    /**
105     * @see JobQueue::doGetSize()
106     * @return int
107     */
108    protected function doGetSize() {
109        $key = $this->getCacheKey( 'size' );
110
111        $size = $this->wanCache->get( $key );
112        if ( is_int( $size ) ) {
113            return $size;
114        }
115
116        $dbr = $this->getReplicaDB();
117        /** @noinspection PhpUnusedLocalVariableInspection */
118        $scope = $this->getScopedNoTrxFlag( $dbr );
119        try {
120            $size = $dbr->newSelectQueryBuilder()
121                ->from( 'job' )
122                ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
123                ->caller( __METHOD__ )->fetchRowCount();
124        } catch ( DBError $e ) {
125            throw $this->getDBException( $e );
126        }
127        $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );
128
129        return $size;
130    }
131
132    /**
133     * @see JobQueue::doGetAcquiredCount()
134     * @return int
135     */
136    protected function doGetAcquiredCount() {
137        if ( $this->claimTTL <= 0 ) {
138            return 0; // no acknowledgements
139        }
140
141        $key = $this->getCacheKey( 'acquiredcount' );
142
143        $count = $this->wanCache->get( $key );
144        if ( is_int( $count ) ) {
145            return $count;
146        }
147
148        $dbr = $this->getReplicaDB();
149        /** @noinspection PhpUnusedLocalVariableInspection */
150        $scope = $this->getScopedNoTrxFlag( $dbr );
151        try {
152            $count = $dbr->newSelectQueryBuilder()
153                ->from( 'job' )
154                ->where( [
155                    'job_cmd' => $this->type,
156                    $dbr->expr( 'job_token', '!=', '' ),
157                ] )
158                ->caller( __METHOD__ )->fetchRowCount();
159        } catch ( DBError $e ) {
160            throw $this->getDBException( $e );
161        }
162        $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
163
164        return $count;
165    }
166
167    /**
168     * @see JobQueue::doGetAbandonedCount()
169     * @return int
170     * @throws JobQueueConnectionError
171     * @throws JobQueueError
172     */
173    protected function doGetAbandonedCount() {
174        if ( $this->claimTTL <= 0 ) {
175            return 0; // no acknowledgements
176        }
177
178        $key = $this->getCacheKey( 'abandonedcount' );
179
180        $count = $this->wanCache->get( $key );
181        if ( is_int( $count ) ) {
182            return $count;
183        }
184
185        $dbr = $this->getReplicaDB();
186        /** @noinspection PhpUnusedLocalVariableInspection */
187        $scope = $this->getScopedNoTrxFlag( $dbr );
188        try {
189            $count = $dbr->newSelectQueryBuilder()
190                ->from( 'job' )
191                ->where(
192                    [
193                        'job_cmd' => $this->type,
194                        $dbr->expr( 'job_token', '!=', '' ),
195                        $dbr->expr( 'job_attempts', '>=', $this->maxTries ),
196                    ]
197                )
198                ->caller( __METHOD__ )->fetchRowCount();
199        } catch ( DBError $e ) {
200            throw $this->getDBException( $e );
201        }
202
203        $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
204
205        return $count;
206    }
207
208    /**
209     * @see JobQueue::doBatchPush()
210     * @param IJobSpecification[] $jobs
211     * @param int $flags
212     * @throws DBError|Exception
213     * @return void
214     */
215    protected function doBatchPush( array $jobs, $flags ) {
216        $dbw = $this->getPrimaryDB();
217        /** @noinspection PhpUnusedLocalVariableInspection */
218        $scope = $this->getScopedNoTrxFlag( $dbw );
219        // In general, there will be two cases here:
220        // a) sqlite; DB connection is probably a regular round-aware handle.
221        // If the connection is busy with a transaction, then defer the job writes
222        // until right before the main round commit step. Any errors that bubble
223        // up will rollback the main commit round.
224        // b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
225        // No transaction is active nor will be started by writes, so enqueue the jobs
226        // now so that any errors will show up immediately as the interface expects. Any
227        // errors that bubble up will rollback the main commit round.
228        $fname = __METHOD__;
229        $dbw->onTransactionPreCommitOrIdle(
230            function ( IDatabase $dbw ) use ( $jobs, $flags, $fname ) {
231                $this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
232            },
233            $fname
234        );
235    }
236
237    /**
238     * This function should *not* be called outside of JobQueueDB
239     *
240     * @suppress SecurityCheck-SQLInjection Bug in phan-taint-check handling bulk inserts
241     * @param IDatabase $dbw
242     * @param IJobSpecification[] $jobs
243     * @param int $flags
244     * @param string $method
245     * @throws DBError
246     * @return void
247     */
248    public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
249        if ( $jobs === [] ) {
250            return;
251        }
252
253        $rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
254        $rowList = []; // list of jobs for jobs that are not de-duplicated
255        foreach ( $jobs as $job ) {
256            $row = $this->insertFields( $job, $dbw );
257            if ( $job->ignoreDuplicates() ) {
258                $rowSet[$row['job_sha1']] = $row;
259            } else {
260                $rowList[] = $row;
261            }
262        }
263
264        if ( $flags & self::QOS_ATOMIC ) {
265            $dbw->startAtomic( $method ); // wrap all the job additions in one transaction
266        }
267        try {
268            // Strip out any duplicate jobs that are already in the queue...
269            if ( count( $rowSet ) ) {
270                $res = $dbw->newSelectQueryBuilder()
271                    ->select( 'job_sha1' )
272                    ->from( 'job' )
273                    ->where(
274                        [
275                            // No job_type condition since it's part of the job_sha1 hash
276                            'job_sha1' => array_map( 'strval', array_keys( $rowSet ) ),
277                            'job_token' => '' // unclaimed
278                        ]
279                    )
280                    ->caller( $method )->fetchResultSet();
281                foreach ( $res as $row ) {
282                    wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
283                    unset( $rowSet[$row->job_sha1] ); // already enqueued
284                }
285            }
286            // Build the full list of job rows to insert
287            $rows = array_merge( $rowList, array_values( $rowSet ) );
288            // Insert the job rows in chunks to avoid replica DB lag...
289            foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
290                $dbw->newInsertQueryBuilder()
291                    ->insertInto( 'job' )
292                    ->rows( $rowBatch )
293                    ->caller( $method )->execute();
294            }
295            $this->incrStats( 'inserts', $this->type, count( $rows ) );
296            $this->incrStats( 'dupe_inserts', $this->type,
297                count( $rowSet ) + count( $rowList ) - count( $rows )
298            );
299        } catch ( DBError $e ) {
300            throw $this->getDBException( $e );
301        }
302        if ( $flags & self::QOS_ATOMIC ) {
303            $dbw->endAtomic( $method );
304        }
305    }
306
307    /**
308     * @see JobQueue::doPop()
309     * @return RunnableJob|false
310     */
311    protected function doPop() {
312        $dbw = $this->getPrimaryDB();
313        /** @noinspection PhpUnusedLocalVariableInspection */
314        $scope = $this->getScopedNoTrxFlag( $dbw );
315
316        $job = false; // job popped off
317        try {
318            $uuid = wfRandomString( 32 ); // pop attempt
319            do { // retry when our row is invalid or deleted as a duplicate
320                // Try to reserve a row in the DB...
321                if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
322                    $row = $this->claimOldest( $uuid );
323                } else { // random first
324                    $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
325                    $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
326                    $row = $this->claimRandom( $uuid, $rand, $gte );
327                }
328                // Check if we found a row to reserve...
329                if ( !$row ) {
330                    break; // nothing to do
331                }
332                $this->incrStats( 'pops', $this->type );
333
334                // Get the job object from the row...
335                $job = $this->jobFromRow( $row );
336                break; // done
337            } while ( true );
338
339            if ( !$job || mt_rand( 0, 9 ) == 0 ) {
340                // Handled jobs that need to be recycled/deleted;
341                // any recycled jobs will be picked up next attempt
342                $this->recycleAndDeleteStaleJobs();
343            }
344        } catch ( DBError $e ) {
345            throw $this->getDBException( $e );
346        }
347
348        return $job;
349    }
350
351    /**
352     * Reserve a row with a single UPDATE without holding row locks over RTTs...
353     *
354     * @param string $uuid 32 char hex string
355     * @param int $rand Random unsigned integer (31 bits)
356     * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
357     * @return stdClass|false Row|false
358     */
359    protected function claimRandom( $uuid, $rand, $gte ) {
360        $dbw = $this->getPrimaryDB();
361        /** @noinspection PhpUnusedLocalVariableInspection */
362        $scope = $this->getScopedNoTrxFlag( $dbw );
363        // Check cache to see if the queue has <= OFFSET items
364        $tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) );
365
366        $invertedDirection = false; // whether one job_random direction was already scanned
367        // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
368        // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
369        // not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
370        // be used here with MySQL.
371        do {
372            if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
373                // For small queues, using OFFSET will overshoot and return no rows more often.
374                // Instead, this uses job_random to pick a row (possibly checking both directions).
375                $row = $dbw->newSelectQueryBuilder()
376                    ->select( self::selectFields() )
377                    ->from( 'job' )
378                    ->where(
379                        [
380                            'job_cmd' => $this->type,
381                            'job_token' => '', // unclaimed
382                            $dbw->expr( 'job_random', $gte ? '>=' : '<=', $rand )
383                        ]
384                    )
385                    ->orderBy(
386                        'job_random',
387                        $gte ? SelectQueryBuilder::SORT_ASC : SelectQueryBuilder::SORT_DESC
388                    )
389                    ->caller( __METHOD__ )->fetchRow();
390                if ( !$row && !$invertedDirection ) {
391                    $gte = !$gte;
392                    $invertedDirection = true;
393                    continue; // try the other direction
394                }
395            } else { // table *may* have >= MAX_OFFSET rows
396                // T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
397                // in MySQL if there are many rows for some reason. This uses a small OFFSET
398                // instead of job_random for reducing excess claim retries.
399                $row = $dbw->newSelectQueryBuilder()
400                    ->select( self::selectFields() )
401                    ->from( 'job' )
402                    ->where(
403                        [
404                            'job_cmd' => $this->type,
405                            'job_token' => '', // unclaimed
406                        ]
407                    )
408                    ->offset( mt_rand( 0, self::MAX_OFFSET ) )
409                    ->caller( __METHOD__ )->fetchRow();
410                if ( !$row ) {
411                    $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
412                    $this->wanCache->set( $this->getCacheKey( 'small' ), 1, 30 );
413                    continue; // use job_random
414                }
415            }
416
417            if ( !$row ) {
418                break;
419            }
420
421            $dbw->newUpdateQueryBuilder()
422                ->update( 'job' ) // update by PK
423                ->set( [
424                    'job_token' => $uuid,
425                    'job_token_timestamp' => $dbw->timestamp(),
426                    'job_attempts = job_attempts+1'
427                ] )
428                ->where( [
429                    'job_cmd' => $this->type,
430                    'job_id' => $row->job_id,
431                    'job_token' => ''
432                ] )
433                ->caller( __METHOD__ )->execute();
434
435            // This might get raced out by another runner when claiming the previously
436            // selected row. The use of job_random should minimize this problem, however.
437            if ( !$dbw->affectedRows() ) {
438                $row = false; // raced out
439            }
440        } while ( !$row );
441
442        return $row;
443    }
444
445    /**
446     * Reserve a row with a single UPDATE without holding row locks over RTTs...
447     *
448     * @param string $uuid 32 char hex string
449     * @return stdClass|false Row|false
450     */
451    protected function claimOldest( $uuid ) {
452        $dbw = $this->getPrimaryDB();
453        /** @noinspection PhpUnusedLocalVariableInspection */
454        $scope = $this->getScopedNoTrxFlag( $dbw );
455
456        $row = false; // the row acquired
457        do {
458            if ( $dbw->getType() === 'mysql' ) {
459                // Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
460                // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
461                // Postgres has no such limitation. However, MySQL offers an
462                // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
463                $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
464                    "SET " .
465                        "job_token = {$dbw->addQuotes( $uuid ) }" .
466                        "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}" .
467                        "job_attempts = job_attempts+1 " .
468                    "WHERE ( " .
469                        "job_cmd = {$dbw->addQuotes( $this->type )} " .
470                        "AND job_token = {$dbw->addQuotes( '' )} " .
471                    ") ORDER BY job_id ASC LIMIT 1",
472                    __METHOD__
473                );
474            } else {
475                // Use a subquery to find the job, within an UPDATE to claim it.
476                // This uses as much of the DB wrapper functions as possible.
477                $qb = $dbw->newSelectQueryBuilder()
478                    ->select( 'job_id' )
479                    ->from( 'job' )
480                    ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
481                    ->orderBy( 'job_id', SelectQueryBuilder::SORT_ASC )
482                    ->limit( 1 );
483
484                $dbw->newUpdateQueryBuilder()
485                    ->update( 'job' )
486                    ->set( [
487                        'job_token' => $uuid,
488                        'job_token_timestamp' => $dbw->timestamp(),
489                        'job_attempts = job_attempts+1'
490                    ] )
491                    ->where( [ 'job_id = (' . $qb->getSQL() . ')' ] )
492                    ->caller( __METHOD__ )->execute();
493            }
494
495            if ( !$dbw->affectedRows() ) {
496                break;
497            }
498
499            // Fetch any row that we just reserved...
500            $row = $dbw->newSelectQueryBuilder()
501                ->select( self::selectFields() )
502                ->from( 'job' )
503                ->where( [ 'job_cmd' => $this->type, 'job_token' => $uuid ] )
504                ->caller( __METHOD__ )->fetchRow();
505            if ( !$row ) { // raced out by duplicate job removal
506                wfDebug( "Row deleted as duplicate by another process." );
507            }
508        } while ( !$row );
509
510        return $row;
511    }
512
513    /**
514     * @see JobQueue::doAck()
515     * @param RunnableJob $job
516     * @throws JobQueueConnectionError
517     * @throws JobQueueError
518     */
519    protected function doAck( RunnableJob $job ) {
520        $id = $job->getMetadata( 'id' );
521        if ( $id === null ) {
522            throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no ID." );
523        }
524
525        $dbw = $this->getPrimaryDB();
526        /** @noinspection PhpUnusedLocalVariableInspection */
527        $scope = $this->getScopedNoTrxFlag( $dbw );
528        try {
529            // Delete a row with a single DELETE without holding row locks over RTTs...
530            $dbw->newDeleteQueryBuilder()
531                ->deleteFrom( 'job' )
532                ->where( [ 'job_cmd' => $this->type, 'job_id' => $id ] )
533                ->caller( __METHOD__ )->execute();
534
535            $this->incrStats( 'acks', $this->type );
536        } catch ( DBError $e ) {
537            throw $this->getDBException( $e );
538        }
539    }
540
541    /**
542     * @see JobQueue::doDeduplicateRootJob()
543     * @param IJobSpecification $job
544     * @throws JobQueueConnectionError
545     * @return bool
546     */
547    protected function doDeduplicateRootJob( IJobSpecification $job ) {
548        // Callers should call JobQueueGroup::push() before this method so that if the
549        // insert fails, the de-duplication registration will be aborted. Since the insert
550        // is deferred till "transaction idle", do the same here, so that the ordering is
551        // maintained. Having only the de-duplication registration succeed would cause
552        // jobs to become no-ops without any actual jobs that made them redundant.
553        $dbw = $this->getPrimaryDB();
554        /** @noinspection PhpUnusedLocalVariableInspection */
555        $scope = $this->getScopedNoTrxFlag( $dbw );
556        $dbw->onTransactionCommitOrIdle(
557            function () use ( $job ) {
558                parent::doDeduplicateRootJob( $job );
559            },
560            __METHOD__
561        );
562
563        return true;
564    }
565
566    /**
567     * @see JobQueue::doDelete()
568     * @return bool
569     */
570    protected function doDelete() {
571        $dbw = $this->getPrimaryDB();
572        /** @noinspection PhpUnusedLocalVariableInspection */
573        $scope = $this->getScopedNoTrxFlag( $dbw );
574        try {
575            $dbw->newDeleteQueryBuilder()
576                ->deleteFrom( 'job' )
577                ->where( [ 'job_cmd' => $this->type ] )
578                ->caller( __METHOD__ )->execute();
579        } catch ( DBError $e ) {
580            throw $this->getDBException( $e );
581        }
582
583        return true;
584    }
585
586    /**
587     * @see JobQueue::doWaitForBackups()
588     * @return void
589     */
590    protected function doWaitForBackups() {
591        if ( $this->server ) {
592            return; // not using LBFactory instance
593        }
594
595        $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
596        $lbFactory->waitForReplication();
597    }
598
599    /**
600     * @return void
601     */
602    protected function doFlushCaches() {
603        foreach ( [ 'size', 'acquiredcount' ] as $type ) {
604            $this->wanCache->delete( $this->getCacheKey( $type ) );
605        }
606    }
607
608    /**
609     * @see JobQueue::getAllQueuedJobs()
610     * @return Iterator<RunnableJob>
611     */
612    public function getAllQueuedJobs() {
613        return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] );
614    }
615
616    /**
617     * @see JobQueue::getAllAcquiredJobs()
618     * @return Iterator<RunnableJob>
619     */
620    public function getAllAcquiredJobs() {
621        return $this->getJobIterator( [ 'job_cmd' => $this->getType(), "job_token > ''" ] );
622    }
623
624    /**
625     * @see JobQueue::getAllAbandonedJobs()
626     * @return Iterator<RunnableJob>
627     */
628    public function getAllAbandonedJobs() {
629        return $this->getJobIterator( [
630            'job_cmd' => $this->getType(),
631            "job_token > ''",
632            "job_attempts >= " . intval( $this->maxTries )
633        ] );
634    }
635
636    /**
637     * @param array $conds Query conditions
638     * @return Iterator<RunnableJob>
639     */
640    protected function getJobIterator( array $conds ) {
641        $dbr = $this->getReplicaDB();
642        /** @noinspection PhpUnusedLocalVariableInspection */
643        $scope = $this->getScopedNoTrxFlag( $dbr );
644        $qb = $dbr->newSelectQueryBuilder()
645            ->select( self::selectFields() )
646            ->from( 'job' )
647            ->where( $conds );
648        try {
649            return new MappedIterator(
650                $qb->caller( __METHOD__ )->fetchResultSet(),
651                function ( $row ) {
652                    return $this->jobFromRow( $row );
653                }
654            );
655        } catch ( DBError $e ) {
656            throw $this->getDBException( $e );
657        }
658    }
659
660    public function getCoalesceLocationInternal() {
661        if ( $this->server ) {
662            return null; // not using the LBFactory instance
663        }
664
665        return is_string( $this->cluster )
666            ? "DBCluster:{$this->cluster}:{$this->domain}"
667            : "LBFactory:{$this->domain}";
668    }
669
670    protected function doGetSiblingQueuesWithJobs( array $types ) {
671        $dbr = $this->getReplicaDB();
672        /** @noinspection PhpUnusedLocalVariableInspection */
673        $scope = $this->getScopedNoTrxFlag( $dbr );
674        // @note: this does not check whether the jobs are claimed or not.
675        // This is useful so JobQueueGroup::pop() also sees queues that only
676        // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
677        // failed jobs so that they can be popped again for that edge case.
678        $res = $dbr->newSelectQueryBuilder()
679            ->select( 'job_cmd' )
680            ->distinct()
681            ->from( 'job' )
682            ->where( [ 'job_cmd' => $types ] )
683            ->caller( __METHOD__ )->fetchResultSet();
684
685        $types = [];
686        foreach ( $res as $row ) {
687            $types[] = $row->job_cmd;
688        }
689
690        return $types;
691    }
692
693    protected function doGetSiblingQueueSizes( array $types ) {
694        $dbr = $this->getReplicaDB();
695        /** @noinspection PhpUnusedLocalVariableInspection */
696        $scope = $this->getScopedNoTrxFlag( $dbr );
697
698        $res = $dbr->newSelectQueryBuilder()
699            ->select( [ 'job_cmd', 'count' => 'COUNT(*)' ] )
700            ->from( 'job' )
701            ->where( [ 'job_cmd' => $types ] )
702            ->groupBy( 'job_cmd' )
703            ->caller( __METHOD__ )->fetchResultSet();
704
705        $sizes = [];
706        foreach ( $res as $row ) {
707            $sizes[$row->job_cmd] = (int)$row->count;
708        }
709
710        return $sizes;
711    }
712
713    /**
714     * Recycle or destroy any jobs that have been claimed for too long
715     *
716     * @return int Number of jobs recycled/deleted
717     */
718    public function recycleAndDeleteStaleJobs() {
719        $now = time();
720        $count = 0; // affected rows
721        $dbw = $this->getPrimaryDB();
722        /** @noinspection PhpUnusedLocalVariableInspection */
723        $scope = $this->getScopedNoTrxFlag( $dbw );
724
725        try {
726            if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
727                return $count; // already in progress
728            }
729
730            // Remove claims on jobs acquired for too long if enabled...
731            if ( $this->claimTTL > 0 ) {
732                $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
733                // Get the IDs of jobs that have be claimed but not finished after too long.
734                // These jobs can be recycled into the queue by expiring the claim. Selecting
735                // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
736                $res = $dbw->newSelectQueryBuilder()
737                    ->select( 'job_id' )
738                    ->from( 'job' )
739                    ->where(
740                        [
741                            'job_cmd' => $this->type,
742                            $dbw->expr( 'job_token', '!=', '' ), // was acquired
743                            $dbw->expr( 'job_token_timestamp', '<', $claimCutoff ), // stale
744                            $dbw->expr( 'job_attempts', '<', $this->maxTries ), // retries left
745                        ]
746                    )
747                    ->caller( __METHOD__ )->fetchResultSet();
748                $ids = array_map(
749                    static function ( $o ) {
750                        return $o->job_id;
751                    }, iterator_to_array( $res )
752                );
753                if ( count( $ids ) ) {
754                    // Reset job_token for these jobs so that other runners will pick them up.
755                    // Set the timestamp to the current time, as it is useful to now that the job
756                    // was already tried before (the timestamp becomes the "released" time).
757                    $dbw->newUpdateQueryBuilder()
758                        ->update( 'job' )
759                        ->set( [
760                            'job_token' => '',
761                            'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
762                        ] )
763                        ->where( [
764                            'job_id' => $ids,
765                            $dbw->expr( 'job_token', '!=', '' ),
766                        ] )
767                        ->caller( __METHOD__ )->execute();
768
769                    $affected = $dbw->affectedRows();
770                    $count += $affected;
771                    $this->incrStats( 'recycles', $this->type, $affected );
772                }
773            }
774
775            // Just destroy any stale jobs...
776            $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
777            $qb = $dbw->newSelectQueryBuilder()
778                ->select( 'job_id' )
779                ->from( 'job' )
780                ->where(
781                    [
782                        'job_cmd' => $this->type,
783                        $dbw->expr( 'job_token', '!=', '' ), // was acquired
784                        $dbw->expr( 'job_token_timestamp', '<', $pruneCutoff ) // stale
785                    ]
786                );
787            if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
788                $qb->andWhere( $dbw->expr( 'job_attempts', '>=', $this->maxTries ) );
789            }
790            // Get the IDs of jobs that are considered stale and should be removed. Selecting
791            // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
792            $res = $qb->caller( __METHOD__ )->fetchResultSet();
793            $ids = array_map(
794                static function ( $o ) {
795                    return $o->job_id;
796                }, iterator_to_array( $res )
797            );
798            if ( count( $ids ) ) {
799                $dbw->newDeleteQueryBuilder()
800                    ->deleteFrom( 'job' )
801                    ->where( [ 'job_id' => $ids ] )
802                    ->caller( __METHOD__ )->execute();
803                $affected = $dbw->affectedRows();
804                $count += $affected;
805                $this->incrStats( 'abandons', $this->type, $affected );
806            }
807
808            $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
809        } catch ( DBError $e ) {
810            throw $this->getDBException( $e );
811        }
812
813        return $count;
814    }
815
816    /**
817     * @param IJobSpecification $job
818     * @param IDatabase $db
819     * @return array
820     */
821    protected function insertFields( IJobSpecification $job, IDatabase $db ) {
822        return [
823            // Fields that describe the nature of the job
824            'job_cmd' => $job->getType(),
825            'job_namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL,
826            'job_title' => $job->getParams()['title'] ?? '',
827            'job_params' => self::makeBlob( $job->getParams() ),
828            // Additional job metadata
829            'job_timestamp' => $db->timestamp(),
830            'job_sha1' => Wikimedia\base_convert(
831                sha1( serialize( $job->getDeduplicationInfo() ) ),
832                16, 36, 31
833            ),
834            'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
835        ];
836    }
837
838    /**
839     * @throws JobQueueConnectionError
840     * @return IDatabase
841     */
842    protected function getReplicaDB() {
843        try {
844            return $this->getDB( DB_REPLICA );
845        } catch ( DBConnectionError $e ) {
846            throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
847        }
848    }
849
850    /**
851     * @throws JobQueueConnectionError
852     * @return IMaintainableDatabase
853     * @since 1.37
854     */
855    protected function getPrimaryDB() {
856        try {
857            return $this->getDB( DB_PRIMARY );
858        } catch ( DBConnectionError $e ) {
859            throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
860        }
861    }
862
863    /**
864     * @param int $index (DB_REPLICA/DB_PRIMARY)
865     * @return IMaintainableDatabase
866     */
867    protected function getDB( $index ) {
868        if ( $this->server ) {
869            if ( $this->conn instanceof IDatabase ) {
870                return $this->conn;
871            } elseif ( $this->conn instanceof DBError ) {
872                throw $this->conn;
873            }
874
875            try {
876                $this->conn = MediaWikiServices::getInstance()->getDatabaseFactory()->create(
877                    $this->server['type'],
878                    $this->server
879                );
880            } catch ( DBError $e ) {
881                $this->conn = $e;
882                throw $e;
883            }
884
885            return $this->conn;
886        } else {
887            $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
888            $lb = is_string( $this->cluster )
889                ? $lbFactory->getExternalLB( $this->cluster )
890                : $lbFactory->getMainLB( $this->domain );
891
892            if ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' ) {
893                // Keep a separate connection to avoid contention and deadlocks;
894                // However, SQLite has the opposite behavior due to DB-level locking.
895                $flags = $lb::CONN_TRX_AUTOCOMMIT;
896            } else {
897                // Jobs insertion will be deferred until the PRESEND stage to reduce contention.
898                $flags = 0;
899            }
900
901            return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
902        }
903    }
904
905    /**
906     * @param IDatabase $db
907     * @return ScopedCallback
908     */
909    private function getScopedNoTrxFlag( IDatabase $db ) {
910        $autoTrx = $db->getFlag( DBO_TRX ); // get current setting
911        $db->clearFlag( DBO_TRX ); // make each query its own transaction
912
913        return new ScopedCallback( static function () use ( $db, $autoTrx ) {
914            if ( $autoTrx ) {
915                $db->setFlag( DBO_TRX ); // restore old setting
916            }
917        } );
918    }
919
920    /**
921     * @param string $property
922     * @return string
923     */
924    private function getCacheKey( $property ) {
925        $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
926
927        return $this->wanCache->makeGlobalKey(
928            'jobqueue',
929            $this->domain,
930            $cluster,
931            $this->type,
932            $property
933        );
934    }
935
936    /**
937     * @param array|false $params
938     * @return string
939     */
940    protected static function makeBlob( $params ) {
941        if ( $params !== false ) {
942            return serialize( $params );
943        } else {
944            return '';
945        }
946    }
947
948    /**
949     * @param stdClass $row
950     * @return RunnableJob
951     */
952    protected function jobFromRow( $row ) {
953        $params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : [];
954        if ( !is_array( $params ) ) { // this shouldn't happen
955            throw new UnexpectedValueException(
956                "Could not unserialize job with ID '{$row->job_id}'." );
957        }
958
959        $params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
960        $job = $this->factoryJob( $row->job_cmd, $params );
961        $job->setMetadata( 'id', $row->job_id );
962        $job->setMetadata( 'timestamp', $row->job_timestamp );
963
964        return $job;
965    }
966
967    /**
968     * @param DBError $e
969     * @return JobQueueError
970     */
971    protected function getDBException( DBError $e ) {
972        return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
973    }
974
975    /**
976     * Return the list of job fields that should be selected.
977     * @since 1.23
978     * @return array
979     */
980    public static function selectFields() {
981        return [
982            'job_id',
983            'job_cmd',
984            'job_namespace',
985            'job_title',
986            'job_timestamp',
987            'job_params',
988            'job_random',
989            'job_attempts',
990            'job_token',
991            'job_token_timestamp',
992            'job_sha1',
993        ];
994    }
995}