1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.wikimedia.gobblin.converter;
19
import java.io.IOException;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
23
24 import org.apache.gobblin.configuration.WorkUnitState;
25 import org.apache.gobblin.converter.Converter;
26 import org.apache.gobblin.converter.DataConversionException;
27 import org.apache.gobblin.converter.SchemaConversionException;
28 import org.apache.gobblin.util.ForkOperatorUtils;
29 import org.wikimedia.gobblin.TimestampedRecord;
30
31 import com.google.common.base.Preconditions;
32
33
34 public class TimestampedRecordConverterWrapper<I, O>
35 extends Converter<String, String, TimestampedRecord<I>, TimestampedRecord<O>> {
36
37 public static final String CONVERTER_TIMESTAMPED_RECORD_WRAPPED = "converter.timestampedrecord.wrapped.class";
38
39 private Converter<String, String, I, O> wrappedConverter;
40
41 @Override
42 public Converter<String, String, TimestampedRecord<I>, TimestampedRecord<O>> init(WorkUnitState workUnit) {
43 String wrappedConverterClassKey = ForkOperatorUtils.getPropertyNameForBranch(workUnit, CONVERTER_TIMESTAMPED_RECORD_WRAPPED);
44
45 Preconditions.checkArgument(workUnit.contains(wrappedConverterClassKey),
46 "The converter " + this.getClass().getName() + " cannot be used without setting the property "
47 + CONVERTER_TIMESTAMPED_RECORD_WRAPPED);
48
49 try {
50 wrappedConverter = (Converter) Class.forName(workUnit.getProp(wrappedConverterClassKey)).getDeclaredConstructor().newInstance();
51 } catch (Exception e) {
52 throw new RuntimeException("Problem instantiating wrapper converter class: " +
53 workUnit.getProp(wrappedConverterClassKey), e);
54 }
55
56 return this;
57 }
58
59 @Override
60 public String convertSchema(String inputSchema, WorkUnitState workUnit) throws SchemaConversionException {
61 return inputSchema;
62 }
63
64 @Override
65 public Iterable<TimestampedRecord<O>> convertRecord(String outputSchema, TimestampedRecord<I> inputRecord, WorkUnitState workUnit)
66 throws DataConversionException {
67 Iterable<O> convertedOutput = wrappedConverter.convertRecord(outputSchema, inputRecord.getPayload(), workUnit);
68 Optional<Long> recordTimestamp = inputRecord.getTimestamp();
69 return StreamSupport.stream(convertedOutput.spliterator(), false).
70 map(outputItem -> new TimestampedRecord<O>(outputItem, recordTimestamp))
71 .collect(Collectors.toList());
72 }
73
74 }