23use AvroIOBinaryEncoder;
25use AvroIOTypeException;
29use Monolog\Formatter\FormatterInterface;
70 $this->io =
new AvroStringIO(
'' );
71 $this->encoder =
new AvroIOBinaryEncoder( $this->io );
72 $this->writer =
new AvroIODatumWriter();
83 public function format( array $record ): string {
84 $this->io->truncate();
85 $schema = $this->
getSchema( $record[
'channel'] );
87 if ( $schema ===
null || $revId ===
null ) {
88 trigger_error(
"The schema for channel '{$record['channel']}' is not available" );
92 $this->writer->write_data( $schema, $record[
'context'], $this->encoder );
93 }
catch ( AvroIOTypeException $e ) {
95 $json = json_encode( $errors );
96 trigger_error(
"Avro failed to serialize record for {$record['channel']} : {$json}" );
99 return chr( self::MAGIC ) . $this->
encodeLong( $revId ) . $this->io->string();
111 foreach ( $records as $record ) {
112 $message = $this->format( $record );
113 if ( $message !==
null ) {
114 $result[] = $message;
127 if ( !isset( $this->schemas[$channel] ) ) {
131 !isset( $this->schemas[$channel][
'revision'] )
132 && !isset( $this->schemas[$channel][
'schema'] )
137 if ( !$this->schemas[$channel][
'schema'] instanceof AvroSchema ) {
138 $schema = $this->schemas[$channel][
'schema'];
139 if ( is_string( $schema ) ) {
140 $this->schemas[$channel][
'schema'] = AvroSchema::parse( $schema );
142 $this->schemas[$channel][
'schema'] = AvroSchema::real_parse(
147 return $this->schemas[$channel][
'schema'];
157 if ( isset( $this->schemas[$channel][
'revision'] ) ) {
158 return (
int)$this->schemas[$channel][
'revision'];
170 $high = ( $id & 0xffffffff00000000 ) >> 32;
171 $low = $id & 0x00000000ffffffff;
172 return pack(
'NN', $high, $low );
Generate error strings for data that doesn't match the specified Avro schema.
static getErrors(AvroSchema $schema, $datum)