Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
43.94% |
203 / 462 |
|
23.53% |
8 / 34 |
CRAP | |
0.00% |
0 / 1 |
JobQueueDB | |
43.94% |
203 / 462 |
|
23.53% |
8 / 34 |
1825.81 | |
0.00% |
0 / 1 |
__construct | |
42.86% |
3 / 7 |
|
0.00% |
0 / 1 |
6.99 | |||
supportedOrders | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
optimalOrder | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
doIsEmpty | |
77.78% |
7 / 9 |
|
0.00% |
0 / 1 |
2.04 | |||
doGetSize | |
76.92% |
10 / 13 |
|
0.00% |
0 / 1 |
3.11 | |||
doGetAcquiredCount | |
77.78% |
14 / 18 |
|
0.00% |
0 / 1 |
4.18 | |||
doGetAbandonedCount | |
0.00% |
0 / 21 |
|
0.00% |
0 / 1 |
20 | |||
doBatchPush | |
100.00% |
11 / 11 |
|
100.00% |
1 / 1 |
1 | |||
doBatchPushInternal | |
54.76% |
23 / 42 |
|
0.00% |
0 / 1 |
19.26 | |||
doPop | |
76.47% |
13 / 17 |
|
0.00% |
0 / 1 |
6.47 | |||
claimRandom | |
96.49% |
55 / 57 |
|
0.00% |
0 / 1 |
9 | |||
claimOldest | |
0.00% |
0 / 40 |
|
0.00% |
0 / 1 |
20 | |||
doAck | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
12 | |||
doDeduplicateRootJob | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
2 | |||
doDelete | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
6 | |||
doWaitForBackups | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
doFlushCaches | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
getAllQueuedJobs | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getAllAcquiredJobs | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
getAllAbandonedJobs | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
2 | |||
getJobIterator | |
84.62% |
11 / 13 |
|
0.00% |
0 / 1 |
2.01 | |||
getCoalesceLocationInternal | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
12 | |||
doGetSiblingQueuesWithJobs | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
6 | |||
doGetSiblingQueueSizes | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
6 | |||
recycleAndDeleteStaleJobs | |
0.00% |
0 / 70 |
|
0.00% |
0 / 1 |
56 | |||
insertFields | |
100.00% |
12 / 12 |
|
100.00% |
1 / 1 |
1 | |||
getReplicaDB | |
33.33% |
1 / 3 |
|
0.00% |
0 / 1 |
3.19 | |||
getPrimaryDB | |
33.33% |
1 / 3 |
|
0.00% |
0 / 1 |
3.19 | |||
getDB | |
33.33% |
7 / 21 |
|
0.00% |
0 / 1 |
21.52 | |||
getCacheKey | |
100.00% |
8 / 8 |
|
100.00% |
1 / 1 |
2 | |||
makeBlob | |
66.67% |
2 / 3 |
|
0.00% |
0 / 1 |
2.15 | |||
jobFromRow | |
77.78% |
7 / 9 |
|
0.00% |
0 / 1 |
3.10 | |||
getDBException | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
selectFields | |
100.00% |
13 / 13 |
|
100.00% |
1 / 1 |
1 |
1 | <?php |
2 | /** |
3 | * This program is free software; you can redistribute it and/or modify |
4 | * it under the terms of the GNU General Public License as published by |
5 | * the Free Software Foundation; either version 2 of the License, or |
6 | * (at your option) any later version. |
7 | * |
8 | * This program is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | * GNU General Public License for more details. |
12 | * |
13 | * You should have received a copy of the GNU General Public License along |
14 | * with this program; if not, write to the Free Software Foundation, Inc., |
15 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
16 | * http://www.gnu.org/copyleft/gpl.html |
17 | * |
18 | * @file |
19 | */ |
20 | use MediaWiki\MediaWikiServices; |
21 | use Wikimedia\Rdbms\DBConnectionError; |
22 | use Wikimedia\Rdbms\DBError; |
23 | use Wikimedia\Rdbms\IDatabase; |
24 | use Wikimedia\Rdbms\IMaintainableDatabase; |
25 | use Wikimedia\Rdbms\IReadableDatabase; |
26 | use Wikimedia\Rdbms\RawSQLValue; |
27 | use Wikimedia\Rdbms\SelectQueryBuilder; |
28 | use Wikimedia\Rdbms\ServerInfo; |
29 | use Wikimedia\ScopedCallback; |
30 | |
31 | /** |
32 | * Database-backed job queue storage. |
33 | * |
34 | * @since 1.21 |
35 | * @ingroup JobQueue |
36 | */ |
37 | class JobQueueDB extends JobQueue { |
/** @var int Seconds to cache queue statistics before re-validating them */
	private const CACHE_TTL_SHORT = 30;
	/** @var int Seconds a claimed job may remain claimed before it can be pruned */
	private const MAX_AGE_PRUNE = 7 * 24 * 3600;
	/**
	 * Upper bound for job_random: the highest safe 32-bit signed integer,
	 * equivalent to `( 2 ** 31 ) - 1` on 64-bit.
	 */
	private const MAX_JOB_RANDOM = 2_147_483_647;
	/** @var int Largest random OFFSET used when picking a job from a large queue */
	private const MAX_OFFSET = 255;

	/** @var IMaintainableDatabase|DBError|null Memoized handle for "server" mode, or the error from creating it */
	protected $conn;

	/** @var array|null Server configuration array */
	protected $server;
	/** @var string|null Name of an external DB cluster or null for the local DB cluster */
	protected $cluster;
57 | |
/**
	 * Additional parameters include:
	 *   - server : Server configuration array for Database::factory. Overrides "cluster".
	 *   - cluster : The name of an external cluster registered via LBFactory.
	 *       If not specified, the primary DB cluster for the wiki will be used.
	 *       This can be overridden with a custom cluster so that DB handles will
	 *       be retrieved via LBFactory::getExternalLB() and getConnection().
	 * @param array $params
	 */
	protected function __construct( array $params ) {
		parent::__construct( $params );

		if ( isset( $params['server'] ) ) {
			$server = $params['server'];
			// Force autocommit mode by clearing DBO_TRX/DBO_DEFAULT, even if they were configured
			$configuredFlags = $server['flags'] ?? 0;
			$server['flags'] = $configuredFlags & ~( IDatabase::DBO_TRX | IDatabase::DBO_DEFAULT );
			$this->server = $server;
			return;
		}

		$cluster = $params['cluster'] ?? null;
		if ( is_string( $cluster ) ) {
			$this->cluster = $cluster;
		}
	}
79 | |
/**
	 * @return string[] Orderings this backend can pop jobs in
	 */
	protected function supportedOrders() {
		$orders = [ 'random', 'timestamp', 'fifo' ];

		return $orders;
	}

	/**
	 * @return string The preferred ordering for this backend
	 */
	protected function optimalOrder() {
		return 'random';
	}
87 | |
/**
	 * Check whether this queue has no unclaimed jobs.
	 *
	 * @see JobQueue::doIsEmpty()
	 * @return bool True when no row of this type has an empty job_token
	 */
	protected function doIsEmpty() {
		$dbr = $this->getReplicaDB();
		try {
			// Probe for any single unclaimed row (empty job_token) of this type
			$probe = $dbr->newSelectQueryBuilder()
				->select( '1' )
				->from( 'job' )
				->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
				->caller( __METHOD__ )
				->fetchField();
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return !$probe;
	}
107 | |
/**
	 * Count the unclaimed jobs of this type.
	 *
	 * The count is cached for CACHE_TTL_SHORT seconds.
	 *
	 * @see JobQueue::doGetSize()
	 * @return int
	 */
	protected function doGetSize() {
		$cacheKey = $this->getCacheKey( 'size' );

		$cachedSize = $this->wanCache->get( $cacheKey );
		if ( is_int( $cachedSize ) ) {
			return $cachedSize;
		}

		$dbr = $this->getReplicaDB();
		try {
			$freshSize = $dbr->newSelectQueryBuilder()
				->from( 'job' )
				// unclaimed rows have an empty job_token
				->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
				->caller( __METHOD__ )
				->fetchRowCount();
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		$this->wanCache->set( $cacheKey, $freshSize, self::CACHE_TTL_SHORT );

		return $freshSize;
	}
133 | |
/**
	 * Count the jobs of this type that are currently claimed.
	 *
	 * Always 0 when claimTTL <= 0. Otherwise the count is cached for
	 * CACHE_TTL_SHORT seconds.
	 *
	 * @see JobQueue::doGetAcquiredCount()
	 * @return int
	 */
	protected function doGetAcquiredCount() {
		if ( $this->claimTTL <= 0 ) {
			// no acknowledgements
			return 0;
		}

		$cacheKey = $this->getCacheKey( 'acquiredcount' );
		$acquired = $this->wanCache->get( $cacheKey );
		if ( is_int( $acquired ) ) {
			return $acquired;
		}

		$dbr = $this->getReplicaDB();
		try {
			$claimedConds = [
				'job_cmd' => $this->type,
				// a non-empty token means the row was claimed
				$dbr->expr( 'job_token', '!=', '' ),
			];
			$acquired = $dbr->newSelectQueryBuilder()
				->from( 'job' )
				->where( $claimedConds )
				->caller( __METHOD__ )
				->fetchRowCount();
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		$this->wanCache->set( $cacheKey, $acquired, self::CACHE_TTL_SHORT );

		return $acquired;
	}
166 | |
/**
	 * Count the claimed jobs of this type that have used up their attempts.
	 *
	 * Always 0 when claimTTL <= 0. Otherwise the count is cached for
	 * CACHE_TTL_SHORT seconds.
	 *
	 * @see JobQueue::doGetAbandonedCount()
	 * @return int
	 * @throws JobQueueConnectionError
	 * @throws JobQueueError
	 */
	protected function doGetAbandonedCount() {
		if ( $this->claimTTL <= 0 ) {
			// no acknowledgements
			return 0;
		}

		$cacheKey = $this->getCacheKey( 'abandonedcount' );
		$abandoned = $this->wanCache->get( $cacheKey );
		if ( is_int( $abandoned ) ) {
			return $abandoned;
		}

		$dbr = $this->getReplicaDB();
		try {
			$abandonedConds = [
				'job_cmd' => $this->type,
				// claimed...
				$dbr->expr( 'job_token', '!=', '' ),
				// ...and out of retries
				$dbr->expr( 'job_attempts', '>=', $this->maxTries ),
			];
			$abandoned = $dbr->newSelectQueryBuilder()
				->from( 'job' )
				->where( $abandonedConds )
				->caller( __METHOD__ )
				->fetchRowCount();
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		$this->wanCache->set( $cacheKey, $abandoned, self::CACHE_TTL_SHORT );

		return $abandoned;
	}
205 | |
/**
	 * Enqueue a batch of jobs.
	 *
	 * The rows are written by doBatchPushInternal(), scheduled through
	 * onTransactionPreCommitOrIdle() on the primary handle. In general there are two cases:
	 *  a) sqlite: the handle is probably a regular round-aware handle. If it is busy
	 *     with a transaction, the job writes are deferred until right before the main
	 *     round commit step. Any errors that bubble up will rollback the main commit round.
	 *  b) mysql/postgres: the handle is generally a separate CONN_TRX_AUTOCOMMIT handle.
	 *     No transaction is active nor will be started by writes, so the jobs are
	 *     enqueued now and any errors show up immediately, as the interface expects.
	 *     Any errors that bubble up will rollback the main commit round.
	 *
	 * @see JobQueue::doBatchPush()
	 * @param IJobSpecification[] $jobs
	 * @param int $flags
	 * @throws DBError|Exception
	 * @return void
	 */
	protected function doBatchPush( array $jobs, $flags ) {
		// A primary handle is unavoidable for inserting jobs, so silence the
		// transaction profiler's expectations while obtaining it.
		$silencer = Profiler::instance()->getTransactionProfiler()->silenceForScope();
		$dbw = $this->getPrimaryDB();
		ScopedCallback::consume( $silencer );

		$method = __METHOD__;
		$dbw->onTransactionPreCommitOrIdle(
			function ( IDatabase $dbw ) use ( $jobs, $flags, $method ) {
				$this->doBatchPushInternal( $dbw, $jobs, $flags, $method );
			},
			$method
		);
	}
236 | |
/**
	 * Insert job rows, skipping de-duplicatable jobs that are already queued.
	 *
	 * This function should *not* be called outside of JobQueueDB.
	 *
	 * Rows for jobs whose ignoreDuplicates() is true are keyed by job_sha1. Duplicates
	 * within the batch therefore collapse to one row. Any hash that already has an
	 * unclaimed row in the table is dropped. The remaining rows are inserted in chunks
	 * of 50. With QOS_ATOMIC the whole operation runs inside one atomic section.
	 *
	 * @suppress SecurityCheck-SQLInjection Bug in phan-taint-check handling bulk inserts
	 * @param IDatabase $dbw
	 * @param IJobSpecification[] $jobs
	 * @param int $flags
	 * @param string $method
	 * @throws DBError
	 * @return void
	 */
	public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
		if ( $jobs === [] ) {
			return;
		}

		$rowSet = []; // (sha1 => row) map for jobs that are de-duplicated
		$rowList = []; // list of rows for jobs that are not de-duplicated
		foreach ( $jobs as $job ) {
			$row = $this->insertFields( $job, $dbw );
			if ( $job->ignoreDuplicates() ) {
				$rowSet[$row['job_sha1']] = $row;
			} else {
				$rowList[] = $row;
			}
		}

		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->startAtomic( $method ); // wrap all the job additions in one transaction
		}
		try {
			// Strip out any duplicate jobs that are already in the queue...
			if ( $rowSet !== [] ) {
				$res = $dbw->newSelectQueryBuilder()
					->select( 'job_sha1' )
					->from( 'job' )
					->where(
						[
							// No job_type condition since it's part of the job_sha1 hash.
							// strval() because PHP turns numeric-string array keys into ints.
							'job_sha1' => array_map( 'strval', array_keys( $rowSet ) ),
							'job_token' => '' // unclaimed
						]
					)
					->caller( $method )->fetchResultSet();
				foreach ( $res as $row ) {
					wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
					unset( $rowSet[$row->job_sha1] ); // already enqueued
				}
			}
			// Build the full list of job rows to insert
			$rows = array_merge( $rowList, array_values( $rowSet ) );
			// Silence expectations related to inserting to the job table, because we have to
			// perform the inserts to track the job.
			$transactionProfiler = Profiler::instance()->getTransactionProfiler();
			$scope = $transactionProfiler->silenceForScope();
			// Insert the job rows in chunks to avoid replica DB lag...
			foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
				$dbw->newInsertQueryBuilder()
					->insertInto( 'job' )
					->rows( $rowBatch )
					->caller( $method )->execute();
			}
			ScopedCallback::consume( $scope );
			$this->incrStats( 'inserts', $this->type, count( $rows ) );
			// Every job that did not produce an inserted row was a duplicate, either
			// collapsed within this batch or already queued. Count from $jobs: $rowSet
			// has already had the queued duplicates removed, so it cannot be used here.
			$this->incrStats( 'dupe_inserts', $this->type, count( $jobs ) - count( $rows ) );
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->endAtomic( $method );
		}
	}
311 | |
/**
	 * Claim one job from the queue and turn it into a runnable job.
	 *
	 * claimOldest() is used for the "fifo" and "timestamp" orders, and claimRandom()
	 * otherwise. Stale-job recycling/pruning runs whenever nothing was claimed, and
	 * on roughly 1 in 10 successful pops.
	 *
	 * @see JobQueue::doPop()
	 * @return RunnableJob|false
	 */
	protected function doPop() {
		$job = false; // job popped off
		try {
			$uuid = wfRandomString( 32 ); // pop attempt
			// Try to reserve a row in the DB...
			if ( in_array( $this->order, [ 'fifo', 'timestamp' ], true ) ) {
				$row = $this->claimOldest( $uuid );
			} else { // random first
				$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
				$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
				$row = $this->claimRandom( $uuid, $rand, $gte );
			}

			// The claim helpers already retry internally when they are raced out,
			// so a single attempt is enough here.
			if ( $row ) {
				$this->incrStats( 'pops', $this->type );
				$job = $this->jobFromRow( $row );
			}

			if ( !$job || mt_rand( 0, 9 ) === 0 ) {
				// Handle jobs that need to be recycled/deleted;
				// any recycled jobs will be picked up next attempt
				$this->recycleAndDeleteStaleJobs();
			}
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return $job;
	}
351 | |
/**
	 * Reserve a row with a single UPDATE without holding row locks over RTTs...
	 *
	 * A candidate unclaimed row is selected first. It is then claimed by job_id with an
	 * UPDATE that only matches while job_token is still empty. If another runner wins
	 * that race, the whole selection is retried. When the queue is believed to be small
	 * (the cached 'small' flag), rows are picked via job_random, and the opposite
	 * direction is scanned once if needed. Otherwise a random OFFSET of up to MAX_OFFSET
	 * rows is used. An OFFSET that overshoots marks the queue as small for
	 * CACHE_TTL_SHORT seconds.
	 *
	 * @param string $uuid 32 char hex string
	 * @param int $rand Random unsigned integer (31 bits)
	 * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
	 * @return stdClass|false Row|false
	 */
	protected function claimRandom( $uuid, $rand, $gte ) {
		$dbw = $this->getPrimaryDB();
		$smallQueueKey = $this->getCacheKey( 'small' );
		// Check cache to see if the queue has <= MAX_OFFSET items
		$tinyQueue = $this->wanCache->get( $smallQueueKey );

		$invertedDirection = false; // whether one job_random direction was already scanned
		// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
		// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
		// not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
		// be used here with MySQL.
		do {
			if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
				// For small queues, using OFFSET will overshoot and return no rows more often.
				// Instead, this uses job_random to pick a row (possibly checking both directions).
				$row = $dbw->newSelectQueryBuilder()
					->select( self::selectFields() )
					->from( 'job' )
					->where(
						[
							'job_cmd' => $this->type,
							'job_token' => '', // unclaimed
							$dbw->expr( 'job_random', $gte ? '>=' : '<=', $rand )
						]
					)
					->orderBy(
						'job_random',
						$gte ? SelectQueryBuilder::SORT_ASC : SelectQueryBuilder::SORT_DESC
					)
					->caller( __METHOD__ )->fetchRow();
				if ( !$row && !$invertedDirection ) {
					$gte = !$gte;
					$invertedDirection = true;
					continue; // try the other direction
				}
			} else { // table *may* have > MAX_OFFSET rows
				// T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
				// in MySQL if there are many rows for some reason. This uses a small OFFSET
				// instead of job_random for reducing excess claim retries.
				$row = $dbw->newSelectQueryBuilder()
					->select( self::selectFields() )
					->from( 'job' )
					->where(
						[
							'job_cmd' => $this->type,
							'job_token' => '', // unclaimed
						]
					)
					->offset( mt_rand( 0, self::MAX_OFFSET ) )
					->caller( __METHOD__ )->fetchRow();
				if ( !$row ) {
					$tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
					$this->wanCache->set( $smallQueueKey, 1, self::CACHE_TTL_SHORT );
					continue; // use job_random
				}
			}

			if ( !$row ) {
				break;
			}

			$dbw->newUpdateQueryBuilder()
				->update( 'job' ) // update by PK
				->set( [
					'job_token' => $uuid,
					'job_token_timestamp' => $dbw->timestamp(),
					'job_attempts' => new RawSQLValue( 'job_attempts+1' ),
				] )
				->where( [
					'job_cmd' => $this->type,
					'job_id' => $row->job_id,
					'job_token' => ''
				] )
				->caller( __METHOD__ )->execute();

			// This might get raced out by another runner when claiming the previously
			// selected row. The use of job_random should minimize this problem, however.
			if ( !$dbw->affectedRows() ) {
				$row = false; // raced out
			}
		} while ( !$row );

		return $row;
	}
443 | |
/**
	 * Reserve the unclaimed row with the lowest job_id using a single UPDATE,
	 * without holding row locks over RTTs, then fetch that row by its new token.
	 *
	 * If the claimed row has vanished before it can be fetched (e.g. it was deleted
	 * as a duplicate by another process), another row is claimed.
	 *
	 * @param string $uuid 32 char hex string
	 * @return stdClass|false Row|false
	 */
	protected function claimOldest( $uuid ) {
		$dbw = $this->getPrimaryDB();

		while ( true ) {
			if ( $dbw->getType() === 'mysql' ) {
				// Per https://bugs.mysql.com/bug.php?id=6980, MySQL cannot use a subquery on
				// the same table being changed in an UPDATE (Error: 1093). It does, however,
				// support ORDER BY + LIMIT on UPDATE queries, so use that instead.
				$sql = sprintf(
					'UPDATE %s SET job_token = %s, job_token_timestamp = %s, job_attempts = job_attempts+1 ' .
						'WHERE ( job_cmd = %s AND job_token = %s ) ORDER BY job_id ASC LIMIT 1',
					$dbw->tableName( 'job' ),
					$dbw->addQuotes( $uuid ),
					$dbw->addQuotes( $dbw->timestamp() ),
					$dbw->addQuotes( $this->type ),
					$dbw->addQuotes( '' )
				);
				$dbw->query( $sql, __METHOD__ );
			} else {
				// Other engines accept a subquery on the same table: find the oldest
				// unclaimed job_id inside the UPDATE that claims it.
				$oldestUnclaimed = $dbw->newSelectQueryBuilder()
					->select( 'job_id' )
					->from( 'job' )
					->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
					->orderBy( 'job_id', SelectQueryBuilder::SORT_ASC )
					->limit( 1 );

				$dbw->newUpdateQueryBuilder()
					->update( 'job' )
					->set( [
						'job_token' => $uuid,
						'job_token_timestamp' => $dbw->timestamp(),
						'job_attempts' => new RawSQLValue( 'job_attempts+1' ),
					] )
					->where( [ 'job_id' => new RawSQLValue( '(' . $oldestUnclaimed->getSQL() . ')' ) ] )
					->caller( __METHOD__ )
					->execute();
			}

			if ( !$dbw->affectedRows() ) {
				// Nothing was left to claim
				return false;
			}

			// Fetch the row we just reserved...
			$claimed = $dbw->newSelectQueryBuilder()
				->select( self::selectFields() )
				->from( 'job' )
				->where( [ 'job_cmd' => $this->type, 'job_token' => $uuid ] )
				->caller( __METHOD__ )
				->fetchRow();
			if ( $claimed ) {
				return $claimed;
			}

			// raced out by duplicate job removal; claim another row
			wfDebug( "Row deleted as duplicate by another process." );
		}
	}
509 | |
/**
	 * Acknowledge a finished job by deleting its row.
	 *
	 * @see JobQueue::doAck()
	 * @param RunnableJob $job
	 * @throws UnexpectedValueException If the job carries no "id" metadata
	 * @throws JobQueueConnectionError
	 * @throws JobQueueError
	 */
	protected function doAck( RunnableJob $job ) {
		$jobId = $job->getMetadata( 'id' );
		if ( $jobId === null ) {
			throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no ID." );
		}

		$dbw = $this->getPrimaryDB();
		try {
			// A single DELETE, so no row locks are held across round trips
			$dbw->newDeleteQueryBuilder()
				->deleteFrom( 'job' )
				->where( [ 'job_cmd' => $this->type, 'job_id' => $jobId ] )
				->caller( __METHOD__ )
				->execute();
			$this->incrStats( 'acks', $this->type );
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
	}
535 | |
/**
	 * Register root-job de-duplication once the primary connection's transaction
	 * commits (or immediately if it is idle).
	 *
	 * Callers should call JobQueueGroup::push() before this method, so that a failed
	 * insert also aborts the de-duplication registration. Job insertion is deferred
	 * via a transaction callback, so the registration is deferred the same way to keep
	 * the ordering. If only the registration succeeded, jobs would become no-ops with
	 * no actual job that made them redundant.
	 *
	 * @see JobQueue::doDeduplicateRootJob()
	 * @param IJobSpecification $job
	 * @throws JobQueueConnectionError
	 * @return bool Always true
	 */
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		$this->getPrimaryDB()->onTransactionCommitOrIdle(
			function () use ( $job ) {
				parent::doDeduplicateRootJob( $job );
			},
			__METHOD__
		);

		return true;
	}
558 | |
/**
	 * Remove every job of this type, whether claimed or not.
	 *
	 * @see JobQueue::doDelete()
	 * @return bool Always true
	 */
	protected function doDelete() {
		$dbw = $this->getPrimaryDB();
		$deletion = $dbw->newDeleteQueryBuilder()
			->deleteFrom( 'job' )
			->where( [ 'job_cmd' => $this->type ] )
			->caller( __METHOD__ );

		try {
			$deletion->execute();
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return true;
	}
576 | |
/**
	 * Wait for replicas to catch up, unless a dedicated "server" is configured
	 * (that connection is not managed by LBFactory).
	 *
	 * @see JobQueue::doWaitForBackups()
	 * @return void
	 */
	protected function doWaitForBackups() {
		if ( !$this->server ) {
			MediaWikiServices::getInstance()
				->getDBLoadBalancerFactory()
				->waitForReplication();
		}
	}
589 | |
/**
	 * Purge the cached queue statistics so the next reads go to the database.
	 *
	 * Covers every count cached with CACHE_TTL_SHORT by doGetSize(),
	 * doGetAcquiredCount() and doGetAbandonedCount(). Previously 'abandonedcount'
	 * was missed and could stay stale after a flush.
	 *
	 * @return void
	 */
	protected function doFlushCaches() {
		foreach ( [ 'size', 'acquiredcount', 'abandonedcount' ] as $type ) {
			$this->wanCache->delete( $this->getCacheKey( $type ) );
		}
	}
598 | |
/**
	 * Iterate over every unclaimed (empty job_token) job of this queue's type.
	 *
	 * @see JobQueue::getAllQueuedJobs()
	 * @return Iterator<RunnableJob>
	 */
	public function getAllQueuedJobs() {
		$unclaimed = [
			'job_cmd' => $this->getType(),
			'job_token' => '',
		];

		return $this->getJobIterator( $unclaimed );
	}
606 | |
/**
	 * Iterate over every claimed (non-empty job_token) job of this queue's type.
	 *
	 * @see JobQueue::getAllAcquiredJobs()
	 * @return Iterator<RunnableJob>
	 */
	public function getAllAcquiredJobs() {
		$dbr = $this->getReplicaDB();
		$type = $this->getType();
		$isClaimed = $dbr->expr( 'job_token', '>', '' );

		return $this->getJobIterator( [ 'job_cmd' => $type, $isClaimed ] );
	}
615 | |
/**
	 * Iterate over claimed jobs of this queue's type that reached maxTries attempts.
	 *
	 * @see JobQueue::getAllAbandonedJobs()
	 * @return Iterator<RunnableJob>
	 */
	public function getAllAbandonedJobs() {
		$dbr = $this->getReplicaDB();

		$conds = [ 'job_cmd' => $this->getType() ];
		// claimed...
		$conds[] = $dbr->expr( 'job_token', '>', '' );
		// ...and out of attempts
		$conds[] = $dbr->expr( 'job_attempts', '>=', (int)$this->maxTries );

		return $this->getJobIterator( $conds );
	}
628 | |
/**
	 * Lazily map the job rows matching $conds (read from a replica) to job objects.
	 *
	 * @param array $conds Query conditions
	 * @return Iterator<RunnableJob>
	 */
	protected function getJobIterator( array $conds ) {
		$dbr = $this->getReplicaDB();
		$queryBuilder = $dbr->newSelectQueryBuilder()
			->select( self::selectFields() )
			->from( 'job' )
			->where( $conds )
			->caller( __METHOD__ );

		try {
			$rows = $queryBuilder->fetchResultSet();
			$toJob = function ( $row ) {
				return $this->jobFromRow( $row );
			};

			return new MappedIterator( $rows, $toJob );
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
	}
650 | |
/**
	 * @return string|null Identifier of the DB location backing this queue, or null
	 *   when a dedicated "server" configuration bypasses the LBFactory instance
	 */
	public function getCoalesceLocationInternal() {
		if ( $this->server ) {
			return null;
		}

		if ( is_string( $this->cluster ) ) {
			return "DBCluster:{$this->cluster}:{$this->domain}";
		}

		return "LBFactory:{$this->domain}";
	}
660 | |
/**
	 * Find which of the given job types have at least one row in the job table.
	 *
	 * Claimed rows count too, so JobQueueGroup::pop() also sees queues that only
	 * hold stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue failed jobs
	 * so that they can be popped again for that edge case.
	 *
	 * @param string[] $types
	 * @return string[]
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		$res = $this->getReplicaDB()->newSelectQueryBuilder()
			->select( 'job_cmd' )
			->distinct()
			->from( 'job' )
			->where( [ 'job_cmd' => $types ] )
			->caller( __METHOD__ )
			->fetchResultSet();

		$typesWithJobs = [];
		foreach ( $res as $resultRow ) {
			$typesWithJobs[] = $resultRow->job_cmd;
		}

		return $typesWithJobs;
	}
681 | |
/**
	 * Count the rows (claimed or not) for each of the given job types.
	 *
	 * Types with no rows are absent from the result.
	 *
	 * @param string[] $types
	 * @return int[] Map of job type => row count
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		$res = $this->getReplicaDB()->newSelectQueryBuilder()
			->select( [ 'job_cmd', 'count' => 'COUNT(*)' ] )
			->from( 'job' )
			->where( [ 'job_cmd' => $types ] )
			->groupBy( 'job_cmd' )
			->caller( __METHOD__ )
			->fetchResultSet();

		$sizeByType = [];
		foreach ( $res as $resultRow ) {
			$sizeByType[$resultRow->job_cmd] = (int)$resultRow->count;
		}

		return $sizeByType;
	}
699 | |
/**
	 * Recycle or destroy any jobs that have been claimed for too long
	 *
	 * Guarded by a named DB lock per job type, so only one caller does this at a
	 * time. It returns 0 immediately if the lock cannot be obtained within 1 second.
	 *  - When claimTTL > 0, jobs claimed longer than claimTTL ago that still have
	 *    retries left get their claim released, so runners can pick them up again.
	 *  - Jobs claimed longer than MAX_AGE_PRUNE ago are deleted. When claimTTL > 0,
	 *    only those that have already used maxTries attempts are deleted.
	 *
	 * @return int Number of jobs recycled/deleted
	 */
	public function recycleAndDeleteStaleJobs() {
		$now = time();
		$count = 0; // affected rows
		$dbw = $this->getPrimaryDB();
		$lockName = "jobqueue-recycle-{$this->type}";

		try {
			if ( !$dbw->lock( $lockName, __METHOD__, 1 ) ) {
				return $count; // already in progress
			}

			// Release the lock even if a query below throws. Otherwise it could stay
			// held by this connection, and later calls would see "already in progress".
			try {
				// Remove claims on jobs acquired for too long if enabled...
				if ( $this->claimTTL > 0 ) {
					$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
					// Get the IDs of jobs that have been claimed but not finished after too long.
					// These jobs can be recycled into the queue by expiring the claim. Selecting
					// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
					$res = $dbw->newSelectQueryBuilder()
						->select( 'job_id' )
						->from( 'job' )
						->where(
							[
								'job_cmd' => $this->type,
								$dbw->expr( 'job_token', '!=', '' ), // was acquired
								$dbw->expr( 'job_token_timestamp', '<', $claimCutoff ), // stale
								$dbw->expr( 'job_attempts', '<', $this->maxTries ), // retries left
							]
						)
						->caller( __METHOD__ )->fetchResultSet();
					$ids = array_map( static fn ( $row ) => $row->job_id, iterator_to_array( $res ) );
					if ( $ids !== [] ) {
						// Reset job_token for these jobs so that other runners will pick them up.
						// Set the timestamp to the current time, as it is useful to know that the job
						// was already tried before (the timestamp becomes the "released" time).
						$dbw->newUpdateQueryBuilder()
							->update( 'job' )
							->set( [
								'job_token' => '',
								'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
							] )
							->where( [
								'job_id' => $ids,
								$dbw->expr( 'job_token', '!=', '' ),
							] )
							->caller( __METHOD__ )->execute();

						$affected = $dbw->affectedRows();
						$count += $affected;
						$this->incrStats( 'recycles', $this->type, $affected );
					}
				}

				// Just destroy any stale jobs...
				$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
				$qb = $dbw->newSelectQueryBuilder()
					->select( 'job_id' )
					->from( 'job' )
					->where(
						[
							'job_cmd' => $this->type,
							$dbw->expr( 'job_token', '!=', '' ), // was acquired
							$dbw->expr( 'job_token_timestamp', '<', $pruneCutoff ) // stale
						]
					);
				if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
					$qb->andWhere( $dbw->expr( 'job_attempts', '>=', $this->maxTries ) );
				}
				// Get the IDs of jobs that are considered stale and should be removed. Selecting
				// the IDs first means that the DELETE can be done by primary key (less deadlocks).
				$res = $qb->caller( __METHOD__ )->fetchResultSet();
				$ids = array_map( static fn ( $row ) => $row->job_id, iterator_to_array( $res ) );
				if ( $ids !== [] ) {
					$dbw->newDeleteQueryBuilder()
						->deleteFrom( 'job' )
						->where( [ 'job_id' => $ids ] )
						->caller( __METHOD__ )->execute();
					$affected = $dbw->affectedRows();
					$count += $affected;
					$this->incrStats( 'abandons', $this->type, $affected );
				}
			} finally {
				$dbw->unlock( $lockName, __METHOD__ );
			}
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return $count;
	}
800 | |
/**
	 * Build the `job` table row for a job specification.
	 *
	 * @param IJobSpecification $job
	 * @param IReadableDatabase $db Used here only to format the current timestamp
	 * @return array Column name => value map
	 */
	protected function insertFields( IJobSpecification $job, IReadableDatabase $db ) {
		$params = $job->getParams();

		return [
			// Fields that describe the nature of the job
			'job_cmd' => $job->getType(),
			'job_namespace' => $params['namespace'] ?? NS_SPECIAL,
			'job_title' => $params['title'] ?? '',
			'job_params' => self::makeBlob( $params ),
			// Additional job metadata
			'job_timestamp' => $db->timestamp(),
			// SHA-1 of the serialized de-duplication info, converted from base 16 to base 36
			// (31 is the pad length argument)
			'job_sha1' => Wikimedia\base_convert(
				sha1( serialize( $job->getDeduplicationInfo() ) ),
				16,
				36,
				31
			),
			'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM ),
		];
	}
822 | |
/**
	 * Get a replica handle for reading the job table.
	 *
	 * @throws JobQueueConnectionError If the DB connection could not be established
	 * @return IDatabase
	 */
	protected function getReplicaDB() {
		try {
			$dbr = $this->getDB( DB_REPLICA );
		} catch ( DBConnectionError $e ) {
			throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
		}

		return $dbr;
	}
834 | |
/**
 * Get a primary (writable) connection to the job queue database.
 *
 * @throws JobQueueConnectionError If the connection fails; the underlying
 *  DBConnectionError is attached as the previous exception
 * @return IMaintainableDatabase
 * @since 1.37
 */
protected function getPrimaryDB() {
	try {
		return $this->getDB( DB_PRIMARY );
	} catch ( DBConnectionError $e ) {
		// Chain the original error so its stack trace and details are not lost
		throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage(), 0, $e );
	}
}
847 | |
/**
 * Get a database connection for this job queue.
 *
 * Without a dedicated server config, the connection comes from a load balancer:
 * the external cluster's LB when $this->cluster is a string, otherwise the main
 * LB for $this->domain.
 *
 * With a dedicated server config ($this->server), one connection is created
 * lazily through the DatabaseFactory and returned for every $index. If creating
 * it fails, the DBError is cached and rethrown on every later call instead of
 * retrying.
 *
 * @param int $index (DB_REPLICA/DB_PRIMARY)
 * @return IMaintainableDatabase
 * @throws DBError If the dedicated-server connection cannot be created
 */
protected function getDB( $index ) {
	if ( !$this->server ) {
		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		if ( is_string( $this->cluster ) ) {
			$lb = $lbFactory->getExternalLB( $this->cluster );
		} else {
			$lb = $lbFactory->getMainLB( $this->domain );
		}

		// Non-SQLite: request a separate autocommit connection to avoid contention
		// and deadlocks. SQLite uses DB-level locking, so the opposite holds there;
		// job insertion is deferred until the PRESEND stage to reduce contention.
		$usesSqlite = $lb->getServerType( ServerInfo::WRITER_INDEX ) === 'sqlite';
		$connFlags = $usesSqlite ? 0 : $lb::CONN_TRX_AUTOCOMMIT;

		return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $connFlags );
	}

	// Dedicated server: reuse the cached connection, or rethrow the cached failure
	if ( $this->conn instanceof IDatabase ) {
		return $this->conn;
	}
	if ( $this->conn instanceof DBError ) {
		throw $this->conn;
	}

	try {
		$this->conn = MediaWikiServices::getInstance()->getDatabaseFactory()->create(
			$this->server['type'],
			$this->server
		);
	} catch ( DBError $e ) {
		// Remember the failure so subsequent calls fail immediately
		$this->conn = $e;
		throw $e;
	}

	return $this->conn;
}
889 | |
/**
 * Build a global WAN cache key for one property of this queue.
 *
 * The key is scoped by wiki domain, cluster name ('main' when $this->cluster is
 * not a string), job type and the property name.
 *
 * @param string $property Name of the cached property
 * @return string
 */
private function getCacheKey( $property ) {
	return $this->wanCache->makeGlobalKey(
		'jobqueue',
		$this->domain,
		is_string( $this->cluster ) ? $this->cluster : 'main',
		$this->type,
		$property
	);
}
905 | |
/**
 * Encode job parameters for the job_params column.
 *
 * @param array|false $params Job parameters, or false for none
 * @return string PHP-serialized parameters, or '' when $params is false
 */
protected static function makeBlob( $params ) {
	return ( $params === false ) ? '' : serialize( $params );
}
917 | |
/**
 * Rebuild a runnable job from a `job` table row.
 *
 * An empty job_params value means "no parameters". The row's namespace and
 * title are added as 'namespace'/'title' params unless the stored params
 * already define them. The row ID and timestamp are attached as job metadata.
 *
 * @param stdClass $row Row from the `job` table
 * @return RunnableJob
 * @throws UnexpectedValueException If job_params does not decode to an array
 */
protected function jobFromRow( $row ) {
	// NOTE(review): unserialize() is called without 'allowed_classes'; if stored job
	// params are always plain data, restricting it would be safer — verify first.
	$params = ( (string)$row->job_params === '' ) ? [] : unserialize( $row->job_params );
	if ( !is_array( $params ) ) {
		// Not expected: makeBlob() only ever writes '' or a serialized array
		throw new UnexpectedValueException(
			"Could not unserialize job with ID '{$row->job_id}'." );
	}

	$job = $this->factoryJob(
		$row->job_cmd,
		$params + [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ]
	);
	$job->setMetadata( 'id', $row->job_id );
	$job->setMetadata( 'timestamp', $row->job_timestamp );

	return $job;
}
936 | |
/**
 * Wrap a database error in a JobQueueError.
 *
 * The message is prefixed with the original error's class name. The original
 * error is attached as the previous exception so its stack trace is preserved.
 *
 * @param DBError $e
 * @return JobQueueError
 */
protected function getDBException( DBError $e ) {
	return new JobQueueError( get_class( $e ) . ": " . $e->getMessage(), 0, $e );
}
944 | |
/**
 * Return the list of job fields that should be selected.
 *
 * All names carry the "job_" column prefix and are returned in a fixed order.
 * @since 1.23
 * @return string[]
 */
public static function selectFields() {
	$suffixes = [
		'id', 'cmd', 'namespace', 'title', 'timestamp', 'params',
		'random', 'attempts', 'token', 'token_timestamp', 'sha1',
	];

	return array_map( static function ( $suffix ) {
		return "job_$suffix";
	}, $suffixes );
}
965 | } |