22use InvalidArgumentException;
47 private $replicationInfoRow;
49 private const SERVER_ID_CACHE_TTL = 86400;
52 private const LAG_STALE_WARN_THRESHOLD = 0.100;
69 if ( $this->lagDetectionMethod ===
'pt-heartbeat' ) {
83 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
88 $res = $conn->
query( $query );
89 $row = $res ? $res->fetchObject() :
false;
91 if ( $row && strval( $row->Seconds_Behind_Master ) !==
'' ) {
94 return intval( $row->Seconds_Behind_Master + ( $row->SQL_Remaining_Delay ?? 0 ) );
106 if ( $currentTrxInfo ) {
108 $staleness = microtime(
true ) - $currentTrxInfo[
'since'];
109 if ( $staleness > self::LAG_STALE_WARN_THRESHOLD ) {
113 $this->logger->warning(
114 "Using cached lag value for {db_server} due to active transaction",
116 'method' => __METHOD__,
118 'exception' =>
new RuntimeException()
123 return $currentTrxInfo[
'lag'];
127 if ( $ago !==
null ) {
128 return max( $ago, 0.0 );
131 $this->logger->error(
132 "Unable to find pt-heartbeat row for {db_server}",
134 'method' => __METHOD__
152 $this->lagDetectionOptions[
'conds'] ?? [
'server_id != @@server_id' ],
153 ISQLPlatform::LIST_AND
158 "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
159 "FROM heartbeat.heartbeat WHERE $where ORDER BY ts DESC LIMIT 1",
160 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
163 "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
164 "FROM heartbeat.heartbeat WHERE ? ORDER BY ts DESC LIMIT 1",
166 $res = $conn->
query( $query );
167 $row = $res ? $res->fetchObject() :
false;
169 return $row ? ( $row->us_ago / 1e6 ) :
null;
173 if ( $this->lagDetectionMethod ===
'pt-heartbeat' ) {
177 return parent::getApproximateLagStatus( $conn );
180 $key = $this->srvCache->makeGlobalKey(
'mysql-lag', $conn->
getServerName() );
181 $approxLag = $this->srvCache->get( $key );
183 $approxLag = parent::getApproximateLagStatus( $conn );
184 $this->srvCache->set( $key, $approxLag, 1 );
195 if ( $this->replicationInfoRow ===
null ) {
196 $this->replicationInfoRow = $conn->
selectRow(
199 'innodb_autoinc_lock_mode' =>
'@@innodb_autoinc_lock_mode',
200 'binlog_format' =>
'@@binlog_format',
206 return $this->replicationInfoRow;
218 throw new InvalidArgumentException(
"Position not an instance of MySQLPrimaryPos" );
221 if ( $this->topologyRole === IDatabase::ROLE_STATIC_CLONE ) {
222 $this->logger->debug(
223 "Bypassed replication wait; database has a static dataset",
224 $this->
getLogContext( $conn, [
'method' => __METHOD__,
'raw_pos' => $pos ] )
228 } elseif ( $this->lastKnownReplicaPos && $this->lastKnownReplicaPos->hasReached( $pos ) ) {
229 $this->logger->debug(
230 "Bypassed replication wait; replication known to have reached {raw_pos}",
231 $this->
getLogContext( $conn, [
'method' => __METHOD__,
'raw_pos' => $pos ] )
238 if ( $pos->getGTIDs() ) {
242 $this->logger->error(
243 "Could not get replication position on replica DB to compare to {raw_pos}",
244 $this->
getLogContext( $conn, [
'method' => __METHOD__,
'raw_pos' => $pos ] )
250 $gtidsWait = $pos::getRelevantActiveGTIDs( $pos, $refPos );
252 $this->logger->error(
253 "No active GTIDs in {raw_pos} share a domain with those in {current_pos}",
255 'method' => __METHOD__,
257 'current_pos' => $refPos
264 $gtidArg = $conn->
addQuotes( implode(
',', $gtidsWait ) );
265 if ( strpos( $gtidArg,
':' ) !==
false ) {
268 "SELECT WAIT_FOR_EXECUTED_GTID_SET($gtidArg, $timeout)",
269 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
272 "SELECT WAIT_FOR_EXECUTED_GTID_SET(?, ?)"
277 "SELECT MASTER_GTID_WAIT($gtidArg, $timeout)",
278 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
281 "SELECT MASTER_GTID_WAIT(?, ?)"
284 $waitPos = implode(
',', $gtidsWait );
287 $encFile = $conn->
addQuotes( $pos->getLogFile() );
289 $encPos = intval( $pos->getLogPosition()[$pos::CORD_EVENT] );
291 "SELECT MASTER_POS_WAIT($encFile, $encPos, $timeout)",
292 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
295 "SELECT MASTER_POS_WAIT(?, ?, ?)"
300 $start = microtime(
true );
301 $res = $conn->
query( $query, __METHOD__ );
302 $row = $res->fetchRow();
303 $seconds = max( microtime(
true ) - $start, 0 );
306 $status = ( $row[0] !== null ) ? intval( $row[0] ) :
null;
307 if ( $status ===
null ) {
308 $this->logger->error(
309 "An error occurred while waiting for replication to reach {wait_pos}",
312 'wait_pos' => $waitPos,
313 'sql' => $query->getSQL(),
314 'seconds_waited' => $seconds,
315 'exception' =>
new RuntimeException()
318 } elseif ( $status < 0 ) {
320 "Timed out waiting for replication to reach {wait_pos}",
323 'wait_pos' => $waitPos,
324 'timeout' => $timeout,
325 'sql' => $query->getSQL(),
326 'seconds_waited' => $seconds,
329 } elseif ( $status >= 0 ) {
330 $this->logger->debug(
331 "Replication has reached {wait_pos}",
334 'wait_pos' => $waitPos,
335 'seconds_waited' => $seconds,
339 $this->lastKnownReplicaPos = $pos;
352 $now = microtime(
true );
358 foreach ( [
'gtid_slave_pos',
'gtid_executed' ] as $name ) {
359 if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
366 if ( $data && strlen( $data[
'Relay_Master_Log_File'] ) ) {
368 "{$data['Relay_Master_Log_File']}/{$data['Exec_Master_Log_Pos']}",
383 $now = microtime(
true );
390 foreach ( [
'gtid_binlog_pos',
'gtid_executed' ] as $name ) {
391 if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
398 $pos->setActiveOriginServerId( $this->
getServerId( $conn ) );
399 $pos->setActiveOriginServerUUID( $this->
getServerUUID( $conn ) );
400 if ( isset( $data[
'gtid_domain_id'] ) ) {
401 $pos->setActiveDomain( $data[
'gtid_domain_id'] );
408 if ( $data && strlen( $data[
'File'] ) ) {
409 $pos =
new MySQLPrimaryPos(
"{$data['File']}/{$data['Position']}", $now );
432 return $this->srvCache->getWithSetCallback(
433 $this->srvCache->makeGlobalKey(
'mysql-server-id', $conn->
getServerName() ),
434 self::SERVER_ID_CACHE_TTL,
435 static function () use ( $conn, $fname ) {
437 "SELECT @@server_id AS id",
438 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
441 "SELECT @@server_id AS id"
443 $res = $conn->
query( $query, $fname );
445 return $res->fetchObject()->id;
457 return $this->srvCache->getWithSetCallback(
458 $this->srvCache->makeGlobalKey(
'mysql-server-uuid', $conn->
getServerName() ),
459 self::SERVER_ID_CACHE_TTL,
460 static function () use ( $conn, $fname ) {
462 "SHOW GLOBAL VARIABLES LIKE 'server_uuid'",
463 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
466 "SHOW GLOBAL VARIABLES LIKE 'server_uuid'"
468 $res = $conn->
query( $query, $fname );
469 $row = $res->fetchObject();
471 return $row ? $row->Value :
null;
484 $flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;
488 "SHOW GLOBAL VARIABLES LIKE 'gtid_%'",
492 "SHOW GLOBAL VARIABLES LIKE 'gtid_%'"
494 $res = $conn->
query( $query, $fname );
495 foreach ( $res as $row ) {
496 $map[$row->Variable_name] = $row->Value;
500 "SHOW SESSION VARIABLES LIKE 'gtid_%'",
504 "SHOW SESSION VARIABLES LIKE 'gtid_%'"
506 $res = $conn->
query( $query, $fname );
507 foreach ( $res as $row ) {
508 $map[$row->Variable_name] = $row->Value;
523 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
528 $res = $conn->
query( $query, $fname );
529 $row = $res ? $res->fetchRow() :
false;
531 return ( $row ?: null );