MediaWiki REL1_34
AvroFormatter.php
Go to the documentation of this file.
1<?php
22
23use AvroIODatumWriter;
24use AvroIOBinaryEncoder;
25use AvroIOTypeException;
26use AvroSchema;
27use AvroStringIO;
29use Monolog\Formatter\FormatterInterface;
30
/**
 * Log message formatter that uses the apache Avro format.
 *
 * Each formatted record is one binary string made of: the MAGIC byte, the
 * channel's schema revision id encoded as an 8-byte big-endian long, and the
 * Avro binary encoding of the record's 'context' array.
 */
class AvroFormatter implements FormatterInterface {
	/**
	 * @var int Byte written at the start of every encoded message.
	 */
	const MAGIC = 0x0;

	/**
	 * @var array Map from channel name to schema definition. An entry is only
	 *  usable when it has both a 'schema' and a 'revision' key. 'schema' may be
	 *  a string (passed to AvroSchema::parse), an already-parsed AvroSchema, or
	 *  any other value accepted by AvroSchema::real_parse (presumably a decoded
	 *  schema array — TODO confirm). Parsed schemas are cached back into this map.
	 */
	protected $schemas;

	/**
	 * @var AvroStringIO In-memory buffer that the encoder writes into.
	 */
	protected $io;

	/**
	 * @var AvroIOBinaryEncoder Binary encoder writing to $io.
	 */
	protected $encoder;

	/**
	 * @var AvroIODatumWriter Serializes a datum according to a schema.
	 */
	protected $writer;

	/**
	 * @param array $schemas Map from channel name to schema definition; see
	 *  the $schemas property for the expected shape of each entry.
	 */
	public function __construct( array $schemas ) {
		$this->schemas = $schemas;
		$this->io = new AvroStringIO( '' );
		$this->encoder = new AvroIOBinaryEncoder( $this->io );
		$this->writer = new AvroIODatumWriter();
	}

	/**
	 * Formats the record context into a binary string per the schema
	 * configured for the record's channel.
	 *
	 * @param array $record Monolog record; its 'channel' and 'context' keys are read.
	 * @return string|null The encoded message. Returns null after raising a
	 *  user notice via trigger_error() when the channel has no usable schema,
	 *  or when the context does not match the schema.
	 */
	public function format( array $record ) {
		// The IO buffer is shared across calls; clear any previous output.
		$this->io->truncate();
		$schema = $this->getSchema( $record['channel'] );
		$revId = $this->getSchemaRevisionId( $record['channel'] );
		if ( $schema === null || $revId === null ) {
			trigger_error( "The schema for channel '{$record['channel']}' is not available" );
			return null;
		}
		try {
			$this->writer->write_data( $schema, $record['context'], $this->encoder );
		} catch ( AvroIOTypeException $e ) {
			// Re-validate to produce human-readable errors for the notice.
			$errors = AvroValidator::getErrors( $schema, $record['context'] );
			$json = json_encode( $errors );
			trigger_error( "Avro failed to serialize record for {$record['channel']} : {$json}" );
			return null;
		}
		return chr( self::MAGIC ) . $this->encodeLong( $revId ) . $this->io->string();
	}

	/**
	 * Format a set of records into a list of binary strings conforming to
	 * the configured schema.
	 *
	 * Records that cannot be formatted (format() returned null) are omitted.
	 *
	 * @param array[] $records
	 * @return string[]
	 */
	public function formatBatch( array $records ) {
		$result = [];
		foreach ( $records as $record ) {
			$message = $this->format( $record );
			if ( $message !== null ) {
				$result[] = $message;
			}
		}
		return $result;
	}

	/**
	 * Get the parsed schema for the named channel.
	 *
	 * Unparsed definitions are parsed on first use, and the resulting
	 * AvroSchema is cached in $this->schemas.
	 *
	 * @param string $channel
	 * @return AvroSchema|null Null when the channel lacks a 'schema' or
	 *  'revision' entry.
	 */
	protected function getSchema( $channel ) {
		// A multi-key isset() also covers a missing $this->schemas[$channel].
		if ( !isset( $this->schemas[$channel]['revision'], $this->schemas[$channel]['schema'] ) ) {
			return null;
		}

		$schema = $this->schemas[$channel]['schema'];
		if ( !$schema instanceof AvroSchema ) {
			$schema = is_string( $schema )
				? AvroSchema::parse( $schema )
				: AvroSchema::real_parse( $schema );
			$this->schemas[$channel]['schema'] = $schema;
		}
		return $schema;
	}

	/**
	 * Get the schema revision id for the named channel.
	 *
	 * @param string $channel
	 * @return int|null The revision cast to int, or null if none is configured.
	 */
	public function getSchemaRevisionId( $channel ) {
		if ( isset( $this->schemas[$channel]['revision'] ) ) {
			return (int)$this->schemas[$channel]['revision'];
		}
		return null;
	}

	/**
	 * Convert an integer to a 64-bit big-endian long (Java compatible).
	 *
	 * NOTE: only correct on 64-bit PHP builds, where int is 64 bits wide.
	 *
	 * @param int $id
	 * @return string 8 bytes: high 32 bits then low 32 bits, each big-endian.
	 */
	private function encodeLong( $id ) {
		// Shift before masking. A 0xffffffff00000000 mask literal exceeds
		// PHP_INT_MAX, so PHP parses it as a float that must be converted back
		// to int for '&'. That conversion is not guaranteed, and it is
		// deprecated since PHP 8.1.
		$high = ( $id >> 32 ) & 0xffffffff;
		$low = $id & 0xffffffff;
		return pack( 'NN', $high, $low );
	}
}
Generate error strings for data that doesn't match the specified Avro schema.
static getErrors(AvroSchema $schema, $datum)
Log message formatter that uses the apache Avro format.
formatBatch(array $records)
Format a set of records into a list of binary strings conforming to the configured schema.
encodeLong( $id)
Convert an integer to a 64-bit big-endian long (Java compatible). NOTE: certainly only compatible with...
getSchemaRevisionId( $channel)
Get the schema revision id for the named channel.
format(array $record)
Formats the record context into a binary string per the schema configured for the records channel.
getSchema( $channel)
Get the parsed schema for the named channel.
array $schemas
Map from channel name to schema definition.