eventutilities_python package
Subpackages
- eventutilities_python.stream package
- Submodules
- eventutilities_python.stream.descriptor module
- eventutilities_python.stream.error module
- eventutilities_python.stream.functions module
- eventutilities_python.stream.manager module
- Module contents
- eventutilities_python.testing package
Submodules
eventutilities_python.flink module
Helper methods and wrappers for working with PyFlink and Wikimedia’s eventutilities.
- class eventutilities_python.flink.EventDataStreamFactory(java_object: JavaObject)
Bases: JavaObjectWrapper
A Python wrapper to eventutilities-flink’s EventDataStreamFactory.
- collection_source(stream_name: str, source_data: Collection[Dict[str, Any]], schema_version: str | None = None) Source
Uses the provided collection of dicts to instantiate the input Source. Because PyFlink does not provide a way to use the eventutilities-flink Java-based JsonRowDeserializationSchema to instantiate a DataStream of Rows using the built-in env.from_collection(), we mock a ‘collection source’ by writing the collection out to a temporary file and then reading it back in with a FileSource.
- Parameters:
stream_name – Stream name.
source_data – a list of dictionaries, each of them representing an event to deserialize.
schema_version – Version of the stream’s event schema to use. If None, latest will be used.
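Example (a minimal sketch; assumes factory is an EventDataStreamFactory, see the of() classmethod below; the stream name and event fields are placeholders for your own stream):
>>> events = [
...     {"meta": {"stream": "my.example.stream"}, "message": "hello"},
...     {"meta": {"stream": "my.example.stream"}, "message": "world"},
... ]
>>> source = factory.collection_source("my.example.stream", events)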
- file_source(stream_name: str, uris: Collection[str | Path], schema_version: str | None = None) FileSource
Gets a FileSource that will deserialize NLJSON files using the stream_descriptor’s schema.
Note that there is no corresponding file_source_builder method, as there are not so many options to pass to the Java FileSourceBuilder, and PyFlink does not provide a FileSourceBuilder wrapper like it does for other SourceBuilders.
- Parameters:
stream_name – Name of the stream. Must be declared in event stream config.
uris – String URIs or list of URIs to read NLJSON from.
schema_version – Version of the stream’s event schema to use. If None, latest will be used.
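Example (a minimal sketch; assumes factory is an EventDataStreamFactory; the stream name and file URI are placeholders):
>>> source = factory.file_source(
...     "my.example.stream",
...     ["file:///srv/data/my.example.stream.json"],
... )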
- from_source(env: StreamExecutionEnvironment, stream_name: str, source: Source | SourceFunction, watermark_strategy: WatermarkStrategy | None = None, source_name: str | None = None, schema_version: str = 'latest') DataStream
Instantiates new DataStream from the provided Source using the RowTypeInfo of the stream_descriptor.
- Parameters:
env – an instance of Flink StreamExecutionEnvironment
stream_name – Stream name
schema_version – Event schema version
source – Flink Source or SourceFunction.
watermark_strategy – If none is provided, WatermarkStrategy.noWatermarks() will be used. If source is a SourceFunction, this will be ignored.
source_name – Name of source, will be used in UI and metrics. If not provided, will use the stream name and version.
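Example (a minimal sketch; assumes factory is an EventDataStreamFactory and source was built with one of the source methods above; the stream name is a placeholder):
>>> from pyflink.datastream import StreamExecutionEnvironment
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> datastream = factory.from_source(
...     env,
...     "my.example.stream",
...     source,
...     source_name="my.example.stream input",
... )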
- get_event_stream_factory()
Returns the eventutilities-core Java EventStreamFactory instance.
We may one day want to provide a python wrapper for this too.
- get_row_type_info(stream_name: str, schema_version: str = 'latest') TypeInformation
Return the Flink RowTypeInfo for a given stream name and version.
- Parameters:
stream_name – Name of the stream
schema_version – Version of the stream’s event schema to use.
- java_object: JavaObject
The underlying py4j JavaObject.
- kafka_sink(stream_name: str, schema_version: str, bootstrap_servers: str, topic: str, delivery_guarantee: DeliveryGuarantee | str | None = None, transactional_id_prefix: str | None = None, properties: Dict[str, str] | None = None) KafkaSink
Instantiate new Kafka Sink for a streaming application.
- Parameters:
stream_name – Stream Name.
schema_version – Event schema version. Must be an actual version. “latest” is not supported for Sinks.
bootstrap_servers – Kafka bootstrap.servers property.
topic – The Kafka topic to write to. If not one of the topics referenced by the EventStreamConfig, a warning will be logged.
delivery_guarantee – Flink DeliveryGuarantee (or str name) to use for the Sink. https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#fault-tolerance
transactional_id_prefix – Required if using DeliveryGuarantee.EXACTLY_ONCE. https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#fault-tolerance
properties – Additional properties to set on the KafkaSinkBuilder.
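Example (a minimal sketch; assumes datastream was built with from_source; the broker, topic, and schema version are placeholders for your deployment):
>>> sink = factory.kafka_sink(
...     stream_name="my.example.stream",
...     schema_version="1.0.0",
...     bootstrap_servers="kafka-broker1:9092",
...     topic="datacenter1.my.example.stream",
... )
>>> datastream.sink_to(sink)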
- kafka_sink_builder(stream_name: str, schema_version: str, bootstrap_servers: str, topic: str) KafkaSinkBuilder
Prepare a KafkaSinkBuilder with all the required components to produce to kafka using a json format matching the provided schema.
The produced messages match the WMF Event Platform Rules:
- the meta.dt field is filled and will be used as the kafka timestamp
- the topic to produce to is one of the topics defined in the EventStream configuration
- the provided schema must match the one defined in the EventStream configuration
- the resulting json is validated against this schema
- the dt (event time) field remains optional and must be set beforehand in the pipeline
This sink builder will be configured to use a default random Kafka partitioner, and a timestamp strategy of KafkaRecordTimestampStrategy.ROW_EVENT_TIME.
If you want to change any of these settings, then call the appropriate method on the returned KafkaSinkBuilder before calling build().
TODO:
- support custom kafka partitioner
- support custom timestamp strategy
- Parameters:
stream_name – Stream name. Must be declared in event stream config.
schema_version – Event schema version. Must be an actual version. “latest” is not supported for Sinks.
bootstrap_servers – Kafka bootstrap.servers property.
topic – The Kafka topic to write to. If not one of the topics referenced by the EventStreamConfig, a warning will be logged.
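Example (a minimal sketch; the values are placeholders; call any builder methods you need before build()):
>>> builder = factory.kafka_sink_builder(
...     stream_name="my.example.stream",
...     schema_version="1.0.0",
...     bootstrap_servers="kafka-broker1:9092",
...     topic="datacenter1.my.example.stream",
... )
>>> sink = builder.build()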
- kafka_source(stream_name: str, bootstrap_servers: str, consumer_group: str, offsets_initializer: KafkaOffsetsInitializer | int | str | None = None, properties: Dict[str, str] | None = None, topics: Sequence[str] | None = None, schema_version: str | None = None) KafkaSource
Returns a KafkaSource using kafka_source_builder, setting provided properties on the KafkaSourceBuilder.
- Parameters:
stream_name – Stream name. Must be declared in event stream config.
bootstrap_servers – Kafka bootstrap.servers property.
consumer_group – Kafka consumer.group.id property.
offsets_initializer – EventDataStreamFactory kafkaSourceBuilder will set the default offset initializer to use committed offsets, or latest if none are found. Set offsets_initializer to override this:
- If an int or a digit str, it is expected to be a timestamp.
- If a str, it should be one of “earliest” or “latest”.
- If a KafkaOffsetsInitializer, it will be used as-is.
TODO: add support for specific offsets.
properties – Additional properties to set on the KafkaSourceBuilder.
topics – Topics to use. If None, the stream’s configured topics will be used.
schema_version – Version of the stream’s event schema to use. If None, latest will be used.
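Example (a minimal sketch; assumes factory is an EventDataStreamFactory; the broker and consumer group are placeholders):
>>> source = factory.kafka_source(
...     stream_name="my.example.stream",
...     bootstrap_servers="kafka-broker1:9092",
...     consumer_group="my-example-consumer-group",
...     offsets_initializer="earliest",
... )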
- kafka_source_builder(stream_name: str, bootstrap_servers: str, consumer_group: str, topics: Sequence[str] | None = None, schema_version: str | None = None) KafkaSourceBuilder
Get a KafkaSourceBuilder that is primed with settings needed to consume the stream from Kafka.
This sets the following:
- bootstrapServers
- topics
- consumer group id
- value only deserializer that will deserialize to a Row conforming to streamName’s JSONSchema
- starting offsets will use committed offsets, resetting to LATEST if no offsets are committed
If you want to change any of these settings, then call the appropriate method on the returned KafkaSourceBuilder before calling build().
- Parameters:
stream_name – Stream name. Must be declared in event stream config.
bootstrap_servers – Kafka bootstrap.servers property.
consumer_group – Kafka consumer.group.id property.
topics – Topics to use. If None, the stream’s configured topics will be used.
schema_version – Version of the stream’s event schema to use. If None, latest will be used.
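Example (a minimal sketch; the values are placeholders; call any builder methods you need before build()):
>>> builder = factory.kafka_source_builder(
...     stream_name="my.example.stream",
...     bootstrap_servers="kafka-broker1:9092",
...     consumer_group="my-example-consumer-group",
... )
>>> source = builder.build()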
- classmethod of(schema_uris: Sequence[str] | str, stream_config_uri: str, http_client_routes: Dict[str, str] | None = None) EventDataStreamFactory
- Parameters:
stream_name schema_uris – Event schema repository URIs. Event schemas will be loaded from these. Either a list or a csv string.
stream_config_uri – URI from which to get stream config. See org.wikimedia.eventutilities.core.event.EventStreamConfig
http_client_routes – A mapping of source hostnames to dest hostnames. If an HTTP request is to be made to a source hostname, the request will be sent to the dest hostname but with the Host header set to the source hostname. Used in WMF production environments where we need to use discovery MediaWiki API endpoints, e.g. api-ro.discovery.wmnet
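Example (a minimal sketch; the schema repository and stream config URIs are placeholders for your own endpoints):
>>> from eventutilities_python.flink import EventDataStreamFactory
>>> factory = EventDataStreamFactory.of(
...     schema_uris=["https://schemas.example.org/repositories/primary/jsonschema"],
...     stream_config_uri="https://config.example.org/streams.yaml",
... )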
- print_sink(stderr: bool = False) SinkFunction
A simple print sink. No serializer will be used. Equivalent to using datastream.print().
- Parameters:
stderr – If true, the print sink will print to stderr instead of stdout
NOTE: This should not be used for ‘production’ jobs. This is mostly useful for development and testing.
- row_file_sink(stream_name: str, schema_version: str, uri: str) Sink
Returns a FileSink that will write Rows as JSON strings with the default bucket assigner and rolling policies. Mostly useful for integration testing.
- Parameters:
stream_name – Stream Name.
schema_version – Event schema version. Must be an actual version. “latest” is not supported for Sinks.
uri – Base uri location directory where partitioned output directories and files will be written.
- sse_source(stream_name: str, uri: str = 'https://stream.wikimedia.org/v2/stream', is_wmf_eventstreams_api: bool = True, schema_version: str | None = None) SourceFunction
Gets a SourceFunction that reads JSON events from the SSE EventSource URI. This is useful for reading events from the WMF HTTP EventStreams API, e.g. https://stream.wikimedia.org/v2/stream.
The default values of uri and is_wmf_eventstreams_api are enough to read a stream from the public stream.wikimedia.org endpoint.
NOTE: This should not be used for ‘production’ jobs. It uses the deprecated Flink SourceFunction, and has not been thoroughly tested. This is mostly useful for development and testing.
- Parameters:
stream_name – Stream name. Must be declared in event stream config.
uri – SSE EventSource URI endpoint from which to read JSON events.
is_wmf_eventstreams_api – If True, the uri endpoint will be altered to use the stream name according to the WMF EventStreams HTTP API, e.g. uri + /<stream_name>
schema_version – Version of the stream’s event schema to use. If None, latest will be used.
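Example (a minimal sketch for development; assumes factory and env from the earlier examples, and that the public mediawiki.recentchange stream is declared in your stream config):
>>> source_function = factory.sse_source("mediawiki.recentchange")
>>> datastream = factory.from_source(env, "mediawiki.recentchange", source_function)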
- stream_exists(stream_name: str) bool
Returns true if the stream is declared in stream config.
- eventutilities_python.flink.convert_dicts_to_rows(value: Any, type_info: TypeInformation) Any
Iterates over type_info and value and recursively converts any dicts in value that should be a Row to a Row.
- Parameters:
value – Value to convert.
type_info – TypeInformation describing the Flink type that the value should be converted to.
- eventutilities_python.flink.default_kafka_producer_properties = {'max.request.size': '4194304'}
Used when building a Kafka sink to set some defaults.
- eventutilities_python.flink.dict_to_properties(d: Dict[str, str]) JavaObject
Converts a dict of string -> string key value pairs into java.util.Properties.
- eventutilities_python.flink.dict_to_row_converter(type_info: TypeInformation) Callable[[Dict[str, Any]], Row]
Returns a converter function for type_info.
dict_to_row_converter delegates the dictionary-to-Row transformation to convert_dicts_to_rows via partial function application: the returned callable passes each input dictionary on to convert_dicts_to_rows along with type_info.
dict_to_row_converter can be used in DataStream higher-order functions.
Example:
>>> ds.map(dict_to_row_converter(type_info), type_info).map(…)
- eventutilities_python.flink.flink_jvm()
- eventutilities_python.flink.get_flink_env(lib_dir: str | None = None) StreamExecutionEnvironment
Get a flink StreamExecutionEnvironment with lib_dir loaded.
- eventutilities_python.flink.load_lib_dir(flink_env: StreamExecutionEnvironment, lib_dir: str | None = None) None
Add all jars in lib_dir to flink_env.
If lib_dir is not defined, the value of the environment variable EVENTUTILITIES_LIB_DIR will be used. If EVENTUTILITIES_LIB_DIR is not set, jars are expected to be part of this package in the /lib directory.
- eventutilities_python.flink.row_to_dict(row: Row, recursive: bool = True) Dict[str, Any]
Convert a PyFlink Row to a python dictionary.
- Parameters:
row – a PyFlink row
recursive – convert nested Rows to dict
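Example (a minimal sketch with a hand-built Row; the field names are placeholders):
>>> from pyflink.common import Row
>>> from eventutilities_python.flink import row_to_dict
>>> row = Row(meta=Row(stream="my.example.stream"), message="hello")
>>> row_to_dict(row)  # -> {'meta': {'stream': 'my.example.stream'}, 'message': 'hello'}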
eventutilities_python.type_aliases module
- eventutilities_python.type_aliases.NestedStr
Recursive Sequence of strings.
Allows for [str], [str, [str, str]], etc.
alias of Sequence[Union[str, NestedStr]]
- class eventutilities_python.type_aliases.Path_uc(v, **k)
Bases: PathType
- class eventutilities_python.type_aliases.UriAbsolute
UriAbsolute is just a type alias of str.
It exists to allow jsonargparse to automatically deserialize and resolve incoming URI parameters with this type.
alias of TypeVar(‘UriAbsolute’, bound=str)
- eventutilities_python.type_aliases.resolve_uris(uri: str | Sequence[str | Sequence[str | NestedStr]], relative_to: Sequence[str] | None = None) str | Sequence[str | Sequence[str | NestedStr]]
Resolves uri as an absolute URI. Uses jsonargparse url parsing, as well as pathlib.Path.as_uri() to add file:// to the beginning of local file paths.
- Parameters:
uri – uri to resolve, or a (recursive) list of uris to resolve.
relative_to – If not provided, will attempt to resolve only relative to the cwd. If provided, each path here will be tried in order; the first readable file found in one of these paths will be returned.
- Returns:
The resolved URI or (recursive) list of resolved URIs.
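Example (a minimal sketch; the file name and search directory are placeholders):
>>> from eventutilities_python.type_aliases import resolve_uris
>>> resolve_uris("stream_config.yaml", relative_to=["/etc/my_job"])  # e.g. 'file:///etc/my_job/stream_config.yaml' if readable there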
eventutilities_python.utils module
- class eventutilities_python.utils.JavaObjectWrapper(java_object: JavaObject)
Bases: object
A wrapper class that maintains an object implemented in Java via py4j.
Based on pyflink’s JavaFunctionWrapper.
- java_object: JavaObject
The underlying py4j JavaObject.
- eventutilities_python.utils.callable_repr(func: Callable[[...], Any]) str
Given a callable, returns a nice description string using its name, file and lineno, if available.
- eventutilities_python.utils.find_lib_jars(lib_dir: str) Collection[str]
Returns a list of file:// prefixed .jar file paths in the directory.
- eventutilities_python.utils.flink_instant_of_datetime(dt: datetime) Instant
Converts a python datetime to a PyFlink Instant.
- eventutilities_python.utils.format_dt(dt: datetime) str
Formats dt into an ISO-8601 UTC (‘Z’) string.
- eventutilities_python.utils.get_memory_usage_of(mem_info_key: str) int
Calls psutil.Process().memory_full_info() and returns the memory usage bytes value for mem_info_key.
- Parameters:
mem_info_key – See https://psutil.readthedocs.io/en/latest/#psutil.Process.memory_full_info for a list of available keys.
- eventutilities_python.utils.json_default_serializer(obj: Any) Any
A serializer function that handles a few more types that the python json serializer doesn’t know how to handle.
Used when serializing objects to json that have types like datetimes or Flink Instants.
- eventutilities_python.utils.setup_logging(config_dict: Dict[Any, Any] | None = None, level: str = 'INFO') None
Sets up python logging either from a log config file or using basicConfig.
- Parameters:
config_dict – Setup logging from a config dict. If not set, then basicConfig will be used.
level – basicConfig log level that will be used if config_dict is not set.
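Example (a minimal sketch using basicConfig; pass config_dict instead to configure logging from a dict):
>>> from eventutilities_python.utils import setup_logging
>>> setup_logging(level="DEBUG")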
Module contents
A Python library to build Event Platform streaming services.
- eventutilities_python.get_version() str