All files Evaluator.js

89.66% Statements 26/29
50% Branches 7/14
100% Functions 7/7
89.66% Lines 26/29

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199    1x 1x 1x   1x                         36x                                 259x 259x 259x 259x                 259x 259x 259x 259x 259x   259x                         36x     36x                     36x             36x       36x               32x       32x                                                                                                                               259x                     36x                     36x       1x        
'use strict';
 
const fetch = require( '../lib/fetch.js' );
const { convertWrappedZObjectToVersionedBinary } = require( '../function-schemata/javascript/src/binaryFormatter.js' );
const WebSocket = require( 'ws' );
 
const AVRO_SCHEMA_VERSION_ = '0.1.1';
 
/**
 * Gets the ZObjects of a list of LID.
 *
 * @param {Object} functionCall zobject
 * @param {boolean} useReentrance Boolean
 * @param {number} remainingTime Number
 * @param {string} requestId string
 * @return {Object} zobject ZObject retrieved from current binary version
 */
function convertWrappedZObjectToCurrentVersionedBinary(
	functionCall, useReentrance, remainingTime, requestId ) {
	return convertWrappedZObjectToVersionedBinary(
		{
			zobject: functionCall,
			reentrant: useReentrance,
			remainingTime: remainingTime,
			requestId: requestId
		},
		/* version= */ AVRO_SCHEMA_VERSION_,
		/* enableTypeConverters= */ true );
}
 
/**
 * Function evaluator. Wraps API calls to the function-evaluator service, which
 * runs native code implementations.
 */
class Evaluator {
	constructor( evaluatorConfig, logger ) {
		this.logger_ = logger;
		this.useReentrance_ = evaluatorConfig.useReentrance;
		this.evaluatorWs_ = evaluatorConfig.evaluatorWs;
		Iif ( this.evaluatorWs_ === null && this.useReentrance_ ) {
			logger.log( 'warn',
				{
					message: 'useReentrance was specified but no websocket location was supplied; setting useReentrance to false',
					info: evaluatorConfig
				}
			);
			this.useReentrance_ = false;
		}
		this.evaluatorUri_ = evaluatorConfig.evaluatorUri;
		this.invariants_ = null;
		this.timeout_ = 10000; // wait 10 seconds
		this.programmingLanguages_ = Object.freeze( evaluatorConfig.programmingLanguages || [] );
		Object.defineProperty( this, 'programmingLanguages', {
			get: function () {
				return this.programmingLanguages_;
			}
		} );
	}
 
	/**
	 * Evaluates a Z7 object or invokes a call back to the Orchestrator
	 * to run another function concurrently (if useReentrance)
	 *
	 * @param {Object} functionCall ZObject Z7/function call
	 * @return {Object} fetched evaluated zobject from the Evaluator
	 */
	async evaluate( functionCall ) {
		Iif ( this.useReentrance_ ) {
			return await this.evaluateReentrant_( functionCall );
		} else {
			return await this.evaluate_( functionCall );
		}
	}
 
	/**
	 * Primary evaluator function for a Z7 object
	 *
	 * @param {Object} functionCall ZObject, i.e. Z7/function call
	 * @return {Object} fetched evaluated zobject from the Evaluator
	 */
	async evaluate_( functionCall ) {
		const binaryForm = convertWrappedZObjectToCurrentVersionedBinary(
			functionCall,
			this.useReentrance_,
			this.invariants_.getRemainingTime(),
			this.invariants_.requestId,
			true );
 
		this.invariants_.logger.log( 'debug',
			{ message: 'calling Evaluator in orchestrator...', requestId: this.invariants_.requestId }
		);
 
		const response = await fetch(
			this.evaluatorUri_, {
				method: 'POST',
				body: binaryForm,
				headers: this.determineHeaders()
			}
		);
 
		this.invariants_.logger.log( 'debug',
			{ message: '...finished calling Evaluator in orchestrator', requestId: this.invariants_.requestId }
		);
 
		return response;
	}
 
	/**
	 * Evaluator function for a 're-entrant' calls, meaning we need to call the Orchestrator
	 * for a Wikifunctions function call while evaluating.
	 *
	 * Our security model prohibits outgoing network requests from the Evaluator,
	 * so we use a websocket: Evaluator -> Orchestrator -> Evaluator.
	 *
	 * @param {Object} zobject ZObject function call
	 * @return {Object} fetched evaluated zobject
	 * (fetched as result of communication protocol and finally from Evaluator)
	 */
	/*
	 * Ignore the subsequent function for coverage purposes; we can't feasibly
	 * unit-test this functionality.
	 *
	 * TODO (T322056): Make EvaluatorStub able to recognize and handle multiple
	 * reentrant websocket-based function calls.
	 */
	// istanbul ignore next
	async evaluateReentrant_( functionCall ) {
		const evaluatePromise = this.evaluate_( functionCall );
		const client = new WebSocket( this.evaluatorWs_ );
		client.on( 'open', () => {
			this.logger_.log( 'info', { message: 'WS connection opened' } );
		} );
		client.on( 'message', async ( theMessage ) => {
			theMessage = theMessage.toString();
			this.logger_.log( 'info', { message: 'WS message received', info: theMessage } );
			if ( theMessage.startsWith( 'call' ) ) {
				const { orchestrate } = require( './orchestrate.js' );
				theMessage = theMessage.replace( /^call\s*/, '' );
				const Z7 = JSON.parse( theMessage );
				if ( this.invariants_.getRemainingTime() > 0 ) {
					this.invariants_.logger.log( 'debug',
						{ message: 'calling orchestrator...', requestId: this.invariants_.requestId }
					);
 
					const normalResult = ( await orchestrate(
						Z7, this.invariants_, /* implementationSelector= */ null,
						/* returnNormal= */ true ) ).Z22K1;
 
					this.invariants_.logger.log( 'debug',
						{ message: '...finished calling orchestrator', requestId: this.invariants_.requestId }
					);
 
					client.send( JSON.stringify( normalResult ) );
				}
				// TODO (T334485): Send a death threat to the evaluator if getRemainingTime
				// <= 0.
			}
		} );
		client.on( 'close', () => {
			this.logger_.log( 'info', { message: 'WS connection closed' } );
		} );
		const result = await evaluatePromise;
		// TODO: How to wait until connection opens?
		client.close();
		return result;
	}
 
	setInvariants( invariants ) {
		this.invariants_ = invariants;
	}
 
	/**
	 * In Prod, generateFunctionMetrics is set to 'true' so trace headers is propogated.
	 * In other envs like local/test, trace headers will not be included as OTel is not
	 * available. Headers in both cases does include 'Content-type'.
	 *
	 * @return {Object} headers object
	 */
	determineHeaders() {
		const headers = {
			'Content-type': 'application/octet-stream',
			...( this.invariants_.orchestratorConfig.generateFunctionsMetrics && {
				'x-request-id': this.invariants_.requestId,
				...( this.invariants_.req.headers.traceparent &&
					{ traceparent: this.invariants_.req.headers.traceparent } ),
				...( this.invariants_.req.headers.tracestate &&
					{ tracestate: this.invariants_.req.headers.tracestate } )
			} )
		};
 
		return headers;
	}
}
 
module.exports = {
	convertWrappedZObjectToCurrentVersionedBinary,
	Evaluator
};