kafka

Provides methods to manipulate the offsets of specific consumer groups.

exception spicerack.kafka.KafkaError

Bases: spicerack.exceptions.SpicerackError

Custom exception class for errors in this module.

class spicerack.kafka.ConsumerDefinition(site: str, cluster: str, consumer_group: str)

Bases: object

Data needed to identify a Kafka consumer.

Parameters
  • site (str) -- Kafka site/DC.

  • cluster (str) -- Kafka cluster.

  • consumer_group (str) -- Kafka consumer group.
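For illustration, here is a minimal stand-in with the same three fields. It assumes that ConsumerDefinition behaves like a simple immutable record (a frozen dataclass here). This reference does not say how spicerack actually defines it. The site values are examples, and `example-group` is a hypothetical consumer group name.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ConsumerDefinition:
    """Hypothetical stand-in for spicerack.kafka.ConsumerDefinition (assumed plain record)."""

    site: str  # Kafka site/DC, e.g. "eqiad"
    cluster: str  # Kafka cluster, e.g. "main"
    consumer_group: str  # Kafka consumer group


# "example-group" is a hypothetical consumer group name.
source = ConsumerDefinition(site="eqiad", cluster="main", consumer_group="example-group")
target = ConsumerDefinition(site="codfw", cluster="main", consumer_group="example-group")
print(source.site, target.site)  # eqiad codfw
```

Because the record is frozen, two definitions with the same site, cluster and group compare equal and can be used as dictionary keys.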

class spicerack.kafka.Kafka(*, kafka_config: Dict[str, Dict[str, Dict]], dry_run: bool = True)

Bases: object

Kafka module that currently supports transferring consumer group positions both within a single cluster and across clusters.

Create a Kafka module instance.

The Kafka config is based on a Puppet-generated config.yaml in the spicerack configs. At minimum, it requires an ssl_string to be defined for each participating cluster, e.g.:

main:
  eqiad:
    brokers:
      ssl_string: "address:port,address:port"
      ...

Parameters
  • kafka_config (dict) -- The complete Kafka definition, as available in Puppet.

  • dry_run (bool, optional) -- Enable dry run mode.
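Below is a minimal sketch of reading that configuration. It assumes the nesting shown in the YAML example (cluster, then site, then brokers.ssl_string). It also assumes that ssl_string is a comma-separated list of address:port entries. The helper `ssl_bootstrap_servers` is hypothetical, and the broker hostnames are placeholders.

```python
from typing import Dict, List


def ssl_bootstrap_servers(kafka_config: Dict[str, Dict[str, Dict]], cluster: str, site: str) -> List[str]:
    """Hypothetical helper: return the SSL broker addresses for a cluster/site pair.

    Assumes the layout cluster -> site -> brokers -> ssl_string, where
    ssl_string is a comma-separated list of address:port entries.
    """
    ssl_string = kafka_config[cluster][site]["brokers"]["ssl_string"]
    # Split on commas and drop surrounding whitespace and empty entries.
    return [broker.strip() for broker in ssl_string.split(",") if broker.strip()]


# Illustrative config mirroring the YAML example; hostnames are placeholders.
config = {
    "main": {
        "eqiad": {"brokers": {"ssl_string": "broker1.example.org:9093,broker2.example.org:9093"}},
    },
}
print(ssl_bootstrap_servers(config, "main", "eqiad"))  # ['broker1.example.org:9093', 'broker2.example.org:9093']
```

A missing cluster or site surfaces as a KeyError in this sketch. How spicerack reports that case is not documented here.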

set_consumer_position_by_timestamp(target_consumer: spicerack.kafka.ConsumerDefinition, timestamps: Dict[str, int]) -> None

Set approximate offsets for the given topics (provided without a site prefix).

The offsets are looked up using timestamps that are spicerack.kafka.DELTA ms earlier than the provided ones.
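The following sketch shows how the timestamps mapping could drive per-partition seeks. It assumes a client object exposing the KafkaClient methods documented in this module: partitions_for_topic, find_offset_for_timestamp and seek_offset. It also assumes that DELTA is subtracted before the offset lookup, presumably so that some messages are reprocessed rather than skipped. `set_position_by_timestamp` is a hypothetical name, not spicerack's implementation.

```python
from collections import namedtuple
from typing import Dict

# Mirrors kafka.structs.TopicPartition, a (topic, partition) named tuple.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])

DELTA = 120000  # ms, same value as spicerack.kafka.DELTA


def set_position_by_timestamp(client, timestamps: Dict[str, int]) -> None:
    """Hypothetical sketch: seek each partition to about DELTA ms before its topic's timestamp.

    `client` is any object with partitions_for_topic(), find_offset_for_timestamp()
    and seek_offset(), like KafkaClient; topic names carry no site prefix.
    """
    for topic, timestamp_ms in timestamps.items():
        seek_time = timestamp_ms - DELTA  # look up DELTA ms earlier than requested
        for partition in sorted(client.partitions_for_topic(topic)):
            topic_partition = TopicPartition(topic, partition)
            offset = client.find_offset_for_timestamp(topic_partition, seek_time)
            client.seek_offset(topic_partition, offset)
```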

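Parameters
  • target_consumer (spicerack.kafka.ConsumerDefinition) -- Consumer group whose position is set.

  • timestamps (dict) -- Mapping of topic names (without site prefix) to timestamps, in milliseconds.
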
transfer_consumer_position(topics: List[str], source_consumer: spicerack.kafka.ConsumerDefinition, target_consumer: spicerack.kafka.ConsumerDefinition) -> None

Transfer the position of one Kafka consumer group to another.

Within the same cluster, the position is transferred as an exact offset. Across different clusters, it is approximated from the source timestamp, seeking spicerack.kafka.DELTA ms earlier.

All topics for which the transfer will happen are assumed to use site prefixes (e.g. eqiad.mutation).
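Here is a sketch of the decision described above, under several assumptions:

- Two consumers count as being on the same cluster when both site and cluster match.
- Partitions are enumerated on the source side.
- The source_client and target_client objects expose the KafkaClient methods documented in this module and handle site-prefix localization themselves.

`transfer_position` is a hypothetical name, not spicerack's code.

```python
from collections import namedtuple
from typing import List

TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
ConsumerDefinition = namedtuple("ConsumerDefinition", ["site", "cluster", "consumer_group"])

DELTA = 120000  # ms, same value as spicerack.kafka.DELTA


def transfer_position(topics: List[str], source: ConsumerDefinition, target: ConsumerDefinition,
                      source_client, target_client) -> None:
    """Hypothetical sketch: copy consumer positions from source to target, per partition."""
    # Assumption: "same cluster" means the same site and the same cluster name.
    same_cluster = (source.site, source.cluster) == (target.site, target.cluster)
    for topic in topics:
        for partition in sorted(source_client.partitions_for_topic(topic)):
            topic_partition = TopicPartition(topic, partition)
            if same_cluster:
                # Offsets are comparable within one cluster: copy them as-is.
                offset = source_client.get_committed_offset(topic_partition)
            else:
                # Offsets differ between clusters: approximate via the message timestamp.
                timestamp_ms = source_client.get_next_timestamp(topic_partition)
                offset = target_client.find_offset_for_timestamp(topic_partition, timestamp_ms - DELTA)
            target_client.seek_offset(topic_partition, offset)
```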

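Parameters
  • topics (list) -- Topics whose consumer positions are transferred.

  • source_consumer (spicerack.kafka.ConsumerDefinition) -- Consumer group to read the position from.

  • target_consumer (spicerack.kafka.ConsumerDefinition) -- Consumer group to apply the position to.
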
class spicerack.kafka.KafkaClient(consumer_definition: spicerack.kafka.ConsumerDefinition, kafka_config: Dict, dry_run: bool)

Bases: object

Class encapsulating Kafka operations for a specific site, cluster and consumer group.

Sets up a KafkaConsumer.

Parameters
  • consumer_definition (spicerack.kafka.ConsumerDefinition) -- Definition of the Kafka data for the consumer.

  • kafka_config (dict) -- The complete Kafka definition, as available in Puppet.

  • dry_run (bool) -- Enable dry run mode.
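The sketch below shows the kind of settings such a consumer needs. It uses parameter names from kafka-python's KafkaConsumer: bootstrap_servers, group_id, security_protocol and enable_auto_commit. This reference does not document which options spicerack actually passes. Disabling auto-commit is an assumption, and a real SSL setup may need more options (for example ssl_cafile). `consumer_kwargs` is a hypothetical helper.

```python
from typing import Any, Dict, List


def consumer_kwargs(bootstrap_servers: List[str], consumer_group: str) -> Dict[str, Any]:
    """Hypothetical helper: keyword arguments for kafka.KafkaConsumer (kafka-python)."""
    return {
        "bootstrap_servers": bootstrap_servers,  # e.g. parsed from ssl_string
        "group_id": consumer_group,  # offsets are read and committed for this group
        "security_protocol": "SSL",  # assumption: ssl_string lists SSL listeners
        "enable_auto_commit": False,  # assumption: commit only on explicit seeks
    }


kwargs = consumer_kwargs(["broker1.example.org:9093"], "example-group")
# With kafka-python installed and brokers reachable:
#   from kafka import KafkaConsumer
#   consumer = KafkaConsumer(**kwargs)
```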

find_offset_for_timestamp(topic_partition: kafka.structs.TopicPartition, timestamp: int) -> int

Find an approximate offset for the provided timestamp.

Parameters
  • topic_partition (kafka.structs.TopicPartition) -- Non-localized topic partition.

  • timestamp (int) -- Timestamp, in milliseconds, used to approximate the offset.

Returns

Approximate offset.

Return type

int
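One way to do this with kafka-python is KafkaConsumer.offsets_for_times(). For each partition, it returns the earliest offset whose timestamp is at least the one given, or None if no such message exists. The sketch below assumes that API and passes the partition through unchanged. The documented method takes a non-localized partition, so spicerack presumably adds the site prefix first. Raising LookupError when nothing is found is also an assumption, not documented behavior.

```python
from collections import namedtuple

TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def find_offset_for_timestamp(consumer, topic_partition: TopicPartition, timestamp: int) -> int:
    """Hypothetical sketch: earliest offset whose message timestamp is >= timestamp (ms).

    `consumer` is expected to behave like kafka-python's KafkaConsumer.
    """
    results = consumer.offsets_for_times({topic_partition: timestamp})
    found = results.get(topic_partition)
    if found is None:
        # kafka-python returns None when no message at or after `timestamp` exists.
        raise LookupError(f"no offset at or after {timestamp} for {topic_partition}")
    return found.offset
```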

get_committed_offset(topic_partition: kafka.structs.TopicPartition) -> int

Retrieve the committed offset for the given TopicPartition.

Parameters

topic_partition (kafka.structs.TopicPartition) -- Non-localized topic partition.

Returns

Last committed offset.

Return type

int

get_next_timestamp(topic_partition: kafka.structs.TopicPartition) -> int

Retrieve the timestamp of the next message to be processed for the given TopicPartition.

Parameters

topic_partition (kafka.structs.TopicPartition) -- Non-localized topic partition.

Returns

Timestamp of the message that is about to be processed next.

Return type

int
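This sketch shows how the timestamp at the committed position can be read with kafka-python. It takes the group's committed offset with KafkaConsumer.committed(), seeks there, and polls a single record to read its timestamp. Three choices here are assumptions about how spicerack does it: combining the committed-offset lookup with the read, using TIMEOUT_MS as the poll timeout, and raising LookupError when there is nothing to read.

```python
from collections import namedtuple

TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
TIMEOUT_MS = 20000  # same value as spicerack.kafka.TIMEOUT_MS


def get_next_timestamp(consumer, topic_partition: TopicPartition) -> int:
    """Hypothetical sketch: timestamp (ms) of the record at the group's committed offset.

    `consumer` is expected to behave like kafka-python's KafkaConsumer.
    """
    committed = consumer.committed(topic_partition)
    if committed is None:
        raise LookupError(f"no committed offset for {topic_partition}")
    consumer.assign([topic_partition])
    consumer.seek(topic_partition, committed)  # position on the next record to process
    records = consumer.poll(timeout_ms=TIMEOUT_MS, max_records=1)
    batch = records.get(topic_partition, [])
    if not batch:
        raise LookupError(f"no record at offset {committed} for {topic_partition}")
    return batch[0].timestamp
```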

partitions_for_topic(topic_name: str) -> Set[int]

Get the partitions of the provided topic, after localizing it with the site prefix.

Parameters

topic_name (str) -- Topic name without site prefix.

Returns

Set of partitions available for the given topic.

Return type

set[int]
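Here is a sketch of the localization step. It assumes the `<site>.<topic>` naming used elsewhere in this module (e.g. eqiad.mutation) and kafka-python's KafkaConsumer.partitions_for_topic(). The explicit `site` argument and the LookupError are illustrative. In spicerack the site presumably comes from the client's ConsumerDefinition.

```python
from typing import Set


def partitions_for_topic(consumer, site: str, topic_name: str) -> Set[int]:
    """Hypothetical sketch: partition ids of `topic_name` after adding the site prefix.

    `consumer` is expected to behave like kafka-python's KafkaConsumer.
    """
    localized = f"{site}.{topic_name}"  # e.g. "mutation" -> "eqiad.mutation"
    partitions = consumer.partitions_for_topic(localized)
    if partitions is None:
        # kafka-python may return None when the topic's metadata is unknown.
        raise LookupError(f"unknown topic {localized}")
    return set(partitions)
```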

seek_offset(topic_partition: kafka.structs.TopicPartition, offset: int) -> None

Seek the configured consumer group to a specific offset on the provided partition.

Parameters
  • topic_partition (kafka.structs.TopicPartition) -- Non-localized topic partition.

  • offset (int) -- Desired offset.
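This sketch shows one way to move a group's position with kafka-python. It assigns the partition, calls seek() with the offset, and then calls commit(), which without arguments commits the consumer's current position. The dry-run branch only logs. Both the call sequence and the dry-run handling are assumptions, not spicerack's documented behavior.

```python
import logging
from collections import namedtuple

TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
logger = logging.getLogger(__name__)


def seek_offset(consumer, topic_partition: TopicPartition, offset: int, dry_run: bool) -> None:
    """Hypothetical sketch: set the group's committed position on one partition to `offset`.

    `consumer` is expected to behave like kafka-python's KafkaConsumer,
    configured with the target group_id.
    """
    if dry_run:
        logger.info("Dry run: would set %s to offset %d", topic_partition, offset)
        return
    consumer.assign([topic_partition])
    consumer.seek(topic_partition, offset)
    consumer.commit()  # with no arguments, commits the current position, i.e. `offset`
```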

spicerack.kafka.DELTA = 120000

For offset approximation, the timestamp used is earlier than the requested one by this many milliseconds (120000 ms, i.e. 2 minutes).

Type

int

spicerack.kafka.TIMEOUT_MS = 20000

Timeout, in milliseconds, for all Kafka operations used in this module.

Type

int