Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
11.46% covered (danger)
11.46%
11 / 96
11.11% covered (danger)
11.11%
1 / 9
CRAP
0.00% covered (danger)
0.00%
0 / 1
MWElasticUtils
11.58% covered (danger)
11.58%
11 / 95
11.11% covered (danger)
11.11%
1 / 9
457.06
0.00% covered (danger)
0.00%
0 / 1
 withRetry
90.91% covered (success)
90.91%
10 / 11
0.00% covered (danger)
0.00%
0 / 1
5.02
 backoffDelay
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getIndexHealth
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
6
 waitForGreen
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
20
 deleteByQuery
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 increasingDelay
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 deleteByQueryWithStatus
0.00% covered (danger)
0.00%
0 / 52
0.00% covered (danger)
0.00%
0 / 1
42
 fetchClusterName
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
6
 isFrozen
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3namespace MediaWiki\Extension\Elastica;
4
5use Elastica\Client;
6use Elastica\Index;
7use Elastica\Query;
8use Elastica\Task;
9use Elasticsearch\Endpoints\Cluster\Health;
10use Exception;
11use Generator;
12use MediaWiki\Json\FormatJson;
13use MediaWiki\Logger\LoggerFactory;
14use MediaWiki\Utils\MWTimestamp;
15use RuntimeException;
16
17/**
18 * This program is free software; you can redistribute it and/or modify
19 * it under the terms of the GNU General Public License as published by
20 * the Free Software Foundation; either version 2 of the License, or
21 * (at your option) any later version.
22 *
23 * This program is distributed in the hope that it will be useful,
24 * but WITHOUT ANY WARRANTY; without even the implied warranty of
25 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26 * GNU General Public License for more details.
27 *
28 * You should have received a copy of the GNU General Public License along
29 * with this program; if not, write to the Free Software Foundation, Inc.,
30 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
31 *
32 */
33
34/**
35 * Utility class
36 */
37class MWElasticUtils {
38
39    private const ONE_SEC_IN_MICROSEC = 1000000;
40
41    /**
42     * A function that retries callback $func if it throws an exception.
43     * The $beforeRetry is called before a retry and receives the underlying
44     * ExceptionInterface object and the number of failed attempts.
45     * It's generally used to log and sleep between retries. Default behaviour
46     * is to sleep with a random backoff.
47     * @see Util::backoffDelay
48     *
49     * @param int $attempts the number of times we retry
50     * @param callable $func
51     * @param callable|null $beforeRetry function called before each retry
52     * @return mixed
53     */
54    public static function withRetry( $attempts, $func, $beforeRetry = null ) {
55        $errors = 0;
56        while ( true ) {
57            if ( $errors < $attempts ) {
58                try {
59                    return $func();
60                } catch ( Exception $e ) {
61                    $errors++;
62                    if ( $beforeRetry ) {
63                        $beforeRetry( $e, $errors );
64                    } else {
65                        $seconds = static::backoffDelay( $errors );
66                        usleep( (int)( $seconds * self::ONE_SEC_IN_MICROSEC ) );
67                    }
68                }
69            } else {
70                return $func();
71            }
72        }
73    }
74
75    /**
76     * Backoff with lowest possible upper bound as 16 seconds.
77     * With the default maximum number of errors (5) this maxes out at 256 seconds.
78     *
79     * @param int $errorCount
80     * @return int
81     */
82    public static function backoffDelay( $errorCount ) {
83        return rand( 1, (int)pow( 2, 3 + $errorCount ) );
84    }
85
86    /**
87     * Get index health
88     *
89     * @param Client $client
90     * @param string $indexName
91     * @return array the index health status
92     */
93    public static function getIndexHealth( Client $client, $indexName ) {
94        $endpoint = new Health;
95        $endpoint->setIndex( $indexName );
96        $response = $client->requestEndpoint( $endpoint );
97        if ( $response->hasError() ) {
98            throw new RuntimeException( "Error while fetching index health status: " . $response->getError() );
99        }
100        return $response->getData();
101    }
102
103    /**
104     * Wait for the index to go green
105     *
106     * @param Client $client
107     * @param string $indexName Name of index to wait for
108     * @param int $timeout In seconds
109     * @return Generator|string[]|bool Returns a generator. Generator yields
110     *  string status messages. Generator return value is true if the index is
111     *  green false otherwise.
112     */
113    public static function waitForGreen( Client $client, $indexName, $timeout ) {
114        $startTime = time();
115        while ( ( $startTime + $timeout ) > time() ) {
116            try {
117                $response = self::getIndexHealth( $client, $indexName );
118                $status = $response['status'] ?? 'unknown';
119                if ( $status === 'green' ) {
120                    yield "\tGreen!";
121                    return true;
122                }
123                yield "\tIndex is $status retrying...";
124                sleep( 5 );
125            } catch ( \Exception $e ) {
126                yield "Error while waiting for green ({$e->getMessage()}), retrying...";
127            }
128        }
129        return false;
130    }
131
132    /**
133     * Delete docs by query and wait for it to complete via tasks api.
134     *
135     * @param Index $index the source index
136     * @param Query $query the query
137     * @param bool $allowConflicts When true documents updated since starting
138     *  the query will not be deleted, and will not fail the delete-by-query. When
139     *  false (default) the updated document will not be deleted and the delete-by-query
140     *  will abort. Deletes are not transactional, some subset of matching documents
141     *  will have been deleted.
142     * @param int $reportEveryNumSec Log task status on this interval of seconds
143     * @return Task Generator returns the Task instance on completion.
144     */
145    public static function deleteByQuery(
146        Index $index,
147        Query $query,
148        $allowConflicts = false,
149        $reportEveryNumSec = 300
150    ) {
151        $gen = self::deleteByQueryWithStatus( $index, $query, $allowConflicts, $reportEveryNumSec );
152        // @phan-suppress-next-line PhanTypeNoAccessiblePropertiesForeach always a generator object
153        foreach ( $gen as $status ) {
154            // We don't need these status updates. But we need to iterate
155            // the generator until it is done.
156        }
157        return $gen->getReturn();
158    }
159
160    /**
161     * @param float $minDelay Starting value of generator
162     * @param float $maxDelay Maximum value to return
163     * @param float $increaseByRatio Increase by this ratio on each iteration, up to $maxDelay
164     * @return Generator|float[] Returns a generator. Generator yields floats between
165     *  $minDelay and $maxDelay
166     * @suppress PhanInfiniteLoop
167     */
168    private static function increasingDelay( $minDelay, $maxDelay, $increaseByRatio = 1.5 ) {
169        $delay = $minDelay;
170        while ( true ) {
171            yield $delay;
172            $delay = min( $delay * $increaseByRatio, $maxDelay );
173        }
174    }
175
176    /**
177     * Delete docs by query and wait for it to complete via tasks api. This
178     * method returns a generator which must be iterated on at least once
179     * or the deletion will not occur.
180     *
181     * Client code that doesn't care about the result or when the deleteByQuery
182     * completes are safe to call next( $gen ) a single time to start the deletion,
183     * and then throw away the generator. Note that logging about how long the task
184     * has been running will not be logged if the generator is not iterated.
185     *
186     * @param Index $index the source index
187     * @param Query $query the query
188     * @param bool $allowConflicts When true documents updated since starting
189     *  the query will not be deleted, and will not fail the delete-by-query. When
190     *  false (default) the updated document will not be deleted and the delete-by-query
191     *  will abort. Deletes are not transactional, some subset of matching documents
192     *  will have been deleted.
193     * @param int $reportEveryNumSec Log task status on this interval of seconds
194     * @return \Generator|array[]|\Elastica\Task Returns a generator. Generator yields
195     *  arrays containing task status responses. Generator returns the Task instance
196     *  on completion via Generator::getReturn.
197     */
198    public static function deleteByQueryWithStatus(
199        Index $index,
200        Query $query,
201        $allowConflicts = false,
202        $reportEveryNumSec = 300
203    ) {
204        $params = [
205            'wait_for_completion' => 'false',
206            'scroll' => '15m',
207        ];
208        if ( $allowConflicts ) {
209            $params['conflicts'] = 'proceed';
210        }
211        $response = $index->deleteByQuery( $query, $params )->getData();
212        if ( !isset( $response['task'] ) ) {
213            throw new RuntimeException( 'No task returned: ' . var_export( $response, true ) );
214        }
215        $log = LoggerFactory::getInstance( 'Elastica' );
216        $clusterName = self::fetchClusterName( $index->getClient() );
217        $logContext = [
218            'index' => $index->getName(),
219            'cluster' => $clusterName,
220            'taskId' => $response['task'],
221        ];
222        $logPrefix = 'deleteByQuery against [{index}] on cluster [{cluster}] with task id [{taskId}]';
223        $log->info( "$logPrefix starting", $logContext + [
224            'elastic_query' => FormatJson::encode( $query->toArray() )
225        ] );
226
227        // Log tasks running longer than 10 minutes to help track down job runner
228        // timeouts that occur after 20 minutes. T219234
229        $start = MWTimestamp::time();
230        $reportAfter = $start + $reportEveryNumSec;
231        $task = new \Elastica\Task(
232            $index->getClient(),
233            $response['task'] );
234        $delay = self::increasingDelay( 0.05, 5 );
235        while ( !$task->isCompleted() ) {
236            $now = MWTimestamp::time();
237            if ( $now >= $reportAfter ) {
238                $reportAfter = $now + $reportEveryNumSec;
239                $log->warning( "$logPrefix still running after [{runtime}] seconds", $logContext + [
240                    'runtime' => $now - $start,
241                    // json encode to ensure we don't add a bunch of properties in
242                    // logstash, we only really need the content and this will still be
243                    // searchable.
244                    'status' => FormatJson::encode( $task->getData() ),
245                ] );
246            }
247            yield $task->getData();
248            $delay->next();
249            usleep( (int)( $delay->current() * self::ONE_SEC_IN_MICROSEC ) );
250            $task->refresh();
251        }
252
253        $now = MWTimestamp::time();
254        $taskCompleteResponse = $task->getData()['response'];
255        if ( $taskCompleteResponse['failures'] ) {
256            $log->error( "$logPrefix failed", $logContext + [
257                'runtime' => $now - $start,
258                'status' => FormatJson::encode( $task->getData() ),
259            ] );
260            throw new RuntimeException( 'Failed deleteByQuery: '
261                . implode( ', ', $taskCompleteResponse['failures'] ) );
262        }
263
264        $log->info( "$logPrefix completed", $logContext + [
265            'runtime' => $now - $start,
266            'status' => FormatJson::encode( $task->getData() ),
267        ] );
268
269        return $task;
270    }
271
272    /**
273     * Fetch the name of the cluster client is communicating with.
274     *
275     * @param Client $client Elasticsearch client to fetch name for
276     * @return string Name of cluster $client is communicating with
277     */
278    public static function fetchClusterName( Client $client ) {
279        $response = $client->requestEndpoint( new \Elasticsearch\Endpoints\Info );
280        if ( $response->getStatus() !== 200 ) {
281            throw new RuntimeException(
282                "Failed requesting cluster name, got status code [{$response->getStatus()}]" );
283        }
284        return $response->getData()['cluster_name'];
285    }
286
287    /**
288     * @param Client $client
289     * @return bool True when no writes should be sent via $client
290     * @deprecated (always returns false)
291     */
292    public static function isFrozen( Client $client ) {
293        return false;
294    }
295}
296
297class_alias( MWElasticUtils::class, 'MWElasticUtils' );