Project Status: WIP – Initial development is in progress, but there has not yet been a stable, usable release suitable for the public.

WARNING: code in this repo is experimental. Expect things to be broken.

See the design document that describes assumptions and approaches of this library is available at in docs/DESIGN.md.

Eventutilities Python

A Python companion to eventutilities.

The library design doc is available at doc/DESIGN.md. See CONTRIBUTION.md for getting started with developing on this project.

For some example usage of this package see: https://gitlab.wikimedia.org/repos/data-engineering/mediawiki-event-enrichment

eventutilities-python and its dependencies can be installed with:

pip install --extra-index-url https://gitlab.wikimedia.org/api/v4/projects/1014/packages/pypi/simple eventutilities-python[provided]

Getting started

Create a new application using the cookiecutter template:

python3 -m pip install --user cookiecutter
cookiecutter git+https://gitlab.wikimedia.org/repos/data-engineering/eventutilities-python.git --directory cookiecutter-event-pipeline

Caveats

Currently, if you enable uses_http you have to provide the config manually to create the http session by using the nested key http_session and the config listed in functions.http_process_function

If running on a machine in the internal wmf network, for example on a stat machine, make sure to unset http proxies for localhost before launching a Flink local job:

no_proxy=127.0.0.1,localhost,.wmnet

See https://wikitech.wikimedia.org/wiki/HTTP_proxy for additional information how to configure http proxies on analytics hosts.

Streaming application

eventutilities-python comes with a ‘stream manager’ ContextManager interface that abstracts some of the complexities of Event Platform and stream processing of simple pipelines. Configuration is managed via the CLI and config files.

stream_manager ‘enrichment’ example:

This example will demonstrate creating a stream processing application that consumes a stream of events, transforms / enriches them using a custom python function, and then produces a new stream of events.

# stream_enrich.py

from eventutilities_python.stream import stream_manager, load_config


def enrich_event(event: dict):
    """
    A map function that takes an event dict, transforms it in some way,
    and then returns it. 
    """
    event["new_field"] = "I am enriched"


def main(config=None):
    # Load configuration from defaults.
    config = config or load_config()
    with stream_manager.from_config(config) as stream:
        stream.process(enrich_event)
        stream.execute()


if __name__ == '__main__':
    main()

Because we call load_config, we get handy CLI and config file configuration.

Run python stream_enrich.py --help to print out a detailed help message with all of the available options.

python stream_enrich.py --print_config=comments will auto generate a yaml config file.

We are using jsonargparse for configuration and CLI parsing, see docs there for usage info. Config file and CLI opt merging are supported. For example, you could provide defaults in your job code or a config file:

stream_manager:
  # Will be used in UI and metric names
  job_name: my_enrich_job_name

  # Will be used to instantiate and then read from sources
  source:
    connector: null # source type, e.g. kafka or file
    stream: null # stream descriptor, stream_name:schema_version
    name: null # will be used in metrics and UI
    options: {} # connector specific options
  # Will be used to instantiate and write the final output.
  sink:
    connector: null # sink type, e.g. kafka, or file
    stream: null # stream descriptor, stream_name:schema_version
    name: null # will be used in metrics and UI
    options: {} # connector specific options
  schema_uris:
  - https://schema.wikimedia.org/primary/jsonschema
  - https://schema.wikimedia.org/secondary/jsonschema
  stream_config_uri: https://meta.wikimedia.org/w/api.php
  error_sink: null
  auto_error_sink_enabled: false
  kafka_topic_prefix: ''
  http_client_routes: null

Then, you could launch your flink application like:

flink run -py stream_enrich.py --config ./my_config.yaml --stream_config_uri=/path/to/local/stream_config.json

Supported sources and sinks:

Fink Sinks and Sources definition are abstracted via ‘Descriptor’ dataclasses. SourceDescriptor defines a list of support sources, and SinkDescriptor defines a list of supported sinks.

The descriptors are mostly a thin dataclass/config proxy to calling flink EventDataStreamFactory methods to instantiate Flink Sources or DataStreams, or Flink Sinks.

(As of 2023-06:)

The supported sources are:

  • file - read events in from a list of files.

  • collection - read events in from a list of Python dicts.

  • kafka - read events from kafka.

  • sse - read events from an HTTP Server Sent Events endpoint. NOTE: this should only be used for development.

The supported sinks are:

  • file - write events to files in a directory.

  • kafka - write events to kafka.

  • null - don’t use any sink. This is a dummy sink, and is used for the default stream_manager error_sink param.

  • print - write events to stdout or stderr NOTE: print and should only be used for development.


TODO:

-[ ] Better config generation using cookieninja instead of cookiecutter -[ ] Derive example events from stream schema for tests instead of using eventgate-main.test.event