kafka

Provides methods to manipulate the offsets of specific consumer groups.

exception spicerack.kafka.KafkaError

Bases: SpicerackError

Custom exception class for errors in this module.

class spicerack.kafka.ConsumerDefinition(site: str, cluster: str, consumer_group: str) -> None

Bases: object

Data needed to identify a Kafka consumer.

Parameters:
  • site (str) -- Kafka site/DC.

  • cluster (str) -- Kafka cluster.

  • consumer_group (str) -- Kafka consumer group.

class spicerack.kafka.Kafka(*, kafka_config: dict[str, dict[str, dict]], dry_run: bool = True)

Bases: object

Kafka module that currently allows transferring consumer group positions both within a cluster and across clusters.

Create a Kafka module instance.

The Kafka config is based on a Puppet-generated config.yaml in the spicerack configs. At minimum, it requires an ssl_string defined for each participating cluster, e.g.:

main:
  eqiad:
    brokers:
      ssl_string: "address:port,address:port"
      ...

Parameters:
  • kafka_config (dict[str, dict[str, dict]]) -- the complete Kafka definition, as available in Puppet.

  • dry_run (bool, default: True) -- enable dry run mode.
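To make the expected shape concrete, the sketch below models the YAML fragment above as the nested dict that kafka_config receives (cluster, then site, then per-site settings) and splits the ssl_string of one cluster and site into broker entries. The helper name bootstrap_servers, the placeholder broker addresses, and the use of ValueError for a missing entry are assumptions for illustration only, not part of spicerack.

```python
# Hypothetical config mirroring the YAML shape above: cluster -> site -> settings.
KAFKA_CONFIG: dict[str, dict[str, dict]] = {
    "main": {
        "eqiad": {
            "brokers": {
                # Placeholder addresses; real values come from the Puppet-generated config.yaml.
                "ssl_string": "kafka1001.example.org:9093,kafka1002.example.org:9093",
            },
        },
    },
}


def bootstrap_servers(kafka_config: dict[str, dict[str, dict]], cluster: str, site: str) -> list[str]:
    """Hypothetical helper: split one cluster/site ssl_string into "address:port" entries."""
    try:
        ssl_string = kafka_config[cluster][site]["brokers"]["ssl_string"]
    except KeyError as exc:
        # ValueError is a stand-in; the real module may report this differently.
        raise ValueError(f"No ssl_string configured for cluster={cluster!r}, site={site!r}") from exc
    return [entry.strip() for entry in ssl_string.split(",") if entry.strip()]


print(bootstrap_servers(KAFKA_CONFIG, "main", "eqiad"))
# ['kafka1001.example.org:9093', 'kafka1002.example.org:9093']
```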

set_consumer_position_by_timestamp(target_consumer: spicerack.kafka.ConsumerDefinition, timestamps: dict[str, int]) -> None

Set approximate offsets for the given topics (provided without the site prefix).

The module seeks to timestamps that are spicerack.kafka.DELTA ms earlier than the provided ones.

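Parameters:
  • target_consumer (spicerack.kafka.ConsumerDefinition) -- definition of the consumer whose position will be set.

  • timestamps (dict[str, int]) -- timestamps in milliseconds, keyed by topic name (without site prefix).

Return type:

None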
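A minimal sketch of the timestamp arithmetic described above, assuming each provided timestamp is applied to every partition of its topic. The function plan_seek_timestamps and the partitions mapping are hypothetical; only the DELTA value comes from this reference.

```python
DELTA = 120_000  # ms; mirrors the documented spicerack.kafka.DELTA value


def plan_seek_timestamps(
    timestamps: dict[str, int],
    partitions: dict[str, set[int]],
    delta: int = DELTA,
) -> dict[tuple[str, int], int]:
    """Hypothetical helper: map every (topic, partition) to a seek timestamp delta ms earlier."""
    plan: dict[tuple[str, int], int] = {}
    for topic, timestamp in timestamps.items():
        # Topics are keyed without the site prefix, as the method expects.
        for partition in sorted(partitions[topic]):
            plan[(topic, partition)] = timestamp - delta
    return plan


plan = plan_seek_timestamps({"mutation": 1_700_000_000_000}, {"mutation": {0, 1}})
print(plan)
# {('mutation', 0): 1699999880000, ('mutation', 1): 1699999880000}
```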

transfer_consumer_position(topics: list[str], source_consumer: spicerack.kafka.ConsumerDefinition, target_consumer: spicerack.kafka.ConsumerDefinition) -> None

Transfer the position from one Kafka consumer group to another.

Within the same cluster, the position is transferred as an offset; across different clusters, the target offset is approximated from the source timestamp, seeking spicerack.kafka.DELTA ms earlier.

All topics for which the transfer will happen are assumed to use site prefixes (e.g. eqiad.mutation).

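Parameters:
  • topics (list[str]) -- topics whose position will be transferred.

  • source_consumer (spicerack.kafka.ConsumerDefinition) -- definition of the consumer to read the position from.

  • target_consumer (spicerack.kafka.ConsumerDefinition) -- definition of the consumer to apply the position to.

Return type:

None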
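The branching described above can be sketched for a single partition with in-memory stand-ins. Everything here is hypothetical: Consumer and FakePartitionClient only mimic the documented ConsumerDefinition fields and KafkaClient method names, "same cluster" is assumed to mean identical site and cluster, and falling back to the log end when no message is recent enough is a choice of the sketch.

```python
from dataclasses import dataclass

DELTA = 120_000  # ms; mirrors the documented spicerack.kafka.DELTA value


@dataclass(frozen=True)
class Consumer:
    """Hypothetical stand-in for ConsumerDefinition."""

    site: str
    cluster: str
    consumer_group: str


class FakePartitionClient:
    """Hypothetical in-memory stand-in for KafkaClient, limited to one partition.

    log[i] is the timestamp (ms) of the message at offset i.
    """

    def __init__(self, log: list[int], committed: int = 0) -> None:
        self.log = log
        self.committed = committed

    def get_committed_offset(self) -> int:
        return self.committed

    def get_next_timestamp(self) -> int:
        # The committed offset points at the next message to be processed.
        return self.log[self.committed]

    def find_offset_for_timestamp(self, timestamp: int) -> int:
        # Earliest offset whose timestamp is >= the requested one (log end if none).
        return next((i for i, ts in enumerate(self.log) if ts >= timestamp), len(self.log))

    def seek_offset(self, offset: int) -> None:
        self.committed = offset


def transfer_position(
    source_def: Consumer,
    target_def: Consumer,
    source: FakePartitionClient,
    target: FakePartitionClient,
) -> None:
    """Copy the offset within one cluster; approximate it by timestamp across clusters."""
    if (source_def.site, source_def.cluster) == (target_def.site, target_def.cluster):
        target.seek_offset(source.get_committed_offset())
    else:
        seek_timestamp = source.get_next_timestamp() - DELTA
        target.seek_offset(target.find_offset_for_timestamp(seek_timestamp))


src = FakePartitionClient([0, 60_000, 120_000, 180_000, 240_000], committed=4)
dst = FakePartitionClient([100_000, 130_000, 200_000, 250_000])
transfer_position(Consumer("eqiad", "main", "a"), Consumer("codfw", "main", "b"), src, dst)
print(dst.committed)  # 1
```

In the cross-cluster run, the next source message has timestamp 240000, so the target seeks to the first message at or after 120000, which sits at offset 1.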

class spicerack.kafka.KafkaClient(consumer_definition: spicerack.kafka.ConsumerDefinition, kafka_config: dict, dry_run: bool) -> None

Bases: object

Class encapsulating Kafka operations for a specific site, cluster, and consumer group.

Sets up a KafkaConsumer.

Parameters:
  • consumer_definition (spicerack.kafka.ConsumerDefinition) -- definition of the Kafka data for the consumer.

  • kafka_config (dict) -- the complete Kafka definition, as available in Puppet.

  • dry_run (bool) -- enable dry run mode.

find_offset_for_timestamp(topic_partition: kafka.structs.TopicPartition, timestamp: int) -> int

Find the offset by approximating it from the provided timestamp.

Parameters:
  • topic_partition (kafka.structs.TopicPartition) -- non-localized topic partition.

  • timestamp (int) -- timestamp for offset approximation.

Return type:

int
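Kafka's own timestamp lookup (for example, KafkaConsumer.offsets_for_times in kafka-python) returns the earliest offset whose timestamp is greater than or equal to the requested one. The sketch below emulates that rule over an in-memory list; it assumes timestamps never decrease along the log (Kafka does not guarantee this for producer-set timestamps), and it does not claim to be how this method is implemented.

```python
from bisect import bisect_left
from typing import Optional


def find_offset_for_timestamp(
    log_timestamps: list[int], timestamp: int, base_offset: int = 0
) -> Optional[int]:
    """Return the earliest offset whose message timestamp is >= timestamp, or None.

    Assumes log_timestamps[i] is the timestamp (ms) of the message at offset
    base_offset + i and that timestamps never decrease, so binary search is valid.
    """
    index = bisect_left(log_timestamps, timestamp)
    return base_offset + index if index < len(log_timestamps) else None


log = [1_000, 2_000, 2_000, 3_500, 5_000]
print(find_offset_for_timestamp(log, 2_000))  # 1
print(find_offset_for_timestamp(log, 3_000))  # 3
print(find_offset_for_timestamp(log, 9_000))  # None
```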

get_committed_offset(topic_partition: kafka.structs.TopicPartition) -> int

Retrieve the last committed offset for the given TopicPartition.

Parameters:

topic_partition (kafka.structs.TopicPartition) -- non-localized topic partition.

Return type:

int

get_next_timestamp(topic_partition: kafka.structs.TopicPartition) -> int

Retrieve the timestamp of the message about to be processed for the given TopicPartition.

Parameters:

topic_partition (kafka.structs.TopicPartition) -- non-localized topic partition.

Return type:

int

partitions_for_topic(topic_name: str) -> set[int]

Get the partitions of the provided topic, after localizing it with the site prefix.

Parameters:

topic_name (str) -- topic name without site prefix.

Return type:

set[int]
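The topic example in this reference (eqiad.mutation) suggests that localizing a topic means prefixing it with the site and a dot. Assuming that format, the sketch below builds localized partition identifiers. TopicPartition here is a local NamedTuple with the same two fields (topic, partition) as kafka.structs.TopicPartition, so the example runs without kafka-python; localize and localized_partitions are hypothetical helpers.

```python
from typing import NamedTuple


class TopicPartition(NamedTuple):
    """Local stand-in with the same fields as kafka.structs.TopicPartition."""

    topic: str
    partition: int


def localize(site: str, topic: str) -> str:
    """Hypothetical: add the site prefix, e.g. ("eqiad", "mutation") -> "eqiad.mutation"."""
    return f"{site}.{topic}"


def localized_partitions(site: str, topic: str, partitions: set[int]) -> list[TopicPartition]:
    """Hypothetical: one localized TopicPartition per partition id, in ascending order."""
    name = localize(site, topic)
    return [TopicPartition(name, partition) for partition in sorted(partitions)]


print(localized_partitions("eqiad", "mutation", {1, 0}))
# [TopicPartition(topic='eqiad.mutation', partition=0), TopicPartition(topic='eqiad.mutation', partition=1)]
```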

seek_offset(topic_partition: kafka.structs.TopicPartition, offset: int) -> None

Seek the provided partition to a specific offset for the configured consumer group.

Parameters:
  • topic_partition (kafka.structs.TopicPartition) -- non-localized topic partition.

  • offset (int) -- desired offset.

Return type:

None
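This reference only states that dry_run enables dry run mode. Assuming that it means write operations such as seeks are logged and skipped, a typical guard looks like the hypothetical sketch below; SeekerSketch and its in-memory committed dict are illustrative stand-ins, not spicerack code.

```python
import logging

logger = logging.getLogger(__name__)


class SeekerSketch:
    """Hypothetical illustration of a dry-run guard around an offset seek."""

    def __init__(self, consumer_group: str, dry_run: bool = True) -> None:
        self.consumer_group = consumer_group
        self.dry_run = dry_run
        # Stands in for the offsets the broker would store for the consumer group.
        self.committed: dict[tuple[str, int], int] = {}

    def seek_offset(self, topic: str, partition: int, offset: int) -> None:
        if self.dry_run:
            # Assumed dry-run behavior: report the intended change, do not apply it.
            logger.info(
                "Skipping seek of %s[%d] to %d for %s (dry run)",
                topic, partition, offset, self.consumer_group,
            )
            return
        self.committed[(topic, partition)] = offset
```

Defaulting dry_run to True, as the Kafka class signature does, makes the safe behavior the one you get when the flag is forgotten.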

spicerack.kafka.DELTA: int = 120000

For offset approximation, the timestamp used is earlier than the requested one by this many milliseconds.

spicerack.kafka.TIMEOUT_MS: int = 20000

Timeout, in milliseconds, for all Kafka operations used in this module.
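For a quick check of the units, the two documented constants convert to human-readable durations with the standard library:

```python
from datetime import timedelta

DELTA = 120000       # value documented for spicerack.kafka.DELTA (ms)
TIMEOUT_MS = 20000   # value documented for spicerack.kafka.TIMEOUT_MS (ms)

print(timedelta(milliseconds=DELTA))       # 0:02:00
print(timedelta(milliseconds=TIMEOUT_MS))  # 0:00:20
```

So approximated seeks land two minutes before the requested timestamp, and each Kafka operation is given up to twenty seconds.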