Introduction to Kafka with Ruby: Installing and Integrating Shopify and SendGrid

November 13, 2024

Introduction

Kafka has become a popular tool for managing data streams across distributed systems, making it a natural choice for applications that need to communicate with multiple services reliably and efficiently. It’s particularly valuable for applications that work with real-time data or need to ensure reliable data delivery across various systems.

In this article, we’ll introduce Kafka, walk you through setting up Kafka in Ruby using the rdkafka gem, and show how to build a workflow to connect Shopify and SendGrid. This setup could be useful, for example, in an e-commerce context, where events (like new orders in Shopify) trigger actions (like sending emails in SendGrid).


What is Kafka?

Kafka is a distributed event-streaming platform that serves as a central hub for data streams. It lets you publish (produce) and subscribe to (consume) streams of records, enabling a decoupled architecture. It’s a robust solution for applications that need to handle large volumes of data in real time or require fault tolerance for message delivery.

Key Concepts

  • Producers publish messages (or events) to Kafka topics.
  • Consumers subscribe to topics to receive messages in real-time.
  • Brokers are the servers that form the Kafka cluster.
  • Topics organize messages within Kafka, often used to separate types of data or services.

Why rdkafka?

The rdkafka gem, a Ruby wrapper around the librdkafka C library, is one of the most efficient Kafka clients available for Ruby. It provides a lightweight and fast implementation for interacting with Kafka while handling higher throughput than purely Ruby-based clients. This makes it a great choice for production-grade applications.

Installing Kafka with rdkafka in Ruby

Prerequisites

  1. Shopify API: You’ll need a Shopify API key and access token, along with a private app configured for your Shopify store.
  2. Kafka: Set up Kafka as described before.
  3. Ruby Gems: Ensure that you have rdkafka, sendgrid-ruby, and shopify_api installed

Install the necessary gems if you haven’t already:

gem install rdkafka sendgrid-ruby shopify_api

🚀 Need Expert Ruby on Rails Developers to Elevate Your Project?

Fill out our form! >>


Step 1: Configure Shopify API

Start by setting up the Shopify API client to fetch orders. Here’s an example of how to do that:

require 'shopify_api'
SHOPIFY_API_KEY = "your_shopify_api_key"
SHOPIFY_PASSWORD = "your_shopify_password"
SHOP_NAME = "your_shop_name"
API_VERSION = "2021-04"

ShopifyAPI::Base.site = "https://#{SHOPIFY_API_KEY}:#{SHOPIFY_PASSWORD}@#{SHOP_NAME}.myshopify.com/admin/api/#{API_VERSION}"

Step 2: Producer Code to Fetch Orders from Shopify and Publish to Kafka

This producer will connect to the Shopify API, retrieve recent orders, and publish each order to a Kafka topic.

require 'rdkafka'
require 'shopify_api'

# Set up Shopify API
SHOPIFY_API_KEY = "your_shopify_api_key"
SHOPIFY_PASSWORD = "your_shopify_password"
SHOP_NAME = "your_shop_name"
API_VERSION = "2021-04"

ShopifyAPI::Base.site = "https://#{SHOPIFY_API_KEY}:#{SHOPIFY_PASSWORD}@#{SHOP_NAME}.myshopify.com/admin/api/#{API_VERSION}"
# Kafka configuration

kafka_config = {
  :"bootstrap.servers" => "localhost:9092"
}

producer = Rdkafka::Config.new(kafka_config).producer
# Fetch recent orders from Shopify
def fetch_orders
  ShopifyAPI::Order.find(:all, params: { status: 'any', limit: 5 })
end

# Publish order event to Kafka
def publish_order_event(producer, order)
  event_payload = {
    order_id: order.id,
    customer_email: order.email,
    total_price: order.total_price,
    line_items: order.line_items.map { |item| { name: item.name, quantity: item.quantity } }
  }.to_json

  producer.produce(
    topic: "shopify_orders",
    payload: event_payload,
    key: order.id.to_s
  ).wait

  puts "Published order event to Kafka: #{event_payload}"
end

# Main execution
orders = fetch_orders
orders.each do |order|
  publish_order_event(producer, order)
end

This code will retrieve the five most recent orders from your Shopify store and publish each order as an event in Kafka, including essential details like the order ID, customer email, total price, and line items.

Step 3: Consumer Code to Listen for Order Events and Send Emails with SendGrid

Next, let’s set up the consumer. The consumer will read each order event from Kafka, parse the message, and use SendGrid to send a confirmation email to the customer.

require 'rdkafka'
require 'sendgrid-ruby'
require 'json'
include SendGrid

# Kafka consumer configuration
kafka_config = {
  :"bootstrap.servers" => "localhost:9092",
  :"group.id" => "sendgrid_email_service"
}

consumer = Rdkafka::Config.new(kafka_config).consumer
consumer.subscribe("shopify_orders")

# SendGrid configuration
SENDGRID_API_KEY = "your_sendgrid_api_key"
sg = SendGrid::API.new(api_key: SENDGRID_API_KEY)
# Define a method to send the email

def send_order_confirmation(email, order_details)
  from = Email.new(email: "no-reply@yourshop.com")
  to = Email.new(email: email)
  subject = "Thank You for Your Order!"
  content = Content.new(
    type: "text/plain",
    value: "Thank you for your purchase! Here are your order details:\n\n#{order_details}"
  )

  mail = Mail.new(from, subject, to, content)
  response = sg.client.mail._('send').post(request_body: mail.to_json)
  puts "Email sent to #{email}, status: #{response.status_code}"
end

# Consume and process messages

consumer.each do |message|
  event_data = JSON.parse(message.payload)
  puts "Received order event: #{event_data}"

  # Format order details for email

  order_details = "Order ID: #{event_data['order_id']}\n" \
                  "Total Price: #{event_data['total_price']}\n" \
                  "Items:\n" \
                  "#{event_data['line_items'].map { |item| "- #{item['name']} (Qty: #{item['quantity']})" }.join("\n")}"

  # Send order confirmation email

  send_order_confirmation(event_data["customer_email"], order_details)
end

This consumer listens to the shopify_orders topic and, upon receiving an order event, sends a confirmation email containing order details to the customer.

Running the Example

  • Start Kafka: Make sure your Kafka server is running and accessible.
  • Run the Consumer: Start the consumer so it’s listening for new messages.
ruby consumer_script.rb
  • Run the Producer: Trigger the producer to publish order events to Kafka.
ruby producer_script.rb

If everything is configured correctly, the consumer will pick up each order event, and you should see emails sent via SendGrid to the customers associated with those orders.


Conclusion

This setup demonstrates how Kafka can be integrated with Shopify and SendGrid using Ruby. By leveraging the Shopify API to fetch real orders and using Kafka to manage and relay messages, we’ve created a robust workflow for handling order events. Kafka’s message queue architecture ensures that no data is lost, even if the consumer is temporarily unavailable.

For production use, consider adding robust error handling, logging, and monitoring to ensure reliability and observability in your messaging pipeline. Happy coding!

Leave a comment