Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
79.53% |
637 / 801 |
|
52.63% |
30 / 57 |
CRAP | |
0.00% |
0 / 1 |
SqlBagOStuff | |
79.53% |
637 / 801 |
|
52.63% |
30 / 57 |
630.64 | |
0.00% |
0 / 1 |
__construct | |
77.78% |
21 / 27 |
|
0.00% |
0 / 1 |
7.54 | |||
doGet | |
100.00% |
12 / 12 |
|
100.00% |
1 / 1 |
4 | |||
doSet | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
1 | |||
doDelete | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
1 | |||
doAdd | |
87.50% |
7 / 8 |
|
0.00% |
0 / 1 |
2.01 | |||
doCas | |
87.50% |
7 / 8 |
|
0.00% |
0 / 1 |
2.01 | |||
doChangeTTL | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
1 | |||
doIncrWithInit | |
100.00% |
11 / 11 |
|
100.00% |
1 / 1 |
3 | |||
doGetMulti | |
100.00% |
15 / 15 |
|
100.00% |
1 / 1 |
4 | |||
doSetMulti | |
100.00% |
11 / 11 |
|
100.00% |
1 / 1 |
1 | |||
doDeleteMulti | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
1 | |||
doChangeTTLMulti | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
1 | |||
getConnection | |
80.00% |
8 / 10 |
|
0.00% |
0 / 1 |
5.20 | |||
getShardIndexesForKey | |
92.86% |
13 / 14 |
|
0.00% |
0 / 1 |
6.01 | |||
getTableNameForKey | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
3 | |||
getTableNameByShard | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
3 | |||
fetchBlobs | |
81.67% |
49 / 60 |
|
0.00% |
0 / 1 |
21.22 | |||
modifyBlobs | |
73.33% |
22 / 30 |
|
0.00% |
0 / 1 |
19.27 | |||
modifyTableSpecificBlobsForSet | |
100.00% |
24 / 24 |
|
100.00% |
1 / 1 |
4 | |||
modifyTableSpecificBlobsForDelete | |
100.00% |
18 / 18 |
|
100.00% |
1 / 1 |
4 | |||
modifyTableSpecificBlobsForAdd | |
100.00% |
30 / 30 |
|
100.00% |
1 / 1 |
5 | |||
modifyTableSpecificBlobsForCas | |
92.31% |
36 / 39 |
|
0.00% |
0 / 1 |
7.02 | |||
modifyTableSpecificBlobsForChangeTTL | |
100.00% |
44 / 44 |
|
100.00% |
1 / 1 |
9 | |||
modifyTableSpecificBlobsForIncrInit | |
77.42% |
24 / 31 |
|
0.00% |
0 / 1 |
6.41 | |||
modifyTableSpecificBlobsForIncrInitAsync | |
92.31% |
12 / 13 |
|
0.00% |
0 / 1 |
3.00 | |||
makeNewKeyExpiry | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
2 | |||
newLockingWriteSectionModificationTimestamp | |
83.33% |
5 / 6 |
|
0.00% |
0 / 1 |
2.02 | |||
makeTimestampedModificationToken | |
93.75% |
15 / 16 |
|
0.00% |
0 / 1 |
2.00 | |||
buildExistenceConditions | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
1 | |||
buildUpsertRow | |
100.00% |
8 / 8 |
|
100.00% |
1 / 1 |
2 | |||
buildMultiUpsertSetForOverwrite | |
100.00% |
18 / 18 |
|
100.00% |
1 / 1 |
4 | |||
buildIncrUpsertSet | |
100.00% |
21 / 21 |
|
100.00% |
1 / 1 |
3 | |||
encodeDbExpiry | |
100.00% |
5 / 5 |
|
100.00% |
1 / 1 |
2 | |||
decodeDbExpiry | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 | |||
dbEncodeSerialValue | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
2 | |||
dbDecodeSerialValue | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
2 | |||
addCasTokenFields | |
44.44% |
8 / 18 |
|
0.00% |
0 / 1 |
9.29 | |||
getCasTokenFromRow | |
80.00% |
4 / 5 |
|
0.00% |
0 / 1 |
2.03 | |||
garbageCollect | |
100.00% |
14 / 14 |
|
100.00% |
1 / 1 |
2 | |||
expireAll | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
deleteObjectsExpiringBefore | |
0.00% |
0 / 34 |
|
0.00% |
0 / 1 |
90 | |||
deleteServerObjectsExpiringBefore | |
75.51% |
37 / 49 |
|
0.00% |
0 / 1 |
12.78 | |||
deleteAll | |
0.00% |
0 / 13 |
|
0.00% |
0 / 1 |
20 | |||
doLock | |
53.85% |
7 / 13 |
|
0.00% |
0 / 1 |
3.88 | |||
doUnlock | |
70.00% |
7 / 10 |
|
0.00% |
0 / 1 |
3.24 | |||
makeKeyInternal | |
100.00% |
17 / 17 |
|
100.00% |
1 / 1 |
5 | |||
requireConvertGenericKey | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
serialize | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
3 | |||
unserialize | |
90.91% |
10 / 11 |
|
0.00% |
0 / 1 |
5.02 | |||
getLoadBalancer | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 | |||
getConnectionViaLoadBalancer | |
45.45% |
5 / 11 |
|
0.00% |
0 / 1 |
2.65 | |||
getConnectionFromServerInfo | |
100.00% |
10 / 10 |
|
100.00% |
1 / 1 |
3 | |||
handleDBError | |
0.00% |
0 / 18 |
|
0.00% |
0 / 1 |
42 | |||
initSqliteDatabase | |
84.21% |
16 / 19 |
|
0.00% |
0 / 1 |
3.04 | |||
createTables | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
20 | |||
getShardServerIndexes | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
silenceTransactionProfiler | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 |
1 | <?php |
2 | /** |
3 | * Object caching using a SQL database. |
4 | * |
5 | * This program is free software; you can redistribute it and/or modify |
6 | * it under the terms of the GNU General Public License as published by |
7 | * the Free Software Foundation; either version 2 of the License, or |
8 | * (at your option) any later version. |
9 | * |
10 | * This program is distributed in the hope that it will be useful, |
11 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | * GNU General Public License for more details. |
14 | * |
15 | * You should have received a copy of the GNU General Public License along |
16 | * with this program; if not, write to the Free Software Foundation, Inc., |
17 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
18 | * http://www.gnu.org/copyleft/gpl.html |
19 | * |
20 | * @file |
21 | * @ingroup Cache |
22 | */ |
23 | |
24 | use MediaWiki\MediaWikiServices; |
25 | use Wikimedia\AtEase\AtEase; |
26 | use Wikimedia\ObjectCache\MediumSpecificBagOStuff; |
27 | use Wikimedia\Rdbms\Blob; |
28 | use Wikimedia\Rdbms\Database; |
29 | use Wikimedia\Rdbms\DBConnectionError; |
30 | use Wikimedia\Rdbms\DBError; |
31 | use Wikimedia\Rdbms\DBQueryError; |
32 | use Wikimedia\Rdbms\IDatabase; |
33 | use Wikimedia\Rdbms\ILoadBalancer; |
34 | use Wikimedia\Rdbms\IMaintainableDatabase; |
35 | use Wikimedia\Rdbms\RawSQLValue; |
36 | use Wikimedia\Rdbms\SelectQueryBuilder; |
37 | use Wikimedia\Rdbms\ServerInfo; |
38 | use Wikimedia\ScopedCallback; |
39 | use Wikimedia\Timestamp\ConvertibleTimestamp; |
40 | |
41 | /** |
42 | * RDBMS-based caching module |
43 | * |
44 | * The following database sharding schemes are supported: |
45 | * - None; all keys map to the same shard |
46 | * - Hash; keys map to shards via consistent hashing |
47 | * |
48 | * The following database replication topologies are supported: |
49 | * - A primary database server for each shard, all within one datacenter |
50 | * - A co-primary database server for each shard within each datacenter |
51 | * |
52 | * @ingroup Cache |
53 | */ |
54 | class SqlBagOStuff extends MediumSpecificBagOStuff { |
55 | /** @var callable|null Injected function which returns a LoadBalancer */ |
56 | protected $loadBalancerCallback; |
57 | /** @var ILoadBalancer|null */ |
58 | protected $loadBalancer; |
59 | /** @var string|false|null DB name used for keys using the LoadBalancer */ |
60 | protected $dbDomain; |
61 | /** @var bool Whether to use the LoadBalancer */ |
62 | protected $useLB = false; |
63 | |
64 | /** @var array[] (server index => server config) */ |
65 | protected $serverInfos = []; |
66 | /** @var string[] (server index => tag/host name) */ |
67 | protected $serverTags = []; |
68 | /** @var float UNIX timestamp */ |
69 | protected $lastGarbageCollect = 0; |
70 | /** @var int Average number of writes required to trigger garbage collection */ |
71 | protected $purgePeriod = 10; |
72 | /** @var int Max expired rows to purge during randomized garbage collection */ |
73 | protected $purgeLimit = 100; |
74 | /** @var int Number of table shards to use on each server */ |
75 | protected $numTableShards = 1; |
76 | /** @var int */ |
77 | protected $writeBatchSize = 100; |
78 | /** @var string */ |
79 | protected $tableName = 'objectcache'; |
80 | /** @var bool Whether multi-primary mode is enabled */ |
81 | protected $multiPrimaryMode; |
82 | |
83 | /** @var IMaintainableDatabase[] Map of (shard index => DB handle) */ |
84 | protected $conns; |
85 | /** @var float[] Map of (shard index => UNIX timestamps) */ |
86 | protected $connFailureTimes = []; |
87 | /** @var Exception[] Map of (shard index => Exception) */ |
88 | protected $connFailureErrors = []; |
89 | |
90 | /** @var bool Whether zlib methods are available to PHP */ |
91 | private $hasZlib; |
92 | |
93 | /** @var int Number of redundant read and writes for better consistency */ |
94 | private $dataRedundancy; |
95 | |
96 | /** A number of seconds well above any expected clock skew */ |
97 | private const SAFE_CLOCK_BOUND_SEC = 15; |
98 | /** A number of seconds well above any expected clock skew and replication lag */ |
99 | private const SAFE_PURGE_DELAY_SEC = 3600; |
100 | /** Distinct string for tombstones stored in the "serialized" value column */ |
101 | private const TOMB_SERIAL = ''; |
102 | /** Relative seconds-to-live to use for tombstones */ |
103 | private const TOMB_EXPTIME = -self::SAFE_CLOCK_BOUND_SEC; |
104 | /** How many seconds must pass before triggering a garbage collection */ |
105 | private const GC_DELAY_SEC = 1; |
106 | |
107 | private const BLOB_VALUE = 0; |
108 | private const BLOB_EXPIRY = 1; |
109 | private const BLOB_CASTOKEN = 2; |
110 | |
111 | /** |
112 | * Placeholder timestamp to use for TTL_INDEFINITE that can be stored in all RDBMs types. |
113 | * We use BINARY(14) for MySQL, BLOB for Sqlite, and TIMESTAMPZ for Postgres (which goes |
114 | * up to 294276 AD). The last second of the year 9999 can be stored in all these cases. |
115 | * https://www.postgresql.org/docs/9.0/datatype-datetime.html |
116 | */ |
117 | private const INF_TIMESTAMP_PLACEHOLDER = '99991231235959'; |
118 | |
/**
 * Create a new backend instance from parameters injected by ObjectCache::newFromParams()
 *
 * The database servers must be provided by *either* the "server" parameter, the "servers"
 * parameter or the "loadBalancerCallback" parameter.
 *
 * The parameters are as described at {@link \MediaWiki\MainConfigSchema::ObjectCaches}
 * except that:
 *
 *   - the configured "cluster" and main LB fallback modes are implemented by
 *     the wiring by passing "loadBalancerCallback".
 *   - "dbDomain" is required if "loadBalancerCallback" is set, whereas in
 *     config it may be absent.
 *
 * @internal
 *
 * @param array $params
 *   - server: a single server config (used when "servers" is absent)
 *   - servers: map of (tag => server config); non-string keys are tagged as "#<index>"
 *   - loadBalancerCallback: A closure which provides a LoadBalancer object
 *   - dbDomain: string|false (required with loadBalancerCallback)
 *   - multiPrimaryMode: bool
 *   - purgePeriod: int|float (truncated to int)
 *   - purgeLimit: int
 *   - tableName: string
 *   - shards: int Number of table partitions per server
 *   - writeBatchSize: int
 *   - dataRedundancy: int Number of servers each key is read from and written to;
 *     clamped to at least 1 and at most the number of configured servers (default 1)
 * @throws InvalidArgumentException If no server source is given, or dbDomain is missing
 */
public function __construct( $params ) {
	parent::__construct( $params );

	if ( isset( $params['servers'] ) || isset( $params['server'] ) ) {
		// Configuration uses a direct list of servers.
		// Object data is horizontally partitioned via key hash.
		$index = 0;
		foreach ( ( $params['servers'] ?? [ $params['server'] ] ) as $tag => $info ) {
			$this->serverInfos[$index] = $info;
			// Allow integer-indexes arrays for b/c
			$this->serverTags[$index] = is_string( $tag ) ? $tag : "#$index";
			++$index;
		}
	} elseif ( isset( $params['loadBalancerCallback'] ) ) {
		$this->loadBalancerCallback = $params['loadBalancerCallback'];
		if ( !isset( $params['dbDomain'] ) ) {
			throw new InvalidArgumentException(
				__METHOD__ . ": 'dbDomain' is required if 'loadBalancerCallback' is given"
			);
		}
		$this->dbDomain = $params['dbDomain'];
		$this->useLB = true;
	} else {
		throw new InvalidArgumentException(
			__METHOD__ . " requires 'server', 'servers', or 'loadBalancerCallback'"
		);
	}

	$this->purgePeriod = intval( $params['purgePeriod'] ?? $this->purgePeriod );
	$this->purgeLimit = intval( $params['purgeLimit'] ?? $this->purgeLimit );
	$this->tableName = $params['tableName'] ?? $this->tableName;
	$this->numTableShards = intval( $params['shards'] ?? $this->numTableShards );
	$this->writeBatchSize = intval( $params['writeBatchSize'] ?? $this->writeBatchSize );
	$this->multiPrimaryMode = $params['multiPrimaryMode'] ?? false;
	// Every key must live on at least one server, and cannot be replicated to more servers
	// than exist. Without the lower bound, 0 would map keys to no shard at all and a negative
	// value would make array_slice() in getShardIndexesForKey() pick an arbitrary subset.
	// With no server tags (LoadBalancer mode) the upper bound still yields 0, as before.
	$this->dataRedundancy = min(
		max( 1, intval( $params['dataRedundancy'] ?? 1 ) ),
		count( $this->serverTags )
	);

	$this->attrMap[self::ATTR_DURABILITY] = self::QOS_DURABILITY_RDBMS;

	$this->hasZlib = extension_loaded( 'zlib' );
}
187 | |
/**
 * Read one key, optionally returning its CAS token through $casToken.
 *
 * The token is only requested when the caller passes self::PASS_BY_REF, and is only
 * handed back when the stored value unserializes to something other than false.
 */
protected function doGet( $key, $flags = 0, &$casToken = null ) {
	$wantToken = ( $casToken === self::PASS_BY_REF );
	$casToken = null;

	$blob = $this->fetchBlobs( [ $key ], $wantToken )[$key];
	if ( !$blob ) {
		// Miss: record a false size for the stats
		$this->updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, false ] ] );

		return false;
	}

	$serialValue = $blob[self::BLOB_VALUE];
	$value = $this->unserialize( $serialValue );
	if ( $wantToken && $value !== false ) {
		$casToken = $blob[self::BLOB_CASTOKEN];
	}
	$this->updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, strlen( $serialValue ) ] ] );

	return $value;
}
208 | |
/**
 * Unconditionally store a value, timestamped with the current time.
 */
protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
	$argsByKey = [ $key => [ $value, $exptime ] ];

	return $this->modifyBlobs(
		[ $this, 'modifyTableSpecificBlobsForSet' ],
		$this->getCurrentTime(),
		$argsByKey
	);
}
218 | |
/**
 * Delete (or tombstone, in multi-primary mode) a single key.
 */
protected function doDelete( $key, $flags = 0 ) {
	$argsByKey = [ $key => [] ];

	return $this->modifyBlobs(
		[ $this, 'modifyTableSpecificBlobsForDelete' ],
		$this->getCurrentTime(),
		$argsByKey
	);
}
228 | |
/**
 * Store a value only if the key is not already live.
 *
 * $lockScope is filled by newLockingWriteSectionModificationTimestamp() and must stay in
 * scope until the write is done (presumably a ScopedCallback that releases the key lock
 * when destroyed — TODO confirm).
 */
protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
	$mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $lockScope );
	if ( $mtime === null ) {
		// Timeout or I/O error during lock acquisition
		return false;
	}

	$argsByKey = [ $key => [ $value, $exptime ] ];

	return $this->modifyBlobs( [ $this, 'modifyTableSpecificBlobsForAdd' ], $mtime, $argsByKey );
}
242 | |
/**
 * Store a value only if the key's current CAS token still matches $casToken.
 *
 * $lockScope is filled by newLockingWriteSectionModificationTimestamp() and must stay in
 * scope until the write is done (presumably it holds the key lock — TODO confirm).
 */
protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
	$mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $lockScope );
	if ( $mtime === null ) {
		// Timeout or I/O error during lock acquisition
		return false;
	}

	$argsByKey = [ $key => [ $value, $exptime, $casToken ] ];

	return $this->modifyBlobs( [ $this, 'modifyTableSpecificBlobsForCas' ], $mtime, $argsByKey );
}
256 | |
/**
 * Change the expiration of a single key.
 */
protected function doChangeTTL( $key, $exptime, $flags ) {
	$argsByKey = [ $key => [ $exptime ] ];

	return $this->modifyBlobs(
		[ $this, 'modifyTableSpecificBlobsForChangeTTL' ],
		$this->getCurrentTime(),
		$argsByKey
	);
}
266 | |
/**
 * Increment a key by $step, initializing it to $init if absent.
 *
 * With WRITE_BACKGROUND the async variant of the table callback is used.
 * Returns the per-key result recorded by the callback, or false if the write failed.
 */
protected function doIncrWithInit( $key, $exptime, $step, $init, $flags ) {
	$mtime = $this->getCurrentTime();

	$writer = ( $flags & self::WRITE_BACKGROUND )
		? [ $this, 'modifyTableSpecificBlobsForIncrInitAsync' ]
		: [ $this, 'modifyTableSpecificBlobsForIncrInit' ];

	$argsByKey = [ $key => [ $step, $init, $exptime ] ];
	if ( !$this->modifyBlobs( $writer, $mtime, $argsByKey, $resByKey ) ) {
		return false;
	}

	return $resByKey[$key];
}
285 | |
/**
 * Read several keys at once.
 *
 * Only keys whose stored value unserializes to something other than false appear in the
 * result; the size of every key (false for misses) is reported to the op stats.
 */
protected function doGetMulti( array $keys, $flags = 0 ) {
	$values = [];
	$sizeStats = [];

	$blobsByKey = $this->fetchBlobs( $keys );
	foreach ( $keys as $key ) {
		$blob = $blobsByKey[$key];
		if ( !$blob ) {
			$sizeStats[$key] = [ 0, false ];
			continue;
		}

		$serialValue = $blob[self::BLOB_VALUE];
		$value = $this->unserialize( $serialValue );
		if ( $value !== false ) {
			$values[$key] = $value;
		}
		$sizeStats[$key] = [ 0, strlen( $serialValue ) ];
	}

	$this->updateOpStats( self::METRIC_OP_GET, $sizeStats );

	return $values;
}
310 | |
/**
 * Unconditionally store several values that share one expiration.
 */
protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
	$mtime = $this->getCurrentTime();

	// Same keys as $data, each mapped to the (value, exptime) argument list
	$argsByKey = [];
	foreach ( $data as $key => $value ) {
		$argsByKey[$key] = [ $value, $exptime ];
	}

	return $this->modifyBlobs( [ $this, 'modifyTableSpecificBlobsForSet' ], $mtime, $argsByKey );
}
325 | |
/**
 * Delete (or tombstone, in multi-primary mode) several keys.
 */
protected function doDeleteMulti( array $keys, $flags = 0 ) {
	$argsByKey = array_fill_keys( $keys, [] );

	return $this->modifyBlobs(
		[ $this, 'modifyTableSpecificBlobsForDelete' ],
		$this->getCurrentTime(),
		$argsByKey
	);
}
335 | |
/**
 * Change the expiration of several keys to the same value.
 */
public function doChangeTTLMulti( array $keys, $exptime, $flags = 0 ) {
	$argsByKey = array_fill_keys( $keys, [ $exptime ] );

	return $this->modifyBlobs(
		[ $this, 'modifyTableSpecificBlobsForChangeTTL' ],
		$this->getCurrentTime(),
		$argsByKey
	);
}
345 | |
/**
 * Get a database handle for a shard
 *
 * In LoadBalancer mode the shard index is ignored. Otherwise, a shard whose last recorded
 * connection failure is less than 60 seconds old is not retried; the stored error is
 * rethrown instead, so a down server does not cause repeated connection timeouts.
 *
 * @param int $shardIndex Server index
 * @return IMaintainableDatabase
 * @throws DBConnectionError
 * @throws UnexpectedValueException If no server is configured at $shardIndex
 */
private function getConnection( $shardIndex ) {
	if ( $this->useLB ) {
		return $this->getConnectionViaLoadBalancer();
	}

	$failedRecently = isset( $this->connFailureErrors[$shardIndex] )
		&& ( $this->getCurrentTime() - $this->connFailureTimes[$shardIndex] ) < 60;
	if ( $failedRecently ) {
		throw $this->connFailureErrors[$shardIndex];
	}

	if ( !isset( $this->serverInfos[$shardIndex] ) ) {
		throw new UnexpectedValueException( "Invalid server index #$shardIndex" );
	}

	return $this->getConnectionFromServerInfo( $shardIndex, $this->serverInfos[$shardIndex] );
}
376 | |
/**
 * Get the shard indexes to read from and write to for a key
 *
 * With a LoadBalancer or a single server, this is always shard 0. Otherwise, servers are
 * ordered by consistent hashing of the key, where only the part before the first "|#|" is
 * hashed so that sister keys land on the same shard (same hash stop as mc-router). Then:
 *   - with a redundancy of 1, the first server is returned, or the second when $fallback is set;
 *   - otherwise, the first $this->dataRedundancy servers are returned.
 *
 * @param string $key
 * @param bool $fallback Whether to return the fallback server instead of the primary one
 * @return int[] list of shard indexes
 */
private function getShardIndexesForKey( $key, $fallback = false ) {
	if ( $this->useLB || count( $this->serverTags ) == 1 ) {
		return [ 0 ];
	}

	$hashKey = str_contains( $key, '|#|' ) ? explode( '|#|', $key )[0] : $key;

	$ranked = $this->serverTags;
	ArrayUtils::consistentHashSort( $ranked, $hashKey );
	$rankedIndexes = array_keys( $ranked );

	if ( $this->dataRedundancy !== 1 ) {
		return array_slice( $rankedIndexes, 0, $this->dataRedundancy );
	}

	return [ $rankedIndexes[$fallback ? 1 : 0] ];
}
414 | |
/**
 * Get the partition table name that holds a key
 *
 * Only the part of the key before the first "|#|" is hashed, so that sister keys share
 * a table (same hash stop as mc-router). With a single table, the base name is used.
 *
 * @param string $key
 * @return string table name
 */
private function getTableNameForKey( $key ) {
	if ( $this->numTableShards <= 1 ) {
		return $this->getTableNameByShard( null );
	}

	$hashKey = str_contains( $key, '|#|' ) ? explode( '|#|', $key )[0] : $key;
	// First 32 bits of the MD5, masked to a non-negative 31-bit integer
	$hash = hexdec( substr( md5( $hashKey ), 0, 8 ) ) & 0x7fffffff;

	return $this->getTableNameByShard( $hash % $this->numTableShards );
}
436 | |
/**
 * Get the table name for a partition index
 *
 * When partitioning is in use, the index is zero-padded to the number of digits of the
 * largest index (e.g. "objectcache07" for 10+ partitions).
 *
 * @param int|null $index Partition index, or null for the unpartitioned table
 * @return string
 */
private function getTableNameByShard( $index ) {
	if ( $index === null || $this->numTableShards <= 1 ) {
		return $this->tableName;
	}

	$width = strlen( (string)( $this->numTableShards - 1 ) );

	return $this->tableName . sprintf( "%0{$width}d", $index );
}
451 | |
/**
 * Read the live rows for a list of keys from their shard(s) and partition table(s)
 *
 * If querying a shard throws a DBError, its keys are retried once against their fallback
 * shard (see getShardIndexesForKey()); errors during that retry go to handleDBError().
 * When a key comes back from several shards (data redundancy), the row with the greatest
 * expiry is used.
 *
 * @param string[] $keys
 * @param bool $getCasToken Whether to get a CAS token
 * @param bool $fallback Whether this call is the retry against the fallback shard
 * @return array<string,array|null> Map of (key => (value,expiry,token) or null); an empty
 *  array when a fallback read is requested while data redundancy is enabled
 */
private function fetchBlobs( array $keys, bool $getCasToken = false, $fallback = false ) {
	if ( $fallback && count( $this->serverTags ) > 1 && $this->dataRedundancy > 1 ) {
		// fallback doesn't work with data redundancy
		return [];
	}

	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	// Initialize order-preserved per-key results; set values for live keys below
	$dataByKey = array_fill_keys( $keys, null );
	$dataByKeyAndShard = [];

	$readTime = (int)$this->getCurrentTime();
	$keysByTableByShard = [];
	foreach ( $keys as $key ) {
		$partitionTable = $this->getTableNameForKey( $key );
		foreach ( $this->getShardIndexesForKey( $key, $fallback ) as $shardIndex ) {
			$keysByTableByShard[$shardIndex][$partitionTable][] = $key;
		}
	}

	$fallbackResult = [];
	foreach ( $keysByTableByShard as $shardIndex => $serverKeys ) {
		try {
			$db = $this->getConnection( $shardIndex );
			foreach ( $serverKeys as $partitionTable => $tableKeys ) {
				$res = $db->newSelectQueryBuilder()
					->select(
						$getCasToken
							? $this->addCasTokenFields( $db, [ 'keyname', 'value', 'exptime' ] )
							: [ 'keyname', 'value', 'exptime' ] )
					->from( $partitionTable )
					->where( $this->buildExistenceConditions( $db, $tableKeys, $readTime ) )
					->caller( __METHOD__ )
					->fetchResultSet();
				foreach ( $res as $row ) {
					$row->shardIndex = $shardIndex;
					$row->tableName = $partitionTable;
					$dataByKeyAndShard[$row->keyname][$shardIndex] = $row;
				}
			}
		} catch ( DBError $e ) {
			if ( $fallback ) {
				$this->handleDBError( $e, $shardIndex );
			} else {
				// Retry every key of the failed shard against its fallback shard
				$fallbackResult += $this->fetchBlobs(
					array_merge( ...array_values( $serverKeys ) ),
					$getCasToken,
					true
				);
			}
		}
	}

	foreach ( $keys as $key ) {
		$rows = array_values( $dataByKeyAndShard[$key] ?? [] );
		if ( !$rows ) {
			continue;
		}

		if ( count( $rows ) > 1 ) {
			// Several shards returned the key; prefer the copy that expires last
			usort( $rows, static function ( $a, $b ) {
				return $b->exptime <=> $a->exptime;
			} );
		}
		$row = $rows[0];

		$this->debug( __METHOD__ . ": retrieved $key; expiry time is {$row->exptime}" );
		try {
			$db = $this->getConnection( $row->shardIndex );
			$dataByKey[$key] = [
				self::BLOB_VALUE => $this->dbDecodeSerialValue( $db, $row->value ),
				self::BLOB_EXPIRY => $this->decodeDbExpiry( $db, $row->exptime ),
				self::BLOB_CASTOKEN => $getCasToken
					? $this->getCasTokenFromRow( $db, $row )
					: null
			];
		} catch ( DBQueryError $e ) {
			$this->handleDBError( $e, $row->shardIndex );
		}
	}

	// array_replace() rather than array_merge(): PHP stores numeric-string keys such as "123"
	// as integers, and array_merge() would renumber them, breaking callers' $result[$key].
	return array_replace( $dataByKey, $fallbackResult );
}
550 | |
/**
 * Apply a table-specific write callback to keys grouped by shard and partition table
 *
 * If writing to a shard throws a DBError, its keys are retried once against their fallback
 * shard (see getShardIndexesForKey()); errors during that retry go to handleDBError().
 * Afterwards, each shard that was connected to may be randomly garbage collected.
 *
 * @param callable $tableWriteCallback Callback that takes the following arguments:
 *   - IDatabase instance
 *   - Partition table name string
 *   - UNIX modification timestamp
 *   - Map of (key => list of arguments) for keys belonging to the server/table partition
 *   - Map of (key => result) [returned]
 * @param float $mtime UNIX modification timestamp
 * @param array<string,array> $argsByKey Map of (key => list of arguments)
 * @param array<string,mixed> &$resByKey Map of (key => result) [returned]; prefilled with
 *  false for every key in $argsByKey
 * @param bool $fallback Whether this call is the retry against the fallback shard
 * @return bool Whether all keys were processed
 * @param-taint $argsByKey none
 */
private function modifyBlobs(
	callable $tableWriteCallback,
	float $mtime,
	array $argsByKey,
	&$resByKey = [],
	$fallback = false
) {
	if ( $fallback && count( $this->serverTags ) > 1 && $this->dataRedundancy > 1 ) {
		// fallback doesn't work with data redundancy
		return false;
	}
	// Initialize order-preserved per-key results; callbacks mark successful results
	$resByKey = array_fill_keys( array_keys( $argsByKey ), false );

	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	$argsByKeyByTableByShard = [];
	foreach ( $argsByKey as $key => $args ) {
		$partitionTable = $this->getTableNameForKey( $key );
		foreach ( $this->getShardIndexesForKey( $key, $fallback ) as $shardIndex ) {
			$argsByKeyByTableByShard[$shardIndex][$partitionTable][$key] = $args;
		}
	}

	$shardIndexesAffected = [];
	foreach ( $argsByKeyByTableByShard as $shardIndex => $argsByKeyByTables ) {
		foreach ( $argsByKeyByTables as $table => $ptKeyArgs ) {
			try {
				$db = $this->getConnection( $shardIndex );
				$shardIndexesAffected[] = $shardIndex;
				$tableWriteCallback( $db, $table, $mtime, $ptKeyArgs, $resByKey );
			} catch ( DBError $e ) {
				if ( $fallback ) {
					$this->handleDBError( $e, $shardIndex );
				} else {
					// The recursive call re-initializes its result map to just $ptKeyArgs.
					// Give it a separate map and merge back only those keys, so results
					// already recorded for other keys (including failures) are kept.
					$fallbackResByKey = [];
					$this->modifyBlobs( $tableWriteCallback, $mtime, $ptKeyArgs, $fallbackResByKey, true );
					$resByKey = array_replace( $resByKey, $fallbackResByKey );
				}
			}
		}
	}

	$success = !in_array( false, $resByKey, true );

	// A shard holding several partition tables appears once per table; collect it only once
	foreach ( array_unique( $shardIndexesAffected ) as $shardIndex ) {
		try {
			if (
				// Random purging is enabled
				$this->purgePeriod >= 1 &&
				// Only purge on one in every $this->purgePeriod writes
				mt_rand( 1, $this->purgePeriod ) == 1 &&
				// Avoid repeating the delete within a few seconds
				( $this->getCurrentTime() - $this->lastGarbageCollect ) > self::GC_DELAY_SEC
			) {
				$this->garbageCollect( $shardIndex );
			}
		} catch ( DBError $e ) {
			$this->handleDBError( $e, $shardIndex );
		}
	}

	return $success;
}
630 | |
/**
 * Upsert key/value pairs that all live in one partition table of one server
 *
 * Every row gets the modification token from makeTimestampedModificationToken().
 * In multi-primary mode the rows are written with an insert + onDuplicateKeyUpdate(), using the
 * SET clause from buildMultiUpsertSetForOverwrite(). Presumably that clause is what keeps
 * an older write from overwriting a row with a newer token — TODO confirm. Otherwise the
 * rows are written with REPLACE. Once the query returns, every key is marked as true.
 *
 * @param IDatabase $db Handle to the database server where the argument keys belong
 * @param string $ptable Name of the partition table where the argument keys belong
 * @param float $mtime UNIX modification timestamp
 * @param array<string,array> $argsByKey Non-empty (key => (value,exptime)) map
 * @param array<string,mixed> &$resByKey Map of (key => result) for successful writes [returned]
 * @throws DBError
 */
private function modifyTableSpecificBlobsForSet(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	$modToken = $this->makeTimestampedModificationToken( $mtime, $db );

	$upsertRows = [];
	$sizeStats = [];
	foreach ( $argsByKey as $key => $args ) {
		[ $value, $exptime ] = $args;
		$expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
		$serialValue = $this->getSerialized( $value, $key );
		$upsertRows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $modToken );
		$sizeStats[$key] = [ strlen( $serialValue ), 0 ];
	}

	if ( $this->multiPrimaryMode ) {
		$writeQuery = $db->newInsertQueryBuilder()
			->insertInto( $ptable )
			->rows( $upsertRows )
			->onDuplicateKeyUpdate()
			->uniqueIndexFields( [ 'keyname' ] )
			->set( $this->buildMultiUpsertSetForOverwrite( $db, $modToken ) );
	} else {
		// T288998: use REPLACE, if possible, to avoid cluttering the binlogs
		$writeQuery = $db->newReplaceQueryBuilder()
			->replaceInto( $ptable )
			->rows( $upsertRows )
			->uniqueIndexFields( [ 'keyname' ] );
	}
	$writeQuery->caller( __METHOD__ )->execute();

	foreach ( array_keys( $argsByKey ) as $key ) {
		$resByKey[$key] = true;
	}

	$this->updateOpStats( self::METRIC_OP_SET, $sizeStats );
}
689 | |
/**
 * Remove key/value pairs that all live in one partition table of one server
 *
 * With a single primary, the rows are simply deleted. In multi-primary mode the keys are
 * instead upserted as tombstones so deletions respect eventual consistency: each row gets
 * an empty (TOMB_SERIAL) value, a new modification token, and an expiry computed from
 * TOMB_EXPTIME, which is negative. The upsert uses the SET clause from
 * buildMultiUpsertSetForOverwrite() (presumably guarding against overwriting newer rows —
 * TODO confirm). Once the query returns, every key is marked as true.
 *
 * @param IDatabase $db Handle to the database server where the argument keys belong
 * @param string $ptable Name of the partition table where the argument keys belong
 * @param float $mtime UNIX modification timestamp
 * @param array<string,array> $argsByKey Non-empty (key => []) map
 * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
 * @throws DBError
 */
private function modifyTableSpecificBlobsForDelete(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	$keys = array_keys( $argsByKey );

	if ( !$this->multiPrimaryMode ) {
		// Just purge the keys since there is only one primary (e.g. "source of truth")
		$db->newDeleteQueryBuilder()
			->deleteFrom( $ptable )
			->where( [ 'keyname' => $keys ] )
			->caller( __METHOD__ )->execute();
	} else {
		// Tombstone keys in order to respect eventual consistency
		$modToken = $this->makeTimestampedModificationToken( $mtime, $db );
		$tombExpiry = $this->makeNewKeyExpiry( self::TOMB_EXPTIME, (int)$mtime );
		$tombQuery = $db->newInsertQueryBuilder()
			->insertInto( $ptable )
			->onDuplicateKeyUpdate()
			->uniqueIndexFields( [ 'keyname' ] )
			->set( $this->buildMultiUpsertSetForOverwrite( $db, $modToken ) );
		foreach ( $keys as $key ) {
			$tombQuery->row( $this->buildUpsertRow( $db, $key, self::TOMB_SERIAL, $tombExpiry, $modToken ) );
		}
		$tombQuery->caller( __METHOD__ )->execute();
	}

	foreach ( $keys as $key ) {
		$resByKey[$key] = true;
	}

	$this->updateOpStats( self::METRIC_OP_DELETE, $keys );
}
740 | |
741 | /** |
742 | * Insert key/value pairs belonging to a partition table on the given server |
743 | * |
744 | * If the current row for a key exists and has an integral UNIX timestamp of expiration |
745 | * greater than that of the provided modification timestamp, then the write to that key |
746 | * will be aborted with a "false" result. Acquisition of advisory key locks must be handled |
747 | * by calling functions. |
748 | * |
749 | * In multi-primary mode, if the current row for a key exists and has a modification token |
750 | * with a greater integral UNIX timestamp than that of the provided modification timestamp, |
751 | * then the write to that key will be aborted with a "false" result. Successfully modified |
752 | * key rows will be assigned a new modification token/timestamp. |
753 | * |
754 | * @param IDatabase $db Handle to the database server where the argument keys belong |
755 | * @param string $ptable Name of the partition table where the argument keys belong |
756 | * @param float $mtime UNIX modification timestamp |
757 | * @param array<string,array> $argsByKey Non-empty (key => (value,exptime)) map |
758 | * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned] |
759 | * @throws DBError |
760 | */ |
761 | private function modifyTableSpecificBlobsForAdd( |
762 | IDatabase $db, |
763 | string $ptable, |
764 | float $mtime, |
765 | array $argsByKey, |
766 | array &$resByKey |
767 | ) { |
768 | $valueSizesByKey = []; |
769 | |
770 | $mt = $this->makeTimestampedModificationToken( $mtime, $db ); |
771 | |
772 | // This check must happen outside the write query to respect eventual consistency |
773 | $existingKeys = $db->newSelectQueryBuilder() |
774 | ->select( 'keyname' ) |
775 | ->from( $ptable ) |
776 | ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) ) |
777 | ->caller( __METHOD__ ) |
778 | ->fetchFieldValues(); |
779 | $existingByKey = array_fill_keys( $existingKeys, true ); |
780 | |
781 | $rows = []; |
782 | foreach ( $argsByKey as $key => [ $value, $exptime ] ) { |
783 | if ( isset( $existingByKey[$key] ) ) { |
784 | $this->logger->debug( __METHOD__ . ": $key already exists" ); |
785 | continue; |
786 | } |
787 | |
788 | $serialValue = $this->getSerialized( $value, $key ); |
789 | $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime ); |
790 | $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ]; |
791 | $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ); |
792 | } |
793 | if ( !$rows ) { |
794 | return; |
795 | } |
796 | $db->newInsertQueryBuilder() |
797 | ->insertInto( $ptable ) |
798 | ->rows( $rows ) |
799 | ->onDuplicateKeyUpdate() |
800 | ->uniqueIndexFields( [ 'keyname' ] ) |
801 | ->set( $this->buildMultiUpsertSetForOverwrite( $db, $mt ) ) |
802 | ->caller( __METHOD__ )->execute(); |
803 | |
804 | foreach ( $argsByKey as $key => $unused ) { |
805 | $resByKey[$key] = !isset( $existingByKey[$key] ); |
806 | } |
807 | |
808 | $this->updateOpStats( self::METRIC_OP_ADD, $valueSizesByKey ); |
809 | } |
810 | |
811 | /** |
812 | * Insert key/value pairs belonging to a partition table on the given server |
813 | * |
814 | * If the current row for a key exists, has an integral UNIX timestamp of expiration greater |
815 | * than that of the provided modification timestamp, and the CAS token does not match, then |
816 | * the write to that key will be aborted with a "false" result. Acquisition of advisory key |
817 | * locks must be handled by calling functions. |
818 | * |
819 | * In multi-primary mode, if the current row for a key exists and has a modification token |
820 | * with a greater integral UNIX timestamp than that of the provided modification timestamp, |
821 | * then the write to that key will be aborted with a "false" result. Successfully modified |
822 | * key rows will be assigned a new modification token/timestamp. |
823 | * |
824 | * @param IDatabase $db Handle to the database server where the argument keys belong |
825 | * @param string $ptable Name of the partition table where the argument keys belong |
826 | * @param float $mtime UNIX modification timestamp |
827 | * @param array<string,array> $argsByKey Non-empty (key => (value, exptime, CAS token)) map |
828 | * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned] |
829 | * @throws DBError |
830 | */ |
831 | private function modifyTableSpecificBlobsForCas( |
832 | IDatabase $db, |
833 | string $ptable, |
834 | float $mtime, |
835 | array $argsByKey, |
836 | array &$resByKey |
837 | ) { |
838 | $valueSizesByKey = []; |
839 | |
840 | $mt = $this->makeTimestampedModificationToken( $mtime, $db ); |
841 | |
842 | // This check must happen outside the write query to respect eventual consistency |
843 | $res = $db->newSelectQueryBuilder() |
844 | ->select( $this->addCasTokenFields( $db, [ 'keyname' ] ) ) |
845 | ->from( $ptable ) |
846 | ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) ) |
847 | ->caller( __METHOD__ ) |
848 | ->fetchResultSet(); |
849 | |
850 | $curTokensByKey = []; |
851 | foreach ( $res as $row ) { |
852 | $curTokensByKey[$row->keyname] = $this->getCasTokenFromRow( $db, $row ); |
853 | } |
854 | |
855 | $nonMatchingByKey = []; |
856 | $rows = []; |
857 | foreach ( $argsByKey as $key => [ $value, $exptime, $casToken ] ) { |
858 | $curToken = $curTokensByKey[$key] ?? null; |
859 | if ( $curToken === null ) { |
860 | $nonMatchingByKey[$key] = true; |
861 | $this->logger->debug( __METHOD__ . ": $key does not exists" ); |
862 | continue; |
863 | } |
864 | |
865 | if ( $curToken !== $casToken ) { |
866 | $nonMatchingByKey[$key] = true; |
867 | $this->logger->debug( __METHOD__ . ": $key does not have a matching token" ); |
868 | continue; |
869 | } |
870 | |
871 | $serialValue = $this->getSerialized( $value, $key ); |
872 | $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime ); |
873 | $valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ]; |
874 | |
875 | $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ); |
876 | } |
877 | if ( !$rows ) { |
878 | return; |
879 | } |
880 | $db->newInsertQueryBuilder() |
881 | ->insertInto( $ptable ) |
882 | ->rows( $rows ) |
883 | ->onDuplicateKeyUpdate() |
884 | ->uniqueIndexFields( [ 'keyname' ] ) |
885 | ->set( $this->buildMultiUpsertSetForOverwrite( $db, $mt ) ) |
886 | ->caller( __METHOD__ )->execute(); |
887 | |
888 | foreach ( $argsByKey as $key => $unused ) { |
889 | $resByKey[$key] = !isset( $nonMatchingByKey[$key] ); |
890 | } |
891 | |
892 | $this->updateOpStats( self::METRIC_OP_CAS, $valueSizesByKey ); |
893 | } |
894 | |
895 | /** |
896 | * Update the TTL for keys belonging to a partition table on the given server |
897 | * |
898 | * If no current row for a key exists or the current row has an integral UNIX timestamp of |
899 | * expiration less than that of the provided modification timestamp, then the write to that |
900 | * key will be aborted with a "false" result. |
901 | * |
902 | * In multi-primary mode, if the current row for a key exists and has a modification token |
903 | * with a greater integral UNIX timestamp than that of the provided modification timestamp, |
904 | * then the write to that key will be aborted with a "false" result. Successfully modified |
905 | * key rows will be assigned a new modification token/timestamp. |
906 | * |
907 | * @param IDatabase $db Handle to the database server where the argument keys belong |
908 | * @param string $ptable Name of the partition table where the argument keys belong |
909 | * @param float $mtime UNIX modification timestamp |
910 | * @param array<string,array> $argsByKey Non-empty (key => (exptime)) map |
911 | * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned] |
912 | * @throws DBError |
913 | */ |
914 | private function modifyTableSpecificBlobsForChangeTTL( |
915 | IDatabase $db, |
916 | string $ptable, |
917 | float $mtime, |
918 | array $argsByKey, |
919 | array &$resByKey |
920 | ) { |
921 | if ( $this->multiPrimaryMode ) { |
922 | $mt = $this->makeTimestampedModificationToken( $mtime, $db ); |
923 | |
924 | $res = $db->newSelectQueryBuilder() |
925 | ->select( [ 'keyname', 'value' ] ) |
926 | ->from( $ptable ) |
927 | ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) ) |
928 | ->caller( __METHOD__ ) |
929 | ->fetchResultSet(); |
930 | |
931 | $rows = []; |
932 | $existingKeys = []; |
933 | foreach ( $res as $curRow ) { |
934 | $key = $curRow->keyname; |
935 | $existingKeys[$key] = true; |
936 | $serialValue = $this->dbDecodeSerialValue( $db, $curRow->value ); |
937 | [ $exptime ] = $argsByKey[$key]; |
938 | $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime ); |
939 | $rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ); |
940 | } |
941 | if ( !$rows ) { |
942 | return; |
943 | } |
944 | $db->newInsertQueryBuilder() |
945 | ->insertInto( $ptable ) |
946 | ->rows( $rows ) |
947 | ->onDuplicateKeyUpdate() |
948 | ->uniqueIndexFields( [ 'keyname' ] ) |
949 | ->set( $this->buildMultiUpsertSetForOverwrite( $db, $mt ) ) |
950 | ->caller( __METHOD__ )->execute(); |
951 | |
952 | foreach ( $argsByKey as $key => $unused ) { |
953 | $resByKey[$key] = isset( $existingKeys[$key] ); |
954 | } |
955 | } else { |
956 | $keysBatchesByExpiry = []; |
957 | foreach ( $argsByKey as $key => [ $exptime ] ) { |
958 | $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime ); |
959 | $keysBatchesByExpiry[$expiry][] = $key; |
960 | } |
961 | |
962 | $existingCount = 0; |
963 | foreach ( $keysBatchesByExpiry as $expiry => $keyBatch ) { |
964 | $db->newUpdateQueryBuilder() |
965 | ->update( $ptable ) |
966 | ->set( [ 'exptime' => $this->encodeDbExpiry( $db, $expiry ) ] ) |
967 | ->where( $this->buildExistenceConditions( $db, $keyBatch, (int)$mtime ) ) |
968 | ->caller( __METHOD__ )->execute(); |
969 | $existingCount += $db->affectedRows(); |
970 | } |
971 | if ( $existingCount === count( $argsByKey ) ) { |
972 | foreach ( $argsByKey as $key => $args ) { |
973 | $resByKey[$key] = true; |
974 | } |
975 | } |
976 | } |
977 | |
978 | $this->updateOpStats( self::METRIC_OP_CHANGE_TTL, array_keys( $argsByKey ) ); |
979 | } |
980 | |
981 | /** |
982 | * Either increment a counter key, if it exists, or initialize it, otherwise |
983 | * |
984 | * If no current row for a key exists or the current row has an integral UNIX timestamp of |
985 | * expiration less than that of the provided modification timestamp, then the key row will |
986 | * be set to the initial value. Otherwise, the current row will be incremented. |
987 | * |
988 | * In multi-primary mode, if the current row for a key exists and has a modification token |
989 | * with a greater integral UNIX timestamp than that of the provided modification timestamp, |
990 | * then the write to that key will be aborted with a "false" result. Successfully initialized |
991 | * key rows will be assigned a new modification token/timestamp. |
992 | * |
993 | * @param IDatabase $db Handle to the database server where the argument keys belong |
994 | * @param string $ptable Name of the partition table where the argument keys belong |
995 | * @param float $mtime UNIX modification timestamp |
996 | * @param array<string,array> $argsByKey Non-empty (key => (step, init, exptime) map |
997 | * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned] |
998 | * @throws DBError |
999 | */ |
1000 | private function modifyTableSpecificBlobsForIncrInit( |
1001 | IDatabase $db, |
1002 | string $ptable, |
1003 | float $mtime, |
1004 | array $argsByKey, |
1005 | array &$resByKey |
1006 | ) { |
1007 | foreach ( $argsByKey as $key => [ $step, $init, $exptime ] ) { |
1008 | $mt = $this->makeTimestampedModificationToken( $mtime, $db ); |
1009 | $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime ); |
1010 | |
1011 | // Use a transaction so that changes from other threads are not visible due to |
1012 | // "consistent reads". This way, the exact post-increment value can be returned. |
1013 | // The "live key exists" check can go inside the write query and remain safe for |
1014 | // replication since the TTL for such keys is either indefinite or very short. |
1015 | $atomic = $db->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE ); |
1016 | try { |
1017 | $db->newInsertQueryBuilder() |
1018 | ->insertInto( $ptable ) |
1019 | ->rows( $this->buildUpsertRow( $db, $key, $init, $expiry, $mt ) ) |
1020 | ->onDuplicateKeyUpdate() |
1021 | ->uniqueIndexFields( [ 'keyname' ] ) |
1022 | ->set( $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $mt, (int)$mtime ) ) |
1023 | ->caller( __METHOD__ )->execute(); |
1024 | $affectedCount = $db->affectedRows(); |
1025 | $row = $db->newSelectQueryBuilder() |
1026 | ->select( 'value' ) |
1027 | ->from( $ptable ) |
1028 | ->where( [ 'keyname' => $key ] ) |
1029 | ->caller( __METHOD__ ) |
1030 | ->fetchRow(); |
1031 | } catch ( Exception $e ) { |
1032 | $db->cancelAtomic( __METHOD__, $atomic ); |
1033 | throw $e; |
1034 | } |
1035 | $db->endAtomic( __METHOD__ ); |
1036 | |
1037 | if ( !$affectedCount || $row === false ) { |
1038 | $this->logger->warning( __METHOD__ . ": failed to set new $key value" ); |
1039 | continue; |
1040 | } |
1041 | |
1042 | $serialValue = $this->dbDecodeSerialValue( $db, $row->value ); |
1043 | if ( !$this->isInteger( $serialValue ) ) { |
1044 | $this->logger->warning( __METHOD__ . ": got non-integer $key value" ); |
1045 | continue; |
1046 | } |
1047 | |
1048 | $resByKey[$key] = (int)$serialValue; |
1049 | } |
1050 | |
1051 | $this->updateOpStats( self::METRIC_OP_INCR, array_keys( $argsByKey ) ); |
1052 | } |
1053 | |
1054 | /** |
1055 | * Same as modifyTableSpecificBlobsForIncrInit() but does not return the |
1056 | * new value. |
1057 | * |
1058 | * @param IDatabase $db |
1059 | * @param string $ptable |
1060 | * @param float $mtime |
1061 | * @param array<string,array> $argsByKey |
1062 | * @param array<string,mixed> &$resByKey |
1063 | * @throws DBError |
1064 | */ |
1065 | private function modifyTableSpecificBlobsForIncrInitAsync( |
1066 | IDatabase $db, |
1067 | string $ptable, |
1068 | float $mtime, |
1069 | array $argsByKey, |
1070 | array &$resByKey |
1071 | ) { |
1072 | foreach ( $argsByKey as $key => [ $step, $init, $exptime ] ) { |
1073 | $mt = $this->makeTimestampedModificationToken( $mtime, $db ); |
1074 | $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime ); |
1075 | $db->newInsertQueryBuilder() |
1076 | ->insertInto( $ptable ) |
1077 | ->rows( $this->buildUpsertRow( $db, $key, $init, $expiry, $mt ) ) |
1078 | ->onDuplicateKeyUpdate() |
1079 | ->uniqueIndexFields( [ 'keyname' ] ) |
1080 | ->set( $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $mt, (int)$mtime ) ) |
1081 | ->caller( __METHOD__ )->execute(); |
1082 | if ( !$db->affectedRows() ) { |
1083 | $this->logger->warning( __METHOD__ . ": failed to set new $key value" ); |
1084 | } else { |
1085 | $resByKey[$key] = true; |
1086 | } |
1087 | } |
1088 | } |
1089 | |
1090 | /** |
1091 | * @param int $exptime Relative or absolute expiration |
1092 | * @param int $nowTsUnix Current UNIX timestamp |
1093 | * @return int UNIX timestamp or TTL_INDEFINITE |
1094 | */ |
1095 | private function makeNewKeyExpiry( $exptime, int $nowTsUnix ) { |
1096 | $expiry = $this->getExpirationAsTimestamp( $exptime ); |
1097 | // Eventual consistency requires the preservation of recently modified keys. |
1098 | // Do not create rows with `exptime` fields so low that they might get garbage |
1099 | // collected before being replicated. |
1100 | if ( $expiry !== self::TTL_INDEFINITE ) { |
1101 | $expiry = max( $expiry, $nowTsUnix - self::SAFE_CLOCK_BOUND_SEC ); |
1102 | } |
1103 | |
1104 | return $expiry; |
1105 | } |
1106 | |
1107 | /** |
1108 | * Get a scoped lock and modification timestamp for a critical section of reads/writes |
1109 | * |
1110 | * This is used instead of BagOStuff::getCurrentTime() for certain writes (such as "add", |
1111 | * "incr", and "cas"), for which we want to support tight race conditions where the same |
1112 | * key is repeatedly written to by multiple web servers that each get to see the previous |
1113 | * value, act on it, and modify it in some way. |
1114 | * |
1115 | * It is assumed that this method is normally only invoked from the primary datacenter. |
1116 | * A lock is acquired on the primary server of the local datacenter in order to avoid race |
1117 | * conditions within the critical section. The clock on the SQL server is used to get the |
1118 | * modification timestamp in order to minimize issues with clock drift between web servers; |
1119 | * thus key writes will not be rejected due to some web servers having lagged clocks. |
1120 | * |
1121 | * @param string $key |
1122 | * @param ?ScopedCallback &$scope Unlocker callback; null on failure [returned] |
1123 | * @return float|null UNIX timestamp with 6 decimal places; null on failure |
1124 | */ |
1125 | private function newLockingWriteSectionModificationTimestamp( $key, &$scope ) { |
1126 | if ( !$this->lock( $key, 0 ) ) { |
1127 | return null; |
1128 | } |
1129 | |
1130 | $scope = new ScopedCallback( function () use ( $key ) { |
1131 | $this->unlock( $key ); |
1132 | } ); |
1133 | |
1134 | // sprintf is used to adjust precision |
1135 | return (float)sprintf( '%.6F', $this->locks[$key][self::LOCK_TIME] ); |
1136 | } |
1137 | |
1138 | /** |
1139 | * Make a `modtoken` column value with the original time and source database server of a write |
1140 | * |
1141 | * @param float $mtime UNIX modification timestamp |
1142 | * @param IDatabase $db Handle to the primary database server sourcing the write |
1143 | * @return string String of the form "<SECONDS_SOURCE><MICROSECONDS>", where SECONDS_SOURCE |
1144 | * is "<35 bit seconds portion of UNIX time><32 bit database server ID>" as 13 base 36 chars, |
1145 | * and MICROSECONDS is "<20 bit microseconds portion of UNIX time>" as 4 base 36 chars |
1146 | */ |
1147 | private function makeTimestampedModificationToken( float $mtime, IDatabase $db ) { |
1148 | // We have reserved space for upto 6 digits in the microsecond portion of the token. |
1149 | // This is for future use only (maybe CAS tokens) and not currently used. |
1150 | // It is currently populated by the microsecond portion returned by microtime, |
1151 | // which generally has fewer than 6 digits of meaningful precision but can still be useful |
1152 | // in debugging (to see the token continuously change even during rapid testing). |
1153 | $seconds = (int)$mtime; |
1154 | [ , $microseconds ] = explode( '.', sprintf( '%.6F', $mtime ) ); |
1155 | |
1156 | $id = sprintf( '%u', crc32( $db->getServerName() ) ); |
1157 | |
1158 | $token = implode( '', [ |
1159 | // 67 bit integral portion of UNIX timestamp, qualified |
1160 | \Wikimedia\base_convert( |
1161 | // 35 bit integral seconds portion of UNIX timestamp |
1162 | str_pad( base_convert( (string)$seconds, 10, 2 ), 35, '0', STR_PAD_LEFT ) . |
1163 | // 32 bit ID of the primary database server handling the write |
1164 | str_pad( base_convert( $id, 10, 2 ), 32, '0', STR_PAD_LEFT ), |
1165 | 2, |
1166 | 36, |
1167 | 13 |
1168 | ), |
1169 | // 20 bit fractional portion of UNIX timestamp, as integral microseconds |
1170 | str_pad( base_convert( $microseconds, 10, 36 ), 4, '0', STR_PAD_LEFT ) |
1171 | ] ); |
1172 | |
1173 | if ( strlen( $token ) !== 17 ) { |
1174 | throw new RuntimeException( "Modification timestamp overflow detected" ); |
1175 | } |
1176 | |
1177 | return $token; |
1178 | } |
1179 | |
1180 | /** |
1181 | * WHERE conditions that check for existence and liveness of keys |
1182 | * |
1183 | * @param IDatabase $db |
1184 | * @param string[]|string $keys |
1185 | * @param int $time UNIX modification timestamp |
1186 | * @return array |
1187 | */ |
1188 | private function buildExistenceConditions( IDatabase $db, $keys, int $time ) { |
1189 | // Note that tombstones always have past expiration dates |
1190 | return [ |
1191 | 'keyname' => $keys, |
1192 | $db->expr( 'exptime', '>=', $db->timestamp( $time ) ) |
1193 | ]; |
1194 | } |
1195 | |
1196 | /** |
1197 | * INSERT array for handling key writes/overwrites when no live nor stale key exists |
1198 | * |
1199 | * @param IDatabase $db |
1200 | * @param string $key |
1201 | * @param string|int $serialValue New value |
1202 | * @param int $expiry Expiration timestamp or TTL_INDEFINITE |
1203 | * @param string $mt Modification token |
1204 | * @return array |
1205 | */ |
1206 | private function buildUpsertRow( |
1207 | IDatabase $db, |
1208 | $key, |
1209 | $serialValue, |
1210 | int $expiry, |
1211 | string $mt |
1212 | ) { |
1213 | $row = [ |
1214 | 'keyname' => $key, |
1215 | 'value' => $this->dbEncodeSerialValue( $db, $serialValue ), |
1216 | 'exptime' => $this->encodeDbExpiry( $db, $expiry ) |
1217 | ]; |
1218 | if ( $this->multiPrimaryMode ) { |
1219 | $row['modtoken'] = $mt; |
1220 | } |
1221 | |
1222 | return $row; |
1223 | } |
1224 | |
1225 | /** |
1226 | * SET array for handling key overwrites when a live or stale key exists |
1227 | * |
1228 | * @param IDatabase $db |
1229 | * @param string $mt Modification token |
1230 | * @return array |
1231 | */ |
1232 | private function buildMultiUpsertSetForOverwrite( IDatabase $db, string $mt ) { |
1233 | $expressionsByColumn = [ |
1234 | 'value' => $db->buildExcludedValue( 'value' ), |
1235 | 'exptime' => $db->buildExcludedValue( 'exptime' ) |
1236 | ]; |
1237 | |
1238 | $set = []; |
1239 | if ( $this->multiPrimaryMode ) { |
1240 | // The query might take a while to replicate, during which newer values might get |
1241 | // written. Qualify the query so that it does not override such values. Note that |
1242 | // duplicate tokens generated around the same time for a key should still result |
1243 | // in convergence given the use of server_id in modtoken (providing a total order |
1244 | // among primary DB servers) and MySQL binlog ordering (providing a total order |
1245 | // for writes replicating from a given primary DB server). |
1246 | $expressionsByColumn['modtoken'] = $db->addQuotes( $mt ); |
1247 | foreach ( $expressionsByColumn as $column => $updateExpression ) { |
1248 | $rhs = $db->conditional( |
1249 | $db->addQuotes( substr( $mt, 0, 13 ) ) . ' >= ' . |
1250 | $db->buildSubString( 'modtoken', 1, 13 ), |
1251 | $updateExpression, |
1252 | $column |
1253 | ); |
1254 | $set[$column] = new RawSQLValue( $rhs ); |
1255 | } |
1256 | } else { |
1257 | foreach ( $expressionsByColumn as $column => $updateExpression ) { |
1258 | $set[$column] = new RawSQLValue( $updateExpression ); |
1259 | } |
1260 | } |
1261 | |
1262 | return $set; |
1263 | } |
1264 | |
1265 | /** |
1266 | * SET array for handling key overwrites when a live or stale key exists |
1267 | * |
1268 | * @param IDatabase $db |
1269 | * @param int $step Positive counter incrementation value |
1270 | * @param int $init Positive initial counter value |
1271 | * @param int $expiry Expiration timestamp or TTL_INDEFINITE |
1272 | * @param string $mt Modification token |
1273 | * @param int $mtUnixTs UNIX timestamp of modification token |
1274 | * @return array |
1275 | */ |
1276 | private function buildIncrUpsertSet( |
1277 | IDatabase $db, |
1278 | int $step, |
1279 | int $init, |
1280 | int $expiry, |
1281 | string $mt, |
1282 | int $mtUnixTs |
1283 | ) { |
1284 | // Map of (column => (SQL for non-expired key rows, SQL for expired key rows)) |
1285 | $expressionsByColumn = [ |
1286 | 'value' => [ |
1287 | $db->buildIntegerCast( 'value' ) . " + {$db->addQuotes( $step )}", |
1288 | $db->addQuotes( $this->dbEncodeSerialValue( $db, $init ) ) |
1289 | ], |
1290 | 'exptime' => [ |
1291 | 'exptime', |
1292 | $db->addQuotes( $this->encodeDbExpiry( $db, $expiry ) ) |
1293 | ] |
1294 | ]; |
1295 | if ( $this->multiPrimaryMode ) { |
1296 | $expressionsByColumn['modtoken'] = [ 'modtoken', $db->addQuotes( $mt ) ]; |
1297 | } |
1298 | |
1299 | $set = []; |
1300 | foreach ( $expressionsByColumn as $column => [ $updateExpression, $initExpression ] ) { |
1301 | $rhs = $db->conditional( |
1302 | $db->expr( 'exptime', '>=', $db->timestamp( $mtUnixTs ) ), |
1303 | $updateExpression, |
1304 | $initExpression |
1305 | ); |
1306 | $set[$column] = new RawSQLValue( $rhs ); |
1307 | } |
1308 | |
1309 | return $set; |
1310 | } |
1311 | |
1312 | /** |
1313 | * @param IDatabase $db |
1314 | * @param int $expiry UNIX timestamp of expiration or TTL_INDEFINITE |
1315 | * @return string |
1316 | */ |
1317 | private function encodeDbExpiry( IDatabase $db, int $expiry ) { |
1318 | return ( $expiry === self::TTL_INDEFINITE ) |
1319 | // Use the maximum timestamp that the column can store |
1320 | ? $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER ) |
1321 | // Convert the absolute timestamp into the DB timestamp format |
1322 | : $db->timestamp( $expiry ); |
1323 | } |
1324 | |
1325 | /** |
1326 | * @param IDatabase $db |
1327 | * @param string $dbExpiry DB timestamp of expiration |
1328 | * @return int UNIX timestamp of expiration or TTL_INDEFINITE |
1329 | */ |
1330 | private function decodeDbExpiry( IDatabase $db, string $dbExpiry ) { |
1331 | return ( $dbExpiry === $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER ) ) |
1332 | ? self::TTL_INDEFINITE |
1333 | : (int)ConvertibleTimestamp::convert( TS_UNIX, $dbExpiry ); |
1334 | } |
1335 | |
1336 | /** |
1337 | * @param IDatabase $db |
1338 | * @param string|int $serialValue |
1339 | * @return string |
1340 | */ |
1341 | private function dbEncodeSerialValue( IDatabase $db, $serialValue ) { |
1342 | return is_int( $serialValue ) ? (string)$serialValue : $db->encodeBlob( $serialValue ); |
1343 | } |
1344 | |
1345 | /** |
1346 | * @param IDatabase $db |
1347 | * @param Blob|string|int $blob |
1348 | * @return string|int |
1349 | */ |
1350 | private function dbDecodeSerialValue( IDatabase $db, $blob ) { |
1351 | return $this->isInteger( $blob ) ? (int)$blob : $db->decodeBlob( $blob ); |
1352 | } |
1353 | |
1354 | /** |
1355 | * Either append a 'castoken' field or append the fields needed to compute the CAS token |
1356 | * |
1357 | * @param IDatabase $db |
1358 | * @param string[] $fields SELECT field array |
1359 | * @return string[] SELECT field array |
1360 | */ |
1361 | private function addCasTokenFields( IDatabase $db, array $fields ) { |
1362 | $type = $db->getType(); |
1363 | |
1364 | if ( $type === 'mysql' ) { |
1365 | $fields['castoken'] = $db->buildConcat( [ |
1366 | 'SHA1(value)', |
1367 | $db->addQuotes( '@' ), |
1368 | 'exptime' |
1369 | ] ); |
1370 | } elseif ( $type === 'postgres' ) { |
1371 | $fields['castoken'] = $db->buildConcat( [ |
1372 | 'md5(value)', |
1373 | $db->addQuotes( '@' ), |
1374 | 'exptime' |
1375 | ] ); |
1376 | } else { |
1377 | if ( !in_array( 'value', $fields, true ) ) { |
1378 | $fields[] = 'value'; |
1379 | } |
1380 | if ( !in_array( 'exptime', $fields, true ) ) { |
1381 | $fields[] = 'exptime'; |
1382 | } |
1383 | } |
1384 | |
1385 | return $fields; |
1386 | } |
1387 | |
1388 | /** |
1389 | * Get a CAS token from a SELECT result row |
1390 | * |
1391 | * @param IDatabase $db |
1392 | * @param stdClass $row A row for a key |
1393 | * @return string CAS token |
1394 | */ |
1395 | private function getCasTokenFromRow( IDatabase $db, stdClass $row ) { |
1396 | if ( isset( $row->castoken ) ) { |
1397 | $token = $row->castoken; |
1398 | } else { |
1399 | $token = sha1( $this->dbDecodeSerialValue( $db, $row->value ) ) . '@' . $row->exptime; |
1400 | $this->logger->debug( __METHOD__ . ": application computed hash for CAS token" ); |
1401 | } |
1402 | |
1403 | return $token; |
1404 | } |
1405 | |
1406 | /** |
1407 | * @param int $shardIndex |
1408 | * @throws DBError |
1409 | */ |
1410 | private function garbageCollect( $shardIndex ) { |
1411 | // set right away, avoid queuing duplicate async callbacks |
1412 | $this->lastGarbageCollect = $this->getCurrentTime(); |
1413 | |
1414 | $garbageCollector = function () use ( $shardIndex ) { |
1415 | $db = $this->getConnection( $shardIndex ); |
1416 | /** @noinspection PhpUnusedLocalVariableInspection */ |
1417 | $silenceScope = $this->silenceTransactionProfiler(); |
1418 | $this->deleteServerObjectsExpiringBefore( |
1419 | $db, |
1420 | (int)$this->getCurrentTime(), |
1421 | $this->purgeLimit |
1422 | ); |
1423 | $this->lastGarbageCollect = $this->getCurrentTime(); |
1424 | }; |
1425 | |
1426 | if ( $this->asyncHandler ) { |
1427 | ( $this->asyncHandler )( $garbageCollector ); |
1428 | } else { |
1429 | $garbageCollector(); |
1430 | } |
1431 | } |
1432 | |
1433 | /** |
1434 | * @deprecated since 1.41, use deleteObjectsExpiringBefore() instead |
1435 | */ |
1436 | public function expireAll() { |
1437 | wfDeprecated( __METHOD__, '1.41' ); |
1438 | $this->deleteObjectsExpiringBefore( (int)$this->getCurrentTime() ); |
1439 | } |
1440 | |
1441 | public function deleteObjectsExpiringBefore( |
1442 | $timestamp, |
1443 | ?callable $progress = null, |
1444 | $limit = INF, |
1445 | ?string $tag = null |
1446 | ) { |
1447 | /** @noinspection PhpUnusedLocalVariableInspection */ |
1448 | $silenceScope = $this->silenceTransactionProfiler(); |
1449 | |
1450 | if ( $tag !== null ) { |
1451 | // Purge one server only, to support concurrent purging in large wiki farms (T282761). |
1452 | $shardIndexes = []; |
1453 | if ( !$this->serverTags ) { |
1454 | throw new InvalidArgumentException( "Given a tag but no tags are configured" ); |
1455 | } |
1456 | foreach ( $this->serverTags as $serverShardIndex => $serverTag ) { |
1457 | if ( $tag === $serverTag ) { |
1458 | $shardIndexes[] = $serverShardIndex; |
1459 | break; |
1460 | } |
1461 | } |
1462 | if ( !$shardIndexes ) { |
1463 | throw new InvalidArgumentException( "Unknown server tag: $tag" ); |
1464 | } |
1465 | } else { |
1466 | $shardIndexes = $this->getShardServerIndexes(); |
1467 | shuffle( $shardIndexes ); |
1468 | } |
1469 | |
1470 | $ok = true; |
1471 | $numServers = count( $shardIndexes ); |
1472 | |
1473 | $keysDeletedCount = 0; |
1474 | foreach ( $shardIndexes as $numServersDone => $shardIndex ) { |
1475 | try { |
1476 | $db = $this->getConnection( $shardIndex ); |
1477 | |
1478 | // Avoid deadlock (T330377) |
1479 | $lockKey = "SqlBagOStuff-purge-shard:$shardIndex"; |
1480 | if ( !$db->lock( $lockKey, __METHOD__, 0 ) ) { |
1481 | $this->logger->info( "SqlBagOStuff purge for shard $shardIndex already locked, skip" ); |
1482 | continue; |
1483 | } |
1484 | |
1485 | $this->deleteServerObjectsExpiringBefore( |
1486 | $db, |
1487 | $timestamp, |
1488 | $limit, |
1489 | $keysDeletedCount, |
1490 | [ 'fn' => $progress, 'serversDone' => $numServersDone, 'serversTotal' => $numServers ] |
1491 | ); |
1492 | $db->unlock( $lockKey, __METHOD__ ); |
1493 | } catch ( DBError $e ) { |
1494 | $this->handleDBError( $e, $shardIndex ); |
1495 | $ok = false; |
1496 | } |
1497 | } |
1498 | |
1499 | return $ok; |
1500 | } |
1501 | |
/**
 * Delete expired rows from every table shard on a single database server.
 *
 * Table shards are visited in random order. Within a shard, rows whose exptime is
 * before the cutoff are selected in ascending exptime order and deleted in batches
 * of at most $this->writeBatchSize keys. Work stops once $keysDeletedCount reaches
 * $limit. Each batch is capped at the remaining budget so the total never overshoots.
 *
 * @param IDatabase $db
 * @param string|int $timestamp Rows with an exptime before this time are deleted
 * @param int|float $limit Maximum number of rows to delete in total or INF for no limit.
 *  It is compared against $keysDeletedCount, so rows already counted by the caller
 *  (e.g. on other servers) consume the same budget.
 * @param int &$keysDeletedCount Running count of deleted rows, incremented in place
 * @param array|null $progress Optional progress reporting; 'fn' is called with a percentage
 * @phan-param array{fn:?callback,serversDone:int,serversTotal:int}|null $progress
 * @throws DBError
 */
private function deleteServerObjectsExpiringBefore(
	IDatabase $db,
	$timestamp,
	$limit,
	&$keysDeletedCount = 0,
	?array $progress = null
) {
	$cutoffUnix = ConvertibleTimestamp::convert( TS_UNIX, $timestamp );
	if ( $this->multiPrimaryMode ) {
		// Eventual consistency requires the preservation of any key that was recently
		// modified. The key must exist on this database server long enough for the server
		// to receive, via replication, all writes to the key with lower timestamps. Such
		// writes should be no-ops since the existing key value should "win". If the network
		// partitions between datacenters A and B for 30 minutes, the database servers in
		// each datacenter will see an initial burst of writes with "old" timestamps via
		// replication. This might include writes with lower timestamps than the existing
		// key value. Therefore, clock skew and replication delay are both factors.
		$cutoffUnixSafe = (int)$this->getCurrentTime() - self::SAFE_PURGE_DELAY_SEC;
		$cutoffUnix = min( $cutoffUnix, $cutoffUnixSafe );
	}
	$tableIndexes = range( 0, $this->numTableShards - 1 );
	shuffle( $tableIndexes );

	foreach ( $tableIndexes as $numShardsDone => $tableIndex ) {
		// The limit is shared with earlier table shards (and earlier servers, since the
		// counter is passed by reference); do not start another shard once it is reached.
		if ( $keysDeletedCount >= $limit ) {
			break;
		}

		// The oldest expiry of a row we have deleted on this shard
		// (the first row that we deleted)
		$minExpUnix = null;
		// The most recent expiry time so far, from a row we have deleted on this shard
		$maxExp = null;
		// Size of the time range we'll delete, in seconds (for progress estimate)
		$totalSeconds = null;

		do {
			// Never fetch more rows than the remaining budget allows, so the total
			// number of deletions cannot overshoot $limit. With $limit = INF this is
			// simply $this->writeBatchSize.
			$batchSize = (int)min( $this->writeBatchSize, $limit - $keysDeletedCount );

			$res = $db->newSelectQueryBuilder()
				->select( [ 'keyname', 'exptime' ] )
				->from( $this->getTableNameByShard( $tableIndex ) )
				->where( $db->expr( 'exptime', '<', $db->timestamp( $cutoffUnix ) ) )
				->andWhere( $maxExp ? $db->expr( 'exptime', '>=', $maxExp ) : [] )
				->orderBy( 'exptime', SelectQueryBuilder::SORT_ASC )
				->limit( $batchSize )
				->caller( __METHOD__ )
				->fetchResultSet();

			if ( $res->numRows() ) {
				$row = $res->current();
				if ( $minExpUnix === null ) {
					$minExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $row->exptime );
					$totalSeconds = max( $cutoffUnix - $minExpUnix, 1 );
				}

				$keys = [];
				foreach ( $res as $row ) {
					$keys[] = $row->keyname;
					$maxExp = $row->exptime;
				}

				$db->newDeleteQueryBuilder()
					->deleteFrom( $this->getTableNameByShard( $tableIndex ) )
					->where( [
						'keyname' => $keys,
						$db->expr( 'exptime', '<', $db->timestamp( $cutoffUnix ) ),
					] )
					->caller( __METHOD__ )->execute();
				$keysDeletedCount += $db->affectedRows();
			}

			if ( $progress && is_callable( $progress['fn'] ) ) {
				if ( $totalSeconds ) {
					$maxExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $maxExp );
					$remainingSeconds = $cutoffUnix - $maxExpUnix;
					$processedSeconds = max( $totalSeconds - $remainingSeconds, 0 );
					// For example, if we've done 1.5 table shards, and are thus half-way on the
					// 2nd of perhaps 5 tables on this server, then this might be:
					// `( 1 + ( 43200 / 86400 ) ) / 5 = 0.3`, or 30% done, of tables on this server.
					$tablesDoneRatio =
						( $numShardsDone + ( $processedSeconds / $totalSeconds ) ) / $this->numTableShards;
				} else {
					// No expired rows were found in this table shard, so only this shard is
					// complete. The remaining shards on this server still have to be visited.
					$tablesDoneRatio = ( $numShardsDone + 1 ) / $this->numTableShards;
				}

				// For example, if we're 30% done on the last of 10 servers, then this might be:
				// `( 9 / 10 ) + ( 0.3 / 10 ) = 0.93`, or 93% done, overall.
				$overallRatio = ( $progress['serversDone'] / $progress['serversTotal'] ) +
					( $tablesDoneRatio / $progress['serversTotal'] );
				( $progress['fn'] )( $overallRatio * 100 );
			}
		} while ( $res->numRows() && $keysDeletedCount < $limit );
	}
}
1602 | |
/**
 * Delete content of shard tables in every server.
 *
 * Every table shard on every configured server is emptied. The first DBError is
 * passed to handleDBError() and aborts the remaining work.
 *
 * @deprecated since 1.41, unused.
 *
 * @return bool True if every table was emptied, false if a DBError occurred
 */
public function deleteAll() {
	wfDeprecated( __METHOD__, '1.41' );
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	foreach ( $this->getShardServerIndexes() as $serverIndex ) {
		try {
			$conn = $this->getConnection( $serverIndex );
			for ( $tableShard = 0; $tableShard < $this->numTableShards; $tableShard++ ) {
				$table = $this->getTableNameByShard( $tableShard );
				$conn->newDeleteQueryBuilder()
					->deleteFrom( $table )
					->where( $conn::ALL_ROWS )
					->caller( __METHOD__ )
					->execute();
			}
		} catch ( DBError $e ) {
			$this->handleDBError( $e, $serverIndex );
			return false;
		}
	}

	return true;
}
1631 | |
/**
 * Acquire a database-level named lock for $key on every server the key maps to.
 *
 * A DBError on any server is passed to handleDBError() and logged as a warning.
 * Such a failure does not reset the value obtained from a server processed earlier.
 *
 * @param string $key
 * @param int $timeout Seconds to wait for the lock, passed to IDatabase::lock()
 * @param int $exptime Not used by this implementation
 * @return mixed The value returned by the last IDatabase::lock() call that did not throw
 *  (requested with LOCK_TIMESTAMP), or null if every call threw a DBError
 */
public function doLock( $key, $timeout = 6, $exptime = 6 ) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	$acquiredAt = null;

	foreach ( $this->getShardIndexesForKey( $key ) as $serverIndex ) {
		try {
			$conn = $this->getConnection( $serverIndex );
			$acquiredAt = $conn->lock( $key, __METHOD__, $timeout, $conn::LOCK_TIMESTAMP );
		} catch ( DBError $e ) {
			$this->handleDBError( $e, $serverIndex );
			$this->logger->warning(
				__METHOD__ . ' failed due to I/O error for {key}.',
				[ 'key' => $key ]
			);
		}
	}

	return $acquiredAt;
}
1654 | |
/**
 * Release the named lock for $key on every server the key maps to.
 *
 * @param string $key
 * @return mixed Result of IDatabase::unlock() for the last server processed,
 *  false if that server raised a DBError or if there were no servers
 */
public function doUnlock( $key ) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	$serverIndexes = $this->getShardIndexesForKey( $key );
	$released = false;

	foreach ( $serverIndexes as $serverIndex ) {
		try {
			$conn = $this->getConnection( $serverIndex );
			$released = $conn->unlock( $key, __METHOD__ );
		} catch ( DBError $e ) {
			$this->handleDBError( $e, $serverIndex );
			// A failure on this server means the key is not reported as released
			$released = false;
		}
	}

	return $released;
}
1673 | |
/**
 * Build a cache key of the form "keyspace:component1:component2:...".
 *
 * SQL schema for 'objectcache' specifies keys as varchar(255). From that, subtract
 * the characters needed for the keyspace and one separator per component. To handle
 * custom prefixes used by things like WANObjectCache, the budget starts at 205.
 *
 * Spaces become underscores (avoids unnecessary misses from pre-1.35 code) and colons
 * in components are percent-encoded. A component that would exceed the remaining
 * budget is replaced by '#' plus its MD5, as long as more than 33 characters remain.
 * If the key is still too long, the whole component list is replaced by a single hash.
 *
 * @param string $keyspace
 * @param array $components
 * @return string
 */
protected function makeKeyInternal( $keyspace, $components ) {
	$keyspace = strtr( $keyspace, ' ', '_' );
	$budget = 205 - strlen( $keyspace ) - count( $components );

	$escaped = [];
	foreach ( $components as $idx => $part ) {
		$part = strtr( $part ?? '', [ ' ' => '_', ':' => '%3A' ] );

		// 33 = 32 characters for the MD5 + 1 for the '#' prefix.
		if ( $budget > 33 && strlen( $part ) > $budget ) {
			$part = '#' . md5( $part );
		}
		$budget -= strlen( $part );
		$escaped[$idx] = $part;
	}

	$joined = implode( ':', $escaped );

	return ( $budget < 0 )
		? $keyspace . ':BagOStuff-long-key:##' . md5( $joined )
		: $keyspace . ':' . $joined;
}
1703 | |
/**
 * Report that generic keys must be converted for this backend.
 *
 * NOTE(review): how the flag is consumed is defined by the parent class.
 *
 * @return bool Always true
 */
protected function requireConvertGenericKey(): bool {
	$mustConvert = true;
	return $mustConvert;
}
1707 | |
/**
 * Encode a value for storage.
 *
 * Integers are returned unchanged. Anything else is PHP-serialized and, when zlib is
 * available, compressed with gzdeflate(). On typical message and page data,
 * compression can provide a 3X storage savings.
 *
 * @param mixed $value
 * @return int|string|false False only if gzdeflate() fails
 */
protected function serialize( $value ) {
	if ( !is_int( $value ) ) {
		$encoded = serialize( $value );
		return $this->hasZlib ? gzdeflate( $encoded ) : $encoded;
	}

	return $value;
}
1721 | |
/**
 * Decode a stored value, reversing serialize().
 *
 * The tombstone marker decodes to false and integer-like values decode to int.
 * Anything else is inflated when zlib is available, falling back to the raw value if
 * inflation fails, and then passed to unserialize().
 *
 * @param mixed $value
 * @return mixed
 */
protected function unserialize( $value ) {
	// Tombstone
	if ( $value === self::TOMB_SERIAL ) {
		return false;
	}
	if ( $this->isInteger( $value ) ) {
		return (int)$value;
	}

	$payload = $value;
	if ( $this->hasZlib ) {
		// gzinflate() warns on input that is not deflate data; the raw value is used then
		AtEase::suppressWarnings();
		$inflated = gzinflate( $value );
		AtEase::restoreWarnings();

		$payload = ( $inflated === false ) ? $value : $inflated;
	}

	return unserialize( $payload );
}
1743 | |
/**
 * Get the load balancer, creating it on first use via the configured callback.
 *
 * @return ILoadBalancer
 */
private function getLoadBalancer(): ILoadBalancer {
	$this->loadBalancer ??= ( $this->loadBalancerCallback )();

	return $this->loadBalancer;
}
1750 | |
/**
 * Get a maintenance connection to the primary server through the load balancer.
 *
 * If the writer server uses database-level locking, the main connection is used
 * to avoid transaction deadlocks. If the RDBMS has row/table/page level locking,
 * a separate auto-commit connection is used to avoid needless contention and deadlocks.
 *
 * @return IMaintainableDatabase
 * @throws DBError
 */
private function getConnectionViaLoadBalancer() {
	$lb = $this->getLoadBalancer();
	$writerAttributes = $lb->getServerAttributes( ServerInfo::WRITER_INDEX );
	$usesDbLevelLocking = $writerAttributes[Database::ATTR_DB_LEVEL_LOCKING];

	$conn = $usesDbLevelLocking
		? $lb->getMaintenanceConnectionRef( DB_PRIMARY, [], $this->dbDomain )
		: $lb->getMaintenanceConnectionRef( DB_PRIMARY, [], $this->dbDomain, $lb::CONN_TRX_AUTOCOMMIT );

	// Make sure any errors are thrown now while we can more easily handle them
	$conn->ensureConnection();

	return $conn;
}
1776 | |
/**
 * Get the connection for a directly configured server, creating and caching it on
 * first use.
 *
 * New connections are always in autocommit mode (DBO_TRX and DBO_DEFAULT are
 * cleared even if configured). SQLite databases get the objectcache table created
 * on demand.
 *
 * @param int $shardIndex
 * @param array $server Server config map
 * @return IMaintainableDatabase
 * @throws DBError
 */
private function getConnectionFromServerInfo( $shardIndex, array $server ) {
	if ( isset( $this->conns[$shardIndex] ) ) {
		// @phan-suppress-next-line PhanTypeMismatchReturnNullable False positive
		return $this->conns[$shardIndex];
	}

	$server['logger'] = $this->logger;
	$server['flags'] = ( $server['flags'] ?? 0 ) & ~( IDatabase::DBO_TRX | IDatabase::DBO_DEFAULT );

	/** @var IMaintainableDatabase $conn Auto-commit connection to the server */
	$conn = MediaWikiServices::getInstance()
		->getDatabaseFactory()
		->create( $server['type'], $server );

	if ( $conn->getType() === 'sqlite' ) {
		$this->initSqliteDatabase( $conn );
	}

	$this->conns[$shardIndex] = $conn;

	return $conn;
}
1804 | |
/**
 * Handle a DBError which occurred during a cache operation (read, write, lock or purge).
 *
 * When servers are configured directly (not via a load balancer), a connection error
 * also drops the cached connection for $shardIndex and marks the server as down for
 * 60 seconds. Repeated connection errors inside that window are only logged at debug
 * level to avoid flooding the error log. In every case the last error code is updated
 * (ERR_UNREACHABLE for connection errors, ERR_UNEXPECTED otherwise) so callers can
 * detect the failure.
 *
 * @param DBError $exception
 * @param int $shardIndex Server index
 */
private function handleDBError( DBError $exception, $shardIndex ) {
	if ( !$this->useLB && $exception instanceof DBConnectionError ) {
		unset( $this->conns[$shardIndex] ); // bug T103435

		$now = $this->getCurrentTime();
		if ( isset( $this->connFailureTimes[$shardIndex] ) ) {
			if ( $now - $this->connFailureTimes[$shardIndex] >= 60 ) {
				unset( $this->connFailureTimes[$shardIndex] );
				unset( $this->connFailureErrors[$shardIndex] );
			} else {
				$this->logger->debug( __METHOD__ . ": Server #$shardIndex already down" );
				// Only the logging is suppressed for a known-down server. The operation
				// still failed, so callers must still see the error code.
				$this->setLastError( self::ERR_UNREACHABLE );
				return;
			}
		}
		$this->logger->info( __METHOD__ . ": Server #$shardIndex down until " . ( $now + 60 ) );
		$this->connFailureTimes[$shardIndex] = $now;
		$this->connFailureErrors[$shardIndex] = $exception;
	}
	$this->logger->error( "DBError: {$exception->getMessage()}", [ 'exception' => $exception ] );
	if ( $exception instanceof DBConnectionError ) {
		$this->setLastError( self::ERR_UNREACHABLE );
		$this->logger->warning( __METHOD__ . ": ignoring connection error" );
	} else {
		$this->setLastError( self::ERR_UNEXPECTED );
		$this->logger->warning( __METHOD__ . ": ignoring query error" );
	}
}
1838 | |
/**
 * Create the objectcache table and its exptime index on an SQLite database if the
 * table does not exist yet.
 *
 * SQLite uses one table; sharding does not seem to have much benefit. The table and
 * index are created inside an atomic section. On a DBError the section is rolled
 * back and the error is rethrown.
 *
 * @param IMaintainableDatabase $db
 * @throws DBError
 */
private function initSqliteDatabase( IMaintainableDatabase $db ) {
	if ( $db->tableExists( 'objectcache', __METHOD__ ) ) {
		return;
	}

	// The journal mode setting is permanent
	$db->query( "PRAGMA journal_mode=WAL", __METHOD__ );

	// Atomic DDL
	$db->startAtomic( __METHOD__ );
	try {
		$table = $db->tableName( 'objectcache' );
		$indexName = $db->addIdentifierQuotes( $db->tablePrefix() . 'exptime' );
		$createTableSql = "CREATE TABLE $table (\n" .
			" keyname BLOB NOT NULL default '' PRIMARY KEY,\n" .
			" value BLOB,\n" .
			" exptime BLOB NOT NULL\n" .
			")";

		$db->query( $createTableSql, __METHOD__ );
		$db->query( "CREATE INDEX $indexName ON $table (exptime)", __METHOD__ );
		$db->endAtomic( __METHOD__ );
	} catch ( DBError $e ) {
		$db->rollback( __METHOD__ );
		throw $e;
	}
}
1868 | |
/**
 * Create the shard tables on all databases
 *
 * This is typically called manually by a sysadmin via eval.php, e.g. for ParserCache:
 *
 * @code
 * ObjectCache::getInstance( 'myparsercache' )->createTables();
 * @endcode
 *
 * This is different from `$services->getParserCache()->getCacheStorage()->createTables()`,
 * which would use the backend set via $wgParserCacheType, which shouldn't be
 * set yet for the backend you are creating shard tables on. The expectation
 * is to first add the new backend to $wgObjectCaches, run the above, and then enable
 * it for live ParserCache traffic by setting $wgParserCacheType.
 *
 * Shard tables are only created on MySQL and PostgreSQL servers. Each one is cloned
 * from the base 'objectcache' table, and tables that already exist are left untouched.
 */
public function createTables() {
	foreach ( $this->getShardServerIndexes() as $shardIndex ) {
		$db = $this->getConnection( $shardIndex );
		$dbType = $db->getType();
		if ( !in_array( $dbType, [ 'mysql', 'postgres' ], true ) ) {
			continue;
		}

		$encBaseTable = $db->tableName( 'objectcache' );
		for ( $i = 0; $i < $this->numTableShards; $i++ ) {
			$encShardTable = $db->tableName( $this->getTableNameByShard( $i ) );
			if ( $encShardTable === $encBaseTable ) {
				// Nothing to clone: the shard is the base table itself
				continue;
			}

			// PostgreSQL only accepts LIKE inside the column-list parentheses; the bare
			// "CREATE TABLE x LIKE y" form is MySQL-specific.
			$sql = ( $dbType === 'postgres' )
				? "CREATE TABLE IF NOT EXISTS $encShardTable (LIKE $encBaseTable INCLUDING ALL)"
				: "CREATE TABLE IF NOT EXISTS $encShardTable LIKE $encBaseTable";
			$db->query( $sql, __METHOD__ );
		}
	}
}
1896 | |
/**
 * List the server indexes to operate on.
 *
 * With a LoadBalancer based configuration there is a single logical server (index 0).
 * Otherwise the indexes are the keys of the striped array of database servers
 * ($this->serverTags).
 *
 * @return int[] List of server indexes
 */
private function getShardServerIndexes() {
	return $this->useLB ? [ 0 ] : array_keys( $this->serverTags );
}
1911 | |
/**
 * Silence the transaction profiler until the return value falls out of scope
 *
 * When $this->serverInfos is set, null is returned because no TransactionProfiler
 * is injected anyway.
 *
 * @return ScopedCallback|null
 */
private function silenceTransactionProfiler() {
	if ( !$this->serverInfos ) {
		return Profiler::instance()->getTransactionProfiler()->silenceForScope();
	}

	return null;
}
1923 | } |