MediaWiki master
MysqlReplicationReporter.php
Go to the documentation of this file.
1<?php
21
22use InvalidArgumentException;
23use Psr\Log\LoggerInterface;
24use RuntimeException;
25use stdClass;
33
/** Options for the lag detection method; a 'conds' entry, when set, replaces the default pt-heartbeat row filter */
45 protected $lagDetectionOptions = [];
/** Whether replication positions are read from the server GTID variables before falling back to binlog coordinates */
47 protected $useGTIDs = false;
/** Row cached by getReplicationSafetyInfo(); stays null until the first fetch */
49 private $replicationInfoRow;
50 // Cache getServerId() and getServerUUID() results for 24 hours
51 private const SERVER_ID_CACHE_TTL = 86400;
52
/** Age in seconds above which reusing the lag recorded for the active transaction logs a warning */
54 private const LAG_STALE_WARN_THRESHOLD = 0.100;
55
64 public function __construct(
66 $logger,
71 ) {
72 parent::__construct( $topologyRole, $logger, $srvCache );
73 $this->lagDetectionMethod = $lagDetectionMethod;
74 $this->lagDetectionOptions = $lagDetectionOptions;
75 $this->useGTIDs = $useGTIDs;
76 }
77
/**
 * Measure the replication lag of a server using the configured detection method.
 *
 * @param IDatabase $conn Connection to the server being measured
 * @return int|float|false Lag value from the selected strategy; false if it could not be determined
 */
protected function doGetLag( IDatabase $conn ) {
	// Any method other than 'pt-heartbeat' is served by SHOW SLAVE STATUS
	return ( $this->lagDetectionMethod === 'pt-heartbeat' )
		? $this->getLagFromPtHeartbeat( $conn )
		: $this->getLagFromSlaveStatus( $conn );
}
85
/**
 * Read the replication lag from the output of SHOW SLAVE STATUS.
 *
 * @param IDatabase $conn
 * @return int|false Seconds of lag (including any remaining configured delay);
 *  false if no row came back or Seconds_Behind_Master is empty
 */
protected function getLagFromSlaveStatus( IDatabase $conn ) {
	$flags = ISQLPlatform::QUERY_SILENCE_ERRORS
		| ISQLPlatform::QUERY_IGNORE_DBO_TRX
		| ISQLPlatform::QUERY_CHANGE_NONE;
	$statusQuery = new Query( 'SHOW SLAVE STATUS', $flags, 'SHOW', null, 'SHOW SLAVE STATUS' );

	$result = $conn->query( $statusQuery, __METHOD__ );
	$status = $result ? $result->fetchObject() : false;

	// A server that is not replicating yields no row at all
	if ( !$status || strval( $status->Seconds_Behind_Master ) === '' ) {
		return false;
	}

	// Account for intentionally delayed replication:
	// https://mariadb.com/kb/en/delayed-replication/
	// https://dev.mysql.com/doc/refman/5.6/en/replication-delayed.html
	$remainingDelay = $status->SQL_Remaining_Delay ?? 0;

	return intval( $status->Seconds_Behind_Master + $remainingDelay );
}
109
/**
 * Determine the replication lag from the pt-heartbeat table.
 *
 * If a lag value was already recorded for the active transaction, that value is reused
 * rather than querying again.
 *
 * @param IDatabase $conn
 * @return int|float|false Lag in seconds; false if no heartbeat row could be found
 */
protected function getLagFromPtHeartbeat( IDatabase $conn ) {
	$trxLagStatus = $this->getRecordedTransactionLagStatus( $conn );

	if ( !$trxLagStatus ) {
		// No transaction-level value recorded; query the heartbeat table directly
		$secondsAgo = $this->fetchSecondsSinceHeartbeat( $conn );
		if ( $secondsAgo === null ) {
			$this->logger->error(
				"Unable to find pt-heartbeat row for {db_server}",
				$this->getLogContext( $conn, [
					'method' => __METHOD__
				] )
			);

			return false;
		}

		// Never report negative lag
		return max( $secondsAgo, 0.0 );
	}

	// There is an active transaction and the initial lag was already queried.
	// Reuse it to avoid returning higher and higher lag values due to snapshot age,
	// given that the isolation level will typically be REPEATABLE-READ
	// but UTC_TIMESTAMP() is not affected by point-in-time snapshots.
	$age = microtime( true ) - $trxLagStatus['since'];
	if ( $age > self::LAG_STALE_WARN_THRESHOLD ) {
		$this->logger->warning(
			"Using cached lag value for {db_server} due to active transaction",
			$this->getLogContext( $conn, [
				'method' => __METHOD__,
				'age' => $age,
				'exception' => new RuntimeException()
			] )
		);
	}

	return $trxLagStatus['lag'];
}
150
/**
 * Get the time elapsed since the newest matching pt-heartbeat row was written.
 *
 * @param IDatabase $conn
 * @return float|null Elapsed seconds as computed by the server (callers clamp negative values);
 *  null if the query failed, no row matched, or the server could not compute the difference
 */
protected function fetchSecondsSinceHeartbeat( IDatabase $conn ) {
	// Some setups might have pt-heartbeat running on each replica server.
	// Exclude the row for events originating on this DB server. Assume that
	// there is only one active replication channel and that any other row
	// getting updates must be the row for the primary DB server.
	$where = $conn->makeList(
		$this->lagDetectionOptions['conds'] ?? [ 'server_id != @@server_id' ],
		ISQLPlatform::LIST_AND
	);
	// Use mysql server time so that query time and trip time are not counted.
	// Use ORDER BY for channel based queries since that field might not be UNIQUE.
	$query = new Query(
		"SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
		"FROM heartbeat.heartbeat WHERE $where ORDER BY ts DESC LIMIT 1",
		ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
		'SELECT',
		null,
		"SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
		"FROM heartbeat.heartbeat WHERE ? ORDER BY ts DESC LIMIT 1"
	);
	$res = $conn->query( $query, __METHOD__ );
	$row = $res ? $res->fetchObject() : false;

	// TIMESTAMPDIFF() yields NULL when `ts` is NULL or not a valid datetime. Dividing that
	// NULL would silently report zero lag, so treat it the same as a missing row instead.
	if ( !$row || $row->us_ago === null ) {
		return null;
	}

	return $row->us_ago / 1e6;
}
181
/**
 * Get a lag estimate for this server, using a short-lived server cache entry
 * unless pt-heartbeat is the detection method.
 *
 * @param IDatabase $conn
 * @return mixed The value produced by the parent implementation (possibly served from cache)
 */
public function getApproximateLagStatus( IDatabase $conn ) {
	if ( $this->lagDetectionMethod !== 'pt-heartbeat' ) {
		$cacheKey = $this->srvCache->makeGlobalKey( 'mysql-lag', $conn->getServerName() );

		$cached = $this->srvCache->get( $cacheKey );
		if ( $cached ) {
			return $cached;
		}

		// Cache miss: compute a fresh estimate and keep it for one second
		$fresh = parent::getApproximateLagStatus( $conn );
		$this->srvCache->set( $cacheKey, $fresh, 1 );

		return $fresh;
	}

	// Disable caching since this is fast enough and we don't want
	// to be *too* pessimistic by having both the cache TTL and the
	// pt-heartbeat interval count as lag in getSessionLagStatus()
	return parent::getApproximateLagStatus( $conn );
}
199
/**
 * Fetch the server variables relevant to replication safety, memoized on this instance.
 *
 * @param IDatabase $conn
 * @param string $fname Caller name passed through to the query
 * @return mixed The selectRow() result holding innodb_autoinc_lock_mode and binlog_format
 */
public function getReplicationSafetyInfo( IDatabase $conn, $fname ) {
	if ( $this->replicationInfoRow !== null ) {
		// Already fetched by an earlier call
		return $this->replicationInfoRow;
	}

	$variables = [
		'innodb_autoinc_lock_mode' => '@@innodb_autoinc_lock_mode',
		'binlog_format' => '@@binlog_format',
	];
	$this->replicationInfoRow = $conn->selectRow( [], $variables, [], $fname );

	return $this->replicationInfoRow;
}
219
/**
 * Report the GTID preference this reporter was constructed with.
 *
 * @return bool Value of the useGTIDs option
 */
protected function useGTIDs() {
	return $this->useGTIDs;
}
226
/**
 * Block until this replica has applied everything up to the given primary position.
 *
 * @param IDatabase $conn Connection to the replica server
 * @param DBPrimaryPos $pos Position to wait for; must be a MySQLPrimaryPos
 * @param int|float $timeout Maximum wait passed to the server-side wait function
 * @return int|null 0 when the wait was bypassed, -1 when positions cannot be compared or the
 *  wait timed out, null when the server reported an error, otherwise the integer the
 *  server-side wait function returned
 * @throws InvalidArgumentException If $pos is not a MySQLPrimaryPos
 */
public function primaryPosWait( IDatabase $conn, DBPrimaryPos $pos, $timeout ) {
	if ( !( $pos instanceof MySQLPrimaryPos ) ) {
		throw new InvalidArgumentException( "Position not an instance of MySQLPrimaryPos" );
	}

	if ( $this->topologyRole === IDatabase::ROLE_STATIC_CLONE ) {
		$this->logger->debug(
			"Bypassed replication wait; database has a static dataset",
			$this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
		);

		// This is a copy of a read-only dataset with no primary DB
		return 0;
	}

	if ( $this->lastKnownReplicaPos && $this->lastKnownReplicaPos->hasReached( $pos ) ) {
		$this->logger->debug(
			"Bypassed replication wait; replication known to have reached {raw_pos}",
			$this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
		);

		// Already reached this point for sure
		return 0;
	}

	// These queries ignore DBO_TRX so that waiting does not open a transaction
	$flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;

	if ( $pos->getGTIDs() ) {
		// Get the GTIDs from this replica server to see the domains (channels)
		$replicaPos = $this->getReplicaPos( $conn );
		if ( !$replicaPos ) {
			$this->logger->error(
				"Could not get replication position on replica DB to compare to {raw_pos}",
				$this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
			);

			// This is the primary DB itself?
			return -1;
		}

		// GTIDs with domains (channels) that are active and are present on the replica
		$relevantGtids = $pos::getRelevantActiveGTIDs( $pos, $replicaPos );
		if ( !$relevantGtids ) {
			$this->logger->error(
				"No active GTIDs in {raw_pos} share a domain with those in {current_pos}",
				$this->getLogContext( $conn, [
					'method' => __METHOD__,
					'raw_pos' => $pos,
					'current_pos' => $replicaPos
				] )
			);

			// $pos is from the wrong cluster?
			return -1;
		}

		// Wait on the GTID set
		$waitPos = implode( ',', $relevantGtids );
		$quotedGtids = $conn->addQuotes( $waitPos );
		// MySQL GTIDs look like "source_id:transaction_id" whereas
		// MariaDB GTIDs look like "domain-server-sequence" (no colon)
		$waitFunc = ( strpos( $quotedGtids, ':' ) !== false )
			? 'WAIT_FOR_EXECUTED_GTID_SET'
			: 'MASTER_GTID_WAIT';
		$query = new Query(
			"SELECT {$waitFunc}($quotedGtids, $timeout)",
			$flags,
			'SELECT',
			null,
			"SELECT {$waitFunc}(?, ?)"
		);
	} else {
		// Wait on the binlog coordinates
		$quotedFile = $conn->addQuotes( $pos->getLogFile() );
		// @phan-suppress-next-line PhanTypeArraySuspiciousNullable
		$eventOffset = intval( $pos->getLogPosition()[$pos::CORD_EVENT] );
		$query = new Query(
			"SELECT MASTER_POS_WAIT($quotedFile, $eventOffset, $timeout)",
			$flags,
			'SELECT',
			null,
			"SELECT MASTER_POS_WAIT(?, ?, ?)"
		);
		$waitPos = $pos->__toString();
	}

	$startTime = microtime( true );
	$res = $conn->query( $query, __METHOD__ );
	$row = $res->fetchRow();
	$seconds = max( microtime( true ) - $startTime, 0 );

	// Result can be NULL (error), -1 (timeout), or 0+ per the MySQL manual
	$status = ( $row[0] !== null ) ? intval( $row[0] ) : null;

	if ( $status === null ) {
		$this->logger->error(
			"An error occurred while waiting for replication to reach {wait_pos}",
			$this->getLogContext( $conn, [
				'raw_pos' => $pos,
				'wait_pos' => $waitPos,
				'sql' => $query->getSQL(),
				'seconds_waited' => $seconds,
				'exception' => new RuntimeException()
			] )
		);
	} elseif ( $status < 0 ) {
		$this->logger->info(
			"Timed out waiting for replication to reach {wait_pos}",
			$this->getLogContext( $conn, [
				'raw_pos' => $pos,
				'wait_pos' => $waitPos,
				'timeout' => $timeout,
				'sql' => $query->getSQL(),
				'seconds_waited' => $seconds,
			] )
		);
	} else {
		$this->logger->debug(
			"Replication has reached {wait_pos}",
			$this->getLogContext( $conn, [
				'raw_pos' => $pos,
				'wait_pos' => $waitPos,
				'seconds_waited' => $seconds,
			] )
		);
		// Remember that this position was reached to save queries next time
		$this->lastKnownReplicaPos = $pos;
	}

	return $status;
}
355
/**
 * Get the replication position this replica has applied so far.
 *
 * GTID variables are preferred when GTID use is enabled; otherwise, or when they are
 * empty, the binlog coordinates from SHOW SLAVE STATUS are used.
 *
 * @param IDatabase $conn
 * @return MySQLPrimaryPos|false The position; false if none could be determined
 */
public function getReplicaPos( IDatabase $conn ) {
	// As-of-time *before* fetching GTID variables
	$asOf = microtime( true );

	if ( $this->useGTIDs() ) {
		// Try to use GTIDs, falling back to binlog positions if not possible
		$gtidVars = $this->getServerGTIDs( $conn, __METHOD__ );
		// Use gtid_slave_pos for MariaDB and gtid_executed for MySQL
		foreach ( [ 'gtid_slave_pos', 'gtid_executed' ] as $varName ) {
			$value = $gtidVars[$varName] ?? null;
			if ( $value !== null && strlen( $value ) ) {
				return new MySQLPrimaryPos( $value, $asOf );
			}
		}
	}

	$status = $this->getServerRoleStatus( $conn, 'SLAVE', __METHOD__ );
	if ( !$status || !strlen( $status['Relay_Master_Log_File'] ) ) {
		return false;
	}

	return new MySQLPrimaryPos(
		"{$status['Relay_Master_Log_File']}/{$status['Exec_Master_Log_Pos']}",
		$asOf
	);
}
386
/**
 * Get the current replication position of this (primary) server.
 *
 * GTID variables are preferred when GTID use is enabled; otherwise, or when they are
 * empty, the binlog coordinates from SHOW MASTER STATUS are used.
 *
 * @param IDatabase $conn
 * @return MySQLPrimaryPos|false The position; false if none could be determined
 */
public function getPrimaryPos( IDatabase $conn ) {
	// As-of-time *before* fetching GTID variables
	$asOf = microtime( true );

	if ( $this->useGTIDs() ) {
		// Try to use GTIDs, falling back to binlog positions if not possible
		$gtidVars = $this->getServerGTIDs( $conn, __METHOD__ );
		// Use gtid_binlog_pos for MariaDB and gtid_executed for MySQL
		foreach ( [ 'gtid_binlog_pos', 'gtid_executed' ] as $varName ) {
			if ( !isset( $gtidVars[$varName] ) || !strlen( $gtidVars[$varName] ) ) {
				continue;
			}

			$gtidPos = new MySQLPrimaryPos( $gtidVars[$varName], $asOf );
			// Filter domains that are inactive or not relevant to the session
			$gtidPos->setActiveOriginServerId( $this->getServerId( $conn ) );
			$gtidPos->setActiveOriginServerUUID( $this->getServerUUID( $conn ) );
			if ( isset( $gtidVars['gtid_domain_id'] ) ) {
				$gtidPos->setActiveDomain( $gtidVars['gtid_domain_id'] );
			}

			return $gtidPos;
		}
	}

	$status = $this->getServerRoleStatus( $conn, 'MASTER', __METHOD__ );
	if ( $status && strlen( $status['File'] ) ) {
		return new MySQLPrimaryPos( "{$status['File']}/{$status['Position']}", $asOf );
	}

	return false;
}
426
/**
 * Get the @@server_id of the server, cached per server name in the server cache.
 *
 * @param IDatabase $conn
 * @return mixed The `id` column of the result row, as returned by the driver
 */
protected function getServerId( IDatabase $conn ) {
	$caller = __METHOD__;
	$cacheKey = $this->srvCache->makeGlobalKey( 'mysql-server-id', $conn->getServerName() );

	// Only runs when the cache has no value for this server
	$loadFromServer = static function () use ( $conn, $caller ) {
		$sql = "SELECT @@server_id AS id";
		$result = $conn->query(
			new Query(
				$sql,
				ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
				'SELECT',
				null,
				$sql
			),
			$caller
		);
		$row = $result->fetchObject();

		return $row->id;
	};

	return $this->srvCache->getWithSetCallback(
		$cacheKey,
		self::SERVER_ID_CACHE_TTL,
		$loadFromServer
	);
}
451
/**
 * Get the server_uuid global variable of the server, cached per server name in the server cache.
 *
 * @param IDatabase $conn
 * @return mixed The variable's `Value` column; null if the server returned no such row
 */
protected function getServerUUID( IDatabase $conn ) {
	$caller = __METHOD__;
	$cacheKey = $this->srvCache->makeGlobalKey( 'mysql-server-uuid', $conn->getServerName() );

	// Only runs when the cache has no value for this server
	$loadFromServer = static function () use ( $conn, $caller ) {
		$sql = "SHOW GLOBAL VARIABLES LIKE 'server_uuid'";
		$result = $conn->query(
			new Query(
				$sql,
				ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
				'SHOW',
				null,
				$sql
			),
			$caller
		);
		$row = $result->fetchObject();
		if ( !$row ) {
			return null;
		}

		return $row->Value;
	};

	return $this->srvCache->getWithSetCallback(
		$cacheKey,
		self::SERVER_ID_CACHE_TTL,
		$loadFromServer
	);
}
477
/**
 * Collect the server's gtid_* variables into a name => value map.
 *
 * Global values are read first and session values second, so a session value
 * overrides the global one of the same name.
 *
 * @param IDatabase $conn
 * @param string $fname Caller name passed through to the queries
 * @return array Map of variable name to value
 */
protected function getServerGTIDs( IDatabase $conn, $fname ) {
	$flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;

	$variables = [];
	// GLOBAL covers global-only variables like gtid_executed; SESSION covers
	// session-specific ones (e.g. gtid_domain_id since that is where writes will log)
	foreach ( [ 'GLOBAL', 'SESSION' ] as $scope ) {
		$sql = "SHOW $scope VARIABLES LIKE 'gtid_%'";
		$result = $conn->query( new Query( $sql, $flags, 'SHOW', null, $sql ), $fname );
		foreach ( $result as $variable ) {
			$variables[$variable->Variable_name] = $variable->Value;
		}
	}

	return $variables;
}
515
/**
 * Run "SHOW <role> STATUS" and return its first row.
 *
 * @param IDatabase $conn
 * @param string $role Keyword placed in the statement; callers in this class pass 'SLAVE' or 'MASTER'
 * @param string $fname Caller name passed through to the query
 * @return array|null The status row; null if the query failed or returned no row
 */
protected function getServerRoleStatus( IDatabase $conn, $role, $fname ) {
	$sql = "SHOW $role STATUS";
	$flags = ISQLPlatform::QUERY_SILENCE_ERRORS
		| ISQLPlatform::QUERY_IGNORE_DBO_TRX
		| ISQLPlatform::QUERY_CHANGE_NONE;

	$result = $conn->query( new Query( $sql, $flags, 'SHOW', null, $sql ), $fname );
	if ( !$result ) {
		// Errors are silenced, so a failed query surfaces as a falsy result
		return null;
	}

	$statusRow = $result->fetchRow();

	return $statusRow ?: null;
}
535
536}
Abstract class for any ephemeral data store.
Definition BagOStuff.php:87
DBPrimaryPos implementation for MySQL and MariaDB.
Holds information on Query to be executed.
Definition Query.php:31
primaryPosWait(IDatabase $conn, DBPrimaryPos $pos, $timeout)
doGetLag(IDatabase $conn)
Get the amount of replication lag for this database server.
getApproximateLagStatus(IDatabase $conn)
Get a replica DB lag estimate for this server at the start of a transaction.
getPrimaryPos(IDatabase $conn)
Get the position of the primary DB from SHOW MASTER STATUS.
getReplicaPos(IDatabase $conn)
Get the position of the primary DB from SHOW SLAVE STATUS.
string $lagDetectionMethod
Method to detect replica DB lag.
__construct( $topologyRole, $logger, $srvCache, $lagDetectionMethod, $lagDetectionOptions, $useGTIDs)
getLogContext(IDatabase $conn, array $extras=[])
Create a log context to pass to PSR-3 logger functions.
string $topologyRole
Replication topology role of the server; one of the class ROLE_* constants.
getRecordedTransactionLagStatus(IDatabase $conn)
Get the replica DB lag when the current transaction started.
An object representing a primary or replica DB position in a replicated setup.
addQuotes( $s)
Escape and quote a raw value string for use in a SQL query.
Interface to a relational database.
Definition IDatabase.php:45
const ROLE_STATIC_CLONE
Replica server within a static dataset.
query( $sql, $fname=__METHOD__, $flags=0)
Run an SQL query statement and return the result.
selectRow( $tables, $vars, $conds, $fname=__METHOD__, $options=[], $join_conds=[])
Wrapper to IDatabase::select() that only fetches one row (via LIMIT)
getServerName()
Get the readable name for the server.
Interface for query language.
makeList(array $a, $mode=self::LIST_COMMA)
Makes an encoded list of strings from an array.