JsonStringTimestampExtractor.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.wikimedia.gobblin.utils;
import static java.util.Locale.ENGLISH;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.joda.time.DateTime;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
/**
* A utility class parsing JSON from byte[] ans extracting a timestamp from the data.
*
* The class provides static functions to initialize both the timestamp and the
* timestamp-format fields from a WriterPartitioner.
*
* If a record doesn't contain the specified field, or if no field is specified, the current timestamp will be used.
*
* Accepted formats are:
* <ul>
* <li>unix (same as unix_seconds)</li>
* <li>unix_seconds</li>
* <li>unix_milliseconds</li>
* <li>ISO_8601</li>
* <li>CUSTOM - Using the timestampCustomFormat parameter (can be empty for other values)</li>
* </ul>
*
*/
@Slf4j
public class JsonStringTimestampExtractor {
private final List<String> timestampPointers;
// Those two fields can't be made final due to initialization in try/catch
private SimpleDateFormat timestampParser;
private TimestampFormat timestampFormat;
private final ObjectMapper cachingMapper;
public JsonStringTimestampExtractor(Collection<String> timestampPointers, String timestampFormat) {
// Convert the timestampPointer column from JSON to Jackson syntax
this.timestampPointers = timestampPointers.stream()
.map(s -> "/" + s.replace('.', '/'))
.collect(Collectors.toList());
this.cachingMapper = new ObjectMapper();
try {
this.timestampFormat = TimestampFormat.valueOf(timestampFormat);
this.timestampParser = null;
} catch (IllegalArgumentException iae) {
this.timestampFormat = TimestampFormat.CUSTOM;
this.timestampParser = new SimpleDateFormat(timestampFormat, ENGLISH);
} catch (NullPointerException npe) {
throw new IllegalArgumentException("Parameter timestampFormat shouldn't be null", npe);
}
}
public long getRecordTimestamp(String payload) {
JsonNode root;
long ifNotFound = System.currentTimeMillis();
try {
root = cachingMapper.readTree(payload);
} catch (IOException ie) {
log.warn("Couldn't parse the json payload for timestamp extraction: {}", payload, ie);
return ifNotFound;
}
for (String timestampPointer: timestampPointers) {
try {
JsonNode match = root.at(timestampPointer);
if (match.canConvertToLong()) {
long found = match.asLong();
switch (timestampFormat) {
case unix:
case unix_seconds:
return found * 1000L;
case unix_milliseconds:
return found;
}
} else if (match.isTextual()) {
String found = match.asText();
switch (timestampFormat) {
case ISO_8601:
return new DateTime(found).getMillis();
case CUSTOM:
return timestampParser.parse(found).getTime();
}
}
log.warn("No timestamp extracted from json payload: {}", payload);
} catch (Exception ex) {
log.warn("Failed to extract timestamp from json payload: {}", payload, ex);
}
}
return ifNotFound;
}
public enum TimestampFormat {
unix,
unix_seconds,
unix_milliseconds,
ISO_8601,
CUSTOM
}
}