LineStreamFormat.java
package org.wikimedia.eventutilities.flink.stream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.core.fs.FSDataInputStream;
// NOTE: LineStreamFormat was removed from the main source during code review.
// It is used by the fileStreamSourceBuilder method in TestEventDataStreamFactory.
// If we later decide to promote fileStreamSourceBuilder back into the main EventDataStreamFactory,
// this class can be moved back into the main source along with it.
/**
 * A reader format that reads text lines from a file and uses the provided
 * {@link DeserializationSchema} to deserialize each line into a record.
 *
 * <p>The reader uses Java's built-in {@link InputStreamReader} to decode the byte stream using
 * {@link StandardCharsets#UTF_8}. Each line (with its line terminator stripped by
 * {@link BufferedReader#readLine()}) is re-encoded as UTF-8 bytes and passed to the
 * deserialization schema.
 *
 * <p>Lines for which the deserialization schema returns {@code null} are skipped. The
 * {@link DeserializationSchema} contract permits {@code null} for messages that cannot be
 * deserialized, whereas a {@code null} returned from {@link StreamFormat.Reader#read()} signals
 * the end of the input; passing such a {@code null} through would silently truncate the file.
 *
 * <p>NOTE: This class is based on Flink 1.15.0's TextLineInputFormat.
 *
 * <p>This format does not support optimized recovery from checkpoints. On recovery, it will re-read
 * and discard the number of records that were emitted before the last checkpoint. That is due to
 * the fact that the offsets of lines in the file cannot be tracked through the charset decoders
 * with their internal buffering of stream input and charset decoder state.
 *
 * @param <T> the type of records produced by the deserialization schema
 */
public class LineStreamFormat<T> extends SimpleStreamFormat<T> {
    private static final long serialVersionUID = 1L;

    /** Schema used to turn the UTF-8 bytes of each line into a record. */
    private final DeserializationSchema<T> deserializationSchema;

    /**
     * Creates a format that deserializes every line of the input with the given schema.
     *
     * @param deserializationSchema schema applied to each line; must not be null
     * @throws NullPointerException if {@code deserializationSchema} is null
     */
    public LineStreamFormat(DeserializationSchema<T> deserializationSchema) {
        this.deserializationSchema =
            Objects.requireNonNull(deserializationSchema, "deserializationSchema");
    }

    /**
     * Creates a reader that decodes {@code stream} as UTF-8 text and deserializes it line by line.
     *
     * <p>NOTE(review): {@code deserializationSchema.open(...)} is never called by this class;
     * schemas that rely on {@code open} for initialization may not work here — confirm for the
     * schemas in use.
     *
     * @param config the Flink configuration (unused)
     * @param stream the raw file input stream to read from
     * @return a reader producing one record per successfully deserialized line
     */
    @Override
    public Reader<T> createReader(Configuration config, FSDataInputStream stream) {
        final BufferedReader reader =
            new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
        return new Reader<>(reader, deserializationSchema);
    }

    /**
     * Returns the type information of the produced records, as reported by the deserialization
     * schema.
     */
    @Override
    public TypeInformation<T> getProducedType() {
        return deserializationSchema.getProducedType();
    }

    // ------------------------------------------------------------------------

    /** The actual reader for the {@code LineStreamFormat}. */
    private static final class Reader<T> implements StreamFormat.Reader<T> {
        private final BufferedReader reader;
        private final DeserializationSchema<T> deserializationSchema;

        Reader(final BufferedReader reader, DeserializationSchema<T> deserializationSchema) {
            this.deserializationSchema = deserializationSchema;
            this.reader = reader;
        }

        /**
         * Reads lines until one deserializes to a non-null record.
         *
         * @return the next record, or {@code null} once the underlying reader is exhausted
         * @throws IOException if reading from the stream or deserializing a line fails
         */
        @Nullable
        @Override
        public T read() throws IOException {
            String line;
            while ((line = reader.readLine()) != null) {
                final T record =
                    deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8));
                if (record != null) {
                    return record;
                }
                // The schema returned null for this line (allowed by its contract). Skip it
                // instead of returning null, which the caller would interpret as end of input.
            }
            return null;
        }

        /** Closes the underlying reader (and with it the wrapped input stream). */
        @Override
        public void close() throws IOException {
            reader.close();
        }
    }
}