C++:设计一个线程安全的队列

news/2024/6/18 21:38:39 标签: 多线程, c++, 单元测试, ThreadSanitizer, pthread

文章目录

    • 1. 目的
    • 2. 实现?验证!
    • 3. 实现 Queue 类的方案

在这里插入图片描述

1. 目的

串行的程序只用到单个 CPU 核心, 希望加速整个程序, 考虑使用多线程加速。典型情况下可以找到生产者、消费者,两个角色之间通过队列进行数据交互:

  • 生产者负责把数据放入队列Q
  • 消费者负责从队列取出数据Q

要求队列是线程安全的,即:不能有读写冲突等。

2. 实现?验证!

这里并不给出具体实现, 主要原因是网络上有太多的“实现”,也许很强大,但是否正确则有待验证,反倒是怎样验证正确性,总是被忽略:

  • 新手小白,或者“算法工程师”们,往往没怎么写过合格的单元测试
  • 验证也许只是粗略跑一下,Thread Sanitizer 这样的有力武器没有被用上

makefile

我是在 Linux 下验证的, 用的 makefile 如下, 重点是 tsan 的设定, 以及 gtest 的配置:

SANITIZER_OPTION=-fsanitize=thread -fno-omit-frame-pointer
#SANITIZER_OPTION=

all:
	clang++ test_queue.cpp -I. -g `pkg-config --cflags --libs gtest gtest_main` ${SANITIZER_OPTION}

Queue 类的 public 成员

template<typename T>
class Queue
{
public:
    Queue(unsigned int max_size = 0);
    ~Queue();
    void push(const T& elem);
    T pop();
    bool empty();
    size_t size();

其中:

  • Queue是模板类,这样可以支持任意数据类型作为队列元素(但队列中所有元素类型需要相同)
  • 所有成员函数都不能是 const 的, 尤其是 empty 和 size 函数, 原因是当前线程调用它们时,其他线程可能立即改变队列成员,需要 mutex 锁住, 对于 mutex 的操作导致函数不再是 const 的
  • 支持设定队列最大元素数量,如果没指定, 看似用0,实际表示“无限”

单元测试

如下是基于 GoogleTest 和 Queue 的 ADT 给出的单元测试代码。
如果你基于上述 Queue 类的定义, 能通过如下单元测试, 那么程序的正确性应该说比较高了。这部分代码的价值比 Queue 本身的价值要更高, 但往往被人们忽略:

#include <gtest/gtest.h>
#include <digimon/queue.hpp>
#include <shadow/queue.hpp>
#include <unistd.h>

using namespace digimon;
//using namespace Shadow;

TEST(Queue, SingleThread)
{
    Queue<int> q;
    EXPECT_EQ(q.empty(), true);

    q.push(1);
    q.push(2);
    EXPECT_EQ(q.empty(), false);
    
    int x = q.pop();
    EXPECT_EQ(x, 1);

    x = q.pop();
    EXPECT_EQ(x, 2);
}

class ThreadData
{
public:
    ThreadData() {}
    ThreadData(Queue<int>* _q, int _start, int _end) :
        q(_q), start(_start), end(_end)
    {}

public:
    Queue<int>* q;
    int start;
    int end;
};

class ConsumerThreadData
{
public:
    ConsumerThreadData(Queue<int>* _q, int _start, int _end) :
        q(_q), start(_start), end(_end), sum(0)
    {
        pthread_mutex_init(&mutex, NULL);
    }
    ~ConsumerThreadData()
    {
        pthread_mutex_destroy(&mutex);
    }

public:
    Queue<int>* q;
    int start;
    int end;
    int sum;
    pthread_mutex_t mutex;
};

static void* producer(void* _thread_data)
{
    ThreadData* thread_data = (ThreadData*)_thread_data;

    for (int i = thread_data->start; i < thread_data->end; i++)
    {
        thread_data->q->push(i);
    }

    return NULL;
}

TEST(Queue, MultiThread_MultiProducer)
{
    Queue<int> q;

    pthread_t t1;
    ThreadData thread_data1(&q, 0, 10);
    pthread_create(&t1, NULL, producer, (void*)&thread_data1);

    pthread_t t2;
    ThreadData thread_data2(&q, 0, 10);
    pthread_create(&t2, NULL, producer, (void*)&thread_data2);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    EXPECT_EQ(q.empty(), false);
    EXPECT_EQ(q.size(), 20);

    int sum = 0;
    while (!q.empty())
    {
        int x = q.pop();
        sum += x;
    }
    int expected_sum = 90;
    EXPECT_EQ(expected_sum, sum);
}

static void* consumer(void* _thread_data)
{
    ConsumerThreadData* thread_data = (ConsumerThreadData*)_thread_data;

    for (int i = thread_data->start; i < thread_data->end; i++)
    {
        int x = thread_data->q->pop();
        thread_data->sum += x;
        std::cout << x << std::endl;
    }

    return NULL;
}

TEST(Queue, MultiThread_SingleProducer_SingleConsumer)
{
    Queue<int> q;

    pthread_t t1;
    ThreadData thread_data1(&q, 0, 10);
    pthread_create(&t1, NULL, producer, (void*)&thread_data1);

    pthread_t t2;
    ConsumerThreadData thread_data2(&q, 0, 10);
    pthread_create(&t2, NULL, consumer, (void*)&thread_data2);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    EXPECT_EQ(q.empty(), true);
    EXPECT_EQ(q.size(), 0);
}

static void* producer_slow(void* _thread_data)
{
    ThreadData* thread_data = (ThreadData*)_thread_data;

    for (int i = thread_data->start; i < thread_data->end; i++)
    {
        sleep(1);
        thread_data->q->push(i);
    }

    return NULL;
}

TEST(Queue, MultiThread_Consumer_Meaningless_Grab_Mutex)
{
    Queue<int> q;

    pthread_t t1;
    ThreadData thread_data1(&q, 0, 3);
    pthread_create(&t1, NULL, producer_slow, (void*)&thread_data1);

    pthread_t t2;
    ConsumerThreadData thread_data2(&q, 0, 3);
    pthread_create(&t2, NULL, consumer, (void*)&thread_data2);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    EXPECT_EQ(q.empty(), true);
    EXPECT_EQ(q.size(), 0);
    EXPECT_EQ(thread_data2.sum, 3);
}

static void* consumer_slow(void* _thread_data)
{
    ConsumerThreadData* thread_data = (ConsumerThreadData*)_thread_data;

    for (int i = thread_data->start; i < thread_data->end; i++)
    {
        EXPECT_EQ(thread_data->q->size(), 5);
        int x = thread_data->q->pop();
        thread_data->sum += x;
        sleep(1);
        std::cout << x << std::endl;
    }

    return NULL;
}

TEST(Queue, LimitedQueueSize)
{
    Queue<int> q(5);

    pthread_t t1;
    ThreadData thread_data1(&q, 0, 10);
    pthread_create(&t1, NULL, producer, (void*)&thread_data1);

    pthread_t t2;
    ConsumerThreadData thread_data2(&q, 0, 5);
    pthread_create(&t2, NULL, consumer_slow, (void*)&thread_data2);
    
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    EXPECT_EQ(q.empty(), false);
    EXPECT_EQ(q.size(), 5);
}

3. 实现 Queue 类的方案

可以基于 C++ 11 实现, 不过据说 C++11 的 thread 在华为手机上有问题,传闻中 pthread 能消除问题;
于是乎还有另一个选择: C++03 + pthread 实现 Queue 类。

Windows 平台上可以使用 windows-pthreads 库, 它是基于 Windows threads 模拟实现了 PThread 和 Semaphore 接口。(完)


http://www.niftyadmin.cn/n/319221.html

相关文章

how2heap-fastbin_dup.c

不同libc版本的fastbin_dup.c源码有点小区别&#xff1a;主要是有tcache的&#xff0c;需要先填充 以下为有tcache的源码示例&#xff1a; #include <stdio.h> #include <stdlib.h> #include <assert.h>int main() {setbuf(stdout, NULL);printf("This…

双层优化入门(3)—基于智能优化算法的求解方法(附matlab代码)

前面两篇博客介绍了双层优化的基本原理和使用KKT条件求解双层优化的方法&#xff0c;以及使用yalmip工具箱求解双层优化的方法&#xff1a; 双层优化入门(1)—基本原理与求解方法 双层优化入门(2)—基于yalmip的双层优化求解(附matlab代码) 除了数学规划方法之外&#xff0c;…

无标签背景图(负样本)的拼图代码

训练目标检测模型有个很令人头疼的问题&#xff0c;就是有些特征与要训练的特征较为相似的背景区域也被误检出来&#xff08;作为本应不该检测出来的负样本却被误检出为正样本的FP&#xff09;。 根据这一问题的解决办法&#xff0c;除了可以对正样本特征较为模糊或者有歧义的样…

七天从零实现Web框架Gee - 3

之前&#xff0c;我们用了一个非常简单的map结构存储了路由表&#xff0c;使用map存储键值对&#xff0c;索引非常高效&#xff0c;但是有一个弊端&#xff0c;键值对的存储的方式&#xff0c;只能用来索引静态路由。那如果我们想支持类似于/hello/:name这样的动态路由怎么办呢…

Intel SGX学习笔记(2):用数组向Enclave传递5个数实现自增操作

写在前面 1、实现一个简单的Intel SGX的应用&#xff1a;非安全区定义初始化一个数组&#xff0c;数组里面存储5个数&#xff0c;然后向安全区&#xff08;enclave&#xff09;传入&#xff0c;在安全区中进行加减乘除&#xff0c;然后返回。 2、Intel SGX开发初学整体思路&a…

pyinstaller打包为.exe过程中的问题与解决方法

目录 问题一&#xff1a;.exe文件过大问题二&#xff1a;pyinstaller与opencv-python版本不兼容问题三&#xff1a;打开文件时提示***.pyd文件已存在问题四&#xff1a;pyinstaller打包时提示UPX is not available.另&#xff1a;查看CUDA成功配置的方法 pyinsatller -F -w mai…

这个抓包工具太强了,科来网络分析系统强烈推荐

一直以来抓包工具&#xff0c;都推荐和使用wireshark&#xff0c;简单好用。最近发现一款更强大好用的网络分析工具&#xff0c;科来网络分析系统。且技术交流版是完全免费的&#xff0c;无需注册激活。这里强烈推荐和分享给大家。这可是个网络报文分析和监控神器。有多强大&am…

使用Zookeeper对集群节点进行管理

1 相关概念 Zookeeper是Hadoop生态系统中分布式的服务管理框架&#xff0c;负责存储和管理集群中的公共数据如配置信息等&#xff0c;并且对节点进行注册和通知管理。它具有如下几个特点&#xff1a; 集群由一个领导者&#xff08;Leader&#xff09;&#xff0c;多个跟随者&a…