Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
28.57% covered (danger)
28.57%
28 / 98
16.67% covered (danger)
16.67%
2 / 12
CRAP
0.00% covered (danger)
0.00%
0 / 1
TopKIndex
28.57% covered (danger)
28.57%
28 / 98
16.67% covered (danger)
16.67%
2 / 12
782.97
0.00% covered (danger)
0.00%
0 / 1
 __construct
88.24% covered (warning)
88.24%
15 / 17
0.00% covered (danger)
0.00%
0 / 1
4.03
 canAnswer
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
72
 getLimit
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 filterResults
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
2
 getOffsetLimit
30.77% covered (danger)
30.77%
8 / 26
0.00% covered (danger)
0.00%
0 / 1
69.08
 getOffsetFromOffsetValue
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
30
 compareRowToOffsetValue
0.00% covered (danger)
0.00%
0 / 15
0.00% covered (danger)
0.00%
0 / 1
30
 removeFromIndex
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 normalizeCompressed
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
2
 sortIndex
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
6
 limitIndexSize
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 queryOptions
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
1<?php
2
3namespace Flow\Data\Index;
4
5use Flow\Data\Compactor\ShallowCompactor;
6use Flow\Data\FlowObjectCache;
7use Flow\Data\ObjectManager;
8use Flow\Data\ObjectMapper;
9use Flow\Data\ObjectStorage;
10use Flow\Data\Utils\SortArrayByKeys;
11use Flow\Exception\DataModelException;
12use Flow\Exception\InvalidParameterException;
13
14/**
15 * Holds the top k items with matching $indexed columns.  List is sorted and truncated to specified size.
16 */
17class TopKIndex extends FeatureIndex {
18    /**
19     * @var array
20     */
21    protected $options = [];
22
23    public function __construct(
24        FlowObjectCache $cache,
25        ObjectStorage $storage,
26        ObjectMapper $mapper,
27        $prefix,
28        array $indexed,
29        array $options = []
30    ) {
31        if ( empty( $options['sort'] ) ) {
32            throw new InvalidParameterException( 'TopKIndex must be sorted' );
33        }
34
35        parent::__construct( $cache, $storage, $mapper, $prefix, $indexed );
36
37        $this->options = $options + [
38            'limit' => 500,
39            'order' => 'DESC',
40            'create' => static function () {
41                return false;
42            },
43            'shallow' => null,
44        ];
45        $this->options['order'] = strtoupper( $this->options['order'] );
46
47        if ( !is_array( $this->options['sort'] ) ) {
48            $this->options['sort'] = [ $this->options['sort'] ];
49        }
50        if ( $this->options['shallow'] ) {
51            // TODO: perhaps we shouldn't even get a shallow option, just receive a proper compactor in
52            // FeatureIndex::__construct
53            $this->rowCompactor = new ShallowCompactor(
54                $this->rowCompactor, $this->options['shallow'], $this->options['sort'] );
55        }
56    }
57
58    public function canAnswer( array $keys, array $options ) {
59        if ( !parent::canAnswer( $keys, $options ) ) {
60            return false;
61        }
62
63        if ( isset( $options['offset-id'] ) ||
64            ( isset( $options['offset-dir'] ) && $options['offset-dir'] !== 'fwd' )
65        ) {
66            return false;
67        }
68
69        if ( isset( $options['sort'] ) && isset( $options['order'] ) ) {
70            return ObjectManager::makeArray( $options['sort'] ) === $this->options['sort']
71                && strtoupper( $options['order'] ) === $this->options['order'];
72        }
73        return true;
74    }
75
76    public function getLimit() {
77        return $this->options['limit'];
78    }
79
80    /**
81     * @param array[] $results
82     * @param array $options
83     *
84     * @return array[]
85     */
86    protected function filterResults( array $results, array $options = [] ) {
87        foreach ( $results as $i => $result ) {
88            [ $offset, $limit ] = $this->getOffsetLimit( $result, $options );
89            $results[$i] = array_slice( $result, $offset, $limit, true );
90        }
91
92        return $results;
93    }
94
95    // TODO: This is only left for now to handle non-ID offsets (e.g. updated
96    // timestamps).
97    // This has always been broken once you query past the TopKIndex limit.
98
99    /**
100     * @param array $rows
101     * @param array $options
102     * @return array [offset, limit] 0-based index to start with and limit.
103     */
104    protected function getOffsetLimit( array $rows, array $options ) {
105        $limit = $options['limit'] ?? $this->getLimit();
106
107        $offsetValue = $options['offset-value'] ?? null;
108
109        $dir = 'fwd';
110        if (
111            isset( $options['offset-dir'] ) &&
112            $options['offset-dir'] === 'rev'
113        ) {
114            $dir = 'rev';
115        }
116
117        if ( $offsetValue === null ) {
118            $offset = $dir === 'fwd' ? 0 : count( $rows ) - $limit;
119            return [ $offset, $limit ];
120        }
121
122        $offset = $this->getOffsetFromOffsetValue( $rows, $offsetValue );
123        $includeOffset = isset( $options['include-offset'] ) && $options['include-offset'];
124        if ( $dir === 'fwd' ) {
125            if ( $includeOffset ) {
126                $startPos = $offset;
127            } else {
128                $startPos = $offset + 1;
129            }
130        } elseif ( $dir === 'rev' ) {
131            $startPos = $offset - $limit;
132            if ( $includeOffset ) {
133                $startPos++;
134            }
135
136            if ( $startPos < 0 ) {
137                if (
138                    isset( $options['offset-elastic'] ) &&
139                    $options['offset-elastic'] === false
140                ) {
141                    // If non-elastic, then reduce the number of items shown commensurately
142                    $limit += $startPos;
143                }
144                $startPos = 0;
145            }
146        } else {
147            $startPos = 0;
148        }
149
150        return [ $startPos, $limit ];
151    }
152
153    /**
154     * Returns the 0-indexed position of $offsetValue within $rows or throws a
155     * DataModelException if $offsetValue is not contained within $rows
156     *
157     * @todo seems wasteful to pass string offsetValue instead of exploding when it comes in
158     * @param array $rows Current bucket contents
159     * @param string $offsetValue
160     * @return int The position of $offsetValue within $rows
161     * @throws DataModelException When $offsetValue is not found within $rows
162     */
163    protected function getOffsetFromOffsetValue( array $rows, $offsetValue ) {
164        $rowIndex = 0;
165        $nextInOrder = $this->getOrder() === 'DESC' ? -1 : 1;
166        foreach ( $rows as $row ) {
167            $comparisonValue = $this->compareRowToOffsetValue( $row, $offsetValue );
168            if ( $comparisonValue === 0 || $comparisonValue === $nextInOrder ) {
169                return $rowIndex;
170            }
171            $rowIndex++;
172        }
173
174        throw new DataModelException( 'Unable to find specified offset in query results', 'process-data' );
175    }
176
177    /**
178     * @param array $row Row to compare to
179     * @param string $offsetValue Value to compare to.  For instance, a timestamp if we
180     *  want all rows before/after that timestamp.  This consists of values for each field
181     *  we sort by, delimited by |.
182     *
183     * @return int An integer less than, equal to, or greater than zero
184     *  if $row is considered to be respectively less than, equal to, or
185     *  greater than $offsetValue
186     *
187     * @throws DataModelException When the index does not support offset values due to
188     *  having an undefined sort order.
189     */
190    public function compareRowToOffsetValue( array $row, $offsetValue ) {
191        $sortFields = $this->getSort();
192        $splitOffsetValue = explode( '|', $offsetValue );
193        $fieldIndex = 0;
194
195        if ( $sortFields === false ) {
196            throw new DataModelException( 'This Index implementation does not support offset values',
197                'process-data' );
198        }
199
200        foreach ( $sortFields as $field ) {
201            $valueInRow = $row[$field];
202            $offsetValuePart = $splitOffsetValue[$fieldIndex];
203
204            if ( $valueInRow > $offsetValuePart ) {
205                return 1;
206            } elseif ( $valueInRow < $offsetValuePart ) {
207                return -1;
208            }
209            ++$fieldIndex;
210        }
211
212        return 0;
213    }
214
215    protected function removeFromIndex( array $indexed, array $row ) {
216        $this->cache->delete( $this->cacheKey( $indexed ) );
217    }
218
219    /**
220     * In order to be able to reliably find a row in an array of
221     * cached rows, we need to normalize those to make sure the
222     * columns match: they may be outdated.
223     *
224     * INTERNAL: in 5.4 it can be protected.
225     *
226     * @param array $row Array in [column => value] format
227     * @param array $schema Array of column names to be present in $row
228     * @return array
229     */
230    public function normalizeCompressed( array $row, array $schema ) {
231        $schema = array_fill_keys( $schema, null );
232
233        // add null value for columns currently in cache
234        $row = array_merge( $schema, $row );
235
236        // remove unknown columns from the row
237        $row = array_intersect_key( $row, $schema );
238
239        return $row;
240    }
241
242    /**
243     * INTERNAL: in 5.4 it can be protected.
244     *
245     * @param array $values
246     * @return array
247     */
248    public function sortIndex( array $values ) {
249        // I don't think this is a valid way to sort a 128bit integer string
250        $callback = new SortArrayByKeys( $this->options['sort'], true );
251        /** @noinspection PhpParamsInspection */
252        usort( $values, $callback );
253        if ( $this->options['order'] === 'DESC' ) {
254            $values = array_reverse( $values );
255        }
256        return $values;
257    }
258
259    /**
260     * INTERNAL: in 5.4 it can be protected.
261     *
262     * @param array $values
263     * @return array
264     */
265    public function limitIndexSize( array $values ) {
266        return array_slice( $values, 0, $this->options['limit'] );
267    }
268
269    /**
270     * INTERNAL: in 5.4 it can be protected.
271     *
272     * @return array
273     */
274    public function queryOptions() {
275        $options = [ 'LIMIT' => $this->options['limit'] ];
276
277        $orderBy = [];
278        $order = $this->options['order'];
279        // @phan-suppress-next-line PhanTypeNoPropertiesForeach
280        foreach ( $this->options['sort'] as $key ) {
281            $orderBy[] = "$key $order";
282        }
283        $options['ORDER BY'] = $orderBy;
284
285        return $options;
286    }
287}