Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 126 |
|
0.00% |
0 / 5 |
CRAP | |
0.00% |
0 / 1 |
JobExecutor | |
0.00% |
0 / 126 |
|
0.00% |
0 / 5 |
420 | |
0.00% |
0 / 1 |
execute | |
0.00% |
0 / 94 |
|
0.00% |
0 / 1 |
110 | |||
getJobFromParams | |
0.00% |
0 / 23 |
|
0.00% |
0 / 1 |
20 | |||
logger | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
config | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
stats | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 |
1 | <?php |
2 | |
3 | /** |
4 | * A massively simplified JobRunner with the sole purpose of |
5 | * executing a Job |
6 | */ |
7 | |
8 | namespace MediaWiki\Extension\EventBus; |
9 | |
10 | use Exception; |
11 | use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface; |
12 | use MediaWiki\Config\Config; |
13 | use MediaWiki\Deferred\DeferredUpdates; |
14 | use MediaWiki\Http\Telemetry; |
15 | use MediaWiki\Logger\LoggerFactory; |
16 | use MediaWiki\MediaWikiServices; |
17 | use MWExceptionHandler; |
18 | use Psr\Log\LoggerInterface; |
19 | |
20 | class JobExecutor { |
21 | |
22 | /** @var LoggerInterface instance for all JobExecutor instances */ |
23 | private static $logger; |
24 | |
25 | /** @var StatsdDataFactoryInterface instance |
26 | * for all JobExecutor instances |
27 | */ |
28 | private static $stats; |
29 | |
30 | /** |
31 | * @var Config a reference to the wiki config |
32 | */ |
33 | private static $config; |
34 | |
/**
 * Creates the job described by an EventBus job event and runs it once.
 *
 * The job is run between beginPrimaryChanges()/commitPrimaryChanges() calls
 * on the DB load balancer factory (unless the job carries the
 * JOB_NO_EXPLICIT_TRX_ROUND flag), deferred updates are executed afterwards,
 * and teardown() is invoked on the job even when running it raised an
 * Exception. Execution time is reported to statsd and to the log.
 *
 * @param array $jobEvent the job event
 * @return array containing the response status and the message:
 *   - 'status': false on failure; forced to true when the job does not allow retries
 *   - 'readonly': true only when a DBReadOnlyError was caught
 *   - 'message': human readable outcome
 *   If the job could not be created, the failure result of getJobFromParams()
 *   is returned with 'readonly' => false added.
 */
public function execute( $jobEvent ) {
	$startTime = microtime( true );
	$isReadonly = false;
	$jobCreateResult = $this->getJobFromParams( $jobEvent );

	if ( !$jobCreateResult['status'] ) {
		self::logger()->error( 'Failed creating job from description', [
			// A missing type is one of the reasons job creation fails, so the
			// key cannot be assumed to exist here.
			'job_type' => $jobEvent['type'] ?? null,
			'message' => $jobCreateResult['message']
		] );
		$jobCreateResult['readonly'] = false;
		return $jobCreateResult;
	}

	$job = $jobCreateResult['job'];
	self::logger()->debug( 'Beginning job execution', [
		'job' => $job->toString(),
		'job_type' => $job->getType()
	] );

	$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
	$telemetry = Telemetry::getInstance();

	// Reuse the request ID stored with the job when there is one, otherwise
	// generate a new one for this execution.
	if ( $job->getRequestId() !== null ) {
		$telemetry->overrideRequestId( $job->getRequestId() );
	} else {
		$telemetry->regenerateRequestId();
	}
	// Clear out title cache data from prior snapshots
	MediaWikiServices::getInstance()->getLinkCache()->clear();

	// Actually execute the job
	// NOTE(review): only Exception is caught below, so a PHP Error thrown by
	// the job propagates without rollback or teardown() - confirm this is intended.
	try {
		$fnameTrxOwner = get_class( $job ) . '::run';
		// Flush any pending changes left over from an implicit transaction round
		if ( $job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) {
			$lbFactory->commitPrimaryChanges( $fnameTrxOwner );
		} else {
			$lbFactory->beginPrimaryChanges( $fnameTrxOwner );
		}
		$status = $job->run();
		// Commit all pending changes from this job
		$lbFactory->commitPrimaryChanges(
			$fnameTrxOwner,
			// Abort if any transaction was too big
			self::config()->get( 'MaxJobDBWriteDuration' )
		);

		if ( $status === false ) {
			$message = $job->getLastError();
			self::logger()->error( 'Failed executing job: ' . $job->toString(), [
				'job_type' => $job->getType(),
				'error' => $message
			] );
		} elseif ( !is_bool( $status ) ) {
			$message = 'Success, but no status returned';
			self::logger()->warning( 'Non-boolean result returned by job: ' . $job->toString(),
				[
					'job_type' => $job->getType(),
					'job_result' => $status ?? 'unset'
				] );
			// For backwards compatibility with old job executor we should set the status
			// to true here, as before anything other than boolean false was considered a success.
			// TODO: After all the jobs are fixed to return proper result this should be removed.
			$status = true;
		} else {
			$message = 'success';
		}

		// Run any deferred update tasks; doUpdates() manages transactions itself
		DeferredUpdates::doUpdates();
	} catch ( \Wikimedia\Rdbms\DBReadOnlyError $e ) {
		$status = false;
		$isReadonly = true;
		$message = 'Database is in read-only mode';
		MWExceptionHandler::rollbackPrimaryChangesAndLog( $e );
	} catch ( Exception $e ) {
		MWExceptionHandler::rollbackPrimaryChangesAndLog( $e );
		$status = false;
		$message = 'Exception executing job: '
			. $job->toString() . ' : '
			. get_class( $e ) . ': ' . $e->getMessage();
		self::logger()->error( $message,
			[
				'job_type' => $job->getType(),
				'exception' => $e
			]
		);
	}

	// Always attempt to call teardown() even if Job throws exception.
	// A failing teardown replaces the message but leaves the status untouched.
	try {
		$job->teardown( $status );
	} catch ( Exception $e ) {
		$message = 'Exception tearing down job: '
			. $job->toString() . ' : '
			. get_class( $e ) . ': ' . $e->getMessage();
		self::logger()->error( $message,
			[
				'job_type' => $job->getType(),
				'exception' => $e
			]
		);
	}

	// The JobRunner at this point makes some cleanups to prepare for
	// the next Job execution. However since we run one job at a time
	// we don't need that.

	// Report pure job execution timing
	$jobDuration = microtime( true ) - $startTime;
	self::stats()->timing(
		"jobexecutor.{$job->getType()}.exec",
		$jobDuration
	);
	self::logger()->info( 'Finished job execution',
		[
			'job' => $job->toString(),
			'job_type' => $job->getType(),
			'job_status' => $status,
			'job_duration' => $jobDuration
		]
	);

	if ( !$job->allowRetries() ) {
		// Report success if the job doesn't allow retries
		// even if actually the job has failed.
		$status = true;
	}

	return [
		'status' => $status,
		'readonly' => $isReadonly,
		'message' => $message
	];
}
176 | |
/**
 * Builds a Job instance from an EventBus job event using the job factory
 * service.
 *
 * @param array $jobEvent containing the job EventBus event; its 'type' and
 *   'params' entries are read.
 * @return array containing the Job, status and potentially error message:
 *   [ 'status' => true, 'job' => Job ] on success,
 *   [ 'status' => false, 'message' => string ] on failure.
 */
private function getJobFromParams( $jobEvent ) {
	if ( !isset( $jobEvent['type'] ) ) {
		return [
			'status' => false,
			'message' => 'Job event type is not defined'
		];
	}

	// Report missing params as a regular failure instead of reading an
	// undefined array key and handing null to the job factory.
	if ( !isset( $jobEvent['params'] ) ) {
		return [
			'status' => false,
			'message' => 'Job event params are not defined'
		];
	}

	$jobType = $jobEvent['type'];
	$params = $jobEvent['params'];

	try {
		$jobFactory = MediaWikiServices::getInstance()->getJobFactory();
		$job = $jobFactory->newJob( $jobType, $params );
	} catch ( Exception $e ) {
		// Any exception raised while constructing the job is turned into a failure result.
		return [
			'status' => false,
			'message' => $e->getMessage()
		];
	}

	// @phan-suppress-next-line PhanImpossibleTypeComparison
	if ( $job === null ) {
		return [
			'status' => false,
			'message' => 'Could not create a job from event'
		];
	}

	return [
		'status' => true,
		'job' => $job
	];
}
215 | |
/**
 * Gives access to the logger shared by every JobExecutor instance.
 * The logger is requested from LoggerFactory under the name 'JobExecutor'
 * the first time this is called and kept in a static property afterwards,
 * so callers can write self::logger()->info( $message ) without checking
 * whether it has been created yet.
 * @return LoggerInterface
 */
private static function logger() {
	if ( self::$logger ) {
		// Already created by an earlier call.
		return self::$logger;
	}
	self::$logger = LoggerFactory::getInstance( 'JobExecutor' );
	return self::$logger;
}
229 | |
/**
 * Gives access to the config object shared by every JobExecutor instance.
 * The main config is fetched from MediaWikiServices on first use and kept
 * in a static property afterwards.
 * Typical call: self::config()->get( 'SomeConfigParameter' )
 * @return Config
 */
private static function config() {
	if ( self::$config ) {
		// Already fetched by an earlier call.
		return self::$config;
	}
	self::$config = MediaWikiServices::getInstance()->getMainConfig();
	return self::$config;
}
241 | |
/**
 * Gives access to the stats reporter shared by every JobExecutor instance.
 * The statsd data factory is fetched from MediaWikiServices on first use
 * and kept in a static property afterwards.
 * Typical call: self::stats()->increment( $key )
 * @return StatsdDataFactoryInterface
 */
private static function stats() {
	if ( self::$stats ) {
		// Already fetched by an earlier call.
		return self::$stats;
	}
	self::$stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
	return self::$stats;
}
253 | } |