A stateless streaming application

In this tutorial we’ll show how to write an application that implements a typical data enrichment pattern.

The application will consume mediawiki.page_change.v1 from the public eventstreams service via an SSE connector. It will produce enriched events (with a fixed content_body) into a local filesystem sink.

To follow along you’ll need Docker and a Python (>=3.9) virtual environment.

python3.9 -m venv venv
source venv/bin/activate

Before getting started, we’ll need to install cookiecutter.

pip install cookiecutter

1. Scaffold a new application

cookiecutter https://gitlab.wikimedia.org/repos/data-engineering/eventutilities-python/ --directory cookiecutter-event-application

Or if you have the eventutilities-python repo cloned locally:

cookiecutter ${HOME}/repos/data-engineering/eventutilities-python/ --directory cookiecutter-event-application

For this application, we’ll accept the default settings:

  [1/4] project_name (event_platform_app): 
  [2/4] description (): 
  [3/4] eventutilities_python_version (0.15): 
  [4/4] uses_http (N): 

2. Configure the application

Next we’ll create some local stream configuration, which allows us to work with streams that are not yet available via the global EventStreamConfig service. We’ll declare mediawiki.page_change.v1 (our source), which contains events with the mediawiki/page/change schema. We’ll also declare event_platform_app.sink, which uses the same mediawiki/page/change schema, and event_platform_app.error, which uses the error schema.

  "mediawiki.page_change.v1": {
    "schema_title": "mediawiki/page/change"
  }
  "event_platform_app.sink": {
    "schema_title": "mediawiki/page/change"
  }
  "event_platform_app.error": {
    "schema_title": "error"
  }

Now that we have declared them, we can use these streams to configure the source and sink connectors in config.yaml.

We’ll consume data from an SSE stream:

    connector: sse
    stream: mediawiki.page_change.v1:1.1.0

Here we pinned version 1.1.0 of mediawiki/page/change to the stream.
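
The stream value above joins a stream name and a schema version with a colon. As a rough illustration only, the two parts can be separated as shown below. This is not how eventutilities-python parses the setting, and split_stream_descriptor is a hypothetical helper introduced for this sketch.

```python
def split_stream_descriptor(descriptor: str) -> tuple[str, str]:
    """Split '<stream>:<version>' on the last colon (hypothetical helper)."""
    name, _, version = descriptor.rpartition(":")
    if not name or not version:
        raise ValueError(f"expected '<stream>:<version>', got {descriptor!r}")
    return name, version


# The source stream from the tutorial's config.yaml.
print(split_stream_descriptor("mediawiki.page_change.v1:1.1.0"))
```

Splitting on the last colon keeps the dots in the stream name and in the version untouched.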

We’ll produce events into a local directory:

    connector: file
    stream: event_platform_app.sink:1.1.0
      uri: /tmp/out

Finally, we’ll route errors into a local error sink:

    connector: file 
    stream: event_platform_app.error:2.1.0
      uri: /tmp/out

Documentation for stream_manager config settings can be found at https://doc.wikimedia.org/data-engineering/eventutilities-python/configuration.html.

3. Implement an enrichment function

With the streams configured, we can implement an enrich function with our desired logic. Modify the application template under event_platform_app/app.py.

Here we modified the sample enrich to enrich page_change events with a content body.

def enrich(event: dict) -> dict:
    # Attach a constant content body to the main content slot.
    event["revision"]["content_slots"]["main"]["content_body"] = "content body"
    return event
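
To see what the function does to an event, here is a minimal sketch that applies it to a hand-written dictionary. The sample event is an assumption: it carries only the fields touched in this tutorial, not a full mediawiki/page/change payload.

```python
def enrich(event: dict) -> dict:
    # Same logic as the tutorial's enrich: set a constant content body.
    event["revision"]["content_slots"]["main"]["content_body"] = "content body"
    return event


# Hypothetical, heavily trimmed event; real page_change events have many more fields.
sample_event = {
    "wiki_id": "enwiki",
    "page": {"page_id": 42},
    "revision": {"content_slots": {"main": {}}},
}

enriched = enrich(sample_event)
print(enriched["revision"]["content_slots"]["main"]["content_body"])
```

The function mutates and returns the same dictionary. An event that lacks revision, content_slots or main would raise a KeyError, so a production version may want to guard against that.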

We’ll also need to adjust the stream partition key to match the new schema:

stream.partition_by(lambda e: (e["wiki_id"], e["page"]["page_id"]))

In this case (wiki_id, page_id) matches the partition scheme used internally (Kafka).
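
The key function can be tried on its own, outside the stream. The sketch below uses hypothetical events reduced to the fields the key reads, and shows that two changes to the same page on the same wiki produce the same key while a page with the same id on another wiki does not.

```python
def partition_key(event: dict) -> tuple:
    # Same expression as the lambda passed to stream.partition_by.
    return (event["wiki_id"], event["page"]["page_id"])


# Hypothetical events reduced to the fields the key function reads.
events = [
    {"wiki_id": "enwiki", "page": {"page_id": 1}},
    {"wiki_id": "enwiki", "page": {"page_id": 1}},
    {"wiki_id": "dewiki", "page": {"page_id": 1}},
]

keys = [partition_key(e) for e in events]
print(keys)
```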

Exercise: update the test fixtures under tests/events.json and the test_app.py suite to reflect this behaviour change.

4. Run the application locally

To run the application locally, you’ll need the eventutilities-python[provided] dependencies.

Install the application dependencies with

pip install --extra-index-url https://gitlab.wikimedia.org/api/v4/projects/1014/packages/pypi/simple 'eventutilities-python[provided]'

and run the application with:

python event_platform_app/app.py --config config.yaml

After startup, /tmp/out will be populated with enriched events.

tail -f /tmp/out/.*
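
The output can also be inspected from Python. The sketch below assumes the files under /tmp/out hold newline-delimited JSON, one event per line; that is an assumption about the file sink's output format, and read_events is a hypothetical helper. It walks the directory recursively and includes hidden files.

```python
import json
from pathlib import Path


def read_events(out_dir: str) -> list:
    """Collect JSON objects from every file under out_dir (hypothetical helper)."""
    events = []
    for path in sorted(Path(out_dir).rglob("*")):
        if not path.is_file():
            continue
        text = path.read_text(encoding="utf-8", errors="replace")
        for line in text.splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                # Skip partially written or non-JSON lines.
                continue
            if isinstance(record, dict):
                events.append(record)
    return events


if __name__ == "__main__":
    for event in read_events("/tmp/out"):
        main_slot = event.get("revision", {}).get("content_slots", {}).get("main", {})
        print(main_slot.get("content_body"))
```

Because the error sink in this tutorial writes to the same directory, the loop uses .get so that records without a revision field print None instead of failing.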

5. Run the application with docker

.pipeline/blubber.yaml will create images atop the base ones we use in k8s. It’s good practice to work and test with images before moving to the deployment phase. These images package the eventutilities-python[provided] dependencies.

Build the image and run an event_platform_app-devel container with:

DOCKER_BUILDKIT=1 docker build --target development -t event_platform_app-devel -f .pipeline/blubber.yaml . 
docker run event_platform_app-devel

By default, events will be produced in /tmp/out inside the running container. You’ll need to access the container to inspect the application output.

6. Enrichment with external systems

The example so far is limited to enriching a stream with a constant, self-contained data transformation. In practice, applications will often need to query external systems. eventutilities_python comes with some helpers for processing functions that need to access HTTP endpoints.

By convention, such a function must have the following signature:

my_func(event: dict, http_session: requests.Session) -> dict

and must be decorated with eventutilities_python.functions.http_process_function.

requests.Session is provided by the requests package.

For example:

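import requests
from eventutilities_python.functions import http_process_function

@http_process_function()
def my_func(event: dict, http_session: requests.Session) -> dict:
    # Response objects are not subscriptable; decode the JSON body first.
    event["enriched"] = http_session.get(...).json()["enriched_data"]
    return event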

When dispatched via stream_manager, the http_process_function decorator will monkey patch the session object with a requests.Session instance that can be scheduled on an async thread pool and that supports retry-on-error logic with backoff. More details can be found at https://doc.wikimedia.org/data-engineering/eventutilities-python/DESIGN.html#asynchronous-http-callback-via-datastream-minibatching/
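
Because the session is passed in as an argument, the enrichment logic can be exercised without network access by handing it a stand-in object. The sketch below leaves out the decorator and stream_manager entirely. FakeSession, FakeResponse, the example.org URL, the id field and the shape of the response are all hypothetical and only serve the illustration.

```python
class FakeResponse:
    """Stand-in for requests.Response exposing only json()."""

    def __init__(self, payload: dict):
        self._payload = payload

    def json(self) -> dict:
        return self._payload


class FakeSession:
    """Stand-in for requests.Session that records the requested URLs."""

    def __init__(self, payload: dict):
        self._payload = payload
        self.requested = []

    def get(self, url: str, **kwargs) -> FakeResponse:
        self.requested.append(url)
        return FakeResponse(self._payload)


def my_func(event: dict, http_session) -> dict:
    # Undecorated here so it can be called directly; the endpoint is hypothetical.
    url = f"https://example.org/lookup/{event['id']}"
    event["enriched"] = http_session.get(url).json()["enriched_data"]
    return event


session = FakeSession({"enriched_data": "hello"})
result = my_func({"id": 7}, session)
print(result)
```

Keeping the HTTP call behind the session argument is what makes this kind of substitution possible in a test suite.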