本文源码

改造 ThreadTask 为异步

在上文 [C++ 造轮子] 基于 pthread 线程池 中,我们实现了基于任务队列的线程池。但对于任务执行单元ThreadTask类封装的并不好,还有如下可改进之处

  • 不支持直接传入自定义函数对象构造,只能写继承类
  • 缺少回调函数的支持
  • 无法随时获取结果

本文将对其进行优化,封装为一个真正意义上的异步任务类。注意本文使用的仍然是pthread为基础的线程库.

异步任务类的特征是非阻塞调用接口 , 随时取得函数结果支持回调函数。回调函数在本文中定义为以任务结果为入参的函数。这在很多语言标准里都实现了:

  • C++11 的 std::futurestd::async
  • Python 的 asyncio.Taskconcurrent.future
  • Javascript 的 promise

相信熟悉 js 的朋友对下段代码十分熟悉

var canDel = false;
$.ajax({
    url: web.contextPath(),
    dataType: "json",
    async:false,
    data: postData
    }).done(function(result) { 
        canDel= result.canDel;
});

js 的ajax请求的回调注册就是异步场景的典型应用,我们希望把涉及到 IO 的事件设计为非阻塞接口,若需要对其结果进行处理,则可以为其注册回调函数。

接下来在 C++ 中,考虑实现异步任务类的两个关键:

  • 如何非阻塞
  • 如何制定任务函数和回调函数
  • 如何监控任务完成情况

如何 non-blocking

要求任务启动函数不能阻塞,主要可采用两种办法

  • 多线程,将任务放入后台线程,由操作系统调度执行
  • 使用用户线程 (协程),用户自定义事件循环调度,适用于 IO 任务

对于计算类型的任务只能采用方法 1 才能实现异步,单线程的协程调度只适合为 IO 任务让出时间片。本文先不谈协程的方法,基于之前的线程池,使用方法 1 设计异步任务类。

执行函数与回调函数

之前的ThreadTask的执行逻辑是实现一个子类,在Run()函数中定义,这样无疑会增大开发量,现在我们希望ThreadTask能接受一个函数对象 (function object) 作为执行函数和回调函数。我们先使用 C 风格的函数指针实现这两个函数:

typedef const void* (*ProcessFunc)(const void *);
typedef void (*CallbackFunc)(void*, const void*);

在不使用模板的前提,为尽可能地实现泛化,函数指针需要使用可以指向任意类型的void*, 而两个函数参数的关系通过const表明语义。

  • const void* (*ProcessFunc)(const void *), 接受参数不改变其内容,且返回结果也不能被改变;
  • void (*CallbackFunc)(void*, const void*), 其接受的 const void* 来自 ProcessFunc 的结果,他的结果由传入的可变 void* 指针传出,函数本身不返回结果

有一种 C++ 编程范式不喜欢让函数返回对象,喜欢使用 bool func (T& changed, const T& changed) 表示函数更改语义。本节先遵循此风格设计。

当然上述两个是 C 风格的函数指针,你也可以借助引用设计为 C++ 风格的函数。基于函数指针的设计我们修改下ThreadTask:

class ThreadTask
{
    private:
        friend class PthreadPool;
        ProcessFunc m_ProcessFunc;
        CallbackFunc m_CallbackFunc;
        const void * m_Arg; // ProcessFunc Arguements
        const void * m_Res; // ProcessFunc Results
        void* m_CallbackArg; // CallBackFunc Arguements

    protected:
        void* m_pData;
        inline void initTask()
        {
            m_bFinished = false;
            m_pData = nullptr;
        }

    public:
        bool m_bFinished;

        ThreadTask(ProcessFunc process, const void* arg, 
                   CallbackFunc callback, void* call_arg)
            : m_ProcessFunc(process),
              m_Arg(arg),
              m_CallbackFunc(callback),
              m_CallbackArg(call_arg)
        {
            initTask();
        }

        virtual ~ThreadTask()
        {
            initTask();
        }

        void Run()
        {
            m_Res = m_ProcessFunc(m_Arg);
            m_CallbackFunc(m_CallbackArg, m_Res);   
        }

        const void* Result()
        {
            if (WaitTask() == 0)
            {
                return m_Res;
            }
            return NULL;
        }

        inline void SetData(void* data)
        {
            m_pData = data;
        }
        // Blocking Result 
        inline int WaitTask()
        {
            if ( !m_bFinished )
            {
                while ( !m_bFinished )
                {
                    nanosleep(&ts, NULL);
                }
            }
            return 0;
        }      
};

线程池的调度逻辑不变,我们写一个测试用例

// ProcessFunc
const char* test_func(const char* str)
{
    console_info("In async task:");
    sleep(1);
    return str;
}
// CallBack Func
void test_callback(char* s, const char* cs)
{
    console_info("In Callback task: last Res is {}", std::string{cs});
    if ( s == NULL)
    {
        console_error("Call back pointer null!");
        return;
    }
    memcpy(s, cs, 10);
}

主线程调用:

#define MAX_TASK_NUM 5

// Initialize a ThreadPool
ThreadPool* pool = new PthreadPool(5);

// task and result container
std::vector<ThreadTask*> vTasks;
std::vector<char*> vCallRes;

// ProcessFunc arguments
const char* proc_c = "hello!~";

// Add Async Tasks into pool
for ( int ind = 0; ind < MAX_TASK_NUM; ++ind)
{
    char* c = new char(10);

    // The Function Need to Type Cast
    ThreadTask* task = new ThreadTask((ProcessFunc)test_func, proc_c, 
                            (CallbackFunc)test_callback, c);
    vTasks.push_back(task);
    vCallRes.push_back(c);
    pool->AddTask(task);
} 

/*
    Here you can do something inrevelent to tasks
*/

// Here we need Tasks' results
for ( int i = 0; i < vTasks.size(); ++i)
{
    auto pt = vTasks[i];
    auto call_c = vCallRes[i];

    // The result need also type-cast
    console_info("Task Res: {}", string{ (const char*)pt->Result() });
    console_info("Callback Res: {}", string{ (const char*)call_c});
}

clock_t end = clock();
delete pool;
console_info("Time Consume: {}", (double)(end - start) / CLOCKS_PER_SEC);

相信你看出来了,完全是 C 风格的语法,需要频繁类型转换。下节一个使用现代 C++ 写法的线程池可以解决此问题。

更加现代的 C++ 异步任务

当我们接纳模板后,可以构造一个std::function模板来表示所有可能传进来的函数:

  • 普通函数
  • lambda 函数
  • 成员函数
  • 静态成员函数
  • 函数对象(重载了 operator () 的类)

且参数可以是任意类型、任意数量的。同时借助std::future可以免去设计ThreadTask的工作,这是一个表示未来的对象,包含可执行的函数对象,能保存结果和状态。

现代 ThreadPool 类设计

以下实现来自 这里 , 原版作者的注释非常简略,下面按照我的理解解释了一下作者的意图。

class ThreadPool
{
    public:
        ThreadPool(size_t);

        /* 
            AddTask, F and Args... can represent any
            function type; Then `auto->` implicate the 
            return_type of the function which should be 
            the type of `std::future<>`;

            The `std::result_of` is similar to `decltype()`.
        */
        template<class F, class... Args>
        auto enqueue(F&& f, Args&&... args)
            -> std::future<typename std::result_of<F(Args...)>::type>;
        ~ThreadPool();

    private:
        std::vector<std::thread> workers;
        // Taks queue
        std::queue<std::function<void()>> tasks; 

        std::mutex queue_mutex;
        std::condition_variable condition;
        bool stop;
};

这个线程池实现的思路与我设计的PthreadPool一致,类在构造时即启动 N 个线程配合条件数和互斥锁来监控调度任务队列。不过他的任务队列采用了 FIFO 模式,这在用户层面上控制了一个执行顺序。

其启动线程监控队列的实现与PthreadPool十分相似,我们重点看下enqueue入队操作的实现,也是使用模板最复杂的函数:

template< class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;
    /*
        1. use shared_ptr to manage task's live period.
        2. use `package_task` to store the function state
           so that we can evoke `future.get()` more than once.
        3. use `std::forward` to perfect transmit params so
           that the type will not be cast emplicitly.
    */
    auto task = std::make_shared< std::packaged_task<return_type()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );
    std::future<return_type> res = task->get_future();

    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        if (stop)
        {
            throw std::runtime_error("enqueue on stopped queue");
        }
        /* use `emplace` instead of `insert` to decrease the
           total number of ctor evoking.

           [task](){ (*task)();} are suited for `std::function<void()>`
        */
        tasks.emplace([task]() { (*task)(); });
    }
    condition.notify_one();
    return res;
}

其实剥离出眼花缭乱的现代 C++ 模板语法,核心就是为适配任意函数对象设计出std::future<return_type>std::function<>. 这样的函数泛化方法可以在今后通用。

最后是现代线程池的测试用例:

ThreadPool pool(4);
std::vector< std::future<int> > results;

for(int i = 0; i < 8; ++i) {
    /*
        Use `emplace_back` instead of `push_back`
        the lambda function can be used by `std::future` to construct.
    */
    results.emplace_back(
        pool.enqueue([i] {
            std::cout << "hello " << i << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(1));
            std::cout << "world " << i << std::endl;
            return i*i;
        })
    );
}

/*
    `result.get()` can block the `std::future` object until
    its task finished.
*/
for(auto && result: results)
    std::cout << result.get() << ' ';
std::cout << std::endl;

仍有提升空间

无论是现代线程池还是PthreadPool只剩下一个明显的问题 -- 对结果的等待。

// ThreadPool:
for(auto && result: results)
    std::cout << result.get() << ' ';

// PthreadPool:
for ( int i = 0; i < vTasks.size(); ++i)
{
    // ...
    console_info("Task Res: {}", string{ (const char*)pt->Result() });
}

无论是result.get()还是ThreadTask::Result都是阻塞接口,在容器内挨个轮询都会阻塞,不能优先地返回已经完后的任务。

这一点 Python 的接口就实现了实时取出完成任务的机制:

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    # 动态地返回已经完成的结果
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

C++ 要实现优先返回完成的任务,需要再添加临界区变量完成队列,过多的临界区变量必然使用同步原语,必然会牺牲一些性能。此处的优化完全视大家需求而定。

预告

C++造轮子系列已经相继完成了 io 相关和线程相关的基础轮子,下一步就是迈向与外部通信的轮子,关于和网络通信的模块。这一部分成熟且优秀的三方轮子太多了,本系列先尝试学习封装一个 tcp,再考虑往上封装 http 或 rcp.

Reference