Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 188 |
|
0.00% |
0 / 6 |
CRAP | |
0.00% |
0 / 1 |
CheckerJob | |
0.00% |
0 / 188 |
|
0.00% |
0 / 6 |
1190 | |
0.00% |
0 / 1 |
build | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
2 | |||
__construct | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
20 | |||
doJob | |
0.00% |
0 / 121 |
|
0.00% |
0 / 1 |
462 | |||
getPressure | |
0.00% |
0 / 14 |
|
0.00% |
0 / 1 |
6 | |||
allowRetries | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
retry | |
0.00% |
0 / 33 |
|
0.00% |
0 / 1 |
30 |
1 | <?php |
2 | |
3 | namespace CirrusSearch\Job; |
4 | |
5 | use ArrayObject; |
6 | use CirrusSearch\Profile\SearchProfileService; |
7 | use CirrusSearch\Sanity\AllClustersQueueingRemediator; |
8 | use CirrusSearch\Sanity\BufferedRemediator; |
9 | use CirrusSearch\Sanity\Checker; |
10 | use CirrusSearch\Sanity\CheckerException; |
11 | use CirrusSearch\Sanity\LogOnlyRemediator; |
12 | use CirrusSearch\Sanity\MultiClusterRemediatorHelper; |
13 | use CirrusSearch\Sanity\QueueingRemediator; |
14 | use CirrusSearch\Searcher; |
15 | use CirrusSearch\UpdateGroup; |
16 | use CirrusSearch\Util; |
17 | use MediaWiki\Logger\LoggerFactory; |
18 | use MediaWiki\MediaWikiServices; |
19 | |
20 | /** |
21 | * Job wrapper around Sanity\Checker |
22 | * |
23 | * This program is free software; you can redistribute it and/or modify |
24 | * it under the terms of the GNU General Public License as published by |
25 | * the Free Software Foundation; either version 2 of the License, or |
26 | * (at your option) any later version. |
27 | * |
28 | * This program is distributed in the hope that it will be useful, |
29 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
30 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
31 | * GNU General Public License for more details. |
32 | * |
33 | * You should have received a copy of the GNU General Public License along |
34 | * with this program; if not, write to the Free Software Foundation, Inc., |
35 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
36 | * http://www.gnu.org/copyleft/gpl.html |
37 | */ |
class CheckerJob extends CirrusGenericJob {
	/**
	 * @const int max number of retries, 3 means that the job can be run at
	 * most 4 times.
	 */
	private const JOB_MAX_RETRIES = 3;

	/**
	 * Construct a new CheckerJob.
	 * @param int $fromPageId first page id (inclusive) of the range to check
	 * @param int $toPageId last page id (inclusive) of the range to check
	 * @param int $delay number of seconds the job should wait before running
	 * @param string $profile sanitization profile to use
	 * @param string|null $cluster target cluster, or null for all writable clusters
	 * @param int $loopId The number of times the checker jobs have looped
	 * over the pages to be checked.
	 * @param \JobQueueGroup $jobQueueGroup
	 * @return CheckerJob
	 */
	public static function build( $fromPageId, $toPageId, $delay, $profile, $cluster, $loopId, \JobQueueGroup $jobQueueGroup ) {
		return new self( [
			'fromPageId' => $fromPageId,
			'toPageId' => $toPageId,
			'createdAt' => time(),
			'retryCount' => 0,
			'profile' => $profile,
			'cluster' => $cluster,
			'loopId' => $loopId,
		] + self::buildJobDelayOptions( self::class, $delay, $jobQueueGroup ) );
	}

	/**
	 * @param array $params job parameters, see {@link build} for the expected keys
	 */
	public function __construct( array $params ) {
		// BC for jobs created before id fields were clarified to be explicitly page id's
		if ( isset( $params['fromId'] ) ) {
			$params['fromPageId'] = $params['fromId'];
			unset( $params['fromId'] );
		}
		if ( isset( $params['toId'] ) ) {
			$params['toPageId'] = $params['toId'];
			unset( $params['toId'] );
		}
		// BC for jobs created before loop id existed
		if ( !isset( $params['loopId'] ) ) {
			$params['loopId'] = 0;
		}
		parent::__construct( $params );
	}

	/**
	 * Read a required strictly-positive numeric setting from the sanitization
	 * profile. Logs a warning and returns null when the value is missing,
	 * zero or negative (these config errors are generally unrecoverable, so
	 * callers should abort without retrying).
	 *
	 * @param array $profile loaded sanitization profile
	 * @param string $key profile key to read
	 * @return int|float|null the value, or null when invalid
	 */
	private function requireProfileParam( array $profile, $key ) {
		$value = $profile[$key] ?? null;
		if ( !$value || $value < 0 ) {
			LoggerFactory::getInstance( 'CirrusSearch' )->warning(
				"Cannot run CheckerJob invalid $key, check CirrusSearchSanityCheck config."
			);
			return null;
		}
		return $value;
	}

	/**
	 * Run the sanity checker over the [fromPageId, toPageId] range in batches,
	 * retrying (via a fresh delayed job) when pressure or time budgets are hit.
	 * @return bool always true; param/config errors fake a success to prevent
	 *  the job queue from retrying an unrecoverable job.
	 */
	protected function doJob() {
		$profile = $this->searchConfig
			->getProfileService()
			->loadProfileByName( SearchProfileService::SANEITIZER, $this->params['profile'], false );

		// First perform a set of sanity checks and return true to fake a success (to prevent retries)
		// in case the job params are incorrect. These errors are generally unrecoverable.
		if ( !$profile ) {
			LoggerFactory::getInstance( 'CirrusSearch' )->warning(
				"Cannot run CheckerJob invalid profile {profile} provided, check CirrusSearchSanityCheck config.",
				[
					'profile' => $this->params['profile']
				]
			);
			return true;
		}
		$maxPressure = $this->requireProfileParam( $profile, 'update_jobs_max_pressure' );
		if ( $maxPressure === null ) {
			return true;
		}
		$batchSize = $this->requireProfileParam( $profile, 'checker_batch_size' );
		if ( $batchSize === null ) {
			return true;
		}
		$chunkSize = $this->requireProfileParam( $profile, 'jobs_chunk_size' );
		if ( $chunkSize === null ) {
			return true;
		}
		$maxTime = $this->requireProfileParam( $profile, 'checker_job_max_time' );
		if ( $maxTime === null ) {
			return true;
		}

		$connections = $this->decideClusters( UpdateGroup::CHECK_SANITY );
		if ( !$connections ) {
			return true;
		}

		$from = $this->params['fromPageId'];
		$to = $this->params['toPageId'];

		// A reversed or oversized range means the job params are corrupted
		// (or the profile changed under us); abandon rather than retry.
		if ( $from > $to ) {
			LoggerFactory::getInstance( 'CirrusSearch' )->warning(
				"Cannot run CheckerJob: from > to ( {from} > {to} ), job is corrupted?",
				[
					'from' => $from,
					'to' => $to,
				]
			);
			return true;
		}

		if ( ( $to - $from ) > $chunkSize ) {
			LoggerFactory::getInstance( 'CirrusSearch' )->warning(
				"Cannot run CheckerJob: to - from > chunkSize( {from}, {to} > {chunkSize} ), job is corrupted or profile mismatch?",
				[
					'from' => $from,
					'to' => $to,
					'chunkSize' => $chunkSize,
				]
			);
			return true;
		}

		$clusterNames = implode( ', ', array_keys( $connections ) );

		LoggerFactory::getInstance( 'CirrusSearch' )->debug(
			"Running CheckerJob on cluster $clusterNames {diff}s after insertion",
			[
				'diff' => time() - $this->params['createdAt'],
				'clusters' => array_keys( $connections ),
			]
		);

		// Optionally flag pages as "old" (due for reindexing) based on how many
		// times the checker has looped over the whole wiki.
		$isOld = null;
		$reindexAfterLoops = $profile['reindex_after_loops'] ?? null;
		if ( $reindexAfterLoops ) {
			$isOld = Checker::makeIsOldClosure(
				$this->params['loopId'],
				$reindexAfterLoops
			);
		}

		$startTime = time();

		// Shared across all per-cluster checkers so pages are loaded once per batch.
		$pageCache = new ArrayObject();
		/**
		 * @var Checker[] $checkers
		 */
		$checkers = [];
		$perClusterRemediators = [];
		$perClusterBufferedRemediators = [];
		$logger = LoggerFactory::getInstance( 'CirrusSearch' );
		$assignment = $this->searchConfig->getClusterAssignment();
		foreach ( $connections as $cluster => $connection ) {
			$searcher = new Searcher( $connection, 0, 0, $this->searchConfig, [], null );
			// Clusters we cannot write to only get their problems logged,
			// never remediated via the job queue.
			$remediator = $assignment->canWriteToCluster( $cluster, UpdateGroup::SANEITIZER )
				? new QueueingRemediator( $cluster ) : new LogOnlyRemediator( $logger );
			$bufferedRemediator = new BufferedRemediator();
			$checker = new Checker(
				$this->searchConfig,
				$connection,
				$bufferedRemediator,
				$searcher,
				Util::getStatsFactory(),
				false, // logSane
				false, // fastRedirectCheck
				$pageCache,
				$isOld
			);
			$checkers[$cluster] = $checker;
			$perClusterRemediators[$cluster] = $remediator;
			$perClusterBufferedRemediators[$cluster] = $bufferedRemediator;
		}

		$multiClusterRemediator = new MultiClusterRemediatorHelper( $perClusterRemediators, $perClusterBufferedRemediators,
			new AllClustersQueueingRemediator(
				$assignment,
				MediaWikiServices::getInstance()->getJobQueueGroup()
			) );

		$ranges = array_chunk( range( $from, $to ), $batchSize );
		foreach ( $ranges as $pageIds ) {
			// Respect pressure and time budgets: requeue the remaining range
			// as a delayed retry instead of blocking the runner.
			if ( self::getPressure() > $maxPressure ) {
				$this->retry( "too much pressure on update jobs", reset( $pageIds ) );
				return true;
			}
			if ( time() - $startTime > $maxTime ) {
				$this->retry( "execution time exceeded checker_job_max_time", reset( $pageIds ) );
				return true;
			}
			$pageCache->exchangeArray( [] );
			foreach ( $checkers as $cluster => $checker ) {
				try {
					$checker->check( $pageIds );
				} catch ( CheckerException $checkerException ) {
					// Requeue a retry for this cluster only and stop checking it
					// for the remainder of this run.
					$this->retry( "Failed to verify ids on $cluster: " . $checkerException->getMessage(), reset( $pageIds ), $cluster );
					unset( $checkers[$cluster] );
				}
			}
			$multiClusterRemediator->sendBatch();
		}
		return true;
	}

	/**
	 * @return int the total number of update jobs enqueued
	 */
	public static function getPressure() {
		$queues = [
			'cirrusSearchLinksUpdatePrioritized',
			'cirrusSearchLinksUpdate',
			'cirrusSearchElasticaWrite',
			'cirrusSearchOtherIndex',
			'cirrusSearchDeletePages',
		];
		$size = 0;
		$jobQueueGroup = MediaWikiServices::getInstance()->getJobQueueGroup();
		foreach ( $queues as $queueName ) {
			$queue = $jobQueueGroup->get( $queueName );
			// Count both ready and delayed jobs as pressure.
			$size += $queue->getSize();
			$size += $queue->getDelayedCount();
		}

		return $size;
	}

	/**
	 * This job handles all its own retries internally.
	 * @return bool
	 */
	public function allowRetries() {
		return true;
	}

	/**
	 * Retry the job later with a new from offset. Gives up (logging an info
	 * message) once JOB_MAX_RETRIES is exceeded.
	 * @param string $cause why we retry
	 * @param int $newFrom the new from offset
	 * @param string|null $cluster Cluster job is for
	 */
	private function retry( $cause, $newFrom, $cluster = null ) {
		if ( $this->params['retryCount'] >= self::JOB_MAX_RETRIES ) {
			LoggerFactory::getInstance( 'CirrusSearch' )->info(
				"Sanitize CheckerJob: $cause ({fromPageId}:{toPageId}), Abandoning CheckerJob after {retries} retries " .
				"for {cluster}, (jobs_chunk_size too high?).",
				[
					'retries' => $this->params['retryCount'],
					'fromPageId' => $this->params['fromPageId'],
					'toPageId' => $this->params['toPageId'],
					'cluster' => $cluster ?: 'all clusters'
				]
			);
			return;
		}

		$delay = $this->backoffDelay( $this->params['retryCount'] );
		$params = $this->params;
		if ( $cluster !== null ) {
			$params['cluster'] = $cluster;
		}
		$params['retryCount']++;
		$params['fromPageId'] = $newFrom;
		// Drop any stale delay; buildJobDelayOptions computes a fresh one.
		unset( $params['jobReleaseTimestamp'] );
		$jobQueue = MediaWikiServices::getInstance()->getJobQueueGroup();
		$params += self::buildJobDelayOptions( self::class, $delay, $jobQueue );
		$job = new self( $params );
		LoggerFactory::getInstance( 'CirrusSearch' )->info(
			"Sanitize CheckerJob: $cause ({fromPageId}:{toPageId}), Requeueing CheckerJob " .
			"for {cluster} with a delay of {delay}s.",
			[
				'delay' => $delay,
				'fromPageId' => $job->params['fromPageId'],
				'toPageId' => $job->params['toPageId'],
				'cluster' => $cluster ?: 'all clusters'
			]
		);
		$jobQueue->push( $job );
	}
}