Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 217
0.00% covered (danger)
0.00%
0 / 15
CRAP
0.00% covered (danger)
0.00%
0 / 1
Updater
0.00% covered (danger)
0.00%
0 / 217
0.00% covered (danger)
0.00%
0 / 15
2450
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 build
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 updateFromTitle
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
20
 traceRedirects
0.00% covered (danger)
0.00%
0 / 29
0.00% covered (danger)
0.00%
0 / 1
90
 updatePages
0.00% covered (danger)
0.00%
0 / 44
0.00% covered (danger)
0.00%
0 / 1
20
 updateWeightedTags
0.00% covered (danger)
0.00%
0 / 23
0.00% covered (danger)
0.00%
0 / 1
2
 resetWeightedTags
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 deletePages
0.00% covered (danger)
0.00%
0 / 16
0.00% covered (danger)
0.00%
0 / 1
2
 archivePages
0.00% covered (danger)
0.00%
0 / 16
0.00% covered (danger)
0.00%
0 / 1
6
 buildArchiveDocuments
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
12
 updateLinkedArticles
0.00% covered (danger)
0.00%
0 / 21
0.00% covered (danger)
0.00%
0 / 1
132
 pagesToTitles
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 pushElasticaWriteJobs
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
30
 elasticaWriteClusters
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
6
 newLog
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3namespace CirrusSearch;
4
5use CirrusSearch\BuildDocument\BuildDocument;
6use CirrusSearch\BuildDocument\DocumentSizeLimiter;
7use CirrusSearch\Profile\SearchProfileService;
8use CirrusSearch\Search\CirrusIndexField;
9use MediaWiki\Content\TextContent;
10use MediaWiki\Logger\LoggerFactory;
11use MediaWiki\MediaWikiServices;
12use MediaWiki\Page\ProperPageIdentity;
13use MediaWiki\Title\Title;
14use MediaWiki\WikiMap\WikiMap;
15use Wikimedia\Assert\Assert;
16use WikiPage;
17
18/**
19 * Performs updates and deletes on the Elasticsearch index.  Called by
20 * CirrusSearch.php (our SearchEngine implementation), forceSearchIndex
21 * (for bulk updates), and CirrusSearch's jobs.
22 *
23 * This program is free software; you can redistribute it and/or modify
24 * it under the terms of the GNU General Public License as published by
25 * the Free Software Foundation; either version 2 of the License, or
26 * (at your option) any later version.
27 *
28 * This program is distributed in the hope that it will be useful,
29 * but WITHOUT ANY WARRANTY; without even the implied warranty of
30 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 * GNU General Public License for more details.
32 *
33 * You should have received a copy of the GNU General Public License along
34 * with this program; if not, write to the Free Software Foundation, Inc.,
35 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
36 * http://www.gnu.org/copyleft/gpl.html
37 */
38class Updater extends ElasticsearchIntermediary implements WeightedTagsUpdater {
39    /**
40     * Full title text of pages updated in this process.  Used for deduplication
41     * of updates.
42     * @var string[]
43     */
44    private $updated = [];
45
46    /**
47     * @var string|null Name of cluster to write to, or null if none (write to all)
48     */
49    protected $writeToClusterName;
50
51    /**
52     * @param Connection $readConnection connection used to pull data out of elasticsearch
53     * @param string|null $writeToClusterName
54     */
55    public function __construct( Connection $readConnection, $writeToClusterName = null ) {
56        parent::__construct( $readConnection, null, 0 );
57        $this->writeToClusterName = $writeToClusterName;
58    }
59
60    /**
61     * @param SearchConfig $config
62     * @param string|null $cluster cluster to read from and write to,
63     * null to read from the default cluster and write to all
64     * @return Updater
65     */
66    public static function build( SearchConfig $config, $cluster ): Updater {
67        Assert::invariant( self::class === static::class, 'Must be invoked as Updater::build( ... )' );
68        $connection = Connection::getPool( $config, $cluster );
69        return new self( $connection, $cluster );
70    }
71
72    /**
73     * Update a single page.
74     * @param Title $title
75     * @param string|null $updateKind kind of update to perform (used for monitoring)
76     * @param int|null $rootEventTime the time of MW event that caused this update (used for monitoring)
77     */
78    public function updateFromTitle( $title, ?string $updateKind, ?int $rootEventTime ): void {
79        [ $page, $redirects ] = $this->traceRedirects( $title );
80        if ( $page ) {
81            $this->updatePages(
82                [ $page ],
83                BuildDocument::INDEX_EVERYTHING,
84                $updateKind,
85                $rootEventTime
86            );
87        }
88
89        if ( $redirects === [] ) {
90            return;
91        }
92        $redirectDocIds = [];
93        foreach ( $redirects as $redirect ) {
94            $redirectDocIds[] = $this->connection->getConfig()->makeId( $redirect->getId() );
95        }
96        $this->deletePages( [], $redirectDocIds );
97    }
98
99    /**
100     * Trace redirects from the title to the destination.  Also registers the title in the
101     * memory of titles updated and detects special pages.
102     *
103     * @param Title $title title to trace
104     * @return array with keys: target, redirects
105     *    - target is WikiPage|null wikipage if the $title either isn't a redirect or resolves
106     *    to an updatable page that hasn't been updated yet.  Null if the page has been
107     *    updated, is a special page, or the redirects enter a loop.
108     *    - redirects is an array of WikiPages, one per redirect in the chain.  If title isn't
109     *    a redirect then this will be an empty array
110     */
111    public function traceRedirects( $title ) {
112        // Loop through redirects until we get to the ultimate target
113        $redirects = [];
114        $wikiPageFactory = MediaWikiServices::getInstance()->getWikiPageFactory();
115        while ( true ) {
116            $titleText = $title->getFullText();
117            if ( in_array( $titleText, $this->updated ) ) {
118                // Already indexed this article in this process.  This is mostly useful
119                // to catch self redirects but has a storied history of catching strange
120                // behavior.
121                return [ null, $redirects ];
122            }
123
124            // Don't index special pages, interwiki links, bad namespaces, etc
125            $logger = LoggerFactory::getInstance( 'CirrusSearch' );
126            if ( !$title->canExist() ) {
127                $logger->debug( "Ignoring an update for a page that cannot exist: $titleText" );
128                return [ null, $redirects ];
129            }
130
131            $page = $wikiPageFactory->newFromTitle( $title );
132            if ( !$page->exists() ) {
133                $logger->debug( "Ignoring an update for a nonexistent page: $titleText" );
134                return [ null, $redirects ];
135            }
136            $content = $page->getContent();
137            if ( is_string( $content ) ) {
138                $content = new TextContent( $content );
139            }
140            // If the event that the content is _still_ not usable, we have to give up.
141            if ( !is_object( $content ) ) {
142                return [ null, $redirects ];
143            }
144
145            // Add the page to the list of updated pages before we start trying to update to catch redirect loops.
146            $this->updated[] = $titleText;
147            if ( $content->isRedirect() ) {
148                $redirects[] = $page;
149                $target = $content->getRedirectTarget();
150                if ( $target->equals( $page->getTitle() ) ) {
151                    // This doesn't warn about redirect loops longer than one but we'll catch those anyway.
152                    $logger->info( "Title redirecting to itself. Skip indexing" );
153                    return [ null, $redirects ];
154                }
155                $title = $target;
156                continue;
157            } else {
158                return [ $page, $redirects ];
159            }
160        }
161    }
162
163    /**
164     * This updates pages in elasticsearch.
165     *
166     * $flags includes:
167     *   INDEX_EVERYTHING Cirrus will parse the page and count the links and send the document
168     *     to Elasticsearch as an index so if it doesn't exist it'll be created.
169     *   SKIP_PARSE Cirrus will skip parsing the page when building the document.  It makes
170     *     sense to do this when you know the page hasn't changed like when it is newly linked
171     *     from another page.
172     *   SKIP_LINKS Cirrus will skip collecting links information.  It makes sense to do this
173     *     when you know the link counts aren't yet available like during the first phase of
174     *     the two phase index build.
175     *   INDEX_ON_SKIP Cirrus will send an update if SKIP_PARSE or SKIP_LINKS rather than an
176     *     index.  Indexing with any portion of the document skipped is dangerous because it
177     *     can put half created pages in the index.  This is only a good idea during the first
178     *     half of the two phase index build.
179     *
180     * @param WikiPage[] $pages pages to update
181     * @param int $flags Bit field containing instructions about how the document should be built
182     *   and sent to Elasticsearch.
183     * @param string|null $updateKind kind of update to perform (used for monitoring)
184     * @param int|null $rootEventTime the time of MW event that caused this update (used for monitoring)
185     * @return int Number of documents updated
186     */
187    public function updatePages( $pages, $flags, ?string $updateKind = null, ?int $rootEventTime = null ): int {
188        // Don't update the same page twice. We shouldn't, but meh
189        $pageIds = [];
190        $pages = array_filter( $pages, static function ( WikiPage $page ) use ( &$pageIds ) {
191            if ( !in_array( $page->getId(), $pageIds ) ) {
192                $pageIds[] = $page->getId();
193                return true;
194            }
195            return false;
196        } );
197
198        $titles = $this->pagesToTitles( $pages );
199        Job\OtherIndex::queueIfRequired( $this->connection->getConfig(), $titles, $this->writeToClusterName );
200
201        $allDocuments = array_fill_keys( $this->connection->getAllIndexSuffixes(), [] );
202        $services = MediaWikiServices::getInstance();
203        $docSizeLimiter = new DocumentSizeLimiter(
204            $this->connection->getConfig()->getProfileService()->loadProfile( SearchProfileService::DOCUMENT_SIZE_LIMITER ) );
205        $builder = new BuildDocument(
206            $this->connection,
207            $services->getConnectionProvider()->getReplicaDatabase(),
208            $services->getRevisionStore(),
209            $services->getBacklinkCacheFactory(),
210            $docSizeLimiter,
211            $services->getTitleFormatter(),
212            $services->getWikiPageFactory(),
213            $services->getTitleFactory()
214        );
215        foreach ( $builder->initialize( $pages, $flags ) as $document ) {
216            // This isn't really a property of the connection, so it doesn't matter
217            // this is the read cluster and not the write cluster.
218            $suffix = $this->connection->getIndexSuffixForNamespace( $document->get( 'namespace' ) );
219            $allDocuments[$suffix][] = $document;
220        }
221
222        $count = 0;
223        foreach ( $allDocuments as $indexSuffix => $documents ) {
224            $this->pushElasticaWriteJobs(
225                UpdateGroup::PAGE,
226                $documents,
227                static function ( array $chunk, ClusterSettings $cluster ) use ( $indexSuffix, $updateKind, $rootEventTime ) {
228                    return Job\ElasticaWrite::build(
229                        $cluster,
230                        UpdateGroup::PAGE,
231                        'sendData',
232                        [ $indexSuffix, $chunk ],
233                        [],
234                        $updateKind,
235                        $rootEventTime
236                    );
237                } );
238            $count += count( $documents );
239        }
240
241        return $count;
242    }
243
244    /**
245     * @inheritDoc
246     */
247    public function updateWeightedTags(
248        ProperPageIdentity $page,
249        string $tagPrefix,
250        ?array $tagWeights = null,
251        ?string $trigger = null
252    ): void {
253        Assert::precondition( $page->exists(), "page must exist" );
254        $docId = $this->connection->getConfig()->makeId( $page->getId() );
255        $indexSuffix = $this->connection->getIndexSuffixForNamespace( $page->getNamespace() );
256        $this->pushElasticaWriteJobs(
257            UpdateGroup::WEIGHTED_TAGS,
258            [ $docId ],
259            static function ( array $docIds, ClusterSettings $cluster ) use (
260                $docId,
261                $indexSuffix,
262                $tagPrefix,
263                $tagWeights
264            ) {
265                return Job\ElasticaWrite::build(
266                    $cluster,
267                    UpdateGroup::WEIGHTED_TAGS,
268                    'sendWeightedTagsUpdate',
269                    [
270                        $indexSuffix,
271                        $tagPrefix,
272                        [ $docId => $tagWeights ]
273                    ],
274                );
275            } );
276    }
277
278    /**
279     * @inheritDoc
280     */
281    public function resetWeightedTags( ProperPageIdentity $page, array $tagPrefixes, ?string $trigger = null ): void {
282        foreach ( $tagPrefixes as $tagPrefix ) {
283            $this->updateWeightedTags(
284                $page,
285                $tagPrefix,
286                [ CirrusIndexField::MULTILIST_DELETE_GROUPING => null ],
287                $trigger
288            );
289        }
290    }
291
292    /**
293     * Delete pages from the elasticsearch index.  $titles and $docIds must point to the
294     * same pages and should point to them in the same order.
295     *
296     * @param Title[] $titles List of titles to delete.  If empty then skipped other index
297     *      maintenance is skipped.
298     * @param int[]|string[] $docIds List of elasticsearch document ids to delete
299     * @param string|null $indexSuffix index from which to delete.  null means all.
300     * @param array $writeJobParams Parameters passed on to ElasticaWriteJob
301     */
302    public function deletePages( $titles, $docIds, $indexSuffix = null, array $writeJobParams = [] ): void {
303        Job\OtherIndex::queueIfRequired( $this->connection->getConfig(), $titles, $this->writeToClusterName );
304
305        // Deletes are fairly cheap to send, they can be batched in larger
306        // chunks. Unlikely a batch this large ever comes through.
307        $batchSize = 50;
308        $this->pushElasticaWriteJobs(
309            UpdateGroup::PAGE,
310            $docIds,
311            static function ( array $chunk, ClusterSettings $cluster ) use ( $indexSuffix, $writeJobParams ) {
312                return Job\ElasticaWrite::build(
313                    $cluster,
314                    UpdateGroup::PAGE,
315                    'sendDeletes',
316                    [ $chunk, $indexSuffix ],
317                    $writeJobParams
318                );
319            },
320            $batchSize
321        );
322    }
323
324    /**
325     * Add documents to archive index.
326     * @param array $archived
327     * @return bool
328     */
329    public function archivePages( $archived ) {
330        if ( !$this->connection->getConfig()->getElement( 'CirrusSearchIndexDeletes' ) ) {
331            // Disabled by config - don't do anything
332            return true;
333        }
334        $docs = $this->buildArchiveDocuments( $archived );
335        $this->pushElasticaWriteJobs(
336            UpdateGroup::ARCHIVE,
337            $docs,
338            static function ( array $chunk, ClusterSettings $cluster ) {
339                return Job\ElasticaWrite::build(
340                    $cluster,
341                    UpdateGroup::ARCHIVE,
342                    'sendData',
343                    [ Connection::ARCHIVE_INDEX_SUFFIX, $chunk ],
344                    [ 'private_data' => true ],
345                );
346            } );
347
348        return true;
349    }
350
351    /**
352     * Build Elastica documents for archived pages.
353     * @param array $archived
354     * @return \Elastica\Document[]
355     */
356    private function buildArchiveDocuments( array $archived ) {
357        $docs = [];
358        foreach ( $archived as $delete ) {
359            if ( !isset( $delete['title'] ) ) {
360                // These come from pages that still exist, but are redirects.
361                // This is non-obvious and we probably need a better way...
362                continue;
363            }
364            /** @var Title $title */
365            $title = $delete['title'];
366            $doc = new \Elastica\Document( $delete['page'], [
367                'namespace' => $title->getNamespace(),
368                'title' => $title->getText(),
369                'wiki' => WikiMap::getCurrentWikiId(),
370            ] );
371            $doc->setDocAsUpsert( true );
372            $doc->setRetryOnConflict( $this->connection->getConfig()->getElement( 'CirrusSearchUpdateConflictRetryCount' ) );
373
374            $docs[] = $doc;
375        }
376
377        return $docs;
378    }
379
380    /**
381     * Update the search index for newly linked or unlinked articles.
382     * @param Title[] $titles titles to update
383     */
384    public function updateLinkedArticles( $titles ): void {
385        $pages = [];
386        $wikiPageFactory = MediaWikiServices::getInstance()->getWikiPageFactory();
387        foreach ( $titles as $title ) {
388            // Special pages don't get updated, we only index
389            // actual existing pages.
390            if ( !$title || !$title->canExist() ) {
391                continue;
392            }
393
394            $page = $wikiPageFactory->newFromTitle( $title );
395            if ( $page === null || !$page->exists() ) {
396                // Skip link to nonexistent page.
397                continue;
398            }
399            // Resolve one level of redirects because only one level of redirects is scored.
400            if ( $page->isRedirect() ) {
401                $target = $page->getRedirectTarget();
402                if ( $target === null ) {
403                    // Redirect to itself or broken redirect? ignore.
404                    continue;
405                }
406                if ( !$target->exists() ) {
407                    // Skip redirects to nonexistent pages
408                    continue;
409                }
410                $page = $wikiPageFactory->newFromTitle( $target );
411            }
412            if ( $page->isRedirect() ) {
413                // This is a redirect to a redirect which doesn't count in the search score any way.
414                continue;
415            }
416            if ( in_array( $title->getFullText(), $this->updated ) ) {
417                // We've already updated this page in this process so there is no need to update it again.
418                continue;
419            }
420            // Note that we don't add this page to the list of updated pages because this update isn't
421            // a full update (just link counts).
422            $pages[] = $page;
423        }
424        $this->updatePages( $pages, BuildDocument::SKIP_PARSE );
425    }
426
427    /**
428     * Convert an array of pages to an array of their titles.
429     *
430     * @param WikiPage[] $pages
431     * @return Title[]
432     */
433    private function pagesToTitles( $pages ) {
434        $titles = [];
435        foreach ( $pages as $page ) {
436            $titles[] = $page->getTitle();
437        }
438        return $titles;
439    }
440
441    /**
442     * @param string $updateGroup UpdateGroup::* constant
443     * @param mixed[] $items
444     * @param callable $factory
445     * @param int $batchSize
446     */
447    protected function pushElasticaWriteJobs( string $updateGroup, array $items, $factory, int $batchSize = 10 ): void {
448        // Elasticsearch has a queue capacity of 50 so if $documents contains 50 pages it could bump up
449        // against the max.  So we chunk it and do them sequentially.
450        $jobs = [];
451        $config = $this->connection->getConfig();
452        $clusters = $this->elasticaWriteClusters( $updateGroup );
453
454        foreach ( array_chunk( $items, $batchSize ) as $chunked ) {
455            // Queueing one job per cluster ensures isolation. If one clusters falls
456            // behind on writes the others shouldn't notice.
457            // Unfortunately queueing a job per cluster results in quite a few
458            // jobs to run. If the job queue can't keep up some clusters can
459            // be run in-process. Any failures will queue themselves for later
460            // execution.
461            foreach ( $clusters as $cluster ) {
462                $clusterSettings = new ClusterSettings( $config, $cluster );
463                $job = $factory( $chunked, $clusterSettings );
464                if ( $clusterSettings->isIsolated() ) {
465                    $jobs[] = $job;
466                } else {
467                    $job->run();
468                }
469            }
470        }
471
472        if ( $jobs ) {
473            MediaWikiServices::getInstance()->getJobQueueGroup()->push( $jobs );
474        }
475    }
476
477    private function elasticaWriteClusters( string $updateGroup ): array {
478        if ( $this->writeToClusterName !== null ) {
479            return [ $this->writeToClusterName ];
480        } else {
481            return $this->connection
482                ->getConfig()
483                ->getClusterAssignment()
484                ->getWritableClusters( $updateGroup );
485        }
486    }
487
488    /**
489     * @param string $description
490     * @param string $queryType
491     * @param string[] $extra
492     * @return SearchRequestLog
493     */
494    protected function newLog( $description, $queryType, array $extra = [] ) {
495        return new SearchRequestLog(
496            $this->connection->getClient(),
497            $description,
498            $queryType,
499            $extra
500        );
501    }
502}