View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *    http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.wikimedia.gobblin.converter;
19  
20  import java.util.Optional;
21  import java.util.stream.Collectors;
22  import java.util.stream.StreamSupport;
23  
24  import org.apache.gobblin.configuration.WorkUnitState;
25  import org.apache.gobblin.converter.Converter;
26  import org.apache.gobblin.converter.DataConversionException;
27  import org.apache.gobblin.converter.SchemaConversionException;
28  import org.apache.gobblin.util.ForkOperatorUtils;
29  import org.wikimedia.gobblin.TimestampedRecord;
30  
31  import com.google.common.base.Preconditions;
32  
33  
34  public class TimestampedRecordConverterWrapper<I, O>
35          extends Converter<String, String, TimestampedRecord<I>, TimestampedRecord<O>> {
36  
37      public static final String CONVERTER_TIMESTAMPED_RECORD_WRAPPED = "converter.timestampedrecord.wrapped.class";
38  
39      private Converter<String, String, I, O> wrappedConverter;
40  
41      @Override
42      public Converter<String, String, TimestampedRecord<I>, TimestampedRecord<O>> init(WorkUnitState workUnit) {
43          String wrappedConverterClassKey = ForkOperatorUtils.getPropertyNameForBranch(workUnit, CONVERTER_TIMESTAMPED_RECORD_WRAPPED);
44  
45          Preconditions.checkArgument(workUnit.contains(wrappedConverterClassKey),
46                  "The converter " + this.getClass().getName() + " cannot be used without setting the property "
47                          + CONVERTER_TIMESTAMPED_RECORD_WRAPPED);
48  
49          try {
50              wrappedConverter = (Converter) Class.forName(workUnit.getProp(wrappedConverterClassKey)).getDeclaredConstructor().newInstance();
51          } catch (Exception e) {
52              throw new RuntimeException("Problem instantiating wrapper converter class: " +
53                      workUnit.getProp(wrappedConverterClassKey), e);
54          }
55  
56          return this;
57      }
58  
59      @Override
60      public String convertSchema(String inputSchema, WorkUnitState workUnit) throws SchemaConversionException {
61          return inputSchema;
62      }
63  
64      @Override
65      public Iterable<TimestampedRecord<O>> convertRecord(String outputSchema, TimestampedRecord<I> inputRecord, WorkUnitState workUnit)
66              throws DataConversionException {
67          Iterable<O> convertedOutput = wrappedConverter.convertRecord(outputSchema, inputRecord.getPayload(), workUnit);
68          Optional<Long> recordTimestamp = inputRecord.getTimestamp();
69          return StreamSupport.stream(convertedOutput.spliterator(), false).
70                  map(outputItem -> new TimestampedRecord<O>(outputItem, recordTimestamp))
71                  .collect(Collectors.toList());
72      }
73  
74  }