Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 44 |
|
0.00% |
0 / 3 |
CRAP | |
0.00% |
0 / 1 |
Map | |
0.00% |
0 / 44 |
|
0.00% |
0 / 3 |
132 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
pushJob | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
getJobs | |
0.00% |
0 / 39 |
|
0.00% |
0 / 1 |
90 |
1 | <?php |
2 | |
3 | namespace MediaWiki\Extension\MathSearch\Graph; |
4 | |
5 | use JobQueueGroup; |
6 | use MediaWiki\Extension\MathSearch\Graph\Job\FetchIdsFromWd; |
7 | use MediaWiki\Extension\MathSearch\Graph\Job\NormalizeDoi; |
8 | use MediaWiki\Extension\MathSearch\Graph\Job\SetProfileType; |
9 | use MediaWiki\MediaWikiServices; |
10 | |
/**
 * Reads rows in batches from the query endpoint and distributes them over jobs
 * that are pushed to the job queue.
 */
class Map {
	/**
	 * Batch size supplied at construction time.
	 * NOTE(review): getJobs() pages with its own $batch_size argument; this
	 * value is only kept for backward compatibility of the constructor.
	 */
	private int $batch_size;
	/** Number of rows collected before a job is pushed. */
	private const PAGES_PER_JOB = 100;
	private JobQueueGroup $jobQueueGroup;

	/**
	 * @param JobQueueGroup|null $jobQueueGroup queue to push to; falls back to the
	 *  job queue group from MediaWikiServices when null
	 * @param int $batch_size batch size stored on the instance
	 */
	public function __construct( ?JobQueueGroup $jobQueueGroup = null, int $batch_size = 100000 ) {
		$this->jobQueueGroup = $jobQueueGroup ?? MediaWikiServices::getInstance()->getJobQueueGroup();
		$this->batch_size = $batch_size;
	}

	/**
	 * Instantiate a job of the given class and lazily push it to the job queue.
	 *
	 * @param array $table rows handed to the job as option 'rows'
	 * @param int $segment segment number handed to the job as option 'segment'
	 * @param string $jobType class name of the job to instantiate
	 * @param array $options further job options; 'rows' and 'segment' are overwritten
	 */
	public function pushJob(
		array $table, int $segment, string $jobType, array $options
	): void {
		$options[ 'rows' ] = $table;
		$options[ 'segment' ] = $segment;
		$this->jobQueueGroup->lazyPush( new $jobType( $options ) );
	}

	/**
	 * Page through the query results and push one job per PAGES_PER_JOB rows.
	 *
	 * For FetchIdsFromWd a single job without rows is pushed and no query is run.
	 *
	 * @param callable $output receives progress messages (one string argument)
	 * @param int $batch_size number of rows requested per query
	 * @param string $type passed to the query builders and stored as job option 'prefix'
	 * @param string $jobType class name of the job to create
	 * @param array $jobOptions additional options passed to every job
	 */
	public function getJobs(
		callable $output, int $batch_size, string $type, string $jobType, array $jobOptions = []
	): void {
		// 'H' = 24-hour clock, 'i' = minutes, 's' = seconds. The former 'ymdhms'
		// repeated the month instead of the minutes and used the 12-hour clock.
		$jobOptions[ 'jobname' ] = 'import' . date( 'ymdHis' );
		$jobOptions[ 'prefix' ] = $type;

		$sp = Query::getQueryEndpoint();
		$offset = 0;
		$table = [];
		$segment = 0;
		do {
			$output( 'Read from offset ' . $offset . ".\n" );
			switch ( $jobType ) {
				case SetProfileType::class:
					$query = Query::getQueryFromConfig( $type, $offset, $batch_size );
					break;
				case NormalizeDoi::class:
					$query = Query::getQueryForDoi( $offset, $batch_size );
					break;
				case FetchIdsFromWd::class:
					// This job type gets the batch size and an empty row list; nothing is queried here.
					$jobOptions[ 'batch_size' ] = $batch_size;
					$this->pushJob( $table, $segment, $jobType, $jobOptions );
					$output( "Pushed job.\n" );
					return;
				default:
					$query = Query::getQueryFromProfileType( $type, $offset, $batch_size );
			}
			$rs = $sp->query( $query );
			if ( !$rs ) {
				$output( "No results retrieved!\n" );
				break;
			}
			// Read the rows once; a response without rows is treated as an empty page.
			$rows = $rs['result']['rows'] ?? [];
			$rowCount = count( $rows );
			$output( "Retrieved " . $rowCount . " results.\n" );
			foreach ( $rows as $row ) {
				$qID = $row['qid'];
				if ( $jobType === NormalizeDoi::class ) {
					$table[$qID] = $row['doi'];
				} else {
					$table[] = $qID;
				}
				// Flush as soon as the job holds PAGES_PER_JOB rows.
				if ( count( $table ) >= self::PAGES_PER_JOB ) {
					$this->pushJob( $table, $segment, $jobType, $jobOptions );
					$output( "Pushed jobs to segment $segment.\n" );
					$segment++;
					$table = [];
				}
			}
			// Advance by the limit that was actually used in the query above.
			$offset += $batch_size;
			// A page shorter than the requested limit is the last one.
		} while ( $rowCount === $batch_size );
		// Push whatever is left over (possibly an empty row list) as the last segment.
		$this->pushJob( $table, $segment, $jobType, $jobOptions );
		$output( "Pushed jobs to last segment $segment.\n" );
	}

}