How to use WebSockets with AWS Serverless


Introduction

In this guide, we will see how to use WebSockets with the Serverless Framework on AWS using Node.js. By the end, we will have an application where a user can create a chat room and other users can join it to chat with each other. I have kept the procedure simple to follow, and at the end of the post you will also find a link to the GitHub repository with the code.

Project Setup

The first thing is to set up the project folder and install the required dependencies. Create a new folder and run the commands below in its root:

npm init
npm i aws-sdk --save

Create a folder named src at the root of the project. Inside the src folder, create three more folders, each with its own index.js file:

connectionHandler:- This folder will contain the file with code to handle the connect and disconnect events of WebSockets.
manageRoom:- This folder will contain the file with code to create/join the chat room.
sendMessage:- This folder will contain the file with code to emit the message to all connected users in a particular room if any user in the room sends a message.

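By now the project should contain the files npm generated at the root (such as package.json) and a src folder holding connectionHandler/index.js, manageRoom/index.js and sendMessage/index.js.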

Now the basic project setup is done and we are ready for the next step, which is creating the serverless.yml file. First, let’s briefly discuss what this file does.

What is a serverless.yml file?

In simple terms, the serverless.yml file is a template describing the resources we want to create in our AWS account. We can define different types of resources in it, and we can also set the permissions for each of them.

In this project, the main use of serverless.yml will be to create the Lambda functions and to set up the DynamoDB table with different permissions.

Defining configuration and permissions block in serverless.yml file

service: serverless-chat

provider:
  name: aws
  runtime: nodejs12.x
  websocketsApiName: custom-websockets-api-name
  websocketsApiRouteSelectionExpression: $request.body.action
  environment:
    DYNAMO_TABLE_NAME: connections
  iamRoleStatements:
    - Effect: Allow
      Action:
        - dynamodb:Query
        - dynamodb:Scan
        - dynamodb:GetItem
        - dynamodb:PutItem
        - dynamodb:DeleteItem
        - dynamodb:UpdateItem
        - lambda:InvokeFunction
      Resource: "arn:aws:dynamodb:${opt:region, self:provider.region}:*:table/${self:provider.environment.DYNAMO_TABLE_NAME}"

This is the first part of our serverless.yml file. Let’s break it down:

service:- This is the name of the service. The Serverless Framework uses it, together with the stage, to name the CloudFormation stack it creates in the AWS account.
provider:- In this block we define configuration, environment variables, permissions and roles. Here we set things like the version of Node.js we want to run in our AWS environment.
websocketsApiRouteSelectionExpression:- This is the custom route selection expression. With $request.body.action, a WebSocket client that wants to trigger a custom route passes the route name in the action property of the message payload.
Action:- This block lists the operations our Lambda functions are allowed to perform on the DynamoDB table.
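
To make the route selection expression more concrete, here is a minimal local sketch of how a message body is matched to a route. It only imitates the behaviour described above; API Gateway performs the real matching, and selectRoute and the ROUTES list are hypothetical names used for illustration. The guide does not define a $default route, so in the deployed API an unmatched message is not handled by any of our functions.

```javascript
// Local simulation only: API Gateway does this matching for us in production.
// The route names mirror the custom routes used in this guide.
const ROUTES = ['sendmessage', 'manageroom'];

function selectRoute(rawBody) {
    let action;
    try {
        // $request.body.action means: parse the message as JSON and read "action".
        action = JSON.parse(rawBody).action;
    } catch (e) {
        // A message that is not a JSON object cannot match a custom route.
        return '$default';
    }
    return ROUTES.includes(action) ? action : '$default';
}

console.log(selectRoute('{"action":"manageroom","roomid":"test room"}'));
console.log(selectRoute('hello'));
```

The first call selects the manageroom route, while the plain-text message falls through to $default.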

Defining functions block in serverless.yml file

functions:
  connectionHandler:
    handler: src/connectionHandler/index.connectionHandler
    events:
      - websocket:
          route: $connect
      - websocket:
          route: $disconnect

  sendMessage:
    handler: src/sendMessage/index.sendMessage
    events:
      - websocket:
          route: sendmessage

  manageRoom:
    handler: src/manageRoom/index.manageRoom
    events:
      - websocket:
          route: manageroom

This is where we define all the Lambda functions to be created. Let’s break it down a little for a better understanding:

connectionHandler:- This is the Lambda function that is called when any user connects to or disconnects from our WebSocket server. API Gateway defines three predefined routes: $connect, $disconnect and $default.
$connect/$disconnect:- When a user connects to our WebSocket server, the $connect route is called, and when the user disconnects, the $disconnect route is called.
sendMessage:- This function will be called if the user sends sendmessage as the value of action property in the request payload, it handles sending messages to all connected users in a particular room.
manageRoom:- This function is used for creating/joining a room according to room id.

Defining resources block in serverless.yml file

resources:
  Resources:
    UsersDynamoDbTable:
      Type: AWS::DynamoDB::Table
      DeletionPolicy: Retain
      Properties:
        AttributeDefinitions:
          - AttributeName: connectionId
            AttributeType: S
        KeySchema:
          - AttributeName: connectionId
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
        TableName: ${self:provider.environment.DYNAMO_TABLE_NAME}

This is the resources block of the serverless.yml file, where we define all the resources we want to create automatically in the AWS account. Here we create a new DynamoDB table with connectionId as its hash key, which plays the role a primary key does if you come from an SQL background.

Connecting and disconnecting users

Let’s start with the Lambda function that connects and disconnects WebSocket clients. We use the connectionHandler function for this, and it looks like this:

const AWS = require('aws-sdk');

const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });

exports.connectionHandler = async event => {
    const connectionId = event.requestContext.connectionId;
    const eventType = event.requestContext.eventType
    if (eventType === 'DISCONNECT') {
        try {
            await ddb.delete({ TableName: process.env.DYNAMO_TABLE_NAME, Key: { connectionId } }).promise();
            return { statusCode: 200, body: 'Disconnected' };
        }
        catch (e) {
            return { statusCode: 500, body: 'Could not clear the connection.' };
        }
    }
    else if (eventType === "CONNECT") {
        const putParams = {
            TableName: process.env.DYNAMO_TABLE_NAME,
            Item: {
                connectionId
            }
        };

        try {
            await ddb.put(putParams).promise();
        } catch (err) {
            return { statusCode: 500, body: 'Failed to connect: ' + JSON.stringify(err) };
        }

        return { statusCode: 200, body: 'Connected.' };
    }
};

Don’t worry, we will go through each part of the function in detail. Let’s start with the part that handles connecting users.

Connecting users

else if (eventType === "CONNECT") {
        const putParams = {
            TableName: process.env.DYNAMO_TABLE_NAME,
            Item: {
                connectionId
            }
        };

        try {
            await ddb.put(putParams).promise();
        } catch (err) {
            return { statusCode: 500, body: 'Failed to connect: ' + JSON.stringify(err) };
        }

        return { statusCode: 200, body: 'Connected.' };
    }

Here we check whether the user has connected through the API Gateway WebSocket URL. If so, we read the connectionId from the event.requestContext object and create a new entry in the DynamoDB table with that value. In other words, this is a simple insert into the DynamoDB table keyed by connectionId.

What is .promise() ?

If you are wondering why we use .promise() here, it is because we want to write clean code with async/await instead of callbacks, and await only works on a JavaScript promise. Most operations in the AWS SDK for JavaScript (the aws-sdk package used here) return a request object with a .promise() method, which delivers the result through a promise instead of a callback.
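
The difference between the two styles can be sketched without touching AWS at all. The example below is an analogy, not the SDK's implementation: fakePut is a hypothetical callback-style function standing in for an SDK call, and Node's built-in util.promisify plays the part that .promise() plays in the SDK.

```javascript
const { promisify } = require('util');

// Hypothetical callback-style function standing in for an SDK call.
function fakePut(params, callback) {
    setImmediate(() => callback(null, { saved: params.Item }));
}

// Callback style: the result arrives inside a nested function.
fakePut({ Item: { connectionId: 'abc' } }, (err, data) => {
    if (err) return console.error(err);
    console.log('callback result:', data);
});

// Promise style: the same call can be awaited inside an async function.
const fakePutAsync = promisify(fakePut);

async function main() {
    const data = await fakePutAsync({ Item: { connectionId: 'abc' } });
    console.log('awaited result:', data);
    return data;
}

main();
```

With the promise version, error handling also becomes an ordinary try/catch around the await, which is the pattern the handlers in this guide use.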

Disconnecting users

if (eventType === 'DISCONNECT') {
        try {
            await ddb.delete({ TableName: process.env.DYNAMO_TABLE_NAME, Key: { connectionId } }).promise();
            return { statusCode: 200, body: 'Disconnected' };
        }
        catch (e) {
            return { statusCode: 500, body: 'Could not clear the connection.' };
        }
    }

Here we check whether the user has disconnected from the WebSocket server. If so, the connectionId is used to remove that user’s entry from the DynamoDB table.
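
One way to try the connect and disconnect logic without deploying is to pass the DynamoDB client in as a parameter and substitute an in-memory fake. This is only a sketch under that assumption: makeConnectionHandler and createFakeDdb are hypothetical helpers that are not part of the guide's code, and error handling is left out to keep it short.

```javascript
// Factory so the DynamoDB client can be swapped for a fake in local runs.
function makeConnectionHandler(ddb, tableName) {
    return async event => {
        const { connectionId, eventType } = event.requestContext;
        if (eventType === 'CONNECT') {
            await ddb.put({ TableName: tableName, Item: { connectionId } }).promise();
            return { statusCode: 200, body: 'Connected.' };
        }
        if (eventType === 'DISCONNECT') {
            await ddb.delete({ TableName: tableName, Key: { connectionId } }).promise();
            return { statusCode: 200, body: 'Disconnected' };
        }
        return { statusCode: 400, body: 'Unexpected event type.' };
    };
}

// In-memory stand-in that mimics the put(...).promise() / delete(...).promise() call shape.
function createFakeDdb() {
    const rows = new Map();
    return {
        rows,
        put: ({ Item }) => ({ promise: async () => { rows.set(Item.connectionId, Item); return {}; } }),
        delete: ({ Key }) => ({ promise: async () => { rows.delete(Key.connectionId); return {}; } }),
    };
}

async function demo() {
    const ddb = createFakeDdb();
    const handler = makeConnectionHandler(ddb, 'connections');
    const connect = await handler({ requestContext: { connectionId: 'abc', eventType: 'CONNECT' } });
    const sizeAfterConnect = ddb.rows.size;
    const disconnect = await handler({ requestContext: { connectionId: 'abc', eventType: 'DISCONNECT' } });
    return { connect, sizeAfterConnect, disconnect, sizeAfterDisconnect: ddb.rows.size };
}

demo().then(result => console.log(result));
```

The fake table holds one row after the CONNECT event and none after the DISCONNECT event, which matches what we expect to see in the real table.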

Creating and joining chat rooms

The next thing is to set up a Lambda function that allows users to create or join a room. The code of the function looks like this:

const AWS = require('aws-sdk');

const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });

exports.manageRoom = async event => {
    const body = JSON.parse(event.body)
    if (!body.roomid) return { statusCode: 400, body: 'Room id is required.' };

    const params = {
        TableName: process.env.DYNAMO_TABLE_NAME,
        Key: {
            connectionId: event.requestContext.connectionId,
        },
        ExpressionAttributeValues: {
            ":roomid": body.roomid,
        },
        UpdateExpression: "SET roomid = :roomid",
        ReturnValues: "ALL_NEW"
    };

    const data = await ddb.update(params).promise();
    if (data.Attributes) {
        return { statusCode: 200, body: 'Room joined.' };
    } else {
        return { statusCode: 400, body: 'Some error has occurred.' };
    }
};

Let’s break the code into different parts for a better understanding of the code.

Getting and checking the room id

const body = JSON.parse(event.body)
if (!body.roomid) return { statusCode: 400, body: 'Room id is required.' };

Here we parse the request body as JSON and check whether roomid is present in it, because roomid is required when a user tries to create or join a chat room.
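
Note that JSON.parse throws when the body is not valid JSON, which would make the whole invocation fail. A slightly more defensive version of this step could look like the sketch below; parseRoomRequest is a hypothetical helper, and the extra checks (string type, non-empty after trimming) are assumptions about what a valid room id should be.

```javascript
// Hypothetical helper: returns either { roomid } or { error } with a ready-made response.
function parseRoomRequest(rawBody) {
    let body;
    try {
        body = JSON.parse(rawBody);
    } catch (e) {
        return { error: { statusCode: 400, body: 'Request body must be valid JSON.' } };
    }
    // Assumption: a usable room id is a non-empty string.
    if (!body || typeof body.roomid !== 'string' || body.roomid.trim() === '') {
        return { error: { statusCode: 400, body: 'Room id is required.' } };
    }
    return { roomid: body.roomid };
}

console.log(parseRoomRequest('{"action":"manageroom","roomid":"test room"}'));
console.log(parseRoomRequest('{"action":"manageroom"}'));
console.log(parseRoomRequest('oops'));
```

Inside the handler, the result could be used as `const parsed = parseRoomRequest(event.body); if (parsed.error) return parsed.error;`.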

Creating/join the chat room

const params = {
        TableName: process.env.DYNAMO_TABLE_NAME,
        Key: {
            connectionId: event.requestContext.connectionId,
        },
        ExpressionAttributeValues: {
            ":roomid": body.roomid,
        },
        UpdateExpression: "SET roomid = :roomid",
        ReturnValues: "ALL_NEW"
    };

    const data = await ddb.update(params).promise();
    if (data.Attributes) {
        return { statusCode: 200, body: 'Room joined.' };
    } else {
        return { statusCode: 400, body: 'Some error has occurred.' };
    }

Here we update the entry in the DynamoDB table that matches the connectionId, setting its roomid attribute to the value passed in the request body. For example, if the connectionId is #f!41fg and the roomid passed by the user is test-chat-room, this code sets roomid to test-chat-room in the row where connectionId is #f!41fg.
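
The same update can be pictured with plain objects. The sketch below builds the parameters in a small function (buildJoinRoomParams is a hypothetical name) and shows the row before and after the update; the example ids are made up.

```javascript
// Hypothetical helper that builds the same update parameters as the handler.
function buildJoinRoomParams(tableName, connectionId, roomid) {
    return {
        TableName: tableName,
        Key: { connectionId },
        UpdateExpression: 'SET roomid = :roomid',
        ExpressionAttributeValues: { ':roomid': roomid },
        ReturnValues: 'ALL_NEW',
    };
}

const joinParams = buildJoinRoomParams('connections', 'abc123', 'test-chat-room');
console.log(joinParams);

// Plain-object picture of the row before and after the update runs.
const rowBefore = { connectionId: 'abc123' };
const rowAfter = { ...rowBefore, roomid: joinParams.ExpressionAttributeValues[':roomid'] };
console.log(rowBefore, rowAfter);
```

One thing to keep in mind is that a DynamoDB update creates the item if no item with that key exists. If that is not wanted, adding `ConditionExpression: 'attribute_exists(connectionId)'` to the parameters makes the update fail instead.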

Sending a message to all connected users in the chat room

The final part of our project is the Lambda function that sends a message to every connected user in a chat room whenever a user in that room sends one. It lives in src/sendMessage/index.js and does two things: it looks up every connectionId in the room, then posts the message to each of them. Let’s go through these parts one at a time.

Getting all connection IDs according to room ID

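const AWS = require('aws-sdk');

const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });

exports.sendMessage = async event => {
    const body = JSON.parse(event.body);

    // Find every connection that has joined the room named in the request.
    let connectionData;
    try {
        const params = {
            TableName: process.env.DYNAMO_TABLE_NAME,
            FilterExpression: '#roomid = :roomid',
            ExpressionAttributeNames: {
                '#roomid': 'roomid',
            },
            ExpressionAttributeValues: {
                ':roomid': body.roomid
            },
        };

        connectionData = await ddb.scan(params).promise();
    } catch (e) {
        return { statusCode: 500, body: 'Could not send the message.' };
    }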

When a user sends a message in a chat room, the request must include the roomid. We use that roomid to find every connectionId associated with the room: the code above scans the DynamoDB table for matching records and stores them in a variable called connectionData.
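
A scan reads the whole table and applies the filter afterwards, and a single scan call returns at most 1 MB of data. For a small demo table that is fine, but on a larger table some connections could be missed unless the remaining pages are fetched. The sketch below shows one way to follow LastEvaluatedKey until the scan is complete; scanAll and createPagedFakeDdb are hypothetical helpers, and the fake client only exists so the loop can be run locally.

```javascript
// Hypothetical helper: ddb is expected to look like the DocumentClient used in this guide.
async function scanAll(ddb, params) {
    const items = [];
    let ExclusiveStartKey;
    do {
        // Only pass ExclusiveStartKey once a previous page has supplied one.
        const pageParams = ExclusiveStartKey ? { ...params, ExclusiveStartKey } : params;
        const page = await ddb.scan(pageParams).promise();
        items.push(...(page.Items || []));
        ExclusiveStartKey = page.LastEvaluatedKey;
    } while (ExclusiveStartKey);
    return items;
}

// Fake client that returns the given pages one after another.
function createPagedFakeDdb(pages) {
    let call = 0;
    return {
        scan: () => ({ promise: async () => pages[call++] }),
    };
}

async function demoScan() {
    const fake = createPagedFakeDdb([
        { Items: [{ connectionId: 'a' }], LastEvaluatedKey: { connectionId: 'a' } },
        { Items: [{ connectionId: 'b' }] },
    ]);
    return scanAll(fake, { TableName: 'connections' });
}

demoScan().then(items => console.log(items));
```

For a busy chat application, an index keyed on roomid and a query would avoid reading the whole table, but that is beyond what this guide sets up.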

Sending a message to all connected users in a chat room

const apiGatewayMng = new AWS.ApiGatewayManagementApi({
        apiVersion: '2018-11-29',
        endpoint: event.requestContext.domainName + '/' + event.requestContext.stage
});

    const postCalls = connectionData.Items.map(async ({ connectionId }) => {
        try {
            await apiGatewayMng.postToConnection({ ConnectionId: connectionId, Data: body.message }).promise();
        } catch (e) {
            if (e.statusCode === 410) {
                await ddb.delete({ TableName: process.env.DYNAMO_TABLE_NAME, Key: { connectionId } }).promise();
            } else {
                throw e;
            }
        }
    });

    try {
        await Promise.all(postCalls);
    } catch (e) {
        return { statusCode: 500, body: 'Could not send the message.' };
    }

    return { statusCode: 200, body: 'Message sent.' };
};

This is the code that sends a message to the other users connected to a chat room when any member of the room sends one. Let’s go through it in detail.

Use of ApiGatewayManagementApi

const apiGatewayMng = new AWS.ApiGatewayManagementApi({
        apiVersion: '2018-11-29',
        endpoint: event.requestContext.domainName + '/' + event.requestContext.stage
 });

ApiGatewayManagementApi is used to send data to connected WebSocket clients. Here we create an instance of this class so that we can use its methods, and we read the endpoint to send the data to from the event passed to our Lambda function.
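
To see what the endpoint string looks like, here is a tiny sketch that builds it from a sample requestContext. The helper name buildManagementEndpoint and the sample values are made up for illustration; the real values come from the event API Gateway passes to the function.

```javascript
// Hypothetical helper: joins the two requestContext fields used by the handler.
function buildManagementEndpoint(requestContext) {
    return requestContext.domainName + '/' + requestContext.stage;
}

// Made-up sample values shaped like the ones API Gateway provides.
const sampleContext = {
    domainName: 'abc123.execute-api.us-east-1.amazonaws.com',
    stage: 'dev',
};

console.log(buildManagementEndpoint(sampleContext));
```

The result is the same host and stage that appear in the wss:// URL of the deployed API, just without the scheme.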

Send the message in a chat room

const postCalls = connectionData.Items.map(async ({ connectionId }) => {
        try {
            await apiGatewayMng.postToConnection({ ConnectionId: connectionId, Data: body.message }).promise();
        } catch (e) {
            if (e.statusCode === 410) {
                await ddb.delete({ TableName: process.env.DYNAMO_TABLE_NAME, Key: { connectionId } }).promise();
            } else {
                throw e;
            }
        }
 });

If you are not familiar with JavaScript this code might seem confusing. We map over every item in connectionData, which, as you may remember, is the collection of connectionIds of the users in a chat room.

  • postToConnection is the method we use to send a message to a connected user in the chat room, identified by that user’s connectionId.
  • The Data property is the data that we want to send to the connected socket.
  • If a post fails with status code 410, the connection no longer exists, so its entry is deleted from the DynamoDB table. Any other error is rethrown.
  • postCalls holds the collection of pending JavaScript promises, one per user in the chat room, each posting the message to that user’s connectionId.
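
The points above can be tried locally by passing the two clients in as parameters and replacing them with fakes. This is a sketch under that assumption: broadcast and createFakeClients are hypothetical names, and the fakes only imitate the .promise() call shape and the 410 status code of a stale connection.

```javascript
// Posts a message to every connection and cleans up the ones that are gone.
async function broadcast({ apiGateway, ddb, tableName, connectionIds, message }) {
    const removed = [];
    await Promise.all(connectionIds.map(async connectionId => {
        try {
            await apiGateway.postToConnection({ ConnectionId: connectionId, Data: message }).promise();
        } catch (e) {
            if (e.statusCode !== 410) throw e;
            // 410: the client has disconnected, so forget the connection.
            await ddb.delete({ TableName: tableName, Key: { connectionId } }).promise();
            removed.push(connectionId);
        }
    }));
    return removed;
}

// Fakes: connections listed in goneIds fail with a 410, all others succeed.
function createFakeClients(goneIds) {
    const delivered = [];
    const deleted = [];
    const apiGateway = {
        postToConnection: ({ ConnectionId, Data }) => ({
            promise: async () => {
                if (goneIds.includes(ConnectionId)) {
                    throw Object.assign(new Error('Gone'), { statusCode: 410 });
                }
                delivered.push({ ConnectionId, Data });
            },
        }),
    };
    const ddb = {
        delete: ({ Key }) => ({ promise: async () => { deleted.push(Key.connectionId); } }),
    };
    return { apiGateway, ddb, delivered, deleted };
}

async function demoBroadcast() {
    const fakes = createFakeClients(['stale-1']);
    const removed = await broadcast({
        apiGateway: fakes.apiGateway,
        ddb: fakes.ddb,
        tableName: 'connections',
        connectionIds: ['live-1', 'stale-1', 'live-2'],
        message: 'Hi there!',
    });
    return { removed, delivered: fakes.delivered.map(d => d.ConnectionId), deleted: fakes.deleted };
}

demoBroadcast().then(result => console.log(result));
```

In this run the two live connections receive the message, and the stale one is removed from the fake table instead of causing the whole broadcast to fail.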

Using postCalls to resolve all the promises

try {
       await Promise.all(postCalls);
    } catch (e) {
        return { statusCode: 500, body: 'Could not send the message.' };
 }

We pass postCalls, the collection of pending promises, to Promise.all(). This function takes an iterable of promises and returns a single promise that resolves with an array of results once every promise has resolved, or rejects as soon as any one of them rejects. In other words, awaiting Promise.all(postCalls) waits until the message has been posted to every user in the chat room, and the function returns a 500 response if any post fails with an error that was not handled inside the map callback.
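
The rejection behaviour of Promise.all() is easy to see with plain promises. The sketch below compares it with Promise.allSettled(), which waits for every promise and reports each outcome separately; the delivery strings are made up, and using allSettled is an alternative to consider rather than what the guide's code does.

```javascript
// Three made-up deliveries: the second one fails.
const deliveries = () => [
    Promise.resolve('sent to a'),
    Promise.reject(new Error('failed for b')),
    Promise.resolve('sent to c'),
];

// Promise.all rejects as soon as one delivery fails.
async function withAll() {
    try {
        await Promise.all(deliveries());
        return 'all delivered';
    } catch (e) {
        return 'rejected: ' + e.message;
    }
}

// Promise.allSettled never rejects; it reports the status of every delivery.
async function withAllSettled() {
    const results = await Promise.allSettled(deliveries());
    return results.map(r => r.status);
}

withAll().then(console.log);
withAllSettled().then(console.log);
```

With Promise.all() a single failure hides the fact that the other deliveries went through, while Promise.allSettled() would let the function report exactly which connections failed.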

Woo! We are done writing the code, it’s time to test this stuff out

We need to run sls deploy to deploy our code to AWS. After that we will get a URL that looks something like this:

URL – wss://{YOUR-API-ID}.execute-api.{YOUR-REGION}.amazonaws.com/dev

These are the steps we need to take to test this chat application –

  • Install the npm package wscat by running this command – npm install wscat -g
  • Now run this command inside the terminal – wscat -c {your API Gateway URL} (without {}).
  • If wscat reports that it has connected, then we are now connected to our WebSocket server.

  • Now let’s create a new room named test room by sending this data – {"action":"manageroom","roomid":"test room"}.
  • After sending this data, we can go to our DynamoDB table and check whether a new entry has been created there with a connectionId and a roomid.
  • This connection acts as one user who created a room and is now inside the chat room. Let’s repeat the same process for another user by opening a new terminal window and starting again from the wscat -c step.
  • After repeating this process from another terminal window, check the DynamoDB table. If it has another entry with the same test room value as roomid, then congrats, our manage room code is working perfectly.
  • It’s time to send our first message by sending this data – {"action":"sendmessage","roomid":"test room","message":"Hi there!"}.
  • If the message Hi there! appears in both terminals, then congratulations, you have successfully posted your first message. From now on, when any of the connected users sends a message, it will be shown to all the users in that chat room.
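
When testing from a script instead of typing into wscat, the two payloads used in the steps above can be generated rather than written by hand, which avoids quoting mistakes. The helper names below are hypothetical; the action and property names are the ones the functions in this guide expect.

```javascript
// Hypothetical helpers that build the JSON strings sent over the WebSocket.
const manageRoomPayload = roomid =>
    JSON.stringify({ action: 'manageroom', roomid });

const sendMessagePayload = (roomid, message) =>
    JSON.stringify({ action: 'sendmessage', roomid, message });

console.log(manageRoomPayload('test room'));
console.log(sendMessagePayload('test room', 'Hi there!'));
```

The printed strings can be pasted into wscat as they are.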

How to get this code?


Source Code on GitHub
