MediaWiki REL1_40
MysqlReplicationReporter.php
Go to the documentation of this file.
1<?php
21
22use InvalidArgumentException;
23use RuntimeException;
24use stdClass;
30
/** How long getServerId() and getServerUUID() results stay cached: 24 hours, in seconds */
private const SERVER_ID_CACHE_TTL = 86400;

/**
 * Age in seconds (compared against microtime() differences) beyond which
 * getLagFromPtHeartbeat() warns that it is reusing a lag value recorded earlier
 */
private const LAG_STALE_WARN_THRESHOLD = 0.1;

/** @var array Lag detection options; the 'conds' entry is read by fetchSecondsSinceHeartbeat() */
protected $lagDetectionOptions = [];

/** @var bool Whether getReplicaPos()/getPrimaryPos() try GTIDs before binlog coordinates */
protected $useGTIDs = false;

/** Row cached by getReplicationSafetyInfo(); stays null until the first fetch */
private $replicationInfoRow;
52
/**
 * Store the lag detection settings and pass the rest to the parent constructor.
 *
 * @param string $topologyRole Replication topology role of the server; one of
 *   the class ROLE_* constants
 * @param mixed $logger Logger used for the warning/error/debug calls in this
 *   class (presumably a PSR-3 logger; confirm against the parent class)
 * @param mixed $srvCache Cache object used for the lag and server ID lookups
 *   (type not visible here; confirm against the parent class)
 * @param string $lagDetectionMethod Method to detect replica DB lag; the value
 *   'pt-heartbeat' selects the heartbeat table, anything else SHOW SLAVE STATUS
 * @param array $lagDetectionOptions Lag detection options; 'conds' overrides the
 *   default heartbeat row filter
 * @param bool $useGTIDs Whether to try GTIDs before binlog coordinates
 */
public function __construct(
	$topologyRole,
	$logger,
	$srvCache,
	$lagDetectionMethod,
	$lagDetectionOptions,
	$useGTIDs
) {
	parent::__construct( $topologyRole, $logger, $srvCache );
	$this->lagDetectionMethod = $lagDetectionMethod;
	$this->lagDetectionOptions = $lagDetectionOptions;
	$this->useGTIDs = $useGTIDs;
}
66
/**
 * Get the amount of replication lag for this database server.
 *
 * Uses the pt-heartbeat table when the lag detection method is 'pt-heartbeat',
 * and SHOW SLAVE STATUS for any other method.
 *
 * @param IDatabase $conn
 * @return int|float|false Lag in seconds; false if it could not be determined
 */
protected function doGetLag( IDatabase $conn ) {
	$usesHeartbeat = ( $this->lagDetectionMethod === 'pt-heartbeat' );

	return $usesHeartbeat
		? $this->getLagFromPtHeartbeat( $conn )
		: $this->getLagFromSlaveStatus( $conn );
}
74
/**
 * Read the replication lag from SHOW SLAVE STATUS.
 *
 * @param IDatabase $conn
 * @return int|false Seconds_Behind_Master plus any SQL_Remaining_Delay; false
 *   if there is no status row or Seconds_Behind_Master is empty
 */
protected function getLagFromSlaveStatus( IDatabase $conn ) {
	$flags = ISQLPlatform::QUERY_SILENCE_ERRORS
		| ISQLPlatform::QUERY_IGNORE_DBO_TRX
		| ISQLPlatform::QUERY_CHANGE_NONE;
	$result = $conn->query( 'SHOW SLAVE STATUS', __METHOD__, $flags );
	$status = $result ? $result->fetchObject() : false;

	// If the server is not replicating, there will be no row
	if ( !$status || strval( $status->Seconds_Behind_Master ) === '' ) {
		return false;
	}

	// Count the remaining delay of delayed replication (when reported) as lag:
	// https://mariadb.com/kb/en/delayed-replication/
	// https://dev.mysql.com/doc/refman/5.6/en/replication-delayed.html
	$remainingDelay = $status->SQL_Remaining_Delay ?? 0;

	return intval( $status->Seconds_Behind_Master + $remainingDelay );
}
95
/**
 * Read the replication lag from the pt-heartbeat table.
 *
 * If a lag value was already recorded for the current transaction, that value
 * is returned instead of querying again.
 *
 * @param IDatabase $conn
 * @return int|float|false Lag in seconds; false if no heartbeat row was found
 */
protected function getLagFromPtHeartbeat( IDatabase $conn ) {
	$trxLagStatus = $this->getRecordedTransactionLagStatus( $conn );

	if ( !$trxLagStatus ) {
		// Nothing recorded for a current transaction; ask the heartbeat table
		$secondsAgo = $this->fetchSecondsSinceHeartbeat( $conn );
		if ( $secondsAgo === null ) {
			$this->logger->error(
				"Unable to find pt-heartbeat row for {db_server}",
				$this->getLogContext( $conn, [
					'method' => __METHOD__
				] )
			);

			return false;
		}

		// Never report negative lag
		return max( $secondsAgo, 0.0 );
	}

	// There is an active transaction and the initial lag was already queried
	$age = microtime( true ) - $trxLagStatus['since'];
	if ( $age > self::LAG_STALE_WARN_THRESHOLD ) {
		// Re-querying would return higher and higher lag values due to snapshot
		// age: the isolation level will typically be REPEATABLE-READ, while
		// UTC_TIMESTAMP() is not affected by point-in-time snapshots. Reuse
		// the recorded value and warn about its age instead.
		$this->logger->warning(
			"Using cached lag value for {db_server} due to active transaction",
			$this->getLogContext( $conn, [
				'method' => __METHOD__,
				'age' => $age,
				'exception' => new RuntimeException()
			] )
		);
	}

	return $trxLagStatus['lag'];
}
136
/**
 * Get the time elapsed since the newest matching pt-heartbeat row was written.
 *
 * @param IDatabase $conn
 * @return float|null Elapsed seconds (microsecond resolution); null if the
 *   query failed or matched no row
 */
protected function fetchSecondsSinceHeartbeat( IDatabase $conn ) {
	// Some setups might have pt-heartbeat running on each replica server.
	// By default, exclude the row for events originating on this DB server.
	// Assume that there is only one active replication channel and that any
	// other row getting updates must be the row for the primary DB server.
	$conds = $this->lagDetectionOptions['conds'] ?? [ 'server_id != @@server_id' ];
	$where = $conn->makeList( $conds, ISQLPlatform::LIST_AND );

	// Use the MySQL server time so that query time and trip time are not counted.
	// Use ORDER BY for channel based queries since that field might not be UNIQUE.
	$sql = "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
		"FROM heartbeat.heartbeat WHERE $where ORDER BY ts DESC LIMIT 1";
	$flags = ISQLPlatform::QUERY_SILENCE_ERRORS
		| ISQLPlatform::QUERY_IGNORE_DBO_TRX
		| ISQLPlatform::QUERY_CHANGE_NONE;

	$result = $conn->query( $sql, __METHOD__, $flags );
	$heartbeat = $result ? $result->fetchObject() : false;
	if ( !$heartbeat ) {
		return null;
	}

	// Convert microseconds to seconds
	return $heartbeat->us_ago / 1e6;
}
163
/**
 * Get a replica DB lag estimate for this server at the start of a transaction.
 *
 * For every lag detection method except 'pt-heartbeat', the parent's result is
 * kept in the server cache under a per-server key and reused while cached.
 *
 * @param IDatabase $conn
 * @return mixed The parent implementation's result, possibly served from cache
 */
public function getApproximateLagStatus( IDatabase $conn ) {
	if ( $this->lagDetectionMethod !== 'pt-heartbeat' ) {
		$cacheKey = $this->srvCache->makeGlobalKey( 'mysql-lag', $conn->getServerName() );
		$cachedStatus = $this->srvCache->get( $cacheKey );
		if ( $cachedStatus ) {
			return $cachedStatus;
		}

		$freshStatus = parent::getApproximateLagStatus( $conn );
		// Third argument is the cache TTL (presumably in seconds; confirm against the cache class)
		$this->srvCache->set( $cacheKey, $freshStatus, 1 );

		return $freshStatus;
	}

	// Disable caching since this is fast enough and we don't want
	// to be *too* pessimistic by having both the cache TTL and the
	// pt-heartbeat interval count as lag in getSessionLagStatus()
	return parent::getApproximateLagStatus( $conn );
}
181
/**
 * Get the @@innodb_autoinc_lock_mode and @@binlog_format server variables.
 *
 * The row is fetched once and then reused for as long as the stored value is
 * not null.
 *
 * @param IDatabase $conn
 * @return mixed Result of IDatabase::selectRow() with the fields
 *   innodb_autoinc_lock_mode and binlog_format
 */
public function getReplicationSafetyInfo( IDatabase $conn ) {
	if ( $this->replicationInfoRow !== null ) {
		return $this->replicationInfoRow;
	}

	$fields = [
		'innodb_autoinc_lock_mode' => '@@innodb_autoinc_lock_mode',
		'binlog_format' => '@@binlog_format',
	];
	// No tables and no conditions: this only selects server variables
	$this->replicationInfoRow = $conn->selectRow( [], $fields, [], __METHOD__ );

	return $this->replicationInfoRow;
}
200
/**
 * Report the GTID setting given to the constructor.
 *
 * @return bool Whether getReplicaPos() and getPrimaryPos() should try GTIDs
 *   before falling back to binlog coordinates
 */
protected function useGTIDs() {
	return $this->useGTIDs;
}
207
/**
 * Wait for this replica server to reach a given primary DB position.
 *
 * The wait is skipped when the server has a static dataset or when an earlier
 * wait already reached a position that covers $pos.
 *
 * @param IDatabase $conn
 * @param DBPrimaryPos $pos Position to wait for; must be a MySQLPrimaryPos
 * @param int|float $timeout Timeout passed as-is to the SQL wait function
 * @return int|null Null on error, -1 on timeout or when no comparable position
 *   could be determined, 0 or more once the position was reached
 * @throws InvalidArgumentException If $pos is not a MySQLPrimaryPos
 */
public function primaryPosWait( IDatabase $conn, DBPrimaryPos $pos, $timeout ) {
	if ( !( $pos instanceof MySQLPrimaryPos ) ) {
		throw new InvalidArgumentException( "Position not an instance of MySQLPrimaryPos" );
	}

	$baseContext = [ 'method' => __METHOD__, 'raw_pos' => $pos ];

	if ( $this->topologyRole === IDatabase::ROLE_STATIC_CLONE ) {
		$this->logger->debug(
			"Bypassed replication wait; database has a static dataset",
			$this->getLogContext( $conn, $baseContext )
		);

		// This is a copy of a read-only dataset with no primary DB
		return 0;
	}

	if ( $this->lastKnownReplicaPos && $this->lastKnownReplicaPos->hasReached( $pos ) ) {
		$this->logger->debug(
			"Bypassed replication wait; replication known to have reached {raw_pos}",
			$this->getLogContext( $conn, $baseContext )
		);

		// Already reached this point for sure
		return 0;
	}

	if ( $pos->getGTIDs() ) {
		// Get the GTIDs from this replica server too, to see the domains (channels)
		$replicaPos = $this->getReplicaPos( $conn );
		if ( !$replicaPos ) {
			$this->logger->error(
				"Could not get replication position on replica DB to compare to {raw_pos}",
				$this->getLogContext( $conn, $baseContext )
			);

			// This is the primary DB itself?
			return -1;
		}

		// GTIDs with domains (channels) that are active and are present on the replica
		$gtidsWait = $pos::getRelevantActiveGTIDs( $pos, $replicaPos );
		if ( !$gtidsWait ) {
			$this->logger->error(
				"No active GTIDs in {raw_pos} share a domain with those in {current_pos}",
				$this->getLogContext( $conn, [
					'method' => __METHOD__,
					'raw_pos' => $pos,
					'current_pos' => $replicaPos
				] )
			);

			// $pos is from the wrong cluster?
			return -1;
		}

		// Wait on the GTID set
		$waitPos = implode( ',', $gtidsWait );
		$gtidArg = $conn->addQuotes( $waitPos );
		// MariaDB GTIDs look like "domain-server-sequence" and contain no colon;
		// MySQL GTIDs look like "source_id:transaction_id"
		$sql = ( strpos( $gtidArg, ':' ) === false )
			? "SELECT MASTER_GTID_WAIT($gtidArg, $timeout)"
			: "SELECT WAIT_FOR_EXECUTED_GTID_SET($gtidArg, $timeout)";
	} else {
		// Wait on the binlog coordinates
		$encFile = $conn->addQuotes( $pos->getLogFile() );
		// @phan-suppress-next-line PhanTypeArraySuspiciousNullable
		$encPos = intval( $pos->getLogPosition()[$pos::CORD_EVENT] );
		$sql = "SELECT MASTER_POS_WAIT($encFile, $encPos, $timeout)";
		$waitPos = $pos->__toString();
	}

	// QUERY_IGNORE_DBO_TRX: going by the flag name, this keeps DBO_TRX from
	// opening a transaction around the wait
	$flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;
	$start = microtime( true );
	$res = $conn->query( $sql, __METHOD__, $flags );
	$row = $res->fetchRow();
	$seconds = max( microtime( true ) - $start, 0 );

	// Result can be NULL (error), -1 (timeout), or 0+ per the MySQL manual
	$status = ( $row[0] === null ) ? null : intval( $row[0] );

	if ( $status === null ) {
		$this->logger->error(
			"An error occurred while waiting for replication to reach {wait_pos}",
			$this->getLogContext( $conn, [
				'raw_pos' => $pos,
				'wait_pos' => $waitPos,
				'sql' => $sql,
				'seconds_waited' => $seconds,
				'exception' => new RuntimeException()
			] )
		);
	} elseif ( $status < 0 ) {
		$this->logger->error(
			"Timed out waiting for replication to reach {wait_pos}",
			$this->getLogContext( $conn, [
				'raw_pos' => $pos,
				'wait_pos' => $waitPos,
				'timeout' => $timeout,
				'sql' => $sql,
				'seconds_waited' => $seconds,
				'exception' => new RuntimeException()
			] )
		);
	} else {
		$this->logger->debug(
			"Replication has reached {wait_pos}",
			$this->getLogContext( $conn, [
				'raw_pos' => $pos,
				'wait_pos' => $waitPos,
				'seconds_waited' => $seconds,
			] )
		);
		// Remember that this position was reached to save queries next time
		$this->lastKnownReplicaPos = $pos;
	}

	return $status;
}
320
/**
 * Get the position of the primary DB that this replica has reached.
 *
 * With GTIDs enabled, the first non-empty value of gtid_slave_pos or
 * gtid_executed is used; otherwise, or if both are empty, the binlog
 * coordinates from SHOW SLAVE STATUS.
 *
 * @param IDatabase $conn
 * @return MySQLPrimaryPos|false False if no position could be determined
 */
public function getReplicaPos( IDatabase $conn ) {
	// As-of-time *before* fetching GTID variables
	$asOfTime = microtime( true );

	if ( $this->useGTIDs() ) {
		// Try to use GTIDs, falling back to binlog positions if not possible
		$gtidVars = $this->getServerGTIDs( $conn, __METHOD__ );
		// Use gtid_slave_pos for MariaDB and gtid_executed for MySQL
		foreach ( [ 'gtid_slave_pos', 'gtid_executed' ] as $varName ) {
			if ( strlen( $gtidVars[$varName] ?? '' ) ) {
				return new MySQLPrimaryPos( $gtidVars[$varName], $asOfTime );
			}
		}
	}

	$status = $this->getServerRoleStatus( $conn, 'SLAVE', __METHOD__ );
	if ( !$status || !strlen( $status['Relay_Master_Log_File'] ) ) {
		return false;
	}

	return new MySQLPrimaryPos(
		"{$status['Relay_Master_Log_File']}/{$status['Exec_Master_Log_Pos']}",
		$asOfTime
	);
}
351
/**
 * Get the current position of this primary DB server.
 *
 * With GTIDs enabled, the first non-empty value of gtid_binlog_pos or
 * gtid_executed is used; otherwise, or if both are empty, the binlog
 * coordinates from SHOW MASTER STATUS.
 *
 * @param IDatabase $conn
 * @return MySQLPrimaryPos|false False if no position could be determined
 */
public function getPrimaryPos( IDatabase $conn ) {
	// As-of-time *before* fetching GTID variables
	$asOfTime = microtime( true );

	if ( $this->useGTIDs() ) {
		// Try to use GTIDs, falling back to binlog positions if not possible
		$gtidVars = $this->getServerGTIDs( $conn, __METHOD__ );
		// Use gtid_binlog_pos for MariaDB and gtid_executed for MySQL
		foreach ( [ 'gtid_binlog_pos', 'gtid_executed' ] as $varName ) {
			if ( !strlen( $gtidVars[$varName] ?? '' ) ) {
				continue;
			}

			$gtidPos = new MySQLPrimaryPos( $gtidVars[$varName], $asOfTime );
			// Filter domains that are inactive or not relevant to the session
			$gtidPos->setActiveOriginServerId( $this->getServerId( $conn ) );
			$gtidPos->setActiveOriginServerUUID( $this->getServerUUID( $conn ) );
			if ( isset( $gtidVars['gtid_domain_id'] ) ) {
				$gtidPos->setActiveDomain( $gtidVars['gtid_domain_id'] );
			}

			return $gtidPos;
		}
	}

	$status = $this->getServerRoleStatus( $conn, 'MASTER', __METHOD__ );
	if ( $status && strlen( $status['File'] ) ) {
		return new MySQLPrimaryPos( "{$status['File']}/{$status['Position']}", $asOfTime );
	}

	return false;
}
391
/**
 * Get the server ID used for replication topology purposes.
 *
 * Public wrapper around the protected getServerId().
 *
 * @param IDatabase $conn
 * @return string|null 32 bit integer ID; null if not applicable or unknown
 */
public function getTopologyBasedServerId( IDatabase $conn ) {
	$serverId = $this->getServerId( $conn );

	return $serverId;
}
400
/**
 * Get the @@server_id of the server, cached per server name.
 *
 * @param IDatabase $conn
 * @return mixed The id column of the query, or the cached copy of it
 */
protected function getServerId( IDatabase $conn ) {
	// Captured here because __METHOD__ inside the closure would name the closure
	$fname = __METHOD__;
	$cacheKey = $this->srvCache->makeGlobalKey( 'mysql-server-id', $conn->getServerName() );

	$fetchServerId = static function () use ( $conn, $fname ) {
		$flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;
		$res = $conn->query( "SELECT @@server_id AS id", $fname, $flags );

		return $res->fetchObject()->id;
	};

	return $this->srvCache->getWithSetCallback(
		$cacheKey,
		self::SERVER_ID_CACHE_TTL,
		$fetchServerId
	);
}
419
/**
 * Get the server_uuid global variable of the server, cached per server name.
 *
 * @param IDatabase $conn
 * @return string|null Null if the server reports no server_uuid variable
 */
protected function getServerUUID( IDatabase $conn ) {
	// Captured here because __METHOD__ inside the closure would name the closure
	$fname = __METHOD__;
	$cacheKey = $this->srvCache->makeGlobalKey( 'mysql-server-uuid', $conn->getServerName() );

	$fetchServerUUID = static function () use ( $conn, $fname ) {
		$flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;
		$res = $conn->query( "SHOW GLOBAL VARIABLES LIKE 'server_uuid'", $fname, $flags );
		$row = $res->fetchObject();
		if ( !$row ) {
			return null;
		}

		return $row->Value;
	};

	return $this->srvCache->getWithSetCallback(
		$cacheKey,
		self::SERVER_ID_CACHE_TTL,
		$fetchServerUUID
	);
}
439
/**
 * Collect the gtid_* server variables into a name => value map.
 *
 * Global variables are read first and session variables second, so a session
 * value replaces the global value of the same name.
 *
 * @param IDatabase $conn
 * @param string $fname Caller name passed on to the queries
 * @return array Map of (variable name => value)
 */
protected function getServerGTIDs( IDatabase $conn, $fname = __METHOD__ ) {
	$flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;

	$queries = [
		// Get global-only variables like gtid_executed
		"SHOW GLOBAL VARIABLES LIKE 'gtid_%'",
		// Get session-specific ones (e.g. gtid_domain_id, since that is where writes will log)
		"SHOW SESSION VARIABLES LIKE 'gtid_%'",
	];

	$gtidVars = [];
	foreach ( $queries as $sql ) {
		$res = $conn->query( $sql, $fname, $flags );
		foreach ( $res as $row ) {
			$gtidVars[$row->Variable_name] = $row->Value;
		}
	}

	return $gtidVars;
}
463
/**
 * Run "SHOW <role> STATUS" and return its first row.
 *
 * @param IDatabase $conn
 * @param string $role Keyword placed in the statement; callers in this class
 *   pass 'SLAVE' or 'MASTER'
 * @param string $fname Caller name passed on to the query
 * @return array|null The row; null if the query failed or returned no row
 */
protected function getServerRoleStatus( IDatabase $conn, $role, $fname = __METHOD__ ) {
	$flags = ISQLPlatform::QUERY_SILENCE_ERRORS
		| ISQLPlatform::QUERY_IGNORE_DBO_TRX
		| ISQLPlatform::QUERY_CHANGE_NONE;

	$res = $conn->query( "SHOW $role STATUS", $fname, $flags );
	if ( !$res ) {
		return null;
	}

	return $res->fetchRow() ?: null;
}
478
479}
DBPrimaryPos implementation for MySQL and MariaDB.
primaryPosWait(IDatabase $conn, DBPrimaryPos $pos, $timeout)
doGetLag(IDatabase $conn)
Get the amount of replication lag for this database server.
getApproximateLagStatus(IDatabase $conn)
Get a replica DB lag estimate for this server at the start of a transaction.
getPrimaryPos(IDatabase $conn)
Get the position of the primary DB from SHOW MASTER STATUS.
getReplicaPos(IDatabase $conn)
Get the position of the primary DB from SHOW SLAVE STATUS.
string $lagDetectionMethod
Method to detect replica DB lag.
getTopologyBasedServerId(IDatabase $conn)
string|null 32 bit integer ID; null if not applicable or unknown
getServerRoleStatus(IDatabase $conn, $role, $fname=__METHOD__)
__construct( $topologyRole, $logger, $srvCache, $lagDetectionMethod, $lagDetectionOptions, $useGTIDs)
getLogContext(IDatabase $conn, array $extras=[])
Create a log context to pass to PSR-3 logger functions.
string $topologyRole
Replication topology role of the server; one of the class ROLE_* constants.
getRecordedTransactionLagStatus(IDatabase $conn)
Get the replica DB lag when the current transaction started.
An object representing a primary or replica DB position in a replicated setup.
addQuotes( $s)
Escape and quote a raw value string for use in a SQL query.
Basic database interface for live and lazy-loaded relational database handles.
Definition IDatabase.php:36
query( $sql, $fname=__METHOD__, $flags=0)
Run an SQL query statement and return the result.
getServerName()
Get the readable name for the server.
selectRow( $table, $vars, $conds, $fname=__METHOD__, $options=[], $join_conds=[])
Wrapper to IDatabase::select() that only fetches one row (via LIMIT)
Interface for query language.
makeList(array $a, $mode=self::LIST_COMMA)
Makes an encoded list of strings from an array.