Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
74.80% covered (warning)
74.80%
567 / 758
48.21% covered (danger)
48.21%
27 / 56
CRAP
0.00% covered (danger)
0.00%
0 / 1
SqlBagOStuff
74.80% covered (warning)
74.80%
567 / 758
48.21% covered (danger)
48.21%
27 / 56
839.96
0.00% covered (danger)
0.00%
0 / 1
 __construct
77.78% covered (warning)
77.78%
21 / 27
0.00% covered (danger)
0.00%
0 / 1
7.54
 doGet
100.00% covered (success)
100.00%
12 / 12
100.00% covered (success)
100.00%
1 / 1
4
 doSet
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 doDelete
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 doAdd
87.50% covered (warning)
87.50%
7 / 8
0.00% covered (danger)
0.00%
0 / 1
2.01
 doCas
87.50% covered (warning)
87.50%
7 / 8
0.00% covered (danger)
0.00%
0 / 1
2.01
 doChangeTTL
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 doIncrWithInit
100.00% covered (success)
100.00%
11 / 11
100.00% covered (success)
100.00%
1 / 1
3
 doGetMulti
100.00% covered (success)
100.00%
15 / 15
100.00% covered (success)
100.00%
1 / 1
4
 doSetMulti
100.00% covered (success)
100.00%
11 / 11
100.00% covered (success)
100.00%
1 / 1
1
 doDeleteMulti
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 doChangeTTLMulti
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 getConnection
80.00% covered (warning)
80.00%
8 / 10
0.00% covered (danger)
0.00%
0 / 1
5.20
 getKeyLocation
58.33% covered (warning)
58.33%
7 / 12
0.00% covered (danger)
0.00%
0 / 1
5.16
 getTableNameByShard
50.00% covered (danger)
50.00%
2 / 4
0.00% covered (danger)
0.00%
0 / 1
4.12
 fetchBlobs
90.00% covered (success)
90.00%
36 / 40
0.00% covered (danger)
0.00%
0 / 1
11.12
 modifyBlobs
79.17% covered (warning)
79.17%
19 / 24
0.00% covered (danger)
0.00%
0 / 1
10.90
 modifyTableSpecificBlobsForSet
100.00% covered (success)
100.00%
24 / 24
100.00% covered (success)
100.00%
1 / 1
4
 modifyTableSpecificBlobsForDelete
100.00% covered (success)
100.00%
18 / 18
100.00% covered (success)
100.00%
1 / 1
4
 modifyTableSpecificBlobsForAdd
100.00% covered (success)
100.00%
30 / 30
100.00% covered (success)
100.00%
1 / 1
5
 modifyTableSpecificBlobsForCas
92.31% covered (success)
92.31%
36 / 39
0.00% covered (danger)
0.00%
0 / 1
7.02
 modifyTableSpecificBlobsForChangeTTL
100.00% covered (success)
100.00%
44 / 44
100.00% covered (success)
100.00%
1 / 1
9
 modifyTableSpecificBlobsForIncrInit
77.42% covered (warning)
77.42%
24 / 31
0.00% covered (danger)
0.00%
0 / 1
6.41
 modifyTableSpecificBlobsForIncrInitAsync
92.31% covered (success)
92.31%
12 / 13
0.00% covered (danger)
0.00%
0 / 1
3.00
 makeNewKeyExpiry
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
2
 newLockingWriteSectionModificationTimestamp
83.33% covered (warning)
83.33%
5 / 6
0.00% covered (danger)
0.00%
0 / 1
2.02
 makeTimestampedModificationToken
93.75% covered (success)
93.75%
15 / 16
0.00% covered (danger)
0.00%
0 / 1
2.00
 buildExistenceConditions
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 buildUpsertRow
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
2
 buildMultiUpsertSetForOverwrite
100.00% covered (success)
100.00%
18 / 18
100.00% covered (success)
100.00%
1 / 1
4
 buildIncrUpsertSet
100.00% covered (success)
100.00%
21 / 21
100.00% covered (success)
100.00%
1 / 1
3
 encodeDbExpiry
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
2
 decodeDbExpiry
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 dbEncodeSerialValue
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
2
 dbDecodeSerialValue
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
2
 addCasTokenFields
44.44% covered (danger)
44.44%
8 / 18
0.00% covered (danger)
0.00%
0 / 1
9.29
 getCasTokenFromRow
80.00% covered (warning)
80.00%
4 / 5
0.00% covered (danger)
0.00%
0 / 1
2.03
 garbageCollect
92.86% covered (success)
92.86%
13 / 14
0.00% covered (danger)
0.00%
0 / 1
2.00
 expireAll
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 deleteObjectsExpiringBefore
0.00% covered (danger)
0.00%
0 / 34
0.00% covered (danger)
0.00%
0 / 1
90
 deleteServerObjectsExpiringBefore
42.86% covered (danger)
42.86%
21 / 49
0.00% covered (danger)
0.00%
0 / 1
33.58
 deleteAll
0.00% covered (danger)
0.00%
0 / 13
0.00% covered (danger)
0.00%
0 / 1
20
 doLock
50.00% covered (danger)
50.00%
6 / 12
0.00% covered (danger)
0.00%
0 / 1
2.50
 doUnlock
62.50% covered (warning)
62.50%
5 / 8
0.00% covered (danger)
0.00%
0 / 1
2.21
 makeKeyInternal
100.00% covered (success)
100.00%
13 / 13
100.00% covered (success)
100.00%
1 / 1
5
 requireConvertGenericKey
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 serialize
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
3
 unserialize
90.91% covered (success)
90.91%
10 / 11
0.00% covered (danger)
0.00%
0 / 1
5.02
 getLoadBalancer
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 getConnectionViaLoadBalancer
45.45% covered (danger)
45.45%
5 / 11
0.00% covered (danger)
0.00%
0 / 1
4.46
 getConnectionFromServerInfo
100.00% covered (success)
100.00%
9 / 9
100.00% covered (success)
100.00%
1 / 1
3
 handleDBError
0.00% covered (danger)
0.00%
0 / 18
0.00% covered (danger)
0.00%
0 / 1
42
 initSqliteDatabase
10.53% covered (danger)
10.53%
2 / 19
0.00% covered (danger)
0.00%
0 / 1
9.45
 createTables
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 getShardServerIndexes
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 silenceTransactionProfiler
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
1<?php
2/**
3 * Object caching using a SQL database.
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 * http://www.gnu.org/copyleft/gpl.html
19 *
20 * @file
21 * @ingroup Cache
22 */
23
24use MediaWiki\MediaWikiServices;
25use Wikimedia\AtEase\AtEase;
26use Wikimedia\Rdbms\Blob;
27use Wikimedia\Rdbms\Database;
28use Wikimedia\Rdbms\DBConnectionError;
29use Wikimedia\Rdbms\DBError;
30use Wikimedia\Rdbms\DBQueryError;
31use Wikimedia\Rdbms\IDatabase;
32use Wikimedia\Rdbms\ILoadBalancer;
33use Wikimedia\Rdbms\IMaintainableDatabase;
34use Wikimedia\Rdbms\SelectQueryBuilder;
35use Wikimedia\ScopedCallback;
36use Wikimedia\Timestamp\ConvertibleTimestamp;
37
38/**
39 * RDBMS-based caching module
40 *
41 * The following database sharding schemes are supported:
42 *   - None; all keys map to the same shard
43 *   - Hash; keys map to shards via consistent hashing
44 *
45 * The following database replication topologies are supported:
46 *   - A primary database server for each shard, all within one datacenter
47 *   - A co-primary database server for each shard within each datacenter
48 *
49 * @ingroup Cache
50 */
51class SqlBagOStuff extends MediumSpecificBagOStuff {
52    /** @var callable|null Injected function which returns a LoadBalancer */
53    protected $loadBalancerCallback;
54    /** @var ILoadBalancer|null */
55    protected $loadBalancer;
56    /** @var string|false|null DB name used for keys using the LoadBalancer */
57    protected $dbDomain;
58    /** @var bool Whether to use the LoadBalancer */
59    protected $useLB = false;
60
61    /** @var array[] (server index => server config) */
62    protected $serverInfos = [];
63    /** @var string[] (server index => tag/host name) */
64    protected $serverTags = [];
65    /** @var float UNIX timestamp */
66    protected $lastGarbageCollect = 0;
67    /** @var int Average number of writes required to trigger garbage collection */
68    protected $purgePeriod = 10;
69    /** @var int Max expired rows to purge during randomized garbage collection */
70    protected $purgeLimit = 100;
71    /** @var int Number of table shards to use on each server */
72    protected $numTableShards = 1;
73    /** @var int */
74    protected $writeBatchSize = 100;
75    /** @var string */
76    protected $tableName = 'objectcache';
77    /** @var bool Whether to use replicas instead of primaries (if using LoadBalancer) */
78    protected $replicaOnly;
79    /** @var bool Whether multi-primary mode is enabled */
80    protected $multiPrimaryMode;
81
82    /** @var IMaintainableDatabase[] Map of (shard index => DB handle) */
83    protected $conns;
84    /** @var float[] Map of (shard index => UNIX timestamps) */
85    protected $connFailureTimes = [];
86    /** @var Exception[] Map of (shard index => Exception) */
87    protected $connFailureErrors = [];
88
89    /** @var bool Whether zlib methods are available to PHP */
90    private $hasZlib;
91
92    /** A number of seconds well above any expected clock skew */
93    private const SAFE_CLOCK_BOUND_SEC = 15;
94    /** A number of seconds well above any expected clock skew and replication lag */
95    private const SAFE_PURGE_DELAY_SEC = 3600;
96    /** Distinct string for tombstones stored in the "serialized" value column */
97    private const TOMB_SERIAL = '';
98    /** Relative seconds-to-live to use for tombstones */
99    private const TOMB_EXPTIME = -self::SAFE_CLOCK_BOUND_SEC;
100    /** How many seconds must pass before triggering a garbage collection */
101    private const GC_DELAY_SEC = 1;
102
103    private const BLOB_VALUE = 0;
104    private const BLOB_EXPIRY = 1;
105    private const BLOB_CASTOKEN = 2;
106
107    /**
108     * Placeholder timestamp to use for TTL_INDEFINITE that can be stored in all RDBMs types.
109     * We use BINARY(14) for MySQL, BLOB for Sqlite, and TIMESTAMPZ for Postgres (which goes
110     * up to 294276 AD). The last second of the year 9999 can be stored in all these cases.
111     * https://www.postgresql.org/docs/9.0/datatype-datetime.html
112     */
113    private const INF_TIMESTAMP_PLACEHOLDER = '99991231235959';
114
115    /**
116     * Create a new backend instance from parameters injected by ObjectCache::newFromParams()
117     *
118     * The database servers must be provided by *either* the "server" parameter, the "servers"
119     * parameter or the "loadBalancer" parameter.
120     *
121     * The parameters are as described at {@link \MediaWiki\MainConfigSchema::ObjectCaches}
122     * except that:
123     *
124     *   - the configured "cluster" and main LB fallback modes are implemented by
125     *     the wiring by passing "loadBalancerCallback".
126     *   - "dbDomain" is required if "loadBalancerCallback" is set, whereas in
127     *     config it may be absent.
128     *
129     * @internal
130     *
131     * @param array $params
132     *   - server: string
133     *   - servers: string[]
134     *   - loadBalancerCallback: A closure which provides a LoadBalancer object
135     *      - dbDomain: string|false
136     *   - multiPrimaryMode: bool
137     *   - purgePeriod: int|float
138     *   - purgeLimit: int
139     *   - tableName: string
140     *   - shards: int
141     *   - replicaOnly: bool
142     *   - writeBatchSize: int
143     */
144    public function __construct( $params ) {
145        parent::__construct( $params );
146
147        if ( isset( $params['servers'] ) || isset( $params['server'] ) ) {
148            // Configuration uses a direct list of servers.
149            // Object data is horizontally partitioned via key hash.
150            $index = 0;
151            foreach ( ( $params['servers'] ?? [ $params['server'] ] ) as $tag => $info ) {
152                $this->serverInfos[$index] = $info;
153                // Allow integer-indexes arrays for b/c
154                $this->serverTags[$index] = is_string( $tag ) ? $tag : "#$index";
155                ++$index;
156            }
157        } elseif ( isset( $params['loadBalancerCallback'] ) ) {
158            $this->loadBalancerCallback = $params['loadBalancerCallback'];
159            if ( !isset( $params['dbDomain'] ) ) {
160                throw new InvalidArgumentException(
161                    __METHOD__ . ": 'dbDomain' is required if 'loadBalancerCallback' is given"
162                );
163            }
164            $this->dbDomain = $params['dbDomain'];
165            $this->useLB = true;
166        } else {
167            throw new InvalidArgumentException(
168                __METHOD__ . " requires 'server', 'servers', or 'loadBalancerCallback'"
169            );
170        }
171
172        $this->purgePeriod = intval( $params['purgePeriod'] ?? $this->purgePeriod );
173        $this->purgeLimit = intval( $params['purgeLimit'] ?? $this->purgeLimit );
174        $this->tableName = $params['tableName'] ?? $this->tableName;
175        $this->numTableShards = intval( $params['shards'] ?? $this->numTableShards );
176        $this->writeBatchSize = intval( $params['writeBatchSize'] ?? $this->writeBatchSize );
177        $this->replicaOnly = $params['replicaOnly'] ?? false;
178        $this->multiPrimaryMode = $params['multiPrimaryMode'] ?? false;
179
180        $this->attrMap[self::ATTR_DURABILITY] = self::QOS_DURABILITY_RDBMS;
181
182        $this->hasZlib = extension_loaded( 'zlib' );
183    }
184
185    protected function doGet( $key, $flags = 0, &$casToken = null ) {
186        $getToken = ( $casToken === self::PASS_BY_REF );
187        $casToken = null;
188
189        $data = $this->fetchBlobs( [ $key ], $getToken )[$key];
190        if ( $data ) {
191            $result = $this->unserialize( $data[self::BLOB_VALUE] );
192            if ( $getToken && $result !== false ) {
193                $casToken = $data[self::BLOB_CASTOKEN];
194            }
195            $valueSize = strlen( $data[self::BLOB_VALUE] );
196        } else {
197            $result = false;
198            $valueSize = false;
199        }
200
201        $this->updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, $valueSize ] ] );
202
203        return $result;
204    }
205
206    protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
207        $mtime = $this->getCurrentTime();
208
209        return $this->modifyBlobs(
210            [ $this, 'modifyTableSpecificBlobsForSet' ],
211            $mtime,
212            [ $key => [ $value, $exptime ] ]
213        );
214    }
215
216    protected function doDelete( $key, $flags = 0 ) {
217        $mtime = $this->getCurrentTime();
218
219        return $this->modifyBlobs(
220            [ $this, 'modifyTableSpecificBlobsForDelete' ],
221            $mtime,
222            [ $key => [] ]
223        );
224    }
225
226    protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
227        $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
228        if ( $mtime === null ) {
229            // Timeout or I/O error during lock acquisition
230            return false;
231        }
232
233        return $this->modifyBlobs(
234            [ $this, 'modifyTableSpecificBlobsForAdd' ],
235            $mtime,
236            [ $key => [ $value, $exptime ] ]
237        );
238    }
239
240    protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
241        $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
242        if ( $mtime === null ) {
243            // Timeout or I/O error during lock acquisition
244            return false;
245        }
246
247        return $this->modifyBlobs(
248            [ $this, 'modifyTableSpecificBlobsForCas' ],
249            $mtime,
250            [ $key => [ $value, $exptime, $casToken ] ]
251        );
252    }
253
254    protected function doChangeTTL( $key, $exptime, $flags ) {
255        $mtime = $this->getCurrentTime();
256
257        return $this->modifyBlobs(
258            [ $this, 'modifyTableSpecificBlobsForChangeTTL' ],
259            $mtime,
260            [ $key => [ $exptime ] ]
261        );
262    }
263
264    protected function doIncrWithInit( $key, $exptime, $step, $init, $flags ) {
265        $mtime = $this->getCurrentTime();
266
267        if ( $flags & self::WRITE_BACKGROUND ) {
268            $callback = [ $this, 'modifyTableSpecificBlobsForIncrInitAsync' ];
269        } else {
270            $callback = [ $this, 'modifyTableSpecificBlobsForIncrInit' ];
271        }
272
273        $result = $this->modifyBlobs(
274            $callback,
275            $mtime,
276            [ $key => [ $step, $init, $exptime ] ],
277            $resByKey
278        ) ? $resByKey[$key] : false;
279
280        return $result;
281    }
282
283    protected function doGetMulti( array $keys, $flags = 0 ) {
284        $result = [];
285        $valueSizeByKey = [];
286
287        $dataByKey = $this->fetchBlobs( $keys );
288        foreach ( $keys as $key ) {
289            $data = $dataByKey[$key];
290            if ( $data ) {
291                $serialValue = $data[self::BLOB_VALUE];
292                $value = $this->unserialize( $serialValue );
293                if ( $value !== false ) {
294                    $result[$key] = $value;
295                }
296                $valueSize = strlen( $serialValue );
297            } else {
298                $valueSize = false;
299            }
300            $valueSizeByKey[$key] = [ 0, $valueSize ];
301        }
302
303        $this->updateOpStats( self::METRIC_OP_GET, $valueSizeByKey );
304
305        return $result;
306    }
307
308    protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
309        $mtime = $this->getCurrentTime();
310
311        return $this->modifyBlobs(
312            [ $this, 'modifyTableSpecificBlobsForSet' ],
313            $mtime,
314            array_map(
315                static function ( $value ) use ( $exptime ) {
316                    return [ $value, $exptime ];
317                },
318                $data
319            )
320        );
321    }
322
323    protected function doDeleteMulti( array $keys, $flags = 0 ) {
324        $mtime = $this->getCurrentTime();
325
326        return $this->modifyBlobs(
327            [ $this, 'modifyTableSpecificBlobsForDelete' ],
328            $mtime,
329            array_fill_keys( $keys, [] )
330        );
331    }
332
333    public function doChangeTTLMulti( array $keys, $exptime, $flags = 0 ) {
334        $mtime = $this->getCurrentTime();
335
336        return $this->modifyBlobs(
337            [ $this, 'modifyTableSpecificBlobsForChangeTTL' ],
338            $mtime,
339            array_fill_keys( $keys, [ $exptime ] )
340        );
341    }
342
343    /**
344     * Get a connection to the specified database
345     *
346     * @param int $shardIndex Server index
347     * @return IMaintainableDatabase
348     * @throws DBConnectionError
349     * @throws UnexpectedValueException
350     */
351    private function getConnection( $shardIndex ) {
352        if ( $this->useLB ) {
353            return $this->getConnectionViaLoadBalancer();
354        }
355
356        // Don't keep timing out trying to connect if the server is down
357        if (
358            isset( $this->connFailureErrors[$shardIndex] ) &&
359            ( $this->getCurrentTime() - $this->connFailureTimes[$shardIndex] ) < 60
360        ) {
361            throw $this->connFailureErrors[$shardIndex];
362        }
363
364        if ( isset( $this->serverInfos[$shardIndex] ) ) {
365            $server = $this->serverInfos[$shardIndex];
366            $conn = $this->getConnectionFromServerInfo( $shardIndex, $server );
367        } else {
368            throw new UnexpectedValueException( "Invalid server index #$shardIndex" );
369        }
370
371        return $conn;
372    }
373
374    /**
375     * Get the server index and table name for a given key
376     * @param string $key
377     * @return array (server index, table name)
378     */
379    private function getKeyLocation( $key ) {
380        if ( $this->useLB ) {
381            // LoadBalancer based configuration
382            $shardIndex = 0;
383        } else {
384            // Striped array of database servers
385            if ( count( $this->serverTags ) == 1 ) {
386                $shardIndex = 0; // short-circuit
387            } else {
388                $sortedServers = $this->serverTags;
389                ArrayUtils::consistentHashSort( $sortedServers, $key );
390                $shardIndex = array_key_first( $sortedServers );
391            }
392        }
393
394        if ( $this->numTableShards > 1 ) {
395            $hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
396            $tableIndex = $hash % $this->numTableShards;
397        } else {
398            $tableIndex = null;
399        }
400
401        return [ $shardIndex, $this->getTableNameByShard( $tableIndex ) ];
402    }
403
404    /**
405     * Get the table name for a given shard index
406     * @param int|null $index
407     * @return string
408     */
409    private function getTableNameByShard( $index ) {
410        if ( $index !== null && $this->numTableShards > 1 ) {
411            $decimals = strlen( (string)( $this->numTableShards - 1 ) );
412
413            return $this->tableName . sprintf( "%0{$decimals}d", $index );
414        }
415
416        return $this->tableName;
417    }
418
419    /**
420     * @param string[] $keys
421     * @param bool $getCasToken Whether to get a CAS token
422     * @return array<string,array|null> Order-preserved map of (key => (value,expiry,token) or null)
423     */
424    private function fetchBlobs( array $keys, bool $getCasToken = false ) {
425        /** @noinspection PhpUnusedLocalVariableInspection */
426        $silenceScope = $this->silenceTransactionProfiler();
427
428        // Initialize order-preserved per-key results; set values for live keys below
429        $dataByKey = array_fill_keys( $keys, null );
430
431        $readTime = (int)$this->getCurrentTime();
432        $keysByTableByShard = [];
433        foreach ( $keys as $key ) {
434            [ $shardIndex, $partitionTable ] = $this->getKeyLocation( $key );
435            $keysByTableByShard[$shardIndex][$partitionTable][] = $key;
436        }
437
438        foreach ( $keysByTableByShard as $shardIndex => $serverKeys ) {
439            try {
440                $db = $this->getConnection( $shardIndex );
441                foreach ( $serverKeys as $partitionTable => $tableKeys ) {
442                    $res = $db->newSelectQueryBuilder()
443                        ->select(
444                            $getCasToken
445                            ? $this->addCasTokenFields( $db, [ 'keyname', 'value', 'exptime' ] )
446                            : [ 'keyname', 'value', 'exptime' ] )
447                        ->from( $partitionTable )
448                        ->where( $this->buildExistenceConditions( $db, $tableKeys, $readTime ) )
449                        ->caller( __METHOD__ )
450                        ->fetchResultSet();
451                    foreach ( $res as $row ) {
452                        $row->shardIndex = $shardIndex;
453                        $row->tableName = $partitionTable;
454                        $dataByKey[$row->keyname] = $row;
455                    }
456                }
457            } catch ( DBError $e ) {
458                $this->handleDBError( $e, $shardIndex );
459            }
460        }
461
462        foreach ( $keys as $key ) {
463            $row = $dataByKey[$key] ?? null;
464            if ( !$row ) {
465                continue;
466            }
467
468            $this->debug( __METHOD__ . ": retrieved $key; expiry time is {$row->exptime}" );
469            try {
470                $db = $this->getConnection( $row->shardIndex );
471                $dataByKey[$key] = [
472                    self::BLOB_VALUE => $this->dbDecodeSerialValue( $db, $row->value ),
473                    self::BLOB_EXPIRY => $this->decodeDbExpiry( $db, $row->exptime ),
474                    self::BLOB_CASTOKEN => $getCasToken
475                        ? $this->getCasTokenFromRow( $db, $row )
476                        : null
477                ];
478            } catch ( DBQueryError $e ) {
479                $this->handleDBError( $e, $row->shardIndex );
480            }
481        }
482
483        return $dataByKey;
484    }
485
486    /**
487     * @param callable $tableWriteCallback Callback the takes the following arguments:
488     *  - IDatabase instance
489     *  - Partition table name string
490     *  - UNIX modification timestamp
491     *  - Map of (key => list of arguments) for keys belonging to the server/table partition
492     *  - Map of (key => result) [returned]
493     * @param float $mtime UNIX modification timestamp
494     * @param array<string,array> $argsByKey Map of (key => list of arguments)
495     * @param array<string,mixed> &$resByKey Order-preserved map of (key => result) [returned]
496     * @return bool Whether all keys were processed
497     * @param-taint $argsByKey none
498     */
499    private function modifyBlobs(
500        callable $tableWriteCallback,
501        float $mtime,
502        array $argsByKey,
503        &$resByKey = []
504    ) {
505        // Initialize order-preserved per-key results; callbacks mark successful results
506        $resByKey = array_fill_keys( array_keys( $argsByKey ), false );
507
508        /** @noinspection PhpUnusedLocalVariableInspection */
509        $silenceScope = $this->silenceTransactionProfiler();
510
511        $argsByKeyByTableByShard = [];
512        foreach ( $argsByKey as $key => $args ) {
513            [ $shardIndex, $partitionTable ] = $this->getKeyLocation( $key );
514            $argsByKeyByTableByShard[$shardIndex][$partitionTable][$key] = $args;
515        }
516
517        $shardIndexesAffected = [];
518        foreach ( $argsByKeyByTableByShard as $shardIndex => $argsByKeyByTables ) {
519            foreach ( $argsByKeyByTables as $table => $ptKeyArgs ) {
520                try {
521                    $db = $this->getConnection( $shardIndex );
522                    $shardIndexesAffected[] = $shardIndex;
523                    $tableWriteCallback( $db, $table, $mtime, $ptKeyArgs, $resByKey );
524                } catch ( DBError $e ) {
525                    $this->handleDBError( $e, $shardIndex );
526                    continue;
527                }
528            }
529        }
530
531        $success = !in_array( false, $resByKey, true );
532
533        foreach ( $shardIndexesAffected as $shardIndex ) {
534            try {
535                if (
536                    // Random purging is enabled
537                    $this->purgePeriod &&
538                    // Only purge on one in every $this->purgePeriod writes
539                    mt_rand( 0, $this->purgePeriod - 1 ) == 0 &&
540                    // Avoid repeating the delete within a few seconds
541                    ( $this->getCurrentTime() - $this->lastGarbageCollect ) > self::GC_DELAY_SEC
542                ) {
543                    $this->garbageCollect( $shardIndex );
544                }
545            } catch ( DBError $e ) {
546                $this->handleDBError( $e, $shardIndex );
547            }
548        }
549
550        return $success;
551    }
552
553    /**
554     * Set key/value pairs belonging to a partition table on the given server
555     *
556     * In multi-primary mode, if the current row for a key exists and has a modification token
557     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
558     * then the write to that key will be aborted with a "false" result. Successfully modified
559     * key rows will be assigned a new modification token using the provided timestamp.
560     *
561     * @param IDatabase $db Handle to the database server where the argument keys belong
562     * @param string $ptable Name of the partition table where the argument keys belong
563     * @param float $mtime UNIX modification timestamp
564     * @param array<string,array> $argsByKey Non-empty (key => (value,exptime)) map
565     * @param array<string,mixed> &$resByKey Map of (key => result) for successful writes [returned]
566     * @throws DBError
567     */
568    private function modifyTableSpecificBlobsForSet(
569        IDatabase $db,
570        string $ptable,
571        float $mtime,
572        array $argsByKey,
573        array &$resByKey
574    ) {
575        $valueSizesByKey = [];
576
577        $mt = $this->makeTimestampedModificationToken( $mtime, $db );
578
579        $rows = [];
580        foreach ( $argsByKey as $key => [ $value, $exptime ] ) {
581            $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
582            $serialValue = $this->getSerialized( $value, $key );
583            $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
584
585            $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
586        }
587
588        if ( $this->multiPrimaryMode ) {
589            $db->newInsertQueryBuilder()
590                ->insertInto( $ptable )
591                ->rows( $rows )
592                ->onDuplicateKeyUpdate()
593                ->uniqueIndexFields( [ 'keyname' ] )
594                ->set( $this->buildMultiUpsertSetForOverwrite( $db, $mt ) )
595                ->caller( __METHOD__ )->execute();
596        } else {
597            // T288998: use REPLACE, if possible, to avoid cluttering the binlogs
598            $db->newReplaceQueryBuilder()
599                ->replaceInto( $ptable )
600                ->rows( $rows )
601                ->uniqueIndexFields( [ 'keyname' ] )
602                ->caller( __METHOD__ )->execute();
603        }
604
605        foreach ( $argsByKey as $key => $unused ) {
606            $resByKey[$key] = true;
607        }
608
609        $this->updateOpStats( self::METRIC_OP_SET, $valueSizesByKey );
610    }
611
612    /**
613     * Purge/tombstone key/value pairs belonging to a partition table on the given server
614     *
615     * In multi-primary mode, if the current row for a key exists and has a modification token
616     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
617     * then the write to that key will be aborted with a "false" result. Successfully modified
618     * key rows will be assigned a new modification token/timestamp, an empty value, and an
619     * expiration timestamp dated slightly before the new modification timestamp.
620     *
621     * @param IDatabase $db Handle to the database server where the argument keys belong
622     * @param string $ptable Name of the partition table where the argument keys belong
623     * @param float $mtime UNIX modification timestamp
624     * @param array<string,array> $argsByKey Non-empty (key => []) map
625     * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
626     * @throws DBError
627     */
628    private function modifyTableSpecificBlobsForDelete(
629        IDatabase $db,
630        string $ptable,
631        float $mtime,
632        array $argsByKey,
633        array &$resByKey
634    ) {
635        if ( $this->multiPrimaryMode ) {
636            // Tombstone keys in order to respect eventual consistency
637            $mt = $this->makeTimestampedModificationToken( $mtime, $db );
638            $expiry = $this->makeNewKeyExpiry( self::TOMB_EXPTIME, (int)$mtime );
639            $queryBuilder = $db->newInsertQueryBuilder()
640                ->insertInto( $ptable )
641                ->onDuplicateKeyUpdate()
642                ->uniqueIndexFields( [ 'keyname' ] )
643                ->set( $this->buildMultiUpsertSetForOverwrite( $db, $mt ) );
644            foreach ( $argsByKey as $key => $arg ) {
645                $queryBuilder->row( $this->buildUpsertRow( $db, $key, self::TOMB_SERIAL, $expiry, $mt ) );
646            }
647            $queryBuilder->caller( __METHOD__ )->execute();
648        } else {
649            // Just purge the keys since there is only one primary (e.g. "source of truth")
650            $db->newDeleteQueryBuilder()
651                ->deleteFrom( $ptable )
652                ->where( [ 'keyname' => array_keys( $argsByKey ) ] )
653                ->caller( __METHOD__ )->execute();
654        }
655
656        foreach ( $argsByKey as $key => $arg ) {
657            $resByKey[$key] = true;
658        }
659
660        $this->updateOpStats( self::METRIC_OP_DELETE, array_keys( $argsByKey ) );
661    }
662
663    /**
664     * Insert key/value pairs belonging to a partition table on the given server
665     *
666     * If the current row for a key exists and has an integral UNIX timestamp of expiration
667     * greater than that of the provided modification timestamp, then the write to that key
668     * will be aborted with a "false" result. Acquisition of advisory key locks must be handled
669     * by calling functions.
670     *
671     * In multi-primary mode, if the current row for a key exists and has a modification token
672     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
673     * then the write to that key will be aborted with a "false" result. Successfully modified
674     * key rows will be assigned a new modification token/timestamp.
675     *
676     * @param IDatabase $db Handle to the database server where the argument keys belong
677     * @param string $ptable Name of the partition table where the argument keys belong
678     * @param float $mtime UNIX modification timestamp
679     * @param array<string,array> $argsByKey Non-empty (key => (value,exptime)) map
680     * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
681     * @throws DBError
682     */
683    private function modifyTableSpecificBlobsForAdd(
684        IDatabase $db,
685        string $ptable,
686        float $mtime,
687        array $argsByKey,
688        array &$resByKey
689    ) {
690        $valueSizesByKey = [];
691
692        $mt = $this->makeTimestampedModificationToken( $mtime, $db );
693
694        // This check must happen outside the write query to respect eventual consistency
695        $existingKeys = $db->newSelectQueryBuilder()
696            ->select( 'keyname' )
697            ->from( $ptable )
698            ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) )
699            ->caller( __METHOD__ )
700            ->fetchFieldValues();
701        $existingByKey = array_fill_keys( $existingKeys, true );
702
703        $rows = [];
704        foreach ( $argsByKey as $key => [ $value, $exptime ] ) {
705            if ( isset( $existingByKey[$key] ) ) {
706                $this->logger->debug( __METHOD__ . "$key already exists" );
707                continue;
708            }
709
710            $serialValue = $this->getSerialized( $value, $key );
711            $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
712            $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
713            $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
714        }
715        if ( !$rows ) {
716            return;
717        }
718        $db->newInsertQueryBuilder()
719            ->insertInto( $ptable )
720            ->rows( $rows )
721            ->onDuplicateKeyUpdate()
722            ->uniqueIndexFields( [ 'keyname' ] )
723            ->set( $this->buildMultiUpsertSetForOverwrite( $db, $mt ) )
724            ->caller( __METHOD__ )->execute();
725
726        foreach ( $argsByKey as $key => $unused ) {
727            $resByKey[$key] = !isset( $existingByKey[$key] );
728        }
729
730        $this->updateOpStats( self::METRIC_OP_ADD, $valueSizesByKey );
731    }
732
733    /**
734     * Insert key/value pairs belonging to a partition table on the given server
735     *
736     * If the current row for a key exists, has an integral UNIX timestamp of expiration greater
737     * than that of the provided modification timestamp, and the CAS token does not match, then
738     * the write to that key will be aborted with a "false" result. Acquisition of advisory key
739     * locks must be handled by calling functions.
740     *
741     * In multi-primary mode, if the current row for a key exists and has a modification token
742     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
743     * then the write to that key will be aborted with a "false" result. Successfully modified
744     * key rows will be assigned a new modification token/timestamp.
745     *
746     * @param IDatabase $db Handle to the database server where the argument keys belong
747     * @param string $ptable Name of the partition table where the argument keys belong
748     * @param float $mtime UNIX modification timestamp
749     * @param array<string,array> $argsByKey Non-empty (key => (value, exptime, CAS token)) map
750     * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
751     * @throws DBError
752     */
753    private function modifyTableSpecificBlobsForCas(
754        IDatabase $db,
755        string $ptable,
756        float $mtime,
757        array $argsByKey,
758        array &$resByKey
759    ) {
760        $valueSizesByKey = [];
761
762        $mt = $this->makeTimestampedModificationToken( $mtime, $db );
763
764        // This check must happen outside the write query to respect eventual consistency
765        $res = $db->newSelectQueryBuilder()
766            ->select( $this->addCasTokenFields( $db, [ 'keyname' ] ) )
767            ->from( $ptable )
768            ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) )
769            ->caller( __METHOD__ )
770            ->fetchResultSet();
771
772        $curTokensByKey = [];
773        foreach ( $res as $row ) {
774            $curTokensByKey[$row->keyname] = $this->getCasTokenFromRow( $db, $row );
775        }
776
777        $nonMatchingByKey = [];
778        $rows = [];
779        foreach ( $argsByKey as $key => [ $value, $exptime, $casToken ] ) {
780            $curToken = $curTokensByKey[$key] ?? null;
781            if ( $curToken === null ) {
782                $nonMatchingByKey[$key] = true;
783                $this->logger->debug( __METHOD__ . "$key does not exists" );
784                continue;
785            }
786
787            if ( $curToken !== $casToken ) {
788                $nonMatchingByKey[$key] = true;
789                $this->logger->debug( __METHOD__ . "$key does not have a matching token" );
790                continue;
791            }
792
793            $serialValue = $this->getSerialized( $value, $key );
794            $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
795            $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
796
797            $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
798        }
799        if ( !$rows ) {
800            return;
801        }
802        $db->newInsertQueryBuilder()
803            ->insertInto( $ptable )
804            ->rows( $rows )
805            ->onDuplicateKeyUpdate()
806            ->uniqueIndexFields( [ 'keyname' ] )
807            ->set( $this->buildMultiUpsertSetForOverwrite( $db, $mt ) )
808            ->caller( __METHOD__ )->execute();
809
810        foreach ( $argsByKey as $key => $unused ) {
811            $resByKey[$key] = !isset( $nonMatchingByKey[$key] );
812        }
813
814        $this->updateOpStats( self::METRIC_OP_CAS, $valueSizesByKey );
815    }
816
817    /**
818     * Update the TTL for keys belonging to a partition table on the given server
819     *
820     * If no current row for a key exists or the current row has an integral UNIX timestamp of
821     * expiration less than that of the provided modification timestamp, then the write to that
822     * key will be aborted with a "false" result.
823     *
824     * In multi-primary mode, if the current row for a key exists and has a modification token
825     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
826     * then the write to that key will be aborted with a "false" result. Successfully modified
827     * key rows will be assigned a new modification token/timestamp.
828     *
829     * @param IDatabase $db Handle to the database server where the argument keys belong
830     * @param string $ptable Name of the partition table where the argument keys belong
831     * @param float $mtime UNIX modification timestamp
832     * @param array<string,array> $argsByKey Non-empty (key => (exptime)) map
833     * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
834     * @throws DBError
835     */
836    private function modifyTableSpecificBlobsForChangeTTL(
837        IDatabase $db,
838        string $ptable,
839        float $mtime,
840        array $argsByKey,
841        array &$resByKey
842    ) {
843        if ( $this->multiPrimaryMode ) {
844            $mt = $this->makeTimestampedModificationToken( $mtime, $db );
845
846            $res = $db->newSelectQueryBuilder()
847                ->select( [ 'keyname', 'value' ] )
848                ->from( $ptable )
849                ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) )
850                ->caller( __METHOD__ )
851                ->fetchResultSet();
852
853            $rows = [];
854            $existingKeys = [];
855            foreach ( $res as $curRow ) {
856                $key = $curRow->keyname;
857                $existingKeys[$key] = true;
858                $serialValue = $this->dbDecodeSerialValue( $db, $curRow->value );
859                [ $exptime ] = $argsByKey[$key];
860                $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
861                $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
862            }
863            if ( !$rows ) {
864                return;
865            }
866            $db->newInsertQueryBuilder()
867                ->insertInto( $ptable )
868                ->rows( $rows )
869                ->onDuplicateKeyUpdate()
870                ->uniqueIndexFields( [ 'keyname' ] )
871                ->set( $this->buildMultiUpsertSetForOverwrite( $db, $mt ) )
872                ->caller( __METHOD__ )->execute();
873
874            foreach ( $argsByKey as $key => $unused ) {
875                $resByKey[$key] = isset( $existingKeys[$key] );
876            }
877        } else {
878            $keysBatchesByExpiry = [];
879            foreach ( $argsByKey as $key => [ $exptime ] ) {
880                $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
881                $keysBatchesByExpiry[$expiry][] = $key;
882            }
883
884            $existingCount = 0;
885            foreach ( $keysBatchesByExpiry as $expiry => $keyBatch ) {
886                $db->newUpdateQueryBuilder()
887                    ->update( $ptable )
888                    ->set( [ 'exptime' => $this->encodeDbExpiry( $db, $expiry ) ] )
889                    ->where( $this->buildExistenceConditions( $db, $keyBatch, (int)$mtime ) )
890                    ->caller( __METHOD__ )->execute();
891                $existingCount += $db->affectedRows();
892            }
893            if ( $existingCount === count( $argsByKey ) ) {
894                foreach ( $argsByKey as $key => $args ) {
895                    $resByKey[$key] = true;
896                }
897            }
898        }
899
900        $this->updateOpStats( self::METRIC_OP_CHANGE_TTL, array_keys( $argsByKey ) );
901    }
902
903    /**
904     * Either increment a counter key, if it exists, or initialize it, otherwise
905     *
906     * If no current row for a key exists or the current row has an integral UNIX timestamp of
907     * expiration less than that of the provided modification timestamp, then the key row will
908     * be set to the initial value. Otherwise, the current row will be incremented.
909     *
910     * In multi-primary mode, if the current row for a key exists and has a modification token
911     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
912     * then the write to that key will be aborted with a "false" result. Successfully initialized
913     * key rows will be assigned a new modification token/timestamp.
914     *
915     * @param IDatabase $db Handle to the database server where the argument keys belong
916     * @param string $ptable Name of the partition table where the argument keys belong
917     * @param float $mtime UNIX modification timestamp
918     * @param array<string,array> $argsByKey Non-empty (key => (step, init, exptime) map
919     * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
920     * @throws DBError
921     */
922    private function modifyTableSpecificBlobsForIncrInit(
923        IDatabase $db,
924        string $ptable,
925        float $mtime,
926        array $argsByKey,
927        array &$resByKey
928    ) {
929        foreach ( $argsByKey as $key => [ $step, $init, $exptime ] ) {
930            $mt = $this->makeTimestampedModificationToken( $mtime, $db );
931            $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
932
933            // Use a transaction so that changes from other threads are not visible due to
934            // "consistent reads". This way, the exact post-increment value can be returned.
935            // The "live key exists" check can go inside the write query and remain safe for
936            // replication since the TTL for such keys is either indefinite or very short.
937            $atomic = $db->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
938            try {
939                $db->newInsertQueryBuilder()
940                    ->insertInto( $ptable )
941                    ->rows( $this->buildUpsertRow( $db, $key, $init, $expiry, $mt ) )
942                    ->onDuplicateKeyUpdate()
943                    ->uniqueIndexFields( [ 'keyname' ] )
944                    ->set( $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $mt, (int)$mtime ) )
945                    ->caller( __METHOD__ )->execute();
946                $affectedCount = $db->affectedRows();
947                $row = $db->newSelectQueryBuilder()
948                    ->select( 'value' )
949                    ->from( $ptable )
950                    ->where( [ 'keyname' => $key ] )
951                    ->caller( __METHOD__ )
952                    ->fetchRow();
953            } catch ( Exception $e ) {
954                $db->cancelAtomic( __METHOD__, $atomic );
955                throw $e;
956            }
957            $db->endAtomic( __METHOD__ );
958
959            if ( !$affectedCount || $row === false ) {
960                $this->logger->warning( __METHOD__ . ": failed to set new $key value" );
961                continue;
962            }
963
964            $serialValue = $this->dbDecodeSerialValue( $db, $row->value );
965            if ( !$this->isInteger( $serialValue ) ) {
966                $this->logger->warning( __METHOD__ . ": got non-integer $key value" );
967                continue;
968            }
969
970            $resByKey[$key] = (int)$serialValue;
971        }
972
973        $this->updateOpStats( self::METRIC_OP_INCR, array_keys( $argsByKey ) );
974    }
975
976    /**
977     * Same as modifyTableSpecificBlobsForIncrInit() but does not return the
978     * new value.
979     *
980     * @param IDatabase $db
981     * @param string $ptable
982     * @param float $mtime
983     * @param array<string,array> $argsByKey
984     * @param array<string,mixed> &$resByKey
985     * @throws DBError
986     */
987    private function modifyTableSpecificBlobsForIncrInitAsync(
988        IDatabase $db,
989        string $ptable,
990        float $mtime,
991        array $argsByKey,
992        array &$resByKey
993    ) {
994        foreach ( $argsByKey as $key => [ $step, $init, $exptime ] ) {
995            $mt = $this->makeTimestampedModificationToken( $mtime, $db );
996            $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
997            $db->newInsertQueryBuilder()
998                ->insertInto( $ptable )
999                ->rows( $this->buildUpsertRow( $db, $key, $init, $expiry, $mt ) )
1000                ->onDuplicateKeyUpdate()
1001                ->uniqueIndexFields( [ 'keyname' ] )
1002                ->set( $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $mt, (int)$mtime ) )
1003                ->caller( __METHOD__ )->execute();
1004            if ( !$db->affectedRows() ) {
1005                $this->logger->warning( __METHOD__ . ": failed to set new $key value" );
1006            } else {
1007                $resByKey[$key] = true;
1008            }
1009        }
1010    }
1011
1012    /**
1013     * @param int $exptime Relative or absolute expiration
1014     * @param int $nowTsUnix Current UNIX timestamp
1015     * @return int UNIX timestamp or TTL_INDEFINITE
1016     */
1017    private function makeNewKeyExpiry( $exptime, int $nowTsUnix ) {
1018        $expiry = $this->getExpirationAsTimestamp( $exptime );
1019        // Eventual consistency requires the preservation of recently modified keys.
1020        // Do not create rows with `exptime` fields so low that they might get garbage
1021        // collected before being replicated.
1022        if ( $expiry !== self::TTL_INDEFINITE ) {
1023            $expiry = max( $expiry, $nowTsUnix - self::SAFE_CLOCK_BOUND_SEC );
1024        }
1025
1026        return $expiry;
1027    }
1028
1029    /**
1030     * Get a scoped lock and modification timestamp for a critical section of reads/writes
1031     *
1032     * This is used instead of BagOStuff::getCurrentTime() for certain writes (such as "add",
1033     * "incr", and "cas"), for which we want to support tight race conditions where the same
1034     * key is repeatedly written to by multiple web servers that each get to see the previous
1035     * value, act on it, and modify it in some way.
1036     *
1037     * It is assumed that this method is normally only invoked from the primary datacenter.
1038     * A lock is acquired on the primary server of the local datacenter in order to avoid race
1039     * conditions within the critical section. The clock on the SQL server is used to get the
1040     * modification timestamp in order to minimize issues with clock drift between web servers;
1041     * thus key writes will not be rejected due to some web servers having lagged clocks.
1042     *
1043     * @param string $key
1044     * @param ?ScopedCallback &$scope Unlocker callback; null on failure [returned]
1045     * @return float|null UNIX timestamp with 6 decimal places; null on failure
1046     */
1047    private function newLockingWriteSectionModificationTimestamp( $key, &$scope ) {
1048        if ( !$this->lock( $key, 0 ) ) {
1049            return null;
1050        }
1051
1052        $scope = new ScopedCallback( function () use ( $key ) {
1053            $this->unlock( $key );
1054        } );
1055
1056        // sprintf is used to adjust precision
1057        return (float)sprintf( '%.6F', $this->locks[$key][self::LOCK_TIME] );
1058    }
1059
1060    /**
1061     * Make a `modtoken` column value with the original time and source database server of a write
1062     *
1063     * @param float $mtime UNIX modification timestamp
1064     * @param IDatabase $db Handle to the primary database server sourcing the write
1065     * @return string String of the form "<SECONDS_SOURCE><MICROSECONDS>", where SECONDS_SOURCE
1066     *  is "<35 bit seconds portion of UNIX time><32 bit database server ID>" as 13 base 36 chars,
1067     *  and MICROSECONDS is "<20 bit microseconds portion of UNIX time>" as 4 base 36 chars
1068     */
1069    private function makeTimestampedModificationToken( float $mtime, IDatabase $db ) {
1070        // We have reserved space for upto 6 digits in the microsecond portion of the token.
1071        // This is for future use only (maybe CAS tokens) and not currently used.
1072        // It is currently populated by the microsecond portion returned by microtime,
1073        // which generally has fewer than 6 digits of meaningful precision but can still be useful
1074        // in debugging (to see the token continuously change even during rapid testing).
1075        $seconds = (int)$mtime;
1076        [ , $microseconds ] = explode( '.', sprintf( '%.6F', $mtime ) );
1077
1078        $id = $db->getTopologyBasedServerId() ?? sprintf( '%u', crc32( $db->getServerName() ) );
1079
1080        $token = implode( '', [
1081            // 67 bit integral portion of UNIX timestamp, qualified
1082            \Wikimedia\base_convert(
1083                // 35 bit integral seconds portion of UNIX timestamp
1084                str_pad( base_convert( (string)$seconds, 10, 2 ), 35, '0', STR_PAD_LEFT ) .
1085                // 32 bit ID of the primary database server handling the write
1086                str_pad( base_convert( $id, 10, 2 ), 32, '0', STR_PAD_LEFT ),
1087                2,
1088                36,
1089                13
1090            ),
1091            // 20 bit fractional portion of UNIX timestamp, as integral microseconds
1092            str_pad( base_convert( $microseconds, 10, 36 ), 4, '0', STR_PAD_LEFT )
1093        ] );
1094
1095        if ( strlen( $token ) !== 17 ) {
1096            throw new RuntimeException( "Modification timestamp overflow detected" );
1097        }
1098
1099        return $token;
1100    }
1101
1102    /**
1103     * WHERE conditions that check for existence and liveness of keys
1104     *
1105     * @param IDatabase $db
1106     * @param string[]|string $keys
1107     * @param int $time UNIX modification timestamp
1108     * @return array
1109     */
1110    private function buildExistenceConditions( IDatabase $db, $keys, int $time ) {
1111        // Note that tombstones always have past expiration dates
1112        return [
1113            'keyname' => $keys,
1114            $db->expr( 'exptime', '>=', $db->timestamp( $time ) )
1115        ];
1116    }
1117
1118    /**
1119     * INSERT array for handling key writes/overwrites when no live nor stale key exists
1120     *
1121     * @param IDatabase $db
1122     * @param string $key
1123     * @param string|int $serialValue New value
1124     * @param int $expiry Expiration timestamp or TTL_INDEFINITE
1125     * @param string $mt Modification token
1126     * @return array
1127     */
1128    private function buildUpsertRow(
1129        IDatabase $db,
1130        $key,
1131        $serialValue,
1132        int $expiry,
1133        string $mt
1134    ) {
1135        $row = [
1136            'keyname' => $key,
1137            'value'   => $this->dbEncodeSerialValue( $db, $serialValue ),
1138            'exptime' => $this->encodeDbExpiry( $db, $expiry )
1139        ];
1140        if ( $this->multiPrimaryMode ) {
1141            $row['modtoken'] = $mt;
1142        }
1143
1144        return $row;
1145    }
1146
1147    /**
1148     * SET array for handling key overwrites when a live or stale key exists
1149     *
1150     * @param IDatabase $db
1151     * @param string $mt Modification token
1152     * @return array
1153     */
1154    private function buildMultiUpsertSetForOverwrite( IDatabase $db, string $mt ) {
1155        $expressionsByColumn = [
1156            'value'   => $db->buildExcludedValue( 'value' ),
1157            'exptime' => $db->buildExcludedValue( 'exptime' )
1158        ];
1159
1160        $set = [];
1161        if ( $this->multiPrimaryMode ) {
1162            // The query might take a while to replicate, during which newer values might get
1163            // written. Qualify the query so that it does not override such values. Note that
1164            // duplicate tokens generated around the same time for a key should still result
1165            // in convergence given the use of server_id in modtoken (providing a total order
1166            // among primary DB servers) and MySQL binlog ordering (providing a total order
1167            // for writes replicating from a given primary DB server).
1168            $expressionsByColumn['modtoken'] = $db->addQuotes( $mt );
1169            foreach ( $expressionsByColumn as $column => $updateExpression ) {
1170                $rhs = $db->conditional(
1171                    $db->addQuotes( substr( $mt, 0, 13 ) ) . ' >= ' .
1172                        $db->buildSubString( 'modtoken', 1, 13 ),
1173                    $updateExpression,
1174                    $column
1175                );
1176                $set[] = "{$column}=" . trim( $rhs );
1177            }
1178        } else {
1179            foreach ( $expressionsByColumn as $column => $updateExpression ) {
1180                $set[] = "{$column}={$updateExpression}";
1181            }
1182        }
1183
1184        return $set;
1185    }
1186
1187    /**
1188     * SET array for handling key overwrites when a live or stale key exists
1189     *
1190     * @param IDatabase $db
1191     * @param int $step Positive counter incrementation value
1192     * @param int $init Positive initial counter value
1193     * @param int $expiry Expiration timestamp or TTL_INDEFINITE
1194     * @param string $mt Modification token
1195     * @param int $mtUnixTs UNIX timestamp of modification token
1196     * @return array
1197     */
1198    private function buildIncrUpsertSet(
1199        IDatabase $db,
1200        int $step,
1201        int $init,
1202        int $expiry,
1203        string $mt,
1204        int $mtUnixTs
1205    ) {
1206        // Map of (column => (SQL for non-expired key rows, SQL for expired key rows))
1207        $expressionsByColumn = [
1208            'value'   => [
1209                $db->buildIntegerCast( 'value' ) . " + {$db->addQuotes( $step )}",
1210                $db->addQuotes( $this->dbEncodeSerialValue( $db, $init ) )
1211            ],
1212            'exptime' => [
1213                'exptime',
1214                $db->addQuotes( $this->encodeDbExpiry( $db, $expiry ) )
1215            ]
1216        ];
1217        if ( $this->multiPrimaryMode ) {
1218            $expressionsByColumn['modtoken'] = [ 'modtoken', $db->addQuotes( $mt ) ];
1219        }
1220
1221        $set = [];
1222        foreach ( $expressionsByColumn as $column => [ $updateExpression, $initExpression ] ) {
1223            $rhs = $db->conditional(
1224                $db->expr( 'exptime', '>=', $db->timestamp( $mtUnixTs ) ),
1225                $updateExpression,
1226                $initExpression
1227            );
1228            $set[] = "{$column}=" . trim( $rhs );
1229        }
1230
1231        return $set;
1232    }
1233
1234    /**
1235     * @param IDatabase $db
1236     * @param int $expiry UNIX timestamp of expiration or TTL_INDEFINITE
1237     * @return string
1238     */
1239    private function encodeDbExpiry( IDatabase $db, int $expiry ) {
1240        return ( $expiry === self::TTL_INDEFINITE )
1241            // Use the maximum timestamp that the column can store
1242            ? $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER )
1243            // Convert the absolute timestamp into the DB timestamp format
1244            : $db->timestamp( $expiry );
1245    }
1246
1247    /**
1248     * @param IDatabase $db
1249     * @param string $dbExpiry DB timestamp of expiration
1250     * @return int UNIX timestamp of expiration or TTL_INDEFINITE
1251     */
1252    private function decodeDbExpiry( IDatabase $db, string $dbExpiry ) {
1253        return ( $dbExpiry === $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER ) )
1254            ? self::TTL_INDEFINITE
1255            : (int)ConvertibleTimestamp::convert( TS_UNIX, $dbExpiry );
1256    }
1257
1258    /**
1259     * @param IDatabase $db
1260     * @param string|int $serialValue
1261     * @return string
1262     */
1263    private function dbEncodeSerialValue( IDatabase $db, $serialValue ) {
1264        return is_int( $serialValue ) ? (string)$serialValue : $db->encodeBlob( $serialValue );
1265    }
1266
1267    /**
1268     * @param IDatabase $db
1269     * @param Blob|string|int $blob
1270     * @return string|int
1271     */
1272    private function dbDecodeSerialValue( IDatabase $db, $blob ) {
1273        return $this->isInteger( $blob ) ? (int)$blob : $db->decodeBlob( $blob );
1274    }
1275
1276    /**
1277     * Either append a 'castoken' field or append the fields needed to compute the CAS token
1278     *
1279     * @param IDatabase $db
1280     * @param string[] $fields SELECT field array
1281     * @return string[] SELECT field array
1282     */
1283    private function addCasTokenFields( IDatabase $db, array $fields ) {
1284        $type = $db->getType();
1285
1286        if ( $type === 'mysql' ) {
1287            $fields['castoken'] = $db->buildConcat( [
1288                'SHA1(value)',
1289                $db->addQuotes( '@' ),
1290                'exptime'
1291            ] );
1292        } elseif ( $type === 'postgres' ) {
1293            $fields['castoken'] = $db->buildConcat( [
1294                'md5(value)',
1295                $db->addQuotes( '@' ),
1296                'exptime'
1297            ] );
1298        } else {
1299            if ( !in_array( 'value', $fields, true ) ) {
1300                $fields[] = 'value';
1301            }
1302            if ( !in_array( 'exptime', $fields, true ) ) {
1303                $fields[] = 'exptime';
1304            }
1305        }
1306
1307        return $fields;
1308    }
1309
1310    /**
1311     * Get a CAS token from a SELECT result row
1312     *
1313     * @param IDatabase $db
1314     * @param stdClass $row A row for a key
1315     * @return string CAS token
1316     */
1317    private function getCasTokenFromRow( IDatabase $db, stdClass $row ) {
1318        if ( isset( $row->castoken ) ) {
1319            $token = $row->castoken;
1320        } else {
1321            $token = sha1( $this->dbDecodeSerialValue( $db, $row->value ) ) . '@' . $row->exptime;
1322            $this->logger->debug( __METHOD__ . ": application computed hash for CAS token" );
1323        }
1324
1325        return $token;
1326    }
1327
1328    /**
1329     * @param int $shardIndex
1330     * @throws DBError
1331     */
1332    private function garbageCollect( $shardIndex ) {
1333        // set right away, avoid queuing duplicate async callbacks
1334        $this->lastGarbageCollect = $this->getCurrentTime();
1335
1336        $garbageCollector = function () use ( $shardIndex ) {
1337            $db = $this->getConnection( $shardIndex );
1338            /** @noinspection PhpUnusedLocalVariableInspection */
1339            $silenceScope = $this->silenceTransactionProfiler();
1340            $this->deleteServerObjectsExpiringBefore(
1341                $db,
1342                (int)$this->getCurrentTime(),
1343                $this->purgeLimit
1344            );
1345            $this->lastGarbageCollect = $this->getCurrentTime();
1346        };
1347
1348        if ( $this->asyncHandler ) {
1349            ( $this->asyncHandler )( $garbageCollector );
1350        } else {
1351            $garbageCollector();
1352        }
1353    }
1354
1355    /**
1356     * @deprecated since 1.41, use deleteObjectsExpiringBefore() instead
1357     */
1358    public function expireAll() {
1359        wfDeprecated( __METHOD__, '1.41' );
1360        $this->deleteObjectsExpiringBefore( (int)$this->getCurrentTime() );
1361    }
1362
1363    public function deleteObjectsExpiringBefore(
1364        $timestamp,
1365        callable $progress = null,
1366        $limit = INF,
1367        string $tag = null
1368    ) {
1369        /** @noinspection PhpUnusedLocalVariableInspection */
1370        $silenceScope = $this->silenceTransactionProfiler();
1371
1372        if ( $tag !== null ) {
1373            // Purge one server only, to support concurrent purging in large wiki farms (T282761).
1374            $shardIndexes = [];
1375            if ( !$this->serverTags ) {
1376                throw new InvalidArgumentException( "Given a tag but no tags are configured" );
1377            }
1378            foreach ( $this->serverTags as $serverShardIndex => $serverTag ) {
1379                if ( $tag === $serverTag ) {
1380                    $shardIndexes[] = $serverShardIndex;
1381                    break;
1382                }
1383            }
1384            if ( !$shardIndexes ) {
1385                throw new InvalidArgumentException( "Unknown server tag: $tag" );
1386            }
1387        } else {
1388            $shardIndexes = $this->getShardServerIndexes();
1389            shuffle( $shardIndexes );
1390        }
1391
1392        $ok = true;
1393        $numServers = count( $shardIndexes );
1394
1395        $keysDeletedCount = 0;
1396        foreach ( $shardIndexes as $numServersDone => $shardIndex ) {
1397            try {
1398                $db = $this->getConnection( $shardIndex );
1399
1400                // Avoid deadlock (T330377)
1401                $lockKey = "SqlBagOStuff-purge-shard:$shardIndex";
1402                if ( !$db->lock( $lockKey, __METHOD__, 0 ) ) {
1403                    $this->logger->info( "SqlBagOStuff purge for shard $shardIndex already locked, skip" );
1404                    continue;
1405                }
1406
1407                $this->deleteServerObjectsExpiringBefore(
1408                    $db,
1409                    $timestamp,
1410                    $limit,
1411                    $keysDeletedCount,
1412                    [ 'fn' => $progress, 'serversDone' => $numServersDone, 'serversTotal' => $numServers ]
1413                );
1414                $db->unlock( $lockKey, __METHOD__ );
1415            } catch ( DBError $e ) {
1416                $this->handleDBError( $e, $shardIndex );
1417                $ok = false;
1418            }
1419        }
1420
1421        return $ok;
1422    }
1423
1424    /**
1425     * @param IDatabase $db
1426     * @param string|int $timestamp
1427     * @param int|float $limit Maximum number of rows to delete in total or INF for no limit
1428     * @param int &$keysDeletedCount
1429     * @param array|null $progress
1430     * @phan-param array{fn:?callback,serversDone:int,serversTotal:int}|null $progress
1431     * @throws DBError
1432     */
1433    private function deleteServerObjectsExpiringBefore(
1434        IDatabase $db,
1435        $timestamp,
1436        $limit,
1437        &$keysDeletedCount = 0,
1438        array $progress = null
1439    ) {
1440        $cutoffUnix = ConvertibleTimestamp::convert( TS_UNIX, $timestamp );
1441        if ( $this->multiPrimaryMode ) {
1442            // Eventual consistency requires the preservation of any key that was recently
1443            // modified. The key must exist on this database server long enough for the server
1444            // to receive, via replication, all writes to the key with lower timestamps. Such
1445            // writes should be no-ops since the existing key value should "win". If the network
1446            // partitions between datacenters A and B for 30 minutes, the database servers in
1447            // each datacenter will see an initial burst of writes with "old" timestamps via
1448            // replication. This might include writes with lower timestamps that the existing
1449            // key value. Therefore, clock skew and replication delay are both factors.
1450            $cutoffUnixSafe = (int)$this->getCurrentTime() - self::SAFE_PURGE_DELAY_SEC;
1451            $cutoffUnix = min( $cutoffUnix, $cutoffUnixSafe );
1452        }
1453        $tableIndexes = range( 0, $this->numTableShards - 1 );
1454        shuffle( $tableIndexes );
1455
1456        $batchSize = min( $this->writeBatchSize, $limit );
1457
1458        foreach ( $tableIndexes as $numShardsDone => $tableIndex ) {
1459            // The oldest expiry of a row we have deleted on this shard
1460            // (the first row that we deleted)
1461            $minExpUnix = null;
1462            // The most recent expiry time so far, from a row we have deleted on this shard
1463            $maxExp = null;
1464            // Size of the time range we'll delete, in seconds (for progress estimate)
1465            $totalSeconds = null;
1466
1467            do {
1468                $res = $db->newSelectQueryBuilder()
1469                    ->select( [ 'keyname', 'exptime' ] )
1470                    ->from( $this->getTableNameByShard( $tableIndex ) )
1471                    ->where( $db->expr( 'exptime', '<', $db->timestamp( $cutoffUnix ) ) )
1472                    ->andWhere( $maxExp ? $db->expr( 'exptime', '>=', $maxExp ) : [] )
1473                    ->orderBy( 'exptime', SelectQueryBuilder::SORT_ASC )
1474                    ->limit( $batchSize )
1475                    ->caller( __METHOD__ )
1476                    ->fetchResultSet();
1477
1478                if ( $res->numRows() ) {
1479                    $row = $res->current();
1480                    if ( $minExpUnix === null ) {
1481                        $minExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $row->exptime );
1482                        $totalSeconds = max( $cutoffUnix - $minExpUnix, 1 );
1483                    }
1484
1485                    $keys = [];
1486                    foreach ( $res as $row ) {
1487                        $keys[] = $row->keyname;
1488                        $maxExp = $row->exptime;
1489                    }
1490
1491                    $db->newDeleteQueryBuilder()
1492                        ->deleteFrom( $this->getTableNameByShard( $tableIndex ) )
1493                        ->where( [
1494                            'keyname' => $keys,
1495                            $db->expr( 'exptime', '<', $db->timestamp( $cutoffUnix ) ),
1496                        ] )
1497                        ->caller( __METHOD__ )->execute();
1498                    $keysDeletedCount += $db->affectedRows();
1499                }
1500
1501                if ( $progress && is_callable( $progress['fn'] ) ) {
1502                    if ( $totalSeconds ) {
1503                        $maxExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $maxExp );
1504                        $remainingSeconds = $cutoffUnix - $maxExpUnix;
1505                        $processedSeconds = max( $totalSeconds - $remainingSeconds, 0 );
1506                        // For example, if we've done 1.5 table shard, and are thus half-way on the
1507                        // 2nd of perhaps 5 tables on this server, then this might be:
1508                        // `( 1 + ( 43200 / 86400 ) ) / 5 = 0.3`, or 30% done, of tables on this server.
1509                        $tablesDoneRatio =
1510                            ( $numShardsDone + ( $processedSeconds / $totalSeconds ) ) / $this->numTableShards;
1511                    } else {
1512                        $tablesDoneRatio = 1;
1513                    }
1514
1515                    // For example, if we're 30% done on the last of 10 servers, then this might be:
1516                    // `( 9 / 10 ) + ( 0.3 / 10 ) = 0.93`, or 93% done, overall.
1517                    $overallRatio = ( $progress['serversDone'] / $progress['serversTotal'] ) +
1518                        ( $tablesDoneRatio / $progress['serversTotal'] );
1519                    ( $progress['fn'] )( $overallRatio * 100 );
1520                }
1521            } while ( $res->numRows() && $keysDeletedCount < $limit );
1522        }
1523    }
1524
1525    /**
1526     * Delete content of shard tables in every server.
1527     * Return true if the operation is successful, false otherwise.
1528     *
1529     * @deprecated since 1.41, unused.
1530     *
1531     * @return bool
1532     */
1533    public function deleteAll() {
1534        wfDeprecated( __METHOD__, '1.41' );
1535        /** @noinspection PhpUnusedLocalVariableInspection */
1536        $silenceScope = $this->silenceTransactionProfiler();
1537        foreach ( $this->getShardServerIndexes() as $shardIndex ) {
1538            try {
1539                $db = $this->getConnection( $shardIndex );
1540                for ( $i = 0; $i < $this->numTableShards; $i++ ) {
1541                    $db->newDeleteQueryBuilder()
1542                        ->deleteFrom( $this->getTableNameByShard( $i ) )
1543                        ->where( $db::ALL_ROWS )
1544                        ->caller( __METHOD__ )->execute();
1545                }
1546            } catch ( DBError $e ) {
1547                $this->handleDBError( $e, $shardIndex );
1548                return false;
1549            }
1550        }
1551        return true;
1552    }
1553
1554    public function doLock( $key, $timeout = 6, $exptime = 6 ) {
1555        /** @noinspection PhpUnusedLocalVariableInspection */
1556        $silenceScope = $this->silenceTransactionProfiler();
1557
1558        $lockTsUnix = null;
1559
1560        [ $shardIndex ] = $this->getKeyLocation( $key );
1561        try {
1562            $db = $this->getConnection( $shardIndex );
1563            $lockTsUnix = $db->lock( $key, __METHOD__, $timeout, $db::LOCK_TIMESTAMP );
1564        } catch ( DBError $e ) {
1565            $this->handleDBError( $e, $shardIndex );
1566            $this->logger->warning(
1567                __METHOD__ . ' failed due to I/O error for {key}.',
1568                [ 'key' => $key ]
1569            );
1570        }
1571
1572        return $lockTsUnix;
1573    }
1574
1575    public function doUnlock( $key ) {
1576        /** @noinspection PhpUnusedLocalVariableInspection */
1577        $silenceScope = $this->silenceTransactionProfiler();
1578
1579        [ $shardIndex ] = $this->getKeyLocation( $key );
1580
1581        try {
1582            $db = $this->getConnection( $shardIndex );
1583            $released = $db->unlock( $key, __METHOD__ );
1584        } catch ( DBError $e ) {
1585            $this->handleDBError( $e, $shardIndex );
1586            $released = false;
1587        }
1588
1589        return $released;
1590    }
1591
1592    protected function makeKeyInternal( $keyspace, $components ) {
1593        // SQL schema for 'objectcache' specifies keys as varchar(255). From that,
1594        // subtract the number of characters we need for the keyspace and for
1595        // the separator character needed for each argument. To handle some
1596        // custom prefixes used by thing like WANObjectCache, limit to 205.
1597        $keyspace = strtr( $keyspace, ' ', '_' );
1598        $charsLeft = 205 - strlen( $keyspace ) - count( $components );
1599        foreach ( $components as &$component ) {
1600            $component = strtr( $component, [
1601                ' ' => '_', // Avoid unnecessary misses from pre-1.35 code
1602                ':' => '%3A',
1603            ] );
1604
1605            // 33 = 32 characters for the MD5 + 1 for the '#' prefix.
1606            if ( $charsLeft > 33 && strlen( $component ) > $charsLeft ) {
1607                $component = '#' . md5( $component );
1608            }
1609            $charsLeft -= strlen( $component );
1610        }
1611
1612        if ( $charsLeft < 0 ) {
1613            return $keyspace . ':BagOStuff-long-key:##' . md5( implode( ':', $components ) );
1614        }
1615        return $keyspace . ':' . implode( ':', $components );
1616    }
1617
1618    protected function requireConvertGenericKey(): bool {
1619        return true;
1620    }
1621
1622    protected function serialize( $value ) {
1623        if ( is_int( $value ) ) {
1624            return $value;
1625        }
1626
1627        $serial = serialize( $value );
1628        if ( $this->hasZlib ) {
1629            // On typical message and page data, this can provide a 3X storage savings
1630            $serial = gzdeflate( $serial );
1631        }
1632
1633        return $serial;
1634    }
1635
1636    protected function unserialize( $value ) {
1637        if ( $value === self::TOMB_SERIAL ) {
1638            return false; // tombstone
1639        }
1640
1641        if ( $this->isInteger( $value ) ) {
1642            return (int)$value;
1643        }
1644
1645        if ( $this->hasZlib ) {
1646            AtEase::suppressWarnings();
1647            $decompressed = gzinflate( $value );
1648            AtEase::restoreWarnings();
1649
1650            if ( $decompressed !== false ) {
1651                $value = $decompressed;
1652            }
1653        }
1654
1655        return unserialize( $value );
1656    }
1657
1658    private function getLoadBalancer(): ILoadBalancer {
1659        if ( !$this->loadBalancer ) {
1660            $this->loadBalancer = ( $this->loadBalancerCallback )();
1661        }
1662        return $this->loadBalancer;
1663    }
1664
1665    /**
1666     * @return IMaintainableDatabase
1667     * @throws DBError
1668     */
1669    private function getConnectionViaLoadBalancer() {
1670        $lb = $this->getLoadBalancer();
1671
1672        if ( $lb->getServerAttributes( $lb->getWriterIndex() )[Database::ATTR_DB_LEVEL_LOCKING] ) {
1673            // Use the main connection to avoid transaction deadlocks
1674            $conn = $lb->getMaintenanceConnectionRef( DB_PRIMARY, [], $this->dbDomain );
1675        } else {
1676            // If the RDBMS has row/table/page level locking, then use separate auto-commit
1677            // connection to avoid needless contention and deadlocks.
1678            $conn = $lb->getMaintenanceConnectionRef(
1679                $this->replicaOnly ? DB_REPLICA : DB_PRIMARY,
1680                [],
1681                $this->dbDomain,
1682                $lb::CONN_TRX_AUTOCOMMIT
1683            );
1684        }
1685
1686        // Make sure any errors are thrown now while we can more easily handle them
1687        $conn->ensureConnection();
1688        return $conn;
1689    }
1690
1691    /**
1692     * @param int $shardIndex
1693     * @param array $server Server config map
1694     * @return IMaintainableDatabase
1695     * @throws DBError
1696     */
1697    private function getConnectionFromServerInfo( $shardIndex, array $server ) {
1698        if ( !isset( $this->conns[$shardIndex] ) ) {
1699            $server['logger'] = $this->logger;
1700            // Make sure this handle always uses autocommit mode, even if DBO_TRX is
1701            // configured.
1702            $server['flags'] &= ~DBO_TRX;
1703
1704            /** @var IMaintainableDatabase $conn Auto-commit connection to the server */
1705            $conn = MediaWikiServices::getInstance()->getDatabaseFactory()
1706                ->create( $server['type'], $server );
1707
1708            // Automatically create the objectcache table for sqlite as needed
1709            if ( $conn->getType() === 'sqlite' ) {
1710                $this->initSqliteDatabase( $conn );
1711            }
1712            $this->conns[$shardIndex] = $conn;
1713        }
1714
1715        // @phan-suppress-next-line PhanTypeMismatchReturnNullable False positive
1716        return $this->conns[$shardIndex];
1717    }
1718
1719    /**
1720     * Handle a DBError which occurred during a read operation.
1721     *
1722     * @param DBError $exception
1723     * @param int $shardIndex Server index
1724     */
1725    private function handleDBError( DBError $exception, $shardIndex ) {
1726        if ( !$this->useLB && $exception instanceof DBConnectionError ) {
1727            unset( $this->conns[$shardIndex] ); // bug T103435
1728
1729            $now = $this->getCurrentTime();
1730            if ( isset( $this->connFailureTimes[$shardIndex] ) ) {
1731                if ( $now - $this->connFailureTimes[$shardIndex] >= 60 ) {
1732                    unset( $this->connFailureTimes[$shardIndex] );
1733                    unset( $this->connFailureErrors[$shardIndex] );
1734                } else {
1735                    $this->logger->debug( __METHOD__ . ": Server #$shardIndex already down" );
1736                    return;
1737                }
1738            }
1739            $this->logger->info( __METHOD__ . ": Server #$shardIndex down until " . ( $now + 60 ) );
1740            $this->connFailureTimes[$shardIndex] = $now;
1741            $this->connFailureErrors[$shardIndex] = $exception;
1742        }
1743        $this->logger->error( "DBError: {$exception->getMessage()}", [ 'exception' => $exception ] );
1744        if ( $exception instanceof DBConnectionError ) {
1745            $this->setLastError( self::ERR_UNREACHABLE );
1746            $this->logger->warning( __METHOD__ . ": ignoring connection error" );
1747        } else {
1748            $this->setLastError( self::ERR_UNEXPECTED );
1749            $this->logger->warning( __METHOD__ . ": ignoring query error" );
1750        }
1751    }
1752
1753    /**
1754     * @param IMaintainableDatabase $db
1755     * @throws DBError
1756     */
1757    private function initSqliteDatabase( IMaintainableDatabase $db ) {
1758        if ( $db->tableExists( 'objectcache', __METHOD__ ) ) {
1759            return;
1760        }
1761        // Use one table for SQLite; sharding does not seem to have much benefit
1762        $db->query( "PRAGMA journal_mode=WAL", __METHOD__ ); // this is permanent
1763        $db->startAtomic( __METHOD__ ); // atomic DDL
1764        try {
1765            $encTable = $db->tableName( 'objectcache' );
1766            $encExptimeIndex = $db->addIdentifierQuotes( $db->tablePrefix() . 'exptime' );
1767            $db->query(
1768                "CREATE TABLE $encTable (\n" .
1769                "    keyname BLOB NOT NULL default '' PRIMARY KEY,\n" .
1770                "    value BLOB,\n" .
1771                "    exptime BLOB NOT NULL\n" .
1772                ")",
1773                __METHOD__
1774            );
1775            $db->query( "CREATE INDEX $encExptimeIndex ON $encTable (exptime)", __METHOD__ );
1776            $db->endAtomic( __METHOD__ );
1777        } catch ( DBError $e ) {
1778            $db->rollback( __METHOD__ );
1779            throw $e;
1780        }
1781    }
1782
1783    /**
1784     * Create the shard tables on all databases
1785     *
1786     * This is typically called manually by a sysadmin via eval.php, e.g. for ParserCache:
1787     *
1788     * @code
1789     *     ObjectCache::getInstance( 'myparsercache' )->createTables();
1790     * @endcode
1791     *
1792     * This is different from `$services->getParserCache()->getCacheStorage()->createTables()`,
1793     * which would use the backend set via $wgParserCacheType, which shouldn't be
1794     * set yet for the backend you are creating shard tables on. The expectation
1795     * is to first add the new backend to $wgObjectCaches, run the above, and then enable
1796     * it for live ParserCache traffic by setting $wgParserCacheType.
1797     */
1798    public function createTables() {
1799        foreach ( $this->getShardServerIndexes() as $shardIndex ) {
1800            $db = $this->getConnection( $shardIndex );
1801            if ( in_array( $db->getType(), [ 'mysql', 'postgres' ], true ) ) {
1802                for ( $i = 0; $i < $this->numTableShards; $i++ ) {
1803                    $encBaseTable = $db->tableName( 'objectcache' );
1804                    $encShardTable = $db->tableName( $this->getTableNameByShard( $i ) );
1805                    $db->query( "CREATE TABLE $encShardTable LIKE $encBaseTable", __METHOD__ );
1806                }
1807            }
1808        }
1809    }
1810
1811    /**
1812     * @return int[] List of server indexes
1813     */
1814    private function getShardServerIndexes() {
1815        if ( $this->useLB ) {
1816            // LoadBalancer based configuration
1817            $shardIndexes = [ 0 ];
1818        } else {
1819            // Striped array of database servers
1820            $shardIndexes = array_keys( $this->serverTags );
1821        }
1822
1823        return $shardIndexes;
1824    }
1825
1826    /**
1827     * Silence the transaction profiler until the return value falls out of scope
1828     *
1829     * @return ScopedCallback|null
1830     */
1831    private function silenceTransactionProfiler() {
1832        if ( $this->serverInfos ) {
1833            return null; // no TransactionProfiler injected anyway
1834        }
1835        return Profiler::instance()->getTransactionProfiler()->silenceForScope();
1836    }
1837}