跳至主要內容

生产消费模型

AkashiNeko原创Linux线程生产消费模型

1. 生产者-消费者问题

生产消费模型常用于描述多线程编程中的一种并发模式,也称为生产者-消费者问题。模型涉及到两类并发执行的角色:生产者和消费者。生产者负责生成数据或任务,并将其放入共享的缓冲区中,而消费者则从缓冲区中获取数据或任务进行处理。

生产消费模型涉及到两类并发执行的角色:生产者消费者。生产者负责生成数据或任务,并将其放入共享的缓冲区中,而消费者则从缓冲区中获取数据或任务进行处理。

如果生产者生产数据的速度比消费者消费数据的速度快,那么生产者就必须等待消费者消费完数据才能够继续生产数据,如果消费者消费数据的速度比生产者生产数据快,那么消费者就必须等待生产者生产出数据才能继续消费。所以为了达到生产数据和消费数据之间的平衡,那么就需要一个缓冲区用来存储生产者生产的数据,所以就引入了生产消费模型。

生产消费模型
生产消费模型

生产者-缓冲区-消费者三者的关系类似于工厂-超市-顾客。有若干和工厂和若干的顾客,工厂生产出的商品交给超时进行售卖,顾客从超市购物进行消费。

2. 生产消费模型的实现

需要实现的功能

生产者线程不断向缓冲区(这里使用队列)中存入数据,消费者线程不断从缓冲区中取出数据。当缓冲区为空时,消费者线程应阻塞等待直到缓冲区有数据;同样当缓冲区满时,生产者线程应阻塞等待直到缓冲区有空闲空间。

基于条件变量和阻塞队列

接下来,我们使用条件变量制作阻塞队列,实现生产消费模型。

使用一个全局链式队列作为缓冲区,数据类型为 int,缓冲区最大容量为 5。

std::queue<int> q;            // 缓冲区
const size_t BUFFER_SIZE = 5; // 缓冲区容量

初始化条件变量和互斥量。

pthread_cond_t cond_full = PTHREAD_COND_INITIALIZER;
pthread_cond_t cond_empty = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

生产者线程,在缓冲区没有空闲空间时应被阻塞。如果有空闲空间,则可以放入数据并通知消费者进行消费。

void* producer(void*) {
    // 生产者
    while (true) {
        // 生产者生产数据
        int data = 123;

        pthread_mutex_lock(&mutex);

        while (q.size() == BUFFER_SIZE) {
            // 缓冲区满,等待
            pthread_cond_wait(&cond_empty, &mutex);
        }

        // 向缓冲区放入数据
        q.push(data);
        printf("生产者:向缓冲区放入数据;q.size() = %zu\n", q.size());

        // 通知消费者
        pthread_cond_signal(&cond_full);

        pthread_mutex_unlock(&mutex);
    }
    return nullptr;
}

消费者线程,在缓冲区中没有数据时应被阻塞。如果有数据,则可以取出数据并通知生产者进行生产。

void* consumer(void*) {
    // 消费者
    while (true) {
        pthread_mutex_lock(&mutex);

        while (q.empty()) {
            // 缓冲区空,等待
            pthread_cond_wait(&cond_full, &mutex);
        }
        // 从缓冲区取走数据
        int data = q.front();
        q.pop();
        printf("消费者:从缓冲区读走数据;q.size() = %zu\n", q.size());

        // 通知生产者
        pthread_cond_signal(&cond_empty);

        pthread_mutex_unlock(&mutex);

        // 消费者消费数据
        (void)data;
    }
    return nullptr;
}
完整代码
#include <stdio.h>
#include <pthread.h>
#include <queue>

std::queue<int> q;            // 缓冲区
const size_t BUFFER_SIZE = 5; // 缓冲区容量

pthread_cond_t cond_full = PTHREAD_COND_INITIALIZER;
pthread_cond_t cond_empty = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void* producer(void*) {
    // 生产者
    while (true) {
        // 生产者生产数据
        int data = 123;

        pthread_mutex_lock(&mutex);

        while (q.size() == BUFFER_SIZE) {
            // 缓冲区满,等待
            pthread_cond_wait(&cond_empty, &mutex);
        }

        // 向缓冲区放入数据
        q.push(data);
        printf("生产者:向缓冲区放入数据;q.size() = %zu\n", q.size());

        // 通知消费者
        pthread_cond_signal(&cond_full);

        pthread_mutex_unlock(&mutex);
    }
    return nullptr;
}

void* consumer(void*) {
    // 消费者
    while (true) {
        pthread_mutex_lock(&mutex);

        while (q.empty()) {
            // 缓冲区空,等待
            pthread_cond_wait(&cond_full, &mutex);
        }
        // 从缓冲区取出数据
        int data = q.front();
        q.pop();
        printf("消费者:从缓冲区取出数据;q.size() = %zu\n", q.size());

        // 通知生产者
        pthread_cond_signal(&cond_empty);

        pthread_mutex_unlock(&mutex);

        // 消费者消费数据
        (void)data;
    }
    return nullptr;
}

int main() {

    // 创建一批生产者
    const int NPROS = 3;
    pthread_t producers[NPROS];
    for (int i = 0; i < NPROS; i++)
        pthread_create(producers + i, nullptr, producer, nullptr);

    // 创建一批消费者
    const int NCONS = 3;
    pthread_t consumers[NCONS];
    for (int i = 0; i < NCONS; i++)
        pthread_create(consumers + i, nullptr, consumer, nullptr);

    // never
    for (int i = 0; i < NPROS; i++)
        pthread_join(producers[i], nullptr);
    for (int i = 0; i < NCONS; i++)
        pthread_join(consumers[i], nullptr);
    return 0;
}

基于信号量和环形队列

使用信号量和环形队列,实现生产消费模型。

使用一个全局的环形队列作为缓冲区,数据类型为 int,缓冲区最大容量为 5。

const size_t CAPACITY = 5;    // 缓冲区容量
int ring_buf[CAPACITY];       // 缓冲区(环形队列)
size_t front = 0, rear = 0;   // 环形队列头尾指针

初始化信号量和互斥量。

sem_t sem_full, sem_empty;    // 数据计数器,空间计数器
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

...
// 初始化信号量
sem_init(&sem_full, CAPACITY, 0);
sem_init(&sem_empty, CAPACITY, CAPACITY);

生产者线程,在缓冲区没有空闲空间时应被阻塞。如果有空闲空间,则可以放入数据并通知消费者进行消费。

void* producer(void*) {
    // 生产者
    while (true) {
        // 生产者生产数据
        int data = 123;

        // 空间计数器 P 操作
        sem_wait(&sem_empty);

        pthread_mutex_lock(&mutex);
        // 向缓冲区放入数据
        ring_buf[front++] = data;
        if (front == CAPACITY) front = 0;
        printf("生产者:向缓冲区放入数据\n");
        pthread_mutex_unlock(&mutex);

        // 数据计数器 V 操作
        sem_post(&sem_full);

    }
    return nullptr;
}

消费者线程,在缓冲区中没有数据时应被阻塞。如果有数据,则可以取出数据并通知生产者进行生产。

void* consumer(void*) {
    // 消费者
    while (true) {

        // 数据计数器 P 操作
        sem_wait(&sem_full);

        pthread_mutex_lock(&mutex);
        // 从缓冲区取出数据
        int data = ring_buf[rear++];
        if (rear == CAPACITY) rear = 0;
        printf("消费者:从缓冲区取出数据\n");
        pthread_mutex_unlock(&mutex);

        // 空间计数器 V 操作
        sem_post(&sem_empty);

        // 消费者消费数据
        (void)data;
    }
    return nullptr;
}
完整代码
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>

const size_t CAPACITY = 5;  // 缓冲区容量
int ring_buf[CAPACITY];     // 缓冲区(环形队列)
size_t front = 0, rear = 0; // 环形队列头尾指针

sem_t sem_full, sem_empty;  // 数据计数器,空间计数器
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void* producer(void*) {
    // 生产者
    while (true) {
        // 生产者生产数据
        int data = 123;

        // 空间计数器 P 操作
        sem_wait(&sem_empty);

        pthread_mutex_lock(&mutex);
        // 向缓冲区存入数据
        ring_buf[front++] = data;
        if (front == CAPACITY) front = 0;
        printf("生产者:向缓冲区存入数据\n");
        pthread_mutex_unlock(&mutex);

        // 数据计数器 V 操作
        sem_post(&sem_full);

    }
    return nullptr;
}

void* consumer(void*) {
    // 消费者
    while (true) {

        // 数据计数器 P 操作
        sem_wait(&sem_full);

        pthread_mutex_lock(&mutex);
        // 从缓冲区取走数据
        int data = ring_buf[rear++];
        if (rear == CAPACITY) rear = 0;
        printf("消费者:从缓冲区读走数据\n");
        pthread_mutex_unlock(&mutex);

        // 空间计数器 V 操作
        sem_post(&sem_empty);

        // 消费者消费数据
        (void)data;
    }
    return nullptr;
}

int main() {

    // 初始化空间计数器为 5,数据计数器为 0,上限都为 5
    sem_init(&sem_full, CAPACITY, 0);
    sem_init(&sem_empty, CAPACITY, CAPACITY);

    // 创建一批生产者
    const int NPROS = 3;
    pthread_t producers[NPROS];
    for (int i = 0; i < NPROS; i++)
        pthread_create(producers + i, nullptr, producer, nullptr);

    // 创建一批消费者
    const int NCONS = 3;
    pthread_t consumers[NCONS];
    for (int i = 0; i < NCONS; i++)
        pthread_create(consumers + i, nullptr, consumer, nullptr);

    // never
    for (int i = 0; i < NPROS; i++)
        pthread_join(producers[i], nullptr);
    for (int i = 0; i < NCONS; i++)
        pthread_join(consumers[i], nullptr);
    return 0;

    sem_destroy(&sem_full);
    sem_destroy(&sem_empty);
}