22use InvalidArgumentException;
47 private $replicationInfoRow;
49 private const SERVER_ID_CACHE_TTL = 86400;
52 private const LAG_STALE_WARN_THRESHOLD = 0.100;
69 if ( $this->lagDetectionMethod ===
'pt-heartbeat' ) {
83 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
88 $res = $conn->
query( $query, __METHOD__ );
89 $row = $res ? $res->fetchObject() :
false;
91 if ( $row && strval( $row->Seconds_Behind_Master ) !==
'' ) {
94 return intval( $row->Seconds_Behind_Master + ( $row->SQL_Remaining_Delay ?? 0 ) );
106 if ( $currentTrxInfo ) {
108 $staleness = microtime(
true ) - $currentTrxInfo[
'since'];
109 if ( $staleness > self::LAG_STALE_WARN_THRESHOLD ) {
113 $this->logger->warning(
114 "Using cached lag value for {db_server} due to active transaction",
116 'method' => __METHOD__,
118 'exception' =>
new RuntimeException()
123 return $currentTrxInfo[
'lag'];
127 if ( $ago !==
null ) {
128 return max( $ago, 0.0 );
131 $this->logger->error(
132 "Unable to find pt-heartbeat row for {db_server}",
134 'method' => __METHOD__
152 $this->lagDetectionOptions[
'conds'] ?? [
'server_id != @@server_id' ],
153 ISQLPlatform::LIST_AND
158 "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
159 "FROM heartbeat.heartbeat WHERE $where ORDER BY ts DESC LIMIT 1",
160 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
163 "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
164 "FROM heartbeat.heartbeat WHERE ? ORDER BY ts DESC LIMIT 1",
166 $res = $conn->
query( $query, __METHOD__ );
167 $row = $res ? $res->fetchObject() :
false;
169 return $row ? ( $row->us_ago / 1e6 ) :
null;
173 if ( $this->lagDetectionMethod ===
'pt-heartbeat' ) {
177 return parent::getApproximateLagStatus( $conn );
180 $key = $this->srvCache->makeGlobalKey(
'mysql-lag', $conn->
getServerName() );
181 $approxLag = $this->srvCache->get( $key );
183 $approxLag = parent::getApproximateLagStatus( $conn );
184 $this->srvCache->set( $key, $approxLag, 1 );
196 if ( $this->replicationInfoRow ===
null ) {
197 $this->replicationInfoRow = $conn->
selectRow(
200 'innodb_autoinc_lock_mode' =>
'@@innodb_autoinc_lock_mode',
201 'binlog_format' =>
'@@binlog_format',
207 return $this->replicationInfoRow;
219 throw new InvalidArgumentException(
"Position not an instance of MySQLPrimaryPos" );
223 $this->logger->debug(
224 "Bypassed replication wait; database has a static dataset",
225 $this->
getLogContext( $conn, [
'method' => __METHOD__,
'raw_pos' => $pos ] )
229 } elseif ( $this->lastKnownReplicaPos && $this->lastKnownReplicaPos->hasReached( $pos ) ) {
230 $this->logger->debug(
231 "Bypassed replication wait; replication known to have reached {raw_pos}",
232 $this->
getLogContext( $conn, [
'method' => __METHOD__,
'raw_pos' => $pos ] )
239 if ( $pos->getGTIDs() ) {
243 $this->logger->error(
244 "Could not get replication position on replica DB to compare to {raw_pos}",
245 $this->
getLogContext( $conn, [
'method' => __METHOD__,
'raw_pos' => $pos ] )
251 $gtidsWait = $pos::getRelevantActiveGTIDs( $pos, $refPos );
253 $this->logger->error(
254 "No active GTIDs in {raw_pos} share a domain with those in {current_pos}",
256 'method' => __METHOD__,
258 'current_pos' => $refPos
265 $gtidArg = $conn->
addQuotes( implode(
',', $gtidsWait ) );
266 if ( strpos( $gtidArg,
':' ) !==
false ) {
269 "SELECT WAIT_FOR_EXECUTED_GTID_SET($gtidArg, $timeout)",
270 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
273 "SELECT WAIT_FOR_EXECUTED_GTID_SET(?, ?)"
278 "SELECT MASTER_GTID_WAIT($gtidArg, $timeout)",
279 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
282 "SELECT MASTER_GTID_WAIT(?, ?)"
285 $waitPos = implode(
',', $gtidsWait );
288 $encFile = $conn->
addQuotes( $pos->getLogFile() );
290 $encPos = intval( $pos->getLogPosition()[$pos::CORD_EVENT] );
292 "SELECT MASTER_POS_WAIT($encFile, $encPos, $timeout)",
293 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
296 "SELECT MASTER_POS_WAIT(?, ?, ?)"
301 $start = microtime(
true );
302 $res = $conn->
query( $query, __METHOD__ );
303 $row = $res->fetchRow();
304 $seconds = max( microtime(
true ) - $start, 0 );
307 $status = ( $row[0] !== null ) ? intval( $row[0] ) :
null;
308 if ( $status ===
null ) {
309 $this->logger->error(
310 "An error occurred while waiting for replication to reach {wait_pos}",
313 'wait_pos' => $waitPos,
314 'sql' => $query->getSQL(),
315 'seconds_waited' => $seconds,
316 'exception' =>
new RuntimeException()
319 } elseif ( $status < 0 ) {
321 "Timed out waiting for replication to reach {wait_pos}",
324 'wait_pos' => $waitPos,
325 'timeout' => $timeout,
326 'sql' => $query->getSQL(),
327 'seconds_waited' => $seconds,
330 } elseif ( $status >= 0 ) {
331 $this->logger->debug(
332 "Replication has reached {wait_pos}",
335 'wait_pos' => $waitPos,
336 'seconds_waited' => $seconds,
340 $this->lastKnownReplicaPos = $pos;
353 $now = microtime(
true );
359 foreach ( [
'gtid_slave_pos',
'gtid_executed' ] as $name ) {
360 if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
367 if ( $data && strlen( $data[
'Relay_Master_Log_File'] ) ) {
369 "{$data['Relay_Master_Log_File']}/{$data['Exec_Master_Log_Pos']}",
384 $now = microtime(
true );
391 foreach ( [
'gtid_binlog_pos',
'gtid_executed' ] as $name ) {
392 if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
399 $pos->setActiveOriginServerId( $this->
getServerId( $conn ) );
400 $pos->setActiveOriginServerUUID( $this->
getServerUUID( $conn ) );
401 if ( isset( $data[
'gtid_domain_id'] ) ) {
402 $pos->setActiveDomain( $data[
'gtid_domain_id'] );
409 if ( $data && strlen( $data[
'File'] ) ) {
410 $pos =
new MySQLPrimaryPos(
"{$data['File']}/{$data['Position']}", $now );
424 return $this->srvCache->getWithSetCallback(
425 $this->srvCache->makeGlobalKey(
'mysql-server-id', $conn->
getServerName() ),
426 self::SERVER_ID_CACHE_TTL,
427 static function () use ( $conn, $fname ) {
429 "SELECT @@server_id AS id",
430 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
433 "SELECT @@server_id AS id"
435 $res = $conn->
query( $query, $fname );
437 return $res->fetchObject()->id;
449 return $this->srvCache->getWithSetCallback(
450 $this->srvCache->makeGlobalKey(
'mysql-server-uuid', $conn->
getServerName() ),
451 self::SERVER_ID_CACHE_TTL,
452 static function () use ( $conn, $fname ) {
454 "SHOW GLOBAL VARIABLES LIKE 'server_uuid'",
455 ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
458 "SHOW GLOBAL VARIABLES LIKE 'server_uuid'"
460 $res = $conn->
query( $query, $fname );
461 $row = $res->fetchObject();
463 return $row ? $row->Value :
null;
476 $flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;
480 "SHOW GLOBAL VARIABLES LIKE 'gtid_%'",
484 "SHOW GLOBAL VARIABLES LIKE 'gtid_%'"
486 $res = $conn->
query( $query, $fname );
487 foreach ( $res as $row ) {
488 $map[$row->Variable_name] = $row->Value;
492 "SHOW SESSION VARIABLES LIKE 'gtid_%'",
496 "SHOW SESSION VARIABLES LIKE 'gtid_%'"
498 $res = $conn->
query( $query, $fname );
499 foreach ( $res as $row ) {
500 $map[$row->Variable_name] = $row->Value;
515 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
520 $res = $conn->
query( $query, $fname );
521 $row = $res ? $res->fetchRow() :
false;
523 return ( $row ?: null );