Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
6.45% |
14 / 217 |
|
0.00% |
0 / 20 |
CRAP | |
0.00% |
0 / 1 |
Reindexer | |
6.45% |
14 / 217 |
|
0.00% |
0 / 20 |
3107.27 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
2 | |||
reindex | |
0.00% |
0 / 44 |
|
0.00% |
0 / 1 |
56 | |||
waitForCounts | |
0.00% |
0 / 18 |
|
0.00% |
0 / 1 |
30 | |||
waitForGreen | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
12 | |||
getHealth | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
12 | |||
decideMaxShardsPerNodeForReindex | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
setConnectionTimeout | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
destroyClients | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
output | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
outputIndented | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
error | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
fatalError | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
makeUpdateFieldsScript | |
0.00% |
0 / 16 |
|
0.00% |
0 / 1 |
42 | |||
makeRemoteReindexInfo | |
58.33% |
14 / 24 |
|
0.00% |
0 / 1 |
12.63 | |||
monitorReindexTask | |
0.00% |
0 / 36 |
|
0.00% |
0 / 1 |
56 | |||
monitorSleepSeconds | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
estimateTimeRemaining | |
0.00% |
0 / 16 |
|
0.00% |
0 / 1 |
30 | |||
estimateSlices | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
2 | |||
getNumberOfNodes | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
getNumberOfShards | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
6 |
1 | <?php |
2 | |
3 | namespace CirrusSearch\Maintenance; |
4 | |
5 | use CirrusSearch\Connection; |
6 | use CirrusSearch\Elastica\ReindexRequest; |
7 | use CirrusSearch\Elastica\ReindexResponse; |
8 | use CirrusSearch\Elastica\ReindexTask; |
9 | use CirrusSearch\SearchConfig; |
10 | use Elastica\Client; |
11 | use Elastica\Exception\Connection\HttpException; |
12 | use Elastica\Index; |
13 | use Elastica\Request; |
14 | use Elastica\Transport\Http; |
15 | use Elastica\Transport\Https; |
16 | |
17 | /** |
18 | * This program is free software; you can redistribute it and/or modify |
19 | * it under the terms of the GNU General Public License as published by |
20 | * the Free Software Foundation; either version 2 of the License, or |
21 | * (at your option) any later version. |
22 | * |
23 | * This program is distributed in the hope that it will be useful, |
24 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
25 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
26 | * GNU General Public License for more details. |
27 | * |
28 | * You should have received a copy of the GNU General Public License along |
29 | * with this program; if not, write to the Free Software Foundation, Inc., |
30 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
31 | * http://www.gnu.org/copyleft/gpl.html |
32 | */ |
class Reindexer {
	/** Give up monitoring the reindex task after this many back-to-back status failures. */
	private const MAX_CONSECUTIVE_ERRORS = 5;
	/** Upper bound, in seconds, of the backoff between reindex task status polls. */
	private const MONITOR_SLEEP_SECONDS = 30;
	/** How long, in seconds, to keep re-checking document counts after the reindex. */
	private const MAX_WAIT_FOR_COUNT_SEC = 600;
	/** Seconds to sleep between two document count re-checks. */
	private const WAIT_FOR_COUNT_RECHECK_SEC = 30;
	/** Never auto-select more than this many slices (see estimateSlices()). */
	private const AUTO_SLICE_CEILING = 20;

	/**
	 * @var SearchConfig
	 */
	private $searchConfig;

	/* "From" portion */
	/**
	 * @var Index
	 */
	private $oldIndex;

	/**
	 * @var Connection
	 */
	private $oldConnection;

	/* "To" portion */

	/**
	 * @var Index
	 */
	private $index;

	/**
	 * @var Connection
	 */
	private $connection;

	/**
	 * @var Printer|null Destination for progress/error messages; null silences all output
	 */
	private $out;

	/**
	 * @var string[] list of fields to delete
	 */
	private $fieldsToDelete;

	/**
	 * @param SearchConfig $searchConfig
	 * @param Connection $source Connection the old index is read through
	 * @param Connection $target Connection the new index is written through
	 * @param Index $index Index being populated
	 * @param Index $oldIndex Index documents are copied from
	 * @param Printer|null $out Where progress is reported; null disables output
	 * @param string[] $fieldsToDelete Fields removed from every copied document
	 */
	public function __construct(
		SearchConfig $searchConfig,
		Connection $source,
		Connection $target,
		Index $index,
		Index $oldIndex,
		?Printer $out = null,
		$fieldsToDelete = []
	) {
		// @todo: this constructor has too many arguments - refactor!
		$this->searchConfig = $searchConfig;
		$this->oldConnection = $source;
		$this->connection = $target;
		$this->oldIndex = $oldIndex;
		$this->index = $index;
		$this->out = $out;
		$this->fieldsToDelete = $fieldsToDelete;
	}

	/**
	 * Dump everything from the live index into the one being worked on.
	 *
	 * Temporarily tunes the target index settings for bulk loading, runs an
	 * elasticsearch _reindex task, monitors it until completion, verifies the
	 * document counts, and finally reverts the temporary settings. Any fatal
	 * problem terminates the process through fatalError().
	 *
	 * @param int|null $slices The number of slices to use, or null to use
	 *  the number of shards
	 * @param int $chunkSize
	 * @param float $acceptableCountDeviation Maximum tolerated relative
	 *  difference between old and new document counts
	 * @throws \RuntimeException when the index settings response is invalid
	 */
	public function reindex(
		$slices = null,
		$chunkSize = 100,
		$acceptableCountDeviation = 0.05
	) {
		// Set some settings that should help io load during bulk indexing. We'll have to
		// optimize after this to consolidate down to a proper number of segments but that is
		// is worth the price. total_shards_per_node will help to make sure that each shard
		// has as few neighbors as possible.
		$this->outputIndented( "Preparing index settings for reindex\n" );
		$this->setConnectionTimeout();
		$settings = $this->index->getSettings();
		$oldSettings = $settings->get();
		if ( !is_array( $oldSettings ) ) {
			throw new \RuntimeException( 'Invalid response from index settings' );
		}
		$settings->set( [
			'refresh_interval' => -1,
			'routing.allocation.total_shards_per_node' =>
				$this->decideMaxShardsPerNodeForReindex( $oldSettings ),
			// It's probably inefficient to let the index be created with replicas,
			// then drop the empty replicas a few moments later. Doing it like this
			// allows reindexing and index creation to operate independently without
			// needing to know about each other.
			'auto_expand_replicas' => 'false',
			'number_of_replicas' => 0,
		] );
		$this->waitForGreen();

		$request = new ReindexRequest( $this->oldIndex, $this->index, $chunkSize );
		$request->setSlices( $slices ?? $this->estimateSlices( $this->oldIndex ) );

		$remote = self::makeRemoteReindexInfo( $this->oldConnection, $this->connection );
		if ( $remote !== null ) {
			$request->setRemoteInfo( $remote );
		}

		$script = $this->makeUpdateFieldsScript();
		if ( $script !== null ) {
			$request->setScript( $script );
		}

		try {
			$task = $request->reindexTask();
		} catch ( \Exception $e ) {
			$this->fatalError( $e->getMessage() );
		}

		$this->outputIndented( "Started reindex task: " . $task->getId() . "\n" );
		$response = $this->monitorReindexTask( $task, $this->index );
		$task->delete();
		if ( !$response->isSuccessful() ) {
			$this->fatalError(
				"Reindex task was not successful: " . $response->getUnsuccessfulReason()
			);
		}

		$this->outputIndented( "Verifying counts..." );
		// We can't verify counts are exactly equal because they won't be - we still push updates
		// into the old index while reindexing the new one.
		$this->waitForCounts( $acceptableCountDeviation );
		$this->output( "done\n" );

		// Revert settings changed just for reindexing. Although we set number_of_replicas above
		// we do not reset its value here, rather allowing auto_expand_replicas to pick an
		// appropriate value. A setting that was not explicitly present on the live index is
		// reset with null (elasticsearch default) rather than raising an undefined-index warning.
		$newSettings = [
			'refresh_interval' => $oldSettings['refresh_interval'] ?? null,
			'auto_expand_replicas' => $oldSettings['auto_expand_replicas'] ?? null,
			'routing.allocation.total_shards_per_node' =>
				$oldSettings['routing']['allocation']['total_shards_per_node'] ?? -1,
		];
		$settings->set( $newSettings );
	}

	/**
	 * Poll the new index until its document count is within the accepted
	 * relative deviation of the old index count, or abort after
	 * MAX_WAIT_FOR_COUNT_SEC seconds.
	 *
	 * @param float $acceptableCountDeviation Maximum tolerated |old - new| / old
	 */
	private function waitForCounts( float $acceptableCountDeviation ) {
		$oldCount = (float)$this->oldIndex->count();
		$this->index->refresh();
		// While elasticsearch should be ready immediately after a refresh, we have seen this return
		// exceptionally low values in 2% of reindex attempts. Wait around a bit and hope the refresh
		// becomes available
		$timeoutAfter = microtime( true ) + self::MAX_WAIT_FOR_COUNT_SEC;
		while ( true ) {
			$newCount = (float)$this->index->count();
			$difference = $oldCount > 0 ? abs( $oldCount - $newCount ) / $oldCount : 0;
			if ( $difference <= $acceptableCountDeviation ) {
				break;
			}
			$this->output(
				"Not close enough! old=$oldCount new=$newCount difference=$difference\n"
			);
			if ( microtime( true ) > $timeoutAfter ) {
				$this->fatalError( 'Failed to load index - counts not close enough. ' .
					"old=$oldCount new=$newCount difference=$difference. " .
					'Check for warnings above.' );
			}
			$this->output( "Waiting to re-check counts..." );
			sleep( self::WAIT_FOR_COUNT_RECHECK_SEC );
		}
	}

	/**
	 * Block until the cluster reports green health for the target index,
	 * printing a progress dot roughly every 20 seconds.
	 */
	public function waitForGreen() {
		$this->outputIndented( "Waiting for index green status..." );
		$each = 0;
		$status = $this->getHealth();
		while ( $status['status'] !== 'green' ) {
			if ( $each === 0 ) {
				$this->output( '.' );
			}
			$each = ( $each + 1 ) % 20;
			sleep( 1 );
			$status = $this->getHealth();
		}
		$this->output( "done\n" );
	}

	/**
	 * Get health information about the index, retrying every second for as
	 * long as the health request returns an error response.
	 *
	 * @return array Response data array
	 */
	private function getHealth() {
		$indexName = $this->index->getName();
		$path = "_cluster/health/$indexName";
		while ( true ) {
			$response = $this->index->getClient()->request( $path );
			if ( $response->hasError() ) {
				$this->error( 'Error fetching index health but going to retry. Message: ' .
					$response->getError() );
				sleep( 1 );
				continue;
			}
			return $response->getData();
		}
	}

	/**
	 * Decide shards per node during reindex operation
	 *
	 * While reindexing we run with no replicas, meaning the default
	 * configuration for max shards per node might allow things to
	 * become very unbalanced. Choose a value that spreads the
	 * indexing load across as many instances as possible.
	 *
	 * @param array $settings Configured live index settings
	 * @return int
	 */
	private function decideMaxShardsPerNodeForReindex( array $settings ): int {
		// Guard against a zero/missing node count so we never divide by zero.
		$numberOfNodes = max( 1, (int)$this->getHealth()['number_of_nodes'] );
		$numberOfShards = (int)$settings['number_of_shards'];
		return (int)ceil( $numberOfShards / $numberOfNodes );
	}

	/**
	 * Set the maintenance timeout to the connection we will issue the reindex request
	 * to, so that it does not timeout while the reindex is running.
	 */
	private function setConnectionTimeout() {
		$timeout = $this->searchConfig->get( 'CirrusSearchMaintenanceTimeout' );
		$this->connection->setTimeout( $timeout );
	}

	/**
	 * Destroy client connections
	 */
	private function destroyClients() {
		$this->connection->destroyClient();
		$this->oldConnection->destroyClient();
		// Destroying connections resets timeouts, so we have to reinstate them
		$this->setConnectionTimeout();
	}

	/**
	 * Forward a message to the printer, if one was provided.
	 *
	 * @param string $message
	 * @param mixed|null $channel
	 */
	protected function output( $message, $channel = null ) {
		if ( $this->out ) {
			$this->out->output( $message, $channel );
		}
	}

	/**
	 * @param string $message
	 * @param string $prefix By default prefixes tab to fake an
	 *  additional indentation level.
	 */
	private function outputIndented( $message, $prefix = "\t" ) {
		if ( $this->out ) {
			$this->out->outputIndented( $prefix . $message );
		}
	}

	/**
	 * Forward an error message to the printer, if one was provided.
	 *
	 * @param string $message
	 */
	private function error( $message ) {
		if ( $this->out ) {
			$this->out->error( $message );
		}
	}

	/**
	 * Report an error and terminate the process.
	 *
	 * @param string $message
	 * @param int $exitCode
	 * @return never
	 */
	private function fatalError( $message, $exitCode = 1 ) {
		$this->error( $message );
		exit( $exitCode );
	}

	/**
	 * Escape a value for embedding inside a single-quoted painless string literal.
	 *
	 * Painless uses backslash as the escape character in string literals, so
	 * backslashes are doubled first and single quotes are then backslash-escaped.
	 *
	 * @param string $value
	 * @return string
	 */
	private static function escapePainlessString( string $value ): string {
		return str_replace( [ '\\', "'" ], [ '\\\\', "\\'" ], $value );
	}

	/**
	 * @return array|null Returns an array suitable for use as
	 *  the _reindex api script parameter to delete fields from
	 *  the copied documents, or null if no script is needed.
	 */
	private function makeUpdateFieldsScript() {
		$script = [
			'source' => '',
			'lang' => 'painless',
		];
		foreach ( $this->fieldsToDelete as $field ) {
			$field = trim( $field );
			if ( $field !== '' ) {
				// Escape so a quote/backslash in a field name cannot break out of the literal.
				$escaped = self::escapePainlessString( $field );
				$script['source'] .= "ctx._source.remove('$escaped');";
			}
		}
		// Populate the page_id if it's the first time we add the page_id field to the mapping
		if ( !isset( $this->oldIndex->getMapping()['properties']['page_id'] )
			&& isset( $this->index->getMapping()['properties']['page_id'] ) ) {
			$this->outputIndented( "Populating the page_id field if not set\n" );
			// Length of the document id prefix: makeId( 1 ) is the prefix followed by "1".
			$prefLen = strlen( $this->searchConfig->makeId( 1 ) ) - 1;
			$script['source'] .= "if (ctx._source.page_id == null) {ctx._source.page_id = Long.parseLong(ctx._id.substring($prefLen));}";
		}
		if ( $script['source'] === '' ) {
			return null;
		}

		return $script;
	}

	/**
	 * Creates an array suitable for use as the _reindex api source.remote
	 * parameter to read from $oldConnection.
	 *
	 * This is very fragile, but the transports don't expose enough to do more really
	 *
	 * @param Connection $source Connection to read data from
	 * @param Connection $dest Connection to reindex data into
	 * @return array|null null when both connections target the same cluster
	 * @throws \RuntimeException when the source transport is not HTTP(S)
	 */
	public static function makeRemoteReindexInfo( Connection $source, Connection $dest ) {
		if ( $source->getClusterName() === $dest->getClusterName() ) {
			return null;
		}

		$innerConnection = $source->getClient()->getConnection();
		$transport = $innerConnection->getTransportObject();
		if ( !$transport instanceof Http ) {
			throw new \RuntimeException(
				'Remote reindex not implemented for transport: ' . get_class( $transport )
			);
		}

		// We make some pretty bold assumptions that classes extending from \Elastica\Transport\Http
		// don't change how any of this works.
		$url = $innerConnection->hasConfig( 'url' )
			? $innerConnection->getConfig( 'url' )
			: '';
		if ( $url === '' ) {
			$scheme = ( $transport instanceof Https )
				? 'https'
				: 'http';
			$url = $scheme . '://' . $innerConnection->getHost() . ':' .
				$innerConnection->getPort() . '/' . $innerConnection->getPath();
		}

		if ( $innerConnection->getUsername() && $innerConnection->getPassword() ) {
			return [
				'host' => $url,
				'username' => $innerConnection->getUsername(),
				'password' => $innerConnection->getPassword(),
			];
		} else {
			return [ 'host' => $url ];
		}
	}

	/**
	 * Poll a running reindex task until it completes, reporting progress.
	 *
	 * Transient connection failures are retried (up to MAX_CONSECUTIVE_ERRORS
	 * in a row); other HTTP transport errors are rethrown.
	 *
	 * @param ReindexTask $task
	 * @param Index $target
	 * @return ReindexResponse
	 */
	private function monitorReindexTask( ReindexTask $task, Index $target ) {
		// Curl errors considered potentially intermittent network problems.
		$transientCurlErrors = [
			CURLE_COULDNT_CONNECT,
			// 28: timed out
			CURLE_OPERATION_TIMEDOUT,
			// 52: connected, closed with no response
			CURLE_GOT_NOTHING,
		];
		$consecutiveErrors = 0;
		$sleepSeconds = self::monitorSleepSeconds( 1, 2, self::MONITOR_SLEEP_SECONDS );
		$completionEstimateGen = self::estimateTimeRemaining();
		while ( !$task->isComplete() ) {
			try {
				$status = $task->getStatus();
			} catch ( \Exception $e ) {
				if ( ++$consecutiveErrors > self::MAX_CONSECUTIVE_ERRORS ) {
					$this->output( "\n" );
					$this->fatalError(
						"$e\n\n" .
						"Lost connection to elasticsearch cluster. The reindex task "
						. "{$task->getId()} is still running.\nThe task should be manually "
						. "canceled, and the index {$target->getName()}\n"
						. "should be removed.\n" .
						$e->getMessage()
					);
				}
				if ( $e instanceof HttpException
					&& !in_array( $e->getError(), $transientCurlErrors, true )
				) {
					// Wrap exception to include info about task id?
					throw $e;
				}
				$this->outputIndented( "Error: {$e->getMessage()}\n" );
				usleep( 500000 );
				continue;
			}

			$consecutiveErrors = 0;

			$estCompletion = $completionEstimateGen->send(
				$status->getTotal() - $status->getCreated() );
			// What is worth reporting here?
			$this->outputIndented(
				"Task: {$task->getId()} "
				. "Search Retries: {$status->getSearchRetries()} "
				. "Bulk Retries: {$status->getBulkRetries()} "
				. "Indexed: {$status->getCreated()} / {$status->getTotal()} "
				. "Complete: $estCompletion\n"
			);
			if ( !$status->isComplete() ) {
				sleep( $sleepSeconds->current() );
				$sleepSeconds->next();
			}
		}

		return $task->getResponse();
	}

	/**
	 * Infinite geometric backoff sequence: $base, $base * $ratio, ... capped at $max.
	 *
	 * @param int $base First value yielded
	 * @param int $ratio Multiplier applied between consecutive values
	 * @param int $max Ceiling for yielded values
	 * @return \Generator
	 */
	private static function monitorSleepSeconds( $base, $ratio, $max ) {
		$val = $base;
		// @phan-suppress-next-line PhanInfiniteLoop https://github.com/phan/phan/issues/3545
		while ( true ) {
			yield $val;
			$val = min( $max, $val * $ratio );
		}
	}

	/**
	 * Generator returning the estimated time of completion.
	 *
	 * @return \Generator Must be provided the remaining count via Generator::send, replies
	 *  with an RFC 2822 formatted string estimating the completion time, or null until
	 *  an estimate can be made (at least two samples with positive progress).
	 */
	private static function estimateTimeRemaining(): \Generator {
		$estimatedStr = null;
		$remain = null;
		$prevRemain = null;
		$now = microtime( true );
		while ( true ) {
			$start = $now;
			$prevRemain = $remain;
			$remain = yield $estimatedStr;
			$now = microtime( true );
			if ( $remain === null || $prevRemain === null ) {
				continue;
			}
			# Very simple calc, no smoothing and will vary wildly. Could be
			# improved if deemed useful.
			$elapsed = $now - $start;
			if ( $elapsed <= 0 ) {
				// Two samples within the timer resolution; cannot compute a rate.
				continue;
			}
			$rate = ( $prevRemain - $remain ) / $elapsed;
			if ( $rate > 0 ) {
				$estimatedCompletion = $now + ( $remain / $rate );
				$estimatedStr = \MWTimestamp::convert( TS_RFC2822, $estimatedCompletion );
			}
		}
	}

	/**
	 * Auto detect the number of slices to use when reindexing.
	 *
	 * Note that elasticsearch 7.x added an 'auto' setting, but we are on
	 * 6.x. That setting uses one slice per shard, up to a certain limit (20 in
	 * 7.9). This implementation provides the same limits, and adds an additional
	 * constraint that the auto-detected value must be <= the number of nodes.
	 *
	 * @param Index $index The index to estimate a slice count for
	 * @return int The number of slices to reindex with
	 */
	private function estimateSlices( Index $index ): int {
		return (int)min(
			$this->getNumberOfNodes( $index->getClient() ),
			$this->getNumberOfShards( $index ),
			self::AUTO_SLICE_CEILING
		);
	}

	/**
	 * @param Client $client
	 * @return int Number of nodes listed by the _cat/nodes endpoint
	 */
	private function getNumberOfNodes( Client $client ): int {
		$endpoint = ( new \Elasticsearch\Endpoints\Cat\Nodes() )
			->setParams( [ 'format' => 'json' ] );
		return count( $client->requestEndpoint( $endpoint )->getData() );
	}

	/**
	 * @param Index $index
	 * @return int Configured number of primary shards of the index
	 * @throws \RuntimeException when the setting cannot be found in the response
	 */
	private function getNumberOfShards( Index $index ): int {
		$response = $index->request( '_settings/index.number_of_shards', Request::GET );
		$data = $response->getData();
		// Can't use $index->getName() because that is probably an alias
		$realIndexName = array_keys( $data )[0] ?? null;
		// In theory this should never happen, we will get a ResponseException if the index doesn't
		// exist and every index must have a number_of_shards settings. But better safe than sorry.
		if ( $realIndexName === null
			|| !isset( $data[$realIndexName]['settings']['index']['number_of_shards'] )
		) {
			throw new \RuntimeException(
				"Couldn't detect number of shards in {$index->getName()}"
			);
		}
		// Settings are returned as strings (e.g. "8"); normalize to int for callers.
		return (int)$data[$realIndexName]['settings']['index']['number_of_shards'];
	}
}