eventutilities_python.stream package

Submodules

eventutilities_python.stream.descriptor module

class eventutilities_python.stream.descriptor.EventStreamDescriptor(name: str, version: str | None = 'latest')

Bases: object

Source and sink streams have an associated event version.

Example:
>>> stream = EventStreamDescriptor(name="test.event.stream", version="1.1.0")
>>> stream.name, stream.version
static from_string(stream_name: str) Any

A factory method that parses stream_name and returns an instance of EventStreamDescriptor.

Example:
>>> stream = EventStreamDescriptor.from_string("test.event.stream:1.1.0")
>>> stream.name, stream.version
Parameters:

stream_name – a stream name with versioning scheme “<name>:<version>”
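The “&lt;name&gt;:&lt;version&gt;” scheme can be illustrated with a minimal parsing sketch. This is not the library’s implementation, just the documented convention; splitting on the last “:” is an assumption about names that might themselves contain a colon:

```python
from typing import Optional, Tuple


def parse_stream_name(stream_name: str) -> Tuple[str, Optional[str]]:
    """Split a "<name>:<version>" string; version defaults to "latest"."""
    if ":" in stream_name:
        # rsplit keeps any ":" inside the name part (an assumption,
        # not documented behavior).
        name, version = stream_name.rsplit(":", 1)
        return name, version
    return stream_name, "latest"


print(parse_stream_name("test.event.stream:1.1.0"))  # ('test.event.stream', '1.1.0')
print(parse_stream_name("test.event.stream"))        # ('test.event.stream', 'latest')
```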

static is_valid_stream_name(stream_name)

Check whether stream_name is valid, i.e. contains no invalid characters.

This method should be invoked every time we generate a stream name from the library (e.g. error_stream auto-detection).

Stream name validation should not be enabled by default on all streams: we might want to consume streams that, for historical reasons, are now considered “invalid”. However, we should make sure that the streams generated by eventutilities-python are valid.

Parameters:

stream_name – a stream name

Returns:

is_version_pinned() bool
name: str
version: str | None = 'latest'
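A plausible reading of is_version_pinned(), given the 'latest' default above, is sketched below. This is an assumption about the semantics, not the library’s actual code:

```python
from typing import Optional


def is_version_pinned(version: Optional[str]) -> bool:
    # Assumption: a stream's version is "pinned" only when a concrete
    # version was given, i.e. anything other than the "latest" default.
    return version is not None and version != "latest"


print(is_version_pinned("1.1.0"))   # True
print(is_version_pinned("latest"))  # False
```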
class eventutilities_python.stream.descriptor.SinkDescriptor(connector: str, stream: str, name: Optional[str] = None, options: Dict[Any, Any] = <factory>)

Bases: ConnectorDescriptor

sink(datastream_factory: EventDataStreamFactory) Sink | SinkFunction | None

Instantiates a Flink Sink.

Parameters:

datastream_factory – An EventDataStreamFactory that will be used to create the Flink sink using this SinkDescriptor’s configuration.

sink_to(datastream_factory: EventDataStreamFactory, datastream: DataStream, to_row: bool = True) DataStreamSink | None

Instantiates a Flink Sink and sinks the datastream to it.

Parameters:
  • datastream_factory – An EventDataStreamFactory that will be used to create the Flink sink using this SinkDescriptor’s configuration.

  • datastream – the datastream to write to the Sink. This datastream MUST conform to the shape (TypeInformation) of the Sink, which is set by the schema of this SinkDescriptor’s stream_descriptor.

  • to_row – If True, it is expected that the elements in datastream are Python dicts, and they will be converted to Flink Rows using the Sink’s TypeInformation before being written to the Sink. This means that the dict element types must conform to the stream_descriptor event schema.

supported_connectors: ClassVar[List[str]] = ['null', 'file', 'kafka', 'print']

Note: “null” is a special case, and not a real sink. It causes both sink and sink_to to return None.

class eventutilities_python.stream.descriptor.SourceDescriptor(connector: str, stream: str, name: Optional[str] = None, options: Dict[Any, Any] = <factory>)

Bases: ConnectorDescriptor

datastream(env: StreamExecutionEnvironment, datastream_factory: EventDataStreamFactory, watermark_strategy: WatermarkStrategy | None = None, as_dict: bool = True) DataStream

Instantiates a Flink DataStream reading from source.

Parameters:
  • env – Flink env with required Java dependencies loaded.

  • datastream_factory – An EventDataStreamFactory that will be used to create the Flink source using the SourceDescriptor.

  • watermark_strategy – WatermarkStrategy to use for the DataStream source. Defaults to for_monotonous_timestamps() if connector is Kafka, else no_watermarks() will be used.

  • as_dict – If True, the datastream will be converted from Flink Row to python dict before being returned.

source(datastream_factory: EventDataStreamFactory) Source | SourceFunction

Instantiates a Flink Source.

Parameters:

datastream_factory – An EventDataStreamFactory that will be used to create the Flink source using this SourceDescriptor.

supported_connectors: ClassVar[List[str]] = ['file', 'collection', 'kafka', 'sse']

List of supported connectors.

These will be checked before instantiating a concrete Connector (Source or Sink).
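That pre-instantiation check amounts to a simple membership test against the class’s supported_connectors list. The sketch below is illustrative, not the library’s code; the lists come from the documentation above:

```python
from typing import List


def check_connector(connector: str, supported: List[str]) -> None:
    """Fail fast, before any Flink Source/Sink is built, if the connector is unknown."""
    if connector not in supported:
        raise ValueError(
            f"Unsupported connector {connector!r}; expected one of {supported}"
        )


# SourceDescriptor's supported connectors, per the documentation above.
SOURCE_CONNECTORS = ["file", "collection", "kafka", "sse"]

check_connector("kafka", SOURCE_CONNECTORS)  # passes silently
```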

eventutilities_python.stream.error module

eventutilities_python.stream.error.create_error_event(error: Exception | str, emitter_id: str, error_dt: datetime | str | Instant, error_stream_name: str | None = None, errored_event: Dict[str, Any] | str | None = None) Dict[str, Any]

Creates an error event dict that can be serialized into a WMF error event. See https://schema.wikimedia.org/#!/primary/jsonschema/error.

Parameters:
  • error – An Exception or error message

  • emitter_id – Informative name of the ‘emitter’ of this error.

  • error_dt

    event time to use for the error event. Note that this is not the same as the event time of the errored_event which caused the error.
      - If this is a datetime, it will be formatted into an ISO-8601 ‘Z’ string.
      - If this is a Flink Instant or a str, the dt will be set as is.

    (If you are creating this error event for use in a Flink pipeline, you’ll want to pass in an Instant.)

  • error_stream_name – Value to use for meta.stream. This is optional, as some serializers (e.g. wikimedia-event-utilities JsonEventGenerator) handle setting meta.stream.

  • errored_event – The input event that triggered this error.
      - If this is a dict, it will be serialized as a JSON string and set in the raw_event field.
      - Else if not None, it will be set as a str in the raw_event field.
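The documented field handling can be sketched as follows. This is a simplified illustration only: the "message" and "dt" field names are assumptions (the real WMF error schema defines the actual fields), the sketch assumes error_dt is already UTC, and it omits Flink Instant support:

```python
import json
from datetime import datetime
from typing import Any, Dict, Optional, Union


def sketch_error_event(
    error: Union[Exception, str],
    emitter_id: str,
    error_dt: Union[datetime, str],
    errored_event: Optional[Union[Dict[str, Any], str]] = None,
) -> Dict[str, Any]:
    # datetime -> ISO-8601 'Z' string; a str is set as is.
    # (The real function also accepts a Flink Instant.)
    if isinstance(error_dt, datetime):
        dt = error_dt.strftime("%Y-%m-%dT%H:%M:%S") + "Z"
    else:
        dt = error_dt
    # dict -> JSON string for raw_event; a str is set as is; None stays None.
    if isinstance(errored_event, dict):
        raw_event = json.dumps(errored_event)
    else:
        raw_event = errored_event
    return {
        "emitter_id": emitter_id,
        "message": str(error),  # field name assumed, not from the docs above
        "dt": dt,               # field name assumed, not from the docs above
        "raw_event": raw_event,
    }
```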

eventutilities_python.stream.functions module

Function wrappers for the stream.manager.Stream process() func.

eventutilities_python.stream.functions.http_process_function(max_retries: int = 3, retry_backoff_factor: float = 0.1, retry_status_forcelist: List[int] | None = None, pool_maxsize: int = 1) Callable[[...], Any]

Decorator that wraps a function and handles instantiation of a requests Session object, with a retry policy and a worker pool.

Usage:
>>> @http_process_function()
... def my_func(event: dict, http_session: requests.Session):
...     event["enriched"] = http_session.get(...).json()["enriched_data"]
...     return event
Parameters:
  • max_retries – Retry total (maximum number of retries)

  • retry_backoff_factor – Retry backoff_factor

  • retry_status_forcelist – Retry status_forcelist. Default: [408, 500, 502, 503, 504]

  • pool_maxsize – HTTPAdapter pool_maxsize
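How such a decorator maps these parameters onto a requests Session can be sketched as below. This is an illustrative re-creation, not the library’s implementation; it builds one Session per wrapped function and injects it as the http_session keyword argument:

```python
import functools
from typing import Any, Callable, List, Optional

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def http_process_function_sketch(
    max_retries: int = 3,
    retry_backoff_factor: float = 0.1,
    retry_status_forcelist: Optional[List[int]] = None,
    pool_maxsize: int = 1,
) -> Callable[..., Any]:
    """Sketch of the documented behavior: a Session with a retry policy."""
    if retry_status_forcelist is None:
        retry_status_forcelist = [408, 500, 502, 503, 504]

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        # One shared Session per wrapped function, with the retry policy
        # mounted for both http and https.
        session = requests.Session()
        retry = Retry(
            total=max_retries,
            backoff_factor=retry_backoff_factor,
            status_forcelist=retry_status_forcelist,
        )
        adapter = HTTPAdapter(max_retries=retry, pool_maxsize=pool_maxsize)
        session.mount("http://", adapter)
        session.mount("https://", adapter)

        @functools.wraps(func)
        def wrapper(event: dict) -> Any:
            return func(event, http_session=session)

        return wrapper

    return decorator


@http_process_function_sketch()
def my_func(event: dict, http_session: requests.Session) -> dict:
    # A real process function would call http_session.get(...) here;
    # this sketch only records that a Session was injected.
    event["has_session"] = isinstance(http_session, requests.Session)
    return event
```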

eventutilities_python.stream.manager module

eventutilities_python.stream.manager.load_config(argv: List[str] | None = None, parser: ArgumentParser | None = None, defaults: Dict[Any, Any] | None = None, default_config_files: List[str] | None = None) Namespace

Returns a jsonargparse Namespace object with all configured classes instantiated from configs.

Notably, the returned config.stream_manager will contain all the kwargs necessary to call the stream_manager constructor.

Parameters:
  • argv – List of args to parse into config. If this is None, jsonargparse will use sys.argv.

  • parser – A pre-instantiated ArgumentParser. Pass this in if you have custom arguments to configure. If None, a default ArgumentParser will be constructed.

  • defaults – Default values to apply when parsing args.

Example:
>>> parser = ArgumentParser()
>>> parser.add_argument('--my_custom_arg', type=str)
>>> config = load_config(parser=parser)
>>> with stream_manager.from_config(config) as stream:
...     pass  # use the stream here
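Because arg_parser nests everything under the stream_manager namespace (see argparse_namespace below), a config file consumed via default_config_files might look roughly like the following hypothetical YAML. The keys mirror the stream_manager constructor; the connector and stream values are made up for illustration:

```yaml
# Hypothetical config file; keys mirror the stream_manager constructor
# and are nested under the "stream_manager" namespace.
stream_manager:
  job_name: my_enrichment_job
  source:
    connector: kafka
    stream: my.input.stream:1.0.0
    options: {}
  sink:
    connector: kafka
    stream: my.output.stream:1.0.0
    options: {}
```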
class eventutilities_python.stream.manager.stream_manager(job_name: str, source: SourceDescriptor, sink: SinkDescriptor, error_sink: SinkDescriptor = SinkDescriptor(connector='null', stream='stream.error:2.1.0', name='null__stream.error', options={}), schema_uris: Sequence[UriAbsolute] | UriAbsolute = ('https://schema.wikimedia.org/repositories/primary/jsonschema', 'https://schema.wikimedia.org/repositories/secondary/jsonschema'), stream_config_uri: UriAbsolute = 'https://meta.wikimedia.org/w/api.php', http_client_routes: Dict[str, str] | None = None, process_max_workers_default: int = 12, process_batch_size_default: int = 12, process_batch_duration_ms_default: int = 120000)

Bases: object

classmethod arg_parser(parser: ArgumentParser | None = None) ArgumentParser

Returns an ArgumentParser which can be used to auto instantiate a stream_manager from CLI and/or config files. You can augment this parser with your own CLI opts if you need to.

Parameters:

parser – If not given, a new ArgumentParser will be created.

argparse_namespace: str = 'stream_manager'

Prefix namespace key under which arg_parser expects stream_manager configs.

static check_valid_stream_version(stream_descriptor: EventStreamDescriptor) None
classmethod from_config(config: Namespace) stream_manager

Instantiate stream_manager with a jsonargparsed config Namespace (with instantiated dataclass params).

Example:
>>> stream_manager.from_config(stream_manager.arg_parser().parse_args())
or more generally:
>>> stream_manager.from_config(load_config())

Module contents

Python stream processing abstractions for use with Wikimedia Event Platform.

class eventutilities_python.stream.EventStreamDescriptor(name: str, version: str | None = 'latest')

Bases: object

Source and sink streams have an associated event version.

Example:
>>> stream = EventStreamDescriptor(name="test.event.stream", version="1.1.0")
>>> stream.name, stream.version
static from_string(stream_name: str) Any

A factory method that parses stream_name and returns an instance of EventStreamDescriptor.

Example:
>>> stream = EventStreamDescriptor.from_string("test.event.stream:1.1.0")
>>> stream.name, stream.version
Parameters:

stream_name – a stream name with versioning scheme “<name>:<version>”

static is_valid_stream_name(stream_name)

Check whether stream_name is valid, i.e. contains no invalid characters.

This method should be invoked every time we generate a stream name from the library (e.g. error_stream auto-detection).

Stream name validation should not be enabled by default on all streams: we might want to consume streams that, for historical reasons, are now considered “invalid”. However, we should make sure that the streams generated by eventutilities-python are valid.

Parameters:

stream_name – a stream name

Returns:

is_version_pinned() bool
name: str
version: str | None = 'latest'
class eventutilities_python.stream.SinkDescriptor(connector: str, stream: str, name: Optional[str] = None, options: Dict[Any, Any] = <factory>)

Bases: ConnectorDescriptor

connector: str

A supported connector name, e.g. “file” or “kafka”.

options: Dict[Any, Any]

Kwargs to pass to source or sink connector.

sink(datastream_factory: EventDataStreamFactory) Sink | SinkFunction | None

Instantiates a Flink Sink.

Parameters:

datastream_factory – An EventDataStreamFactory that will be used to create the Flink sink using this SinkDescriptor’s configuration.

sink_to(datastream_factory: EventDataStreamFactory, datastream: DataStream, to_row: bool = True) DataStreamSink | None

Instantiates a Flink Sink and sinks the datastream to it.

Parameters:
  • datastream_factory – An EventDataStreamFactory that will be used to create the Flink sink using this SinkDescriptor’s configuration.

  • datastream – the datastream to write to the Sink. This datastream MUST conform to the shape (TypeInformation) of the Sink, which is set by the schema of this SinkDescriptor’s stream_descriptor.

  • to_row – If True, it is expected that the elements in datastream are Python dicts, and they will be converted to Flink Rows using the Sink’s TypeInformation before being written to the Sink. This means that the dict element types must conform to the stream_descriptor event schema.

stream: str

An EventStreamDescriptor string, in the form of “stream_name:schema_version”.

supported_connectors: ClassVar[List[str]] = ['null', 'file', 'kafka', 'print']

Note: “null” is a special case, and not a real sink. It causes both sink and sink_to to return None.

class eventutilities_python.stream.SourceDescriptor(connector: str, stream: str, name: Optional[str] = None, options: Dict[Any, Any] = <factory>)

Bases: ConnectorDescriptor

connector: str

A supported connector name, e.g. “file” or “kafka”.

datastream(env: StreamExecutionEnvironment, datastream_factory: EventDataStreamFactory, watermark_strategy: WatermarkStrategy | None = None, as_dict: bool = True) DataStream

Instantiates a Flink DataStream reading from source.

Parameters:
  • env – Flink env with required Java dependencies loaded.

  • datastream_factory – An EventDataStreamFactory that will be used to create the Flink source using the SourceDescriptor.

  • watermark_strategy – WatermarkStrategy to use for the DataStream source. Defaults to for_monotonous_timestamps() if connector is Kafka, else no_watermarks() will be used.

  • as_dict – If True, the datastream will be converted from Flink Row to python dict before being returned.

options: Dict[Any, Any]

Kwargs to pass to source or sink connector.

source(datastream_factory: EventDataStreamFactory) Source | SourceFunction

Instantiates a Flink Source.

Parameters:

datastream_factory – An EventDataStreamFactory that will be used to create the Flink source using this SourceDescriptor.

stream: str

An EventStreamDescriptor string, in the form of “stream_name:schema_version”.

supported_connectors: ClassVar[List[str]] = ['file', 'collection', 'kafka', 'sse']

List of supported connectors.

These will be checked before instantiating a concrete Connector (Source or Sink).

eventutilities_python.stream.http_process_function(max_retries: int = 3, retry_backoff_factor: float = 0.1, retry_status_forcelist: List[int] | None = None, pool_maxsize: int = 1) Callable[[...], Any]

Decorator that wraps a function and handles instantiation of a requests Session object, with a retry policy and a worker pool.

Usage:
>>> @http_process_function()
... def my_func(event: dict, http_session: requests.Session):
...     event["enriched"] = http_session.get(...).json()["enriched_data"]
...     return event
Parameters:
  • max_retries – Retry total (maximum number of retries)

  • retry_backoff_factor – Retry backoff_factor

  • retry_status_forcelist – Retry status_forcelist. Default: [408, 500, 502, 503, 504]

  • pool_maxsize – HTTPAdapter pool_maxsize

eventutilities_python.stream.load_config(argv: List[str] | None = None, parser: ArgumentParser | None = None, defaults: Dict[Any, Any] | None = None, default_config_files: List[str] | None = None) Namespace

Returns a jsonargparse Namespace object with all configured classes instantiated from configs.

Notably, the returned config.stream_manager will contain all the kwargs necessary to call the stream_manager constructor.

Parameters:
  • argv – List of args to parse into config. If this is None, jsonargparse will use sys.argv.

  • parser – A pre-instantiated ArgumentParser. Pass this in if you have custom arguments to configure. If None, a default ArgumentParser will be constructed.

  • defaults – Default values to apply when parsing args.

Example:
>>> parser = ArgumentParser()
>>> parser.add_argument('--my_custom_arg', type=str)
>>> config = load_config(parser=parser)
>>> with stream_manager.from_config(config) as stream:
...     pass  # use the stream here
class eventutilities_python.stream.stream_manager(job_name: str, source: SourceDescriptor, sink: SinkDescriptor, error_sink: SinkDescriptor = SinkDescriptor(connector='null', stream='stream.error:2.1.0', name='null__stream.error', options={}), schema_uris: Sequence[UriAbsolute] | UriAbsolute = ('https://schema.wikimedia.org/repositories/primary/jsonschema', 'https://schema.wikimedia.org/repositories/secondary/jsonschema'), stream_config_uri: UriAbsolute = 'https://meta.wikimedia.org/w/api.php', http_client_routes: Dict[str, str] | None = None, process_max_workers_default: int = 12, process_batch_size_default: int = 12, process_batch_duration_ms_default: int = 120000)

Bases: object

classmethod arg_parser(parser: ArgumentParser | None = None) ArgumentParser

Returns an ArgumentParser which can be used to auto instantiate a stream_manager from CLI and/or config files. You can augment this parser with your own CLI opts if you need to.

Parameters:

parser – If not given, a new ArgumentParser will be created.

argparse_namespace: str = 'stream_manager'

Prefix namespace key under which arg_parser expects stream_manager configs.

static check_valid_stream_version(stream_descriptor: EventStreamDescriptor) None
classmethod from_config(config: Namespace) stream_manager

Instantiate stream_manager with a jsonargparsed config Namespace (with instantiated dataclass params).

Example:
>>> stream_manager.from_config(stream_manager.arg_parser().parse_args())
or more generally:
>>> stream_manager.from_config(load_config())