Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 187 |
|
0.00% |
0 / 6 |
CRAP | |
0.00% |
0 / 1 |
CheckerJob | |
0.00% |
0 / 187 |
|
0.00% |
0 / 6 |
1190 | |
0.00% |
0 / 1 |
build | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
2 | |||
__construct | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
20 | |||
doJob | |
0.00% |
0 / 120 |
|
0.00% |
0 / 1 |
462 | |||
getPressure | |
0.00% |
0 / 14 |
|
0.00% |
0 / 1 |
6 | |||
allowRetries | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
retry | |
0.00% |
0 / 33 |
|
0.00% |
0 / 1 |
30 |
1 | <?php |
2 | |
3 | namespace CirrusSearch\Job; |
4 | |
5 | use ArrayObject; |
6 | use CirrusSearch\Profile\SearchProfileService; |
7 | use CirrusSearch\Sanity\AllClustersQueueingRemediator; |
8 | use CirrusSearch\Sanity\BufferedRemediator; |
9 | use CirrusSearch\Sanity\Checker; |
10 | use CirrusSearch\Sanity\CheckerException; |
11 | use CirrusSearch\Sanity\LogOnlyRemediator; |
12 | use CirrusSearch\Sanity\MultiClusterRemediatorHelper; |
13 | use CirrusSearch\Sanity\QueueingRemediator; |
14 | use CirrusSearch\Searcher; |
15 | use CirrusSearch\UpdateGroup; |
16 | use MediaWiki\Logger\LoggerFactory; |
17 | use MediaWiki\MediaWikiServices; |
18 | |
19 | /** |
20 | * Job wrapper around Sanity\Checker |
21 | * |
22 | * This program is free software; you can redistribute it and/or modify |
23 | * it under the terms of the GNU General Public License as published by |
24 | * the Free Software Foundation; either version 2 of the License, or |
25 | * (at your option) any later version. |
26 | * |
27 | * This program is distributed in the hope that it will be useful, |
28 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
29 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
30 | * GNU General Public License for more details. |
31 | * |
32 | * You should have received a copy of the GNU General Public License along |
33 | * with this program; if not, write to the Free Software Foundation, Inc., |
34 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
35 | * http://www.gnu.org/copyleft/gpl.html |
36 | */ |
class CheckerJob extends CirrusGenericJob {
	/**
	 * @const int max number of retries, 3 means that the job can be run at
	 * most 4 times.
	 */
	private const JOB_MAX_RETRIES = 3;

	/**
	 * Construct a new CheckerJob.
	 * @param int $fromPageId
	 * @param int $toPageId
	 * @param int $delay
	 * @param string $profile sanitization profile to use
	 * @param string|null $cluster
	 * @param int $loopId The number of times the checker jobs have looped
	 *  over the pages to be checked.
	 * @param \JobQueueGroup $jobQueueGroup
	 * @return CheckerJob
	 */
	public static function build( $fromPageId, $toPageId, $delay, $profile, $cluster, $loopId, \JobQueueGroup $jobQueueGroup ) {
		return new self( [
			'fromPageId' => $fromPageId,
			'toPageId' => $toPageId,
			'createdAt' => time(),
			'retryCount' => 0,
			'profile' => $profile,
			'cluster' => $cluster,
			'loopId' => $loopId,
		] + self::buildJobDelayOptions( self::class, $delay, $jobQueueGroup ) );
	}

	/**
	 * @param array $params job parameters; legacy 'fromId'/'toId' keys and a
	 *  missing 'loopId' are normalized for backwards compatibility.
	 */
	public function __construct( array $params ) {
		// BC for jobs created before id fields were clarified to be explicitly page id's
		if ( isset( $params['fromId'] ) ) {
			$params['fromPageId'] = $params['fromId'];
			unset( $params['fromId'] );
		}
		if ( isset( $params['toId'] ) ) {
			$params['toPageId'] = $params['toId'];
			unset( $params['toId'] );
		}
		// BC for jobs created before loop id existed
		if ( !isset( $params['loopId'] ) ) {
			$params['loopId'] = 0;
		}
		parent::__construct( $params );
	}

	/**
	 * Run the sanity check over the [fromPageId, toPageId] range on every
	 * writable cluster, queueing remediation jobs for any problems found.
	 *
	 * Always returns true: configuration/parameter errors are logged and
	 * treated as a fake success so the job queue does not retry them (they
	 * are unrecoverable), while transient conditions (queue pressure, time
	 * budget, checker failures) are re-queued internally via retry().
	 *
	 * @return bool
	 */
	protected function doJob() {
		$profile = $this->searchConfig
			->getProfileService()
			->loadProfileByName( SearchProfileService::SANEITIZER, $this->params['profile'], false );

		// First perform a set of sanity checks and return true to fake a success (to prevent retries)
		// in case the job params are incorrect. These errors are generally unrecoverable.
		if ( !$profile ) {
			LoggerFactory::getInstance( 'CirrusSearch' )->warning(
				"Cannot run CheckerJob invalid profile {profile} provided, check CirrusSearchSanityCheck config.",
				[
					'profile' => $this->params['profile']
				]
			);
			return true;
		}
		$maxPressure = $profile['update_jobs_max_pressure'] ?? null;
		if ( !$maxPressure || $maxPressure < 0 ) {
			LoggerFactory::getInstance( 'CirrusSearch' )->warning(
				"Cannot run CheckerJob invalid update_jobs_max_pressure, check CirrusSearchSanityCheck config."
			);
			return true;
		}
		$batchSize = $profile['checker_batch_size'] ?? null;
		if ( !$batchSize || $batchSize < 0 ) {
			LoggerFactory::getInstance( 'CirrusSearch' )->warning(
				"Cannot run CheckerJob invalid checker_batch_size, check CirrusSearchSanityCheck config."
			);
			return true;
		}

		$chunkSize = $profile['jobs_chunk_size'] ?? null;
		if ( !$chunkSize || $chunkSize < 0 ) {
			LoggerFactory::getInstance( 'CirrusSearch' )->warning(
				"Cannot run CheckerJob invalid jobs_chunk_size, check CirrusSearchSanityCheck config."
			);
			return true;
		}

		$maxTime = $profile['checker_job_max_time'] ?? null;
		if ( !$maxTime || $maxTime < 0 ) {
			LoggerFactory::getInstance( 'CirrusSearch' )->warning(
				"Cannot run CheckerJob invalid checker_job_max_time, check CirrusSearchSanityCheck config."
			);
			return true;
		}

		$connections = $this->decideClusters( UpdateGroup::CHECK_SANITY );
		if ( !$connections ) {
			// No cluster currently accepts sanity-check traffic; nothing to do.
			return true;
		}

		$from = $this->params['fromPageId'];
		$to = $this->params['toPageId'];

		if ( $from > $to ) {
			LoggerFactory::getInstance( 'CirrusSearch' )->warning(
				"Cannot run CheckerJob: from > to ( {from} > {to} ), job is corrupted?",
				[
					'from' => $from,
					'to' => $to,
				]
			);
			return true;
		}

		if ( ( $to - $from ) > $chunkSize ) {
			LoggerFactory::getInstance( 'CirrusSearch' )->warning(
				"Cannot run CheckerJob: to - from > chunkSize( {from}, {to} > {chunkSize} ), job is corrupted or profile mismatch?",
				[
					'from' => $from,
					'to' => $to,
					'chunkSize' => $chunkSize,
				]
			);
			return true;
		}

		$clusterNames = implode( ', ', array_keys( $connections ) );

		LoggerFactory::getInstance( 'CirrusSearch' )->debug(
			"Running CheckerJob on cluster $clusterNames {diff}s after insertion",
			[
				'diff' => time() - $this->params['createdAt'],
				'clusters' => array_keys( $connections ),
			]
		);

		// Optionally flag pages as "old" (needing reindex) based on how many
		// loops the saneitizer has completed over the whole wiki.
		$isOld = null;
		$reindexAfterLoops = $profile['reindex_after_loops'] ?? null;
		if ( $reindexAfterLoops ) {
			$isOld = Checker::makeIsOldClosure(
				$this->params['loopId'],
				$reindexAfterLoops
			);
		}

		$startTime = time();

		// Shared page cache so each page is loaded once regardless of how
		// many clusters are being checked.
		$pageCache = new ArrayObject();
		/**
		 * @var Checker[] $checkers
		 */
		$checkers = [];
		$perClusterRemediators = [];
		$perClusterBufferedRemediators = [];
		$logger = LoggerFactory::getInstance( 'CirrusSearch' );
		$assignment = $this->searchConfig->getClusterAssignment();
		foreach ( $connections as $cluster => $connection ) {
			$searcher = new Searcher( $connection, 0, 0, $this->searchConfig, [], null );
			// Read-only clusters get their problems logged instead of queued.
			$remediator = $assignment->canWriteToCluster( $cluster, UpdateGroup::SANEITIZER )
				? new QueueingRemediator( $cluster ) : new LogOnlyRemediator( $logger );
			// Buffer remediations per cluster so identical fixes across all
			// clusters can later be collapsed into a single all-cluster job.
			$bufferedRemediator = new BufferedRemediator();
			$checker = new Checker(
				$this->searchConfig,
				$connection,
				$bufferedRemediator,
				$searcher,
				false, // logSane
				false, // fastRedirectCheck
				$pageCache,
				$isOld
			);
			$checkers[$cluster] = $checker;
			$perClusterRemediators[$cluster] = $remediator;
			$perClusterBufferedRemediators[$cluster] = $bufferedRemediator;
		}

		$multiClusterRemediator = new MultiClusterRemediatorHelper( $perClusterRemediators, $perClusterBufferedRemediators,
			new AllClustersQueueingRemediator(
				$this->getSearchConfig()->getClusterAssignment(),
				MediaWikiServices::getInstance()->getJobQueueGroup()
			) );

		$ranges = array_chunk( range( $from, $to ), $batchSize );
		foreach ( $ranges as $pageIds ) {
			// Back off when downstream update queues are saturated; the
			// remainder of the range is re-queued with a delay.
			if ( self::getPressure() > $maxPressure ) {
				$this->retry( "too much pressure on update jobs", reset( $pageIds ) );
				return true;
			}
			if ( time() - $startTime > $maxTime ) {
				$this->retry( "execution time exceeded checker_job_max_time", reset( $pageIds ) );
				return true;
			}
			// Reset the shared cache between batches so memory stays bounded.
			$pageCache->exchangeArray( [] );
			foreach ( $checkers as $cluster => $checker ) {
				try {
					$checker->check( $pageIds );
				} catch ( CheckerException $checkerException ) {
					// Retry this cluster alone from the failed batch and stop
					// checking it in the current run.
					$this->retry( "Failed to verify ids on $cluster: " . $checkerException->getMessage(), reset( $pageIds ), $cluster );
					unset( $checkers[$cluster] );
				}
			}
			$multiClusterRemediator->sendBatch();
		}
		return true;
	}

	/**
	 * @return int the total number of update jobs enqueued
	 */
	public static function getPressure() {
		$queues = [
			'cirrusSearchLinksUpdatePrioritized',
			'cirrusSearchLinksUpdate',
			'cirrusSearchElasticaWrite',
			'cirrusSearchOtherIndex',
			'cirrusSearchDeletePages',
		];
		$size = 0;
		$jobQueueGroup = MediaWikiServices::getInstance()->getJobQueueGroup();
		foreach ( $queues as $queueName ) {
			$queue = $jobQueueGroup->get( $queueName );
			// Count both ready and delayed jobs as pressure.
			$size += $queue->getSize();
			$size += $queue->getDelayedCount();
		}

		return $size;
	}

	/**
	 * This job handles all its own retries internally.
	 * @return bool
	 */
	public function allowRetries() {
		return true;
	}

	/**
	 * Retry the job later with a new from offset
	 * @param string $cause why we retry
	 * @param int $newFrom the new from offset
	 * @param string|null $cluster Cluster job is for
	 */
	private function retry( $cause, $newFrom, $cluster = null ) {
		// Give up after JOB_MAX_RETRIES re-queues; the range is abandoned.
		if ( $this->params['retryCount'] >= self::JOB_MAX_RETRIES ) {
			LoggerFactory::getInstance( 'CirrusSearch' )->info(
				"Sanitize CheckerJob: $cause ({fromPageId}:{toPageId}), Abandonning CheckerJob after {retries} retries " .
				"for {cluster}, (jobs_chunk_size too high?).",
				[
					'retries' => $this->params['retryCount'],
					'fromPageId' => $this->params['fromPageId'],
					'toPageId' => $this->params['toPageId'],
					'cluster' => $cluster ?: 'all clusters'
				]
			);
			return;
		}

		// Exponential-style backoff based on how many times we already retried.
		$delay = $this->backoffDelay( $this->params['retryCount'] );
		$params = $this->params;
		if ( $cluster !== null ) {
			// Narrow the retried job to the single cluster that failed.
			$params['cluster'] = $cluster;
		}
		$params['retryCount']++;
		$params['fromPageId'] = $newFrom;
		// Drop the stale release timestamp; a fresh delay is computed below.
		unset( $params['jobReleaseTimestamp'] );
		$jobQueue = MediaWikiServices::getInstance()->getJobQueueGroup();
		$params += self::buildJobDelayOptions( self::class, $delay, $jobQueue );
		$job = new self( $params );
		LoggerFactory::getInstance( 'CirrusSearch' )->info(
			"Sanitize CheckerJob: $cause ({fromPageId}:{toPageId}), Requeueing CheckerJob " .
			"for {cluster} with a delay of {delay}s.",
			[
				'delay' => $delay,
				'fromPageId' => $job->params['fromPageId'],
				'toPageId' => $job->params['toPageId'],
				'cluster' => $cluster ?: 'all clusters'
			]
		);
		$jobQueue->push( $job );
	}
}