MediaWiki master
MysqlReplicationReporter.php
Go to the documentation of this file.
1<?php
21
22use InvalidArgumentException;
23use RuntimeException;
24use stdClass;
31
43 protected $lagDetectionOptions = [];
45 protected $useGTIDs = false;
47 private $replicationInfoRow;
48 // Cache getServerId() for 24 hours
49 private const SERVER_ID_CACHE_TTL = 86400;
50
52 private const LAG_STALE_WARN_THRESHOLD = 0.100;
53
54 public function __construct(
56 $logger,
61 ) {
62 parent::__construct( $topologyRole, $logger, $srvCache );
63 $this->lagDetectionMethod = $lagDetectionMethod;
64 $this->lagDetectionOptions = $lagDetectionOptions;
65 $this->useGTIDs = $useGTIDs;
66 }
67
68 protected function doGetLag( IDatabase $conn ) {
69 if ( $this->lagDetectionMethod === 'pt-heartbeat' ) {
70 return $this->getLagFromPtHeartbeat( $conn );
71 } else {
72 return $this->getLagFromSlaveStatus( $conn );
73 }
74 }
75
80 protected function getLagFromSlaveStatus( IDatabase $conn ) {
81 $query = new Query(
82 'SHOW SLAVE STATUS',
83 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
84 'SHOW',
85 null,
86 'SHOW SLAVE STATUS'
87 );
88 $res = $conn->query( $query );
89 $row = $res ? $res->fetchObject() : false;
90 // If the server is not replicating, there will be no row
91 if ( $row && strval( $row->Seconds_Behind_Master ) !== '' ) {
92 // https://mariadb.com/kb/en/delayed-replication/
93 // https://dev.mysql.com/doc/refman/5.6/en/replication-delayed.html
94 return intval( $row->Seconds_Behind_Master + ( $row->SQL_Remaining_Delay ?? 0 ) );
95 }
96
97 return false;
98 }
99
104 protected function getLagFromPtHeartbeat( IDatabase $conn ) {
105 $currentTrxInfo = $this->getRecordedTransactionLagStatus( $conn );
106 if ( $currentTrxInfo ) {
107 // There is an active transaction and the initial lag was already queried
108 $staleness = microtime( true ) - $currentTrxInfo['since'];
109 if ( $staleness > self::LAG_STALE_WARN_THRESHOLD ) {
110 // Avoid returning higher and higher lag value due to snapshot age
111 // given that the isolation level will typically be REPEATABLE-READ
112 // but UTC_TIMESTAMP() is not affected by point-in-time snapshots
113 $this->logger->warning(
114 "Using cached lag value for {db_server} due to active transaction",
115 $this->getLogContext( $conn, [
116 'method' => __METHOD__,
117 'age' => $staleness,
118 'exception' => new RuntimeException()
119 ] )
120 );
121 }
122
123 return $currentTrxInfo['lag'];
124 }
125
126 $ago = $this->fetchSecondsSinceHeartbeat( $conn );
127 if ( $ago !== null ) {
128 return max( $ago, 0.0 );
129 }
130
131 $this->logger->error(
132 "Unable to find pt-heartbeat row for {db_server}",
133 $this->getLogContext( $conn, [
134 'method' => __METHOD__
135 ] )
136 );
137
138 return false;
139 }
140
146 protected function fetchSecondsSinceHeartbeat( IDatabase $conn ) {
147 // Some setups might have pt-heartbeat running on each replica server.
148 // Exclude the row for events originating on this DB server. Assume that
149 // there is only one active replication channel and that any other row
150 // getting updates must be the row for the primary DB server.
151 $where = $conn->makeList(
152 $this->lagDetectionOptions['conds'] ?? [ 'server_id != @@server_id' ],
153 ISQLPlatform::LIST_AND
154 );
155 // User mysql server time so that query time and trip time are not counted.
156 // Use ORDER BY for channel based queries since that field might not be UNIQUE.
157 $query = new Query(
158 "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
159 "FROM heartbeat.heartbeat WHERE $where ORDER BY ts DESC LIMIT 1",
160 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
161 'SELECT',
162 null,
163 "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
164 "FROM heartbeat.heartbeat WHERE ? ORDER BY ts DESC LIMIT 1",
165 );
166 $res = $conn->query( $query );
167 $row = $res ? $res->fetchObject() : false;
168
169 return $row ? ( $row->us_ago / 1e6 ) : null;
170 }
171
172 public function getApproximateLagStatus( IDatabase $conn ) {
173 if ( $this->lagDetectionMethod === 'pt-heartbeat' ) {
174 // Disable caching since this is fast enough and we don't want
175 // to be *too* pessimistic by having both the cache TTL and the
176 // pt-heartbeat interval count as lag in getSessionLagStatus()
177 return parent::getApproximateLagStatus( $conn );
178 }
179
180 $key = $this->srvCache->makeGlobalKey( 'mysql-lag', $conn->getServerName() );
181 $approxLag = $this->srvCache->get( $key );
182 if ( !$approxLag ) {
183 $approxLag = parent::getApproximateLagStatus( $conn );
184 $this->srvCache->set( $key, $approxLag, 1 );
185 }
186
187 return $approxLag;
188 }
189
194 public function getReplicationSafetyInfo( IDatabase $conn ) {
195 if ( $this->replicationInfoRow === null ) {
196 $this->replicationInfoRow = $conn->selectRow(
197 [],
198 [
199 'innodb_autoinc_lock_mode' => '@@innodb_autoinc_lock_mode',
200 'binlog_format' => '@@binlog_format',
201 ],
202 [],
203 __METHOD__
204 );
205 }
206 return $this->replicationInfoRow;
207 }
208
212 protected function useGTIDs() {
213 return $this->useGTIDs;
214 }
215
216 public function primaryPosWait( IDatabase $conn, DBPrimaryPos $pos, $timeout ) {
217 if ( !( $pos instanceof MySQLPrimaryPos ) ) {
218 throw new InvalidArgumentException( "Position not an instance of MySQLPrimaryPos" );
219 }
220
221 if ( $this->topologyRole === IDatabase::ROLE_STATIC_CLONE ) {
222 $this->logger->debug(
223 "Bypassed replication wait; database has a static dataset",
224 $this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
225 );
226
227 return 0; // this is a copy of a read-only dataset with no primary DB
228 } elseif ( $this->lastKnownReplicaPos && $this->lastKnownReplicaPos->hasReached( $pos ) ) {
229 $this->logger->debug(
230 "Bypassed replication wait; replication known to have reached {raw_pos}",
231 $this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
232 );
233
234 return 0; // already reached this point for sure
235 }
236
237 // Call doQuery() directly, to avoid opening a transaction if DBO_TRX is set
238 if ( $pos->getGTIDs() ) {
239 // Get the GTIDs from this replica server too see the domains (channels)
240 $refPos = $this->getReplicaPos( $conn );
241 if ( !$refPos ) {
242 $this->logger->error(
243 "Could not get replication position on replica DB to compare to {raw_pos}",
244 $this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
245 );
246
247 return -1; // this is the primary DB itself?
248 }
249 // GTIDs with domains (channels) that are active and are present on the replica
250 $gtidsWait = $pos::getRelevantActiveGTIDs( $pos, $refPos );
251 if ( !$gtidsWait ) {
252 $this->logger->error(
253 "No active GTIDs in {raw_pos} share a domain with those in {current_pos}",
254 $this->getLogContext( $conn, [
255 'method' => __METHOD__,
256 'raw_pos' => $pos,
257 'current_pos' => $refPos
258 ] )
259 );
260
261 return -1; // $pos is from the wrong cluster?
262 }
263 // Wait on the GTID set
264 $gtidArg = $conn->addQuotes( implode( ',', $gtidsWait ) );
265 if ( strpos( $gtidArg, ':' ) !== false ) {
266 // MySQL GTIDs, e.g "source_id:transaction_id"
267 $query = new Query(
268 "SELECT WAIT_FOR_EXECUTED_GTID_SET($gtidArg, $timeout)",
269 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
270 'SELECT',
271 null,
272 "SELECT WAIT_FOR_EXECUTED_GTID_SET(?, ?)"
273 );
274 } else {
275 // MariaDB GTIDs, e.g."domain:server:sequence"
276 $query = new Query(
277 "SELECT MASTER_GTID_WAIT($gtidArg, $timeout)",
278 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
279 'SELECT',
280 null,
281 "SELECT MASTER_GTID_WAIT(?, ?)"
282 );
283 }
284 $waitPos = implode( ',', $gtidsWait );
285 } else {
286 // Wait on the binlog coordinates
287 $encFile = $conn->addQuotes( $pos->getLogFile() );
288 // @phan-suppress-next-line PhanTypeArraySuspiciousNullable
289 $encPos = intval( $pos->getLogPosition()[$pos::CORD_EVENT] );
290 $query = new Query(
291 "SELECT MASTER_POS_WAIT($encFile, $encPos, $timeout)",
292 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
293 'SELECT',
294 null,
295 "SELECT MASTER_POS_WAIT(?, ?, ?)"
296 );
297 $waitPos = $pos->__toString();
298 }
299
300 $start = microtime( true );
301 $res = $conn->query( $query, __METHOD__ );
302 $row = $res->fetchRow();
303 $seconds = max( microtime( true ) - $start, 0 );
304
305 // Result can be NULL (error), -1 (timeout), or 0+ per the MySQL manual
306 $status = ( $row[0] !== null ) ? intval( $row[0] ) : null;
307 if ( $status === null ) {
308 $this->logger->error(
309 "An error occurred while waiting for replication to reach {wait_pos}",
310 $this->getLogContext( $conn, [
311 'raw_pos' => $pos,
312 'wait_pos' => $waitPos,
313 'sql' => $query->getSQL(),
314 'seconds_waited' => $seconds,
315 'exception' => new RuntimeException()
316 ] )
317 );
318 } elseif ( $status < 0 ) {
319 $this->logger->info(
320 "Timed out waiting for replication to reach {wait_pos}",
321 $this->getLogContext( $conn, [
322 'raw_pos' => $pos,
323 'wait_pos' => $waitPos,
324 'timeout' => $timeout,
325 'sql' => $query->getSQL(),
326 'seconds_waited' => $seconds,
327 ] )
328 );
329 } elseif ( $status >= 0 ) {
330 $this->logger->debug(
331 "Replication has reached {wait_pos}",
332 $this->getLogContext( $conn, [
333 'raw_pos' => $pos,
334 'wait_pos' => $waitPos,
335 'seconds_waited' => $seconds,
336 ] )
337 );
338 // Remember that this position was reached to save queries next time
339 $this->lastKnownReplicaPos = $pos;
340 }
341
342 return $status;
343 }
344
351 public function getReplicaPos( IDatabase $conn ) {
352 $now = microtime( true ); // as-of-time *before* fetching GTID variables
353
354 if ( $this->useGTIDs() ) {
355 // Try to use GTIDs, fallbacking to binlog positions if not possible
356 $data = $this->getServerGTIDs( $conn, __METHOD__ );
357 // Use gtid_slave_pos for MariaDB and gtid_executed for MySQL
358 foreach ( [ 'gtid_slave_pos', 'gtid_executed' ] as $name ) {
359 if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
360 return new MySQLPrimaryPos( $data[$name], $now );
361 }
362 }
363 }
364
365 $data = $this->getServerRoleStatus( $conn, 'SLAVE', __METHOD__ );
366 if ( $data && strlen( $data['Relay_Master_Log_File'] ) ) {
367 return new MySQLPrimaryPos(
368 "{$data['Relay_Master_Log_File']}/{$data['Exec_Master_Log_Pos']}",
369 $now
370 );
371 }
372
373 return false;
374 }
375
382 public function getPrimaryPos( IDatabase $conn ) {
383 $now = microtime( true ); // as-of-time *before* fetching GTID variables
384
385 $pos = false;
386 if ( $this->useGTIDs() ) {
387 // Try to use GTIDs, fallbacking to binlog positions if not possible
388 $data = $this->getServerGTIDs( $conn, __METHOD__ );
389 // Use gtid_binlog_pos for MariaDB and gtid_executed for MySQL
390 foreach ( [ 'gtid_binlog_pos', 'gtid_executed' ] as $name ) {
391 if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
392 $pos = new MySQLPrimaryPos( $data[$name], $now );
393 break;
394 }
395 }
396 // Filter domains that are inactive or not relevant to the session
397 if ( $pos ) {
398 $pos->setActiveOriginServerId( $this->getServerId( $conn ) );
399 $pos->setActiveOriginServerUUID( $this->getServerUUID( $conn ) );
400 if ( isset( $data['gtid_domain_id'] ) ) {
401 $pos->setActiveDomain( $data['gtid_domain_id'] );
402 }
403 }
404 }
405
406 if ( !$pos ) {
407 $data = $this->getServerRoleStatus( $conn, 'MASTER', __METHOD__ );
408 if ( $data && strlen( $data['File'] ) ) {
409 $pos = new MySQLPrimaryPos( "{$data['File']}/{$data['Position']}", $now );
410 }
411 }
412
413 return $pos;
414 }
415
421 public function getTopologyBasedServerId( IDatabase $conn ) {
422 return $this->getServerId( $conn );
423 }
424
430 protected function getServerId( IDatabase $conn ) {
431 $fname = __METHOD__;
432 return $this->srvCache->getWithSetCallback(
433 $this->srvCache->makeGlobalKey( 'mysql-server-id', $conn->getServerName() ),
434 self::SERVER_ID_CACHE_TTL,
435 static function () use ( $conn, $fname ) {
436 $query = new Query(
437 "SELECT @@server_id AS id",
438 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
439 'SELECT',
440 null,
441 "SELECT @@server_id AS id"
442 );
443 $res = $conn->query( $query, $fname );
444
445 return $res->fetchObject()->id;
446 }
447 );
448 }
449
455 protected function getServerUUID( IDatabase $conn ) {
456 $fname = __METHOD__;
457 return $this->srvCache->getWithSetCallback(
458 $this->srvCache->makeGlobalKey( 'mysql-server-uuid', $conn->getServerName() ),
459 self::SERVER_ID_CACHE_TTL,
460 static function () use ( $conn, $fname ) {
461 $query = new Query(
462 "SHOW GLOBAL VARIABLES LIKE 'server_uuid'",
463 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
464 'SHOW',
465 null,
466 "SHOW GLOBAL VARIABLES LIKE 'server_uuid'"
467 );
468 $res = $conn->query( $query, $fname );
469 $row = $res->fetchObject();
470
471 return $row ? $row->Value : null;
472 }
473 );
474 }
475
481 protected function getServerGTIDs( IDatabase $conn, $fname = __METHOD__ ) {
482 $map = [];
483
484 $flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;
485
486 // Get global-only variables like gtid_executed
487 $query = new Query(
488 "SHOW GLOBAL VARIABLES LIKE 'gtid_%'",
489 $flags,
490 'SHOW',
491 null,
492 "SHOW GLOBAL VARIABLES LIKE 'gtid_%'"
493 );
494 $res = $conn->query( $query, $fname );
495 foreach ( $res as $row ) {
496 $map[$row->Variable_name] = $row->Value;
497 }
498 // Get session-specific (e.g. gtid_domain_id since that is were writes will log)
499 $query = new Query(
500 "SHOW SESSION VARIABLES LIKE 'gtid_%'",
501 $flags,
502 'SHOW',
503 null,
504 "SHOW SESSION VARIABLES LIKE 'gtid_%'"
505 );
506 $res = $conn->query( $query, $fname );
507 foreach ( $res as $row ) {
508 $map[$row->Variable_name] = $row->Value;
509 }
510
511 return $map;
512 }
513
520 protected function getServerRoleStatus( IDatabase $conn, $role, $fname = __METHOD__ ) {
521 $query = new Query(
522 "SHOW $role STATUS",
523 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
524 'SHOW',
525 null,
526 "SHOW $role STATUS"
527 );
528 $res = $conn->query( $query, $fname );
529 $row = $res ? $res->fetchRow() : false;
530
531 return ( $row ?: null );
532 }
533
534}
DBPrimaryPos implementation for MySQL and MariaDB.
Holds information on Query to be executed.
Definition Query.php:31
primaryPosWait(IDatabase $conn, DBPrimaryPos $pos, $timeout)
doGetLag(IDatabase $conn)
Get the amount of replication lag for this database server.
getApproximateLagStatus(IDatabase $conn)
Get a replica DB lag estimate for this server at the start of a transaction.
getPrimaryPos(IDatabase $conn)
Get the position of the primary DB from SHOW MASTER STATUS.
getReplicaPos(IDatabase $conn)
Get the position of the primary DB from SHOW SLAVE STATUS.
string $lagDetectionMethod
Method to detect replica DB lag.
getTopologyBasedServerId(IDatabase $conn)
string|null 32 bit integer ID; null if not applicable or unknown
getServerRoleStatus(IDatabase $conn, $role, $fname=__METHOD__)
__construct( $topologyRole, $logger, $srvCache, $lagDetectionMethod, $lagDetectionOptions, $useGTIDs)
getLogContext(IDatabase $conn, array $extras=[])
Create a log context to pass to PSR-3 logger functions.
string $topologyRole
Replication topology role of the server; one of the class ROLE_* constants.
getRecordedTransactionLagStatus(IDatabase $conn)
Get the replica DB lag when the current transaction started.
An object representing a primary or replica DB position in a replicated setup.
addQuotes( $s)
Escape and quote a raw value string for use in a SQL query.
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:36
query( $sql, $fname=__METHOD__, $flags=0)
Run an SQL query statement and return the result.
getServerName()
Get the readable name for the server.
selectRow( $table, $vars, $conds, $fname=__METHOD__, $options=[], $join_conds=[])
Wrapper to IDatabase::select() that only fetches one row (via LIMIT)
Interface for query language.
makeList(array $a, $mode=self::LIST_COMMA)
Makes an encoded list of strings from an array.