8use InvalidArgumentException;
9use Psr\Log\LoggerInterface;
35 private $replicationInfoRow;
37 private const SERVER_ID_CACHE_TTL = 86400;
40 private const LAG_STALE_WARN_THRESHOLD = 0.100;
66 if ( $this->lagDetectionMethod ===
'pt-heartbeat' ) {
80 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
85 $res = $conn->
query( $query, __METHOD__ );
86 $row = $res ? $res->fetchObject() :
false;
88 if ( $row && strval( $row->Seconds_Behind_Master ) !==
'' ) {
91 return intval( $row->Seconds_Behind_Master + ( $row->SQL_Remaining_Delay ?? 0 ) );
103 if ( $currentTrxInfo ) {
105 $staleness = microtime(
true ) - $currentTrxInfo[
'since'];
106 if ( $staleness > self::LAG_STALE_WARN_THRESHOLD ) {
110 $this->logger->warning(
111 "Using cached lag value for {db_server} due to active transaction",
113 'method' => __METHOD__,
115 'exception' =>
new RuntimeException()
120 return $currentTrxInfo[
'lag'];
124 if ( $ago !==
null ) {
125 return max( $ago, 0.0 );
128 $this->logger->error(
129 "Unable to find pt-heartbeat row for {db_server}",
131 'method' => __METHOD__
149 $this->lagDetectionOptions[
'conds'] ?? [
'server_id != @@server_id' ],
150 ISQLPlatform::LIST_AND
155 "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
156 "FROM heartbeat.heartbeat WHERE $where ORDER BY ts DESC LIMIT 1",
157 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
160 "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
161 "FROM heartbeat.heartbeat WHERE ? ORDER BY ts DESC LIMIT 1",
163 $res = $conn->
query( $query, __METHOD__ );
164 $row = $res ? $res->fetchObject() :
false;
166 return $row ? ( $row->us_ago / 1e6 ) :
null;
171 if ( $this->lagDetectionMethod ===
'pt-heartbeat' ) {
175 return parent::getApproximateLagStatus( $conn );
178 $key = $this->srvCache->makeGlobalKey(
'mysql-lag', $conn->
getServerName() );
179 $approxLag = $this->srvCache->get( $key );
181 $approxLag = parent::getApproximateLagStatus( $conn );
182 $this->srvCache->set( $key, $approxLag, 1 );
194 if ( $this->replicationInfoRow ===
null ) {
195 $this->replicationInfoRow = $conn->
selectRow(
198 'innodb_autoinc_lock_mode' =>
'@@innodb_autoinc_lock_mode',
199 'binlog_format' =>
'@@binlog_format',
205 return $this->replicationInfoRow;
218 throw new InvalidArgumentException(
"Position not an instance of MySQLPrimaryPos" );
222 $this->logger->debug(
223 "Bypassed replication wait; database has a static dataset",
224 $this->
getLogContext( $conn, [
'method' => __METHOD__,
'raw_pos' => $pos ] )
228 } elseif ( $this->lastKnownReplicaPos && $this->lastKnownReplicaPos->hasReached( $pos ) ) {
229 $this->logger->debug(
230 "Bypassed replication wait; replication known to have reached {raw_pos}",
231 $this->
getLogContext( $conn, [
'method' => __METHOD__,
'raw_pos' => $pos ] )
238 if ( $pos->getGTIDs() ) {
242 $this->logger->error(
243 "Could not get replication position on replica DB to compare to {raw_pos}",
244 $this->
getLogContext( $conn, [
'method' => __METHOD__,
'raw_pos' => $pos ] )
250 $gtidsWait = $pos::getRelevantActiveGTIDs( $pos, $refPos );
252 $this->logger->error(
253 "No active GTIDs in {raw_pos} share a domain with those in {current_pos}",
255 'method' => __METHOD__,
257 'current_pos' => $refPos
264 $gtidArg = $conn->
addQuotes( implode(
',', $gtidsWait ) );
265 if ( str_contains( $gtidArg,
':' ) ) {
268 "SELECT WAIT_FOR_EXECUTED_GTID_SET($gtidArg, $timeout)",
269 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
272 "SELECT WAIT_FOR_EXECUTED_GTID_SET(?, ?)"
277 "SELECT MASTER_GTID_WAIT($gtidArg, $timeout)",
278 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
281 "SELECT MASTER_GTID_WAIT(?, ?)"
284 $waitPos = implode(
',', $gtidsWait );
287 $encFile = $conn->
addQuotes( $pos->getLogFile() );
289 $encPos = intval( $pos->getLogPosition()[$pos::CORD_EVENT] );
291 "SELECT MASTER_POS_WAIT($encFile, $encPos, $timeout)",
292 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
295 "SELECT MASTER_POS_WAIT(?, ?, ?)"
300 $start = microtime(
true );
301 $res = $conn->
query( $query, __METHOD__ );
302 $row = $res->fetchRow();
303 $seconds = max( microtime(
true ) - $start, 0 );
306 $status = ( $row[0] !== null ) ? intval( $row[0] ) :
null;
307 if ( $status ===
null ) {
308 $this->logger->error(
309 "An error occurred while waiting for replication to reach {wait_pos}",
312 'wait_pos' => $waitPos,
313 'sql' => $query->getSQL(),
314 'seconds_waited' => $seconds,
315 'exception' =>
new RuntimeException()
318 } elseif ( $status < 0 ) {
320 "Timed out waiting for replication to reach {wait_pos}",
323 'wait_pos' => $waitPos,
324 'timeout' => $timeout,
325 'sql' => $query->getSQL(),
326 'seconds_waited' => $seconds,
329 } elseif ( $status >= 0 ) {
330 $this->logger->debug(
331 "Replication has reached {wait_pos}",
334 'wait_pos' => $waitPos,
335 'seconds_waited' => $seconds,
339 $this->lastKnownReplicaPos = $pos;
352 $now = microtime(
true );
358 foreach ( [
'gtid_slave_pos',
'gtid_executed' ] as $name ) {
359 if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
366 if ( $data && strlen( $data[
'Relay_Master_Log_File'] ) ) {
368 "{$data['Relay_Master_Log_File']}/{$data['Exec_Master_Log_Pos']}",
383 $now = microtime(
true );
390 foreach ( [
'gtid_binlog_pos',
'gtid_executed' ] as $name ) {
391 if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
398 $pos->setActiveOriginServerId( $this->
getServerId( $conn ) );
399 $pos->setActiveOriginServerUUID( $this->
getServerUUID( $conn ) );
400 if ( isset( $data[
'gtid_domain_id'] ) ) {
401 $pos->setActiveDomain( $data[
'gtid_domain_id'] );
408 if ( $data && strlen( $data[
'File'] ) ) {
409 $pos =
new MySQLPrimaryPos(
"{$data['File']}/{$data['Position']}", $now );
423 return $this->srvCache->getWithSetCallback(
424 $this->srvCache->makeGlobalKey(
'mysql-server-id', $conn->
getServerName() ),
425 self::SERVER_ID_CACHE_TTL,
426 static function () use ( $conn, $fname ) {
428 "SELECT @@server_id AS id",
429 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
432 "SELECT @@server_id AS id"
434 $res = $conn->
query( $query, $fname );
436 return $res->fetchObject()->id;
448 return $this->srvCache->getWithSetCallback(
449 $this->srvCache->makeGlobalKey(
'mysql-server-uuid', $conn->
getServerName() ),
450 self::SERVER_ID_CACHE_TTL,
451 static function () use ( $conn, $fname ) {
453 "SHOW GLOBAL VARIABLES LIKE 'server_uuid'",
454 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
457 "SHOW GLOBAL VARIABLES LIKE 'server_uuid'"
459 $res = $conn->
query( $query, $fname );
460 $row = $res->fetchObject();
462 return $row ? $row->Value :
null;
475 $flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;
479 "SHOW GLOBAL VARIABLES LIKE 'gtid_%'",
483 "SHOW GLOBAL VARIABLES LIKE 'gtid_%'"
485 $res = $conn->
query( $query, $fname );
486 foreach ( $res as $row ) {
487 $map[$row->Variable_name] = $row->Value;
491 "SHOW SESSION VARIABLES LIKE 'gtid_%'",
495 "SHOW SESSION VARIABLES LIKE 'gtid_%'"
497 $res = $conn->
query( $query, $fname );
498 foreach ( $res as $row ) {
499 $map[$row->Variable_name] = $row->Value;
514 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
519 $res = $conn->
query( $query, $fname );
520 $row = $res ? $res->fetchRow() :
false;
522 return ( $row ?: null );