ReconciliationSender.scala

package org.wikidata.query.rdf.updater.reconcile

import java.io.IOException
import java.net.URI
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.{ByteArrayEntity, ContentType}
import org.apache.http.StatusLine
import org.apache.http.client.HttpClient
import org.apache.http.util.EntityUtils
import org.wikidata.query.rdf.tool.MapperUtils
import org.wikidata.query.rdf.tool.change.events.ReconcileEvent

import java.time.Instant

class ReconciliationSender(httpClient: HttpClient,
                           eventServiceUri: URI,
                           batchSize: Int = 100,
                           jsonObjectMapper: ObjectMapper = MapperUtils.getObjectMapper,
                           retries: Int = 3,
                           retryWaitMs: Long = 500,
                           clock: () => Instant = () => Instant.now()) {

  def send(events: Iterable[ReconcileEvent]): Unit = {
    events.grouped(batchSize).foreach(batch => withRetry()(() => sendBatch(batch, httpClient)))
  }

  @tailrec
  private def withRetry(nretry: Int = retries)(func: () => Unit): Unit = {
    Try {
      func()
    } match {
      case Success(_: Any) => // success
      case Failure(_: IOException) if nretry > 0 => Thread.sleep(retryWaitMs); withRetry(nretry - 1)(func)
      case Failure(e) => throw new IOException("Failed to send events: " + e.getMessage, e)
    }
  }

  private def sendBatch(events: Iterable[ReconcileEvent], httpClient: HttpClient): Unit = {
    val post = new HttpPost(eventServiceUri)
    // force a "fresh" event-time, we don't want these events to be considered late or they might be collected again...
    post.setEntity(new ByteArrayEntity(jsonObjectMapper.writeValueAsBytes(events.map(e => e.overrideEventTime(clock())).toArray), ContentType.APPLICATION_JSON))
    val response = httpClient.execute(post)
    EntityUtils.consume(response.getEntity)
    if (!validResponse(response.getStatusLine)) {
      throw new IOException("Received unexpected error " + response.getStatusLine + " from " + eventServiceUri)
    }
  }

  def validResponse(statusLine: StatusLine): Boolean = Seq(201, 202).contains(statusLine.getStatusCode)
}