/* * buffer-copy.c * Copies data efficiently from its stdin to its stdout. * Works by spawning two subprocesses, one for reading, and one for * writing. That way, both read and write operations can be run in * paralell, rather than sequentially. * At ltnb, we use this send entire disks over our network for backup * purposes. A 10 Gig disk can be backed up in 20 minutes that way * * Usage: buffer-copy [-i inputbufferSize] [-o outputbufferSize] * [-s sleepDelay] [-a amount] [-t targetFile] [-v] #include #include #include #include #include #include #include #include #include #define NBUF 10 #define DEBUG 0 char *progname; char hostname[100]; int verbose=0; int sleeptime = 100; int ofd = 1; struct sem { volatile int readPtr; volatile int writePtr; volatile int readAmount[NBUF]; }; void usage(void) __attribute__ ((noreturn)); void usage(void) { fprintf(stderr, "Usage: %s [-i inputbufferSize] [-o outputbufferSize] [-s sleepDelay] [-a amount] [-t targetFile] [-v] \n", progname); exit(1); } unsigned int size(char *arg) { unsigned int s; char *ptr; s = strtoul(arg, &ptr, 0); switch(*ptr) { case 'k': return s * 1024; case '\0': case 'b': return s; case 'm': return s * 1024 * 1024; default: fprintf(stderr, "Bad quantitiy %s\n", arg); exit(1); } } void writer(struct sem *sem, char *buffer, int obufsize, int blocksize) { unsigned int amount; int len; int offset; int sel; int wsize; while(sem->writePtr >= sem->readPtr) { usleep(sleeptime); } if(verbose /*&& sem->writePtr % 10 == 0*/) fprintf (stderr, "\r%06d", sem->writePtr); fflush(stderr); #if DEBUG fprintf(stderr, "Writer: w=%d r=%d\n", sem->writePtr, sem->readPtr); #endif sel = sem->writePtr % NBUF; amount = sem->readAmount[sel]; #if DEBUG fprintf(stderr, "Amount = %d\n", amount); #endif if (! amount) { /* reader signalled us EOF */ fprintf(stderr, "\n"); exit(0); } offset = sel * blocksize; while(amount) { wsize = amount; if (wsize > obufsize) wsize = obufsize; len = write(ofd, buffer + offset, wsize); if (len < 0) { perror("write"); sem->writePtr = 0; exit(1); } amount -= len; offset += len; } sem->writePtr++; } void reader(struct sem *sem, char *buffer, int ibufsize, int blocksize) { int offset; int sel; char *lbuffer; int len; int rsize; while(sem->readPtr >= sem->writePtr + NBUF && sem->writePtr) { usleep(sleeptime); } if(!sem->writePtr) { /* writer signalled us EOF */ fprintf(stderr, "writer signalled EOF\n"); exit(0); } sel = sem->readPtr % NBUF; lbuffer = buffer + blocksize * sel; offset = 0; while(offset < blocksize) { rsize = blocksize - offset; if (rsize > ibufsize) rsize = ibufsize; len = read(0, lbuffer + offset, rsize); if (len == 0) break; if (len < 0) { perror("read"); sem->readAmount[sel] = 0; sem->readPtr ++; exit(1); } offset += len; } #if DEBUG fprintf(stderr, "Read %04x bytes, rptr=%d wptr=%d %02x %02x\n", offset, sem->readPtr, sem->writePtr, (unsigned char) lbuffer[0], (unsigned char) lbuffer[1]); #endif sem->readAmount[sel] = offset; sem->readPtr ++; #if DEBUG fprintf(stderr, "Reader: offset=%d ptr=%d\n", offset, sem->readPtr); #endif if(offset == 0) exit(0); } int main(int argc, char **argv) { int rpid, wpid; int shmid = -1; int zero = open("/dev/zero", O_RDWR); int ibufsize = 0; int obufsize = 0; int blocksize = 0; char *ptr; struct sem *sem; char *buffer; int status; int amount=0; progname = argv[0]; if (zero < 0) { perror("open /dev/zero"); exit(1); } gethostname(hostname, 99); while(1) { switch(getopt(argc, argv, "vi:o:a:s:t:")) { case 'v': verbose = 1; break; case 'a': amount = size(optarg); break; case 'i': ibufsize = size(optarg); break; case 'o': obufsize = size(optarg); break; case 's': sleeptime = size(optarg); break; case 't': ofd = open(optarg, O_WRONLY | O_SYNC); if(ofd < 0 ) { perror("open"); } break; case '?': fprintf(stderr, "Unknown flag\n"); usage(); case -1: goto end_while; } } end_while: if (ibufsize == 0) { fprintf(stderr, "Missing input buffer size\n"); exit(1); } if (obufsize == 0) { fprintf(stderr, "Missing output buffer size\n"); exit(1); } if (ibufsize > obufsize) { blocksize = ibufsize; } else { blocksize = obufsize; } fprintf(stderr, "ibuf=%04x obuf=%04x blk=%04x\n", ibufsize, obufsize, blocksize); shmid = shmget(0, blocksize * NBUF + 4096, IPC_CREAT | 0600); if (shmid < 0) { perror("shmget"); exit(1); } ptr = shmat(shmid, 0, 0); /* ptr = mmap(0, 4096 + 2 * blocksize, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANON, zero, 0); */ if ((int) ptr == -1 || ptr == 0) { perror("mmap"); exit(1); } buffer = ptr + 4096; sem = (struct sem *) ptr; sem->readPtr = 1; sem->writePtr = 1; switch( (rpid=fork()) ) { case 0: /* the child */ if (amount) { int i; for(i=0; i < amount ; i++) { reader(sem, buffer, ibufsize, blocksize); } sem->readAmount[sem->readPtr % NBUF] = 0; sem->readPtr++; } else { while(1) { reader(sem, buffer, ibufsize, blocksize); } } exit(0); case -1: /* an error */ perror("fork"); exit(1); default: /* the father */ } switch( (wpid=fork()) ) { case 0: /* the child */ while(1) { writer(sem, buffer, obufsize, blocksize); } exit(0); case -1: /* an error */ perror("fork"); exit(1); default: /* the father */ } waitpid(rpid, &status, 0); fprintf(stderr, "Reader %s exited %d\n", hostname, WEXITSTATUS(status)); fprintf(stderr, "%d %d %d\n", sem->readPtr, sem->writePtr, sem->readAmount[sem->readPtr % NBUF]); waitpid(wpid, &status, 0); fprintf(stderr, "Writer %s exited %d\n", hostname, WEXITSTATUS(status)); if(shmctl(shmid, IPC_RMID, 0)) { perror("remove shared segment"); exit(1); } exit(0); }