kafka
Provides methods to manipulate offsets set for specific consumer groups.
- exception spicerack.kafka.KafkaError
Bases: SpicerackError
Custom exception class for errors in this module.
- class spicerack.kafka.ConsumerDefinition(site: str, cluster: str, consumer_group: str) -> None
Bases: object
Data needed to identify a Kafka Consumer.
- class spicerack.kafka.Kafka(*, kafka_config: dict[str, dict[str, dict]], dry_run: bool = True, ca_bundle_path: str = '/etc/ssl/certs/wmf-ca-certificates.crt')
Bases: object
Kafka module that currently allows for intra- and cross-cluster consumer group position transfer.
Create Kafka module instance.
Kafka config is based on a Puppet-generated config.yaml in the spicerack configs. At minimum, it requires an ssl_string defined for each participating cluster, e.g.:
    main:
      eqiad:
        brokers:
          ssl_string: "address:port,address:port"
          ...
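As a minimal sketch of the requirement above, the following check walks a config shaped like the example and reports every cluster/site pair that lacks a brokers ssl_string. The helper name `missing_ssl_strings` and the sample data are hypothetical; the nesting (cluster, then site, then settings) is inferred from the kafka_config type hint and the example, not taken from the module's code.

```python
from __future__ import annotations


def missing_ssl_strings(kafka_config: dict[str, dict[str, dict]]) -> list[tuple[str, str]]:
    """Return (cluster, site) pairs that have no non-empty brokers.ssl_string."""
    missing = []
    for cluster, sites in kafka_config.items():
        for site, settings in sites.items():
            # A missing "brokers" key and an empty ssl_string are both reported.
            if not settings.get("brokers", {}).get("ssl_string"):
                missing.append((cluster, site))
    return missing


# Hypothetical sample config mirroring the documented shape.
sample_config = {
    "main": {
        "eqiad": {"brokers": {"ssl_string": "address:port,address:port"}},
        "codfw": {"brokers": {}},
    },
}

print(missing_ssl_strings(sample_config))  # [('main', 'codfw')]
```

Running a check like this before constructing the module would surface an incomplete config early, instead of at connection time.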
- Parameters:
- admin_client(site: str, cluster_name: str) -> kafka.admin.client.KafkaAdminClient
Return a KafkaAdminClient connected to the Kafka cluster with the given name at the given site.
- Parameters:
- Return type: kafka.admin.client.KafkaAdminClient
- set_consumer_position_by_timestamp(target_consumer: spicerack.kafka.ConsumerDefinition, timestamps: dict[str, int]) -> None
Set approximate offsets for the given topics (provided without site prefix).
The module uses timestamps that are earlier by spicerack.kafka.DELTA ms.
- Parameters:
target_consumer (spicerack.kafka.ConsumerDefinition) -- consumer definition for the target consumer group.
timestamps (dict[str, int]) -- mapping of topic names to the timestamps to use.
- Return type: None
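The two adjustments described above (adding the site prefix and seeking DELTA ms earlier) can be sketched in plain Python. This is an illustration, not the module's code: `localize_and_shift` is a hypothetical helper, the `site.topic` naming is inferred from the eqiad.mutation example on this page, and the delta used here is a placeholder rather than the real value of spicerack.kafka.DELTA.

```python
from __future__ import annotations

# Placeholder only; the real spicerack.kafka.DELTA value is not stated on this page.
EXAMPLE_DELTA_MS = 1000


def localize_and_shift(site: str, timestamps: dict[str, int], delta_ms: int) -> dict[str, int]:
    """Prefix each topic with the site and move its timestamp delta_ms earlier."""
    return {f"{site}.{topic}": ts - delta_ms for topic, ts in timestamps.items()}


shifted = localize_and_shift("eqiad", {"mutation": 1_700_000_000_000}, EXAMPLE_DELTA_MS)
print(shifted)  # {'eqiad.mutation': 1699999999000}
```

Seeking slightly earlier than the requested timestamp trades a few reprocessed records for a lower risk of skipping any.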
- transfer_consumer_position(topics: list[str], source_consumer: spicerack.kafka.ConsumerDefinition, target_consumer: spicerack.kafka.ConsumerDefinition) -> None
Transfers position from one Kafka consumer group to another.
Within the same cluster the position is transferred as an offset; across different clusters it is approximated from the source timestamp (seeking spicerack.kafka.DELTA ms earlier).
All topics for which the transfer will happen are assumed to use site prefixes (e.g. eqiad.mutation).
- Parameters:
topics (list[str]) -- list of topics to transfer from and to, without site prefixes.
source_consumer (spicerack.kafka.ConsumerDefinition) -- consumer definition for the source consumer group.
target_consumer (spicerack.kafka.ConsumerDefinition) -- consumer definition for the target consumer group.
- Return type: None
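The choice between the two transfer modes described above can be sketched as follows. `ConsumerDef` is a hypothetical stand-in that only copies the three fields of ConsumerDefinition, and treating the (site, cluster) pair as the identity of a cluster is an assumption made for this illustration; the module's actual comparison is not shown on this page.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ConsumerDef:
    """Hypothetical stand-in with the same fields as ConsumerDefinition."""

    site: str
    cluster: str
    consumer_group: str


def transfer_strategy(source: ConsumerDef, target: ConsumerDef) -> str:
    """Pick 'offset' within one cluster and 'timestamp' across clusters."""
    # Assumption: a Kafka cluster is identified by its (site, cluster) pair.
    same_cluster = (source.site, source.cluster) == (target.site, target.cluster)
    return "offset" if same_cluster else "timestamp"


src = ConsumerDef("eqiad", "main", "group_a")
print(transfer_strategy(src, ConsumerDef("eqiad", "main", "group_b")))  # offset
print(transfer_strategy(src, ConsumerDef("codfw", "main", "group_a")))  # timestamp
```

Offsets are only meaningful inside the cluster that assigned them, which is why a cross-cluster transfer has to fall back to timestamps.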
- class spicerack.kafka.KafkaClient(consumer_definition: spicerack.kafka.ConsumerDefinition, kafka_config: dict, dry_run: bool, ca_bundle_path: str = '/etc/ssl/certs/wmf-ca-certificates.crt') -> None
Bases: object
Class encapsulating Kafka operations for a specific site, cluster and consumer group.
Sets up a KafkaConsumer.
- Parameters:
consumer_definition (spicerack.kafka.ConsumerDefinition) -- definition of the Kafka data for the consumer.
kafka_config (dict) -- complete Kafka definition, as available in Puppet.
dry_run (bool) -- enable dry run mode.
ca_bundle_path (str, default: '/etc/ssl/certs/wmf-ca-certificates.crt') -- the path to the CA certificate bundle.
- find_offset_for_timestamp(topic_partition: kafka.structs.TopicPartition, timestamp: int) -> int
Find offset by approximating it with the provided timestamp.
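How the method approximates the offset is not spelled out here. A common rule for timestamp lookups in Kafka is "the earliest offset whose record timestamp is greater than or equal to the given timestamp"; the toy sketch below models that rule on an in-memory list. It does not talk to Kafka, and the function name and sample data are hypothetical.

```python
from __future__ import annotations

from bisect import bisect_left
from typing import Optional


def earliest_offset_at_or_after(record_timestamps: list[int], timestamp: int) -> Optional[int]:
    """Return the first offset whose timestamp is >= timestamp, or None.

    record_timestamps[i] is the timestamp (ms) of the record at offset i and
    is assumed to be sorted in non-decreasing order.
    """
    index = bisect_left(record_timestamps, timestamp)
    return index if index < len(record_timestamps) else None


partition_log = [1000, 1500, 1500, 2200]  # toy data: offsets 0..3
print(earliest_offset_at_or_after(partition_log, 1500))  # 1
print(earliest_offset_at_or_after(partition_log, 1600))  # 3
print(earliest_offset_at_or_after(partition_log, 3000))  # None
```

The last call shows why a caller needs a fallback: a timestamp newer than every record has no matching offset.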
- get_committed_offset(topic_partition: kafka.structs.TopicPartition) -> int
Retrieve the last committed offset for the given TopicPartition.
- Parameters:
topic_partition (kafka.structs.TopicPartition) -- non-localized topic partition.
- Return type: int
- get_next_timestamp(topic_partition: kafka.structs.TopicPartition) -> int
Retrieve the timestamp of the next record about to be processed for the given TopicPartition.
- Parameters:
topic_partition (kafka.structs.TopicPartition) -- non-localized topic partition.
- Return type: int
- partitions_for_topic(topic_name: str) -> set[int]
Get the partitions for the provided localized topic.