Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
11.46% |
11 / 96 |
|
11.11% |
1 / 9 |
CRAP | |
0.00% |
0 / 1 |
MWElasticUtils | |
11.58% |
11 / 95 |
|
11.11% |
1 / 9 |
457.06 | |
0.00% |
0 / 1 |
withRetry | |
90.91% |
10 / 11 |
|
0.00% |
0 / 1 |
5.02 | |||
backoffDelay | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getIndexHealth | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
6 | |||
waitForGreen | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
20 | |||
deleteByQuery | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
increasingDelay | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
deleteByQueryWithStatus | |
0.00% |
0 / 52 |
|
0.00% |
0 / 1 |
42 | |||
fetchClusterName | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
6 | |||
isFrozen | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | |
3 | namespace MediaWiki\Extension\Elastica; |
4 | |
5 | use Elastica\Client; |
6 | use Elastica\Index; |
7 | use Elastica\Query; |
8 | use Elastica\Task; |
9 | use Elasticsearch\Endpoints\Cluster\Health; |
10 | use Exception; |
11 | use Generator; |
12 | use MediaWiki\Json\FormatJson; |
13 | use MediaWiki\Logger\LoggerFactory; |
14 | use MediaWiki\Utils\MWTimestamp; |
15 | use RuntimeException; |
16 | |
17 | /** |
18 | * This program is free software; you can redistribute it and/or modify |
19 | * it under the terms of the GNU General Public License as published by |
20 | * the Free Software Foundation; either version 2 of the License, or |
21 | * (at your option) any later version. |
22 | * |
23 | * This program is distributed in the hope that it will be useful, |
24 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
25 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
26 | * GNU General Public License for more details. |
27 | * |
28 | * You should have received a copy of the GNU General Public License along |
29 | * with this program; if not, write to the Free Software Foundation, Inc., |
30 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
31 | * |
32 | */ |
33 | |
34 | /** |
35 | * Utility class |
36 | */ |
37 | class MWElasticUtils { |
38 | |
39 | private const ONE_SEC_IN_MICROSEC = 1000000; |
40 | |
41 | /** |
42 | * A function that retries callback $func if it throws an exception. |
43 | * The $beforeRetry is called before a retry and receives the underlying |
44 | * ExceptionInterface object and the number of failed attempts. |
45 | * It's generally used to log and sleep between retries. Default behaviour |
46 | * is to sleep with a random backoff. |
47 | * @see Util::backoffDelay |
48 | * |
49 | * @param int $attempts the number of times we retry |
50 | * @param callable $func |
51 | * @param callable|null $beforeRetry function called before each retry |
52 | * @return mixed |
53 | */ |
54 | public static function withRetry( $attempts, $func, $beforeRetry = null ) { |
55 | $errors = 0; |
56 | while ( true ) { |
57 | if ( $errors < $attempts ) { |
58 | try { |
59 | return $func(); |
60 | } catch ( Exception $e ) { |
61 | $errors++; |
62 | if ( $beforeRetry ) { |
63 | $beforeRetry( $e, $errors ); |
64 | } else { |
65 | $seconds = static::backoffDelay( $errors ); |
66 | usleep( (int)( $seconds * self::ONE_SEC_IN_MICROSEC ) ); |
67 | } |
68 | } |
69 | } else { |
70 | return $func(); |
71 | } |
72 | } |
73 | } |
74 | |
75 | /** |
76 | * Backoff with lowest possible upper bound as 16 seconds. |
77 | * With the default maximum number of errors (5) this maxes out at 256 seconds. |
78 | * |
79 | * @param int $errorCount |
80 | * @return int |
81 | */ |
82 | public static function backoffDelay( $errorCount ) { |
83 | return rand( 1, (int)pow( 2, 3 + $errorCount ) ); |
84 | } |
85 | |
86 | /** |
87 | * Get index health |
88 | * |
89 | * @param Client $client |
90 | * @param string $indexName |
91 | * @return array the index health status |
92 | */ |
93 | public static function getIndexHealth( Client $client, $indexName ) { |
94 | $endpoint = new Health; |
95 | $endpoint->setIndex( $indexName ); |
96 | $response = $client->requestEndpoint( $endpoint ); |
97 | if ( $response->hasError() ) { |
98 | throw new RuntimeException( "Error while fetching index health status: " . $response->getError() ); |
99 | } |
100 | return $response->getData(); |
101 | } |
102 | |
103 | /** |
104 | * Wait for the index to go green |
105 | * |
106 | * @param Client $client |
107 | * @param string $indexName Name of index to wait for |
108 | * @param int $timeout In seconds |
109 | * @return Generator|string[]|bool Returns a generator. Generator yields |
110 | * string status messages. Generator return value is true if the index is |
111 | * green false otherwise. |
112 | */ |
113 | public static function waitForGreen( Client $client, $indexName, $timeout ) { |
114 | $startTime = time(); |
115 | while ( ( $startTime + $timeout ) > time() ) { |
116 | try { |
117 | $response = self::getIndexHealth( $client, $indexName ); |
118 | $status = $response['status'] ?? 'unknown'; |
119 | if ( $status === 'green' ) { |
120 | yield "\tGreen!"; |
121 | return true; |
122 | } |
123 | yield "\tIndex is $status retrying..."; |
124 | sleep( 5 ); |
125 | } catch ( \Exception $e ) { |
126 | yield "Error while waiting for green ({$e->getMessage()}), retrying..."; |
127 | } |
128 | } |
129 | return false; |
130 | } |
131 | |
132 | /** |
133 | * Delete docs by query and wait for it to complete via tasks api. |
134 | * |
135 | * @param Index $index the source index |
136 | * @param Query $query the query |
137 | * @param bool $allowConflicts When true documents updated since starting |
138 | * the query will not be deleted, and will not fail the delete-by-query. When |
139 | * false (default) the updated document will not be deleted and the delete-by-query |
140 | * will abort. Deletes are not transactional, some subset of matching documents |
141 | * will have been deleted. |
142 | * @param int $reportEveryNumSec Log task status on this interval of seconds |
143 | * @return Task Generator returns the Task instance on completion. |
144 | */ |
145 | public static function deleteByQuery( |
146 | Index $index, |
147 | Query $query, |
148 | $allowConflicts = false, |
149 | $reportEveryNumSec = 300 |
150 | ) { |
151 | $gen = self::deleteByQueryWithStatus( $index, $query, $allowConflicts, $reportEveryNumSec ); |
152 | // @phan-suppress-next-line PhanTypeNoAccessiblePropertiesForeach always a generator object |
153 | foreach ( $gen as $status ) { |
154 | // We don't need these status updates. But we need to iterate |
155 | // the generator until it is done. |
156 | } |
157 | return $gen->getReturn(); |
158 | } |
159 | |
160 | /** |
161 | * @param float $minDelay Starting value of generator |
162 | * @param float $maxDelay Maximum value to return |
163 | * @param float $increaseByRatio Increase by this ratio on each iteration, up to $maxDelay |
164 | * @return Generator|float[] Returns a generator. Generator yields floats between |
165 | * $minDelay and $maxDelay |
166 | * @suppress PhanInfiniteLoop |
167 | */ |
168 | private static function increasingDelay( $minDelay, $maxDelay, $increaseByRatio = 1.5 ) { |
169 | $delay = $minDelay; |
170 | while ( true ) { |
171 | yield $delay; |
172 | $delay = min( $delay * $increaseByRatio, $maxDelay ); |
173 | } |
174 | } |
175 | |
176 | /** |
177 | * Delete docs by query and wait for it to complete via tasks api. This |
178 | * method returns a generator which must be iterated on at least once |
179 | * or the deletion will not occur. |
180 | * |
181 | * Client code that doesn't care about the result or when the deleteByQuery |
182 | * completes are safe to call next( $gen ) a single time to start the deletion, |
183 | * and then throw away the generator. Note that logging about how long the task |
184 | * has been running will not be logged if the generator is not iterated. |
185 | * |
186 | * @param Index $index the source index |
187 | * @param Query $query the query |
188 | * @param bool $allowConflicts When true documents updated since starting |
189 | * the query will not be deleted, and will not fail the delete-by-query. When |
190 | * false (default) the updated document will not be deleted and the delete-by-query |
191 | * will abort. Deletes are not transactional, some subset of matching documents |
192 | * will have been deleted. |
193 | * @param int $reportEveryNumSec Log task status on this interval of seconds |
194 | * @return \Generator|array[]|\Elastica\Task Returns a generator. Generator yields |
195 | * arrays containing task status responses. Generator returns the Task instance |
196 | * on completion via Generator::getReturn. |
197 | */ |
198 | public static function deleteByQueryWithStatus( |
199 | Index $index, |
200 | Query $query, |
201 | $allowConflicts = false, |
202 | $reportEveryNumSec = 300 |
203 | ) { |
204 | $params = [ |
205 | 'wait_for_completion' => 'false', |
206 | 'scroll' => '15m', |
207 | ]; |
208 | if ( $allowConflicts ) { |
209 | $params['conflicts'] = 'proceed'; |
210 | } |
211 | $response = $index->deleteByQuery( $query, $params )->getData(); |
212 | if ( !isset( $response['task'] ) ) { |
213 | throw new RuntimeException( 'No task returned: ' . var_export( $response, true ) ); |
214 | } |
215 | $log = LoggerFactory::getInstance( 'Elastica' ); |
216 | $clusterName = self::fetchClusterName( $index->getClient() ); |
217 | $logContext = [ |
218 | 'index' => $index->getName(), |
219 | 'cluster' => $clusterName, |
220 | 'taskId' => $response['task'], |
221 | ]; |
222 | $logPrefix = 'deleteByQuery against [{index}] on cluster [{cluster}] with task id [{taskId}]'; |
223 | $log->info( "$logPrefix starting", $logContext + [ |
224 | 'elastic_query' => FormatJson::encode( $query->toArray() ) |
225 | ] ); |
226 | |
227 | // Log tasks running longer than 10 minutes to help track down job runner |
228 | // timeouts that occur after 20 minutes. T219234 |
229 | $start = MWTimestamp::time(); |
230 | $reportAfter = $start + $reportEveryNumSec; |
231 | $task = new \Elastica\Task( |
232 | $index->getClient(), |
233 | $response['task'] ); |
234 | $delay = self::increasingDelay( 0.05, 5 ); |
235 | while ( !$task->isCompleted() ) { |
236 | $now = MWTimestamp::time(); |
237 | if ( $now >= $reportAfter ) { |
238 | $reportAfter = $now + $reportEveryNumSec; |
239 | $log->warning( "$logPrefix still running after [{runtime}] seconds", $logContext + [ |
240 | 'runtime' => $now - $start, |
241 | // json encode to ensure we don't add a bunch of properties in |
242 | // logstash, we only really need the content and this will still be |
243 | // searchable. |
244 | 'status' => FormatJson::encode( $task->getData() ), |
245 | ] ); |
246 | } |
247 | yield $task->getData(); |
248 | $delay->next(); |
249 | usleep( (int)( $delay->current() * self::ONE_SEC_IN_MICROSEC ) ); |
250 | $task->refresh(); |
251 | } |
252 | |
253 | $now = MWTimestamp::time(); |
254 | $taskCompleteResponse = $task->getData()['response']; |
255 | if ( $taskCompleteResponse['failures'] ) { |
256 | $log->error( "$logPrefix failed", $logContext + [ |
257 | 'runtime' => $now - $start, |
258 | 'status' => FormatJson::encode( $task->getData() ), |
259 | ] ); |
260 | throw new RuntimeException( 'Failed deleteByQuery: ' |
261 | . implode( ', ', $taskCompleteResponse['failures'] ) ); |
262 | } |
263 | |
264 | $log->info( "$logPrefix completed", $logContext + [ |
265 | 'runtime' => $now - $start, |
266 | 'status' => FormatJson::encode( $task->getData() ), |
267 | ] ); |
268 | |
269 | return $task; |
270 | } |
271 | |
272 | /** |
273 | * Fetch the name of the cluster client is communicating with. |
274 | * |
275 | * @param Client $client Elasticsearch client to fetch name for |
276 | * @return string Name of cluster $client is communicating with |
277 | */ |
278 | public static function fetchClusterName( Client $client ) { |
279 | $response = $client->requestEndpoint( new \Elasticsearch\Endpoints\Info ); |
280 | if ( $response->getStatus() !== 200 ) { |
281 | throw new RuntimeException( |
282 | "Failed requesting cluster name, got status code [{$response->getStatus()}]" ); |
283 | } |
284 | return $response->getData()['cluster_name']; |
285 | } |
286 | |
287 | /** |
288 | * @param Client $client |
289 | * @return bool True when no writes should be sent via $client |
290 | * @deprecated (always returns false) |
291 | */ |
292 | public static function isFrozen( Client $client ) { |
293 | return false; |
294 | } |
295 | } |
296 | |
297 | class_alias( MWElasticUtils::class, 'MWElasticUtils' ); |