kafka
Provides methods to manipulate offsets set for specific consumer groups.
- exception spicerack.kafka.KafkaError
Bases: SpicerackError
Custom exception class for errors in this module.
- class spicerack.kafka.ConsumerDefinition(site: str, cluster: str, consumer_group: str) → None
Bases: object
Data needed to identify a Kafka Consumer.
- class spicerack.kafka.Kafka(*, kafka_config: dict[str, dict[str, dict]], dry_run: bool = True)
Bases: object
Kafka module that currently supports intra-cluster and cross-cluster transfer of consumer group positions.
Create a Kafka module instance.
The Kafka configuration is based on the Puppet-generated config.yaml among the spicerack configuration files. At a minimum, it requires an ssl_string defined for each participating cluster, e.g.:
    main:
      eqiad:
        brokers:
          ssl_string: "address:port,address:port"
    ...
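For illustration only, the same nesting (cluster, then site, then settings) written as a Python dict might look like the sketch below. The broker addresses are placeholders, and `ssl_bootstrap_servers` is a hypothetical helper, not part of spicerack; it only shows how a per-cluster, per-site ssl_string could be turned into a list of brokers.

```python
# Hypothetical kafka_config shaped like the YAML above: cluster -> site -> settings.
# Broker addresses are placeholders, not real hosts.
kafka_config: dict[str, dict[str, dict]] = {
    "main": {
        "eqiad": {"brokers": {"ssl_string": "broker1.eqiad.example:9093,broker2.eqiad.example:9093"}},
        "codfw": {"brokers": {"ssl_string": "broker1.codfw.example:9093,broker2.codfw.example:9093"}},
    },
}


def ssl_bootstrap_servers(config: dict[str, dict[str, dict]], cluster: str, site: str) -> list[str]:
    """Hypothetical helper: split one cluster/site ssl_string into broker addresses."""
    try:
        ssl_string = config[cluster][site]["brokers"]["ssl_string"]
    except KeyError as exc:
        # Every participating cluster needs an ssl_string, so fail loudly if it is absent.
        raise ValueError(f"no ssl_string configured for cluster={cluster!r} site={site!r}") from exc
    return [broker.strip() for broker in ssl_string.split(",") if broker.strip()]


print(ssl_bootstrap_servers(kafka_config, "main", "codfw"))
# ['broker1.codfw.example:9093', 'broker2.codfw.example:9093']
```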
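- Parameters:
kafka_config (dict[str, dict[str, dict]]) -- the Kafka configuration, as generated by Puppet.
dry_run (bool) -- whether to enable dry-run mode (defaults to True).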
- set_consumer_position_by_timestamp(target_consumer: spicerack.kafka.ConsumerDefinition, timestamps: dict[str, int]) → None
Set approximate offsets for the given topics (provided without site prefixes).
The module seeks using timestamps that are spicerack.kafka.DELTA ms earlier than the ones provided.
- Parameters:
target_consumer (spicerack.kafka.ConsumerDefinition) -- consumer definition for the target consumer group.
timestamps (dict[str, int]) -- mapping of topic names (without site prefixes) to the timestamps, in milliseconds, to use.
- Return type:
None
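A minimal sketch of the timestamp adjustment described above. It assumes timestamps are Unix epoch milliseconds (as Kafka uses) and that topics are localized as "<site>.<topic>" with a caller-supplied site. `DELTA_MS` is a made-up value because the value of `spicerack.kafka.DELTA` is not stated here, `seek_timestamps` is hypothetical, and clamping at zero is this sketch's own choice.

```python
DELTA_MS = 30_000  # hypothetical; stands in for spicerack.kafka.DELTA, whose value is not given here


def seek_timestamps(timestamps: dict[str, int], site: str, delta_ms: int = DELTA_MS) -> dict[str, int]:
    """Map non-localized topics to localized topics and earlier seek timestamps.

    Assumes epoch-millisecond timestamps and "<site>.<topic>" localization.
    """
    result = {}
    for topic, timestamp in timestamps.items():
        # Seek delta_ms earlier, but never to a negative (pre-epoch) timestamp.
        result[f"{site}.{topic}"] = max(timestamp - delta_ms, 0)
    return result


print(seek_timestamps({"mutation": 1_700_000_000_000}, site="eqiad"))
# {'eqiad.mutation': 1699999970000}
```

Seeking slightly earlier than requested means a consumer may reprocess a few messages rather than skip any, which is the usual trade-off when the exact offset is only approximated.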
- transfer_consumer_position(topics: list[str], source_consumer: spicerack.kafka.ConsumerDefinition, target_consumer: spicerack.kafka.ConsumerDefinition) → None
Transfers position from one Kafka consumer group to another.
Within the same cluster, the position is transferred as an offset; across different clusters, it is approximated from the source timestamp (seeking spicerack.kafka.DELTA ms earlier). All topics involved in the transfer are assumed to use site prefixes (e.g. eqiad.mutation).
- Parameters:
topics (list[str]) -- list of topics to transfer from and to, without site prefixes.
source_consumer (spicerack.kafka.ConsumerDefinition) -- consumer definition for the source consumer group.
target_consumer (spicerack.kafka.ConsumerDefinition) -- consumer definition for the target consumer group.
- Return type:
None
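The branching described above (offset copy within a cluster, timestamp approximation across clusters) can be sketched with the KafkaClient method names documented below. This is a hypothetical outline, not spicerack's implementation: `plan_transfer`, `FakeClient`, the stand-in `TopicPartition`, the `DELTA_MS` value, and treating "same cluster" as matching site and cluster are all assumptions. It returns the offsets it would set instead of committing them.

```python
from dataclasses import dataclass
from typing import NamedTuple, Protocol

DELTA_MS = 30_000  # hypothetical; stands in for spicerack.kafka.DELTA


class TopicPartition(NamedTuple):
    """Stand-in for kafka.structs.TopicPartition (non-localized topic name)."""
    topic: str
    partition: int


@dataclass
class ConsumerDefinition:
    """Mirrors the documented spicerack.kafka.ConsumerDefinition fields."""
    site: str
    cluster: str
    consumer_group: str


class OffsetClient(Protocol):
    """The subset of KafkaClient methods this sketch relies on."""
    def get_committed_offset(self, topic_partition: TopicPartition) -> int: ...
    def get_next_timestamp(self, topic_partition: TopicPartition) -> int: ...
    def find_offset_for_timestamp(self, topic_partition: TopicPartition, timestamp: int) -> int: ...


def plan_transfer(partitions: list[TopicPartition], source_def: ConsumerDefinition,
                  target_def: ConsumerDefinition, source: OffsetClient,
                  target: OffsetClient) -> dict[TopicPartition, int]:
    """Return the offset the target group would be moved to, per partition."""
    same_cluster = (source_def.site, source_def.cluster) == (target_def.site, target_def.cluster)
    plan = {}
    for tp in partitions:
        if same_cluster:
            # Same cluster: offsets refer to the same log, so copy them as-is.
            plan[tp] = source.get_committed_offset(tp)
        else:
            # Different cluster: offsets are not comparable, so use the timestamp of
            # the next message the source would process, seeking DELTA_MS earlier.
            timestamp = source.get_next_timestamp(tp)
            plan[tp] = target.find_offset_for_timestamp(tp, timestamp - DELTA_MS)
    return plan


class FakeClient:
    """In-memory client: per partition, a list of (offset, timestamp_ms) records."""
    def __init__(self, records: dict[TopicPartition, list[tuple[int, int]]],
                 committed: dict[TopicPartition, int]):
        self.records = records
        self.committed = committed

    def get_committed_offset(self, tp: TopicPartition) -> int:
        return self.committed[tp]

    def get_next_timestamp(self, tp: TopicPartition) -> int:
        offset = self.committed[tp]
        return next(ts for off, ts in self.records[tp] if off == offset)

    def find_offset_for_timestamp(self, tp: TopicPartition, timestamp: int) -> int:
        return next(off for off, ts in self.records[tp] if ts >= timestamp)


tp = TopicPartition("mutation", 0)
source = FakeClient({tp: [(10, 1_000), (11, 50_000), (12, 90_000)]}, {tp: 12})
target = FakeClient({tp: [(500, 20_000), (501, 60_000), (502, 95_000)]}, {tp: 0})
eqiad = ConsumerDefinition("eqiad", "main", "example_group")
codfw = ConsumerDefinition("codfw", "main", "example_group")
print(plan_transfer([tp], eqiad, codfw, source, target))
# {TopicPartition(topic='mutation', partition=0): 501}
```

In the cross-cluster case the source's next message has timestamp 90000, the sketch seeks at 60000, and the first target record at or after that is offset 501.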
- class spicerack.kafka.KafkaClient(consumer_definition: spicerack.kafka.ConsumerDefinition, kafka_config: dict, dry_run: bool) → None
Bases: object
Class encapsulating Kafka operations for specific site, cluster and consumer group.
Sets up a KafkaConsumer.
- Parameters:
consumer_definition (spicerack.kafka.ConsumerDefinition) -- definition of the Kafka data for the consumer.
kafka_config (dict) -- the complete Kafka definition, as available in Puppet.
dry_run (bool) -- whether to enable dry-run mode.
- find_offset_for_timestamp(topic_partition: kafka.structs.TopicPartition, timestamp: int) → int
Find the offset by approximating it from the provided timestamp.
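Kafka's timestamp lookup (exposed in kafka-python as `KafkaConsumer.offsets_for_times`) returns, per partition, the earliest offset whose timestamp is greater than or equal to the requested one. The sketch below simulates that rule over a hypothetical in-memory list of records, only to show what "approximating" means here; `earliest_offset_at_or_after` and the sample data are made up, and returning None when nothing qualifies mirrors kafka-python rather than anything documented for this method.

```python
from typing import Optional


def earliest_offset_at_or_after(records: list[tuple[int, int]], timestamp: int) -> Optional[int]:
    """Return the first offset whose timestamp is >= `timestamp`, or None.

    `records` is a hypothetical partition: (offset, timestamp_ms) pairs in offset order.
    A linear scan keeps the sketch simple and does not require sorted timestamps.
    """
    for offset, record_ts in records:
        if record_ts >= timestamp:
            return offset
    return None


partition = [(100, 1_000), (101, 5_000), (102, 9_000)]
print(earliest_offset_at_or_after(partition, 4_500))   # 101
print(earliest_offset_at_or_after(partition, 10_000))  # None
```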
- get_committed_offset(topic_partition: kafka.structs.TopicPartition) → int
Retrieve the last committed offset for the given TopicPartition.
- Parameters:
topic_partition (kafka.structs.TopicPartition) -- non-localized topic partition.
- Return type:
int
- get_next_timestamp(topic_partition: kafka.structs.TopicPartition) → int
Retrieve the timestamp of the message about to be processed for the given TopicPartition.
- Parameters:
topic_partition (kafka.structs.TopicPartition) -- non-localized topic partition.
- Return type:
int
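In Kafka, a group's committed offset is the offset of the next message it will read, so "the timestamp about to be processed" is presumably the timestamp of the record at that offset. The hypothetical in-memory sketch below shows that relationship. How spicerack actually reads the record, and what it does when the group has no committed offset or has consumed everything, is not documented here, so `next_timestamp` simply raises in those cases.

```python
from typing import Optional


def next_timestamp(records: dict[int, int], committed: Optional[int]) -> int:
    """Timestamp (ms) of the record a consumer group would process next.

    `records` maps offset -> timestamp for one hypothetical partition; `committed`
    is the group's committed offset, i.e. the offset of the next record to consume.
    """
    if committed is None:
        raise LookupError("consumer group has no committed offset for this partition")
    if committed not in records:
        raise LookupError(f"no record at offset {committed} (caught up or out of range)")
    return records[committed]


partition = {40: 1_000, 41: 2_000, 42: 3_000}
print(next_timestamp(partition, committed=41))  # 2000
```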
- partitions_for_topic(topic_name: str) → set[int]
Get the partitions for the provided localized topic.
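This method expects a localized topic, while methods such as get_committed_offset take non-localized TopicPartition objects, so a caller starting from a non-localized name has to add the site prefix for the lookup only. A sketch under stated assumptions: `non_localized_partitions` and the `partitions_lookup` callable (standing in for the client) are hypothetical, `TopicPartition` is a stand-in for kafka.structs.TopicPartition, and the "<site>.<topic>" form follows the eqiad.mutation example above.

```python
from typing import Callable, NamedTuple, Optional


class TopicPartition(NamedTuple):
    """Stand-in for kafka.structs.TopicPartition."""
    topic: str
    partition: int


def non_localized_partitions(topic: str, site: str,
                             partitions_lookup: Callable[[str], Optional[set[int]]]) -> list[TopicPartition]:
    """Expand a non-localized topic into sorted, non-localized TopicPartitions.

    `partitions_lookup` is queried with the localized name ("<site>.<topic>"), like
    partitions_for_topic; a None or empty result is treated as an unknown topic.
    """
    localized = f"{site}.{topic}"
    partitions = partitions_lookup(localized)
    if not partitions:
        raise LookupError(f"topic {localized!r} has no known partitions")
    # Keep the non-localized name in the result, as the offset methods expect.
    return [TopicPartition(topic, p) for p in sorted(partitions)]


# Hypothetical cluster metadata keyed by localized topic name.
metadata = {"eqiad.mutation": {2, 0, 1}}
print(non_localized_partitions("mutation", "eqiad", metadata.get))
# [TopicPartition(topic='mutation', partition=0), TopicPartition(topic='mutation', partition=1), TopicPartition(topic='mutation', partition=2)]
```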