How to use Semaphores in POSIX Concurrency Control
Concept of the semaphore and its counterpart object in the pthread library: the POSIX semaphore, with an example.
Concurrency simply means having multiple pieces of logic within a program being executed simultaneously. Modern software systems are often concurrent, as programs need to run various pieces of logic at the same time. In a POSIX-compliant system, concurrency can be implemented using either multithreading or multi-processing.
We are going to look at the control mechanisms offered by the pthread library. Semaphores, mutexes, and condition variables, alongside different types of locks, are used in various combinations to bring determinism to multithreaded programs. This article focuses on semaphores: the difference between binary and general semaphores, and how they help preserve data integrity.
This article is an excerpt from the book Extreme C by Kamran Amini. This book is a highly comprehensive resource that guides you through the most advanced capabilities of C programming.
POSIX Semaphores
In most cases, mutexes (or binary semaphores) are enough to synchronize several threads accessing a shared resource. That is because, to make read and write operations happen sequentially, only one thread should be able to enter the critical section at a time. This is known as mutual exclusion, hence “mutex.” In some scenarios, however, you might want more than one thread to enter the critical section and operate on the shared resource. This is the scenario in which you should use general semaphores.
Binary semaphores
The following code shows a solution built with a binary semaphore. It involves two threads, each incrementing a shared integer by a different value. We want to protect the data integrity of the shared variable. Note that we won’t be using POSIX mutexes in the following code:
#include <stdio.h>
#include <stdlib.h>
// For the O_CREAT and O_EXCL flags passed to sem_open
#include <fcntl.h>
// The POSIX standard header for using pthread library
#include <pthread.h>
// The semaphores are not exposed through pthread.h
#include <semaphore.h>

// The main pointer addressing a semaphore object used
// to synchronize the access to the shared state.
sem_t *semaphore;

void* thread_body_1(void* arg) {
  // Obtain a pointer to the shared variable
  int* shared_var_ptr = (int*)arg;
  // Waiting for the semaphore
  sem_wait(semaphore);
  // Increment the shared variable by 1 by writing directly
  // to its memory address
  (*shared_var_ptr)++;
  printf("%d\n", *shared_var_ptr);
  // Release the semaphore
  sem_post(semaphore);
  return NULL;
}

void* thread_body_2(void* arg) {
  // Obtain a pointer to the shared variable
  int* shared_var_ptr = (int*)arg;
  // Waiting for the semaphore
  sem_wait(semaphore);
  // Increment the shared variable by 2 by writing directly
  // to its memory address
  (*shared_var_ptr) += 2;
  printf("%d\n", *shared_var_ptr);
  // Release the semaphore
  sem_post(semaphore);
  return NULL;
}

int main(int argc, char** argv) {
  // The shared variable
  int shared_var = 0;

  // The thread handlers
  pthread_t thread1;
  pthread_t thread2;

#ifdef __APPLE__
  // Unnamed semaphores are not supported in macOS. Therefore
  // we need to initialize the semaphore like a named one using
  // sem_open function.
  semaphore = sem_open("sem0", O_CREAT | O_EXCL, 0644, 1);
#else
  sem_t local_semaphore;
  semaphore = &local_semaphore;
  // Initialize the semaphore as a mutex (binary semaphore)
  sem_init(semaphore, 0, 1);
#endif

  // Create new threads
  int result1 = pthread_create(&thread1, NULL,
      thread_body_1, &shared_var);
  int result2 = pthread_create(&thread2, NULL,
      thread_body_2, &shared_var);
  if (result1 || result2) {
    printf("The threads could not be created.\n");
    exit(1);
  }

  // Wait for the threads to finish
  result1 = pthread_join(thread1, NULL);
  result2 = pthread_join(thread2, NULL);
  if (result1 || result2) {
    printf("The threads could not be joined.\n");
    exit(2);
  }

#ifdef __APPLE__
  sem_close(semaphore);
  // Remove the name so that a later run can create "sem0" again
  // with O_EXCL
  sem_unlink("sem0");
#else
  sem_destroy(semaphore);
#endif

  return 0;
}
The first thing you might notice in the preceding code is that we use different semaphore functions on Apple systems. Apple operating systems (macOS, OS X, and iOS) do not support unnamed semaphores, so we couldn’t just use the sem_init and sem_destroy functions. Unnamed semaphores don’t have names (surprisingly enough) and are normally used inside a single process, by a number of its threads; sharing one between processes requires placing it in shared memory. Named semaphores, on the other hand, are system-wide and can be seen and used by various processes in the system.
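As a rough illustration of the named-semaphore lifecycle, the following sketch creates a named semaphore, uses it once as a lock, and then removes it. The helper named_sem_roundtrip and any name passed to it are hypothetical and not part of the original example. The sketch also relies on sem_unlink, a standard POSIX call that removes the name from the system; without it, a named semaphore outlives the process that created it.

```c
#include <fcntl.h>      // O_CREAT, O_EXCL
#include <semaphore.h>  // sem_open, sem_close, sem_unlink
#include <stdio.h>      // perror

// Hypothetical helper: creates a named semaphore, takes and releases it
// once, then closes and unlinks it. Returns 0 on success, -1 on error.
int named_sem_roundtrip(const char* name) {
  // Remove a stale semaphore left behind by an earlier run, so that
  // O_EXCL below does not fail. The result is ignored on purpose
  // because the name usually does not exist.
  sem_unlink(name);

  // Initial value 1: the semaphore behaves like a mutex
  sem_t* sem = sem_open(name, O_CREAT | O_EXCL, 0644, 1);
  if (sem == SEM_FAILED) {
    perror("sem_open");
    return -1;
  }

  int status = 0;
  if (sem_wait(sem) != 0) status = -1;  // enter the critical section
  if (sem_post(sem) != 0) status = -1;  // leave the critical section

  // sem_close only releases this process's handle; sem_unlink removes
  // the system-wide name.
  if (sem_close(sem) != 0) status = -1;
  if (sem_unlink(name) != 0) status = -1;
  return status;
}
```

Calling named_sem_roundtrip("/demo_sem") twice in a row should succeed both times, because each call unlinks the name before returning.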
In Apple systems, the functions required for creating unnamed semaphores are marked as deprecated, and the semaphore object won’t get initialized by sem_init. So, we had to use the sem_open and sem_close functions in order to define a named semaphore instead.
In other POSIX-compliant operating systems, Linux specifically, we can still use unnamed semaphores and have them initialized and destroyed by using the sem_init and sem_destroy functions, respectively.
In the preceding code, we have included an extra header file, semaphore.h.
Inside the main function, on Apple systems, we create a named semaphore, sem0. In other POSIX-compliant operating systems, we initialize the semaphore using sem_init. Note that in this case, the pointer semaphore points to the variable local_semaphore allocated on top of the main thread’s stack. The pointer semaphore won’t become a dangling pointer because the main thread doesn’t exit; it waits for the threads to complete by joining them.
The maximum number of threads that are allowed to be in the critical section is determined when initializing the semaphore object. We have passed the value 1 for the maximum number of threads as the last argument to the sem_open and sem_init functions; therefore, the semaphore behaves like a mutex.
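For comparison, the same protection can be sketched with a POSIX mutex in place of the binary semaphore. This is not part of the original example: the names counter_mutex, add_one, add_two, and run_mutex_version are hypothetical, and the printf calls are left out so that the helper simply returns the final value of the shared variable.

```c
#include <pthread.h>

// Hypothetical mutex playing the role of the binary semaphore
static pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;

static void* add_one(void* arg) {
  int* shared_var_ptr = (int*)arg;
  pthread_mutex_lock(&counter_mutex);    // plays the role of sem_wait
  (*shared_var_ptr)++;
  pthread_mutex_unlock(&counter_mutex);  // plays the role of sem_post
  return NULL;
}

static void* add_two(void* arg) {
  int* shared_var_ptr = (int*)arg;
  pthread_mutex_lock(&counter_mutex);
  (*shared_var_ptr) += 2;
  pthread_mutex_unlock(&counter_mutex);
  return NULL;
}

// Runs both threads and returns the final value of the shared
// variable, or -1 if a thread could not be created.
int run_mutex_version(void) {
  int shared_var = 0;
  pthread_t thread1;
  pthread_t thread2;
  if (pthread_create(&thread1, NULL, add_one, &shared_var) != 0) {
    return -1;
  }
  if (pthread_create(&thread2, NULL, add_two, &shared_var) != 0) {
    pthread_join(thread1, NULL);
    return -1;
  }
  pthread_join(thread1, NULL);
  pthread_join(thread2, NULL);
  return shared_var;  // 3, whichever thread ran first
}
```

One difference worth keeping in mind is that a mutex is meant to be unlocked by the thread that locked it, whereas any thread may call sem_post on a semaphore.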
To get a better understanding of semaphores, let’s dive a bit more into the details. Each semaphore object has an integer value. Whenever a thread waits for a semaphore by calling the sem_wait function, if the semaphore’s value is greater than zero, the value is decreased by 1 and the thread is allowed to enter the critical section. If the semaphore’s value is 0, the thread must wait until the semaphore’s value becomes positive again. Whenever a thread exits the critical section by calling the sem_post function, the semaphore’s value is incremented by 1. Therefore, by specifying the initial value 1, we get a binary semaphore.
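The value changes described above can be observed directly. The following is a minimal sketch, assuming a system such as Linux where unnamed semaphores and sem_getvalue are available (it is not expected to work on Apple systems). The helper binary_semaphore_walkthrough is hypothetical; it uses sem_trywait, the non-blocking variant of sem_wait, so that the zero-value case can be shown without blocking the calling thread.

```c
#include <errno.h>
#include <semaphore.h>

// Hypothetical helper: walks a binary semaphore through its states.
// Returns 0 if every step behaves as described in the text, otherwise
// the number of the first step that did not.
int binary_semaphore_walkthrough(void) {
  sem_t sem;
  int value = -1;

  // Step 1: initial value 1, shared only between threads (pshared = 0)
  if (sem_init(&sem, 0, 1) != 0) return 1;

  // Step 2: the value is 1, so waiting succeeds at once...
  if (sem_wait(&sem) != 0) return 2;
  // Step 3: ...and the value drops to 0.
  if (sem_getvalue(&sem, &value) != 0 || value != 0) return 3;

  // Step 4: with the value at 0, sem_wait would block here.
  // sem_trywait fails with EAGAIN instead of blocking.
  if (sem_trywait(&sem) != -1 || errno != EAGAIN) return 4;

  // Step 5: leaving the critical section raises the value back to 1.
  if (sem_post(&sem) != 0) return 5;
  if (sem_getvalue(&sem, &value) != 0 || value != 1) return 6;

  sem_destroy(&sem);
  return 0;
}
```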
General semaphores
Here we consider a classic example that uses general semaphores. It is interesting because it is a scenario in which multiple threads are allowed to enter the critical section at the same time.
This classic example involves the creation of 50 water molecules. For 50 water molecules, you need to have 50 oxygen atoms and 100 hydrogen atoms. If we simulate each atom using a thread, we require two hydrogen threads, and one oxygen thread to enter their critical sections, in order to generate one water molecule and have it counted.
In the following code, we first create 50 oxygen threads and 100 hydrogen threads. To protect the oxygen threads’ critical section, we use a mutex, but for the hydrogen threads’ critical sections, we use a general semaphore that allows two threads to enter the critical section simultaneously.
For signaling purposes, we use POSIX barriers, but since barriers are not implemented in Apple systems, we need to implement them using mutexes and condition variables. The following is the code:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <errno.h> // For errno and strerror function
#include <fcntl.h> // For the O_CREAT and O_EXCL flags
// The POSIX standard header for using pthread library
#include <pthread.h>
// Semaphores are not exposed through pthread.h
#include <semaphore.h>

#ifdef __APPLE__
// In Apple systems, we have to simulate the barrier
// functionality.
pthread_mutex_t barrier_mutex;
pthread_cond_t barrier_cv;
unsigned int barrier_thread_count;
unsigned int barrier_round;
unsigned int barrier_thread_limit;

void barrier_wait() {
  pthread_mutex_lock(&barrier_mutex);
  barrier_thread_count++;
  if (barrier_thread_count >= barrier_thread_limit) {
    barrier_thread_count = 0;
    barrier_round++;
    pthread_cond_broadcast(&barrier_cv);
  } else {
    unsigned int my_round = barrier_round;
    do {
      pthread_cond_wait(&barrier_cv, &barrier_mutex);
    } while (my_round == barrier_round);
  }
  pthread_mutex_unlock(&barrier_mutex);
}
#else
// A barrier to make hydrogen and oxygen threads synchronized
pthread_barrier_t water_barrier;
#endif

// A mutex in order to synchronize oxygen threads
pthread_mutex_t oxygen_mutex;
// A general semaphore to make hydrogen threads synchronized
sem_t* hydrogen_sem;
// A shared integer counting the number of made water molecules
unsigned int num_of_water_molecules;

void* hydrogen_thread_body(void* arg) {
  // Two hydrogen threads can enter this critical section
  sem_wait(hydrogen_sem);
  // Wait for the other hydrogen thread to join
#ifdef __APPLE__
  barrier_wait();
#else
  pthread_barrier_wait(&water_barrier);
#endif
  sem_post(hydrogen_sem);
  return NULL;
}

void* oxygen_thread_body(void* arg) {
  pthread_mutex_lock(&oxygen_mutex);
  // Wait for the hydrogen threads to join
#ifdef __APPLE__
  barrier_wait();
#else
  pthread_barrier_wait(&water_barrier);
#endif
  num_of_water_molecules++;
  pthread_mutex_unlock(&oxygen_mutex);
  return NULL;
}

int main(int argc, char** argv) {
  num_of_water_molecules = 0;

  // Initialize oxygen mutex
  pthread_mutex_init(&oxygen_mutex, NULL);

  // Initialize hydrogen semaphore
#ifdef __APPLE__
  hydrogen_sem = sem_open("hydrogen_sem",
      O_CREAT | O_EXCL, 0644, 2);
#else
  sem_t local_sem;
  hydrogen_sem = &local_sem;
  sem_init(hydrogen_sem, 0, 2);
#endif

  // Initialize water barrier
#ifdef __APPLE__
  pthread_mutex_init(&barrier_mutex, NULL);
  pthread_cond_init(&barrier_cv, NULL);
  barrier_thread_count = 0;
  // One oxygen and two hydrogen threads meet at the barrier,
  // matching the count given to pthread_barrier_init below
  barrier_thread_limit = 3;
  barrier_round = 0;
#else
  pthread_barrier_init(&water_barrier, NULL, 3);
#endif

  // For creating 50 water molecules, we need 50 oxygen atoms and
  // 100 hydrogen atoms
  pthread_t thread[150];

  // Create oxygen threads
  for (int i = 0; i < 50; i++) {
    if (pthread_create(thread + i, NULL,
                oxygen_thread_body, NULL)) {
      printf("Couldn't create an oxygen thread.\n");
      exit(1);
    }
  }

  // Create hydrogen threads
  for (int i = 50; i < 150; i++) {
    if (pthread_create(thread + i, NULL,
                hydrogen_thread_body, NULL)) {
      printf("Couldn't create a hydrogen thread.\n");
      exit(2);
    }
  }

  printf("Waiting for hydrogen and oxygen atoms to react ...\n");
  // Wait for all threads to finish
  for (int i = 0; i < 150; i++) {
    if (pthread_join(thread[i], NULL)) {
      printf("The thread could not be joined.\n");
      exit(3);
    }
  }

  printf("Number of made water molecules: %u\n",
      num_of_water_molecules);

#ifdef __APPLE__
  sem_close(hydrogen_sem);
  // Remove the name so that a later run can create it again
  sem_unlink("hydrogen_sem");
#else
  sem_destroy(hydrogen_sem);
#endif

  return 0;
}
At the beginning of the code, there are several lines that are surrounded by #ifdef __APPLE__ and #endif. These lines are only compiled on Apple systems. They are mainly the implementation and variables required for simulating POSIX barrier behavior. In POSIX-compliant systems other than Apple, we use an ordinary POSIX barrier.
Among the global variables defined in the preceding code, we have declared the mutex oxygen_mutex, which is supposed to protect the oxygen threads’ critical sections. At any time, only one oxygen thread (or oxygen atom) can be in the critical section.
Then, in its critical section, an oxygen thread waits for two hydrogen threads to join, and only then does it increment the water molecule counter. The increment happens within the oxygen thread’s critical section.
To elaborate more on the things that happen inside the critical sections, we need to explain the role of the general semaphore. In the preceding code, we have also declared the general semaphore hydrogen_sem, which is supposed to protect the hydrogen threads’ critical sections. At any time, at most two hydrogen threads can be in their critical sections, and they wait on the barrier object shared between the oxygen and hydrogen threads.
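The limit of two threads can be checked with a small experiment. The sketch below is not taken from the original example: it assumes a system with unnamed semaphores (such as Linux), and the names slots, worker, and measure_peak_concurrency, as well as the thread count of 8, are hypothetical. Eight threads compete for a semaphore initialized to 2, and a mutex-protected counter records how many of them were ever inside the critical section at once.

```c
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>

#define DEMO_THREADS 8  // hypothetical number of competing threads
#define DEMO_SLOTS 2    // initial semaphore value, as for hydrogen_sem

static sem_t slots;
static pthread_mutex_t stats_mutex = PTHREAD_MUTEX_INITIALIZER;
static int inside_now;   // threads currently in the critical section
static int peak_inside;  // highest value inside_now has reached

static void* worker(void* arg) {
  (void)arg;
  sem_wait(&slots);  // at most DEMO_SLOTS threads get past this point

  pthread_mutex_lock(&stats_mutex);
  inside_now++;
  if (inside_now > peak_inside) peak_inside = inside_now;
  pthread_mutex_unlock(&stats_mutex);

  sched_yield();  // give other threads a chance to overlap with us

  pthread_mutex_lock(&stats_mutex);
  inside_now--;
  pthread_mutex_unlock(&stats_mutex);

  sem_post(&slots);
  return NULL;
}

// Runs the workers and returns the peak number of threads seen inside
// the critical section at once, or -1 if setup failed.
int measure_peak_concurrency(void) {
  pthread_t threads[DEMO_THREADS];
  int created = 0;
  inside_now = 0;
  peak_inside = 0;
  if (sem_init(&slots, 0, DEMO_SLOTS) != 0) return -1;
  while (created < DEMO_THREADS &&
         pthread_create(&threads[created], NULL, worker, NULL) == 0) {
    created++;
  }
  for (int i = 0; i < created; i++) {
    pthread_join(threads[i], NULL);
  }
  sem_destroy(&slots);
  return created == DEMO_THREADS ? peak_inside : -1;
}
```

The returned peak depends on scheduling, so it may be 1 or 2 on a given run, but the semaphore guarantees it never exceeds 2.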
When the number of threads waiting on the shared barrier object reaches three, it means that we have got one oxygen and two hydrogens, and then voilà: a water molecule is made, and all waiting threads can continue. Hydrogen threads exit immediately, but the oxygen thread exits only after incrementing the water molecule counter.
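The barrier behavior on its own can be sketched as follows, assuming a system that provides pthread_barrier_t (Linux, for instance, but not Apple systems). The names demo_barrier, atom, and run_barrier_demo are hypothetical. Three threads call pthread_barrier_wait; none of them returns until all three have arrived, and exactly one of them receives the special return value PTHREAD_BARRIER_SERIAL_THREAD.

```c
// Expose pthread_barrier_* when compiling under a strict -std mode
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include <pthread.h>

#define PARTY_SIZE 3  // one oxygen thread plus two hydrogen threads

static pthread_barrier_t demo_barrier;
static pthread_mutex_t tally_mutex = PTHREAD_MUTEX_INITIALIZER;
static int released;        // threads that got past the barrier
static int serial_threads;  // threads told they are the serial thread

static void* atom(void* arg) {
  (void)arg;
  // Blocks until PARTY_SIZE threads have reached this call
  int rc = pthread_barrier_wait(&demo_barrier);

  pthread_mutex_lock(&tally_mutex);
  released++;
  if (rc == PTHREAD_BARRIER_SERIAL_THREAD) serial_threads++;
  pthread_mutex_unlock(&tally_mutex);
  return NULL;
}

// Returns 0 if all threads were released and exactly one of them was
// reported as the serial thread, -1 otherwise.
int run_barrier_demo(void) {
  pthread_t threads[PARTY_SIZE];
  released = 0;
  serial_threads = 0;
  if (pthread_barrier_init(&demo_barrier, NULL, PARTY_SIZE) != 0) {
    return -1;
  }
  for (int i = 0; i < PARTY_SIZE; i++) {
    // Sketch only: on failure the threads already created would stay
    // blocked at the barrier, so real code needs a cleanup path here.
    if (pthread_create(&threads[i], NULL, atom, NULL) != 0) return -1;
  }
  for (int i = 0; i < PARTY_SIZE; i++) {
    pthread_join(threads[i], NULL);
  }
  pthread_barrier_destroy(&demo_barrier);
  return (released == PARTY_SIZE && serial_threads == 1) ? 0 : -1;
}
```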
We close this section with one last note. In this example, we used the pthread_cond_broadcast function when implementing the barrier for Apple systems. It signals all the threads waiting on the barrier’s condition variable, which are supposed to continue once the other threads have joined them.
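To see the broadcast in isolation, here is a minimal sketch in which several threads wait for a flag and a single pthread_cond_broadcast call releases all of them. It is not part of the original example; the names gate_mutex, gate_cv, gate_open, waiter, and open_gate_for_all, and the count of four waiters, are hypothetical.

```c
#include <pthread.h>

#define WAITERS 4  // hypothetical number of waiting threads

static pthread_mutex_t gate_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t gate_cv = PTHREAD_COND_INITIALIZER;
static int gate_open;  // the condition the waiters are waiting for
static int woken;      // waiters that have passed the gate

static void* waiter(void* arg) {
  (void)arg;
  pthread_mutex_lock(&gate_mutex);
  // Re-check the condition in a loop: a wait may return spuriously,
  // and a late thread may find the gate already open.
  while (!gate_open) {
    pthread_cond_wait(&gate_cv, &gate_mutex);
  }
  woken++;
  pthread_mutex_unlock(&gate_mutex);
  return NULL;
}

// Starts the waiters, opens the gate with one broadcast, and returns
// how many waiters got through, or -1 if a thread could not be created.
int open_gate_for_all(void) {
  pthread_t threads[WAITERS];
  int created = 0;
  gate_open = 0;
  woken = 0;
  while (created < WAITERS &&
         pthread_create(&threads[created], NULL, waiter, NULL) == 0) {
    created++;
  }

  pthread_mutex_lock(&gate_mutex);
  gate_open = 1;
  // Wake every thread blocked on gate_cv, not just one of them
  pthread_cond_broadcast(&gate_cv);
  pthread_mutex_unlock(&gate_mutex);

  for (int i = 0; i < created; i++) {
    pthread_join(threads[i], NULL);
  }
  return created == WAITERS ? woken : -1;
}
```

Had pthread_cond_signal been used instead, only one waiter would be guaranteed to wake up, and the others could remain blocked, which is why the barrier implementation broadcasts.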
Summary:
Concurrency is an important aspect of programming, and maintaining data integrity is key. In this article, we explored binary and general semaphores as a part of thread synchronization in multithreaded programming. Further, we explored POSIX condition variables to wait for a specific condition, various types of locks together with mutexes and condition variables, the memory structure of a thread and how this structure can affect memory visibility in a multi-core system, and much more.
About the author:
Kamran Amini is a senior professional specialized in embedded and kernel development. He has worked for numerous well-known Iranian companies. In 2017, he moved to Europe to work as a senior architect and engineer for highly reputable companies such as Jeppesen, Adecco, TomTom, and ActiveVideo Networks. While residing in Amsterdam, he worked on his first book, Extreme C, published by PacktPub. His main areas of interest are computation theory, distributed systems, machine learning, information theory, and quantum computation. Parallel to his professional career, he is studying astronomy and planetary sciences.