22use InvalidArgumentException;
46 private $replicationInfoRow;
48 private const SERVER_ID_CACHE_TTL = 86400;
51 private const LAG_STALE_WARN_THRESHOLD = 0.100;
68 if ( $this->lagDetectionMethod ===
'pt-heartbeat' ) {
83 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE
85 $row =
$res ?
$res->fetchObject() :
false;
87 if ( $row && strval( $row->Seconds_Behind_Master ) !==
'' ) {
90 return intval( $row->Seconds_Behind_Master + ( $row->SQL_Remaining_Delay ?? 0 ) );
102 if ( $currentTrxInfo ) {
104 $staleness = microtime(
true ) - $currentTrxInfo[
'since'];
105 if ( $staleness > self::LAG_STALE_WARN_THRESHOLD ) {
109 $this->logger->warning(
110 "Using cached lag value for {db_server} due to active transaction",
112 'method' => __METHOD__,
114 'exception' =>
new RuntimeException()
119 return $currentTrxInfo[
'lag'];
123 if ( $ago !==
null ) {
124 return max( $ago, 0.0 );
127 $this->logger->error(
128 "Unable to find pt-heartbeat row for {db_server}",
130 'method' => __METHOD__
148 $this->lagDetectionOptions[
'conds'] ?? [
'server_id != @@server_id' ],
149 ISQLPlatform::LIST_AND
154 "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
155 "FROM heartbeat.heartbeat WHERE $where ORDER BY ts DESC LIMIT 1",
157 ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE
159 $row =
$res ?
$res->fetchObject() :
false;
161 return $row ? ( $row->us_ago / 1e6 ) :
null;
165 if ( $this->lagDetectionMethod ===
'pt-heartbeat' ) {
169 return parent::getApproximateLagStatus( $conn );
172 $key = $this->srvCache->makeGlobalKey(
'mysql-lag', $conn->
getServerName() );
173 $approxLag = $this->srvCache->get( $key );
175 $approxLag = parent::getApproximateLagStatus( $conn );
176 $this->srvCache->set( $key, $approxLag, 1 );
187 if ( $this->replicationInfoRow ===
null ) {
188 $this->replicationInfoRow = $conn->
selectRow(
191 'innodb_autoinc_lock_mode' =>
'@@innodb_autoinc_lock_mode',
192 'binlog_format' =>
'@@binlog_format',
198 return $this->replicationInfoRow;
210 throw new InvalidArgumentException(
"Position not an instance of MySQLPrimaryPos" );
213 if ( $this->topologyRole === IDatabase::ROLE_STATIC_CLONE ) {
214 $this->logger->debug(
215 "Bypassed replication wait; database has a static dataset",
216 $this->
getLogContext( $conn, [
'method' => __METHOD__,
'raw_pos' => $pos ] )
220 } elseif ( $this->lastKnownReplicaPos && $this->lastKnownReplicaPos->hasReached( $pos ) ) {
221 $this->logger->debug(
222 "Bypassed replication wait; replication known to have reached {raw_pos}",
223 $this->
getLogContext( $conn, [
'method' => __METHOD__,
'raw_pos' => $pos ] )
230 if ( $pos->getGTIDs() ) {
234 $this->logger->error(
235 "Could not get replication position on replica DB to compare to {raw_pos}",
236 $this->
getLogContext( $conn, [
'method' => __METHOD__,
'raw_pos' => $pos ] )
242 $gtidsWait = $pos::getRelevantActiveGTIDs( $pos, $refPos );
244 $this->logger->error(
245 "No active GTIDs in {raw_pos} share a domain with those in {current_pos}",
247 'method' => __METHOD__,
249 'current_pos' => $refPos
256 $gtidArg = $conn->
addQuotes( implode(
',', $gtidsWait ) );
257 if ( strpos( $gtidArg,
':' ) !==
false ) {
259 $sql =
"SELECT WAIT_FOR_EXECUTED_GTID_SET($gtidArg, $timeout)";
262 $sql =
"SELECT MASTER_GTID_WAIT($gtidArg, $timeout)";
264 $waitPos = implode(
',', $gtidsWait );
267 $encFile = $conn->
addQuotes( $pos->getLogFile() );
269 $encPos = intval( $pos->getLogPosition()[$pos::CORD_EVENT] );
270 $sql =
"SELECT MASTER_POS_WAIT($encFile, $encPos, $timeout)";
274 $start = microtime(
true );
275 $flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;
276 $res = $conn->
query( $sql, __METHOD__, $flags );
277 $row =
$res->fetchRow();
278 $seconds = max( microtime(
true ) - $start, 0 );
281 $status = ( $row[0] !== null ) ? intval( $row[0] ) :
null;
282 if ( $status ===
null ) {
283 $this->logger->error(
284 "An error occurred while waiting for replication to reach {wait_pos}",
287 'wait_pos' => $waitPos,
289 'seconds_waited' => $seconds,
290 'exception' =>
new RuntimeException()
293 } elseif ( $status < 0 ) {
294 $this->logger->error(
295 "Timed out waiting for replication to reach {wait_pos}",
298 'wait_pos' => $waitPos,
299 'timeout' => $timeout,
301 'seconds_waited' => $seconds,
302 'exception' =>
new RuntimeException()
305 } elseif ( $status >= 0 ) {
306 $this->logger->debug(
307 "Replication has reached {wait_pos}",
310 'wait_pos' => $waitPos,
311 'seconds_waited' => $seconds,
315 $this->lastKnownReplicaPos = $pos;
328 $now = microtime(
true );
334 foreach ( [
'gtid_slave_pos',
'gtid_executed' ] as $name ) {
335 if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
342 if ( $data && strlen( $data[
'Relay_Master_Log_File'] ) ) {
344 "{$data['Relay_Master_Log_File']}/{$data['Exec_Master_Log_Pos']}",
359 $now = microtime(
true );
366 foreach ( [
'gtid_binlog_pos',
'gtid_executed' ] as $name ) {
367 if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
374 $pos->setActiveOriginServerId( $this->
getServerId( $conn ) );
375 $pos->setActiveOriginServerUUID( $this->
getServerUUID( $conn ) );
376 if ( isset( $data[
'gtid_domain_id'] ) ) {
377 $pos->setActiveDomain( $data[
'gtid_domain_id'] );
384 if ( $data && strlen( $data[
'File'] ) ) {
385 $pos =
new MySQLPrimaryPos(
"{$data['File']}/{$data['Position']}", $now );
408 return $this->srvCache->getWithSetCallback(
409 $this->srvCache->makeGlobalKey(
'mysql-server-id', $conn->
getServerName() ),
410 self::SERVER_ID_CACHE_TTL,
411 static function () use ( $conn, $fname ) {
412 $flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;
413 $res = $conn->
query(
"SELECT @@server_id AS id", $fname, $flags );
415 return $res->fetchObject()->
id;
427 return $this->srvCache->getWithSetCallback(
428 $this->srvCache->makeGlobalKey(
'mysql-server-uuid', $conn->
getServerName() ),
429 self::SERVER_ID_CACHE_TTL,
430 static function () use ( $conn, $fname ) {
431 $flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;
432 $res = $conn->
query(
"SHOW GLOBAL VARIABLES LIKE 'server_uuid'", $fname, $flags );
433 $row =
$res->fetchObject();
435 return $row ? $row->Value :
null;
448 $flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;
451 $res = $conn->
query(
"SHOW GLOBAL VARIABLES LIKE 'gtid_%'", $fname, $flags );
452 foreach (
$res as $row ) {
453 $map[$row->Variable_name] = $row->Value;
456 $res = $conn->
query(
"SHOW SESSION VARIABLES LIKE 'gtid_%'", $fname, $flags );
457 foreach (
$res as $row ) {
458 $map[$row->Variable_name] = $row->Value;
471 $flags = ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX |
472 ISQLPlatform::QUERY_CHANGE_NONE;
473 $res = $conn->
query(
"SHOW $role STATUS", $fname, $flags );
474 $row =
$res ?
$res->fetchRow() :
false;
476 return ( $row ?: null );