MediaWiki master
MysqlReplicationReporter.php
Go to the documentation of this file.
1<?php
21
22use InvalidArgumentException;
23use RuntimeException;
24use stdClass;
31
43 protected $lagDetectionOptions = [];
45 protected $useGTIDs = false;
47 private $replicationInfoRow;
48 // Cache getServerId() for 24 hours
49 private const SERVER_ID_CACHE_TTL = 86400;
50
52 private const LAG_STALE_WARN_THRESHOLD = 0.100;
53
54 public function __construct(
56 $logger,
61 ) {
62 parent::__construct( $topologyRole, $logger, $srvCache );
63 $this->lagDetectionMethod = $lagDetectionMethod;
64 $this->lagDetectionOptions = $lagDetectionOptions;
65 $this->useGTIDs = $useGTIDs;
66 }
67
68 protected function doGetLag( IDatabase $conn ) {
69 if ( $this->lagDetectionMethod === 'pt-heartbeat' ) {
70 return $this->getLagFromPtHeartbeat( $conn );
71 } else {
72 return $this->getLagFromSlaveStatus( $conn );
73 }
74 }
75
80 protected function getLagFromSlaveStatus( IDatabase $conn ) {
81 $query = new Query(
82 'SHOW SLAVE STATUS',
83 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
84 'SHOW',
85 null,
86 'SHOW SLAVE STATUS'
87 );
88 $res = $conn->query( $query );
89 $row = $res ? $res->fetchObject() : false;
90 // If the server is not replicating, there will be no row
91 if ( $row && strval( $row->Seconds_Behind_Master ) !== '' ) {
92 // https://mariadb.com/kb/en/delayed-replication/
93 // https://dev.mysql.com/doc/refman/5.6/en/replication-delayed.html
94 return intval( $row->Seconds_Behind_Master + ( $row->SQL_Remaining_Delay ?? 0 ) );
95 }
96
97 return false;
98 }
99
104 protected function getLagFromPtHeartbeat( IDatabase $conn ) {
105 $currentTrxInfo = $this->getRecordedTransactionLagStatus( $conn );
106 if ( $currentTrxInfo ) {
107 // There is an active transaction and the initial lag was already queried
108 $staleness = microtime( true ) - $currentTrxInfo['since'];
109 if ( $staleness > self::LAG_STALE_WARN_THRESHOLD ) {
110 // Avoid returning higher and higher lag value due to snapshot age
111 // given that the isolation level will typically be REPEATABLE-READ
112 // but UTC_TIMESTAMP() is not affected by point-in-time snapshots
113 $this->logger->warning(
114 "Using cached lag value for {db_server} due to active transaction",
115 $this->getLogContext( $conn, [
116 'method' => __METHOD__,
117 'age' => $staleness,
118 'exception' => new RuntimeException()
119 ] )
120 );
121 }
122
123 return $currentTrxInfo['lag'];
124 }
125
126 $ago = $this->fetchSecondsSinceHeartbeat( $conn );
127 if ( $ago !== null ) {
128 return max( $ago, 0.0 );
129 }
130
131 $this->logger->error(
132 "Unable to find pt-heartbeat row for {db_server}",
133 $this->getLogContext( $conn, [
134 'method' => __METHOD__
135 ] )
136 );
137
138 return false;
139 }
140
146 protected function fetchSecondsSinceHeartbeat( IDatabase $conn ) {
147 // Some setups might have pt-heartbeat running on each replica server.
148 // Exclude the row for events originating on this DB server. Assume that
149 // there is only one active replication channel and that any other row
150 // getting updates must be the row for the primary DB server.
151 $where = $conn->makeList(
152 $this->lagDetectionOptions['conds'] ?? [ 'server_id != @@server_id' ],
153 ISQLPlatform::LIST_AND
154 );
155 // User mysql server time so that query time and trip time are not counted.
156 // Use ORDER BY for channel based queries since that field might not be UNIQUE.
157 $query = new Query(
158 "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
159 "FROM heartbeat.heartbeat WHERE $where ORDER BY ts DESC LIMIT 1",
160 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
161 'SELECT',
162 null,
163 "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
164 "FROM heartbeat.heartbeat WHERE ? ORDER BY ts DESC LIMIT 1",
165 );
166 $res = $conn->query( $query );
167 $row = $res ? $res->fetchObject() : false;
168
169 return $row ? ( $row->us_ago / 1e6 ) : null;
170 }
171
172 public function getApproximateLagStatus( IDatabase $conn ) {
173 if ( $this->lagDetectionMethod === 'pt-heartbeat' ) {
174 // Disable caching since this is fast enough and we don't want
175 // to be *too* pessimistic by having both the cache TTL and the
176 // pt-heartbeat interval count as lag in getSessionLagStatus()
177 return parent::getApproximateLagStatus( $conn );
178 }
179
180 $key = $this->srvCache->makeGlobalKey( 'mysql-lag', $conn->getServerName() );
181 $approxLag = $this->srvCache->get( $key );
182 if ( !$approxLag ) {
183 $approxLag = parent::getApproximateLagStatus( $conn );
184 $this->srvCache->set( $key, $approxLag, 1 );
185 }
186
187 return $approxLag;
188 }
189
195 public function getReplicationSafetyInfo( IDatabase $conn, $fname = __METHOD__ ) {
196 if ( $this->replicationInfoRow === null ) {
197 $this->replicationInfoRow = $conn->selectRow(
198 [],
199 [
200 'innodb_autoinc_lock_mode' => '@@innodb_autoinc_lock_mode',
201 'binlog_format' => '@@binlog_format',
202 ],
203 [],
204 $fname
205 );
206 }
207 return $this->replicationInfoRow;
208 }
209
213 protected function useGTIDs() {
214 return $this->useGTIDs;
215 }
216
217 public function primaryPosWait( IDatabase $conn, DBPrimaryPos $pos, $timeout ) {
218 if ( !( $pos instanceof MySQLPrimaryPos ) ) {
219 throw new InvalidArgumentException( "Position not an instance of MySQLPrimaryPos" );
220 }
221
222 if ( $this->topologyRole === IDatabase::ROLE_STATIC_CLONE ) {
223 $this->logger->debug(
224 "Bypassed replication wait; database has a static dataset",
225 $this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
226 );
227
228 return 0; // this is a copy of a read-only dataset with no primary DB
229 } elseif ( $this->lastKnownReplicaPos && $this->lastKnownReplicaPos->hasReached( $pos ) ) {
230 $this->logger->debug(
231 "Bypassed replication wait; replication known to have reached {raw_pos}",
232 $this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
233 );
234
235 return 0; // already reached this point for sure
236 }
237
238 // Call doQuery() directly, to avoid opening a transaction if DBO_TRX is set
239 if ( $pos->getGTIDs() ) {
240 // Get the GTIDs from this replica server too see the domains (channels)
241 $refPos = $this->getReplicaPos( $conn );
242 if ( !$refPos ) {
243 $this->logger->error(
244 "Could not get replication position on replica DB to compare to {raw_pos}",
245 $this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
246 );
247
248 return -1; // this is the primary DB itself?
249 }
250 // GTIDs with domains (channels) that are active and are present on the replica
251 $gtidsWait = $pos::getRelevantActiveGTIDs( $pos, $refPos );
252 if ( !$gtidsWait ) {
253 $this->logger->error(
254 "No active GTIDs in {raw_pos} share a domain with those in {current_pos}",
255 $this->getLogContext( $conn, [
256 'method' => __METHOD__,
257 'raw_pos' => $pos,
258 'current_pos' => $refPos
259 ] )
260 );
261
262 return -1; // $pos is from the wrong cluster?
263 }
264 // Wait on the GTID set
265 $gtidArg = $conn->addQuotes( implode( ',', $gtidsWait ) );
266 if ( strpos( $gtidArg, ':' ) !== false ) {
267 // MySQL GTIDs, e.g "source_id:transaction_id"
268 $query = new Query(
269 "SELECT WAIT_FOR_EXECUTED_GTID_SET($gtidArg, $timeout)",
270 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
271 'SELECT',
272 null,
273 "SELECT WAIT_FOR_EXECUTED_GTID_SET(?, ?)"
274 );
275 } else {
276 // MariaDB GTIDs, e.g."domain:server:sequence"
277 $query = new Query(
278 "SELECT MASTER_GTID_WAIT($gtidArg, $timeout)",
279 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
280 'SELECT',
281 null,
282 "SELECT MASTER_GTID_WAIT(?, ?)"
283 );
284 }
285 $waitPos = implode( ',', $gtidsWait );
286 } else {
287 // Wait on the binlog coordinates
288 $encFile = $conn->addQuotes( $pos->getLogFile() );
289 // @phan-suppress-next-line PhanTypeArraySuspiciousNullable
290 $encPos = intval( $pos->getLogPosition()[$pos::CORD_EVENT] );
291 $query = new Query(
292 "SELECT MASTER_POS_WAIT($encFile, $encPos, $timeout)",
293 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
294 'SELECT',
295 null,
296 "SELECT MASTER_POS_WAIT(?, ?, ?)"
297 );
298 $waitPos = $pos->__toString();
299 }
300
301 $start = microtime( true );
302 $res = $conn->query( $query, __METHOD__ );
303 $row = $res->fetchRow();
304 $seconds = max( microtime( true ) - $start, 0 );
305
306 // Result can be NULL (error), -1 (timeout), or 0+ per the MySQL manual
307 $status = ( $row[0] !== null ) ? intval( $row[0] ) : null;
308 if ( $status === null ) {
309 $this->logger->error(
310 "An error occurred while waiting for replication to reach {wait_pos}",
311 $this->getLogContext( $conn, [
312 'raw_pos' => $pos,
313 'wait_pos' => $waitPos,
314 'sql' => $query->getSQL(),
315 'seconds_waited' => $seconds,
316 'exception' => new RuntimeException()
317 ] )
318 );
319 } elseif ( $status < 0 ) {
320 $this->logger->info(
321 "Timed out waiting for replication to reach {wait_pos}",
322 $this->getLogContext( $conn, [
323 'raw_pos' => $pos,
324 'wait_pos' => $waitPos,
325 'timeout' => $timeout,
326 'sql' => $query->getSQL(),
327 'seconds_waited' => $seconds,
328 ] )
329 );
330 } elseif ( $status >= 0 ) {
331 $this->logger->debug(
332 "Replication has reached {wait_pos}",
333 $this->getLogContext( $conn, [
334 'raw_pos' => $pos,
335 'wait_pos' => $waitPos,
336 'seconds_waited' => $seconds,
337 ] )
338 );
339 // Remember that this position was reached to save queries next time
340 $this->lastKnownReplicaPos = $pos;
341 }
342
343 return $status;
344 }
345
352 public function getReplicaPos( IDatabase $conn ) {
353 $now = microtime( true ); // as-of-time *before* fetching GTID variables
354
355 if ( $this->useGTIDs() ) {
356 // Try to use GTIDs, fallbacking to binlog positions if not possible
357 $data = $this->getServerGTIDs( $conn, __METHOD__ );
358 // Use gtid_slave_pos for MariaDB and gtid_executed for MySQL
359 foreach ( [ 'gtid_slave_pos', 'gtid_executed' ] as $name ) {
360 if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
361 return new MySQLPrimaryPos( $data[$name], $now );
362 }
363 }
364 }
365
366 $data = $this->getServerRoleStatus( $conn, 'SLAVE', __METHOD__ );
367 if ( $data && strlen( $data['Relay_Master_Log_File'] ) ) {
368 return new MySQLPrimaryPos(
369 "{$data['Relay_Master_Log_File']}/{$data['Exec_Master_Log_Pos']}",
370 $now
371 );
372 }
373
374 return false;
375 }
376
383 public function getPrimaryPos( IDatabase $conn ) {
384 $now = microtime( true ); // as-of-time *before* fetching GTID variables
385
386 $pos = false;
387 if ( $this->useGTIDs() ) {
388 // Try to use GTIDs, fallbacking to binlog positions if not possible
389 $data = $this->getServerGTIDs( $conn, __METHOD__ );
390 // Use gtid_binlog_pos for MariaDB and gtid_executed for MySQL
391 foreach ( [ 'gtid_binlog_pos', 'gtid_executed' ] as $name ) {
392 if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
393 $pos = new MySQLPrimaryPos( $data[$name], $now );
394 break;
395 }
396 }
397 // Filter domains that are inactive or not relevant to the session
398 if ( $pos ) {
399 $pos->setActiveOriginServerId( $this->getServerId( $conn ) );
400 $pos->setActiveOriginServerUUID( $this->getServerUUID( $conn ) );
401 if ( isset( $data['gtid_domain_id'] ) ) {
402 $pos->setActiveDomain( $data['gtid_domain_id'] );
403 }
404 }
405 }
406
407 if ( !$pos ) {
408 $data = $this->getServerRoleStatus( $conn, 'MASTER', __METHOD__ );
409 if ( $data && strlen( $data['File'] ) ) {
410 $pos = new MySQLPrimaryPos( "{$data['File']}/{$data['Position']}", $now );
411 }
412 }
413
414 return $pos;
415 }
416
422 protected function getServerId( IDatabase $conn ) {
423 $fname = __METHOD__;
424 return $this->srvCache->getWithSetCallback(
425 $this->srvCache->makeGlobalKey( 'mysql-server-id', $conn->getServerName() ),
426 self::SERVER_ID_CACHE_TTL,
427 static function () use ( $conn, $fname ) {
428 $query = new Query(
429 "SELECT @@server_id AS id",
430 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
431 'SELECT',
432 null,
433 "SELECT @@server_id AS id"
434 );
435 $res = $conn->query( $query, $fname );
436
437 return $res->fetchObject()->id;
438 }
439 );
440 }
441
447 protected function getServerUUID( IDatabase $conn ) {
448 $fname = __METHOD__;
449 return $this->srvCache->getWithSetCallback(
450 $this->srvCache->makeGlobalKey( 'mysql-server-uuid', $conn->getServerName() ),
451 self::SERVER_ID_CACHE_TTL,
452 static function () use ( $conn, $fname ) {
453 $query = new Query(
454 "SHOW GLOBAL VARIABLES LIKE 'server_uuid'",
455 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
456 'SHOW',
457 null,
458 "SHOW GLOBAL VARIABLES LIKE 'server_uuid'"
459 );
460 $res = $conn->query( $query, $fname );
461 $row = $res->fetchObject();
462
463 return $row ? $row->Value : null;
464 }
465 );
466 }
467
473 protected function getServerGTIDs( IDatabase $conn, $fname = __METHOD__ ) {
474 $map = [];
475
476 $flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;
477
478 // Get global-only variables like gtid_executed
479 $query = new Query(
480 "SHOW GLOBAL VARIABLES LIKE 'gtid_%'",
481 $flags,
482 'SHOW',
483 null,
484 "SHOW GLOBAL VARIABLES LIKE 'gtid_%'"
485 );
486 $res = $conn->query( $query, $fname );
487 foreach ( $res as $row ) {
488 $map[$row->Variable_name] = $row->Value;
489 }
490 // Get session-specific (e.g. gtid_domain_id since that is were writes will log)
491 $query = new Query(
492 "SHOW SESSION VARIABLES LIKE 'gtid_%'",
493 $flags,
494 'SHOW',
495 null,
496 "SHOW SESSION VARIABLES LIKE 'gtid_%'"
497 );
498 $res = $conn->query( $query, $fname );
499 foreach ( $res as $row ) {
500 $map[$row->Variable_name] = $row->Value;
501 }
502
503 return $map;
504 }
505
512 protected function getServerRoleStatus( IDatabase $conn, $role, $fname = __METHOD__ ) {
513 $query = new Query(
514 "SHOW $role STATUS",
515 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
516 'SHOW',
517 null,
518 "SHOW $role STATUS"
519 );
520 $res = $conn->query( $query, $fname );
521 $row = $res ? $res->fetchRow() : false;
522
523 return ( $row ?: null );
524 }
525
526}
DBPrimaryPos implementation for MySQL and MariaDB.
Holds information on Query to be executed.
Definition Query.php:31
primaryPosWait(IDatabase $conn, DBPrimaryPos $pos, $timeout)
doGetLag(IDatabase $conn)
Get the amount of replication lag for this database server.
getApproximateLagStatus(IDatabase $conn)
Get a replica DB lag estimate for this server at the start of a transaction.
getPrimaryPos(IDatabase $conn)
Get the position of the primary DB from SHOW MASTER STATUS.
getReplicaPos(IDatabase $conn)
Get the position of the primary DB from SHOW SLAVE STATUS.
string $lagDetectionMethod
Method to detect replica DB lag.
getServerRoleStatus(IDatabase $conn, $role, $fname=__METHOD__)
__construct( $topologyRole, $logger, $srvCache, $lagDetectionMethod, $lagDetectionOptions, $useGTIDs)
getLogContext(IDatabase $conn, array $extras=[])
Create a log context to pass to PSR-3 logger functions.
string $topologyRole
Replication topology role of the server; one of the class ROLE_* constants.
getRecordedTransactionLagStatus(IDatabase $conn)
Get the replica DB lag when the current transaction started.
An object representing a primary or replica DB position in a replicated setup.
addQuotes( $s)
Escape and quote a raw value string for use in a SQL query.
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:39
const ROLE_STATIC_CLONE
Replica server within a static dataset.
Definition IDatabase.php:84
query( $sql, $fname=__METHOD__, $flags=0)
Run an SQL query statement and return the result.
selectRow( $tables, $vars, $conds, $fname=__METHOD__, $options=[], $join_conds=[])
Wrapper to IDatabase::select() that only fetches one row (via LIMIT)
getServerName()
Get the readable name for the server.
Interface for query language.
makeList(array $a, $mode=self::LIST_COMMA)
Makes an encoded list of strings from an array.