A stateless streaming application
In this tutorial we’ll show how to write an application that implements a typical data enrichment pattern. The application will consume mediawiki.page_change.v1 from the public eventstreams service via an SSE connector. It will produce enriched events (with a fixed content_body) into a local filesystem sink.
To follow along you’ll need Docker and a Python (>=3.9) virtual environment.
python3.9 -m venv venv
source venv/bin/activate
Before getting started, we’ll need to install cookiecutter.
pip install cookiecutter
1. Scaffold a new application
cookiecutter https://gitlab.wikimedia.org/repos/data-engineering/eventutilities-python/ --directory cookiecutter-event-application
Or if you have the eventutilities-python repo cloned locally:
cookiecutter ${HOME}/repos/data-engineering/eventutilities-python/ --directory cookiecutter-event-application
For this application, we’ll use the default settings:
[1/4] project_name (event_platform_app):
[2/4] description ():
[3/4] eventutilities_python_version (0.15):
[4/4] uses_http (N):
2. Configure the application
Next we’ll create some local stream configuration that will allow us to work with streams not yet available via the global EventStreamConfig service. We’ll declare mediawiki.page_change.v1 (our source), which contains events with the mediawiki/page/change schema. We’ll also declare an event_platform_app.sink stream that uses the same mediawiki/page/change schema.
{
  "mediawiki.page_change.v1": {
    "schema_title": "mediawiki/page/change"
  },
  "event_platform_app.sink": {
    "schema_title": "mediawiki/page/change"
  },
  "event_platform_app.error": {
    "schema_title": "error"
  }
}
Now that we have declared them, we can use these streams to configure source and sink connectors in config.yaml.
We’ll consume data from an SSE stream:
source:
  connector: sse
  stream: mediawiki.page_change.v1:1.1.0
Here we pinned version 1.1.0 of mediawiki/page/change to the stream.
We’ll produce events into a local directory:
sink:
  connector: file
  stream: event_platform_app.sink:1.1.0
  options:
    uri: /tmp/out
Finally, we’ll route errors into a local error sink:
error_sink:
  connector: file
  stream: event_platform_app.error:2.1.0
  options:
    uri: /tmp/out
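For reference, the three connector blocks above assemble into a single config.yaml along these lines (shown exactly as configured above; consult the configuration reference linked below for any additional settings your application needs):

```yaml
source:
  connector: sse
  stream: mediawiki.page_change.v1:1.1.0

sink:
  connector: file
  stream: event_platform_app.sink:1.1.0
  options:
    uri: /tmp/out

error_sink:
  connector: file
  stream: event_platform_app.error:2.1.0
  options:
    uri: /tmp/out
```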
Documentation for stream_manager config settings can be found at https://doc.wikimedia.org/data-engineering/eventutilities-python/configuration.html.
3. Implement an enrichment function
Finally we can implement an enrich function with our desired logic. Modify the application template under event_platform_app/app.py. Here we modified the sample enrich function to enrich page_change events with a content body.
def enrich(event: dict) -> dict:
    event["revision"]["content_slots"]["main"]["content_body"] = "content body"
    return event
We’ll also need to adjust the stream partition key to match the new schema:
stream.partition_by(lambda e: (e["wiki_id"], e["page"]["page_id"]))
In this case (wiki_id, page_id) matches the partitioning scheme used internally (Kafka).
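As a quick sanity check, the key function passed to partition_by can be exercised on a hand-rolled stub event (the field values here are made up; only the wiki_id and page.page_id fields matter to the key):

```python
# Mirrors the lambda passed to stream.partition_by above.
def partition_key(e: dict) -> tuple:
    return (e["wiki_id"], e["page"]["page_id"])

# Minimal stub resembling the fields the partition key touches.
event = {
    "wiki_id": "enwiki",
    "page": {"page_id": 42},
}

print(partition_key(event))  # → ('enwiki', 42)
```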
Exercise: update the test fixtures under tests/events.json and the test_app.py suite to reflect this behaviour change.
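One possible shape for such a test (the fixture fields here are a minimal stand-in, not the real tests/events.json content; in the actual suite you would import enrich from event_platform_app.app instead of redefining it):

```python
# Stand-in for the enrich() implemented above; the real suite would
# import it: from event_platform_app.app import enrich
def enrich(event: dict) -> dict:
    event["revision"]["content_slots"]["main"]["content_body"] = "content body"
    return event


def test_enrich_sets_content_body():
    # Minimal stand-in for a page_change fixture.
    event = {"revision": {"content_slots": {"main": {}}}}
    enriched = enrich(event)
    assert enriched["revision"]["content_slots"]["main"]["content_body"] == "content body"


test_enrich_sets_content_body()
```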
4. Run the application locally
To run the application locally, you’ll need the eventutilities-python[provided] dependencies.
Install the application dependencies with
pip install --extra-index-url https://gitlab.wikimedia.org/api/v4/projects/1014/packages/pypi/simple eventutilities-python[provided]
and run the application with:
python event_platform_app/app.py --config config.yaml
After startup, /tmp/out will be populated with enriched events.
tail -f /tmp/out/.*
5. Run the application with docker
.pipeline/blubber.yaml will create images atop the base ones we use in k8s. It’s good practice to work and test with these images before moving to the deployment phase. These images package the eventutilities-python provided dependencies.
Build the image and run an event_platform_app-devel container with:
DOCKER_BUILDKIT=1 docker build --target development -t event_platform_app-devel -f .pipeline/blubber.yaml .
docker run event_platform_app-devel
By default, events will be produced in /tmp/out inside the running container. You’ll need to access the container to inspect the application output.
6. Enrichment with external systems
The example we gave so far is limited to enriching a stream with a constant, self-contained data transformation. In practice, applications will often need to query external systems. eventutilities_python comes with helpers for processing functions that need to access HTTP endpoints.
By convention, such functions must have the following signature:
my_func(event: dict, http_session: requests.Session) -> dict
and must be decorated with eventutilities_python.functions.http_process_function.
requests.Session is provided by the requests package.
For example:
import requests

from eventutilities_python.functions import http_process_function

@http_process_function()
def my_func(event: dict, http_session: requests.Session) -> dict:
    event["enriched"] = http_session.get(...).json()["enriched_data"]
    return event
When dispatched via stream_manager, the http_process_function decorator will monkey patch the session argument with a requests.Session instance that can be scheduled on an async thread pool and supports retry-on-error logic with backoff.
More details can be found at https://doc.wikimedia.org/data-engineering/eventutilities-python/DESIGN.html#asynchronous-http-callback-via-datastream-minibatching/