Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 456 |
|
0.00% |
0 / 34 |
CRAP | |
0.00% |
0 / 1 |
JobQueueDB | |
0.00% |
0 / 456 |
|
0.00% |
0 / 34 |
9900 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
20 | |||
supportedOrders | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
optimalOrder | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doIsEmpty | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
6 | |||
doGetSize | |
0.00% |
0 / 13 |
|
0.00% |
0 / 1 |
12 | |||
doGetAcquiredCount | |
0.00% |
0 / 18 |
|
0.00% |
0 / 1 |
20 | |||
doGetAbandonedCount | |
0.00% |
0 / 21 |
|
0.00% |
0 / 1 |
20 | |||
doBatchPush | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
2 | |||
doBatchPushInternal | |
0.00% |
0 / 39 |
|
0.00% |
0 / 1 |
110 | |||
doPop | |
0.00% |
0 / 17 |
|
0.00% |
0 / 1 |
42 | |||
claimRandom | |
0.00% |
0 / 57 |
|
0.00% |
0 / 1 |
90 | |||
claimOldest | |
0.00% |
0 / 40 |
|
0.00% |
0 / 1 |
20 | |||
doAck | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
12 | |||
doDeduplicateRootJob | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
2 | |||
doDelete | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
6 | |||
doWaitForBackups | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
doFlushCaches | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
getAllQueuedJobs | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getAllAcquiredJobs | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
getAllAbandonedJobs | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
2 | |||
getJobIterator | |
0.00% |
0 / 13 |
|
0.00% |
0 / 1 |
6 | |||
getCoalesceLocationInternal | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
12 | |||
doGetSiblingQueuesWithJobs | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
6 | |||
doGetSiblingQueueSizes | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
6 | |||
recycleAndDeleteStaleJobs | |
0.00% |
0 / 70 |
|
0.00% |
0 / 1 |
56 | |||
insertFields | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
2 | |||
getReplicaDB | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
getPrimaryDB | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
getDB | |
0.00% |
0 / 21 |
|
0.00% |
0 / 1 |
56 | |||
getCacheKey | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
6 | |||
makeBlob | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
jobFromRow | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
12 | |||
getDBException | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
selectFields | |
0.00% |
0 / 13 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | /** |
3 | * This program is free software; you can redistribute it and/or modify |
4 | * it under the terms of the GNU General Public License as published by |
5 | * the Free Software Foundation; either version 2 of the License, or |
6 | * (at your option) any later version. |
7 | * |
8 | * This program is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | * GNU General Public License for more details. |
12 | * |
13 | * You should have received a copy of the GNU General Public License along |
14 | * with this program; if not, write to the Free Software Foundation, Inc., |
15 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
16 | * http://www.gnu.org/copyleft/gpl.html |
17 | * |
18 | * @file |
19 | */ |
20 | use MediaWiki\MediaWikiServices; |
21 | use Wikimedia\Rdbms\DBConnectionError; |
22 | use Wikimedia\Rdbms\DBError; |
23 | use Wikimedia\Rdbms\IDatabase; |
24 | use Wikimedia\Rdbms\IMaintainableDatabase; |
25 | use Wikimedia\Rdbms\IReadableDatabase; |
26 | use Wikimedia\Rdbms\RawSQLValue; |
27 | use Wikimedia\Rdbms\SelectQueryBuilder; |
28 | use Wikimedia\Rdbms\ServerInfo; |
29 | |
/**
 * Database-backed job queue storage.
 *
 * Jobs are stored as rows of the "job" table. A row whose job_token is the empty
 * string is unclaimed; claiming a job stamps the row with a per-pop token, the
 * claim timestamp and an incremented attempt counter.
 *
 * @since 1.21
 * @ingroup JobQueue
 */
class JobQueueDB extends JobQueue {
	/** Seconds to cache info without re-validating */
	private const CACHE_TTL_SHORT = 30;
	/** Seconds a job can live once claimed */
	private const MAX_AGE_PRUNE = 7 * 24 * 3600;
	/**
	 * Used for job_random, the highest safe 32-bit signed integer.
	 * Equivalent to `( 2 ** 31 ) - 1` on 64-bit.
	 */
	private const MAX_JOB_RANDOM = 2_147_483_647;
	/** Maximum number of rows to skip */
	private const MAX_OFFSET = 255;

	/** @var IMaintainableDatabase|DBError|null Cached handle (or connection error) for "server" mode */
	protected $conn;

	/** @var array|null Server configuration array */
	protected $server;
	/** @var string|null Name of an external DB cluster or null for the local DB cluster */
	protected $cluster;

	/**
	 * Additional parameters include:
	 *   - server  : Server configuration array for Database::factory. Overrides "cluster".
	 *   - cluster : The name of an external cluster registered via LBFactory.
	 *               If not specified, the primary DB cluster for the wiki will be used.
	 *               This can be overridden with a custom cluster so that DB handles will
	 *               be retrieved via LBFactory::getExternalLB() and getConnection().
	 * @param array $params
	 */
	protected function __construct( array $params ) {
		parent::__construct( $params );

		if ( isset( $params['server'] ) ) {
			$this->server = $params['server'];
			// Always use autocommit mode, even if DBO_TRX is configured
			$this->server['flags'] ??= 0;
			$this->server['flags'] &= ~( IDatabase::DBO_TRX | IDatabase::DBO_DEFAULT );
		} elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
			$this->cluster = $params['cluster'];
		}
	}

	/**
	 * @return string[] Queue orders this backend can serve
	 */
	protected function supportedOrders() {
		return [ 'random', 'timestamp', 'fifo' ];
	}

	/**
	 * @return string The order used when none is configured
	 */
	protected function optimalOrder() {
		return 'random';
	}

	/**
	 * Check whether there is no unclaimed job of this type.
	 *
	 * @see JobQueue::doIsEmpty()
	 * @return bool
	 * @throws JobQueueError
	 */
	protected function doIsEmpty() {
		$dbr = $this->getReplicaDB();
		try {
			// unclaimed job
			$found = (bool)$dbr->newSelectQueryBuilder()
				->select( '1' )
				->from( 'job' )
				->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
				->caller( __METHOD__ )->fetchField();
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return !$found;
	}

	/**
	 * Count the unclaimed jobs of this type; the result is cached for CACHE_TTL_SHORT seconds.
	 *
	 * @see JobQueue::doGetSize()
	 * @return int
	 * @throws JobQueueError
	 */
	protected function doGetSize() {
		$key = $this->getCacheKey( 'size' );

		$size = $this->wanCache->get( $key );
		if ( is_int( $size ) ) {
			return $size;
		}

		$dbr = $this->getReplicaDB();
		try {
			$size = $dbr->newSelectQueryBuilder()
				->from( 'job' )
				->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
				->caller( __METHOD__ )->fetchRowCount();
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
		$this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );

		return $size;
	}

	/**
	 * Count the claimed jobs of this type; the result is cached for CACHE_TTL_SHORT seconds.
	 *
	 * @see JobQueue::doGetAcquiredCount()
	 * @return int
	 * @throws JobQueueError
	 */
	protected function doGetAcquiredCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // no acknowledgements
		}

		$key = $this->getCacheKey( 'acquiredcount' );

		$count = $this->wanCache->get( $key );
		if ( is_int( $count ) ) {
			return $count;
		}

		$dbr = $this->getReplicaDB();
		try {
			$count = $dbr->newSelectQueryBuilder()
				->from( 'job' )
				->where( [
					'job_cmd' => $this->type,
					$dbr->expr( 'job_token', '!=', '' ),
				] )
				->caller( __METHOD__ )->fetchRowCount();
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
		$this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );

		return $count;
	}

	/**
	 * Count the claimed jobs of this type that have already been attempted at least
	 * $this->maxTries times; the result is cached for CACHE_TTL_SHORT seconds.
	 *
	 * @see JobQueue::doGetAbandonedCount()
	 * @return int
	 * @throws JobQueueConnectionError
	 * @throws JobQueueError
	 */
	protected function doGetAbandonedCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // no acknowledgements
		}

		$key = $this->getCacheKey( 'abandonedcount' );

		$count = $this->wanCache->get( $key );
		if ( is_int( $count ) ) {
			return $count;
		}

		$dbr = $this->getReplicaDB();
		try {
			$count = $dbr->newSelectQueryBuilder()
				->from( 'job' )
				->where(
					[
						'job_cmd' => $this->type,
						$dbr->expr( 'job_token', '!=', '' ),
						$dbr->expr( 'job_attempts', '>=', $this->maxTries ),
					]
				)
				->caller( __METHOD__ )->fetchRowCount();
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		$this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );

		return $count;
	}

	/**
	 * @see JobQueue::doBatchPush()
	 * @param IJobSpecification[] $jobs
	 * @param int $flags
	 * @throws DBError|Exception
	 * @return void
	 */
	protected function doBatchPush( array $jobs, $flags ) {
		$dbw = $this->getPrimaryDB();
		// In general, there will be two cases here:
		// a) sqlite; DB connection is probably a regular round-aware handle.
		//    If the connection is busy with a transaction, then defer the job writes
		//    until right before the main round commit step. Any errors that bubble
		//    up will rollback the main commit round.
		// b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
		//    No transaction is active nor will be started by writes, so enqueue the jobs
		//    now so that any errors will show up immediately as the interface expects. Any
		//    errors that bubble up will rollback the main commit round.
		$fname = __METHOD__;
		$dbw->onTransactionPreCommitOrIdle(
			function ( IDatabase $dbw ) use ( $jobs, $flags, $fname ) {
				$this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
			},
			$fname
		);
	}

	/**
	 * Insert the given jobs, skipping de-duplicable jobs that are already enqueued.
	 *
	 * This function should *not* be called outside of JobQueueDB
	 *
	 * @suppress SecurityCheck-SQLInjection Bug in phan-taint-check handling bulk inserts
	 * @param IDatabase $dbw
	 * @param IJobSpecification[] $jobs
	 * @param int $flags Bitfield; QOS_ATOMIC wraps all the inserts in one atomic section
	 * @param string $method Caller name used for the queries and the atomic section
	 * @throws DBError
	 * @return void
	 */
	public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
		if ( $jobs === [] ) {
			return;
		}

		$rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
		$rowList = []; // list of jobs for jobs that are not de-duplicated
		foreach ( $jobs as $job ) {
			$row = $this->insertFields( $job, $dbw );
			if ( $job->ignoreDuplicates() ) {
				// Keyed by hash, so duplicates within this batch collapse to one row
				$rowSet[$row['job_sha1']] = $row;
			} else {
				$rowList[] = $row;
			}
		}

		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->startAtomic( $method ); // wrap all the job additions in one transaction
		}
		try {
			// Strip out any duplicate jobs that are already in the queue...
			if ( count( $rowSet ) ) {
				$res = $dbw->newSelectQueryBuilder()
					->select( 'job_sha1' )
					->from( 'job' )
					->where(
						[
							// No job_type condition since it's part of the job_sha1 hash
							// (strval: all-digit hashes become integer array keys in PHP)
							'job_sha1' => array_map( 'strval', array_keys( $rowSet ) ),
							'job_token' => '' // unclaimed
						]
					)
					->caller( $method )->fetchResultSet();
				foreach ( $res as $row ) {
					wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
					unset( $rowSet[$row->job_sha1] ); // already enqueued
				}
			}
			// Build the full list of job rows to insert
			$rows = array_merge( $rowList, array_values( $rowSet ) );
			// Insert the job rows in chunks to avoid replica DB lag...
			foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
				$dbw->newInsertQueryBuilder()
					->insertInto( 'job' )
					->rows( $rowBatch )
					->caller( $method )->execute();
			}
			$this->incrStats( 'inserts', $this->type, count( $rows ) );
			// Every submitted job that did not end up as an inserted row was dropped as a
			// duplicate (either of another job in this batch or of an already enqueued one).
			// Comparing against $rowSet/$rowList here would always yield zero, since $rows
			// is built from exactly those two arrays.
			$this->incrStats( 'dupe_inserts', $this->type,
				count( $jobs ) - count( $rows )
			);
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->endAtomic( $method );
		}
	}

	/**
	 * Claim one job and return it, occasionally recycling/pruning stale claimed jobs.
	 *
	 * @see JobQueue::doPop()
	 * @return RunnableJob|false
	 * @throws JobQueueError
	 */
	protected function doPop() {
		$job = false; // job popped off
		try {
			$uuid = wfRandomString( 32 ); // pop attempt
			// Try to reserve a row in the DB...
			if ( in_array( $this->order, [ 'fifo', 'timestamp' ], true ) ) {
				$row = $this->claimOldest( $uuid );
			} else { // random first
				$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
				$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
				$row = $this->claimRandom( $uuid, $rand, $gte );
			}
			// Check if we found a row to reserve...
			if ( $row ) {
				$this->incrStats( 'pops', $this->type );
				// Get the job object from the row...
				$job = $this->jobFromRow( $row );
			}

			// Always when nothing was popped, otherwise on roughly one pop in ten
			if ( !$job || mt_rand( 0, 9 ) === 0 ) {
				// Handled jobs that need to be recycled/deleted;
				// any recycled jobs will be picked up next attempt
				$this->recycleAndDeleteStaleJobs();
			}
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return $job;
	}

	/**
	 * Reserve a row with a single UPDATE without holding row locks over RTTs...
	 *
	 * A candidate row is selected first and then claimed by primary key, conditional on
	 * it still being unclaimed; when another runner wins that race, a new candidate is
	 * selected.
	 *
	 * @param string $uuid 32 char hex string
	 * @param int $rand Random unsigned integer (31 bits)
	 * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
	 * @return stdClass|false Row|false
	 */
	protected function claimRandom( $uuid, $rand, $gte ) {
		$dbw = $this->getPrimaryDB();
		// Check cache to see if the queue has <= OFFSET items
		$tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) );

		$invertedDirection = false; // whether one job_random direction was already scanned
		// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
		// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
		// not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
		// be used here with MySQL.
		do {
			if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
				// For small queues, using OFFSET will overshoot and return no rows more often.
				// Instead, this uses job_random to pick a row (possibly checking both directions).
				$row = $dbw->newSelectQueryBuilder()
					->select( self::selectFields() )
					->from( 'job' )
					->where(
						[
							'job_cmd' => $this->type,
							'job_token' => '', // unclaimed
							$dbw->expr( 'job_random', $gte ? '>=' : '<=', $rand )
						]
					)
					->orderBy(
						'job_random',
						$gte ? SelectQueryBuilder::SORT_ASC : SelectQueryBuilder::SORT_DESC
					)
					->caller( __METHOD__ )->fetchRow();
				if ( !$row && !$invertedDirection ) {
					$gte = !$gte;
					$invertedDirection = true;
					continue; // try the other direction
				}
			} else { // table *may* have >= MAX_OFFSET rows
				// T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
				// in MySQL if there are many rows for some reason. This uses a small OFFSET
				// instead of job_random for reducing excess claim retries.
				$row = $dbw->newSelectQueryBuilder()
					->select( self::selectFields() )
					->from( 'job' )
					->where(
						[
							'job_cmd' => $this->type,
							'job_token' => '', // unclaimed
						]
					)
					->offset( mt_rand( 0, self::MAX_OFFSET ) )
					->caller( __METHOD__ )->fetchRow();
				if ( !$row ) {
					$tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
					$this->wanCache->set(
						$this->getCacheKey( 'small' ),
						1,
						self::CACHE_TTL_SHORT
					);
					continue; // use job_random
				}
			}

			if ( !$row ) {
				break; // both job_random directions were scanned without finding a row
			}

			$dbw->newUpdateQueryBuilder()
				->update( 'job' ) // update by PK
				->set( [
					'job_token' => $uuid,
					'job_token_timestamp' => $dbw->timestamp(),
					'job_attempts' => new RawSQLValue( 'job_attempts+1' ),
				] )
				->where( [
					'job_cmd' => $this->type,
					'job_id' => $row->job_id,
					'job_token' => ''
				] )
				->caller( __METHOD__ )->execute();

			// This might get raced out by another runner when claiming the previously
			// selected row. The use of job_random should minimize this problem, however.
			if ( !$dbw->affectedRows() ) {
				$row = false; // raced out
			}
		} while ( !$row );

		return $row;
	}

	/**
	 * Reserve a row with a single UPDATE without holding row locks over RTTs...
	 *
	 * The unclaimed row with the lowest job_id is claimed, then read back by its token.
	 *
	 * @param string $uuid 32 char hex string
	 * @return stdClass|false Row|false
	 */
	protected function claimOldest( $uuid ) {
		$dbw = $this->getPrimaryDB();

		$row = false; // the row acquired
		do {
			if ( $dbw->getType() === 'mysql' ) {
				// Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
				// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
				// Postgres has no such limitation. However, MySQL offers an
				// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
				$dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
					"SET " .
						"job_token = {$dbw->addQuotes( $uuid )}, " .
						"job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
						"job_attempts = job_attempts+1 " .
					"WHERE ( " .
						"job_cmd = {$dbw->addQuotes( $this->type )} " .
						"AND job_token = {$dbw->addQuotes( '' )} " .
					") ORDER BY job_id ASC LIMIT 1",
					__METHOD__
				);
			} else {
				// Use a subquery to find the job, within an UPDATE to claim it.
				// This uses as much of the DB wrapper functions as possible.
				$qb = $dbw->newSelectQueryBuilder()
					->select( 'job_id' )
					->from( 'job' )
					->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
					->orderBy( 'job_id', SelectQueryBuilder::SORT_ASC )
					->limit( 1 );

				$dbw->newUpdateQueryBuilder()
					->update( 'job' )
					->set( [
						'job_token' => $uuid,
						'job_token_timestamp' => $dbw->timestamp(),
						'job_attempts' => new RawSQLValue( 'job_attempts+1' ),
					] )
					->where( [ 'job_id' => new RawSQLValue( '(' . $qb->getSQL() . ')' ) ] )
					->caller( __METHOD__ )->execute();
			}

			if ( !$dbw->affectedRows() ) {
				break; // nothing left to claim
			}

			// Fetch any row that we just reserved...
			$row = $dbw->newSelectQueryBuilder()
				->select( self::selectFields() )
				->from( 'job' )
				->where( [ 'job_cmd' => $this->type, 'job_token' => $uuid ] )
				->caller( __METHOD__ )->fetchRow();
			if ( !$row ) { // raced out by duplicate job removal
				wfDebug( "Row deleted as duplicate by another process." );
			}
		} while ( !$row );

		return $row;
	}

	/**
	 * Acknowledge a finished job by deleting its row.
	 *
	 * @see JobQueue::doAck()
	 * @param RunnableJob $job
	 * @throws UnexpectedValueException If the job carries no "id" metadata
	 * @throws JobQueueConnectionError
	 * @throws JobQueueError
	 */
	protected function doAck( RunnableJob $job ) {
		$id = $job->getMetadata( 'id' );
		if ( $id === null ) {
			throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no ID." );
		}

		$dbw = $this->getPrimaryDB();
		try {
			// Delete a row with a single DELETE without holding row locks over RTTs...
			$dbw->newDeleteQueryBuilder()
				->deleteFrom( 'job' )
				->where( [ 'job_cmd' => $this->type, 'job_id' => $id ] )
				->caller( __METHOD__ )->execute();

			$this->incrStats( 'acks', $this->type );
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
	}

	/**
	 * @see JobQueue::doDeduplicateRootJob()
	 * @param IJobSpecification $job
	 * @throws JobQueueConnectionError
	 * @return bool Always true; the actual registration is deferred
	 */
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		// Callers should call JobQueueGroup::push() before this method so that if the
		// insert fails, the de-duplication registration will be aborted. Since the insert
		// is deferred till "transaction idle", do the same here, so that the ordering is
		// maintained. Having only the de-duplication registration succeed would cause
		// jobs to become no-ops without any actual jobs that made them redundant.
		$dbw = $this->getPrimaryDB();
		$dbw->onTransactionCommitOrIdle(
			function () use ( $job ) {
				parent::doDeduplicateRootJob( $job );
			},
			__METHOD__
		);

		return true;
	}

	/**
	 * Delete every job row of this type, claimed or not.
	 *
	 * @see JobQueue::doDelete()
	 * @return bool
	 * @throws JobQueueError
	 */
	protected function doDelete() {
		$dbw = $this->getPrimaryDB();
		try {
			$dbw->newDeleteQueryBuilder()
				->deleteFrom( 'job' )
				->where( [ 'job_cmd' => $this->type ] )
				->caller( __METHOD__ )->execute();
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return true;
	}

	/**
	 * @see JobQueue::doWaitForBackups()
	 * @return void
	 */
	protected function doWaitForBackups() {
		if ( $this->server ) {
			return; // not using LBFactory instance
		}

		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		$lbFactory->waitForReplication();
	}

	/**
	 * Drop every value this class keeps in the WAN cache for this queue.
	 *
	 * @return void
	 */
	protected function doFlushCaches() {
		// Must list every property passed to getCacheKey() elsewhere in this class
		foreach ( [ 'size', 'acquiredcount', 'abandonedcount', 'small' ] as $type ) {
			$this->wanCache->delete( $this->getCacheKey( $type ) );
		}
	}

	/**
	 * @see JobQueue::getAllQueuedJobs()
	 * @return Iterator<RunnableJob>
	 */
	public function getAllQueuedJobs() {
		return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] );
	}

	/**
	 * @see JobQueue::getAllAcquiredJobs()
	 * @return Iterator<RunnableJob>
	 */
	public function getAllAcquiredJobs() {
		$dbr = $this->getReplicaDB();
		return $this->getJobIterator( [ 'job_cmd' => $this->getType(), $dbr->expr( 'job_token', '>', '' ) ] );
	}

	/**
	 * @see JobQueue::getAllAbandonedJobs()
	 * @return Iterator<RunnableJob>
	 */
	public function getAllAbandonedJobs() {
		$dbr = $this->getReplicaDB();
		return $this->getJobIterator( [
			'job_cmd' => $this->getType(),
			$dbr->expr( 'job_token', '>', '' ),
			$dbr->expr( 'job_attempts', '>=', intval( $this->maxTries ) ),
		] );
	}

	/**
	 * Select the job rows matching the conditions and expose them as job objects.
	 *
	 * @param array $conds Query conditions
	 * @return Iterator<RunnableJob>
	 * @throws JobQueueError
	 */
	protected function getJobIterator( array $conds ) {
		$dbr = $this->getReplicaDB();
		$qb = $dbr->newSelectQueryBuilder()
			->select( self::selectFields() )
			->from( 'job' )
			->where( $conds );
		try {
			return new MappedIterator(
				$qb->caller( __METHOD__ )->fetchResultSet(),
				function ( $row ) {
					return $this->jobFromRow( $row );
				}
			);
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
	}

	/**
	 * @return string|null Identifier of the DB location backing this queue, or null
	 *  when an explicit "server" configuration is used
	 */
	public function getCoalesceLocationInternal() {
		if ( $this->server ) {
			return null; // not using the LBFactory instance
		}

		return is_string( $this->cluster )
			? "DBCluster:{$this->cluster}:{$this->domain}"
			: "LBFactory:{$this->domain}";
	}

	/**
	 * @param string[] $types Job types to check
	 * @return string[] The subset of $types that have at least one row in the job table
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		$dbr = $this->getReplicaDB();
		// @note: this does not check whether the jobs are claimed or not.
		// This is useful so JobQueueGroup::pop() also sees queues that only
		// have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
		// failed jobs so that they can be popped again for that edge case.
		$res = $dbr->newSelectQueryBuilder()
			->select( 'job_cmd' )
			->distinct()
			->from( 'job' )
			->where( [ 'job_cmd' => $types ] )
			->caller( __METHOD__ )->fetchResultSet();

		$types = [];
		foreach ( $res as $row ) {
			$types[] = $row->job_cmd;
		}

		return $types;
	}

	/**
	 * @param string[] $types Job types to check
	 * @return int[] Map of (job type => row count); the count includes claimed rows and
	 *  types without any row are absent from the map
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		$dbr = $this->getReplicaDB();

		$res = $dbr->newSelectQueryBuilder()
			->select( [ 'job_cmd', 'count' => 'COUNT(*)' ] )
			->from( 'job' )
			->where( [ 'job_cmd' => $types ] )
			->groupBy( 'job_cmd' )
			->caller( __METHOD__ )->fetchResultSet();

		$sizes = [];
		foreach ( $res as $row ) {
			$sizes[$row->job_cmd] = (int)$row->count;
		}

		return $sizes;
	}

	/**
	 * Recycle or destroy any jobs that have been claimed for too long
	 *
	 * @return int Number of jobs recycled/deleted
	 * @throws JobQueueError
	 */
	public function recycleAndDeleteStaleJobs() {
		$now = time();
		$count = 0; // affected rows
		$dbw = $this->getPrimaryDB();
		$lockKey = "jobqueue-recycle-{$this->type}";
		$toJobId = static function ( $o ) {
			return $o->job_id;
		};

		try {
			if ( !$dbw->lock( $lockKey, __METHOD__, 1 ) ) {
				return $count; // already in progress
			}

			try {
				// Remove claims on jobs acquired for too long if enabled...
				if ( $this->claimTTL > 0 ) {
					$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
					// Get the IDs of jobs that have been claimed but not finished after too long.
					// These jobs can be recycled into the queue by expiring the claim. Selecting
					// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
					$res = $dbw->newSelectQueryBuilder()
						->select( 'job_id' )
						->from( 'job' )
						->where(
							[
								'job_cmd' => $this->type,
								$dbw->expr( 'job_token', '!=', '' ), // was acquired
								$dbw->expr( 'job_token_timestamp', '<', $claimCutoff ), // stale
								$dbw->expr( 'job_attempts', '<', $this->maxTries ), // retries left
							]
						)
						->caller( __METHOD__ )->fetchResultSet();
					$ids = array_map( $toJobId, iterator_to_array( $res ) );
					if ( count( $ids ) ) {
						// Reset job_token for these jobs so that other runners will pick them up.
						// Set the timestamp to the current time, as it is useful to know that the job
						// was already tried before (the timestamp becomes the "released" time).
						$dbw->newUpdateQueryBuilder()
							->update( 'job' )
							->set( [
								'job_token' => '',
								'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
							] )
							->where( [
								'job_id' => $ids,
								$dbw->expr( 'job_token', '!=', '' ),
							] )
							->caller( __METHOD__ )->execute();

						$affected = $dbw->affectedRows();
						$count += $affected;
						$this->incrStats( 'recycles', $this->type, $affected );
					}
				}

				// Just destroy any stale jobs...
				$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
				$qb = $dbw->newSelectQueryBuilder()
					->select( 'job_id' )
					->from( 'job' )
					->where(
						[
							'job_cmd' => $this->type,
							$dbw->expr( 'job_token', '!=', '' ), // was acquired
							$dbw->expr( 'job_token_timestamp', '<', $pruneCutoff ) // stale
						]
					);
				if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
					$qb->andWhere( $dbw->expr( 'job_attempts', '>=', $this->maxTries ) );
				}
				// Get the IDs of jobs that are considered stale and should be removed. Selecting
				// the IDs first means that the DELETE can be done by primary key (less deadlocks).
				$res = $qb->caller( __METHOD__ )->fetchResultSet();
				$ids = array_map( $toJobId, iterator_to_array( $res ) );
				if ( count( $ids ) ) {
					$dbw->newDeleteQueryBuilder()
						->deleteFrom( 'job' )
						->where( [ 'job_id' => $ids ] )
						->caller( __METHOD__ )->execute();
					$affected = $dbw->affectedRows();
					$count += $affected;
					$this->incrStats( 'abandons', $this->type, $affected );
				}
			} finally {
				// Release the lock even when a query above failed, so that a failure does
				// not leave it held for as long as this connection stays open
				$dbw->unlock( $lockKey, __METHOD__ );
			}
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return $count;
	}

	/**
	 * Build the job table row for a job specification.
	 *
	 * @param IJobSpecification $job
	 * @param IReadableDatabase $db Handle used to format the insertion timestamp
	 * @return array Map of (field name => value)
	 */
	protected function insertFields( IJobSpecification $job, IReadableDatabase $db ) {
		return [
			// Fields that describe the nature of the job
			'job_cmd' => $job->getType(),
			'job_namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL,
			'job_title' => $job->getParams()['title'] ?? '',
			'job_params' => self::makeBlob( $job->getParams() ),
			// Additional job metadata
			'job_timestamp' => $db->timestamp(),
			'job_sha1' => Wikimedia\base_convert(
				sha1( serialize( $job->getDeduplicationInfo() ) ),
				16, 36, 31
			),
			'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
		];
	}

	/**
	 * @throws JobQueueConnectionError
	 * @return IDatabase
	 */
	protected function getReplicaDB() {
		try {
			return $this->getDB( DB_REPLICA );
		} catch ( DBConnectionError $e ) {
			throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
		}
	}

	/**
	 * @throws JobQueueConnectionError
	 * @return IMaintainableDatabase
	 * @since 1.37
	 */
	protected function getPrimaryDB() {
		try {
			return $this->getDB( DB_PRIMARY );
		} catch ( DBConnectionError $e ) {
			throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
		}
	}

	/**
	 * Get a DB handle for the job table.
	 *
	 * With an explicit "server" configuration, one connection is created on first use
	 * and reused for every index; a failed creation is remembered and re-thrown on
	 * later calls. Otherwise the handle comes from the external cluster or main
	 * load balancer.
	 *
	 * @param int $index (DB_REPLICA/DB_PRIMARY)
	 * @return IMaintainableDatabase
	 * @throws DBError
	 */
	protected function getDB( $index ) {
		if ( $this->server ) {
			if ( $this->conn instanceof IDatabase ) {
				return $this->conn;
			} elseif ( $this->conn instanceof DBError ) {
				throw $this->conn;
			}

			try {
				$this->conn = MediaWikiServices::getInstance()->getDatabaseFactory()->create(
					$this->server['type'],
					$this->server
				);
			} catch ( DBError $e ) {
				$this->conn = $e;
				throw $e;
			}

			return $this->conn;
		} else {
			$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
			$lb = is_string( $this->cluster )
				? $lbFactory->getExternalLB( $this->cluster )
				: $lbFactory->getMainLB( $this->domain );

			if ( $lb->getServerType( ServerInfo::WRITER_INDEX ) !== 'sqlite' ) {
				// Keep a separate connection to avoid contention and deadlocks;
				// However, SQLite has the opposite behavior due to DB-level locking.
				$flags = $lb::CONN_TRX_AUTOCOMMIT;
			} else {
				// Jobs insertion will be deferred until the PRESEND stage to reduce contention.
				$flags = 0;
			}

			return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
		}
	}

	/**
	 * Build the global cache key of a property of this queue.
	 *
	 * @param string $property
	 * @return string
	 */
	private function getCacheKey( $property ) {
		$cluster = is_string( $this->cluster ) ? $this->cluster : 'main';

		return $this->wanCache->makeGlobalKey(
			'jobqueue',
			$this->domain,
			$cluster,
			$this->type,
			$property
		);
	}

	/**
	 * Serialize job parameters for the job_params field.
	 *
	 * @param array|false $params
	 * @return string Serialized parameters, or the empty string for false
	 */
	protected static function makeBlob( $params ) {
		if ( $params !== false ) {
			return serialize( $params );
		} else {
			return '';
		}
	}

	/**
	 * Build a job object from a job table row.
	 *
	 * @param stdClass $row
	 * @return RunnableJob
	 * @throws UnexpectedValueException If job_params does not unserialize to an array
	 */
	protected function jobFromRow( $row ) {
		// NOTE(review): unserialize() can instantiate arbitrary classes; this relies on
		// job_params only ever holding what makeBlob() wrote - confirm that nothing
		// else can write to the job table.
		$params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : [];
		if ( !is_array( $params ) ) { // this shouldn't happen
			throw new UnexpectedValueException(
				"Could not unserialize job with ID '{$row->job_id}'." );
		}

		// Array union: values already present in the parameters win over the row fields
		$params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
		$job = $this->factoryJob( $row->job_cmd, $params );
		$job->setMetadata( 'id', $row->job_id );
		$job->setMetadata( 'timestamp', $row->job_timestamp );

		return $job;
	}

	/**
	 * Wrap a DB error into the exception type thrown by this class.
	 *
	 * @param DBError $e
	 * @return JobQueueError
	 */
	protected function getDBException( DBError $e ) {
		return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
	}

	/**
	 * Return the list of job fields that should be selected.
	 * @since 1.23
	 * @return array
	 */
	public static function selectFields() {
		return [
			'job_id',
			'job_cmd',
			'job_namespace',
			'job_title',
			'job_timestamp',
			'job_params',
			'job_random',
			'job_attempts',
			'job_token',
			'job_token_timestamp',
			'job_sha1',
		];
	}
}