Microservices using FastAPI and Redis Streams

Varanasi Rama Krishna Parjanya
4 min readOct 3, 2023

--

FastAPI is a modern, high-performance Python web framework used for building REST APIs.

FastAPI has many features, including but not limited to the following:

  • Built-in OpenAPI documentation, i.e. Swagger UI for endpoints.
  • One of the fastest Python frameworks
  • Highly suited for microservice applications

Project Dependencies are listed below

fastapi==0.75.0
redis-om==0.0.20
requests

Design Overview

Model Classes

  1. Product class
# Model class for Product
class Product(HashModel):
    """Inventory item stored through redis-om's ``HashModel``."""

    name: str
    price: float
    quantity: int

    class Meta:
        # This has to be an assignment. A bare annotation (`database: redis`)
        # only records a type hint and never sets the attribute, so the model
        # would not be bound to this connection.
        database = redis

2. Order class

# Model class for Order
class Order(HashModel):
    """Customer order stored through redis-om's ``HashModel``."""

    product_id: str  # mapped to Product
    price: float
    fee: float  # 20% of price
    total: float  # total = price + fee
    quantity: int
    status: str  # ("PENDING", "REFUNDED", "COMPLETED", "CANCELLED")

    class Meta:
        # This has to be an assignment. A bare annotation (`database: redis`)
        # only records a type hint and never sets the attribute, so the model
        # would not be bound to this connection.
        database = redis

1. Inventory Microservice

This service serves the below purposes

  • exposes REST endpoints to fetch the product info from the inventory
# Fetches product by id
@app.get("/products/getProductById/{pk}")
def getProductById(pk: str):
    """Return the product stored under primary key ``pk``.

    Args:
        pk: Primary key of the product to load.

    Returns:
        The ``Product`` instance returned by ``Product.get``.

    Raises:
        HTTPException: 404 when the product cannot be loaded. Previously the
            error was only logged and the endpoint answered 200 with a
            ``null`` body, which callers could not tell apart from success.
    """
    # Local import keeps this snippet self-contained; fastapi is already a
    # dependency of this service.
    from fastapi import HTTPException

    try:
        return Product.get(pk)
    except Exception as e:
        logging.error(e)
        logging.error("Unable to get the product for this id")
        raise HTTPException(
            status_code=404,
            detail="Unable to get the product for this id",
        ) from e
  • listens to ORDER_COMPLETED_EVENT and deducts the ordered quantity from the product's inventory.
  • publishes ORDER_CANCELLED_EVENT and ORDER_REFUND_EVENT events based on conditional checks


# package --> inventory\inventory_consumer.py

from main import redis, Product
import logging, time

# Logging config
FORMAT = "%(levelname)s:\t%(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)


key = 'ORDER_COMPLETED_EVENT'
group = 'inventory-group'


try:
    # mkstream=True creates the stream when no order event has been published
    # yet. Without it group creation fails on a missing stream and, because it
    # is attempted only once, every later read keeps failing.
    redis.xgroup_create(key, group, mkstream=True)
except Exception as e:
    logging.error(e)
    logging.error('Group already exists')


def _reserve_stock(order_obj):
    """Deduct the ordered quantity from inventory for one order event.

    Publishes ORDER_CANCELLED_EVENT when the stock is insufficient and
    ORDER_REFUND_EVENT when the order cannot be processed at all.
    """
    try:
        product = Product.get(order_obj['product_id'])
        requested = int(order_obj['quantity'])

        if product.quantity >= requested:
            product.quantity = product.quantity - requested
            # Lazy %-formatting: concatenating str + Product raised TypeError,
            # which skipped save() and refunded every valid order.
            logging.info("Product == %s", product)
            product.save()
        else:
            # There are not enough products in inventory so, cancel the order
            logging.error("Product Quantity not sufficient !! Can't proceed !!")

            # Listener for ORDER_CANCELLED_EVENT in payment_order_cancel_consumer.py
            # Publishing ORDER_CANCELLED_EVENT
            redis.xadd('ORDER_CANCELLED_EVENT', order_obj, '*')

    except Exception:
        # `except Exception` instead of a bare `except:` so Ctrl+C / SystemExit
        # are not turned into refunds.
        logging.exception("Unable to process the order, requesting a refund")

        # Listener for ORDER_REFUND_EVENT in payment_refund_consumer.py
        # Publishing ORDER_REFUND_EVENT
        redis.xadd('ORDER_REFUND_EVENT', order_obj, '*')


while True:
    try:
        results = redis.xreadgroup(group, key, {key: '>'}, None)

        logging.info(results)

        # Each result is (stream_name, [(message_id, fields), ...]); handle
        # every delivered message, not just the first one of the batch.
        for _stream, messages in results:
            for _message_id, order_obj in messages:
                logging.info(order_obj)
                _reserve_stock(order_obj)

    except Exception as e:
        logging.error(e)

    time.sleep(1)

2. Payment Microservice

This microservice serves the following purposes

  • exposes a REST endpoint to place the order (calls the inventory endpoint to fetch the product info with the key)

# create an order

@app.post('/orders/placeOrder')
async def createOrder(request: Request, background_tasks: BackgroundTasks):
    """Create a PENDING order for the product named in the request body.

    The JSON body must contain ``id`` (product key) and ``quantity``. The
    product is looked up through the inventory endpoint; the order stores its
    price, a fee of 20% of that price and a total of 120% of that price.

    Returns:
        The saved ``Order``, or ``None`` when saving the order fails.

    Raises:
        HTTPException: 502 when the inventory service cannot be reached or
            does not answer with JSON; 404 when it returns no product.
    """
    # Local import keeps this snippet self-contained; fastapi is already a
    # dependency of this service.
    from fastapi import HTTPException

    requestBody = await request.json()
    product_key = requestBody["id"]
    product_quantity = requestBody["quantity"]

    logging.info("Product Key is == "+str(product_key))
    url = 'http://127.0.0.1:8000/products/getProductById/'+str(product_key)
    logging.info("Inventory endpoint hit =="+url)

    # Fetch product response from Inventory
    # NOTE(review): requests.get is a blocking call inside an async handler.
    try:
        # The timeout keeps a hung inventory service from stalling the request.
        inventory_response = requests.get(url, timeout=5)
        inventory_product = inventory_response.json()
        logging.info("Response received from inventory =="+str(inventory_product))
    except Exception as e:
        logging.error(e)
        # Previously execution continued and crashed with NameError on the
        # unbound response variable.
        raise HTTPException(
            status_code=502,
            detail="Unable to fetch the product from inventory",
        ) from e

    # Inventory answers with a null body or an error payload when the product
    # is missing; neither carries a price.
    if not isinstance(inventory_product, dict) or 'price' not in inventory_product:
        raise HTTPException(status_code=404, detail="Product not found")

    unit_price = inventory_product['price']
    order = Order(
        product_id=product_key,
        price=unit_price,
        fee=0.2 * unit_price,
        total=1.2 * unit_price,
        quantity=product_quantity,
        status='PENDING',
    )

    try:
        # Save order
        order.save()

        # Schedule order_completed as a FastAPI background task
        background_tasks.add_task(order_completed, order)
    except Exception as e:
        logging.error(e)
        logging.error("Could not save the order")
        return None

    # Logged outside the try and through str(): a non-string id used to raise
    # here and be misreported as a failed save after the order was stored.
    logging.info("Order saved successfully for the product == "+str(product_key))
    return order
  • listens to the ORDER_REFUND_EVENT and updates order status to “REFUNDED”
from main import redis, Order
import logging, time

# Logging config
FORMAT = "%(levelname)s:\t%(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)


key = 'ORDER_REFUND_EVENT'
group = 'payment-refund-group'


try:
    # mkstream=True creates the stream when no refund event has been published
    # yet. Without it group creation fails on a missing stream and, because it
    # is attempted only once, every later read keeps failing.
    redis.xgroup_create(key, group, mkstream=True)
except Exception as e:
    logging.error(e)
    logging.error('Group already exists')

while True:
    try:
        results = redis.xreadgroup(group, key, {key: '>'}, None)

        # Logged rather than printed, matching the rest of this module.
        logging.info(results)

        if results:
            logging.info("Payment Refund Results == "+str(results))

        # Each result is (stream_name, [(message_id, fields), ...]); handle
        # every delivered message, not just the first one of the batch.
        for _stream, messages in results:
            for _message_id, obj in messages:
                try:
                    order = Order.get(obj['pk'])
                    order.status = 'REFUNDED'
                    order.save()
                except Exception as e:
                    # One bad message must not drop the rest of the batch.
                    logging.error(e)

    except Exception as e:
        logging.error(e)

    time.sleep(1)

  • listens to the ORDER_CANCELLED_EVENT and deletes the order
from main import redis, Order
import logging, time, requests

# Logging config
FORMAT = "%(levelname)s:\t%(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)


key = 'ORDER_CANCELLED_EVENT'
group = 'payment-order-cancelled-group'


try:
    logging.info("Trying to create a payment-order-cancel-group")
    # mkstream=True creates the stream when no cancel event has been published
    # yet. Without it group creation fails on a missing stream and, because it
    # is attempted only once, every later read keeps failing.
    redis.xgroup_create(key, group, mkstream=True)
except Exception as e:
    logging.error(e)
    logging.error('Group already exists')

while True:
    try:
        results = redis.xreadgroup(group, key, {key: '>'}, None)
        logging.info("Results are as below")
        logging.info(results)
        logging.info(len(results))

        # Each result is (stream_name, [(message_id, fields), ...]); handle
        # every delivered message, not just the first one of the batch.
        for result in results:
            logging.info(result)
            for _message_id, order_obj in result[1]:
                logging.info(order_obj)
                try:
                    order_pk = order_obj['pk']
                    logging.info(order_pk)

                    delete_order_url = 'http://127.0.0.1:8001/orders/deleteOrderByOrderPk/'+str(order_pk)

                    # The timeout keeps a hung payment service from blocking
                    # this consumer loop indefinitely.
                    delete_order_response = requests.get(delete_order_url, timeout=5)
                    logging.info("Response received from delete order URL =="+str(delete_order_response.json()))
                except Exception as e:
                    # One bad message must not drop the rest of the batch.
                    logging.error(e)

    except Exception as e:
        logging.error(e)

    time.sleep(1)

Outputs

  1. Created a Product in Inventory

2. Placing an order for 100 products in Payment

Initial status stays in “PENDING”

Order status changes to “COMPLETED”

3. ORDER_COMPLETED_EVENT is published and the order quantity is subtracted from the inventory quantity.

Product quantity is reduced by the order quantity.

4. If the order placing fails, ORDER_REFUND_EVENT is published and the order is marked REFUNDED.

References

--

--

No responses yet