Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 370 |
|
0.00% |
0 / 26 |
CRAP | |
0.00% |
0 / 1 |
UpdateSuggesterIndex | |
0.00% |
0 / 363 |
|
0.00% |
0 / 26 |
4830 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 32 |
|
0.00% |
0 / 1 |
2 | |||
execute | |
0.00% |
0 / 52 |
|
0.00% |
0 / 1 |
72 | |||
workAroundBrokenMessageCache | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
checkAndDeleteBrokenIndices | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
30 | |||
rebuild | |
0.00% |
0 / 19 |
|
0.00% |
0 / 1 |
6 | |||
canRecycle | |
0.00% |
0 / 42 |
|
0.00% |
0 / 1 |
110 | |||
recycle | |
0.00% |
0 / 45 |
|
0.00% |
0 / 1 |
42 | |||
deleteOldIndex | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
12 | |||
deleteIndex | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
2 | |||
optimize | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
expungeDeletes | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
indexData | |
0.00% |
0 / 48 |
|
0.00% |
0 / 1 |
56 | |||
validateAlias | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
6 | |||
outputProgress | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
20 | |||
log | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
pickAnalyzer | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
2 | |||
createIndex | |
0.00% |
0 / 31 |
|
0.00% |
0 / 1 |
20 | |||
enableReplicas | |
0.00% |
0 / 15 |
|
0.00% |
0 / 1 |
2 | |||
waitForGreen | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
getReplicaCount | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getShardCount | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getMaxShardsPerNode | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
updateVersions | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
2 | |||
getIndex | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
getClient | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getIndexAliasName | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | |
3 | namespace CirrusSearch\Maintenance; |
4 | |
5 | use CirrusSearch\BuildDocument\Completion\SuggestBuilder; |
6 | use CirrusSearch\Connection; |
7 | use CirrusSearch\ElasticaErrorHandler; |
8 | use CirrusSearch\Maintenance\Validators\AnalyzersValidator; |
9 | use CirrusSearch\SearchConfig; |
10 | use Elastica; |
11 | use Elastica\Index; |
12 | use Elastica\Query; |
13 | use Elastica\Request; |
14 | use Elastica\Status; |
15 | use MediaWiki\Extension\Elastica\MWElasticUtils; |
16 | use RuntimeException; |
17 | |
18 | /** |
19 | * Update the search configuration on the search backend for the title |
20 | * suggest index. |
21 | * |
22 | * This program is free software; you can redistribute it and/or modify |
23 | * it under the terms of the GNU General Public License as published by |
24 | * the Free Software Foundation; either version 2 of the License, or |
25 | * (at your option) any later version. |
26 | * |
27 | * This program is distributed in the hope that it will be useful, |
28 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
29 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
30 | * GNU General Public License for more details. |
31 | * |
32 | * You should have received a copy of the GNU General Public License along |
33 | * with this program; if not, write to the Free Software Foundation, Inc., |
34 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
35 | * http://www.gnu.org/copyleft/gpl.html |
36 | */ |
37 | |
// Locate the MediaWiki installation: honour MW_INSTALL_PATH when it is set,
// otherwise assume this extension lives in <mw>/extensions/<Ext>/maintenance.
$IP = getenv( 'MW_INSTALL_PATH' );
$IP = ( $IP !== false ) ? $IP : __DIR__ . '/../../..';

// Core maintenance framework first, then the CirrusSearch maintenance base class.
require_once "$IP/maintenance/Maintenance.php";
require_once __DIR__ . '/../includes/Maintenance/Maintenance.php';
44 | |
/**
 * Maintenance script that builds (or, when allowed, recycles) the title
 * suggest index used by the completion suggester. Always operates on a
 * single cluster.
 */
class UpdateSuggesterIndex extends Maintenance {
	/**
	 * @var string language code we're building for
	 */
	private $langCode;

	/**
	 * @var int number of documents requested per scroll page / bulk batch
	 */
	private $indexChunkSize;

	/**
	 * @var int number of attempts handed to MWElasticUtils::withRetry()
	 */
	private $indexRetryAttempts;

	/**
	 * @var string suffix of the suggester index (Connection::TITLE_SUGGEST_INDEX_SUFFIX)
	 */
	private $indexSuffix;

	/**
	 * @var string base name used for all indexes (--baseName, defaults to wiki id)
	 */
	private $indexBaseName;

	/**
	 * @var string identifier of the index being built; only assigned by rebuild()
	 */
	private $indexIdentifier;

	/**
	 * @var Index old suggester index that will be deleted at the end of the process
	 */
	private $oldIndex;

	/**
	 * @var int last percentage value printed by outputProgress()
	 */
	private $lastProgressPrinted;

	/**
	 * @var bool optimize the index when done.
	 */
	private $optimizeIndex;

	/**
	 * @var array list of available plugins
	 */
	private $availablePlugins;

	/**
	 * @var string master_timeout sent with cluster-level requests
	 */
	private $masterTimeout;

	/**
	 * @var ConfigUtils
	 */
	private $utils;

	/**
	 * @todo: public (used in closure)
	 * @var SuggestBuilder
	 */
	public $builder;

	/**
	 * @var array analysis settings built by SuggesterAnalysisConfigBuilder
	 */
	private $analysisConfig;

	/**
	 * @var bool true once recycle() has started
	 */
	private $recycle = false;

	/**
	 * @var string[] plugins that must not be used even if installed
	 */
	private $bannedPlugins;

	public function __construct() {
		parent::__construct();
		$this->addDescription( "Create a new suggester index. Always operates on a single cluster." );
		$this->addOption( 'baseName', 'What basename to use for all indexes, ' .
			'defaults to wiki id', false, true );
		$this->addOption( 'indexChunkSize', 'Documents per shard to index in a batch. ' .
			'Note when changing the number of shards that the old shard size is used, not the new ' .
			'one. If you see many errors submitting documents in bulk but the automatic retry as ' .
			'singles works then lower this number. Defaults to 500.', false, true );
		$this->addOption( 'indexRetryAttempts', 'Number of times to back off and retry ' .
			'per failure. Note that failures are not common but if Elasticsearch is in the process ' .
			'of moving a shard this can time out. This will retry the attempt after some backoff ' .
			'rather than failing the whole reindex process. Defaults to 5.', false, true );
		$this->addOption( 'optimize',
			'Optimize the index to 1 segment. Defaults to false.', false, false );
		$this->addOption( 'scoringMethod',
			'The scoring method to use when computing suggestion weights. ' .
			'Defaults to $wgCirrusSearchCompletionDefaultScore or quality if unset.', false, true );
		$this->addOption( 'masterTimeout',
			'The amount of time to wait for the master to respond to mapping ' .
			'updates before failing. Defaults to $wgCirrusSearchMasterTimeout.', false, true );
		$this->addOption( 'replicationTimeout',
			'The amount of time (seconds) to wait for the replica shards to initialize. ' .
			'Defaults to 3600 seconds.', false, true );
		$this->addOption( 'allocationIncludeTag',
			'Set index.routing.allocation.include.tag on the created index. Useful if you want to ' .
			'force the suggester index not to be allocated on a specific set of nodes.',
			false, true );
		$this->addOption( 'allocationExcludeTag',
			'Set index.routing.allocation.exclude.tag on the created index. Useful if you want ' .
			'to force the suggester index not to be allocated on a specific set of nodes.',
			false, true );
		$this->addOption( 'recreate', "Force the creation of a new index." );
	}

	/**
	 * Entry point: validate configuration, then either recycle the existing
	 * suggester index or build a fresh one.
	 *
	 * @return bool always true; failures terminate through fatalError()
	 */
	public function execute() {
		global $wgLanguageCode,
			$wgCirrusSearchBannedPlugins,
			$wgCirrusSearchMasterTimeout;

		$this->disablePoolCountersAndLogging();
		$this->workAroundBrokenMessageCache();
		$this->masterTimeout = $this->getOption( 'masterTimeout', $wgCirrusSearchMasterTimeout );
		$this->indexSuffix = Connection::TITLE_SUGGEST_INDEX_SUFFIX;

		$useCompletion = $this->getSearchConfig()->get( 'CirrusSearchUseCompletionSuggester' );

		if ( $useCompletion !== 'build' && $useCompletion !== 'yes' && $useCompletion !== true ) {
			$this->fatalError( "Completion suggester disabled, quitting..." );
		}

		// Check that all shards and replicas settings are set
		try {
			$this->getShardCount();
			$this->getReplicaCount();
			$this->getMaxShardsPerNode();
		} catch ( \Exception $e ) {
			$this->fatalError(
				"Failed to get shard count and replica count information: {$e->getMessage()}"
			);
		}

		$this->indexBaseName = $this->getOption(
			'baseName', $this->getSearchConfig()->get( SearchConfig::INDEX_BASE_NAME )
		);
		// CLI options arrive as strings; cast so the documented int properties hold ints.
		$this->indexChunkSize = (int)$this->getOption( 'indexChunkSize', 500 );
		// Must read the option that __construct() declares ('indexRetryAttempts');
		// reading any other name silently ignores the user's value.
		$this->indexRetryAttempts = (int)$this->getOption( 'indexRetryAttempts', 5 );

		$this->optimizeIndex = $this->getOption( 'optimize', false );

		$this->utils = new ConfigUtils( $this->getClient(), $this );

		$this->langCode = $wgLanguageCode;
		$this->bannedPlugins = $wgCirrusSearchBannedPlugins;

		$this->availablePlugins = $this->unwrap( $this->utils->scanAvailablePlugins( $this->bannedPlugins ) );
		$this->analysisConfig = $this->pickAnalyzer( $this->langCode, $this->availablePlugins )
			->buildConfig();

		$this->unwrap( $this->utils->checkElasticsearchVersion() );

		try {
			$this->requireCirrusReady();
			$this->builder = SuggestBuilder::create( $this->getConnection(),
				$this->getOption( 'scoringMethod' ), $this->indexBaseName );
			# check for broken indices and delete them
			$this->checkAndDeleteBrokenIndices();

			if ( !$this->canRecycle() ) {
				$this->rebuild();
			} else {
				$this->recycle();
			}
		} catch ( \Elastica\Exception\Connection\HttpException $e ) {
			$message = $e->getMessage();
			$this->log( "\nUnexpected Elasticsearch failure.\n" );
			$this->fatalError( "Http error communicating with Elasticsearch: $message.\n" );
		} catch ( \Elastica\Exception\ExceptionInterface $e ) {
			$type = get_class( $e );
			$message = ElasticaErrorHandler::extractMessage( $e );
			$trace = $e->getTraceAsString();
			$this->log( "\nUnexpected Elasticsearch failure.\n" );
			$this->fatalError( "Elasticsearch failed in an unexpected way. " .
				"This is always a bug in CirrusSearch.\n" .
				"Error type: $type\n" .
				"Message: $message\n" .
				"Trace:\n" . $trace );
		}

		return true;
	}

	private function workAroundBrokenMessageCache() {
		// Under some configurations (T288233) the i18n cache fails to
		// initialize. After failing, at least in this particular deployment,
		// it will fallback to local CDB files and ignore on-wiki overrides
		// which is acceptable for this script.
		try {
			wfMessage( 'ok' )->text();
		} catch ( \LogicException $e ) {
			// The first failure should trigger the fallback mode, this second
			// try should work (and not throw the LogicException deep in the updates).
			wfMessage( 'ok' )->text();
		}
	}

	/**
	 * Check for duplicate indices that may have been created
	 * by a previous update that failed, and delete the ones that are not live.
	 * Nothing is done when fewer than two indices share the alias' type.
	 */
	private function checkAndDeleteBrokenIndices() {
		$indices = $this->unwrap( $this->utils->getAllIndicesByType( $this->getIndexAliasName() ) );
		if ( count( $indices ) < 2 ) {
			return;
		}
		foreach ( $indices as $indexName ) {
			$status = $this->utils->isIndexLive( $indexName );
			if ( !$status->isGood() ) {
				$this->log( (string)$status );
			} elseif ( $status->getValue() === false ) {
				$this->log( "Deleting broken index {$indexName}\n" );
				$this->deleteIndex( $this->getConnection()->getIndex( $indexName ) );
			}
		}
		# If something went wrong the process will fail when calling pickIndexIdentifierFromOption
	}

	/**
	 * Build a brand new index, populate it, swap the alias onto it and delete
	 * the index that was previously 'current'.
	 */
	private function rebuild() {
		$oldIndexIdentifier = $this->unwrap( $this->utils->pickIndexIdentifierFromOption(
			'current', $this->getIndexAliasName()
		) );
		$this->oldIndex = $this->getConnection()->getIndex(
			$this->indexBaseName, $this->indexSuffix, $oldIndexIdentifier
		);
		$this->indexIdentifier = $this->unwrap( $this->utils->pickIndexIdentifierFromOption(
			'now', $this->getIndexAliasName()
		) );

		$this->createIndex();
		$this->indexData();
		if ( $this->optimizeIndex ) {
			$this->optimize();
		}
		$this->enableReplicas();
		$this->getIndex()->refresh();
		$this->validateAlias();
		$this->updateVersions();
		$this->deleteOldIndex();
		$this->log( "Done.\n" );
	}

	/**
	 * Decide whether the current index can be reused in place.
	 *
	 * Requires $wgCirrusSearchRecycleCompletionSuggesterIndex, no --recreate,
	 * an existing index with refresh_interval -1, an unchanged shard count,
	 * matching major mapping/analysis versions in the metastore, and an
	 * analysis config that validates against the existing index.
	 *
	 * @return bool
	 */
	private function canRecycle() {
		global $wgCirrusSearchRecycleCompletionSuggesterIndex;
		if ( !$wgCirrusSearchRecycleCompletionSuggesterIndex ) {
			return false;
		}

		if ( $this->getOption( "recreate", false ) ) {
			return false;
		}

		$oldIndexIdentifier = $this->unwrap( $this->utils->pickIndexIdentifierFromOption(
			'current', $this->getIndexAliasName()
		) );
		$oldIndex = $this->getConnection()->getIndex(
			$this->indexBaseName, $this->indexSuffix, $oldIndexIdentifier
		);
		if ( !$oldIndex->exists() ) {
			$this->error( 'Index does not exist yet cannot recycle.' );
			return false;
		}
		// Loose comparison on purpose: the setting may come back as string or int.
		$refresh = $oldIndex->getSettings()->getRefreshInterval();
		if ( $refresh != '-1' ) {
			$this->error( 'Refresh interval is not -1, cannot recycle.' );
			return false;
		}

		$shards = $oldIndex->getSettings()->get( 'number_of_shards' );
		// We check only the number of shards since it cannot be updated.
		if ( $shards != $this->getShardCount() ) {
			$this->error( 'Number of shards mismatch cannot recycle.' );
			return false;
		}

		$mMaj = explode( '.', SuggesterMappingConfigBuilder::VERSION, 2 )[0];
		$aMaj = explode( '.', SuggesterAnalysisConfigBuilder::VERSION, 2 )[0];

		try {
			$versionDoc = $this->getMetaStore()
				->versionStore()
				->find( $this->indexBaseName, $this->indexSuffix );
		} catch ( \Elastica\Exception\NotFoundException $nfe ) {
			$this->error( 'Index missing in mw_cirrus_metastore::version, cannot recycle.' );
			return false;
		}

		if ( $versionDoc->analysis_maj != $aMaj ) {
			$this->error( 'Analysis config version mismatch, cannot recycle.' );
			return false;
		}

		if ( $versionDoc->mapping_maj != $mMaj ) {
			$this->error( 'Mapping config version mismatch, cannot recycle.' );
			return false;
		}

		$validator = new AnalyzersValidator( $oldIndex, $this->analysisConfig, $this );
		$status = $validator->validate();
		if ( !$status->isOK() ) {
			$this->error( 'Analysis config differs, cannot recycle.' );
			return false;
		}

		return true;
	}

	/**
	 * Recycle a suggester index:
	 * 1/ index data (delete docs if it already exists)
	 * 2/ expunge deleted docs
	 * 3/ refresh the reader
	 *    - so we can run a quick delete on remaining docs
	 *      (the docs that were actually deleted)
	 *    - drawbacks we load the FST from an un-optimized index
	 * 4/ delete old docs
	 * 5/ optimize
	 * 6/ refresh the reader
	 *
	 * Drawbacks: the FST will be read from disk twice in a short
	 * amount of time.
	 * This is a trade off between cluster operation and disk operation.
	 * Recreating the index may require less disk operations but causes
	 * the cluster to rebalance.
	 * This is certainly the best strategy for small indices (less than 100k docs)
	 * but needs to be carefully tested on bigger indices with high QPS.
	 *
	 * NOTE(review): $this->indexIdentifier is never assigned on this path, so
	 * getIndex() is called with a null identifier — presumably Connection
	 * resolves that to the alias; confirm.
	 */
	private function recycle() {
		$this->log( "Recycling index {$this->getIndex()->getName()}\n" );
		$this->recycle = true;
		$this->indexData();
		// This is fragile... hopefully most of the docs will be deleted from the old segments
		// and will result in a fast operation.
		// New segments should not be affected.
		// Unfortunately if a failure causes the process to stop
		// the FST will maybe contains duplicates as it cannot (elastic 1.7)
		// filter deleted docs. We will rely on output deduplication
		// but this will certainly affect performances.

		$this->expungeDeletes();
		// Refresh the reader so we can scroll over remaining docs.
		// At this point we may read the new un-optimized FST segments
		// Old ones should be pretty small after expungeDeletes
		$this->getIndex()->refresh();

		// Select every doc that was NOT written by the current batch.
		$boolNot = new Elastica\Query\BoolQuery();
		$boolNot->addMustNot(
			new Elastica\Query\Term( [ "batch_id" => $this->builder->getBatchId() ] )
		);
		$bool = new Elastica\Query\BoolQuery();
		$bool->addFilter( $boolNot );

		$query = new Elastica\Query();
		$query->setQuery( $bool );
		$query->setSize( $this->indexChunkSize );
		$query->setSource( false );
		$query->setSort( [ '_doc' ] );
		// Explicitly ask for accurate total_hits even-though we use a scroll request
		$query->setTrackTotalHits( true );
		$search = new \Elastica\Search( $this->getClient() );
		$search->setQuery( $query );
		$search->addIndex( $this->getIndex() );
		$scroll = new \Elastica\Scroll( $search, '15m' );

		$totalDocsToDump = -1;
		$docsDumped = 0;

		$this->log( "Deleting remaining docs from previous batch\n" );
		foreach ( $scroll as $results ) {
			if ( $totalDocsToDump === -1 ) {
				$totalDocsToDump = $results->getTotalHits();
				if ( $totalDocsToDump === 0 ) {
					break;
				}
			}
			$docIds = [];
			foreach ( $results as $result ) {
				$docsDumped++;
				$docIds[] = $result->getId();
			}
			$this->outputProgress( $docsDumped, $totalDocsToDump );
			if ( !$docIds ) {
				continue;
			}

			MWElasticUtils::withRetry( $this->indexRetryAttempts,
				function () use ( $docIds ) {
					$this->getIndex()->deleteByQuery( new Query\Ids( $docIds ) );
				}
			);
		}
		$this->log( "Done.\n" );
		// Old docs should be deleted now we can optimize and flush
		$this->optimize();

		// @todo add support for changing the number of replicas
		// if the setting was changed in cirrus config.
		// Workaround is to change the settings directly on the cluster.

		// Refresh the reader so it now uses the optimized FST,
		// and actually free and delete old segments.
		$this->getIndex()->refresh();
	}

	/**
	 * Delete the index recorded by rebuild() as the previous 'current' one,
	 * if it is set and still exists.
	 */
	private function deleteOldIndex() {
		if ( $this->oldIndex && $this->oldIndex->exists() ) {
			$this->log( "Deleting " . $this->oldIndex->getName() . " ... " );
			$this->deleteIndex( $this->oldIndex );
			$this->output( "ok.\n" );
		}
	}

	/**
	 * Delete an index, passing the configured master_timeout.
	 * @param \Elastica\Index $index
	 */
	private function deleteIndex( \Elastica\Index $index ) {
		// @todo Utilize $index->delete(...) once Elastica library is updated
		// to allow passing the master_timeout
		$index->request(
			'',
			Request::DELETE,
			[],
			[ 'master_timeout' => $this->masterTimeout ]
		);
	}

	/**
	 * Force-merge the index down to a single segment.
	 */
	private function optimize() {
		$this->log( "Optimizing index..." );
		$this->getIndex()->forcemerge( [ 'max_num_segments' => 1 ] );
		$this->output( "ok.\n" );
	}

	/**
	 * Force-merge only segments containing deleted docs, without flushing.
	 */
	private function expungeDeletes() {
		$this->log( "Purging deleted docs..." );
		$this->getIndex()->forcemerge( [ 'only_expunge_deletes' => 'true', 'flush' => 'false' ] );
		$this->output( "ok.\n" );
	}

	/**
	 * Scroll over the CONTENT and GENERAL indices (NS_MAIN pages and redirects
	 * into NS_MAIN), build suggestion docs with $this->builder and bulk-add
	 * them to the suggester index.
	 */
	private function indexData() {
		// We build the suggestions by reading CONTENT and GENERAL indices.
		// This does not support extra indices like FILES on commons.
		$sourceIndexSuffixes = [ Connection::CONTENT_INDEX_SUFFIX, Connection::GENERAL_INDEX_SUFFIX ];

		$query = new Query();
		$query->setSource( [
			'includes' => $this->builder->getRequiredFields()
		] );

		$pageAndNs = new Elastica\Query\BoolQuery();
		$pageAndNs->addShould( new Elastica\Query\Term( [ "namespace" => NS_MAIN ] ) );
		$pageAndNs->addShould( new Elastica\Query\Term( [ "redirect.namespace" => NS_MAIN ] ) );
		$bool = new Elastica\Query\BoolQuery();
		$bool->addFilter( $pageAndNs );

		$query->setQuery( $bool );
		$query->setSort( [ '_doc' ] );
		// Explicitly ask for accurate total_hits even-though we use a scroll request
		$query->setTrackTotalHits( true );

		foreach ( $sourceIndexSuffixes as $sourceIndexSuffix ) {
			$sourceIndex = $this->getConnection()->getIndex( $this->indexBaseName, $sourceIndexSuffix );
			$search = new \Elastica\Search( $this->getClient() );
			$search->setQuery( $query );
			$search->addIndex( $sourceIndex );
			$query->setSize( $this->indexChunkSize );
			$totalDocsToDump = -1;
			$scroll = new \Elastica\Scroll( $search, '15m' );

			$docsDumped = 0;
			$destinationIndex = $this->getIndex();

			foreach ( $scroll as $results ) {
				if ( $totalDocsToDump === -1 ) {
					$totalDocsToDump = $results->getTotalHits();
					if ( $totalDocsToDump === 0 ) {
						$this->log( "No documents to index from $sourceIndexSuffix\n" );
						break;
					}
					$this->log( "Indexing $totalDocsToDump documents from $sourceIndexSuffix with " .
						"batchId: {$this->builder->getBatchId()}\n" );
				}
				$inputDocs = [];
				foreach ( $results as $result ) {
					$docsDumped++;
					$inputDocs[] = [
						'id' => $result->getId(),
						'source' => $result->getSource()
					];
				}

				$suggestDocs = $this->builder->build( $inputDocs );
				if ( !$suggestDocs ) {
					continue;
				}
				$this->outputProgress( $docsDumped, $totalDocsToDump );
				MWElasticUtils::withRetry( $this->indexRetryAttempts,
					static function () use ( $destinationIndex, $suggestDocs ) {
						$destinationIndex->addDocuments( $suggestDocs );
					}
				);
			}
			$this->log( "Indexing from $sourceIndexSuffix index done.\n" );
		}
	}

	/**
	 * Atomically move the alias from every index currently holding it onto
	 * the index being updated, in a single _aliases request.
	 */
	public function validateAlias() {
		// @todo utilize the following once Elastica is updated to support passing
		// master_timeout. This is a copy of the Elastica\Index::addAlias() method
		// $this->getIndex()->addAlias( $this->getIndexTypeName(), true );
		$index = $this->getIndex();
		$name = $this->getIndexAliasName();

		$path = '_aliases';
		$data = [ 'actions' => [] ];
		$status = new Status( $index->getClient() );
		foreach ( $status->getIndicesWithAlias( $name ) as $aliased ) {
			$data['actions'][] = [ 'remove' => [ 'index' => $aliased->getName(), 'alias' => $name ] ];
		}

		$data['actions'][] = [ 'add' => [ 'index' => $index->getName(), 'alias' => $name ] ];

		$index->getClient()->request(
			$path, Request::POST, $data, [ 'master_timeout' => $this->masterTimeout ]
		);
	}

	/**
	 * Print progress every even percentage, skipping repeats of the last
	 * printed value. Kept public for backward compatibility with callers.
	 *
	 * @param int $docsDumped documents processed so far
	 * @param int $limit total number of documents; nothing is printed when <= 0
	 */
	public function outputProgress( $docsDumped, $limit ) {
		// Guard the division below: a non-positive total has no meaningful percentage.
		if ( $docsDumped <= 0 || $limit <= 0 ) {
			return;
		}
		$pctDone = (int)( ( $docsDumped / $limit ) * 100 );
		if ( $this->lastProgressPrinted == $pctDone ) {
			return;
		}
		$this->lastProgressPrinted = $pctDone;
		if ( ( $pctDone % 2 ) == 0 ) {
			$this->outputIndented( "	$pctDone% done...\n" );
		}
	}

	/**
	 * Output a message prefixed with the current date/time.
	 * @param string $message
	 * @param string|null $channel
	 */
	public function log( $message, $channel = null ) {
		$date = new \DateTime();
		parent::output( $date->format( 'Y-m-d H:i:s' ) . " " . $message, $channel );
	}

	/**
	 * @param string $langCode
	 * @param array $availablePlugins
	 * @return AnalysisConfigBuilder
	 */
	private function pickAnalyzer( $langCode, array $availablePlugins = [] ) {
		$analysisConfigBuilder = new SuggesterAnalysisConfigBuilder( $langCode, $availablePlugins );
		$this->outputIndented( 'Picking analyzer...' .
			$analysisConfigBuilder->getDefaultTextAnalyzerType( $langCode ) .
			"\n" );
		return $analysisConfigBuilder;
	}

	/**
	 * Create the suggester index with 0 replicas and refresh disabled, then
	 * wait for it to go green.
	 *
	 * @throws RuntimeException if the index already exists
	 */
	private function createIndex() {
		// This is "create only" for now.
		if ( $this->getIndex()->exists() ) {
			throw new RuntimeException( "Index already exists." );
		}

		$mappingConfigBuilder = new SuggesterMappingConfigBuilder();

		// We create the index with 0 replicas, this is faster and will
		// stress less nodes with 4 shards and 2 replicas we would
		// stress 12 nodes (moreover with the optimize flag)
		$settings = [
			'number_of_shards' => $this->getShardCount(),
			// hacky but we still use auto_expand_replicas
			// for convenience on small install.
			'auto_expand_replicas' => "0-0",
			'refresh_interval' => -1,
			'analysis' => $this->analysisConfig,
			'routing.allocation.total_shards_per_node' => $this->getMaxShardsPerNode(),
		];

		if ( $this->hasOption( 'allocationIncludeTag' ) ) {
			$this->output( "Using routing.allocation.include.tag: " .
				"{$this->getOption( 'allocationIncludeTag' )}, the index might be stuck in red " .
				"if the cluster is not properly configured.\n" );
			$settings['routing.allocation.include.tag'] = $this->getOption( 'allocationIncludeTag' );
		}

		if ( $this->hasOption( 'allocationExcludeTag' ) ) {
			$this->output( "Using routing.allocation.exclude.tag: " .
				"{$this->getOption( 'allocationExcludeTag' )}, the index might be stuck in red " .
				"if the cluster is not properly configured.\n" );
			$settings['routing.allocation.exclude.tag'] = $this->getOption( 'allocationExcludeTag' );
		}

		$args = [
			'settings' => [ 'index' => $settings ],
			'mappings' => $mappingConfigBuilder->buildConfig()
		];
		// @todo utilize $this->getIndex()->create(...) once it supports setting
		// the master_timeout parameter.
		$this->getIndex()->request(
			'',
			Request::PUT,
			$args,
			[ 'master_timeout' => $this->masterTimeout ]
		);

		// Index create is async, we have to make sure that the index is ready
		// before sending any docs to it.
		$this->waitForGreen();
	}

	/**
	 * Set auto_expand_replicas to the configured replica count and wait
	 * (up to --replicationTimeout seconds) for the index to go green.
	 */
	private function enableReplicas() {
		$this->log( "Enabling replicas...\n" );
		$args = [
			'index' => [
				'auto_expand_replicas' => $this->getReplicaCount(),
			],
		];

		$path = $this->getIndex()->getName() . "/_settings";
		$this->getIndex()->getClient()->request(
			$path,
			Request::PUT,
			$args,
			[ 'master_timeout' => $this->masterTimeout ]
		);

		// The previous call seems to be async, let's wait few sec
		// otherwise replication won't have time to start.
		sleep( 20 );

		// Index will be yellow while replica shards are being allocated.
		// The option is documented in seconds; cast the CLI string to int.
		$this->waitForGreen( (int)$this->getOption( 'replicationTimeout', 3600 ) );
	}

	/**
	 * Wait for the index to go green, aborting the script on failure.
	 * @param int $timeout seconds to wait (defaults to 600, i.e. 10 minutes)
	 */
	private function waitForGreen( $timeout = 600 ) {
		$this->log( "Waiting for the index to go green...\n" );
		// Wait for the index to go green ( default 10 min)
		if ( !$this->utils->waitForGreen( $this->getIndex()->getName(), $timeout ) ) {
			$this->fatalError( "Failed to wait for green... please check config and " .
				"delete the {$this->getIndex()->getName()} index if it was created." );
		}
	}

	/**
	 * @return string Number of replicas this index should have. May be a range such as '0-2'
	 */
	private function getReplicaCount() {
		return $this->getConnection()->getSettings()->getReplicaCount( $this->indexSuffix );
	}

	/**
	 * @return int Number of shards configured for the suggester index
	 */
	private function getShardCount() {
		return $this->getConnection()->getSettings()->getShardCount( $this->indexSuffix );
	}

	/**
	 * @return int Maximum number of shards that can be allocated on a single elasticsearch
	 *  node. -1 for unlimited.
	 */
	private function getMaxShardsPerNode() {
		return $this->getConnection()->getSettings()->getMaxShardsPerNode( $this->indexSuffix );
	}

	/**
	 * Record the current mapping/analysis versions in the metastore version store.
	 */
	private function updateVersions() {
		$this->log( "Updating tracking indexes..." );
		$this->getMetaStore()
			->versionStore()
			->update( $this->indexBaseName, $this->indexSuffix );
		$this->output( "ok.\n" );
	}

	/**
	 * @return \Elastica\Index being updated
	 */
	public function getIndex() {
		return $this->getConnection()->getIndex(
			$this->indexBaseName, $this->indexSuffix, $this->indexIdentifier
		);
	}

	/**
	 * @return Elastica\Client
	 */
	protected function getClient() {
		return $this->getConnection()->getClient();
	}

	/**
	 * @return string name of the alias (index type) being updated
	 */
	protected function getIndexAliasName() {
		return $this->getConnection()->getIndexName( $this->indexBaseName, $this->indexSuffix );
	}
}
766 | |
// Hand the script class to MediaWiki's maintenance runner, which executes it
// only when this file is invoked directly.
$maintClass = \CirrusSearch\Maintenance\UpdateSuggesterIndex::class;
require_once RUN_MAINTENANCE_IF_MAIN;