23 use AvroIODatumWriter;
24 use AvroIOBinaryEncoder;
25 use AvroIOTypeException;
29 use Monolog\Formatter\FormatterInterface;
70 $this->io =
new AvroStringIO(
'' );
71 $this->encoder =
new AvroIOBinaryEncoder( $this->io );
72 $this->writer =
new AvroIODatumWriter();
83 public function format( array $record ) {
84 $this->io->truncate();
85 $schema = $this->
getSchema( $record[
'channel'] );
87 if ( $schema ===
null || $revId ===
null ) {
88 trigger_error(
"The schema for channel '{$record['channel']}' is not available" );
92 $this->writer->write_data( $schema, $record[
'context'], $this->encoder );
93 }
catch ( AvroIOTypeException $e ) {
95 $json = json_encode( $errors );
96 trigger_error(
"Avro failed to serialize record for {$record['channel']} : {$json}" );
99 return chr( self::MAGIC ) . $this->
encodeLong( $revId ) . $this->io->string();
111 foreach ( $records as $record ) {
112 $message = $this->
format( $record );
113 if ( $message !==
null ) {
114 $result[] = $message;
127 if ( !isset( $this->schemas[$channel] ) ) {
130 if ( !isset( $this->schemas[$channel][
'revision'], $this->schemas[$channel][
'schema'] ) ) {
134 if ( !$this->schemas[$channel][
'schema'] instanceof AvroSchema ) {
135 $schema = $this->schemas[$channel][
'schema'];
136 if ( is_string( $schema ) ) {
137 $this->schemas[$channel][
'schema'] = AvroSchema::parse( $schema );
139 $this->schemas[$channel][
'schema'] = AvroSchema::real_parse(
144 return $this->schemas[$channel][
'schema'];
154 if ( isset( $this->schemas[$channel][
'revision'] ) ) {
155 return (
int)$this->schemas[$channel][
'revision'];
167 $high = ( $id & 0xffffffff00000000 ) >> 32;
168 $low = $id & 0x00000000ffffffff;
169 return pack(
'NN', $high, $low );