// HourlyPartitionSelector.java

package org.wikimedia.search.glent;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.lpad;
import static org.apache.spark.sql.functions.udf;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

import javax.annotation.Nullable;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

import com.google.common.annotations.VisibleForTesting;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

/**
 * Builds Spark SQL filter conditions that select rows by the hour described
 * in their {@code year}, {@code month}, {@code day} and {@code hour} columns.
 *
 * <p>The requested range is widened to whole hours: the start is floored to
 * the beginning of its hour (inclusive bound) and the end is ceiled to the
 * beginning of the following hour (exclusive bound), unless it already sits
 * exactly on an hour boundary.
 */
class HourlyPartitionSelector {
    /** Parses an ISO-8601 instant string into seconds since the unix epoch. */
    @SuppressFBWarnings(value = "FCBL_FIELD_COULD_BE_LOCAL",
            justification = "Used to make unix_timestamp definition more readable")
    private final UserDefinedFunction unixTimestamp = udf(
            (UDF1<String, Long>) iso -> Instant.parse(iso).getEpochSecond(),
            DataTypes.LongType);

    /**
     * Epoch seconds of the hour a row belongs to. The four columns are
     * assembled into a {@code <year>-MM-ddTHH:00:00Z} string (month, day and
     * hour left-padded with zeros to two characters) and then parsed by
     * {@link #unixTimestamp}.
     */
    @VisibleForTesting
    final Column rowDate = unixTimestamp.apply(
            concat(
                    col("year"),
                    lit("-"),
                    lpad(col("month"), 2, "0"),
                    lit("-"),
                    lpad(col("day"), 2, "0"),
                    lit("T"),
                    lpad(col("hour"), 2, "0"),
                    lit(":00:00Z")));

    /**
     * Creates the condition matching rows whose hour overlaps the given range.
     *
     * @param start beginning of the range; floored to the start of its hour
     * @param end end of the range; when it lies in the far future no upper
     *     bound is generated at all
     * @return the lower-bound condition, AND-ed with the upper-bound
     *     condition when one exists
     */
    Column apply(Instant start, Instant end) {
        Column lowerBound = startCondition(start);
        Column upperBound = endCondition(end);
        return upperBound == null ? lowerBound : lowerBound.and(upperBound);
    }

    /**
     * Lower bound: the row's hour must be at or after the hour containing
     * {@code start}.
     */
    private Column startCondition(Instant start) {
        Instant hourFloor = start.truncatedTo(ChronoUnit.HOURS);
        return rowDate.geq(hourFloor.getEpochSecond());
    }

    /**
     * Upper bound: the row's hour must be strictly before {@code end} rounded
     * up to an hour boundary.
     *
     * @return the condition, or {@code null} when {@code end} is so far in
     *     the future that no upper bound is worth generating
     */
    @Nullable
    private Column endCondition(Instant end) {
        // Skip the bound entirely for far-future ends. Besides being
        // pointless, it keeps the one hour addition below away from values
        // close to Instant.MAX (~2^54 epoch seconds).
        final long farFutureEpochSecond = 1L << 40;
        if (end.getEpochSecond() > farFutureEpochSecond) {
            return null;
        }

        Instant hourFloor = end.truncatedTo(ChronoUnit.HOURS);
        // An end already sitting on an hour boundary is used as is; anything
        // later in the hour is pushed up to the start of the next hour.
        Instant hourCeil = hourFloor.equals(end)
                ? hourFloor
                : hourFloor.plus(Duration.ofHours(1));
        return rowDate.lt(hourCeil.getEpochSecond());
    }
}