Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
10.20% covered (danger)
10.20%
10 / 98
11.11% covered (danger)
11.11%
1 / 9
CRAP
0.00% covered (danger)
0.00%
0 / 1
MWElasticUtils
10.31% covered (danger)
10.31%
10 / 97
11.11% covered (danger)
11.11%
1 / 9
552.98
0.00% covered (danger)
0.00%
0 / 1
 withRetry
75.00% covered (warning)
75.00%
9 / 12
0.00% covered (danger)
0.00%
0 / 1
6.56
 backoffDelay
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getIndexHealth
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
6
 waitForGreen
0.00% covered (danger)
0.00%
0 / 13
0.00% covered (danger)
0.00%
0 / 1
30
 deleteByQuery
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 increasingDelay
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 deleteByQueryWithStatus
0.00% covered (danger)
0.00%
0 / 52
0.00% covered (danger)
0.00%
0 / 1
42
 fetchClusterName
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
6
 isFrozen
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3namespace MediaWiki\Extension\Elastica;
4
5use Elastica\Client;
6use Elastica\Index;
7use Elastica\Query;
8use Elastica\Task;
9use Elasticsearch\Endpoints\Cluster\Health;
10use Exception;
11use Generator;
12use MediaWiki\Json\FormatJson;
13use MediaWiki\Logger\LoggerFactory;
14use MediaWiki\Utils\MWTimestamp;
15use RuntimeException;
16
17/**
18 * This program is free software; you can redistribute it and/or modify
19 * it under the terms of the GNU General Public License as published by
20 * the Free Software Foundation; either version 2 of the License, or
21 * (at your option) any later version.
22 *
23 * This program is distributed in the hope that it will be useful,
24 * but WITHOUT ANY WARRANTY; without even the implied warranty of
25 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26 * GNU General Public License for more details.
27 *
28 * You should have received a copy of the GNU General Public License along
29 * with this program; if not, write to the Free Software Foundation, Inc.,
30 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
31 *
32 */
33
34/**
35 * Utility class
36 */
37class MWElasticUtils {
38
39    private const ONE_SEC_IN_MICROSEC = 1000000;
40
41    /**
42     * A function that retries callback $func if it throws an exception.
43     * The $beforeRetry is called before a retry and receives the underlying
44     * ExceptionInterface object and the number of failed attempts.
45     * It's generally used to log and sleep between retries. Default behaviour
46     * is to sleep with a random backoff.
47     * @see Util::backoffDelay
48     *
49     * @param int $attempts the number of times we retry
50     * @param callable $func
51     * @param callable|null $beforeRetry function called before each retry
52     * @return mixed
53     */
54    public static function withRetry( $attempts, $func, $beforeRetry = null ) {
55        $errors = 0;
56        while ( true ) {
57            if ( $errors < $attempts ) {
58                try {
59                    return $func();
60                } catch ( Exception $e ) {
61                    $errors++;
62                    if ( $beforeRetry ) {
63                        $beforeRetry( $e, $errors );
64                    } elseif ( !defined( 'MW_PHPUNIT_TEST' ) ) {
65                        $seconds = static::backoffDelay( $errors );
66                        usleep( (int)( $seconds * self::ONE_SEC_IN_MICROSEC ) );
67                    }
68                }
69            } else {
70                return $func();
71            }
72        }
73    }
74
75    /**
76     * Backoff with lowest possible upper bound as 16 seconds.
77     * With the default maximum number of errors (5) this maxes out at 256 seconds.
78     *
79     * @param int $errorCount
80     * @return int
81     */
82    public static function backoffDelay( $errorCount ) {
83        return rand( 1, (int)pow( 2, 3 + $errorCount ) );
84    }
85
86    /**
87     * Get index health
88     *
89     * @param Client $client
90     * @param string $indexName
91     * @return array the index health status
92     */
93    public static function getIndexHealth( Client $client, $indexName ) {
94        $endpoint = new Health;
95        $endpoint->setIndex( $indexName );
96        $response = $client->requestEndpoint( $endpoint );
97        if ( $response->hasError() ) {
98            throw new RuntimeException( "Error while fetching index health status: " . $response->getError() );
99        }
100        return $response->getData();
101    }
102
103    /**
104     * Wait for the index to go green
105     *
106     * @param Client $client
107     * @param string $indexName Name of index to wait for
108     * @param int $timeout In seconds
109     * @return Generator|string[]|bool Returns a generator. Generator yields
110     *  string status messages. Generator return value is true if the index is
111     *  green false otherwise.
112     */
113    public static function waitForGreen( Client $client, $indexName, $timeout ) {
114        $startTime = time();
115        while ( ( $startTime + $timeout ) > time() ) {
116            try {
117                $response = self::getIndexHealth( $client, $indexName );
118                $status = $response['status'] ?? 'unknown';
119                if ( $status === 'green' ) {
120                    yield "\tGreen!";
121                    return true;
122                }
123                yield "\tIndex is $status retrying...";
124                if ( !defined( 'MW_PHPUNIT_TEST' ) ) {
125                    sleep( 5 );
126                }
127            } catch ( \Exception $e ) {
128                yield "Error while waiting for green ({$e->getMessage()}), retrying...";
129            }
130        }
131        return false;
132    }
133
134    /**
135     * Delete docs by query and wait for it to complete via tasks api.
136     *
137     * @param Index $index the source index
138     * @param Query $query the query
139     * @param bool $allowConflicts When true documents updated since starting
140     *  the query will not be deleted, and will not fail the delete-by-query. When
141     *  false (default) the updated document will not be deleted and the delete-by-query
142     *  will abort. Deletes are not transactional, some subset of matching documents
143     *  will have been deleted.
144     * @param int $reportEveryNumSec Log task status on this interval of seconds
145     * @return Task Generator returns the Task instance on completion.
146     */
147    public static function deleteByQuery(
148        Index $index,
149        Query $query,
150        $allowConflicts = false,
151        $reportEveryNumSec = 300
152    ) {
153        $gen = self::deleteByQueryWithStatus( $index, $query, $allowConflicts, $reportEveryNumSec );
154        // @phan-suppress-next-line PhanTypeNoAccessiblePropertiesForeach always a generator object
155        foreach ( $gen as $status ) {
156            // We don't need these status updates. But we need to iterate
157            // the generator until it is done.
158        }
159        return $gen->getReturn();
160    }
161
162    /**
163     * @param float $minDelay Starting value of generator
164     * @param float $maxDelay Maximum value to return
165     * @param float $increaseByRatio Increase by this ratio on each iteration, up to $maxDelay
166     * @return Generator|float[] Returns a generator. Generator yields floats between
167     *  $minDelay and $maxDelay
168     * @suppress PhanInfiniteLoop
169     */
170    private static function increasingDelay( $minDelay, $maxDelay, $increaseByRatio = 1.5 ) {
171        $delay = $minDelay;
172        while ( true ) {
173            yield $delay;
174            $delay = min( $delay * $increaseByRatio, $maxDelay );
175        }
176    }
177
178    /**
179     * Delete docs by query and wait for it to complete via tasks api. This
180     * method returns a generator which must be iterated on at least once
181     * or the deletion will not occur.
182     *
183     * Client code that doesn't care about the result or when the deleteByQuery
184     * completes are safe to call next( $gen ) a single time to start the deletion,
185     * and then throw away the generator. Note that logging about how long the task
186     * has been running will not be logged if the generator is not iterated.
187     *
188     * @param Index $index the source index
189     * @param Query $query the query
190     * @param bool $allowConflicts When true documents updated since starting
191     *  the query will not be deleted, and will not fail the delete-by-query. When
192     *  false (default) the updated document will not be deleted and the delete-by-query
193     *  will abort. Deletes are not transactional, some subset of matching documents
194     *  will have been deleted.
195     * @param int $reportEveryNumSec Log task status on this interval of seconds
196     * @return \Generator|array[]|\Elastica\Task Returns a generator. Generator yields
197     *  arrays containing task status responses. Generator returns the Task instance
198     *  on completion via Generator::getReturn.
199     */
200    public static function deleteByQueryWithStatus(
201        Index $index,
202        Query $query,
203        $allowConflicts = false,
204        $reportEveryNumSec = 300
205    ) {
206        $params = [
207            'wait_for_completion' => 'false',
208            'scroll' => '15m',
209        ];
210        if ( $allowConflicts ) {
211            $params['conflicts'] = 'proceed';
212        }
213        $response = $index->deleteByQuery( $query, $params )->getData();
214        if ( !isset( $response['task'] ) ) {
215            throw new RuntimeException( 'No task returned: ' . var_export( $response, true ) );
216        }
217        $log = LoggerFactory::getInstance( 'Elastica' );
218        $clusterName = self::fetchClusterName( $index->getClient() );
219        $logContext = [
220            'index' => $index->getName(),
221            'cluster' => $clusterName,
222            'taskId' => $response['task'],
223        ];
224        $logPrefix = 'deleteByQuery against [{index}] on cluster [{cluster}] with task id [{taskId}]';
225        $log->info( "$logPrefix starting", $logContext + [
226            'elastic_query' => FormatJson::encode( $query->toArray() )
227        ] );
228
229        // Log tasks running longer than 10 minutes to help track down job runner
230        // timeouts that occur after 20 minutes. T219234
231        $start = MWTimestamp::time();
232        $reportAfter = $start + $reportEveryNumSec;
233        $task = new \Elastica\Task(
234            $index->getClient(),
235            $response['task'] );
236        $delay = self::increasingDelay( 0.05, 5 );
237        while ( !$task->isCompleted() ) {
238            $now = MWTimestamp::time();
239            if ( $now >= $reportAfter ) {
240                $reportAfter = $now + $reportEveryNumSec;
241                $log->warning( "$logPrefix still running after [{runtime}] seconds", $logContext + [
242                    'runtime' => $now - $start,
243                    // json encode to ensure we don't add a bunch of properties in
244                    // logstash, we only really need the content and this will still be
245                    // searchable.
246                    'status' => FormatJson::encode( $task->getData() ),
247                ] );
248            }
249            yield $task->getData();
250            $delay->next();
251            usleep( (int)( $delay->current() * self::ONE_SEC_IN_MICROSEC ) );
252            $task->refresh();
253        }
254
255        $now = MWTimestamp::time();
256        $taskCompleteResponse = $task->getData()['response'];
257        if ( $taskCompleteResponse['failures'] ) {
258            $log->error( "$logPrefix failed", $logContext + [
259                'runtime' => $now - $start,
260                'status' => FormatJson::encode( $task->getData() ),
261            ] );
262            throw new RuntimeException( 'Failed deleteByQuery: '
263                . implode( ', ', $taskCompleteResponse['failures'] ) );
264        }
265
266        $log->info( "$logPrefix completed", $logContext + [
267            'runtime' => $now - $start,
268            'status' => FormatJson::encode( $task->getData() ),
269        ] );
270
271        return $task;
272    }
273
274    /**
275     * Fetch the name of the cluster client is communicating with.
276     *
277     * @param Client $client Elasticsearch client to fetch name for
278     * @return string Name of cluster $client is communicating with
279     */
280    public static function fetchClusterName( Client $client ) {
281        $response = $client->requestEndpoint( new \Elasticsearch\Endpoints\Info );
282        if ( $response->getStatus() !== 200 ) {
283            throw new RuntimeException(
284                "Failed requesting cluster name, got status code [{$response->getStatus()}]" );
285        }
286        return $response->getData()['cluster_name'];
287    }
288
289    /**
290     * @param Client $client
291     * @return bool True when no writes should be sent via $client
292     * @deprecated (always returns false)
293     */
294    public static function isFrozen( Client $client ) {
295        return false;
296    }
297}
298
299class_alias( MWElasticUtils::class, 'MWElasticUtils' );