View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *    http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  
19  package org.wikimedia.gobblin.utils;
20  
21  import static java.util.Locale.ENGLISH;
22  
23  import java.io.IOException;
24  import java.text.SimpleDateFormat;
25  import java.util.Collection;
26  import java.util.List;
27  import java.util.stream.Collectors;
28  
29  import org.joda.time.DateTime;
30  
31  import com.fasterxml.jackson.databind.JsonNode;
32  import com.fasterxml.jackson.databind.ObjectMapper;
33  
34  import lombok.extern.slf4j.Slf4j;
35  
36  
37  /**
38   * A utility class parsing JSON from byte[] ans extracting a timestamp from the data.
39   *
40   * The class provides static functions to initialize both the timestamp and the
41   * timestamp-format fields from a WriterPartitioner.
42   *
43   * If a record doesn't contain the specified field, or if no field is specified, the current timestamp will be used.
44   *
45   * Accepted formats are:
46   * <ul>
47   * <li>unix (same as unix_seconds)</li>
48   * <li>unix_seconds</li>
49   * <li>unix_milliseconds</li>
50   * <li>ISO_8601</li>
51   * <li>CUSTOM - Using the timestampCustomFormat parameter (can be empty for other values)</li>
52   * </ul>
53   *
54   */
55  @Slf4j
56  public class JsonStringTimestampExtractor {
57  
58      private final List<String> timestampPointers;
59      // Those two fields can't be made final due to initialization in try/catch
60      private SimpleDateFormat timestampParser;
61      private TimestampFormat timestampFormat;
62      private final ObjectMapper cachingMapper;
63  
64      public JsonStringTimestampExtractor(Collection<String> timestampPointers, String timestampFormat) {
65          // Convert the timestampPointer column from JSON to Jackson syntax
66          this.timestampPointers = timestampPointers.stream()
67                  .map(s -> "/" + s.replace('.', '/'))
68                  .collect(Collectors.toList());
69          this.cachingMapper = new ObjectMapper();
70          try {
71              this.timestampFormat = TimestampFormat.valueOf(timestampFormat);
72              this.timestampParser = null;
73          } catch (IllegalArgumentException iae) {
74              this.timestampFormat = TimestampFormat.CUSTOM;
75              this.timestampParser = new SimpleDateFormat(timestampFormat, ENGLISH);
76          } catch (NullPointerException npe) {
77              throw new IllegalArgumentException("Parameter timestampFormat shouldn't be null", npe);
78          }
79      }
80  
81      public long getRecordTimestamp(String payload) {
82          JsonNode root;
83          long ifNotFound = System.currentTimeMillis();
84          try {
85              root = cachingMapper.readTree(payload);
86          } catch (IOException ie) {
87              log.warn("Couldn't parse the json payload for timestamp extraction: {}", payload, ie);
88              return ifNotFound;
89          }
90          for (String timestampPointer: timestampPointers) {
91              try {
92                  JsonNode match = root.at(timestampPointer);
93  
94                  if (match.canConvertToLong()) {
95                      long found = match.asLong();
96                      switch (timestampFormat) {
97                          case unix:
98                          case unix_seconds:
99                              return found * 1000L;
100                         case unix_milliseconds:
101                             return found;
102                     }
103                 } else if (match.isTextual()) {
104                     String found = match.asText();
105                     switch (timestampFormat) {
106                         case ISO_8601:
107                             return new DateTime(found).getMillis();
108                         case CUSTOM:
109                             return timestampParser.parse(found).getTime();
110                     }
111                 }
112                 log.warn("No timestamp extracted from json payload: {}", payload);
113             } catch (Exception ex) {
114                 log.warn("Failed to extract timestamp from json payload: {}", payload, ex);
115             }
116         }
117         return ifNotFound;
118     }
119 
120     public enum TimestampFormat {
121         unix,
122         unix_seconds,
123         unix_milliseconds,
124         ISO_8601,
125         CUSTOM
126     }
127 
128 }