At Heartbeat, we have a lot of different tasks that need to run at a particular time. Users can create draft posts or events that get published at a certain time. Event reminders need to be sent a set number of hours before an event. Automated workflows can be set up that send emails or direct messages after a delay.
For the longest time, all of these tasks were managed by a variety of cron scripts. We had `createScheduledPosts.ts`, which would run every 15 minutes, scan our table of scheduled posts, and create any that needed to be published. `sendEventReminders.ts` would run every single minute, scan our table of events, and send any notifications that needed to go out. And so on.
Each of these cron jobs had to be managed independently. Whenever a new feature was added that involved running tasks in the future, a new cron job would be created. If one of the scripts started erroring, I’d need to figure out why, fix it, and then figure out a way to retroactively run the tasks that were missed while the script was broken. Sometimes, we’d get reports from customers that a certain task that was supposed to run did not. I’d painstakingly dig into the logs & code, trying to figure out why a particular event reminder didn’t get sent on time. The first couple of times this happened, I’d usually discover that we lacked the logs to even properly diagnose the issue. All I could do was add more logging and hope that I’d find the problem the next time around. Once the logs were in place, I’d uncover some bug caused by timezones, improper error handling, or who knows what else.
Eventually, I came to my senses and realized that all of these various cron jobs were doing the same thing. And rather than have 10 different cron jobs each implementing their own half-baked version of a task scheduler, we should just have a robust, centralized system for scheduling tasks.
The way it works is we have a single database table called `ScheduledTask` with the following schema:
```prisma
enum ScheduledTaskStatus {
  QUEUED
  EXECUTING
  COMPLETED
}

model ScheduledTask {
  id                             String              @id
  communityID                    String
  createdAt                      DateTime
  lastStatusUpdate               DateTime
  timestamp                      DateTime
  status                         ScheduledTaskStatus
  expectedExecutionTimeInMinutes Int
  expirationInMinutes            Int?
  priority                       Int
  payload                        Json
  message                        String?

  @@index([status, timestamp])
}
```
`payload` is a discriminated union that covers each type of task we have. For example:
```typescript
type ScheduledTaskPayload =
  | { type: "PUBLISH_EVENT"; eventID: EventID }
  | { type: "PUBLISH_SCHEDULED_POST"; scheduledPostID: ScheduledPostID }
  | { type: "SEND_EVENT_REMINDER"; eventID: EventID }
  | { type: "SEND_EMAIL"; email: string; subject: string; body: string };
```
Now, whenever we have a task that needs to run in the future, all we need to do is insert a new `ScheduledTask` row into the database.
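For example, scheduling an event reminder can be a single insert. This is a minimal sketch assuming Prisma as the ORM; the IDs, dates, and field values are all illustrative. (Later in the post, this gets refined into an upsert.)

```typescript
import { PrismaClient } from "@prisma/client";
import { randomUUID } from "node:crypto";

const prisma = new PrismaClient();

// All of the IDs, dates, and default values here are illustrative.
await prisma.scheduledTask.create({
  data: {
    id: randomUUID(),
    communityID: "community-123",
    createdAt: new Date(),
    lastStatusUpdate: new Date(),
    timestamp: new Date("2025-06-01T14:00:00Z"), // when the reminder should go out
    status: "QUEUED",
    expectedExecutionTimeInMinutes: 5,
    expirationInMinutes: 60, // an hour late is too late for a reminder
    priority: 0,
    payload: { type: "SEND_EVENT_REMINDER", eventID: "event-456" },
  },
});
```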
We have a single cron job, running once every minute, that is responsible for executing scheduled tasks. It works as follows:
1. Get all tasks that meet the following criteria:
   - `status` is not `COMPLETED`
   - `timestamp` is less than now + 30 seconds
   - The task has not expired (now is less than `timestamp + expirationInMinutes`, or `expirationInMinutes` is null)
   - If `status` is `EXECUTING`, now is greater than `timestamp + expectedExecutionTimeInMinutes`
2. Sort all of the tasks by priority
3. Mark all of the tasks as `EXECUTING` in the database
4. Create an AWS SQS message for each task
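To make the loop concrete, here’s a minimal sketch of what that cron job could look like, assuming Prisma as the ORM (matching the schema above) and the AWS SDK v3 SQS client. The queue URL, the in-memory filtering, and the “lower number runs first” priority order are illustrative assumptions, not our exact implementation.

```typescript
import { PrismaClient, ScheduledTaskStatus } from "@prisma/client";
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";

const prisma = new PrismaClient();
const sqs = new SQSClient({});

// Hypothetical queue URL; the real one would live in config.
const QUEUE_URL = process.env.SCHEDULED_TASKS_QUEUE_URL!;

export async function runScheduledTasksCron() {
  const now = new Date();
  const cutoff = new Date(now.getTime() + 30 * 1000); // now + 30 seconds

  // The criteria the database can check cheaply, backed by the
  // @@index([status, timestamp]) from the schema.
  const candidates = await prisma.scheduledTask.findMany({
    where: {
      status: { not: ScheduledTaskStatus.COMPLETED },
      timestamp: { lt: cutoff },
    },
  });

  // The expiration and stuck-in-EXECUTING checks compare one column against
  // another, which is awkward in a single Prisma `where`, so this sketch
  // filters in memory.
  const runnable = candidates
    .filter((task) => {
      if (task.expirationInMinutes !== null) {
        const expiresAt =
          task.timestamp.getTime() + task.expirationInMinutes * 60_000;
        if (now.getTime() >= expiresAt) return false; // expired, skip
      }
      if (task.status === ScheduledTaskStatus.EXECUTING) {
        const retryAt =
          task.timestamp.getTime() +
          task.expectedExecutionTimeInMinutes * 60_000;
        if (now.getTime() <= retryAt) return false; // may still be running
      }
      return true;
    })
    .sort((a, b) => a.priority - b.priority); // assuming lower number runs first

  for (const task of runnable) {
    await prisma.scheduledTask.update({
      where: { id: task.id },
      data: { status: ScheduledTaskStatus.EXECUTING, lastStatusUpdate: now },
    });
    await sqs.send(
      new SendMessageCommand({
        QueueUrl: QUEUE_URL,
        MessageBody: JSON.stringify({ taskID: task.id, payload: task.payload }),
      })
    );
  }
}
```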
Separately, we have an SQS consumer that listens for the SQS messages. The consumer reads the payload discriminated union and calls the corresponding function responsible for executing the task.
```typescript
async function processTask(taskPayload: ScheduledTaskPayload) {
  if (taskPayload.type === "PUBLISH_EVENT") {
    await publishEvent(taskPayload.eventID);
  } else if (taskPayload.type === "PUBLISH_SCHEDULED_POST") {
    await publishScheduledPost(taskPayload.scheduledPostID);
  }
  // ...
}
```
After the task runs, the consumer marks it as completed in the database. Some of our tasks return a new scheduled task when they finish; if they do, the consumer inserts the new task into the database. For example, after sending an event reminder for one instance of a recurring event, the reminder for the next instance gets scheduled.
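A sketch of what that consumer wrapper could look like, assuming `processTask` is extended to return an optional follow-up task instead of void. The message shape matches what the cron sketch above enqueues, and `prisma` is the same client instance.

```typescript
import { Prisma, ScheduledTaskStatus } from "@prisma/client";

// Illustrative message shape; mirrors what the cron job sends to SQS.
interface TaskMessage {
  taskID: string;
  payload: ScheduledTaskPayload;
}

async function handleMessage(message: TaskMessage): Promise<void> {
  // May return e.g. the reminder for the next instance of a recurring event.
  const followUpTask: Prisma.ScheduledTaskCreateInput | undefined =
    await processTask(message.payload);

  // Mark the task as completed so the cron job stops re-queueing it.
  await prisma.scheduledTask.update({
    where: { id: message.taskID },
    data: {
      status: ScheduledTaskStatus.COMPLETED,
      lastStatusUpdate: new Date(),
    },
  });

  if (followUpTask) {
    await prisma.scheduledTask.create({ data: followUpTask });
  }
}
```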
The system has retry logic built in. If the script doesn’t run for some stretch of time due to an outage or error, the scheduled tasks still exist in the database. Once the script is running again, any tasks that weren’t executed when they were originally supposed to be will be run.

The `expirationInMinutes` field lets us control which tasks are still worth running late. Some tasks, such as event reminders, don’t make sense to run after a certain point. Others, like publishing scheduled posts, fall into the “better late than never” bucket, in which case `expirationInMinutes` is set to null.

The `expectedExecutionTimeInMinutes` field lets us handle retries for tasks that get stuck in `EXECUTING`. If a task that was scheduled for 10:00am is still marked as `EXECUTING` at 10:01am, we probably don’t want to run it again because the first run might still be in progress. By 10:08am, however, if the task is still stuck in `EXECUTING`, it probably ran into an error and we can try running it again. `expectedExecutionTimeInMinutes` tells the system how long to wait before rerunning a task stuck in `EXECUTING`.
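As a concrete timeline, assuming an illustrative `expectedExecutionTimeInMinutes` of 5:

```typescript
// Suppose expectedExecutionTimeInMinutes is 5 (an illustrative value).
const scheduledFor = new Date("2025-06-01T10:00:00Z");
const retryAfter = new Date(scheduledFor.getTime() + 5 * 60_000); // 10:05am

// 10:01am: the first run may still be in progress, so leave the task alone.
new Date("2025-06-01T10:01:00Z") > retryAfter; // false

// 10:08am: the task has overstayed its expected execution time; re-queue it.
new Date("2025-06-01T10:08:00Z") > retryAfter; // true
```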
To ensure tasks run at the right time, we need to make sure that whenever a change is made to an entity, the corresponding scheduled task is also updated. For example, when a user creates an event that starts at 3pm, we create a scheduled task for the reminder to be sent at 2pm. If the user later updates the event to be at 6pm, we need to update the same scheduled task to send the reminder at 5pm instead. We enable this by using consistent ids for editable tasks.
```typescript
function getTaskID(payload: ScheduledTaskPayload): string {
  if (payload.type === "PUBLISH_EVENT") {
    return `task-${payload.type}-${payload.eventID}`;
  } else if (payload.type === "PUBLISH_SCHEDULED_POST") {
    return `task-${payload.type}-${payload.scheduledPostID}`;
  } else if (payload.type === "SEND_EVENT_REMINDER") {
    return `task-${payload.type}-${payload.eventID}`;
  } else if (payload.type === "SEND_EMAIL") {
    return generateUUID();
  } else {
    assertNever(payload);
  }
}
```
Whenever we create a task, we use `getTaskID` to get the id for the scheduled task. And rather than just creating the task, we do an upsert. When a user creates an event for the first time, the scheduled task doesn’t exist yet, so it gets created. When the event is edited, the `eventID` stays the same, so the id for the corresponding scheduled task is the same, and the previous scheduled task gets updated instead of a new one being created. Other tasks, such as `SEND_EMAIL`, can be triggered from a variety of sources and aren’t directly editable by the user, so those tasks just use a UUID instead.
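Here’s a sketch of what that creation path could look like, again assuming Prisma. `scheduleTask` is a hypothetical helper name, and the default field values are illustrative; `prisma` is the client from the earlier sketches.

```typescript
// Hypothetical helper: both "create" and "edit" flows call this.
async function scheduleTask(
  payload: ScheduledTaskPayload,
  runAt: Date,
  communityID: string
): Promise<void> {
  const id = getTaskID(payload);
  await prisma.scheduledTask.upsert({
    where: { id },
    // First time we see this id: create the task.
    create: {
      id,
      communityID,
      createdAt: new Date(),
      lastStatusUpdate: new Date(),
      timestamp: runAt,
      status: "QUEUED",
      expectedExecutionTimeInMinutes: 5, // illustrative default
      expirationInMinutes: null,
      priority: 0,
      payload,
    },
    // Same id (e.g. the event was edited): move the existing task instead.
    update: {
      timestamp: runAt,
      status: "QUEUED",
      lastStatusUpdate: new Date(),
      payload,
    },
  });
}
```

With this in place, creating an event at 3pm and later moving it to 6pm both go through the same call; the second call hits the `update` branch and simply shifts the existing task’s `timestamp`.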
Overall, creating the scheduled tasks system has come with enormous benefits:
- We can centralize the work in one place. Our scheduled tasks cron job has a bunch of logging, error handling, and monitoring built into it. We implement all of this logic once instead of constantly reimplementing it across various scripts.
- Retries are handled naturally. If we have an outage, or if one of our task handlers starts running into errors, it’s easy to catch up on the tasks we missed once we fix the problem.
- There’s a lasting record of every task that gets executed. If a user asks why a certain action didn’t happen, we can check the database & logs to see whether the task got created, and if it did, whether it ran into a particular error.
- Implementing new features that involve task scheduling is now trivial. Instead of creating a new cron script, all we need to do is add a new variant to `ScheduledTaskPayload` and a new handler branch to `processTask`.
Many of you probably already had the foresight to centralize your scheduled tasks in one place. But I haven’t seen too many people talking about this problem, so hopefully this was helpful for anyone who’s currently stuck maintaining a sea of scattered cron jobs.