Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 211
0.00% covered (danger)
0.00%
0 / 15
CRAP
0.00% covered (danger)
0.00%
0 / 1
Updater
0.00% covered (danger)
0.00%
0 / 211
0.00% covered (danger)
0.00%
0 / 15
2256
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 build
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 updateFromTitle
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
20
 traceRedirects
0.00% covered (danger)
0.00%
0 / 29
0.00% covered (danger)
0.00%
0 / 1
90
 updatePages
0.00% covered (danger)
0.00%
0 / 44
0.00% covered (danger)
0.00%
0 / 1
20
 updateWeightedTags
0.00% covered (danger)
0.00%
0 / 23
0.00% covered (danger)
0.00%
0 / 1
2
 resetWeightedTags
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 deletePages
0.00% covered (danger)
0.00%
0 / 16
0.00% covered (danger)
0.00%
0 / 1
2
 archivePages
0.00% covered (danger)
0.00%
0 / 16
0.00% covered (danger)
0.00%
0 / 1
6
 buildArchiveDocuments
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
12
 updateLinkedArticles
0.00% covered (danger)
0.00%
0 / 21
0.00% covered (danger)
0.00%
0 / 1
132
 pagesToTitles
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 pushElasticaWriteJobs
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
12
 elasticaWriteClusters
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
6
 newLog
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3namespace CirrusSearch;
4
5use CirrusSearch\BuildDocument\BuildDocument;
6use CirrusSearch\BuildDocument\DocumentSizeLimiter;
7use CirrusSearch\Profile\SearchProfileService;
8use CirrusSearch\Search\CirrusIndexField;
9use MediaWiki\Content\TextContent;
10use MediaWiki\Logger\LoggerFactory;
11use MediaWiki\MediaWikiServices;
12use MediaWiki\Page\ProperPageIdentity;
13use MediaWiki\Page\WikiPage;
14use MediaWiki\Title\Title;
15use MediaWiki\WikiMap\WikiMap;
16use Wikimedia\Assert\Assert;
17
18/**
19 * Performs updates and deletes on the Elasticsearch index.  Called by
20 * CirrusSearch.php (our SearchEngine implementation), forceSearchIndex
21 * (for bulk updates), and CirrusSearch's jobs.
22 *
23 * @license GPL-2.0-or-later
24 */
25class Updater extends ElasticsearchIntermediary implements WeightedTagsUpdater {
26    /**
27     * Full title text of pages updated in this process.  Used for deduplication
28     * of updates.
29     * @var string[]
30     */
31    private $updated = [];
32
33    /**
34     * @var string|null Name of cluster to write to, or null if none (write to all)
35     */
36    protected $writeToClusterName;
37
38    /**
39     * @param Connection $readConnection connection used to pull data out of elasticsearch
40     * @param string|null $writeToClusterName
41     */
42    public function __construct( Connection $readConnection, $writeToClusterName = null ) {
43        parent::__construct( $readConnection, null, 0 );
44        $this->writeToClusterName = $writeToClusterName;
45    }
46
47    /**
48     * @param SearchConfig $config
49     * @param string|null $cluster cluster to read from and write to,
50     * null to read from the default cluster and write to all
51     * @return self
52     */
53    public static function build( SearchConfig $config, $cluster ): self {
54        Assert::invariant( self::class === static::class, 'Must be invoked as Updater::build( ... )' );
55        $connection = Connection::getPool( $config, $cluster );
56        return new self( $connection, $cluster );
57    }
58
59    /**
60     * Update a single page.
61     * @param Title $title
62     * @param string|null $updateKind kind of update to perform (used for monitoring)
63     * @param int|null $rootEventTime the time of MW event that caused this update (used for monitoring)
64     */
65    public function updateFromTitle( $title, ?string $updateKind, ?int $rootEventTime ): void {
66        [ $page, $redirects ] = $this->traceRedirects( $title );
67        if ( $page ) {
68            $this->updatePages(
69                [ $page ],
70                BuildDocument::INDEX_EVERYTHING,
71                $updateKind,
72                $rootEventTime
73            );
74        }
75
76        if ( $redirects === [] ) {
77            return;
78        }
79        $redirectDocIds = [];
80        foreach ( $redirects as $redirect ) {
81            $redirectDocIds[] = $this->connection->getConfig()->makeId( $redirect->getId() );
82        }
83        $this->deletePages( [], $redirectDocIds );
84    }
85
86    /**
87     * Trace redirects from the title to the destination.  Also registers the title in the
88     * memory of titles updated and detects special pages.
89     *
90     * @param Title $title title to trace
91     * @return array with keys: target, redirects
92     *    - target is WikiPage|null wikipage if the $title either isn't a redirect or resolves
93     *    to an updatable page that hasn't been updated yet.  Null if the page has been
94     *    updated, is a special page, or the redirects enter a loop.
95     *    - redirects is an array of WikiPages, one per redirect in the chain.  If title isn't
96     *    a redirect then this will be an empty array
97     */
98    public function traceRedirects( $title ) {
99        // Loop through redirects until we get to the ultimate target
100        $redirects = [];
101        $wikiPageFactory = MediaWikiServices::getInstance()->getWikiPageFactory();
102        while ( true ) {
103            $titleText = $title->getFullText();
104            if ( in_array( $titleText, $this->updated ) ) {
105                // Already indexed this article in this process.  This is mostly useful
106                // to catch self redirects but has a storied history of catching strange
107                // behavior.
108                return [ null, $redirects ];
109            }
110
111            // Don't index special pages, interwiki links, bad namespaces, etc
112            $logger = LoggerFactory::getInstance( 'CirrusSearch' );
113            if ( !$title->canExist() ) {
114                $logger->debug( "Ignoring an update for a page that cannot exist: $titleText" );
115                return [ null, $redirects ];
116            }
117
118            $page = $wikiPageFactory->newFromTitle( $title );
119            if ( !$page->exists() ) {
120                $logger->debug( "Ignoring an update for a nonexistent page: $titleText" );
121                return [ null, $redirects ];
122            }
123            $content = $page->getContent();
124            if ( is_string( $content ) ) {
125                $content = new TextContent( $content );
126            }
127            // If the event that the content is _still_ not usable, we have to give up.
128            if ( !is_object( $content ) ) {
129                return [ null, $redirects ];
130            }
131
132            // Add the page to the list of updated pages before we start trying to update to catch redirect loops.
133            $this->updated[] = $titleText;
134            if ( $content->isRedirect() ) {
135                $redirects[] = $page;
136                $target = $content->getRedirectTarget();
137                if ( $target->equals( $page->getTitle() ) ) {
138                    // This doesn't warn about redirect loops longer than one but we'll catch those anyway.
139                    $logger->info( "Title redirecting to itself. Skip indexing" );
140                    return [ null, $redirects ];
141                }
142                $title = $target;
143                continue;
144            } else {
145                return [ $page, $redirects ];
146            }
147        }
148    }
149
150    /**
151     * This updates pages in elasticsearch.
152     *
153     * $flags includes:
154     *   INDEX_EVERYTHING Cirrus will parse the page and count the links and send the document
155     *     to Elasticsearch as an index so if it doesn't exist it'll be created.
156     *   SKIP_PARSE Cirrus will skip parsing the page when building the document.  It makes
157     *     sense to do this when you know the page hasn't changed like when it is newly linked
158     *     from another page.
159     *   SKIP_LINKS Cirrus will skip collecting links information.  It makes sense to do this
160     *     when you know the link counts aren't yet available like during the first phase of
161     *     the two phase index build.
162     *   INDEX_ON_SKIP Cirrus will send an update if SKIP_PARSE or SKIP_LINKS rather than an
163     *     index.  Indexing with any portion of the document skipped is dangerous because it
164     *     can put half created pages in the index.  This is only a good idea during the first
165     *     half of the two phase index build.
166     *
167     * @param WikiPage[] $pages pages to update
168     * @param int $flags Bit field containing instructions about how the document should be built
169     *   and sent to Elasticsearch.
170     * @param string|null $updateKind kind of update to perform (used for monitoring)
171     * @param int|null $rootEventTime the time of MW event that caused this update (used for monitoring)
172     * @return int Number of documents updated
173     */
174    public function updatePages( $pages, $flags, ?string $updateKind = null, ?int $rootEventTime = null ): int {
175        // Don't update the same page twice. We shouldn't, but meh
176        $pageIds = [];
177        $pages = array_filter( $pages, static function ( WikiPage $page ) use ( &$pageIds ) {
178            if ( !in_array( $page->getId(), $pageIds ) ) {
179                $pageIds[] = $page->getId();
180                return true;
181            }
182            return false;
183        } );
184
185        $titles = $this->pagesToTitles( $pages );
186        Job\OtherIndex::queueIfRequired( $this->connection->getConfig(), $titles, $this->writeToClusterName );
187
188        $allDocuments = array_fill_keys( $this->connection->getAllIndexSuffixes(), [] );
189        $services = MediaWikiServices::getInstance();
190        $docSizeLimiter = new DocumentSizeLimiter(
191            $this->connection->getConfig()->getProfileService()->loadProfile( SearchProfileService::DOCUMENT_SIZE_LIMITER ) );
192        $builder = new BuildDocument(
193            $this->connection,
194            $services->getConnectionProvider()->getReplicaDatabase(),
195            $services->getRevisionStore(),
196            $services->getBacklinkCacheFactory(),
197            $docSizeLimiter,
198            $services->getTitleFormatter(),
199            $services->getWikiPageFactory(),
200            $services->getTitleFactory()
201        );
202        foreach ( $builder->initialize( $pages, $flags ) as $document ) {
203            // This isn't really a property of the connection, so it doesn't matter
204            // this is the read cluster and not the write cluster.
205            $suffix = $this->connection->getIndexSuffixForNamespace( $document->get( 'namespace' ) );
206            $allDocuments[$suffix][] = $document;
207        }
208
209        $count = 0;
210        foreach ( $allDocuments as $indexSuffix => $documents ) {
211            $this->pushElasticaWriteJobs(
212                UpdateGroup::PAGE,
213                $documents,
214                static function ( array $chunk, string $cluster ) use ( $indexSuffix, $updateKind, $rootEventTime ) {
215                    return Job\ElasticaWrite::build(
216                        $cluster,
217                        UpdateGroup::PAGE,
218                        'sendData',
219                        [ $indexSuffix, $chunk ],
220                        [],
221                        $updateKind,
222                        $rootEventTime
223                    );
224                } );
225            $count += count( $documents );
226        }
227
228        return $count;
229    }
230
231    /**
232     * @inheritDoc
233     */
234    public function updateWeightedTags(
235        ProperPageIdentity $page,
236        string $tagPrefix,
237        ?array $tagWeights = null,
238        ?string $trigger = null
239    ): void {
240        Assert::precondition( $page->exists(), "page must exist" );
241        $docId = $this->connection->getConfig()->makeId( $page->getId() );
242        $indexSuffix = $this->connection->getIndexSuffixForNamespace( $page->getNamespace() );
243        $this->pushElasticaWriteJobs(
244            UpdateGroup::WEIGHTED_TAGS,
245            [ $docId ],
246            static function ( array $docIds, string $cluster ) use (
247                $docId,
248                $indexSuffix,
249                $tagPrefix,
250                $tagWeights
251            ) {
252                return Job\ElasticaWrite::build(
253                    $cluster,
254                    UpdateGroup::WEIGHTED_TAGS,
255                    'sendWeightedTagsUpdate',
256                    [
257                        $indexSuffix,
258                        $tagPrefix,
259                        [ $docId => $tagWeights ]
260                    ],
261                );
262            } );
263    }
264
265    /**
266     * @inheritDoc
267     */
268    public function resetWeightedTags( ProperPageIdentity $page, array $tagPrefixes, ?string $trigger = null ): void {
269        foreach ( $tagPrefixes as $tagPrefix ) {
270            $this->updateWeightedTags(
271                $page,
272                $tagPrefix,
273                [ CirrusIndexField::MULTILIST_DELETE_GROUPING => null ],
274                $trigger
275            );
276        }
277    }
278
279    /**
280     * Delete pages from the elasticsearch index.  $titles and $docIds must point to the
281     * same pages and should point to them in the same order.
282     *
283     * @param Title[] $titles List of titles to delete.  If empty then skipped other index
284     *      maintenance is skipped.
285     * @param int[]|string[] $docIds List of elasticsearch document ids to delete
286     * @param string|null $indexSuffix index from which to delete.  null means all.
287     * @param array $writeJobParams Parameters passed on to ElasticaWriteJob
288     */
289    public function deletePages( $titles, $docIds, $indexSuffix = null, array $writeJobParams = [] ): void {
290        Job\OtherIndex::queueIfRequired( $this->connection->getConfig(), $titles, $this->writeToClusterName );
291
292        // Deletes are fairly cheap to send, they can be batched in larger
293        // chunks. Unlikely a batch this large ever comes through.
294        $batchSize = 50;
295        $this->pushElasticaWriteJobs(
296            UpdateGroup::PAGE,
297            $docIds,
298            static function ( array $chunk, string $cluster ) use ( $indexSuffix, $writeJobParams ) {
299                return Job\ElasticaWrite::build(
300                    $cluster,
301                    UpdateGroup::PAGE,
302                    'sendDeletes',
303                    [ $chunk, $indexSuffix ],
304                    $writeJobParams
305                );
306            },
307            $batchSize
308        );
309    }
310
311    /**
312     * Add documents to archive index.
313     * @param array $archived
314     * @return bool
315     */
316    public function archivePages( $archived ) {
317        if ( !$this->connection->getConfig()->getElement( 'CirrusSearchIndexDeletes' ) ) {
318            // Disabled by config - don't do anything
319            return true;
320        }
321        $docs = $this->buildArchiveDocuments( $archived );
322        $this->pushElasticaWriteJobs(
323            UpdateGroup::ARCHIVE,
324            $docs,
325            static function ( array $chunk, string $cluster ) {
326                return Job\ElasticaWrite::build(
327                    $cluster,
328                    UpdateGroup::ARCHIVE,
329                    'sendData',
330                    [ Connection::ARCHIVE_INDEX_SUFFIX, $chunk ],
331                    [ 'private_data' => true ],
332                );
333            } );
334
335        return true;
336    }
337
338    /**
339     * Build Elastica documents for archived pages.
340     * @param array $archived
341     * @return \Elastica\Document[]
342     */
343    private function buildArchiveDocuments( array $archived ) {
344        $docs = [];
345        foreach ( $archived as $delete ) {
346            if ( !isset( $delete['title'] ) ) {
347                // These come from pages that still exist, but are redirects.
348                // This is non-obvious and we probably need a better way...
349                continue;
350            }
351            /** @var Title $title */
352            $title = $delete['title'];
353            $doc = new \Elastica\Document( $delete['page'], [
354                'namespace' => $title->getNamespace(),
355                'title' => $title->getText(),
356                'wiki' => WikiMap::getCurrentWikiId(),
357            ] );
358            $doc->setDocAsUpsert( true );
359            $doc->setRetryOnConflict( $this->connection->getConfig()->getElement( 'CirrusSearchUpdateConflictRetryCount' ) );
360
361            $docs[] = $doc;
362        }
363
364        return $docs;
365    }
366
367    /**
368     * Update the search index for newly linked or unlinked articles.
369     * @param Title[] $titles titles to update
370     */
371    public function updateLinkedArticles( $titles ): void {
372        $pages = [];
373        $wikiPageFactory = MediaWikiServices::getInstance()->getWikiPageFactory();
374        foreach ( $titles as $title ) {
375            // Special pages don't get updated, we only index
376            // actual existing pages.
377            if ( !$title || !$title->canExist() ) {
378                continue;
379            }
380
381            $page = $wikiPageFactory->newFromTitle( $title );
382            if ( $page === null || !$page->exists() ) {
383                // Skip link to nonexistent page.
384                continue;
385            }
386            // Resolve one level of redirects because only one level of redirects is scored.
387            if ( $page->isRedirect() ) {
388                $target = $page->getRedirectTarget();
389                if ( $target === null ) {
390                    // Redirect to itself or broken redirect? ignore.
391                    continue;
392                }
393                if ( !$target->exists() ) {
394                    // Skip redirects to nonexistent pages
395                    continue;
396                }
397                $page = $wikiPageFactory->newFromTitle( $target );
398            }
399            if ( $page->isRedirect() ) {
400                // This is a redirect to a redirect which doesn't count in the search score any way.
401                continue;
402            }
403            if ( in_array( $title->getFullText(), $this->updated ) ) {
404                // We've already updated this page in this process so there is no need to update it again.
405                continue;
406            }
407            // Note that we don't add this page to the list of updated pages because this update isn't
408            // a full update (just link counts).
409            $pages[] = $page;
410        }
411        $this->updatePages( $pages, BuildDocument::SKIP_PARSE );
412    }
413
414    /**
415     * Convert an array of pages to an array of their titles.
416     *
417     * @param WikiPage[] $pages
418     * @return Title[]
419     */
420    private function pagesToTitles( $pages ) {
421        $titles = [];
422        foreach ( $pages as $page ) {
423            $titles[] = $page->getTitle();
424        }
425        return $titles;
426    }
427
428    /**
429     * @param string $updateGroup UpdateGroup::* constant
430     * @param mixed[] $items
431     * @param callable $factory
432     * @param int $batchSize
433     */
434    protected function pushElasticaWriteJobs( string $updateGroup, array $items, $factory, int $batchSize = 10 ): void {
435        // Elasticsearch has a queue capacity of 50 so if $documents contains 50 pages it could bump up
436        // against the max.  So we chunk it and do them sequentially.
437        $config = $this->connection->getConfig();
438        $clusters = $this->elasticaWriteClusters( $updateGroup );
439
440        foreach ( array_chunk( $items, $batchSize ) as $chunked ) {
441            foreach ( $clusters as $cluster ) {
442                $job = $factory( $chunked, $cluster );
443                // If the job fails for any reason it will enqueue itself to retry later.
444                $job->run();
445            }
446        }
447    }
448
449    private function elasticaWriteClusters( string $updateGroup ): array {
450        if ( $this->writeToClusterName !== null ) {
451            return [ $this->writeToClusterName ];
452        } else {
453            return $this->connection
454                ->getConfig()
455                ->getClusterAssignment()
456                ->getWritableClusters( $updateGroup );
457        }
458    }
459
460    /**
461     * @param string $description
462     * @param string $queryType
463     * @param string[] $extra
464     * @return SearchRequestLog
465     */
466    protected function newLog( $description, $queryType, array $extra = [] ) {
467        return new SearchRequestLog(
468            $this->connection->getClient(),
469            $description,
470            $queryType,
471            $extra
472        );
473    }
474}