07 Oct 2024
At Zipline, we offer a one-stop-shop application for retail employees that handles everything from clocking in to task management and team communication. To achieve this, we integrate with numerous third-party applications, each with unique requirements. We decided to develop a satellite service called ZipIO for data synchronization. In this post, I’ll walk through our journey of building this robust and scalable system.
Language and Framework: We chose Ruby on Rails for its simplicity and productivity. However, we structured our core logic as POROs (Plain Old Ruby Objects) to decouple it from the engine that runs it.
Modular Architecture: We broke down the workflow into discrete steps: Signals, Commands, Outcomes, and Tasks. This modular approach allows for easy reuse of logic across different third-party integrations.
Asynchronous Processing: While we support synchronous operations for specific use cases, 99% of our workflows run asynchronously for optimal performance and scalability.
Robust Error Handling: Each step in the workflow is designed to handle errors gracefully, ensuring smooth operation even when issues arise.
Full Visibility: We prioritized comprehensive logging and monitoring to make troubleshooting and maintenance as painless as possible.
Our workflow consists of four main components:
Each step in the workflow is designed as a standalone Ruby object that takes a data input, performs an action, and returns the next step(s) to be executed.
The data inputs are defined as Dry::Struct objects. Each step declares the type(s) of step it returns, adding an extra layer of type safety and allowing us to generate documentation automatically.
Here is a dummy example:
# This signal is created when we receive a new order via a webhook.
class Stripe::Signals::NewOrder < ZipIO::Step
  data_struct [Stripe::Structs::Order]
  emits MyStore::Commands::CreateOrder

  # Convert the Stripe Order to a Generic Struct and return a Command to create the order.
  def call
    order = Transformer.call(data, to: Generic::Structs::Order)
    MyStore::Commands::CreateOrder.new(order)
  end
end
class MyStore::Commands::CreateOrder < ZipIO::Step
  # Create an order via an API and return an Outcome according to the response.
  def call
    response = api_client.orders.post(data)
    if response.success?
      MyStore::Outcome::OrderCreated.new(response.parsed_body)
    else
      MyStore::Outcome::OrderCreateFailure.new(response.parsed_body)
    end
  end

  private

  def api_client
    # ...
  end
end
# Two outcomes. They are the final steps of the workflow, so they don't do anything.
MyStore::Outcome::OrderCreated = Class.new(ZipIO::Outcomes::Success)
MyStore::Outcome::OrderCreateFailure = Class.new(ZipIO::Outcomes::Failure)
# Order received from Stripe.
class Stripe::Structs::Order < Dry::Struct
  attribute :amount, Types::Decimal
  attribute :order_id, Types::String
  attribute :user_id, Types::String
  attribute :email, Types::String
  attribute :payment_type, Types::String
end
# Generic order used throughout the system.
class Generic::Structs::Order < Dry::Struct
  attribute :amount, Types::Decimal
  attribute :order_id, Types::String
  attribute :integration_key, Types::String
  attribute :email, Types::String
  attribute :payment_source, Types::String.enum('credit_card', 'debit_card')
end
The core logic transforms data from one schema (e.g. a Stripe Order) to another (e.g. a Generic Order). We created a simple library inspired by csv-importer to define data transformations using a DSL.
class StripeOrderToGenericOrder < ZipIO::Transformer
  # map `amount` to `amount`
  attribute :amount

  # map `stripe_order_id` to `order_id`
  attribute :order_id, source: :stripe_order_id

  # build a custom integration key
  attribute :integration_key, source: ->(data) {
    [data.fetch(:user_id), data.fetch(:order_id)].join("-")
  }

  # downcase email
  attribute :email, transform: -> { _1&.downcase }

  # map payment types via a lookup table
  attribute :payment_source, source: :payment_type, transform: Mapper.new(
    "cc" => "credit_card",
    "dc" => "debit_card",
    # ...
  )
end
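Such a DSL is small enough to sketch in full. The following is a hypothetical approximation, not the actual ZipIO::Transformer implementation; MiniTransformer, Rule, and the sample input hash are all made up for illustration:

```ruby
# Hypothetical sketch of a transformer DSL (not the real ZipIO code).
# Each `attribute` call records how a target value is sourced and transformed.
class MiniTransformer
  Rule = Struct.new(:target, :source, :transform)

  def self.rules
    @rules ||= []
  end

  # Declare a target attribute, its source (key or callable), and an optional transform.
  def self.attribute(target, source: target, transform: nil)
    rules << Rule.new(target, source, transform)
  end

  # Apply every declared rule to a hash of input data.
  def self.call(data)
    rules.each_with_object({}) do |rule, out|
      value = rule.source.respond_to?(:call) ? rule.source.call(data) : data[rule.source]
      value = rule.transform.call(value) if rule.transform
      out[rule.target] = value
    end
  end
end

# A lookup-table transform, similar to the Mapper used above.
class Mapper
  def initialize(mapping)
    @mapping = mapping
  end

  def call(value)
    @mapping.fetch(value)
  end
end

class StripeOrderToGenericOrder < MiniTransformer
  attribute :amount
  attribute :order_id, source: :stripe_order_id
  attribute :integration_key, source: ->(data) {
    [data.fetch(:user_id), data.fetch(:stripe_order_id)].join("-")
  }
  attribute :email, transform: -> { _1&.downcase }
  attribute :payment_source, source: :payment_type,
    transform: Mapper.new("cc" => "credit_card", "dc" => "debit_card")
end
```

Because rules live in a class-level array, each subclass accumulates its own declarations and `call` simply folds them over the input hash.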
Using a proper naming convention, we can look up the transformers on the fly:
Transformer.call(stripe_order, to: Generic::Structs::Order)
# => <# Generic::Structs::Order ...>
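The lookup itself can be done by deriving the transformer's class name from the two struct class names. This is a hypothetical sketch of such a convention; the helper names are made up:

```ruby
# Hypothetical sketch of convention-based transformer lookup:
# Stripe::Structs::Order + Generic::Structs::Order => StripeOrderToGenericOrder
module Transformer
  # Resolve the transformer class for the given data and apply it.
  def self.call(data, to:)
    transformer_for(data.class, to).call(data)
  end

  def self.transformer_for(from, to)
    Object.const_get("#{short_name(from)}To#{short_name(to)}")
  end

  # Stripe::Structs::Order => "StripeOrder"
  def self.short_name(klass)
    parts = klass.name.split("::")
    parts.first + parts.last
  end
end
```

With this convention in place, adding a new transformer is just a matter of naming the class correctly; no registry needs to be maintained.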
Since Steps, Structs, and Transformers are defined using DSLs, we can introspect them to generate documentation at runtime so that non-technical folks (Product Owners, Account Managers, …) understand how the data flows and how it’s transformed.
We traverse the graph of emitted steps (declared via emits) to render a flow chart:
flowchart LR
A[Stripe::Signals::Webhook] --> B[Stripe::Signals::PaymentSucceeded]
A[Stripe::Signals::Webhook] --> C[Stripe::Signals::NewOrder]
A[Stripe::Signals::Webhook] --> D[Stripe::Outcome::IgnoredWebhookEvent]
C --> E[MyStore::Commands::CreateOrder]
E --> F[MyStore::Outcome::OrderCreated]
E --> G[MyStore::Outcome::OrderCreateFailure]
L[Shopify::Signals::Webhook] --> M
M[Shopify::Signals::NewOrder] --> E
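The traversal behind this chart can be sketched in a few lines. This is a hypothetical version; the Step base class and its emitted_steps accessor are assumptions, not the real ZipIO internals:

```ruby
# Hypothetical sketch: walk the `emits` declarations recursively and
# print one mermaid edge per step transition, deduplicating edges.
class Step
  def self.emits(*steps)
    @emitted = steps
  end

  def self.emitted_steps
    @emitted || []
  end
end

def mermaid_edges(step, seen = [])
  step.emitted_steps.flat_map do |child|
    edge = "#{step.name} --> #{child.name}"
    next [] if seen.include?(edge) # avoid infinite loops on cycles
    seen << edge
    [edge] + mermaid_edges(child, seen)
  end
end

def flowchart(root)
  (["flowchart LR"] + mermaid_edges(root)).join("\n")
end
```

Feeding the root signal into `flowchart` yields mermaid source ready to paste into any renderer.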
We look up all Structs and Transformers to display them in a human-friendly way.
Here is, for example, the documentation for StripeOrderToGenericOrder:
Source | Target | Transform
---|---|---
amount | amount | —
stripe_order_id | order_id | —
Proc: [data.fetch(:user_id), data.fetch(:order_id)].join("-") | integration_key | —
email | email | Proc: -> { _1&.downcase }
payment_type | payment_source | Mapper: "cc" => "credit_card", "dc" => "debit_card", …
So far, we’ve defined the steps, structs, and transformations. Let’s see how we make data flow through them.
Processing a Workflow synchronously is straightforward.
# Run a workflow synchronously and return the tree of executed steps.
# Ex: [ Stripe::Signals::NewOrder, [ MyStore::Commands::CreateOrder, [ MyStore::Outcome::OrderCreated ] ] ]
def run(step)
  return step if step.is_a?(Symbol) # e.g. :success, :skip, ...
  [step] + Array(step.call).map { |next_step| run(next_step) }
end
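To see the runner in action, here is a self-contained sketch with made-up stand-in steps (FakeSignal, FakeCommand, and FakeOutcome are not real ZipIO classes); the runner returns the tree of executed steps:

```ruby
# Synchronous runner: include the step itself, then recurse into its children.
def run(step)
  return step if step.is_a?(Symbol) # e.g. :success, :skip, ...
  [step] + Array(step.call).map { |next_step| run(next_step) }
end

# Fake steps standing in for a Signal, a Command, and an Outcome.
class FakeOutcome
  def call
    :success
  end
end

class FakeCommand
  def call
    FakeOutcome.new
  end
end

class FakeSignal
  def call
    FakeCommand.new
  end
end
```

An integration test can then run the whole workflow in-process and assert on the shape of the resulting tree, without touching the database or a job queue.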
We use this approach in integration tests where we want to exercise an entire workflow. In production, we persist Pipelines and Steps to the database and process them asynchronously.
We use a minimal set of Rails models:
Pipeline: A specific instance of a pipeline.
PipelineStep: An individual step within a pipeline.
When a signal is received, we create a Pipeline, persist the signal as its first PipelineStep, and enqueue a background job to process it; every step produced along the way is persisted and enqueued the same way.
This approach allows for excellent scalability, fault tolerance, and visibility.
By persisting each step in the database, we can retry failed steps without restarting the entire workflow. This is particularly useful for long sequences of API calls where only one step might fail.
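The retry mechanics can be illustrated with a plain-Ruby sketch. StepRecord is a made-up stand-in for the persisted PipelineStep (the real model is an ActiveRecord class):

```ruby
# Hypothetical sketch of step status transitions that enable per-step retries.
class StepRecord
  attr_reader :status, :attempts

  def initialize
    @status = "pending"
    @attempts = 0
  end

  # Run the step's work (here passed as a block); record success or failure.
  def process!
    @attempts += 1
    yield
    @status = "succeeded"
  rescue StandardError
    @status = "failed"
  end

  # Retrying re-runs only this step, not the whole workflow.
  def retry!(&block)
    raise "only failed steps can be retried" unless @status == "failed"
    @status = "pending"
    process!(&block)
  end
end
```

Because each step carries its own status, a failure in step 7 of a 10-step sequence of API calls can be retried in isolation once the upstream issue is resolved.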
Our service runs on Heroku, using Judoscale to auto-scale background workers. This setup allows us to handle peak loads efficiently by spinning up additional workers as needed.
In the example below, we handle Stripe webhooks. Each webhook can contain multiple events, so we start a pipeline where the first step breaks down each event into its own Signal.
class StripeWebhooksController < ApplicationController
# POST /webhooks/stripe
# The webhooks can contain multiple events.
# Build a Webhook signal from the raw payload and start a Pipeline.
  def create
    signal = Stripe::Signals::Webhook.new(params)
    Pipeline.start!(signal)
    head :ok
  end
end
class Stripe::Signals::Webhook < ZipIO::Signal
  # Return an array of individual signals.
  # These signals will be processed in parallel.
  def call
    data.events.filter_map do |event|
      case event.type
      when 'payment_intent.succeeded'
        Stripe::Signals::PaymentSucceeded.new(event)
      when 'orders.succeeded'
        Stripe::Signals::NewOrder.new(event)
      else
        Stripe::Outcome::IgnoredWebhookEvent.new(event)
      end
    end
  end
end
At a high level, the Pipeline implementation looks like this:
class Pipeline < ApplicationRecord
  # Persist the signal as the first step of the pipeline and process it asynchronously.
  def self.start!(signal)
    PipelineStep.enqueue(signal)
  end
end

class PipelineStep < ApplicationRecord
  # Persist the step and process it asynchronously.
  def self.enqueue(step, pipeline: nil)
    pipeline ||= Pipeline.create!
    step_model = create!(data: step.attributes, step_class: step.class.name, pipeline: pipeline)
    step_model.process_async!
  end

  # Enqueue a job that calls `process!`
  def process_async!
    ProcessPipelineStepJob.perform_async(id)
  end

  # Constantize the persisted step, trigger `call`, and enqueue the resulting steps.
  def process!
    step = step_class.constantize.new(data) # ex: Stripe::Signals::Webhook
    # Trigger the step
    resulting_steps = step.call
    return if resulting_steps.is_a?(Symbol) # :success, :fail, :skip
    Array(resulting_steps).each do |next_step|
      PipelineStep.enqueue(next_step, pipeline: pipeline)
    end
  end
end
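The persist-and-fan-out loop can be simulated entirely in memory. In this hypothetical sketch, an array stands in for the background job queue, another for the database table, and the step classes are made up:

```ruby
# Hypothetical in-memory simulation of the pipeline's fan-out loop.
QUEUE = [] # stands in for the background job system
STEPS = [] # stands in for the pipeline_steps table

QueuedStep = Struct.new(:step, :pipeline_id)

# "Persist" the step and enqueue it, mirroring PipelineStep.enqueue.
def enqueue(step, pipeline_id)
  record = QueuedStep.new(step, pipeline_id)
  STEPS << record
  QUEUE << record
end

# Process queued steps one by one, fanning out any resulting steps.
def drain
  until QUEUE.empty?
    record = QUEUE.shift
    result = record.step.call
    next if result.is_a?(Symbol) # :success, :fail, :skip
    Array(result).each { |next_step| enqueue(next_step, record.pipeline_id) }
  end
end

# A signal that fans out into two terminal outcomes.
class FanOutSignal
  def call
    [TerminalOutcome.new, TerminalOutcome.new]
  end
end

class TerminalOutcome
  def call
    :success
  end
end
```

Starting a pipeline with one signal leaves three records behind: the signal plus its two outcomes, all tagged with the same pipeline id, which is exactly the trail the admin interface displays.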
Since every single step is persisted in the database, writing an admin interface that gives complete visibility into each pipeline and its steps is straightforward.
Thanks to this admin interface, most debugging sessions are painless. There’s no need to dig into the logs or open a Rails console; everything is nicely laid out in a web UI.
Every step run is sent to Datadog to help us monitor the system’s health and alert us when failure rates are high.
Building ZipIO has been a journey of balancing simplicity with robustness. We’ve created a solution that has stood the test of time by designing a modular system that decouples the core logic from the engine that runs it. After three years of operation, we have processed 1 billion steps, and our service continues to be a pleasure to work with and expand. The codebase currently defines 125 steps and 50 transformers.
The key takeaways from our experience are: decouple the core logic from the engine that runs it, break workflows into small reusable steps, process asynchronously and persist every step, and invest in visibility from day one.
By following these principles, we’ve created a system that meets our current needs and is well-positioned to adapt to future challenges.
Questions? Comments? 👉 pcreux@ruby.social.