Thread-ring benchmark: a C++ implementation with piped threads

The thread-ring is a standard benchmark challenge that measures the performance of sequential communication among parallel threads.

The challenge mandates that each program should:

  • create 503 linked pre-emptive threads (named 1 to 503)
  • thread 503 should be linked to thread 1, forming an unbroken ring
  • pass a token to thread 1
  • pass the token from thread to thread N times
  • print the name of the last thread (1 to 503) to take the token

Published results for many implementations can be found on the Thread-ring benchmark results page.

While there are many elegant C++ solutions that achieve the best results for this benchmark, in this post we demonstrate a basic approach that uses native pipes as the communication channel among threads. Note that many of the faster implementations on the benchmark results page (including the Haskell and the C++ Boost Asio implementations) do not actually create 503 OS threads. Instead, they create a handful of worker threads (e.g. a pool of 4 or 8 OS threads) and keep reusing them. This works fine here because only one thread is active at any point in time.

In the code below, we instead stick to the basic rules of the challenge: we create 503 real OS threads and make them communicate through pipes. Each thread holds two pipe handles, one to read data from and one to write data to. The pipes are connected so that the data written by thread[i] arrives at the read end of thread[i+1], and the last thread wraps around to the first to close the ring.

Once all the pipe connections are made, we inject the token through the last thread's write handle (the write end of the pipe whose read end the first thread is listening on) and the process starts. Here is the code; run it on your system and see the timings. Post your results here if you like what you see.

////////////////////////////////////////////////////////
/// (c) 2014. Cenacle Research
/// Basic version of ThreadRing benchmark implementation with Pipes between threads
///
///////////////////////////////////////////////////////

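#ifndef _GNU_SOURCE
#define _GNU_SOURCE   /* g++ usually predefines this; the guard avoids a redefinition warning */
#endif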
#include <pthread.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <inttypes.h>
#include <time.h>
#include <fcntl.h>

#if defined(__MINGW_H)
#include <io.h>   /* declares _pipe() */
#define pipe(fds) _pipe(fds, 32, _O_BINARY)
#endif

#define NUMBER_OF_THREADS 503

/* number of times the token is passed; can be overridden from the command line */
uint32_t token_count = 1000;


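/* pipe ends owned by one thread: readFd comes from its predecessor, writeFd feeds its successor */
struct _tio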
{
        int readFd;
        int writeFd;
        short thread_id;
};

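/* POSIX thread entry point: pthread_create requires this signature */
void* threadEntryProc(void* pArg)
{
        _tio* tio = (_tio*)pArg;
        int tokenMax = (int)token_count + 1;  /* token value written on the final pass */
        int token;
        do
        {
                /* block until the predecessor hands over the token */
                if (read(tio->readFd, &token, sizeof(token)) != sizeof(token))
                {
                        perror("read failure inside thread");
                        break;
                }

                /* count this pass and hand the token to the successor */
                ++token;
                if (write(tio->writeFd, &token, sizeof(token)) != sizeof(token))
                {
                        perror("write failure inside thread");
                        break;
                }

                /* the thread that received the token after N passes reports its name */
                if (token == tokenMax)
                {
                        printf("%d\n", tio->thread_id + 1);
                        break;
                }

                /* past the end: the token keeps circulating once so every thread wakes up and exits */
                if (token > tokenMax) break;

        } while (1);

        pthread_exit(NULL);
}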

void ThreadRing()
{
        _tio tio[NUMBER_OF_THREADS];
        int fd[NUMBER_OF_THREADS][2];

        int nLastThread = NUMBER_OF_THREADS - 1;
        int nTokenStart = 0;

        /* create one pipe per thread: fd[i][0] is the read end, fd[i][1] the write end */
        {
                for (int i = 0; i < NUMBER_OF_THREADS; ++i)
                {
                        if (pipe(fd[i]))
                        {
                                fprintf(stderr, "\nUnable to create pipe for %d\n", i);
                                exit(EXIT_FAILURE);
                        }
                }
        }
        /* wire the ring: thread i reads pipe i and writes pipe i+1; the last thread wraps to pipe 0 */
        {
                for (int i = 0; i < NUMBER_OF_THREADS-1; ++i)
                {
                        tio[i].readFd = fd[i][0];
                        tio[i].writeFd = fd[i + 1][1];
                        tio[i].thread_id = i;
                }
                {
                        tio[nLastThread].readFd = fd[nLastThread][0];
                        tio[nLastThread].writeFd = fd[0][1];
                        tio[nLastThread].thread_id = nLastThread;
                }
        }

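        pthread_t threadObj[NUMBER_OF_THREADS];
        /* create the threads; each one blocks in read() until the token reaches it */
        {
                for (int i = 0; i < NUMBER_OF_THREADS; ++i)
                {
                        /* pthread functions return an error number instead of setting errno */
                        int rc = pthread_create(&threadObj[i], NULL, threadEntryProc, &tio[i]);
                        if (rc)
                        {
                                fprintf(stderr, "\nUnable to create thread %d: %s\n", i, strerror(rc));
                                exit(EXIT_FAILURE);
                        }
                }
        }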

        /* send on the last thread's write fd: it is the write end of pipe 0, where the first thread is listening */
        if (write(tio[nLastThread].writeFd, &nTokenStart, sizeof(nTokenStart)) != sizeof(nTokenStart))
                perror("token send initiation failure");

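        /* wait for all threads to finish */
        {
                for (int i = 0; i < NUMBER_OF_THREADS; ++i)
                {
                        int rc = pthread_join(threadObj[i], NULL);
                        if (rc)
                                fprintf(stderr, "error while waiting for thread %d: %s\n", i, strerror(rc));
                }
        }

        /* release both ends of every pipe */
        for (int i = 0; i < NUMBER_OF_THREADS; ++i)
        {
                close(fd[i][0]);
                close(fd[i][1]);
        }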
}

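/* baseline: the same number of increments without any thread hand-offs */
int freeRun()
{
        volatile uint32_t token = 0;  /* volatile keeps the compiler from folding the loop away */
        for (uint32_t i = 0; i < token_count; ++i)
                token = token + 1;
        return (int)token;
}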

int main(int argc, char **argv)
{
        clock_t start, end;   /* on POSIX systems clock() reports processor time, not wall-clock time */

        if (argc > 1)
                token_count = (uint32_t)strtoul(argv[1], NULL, 0);

        start = clock();
        argc = freeRun(); /* store the result somewhere so the call is less likely to be optimized out */
        end = clock();
        printf("\nFreeRun: clock start=%ld, end=%ld, CPU time spent: %f s\n", (long)start, (long)end, (double)(end - start) / CLOCKS_PER_SEC);

        start = clock();
        ThreadRing();
        end = clock();
        printf("\nThreadRing: clock start=%ld, end=%ld, CPU time spent: %f s\n", (long)start, (long)end, (double)(end - start) / CLOCKS_PER_SEC);
}
