Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
58.68% |
142 / 242 |
|
16.67% |
2 / 12 |
CRAP | |
0.00% |
0 / 1 |
JobRunner | |
58.68% |
142 / 242 |
|
16.67% |
2 / 12 |
449.01 | |
0.00% |
0 / 1 |
setDebugHandler | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
__construct | |
100.00% |
8 / 8 |
|
100.00% |
1 / 1 |
1 | |||
run | |
80.00% |
64 / 80 |
|
0.00% |
0 / 1 |
31.41 | |||
executeJob | |
88.89% |
8 / 9 |
|
0.00% |
0 / 1 |
2.01 | |||
doExecuteJob | |
70.31% |
45 / 64 |
|
0.00% |
0 / 1 |
11.12 | |||
getErrorBackoffTTL | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
getMaxRssKb | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
2 | |||
getBackoffTimeToWait | |
23.08% |
3 / 13 |
|
0.00% |
0 / 1 |
22.39 | |||
loadBackoffs | |
23.53% |
4 / 17 |
|
0.00% |
0 / 1 |
28.91 | |||
syncBackoffDeltas | |
8.33% |
2 / 24 |
|
0.00% |
0 / 1 |
87.03 | |||
checkMemoryOK | |
26.32% |
5 / 19 |
|
0.00% |
0 / 1 |
15.00 | |||
debugCallback | |
50.00% |
1 / 2 |
|
0.00% |
0 / 1 |
2.50 |
1 | <?php |
2 | /** |
3 | * This program is free software; you can redistribute it and/or modify |
4 | * it under the terms of the GNU General Public License as published by |
5 | * the Free Software Foundation; either version 2 of the License, or |
6 | * (at your option) any later version. |
7 | * |
8 | * This program is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | * GNU General Public License for more details. |
12 | * |
13 | * You should have received a copy of the GNU General Public License along |
14 | * with this program; if not, write to the Free Software Foundation, Inc., |
15 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
16 | * http://www.gnu.org/copyleft/gpl.html |
17 | * |
18 | * @file |
19 | */ |
20 | |
21 | use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface; |
22 | use MediaWiki\Cache\LinkCache; |
23 | use MediaWiki\Config\ServiceOptions; |
24 | use MediaWiki\Deferred\DeferredUpdates; |
25 | use MediaWiki\Http\Telemetry; |
26 | use MediaWiki\MainConfigNames; |
27 | use Psr\Log\LoggerInterface; |
28 | use Wikimedia\Rdbms\DBConnectionError; |
29 | use Wikimedia\Rdbms\DBReadOnlyError; |
30 | use Wikimedia\Rdbms\ILBFactory; |
31 | use Wikimedia\Rdbms\ReadOnlyMode; |
32 | |
/**
 * Job queue runner utility methods.
 *
 * Pops jobs off the JobQueueGroup, runs them inside DB transaction rounds and
 * coordinates per-type back-offs with concurrent runners through a JSON file
 * stored in wfTempDir().
 *
 * @since 1.24
 * @ingroup JobQueue
 */
class JobRunner {

	/**
	 * @internal For use by ServiceWiring
	 */
	public const CONSTRUCTOR_OPTIONS = [
		MainConfigNames::JobBackoffThrottling,
		MainConfigNames::JobClasses,
		MainConfigNames::MaxJobDBWriteDuration,
		MainConfigNames::TrxProfilerLimits,
	];

	/** @var ServiceOptions */
	private $options;

	/** @var ILBFactory */
	private $lbFactory;

	/** @var JobQueueGroup */
	private $jobQueueGroup;

	/** @var ReadOnlyMode */
	private $readOnlyMode;

	/** @var LinkCache */
	private $linkCache;

	/** @var StatsdDataFactoryInterface */
	private $stats;

	/** @var callable|null Debug output handler */
	private $debug;

	/** @var LoggerInterface */
	private $logger;

	/** @var int Abort if more than this much DB lag is present */
	private const MAX_ALLOWED_LAG = 3;
	/** @var int An appropriate timeout to balance lag avoidance and job progress */
	private const SYNC_TIMEOUT = self::MAX_ALLOWED_LAG;
	/** @var float Check replica DB lag this many seconds */
	private const LAG_CHECK_PERIOD = 1.0;
	/** @var int Seconds to back off a queue due to errors */
	private const ERROR_BACKOFF_TTL = 1;
	/** @var int Seconds to back off a queue due to read-only errors */
	private const READONLY_BACKOFF_TTL = 30;
	/** @var string Name (inside wfTempDir()) of the file holding the shared back-off map */
	private const BACKOFF_FILE_NAME = 'mw-runJobs-backoffs.json';

	/**
	 * Set a handler that receives human-readable progress lines.
	 *
	 * @param callable|null $debug Optional debug output handler; it is called with a
	 *  single string argument (timestamp-prefixed, newline-terminated message)
	 */
	public function setDebugHandler( $debug ) {
		$this->debug = $debug;
	}

	/**
	 * @internal For use by ServiceWiring
	 * @param ServiceOptions $serviceOptions
	 * @param ILBFactory $lbFactory
	 * @param JobQueueGroup $jobQueueGroup The JobQueueGroup for this wiki
	 * @param ReadOnlyMode $readOnlyMode
	 * @param LinkCache $linkCache
	 * @param StatsdDataFactoryInterface $statsdDataFactory
	 * @param LoggerInterface $logger
	 */
	public function __construct(
		ServiceOptions $serviceOptions,
		ILBFactory $lbFactory,
		JobQueueGroup $jobQueueGroup,
		ReadOnlyMode $readOnlyMode,
		LinkCache $linkCache,
		StatsdDataFactoryInterface $statsdDataFactory,
		LoggerInterface $logger
	) {
		$serviceOptions->assertRequiredOptions( self::CONSTRUCTOR_OPTIONS );
		$this->options = $serviceOptions;
		$this->lbFactory = $lbFactory;
		$this->jobQueueGroup = $jobQueueGroup;
		$this->readOnlyMode = $readOnlyMode;
		$this->linkCache = $linkCache;
		$this->stats = $statsdDataFactory;
		$this->logger = $logger;
	}

	/**
	 * Run jobs of the specified number/type for the specified time
	 *
	 * The response map has a 'jobs' field that lists the status of each job run, including:
	 *   - type : the job/queue type
	 *   - status : ok/failed
	 *   - error : any error message string
	 *   - time : the job run time in ms
	 * The response map also has:
	 *   - reached : the reason the run finished, one of (none-ready, none-possible, read-only,
	 *     replica-lag-limit, job-limit, time-limit, memory-limit, exception)
	 * and, unless the run stopped before popping anything (none-possible, read-only, or
	 * replica-lag-limit detected up front):
	 *   - backoffs : the (job/queue type => UNIX expiry timestamp) map of active back-offs
	 *   - elapsed : the total time spent running jobs in ms
	 *
	 * This method outputs status information only if a debug handler was set.
	 * Exceptions thrown by jobs are caught and logged, but are not reported as output.
	 *
	 * @param array $options Map of parameters:
	 *   - type : specified job/queue type (or false for the default types)
	 *   - maxJobs : maximum number of jobs to run
	 *   - maxTime : maximum time in seconds before stopping
	 *   - throttle : whether to respect job backoff configuration
	 * @return array Summary response that can easily be JSON serialized
	 * @throws JobQueueError
	 * @throws LogicException If called while an explicit DB transaction round is active
	 */
	public function run( array $options ) {
		$type = $options['type'] ?? false;
		$maxJobs = $options['maxJobs'] ?? false;
		$maxTime = $options['maxTime'] ?? false;
		$throttle = $options['throttle'] ?? true;

		$jobClasses = $this->options->get( MainConfigNames::JobClasses );
		$profilerLimits = $this->options->get( MainConfigNames::TrxProfilerLimits );

		$response = [ 'jobs' => [], 'reached' => 'none-ready' ];

		if ( $type !== false && !isset( $jobClasses[$type] ) ) {
			// Invalid job type specified
			$response['reached'] = 'none-possible';
			return $response;
		}

		if ( $this->readOnlyMode->isReadOnly() ) {
			// Any jobs popped off the queue might fail to run and thus might end up lost
			$response['reached'] = 'read-only';
			return $response;
		}

		[ , $maxLag ] = $this->lbFactory->getMainLB()->getMaxLag();
		if ( $maxLag >= self::MAX_ALLOWED_LAG ) {
			// DB lag is already too high; caller can immediately try other wikis if applicable
			$response['reached'] = 'replica-lag-limit';
			return $response;
		}

		// Narrow DB query expectations for this HTTP request
		$this->lbFactory->getTransactionProfiler()
			->setExpectations( $profilerLimits['JobRunner'], __METHOD__ );

		// Error out if an explicit DB transaction round is somehow active
		if ( $this->lbFactory->hasTransactionRound() ) {
			throw new LogicException( __METHOD__ . ' called with an active transaction round.' );
		}

		// Some jobs types should not run until a certain timestamp
		$backoffs = []; // map of (type => UNIX expiry)
		$backoffDeltas = []; // map of (type => seconds)
		$wait = 'wait'; // block to read backoffs the first time

		$loopStartTime = microtime( true );
		$jobsPopped = 0;
		$timeMsTotal = 0;
		$lastSyncTime = 1; // initialize "last sync check timestamp" to "ages ago"
		// Keep popping and running jobs until there are no more...
		do {
			// Sync the persistent backoffs with concurrent runners
			$backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
			$backoffKeys = $throttle ? array_keys( $backoffs ) : [];
			$wait = 'nowait'; // less important now

			if ( $type === false ) {
				// Treat the default job type queues as a single queue and pop off a job
				$job = $this->jobQueueGroup
					->pop( JobQueueGroup::TYPE_DEFAULT, JobQueueGroup::USE_CACHE, $backoffKeys );
			} else {
				// Pop off a job from the specified job type queue unless the execution of
				// that type of job is currently rate-limited by the back-off list
				$job = in_array( $type, $backoffKeys ) ? false : $this->jobQueueGroup->pop( $type );
			}

			if ( $job ) {
				++$jobsPopped;
				$jType = $job->getType();

				// Back off of certain jobs for a while (for throttling and for errors)
				$ttw = $this->getBackoffTimeToWait( $job );
				if ( $ttw > 0 ) {
					// Always add the delta for other runners in case the time running the
					// job negated the backoff for each individually but not collectively.
					$backoffDeltas[$jType] = ( $backoffDeltas[$jType] ?? 0 ) + $ttw;
					$backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
				}

				$info = $this->executeJob( $job );

				// Mark completed or "one shot only" jobs as resolved
				if ( $info['status'] !== false || !$job->allowRetries() ) {
					$this->jobQueueGroup->ack( $job );
				}

				// Back off of certain jobs for a while (for throttling and for errors).
				// Only a random ~2% of failures add an error back-off.
				if ( $info['status'] === false && mt_rand( 0, 49 ) == 0 ) {
					$ttw = max( $ttw, $this->getErrorBackoffTTL( $info['caught'] ) );
					$backoffDeltas[$jType] = ( $backoffDeltas[$jType] ?? 0 ) + $ttw;
				}

				$response['jobs'][] = [
					'type' => $jType,
					'status' => ( $info['status'] === false ) ? 'failed' : 'ok',
					'error' => $info['error'],
					'time' => $info['timeMs']
				];
				$timeMsTotal += $info['timeMs'];

				// Break out if we hit the job count or wall time limits
				if ( $maxJobs && $jobsPopped >= $maxJobs ) {
					$response['reached'] = 'job-limit';
					break;
				} elseif ( $maxTime && ( microtime( true ) - $loopStartTime ) > $maxTime ) {
					$response['reached'] = 'time-limit';
					break;
				}

				// Stop if we caught a DBConnectionError. In theory it would be
				// possible to explicitly reconnect, but the present behaviour
				// is to just throw more exceptions every time something database-
				// related is attempted.
				if ( in_array( DBConnectionError::class, $info['caught'], true ) ) {
					$response['reached'] = 'exception';
					break;
				}

				// Don't let any of the main DB replica DBs get backed up.
				// This only waits for so long before exiting and letting
				// other wikis in the farm (on different masters) get a chance.
				// A negative $timePassed means the clock went backwards; re-check then too.
				$timePassed = microtime( true ) - $lastSyncTime;
				if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) {
					$opts = [ 'ifWritesSince' => $lastSyncTime, 'timeout' => self::SYNC_TIMEOUT ];
					if ( !$this->lbFactory->waitForReplication( $opts ) ) {
						$response['reached'] = 'replica-lag-limit';
						break;
					}
					$lastSyncTime = microtime( true );
				}

				// Abort if nearing OOM to avoid erroring out in the middle of a job
				if ( !$this->checkMemoryOK() ) {
					$response['reached'] = 'memory-limit';
					break;
				}
			}
		} while ( $job );

		// Sync the persistent backoffs for the next runJobs.php pass
		if ( $backoffDeltas ) {
			$this->syncBackoffDeltas( $backoffs, $backoffDeltas, 'wait' );
		}

		$response['backoffs'] = $backoffs;
		$response['elapsed'] = $timeMsTotal;

		return $response;
	}

	/**
	 * Run a specific job in a manner appropriate for mass use by job dispatchers
	 *
	 * Wraps the job's run() and tearDown() methods into appropriate transaction rounds.
	 * During execution, SPI-based logging will use the ID of the HTTP request that spawned
	 * the job (instead of the current one). Large DB write transactions will be subject to
	 * $wgMaxJobDBWriteDuration.
	 *
	 * This should never be called if there are explicit transaction rounds or pending DB writes
	 *
	 * @param RunnableJob $job
	 * @return array Map of:
	 *   - status: boolean; whether the job succeed
	 *   - error: error string; empty if there was no error specified
	 *   - caught: list of FQCNs corresponding to any exceptions caught
	 *   - timeMs: float; job execution time in milliseconds
	 * @since 1.35
	 */
	public function executeJob( RunnableJob $job ) {
		$telemetry = Telemetry::getInstance();
		$oldRequestId = $telemetry->getRequestId();

		if ( $job->getRequestId() !== null ) {
			// Temporarily inherit the original ID of the web request that spawned this job
			$telemetry->overrideRequestId( $job->getRequestId() );
		} else {
			// TODO: do we need to regenerate if job doesn't have the request id?
			// If JobRunner was called with X-Request-ID header, regeneration will generate the
			// same value
			$telemetry->regenerateRequestId();
		}
		// Use an appropriate timeout to balance lag avoidance and job progress
		$oldTimeout = $this->lbFactory->setDefaultReplicationWaitTimeout( self::SYNC_TIMEOUT );
		try {
			return $this->doExecuteJob( $job );
		} finally {
			// Restore the runner's own timeout and request ID even if the job blew up
			$this->lbFactory->setDefaultReplicationWaitTimeout( $oldTimeout );
			$telemetry->overrideRequestId( $oldRequestId );
		}
	}

	/**
	 * Run the job inside a transaction round, call its tearDown(), and record stats/logs.
	 *
	 * @param RunnableJob $job
	 * @return array Map of:
	 *   - status: boolean; whether the job succeed
	 *   - error: error string; empty if there was no error specified
	 *   - caught: list of FQCNs corresponding to any exceptions caught
	 *   - timeMs: float; job execution time in milliseconds
	 */
	private function doExecuteJob( RunnableJob $job ) {
		$jType = $job->getType();
		$msg = $job->toString() . " STARTING";
		$this->logger->debug( $msg, [ 'job_type' => $job->getType() ] );
		$this->debugCallback( $msg );

		// Clear out title cache data from prior snapshots
		// (e.g. from before JobRunner was invoked in this process)
		$this->linkCache->clear();

		// Run the job...
		$caught = [];
		$rssStart = $this->getMaxRssKb();
		$jobStartTime = microtime( true );
		try {
			$fnameTrxOwner = get_class( $job ) . '::run'; // give run() outer scope
			// Flush any pending changes left over from an implicit transaction round
			if ( $job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) {
				$this->lbFactory->commitPrimaryChanges( $fnameTrxOwner ); // new implicit round
			} else {
				$this->lbFactory->beginPrimaryChanges( $fnameTrxOwner ); // new explicit round
			}
			$status = $job->run();
			$error = $job->getLastError();
			// Commit all pending changes from this job
			$this->lbFactory->commitPrimaryChanges(
				$fnameTrxOwner,
				// Abort if any transaction was too big
				$this->options->get( MainConfigNames::MaxJobDBWriteDuration )
			);
			// Run any deferred update tasks; doUpdates() manages transactions itself
			DeferredUpdates::doUpdates();
		} catch ( Throwable $e ) {
			MWExceptionHandler::rollbackPrimaryChangesAndLog( $e );
			$status = false;
			$error = get_class( $e ) . ': ' . $e->getMessage() . ' in '
				. $e->getFile() . ' on line ' . $e->getLine();
			$caught[] = get_class( $e );
		}
		// Always attempt to call teardown(), even if Job throws exception
		try {
			$job->tearDown( $status );
		} catch ( Throwable $e ) {
			MWExceptionHandler::logException( $e );
		}

		$timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
		$rssEnd = $this->getMaxRssKb();

		// Record how long jobs wait before getting popped
		$readyTs = $job->getReadyTimestamp();
		if ( $readyTs ) {
			$pickupDelay = max( 0, $jobStartTime - $readyTs );
			$this->stats->timing( 'jobqueue.pickup_delay.all', 1000 * $pickupDelay );
			$this->stats->timing( "jobqueue.pickup_delay.$jType", 1000 * $pickupDelay );
		}
		// Record root job age for jobs being run
		$rootTimestamp = $job->getRootJobParams()['rootJobTimestamp'];
		if ( $rootTimestamp ) {
			$age = max( 0, $jobStartTime - (int)wfTimestamp( TS_UNIX, $rootTimestamp ) );
			$this->stats->timing( "jobqueue.pickup_root_age.$jType", 1000 * $age );
		}
		// Track the execution time for jobs
		$this->stats->timing( "jobqueue.run.$jType", $timeMs );
		// Track RSS increases for jobs (in case of memory leaks)
		if ( $rssStart && $rssEnd ) {
			$this->stats->updateCount( "jobqueue.rss_delta.$jType", $rssEnd - $rssStart );
		}

		if ( $status === false ) {
			$msg = $job->toString() . " t={job_duration} error={job_error}";
			$this->logger->error( $msg, [
				'job_type' => $job->getType(),
				'job_duration' => $timeMs,
				'job_error' => $error,
			] );

			$msg = $job->toString() . " t=$timeMs error={$error}";
			$this->debugCallback( $msg );
		} else {
			$msg = $job->toString() . " t={job_duration} good";
			$this->logger->info( $msg, [
				'job_type' => $job->getType(),
				'job_duration' => $timeMs,
			] );

			$msg = $job->toString() . " t=$timeMs good";
			$this->debugCallback( $msg );
		}

		return [
			'status' => $status,
			'error' => $error,
			'caught' => $caught,
			'timeMs' => $timeMs
		];
	}

	/**
	 * @param string[] $caught List of FQCNs corresponding to any exceptions caught
	 * @return int TTL in seconds; longer when a DBReadOnlyError was among them
	 */
	private function getErrorBackoffTTL( array $caught ) {
		return in_array( DBReadOnlyError::class, $caught, true )
			? self::READONLY_BACKOFF_TTL
			: self::ERROR_BACKOFF_TTL;
	}

	/**
	 * @return int|null Max memory RSS in kilobytes, or null if getrusage() does not report it
	 */
	private function getMaxRssKb() {
		$info = getrusage( 0 /* RUSAGE_SELF */ );
		// see https://linux.die.net/man/2/getrusage
		return isset( $info['ru_maxrss'] ) ? (int)$info['ru_maxrss'] : null;
	}

	/**
	 * @param RunnableJob $job
	 * @return int Seconds for this runner to avoid doing more jobs of this type
	 * @see $wgJobBackoffThrottling
	 */
	private function getBackoffTimeToWait( RunnableJob $job ) {
		$throttling = $this->options->get( MainConfigNames::JobBackoffThrottling );

		if ( !isset( $throttling[$job->getType()] ) || $job instanceof DuplicateJob ) {
			return 0; // not throttled
		}

		$itemsPerSecond = $throttling[$job->getType()];
		if ( $itemsPerSecond <= 0 ) {
			return 0; // not throttled
		}

		$seconds = 0;
		if ( $job->workItemCount() > 0 ) {
			$exactSeconds = $job->workItemCount() / $itemsPerSecond;
			// use randomized rounding so that the expected value equals $exactSeconds
			$seconds = floor( $exactSeconds );
			$remainder = $exactSeconds - $seconds;
			$seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 1 : 0;
		}

		return (int)$seconds;
	}

	/**
	 * Get the previous backoff expiries from persistent storage
	 * On I/O or lock acquisition failure this returns the original $backoffs.
	 * If the storage file does not exist, no back-offs are active and [] is returned.
	 *
	 * @param array $backoffs Map of (job type => UNIX timestamp)
	 * @param string $mode Lock wait mode - "wait" or "nowait"
	 * @return array Map of (job type => backoff expiry timestamp)
	 */
	private function loadBackoffs( array $backoffs, $mode = 'wait' ) {
		$file = $this->getBackoffFilePath();
		if ( !is_file( $file ) ) {
			return [];
		}

		$noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
		$handle = fopen( $file, 'rb' );
		if ( $handle === false ) {
			return $backoffs; // I/O failure
		}
		if ( !flock( $handle, LOCK_SH | $noblock ) ) {
			fclose( $handle );
			return $backoffs; // don't wait on lock
		}
		$content = stream_get_contents( $handle );
		flock( $handle, LOCK_UN );
		fclose( $handle );

		return $this->pruneExpiredBackoffs( $this->decodeBackoffs( $content ), microtime( true ) );
	}

	/**
	 * Merge the current backoff expiries from persistent storage
	 *
	 * The $deltas map is set to an empty array on success.
	 * On I/O or lock acquisition failure this returns the original $backoffs.
	 *
	 * @param array $backoffs Map of (job type => UNIX timestamp)
	 * @param array &$deltas Map of (job type => seconds)
	 * @param string $mode Lock wait mode - "wait" or "nowait"
	 * @return array The new backoffs account for $backoffs and the latest file data
	 */
	private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) {
		if ( !$deltas ) {
			return $this->loadBackoffs( $backoffs, $mode );
		}

		$noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
		// 'c+' = read/write, create if missing, but do NOT truncate on open. A truncating
		// mode ('w+') would wipe the entries of concurrent runners before the exclusive lock
		// is even held, so the read below would always see an empty file.
		$handle = fopen( $this->getBackoffFilePath(), 'cb+' );
		if ( $handle === false ) {
			return $backoffs; // I/O failure
		}
		if ( !flock( $handle, LOCK_EX | $noblock ) ) {
			fclose( $handle );
			return $backoffs; // don't wait on lock
		}
		$ctime = microtime( true );
		$cBackoffs = $this->decodeBackoffs( stream_get_contents( $handle ) );
		foreach ( $deltas as $type => $seconds ) {
			// Extend a still-active back-off; otherwise start a fresh one from now
			$cBackoffs[$type] = isset( $cBackoffs[$type] ) && $cBackoffs[$type] >= $ctime
				? $cBackoffs[$type] + $seconds
				: $ctime + $seconds;
		}
		$cBackoffs = $this->pruneExpiredBackoffs( $cBackoffs, $ctime );
		// Replace the file contents. ftruncate() does not move the file pointer, which is at
		// EOF after the read above, so rewind first or the write would leave a NUL-filled gap.
		ftruncate( $handle, 0 );
		rewind( $handle );
		fwrite( $handle, json_encode( $cBackoffs ) );
		// Make the new data visible before other runners can take the lock
		fflush( $handle );
		flock( $handle, LOCK_UN );
		fclose( $handle );

		$deltas = [];

		return $cBackoffs;
	}

	/**
	 * @return string Path of the JSON file holding the (job type => UNIX expiry) back-off
	 *  map shared by all runners that use the same temporary directory
	 */
	private function getBackoffFilePath() {
		return wfTempDir() . '/' . self::BACKOFF_FILE_NAME;
	}

	/**
	 * Decode the raw contents of the back-off file.
	 *
	 * @param string|false $content Raw file contents, or false if reading failed
	 * @return array Map of (job type => UNIX timestamp); empty for missing or corrupt data
	 */
	private function decodeBackoffs( $content ) {
		$decoded = json_decode( (string)$content, true );

		return is_array( $decoded ) ? $decoded : [];
	}

	/**
	 * @param array $backoffs Map of (job type => UNIX timestamp)
	 * @param float $now Current UNIX timestamp
	 * @return array $backoffs without the entries that expired before $now (keys preserved)
	 */
	private function pruneExpiredBackoffs( array $backoffs, $now ) {
		return array_filter(
			$backoffs,
			static fn ( $timestamp ) => $timestamp >= $now
		);
	}

	/**
	 * Make sure that this script is not too close to the memory usage limit.
	 * It is better to die in between jobs than OOM right in the middle of one.
	 * @return bool False if usage is at or above 95% of memory_limit; true otherwise
	 *  (including when memory_limit is unlimited or unparsable)
	 */
	private function checkMemoryOK() {
		// Parsed once per request; memory_limit is not expected to change mid-run
		static $maxBytes = null;
		if ( $maxBytes === null ) {
			$m = [];
			if ( preg_match( '!^(\d+)(k|m|g|)$!i', ini_get( 'memory_limit' ), $m ) ) {
				[ , $num, $unit ] = $m;
				$conv = [ 'g' => 1073741824, 'm' => 1048576, 'k' => 1024, '' => 1 ];
				$maxBytes = (int)$num * $conv[strtolower( $unit )];
			} else {
				// e.g. "-1" (unlimited): never report excessive usage
				$maxBytes = 0;
			}
		}
		$usedBytes = memory_get_usage();
		if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) {
			$msg = "Detected excessive memory usage ({used_bytes}/{max_bytes}).";
			$this->logger->error( $msg, [
				'used_bytes' => $usedBytes,
				'max_bytes' => $maxBytes,
			] );

			$msg = "Detected excessive memory usage ($usedBytes/$maxBytes).";
			$this->debugCallback( $msg );

			return false;
		}

		return true;
	}

	/**
	 * Pass a timestamped message to the debug handler, if one was set.
	 * @param string $msg The message to log
	 */
	private function debugCallback( $msg ) {
		if ( $this->debug ) {
			( $this->debug )( wfTimestamp( TS_DB ) . " $msg\n" );
		}
	}
}