Code Coverage

|                       | Lines           | Functions and Methods | CRAP | Classes and Traits |
|-----------------------|-----------------|-----------------------|------|--------------------|
| Total                 | 0.00% (0 / 217) | 0.00% (0 / 15)        |      | 0.00% (0 / 1)      |
| Updater               | 0.00% (0 / 217) | 0.00% (0 / 15)        | 2450 | 0.00% (0 / 1)      |
| __construct           | 0.00% (0 / 2)   | 0.00% (0 / 1)         | 2    |                    |
| build                 | 0.00% (0 / 3)   | 0.00% (0 / 1)         | 2    |                    |
| updateFromTitle       | 0.00% (0 / 14)  | 0.00% (0 / 1)         | 20   |                    |
| traceRedirects        | 0.00% (0 / 29)  | 0.00% (0 / 1)         | 90   |                    |
| updatePages           | 0.00% (0 / 44)  | 0.00% (0 / 1)         | 20   |                    |
| updateWeightedTags    | 0.00% (0 / 23)  | 0.00% (0 / 1)         | 2    |                    |
| resetWeightedTags     | 0.00% (0 / 7)   | 0.00% (0 / 1)         | 6    |                    |
| deletePages           | 0.00% (0 / 16)  | 0.00% (0 / 1)         | 2    |                    |
| archivePages          | 0.00% (0 / 16)  | 0.00% (0 / 1)         | 6    |                    |
| buildArchiveDocuments | 0.00% (0 / 14)  | 0.00% (0 / 1)         | 12   |                    |
| updateLinkedArticles  | 0.00% (0 / 21)  | 0.00% (0 / 1)         | 132  |                    |
| pagesToTitles         | 0.00% (0 / 4)   | 0.00% (0 / 1)         | 6    |                    |
| pushElasticaWriteJobs | 0.00% (0 / 12)  | 0.00% (0 / 1)         | 30   |                    |
| elasticaWriteClusters | 0.00% (0 / 6)   | 0.00% (0 / 1)         | 6    |                    |
| newLog                | 0.00% (0 / 6)   | 0.00% (0 / 1)         | 2    |                    |
<?php

namespace CirrusSearch;

use CirrusSearch\BuildDocument\BuildDocument;
use CirrusSearch\BuildDocument\DocumentSizeLimiter;
use CirrusSearch\Profile\SearchProfileService;
use CirrusSearch\Search\CirrusIndexField;
use MediaWiki\Content\TextContent;
use MediaWiki\Logger\LoggerFactory;
use MediaWiki\MediaWikiServices;
use MediaWiki\Page\ProperPageIdentity;
use MediaWiki\Title\Title;
use MediaWiki\WikiMap\WikiMap;
use Wikimedia\Assert\Assert;
use WikiPage;

/**
 * Performs updates and deletes on the Elasticsearch index. Called by
 * CirrusSearch.php (our SearchEngine implementation), forceSearchIndex
 * (for bulk updates), and CirrusSearch's jobs.
 *
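 * A typical invocation looks roughly like this (a sketch; $config and $title
 * are assumed to be a SearchConfig and a Title already at hand):
 *
 *     $updater = Updater::build( $config, null );
 *     $updater->updateFromTitle( $title, null, null );
 *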
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 */
class Updater extends ElasticsearchIntermediary implements WeightedTagsUpdater {
	/**
	 * Full title text of pages updated in this process. Used for deduplication
	 * of updates.
	 * @var string[]
	 */
	private $updated = [];

	/**
	 * @var string|null Name of the cluster to write to, or null to write to all writable clusters
	 */
	protected $writeToClusterName;

	/**
	 * @param Connection $readConnection connection used to pull data out of elasticsearch
	 * @param string|null $writeToClusterName
	 */
	public function __construct( Connection $readConnection, $writeToClusterName = null ) {
		parent::__construct( $readConnection, null, 0 );
		$this->writeToClusterName = $writeToClusterName;
	}

	/**
	 * @param SearchConfig $config
	 * @param string|null $cluster cluster to read from and write to,
	 *        null to read from the default cluster and write to all
	 * @return Updater
	 */
	public static function build( SearchConfig $config, $cluster ): Updater {
		Assert::invariant( self::class === static::class, 'Must be invoked as Updater::build( ... )' );
		$connection = Connection::getPool( $config, $cluster );
		return new self( $connection, $cluster );
	}

	/**
	 * Update a single page.
	 * @param Title $title
	 * @param string|null $updateKind kind of update to perform (used for monitoring)
	 * @param int|null $rootEventTime the time of the MW event that caused this update (used for monitoring)
	 */
	public function updateFromTitle( $title, ?string $updateKind, ?int $rootEventTime ): void {
		[ $page, $redirects ] = $this->traceRedirects( $title );
		if ( $page ) {
			$this->updatePages(
				[ $page ],
				BuildDocument::INDEX_EVERYTHING,
				$updateKind,
				$rootEventTime
			);
		}

		if ( $redirects === [] ) {
			return;
		}
		$redirectDocIds = [];
		foreach ( $redirects as $redirect ) {
			$redirectDocIds[] = $this->connection->getConfig()->makeId( $redirect->getId() );
		}
		$this->deletePages( [], $redirectDocIds );
	}

	/**
	 * Trace redirects from the title to the destination. Also registers the title in the
	 * memory of titles updated and detects special pages.
	 *
	 * @param Title $title title to trace
	 * @return array A two-element list [ target, redirects ]:
	 *  - target is a WikiPage if $title either isn't a redirect or resolves to an
	 *    updatable page that hasn't been updated yet, or null if the page has already
	 *    been updated, is a special page, or the redirects enter a loop.
	 *  - redirects is an array of WikiPages, one per redirect in the chain. If $title
	 *    isn't a redirect then this will be an empty array.
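	 *
	 * Callers typically destructure the two elements, as updateFromTitle() does:
	 *
	 *     [ $page, $redirects ] = $this->traceRedirects( $title );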
	 */
	public function traceRedirects( $title ) {
		// Loop through redirects until we get to the ultimate target
		$redirects = [];
		$wikiPageFactory = MediaWikiServices::getInstance()->getWikiPageFactory();
		while ( true ) {
			$titleText = $title->getFullText();
			if ( in_array( $titleText, $this->updated ) ) {
				// Already indexed this article in this process. This is mostly useful
				// to catch self redirects but has a storied history of catching strange
				// behavior.
				return [ null, $redirects ];
			}

			// Don't index special pages, interwiki links, bad namespaces, etc
			$logger = LoggerFactory::getInstance( 'CirrusSearch' );
			if ( !$title->canExist() ) {
				$logger->debug( "Ignoring an update for a page that cannot exist: $titleText" );
				return [ null, $redirects ];
			}

			$page = $wikiPageFactory->newFromTitle( $title );
			if ( !$page->exists() ) {
				$logger->debug( "Ignoring an update for a nonexistent page: $titleText" );
				return [ null, $redirects ];
			}
			$content = $page->getContent();
			if ( is_string( $content ) ) {
				$content = new TextContent( $content );
			}
			// In the event that the content is _still_ not usable, we have to give up.
			if ( !is_object( $content ) ) {
				return [ null, $redirects ];
			}

			// Add the page to the list of updated pages before we start trying to update to catch redirect loops.
			$this->updated[] = $titleText;
			if ( $content->isRedirect() ) {
				$redirects[] = $page;
				$target = $content->getRedirectTarget();
				if ( $target->equals( $page->getTitle() ) ) {
					// This doesn't warn about redirect loops longer than one but we'll catch those anyway.
					$logger->info( "Title redirecting to itself. Skip indexing" );
					return [ null, $redirects ];
				}
				$title = $target;
				continue;
			} else {
				return [ $page, $redirects ];
			}
		}
	}

	/**
	 * This updates pages in elasticsearch.
	 *
	 * $flags includes:
	 *   INDEX_EVERYTHING Cirrus will parse the page, count the links, and send the
	 *     document to Elasticsearch as an index request, so the document is created
	 *     if it doesn't already exist.
	 *   SKIP_PARSE Cirrus will skip parsing the page when building the document. It
	 *     makes sense to do this when you know the page hasn't changed, like when it
	 *     is newly linked from another page.
	 *   SKIP_LINKS Cirrus will skip collecting links information. It makes sense to
	 *     do this when you know the link counts aren't yet available, like during the
	 *     first phase of the two phase index build.
	 *   INDEX_ON_SKIP Cirrus will send an update rather than an index request when
	 *     SKIP_PARSE or SKIP_LINKS is set. Indexing with any portion of the document
	 *     skipped is dangerous because it can put half-created pages in the index.
	 *     This is only a good idea during the first half of the two phase index build.
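	 *
	 * For example (a sketch inferred from the flag descriptions above, not a
	 * prescribed recipe), a two phase index build might look like:
	 *
	 *     // first phase: index documents without link information
	 *     $updater->updatePages( $pages, BuildDocument::SKIP_LINKS | BuildDocument::INDEX_ON_SKIP );
	 *     // second phase: fill in link counts without re-parsing
	 *     $updater->updatePages( $pages, BuildDocument::SKIP_PARSE );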
	 *
	 * @param WikiPage[] $pages pages to update
	 * @param int $flags Bit field containing instructions about how the document should be built
	 *        and sent to Elasticsearch.
	 * @param string|null $updateKind kind of update to perform (used for monitoring)
	 * @param int|null $rootEventTime the time of the MW event that caused this update (used for monitoring)
	 * @return int Number of documents updated
	 */
	public function updatePages( $pages, $flags, ?string $updateKind = null, ?int $rootEventTime = null ): int {
		// Don't update the same page twice. We shouldn't be handed duplicates, but guard anyway.
		$pageIds = [];
		$pages = array_filter( $pages, static function ( WikiPage $page ) use ( &$pageIds ) {
			if ( !in_array( $page->getId(), $pageIds ) ) {
				$pageIds[] = $page->getId();
				return true;
			}
			return false;
		} );

		$titles = $this->pagesToTitles( $pages );
		Job\OtherIndex::queueIfRequired( $this->connection->getConfig(), $titles, $this->writeToClusterName );

		$allDocuments = array_fill_keys( $this->connection->getAllIndexSuffixes(), [] );
		$services = MediaWikiServices::getInstance();
		$docSizeLimiter = new DocumentSizeLimiter(
			$this->connection->getConfig()->getProfileService()->loadProfile( SearchProfileService::DOCUMENT_SIZE_LIMITER ) );
		$builder = new BuildDocument(
			$this->connection,
			$services->getConnectionProvider()->getReplicaDatabase(),
			$services->getRevisionStore(),
			$services->getBacklinkCacheFactory(),
			$docSizeLimiter,
			$services->getTitleFormatter(),
			$services->getWikiPageFactory(),
			$services->getTitleFactory()
		);
		foreach ( $builder->initialize( $pages, $flags ) as $document ) {
			// The index suffix isn't really a property of the connection, so it doesn't
			// matter that this is the read cluster and not the write cluster.
			$suffix = $this->connection->getIndexSuffixForNamespace( $document->get( 'namespace' ) );
			$allDocuments[$suffix][] = $document;
		}

		$count = 0;
		foreach ( $allDocuments as $indexSuffix => $documents ) {
			$this->pushElasticaWriteJobs(
				UpdateGroup::PAGE,
				$documents,
				static function ( array $chunk, ClusterSettings $cluster ) use ( $indexSuffix, $updateKind, $rootEventTime ) {
					return Job\ElasticaWrite::build(
						$cluster,
						UpdateGroup::PAGE,
						'sendData',
						[ $indexSuffix, $chunk ],
						[],
						$updateKind,
						$rootEventTime
					);
				} );
			$count += count( $documents );
		}

		return $count;
	}

	/**
	 * @inheritDoc
	 */
	public function updateWeightedTags(
		ProperPageIdentity $page,
		string $tagPrefix,
		?array $tagWeights = null,
		?string $trigger = null
	): void {
		Assert::precondition( $page->exists(), "page must exist" );
		$docId = $this->connection->getConfig()->makeId( $page->getId() );
		$indexSuffix = $this->connection->getIndexSuffixForNamespace( $page->getNamespace() );
		$this->pushElasticaWriteJobs(
			UpdateGroup::WEIGHTED_TAGS,
			[ $docId ],
			static function ( array $docIds, ClusterSettings $cluster ) use (
				$docId,
				$indexSuffix,
				$tagPrefix,
				$tagWeights
			) {
				return Job\ElasticaWrite::build(
					$cluster,
					UpdateGroup::WEIGHTED_TAGS,
					'sendWeightedTagsUpdate',
					[
						$indexSuffix,
						$tagPrefix,
						[ $docId => $tagWeights ]
					],
				);
			} );
	}

	/**
	 * @inheritDoc
	 */
	public function resetWeightedTags( ProperPageIdentity $page, array $tagPrefixes, ?string $trigger = null ): void {
		foreach ( $tagPrefixes as $tagPrefix ) {
			$this->updateWeightedTags(
				$page,
				$tagPrefix,
				[ CirrusIndexField::MULTILIST_DELETE_GROUPING => null ],
				$trigger
			);
		}
	}

	/**
	 * Delete pages from the elasticsearch index. $titles and $docIds must point to the
	 * same pages and should point to them in the same order.
	 *
	 * @param Title[] $titles List of titles to delete. If empty, other-index
	 *        maintenance is skipped.
	 * @param int[]|string[] $docIds List of elasticsearch document ids to delete
	 * @param string|null $indexSuffix index from which to delete. null means all.
	 * @param array $writeJobParams Parameters passed on to ElasticaWriteJob
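	 *
	 * For example, updateFromTitle() deletes resolved redirects by document id
	 * alone, passing no titles so that other-index maintenance is skipped:
	 *
	 *     $this->deletePages( [], $redirectDocIds );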
	 */
	public function deletePages( $titles, $docIds, $indexSuffix = null, array $writeJobParams = [] ): void {
		Job\OtherIndex::queueIfRequired( $this->connection->getConfig(), $titles, $this->writeToClusterName );

		// Deletes are fairly cheap to send, so they can be batched in larger
		// chunks. It's unlikely a batch this large ever comes through.
		$batchSize = 50;
		$this->pushElasticaWriteJobs(
			UpdateGroup::PAGE,
			$docIds,
			static function ( array $chunk, ClusterSettings $cluster ) use ( $indexSuffix, $writeJobParams ) {
				return Job\ElasticaWrite::build(
					$cluster,
					UpdateGroup::PAGE,
					'sendDeletes',
					[ $chunk, $indexSuffix ],
					$writeJobParams
				);
			},
			$batchSize
		);
	}

	/**
	 * Add documents to archive index.
	 * @param array $archived
	 * @return bool
	 */
	public function archivePages( $archived ) {
		if ( !$this->connection->getConfig()->getElement( 'CirrusSearchIndexDeletes' ) ) {
			// Disabled by config - don't do anything
			return true;
		}
		$docs = $this->buildArchiveDocuments( $archived );
		$this->pushElasticaWriteJobs(
			UpdateGroup::ARCHIVE,
			$docs,
			static function ( array $chunk, ClusterSettings $cluster ) {
				return Job\ElasticaWrite::build(
					$cluster,
					UpdateGroup::ARCHIVE,
					'sendData',
					[ Connection::ARCHIVE_INDEX_SUFFIX, $chunk ],
					[ 'private_data' => true ],
				);
			} );

		return true;
	}

	/**
	 * Build Elastica documents for archived pages.
	 * @param array $archived
	 * @return \Elastica\Document[]
	 */
	private function buildArchiveDocuments( array $archived ) {
		$docs = [];
		foreach ( $archived as $delete ) {
			if ( !isset( $delete['title'] ) ) {
				// These come from pages that still exist, but are redirects.
				// This is non-obvious and we probably need a better way...
				continue;
			}
			/** @var Title $title */
			$title = $delete['title'];
			$doc = new \Elastica\Document( $delete['page'], [
				'namespace' => $title->getNamespace(),
				'title' => $title->getText(),
				'wiki' => WikiMap::getCurrentWikiId(),
			] );
			$doc->setDocAsUpsert( true );
			$doc->setRetryOnConflict( $this->connection->getConfig()->getElement( 'CirrusSearchUpdateConflictRetryCount' ) );

			$docs[] = $doc;
		}

		return $docs;
	}

	/**
	 * Update the search index for newly linked or unlinked articles.
	 * @param Title[] $titles titles to update
	 */
	public function updateLinkedArticles( $titles ): void {
		$pages = [];
		$wikiPageFactory = MediaWikiServices::getInstance()->getWikiPageFactory();
		foreach ( $titles as $title ) {
			// Special pages don't get updated; we only index
			// actual existing pages.
			if ( !$title || !$title->canExist() ) {
				continue;
			}

			$page = $wikiPageFactory->newFromTitle( $title );
			if ( $page === null || !$page->exists() ) {
				// Skip link to nonexistent page.
				continue;
			}
			// Resolve one level of redirects because only one level of redirects is scored.
			if ( $page->isRedirect() ) {
				$target = $page->getRedirectTarget();
				if ( $target === null ) {
					// Redirect to itself or broken redirect? Ignore.
					continue;
				}
				if ( !$target->exists() ) {
					// Skip redirects to nonexistent pages
					continue;
				}
				$page = $wikiPageFactory->newFromTitle( $target );
			}
			if ( $page->isRedirect() ) {
				// This is a redirect to a redirect, which doesn't count in the search score anyway.
				continue;
			}
			if ( in_array( $title->getFullText(), $this->updated ) ) {
				// We've already updated this page in this process so there is no need to update it again.
				continue;
			}
			// Note that we don't add this page to the list of updated pages because this update isn't
			// a full update (just link counts).
			$pages[] = $page;
		}
		$this->updatePages( $pages, BuildDocument::SKIP_PARSE );
	}

	/**
	 * Convert an array of pages to an array of their titles.
	 *
	 * @param WikiPage[] $pages
	 * @return Title[]
	 */
	private function pagesToTitles( $pages ) {
		$titles = [];
		foreach ( $pages as $page ) {
			$titles[] = $page->getTitle();
		}
		return $titles;
	}

	/**
	 * @param string $updateGroup UpdateGroup::* constant
	 * @param mixed[] $items
	 * @param callable $factory Callable receiving a chunk of $items and a
	 *        ClusterSettings, returning the Job\ElasticaWrite to run or queue
	 * @param int $batchSize
	 */
	protected function pushElasticaWriteJobs( string $updateGroup, array $items, $factory, int $batchSize = 10 ): void {
		// Elasticsearch has a queue capacity of 50, so if $items contains 50 pages it
		// could bump up against the max. So we chunk it and do the chunks sequentially.
		$jobs = [];
		$config = $this->connection->getConfig();
		$clusters = $this->elasticaWriteClusters( $updateGroup );

		foreach ( array_chunk( $items, $batchSize ) as $chunked ) {
			// Queueing one job per cluster ensures isolation. If one cluster falls
			// behind on writes the others shouldn't notice.
			// Unfortunately queueing a job per cluster results in quite a few
			// jobs to run. If the job queue can't keep up some clusters can
			// be run in-process. Any failures will queue themselves for later
			// execution.
			foreach ( $clusters as $cluster ) {
				$clusterSettings = new ClusterSettings( $config, $cluster );
				$job = $factory( $chunked, $clusterSettings );
				if ( $clusterSettings->isIsolated() ) {
					$jobs[] = $job;
				} else {
					$job->run();
				}
			}
		}

		if ( $jobs ) {
			MediaWikiServices::getInstance()->getJobQueueGroup()->push( $jobs );
		}
	}

	private function elasticaWriteClusters( string $updateGroup ): array {
		if ( $this->writeToClusterName !== null ) {
			return [ $this->writeToClusterName ];
		} else {
			return $this->connection
				->getConfig()
				->getClusterAssignment()
				->getWritableClusters( $updateGroup );
		}
	}

	/**
	 * @param string $description
	 * @param string $queryType
	 * @param string[] $extra
	 * @return SearchRequestLog
	 */
	protected function newLog( $description, $queryType, array $extra = [] ) {
		return new SearchRequestLog(
			$this->connection->getClient(),
			$description,
			$queryType,
			$extra
		);
	}
}