Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
11.46% covered (danger)
11.46%
11 / 96
11.11% covered (danger)
11.11%
1 / 9
CRAP
0.00% covered (danger)
0.00%
0 / 1
MWElasticUtils
11.58% covered (danger)
11.58%
11 / 95
11.11% covered (danger)
11.11%
1 / 9
457.06
0.00% covered (danger)
0.00%
0 / 1
 withRetry
90.91% covered (success)
90.91%
10 / 11
0.00% covered (danger)
0.00%
0 / 1
5.02
 backoffDelay
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getIndexHealth
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
6
 waitForGreen
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
20
 deleteByQuery
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 increasingDelay
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 deleteByQueryWithStatus
0.00% covered (danger)
0.00%
0 / 52
0.00% covered (danger)
0.00%
0 / 1
42
 fetchClusterName
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
6
 isFrozen
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3namespace MediaWiki\Extension\Elastica;
4
5use Elastica\Client;
6use Elastica\Index;
7use Elastica\Query;
8use Elastica\Task;
9use Elasticsearch\Endpoints\Cluster\Health;
10use Exception;
11use FormatJson;
12use Generator;
13use MediaWiki\Logger\LoggerFactory;
14use MWTimestamp;
15
16/**
17 * This program is free software; you can redistribute it and/or modify
18 * it under the terms of the GNU General Public License as published by
19 * the Free Software Foundation; either version 2 of the License, or
20 * (at your option) any later version.
21 *
22 * This program is distributed in the hope that it will be useful,
23 * but WITHOUT ANY WARRANTY; without even the implied warranty of
24 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
25 * GNU General Public License for more details.
26 *
27 * You should have received a copy of the GNU General Public License along
28 * with this program; if not, write to the Free Software Foundation, Inc.,
29 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
30 *
31 */
32
33/**
34 * Utility class
35 */
36class MWElasticUtils {
37
38    private const ONE_SEC_IN_MICROSEC = 1000000;
39
40    /**
41     * A function that retries callback $func if it throws an exception.
42     * The $beforeRetry is called before a retry and receives the underlying
43     * ExceptionInterface object and the number of failed attempts.
44     * It's generally used to log and sleep between retries. Default behaviour
45     * is to sleep with a random backoff.
46     * @see Util::backoffDelay
47     *
48     * @param int $attempts the number of times we retry
49     * @param callable $func
50     * @param callable|null $beforeRetry function called before each retry
51     * @return mixed
52     */
53    public static function withRetry( $attempts, $func, $beforeRetry = null ) {
54        $errors = 0;
55        while ( true ) {
56            if ( $errors < $attempts ) {
57                try {
58                    return $func();
59                } catch ( Exception $e ) {
60                    $errors++;
61                    if ( $beforeRetry ) {
62                        $beforeRetry( $e, $errors );
63                    } else {
64                        $seconds = static::backoffDelay( $errors );
65                        usleep( $seconds * self::ONE_SEC_IN_MICROSEC );
66                    }
67                }
68            } else {
69                return $func();
70            }
71        }
72    }
73
74    /**
75     * Backoff with lowest possible upper bound as 16 seconds.
76     * With the default maximum number of errors (5) this maxes out at 256 seconds.
77     *
78     * @param int $errorCount
79     * @return int
80     */
81    public static function backoffDelay( $errorCount ) {
82        return rand( 1, (int)pow( 2, 3 + $errorCount ) );
83    }
84
85    /**
86     * Get index health
87     *
88     * @param Client $client
89     * @param string $indexName
90     * @return array the index health status
91     */
92    public static function getIndexHealth( Client $client, $indexName ) {
93        $endpoint = new Health;
94        $endpoint->setIndex( $indexName );
95        $response = $client->requestEndpoint( $endpoint );
96        if ( $response->hasError() ) {
97            throw new \Exception( "Error while fetching index health status: " . $response->getError() );
98        }
99        return $response->getData();
100    }
101
102    /**
103     * Wait for the index to go green
104     *
105     * @param Client $client
106     * @param string $indexName Name of index to wait for
107     * @param int $timeout In seconds
108     * @return Generator|string[]|bool Returns a generator. Generator yields
109     *  string status messages. Generator return value is true if the index is
110     *  green false otherwise.
111     */
112    public static function waitForGreen( Client $client, $indexName, $timeout ) {
113        $startTime = time();
114        while ( ( $startTime + $timeout ) > time() ) {
115            try {
116                $response = self::getIndexHealth( $client, $indexName );
117                $status = $response['status'] ?? 'unknown';
118                if ( $status === 'green' ) {
119                    yield "\tGreen!";
120                    return true;
121                }
122                yield "\tIndex is $status retrying...";
123                sleep( 5 );
124            } catch ( \Exception $e ) {
125                yield "Error while waiting for green ({$e->getMessage()}), retrying...";
126            }
127        }
128        return false;
129    }
130
131    /**
132     * Delete docs by query and wait for it to complete via tasks api.
133     *
134     * @param Index $index the source index
135     * @param Query $query the query
136     * @param bool $allowConflicts When true documents updated since starting
137     *  the query will not be deleted, and will not fail the delete-by-query. When
138     *  false (default) the updated document will not be deleted and the delete-by-query
139     *  will abort. Deletes are not transactional, some subset of matching documents
140     *  will have been deleted.
141     * @param int $reportEveryNumSec Log task status on this interval of seconds
142     * @return Task Generator returns the Task instance on completion.
143     * @throws Exception when task reports failures
144     */
145    public static function deleteByQuery(
146        Index $index,
147        Query $query,
148        $allowConflicts = false,
149        $reportEveryNumSec = 300
150    ) {
151        $gen = self::deleteByQueryWithStatus( $index, $query, $allowConflicts, $reportEveryNumSec );
152        // @phan-suppress-next-line PhanTypeNoAccessiblePropertiesForeach always a generator object
153        foreach ( $gen as $status ) {
154            // We don't need these status updates. But we need to iterate
155            // the generator until it is done.
156        }
157        return $gen->getReturn();
158    }
159
160    /**
161     * @param float $minDelay Starting value of generator
162     * @param float $maxDelay Maximum value to return
163     * @param float $increaseByRatio Increase by this ratio on each iteration, up to $maxDelay
164     * @return Generator|float[] Returns a generator. Generator yields floats between
165     *  $minDelay and $maxDelay
166     * @suppress PhanInfiniteLoop
167     */
168    private static function increasingDelay( $minDelay, $maxDelay, $increaseByRatio = 1.5 ) {
169        $delay = $minDelay;
170        while ( true ) {
171            yield $delay;
172            $delay = min( $delay * $increaseByRatio, $maxDelay );
173        }
174    }
175
176    /**
177     * Delete docs by query and wait for it to complete via tasks api. This
178     * method returns a generator which must be iterated on at least once
179     * or the deletion will not occur.
180     *
181     * Client code that doesn't care about the result or when the deleteByQuery
182     * completes are safe to call next( $gen ) a single time to start the deletion,
183     * and then throw away the generator. Note that logging about how long the task
184     * has been running will not be logged if the generator is not iterated.
185     *
186     * @param Index $index the source index
187     * @param Query $query the query
188     * @param bool $allowConflicts When true documents updated since starting
189     *  the query will not be deleted, and will not fail the delete-by-query. When
190     *  false (default) the updated document will not be deleted and the delete-by-query
191     *  will abort. Deletes are not transactional, some subset of matching documents
192     *  will have been deleted.
193     * @param int $reportEveryNumSec Log task status on this interval of seconds
194     * @return \Generator|array[]|\Elastica\Task Returns a generator. Generator yields
195     *  arrays containing task status responses. Generator returns the Task instance
196     *  on completion via Generator::getReturn.
197     * @throws Exception when task reports failures
198     */
199    public static function deleteByQueryWithStatus(
200        Index $index,
201        Query $query,
202        $allowConflicts = false,
203        $reportEveryNumSec = 300
204    ) {
205        $params = [
206            'wait_for_completion' => 'false',
207            'scroll' => '15m',
208        ];
209        if ( $allowConflicts ) {
210            $params['conflicts'] = 'proceed';
211        }
212        $response = $index->deleteByQuery( $query, $params )->getData();
213        if ( !isset( $response['task'] ) ) {
214            throw new \Exception( 'No task returned: ' . var_export( $response, true ) );
215        }
216        $log = LoggerFactory::getInstance( 'Elastica' );
217        $clusterName = self::fetchClusterName( $index->getClient() );
218        $logContext = [
219            'index' => $index->getName(),
220            'cluster' => $clusterName,
221            'taskId' => $response['task'],
222        ];
223        $logPrefix = 'deleteByQuery against [{index}] on cluster [{cluster}] with task id [{taskId}]';
224        $log->info( "$logPrefix starting", $logContext + [
225            'elastic_query' => FormatJson::encode( $query->toArray() )
226        ] );
227
228        // Log tasks running longer than 10 minutes to help track down job runner
229        // timeouts that occur after 20 minutes. T219234
230        $start = MWTimestamp::time();
231        $reportAfter = $start + $reportEveryNumSec;
232        $task = new \Elastica\Task(
233            $index->getClient(),
234            $response['task'] );
235        $delay = self::increasingDelay( 0.05, 5 );
236        while ( !$task->isCompleted() ) {
237            $now = MWTimestamp::time();
238            if ( $now >= $reportAfter ) {
239                $reportAfter = $now + $reportEveryNumSec;
240                $log->warning( "$logPrefix still running after [{runtime}] seconds", $logContext + [
241                    'runtime' => $now - $start,
242                    // json encode to ensure we don't add a bunch of properties in
243                    // logstash, we only really need the content and this will still be
244                    // searchable.
245                    'status' => FormatJson::encode( $task->getData() ),
246                ] );
247            }
248            yield $task->getData();
249            $delay->next();
250            usleep( $delay->current() * self::ONE_SEC_IN_MICROSEC );
251            $task->refresh();
252        }
253
254        $now = MWTimestamp::time();
255        $taskCompleteResponse = $task->getData()['response'];
256        if ( $taskCompleteResponse['failures'] ) {
257            $log->error( "$logPrefix failed", $logContext + [
258                'runtime' => $now - $start,
259                'status' => FormatJson::encode( $task->getData() ),
260            ] );
261            throw new \Exception( 'Failed deleteByQuery: '
262                . implode( ', ', $taskCompleteResponse['failures'] ) );
263        }
264
265        $log->info( "$logPrefix completed", $logContext + [
266            'runtime' => $now - $start,
267            'status' => FormatJson::encode( $task->getData() ),
268        ] );
269
270        return $task;
271    }
272
273    /**
274     * Fetch the name of the cluster client is communicating with.
275     *
276     * @param Client $client Elasticsearch client to fetch name for
277     * @return string Name of cluster $client is communicating with
278     */
279    public static function fetchClusterName( Client $client ) {
280        $response = $client->requestEndpoint( new \Elasticsearch\Endpoints\Info );
281        if ( $response->getStatus() !== 200 ) {
282            throw new Exception(
283                "Failed requesting cluster name, got status code [{$response->getStatus()}]" );
284        }
285        return $response->getData()['cluster_name'];
286    }
287
288    /**
289     * @param Client $client
290     * @return bool True when no writes should be sent via $client
291     * @deprecated (always returns false)
292     */
293    public static function isFrozen( Client $client ) {
294        return false;
295    }
296}
297
298class_alias( MWElasticUtils::class, 'MWElasticUtils' );