27use InvalidArgumentException;
30use Wikimedia\AtEase\AtEase;
75 private const LAG_STALE_WARN_THRESHOLD = 0.100;
97 $this->lagDetectionMethod = $params[
'lagDetectionMethod'] ??
'Seconds_Behind_Master';
98 $this->lagDetectionOptions = $params[
'lagDetectionOptions'] ?? [];
99 $this->
useGTIDs = !empty( $params[
'useGTIDs' ] );
100 foreach ( [
'KeyPath',
'CertPath',
'CAFile',
'CAPath',
'Ciphers' ] as $name ) {
102 if ( isset( $params[$var] ) ) {
103 $this->$var = $params[$var];
106 $this->sqlMode = $params[
'sqlMode'] ??
null;
107 $this->utf8Mode = !empty( $params[
'utf8Mode'] );
108 $this->insertSelectIsSafe = isset( $params[
'insertSelectIsSafe'] )
109 ? (bool)$params[
'insertSelectIsSafe'] :
null;
111 parent::__construct( $params );
122 $this->
close( __METHOD__ );
124 if ( $schema !==
null ) {
134 $this->conn = $this->
mysqlConnect( $this->server, $dbName );
135 }
catch ( RuntimeException $e ) {
141 if ( !$this->conn ) {
147 $dbName && strlen( $dbName ) ? $dbName :
null,
152 $set = [
'group_concat_max_len = 262144' ];
154 if ( is_string( $this->sqlMode ) ) {
155 $set[] =
'sql_mode = ' . $this->
addQuotes( $this->sqlMode );
159 foreach ( $this->connectionVariables as $var => $val ) {
161 if ( !is_int( $val ) && !is_float( $val ) ) {
171 'SET ' . implode(
', ', $set ),
173 self::QUERY_IGNORE_DBO_TRX | self::QUERY_NO_RETRY | self::QUERY_CHANGE_TRX
176 }
catch ( RuntimeException $e ) {
185 __CLASS__ .
": domain '{$domain->getId()}' has a schema component"
191 if ( $database ===
null ) {
193 $this->currentDomain->getDatabase(),
201 if ( $database !== $this->
getDBname() ) {
203 list(
$res, $err, $errno ) =
204 $this->
executeQuery( $sql, __METHOD__, self::QUERY_IGNORE_DBO_TRX );
206 if (
$res ===
false ) {
213 $this->currentDomain = $domain;
233 AtEase::suppressWarnings();
235 AtEase::restoreWarnings();
255 AtEase::suppressWarnings();
257 AtEase::restoreWarnings();
264 if ( $errno == 2000 || $errno == 2013 ) {
267 'Error in fetchObject(): ' . htmlspecialchars( $this->
lastError() )
288 AtEase::suppressWarnings();
290 AtEase::restoreWarnings();
297 if ( $errno == 2000 || $errno == 2013 ) {
300 'Error in fetchRow(): ' . htmlspecialchars( $this->
lastError() )
321 if ( is_bool(
$res ) ) {
324 AtEase::suppressWarnings();
326 AtEase::restoreWarnings();
421 # Even if it's non-zero, it can still be invalid
422 AtEase::suppressWarnings();
427 AtEase::restoreWarnings();
432 $error .=
' (' . $this->server .
')';
449 return in_array( $errno, [ 2062, 3024 ] );
455 if ( $row->binlog_format ===
'ROW' ) {
459 if ( isset( $selectOptions[
'LIMIT'] ) ) {
470 in_array(
'NO_AUTO_COLUMNS', $insertOptions ) ||
471 (
int)$row->innodb_autoinc_lock_mode === 0
479 if ( $this->replicationInfoRow ===
null ) {
480 $this->replicationInfoRow = $this->
selectRow(
483 'innodb_autoinc_lock_mode' =>
'@@innodb_autoinc_lock_mode',
484 'binlog_format' =>
'@@binlog_format',
508 $fname = __METHOD__, $options = [], $join_conds = []
512 if ( is_string( $column ) && !in_array( $column, [
'*',
'1' ] ) ) {
513 $conds[] =
"$column IS NOT NULL";
516 $options[
'EXPLAIN'] =
true;
517 $res = $this->
select( $table, $var, $conds, $fname, $options, $join_conds );
518 if (
$res ===
false ) {
526 foreach (
$res as $plan ) {
527 $rows *= $plan->rows > 0 ? $plan->rows : 1;
537 $tableName =
"{$prefix}{$table}";
539 if ( isset( $this->sessionTempTables[$tableName] ) ) {
548 if ( $database !==
'' ) {
550 $sql =
"SHOW TABLES FROM $encDatabase LIKE '$encLike'";
552 $sql =
"SHOW TABLES LIKE '$encLike'";
558 self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
561 return $res->numRows() > 0;
571 "SELECT * FROM " . $this->
tableName( $table ) .
" LIMIT 1",
573 self::QUERY_SILENCE_ERRORS | self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
579 for ( $i = 0; $i < $n; $i++ ) {
581 if ( $field == $meta->name ) {
607 public function indexInfo( $table, $index, $fname = __METHOD__ ) {
612 'SHOW INDEX FROM ' . $this->
tableName( $table ),
614 self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
623 foreach (
$res as $row ) {
624 if ( $row->Key_name == $index ) {
629 return $result ?:
false;
655 return '`' . str_replace( [
"\0",
'`' ], [
'',
'``' ],
$s ) .
'`';
663 return strlen( $name ) && $name[0] ==
'`' && substr( $name, -1, 1 ) ==
'`';
688 self::QUERY_SILENCE_ERRORS | self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
690 $row =
$res ?
$res->fetchObject() :
false;
692 if ( $row && strval( $row->Seconds_Behind_Master ) !==
'' ) {
693 return intval( $row->Seconds_Behind_Master );
706 if ( $currentTrxInfo ) {
708 $staleness = microtime(
true ) - $currentTrxInfo[
'since'];
709 if ( $staleness > self::LAG_STALE_WARN_THRESHOLD ) {
712 $this->queryLogger->warning(
713 "Using cached lag value for {db_server} due to active transaction",
715 'method' => __METHOD__,
717 'exception' =>
new RuntimeException()
722 return $currentTrxInfo[
'lag'];
725 if ( isset( $options[
'conds'] ) ) {
731 if ( !$masterInfo ) {
732 $this->queryLogger->error(
733 "Unable to query master of {db_server} for server ID",
735 'method' => __METHOD__
742 $conds = [
'server_id' => intval( $masterInfo[
'serverId'] ) ];
746 list( $time, $nowUnix ) = $data;
747 if ( $time !==
null ) {
749 $dateTime =
new DateTime( $time,
new DateTimeZone(
'UTC' ) );
750 $timeUnix = (int)$dateTime->format(
'U' ) + $dateTime->format(
'u' ) / 1e6;
752 return max( $nowUnix - $timeUnix, 0.0 );
755 $this->queryLogger->error(
756 "Unable to find pt-heartbeat row for {db_server}",
758 'method' => __METHOD__
771 $this->topologyRootMaster ?? $this->
getServer()
775 return $cache->getWithSetCallback(
777 $cache::TTL_INDEFINITE,
778 function () use (
$cache, $key, $fname ) {
780 if ( !
$cache->lock( $key, 0, 10 ) ) {
784 $conn = $this->getLazyMasterHandle();
789 $flags = self::QUERY_SILENCE_ERRORS | self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
793 $row =
$res ?
$res->fetchObject() :
false;
794 $id = $row ? (int)$row->id : 0;
800 return $id ? [
'serverId' => $id,
'asOf' => time() ] :
false;
812 $nowUnix = microtime(
true );
813 $whereSQL = $this->
makeList( $conds, self::LIST_AND );
818 "SELECT ts FROM heartbeat.heartbeat WHERE $whereSQL ORDER BY ts DESC LIMIT 1",
820 self::QUERY_SILENCE_ERRORS | self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
822 $row =
$res ?
$res->fetchObject() :
false;
824 return [ $row ? $row->ts :
null, $nowUnix ];
832 return parent::getApproximateLagStatus();
835 $key = $this->srvCache->makeGlobalKey(
'mysql-lag', $this->
getServer() );
836 $approxLag = $this->srvCache->get( $key );
838 $approxLag = parent::getApproximateLagStatus();
839 $this->srvCache->set( $key, $approxLag, 1 );
847 throw new InvalidArgumentException(
"Position not an instance of MySQLMasterPos" );
850 if ( $this->topologyRole === self::ROLE_STATIC_CLONE ) {
851 $this->queryLogger->debug(
852 "Bypassed replication wait; database has a static dataset",
853 $this->
getLogContext( [
'method' => __METHOD__,
'raw_pos' => $pos ] )
857 } elseif ( $this->lastKnownReplicaPos && $this->lastKnownReplicaPos->hasReached( $pos ) ) {
858 $this->queryLogger->debug(
859 "Bypassed replication wait; replication known to have reached {raw_pos}",
860 $this->
getLogContext( [
'method' => __METHOD__,
'raw_pos' => $pos ] )
867 if ( $pos->getGTIDs() ) {
871 $this->queryLogger->error(
872 "Could not get replication position on replica DB to compare to {raw_pos}",
873 $this->
getLogContext( [
'method' => __METHOD__,
'raw_pos' => $pos ] )
879 $gtidsWait = $pos::getRelevantActiveGTIDs( $pos, $refPos );
881 $this->queryLogger->error(
882 "No active GTIDs in {raw_pos} share a domain with those in {current_pos}",
884 'method' => __METHOD__,
886 'current_pos' => $refPos
893 $gtidArg = $this->
addQuotes( implode(
',', $gtidsWait ) );
894 if ( strpos( $gtidArg,
':' ) !==
false ) {
896 $sql =
"SELECT WAIT_FOR_EXECUTED_GTID_SET($gtidArg, $timeout)";
899 $sql =
"SELECT MASTER_GTID_WAIT($gtidArg, $timeout)";
901 $waitPos = implode(
',', $gtidsWait );
904 $encFile = $this->
addQuotes( $pos->getLogFile() );
906 $encPos = intval( $pos->getLogPosition()[$pos::CORD_EVENT] );
907 $sql =
"SELECT MASTER_POS_WAIT($encFile, $encPos, $timeout)";
911 $start = microtime(
true );
912 $flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
915 $seconds = max( microtime(
true ) - $start, 0 );
918 $status = ( $row[0] !== null ) ? intval( $row[0] ) :
null;
919 if ( $status ===
null ) {
920 $this->replLogger->error(
921 "An error occurred while waiting for replication to reach {raw_pos}",
924 'wait_pos' => $waitPos,
926 'seconds_waited' => $seconds,
927 'exception' =>
new RuntimeException()
930 } elseif ( $status < 0 ) {
931 $this->replLogger->error(
932 "Timed out waiting for replication to reach {raw_pos}",
935 'wait_pos' => $waitPos,
936 'timeout' => $timeout,
938 'seconds_waited' => $seconds,
939 'exception' =>
new RuntimeException()
942 } elseif ( $status >= 0 ) {
943 $this->replLogger->debug(
944 "Replication has reached {raw_pos}",
947 'wait_pos' => $waitPos,
948 'seconds_waited' => $seconds,
952 $this->lastKnownReplicaPos = $pos;
964 $now = microtime(
true );
970 foreach ( [
'gtid_slave_pos',
'gtid_executed' ] as $name ) {
971 if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
978 if ( $data && strlen( $data[
'Relay_Master_Log_File'] ) ) {
980 "{$data['Relay_Master_Log_File']}/{$data['Exec_Master_Log_Pos']}",
994 $now = microtime(
true );
1001 foreach ( [
'gtid_binlog_pos',
'gtid_executed' ] as $name ) {
1002 if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
1009 $pos->setActiveOriginServerId( $this->
getServerId() );
1011 if ( isset( $data[
'gtid_domain_id'] ) ) {
1012 $pos->setActiveDomain( $data[
'gtid_domain_id'] );
1019 if ( $data && strlen( $data[
'File'] ) ) {
1020 $pos =
new MySQLMasterPos(
"{$data['File']}/{$data['Position']}", $now );
1032 $fname = __METHOD__;
1033 return $this->srvCache->getWithSetCallback(
1034 $this->srvCache->makeGlobalKey(
'mysql-server-id', $this->getServer() ),
1035 self::SERVER_ID_CACHE_TTL,
1036 function () use ( $fname ) {
1037 $flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
1049 $fname = __METHOD__;
1050 return $this->srvCache->getWithSetCallback(
1051 $this->srvCache->makeGlobalKey(
'mysql-server-uuid', $this->getServer() ),
1052 self::SERVER_ID_CACHE_TTL,
1053 function () use ( $fname ) {
1054 $flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
1055 $res = $this->
query(
"SHOW GLOBAL VARIABLES LIKE 'server_uuid'", $fname,
$flags );
1058 return $row ? $row->Value :
null;
1070 $flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
1073 $res = $this->
query(
"SHOW GLOBAL VARIABLES LIKE 'gtid_%'", $fname,
$flags );
1074 foreach (
$res as $row ) {
1075 $map[$row->Variable_name] = $row->Value;
1078 $res = $this->
query(
"SHOW SESSION VARIABLES LIKE 'gtid_%'", $fname,
$flags );
1079 foreach (
$res as $row ) {
1080 $map[$row->Variable_name] = $row->Value;
1092 $flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
1095 return $res->fetchRow() ?: [];
1100 $flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
1101 $res = $this->
query(
"SELECT @@GLOBAL.read_only AS Value", __METHOD__,
$flags );
1104 return $row ? (bool)$row->Value :
false;
1112 return "FORCE INDEX (" . $this->
indexName( $index ) .
")";
1120 return "IGNORE INDEX (" . $this->
indexName( $index ) .
")";
1131 if ( strpos( $version,
'MariaDB' ) !==
false || strpos( $version,
'-maria-' ) !==
false ) {
1132 return '[{{int:version-db-mariadb-url}} MariaDB]';
1138 return '[{{int:version-db-mysql-url}} MySQL]';
1146 $fname = __METHOD__;
1149 $cache->makeGlobalKey(
'mysql-server-version', $this->getServer() ),
1151 function () use ( $fname ) {
1155 return $this->
selectField(
'',
'VERSION()',
'', $fname );
1164 if ( isset( $options[
'connTimeout'] ) ) {
1165 $flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_TRX;
1166 $timeout = (int)$options[
'connTimeout'];
1167 $this->
query(
"SET net_read_timeout=$timeout", __METHOD__,
$flags );
1168 $this->
query(
"SET net_write_timeout=$timeout", __METHOD__,
$flags );
1178 if ( preg_match(
'/^DELIMITER\s+(\S+)/i', $newLine, $m ) ) {
1179 $this->delimiter = $m[1];
1183 return parent::streamStatementEnd( $sql, $newLine );
1195 if ( !parent::lockIsFree( $lockName, $method ) ) {
1201 $flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
1202 $res = $this->
query(
"SELECT IS_FREE_LOCK($encName) AS lockstatus", $method,
$flags );
1205 return ( $row->lockstatus == 1 );
1214 public function lock( $lockName, $method, $timeout = 5 ) {
1217 $flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
1218 $res = $this->
query(
"SELECT GET_LOCK($encName, $timeout) AS lockstatus", $method,
$flags );
1221 if ( $row->lockstatus == 1 ) {
1222 parent::lock( $lockName, $method, $timeout );
1226 $this->queryLogger->info( __METHOD__ .
" failed to acquire lock '{lockname}'",
1227 [
'lockname' => $lockName ] );
1239 public function unlock( $lockName, $method ) {
1242 $flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
1243 $res = $this->
query(
"SELECT RELEASE_LOCK($encName) as lockstatus", $method,
$flags );
1246 if ( $row->lockstatus == 1 ) {
1247 parent::unlock( $lockName, $method );
1251 $this->queryLogger->warning( __METHOD__ .
" failed to release lock '$lockName'\n" );
1259 return ( strlen( $lockName ) > 64 ) ? sha1( $lockName ) : $lockName;
1272 foreach ( $write as $table ) {
1273 $items[] = $this->
tableName( $table ) .
' WRITE';
1275 foreach ( $read as $table ) {
1276 $items[] = $this->
tableName( $table ) .
' READ';
1280 "LOCK TABLES " . implode(
',', $items ),
1282 self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_ROWS
1292 self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_ROWS
1302 if ( $value ===
'default' ) {
1303 if ( $this->defaultBigSelects ===
null ) {
1304 # Function hasn't been called before so it must already be set to the default
1309 } elseif ( $this->defaultBigSelects ===
null ) {
1310 $this->defaultBigSelects =
1311 (bool)$this->
selectField(
false,
'@@sql_big_selects',
'', __METHOD__ );
1315 "SET sql_big_selects=" . ( $value ?
'1' :
'0' ),
1317 self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_TRX
1332 $delTable, $joinTable, $delVar, $joinVar, $conds, $fname = __METHOD__
1338 $delTable = $this->
tableName( $delTable );
1339 $joinTable = $this->
tableName( $joinTable );
1340 $sql =
"DELETE $delTable FROM $delTable, $joinTable WHERE $delVar=$joinVar ";
1342 if ( $conds !=
'*' ) {
1343 $sql .=
' AND ' . $this->
makeList( $conds, self::LIST_AND );
1346 $this->
query( $sql, $fname, self::QUERY_CHANGE_ROWS );
1349 protected function doUpsert( $table, array $rows, array $uniqueKeys, array $set, $fname ) {
1352 $sqlColumnAssignments = $this->
makeList( $set, self::LIST_SET );
1355 "INSERT INTO $encTable ($sqlColumns) VALUES $sqlTuples " .
1356 "ON DUPLICATE KEY UPDATE $sqlColumnAssignments";
1358 $this->
query( $sql, $fname, self::QUERY_CHANGE_ROWS );
1361 protected function doReplace( $table, array $uniqueKeys, array $rows, $fname ) {
1365 $sql =
"REPLACE INTO $encTable ($sqlColumns) VALUES $sqlTuples";
1367 $this->
query( $sql, $fname, self::QUERY_CHANGE_ROWS );
1378 return (
int)$vars[
'Uptime'];
1406 ( $this->
lastErrno() == 1290 && strpos( $this->
lastError(),
'--read-only' ) !== false );
1410 return $errno == 2013 || $errno == 2006;
1416 if ( $errno === 1205 ) {
1420 [
'innodb_rollback_on_timeout' =>
'@@innodb_rollback_on_timeout' ],
1426 return $row->innodb_rollback_on_timeout ? false :
true;
1430 return in_array( $errno, [ 1022, 1062, 1216, 1217, 1137, 1146, 1051, 1054 ],
true );
1441 $oldName, $newName, $temporary =
false, $fname = __METHOD__
1443 $tmp = $temporary ?
'TEMPORARY ' :
'';
1447 return $this->
query(
1448 "CREATE $tmp TABLE $newName (LIKE $oldName)",
1450 self::QUERY_PSEUDO_PERMANENT | self::QUERY_CHANGE_SCHEMA
1461 public function listTables( $prefix =
null, $fname = __METHOD__ ) {
1462 $result = $this->
query(
1465 self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
1470 foreach ( $result as $table ) {
1471 $vars = get_object_vars( $table );
1472 $table = array_pop( $vars );
1474 if ( !$prefix || strpos( $table, $prefix ) === 0 ) {
1475 $endArray[] = $table;
1490 "SHOW STATUS LIKE '{$which}'",
1492 self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
1496 foreach (
$res as $row ) {
1497 $status[$row->Variable_name] = $row->Value;
1512 public function listViews( $prefix =
null, $fname = __METHOD__ ) {
1514 $propertyName =
'Tables_in_' . $this->
getDBname();
1518 'SHOW FULL TABLES WHERE TABLE_TYPE = "VIEW"',
1520 self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
1524 foreach (
$res as $row ) {
1525 array_push( $allViews, $row->$propertyName );
1528 if ( $prefix ===
null || $prefix ===
'' ) {
1532 $filteredViews = [];
1533 foreach ( $allViews as $viewName ) {
1535 if ( strpos( $viewName, $prefix ) === 0 ) {
1536 array_push( $filteredViews, $viewName );
1540 return $filteredViews;
1551 public function isView( $name, $prefix =
null ) {
1552 return in_array( $name, $this->
listViews( $prefix, __METHOD__ ) );
1556 return parent::isTransactableQuery( $sql ) &&
1557 !preg_match(
'/^SELECT\s+(GET|RELEASE|IS_FREE)_LOCK\(/', $sql );
1561 return "CAST( $field AS BINARY )";
1569 return 'CAST( ' . $field .
' AS SIGNED )';
1583class_alias( DatabaseMysqlBase::class,
'DatabaseMysqlBase' );
getWithSetCallback( $key, $exptime, $callback, $flags=0)
Get an item with the given key, regenerating and setting it if not found.
makeGlobalKey( $class,... $components)
Make a global cache key.
Class to handle database/schema/prefix specifications for IDatabase.