Code Coverage

|                             | Lines              | Functions and Methods | CRAP    | Classes and Traits |
|-----------------------------|--------------------|-----------------------|---------|--------------------|
| Total                       | 53.15% (245 / 461) | 23.53% (8 / 34)       |         | 0.00% (0 / 1)      |
| JobQueueDB                  | 53.26% (245 / 460) | 23.53% (8 / 34)       | 1099.72 | 0.00% (0 / 1)      |
| __construct                 | 42.86% (3 / 7)     | 0.00% (0 / 1)         | 6.99    |                    |
| supportedOrders             | 100.00% (1 / 1)    | 100.00% (1 / 1)       | 1       |                    |
| optimalOrder                | 100.00% (1 / 1)    | 100.00% (1 / 1)       | 1       |                    |
| doIsEmpty                   | 77.78% (7 / 9)     | 0.00% (0 / 1)         | 2.04    |                    |
| doGetSize                   | 76.92% (10 / 13)   | 0.00% (0 / 1)         | 3.11    |                    |
| doGetAcquiredCount          | 77.78% (14 / 18)   | 0.00% (0 / 1)         | 4.18    |                    |
| doGetAbandonedCount         | 0.00% (0 / 21)     | 0.00% (0 / 1)         | 20      |                    |
| doBatchPush                 | 100.00% (9 / 9)    | 100.00% (1 / 1)       | 1       |                    |
| doBatchPushInternal         | 54.76% (23 / 42)   | 0.00% (0 / 1)         | 19.26   |                    |
| doPop                       | 76.47% (13 / 17)   | 0.00% (0 / 1)         | 6.47    |                    |
| claimRandom                 | 96.49% (55 / 57)   | 0.00% (0 / 1)         | 9       |                    |
| claimOldest                 | 0.00% (0 / 40)     | 0.00% (0 / 1)         | 20      |                    |
| doAck                       | 0.00% (0 / 11)     | 0.00% (0 / 1)         | 12      |                    |
| doDeduplicateRootJob        | 0.00% (0 / 8)      | 0.00% (0 / 1)         | 2       |                    |
| doDelete                    | 0.00% (0 / 8)      | 0.00% (0 / 1)         | 6       |                    |
| doWaitForBackups            | 0.00% (0 / 4)      | 0.00% (0 / 1)         | 6       |                    |
| doFlushCaches               | 0.00% (0 / 2)      | 0.00% (0 / 1)         | 6       |                    |
| getAllQueuedJobs            | 100.00% (1 / 1)    | 100.00% (1 / 1)       | 1       |                    |
| getAllAcquiredJobs          | 100.00% (2 / 2)    | 100.00% (1 / 1)       | 1       |                    |
| getAllAbandonedJobs         | 0.00% (0 / 6)      | 0.00% (0 / 1)         | 2       |                    |
| getJobIterator              | 84.62% (11 / 13)   | 0.00% (0 / 1)         | 2.01    |                    |
| getCoalesceLocationInternal | 0.00% (0 / 5)      | 0.00% (0 / 1)         | 12      |                    |
| doGetSiblingQueuesWithJobs  | 0.00% (0 / 11)     | 0.00% (0 / 1)         | 6       |                    |
| doGetSiblingQueueSizes      | 0.00% (0 / 11)     | 0.00% (0 / 1)         | 6       |                    |
| recycleAndDeleteStaleJobs   | 62.86% (44 / 70)   | 0.00% (0 / 1)         | 9.51    |                    |
| insertFields                | 100.00% (12 / 12)  | 100.00% (1 / 1)       | 1       |                    |
| getReplicaDB                | 33.33% (1 / 3)     | 0.00% (0 / 1)         | 3.19    |                    |
| getPrimaryDB                | 33.33% (1 / 3)     | 0.00% (0 / 1)         | 3.19    |                    |
| getDB                       | 33.33% (7 / 21)    | 0.00% (0 / 1)         | 21.52   |                    |
| getCacheKey                 | 100.00% (8 / 8)    | 100.00% (1 / 1)       | 2       |                    |
| makeBlob                    | 66.67% (2 / 3)     | 0.00% (0 / 1)         | 2.15    |                    |
| jobFromRow                  | 77.78% (7 / 9)     | 0.00% (0 / 1)         | 3.10    |                    |
| getDBException              | 0.00% (0 / 1)      | 0.00% (0 / 1)         | 2       |                    |
| selectFields                | 100.00% (13 / 13)  | 100.00% (1 / 1)       | 1       |                    |
1 | <?php |
2 | /** |
3 | * This program is free software; you can redistribute it and/or modify |
4 | * it under the terms of the GNU General Public License as published by |
5 | * the Free Software Foundation; either version 2 of the License, or |
6 | * (at your option) any later version. |
7 | * |
8 | * This program is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | * GNU General Public License for more details. |
12 | * |
13 | * You should have received a copy of the GNU General Public License along |
14 | * with this program; if not, write to the Free Software Foundation, Inc., |
15 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
16 | * http://www.gnu.org/copyleft/gpl.html |
17 | * |
18 | * @file |
19 | */ |
20 | |
21 | namespace MediaWiki\JobQueue; |
22 | |
23 | use MappedIterator; |
24 | use MediaWiki\JobQueue\Exceptions\JobQueueConnectionError; |
25 | use MediaWiki\JobQueue\Exceptions\JobQueueError; |
26 | use MediaWiki\MediaWikiServices; |
27 | use Profiler; |
28 | use stdClass; |
29 | use UnexpectedValueException; |
30 | use Wikimedia\Rdbms\DBConnectionError; |
31 | use Wikimedia\Rdbms\DBError; |
32 | use Wikimedia\Rdbms\IDatabase; |
33 | use Wikimedia\Rdbms\IMaintainableDatabase; |
34 | use Wikimedia\Rdbms\IReadableDatabase; |
35 | use Wikimedia\Rdbms\RawSQLValue; |
36 | use Wikimedia\Rdbms\SelectQueryBuilder; |
37 | use Wikimedia\Rdbms\ServerInfo; |
38 | use Wikimedia\ScopedCallback; |
39 | |
40 | /** |
41 | * Database-backed job queue storage. |
42 | * |
43 | * @since 1.21 |
44 | * @ingroup JobQueue |
45 | */ |
46 | class JobQueueDB extends JobQueue { |
47 | /* seconds to cache info without re-validating */ |
48 | private const CACHE_TTL_SHORT = 30; |
49 | /* seconds a job can live once claimed */ |
50 | private const MAX_AGE_PRUNE = 7 * 24 * 3600; |
51 | /** |
52 | * Used for job_random, the highest safe 32-bit signed integer. |
53 | * Equivalent to `( 2 ** 31 ) - 1` on 64-bit. |
54 | */ |
55 | private const MAX_JOB_RANDOM = 2_147_483_647; |
56 | /* maximum number of rows to skip */ |
57 | private const MAX_OFFSET = 255; |
58 | |
59 | /** @var IMaintainableDatabase|DBError|null */ |
60 | protected $conn; |
61 | |
62 | /** @var array|null Server configuration array */ |
63 | protected $server; |
64 | /** @var string|null Name of an external DB cluster or null for the local DB cluster */ |
65 | protected $cluster; |
66 | |
67 | /** |
68 | * Additional parameters include: |
69 | * - server : Server configuration array for Database::factory. Overrides "cluster". |
70 | * - cluster : The name of an external cluster registered via LBFactory. |
71 | * If not specified, the primary DB cluster for the wiki will be used. |
72 | * This can be overridden with a custom cluster so that DB handles will |
73 | * be retrieved via LBFactory::getExternalLB() and getConnection(). |
74 | */ |
75 | protected function __construct( array $params ) { |
76 | parent::__construct( $params ); |
77 | |
78 | if ( isset( $params['server'] ) ) { |
79 | $this->server = $params['server']; |
80 | // Always use autocommit mode, even if DBO_TRX is configured |
81 | $this->server['flags'] ??= 0; |
82 | $this->server['flags'] &= ~( IDatabase::DBO_TRX | IDatabase::DBO_DEFAULT ); |
83 | } elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) { |
84 | $this->cluster = $params['cluster']; |
85 | } |
86 | } |
87 | |
88 | protected function supportedOrders() { |
89 | return [ 'random', 'timestamp', 'fifo' ]; |
90 | } |
91 | |
92 | protected function optimalOrder() { |
93 | return 'random'; |
94 | } |
95 | |
96 | /** |
97 | * @see JobQueue::doIsEmpty() |
98 | * @return bool |
99 | */ |
100 | protected function doIsEmpty() { |
101 | $dbr = $this->getReplicaDB(); |
102 | try { |
103 | // unclaimed job |
104 | $found = (bool)$dbr->newSelectQueryBuilder() |
105 | ->select( '1' ) |
106 | ->from( 'job' ) |
107 | ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] ) |
108 | ->caller( __METHOD__ )->fetchField(); |
109 | } catch ( DBError $e ) { |
110 | throw $this->getDBException( $e ); |
111 | } |
112 | |
113 | return !$found; |
114 | } |
115 | |
116 | /** |
117 | * @see JobQueue::doGetSize() |
118 | * @return int |
119 | */ |
120 | protected function doGetSize() { |
121 | $key = $this->getCacheKey( 'size' ); |
122 | |
123 | $size = $this->wanCache->get( $key ); |
124 | if ( is_int( $size ) ) { |
125 | return $size; |
126 | } |
127 | |
128 | $dbr = $this->getReplicaDB(); |
129 | try { |
130 | $size = $dbr->newSelectQueryBuilder() |
131 | ->from( 'job' ) |
132 | ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] ) |
133 | ->caller( __METHOD__ )->fetchRowCount(); |
134 | } catch ( DBError $e ) { |
135 | throw $this->getDBException( $e ); |
136 | } |
137 | $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT ); |
138 | |
139 | return $size; |
140 | } |
141 | |
142 | /** |
143 | * @see JobQueue::doGetAcquiredCount() |
144 | * @return int |
145 | */ |
146 | protected function doGetAcquiredCount() { |
147 | if ( $this->claimTTL <= 0 ) { |
148 | return 0; // no acknowledgements |
149 | } |
150 | |
151 | $key = $this->getCacheKey( 'acquiredcount' ); |
152 | |
153 | $count = $this->wanCache->get( $key ); |
154 | if ( is_int( $count ) ) { |
155 | return $count; |
156 | } |
157 | |
158 | $dbr = $this->getReplicaDB(); |
159 | try { |
160 | $count = $dbr->newSelectQueryBuilder() |
161 | ->from( 'job' ) |
162 | ->where( [ |
163 | 'job_cmd' => $this->type, |
164 | $dbr->expr( 'job_token', '!=', '' ), |
165 | ] ) |
166 | ->caller( __METHOD__ )->fetchRowCount(); |
167 | } catch ( DBError $e ) { |
168 | throw $this->getDBException( $e ); |
169 | } |
170 | $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT ); |
171 | |
172 | return $count; |
173 | } |
174 | |
175 | /** |
176 | * @see JobQueue::doGetAbandonedCount() |
177 | * @return int |
178 | * @throws JobQueueConnectionError |
179 | * @throws JobQueueError |
180 | */ |
181 | protected function doGetAbandonedCount() { |
182 | if ( $this->claimTTL <= 0 ) { |
183 | return 0; // no acknowledgements |
184 | } |
185 | |
186 | $key = $this->getCacheKey( 'abandonedcount' ); |
187 | |
188 | $count = $this->wanCache->get( $key ); |
189 | if ( is_int( $count ) ) { |
190 | return $count; |
191 | } |
192 | |
193 | $dbr = $this->getReplicaDB(); |
194 | try { |
195 | $count = $dbr->newSelectQueryBuilder() |
196 | ->from( 'job' ) |
197 | ->where( |
198 | [ |
199 | 'job_cmd' => $this->type, |
200 | $dbr->expr( 'job_token', '!=', '' ), |
201 | $dbr->expr( 'job_attempts', '>=', $this->maxTries ), |
202 | ] |
203 | ) |
204 | ->caller( __METHOD__ )->fetchRowCount(); |
205 | } catch ( DBError $e ) { |
206 | throw $this->getDBException( $e ); |
207 | } |
208 | |
209 | $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT ); |
210 | |
211 | return $count; |
212 | } |
213 | |
214 | /** |
215 | * @see JobQueue::doBatchPush() |
216 | * @param IJobSpecification[] $jobs |
217 | * @param int $flags |
218 | * @throws DBError|\Exception |
219 | * @return void |
220 | */ |
221 | protected function doBatchPush( array $jobs, $flags ) { |
222 | // Silence expectations related to getting a primary DB, as we have to get a primary DB to insert the job. |
223 | $transactionProfiler = Profiler::instance()->getTransactionProfiler(); |
224 | $scope = $transactionProfiler->silenceForScope(); |
225 | $dbw = $this->getPrimaryDB(); |
226 | ScopedCallback::consume( $scope ); |
227 | // In general, there will be two cases here: |
228 | // a) sqlite; DB connection is probably a regular round-aware handle. |
229 | // If the connection is busy with a transaction, then defer the job writes |
230 | // until right before the main round commit step. Any errors that bubble |
231 | // up will rollback the main commit round. |
232 | // b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle. |
233 | // No transaction is active nor will be started by writes, so enqueue the jobs |
234 | // now so that any errors will show up immediately as the interface expects. Any |
235 | // errors that bubble up will rollback the main commit round. |
236 | $fname = __METHOD__; |
237 | $dbw->onTransactionPreCommitOrIdle( |
238 | fn () => $this->doBatchPushInternal( $dbw, $jobs, $flags, $fname ), |
239 | $fname |
240 | ); |
241 | } |
242 | |
243 | /** |
244 | * This function should *not* be called outside of JobQueueDB |
245 | * |
246 | * @suppress SecurityCheck-SQLInjection Bug in phan-taint-check handling bulk inserts |
247 | * @param IDatabase $dbw |
248 | * @param IJobSpecification[] $jobs |
249 | * @param int $flags |
250 | * @param string $method |
251 | * @throws DBError |
252 | * @return void |
253 | */ |
254 | public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) { |
255 | if ( $jobs === [] ) { |
256 | return; |
257 | } |
258 | |
259 | $rowSet = []; // (sha1 => job) map for jobs that are de-duplicated |
260 | $rowList = []; // list of jobs for jobs that are not de-duplicated |
261 | foreach ( $jobs as $job ) { |
262 | $row = $this->insertFields( $job, $dbw ); |
263 | if ( $job->ignoreDuplicates() ) { |
264 | $rowSet[$row['job_sha1']] = $row; |
265 | } else { |
266 | $rowList[] = $row; |
267 | } |
268 | } |
269 | |
270 | if ( $flags & self::QOS_ATOMIC ) { |
271 | $dbw->startAtomic( $method ); // wrap all the job additions in one transaction |
272 | } |
273 | try { |
274 | // Strip out any duplicate jobs that are already in the queue... |
275 | if ( count( $rowSet ) ) { |
276 | $res = $dbw->newSelectQueryBuilder() |
277 | ->select( 'job_sha1' ) |
278 | ->from( 'job' ) |
279 | ->where( |
280 | [ |
281 | // No job_type condition since it's part of the job_sha1 hash |
282 | 'job_sha1' => array_map( 'strval', array_keys( $rowSet ) ), |
283 | 'job_token' => '' // unclaimed |
284 | ] |
285 | ) |
286 | ->caller( $method )->fetchResultSet(); |
287 | foreach ( $res as $row ) { |
288 | wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." ); |
289 | unset( $rowSet[$row->job_sha1] ); // already enqueued |
290 | } |
291 | } |
292 | // Build the full list of job rows to insert |
293 | $rows = array_merge( $rowList, array_values( $rowSet ) ); |
294 | // Silence expectations related to inserting to the job table, because we have to perform the inserts to |
295 | // track the job. |
296 | $transactionProfiler = Profiler::instance()->getTransactionProfiler(); |
297 | $scope = $transactionProfiler->silenceForScope(); |
298 | // Insert the job rows in chunks to avoid replica DB lag... |
299 | foreach ( array_chunk( $rows, 50 ) as $rowBatch ) { |
300 | $dbw->newInsertQueryBuilder() |
301 | ->insertInto( 'job' ) |
302 | ->rows( $rowBatch ) |
303 | ->caller( $method )->execute(); |
304 | } |
305 | ScopedCallback::consume( $scope ); |
306 | $this->incrStats( 'inserts', $this->type, count( $rows ) ); |
307 | $this->incrStats( 'dupe_inserts', $this->type, |
308 | count( $rowSet ) + count( $rowList ) - count( $rows ) |
309 | ); |
310 | } catch ( DBError $e ) { |
311 | throw $this->getDBException( $e ); |
312 | } |
313 | if ( $flags & self::QOS_ATOMIC ) { |
314 | $dbw->endAtomic( $method ); |
315 | } |
316 | } |
317 | |
318 | /** |
319 | * @see JobQueue::doPop() |
320 | * @return RunnableJob|false |
321 | */ |
322 | protected function doPop() { |
323 | $job = false; // job popped off |
324 | try { |
325 | $uuid = wfRandomString( 32 ); // pop attempt |
326 | do { // retry when our row is invalid or deleted as a duplicate |
327 | // Try to reserve a row in the DB... |
328 | if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) { |
329 | $row = $this->claimOldest( $uuid ); |
330 | } else { // random first |
331 | $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs |
332 | $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand |
333 | $row = $this->claimRandom( $uuid, $rand, $gte ); |
334 | } |
335 | // Check if we found a row to reserve... |
336 | if ( !$row ) { |
337 | break; // nothing to do |
338 | } |
339 | $this->incrStats( 'pops', $this->type ); |
340 | |
341 | // Get the job object from the row... |
342 | $job = $this->jobFromRow( $row ); |
343 | break; // done |
344 | } while ( true ); |
345 | |
346 | if ( !$job || mt_rand( 0, 9 ) == 0 ) { |
347 | // Handle jobs that need to be recycled/deleted; |
348 | // any recycled jobs will be picked up next attempt |
349 | $this->recycleAndDeleteStaleJobs(); |
350 | } |
351 | } catch ( DBError $e ) { |
352 | throw $this->getDBException( $e ); |
353 | } |
354 | |
355 | return $job; |
356 | } |
357 | |
358 | /** |
359 | * Reserve a row with a single UPDATE without holding row locks over RTTs... |
360 | * |
361 | * @param string $uuid 32 char hex string |
362 | * @param int $rand Random unsigned integer (31 bits) |
363 | * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random) |
364 | * @return stdClass|false Row|false |
365 | */ |
366 | protected function claimRandom( $uuid, $rand, $gte ) { |
367 | $dbw = $this->getPrimaryDB(); |
368 | // Check cache to see if the queue has <= MAX_OFFSET items |
369 | $tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) ); |
370 | |
371 | $invertedDirection = false; // whether one job_random direction was already scanned |
372 | // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT |
373 | // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is |
374 | // not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot |
375 | // be used here with MySQL. |
376 | do { |
377 | if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows |
378 | // For small queues, using OFFSET will overshoot and return no rows more often. |
379 | // Instead, this uses job_random to pick a row (possibly checking both directions). |
380 | $row = $dbw->newSelectQueryBuilder() |
381 | ->select( self::selectFields() ) |
382 | ->from( 'job' ) |
383 | ->where( |
384 | [ |
385 | 'job_cmd' => $this->type, |
386 | 'job_token' => '', // unclaimed |
387 | $dbw->expr( 'job_random', $gte ? '>=' : '<=', $rand ) |
388 | ] |
389 | ) |
390 | ->orderBy( |
391 | 'job_random', |
392 | $gte ? SelectQueryBuilder::SORT_ASC : SelectQueryBuilder::SORT_DESC |
393 | ) |
394 | ->caller( __METHOD__ )->fetchRow(); |
395 | if ( !$row && !$invertedDirection ) { |
396 | $gte = !$gte; |
397 | $invertedDirection = true; |
398 | continue; // try the other direction |
399 | } |
400 | } else { // table *may* have >= MAX_OFFSET rows |
401 | // T44614: "ORDER BY job_random" with a job_random inequality causes high CPU |
402 | // in MySQL if there are many rows for some reason. This uses a small OFFSET |
403 | // instead of job_random for reducing excess claim retries. |
404 | $row = $dbw->newSelectQueryBuilder() |
405 | ->select( self::selectFields() ) |
406 | ->from( 'job' ) |
407 | ->where( |
408 | [ |
409 | 'job_cmd' => $this->type, |
410 | 'job_token' => '', // unclaimed |
411 | ] |
412 | ) |
413 | ->offset( mt_rand( 0, self::MAX_OFFSET ) ) |
414 | ->caller( __METHOD__ )->fetchRow(); |
415 | if ( !$row ) { |
416 | $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows |
417 | $this->wanCache->set( $this->getCacheKey( 'small' ), 1, 30 ); |
418 | continue; // use job_random |
419 | } |
420 | } |
421 | |
422 | if ( !$row ) { |
423 | break; |
424 | } |
425 | |
426 | $dbw->newUpdateQueryBuilder() |
427 | ->update( 'job' ) // update by PK |
428 | ->set( [ |
429 | 'job_token' => $uuid, |
430 | 'job_token_timestamp' => $dbw->timestamp(), |
431 | 'job_attempts' => new RawSQLValue( 'job_attempts+1' ), |
432 | ] ) |
433 | ->where( [ |
434 | 'job_cmd' => $this->type, |
435 | 'job_id' => $row->job_id, |
436 | 'job_token' => '' |
437 | ] ) |
438 | ->caller( __METHOD__ )->execute(); |
439 | |
440 | // This might get raced out by another runner when claiming the previously |
441 | // selected row. The use of job_random should minimize this problem, however. |
442 | if ( !$dbw->affectedRows() ) { |
443 | $row = false; // raced out |
444 | } |
445 | } while ( !$row ); |
446 | |
447 | return $row; |
448 | } |
449 | |
450 | /** |
451 | * Reserve a row with a single UPDATE without holding row locks over RTTs... |
452 | * |
453 | * @param string $uuid 32 char hex string |
454 | * @return stdClass|false Row|false |
455 | */ |
456 | protected function claimOldest( $uuid ) { |
457 | $dbw = $this->getPrimaryDB(); |
458 | |
459 | $row = false; // the row acquired |
460 | do { |
461 | if ( $dbw->getType() === 'mysql' ) { |
462 | // Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the |
463 | // same table being changed in an UPDATE query in MySQL (gives Error: 1093). |
464 | // Postgres has no such limitation. However, MySQL offers an |
465 | // alternative here by supporting ORDER BY + LIMIT for UPDATE queries. |
466 | $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " . |
467 | "SET " . |
468 | "job_token = {$dbw->addQuotes( $uuid ) }, " . |
469 | "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " . |
470 | "job_attempts = job_attempts+1 " . |
471 | "WHERE ( " . |
472 | "job_cmd = {$dbw->addQuotes( $this->type )} " . |
473 | "AND job_token = {$dbw->addQuotes( '' )} " . |
474 | ") ORDER BY job_id ASC LIMIT 1", |
475 | __METHOD__ |
476 | ); |
477 | } else { |
478 | // Use a subquery to find the job, within an UPDATE to claim it. |
479 | // This uses as much of the DB wrapper functions as possible. |
480 | $qb = $dbw->newSelectQueryBuilder() |
481 | ->select( 'job_id' ) |
482 | ->from( 'job' ) |
483 | ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] ) |
484 | ->orderBy( 'job_id', SelectQueryBuilder::SORT_ASC ) |
485 | ->limit( 1 ); |
486 | |
487 | $dbw->newUpdateQueryBuilder() |
488 | ->update( 'job' ) |
489 | ->set( [ |
490 | 'job_token' => $uuid, |
491 | 'job_token_timestamp' => $dbw->timestamp(), |
492 | 'job_attempts' => new RawSQLValue( 'job_attempts+1' ), |
493 | ] ) |
494 | ->where( [ 'job_id' => new RawSQLValue( '(' . $qb->getSQL() . ')' ) ] ) |
495 | ->caller( __METHOD__ )->execute(); |
496 | } |
497 | |
498 | if ( !$dbw->affectedRows() ) { |
499 | break; |
500 | } |
501 | |
502 | // Fetch any row that we just reserved... |
503 | $row = $dbw->newSelectQueryBuilder() |
504 | ->select( self::selectFields() ) |
505 | ->from( 'job' ) |
506 | ->where( [ 'job_cmd' => $this->type, 'job_token' => $uuid ] ) |
507 | ->caller( __METHOD__ )->fetchRow(); |
508 | if ( !$row ) { // raced out by duplicate job removal |
509 | wfDebug( "Row deleted as duplicate by another process." ); |
510 | } |
511 | } while ( !$row ); |
512 | |
513 | return $row; |
514 | } |
515 | |
516 | /** |
517 | * @see JobQueue::doAck() |
518 | * @param RunnableJob $job |
519 | * @throws JobQueueConnectionError |
520 | * @throws JobQueueError |
521 | */ |
522 | protected function doAck( RunnableJob $job ) { |
523 | $id = $job->getMetadata( 'id' ); |
524 | if ( $id === null ) { |
525 | throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no ID." ); |
526 | } |
527 | |
528 | $dbw = $this->getPrimaryDB(); |
529 | try { |
530 | // Delete a row with a single DELETE without holding row locks over RTTs... |
531 | $dbw->newDeleteQueryBuilder() |
532 | ->deleteFrom( 'job' ) |
533 | ->where( [ 'job_cmd' => $this->type, 'job_id' => $id ] ) |
534 | ->caller( __METHOD__ )->execute(); |
535 | |
536 | $this->incrStats( 'acks', $this->type ); |
537 | } catch ( DBError $e ) { |
538 | throw $this->getDBException( $e ); |
539 | } |
540 | } |
541 | |
542 | /** |
543 | * @see JobQueue::doDeduplicateRootJob() |
544 | * @param IJobSpecification $job |
545 | * @throws JobQueueConnectionError |
546 | * @return bool |
547 | */ |
548 | protected function doDeduplicateRootJob( IJobSpecification $job ) { |
549 | // Callers should call JobQueueGroup::push() before this method so that if the |
550 | // insert fails, the de-duplication registration will be aborted. Since the insert |
551 | // is deferred till "transaction idle", do the same here, so that the ordering is |
552 | // maintained. Having only the de-duplication registration succeed would cause |
553 | // jobs to become no-ops without any actual jobs that made them redundant. |
554 | $dbw = $this->getPrimaryDB(); |
555 | $dbw->onTransactionCommitOrIdle( |
556 | function () use ( $job ) { |
557 | parent::doDeduplicateRootJob( $job ); |
558 | }, |
559 | __METHOD__ |
560 | ); |
561 | |
562 | return true; |
563 | } |
564 | |
565 | /** |
566 | * @see JobQueue::doDelete() |
567 | * @return bool |
568 | */ |
569 | protected function doDelete() { |
570 | $dbw = $this->getPrimaryDB(); |
571 | try { |
572 | $dbw->newDeleteQueryBuilder() |
573 | ->deleteFrom( 'job' ) |
574 | ->where( [ 'job_cmd' => $this->type ] ) |
575 | ->caller( __METHOD__ )->execute(); |
576 | } catch ( DBError $e ) { |
577 | throw $this->getDBException( $e ); |
578 | } |
579 | |
580 | return true; |
581 | } |
582 | |
583 | /** |
584 | * @see JobQueue::doWaitForBackups() |
585 | * @return void |
586 | */ |
587 | protected function doWaitForBackups() { |
588 | if ( $this->server ) { |
589 | return; // not using LBFactory instance |
590 | } |
591 | |
592 | $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); |
593 | $lbFactory->waitForReplication(); |
594 | } |
595 | |
596 | /** |
597 | * @return void |
598 | */ |
599 | protected function doFlushCaches() { |
600 | foreach ( [ 'size', 'acquiredcount' ] as $type ) { |
601 | $this->wanCache->delete( $this->getCacheKey( $type ) ); |
602 | } |
603 | } |
604 | |
605 | /** |
606 | * @see JobQueue::getAllQueuedJobs() |
607 | * @return \Iterator<RunnableJob> |
608 | */ |
609 | public function getAllQueuedJobs() { |
610 | return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] ); |
611 | } |
612 | |
613 | /** |
614 | * @see JobQueue::getAllAcquiredJobs() |
615 | * @return \Iterator<RunnableJob> |
616 | */ |
617 | public function getAllAcquiredJobs() { |
618 | $dbr = $this->getReplicaDB(); |
619 | return $this->getJobIterator( [ 'job_cmd' => $this->getType(), $dbr->expr( 'job_token', '>', '' ) ] ); |
620 | } |
621 | |
622 | /** |
623 | * @see JobQueue::getAllAbandonedJobs() |
624 | * @return \Iterator<RunnableJob> |
625 | */ |
626 | public function getAllAbandonedJobs() { |
627 | $dbr = $this->getReplicaDB(); |
628 | return $this->getJobIterator( [ |
629 | 'job_cmd' => $this->getType(), |
630 | $dbr->expr( 'job_token', '>', '' ), |
631 | $dbr->expr( 'job_attempts', '>=', intval( $this->maxTries ) ), |
632 | ] ); |
633 | } |
634 | |
635 | /** |
636 | * @param array $conds Query conditions |
637 | * @return \Iterator<RunnableJob> |
638 | */ |
639 | protected function getJobIterator( array $conds ) { |
640 | $dbr = $this->getReplicaDB(); |
641 | $qb = $dbr->newSelectQueryBuilder() |
642 | ->select( self::selectFields() ) |
643 | ->from( 'job' ) |
644 | ->where( $conds ); |
645 | try { |
646 | return new MappedIterator( |
647 | $qb->caller( __METHOD__ )->fetchResultSet(), |
648 | function ( $row ) { |
649 | return $this->jobFromRow( $row ); |
650 | } |
651 | ); |
652 | } catch ( DBError $e ) { |
653 | throw $this->getDBException( $e ); |
654 | } |
655 | } |
656 | |
657 | public function getCoalesceLocationInternal() { |
658 | if ( $this->server ) { |
659 | return null; // not using the LBFactory instance |
660 | } |
661 | |
662 | return is_string( $this->cluster ) |
663 | ? "DBCluster:{$this->cluster}:{$this->domain}" |
664 | : "LBFactory:{$this->domain}"; |
665 | } |
666 | |
667 | protected function doGetSiblingQueuesWithJobs( array $types ) { |
668 | $dbr = $this->getReplicaDB(); |
669 | // @note: this does not check whether the jobs are claimed or not. |
670 | // This is useful so JobQueueGroup::pop() also sees queues that only |
671 | // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue |
672 | // failed jobs so that they can be popped again for that edge case. |
673 | $res = $dbr->newSelectQueryBuilder() |
674 | ->select( 'job_cmd' ) |
675 | ->distinct() |
676 | ->from( 'job' ) |
677 | ->where( [ 'job_cmd' => $types ] ) |
678 | ->caller( __METHOD__ )->fetchResultSet(); |
679 | |
680 | $types = []; |
681 | foreach ( $res as $row ) { |
682 | $types[] = $row->job_cmd; |
683 | } |
684 | |
685 | return $types; |
686 | } |
687 | |
688 | protected function doGetSiblingQueueSizes( array $types ) { |
689 | $dbr = $this->getReplicaDB(); |
690 | |
691 | $res = $dbr->newSelectQueryBuilder() |
692 | ->select( [ 'job_cmd', 'count' => 'COUNT(*)' ] ) |
693 | ->from( 'job' ) |
694 | ->where( [ 'job_cmd' => $types ] ) |
695 | ->groupBy( 'job_cmd' ) |
696 | ->caller( __METHOD__ )->fetchResultSet(); |
697 | |
698 | $sizes = []; |
699 | foreach ( $res as $row ) { |
700 | $sizes[$row->job_cmd] = (int)$row->count; |
701 | } |
702 | |
703 | return $sizes; |
704 | } |
705 | |
706 | /** |
707 | * Recycle or destroy any jobs that have been claimed for too long |
708 | * |
709 | * @return int Number of jobs recycled/deleted |
710 | */ |
711 | public function recycleAndDeleteStaleJobs() { |
712 | $now = time(); |
713 | $count = 0; // affected rows |
714 | $dbw = $this->getPrimaryDB(); |
715 | |
716 | try { |
717 | if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) { |
718 | return $count; // already in progress |
719 | } |
720 | |
721 | // Remove claims on jobs acquired for too long if enabled... |
722 | if ( $this->claimTTL > 0 ) { |
723 | $claimCutoff = $dbw->timestamp( $now - $this->claimTTL ); |
724 | // Get the IDs of jobs that have been claimed but not finished after too long. |
725 | // These jobs can be recycled into the queue by expiring the claim. Selecting |
726 | // the IDs first means that the UPDATE can be done by primary key (less deadlocks). |
727 | $res = $dbw->newSelectQueryBuilder() |
728 | ->select( 'job_id' ) |
729 | ->from( 'job' ) |
730 | ->where( |
731 | [ |
732 | 'job_cmd' => $this->type, |
733 | $dbw->expr( 'job_token', '!=', '' ), // was acquired |
734 | $dbw->expr( 'job_token_timestamp', '<', $claimCutoff ), // stale |
735 | $dbw->expr( 'job_attempts', '<', $this->maxTries ), // retries left |
736 | ] |
737 | ) |
738 | ->caller( __METHOD__ )->fetchResultSet(); |
739 | $ids = array_map( |
740 | static function ( $o ) { |
741 | return $o->job_id; |
742 | }, iterator_to_array( $res ) |
743 | ); |
744 | if ( count( $ids ) ) { |
745 | // Reset job_token for these jobs so that other runners will pick them up. |
746 | // Set the timestamp to the current time, as it is useful to know that the job |
747 | // was already tried before (the timestamp becomes the "released" time). |
748 | $dbw->newUpdateQueryBuilder() |
749 | ->update( 'job' ) |
750 | ->set( [ |
751 | 'job_token' => '', |
752 | 'job_token_timestamp' => $dbw->timestamp( $now ) // time of release |
753 | ] ) |
754 | ->where( [ |
755 | 'job_id' => $ids, |
756 | $dbw->expr( 'job_token', '!=', '' ), |
757 | ] ) |
758 | ->caller( __METHOD__ )->execute(); |
759 | |
760 | $affected = $dbw->affectedRows(); |
761 | $count += $affected; |
762 | $this->incrStats( 'recycles', $this->type, $affected ); |
763 | } |
764 | } |
765 | |
766 | // Just destroy any stale jobs... |
767 | $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE ); |
768 | $qb = $dbw->newSelectQueryBuilder() |
769 | ->select( 'job_id' ) |
770 | ->from( 'job' ) |
771 | ->where( |
772 | [ |
773 | 'job_cmd' => $this->type, |
774 | $dbw->expr( 'job_token', '!=', '' ), // was acquired |
775 | $dbw->expr( 'job_token_timestamp', '<', $pruneCutoff ) // stale |
776 | ] |
777 | ); |
778 | if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times... |
779 | $qb->andWhere( $dbw->expr( 'job_attempts', '>=', $this->maxTries ) ); |
780 | } |
781 | // Get the IDs of jobs that are considered stale and should be removed. Selecting |
782 | // the IDs first means that the UPDATE can be done by primary key (less deadlocks). |
783 | $res = $qb->caller( __METHOD__ )->fetchResultSet(); |
784 | $ids = array_map( |
785 | static function ( $o ) { |
786 | return $o->job_id; |
787 | }, iterator_to_array( $res ) |
788 | ); |
789 | if ( count( $ids ) ) { |
790 | $dbw->newDeleteQueryBuilder() |
791 | ->deleteFrom( 'job' ) |
792 | ->where( [ 'job_id' => $ids ] ) |
793 | ->caller( __METHOD__ )->execute(); |
794 | $affected = $dbw->affectedRows(); |
795 | $count += $affected; |
796 | $this->incrStats( 'abandons', $this->type, $affected ); |
797 | } |
798 | |
799 | $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ ); |
800 | } catch ( DBError $e ) { |
801 | throw $this->getDBException( $e ); |
802 | } |
803 | |
804 | return $count; |
805 | } |
806 | |
807 | /** |
808 | * @param IJobSpecification $job |
809 | * @param IReadableDatabase $db |
810 | * @return array |
811 | */ |
812 | protected function insertFields( IJobSpecification $job, IReadableDatabase $db ) { |
813 | return [ |
814 | // Fields that describe the nature of the job |
815 | 'job_cmd' => $job->getType(), |
816 | 'job_namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL, |
817 | 'job_title' => $job->getParams()['title'] ?? '', |
818 | 'job_params' => self::makeBlob( $job->getParams() ), |
819 | // Additional job metadata |
820 | 'job_timestamp' => $db->timestamp(), |
821 | 'job_sha1' => \Wikimedia\base_convert( |
822 | sha1( serialize( $job->getDeduplicationInfo() ) ), |
823 | 16, 36, 31 |
824 | ), |
825 | 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM ) |
826 | ]; |
827 | } |
828 | |
829 | /** |
830 | * @throws JobQueueConnectionError |
831 | * @return IDatabase |
832 | */ |
833 | protected function getReplicaDB() { |
834 | try { |
835 | return $this->getDB( DB_REPLICA ); |
836 | } catch ( DBConnectionError $e ) { |
837 | throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() ); |
838 | } |
839 | } |
840 | |
841 | /** |
842 | * @throws JobQueueConnectionError |
843 | * @return IMaintainableDatabase |
844 | * @since 1.37 |
845 | */ |
846 | protected function getPrimaryDB() { |
847 | try { |
848 | return $this->getDB( DB_PRIMARY ); |
849 | } catch ( DBConnectionError $e ) { |
850 | throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() ); |
851 | } |
852 | } |
853 | |
854 | /** |
855 | * @param int $index (DB_REPLICA/DB_PRIMARY) |
856 | * @return IMaintainableDatabase |
857 | */ |
858 | protected function getDB( $index ) { |
859 | if ( $this->server ) { |
860 | if ( $this->conn instanceof IDatabase ) { |
861 | return $this->conn; |
862 | } elseif ( $this->conn instanceof DBError ) { |
863 | throw $this->conn; |
864 | } |
865 | |
866 | try { |
867 | $this->conn = MediaWikiServices::getInstance()->getDatabaseFactory()->create( |
868 | $this->server['type'], |
869 | $this->server |
870 | ); |
871 | } catch ( DBError $e ) { |
872 | $this->conn = $e; |
873 | throw $e; |
874 | } |
875 | |
876 | return $this->conn; |
877 | } else { |
878 | $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); |
879 | $lb = is_string( $this->cluster ) |
880 | ? $lbFactory->getExternalLB( $this->cluster ) |
881 | : $lbFactory->getMainLB( $this->domain ); |
882 | |
883 | if ( $lb->getServerType( ServerInfo::WRITER_INDEX ) !== 'sqlite' ) { |
884 | // Keep a separate connection to avoid contention and deadlocks; |
885 | // However, SQLite has the opposite behavior due to DB-level locking. |
886 | $flags = $lb::CONN_TRX_AUTOCOMMIT; |
887 | } else { |
888 | // Job insertion will be deferred until the PRESEND stage to reduce contention. |
889 | $flags = 0; |
890 | } |
891 | |
892 | return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags ); |
893 | } |
894 | } |
895 | |
896 | /** |
897 | * @param string $property |
898 | * @return string |
899 | */ |
900 | private function getCacheKey( $property ) { |
901 | $cluster = is_string( $this->cluster ) ? $this->cluster : 'main'; |
902 | |
903 | return $this->wanCache->makeGlobalKey( |
904 | 'jobqueue', |
905 | $this->domain, |
906 | $cluster, |
907 | $this->type, |
908 | $property |
909 | ); |
910 | } |
911 | |
912 | /** |
913 | * @param array|false $params |
914 | * @return string |
915 | */ |
916 | protected static function makeBlob( $params ) { |
917 | if ( $params !== false ) { |
918 | return serialize( $params ); |
919 | } else { |
920 | return ''; |
921 | } |
922 | } |
923 | |
924 | /** |
925 | * @param stdClass $row |
926 | * @return RunnableJob |
927 | */ |
928 | protected function jobFromRow( $row ) { |
929 | $params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : []; |
930 | if ( !is_array( $params ) ) { // this shouldn't happen |
931 | throw new UnexpectedValueException( |
932 | "Could not unserialize job with ID '{$row->job_id}'." ); |
933 | } |
934 | |
935 | $params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ]; |
936 | $job = $this->factoryJob( $row->job_cmd, $params ); |
937 | $job->setMetadata( 'id', $row->job_id ); |
938 | $job->setMetadata( 'timestamp', $row->job_timestamp ); |
939 | |
940 | return $job; |
941 | } |
942 | |
943 | /** |
944 | * @param DBError $e |
945 | * @return JobQueueError |
946 | */ |
947 | protected function getDBException( DBError $e ) { |
948 | return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() ); |
949 | } |
950 | |
951 | /** |
952 | * Return the list of job fields that should be selected. |
953 | * @since 1.23 |
954 | * @return array |
955 | */ |
956 | public static function selectFields() { |
957 | return [ |
958 | 'job_id', |
959 | 'job_cmd', |
960 | 'job_namespace', |
961 | 'job_title', |
962 | 'job_timestamp', |
963 | 'job_params', |
964 | 'job_random', |
965 | 'job_attempts', |
966 | 'job_token', |
967 | 'job_token_timestamp', |
968 | 'job_sha1', |
969 | ]; |
970 | } |
971 | } |
972 | |
973 | /** @deprecated class alias since 1.44 */ |
974 | class_alias( JobQueueDB::class, 'JobQueueDB' ); |