WARNING: code in this repo is experimental. Expect things to be broken.
A design document describing the assumptions and approaches of this library is available in docs/DESIGN.md.
Eventutilities Python
A Python companion to eventutilities.
See CONTRIBUTION.md for getting started with developing on this project.
For some example usage of this package see: https://gitlab.wikimedia.org/repos/data-engineering/mediawiki-event-enrichment
eventutilities-python and its dependencies can be installed with:

```
pip install --extra-index-url https://gitlab.wikimedia.org/api/v4/projects/1014/packages/pypi/simple eventutilities-python[provided]
```
Getting started
Create a new application using the cookiecutter template:

```
python3 -m pip install --user cookiecutter
cookiecutter git+https://gitlab.wikimedia.org/repos/data-engineering/eventutilities-python.git --directory cookiecutter-event-pipeline
```
Caveats
Currently, if you enable `uses_http`, you have to provide the configuration needed to create the HTTP session manually, using the nested key `http_session` with the config options listed in `functions.http_process_function`.
If running on a machine in the internal WMF network, for example on a stat machine, make sure to unset HTTP proxies for localhost before launching a Flink local job:

```
no_proxy=127.0.0.1,localhost,.wmnet
```
See https://wikitech.wikimedia.org/wiki/HTTP_proxy for additional information on how to configure HTTP proxies on analytics hosts.
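For example, before launching the local job you could clear the proxy variables in your shell session. This is a sketch assuming a bash-like shell; it uses the conventional lowercase and uppercase proxy environment variable names:

```shell
# Clear any proxy settings so local Flink connections to 127.0.0.1 work...
unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY
# ...and exempt localhost and internal .wmnet hosts from any future proxying.
export no_proxy=127.0.0.1,localhost,.wmnet
```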
Streaming application
eventutilities-python comes with a ‘stream manager’ ContextManager interface that abstracts away some of the complexities of Event Platform and stream processing for simple pipelines. Configuration is managed via the CLI and config files.
`stream_manager` ‘enrichment’ example:
This example demonstrates creating a stream processing application that consumes a stream of events, transforms / enriches them using a custom Python function, and then produces a new stream of events.
```python
# stream_enrich.py
from eventutilities_python.stream import stream_manager, load_config


def enrich_event(event: dict):
    """
    A map function that takes an event dict, transforms it in some way,
    and then returns it.
    """
    event["new_field"] = "I am enriched"
    return event


def main(config=None):
    # Load configuration from defaults.
    config = config or load_config()

    with stream_manager.from_config(config) as stream:
        stream.process(enrich_event)
        stream.execute()


if __name__ == '__main__':
    main()
```
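Since the map function operates on plain dicts, it can be unit tested without any Flink runtime. A minimal sketch (the function body is reproduced here so the snippet is self-contained; the input event is made up):

```python
# Copy of the enrich_event map function from stream_enrich.py, so this
# snippet runs standalone without Flink or eventutilities_python installed.
def enrich_event(event: dict) -> dict:
    """Add a field to the event dict and return it."""
    event["new_field"] = "I am enriched"
    return event


# A toy input event; real events conform to an Event Platform schema.
event = {"meta": {"id": "123"}}
enriched = enrich_event(event)
print(enriched["new_field"])  # -> I am enriched
```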
Because we call `load_config`, we get handy CLI and config file configuration. Run `python stream_enrich.py --help` to print out a detailed help message with all of the available options. `python stream_enrich.py --print_config=comments` will auto-generate a YAML config file.
We are using jsonargparse for configuration and CLI parsing, see docs there for usage info. Config file and CLI opt merging are supported. For example, you could provide defaults in your job code or a config file:
```yaml
stream_manager:
  # Will be used in UI and metric names
  job_name: my_enrich_job_name
  # Will be used to instantiate and then read from sources
  source:
    connector: null  # source type, e.g. kafka or file
    stream: null     # stream descriptor, stream_name:schema_version
    name: null       # will be used in metrics and UI
    options: {}      # connector specific options
  # Will be used to instantiate and write the final output.
  sink:
    connector: null  # sink type, e.g. kafka or file
    stream: null     # stream descriptor, stream_name:schema_version
    name: null       # will be used in metrics and UI
    options: {}      # connector specific options
  schema_uris:
    - https://schema.wikimedia.org/primary/jsonschema
    - https://schema.wikimedia.org/secondary/jsonschema
  stream_config_uri: https://meta.wikimedia.org/w/api.php
  error_sink: null
  auto_error_sink_enabled: false
  kafka_topic_prefix: ''
  http_client_routes: null
```
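As an illustration, a filled-in config for a Kafka-to-Kafka job might look like the following sketch. The stream names, versions, and source/sink names here are made up for this example; real values must match streams registered in stream config:

```yaml
stream_manager:
  job_name: my_enrich_job_name
  source:
    connector: kafka
    # Hypothetical stream descriptor, in stream_name:schema_version form.
    stream: my.input.stream:1.0.0
    name: my_input_source
    options: {}
  sink:
    connector: kafka
    # Hypothetical stream descriptor, in stream_name:schema_version form.
    stream: my.output.stream:1.0.0
    name: my_output_sink
    options: {}
```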
Then, you could launch your Flink application like:

```
flink run -py stream_enrich.py --config ./my_config.yaml --stream_config_uri=/path/to/local/stream_config.json
```
Supported sources and sinks:
Flink sink and source definitions are abstracted via ‘Descriptor’ dataclasses. `SourceDescriptor` defines the list of supported sources, and `SinkDescriptor` defines the list of supported sinks. The descriptors are mostly a thin dataclass/config proxy for calling Flink `EventDataStreamFactory` methods to instantiate Flink Sources or DataStreams, or Flink Sinks.
(As of 2023-06:)

The supported sources are:

- `file` - read events in from a list of files.
- `collection` - read events in from a list of Python dicts.
- `kafka` - read events from Kafka.
- `sse` - read events from an HTTP Server Sent Events endpoint. NOTE: this should only be used for development.
The supported sinks are:

- `file` - write events to files in a directory.
- `kafka` - write events to Kafka.
- `null` - don’t use any sink. This is a dummy sink, used as the default for the `stream_manager` `error_sink` param.
- `print` - write events to stdout or stderr. NOTE: `print` should only be used for development.
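For local development, the development-only connectors can be combined. A sketch (the stream descriptor is made up, and connector-specific keys belong under `options`; check `SourceDescriptor`/`SinkDescriptor` for the authoritative fields):

```yaml
stream_manager:
  job_name: my_dev_job
  source:
    connector: sse                   # development only
    stream: my.input.stream:1.0.0    # hypothetical stream descriptor
    name: dev_source
    options: {}
  sink:
    connector: print                 # development only: write events to stdout
    stream: null
    name: dev_sink
    options: {}
```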
TODO:

- [ ] Better config generation using cookieninja instead of cookiecutter
- [ ] Derive example events from stream schema for tests instead of using eventgate-main.test.event