1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.wikimedia.gobblin.utils;
20
21 import static java.util.Locale.ENGLISH;
22
23 import java.io.IOException;
24 import java.text.SimpleDateFormat;
25 import java.util.Collection;
26 import java.util.List;
27 import java.util.stream.Collectors;
28
29 import org.joda.time.DateTime;
30
31 import com.fasterxml.jackson.databind.JsonNode;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33
34 import lombok.extern.slf4j.Slf4j;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 @Slf4j
56 public class JsonStringTimestampExtractor {
57
58 private final List<String> timestampPointers;
59
60 private SimpleDateFormat timestampParser;
61 private TimestampFormat timestampFormat;
62 private final ObjectMapper cachingMapper;
63
64 public JsonStringTimestampExtractor(Collection<String> timestampPointers, String timestampFormat) {
65
66 this.timestampPointers = timestampPointers.stream()
67 .map(s -> "/" + s.replace('.', '/'))
68 .collect(Collectors.toList());
69 this.cachingMapper = new ObjectMapper();
70 try {
71 this.timestampFormat = TimestampFormat.valueOf(timestampFormat);
72 this.timestampParser = null;
73 } catch (IllegalArgumentException iae) {
74 this.timestampFormat = TimestampFormat.CUSTOM;
75 this.timestampParser = new SimpleDateFormat(timestampFormat, ENGLISH);
76 } catch (NullPointerException npe) {
77 throw new IllegalArgumentException("Parameter timestampFormat shouldn't be null", npe);
78 }
79 }
80
81 public long getRecordTimestamp(String payload) {
82 JsonNode root;
83 long ifNotFound = System.currentTimeMillis();
84 try {
85 root = cachingMapper.readTree(payload);
86 } catch (IOException ie) {
87 log.warn("Couldn't parse the json payload for timestamp extraction: {}", payload, ie);
88 return ifNotFound;
89 }
90 for (String timestampPointer: timestampPointers) {
91 try {
92 JsonNode match = root.at(timestampPointer);
93
94 if (match.canConvertToLong()) {
95 long found = match.asLong();
96 switch (timestampFormat) {
97 case unix:
98 case unix_seconds:
99 return found * 1000L;
100 case unix_milliseconds:
101 return found;
102 }
103 } else if (match.isTextual()) {
104 String found = match.asText();
105 switch (timestampFormat) {
106 case ISO_8601:
107 return new DateTime(found).getMillis();
108 case CUSTOM:
109 return timestampParser.parse(found).getTime();
110 }
111 }
112 log.warn("No timestamp extracted from json payload: {}", payload);
113 } catch (Exception ex) {
114 log.warn("Failed to extract timestamp from json payload: {}", payload, ex);
115 }
116 }
117 return ifNotFound;
118 }
119
120 public enum TimestampFormat {
121 unix,
122 unix_seconds,
123 unix_milliseconds,
124 ISO_8601,
125 CUSTOM
126 }
127
128 }