Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
| Total | |
0.00% |
0 / 211 |
|
0.00% |
0 / 15 |
CRAP | |
0.00% |
0 / 1 |
| Updater | |
0.00% |
0 / 211 |
|
0.00% |
0 / 15 |
2256 | |
0.00% |
0 / 1 |
| __construct | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
| build | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
| updateFromTitle | |
0.00% |
0 / 14 |
|
0.00% |
0 / 1 |
20 | |||
| traceRedirects | |
0.00% |
0 / 29 |
|
0.00% |
0 / 1 |
90 | |||
| updatePages | |
0.00% |
0 / 44 |
|
0.00% |
0 / 1 |
20 | |||
| updateWeightedTags | |
0.00% |
0 / 23 |
|
0.00% |
0 / 1 |
2 | |||
| resetWeightedTags | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
6 | |||
| deletePages | |
0.00% |
0 / 16 |
|
0.00% |
0 / 1 |
2 | |||
| archivePages | |
0.00% |
0 / 16 |
|
0.00% |
0 / 1 |
6 | |||
| buildArchiveDocuments | |
0.00% |
0 / 14 |
|
0.00% |
0 / 1 |
12 | |||
| updateLinkedArticles | |
0.00% |
0 / 21 |
|
0.00% |
0 / 1 |
132 | |||
| pagesToTitles | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
| pushElasticaWriteJobs | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
12 | |||
| elasticaWriteClusters | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
6 | |||
| newLog | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
2 | |||
| 1 | <?php |
| 2 | |
| 3 | namespace CirrusSearch; |
| 4 | |
| 5 | use CirrusSearch\BuildDocument\BuildDocument; |
| 6 | use CirrusSearch\BuildDocument\DocumentSizeLimiter; |
| 7 | use CirrusSearch\Profile\SearchProfileService; |
| 8 | use CirrusSearch\Search\CirrusIndexField; |
| 9 | use MediaWiki\Content\TextContent; |
| 10 | use MediaWiki\Logger\LoggerFactory; |
| 11 | use MediaWiki\MediaWikiServices; |
| 12 | use MediaWiki\Page\ProperPageIdentity; |
| 13 | use MediaWiki\Page\WikiPage; |
| 14 | use MediaWiki\Title\Title; |
| 15 | use MediaWiki\WikiMap\WikiMap; |
| 16 | use Wikimedia\Assert\Assert; |
| 17 | |
| 18 | /** |
| 19 | * Performs updates and deletes on the Elasticsearch index. Called by |
| 20 | * CirrusSearch.php (our SearchEngine implementation), forceSearchIndex |
| 21 | * (for bulk updates), and CirrusSearch's jobs. |
| 22 | * |
| 23 | * @license GPL-2.0-or-later |
| 24 | */ |
| 25 | class Updater extends ElasticsearchIntermediary implements WeightedTagsUpdater { |
| 26 | /** |
| 27 | * Full title text of pages updated in this process. Used for deduplication |
| 28 | * of updates. |
| 29 | * @var string[] |
| 30 | */ |
| 31 | private $updated = []; |
| 32 | |
| 33 | /** |
| 34 | * @var string|null Name of cluster to write to, or null if none (write to all) |
| 35 | */ |
| 36 | protected $writeToClusterName; |
| 37 | |
/**
 * Create an updater that reads through the given connection and, optionally,
 * restricts its writes to one named cluster.
 *
 * @param Connection $readConnection connection used to pull data out of elasticsearch
 * @param string|null $writeToClusterName name of the single cluster to write to,
 *  or null to write to all writable clusters
 */
public function __construct( Connection $readConnection, $writeToClusterName = null ) {
	// Only the connection varies per instance; the other parent arguments are fixed here.
	parent::__construct( $readConnection, null, 0 );
	$this->writeToClusterName = $writeToClusterName;
}
| 46 | |
/**
 * Factory wiring an Updater to the connection pool of the requested cluster.
 *
 * The invariant rejects calls made through a subclass, where the late static
 * binding class would differ from Updater itself.
 *
 * @param SearchConfig $config
 * @param string|null $cluster cluster to read from and write to,
 *  null to read from the default cluster and write to all
 * @return self
 */
public static function build( SearchConfig $config, $cluster ): self {
	Assert::invariant( self::class === static::class, 'Must be invoked as Updater::build( ... )' );
	return new self( Connection::getPool( $config, $cluster ), $cluster );
}
| 58 | |
/**
 * Update a single page, following redirects to the page that is actually indexed.
 *
 * The page the title resolves to (if any) is sent as a full update, and the
 * documents of all redirects found while resolving it are deleted.
 *
 * @param Title $title
 * @param string|null $updateKind kind of update to perform (used for monitoring)
 * @param int|null $rootEventTime the time of MW event that caused this update (used for monitoring)
 */
public function updateFromTitle( $title, ?string $updateKind, ?int $rootEventTime ): void {
	[ $targetPage, $redirectChain ] = $this->traceRedirects( $title );
	if ( $targetPage ) {
		$this->updatePages( [ $targetPage ], BuildDocument::INDEX_EVERYTHING, $updateKind, $rootEventTime );
	}

	// Nothing to clean up unless at least one redirect was traversed.
	if ( $redirectChain !== [] ) {
		$staleDocIds = [];
		foreach ( $redirectChain as $redirectPage ) {
			$staleDocIds[] = $this->connection->getConfig()->makeId( $redirectPage->getId() );
		}
		// No titles are passed along, only the document ids of the redirects.
		$this->deletePages( [], $staleDocIds );
	}
}
| 85 | |
/**
 * Trace redirects from the title to the destination. Also registers every visited
 * title in the memory of titles updated and detects pages that cannot be indexed.
 *
 * @param Title $title title to trace
 * @return array Two element list [ target, redirects ]:
 *  - target is WikiPage|null wikipage if the $title either isn't a redirect or resolves
 *    to an updatable page that hasn't been updated yet. Null if the page has been
 *    updated, cannot exist, does not exist, has no usable content, or the redirects
 *    enter a loop.
 *  - redirects is an array of WikiPages, one per redirect in the chain. If title isn't
 *    a redirect then this will be an empty array
 */
public function traceRedirects( $title ) {
	$redirects = [];
	$wikiPageFactory = MediaWikiServices::getInstance()->getWikiPageFactory();
	// The logger does not depend on the title being traced, so fetch it once.
	$logger = LoggerFactory::getInstance( 'CirrusSearch' );

	// Loop through redirects until we get to the ultimate target
	while ( true ) {
		$titleText = $title->getFullText();
		// Strict comparison is required here: a loose in_array() compares numeric-looking
		// strings as numbers, so titles such as "1E3" and "1000" would be treated as the
		// same page and the second one would wrongly be skipped.
		if ( in_array( $titleText, $this->updated, true ) ) {
			// Already indexed this article in this process. This is mostly useful
			// to catch self redirects but has a storied history of catching strange
			// behavior.
			return [ null, $redirects ];
		}

		// Don't index special pages, interwiki links, bad namespaces, etc
		if ( !$title->canExist() ) {
			$logger->debug( "Ignoring an update for a page that cannot exist: $titleText" );
			return [ null, $redirects ];
		}

		$page = $wikiPageFactory->newFromTitle( $title );
		if ( !$page->exists() ) {
			$logger->debug( "Ignoring an update for a nonexistent page: $titleText" );
			return [ null, $redirects ];
		}
		$content = $page->getContent();
		if ( is_string( $content ) ) {
			$content = new TextContent( $content );
		}
		// In the event that the content is _still_ not usable, we have to give up.
		if ( !is_object( $content ) ) {
			return [ null, $redirects ];
		}

		// Add the page to the list of updated pages before we start trying to update to catch redirect loops.
		$this->updated[] = $titleText;
		if ( !$content->isRedirect() ) {
			// Reached a real page: this is the one to index.
			return [ $page, $redirects ];
		}

		$redirects[] = $page;
		$target = $content->getRedirectTarget();
		if ( $target->equals( $page->getTitle() ) ) {
			// This doesn't warn about redirect loops longer than one but we'll catch those anyway.
			$logger->info( "Title redirecting to itself. Skip indexing" );
			return [ null, $redirects ];
		}
		// Follow the redirect on the next iteration.
		$title = $target;
	}
}
| 149 | |
/**
 * This updates pages in elasticsearch.
 *
 * $flags includes:
 *   INDEX_EVERYTHING Cirrus will parse the page and count the links and send the document
 *     to Elasticsearch as an index so if it doesn't exist it'll be created.
 *   SKIP_PARSE Cirrus will skip parsing the page when building the document. It makes
 *     sense to do this when you know the page hasn't changed like when it is newly linked
 *     from another page.
 *   SKIP_LINKS Cirrus will skip collecting links information. It makes sense to do this
 *     when you know the link counts aren't yet available like during the first phase of
 *     the two phase index build.
 *   INDEX_ON_SKIP Cirrus will send an update if SKIP_PARSE or SKIP_LINKS rather than an
 *     index. Indexing with any portion of the document skipped is dangerous because it
 *     can put half created pages in the index. This is only a good idea during the first
 *     half of the two phase index build.
 *
 * @param WikiPage[] $pages pages to update
 * @param int $flags Bit field containing instructions about how the document should be built
 *   and sent to Elasticsearch.
 * @param string|null $updateKind kind of update to perform (used for monitoring)
 * @param int|null $rootEventTime the time of MW event that caused this update (used for monitoring)
 * @return int Number of documents updated
 */
public function updatePages( $pages, $flags, ?string $updateKind = null, ?int $rootEventTime = null ): int {
	// Don't update the same page twice. We shouldn't, but meh.
	// Seen ids are stored as array keys so each membership test is a hash lookup
	// instead of a linear scan over everything seen so far.
	$seenPageIds = [];
	$pages = array_filter( $pages, static function ( WikiPage $page ) use ( &$seenPageIds ) {
		$pageId = $page->getId();
		if ( isset( $seenPageIds[$pageId] ) ) {
			return false;
		}
		$seenPageIds[$pageId] = true;
		return true;
	} );

	$titles = $this->pagesToTitles( $pages );
	Job\OtherIndex::queueIfRequired( $this->connection->getConfig(), $titles, $this->writeToClusterName );

	// One (initially empty) bucket of documents per index suffix.
	$allDocuments = array_fill_keys( $this->connection->getAllIndexSuffixes(), [] );
	$services = MediaWikiServices::getInstance();
	$docSizeLimiter = new DocumentSizeLimiter(
		$this->connection->getConfig()->getProfileService()->loadProfile( SearchProfileService::DOCUMENT_SIZE_LIMITER ) );
	$builder = new BuildDocument(
		$this->connection,
		$services->getConnectionProvider()->getReplicaDatabase(),
		$services->getRevisionStore(),
		$services->getBacklinkCacheFactory(),
		$docSizeLimiter,
		$services->getTitleFormatter(),
		$services->getWikiPageFactory(),
		$services->getTitleFactory()
	);
	foreach ( $builder->initialize( $pages, $flags ) as $document ) {
		// This isn't really a property of the connection, so it doesn't matter
		// this is the read cluster and not the write cluster.
		$suffix = $this->connection->getIndexSuffixForNamespace( $document->get( 'namespace' ) );
		$allDocuments[$suffix][] = $document;
	}

	$count = 0;
	foreach ( $allDocuments as $indexSuffix => $documents ) {
		$this->pushElasticaWriteJobs(
			UpdateGroup::PAGE,
			$documents,
			static function ( array $chunk, string $cluster ) use ( $indexSuffix, $updateKind, $rootEventTime ) {
				return Job\ElasticaWrite::build(
					$cluster,
					UpdateGroup::PAGE,
					'sendData',
					[ $indexSuffix, $chunk ],
					[],
					$updateKind,
					$rootEventTime
				);
			} );
		$count += count( $documents );
	}

	return $count;
}
| 230 | |
/**
 * Sends a weighted tags update for a single existing page to every write cluster.
 *
 * Note that $trigger is accepted but not used by this implementation.
 *
 * @inheritDoc
 */
public function updateWeightedTags(
	ProperPageIdentity $page,
	string $tagPrefix,
	?array $tagWeights = null,
	?string $trigger = null
): void {
	Assert::precondition( $page->exists(), "page must exist" );
	$docId = $this->connection->getConfig()->makeId( $page->getId() );
	$indexSuffix = $this->connection->getIndexSuffixForNamespace( $page->getNamespace() );

	// The job arguments are identical for every cluster, so assemble them once.
	$jobArguments = [
		$indexSuffix,
		$tagPrefix,
		[ $docId => $tagWeights ]
	];
	// The chunk handed to the factory is not read: it only ever holds this one doc id.
	$jobFactory = static function ( array $docIds, string $cluster ) use ( $jobArguments ) {
		return Job\ElasticaWrite::build(
			$cluster,
			UpdateGroup::WEIGHTED_TAGS,
			'sendWeightedTagsUpdate',
			$jobArguments
		);
	};

	$this->pushElasticaWriteJobs( UpdateGroup::WEIGHTED_TAGS, [ $docId ], $jobFactory );
}
| 264 | |
/**
 * Issues one updateWeightedTags() call per prefix, each carrying the
 * MULTILIST_DELETE_GROUPING key mapped to null as its tag weights.
 *
 * @inheritDoc
 */
public function resetWeightedTags( ProperPageIdentity $page, array $tagPrefixes, ?string $trigger = null ): void {
	// Every prefix receives the same payload, so build it a single time.
	$resetPayload = [ CirrusIndexField::MULTILIST_DELETE_GROUPING => null ];
	foreach ( $tagPrefixes as $prefix ) {
		$this->updateWeightedTags( $page, $prefix, $resetPayload, $trigger );
	}
}
| 278 | |
/**
 * Delete pages from the elasticsearch index. $titles and $docIds must point to the
 * same pages and should point to them in the same order.
 *
 * @param Title[] $titles List of titles to delete. If empty then other index
 *  maintenance is skipped.
 * @param int[]|string[] $docIds List of elasticsearch document ids to delete
 * @param string|null $indexSuffix index from which to delete. null means all.
 * @param array $writeJobParams Parameters passed on to ElasticaWriteJob
 */
public function deletePages( $titles, $docIds, $indexSuffix = null, array $writeJobParams = [] ): void {
	Job\OtherIndex::queueIfRequired( $this->connection->getConfig(), $titles, $this->writeToClusterName );

	$deleteJobFactory = static function ( array $chunk, string $cluster ) use ( $indexSuffix, $writeJobParams ) {
		return Job\ElasticaWrite::build(
			$cluster,
			UpdateGroup::PAGE,
			'sendDeletes',
			[ $chunk, $indexSuffix ],
			$writeJobParams
		);
	};

	// Deletes are fairly cheap to send, they can be batched in larger
	// chunks (50 per job). Unlikely a batch this large ever comes through.
	$this->pushElasticaWriteJobs( UpdateGroup::PAGE, $docIds, $deleteJobFactory, 50 );
}
| 310 | |
/**
 * Add documents to archive index.
 *
 * Does nothing when the CirrusSearchIndexDeletes setting is falsy.
 *
 * @param array $archived
 * @return bool always true
 */
public function archivePages( $archived ) {
	$archivingEnabled = $this->connection->getConfig()->getElement( 'CirrusSearchIndexDeletes' );
	if ( $archivingEnabled ) {
		$archiveDocs = $this->buildArchiveDocuments( $archived );
		$archiveJobFactory = static function ( array $chunk, string $cluster ) {
			return Job\ElasticaWrite::build(
				$cluster,
				UpdateGroup::ARCHIVE,
				'sendData',
				[ Connection::ARCHIVE_INDEX_SUFFIX, $chunk ],
				[ 'private_data' => true ],
			);
		};
		$this->pushElasticaWriteJobs( UpdateGroup::ARCHIVE, $archiveDocs, $archiveJobFactory );
	}

	// Reached both when archiving ran and when it is disabled by config.
	return true;
}
| 337 | |
/**
 * Build Elastica documents for archived pages.
 *
 * Entries lacking a 'title' key are left out of the result.
 *
 * @param array $archived entries providing a 'page' id and a 'title' (Title)
 * @return \Elastica\Document[]
 */
private function buildArchiveDocuments( array $archived ) {
	$documents = [];
	foreach ( $archived as $entry ) {
		// Entries without a title come from pages that still exist, but are redirects.
		// This is non-obvious and we probably need a better way...
		if ( isset( $entry['title'] ) ) {
			/** @var Title $archivedTitle */
			$archivedTitle = $entry['title'];
			$fields = [
				'namespace' => $archivedTitle->getNamespace(),
				'title' => $archivedTitle->getText(),
				'wiki' => WikiMap::getCurrentWikiId(),
			];
			$document = new \Elastica\Document( $entry['page'], $fields );
			$document->setDocAsUpsert( true );
			$document->setRetryOnConflict(
				$this->connection->getConfig()->getElement( 'CirrusSearchUpdateConflictRetryCount' )
			);

			$documents[] = $document;
		}
	}

	return $documents;
}
| 366 | |
/**
 * Update the search index for newly linked or unlinked articles.
 *
 * Each title is resolved through at most one redirect; the resulting pages are
 * sent as SKIP_PARSE updates.
 *
 * @param Title[] $titles titles to update
 */
public function updateLinkedArticles( $titles ): void {
	$pages = [];
	$wikiPageFactory = MediaWikiServices::getInstance()->getWikiPageFactory();
	foreach ( $titles as $title ) {
		// Special pages don't get updated, we only index
		// actual existing pages.
		if ( !$title || !$title->canExist() ) {
			continue;
		}

		$page = $wikiPageFactory->newFromTitle( $title );
		if ( $page === null || !$page->exists() ) {
			// Skip link to nonexistent page.
			continue;
		}
		// Resolve one level of redirects because only one level of redirects is scored.
		if ( $page->isRedirect() ) {
			$target = $page->getRedirectTarget();
			if ( $target === null ) {
				// Redirect to itself or broken redirect? ignore.
				continue;
			}
			if ( !$target->exists() ) {
				// Skip redirects to nonexistent pages
				continue;
			}
			$page = $wikiPageFactory->newFromTitle( $target );
		}
		if ( $page->isRedirect() ) {
			// This is a redirect to a redirect which doesn't count in the search score any way.
			continue;
		}
		// Strict comparison is required: a loose in_array() compares numeric-looking
		// strings as numbers, so "1E3" and "1000" would be treated as the same title.
		// NOTE(review): this checks the originally linked title, not the resolved
		// redirect target - confirm that is intended.
		if ( in_array( $title->getFullText(), $this->updated, true ) ) {
			// We've already updated this page in this process so there is no need to update it again.
			continue;
		}
		// Note that we don't add this page to the list of updated pages because this update isn't
		// a full update (just link counts).
		$pages[] = $page;
	}
	$this->updatePages( $pages, BuildDocument::SKIP_PARSE );
}
| 413 | |
/**
 * Map pages to their titles, returning a freshly indexed list.
 *
 * @param WikiPage[] $pages
 * @return Title[] titles in the same order as $pages, keyed 0..n-1
 */
private function pagesToTitles( $pages ) {
	$titlesByKey = array_map( static function ( $page ) {
		return $page->getTitle();
	}, $pages );
	// array_map keeps the input keys; reindex so the result is a plain list.
	return array_values( $titlesByKey );
}
| 427 | |
/**
 * Split $items into batches and run one write job per batch and target cluster.
 *
 * @param string $updateGroup UpdateGroup::* constant
 * @param mixed[] $items
 * @param callable $factory invoked as $factory( array $chunk, string $cluster ); must
 *  return a job object exposing run()
 * @param int $batchSize maximum number of items handed to a single job
 */
protected function pushElasticaWriteJobs( string $updateGroup, array $items, $factory, int $batchSize = 10 ): void {
	// Elasticsearch has a queue capacity of 50 so if $items contains 50 pages it could bump up
	// against the max. So we chunk it and do them sequentially.
	$clusters = $this->elasticaWriteClusters( $updateGroup );

	foreach ( array_chunk( $items, $batchSize ) as $chunked ) {
		foreach ( $clusters as $cluster ) {
			$job = $factory( $chunked, $cluster );
			// If the job fails for any reason it will enqueue itself to retry later.
			$job->run();
		}
	}
}
| 448 | |
/**
 * Determine which clusters receive writes for the given update group.
 *
 * @param string $updateGroup UpdateGroup::* constant
 * @return array the explicitly configured write cluster alone, or else the
 *  writable clusters reported by the cluster assignment
 */
private function elasticaWriteClusters( string $updateGroup ): array {
	if ( $this->writeToClusterName === null ) {
		$assignment = $this->connection->getConfig()->getClusterAssignment();
		return $assignment->getWritableClusters( $updateGroup );
	}

	// An explicit cluster name takes precedence over the cluster assignment.
	return [ $this->writeToClusterName ];
}
| 459 | |
/**
 * Create a request log entry bound to this updater's connection client.
 *
 * @param string $description
 * @param string $queryType
 * @param string[] $extra
 * @return SearchRequestLog
 */
protected function newLog( $description, $queryType, array $extra = [] ) {
	$client = $this->connection->getClient();
	return new SearchRequestLog( $client, $description, $queryType, $extra );
}
| 474 | } |