Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
9.35% covered (danger)
9.35%
10 / 107
0.00% covered (danger)
0.00%
0 / 19
CRAP
0.00% covered (danger)
0.00%
0 / 1
PostgresPlatform
9.35% covered (danger)
9.35%
10 / 107
0.00% covered (danger)
0.00%
0 / 19
2066.52
0.00% covered (danger)
0.00%
0 / 1
 limitResult
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
 buildConcat
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 buildGroupConcat
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 timestamp
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 buildStringCast
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 implicitOrderby
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getCoreSchema
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 setCoreSchema
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 selectSQLText
0.00% covered (danger)
0.00%
0 / 24
0.00% covered (danger)
0.00%
0 / 1
182
 makeSelectOptions
0.00% covered (danger)
0.00%
0 / 18
0.00% covered (danger)
0.00%
0 / 1
56
 getDatabaseAndTableIdentifier
88.89% covered (warning)
88.89%
8 / 9
0.00% covered (danger)
0.00%
0 / 1
5.03
 relationSchemaQualifier
66.67% covered (warning)
66.67%
2 / 3
0.00% covered (danger)
0.00%
0 / 1
2.15
 makeInsertLists
0.00% covered (danger)
0.00%
0 / 30
0.00% covered (danger)
0.00%
0 / 1
90
 makeInsertNonConflictingVerbAndOptions
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 makeUpdateOptionsArray
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 isTransactableQuery
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 lockSQLText
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 unlockSQLText
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 bigintFromLockName
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2/**
3 * @license GPL-2.0-or-later
4 * @file
5 */
6namespace Wikimedia\Rdbms\Platform;
7
8use Wikimedia\Rdbms\DBLanguageError;
9use Wikimedia\Rdbms\Query;
10use Wikimedia\Timestamp\ConvertibleTimestamp;
11use Wikimedia\Timestamp\TimestampFormat as TS;
12
13/**
14 * @since 1.39
15 * @see ISQLPlatform
16 */
17class PostgresPlatform extends SQLPlatform {
18    /** @var string */
19    private $coreSchema;
20
21    /** @inheritDoc */
22    public function limitResult( $sql, $limit, $offset = false ) {
23        return "$sql LIMIT $limit " . ( is_numeric( $offset ) ? " OFFSET {$offset} " : '' );
24    }
25
26    /** @inheritDoc */
27    public function buildConcat( $stringList ) {
28        return implode( ' || ', $stringList );
29    }
30
31    /** @inheritDoc */
32    public function buildGroupConcat( $field, $delim ): string {
33        return "array_to_string(array_agg($field)," . $this->quoter->addQuotes( $delim ) . ')';
34    }
35
36    /** @inheritDoc */
37    public function timestamp( $ts = 0 ) {
38        $ct = new ConvertibleTimestamp( $ts );
39
40        return $ct->getTimestamp( TS::POSTGRES );
41    }
42
43    /** @inheritDoc */
44    public function buildStringCast( $field ) {
45        return $field . '::text';
46    }
47
48    /** @inheritDoc */
49    public function implicitOrderby() {
50        return false;
51    }
52
53    public function getCoreSchema(): string {
54        return $this->coreSchema;
55    }
56
57    public function setCoreSchema( string $coreSchema ): void {
58        $this->coreSchema = $coreSchema;
59    }
60
61    /** @inheritDoc */
62    public function selectSQLText(
63        $tables, $vars, $conds = '', $fname = __METHOD__, $options = [], $join_conds = []
64    ) {
65        if ( is_string( $options ) ) {
66            $options = [ $options ];
67        }
68
69        // Change the FOR UPDATE option as necessary based on the join conditions. Then pass
70        // to the parent function to get the actual SQL text.
71        // In Postgres when using FOR UPDATE, only the main table and tables that are inner joined
72        // can be locked. That means tables in an outer join cannot be FOR UPDATE locked. Trying to
73        // do so causes a DB error. This wrapper checks which tables can be locked and adjusts it
74        // accordingly.
75        // MySQL uses "ORDER BY NULL" as an optimization hint, but that is illegal in PostgreSQL.
76        if ( is_array( $options ) ) {
77            $forUpdateKey = array_search( 'FOR UPDATE', $options, true );
78            if ( $forUpdateKey !== false && $join_conds ) {
79                unset( $options[$forUpdateKey] );
80                $options['FOR UPDATE'] = [];
81
82                $toCheck = $tables;
83                reset( $toCheck );
84                while ( $toCheck ) {
85                    $alias = key( $toCheck );
86                    $table = $toCheck[$alias];
87                    unset( $toCheck[$alias] );
88
89                    if ( !is_string( $alias ) ) {
90                        // No alias? Set it equal to the table name
91                        $alias = $table;
92                    }
93
94                    if ( !isset( $join_conds[$alias] ) ||
95                        !preg_match( '/^(?:LEFT|RIGHT|FULL)(?: OUTER)? JOIN$/i', $join_conds[$alias][0] )
96                    ) {
97                        if ( is_array( $table ) ) {
98                            // It's a parenthesized group, process all the tables inside the group.
99                            $toCheck = array_merge( $toCheck, $table );
100                        } else {
101                            // If an alias is declared, then any FOR UPDATE FOR must use it
102                            $options['FOR UPDATE'][] = $alias;
103                        }
104                    }
105                }
106            }
107
108            if (
109                isset( $options['ORDER BY'] ) &&
110                ( $options['ORDER BY'] == 'NULL' || $options['ORDER BY'] == [ 'NULL' ] )
111            ) {
112                unset( $options['ORDER BY'] );
113            }
114        }
115
116        return parent::selectSQLText( $tables, $vars, $conds, $fname, $options, $join_conds );
117    }
118
119    /** @inheritDoc */
120    protected function makeSelectOptions( array $options ) {
121        $preLimitTail = $postLimitTail = '';
122        $startOpts = '';
123
124        $noKeyOptions = [];
125        foreach ( $options as $key => $option ) {
126            if ( is_numeric( $key ) ) {
127                $noKeyOptions[$option] = true;
128            }
129        }
130
131        $preLimitTail .= $this->makeGroupByWithHaving( $options );
132
133        $preLimitTail .= $this->makeOrderBy( $options );
134
135        if ( isset( $options['FOR UPDATE'] ) ) {
136            $postLimitTail .= ' FOR UPDATE OF ' . implode(
137                ', ',
138                array_map( $this->addIdentifierQuotes( ... ), $options['FOR UPDATE'] )
139            );
140        } elseif ( isset( $noKeyOptions['FOR UPDATE'] ) ) {
141            $postLimitTail .= ' FOR UPDATE';
142        }
143
144        if ( isset( $noKeyOptions['DISTINCT'] ) || isset( $noKeyOptions['DISTINCTROW'] ) ) {
145            $startOpts .= 'DISTINCT';
146        }
147
148        return [ $startOpts, $preLimitTail, $postLimitTail ];
149    }
150
151    /** @inheritDoc */
152    public function getDatabaseAndTableIdentifier( string $table ) {
153        $components = $this->qualifiedTableComponents( $table );
154        switch ( count( $components ) ) {
155            case 1:
156                return [ $this->currentDomain->getDatabase(), $components[0] ];
157            case 2:
158                return [ $this->currentDomain->getDatabase(), $components[1] ];
159            case 3:
160                return [ $components[0], $components[2] ];
161            default:
162                throw new DBLanguageError( 'Too many table components' );
163        }
164    }
165
166    /** @inheritDoc */
167    protected function relationSchemaQualifier() {
168        if ( $this->coreSchema === $this->currentDomain->getSchema() ) {
169            // The schema to be used is now in the search path; no need for explicit qualification
170            return '';
171        }
172
173        return parent::relationSchemaQualifier();
174    }
175
176    /** @inheritDoc */
177    public function makeInsertLists( array $rows, $aliasPrefix = '', array $typeByColumn = [] ) {
178        $firstRow = $rows[0];
179        if ( !is_array( $firstRow ) || !$firstRow ) {
180            throw new DBLanguageError( 'Got an empty row list or empty row' );
181        }
182        // List of columns that define the value tuple ordering
183        $tupleColumns = array_keys( $firstRow );
184
185        $valueTuples = [];
186        foreach ( $rows as $row ) {
187            $rowColumns = array_keys( $row );
188            // VALUES(...) requires a uniform correspondence of (column => value)
189            if ( $rowColumns !== $tupleColumns ) {
190                throw new DBLanguageError(
191                    'All rows must specify the same columns in multi-row inserts. Found a row with (' .
192                    implode( ', ', $rowColumns ) . ') ' .
193                    'instead of expected (' . implode( ', ', $tupleColumns ) . ') as in the first row'
194                );
195            }
196            // Make the value tuple that defines this row
197            $typedRowValues = [];
198            foreach ( $row as $column => $value ) {
199                $type = $typeByColumn[$column] ?? null;
200                if ( $value === null ) {
201                    $typedRowValues[] = 'NULL';
202                } elseif ( $type !== null ) {
203                    $typedRowValues[] = $this->quoter->addQuotes( $value ) . '::' . $type;
204                } else {
205                    $typedRowValues[] = $this->quoter->addQuotes( $value );
206                }
207            }
208            $valueTuples[] = '(' . implode( ',', $typedRowValues ) . ')';
209        }
210
211        $magicAliasFields = [];
212        foreach ( $tupleColumns as $column ) {
213            $magicAliasFields[] = $aliasPrefix . $column;
214        }
215
216        return [
217            $this->makeList( $tupleColumns, self::LIST_NAMES ),
218            implode( ',', $valueTuples ),
219            $this->makeList( $magicAliasFields, self::LIST_NAMES )
220        ];
221    }
222
223    /** @inheritDoc */
224    protected function makeInsertNonConflictingVerbAndOptions() {
225        return [ 'INSERT INTO', 'ON CONFLICT DO NOTHING' ];
226    }
227
228    /** @inheritDoc */
229    protected function makeUpdateOptionsArray( $options ) {
230        $options = $this->normalizeOptions( $options );
231        // PostgreSQL doesn't support anything like "ignore" for UPDATE.
232        $options = array_diff( $options, [ 'IGNORE' ] );
233
234        return parent::makeUpdateOptionsArray( $options );
235    }
236
237    /** @inheritDoc */
238    public function isTransactableQuery( Query $sql ) {
239        return parent::isTransactableQuery( $sql ) &&
240            !preg_match( '/^SELECT\s+pg_(try_|)advisory_\w+\(/', $sql->getSQL() );
241    }
242
243    /** @inheritDoc */
244    public function lockSQLText( $lockName, $timeout ) {
245        // http://www.postgresql.org/docs/9.2/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
246        $key = $this->quoter->addQuotes( $this->bigintFromLockName( $lockName ) );
247        return "SELECT (CASE WHEN pg_try_advisory_lock($key" .
248            "THEN EXTRACT(epoch from clock_timestamp()) " .
249            "ELSE NULL " .
250            "END) AS acquired";
251    }
252
253    /** @inheritDoc */
254    public function unlockSQLText( $lockName ) {
255        // http://www.postgresql.org/docs/9.2/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
256        $key = $this->quoter->addQuotes( $this->bigintFromLockName( $lockName ) );
257        return "SELECT pg_advisory_unlock($key) AS released";
258    }
259
260    /**
261     * @param string $lockName
262     * @return string Integer
263     */
264    private function bigintFromLockName( $lockName ) {
265        return \Wikimedia\base_convert( substr( sha1( $lockName ), 0, 15 ), 16, 10 );
266    }
267}