Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
| Total | |
79.52% |
198 / 249 |
|
46.15% |
12 / 26 |
CRAP | |
0.00% |
0 / 1 |
| CompletionSuggesterIndexer | |
79.52% |
198 / 249 |
|
46.15% |
12 / 26 |
107.73 | |
0.00% |
0 / 1 |
| __construct | |
100.00% |
8 / 8 |
|
100.00% |
1 / 1 |
1 | |||
| prepare | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
2 | |||
| addDocument | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
3 | |||
| flushSuggestDocs | |
77.78% |
21 / 27 |
|
0.00% |
0 / 1 |
8.70 | |||
| formatIndexingStats | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
2 | |||
| finish | |
80.00% |
4 / 5 |
|
0.00% |
0 / 1 |
3.07 | |||
| finishAndPromote | |
52.94% |
9 / 17 |
|
0.00% |
0 / 1 |
12.11 | |||
| finishRecycle | |
95.65% |
44 / 46 |
|
0.00% |
0 / 1 |
7 | |||
| createIndex | |
68.97% |
20 / 29 |
|
0.00% |
0 / 1 |
4.48 | |||
| expungeDeletes | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
1 | |||
| log | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
| optimize | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
1 | |||
| validateAlias | |
100.00% |
11 / 11 |
|
100.00% |
1 / 1 |
2 | |||
| deleteOldIndex | |
100.00% |
9 / 9 |
|
100.00% |
1 / 1 |
3 | |||
| getIndexAliasName | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
1 | |||
| getClient | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
| outputIndented | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
| enableReplicas | |
93.75% |
15 / 16 |
|
0.00% |
0 / 1 |
2.00 | |||
| waitForGreen | |
50.00% |
2 / 4 |
|
0.00% |
0 / 1 |
2.50 | |||
| refreshAndWaitForCount | |
64.00% |
16 / 25 |
|
0.00% |
0 / 1 |
16.65 | |||
| safeCount | |
85.71% |
6 / 7 |
|
0.00% |
0 / 1 |
1.00 | |||
| safeRefresh | |
85.71% |
6 / 7 |
|
0.00% |
0 / 1 |
1.00 | |||
| getBatchId | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| getTargetIndex | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| getRequiredFields | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| getIndexingStats | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
| 1 | <?php |
| 2 | |
| 3 | namespace CirrusSearch\Maintenance; |
| 4 | |
| 5 | use CirrusSearch\BuildDocument\Completion\SuggestBuilder; |
| 6 | use CirrusSearch\Connection; |
| 7 | use CirrusSearch\Elastica\SearchAfter; |
| 8 | use DateTime; |
| 9 | use Elastica\Client; |
| 10 | use Elastica\Document; |
| 11 | use Elastica\Index; |
| 12 | use Elastica\Query; |
| 13 | use Elastica\Query\BoolQuery; |
| 14 | use Elastica\Query\Term; |
| 15 | use Elastica\Request; |
| 16 | use Elastica\Search; |
| 17 | use Elastica\Status; |
| 18 | use MediaWiki\Extension\Elastica\MWElasticUtils; |
| 19 | use RuntimeException; |
| 20 | use StatusValue; |
| 21 | |
| 22 | /** |
| 23 | * CompletionSuggesterIndexer is responsible for populating a completion suggester using |
| 24 | * source data pulled by the UpdaterSuggesterIndex maint script. |
| 25 | * The process is as follow: |
| 26 | * - construct the CompletionSuggesterIndexer based on the state of the cluster, settings and |
| 27 | * script options |
| 28 | * - call {@see CompletionSuggesterIndexer::prepare()} |
| 29 | * - loop over the source documents and call {@see CompletionSuggesterIndexer::addDocument()} |
| 30 | * - flush any remaining buffered doc via {@see CompletionSuggesterIndexer::flushSuggestDocs()} |
| 31 | * - finally call {@see CompletionSuggesterIndexer::finish()} |
| 32 | * |
| 33 | * To promote and/or cleanup the target index. |
| 34 | */ |
| 35 | class CompletionSuggesterIndexer { |
| 36 | use ProgressPrinter; |
| 37 | |
| 38 | /** |
| 39 | * @var int perform extra checks prior to validating the new index if the number of docs is above this threshold. |
| 40 | * For context these extra checks are added in an attempt to understand what's causing T363521. |
| 41 | */ |
| 42 | private const EXTRA_CHECK_THRESHOLD = 100000; |
| 43 | |
| 44 | /** |
| 45 | * Max deviation between the number of suggestion we build and the count on the resulting index. |
| 46 | */ |
| 47 | private const BUILT_VS_INDEXED_DOCS_MAX_DEVIATION = 0.01; |
| 48 | |
| 49 | /** |
| 50 | * Max deviation between the new and old index. |
| 51 | */ |
| 52 | private const PREVIOUS_VS_NEXT_COUNT_MAX_DEVIATION = 0.1; |
| 53 | |
| 54 | private Connection $connection; |
| 55 | private Index $index; |
| 56 | private ?Index $oldIndex; |
| 57 | private SuggestBuilder $suggestBuilder; |
| 58 | private Printer $output; |
| 59 | private ConfigUtils $utils; |
| 60 | private SuggesterAnalysisConfigBuilder $analysisConfigBuilder; |
| 61 | private CompletionSuggesterIndexerConfig $indexerConfig; |
| 62 | |
| 63 | /** |
| 64 | * @var Document[] |
| 65 | */ |
| 66 | private array $batch = []; |
| 67 | |
| 68 | /** |
| 69 | * @var array |
| 70 | */ |
| 71 | private array $indexingStats = [ |
| 72 | 'doc_built' => 0, |
| 73 | 'doc_indexed' => 0, |
| 74 | 'bulk_requests' => 0, |
| 75 | 'retried_bulk_requests' => 0, |
| 76 | 'doc_sent' => 0, |
| 77 | 'index_results' => [ |
| 78 | 'created' => 0, |
| 79 | 'updated' => 0, |
| 80 | 'noop' => 0, |
| 81 | 'unknown' => 0, |
| 82 | 'error' => 0, |
| 83 | ], |
| 84 | ]; |
| 85 | |
| 86 | /** |
| 87 | * @param Connection $connection the connection to work on |
| 88 | * @param Index $index the target index |
| 89 | * @param Index|null $oldIndex the optional old index, must be null in case of recycle, must be the live index when rebuilding |
| 90 | * @param SuggestBuilder $suggestBuilder the SuggestBuilder to build suggest docs |
| 91 | * @param Printer $output the output to print message and errors |
| 92 | * @param ConfigUtils $utils the config utils |
| 93 | * @param SuggesterAnalysisConfigBuilder $analysisConfigBuilder the builder to create the analysis settings |
| 94 | * @param CompletionSuggesterIndexerConfig $indexerConfig the various settings used by this indexer |
| 95 | */ |
| 96 | public function __construct( |
| 97 | Connection $connection, |
| 98 | Index $index, ?Index $oldIndex, |
| 99 | SuggestBuilder $suggestBuilder, |
| 100 | Printer $output, |
| 101 | ConfigUtils $utils, |
| 102 | SuggesterAnalysisConfigBuilder $analysisConfigBuilder, |
| 103 | CompletionSuggesterIndexerConfig $indexerConfig |
| 104 | ) { |
| 105 | $this->connection = $connection; |
| 106 | $this->index = $index; |
| 107 | $this->oldIndex = $oldIndex; |
| 108 | $this->suggestBuilder = $suggestBuilder; |
| 109 | $this->output = $output; |
| 110 | $this->utils = $utils; |
| 111 | $this->analysisConfigBuilder = $analysisConfigBuilder; |
| 112 | $this->indexerConfig = $indexerConfig; |
| 113 | } |
| 114 | |
| 115 | public function prepare(): void { |
| 116 | if ( !$this->indexerConfig->isRecycle() ) { |
| 117 | $this->createIndex(); |
| 118 | } |
| 119 | } |
| 120 | |
| 121 | public function addDocument( array $inputDocs ): void { |
| 122 | $totalSuggestDocsIndexed = 0; |
| 123 | foreach ( $this->suggestBuilder->build( $inputDocs ) as $doc ) { |
| 124 | $this->batch[] = $doc; |
| 125 | $totalSuggestDocsIndexed++; |
| 126 | if ( count( $this->batch ) >= $this->indexerConfig->getIndexChunkSize() ) { |
| 127 | $this->flushSuggestDocs(); |
| 128 | } |
| 129 | } |
| 130 | $this->indexingStats['doc_built'] += $totalSuggestDocsIndexed; |
| 131 | } |
| 132 | |
| 133 | public function flushSuggestDocs(): void { |
| 134 | if ( $this->batch === [] ) { |
| 135 | return; |
| 136 | } |
| 137 | $attemptedAtLeastOnce = false; |
| 138 | MWElasticUtils::withRetry( $this->indexerConfig->getIndexRetryAttempts(), |
| 139 | function () use ( &$attemptedAtLeastOnce ) { |
| 140 | $this->indexingStats['bulk_requests']++; |
| 141 | $this->indexingStats['doc_sent'] += count( $this->batch ); |
| 142 | if ( $attemptedAtLeastOnce ) { |
| 143 | $this->indexingStats['retried_bulk_requests']++; |
| 144 | } |
| 145 | $attemptedAtLeastOnce = true; |
| 146 | $response = $this->index->addDocuments( $this->batch ); |
| 147 | $allowedOps = [ 'created', 'updated', 'noop' ]; |
| 148 | foreach ( $response->getBulkResponses() as $r ) { |
| 149 | if ( $r->hasError() ) { |
| 150 | if ( $this->indexingStats['index_results']['error'] < 10000 ) { |
| 151 | // do not spam the logs unnecessarily if we encountered 10000 errors. |
| 152 | // Hopefully 10000 is already enough to understand what's happening. |
| 153 | $this->output->error( "Failed to index doc {$r->getData()["_id"]} with {$r->getError()}" ); |
| 154 | } |
| 155 | $this->indexingStats['index_results']['error']++; |
| 156 | continue; |
| 157 | } |
| 158 | $opRes = 'unknown'; |
| 159 | if ( isset( $r->getData()["result"] ) ) { |
| 160 | $res = $r->getData()["result"]; |
| 161 | if ( in_array( $res, $allowedOps ) ) { |
| 162 | $opRes = $res; |
| 163 | } |
| 164 | $this->indexingStats['index_results'][$opRes]++; |
| 165 | } |
| 166 | } |
| 167 | } |
| 168 | ); |
| 169 | $this->batch = []; |
| 170 | } |
| 171 | |
| 172 | public function formatIndexingStats(): string { |
| 173 | return "Indexing stats for {$this->index->getName()}: bulk requests " . |
| 174 | "{$this->indexingStats["bulk_requests"]} (retried {$this->indexingStats["retried_bulk_requests"]}), " . |
| 175 | "{$this->indexingStats["doc_sent"]}/" . |
| 176 | "{$this->indexingStats["index_results"]["created"]}/" . |
| 177 | "{$this->indexingStats["index_results"]["updated"]}/" . |
| 178 | "{$this->indexingStats["index_results"]["noop"]}/" . |
| 179 | "{$this->indexingStats["index_results"]["error"]} " . |
| 180 | "(sent/created/updated/noop/error)\n"; |
| 181 | } |
| 182 | |
| 183 | /** |
| 184 | * @throws IndexPromotionException |
| 185 | */ |
| 186 | public function finish(): void { |
| 187 | if ( $this->batch !== [] ) { |
| 188 | throw new \LogicException( "{$this->index->getName()}: write buffer not empty." ); |
| 189 | } |
| 190 | if ( $this->indexerConfig->isRecycle() ) { |
| 191 | $this->finishRecycle(); |
| 192 | } else { |
| 193 | $this->finishAndPromote(); |
| 194 | } |
| 195 | } |
| 196 | |
| 197 | /** |
| 198 | * @throws IndexPromotionException |
| 199 | */ |
| 200 | private function finishAndPromote(): void { |
| 201 | if ( $this->indexerConfig->isOptimizeIndex() ) { |
| 202 | $this->optimize(); |
| 203 | } |
| 204 | $docsInIndex = $this->refreshAndWaitForCount(); |
| 205 | $totalSuggestDocs = $this->indexingStats['doc_built']; |
| 206 | if ( $docsInIndex != $totalSuggestDocs && $totalSuggestDocs > 0 ) { |
| 207 | $this->output->error( "{$this->index->getName()}: prepared and indexed $totalSuggestDocs docs but the index has $docsInIndex" ); |
| 208 | $errorRatio = ( $totalSuggestDocs - $docsInIndex ) / $totalSuggestDocs; |
| 209 | if ( |
| 210 | !$this->indexerConfig->isForce() && |
| 211 | $totalSuggestDocs > self::EXTRA_CHECK_THRESHOLD && |
| 212 | $errorRatio > self::BUILT_VS_INDEXED_DOCS_MAX_DEVIATION |
| 213 | ) { |
| 214 | throw new IndexPromotionException( "New index {$this->index->getName()}: " . |
| 215 | "deviation between docs built vs indexed docs is above " . |
| 216 | self::BUILT_VS_INDEXED_DOCS_MAX_DEVIATION ); |
| 217 | } |
| 218 | } |
| 219 | $this->enableReplicas(); |
| 220 | $this->validateAlias(); |
| 221 | $this->deleteOldIndex(); |
| 222 | $this->log( "Done.\n" ); |
| 223 | } |
| 224 | |
| 225 | private function finishRecycle(): void { |
| 226 | // This is fragile... hopefully most of the docs will be deleted from the old segments |
| 227 | // and will result in a fast operation. |
| 228 | // New segments should not be affected. |
| 229 | // Unfortunately if a failure causes the process to stop |
| 230 | // the FST will maybe contains duplicates as it cannot (elastic 1.7) |
| 231 | // filter deleted docs. We will rely on output deduplication |
| 232 | // but this will certainly affect performances. |
| 233 | |
| 234 | $this->expungeDeletes(); |
| 235 | // Refresh the reader so we can scroll over remaining docs. |
| 236 | // At this point we may read the new un-optimized FST segments |
| 237 | // Old ones should be pretty small after expungeDeletes |
| 238 | $this->safeRefresh( $this->index ); |
| 239 | |
| 240 | $bool = new BoolQuery(); |
| 241 | $bool->addMustNot( |
| 242 | new Term( [ "batch_id" => $this->suggestBuilder->getBatchId() ] ) |
| 243 | ); |
| 244 | |
| 245 | $query = new Query(); |
| 246 | $query->setQuery( $bool ); |
| 247 | $query->setSize( $this->indexerConfig->getIndexChunkSize() ); |
| 248 | $query->setSource( false ); |
| 249 | $query->setSort( [ |
| 250 | [ '_id' => 'asc' ], |
| 251 | ] ); |
| 252 | // Explicitly ask for accurate total_hits even-though we use a scroll request |
| 253 | $query->setTrackTotalHits( true ); |
| 254 | $search = new Search( $this->getClient() ); |
| 255 | $search->setQuery( $query ); |
| 256 | $search->addIndex( $this->index ); |
| 257 | $searchAfter = new SearchAfter( $search ); |
| 258 | |
| 259 | $totalDocsToDump = -1; |
| 260 | $docsDumped = 0; |
| 261 | |
| 262 | $this->log( "Deleting remaining docs from previous batch\n" ); |
| 263 | foreach ( $searchAfter as $results ) { |
| 264 | if ( $totalDocsToDump === -1 ) { |
| 265 | $totalDocsToDump = $results->getTotalHits(); |
| 266 | if ( $totalDocsToDump === 0 ) { |
| 267 | break; |
| 268 | } |
| 269 | $docsDumped = 0; |
| 270 | } |
| 271 | $docIds = []; |
| 272 | foreach ( $results as $result ) { |
| 273 | $docsDumped++; |
| 274 | $docIds[] = $result->getId(); |
| 275 | } |
| 276 | $this->outputProgress( $docsDumped, $totalDocsToDump ); |
| 277 | if ( !$docIds ) { |
| 278 | continue; |
| 279 | } |
| 280 | |
| 281 | MWElasticUtils::withRetry( $this->indexerConfig->getIndexRetryAttempts(), |
| 282 | function () use ( $docIds ) { |
| 283 | $this->index->deleteByQuery( new Query\Ids( $docIds ) ); |
| 284 | } |
| 285 | ); |
| 286 | } |
| 287 | $this->log( "Done.\n" ); |
| 288 | // Old docs should be deleted now we can optimize and flush |
| 289 | $this->optimize(); |
| 290 | |
| 291 | // @todo add support for changing the number of replicas |
| 292 | // if the setting was changed in cirrus config. |
| 293 | // Workaround is to change the settings directly on the cluster. |
| 294 | |
| 295 | // Refresh the reader so it now uses the optimized FST, |
| 296 | // and actually free and delete old segments. |
| 297 | $this->safeRefresh( $this->index ); |
| 298 | $docsInIndex = $this->safeCount( $this->index ); |
| 299 | if ( $docsInIndex != $this->indexingStats['doc_built'] ) { |
| 300 | $this->output->error( "{$this->index->getName()}: Prepared and indexed " . |
| 301 | "{$this->indexingStats['doc_built']} docs but the index has $docsInIndex" ); |
| 302 | } |
| 303 | } |
| 304 | |
| 305 | private function createIndex(): void { |
| 306 | // This is "create only" for now. |
| 307 | if ( $this->index->exists() ) { |
| 308 | throw new RuntimeException( "Index {$this->index->getName()} already exists." ); |
| 309 | } |
| 310 | |
| 311 | $mappingConfigBuilder = new SuggesterMappingConfigBuilder( $this->connection->getConfig() ); |
| 312 | |
| 313 | // We create the index with 0 replicas, this is faster and will |
| 314 | // stress less nodes with 4 shards and 2 replicas we would |
| 315 | // stress 12 nodes (moreover with the optimize flag) |
| 316 | $settings = [ |
| 317 | 'number_of_shards' => $this->indexerConfig->getShardCount(), |
| 318 | // hacky but we still use auto_expand_replicas |
| 319 | // for convenience on small install. |
| 320 | 'auto_expand_replicas' => "0-0", |
| 321 | 'refresh_interval' => -1, |
| 322 | 'analysis' => $this->analysisConfigBuilder->buildConfig(), |
| 323 | 'routing.allocation.total_shards_per_node' => $this->indexerConfig->getMaxShardPerNode(), |
| 324 | ]; |
| 325 | |
| 326 | if ( $this->indexerConfig->getAllocationIncludeTag() !== null ) { |
| 327 | $this->output->output( "Using routing.allocation.include.tag: " . |
| 328 | "{$this->indexerConfig->getAllocationIncludeTag()}, the index might be stuck in red " . |
| 329 | "if the cluster is not properly configured.\n" ); |
| 330 | $settings['routing.allocation.include.tag'] = $this->indexerConfig->getAllocationIncludeTag(); |
| 331 | } |
| 332 | |
| 333 | if ( $this->indexerConfig->getAllocationExcludeTag() ) { |
| 334 | $this->output->output( "Using routing.allocation.exclude.tag: " . |
| 335 | "{$this->indexerConfig->getAllocationExcludeTag()}, the index might be stuck in red " . |
| 336 | "if the cluster is not properly configured.\n" ); |
| 337 | $settings['routing.allocation.exclude.tag'] = $this->indexerConfig->getAllocationExcludeTag(); |
| 338 | } |
| 339 | |
| 340 | $args = [ |
| 341 | 'settings' => [ 'index' => $settings ], |
| 342 | 'mappings' => $mappingConfigBuilder->buildConfig(), |
| 343 | ]; |
| 344 | $this->index->create( |
| 345 | $args, |
| 346 | [ 'master_timeout' => $this->indexerConfig->getMasterTimeout() ] |
| 347 | ); |
| 348 | |
| 349 | // Index create is async, we have to make sure that the index is ready |
| 350 | // before sending any docs to it. |
| 351 | $this->waitForGreen( $this->indexerConfig->getReplicationTimeout() ); |
| 352 | } |
| 353 | |
| 354 | private function expungeDeletes(): void { |
| 355 | $this->log( "Purging deleted docs on {$this->index->getName()}..." ); |
| 356 | $this->index->forcemerge( [ 'only_expunge_deletes' => 'true', 'flush' => 'false' ] ); |
| 357 | $this->output->output( "ok.\n" ); |
| 358 | } |
| 359 | |
| 360 | /** |
| 361 | * @param string $message |
| 362 | */ |
| 363 | public function log( string $message ): void { |
| 364 | $date = new DateTime(); |
| 365 | $this->output->output( "{$date->format( 'Y-m-d H:i:s' )} {$this->index->getName()} $message" ); |
| 366 | } |
| 367 | |
| 368 | private function optimize(): void { |
| 369 | $this->log( "Optimizing index {$this->index->getName()}..." ); |
| 370 | $this->index->forcemerge( [ 'max_num_segments' => 1 ] ); |
| 371 | $this->output->output( "ok.\n" ); |
| 372 | } |
| 373 | |
| 374 | public function validateAlias(): void { |
| 375 | // @todo utilize the following once Elastica is updated to support passing |
| 376 | // master_timeout. This is a copy of the Elastica\Index::addAlias() method |
| 377 | // $this->getIndex()->addAlias( $this->getIndexTypeName(), true ); |
| 378 | $index = $this->index; |
| 379 | $name = $this->getIndexAliasName(); |
| 380 | |
| 381 | $path = '_aliases'; |
| 382 | $data = [ 'actions' => [] ]; |
| 383 | $status = new Status( $index->getClient() ); |
| 384 | foreach ( $status->getIndicesWithAlias( $name ) as $aliased ) { |
| 385 | $data['actions'][] = |
| 386 | [ 'remove' => [ 'index' => $aliased->getName(), 'alias' => $name ] ]; |
| 387 | } |
| 388 | |
| 389 | $data['actions'][] = [ 'add' => [ 'index' => $index->getName(), 'alias' => $name ] ]; |
| 390 | |
| 391 | $index->getClient() |
| 392 | ->request( $path, Request::POST, $data, [ 'master_timeout' => $this->indexerConfig->getMasterTimeout() ] ); |
| 393 | } |
| 394 | |
| 395 | private function deleteOldIndex(): void { |
| 396 | if ( $this->oldIndex && $this->oldIndex->exists() ) { |
| 397 | $this->log( "Deleting " . $this->oldIndex->getName() . " ... " ); |
| 398 | // @todo Utilize $this->oldIndex->delete(...) once Elastica library is updated |
| 399 | // to allow passing the master_timeout |
| 400 | $this->oldIndex->getClient()->request( |
| 401 | $this->oldIndex->getName(), |
| 402 | Request::DELETE, |
| 403 | [], |
| 404 | [ 'master_timeout' => $this->indexerConfig->getMasterTimeout() ] |
| 405 | ); |
| 406 | $this->output->output( "ok.\n" ); |
| 407 | } |
| 408 | } |
| 409 | |
| 410 | /** |
| 411 | * @return string name of the index type being updated |
| 412 | */ |
| 413 | private function getIndexAliasName(): string { |
| 414 | return $this->connection->getIndexName( |
| 415 | $this->indexerConfig->getIndexBaseName(), |
| 416 | Connection::TITLE_SUGGEST_INDEX_SUFFIX, |
| 417 | false, |
| 418 | $this->indexerConfig->isAltIndex(), |
| 419 | $this->indexerConfig->getAltIndexId() |
| 420 | ); |
| 421 | } |
| 422 | |
| 423 | private function getClient(): Client { |
| 424 | return $this->connection->getClient(); |
| 425 | } |
| 426 | |
| 427 | /** |
| 428 | * @inheritDoc |
| 429 | */ |
| 430 | public function outputIndented( $message ): void { |
| 431 | $this->output->outputIndented( $message ); |
| 432 | } |
| 433 | |
| 434 | private function enableReplicas(): void { |
| 435 | $this->log( "Enabling replicas...\n" ); |
| 436 | $args = [ |
| 437 | 'index' => [ |
| 438 | 'auto_expand_replicas' => $this->indexerConfig->getReplicaCount(), |
| 439 | ], |
| 440 | ]; |
| 441 | |
| 442 | $path = $this->index->getName() . "/_settings"; |
| 443 | $this->index->getClient()->request( |
| 444 | $path, |
| 445 | Request::PUT, |
| 446 | $args, |
| 447 | [ 'master_timeout' => $this->indexerConfig->getMasterTimeout() ] |
| 448 | ); |
| 449 | |
| 450 | // The previous call seems to be async, let's wait few sec |
| 451 | // otherwise replication won't have time to start. |
| 452 | if ( !defined( 'MW_PHPUNIT_TEST' ) ) { |
| 453 | sleep( 20 ); |
| 454 | } |
| 455 | |
| 456 | // Index will be yellow while replica shards are being allocated. |
| 457 | $this->waitForGreen( $this->indexerConfig->getReplicationTimeout() ); |
| 458 | } |
| 459 | |
| 460 | private function waitForGreen( int $timeout = 600 ): void { |
| 461 | $this->log( "Waiting for the index to go green...\n" ); |
| 462 | // Wait for the index to go green ( default 10 min) |
| 463 | if ( !$this->utils->waitForGreen( $this->index->getName(), $timeout ) ) { |
| 464 | throw new RuntimeException( "Failed to wait for green... please check config and " . |
| 465 | "delete the {$this->index->getName()} index if it was created." ); |
| 466 | } |
| 467 | } |
| 468 | |
| 469 | /** |
| 470 | * @return int |
| 471 | * @throws IndexPromotionException |
| 472 | */ |
| 473 | private function refreshAndWaitForCount(): int { |
| 474 | $this->safeRefresh( $this->index ); |
| 475 | $start = microtime( true ); |
| 476 | $timeoutAfter = !defined( 'MW_PHPUNIT_TEST' ) ? $start + 120 : $start + 2; |
| 477 | if ( $this->oldIndex && $this->oldIndex->exists() ) { |
| 478 | $oldCount = $this->safeCount( $this->oldIndex ); |
| 479 | while ( true ) { |
| 480 | $docsInIndex = $this->safeCount( $this->index ); |
| 481 | $this->log( "Old index had $oldCount docs vs $docsInIndex now.\n" ); |
| 482 | if ( $oldCount === 0 ) { |
| 483 | return $docsInIndex; |
| 484 | } |
| 485 | $diffRatio = ( $oldCount - $docsInIndex ) / $oldCount; |
| 486 | // Check for relatively large (>EXTRA_CHECK_THRESHOLD docs) indices that the new index is not |
| 487 | // abnormally smaller than the new one. |
| 488 | // We check only "large" indices to avoid false positives on small indices... |
| 489 | if ( $oldCount > self::EXTRA_CHECK_THRESHOLD && |
| 490 | $diffRatio > self::PREVIOUS_VS_NEXT_COUNT_MAX_DEVIATION ) { |
| 491 | if ( microtime( true ) > $timeoutAfter ) { |
| 492 | if ( !$this->indexerConfig->isForce() ) { |
| 493 | throw new IndexPromotionException( "New index seems too small compared to the previous index " . |
| 494 | "$oldCount/$docsInIndex > " . |
| 495 | self::PREVIOUS_VS_NEXT_COUNT_MAX_DEVIATION . |
| 496 | " (old/new > threshold). Aborting. (Use --force to bypass)" ); |
| 497 | } |
| 498 | } else { |
| 499 | $this->log( "Waiting to re-check counts...\n" ); |
| 500 | if ( !defined( 'MW_PHPUNIT_TEST' ) ) { |
| 501 | sleep( 10 ); |
| 502 | } |
| 503 | } |
| 504 | } else { |
| 505 | return $docsInIndex; |
| 506 | } |
| 507 | } |
| 508 | } else { |
| 509 | $docsInIndex = $this->safeCount( $this->index ); |
| 510 | } |
| 511 | |
| 512 | return $docsInIndex; |
| 513 | } |
| 514 | |
| 515 | /** |
| 516 | * @param Index $index |
| 517 | * @param int $attempts |
| 518 | * @return int |
| 519 | * @throws RuntimeException |
| 520 | */ |
| 521 | private function safeCount( Index $index, int $attempts = 3 ): int { |
| 522 | return ConfigUtils::safeCountOrFail( |
| 523 | $index, |
| 524 | static function ( StatusValue $error ): never { |
| 525 | throw new RuntimeException( (string)$error ); |
| 526 | }, |
| 527 | $attempts |
| 528 | ); |
| 529 | } |
| 530 | |
| 531 | /** |
| 532 | * @param Index $index |
| 533 | * @param int $attempts |
| 534 | * @return void |
| 535 | * @throws RuntimeException |
| 536 | */ |
| 537 | private function safeRefresh( Index $index, int $attempts = 3 ): void { |
| 538 | ConfigUtils::safeRefreshOrFail( |
| 539 | $index, |
| 540 | static function ( StatusValue $error ): never { |
| 541 | throw new RuntimeException( (string)$error ); |
| 542 | }, |
| 543 | $attempts |
| 544 | ); |
| 545 | } |
| 546 | |
| 547 | /** |
| 548 | * @return int the batchId the indexer is using to mark its indexed docs |
| 549 | */ |
| 550 | public function getBatchId(): int { |
| 551 | return $this->suggestBuilder->getBatchId(); |
| 552 | } |
| 553 | |
| 554 | /** |
| 555 | * @return Index the index this indexer is writing to |
| 556 | */ |
| 557 | public function getTargetIndex(): Index { |
| 558 | return $this->index; |
| 559 | } |
| 560 | |
| 561 | /** |
| 562 | * @return string[] |
| 563 | */ |
| 564 | public function getRequiredFields(): array { |
| 565 | return $this->suggestBuilder->getRequiredFields(); |
| 566 | } |
| 567 | |
| 568 | /** |
| 569 | * Visible for testing. |
| 570 | * @return array |
| 571 | */ |
| 572 | public function getIndexingStats(): array { |
| 573 | return $this->indexingStats; |
| 574 | } |
| 575 | } |