In many embedded Linux applications, such as industrial control, automotive control, and other similarly time-sensitive areas, the entire purpose of the system is real-time capable control running on an OS-level software stack. It follows naturally that one would want to write real-time software on Linux.
Before I start, note that the main external reference for this section is The Linux Foundation’s wiki. However, the kernel documentation should always be consulted as well, and since the aforementioned wiki is rather lacking in places, a lot of this comes from my own testing and experience.
As with all real-time systems, you should experiment, do your homework, and test what works best for you rather than blindly following what someone online said will work. This worked for me, and should work in most or all other cases, but may not be appropriate for your use case.
I encourage you to read this excellent paper written by Paul E. McKenney of the IBM Linux Technology Center. This paper explains how to choose whether you want a real-time kernel (using the PREEMPT_RT patches and the like) or a standard “real fast” kernel (the sort an average desktop user might use). There are benchmarks, code samples, and hard data explaining when to choose each.
The short version: a real-time kernel trades average throughput for bounded worst-case latency, while a “real fast” kernel does the opposite, so the right choice depends on whether your workload has hard deadlines or just needs to finish quickly on average.
Now that you should be fully aware of which to choose (or at least have some inkling that you should choose one or the other, but may need to do more testing), we can get on with the real-time stuff.
As always, the Linux manual pages are a great source of information for all things Linux. I will refer to them as we go, so you will want to be cross-referencing with them as well.
Linux has a whole bunch of different scheduling policies. The real-time ones are FIFO, RR, and most recently, DEADLINE. These are explained in great detail in sched(7), so I will not go into a lot of detail here. The basic concept is that for a real-time system without EDF (earliest-deadline-first) scheduling, you will want to use FIFO: first in, first out. The real-time static priorities on Linux range from 1 (lowest) to 99 (highest) for FIFO and RR. DEADLINE is the exception: it does not use static priorities at all, is scheduled above all FIFO and RR tasks, and can (and will) preempt them as necessary to meet each task’s deadline.
The most straightforward (and likely most common) way of doing real-time scheduling on Linux is to use FIFO. Your real-time tasks should avoid extensive I/O where possible and should spend as little time as possible in blocking operations, so that lower-priority threads can do their work without being preempted all the time. Additionally, it’s commonly recommended not to give a task priority 99, as that priority competes with critical kernel threads (interrupt handlers and the like), which can cause serious slowdowns. Instead, max your tasks out at 98 when using FIFO or RR scheduling.
As usual, in accordance with best practices, you should avoid memory allocation in hot paths of real-time tasks. You should use statically allocated memory when possible, or if you must use dynamic allocation, you should allocate it ahead of time as much as possible. At the very least, you should allocate a pool of memory to use later on if you must.
When memory that has been swapped out must be retrieved (among other common situations), a page fault is generated. These can be either minor or major page faults; there are plenty of details online about how they work, and the word “fault” here does not mean an error has occurred. However, handling them does steal computation time from your program, so you should always lock your program’s memory such that it cannot be swapped out. See mlock(2) for more details on this process.
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>

int main(void) {
    // lock all current and future pages into RAM so they cannot be swapped out
    if (mlockall(MCL_CURRENT | MCL_FUTURE) < 0) {
        perror("mlockall");
        exit(1);
    }
    return 0;
}
If you want to create a task (aside from main, which will simply wait for the task to finish) which has FIFO scheduling and priority 98, you will write something like this:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <pthread.h>
#include <sched.h>
#include <sys/mman.h>

// this function will be run by the task we create, so any RT task stuff should
// happen here
void* rt_func(void* data) {
    // (do RT stuff)
    return NULL;
}

int main(void) {
    struct sched_param sp;
    pthread_attr_t attr;
    pthread_t thread;
    int ret;

    if (mlockall(MCL_CURRENT | MCL_FUTURE) < 0) {
        perror("mlockall");
        exit(1);
    }
    // note: unlike most syscalls, the pthread_* functions return an error
    // number directly instead of setting errno, so we check for nonzero and
    // report with strerror()
    ret = pthread_attr_init(&attr);
    if (ret != 0) {
        fprintf(stderr, "pthread_attr_init: %s\n", strerror(ret));
        exit(1);
    }
    // this step is optional; the default stack size is much larger, but a
    // small RT thread often doesn't need it, and a smaller locked stack
    // wastes less RAM
    ret = pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
    if (ret != 0) {
        fprintf(stderr, "pthread_attr_setstacksize: %s\n", strerror(ret));
        exit(1);
    }
    ret = pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
    if (ret != 0) {
        fprintf(stderr, "pthread_attr_setschedpolicy: %s\n", strerror(ret));
        exit(1);
    }
    memset(&sp, 0, sizeof(sp));
    // since technically the max priority is implementation-defined, we query
    // it and then subtract 1
    sp.sched_priority = sched_get_priority_max(SCHED_FIFO) - 1;
    ret = pthread_attr_setschedparam(&attr, &sp);
    if (ret != 0) {
        fprintf(stderr, "pthread_attr_setschedparam: %s\n", strerror(ret));
        exit(1);
    }
    // by default, threads inherit the parent's scheduling, so we tell this
    // thread to use the sched params we just gave it instead of inheriting
    ret = pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);
    if (ret != 0) {
        fprintf(stderr, "pthread_attr_setinheritsched: %s\n", strerror(ret));
        exit(1);
    }
    // create our thread
    ret = pthread_create(&thread, &attr, rt_func, NULL);
    if (ret != 0) {
        fprintf(stderr, "pthread_create: %s\n", strerror(ret));
        exit(1);
    }
    // wait for the thread to return (we could also proceed to do other things
    // here, but for this example we will just wait)
    ret = pthread_join(thread, NULL);
    if (ret != 0) {
        fprintf(stderr, "pthread_join: %s\n", strerror(ret));
        exit(1);
    }
    return 0;
}
You could, of course, create more tasks with other priorities (or even the same; see sched(7) for details) to do other things. These would be preempted by the higher priority ones.
Earliest-deadline-first scheduling is a relatively recent addition to Linux (SCHED_DEADLINE was merged in kernel 3.14). I have tested it a little, and it seems to work quite well and yield excellent performance. However, as it requires a little more work than FIFO at the moment (glibc currently does not provide wrappers for some of the syscalls and structures used), it’s best to simply refer to the kernel documentation for this one: Deadline Task Scheduling
When writing a real-time application, it is important that your tasks perform their duties at a specific interval and sleep for the unused time so that lower-priority tasks can do their work. There are a handful of ways to accomplish this, but I have adopted a very simple and reliable one based on the solution found here. This can easily be wrapped in a C++ class (which is, in fact, what I did) to make it more convenient to use; that is left as an exercise for the reader. Instead, here is an example implementation that causes a task to wake up every 1ms, do some work, and then go back to sleep for the remainder of the period. This is based on the link above, but also includes error handling.
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <time.h>

struct tasktimer {
    struct timespec next_period;
    long period_ns;
};

static inline int tasktimer_init(struct tasktimer* tmr, long period_ns) {
    tmr->period_ns = period_ns;
    return clock_gettime(CLOCK_MONOTONIC, &tmr->next_period);
}

static inline void tasktimer_inc(struct tasktimer* tmr) {
    tmr->next_period.tv_nsec += tmr->period_ns;
    // since the number of nanoseconds is not allowed to go above 999999999, we
    // have to deal with rollover of the nanoseconds into seconds here
    while (tmr->next_period.tv_nsec >= 1000000000L) {
        ++tmr->next_period.tv_sec;
        tmr->next_period.tv_nsec -= 1000000000L;
    }
}

// returns 0 on success or an error number on failure (clock_nanosleep() does
// not set errno; it returns the error number directly)
static inline int tasktimer_wait(struct tasktimer* tmr) {
    int ret;
    tasktimer_inc(tmr);
    // this loop is required to deal with the sleep being interrupted by
    // signals (if any)
    do {
        ret = clock_nanosleep(CLOCK_MONOTONIC,
                              TIMER_ABSTIME,
                              &tmr->next_period,
                              NULL);
    } while (ret == EINTR);
    return ret;
}

void* rt_task(void* data) {
    struct tasktimer timer;
    int ret;
    ret = tasktimer_init(&timer, 1000000); // 1ms period
    if (ret < 0) {
        perror("tasktimer_init");
        // handle error however fits your program best
    }
    while (1) {
        // do RT work here...
        ret = tasktimer_wait(&timer);
        if (ret != 0) {
            fprintf(stderr, "tasktimer_wait: %s\n", strerror(ret));
            // handle error however fits your program best
        }
    }
    return NULL;
}
In the above example, the basic idea is that each task follows these (pseudocode) steps:

t := current time
while true:
    do work
    t := t + 1ms
    sleep until t
This may initially seem odd to some, as it is much more intuitive to do work, then sleep for 1ms, then repeat. However, since the task’s work takes some amount of time, that approach would sleep the task for 1 millisecond per cycle but would not accurately keep time. If the task starts its work at 0ms, takes 50μs to complete, and then sleeps for 1ms, the next cycle will start at 1.05ms rather than 1ms. By using an absolute time for t in this example, the task is guaranteed to always do its work at precise (as precise as the timer can be, anyway) 1ms intervals, so long as the work takes less than 1ms.

When the work takes longer than 1ms (so that t + 1ms is in the past), the example does nothing special to handle the situation, so it’s up to you to deal with it. To understand the behavior in the example above, we can refer to the manual page for clock_nanosleep(3p):
If, at the time of the call, the time value specified by rqtp is less than or equal to the time value of the specified clock, then clock_nanosleep() shall return immediately and the calling process shall not be suspended.
Thus, if the task in the example above were to do 1.5ms of work, it would immediately wake again and do work, and would repeat this until it caught up. It’s up to you whether or not this is acceptable, but in most cases you should avoid doing more work than the time window you’re allotted, of course.
The most important and most confusing part of the code above is the tasktimer_wait() function. This code looks a little bit cryptic, but it’s actually quite simple. First, we increment the time value so we know when to sleep until, which should be pretty intuitive. Next, we just need to sleep until that time, right? Why is there a loop there? If we check the manual page for clock_nanosleep() again, it will shed a little light on this behavior:
The suspension time for the absolute clock_nanosleep() function (that is, with the TIMER_ABSTIME flag set) shall be in effect at least until the value of the corresponding clock reaches the absolute time specified by rqtp, except for the case of being interrupted by a signal.
The important part is the exception at the end: the sleep can be interrupted by the delivery of a signal to the thread. After a little more reading, we come to find out that, unlike most functions, clock_nanosleep() does not set errno; it returns 0 on success and returns the error number directly on failure, so an interrupted sleep returns EINTR. In order to resume the sleep when we receive a signal, we use the loop to retry whenever the return value is EINTR. Otherwise, we exit the loop and return whatever value clock_nanosleep() gave us, regardless of whether it was successful or not.
CLOCK_REALTIME or CLOCK_MONOTONIC?

While the last 3 arguments to clock_nanosleep() are well-explained in the man page, the first one is a little less obvious. Why choose CLOCK_MONOTONIC over the other clock options? If we check the POSIX manual page for time.h(0p), we will see that there is also a CLOCK_REALTIME. Why not use the REALTIME clock for our real-time tasks?
Unfortunately, this is an easy mistake to make: the name of the clock has nothing to do with real-time; rather, it measures real time. CLOCK_REALTIME is the actual wall clock time, and it can be changed backwards, forwards, and so on at the whim of an administrator or any other suitably privileged process. The value of CLOCK_MONOTONIC, however, cannot be changed; in fact, the clock_settime() function will immediately return EINVAL if you attempt to set the monotonic clock. It can never go backwards, cannot be changed by an administrator, and is not affected by the wall clock time. It is purely a measure of how long it has been, in seconds and nanoseconds, since some arbitrary fixed point in the past, which is precisely what we need here.
Synchronizing data between real-time tasks has some quirks compared to a normal non-real-time program. You must take care to ensure your application employs proper priority inheritance, else you will run into unbounded priority inversion (and effective lockups) between tasks when using mutexes and other synchronization primitives!
Securing shared data is done with a pthread mutex, like normal, but with one major difference: you must set the mutex to inherit the priority of the tasks waiting on it. This lets the task holding the lock be temporarily boosted to the priority of the highest-priority waiter, preventing unbounded priority inversion while tasks wait for the lock. When creating your mutex, you should do the following (the initialization of the mutex has been left out for brevity, so see pthread_mutex_init(3p) for more info). Error handling has been omitted for brevity.
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
// enable priority inheritance
pthread_mutexattr_setprotocol(&attr, PTHREAD_PRIO_INHERIT);
// note: the priority ceiling only takes effect with the PTHREAD_PRIO_PROTECT
// protocol; with PTHREAD_PRIO_INHERIT it is ignored, but setting it to at
// least the highest priority of the threads which will lock the mutex does
// no harm
pthread_mutexattr_setprioceiling(&attr, sched_get_priority_max(SCHED_FIFO));
I cannot stress this enough: DO NOT USE glibc’s pthread condition variables! They do NOT properly implement priority inheritance and will cause unbounded priority inversion. I cannot seem to find this documented anywhere, but the issue has been open since 2010 and doesn’t appear to have any real solution in sight. If you need something like a condition variable, you should manually implement it. This is a fair bit of boilerplate, but it is necessary to properly achieve this functionality with glibc. I don’t know about other C libraries, but for glibc you must absolutely avoid pthread condition variables at all costs!
I found this out after using them for quite some time. It turns out that by removing them from my program, the CPU utilization of the system went down by nearly 40%. It would be quite nice if GNU decided to document this somewhere, but they didn’t, so here we are.