Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
76.91% covered (warning)
76.91%
583 / 758
48.21% covered (danger)
48.21%
27 / 56
CRAP
0.00% covered (danger)
0.00%
0 / 1
SqlBagOStuff
76.91% covered (warning)
76.91%
583 / 758
48.21% covered (danger)
48.21%
27 / 56
692.23
0.00% covered (danger)
0.00%
0 / 1
 __construct
77.78% covered (warning)
77.78%
21 / 27
0.00% covered (danger)
0.00%
0 / 1
7.54
 doGet
100.00% covered (success)
100.00%
12 / 12
100.00% covered (success)
100.00%
1 / 1
4
 doSet
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 doDelete
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 doAdd
87.50% covered (warning)
87.50%
7 / 8
0.00% covered (danger)
0.00%
0 / 1
2.01
 doCas
87.50% covered (warning)
87.50%
7 / 8
0.00% covered (danger)
0.00%
0 / 1
2.01
 doChangeTTL
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 doIncrWithInit
100.00% covered (success)
100.00%
11 / 11
100.00% covered (success)
100.00%
1 / 1
3
 doGetMulti
100.00% covered (success)
100.00%
15 / 15
100.00% covered (success)
100.00%
1 / 1
4
 doSetMulti
100.00% covered (success)
100.00%
11 / 11
100.00% covered (success)
100.00%
1 / 1
1
 doDeleteMulti
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 doChangeTTLMulti
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 getConnection
80.00% covered (warning)
80.00%
8 / 10
0.00% covered (danger)
0.00%
0 / 1
5.20
 getKeyLocation
58.33% covered (warning)
58.33%
7 / 12
0.00% covered (danger)
0.00%
0 / 1
5.16
 getTableNameByShard
50.00% covered (danger)
50.00%
2 / 4
0.00% covered (danger)
0.00%
0 / 1
4.12
 fetchBlobs
90.00% covered (success)
90.00%
36 / 40
0.00% covered (danger)
0.00%
0 / 1
11.12
 modifyBlobs
79.17% covered (warning)
79.17%
19 / 24
0.00% covered (danger)
0.00%
0 / 1
10.90
 modifyTableSpecificBlobsForSet
100.00% covered (success)
100.00%
24 / 24
100.00% covered (success)
100.00%
1 / 1
4
 modifyTableSpecificBlobsForDelete
100.00% covered (success)
100.00%
18 / 18
100.00% covered (success)
100.00%
1 / 1
4
 modifyTableSpecificBlobsForAdd
100.00% covered (success)
100.00%
30 / 30
100.00% covered (success)
100.00%
1 / 1
5
 modifyTableSpecificBlobsForCas
92.31% covered (success)
92.31%
36 / 39
0.00% covered (danger)
0.00%
0 / 1
7.02
 modifyTableSpecificBlobsForChangeTTL
100.00% covered (success)
100.00%
44 / 44
100.00% covered (success)
100.00%
1 / 1
9
 modifyTableSpecificBlobsForIncrInit
77.42% covered (warning)
77.42%
24 / 31
0.00% covered (danger)
0.00%
0 / 1
6.41
 modifyTableSpecificBlobsForIncrInitAsync
92.31% covered (success)
92.31%
12 / 13
0.00% covered (danger)
0.00%
0 / 1
3.00
 makeNewKeyExpiry
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
2
 newLockingWriteSectionModificationTimestamp
83.33% covered (warning)
83.33%
5 / 6
0.00% covered (danger)
0.00%
0 / 1
2.02
 makeTimestampedModificationToken
93.75% covered (success)
93.75%
15 / 16
0.00% covered (danger)
0.00%
0 / 1
2.00
 buildExistenceConditions
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 buildUpsertRow
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
2
 buildMultiUpsertSetForOverwrite
100.00% covered (success)
100.00%
18 / 18
100.00% covered (success)
100.00%
1 / 1
4
 buildIncrUpsertSet
100.00% covered (success)
100.00%
21 / 21
100.00% covered (success)
100.00%
1 / 1
3
 encodeDbExpiry
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
2
 decodeDbExpiry
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 dbEncodeSerialValue
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
2
 dbDecodeSerialValue
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
2
 addCasTokenFields
44.44% covered (danger)
44.44%
8 / 18
0.00% covered (danger)
0.00%
0 / 1
9.29
 getCasTokenFromRow
80.00% covered (warning)
80.00%
4 / 5
0.00% covered (danger)
0.00%
0 / 1
2.03
 garbageCollect
92.86% covered (success)
92.86%
13 / 14
0.00% covered (danger)
0.00%
0 / 1
2.00
 expireAll
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 deleteObjectsExpiringBefore
0.00% covered (danger)
0.00%
0 / 34
0.00% covered (danger)
0.00%
0 / 1
90
 deleteServerObjectsExpiringBefore
75.51% covered (warning)
75.51%
37 / 49
0.00% covered (danger)
0.00%
0 / 1
12.78
 deleteAll
0.00% covered (danger)
0.00%
0 / 13
0.00% covered (danger)
0.00%
0 / 1
20
 doLock
50.00% covered (danger)
50.00%
6 / 12
0.00% covered (danger)
0.00%
0 / 1
2.50
 doUnlock
62.50% covered (warning)
62.50%
5 / 8
0.00% covered (danger)
0.00%
0 / 1
2.21
 makeKeyInternal
100.00% covered (success)
100.00%
13 / 13
100.00% covered (success)
100.00%
1 / 1
5
 requireConvertGenericKey
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 serialize
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
3
 unserialize
90.91% covered (success)
90.91%
10 / 11
0.00% covered (danger)
0.00%
0 / 1
5.02
 getLoadBalancer
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 getConnectionViaLoadBalancer
45.45% covered (danger)
45.45%
5 / 11
0.00% covered (danger)
0.00%
0 / 1
4.46
 getConnectionFromServerInfo
100.00% covered (success)
100.00%
9 / 9
100.00% covered (success)
100.00%
1 / 1
3
 handleDBError
0.00% covered (danger)
0.00%
0 / 18
0.00% covered (danger)
0.00%
0 / 1
42
 initSqliteDatabase
10.53% covered (danger)
10.53%
2 / 19
0.00% covered (danger)
0.00%
0 / 1
9.45
 createTables
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 getShardServerIndexes
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 silenceTransactionProfiler
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
1<?php
2/**
3 * Object caching using a SQL database.
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 * http://www.gnu.org/copyleft/gpl.html
19 *
20 * @file
21 * @ingroup Cache
22 */
23
24use MediaWiki\MediaWikiServices;
25use Wikimedia\AtEase\AtEase;
26use Wikimedia\ObjectCache\MediumSpecificBagOStuff;
27use Wikimedia\Rdbms\Blob;
28use Wikimedia\Rdbms\Database;
29use Wikimedia\Rdbms\DBConnectionError;
30use Wikimedia\Rdbms\DBError;
31use Wikimedia\Rdbms\DBQueryError;
32use Wikimedia\Rdbms\IDatabase;
33use Wikimedia\Rdbms\ILoadBalancer;
34use Wikimedia\Rdbms\IMaintainableDatabase;
35use Wikimedia\Rdbms\RawSQLValue;
36use Wikimedia\Rdbms\SelectQueryBuilder;
37use Wikimedia\Rdbms\ServerInfo;
38use Wikimedia\ScopedCallback;
39use Wikimedia\Timestamp\ConvertibleTimestamp;
40
41/**
42 * RDBMS-based caching module
43 *
44 * The following database sharding schemes are supported:
45 *   - None; all keys map to the same shard
46 *   - Hash; keys map to shards via consistent hashing
47 *
48 * The following database replication topologies are supported:
49 *   - A primary database server for each shard, all within one datacenter
50 *   - A co-primary database server for each shard within each datacenter
51 *
52 * @ingroup Cache
53 */
54class SqlBagOStuff extends MediumSpecificBagOStuff {
55    /** @var callable|null Injected function which returns a LoadBalancer */
56    protected $loadBalancerCallback;
57    /** @var ILoadBalancer|null */
58    protected $loadBalancer;
59    /** @var string|false|null DB name used for keys using the LoadBalancer */
60    protected $dbDomain;
61    /** @var bool Whether to use the LoadBalancer */
62    protected $useLB = false;
63
64    /** @var array[] (server index => server config) */
65    protected $serverInfos = [];
66    /** @var string[] (server index => tag/host name) */
67    protected $serverTags = [];
68    /** @var float UNIX timestamp */
69    protected $lastGarbageCollect = 0;
70    /** @var int Average number of writes required to trigger garbage collection */
71    protected $purgePeriod = 10;
72    /** @var int Max expired rows to purge during randomized garbage collection */
73    protected $purgeLimit = 100;
74    /** @var int Number of table shards to use on each server */
75    protected $numTableShards = 1;
76    /** @var int */
77    protected $writeBatchSize = 100;
78    /** @var string */
79    protected $tableName = 'objectcache';
80    /** @var bool Whether to use replicas instead of primaries (if using LoadBalancer) */
81    protected $replicaOnly;
82    /** @var bool Whether multi-primary mode is enabled */
83    protected $multiPrimaryMode;
84
85    /** @var IMaintainableDatabase[] Map of (shard index => DB handle) */
86    protected $conns;
87    /** @var float[] Map of (shard index => UNIX timestamps) */
88    protected $connFailureTimes = [];
89    /** @var Exception[] Map of (shard index => Exception) */
90    protected $connFailureErrors = [];
91
92    /** @var bool Whether zlib methods are available to PHP */
93    private $hasZlib;
94
95    /** A number of seconds well above any expected clock skew */
96    private const SAFE_CLOCK_BOUND_SEC = 15;
97    /** A number of seconds well above any expected clock skew and replication lag */
98    private const SAFE_PURGE_DELAY_SEC = 3600;
99    /** Distinct string for tombstones stored in the "serialized" value column */
100    private const TOMB_SERIAL = '';
101    /** Relative seconds-to-live to use for tombstones */
102    private const TOMB_EXPTIME = -self::SAFE_CLOCK_BOUND_SEC;
103    /** How many seconds must pass before triggering a garbage collection */
104    private const GC_DELAY_SEC = 1;
105
106    private const BLOB_VALUE = 0;
107    private const BLOB_EXPIRY = 1;
108    private const BLOB_CASTOKEN = 2;
109
110    /**
111     * Placeholder timestamp to use for TTL_INDEFINITE that can be stored in all RDBMs types.
112     * We use BINARY(14) for MySQL, BLOB for Sqlite, and TIMESTAMPZ for Postgres (which goes
113     * up to 294276 AD). The last second of the year 9999 can be stored in all these cases.
114     * https://www.postgresql.org/docs/9.0/datatype-datetime.html
115     */
116    private const INF_TIMESTAMP_PLACEHOLDER = '99991231235959';
117
118    /**
119     * Create a new backend instance from parameters injected by ObjectCache::newFromParams()
120     *
121     * The database servers must be provided by *either* the "server" parameter, the "servers"
122     * parameter or the "loadBalancer" parameter.
123     *
124     * The parameters are as described at {@link \MediaWiki\MainConfigSchema::ObjectCaches}
125     * except that:
126     *
127     *   - the configured "cluster" and main LB fallback modes are implemented by
128     *     the wiring by passing "loadBalancerCallback".
129     *   - "dbDomain" is required if "loadBalancerCallback" is set, whereas in
130     *     config it may be absent.
131     *
132     * @internal
133     *
134     * @param array $params
135     *   - server: string
136     *   - servers: string[]
137     *   - loadBalancerCallback: A closure which provides a LoadBalancer object
138     *      - dbDomain: string|false
139     *   - multiPrimaryMode: bool
140     *   - purgePeriod: int|float
141     *   - purgeLimit: int
142     *   - tableName: string
143     *   - shards: int
144     *   - replicaOnly: bool
145     *   - writeBatchSize: int
146     */
147    public function __construct( $params ) {
148        parent::__construct( $params );
149
150        if ( isset( $params['servers'] ) || isset( $params['server'] ) ) {
151            // Configuration uses a direct list of servers.
152            // Object data is horizontally partitioned via key hash.
153            $index = 0;
154            foreach ( ( $params['servers'] ?? [ $params['server'] ] ) as $tag => $info ) {
155                $this->serverInfos[$index] = $info;
156                // Allow integer-indexes arrays for b/c
157                $this->serverTags[$index] = is_string( $tag ) ? $tag : "#$index";
158                ++$index;
159            }
160        } elseif ( isset( $params['loadBalancerCallback'] ) ) {
161            $this->loadBalancerCallback = $params['loadBalancerCallback'];
162            if ( !isset( $params['dbDomain'] ) ) {
163                throw new InvalidArgumentException(
164                    __METHOD__ . ": 'dbDomain' is required if 'loadBalancerCallback' is given"
165                );
166            }
167            $this->dbDomain = $params['dbDomain'];
168            $this->useLB = true;
169        } else {
170            throw new InvalidArgumentException(
171                __METHOD__ . " requires 'server', 'servers', or 'loadBalancerCallback'"
172            );
173        }
174
175        $this->purgePeriod = intval( $params['purgePeriod'] ?? $this->purgePeriod );
176        $this->purgeLimit = intval( $params['purgeLimit'] ?? $this->purgeLimit );
177        $this->tableName = $params['tableName'] ?? $this->tableName;
178        $this->numTableShards = intval( $params['shards'] ?? $this->numTableShards );
179        $this->writeBatchSize = intval( $params['writeBatchSize'] ?? $this->writeBatchSize );
180        $this->replicaOnly = $params['replicaOnly'] ?? false;
181        $this->multiPrimaryMode = $params['multiPrimaryMode'] ?? false;
182
183        $this->attrMap[self::ATTR_DURABILITY] = self::QOS_DURABILITY_RDBMS;
184
185        $this->hasZlib = extension_loaded( 'zlib' );
186    }
187
188    protected function doGet( $key, $flags = 0, &$casToken = null ) {
189        $getToken = ( $casToken === self::PASS_BY_REF );
190        $casToken = null;
191
192        $data = $this->fetchBlobs( [ $key ], $getToken )[$key];
193        if ( $data ) {
194            $result = $this->unserialize( $data[self::BLOB_VALUE] );
195            if ( $getToken && $result !== false ) {
196                $casToken = $data[self::BLOB_CASTOKEN];
197            }
198            $valueSize = strlen( $data[self::BLOB_VALUE] );
199        } else {
200            $result = false;
201            $valueSize = false;
202        }
203
204        $this->updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, $valueSize ] ] );
205
206        return $result;
207    }
208
209    protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
210        $mtime = $this->getCurrentTime();
211
212        return $this->modifyBlobs(
213            [ $this, 'modifyTableSpecificBlobsForSet' ],
214            $mtime,
215            [ $key => [ $value, $exptime ] ]
216        );
217    }
218
219    protected function doDelete( $key, $flags = 0 ) {
220        $mtime = $this->getCurrentTime();
221
222        return $this->modifyBlobs(
223            [ $this, 'modifyTableSpecificBlobsForDelete' ],
224            $mtime,
225            [ $key => [] ]
226        );
227    }
228
229    protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
230        $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
231        if ( $mtime === null ) {
232            // Timeout or I/O error during lock acquisition
233            return false;
234        }
235
236        return $this->modifyBlobs(
237            [ $this, 'modifyTableSpecificBlobsForAdd' ],
238            $mtime,
239            [ $key => [ $value, $exptime ] ]
240        );
241    }
242
243    protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
244        $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope );
245        if ( $mtime === null ) {
246            // Timeout or I/O error during lock acquisition
247            return false;
248        }
249
250        return $this->modifyBlobs(
251            [ $this, 'modifyTableSpecificBlobsForCas' ],
252            $mtime,
253            [ $key => [ $value, $exptime, $casToken ] ]
254        );
255    }
256
257    protected function doChangeTTL( $key, $exptime, $flags ) {
258        $mtime = $this->getCurrentTime();
259
260        return $this->modifyBlobs(
261            [ $this, 'modifyTableSpecificBlobsForChangeTTL' ],
262            $mtime,
263            [ $key => [ $exptime ] ]
264        );
265    }
266
267    protected function doIncrWithInit( $key, $exptime, $step, $init, $flags ) {
268        $mtime = $this->getCurrentTime();
269
270        if ( $flags & self::WRITE_BACKGROUND ) {
271            $callback = [ $this, 'modifyTableSpecificBlobsForIncrInitAsync' ];
272        } else {
273            $callback = [ $this, 'modifyTableSpecificBlobsForIncrInit' ];
274        }
275
276        $result = $this->modifyBlobs(
277            $callback,
278            $mtime,
279            [ $key => [ $step, $init, $exptime ] ],
280            $resByKey
281        ) ? $resByKey[$key] : false;
282
283        return $result;
284    }
285
286    protected function doGetMulti( array $keys, $flags = 0 ) {
287        $result = [];
288        $valueSizeByKey = [];
289
290        $dataByKey = $this->fetchBlobs( $keys );
291        foreach ( $keys as $key ) {
292            $data = $dataByKey[$key];
293            if ( $data ) {
294                $serialValue = $data[self::BLOB_VALUE];
295                $value = $this->unserialize( $serialValue );
296                if ( $value !== false ) {
297                    $result[$key] = $value;
298                }
299                $valueSize = strlen( $serialValue );
300            } else {
301                $valueSize = false;
302            }
303            $valueSizeByKey[$key] = [ 0, $valueSize ];
304        }
305
306        $this->updateOpStats( self::METRIC_OP_GET, $valueSizeByKey );
307
308        return $result;
309    }
310
311    protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
312        $mtime = $this->getCurrentTime();
313
314        return $this->modifyBlobs(
315            [ $this, 'modifyTableSpecificBlobsForSet' ],
316            $mtime,
317            array_map(
318                static function ( $value ) use ( $exptime ) {
319                    return [ $value, $exptime ];
320                },
321                $data
322            )
323        );
324    }
325
326    protected function doDeleteMulti( array $keys, $flags = 0 ) {
327        $mtime = $this->getCurrentTime();
328
329        return $this->modifyBlobs(
330            [ $this, 'modifyTableSpecificBlobsForDelete' ],
331            $mtime,
332            array_fill_keys( $keys, [] )
333        );
334    }
335
336    public function doChangeTTLMulti( array $keys, $exptime, $flags = 0 ) {
337        $mtime = $this->getCurrentTime();
338
339        return $this->modifyBlobs(
340            [ $this, 'modifyTableSpecificBlobsForChangeTTL' ],
341            $mtime,
342            array_fill_keys( $keys, [ $exptime ] )
343        );
344    }
345
346    /**
347     * Get a connection to the specified database
348     *
349     * @param int $shardIndex Server index
350     * @return IMaintainableDatabase
351     * @throws DBConnectionError
352     * @throws UnexpectedValueException
353     */
354    private function getConnection( $shardIndex ) {
355        if ( $this->useLB ) {
356            return $this->getConnectionViaLoadBalancer();
357        }
358
359        // Don't keep timing out trying to connect if the server is down
360        if (
361            isset( $this->connFailureErrors[$shardIndex] ) &&
362            ( $this->getCurrentTime() - $this->connFailureTimes[$shardIndex] ) < 60
363        ) {
364            throw $this->connFailureErrors[$shardIndex];
365        }
366
367        if ( isset( $this->serverInfos[$shardIndex] ) ) {
368            $server = $this->serverInfos[$shardIndex];
369            $conn = $this->getConnectionFromServerInfo( $shardIndex, $server );
370        } else {
371            throw new UnexpectedValueException( "Invalid server index #$shardIndex" );
372        }
373
374        return $conn;
375    }
376
377    /**
378     * Get the server index and table name for a given key
379     * @param string $key
380     * @return array (server index, table name)
381     */
382    private function getKeyLocation( $key ) {
383        if ( $this->useLB ) {
384            // LoadBalancer based configuration
385            $shardIndex = 0;
386        } else {
387            // Striped array of database servers
388            if ( count( $this->serverTags ) == 1 ) {
389                $shardIndex = 0; // short-circuit
390            } else {
391                $sortedServers = $this->serverTags;
392                ArrayUtils::consistentHashSort( $sortedServers, $key );
393                $shardIndex = array_key_first( $sortedServers );
394            }
395        }
396
397        if ( $this->numTableShards > 1 ) {
398            $hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
399            $tableIndex = $hash % $this->numTableShards;
400        } else {
401            $tableIndex = null;
402        }
403
404        return [ $shardIndex, $this->getTableNameByShard( $tableIndex ) ];
405    }
406
407    /**
408     * Get the table name for a given shard index
409     * @param int|null $index
410     * @return string
411     */
412    private function getTableNameByShard( $index ) {
413        if ( $index !== null && $this->numTableShards > 1 ) {
414            $decimals = strlen( (string)( $this->numTableShards - 1 ) );
415
416            return $this->tableName . sprintf( "%0{$decimals}d", $index );
417        }
418
419        return $this->tableName;
420    }
421
422    /**
423     * @param string[] $keys
424     * @param bool $getCasToken Whether to get a CAS token
425     * @return array<string,array|null> Order-preserved map of (key => (value,expiry,token) or null)
426     */
427    private function fetchBlobs( array $keys, bool $getCasToken = false ) {
428        /** @noinspection PhpUnusedLocalVariableInspection */
429        $silenceScope = $this->silenceTransactionProfiler();
430
431        // Initialize order-preserved per-key results; set values for live keys below
432        $dataByKey = array_fill_keys( $keys, null );
433
434        $readTime = (int)$this->getCurrentTime();
435        $keysByTableByShard = [];
436        foreach ( $keys as $key ) {
437            [ $shardIndex, $partitionTable ] = $this->getKeyLocation( $key );
438            $keysByTableByShard[$shardIndex][$partitionTable][] = $key;
439        }
440
441        foreach ( $keysByTableByShard as $shardIndex => $serverKeys ) {
442            try {
443                $db = $this->getConnection( $shardIndex );
444                foreach ( $serverKeys as $partitionTable => $tableKeys ) {
445                    $res = $db->newSelectQueryBuilder()
446                        ->select(
447                            $getCasToken
448                            ? $this->addCasTokenFields( $db, [ 'keyname', 'value', 'exptime' ] )
449                            : [ 'keyname', 'value', 'exptime' ] )
450                        ->from( $partitionTable )
451                        ->where( $this->buildExistenceConditions( $db, $tableKeys, $readTime ) )
452                        ->caller( __METHOD__ )
453                        ->fetchResultSet();
454                    foreach ( $res as $row ) {
455                        $row->shardIndex = $shardIndex;
456                        $row->tableName = $partitionTable;
457                        $dataByKey[$row->keyname] = $row;
458                    }
459                }
460            } catch ( DBError $e ) {
461                $this->handleDBError( $e, $shardIndex );
462            }
463        }
464
465        foreach ( $keys as $key ) {
466            $row = $dataByKey[$key] ?? null;
467            if ( !$row ) {
468                continue;
469            }
470
471            $this->debug( __METHOD__ . ": retrieved $key; expiry time is {$row->exptime}" );
472            try {
473                $db = $this->getConnection( $row->shardIndex );
474                $dataByKey[$key] = [
475                    self::BLOB_VALUE => $this->dbDecodeSerialValue( $db, $row->value ),
476                    self::BLOB_EXPIRY => $this->decodeDbExpiry( $db, $row->exptime ),
477                    self::BLOB_CASTOKEN => $getCasToken
478                        ? $this->getCasTokenFromRow( $db, $row )
479                        : null
480                ];
481            } catch ( DBQueryError $e ) {
482                $this->handleDBError( $e, $row->shardIndex );
483            }
484        }
485
486        return $dataByKey;
487    }
488
489    /**
490     * @param callable $tableWriteCallback Callback the takes the following arguments:
491     *  - IDatabase instance
492     *  - Partition table name string
493     *  - UNIX modification timestamp
494     *  - Map of (key => list of arguments) for keys belonging to the server/table partition
495     *  - Map of (key => result) [returned]
496     * @param float $mtime UNIX modification timestamp
497     * @param array<string,array> $argsByKey Map of (key => list of arguments)
498     * @param array<string,mixed> &$resByKey Order-preserved map of (key => result) [returned]
499     * @return bool Whether all keys were processed
500     * @param-taint $argsByKey none
501     */
502    private function modifyBlobs(
503        callable $tableWriteCallback,
504        float $mtime,
505        array $argsByKey,
506        &$resByKey = []
507    ) {
508        // Initialize order-preserved per-key results; callbacks mark successful results
509        $resByKey = array_fill_keys( array_keys( $argsByKey ), false );
510
511        /** @noinspection PhpUnusedLocalVariableInspection */
512        $silenceScope = $this->silenceTransactionProfiler();
513
514        $argsByKeyByTableByShard = [];
515        foreach ( $argsByKey as $key => $args ) {
516            [ $shardIndex, $partitionTable ] = $this->getKeyLocation( $key );
517            $argsByKeyByTableByShard[$shardIndex][$partitionTable][$key] = $args;
518        }
519
520        $shardIndexesAffected = [];
521        foreach ( $argsByKeyByTableByShard as $shardIndex => $argsByKeyByTables ) {
522            foreach ( $argsByKeyByTables as $table => $ptKeyArgs ) {
523                try {
524                    $db = $this->getConnection( $shardIndex );
525                    $shardIndexesAffected[] = $shardIndex;
526                    $tableWriteCallback( $db, $table, $mtime, $ptKeyArgs, $resByKey );
527                } catch ( DBError $e ) {
528                    $this->handleDBError( $e, $shardIndex );
529                    continue;
530                }
531            }
532        }
533
534        $success = !in_array( false, $resByKey, true );
535
536        foreach ( $shardIndexesAffected as $shardIndex ) {
537            try {
538                if (
539                    // Random purging is enabled
540                    $this->purgePeriod &&
541                    // Only purge on one in every $this->purgePeriod writes
542                    mt_rand( 0, $this->purgePeriod - 1 ) == 0 &&
543                    // Avoid repeating the delete within a few seconds
544                    ( $this->getCurrentTime() - $this->lastGarbageCollect ) > self::GC_DELAY_SEC
545                ) {
546                    $this->garbageCollect( $shardIndex );
547                }
548            } catch ( DBError $e ) {
549                $this->handleDBError( $e, $shardIndex );
550            }
551        }
552
553        return $success;
554    }
555
556    /**
557     * Set key/value pairs belonging to a partition table on the given server
558     *
559     * In multi-primary mode, if the current row for a key exists and has a modification token
560     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
561     * then the write to that key will be aborted with a "false" result. Successfully modified
562     * key rows will be assigned a new modification token using the provided timestamp.
563     *
564     * @param IDatabase $db Handle to the database server where the argument keys belong
565     * @param string $ptable Name of the partition table where the argument keys belong
566     * @param float $mtime UNIX modification timestamp
567     * @param array<string,array> $argsByKey Non-empty (key => (value,exptime)) map
568     * @param array<string,mixed> &$resByKey Map of (key => result) for successful writes [returned]
569     * @throws DBError
570     */
571    private function modifyTableSpecificBlobsForSet(
572        IDatabase $db,
573        string $ptable,
574        float $mtime,
575        array $argsByKey,
576        array &$resByKey
577    ) {
578        $valueSizesByKey = [];
579
580        $mt = $this->makeTimestampedModificationToken( $mtime, $db );
581
582        $rows = [];
583        foreach ( $argsByKey as $key => [ $value, $exptime ] ) {
584            $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
585            $serialValue = $this->getSerialized( $value, $key );
586            $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
587
588            $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
589        }
590
591        if ( $this->multiPrimaryMode ) {
592            $db->newInsertQueryBuilder()
593                ->insertInto( $ptable )
594                ->rows( $rows )
595                ->onDuplicateKeyUpdate()
596                ->uniqueIndexFields( [ 'keyname' ] )
597                ->set( $this->buildMultiUpsertSetForOverwrite( $db, $mt ) )
598                ->caller( __METHOD__ )->execute();
599        } else {
600            // T288998: use REPLACE, if possible, to avoid cluttering the binlogs
601            $db->newReplaceQueryBuilder()
602                ->replaceInto( $ptable )
603                ->rows( $rows )
604                ->uniqueIndexFields( [ 'keyname' ] )
605                ->caller( __METHOD__ )->execute();
606        }
607
608        foreach ( $argsByKey as $key => $unused ) {
609            $resByKey[$key] = true;
610        }
611
612        $this->updateOpStats( self::METRIC_OP_SET, $valueSizesByKey );
613    }
614
615    /**
616     * Purge/tombstone key/value pairs belonging to a partition table on the given server
617     *
618     * In multi-primary mode, if the current row for a key exists and has a modification token
619     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
620     * then the write to that key will be aborted with a "false" result. Successfully modified
621     * key rows will be assigned a new modification token/timestamp, an empty value, and an
622     * expiration timestamp dated slightly before the new modification timestamp.
623     *
624     * @param IDatabase $db Handle to the database server where the argument keys belong
625     * @param string $ptable Name of the partition table where the argument keys belong
626     * @param float $mtime UNIX modification timestamp
627     * @param array<string,array> $argsByKey Non-empty (key => []) map
628     * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
629     * @throws DBError
630     */
631    private function modifyTableSpecificBlobsForDelete(
632        IDatabase $db,
633        string $ptable,
634        float $mtime,
635        array $argsByKey,
636        array &$resByKey
637    ) {
638        if ( $this->multiPrimaryMode ) {
639            // Tombstone keys in order to respect eventual consistency
640            $mt = $this->makeTimestampedModificationToken( $mtime, $db );
641            $expiry = $this->makeNewKeyExpiry( self::TOMB_EXPTIME, (int)$mtime );
642            $queryBuilder = $db->newInsertQueryBuilder()
643                ->insertInto( $ptable )
644                ->onDuplicateKeyUpdate()
645                ->uniqueIndexFields( [ 'keyname' ] )
646                ->set( $this->buildMultiUpsertSetForOverwrite( $db, $mt ) );
647            foreach ( $argsByKey as $key => $arg ) {
648                $queryBuilder->row( $this->buildUpsertRow( $db, $key, self::TOMB_SERIAL, $expiry, $mt ) );
649            }
650            $queryBuilder->caller( __METHOD__ )->execute();
651        } else {
652            // Just purge the keys since there is only one primary (e.g. "source of truth")
653            $db->newDeleteQueryBuilder()
654                ->deleteFrom( $ptable )
655                ->where( [ 'keyname' => array_keys( $argsByKey ) ] )
656                ->caller( __METHOD__ )->execute();
657        }
658
659        foreach ( $argsByKey as $key => $arg ) {
660            $resByKey[$key] = true;
661        }
662
663        $this->updateOpStats( self::METRIC_OP_DELETE, array_keys( $argsByKey ) );
664    }
665
666    /**
667     * Insert key/value pairs belonging to a partition table on the given server
668     *
669     * If the current row for a key exists and has an integral UNIX timestamp of expiration
670     * greater than that of the provided modification timestamp, then the write to that key
671     * will be aborted with a "false" result. Acquisition of advisory key locks must be handled
672     * by calling functions.
673     *
674     * In multi-primary mode, if the current row for a key exists and has a modification token
675     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
676     * then the write to that key will be aborted with a "false" result. Successfully modified
677     * key rows will be assigned a new modification token/timestamp.
678     *
679     * @param IDatabase $db Handle to the database server where the argument keys belong
680     * @param string $ptable Name of the partition table where the argument keys belong
681     * @param float $mtime UNIX modification timestamp
682     * @param array<string,array> $argsByKey Non-empty (key => (value,exptime)) map
683     * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
684     * @throws DBError
685     */
686    private function modifyTableSpecificBlobsForAdd(
687        IDatabase $db,
688        string $ptable,
689        float $mtime,
690        array $argsByKey,
691        array &$resByKey
692    ) {
693        $valueSizesByKey = [];
694
695        $mt = $this->makeTimestampedModificationToken( $mtime, $db );
696
697        // This check must happen outside the write query to respect eventual consistency
698        $existingKeys = $db->newSelectQueryBuilder()
699            ->select( 'keyname' )
700            ->from( $ptable )
701            ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) )
702            ->caller( __METHOD__ )
703            ->fetchFieldValues();
704        $existingByKey = array_fill_keys( $existingKeys, true );
705
706        $rows = [];
707        foreach ( $argsByKey as $key => [ $value, $exptime ] ) {
708            if ( isset( $existingByKey[$key] ) ) {
709                $this->logger->debug( __METHOD__ . "$key already exists" );
710                continue;
711            }
712
713            $serialValue = $this->getSerialized( $value, $key );
714            $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
715            $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
716            $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
717        }
718        if ( !$rows ) {
719            return;
720        }
721        $db->newInsertQueryBuilder()
722            ->insertInto( $ptable )
723            ->rows( $rows )
724            ->onDuplicateKeyUpdate()
725            ->uniqueIndexFields( [ 'keyname' ] )
726            ->set( $this->buildMultiUpsertSetForOverwrite( $db, $mt ) )
727            ->caller( __METHOD__ )->execute();
728
729        foreach ( $argsByKey as $key => $unused ) {
730            $resByKey[$key] = !isset( $existingByKey[$key] );
731        }
732
733        $this->updateOpStats( self::METRIC_OP_ADD, $valueSizesByKey );
734    }
735
736    /**
737     * Insert key/value pairs belonging to a partition table on the given server
738     *
739     * If the current row for a key exists, has an integral UNIX timestamp of expiration greater
740     * than that of the provided modification timestamp, and the CAS token does not match, then
741     * the write to that key will be aborted with a "false" result. Acquisition of advisory key
742     * locks must be handled by calling functions.
743     *
744     * In multi-primary mode, if the current row for a key exists and has a modification token
745     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
746     * then the write to that key will be aborted with a "false" result. Successfully modified
747     * key rows will be assigned a new modification token/timestamp.
748     *
749     * @param IDatabase $db Handle to the database server where the argument keys belong
750     * @param string $ptable Name of the partition table where the argument keys belong
751     * @param float $mtime UNIX modification timestamp
752     * @param array<string,array> $argsByKey Non-empty (key => (value, exptime, CAS token)) map
753     * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
754     * @throws DBError
755     */
756    private function modifyTableSpecificBlobsForCas(
757        IDatabase $db,
758        string $ptable,
759        float $mtime,
760        array $argsByKey,
761        array &$resByKey
762    ) {
763        $valueSizesByKey = [];
764
765        $mt = $this->makeTimestampedModificationToken( $mtime, $db );
766
767        // This check must happen outside the write query to respect eventual consistency
768        $res = $db->newSelectQueryBuilder()
769            ->select( $this->addCasTokenFields( $db, [ 'keyname' ] ) )
770            ->from( $ptable )
771            ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) )
772            ->caller( __METHOD__ )
773            ->fetchResultSet();
774
775        $curTokensByKey = [];
776        foreach ( $res as $row ) {
777            $curTokensByKey[$row->keyname] = $this->getCasTokenFromRow( $db, $row );
778        }
779
780        $nonMatchingByKey = [];
781        $rows = [];
782        foreach ( $argsByKey as $key => [ $value, $exptime, $casToken ] ) {
783            $curToken = $curTokensByKey[$key] ?? null;
784            if ( $curToken === null ) {
785                $nonMatchingByKey[$key] = true;
786                $this->logger->debug( __METHOD__ . "$key does not exists" );
787                continue;
788            }
789
790            if ( $curToken !== $casToken ) {
791                $nonMatchingByKey[$key] = true;
792                $this->logger->debug( __METHOD__ . "$key does not have a matching token" );
793                continue;
794            }
795
796            $serialValue = $this->getSerialized( $value, $key );
797            $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
798            $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];
799
800            $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
801        }
802        if ( !$rows ) {
803