Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
74.80% |
567 / 758 |
|
48.21% |
27 / 56 |
CRAP | |
0.00% |
0 / 1 |
SqlBagOStuff | |
74.80% |
567 / 758 |
|
48.21% |
27 / 56 |
839.96 | |
0.00% |
0 / 1 |
__construct | |
77.78% |
21 / 27 |
|
0.00% |
0 / 1 |
7.54 | |||
doGet | |
100.00% |
12 / 12 |
|
100.00% |
1 / 1 |
4 | |||
doSet | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
1 | |||
doDelete | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
1 | |||
doAdd | |
87.50% |
7 / 8 |
|
0.00% |
0 / 1 |
2.01 | |||
doCas | |
87.50% |
7 / 8 |
|
0.00% |
0 / 1 |
2.01 | |||
doChangeTTL | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
1 | |||
doIncrWithInit | |
100.00% |
11 / 11 |
|
100.00% |
1 / 1 |
3 | |||
doGetMulti | |
100.00% |
15 / 15 |
|
100.00% |
1 / 1 |
4 | |||
doSetMulti | |
100.00% |
11 / 11 |
|
100.00% |
1 / 1 |
1 | |||
doDeleteMulti | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
1 | |||
doChangeTTLMulti | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
1 | |||
getConnection | |
80.00% |
8 / 10 |
|
0.00% |
0 / 1 |
5.20 | |||
getKeyLocation | |
58.33% |
7 / 12 |
|
0.00% |
0 / 1 |
5.16 | |||
getTableNameByShard | |
50.00% |
2 / 4 |
|
0.00% |
0 / 1 |
4.12 | |||
fetchBlobs | |
90.00% |
36 / 40 |
|
0.00% |
0 / 1 |
11.12 | |||
modifyBlobs | |
79.17% |
19 / 24 |
|
0.00% |
0 / 1 |
10.90 | |||
modifyTableSpecificBlobsForSet | |
100.00% |
24 / 24 |
|
100.00% |
1 / 1 |
4 | |||
modifyTableSpecificBlobsForDelete | |
100.00% |
18 / 18 |
|
100.00% |
1 / 1 |
4 | |||
modifyTableSpecificBlobsForAdd | |
100.00% |
30 / 30 |
|
100.00% |
1 / 1 |
5 | |||
modifyTableSpecificBlobsForCas | |
92.31% |
36 / 39 |
|
0.00% |
0 / 1 |
7.02 | |||
modifyTableSpecificBlobsForChangeTTL | |
100.00% |
44 / 44 |
|
100.00% |
1 / 1 |
9 | |||
modifyTableSpecificBlobsForIncrInit | |
77.42% |
24 / 31 |
|
0.00% |
0 / 1 |
6.41 | |||
modifyTableSpecificBlobsForIncrInitAsync | |
92.31% |
12 / 13 |
|
0.00% |
0 / 1 |
3.00 | |||
makeNewKeyExpiry | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
2 | |||
newLockingWriteSectionModificationTimestamp | |
83.33% |
5 / 6 |
|
0.00% |
0 / 1 |
2.02 | |||
makeTimestampedModificationToken | |
93.75% |
15 / 16 |
|
0.00% |
0 / 1 |
2.00 | |||
buildExistenceConditions | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
1 | |||
buildUpsertRow | |
100.00% |
8 / 8 |
|
100.00% |
1 / 1 |
2 | |||
buildMultiUpsertSetForOverwrite | |
100.00% |
18 / 18 |
|
100.00% |
1 / 1 |
4 | |||
buildIncrUpsertSet | |
100.00% |
21 / 21 |
|
100.00% |
1 / 1 |
3 | |||
encodeDbExpiry | |
100.00% |
5 / 5 |
|
100.00% |
1 / 1 |
2 | |||
decodeDbExpiry | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 | |||
dbEncodeSerialValue | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
2 | |||
dbDecodeSerialValue | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
2 | |||
addCasTokenFields | |
44.44% |
8 / 18 |
|
0.00% |
0 / 1 |
9.29 | |||
getCasTokenFromRow | |
80.00% |
4 / 5 |
|
0.00% |
0 / 1 |
2.03 | |||
garbageCollect | |
92.86% |
13 / 14 |
|
0.00% |
0 / 1 |
2.00 | |||
expireAll | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
deleteObjectsExpiringBefore | |
0.00% |
0 / 34 |
|
0.00% |
0 / 1 |
90 | |||
deleteServerObjectsExpiringBefore | |
42.86% |
21 / 49 |
|
0.00% |
0 / 1 |
33.58 | |||
deleteAll | |
0.00% |
0 / 13 |
|
0.00% |
0 / 1 |
20 | |||
doLock | |
50.00% |
6 / 12 |
|
0.00% |
0 / 1 |
2.50 | |||
doUnlock | |
62.50% |
5 / 8 |
|
0.00% |
0 / 1 |
2.21 | |||
makeKeyInternal | |
100.00% |
13 / 13 |
|
100.00% |
1 / 1 |
5 | |||
requireConvertGenericKey | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
serialize | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
3 | |||
unserialize | |
90.91% |
10 / 11 |
|
0.00% |
0 / 1 |
5.02 | |||
getLoadBalancer | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 | |||
getConnectionViaLoadBalancer | |
45.45% |
5 / 11 |
|
0.00% |
0 / 1 |
4.46 | |||
getConnectionFromServerInfo | |
100.00% |
9 / 9 |
|
100.00% |
1 / 1 |
3 | |||
handleDBError | |
0.00% |
0 / 18 |
|
0.00% |
0 / 1 |
42 | |||
initSqliteDatabase | |
10.53% |
2 / 19 |
|
0.00% |
0 / 1 |
9.45 | |||
createTables | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
20 | |||
getShardServerIndexes | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
silenceTransactionProfiler | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 |
1 | <?php |
2 | /** |
3 | * Object caching using a SQL database. |
4 | * |
5 | * This program is free software; you can redistribute it and/or modify |
6 | * it under the terms of the GNU General Public License as published by |
7 | * the Free Software Foundation; either version 2 of the License, or |
8 | * (at your option) any later version. |
9 | * |
10 | * This program is distributed in the hope that it will be useful, |
11 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | * GNU General Public License for more details. |
14 | * |
15 | * You should have received a copy of the GNU General Public License along |
16 | * with this program; if not, write to the Free Software Foundation, Inc., |
17 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
18 | * http://www.gnu.org/copyleft/gpl.html |
19 | * |
20 | * @file |
21 | * @ingroup Cache |
22 | */ |
23 | |
24 | use MediaWiki\MediaWikiServices; |
25 | use Wikimedia\AtEase\AtEase; |
26 | use Wikimedia\Rdbms\Blob; |
27 | use Wikimedia\Rdbms\Database; |
28 | use Wikimedia\Rdbms\DBConnectionError; |
29 | use Wikimedia\Rdbms\DBError; |
30 | use Wikimedia\Rdbms\DBQueryError; |
31 | use Wikimedia\Rdbms\IDatabase; |
32 | use Wikimedia\Rdbms\ILoadBalancer; |
33 | use Wikimedia\Rdbms\IMaintainableDatabase; |
34 | use Wikimedia\Rdbms\SelectQueryBuilder; |
35 | use Wikimedia\ScopedCallback; |
36 | use Wikimedia\Timestamp\ConvertibleTimestamp; |
37 | |
38 | /** |
39 | * RDBMS-based caching module |
40 | * |
41 | * The following database sharding schemes are supported: |
42 | * - None; all keys map to the same shard |
43 | * - Hash; keys map to shards via consistent hashing |
44 | * |
45 | * The following database replication topologies are supported: |
46 | * - A primary database server for each shard, all within one datacenter |
47 | * - A co-primary database server for each shard within each datacenter |
48 | * |
49 | * @ingroup Cache |
50 | */ |
51 | class SqlBagOStuff extends MediumSpecificBagOStuff { |
52 | /** @var callable|null Injected function which returns a LoadBalancer */ |
53 | protected $loadBalancerCallback; |
54 | /** @var ILoadBalancer|null */ |
55 | protected $loadBalancer; |
56 | /** @var string|false|null DB name used for keys using the LoadBalancer */ |
57 | protected $dbDomain; |
58 | /** @var bool Whether to use the LoadBalancer */ |
59 | protected $useLB = false; |
60 | |
61 | /** @var array[] (server index => server config) */ |
62 | protected $serverInfos = []; |
63 | /** @var string[] (server index => tag/host name) */ |
64 | protected $serverTags = []; |
65 | /** @var float UNIX timestamp */ |
66 | protected $lastGarbageCollect = 0; |
67 | /** @var int Average number of writes required to trigger garbage collection */ |
68 | protected $purgePeriod = 10; |
69 | /** @var int Max expired rows to purge during randomized garbage collection */ |
70 | protected $purgeLimit = 100; |
71 | /** @var int Number of table shards to use on each server */ |
72 | protected $numTableShards = 1; |
73 | /** @var int */ |
74 | protected $writeBatchSize = 100; |
75 | /** @var string */ |
76 | protected $tableName = 'objectcache'; |
77 | /** @var bool Whether to use replicas instead of primaries (if using LoadBalancer) */ |
78 | protected $replicaOnly; |
79 | /** @var bool Whether multi-primary mode is enabled */ |
80 | protected $multiPrimaryMode; |
81 | |
82 | /** @var IMaintainableDatabase[] Map of (shard index => DB handle) */ |
83 | protected $conns; |
84 | /** @var float[] Map of (shard index => UNIX timestamps) */ |
85 | protected $connFailureTimes = []; |
86 | /** @var Exception[] Map of (shard index => Exception) */ |
87 | protected $connFailureErrors = []; |
88 | |
89 | /** @var bool Whether zlib methods are available to PHP */ |
90 | private $hasZlib; |
91 | |
92 | /** A number of seconds well above any expected clock skew */ |
93 | private const SAFE_CLOCK_BOUND_SEC = 15; |
94 | /** A number of seconds well above any expected clock skew and replication lag */ |
95 | private const SAFE_PURGE_DELAY_SEC = 3600; |
96 | /** Distinct string for tombstones stored in the "serialized" value column */ |
97 | private const TOMB_SERIAL = ''; |
98 | /** Relative seconds-to-live to use for tombstones */ |
99 | private const TOMB_EXPTIME = -self::SAFE_CLOCK_BOUND_SEC; |
100 | /** How many seconds must pass before triggering a garbage collection */ |
101 | private const GC_DELAY_SEC = 1; |
102 | |
103 | private const BLOB_VALUE = 0; |
104 | private const BLOB_EXPIRY = 1; |
105 | private const BLOB_CASTOKEN = 2; |
106 | |
107 | /** |
108 | * Placeholder timestamp to use for TTL_INDEFINITE that can be stored in all RDBMS types. |
109 | * We use BINARY(14) for MySQL, BLOB for SQLite, and TIMESTAMPTZ for Postgres (which goes |
110 | * up to 294276 AD). The last second of the year 9999 can be stored in all these cases. |
111 | * https://www.postgresql.org/docs/9.0/datatype-datetime.html |
112 | */ |
113 | private const INF_TIMESTAMP_PLACEHOLDER = '99991231235959'; |
114 | |
/**
 * Build a backend instance from the parameters injected by ObjectCache::newFromParams()
 *
 * Exactly one source of database servers is used, checked in this order: the
 * "servers"/"server" parameters (direct server list), then "loadBalancerCallback".
 * If none of them is present, an InvalidArgumentException is thrown.
 *
 * The parameters are as described at {@link \MediaWiki\MainConfigSchema::ObjectCaches}
 * except that:
 *
 *   - the configured "cluster" and main LB fallback modes are implemented by
 *     the wiring by passing "loadBalancerCallback".
 *   - "dbDomain" is required if "loadBalancerCallback" is set, whereas in
 *     config it may be absent.
 *
 * @internal
 *
 * @param array $params
 *   - server: string
 *   - servers: string[]
 *   - loadBalancerCallback: A closure which provides a LoadBalancer object
 *   - dbDomain: string|false
 *   - multiPrimaryMode: bool
 *   - purgePeriod: int|float
 *   - purgeLimit: int
 *   - tableName: string
 *   - shards: int
 *   - replicaOnly: bool
 *   - writeBatchSize: int
 * @throws InvalidArgumentException If no server source is given, or if
 *   "loadBalancerCallback" is given without "dbDomain"
 */
public function __construct( $params ) {
	parent::__construct( $params );

	$hasServerList = isset( $params['servers'] ) || isset( $params['server'] );
	if ( !$hasServerList && !isset( $params['loadBalancerCallback'] ) ) {
		throw new InvalidArgumentException(
			__METHOD__ . " requires 'server', 'servers', or 'loadBalancerCallback'"
		);
	}

	if ( $hasServerList ) {
		// Direct list of servers; object data is horizontally partitioned via key hash
		$serverList = $params['servers'] ?? [ $params['server'] ];
		$position = 0;
		foreach ( $serverList as $tag => $serverInfo ) {
			$this->serverInfos[$position] = $serverInfo;
			// Integer-indexed arrays are allowed for b/c; synthesize a tag for them
			$this->serverTags[$position] = is_string( $tag ) ? $tag : "#$position";
			$position++;
		}
	} else {
		$this->loadBalancerCallback = $params['loadBalancerCallback'];
		if ( !isset( $params['dbDomain'] ) ) {
			throw new InvalidArgumentException(
				__METHOD__ . ": 'dbDomain' is required if 'loadBalancerCallback' is given"
			);
		}
		$this->dbDomain = $params['dbDomain'];
		$this->useLB = true;
	}

	// Optional tuning knobs; the property defaults apply when a parameter is absent
	$this->purgePeriod = (int)( $params['purgePeriod'] ?? $this->purgePeriod );
	$this->purgeLimit = (int)( $params['purgeLimit'] ?? $this->purgeLimit );
	$this->tableName = $params['tableName'] ?? $this->tableName;
	$this->numTableShards = (int)( $params['shards'] ?? $this->numTableShards );
	$this->writeBatchSize = (int)( $params['writeBatchSize'] ?? $this->writeBatchSize );
	$this->replicaOnly = $params['replicaOnly'] ?? false;
	$this->multiPrimaryMode = $params['multiPrimaryMode'] ?? false;

	$this->attrMap[self::ATTR_DURABILITY] = self::QOS_DURABILITY_RDBMS;

	$this->hasZlib = extension_loaded( 'zlib' );
}
184 | |
/**
 * Fetch a single key, optionally also returning its CAS token
 *
 * A CAS token is only requested when the caller passes self::PASS_BY_REF in $casToken;
 * the reference is always reset to null first and only filled in when the stored value
 * could be unserialized.
 *
 * @param string $key
 * @param int $flags
 * @param mixed &$casToken Set to the CAS token when requested and available [returned]
 * @return mixed The unserialized value, or false if the key was not found
 */
protected function doGet( $key, $flags = 0, &$casToken = null ) {
	$wantsToken = ( $casToken === self::PASS_BY_REF );
	$casToken = null;

	$blob = $this->fetchBlobs( [ $key ], $wantsToken )[$key];

	$value = false;
	$size = false;
	if ( $blob ) {
		$serialized = $blob[self::BLOB_VALUE];
		$value = $this->unserialize( $serialized );
		if ( $wantsToken && $value !== false ) {
			$casToken = $blob[self::BLOB_CASTOKEN];
		}
		$size = strlen( $serialized );
	}

	$this->updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, $size ] ] );

	return $value;
}
205 | |
/**
 * Store a value under a key, using the current time as the modification timestamp
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime
 * @param int $flags
 * @return bool Result of modifyBlobs() for this single key
 */
protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
	$now = $this->getCurrentTime();
	$argsByKey = [ $key => [ $value, $exptime ] ];

	return $this->modifyBlobs( [ $this, 'modifyTableSpecificBlobsForSet' ], $now, $argsByKey );
}
215 | |
/**
 * Delete a key, using the current time as the modification timestamp
 *
 * @param string $key
 * @param int $flags
 * @return bool Result of modifyBlobs() for this single key
 */
protected function doDelete( $key, $flags = 0 ) {
	$now = $this->getCurrentTime();
	// The delete callback takes no per-key arguments
	$argsByKey = [ $key => [] ];

	return $this->modifyBlobs( [ $this, 'modifyTableSpecificBlobsForDelete' ], $now, $argsByKey );
}
225 | |
/**
 * Store a value under a key only if the key does not already exist
 *
 * The modification timestamp comes from newLockingWriteSectionModificationTimestamp(),
 * which also hands back a scope variable by reference; that variable is kept alive
 * until this method returns.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime
 * @param int $flags
 * @return bool False if no timestamp could be obtained, else the modifyBlobs() result
 */
protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
	$lockScope = null;
	$mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $lockScope );
	if ( $mtime !== null ) {
		return $this->modifyBlobs(
			[ $this, 'modifyTableSpecificBlobsForAdd' ],
			$mtime,
			[ $key => [ $value, $exptime ] ]
		);
	}

	// Timeout or I/O error during lock acquisition
	return false;
}
239 | |
/**
 * Store a value under a key, passing the given CAS token on to the CAS write callback
 *
 * The modification timestamp comes from newLockingWriteSectionModificationTimestamp(),
 * which also hands back a scope variable by reference; that variable is kept alive
 * until this method returns.
 *
 * @param mixed $casToken
 * @param string $key
 * @param mixed $value
 * @param int $exptime
 * @param int $flags
 * @return bool False if no timestamp could be obtained, else the modifyBlobs() result
 */
protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
	$lockScope = null;
	$mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $lockScope );
	if ( $mtime !== null ) {
		return $this->modifyBlobs(
			[ $this, 'modifyTableSpecificBlobsForCas' ],
			$mtime,
			[ $key => [ $value, $exptime, $casToken ] ]
		);
	}

	// Timeout or I/O error during lock acquisition
	return false;
}
253 | |
/**
 * Change the expiration of a key, using the current time as the modification timestamp
 *
 * @param string $key
 * @param int $exptime
 * @param int $flags
 * @return bool Result of modifyBlobs() for this single key
 */
protected function doChangeTTL( $key, $exptime, $flags ) {
	$now = $this->getCurrentTime();
	$argsByKey = [ $key => [ $exptime ] ];

	return $this->modifyBlobs(
		[ $this, 'modifyTableSpecificBlobsForChangeTTL' ],
		$now,
		$argsByKey
	);
}
263 | |
/**
 * Increment a key by $step, with $init passed along for initialization
 *
 * The asynchronous write callback is used when self::WRITE_BACKGROUND is set in $flags;
 * otherwise the regular one is used.
 *
 * @param string $key
 * @param int $exptime
 * @param int $step
 * @param int $init
 * @param int $flags
 * @return mixed The per-key result reported by the callback, or false if the write failed
 */
protected function doIncrWithInit( $key, $exptime, $step, $init, $flags ) {
	$mtime = $this->getCurrentTime();

	$callback = ( $flags & self::WRITE_BACKGROUND )
		? [ $this, 'modifyTableSpecificBlobsForIncrInitAsync' ]
		: [ $this, 'modifyTableSpecificBlobsForIncrInit' ];

	$resByKey = [];
	$ok = $this->modifyBlobs(
		$callback,
		$mtime,
		[ $key => [ $step, $init, $exptime ] ],
		$resByKey
	);

	return $ok ? $resByKey[$key] : false;
}
282 | |
/**
 * Fetch several keys at once
 *
 * Keys that are missing, or whose stored value unserializes to false, are left out of
 * the result. Size statistics are recorded for every requested key.
 *
 * @param string[] $keys
 * @param int $flags
 * @return array Map of (key => value) for the keys that were found
 */
protected function doGetMulti( array $keys, $flags = 0 ) {
	$blobsByKey = $this->fetchBlobs( $keys );

	$found = [];
	$sizeStatsByKey = [];
	foreach ( $keys as $key ) {
		$blob = $blobsByKey[$key];
		$size = false;
		if ( $blob ) {
			$serialized = $blob[self::BLOB_VALUE];
			$decoded = $this->unserialize( $serialized );
			if ( $decoded !== false ) {
				$found[$key] = $decoded;
			}
			$size = strlen( $serialized );
		}
		$sizeStatsByKey[$key] = [ 0, $size ];
	}

	$this->updateOpStats( self::METRIC_OP_GET, $sizeStatsByKey );

	return $found;
}
307 | |
/**
 * Store several key/value pairs at once, all with the same expiration
 *
 * @param array $data Map of (key => value)
 * @param int $exptime
 * @param int $flags
 * @return bool Result of modifyBlobs() for the whole batch
 */
protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
	$mtime = $this->getCurrentTime();

	// Pair every value with the shared expiration, preserving the keys
	$argsByKey = [];
	foreach ( $data as $key => $value ) {
		$argsByKey[$key] = [ $value, $exptime ];
	}

	return $this->modifyBlobs( [ $this, 'modifyTableSpecificBlobsForSet' ], $mtime, $argsByKey );
}
322 | |
/**
 * Delete several keys at once
 *
 * @param string[] $keys
 * @param int $flags
 * @return bool Result of modifyBlobs() for the whole batch
 */
protected function doDeleteMulti( array $keys, $flags = 0 ) {
	$now = $this->getCurrentTime();
	// The delete callback takes no per-key arguments
	$argsByKey = array_fill_keys( $keys, [] );

	return $this->modifyBlobs( [ $this, 'modifyTableSpecificBlobsForDelete' ], $now, $argsByKey );
}
332 | |
/**
 * Change the expiration of several keys at once
 *
 * @param string[] $keys
 * @param int $exptime
 * @param int $flags
 * @return bool Result of modifyBlobs() for the whole batch
 */
public function doChangeTTLMulti( array $keys, $exptime, $flags = 0 ) {
	$now = $this->getCurrentTime();
	$argsByKey = array_fill_keys( $keys, [ $exptime ] );

	return $this->modifyBlobs(
		[ $this, 'modifyTableSpecificBlobsForChangeTTL' ],
		$now,
		$argsByKey
	);
}
342 | |
/**
 * Get a connection to the database server for the given shard
 *
 * In LoadBalancer mode the shard index is ignored. Otherwise, if a connection failure
 * was recorded for the shard less than 60 seconds ago, the recorded exception is
 * re-thrown instead of attempting another connection.
 *
 * @param int $shardIndex Server index
 * @return IMaintainableDatabase
 * @throws DBConnectionError
 * @throws UnexpectedValueException If there is no server configured at that index
 */
private function getConnection( $shardIndex ) {
	if ( $this->useLB ) {
		return $this->getConnectionViaLoadBalancer();
	}

	// Don't keep timing out trying to connect if the server is down
	if ( isset( $this->connFailureErrors[$shardIndex] ) ) {
		$secondsSinceFailure = $this->getCurrentTime() - $this->connFailureTimes[$shardIndex];
		if ( $secondsSinceFailure < 60 ) {
			throw $this->connFailureErrors[$shardIndex];
		}
	}

	if ( !isset( $this->serverInfos[$shardIndex] ) ) {
		throw new UnexpectedValueException( "Invalid server index #$shardIndex" );
	}

	return $this->getConnectionFromServerInfo( $shardIndex, $this->serverInfos[$shardIndex] );
}
373 | |
/**
 * Get the server index and table name for a given key
 *
 * The server index is 0 in LoadBalancer mode or when exactly one server is configured;
 * otherwise it is the first entry after ArrayUtils::consistentHashSort() has ordered
 * the server tags for the key. The table is picked from the first 8 hex digits of the
 * key's MD5 when more than one table shard is configured.
 *
 * @param string $key
 * @return array (server index, table name)
 */
private function getKeyLocation( $key ) {
	// LoadBalancer mode and single-server setups always use index 0
	$shardIndex = 0;
	if ( !$this->useLB && count( $this->serverTags ) != 1 ) {
		// Striped array of database servers
		$rankedTags = $this->serverTags;
		ArrayUtils::consistentHashSort( $rankedTags, $key );
		$shardIndex = array_key_first( $rankedTags );
	}

	$tableIndex = null;
	if ( $this->numTableShards > 1 ) {
		$keyHash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
		$tableIndex = $keyHash % $this->numTableShards;
	}

	return [ $shardIndex, $this->getTableNameByShard( $tableIndex ) ];
}
403 | |
/**
 * Get the table name for a given table shard index
 *
 * With a null index, or when only one table shard is configured, this is the bare table
 * name. Otherwise the index is appended, zero-padded to the number of digits in the
 * highest shard index.
 *
 * @param int|null $index
 * @return string
 */
private function getTableNameByShard( $index ) {
	if ( $index === null || $this->numTableShards <= 1 ) {
		return $this->tableName;
	}

	// Pad width is the digit count of the highest shard index
	$decimals = strlen( (string)( $this->numTableShards - 1 ) );
	$suffix = sprintf( "%0{$decimals}d", $index );

	return $this->tableName . $suffix;
}
418 | |
/**
 * Fetch the stored blobs for a list of keys
 *
 * Keys are grouped by server and partition table, and one SELECT is issued per table.
 * Each row found is then decoded into a (value, expiry, CAS token) tuple.
 *
 * Database errors are passed to handleDBError(). Any key whose row could not be
 * fetched or decoded is reported as null, so every entry of the result is either a
 * decoded tuple or null and never a raw result row.
 *
 * @param string[] $keys
 * @param bool $getCasToken Whether to get a CAS token
 * @return array<string,array|null> Order-preserved map of (key => (value,expiry,token) or null)
 */
private function fetchBlobs( array $keys, bool $getCasToken = false ) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	// Initialize order-preserved per-key results; set values for live keys below
	$dataByKey = array_fill_keys( $keys, null );

	$readTime = (int)$this->getCurrentTime();
	$keysByTableByShard = [];
	foreach ( $keys as $key ) {
		[ $shardIndex, $partitionTable ] = $this->getKeyLocation( $key );
		$keysByTableByShard[$shardIndex][$partitionTable][] = $key;
	}

	// Raw rows are kept apart from the results so that an undecoded row can never
	// leak into the returned map
	$rowsByKey = [];
	foreach ( $keysByTableByShard as $shardIndex => $serverKeys ) {
		try {
			$db = $this->getConnection( $shardIndex );
			foreach ( $serverKeys as $partitionTable => $tableKeys ) {
				$res = $db->newSelectQueryBuilder()
					->select(
						$getCasToken
							? $this->addCasTokenFields( $db, [ 'keyname', 'value', 'exptime' ] )
							: [ 'keyname', 'value', 'exptime' ] )
					->from( $partitionTable )
					->where( $this->buildExistenceConditions( $db, $tableKeys, $readTime ) )
					->caller( __METHOD__ )
					->fetchResultSet();
				foreach ( $res as $row ) {
					$row->shardIndex = $shardIndex;
					$row->tableName = $partitionTable;
					$rowsByKey[$row->keyname] = $row;
				}
			}
		} catch ( DBError $e ) {
			$this->handleDBError( $e, $shardIndex );
		}
	}

	foreach ( $keys as $key ) {
		$row = $rowsByKey[$key] ?? null;
		if ( !$row ) {
			continue;
		}

		$this->debug( __METHOD__ . ": retrieved $key; expiry time is {$row->exptime}" );
		try {
			$db = $this->getConnection( $row->shardIndex );
			$dataByKey[$key] = [
				self::BLOB_VALUE => $this->dbDecodeSerialValue( $db, $row->value ),
				self::BLOB_EXPIRY => $this->decodeDbExpiry( $db, $row->exptime ),
				self::BLOB_CASTOKEN => $getCasToken
					? $this->getCasTokenFromRow( $db, $row )
					: null
			];
		} catch ( DBError $e ) {
			// getConnection() may throw DBConnectionError, which is not a DBQueryError,
			// so catch the DBError base class like the fetch loop does. The entry stays
			// null, i.e. the key is treated as not found.
			$this->handleDBError( $e, $row->shardIndex );
		}
	}

	return $dataByKey;
}
485 | |
/**
 * Apply a write callback to a set of keys, grouped by server and partition table
 *
 * Database errors raised while connecting or writing are passed to handleDBError() and
 * the remaining partitions are still processed. Afterwards, each server entry recorded
 * during the write phase may trigger garbageCollect(), subject to the purge period and
 * the minimum delay since the last garbage collection.
 *
 * @param callable $tableWriteCallback Callback that takes the following arguments:
 *  - IDatabase instance
 *  - Partition table name string
 *  - UNIX modification timestamp
 *  - Map of (key => list of arguments) for keys belonging to the server/table partition
 *  - Map of (key => result) [returned]
 * @param float $mtime UNIX modification timestamp
 * @param array<string,array> $argsByKey Map of (key => list of arguments)
 * @param array<string,mixed> &$resByKey Order-preserved map of (key => result) [returned]
 * @return bool Whether all keys were processed
 * @param-taint $argsByKey none
 */
private function modifyBlobs(
	callable $tableWriteCallback,
	float $mtime,
	array $argsByKey,
	&$resByKey = []
) {
	// Every key starts out as failed; callbacks mark successful results
	$resByKey = array_fill_keys( array_keys( $argsByKey ), false );

	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	// Bucket the arguments by server, then by partition table
	$partitions = [];
	foreach ( $argsByKey as $key => $args ) {
		[ $shardIndex, $partitionTable ] = $this->getKeyLocation( $key );
		$partitions[$shardIndex][$partitionTable][$key] = $args;
	}

	$touchedShardIndexes = [];
	foreach ( $partitions as $shardIndex => $argsByKeyByTable ) {
		foreach ( $argsByKeyByTable as $table => $tableArgsByKey ) {
			try {
				$db = $this->getConnection( $shardIndex );
				$touchedShardIndexes[] = $shardIndex;
				$tableWriteCallback( $db, $table, $mtime, $tableArgsByKey, $resByKey );
			} catch ( DBError $e ) {
				$this->handleDBError( $e, $shardIndex );
			}
		}
	}

	$allProcessed = !in_array( false, $resByKey, true );

	foreach ( $touchedShardIndexes as $shardIndex ) {
		try {
			// Purge only if random purging is enabled, on one in every
			// $this->purgePeriod writes, and not within a few seconds of the last purge
			$shouldPurge = $this->purgePeriod &&
				mt_rand( 0, $this->purgePeriod - 1 ) == 0 &&
				( $this->getCurrentTime() - $this->lastGarbageCollect ) > self::GC_DELAY_SEC;
			if ( $shouldPurge ) {
				$this->garbageCollect( $shardIndex );
			}
		} catch ( DBError $e ) {
			$this->handleDBError( $e, $shardIndex );
		}
	}

	return $allProcessed;
}
552 | |
/**
 * Set key/value pairs belonging to a partition table on the given server
 *
 * In multi-primary mode, if the current row for a key exists and has a modification token
 * with a greater integral UNIX timestamp than that of the provided modification timestamp,
 * then the write to that key will be aborted with a "false" result. Successfully modified
 * key rows will be assigned a new modification token using the provided timestamp.
 *
 * Outside multi-primary mode the rows are written with a REPLACE query.
 *
 * @param IDatabase $db Handle to the database server where the argument keys belong
 * @param string $ptable Name of the partition table where the argument keys belong
 * @param float $mtime UNIX modification timestamp
 * @param array<string,array> $argsByKey Non-empty (key => (value,exptime)) map
 * @param array<string,mixed> &$resByKey Map of (key => result) for successful writes [returned]
 * @throws DBError
 */
private function modifyTableSpecificBlobsForSet(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	$modToken = $this->makeTimestampedModificationToken( $mtime, $db );

	$upsertRows = [];
	$sizeStatsByKey = [];
	foreach ( $argsByKey as $key => [ $value, $exptime ] ) {
		$expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
		$serialized = $this->getSerialized( $value, $key );
		$upsertRows[] = $this->buildUpsertRow( $db, $key, $serialized, $expiry, $modToken );

		$sizeStatsByKey[$key] = [ strlen( $serialized ), 0 ];
	}

	if ( !$this->multiPrimaryMode ) {
		// T288998: use REPLACE, if possible, to avoid cluttering the binlogs
		$db->newReplaceQueryBuilder()
			->replaceInto( $ptable )
			->rows( $upsertRows )
			->uniqueIndexFields( [ 'keyname' ] )
			->caller( __METHOD__ )
			->execute();
	} else {
		$db->newInsertQueryBuilder()
			->insertInto( $ptable )
			->rows( $upsertRows )
			->onDuplicateKeyUpdate()
			->uniqueIndexFields( [ 'keyname' ] )
			->set( $this->buildMultiUpsertSetForOverwrite( $db, $modToken ) )
			->caller( __METHOD__ )
			->execute();
	}

	// The query did not throw, so mark every key of this partition as written
	foreach ( array_keys( $argsByKey ) as $key ) {
		$resByKey[$key] = true;
	}

	$this->updateOpStats( self::METRIC_OP_SET, $sizeStatsByKey );
}
611 | |
/**
 * Purge/tombstone key/value pairs belonging to a partition table on the given server
 *
 * In multi-primary mode, if the current row for a key exists and has a modification token
 * with a greater integral UNIX timestamp than that of the provided modification timestamp,
 * then the write to that key will be aborted with a "false" result. Successfully modified
 * key rows will be assigned a new modification token/timestamp, an empty value, and an
 * expiration timestamp dated slightly before the new modification timestamp.
 *
 * Outside multi-primary mode the rows are simply deleted.
 *
 * @param IDatabase $db Handle to the database server where the argument keys belong
 * @param string $ptable Name of the partition table where the argument keys belong
 * @param float $mtime UNIX modification timestamp
 * @param array<string,array> $argsByKey Non-empty (key => []) map
 * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
 * @throws DBError
 */
private function modifyTableSpecificBlobsForDelete(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	$keys = array_keys( $argsByKey );

	if ( !$this->multiPrimaryMode ) {
		// Just purge the keys since there is only one primary (e.g. "source of truth")
		$db->newDeleteQueryBuilder()
			->deleteFrom( $ptable )
			->where( [ 'keyname' => $keys ] )
			->caller( __METHOD__ )
			->execute();
	} else {
		// Tombstone keys in order to respect eventual consistency
		$modToken = $this->makeTimestampedModificationToken( $mtime, $db );
		$tombExpiry = $this->makeNewKeyExpiry( self::TOMB_EXPTIME, (int)$mtime );
		$insert = $db->newInsertQueryBuilder()
			->insertInto( $ptable )
			->onDuplicateKeyUpdate()
			->uniqueIndexFields( [ 'keyname' ] )
			->set( $this->buildMultiUpsertSetForOverwrite( $db, $modToken ) );
		foreach ( $argsByKey as $key => $unused ) {
			$insert->row(
				$this->buildUpsertRow( $db, $key, self::TOMB_SERIAL, $tombExpiry, $modToken )
			);
		}
		$insert->caller( __METHOD__ )->execute();
	}

	// The query did not throw, so mark every key of this partition as handled
	foreach ( $argsByKey as $key => $unused ) {
		$resByKey[$key] = true;
	}

	$this->updateOpStats( self::METRIC_OP_DELETE, $keys );
}
662 | |
/**
 * Insert key/value pairs belonging to a partition table on the given server
 *
 * If the current row for a key exists and has an integral UNIX timestamp of expiration
 * greater than that of the provided modification timestamp, then the write to that key
 * will be aborted with a "false" result. Acquisition of advisory key locks must be handled
 * by calling functions.
 *
 * In multi-primary mode, if the current row for a key exists and has a modification token
 * with a greater integral UNIX timestamp than that of the provided modification timestamp,
 * then the write to that key will be aborted with a "false" result. Successfully modified
 * key rows will be assigned a new modification token/timestamp.
 *
 * If every key already exists, no write query is issued and $resByKey is left untouched.
 *
 * @param IDatabase $db Handle to the database server where the argument keys belong
 * @param string $ptable Name of the partition table where the argument keys belong
 * @param float $mtime UNIX modification timestamp
 * @param array<string,array> $argsByKey Non-empty (key => (value,exptime)) map
 * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
 * @throws DBError
 */
private function modifyTableSpecificBlobsForAdd(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	$modToken = $this->makeTimestampedModificationToken( $mtime, $db );

	// This check must happen outside the write query to respect eventual consistency
	$liveKeys = $db->newSelectQueryBuilder()
		->select( 'keyname' )
		->from( $ptable )
		->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) )
		->caller( __METHOD__ )
		->fetchFieldValues();
	$isLiveByKey = array_fill_keys( $liveKeys, true );

	$newRows = [];
	$sizeStatsByKey = [];
	foreach ( $argsByKey as $key => [ $value, $exptime ] ) {
		if ( isset( $isLiveByKey[$key] ) ) {
			$this->logger->debug( __METHOD__ . ": $key already exists" );
			continue;
		}

		$serialized = $this->getSerialized( $value, $key );
		$expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
		$sizeStatsByKey[$key] = [ strlen( $serialized ), 0 ];
		$newRows[] = $this->buildUpsertRow( $db, $key, $serialized, $expiry, $modToken );
	}

	// Nothing to insert: all keys are already present
	if ( $newRows === [] ) {
		return;
	}

	$db->newInsertQueryBuilder()
		->insertInto( $ptable )
		->rows( $newRows )
		->onDuplicateKeyUpdate()
		->uniqueIndexFields( [ 'keyname' ] )
		->set( $this->buildMultiUpsertSetForOverwrite( $db, $modToken ) )
		->caller( __METHOD__ )
		->execute();

	// A key succeeded exactly when it was not already live
	foreach ( array_keys( $argsByKey ) as $key ) {
		$resByKey[$key] = !isset( $isLiveByKey[$key] );
	}

	$this->updateOpStats( self::METRIC_OP_ADD, $sizeStatsByKey );
}
732 | |
/**
 * Overwrite key/value pairs belonging to a partition table on the given server,
 * provided that the caller-supplied CAS tokens still match the stored rows
 *
 * A key is only written if a live row exists for it (expiration at or after the integral
 * UNIX timestamp of the provided modification timestamp) and the CAS token computed from
 * that row is identical to the one supplied for the key. Otherwise, the key is skipped and
 * its result is "false". Acquisition of advisory key locks must be handled by calling
 * functions.
 *
 * In multi-primary mode, the upsert is additionally qualified by the modification token
 * comparison built in buildMultiUpsertSetForOverwrite(), and written rows are assigned
 * the new modification token.
 *
 * @param IDatabase $db Handle to the database server where the argument keys belong
 * @param string $ptable Name of the partition table where the argument keys belong
 * @param float $mtime UNIX modification timestamp
 * @param array<string,array> $argsByKey Non-empty (key => (value, exptime, CAS token)) map
 * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
 * @throws DBError
 */
private function modifyTableSpecificBlobsForCas(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	$valueSizesByKey = [];

	$mt = $this->makeTimestampedModificationToken( $mtime, $db );

	// This check must happen outside the write query to respect eventual consistency
	$res = $db->newSelectQueryBuilder()
		->select( $this->addCasTokenFields( $db, [ 'keyname' ] ) )
		->from( $ptable )
		->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) )
		->caller( __METHOD__ )
		->fetchResultSet();

	// Map of (key => current CAS token) for the keys that have a live row
	$curTokensByKey = [];
	foreach ( $res as $row ) {
		$curTokensByKey[$row->keyname] = $this->getCasTokenFromRow( $db, $row );
	}

	$nonMatchingByKey = [];
	$rows = [];
	foreach ( $argsByKey as $key => [ $value, $exptime, $casToken ] ) {
		$curToken = $curTokensByKey[$key] ?? null;
		if ( $curToken === null ) {
			// No live row, so there is nothing to compare-and-swap against
			$nonMatchingByKey[$key] = true;
			$this->logger->debug( __METHOD__ . ": $key does not exist" );
			continue;
		}

		if ( $curToken !== $casToken ) {
			// The row changed since the caller fetched its token
			$nonMatchingByKey[$key] = true;
			$this->logger->debug( __METHOD__ . ": $key does not have a matching token" );
			continue;
		}

		$serialValue = $this->getSerialized( $value, $key );
		$expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
		// Track (bytes written, bytes read) per key for the operation stats
		$valueSizesByKey[$key] = [ strlen( $serialValue ), 0 ];

		$rows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt );
	}
	if ( !$rows ) {
		// No key qualified for the write; $resByKey is left untouched
		return;
	}
	$db->newInsertQueryBuilder()
		->insertInto( $ptable )
		->rows( $rows )
		->onDuplicateKeyUpdate()
		->uniqueIndexFields( [ 'keyname' ] )
		->set( $this->buildMultiUpsertSetForOverwrite( $db, $mt ) )
		->caller( __METHOD__ )->execute();

	// A key succeeded if and only if it was not rejected above
	foreach ( $argsByKey as $key => $unused ) {
		$resByKey[$key] = !isset( $nonMatchingByKey[$key] );
	}

	$this->updateOpStats( self::METRIC_OP_CAS, $valueSizesByKey );
}
816 | |
/**
 * Change the expiration of keys belonging to a partition table on the given server
 *
 * A key without a current row, or whose current row has an integral UNIX timestamp of
 * expiration less than that of the provided modification timestamp, is not written to
 * and gets a "false" result.
 *
 * In multi-primary mode, if the current row for a key exists and has a modification token
 * with a greater integral UNIX timestamp than that of the provided modification timestamp,
 * then the write to that key will be aborted with a "false" result. Successfully modified
 * key rows will be assigned a new modification token/timestamp.
 *
 * @param IDatabase $db Handle to the database server where the argument keys belong
 * @param string $ptable Name of the partition table where the argument keys belong
 * @param float $mtime UNIX modification timestamp
 * @param array<string,array> $argsByKey Non-empty (key => (exptime)) map
 * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
 * @throws DBError
 */
private function modifyTableSpecificBlobsForChangeTTL(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	$nowUnix = (int)$mtime;

	if ( !$this->multiPrimaryMode ) {
		// Group the keys by their new absolute expiry, so that each distinct expiry
		// value needs only one UPDATE query
		$keysByExpiry = [];
		foreach ( $argsByKey as $key => [ $exptime ] ) {
			$keysByExpiry[$this->makeNewKeyExpiry( $exptime, $nowUnix )][] = $key;
		}

		$updatedCount = 0;
		foreach ( $keysByExpiry as $expiry => $keys ) {
			$db->newUpdateQueryBuilder()
				->update( $ptable )
				->set( [ 'exptime' => $this->encodeDbExpiry( $db, $expiry ) ] )
				->where( $this->buildExistenceConditions( $db, $keys, $nowUnix ) )
				->caller( __METHOD__ )->execute();
			$updatedCount += $db->affectedRows();
		}

		// Results are all-or-nothing: keys are only flagged as successful when the
		// total number of affected rows equals the number of keys
		if ( $updatedCount === count( $argsByKey ) ) {
			foreach ( array_keys( $argsByKey ) as $key ) {
				$resByKey[$key] = true;
			}
		}
	} else {
		$modToken = $this->makeTimestampedModificationToken( $mtime, $db );

		// Fetch the live rows, since their values must be re-written with the new expiry
		$liveRows = $db->newSelectQueryBuilder()
			->select( [ 'keyname', 'value' ] )
			->from( $ptable )
			->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), $nowUnix ) )
			->caller( __METHOD__ )
			->fetchResultSet();

		$upsertRows = [];
		$isLiveByKey = [];
		foreach ( $liveRows as $liveRow ) {
			$liveKey = $liveRow->keyname;
			$isLiveByKey[$liveKey] = true;
			$blob = $this->dbDecodeSerialValue( $db, $liveRow->value );
			$expiry = $this->makeNewKeyExpiry( $argsByKey[$liveKey][0], $nowUnix );
			$upsertRows[] = $this->buildUpsertRow( $db, $liveKey, $blob, $expiry, $modToken );
		}
		if ( $upsertRows === [] ) {
			// No live rows to touch; $resByKey is left untouched
			return;
		}

		$db->newInsertQueryBuilder()
			->insertInto( $ptable )
			->rows( $upsertRows )
			->onDuplicateKeyUpdate()
			->uniqueIndexFields( [ 'keyname' ] )
			->set( $this->buildMultiUpsertSetForOverwrite( $db, $modToken ) )
			->caller( __METHOD__ )->execute();

		// A key succeeded if and only if it had a live row
		foreach ( array_keys( $argsByKey ) as $key ) {
			$resByKey[$key] = isset( $isLiveByKey[$key] );
		}
	}

	$this->updateOpStats( self::METRIC_OP_CHANGE_TTL, array_keys( $argsByKey ) );
}
902 | |
/**
 * Increment each counter key that exists, or initialize it otherwise
 *
 * If no current row for a key exists or the current row has an integral UNIX timestamp of
 * expiration less than that of the provided modification timestamp, then the key row will
 * be set to the initial value. Otherwise, the current row will be incremented.
 *
 * In multi-primary mode, if the current row for a key exists and has a modification token
 * with a greater integral UNIX timestamp than that of the provided modification timestamp,
 * then the write to that key will be aborted with a "false" result. Successfully initialized
 * key rows will be assigned a new modification token/timestamp.
 *
 * @param IDatabase $db Handle to the database server where the argument keys belong
 * @param string $ptable Name of the partition table where the argument keys belong
 * @param float $mtime UNIX modification timestamp
 * @param array<string,array> $argsByKey Non-empty (key => (step, init, exptime)) map
 * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
 * @throws DBError
 */
private function modifyTableSpecificBlobsForIncrInit(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	$nowUnix = (int)$mtime;

	foreach ( $argsByKey as $key => [ $step, $init, $exptime ] ) {
		$modToken = $this->makeTimestampedModificationToken( $mtime, $db );
		$expiry = $this->makeNewKeyExpiry( $exptime, $nowUnix );

		// Wrap the write and the read-back in a transaction so that, thanks to
		// "consistent reads", changes from other threads are not visible and the exact
		// post-increment value can be returned. Unlike the other write methods, the
		// "live key exists" check can be part of the write query and remain safe for
		// replication, since the TTL for such keys is either indefinite or very short.
		$section = $db->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
		try {
			$db->newInsertQueryBuilder()
				->insertInto( $ptable )
				->rows( $this->buildUpsertRow( $db, $key, $init, $expiry, $modToken ) )
				->onDuplicateKeyUpdate()
				->uniqueIndexFields( [ 'keyname' ] )
				->set( $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $modToken, $nowUnix ) )
				->caller( __METHOD__ )->execute();
			$writeCount = $db->affectedRows();
			$curRow = $db->newSelectQueryBuilder()
				->select( 'value' )
				->from( $ptable )
				->where( [ 'keyname' => $key ] )
				->caller( __METHOD__ )
				->fetchRow();
		} catch ( Exception $e ) {
			// Roll back this key's section before propagating the error
			$db->cancelAtomic( __METHOD__, $section );
			throw $e;
		}
		$db->endAtomic( __METHOD__ );

		if ( $writeCount && $curRow !== false ) {
			$newValue = $this->dbDecodeSerialValue( $db, $curRow->value );
			if ( $this->isInteger( $newValue ) ) {
				$resByKey[$key] = (int)$newValue;
			} else {
				$this->logger->warning( __METHOD__ . ": got non-integer $key value" );
			}
		} else {
			$this->logger->warning( __METHOD__ . ": failed to set new $key value" );
		}
	}

	$this->updateOpStats( self::METRIC_OP_INCR, array_keys( $argsByKey ) );
}
975 | |
/**
 * Variant of modifyTableSpecificBlobsForIncrInit() that does not read back the new
 * counter value; the per-key result is merely "true" when the write affected a row.
 *
 * @param IDatabase $db Handle to the database server where the argument keys belong
 * @param string $ptable Name of the partition table where the argument keys belong
 * @param float $mtime UNIX modification timestamp
 * @param array<string,array> $argsByKey Map of (key => (step, init, exptime))
 * @param array<string,mixed> &$resByKey Map of (key => result) [returned]
 * @throws DBError
 */
private function modifyTableSpecificBlobsForIncrInitAsync(
	IDatabase $db,
	string $ptable,
	float $mtime,
	array $argsByKey,
	array &$resByKey
) {
	$nowUnix = (int)$mtime;

	foreach ( $argsByKey as $key => [ $step, $init, $exptime ] ) {
		$modToken = $this->makeTimestampedModificationToken( $mtime, $db );
		$expiry = $this->makeNewKeyExpiry( $exptime, $nowUnix );

		// Insert the initial value, or increment/re-initialize the row if one exists
		$db->newInsertQueryBuilder()
			->insertInto( $ptable )
			->rows( $this->buildUpsertRow( $db, $key, $init, $expiry, $modToken ) )
			->onDuplicateKeyUpdate()
			->uniqueIndexFields( [ 'keyname' ] )
			->set( $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $modToken, $nowUnix ) )
			->caller( __METHOD__ )->execute();

		if ( $db->affectedRows() ) {
			$resByKey[$key] = true;
		} else {
			$this->logger->warning( __METHOD__ . ": failed to set new $key value" );
		}
	}
}
1011 | |
/**
 * Get the expiration to store for a key that is about to be written
 *
 * @param int $exptime Relative or absolute expiration
 * @param int $nowTsUnix Current UNIX timestamp
 * @return int UNIX timestamp or TTL_INDEFINITE
 */
private function makeNewKeyExpiry( $exptime, int $nowTsUnix ) {
	$expiry = $this->getExpirationAsTimestamp( $exptime );
	if ( $expiry === self::TTL_INDEFINITE ) {
		return $expiry;
	}

	// Eventual consistency requires that recently modified keys be preserved. Clamp
	// the expiry from below so that no row gets an `exptime` so low that it might be
	// garbage collected before it has been replicated.
	return max( $expiry, $nowTsUnix - self::SAFE_CLOCK_BOUND_SEC );
}
1028 | |
/**
 * Acquire a scoped lock on a key and get the modification timestamp to use for the
 * critical section of reads/writes guarded by that lock
 *
 * This is used instead of BagOStuff::getCurrentTime() for certain writes (such as "add",
 * "incr", and "cas"), for which we want to support tight race conditions where the same
 * key is repeatedly written to by multiple web servers that each get to see the previous
 * value, act on it, and modify it in some way.
 *
 * It is assumed that this method is normally only invoked from the primary datacenter.
 * A lock is acquired on the primary server of the local datacenter in order to avoid race
 * conditions within the critical section. The clock on the SQL server is used to get the
 * modification timestamp in order to minimize issues with clock drift between web servers;
 * thus key writes will not be rejected due to some web servers having lagged clocks.
 *
 * @param string $key
 * @param ?ScopedCallback &$scope Unlocker callback; null on failure [returned]
 * @return float|null UNIX timestamp with 6 decimal places; null on failure
 */
private function newLockingWriteSectionModificationTimestamp( $key, &$scope ) {
	// Non-blocking attempt (zero timeout)
	if ( $this->lock( $key, 0 ) ) {
		// The lock is released once the caller's reference to this object goes away
		$scope = new ScopedCallback( function () use ( $key ) {
			$this->unlock( $key );
		} );

		$lockTime = $this->locks[$key][self::LOCK_TIME];

		// Round-trip through sprintf to limit the precision to 6 decimal places
		return (float)sprintf( '%.6F', $lockTime );
	}

	return null;
}
1059 | |
/**
 * Build a `modtoken` column value that encodes the time and source database server of a write
 *
 * @param float $mtime UNIX modification timestamp
 * @param IDatabase $db Handle to the primary database server sourcing the write
 * @return string String of the form "<SECONDS_SOURCE><MICROSECONDS>", where SECONDS_SOURCE
 *  is "<35 bit seconds portion of UNIX time><32 bit database server ID>" as 13 base 36 chars,
 *  and MICROSECONDS is "<20 bit microseconds portion of UNIX time>" as 4 base 36 chars
 */
private function makeTimestampedModificationToken( float $mtime, IDatabase $db ) {
	// Space is reserved for up to 6 digits in the microsecond portion of the token.
	// This is for future use only (maybe CAS tokens) and not currently used.
	// It is currently populated by the microsecond portion returned by microtime,
	// which generally has fewer than 6 digits of meaningful precision but can still be useful
	// in debugging (to see the token continuously change even during rapid testing).
	$wholeSeconds = (int)$mtime;
	$microDigits = explode( '.', sprintf( '%.6F', $mtime ) )[1];

	// Fall back to a CRC32 of the server name when no topology-based ID is available
	$serverId = $db->getTopologyBasedServerId() ?? sprintf( '%u', crc32( $db->getServerName() ) );

	// 35 bit integral seconds portion of UNIX timestamp
	$secondsBits = str_pad( base_convert( (string)$wholeSeconds, 10, 2 ), 35, '0', STR_PAD_LEFT );
	// 32 bit ID of the primary database server handling the write
	$serverBits = str_pad( base_convert( $serverId, 10, 2 ), 32, '0', STR_PAD_LEFT );
	// 67 bit integral portion of UNIX timestamp, qualified; 13 base 36 chars
	$secondsAndSource = \Wikimedia\base_convert( $secondsBits . $serverBits, 2, 36, 13 );
	// 20 bit fractional portion of UNIX timestamp, as integral microseconds; 4 base 36 chars
	$microPart = str_pad( base_convert( $microDigits, 10, 36 ), 4, '0', STR_PAD_LEFT );

	$token = $secondsAndSource . $microPart;
	if ( strlen( $token ) !== 17 ) {
		throw new RuntimeException( "Modification timestamp overflow detected" );
	}

	return $token;
}
1101 | |
/**
 * Build the WHERE conditions that match rows of the given keys which are still live
 *
 * @param IDatabase $db
 * @param string[]|string $keys
 * @param int $time UNIX modification timestamp
 * @return array
 */
private function buildExistenceConditions( IDatabase $db, $keys, int $time ) {
	// Tombstones always have past expiration dates, so this also excludes them
	$notExpired = $db->expr( 'exptime', '>=', $db->timestamp( $time ) );

	return [ 'keyname' => $keys, $notExpired ];
}
1117 | |
/**
 * Build the INSERT row used for key writes/overwrites when no live nor stale key exists
 *
 * @param IDatabase $db
 * @param string $key
 * @param string|int $serialValue New value
 * @param int $expiry Expiration timestamp or TTL_INDEFINITE
 * @param string $mt Modification token
 * @return array
 */
private function buildUpsertRow(
	IDatabase $db,
	$key,
	$serialValue,
	int $expiry,
	string $mt
) {
	$fields = [
		'keyname' => $key,
		'value' => $this->dbEncodeSerialValue( $db, $serialValue ),
		'exptime' => $this->encodeDbExpiry( $db, $expiry )
	];

	// The modification token column is only written in multi-primary mode
	return $this->multiPrimaryMode
		? $fields + [ 'modtoken' => $mt ]
		: $fields;
}
1146 | |
/**
 * Build the SET array used for key overwrites when a live or stale key exists
 *
 * @param IDatabase $db
 * @param string $mt Modification token
 * @return array List of raw "column=expression" SQL assignments
 */
private function buildMultiUpsertSetForOverwrite( IDatabase $db, string $mt ) {
	// Map of (column => SQL expression for the value from the attempted insert)
	$newValueSqlByColumn = [
		'value' => $db->buildExcludedValue( 'value' ),
		'exptime' => $db->buildExcludedValue( 'exptime' )
	];

	$assignments = [];
	if ( !$this->multiPrimaryMode ) {
		// Unconditional overwrite
		foreach ( $newValueSqlByColumn as $column => $newValueSql ) {
			$assignments[] = "{$column}={$newValueSql}";
		}

		return $assignments;
	}

	// The query might take a while to replicate, during which newer values might get
	// written. Qualify the query so that it does not override such values. Note that
	// duplicate tokens generated around the same time for a key should still result
	// in convergence given the use of server_id in modtoken (providing a total order
	// among primary DB servers) and MySQL binlog ordering (providing a total order
	// for writes replicating from a given primary DB server).
	$newValueSqlByColumn['modtoken'] = $db->addQuotes( $mt );
	// Compare only the first 13 characters (seconds + server ID) of the tokens
	$isNotOlderSql = $db->addQuotes( substr( $mt, 0, 13 ) ) . ' >= ' .
		$db->buildSubString( 'modtoken', 1, 13 );
	foreach ( $newValueSqlByColumn as $column => $newValueSql ) {
		// Keep the current column value if the row has a newer token
		$rhs = $db->conditional( $isNotOlderSql, $newValueSql, $column );
		$assignments[] = "{$column}=" . trim( $rhs );
	}

	return $assignments;
}
1186 | |
/**
 * Build the SET array used for counter upserts when a live or stale key exists
 *
 * Each column is assigned via a conditional: rows that are not expired as of the
 * modification timestamp get the "increment" expression, all others get the
 * "initialize" expression.
 *
 * @param IDatabase $db
 * @param int $step Positive counter incrementation value
 * @param int $init Positive initial counter value
 * @param int $expiry Expiration timestamp or TTL_INDEFINITE
 * @param string $mt Modification token
 * @param int $mtUnixTs UNIX timestamp of modification token
 * @return array List of raw "column=expression" SQL assignments
 */
private function buildIncrUpsertSet(
	IDatabase $db,
	int $step,
	int $init,
	int $expiry,
	string $mt,
	int $mtUnixTs
) {
	// Map of (column => (SQL for non-expired key rows, SQL for expired key rows))
	$sqlPairsByColumn = [
		'value' => [
			$db->buildIntegerCast( 'value' ) . ' + ' . $db->addQuotes( $step ),
			$db->addQuotes( $this->dbEncodeSerialValue( $db, $init ) )
		],
		'exptime' => [
			// A live counter keeps its current expiration
			'exptime',
			$db->addQuotes( $this->encodeDbExpiry( $db, $expiry ) )
		]
	];
	if ( $this->multiPrimaryMode ) {
		// A live counter keeps its current token; only (re)initialization sets a new one
		$sqlPairsByColumn['modtoken'] = [ 'modtoken', $db->addQuotes( $mt ) ];
	}

	// Condition that is true for rows which are still live at the modification time
	$isLive = $db->expr( 'exptime', '>=', $db->timestamp( $mtUnixTs ) );

	$assignments = [];
	foreach ( $sqlPairsByColumn as $column => [ $liveSql, $expiredSql ] ) {
		$rhs = $db->conditional( $isLive, $liveSql, $expiredSql );
		$assignments[] = "{$column}=" . trim( $rhs );
	}

	return $assignments;
}
1233 | |
/**
 * Convert an expiration into the timestamp format of the database
 *
 * @param IDatabase $db
 * @param int $expiry UNIX timestamp of expiration or TTL_INDEFINITE
 * @return string
 */
private function encodeDbExpiry( IDatabase $db, int $expiry ) {
	// Indefinite expirations are stored as the maximum timestamp that the column can
	// store; anything else is an absolute timestamp that merely needs reformatting
	$timestamp = ( $expiry === self::TTL_INDEFINITE )
		? self::INF_TIMESTAMP_PLACEHOLDER
		: $expiry;

	return $db->timestamp( $timestamp );
}
1246 | |
/**
 * Convert an expiration in the timestamp format of the database back into UNIX time
 *
 * @param IDatabase $db
 * @param string $dbExpiry DB timestamp of expiration
 * @return int UNIX timestamp of expiration or TTL_INDEFINITE
 */
private function decodeDbExpiry( IDatabase $db, string $dbExpiry ) {
	// The placeholder timestamp is how indefinite expirations are stored
	if ( $dbExpiry === $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER ) ) {
		return self::TTL_INDEFINITE;
	}

	return (int)ConvertibleTimestamp::convert( TS_UNIX, $dbExpiry );
}
1257 | |
/**
 * Encode a serialized value for storage in the `value` column
 *
 * @param IDatabase $db
 * @param string|int $serialValue
 * @return string
 */
private function dbEncodeSerialValue( IDatabase $db, $serialValue ) {
	// Integers are stored as plain decimal strings rather than as encoded blobs
	if ( is_int( $serialValue ) ) {
		return (string)$serialValue;
	}

	return $db->encodeBlob( $serialValue );
}
1266 | |
/**
 * Decode the contents of a `value` column into a serialized value
 *
 * @param IDatabase $db
 * @param Blob|string|int $blob
 * @return string|int
 */
private function dbDecodeSerialValue( IDatabase $db, $blob ) {
	// Integer-like values are returned as integers; all else is decoded as a blob
	if ( $this->isInteger( $blob ) ) {
		return (int)$blob;
	}

	return $db->decodeBlob( $blob );
}
1275 | |
/**
 * Append either a computed 'castoken' field, or the raw fields needed to compute the
 * CAS token in the application, to a SELECT field array
 *
 * @param IDatabase $db
 * @param string[] $fields SELECT field array
 * @return string[] SELECT field array
 */
private function addCasTokenFields( IDatabase $db, array $fields ) {
	// Map of (database type => SQL that hashes the value column on the server)
	$hashSqlByType = [
		'mysql' => 'SHA1(value)',
		'postgres' => 'md5(value)'
	];

	$dbType = $db->getType();
	if ( isset( $hashSqlByType[$dbType] ) ) {
		// Have the server compute "<hash>@<exptime>"
		$fields['castoken'] = $db->buildConcat( [
			$hashSqlByType[$dbType],
			$db->addQuotes( '@' ),
			'exptime'
		] );

		return $fields;
	}

	// Other database types: select the raw columns (without duplicating them) so
	// that the token can be derived from the result row
	foreach ( [ 'value', 'exptime' ] as $column ) {
		if ( !in_array( $column, $fields, true ) ) {
			$fields[] = $column;
		}
	}

	return $fields;
}
1309 | |
/**
 * Extract or compute the CAS token for a SELECT result row
 *
 * @param IDatabase $db
 * @param stdClass $row A row for a key
 * @return string CAS token
 */
private function getCasTokenFromRow( IDatabase $db, stdClass $row ) {
	// Prefer the token computed by the database server, when the query selected one
	if ( isset( $row->castoken ) ) {
		return $row->castoken;
	}

	// Otherwise, derive "<hash>@<exptime>" from the raw columns
	$casToken = sha1( $this->dbDecodeSerialValue( $db, $row->value ) ) . '@' . $row->exptime;
	$this->logger->debug( __METHOD__ . ": application computed hash for CAS token" );

	return $casToken;
}
1327 | |
/**
 * Purge a limited number of expired rows from a server, either immediately or via
 * the configured async handler
 *
 * @param int $shardIndex
 * @throws DBError
 */
private function garbageCollect( $shardIndex ) {
	// Bump the timestamp right away to avoid queuing duplicate async callbacks
	$this->lastGarbageCollect = $this->getCurrentTime();

	$purgeCallback = function () use ( $shardIndex ) {
		$conn = $this->getConnection( $shardIndex );
		// Held in a local variable so that it stays in effect until the closure returns
		/** @noinspection PhpUnusedLocalVariableInspection */
		$silenceScope = $this->silenceTransactionProfiler();
		$cutoffUnix = (int)$this->getCurrentTime();
		$this->deleteServerObjectsExpiringBefore( $conn, $cutoffUnix, $this->purgeLimit );
		$this->lastGarbageCollect = $this->getCurrentTime();
	};

	$asyncHandler = $this->asyncHandler;
	if ( !$asyncHandler ) {
		$purgeCallback();

		return;
	}

	$asyncHandler( $purgeCallback );
}
1354 | |
/**
 * Delete all rows that are expired as of the current time
 *
 * @deprecated since 1.41, use deleteObjectsExpiringBefore() instead
 */
public function expireAll() {
	wfDeprecated( __METHOD__, '1.41' );

	$nowUnix = (int)$this->getCurrentTime();
	$this->deleteObjectsExpiringBefore( $nowUnix );
}
1362 | |
/**
 * Delete rows that expire before the given timestamp, on one or all of the servers
 *
 * Servers whose purge lock is already held are skipped without being counted as errors.
 *
 * @param string|int $timestamp Cutoff timestamp; passed on to the per-server deletion
 * @param callable|null $progress Optional callback that is invoked with a percentage
 * @param int|float $limit Maximum number of rows to delete in total or INF for no limit
 * @param string|null $tag If set, only purge the first server configured with this tag
 * @return bool False if a DBError was encountered on any server; true otherwise
 * @throws InvalidArgumentException If $tag is given but unknown or no tags are configured
 */
public function deleteObjectsExpiringBefore(
	$timestamp,
	?callable $progress = null,
	$limit = INF,
	?string $tag = null
) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	if ( $tag !== null ) {
		// Purge one server only, to support concurrent purging in large wiki farms (T282761).
		if ( !$this->serverTags ) {
			throw new InvalidArgumentException( "Given a tag but no tags are configured" );
		}
		// Use the first server that carries the tag
		$taggedShardIndex = array_search( $tag, $this->serverTags, true );
		if ( $taggedShardIndex === false ) {
			throw new InvalidArgumentException( "Unknown server tag: $tag" );
		}
		$shardIndexes = [ $taggedShardIndex ];
	} else {
		// Randomize the order of the servers
		$shardIndexes = $this->getShardServerIndexes();
		shuffle( $shardIndexes );
	}

	$ok = true;
	$numServers = count( $shardIndexes );

	// Running total across all servers; updated by reference in the per-server calls
	$keysDeletedCount = 0;
	foreach ( $shardIndexes as $numServersDone => $shardIndex ) {
		try {
			$db = $this->getConnection( $shardIndex );

			// Avoid deadlock (T330377)
			$lockKey = "SqlBagOStuff-purge-shard:$shardIndex";
			if ( !$db->lock( $lockKey, __METHOD__, 0 ) ) {
				$this->logger->info( "SqlBagOStuff purge for shard $shardIndex already locked, skip" );
				continue;
			}

			try {
				$this->deleteServerObjectsExpiringBefore(
					$db,
					$timestamp,
					$limit,
					$keysDeletedCount,
					[ 'fn' => $progress, 'serversDone' => $numServersDone, 'serversTotal' => $numServers ]
				);
			} finally {
				// Release the purge lock even if the deletion failed, so that it is not
				// held until the connection closes. A DBError raised here is handled by
				// the catch block below.
				$db->unlock( $lockKey, __METHOD__ );
			}
		} catch ( DBError $e ) {
			$this->handleDBError( $e, $shardIndex );
			$ok = false;
		}
	}

	return $ok;
}
1423 | |
/**
 * Delete rows that expire before the given timestamp from the table shards of a server
 *
 * The table shards are visited in random order and purged in batches, oldest expiry
 * first. Once the running total of deleted rows reaches $limit, no further batches are
 * started and no further table shards are visited.
 *
 * @param IDatabase $db
 * @param string|int $timestamp
 * @param int|float $limit Maximum number of rows to delete in total or INF for no limit
 * @param int &$keysDeletedCount Running total of deleted rows; incremented by this method
 * @param array|null $progress Progress callback along with the caller's server counters
 * @phan-param array{fn:?callback,serversDone:int,serversTotal:int}|null $progress
 * @throws DBError
 */
private function deleteServerObjectsExpiringBefore(
	IDatabase $db,
	$timestamp,
	$limit,
	&$keysDeletedCount = 0,
	?array $progress = null
) {
	$cutoffUnix = ConvertibleTimestamp::convert( TS_UNIX, $timestamp );
	if ( $this->multiPrimaryMode ) {
		// Eventual consistency requires the preservation of any key that was recently
		// modified. The key must exist on this database server long enough for the server
		// to receive, via replication, all writes to the key with lower timestamps. Such
		// writes should be no-ops since the existing key value should "win". If the network
		// partitions between datacenters A and B for 30 minutes, the database servers in
		// each datacenter will see an initial burst of writes with "old" timestamps via
		// replication. This might include writes with lower timestamps than the existing
		// key value. Therefore, clock skew and replication delay are both factors.
		$cutoffUnixSafe = (int)$this->getCurrentTime() - self::SAFE_PURGE_DELAY_SEC;
		$cutoffUnix = min( $cutoffUnix, $cutoffUnixSafe );
	}
	$tableIndexes = range( 0, $this->numTableShards - 1 );
	shuffle( $tableIndexes );

	$batchSize = min( $this->writeBatchSize, $limit );

	foreach ( $tableIndexes as $numShardsDone => $tableIndex ) {
		if ( $keysDeletedCount >= $limit ) {
			// The limit applies to the total across all table shards (and servers).
			// Without this check, every remaining table shard would still have one
			// batch deleted from it, since the loop below runs at least once.
			break;
		}

		$tableName = $this->getTableNameByShard( $tableIndex );
		// The oldest expiry of a row we have deleted on this shard
		// (the first row that we deleted)
		$minExpUnix = null;
		// The most recent expiry time so far, from a row we have deleted on this shard
		$maxExp = null;
		// Size of the time range we'll delete, in seconds (for progress estimate)
		$totalSeconds = null;

		do {
			// Select the next batch of expired rows, resuming from the most recent
			// expiry seen so far
			$res = $db->newSelectQueryBuilder()
				->select( [ 'keyname', 'exptime' ] )
				->from( $tableName )
				->where( $db->expr( 'exptime', '<', $db->timestamp( $cutoffUnix ) ) )
				->andWhere( $maxExp ? $db->expr( 'exptime', '>=', $maxExp ) : [] )
				->orderBy( 'exptime', SelectQueryBuilder::SORT_ASC )
				->limit( $batchSize )
				->caller( __METHOD__ )
				->fetchResultSet();

			if ( $res->numRows() ) {
				$row = $res->current();
				if ( $minExpUnix === null ) {
					$minExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $row->exptime );
					$totalSeconds = max( $cutoffUnix - $minExpUnix, 1 );
				}

				$keys = [];
				foreach ( $res as $row ) {
					$keys[] = $row->keyname;
					$maxExp = $row->exptime;
				}

				// Re-check the expiry in the DELETE, since a row might have been
				// overwritten with a newer expiry since the SELECT
				$db->newDeleteQueryBuilder()
					->deleteFrom( $tableName )
					->where( [
						'keyname' => $keys,
						$db->expr( 'exptime', '<', $db->timestamp( $cutoffUnix ) ),
					] )
					->caller( __METHOD__ )->execute();
				$keysDeletedCount += $db->affectedRows();
			}

			if ( $progress && is_callable( $progress['fn'] ) ) {
				if ( $totalSeconds ) {
					$maxExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $maxExp );
					$remainingSeconds = $cutoffUnix - $maxExpUnix;
					$processedSeconds = max( $totalSeconds - $remainingSeconds, 0 );
					// For example, if we've done 1.5 table shard, and are thus half-way on the
					// 2nd of perhaps 5 tables on this server, then this might be:
					// `( 1 + ( 43200 / 86400 ) ) / 5 = 0.3`, or 30% done, of tables on this server.
					$tablesDoneRatio =
						( $numShardsDone + ( $processedSeconds / $totalSeconds ) ) / $this->numTableShards;
				} else {
					// This table shard had no expired rows, so it is fully done. Count it
					// as one finished table rather than as the whole server being done;
					// otherwise the reported progress would jump ahead and then fall back.
					$tablesDoneRatio = ( $numShardsDone + 1 ) / $this->numTableShards;
				}

				// For example, if we're 30% done on the last of 10 servers, then this might be:
				// `( 9 / 10 ) + ( 0.3 / 10 ) = 0.93`, or 93% done, overall.
				$overallRatio = ( $progress['serversDone'] / $progress['serversTotal'] ) +
					( $tablesDoneRatio / $progress['serversTotal'] );
				( $progress['fn'] )( $overallRatio * 100 );
			}
		} while ( $res->numRows() && $keysDeletedCount < $limit );
	}
}
1524 | |
/**
 * Delete every row from every shard table on every shard server.
 *
 * Processing stops at the first server for which connecting or deleting raises
 * a DBError; that error is handed to handleDBError() and false is returned.
 *
 * @deprecated since 1.41, unused.
 *
 * @return bool True if all deletes ran without a DBError, false otherwise
 */
public function deleteAll() {
	wfDeprecated( __METHOD__, '1.41' );
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	$serverIndexes = $this->getShardServerIndexes();
	foreach ( $serverIndexes as $shardIndex ) {
		try {
			$conn = $this->getConnection( $shardIndex );
			// Walk the table shards of this server in ascending order
			$tableIndex = 0;
			while ( $tableIndex < $this->numTableShards ) {
				$conn->newDeleteQueryBuilder()
					->deleteFrom( $this->getTableNameByShard( $tableIndex ) )
					->where( $conn::ALL_ROWS )
					->caller( __METHOD__ )
					->execute();
				$tableIndex++;
			}
		} catch ( DBError $e ) {
			$this->handleDBError( $e, $shardIndex );

			return false;
		}
	}

	return true;
}
1553 | |
/**
 * Acquire a named lock on the database server that the key maps to.
 *
 * The lock is requested via the connection's lock() method with the
 * LOCK_TIMESTAMP flag. $exptime is accepted for interface compatibility but
 * is not read by this implementation.
 *
 * @param string $key
 * @param int $timeout Passed through to the connection's lock() call
 * @param int $exptime Unused here
 * @return mixed Whatever lock() returned, or null if a DBError occurred
 */
public function doLock( $key, $timeout = 6, $exptime = 6 ) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	[ $shardIndex ] = $this->getKeyLocation( $key );

	try {
		$conn = $this->getConnection( $shardIndex );

		return $conn->lock( $key, __METHOD__, $timeout, $conn::LOCK_TIMESTAMP );
	} catch ( DBError $e ) {
		$this->handleDBError( $e, $shardIndex );
		$this->logger->warning(
			__METHOD__ . ' failed due to I/O error for {key}.',
			[ 'key' => $key ]
		);
	}

	// Only reached when the connection or the lock query failed
	return null;
}
1574 | |
/**
 * Release a named lock on the database server that the key maps to.
 *
 * @param string $key
 * @return mixed Result of the connection's unlock() call, or false if a
 *  DBError occurred (the error is passed to handleDBError())
 */
public function doUnlock( $key ) {
	/** @noinspection PhpUnusedLocalVariableInspection */
	$silenceScope = $this->silenceTransactionProfiler();

	[ $shardIndex ] = $this->getKeyLocation( $key );

	try {
		return $this->getConnection( $shardIndex )->unlock( $key, __METHOD__ );
	} catch ( DBError $e ) {
		$this->handleDBError( $e, $shardIndex );
	}

	return false;
}
1591 | |
/**
 * Build a storage key from a keyspace and a list of components.
 *
 * Spaces become underscores and colons in components are percent-encoded.
 * A component that does not fit in the remaining character budget is replaced
 * by '#' plus its MD5; if the budget is still exceeded, the whole component
 * list is collapsed into a single MD5-based key.
 *
 * @param string $keyspace
 * @param array $components
 * @return string
 */
protected function makeKeyInternal( $keyspace, $components ) {
	// SQL schema for 'objectcache' specifies keys as varchar(255). From that,
	// subtract the number of characters we need for the keyspace and for
	// the separator character needed for each argument. To handle some
	// custom prefixes used by things like WANObjectCache, limit to 205.
	$keyspace = strtr( $keyspace, ' ', '_' );
	$budget = 205 - strlen( $keyspace ) - count( $components );

	$encodedParts = [];
	foreach ( $components as $idx => $part ) {
		$part = strtr( $part, [
			' ' => '_', // Avoid unnecessary misses from pre-1.35 code
			':' => '%3A',
		] );

		// 33 = 32 characters for the MD5 + 1 for the '#' prefix.
		if ( $budget > 33 && strlen( $part ) > $budget ) {
			$part = '#' . md5( $part );
		}

		$budget -= strlen( $part );
		$encodedParts[$idx] = $part;
	}

	$joined = implode( ':', $encodedParts );

	return $budget < 0
		? $keyspace . ':BagOStuff-long-key:##' . md5( $joined )
		: $keyspace . ':' . $joined;
}
1617 | |
/**
 * Always returns true for this backend.
 *
 * NOTE(review): presumably tells the parent class that generic keys have to be
 * converted with this class's makeKeyInternal() - confirm against the parent.
 *
 * @return bool
 */
protected function requireConvertGenericKey(): bool {
	return true;
}
1621 | |
/**
 * Convert a value into the form stored in the blob column.
 *
 * Integers are returned unchanged. Everything else is passed through PHP's
 * serialize() and, when zlib is available, compressed with gzdeflate().
 *
 * @param mixed $value
 * @return int|string
 */
protected function serialize( $value ) {
	if ( is_int( $value ) ) {
		return $value;
	}

	$blob = serialize( $value );

	// On typical message and page data, compression can provide a 3X storage savings
	return $this->hasZlib ? gzdeflate( $blob ) : $blob;
}
1635 | |
/**
 * Convert a stored blob back into a PHP value.
 *
 * Tombstone blobs yield false and integer-like blobs yield an int. Otherwise
 * the blob is inflated when zlib is available (falling back to the raw blob
 * if inflating fails) and then passed to PHP's unserialize().
 *
 * @param mixed $value Stored blob
 * @return mixed
 */
protected function unserialize( $value ) {
	if ( $value === self::TOMB_SERIAL ) {
		return false; // tombstone
	}

	if ( $this->isInteger( $value ) ) {
		return (int)$value;
	}

	$payload = $value;
	if ( $this->hasZlib ) {
		// gzinflate() warns on data that is not deflated; failure is handled below
		AtEase::suppressWarnings();
		$inflated = gzinflate( $value );
		AtEase::restoreWarnings();

		if ( $inflated !== false ) {
			$payload = $inflated;
		}
	}

	return unserialize( $payload );
}
1657 | |
/**
 * Get the load balancer, creating it from the configured callback on first use.
 *
 * @return ILoadBalancer
 */
private function getLoadBalancer(): ILoadBalancer {
	if ( $this->loadBalancer ) {
		return $this->loadBalancer;
	}

	// Not created yet: invoke the callback once and remember the result
	$factory = $this->loadBalancerCallback;
	$this->loadBalancer = $factory();

	return $this->loadBalancer;
}
1664 | |
/**
 * Get a maintenance connection handle from the load balancer.
 *
 * The kind of handle depends on the ATTR_DB_LEVEL_LOCKING attribute of the
 * writer server. The handle is connected eagerly before being returned.
 *
 * @return IMaintainableDatabase
 * @throws DBError
 */
private function getConnectionViaLoadBalancer() {
	$lb = $this->getLoadBalancer();
	$writerAttributes = $lb->getServerAttributes( $lb->getWriterIndex() );

	if ( $writerAttributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
		// Use the main connection to avoid transaction deadlocks
		$conn = $lb->getMaintenanceConnectionRef( DB_PRIMARY, [], $this->dbDomain );
	} else {
		// If the RDBMS has row/table/page level locking, then use separate auto-commit
		// connection to avoid needless contention and deadlocks.
		$role = $this->replicaOnly ? DB_REPLICA : DB_PRIMARY;
		$conn = $lb->getMaintenanceConnectionRef(
			$role,
			[],
			$this->dbDomain,
			$lb::CONN_TRX_AUTOCOMMIT
		);
	}

	// Make sure any errors are thrown now while we can more easily handle them
	$conn->ensureConnection();

	return $conn;
}
1690 | |
/**
 * Get the connection handle for a shard server, creating and caching it on
 * first use.
 *
 * New handles are created through the DatabaseFactory service with this
 * object's logger and with DBO_TRX cleared from the flags. For SQLite, the
 * 'objectcache' table is created if needed.
 *
 * @param int $shardIndex
 * @param array $server Server config map
 * @return IMaintainableDatabase
 * @throws DBError
 */
private function getConnectionFromServerInfo( $shardIndex, array $server ) {
	if ( !isset( $this->conns[$shardIndex] ) ) {
		$server['logger'] = $this->logger;
		// Make sure this handle always uses autocommit mode, even if DBO_TRX is
		// configured. The 'flags' key may be absent from the config map; treat
		// that as 0 instead of triggering an "Undefined array key" warning.
		$server['flags'] = ( $server['flags'] ?? 0 ) & ~DBO_TRX;

		/** @var IMaintainableDatabase $conn Auto-commit connection to the server */
		$conn = MediaWikiServices::getInstance()->getDatabaseFactory()
			->create( $server['type'], $server );

		// Automatically create the objectcache table for sqlite as needed
		if ( $conn->getType() === 'sqlite' ) {
			$this->initSqliteDatabase( $conn );
		}
		$this->conns[$shardIndex] = $conn;
	}

	// @phan-suppress-next-line PhanTypeMismatchReturnNullable False positive
	return $this->conns[$shardIndex];
}
1718 | |
/**
 * Handle a DBError which occurred during a cache operation.
 *
 * For connection errors on directly configured servers (no load balancer), the
 * cached handle is dropped and the failure time and error are recorded, unless
 * a failure was already recorded for that server less than 60 seconds ago, in
 * which case only a debug message is logged and nothing else happens.
 *
 * Otherwise the error is logged and the last-error code is set to
 * ERR_UNREACHABLE (connection errors) or ERR_UNEXPECTED (any other DBError).
 *
 * @param DBError $exception
 * @param int $shardIndex Server index
 */
private function handleDBError( DBError $exception, $shardIndex ) {
	$isConnectionError = $exception instanceof DBConnectionError;

	if ( $isConnectionError && !$this->useLB ) {
		unset( $this->conns[$shardIndex] ); // bug T103435

		$now = $this->getCurrentTime();
		if ( isset( $this->connFailureTimes[$shardIndex] ) ) {
			$cooldownOver = ( $now - $this->connFailureTimes[$shardIndex] ) >= 60;
			if ( !$cooldownOver ) {
				$this->logger->debug( __METHOD__ . ": Server #$shardIndex already down" );

				return;
			}
			// Stale failure record; forget it before recording the new one
			unset(
				$this->connFailureTimes[$shardIndex],
				$this->connFailureErrors[$shardIndex]
			);
		}

		$this->logger->info( __METHOD__ . ": Server #$shardIndex down until " . ( $now + 60 ) );
		$this->connFailureTimes[$shardIndex] = $now;
		$this->connFailureErrors[$shardIndex] = $exception;
	}

	$this->logger->error( "DBError: {$exception->getMessage()}", [ 'exception' => $exception ] );

	if ( $isConnectionError ) {
		$this->setLastError( self::ERR_UNREACHABLE );
		$this->logger->warning( __METHOD__ . ": ignoring connection error" );
	} else {
		$this->setLastError( self::ERR_UNEXPECTED );
		$this->logger->warning( __METHOD__ . ": ignoring query error" );
	}
}
1752 | |
/**
 * Create the 'objectcache' table and its exptime index on an SQLite database,
 * unless the table already exists.
 *
 * The DDL runs inside an atomic section; on a DBError the handle is rolled
 * back and the error is re-thrown.
 *
 * @param IMaintainableDatabase $db
 * @throws DBError
 */
private function initSqliteDatabase( IMaintainableDatabase $db ) {
	$alreadyCreated = $db->tableExists( 'objectcache', __METHOD__ );
	if ( $alreadyCreated ) {
		return;
	}

	// Use one table for SQLite; sharding does not seem to have much benefit
	$db->query( "PRAGMA journal_mode=WAL", __METHOD__ ); // this is permanent
	$db->startAtomic( __METHOD__ ); // atomic DDL
	try {
		$encTable = $db->tableName( 'objectcache' );
		$encExptimeIndex = $db->addIdentifierQuotes( $db->tablePrefix() . 'exptime' );

		$createTableSql =
			"CREATE TABLE $encTable (\n" .
			" keyname BLOB NOT NULL default '' PRIMARY KEY,\n" .
			" value BLOB,\n" .
			" exptime BLOB NOT NULL\n" .
			")";
		$createIndexSql = "CREATE INDEX $encExptimeIndex ON $encTable (exptime)";

		$db->query( $createTableSql, __METHOD__ );
		$db->query( $createIndexSql, __METHOD__ );
		$db->endAtomic( __METHOD__ );
	} catch ( DBError $e ) {
		$db->rollback( __METHOD__ );
		throw $e;
	}
}
1782 | |
/**
 * Create the shard tables on all databases
 *
 * This is typically called manually by a sysadmin via eval.php, e.g. for ParserCache:
 *
 * @code
 *     ObjectCache::getInstance( 'myparsercache' )->createTables();
 * @endcode
 *
 * This is different from `$services->getParserCache()->getCacheStorage()->createTables()`,
 * which would use the backend set via $wgParserCacheType, which shouldn't be
 * set yet for the backend you are creating shard tables on. The expectation
 * is to first add the new backend to $wgObjectCaches, run the above, and then enable
 * it for live ParserCache traffic by setting $wgParserCacheType.
 *
 * Each shard table is cloned from the base 'objectcache' table. Only 'mysql' and
 * 'postgres' servers are handled; servers of any other type are skipped.
 */
public function createTables() {
	foreach ( $this->getShardServerIndexes() as $shardIndex ) {
		$db = $this->getConnection( $shardIndex );
		$dbType = $db->getType();
		if ( !in_array( $dbType, [ 'mysql', 'postgres' ], true ) ) {
			continue;
		}

		// The base table name does not depend on the shard; resolve it once per server
		$encBaseTable = $db->tableName( 'objectcache' );
		for ( $i = 0; $i < $this->numTableShards; $i++ ) {
			$encShardTable = $db->tableName( $this->getTableNameByShard( $i ) );
			if ( $dbType === 'postgres' ) {
				// PostgreSQL only accepts LIKE as an element of the parenthesized
				// column list; INCLUDING ALL also copies defaults, constraints and
				// indexes, which is what MySQL's CREATE TABLE ... LIKE does.
				$sql = "CREATE TABLE $encShardTable (LIKE $encBaseTable INCLUDING ALL)";
			} else {
				$sql = "CREATE TABLE $encShardTable LIKE $encBaseTable";
			}
			$db->query( $sql, __METHOD__ );
		}
	}
}
1810 | |
/**
 * List the server indexes to iterate over for server-wide operations.
 *
 * @return int[] List of server indexes
 */
private function getShardServerIndexes() {
	if ( $this->useLB ) {
		// LoadBalancer based configuration
		return [ 0 ];
	}

	// Striped array of database servers
	return array_keys( $this->serverTags );
}
1825 | |
/**
 * Silence the transaction profiler until the return value falls out of scope
 *
 * Does nothing (and returns null) when explicit server infos are configured.
 *
 * @return ScopedCallback|null
 */
private function silenceTransactionProfiler() {
	if ( !$this->serverInfos ) {
		$trxProfiler = Profiler::instance()->getTransactionProfiler();

		return $trxProfiler->silenceForScope();
	}

	// no TransactionProfiler injected anyway
	return null;
}
1837 | } |