Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 219
0.00% covered (danger)
0.00%
0 / 15
CRAP
0.00% covered (danger)
0.00%
0 / 1
Updater
0.00% covered (danger)
0.00%
0 / 219
0.00% covered (danger)
0.00%
0 / 15
2450
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 build
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 updateFromTitle
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
20
 traceRedirects
0.00% covered (danger)
0.00%
0 / 29
0.00% covered (danger)
0.00%
0 / 1
90
 updatePages
0.00% covered (danger)
0.00%
0 / 43
0.00% covered (danger)
0.00%
0 / 1
20
 updateWeightedTags
0.00% covered (danger)
0.00%
0 / 17
0.00% covered (danger)
0.00%
0 / 1
6
 resetWeightedTags
0.00% covered (danger)
0.00%
0 / 16
0.00% covered (danger)
0.00%
0 / 1
2
 deletePages
0.00% covered (danger)
0.00%
0 / 16
0.00% covered (danger)
0.00%
0 / 1
2
 archivePages
0.00% covered (danger)
0.00%
0 / 16
0.00% covered (danger)
0.00%
0 / 1
6
 buildArchiveDocuments
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
12
 updateLinkedArticles
0.00% covered (danger)
0.00%
0 / 21
0.00% covered (danger)
0.00%
0 / 1
132
 pagesToTitles
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 pushElasticaWriteJobs
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
30
 elasticaWriteClusters
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
6
 newLog
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3namespace CirrusSearch;
4
5use CirrusSearch\BuildDocument\BuildDocument;
6use CirrusSearch\BuildDocument\DocumentSizeLimiter;
7use CirrusSearch\Profile\SearchProfileService;
8use MediaWiki\Logger\LoggerFactory;
9use MediaWiki\MediaWikiServices;
10use MediaWiki\Page\ProperPageIdentity;
11use MediaWiki\Title\Title;
12use MediaWiki\WikiMap\WikiMap;
13use TextContent;
14use Wikimedia\Assert\Assert;
15use WikiPage;
16
17/**
18 * Performs updates and deletes on the Elasticsearch index.  Called by
19 * CirrusSearch.php (our SearchEngine implementation), forceSearchIndex
20 * (for bulk updates), and CirrusSearch's jobs.
21 *
22 * This program is free software; you can redistribute it and/or modify
23 * it under the terms of the GNU General Public License as published by
24 * the Free Software Foundation; either version 2 of the License, or
25 * (at your option) any later version.
26 *
27 * This program is distributed in the hope that it will be useful,
28 * but WITHOUT ANY WARRANTY; without even the implied warranty of
29 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
30 * GNU General Public License for more details.
31 *
32 * You should have received a copy of the GNU General Public License along
33 * with this program; if not, write to the Free Software Foundation, Inc.,
34 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
35 * http://www.gnu.org/copyleft/gpl.html
36 */
37class Updater extends ElasticsearchIntermediary {
38    /**
39     * Full title text of pages updated in this process.  Used for deduplication
40     * of updates.
41     * @var string[]
42     */
43    private $updated = [];
44
45    /**
46     * @var string|null Name of cluster to write to, or null if none (write to all)
47     */
48    protected $writeToClusterName;
49
50    /**
51     * @param Connection $readConnection connection used to pull data out of elasticsearch
52     * @param string|null $writeToClusterName
53     */
54    public function __construct( Connection $readConnection, $writeToClusterName = null ) {
55        parent::__construct( $readConnection, null, 0 );
56        $this->writeToClusterName = $writeToClusterName;
57    }
58
59    /**
60     * @param SearchConfig $config
61     * @param string|null $cluster cluster to read from and write to,
62     * null to read from the default cluster and write to all
63     * @return Updater
64     */
65    public static function build( SearchConfig $config, $cluster ): Updater {
66        Assert::invariant( self::class === static::class, 'Must be invoked as Updater::build( ... )' );
67        $connection = Connection::getPool( $config, $cluster );
68        return new self( $connection, $cluster );
69    }
70
71    /**
72     * Update a single page.
73     * @param Title $title
74     * @param string|null $updateKind kind of update to perform (used for monitoring)
75     * @param int|null $rootEventTime the time of MW event that caused this update (used for monitoring)
76     */
77    public function updateFromTitle( $title, ?string $updateKind, ?int $rootEventTime ): void {
78        [ $page, $redirects ] = $this->traceRedirects( $title );
79        if ( $page ) {
80            $this->updatePages(
81                [ $page ],
82                BuildDocument::INDEX_EVERYTHING,
83                $updateKind,
84                $rootEventTime
85            );
86        }
87
88        if ( $redirects === [] ) {
89            return;
90        }
91        $redirectDocIds = [];
92        foreach ( $redirects as $redirect ) {
93            $redirectDocIds[] = $this->connection->getConfig()->makeId( $redirect->getId() );
94        }
95        $this->deletePages( [], $redirectDocIds );
96    }
97
98    /**
99     * Trace redirects from the title to the destination.  Also registers the title in the
100     * memory of titles updated and detects special pages.
101     *
102     * @param Title $title title to trace
103     * @return array with keys: target, redirects
104     *    - target is WikiPage|null wikipage if the $title either isn't a redirect or resolves
105     *    to an updatable page that hasn't been updated yet.  Null if the page has been
106     *    updated, is a special page, or the redirects enter a loop.
107     *    - redirects is an array of WikiPages, one per redirect in the chain.  If title isn't
108     *    a redirect then this will be an empty array
109     */
110    public function traceRedirects( $title ) {
111        // Loop through redirects until we get to the ultimate target
112        $redirects = [];
113        $wikiPageFactory = MediaWikiServices::getInstance()->getWikiPageFactory();
114        while ( true ) {
115            $titleText = $title->getFullText();
116            if ( in_array( $titleText, $this->updated ) ) {
117                // Already indexed this article in this process.  This is mostly useful
118                // to catch self redirects but has a storied history of catching strange
119                // behavior.
120                return [ null, $redirects ];
121            }
122
123            // Don't index special pages, interwiki links, bad namespaces, etc
124            $logger = LoggerFactory::getInstance( 'CirrusSearch' );
125            if ( !$title->canExist() ) {
126                $logger->debug( "Ignoring an update for a page that cannot exist: $titleText" );
127                return [ null, $redirects ];
128            }
129
130            $page = $wikiPageFactory->newFromTitle( $title );
131            if ( !$page->exists() ) {
132                $logger->debug( "Ignoring an update for a nonexistent page: $titleText" );
133                return [ null, $redirects ];
134            }
135            $content = $page->getContent();
136            if ( is_string( $content ) ) {
137                $content = new TextContent( $content );
138            }
139            // If the event that the content is _still_ not usable, we have to give up.
140            if ( !is_object( $content ) ) {
141                return [ null, $redirects ];
142            }
143
144            // Add the page to the list of updated pages before we start trying to update to catch redirect loops.
145            $this->updated[] = $titleText;
146            if ( $content->isRedirect() ) {
147                $redirects[] = $page;
148                $target = $content->getRedirectTarget();
149                if ( $target->equals( $page->getTitle() ) ) {
150                    // This doesn't warn about redirect loops longer than one but we'll catch those anyway.
151                    $logger->info( "Title redirecting to itself. Skip indexing" );
152                    return [ null, $redirects ];
153                }
154                $title = $target;
155                continue;
156            } else {
157                return [ $page, $redirects ];
158            }
159        }
160    }
161
162    /**
163     * This updates pages in elasticsearch.
164     *
165     * $flags includes:
166     *   INDEX_EVERYTHING Cirrus will parse the page and count the links and send the document
167     *     to Elasticsearch as an index so if it doesn't exist it'll be created.
168     *   SKIP_PARSE Cirrus will skip parsing the page when building the document.  It makes
169     *     sense to do this when you know the page hasn't changed like when it is newly linked
170     *     from another page.
171     *   SKIP_LINKS Cirrus will skip collecting links information.  It makes sense to do this
172     *     when you know the link counts aren't yet available like during the first phase of
173     *     the two phase index build.
174     *   INDEX_ON_SKIP Cirrus will send an update if SKIP_PARSE or SKIP_LINKS rather than an
175     *     index.  Indexing with any portion of the document skipped is dangerous because it
176     *     can put half created pages in the index.  This is only a good idea during the first
177     *     half of the two phase index build.
178     *
179     * @param WikiPage[] $pages pages to update
180     * @param int $flags Bit field containing instructions about how the document should be built
181     *   and sent to Elasticsearch.
182     * @param string|null $updateKind kind of update to perform (used for monitoring)
183     * @param int|null $rootEventTime the time of MW event that caused this update (used for monitoring)
184     * @return int Number of documents updated
185     */
186    public function updatePages( $pages, $flags, string $updateKind = null, int $rootEventTime = null ): int {
187        // Don't update the same page twice. We shouldn't, but meh
188        $pageIds = [];
189        $pages = array_filter( $pages, static function ( WikiPage $page ) use ( &$pageIds ) {
190            if ( !in_array( $page->getId(), $pageIds ) ) {
191                $pageIds[] = $page->getId();
192                return true;
193            }
194            return false;
195        } );
196
197        $titles = $this->pagesToTitles( $pages );
198        Job\OtherIndex::queueIfRequired( $this->connection->getConfig(), $titles, $this->writeToClusterName );
199
200        $allDocuments = array_fill_keys( $this->connection->getAllIndexSuffixes(), [] );
201        $services = MediaWikiServices::getInstance();
202        $docSizeLimiter = new DocumentSizeLimiter(
203            $this->connection->getConfig()->getProfileService()->loadProfile( SearchProfileService::DOCUMENT_SIZE_LIMITER ) );
204        $builder = new BuildDocument(
205            $this->connection,
206            $services->getDBLoadBalancer()->getConnection( DB_REPLICA ),
207            $services->getRevisionStore(),
208            $services->getBacklinkCacheFactory(),
209            $docSizeLimiter,
210            $services->getTitleFormatter(),
211            $services->getWikiPageFactory()
212        );
213        foreach ( $builder->initialize( $pages, $flags ) as $document ) {
214            // This isn't really a property of the connection, so it doesn't matter
215            // this is the read cluster and not the write cluster.
216            $suffix = $this->connection->getIndexSuffixForNamespace( $document->get( 'namespace' ) );
217            $allDocuments[$suffix][] = $document;
218        }
219
220        $count = 0;
221        foreach ( $allDocuments as $indexSuffix => $documents ) {
222            $this->pushElasticaWriteJobs(
223                UpdateGroup::PAGE,
224                $documents,
225                static function ( array $chunk, ClusterSettings $cluster ) use ( $indexSuffix, $updateKind, $rootEventTime ) {
226                    return Job\ElasticaWrite::build(
227                        $cluster,
228                        UpdateGroup::PAGE,
229                        'sendData',
230                        [ $indexSuffix, $chunk ],
231                        [],
232                        $updateKind,
233                        $rootEventTime
234                    );
235                } );
236            $count += count( $documents );
237        }
238
239        return $count;
240    }
241
242    /**
243     * @param ProperPageIdentity $page
244     * @param string $tagField
245     * @param string $tagPrefix
246     * @param string|string[]|null $tagNames
247     * @param int|int[]|null $tagWeights
248     */
249    public function updateWeightedTags(
250        ProperPageIdentity $page, string $tagField, string $tagPrefix, $tagNames = null, $tagWeights = null
251    ) {
252        Assert::precondition( $page->exists(), "page must exist" );
253        $docId = $this->connection->getConfig()->makeId( $page->getId() );
254        $indexSuffix = $this->connection->getIndexSuffixForNamespace( $page->getNamespace() );
255        $this->pushElasticaWriteJobs(
256            UpdateGroup::PAGE,
257            [ $docId ],
258            static function ( array $docIds, ClusterSettings $cluster ) use (
259                $docId, $indexSuffix, $tagField, $tagPrefix, $tagNames, $tagWeights
260            ) {
261                $tagWeights = ( $tagWeights === null ) ? null : [ $docId => $tagWeights ];
262                return Job\ElasticaWrite::build(
263                    $cluster,
264                    UpdateGroup::PAGE,
265                    'sendUpdateWeightedTags',
266                    [ $indexSuffix, $docIds, $tagField, $tagPrefix, $tagNames, $tagWeights ],
267                );
268            } );
269    }
270
271    /**
272     * @param ProperPageIdentity $page
273     * @param string $tagField
274     * @param string $tagPrefix
275     */
276    public function resetWeightedTags( ProperPageIdentity $page, string $tagField, string $tagPrefix ) {
277        Assert::precondition( $page->exists(), "page must exist" );
278        $docId = $this->connection->getConfig()->makeId( $page->getId() );
279        $indexSuffix = $this->connection->getIndexSuffixForNamespace( $page->getNamespace() );
280        $this->pushElasticaWriteJobs(
281            UpdateGroup::PAGE,
282            [ $docId ],
283            static function (
284                array $docIds, ClusterSettings $cluster
285            ) use ( $indexSuffix, $tagField, $tagPrefix ) {
286                return Job\ElasticaWrite::build(
287                    $cluster,
288                    UpdateGroup::PAGE,
289                    'sendResetWeightedTags',
290                    [ $indexSuffix, $docIds, $tagField, $tagPrefix ],
291                );
292            } );
293    }
294
295    /**
296     * Delete pages from the elasticsearch index.  $titles and $docIds must point to the
297     * same pages and should point to them in the same order.
298     *
299     * @param Title[] $titles List of titles to delete.  If empty then skipped other index
300     *      maintenance is skipped.
301     * @param int[]|string[] $docIds List of elasticsearch document ids to delete
302     * @param string|null $indexSuffix index from which to delete.  null means all.
303     * @param array $writeJobParams Parameters passed on to ElasticaWriteJob
304     */
305    public function deletePages( $titles, $docIds, $indexSuffix = null, array $writeJobParams = [] ): void {
306        Job\OtherIndex::queueIfRequired( $this->connection->getConfig(), $titles, $this->writeToClusterName );
307
308        // Deletes are fairly cheap to send, they can be batched in larger
309        // chunks. Unlikely a batch this large ever comes through.
310        $batchSize = 50;
311        $this->pushElasticaWriteJobs(
312            UpdateGroup::PAGE,
313            $docIds,
314            static function ( array $chunk, ClusterSettings $cluster ) use ( $indexSuffix, $writeJobParams ) {
315                return Job\ElasticaWrite::build(
316                    $cluster,
317                    UpdateGroup::PAGE,
318                    'sendDeletes',
319                    [ $chunk, $indexSuffix ],
320                    $writeJobParams
321                );
322            },
323            $batchSize
324        );
325    }
326
327    /**
328     * Add documents to archive index.
329     * @param array $archived
330     * @return bool
331     */
332    public function archivePages( $archived ) {
333        if ( !$this->connection->getConfig()->getElement( 'CirrusSearchIndexDeletes' ) ) {
334            // Disabled by config - don't do anything
335            return true;
336        }
337        $docs = $this->buildArchiveDocuments( $archived );
338        $this->pushElasticaWriteJobs(
339            UpdateGroup::ARCHIVE,
340            $docs,
341            static function ( array $chunk, ClusterSettings $cluster ) {
342                return Job\ElasticaWrite::build(
343                    $cluster,
344                    UpdateGroup::ARCHIVE,
345                    'sendData',
346                    [ Connection::ARCHIVE_INDEX_SUFFIX, $chunk ],
347                    [ 'private_data' => true ],
348                );
349            } );
350
351        return true;
352    }
353
354    /**
355     * Build Elastica documents for archived pages.
356     * @param array $archived
357     * @return \Elastica\Document[]
358     */
359    private function buildArchiveDocuments( array $archived ) {
360        $docs = [];
361        foreach ( $archived as $delete ) {
362            if ( !isset( $delete['title'] ) ) {
363                // These come from pages that still exist, but are redirects.
364                // This is non-obvious and we probably need a better way...
365                continue;
366            }
367            /** @var Title $title */
368            $title = $delete['title'];
369            $doc = new \Elastica\Document( $delete['page'], [
370                'namespace' => $title->getNamespace(),
371                'title' => $title->getText(),
372                'wiki' => WikiMap::getCurrentWikiId(),
373            ] );
374            $doc->setDocAsUpsert( true );
375            $doc->setRetryOnConflict( $this->connection->getConfig()->getElement( 'CirrusSearchUpdateConflictRetryCount' ) );
376
377            $docs[] = $doc;
378        }
379
380        return $docs;
381    }
382
383    /**
384     * Update the search index for newly linked or unlinked articles.
385     * @param Title[] $titles titles to update
386     */
387    public function updateLinkedArticles( $titles ): void {
388        $pages = [];
389        $wikiPageFactory = MediaWikiServices::getInstance()->getWikiPageFactory();
390        foreach ( $titles as $title ) {
391            // Special pages don't get updated, we only index
392            // actual existing pages.
393            if ( !$title || !$title->canExist() ) {
394                continue;
395            }
396
397            $page = $wikiPageFactory->newFromTitle( $title );
398            if ( $page === null || !$page->exists() ) {
399                // Skip link to nonexistent page.
400                continue;
401            }
402            // Resolve one level of redirects because only one level of redirects is scored.
403            if ( $page->isRedirect() ) {
404                $target = $page->getRedirectTarget();
405                if ( $target === null ) {
406                    // Redirect to itself or broken redirect? ignore.
407                    continue;
408                }
409                if ( !$target->exists() ) {
410                    // Skip redirects to nonexistent pages
411                    continue;
412                }
413                $page = $wikiPageFactory->newFromTitle( $target );
414            }
415            if ( $page->isRedirect() ) {
416                // This is a redirect to a redirect which doesn't count in the search score any way.
417                continue;
418            }
419            if ( in_array( $title->getFullText(), $this->updated ) ) {
420                // We've already updated this page in this process so there is no need to update it again.
421                continue;
422            }
423            // Note that we don't add this page to the list of updated pages because this update isn't
424            // a full update (just link counts).
425            $pages[] = $page;
426        }
427        $this->updatePages( $pages, BuildDocument::SKIP_PARSE );
428    }
429
430    /**
431     * Convert an array of pages to an array of their titles.
432     *
433     * @param WikiPage[] $pages
434     * @return Title[]
435     */
436    private function pagesToTitles( $pages ) {
437        $titles = [];
438        foreach ( $pages as $page ) {
439            $titles[] = $page->getTitle();
440        }
441        return $titles;
442    }
443
444    /**
445     * @param string $updateGroup UpdateGroup::* constant
446     * @param mixed[] $items
447     * @param callable $factory
448     * @param int $batchSize
449     */
450    protected function pushElasticaWriteJobs( string $updateGroup, array $items, $factory, int $batchSize = 10 ): void {
451        // Elasticsearch has a queue capacity of 50 so if $documents contains 50 pages it could bump up
452        // against the max.  So we chunk it and do them sequentially.
453        $jobs = [];
454        $config = $this->connection->getConfig();
455        $clusters = $this->elasticaWriteClusters( $updateGroup );
456
457        foreach ( array_chunk( $items, $batchSize ) as $chunked ) {
458            // Queueing one job per cluster ensures isolation. If one clusters falls
459            // behind on writes the others shouldn't notice.
460            // Unfortunately queueing a job per cluster results in quite a few
461            // jobs to run. If the job queue can't keep up some clusters can
462            // be run in-process. Any failures will queue themselves for later
463            // execution.
464            foreach ( $clusters as $cluster ) {
465                $clusterSettings = new ClusterSettings( $config, $cluster );
466                $job = $factory( $chunked, $clusterSettings );
467                if ( $clusterSettings->isIsolated() ) {
468                    $jobs[] = $job;
469                } else {
470                    $job->run();
471                }
472            }
473        }
474
475        if ( $jobs ) {
476            MediaWikiServices::getInstance()->getJobQueueGroup()->push( $jobs );
477        }
478    }
479
480    private function elasticaWriteClusters( string $updateGroup ): array {
481        if ( $this->writeToClusterName !== null ) {
482            return [ $this->writeToClusterName ];
483        } else {
484            return $this->connection
485                ->getConfig()
486                ->getClusterAssignment()
487                ->getWritableClusters( $updateGroup );
488        }
489    }
490
491    /**
492     * @param string $description
493     * @param string $queryType
494     * @param string[] $extra
495     * @return SearchRequestLog
496     */
497    protected function newLog( $description, $queryType, array $extra = [] ) {
498        return new SearchRequestLog(
499            $this->connection->getClient(),
500            $description,
501            $queryType,
502            $extra
503        );
504    }
505}