kafka¶
Provides methods to manipulate offsets set for specific consumer groups.
- exception spicerack.kafka.KafkaError[source]¶
Bases:
spicerack.exceptions.SpicerackError
Custom exception class for errors in this module.
- class spicerack.kafka.ConsumerDefinition(site: str, cluster: str, consumer_group: str)[source]¶
Bases:
object
Data needed to identify a Kafka Consumer.
- class spicerack.kafka.Kafka(*, kafka_config: Dict[str, Dict[str, Dict]], dry_run: bool = True)[source]¶
Bases:
object
Kafka module, that currently allows for inter and cross cluster consumer group position transfer.
Create Kafka module instance.
Kafka config is based on a Puppet generated config.yaml in spicerack configs. At minimum, it requires a ssl_string defined for each participating cluster, e.g.:
main: eqiad: brokers: ssl_string: "address:port,address:port" ...
- Parameters
- set_consumer_position_by_timestamp(target_consumer: spicerack.kafka.ConsumerDefinition, timestamps: Dict[str, int]) None [source]¶
Set an approximated offsets for given topics (provided without site prefix).
Module uses timestamps earlier by
spicerack.kafka.DELTA
ms.- Parameters
target_consumer (spicerack.kafka.ConsumerDefinition) -- Consumer definition for the target consumer group.
timestamps (dict[str, int) -- List of topics with timestamps to use.
- transfer_consumer_position(topics: List[str], source_consumer: spicerack.kafka.ConsumerDefinition, target_consumer: spicerack.kafka.ConsumerDefinition) None [source]¶
Transfers position from one Kafka consumer group to another.
Same cluster position is an offset transfer, different cluster will involve approximation based on the source timestamp (with
spicerack.kafka.DELTA
ms earlier seek time).All topics for which the transfer will happen are assumed to use site prefixes (e.g. eqiad.mutation).
- Parameters
topics (list[str]) -- List of topics to transfer from and to, without site prefixes.
source_consumer (spicerack.kafka.ConsumerDefinition) -- Consumer definition for the source consumer group.
target_consumer (spicerack.kafka.ConsumerDefinition) -- Consumer definition for the target consumer group.
- class spicerack.kafka.KafkaClient(consumer_definition: spicerack.kafka.ConsumerDefinition, kafka_config: Dict, dry_run: bool)[source]¶
Bases:
object
Class encapsulating Kafka operations for specific site, cluster and consumer group.
Sets up a KafkaConsumer.
- Parameters
consumer_definition (spicerack.kafka.ConsumerDefinition) -- Definition of the Kafka data for the consumer.
kafka_config (dict) -- Complete, available in Puppet, kafka definition.
dry_run (bool) -- Enable dry run mode.
- find_offset_for_timestamp(topic_partition: kafka.structs.TopicPartition, timestamp: int) int [source]¶
Find offset by approximating it with the provided timestamp.
- get_committed_offset(topic_partition: kafka.structs.TopicPartition) int [source]¶
Retrieve a committed offset for given TopicPartition.
- Parameters
topic_partition (kafka.structs.TopicPartition) -- Non-localized topic partition.
- Returns
Last committed offset.
- Return type
- get_next_timestamp(topic_partition: kafka.structs.TopicPartition) int [source]¶
Retrieve a timestamp for given TopicPartition.
- Parameters
topic_partition (kafka.structs.TopicPartition) -- Non-localized topic partition.
- Returns
Currently about to be processed timestamp.
- Return type
- spicerack.kafka.DELTA = 120000¶
For offset approximation, timestamp that will be used will be earlier by this amount of ms.
- Type