07 Oct 2024
At Zipline, we offer a one-stop shop application for retail employees that handles everything from clocking in and out to task management and team communication. To achieve this, we need to integrate with numerous third-party applications, each with its own unique requirements. We decided to develop a satellite service called ZipIO for data synchronization. In this post, I’ll review our journey of building this robust and scalable system.
Language and Framework: We chose Ruby on Rails for its simplicity and productivity. However, we structured our core logic as POROs (Plain Old Ruby Objects) to decouple it from the engine that runs it.
Modular Architecture: We broke down the workflow into discrete steps: Signals, Commands, Outcomes, and Tasks. This modular approach allows for easy reuse of logic across different third-party integrations.
Asynchronous Processing: While we support synchronous operations for specific use cases, 99% of our workflows run asynchronously for optimal performance and scalability.
Robust Error Handling: Each step in the workflow is designed to handle errors gracefully, ensuring smooth operation even when issues arise.
Full Visibility: We prioritized comprehensive logging and monitoring to make troubleshooting and maintenance as painless as possible.
Our workflow consists of four main kinds of steps: Signals, Commands, Outcomes, and Tasks.
Each step in the workflow is designed as a standalone Ruby object that takes a data input, performs an action, and returns the next step(s) to be executed. Since each step defines the downstream step(s), it can perform complex routing depending on input data or user configuration. For example, a Signal could return several Commands to sync an event to multiple third-party services.
flowchart LR
A[Signal: New Order]
B[Command: Sync to service A]
C[Command: Sync to service B]
BB[Outcome: Success]
CB[Outcome: Success]
A --> B
A --> C
B --> BB
C --> CB
The data inputs are defined as Dry::Struct objects. Each step declares the type(s) of step it returns, adding an extra layer of type safety and allowing us to generate documentation automatically.
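The base class itself isn't shown in this post, but a minimal sketch of what the data_struct and emits class macros could look like follows; the internals are assumptions, only the macro names come from the examples below.

# Hypothetical sketch of the ZipIO::Step base class. `data_struct` records
# the expected input type(s) and `emits` records the downstream step
# classes, which later powers validation and documentation generation.
module ZipIO
  class Step
    class << self
      attr_reader :data_structs, :emitted_steps

      def data_struct(*structs)
        @data_structs = structs.flatten
      end

      def emits(*step_classes)
        @emitted_steps = step_classes
      end
    end

    attr_reader :data

    def initialize(data)
      @data = data
    end
  end
end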
Here is an example:
# This signal is created when we receive a new order via a webhook.
class Stripe::Signals::NewOrder < ZipIO::Step
  data_struct [Stripe::Structs::Order]
  emits MyStore::Commands::CreateOrder

  # Convert the Stripe Order to a Generic Struct and return a Command to create the order.
  def call
    order = Transformer.call(data, to: Generic::Structs::Order)
    MyStore::Commands::CreateOrder.new(order)
  end
end
class MyStore::Commands::CreateOrder < ZipIO::Step
  emits MyStore::Outcome::OrderCreated, MyStore::Outcome::OrderCreateFailure

  # Create an order via an API call and return an Outcome according to the response.
  def call
    response = api_client.orders.post(data)
    if response.success?
      MyStore::Outcome::OrderCreated.new(response.parsed_body)
    else
      MyStore::Outcome::OrderCreateFailure.new(response.parsed_body)
    end
  end

  private

  def api_client
    # ...
  end
end
# Two outcomes. They are the final steps of the workflow, so they don't do anything.
MyStore::Outcome::OrderCreated = Class.new(ZipIO::Outcomes::Success)
MyStore::Outcome::OrderCreateFailure = Class.new(ZipIO::Outcomes::Failure)
# Order received from Stripe.
class Stripe::Structs::Order < DryStruct
  attribute :amount, Types::Decimal
  attribute :order_id, Types::String
  attribute :user_id, Types::String
  attribute :email, Types::String
  attribute :payment_type, Types::String
end
# Generic order used throughout the system.
class Generic::Structs::Order < DryStruct
  attribute :amount, Types::Decimal
  attribute :order_id, Types::String
  attribute :integration_key, Types::String
  attribute :email, Types::String
  attribute :payment_source, Types::String.enum('credit_card', 'debit_card')
end
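Assuming DryStruct is a thin subclass of Dry::Struct and Types mixes in Dry.Types(), these structs validate their input at instantiation time. A usage sketch:

require "bigdecimal"
require "dry-struct"

# Assumed setup: a Types module and a DryStruct base class.
module Types
  include Dry.Types()
end

class DryStruct < Dry::Struct
end

order = Stripe::Structs::Order.new(
  amount: BigDecimal("19.99"),
  order_id: "or_123",
  user_id: "usr_456",
  email: "jane@example.com",
  payment_type: "cc"
)
order.amount # => 0.1999e2

# Missing or mistyped attributes raise instead of silently flowing
# through the pipeline:
Stripe::Structs::Order.new(order_id: "or_123") # => raises Dry::Struct::Error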
The core logic is to transform data from one schema (e.g. a Stripe Order) to another (e.g. a Generic Order). We created a simple library inspired by csv-importer to define data transformations using a DSL.
class StripeOrderToGenericOrder < ZipIO::Transformer
  # map `amount` to `amount`
  attribute :amount

  # map `stripe_order_id` to `order_id`
  attribute :order_id, source: :stripe_order_id

  # build a custom integration key
  attribute :integration_key, source: ->(data) {
    [data.fetch(:user_id), data.fetch(:order_id)].join("-")
  }

  # downcase email
  attribute :email, transform: -> { _1&.downcase }

  # define a value mapping
  attribute :payment_source, source: :payment_type, transform: Mapper.new(
    "cc" => "credit_card",
    "dc" => "debit_card",
    # ...
  )
end
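The Mapper used above can be thought of as a small callable object wrapping a lookup table. A minimal sketch; the internals are an assumption, only the DSL usage above is from the actual codebase:

# Hypothetical sketch of Mapper: a callable that translates values
# through a static lookup table.
class Mapper
  def initialize(mapping)
    @mapping = mapping
  end

  # Raise on unknown values so bad data fails loudly instead of
  # flowing through the pipeline silently.
  def call(value)
    @mapping.fetch(value)
  end
end

Mapper.new("cc" => "credit_card").call("cc") # => "credit_card"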
Using a proper naming convention, we can look up the transformers on the fly:
Transformer.call(stripe_order, to: Generic::Structs::Order)
# => #<Generic::Structs::Order ...>
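The lookup itself can be as simple as deriving the transformer class name from the source and target struct names. A sketch, assuming transformer classes expose a class-level call:

# Hypothetical sketch of convention-based transformer lookup:
# Stripe::Structs::Order + Generic::Structs::Order => StripeOrderToGenericOrder
module Transformer
  def self.call(data, to:)
    transformer_for(data.class, to).call(data)
  end

  def self.transformer_for(source, target)
    name = [compact_name(source), compact_name(target)].join("To")
    Object.const_get(name)
  end

  # Stripe::Structs::Order => "StripeOrder"
  def self.compact_name(klass)
    namespace, _, struct = klass.name.split("::")
    "#{namespace}#{struct}"
  end
end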
Since Steps, Structs, and Transformers are defined using DSLs, we can introspect them to generate documentation. This documentation is available in our Admin UI so that non-technical folks (Product Owners, Account Managers, …) understand how the data flows and how it’s transformed.
We traverse the graph of emitted steps to render a flow chart:
flowchart LR
A[Stripe::Signals::Webhook] --> B[Stripe::Signals::PaymentSucceeded]
A[Stripe::Signals::Webhook] --> C[Stripe::Signals::NewOrder]
A[Stripe::Signals::Webhook] --> D[Stripe::Outcome::IgnoredWebhookEvent]
C --> E[MyStore::Commands::CreateOrder]
E --> F[MyStore::Outcome::OrderCreated]
E --> G[MyStore::Outcome::OrderCreateFailure]
L[Shopify::Signals::Webhook] --> M
M[Shopify::Signals::NewOrder] --> E
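A sketch of that traversal, reusing the hypothetical emitted_steps reader from the base class sketch above:

require "set"

# Hypothetical sketch: walk the `emits` declarations recursively and
# render one mermaid edge per parent/child pair.
def mermaid_edges(step_class, seen = Set.new)
  return [] unless seen.add?(step_class)

  Array(step_class.emitted_steps).flat_map do |child|
    ["#{step_class.name} --> #{child.name}", *mermaid_edges(child, seen)]
  end
end

puts "flowchart LR"
puts mermaid_edges(Stripe::Signals::Webhook)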
We look up all Structs and Transformers to display them in a human-friendly way. For example, here is the documentation for StripeOrderToGenericOrder:
Source | Target | Transform
---|---|---
amount | amount |
stripe_order_id | order_id |
Proc: [data.fetch(:user_id), data.fetch(:order_id)].join("-") | integration_key |
email | email | Proc: -> { _1&.downcase }
payment_type | payment_source | Mapper: "cc" => "credit_card", "dc" => "debit_card", …
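Rendering this table is then a matter of iterating over the attribute definitions recorded by the DSL. A rough sketch; attribute_definitions and its accessors are assumed names, not the real API:

# Hypothetical sketch: build documentation rows by introspecting the
# transformer DSL.
def documentation_rows(transformer_class)
  transformer_class.attribute_definitions.map do |definition|
    {
      source: definition.source || definition.target,
      target: definition.target,
      transform: definition.transform&.inspect
    }
  end
end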
So far, we’ve defined the steps, structs, and transformations. Let’s see how we make data flow through them.
Processing a Workflow synchronously is straightforward.
# Run a workflow synchronously and return a nested collection of steps.
# Ex: [ Stripe::Signals::NewOrder, [ MyStore::Commands::CreateOrder, [ MyStore::Outcome::OrderCreated ] ] ]
def run(step)
  return step if step.is_a?(Symbol) # e.g. :success, :skip, ...

  next_steps = Array(step.call)
  [step, *next_steps.map { |next_step| run(next_step) }]
end
We use this approach in integration tests where we want to exercise an entire workflow. In production, we persist Pipelines and Steps to the database and process them asynchronously.
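For example, a hypothetical integration test, assuming run is available as a spec helper:

# Hypothetical spec sketch: run an entire workflow in-process and
# assert on the resulting tree of steps.
RSpec.describe "Stripe order workflow" do
  it "creates the order in MyStore" do
    order = Stripe::Structs::Order.new(
      amount: BigDecimal("19.99"),
      order_id: "or_123",
      user_id: "usr_456",
      email: "JANE@example.com",
      payment_type: "cc"
    )

    result = run(Stripe::Signals::NewOrder.new(order))

    expect(result.flatten).to include(an_instance_of(MyStore::Outcome::OrderCreated))
  end
end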
We use a minimal set of Rails models:
Pipeline: a specific run of a workflow.
PipelineStep: an individual step within a pipeline.

When a signal is received, we create a Pipeline, persist the signal as its first PipelineStep, and enqueue a background job to process it. Each processed step then enqueues the step(s) it returns, until the workflow reaches its Outcomes.
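The post doesn't show the schema, but based on the code below, a minimal version might look like this; the status and error_message columns are assumptions:

# Hypothetical migration sketch; column names are inferred from the
# Pipeline/PipelineStep code shown further down.
class CreateZipIOTables < ActiveRecord::Migration[7.1]
  def change
    create_table :pipelines do |t|
      t.string :status, null: false, default: "pending"
      t.timestamps
    end

    create_table :pipeline_steps do |t|
      t.references :pipeline, null: false, foreign_key: true
      t.string :step_class, null: false       # e.g. "Stripe::Signals::NewOrder"
      t.jsonb :data, null: false, default: {} # serialized struct attributes
      t.string :status, null: false, default: "pending"
      t.string :error_message
      t.timestamps
    end
  end
end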
This approach allows for excellent scalability, fault tolerance, and visibility.
By persisting each step in the database, we can retry failed steps without restarting the entire workflow. This is particularly useful for long sequences of API calls where only one step might fail.
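A sketch of what that might look like, assuming the status and error_message columns from the hypothetical migration above:

# Hypothetical sketch: record failures on the step itself so it can be
# retried in isolation, without replaying the steps that already succeeded.
class PipelineStep < ApplicationRecord
  def process_with_tracking!
    process!
    update!(status: "processed")
  rescue StandardError => error
    update!(status: "failed", error_message: error.message)
    raise # let the background job's retry mechanism take over
  end

  def retry!
    update!(status: "pending")
    process_async!
  end
end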
Our service runs on Heroku, using Judoscale to auto-scale background workers. This setup allows us to handle peak loads efficiently by spinning up additional workers as needed.
In the example below, we handle Stripe webhooks. Each webhook can contain multiple events, so we start a pipeline where the first step breaks down each event into its own Signal.
class StripeWebhooksController < ApplicationController
  # POST /webhooks/stripe
  # A webhook can contain multiple events.
  # Build a Webhook signal from the raw payload and start a Pipeline.
  def create
    signal = Stripe::Signals::Webhook.new(params)
    Pipeline.start!(signal)
    head :ok
  end
end
class Stripe::Signals::Webhook < ZipIO::Signal
  # Return an array of individual signals.
  # These signals will be processed in parallel.
  def call
    data.events.filter_map do |event|
      case event.type
      when 'payment_intent.succeeded'
        Stripe::Signals::PaymentSucceeded.new(event)
      when 'orders.succeeded'
        # See definition above.
        Stripe::Signals::NewOrder.new(event)
      else
        Stripe::Outcome::IgnoredWebhookEvent.new(event)
      end
    end
  end
end
At a high level, the Pipeline implementation looks like this:
class Pipeline < ApplicationRecord
  # Persist the signal as the first step of the pipeline and process it asynchronously.
  def self.start!(signal)
    PipelineStep.enqueue(signal)
  end
end

class PipelineStep < ApplicationRecord
  # Persist the step and process it asynchronously.
  def self.enqueue(step, pipeline: nil)
    pipeline ||= Pipeline.create!
    step_model = create!(data: step.attributes, step_class: step.class.name, pipeline: pipeline)
    step_model.process_async!
  end

  # Enqueue a job that calls `process!`
  def process_async!
    ProcessPipelineStepJob.perform_async(id)
  end

  # Constantize the persisted step, trigger `call`, and enqueue the resulting steps.
  def process!
    step = step_class.constantize.new(data) # ex: Stripe::Signals::Webhook
    # Trigger the step
    resulting_steps = step.call
    return if resulting_steps.is_a?(Symbol) # :success, :fail, :skip

    Array(resulting_steps).each do |next_step|
      PipelineStep.enqueue(next_step, pipeline: pipeline)
    end
  end
end
Since every single step is persisted in the database, writing an admin interface that gives complete visibility into each pipeline and its steps is straightforward.
Thanks to this admin interface, most debugging sessions are painless. There’s no need to dig into the logs or open a Rails console; everything is nicely laid out in a web UI.
Every step run is sent to Datadog to help us monitor the system’s health and alert us when failure rates are high.
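For example, instrumenting each run with dogstatsd-ruby might look like this; the metric and tag names are illustrative, not our actual ones:

require "datadog/statsd"

STATSD = Datadog::Statsd.new

# Hypothetical sketch: count successes and failures per step class and
# record how long each run takes.
def instrument_step(step_model)
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  STATSD.increment("zipio.step.processed", tags: ["step:#{step_model.step_class}", "status:success"])
rescue StandardError
  STATSD.increment("zipio.step.processed", tags: ["step:#{step_model.step_class}", "status:failure"])
  raise
ensure
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
  STATSD.histogram("zipio.step.duration", elapsed, tags: ["step:#{step_model.step_class}"])
end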
I’m very happy with how ZipIO turned out. We succeeded in building a framework that allows us to define the core logic by writing simple and elegant code. After three years of operation, the codebase defines 125 steps and 50 transformers. We have processed 1 billion steps, and the service continues to be a pleasure to work with and expand.
Questions? Comments? 👉 pcreux@ruby.social.