Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 126
0.00% covered (danger)
0.00%
0 / 5
CRAP
0.00% covered (danger)
0.00%
0 / 1
JobExecutor
0.00% covered (danger)
0.00%
0 / 126
0.00% covered (danger)
0.00%
0 / 5
420
0.00% covered (danger)
0.00%
0 / 1
 execute
0.00% covered (danger)
0.00%
0 / 94
0.00% covered (danger)
0.00%
0 / 1
110
 getJobFromParams
0.00% covered (danger)
0.00%
0 / 23
0.00% covered (danger)
0.00%
0 / 1
20
 logger
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 config
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 stats
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
1<?php
2
3/**
4 * A massively simplified JobRunner with a solo purpose of
5 * executing a Job
6 */
7
8namespace MediaWiki\Extension\EventBus;
9
10use Exception;
11use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
12use MediaWiki\Config\Config;
13use MediaWiki\Deferred\DeferredUpdates;
14use MediaWiki\Http\Telemetry;
15use MediaWiki\Logger\LoggerFactory;
16use MediaWiki\MediaWikiServices;
17use MWExceptionHandler;
18use Psr\Log\LoggerInterface;
19
20class JobExecutor {
21
22    /** @var LoggerInterface instance for all JobExecutor instances */
23    private static $logger;
24
25    /** @var StatsdDataFactoryInterface instance
26     * for all JobExecutor instances
27     */
28    private static $stats;
29
30    /**
31     * @var Config a references to the wiki config
32     */
33    private static $config;
34
35    /**
36     * @param array $jobEvent the job event
37     * @return array containing the response status and the message
38     */
39    public function execute( $jobEvent ) {
40        $startTime = microtime( true );
41        $isReadonly = false;
42        $jobCreateResult = $this->getJobFromParams( $jobEvent );
43
44        if ( !$jobCreateResult['status'] ) {
45            $this->logger()->error( 'Failed creating job from description', [
46                'job_type' => $jobEvent['type'],
47                'message' => $jobCreateResult['message']
48            ] );
49            $jobCreateResult['readonly'] = false;
50            return $jobCreateResult;
51        }
52
53        $job = $jobCreateResult['job'];
54        $this->logger()->debug( 'Beginning job execution', [
55            'job' => $job->toString(),
56            'job_type' => $job->getType()
57        ] );
58
59        $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
60        $telemetry = Telemetry::getInstance();
61
62        if ( $job->getRequestId() !== null ) {
63            $telemetry->overrideRequestId( $job->getRequestId() );
64        } else {
65            $telemetry->regenerateRequestId();
66        }
67        // Clear out title cache data from prior snapshots
68        MediaWikiServices::getInstance()->getLinkCache()->clear();
69
70        // Actually execute the job
71        try {
72            $fnameTrxOwner = get_class( $job ) . '::run';
73            // Flush any pending changes left over from an implicit transaction round
74            if ( $job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) {
75                $lbFactory->commitPrimaryChanges( $fnameTrxOwner );
76            } else {
77                $lbFactory->beginPrimaryChanges( $fnameTrxOwner );
78            }
79            // Clear any stale REPEATABLE-READ snapshots from replica DB connections
80            $status = $job->run();
81            // Commit all pending changes from this job
82            $lbFactory->commitPrimaryChanges(
83                $fnameTrxOwner,
84                // Abort if any transaction was too big
85                $this->config()->get( 'MaxJobDBWriteDuration' )
86            );
87
88            if ( $status === false ) {
89                $message = $job->getLastError();
90                $this->logger()->error( 'Failed executing job: ' . $job->toString(), [
91                    'job_type' => $job->getType(),
92                    'error' => $message
93                ] );
94            } elseif ( !is_bool( $status ) ) {
95                $message = 'Success, but no status returned';
96                $this->logger()->warning( 'Non-boolean result returned by job: ' . $job->toString(),
97                    [
98                        'job_type' => $job->getType(),
99                        'job_result' => $status ?? 'unset'
100                    ] );
101                // For backwards compatibility with old job executor we should set the status
102                // to true here, as before anything other then boolean false was considered a success.
103                // TODO: After all the jobs are fixed to return proper result this should be removed.
104                $status = true;
105            } else {
106                $message = 'success';
107            }
108
109            // Run any deferred update tasks; doUpdates() manages transactions itself
110            DeferredUpdates::doUpdates();
111        } catch ( \Wikimedia\Rdbms\DBReadOnlyError $e ) {
112            $status = false;
113            $isReadonly = true;
114            $message = 'Database is in read-only mode';
115            MWExceptionHandler::rollbackPrimaryChangesAndLog( $e );
116        } catch ( Exception $e ) {
117            MWExceptionHandler::rollbackPrimaryChangesAndLog( $e );
118            $status = false;
119            $message = 'Exception executing job: '
120                . $job->toString() . ' : '
121                . get_class( $e ) . ': ' . $e->getMessage();
122            $this->logger()->error( $message,
123                [
124                    'job_type'  => $job->getType(),
125                    'exception' => $e
126                ]
127            );
128        }
129
130        // Always attempt to call teardown() even if Job throws exception.
131        try {
132            $job->teardown( $status );
133        } catch ( Exception $e ) {
134            $message = 'Exception tearing down job: '
135                . $job->toString() . ' : '
136                . get_class( $e ) . ': ' . $e->getMessage();
137            $this->logger()->error( $message,
138                [
139                    'job_type'  => $job->getType(),
140                    'exception' => $e
141                ]
142            );
143        }
144
145        // The JobRunner at this point makes some cleanups to prepare for
146        // the next Job execution. However since we run one job at a time
147        // we don't need that.
148
149        // Report pure job execution timing
150        $jobDuration = microtime( true ) - $startTime;
151        self::stats()->timing(
152            "jobexecutor.{$job->getType()}.exec",
153            $jobDuration
154        );
155        $this->logger()->info( 'Finished job execution',
156            [
157                'job' => $job->toString(),
158                'job_type' => $job->getType(),
159                'job_status' => $status,
160                'job_duration' => $jobDuration
161            ]
162        );
163
164        if ( !$job->allowRetries() ) {
165            // Report success if the job doesn't allow retries
166            // even if actually the job has failed.
167            $status = true;
168        }
169
170        return [
171            'status'   => $status,
172            'readonly' => $isReadonly,
173            'message'  => $message
174        ];
175    }
176
177    /**
178     * @param array $jobEvent containing the job EventBus event.
179     * @return array containing the Job, status and potentially error message
180     */
181    private function getJobFromParams( $jobEvent ) {
182        if ( !isset( $jobEvent['type'] ) ) {
183            return [
184                'status'  => false,
185                'message' => 'Job event type is not defined'
186            ];
187        }
188
189        $jobType = $jobEvent['type'];
190        $params = $jobEvent['params'];
191
192        try {
193            $jobFactory = MediaWikiServices::getInstance()->getJobFactory();
194            $job = $jobFactory->newJob( $jobType, $params );
195        } catch ( Exception $e ) {
196            return [
197                'status'  => false,
198                'message' => $e->getMessage()
199            ];
200        }
201
202        // @phan-suppress-next-line PhanImpossibleTypeComparison
203        if ( $job === null ) {
204            return [
205                'status'  => false,
206                'message' => 'Could not create a job from event'
207            ];
208        }
209
210        return [
211            'status' => true,
212            'job'    => $job
213        ];
214    }
215
216    /**
217     * Returns a singleton logger instance for all JobExecutor instances.
218     * Use like: self::logger()->info( $message )
219     * We use this so we don't have to check if the logger has been created
220     * before attempting to log a message.
221     * @return LoggerInterface
222     */
223    private static function logger() {
224        if ( !self::$logger ) {
225            self::$logger = LoggerFactory::getInstance( 'JobExecutor' );
226        }
227        return self::$logger;
228    }
229
230    /**
231     * Returns a singleton config instance for all JobExecutor instances.
232     * Use like: self::config()->get( 'SomeConfigParameter' )
233     * @return Config
234     */
235    private static function config() {
236        if ( !self::$config ) {
237            self::$config = MediaWikiServices::getInstance()->getMainConfig();
238        }
239        return self::$config;
240    }
241
242    /**
243     * Returns a singleton stats reporter instance for all JobExecutor instances.
244     * Use like self::stats()->increment( $key )
245     * @return StatsdDataFactoryInterface
246     */
247    private static function stats() {
248        if ( !self::$stats ) {
249            self::$stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
250        }
251        return self::$stats;
252    }
253}