Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
31.82% covered (danger)
31.82%
28 / 88
22.22% covered (danger)
22.22%
2 / 9
CRAP
0.00% covered (danger)
0.00%
0 / 1
TopKIndex
31.82% covered (danger)
31.82%
28 / 88
22.22% covered (danger)
22.22%
2 / 9
573.81
0.00% covered (danger)
0.00%
0 / 1
 __construct
88.24% covered (warning)
88.24%
15 / 17
0.00% covered (danger)
0.00%
0 / 1
4.03
 canAnswer
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
72
 getLimit
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 filterResults
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
2
 getOffsetLimit
30.77% covered (danger)
30.77%
8 / 26
0.00% covered (danger)
0.00%
0 / 1
69.08
 getOffsetFromOffsetValue
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
30
 compareRowToOffsetValue
0.00% covered (danger)
0.00%
0 / 15
0.00% covered (danger)
0.00%
0 / 1
30
 removeFromIndex
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 queryOptions
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
1<?php
2
3namespace Flow\Data\Index;
4
5use Flow\Data\Compactor\ShallowCompactor;
6use Flow\Data\FlowObjectCache;
7use Flow\Data\ObjectManager;
8use Flow\Data\ObjectMapper;
9use Flow\Data\ObjectStorage;
10use Flow\Exception\DataModelException;
11use Flow\Exception\InvalidParameterException;
12
13/**
14 * Holds the top k items with matching $indexed columns.  List is sorted and truncated to specified size.
15 */
16class TopKIndex extends FeatureIndex {
17    /**
18     * @var array
19     */
20    protected $options = [];
21
22    public function __construct(
23        FlowObjectCache $cache,
24        ObjectStorage $storage,
25        ObjectMapper $mapper,
26        $prefix,
27        array $indexed,
28        array $options = []
29    ) {
30        if ( empty( $options['sort'] ) ) {
31            throw new InvalidParameterException( 'TopKIndex must be sorted' );
32        }
33
34        parent::__construct( $cache, $storage, $mapper, $prefix, $indexed );
35
36        $this->options = $options + [
37            'limit' => 500,
38            'order' => 'DESC',
39            'create' => static function () {
40                return false;
41            },
42            'shallow' => null,
43        ];
44        $this->options['order'] = strtoupper( $this->options['order'] );
45
46        if ( !is_array( $this->options['sort'] ) ) {
47            $this->options['sort'] = [ $this->options['sort'] ];
48        }
49        if ( $this->options['shallow'] ) {
50            // TODO: perhaps we shouldn't even get a shallow option, just receive a proper compactor in
51            // FeatureIndex::__construct
52            $this->rowCompactor = new ShallowCompactor(
53                $this->rowCompactor, $this->options['shallow'], $this->options['sort'] );
54        }
55    }
56
57    public function canAnswer( array $keys, array $options ) {
58        if ( !parent::canAnswer( $keys, $options ) ) {
59            return false;
60        }
61
62        if ( isset( $options['offset-id'] ) ||
63            ( isset( $options['offset-dir'] ) && $options['offset-dir'] !== 'fwd' )
64        ) {
65            return false;
66        }
67
68        if ( isset( $options['sort'] ) && isset( $options['order'] ) ) {
69            return ObjectManager::makeArray( $options['sort'] ) === $this->options['sort']
70                && strtoupper( $options['order'] ) === $this->options['order'];
71        }
72        return true;
73    }
74
75    public function getLimit() {
76        return $this->options['limit'];
77    }
78
79    /**
80     * @param array[] $results
81     * @param array $options
82     *
83     * @return array[]
84     */
85    protected function filterResults( array $results, array $options = [] ) {
86        foreach ( $results as $i => $result ) {
87            [ $offset, $limit ] = $this->getOffsetLimit( $result, $options );
88            $results[$i] = array_slice( $result, $offset, $limit, true );
89        }
90
91        return $results;
92    }
93
94    // TODO: This is only left for now to handle non-ID offsets (e.g. updated
95    // timestamps).
96    // This has always been broken once you query past the TopKIndex limit.
97
98    /**
99     * @param array $rows
100     * @param array $options
101     * @return array [offset, limit] 0-based index to start with and limit.
102     */
103    protected function getOffsetLimit( array $rows, array $options ) {
104        $limit = $options['limit'] ?? $this->getLimit();
105
106        $offsetValue = $options['offset-value'] ?? null;
107
108        $dir = 'fwd';
109        if (
110            isset( $options['offset-dir'] ) &&
111            $options['offset-dir'] === 'rev'
112        ) {
113            $dir = 'rev';
114        }
115
116        if ( $offsetValue === null ) {
117            $offset = $dir === 'fwd' ? 0 : count( $rows ) - $limit;
118            return [ $offset, $limit ];
119        }
120
121        $offset = $this->getOffsetFromOffsetValue( $rows, $offsetValue );
122        $includeOffset = isset( $options['include-offset'] ) && $options['include-offset'];
123        if ( $dir === 'fwd' ) {
124            if ( $includeOffset ) {
125                $startPos = $offset;
126            } else {
127                $startPos = $offset + 1;
128            }
129        } elseif ( $dir === 'rev' ) {
130            $startPos = $offset - $limit;
131            if ( $includeOffset ) {
132                $startPos++;
133            }
134
135            if ( $startPos < 0 ) {
136                if (
137                    isset( $options['offset-elastic'] ) &&
138                    $options['offset-elastic'] === false
139                ) {
140                    // If non-elastic, then reduce the number of items shown commensurately
141                    $limit += $startPos;
142                }
143                $startPos = 0;
144            }
145        } else {
146            $startPos = 0;
147        }
148
149        return [ $startPos, $limit ];
150    }
151
152    /**
153     * Returns the 0-indexed position of $offsetValue within $rows or throws a
154     * DataModelException if $offsetValue is not contained within $rows
155     *
156     * @todo seems wasteful to pass string offsetValue instead of exploding when it comes in
157     * @param array $rows Current bucket contents
158     * @param string $offsetValue
159     * @return int The position of $offsetValue within $rows
160     * @throws DataModelException When $offsetValue is not found within $rows
161     */
162    protected function getOffsetFromOffsetValue( array $rows, $offsetValue ) {
163        $rowIndex = 0;
164        $nextInOrder = $this->getOrder() === 'DESC' ? -1 : 1;
165        foreach ( $rows as $row ) {
166            $comparisonValue = $this->compareRowToOffsetValue( $row, $offsetValue );
167            if ( $comparisonValue === 0 || $comparisonValue === $nextInOrder ) {
168                return $rowIndex;
169            }
170            $rowIndex++;
171        }
172
173        throw new DataModelException( 'Unable to find specified offset in query results', 'process-data' );
174    }
175
176    /**
177     * @param array $row Row to compare to
178     * @param string $offsetValue Value to compare to.  For instance, a timestamp if we
179     *  want all rows before/after that timestamp.  This consists of values for each field
180     *  we sort by, delimited by |.
181     *
182     * @return int An integer less than, equal to, or greater than zero
183     *  if $row is considered to be respectively less than, equal to, or
184     *  greater than $offsetValue
185     *
186     * @throws DataModelException When the index does not support offset values due to
187     *  having an undefined sort order.
188     */
189    public function compareRowToOffsetValue( array $row, $offsetValue ) {
190        $sortFields = $this->getSort();
191        $splitOffsetValue = explode( '|', $offsetValue );
192        $fieldIndex = 0;
193
194        if ( $sortFields === false ) {
195            throw new DataModelException( 'This Index implementation does not support offset values',
196                'process-data' );
197        }
198
199        foreach ( $sortFields as $field ) {
200            $valueInRow = $row[$field];
201            $offsetValuePart = $splitOffsetValue[$fieldIndex];
202
203            if ( $valueInRow > $offsetValuePart ) {
204                return 1;
205            } elseif ( $valueInRow < $offsetValuePart ) {
206                return -1;
207            }
208            ++$fieldIndex;
209        }
210
211        return 0;
212    }
213
214    protected function removeFromIndex( array $indexed, array $row ) {
215        $this->cache->delete( $this->cacheKey( $indexed ) );
216    }
217
218    /**
219     * INTERNAL: in 5.4 it can be protected.
220     *
221     * @return array
222     */
223    public function queryOptions() {
224        $options = [ 'LIMIT' => $this->options['limit'] ];
225
226        $orderBy = [];
227        $order = $this->options['order'];
228        // @phan-suppress-next-line PhanTypeNoPropertiesForeach
229        foreach ( $this->options['sort'] as $key ) {
230            $orderBy[] = "$key $order";
231        }
232        $options['ORDER BY'] = $orderBy;
233
234        return $options;
235    }
236}