MediaWiki  master
MysqlReplicationReporter.php
Go to the documentation of this file.
1 <?php
21 
22 use InvalidArgumentException;
23 use RuntimeException;
24 use stdClass;
30 
/** @var array Lag detection options; the 'conds' entry, if set, replaces the default pt-heartbeat row filter */
42  protected $lagDetectionOptions = [];
/** @var bool Whether getReplicaPos()/getPrimaryPos() try GTID variables before binlog coordinates */
44  protected $useGTIDs = false;
/**
 * @var stdClass|false|null Memoized result of the selectRow() call made by
 *  getReplicationSafetyInfo(); null until first fetched.
 *  NOTE(review): row type presumed from the stdClass import — confirm against selectRow().
 */
46  private $replicationInfoRow;
47  // Cache getServerId() and getServerUUID() results for 24 hours
48  private const SERVER_ID_CACHE_TTL = 86400;
49 
/** Age, in seconds, of a transaction's recorded lag value above which reusing it logs a warning */
51  private const LAG_STALE_WARN_THRESHOLD = 0.100;
52 
/**
 * Set up the reporter with its topology role, collaborators and lag detection settings.
 *
 * The parameter list mirrors the documented signature:
 * __construct( $topologyRole, $logger, $srvCache, $lagDetectionMethod,
 * $lagDetectionOptions, $useGTIDs ).
 *
 * @param string $topologyRole Replication topology role of the server; one of the
 *  class ROLE_* constants
 * @param mixed $logger Logger used for debug/warning/error messages
 *  (presumably a PSR-3 logger — confirm against the parent class)
 * @param mixed $srvCache Server-local cache object; this class calls makeGlobalKey(),
 *  get(), set() and getWithSetCallback() on it
 * @param string $lagDetectionMethod Lag detection method; 'pt-heartbeat' selects the
 *  heartbeat table, anything else uses SHOW SLAVE STATUS
 * @param array $lagDetectionOptions Options for the lag detection method
 *  (the 'conds' key is read for pt-heartbeat)
 * @param bool $useGTIDs Whether to prefer GTID-based replication positions
 */
public function __construct(
	$topologyRole,
	$logger,
	$srvCache,
	$lagDetectionMethod,
	$lagDetectionOptions,
	$useGTIDs
) {
	parent::__construct( $topologyRole, $logger, $srvCache );
	$this->lagDetectionMethod = $lagDetectionMethod;
	$this->lagDetectionOptions = $lagDetectionOptions;
	$this->useGTIDs = $useGTIDs;
}
66 
/**
 * Get the replication lag of this server using the configured detection method.
 *
 * @param IDatabase $conn
 * @return int|float|false Whatever the selected lag helper returns; false on failure
 */
protected function doGetLag( IDatabase $conn ) {
	return ( $this->lagDetectionMethod === 'pt-heartbeat' )
		? $this->getLagFromPtHeartbeat( $conn )
		: $this->getLagFromSlaveStatus( $conn );
}
74 
/**
 * Derive the replication lag from the output of SHOW SLAVE STATUS.
 *
 * @param IDatabase $conn
 * @return int|false Lag in seconds (including any remaining configured delay),
 *  or false if no usable status row is available
 */
protected function getLagFromSlaveStatus( IDatabase $conn ) {
	$flags = ISQLPlatform::QUERY_SILENCE_ERRORS
		| ISQLPlatform::QUERY_IGNORE_DBO_TRX
		| ISQLPlatform::QUERY_CHANGE_NONE;
	$result = $conn->query( 'SHOW SLAVE STATUS', __METHOD__, $flags );
	$status = $result ? $result->fetchObject() : false;

	// A server that is not replicating yields no row (or an empty lag field)
	if ( !$status || strval( $status->Seconds_Behind_Master ) === '' ) {
		return false;
	}

	// Account for delayed replication, where the SQL thread deliberately holds back:
	// https://mariadb.com/kb/en/delayed-replication/
	// https://dev.mysql.com/doc/refman/5.6/en/replication-delayed.html
	$remainingDelay = $status->SQL_Remaining_Delay ?? 0;

	return intval( $status->Seconds_Behind_Master + $remainingDelay );
}
95 
/**
 * Derive the replication lag from the pt-heartbeat table.
 *
 * If a lag value was already recorded for the current transaction, that value is
 * reused rather than querying again.
 *
 * @param IDatabase $conn
 * @return int|float|false Lag in seconds, or false if no heartbeat row was found
 */
protected function getLagFromPtHeartbeat( IDatabase $conn ) {
	$trxLagStatus = $this->getRecordedTransactionLagStatus( $conn );

	if ( !$trxLagStatus ) {
		// No recorded value to reuse; measure against the heartbeat table now
		$secondsAgo = $this->fetchSecondsSinceHeartbeat( $conn );
		if ( $secondsAgo === null ) {
			$this->logger->error(
				"Unable to find pt-heartbeat row for {db_server}",
				$this->getLogContext( $conn, [
					'method' => __METHOD__
				] )
			);

			return false;
		}

		// Never report a negative lag
		return max( $secondsAgo, 0.0 );
	}

	// A transaction is active and its initial lag was already queried. Reuse that
	// value: the isolation level will typically be REPEATABLE-READ, so the heartbeat
	// row seen is frozen by the snapshot while UTC_TIMESTAMP() keeps advancing, and
	// re-querying would report an ever-growing lag.
	$age = microtime( true ) - $trxLagStatus['since'];
	if ( $age > self::LAG_STALE_WARN_THRESHOLD ) {
		$this->logger->warning(
			"Using cached lag value for {db_server} due to active transaction",
			$this->getLogContext( $conn, [
				'method' => __METHOD__,
				'age' => $age,
				'exception' => new RuntimeException()
			] )
		);
	}

	return $trxLagStatus['lag'];
}
136 
/**
 * Query the pt-heartbeat table for the time elapsed since the newest matching heartbeat.
 *
 * @param IDatabase $conn
 * @return float|null Seconds since the most recent heartbeat row matching the
 *  configured conditions; null if the query failed or no row matched
 */
protected function fetchSecondsSinceHeartbeat( IDatabase $conn ) {
	// Some setups might have pt-heartbeat running on each replica server.
	// Exclude the row for events originating on this DB server. Assume that
	// there is only one active replication channel and that any other row
	// getting updates must be the row for the primary DB server.
	$where = $conn->makeList(
		$this->lagDetectionOptions['conds'] ?? [ 'server_id != @@server_id' ],
		// The conditions form a WHERE clause, so they must be AND-ed together;
		// the default makeList() mode would join them with commas instead.
		ISQLPlatform::LIST_AND
	);
	// Use MySQL server time so that query time and trip time are not counted.
	// Use ORDER BY for channel based queries since that field might not be UNIQUE.
	$res = $conn->query(
		"SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
		"FROM heartbeat.heartbeat WHERE $where ORDER BY ts DESC LIMIT 1",
		__METHOD__,
		ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE
	);
	$row = $res ? $res->fetchObject() : false;

	// Convert microseconds to seconds
	return $row ? ( $row->us_ago / 1e6 ) : null;
}
163 
/**
 * Get a lag estimate for this server, briefly cached unless pt-heartbeat is in use.
 *
 * @param IDatabase $conn
 * @return mixed The value produced by the parent implementation, possibly served
 *  from the server cache
 */
public function getApproximateLagStatus( IDatabase $conn ) {
	if ( $this->lagDetectionMethod === 'pt-heartbeat' ) {
		// Skip the cache: the query is fast enough, and counting both the cache TTL
		// and the pt-heartbeat interval as lag in getSessionLagStatus() would be
		// *too* pessimistic
		return parent::getApproximateLagStatus( $conn );
	}

	$cacheKey = $this->srvCache->makeGlobalKey( 'mysql-lag', $conn->getServerName() );
	$lagStatus = $this->srvCache->get( $cacheKey );
	if ( $lagStatus ) {
		return $lagStatus;
	}

	// Cache miss (or falsy entry): compute a fresh value and keep it for one second
	$lagStatus = parent::getApproximateLagStatus( $conn );
	$this->srvCache->set( $cacheKey, $lagStatus, 1 );

	return $lagStatus;
}
181 
/**
 * Fetch the server settings relevant to replication safety, memoized per instance.
 *
 * @param IDatabase $conn
 * @return mixed Result of selectRow() with the fields innodb_autoinc_lock_mode
 *  and binlog_format
 */
public function getReplicationSafetyInfo( IDatabase $conn ) {
	if ( $this->replicationInfoRow !== null ) {
		// Already fetched by an earlier call
		return $this->replicationInfoRow;
	}

	$fields = [
		'innodb_autoinc_lock_mode' => '@@innodb_autoinc_lock_mode',
		'binlog_format' => '@@binlog_format',
	];
	$this->replicationInfoRow = $conn->selectRow( [], $fields, [], __METHOD__ );

	return $this->replicationInfoRow;
}
200 
/**
 * Whether GTID-based positions should be tried before binlog coordinates.
 *
 * @return bool The value of the $useGTIDs flag given to the constructor
 */
protected function useGTIDs() {
	return $this->useGTIDs;
}
207 
/**
 * Block until the replica has caught up to the given primary position, or time out.
 *
 * @param IDatabase $conn Connection to the replica server
 * @param DBPrimaryPos $pos Position to wait for; must be a MySQLPrimaryPos
 * @param int|float $timeout Timeout passed to the server-side wait function
 * @return int|null 0 when the wait was bypassed; -1 when the position could not be
 *  compared against this server; otherwise the integer result of the wait function
 *  (negative on timeout), or null if that result was NULL (error)
 * @throws InvalidArgumentException If $pos is not a MySQLPrimaryPos
 */
public function primaryPosWait( IDatabase $conn, DBPrimaryPos $pos, $timeout ) {
	if ( !( $pos instanceof MySQLPrimaryPos ) ) {
		throw new InvalidArgumentException( "Position not an instance of MySQLPrimaryPos" );
	}

	if ( $this->topologyRole === IDatabase::ROLE_STATIC_CLONE ) {
		// This is a copy of a read-only dataset with no primary DB
		$this->logger->debug(
			"Bypassed replication wait; database has a static dataset",
			$this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
		);

		return 0;
	}

	if ( $this->lastKnownReplicaPos && $this->lastKnownReplicaPos->hasReached( $pos ) ) {
		// Already reached this point for sure
		$this->logger->debug(
			"Bypassed replication wait; replication known to have reached {raw_pos}",
			$this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
		);

		return 0;
	}

	if ( $pos->getGTIDs() ) {
		// Get the GTIDs from this replica server to see the domains (channels)
		$replicaPos = $this->getReplicaPos( $conn );
		if ( !$replicaPos ) {
			$this->logger->error(
				"Could not get replication position on replica DB to compare to {raw_pos}",
				$this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
			);

			// This is the primary DB itself?
			return -1;
		}

		// GTIDs with domains (channels) that are active and are present on the replica
		$gtidsWait = $pos::getRelevantActiveGTIDs( $pos, $replicaPos );
		if ( !$gtidsWait ) {
			$this->logger->error(
				"No active GTIDs in {raw_pos} share a domain with those in {current_pos}",
				$this->getLogContext( $conn, [
					'method' => __METHOD__,
					'raw_pos' => $pos,
					'current_pos' => $replicaPos
				] )
			);

			// $pos is from the wrong cluster?
			return -1;
		}

		// Wait on the GTID set
		$waitPos = implode( ',', $gtidsWait );
		$gtidArg = $conn->addQuotes( $waitPos );
		if ( strpos( $gtidArg, ':' ) === false ) {
			// MariaDB GTIDs are dash-separated ("domain-server-sequence"), so no colon
			$sql = "SELECT MASTER_GTID_WAIT($gtidArg, $timeout)";
		} else {
			// MySQL GTIDs, e.g "source_id:transaction_id"
			$sql = "SELECT WAIT_FOR_EXECUTED_GTID_SET($gtidArg, $timeout)";
		}
	} else {
		// Wait on the binlog coordinates
		$encFile = $conn->addQuotes( $pos->getLogFile() );
		// @phan-suppress-next-line PhanTypeArraySuspiciousNullable
		$encPos = intval( $pos->getLogPosition()[$pos::CORD_EVENT] );
		$sql = "SELECT MASTER_POS_WAIT($encFile, $encPos, $timeout)";
		$waitPos = $pos->__toString();
	}

	// QUERY_IGNORE_DBO_TRX avoids opening a transaction if DBO_TRX is set
	$flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;
	$startTime = microtime( true );
	$res = $conn->query( $sql, __METHOD__, $flags );
	$row = $res->fetchRow();
	$seconds = max( microtime( true ) - $startTime, 0 );

	// Result can be NULL (error), -1 (timeout), or 0+ per the MySQL manual
	$status = ( $row[0] === null ) ? null : intval( $row[0] );

	if ( $status === null ) {
		$this->logger->error(
			"An error occurred while waiting for replication to reach {wait_pos}",
			$this->getLogContext( $conn, [
				'raw_pos' => $pos,
				'wait_pos' => $waitPos,
				'sql' => $sql,
				'seconds_waited' => $seconds,
				'exception' => new RuntimeException()
			] )
		);
	} elseif ( $status < 0 ) {
		$this->logger->error(
			"Timed out waiting for replication to reach {wait_pos}",
			$this->getLogContext( $conn, [
				'raw_pos' => $pos,
				'wait_pos' => $waitPos,
				'timeout' => $timeout,
				'sql' => $sql,
				'seconds_waited' => $seconds,
				'exception' => new RuntimeException()
			] )
		);
	} else {
		$this->logger->debug(
			"Replication has reached {wait_pos}",
			$this->getLogContext( $conn, [
				'raw_pos' => $pos,
				'wait_pos' => $waitPos,
				'seconds_waited' => $seconds,
			] )
		);
		// Remember that this position was reached to save queries next time
		$this->lastKnownReplicaPos = $pos;
	}

	return $status;
}
320 
/**
 * Get the replication position this replica has applied.
 *
 * GTID variables are preferred when GTID use is enabled; otherwise, or when they are
 * empty, the binlog coordinates from SHOW SLAVE STATUS are used.
 *
 * @param IDatabase $conn
 * @return MySQLPrimaryPos|false The position, or false if none could be determined
 */
public function getReplicaPos( IDatabase $conn ) {
	// As-of-time taken *before* fetching any position variables
	$asOf = microtime( true );

	if ( $this->useGTIDs() ) {
		// Try to use GTIDs, falling back to binlog positions if not possible
		$gtidVars = $this->getServerGTIDs( $conn, __METHOD__ );
		// Use gtid_slave_pos for MariaDB and gtid_executed for MySQL
		foreach ( [ 'gtid_slave_pos', 'gtid_executed' ] as $varName ) {
			$gtidValue = $gtidVars[$varName] ?? '';
			if ( strlen( $gtidValue ) ) {
				return new MySQLPrimaryPos( $gtidValue, $asOf );
			}
		}
	}

	$slaveStatus = $this->getServerRoleStatus( $conn, 'SLAVE', __METHOD__ );
	if ( !$slaveStatus || !strlen( $slaveStatus['Relay_Master_Log_File'] ) ) {
		return false;
	}

	return new MySQLPrimaryPos(
		"{$slaveStatus['Relay_Master_Log_File']}/{$slaveStatus['Exec_Master_Log_Pos']}",
		$asOf
	);
}
351 
/**
 * Get the current replication position of this server acting as a primary.
 *
 * GTID variables are preferred when GTID use is enabled; otherwise, or when they are
 * empty, the binlog coordinates from SHOW MASTER STATUS are used.
 *
 * @param IDatabase $conn
 * @return MySQLPrimaryPos|false The position, or false if none could be determined
 */
public function getPrimaryPos( IDatabase $conn ) {
	// As-of-time taken *before* fetching any position variables
	$asOf = microtime( true );
	$primaryPos = false;

	if ( $this->useGTIDs() ) {
		// Try to use GTIDs, falling back to binlog positions if not possible
		$gtidVars = $this->getServerGTIDs( $conn, __METHOD__ );
		// Use gtid_binlog_pos for MariaDB and gtid_executed for MySQL
		foreach ( [ 'gtid_binlog_pos', 'gtid_executed' ] as $varName ) {
			$gtidValue = $gtidVars[$varName] ?? '';
			if ( strlen( $gtidValue ) ) {
				$primaryPos = new MySQLPrimaryPos( $gtidValue, $asOf );
				break;
			}
		}

		if ( $primaryPos ) {
			// Filter domains that are inactive or not relevant to the session
			$primaryPos->setActiveOriginServerId( $this->getServerId( $conn ) );
			$primaryPos->setActiveOriginServerUUID( $this->getServerUUID( $conn ) );
			if ( isset( $gtidVars['gtid_domain_id'] ) ) {
				$primaryPos->setActiveDomain( $gtidVars['gtid_domain_id'] );
			}
		}
	}

	if ( $primaryPos ) {
		return $primaryPos;
	}

	// No GTID position available; use the binlog file name and offset instead
	$masterStatus = $this->getServerRoleStatus( $conn, 'MASTER', __METHOD__ );
	if ( $masterStatus && strlen( $masterStatus['File'] ) ) {
		$primaryPos = new MySQLPrimaryPos(
			"{$masterStatus['File']}/{$masterStatus['Position']}",
			$asOf
		);
	}

	return $primaryPos;
}
391 
/**
 * Get the server ID of this server; public entry point delegating to getServerId().
 *
 * @param IDatabase $conn
 * @return mixed The (cached) value of the server's @@server_id variable
 */
public function getTopologyBasedServerId( IDatabase $conn ) {
	$serverId = $this->getServerId( $conn );

	return $serverId;
}
400 
/**
 * Get the @@server_id of the server, cached per server name for SERVER_ID_CACHE_TTL.
 *
 * @param IDatabase $conn
 * @return mixed The "id" field of the SELECT @@server_id result row
 */
protected function getServerId( IDatabase $conn ) {
	// Captured here so the query is attributed to this method, not to the closure
	$fname = __METHOD__;
	$cacheKey = $this->srvCache->makeGlobalKey( 'mysql-server-id', $conn->getServerName() );

	$fetchServerId = static function () use ( $conn, $fname ) {
		$result = $conn->query(
			"SELECT @@server_id AS id",
			$fname,
			ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE
		);

		return $result->fetchObject()->id;
	};

	return $this->srvCache->getWithSetCallback(
		$cacheKey,
		self::SERVER_ID_CACHE_TTL,
		$fetchServerId
	);
}
419 
/**
 * Get the server_uuid of the server, cached per server name for SERVER_ID_CACHE_TTL.
 *
 * @param IDatabase $conn
 * @return mixed The "Value" field of the server_uuid variable row, or null if the
 *  server reports no such variable
 */
protected function getServerUUID( IDatabase $conn ) {
	// Captured here so the query is attributed to this method, not to the closure
	$fname = __METHOD__;
	$cacheKey = $this->srvCache->makeGlobalKey( 'mysql-server-uuid', $conn->getServerName() );

	$fetchServerUuid = static function () use ( $conn, $fname ) {
		$result = $conn->query(
			"SHOW GLOBAL VARIABLES LIKE 'server_uuid'",
			$fname,
			ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE
		);
		$uuidRow = $result->fetchObject();
		if ( !$uuidRow ) {
			return null;
		}

		return $uuidRow->Value;
	};

	return $this->srvCache->getWithSetCallback(
		$cacheKey,
		self::SERVER_ID_CACHE_TTL,
		$fetchServerUuid
	);
}
439 
/**
 * Collect the server's gtid_* variables into a name => value map.
 *
 * @param IDatabase $conn
 * @param string $fname Caller name used to attribute the queries
 * @return array Map of variable name to value; session values override global ones
 */
protected function getServerGTIDs( IDatabase $conn, $fname = __METHOD__ ) {
	$flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;

	// Order matters: later results overwrite earlier ones for the same variable name
	$queries = [
		// Get global-only variables like gtid_executed
		"SHOW GLOBAL VARIABLES LIKE 'gtid_%'",
		// Get session-specific (e.g. gtid_domain_id since that is where writes will log)
		"SHOW SESSION VARIABLES LIKE 'gtid_%'",
	];

	$gtidVars = [];
	foreach ( $queries as $sql ) {
		$result = $conn->query( $sql, $fname, $flags );
		foreach ( $result as $variable ) {
			$gtidVars[$variable->Variable_name] = $variable->Value;
		}
	}

	return $gtidVars;
}
463 
/**
 * Run SHOW <role> STATUS and return its first row.
 *
 * @param IDatabase $conn
 * @param string $role Role keyword placed in the statement; callers in this class
 *  pass 'SLAVE' or 'MASTER'
 * @param string $fname Caller name used to attribute the query
 * @return array|null The first result row, or null if the query failed or gave no row
 */
protected function getServerRoleStatus( IDatabase $conn, $role, $fname = __METHOD__ ) {
	$result = $conn->query(
		"SHOW $role STATUS",
		$fname,
		ISQLPlatform::QUERY_SILENCE_ERRORS
			| ISQLPlatform::QUERY_IGNORE_DBO_TRX
			| ISQLPlatform::QUERY_CHANGE_NONE
	);
	if ( !$result ) {
		// Errors are silenced, so a failed query shows up as a falsy result
		return null;
	}

	$statusRow = $result->fetchRow();

	return $statusRow ?: null;
}
478 
479 }
const LIST_AND
Definition: Defines.php:43
DBPrimaryPos implementation for MySQL and MariaDB.
array $lagDetectionOptions
Options for the configured replica DB lag detection method.
primaryPosWait(IDatabase $conn, DBPrimaryPos $pos, $timeout)
doGetLag(IDatabase $conn)
Get the amount of replication lag for this database server.
getApproximateLagStatus(IDatabase $conn)
Get a replica DB lag estimate for this server at the start of a transaction.
getPrimaryPos(IDatabase $conn)
Get the position of the primary DB from SHOW MASTER STATUS.
getReplicaPos(IDatabase $conn)
Get the position of the primary DB from SHOW SLAVE STATUS.
string $lagDetectionMethod
Method to detect replica DB lag.
getTopologyBasedServerId(IDatabase $conn)
To make queries. Returns string|null: the 32-bit integer server ID; null if not applicable or unknown.
getServerRoleStatus(IDatabase $conn, $role, $fname=__METHOD__)
__construct( $topologyRole, $logger, $srvCache, $lagDetectionMethod, $lagDetectionOptions, $useGTIDs)
getLogContext(IDatabase $conn, array $extras=[])
Create a log context to pass to PSR-3 logger functions.
string $topologyRole
Replication topology role of the server; one of the class ROLE_* constants.
getRecordedTransactionLagStatus(IDatabase $conn)
Get the replica DB lag when the current transaction started.
An object representing a primary or replica DB position in a replicated setup.
addQuotes( $s)
Escape and quote a raw value string for use in a SQL query.
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:36
query( $sql, $fname=__METHOD__, $flags=0)
Run an SQL query statement and return the result.
getServerName()
Get the readable name for the server.
selectRow( $table, $vars, $conds, $fname=__METHOD__, $options=[], $join_conds=[])
Wrapper to IDatabase::select() that only fetches one row (via LIMIT)
Interface for query language.
makeList(array $a, $mode=self::LIST_COMMA)
Makes an encoded list of strings from an array.