25use Wikimedia\AtEase\AtEase;
66 private $sessionLastAutoRowId;
88 foreach ( [
'KeyPath',
'CertPath',
'CAFile',
'CAPath',
'Ciphers' ] as $name ) {
94 $this->utf8Mode = !empty(
$params[
'utf8Mode'] );
106 $params[
'lagDetectionMethod'] ??
'Seconds_Behind_Master',
107 $params[
'lagDetectionOptions'] ?? [],
119 protected function open( $server, $user, $password, $db, $schema, $tablePrefix ) {
120 $this->
close( __METHOD__ );
122 if ( $schema !==
null ) {
128 $this->conn = $this->mysqlConnect( $server, $user, $password, $db );
129 }
catch ( RuntimeException $e ) {
135 if ( !$this->conn ) {
141 ( $db !==
'' ) ? $db :
null,
148 if ( !$this->
flagsHolder->getFlag( self::DBO_GAUGE ) ) {
150 $set[] =
'group_concat_max_len = 262144';
155 if ( !is_int( $val ) && !is_float( $val ) ) {
158 $set[] = $this->
platform->addIdentifierQuotes( $var ) .
' = ' . $val;
163 $sql =
'SET ' . implode(
', ', $set );
169 if ( $qs->res ===
false ) {
173 }
catch ( RuntimeException $e ) {
182 __CLASS__ .
": domain '{$domain->getId()}' has a schema component"
188 if ( $database ===
null ) {
199 if ( $database !== $this->
getDBname() ) {
201 $query =
new Query( $sql, self::QUERY_CHANGE_TRX,
'USE' );
202 $qs = $this->
executeQuery( $query, __METHOD__, self::QUERY_CHANGE_TRX );
203 if ( $qs->res ===
false ) {
211 $this->
platform->setCurrentDomain( $domain );
222 $error = $this->mysqlError( $this->conn );
224 $error = $this->mysqlError();
227 $error = $this->mysqlError() ?: $this->lastConnectError;
234 $row = $this->replicationReporter->getReplicationSafetyInfo( $this, $fname );
236 if ( $row->binlog_format ===
'ROW' ) {
240 if ( isset( $selectOptions[
'LIMIT'] ) ) {
251 in_array(
'NO_AUTO_COLUMNS', $insertOptions ) ||
252 (
int)$row->innodb_autoinc_lock_mode === 0
257 if ( $this->conn && $this->conn->warning_count ) {
259 $warnings = $this->conn->get_warnings();
260 $done = $warnings ===
false;
262 if ( in_array( $warnings->errno, [
274 'Insert returned unacceptable warning: ' . $warnings->message,
280 $done = !$warnings->next();
293 $conds = $this->
platform->normalizeConditions( $conds, $fname );
294 $column = $this->
platform->extractSingleFieldFromList( $var );
295 if ( is_string( $column ) && !in_array( $column, [
'*',
'1' ] ) ) {
296 $conds[] =
"$column IS NOT NULL";
299 $options[
'EXPLAIN'] =
true;
300 $res = $this->
select( $tables, $var, $conds, $fname, $options, $join_conds );
301 if ( $res ===
false ) {
304 if ( !$res->numRows() ) {
309 foreach ( $res as $plan ) {
310 $rows *= $plan->rows > 0 ? $plan->rows : 1;
317 [ $db, $pt ] = $this->platform->getDatabaseAndTableIdentifier( $table );
318 if ( isset( $this->sessionTempTables[$db][$pt] ) ) {
322 return (
bool)$this->newSelectQueryBuilder()
324 ->from(
'information_schema.tables' )
326 'table_schema' => $db,
340 "SELECT * FROM " . $this->tableName( $table ) .
" LIMIT 1",
341 self::QUERY_SILENCE_ERRORS | self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE,
344 $res = $this->query( $query, __METHOD__ );
349 '@phan-var MysqliResultWrapper $res';
350 return $res->getInternalFieldInfo( $field );
353 public function indexInfo( $table, $index, $fname = __METHOD__ ) {
355 $index = $this->platform->indexName( $index );
357 'SHOW INDEX FROM ' . $this->tableName( $table ),
358 self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE,
361 $res = $this->query( $query, $fname );
363 foreach ( $res as $row ) {
364 if ( $row->Key_name === $index ) {
365 return [
'unique' => !$row->Non_unique ];
377 return $this->mysqlRealEscapeString( $s );
382 $flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
383 $query =
new Query(
"SELECT @@GLOBAL.read_only AS Value", $flags,
'SELECT' );
384 $res = $this->query( $query, __METHOD__ );
385 $row = $res->fetchObject();
387 return $row && (bool)$row->Value;
394 [ $variant ] = $this->getMySqlServerVariant();
395 if ( $variant ===
'MariaDB' ) {
396 return '[{{int:version-db-mariadb-url}} MariaDB]';
399 return '[{{int:version-db-mysql-url}} MySQL]';
405 private function getMySqlServerVariant() {
406 $version = $this->getServerVersion();
412 $parts = explode(
'-', $version, 2 );
414 $suffix = $parts[1] ??
'';
415 if ( strpos( $suffix,
'MariaDB' ) !==
false || strpos( $suffix,
'-maria-' ) !==
false ) {
421 return [ $vendor, $number ];
430 $version = $this->conn->server_info;
432 str_starts_with( $version,
'5.5.5-' ) &&
433 ( str_contains( $version,
'MariaDB' ) || str_contains( $version,
'-maria-' ) )
435 $version = substr( $version, strlen(
'5.5.5-' ) );
441 $sqlAssignments = [];
443 if ( isset( $options[
'connTimeout'] ) ) {
444 $encTimeout = (int)$options[
'connTimeout'];
445 $sqlAssignments[] =
"net_read_timeout=$encTimeout";
446 $sqlAssignments[] =
"net_write_timeout=$encTimeout";
448 if ( isset( $options[
'groupConcatMaxLen'] ) ) {
449 $maxLength = (int)$options[
'groupConcatMaxLen'];
450 $sqlAssignments[] =
"group_concat_max_len=$maxLength";
453 if ( $sqlAssignments ) {
455 'SET ' . implode(
', ', $sqlAssignments ),
456 self::QUERY_CHANGE_TRX | self::QUERY_CHANGE_NONE,
459 $this->query( $query, __METHOD__ );
469 if ( preg_match(
'/^DELIMITER\s+(\S+)/i', $newLine, $m ) ) {
470 $this->delimiter = $m[1];
474 return parent::streamStatementEnd( $sql, $newLine );
478 $query =
new Query( $this->platform->lockIsFreeSQLText( $lockName ), self::QUERY_CHANGE_LOCKS,
'SELECT' );
479 $res = $this->query( $query, $method );
480 $row = $res->fetchObject();
482 return ( $row->unlocked == 1 );
485 public function doLock(
string $lockName,
string $method,
int $timeout ) {
486 $query =
new Query( $this->platform->lockSQLText( $lockName, $timeout ), self::QUERY_CHANGE_LOCKS,
'SELECT' );
487 $res = $this->query( $query, $method );
488 $row = $res->fetchObject();
490 return ( $row->acquired !==
null ) ? (float)$row->acquired :
null;
493 public function doUnlock(
string $lockName,
string $method ) {
494 $query =
new Query( $this->platform->unlockSQLText( $lockName ), self::QUERY_CHANGE_LOCKS,
'SELECT' );
495 $res = $this->query( $query, $method );
496 $row = $res->fetchObject();
498 return ( $row->released == 1 );
504 $releaseLockFields = [];
505 foreach ( $this->sessionNamedLocks as $name => $info ) {
506 $encName = $this->addQuotes( $this->platform->makeLockName( $name ) );
507 $releaseLockFields[] =
"RELEASE_LOCK($encName)";
509 if ( $releaseLockFields ) {
510 $sql =
'SELECT ' . implode(
',', $releaseLockFields );
511 $flags = self::QUERY_CHANGE_LOCKS | self::QUERY_NO_RETRY;
512 $query =
new Query( $sql, $flags,
'SELECT' );
513 $qs = $this->executeQuery( $query, __METHOD__, $flags );
514 if ( $qs->res ===
false ) {
515 $this->reportQueryError( $qs->message, $qs->code, $sql, $fname,
true );
520 public function upsert( $table, array $rows, $uniqueKeys, array $set, $fname = __METHOD__ ) {
521 $identityKey = $this->platform->normalizeUpsertParams( $uniqueKeys, $rows );
525 $this->platform->assertValidUpsertSetArray( $set, $identityKey, $rows );
527 $encTable = $this->tableName( $table );
528 [ $sqlColumns, $sqlTuples ] = $this->platform->makeInsertLists( $rows );
529 $sqlColumnAssignments = $this->makeList( $set, self::LIST_SET );
535 "INSERT INTO $encTable " .
536 "($sqlColumns) VALUES $sqlTuples " .
537 "ON DUPLICATE KEY UPDATE $sqlColumnAssignments";
538 $query =
new Query( $sql, self::QUERY_CHANGE_ROWS,
'INSERT', $table );
539 $this->query( $query, $fname );
541 $this->lastQueryAffectedRows = min( $this->lastQueryAffectedRows, count( $rows ) );
544 public function replace( $table, $uniqueKeys, $rows, $fname = __METHOD__ ) {
545 $this->platform->normalizeUpsertParams( $uniqueKeys, $rows );
549 $encTable = $this->tableName( $table );
550 [ $sqlColumns, $sqlTuples ] = $this->platform->makeInsertLists( $rows );
552 $sql =
"REPLACE INTO $encTable ($sqlColumns) VALUES $sqlTuples";
555 $query =
new Query( $sql, self::QUERY_CHANGE_ROWS,
'REPLACE', $table );
556 $this->query( $query, $fname );
558 $this->lastQueryAffectedRows = min( $this->lastQueryAffectedRows, count( $rows ) );
565 return in_array( $errno, [ 2013, 2006, 2003, 1927, 1053 ],
true );
572 return in_array( $errno, [ 3024, 2062, 1969, 1028 ],
true );
580 [ 3024, 1969, 1022, 1062, 1216, 1217, 1137, 1146, 1051, 1054 ],
593 $oldName, $newName, $temporary =
false, $fname = __METHOD__
595 $tmp = $temporary ?
'TEMPORARY ' :
'';
596 $newNameQuoted = $this->addIdentifierQuotes( $newName );
597 $oldNameQuoted = $this->addIdentifierQuotes( $oldName );
600 "CREATE $tmp TABLE $newNameQuoted (LIKE $oldNameQuoted)",
601 self::QUERY_PSEUDO_PERMANENT | self::QUERY_CHANGE_SCHEMA,
602 $temporary ?
'CREATE TEMPORARY' :
'CREATE',
606 return $this->query( $query, $fname );
616 public function listTables( $prefix =
null, $fname = __METHOD__ ) {
617 $qb = $this->newSelectQueryBuilder()
618 ->select(
'table_name' )
619 ->from(
'information_schema.tables' )
621 'table_schema' => $this->currentDomain->getDatabase(),
622 'table_type' =>
'BASE TABLE'
625 if ( $prefix !==
null && $prefix !==
'' ) {
626 $qb->andWhere( $this->expr(
630 return $qb->fetchFieldValues();
641 $sql = parent::selectSQLText( $tables, $vars, $conds, $fname, $options, $join_conds );
644 $timeoutMsec = intval( $options[
'MAX_EXECUTION_TIME'] ?? 0 );
645 if ( $timeoutMsec > 0 ) {
646 [ $vendor, $number ] = $this->getMySqlServerVariant();
647 if ( $vendor ===
'MariaDB' && version_compare( $number,
'10.1.2',
'>=' ) ) {
648 $timeoutSec = $timeoutMsec / 1000;
649 $sql =
"SET STATEMENT max_statement_time=$timeoutSec FOR $sql";
650 } elseif ( $vendor ===
'MySQL' && version_compare( $number,
'5.7.0',
'>=' ) ) {
653 "SELECT /*+ MAX_EXECUTION_TIME($timeoutMsec)*/",
663 $conn = $this->getBindingHandle();
666 AtEase::suppressWarnings();
667 $res = $conn->query( $sql );
668 AtEase::restoreWarnings();
670 $insertId = (int)$conn->insert_id;
671 $this->lastQueryInsertId = $insertId;
672 $this->sessionLastAutoRowId = $insertId ?: $this->sessionLastAutoRowId;
676 $conn->affected_rows,
690 private function mysqlConnect( $server, $user, $password, $db ) {
691 if ( !function_exists(
'mysqli_init' ) ) {
692 throw $this->newExceptionAfterConnectError(
693 "MySQLi functions missing, have you compiled PHP with the --with-mysqli option?"
698 mysqli_report( MYSQLI_REPORT_OFF );
710 $hostAndPort = IPUtils::splitHostAndPort( $server );
711 if ( $hostAndPort ) {
712 $realServer = $hostAndPort[0];
713 if ( $hostAndPort[1] ) {
714 $port = $hostAndPort[1];
716 } elseif ( substr_count( $server,
':/' ) == 1 ) {
719 [ $realServer, $socket ] = explode(
':', $server, 2 );
721 $realServer = $server;
724 $mysqli = mysqli_init();
728 $flags = MYSQLI_CLIENT_FOUND_ROWS;
730 $flags |= MYSQLI_CLIENT_SSL;
739 if ( $this->getFlag( self::DBO_COMPRESS ) ) {
740 $flags |= MYSQLI_CLIENT_COMPRESS;
742 if ( $this->getFlag( self::DBO_PERSISTENT ) ) {
743 $realServer =
'p:' . $realServer;
746 if ( $this->utf8Mode ) {
749 $mysqli->options( MYSQLI_SET_CHARSET_NAME,
'utf8' );
751 $mysqli->options( MYSQLI_SET_CHARSET_NAME,
'binary' );
754 $mysqli->options( MYSQLI_OPT_CONNECT_TIMEOUT, $this->connectTimeout ?: 3 );
755 if ( $this->receiveTimeout ) {
756 $mysqli->options( MYSQLI_OPT_READ_TIMEOUT, $this->receiveTimeout );
760 $ok = $mysqli->real_connect( $realServer, $user, $password, $db, $port, $socket, $flags );
762 return $ok ? $mysqli :
null;
766 return ( $this->conn instanceof mysqli ) ? mysqli_close( $this->conn ) :
true;
770 return $this->sessionLastAutoRowId;
775 $this->sessionLastAutoRowId = 0;
779 if ( $this->lastEmulatedInsertId ===
null ) {
780 $conn = $this->getBindingHandle();
782 $this->lastEmulatedInsertId = (int)$conn->insert_id;
785 return $this->lastEmulatedInsertId;
792 if ( $this->conn instanceof mysqli ) {
793 return $this->conn->errno;
795 return mysqli_connect_errno();
803 private function mysqlError( $conn =
null ) {
804 if ( $conn ===
null ) {
805 return (
string)mysqli_connect_error();
811 private function mysqlRealEscapeString( $s ) {
812 $conn = $this->getBindingHandle();
814 return $conn->real_escape_string( (
string)$s );
array $params
The job parameters.
Class to handle database/schema/prefix specifications for IDatabase.