22use InvalidArgumentException;
// Explicit override taken from the 'insertSelectIsSafe' constructor parameter:
// cast to bool when the parameter is set, null when it was not supplied.
71 private $insertSelectIsSafe;
// Memoized result of the selectRow() call that reads @@innodb_autoinc_lock_mode
// and @@binlog_format; stays null until that row has been fetched once.
73 private $replicationInfoRow;
// TTL handed to srvCache->getWithSetCallback() for the cached server ID and
// server UUID lookups. NOTE(review): presumably seconds (i.e. 24 hours) — confirm
// against the cache API.
76 private const SERVER_ID_CACHE_TTL = 86400;
// Threshold compared against (microtime( true ) - transaction 'since' time), so it
// is expressed in seconds; the cached-lag warning is only considered above it.
79 private const LAG_STALE_WARN_THRESHOLD = 0.100;
105 $this->lagDetectionMethod = $params[
'lagDetectionMethod'] ??
'Seconds_Behind_Master';
106 $this->lagDetectionOptions = $params[
'lagDetectionOptions'] ?? [];
107 $this->
useGTIDs = !empty( $params[
'useGTIDs' ] );
108 foreach ( [
'KeyPath',
'CertPath',
'CAFile',
'CAPath',
'Ciphers' ] as $name ) {
110 if ( isset( $params[$var] ) ) {
111 $this->$var = $params[$var];
114 $this->utf8Mode = !empty( $params[
'utf8Mode'] );
115 $this->insertSelectIsSafe = isset( $params[
'insertSelectIsSafe'] )
116 ? (bool)$params[
'insertSelectIsSafe'] :
null;
117 parent::__construct( $params );
120 $params[
'queryLogger'],
121 $this->currentDomain,
134 $this->
close( __METHOD__ );
136 if ( $schema !==
null ) {
143 }
catch ( RuntimeException $e ) {
149 if ( !$this->conn ) {
155 $db && strlen( $db ) ? $db :
null,
159 $this->platform->setPrefix( $tablePrefix );
161 $set = [
'group_concat_max_len = 262144' ];
164 foreach ( $this->connectionVariables as $var => $val ) {
166 if ( !is_int( $val ) && !is_float( $val ) ) {
169 $set[] = $this->platform->addIdentifierQuotes( $var ) .
' = ' . $val;
175 $sql =
'SET ' . implode(
', ', $set );
176 $flags = self::QUERY_NO_RETRY | self::QUERY_CHANGE_TRX;
180 if ( $qs->res ===
false ) {
184 }
catch ( RuntimeException $e ) {
193 __CLASS__ .
": domain '{$domain->getId()}' has a schema component"
199 if ( $database ===
null ) {
201 $this->currentDomain->getDatabase(),
210 if ( $database !== $this->
getDBname() ) {
212 $qs = $this->
executeQuery( $sql, __METHOD__, self::QUERY_IGNORE_DBO_TRX, $sql );
213 if ( $qs->res ===
false ) {
220 $this->currentDomain = $domain;
266 if ( $row->binlog_format ===
'ROW' ) {
270 if ( isset( $selectOptions[
'LIMIT'] ) ) {
281 in_array(
'NO_AUTO_COLUMNS', $insertOptions ) ||
282 (
int)$row->innodb_autoinc_lock_mode === 0
290 if ( $this->replicationInfoRow ===
null ) {
291 $this->replicationInfoRow = $this->
selectRow(
294 'innodb_autoinc_lock_mode' =>
'@@innodb_autoinc_lock_mode',
295 'binlog_format' =>
'@@binlog_format',
302 return $this->replicationInfoRow;
326 $conds = $this->platform->normalizeConditions( $conds, $fname );
327 $column = $this->platform->extractSingleFieldFromList( $var );
328 if ( is_string( $column ) && !in_array( $column, [
'*',
'1' ] ) ) {
329 $conds[] =
"$column IS NOT NULL";
332 $options[
'EXPLAIN'] =
true;
333 $res = $this->
select( $tables, $var, $conds, $fname, $options, $join_conds );
334 if (
$res ===
false ) {
337 if ( !
$res->numRows() ) {
342 foreach (
$res as $plan ) {
343 $rows *= $plan->rows > 0 ? $plan->rows : 1;
352 list( $database, , $prefix, $table ) = $this->platform->qualifiedTableComponents( $table );
353 $tableName =
"{$prefix}{$table}";
355 if ( isset( $this->sessionTempTables[$tableName] ) ) {
362 $encLike = $this->platform->escapeLikeInternal( $tableName,
'\\' );
365 if ( $database !==
'' ) {
366 $encDatabase = $this->platform->addIdentifierQuotes( $database );
367 $sql =
"SHOW TABLES FROM $encDatabase LIKE '$encLike'";
369 $sql =
"SHOW TABLES LIKE '$encLike'";
375 self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
378 return $res->numRows() > 0;
388 "SELECT * FROM " . $this->
tableName( $table ) .
" LIMIT 1",
390 self::QUERY_SILENCE_ERRORS | self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
396 '@phan-var MysqliResultWrapper $res';
397 return $res->getInternalFieldInfo( $field );
409 public function indexInfo( $table, $index, $fname = __METHOD__ ) {
414 'SHOW INDEX FROM ' . $this->
tableName( $table ),
416 self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
425 foreach (
$res as $row ) {
426 if ( $row->Key_name == $index ) {
431 return $result ?:
false;
472 self::QUERY_SILENCE_ERRORS | self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
474 $row =
$res ?
$res->fetchObject() :
false;
476 if ( $row && strval( $row->Seconds_Behind_Master ) !==
'' ) {
479 return intval( $row->Seconds_Behind_Master + ( $row->SQL_Remaining_Delay ?? 0 ) );
492 if ( $currentTrxInfo ) {
494 $staleness = microtime(
true ) - $currentTrxInfo[
'since'];
495 if ( $staleness > self::LAG_STALE_WARN_THRESHOLD ) {
498 $this->queryLogger->warning(
499 "Using cached lag value for {db_server} due to active transaction",
501 'method' => __METHOD__,
503 'exception' =>
new RuntimeException()
508 return $currentTrxInfo[
'lag'];
511 if ( isset( $options[
'conds'] ) ) {
517 $conds = $options[
'conds'];
522 if ( !$sourceInfo ) {
523 $this->queryLogger->error(
524 "Unable to query primary of {db_server} for server ID",
526 'method' => __METHOD__
533 $conds = [
'server_id' => $sourceInfo[
'serverId'] ];
537 if ( $ago !==
null ) {
538 return max( $ago, 0.0 );
541 $this->queryLogger->error(
542 "Unable to find pt-heartbeat row for {db_server}",
544 'method' => __METHOD__
564 $id = (int)( $row[
'Master_Server_Id'] ?? $row[
'Source_Server_Id'] ?? 0 );
570 return $id ? [
'serverId' => $id,
'asOf' => time() ] :
false;
579 $whereSQL = $this->
makeList( $conds, self::LIST_AND );
583 "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
584 "FROM heartbeat.heartbeat WHERE $whereSQL ORDER BY ts DESC LIMIT 1",
586 self::QUERY_SILENCE_ERRORS | self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
588 $row =
$res ?
$res->fetchObject() :
false;
590 return $row ? ( $row->us_ago / 1e6 ) :
null;
598 return parent::getApproximateLagStatus();
601 $key = $this->srvCache->makeGlobalKey(
'mysql-lag', $this->
getServerName() );
602 $approxLag = $this->srvCache->get( $key );
604 $approxLag = parent::getApproximateLagStatus();
605 $this->srvCache->set( $key, $approxLag, 1 );
613 throw new InvalidArgumentException(
"Position not an instance of MySQLPrimaryPos" );
616 if ( $this->topologyRole === self::ROLE_STATIC_CLONE ) {
617 $this->queryLogger->debug(
618 "Bypassed replication wait; database has a static dataset",
619 $this->
getLogContext( [
'method' => __METHOD__,
'raw_pos' => $pos ] )
623 } elseif ( $this->lastKnownReplicaPos && $this->lastKnownReplicaPos->hasReached( $pos ) ) {
624 $this->queryLogger->debug(
625 "Bypassed replication wait; replication known to have reached {raw_pos}",
626 $this->
getLogContext( [
'method' => __METHOD__,
'raw_pos' => $pos ] )
633 if ( $pos->getGTIDs() ) {
637 $this->queryLogger->error(
638 "Could not get replication position on replica DB to compare to {raw_pos}",
639 $this->
getLogContext( [
'method' => __METHOD__,
'raw_pos' => $pos ] )
645 $gtidsWait = $pos::getRelevantActiveGTIDs( $pos, $refPos );
647 $this->queryLogger->error(
648 "No active GTIDs in {raw_pos} share a domain with those in {current_pos}",
650 'method' => __METHOD__,
652 'current_pos' => $refPos
659 $gtidArg = $this->
addQuotes( implode(
',', $gtidsWait ) );
660 if ( strpos( $gtidArg,
':' ) !==
false ) {
662 $sql =
"SELECT WAIT_FOR_EXECUTED_GTID_SET($gtidArg, $timeout)";
665 $sql =
"SELECT MASTER_GTID_WAIT($gtidArg, $timeout)";
667 $waitPos = implode(
',', $gtidsWait );
670 $encFile = $this->
addQuotes( $pos->getLogFile() );
672 $encPos = intval( $pos->getLogPosition()[$pos::CORD_EVENT] );
673 $sql =
"SELECT MASTER_POS_WAIT($encFile, $encPos, $timeout)";
677 $start = microtime(
true );
678 $flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
680 $row =
$res->fetchRow();
681 $seconds = max( microtime(
true ) - $start, 0 );
684 $status = ( $row[0] !== null ) ? intval( $row[0] ) :
null;
685 if ( $status ===
null ) {
686 $this->replLogger->error(
687 "An error occurred while waiting for replication to reach {wait_pos}",
690 'wait_pos' => $waitPos,
692 'seconds_waited' => $seconds,
693 'exception' =>
new RuntimeException()
696 } elseif ( $status < 0 ) {
697 $this->replLogger->error(
698 "Timed out waiting for replication to reach {wait_pos}",
701 'wait_pos' => $waitPos,
702 'timeout' => $timeout,
704 'seconds_waited' => $seconds,
705 'exception' =>
new RuntimeException()
708 } elseif ( $status >= 0 ) {
709 $this->replLogger->debug(
710 "Replication has reached {wait_pos}",
713 'wait_pos' => $waitPos,
714 'seconds_waited' => $seconds,
718 $this->lastKnownReplicaPos = $pos;
730 $now = microtime(
true );
736 foreach ( [
'gtid_slave_pos',
'gtid_executed' ] as $name ) {
737 if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
744 if ( $data && strlen( $data[
'Relay_Master_Log_File'] ) ) {
746 "{$data['Relay_Master_Log_File']}/{$data['Exec_Master_Log_Pos']}",
760 $now = microtime(
true );
767 foreach ( [
'gtid_binlog_pos',
'gtid_executed' ] as $name ) {
768 if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
775 $pos->setActiveOriginServerId( $this->
getServerId() );
777 if ( isset( $data[
'gtid_domain_id'] ) ) {
778 $pos->setActiveDomain( $data[
'gtid_domain_id'] );
785 if ( $data && strlen( $data[
'File'] ) ) {
786 $pos =
new MySQLPrimaryPos(
"{$data['File']}/{$data['Position']}", $now );
807 return $this->srvCache->getWithSetCallback(
808 $this->srvCache->makeGlobalKey(
'mysql-server-id', $this->getServerName() ),
809 self::SERVER_ID_CACHE_TTL,
810 function () use ( $fname ) {
811 $flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
814 return $res->fetchObject()->id;
825 return $this->srvCache->getWithSetCallback(
826 $this->srvCache->makeGlobalKey(
'mysql-server-uuid', $this->getServerName() ),
827 self::SERVER_ID_CACHE_TTL,
828 function () use ( $fname ) {
829 $flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
830 $res = $this->
query(
"SHOW GLOBAL VARIABLES LIKE 'server_uuid'", $fname,
$flags );
831 $row =
$res->fetchObject();
833 return $row ? $row->Value :
null;
845 $flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
848 $res = $this->
query(
"SHOW GLOBAL VARIABLES LIKE 'gtid_%'", $fname,
$flags );
849 foreach (
$res as $row ) {
850 $map[$row->Variable_name] = $row->Value;
853 $res = $this->
query(
"SHOW SESSION VARIABLES LIKE 'gtid_%'", $fname,
$flags );
854 foreach (
$res as $row ) {
855 $map[$row->Variable_name] = $row->Value;
867 $flags = self::QUERY_SILENCE_ERRORS | self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
869 $row =
$res ?
$res->fetchRow() :
false;
871 return ( $row ?: null );
876 $flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
877 $res = $this->
query(
"SELECT @@GLOBAL.read_only AS Value", __METHOD__,
$flags );
878 $row =
$res->fetchObject();
880 return $row ? (bool)$row->Value :
false;
888 if ( $variant ===
'MariaDB' ) {
889 return '[{{int:version-db-mariadb-url}} MariaDB]';
892 return '[{{int:version-db-mysql-url}} MySQL]';
905 $parts = explode(
'-', $version, 2 );
907 $suffix = $parts[1] ??
'';
908 if ( strpos( $suffix,
'MariaDB' ) !==
false || strpos( $suffix,
'-maria-' ) !==
false ) {
914 return [ $vendor, $number ];
925 $cache->makeGlobalKey(
'mysql-server-version', $this->getServerName() ),
927 function () use ( $fname ) {
931 return $this->
selectField(
'',
'VERSION()',
'', $fname );
940 $sqlAssignments = [];
942 if ( isset( $options[
'connTimeout'] ) ) {
943 $encTimeout = (int)$options[
'connTimeout'];
944 $sqlAssignments[] =
"net_read_timeout=$encTimeout";
945 $sqlAssignments[] =
"net_write_timeout=$encTimeout";
948 if ( $sqlAssignments ) {
950 'SET ' . implode(
', ', $sqlAssignments ),
952 self::QUERY_CHANGE_TRX | self::QUERY_CHANGE_NONE
963 if ( preg_match(
'/^DELIMITER\s+(\S+)/i', $newLine, $m ) ) {
964 $this->delimiter = $m[1];
968 return parent::streamStatementEnd( $sql, $newLine );
973 $this->platform->lockIsFreeSQLText( $lockName ),
975 self::QUERY_CHANGE_LOCKS
977 $row =
$res->fetchObject();
979 return ( $row->unlocked == 1 );
982 public function doLock(
string $lockName,
string $method,
int $timeout ) {
984 $this->platform->lockSQLText( $lockName, $timeout ),
986 self::QUERY_CHANGE_LOCKS
988 $row =
$res->fetchObject();
990 return ( $row->acquired !==
null ) ? (float)$row->acquired :
null;
993 public function doUnlock(
string $lockName,
string $method ) {
995 $this->platform->unlockSQLText( $lockName ),
997 self::QUERY_CHANGE_LOCKS
999 $row =
$res->fetchObject();
1001 return ( $row->released == 1 );
1009 $flags = self::QUERY_CHANGE_LOCKS | self::QUERY_NO_RETRY;
1012 $releaseLockFields = [];
1013 foreach ( $this->sessionNamedLocks as $name => $info ) {
1014 $encName = $this->
addQuotes( $this->platform->makeLockName( $name ) );
1015 $releaseLockFields[] =
"RELEASE_LOCK($encName)";
1017 if ( $releaseLockFields ) {
1018 $sql =
'SELECT ' . implode(
',', $releaseLockFields );
1020 if ( $qs->res ===
false ) {
1030 if ( $value ===
'default' ) {
1031 if ( $this->defaultBigSelects ===
null ) {
1032 # Function hasn't been called before so it must already be set to the default
1037 } elseif ( $this->defaultBigSelects ===
null ) {
1038 $this->defaultBigSelects =
1039 (bool)$this->
selectField(
false,
'@@sql_big_selects',
'', __METHOD__ );
1043 "SET sql_big_selects=" . ( $value ?
'1' :
'0' ),
1045 self::QUERY_CHANGE_TRX
1057 list( $sqlColumns, $sqlTuples ) = $this->platform->makeInsertLists( $rows );
1058 $sqlColumnAssignments = $this->
makeList( $set, self::LIST_SET );
1064 "INSERT INTO $encTable " .
1065 "($sqlColumns) VALUES $sqlTuples " .
1066 "ON DUPLICATE KEY UPDATE $sqlColumnAssignments";
1068 $this->
query( $sql, $fname, self::QUERY_CHANGE_ROWS );
1071 protected function doReplace( $table, array $identityKey, array $rows, $fname ) {
1073 list( $sqlColumns, $sqlTuples ) = $this->platform->makeInsertLists( $rows );
1075 $sql =
"REPLACE INTO $encTable ($sqlColumns) VALUES $sqlTuples";
1077 $this->
query( $sql, $fname, self::QUERY_CHANGE_ROWS );
1105 ( $this->
lastErrno() == 1290 && strpos( $this->
lastError(),
'--read-only' ) !== false );
1112 return in_array( $errno, [ 2013, 2006, 2003, 1927, 1053 ],
true );
1119 return in_array( $errno, [ 3024, 2062, 1969, 1028 ],
true );
1125 if ( $errno === 1205 ) {
1128 "SELECT @@innodb_rollback_on_timeout AS Value",
1130 self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
1132 $row =
$res ?
$res->fetchObject() :
false;
1135 return ( $row && !$row->Value );
1140 [ 3024, 1969, 1022, 1062, 1216, 1217, 1137, 1146, 1051, 1054 ],
1153 $oldName, $newName, $temporary =
false, $fname = __METHOD__
1155 $tmp = $temporary ?
'TEMPORARY ' :
'';
1159 return $this->
query(
1160 "CREATE $tmp TABLE $newName (LIKE $oldName)",
1162 self::QUERY_PSEUDO_PERMANENT | self::QUERY_CHANGE_SCHEMA
1173 public function listTables( $prefix =
null, $fname = __METHOD__ ) {
1174 $result = $this->
query(
1177 self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
1182 foreach ( $result as $table ) {
1183 $vars = get_object_vars( $table );
1184 $table = array_pop( $vars );
1186 if ( !$prefix || strpos( $table, $prefix ) === 0 ) {
1187 $endArray[] = $table;
1203 public function listViews( $prefix =
null, $fname = __METHOD__ ) {
1205 $propertyName =
'Tables_in_' . $this->
getDBname();
1209 'SHOW FULL TABLES WHERE TABLE_TYPE = "VIEW"',
1211 self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
1215 foreach (
$res as $row ) {
1216 array_push( $allViews, $row->$propertyName );
1219 if ( $prefix ===
null || $prefix ===
'' ) {
1223 $filteredViews = [];
1224 foreach ( $allViews as $viewName ) {
1226 if ( strpos( $viewName, $prefix ) === 0 ) {
1227 array_push( $filteredViews, $viewName );
1231 return $filteredViews;
1242 public function isView( $name, $prefix =
null ) {
1243 return in_array( $name, $this->
listViews( $prefix, __METHOD__ ) );
1250 $fname = __METHOD__,
1254 $sql = parent::selectSQLText( $table, $vars, $conds, $fname, $options, $join_conds );
1257 $timeoutMsec = intval( $options[
'MAX_EXECUTION_TIME'] ?? 0 );
1258 if ( $timeoutMsec > 0 ) {
1260 if ( $vendor ===
'MariaDB' && version_compare( $number,
'10.1.2',
'>=' ) ) {
1261 $timeoutSec = $timeoutMsec / 1000;
1262 $sql =
"SET STATEMENT max_statement_time=$timeoutSec FOR $sql";
1263 } elseif ( $vendor ===
'MySQL' && version_compare( $number,
'5.7.0',
'>=' ) ) {
1264 $sql = preg_replace(
1266 "SELECT /*+ MAX_EXECUTION_TIME($timeoutMsec)*/",
// Register the string name 'DatabaseMysqlBase' as an alias of this class.
// NOTE(review): presumably kept so callers using the un-namespaced legacy name
// keep working — the namespace declaration is not visible here; confirm.
1286class_alias( DatabaseMysqlBase::class,
'DatabaseMysqlBase' );
getWithSetCallback( $key, $exptime, $callback, $flags=0)
Get an item, regenerating and setting it if not found.
Class to handle database/schema/prefix specifications for IDatabase.
foreach( $mmfl['setupFiles'] as $fileName) if($queue) if(empty( $mmfl['quiet'])) $s