×
Samples Blogs Make Payment About Us Reviews 4.9/5 Order Now

Process Data in Multiple Threads with A Single Output Thread in C Assignment Solutions

July 08, 2024
Sarah Williams
Sarah Williams
🇨🇦 Canada
C
Sarah is a skilled C programmer with a bachelor's degree in computer science and over 600 completed orders. Specializing in data structure implementation, she delivers meticulously crafted solutions that leverage arrays, linked lists, trees, and hash tables for effective mapping operations.
Key Topics
  • Instructions
  • Requirements and Specifications
Tip of the day
Familiarize yourself with OCaml's pattern matching; it simplifies handling recursive data structures like lists and trees, making your code concise and easier to debug.
News
In 2024, Girls Who Code introduced a Data Science + AI track in their free summer programs for high school students, fostering skills in cybersecurity and creative coding.

Instructions

Objective

Write a C program that processes data in multiple threads and uses a single output thread.

Requirements and Specifications

Process data in multiple threads with single output thread in C language

Screenshots of output

Process data in multiple threads with single output thread in C language 2

Source Code

/*
 * Four-stage character pipeline built on pthreads.
 *
 *   stdin -> inputThread -> lineSeparatorThread -> plusSignThread
 *         -> outputThread -> stdout
 *
 * Adjacent stages are connected by a bounded ring buffer protected by one
 * mutex and two condition variables (one for "space available", one for
 * "data available").  A byte with value 0 travels through the pipeline as
 * the end-of-input marker; every stage forwards it and then terminates.
 *
 * NOTE(review): because 0 is an in-band marker, a literal NUL byte on stdin
 * would stop the downstream stages early -- confirm input is always text.
 */
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

#define BUFFER_SIZE 100 /* capacity of each inter-thread ring buffer */
#define STOP_LENGTH 5   /* length of the "STOP\n" terminator line */
#define LINE_LENGTH 80  /* characters emitted per output line */

char inputBuffer[BUFFER_SIZE];      /* inputThread -> lineSeparatorThread */
pthread_mutex_t inputMutex;         /* guards inputBuffer and its indices */
pthread_cond_t inputProduce;        /* signalled when inputBuffer has space */
pthread_cond_t inputConsume;        /* signalled when inputBuffer has data */
int buffStart = 0;                  /* next index to read */
int buffEnd = 0;                    /* next index to write */
int buffLen = 0;                    /* number of stored characters */

char replacementBuffer[BUFFER_SIZE]; /* lineSeparatorThread -> plusSignThread */
pthread_mutex_t replacementMutex;    /* guards replacementBuffer and indices */
pthread_cond_t replacementProduce;   /* signalled when there is space */
pthread_cond_t replacementConsume;   /* signalled when there is data */
int replStart = 0;
int replEnd = 0;
int replLen = 0;

char outputBuffer[BUFFER_SIZE];     /* plusSignThread -> outputThread */
pthread_mutex_t outputMutex;        /* guards outputBuffer and its indices */
pthread_cond_t outputProduce;       /* signalled when there is space */
pthread_cond_t outputConsume;       /* signalled when there is data */
int outputStart = 0;
int outputEnd = 0;
int outputLen = 0;

/* Append one character to inputBuffer, blocking while it is full. */
static void putInput(char ch)
{
    pthread_mutex_lock(&inputMutex);
    while (buffLen >= BUFFER_SIZE)
        pthread_cond_wait(&inputProduce, &inputMutex);
    inputBuffer[buffEnd] = ch;
    buffEnd = (buffEnd + 1) % BUFFER_SIZE;
    buffLen++;
    pthread_cond_signal(&inputConsume);
    pthread_mutex_unlock(&inputMutex);
}

/* Remove one character from inputBuffer, blocking while it is empty. */
static int getInput(void)
{
    int ch;

    pthread_mutex_lock(&inputMutex);
    while (buffLen == 0)
        pthread_cond_wait(&inputConsume, &inputMutex);
    ch = inputBuffer[buffStart];
    buffStart = (buffStart + 1) % BUFFER_SIZE;
    buffLen--;
    pthread_cond_signal(&inputProduce);
    pthread_mutex_unlock(&inputMutex);
    return ch;
}

/* Append one character to replacementBuffer, blocking while it is full. */
static void putReplacement(char ch)
{
    pthread_mutex_lock(&replacementMutex);
    while (replLen >= BUFFER_SIZE)
        pthread_cond_wait(&replacementProduce, &replacementMutex);
    replacementBuffer[replEnd] = ch;
    replEnd = (replEnd + 1) % BUFFER_SIZE;
    replLen++;
    pthread_cond_signal(&replacementConsume);
    pthread_mutex_unlock(&replacementMutex);
}

/* Remove one character from replacementBuffer, blocking while it is empty. */
static int getReplacement(void)
{
    int ch;

    pthread_mutex_lock(&replacementMutex);
    while (replLen == 0)
        pthread_cond_wait(&replacementConsume, &replacementMutex);
    ch = replacementBuffer[replStart];
    replStart = (replStart + 1) % BUFFER_SIZE;
    replLen--;
    pthread_cond_signal(&replacementProduce);
    pthread_mutex_unlock(&replacementMutex);
    return ch;
}

/* Append one character to outputBuffer, blocking while it is full. */
static void putOutput(char ch)
{
    pthread_mutex_lock(&outputMutex);
    while (outputLen >= BUFFER_SIZE)
        pthread_cond_wait(&outputProduce, &outputMutex);
    outputBuffer[outputEnd] = ch;
    outputEnd = (outputEnd + 1) % BUFFER_SIZE;
    outputLen++;
    pthread_cond_signal(&outputConsume);
    pthread_mutex_unlock(&outputMutex);
}

/* Remove one character from outputBuffer, blocking while it is empty. */
static int getOutput(void)
{
    int ch;

    pthread_mutex_lock(&outputMutex);
    while (outputLen == 0)
        pthread_cond_wait(&outputConsume, &outputMutex);
    ch = outputBuffer[outputStart];
    outputStart = (outputStart + 1) % BUFFER_SIZE;
    outputLen--;
    pthread_cond_signal(&outputProduce);
    pthread_mutex_unlock(&outputMutex);
    return ch;
}

/* Write all len bytes to fd, retrying short writes; 0 on success, -1 on error. */
static int writeAll(int fd, const char *data, size_t len)
{
    size_t done = 0;

    while (done < len) {
        ssize_t wrote = write(fd, data + done, len - done);

        if (wrote < 0) {
            if (errno == EINTR)
                continue; /* interrupted before writing anything: retry */
            return -1;
        }
        done += (size_t)wrote;
    }
    return 0;
}

/*
 * Stage 1: read stdin one byte at a time and forward it to inputBuffer.
 *
 * A line consisting of exactly "STOP\n" ends the input: it is not forwarded,
 * and the end-of-input marker (0) is sent instead.  Characters that might be
 * the beginning of such a line are held back in `pending` until the match
 * either completes or fails; on failure they are forwarded unchanged.
 *
 * End of file or a read error also ends the input: any held-back characters
 * are flushed and the end-of-input marker is sent, so the downstream threads
 * always terminate.
 */
void *inputThread(void *arg)
{
    static const char stop[] = "STOP\n";
    char pending[STOP_LENGTH];
    int n = 0;          /* number of characters held in pending */
    int lineStart = 1;  /* non-zero while the next char begins a line */
    int terminate = 0;
    int i;

    (void)arg;

    while (!terminate) {
        /* Read into a real 1-byte object so the whole value is defined. */
        unsigned char ch;
        ssize_t got = read(STDIN_FILENO, &ch, 1);

        if (got < 0 && errno == EINTR)
            continue;
        if (got != 1) {
            /* EOF or error: flush a partial "STOP" match, then finish. */
            for (i = 0; i < n; i++)
                putInput(pending[i]);
            putInput(0);
            break;
        }

        pending[n++] = (char)ch;

        if (ch == '\n') {
            lineStart = 1;
        } else if (lineStart) {
            lineStart = 0;
            if (ch == 'S')
                continue; /* possible start of "STOP\n": hold it back */
        }

        if (n > 1 && ch == (unsigned char)stop[n - 1]) {
            if (n < STOP_LENGTH)
                continue; /* still matching: keep accumulating */
            /* Whole terminator matched: replace it with end-of-input. */
            pending[0] = 0;
            n = 1;
            terminate = 1;
        }

        for (i = 0; i < n; i++)
            putInput(pending[i]);
        n = 0;
    }

    return NULL;
}

/*
 * Stage 2: replace every newline with a space and forward the result to
 * replacementBuffer.  Terminates after forwarding the end-of-input marker.
 */
void *lineSeparatorThread(void *arg)
{
    int terminate = 0;

    (void)arg;

    while (!terminate) {
        int c = getInput();

        if (c == '\n')
            c = ' ';
        else if (c == 0)
            terminate = 1;

        putReplacement((char)c);
    }

    return NULL;
}

/*
 * Stage 3: collapse each pair "++" into a single '^' and forward the result
 * to outputBuffer.  A lone '+' is held back until the next character shows
 * whether it forms a pair.  Terminates after forwarding end-of-input.
 */
void *plusSignThread(void *arg)
{
    char pending[2];
    int n = 0; /* number of characters held in pending */
    int terminate = 0;
    int i;

    (void)arg;

    while (!terminate) {
        int c = getReplacement();

        pending[n++] = (char)c;

        if (c == '+') {
            if (n == 1)
                continue; /* first '+': wait for the next character */
            pending[0] = '^'; /* second '+': emit one '^' for the pair */
            n = 1;
        } else if (c == 0) {
            terminate = 1;
        }

        for (i = 0; i < n; i++)
            putOutput(pending[i]);
        n = 0;
    }

    return NULL;
}

/*
 * Stage 4: collect characters from outputBuffer and write them to stdout as
 * lines of exactly LINE_LENGTH characters followed by '\n'.  Characters left
 * over when end-of-input arrives (fewer than LINE_LENGTH) are not printed.
 *
 * If stdout fails, the thread keeps draining the buffer without writing so
 * that the upstream stages are never left blocked on a full buffer.
 */
void *outputThread(void *arg)
{
    char line[LINE_LENGTH + 1];
    int count = 0;
    int writeFailed = 0;

    (void)arg;

    for (;;) {
        int c = getOutput();

        if (c == 0)
            break; /* end of input */

        line[count++] = (char)c;
        if (count >= LINE_LENGTH) {
            line[count] = '\n';
            if (!writeFailed && writeAll(STDOUT_FILENO, line, sizeof line) != 0)
                writeFailed = 1;
            count = 0;
        }
    }

    return NULL;
}

/*
 * Initialise the synchronisation objects, start the four pipeline threads,
 * wait for all of them, and release the synchronisation objects.
 * Returns 0 on success, 1 if any thread could not be created.
 */
int main(void)
{
    int i;
    int status;
    pthread_t threads[4]; /* one handle per pipeline stage */

    pthread_mutex_init(&inputMutex, NULL);
    pthread_cond_init(&inputConsume, NULL);
    pthread_cond_init(&inputProduce, NULL);

    pthread_mutex_init(&replacementMutex, NULL);
    pthread_cond_init(&replacementConsume, NULL);
    pthread_cond_init(&replacementProduce, NULL);

    pthread_mutex_init(&outputMutex, NULL);
    pthread_cond_init(&outputConsume, NULL);
    pthread_cond_init(&outputProduce, NULL);

    status = pthread_create(&threads[0], NULL, inputThread, NULL);
    if (status) {
        printf("Error creating input thread\n");
        return 1;
    }

    status = pthread_create(&threads[1], NULL, lineSeparatorThread, NULL);
    if (status) {
        printf("Error creating line separator thread\n");
        return 1;
    }

    status = pthread_create(&threads[2], NULL, plusSignThread, NULL);
    if (status) {
        printf("Error creating plus sign thread\n");
        return 1;
    }

    status = pthread_create(&threads[3], NULL, outputThread, NULL);
    if (status) {
        printf("Error creating output thread\n");
        return 1;
    }

    /* Wait for every stage to see end-of-input and finish. */
    for (i = 0; i < 4; i++)
        pthread_join(threads[i], NULL);

    pthread_mutex_destroy(&inputMutex);
    pthread_cond_destroy(&inputProduce);
    pthread_cond_destroy(&inputConsume);

    pthread_mutex_destroy(&replacementMutex);
    pthread_cond_destroy(&replacementProduce);
    pthread_cond_destroy(&replacementConsume);

    pthread_mutex_destroy(&outputMutex);
    pthread_cond_destroy(&outputProduce);
    pthread_cond_destroy(&outputConsume);

    return 0;
}

Related Samples

Access our free C assignment samples to enhance your understanding and skills. These examples provide clear, detailed solutions, simplifying complex concepts and boosting your academic performance.