Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 474 |
|
0.00% |
0 / 35 |
CRAP | |
0.00% |
0 / 1 |
JobQueueDB | |
0.00% |
0 / 474 |
|
0.00% |
0 / 35 |
10302 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
20 | |||
supportedOrders | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
optimalOrder | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doIsEmpty | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
6 | |||
doGetSize | |
0.00% |
0 / 14 |
|
0.00% |
0 / 1 |
12 | |||
doGetAcquiredCount | |
0.00% |
0 / 19 |
|
0.00% |
0 / 1 |
20 | |||
doGetAbandonedCount | |
0.00% |
0 / 22 |
|
0.00% |
0 / 1 |
20 | |||
doBatchPush | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
2 | |||
doBatchPushInternal | |
0.00% |
0 / 39 |
|
0.00% |
0 / 1 |
110 | |||
doPop | |
0.00% |
0 / 19 |
|
0.00% |
0 / 1 |
42 | |||
claimRandom | |
0.00% |
0 / 58 |
|
0.00% |
0 / 1 |
90 | |||
claimOldest | |
0.00% |
0 / 41 |
|
0.00% |
0 / 1 |
20 | |||
doAck | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
12 | |||
doDeduplicateRootJob | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
2 | |||
doDelete | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
6 | |||
doWaitForBackups | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
doFlushCaches | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
getAllQueuedJobs | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getAllAcquiredJobs | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getAllAbandonedJobs | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
2 | |||
getJobIterator | |
0.00% |
0 / 14 |
|
0.00% |
0 / 1 |
6 | |||
getCoalesceLocationInternal | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
12 | |||
doGetSiblingQueuesWithJobs | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
6 | |||
doGetSiblingQueueSizes | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
6 | |||
recycleAndDeleteStaleJobs | |
0.00% |
0 / 71 |
|
0.00% |
0 / 1 |
56 | |||
insertFields | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
2 | |||
getReplicaDB | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
getPrimaryDB | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
getDB | |
0.00% |
0 / 21 |
|
0.00% |
0 / 1 |
56 | |||
getScopedNoTrxFlag | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
6 | |||
getCacheKey | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
6 | |||
makeBlob | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
jobFromRow | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
12 | |||
getDBException | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
selectFields | |
0.00% |
0 / 13 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | /** |
3 | * This program is free software; you can redistribute it and/or modify |
4 | * it under the terms of the GNU General Public License as published by |
5 | * the Free Software Foundation; either version 2 of the License, or |
6 | * (at your option) any later version. |
7 | * |
8 | * This program is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | * GNU General Public License for more details. |
12 | * |
13 | * You should have received a copy of the GNU General Public License along |
14 | * with this program; if not, write to the Free Software Foundation, Inc., |
15 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
16 | * http://www.gnu.org/copyleft/gpl.html |
17 | * |
18 | * @file |
19 | */ |
20 | use MediaWiki\MediaWikiServices; |
21 | use Wikimedia\Rdbms\DBConnectionError; |
22 | use Wikimedia\Rdbms\DBError; |
23 | use Wikimedia\Rdbms\IDatabase; |
24 | use Wikimedia\Rdbms\IMaintainableDatabase; |
25 | use Wikimedia\Rdbms\SelectQueryBuilder; |
26 | use Wikimedia\ScopedCallback; |
27 | |
28 | /** |
29 | * Database-backed job queue storage. |
30 | * |
31 | * @since 1.21 |
32 | * @ingroup JobQueue |
33 | */ |
34 | class JobQueueDB extends JobQueue { |
35 | /* seconds to cache info without re-validating */ |
36 | private const CACHE_TTL_SHORT = 30; |
37 | /* seconds a job can live once claimed */ |
38 | private const MAX_AGE_PRUNE = 7 * 24 * 3600; |
39 | /** |
40 | * Used for job_random, the highest safe 32-bit signed integer. |
41 | * Equivalent to `( 2 ** 31 ) - 1` on 64-bit. |
42 | */ |
43 | private const MAX_JOB_RANDOM = 2_147_483_647; |
44 | /* maximum number of rows to skip */ |
45 | private const MAX_OFFSET = 255; |
46 | |
47 | /** @var IMaintainableDatabase|DBError|null */ |
48 | protected $conn; |
49 | |
50 | /** @var array|null Server configuration array */ |
51 | protected $server; |
52 | /** @var string|null Name of an external DB cluster or null for the local DB cluster */ |
53 | protected $cluster; |
54 | |
/**
 * Set up the queue and remember which database backend it should use.
 *
 * On top of the parameters consumed by the parent constructor, these are read:
 *   - server  : Server configuration array for Database::factory. Overrides "cluster".
 *   - cluster : The name of an external cluster registered via LBFactory.
 *               If not specified, the primary DB cluster for the wiki will be used.
 *               This can be overridden with a custom cluster so that DB handles will
 *               be retrieved via LBFactory::getExternalLB() and getConnection().
 *               Ignored unless it is a string.
 * @param array $params
 */
protected function __construct( array $params ) {
	parent::__construct( $params );

	// An explicit server configuration wins over any cluster name
	if ( isset( $params['server'] ) ) {
		$this->server = $params['server'];

		return;
	}

	$clusterName = $params['cluster'] ?? null;
	if ( is_string( $clusterName ) ) {
		$this->cluster = $clusterName;
	}
}
73 | |
/**
 * List the job ordering modes this queue backend accepts.
 *
 * @return string[]
 */
protected function supportedOrders() {
	$orders = [ 'random', 'timestamp', 'fifo' ];

	return $orders;
}
77 | |
/**
 * Name the ordering mode this backend prefers; always one of supportedOrders().
 *
 * @return string
 */
protected function optimalOrder() {
	$preferred = 'random';

	return $preferred;
}
81 | |
/**
 * Check on a replica whether no unclaimed job of this type exists.
 *
 * @see JobQueue::doIsEmpty()
 * @return bool True when no row with an empty job_token was found
 */
protected function doIsEmpty() {
	$dbr = $this->getReplicaDB();
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = $this->getScopedNoTrxFlag( $dbr );

	// Rows with an empty token are the unclaimed jobs
	$unclaimed = [ 'job_cmd' => $this->type, 'job_token' => '' ];
	try {
		$hit = $dbr->newSelectQueryBuilder()
			->select( '1' )
			->from( 'job' )
			->where( $unclaimed )
			->caller( __METHOD__ )
			->fetchField();
	} catch ( DBError $e ) {
		throw $this->getDBException( $e );
	}

	return !$hit;
}
103 | |
/**
 * Count the unclaimed jobs of this type, serving a cached integer when one exists.
 *
 * @see JobQueue::doGetSize()
 * @return int
 */
protected function doGetSize() {
	$cacheKey = $this->getCacheKey( 'size' );
	$cached = $this->wanCache->get( $cacheKey );
	if ( is_int( $cached ) ) {
		return $cached;
	}

	$dbr = $this->getReplicaDB();
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = $this->getScopedNoTrxFlag( $dbr );
	$unclaimed = [ 'job_cmd' => $this->type, 'job_token' => '' ];
	try {
		$total = $dbr->newSelectQueryBuilder()
			->from( 'job' )
			->where( $unclaimed )
			->caller( __METHOD__ )
			->fetchRowCount();
	} catch ( DBError $e ) {
		throw $this->getDBException( $e );
	}

	// Remember the fresh count for CACHE_TTL_SHORT seconds
	$this->wanCache->set( $cacheKey, $total, self::CACHE_TTL_SHORT );

	return $total;
}
131 | |
/**
 * Count the jobs of this type that currently carry a claim token.
 *
 * Returns 0 without touching the cache or DB when claimTTL is not positive.
 *
 * @see JobQueue::doGetAcquiredCount()
 * @return int
 */
protected function doGetAcquiredCount() {
	if ( $this->claimTTL <= 0 ) {
		// no acknowledgements
		return 0;
	}

	$cacheKey = $this->getCacheKey( 'acquiredcount' );
	$cached = $this->wanCache->get( $cacheKey );
	if ( is_int( $cached ) ) {
		return $cached;
	}

	$dbr = $this->getReplicaDB();
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = $this->getScopedNoTrxFlag( $dbr );
	try {
		$claimed = [
			'job_cmd' => $this->type,
			$dbr->expr( 'job_token', '!=', '' ),
		];
		$total = $dbr->newSelectQueryBuilder()
			->from( 'job' )
			->where( $claimed )
			->caller( __METHOD__ )
			->fetchRowCount();
	} catch ( DBError $e ) {
		throw $this->getDBException( $e );
	}

	$this->wanCache->set( $cacheKey, $total, self::CACHE_TTL_SHORT );

	return $total;
}
166 | |
/**
 * Count the claimed jobs of this type whose attempt counter has reached maxTries.
 *
 * Returns 0 without touching the cache or DB when claimTTL is not positive.
 *
 * @see JobQueue::doGetAbandonedCount()
 * @return int
 * @throws JobQueueConnectionError
 * @throws JobQueueError
 */
protected function doGetAbandonedCount() {
	if ( $this->claimTTL <= 0 ) {
		// no acknowledgements
		return 0;
	}

	$cacheKey = $this->getCacheKey( 'abandonedcount' );
	$cached = $this->wanCache->get( $cacheKey );
	if ( is_int( $cached ) ) {
		return $cached;
	}

	$dbr = $this->getReplicaDB();
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = $this->getScopedNoTrxFlag( $dbr );
	try {
		$abandoned = [
			'job_cmd' => $this->type,
			$dbr->expr( 'job_token', '!=', '' ),
			$dbr->expr( 'job_attempts', '>=', $this->maxTries ),
		];
		$total = $dbr->newSelectQueryBuilder()
			->from( 'job' )
			->where( $abandoned )
			->caller( __METHOD__ )
			->fetchRowCount();
	} catch ( DBError $e ) {
		throw $this->getDBException( $e );
	}

	$this->wanCache->set( $cacheKey, $total, self::CACHE_TTL_SHORT );

	return $total;
}
207 | |
/**
 * Schedule the insertion of a batch of jobs on the primary DB handle.
 *
 * The actual work is done by doBatchPushInternal(), registered as a
 * pre-commit-or-idle callback on the connection.
 *
 * @see JobQueue::doBatchPush()
 * @param IJobSpecification[] $jobs
 * @param int $flags
 * @throws DBError|Exception
 * @return void
 */
protected function doBatchPush( array $jobs, $flags ) {
	$dbw = $this->getPrimaryDB();
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = $this->getScopedNoTrxFlag( $dbw );

	// In general, there will be two cases here:
	// a) sqlite; DB connection is probably a regular round-aware handle.
	//    If the connection is busy with a transaction, then defer the job writes
	//    until right before the main round commit step. Any errors that bubble
	//    up will rollback the main commit round.
	// b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
	//    No transaction is active nor will be started by writes, so enqueue the jobs
	//    now so that any errors will show up immediately as the interface expects. Any
	//    errors that bubble up will rollback the main commit round.

	// Captured up front: inside the closure __METHOD__ would not name this method
	$fname = __METHOD__;
	$pushJobs = function ( IDatabase $dbw ) use ( $jobs, $flags, $fname ) {
		$this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
	};

	$dbw->onTransactionPreCommitOrIdle( $pushJobs, $fname );
}
236 | |
/**
 * Insert a batch of jobs into the job table, skipping de-duplicable jobs
 * that are already enqueued and unclaimed.
 *
 * This function should *not* be called outside of JobQueueDB
 *
 * Jobs that report ignoreDuplicates() are keyed by their job_sha1, so repeated
 * hashes within the batch collapse to a single row, and rows whose hash already
 * exists unclaimed in the table are dropped before inserting. The "dupe_inserts"
 * stat is incremented by the number of jobs that were skipped for either reason.
 *
 * @suppress SecurityCheck-SQLInjection Bug in phan-taint-check handling bulk inserts
 * @param IDatabase $dbw
 * @param IJobSpecification[] $jobs
 * @param int $flags Bitfield; with QOS_ATOMIC all inserts share one atomic section
 * @param string $method Caller name used for the queries and the atomic section
 * @throws DBError
 * @return void
 */
public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
	if ( $jobs === [] ) {
		return;
	}

	$rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
	$rowList = []; // list of jobs for jobs that are not de-duplicated
	foreach ( $jobs as $job ) {
		$row = $this->insertFields( $job, $dbw );
		if ( $job->ignoreDuplicates() ) {
			$rowSet[$row['job_sha1']] = $row;
		} else {
			$rowList[] = $row;
		}
	}

	if ( $flags & self::QOS_ATOMIC ) {
		$dbw->startAtomic( $method ); // wrap all the job additions in one transaction
	}
	try {
		// Strip out any duplicate jobs that are already in the queue...
		if ( count( $rowSet ) ) {
			$res = $dbw->newSelectQueryBuilder()
				->select( 'job_sha1' )
				->from( 'job' )
				->where(
					[
						// No job_type condition since it's part of the job_sha1 hash
						'job_sha1' => array_map( 'strval', array_keys( $rowSet ) ),
						'job_token' => '' // unclaimed
					]
				)
				->caller( $method )->fetchResultSet();
			foreach ( $res as $row ) {
				wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
				unset( $rowSet[$row->job_sha1] ); // already enqueued
			}
		}
		// Build the full list of job rows to insert
		$rows = array_merge( $rowList, array_values( $rowSet ) );
		// Insert the job rows in chunks to avoid replica DB lag...
		foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
			$dbw->newInsertQueryBuilder()
				->insertInto( 'job' )
				->rows( $rowBatch )
				->caller( $method )->execute();
		}
		$this->incrStats( 'inserts', $this->type, count( $rows ) );
		// Every job that did not end up as a row was dropped as a duplicate. The old
		// expression compared $rows against the arrays it was just built from, so it
		// always evaluated to zero.
		$this->incrStats( 'dupe_inserts', $this->type, count( $jobs ) - count( $rows ) );
	} catch ( DBError $e ) {
		throw $this->getDBException( $e );
	}
	if ( $flags & self::QOS_ATOMIC ) {
		$dbw->endAtomic( $method );
	}
}
306 | |
/**
 * Claim one job from the queue and return it as a job object.
 *
 * When nothing could be claimed, and otherwise on roughly one call in ten,
 * recycleAndDeleteStaleJobs() is also run.
 *
 * @see JobQueue::doPop()
 * @return RunnableJob|false False when no row could be claimed
 */
protected function doPop() {
	$dbw = $this->getPrimaryDB();
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = $this->getScopedNoTrxFlag( $dbw );

	$job = false; // job popped off
	try {
		$uuid = wfRandomString( 32 ); // pop attempt

		// Try to reserve a row in the DB...
		if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
			$row = $this->claimOldest( $uuid );
		} else {
			// random first
			$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
			$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
			$row = $this->claimRandom( $uuid, $rand, $gte );
		}

		// Turn the reserved row (if any) into a job object...
		if ( $row ) {
			$this->incrStats( 'pops', $this->type );
			$job = $this->jobFromRow( $row );
		}

		if ( !$job || mt_rand( 0, 9 ) == 0 ) {
			// Handled jobs that need to be recycled/deleted;
			// any recycled jobs will be picked up next attempt
			$this->recycleAndDeleteStaleJobs();
		}
	} catch ( DBError $e ) {
		throw $this->getDBException( $e );
	}

	return $job;
}
350 | |
/**
 * Reserve a row with a single UPDATE without holding row locks over RTTs...
 *
 * A candidate row is first SELECTed and then claimed with an UPDATE that is
 * conditioned on the row still being unclaimed. If the UPDATE affects no rows,
 * another candidate is selected and the attempt is repeated.
 *
 * @param string $uuid 32 char hex string
 * @param int $rand Random unsigned integer (31 bits)
 * @param bool $gte Search for job_random >= $rand (otherwise job_random <= $rand)
 * @return stdClass|false Row|false
 */
protected function claimRandom( $uuid, $rand, $gte ) {
	$dbw = $this->getPrimaryDB();
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = $this->getScopedNoTrxFlag( $dbw );
	// Check cache to see if the queue is already known to have <= MAX_OFFSET items
	$smallKey = $this->getCacheKey( 'small' );
	$tinyQueue = $this->wanCache->get( $smallKey );

	$invertedDirection = false; // whether one job_random direction was already scanned
	// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
	// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
	// not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
	// be used here with MySQL.
	do {
		if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
			// For small queues, using OFFSET will overshoot and return no rows more often.
			// Instead, this uses job_random to pick a row (possibly checking both directions).
			$row = $dbw->newSelectQueryBuilder()
				->select( self::selectFields() )
				->from( 'job' )
				->where(
					[
						'job_cmd' => $this->type,
						'job_token' => '', // unclaimed
						$dbw->expr( 'job_random', $gte ? '>=' : '<=', $rand )
					]
				)
				->orderBy(
					'job_random',
					$gte ? SelectQueryBuilder::SORT_ASC : SelectQueryBuilder::SORT_DESC
				)
				->caller( __METHOD__ )->fetchRow();
			if ( !$row && !$invertedDirection ) {
				$gte = !$gte;
				$invertedDirection = true;
				continue; // try the other direction
			}
		} else { // table *may* have more than MAX_OFFSET rows
			// T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
			// in MySQL if there are many rows for some reason. This uses a small OFFSET
			// instead of job_random for reducing excess claim retries.
			$row = $dbw->newSelectQueryBuilder()
				->select( self::selectFields() )
				->from( 'job' )
				->where(
					[
						'job_cmd' => $this->type,
						'job_token' => '', // unclaimed
					]
				)
				->offset( mt_rand( 0, self::MAX_OFFSET ) )
				->caller( __METHOD__ )->fetchRow();
			if ( !$row ) {
				$tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
				// Same short TTL as the other cached queue info
				$this->wanCache->set( $smallKey, 1, self::CACHE_TTL_SHORT );
				continue; // use job_random
			}
		}

		if ( !$row ) {
			break; // both job_random directions came back empty
		}

		$dbw->newUpdateQueryBuilder()
			->update( 'job' ) // update by PK
			->set( [
				'job_token' => $uuid,
				'job_token_timestamp' => $dbw->timestamp(),
				'job_attempts = job_attempts+1'
			] )
			->where( [
				'job_cmd' => $this->type,
				'job_id' => $row->job_id,
				'job_token' => ''
			] )
			->caller( __METHOD__ )->execute();

		// This might get raced out by another runner when claiming the previously
		// selected row. The use of job_random should minimize this problem, however.
		if ( !$dbw->affectedRows() ) {
			$row = false; // raced out
		}
	} while ( !$row );

	return $row;
}
444 | |
/**
 * Reserve the unclaimed row with the lowest job_id using a single UPDATE,
 * without holding row locks over RTTs, then read the reserved row back.
 *
 * @param string $uuid 32 char hex string
 * @return stdClass|false Row|false
 */
protected function claimOldest( $uuid ) {
	$dbw = $this->getPrimaryDB();
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = $this->getScopedNoTrxFlag( $dbw );

	$row = false; // the row acquired
	do {
		if ( $dbw->getType() === 'mysql' ) {
			// Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
			// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
			// Postgres has no such limitation. However, MySQL offers an
			// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
			$jobTable = $dbw->tableName( 'job' );
			$quotedToken = $dbw->addQuotes( $uuid );
			$quotedNow = $dbw->addQuotes( $dbw->timestamp() );
			$quotedType = $dbw->addQuotes( $this->type );
			$quotedEmpty = $dbw->addQuotes( '' );
			$sql = "UPDATE {$jobTable} SET " .
				"job_token = {$quotedToken}, " .
				"job_token_timestamp = {$quotedNow}, " .
				"job_attempts = job_attempts+1 " .
				"WHERE ( " .
				"job_cmd = {$quotedType} " .
				"AND job_token = {$quotedEmpty} " .
				") ORDER BY job_id ASC LIMIT 1";
			$dbw->query( $sql, __METHOD__ );
		} else {
			// Use a subquery to find the job, within an UPDATE to claim it.
			// This uses as much of the DB wrapper functions as possible.
			$oldestIdQuery = $dbw->newSelectQueryBuilder()
				->select( 'job_id' )
				->from( 'job' )
				->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
				->orderBy( 'job_id', SelectQueryBuilder::SORT_ASC )
				->limit( 1 );

			$dbw->newUpdateQueryBuilder()
				->update( 'job' )
				->set( [
					'job_token' => $uuid,
					'job_token_timestamp' => $dbw->timestamp(),
					'job_attempts = job_attempts+1'
				] )
				->where( [ 'job_id = (' . $oldestIdQuery->getSQL() . ')' ] )
				->caller( __METHOD__ )->execute();
		}

		if ( !$dbw->affectedRows() ) {
			// Nothing was left to claim
			break;
		}

		// Fetch any row that we just reserved...
		$row = $dbw->newSelectQueryBuilder()
			->select( self::selectFields() )
			->from( 'job' )
			->where( [ 'job_cmd' => $this->type, 'job_token' => $uuid ] )
			->caller( __METHOD__ )->fetchRow();
		if ( !$row ) { // raced out by duplicate job removal
			wfDebug( "Row deleted as duplicate by another process." );
		}
	} while ( !$row );

	return $row;
}
512 | |
/**
 * Acknowledge a finished job by deleting its row from the job table.
 *
 * @see JobQueue::doAck()
 * @param RunnableJob $job Must carry an 'id' metadata value
 * @throws UnexpectedValueException When the job has no 'id' metadata
 * @throws JobQueueConnectionError
 * @throws JobQueueError
 */
protected function doAck( RunnableJob $job ) {
	$jobId = $job->getMetadata( 'id' );
	if ( $jobId === null ) {
		throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no ID." );
	}

	$dbw = $this->getPrimaryDB();
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = $this->getScopedNoTrxFlag( $dbw );
	try {
		// Delete a row with a single DELETE without holding row locks over RTTs...
		$target = [ 'job_cmd' => $this->type, 'job_id' => $jobId ];
		$dbw->newDeleteQueryBuilder()
			->deleteFrom( 'job' )
			->where( $target )
			->caller( __METHOD__ )
			->execute();

		$this->incrStats( 'acks', $this->type );
	} catch ( DBError $e ) {
		throw $this->getDBException( $e );
	}
}
540 | |
/**
 * Defer the parent's root job de-duplication registration to a
 * commit-or-idle callback on the primary DB handle.
 *
 * @see JobQueue::doDeduplicateRootJob()
 * @param IJobSpecification $job
 * @throws JobQueueConnectionError
 * @return bool Always true; the registration itself runs in the callback
 */
protected function doDeduplicateRootJob( IJobSpecification $job ) {
	// Callers should call JobQueueGroup::push() before this method so that if the
	// insert fails, the de-duplication registration will be aborted. Since the insert
	// is deferred till "transaction idle", do the same here, so that the ordering is
	// maintained. Having only the de-duplication registration succeed would cause
	// jobs to become no-ops without any actual jobs that made them redundant.
	$dbw = $this->getPrimaryDB();
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = $this->getScopedNoTrxFlag( $dbw );

	$register = function () use ( $job ) {
		parent::doDeduplicateRootJob( $job );
	};
	$dbw->onTransactionCommitOrIdle( $register, __METHOD__ );

	return true;
}
565 | |
/**
 * Delete every row of this job type from the job table, claimed or not.
 *
 * @see JobQueue::doDelete()
 * @return bool Always true when no DB error occurred
 */
protected function doDelete() {
	$dbw = $this->getPrimaryDB();
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = $this->getScopedNoTrxFlag( $dbw );

	$ofThisType = [ 'job_cmd' => $this->type ];
	try {
		$dbw->newDeleteQueryBuilder()
			->deleteFrom( 'job' )
			->where( $ofThisType )
			->caller( __METHOD__ )
			->execute();
	} catch ( DBError $e ) {
		throw $this->getDBException( $e );
	}

	return true;
}
585 | |
/**
 * Wait for replication via the LBFactory service, unless this queue uses
 * its own "server" configuration.
 *
 * @see JobQueue::doWaitForBackups()
 * @return void
 */
protected function doWaitForBackups() {
	if ( $this->server ) {
		// not using LBFactory instance
		return;
	}

	MediaWikiServices::getInstance()
		->getDBLoadBalancerFactory()
		->waitForReplication();
}
598 | |
/**
 * Drop the cached queue counters so the next read recomputes them from the DB.
 *
 * This covers every counter key that the doGet*Count()/doGetSize() methods
 * populate: 'size', 'acquiredcount' and 'abandonedcount'.
 *
 * @return void
 */
protected function doFlushCaches() {
	// 'abandonedcount' is cached by doGetAbandonedCount() just like the other two
	foreach ( [ 'size', 'acquiredcount', 'abandonedcount' ] as $type ) {
		$this->wanCache->delete( $this->getCacheKey( $type ) );
	}
}
607 | |
/**
 * Iterate over the jobs of this type that have an empty claim token.
 *
 * @see JobQueue::getAllQueuedJobs()
 * @return Iterator<RunnableJob>
 */
public function getAllQueuedJobs() {
	$conds = [
		'job_cmd' => $this->getType(),
		'job_token' => '',
	];

	return $this->getJobIterator( $conds );
}
615 | |
/**
 * Iterate over the jobs of this type that carry a non-empty claim token.
 *
 * @see JobQueue::getAllAcquiredJobs()
 * @return Iterator<RunnableJob>
 */
public function getAllAcquiredJobs() {
	$conds = [
		'job_cmd' => $this->getType(),
		"job_token > ''",
	];

	return $this->getJobIterator( $conds );
}
623 | |
/**
 * Iterate over the claimed jobs of this type whose attempt counter has
 * reached maxTries.
 *
 * @see JobQueue::getAllAbandonedJobs()
 * @return Iterator<RunnableJob>
 */
public function getAllAbandonedJobs() {
	// intval() keeps the raw SQL fragment numeric
	$attemptLimit = intval( $this->maxTries );
	$conds = [
		'job_cmd' => $this->getType(),
		"job_token > ''",
		"job_attempts >= " . $attemptLimit
	];

	return $this->getJobIterator( $conds );
}
635 | |
/**
 * Run a replica query on the job table and wrap the result set so that each
 * row is converted by jobFromRow() as it is iterated.
 *
 * @param array $conds Query conditions
 * @return Iterator<RunnableJob>
 */
protected function getJobIterator( array $conds ) {
	$dbr = $this->getReplicaDB();
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = $this->getScopedNoTrxFlag( $dbr );

	$query = $dbr->newSelectQueryBuilder()
		->select( self::selectFields() )
		->from( 'job' )
		->where( $conds );
	try {
		$rows = $query->caller( __METHOD__ )->fetchResultSet();

		return new MappedIterator(
			$rows,
			fn ( $row ) => $this->jobFromRow( $row )
		);
	} catch ( DBError $e ) {
		throw $this->getDBException( $e );
	}
}
659 | |
/**
 * Build a string identifying where this queue's data lives, from the
 * cluster name (when set) and the queue domain.
 *
 * @return string|null Null when an explicit "server" configuration is in use
 */
public function getCoalesceLocationInternal() {
	if ( $this->server ) {
		// not using the LBFactory instance
		return null;
	}

	if ( is_string( $this->cluster ) ) {
		return "DBCluster:{$this->cluster}:{$this->domain}";
	}

	return "LBFactory:{$this->domain}";
}
669 | |
/**
 * Find which of the given job types have at least one row in the job table.
 *
 * @param string[] $types Job types to look for
 * @return string[] The subset of $types found in the table
 */
protected function doGetSiblingQueuesWithJobs( array $types ) {
	$dbr = $this->getReplicaDB();
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = $this->getScopedNoTrxFlag( $dbr );

	// @note: this does not check whether the jobs are claimed or not.
	// This is useful so JobQueueGroup::pop() also sees queues that only
	// have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
	// failed jobs so that they can be popped again for that edge case.
	$res = $dbr->newSelectQueryBuilder()
		->select( 'job_cmd' )
		->distinct()
		->from( 'job' )
		->where( [ 'job_cmd' => $types ] )
		->caller( __METHOD__ )
		->fetchResultSet();

	$typesWithJobs = [];
	foreach ( $res as $row ) {
		$typesWithJobs[] = $row->job_cmd;
	}

	return $typesWithJobs;
}
692 | |
/**
 * Count the job table rows for each of the given job types, claimed or not.
 *
 * @param string[] $types Job types to count
 * @return int[] Map of (job type => row count); types without rows are absent
 */
protected function doGetSiblingQueueSizes( array $types ) {
	$dbr = $this->getReplicaDB();
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = $this->getScopedNoTrxFlag( $dbr );

	$res = $dbr->newSelectQueryBuilder()
		->select( [ 'job_cmd', 'count' => 'COUNT(*)' ] )
		->from( 'job' )
		->where( [ 'job_cmd' => $types ] )
		->groupBy( 'job_cmd' )
		->caller( __METHOD__ )
		->fetchResultSet();

	$sizeByType = [];
	foreach ( $res as $row ) {
		$sizeByType[$row->job_cmd] = (int)$row->count;
	}

	return $sizeByType;
}
712 | |
/**
 * Recycle or destroy any jobs that have been claimed for too long
 *
 * Runs under a named DB lock; when the lock cannot be obtained, 0 is returned
 * straight away. With a positive claimTTL, stale claims that still have retries
 * left are released back into the queue. Claimed rows older than MAX_AGE_PRUNE
 * are then deleted (limited to those out of retries when claimTTL is positive).
 *
 * @return int Number of jobs recycled/deleted
 */
public function recycleAndDeleteStaleJobs() {
	$now = time();
	$count = 0; // affected rows
	$dbw = $this->getPrimaryDB();
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = $this->getScopedNoTrxFlag( $dbw );
	$lockKey = "jobqueue-recycle-{$this->type}";

	try {
		if ( !$dbw->lock( $lockKey, __METHOD__, 1 ) ) {
			return $count; // already in progress
		}

		// Remove claims on jobs acquired for too long if enabled...
		if ( $this->claimTTL > 0 ) {
			$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
			// Get the IDs of jobs that have been claimed but not finished after too long.
			// These jobs can be recycled into the queue by expiring the claim. Selecting
			// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
			$res = $dbw->newSelectQueryBuilder()
				->select( 'job_id' )
				->from( 'job' )
				->where(
					[
						'job_cmd' => $this->type,
						$dbw->expr( 'job_token', '!=', '' ), // was acquired
						$dbw->expr( 'job_token_timestamp', '<', $claimCutoff ), // stale
						$dbw->expr( 'job_attempts', '<', $this->maxTries ), // retries left
					]
				)
				->caller( __METHOD__ )->fetchResultSet();
			$recycleIds = [];
			foreach ( $res as $staleRow ) {
				$recycleIds[] = $staleRow->job_id;
			}
			if ( $recycleIds !== [] ) {
				// Reset job_token for these jobs so that other runners will pick them up.
				// Set the timestamp to the current time, as it is useful to know that the job
				// was already tried before (the timestamp becomes the "released" time).
				$dbw->newUpdateQueryBuilder()
					->update( 'job' )
					->set( [
						'job_token' => '',
						'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
					] )
					->where( [
						'job_id' => $recycleIds,
						$dbw->expr( 'job_token', '!=', '' ),
					] )
					->caller( __METHOD__ )->execute();

				$recycled = $dbw->affectedRows();
				$count += $recycled;
				$this->incrStats( 'recycles', $this->type, $recycled );
			}
		}

		// Just destroy any stale jobs...
		$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
		$pruneQuery = $dbw->newSelectQueryBuilder()
			->select( 'job_id' )
			->from( 'job' )
			->where(
				[
					'job_cmd' => $this->type,
					$dbw->expr( 'job_token', '!=', '' ), // was acquired
					$dbw->expr( 'job_token_timestamp', '<', $pruneCutoff ) // stale
				]
			);
		if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
			$pruneQuery->andWhere( $dbw->expr( 'job_attempts', '>=', $this->maxTries ) );
		}
		// Get the IDs of jobs that are considered stale and should be removed. Selecting
		// the IDs first means that the DELETE can be done by primary key (less deadlocks).
		$res = $pruneQuery->caller( __METHOD__ )->fetchResultSet();
		$pruneIds = [];
		foreach ( $res as $staleRow ) {
			$pruneIds[] = $staleRow->job_id;
		}
		if ( $pruneIds !== [] ) {
			$dbw->newDeleteQueryBuilder()
				->deleteFrom( 'job' )
				->where( [ 'job_id' => $pruneIds ] )
				->caller( __METHOD__ )->execute();
			$pruned = $dbw->affectedRows();
			$count += $pruned;
			$this->incrStats( 'abandons', $this->type, $pruned );
		}

		$dbw->unlock( $lockKey, __METHOD__ );
	} catch ( DBError $e ) {
		throw $this->getDBException( $e );
	}

	return $count;
}
815 | |
/**
 * Build the job table row for a job specification.
 *
 * @param IJobSpecification $job
 * @param IDatabase $db Used to format the current timestamp
 * @return array Map of (job table column => value)
 */
protected function insertFields( IJobSpecification $job, IDatabase $db ) {
	$params = $job->getParams();
	// Hash of the de-duplication info, converted from base 16 to base 36 (31 chars)
	$dedupHash = sha1( serialize( $job->getDeduplicationInfo() ) );

	return [
		// Fields that describe the nature of the job
		'job_cmd' => $job->getType(),
		'job_namespace' => $params['namespace'] ?? NS_SPECIAL,
		'job_title' => $params['title'] ?? '',
		'job_params' => self::makeBlob( $params ),
		// Additional job metadata
		'job_timestamp' => $db->timestamp(),
		'job_sha1' => Wikimedia\base_convert( $dedupHash, 16, 36, 31 ),
		'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
	];
}
837 | |
/**
 * Get the connection for DB_REPLICA, translating connection failures into
 * job queue connection errors.
 *
 * @throws JobQueueConnectionError If getDB() throws a DBConnectionError; the
 *  original error is attached as the previous exception
 * @return IDatabase
 */
protected function getReplicaDB() {
	try {
		return $this->getDB( DB_REPLICA );
	} catch ( DBConnectionError $e ) {
		// Chain the original error so its stack trace stays available to callers and logs
		throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage(), 0, $e );
	}
}
849 | |
/**
 * Get the connection for DB_PRIMARY, translating connection failures into
 * job queue connection errors.
 *
 * @throws JobQueueConnectionError If getDB() throws a DBConnectionError; the
 *  original error is attached as the previous exception
 * @return IMaintainableDatabase
 * @since 1.37
 */
protected function getPrimaryDB() {
	try {
		return $this->getDB( DB_PRIMARY );
	} catch ( DBConnectionError $e ) {
		// Chain the original error so its stack trace stays available to callers and logs
		throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage(), 0, $e );
	}
}
862 | |
/**
 * Get a database connection for this queue.
 *
 * When an explicit server configuration is set, a single connection is created on
 * first use and reused afterwards; $index is not consulted in that case, and a
 * failed connection attempt is remembered and rethrown on later calls. Otherwise
 * the connection comes from a load balancer (the external cluster one when
 * a cluster name is configured, the main one for the queue's domain if not).
 *
 * @param int $index (DB_REPLICA/DB_PRIMARY)
 * @return IMaintainableDatabase
 */
protected function getDB( $index ) {
	if ( !$this->server ) {
		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		if ( is_string( $this->cluster ) ) {
			$lb = $lbFactory->getExternalLB( $this->cluster );
		} else {
			$lb = $lbFactory->getMainLB( $this->domain );
		}

		$isSqlite = ( $lb->getServerType( $lb->getWriterIndex() ) === 'sqlite' );
		// Non-SQLite: use a separate autocommit connection to avoid contention and deadlocks.
		// SQLite: DB-level locking makes that counterproductive, so no extra flags are used
		// and job insertion is deferred until the PRESEND stage to reduce contention.
		$flags = $isSqlite ? 0 : $lb::CONN_TRX_AUTOCOMMIT;

		return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
	}

	// Dedicated server: reuse the outcome of a previous connection attempt, if any
	$previous = $this->conn;
	if ( $previous instanceof IDatabase ) {
		return $previous;
	}
	if ( $previous instanceof DBError ) {
		throw $previous;
	}

	try {
		$factory = MediaWikiServices::getInstance()->getDatabaseFactory();
		$this->conn = $factory->create( $this->server['type'], $this->server );
	} catch ( DBError $e ) {
		// Remember the failure so later calls rethrow it instead of reconnecting
		$this->conn = $e;
		throw $e;
	}

	return $this->conn;
}
904 | |
/**
 * Clear the DBO_TRX flag on a connection and return a callback object that
 * sets the flag again if it was set when this method was called.
 *
 * @param IDatabase $db
 * @return ScopedCallback
 */
private function getScopedNoTrxFlag( IDatabase $db ) {
	// Remember the current setting before turning it off
	$hadAutoTrx = $db->getFlag( DBO_TRX );
	// With the flag cleared, each query is its own transaction
	$db->clearFlag( DBO_TRX );

	$restore = static function () use ( $db, $hadAutoTrx ) {
		if ( !$hadAutoTrx ) {
			// The flag was already off; nothing to restore
			return;
		}
		$db->setFlag( DBO_TRX );
	};

	return new ScopedCallback( $restore );
}
919 | |
/**
 * Make the global WAN cache key for a property of this queue.
 *
 * The key is built from the queue's domain, cluster name ('main' when no
 * cluster name string is configured), job type and the given property.
 *
 * @param string $property
 * @return string
 */
private function getCacheKey( $property ) {
	if ( is_string( $this->cluster ) ) {
		$clusterName = $this->cluster;
	} else {
		$clusterName = 'main';
	}

	$components = [ 'jobqueue', $this->domain, $clusterName, $this->type, $property ];

	return $this->wanCache->makeGlobalKey( ...$components );
}
935 | |
/**
 * Serialize job parameters for storage.
 *
 * @param array|false $params Parameters, or false for none
 * @return string Serialized parameters, or an empty string when $params is false
 */
protected static function makeBlob( $params ) {
	return ( $params === false ) ? '' : serialize( $params );
}
947 | |
/**
 * Build a job object from a row of the `job` table.
 *
 * The row's namespace and title are only used for 'namespace'/'title' when
 * the stored parameters do not already define those keys. The row's ID and
 * timestamp are attached to the job as 'id' and 'timestamp' metadata.
 *
 * @param stdClass $row
 * @return RunnableJob
 * @throws UnexpectedValueException If the stored parameters do not unserialize to an array
 */
protected function jobFromRow( $row ) {
	$params = [];
	if ( (string)$row->job_params !== '' ) {
		$params = unserialize( $row->job_params );
	}

	// Not expected to happen
	if ( !is_array( $params ) ) {
		throw new UnexpectedValueException(
			"Could not unserialize job with ID '{$row->job_id}'." );
	}

	// Array union: keys already present in the stored parameters take precedence
	$params = $params + [
		'namespace' => $row->job_namespace,
		'title' => $row->job_title
	];

	$job = $this->factoryJob( $row->job_cmd, $params );
	$job->setMetadata( 'id', $row->job_id );
	$job->setMetadata( 'timestamp', $row->job_timestamp );

	return $job;
}
966 | |
/**
 * Wrap a database error in a job queue error.
 *
 * The message is the class name of the original error followed by its message.
 * The original error is attached as the previous exception so that its stack
 * trace is not lost.
 *
 * @param DBError $e
 * @return JobQueueError
 */
protected function getDBException( DBError $e ) {
	return new JobQueueError( get_class( $e ) . ": " . $e->getMessage(), 0, $e );
}
974 | |
/**
 * Get the names of the `job` table columns to select when loading jobs.
 *
 * @since 1.23
 * @return array List of column names
 */
public static function selectFields() {
	$fields = [
		// Identity and description of the job
		'job_id', 'job_cmd', 'job_namespace', 'job_title',
		'job_timestamp', 'job_params',
		// Claim/retry bookkeeping
		'job_random', 'job_attempts', 'job_token', 'job_token_timestamp',
		// Deduplication hash
		'job_sha1',
	];

	return $fields;
}
995 | } |