Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
55.56% covered (warning)
55.56%
75 / 135
60.00% covered (warning)
60.00%
3 / 5
CRAP
0.00% covered (danger)
0.00%
0 / 1
JobExecutor
55.56% covered (warning)
55.56%
75 / 135
60.00% covered (warning)
60.00%
3 / 5
59.72
0.00% covered (danger)
0.00%
0 / 1
 execute
54.37% covered (warning)
54.37%
56 / 103
0.00% covered (danger)
0.00%
0 / 1
22.50
 getJobFromParams
43.48% covered (danger)
43.48%
10 / 23
0.00% covered (danger)
0.00%
0 / 1
6.89
 logger
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 config
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 stats
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
1<?php
2
3/**
4 * A massively simplified JobRunner with a solo purpose of
5 * executing a Job
6 */
7
8namespace MediaWiki\Extension\EventBus;
9
10use Exception;
11use MediaWiki\Config\Config;
12use MediaWiki\Deferred\DeferredUpdates;
13use MediaWiki\Exception\MWExceptionHandler;
14use MediaWiki\Http\Telemetry;
15use MediaWiki\JobQueue\Job;
16use MediaWiki\Logger\LoggerFactory;
17use MediaWiki\MediaWikiServices;
18use Psr\Log\LoggerInterface;
19use Wikimedia\Stats\StatsFactory;
20use Wikimedia\Telemetry\SpanInterface;
21
22class JobExecutor {
23
24    /** @var LoggerInterface instance for all JobExecutor instances */
25    private static $logger;
26
27    /** @var StatsFactory instance
28     * for all JobExecutor instances
29     */
30    private static $statsFactory;
31
32    /**
33     * @var Config a references to the wiki config
34     */
35    private static $config;
36
37    /**
38     * @param array $jobEvent the job event
39     * @return array containing the response status and the message
40     */
41    public function execute( $jobEvent ) {
42        $startTime = microtime( true );
43        $isReadonly = false;
44        $jobCreateResult = $this->getJobFromParams( $jobEvent );
45
46        if ( !$jobCreateResult['status'] ) {
47            $this->logger()->error( 'Failed creating job from description', [
48                'job_type' => $jobEvent['type'],
49                'message' => $jobCreateResult['message']
50            ] );
51            $jobCreateResult['readonly'] = false;
52            return $jobCreateResult;
53        }
54
55        // Wrap job execution in a span to easily identify job types in traces.
56        $tracer = MediaWikiServices::getInstance()->getTracer();
57        $span = $tracer->createSpan( 'execute job' )
58            ->setAttributes( [ 'org.wikimedia.eventbus.job.type' => $jobEvent['type'] ] )
59            ->start();
60        $tracerScope = $span->activate();
61
62        /** @var Job $job */
63        $job = $jobCreateResult['job'];
64        $this->logger()->debug( 'Beginning job execution', [
65            'job' => $job->toString(),
66            'job_type' => $job->getType()
67        ] );
68
69        $logContextScope = LoggerFactory::getContext()->addScoped( [
70            'context.job_type' => $job->getType(),
71        ] );
72
73        $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
74        $telemetry = Telemetry::getInstance();
75
76        if ( $job->getRequestId() !== null ) {
77            $telemetry->overrideRequestId( $job->getRequestId() );
78        } else {
79            $telemetry->regenerateRequestId();
80        }
81        // Clear out title cache data from prior snapshots
82        MediaWikiServices::getInstance()->getLinkCache()->clear();
83
84        // Actually execute the job
85        try {
86            $fnameTrxOwner = get_class( $job ) . '::run';
87            // Flush any pending changes left over from an implicit transaction round
88            if ( $job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) {
89                $lbFactory->commitPrimaryChanges( $fnameTrxOwner );
90            } else {
91                $lbFactory->beginPrimaryChanges( $fnameTrxOwner );
92            }
93            // Clear any stale REPEATABLE-READ snapshots from replica DB connections
94            $status = $job->run();
95            // Commit all pending changes from this job
96            $lbFactory->commitPrimaryChanges(
97                $fnameTrxOwner,
98                // Abort if any transaction was too big
99                $this->config()->get( 'MaxJobDBWriteDuration' )
100            );
101
102            if ( $status === false ) {
103                $message = $job->getLastError();
104                $this->logger()->error( 'Failed executing job: ' . $job->toString(), [
105                    'job_type' => $job->getType(),
106                    'error' => $message
107                ] );
108            } elseif ( !is_bool( $status ) ) {
109                $message = 'Success, but no status returned';
110                $this->logger()->warning( 'Non-boolean result returned by job: ' . $job->toString(),
111                    [
112                        'job_type' => $job->getType(),
113                        'job_result' => $status ?? 'unset'
114                    ] );
115                // For backwards compatibility with old job executor we should set the status
116                // to true here, as before anything other then boolean false was considered a success.
117                // TODO: After all the jobs are fixed to return proper result this should be removed.
118                $status = true;
119            } else {
120                $message = 'success';
121            }
122
123            // Run any deferred update tasks; doUpdates() manages transactions itself
124            DeferredUpdates::doUpdates();
125        } catch ( \Wikimedia\Rdbms\DBReadOnlyError $e ) {
126            $status = false;
127            $isReadonly = true;
128            $message = 'Database is in read-only mode';
129            MWExceptionHandler::rollbackPrimaryChangesAndLog( $e, MWExceptionHandler::CAUGHT_BY_ENTRYPOINT );
130        } catch ( Exception $e ) {
131            MWExceptionHandler::rollbackPrimaryChangesAndLog( $e, MWExceptionHandler::CAUGHT_BY_ENTRYPOINT );
132            $status = false;
133            $message = 'Exception executing job: '
134                . $job->toString() . ' : '
135                . get_class( $e ) . ': ' . $e->getMessage();
136            $this->logger()->error( $message,
137                [
138                    'job_type'  => $job->getType(),
139                    'exception' => $e
140                ]
141            );
142        }
143
144        // Always attempt to call teardown() even if Job throws exception.
145        try {
146            $job->teardown( $status );
147        } catch ( Exception $e ) {
148            $message = 'Exception tearing down job: '
149                . $job->toString() . ' : '
150                . get_class( $e ) . ': ' . $e->getMessage();
151            $this->logger()->error( $message,
152                [
153                    'job_type'  => $job->getType(),
154                    'exception' => $e
155                ]
156            );
157        }
158
159        // The JobRunner at this point makes some cleanups to prepare for
160        // the next Job execution. However since we run one job at a time
161        // we don't need that.
162
163        // Report pure job execution timing
164        $jobDuration = microtime( true ) - $startTime;
165        self::stats()->getTiming( 'jobexecutor_exec_runtime_seconds' )
166                ->setLabel( 'type', $job->getType() )
167                ->copyToStatsdAt( "jobexecutor.{$job->getType()}.exec" )
168                ->observeSeconds( $jobDuration );
169        $this->logger()->info( 'Finished job execution',
170            [
171                'job' => $job->toString(),
172                'job_type' => $job->getType(),
173                'job_status' => $status,
174                'job_duration' => $jobDuration
175            ]
176        );
177
178        $span->setSpanStatus( $status ? SpanInterface::SPAN_STATUS_OK : SpanInterface::SPAN_STATUS_ERROR );
179
180        if ( !$job->allowRetries() ) {
181            // Report success if the job doesn't allow retries
182            // even if actually the job has failed.
183            $status = true;
184        }
185
186        return [
187            'status'   => $status,
188            'readonly' => $isReadonly,
189            'message'  => $message
190        ];
191    }
192
193    /**
194     * @param array $jobEvent containing the job EventBus event.
195     * @return array containing the Job, status and potentially error message
196     */
197    private function getJobFromParams( $jobEvent ) {
198        if ( !isset( $jobEvent['type'] ) ) {
199            return [
200                'status'  => false,
201                'message' => 'Job event type is not defined'
202            ];
203        }
204
205        $jobType = $jobEvent['type'];
206        $params = $jobEvent['params'];
207
208        try {
209            $jobFactory = MediaWikiServices::getInstance()->getJobFactory();
210            $job = $jobFactory->newJob( $jobType, $params );
211        } catch ( Exception $e ) {
212            return [
213                'status'  => false,
214                'message' => $e->getMessage()
215            ];
216        }
217
218        // @phan-suppress-next-line PhanImpossibleTypeComparison
219        if ( $job === null ) {
220            return [
221                'status'  => false,
222                'message' => 'Could not create a job from event'
223            ];
224        }
225
226        return [
227            'status' => true,
228            'job'    => $job
229        ];
230    }
231
232    /**
233     * Returns a singleton logger instance for all JobExecutor instances.
234     * Use like: self::logger()->info( $message )
235     * We use this so we don't have to check if the logger has been created
236     * before attempting to log a message.
237     * @return LoggerInterface
238     */
239    private static function logger() {
240        if ( !self::$logger ) {
241            self::$logger = LoggerFactory::getInstance( 'JobExecutor' );
242        }
243        return self::$logger;
244    }
245
246    /**
247     * Returns a singleton config instance for all JobExecutor instances.
248     * Use like: self::config()->get( 'SomeConfigParameter' )
249     * @return Config
250     */
251    private static function config() {
252        if ( !self::$config ) {
253            self::$config = MediaWikiServices::getInstance()->getMainConfig();
254        }
255        return self::$config;
256    }
257
258    /**
259     * Returns a singleton stats reporter instance for all JobExecutor instances.
260     * Use like self::stats()->getGauge()
261     * @return StatsFactory
262     */
263    private static function stats() {
264        if ( !self::$statsFactory ) {
265            self::$statsFactory = MediaWikiServices::getInstance()->getStatsFactory();
266        }
267        return self::$statsFactory;
268    }
269}