Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
87.86% covered (warning)
87.86%
123 / 140
69.23% covered (warning)
69.23%
9 / 13
CRAP
0.00% covered (danger)
0.00%
0 / 1
NameTableStore
87.86% covered (warning)
87.86%
123 / 140
69.23% covered (warning)
69.23%
9 / 13
31.61
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
10 / 10
100.00% covered (success)
100.00%
1 / 1
1
 getDBConnection
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getCacheKey
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
1
 normalizeName
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 acquireId
66.67% covered (warning)
66.67%
10 / 15
0.00% covered (danger)
0.00%
0 / 1
3.33
 reloadMap
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 getId
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
2
 getName
77.42% covered (warning)
77.42%
24 / 31
0.00% covered (danger)
0.00%
0 / 1
8.74
 getMap
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getTableFromCachesOrReplica
100.00% covered (success)
100.00%
13 / 13
100.00% covered (success)
100.00%
1 / 1
2
 loadTable
100.00% covered (success)
100.00%
12 / 12
100.00% covered (success)
100.00%
1 / 1
2
 store
86.67% covered (warning)
86.67%
26 / 30
0.00% covered (danger)
0.00%
0 / 1
3.02
 getFieldsToStore
85.71% covered (warning)
85.71%
6 / 7
0.00% covered (danger)
0.00%
0 / 1
3.03
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20
21namespace MediaWiki\Storage;
22
23use Psr\Log\LoggerInterface;
24use Wikimedia\Assert\Assert;
25use Wikimedia\LightweightObjectStore\ExpirationAwareness;
26use Wikimedia\ObjectCache\WANObjectCache;
27use Wikimedia\Rdbms\Database;
28use Wikimedia\Rdbms\IDatabase;
29use Wikimedia\Rdbms\ILoadBalancer;
30use Wikimedia\Rdbms\IReadableDatabase;
31
32/**
33 * @since 1.31
34 * @author Addshore
35 */
36class NameTableStore {
37
38    /** @var ILoadBalancer */
39    private $loadBalancer;
40
41    /** @var WANObjectCache */
42    private $cache;
43
44    /** @var LoggerInterface */
45    private $logger;
46
47    /** @var array<int,string>|null */
48    private $tableCache = null;
49
50    /** @var bool|string */
51    private $domain;
52
53    /** @var int */
54    private $cacheTTL;
55
56    /** @var string */
57    private $table;
58    /** @var string */
59    private $idField;
60    /** @var string */
61    private $nameField;
62    /** @var null|callable */
63    private $normalizationCallback;
64    /** @var null|callable */
65    private $insertCallback;
66
67    /**
68     * @param ILoadBalancer $dbLoadBalancer A load balancer for acquiring database connections
69     * @param WANObjectCache $cache A cache manager for caching data. This can be the local
70     *        wiki's default instance even if $dbDomain refers to a different wiki, since
71     *        makeGlobalKey() is used to constructed a key that allows cached names from
72     *        the same database to be re-used between wikis. For example, enwiki and frwiki will
73     *        use the same cache keys for names from the wikidatawiki database, regardless
74     *        of the cache's default key space.
75     * @param LoggerInterface $logger
76     * @param string $table
77     * @param string $idField
78     * @param string $nameField
79     * @param callable|null $normalizationCallback Normalization to be applied to names before being
80     * saved or queried. This should be a callback that accepts and returns a single string.
81     * @param bool|string $dbDomain Database domain ID. Use false for the local database domain.
82     * @param callable|null $insertCallback Callback to change insert fields accordingly.
83     * This parameter was introduced in 1.32
84     */
85    public function __construct(
86        ILoadBalancer $dbLoadBalancer,
87        WANObjectCache $cache,
88        LoggerInterface $logger,
89        $table,
90        $idField,
91        $nameField,
92        ?callable $normalizationCallback = null,
93        $dbDomain = false,
94        ?callable $insertCallback = null
95    ) {
96        $this->loadBalancer = $dbLoadBalancer;
97        $this->cache = $cache;
98        $this->logger = $logger;
99        $this->table = $table;
100        $this->idField = $idField;
101        $this->nameField = $nameField;
102        $this->normalizationCallback = $normalizationCallback;
103        $this->domain = $dbDomain;
104        $this->cacheTTL = ExpirationAwareness::TTL_MONTH;
105        $this->insertCallback = $insertCallback;
106    }
107
108    /**
109     * @param int $index A database index, like DB_PRIMARY or DB_REPLICA
110     * @param int $flags Database connection flags
111     * @return IDatabase
112     */
113    private function getDBConnection( $index, $flags = 0 ) {
114        return $this->loadBalancer->getConnection( $index, [], $this->domain, $flags );
115    }
116
117    /**
118     * Gets the cache key for names.
119     *
120     * The cache key is constructed based on the wiki ID passed to the constructor, and allows
121     * sharing of name tables cached for a specific database between wikis.
122     *
123     * @return string
124     */
125    private function getCacheKey() {
126        return $this->cache->makeGlobalKey(
127            'NameTableSqlStore',
128            $this->table,
129            $this->loadBalancer->resolveDomainID( $this->domain )
130        );
131    }
132
133    /**
134     * @param string $name
135     * @return string
136     */
137    private function normalizeName( $name ) {
138        if ( $this->normalizationCallback === null ) {
139            return $name;
140        }
141        return call_user_func( $this->normalizationCallback, $name );
142    }
143
144    /**
145     * Acquire the id of the given name.
146     * This creates a row in the table if it doesn't already exist.
147     *
148     * @note If called within an atomic section, there is a chance for the acquired ID to be
149     * lost on rollback. There is no guarantee that an ID returned by this method is valid
150     * outside the transaction in which it was produced. This means that calling code should
151     * not retain the return value beyond the scope of a transaction, but rather call acquireId()
152     * again after the transaction is complete. In some rare cases, this may produce an ID
153     * different from the first call.
154     *
155     * @param string $name
156     * @throws NameTableAccessException
157     * @return int
158     */
159    public function acquireId( string $name ) {
160        $name = $this->normalizeName( $name );
161
162        $table = $this->getTableFromCachesOrReplica();
163        $searchResult = array_search( $name, $table, true );
164        if ( $searchResult === false ) {
165            $id = $this->store( $name );
166
167            if ( isset( $table[$id] ) ) {
168                // This can happen when a name is assigned an ID within a transaction due to
169                // CONN_TRX_AUTOCOMMIT being unable to use a separate connection (e.g. SQLite).
170                // The right thing to do in this case is to discard the old value. According to
171                // the contract of acquireId, the caller should not have used it outside the
172                // transaction, so it should not be persisted anywhere after the rollback.
173                $m = "Got ID $id for '$name' from insert"
174                    . " into '{$this->table}', but ID $id was previously associated with"
175                    . " the name '{$table[$id]}'. Overriding the old value, which presumably"
176                    . " has been removed from the database due to a transaction rollback.";
177                $this->logger->warning( $m );
178            }
179
180            $table[$id] = $name;
181            $searchResult = $id;
182
183            $this->tableCache = $table;
184        }
185
186        return $searchResult;
187    }
188
189    /**
190     * Reloads the name table from the primary database, and purges the WAN cache entry.
191     *
192     * @note This should only be called in situations where the local cache has been detected
193     * to be out of sync with the database. There should be no reason to call this method
194     * from outside the NameTableStore during normal operation. This method may however be
195     * useful in unit tests.
196     *
197     * @param int $connFlags ILoadBalancer::CONN_XXX flags. Optional.
198     *
199     * @return string[] The freshly reloaded name map
200     */
201    public function reloadMap( $connFlags = 0 ) {
202        $dbw = $this->getDBConnection( DB_PRIMARY, $connFlags );
203        $this->tableCache = $this->loadTable( $dbw );
204        $dbw->onTransactionPreCommitOrIdle( function () {
205            $this->cache->delete( $this->getCacheKey() );
206        }, __METHOD__ );
207
208        return $this->tableCache;
209    }
210
211    /**
212     * Get the id of the given name.
213     * If the name doesn't exist this will throw.
214     * This should be used in cases where we believe the name already exists or want to check for
215     * existence.
216     *
217     * @param string $name
218     * @throws NameTableAccessException The name does not exist
219     * @return int Id
220     */
221    public function getId( string $name ) {
222        $name = $this->normalizeName( $name );
223
224        $table = $this->getTableFromCachesOrReplica();
225        $searchResult = array_search( $name, $table, true );
226
227        if ( $searchResult !== false ) {
228            return $searchResult;
229        }
230
231        throw NameTableAccessException::newFromDetails( $this->table, 'name', $name );
232    }
233
234    /**
235     * Get the name of the given id.
236     * If the id doesn't exist this will throw.
237     * This should be used in cases where we believe the id already exists.
238     *
239     * Note: Calls to this method will result in a primary DB select for non existing IDs.
240     *
241     * @param int $id
242     * @throws NameTableAccessException The id does not exist
243     * @return string name
244     */
245    public function getName( int $id ) {
246        $table = $this->getTableFromCachesOrReplica();
247        if ( array_key_exists( $id, $table ) ) {
248            return $table[$id];
249        }
250        $fname = __METHOD__;
251
252        $table = $this->cache->getWithSetCallback(
253            $this->getCacheKey(),
254            $this->cacheTTL,
255            function ( $oldValue, &$ttl, &$setOpts ) use ( $id, $fname ) {
256                // Check if cached value is up-to-date enough to have $id
257                if ( is_array( $oldValue ) && array_key_exists( $id, $oldValue ) ) {
258                    // Completely leave the cache key alone
259                    $ttl = WANObjectCache::TTL_UNCACHEABLE;
260                    // Use the old value
261                    return $oldValue;
262                }
263                // Regenerate from replica DB, and primary DB if needed
264                foreach ( [ DB_REPLICA, DB_PRIMARY ] as $source ) {
265                    // Log a fallback to primary
266                    if ( $source === DB_PRIMARY ) {
267                        $this->logger->info(
268                            $fname . ' falling back to primary select from ' .
269                            $this->table . ' with id ' . $id
270                        );
271                    }
272                    $db = $this->getDBConnection( $source );
273                    $cacheSetOpts = Database::getCacheSetOptions( $db );
274                    $table = $this->loadTable( $db );
275                    if ( array_key_exists( $id, $table ) ) {
276                        break; // found it
277                    }
278                }
279                // Use the value from last source checked
280                $setOpts += $cacheSetOpts;
281
282                return $table;
283            },
284            [ 'minAsOf' => INF ] // force callback run
285        );
286
287        $this->tableCache = $table;
288
289        if ( array_key_exists( $id, $table ) ) {
290            return $table[$id];
291        }
292
293        throw NameTableAccessException::newFromDetails( $this->table, 'id', $id );
294    }
295
296    /**
297     * Get the whole table, in no particular order as a map of ids to names.
298     * This method could be subject to DB or cache lag.
299     *
300     * @return string[] keys are the name ids, values are the names themselves
301     *  Example: [ 1 => 'foo', 3 => 'bar' ]
302     */
303    public function getMap() {
304        return $this->getTableFromCachesOrReplica();
305    }
306
307    /**
308     * @return array<int,string>
309     */
310    private function getTableFromCachesOrReplica() {
311        if ( $this->tableCache !== null ) {
312            return $this->tableCache;
313        }
314
315        $table = $this->cache->getWithSetCallback(
316            $this->getCacheKey(),
317            $this->cacheTTL,
318            function ( $oldValue, &$ttl, &$setOpts ) {
319                $dbr = $this->getDBConnection( DB_REPLICA );
320                $setOpts += Database::getCacheSetOptions( $dbr );
321                return $this->loadTable( $dbr );
322            }
323        );
324
325        $this->tableCache = $table;
326
327        return $table;
328    }
329
330    /**
331     * Gets the table from the db
332     *
333     * @param IReadableDatabase $db
334     * @return array<int,string>
335     */
336    private function loadTable( IReadableDatabase $db ) {
337        $result = $db->newSelectQueryBuilder()
338            ->select( [
339                'id' => $this->idField,
340                'name' => $this->nameField
341            ] )
342            ->from( $this->table )
343            ->orderBy( 'id' )
344            ->caller( __METHOD__ )->fetchResultSet();
345
346        $assocArray = [];
347        foreach ( $result as $row ) {
348            $assocArray[(int)$row->id] = $row->name;
349        }
350
351        return $assocArray;
352    }
353
354    /**
355     * Stores the given name in the DB, returning the ID when an insert occurs.
356     *
357     * @param string $name
358     * @return int The new or colliding ID
359     */
360    private function store( string $name ) {
361        Assert::parameter( $name !== '', '$name', 'should not be an empty string' );
362        // Note: this is only called internally so normalization of $name has already occurred.
363
364        $dbw = $this->getDBConnection( DB_PRIMARY, ILoadBalancer::CONN_TRX_AUTOCOMMIT );
365
366        $dbw->newInsertQueryBuilder()
367            ->insertInto( $this->table )
368            ->ignore()
369            ->row( $this->getFieldsToStore( $name ) )
370            ->caller( __METHOD__ )->execute();
371
372        if ( $dbw->affectedRows() > 0 ) {
373            $id = $dbw->insertId();
374            // As store returned an ID we know we inserted so delete from WAN cache
375            $dbw->onTransactionPreCommitOrIdle(
376                function () {
377                    $this->cache->delete( $this->getCacheKey() );
378                },
379                __METHOD__
380            );
381
382            return $id;
383        }
384
385        $this->logger->info(
386            'Tried to insert name into table ' . $this->table . ', but value already existed.'
387        );
388
389        // Note that in MySQL, even if this method somehow runs in a transaction, a plain
390        // (non-locking) SELECT will see the new row created by the other transaction, even
391        // with REPEATABLE-READ. This is due to how "consistent reads" works: the latest
392        // version of rows become visible to the snapshot after the transaction sees those
393        // rows as either matching an update query or conflicting with an insert query.
394        $id = $dbw->newSelectQueryBuilder()
395            ->select( [ 'id' => $this->idField ] )
396            ->from( $this->table )
397            ->where( [ $this->nameField => $name ] )
398            ->caller( __METHOD__ )->fetchField();
399
400        if ( $id === false ) {
401            // Insert failed due to IGNORE flag, but DB_PRIMARY didn't give us the data
402            $m = "No insert possible but primary DB didn't give us a record for " .
403                "'{$name}' in '{$this->table}'";
404            $this->logger->error( $m );
405            throw new NameTableAccessException( $m );
406        }
407
408        return (int)$id;
409    }
410
411    /**
412     * @param string $name
413     * @param int|null $id
414     * @return array
415     */
416    private function getFieldsToStore( $name, $id = null ) {
417        $fields = [];
418
419        $fields[$this->nameField] = $name;
420
421        if ( $id !== null ) {
422            $fields[$this->idField] = $id;
423        }
424
425        if ( $this->insertCallback !== null ) {
426            $fields = call_user_func( $this->insertCallback, $fields );
427        }
428        return $fields;
429    }
430
431}