MediaWiki REL1_35
AvroFormatter.php
Go to the documentation of this file.
1<?php
22
23use AvroIOBinaryEncoder;
24use AvroIODatumWriter;
25use AvroIOTypeException;
26use AvroSchema;
27use AvroStringIO;
29use Monolog\Formatter\FormatterInterface;
30
38class AvroFormatter implements FormatterInterface {
42 private const MAGIC = 0x0;
46 protected $schemas;
47
51 protected $io;
52
56 protected $encoder;
57
61 protected $writer;
62
68 public function __construct( array $schemas ) {
69 $this->schemas = $schemas;
70 $this->io = new AvroStringIO( '' );
71 $this->encoder = new AvroIOBinaryEncoder( $this->io );
72 $this->writer = new AvroIODatumWriter();
73 }
74
83 public function format( array $record ): string {
84 $this->io->truncate();
85 $schema = $this->getSchema( $record['channel'] );
86 $revId = $this->getSchemaRevisionId( $record['channel'] );
87 if ( $schema === null || $revId === null ) {
88 trigger_error( "The schema for channel '{$record['channel']}' is not available" );
89 return '';
90 }
91 try {
92 $this->writer->write_data( $schema, $record['context'], $this->encoder );
93 } catch ( AvroIOTypeException $e ) {
94 $errors = AvroValidator::getErrors( $schema, $record['context'] );
95 $json = json_encode( $errors );
96 trigger_error( "Avro failed to serialize record for {$record['channel']} : {$json}" );
97 return '';
98 }
99 return chr( self::MAGIC ) . $this->encodeLong( $revId ) . $this->io->string();
100 }
101
109 public function formatBatch( array $records ) {
110 $result = [];
111 foreach ( $records as $record ) {
112 $message = $this->format( $record );
113 if ( $message !== null ) {
114 $result[] = $message;
115 }
116 }
117 return $result;
118 }
119
126 protected function getSchema( $channel ) {
127 if ( !isset( $this->schemas[$channel] ) ) {
128 return null;
129 }
130 if (
131 !isset( $this->schemas[$channel]['revision'] )
132 && !isset( $this->schemas[$channel]['schema'] )
133 ) {
134 return null;
135 }
136
137 if ( !$this->schemas[$channel]['schema'] instanceof AvroSchema ) {
138 $schema = $this->schemas[$channel]['schema'];
139 if ( is_string( $schema ) ) {
140 $this->schemas[$channel]['schema'] = AvroSchema::parse( $schema );
141 } else {
142 $this->schemas[$channel]['schema'] = AvroSchema::real_parse(
143 $schema
144 );
145 }
146 }
147 return $this->schemas[$channel]['schema'];
148 }
149
156 public function getSchemaRevisionId( $channel ) {
157 if ( isset( $this->schemas[$channel]['revision'] ) ) {
158 return (int)$this->schemas[$channel]['revision'];
159 }
160 return null;
161 }
162
169 private function encodeLong( $id ) {
170 $high = ( $id & 0xffffffff00000000 ) >> 32;
171 $low = $id & 0x00000000ffffffff;
172 return pack( 'NN', $high, $low );
173 }
174}
Generate error strings for data that doesn't match the specified Avro schema.
static getErrors(AvroSchema $schema, $datum)
Log message formatter that uses the apache Avro format.
formatBatch(array $records)
Format a set of records into a list of binary strings conforming to the configured schema.
encodeLong( $id)
convert an integer to a 64bits big endian long (Java compatible) NOTE: certainly only compatible with...
const MAGIC
Magic byte to encode schema revision id.
getSchemaRevisionId( $channel)
Get the writer for the named channel.
format(array $record)
Formats the record context into a binary string per the schema configured for the records channel.
getSchema( $channel)
Get the writer for the named channel.
array $schemas
Map from schema name to schema definition.