Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
6.45% |
14 / 217 |
|
0.00% |
0 / 20 |
CRAP | |
0.00% |
0 / 1 |
Reindexer | |
6.45% |
14 / 217 |
|
0.00% |
0 / 20 |
3107.27 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
2 | |||
reindex | |
0.00% |
0 / 44 |
|
0.00% |
0 / 1 |
56 | |||
waitForCounts | |
0.00% |
0 / 18 |
|
0.00% |
0 / 1 |
30 | |||
waitForGreen | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
12 | |||
getHealth | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
12 | |||
decideMaxShardsPerNodeForReindex | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
setConnectionTimeout | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
destroyClients | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
output | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
outputIndented | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
error | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
fatalError | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
makeUpdateFieldsScript | |
0.00% |
0 / 16 |
|
0.00% |
0 / 1 |
42 | |||
makeRemoteReindexInfo | |
58.33% |
14 / 24 |
|
0.00% |
0 / 1 |
12.63 | |||
monitorReindexTask | |
0.00% |
0 / 36 |
|
0.00% |
0 / 1 |
56 | |||
monitorSleepSeconds | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
estimateTimeRemaining | |
0.00% |
0 / 16 |
|
0.00% |
0 / 1 |
30 | |||
estimateSlices | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
2 | |||
getNumberOfNodes | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
getNumberOfShards | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
6 |
1 | <?php |
2 | |
3 | namespace CirrusSearch\Maintenance; |
4 | |
5 | use CirrusSearch\Connection; |
6 | use CirrusSearch\Elastica\ReindexRequest; |
7 | use CirrusSearch\Elastica\ReindexResponse; |
8 | use CirrusSearch\Elastica\ReindexTask; |
9 | use CirrusSearch\SearchConfig; |
10 | use Elastica\Client; |
11 | use Elastica\Exception\Connection\HttpException; |
12 | use Elastica\Index; |
13 | use Elastica\Request; |
14 | use Elastica\Transport\Http; |
15 | use Elastica\Transport\Https; |
16 | use MediaWiki\Utils\MWTimestamp; |
17 | |
18 | /** |
19 | * This program is free software; you can redistribute it and/or modify |
20 | * it under the terms of the GNU General Public License as published by |
21 | * the Free Software Foundation; either version 2 of the License, or |
22 | * (at your option) any later version. |
23 | * |
24 | * This program is distributed in the hope that it will be useful, |
25 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
26 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
27 | * GNU General Public License for more details. |
28 | * |
29 | * You should have received a copy of the GNU General Public License along |
30 | * with this program; if not, write to the Free Software Foundation, Inc., |
31 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
32 | * http://www.gnu.org/copyleft/gpl.html |
33 | */ |
class Reindexer {
	private const MAX_CONSECUTIVE_ERRORS = 5;
	private const MONITOR_SLEEP_SECONDS = 30;
	private const MAX_WAIT_FOR_COUNT_SEC = 600;
	private const AUTO_SLICE_CEILING = 20;
	/** Seconds to sleep between two document count checks in waitForCounts() */
	private const COUNT_RECHECK_SLEEP_SECONDS = 30;

	/**
	 * @var SearchConfig
	 */
	private $searchConfig;

	/* "From" portion */
	/**
	 * @var Index
	 */
	private $oldIndex;

	/**
	 * @var Connection
	 */
	private $oldConnection;

	/* "To" portion */

	/**
	 * @var Index
	 */
	private $index;

	/**
	 * @var Connection
	 */
	private $connection;

	/**
	 * @var Printer|null Receives progress/error output; null silences it
	 */
	private $out;

	/**
	 * @var string[] list of fields to delete
	 */
	private $fieldsToDelete;

	/**
	 * @param SearchConfig $searchConfig
	 * @param Connection $source Connection data is read from
	 * @param Connection $target Connection data is reindexed into
	 * @param Index $index The index being populated
	 * @param Index $oldIndex The index documents are copied from
	 * @param Printer|null $out Output sink; when null all output is dropped
	 * @param string[] $fieldsToDelete Fields removed from every copied document
	 */
	public function __construct(
		SearchConfig $searchConfig,
		Connection $source,
		Connection $target,
		Index $index,
		Index $oldIndex,
		?Printer $out = null,
		$fieldsToDelete = []
	) {
		// @todo: this constructor has too many arguments - refactor!
		$this->searchConfig = $searchConfig;
		$this->oldConnection = $source;
		$this->connection = $target;
		$this->oldIndex = $oldIndex;
		$this->index = $index;
		$this->out = $out;
		$this->fieldsToDelete = $fieldsToDelete;
	}

	/**
	 * Dump everything from the live index into the one being worked on.
	 *
	 * Temporarily changes the target index settings to speed up bulk
	 * indexing, runs an elasticsearch _reindex task, verifies the document
	 * counts and finally reverts the temporary settings.
	 *
	 * @param int|null $slices The number of slices to use, or null to use
	 *  the number of shards
	 * @param int $chunkSize
	 * @param float $acceptableCountDeviation
	 */
	public function reindex(
		$slices = null,
		$chunkSize = 100,
		$acceptableCountDeviation = 0.05
	) {
		// Set some settings that should help io load during bulk indexing. We'll have to
		// optimize after this to consolidate down to a proper number of segments but that
		// is worth the price. total_shards_per_node will help to make sure that each shard
		// has as few neighbors as possible.
		$this->outputIndented( "Preparing index settings for reindex\n" );
		$this->setConnectionTimeout();
		$settings = $this->index->getSettings();
		$oldSettings = $settings->get();
		if ( !is_array( $oldSettings ) ) {
			throw new \RuntimeException( 'Invalid response from index settings' );
		}
		$settings->set( [
			'refresh_interval' => -1,
			'routing.allocation.total_shards_per_node' =>
				$this->decideMaxShardsPerNodeForReindex( $oldSettings ),
			// It's probably inefficient to let the index be created with replicas,
			// then drop the empty replicas a few moments later. Doing it like this
			// allows reindexing and index creation to operate independently without
			// needing to know about each other.
			'auto_expand_replicas' => 'false',
			'number_of_replicas' => 0,
		] );
		$this->waitForGreen();

		$request = new ReindexRequest( $this->oldIndex, $this->index, $chunkSize );
		if ( $slices === null ) {
			$request->setSlices( $this->estimateSlices( $this->oldIndex ) );
		} else {
			$request->setSlices( $slices );
		}
		$remote = self::makeRemoteReindexInfo( $this->oldConnection, $this->connection );
		if ( $remote !== null ) {
			$request->setRemoteInfo( $remote );
		}

		$script = $this->makeUpdateFieldsScript();
		if ( $script !== null ) {
			$request->setScript( $script );
		}

		try {
			$task = $request->reindexTask();
		} catch ( \Exception $e ) {
			// fatalError() exits, so $task is always defined below.
			$this->fatalError( $e->getMessage() );
		}

		$this->outputIndented( "Started reindex task: " . $task->getId() . "\n" );
		$response = $this->monitorReindexTask( $task, $this->index );
		$task->delete();
		if ( !$response->isSuccessful() ) {
			$this->fatalError(
				"Reindex task was not successful: " . $response->getUnsuccessfulReason()
			);
		}

		$this->outputIndented( "Verifying counts..." );
		// We can't verify counts are exactly equal because they won't be - we still push updates
		// into the old index while reindexing the new one.
		$this->waitForCounts( $acceptableCountDeviation );
		$this->output( "done\n" );

		// Revert settings changed just for reindexing. Although we set number_of_replicas above
		// we do not reset its value here, rather allowing auto_expand_replicas to pick an
		// appropriate value. Settings that were not explicitly configured on the old index
		// are sent as null instead of triggering undefined array key warnings.
		$newSettings = [
			'refresh_interval' => $oldSettings['refresh_interval'] ?? null,
			'auto_expand_replicas' => $oldSettings['auto_expand_replicas'] ?? null,
			'routing.allocation.total_shards_per_node' =>
				$oldSettings['routing']['allocation']['total_shards_per_node'] ?? -1,
		];
		$settings->set( $newSettings );
	}

	/**
	 * Block until the new index count is within $acceptableCountDeviation
	 * (relative) of the old index count, or abort after MAX_WAIT_FOR_COUNT_SEC.
	 *
	 * @param float $acceptableCountDeviation
	 */
	private function waitForCounts( float $acceptableCountDeviation ) {
		$oldCount = (float)$this->oldIndex->count();
		$this->index->refresh();
		// While elasticsearch should be ready immediately after a refresh, we have seen this return
		// exceptionally low values in 2% of reindex attempts. Wait around a bit and hope the refresh
		// becomes available
		$start = microtime( true );
		$timeoutAfter = $start + self::MAX_WAIT_FOR_COUNT_SEC;
		while ( true ) {
			$newCount = (float)$this->index->count();
			$difference = $oldCount > 0 ? abs( $oldCount - $newCount ) / $oldCount : 0;
			if ( $difference <= $acceptableCountDeviation ) {
				break;
			}
			$this->output(
				"Not close enough! old=$oldCount new=$newCount difference=$difference\n"
			);
			if ( microtime( true ) > $timeoutAfter ) {
				$this->fatalError( 'Failed to load index - counts not close enough. ' .
					"old=$oldCount new=$newCount difference=$difference. " .
					'Check for warnings above.' );
			} else {
				$this->output( "Waiting to re-check counts..." );
				sleep( self::COUNT_RECHECK_SLEEP_SECONDS );
			}
		}
	}

	/**
	 * Poll the cluster health of the target index once per second until it
	 * reports green, printing a progress dot every 20 polls.
	 */
	public function waitForGreen() {
		$this->outputIndented( "Waiting for index green status..." );
		$each = 0;
		$status = $this->getHealth();
		while ( $status['status'] !== 'green' ) {
			if ( $each === 0 ) {
				$this->output( '.' );
			}
			$each = ( $each + 1 ) % 20;
			sleep( 1 );
			$status = $this->getHealth();
		}
		$this->output( "done\n" );
	}

	/**
	 * Get health information about the index.
	 *
	 * Retries once per second, for as long as it takes, while the request
	 * returns an error.
	 *
	 * @return array Response data array
	 */
	private function getHealth() {
		$indexName = $this->index->getName();
		$path = "_cluster/health/$indexName";
		while ( true ) {
			$response = $this->index->getClient()->request( $path );
			if ( $response->hasError() ) {
				$this->error( 'Error fetching index health but going to retry. Message: ' .
					$response->getError() );
				sleep( 1 );
				continue;
			}
			return $response->getData();
		}
	}

	/**
	 * Decide shards per node during reindex operation
	 *
	 * While reindexing we run with no replicas, meaning the default
	 * configuration for max shards per node might allow things to
	 * become very unbalanced. Choose a value that spreads the
	 * indexing load across as many instances as possible.
	 *
	 * @param array $settings Configured live index settings
	 * @return int
	 */
	private function decideMaxShardsPerNodeForReindex( array $settings ): int {
		$numberOfNodes = (int)$this->getHealth()['number_of_nodes'];
		// Elasticsearch reports index settings as strings.
		$numberOfShards = (int)$settings['number_of_shards'];
		return (int)ceil( $numberOfShards / $numberOfNodes );
	}

	/**
	 * Set the maintenance timeout to the connection we will issue the reindex request
	 * to, so that it does not timeout while the reindex is running.
	 */
	private function setConnectionTimeout() {
		$timeout = $this->searchConfig->get( 'CirrusSearchMaintenanceTimeout' );
		$this->connection->setTimeout( $timeout );
	}

	/**
	 * Destroy client connections
	 */
	private function destroyClients() {
		$this->connection->destroyClient();
		$this->oldConnection->destroyClient();
		// Destroying connections resets timeouts, so we have to reinstate them
		$this->setConnectionTimeout();
	}

	/**
	 * @param string $message
	 * @param mixed|null $channel
	 */
	protected function output( $message, $channel = null ) {
		if ( $this->out ) {
			$this->out->output( $message, $channel );
		}
	}

	/**
	 * @param string $message
	 * @param string $prefix By default prefixes tab to fake an
	 *  additional indentation level.
	 */
	private function outputIndented( $message, $prefix = "\t" ) {
		if ( $this->out ) {
			$this->out->outputIndented( $prefix . $message );
		}
	}

	/**
	 * @param string $message
	 */
	private function error( $message ) {
		if ( $this->out ) {
			$this->out->error( $message );
		}
	}

	/**
	 * Report an error and terminate the process.
	 *
	 * @param string $message
	 * @param int $exitCode
	 * @return never
	 */
	private function fatalError( $message, $exitCode = 1 ) {
		$this->error( $message );
		exit( $exitCode );
	}

	/**
	 * Build the painless script applied to every copied document.
	 *
	 * The script removes the configured fields and, when the target mapping
	 * introduces page_id for the first time, derives page_id from the
	 * document id.
	 *
	 * @return array|null Returns an array suitable for use as
	 *  the _reindex api script parameter to delete fields from
	 *  the copied documents, or null if no script is needed.
	 */
	private function makeUpdateFieldsScript() {
		$script = [
			'source' => '',
			'lang' => 'painless',
		];
		foreach ( $this->fieldsToDelete as $field ) {
			$field = trim( $field );
			if ( $field !== '' ) {
				// The field name is embedded in a single-quoted painless string
				// literal; escape backslashes and quotes so arbitrary names can't
				// break (or alter) the script.
				$escaped = str_replace( [ '\\', "'" ], [ '\\\\', "\\'" ], $field );
				$script['source'] .= "ctx._source.remove('$escaped');";
			}
		}
		// Populate the page_id if it's the first time we add the page_id field to the mapping
		if ( !isset( $this->oldIndex->getMapping()['properties']['page_id'] )
			&& isset( $this->index->getMapping()['properties']['page_id'] ) ) {
			$this->outputIndented( "Populating the page_id field if not set\n" );
			// Length of the id prefix: makeId( 1 ) is the prefix followed by the single digit "1".
			$prefLen = strlen( $this->searchConfig->makeId( 1 ) ) - 1;
			$script['source'] .= "if (ctx._source.page_id == null) {ctx._source.page_id = Long.parseLong(ctx._id.substring($prefLen));}";
		}
		if ( $script['source'] === '' ) {
			return null;
		}

		return $script;
	}

	/**
	 * Creates an array suitable for use as the _reindex api source.remote
	 * parameter to read from $oldConnection.
	 *
	 * This is very fragile, but the transports don't expose enough to do more really
	 *
	 * @param Connection $source Connection to read data from
	 * @param Connection $dest Connection to reindex data into
	 * @return array|null null when both connections point to the same cluster
	 */
	public static function makeRemoteReindexInfo( Connection $source, Connection $dest ) {
		if ( $source->getClusterName() === $dest->getClusterName() ) {
			return null;
		}

		$innerConnection = $source->getClient()->getConnection();
		$transport = $innerConnection->getTransportObject();
		if ( !$transport instanceof Http ) {
			throw new \RuntimeException(
				'Remote reindex not implemented for transport: ' . get_class( $transport )
			);
		}

		// We make some pretty bold assumptions that classes extending from \Elastica\Transport\Http
		// don't change how any of this works.
		$url = $innerConnection->hasConfig( 'url' )
			? $innerConnection->getConfig( 'url' )
			: '';
		if ( $url === '' ) {
			$scheme = ( $transport instanceof Https )
				? 'https'
				: 'http';
			$url = $scheme . '://' . $innerConnection->getHost() . ':' .
				$innerConnection->getPort() . '/' . $innerConnection->getPath();
		}

		if ( $innerConnection->getUsername() && $innerConnection->getPassword() ) {
			return [
				'host' => $url,
				'username' => $innerConnection->getUsername(),
				'password' => $innerConnection->getPassword(),
			];
		} else {
			return [ 'host' => $url ];
		}
	}

	/**
	 * Poll a running reindex task, reporting progress, until it completes.
	 *
	 * Up to MAX_CONSECUTIVE_ERRORS consecutive failures fetching the status are
	 * tolerated; HTTP errors other than the listed intermittent cURL codes are rethrown.
	 *
	 * @param ReindexTask $task
	 * @param Index $target
	 * @return ReindexResponse
	 */
	private function monitorReindexTask( ReindexTask $task, Index $target ) {
		$consecutiveErrors = 0;
		$sleepSeconds = self::monitorSleepSeconds( 1, 2, self::MONITOR_SLEEP_SECONDS );
		$completionEstimateGen = self::estimateTimeRemaining();
		while ( !$task->isComplete() ) {
			try {
				$status = $task->getStatus();
			} catch ( \Exception $e ) {
				if ( ++$consecutiveErrors > self::MAX_CONSECUTIVE_ERRORS ) {
					$this->output( "\n" );
					$this->fatalError(
						"$e\n\n" .
						"Lost connection to elasticsearch cluster. The reindex task "
						. "{$task->getId()} is still running.\nThe task should be manually "
						. "canceled, and the index {$target->getName()}\n"
						. "should be removed.\n" .
						$e->getMessage()
					);
				}
				if ( $e instanceof HttpException ) {
					// Allow through potentially intermittent network problems:
					// * couldn't connect,
					// * 28: timeout out
					// * 52: connected, closed with no response
					if ( !in_array( $e->getError(), [ CURLE_COULDNT_CONNECT, 28, 52 ], true ) ) {
						// Wrap exception to include info about task id?
						throw $e;
					}
				}
				$this->outputIndented( "Error: {$e->getMessage()}\n" );
				usleep( 500000 );
				continue;
			}

			$consecutiveErrors = 0;

			$estCompletion = $completionEstimateGen->send(
				$status->getTotal() - $status->getCreated() );
			// What is worth reporting here?
			$this->outputIndented(
				"Task: {$task->getId()} "
				. "Search Retries: {$status->getSearchRetries()} "
				. "Bulk Retries: {$status->getBulkRetries()} "
				. "Indexed: {$status->getCreated()} / {$status->getTotal()} "
				. "Complete: $estCompletion\n"
			);
			if ( !$status->isComplete() ) {
				sleep( $sleepSeconds->current() );
				$sleepSeconds->next();
			}
		}

		return $task->getResponse();
	}

	/**
	 * Infinite generator of geometrically growing sleep durations:
	 * $base, $base * $ratio, ... capped at $max.
	 *
	 * @param int $base
	 * @param int $ratio
	 * @param int $max
	 * @return \Generator
	 */
	private static function monitorSleepSeconds( $base, $ratio, $max ) {
		$val = $base;
		// @phan-suppress-next-line PhanInfiniteLoop https://github.com/phan/phan/issues/3545
		while ( true ) {
			yield $val;
			$val = min( $max, $val * $ratio );
		}
	}

	/**
	 * Generator returning the estimated timestamp of completion.
	 * @return \Generator Must be provided the remaining count via Generator::send, replies
	 *  with a formatted (RFC 2822) timestamp estimating the completion time, or null
	 *  until an estimate is available.
	 */
	private static function estimateTimeRemaining(): \Generator {
		$estimatedStr = null;
		$remain = null;
		$now = microtime( true );
		while ( true ) {
			$start = $now;
			$prevRemain = $remain;
			$remain = yield $estimatedStr;
			$now = microtime( true );
			if ( $remain === null || $prevRemain === null ) {
				continue;
			}
			# Very simple calc, no smoothing and will vary wildly. Could be
			# improved if deemed useful.
			$elapsed = $now - $start;
			if ( $elapsed <= 0 ) {
				// No measurable time passed; keep the previous estimate rather
				// than dividing by zero.
				continue;
			}
			$rate = ( $prevRemain - $remain ) / $elapsed;
			if ( $rate > 0 ) {
				$estimatedCompletion = $now + ( $remain / $rate );
				$estimatedStr = MWTimestamp::convert( TS_RFC2822, $estimatedCompletion );
			}
		}
	}

	/**
	 * Auto detect the number of slices to use when reindexing.
	 *
	 * Note that elasticsearch 7.x added an 'auto' setting, but we are on
	 * 6.x. That setting uses one slice per shard, up to a certain limit (20 in
	 * 7.9). This implementation provides the same limits, and adds an additional
	 * constraint that the auto-detected value must be <= the number of nodes.
	 *
	 * @param Index $index The index to estimate a slice count for
	 * @return int The number of slices to reindex with
	 */
	private function estimateSlices( Index $index ): int {
		return (int)min(
			$this->getNumberOfNodes( $index->getClient() ),
			$this->getNumberOfShards( $index ),
			self::AUTO_SLICE_CEILING
		);
	}

	/**
	 * @param Client $client
	 * @return int Number of nodes reported by the _cat/nodes API
	 */
	private function getNumberOfNodes( Client $client ): int {
		$endpoint = ( new \Elasticsearch\Endpoints\Cat\Nodes() )
			->setParams( [ 'format' => 'json' ] );
		return count( $client->requestEndpoint( $endpoint )->getData() );
	}

	/**
	 * @param Index $index
	 * @return int The configured number of primary shards of $index
	 * @throws \RuntimeException When the setting can't be found in the response
	 */
	private function getNumberOfShards( Index $index ): int {
		$response = $index->request( '_settings/index.number_of_shards', Request::GET );
		$data = $response->getData();
		// Can't use $index->getName() because that is probably an alias
		$realIndexName = is_array( $data ) ? array_key_first( $data ) : null;
		// In theory this should never happen, we will get a ResponseException if the index doesn't
		// exist and every index must have a number_of_shards settings. But better safe than sorry.
		if ( $realIndexName === null
			|| !isset( $data[$realIndexName]['settings']['index']['number_of_shards'] )
		) {
			throw new \RuntimeException(
				"Couldn't detect number of shards in {$index->getName()}"
			);
		}
		// Elasticsearch reports index settings as strings.
		return (int)$data[$realIndexName]['settings']['index']['number_of_shards'];
	}
}