Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
71.74% |
33 / 46 |
|
66.67% |
6 / 9 |
CRAP | |
0.00% |
0 / 1 |
SearchAfter | |
71.74% |
33 / 46 |
|
66.67% |
6 / 9 |
27.15 | |
0.00% |
0 / 1 |
__construct | |
85.71% |
6 / 7 |
|
0.00% |
0 / 1 |
3.03 | |||
current | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 | |||
key | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
next | |
80.00% |
8 / 10 |
|
0.00% |
0 / 1 |
4.13 | |||
initializeSearchAfter | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
runSearch | |
16.67% |
2 / 12 |
|
0.00% |
0 / 1 |
8.21 | |||
rewind | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
2 | |||
valid | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
calcBackoff | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
2 |
1 | <?php |
2 | |
3 | declare( strict_types = 1 ); |
4 | namespace CirrusSearch\Elastica; |
5 | |
6 | use Elastica\Exception\ExceptionInterface as ElasticaExceptionInterface; |
7 | use Elastica\Query; |
8 | use Elastica\ResultSet; |
9 | use Elastica\Search; |
10 | use InvalidArgumentException; |
11 | use MediaWiki\Logger\LoggerFactory; |
12 | use RuntimeException; |
13 | |
/**
 * Iterates over the pages of an Elasticsearch query using `search_after` pagination.
 *
 * Each step of the iteration yields one ResultSet (one page of hits). The following
 * page is requested by copying the sort values of the last hit of the current page
 * into the query's `search_after` parameter. Iteration ends when a page comes back
 * empty. Searches that throw an Elastica exception are retried with exponential
 * backoff.
 */
class SearchAfter implements \Iterator {
	/** Upper bound, in seconds, of any single retry delay */
	private const MAX_BACKOFF_SEC = 120.0;
	private const MICROSEC_PER_SEC = 1_000_000;
	/** Search used to issue every page request; its query is replaced on rewind() */
	private Search $search;
	/** Copy of the caller's query taken at construction; cloned again on every rewind() */
	private Query $baseQuery;
	/** Most recently fetched page, or null before rewind() / while or after a failed fetch */
	private ?ResultSet $currentResultSet = null;
	/** Zero-based index of the current page; -1 while rewind() fetches the first page */
	private ?int $currentPage = null;
	/** @var float[] Sequence of second length backoffs to use for retries */
	private array $backoff;
	/** @var array Initial value for search_after */
	private array $initialSearchAfter = [];

	/**
	 * @param Search $search Search whose query will be paged through. The query must
	 *  have a sort; it is cloned here and the clone is re-cloned on every rewind().
	 * @param int $numRetries The number of retries to perform on each iteration
	 * @param float $backoffFactor Scales the backoff duration, backoff calculated as
	 *  {backoffFactor} * 2^({retry} - 1) which gives, with no scaling, [0.5, 1, 2, 4, 8, ...].
	 *  Each value is capped at MAX_BACKOFF_SEC seconds.
	 * @throws InvalidArgumentException If the query has no sort, or if $numRetries or
	 *  $backoffFactor is negative.
	 */
	public function __construct( Search $search, int $numRetries = 12, float $backoffFactor = 1. ) {
		$this->search = $search;
		$this->baseQuery = clone $search->getQuery();
		if ( !$this->baseQuery->hasParam( 'sort' ) ) {
			throw new InvalidArgumentException( 'SearchAfter query must have a sort' );
		}
		if ( $numRetries < 0 ) {
			throw new InvalidArgumentException( '$numRetries must be >= 0' );
		}
		if ( $backoffFactor < 0 ) {
			// A negative factor yields negative delays, which usleep() rejects with a
			// ValueError; fail fast here instead of in the middle of a retry.
			throw new InvalidArgumentException( '$backoffFactor must be >= 0' );
		}
		$this->backoff = $this->calcBackoff( $numRetries, $backoffFactor );
	}

	/**
	 * @return ResultSet The current page of results
	 * @throws RuntimeException If called before rewind() or after a failed page fetch
	 */
	public function current(): ResultSet {
		if ( $this->currentResultSet === null ) {
			throw new RuntimeException( 'Iterator is in an invalid state and must be rewound' );
		}
		return $this->currentResultSet;
	}

	/**
	 * @return int Zero-based index of the current page (0 if never rewound)
	 */
	public function key(): int {
		return $this->currentPage ?? 0;
	}

	/**
	 * Fetches the next page.
	 *
	 * The sort values of the last hit on the current page become the query's
	 * search_after value. This is a no-op once an empty page has been reached, before
	 * the first rewind(), or after a page fetch failed. In those cases rewind() is
	 * needed to restart. If fetching the very first page (from rewind()) fails,
	 * $currentPage stays at -1, so calling next() again re-issues that first query.
	 */
	public function next(): void {
		if ( $this->currentResultSet !== null ) {
			$numHits = count( $this->currentResultSet );
			if ( $numHits === 0 ) {
				// Reached the end of the result stream
				return;
			}
			$lastHit = $this->currentResultSet[$numHits - 1];
			$this->search->getQuery()->setParam( 'search_after', $lastHit->getSort() );
		} elseif ( $this->currentPage !== -1 ) {
			// iterator is in failed state
			return;
		}
		// ensure if runSearch throws the iterator becomes invalid
		$this->currentResultSet = null;
		$this->currentResultSet = $this->runSearch();
		$this->currentPage++;
	}

	/**
	 * Sets the search_after value used for the first page on the next rewind().
	 * An empty array means iteration starts from the beginning of the results.
	 *
	 * @param array $searchAfter
	 */
	public function initializeSearchAfter( array $searchAfter ): void {
		$this->initialSearchAfter = $searchAfter;
	}

	/**
	 * Runs the search. On an Elastica exception it waits for the next delay in
	 * $this->backoff and retries. When all retries are used up, it makes one final
	 * attempt.
	 *
	 * @return ResultSet
	 * @throws ElasticaExceptionInterface When the final attempt also fails
	 */
	private function runSearch(): ResultSet {
		foreach ( $this->backoff as $backoffSec ) {
			try {
				return $this->search->search();
			} catch ( ElasticaExceptionInterface $e ) {
				LoggerFactory::getInstance( 'CirrusSearch' )->warning(
					"Exception thrown during SearchAfter iteration. Retrying in {backoffSec}s.",
					[
						'exception' => $e,
						'backoffSec' => $backoffSec,
					]
				);
				usleep( (int)( $backoffSec * self::MICROSEC_PER_SEC ) );
			}
		}
		// Final attempt after exhausting retries.
		return $this->search->search();
	}

	/**
	 * Restarts iteration from a fresh clone of the base query. If one was set, the
	 * initial search_after value is applied. Then the first page is fetched.
	 */
	public function rewind(): void {
		// Use -1 so that on increment the first page is 0
		$this->currentPage = -1;
		$this->currentResultSet = null;
		$query = clone $this->baseQuery;
		if ( $this->initialSearchAfter !== [] ) {
			$query->setParam( 'search_after', $this->initialSearchAfter );
		}
		$this->search->setQuery( $query );
		// rewind performs the first query
		$this->next();
	}

	/**
	 * @return bool True while the current page holds at least one hit
	 */
	public function valid(): bool {
		return count( $this->currentResultSet ?? [] ) > 0;
	}

	/**
	 * Precomputes the delay that precedes each retry.
	 *
	 * @param int $maxRetries Number of delays to generate
	 * @param float $backoffFactor Multiplier applied to the 2^(retry - 1) series
	 * @return float[] Zero-indexed delays in seconds, each capped at MAX_BACKOFF_SEC
	 */
	private function calcBackoff( int $maxRetries, float $backoffFactor ): array {
		$backoff = [];
		for ( $retry = 0; $retry < $maxRetries; $retry++ ) {
			// MAX_BACKOFF_SEC is a float so the result is always float, even when capped
			$backoff[] = min( $backoffFactor * 2 ** ( $retry - 1 ), self::MAX_BACKOFF_SEC );
		}
		return $backoff;
	}
}