MediaWiki master
MysqlReplicationReporter.php
Go to the documentation of this file.
1<?php
7
8use InvalidArgumentException;
9use Psr\Log\LoggerInterface;
10use RuntimeException;
11use stdClass;
19
31 protected $lagDetectionOptions = [];
33 protected $useGTIDs = false;
35 private $replicationInfoRow;
36 // Cache getServerId() for 24 hours
37 private const SERVER_ID_CACHE_TTL = 86400;
38
40 private const LAG_STALE_WARN_THRESHOLD = 0.100;
41
50 public function __construct(
52 $logger,
57 ) {
58 parent::__construct( $topologyRole, $logger, $srvCache );
59 $this->lagDetectionMethod = $lagDetectionMethod;
60 $this->lagDetectionOptions = $lagDetectionOptions;
61 $this->useGTIDs = $useGTIDs;
62 }
63
65 protected function doGetLag( IDatabase $conn ) {
66 if ( $this->lagDetectionMethod === 'pt-heartbeat' ) {
67 return $this->getLagFromPtHeartbeat( $conn );
68 } else {
69 return $this->getLagFromSlaveStatus( $conn );
70 }
71 }
72
77 protected function getLagFromSlaveStatus( IDatabase $conn ) {
78 $query = new Query(
79 'SHOW SLAVE STATUS',
80 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
81 'SHOW',
82 null,
83 'SHOW SLAVE STATUS'
84 );
85 $res = $conn->query( $query, __METHOD__ );
86 $row = $res ? $res->fetchObject() : false;
87 // If the server is not replicating, there will be no row
88 if ( $row && strval( $row->Seconds_Behind_Master ) !== '' ) {
89 // https://mariadb.com/kb/en/delayed-replication/
90 // https://dev.mysql.com/doc/refman/5.6/en/replication-delayed.html
91 return intval( $row->Seconds_Behind_Master + ( $row->SQL_Remaining_Delay ?? 0 ) );
92 }
93
94 return false;
95 }
96
101 protected function getLagFromPtHeartbeat( IDatabase $conn ) {
102 $currentTrxInfo = $this->getRecordedTransactionLagStatus( $conn );
103 if ( $currentTrxInfo ) {
104 // There is an active transaction and the initial lag was already queried
105 $staleness = microtime( true ) - $currentTrxInfo['since'];
106 if ( $staleness > self::LAG_STALE_WARN_THRESHOLD ) {
107 // Avoid returning higher and higher lag value due to snapshot age
108 // given that the isolation level will typically be REPEATABLE-READ
109 // but UTC_TIMESTAMP() is not affected by point-in-time snapshots
110 $this->logger->warning(
111 "Using cached lag value for {db_server} due to active transaction",
112 $this->getLogContext( $conn, [
113 'method' => __METHOD__,
114 'age' => $staleness,
115 'exception' => new RuntimeException()
116 ] )
117 );
118 }
119
120 return $currentTrxInfo['lag'];
121 }
122
123 $ago = $this->fetchSecondsSinceHeartbeat( $conn );
124 if ( $ago !== null ) {
125 return max( $ago, 0.0 );
126 }
127
128 $this->logger->error(
129 "Unable to find pt-heartbeat row for {db_server}",
130 $this->getLogContext( $conn, [
131 'method' => __METHOD__
132 ] )
133 );
134
135 return false;
136 }
137
143 protected function fetchSecondsSinceHeartbeat( IDatabase $conn ) {
144 // Some setups might have pt-heartbeat running on each replica server.
145 // Exclude the row for events originating on this DB server. Assume that
146 // there is only one active replication channel and that any other row
147 // getting updates must be the row for the primary DB server.
148 $where = $conn->makeList(
149 $this->lagDetectionOptions['conds'] ?? [ 'server_id != @@server_id' ],
150 ISQLPlatform::LIST_AND
151 );
152 // User mysql server time so that query time and trip time are not counted.
153 // Use ORDER BY for channel based queries since that field might not be UNIQUE.
154 $query = new Query(
155 "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
156 "FROM heartbeat.heartbeat WHERE $where ORDER BY ts DESC LIMIT 1",
157 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
158 'SELECT',
159 null,
160 "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
161 "FROM heartbeat.heartbeat WHERE ? ORDER BY ts DESC LIMIT 1",
162 );
163 $res = $conn->query( $query, __METHOD__ );
164 $row = $res ? $res->fetchObject() : false;
165
166 return $row ? ( $row->us_ago / 1e6 ) : null;
167 }
168
170 public function getApproximateLagStatus( IDatabase $conn ) {
171 if ( $this->lagDetectionMethod === 'pt-heartbeat' ) {
172 // Disable caching since this is fast enough and we don't want
173 // to be *too* pessimistic by having both the cache TTL and the
174 // pt-heartbeat interval count as lag in getSessionLagStatus()
175 return parent::getApproximateLagStatus( $conn );
176 }
177
178 $key = $this->srvCache->makeGlobalKey( 'mysql-lag', $conn->getServerName() );
179 $approxLag = $this->srvCache->get( $key );
180 if ( !$approxLag ) {
181 $approxLag = parent::getApproximateLagStatus( $conn );
182 $this->srvCache->set( $key, $approxLag, 1 );
183 }
184
185 return $approxLag;
186 }
187
193 public function getReplicationSafetyInfo( IDatabase $conn, $fname ) {
194 if ( $this->replicationInfoRow === null ) {
195 $this->replicationInfoRow = $conn->selectRow(
196 [],
197 [
198 'innodb_autoinc_lock_mode' => '@@innodb_autoinc_lock_mode',
199 'binlog_format' => '@@binlog_format',
200 ],
201 [],
202 $fname
203 );
204 }
205 return $this->replicationInfoRow;
206 }
207
211 protected function useGTIDs() {
212 return $this->useGTIDs;
213 }
214
216 public function primaryPosWait( IDatabase $conn, DBPrimaryPos $pos, $timeout ) {
217 if ( !( $pos instanceof MySQLPrimaryPos ) ) {
218 throw new InvalidArgumentException( "Position not an instance of MySQLPrimaryPos" );
219 }
220
221 if ( $this->topologyRole === IDatabase::ROLE_STATIC_CLONE ) {
222 $this->logger->debug(
223 "Bypassed replication wait; database has a static dataset",
224 $this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
225 );
226
227 return 0; // this is a copy of a read-only dataset with no primary DB
228 } elseif ( $this->lastKnownReplicaPos && $this->lastKnownReplicaPos->hasReached( $pos ) ) {
229 $this->logger->debug(
230 "Bypassed replication wait; replication known to have reached {raw_pos}",
231 $this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
232 );
233
234 return 0; // already reached this point for sure
235 }
236
237 // Call doQuery() directly, to avoid opening a transaction if DBO_TRX is set
238 if ( $pos->getGTIDs() ) {
239 // Get the GTIDs from this replica server too see the domains (channels)
240 $refPos = $this->getReplicaPos( $conn );
241 if ( !$refPos ) {
242 $this->logger->error(
243 "Could not get replication position on replica DB to compare to {raw_pos}",
244 $this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
245 );
246
247 return -1; // this is the primary DB itself?
248 }
249 // GTIDs with domains (channels) that are active and are present on the replica
250 $gtidsWait = $pos::getRelevantActiveGTIDs( $pos, $refPos );
251 if ( !$gtidsWait ) {
252 $this->logger->error(
253 "No active GTIDs in {raw_pos} share a domain with those in {current_pos}",
254 $this->getLogContext( $conn, [
255 'method' => __METHOD__,
256 'raw_pos' => $pos,
257 'current_pos' => $refPos
258 ] )
259 );
260
261 return -1; // $pos is from the wrong cluster?
262 }
263 // Wait on the GTID set
264 $gtidArg = $conn->addQuotes( implode( ',', $gtidsWait ) );
265 if ( str_contains( $gtidArg, ':' ) ) {
266 // MySQL GTIDs, e.g "source_id:transaction_id"
267 $query = new Query(
268 "SELECT WAIT_FOR_EXECUTED_GTID_SET($gtidArg, $timeout)",
269 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
270 'SELECT',
271 null,
272 "SELECT WAIT_FOR_EXECUTED_GTID_SET(?, ?)"
273 );
274 } else {
275 // MariaDB GTIDs, e.g."domain:server:sequence"
276 $query = new Query(
277 "SELECT MASTER_GTID_WAIT($gtidArg, $timeout)",
278 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
279 'SELECT',
280 null,
281 "SELECT MASTER_GTID_WAIT(?, ?)"
282 );
283 }
284 $waitPos = implode( ',', $gtidsWait );
285 } else {
286 // Wait on the binlog coordinates
287 $encFile = $conn->addQuotes( $pos->getLogFile() );
288 // @phan-suppress-next-line PhanTypeArraySuspiciousNullable
289 $encPos = intval( $pos->getLogPosition()[$pos::CORD_EVENT] );
290 $query = new Query(
291 "SELECT MASTER_POS_WAIT($encFile, $encPos, $timeout)",
292 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
293 'SELECT',
294 null,
295 "SELECT MASTER_POS_WAIT(?, ?, ?)"
296 );
297 $waitPos = $pos->__toString();
298 }
299
300 $start = microtime( true );
301 $res = $conn->query( $query, __METHOD__ );
302 $row = $res->fetchRow();
303 $seconds = max( microtime( true ) - $start, 0 );
304
305 // Result can be NULL (error), -1 (timeout), or 0+ per the MySQL manual
306 $status = ( $row[0] !== null ) ? intval( $row[0] ) : null;
307 if ( $status === null ) {
308 $this->logger->error(
309 "An error occurred while waiting for replication to reach {wait_pos}",
310 $this->getLogContext( $conn, [
311 'raw_pos' => $pos,
312 'wait_pos' => $waitPos,
313 'sql' => $query->getSQL(),
314 'seconds_waited' => $seconds,
315 'exception' => new RuntimeException()
316 ] )
317 );
318 } elseif ( $status < 0 ) {
319 $this->logger->info(
320 "Timed out waiting for replication to reach {wait_pos}",
321 $this->getLogContext( $conn, [
322 'raw_pos' => $pos,
323 'wait_pos' => $waitPos,
324 'timeout' => $timeout,
325 'sql' => $query->getSQL(),
326 'seconds_waited' => $seconds,
327 ] )
328 );
329 } elseif ( $status >= 0 ) {
330 $this->logger->debug(
331 "Replication has reached {wait_pos}",
332 $this->getLogContext( $conn, [
333 'raw_pos' => $pos,
334 'wait_pos' => $waitPos,
335 'seconds_waited' => $seconds,
336 ] )
337 );
338 // Remember that this position was reached to save queries next time
339 $this->lastKnownReplicaPos = $pos;
340 }
341
342 return $status;
343 }
344
351 public function getReplicaPos( IDatabase $conn ) {
352 $now = microtime( true ); // as-of-time *before* fetching GTID variables
353
354 if ( $this->useGTIDs() ) {
355 // Try to use GTIDs, fallbacking to binlog positions if not possible
356 $data = $this->getServerGTIDs( $conn, __METHOD__ );
357 // Use gtid_slave_pos for MariaDB and gtid_executed for MySQL
358 foreach ( [ 'gtid_slave_pos', 'gtid_executed' ] as $name ) {
359 if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
360 return new MySQLPrimaryPos( $data[$name], $now );
361 }
362 }
363 }
364
365 $data = $this->getServerRoleStatus( $conn, 'SLAVE', __METHOD__ );
366 if ( $data && strlen( $data['Relay_Master_Log_File'] ) ) {
367 return new MySQLPrimaryPos(
368 "{$data['Relay_Master_Log_File']}/{$data['Exec_Master_Log_Pos']}",
369 $now
370 );
371 }
372
373 return false;
374 }
375
382 public function getPrimaryPos( IDatabase $conn ) {
383 $now = microtime( true ); // as-of-time *before* fetching GTID variables
384
385 $pos = false;
386 if ( $this->useGTIDs() ) {
387 // Try to use GTIDs, fallbacking to binlog positions if not possible
388 $data = $this->getServerGTIDs( $conn, __METHOD__ );
389 // Use gtid_binlog_pos for MariaDB and gtid_executed for MySQL
390 foreach ( [ 'gtid_binlog_pos', 'gtid_executed' ] as $name ) {
391 if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
392 $pos = new MySQLPrimaryPos( $data[$name], $now );
393 break;
394 }
395 }
396 // Filter domains that are inactive or not relevant to the session
397 if ( $pos ) {
398 $pos->setActiveOriginServerId( $this->getServerId( $conn ) );
399 $pos->setActiveOriginServerUUID( $this->getServerUUID( $conn ) );
400 if ( isset( $data['gtid_domain_id'] ) ) {
401 $pos->setActiveDomain( $data['gtid_domain_id'] );
402 }
403 }
404 }
405
406 if ( !$pos ) {
407 $data = $this->getServerRoleStatus( $conn, 'MASTER', __METHOD__ );
408 if ( $data && strlen( $data['File'] ) ) {
409 $pos = new MySQLPrimaryPos( "{$data['File']}/{$data['Position']}", $now );
410 }
411 }
412
413 return $pos;
414 }
415
421 protected function getServerId( IDatabase $conn ) {
422 $fname = __METHOD__;
423 return $this->srvCache->getWithSetCallback(
424 $this->srvCache->makeGlobalKey( 'mysql-server-id', $conn->getServerName() ),
425 self::SERVER_ID_CACHE_TTL,
426 static function () use ( $conn, $fname ) {
427 $query = new Query(
428 "SELECT @@server_id AS id",
429 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
430 'SELECT',
431 null,
432 "SELECT @@server_id AS id"
433 );
434 $res = $conn->query( $query, $fname );
435
436 return $res->fetchObject()->id;
437 }
438 );
439 }
440
446 protected function getServerUUID( IDatabase $conn ) {
447 $fname = __METHOD__;
448 return $this->srvCache->getWithSetCallback(
449 $this->srvCache->makeGlobalKey( 'mysql-server-uuid', $conn->getServerName() ),
450 self::SERVER_ID_CACHE_TTL,
451 static function () use ( $conn, $fname ) {
452 $query = new Query(
453 "SHOW GLOBAL VARIABLES LIKE 'server_uuid'",
454 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
455 'SHOW',
456 null,
457 "SHOW GLOBAL VARIABLES LIKE 'server_uuid'"
458 );
459 $res = $conn->query( $query, $fname );
460 $row = $res->fetchObject();
461
462 return $row ? $row->Value : null;
463 }
464 );
465 }
466
472 protected function getServerGTIDs( IDatabase $conn, $fname ) {
473 $map = [];
474
475 $flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;
476
477 // Get global-only variables like gtid_executed
478 $query = new Query(
479 "SHOW GLOBAL VARIABLES LIKE 'gtid_%'",
480 $flags,
481 'SHOW',
482 null,
483 "SHOW GLOBAL VARIABLES LIKE 'gtid_%'"
484 );
485 $res = $conn->query( $query, $fname );
486 foreach ( $res as $row ) {
487 $map[$row->Variable_name] = $row->Value;
488 }
489 // Get session-specific (e.g. gtid_domain_id since that is were writes will log)
490 $query = new Query(
491 "SHOW SESSION VARIABLES LIKE 'gtid_%'",
492 $flags,
493 'SHOW',
494 null,
495 "SHOW SESSION VARIABLES LIKE 'gtid_%'"
496 );
497 $res = $conn->query( $query, $fname );
498 foreach ( $res as $row ) {
499 $map[$row->Variable_name] = $row->Value;
500 }
501
502 return $map;
503 }
504
511 protected function getServerRoleStatus( IDatabase $conn, $role, $fname ) {
512 $query = new Query(
513 "SHOW $role STATUS",
514 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
515 'SHOW',
516 null,
517 "SHOW $role STATUS"
518 );
519 $res = $conn->query( $query, $fname );
520 $row = $res ? $res->fetchRow() : false;
521
522 return ( $row ?: null );
523 }
524
525}
Abstract class for any ephemeral data store.
Definition BagOStuff.php:73
DBPrimaryPos implementation for MySQL and MariaDB.
Holds information on Query to be executed.
Definition Query.php:17
primaryPosWait(IDatabase $conn, DBPrimaryPos $pos, $timeout)
int|null
doGetLag(IDatabase $conn)
Get the amount of replication lag for this database server.Callers should avoid using this method whi...
getApproximateLagStatus(IDatabase $conn)
Get a replica DB lag estimate for this server at the start of a transaction.This is a no-op unless th...
getPrimaryPos(IDatabase $conn)
Get the position of the primary DB from SHOW MASTER STATUS.
getReplicaPos(IDatabase $conn)
Get the position of the primary DB from SHOW SLAVE STATUS.
string $lagDetectionMethod
Method to detect replica DB lag.
__construct( $topologyRole, $logger, $srvCache, $lagDetectionMethod, $lagDetectionOptions, $useGTIDs)
getLogContext(IDatabase $conn, array $extras=[])
Create a log context to pass to PSR-3 logger functions.
string $topologyRole
Replication topology role of the server; one of the class ROLE_* constants.
getRecordedTransactionLagStatus(IDatabase $conn)
Get the replica DB lag when the current transaction started.
An object representing a primary or replica DB position in a replicated setup.
addQuotes( $s)
Escape and quote a raw value string for use in a SQL query.
Interface to a relational database.
Definition IDatabase.php:31
const ROLE_STATIC_CLONE
Replica server within a static dataset.
Definition IDatabase.php:98
query( $sql, $fname=__METHOD__, $flags=0)
Run an SQL query statement and return the result.
selectRow( $tables, $vars, $conds, $fname=__METHOD__, $options=[], $join_conds=[])
Wrapper to IDatabase::select() that only fetches one row (via LIMIT)
getServerName()
Get the readable name for the server.
Interface for query language.
makeList(array $a, $mode=self::LIST_COMMA)
Makes an encoded list of strings from an array.