Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
| Total | |
55.56% |
75 / 135 |
|
60.00% |
3 / 5 |
CRAP | |
0.00% |
0 / 1 |
| JobExecutor | |
55.56% |
75 / 135 |
|
60.00% |
3 / 5 |
59.72 | |
0.00% |
0 / 1 |
| execute | |
54.37% |
56 / 103 |
|
0.00% |
0 / 1 |
22.50 | |||
| getJobFromParams | |
43.48% |
10 / 23 |
|
0.00% |
0 / 1 |
6.89 | |||
| logger | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 | |||
| config | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 | |||
| stats | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 | |||
| 1 | <?php |
| 2 | |
| 3 | /** |
| 4 | * A massively simplified JobRunner with a solo purpose of |
| 5 | * executing a Job |
| 6 | */ |
| 7 | |
| 8 | namespace MediaWiki\Extension\EventBus; |
| 9 | |
| 10 | use Exception; |
| 11 | use MediaWiki\Config\Config; |
| 12 | use MediaWiki\Deferred\DeferredUpdates; |
| 13 | use MediaWiki\Exception\MWExceptionHandler; |
| 14 | use MediaWiki\Http\Telemetry; |
| 15 | use MediaWiki\JobQueue\Job; |
| 16 | use MediaWiki\Logger\LoggerFactory; |
| 17 | use MediaWiki\MediaWikiServices; |
| 18 | use Psr\Log\LoggerInterface; |
| 19 | use Wikimedia\Stats\StatsFactory; |
| 20 | use Wikimedia\Telemetry\SpanInterface; |
| 21 | |
| 22 | class JobExecutor { |
| 23 | |
| 24 | /** @var LoggerInterface instance for all JobExecutor instances */ |
| 25 | private static $logger; |
| 26 | |
| 27 | /** @var StatsFactory instance |
| 28 | * for all JobExecutor instances |
| 29 | */ |
| 30 | private static $statsFactory; |
| 31 | |
| 32 | /** |
| 33 | * @var Config a references to the wiki config |
| 34 | */ |
| 35 | private static $config; |
| 36 | |
| 37 | /** |
| 38 | * @param array $jobEvent the job event |
| 39 | * @return array containing the response status and the message |
| 40 | */ |
| 41 | public function execute( $jobEvent ) { |
| 42 | $startTime = microtime( true ); |
| 43 | $isReadonly = false; |
| 44 | $jobCreateResult = $this->getJobFromParams( $jobEvent ); |
| 45 | |
| 46 | if ( !$jobCreateResult['status'] ) { |
| 47 | $this->logger()->error( 'Failed creating job from description', [ |
| 48 | 'job_type' => $jobEvent['type'], |
| 49 | 'message' => $jobCreateResult['message'] |
| 50 | ] ); |
| 51 | $jobCreateResult['readonly'] = false; |
| 52 | return $jobCreateResult; |
| 53 | } |
| 54 | |
| 55 | // Wrap job execution in a span to easily identify job types in traces. |
| 56 | $tracer = MediaWikiServices::getInstance()->getTracer(); |
| 57 | $span = $tracer->createSpan( 'execute job' ) |
| 58 | ->setAttributes( [ 'org.wikimedia.eventbus.job.type' => $jobEvent['type'] ] ) |
| 59 | ->start(); |
| 60 | $tracerScope = $span->activate(); |
| 61 | |
| 62 | /** @var Job $job */ |
| 63 | $job = $jobCreateResult['job']; |
| 64 | $this->logger()->debug( 'Beginning job execution', [ |
| 65 | 'job' => $job->toString(), |
| 66 | 'job_type' => $job->getType() |
| 67 | ] ); |
| 68 | |
| 69 | $logContextScope = LoggerFactory::getContext()->addScoped( [ |
| 70 | 'context.job_type' => $job->getType(), |
| 71 | ] ); |
| 72 | |
| 73 | $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); |
| 74 | $telemetry = Telemetry::getInstance(); |
| 75 | |
| 76 | if ( $job->getRequestId() !== null ) { |
| 77 | $telemetry->overrideRequestId( $job->getRequestId() ); |
| 78 | } else { |
| 79 | $telemetry->regenerateRequestId(); |
| 80 | } |
| 81 | // Clear out title cache data from prior snapshots |
| 82 | MediaWikiServices::getInstance()->getLinkCache()->clear(); |
| 83 | |
| 84 | // Actually execute the job |
| 85 | try { |
| 86 | $fnameTrxOwner = get_class( $job ) . '::run'; |
| 87 | // Flush any pending changes left over from an implicit transaction round |
| 88 | if ( $job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) { |
| 89 | $lbFactory->commitPrimaryChanges( $fnameTrxOwner ); |
| 90 | } else { |
| 91 | $lbFactory->beginPrimaryChanges( $fnameTrxOwner ); |
| 92 | } |
| 93 | // Clear any stale REPEATABLE-READ snapshots from replica DB connections |
| 94 | $status = $job->run(); |
| 95 | // Commit all pending changes from this job |
| 96 | $lbFactory->commitPrimaryChanges( |
| 97 | $fnameTrxOwner, |
| 98 | // Abort if any transaction was too big |
| 99 | $this->config()->get( 'MaxJobDBWriteDuration' ) |
| 100 | ); |
| 101 | |
| 102 | if ( $status === false ) { |
| 103 | $message = $job->getLastError(); |
| 104 | $this->logger()->error( 'Failed executing job: ' . $job->toString(), [ |
| 105 | 'job_type' => $job->getType(), |
| 106 | 'error' => $message |
| 107 | ] ); |
| 108 | } elseif ( !is_bool( $status ) ) { |
| 109 | $message = 'Success, but no status returned'; |
| 110 | $this->logger()->warning( 'Non-boolean result returned by job: ' . $job->toString(), |
| 111 | [ |
| 112 | 'job_type' => $job->getType(), |
| 113 | 'job_result' => $status ?? 'unset' |
| 114 | ] ); |
| 115 | // For backwards compatibility with old job executor we should set the status |
| 116 | // to true here, as before anything other then boolean false was considered a success. |
| 117 | // TODO: After all the jobs are fixed to return proper result this should be removed. |
| 118 | $status = true; |
| 119 | } else { |
| 120 | $message = 'success'; |
| 121 | } |
| 122 | |
| 123 | // Run any deferred update tasks; doUpdates() manages transactions itself |
| 124 | DeferredUpdates::doUpdates(); |
| 125 | } catch ( \Wikimedia\Rdbms\DBReadOnlyError $e ) { |
| 126 | $status = false; |
| 127 | $isReadonly = true; |
| 128 | $message = 'Database is in read-only mode'; |
| 129 | MWExceptionHandler::rollbackPrimaryChangesAndLog( $e, MWExceptionHandler::CAUGHT_BY_ENTRYPOINT ); |
| 130 | } catch ( Exception $e ) { |
| 131 | MWExceptionHandler::rollbackPrimaryChangesAndLog( $e, MWExceptionHandler::CAUGHT_BY_ENTRYPOINT ); |
| 132 | $status = false; |
| 133 | $message = 'Exception executing job: ' |
| 134 | . $job->toString() . ' : ' |
| 135 | . get_class( $e ) . ': ' . $e->getMessage(); |
| 136 | $this->logger()->error( $message, |
| 137 | [ |
| 138 | 'job_type' => $job->getType(), |
| 139 | 'exception' => $e |
| 140 | ] |
| 141 | ); |
| 142 | } |
| 143 | |
| 144 | // Always attempt to call teardown() even if Job throws exception. |
| 145 | try { |
| 146 | $job->teardown( $status ); |
| 147 | } catch ( Exception $e ) { |
| 148 | $message = 'Exception tearing down job: ' |
| 149 | . $job->toString() . ' : ' |
| 150 | . get_class( $e ) . ': ' . $e->getMessage(); |
| 151 | $this->logger()->error( $message, |
| 152 | [ |
| 153 | 'job_type' => $job->getType(), |
| 154 | 'exception' => $e |
| 155 | ] |
| 156 | ); |
| 157 | } |
| 158 | |
| 159 | // The JobRunner at this point makes some cleanups to prepare for |
| 160 | // the next Job execution. However since we run one job at a time |
| 161 | // we don't need that. |
| 162 | |
| 163 | // Report pure job execution timing |
| 164 | $jobDuration = microtime( true ) - $startTime; |
| 165 | self::stats()->getTiming( 'jobexecutor_exec_runtime_seconds' ) |
| 166 | ->setLabel( 'type', $job->getType() ) |
| 167 | ->copyToStatsdAt( "jobexecutor.{$job->getType()}.exec" ) |
| 168 | ->observeSeconds( $jobDuration ); |
| 169 | $this->logger()->info( 'Finished job execution', |
| 170 | [ |
| 171 | 'job' => $job->toString(), |
| 172 | 'job_type' => $job->getType(), |
| 173 | 'job_status' => $status, |
| 174 | 'job_duration' => $jobDuration |
| 175 | ] |
| 176 | ); |
| 177 | |
| 178 | $span->setSpanStatus( $status ? SpanInterface::SPAN_STATUS_OK : SpanInterface::SPAN_STATUS_ERROR ); |
| 179 | |
| 180 | if ( !$job->allowRetries() ) { |
| 181 | // Report success if the job doesn't allow retries |
| 182 | // even if actually the job has failed. |
| 183 | $status = true; |
| 184 | } |
| 185 | |
| 186 | return [ |
| 187 | 'status' => $status, |
| 188 | 'readonly' => $isReadonly, |
| 189 | 'message' => $message |
| 190 | ]; |
| 191 | } |
| 192 | |
| 193 | /** |
| 194 | * @param array $jobEvent containing the job EventBus event. |
| 195 | * @return array containing the Job, status and potentially error message |
| 196 | */ |
| 197 | private function getJobFromParams( $jobEvent ) { |
| 198 | if ( !isset( $jobEvent['type'] ) ) { |
| 199 | return [ |
| 200 | 'status' => false, |
| 201 | 'message' => 'Job event type is not defined' |
| 202 | ]; |
| 203 | } |
| 204 | |
| 205 | $jobType = $jobEvent['type']; |
| 206 | $params = $jobEvent['params']; |
| 207 | |
| 208 | try { |
| 209 | $jobFactory = MediaWikiServices::getInstance()->getJobFactory(); |
| 210 | $job = $jobFactory->newJob( $jobType, $params ); |
| 211 | } catch ( Exception $e ) { |
| 212 | return [ |
| 213 | 'status' => false, |
| 214 | 'message' => $e->getMessage() |
| 215 | ]; |
| 216 | } |
| 217 | |
| 218 | // @phan-suppress-next-line PhanImpossibleTypeComparison |
| 219 | if ( $job === null ) { |
| 220 | return [ |
| 221 | 'status' => false, |
| 222 | 'message' => 'Could not create a job from event' |
| 223 | ]; |
| 224 | } |
| 225 | |
| 226 | return [ |
| 227 | 'status' => true, |
| 228 | 'job' => $job |
| 229 | ]; |
| 230 | } |
| 231 | |
| 232 | /** |
| 233 | * Returns a singleton logger instance for all JobExecutor instances. |
| 234 | * Use like: self::logger()->info( $message ) |
| 235 | * We use this so we don't have to check if the logger has been created |
| 236 | * before attempting to log a message. |
| 237 | * @return LoggerInterface |
| 238 | */ |
| 239 | private static function logger() { |
| 240 | if ( !self::$logger ) { |
| 241 | self::$logger = LoggerFactory::getInstance( 'JobExecutor' ); |
| 242 | } |
| 243 | return self::$logger; |
| 244 | } |
| 245 | |
| 246 | /** |
| 247 | * Returns a singleton config instance for all JobExecutor instances. |
| 248 | * Use like: self::config()->get( 'SomeConfigParameter' ) |
| 249 | * @return Config |
| 250 | */ |
| 251 | private static function config() { |
| 252 | if ( !self::$config ) { |
| 253 | self::$config = MediaWikiServices::getInstance()->getMainConfig(); |
| 254 | } |
| 255 | return self::$config; |
| 256 | } |
| 257 | |
| 258 | /** |
| 259 | * Returns a singleton stats reporter instance for all JobExecutor instances. |
| 260 | * Use like self::stats()->getGauge() |
| 261 | * @return StatsFactory |
| 262 | */ |
| 263 | private static function stats() { |
| 264 | if ( !self::$statsFactory ) { |
| 265 | self::$statsFactory = MediaWikiServices::getInstance()->getStatsFactory(); |
| 266 | } |
| 267 | return self::$statsFactory; |
| 268 | } |
| 269 | } |