Producer and Consumer Problem

This is one of the most common problems in concurrency. We studied it in SE350.

There is also an example from CS343: https://student.cs.uwaterloo.ca/~cs343/examples/FullProdCons.cc

Problem setup:

  • One or more producers are generating data and placing it in a buffer
  • A single consumer is taking items out of the buffer one at a time
  • Only one producer or consumer may access the buffer at any one time

Producer

while (true) {
  /* Produce item v */
  b[in] = v;
  in++;
}

Consumer

while (true) {
  while (in <= out) /* Do nothing: buffer is empty */;
  w = b[out];
  out++;
  /* Consume item w */
}

Semaphore operations (called semWait and semSignal in the code below):

  • wait(): Decreases the semaphore’s value by 1 if it’s greater than zero; otherwise blocks until the value becomes positive.
  • signal(): Increases the semaphore’s value by 1, potentially unblocking a waiting process.
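
The two operations above can be sketched in C++ with a mutex and a condition variable. This is only a minimal illustration of the semantics described in the bullets, not the course's implementation: the class name Semaphore and the extra try_wait() method are assumptions added here so the behaviour can be checked without blocking.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical teaching class (not from the course material).
class Semaphore {
    int value_;
    std::mutex m_;
    std::condition_variable cv_;
public:
    explicit Semaphore(int initial) : value_(initial) { assert(initial >= 0); }

    // wait(): block until the value is positive, then decrease it by 1.
    void wait() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return value_ > 0; });
        --value_;
    }

    // signal(): increase the value by 1 and wake one waiter, if there is one.
    void signal() {
        {
            std::lock_guard<std::mutex> lock(m_);
            ++value_;
        }
        cv_.notify_one();
    }

    // Non-blocking variant (assumption, for testing only): take a unit if one
    // is available and report whether that succeeded.
    bool try_wait() {
        std::lock_guard<std::mutex> lock(m_);
        if (value_ == 0) return false;
        --value_;
        return true;
    }
};
```

With an initial value of 1 this behaves like the mutual-exclusion semaphore s used below: the first wait() succeeds immediately, and any further wait() blocks until someone calls signal().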

Ways to solve:

A solution using Binary Semaphores:

/* program producerconsumer */
int n;
binary_semaphore s = 1, delay = 0;
 
void producer() {
    while (true) {
        produce();
        semWaitB(s);
        append();
        n++;
        if (n==1) semSignalB(delay);
        semSignalB(s);
    }
}
 
void consumer() {
    int m; /* a local variable */
    semWaitB(delay);
    while (true) {
        semWaitB(s);
        take();
        n--;
        m = n;
        semSignalB(s);
        consume();
        if (m==0) semWaitB(delay);
    }
}
 
void main() {
    n = 0;
    parbegin (producer, consumer);
}
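
  • Why check n == 1 and m == 0? delay is a binary semaphore, so it can't count items. The producer signals it only when the buffer goes from empty to non-empty (n == 1), and the consumer waits on it only after taking the last item (m == 0). m is a copy of n made inside the critical section, so the test isn't affected if the producer changes n afterwards.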

Nah, binary semaphores are confusing. A solution using general (counting) semaphores:

/* program producerconsumer */
semaphore n = 0, s = 1;
void producer() {
    while (true) {
        produce();
        semWait(s);
        append();
        semSignal(s);
        semSignal(n);
    }
}
 
void consumer() {
    while (true) {
        semWait(n);
        semWait(s);
        take();
        semSignal(s);
        consume();
    }
}
 
void main() {
    parbegin (producer, consumer);
}

Why are there two different semaphores (n and s)?

  • s is to ensure mutual exclusion on the buffer, which makes sense to me (initially 1)
  • n is to ensure synchronization: it counts the number of items available (initially 0)

Consider the scenario where the consumer runs faster than the producer. If you don’t have the n semaphore, then you might end up with the consumer consuming more than what the queue actually has.

Bounded Buffer

/* program boundedbuffer */
const int sizeofbuffer = /* buffer size */;
semaphore s = 1, n = 0, e = sizeofbuffer;
void producer() {
    while (true) {
        produce();
        semWait(e);
        semWait(s);
        append();
        semSignal(s);
        semSignal(n);
    }
}
 
void consumer() {
    while (true) {
        semWait(n);
        semWait(s);
        take();
        semSignal(s);
        semSignal(e);
        consume();
    }
}
 
void main() {
    parbegin (producer, consumer);
}
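
The bounded-buffer pseudocode above can be turned into a small runnable C++ sketch. The names s, n and e mirror the pseudocode; everything else is an assumption for illustration: the stand-in Semaphore class (built on a mutex and condition variable), the ring buffer indices, and the helper run_bounded_buffer, which runs one producer and one consumer for a fixed number of items instead of looping forever.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Stand-in counting semaphore (assumption: semWait/semSignal behave like this).
class Semaphore {
    int value_;
    std::mutex m_;
    std::condition_variable cv_;
public:
    explicit Semaphore(int initial) : value_(initial) {}
    void semWait() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return value_ > 0; });
        --value_;
    }
    void semSignal() {
        { std::lock_guard<std::mutex> lock(m_); ++value_; }
        cv_.notify_one();
    }
};

// Hypothetical helper: push `count` integers through a ring buffer with
// `capacity` slots and return them in the order the consumer took them.
std::vector<int> run_bounded_buffer(int capacity, int count) {
    std::vector<int> buffer(capacity);
    int in = 0, out = 0;
    Semaphore s(1);          // mutual exclusion on the buffer
    Semaphore n(0);          // number of filled slots
    Semaphore e(capacity);   // number of empty slots
    std::vector<int> consumed;

    std::thread producer([&] {
        for (int v = 0; v < count; ++v) {
            e.semWait();               // blocks while the buffer is full
            s.semWait();
            buffer[in] = v;            // append()
            in = (in + 1) % capacity;
            s.semSignal();
            n.semSignal();             // one more item available
        }
    });
    std::thread consumer([&] {
        for (int i = 0; i < count; ++i) {
            n.semWait();               // blocks while the buffer is empty
            s.semWait();
            int w = buffer[out];       // take()
            out = (out + 1) % capacity;
            s.semSignal();
            e.semSignal();             // one more free slot
            consumed.push_back(w);     // consume()
        }
    });
    producer.join();
    consumer.join();
    return consumed;
}
```

The order of the waits matters: each thread waits on its counting semaphore (e or n) before taking s. If the producer took s first and then blocked on e with a full buffer, the consumer could never get s to free a slot, and both would be stuck.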

C++

How to solve it in C++? Use std::condition_variable together with a std::mutex.
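
A minimal sketch of the condition-variable approach, assuming a bounded FIFO of ints. The names BoundedQueue, put, take and sum_through_queue are made up for this example. One mutex protects the queue, and two condition variables play the roles that the n and e semaphores played in the bounded-buffer pseudocode.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical class: a bounded FIFO guarded by one mutex and two
// condition variables.
class BoundedQueue {
    std::queue<int> items_;
    std::size_t capacity_;
    std::mutex m_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}

    // Producer side: wait until there is room, then append.
    void put(int v) {
        std::unique_lock<std::mutex> lock(m_);
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push(v);
        lock.unlock();
        not_empty_.notify_one();
    }

    // Consumer side: wait until there is an item, then remove it.
    int take() {
        std::unique_lock<std::mutex> lock(m_);
        not_empty_.wait(lock, [this] { return !items_.empty(); });
        int v = items_.front();
        items_.pop();
        lock.unlock();
        not_full_.notify_one();
        return v;
    }
};

// Hypothetical driver: a producer thread sends 0..count-1 and the calling
// thread consumes and sums them.
long long sum_through_queue(std::size_t capacity, int count) {
    BoundedQueue q(capacity);
    std::thread producer([&] { for (int v = 0; v < count; ++v) q.put(v); });
    long long sum = 0;
    for (int i = 0; i < count; ++i) sum += q.take();
    producer.join();
    return sum;
}
```

The predicate passed to wait() is rechecked every time the thread wakes up, which covers spurious wakeups and is the usual reason to prefer this overload to a bare wait().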

https://www.geeksforgeeks.org/producer-consumer-problem-using-semaphores-set-1/

CS343

From https://student.cs.uwaterloo.ca/~cs343/examples/FullProdCons.cc

#include <iostream>
using namespace std;
 
_Coroutine Cons;					// Pre-declaration.
 
// Basic structure of the producer full coroutine.
_Coroutine Prod {
    Cons *c;						// Address of partner coroutine.
    int   N;						// Number of items to produce.
    int   money;					// Payment received.
    int   receipt;					// Number of returned receipt.
 
    void  main();					// Main body of the coroutine.
  public:
    int  payment(int money);				// Transfers payment in and returns receipt.
    void start(int N, Cons *c);				// Initializes the coroutine.
}; // Prod
 
// Basic structure of the consumer full coroutine.
_Coroutine Cons {
    Prod &p;						// Address of partner coroutine.
    int   p1, p2;					// Items to be consumed.
    int   status;					// Marks a successful delivery.
    int   done;						// Termination flag.
 
    void main();					// Main body of the coroutine.
  public:
    Cons(Prod &p) : p(p), done(0) {}			// Constructor.
    int delivery(int p1, int p2);			// Delivers the two items to be consumed.
    void stop();					// Used to terminate the coroutine.
}; // Cons
 
void Prod::main() {
    int i;						// Loop counter.
    int p1, p2;						// Items produced by random # generator.
    int status;						// Status of delivery.
 
    // 1st resume starts here.
    for (i = 1; i <= N; i += 1) {
	p1 = random() % 100;				// Generate p1 and p2.
	p2 = random() % 100;
	cout << "delivers:" << p1 << ", " << p2 << endl;
	status = c->delivery(p1, p2);
	cout << " gets status:" << status << endl;
	receipt += 1;
    } // for
    cout << "Prod stops" << endl;
    c->stop();
} // Prod::main
 
int Prod::payment(int money) {
    Prod::money = money;
    cout << " gets payment $" << money << endl;
    resume();						// Restart prod in Cons::delivery
    return receipt;
} // Prod::payment
 
void Prod::start(int N, Cons *c) {
    Prod::N = N;
    Prod::c = c;
    receipt = 0;
    resume();						// Restart Prod::main().
} // Prod::start
 
void Cons::main() {
    int money = 1;					// Amount to pay for the delivery.
    int receipt;					// Receipt received for payment.
    status = 0;
 
    // 1st resume starts here.
    for (;;) {
      if (done) break;
	cout << "receives:" << p1 << ", " << p2 << " and pays $" << money << endl;
	status += 1;
	receipt = p.payment(money);
	cout << "gets receipt #" << receipt << endl;
	money += 1;
    } // for
    cout << "Cons stops" << endl;
} // Cons::main
 
int Cons::delivery(int p1, int p2) {
    Cons::p1 = p1;
    Cons::p2 = p2;
    resume();						// Restart cons in Cons::main first time
    return status;					// and cons in Prod::payment afterwards.
} // Cons::delivery
 
void Cons::stop() {
    done = 1;
    resume();						// Restart main.
} // Cons::stop
 
// Main body of the program starts up the coroutines and waits for them to terminate.
int main() {
    Prod prod;
    Cons cons(prod);
    prod.start(5, &cons);
} // main