Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
| Total | |
5.58% |
14 / 251 |
|
0.00% |
0 / 25 |
CRAP | |
0.00% |
0 / 1 |
| Reindexer | |
5.58% |
14 / 251 |
|
0.00% |
0 / 25 |
3960.62 | |
0.00% |
0 / 1 |
| __construct | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
2 | |||
| reindex | |
0.00% |
0 / 44 |
|
0.00% |
0 / 1 |
56 | |||
| waitForCounts | |
0.00% |
0 / 18 |
|
0.00% |
0 / 1 |
30 | |||
| waitForGreen | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
12 | |||
| getHealth | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
12 | |||
| decideMaxShardsPerNodeForReindex | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
| setConnectionTimeout | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
| destroyClients | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
| output | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
| outputIndented | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
| error | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
| fatalError | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
| makeUpdateFieldsScript | |
0.00% |
0 / 19 |
|
0.00% |
0 / 1 |
42 | |||
| makeWeightedTagsPrefixReplaceScript | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
12 | |||
| pruneWeightedTagsDeleteMarkersScript | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
| migrateArticleTopicUnderscoresScript | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
2 | |||
| makeRemoteReindexInfo | |
58.33% |
14 / 24 |
|
0.00% |
0 / 1 |
12.63 | |||
| monitorReindexTask | |
0.00% |
0 / 36 |
|
0.00% |
0 / 1 |
56 | |||
| monitorSleepSeconds | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
| estimateTimeRemaining | |
0.00% |
0 / 16 |
|
0.00% |
0 / 1 |
30 | |||
| estimateSlices | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
2 | |||
| getNumberOfNodes | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
| getNumberOfShards | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
6 | |||
| safeCount | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
2 | |||
| safeRefresh | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
2 | |||
| 1 | <?php |
| 2 | |
| 3 | namespace CirrusSearch\Maintenance; |
| 4 | |
| 5 | use CirrusSearch\Connection; |
| 6 | use CirrusSearch\Elastica\ReindexRequest; |
| 7 | use CirrusSearch\Elastica\ReindexResponse; |
| 8 | use CirrusSearch\Elastica\ReindexTask; |
| 9 | use CirrusSearch\Query\ArticlePredictionKeyword; |
| 10 | use CirrusSearch\SearchConfig; |
| 11 | use Elastica\Client; |
| 12 | use Elastica\Exception\Connection\HttpException; |
| 13 | use Elastica\Index; |
| 14 | use Elastica\Request; |
| 15 | use Elastica\Transport\Http; |
| 16 | use Elastica\Transport\Https; |
| 17 | use MediaWiki\Utils\MWTimestamp; |
| 18 | use StatusValue; |
| 19 | |
| 20 | /** |
| 21 | * @license GPL-2.0-or-later |
| 22 | */ |
| 23 | class Reindexer { |
| 24 | private const MAX_CONSECUTIVE_ERRORS = 5; |
| 25 | private const MONITOR_SLEEP_SECONDS = 30; |
| 26 | private const MAX_WAIT_FOR_COUNT_SEC = 600; |
| 27 | private const AUTO_SLICE_CEILING = 20; |
| 28 | |
| 29 | /** |
| 30 | * @var SearchConfig |
| 31 | */ |
| 32 | private $searchConfig; |
| 33 | |
| 34 | /* "From" portion */ |
| 35 | /** |
| 36 | * @var Index |
| 37 | */ |
| 38 | private $oldIndex; |
| 39 | |
| 40 | /** |
| 41 | * @var Connection |
| 42 | */ |
| 43 | private $oldConnection; |
| 44 | |
| 45 | /* "To" portion */ |
| 46 | |
| 47 | /** |
| 48 | * @var Index |
| 49 | */ |
| 50 | private $index; |
| 51 | |
| 52 | /** |
| 53 | * @var Connection |
| 54 | */ |
| 55 | private $connection; |
| 56 | |
| 57 | /** |
| 58 | * @var Printer |
| 59 | */ |
| 60 | private $out; |
| 61 | |
| 62 | /** |
| 63 | * @var string[] list of fields to delete |
| 64 | */ |
| 65 | private $fieldsToDelete; |
| 66 | |
| 67 | /** |
| 68 | * @var string[] list of weighted tag prefixes to rename (old prefix => new prefix) |
| 69 | */ |
| 70 | private $weightedTagsPrefixMap; |
| 71 | |
/**
 * Wire up both ends of a reindex operation.
 *
 * @param SearchConfig $searchConfig Configuration read for the maintenance timeout
 *  and for document id generation
 * @param Connection $source Connection documents are read through
 * @param Connection $target Connection documents are written through
 * @param Index $index Index that receives the documents
 * @param Index $oldIndex Index the documents are copied from
 * @param Printer|null $out Receiver of progress messages, null to stay silent
 * @param string[] $fieldsToDelete Names of fields to strip from copied documents
 * @param string[] $weightedTagsPrefixMap Weighted tag prefixes to rename
 *  (old prefix => new prefix)
 */
public function __construct(
	SearchConfig $searchConfig,
	Connection $source,
	Connection $target,
	Index $index,
	Index $oldIndex,
	?Printer $out = null,
	array $fieldsToDelete = [],
	array $weightedTagsPrefixMap = []
) {
	// @todo: this constructor has too many arguments - refactor!
	$this->searchConfig = $searchConfig;
	$this->out = $out;

	// "From" side of the copy
	$this->oldConnection = $source;
	$this->oldIndex = $oldIndex;

	// "To" side of the copy
	$this->connection = $target;
	$this->index = $index;

	// Per-document transformations applied while copying
	$this->fieldsToDelete = $fieldsToDelete;
	$this->weightedTagsPrefixMap = $weightedTagsPrefixMap;
}
| 102 | |
/**
 * Copy everything from the live index into the index being built.
 *
 * The target index is first tuned for bulk loading, then a reindex task is
 * started and monitored until it completes. Afterwards the document counts
 * of both indices are compared and the temporary settings are reverted.
 *
 * @param int|null $slices The number of slices to use, or null to have a
 *  value estimated from the old index
 * @param int $chunkSize Chunk size handed to the reindex request
 * @param float $acceptableCountDeviation Largest relative difference between
 *  the old and new document counts that is still accepted
 */
public function reindex(
	$slices = null,
	$chunkSize = 100,
	$acceptableCountDeviation = 0.05
) {
	$this->outputIndented( "Preparing index settings for reindex\n" );
	$this->setConnectionTimeout();
	$indexSettings = $this->index->getSettings();
	$originalSettings = $indexSettings->get();
	if ( !is_array( $originalSettings ) ) {
		throw new \RuntimeException( 'Invalid response from index settings' );
	}

	// These settings should reduce io load during bulk indexing. The index has
	// to be optimized afterwards to consolidate down to a proper number of
	// segments, but that is worth the price. total_shards_per_node makes sure
	// that each shard has as few neighbors as possible.
	$shardsPerNode = $this->decideMaxShardsPerNodeForReindex( $originalSettings );
	$indexSettings->set( [
		'refresh_interval' => -1,
		'routing.allocation.total_shards_per_node' => $shardsPerNode,
		// It is probably inefficient to let the index be created with replicas
		// and then drop the empty replicas a few moments later. Doing it this
		// way lets reindexing and index creation operate independently,
		// without needing to know about each other.
		'auto_expand_replicas' => 'false',
		'number_of_replicas' => 0,
	] );
	$this->waitForGreen();

	$reindexRequest = new ReindexRequest( $this->oldIndex, $this->index, $chunkSize );
	$reindexRequest->setSlices( $slices ?? $this->estimateSlices( $this->oldIndex ) );

	$remoteInfo = self::makeRemoteReindexInfo( $this->oldConnection, $this->connection );
	if ( $remoteInfo !== null ) {
		$reindexRequest->setRemoteInfo( $remoteInfo );
	}

	$updateScript = $this->makeUpdateFieldsScript();
	if ( $updateScript !== null ) {
		$reindexRequest->setScript( $updateScript );
	}

	try {
		$task = $reindexRequest->reindexTask();
	} catch ( \Exception $e ) {
		$this->fatalError( $e->getMessage() );
	}

	$this->outputIndented( "Started reindex task: " . $task->getId() . "\n" );
	$outcome = $this->monitorReindexTask( $task, $this->index );
	$task->delete();
	if ( !$outcome->isSuccessful() ) {
		$this->fatalError(
			"Reindex task was not successful: " . $outcome->getUnsuccessfulReason()
		);
	}

	// The counts cannot be verified to be exactly equal because they will not
	// be: updates are still pushed into the old index while the new one is
	// being filled.
	$this->outputIndented( "Verifying counts..." );
	$this->waitForCounts( $acceptableCountDeviation );
	$this->output( "done\n" );

	// Revert the settings that were changed just for reindexing. The value of
	// number_of_replicas is deliberately not restored, auto_expand_replicas is
	// left to pick an appropriate value instead.
	$indexSettings->set( [
		'refresh_interval' => $originalSettings['refresh_interval'],
		'auto_expand_replicas' => $originalSettings['auto_expand_replicas'],
		'routing.allocation.total_shards_per_node' =>
			$originalSettings['routing']['allocation']['total_shards_per_node'] ?? -1,
	] );
}
| 188 | |
/**
 * Block until the document count of the new index is close enough to the
 * count of the old index, aborting the process when that does not happen
 * within MAX_WAIT_FOR_COUNT_SEC.
 *
 * @param float $acceptableCountDeviation Largest accepted relative difference
 */
private function waitForCounts( float $acceptableCountDeviation ) {
	$sourceDocs = (float)$this->safeCount( $this->oldIndex );
	$this->safeRefresh( $this->index );
	// Elasticsearch should be ready immediately after a refresh, yet
	// exceptionally low values have been seen in 2% of reindex attempts.
	// Keep polling for a while and hope the refresh becomes visible.
	$deadline = microtime( true ) + self::MAX_WAIT_FOR_COUNT_SEC;
	for ( ; ; ) {
		$targetDocs = (float)$this->safeCount( $this->index );
		if ( $sourceDocs > 0 ) {
			$deviation = abs( $sourceDocs - $targetDocs ) / $sourceDocs;
		} else {
			// Nothing to compare against, treat as a perfect match
			$deviation = 0;
		}
		if ( $deviation <= $acceptableCountDeviation ) {
			return;
		}

		$counts = "old=$sourceDocs new=$targetDocs difference=$deviation";
		$this->output( "Not close enough! $counts\n" );
		if ( microtime( true ) > $deadline ) {
			// fatalError() terminates the process, nothing below runs
			$this->fatalError( 'Failed to load index - counts not close enough. ' .
				"$counts. " .
				'Check for warnings above.' );
		}
		$this->output( "Waiting to re-check counts..." );
		sleep( 30 );
	}
}
| 216 | |
/**
 * Poll the index health once per second until it reports green status.
 * A progress dot is printed on the first poll and every 20th poll after.
 */
public function waitForGreen() {
	$this->outputIndented( "Waiting for index green status..." );
	for ( $poll = 0; $this->getHealth()['status'] !== 'green'; $poll++ ) {
		if ( $poll % 20 === 0 ) {
			$this->output( '.' );
		}
		sleep( 1 );
	}
	$this->output( "done\n" );
}
| 231 | |
/**
 * Fetch the cluster health report scoped to the index being built.
 *
 * Error responses are reported and the request is repeated after a one
 * second pause, for as long as it takes to get a successful response.
 *
 * @return array Response data array
 */
private function getHealth() {
	$path = '_cluster/health/' . $this->index->getName();
	$response = $this->index->getClient()->request( $path );
	while ( $response->hasError() ) {
		$this->error( 'Error fetching index health but going to retry. Message: ' .
			$response->getError() );
		sleep( 1 );
		$response = $this->index->getClient()->request( $path );
	}
	return $response->getData();
}
| 251 | |
/**
 * Pick the shards-per-node limit to apply while the reindex is running.
 *
 * Reindexing runs without replicas, so the regular max shards per node
 * configuration might allow a very unbalanced layout. The value returned
 * here is the shard count divided by the node count, rounded up, which
 * spreads the indexing load across as many instances as possible.
 *
 * @param array $settings Configured live index settings
 * @return int
 */
private function decideMaxShardsPerNodeForReindex( array $settings ): int {
	$health = $this->getHealth();
	$shardsPerNode = $settings['number_of_shards'] / $health['number_of_nodes'];
	return (int)ceil( $shardsPerNode );
}
| 268 | |
/**
 * Apply the configured maintenance timeout to the connection the reindex
 * request is issued on, so that it does not time out while the reindex
 * is still running.
 */
private function setConnectionTimeout() {
	$this->connection->setTimeout(
		$this->searchConfig->get( 'CirrusSearchMaintenanceTimeout' )
	);
}
| 277 | |
/**
 * Tear down the clients of both the target and the source connection.
 */
private function destroyClients() {
	foreach ( [ $this->connection, $this->oldConnection ] as $conn ) {
		$conn->destroyClient();
	}
	// Destroying connections resets timeouts, so they have to be reinstated
	$this->setConnectionTimeout();
}
| 287 | |
/**
 * Forward a message to the printer, if one was provided.
 *
 * @param string $message
 * @param string|null $channel
 */
protected function output( $message, $channel = null ) {
	if ( !$this->out ) {
		return;
	}
	$this->out->output( $message, $channel );
}
| 297 | |
/**
 * Forward a prefixed message to the printer's indented output, if a
 * printer was provided.
 *
 * @param string $message
 * @param string $prefix Text put in front of the message. Defaults to a
 *  tab to fake an additional indentation level.
 */
private function outputIndented( $message, $prefix = "\t" ) {
	if ( !$this->out ) {
		return;
	}
	$this->out->outputIndented( $prefix . $message );
}
| 308 | |
/**
 * Forward an error message to the printer, if one was provided.
 *
 * @param string $message
 */
private function error( $message ) {
	if ( !$this->out ) {
		return;
	}
	$this->out->error( $message );
}
| 317 | |
/**
 * Report an error and terminate the process.
 *
 * The native never return type makes the engine and static analysers aware
 * that control flow does not continue past a call to this method.
 *
 * @param string $message
 * @param int $exitCode Process exit status
 * @return never
 */
private function fatalError( $message, $exitCode = 1 ): never {
	$this->error( $message );
	exit( $exitCode );
}
| 327 | |
/**
 * Build the painless script applied to each document copied by the reindex.
 *
 * The script removes the configured fields, renames weighted tag prefixes,
 * prunes weighted tag delete markers and normalizes article topic tags.
 * When the page_id field is mapped in the new index but not in the old one
 * it additionally populates page_id from the document id.
 *
 * @return array|null Returns an array suitable for use as
 *  the _reindex api script parameter, or null if no script is needed.
 */
private function makeUpdateFieldsScript() {
	$source = '';
	foreach ( $this->fieldsToDelete as $field ) {
		$field = trim( $field );
		if ( $field === '' ) {
			continue;
		}
		// Escape backslashes and single quotes so an unusual field name cannot
		// terminate the painless string literal it is embedded in.
		$escapedField = addcslashes( $field, "'\\" );
		$source .= "ctx._source.remove('$escapedField');";
	}
	$source .= $this->makeWeightedTagsPrefixReplaceScript();
	$source .= $this->pruneWeightedTagsDeleteMarkersScript();
	$source .= $this->migrateArticleTopicUnderscoresScript();
	// Populate the page_id if it's the first time we add the page_id field to the mapping
	if ( !isset( $this->oldIndex->getMapping()['properties']['page_id'] )
		&& isset( $this->index->getMapping()['properties']['page_id'] )
	) {
		$this->outputIndented( "Populating the page_id field if not set\n" );
		// Length of the id prefix: a generated id for page 1 minus the one
		// character taken by the page id itself.
		$prefLen = strlen( $this->searchConfig->makeId( 1 ) ) - 1;
		$source .= "if (ctx._source.page_id == null) {ctx._source.page_id = Long.parseLong(ctx._id.substring($prefLen));}";
	}
	if ( $source === '' ) {
		return null;
	}

	return [
		'source' => $source,
		'lang' => 'painless',
	];
}
| 360 | |
/**
 * Build the painless snippet that renames weighted tag prefixes according
 * to the configured old prefix => new prefix map.
 *
 * @return string Painless source, or an empty string when the map is empty
 */
private function makeWeightedTagsPrefixReplaceScript(): string {
	if ( count( $this->weightedTagsPrefixMap ) === 0 ) {
		return '';
	}
	$scriptSource = "if (ctx._source.containsKey('weighted_tags')) {";
	foreach ( $this->weightedTagsPrefixMap as $oldPrefix => $newPrefix ) {
		// Escape backslashes and single quotes so a prefix cannot terminate the
		// painless string literals it is embedded in. The cast covers numeric
		// array keys, which PHP converts to integers.
		$old = addcslashes( (string)$oldPrefix, "'\\" );
		$new = addcslashes( (string)$newPrefix, "'\\" );
		$scriptSource .= "
		for (int i = 0; i < ctx._source.weighted_tags.length; i++) {
			if (ctx._source.weighted_tags[i].startsWith('$old/')) {
				ctx._source.weighted_tags[i] = ctx._source.weighted_tags[i].replace('$old/', '$new/');
			}
		}";
	}
	$scriptSource .= "}";
	return $scriptSource;
}
| 377 | |
/**
 * Painless snippet dropping weighted tags that end in the delete marker.
 *
 * A bug at one point inserted these tags directly instead of interpreting
 * them as markers, this prunes them back out.
 *
 * @return string Painless source
 */
private function pruneWeightedTagsDeleteMarkersScript(): string {
	$pruneSource = "
		if (ctx._source.containsKey('weighted_tags') && ctx._source.weighted_tags instanceof List) {
			ctx._source.weighted_tags.removeIf(item -> item != null && item.endsWith('/__DELETE_GROUPING__'));
		}";
	return $pruneSource;
}
| 386 | |
/**
 * Painless snippet replacing spaces with underscores in article topic tags.
 *
 * The data source for article topics at some point switched from providing
 * values with spaces to providing them with underscores. Previously indexed
 * data is normalized to match.
 *
 * @return string Painless source
 */
private function migrateArticleTopicUnderscoresScript(): string {
	$topicPrefix = ArticlePredictionKeyword::ARTICLE_TOPIC_TAG_PREFIX;
	$migrateSource = "
		if (ctx._source.containsKey('weighted_tags') && ctx._source.weighted_tags instanceof List) {
			for (int i = 0; i < ctx._source.weighted_tags.length; i++) {
				if (ctx._source.weighted_tags[i].startsWith('{$topicPrefix}/')) {
					ctx._source.weighted_tags[i] = ctx._source.weighted_tags[i].replace(' ', '_');
				}
			}
		}";
	return $migrateSource;
}
| 401 | |
/**
 * Build the value for the _reindex api source.remote parameter that makes
 * the destination cluster read from $source.
 *
 * This is very fragile, but the transports don't expose enough to do
 * anything more robust.
 *
 * @param Connection $source Connection to read data from
 * @param Connection $dest Connection to reindex data into
 * @return array|null Null when both connections report the same cluster name,
 *  otherwise an array with a 'host' key, plus 'username' and 'password'
 *  keys when the source connection carries both
 */
public static function makeRemoteReindexInfo( Connection $source, Connection $dest ) {
	if ( $source->getClusterName() === $dest->getClusterName() ) {
		// Same cluster, a plain local reindex is enough
		return null;
	}

	$elasticaConn = $source->getClient()->getConnection();
	$transport = $elasticaConn->getTransportObject();
	if ( !$transport instanceof Http ) {
		throw new \RuntimeException(
			'Remote reindex not implemented for transport: ' . get_class( $transport )
		);
	}

	// This boldly assumes that classes extending \Elastica\Transport\Http
	// don't change how any of this works.
	if ( $elasticaConn->hasConfig( 'url' ) ) {
		$url = $elasticaConn->getConfig( 'url' );
	} else {
		$url = '';
	}
	if ( $url === '' ) {
		// No explicit url configured, assemble one from the connection parts
		if ( $transport instanceof Https ) {
			$scheme = 'https';
		} else {
			$scheme = 'http';
		}
		$url = $scheme . '://' . $elasticaConn->getHost() . ':' .
			$elasticaConn->getPort() . '/' . $elasticaConn->getPath();
	}

	$remote = [ 'host' => $url ];
	if ( $elasticaConn->getUsername() && $elasticaConn->getPassword() ) {
		$remote['username'] = $elasticaConn->getUsername();
		$remote['password'] = $elasticaConn->getPassword();
	}
	return $remote;
}
| 448 | |
/**
 * Poll a running reindex task until it completes, printing progress and an
 * estimated completion time after every successful status fetch.
 *
 * Status fetches that fail are retried after half a second. More than
 * MAX_CONSECUTIVE_ERRORS failures in a row abort the process, and an
 * HttpException whose error code is not one of the tolerated ones is
 * rethrown.
 *
 * @param ReindexTask $task
 * @param Index $target
 * @return ReindexResponse
 */
private function monitorReindexTask( ReindexTask $task, Index $target ) {
	$failuresInARow = 0;
	$pauses = self::monitorSleepSeconds( 1, 2, self::MONITOR_SLEEP_SECONDS );
	$etaGen = self::estimateTimeRemaining();
	while ( !$task->isComplete() ) {
		try {
			$status = $task->getStatus();
		} catch ( \Exception $e ) {
			$failuresInARow++;
			if ( $failuresInARow > self::MAX_CONSECUTIVE_ERRORS ) {
				$this->output( "\n" );
				$this->fatalError(
					"$e\n\n" .
					"Lost connection to elasticsearch cluster. The reindex task " .
					"{$task->getId()} is still running.\nThe task should be manually " .
					"canceled, and the index {$target->getName()}\n" .
					"should be removed.\n" .
					$e->getMessage()
				);
			}
			// Let potentially intermittent network problems through:
			// * couldn't connect
			// * 28: timed out
			// * 52: connected, closed with no response
			$tolerated = [ CURLE_COULDNT_CONNECT, 28, 52 ];
			if ( $e instanceof HttpException && !in_array( $e->getError(), $tolerated ) ) {
				// Wrap exception to include info about task id?
				throw $e;
			}
			$this->outputIndented( "Error: {$e->getMessage()}\n" );
			usleep( 500000 );
			continue;
		}

		// A successful fetch ends any streak of failures
		$failuresInARow = 0;

		$remaining = $status->getTotal() - $status->getCreated();
		$eta = $etaGen->send( $remaining );
		// What is worth reporting here?
		$this->outputIndented(
			"Task: {$task->getId()} " .
			"Search Retries: {$status->getSearchRetries()} " .
			"Bulk Retries: {$status->getBulkRetries()} " .
			"Indexed: {$status->getCreated()} / {$status->getTotal()} " .
			"Complete: $eta\n"
		);
		if ( $status->isComplete() ) {
			continue;
		}
		sleep( $pauses->current() );
		$pauses->next();
	}

	return $task->getResponse();
}
| 508 | |
/**
 * Endless generator of sleep durations that grow geometrically.
 *
 * @param int $base First value yielded
 * @param int $ratio Factor applied after each yielded value
 * @param int $max Upper bound applied to every value after the first
 * @return \Generator
 */
private static function monitorSleepSeconds( int $base, int $ratio, int $max ): \Generator {
	$delay = $base;
	// @phan-suppress-next-line PhanInfiniteLoop https://github.com/phan/phan/issues/3545
	while ( true ) {
		yield $delay;
		$delay = min( $max, $delay * $ratio );
	}
}
| 517 | |
/**
 * Generator estimating when the reindex will complete.
 *
 * Each estimate is based only on the two most recent samples: the change in
 * the remaining count divided by the time that passed between them.
 *
 * @return \Generator Must be provided the remaining count via Generator::send, replies
 *  with the estimated completion time formatted as an RFC 2822 string, or
 *  null as long as no estimate could be made.
 */
private static function estimateTimeRemaining(): \Generator {
	$estimatedStr = null;
	$remain = null;
	$now = microtime( true );
	while ( true ) {
		$start = $now;
		$prevRemain = $remain;
		$remain = yield $estimatedStr;
		$now = microtime( true );
		if ( $remain === null || $prevRemain === null ) {
			// Two samples are needed before a rate can be computed
			continue;
		}
		# Very simple calc, no smoothing and will vary wildly. Could be
		# improved if deemed useful.
		$elapsed = $now - $start;
		if ( $elapsed <= 0 ) {
			// Both samples fall on the same clock reading (or the clock went
			// backwards). Dividing would raise DivisionByZeroError or give a
			// meaningless rate, so keep the previous estimate.
			continue;
		}
		$rate = ( $prevRemain - $remain ) / $elapsed;
		if ( $rate > 0 ) {
			$estimatedCompletion = $now + ( $remain / $rate );
			$estimatedStr = MWTimestamp::convert( TS_RFC2822, $estimatedCompletion );
		}
	}
}
| 546 | |
/**
 * Auto detect the number of slices to use when reindexing.
 *
 * Elasticsearch 7.x added an 'auto' setting, but we are on 6.x. That
 * setting uses one slice per shard, up to a certain limit (20 in 7.9).
 * This implementation applies the same limits and additionally requires
 * the detected value to be <= the number of nodes.
 *
 * @param Index $index The index to estimate a slice count for
 * @return int The number of slices to reindex with
 */
private function estimateSlices( Index $index ): int {
	$nodes = $this->getNumberOfNodes( $index->getClient() );
	$shards = $this->getNumberOfShards( $index );
	return min( $nodes, $shards, self::AUTO_SLICE_CEILING );
}
| 565 | |
/**
 * Count the entries returned by the cat nodes api.
 *
 * @param Client $client Client the request is sent through
 * @return int
 */
private function getNumberOfNodes( Client $client ): int {
	$catNodes = new \Elasticsearch\Endpoints\Cat\Nodes();
	$catNodes = $catNodes->setParams( [ 'format' => 'json' ] );
	$nodes = $client->requestEndpoint( $catNodes )->getData();
	return count( $nodes );
}
| 571 | |
/**
 * Read the number_of_shards setting of an index.
 *
 * @param Index $index
 * @return int
 * @throws \RuntimeException When the response does not contain the setting
 */
private function getNumberOfShards( Index $index ): int {
	$response = $index->request( '_settings/index.number_of_shards', Request::GET );
	$data = $response->getData();
	// Can't use $index->getName() because that is probably an alias, the
	// response is keyed by the real index name instead.
	$realIndexName = is_array( $data ) ? array_key_first( $data ) : null;
	$numberOfShards = $realIndexName === null
		? null
		: ( $data[$realIndexName]['settings']['index']['number_of_shards'] ?? null );
	// In theory this should never happen, we will get a ResponseException if the index doesn't
	// exist and every index must have a number_of_shards settings. But better safe than sorry:
	// an empty or malformed response ends up here too instead of raising a warning or TypeError.
	if ( $numberOfShards === null ) {
		throw new \RuntimeException(
			"Couldn't detect number of shards in {$index->getName()}"
		);
	}
	return (int)$numberOfShards;
}
| 586 | |
/**
 * Count the documents of an index via ConfigUtils::safeCountOrFail,
 * aborting the process through fatalError() on failure.
 *
 * @param Index $index
 * @param int $attempts Passed through to ConfigUtils::safeCountOrFail
 * @return int
 */
private function safeCount( Index $index, int $attempts = 3 ): int {
	$abort = function ( StatusValue $error ): never {
		$this->fatalError( $error );
	};
	return ConfigUtils::safeCountOrFail( $index, $abort, $attempts );
}
| 596 | |
/**
 * Refresh an index via ConfigUtils::safeRefreshOrFail, aborting the
 * process through fatalError() on failure.
 *
 * @param Index $index
 * @param int $attempts Passed through to ConfigUtils::safeRefreshOrFail
 */
private function safeRefresh( Index $index, int $attempts = 3 ): void {
	$abort = function ( StatusValue $error ): never {
		$this->fatalError( $error );
	};
	ConfigUtils::safeRefreshOrFail( $index, $abort, $attempts );
}
| 606 | } |