jeudi 3 juillet 2014

Make exact copy of file, multi threaded in C. Debugging


Vote count:

0




I am working on a multithreaded program that creates an exact copy of a file. I thought I was done, but after testing it some more, I found some bugs.


The code has two different groups of threads, one for reading a byte from the source file then writing that byte to a circular buffer (INthreads), and the other for reading from the buffer then writing to the copy file (OUTthreads).


The size of the circular buffer, the number of IN/OUT threads, the src, and copy files are all given as command line args.


ex: cpy #IN #OUT srcFilePath copyfileName buffSize


Here is a simplified version of my program:



#include <semaphore.h>
#include <pthread.h>
#include <time.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

// Ten milliseconds expressed in nanoseconds; the threads use it as the upper bound of their random nanosleep() pauses.
#define TEN_MILLIS_IN_NANOS 10000000

/*
 * One entry of the circular buffer: a single byte of the source file
 * together with the position it was read from.
 */
typedef struct {
    char  data;     /* The byte itself. */
    off_t offset;   /* File offset the byte belongs at. */
} BufferItem;

/*
 * Ring buffer of BufferItem entries.
 *
 * Items are stored at lastInd and removed from startInd; both indices wrap
 * around modulo size.
 */
typedef struct {
    int         startInd;    /* Slot the next takeItem() reads from. */
    int         lastInd;     /* Slot the next addItem() writes to. */
    int         size;        /* Number of allocated slots (requested capacity + 1). */
    BufferItem *cBuffItems;  /* Storage for the slots. */
} CircularBuffer;

/*
 * Append a copy of *cbItem at the tail of the ring.
 *
 * If advancing the tail makes it catch up with the head, the head is moved
 * forward one slot as well, so the oldest entry is dropped.
 * The function does no locking of its own.
 */
void addItem(CircularBuffer *cBuff, BufferItem *cbItem) {
    const int slots = cBuff->size;
    const int writeAt = cBuff->lastInd;

    cBuff->cBuffItems[writeAt] = *cbItem;
    cBuff->lastInd = (writeAt + 1) % slots;

    if (cBuff->lastInd != cBuff->startInd) {
        return;
    }

    /* Tail reached the head: discard the oldest entry. */
    cBuff->startInd = (cBuff->startInd + 1) % slots;
}

/*
 * Prepare cBuff to hold up to `size` items.
 *
 * One slot more than `size` is allocated, and both indices start at 0
 * (empty buffer). The storage is zero-initialised.
 *
 * The function returns nothing, so it cannot report failure to the caller:
 * a negative size or a failed allocation prints a message to stderr and
 * terminates the process instead of leaving a NULL slot array behind for
 * addItem()/takeItem() to dereference.
 */
void initializeBuffer(CircularBuffer *cBuff, int size) {
    BufferItem *slots;

    if (size < 0) {
        fprintf(stderr, "Invalid circular buffer size %d\n", size);
        exit(EXIT_FAILURE);
    }

    slots = calloc((size_t)size + 1, sizeof *slots);
    if (slots == NULL) {
        fprintf(stderr, "Error allocating memory for circular buffer\n");
        exit(EXIT_FAILURE);
    }

    cBuff->cBuffItems = slots;
    cBuff->size = size + 1;
    cBuff->startInd = 0;
    cBuff->lastInd = 0;
}

/*
 * Copy the entry at the head of the ring into *cbItem and advance the head
 * by one slot (wrapping around). The function does no locking of its own and
 * does not check whether the buffer actually holds an item.
 */
void takeItem(CircularBuffer *cBuff, BufferItem *cbItem) {
    const int head = cBuff->startInd;

    *cbItem = cBuff->cBuffItems[head];
    cBuff->startInd = (head + 1) % cBuff->size;
}

/* State shared between main() and all worker threads. */

/* Semaphores for the circular buffer: IN threads wait on `empty` and post
 * `full`; OUT threads wait on `full` and post `empty`. */
sem_t full;
sem_t empty;

pthread_mutex_t sf_mutex;   /* Source file mutex. */
pthread_mutex_t cf_mutex;   /* Copy file mutex. */
pthread_mutex_t cb_mutex;   /* Circular buffer mutex. */

CircularBuffer cBuff;       /* The buffer shared by IN and OUT threads. */

int len = 0;                /* Source bytes still to be read. */
int INid = 0;               /* Next id to hand to an IN thread. */
int OUTid = 0;              /* Next id to hand to an OUT thread. */
int fixedLen = 0;           /* Total length of the source file in bytes. */
int processing_OUT = 1;     /* Tells the OUT threads when to exit; set to 0 once all bytes are copied. */

int *inJoin = NULL;         /* Per-IN-thread flags, set to 1 when the thread is done. */
int *outJoin = NULL;        /* Per-OUT-thread flags, set to 1 when the thread is done. */

void *INthread(void *arg)
{
FILE *srcFile = (FILE*)arg;
int t_id = INid++; // Thread number.
int offs, ch, lastInd;
int processing_IN = 1;
BufferItem result;
struct timespec t;

if(len == 0) // If len hasnt been set by the first IN thread yet.
{
fseek(srcFile, 0, SEEK_END);
len = ftell(srcFile); // Save the length of the srcFile (number of bytes).
fixedLen = len;
rewind(srcFile);
}

t.tv_sec = 0;
t.tv_nsec = rand()%(TEN_MILLIS_IN_NANOS+1);
nanosleep(&t, NULL);

if(t_id < fixedLen) // Exit thread if this thread id equals/greater than the number of bytes.
while(len > 0) // Go through each byte/char in file.
{

if(len > 0){
fseek(srcFile, 0, SEEK_CUR);
if((offs = ftell(srcFile)) != -1)
result.offset = offs; /* get position of byte in file */
if((ch = fgetc(srcFile)) != EOF)
result.data = ch; /* read byte from file */
len--;
}

// CS - Only one thread can operate on the buffer at a time.
sem_wait(&empty);
pthread_mutex_lock( &cb_mutex );
addItem(&cBuff, &result);
lastInd = cBuff.lastInd - 1;
// printf("ID %d --- offset %d char %c len%d ADDING TO BUFFER\n", t_id, offs, ch, len);
pthread_mutex_unlock( &cb_mutex );
sem_post(&full);

t.tv_sec = 0;
t.tv_nsec = rand()%(TEN_MILLIS_IN_NANOS+1);
nanosleep(&t, NULL);

}
inJoin[t_id] = 1; // This IN thread is ready to be joined.
pthread_exit(0);
}

/*
 * Consumer ("OUT") thread: takes items from the shared circular buffer and
 * writes each byte at its recorded offset in the copy file.
 *
 * arg is the path of the copy file (a NUL-terminated string); the file is
 * opened in "r+" mode for every byte, so it must already exist.
 * Always returns NULL and sets outJoin[t_id] just before finishing.
 * Any I/O error prints a message to stderr and terminates the process.
 *
 * Termination: bytesWritten (guarded by cf_mutex) counts the bytes written
 * by all OUT threads together. The thread that writes byte number fixedLen
 * clears processing_OUT and posts `full` once; every thread woken by that
 * post sees the cleared flag, passes the wake-up on and leaves. This way no
 * thread stops while bytes are still on their way, and none stays blocked in
 * sem_wait() after the last byte.
 * Assumes that exactly fixedLen items are queued over the whole run, i.e.
 * one per source byte.
 */
void *OUTthread(void *arg)
{
    static int bytesWritten = 0;  /* Shared by all OUT threads; guarded by cf_mutex. */
    const char *copyPath = arg;
    struct timespec t;
    FILE *targetFile;
    BufferItem OUTresult;
    int t_id;
    int offs, ch;
    int finished;

    t.tv_sec = 0;
    t.tv_nsec = rand() % (TEN_MILLIS_IN_NANOS + 1);
    nanosleep(&t, NULL);

    /* cf_mutex also protects the id counter shared by the OUT threads. */
    pthread_mutex_lock(&cf_mutex);
    t_id = OUTid++;  /* This is the id of this thread. */
    pthread_mutex_unlock(&cf_mutex);

    /* Exit thread if this thread id equals/greater than the number of bytes
     * (this also covers an empty source file, where nothing is ever queued).
     * NOTE(review): fixedLen is read here without synchronisation; it must
     * have been set before this point for the test to be meaningful. */
    if (t_id < fixedLen) {
        for (;;) {
            /* CS - Only one thread can operate on the buffer at a time. */
            sem_wait(&full);
            pthread_mutex_lock(&cb_mutex);
            if (!processing_OUT) {
                /* Woken by the shutdown post, not by a new item. */
                pthread_mutex_unlock(&cb_mutex);
                sem_post(&full);  /* Pass the wake-up on to the next waiter. */
                break;
            }
            takeItem(&cBuff, &OUTresult);
            pthread_mutex_unlock(&cb_mutex);
            sem_post(&empty);

            offs = OUTresult.offset;
            ch = OUTresult.data;

            /* CS - Only one thread can write to the copy file at a time. */
            pthread_mutex_lock(&cf_mutex);
            if (!(targetFile = fopen(copyPath, "r+"))) {
                fprintf(stderr, "could not open output file for writing\n");
                exit(-1);
            }
            if (fseek(targetFile, offs, SEEK_SET) == -1) {
                fprintf(stderr, "error setting output file position to %u\n",
                        (unsigned int) offs);
                exit(-1);
            }
            if (fputc(OUTresult.data, targetFile) == EOF) {
                fprintf(stderr, "error writing byte %d to output file\n", ch);
                exit(-1);
            }
            /* fclose() flushes the byte, so a failure here is a write failure. */
            if (fclose(targetFile) == EOF) {
                fprintf(stderr, "error writing byte %d to output file\n", ch);
                exit(-1);
            }
            finished = (++bytesWritten >= fixedLen);
            pthread_mutex_unlock(&cf_mutex);

            if (finished) {
                /* Last byte is on disk: tell the other OUT threads to stop. */
                pthread_mutex_lock(&cb_mutex);
                processing_OUT = 0;
                pthread_mutex_unlock(&cb_mutex);
                sem_post(&full);  /* Wake one waiter; it wakes the next. */
                break;
            }

            t.tv_sec = 0;
            t.tv_nsec = rand() % (TEN_MILLIS_IN_NANOS + 1);
            nanosleep(&t, NULL);
        }
    }

    outJoin[t_id] = 1;  /* This OUT thread is ready to be joined. */
    return NULL;
}

int threadsFinished(int nIN, int nOUT);

int main( int argc, char *argv[] )
{
clock_t start = clock();
if (argc < 6)
{
printf("Expected 5 arguments, received %d\n", argc - 1);
return 1;
}

int nIN; // Number of IN threads.
int nOUT; // Number of OUT threads.
int bufSize; // Capacity of BufferItems in circular buffer.
char *file; // Pathname of file to be copied.
char *copy; // Name to be given to the copy.
FILE *fp, *cp; // File pointers.

sscanf(argv[1], "%d", &nIN);
sscanf(argv[2], "%d", &nOUT);
sscanf(argv[5], "%d", &bufSize);
file = argv[3];
copy = argv[4];
inJoin = calloc(nIN, sizeof(int));
if(inJoin == NULL){
printf("Error allocating memory");
return 1;
}
outJoin = calloc(nOUT, sizeof(int));
if(outJoin == NULL){
printf("Error allocating memory");
return 1;
}

if (!(fp = fopen(file, "r"))) // Open source file for reading by the IN threads.
{
printf("Could not open source file for reading\n");
return 1;
}
if (!(cp = fopen(copy, "w"))) // Need create copy file, or empty it if it exists.
{
printf("Could not create copy file for writing\n");
return 1;
}
fclose(cp);

pthread_t tidIN[nIN]; // IN thread identifiers.
pthread_attr_t attrIN; // IN thread attributes.
pthread_t tidOUT[nOUT]; // OUT thread identifiers.
pthread_attr_t attrOUT; // OUT thread attributes.

pthread_mutex_init(&sf_mutex, NULL);
pthread_mutex_init(&cf_mutex, NULL);
pthread_mutex_init(&cb_mutex, NULL);
sem_init(&full, 0, 0);
sem_init(&empty, 0, bufSize);

initializeBuffer(&cBuff, bufSize);

int i;
for(i = 0; i < nIN; i++) // Create IN threads
{
pthread_attr_init(&attrIN); // Set the attribute of the thread (default).
pthread_create(&tidIN[i], &attrIN, INthread, fp); // Create thread.
}

int j;
for(j = 0; j < nOUT; j++) // Create OUT threads
{
pthread_attr_init(&attrOUT); // Set the attribute of the thread (default).
pthread_create(&tidOUT[j], &attrOUT, OUTthread, copy); // Create thread.
}

while(!threadsFinished(nIN, nOUT)){
// Do nothing, wait for threads to finish.
}

// Join all IN threads.
for(i = 0; i < nIN; i++)
{
pthread_join(tidIN[i],NULL);
}
// Join all OUT threads.
for(j = 0; j < nOUT; j++)
{
pthread_join(tidOUT[j],NULL);
}

free(inJoin);
free(outJoin);

clock_t end = clock();
printf("Elapsed time: %.2f seconds\n", (double)(end - start) / CLOCKS_PER_SEC);
return 0;
}// Main.

/*
 * Report whether every worker has flagged itself as done.
 *
 * Checks the first nIN entries of inJoin and then the first nOUT entries of
 * outJoin. Returns 1 when none of them is 0, and 0 as soon as an entry that
 * is still 0 is found.
 */
int threadsFinished(int nIN, int nOUT)
{
    int idx;

    idx = 0;
    while (idx < nIN) {
        if (!inJoin[idx]) {
            return 0;
        }
        idx++;
    }

    idx = 0;
    while (idx < nOUT) {
        if (!outJoin[idx]) {
            return 0;
        }
        idx++;
    }

    return 1;
}


I don't know if I'm structuring the critical sections incorrectly. I believe I have a problem in the INthread. I could be wrong, but from my testing it seems that everything works as expected if the number of IN threads and the number of OUT threads are both less than the number of bytes in the file. Since the maximum number of IN or OUT threads needed equals the number of bytes in the file, I figured I could just exit a thread if its thread id is greater than or equal to the number of bytes in the file. I don't know what I am doing wrong, though.


I think OUTthread() and main() are fine, but I included everything needed to compile in case someone wants to try it.



asked 1 min ago






Aucun commentaire:

Enregistrer un commentaire