Code Coverage
 
Classes and Traits
Functions and Methods
Lines
Total
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 6
CRAP
0.00% covered (danger)
0.00%
0 / 98
ElasticaWrite
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 6
272
0.00% covered (danger)
0.00%
0 / 98
 build
0.00% covered (danger)
0.00%
0 / 1
2
0.00% covered (danger)
0.00%
0 / 6
 serde
0.00% covered (danger)
0.00%
0 / 1
42
0.00% covered (danger)
0.00%
0 / 14
 __construct
0.00% covered (danger)
0.00%
0 / 1
2
0.00% covered (danger)
0.00%
0 / 7
 allowRetries
0.00% covered (danger)
0.00%
0 / 1
2
0.00% covered (danger)
0.00%
0 / 2
 doJob
0.00% covered (danger)
0.00%
0 / 1
30
0.00% covered (danger)
0.00%
0 / 42
 requeueError
0.00% covered (danger)
0.00%
0 / 1
6
0.00% covered (danger)
0.00%
0 / 27
<?php
namespace CirrusSearch\Job;
use CirrusSearch\Connection;
use CirrusSearch\DataSender;
use MediaWiki\Logger\LoggerFactory;
use MediaWiki\MediaWikiServices;
use Status;
use Wikimedia\Assert\Assert;
/**
 * Performs writes to elasticsearch indexes with requeuing and an
 * exponential backoff (if supported by jobqueue) when the index
 * writes fail.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 */
class ElasticaWrite extends CirrusGenericJob {
    private const MAX_ERROR_RETRY = 4;
    /**
     * @var array Map from method name to list of classes to
     *  handle serialization for each argument.
     */
    private static $SERDE = [
        'sendData' => [ null, ElasticaDocumentsJsonSerde::class ],
    ];
    /**
     * @param string $cluster
     * @param string $method
     * @param array $arguments
     * @param array $params
     * @return ElasticaWrite
     */
    public static function build( string $cluster, string $method, array $arguments, array $params = [] ) {
        return new self( [
            'method' => $method,
            'arguments' => self::serde( $method, $arguments ),
            'cluster' => $cluster,
        ] + $params );
    }
    private static function serde( $method, array $arguments, $serialize = true ) {
        if ( isset( self::$SERDE[$method] ) ) {
            foreach ( self::$SERDE[$method] as $i => $serde ) {
                if ( $serde !== null && array_key_exists( $i, $arguments ) ) {
                    $impl = new $serde();
                    if ( $serialize ) {
                        $arguments[$i] = $impl->serialize( $arguments[$i] );
                    } else {
                        $arguments[$i] = $impl->deserialize( $arguments[$i] );
                    }
                }
            }
        }
        return $arguments;
    }
    /**
     * Entry point for jobs received from the job queue. Creating new
     * jobs should be done via self::build.
     *
     * @param array $params
     */
    public function __construct( array $params ) {
        parent::__construct( $params + [
            'createdAt' => time(),
            'errorCount' => 0,
            'retryCount' => 0,
            'cluster' => null,
        ] );
    }
    /**
     * This job handles all its own retries internally. These jobs are so
     * numerous that if they were to start failing they would possibly
     * overflow the job queue and bring down redis in production.
     *
     * Basically we just can't let these jobs hang out in the abandoned
     * queue for a week like retries typically do. If these jobs get
     * failed they will log to CirrusSearchChangeFailed which is a signal
     * that some point in time around the failure needs to be reindexed
     * manually. See https://wikitech.wikimedia.org/wiki/Search for more
     * details.
     * @return bool
     */
    public function allowRetries() {
        return false;
    }
    /**
     * @return bool
     */
    protected function doJob() {
        // While we can only have a single connection per job, we still
        // use decideClusters() which includes a variety of safeguards.
        $connections = $this->decideClusters();
        if ( empty( $connections ) ) {
            // Chosen cluster no longer exists in configuration.
            return true;
        }
        Assert::precondition( count( $connections ) == 1,
            'per self::build() we must have a single connection' );
        $conn = reset( $connections );
        $arguments = self::serde( $this->params['method'], $this->params['arguments'], false );
        LoggerFactory::getInstance( 'CirrusSearch' )->debug(
            "Running {method} on cluster {cluster} {diff}s after insertion",
            [
                'method' => $this->params['method'],
                'arguments' => $arguments,
                'diff' => time() - $this->params['createdAt'],
                'cluster' => $conn->getClusterName(),
            ]
        );
        $retry = [];
        $error = [];
        $sender = new DataSender( $conn, $this->searchConfig );
        try {
            $status = $sender->{$this->params['method']}( ...$arguments );
        } catch ( \Exception $e ) {
            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
                "Exception thrown while running DataSender::{method} in cluster {cluster}: {errorMessage}",
                [
                    'method' => $this->params['method'],
                    'cluster' => $conn->getClusterName(),
                    'errorMessage' => $e->getMessage(),
                    'exception' => $e,
                ]
            );
            $status = Status::newFatal( 'cirrussearch-send-failure' );
        }
        $ok = true;
        if ( !$status->isOK() ) {
            $action = $this->requeueError( $conn ) ? "Requeued" : "Dropped";
            $this->setLastError( "ElasticaWrite job failed: ${action}" );
            $ok = false;
        }
        return $ok;
    }
    /**
     * Re-queue job that failed, or drop the job if it has failed
     * too many times
     *
     * @param Connection $conn
     * @return bool True when the job has been queued
     */
    private function requeueError( Connection $conn ) {
        if ( $this->params['errorCount'] >= self::MAX_ERROR_RETRY ) {
            LoggerFactory::getInstance( 'CirrusSearchChangeFailed' )->warning(
                "Dropping failing ElasticaWrite job for DataSender::{method} in cluster {cluster} after repeated failure",
                [
                    'method' => $this->params['method'],
                    'cluster' => $conn->getClusterName(),
                ]
            );
            return false;
        } else {
            $delay = $this->backoffDelay( $this->params['retryCount'] );
            $params = $this->params;
            $params['errorCount']++;
            unset( $params['jobReleaseTimestamp'] );
            $params += self::buildJobDelayOptions( self::class, $delay );
            $job = new self( $params );
            // Individual failures should have already logged specific errors,
            LoggerFactory::getInstance( 'CirrusSearch' )->info(
                "ElasticaWrite job reported failure on cluster {cluster}. Requeueing job with delay of {delay}.",
                [
                    'cluster' => $conn->getClusterName(),
                    'delay' => $delay
                ]
            );
            MediaWikiServices::getInstance()->getJobQueueGroup()->push( $job );
            return true;
        }
    }
}