Microservices using Fastapi and Redis Streams
4 min readOct 3, 2023
Fastapi is a modern high performance python web framework used for building REST APIs.
Fastapi has multiple features including but not limited as below -
- Built in OpenApi documentation i.e. Swagger UI for endpoints.
- Fastest python frameworks
- Highly suited for microservice applications
Project Dependencies are listed below
fastapi==0.75.0
redis-om==0.0.20
requests
Design Overview
Model Classes
- Product class
# Model class for Product
class Product(HashModel):
name: str
price: float
quantity: int
class Meta:
database: redis
2. Order class
# Model class for Order
class Order(HashModel):
product_id: str # mapped to Product
price: float
fee: float # 20% of price
total: float # total = price + fee
quantity: int
status: str # ("PENDING", "REFUNDED", "COMPLETED", "CANCELLED")
class Meta:
database: redis
1. Inventory Microservice
This service serves the below purposes
- exposes REST endpoints to fetch the product info from the inventory
# Fetches product by id
@app.get("/products/getProductById/{pk}")
def getProductById(pk: str):
try:
getProd = Product.get(pk)
return getProd
except Exception as e:
logging.error(e)
logging.error("Unable to get the product for this id")
- listens to ORDER_COMPLETED_EVENT and places the order.
- publishes ORDER_CANCELLED_EVENT and ORDER_REFUND_EVENT events based on conditional checks
# package --> inventory\inventory_consumer.py
from main import redis, Product
import logging, time
# Logging config
FORMAT = "%(levelname)s:\t%(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
key = 'ORDER_COMPLETED_EVENT'
group = 'inventory-group'
try:
redis.xgroup_create(key, group)
except Exception as e:
logging.error(e)
logging.error('Group already exists')
while True:
try:
results = redis.xreadgroup(group, key, {key: '>'}, None)
logging.info(results)
if(results!=[]):
for result in results:
order_obj = result[1][0][1]
logging.info(order_obj)
try:
product = Product.get(order_obj['product_id'])
if(product.quantity >= int(order_obj['quantity'])):
product.quantity = product.quantity - int(order_obj['quantity'])
logging.info("Product == "+product)
product.save()
else:
# There are not enough products in inventory so, cancel the order
logging.error("Product Quantity not sufficient !! Can't proceed !!")
#Listener for ORDER_CANCELLED_EVENT in payment_order_cancel_consumer.py
# Publishing ORDER_CANCELLED_EVENT
redis.xadd('ORDER_CANCELLED_EVENT', order_obj, '*')
except:
#Listener for ORDER_REFUND_EVENT in payment_refund_consumer.py
# Publishing ORDER_REFUND_EVENT
redis.xadd('ORDER_REFUND_EVENT', order_obj, '*')
except Exception as e:
logging.error(e)
time.sleep(1)
2. Payment Microservice
This microservice serves below purposes
- exposes REST end point to place the order (calls the inventory endpoint to fetch the product info with the key)
# create an order
@app.post('/orders/placeOrder')
async def createOrder(request: Request, background_tasks: BackgroundTasks):
requestBody = await request.json()
product_key = requestBody["id"]
product_quantity = requestBody["quantity"]
logging.info("Product Key is == "+str(product_key))
url = 'http://127.0.0.1:8000/products/getProductById/'+str(product_key)
logging.info("Inventory endpoint hit =="+url)
# Fetch product response from Inventory
try:
inventory_response = requests.get(url)
logging.info("Response received from inventory =="+str(inventory_response.json()))
except Exception as e:
logging.error(e)
inventory_product = inventory_response.json()
order = Order(
product_id= product_key,
price= inventory_product['price'],
fee= 0.2 * inventory_product['price'],
total= 1.2 * inventory_product['price'],
quantity= product_quantity,
status='PENDING'
)
try:
# Save order
order.save()
# BackgroundTasks creates a separate thread --> Callable interface
background_tasks.add_task(order_completed, order)
logging.info("Order saved successfully for the product == "+requestBody['id'])
return order
except Exception as e:
logging.error(e)
logging.error("Could not save the order")
- listens to the ORDER_REFUND_EVENT and updates order status to “REFUNDED”
from main import redis, Order
import logging, time
# Logging config
FORMAT = "%(levelname)s:\t%(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
key = 'ORDER_REFUND_EVENT'
group = 'payment-refund-group'
try:
redis.xgroup_create(key, group)
except Exception as e:
logging.error(e)
logging.error('Group already exists')
while True:
try:
results = redis.xreadgroup(group, key, {key: '>'}, None)
print(results)
if results != []:
logging.info("Payment Refund Results == "+str(results))
for result in results:
obj = result[1][0][1]
order = Order.get(obj['pk'])
order.status = 'REFUNDED'
order.save()
except Exception as e:
logging.error(e)
time.sleep(1)
- listens to the ORDER_CANCELLED_EVENT and deletes the order
from main import redis, Order
import logging, time, requests
# Logging config
FORMAT = "%(levelname)s:\t%(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
key = 'ORDER_CANCELLED_EVENT'
group = 'payment-order-cancelled-group'
try:
logging.info("Trying to create a payment-order-cancel-group")
redis.xgroup_create(key, group)
except Exception as e:
logging.error(e)
logging.error('Group already exists')
while True:
try:
results = redis.xreadgroup(group, key, {key: '>'}, None)
logging.info("Results are as below")
logging.info(results)
logging.info(len(results))
if(results!=[]):
for result in results:
logging.info(result)
order_obj = result[1][0][1]
logging.info(order_obj)
order_pk = order_obj['pk']
logging.info(order_pk)
delete_order_url = 'http://127.0.0.1:8001/orders/deleteOrderByOrderPk/'+str(order_pk)
delete_order_response = requests.get(delete_order_url)
logging.info("Response received from delete order URL =="+str(delete_order_response.json()))
except Exception as e:
logging.error(e)
time.sleep(1)
Outputs
- Created a Product in Inventory
2. Placing an order for 100 products in Payment
Initial status stays in “PENDING”
Order status changes to “COMPLETED”
3. ORDER_COMPLETED_EVENT is published and the order quantity is subtracted from the inventory quantity.
Product quantity is reduced by the order quantity.
4. If the order placing fails, ORDER_REFUND_EVENT is published and the order is marked REFUNDED.
References
- GitHub — https://github.com/vramakrishnaparjanya/microservices-with-fastapi
- Fastapi guide — https://fastapi.tiangolo.com/tutorial/
- Redis Streams — https://redis.io/docs/data-types/streams/