Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
87.22% covered (warning)
87.22%
116 / 133
69.23% covered (warning)
69.23%
9 / 13
CRAP
0.00% covered (danger)
0.00%
0 / 1
NameTableStore
87.22% covered (warning)
87.22%
116 / 133
69.23% covered (warning)
69.23%
9 / 13
31.88
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
1
 getDBConnection
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getCacheKey
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
1
 normalizeName
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 acquireId
66.67% covered (warning)
66.67%
10 / 15
0.00% covered (danger)
0.00%
0 / 1
3.33
 reloadMap
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 getId
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
2
 getName
77.42% covered (warning)
77.42%
24 / 31
0.00% covered (danger)
0.00%
0 / 1
8.74
 getMap
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getTableFromCachesOrReplica
100.00% covered (success)
100.00%
13 / 13
100.00% covered (success)
100.00%
1 / 1
2
 loadTable
100.00% covered (success)
100.00%
12 / 12
100.00% covered (success)
100.00%
1 / 1
2
 store
86.67% covered (warning)
86.67%
26 / 30
0.00% covered (danger)
0.00%
0 / 1
3.02
 getFieldsToStore
85.71% covered (warning)
85.71%
6 / 7
0.00% covered (danger)
0.00%
0 / 1
3.03
1<?php
2/**
3 * @license GPL-2.0-or-later
4 * @file
5 */
6
7namespace MediaWiki\Storage;
8
9use Psr\Log\LoggerInterface;
10use Wikimedia\Assert\Assert;
11use Wikimedia\ObjectCache\BagOStuff;
12use Wikimedia\ObjectCache\WANObjectCache;
13use Wikimedia\Rdbms\Database;
14use Wikimedia\Rdbms\IDatabase;
15use Wikimedia\Rdbms\ILoadBalancer;
16use Wikimedia\Rdbms\IReadableDatabase;
17
18/**
19 * @since 1.31
20 * @author Addshore
21 */
22class NameTableStore {
23
24    /** @var array<int,string>|null */
25    private $tableCache = null;
26
27    private readonly int $cacheTTL;
28
29    /** @var null|callable */
30    private $normalizationCallback;
31    /** @var null|callable */
32    private $insertCallback;
33
34    /**
35     * @param ILoadBalancer $loadBalancer A load balancer for acquiring database connections
36     * @param WANObjectCache $cache A cache manager for caching data. This can be the local
37     *        wiki's default instance even if $dbDomain refers to a different wiki, since
38     *        makeGlobalKey() is used to constructed a key that allows cached names from
39     *        the same database to be re-used between wikis. For example, enwiki and frwiki will
40     *        use the same cache keys for names from the wikidatawiki database, regardless
41     *        of the cache's default key space.
42     * @param LoggerInterface $logger
43     * @param string $table
44     * @param string $idField
45     * @param string $nameField
46     * @param callable|null $normalizationCallback Normalization to be applied to names before being
47     * saved or queried. This should be a callback that accepts and returns a single string.
48     * @param bool|string $domain Database domain ID. Use false for the local database domain.
49     * @param callable|null $insertCallback Callback to change insert fields accordingly.
50     * This parameter was introduced in 1.32
51     */
52    public function __construct(
53        private readonly ILoadBalancer $loadBalancer,
54        private readonly WANObjectCache $cache,
55        private readonly LoggerInterface $logger,
56        private readonly string $table,
57        private readonly string $idField,
58        private readonly string $nameField,
59        ?callable $normalizationCallback = null,
60        private readonly bool|string $domain = false,
61        ?callable $insertCallback = null,
62    ) {
63        $this->normalizationCallback = $normalizationCallback;
64        $this->cacheTTL = BagOStuff::TTL_MONTH;
65        $this->insertCallback = $insertCallback;
66    }
67
68    /**
69     * @param int $index A database index, like DB_PRIMARY or DB_REPLICA
70     * @param int $flags Database connection flags
71     * @return IDatabase
72     */
73    private function getDBConnection( $index, $flags = 0 ) {
74        return $this->loadBalancer->getConnection( $index, [], $this->domain, $flags );
75    }
76
77    /**
78     * Gets the cache key for names.
79     *
80     * The cache key is constructed based on the wiki ID passed to the constructor, and allows
81     * sharing of name tables cached for a specific database between wikis.
82     *
83     * @return string
84     */
85    private function getCacheKey() {
86        return $this->cache->makeGlobalKey(
87            'NameTableSqlStore',
88            $this->table,
89            $this->loadBalancer->resolveDomainID( $this->domain )
90        );
91    }
92
93    /**
94     * @param string $name
95     * @return string
96     */
97    private function normalizeName( $name ) {
98        if ( $this->normalizationCallback === null ) {
99            return $name;
100        }
101        return ( $this->normalizationCallback )( $name );
102    }
103
104    /**
105     * Acquire the id of the given name.
106     * This creates a row in the table if it doesn't already exist.
107     *
108     * @note If called within an atomic section, there is a chance for the acquired ID to be
109     * lost on rollback. There is no guarantee that an ID returned by this method is valid
110     * outside the transaction in which it was produced. This means that calling code should
111     * not retain the return value beyond the scope of a transaction, but rather call acquireId()
112     * again after the transaction is complete. In some rare cases, this may produce an ID
113     * different from the first call.
114     *
115     * @param string $name
116     * @throws NameTableAccessException
117     * @return int
118     */
119    public function acquireId( string $name ) {
120        $name = $this->normalizeName( $name );
121
122        $table = $this->getTableFromCachesOrReplica();
123        $searchResult = array_search( $name, $table, true );
124        if ( $searchResult === false ) {
125            $id = $this->store( $name );
126
127            if ( isset( $table[$id] ) ) {
128                // This can happen when a name is assigned an ID within a transaction due to
129                // CONN_TRX_AUTOCOMMIT being unable to use a separate connection (e.g. SQLite).
130                // The right thing to do in this case is to discard the old value. According to
131                // the contract of acquireId, the caller should not have used it outside the
132                // transaction, so it should not be persisted anywhere after the rollback.
133                $m = "Got ID $id for '$name' from insert"
134                    . " into '{$this->table}', but ID $id was previously associated with"
135                    . " the name '{$table[$id]}'. Overriding the old value, which presumably"
136                    . " has been removed from the database due to a transaction rollback.";
137                $this->logger->warning( $m );
138            }
139
140            $table[$id] = $name;
141            $searchResult = $id;
142
143            $this->tableCache = $table;
144        }
145
146        return $searchResult;
147    }
148
149    /**
150     * Reloads the name table from the primary database, and purges the WAN cache entry.
151     *
152     * @note This should only be called in situations where the local cache has been detected
153     * to be out of sync with the database. There should be no reason to call this method
154     * from outside the NameTableStore during normal operation. This method may however be
155     * useful in unit tests.
156     *
157     * @param int $connFlags ILoadBalancer::CONN_XXX flags. Optional.
158     *
159     * @return string[] The freshly reloaded name map
160     */
161    public function reloadMap( $connFlags = 0 ) {
162        $dbw = $this->getDBConnection( DB_PRIMARY, $connFlags );
163        $this->tableCache = $this->loadTable( $dbw );
164        $dbw->onTransactionPreCommitOrIdle( function () {
165            $this->cache->delete( $this->getCacheKey() );
166        }, __METHOD__ );
167
168        return $this->tableCache;
169    }
170
171    /**
172     * Get the id of the given name.
173     * If the name doesn't exist this will throw.
174     * This should be used in cases where we believe the name already exists or want to check for
175     * existence.
176     *
177     * @param string $name
178     * @throws NameTableAccessException The name does not exist
179     * @return int Id
180     */
181    public function getId( string $name ) {
182        $name = $this->normalizeName( $name );
183
184        $table = $this->getTableFromCachesOrReplica();
185        $searchResult = array_search( $name, $table, true );
186
187        if ( $searchResult !== false ) {
188            return $searchResult;
189        }
190
191        throw NameTableAccessException::newFromDetails( $this->table, 'name', $name );
192    }
193
194    /**
195     * Get the name of the given id.
196     * If the id doesn't exist this will throw.
197     * This should be used in cases where we believe the id already exists.
198     *
199     * Note: Calls to this method will result in a primary DB select for non existing IDs.
200     *
201     * @param int $id
202     * @throws NameTableAccessException The id does not exist
203     * @return string name
204     */
205    public function getName( int $id ) {
206        $table = $this->getTableFromCachesOrReplica();
207        if ( array_key_exists( $id, $table ) ) {
208            return $table[$id];
209        }
210        $fname = __METHOD__;
211
212        $table = $this->cache->getWithSetCallback(
213            $this->getCacheKey(),
214            $this->cacheTTL,
215            function ( $oldValue, &$ttl, &$setOpts ) use ( $id, $fname ) {
216                // Check if cached value is up-to-date enough to have $id
217                if ( is_array( $oldValue ) && array_key_exists( $id, $oldValue ) ) {
218                    // Completely leave the cache key alone
219                    $ttl = WANObjectCache::TTL_UNCACHEABLE;
220                    // Use the old value
221                    return $oldValue;
222                }
223                // Regenerate from replica DB, and primary DB if needed
224                foreach ( [ DB_REPLICA, DB_PRIMARY ] as $source ) {
225                    // Log a fallback to primary
226                    if ( $source === DB_PRIMARY ) {
227                        $this->logger->info(
228                            $fname . ' falling back to primary select from ' .
229                            $this->table . ' with id ' . $id
230                        );
231                    }
232                    $db = $this->getDBConnection( $source );
233                    $cacheSetOpts = Database::getCacheSetOptions( $db );
234                    $table = $this->loadTable( $db );
235                    if ( array_key_exists( $id, $table ) ) {
236                        break; // found it
237                    }
238                }
239                // Use the value from last source checked
240                $setOpts += $cacheSetOpts;
241
242                return $table;
243            },
244            [ 'minAsOf' => INF ] // force callback run
245        );
246
247        $this->tableCache = $table;
248
249        if ( array_key_exists( $id, $table ) ) {
250            return $table[$id];
251        }
252
253        throw NameTableAccessException::newFromDetails( $this->table, 'id', $id );
254    }
255
256    /**
257     * Get the whole table, in no particular order as a map of ids to names.
258     * This method could be subject to DB or cache lag.
259     *
260     * @return string[] keys are the name ids, values are the names themselves
261     *  Example: [ 1 => 'foo', 3 => 'bar' ]
262     */
263    public function getMap() {
264        return $this->getTableFromCachesOrReplica();
265    }
266
267    /**
268     * @return array<int,string>
269     */
270    private function getTableFromCachesOrReplica() {
271        if ( $this->tableCache !== null ) {
272            return $this->tableCache;
273        }
274
275        $table = $this->cache->getWithSetCallback(
276            $this->getCacheKey(),
277            $this->cacheTTL,
278            function ( $oldValue, &$ttl, &$setOpts ) {
279                $dbr = $this->getDBConnection( DB_REPLICA );
280                $setOpts += Database::getCacheSetOptions( $dbr );
281                return $this->loadTable( $dbr );
282            }
283        );
284
285        $this->tableCache = $table;
286
287        return $table;
288    }
289
290    /**
291     * Gets the table from the db
292     *
293     * @param IReadableDatabase $db
294     * @return array<int,string>
295     */
296    private function loadTable( IReadableDatabase $db ) {
297        $result = $db->newSelectQueryBuilder()
298            ->select( [
299                'id' => $this->idField,
300                'name' => $this->nameField
301            ] )
302            ->from( $this->table )
303            ->orderBy( 'id' )
304            ->caller( __METHOD__ )->fetchResultSet();
305
306        $assocArray = [];
307        foreach ( $result as $row ) {
308            $assocArray[(int)$row->id] = $row->name;
309        }
310
311        return $assocArray;
312    }
313
314    /**
315     * Stores the given name in the DB, returning the ID when an insert occurs.
316     *
317     * @param string $name
318     * @return int The new or colliding ID
319     */
320    private function store( string $name ) {
321        Assert::parameter( $name !== '', '$name', 'should not be an empty string' );
322        // Note: this is only called internally so normalization of $name has already occurred.
323
324        $dbw = $this->getDBConnection( DB_PRIMARY, ILoadBalancer::CONN_TRX_AUTOCOMMIT );
325
326        $dbw->newInsertQueryBuilder()
327            ->insertInto( $this->table )
328            ->ignore()
329            ->row( $this->getFieldsToStore( $name ) )
330            ->caller( __METHOD__ )->execute();
331
332        if ( $dbw->affectedRows() > 0 ) {
333            $id = $dbw->insertId();
334            // As store returned an ID we know we inserted so delete from WAN cache
335            $dbw->onTransactionPreCommitOrIdle(
336                function () {
337                    $this->cache->delete( $this->getCacheKey() );
338                },
339                __METHOD__
340            );
341
342            return $id;
343        }
344
345        $this->logger->info(
346            'Tried to insert name into table ' . $this->table . ', but value already existed.'
347        );
348
349        // Note that in MySQL, even if this method somehow runs in a transaction, a plain
350        // (non-locking) SELECT will see the new row created by the other transaction, even
351        // with REPEATABLE-READ. This is due to how "consistent reads" works: the latest
352        // version of rows become visible to the snapshot after the transaction sees those
353        // rows as either matching an update query or conflicting with an insert query.
354        $id = $dbw->newSelectQueryBuilder()
355            ->select( [ 'id' => $this->idField ] )
356            ->from( $this->table )
357            ->where( [ $this->nameField => $name ] )
358            ->caller( __METHOD__ )->fetchField();
359
360        if ( $id === false ) {
361            // Insert failed due to IGNORE flag, but DB_PRIMARY didn't give us the data
362            $m = "No insert possible but primary DB didn't give us a record for " .
363                "'{$name}' in '{$this->table}'";
364            $this->logger->error( $m );
365            throw new NameTableAccessException( $m );
366        }
367
368        return (int)$id;
369    }
370
371    /**
372     * @param string $name
373     * @param int|null $id
374     * @return array
375     */
376    private function getFieldsToStore( $name, $id = null ) {
377        $fields = [];
378
379        $fields[$this->nameField] = $name;
380
381        if ( $id !== null ) {
382            $fields[$this->idField] = $id;
383        }
384
385        if ( $this->insertCallback !== null ) {
386            $fields = ( $this->insertCallback )( $fields );
387        }
388        return $fields;
389    }
390
391}