Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 474
0.00% covered (danger)
0.00%
0 / 35
CRAP
0.00% covered (danger)
0.00%
0 / 1
JobQueueDB
0.00% covered (danger)
0.00%
0 / 474
0.00% covered (danger)
0.00%
0 / 35
10302
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
20
 supportedOrders
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 optimalOrder
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 doIsEmpty
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
6
 doGetSize
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
12
 doGetAcquiredCount
0.00% covered (danger)
0.00%
0 / 19
0.00% covered (danger)
0.00%
0 / 1
20
 doGetAbandonedCount
0.00% covered (danger)
0.00%
0 / 22
0.00% covered (danger)
0.00%
0 / 1
20
 doBatchPush
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
2
 doBatchPushInternal
0.00% covered (danger)
0.00%
0 / 39
0.00% covered (danger)
0.00%
0 / 1
110
 doPop
0.00% covered (danger)
0.00%
0 / 19
0.00% covered (danger)
0.00%
0 / 1
42
 claimRandom
0.00% covered (danger)
0.00%
0 / 58
0.00% covered (danger)
0.00%
0 / 1
90
 claimOldest
0.00% covered (danger)
0.00%
0 / 41
0.00% covered (danger)
0.00%
0 / 1
20
 doAck
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
12
 doDeduplicateRootJob
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
2
 doDelete
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
6
 doWaitForBackups
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 doFlushCaches
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 getAllQueuedJobs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getAllAcquiredJobs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getAllAbandonedJobs
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 getJobIterator
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
6
 getCoalesceLocationInternal
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
12
 doGetSiblingQueuesWithJobs
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
6
 doGetSiblingQueueSizes
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
6
 recycleAndDeleteStaleJobs
0.00% covered (danger)
0.00%
0 / 71
0.00% covered (danger)
0.00%
0 / 1
56
 insertFields
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
2
 getReplicaDB
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 getPrimaryDB
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 getDB
0.00% covered (danger)
0.00%
0 / 21
0.00% covered (danger)
0.00%
0 / 1
56
 getScopedNoTrxFlag
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
6
 getCacheKey
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
6
 makeBlob
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 jobFromRow
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
12
 getDBException
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 selectFields
0.00% covered (danger)
0.00%
0 / 13
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20use MediaWiki\MediaWikiServices;
21use Wikimedia\Rdbms\DBConnectionError;
22use Wikimedia\Rdbms\DBError;
23use Wikimedia\Rdbms\IDatabase;
24use Wikimedia\Rdbms\IMaintainableDatabase;
25use Wikimedia\Rdbms\IReadableDatabase;
26use Wikimedia\Rdbms\RawSQLValue;
27use Wikimedia\Rdbms\SelectQueryBuilder;
28use Wikimedia\Rdbms\ServerInfo;
29use Wikimedia\ScopedCallback;
30
31/**
32 * Database-backed job queue storage.
33 *
34 * @since 1.21
35 * @ingroup JobQueue
36 */
37class JobQueueDB extends JobQueue {
38    /* seconds to cache info without re-validating */
39    private const CACHE_TTL_SHORT = 30;
40    /* seconds a job can live once claimed */
41    private const MAX_AGE_PRUNE = 7 * 24 * 3600;
42    /**
43     * Used for job_random, the highest safe 32-bit signed integer.
44     * Equivalent to `( 2 ** 31 ) - 1` on 64-bit.
45     */
46    private const MAX_JOB_RANDOM = 2_147_483_647;
47    /* maximum number of rows to skip */
48    private const MAX_OFFSET = 255;
49
50    /** @var IMaintainableDatabase|DBError|null */
51    protected $conn;
52
53    /** @var array|null Server configuration array */
54    protected $server;
55    /** @var string|null Name of an external DB cluster or null for the local DB cluster */
56    protected $cluster;
57
58    /**
59     * Additional parameters include:
60     *   - server  : Server configuration array for Database::factory. Overrides "cluster".
61     *   - cluster : The name of an external cluster registered via LBFactory.
62     *               If not specified, the primary DB cluster for the wiki will be used.
63     *               This can be overridden with a custom cluster so that DB handles will
64     *               be retrieved via LBFactory::getExternalLB() and getConnection().
65     * @param array $params
66     */
67    protected function __construct( array $params ) {
68        parent::__construct( $params );
69
70        if ( isset( $params['server'] ) ) {
71            $this->server = $params['server'];
72        } elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
73            $this->cluster = $params['cluster'];
74        }
75    }
76
77    protected function supportedOrders() {
78        return [ 'random', 'timestamp', 'fifo' ];
79    }
80
81    protected function optimalOrder() {
82        return 'random';
83    }
84
85    /**
86     * @see JobQueue::doIsEmpty()
87     * @return bool
88     */
89    protected function doIsEmpty() {
90        $dbr = $this->getReplicaDB();
91        /** @noinspection PhpUnusedLocalVariableInspection */
92        $scope = $this->getScopedNoTrxFlag( $dbr );
93        try {
94            // unclaimed job
95            $found = (bool)$dbr->newSelectQueryBuilder()
96                ->select( '1' )
97                ->from( 'job' )
98                ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
99                ->caller( __METHOD__ )->fetchField();
100        } catch ( DBError $e ) {
101            throw $this->getDBException( $e );
102        }
103
104        return !$found;
105    }
106
107    /**
108     * @see JobQueue::doGetSize()
109     * @return int
110     */
111    protected function doGetSize() {
112        $key = $this->getCacheKey( 'size' );
113
114        $size = $this->wanCache->get( $key );
115        if ( is_int( $size ) ) {
116            return $size;
117        }
118
119        $dbr = $this->getReplicaDB();
120        /** @noinspection PhpUnusedLocalVariableInspection */
121        $scope = $this->getScopedNoTrxFlag( $dbr );
122        try {
123            $size = $dbr->newSelectQueryBuilder()
124                ->from( 'job' )
125                ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
126                ->caller( __METHOD__ )->fetchRowCount();
127        } catch ( DBError $e ) {
128            throw $this->getDBException( $e );
129        }
130        $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );
131
132        return $size;
133    }
134
135    /**
136     * @see JobQueue::doGetAcquiredCount()
137     * @return int
138     */
139    protected function doGetAcquiredCount() {
140        if ( $this->claimTTL <= 0 ) {
141            return 0; // no acknowledgements
142        }
143
144        $key = $this->getCacheKey( 'acquiredcount' );
145
146        $count = $this->wanCache->get( $key );
147        if ( is_int( $count ) ) {
148            return $count;
149        }
150
151        $dbr = $this->getReplicaDB();
152        /** @noinspection PhpUnusedLocalVariableInspection */
153        $scope = $this->getScopedNoTrxFlag( $dbr );
154        try {
155            $count = $dbr->newSelectQueryBuilder()
156                ->from( 'job' )
157                ->where( [
158                    'job_cmd' => $this->type,
159                    $dbr->expr( 'job_token', '!=', '' ),
160                ] )
161                ->caller( __METHOD__ )->fetchRowCount();
162        } catch ( DBError $e ) {
163            throw $this->getDBException( $e );
164        }
165        $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
166
167        return $count;
168    }
169
170    /**
171     * @see JobQueue::doGetAbandonedCount()
172     * @return int
173     * @throws JobQueueConnectionError
174     * @throws JobQueueError
175     */
176    protected function doGetAbandonedCount() {
177        if ( $this->claimTTL <= 0 ) {
178            return 0; // no acknowledgements
179        }
180
181        $key = $this->getCacheKey( 'abandonedcount' );
182
183        $count = $this->wanCache->get( $key );
184        if ( is_int( $count ) ) {
185            return $count;
186        }
187
188        $dbr = $this->getReplicaDB();
189        /** @noinspection PhpUnusedLocalVariableInspection */
190        $scope = $this->getScopedNoTrxFlag( $dbr );
191        try {
192            $count = $dbr->newSelectQueryBuilder()
193                ->from( 'job' )
194                ->where(
195                    [
196                        'job_cmd' => $this->type,
197                        $dbr->expr( 'job_token', '!=', '' ),
198                        $dbr->expr( 'job_attempts', '>=', $this->maxTries ),
199                    ]
200                )
201                ->caller( __METHOD__ )->fetchRowCount();
202        } catch ( DBError $e ) {
203            throw $this->getDBException( $e );
204        }
205
206        $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
207
208        return $count;
209    }
210
211    /**
212     * @see JobQueue::doBatchPush()
213     * @param IJobSpecification[] $jobs
214     * @param int $flags
215     * @throws DBError|Exception
216     * @return void
217     */
218    protected function doBatchPush( array $jobs, $flags ) {
219        $dbw = $this->getPrimaryDB();
220        /** @noinspection PhpUnusedLocalVariableInspection */
221        $scope = $this->getScopedNoTrxFlag( $dbw );
222        // In general, there will be two cases here:
223        // a) sqlite; DB connection is probably a regular round-aware handle.
224        // If the connection is busy with a transaction, then defer the job writes
225        // until right before the main round commit step. Any errors that bubble
226        // up will rollback the main commit round.
227        // b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
228        // No transaction is active nor will be started by writes, so enqueue the jobs
229        // now so that any errors will show up immediately as the interface expects. Any
230        // errors that bubble up will rollback the main commit round.
231        $fname = __METHOD__;
232        $dbw->onTransactionPreCommitOrIdle(
233            function ( IDatabase $dbw ) use ( $jobs, $flags, $fname ) {
234                $this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
235            },
236            $fname
237        );
238    }
239
240    /**
241     * This function should *not* be called outside of JobQueueDB
242     *
243     * @suppress SecurityCheck-SQLInjection Bug in phan-taint-check handling bulk inserts
244     * @param IDatabase $dbw
245     * @param IJobSpecification[] $jobs
246     * @param int $flags
247     * @param string $method
248     * @throws DBError
249     * @return void
250     */
251    public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
252        if ( $jobs === [] ) {
253            return;
254        }
255
256        $rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
257        $rowList = []; // list of jobs for jobs that are not de-duplicated
258        foreach ( $jobs as $job ) {
259            $row = $this->insertFields( $job, $dbw );
260            if ( $job->ignoreDuplicates() ) {
261                $rowSet[$row['job_sha1']] = $row;
262            } else {
263                $rowList[] = $row;
264            }
265        }
266
267        if ( $flags & self::QOS_ATOMIC ) {
268            $dbw->startAtomic( $method ); // wrap all the job additions in one transaction
269        }
270        try {
271            // Strip out any duplicate jobs that are already in the queue...
272            if ( count( $rowSet ) ) {
273                $res = $dbw->newSelectQueryBuilder()
274                    ->select( 'job_sha1' )
275                    ->from( 'job' )
276                    ->where(
277                        [
278                            // No job_type condition since it's part of the job_sha1 hash
279                            'job_sha1' => array_map( 'strval', array_keys( $rowSet ) ),
280                            'job_token' => '' // unclaimed
281                        ]
282                    )
283                    ->caller( $method )->fetchResultSet();
284                foreach ( $res as $row ) {
285                    wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
286                    unset( $rowSet[$row->job_sha1] ); // already enqueued
287                }
288            }
289            // Build the full list of job rows to insert
290            $rows = array_merge( $rowList, array_values( $rowSet ) );
291            // Insert the job rows in chunks to avoid replica DB lag...
292            foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
293                $dbw->newInsertQueryBuilder()
294                    ->insertInto( 'job' )
295                    ->rows( $rowBatch )
296                    ->caller( $method )->execute();
297            }
298            $this->incrStats( 'inserts', $this->type, count( $rows ) );
299            $this->incrStats( 'dupe_inserts', $this->type,
300                count( $rowSet ) + count( $rowList ) - count( $rows )
301            );
302        } catch ( DBError $e ) {
303            throw $this->getDBException( $e );
304        }
305        if ( $flags & self::QOS_ATOMIC ) {
306            $dbw->endAtomic( $method );
307        }
308    }
309
310    /**
311     * @see JobQueue::doPop()
312     * @return RunnableJob|false
313     */
314    protected function doPop() {
315        $dbw = $this->getPrimaryDB();
316        /** @noinspection PhpUnusedLocalVariableInspection */
317        $scope = $this->getScopedNoTrxFlag( $dbw );
318
319        $job = false; // job popped off
320        try {
321            $uuid = wfRandomString( 32 ); // pop attempt
322            do { // retry when our row is invalid or deleted as a duplicate
323                // Try to reserve a row in the DB...
324                if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
325                    $row = $this->claimOldest( $uuid );
326                } else { // random first
327                    $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
328                    $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
329                    $row = $this->claimRandom( $uuid, $rand, $gte );
330                }
331                // Check if we found a row to reserve...
332                if ( !$row ) {
333                    break; // nothing to do
334                }
335                $this->incrStats( 'pops', $this->type );
336
337                // Get the job object from the row...
338                $job = $this->jobFromRow( $row );
339                break; // done
340            } while ( true );
341
342            if ( !$job || mt_rand( 0, 9 ) == 0 ) {
343                // Handled jobs that need to be recycled/deleted;
344                // any recycled jobs will be picked up next attempt
345                $this->recycleAndDeleteStaleJobs();
346            }
347        } catch ( DBError $e ) {
348            throw $this->getDBException( $e );
349        }
350
351        return $job;
352    }
353
354    /**
355     * Reserve a row with a single UPDATE without holding row locks over RTTs...
356     *
357     * @param string $uuid 32 char hex string
358     * @param int $rand Random unsigned integer (31 bits)
359     * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
360     * @return stdClass|false Row|false
361     */
362    protected function claimRandom( $uuid, $rand, $gte ) {
363        $dbw = $this->getPrimaryDB();
364        /** @noinspection PhpUnusedLocalVariableInspection */
365        $scope = $this->getScopedNoTrxFlag( $dbw );
366        // Check cache to see if the queue has <= OFFSET items
367        $tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) );
368
369        $invertedDirection = false; // whether one job_random direction was already scanned
370        // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
371        // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
372        // not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
373        // be used here with MySQL.
374        do {
375            if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
376                // For small queues, using OFFSET will overshoot and return no rows more often.
377                // Instead, this uses job_random to pick a row (possibly checking both directions).
378                $row = $dbw->newSelectQueryBuilder()
379                    ->select( self::selectFields() )
380                    ->from( 'job' )
381                    ->where(
382                        [
383                            'job_cmd' => $this->type,
384                            'job_token' => '', // unclaimed
385                            $dbw->expr( 'job_random', $gte ? '>=' : '<=', $rand )
386                        ]
387                    )
388                    ->orderBy(
389                        'job_random',
390                        $gte ? SelectQueryBuilder::SORT_ASC : SelectQueryBuilder::SORT_DESC
391                    )
392                    ->caller( __METHOD__ )->fetchRow();
393                if ( !$row && !$invertedDirection ) {
394                    $gte = !$gte;
395                    $invertedDirection = true;
396                    continue; // try the other direction
397                }
398            } else { // table *may* have >= MAX_OFFSET rows
399                // T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
400                // in MySQL if there are many rows for some reason. This uses a small OFFSET
401                // instead of job_random for reducing excess claim retries.
402                $row = $dbw->newSelectQueryBuilder()
403                    ->select( self::selectFields() )
404                    ->from( 'job' )
405                    ->where(
406                        [
407                            'job_cmd' => $this->type,
408                            'job_token' => '', // unclaimed
409                        ]
410                    )
411                    ->offset( mt_rand( 0, self::MAX_OFFSET ) )
412                    ->caller( __METHOD__ )->fetchRow();
413                if ( !$row ) {
414                    $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
415                    $this->wanCache->set( $this->getCacheKey( 'small' ), 1, 30 );
416                    continue; // use job_random
417                }
418            }
419
420            if ( !$row ) {
421                break;
422            }
423
424            $dbw->newUpdateQueryBuilder()
425                ->update( 'job' ) // update by PK
426                ->set( [
427                    'job_token' => $uuid,
428                    'job_token_timestamp' => $dbw->timestamp(),
429                    'job_attempts' => new RawSQLValue( 'job_attempts+1' ),
430                ] )
431                ->where( [
432                    'job_cmd' => $this->type,
433                    'job_id' => $row->job_id,
434                    'job_token' => ''
435                ] )
436                ->caller( __METHOD__ )->execute();
437
438            // This might get raced out by another runner when claiming the previously
439            // selected row. The use of job_random should minimize this problem, however.
440            if ( !$dbw->affectedRows() ) {
441                $row = false; // raced out
442            }
443        } while ( !$row );
444
445        return $row;
446    }
447
448    /**
449     * Reserve a row with a single UPDATE without holding row locks over RTTs...
450     *
451     * @param string $uuid 32 char hex string
452     * @return stdClass|false Row|false
453     */
454    protected function claimOldest( $uuid ) {
455        $dbw = $this->getPrimaryDB();
456        /** @noinspection PhpUnusedLocalVariableInspection */
457        $scope = $this->getScopedNoTrxFlag( $dbw );
458
459        $row = false; // the row acquired
460        do {
461            if ( $dbw->getType() === 'mysql' ) {
462                // Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
463                // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
464                // Postgres has no such limitation. However, MySQL offers an
465                // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
466                $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
467                    "SET " .
468                        "job_token = {$dbw->addQuotes( $uuid ) }" .
469                        "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}" .
470                        "job_attempts = job_attempts+1 " .
471                    "WHERE ( " .
472                        "job_cmd = {$dbw->addQuotes( $this->type )} " .
473                        "AND job_token = {$dbw->addQuotes( '' )} " .
474                    ") ORDER BY job_id ASC LIMIT 1",
475                    __METHOD__
476                );
477            } else {
478                // Use a subquery to find the job, within an UPDATE to claim it.
479                // This uses as much of the DB wrapper functions as possible.
480                $qb = $dbw->newSelectQueryBuilder()
481                    ->select( 'job_id' )
482                    ->from( 'job' )
483                    ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
484                    ->orderBy( 'job_id', SelectQueryBuilder::SORT_ASC )
485                    ->limit( 1 );
486
487                $dbw->newUpdateQueryBuilder()
488                    ->update( 'job' )
489                    ->set( [
490                        'job_token' => $uuid,
491                        'job_token_timestamp' => $dbw->timestamp(),
492                        'job_attempts' => new RawSQLValue( 'job_attempts+1' ),
493                    ] )
494                    ->where( [ 'job_id' => new RawSQLValue( '(' . $qb->getSQL() . ')' ) ] )
495                    ->caller( __METHOD__ )->execute();
496            }
497
498            if ( !$dbw->affectedRows() ) {
499                break;
500            }
501
502            // Fetch any row that we just reserved...
503            $row = $dbw->newSelectQueryBuilder()
504                ->select( self::selectFields() )
505                ->from( 'job' )
506                ->where( [ 'job_cmd' => $this->type, 'job_token' => $uuid ] )
507                ->caller( __METHOD__ )->fetchRow();
508            if ( !$row ) { // raced out by duplicate job removal
509                wfDebug( "Row deleted as duplicate by another process." );
510            }
511        } while ( !$row );
512
513        return $row;
514    }
515
516    /**
517     * @see JobQueue::doAck()
518     * @param RunnableJob $job
519     * @throws JobQueueConnectionError
520     * @throws JobQueueError
521     */
522    protected function doAck( RunnableJob $job ) {
523        $id = $job->getMetadata( 'id' );
524        if ( $id === null ) {
525            throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no ID." );
526        }
527
528        $dbw = $this->getPrimaryDB();
529        /** @noinspection PhpUnusedLocalVariableInspection */
530        $scope = $this->getScopedNoTrxFlag( $dbw );
531        try {
532            // Delete a row with a single DELETE without holding row locks over RTTs...
533            $dbw->newDeleteQueryBuilder()
534                ->deleteFrom( 'job' )
535                ->where( [ 'job_cmd' => $this->type, 'job_id' => $id ] )
536                ->caller( __METHOD__ )->execute();
537
538            $this->incrStats( 'acks', $this->type );
539        } catch ( DBError $e ) {
540            throw $this->getDBException( $e );
541        }
542    }
543
544    /**
545     * @see JobQueue::doDeduplicateRootJob()
546     * @param IJobSpecification $job
547     * @throws JobQueueConnectionError
548     * @return bool
549     */
550    protected function doDeduplicateRootJob( IJobSpecification $job ) {
551        // Callers should call JobQueueGroup::push() before this method so that if the
552        // insert fails, the de-duplication registration will be aborted. Since the insert
553        // is deferred till "transaction idle", do the same here, so that the ordering is
554        // maintained. Having only the de-duplication registration succeed would cause
555        // jobs to become no-ops without any actual jobs that made them redundant.
556        $dbw = $this->getPrimaryDB();
557        /** @noinspection PhpUnusedLocalVariableInspection */
558        $scope = $this->getScopedNoTrxFlag( $dbw );
559        $dbw->onTransactionCommitOrIdle(
560            function () use ( $job ) {
561                parent::doDeduplicateRootJob( $job );
562            },
563            __METHOD__
564        );
565
566        return true;
567    }
568
569    /**
570     * @see JobQueue::doDelete()
571     * @return bool
572     */
573    protected function doDelete() {
574        $dbw = $this->getPrimaryDB();
575        /** @noinspection PhpUnusedLocalVariableInspection */
576        $scope = $this->getScopedNoTrxFlag( $dbw );
577        try {
578            $dbw->newDeleteQueryBuilder()
579                ->deleteFrom( 'job' )
580                ->where( [ 'job_cmd' => $this->type ] )
581                ->caller( __METHOD__ )->execute();
582        } catch ( DBError $e ) {
583            throw $this->getDBException( $e );
584        }
585
586        return true;
587    }
588
589    /**
590     * @see JobQueue::doWaitForBackups()
591     * @return void
592     */
593    protected function doWaitForBackups() {
594        if ( $this->server ) {
595            return; // not using LBFactory instance
596        }
597
598        $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
599        $lbFactory->waitForReplication();
600    }
601
602    /**
603     * @return void
604     */
605    protected function doFlushCaches() {
606        foreach ( [ 'size', 'acquiredcount' ] as $type ) {
607            $this->wanCache->delete( $this->getCacheKey( $type ) );
608        }
609    }
610
611    /**
612     * @see JobQueue::getAllQueuedJobs()
613     * @return Iterator<RunnableJob>
614     */
615    public function getAllQueuedJobs() {
616        return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] );
617    }
618
619    /**
620     * @see JobQueue::getAllAcquiredJobs()
621     * @return Iterator<RunnableJob>
622     */
623    public function getAllAcquiredJobs() {
624        return $this->getJobIterator( [ 'job_cmd' => $this->getType(), "job_token > ''" ] );
625    }
626
627    /**
628     * @see JobQueue::getAllAbandonedJobs()
629     * @return Iterator<RunnableJob>
630     */
631    public function getAllAbandonedJobs() {
632        return $this->getJobIterator( [
633            'job_cmd' => $this->getType(),
634            "job_token > ''",
635            "job_attempts >= " . intval( $this->maxTries )
636        ] );
637    }
638
639    /**
640     * @param array $conds Query conditions
641     * @return Iterator<RunnableJob>
642     */
643    protected function getJobIterator( array $conds ) {
644        $dbr = $this->getReplicaDB();
645        /** @noinspection PhpUnusedLocalVariableInspection */
646        $scope = $this->getScopedNoTrxFlag( $dbr );
647        $qb = $dbr->newSelectQueryBuilder()
648            ->select( self::selectFields() )
649            ->from( 'job' )
650            ->where( $conds );
651        try {
652            return new MappedIterator(
653                $qb->caller( __METHOD__ )->fetchResultSet(),
654                function ( $row ) {
655                    return $this->jobFromRow( $row );
656                }
657            );
658        } catch ( DBError $e ) {
659            throw $this->getDBException( $e );
660        }
661    }
662
663    public function getCoalesceLocationInternal() {
664        if ( $this->server ) {
665            return null; // not using the LBFactory instance
666        }
667
668        return is_string( $this->cluster )
669            ? "DBCluster:{$this->cluster}:{$this->domain}"
670            : "LBFactory:{$this->domain}";
671    }
672
673    protected function doGetSiblingQueuesWithJobs( array $types ) {
674        $dbr = $this->getReplicaDB();
675        /** @noinspection PhpUnusedLocalVariableInspection */
676        $scope = $this->getScopedNoTrxFlag( $dbr );
677        // @note: this does not check whether the jobs are claimed or not.
678        // This is useful so JobQueueGroup::pop() also sees queues that only
679        // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
680        // failed jobs so that they can be popped again for that edge case.
681        $res = $dbr->newSelectQueryBuilder()
682            ->select( 'job_cmd' )
683            ->distinct()
684            ->from( 'job' )
685            ->where( [ 'job_cmd' => $types ] )
686            ->caller( __METHOD__ )->fetchResultSet();
687
688        $types = [];
689        foreach ( $res as $row ) {
690            $types[] = $row->job_cmd;
691        }
692
693        return $types;
694    }
695
696    protected function doGetSiblingQueueSizes( array $types ) {
697        $dbr = $this->getReplicaDB();
698        /** @noinspection PhpUnusedLocalVariableInspection */
699        $scope = $this->getScopedNoTrxFlag( $dbr );
700
701        $res = $dbr->newSelectQueryBuilder()
702            ->select( [ 'job_cmd', 'count' => 'COUNT(*)' ] )
703            ->from( 'job' )
704            ->where( [ 'job_cmd' => $types ] )
705            ->groupBy( 'job_cmd' )
706            ->caller( __METHOD__ )->fetchResultSet();
707
708        $sizes = [];
709        foreach ( $res as $row ) {
710            $sizes[$row->job_cmd] = (int)$row->count;
711        }
712
713        return $sizes;
714    }
715
716    /**
717     * Recycle or destroy any jobs that have been claimed for too long
718     *
719     * @return int Number of jobs recycled/deleted
720     */
721    public function recycleAndDeleteStaleJobs() {
722        $now = time();
723        $count = 0; // affected rows
724        $dbw = $this->getPrimaryDB();
725        /** @noinspection PhpUnusedLocalVariableInspection */
726        $scope = $this->getScopedNoTrxFlag( $dbw );
727
728        try {
729            if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
730                return $count; // already in progress
731            }
732
733            // Remove claims on jobs acquired for too long if enabled...
734            if ( $this->claimTTL > 0 ) {
735                $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
736                // Get the IDs of jobs that have be claimed but not finished after too long.
737                // These jobs can be recycled into the queue by expiring the claim. Selecting
738                // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
739                $res = $dbw->newSelectQueryBuilder()
740                    ->select( 'job_id' )
741                    ->from( 'job' )
742                    ->where(
743                        [
744                            'job_cmd' => $this->type,
745                            $dbw->expr( 'job_token', '!=', '' ), // was acquired
746                            $dbw->expr( 'job_token_timestamp', '<', $claimCutoff ), // stale
747                            $dbw->expr( 'job_attempts', '<', $this->maxTries ), // retries left
748                        ]
749                    )
750                    ->caller( __METHOD__ )->fetchResultSet();
751                $ids = array_map(
752                    static function ( $o ) {
753                        return $o->job_id;
754                    }, iterator_to_array( $res )
755                );
756                if ( count( $ids ) ) {
757                    // Reset job_token for these jobs so that other runners will pick them up.
758                    // Set the timestamp to the current time, as it is useful to now that the job
759                    // was already tried before (the timestamp becomes the "released" time).
760                    $dbw->newUpdateQueryBuilder()
761                        ->update( 'job' )
762                        ->set( [
763                            'job_token' => '',
764                            'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
765                        ] )
766                        ->where( [
767                            'job_id' => $ids,
768                            $dbw->expr( 'job_token', '!=', '' ),
769                        ] )
770                        ->caller( __METHOD__ )->execute();
771
772                    $affected = $dbw->affectedRows();
773                    $count += $affected;
774                    $this->incrStats( 'recycles', $this->type, $affected );
775                }
776            }
777
778            // Just destroy any stale jobs...
779            $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
780            $qb = $dbw->newSelectQueryBuilder()
781                ->select( 'job_id' )
782                ->from( 'job' )
783                ->where(
784                    [
785                        'job_cmd' => $this->type,
786                        $dbw->expr( 'job_token', '!=', '' ), // was acquired
787                        $dbw->expr( 'job_token_timestamp', '<', $pruneCutoff ) // stale
788                    ]
789                );
790            if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
791                $qb->andWhere( $dbw->expr( 'job_attempts', '>=', $this->maxTries ) );
792            }
793            // Get the IDs of jobs that are considered stale and should be removed. Selecting
794            // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
795            $res = $qb->caller( __METHOD__ )->fetchResultSet();
796            $ids = array_map(
797                static function ( $o ) {
798                    return $o->job_id;
799                }, iterator_to_array( $res )
800            );
801            if ( count( $ids ) ) {
802                $dbw->newDeleteQueryBuilder()
803                    ->deleteFrom( 'job' )
804                    ->where( [ 'job_id' => $ids ] )
805                    ->caller( __METHOD__ )->execute();
806                $affected = $dbw->affectedRows();
807                $count += $affected;
808                $this->incrStats( 'abandons', $this->type, $affected );
809            }
810
811            $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
812        } catch ( DBError $e ) {
813            throw $this->getDBException( $e );
814        }
815
816        return $count;
817    }
818
819    /**
820     * @param IJobSpecification $job
821     * @param IReadableDatabase $db
822     * @return array
823     */
824    protected function insertFields( IJobSpecification $job, IReadableDatabase $db ) {
825        return [
826            // Fields that describe the nature of the job
827            'job_cmd' => $job->getType(),
828            'job_namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL,
829            'job_title' => $job->getParams()['title'] ?? '',
830            'job_params' => self::makeBlob( $job->getParams() ),
831            // Additional job metadata
832            'job_timestamp' => $db->timestamp(),
833            'job_sha1' => Wikimedia\base_convert(
834                sha1( serialize( $job->getDeduplicationInfo() ) ),
835                16, 36, 31
836            ),
837            'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
838        ];
839    }
840
841    /**
842     * @throws JobQueueConnectionError
843     * @return IDatabase
844     */
845    protected function getReplicaDB() {
846        try {
847            return $this->getDB( DB_REPLICA );
848        } catch ( DBConnectionError $e ) {
849            throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
850        }
851    }
852
853    /**
854     * @throws JobQueueConnectionError
855     * @return IMaintainableDatabase
856     * @since 1.37
857     */
858    protected function getPrimaryDB() {
859        try {
860            return $this->getDB( DB_PRIMARY );
861        } catch ( DBConnectionError $e ) {
862            throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
863        }
864    }
865
866    /**
867     * @param int $index (DB_REPLICA/DB_PRIMARY)
868     * @return IMaintainableDatabase
869     */
870    protected function getDB( $index ) {
871        if ( $this->server ) {
872            if ( $this->conn instanceof IDatabase ) {
873                return $this->conn;
874            } elseif ( $this->conn instanceof DBError ) {
875                throw $this->conn;
876            }
877
878            try {
879                $this->conn = MediaWikiServices::getInstance()->getDatabaseFactory()->create(
880                    $this->server['type'],
881                    $this->server
882                );
883            } catch ( DBError $e ) {
884                $this->conn = $e;
885                throw $e;
886            }
887
888            return $this->conn;
889        } else {
890            $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
891            $lb = is_string( $this->cluster )
892                ? $lbFactory->getExternalLB( $this->cluster )
893                : $lbFactory->getMainLB( $this->domain );
894
895            if ( $lb->getServerType( ServerInfo::WRITER_INDEX ) !== 'sqlite' ) {
896                // Keep a separate connection to avoid contention and deadlocks;
897                // However, SQLite has the opposite behavior due to DB-level locking.
898                $flags = $lb::CONN_TRX_AUTOCOMMIT;
899            } else {
900                // Jobs insertion will be deferred until the PRESEND stage to reduce contention.
901                $flags = 0;
902            }
903
904            return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
905        }
906    }
907
908    /**
909     * @param IReadableDatabase $db
910     * @return ScopedCallback
911     */
912    private function getScopedNoTrxFlag( IReadableDatabase $db ) {
913        $autoTrx = $db->getFlag( DBO_TRX ); // get current setting
914        $db->clearFlag( DBO_TRX ); // make each query its own transaction
915
916        return new ScopedCallback( static function () use ( $db, $autoTrx ) {
917            if ( $autoTrx ) {
918                $db->setFlag( DBO_TRX ); // restore old setting
919            }
920        } );
921    }
922
923    /**
924     * @param string $property
925     * @return string
926     */
927    private function getCacheKey( $property ) {
928        $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
929
930        return $this->wanCache->makeGlobalKey(
931            'jobqueue',
932            $this->domain,
933            $cluster,
934            $this->type,
935            $property
936        );
937    }
938
939    /**
940     * @param array|false $params
941     * @return string
942     */
943    protected static function makeBlob( $params ) {
944        if ( $params !== false ) {
945            return serialize( $params );
946        } else {
947            return '';
948        }
949    }
950
951    /**
952     * @param stdClass $row
953     * @return RunnableJob
954     */
955    protected function jobFromRow( $row ) {
956        $params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : [];
957        if ( !is_array( $params ) ) { // this shouldn't happen
958            throw new UnexpectedValueException(
959                "Could not unserialize job with ID '{$row->job_id}'." );
960        }
961
962        $params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
963        $job = $this->factoryJob( $row->job_cmd, $params );
964        $job->setMetadata( 'id', $row->job_id );
965        $job->setMetadata( 'timestamp', $row->job_timestamp );
966
967        return $job;
968    }
969
970    /**
971     * @param DBError $e
972     * @return JobQueueError
973     */
974    protected function getDBException( DBError $e ) {
975        return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
976    }
977
978    /**
979     * Return the list of job fields that should be selected.
980     * @since 1.23
981     * @return array
982     */
983    public static function selectFields() {
984        return [
985            'job_id',
986            'job_cmd',
987            'job_namespace',
988            'job_title',
989            'job_timestamp',
990            'job_params',
991            'job_random',
992            'job_attempts',
993            'job_token',
994            'job_token_timestamp',
995            'job_sha1',
996        ];
997    }
998}