Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
79.53% covered (warning)
79.53%
637 / 801
52.63% covered (warning)
52.63%
30 / 57
CRAP
0.00% covered (danger)
0.00%
0 / 1
SqlBagOStuff
79.53% covered (warning)
79.53%
637 / 801
52.63% covered (warning)
52.63%
30 / 57
630.64
0.00% covered (danger)
0.00%
0 / 1
 __construct
77.78% covered (warning)
77.78%
21 / 27
0.00% covered (danger)
0.00%
0 / 1
7.54
 doGet
100.00% covered (success)
100.00%
12 / 12
100.00% covered (success)
100.00%
1 / 1
4
 doSet
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 doDelete
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 doAdd
87.50% covered (warning)
87.50%
7 / 8
0.00% covered (danger)
0.00%
0 / 1
2.01
 doCas
87.50% covered (warning)
87.50%
7 / 8
0.00% covered (danger)
0.00%
0 / 1
2.01
 doChangeTTL
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 doIncrWithInit
100.00% covered (success)
100.00%
11 / 11
100.00% covered (success)
100.00%
1 / 1
3
 doGetMulti
100.00% covered (success)
100.00%
15 / 15
100.00% covered (success)
100.00%
1 / 1
4
 doSetMulti
100.00% covered (success)
100.00%
11 / 11
100.00% covered (success)
100.00%
1 / 1
1
 doDeleteMulti
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 doChangeTTLMulti
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 getConnection
80.00% covered (warning)
80.00%
8 / 10
0.00% covered (danger)
0.00%
0 / 1
5.20
 getShardIndexesForKey
92.86% covered (success)
92.86%
13 / 14
0.00% covered (danger)
0.00%
0 / 1
6.01
 getTableNameForKey
100.00% covered (success)
100.00%
7 / 7
100.00% covered (success)
100.00%
1 / 1
3
 getTableNameByShard
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
3
 fetchBlobs
81.67% covered (warning)
81.67%
49 / 60
0.00% covered (danger)
0.00%
0 / 1
21.22
 modifyBlobs
73.33% covered (warning)
73.33%
22 / 30
0.00% covered (danger)
0.00%
0 / 1
19.27
 modifyTableSpecificBlobsForSet
100.00% covered (success)
100.00%
24 / 24
100.00% covered (success)
100.00%
1 / 1
4
 modifyTableSpecificBlobsForDelete
100.00% covered (success)
100.00%
18 / 18
100.00% covered (success)
100.00%
1 / 1
4
 modifyTableSpecificBlobsForAdd
100.00% covered (success)
100.00%
30 / 30
100.00% covered (success)
100.00%
1 / 1
5
 modifyTableSpecificBlobsForCas
92.31% covered (success)
92.31%
36 / 39
0.00% covered (danger)
0.00%
0 / 1
7.02
 modifyTableSpecificBlobsForChangeTTL
100.00% covered (success)
100.00%
44 / 44
100.00% covered (success)
100.00%
1 / 1
9
 modifyTableSpecificBlobsForIncrInit
77.42% covered (warning)
77.42%
24 / 31
0.00% covered (danger)
0.00%
0 / 1
6.41
 modifyTableSpecificBlobsForIncrInitAsync
92.31% covered (success)
92.31%
12 / 13
0.00% covered (danger)
0.00%
0 / 1
3.00
 makeNewKeyExpiry
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
2
 newLockingWriteSectionModificationTimestamp
83.33% covered (warning)
83.33%
5 / 6
0.00% covered (danger)
0.00%
0 / 1
2.02
 makeTimestampedModificationToken
93.75% covered (success)
93.75%
15 / 16
0.00% covered (danger)
0.00%
0 / 1
2.00
 buildExistenceConditions
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 buildUpsertRow
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
2
 buildMultiUpsertSetForOverwrite
100.00% covered (success)
100.00%
18 / 18
100.00% covered (success)
100.00%
1 / 1
4
 buildIncrUpsertSet
100.00% covered (success)
100.00%
21 / 21
100.00% covered (success)
100.00%
1 / 1
3
 encodeDbExpiry
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
2
 decodeDbExpiry
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 dbEncodeSerialValue
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
2
 dbDecodeSerialValue
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
2
 addCasTokenFields
44.44% covered (danger)
44.44%
8 / 18
0.00% covered (danger)
0.00%
0 / 1
9.29
 getCasTokenFromRow
80.00% covered (warning)
80.00%
4 / 5
0.00% covered (danger)
0.00%
0 / 1
2.03
 garbageCollect
100.00% covered (success)
100.00%
14 / 14
100.00% covered (success)
100.00%
1 / 1
2
 expireAll
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 deleteObjectsExpiringBefore
0.00% covered (danger)
0.00%
0 / 34
0.00% covered (danger)
0.00%
0 / 1
90
 deleteServerObjectsExpiringBefore
75.51% covered (warning)
75.51%
37 / 49
0.00% covered (danger)
0.00%
0 / 1
12.78
 deleteAll
0.00% covered (danger)
0.00%
0 / 13
0.00% covered (danger)
0.00%
0 / 1
20
 doLock
53.85% covered (warning)
53.85%
7 / 13
0.00% covered (danger)
0.00%
0 / 1
3.88
 doUnlock
70.00% covered (warning)
70.00%
7 / 10
0.00% covered (danger)
0.00%
0 / 1
3.24
 makeKeyInternal
100.00% covered (success)
100.00%
17 / 17
100.00% covered (success)
100.00%
1 / 1
5
 requireConvertGenericKey
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 serialize
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
3
 unserialize
90.91% covered (success)
90.91%
10 / 11
0.00% covered (danger)
0.00%
0 / 1
5.02
 getLoadBalancer
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 getConnectionViaLoadBalancer
45.45% covered (danger)
45.45%
5 / 11
0.00% covered (danger)
0.00%
0 / 1
2.65
 getConnectionFromServerInfo
100.00% covered (success)
100.00%
10 / 10
100.00% covered (success)
100.00%
1 / 1
3
 handleDBError
0.00% covered (danger)
0.00%
0 / 18
0.00% covered (danger)
0.00%
0 / 1
42
 initSqliteDatabase
84.21% covered (warning)
84.21%
16 / 19
0.00% covered (danger)
0.00%
0 / 1
3.04
 createTables
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 getShardServerIndexes
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 silenceTransactionProfiler
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
1<?php
2/**
3 * Object caching using a SQL database.
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 * http://www.gnu.org/copyleft/gpl.html
19 *
20 * @file
21 * @ingroup Cache
22 */
23
24use MediaWiki\MediaWikiServices;
25use Wikimedia\AtEase\AtEase;
26use Wikimedia\ObjectCache\MediumSpecificBagOStuff;
27use Wikimedia\Rdbms\Blob;
28use Wikimedia\Rdbms\Database;
29use Wikimedia\Rdbms\DBConnectionError;
30use Wikimedia\Rdbms\DBError;
31use Wikimedia\Rdbms\DBQueryError;
32use Wikimedia\Rdbms\IDatabase;
33use Wikimedia\Rdbms\ILoadBalancer;
34use Wikimedia\Rdbms\IMaintainableDatabase;
35use Wikimedia\Rdbms\RawSQLValue;
36use Wikimedia\Rdbms\SelectQueryBuilder;
37use Wikimedia\Rdbms\ServerInfo;
38use Wikimedia\ScopedCallback;
39use Wikimedia\Timestamp\ConvertibleTimestamp;
40
41/**
42 * RDBMS-based caching module
43 *
44 * The following database sharding schemes are supported:
45 *   - None; all keys map to the same shard
46 *   - Hash; keys map to shards via consistent hashing
47 *
48 * The following database replication topologies are supported:
49 *   - A primary database server for each shard, all within one datacenter
50 *   - A co-primary database server for each shard within each datacenter
51 *
52 * @ingroup Cache
53 */
54class SqlBagOStuff extends MediumSpecificBagOStuff {
55    /** @var callable|null Injected function which returns a LoadBalancer */
56    protected $loadBalancerCallback;
57    /** @var ILoadBalancer|null */
58    protected $loadBalancer;
59    /** @var string|false|null DB name used for keys using the LoadBalancer */
60    protected $dbDomain;
61    /** @var bool Whether to use the LoadBalancer */
62    protected $useLB = false;
63
64    /** @var array[] (server index => server config) */
65    protected $serverInfos = [];
66    /** @var string[] (server index => tag/host name) */
67    protected $serverTags = [];
68    /** @var float UNIX timestamp */
69    protected $lastGarbageCollect = 0;
70    /** @var int Average number of writes required to trigger garbage collection */
71    protected $purgePeriod = 10;
72    /** @var int Max expired rows to purge during randomized garbage collection */
73    protected $purgeLimit = 100;
74    /** @var int Number of table shards to use on each server */
75    protected $numTableShards = 1;
76    /** @var int */
77    protected $writeBatchSize = 100;
78    /** @var string */
79    protected $tableName = 'objectcache';
80    /** @var bool Whether multi-primary mode is enabled */
81    protected $multiPrimaryMode;
82
83    /** @var IMaintainableDatabase[] Map of (shard index => DB handle) */
84    protected $conns;
85    /** @var float[] Map of (shard index => UNIX timestamps) */
86    protected $connFailureTimes = [];
87    /** @var Exception[] Map of (shard index => Exception) */
88    protected $connFailureErrors = [];
89
90    /** @var bool Whether zlib methods are available to PHP */
91    private $hasZlib;
92
93    /** @var int Number of redundant read and writes for better consistency */
94    private $dataRedundancy;
95
96    /** A number of seconds well above any expected clock skew */
97    private const SAFE_CLOCK_BOUND_SEC = 15;
98    /** A number of seconds well above any expected clock skew and replication lag */
99    private const SAFE_PURGE_DELAY_SEC = 3600;
100    /** Distinct string for tombstones stored in the "serialized" value column */
101    private const TOMB_SERIAL = '';
102    /** Relative seconds-to-live to use for tombstones */
103    private const TOMB_EXPTIME = -self::SAFE_CLOCK_BOUND_SEC;
104    /** How many seconds must pass before triggering a garbage collection */
105    private const GC_DELAY_SEC = 1;
106
107    private const BLOB_VALUE = 0;
108    private const BLOB_EXPIRY = 1;
109    private const BLOB_CASTOKEN = 2;
110
111    /**
112     * Placeholder timestamp to use for TTL_INDEFINITE that can be stored in all RDBMs types.
113     * We use BINARY(14) for MySQL, BLOB for Sqlite, and TIMESTAMPZ for Postgres (which goes
114     * up to 294276 AD). The last second of the year 9999 can be stored in all these cases.
115     * https://www.postgresql.org/docs/9.0/datatype-datetime.html
116     */
117    private const INF_TIMESTAMP_PLACEHOLDER = '99991231235959';
118
119    /**
120     * Create a new backend instance from parameters injected by ObjectCache::newFromParams()
121     *
122     * The database servers must be provided by *either* the "server" parameter, the "servers"
123     * parameter or the "loadBalancer" parameter.
124     *
125     * The parameters are as described at {@link \MediaWiki\MainConfigSchema::ObjectCaches}
126     * except that:
127     *
128     *   - the configured "cluster" and main LB fallback modes are implemented by
129     *     the wiring by passing "loadBalancerCallback".
130     *   - "dbDomain" is required if "loadBalancerCallback" is set, whereas in
131     *     config it may be absent.
132     *
133     * @internal
134     *
135     * @param array $params
136     *   - server: string
137     *   - servers: string[]
138     *   - loadBalancerCallback: A closure which provides a LoadBalancer object
139     *      - dbDomain: string|false
140     *   - multiPrimaryMode: bool
141     *   - purgePeriod: int|float
142     *   - purgeLimit: int
143     *   - tableName: string
144     *   - shards: int
145     *   - writeBatchSize: int
146     */
147    public function __construct( $params ) {
148        parent::__construct( $params );
149
150        if ( isset( $params['servers'] ) || isset( $params['server'] ) ) {
151            // Configuration uses a direct list of servers.
152            // Object data is horizontally partitioned via key hash.
153            $index = 0;
154            foreach ( ( $params['servers'] ?? [ $params['server'] ] ) as $tag => $info ) {
155                $this->serverInfos[$index] = $info;
156                // Allow integer-indexes arrays for b/c
157                $this->serverTags[$index] = is_string( $tag ) ? $tag : "#$index";
158                ++$index;
159            }
160        } elseif ( isset( $params['loadBalancerCallback'] ) ) {
161            $this->loadBalancerCallback = $params['loadBalancerCallback'];
162            if ( !isset( $params['dbDomain'] ) ) {
163                throw new InvalidArgumentException(
164                    __METHOD__ . ": 'dbDomain' is required if 'loadBalancerCallback' is given"
165                );
166            }
167            $this->dbDomain = $params['dbDomain'];
168            $this->useLB = true;
169        } else {
170            throw new InvalidArgumentException(
171                __METHOD__ . " requires 'server', 'servers', or 'loadBalancerCallback'"
172            );
173        }
174
175        $this->purgePeriod = intval( $params['purgePeriod'] ?? $this->purgePeriod );
176        $this->purgeLimit = intval( $params['purgeLimit'] ?? $this->purgeLimit );
177        $this->tableName = $params['tableName'] ?? $this->tableName;
178        $this->numTableShards = intval( $params['shards'] ?? $this->numTableShards );
179        $this->writeBatchSize = intval( $params['writeBatchSize'] ?? $this->writeBatchSize );
180        $this->multiPrimaryMode = $params['multiPrimaryMode'] ?? false;
181        $this->dataRedundancy = min( intval( $params['dataRedundancy'] ?? 1 ), count( $this->serverTags ) );
182
183        $this->attrMap[self::ATTR_DURABILITY] = self::QOS_DURABILITY_RDBMS;
184
185        $this->hasZlib = extension_loaded( 'zlib' );
186    }
187
188    protected function doGet( $key, $flags = 0, &$casToken = null ) {
189        $getToken = ( $casToken === self::PASS_BY_REF );
190        $casToken = null;
191
192        $data = $this->fetchBlobs( [ $key ], $getToken )[$key];
193        if ( $data ) {
194            $result = $this->unserialize( $data[self::BLOB_VALUE] );
195            if ( $getToken && $result !== false ) {
196                $casToken = $data[self::BLOB_CASTOKEN];
197            }
198            $valueSize = strlen( $data[self::BLOB_VALUE] );
199        } else {
200            $result = false;
201            $valueSize = false;
202        }
203
204        $this->updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, $valueSize ] ] );
205
206        return $result;
207    }
208
209    protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
210        $mtime = $this->getCurrentTime();
211
212        return $this->modifyBlobs(
213            [ $this, 'modifyTableSpecificBlobsForSet' ],
214            $mtime,
215            [ $key => [ $value, $exptime ] ]
216        );
217    }
218
219    protected function doDelete( $key, $flags = 0 ) {
220        $mtime = $this->getCurrentTime();
221
222        return $this->modifyBlobs(
223            [ $this, 'modifyTableSpecificBlobsForDelete' ],
224            $mtime,
225            [ $key => [] ]
226        );
227    }
228
229    protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
230        $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
231        if ( $mtime === null ) {
232            // Timeout or I/O error during lock acquisition
233            return false;
234        }
235
236        return $this->modifyBlobs(
237            [ $this, 'modifyTableSpecificBlobsForAdd' ],
238            $mtime,
239            [ $key => [ $value, $exptime ] ]
240        );
241    }
242
243    protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
244        $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
245        if ( $mtime === null ) {
246            // Timeout or I/O error during lock acquisition
247            return false;
248        }
249
250        return $this->modifyBlobs(
251            [ $this, 'modifyTableSpecificBlobsForCas' ],
252            $mtime,
253            [ $key => [ $value, $exptime, $casToken ] ]
254        );
255    }
256
257    protected function doChangeTTL( $key, $exptime, $flags ) {
258        $mtime = $this->getCurrentTime();
259
260        return $this->modifyBlobs(
261            [ $this, 'modifyTableSpecificBlobsForChangeTTL' ],
262            $mtime,
263            [ $key => [ $exptime ] ]
264        );
265    }
266
267    protected function doIncrWithInit( $key, $exptime, $step, $init, $flags ) {
268        $mtime = $this->getCurrentTime();
269
270        if ( $flags & self::WRITE_BACKGROUND ) {
271            $callback = [ $this, 'modifyTableSpecificBlobsForIncrInitAsync' ];
272        } else {
273            $callback = [ $this, 'modifyTableSpecificBlobsForIncrInit' ];
274        }
275
276        $result = $this->modifyBlobs(
277            $callback,
278            $mtime,
279            [ $key => [ $step, $init, $exptime ] ],
280            $resByKey
281        ) ? $resByKey[$key] : false;
282
283        return $result;
284    }
285
286    protected function doGetMulti( array $keys, $flags = 0 ) {
287        $result = [];
288        $valueSizeByKey = [];
289
290        $dataByKey = $this->fetchBlobs( $keys );
291        foreach ( $keys as $key ) {
292            $data = $dataByKey[$key];
293            if ( $data ) {
294                $serialValue = $data[self::BLOB_VALUE];
295                $value = $this->unserialize( $serialValue );
296                if ( $value !== false ) {
297                    $result[$key] = $value;
298                }
299                $valueSize = strlen( $serialValue );
300            } else {
301                $valueSize = false;
302            }
303            $valueSizeByKey[$key] = [ 0, $valueSize ];
304        }
305
306        $this->updateOpStats( self::METRIC_OP_GET, $valueSizeByKey );
307
308        return $result;
309    }
310
311    protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
312        $mtime = $this->getCurrentTime();
313
314        return $this->modifyBlobs(
315            [ $this, 'modifyTableSpecificBlobsForSet' ],
316            $mtime,
317            array_map(
318                static function ( $value ) use ( $exptime ) {
319                    return [ $value, $exptime ];
320                },
321                $data
322            )
323        );
324    }
325
326    protected function doDeleteMulti( array $keys, $flags = 0 ) {
327        $mtime = $this->getCurrentTime();
328
329        return $this->modifyBlobs(
330            [ $this, 'modifyTableSpecificBlobsForDelete' ],
331            $mtime,
332            array_fill_keys( $keys, [] )
333        );
334    }
335
336    public function doChangeTTLMulti( array $keys, $exptime, $flags = 0 ) {
337        $mtime = $this->getCurrentTime();
338
339        return $this->modifyBlobs(
340            [ $this, 'modifyTableSpecificBlobsForChangeTTL' ],
341            $mtime,
342            array_fill_keys( $keys, [ $exptime ] )
343        );
344    }
345
346    /**
347     * Get a connection to the specified database
348     *
349     * @param int $shardIndex Server index
350     * @return IMaintainableDatabase
351     * @throws DBConnectionError
352     * @throws UnexpectedValueException
353     */
354    private function getConnection( $shardIndex ) {
355        if ( $this->useLB ) {
356            return $this->getConnectionViaLoadBalancer();
357        }
358
359        // Don't keep timing out trying to connect if the server is down
360        if (
361            isset( $this->connFailureErrors[$shardIndex] ) &&
362            ( $this->getCurrentTime() - $this->connFailureTimes[$shardIndex] ) < 60
363        ) {
364            throw $this->connFailureErrors[$shardIndex];
365        }
366
367        if ( isset( $this->serverInfos[$shardIndex] ) ) {
368            $server = $this->serverInfos[$shardIndex];
369            $conn = $this->getConnectionFromServerInfo( $shardIndex, $server );
370        } else {
371            throw new UnexpectedValueException( "Invalid server index #$shardIndex" );
372        }
373
374        return $conn;
375    }
376
377    /**
378     * Get shard indexes to read and write from for a given key
379     *
380     * @param string $key
381     * @param bool $fallback Whether try to fallback to the next db
382     * @return int[] list of shard indexes
383     */
384    private function getShardIndexesForKey( $key, $fallback = false ) {
385        // Pick the same shard for sister keys
386        // Using the same hash stop as mc-router for consistency
387        if ( str_contains( $key, '|#|' ) ) {
388            $key = explode( '|#|', $key )[0];
389        }
390        if ( $this->useLB ) {
391            // LoadBalancer based configuration
392            return [ 0 ];
393        } else {
394            // Striped array of database servers
395            if ( count( $this->serverTags ) == 1 ) {
396                return [ 0 ];
397            } else {
398                $sortedServers = $this->serverTags;
399                // shuffle the servers based on hashing of the keys
400                ArrayUtils::consistentHashSort( $sortedServers, $key );
401                $shardIndexes = array_keys( $sortedServers );
402                if ( $this->dataRedundancy === 1 ) {
403                    if ( $fallback ) {
404                        return [ $shardIndexes[1] ];
405                    } else {
406                        return [ $shardIndexes[0] ];
407                    }
408                } else {
409                    return array_slice( $shardIndexes, 0, $this->dataRedundancy );
410                }
411            }
412        }
413    }
414
415    /**
416     * Get the table name for a given key
417     * @param string $key
418     * @return string table name
419     */
420    private function getTableNameForKey( $key ) {
421        // Pick the same shard for sister keys
422        // Using the same hash stop as mc-router for consistency
423        if ( str_contains( $key, '|#|' ) ) {
424            $key = explode( '|#|', $key )[0];
425        }
426
427        if ( $this->numTableShards > 1 ) {
428            $hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
429            $tableIndex = $hash % $this->numTableShards;
430        } else {
431            $tableIndex = null;
432        }
433
434        return $this->getTableNameByShard( $tableIndex );
435    }
436
437    /**
438     * Get the table name for a given shard index
439     * @param int|null $index
440     * @return string
441     */
442    private function getTableNameByShard( $index ) {
443        if ( $index !== null && $this->numTableShards > 1 ) {
444            $decimals = strlen( (string)( $this->numTableShards - 1 ) );
445
446            return $this->tableName . sprintf( "%0{$decimals}d", $index );
447        }
448
449        return $this->tableName;
450    }
451
452    /**
453     * @param string[] $keys
454     * @param bool $getCasToken Whether to get a CAS token
455     * @param bool $fallback Whether try to fallback to the next db
456     * @return array<string,array|null> Map of (key => (value,expiry,token) or null)
457     */
458    private function fetchBlobs( array $keys, bool $getCasToken = false, $fallback = false ) {
459        if ( $fallback && count( $this->serverTags ) > 1 && $this->dataRedundancy > 1 ) {
460            // fallback doesn't work with data redundancy
461            return [];
462        }
463
464        /** @noinspection PhpUnusedLocalVariableInspection */
465        $silenceScope = $this->silenceTransactionProfiler();
466
467        // Initialize order-preserved per-key results; set values for live keys below
468        $dataByKey = array_fill_keys( $keys, null );
469        $dataByKeyAndShard = [];
470
471        $readTime = (int)$this->getCurrentTime();
472        $keysByTableByShard = [];
473        foreach ( $keys as $key ) {
474            $partitionTable = $this->getTableNameForKey( $key );
475            $shardIndexes = $this->getShardIndexesForKey( $key, $fallback );
476            foreach ( $shardIndexes as $shardIndex ) {
477                $keysByTableByShard[$shardIndex][$partitionTable][] = $key;
478            }
479        }
480        $fallbackResult = [];
481
482        foreach ( $keysByTableByShard as $shardIndex => $serverKeys ) {
483            try {
484                $db = $this->getConnection( $shardIndex );
485                foreach ( $serverKeys as $partitionTable => $tableKeys ) {
486                    $res = $db->newSelectQueryBuilder()
487                        ->select(
488                            $getCasToken
489                            ? $this->addCasTokenFields( $db, [ 'keyname', 'value', 'exptime' ] )
490                            : [ 'keyname', 'value', 'exptime' ] )
491                        ->from( $partitionTable )
492                        ->where( $this->buildExistenceConditions( $db, $tableKeys, $readTime ) )
493                        ->caller( __METHOD__ )
494                        ->fetchResultSet();
495                    foreach ( $res as $row ) {
496                        $row->shardIndex = $shardIndex;
497                        $row->tableName = $partitionTable;
498                        $dataByKeyAndShard[$row->keyname][$shardIndex] = $row;
499                    }
500                }
501            } catch ( DBError $e ) {
502                if ( $fallback ) {
503                    $this->handleDBError( $e, $shardIndex );
504                } else {
505                    $fallbackResult += $this->fetchBlobs(
506                        array_merge( ...array_values( $serverKeys ) ),
507                        $getCasToken,
508                        true
509                    );
510                }
511
512            }
513        }
514        foreach ( $keys as $key ) {
515            $rowsByShard = $dataByKeyAndShard[$key] ?? null;
516            if ( !$rowsByShard ) {
517                continue;
518            }
519
520            // One response, no point of consistency checks
521            if ( count( $rowsByShard ) == 1 ) {
522                $row = array_values( $rowsByShard )[0];
523            } else {
524                usort( $rowsByShard, static function ( $a, $b ) {
525                    if ( $a->exptime == $b->exptime ) {
526                        return 0;
527                    }
528                    return ( $a->exptime < $b->exptime ) ? 1 : -1;
529                } );
530                $row = array_values( $rowsByShard )[0];
531            }
532
533            $this->debug( __METHOD__ . ": retrieved $key; expiry time is {$row->exptime}" );
534            try {
535                $db = $this->getConnection( $row->shardIndex );
536                $dataByKey[$key] = [
537                    self::BLOB_VALUE => $this->dbDecodeSerialValue( $db, $row->value ),
538                    self::BLOB_EXPIRY => $this->decodeDbExpiry( $db, $row->exptime ),
539                    self::BLOB_CASTOKEN => $getCasToken
540                        ? $this->getCasTokenFromRow( $db, $row )
541                        : null
542                ];
543            } catch ( DBQueryError $e ) {
544                $this->handleDBError( $e, $row->shardIndex );
545            }
546        }
547
548        return array_merge( $dataByKey, $fallbackResult );
549    }
550
551    /**
552     * @param callable $tableWriteCallback Callback the takes the following arguments:
553     *  - IDatabase instance
554     *  - Partition table name string
555     *  - UNIX modification timestamp
556     *  - Map of (key => list of arguments) for keys belonging to the server/table partition
557     *  - Map of (key => result) [returned]
558     * @param float $mtime UNIX modification timestamp
559     * @param array<string,array> $argsByKey Map of (key => list of arguments)
560     * @param array<string,mixed> &$resByKey Map of (key => result) [returned]
561     * @param bool $fallback Whether try to fallback to the next db
562     * @return bool Whether all keys were processed
563     * @param-taint $argsByKey none
564     */
565    private function modifyBlobs(
566        callable $tableWriteCallback,
567        float $mtime,
568        array $argsByKey,
569        &$resByKey = [],
570        $fallback = false
571    ) {
572        if ( $fallback && count( $this->serverTags ) > 1 && $this->dataRedundancy > 1 ) {
573            // fallback doesn't work with data redundancy
574            return false;
575        }
576        // Initialize order-preserved per-key results; callbacks mark successful results
577        $resByKey = array_fill_keys( array_keys( $argsByKey ), false );
578
579        /** @noinspection PhpUnusedLocalVariableInspection */
580        $silenceScope = $this->silenceTransactionProfiler();
581
582        $argsByKeyByTableByShard = [];
583        foreach ( $argsByKey as $key => $args ) {
584            $partitionTable = $this->getTableNameForKey( $key );
585            $shardIndexes = $this->getShardIndexesForKey( $key, $fallback );
586            foreach ( $shardIndexes as $shardIndex ) {
587                $argsByKeyByTableByShard[$shardIndex][$partitionTable][$key] = $args;
588            }
589        }
590
591        $shardIndexesAffected = [];
592        foreach ( $argsByKeyByTableByShard as $shardIndex => $argsByKeyByTables ) {
593            foreach ( $argsByKeyByTables as $table => $ptKeyArgs ) {
594                try {
595                    $db = $this->getConnection( $shardIndex );
596                    $shardIndexesAffected[] = $shardIndex;
597                    $tableWriteCallback( $db, $table, $mtime, $ptKeyArgs, $resByKey );
598                } catch ( DBError $e ) {
599                    if ( $fallback ) {
600                        $this->handleDBError( $e, $shardIndex );
601                        continue;
602                    } else {
603                        $this->modifyBlobs( $tableWriteCallback, $mtime, $ptKeyArgs, $resByKey, true );
604                    }
605                }
606            }
607        }
608
609        $success = !in_array( false, $resByKey, true );
610
611        foreach ( $shardIndexesAffected as $shardIndex ) {
612            try {
613                if (
614                    // Random purging is enabled
615                    $this->purgePeriod >= 1 &&
616                    // Only purge on one in every $this->purgePeriod writes
617                    mt_rand( 1, $this->purgePeriod ) == 1 &&
618                    // Avoid repeating the delete within a few seconds
619                    ( $this->getCurrentTime() - $this->lastGarbageCollect ) > self::GC_DELAY_SEC
620                ) {
621                    $this->garbageCollect( $shardIndex );
622                }
623            } catch ( DBError $e ) {
624                $this->handleDBError( $e, $shardIndex );
625            }
626        }
627
628        return $success;
629    }
630
631    /**
632     * Set key/value pairs belonging to a partition table on the given server
633     *
634     * In multi-primary mode, if the current row for a key exists and has a modification token
635     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
636     * then the write to that key will be aborted with a "false" result. Successfully modified
637     * key rows will be assigned a new modification token using the provided timestamp.
638     *
639     * @param IDatabase $db Handle to the database server where the argument keys belong
640     * @param string $ptable Name of the partition table where the argument keys belong
641     * @param float $mtime UNIX modification timestamp
642     * @param array<string,array> $argsByKey Non-empty (key => (value,exptime)) map
643     * @param array<string,mixed> &$resByKey Map of (key => result) for successful writes [returned]
644     * @throws DBError
645     */
646    private function modifyTableSpecificBlobsForSet(
647        IDatabase $db,
648        string $ptable,
649        float $mtime,
650        array $argsByKey,
651        array &$resByKey
652    ) {
653        $valueSizesByKey = [];
654
655        $mt = $this->makeTimestampedModificationToken( $mtime, $db );
656
657        $rows = [];
658        foreach ( $argsByKey as $key => [ $value, $exptime ] ) {
659            $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
660            $serialValue = $this->getSerialized( $value, $key );
661            $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
662
663            $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
664        }
665
666        if ( $this->multiPrimaryMode ) {
667            $db->newInsertQueryBuilder()
668                ->insertInto( $ptable )
669                ->rows( $rows )
670                ->onDuplicateKeyUpdate()
671                ->uniqueIndexFields( [ 'keyname' ] )
672                ->set( $this->buildMultiUpsertSetForOverwrite( $db, $mt ) )
673                ->caller( __METHOD__ )->execute();
674        } else {
675            // T288998: use REPLACE, if possible, to avoid cluttering the binlogs
676            $db->newReplaceQueryBuilder()
677                ->replaceInto( $ptable )
678                ->rows( $rows )
679                ->uniqueIndexFields( [ 'keyname' ] )
680                ->caller( __METHOD__ )->execute();
681        }
682
683        foreach ( $argsByKey as $key => $unused ) {
684            $resByKey[$key] = true;
685        }
686
687        $this->updateOpStats( self::METRIC_OP_SET, $valueSizesByKey );
688    }
689
690    /**
691     * Purge/tombstone key/value pairs belonging to a partition table on the given server
692     *
693     * In multi-primary mode, if the current row for a key exists and has a modification token
694     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
695     * then the write to that key will be aborted with a "false" result. Successfully modified
696     * key rows will be assigned a new modification token/timestamp, an empty value, and an
697     * expiration timestamp dated slightly before the new modification timestamp.
698     *
699     * @param IDatabase $db Handle to the database server where the argument keys belong
700     * @param string $ptable Name of the partition table where the argument keys belong
701     * @param float $mtime UNIX modification timestamp
702     * @param array<string,array> $argsByKey Non-empty (key => []) map
703     * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
704     * @throws DBError
705     */
706    private function modifyTableSpecificBlobsForDelete(
707        IDatabase $db,
708        string $ptable,
709        float $mtime,
710        array $argsByKey,
711        array &$resByKey
712    ) {
713        if ( $this->multiPrimaryMode ) {
714            // Tombstone keys in order to respect eventual consistency
715            $mt = $this->makeTimestampedModificationToken( $mtime, $db );
716            $expiry = $this->makeNewKeyExpiry( self::TOMB_EXPTIME, (int)$mtime );
717            $queryBuilder = $db->newInsertQueryBuilder()
718                ->insertInto( $ptable )
719                ->onDuplicateKeyUpdate()
720                ->uniqueIndexFields( [ 'keyname' ] )
721                ->set( $this->buildMultiUpsertSetForOverwrite( $db, $mt ) );
722            foreach ( $argsByKey as $key => $arg ) {
723                $queryBuilder->row( $this->buildUpsertRow( $db, $key, self::TOMB_SERIAL, $expiry, $mt ) );
724            }
725            $queryBuilder->caller( __METHOD__ )->execute();
726        } else {
727            // Just purge the keys since there is only one primary (e.g. "source of truth")
728            $db->newDeleteQueryBuilder()
729                ->deleteFrom( $ptable )
730                ->where( [ 'keyname' => array_keys( $argsByKey ) ] )
731                ->caller( __METHOD__ )->execute();
732        }
733
734        foreach ( $argsByKey as $key => $arg ) {
735            $resByKey[$key] = true;
736        }
737
738        $this->updateOpStats( self::METRIC_OP_DELETE, array_keys( $argsByKey ) );
739    }
740
741    /**
742     * Insert key/value pairs belonging to a partition table on the given server
743     *
744     * If the current row for a key exists and has an integral UNIX timestamp of expiration
745     * greater than that of the provided modification timestamp, then the write to that key
746     * will be aborted with a "false" result. Acquisition of advisory key locks must be handled
747     * by calling functions.
748     *
749     * In multi-primary mode, if the current row for a key exists and has a modification token
750     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
751     * then the write to that key will be aborted with a "false" result. Successfully modified
752     * key rows will be assigned a new modification token/timestamp.
753     *
754     * @param IDatabase $db Handle to the database server where the argument keys belong
755     * @param string $ptable Name of the partition table where the argument keys belong
756     * @param float $mtime UNIX modification timestamp
757     * @param array<string,array> $argsByKey Non-empty (key => (value,exptime)) map
758     * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
759     * @throws DBError
760     */
761    private function modifyTableSpecificBlobsForAdd(
762        IDatabase $db,
763        string $ptable,
764        float $mtime,
765        array $argsByKey,
766        array &$resByKey
767    ) {
768        $valueSizesByKey = [];
769
770        $mt = $this->makeTimestampedModificationToken( $mtime, $db );
771
772        // This check must happen outside the write query to respect eventual consistency
773        $existingKeys = $db->newSelectQueryBuilder()
774            ->select( 'keyname' )
775            ->from( $ptable )
776            ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) )
777            ->caller( __METHOD__ )
778            ->fetchFieldValues();
779        $existingByKey = array_fill_keys( $existingKeys, true );
780
781        $rows = [];
782        foreach ( $argsByKey as $key => [ $value, $exptime ] ) {
783            if ( isset( $existingByKey[$key] ) ) {
784                $this->logger->debug( __METHOD__ . "$key already exists" );
785                continue;
786            }
787
788            $serialValue = $this->getSerialized( $value, $key );
789            $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
790            $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
791            $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
792        }
793        if ( !$rows ) {
794            return;
795        }
796        $db->newInsertQueryBuilder()
797            ->insertInto( $ptable )
798            ->rows( $rows )
799            ->onDuplicateKeyUpdate()
800            ->uniqueIndexFields( [ 'keyname' ] )
801            ->set( $this->buildMultiUpsertSetForOverwrite( $db, $mt ) )
802            ->caller( __METHOD__ )->execute();
803
804        foreach ( $argsByKey as $key => $unused ) {
805            $resByKey[$key] = !isset( $existingByKey[$key] );
806        }
807
808        $this->updateOpStats( self::METRIC_OP_ADD, $valueSizesByKey );
809    }
810
811    /**
812     * Insert key/value pairs belonging to a partition table on the given server
813     *
814     * If the current row for a key exists, has an integral UNIX timestamp of expiration greater
815     * than that of the provided modification timestamp, and the CAS token does not match, then
816     * the write to that key will be aborted with a "false" result. Acquisition of advisory key
817     * locks must be handled by calling functions.
818     *
819     * In multi-primary mode, if the current row for a key exists and has a modification token
820     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
821     * then the write to that key will be aborted with a "false" result. Successfully modified
822     * key rows will be assigned a new modification token/timestamp.
823     *
824     * @param IDatabase $db Handle to the database server where the argument keys belong
825     * @param string $ptable Name of the partition table where the argument keys belong
826     * @param float $mtime UNIX modification timestamp
827     * @param array<string,array> $argsByKey Non-empty (key => (value, exptime, CAS token)) map
828     * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
829     * @throws DBError
830     */
831    private function modifyTableSpecificBlobsForCas(
832        IDatabase $db,
833        string $ptable,
834        float $mtime,
835        array $argsByKey,
836        array &$resByKey
837    ) {
838        $valueSizesByKey = [];
839
840        $mt = $this->makeTimestampedModificationToken( $mtime, $db );
841
842        // This check must happen outside the write query to respect eventual consistency
843        $res = $db->newSelectQueryBuilder()
844            ->select( $this->addCasTokenFields( $db, [ 'keyname' ] ) )
845            ->from( $ptable )
846            ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) )
847            ->caller( __METHOD__ )
848            ->fetchResultSet();
849
850        $curTokensByKey = [];
851        foreach ( $res as $row ) {
852            $curTokensByKey[$row->keyname] = $this->getCasTokenFromRow( $db, $row );
853        }
854
855        $nonMatchingByKey = [];
856        $rows = [];
857        foreach ( $argsByKey as $key => [ $value, $exptime, $casToken ] ) {
858            $curToken = $curTokensByKey[$key] ?? null;
859            if ( $curToken === null ) {
860                $nonMatchingByKey[$key] = true;
861                $this->logger->debug( __METHOD__ . "$key does not exists" );
862                continue;
863            }
864
865            if ( $curToken !== $casToken ) {
866                $nonMatchingByKey[$key] = true;
867                $this->logger->debug( __METHOD__ . "$key does not have a matching token" );
868                continue;
869            }
870
871            $serialValue = $this->getSerialized( $value, $key );
872            $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
873            $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
874
875            $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
876        }
877        if ( !$rows ) {
878            return;
879        }
880        $db->newInsertQueryBuilder()
881            ->insertInto( $ptable )
882            ->rows( $rows )
883            ->onDuplicateKeyUpdate()
884            ->uniqueIndexFields( [ 'keyname' ] )
885            ->set( $this->buildMultiUpsertSetForOverwrite( $db, $mt ) )
886            ->caller( __METHOD__ )->execute();
887
888        foreach ( $argsByKey as $key => $unused ) {
889            $resByKey[$key] = !isset( $nonMatchingByKey[$key] );
890        }
891
892        $this->updateOpStats( self::METRIC_OP_CAS, $valueSizesByKey );
893    }
894
895    /**
896     * Update the TTL for keys belonging to a partition table on the given server
897     *
898     * If no current row for a key exists or the current row has an integral UNIX timestamp of
899     * expiration less than that of the provided modification timestamp, then the write to that
900     * key will be aborted with a "false" result.
901     *
902     * In multi-primary mode, if the current row for a key exists and has a modification token
903     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
904     * then the write to that key will be aborted with a "false" result. Successfully modified
905     * key rows will be assigned a new modification token/timestamp.
906     *
907     * @param IDatabase $db Handle to the database server where the argument keys belong
908     * @param string $ptable Name of the partition table where the argument keys belong
909     * @param float $mtime UNIX modification timestamp
910     * @param array<string,array> $argsByKey Non-empty (key => (exptime)) map
911     * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
912     * @throws DBError
913     */
914    private function modifyTableSpecificBlobsForChangeTTL(
915        IDatabase $db,
916        string $ptable,
917        float $mtime,
918        array $argsByKey,
919        array &$resByKey
920    ) {
921        if ( $this->multiPrimaryMode ) {
922            $mt = $this->makeTimestampedModificationToken( $mtime, $db );
923
924            $res = $db->newSelectQueryBuilder()
925                ->select( [ 'keyname', 'value' ] )
926                ->from( $ptable )
927                ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) )
928                ->caller( __METHOD__ )
929                ->fetchResultSet();
930
931            $rows = [];
932            $existingKeys = [];
933            foreach ( $res as $curRow ) {
934                $key = $curRow->keyname;
935                $existingKeys[$key] = true;
936                $serialValue = $this->dbDecodeSerialValue( $db, $curRow->value );
937                [ $exptime ] = $argsByKey[$key];
938                $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
939                $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
940            }
941            if ( !$rows ) {
942                return;
943            }
944            $db->newInsertQueryBuilder()
945                ->insertInto( $ptable )
946                ->rows( $rows )
947                ->onDuplicateKeyUpdate()
948                ->uniqueIndexFields( [ 'keyname' ] )
949                ->set( $this->buildMultiUpsertSetForOverwrite( $db, $mt ) )
950                ->caller( __METHOD__ )->execute();
951
952            foreach ( $argsByKey as $key => $unused ) {
953                $resByKey[$key] = isset( $existingKeys[$key] );
954            }
955        } else {
956            $keysBatchesByExpiry = [];
957            foreach ( $argsByKey as $key => [ $exptime ] ) {
958                $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
959                $keysBatchesByExpiry[$expiry][] = $key;
960            }
961
962            $existingCount = 0;
963            foreach ( $keysBatchesByExpiry as $expiry => $keyBatch ) {
964                $db->newUpdateQueryBuilder()
965                    ->update( $ptable )
966                    ->set( [ 'exptime' => $this->encodeDbExpiry( $db, $expiry ) ] )
967                    ->where( $this->buildExistenceConditions( $db, $keyBatch, (int)$mtime ) )
968                    ->caller( __METHOD__ )->execute();
969                $existingCount += $db->affectedRows();
970            }
971            if ( $existingCount === count( $argsByKey ) ) {
972                foreach ( $argsByKey as $key => $args ) {
973                    $resByKey[$key] = true;
974                }
975            }
976        }
977
978        $this->updateOpStats( self::METRIC_OP_CHANGE_TTL, array_keys( $argsByKey ) );
979    }
980
981    /**
982     * Either increment a counter key, if it exists, or initialize it, otherwise
983     *
984     * If no current row for a key exists or the current row has an integral UNIX timestamp of
985     * expiration less than that of the provided modification timestamp, then the key row will
986     * be set to the initial value. Otherwise, the current row will be incremented.
987     *
988     * In multi-primary mode, if the current row for a key exists and has a modification token
989     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
990     * then the write to that key will be aborted with a "false" result. Successfully initialized
991     * key rows will be assigned a new modification token/timestamp.
992     *
993     * @param IDatabase $db Handle to the database server where the argument keys belong
994     * @param string $ptable Name of the partition table where the argument keys belong
995     * @param float $mtime UNIX modification timestamp
996     * @param array<string,array> $argsByKey Non-empty (key => (step, init, exptime) map
997     * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
998     * @throws DBError
999     */
1000    private function modifyTableSpecificBlobsForIncrInit(
1001        IDatabase $db,
1002        string $ptable,
1003        float $mtime,
1004        array $argsByKey,
1005        array &$resByKey
1006    ) {
1007        foreach ( $argsByKey as $key => [ $step, $init, $exptime ] ) {
1008            $mt = $this->makeTimestampedModificationToken( $mtime, $db );
1009            $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
1010
1011            // Use a transaction so that changes from other threads are not visible due to
1012            // "consistent reads". This way, the exact post-increment value can be returned.
1013            // The "live key exists" check can go inside the write query and remain safe for
1014            // replication since the TTL for such keys is either indefinite or very short.
1015            $atomic = $db->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
1016            try {
1017                $db->newInsertQueryBuilder()
1018                    ->insertInto( $ptable )
1019                    ->rows( $this->buildUpsertRow( $db, $key, $init, $expiry, $mt ) )
1020                    ->onDuplicateKeyUpdate()
1021                    ->uniqueIndexFields( [ 'keyname' ] )
1022                    ->set( $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $mt, (int)$mtime ) )
1023                    ->caller( __METHOD__ )->execute();
1024                $affectedCount = $db->affectedRows();
1025                $row = $db->newSelectQueryBuilder()
1026                    ->select( 'value' )
1027                    ->from( $ptable )
1028                    ->where( [ 'keyname' => $key ] )
1029                    ->caller( __METHOD__ )
1030                    ->fetchRow();
1031            } catch ( Exception $e ) {
1032                $db->cancelAtomic( __METHOD__, $atomic );
1033                throw $e;
1034            }
1035            $db->endAtomic( __METHOD__ );
1036
1037            if ( !$affectedCount || $row === false ) {
1038                $this->logger->warning( __METHOD__ . ": failed to set new $key value" );
1039                continue;
1040            }
1041
1042            $serialValue = $this->dbDecodeSerialValue( $db, $row->value );
1043            if ( !$this->isInteger( $serialValue ) ) {
1044                $this->logger->warning( __METHOD__ . ": got non-integer $key value" );
1045                continue;
1046            }
1047
1048            $resByKey[$key] = (int)$serialValue;
1049        }
1050
1051        $this->updateOpStats( self::METRIC_OP_INCR, array_keys( $argsByKey ) );
1052    }
1053
1054    /**
1055     * Same as modifyTableSpecificBlobsForIncrInit() but does not return the
1056     * new value.
1057     *
1058     * @param IDatabase $db
1059     * @param string $ptable
1060     * @param float $mtime
1061     * @param array<string,array> $argsByKey
1062     * @param array<string,mixed> &$resByKey
1063     * @throws DBError
1064     */
1065    private function modifyTableSpecificBlobsForIncrInitAsync(
1066        IDatabase $db,
1067        string $ptable,
1068        float $mtime,
1069        array $argsByKey,
1070        array &$resByKey
1071    ) {
1072        foreach ( $argsByKey as $key => [ $step, $init, $exptime ] ) {
1073            $mt = $this->makeTimestampedModificationToken( $mtime, $db );
1074            $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
1075            $db->newInsertQueryBuilder()
1076                ->insertInto( $ptable )
1077                ->rows( $this->buildUpsertRow( $db, $key, $init, $expiry, $mt ) )
1078                ->onDuplicateKeyUpdate()
1079                ->uniqueIndexFields( [ 'keyname' ] )
1080                ->set( $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $mt, (int)$mtime ) )
1081                ->caller( __METHOD__ )->execute();
1082            if ( !$db->affectedRows() ) {
1083                $this->logger->warning( __METHOD__ . ": failed to set new $key value" );
1084            } else {
1085                $resByKey[$key] = true;
1086            }
1087        }
1088    }
1089
1090    /**
1091     * @param int $exptime Relative or absolute expiration
1092     * @param int $nowTsUnix Current UNIX timestamp
1093     * @return int UNIX timestamp or TTL_INDEFINITE
1094     */
1095    private function makeNewKeyExpiry( $exptime, int $nowTsUnix ) {
1096        $expiry = $this->getExpirationAsTimestamp( $exptime );
1097        // Eventual consistency requires the preservation of recently modified keys.
1098        // Do not create rows with `exptime` fields so low that they might get garbage
1099        // collected before being replicated.
1100        if ( $expiry !== self::TTL_INDEFINITE ) {
1101            $expiry = max( $expiry, $nowTsUnix - self::SAFE_CLOCK_BOUND_SEC );
1102        }
1103
1104        return $expiry;
1105    }
1106
1107    /**
1108     * Get a scoped lock and modification timestamp for a critical section of reads/writes
1109     *
1110     * This is used instead of BagOStuff::getCurrentTime() for certain writes (such as "add",
1111     * "incr", and "cas"), for which we want to support tight race conditions where the same
1112     * key is repeatedly written to by multiple web servers that each get to see the previous
1113     * value, act on it, and modify it in some way.
1114     *
1115     * It is assumed that this method is normally only invoked from the primary datacenter.
1116     * A lock is acquired on the primary server of the local datacenter in order to avoid race
1117     * conditions within the critical section. The clock on the SQL server is used to get the
1118     * modification timestamp in order to minimize issues with clock drift between web servers;
1119     * thus key writes will not be rejected due to some web servers having lagged clocks.
1120     *
1121     * @param string $key
1122     * @param ?ScopedCallback &$scope Unlocker callback; null on failure [returned]
1123     * @return float|null UNIX timestamp with 6 decimal places; null on failure
1124     */
1125    private function newLockingWriteSectionModificationTimestamp( $key, &$scope ) {
1126        if ( !$this->lock( $key, 0 ) ) {
1127            return null;
1128        }
1129
1130        $scope = new ScopedCallback( function () use ( $key ) {
1131            $this->unlock( $key );
1132        } );
1133
1134        // sprintf is used to adjust precision
1135        return (float)sprintf( '%.6F', $this->locks[$key][self::LOCK_TIME] );
1136    }
1137
1138    /**
1139     * Make a `modtoken` column value with the original time and source database server of a write
1140     *
1141     * @param float $mtime UNIX modification timestamp
1142     * @param IDatabase $db Handle to the primary database server sourcing the write
1143     * @return string String of the form "<SECONDS_SOURCE><MICROSECONDS>", where SECONDS_SOURCE
1144     *  is "<35 bit seconds portion of UNIX time><32 bit database server ID>" as 13 base 36 chars,
1145     *  and MICROSECONDS is "<20 bit microseconds portion of UNIX time>" as 4 base 36 chars
1146     */
1147    private function makeTimestampedModificationToken( float $mtime, IDatabase $db ) {
1148        // We have reserved space for upto 6 digits in the microsecond portion of the token.
1149        // This is for future use only (maybe CAS tokens) and not currently used.
1150        // It is currently populated by the microsecond portion returned by microtime,
1151        // which generally has fewer than 6 digits of meaningful precision but can still be useful
1152        // in debugging (to see the token continuously change even during rapid testing).
1153        $seconds = (int)$mtime;
1154        [ , $microseconds ] = explode( '.', sprintf( '%.6F', $mtime ) );
1155
1156        $id = sprintf( '%u', crc32( $db->getServerName() ) );
1157
1158        $token = implode( '', [
1159            // 67 bit integral portion of UNIX timestamp, qualified
1160            \Wikimedia\base_convert(
1161                // 35 bit integral seconds portion of UNIX timestamp
1162                str_pad( base_convert( (string)$seconds, 10, 2 ), 35, '0', STR_PAD_LEFT ) .
1163                // 32 bit ID of the primary database server handling the write
1164                str_pad( base_convert( $id, 10, 2 ), 32, '0', STR_PAD_LEFT ),
1165                2,
1166                36,
1167                13
1168            ),
1169            // 20 bit fractional portion of UNIX timestamp, as integral microseconds
1170            str_pad( base_convert( $microseconds, 10, 36 ), 4, '0', STR_PAD_LEFT )
1171        ] );
1172
1173        if ( strlen( $token ) !== 17 ) {
1174            throw new RuntimeException( "Modification timestamp overflow detected" );
1175        }
1176
1177        return $token;
1178    }
1179
1180    /**
1181     * WHERE conditions that check for existence and liveness of keys
1182     *
1183     * @param IDatabase $db
1184     * @param string[]|string $keys
1185     * @param int $time UNIX modification timestamp
1186     * @return array
1187     */
1188    private function buildExistenceConditions( IDatabase $db, $keys, int $time ) {
1189        // Note that tombstones always have past expiration dates
1190        return [
1191            'keyname' => $keys,
1192            $db->expr( 'exptime', '>=', $db->timestamp( $time ) )
1193        ];
1194    }
1195
1196    /**
1197     * INSERT array for handling key writes/overwrites when no live nor stale key exists
1198     *
1199     * @param IDatabase $db
1200     * @param string $key
1201     * @param string|int $serialValue New value
1202     * @param int $expiry Expiration timestamp or TTL_INDEFINITE
1203     * @param string $mt Modification token
1204     * @return array
1205     */
1206    private function buildUpsertRow(
1207        IDatabase $db,
1208        $key,
1209        $serialValue,
1210        int $expiry,
1211        string $mt
1212    ) {
1213        $row = [
1214            'keyname' => $key,
1215            'value'   => $this->dbEncodeSerialValue( $db, $serialValue ),
1216            'exptime' => $this->encodeDbExpiry( $db, $expiry )
1217        ];
1218        if ( $this->multiPrimaryMode ) {
1219            $row['modtoken'] = $mt;
1220        }
1221
1222        return $row;
1223    }
1224
1225    /**
1226     * SET array for handling key overwrites when a live or stale key exists
1227     *
1228     * @param IDatabase $db
1229     * @param string $mt Modification token
1230     * @return array
1231     */
1232    private function buildMultiUpsertSetForOverwrite( IDatabase $db, string $mt ) {
1233        $expressionsByColumn = [
1234            'value'   => $db->buildExcludedValue( 'value' ),
1235            'exptime' => $db->buildExcludedValue( 'exptime' )
1236        ];
1237
1238        $set = [];
1239        if ( $this->multiPrimaryMode ) {
1240            // The query might take a while to replicate, during which newer values might get
1241            // written. Qualify the query so that it does not override such values. Note that
1242            // duplicate tokens generated around the same time for a key should still result
1243            // in convergence given the use of server_id in modtoken (providing a total order
1244            // among primary DB servers) and MySQL binlog ordering (providing a total order
1245            // for writes replicating from a given primary DB server).
1246            $expressionsByColumn['modtoken'] = $db->addQuotes( $mt );
1247            foreach ( $expressionsByColumn as $column => $updateExpression ) {
1248                $rhs = $db->conditional(
1249                    $db->addQuotes( substr( $mt, 0, 13 ) ) . ' >= ' .
1250                        $db->buildSubString( 'modtoken', 1, 13 ),
1251                    $updateExpression,
1252                    $column
1253                );
1254                $set[$column] = new RawSQLValue( $rhs );
1255            }
1256        } else {
1257            foreach ( $expressionsByColumn as $column => $updateExpression ) {
1258                $set[$column] = new RawSQLValue( $updateExpression );
1259            }
1260        }
1261
1262        return $set;
1263    }
1264
1265    /**
1266     * SET array for handling key overwrites when a live or stale key exists
1267     *
1268     * @param IDatabase $db
1269     * @param int $step Positive counter incrementation value
1270     * @param int $init Positive initial counter value
1271     * @param int $expiry Expiration timestamp or TTL_INDEFINITE
1272     * @param string $mt Modification token
1273     * @param int $mtUnixTs UNIX timestamp of modification token
1274     * @return array
1275     */
1276    private function buildIncrUpsertSet(
1277        IDatabase $db,
1278        int $step,
1279        int $init,
1280        int $expiry,
1281        string $mt,
1282        int $mtUnixTs
1283    ) {
1284        // Map of (column => (SQL for non-expired key rows, SQL for expired key rows))
1285        $expressionsByColumn = [
1286            'value'   => [
1287                $db->buildIntegerCast( 'value' ) . " + {$db->addQuotes( $step )}",
1288                $db->addQuotes( $this->dbEncodeSerialValue( $db, $init ) )
1289            ],
1290            'exptime' => [
1291                'exptime',
1292                $db->addQuotes( $this->encodeDbExpiry( $db, $expiry ) )
1293            ]
1294        ];
1295        if ( $this->multiPrimaryMode ) {
1296            $expressionsByColumn['modtoken'] = [ 'modtoken', $db->addQuotes( $mt ) ];
1297        }
1298
1299        $set = [];
1300        foreach ( $expressionsByColumn as $column => [ $updateExpression, $initExpression ] ) {
1301            $rhs = $db->conditional(
1302                $db->expr( 'exptime', '>=', $db->timestamp( $mtUnixTs ) ),
1303                $updateExpression,
1304                $initExpression
1305            );
1306            $set[$column] = new RawSQLValue( $rhs );
1307        }
1308
1309        return $set;
1310    }
1311
1312    /**
1313     * @param IDatabase $db
1314     * @param int $expiry UNIX timestamp of expiration or TTL_INDEFINITE
1315     * @return string
1316     */
1317    private function encodeDbExpiry( IDatabase $db, int $expiry ) {
1318        return ( $expiry === self::TTL_INDEFINITE )
1319            // Use the maximum timestamp that the column can store
1320            ? $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER )
1321            // Convert the absolute timestamp into the DB timestamp format
1322            : $db->timestamp( $expiry );
1323    }
1324
1325    /**
1326     * @param IDatabase $db
1327     * @param string $dbExpiry DB timestamp of expiration
1328     * @return int UNIX timestamp of expiration or TTL_INDEFINITE
1329     */
1330    private function decodeDbExpiry( IDatabase $db, string $dbExpiry ) {
1331        return ( $dbExpiry === $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER ) )
1332            ? self::TTL_INDEFINITE
1333            : (int)ConvertibleTimestamp::convert( TS_UNIX, $dbExpiry );
1334    }
1335
1336    /**
1337     * @param IDatabase $db
1338     * @param string|int $serialValue
1339     * @return string
1340     */
1341    private function dbEncodeSerialValue( IDatabase $db, $serialValue ) {
1342        return is_int( $serialValue ) ? (string)$serialValue : $db->encodeBlob( $serialValue );
1343    }
1344
1345    /**
1346     * @param IDatabase $db
1347     * @param Blob|string|int $blob
1348     * @return string|int
1349     */
1350    private function dbDecodeSerialValue( IDatabase $db, $blob ) {
1351        return $this->isInteger( $blob ) ? (int)$blob : $db->decodeBlob( $blob );
1352    }
1353
1354    /**
1355     * Either append a 'castoken' field or append the fields needed to compute the CAS token
1356     *
1357     * @param IDatabase $db
1358     * @param string[] $fields SELECT field array
1359     * @return string[] SELECT field array
1360     */
1361    private function addCasTokenFields( IDatabase $db, array $fields ) {
1362        $type = $db->getType();
1363
1364        if ( $type === 'mysql' ) {
1365            $fields['castoken'] = $db->buildConcat( [
1366                'SHA1(value)',
1367                $db->addQuotes( '@' ),
1368                'exptime'
1369            ] );
1370        } elseif ( $type === 'postgres' ) {
1371            $fields['castoken'] = $db->buildConcat( [
1372                'md5(value)',
1373                $db->addQuotes( '@' ),
1374                'exptime'
1375            ] );
1376        } else {
1377            if ( !in_array( 'value', $fields, true ) ) {
1378                $fields[] = 'value';
1379            }
1380            if ( !in_array( 'exptime', $fields, true ) ) {
1381                $fields[] = 'exptime';
1382            }
1383        }
1384
1385        return $fields;
1386    }
1387
1388    /**
1389     * Get a CAS token from a SELECT result row
1390     *
1391     * @param IDatabase $db
1392     * @param stdClass $row A row for a key
1393     * @return string CAS token
1394     */
1395    private function getCasTokenFromRow( IDatabase $db, stdClass $row ) {
1396        if ( isset( $row->castoken ) ) {
1397            $token = $row->castoken;
1398        } else {
1399            $token = sha1( $this->dbDecodeSerialValue( $db, $row->value ) ) . '@' . $row->exptime;
1400            $this->logger->debug( __METHOD__ . ": application computed hash for CAS token" );
1401        }
1402
1403        return $token;
1404    }
1405
1406    /**
1407     * @param int $shardIndex
1408     * @throws DBError
1409     */
1410    private function garbageCollect( $shardIndex ) {
1411        // set right away, avoid queuing duplicate async callbacks
1412        $this->lastGarbageCollect = $this->getCurrentTime();
1413
1414        $garbageCollector = function () use ( $shardIndex ) {
1415            $db = $this->getConnection( $shardIndex );
1416            /** @noinspection PhpUnusedLocalVariableInspection */
1417            $silenceScope = $this->silenceTransactionProfiler();
1418            $this->deleteServerObjectsExpiringBefore(
1419                $db,
1420                (int)$this->getCurrentTime(),
1421                $this->purgeLimit
1422            );
1423            $this->lastGarbageCollect = $this->getCurrentTime();
1424        };
1425
1426        if ( $this->asyncHandler ) {
1427            ( $this->asyncHandler )( $garbageCollector );
1428        } else {
1429            $garbageCollector();
1430        }
1431    }
1432
1433    /**
1434     * @deprecated since 1.41, use deleteObjectsExpiringBefore() instead
1435     */
1436    public function expireAll() {
1437        wfDeprecated( __METHOD__, '1.41' );
1438        $this->deleteObjectsExpiringBefore( (int)$this->getCurrentTime() );
1439    }
1440
1441    public function deleteObjectsExpiringBefore(
1442        $timestamp,
1443        ?callable $progress = null,
1444        $limit = INF,
1445        ?string $tag = null
1446    ) {
1447        /** @noinspection PhpUnusedLocalVariableInspection */
1448        $silenceScope = $this->silenceTransactionProfiler();
1449
1450        if ( $tag !== null ) {
1451            // Purge one server only, to support concurrent purging in large wiki farms (T282761).
1452            $shardIndexes = [];
1453            if ( !$this->serverTags ) {
1454                throw new InvalidArgumentException( "Given a tag but no tags are configured" );
1455            }
1456            foreach ( $this->serverTags as $serverShardIndex => $serverTag ) {
1457                if ( $tag === $serverTag ) {
1458                    $shardIndexes[] = $serverShardIndex;
1459                    break;
1460                }
1461            }
1462            if ( !$shardIndexes ) {
1463                throw new InvalidArgumentException( "Unknown server tag: $tag" );
1464            }
1465        } else {
1466            $shardIndexes = $this->getShardServerIndexes();
1467            shuffle( $shardIndexes );
1468        }
1469
1470        $ok = true;
1471        $numServers = count( $shardIndexes );
1472
1473        $keysDeletedCount = 0;
1474        foreach ( $shardIndexes as $numServersDone => $shardIndex ) {
1475            try {
1476                $db = $this->getConnection( $shardIndex );
1477
1478                // Avoid deadlock (T330377)
1479                $lockKey = "SqlBagOStuff-purge-shard:$shardIndex";
1480                if ( !$db->lock( $lockKey, __METHOD__, 0 ) ) {
1481                    $this->logger->info( "SqlBagOStuff purge for shard $shardIndex already locked, skip" );
1482                    continue;
1483                }
1484
1485                $this->deleteServerObjectsExpiringBefore(
1486                    $db,
1487                    $timestamp,
1488                    $limit,
1489                    $keysDeletedCount,
1490                    [ 'fn' => $progress, 'serversDone' => $numServersDone, 'serversTotal' => $numServers ]
1491                );
1492                $db->unlock( $lockKey, __METHOD__ );
1493            } catch ( DBError $e ) {
1494                $this->handleDBError( $e, $shardIndex );
1495                $ok = false;
1496            }
1497        }
1498
1499        return $ok;
1500    }
1501
1502    /**
1503     * @param IDatabase $db
1504     * @param string|int $timestamp
1505     * @param int|float $limit Maximum number of rows to delete in total or INF for no limit
1506     * @param int &$keysDeletedCount
1507     * @param array|null $progress
1508     * @phan-param array{fn:?callback,serversDone:int,serversTotal:int}|null $progress
1509     * @throws DBError
1510     */
1511    private function deleteServerObjectsExpiringBefore(
1512        IDatabase $db,
1513        $timestamp,
1514        $limit,
1515        &$keysDeletedCount = 0,
1516        ?array $progress = null
1517    ) {
1518        $cutoffUnix = ConvertibleTimestamp::convert( TS_UNIX, $timestamp );
1519        if ( $this->multiPrimaryMode ) {
1520            // Eventual consistency requires the preservation of any key that was recently
1521            // modified. The key must exist on this database server long enough for the server
1522            // to receive, via replication, all writes to the key with lower timestamps. Such
1523            // writes should be no-ops since the existing key value should "win". If the network
1524            // partitions between datacenters A and B for 30 minutes, the database servers in
1525            // each datacenter will see an initial burst of writes with "old" timestamps via
1526            // replication. This might include writes with lower timestamps that the existing
1527            // key value. Therefore, clock skew and replication delay are both factors.
1528            $cutoffUnixSafe = (int)$this->getCurrentTime() - self::SAFE_PURGE_DELAY_SEC;
1529            $cutoffUnix = min( $cutoffUnix, $cutoffUnixSafe );
1530        }
1531        $tableIndexes = range( 0, $this->numTableShards - 1 );
1532        shuffle( $tableIndexes );
1533
1534        $batchSize = min( $this->writeBatchSize, $limit );
1535
1536        foreach ( $tableIndexes as $numShardsDone => $tableIndex ) {
1537            // The oldest expiry of a row we have deleted on this shard
1538            // (the first row that we deleted)
1539            $minExpUnix = null;
1540            // The most recent expiry time so far, from a row we have deleted on this shard
1541            $maxExp = null;
1542            // Size of the time range we'll delete, in seconds (for progress estimate)
1543            $totalSeconds = null;
1544
1545            do {
1546                $res = $db->newSelectQueryBuilder()
1547                    ->select( [ 'keyname', 'exptime' ] )
1548                    ->from( $this->getTableNameByShard( $tableIndex ) )
1549                    ->where( $db->expr( 'exptime', '<', $db->timestamp( $cutoffUnix ) ) )
1550                    ->andWhere( $maxExp ? $db->expr( 'exptime', '>=', $maxExp ) : [] )
1551                    ->orderBy( 'exptime', SelectQueryBuilder::SORT_ASC )
1552                    ->limit( $batchSize )
1553                    ->caller( __METHOD__ )
1554                    ->fetchResultSet();
1555
1556                if ( $res->numRows() ) {
1557                    $row = $res->current();
1558                    if ( $minExpUnix === null ) {
1559                        $minExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $row->exptime );
1560                        $totalSeconds = max( $cutoffUnix - $minExpUnix, 1 );
1561                    }
1562
1563                    $keys = [];
1564                    foreach ( $res as $row ) {
1565                        $keys[] = $row->keyname;
1566                        $maxExp = $row->exptime;
1567                    }
1568
1569                    $db->newDeleteQueryBuilder()
1570                        ->deleteFrom( $this->getTableNameByShard( $tableIndex ) )
1571                        ->where( [
1572                            'keyname' => $keys,
1573                            $db->expr( 'exptime', '<', $db->timestamp( $cutoffUnix ) ),
1574                        ] )
1575                        ->caller( __METHOD__ )->execute();
1576                    $keysDeletedCount += $db->affectedRows();
1577                }
1578
1579                if ( $progress && is_callable( $progress['fn'] ) ) {
1580                    if ( $totalSeconds ) {
1581                        $maxExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $maxExp );
1582                        $remainingSeconds = $cutoffUnix - $maxExpUnix;
1583                        $processedSeconds = max( $totalSeconds - $remainingSeconds, 0 );
1584                        // For example, if we've done 1.5 table shard, and are thus half-way on the
1585                        // 2nd of perhaps 5 tables on this server, then this might be:
1586                        // `( 1 + ( 43200 / 86400 ) ) / 5 = 0.3`, or 30% done, of tables on this server.
1587                        $tablesDoneRatio =
1588                            ( $numShardsDone + ( $processedSeconds / $totalSeconds ) ) / $this->numTableShards;
1589                    } else {
1590                        $tablesDoneRatio = 1;
1591                    }
1592
1593                    // For example, if we're 30% done on the last of 10 servers, then this might be:
1594                    // `( 9 / 10 ) + ( 0.3 / 10 ) = 0.93`, or 93% done, overall.
1595                    $overallRatio = ( $progress['serversDone'] / $progress['serversTotal'] ) +
1596                        ( $tablesDoneRatio / $progress['serversTotal'] );
1597                    ( $progress['fn'] )( $overallRatio * 100 );
1598                }
1599            } while ( $res->numRows() && $keysDeletedCount < $limit );
1600        }
1601    }
1602
1603    /**
1604     * Delete content of shard tables in every server.
1605     * Return true if the operation is successful, false otherwise.
1606     *
1607     * @deprecated since 1.41, unused.
1608     *
1609     * @return bool
1610     */
1611    public function deleteAll() {
1612        wfDeprecated( __METHOD__, '1.41' );
1613        /** @noinspection PhpUnusedLocalVariableInspection */
1614        $silenceScope = $this->silenceTransactionProfiler();
1615        foreach ( $this->getShardServerIndexes() as $shardIndex ) {
1616            try {
1617                $db = $this->getConnection( $shardIndex );
1618                for ( $i = 0; $i < $this->numTableShards; $i++ ) {
1619                    $db->newDeleteQueryBuilder()
1620                        ->deleteFrom( $this->getTableNameByShard( $i ) )
1621                        ->where( $db::ALL_ROWS )
1622                        ->caller( __METHOD__ )->execute();
1623                }
1624            } catch ( DBError $e ) {
1625                $this->handleDBError( $e, $shardIndex );
1626                return false;
1627            }
1628        }
1629        return true;
1630    }
1631
1632    public function doLock( $key, $timeout = 6, $exptime = 6 ) {
1633        /** @noinspection PhpUnusedLocalVariableInspection */
1634        $silenceScope = $this->silenceTransactionProfiler();
1635
1636        $lockTsUnix = null;
1637
1638        $shardIndexes = $this->getShardIndexesForKey( $key );
1639        foreach ( $shardIndexes as $shardIndex ) {
1640            try {
1641                $db = $this->getConnection( $shardIndex );
1642                $lockTsUnix = $db->lock( $key, __METHOD__, $timeout, $db::LOCK_TIMESTAMP );
1643            } catch ( DBError $e ) {
1644                $this->handleDBError( $e, $shardIndex );
1645                $this->logger->warning(
1646                    __METHOD__ . ' failed due to I/O error for {key}.',
1647                    [ 'key' => $key ]
1648                );
1649            }
1650        }
1651
1652        return $lockTsUnix;
1653    }
1654
1655    public function doUnlock( $key ) {
1656        /** @noinspection PhpUnusedLocalVariableInspection */
1657        $silenceScope = $this->silenceTransactionProfiler();
1658
1659        $shardIndexes = $this->getShardIndexesForKey( $key );
1660        $released = false;
1661        foreach ( $shardIndexes as $shardIndex ) {
1662            try {
1663                $db = $this->getConnection( $shardIndex );
1664                $released = $db->unlock( $key, __METHOD__ );
1665            } catch ( DBError $e ) {
1666                $this->handleDBError( $e, $shardIndex );
1667                $released = false;
1668            }
1669        }
1670
1671        return $released;
1672    }
1673
1674    protected function makeKeyInternal( $keyspace, $components ) {
1675        // SQL schema for 'objectcache' specifies keys as varchar(255). From that,
1676        // subtract the number of characters we need for the keyspace and for
1677        // the separator character needed for each argument. To handle some
1678        // custom prefixes used by things like WANObjectCache, limit to 205.
1679        $keyspace = strtr( $keyspace, ' ', '_' );
1680        $charsLeft = 205 - strlen( $keyspace ) - count( $components );
1681        foreach ( $components as &$component ) {
1682            $component = strtr(
1683                $component ?? '',
1684                [
1685                    ' ' => '_', // Avoid unnecessary misses from pre-1.35 code
1686                    ':' => '%3A',
1687                ]
1688            );
1689
1690            // 33 = 32 characters for the MD5 + 1 for the '#' prefix.
1691            if ( $charsLeft > 33 && strlen( $component ) > $charsLeft ) {
1692                $component = '#' . md5( $component );
1693            }
1694            $charsLeft -= strlen( $component );
1695        }
1696        unset( $component );
1697
1698        if ( $charsLeft < 0 ) {
1699            return $keyspace . ':BagOStuff-long-key:##' . md5( implode( ':', $components ) );
1700        }
1701        return $keyspace . ':' . implode( ':', $components );
1702    }
1703
1704    protected function requireConvertGenericKey(): bool {
1705        return true;
1706    }
1707
1708    protected function serialize( $value ) {
1709        if ( is_int( $value ) ) {
1710            return $value;
1711        }
1712
1713        $serial = serialize( $value );
1714        if ( $this->hasZlib ) {
1715            // On typical message and page data, this can provide a 3X storage savings
1716            $serial = gzdeflate( $serial );
1717        }
1718
1719        return $serial;
1720    }
1721
1722    protected function unserialize( $value ) {
1723        if ( $value === self::TOMB_SERIAL ) {
1724            return false; // tombstone
1725        }
1726
1727        if ( $this->isInteger( $value ) ) {
1728            return (int)$value;
1729        }
1730
1731        if ( $this->hasZlib ) {
1732            AtEase::suppressWarnings();
1733            $decompressed = gzinflate( $value );
1734            AtEase::restoreWarnings();
1735
1736            if ( $decompressed !== false ) {
1737                $value = $decompressed;
1738            }
1739        }
1740
1741        return unserialize( $value );
1742    }
1743
1744    private function getLoadBalancer(): ILoadBalancer {
1745        if ( !$this->loadBalancer ) {
1746            $this->loadBalancer = ( $this->loadBalancerCallback )();
1747        }
1748        return $this->loadBalancer;
1749    }
1750
1751    /**
1752     * @return IMaintainableDatabase
1753     * @throws DBError
1754     */
1755    private function getConnectionViaLoadBalancer() {
1756        $lb = $this->getLoadBalancer();
1757
1758        if ( $lb->getServerAttributes( ServerInfo::WRITER_INDEX )[Database::ATTR_DB_LEVEL_LOCKING] ) {
1759            // Use the main connection to avoid transaction deadlocks
1760            $conn = $lb->getMaintenanceConnectionRef( DB_PRIMARY, [], $this->dbDomain );
1761        } else {
1762            // If the RDBMS has row/table/page level locking, then use separate auto-commit
1763            // connection to avoid needless contention and deadlocks.
1764            $conn = $lb->getMaintenanceConnectionRef(
1765                DB_PRIMARY,
1766                [],
1767                $this->dbDomain,
1768                $lb::CONN_TRX_AUTOCOMMIT
1769            );
1770        }
1771
1772        // Make sure any errors are thrown now while we can more easily handle them
1773        $conn->ensureConnection();
1774        return $conn;
1775    }
1776
1777    /**
1778     * @param int $shardIndex
1779     * @param array $server Server config map
1780     * @return IMaintainableDatabase
1781     * @throws DBError
1782     */
1783    private function getConnectionFromServerInfo( $shardIndex, array $server ) {
1784        if ( !isset( $this->conns[$shardIndex] ) ) {
1785            $server['logger'] = $this->logger;
1786            // Always use autocommit mode, even if DBO_TRX is configured
1787            $server['flags'] ??= 0;
1788            $server['flags'] &= ~( IDatabase::DBO_TRX | IDatabase::DBO_DEFAULT );
1789
1790            /** @var IMaintainableDatabase $conn Auto-commit connection to the server */
1791            $conn = MediaWikiServices::getInstance()->getDatabaseFactory()
1792                ->create( $server['type'], $server );
1793
1794            // Automatically create the objectcache table for sqlite as needed
1795            if ( $conn->getType() === 'sqlite' ) {
1796                $this->initSqliteDatabase( $conn );
1797            }
1798            $this->conns[$shardIndex] = $conn;
1799        }
1800
1801        // @phan-suppress-next-line PhanTypeMismatchReturnNullable False positive
1802        return $this->conns[$shardIndex];
1803    }
1804
1805    /**
1806     * Handle a DBError which occurred during a read operation.
1807     *
1808     * @param DBError $exception
1809     * @param int $shardIndex Server index
1810     */
1811    private function handleDBError( DBError $exception, $shardIndex ) {
1812        if ( !$this->useLB && $exception instanceof DBConnectionError ) {
1813            unset( $this->conns[$shardIndex] ); // bug T103435
1814
1815            $now = $this->getCurrentTime();
1816            if ( isset( $this->connFailureTimes[$shardIndex] ) ) {
1817                if ( $now - $this->connFailureTimes[$shardIndex] >= 60 ) {
1818                    unset( $this->connFailureTimes[$shardIndex] );
1819                    unset( $this->connFailureErrors[$shardIndex] );
1820                } else {
1821                    $this->logger->debug( __METHOD__ . ": Server #$shardIndex already down" );
1822                    return;
1823                }
1824            }
1825            $this->logger->info( __METHOD__ . ": Server #$shardIndex down until " . ( $now + 60 ) );
1826            $this->connFailureTimes[$shardIndex] = $now;
1827            $this->connFailureErrors[$shardIndex] = $exception;
1828        }
1829        $this->logger->error( "DBError: {$exception->getMessage()}", [ 'exception' => $exception ] );
1830        if ( $exception instanceof DBConnectionError ) {
1831            $this->setLastError( self::ERR_UNREACHABLE );
1832            $this->logger->warning( __METHOD__ . ": ignoring connection error" );
1833        } else {
1834            $this->setLastError( self::ERR_UNEXPECTED );
1835            $this->logger->warning( __METHOD__ . ": ignoring query error" );
1836        }
1837    }
1838
1839    /**
1840     * @param IMaintainableDatabase $db
1841     * @throws DBError
1842     */
1843    private function initSqliteDatabase( IMaintainableDatabase $db ) {
1844        if ( $db->tableExists( 'objectcache', __METHOD__ ) ) {
1845            return;
1846        }
1847        // Use one table for SQLite; sharding does not seem to have much benefit
1848        $db->query( "PRAGMA journal_mode=WAL", __METHOD__ ); // this is permanent
1849        $db->startAtomic( __METHOD__ ); // atomic DDL
1850        try {
1851            $encTable = $db->tableName( 'objectcache' );
1852            $encExptimeIndex = $db->addIdentifierQuotes( $db->tablePrefix() . 'exptime' );
1853            $db->query(
1854                "CREATE TABLE $encTable (\n" .
1855                "    keyname BLOB NOT NULL default '' PRIMARY KEY,\n" .
1856                "    value BLOB,\n" .
1857                "    exptime BLOB NOT NULL\n" .
1858                ")",
1859                __METHOD__
1860            );
1861            $db->query( "CREATE INDEX $encExptimeIndex ON $encTable (exptime)", __METHOD__ );
1862            $db->endAtomic( __METHOD__ );
1863        } catch ( DBError $e ) {
1864            $db->rollback( __METHOD__ );
1865            throw $e;
1866        }
1867    }
1868
1869    /**
1870     * Create the shard tables on all databases
1871     *
1872     * This is typically called manually by a sysadmin via eval.php, e.g. for ParserCache:
1873     *
1874     * @code
1875     *     ObjectCache::getInstance( 'myparsercache' )->createTables();
1876     * @endcode
1877     *
1878     * This is different from `$services->getParserCache()->getCacheStorage()->createTables()`,
1879     * which would use the backend set via $wgParserCacheType, which shouldn't be
1880     * set yet for the backend you are creating shard tables on. The expectation
1881     * is to first add the new backend to $wgObjectCaches, run the above, and then enable
1882     * it for live ParserCache traffic by setting $wgParserCacheType.
1883     */
1884    public function createTables() {
1885        foreach ( $this->getShardServerIndexes() as $shardIndex ) {
1886            $db = $this->getConnection( $shardIndex );
1887            if ( in_array( $db->getType(), [ 'mysql', 'postgres' ], true ) ) {
1888                for ( $i = 0; $i < $this->numTableShards; $i++ ) {
1889                    $encBaseTable = $db->tableName( 'objectcache' );
1890                    $encShardTable = $db->tableName( $this->getTableNameByShard( $i ) );
1891                    $db->query( "CREATE TABLE IF NOT EXISTS $encShardTable LIKE $encBaseTable", __METHOD__ );
1892                }
1893            }
1894        }
1895    }
1896
1897    /**
1898     * @return int[] List of server indexes
1899     */
1900    private function getShardServerIndexes() {
1901        if ( $this->useLB ) {
1902            // LoadBalancer based configuration
1903            $shardIndexes = [ 0 ];
1904        } else {
1905            // Striped array of database servers
1906            $shardIndexes = array_keys( $this->serverTags );
1907        }
1908
1909        return $shardIndexes;
1910    }
1911
1912    /**
1913     * Silence the transaction profiler until the return value falls out of scope
1914     *
1915     * @return ScopedCallback|null
1916     */
1917    private function silenceTransactionProfiler() {
1918        if ( $this->serverInfos ) {
1919            return null; // no TransactionProfiler injected anyway
1920        }
1921        return Profiler::instance()->getTransactionProfiler()->silenceForScope();
1922    }
1923}