Philippe Creux

Building a Robust Data Synchronization Framework with Rails

07 Oct 2024

At Zipline, we offer a one-stop shop application for retail employees that handles everything from clocking in to task management and team communication. To achieve this, we need to integrate with numerous third-party applications, each with unique requirements. We decided to develop a satellite service called ZipIO for data synchronization. In this post, I’ll review our journey of building this robust and scalable system.

Key Design Principles

  1. Language and Framework: We chose Ruby on Rails for its simplicity and productivity. However, we structured our core logic as POROs (Plain Old Ruby Objects) to decouple it from the engine that runs it.

  2. Modular Architecture: We broke down the workflow into discrete steps: Signals, Commands, Outcomes, and Tasks. This modular approach allows for easy reuse of logic across different third-party integrations.

  3. Asynchronous Processing: While we support synchronous operations for specific use cases, 99% of our workflows run asynchronously for optimal performance and scalability.

  4. Robust Error Handling: Each step in the workflow is designed to handle errors gracefully, ensuring smooth operation even when issues arise.

  5. Full Visibility: We prioritized comprehensive logging and monitoring to make troubleshooting and maintenance as painless as possible.

Defining workflows

Steps

Our workflow consists of four main kinds of steps:

  1. Signals: These are triggers, often from API calls or webhooks, that initiate a workflow.
  2. Commands: Actions performed in response to signals, such as synchronizing data with another app.
  3. Outcomes: The results of commands, which can be success, failure, or skipped.
  4. Tasks: Scheduled operations that poll third-party services for new data to sync. In turn, they create Signals.

Each step in the workflow is designed as a standalone Ruby object that takes a data input, performs an action, and returns the next step(s) to be executed. Since each step defines the downstream step(s), it can perform complex routing depending on input data or user configuration. For example, a Signal could return several Commands to sync an event to multiple third-party services.

flowchart LR

A[Signal: New Order]
B[Command: Sync to service A]
C[Command: Sync to service B]
BB[Outcome: Success]
CB[Outcome: Success]

A --> B
A --> C
B --> BB
C --> CB
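
To make this contract concrete, here is a minimal sketch of what a ZipIO::Step base class could look like. This is an illustration under assumptions, not our actual implementation; it only covers the data_struct and emits declarations used in the examples below.

module ZipIO
  # Minimal sketch of a step base class (assumed for illustration):
  # it holds the input data and declares what it accepts and may return.
  class Step
    class << self
      attr_reader :data_structs, :emitted_steps

      # Declare the Dry::Struct class(es) this step accepts as input.
      def data_struct(structs)
        @data_structs = Array(structs)
      end

      # Declare the step class(es) this step may return.
      # Used for type checks and documentation generation.
      def emits(*step_classes)
        @emitted_steps = step_classes
      end
    end

    attr_reader :data

    def initialize(data)
      @data = data
    end

    # Subclasses implement `call` and return the next step(s),
    # or a Symbol (:success, :skip, ...) when the workflow ends here.
    def call
      raise NotImplementedError
    end
  end
end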

The data inputs are defined as Dry::Struct objects. Each step declares the type(s) of step it returns, adding an extra layer of type safety and allowing us to generate documentation automatically.

Here is an example:

# This signal is created when we receive a new order via a webhook.
class Stripe::Signals::NewOrder < ZipIO::Step
  data_struct [Stripe::Structs::Order]

  emits MyStore::Commands::CreateOrder

  # Convert Stripe Order to a Generic Struct and return a Command to create the order
  def call
    order = Transformer.call(data, to: Generic::Structs::Order)

    MyStore::Commands::CreateOrder.new(order)
  end
end

class MyStore::Commands::CreateOrder < ZipIO::Step
  # Create an order via an API and return an Outcome according to the response.
  def call
    response = api_client.orders.post(data)

    if response.success?
      MyStore::Outcomes::OrderCreated.new(response.parsed_body)
    else
      MyStore::Outcomes::OrderCreateFailure.new(response.parsed_body)
    end
  end

  private

  def api_client
    # ...
  end
end

# Two outcomes. They are the final steps of the workflow so they don't do anything.
MyStore::Outcomes::OrderCreated = Class.new(ZipIO::Outcomes::Success)
MyStore::Outcomes::OrderCreateFailure = Class.new(ZipIO::Outcomes::Failure)

# Order received from Stripe.
class Stripe::Structs::Order < Dry::Struct
  attribute :amount, Types::Decimal
  attribute :order_id, Types::String
  attribute :user_id, Types::String
  attribute :email, Types::String
  attribute :payment_type, Types::String
end

# Generic order used throughout the system.
class Generic::Structs::Order < Dry::Struct
  attribute :amount, Types::Decimal
  attribute :order_id, Types::String
  attribute :integration_key, Types::String
  attribute :email, Types::String
  attribute :payment_source, Types::String.enum('credit_card', 'debit_card')
end

Transforming data

The core logic transforms data from one schema (e.g. a Stripe Order) to another (e.g. a Generic Order). We created a simple library inspired by csv-importer to define data transformations using a DSL.

class StripeOrderToGenericOrder < ZipIO::Transformer
  # map `amount` to `amount`
  attribute :amount

  # map `stripe_order_id` to `order_id`
  attribute :order_id, source: :stripe_order_id

  # build a custom integration key
  attribute :integration_key, source: ->(data) {
    [data.fetch(:user_id), data.fetch(:order_id)].join("-")
  }

  # downcase email
  attribute :email, transform: -> { _1&.downcase }

  # Define mapping
  attribute :payment_source, source: :payment_type, transform: Mapper.new(
    "cc" => "credit_card",
    "dc" => "debit_card",
    # ...
  )
end

Using a proper naming convention, we can look up the transformers on the fly:

Transformer.call(stripe_order, to: Generic::Structs::Order)
# => #<Generic::Structs::Order ...>
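
The lookup itself can be derived from class names. Here is a rough sketch of how such a convention-based resolution might work; the transform instance method and the name-mangling rules are assumptions for illustration:

class ZipIO::Transformer
  # Resolve Stripe::Structs::Order + Generic::Structs::Order
  # to the StripeOrderToGenericOrder transformer (assumed convention).
  def self.call(data, to:)
    source = data.class.name.gsub("::Structs", "").delete(":") # "StripeOrder"
    target = to.name.gsub("::Structs", "").delete(":")         # "GenericOrder"

    transformer = Object.const_get("#{source}To#{target}")
    to.new(transformer.new.transform(data))
  end
end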

Live documentation

Since Steps, Structs, and Transformers are defined using DSLs, we can introspect them to generate documentation. This documentation is available in our Admin UI so that non-technical folks (Product Owners, Account Managers, …) understand how the data flows and how it’s transformed.

We traverse the graph of emitted steps to render a flow chart:

flowchart LR

A[Stripe::Signals::Webhook] --> B[Stripe::Signals::PaymentSucceeded]

A[Stripe::Signals::Webhook] --> C[Stripe::Signals::NewOrder]

A[Stripe::Signals::Webhook] --> D[Stripe::Outcomes::IgnoredWebhookEvent]

C --> E[MyStore::Commands::CreateOrder]

E --> F[MyStore::Outcomes::OrderCreated]

E --> G[MyStore::Outcomes::OrderCreateFailure]

L[Shopify::Signals::Webhook] --> M

M[Shopify::Signals::NewOrder] --> E
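
Here is a sketch of how that traversal could work, assuming each step class exposes its emits declarations through an emitted_steps reader (a hypothetical name):

require "set"

# Walk the emits graph from a root step and build a Mermaid flowchart.
def render_flowchart(root_step)
  lines = ["flowchart LR"]
  visited = Set.new
  node = ->(step) { "#{step.name.delete(':')}[#{step.name}]" }

  walk = lambda do |step|
    return unless visited.add?(step) # skip steps already rendered

    Array(step.emitted_steps).each do |child|
      lines << "#{node.call(step)} --> #{node.call(child)}"
      walk.call(child)
    end
  end

  walk.call(root_step)
  lines.join("\n")
end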

We look up all Structs and Transformers to display them in a human-friendly way. Here is, for example, the documentation for StripeOrderToGenericOrder:

Source            Target             Transform
amount          ➡️ amount
stripe_order_id ➡️ order_id
Proc            ➡️ integration_key   [data.fetch(:user_id), data.fetch(:order_id)].join("-")
email           ➡️ email             -> { _1&.downcase }
payment_type    ➡️ payment_source    "cc" ➡️ "credit_card", "dc" ➡️ "debit_card"

So far, we’ve defined the steps, structs, and transformations. Let’s see how we make data flow through them.

Processing a Workflow

Processing a Workflow synchronously is straightforward.

# Run a workflow synchronously and return the tree of executed steps.
# Ex: [ Stripe::Signals::NewOrder, [ MyStore::Commands::CreateOrder, [ MyStore::Outcomes::OrderCreated ] ] ]
def run(step)
  return step if step.is_a?(Symbol) # e.g. :success, :skip, ...

  next_steps = Array(step.call)

  [step, *next_steps.map { |next_step| run(next_step) }]
end

We use this approach in integration tests where we want to exercise an entire workflow. In production, we persist Pipelines and Steps to the database and process them asynchronously.
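
For instance, an integration test could look like this; the factory helper is hypothetical:

RSpec.describe "Stripe order workflow" do
  it "creates the order in MyStore" do
    signal = Stripe::Signals::NewOrder.new(build_stripe_order) # hypothetical factory

    result = run(signal)

    expect(result.flatten).to include(an_instance_of(MyStore::Outcomes::OrderCreated))
  end
end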

Database Structure

We use a minimal set of Rails models:

  • Pipeline: A single run of a workflow
  • PipelineStep: An individual step within a pipeline
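
A plausible schema for these two models could look like this; the column names are assumptions:

create_table :pipelines do |t|
  t.string :status # e.g. "pending", "completed", "failed"
  t.timestamps
end

create_table :pipeline_steps do |t|
  t.references :pipeline, null: false
  t.string :step_class, null: false # e.g. "Stripe::Signals::NewOrder"
  t.jsonb :data, null: false, default: {} # the step's Dry::Struct attributes
  t.string :status
  t.timestamps
end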

Asynchronous Processing

When a signal is received:

  1. A new pipeline is created and persisted in the database.
  2. A job is enqueued to process the first step.
  3. Each step, when processed, can trigger subsequent steps, which are then enqueued as separate jobs.

This approach allows for excellent scalability, fault tolerance, and visibility.

Error Handling and Retry Mechanism

By persisting each step in the database, we can retry failed steps without restarting the entire workflow. This is particularly useful for long sequences of API calls where only one step might fail.
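
Assuming a status column like the one sketched above, retrying could be as simple as re-enqueueing the failed records:

# Hypothetical admin action: re-run only the failed steps of a pipeline,
# leaving the rest of the workflow untouched.
pipeline = Pipeline.find(params[:id])
pipeline.pipeline_steps.where(status: "failed").find_each(&:process_async!)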

Scalability

Our service runs on Heroku, using Judoscale to auto-scale background workers. This setup allows us to handle peak loads efficiently by spinning up additional workers as needed.

Example

In the example below, we handle Stripe webhooks. Each webhook can contain multiple events, so we start a pipeline where the first step breaks down each event into its own Signal.

class StripeWebhooksController < ApplicationController
  # POST /webhooks/stripe
  # The webhooks can contain multiple events.
  # Build a Webhook signal from the raw payload and start a Pipeline.
  def create
    signal = Stripe::Signals::Webhook.new(params)
    Pipeline.start!(signal)

    head :ok
  end
end

class Stripe::Signals::Webhook < ZipIO::Signal
  # Return an array of individual signals.
  # These signals will be processed in parallel.
  def call
    data.events.filter_map do |event|
      case event.type
      when 'payment_intent.succeeded'
        Stripe::Signals::PaymentSucceeded.new(event)
      when 'orders.succeeded'
        # See definition above.
        Stripe::Signals::NewOrder.new(event)
      else
        Stripe::Outcomes::IgnoredWebhookEvent.new(event)
      end
    end
  end
end

At a high level, the Pipeline implementation looks like this:

class Pipeline < ApplicationRecord
  # Persist the signal as the first step of the pipeline and process it asynchronously.
  def self.start!(signal)
    PipelineStep.enqueue(signal)
  end
end

class PipelineStep < ApplicationRecord
  # Persist the step and process it asynchronously.
  def self.enqueue(step, pipeline: nil)
    pipeline ||= Pipeline.create!

    step_model = create!(data: step.attributes, step_class: step.class.name, pipeline: pipeline)

    step_model.process_async!
  end

  # Enqueue a job that calls `process!`
  def process_async!
    ProcessPipelineStepJob.perform_async(id)
  end

  # Constantize the persisted step, trigger `call`, and enqueue the resulting steps.
  def process!
    step = step_class.constantize.new(data) # ex: Stripe::Signals::Webhook

    # Trigger the step
    resulting_steps = step.call

    return if resulting_steps.is_a?(Symbol) # :success, :fail, :skip

    Array(resulting_steps).each do |next_step|
      PipelineStep.enqueue(next_step, pipeline: pipeline)
    end
  end
end
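
The job itself is a thin wrapper. Assuming Sidekiq, it could look like this:

class ProcessPipelineStepJob
  include Sidekiq::Job

  # Load the persisted step and process it. Sidekiq's built-in retries
  # give us another layer of resilience if the step raises.
  def perform(pipeline_step_id)
    PipelineStep.find(pipeline_step_id).process!
  end
end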

Instrumentation, visibility, and monitoring

Since every single step is persisted in the database, writing an admin interface that gives complete visibility into each pipeline and its steps is straightforward.

Thanks to this admin interface, most debugging sessions are painless. There’s no need to dig into the logs or open a Rails console; everything is nicely laid out in a web UI.

Every step run is sent to Datadog to help us monitor the system’s health and alert us when failure rates are high.
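
As an illustration, per-step metrics could be reported with the dogstatsd-ruby client; the metric names and tags here are assumptions:

require "datadog/statsd"

STATSD = Datadog::Statsd.new

# Time each step run and count successes and failures, tagged by step class.
def instrumented_process!(pipeline_step)
  tags = ["step:#{pipeline_step.step_class}"]

  STATSD.time("zipio.step.duration", tags: tags) do
    pipeline_step.process!
  end
  STATSD.increment("zipio.step.success", tags: tags)
rescue StandardError
  STATSD.increment("zipio.step.failure", tags: tags)
  raise
end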

Conclusion

I’m very happy with how ZipIO turned out. We succeeded in building a framework that lets us define core logic with simple, elegant code. After three years of operation, the codebase defines 125 steps and 50 transformers. We have processed 1 billion steps, and the service continues to be a pleasure to work with and expand.

Questions? Comments? 👉 pcreux@ruby.social.

