HourlyPartitionSelector.java
package org.wikimedia.search.glent;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.lpad;
import static org.apache.spark.sql.functions.udf;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import javax.annotation.Nullable;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
/**
 * Builds Spark filter conditions that select rows from tables partitioned
 * by {@code year}, {@code month}, {@code day} and {@code hour} columns,
 * restricted to a time window expressed as {@link Instant}s.
 *
 * <p>The selected window is half-open: it starts at the beginning of the
 * hour containing {@code start} and ends (exclusive) at the beginning of
 * the hour following {@code end}, unless {@code end} already falls exactly
 * on an hour boundary.
 */
class HourlyPartitionSelector {
    /**
     * End bounds whose epoch second exceeds this value are treated as
     * unbounded and produce no upper-bound condition. 2^40 seconds is far
     * beyond any realistic partition date, yet well below Instant.MAX
     * (~2^54 seconds), so adding an hour to any bound at or below it cannot
     * overflow.
     */
    private static final long FAR_FUTURE_EPOCH_SECONDS = 1L << 40;

    /**
     * Parses an ISO-8601 instant string such as "2019-01-02T03:00:00Z" into
     * epoch seconds. A null input (which concat produces when any partition
     * column is null) yields null instead of throwing, so such rows compare
     * as SQL NULL and are excluded by the filter rather than failing the job.
     */
    @SuppressFBWarnings(value = "FCBL_FIELD_COULD_BE_LOCAL",
            justification = "Used to make unix_timestamp definition more readable")
    private final UserDefinedFunction unixTimestamp = udf((UDF1<String, Long>) text ->
            text == null ? null : Instant.parse(text).getEpochSecond(), DataTypes.LongType);

    /**
     * Epoch seconds of the start of the hour a row's partition represents,
     * assembled as "YYYY-MM-DDTHH:00:00Z" from the partition columns with
     * month, day and hour zero-padded to two digits.
     */
    @VisibleForTesting
    final Column rowDate = unixTimestamp.apply(concat(
            col("year"), lit("-"),
            lpad(col("month"), 2, "0"), lit("-"),
            lpad(col("day"), 2, "0"), lit("T"),
            lpad(col("hour"), 2, "0"), lit(":00:00Z")));

    /**
     * Builds a condition selecting the hourly partitions that overlap the
     * interval from {@code start} to {@code end}.
     *
     * @param start beginning of the window; floored to its hour
     * @param end   end of the window; ceiled to the next hour boundary. If
     *              it lies in the far future, no upper bound is applied.
     * @return a boolean Column usable in a Spark filter
     */
    Column apply(Instant start, Instant end) {
        Column startCond = startCondition(start);
        Column endCond = endCondition(end);
        if (endCond == null) {
            return startCond;
        } else {
            return startCond.and(endCond);
        }
    }

    /**
     * Lower-bound condition: rows whose hour is at or after the hour
     * containing {@code start}.
     */
    private Column startCondition(Instant start) {
        // Floor to beginning of starting hour
        long startTs = start.truncatedTo(ChronoUnit.HOURS).getEpochSecond();
        return rowDate.geq(startTs);
    }

    /**
     * Upper-bound (exclusive) condition, or null when {@code end} is far
     * enough in the future that no bound is needed.
     */
    @Nullable
    private Column endCondition(Instant end) {
        // If the end date is in the far future don't bother generating
        // a condition. This also avoids arithmetic close to Instant.MAX below.
        if (end.getEpochSecond() > FAR_FUTURE_EPOCH_SECONDS) {
            return null;
        }
        // Ceil to beginning of next hour
        Instant truncatedEnd = end.truncatedTo(ChronoUnit.HOURS);
        long endTs;
        if (truncatedEnd.equals(end)) {
            // If the provided date is already truncated to the hour we don't
            // need to push up to next hour to include it
            endTs = truncatedEnd.getEpochSecond();
        } else {
            endTs = truncatedEnd.plus(Duration.ofHours(1)).getEpochSecond();
        }
        return rowDate.lt(endTs);
    }
}