22use InvalidArgumentException;
23use Psr\Log\LoggerInterface;
49 private $replicationInfoRow;
51 private const SERVER_ID_CACHE_TTL = 86400;
54 private const LAG_STALE_WARN_THRESHOLD = 0.100;
79 if ( $this->lagDetectionMethod ===
'pt-heartbeat' ) {
93 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
98 $res = $conn->
query( $query, __METHOD__ );
99 $row = $res ? $res->fetchObject() :
false;
101 if ( $row && strval( $row->Seconds_Behind_Master ) !==
'' ) {
104 return intval( $row->Seconds_Behind_Master + ( $row->SQL_Remaining_Delay ?? 0 ) );
116 if ( $currentTrxInfo ) {
118 $staleness = microtime(
true ) - $currentTrxInfo[
'since'];
119 if ( $staleness > self::LAG_STALE_WARN_THRESHOLD ) {
123 $this->logger->warning(
124 "Using cached lag value for {db_server} due to active transaction",
126 'method' => __METHOD__,
128 'exception' =>
new RuntimeException()
133 return $currentTrxInfo[
'lag'];
137 if ( $ago !==
null ) {
138 return max( $ago, 0.0 );
141 $this->logger->error(
142 "Unable to find pt-heartbeat row for {db_server}",
144 'method' => __METHOD__
162 $this->lagDetectionOptions[
'conds'] ?? [
'server_id != @@server_id' ],
163 ISQLPlatform::LIST_AND
168 "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
169 "FROM heartbeat.heartbeat WHERE $where ORDER BY ts DESC LIMIT 1",
170 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
173 "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
174 "FROM heartbeat.heartbeat WHERE ? ORDER BY ts DESC LIMIT 1",
176 $res = $conn->
query( $query, __METHOD__ );
177 $row = $res ? $res->fetchObject() :
false;
179 return $row ? ( $row->us_ago / 1e6 ) :
null;
183 if ( $this->lagDetectionMethod ===
'pt-heartbeat' ) {
187 return parent::getApproximateLagStatus( $conn );
190 $key = $this->srvCache->makeGlobalKey(
'mysql-lag', $conn->
getServerName() );
191 $approxLag = $this->srvCache->get( $key );
193 $approxLag = parent::getApproximateLagStatus( $conn );
194 $this->srvCache->set( $key, $approxLag, 1 );
206 if ( $this->replicationInfoRow ===
null ) {
207 $this->replicationInfoRow = $conn->
selectRow(
210 'innodb_autoinc_lock_mode' =>
'@@innodb_autoinc_lock_mode',
211 'binlog_format' =>
'@@binlog_format',
217 return $this->replicationInfoRow;
229 throw new InvalidArgumentException(
"Position not an instance of MySQLPrimaryPos" );
233 $this->logger->debug(
234 "Bypassed replication wait; database has a static dataset",
235 $this->
getLogContext( $conn, [
'method' => __METHOD__,
'raw_pos' => $pos ] )
239 } elseif ( $this->lastKnownReplicaPos && $this->lastKnownReplicaPos->hasReached( $pos ) ) {
240 $this->logger->debug(
241 "Bypassed replication wait; replication known to have reached {raw_pos}",
242 $this->
getLogContext( $conn, [
'method' => __METHOD__,
'raw_pos' => $pos ] )
249 if ( $pos->getGTIDs() ) {
253 $this->logger->error(
254 "Could not get replication position on replica DB to compare to {raw_pos}",
255 $this->
getLogContext( $conn, [
'method' => __METHOD__,
'raw_pos' => $pos ] )
261 $gtidsWait = $pos::getRelevantActiveGTIDs( $pos, $refPos );
263 $this->logger->error(
264 "No active GTIDs in {raw_pos} share a domain with those in {current_pos}",
266 'method' => __METHOD__,
268 'current_pos' => $refPos
275 $gtidArg = $conn->
addQuotes( implode(
',', $gtidsWait ) );
276 if ( strpos( $gtidArg,
':' ) !==
false ) {
279 "SELECT WAIT_FOR_EXECUTED_GTID_SET($gtidArg, $timeout)",
280 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
283 "SELECT WAIT_FOR_EXECUTED_GTID_SET(?, ?)"
288 "SELECT MASTER_GTID_WAIT($gtidArg, $timeout)",
289 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
292 "SELECT MASTER_GTID_WAIT(?, ?)"
295 $waitPos = implode(
',', $gtidsWait );
298 $encFile = $conn->
addQuotes( $pos->getLogFile() );
300 $encPos = intval( $pos->getLogPosition()[$pos::CORD_EVENT] );
302 "SELECT MASTER_POS_WAIT($encFile, $encPos, $timeout)",
303 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
306 "SELECT MASTER_POS_WAIT(?, ?, ?)"
311 $start = microtime(
true );
312 $res = $conn->
query( $query, __METHOD__ );
313 $row = $res->fetchRow();
314 $seconds = max( microtime(
true ) - $start, 0 );
317 $status = ( $row[0] !== null ) ? intval( $row[0] ) :
null;
318 if ( $status ===
null ) {
319 $this->logger->error(
320 "An error occurred while waiting for replication to reach {wait_pos}",
323 'wait_pos' => $waitPos,
324 'sql' => $query->getSQL(),
325 'seconds_waited' => $seconds,
326 'exception' =>
new RuntimeException()
329 } elseif ( $status < 0 ) {
331 "Timed out waiting for replication to reach {wait_pos}",
334 'wait_pos' => $waitPos,
335 'timeout' => $timeout,
336 'sql' => $query->getSQL(),
337 'seconds_waited' => $seconds,
340 } elseif ( $status >= 0 ) {
341 $this->logger->debug(
342 "Replication has reached {wait_pos}",
345 'wait_pos' => $waitPos,
346 'seconds_waited' => $seconds,
350 $this->lastKnownReplicaPos = $pos;
363 $now = microtime(
true );
369 foreach ( [
'gtid_slave_pos',
'gtid_executed' ] as $name ) {
370 if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
377 if ( $data && strlen( $data[
'Relay_Master_Log_File'] ) ) {
379 "{$data['Relay_Master_Log_File']}/{$data['Exec_Master_Log_Pos']}",
394 $now = microtime(
true );
401 foreach ( [
'gtid_binlog_pos',
'gtid_executed' ] as $name ) {
402 if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
409 $pos->setActiveOriginServerId( $this->
getServerId( $conn ) );
410 $pos->setActiveOriginServerUUID( $this->
getServerUUID( $conn ) );
411 if ( isset( $data[
'gtid_domain_id'] ) ) {
412 $pos->setActiveDomain( $data[
'gtid_domain_id'] );
419 if ( $data && strlen( $data[
'File'] ) ) {
420 $pos =
new MySQLPrimaryPos(
"{$data['File']}/{$data['Position']}", $now );
434 return $this->srvCache->getWithSetCallback(
435 $this->srvCache->makeGlobalKey(
'mysql-server-id', $conn->
getServerName() ),
436 self::SERVER_ID_CACHE_TTL,
437 static function () use ( $conn, $fname ) {
439 "SELECT @@server_id AS id",
440 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
443 "SELECT @@server_id AS id"
445 $res = $conn->
query( $query, $fname );
447 return $res->fetchObject()->id;
459 return $this->srvCache->getWithSetCallback(
460 $this->srvCache->makeGlobalKey(
'mysql-server-uuid', $conn->
getServerName() ),
461 self::SERVER_ID_CACHE_TTL,
462 static function () use ( $conn, $fname ) {
464 "SHOW GLOBAL VARIABLES LIKE 'server_uuid'",
465 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
468 "SHOW GLOBAL VARIABLES LIKE 'server_uuid'"
470 $res = $conn->
query( $query, $fname );
471 $row = $res->fetchObject();
473 return $row ? $row->Value :
null;
486 $flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;
490 "SHOW GLOBAL VARIABLES LIKE 'gtid_%'",
494 "SHOW GLOBAL VARIABLES LIKE 'gtid_%'"
496 $res = $conn->
query( $query, $fname );
497 foreach ( $res as $row ) {
498 $map[$row->Variable_name] = $row->Value;
502 "SHOW SESSION VARIABLES LIKE 'gtid_%'",
506 "SHOW SESSION VARIABLES LIKE 'gtid_%'"
508 $res = $conn->
query( $query, $fname );
509 foreach ( $res as $row ) {
510 $map[$row->Variable_name] = $row->Value;
525 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
530 $res = $conn->
query( $query, $fname );
531 $row = $res ? $res->fetchRow() :
false;
533 return ( $row ?: null );