Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 219 |
|
0.00% |
0 / 15 |
CRAP | |
0.00% |
0 / 1 |
Updater | |
0.00% |
0 / 219 |
|
0.00% |
0 / 15 |
2450 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
build | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
updateFromTitle | |
0.00% |
0 / 14 |
|
0.00% |
0 / 1 |
20 | |||
traceRedirects | |
0.00% |
0 / 29 |
|
0.00% |
0 / 1 |
90 | |||
updatePages | |
0.00% |
0 / 43 |
|
0.00% |
0 / 1 |
20 | |||
updateWeightedTags | |
0.00% |
0 / 17 |
|
0.00% |
0 / 1 |
6 | |||
resetWeightedTags | |
0.00% |
0 / 16 |
|
0.00% |
0 / 1 |
2 | |||
deletePages | |
0.00% |
0 / 16 |
|
0.00% |
0 / 1 |
2 | |||
archivePages | |
0.00% |
0 / 16 |
|
0.00% |
0 / 1 |
6 | |||
buildArchiveDocuments | |
0.00% |
0 / 14 |
|
0.00% |
0 / 1 |
12 | |||
updateLinkedArticles | |
0.00% |
0 / 21 |
|
0.00% |
0 / 1 |
132 | |||
pagesToTitles | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
pushElasticaWriteJobs | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
30 | |||
elasticaWriteClusters | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
6 | |||
newLog | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | |
3 | namespace CirrusSearch; |
4 | |
5 | use CirrusSearch\BuildDocument\BuildDocument; |
6 | use CirrusSearch\BuildDocument\DocumentSizeLimiter; |
7 | use CirrusSearch\Profile\SearchProfileService; |
8 | use MediaWiki\Logger\LoggerFactory; |
9 | use MediaWiki\MediaWikiServices; |
10 | use MediaWiki\Page\ProperPageIdentity; |
11 | use MediaWiki\Title\Title; |
12 | use MediaWiki\WikiMap\WikiMap; |
13 | use TextContent; |
14 | use Wikimedia\Assert\Assert; |
15 | use WikiPage; |
16 | |
17 | /** |
18 | * Performs updates and deletes on the Elasticsearch index. Called by |
19 | * CirrusSearch.php (our SearchEngine implementation), forceSearchIndex |
20 | * (for bulk updates), and CirrusSearch's jobs. |
21 | * |
22 | * This program is free software; you can redistribute it and/or modify |
23 | * it under the terms of the GNU General Public License as published by |
24 | * the Free Software Foundation; either version 2 of the License, or |
25 | * (at your option) any later version. |
26 | * |
27 | * This program is distributed in the hope that it will be useful, |
28 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
29 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
30 | * GNU General Public License for more details. |
31 | * |
32 | * You should have received a copy of the GNU General Public License along |
33 | * with this program; if not, write to the Free Software Foundation, Inc., |
34 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
35 | * http://www.gnu.org/copyleft/gpl.html |
36 | */ |
class Updater extends ElasticsearchIntermediary {
	/**
	 * Full title text of every page indexed by this process so far. Consulted
	 * to deduplicate updates (and, via traceRedirects, to catch redirect loops).
	 * @var string[]
	 */
	private $updated = [];

	/**
	 * @var string|null Name of the cluster writes are directed to, or null
	 *  to write to all writable clusters.
	 */
	protected $writeToClusterName;

	/**
	 * @param Connection $readConnection connection used to pull data out of elasticsearch
	 * @param string|null $writeToClusterName
	 */
	public function __construct( Connection $readConnection, $writeToClusterName = null ) {
		$this->writeToClusterName = $writeToClusterName;
		parent::__construct( $readConnection, null, 0 );
	}
58 | |
59 | /** |
60 | * @param SearchConfig $config |
61 | * @param string|null $cluster cluster to read from and write to, |
62 | * null to read from the default cluster and write to all |
63 | * @return Updater |
64 | */ |
65 | public static function build( SearchConfig $config, $cluster ): Updater { |
66 | Assert::invariant( self::class === static::class, 'Must be invoked as Updater::build( ... )' ); |
67 | $connection = Connection::getPool( $config, $cluster ); |
68 | return new self( $connection, $cluster ); |
69 | } |
70 | |
71 | /** |
72 | * Update a single page. |
73 | * @param Title $title |
74 | * @param string|null $updateKind kind of update to perform (used for monitoring) |
75 | * @param int|null $rootEventTime the time of MW event that caused this update (used for monitoring) |
76 | */ |
77 | public function updateFromTitle( $title, ?string $updateKind, ?int $rootEventTime ): void { |
78 | [ $page, $redirects ] = $this->traceRedirects( $title ); |
79 | if ( $page ) { |
80 | $this->updatePages( |
81 | [ $page ], |
82 | BuildDocument::INDEX_EVERYTHING, |
83 | $updateKind, |
84 | $rootEventTime |
85 | ); |
86 | } |
87 | |
88 | if ( $redirects === [] ) { |
89 | return; |
90 | } |
91 | $redirectDocIds = []; |
92 | foreach ( $redirects as $redirect ) { |
93 | $redirectDocIds[] = $this->connection->getConfig()->makeId( $redirect->getId() ); |
94 | } |
95 | $this->deletePages( [], $redirectDocIds ); |
96 | } |
97 | |
98 | /** |
99 | * Trace redirects from the title to the destination. Also registers the title in the |
100 | * memory of titles updated and detects special pages. |
101 | * |
102 | * @param Title $title title to trace |
103 | * @return array with keys: target, redirects |
104 | * - target is WikiPage|null wikipage if the $title either isn't a redirect or resolves |
105 | * to an updatable page that hasn't been updated yet. Null if the page has been |
106 | * updated, is a special page, or the redirects enter a loop. |
107 | * - redirects is an array of WikiPages, one per redirect in the chain. If title isn't |
108 | * a redirect then this will be an empty array |
109 | */ |
110 | public function traceRedirects( $title ) { |
111 | // Loop through redirects until we get to the ultimate target |
112 | $redirects = []; |
113 | $wikiPageFactory = MediaWikiServices::getInstance()->getWikiPageFactory(); |
114 | while ( true ) { |
115 | $titleText = $title->getFullText(); |
116 | if ( in_array( $titleText, $this->updated ) ) { |
117 | // Already indexed this article in this process. This is mostly useful |
118 | // to catch self redirects but has a storied history of catching strange |
119 | // behavior. |
120 | return [ null, $redirects ]; |
121 | } |
122 | |
123 | // Don't index special pages, interwiki links, bad namespaces, etc |
124 | $logger = LoggerFactory::getInstance( 'CirrusSearch' ); |
125 | if ( !$title->canExist() ) { |
126 | $logger->debug( "Ignoring an update for a page that cannot exist: $titleText" ); |
127 | return [ null, $redirects ]; |
128 | } |
129 | |
130 | $page = $wikiPageFactory->newFromTitle( $title ); |
131 | if ( !$page->exists() ) { |
132 | $logger->debug( "Ignoring an update for a nonexistent page: $titleText" ); |
133 | return [ null, $redirects ]; |
134 | } |
135 | $content = $page->getContent(); |
136 | if ( is_string( $content ) ) { |
137 | $content = new TextContent( $content ); |
138 | } |
139 | // If the event that the content is _still_ not usable, we have to give up. |
140 | if ( !is_object( $content ) ) { |
141 | return [ null, $redirects ]; |
142 | } |
143 | |
144 | // Add the page to the list of updated pages before we start trying to update to catch redirect loops. |
145 | $this->updated[] = $titleText; |
146 | if ( $content->isRedirect() ) { |
147 | $redirects[] = $page; |
148 | $target = $content->getRedirectTarget(); |
149 | if ( $target->equals( $page->getTitle() ) ) { |
150 | // This doesn't warn about redirect loops longer than one but we'll catch those anyway. |
151 | $logger->info( "Title redirecting to itself. Skip indexing" ); |
152 | return [ null, $redirects ]; |
153 | } |
154 | $title = $target; |
155 | continue; |
156 | } else { |
157 | return [ $page, $redirects ]; |
158 | } |
159 | } |
160 | } |
161 | |
162 | /** |
163 | * This updates pages in elasticsearch. |
164 | * |
165 | * $flags includes: |
166 | * INDEX_EVERYTHING Cirrus will parse the page and count the links and send the document |
167 | * to Elasticsearch as an index so if it doesn't exist it'll be created. |
168 | * SKIP_PARSE Cirrus will skip parsing the page when building the document. It makes |
169 | * sense to do this when you know the page hasn't changed like when it is newly linked |
170 | * from another page. |
171 | * SKIP_LINKS Cirrus will skip collecting links information. It makes sense to do this |
172 | * when you know the link counts aren't yet available like during the first phase of |
173 | * the two phase index build. |
174 | * INDEX_ON_SKIP Cirrus will send an update if SKIP_PARSE or SKIP_LINKS rather than an |
175 | * index. Indexing with any portion of the document skipped is dangerous because it |
176 | * can put half created pages in the index. This is only a good idea during the first |
177 | * half of the two phase index build. |
178 | * |
179 | * @param WikiPage[] $pages pages to update |
180 | * @param int $flags Bit field containing instructions about how the document should be built |
181 | * and sent to Elasticsearch. |
182 | * @param string|null $updateKind kind of update to perform (used for monitoring) |
183 | * @param int|null $rootEventTime the time of MW event that caused this update (used for monitoring) |
184 | * @return int Number of documents updated |
185 | */ |
186 | public function updatePages( $pages, $flags, string $updateKind = null, int $rootEventTime = null ): int { |
187 | // Don't update the same page twice. We shouldn't, but meh |
188 | $pageIds = []; |
189 | $pages = array_filter( $pages, static function ( WikiPage $page ) use ( &$pageIds ) { |
190 | if ( !in_array( $page->getId(), $pageIds ) ) { |
191 | $pageIds[] = $page->getId(); |
192 | return true; |
193 | } |
194 | return false; |
195 | } ); |
196 | |
197 | $titles = $this->pagesToTitles( $pages ); |
198 | Job\OtherIndex::queueIfRequired( $this->connection->getConfig(), $titles, $this->writeToClusterName ); |
199 | |
200 | $allDocuments = array_fill_keys( $this->connection->getAllIndexSuffixes(), [] ); |
201 | $services = MediaWikiServices::getInstance(); |
202 | $docSizeLimiter = new DocumentSizeLimiter( |
203 | $this->connection->getConfig()->getProfileService()->loadProfile( SearchProfileService::DOCUMENT_SIZE_LIMITER ) ); |
204 | $builder = new BuildDocument( |
205 | $this->connection, |
206 | $services->getDBLoadBalancer()->getConnection( DB_REPLICA ), |
207 | $services->getRevisionStore(), |
208 | $services->getBacklinkCacheFactory(), |
209 | $docSizeLimiter, |
210 | $services->getTitleFormatter(), |
211 | $services->getWikiPageFactory() |
212 | ); |
213 | foreach ( $builder->initialize( $pages, $flags ) as $document ) { |
214 | // This isn't really a property of the connection, so it doesn't matter |
215 | // this is the read cluster and not the write cluster. |
216 | $suffix = $this->connection->getIndexSuffixForNamespace( $document->get( 'namespace' ) ); |
217 | $allDocuments[$suffix][] = $document; |
218 | } |
219 | |
220 | $count = 0; |
221 | foreach ( $allDocuments as $indexSuffix => $documents ) { |
222 | $this->pushElasticaWriteJobs( |
223 | UpdateGroup::PAGE, |
224 | $documents, |
225 | static function ( array $chunk, ClusterSettings $cluster ) use ( $indexSuffix, $updateKind, $rootEventTime ) { |
226 | return Job\ElasticaWrite::build( |
227 | $cluster, |
228 | UpdateGroup::PAGE, |
229 | 'sendData', |
230 | [ $indexSuffix, $chunk ], |
231 | [], |
232 | $updateKind, |
233 | $rootEventTime |
234 | ); |
235 | } ); |
236 | $count += count( $documents ); |
237 | } |
238 | |
239 | return $count; |
240 | } |
241 | |
242 | /** |
243 | * @param ProperPageIdentity $page |
244 | * @param string $tagField |
245 | * @param string $tagPrefix |
246 | * @param string|string[]|null $tagNames |
247 | * @param int|int[]|null $tagWeights |
248 | */ |
249 | public function updateWeightedTags( |
250 | ProperPageIdentity $page, string $tagField, string $tagPrefix, $tagNames = null, $tagWeights = null |
251 | ) { |
252 | Assert::precondition( $page->exists(), "page must exist" ); |
253 | $docId = $this->connection->getConfig()->makeId( $page->getId() ); |
254 | $indexSuffix = $this->connection->getIndexSuffixForNamespace( $page->getNamespace() ); |
255 | $this->pushElasticaWriteJobs( |
256 | UpdateGroup::PAGE, |
257 | [ $docId ], |
258 | static function ( array $docIds, ClusterSettings $cluster ) use ( |
259 | $docId, $indexSuffix, $tagField, $tagPrefix, $tagNames, $tagWeights |
260 | ) { |
261 | $tagWeights = ( $tagWeights === null ) ? null : [ $docId => $tagWeights ]; |
262 | return Job\ElasticaWrite::build( |
263 | $cluster, |
264 | UpdateGroup::PAGE, |
265 | 'sendUpdateWeightedTags', |
266 | [ $indexSuffix, $docIds, $tagField, $tagPrefix, $tagNames, $tagWeights ], |
267 | ); |
268 | } ); |
269 | } |
270 | |
271 | /** |
272 | * @param ProperPageIdentity $page |
273 | * @param string $tagField |
274 | * @param string $tagPrefix |
275 | */ |
276 | public function resetWeightedTags( ProperPageIdentity $page, string $tagField, string $tagPrefix ) { |
277 | Assert::precondition( $page->exists(), "page must exist" ); |
278 | $docId = $this->connection->getConfig()->makeId( $page->getId() ); |
279 | $indexSuffix = $this->connection->getIndexSuffixForNamespace( $page->getNamespace() ); |
280 | $this->pushElasticaWriteJobs( |
281 | UpdateGroup::PAGE, |
282 | [ $docId ], |
283 | static function ( |
284 | array $docIds, ClusterSettings $cluster |
285 | ) use ( $indexSuffix, $tagField, $tagPrefix ) { |
286 | return Job\ElasticaWrite::build( |
287 | $cluster, |
288 | UpdateGroup::PAGE, |
289 | 'sendResetWeightedTags', |
290 | [ $indexSuffix, $docIds, $tagField, $tagPrefix ], |
291 | ); |
292 | } ); |
293 | } |
294 | |
295 | /** |
296 | * Delete pages from the elasticsearch index. $titles and $docIds must point to the |
297 | * same pages and should point to them in the same order. |
298 | * |
299 | * @param Title[] $titles List of titles to delete. If empty then skipped other index |
300 | * maintenance is skipped. |
301 | * @param int[]|string[] $docIds List of elasticsearch document ids to delete |
302 | * @param string|null $indexSuffix index from which to delete. null means all. |
303 | * @param array $writeJobParams Parameters passed on to ElasticaWriteJob |
304 | */ |
305 | public function deletePages( $titles, $docIds, $indexSuffix = null, array $writeJobParams = [] ): void { |
306 | Job\OtherIndex::queueIfRequired( $this->connection->getConfig(), $titles, $this->writeToClusterName ); |
307 | |
308 | // Deletes are fairly cheap to send, they can be batched in larger |
309 | // chunks. Unlikely a batch this large ever comes through. |
310 | $batchSize = 50; |
311 | $this->pushElasticaWriteJobs( |
312 | UpdateGroup::PAGE, |
313 | $docIds, |
314 | static function ( array $chunk, ClusterSettings $cluster ) use ( $indexSuffix, $writeJobParams ) { |
315 | return Job\ElasticaWrite::build( |
316 | $cluster, |
317 | UpdateGroup::PAGE, |
318 | 'sendDeletes', |
319 | [ $chunk, $indexSuffix ], |
320 | $writeJobParams |
321 | ); |
322 | }, |
323 | $batchSize |
324 | ); |
325 | } |
326 | |
327 | /** |
328 | * Add documents to archive index. |
329 | * @param array $archived |
330 | * @return bool |
331 | */ |
332 | public function archivePages( $archived ) { |
333 | if ( !$this->connection->getConfig()->getElement( 'CirrusSearchIndexDeletes' ) ) { |
334 | // Disabled by config - don't do anything |
335 | return true; |
336 | } |
337 | $docs = $this->buildArchiveDocuments( $archived ); |
338 | $this->pushElasticaWriteJobs( |
339 | UpdateGroup::ARCHIVE, |
340 | $docs, |
341 | static function ( array $chunk, ClusterSettings $cluster ) { |
342 | return Job\ElasticaWrite::build( |
343 | $cluster, |
344 | UpdateGroup::ARCHIVE, |
345 | 'sendData', |
346 | [ Connection::ARCHIVE_INDEX_SUFFIX, $chunk ], |
347 | [ 'private_data' => true ], |
348 | ); |
349 | } ); |
350 | |
351 | return true; |
352 | } |
353 | |
354 | /** |
355 | * Build Elastica documents for archived pages. |
356 | * @param array $archived |
357 | * @return \Elastica\Document[] |
358 | */ |
359 | private function buildArchiveDocuments( array $archived ) { |
360 | $docs = []; |
361 | foreach ( $archived as $delete ) { |
362 | if ( !isset( $delete['title'] ) ) { |
363 | // These come from pages that still exist, but are redirects. |
364 | // This is non-obvious and we probably need a better way... |
365 | continue; |
366 | } |
367 | /** @var Title $title */ |
368 | $title = $delete['title']; |
369 | $doc = new \Elastica\Document( $delete['page'], [ |
370 | 'namespace' => $title->getNamespace(), |
371 | 'title' => $title->getText(), |
372 | 'wiki' => WikiMap::getCurrentWikiId(), |
373 | ] ); |
374 | $doc->setDocAsUpsert( true ); |
375 | $doc->setRetryOnConflict( $this->connection->getConfig()->getElement( 'CirrusSearchUpdateConflictRetryCount' ) ); |
376 | |
377 | $docs[] = $doc; |
378 | } |
379 | |
380 | return $docs; |
381 | } |
382 | |
383 | /** |
384 | * Update the search index for newly linked or unlinked articles. |
385 | * @param Title[] $titles titles to update |
386 | */ |
387 | public function updateLinkedArticles( $titles ): void { |
388 | $pages = []; |
389 | $wikiPageFactory = MediaWikiServices::getInstance()->getWikiPageFactory(); |
390 | foreach ( $titles as $title ) { |
391 | // Special pages don't get updated, we only index |
392 | // actual existing pages. |
393 | if ( !$title || !$title->canExist() ) { |
394 | continue; |
395 | } |
396 | |
397 | $page = $wikiPageFactory->newFromTitle( $title ); |
398 | if ( $page === null || !$page->exists() ) { |
399 | // Skip link to nonexistent page. |
400 | continue; |
401 | } |
402 | // Resolve one level of redirects because only one level of redirects is scored. |
403 | if ( $page->isRedirect() ) { |
404 | $target = $page->getRedirectTarget(); |
405 | if ( $target === null ) { |
406 | // Redirect to itself or broken redirect? ignore. |
407 | continue; |
408 | } |
409 | if ( !$target->exists() ) { |
410 | // Skip redirects to nonexistent pages |
411 | continue; |
412 | } |
413 | $page = $wikiPageFactory->newFromTitle( $target ); |
414 | } |
415 | if ( $page->isRedirect() ) { |
416 | // This is a redirect to a redirect which doesn't count in the search score any way. |
417 | continue; |
418 | } |
419 | if ( in_array( $title->getFullText(), $this->updated ) ) { |
420 | // We've already updated this page in this process so there is no need to update it again. |
421 | continue; |
422 | } |
423 | // Note that we don't add this page to the list of updated pages because this update isn't |
424 | // a full update (just link counts). |
425 | $pages[] = $page; |
426 | } |
427 | $this->updatePages( $pages, BuildDocument::SKIP_PARSE ); |
428 | } |
429 | |
430 | /** |
431 | * Convert an array of pages to an array of their titles. |
432 | * |
433 | * @param WikiPage[] $pages |
434 | * @return Title[] |
435 | */ |
436 | private function pagesToTitles( $pages ) { |
437 | $titles = []; |
438 | foreach ( $pages as $page ) { |
439 | $titles[] = $page->getTitle(); |
440 | } |
441 | return $titles; |
442 | } |
443 | |
444 | /** |
445 | * @param string $updateGroup UpdateGroup::* constant |
446 | * @param mixed[] $items |
447 | * @param callable $factory |
448 | * @param int $batchSize |
449 | */ |
450 | protected function pushElasticaWriteJobs( string $updateGroup, array $items, $factory, int $batchSize = 10 ): void { |
451 | // Elasticsearch has a queue capacity of 50 so if $documents contains 50 pages it could bump up |
452 | // against the max. So we chunk it and do them sequentially. |
453 | $jobs = []; |
454 | $config = $this->connection->getConfig(); |
455 | $clusters = $this->elasticaWriteClusters( $updateGroup ); |
456 | |
457 | foreach ( array_chunk( $items, $batchSize ) as $chunked ) { |
458 | // Queueing one job per cluster ensures isolation. If one clusters falls |
459 | // behind on writes the others shouldn't notice. |
460 | // Unfortunately queueing a job per cluster results in quite a few |
461 | // jobs to run. If the job queue can't keep up some clusters can |
462 | // be run in-process. Any failures will queue themselves for later |
463 | // execution. |
464 | foreach ( $clusters as $cluster ) { |
465 | $clusterSettings = new ClusterSettings( $config, $cluster ); |
466 | $job = $factory( $chunked, $clusterSettings ); |
467 | if ( $clusterSettings->isIsolated() ) { |
468 | $jobs[] = $job; |
469 | } else { |
470 | $job->run(); |
471 | } |
472 | } |
473 | } |
474 | |
475 | if ( $jobs ) { |
476 | MediaWikiServices::getInstance()->getJobQueueGroup()->push( $jobs ); |
477 | } |
478 | } |
479 | |
480 | private function elasticaWriteClusters( string $updateGroup ): array { |
481 | if ( $this->writeToClusterName !== null ) { |
482 | return [ $this->writeToClusterName ]; |
483 | } else { |
484 | return $this->connection |
485 | ->getConfig() |
486 | ->getClusterAssignment() |
487 | ->getWritableClusters( $updateGroup ); |
488 | } |
489 | } |
490 | |
491 | /** |
492 | * @param string $description |
493 | * @param string $queryType |
494 | * @param string[] $extra |
495 | * @return SearchRequestLog |
496 | */ |
497 | protected function newLog( $description, $queryType, array $extra = [] ) { |
498 | return new SearchRequestLog( |
499 | $this->connection->getClient(), |
500 | $description, |
501 | $queryType, |
502 | $extra |
503 | ); |
504 | } |
505 | } |