How to Harness Prefect for Optimal Workflow Orchestration

BY ALIX COOK
Senior Software Engineer

A Common Challenge

As organizations and code bases grow, engineering and data teams often end up with a patchwork of loosely managed, frequently undocumented ad-hoc scripts, cron jobs, and background tasks automating their workflows, putting even production-critical deployments at risk. When teams grow organically without an orchestrator, we see them struggle with two primary problems: detecting and recovering from broken workflows is time-consuming and difficult, and deploying new workflows is a slow and complicated process.

The solution may seem obvious; there are dozens of workflow management platforms available to unify these scripts under a centralized platform…just pick one, right? 

Our Approach: Tailor and Codify Your Process

Sounds simple, but the key is in how the platform is set up. Options like Airflow, Jenkins and Prefect offer a suite of tools to help visualize, orchestrate and observe workflows, but they can be a drag on developer productivity if introduced without clearly defined standards, guardrails and tooling. Teams with minimal (or no) infrastructure specialists may not have the skills or the budget to properly implement or maintain new workflows, setting the stage for even more disarray.

Here at APrime, we’re adamant that codifying a set of tools and patterns around workflow management–and tailoring those tools and patterns to a team’s unique engineering skill set–is essential to the team’s long-term success. In this post, I’ll walk you through how to do that, based on our structured approach of aligning on these three foundational areas before kicking off an implementation:

  1. Deployment Strategy: How will new workflows get deployed?
  2. Configuration and Standardization across infrastructure and compute resources: Where will workflows run and how do we ensure sufficient resources to support all running processes?
  3. Metrics, Monitoring and Observability: How will we ensure the reliability and stability of production-critical workflows, and detect issues and anomalies in real-time?  

On tooling: There are many solid options on the market, but for the purposes of this guide, I’ll be using Prefect. I personally prefer Prefect (and we are one of their featured partners) because it can run workflows in almost any environment or managed cloud platform and is flexible enough to support most workflow needs, including ad-hoc parameterized jobs and scheduled cron-style runs. Its “hybrid” model is also particularly well suited to workflows that handle sensitive data, since your code and data stay on your own infrastructure without the need for a self-managed orchestrator. In this setup, I use Kubernetes as the runtime infrastructure, DataDog as the metrics platform, and CircleCI as the deployment mechanism.

Step 1: Deployment Strategy

One of the most important questions to answer after building something new is “how do I deploy it?” Manual, repetitive deployment processes invite human error and consume unnecessary time, leading to wasted hours, missed deadlines, and worse. An automated deployment process is crucial for running workflows on any orchestration platform.

Prefect offers a solution by allowing you to configure your workflows and their deployment entirely in Python. Leveraging Prefect’s Python-based configuration, we can ensure efficient and error-free deployments, saving time and improving overall workflow management.

In Prefect, a “flow” is a container for workflow logic: essentially a Python function decorated with Prefect’s @flow decorator. A “Deployment” encapsulates a flow and allows Prefect to manage and remotely run it.

Using Prefect out of the box, a flow looks something like this:

# my_project/flows.py
from prefect import flow

@flow()
def my_important_workflow():
    # my important business logic
    ...

This works if I just want to run my_important_workflow() locally by invoking the Python function. However, in order to run this on my production infrastructure, I would have to do the following:

  1. Package and publish the code, e.g. Docker or S3
  2. Launch infrastructure to run my code, e.g. Kubernetes, ECS
  3. Configure and authorize Prefect to run my code on the desired infrastructure
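
Step 3 alone involves per-flow deployment boilerplate. With stock Prefect, it would look roughly like this (a sketch; the image and namespace values are hypothetical):

from prefect.deployments import Deployment
from prefect.infrastructure import KubernetesJob

from my_project.flows import my_important_workflow

# hypothetical image and namespace, repeated by hand for every flow
deployment = Deployment.build_from_flow(
    flow=my_important_workflow,
    name="my_important_workflow",
    infrastructure=KubernetesJob(
        image="registry.example.com/my-project:latest",
        namespace="workflows",
    ),
)
deployment.apply()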

We can simplify and short-circuit the process outlined above by extending Prefect’s flow() decorator. With this implementation, we can create a centralized and opinionated deployment entry point:

# my_project/registry.py

from prefect import Flow, flow
from prefect.deployments import Deployment
from prefect.infrastructure import KubernetesJob

NAMESPACE = "workflows"  # example Kubernetes namespace where flow runs are launched


class FlowRegistry:
    def __init__(self) -> None:
        self.flows: list[Flow] = []

    def flow(
        self,
        **kwargs,
    ):
        def decorated(fn):
            # use Prefect's flow decorator to make this function a flow
            prefect_flow = flow(fn, **kwargs)
            # track the flow so deploy_flows() can build a deployment for it
            self.flows.append(prefect_flow)
            return prefect_flow

        return decorated

    # Flow deployment mechanism
    def deploy_flows(self, img: str) -> None:
        for registered_flow in self.flows:
            deployment = Deployment.build_from_flow(
                flow=registered_flow,
                name=registered_flow.name,
                infrastructure=KubernetesJob(
                    image=img,
                    namespace=NAMESPACE,
                ),
            )
            deployment.apply()

Using our custom decorator, our workflow will now look like:

# my_project/flows.py

from my_project.registry import FlowRegistry

fr = FlowRegistry()


@fr.flow()
def my_important_workflow():
    # my important business logic
    ...

Now, all we need to do is invoke deployment creation in an automated way. We can create a simple Python script that we’ll run in CI:

# my_project/deploy.py
from os import getenv

from my_project.flows import fr


def deploy(img: str):
    fr.deploy_flows(img)


if __name__ == "__main__":
    img = getenv("DEPLOY_IMAGE")
    if not img:
        raise ValueError("DEPLOY_IMAGE must be set in the environment")
    deploy(img)

The CI process sets DEPLOY_IMAGE and runs `deploy.py` to deploy new flows. With a few simple Python tricks, we’ve built a lightweight deployment script that abstracts away the specifics of Docker and Kubernetes and lets the engineer focus on hardening the underlying code. All the engineer needs to do is write the Python function that implements their workflow logic.

Step 2: Configuration and Standardization

Centralizing the deployment logic also allows us to expose workflow configuration in a single location. For example, we can expose a `schedule` parameter to our flow decorator that allows the flow deployment’s schedule to be defined inline with the workflow function.

class FlowRegistry:
    ...

    def flow(
        self,
        *,
        schedule=None,
        **kwargs,
    ):
        def decorated(fn):
            prefect_flow = flow(fn, **kwargs)
            # self.flows now holds (flow, schedule) pairs
            self.flows.append((prefect_flow, schedule))
            return prefect_flow

        return decorated

    def deploy_flows(self, img: str) -> None:
        for registered_flow, schedule in self.flows:
            deployment = Deployment.build_from_flow(
                flow=registered_flow,
                name=registered_flow.name,
                schedule=schedule,
                infrastructure=KubernetesJob(image=img, namespace=NAMESPACE),
            )
            deployment.apply()
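
With that in place, a scheduled workflow can be declared entirely inline (a minimal sketch; the one-hour interval is just an illustrative value):

# my_project/flows.py

from datetime import timedelta

from prefect.orion.schemas.schedules import IntervalSchedule

from my_project.registry import FlowRegistry

fr = FlowRegistry()


@fr.flow(schedule=IntervalSchedule(interval=timedelta(hours=1)))
def my_important_workflow():
    # my important business logic
    ...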

We can also enforce standardization for the flows at the deployment level. For example, we can directly configure the Kubernetes job manifests to enforce labels, resource constraints, and job lifecycle, as well as limit concurrency.
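
On the manifest side, here is a sketch of what that enforcement could look like using Prefect’s KubernetesJob block (the helper name, label, resource numbers, and TTL below are illustrative assumptions, not prescriptions):

from prefect.infrastructure import KubernetesJob

from my_project.registry import NAMESPACE


def build_infrastructure(img: str) -> KubernetesJob:
    # every flow gets the same labels, resource envelope, and cleanup policy
    return KubernetesJob(
        image=img,
        namespace=NAMESPACE,
        labels={"managed-by": "flow-registry"},
        customizations=[
            {
                "op": "add",
                "path": "/spec/template/spec/containers/0/resources",
                "value": {
                    "requests": {"cpu": "500m", "memory": "512Mi"},
                    "limits": {"cpu": "1", "memory": "1Gi"},
                },
            }
        ],
        finished_job_ttl=3600,  # remove finished jobs after an hour
    )

Concurrency, in turn, can be capped when we create or update each flow’s work queue: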

from typing import Optional

DEFAULT_CONCURRENCY_LIMIT = 10  # example registry-wide ceiling on concurrent runs


async def update_or_create_work_queue(
    work_queue_name: str, work_pool_name: str, concurrency_limit: Optional[int] = None
):
    # never allow a flow to request more concurrency than the global ceiling
    concurrency_limit = (
        min(concurrency_limit, DEFAULT_CONCURRENCY_LIMIT)
        if concurrency_limit is not None
        else DEFAULT_CONCURRENCY_LIMIT
    )
    ...  # create or update the work queue in the pool based on these limits

By setting resource constraints and concurrency limits, we guard against code changes that have unintended consequences, like a huge increase in cloud spend or a change that inadvertently prevents other workflows from running.

Step 3: Metrics, Monitoring and Observability

Our registry isn’t just useful for streamlining deployments; we can also leverage it to run pre- and post-execution steps around each workflow. One great use case is emitting metrics about our workflows.

To do this, we create a new Python decorator that emits a DataDog event before and after each workflow is executed.

import functools

from datadog import api
from prefect.context import get_run_context

def datadog_event(fn):
	# decorator that will push datadog events at the beginning and end of
	# each flow run
    @functools.wraps(fn)
    def decorated(*args, **kwargs):
        context = get_run_context()
        flow_name = context.flow.name
        api.Event.create(
            title="Flow Started",
            text=f"flow run for {flow_name} started",
            tags=[f"flow:{flow_name}"],
        )
        try:
            ret = fn(*args, **kwargs)
        except Exception:
            api.Event.create(
                title="Flow Errored",
                text=f"flow run for {flow_name} errored",
                tags=[f"flow:{flow_name}"],
            )
            raise

        api.Event.create(
            title="Flow Completed",
            text=f"flow run for {flow_name} completed",
            tags=[f"flow:{flow_name}"],
        )

        return ret

    return decorated

We then wrap the workflow function with our datadog_event decorator inside the registry’s `flow` decorator. Because every deployed workflow now automatically emits events in the same format, they can easily be turned into templated DataDog dashboards and monitors.
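
Inside the registry, that wiring is a one-line change to the inner function of our `flow` decorator (shown here as an excerpt of the registry built above):

# my_project/registry.py (excerpt: inner function of FlowRegistry.flow)

def decorated(fn):
    # wrap the business logic with the DataDog event decorator first,
    # then register the result as a Prefect flow, exactly as before
    prefect_flow = flow(datadog_event(fn), **kwargs)
    self.flows.append((prefect_flow, schedule))
    return prefect_flow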

Similarly, we recommend leveraging this method to set up additional functionality like logging context and flow duration metrics.
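
For example, a companion decorator for flow duration could look something like this (a minimal sketch; the metric name and the use of DataDog’s statsd client are assumptions, not part of the setup above):

import functools
import time

from datadog import statsd
from prefect.context import get_run_context


def flow_duration_metric(fn):
    # reports the wall-clock duration of each flow run as a DataDog distribution
    @functools.wraps(fn)
    def decorated(*args, **kwargs):
        flow_name = get_run_context().flow.name
        start = time.monotonic()
        try:
            return fn(*args, **kwargs)
        finally:
            statsd.distribution(
                "prefect.flow.duration_seconds",  # illustrative metric name
                time.monotonic() - start,
                tags=[f"flow:{flow_name}"],
            )

    return decorated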

Workflow…Optimized!

We’ve now implemented this customized Prefect setup with multiple clients, and they’ve already seen massive benefits. We leveraged the Prefect SDK to support low-effort deployments, enforce guardrails on compute usage, and automatically emit custom metrics, significantly reducing the cognitive overhead for the engineering and DevOps teams responsible for running those workflows. Beyond that, our clients have seen very real time and cost savings, not only from reducing the time and complexity involved in setting up a new workflow, but also from lower support costs. In one case, introducing our Prefect setup cut the time spent on on-call issues by 50%, meaningfully reducing the company’s on-call burden.

We’re here to help.


Here at APrime Technology, we initially designed and implemented this entire framework for one of our health tech clients over the course of just a couple weeks; now we can reuse that framework to get any client up and running with Prefect’s basic workflow management in a matter of hours.

Introducing a new tool is always going to be difficult, but crafting a toolkit that caters to your engineering team’s needs empowers them to hit the ground running–minimizing the efficiency hit so common to initial migrations, and reducing the toil of running asynchronous workflows over time. Feel free to explore my example project on Github and please don’t hesitate to share your comments and questions!

And if your team is interested in those short-term gains along with exponential time savings over the long haul (translating to lower costs, fewer deployment incidents, and shorter time-to-market for new features), simply send us a message or schedule a call. We’re excited to learn more about your unique setup and challenges, and to explore how we can help.
