ConsumerApplication.java
package org.wikimedia.discovery.cirrus.updater.consumer;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.wikimedia.discovery.cirrus.updater.common.config.ParameterToolMerger;
import org.wikimedia.discovery.cirrus.updater.consumer.config.ConsumerConfig;
import org.wikimedia.discovery.cirrus.updater.consumer.graph.ConsumerGraphFactory;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.experimental.UtilityClass;
@UtilityClass
@SuppressWarnings("HideUtilityClassConstructor")
@SuppressFBWarnings(
value = "HideUtilityClassConstructor",
justification = "lombok takes care of that")
public class ConsumerApplication {
public static final String JOB_NAME = "cirrus-streaming-updater-consumer";
public static void main(String[] args) throws Exception {
final ParameterTool params =
ParameterToolMerger.fromDefaultsWithOverrides(
"/cirrus-streaming-updater-consumer.properties", args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
new ConsumerGraphFactory(env, ConsumerConfig.of(params)).createStreamGraph();
env.execute(JOB_NAME);
}
}