MediaWiki  master
AvroFormatter.php
Go to the documentation of this file.
1 <?php
22 
26 use AvroSchema;
27 use AvroStringIO;
28 use AvroValidator;
30 
/**
 * Log message formatter that uses the apache Avro format.
 *
 * Each formatted message is a binary string made of the magic byte, the
 * schema revision id as a 64-bit big endian integer, and the record's
 * 'context' serialized with the schema registered for the record's channel.
 */
class AvroFormatter implements FormatterInterface {
	/**
	 * Magic byte written at the start of every encoded message.
	 */
	const MAGIC = 0x0;

	/**
	 * @var array Map from schema name to schema definition. Each entry is
	 *  read through its 'schema' and 'revision' keys; the 'schema' value is
	 *  replaced by the parsed AvroSchema the first time it is requested.
	 */
	protected $schemas;

	/**
	 * @var AvroStringIO In-memory buffer the encoder writes into; truncated
	 *  at the start of every format() call.
	 */
	protected $io;

	/**
	 * @var AvroIOBinaryEncoder Binary encoder bound to $io.
	 */
	protected $encoder;

	/**
	 * @var AvroIODatumWriter Writer used to serialize a datum against a schema.
	 */
	protected $writer;

	/**
	 * @param array $schemas Map from schema name to schema definition
	 */
	public function __construct( array $schemas ) {
		$this->schemas = $schemas;
		$this->io = new AvroStringIO( '' );
		$this->encoder = new AvroIOBinaryEncoder( $this->io );
		$this->writer = new AvroIODatumWriter();
	}

	/**
	 * Formats the record context into a binary string per the schema
	 * configured for the record's channel.
	 *
	 * @param array $record Must provide the 'channel' and 'context' keys
	 * @return string|null Binary message, or null (after raising a PHP
	 *  error through trigger_error) when no usable schema is configured for
	 *  the channel or the context cannot be serialized with it.
	 */
	public function format( array $record ) {
		// The buffer is shared between calls; drop whatever the previous
		// call (successful or not) left in it.
		$this->io->truncate();

		$channel = $record['channel'];
		$schema = $this->getSchema( $channel );
		$revId = $this->getSchemaRevisionId( $channel );
		if ( $schema === null || $revId === null ) {
			trigger_error( "The schema for channel '{$channel}' is not available" );
			return null;
		}

		try {
			$this->writer->write_data( $schema, $record['context'], $this->encoder );
		} catch ( AvroIOTypeException $e ) {
			// Report why the datum did not match the schema instead of
			// letting the exception escape the logging path.
			$errors = AvroValidator::getErrors( $schema, $record['context'] );
			$json = json_encode( $errors );
			trigger_error( "Avro failed to serialize record for {$channel} : {$json}" );
			return null;
		}

		return chr( self::MAGIC ) . $this->encodeLong( $revId ) . $this->io->string();
	}

	/**
	 * Format a set of records into a list of binary strings conforming to
	 * the configured schema.
	 *
	 * Records that format() could not encode are left out of the result.
	 *
	 * @param array $records
	 * @return string[]
	 */
	public function formatBatch( array $records ) {
		$result = [];
		foreach ( $records as $record ) {
			$message = $this->format( $record );
			if ( $message !== null ) {
				$result[] = $message;
			}
		}
		return $result;
	}

	/**
	 * Get the schema for the named channel.
	 *
	 * The definition is parsed on first use and the parsed object is stored
	 * back into $this->schemas so later calls reuse it.
	 *
	 * @param string $channel Name of the schema to fetch
	 * @return AvroSchema|null Null when the channel has no entry or the
	 *  entry lacks a 'schema' or 'revision' value.
	 */
	protected function getSchema( $channel ) {
		// isset() with nested offsets is also false when the channel itself
		// is missing, so a separate check for the channel is not needed.
		if ( !isset( $this->schemas[$channel]['revision'], $this->schemas[$channel]['schema'] ) ) {
			return null;
		}

		$schema = $this->schemas[$channel]['schema'];
		if ( !$schema instanceof AvroSchema ) {
			// Strings go through AvroSchema::parse(); any other value is
			// handed to AvroSchema::real_parse() as-is (presumably an
			// already decoded definition - confirm against the callers).
			$schema = is_string( $schema )
				? AvroSchema::parse( $schema )
				: AvroSchema::real_parse( $schema );
			$this->schemas[$channel]['schema'] = $schema;
		}
		return $schema;
	}

	/**
	 * Get the schema revision id for the named channel.
	 *
	 * @param string $channel Name of the schema
	 * @return int|null Revision cast to int, or null when none is configured
	 */
	public function getSchemaRevisionId( $channel ) {
		if ( isset( $this->schemas[$channel]['revision'] ) ) {
			return (int)$this->schemas[$channel]['revision'];
		}
		return null;
	}

	/**
	 * Convert an integer to a 64 bit big endian long.
	 *
	 * NOTE(review): the 0xffffffff mask is only an integer literal on builds
	 * with 64 bit integers, which is what this method is written for.
	 *
	 * @param int $id
	 * @return string 8 byte binary string
	 */
	private function encodeLong( $id ) {
		// Shift before masking so that every operand stays an integer. The
		// previous mask literal 0xffffffff00000000 exceeds PHP_INT_MAX and
		// is therefore a float, whose implicit conversion to int is
		// deprecated since PHP 8.1.
		$high = ( $id >> 32 ) & 0xffffffff;
		$low = $id & 0xffffffff;
		return pack( 'NN', $high, $low );
	}
}
array $schemas
Map from schema name to schema definition.
getSchemaRevisionId( $channel)
Get the schema revision id for the named channel.
Log message formatter that uses the apache Avro format.
static getErrors(AvroSchema $schema, $datum)
encodeLong( $id)
convert an integer to a 64bits big endian long (Java compatible) NOTE: certainly only compatible with...
formatBatch(array $records)
Format a set of records into a list of binary strings conforming to the configured schema...
format(array $record)
Formats the record context into a binary string per the schema configured for the records channel...
getSchema( $channel)
Get the schema for the named channel.