Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 44
0.00% covered (danger)
0.00%
0 / 3
CRAP
0.00% covered (danger)
0.00%
0 / 1
Map
0.00% covered (danger)
0.00%
0 / 44
0.00% covered (danger)
0.00%
0 / 3
132
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 pushJob
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 getJobs
0.00% covered (danger)
0.00%
0 / 39
0.00% covered (danger)
0.00%
0 / 1
90
1<?php
2
3namespace MediaWiki\Extension\MathSearch\Graph;
4
5use JobQueueGroup;
6use MediaWiki\Extension\MathSearch\Graph\Job\FetchIdsFromWd;
7use MediaWiki\Extension\MathSearch\Graph\Job\NormalizeDoi;
8use MediaWiki\Extension\MathSearch\Graph\Job\SetProfileType;
9use MediaWiki\MediaWikiServices;
10
/**
 * Reads item IDs page by page from the query endpoint, groups them into
 * segments of at most PAGES_PER_JOB rows and queues one job per segment.
 */
class Map {
    /**
     * Batch size supplied at construction time. getJobs() pages with its own
     * $batch_size argument, which is also the limit sent to the query endpoint.
     */
    private int $batch_size;
    /** Maximum number of rows handed to a single job. */
    private const PAGES_PER_JOB = 100;
    /** Queue that receives the generated jobs. */
    private JobQueueGroup $jobQueueGroup;

    /**
     * @param JobQueueGroup|null $jobQueueGroup queue to push to; when null the
     *  group is obtained from MediaWikiServices
     * @param int $batch_size batch size stored on the instance
     */
    public function __construct( ?JobQueueGroup $jobQueueGroup = null, int $batch_size = 100000 ) {
        $this->jobQueueGroup = $jobQueueGroup ?? MediaWikiServices::getInstance()->getJobQueueGroup();
        $this->batch_size = $batch_size;
    }

    /**
     * Instantiates a job of the given class and lazily pushes it to the queue.
     *
     * @param array $table rows for the job, stored under the 'rows' option
     * @param int $segment segment number, stored under the 'segment' option
     * @param string $jobType name of the job class to instantiate
     * @param array $options further options passed to the job constructor
     */
    public function pushJob(
        array $table, int $segment, string $jobType, array $options
    ): void {
        $options[ 'rows' ] = $table;
        $options[ 'segment' ] = $segment;
        $this->jobQueueGroup->lazyPush( new $jobType( $options ) );
    }

    /**
     * Pages through the query results and queues one job per full segment,
     * plus one job for any remaining rows.
     *
     * For FetchIdsFromWd no query is run: a single job carrying the batch size
     * is pushed and the method returns.
     *
     * @param callable $output receives progress messages as strings
     * @param int $batch_size number of rows requested per query page
     * @param string $type passed to the query builder and stored as 'prefix'
     * @param string $jobType name of the job class to create
     * @param array $jobOptions additional options handed to every job
     */
    public function getJobs(
        callable $output, int $batch_size, string $type, string $jobType, array $jobOptions = []
    ): void {
        // 'H' is the 24-hour hour, 'i' the minutes and 's' the seconds;
        // 'm' is the month and must not be used for the minutes.
        $jobOptions[ 'jobname' ] = 'import' . date( 'ymdHis' );
        $jobOptions[ 'prefix' ] = $type;

        $sp = Query::getQueryEndpoint();
        $offset = 0;
        $table = [];
        $segment = 0;
        do {
            $output( 'Read from offset ' . $offset . ".\n" );
            switch ( $jobType ) {
                case SetProfileType::class:
                    $query = Query::getQueryFromConfig( $type, $offset, $batch_size );
                    break;
                case NormalizeDoi::class:
                    $query = Query::getQueryForDoi( $offset, $batch_size );
                    break;
                case FetchIdsFromWd::class:
                    $jobOptions[ 'batch_size' ] = $batch_size;
                    $this->pushJob( $table, $segment, $jobType, $jobOptions );
                    $output( "Pushed job.\n" );
                    return;
                default:
                    $query = Query::getQueryFromProfileType( $type, $offset, $batch_size );
            }
            $rs = $sp->query( $query );
            if ( !$rs ) {
                $output( "No results retrieved!\n" );
                // Leaves the loop without evaluating the loop condition.
                break;
            }
            $rows = $rs['result']['rows'];
            $output( "Retrieved " . count( $rows ) . " results.\n" );
            foreach ( $rows as $row ) {
                $qID = $row['qid'];
                if ( $jobType === NormalizeDoi::class ) {
                    $table[$qID] = $row['doi'];
                } else {
                    $table[] = $qID;
                }
                // Flush as soon as the segment is full so that no job gets
                // more than PAGES_PER_JOB rows.
                if ( count( $table ) >= self::PAGES_PER_JOB ) {
                    $this->pushJob( $table, $segment, $jobType, $jobOptions );
                    $output( "Pushed jobs to segment $segment.\n" );
                    $segment++;
                    $table = [];
                }
            }
            // Advance by the same size that was used as the query limit.
            $offset += $batch_size;
            // A page shorter than the requested size is the last one; the
            // positivity check prevents an endless loop for a zero batch size.
        } while ( $batch_size > 0 && count( $rows ) === $batch_size );

        if ( $table === [] ) {
            // Nothing is left over, so do not queue a job without rows.
            $output( "No rows left for a last segment.\n" );
            return;
        }
        $this->pushJob( $table, $segment, $jobType, $jobOptions );
        $output( "Pushed jobs to last segment $segment.\n" );
    }

}