eventutilities_python.stream package
Submodules
eventutilities_python.stream.descriptor module
- class eventutilities_python.stream.descriptor.EventStreamDescriptor(name: str, version: str | None = 'latest')
Bases:
object
Source and sink streams have an associated event version.
- Example:
>>> stream = EventStreamDescriptor(name="test.event.stream", version="1.1.0")
>>> stream.name, stream.version
- static from_string(stream_name: str) Any
A factory method that parses stream_name and returns an instance of EventStreamDescriptor.
- Example:
>>> stream = EventStreamDescriptor.from_string("test.event.stream:1.1.0")
>>> stream.name, stream.version
- Parameters:
stream_name – a stream name with versioning scheme “<name>:<version>”
- static is_valid_stream_name(stream_name)
Check whether stream_name contains invalid characters.
This method should be invoked every time we generate a stream name from the library (e.g. error_stream auto-detection).
Stream name validation should not be enabled by default on all streams: we might want to consume streams that, for historical reasons, are now considered "invalid". However, we should make sure that the streams generated by eventutilities-python are valid.
- Parameters:
stream_name – a stream name
- Returns:
- is_version_pinned() bool
- name: str
- version: str | None = 'latest'
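The "<name>:<version>" scheme that from_string parses can be sketched as follows. This is an illustrative, stdlib-only sketch; parse_stream_name is a hypothetical helper, not the library's actual implementation.

```python
# Hypothetical sketch of the "<name>:<version>" parsing implied by
# EventStreamDescriptor.from_string; the real logic lives in
# eventutilities_python.stream.descriptor.
from typing import Optional, Tuple


def parse_stream_name(stream_name: str) -> Tuple[str, Optional[str]]:
    """Split a stream name into (name, version); version defaults to 'latest'."""
    if ":" in stream_name:
        name, version = stream_name.split(":", 1)
        return name, version
    return stream_name, "latest"


print(parse_stream_name("test.event.stream:1.1.0"))  # ('test.event.stream', '1.1.0')
print(parse_stream_name("test.event.stream"))        # ('test.event.stream', 'latest')
```

A version equal to 'latest' would then correspond to is_version_pinned() returning False.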
- class eventutilities_python.stream.descriptor.SinkDescriptor(connector: str, stream: str, name: Optional[str] = None, options: Dict[Any, Any] = <factory>)
Bases:
ConnectorDescriptor
- sink(datastream_factory: EventDataStreamFactory) Sink | SinkFunction | None
Instantiates a Flink Sink.
- Parameters:
datastream_factory – An EventDataStreamFactory that will be used to create the Flink sink using this SinkDescriptor's configuration.
- sink_to(datastream_factory: EventDataStreamFactory, datastream: DataStream, to_row: bool = True) DataStreamSink | None
Instantiates a Flink Sink and sinks the datastream to it.
- Parameters:
datastream_factory – An EventDataStreamFactory that will be used to create the Flink sink using this SinkDescriptor's configuration.
datastream – the datastream to write to the Sink. This datastream MUST conform to the shape (TypeInformation) of the Sink, which is set by the schema of this SinkDescriptor’s stream_descriptor.
to_row – If True, it is expected that the elements in datastream are python dicts, and they will be converted to Flink Rows using the Sink's TypeInformation before being written to the Sink. This means that the dict element types must conform to the stream_descriptor event schema.
- supported_connectors: ClassVar[List[str]] = ['null', 'file', 'kafka', 'print']
Note: “null” is a special case, and not a real sink. It causes both sink and sink_to to return None.
- class eventutilities_python.stream.descriptor.SourceDescriptor(connector: str, stream: str, name: Optional[str] = None, options: Dict[Any, Any] = <factory>)
Bases:
ConnectorDescriptor
- datastream(env: StreamExecutionEnvironment, datastream_factory: EventDataStreamFactory, watermark_strategy: WatermarkStrategy | None = None, as_dict: bool = True) DataStream
Instantiates a Flink DataStream reading from source.
- Parameters:
env – Flink env with required Java dependencies loaded.
datastream_factory – An EventDataStreamFactory that will be used to create the Flink source using the SourceDescriptor.
watermark_strategy – WatermarkStrategy to use for the DataStream source. Defaults to for_monotonous_timestamps() if connector is Kafka, else no_watermarks() will be used.
as_dict – If True, the datastream will be converted from Flink Row to python dict before being returned.
- source(datastream_factory: EventDataStreamFactory) Source | SourceFunction
Instantiates a Flink Source.
- Parameters:
datastream_factory – An EventDataStreamFactory that will be used to create the Flink source using this SourceDescriptor.
- supported_connectors: ClassVar[List[str]] = ['file', 'collection', 'kafka', 'sse']
List of supported connectors.
These will be checked before instantiating a concrete Connector (Source or Sink).
eventutilities_python.stream.error module
- eventutilities_python.stream.error.create_error_event(error: Exception | str, emitter_id: str, error_dt: datetime | str | Instant, error_stream_name: str | None = None, errored_event: Dict[str, Any] | str | None = None) Dict[str, Any]
Builds a dict that can be serialized into a WMF error event. https://schema.wikimedia.org/#!/primary/jsonschema/error.
- Parameters:
error – An Exception or error message
emitter_id – Informative name of the ‘emitter’ of this error.
error_dt –
event time to use for the error event. Note that this is not the same as the event time of the errored_event which caused the error.
- If this is a datetime, it will be formatted into an ISO-8601 'Z' string.
- If this is a Flink Instant or a str, the dt will be set as-is.
(If you are creating this error event for use in a Flink pipeline, you’ll want to pass in an Instant.)
error_stream_name – Value to use for meta.stream. This is optional, as some serializers (e.g. wikimedia-event-utilities JsonEventGenerator) handle setting meta.stream.
errored_event – The input event that triggered this error.
- If this is a dict, it will be serialized as a JSON string and set in the raw_event field.
- Otherwise, if not None, it will be set as a str in the raw_event field.
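The error_dt and errored_event normalization described above can be sketched as follows. This is a hedged, stdlib-only sketch: build_error_event is hypothetical, and field names other than raw_event and meta.stream are illustrative rather than the full WMF error schema.

```python
# Hypothetical sketch of create_error_event's parameter normalization.
# Only the documented behaviors are modeled: datetime -> ISO-8601 'Z'
# string, dict errored_event -> JSON string in raw_event.
import json
from datetime import datetime, timezone
from typing import Any, Dict, Optional, Union


def build_error_event(
    error: Union[Exception, str],
    emitter_id: str,
    error_dt: Union[datetime, str],
    error_stream_name: Optional[str] = None,
    errored_event: Union[Dict[str, Any], str, None] = None,
) -> Dict[str, Any]:
    # A datetime is formatted into an ISO-8601 'Z' string; a str is kept as-is.
    if isinstance(error_dt, datetime):
        dt = error_dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    else:
        dt = error_dt
    event: Dict[str, Any] = {
        "emitter_id": emitter_id,  # illustrative field name
        "message": str(error),     # illustrative field name
        "dt": dt,                  # illustrative field name
    }
    if error_stream_name is not None:
        event["meta"] = {"stream": error_stream_name}
    # A dict is serialized as a JSON string; a str is set as-is.
    if isinstance(errored_event, dict):
        event["raw_event"] = json.dumps(errored_event)
    elif errored_event is not None:
        event["raw_event"] = errored_event
    return event
```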
eventutilities_python.stream.functions module
Function wrappers for the stream.manager.Stream process() func.
- eventutilities_python.stream.functions.http_process_function(max_retries: int = 3, retry_backoff_factor: float = 0.1, retry_status_forcelist: List[int] | None = None, pool_maxsize: int = 1) Callable[[...], Any]
Decorator that wraps a function and handles instantiation of a requests Session object, with a retry policy and a worker pool.
- Usage:
>>> @http_process_function()
... def my_func(event: dict, http_session: requests.Session):
...     event["enriched"] = http_session.get(...)["enriched_data"]
...     return event
- Parameters:
max_retries – Retry policy total retries.
retry_backoff_factor – Retry policy backoff_factor.
retry_status_forcelist – Retry policy status_forcelist. Default: [408, 500, 502, 503, 504]
pool_maxsize – HTTPAdapter pool_maxsize.
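The decorator pattern behind http_process_function (build the session once, inject it into every call) can be sketched without requests. This is a stdlib-only sketch: with_http_session and DummySession are hypothetical stand-ins; the real decorator configures a requests.Session with a retry policy and connection pool.

```python
# Stdlib-only sketch of the session-injecting decorator pattern.
# with_http_session and DummySession are hypothetical; the real library
# wires a requests.Session with HTTPAdapter/Retry instead.
import functools
from typing import Any, Callable


class DummySession:
    """Stand-in for requests.Session; real code would configure retries here."""

    def get(self, url: str) -> str:
        return f"GET {url}"


def with_http_session(session_factory: Callable[[], Any] = DummySession) -> Callable:
    def decorator(func: Callable) -> Callable:
        session = session_factory()  # created once, reused across events

        @functools.wraps(func)
        def wrapper(event: dict) -> dict:
            # Inject the shared session into each process() call.
            return func(event, http_session=session)

        return wrapper

    return decorator


@with_http_session()
def my_func(event: dict, http_session: DummySession) -> dict:
    event["enriched"] = http_session.get("https://example.org")
    return event


print(my_func({"id": 1}))  # {'id': 1, 'enriched': 'GET https://example.org'}
```

Creating the session inside the decorator (rather than per event) is what lets the pool and retry configuration be reused across the stream.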
eventutilities_python.stream.manager module
- eventutilities_python.stream.manager.load_config(argv: List[str] | None = None, parser: ArgumentParser | None = None, defaults: Dict[Any, Any] | None = None, default_config_files: List[str] | None = None) Namespace
Returns a jsonargparse Namespace object with all configured classes instantiated from configs.
Notably, the returned config.stream_manager will contain all the kwargs necessary to call the stream_manager constructor.
- Parameters:
argv – List of args to parse into config. If this is None, jsonargparse will use sys.argv.
parser – A pre-instantiated ArgumentParser. Pass this in if you have custom arguments to configure. If None, a default ArgumentParser will be constructed.
defaults – Default values to apply when parsing args.
- Example:
>>> parser = ArgumentParser()
>>> parser.add_argument('--my_custom_arg', type=str)
>>> config = load_config(parser=parser)
>>> with stream_manager.from_config(config) as stream:
...     # ...
- class eventutilities_python.stream.manager.stream_manager(job_name: str, source: SourceDescriptor, sink: SinkDescriptor, error_sink: SinkDescriptor = SinkDescriptor(connector='null', stream='stream.error:2.1.0', name='null__stream.error', options={}), schema_uris: Sequence[UriAbsolute] | UriAbsolute = ('https://schema.wikimedia.org/repositories/primary/jsonschema', 'https://schema.wikimedia.org/repositories/secondary/jsonschema'), stream_config_uri: UriAbsolute = 'https://meta.wikimedia.org/w/api.php', http_client_routes: Dict[str, str] | None = None, process_max_workers_default: int = 12, process_batch_size_default: int = 12, process_batch_duration_ms_default: int = 120000)
Bases:
object
- classmethod arg_parser(parser: ArgumentParser | None = None) ArgumentParser
Returns an ArgumentParser which can be used to auto instantiate a stream_manager from CLI and/or config files. You can augment this parser with your own CLI opts if you need to.
- Parameters:
parser – If not given, a new ArgumentParser will be created.
- argparse_namespace: str = 'stream_manager'
Prefix namespace key in which arg_parser expects stream_manager configs.
- static check_valid_stream_version(stream_descriptor: EventStreamDescriptor) None
- classmethod from_config(config: Namespace) stream_manager
Instantiate stream_manager with a jsonargparsed config Namespace (with instantiated dataclass params).
- Example:
>>> stream_manager.from_config(stream_manager.arg_parser().parse_args())
or more generally:
>>> stream_manager.from_config(load_config())
Module contents
Python stream processing abstractions for use with Wikimedia Event Platform.
- class eventutilities_python.stream.EventStreamDescriptor(name: str, version: str | None = 'latest')
Bases:
object
Source and sink streams have an associated event version.
- Example:
>>> stream = EventStreamDescriptor(name="test.event.stream", version="1.1.0")
>>> stream.name, stream.version
- static from_string(stream_name: str) Any
A factory method that parses stream_name and returns an instance of EventStreamDescriptor.
- Example:
>>> stream = EventStreamDescriptor.from_string("test.event.stream:1.1.0")
>>> stream.name, stream.version
- Parameters:
stream_name – a stream name with versioning scheme “<name>:<version>”
- static is_valid_stream_name(stream_name)
Check whether stream_name contains invalid characters.
This method should be invoked every time we generate a stream name from the library (e.g. error_stream auto-detection).
Stream name validation should not be enabled by default on all streams: we might want to consume streams that, for historical reasons, are now considered "invalid". However, we should make sure that the streams generated by eventutilities-python are valid.
- Parameters:
stream_name – a stream name
- Returns:
- is_version_pinned() bool
- name: str
- version: str | None = 'latest'
- class eventutilities_python.stream.SinkDescriptor(connector: str, stream: str, name: Optional[str] = None, options: Dict[Any, Any] = <factory>)
Bases:
ConnectorDescriptor
- connector: str
A supported connector name, e.g. “file” or “kafka”.
- options: Dict[Any, Any]
Kwargs to pass to source or sink connector.
- sink(datastream_factory: EventDataStreamFactory) Sink | SinkFunction | None
Instantiates a Flink Sink.
- Parameters:
datastream_factory – An EventDataStreamFactory that will be used to create the Flink sink using this SinkDescriptor's configuration.
- sink_to(datastream_factory: EventDataStreamFactory, datastream: DataStream, to_row: bool = True) DataStreamSink | None
Instantiates a Flink Sink and sinks the datastream to it.
- Parameters:
datastream_factory – An EventDataStreamFactory that will be used to create the Flink sink using this SinkDescriptor's configuration.
datastream – the datastream to write to the Sink. This datastream MUST conform to the shape (TypeInformation) of the Sink, which is set by the schema of this SinkDescriptor’s stream_descriptor.
to_row – If True, it is expected that the elements in datastream are python dicts, and they will be converted to Flink Rows using the Sink's TypeInformation before being written to the Sink. This means that the dict element types must conform to the stream_descriptor event schema.
- stream: str
An EventStreamDescriptor string, in the form of “stream_name:schema_version”.
- supported_connectors: ClassVar[List[str]] = ['null', 'file', 'kafka', 'print']
Note: “null” is a special case, and not a real sink. It causes both sink and sink_to to return None.
- class eventutilities_python.stream.SourceDescriptor(connector: str, stream: str, name: Optional[str] = None, options: Dict[Any, Any] = <factory>)
Bases:
ConnectorDescriptor
- connector: str
A supported connector name, e.g. “file” or “kafka”.
- datastream(env: StreamExecutionEnvironment, datastream_factory: EventDataStreamFactory, watermark_strategy: WatermarkStrategy | None = None, as_dict: bool = True) DataStream
Instantiates a Flink DataStream reading from source.
- Parameters:
env – Flink env with required Java dependencies loaded.
datastream_factory – An EventDataStreamFactory that will be used to create the Flink source using the SourceDescriptor.
watermark_strategy – WatermarkStrategy to use for the DataStream source. Defaults to for_monotonous_timestamps() if connector is Kafka, else no_watermarks() will be used.
as_dict – If True, the datastream will be converted from Flink Row to python dict before being returned.
- options: Dict[Any, Any]
Kwargs to pass to source or sink connector.
- source(datastream_factory: EventDataStreamFactory) Source | SourceFunction
Instantiates a Flink Source.
- Parameters:
datastream_factory – An EventDataStreamFactory that will be used to create the Flink source using this SourceDescriptor.
- stream: str
An EventStreamDescriptor string, in the form of “stream_name:schema_version”.
- supported_connectors: ClassVar[List[str]] = ['file', 'collection', 'kafka', 'sse']
List of supported connectors.
These will be checked before instantiating a concrete Connector (Source or Sink).
- eventutilities_python.stream.http_process_function(max_retries: int = 3, retry_backoff_factor: float = 0.1, retry_status_forcelist: List[int] | None = None, pool_maxsize: int = 1) Callable[[...], Any]
Decorator that wraps a function and handles instantiation of a requests Session object, with a retry policy and a worker pool.
- Usage:
>>> @http_process_function()
... def my_func(event: dict, http_session: requests.Session):
...     event["enriched"] = http_session.get(...)["enriched_data"]
...     return event
- Parameters:
max_retries – Retry policy total retries.
retry_backoff_factor – Retry policy backoff_factor.
retry_status_forcelist – Retry policy status_forcelist. Default: [408, 500, 502, 503, 504]
pool_maxsize – HTTPAdapter pool_maxsize.
- eventutilities_python.stream.load_config(argv: List[str] | None = None, parser: ArgumentParser | None = None, defaults: Dict[Any, Any] | None = None, default_config_files: List[str] | None = None) Namespace
Returns a jsonargparse Namespace object with all configured classes instantiated from configs.
Notably, the returned config.stream_manager will contain all the kwargs necessary to call the stream_manager constructor.
- Parameters:
argv – List of args to parse into config. If this is None, jsonargparse will use sys.argv.
parser – A pre-instantiated ArgumentParser. Pass this in if you have custom arguments to configure. If None, a default ArgumentParser will be constructed.
defaults – Default values to apply when parsing args.
- Example:
>>> parser = ArgumentParser()
>>> parser.add_argument('--my_custom_arg', type=str)
>>> config = load_config(parser=parser)
>>> with stream_manager.from_config(config) as stream:
...     # ...
- class eventutilities_python.stream.stream_manager(job_name: str, source: SourceDescriptor, sink: SinkDescriptor, error_sink: SinkDescriptor = SinkDescriptor(connector='null', stream='stream.error:2.1.0', name='null__stream.error', options={}), schema_uris: Sequence[UriAbsolute] | UriAbsolute = ('https://schema.wikimedia.org/repositories/primary/jsonschema', 'https://schema.wikimedia.org/repositories/secondary/jsonschema'), stream_config_uri: UriAbsolute = 'https://meta.wikimedia.org/w/api.php', http_client_routes: Dict[str, str] | None = None, process_max_workers_default: int = 12, process_batch_size_default: int = 12, process_batch_duration_ms_default: int = 120000)
Bases:
object
- classmethod arg_parser(parser: ArgumentParser | None = None) ArgumentParser
Returns an ArgumentParser which can be used to auto instantiate a stream_manager from CLI and/or config files. You can augment this parser with your own CLI opts if you need to.
- Parameters:
parser – If not given, a new ArgumentParser will be created.
- argparse_namespace: str = 'stream_manager'
Prefix namespace key in which arg_parser expects stream_manager configs.
- static check_valid_stream_version(stream_descriptor: EventStreamDescriptor) None
- flink_env: StreamExecutionEnvironment
- classmethod from_config(config: Namespace) stream_manager
Instantiate stream_manager with a jsonargparsed config Namespace (with instantiated dataclass params).
- Example:
>>> stream_manager.from_config(stream_manager.arg_parser().parse_args())
or more generally:
>>> stream_manager.from_config(load_config())