Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
| Total | |
0.00% |
0 / 48 |
|
0.00% |
0 / 5 |
CRAP | |
0.00% |
0 / 1 |
| JobTraits | |
0.00% |
0 / 48 |
|
0.00% |
0 / 5 |
272 | |
0.00% |
0 / 1 |
| getParams | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
| getSearchConfig | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
| getType | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
| doJob | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
| backoffDelay | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
6 | |||
| decideClusters | |
0.00% |
0 / 33 |
|
0.00% |
0 / 1 |
56 | |||
| run | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
12 | |||
| buildJobDelayOptions | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
12 | |||
| buildJobName | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| 1 | <?php |
| 2 | |
| 3 | namespace CirrusSearch\Job; |
| 4 | |
| 5 | use CirrusSearch\ClusterSettings; |
| 6 | use CirrusSearch\Connection; |
| 7 | use CirrusSearch\ExternalIndex; |
| 8 | use CirrusSearch\HashSearchConfig; |
| 9 | use CirrusSearch\SearchConfig; |
| 10 | use MediaWiki\JobQueue\JobQueueGroup; |
| 11 | use MediaWiki\Logger\LoggerFactory; |
| 12 | use MediaWiki\MainConfigNames; |
| 13 | |
/**
 * Traits for CirrusSearch Jobs.
 *
 * Provides the shared plumbing for jobs: retry backoff computation,
 * selection of the clusters a job should write to, the common run() guard
 * that honours the "updates disabled" config flags, and helpers to build
 * job names and delayed-job options.
 *
 * Users of the trait must implement getParams(), getSearchConfig(),
 * getType() and doJob().
 */
trait JobTraits {
	/**
	 * @return array The job parameters. decideClusters() reads the optional
	 *  'cluster', 'external-index' and 'private_data' keys from it.
	 */
	abstract public function getParams();

	/**
	 * @return SearchConfig The config used to read the CirrusSearch settings
	 *  and the cluster assignment.
	 */
	abstract public function getSearchConfig(): SearchConfig;

	/**
	 * @return string The job type, used in log and exception messages.
	 */
	abstract public function getType();

	/**
	 * Actually perform the labor of the job.
	 * The Job will be retried if true is returned from allowRetries() when
	 * this method fails (thrown exception or returning false from this
	 * method).
	 * @return bool true for success, false for failures
	 */
	abstract protected function doJob();

	/**
	 * Compute a randomized exponential backoff delay.
	 *
	 * @param int $retryCount The number of times the job has errored out.
	 * @return int Number of seconds to delay. With the default minimum exponent
	 *  of 6 the possible return values are 64, 128, 256, 512 and 1024 giving a
	 *  maximum delay of 17 minutes.
	 */
	public function backoffDelay( $retryCount ) {
		$exponent = $this->getSearchConfig()->get( 'CirrusSearchWriteBackoffExponent' );
		// Delay at least 2 minutes for everything that fails more than once
		$minIncrease = $retryCount > 1 ? 1 : 0;
		// Cap the random increase at 4, but never let the upper bound drop below
		// the lower one: rand() accepts an inverted range and would then return
		// negative increases for a negative $retryCount.
		$maxIncrease = max( $minIncrease, min( $retryCount, 4 ) );
		// ceil() returns a float; cast so the result matches the documented int
		// and stays an integer when added to a unix timestamp.
		return (int)ceil( pow( 2, $exponent + rand( $minIncrease, $maxIncrease ) ) );
	}

	/**
	 * Construct the list of connections suited for this job.
	 * NOTE: only suited for jobs that work on multiple clusters by
	 * inspecting the 'cluster' job param
	 *
	 * When the 'cluster' param is absent all clusters writable for the update
	 * group are used. When the 'external-index' param designates an index
	 * living on another replica group the connections are built against that
	 * group. When the 'private_data' param is set only clusters flagged as
	 * private are kept. Every returned connection has its timeout set to
	 * CirrusSearchClientSideUpdateTimeout.
	 *
	 * @param string $updateGroup UpdateGroup::* constant
	 * @return Connection[] indexed by cluster name
	 * @throws \RuntimeException when the 'cluster' param names a cluster that
	 *  is not writable for $updateGroup
	 */
	protected function decideClusters( string $updateGroup ) {
		$params = $this->getParams();
		$searchConfig = $this->getSearchConfig();
		$jobType = $this->getType();

		$cluster = $params['cluster'] ?? null;
		$assignment = $searchConfig->getClusterAssignment();
		if ( $cluster === null ) {
			$clusterNames = $assignment->getWritableClusters( $updateGroup );
		} elseif ( $assignment->canWriteToCluster( $cluster, $updateGroup ) ) {
			$clusterNames = [ $cluster ];
		} else {
			// Just in case a job is present in the queue but its cluster
			// has been removed from the config file.
			LoggerFactory::getInstance( 'CirrusSearch' )->warning(
				"Received {command} job for unwritable cluster {cluster}",
				[
					'command' => $jobType,
					'cluster' => $cluster
				]
			);
			// this job does not allow retries so we just need to throw an exception
			throw new \RuntimeException( "Received {$jobType} job with {$updateGroup} updates for an unwritable cluster $cluster." );
		}

		$config = $searchConfig;
		if ( isset( $params['external-index'] ) ) {
			$otherIndex = new ExternalIndex( $searchConfig, $params['external-index'] );
			$crossClusterName = $otherIndex->getCrossClusterName();
			if ( $crossClusterName !== null ) {
				// We assume that the cluster configs is mostly shared across cluster groups
				// e.g. this group config is available in CirrusSearchClusters
				// So that changing the CirrusSearchReplicaGroup to the CrossClusterName of the external
				// index we build the correct config to write to desired replica group.
				$config = new HashSearchConfig(
					[ 'CirrusSearchReplicaGroup' => $crossClusterName ],
					[ HashSearchConfig::FLAG_INHERIT ],
					$config
				);
			}
		}

		// Limit private data writes, such as archive index, to appropriately
		// flagged clusters
		if ( $params['private_data'] ?? false ) {
			// $clusterNames could be empty after this filter. All consumers
			// must work appropriately with no connections returned, typically
			// by looping over the connections and doing nothing when no
			// connections are provided.
			$clusterNames = array_filter( $clusterNames, static function ( $name ) use ( $config ) {
				$settings = new ClusterSettings( $config, $name );
				return $settings->isPrivateCluster();
			} );
		}

		$conns = Connection::getClusterConnections( $clusterNames, $config );

		$timeout = $config->get( 'CirrusSearchClientSideUpdateTimeout' );
		foreach ( $conns as $connection ) {
			$connection->setTimeout( $timeout );
		}

		return $conns;
	}

	/**
	 * Some boilerplate stuff for all jobs goes here
	 *
	 * Skips the job, reporting success, when search updates are disabled
	 * through either MainConfigNames::DisableSearchUpdate or
	 * CirrusSearchDisableUpdate; otherwise delegates to doJob().
	 *
	 * @return bool
	 */
	public function run() {
		if ( $this->getSearchConfig()->get( MainConfigNames::DisableSearchUpdate ) ||
			$this->getSearchConfig()->get( 'CirrusSearchDisableUpdate' )
		) {
			LoggerFactory::getInstance( 'CirrusSearch' )->debug( "Skipping job: search updates disabled by config" );
			return true;
		}

		return $this->doJob();
	}

	/**
	 * Get options to enable delayed jobs. Note that this might not be possible if the
	 * JobQueue implementation handling this job doesn't support it (JobQueueDB) but is
	 * possible for the high performance JobQueueRedis. Note also that delays are minimums -
	 * at least JobQueueRedis makes no effort to remove the delay as soon as possible
	 * after it has expired. By default it only checks every five minutes or so.
	 * Note yet again that if another delay has been set that is longer than this one
	 * then the _longer_ delay stays.
	 *
	 * @param string $jobClass name of the job class
	 * @param int $delay seconds to delay this job if possible
	 * @param JobQueueGroup $jobQueueGroup
	 * @return array options to set to add to the job param; empty when no delay
	 *  is requested or the queue does not support delayed jobs
	 */
	public static function buildJobDelayOptions( $jobClass, $delay, JobQueueGroup $jobQueueGroup ): array {
		$jobQueue = $jobQueueGroup->get( $jobClass );
		if ( !$delay || !$jobQueue->delayedJobsEnabled() ) {
			return [];
		}
		// Cast so that a float delay cannot turn the release timestamp into a float.
		return [ 'jobReleaseTimestamp' => time() + (int)$delay ];
	}

	/**
	 * Derive the job name from a job class name by dropping the
	 * CirrusSearch\Job\ namespace and prefixing the rest with 'cirrusSearch'.
	 *
	 * @param string $clazz
	 * @return string
	 */
	public static function buildJobName( $clazz ) {
		return 'cirrusSearch' . str_replace( 'CirrusSearch\\Job\\', '', $clazz );
	}
}