`

一个简单的linux线程池

阅读更多

线程池:简单地说,线程池 就是预先创建好一批线程,方便、快速地处理收到的业务。比起传统的到来一个任务,即时创建一个线程来处理,节省了线程的创建和回收的开销,响应更快,效率更高。

 

在linux中,使用的是posix线程库,首先介绍几个常用的函数:

1 线程的创建和取消函数

pthread_create

创建线程

pthread_join

合并线程

pthread_cancel

取消线程

2 线程同步函数

pthread_mutex_lock

pthread_mutex_unlock

pthread_cond_signal

pthread_cond_wait

 

关于函数的详细说明,参考man手册

 

线程池的实现:

线程池的实现主要分为三部分,线程的创建、添加任务到线程池中、工作线程从任务队列中取出任务进行处理。

主要有两个类来实现,CTask,CThreadPool

/**
执行任务的类,设置任务数据并执行
**/

class CTask
{
protected:
 string m_strTaskName;  //任务的名称
 void* m_ptrData;       //要执行的任务的具体数据
public:
 CTask(){}
 CTask(string taskName)
 {
  this->m_strTaskName = taskName;
  m_ptrData = NULL;
 }
 virtual int Run()= 0;
 void SetData(void* data);    //设置任务数据
};

 

任务类是个虚类,所有的任务要从CTask类中继承 ,实现run接口,run接口中需要实现的就是具体解析任务的逻辑。m_ptrData是指向任务数据的指针,可以是简单数据类型,也可以是自定义的复杂数据类型。

 

线程池类

/**
线程池
**/

class CThreadPool
{
private:
 vector<CTask*> m_vecTaskList;         //任务列表
 int m_iThreadNum;                            //线程池中启动的线程数           
 static vector<pthread_t> m_vecIdleThread;   //当前空闲的线程集合
 static vector<pthread_t> m_vecBusyThread;   //当前正在执行的线程集合
 static pthread_mutex_t m_pthreadMutex;    //线程同步锁
 static pthread_cond_t m_pthreadCond;    //线程同步的条件变量
protected:
 static void* ThreadFunc(void * threadData); //新线程的线程函数
 static int MoveToIdle(pthread_t tid);   //线程执行结束后,把自己放入到空闲线程中
 static int MoveToBusy(pthread_t tid);   //移入到忙碌线程中去
 int Create();          //创建所有的线程
public:
 CThreadPool(int threadNum);
 int AddTask(CTask *task);      //把任务添加到线程池中
 int StopAll();
};

 

当线程池对象创建后,启动一批线程,并把所有的线程放入空闲列表中,当有任务到达时,某一个线程取出任务并进行处理。

线程之间的同步用线程锁和条件变量。

这个类的对外接口有两个:

AddTask函数把任务添加到线程池的任务列表中,并通知线程进行处理。当任务到到时,把任务放入m_vecTaskList任务列表中,并用pthread_cond_signal唤醒一个线程进行处理。

StopAll函数停止所有的线程

 

************************************************

代码:

××××××××××××××××××××CThread.h

 

#ifndef __CTHREAD
#define __CTHREAD
#include <vector>
#include <string>
#include <pthread.h>

using namespace std;

/**
执行任务的类,设置任务数据并执行
**/
class CTask
{
protected:
 string m_strTaskName;  //任务的名称
 void* m_ptrData;       //要执行的任务的具体数据
public:
 CTask(){}
 CTask(string taskName)
 {
  this->m_strTaskName = taskName;
  m_ptrData = NULL;
 }
 virtual int Run()= 0;
 void SetData(void* data);    //设置任务数据
};

/**
线程池
**/
class CThreadPool
{
private:
 vector<CTask*> m_vecTaskList;         //任务列表
 int m_iThreadNum;                            //线程池中启动的线程数           
 static vector<pthread_t> m_vecIdleThread;   //当前空闲的线程集合
 static vector<pthread_t> m_vecBusyThread;   //当前正在执行的线程集合
 static pthread_mutex_t m_pthreadMutex;    //线程同步锁
 static pthread_cond_t m_pthreadCond;    //线程同步的条件变量
protected:
 static void* ThreadFunc(void * threadData); //新线程的线程函数
 static int MoveToIdle(pthread_t tid);   //线程执行结束后,把自己放入到空闲线程中
 static int MoveToBusy(pthread_t tid);   //移入到忙碌线程中去
 int Create();          //创建所有的线程
public:
 CThreadPool(int threadNum);
 int AddTask(CTask *task);      //把任务添加到线程池中
 int StopAll();
};

#endif

 

 

 

类的实现为:

××××××××××××××××××××CThread.cpp

 

#include "CThread.h"
#include <string>
#include <iostream>

using namespace std;

void CTask::SetData(void * data)
{
 m_ptrData = data;
}

vector<pthread_t> CThreadPool::m_vecBusyThread;
vector<pthread_t> CThreadPool::m_vecIdleThread;
pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;

CThreadPool::CThreadPool(int threadNum)
{
 this->m_iThreadNum = threadNum;
 Create();
}
int CThreadPool::MoveToIdle(pthread_t tid)
{
 vector<pthread_t>::iterator busyIter = m_vecBusyThread.begin();
 while(busyIter != m_vecBusyThread.end())
 {
  if(tid == *busyIter)
  {
   break;
  }
  busyIter++;
 }
 m_vecBusyThread.erase(busyIter);
 m_vecIdleThread.push_back(tid);
 return 0;
}

int CThreadPool::MoveToBusy(pthread_t tid)
{
 vector<pthread_t>::iterator idleIter = m_vecIdleThread.begin();
 while(idleIter != m_vecIdleThread.end())
 {
  if(tid == *idleIter)
  {
   break;
  }
  idleIter++;
 }
 m_vecIdleThread.erase(idleIter);
 m_vecBusyThread.push_back(tid);
 return 0;
}
void* CThreadPool::ThreadFunc(void * threadData)
{
 pthread_t tid = pthread_self();
 while(1)
 {
  pthread_mutex_lock(&m_pthreadMutex);
  pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex);
  cout << "tid:" << tid << " run" << endl;
  //get task
  vector<CTask*>* taskList = (vector<CTask*>*)threadData;
  vector<CTask*>::iterator iter = taskList->begin();
  while(iter != taskList->end())
  {
   
   MoveToBusy(tid);
   break;
  }
  CTask* task = *iter;
  taskList->erase(iter);
  pthread_mutex_unlock(&m_pthreadMutex);
  cout << "idel thread number:" << CThreadPool::m_vecIdleThread.size() << endl;
  cout << "busy thread number:" << CThreadPool::m_vecBusyThread.size() << endl;
  //cout << "task to be run:" << taskList->size() << endl;
  task->Run();
  
  //cout << "CThread::thread work" << endl;
  cout << "tid:" << tid << " idle" << endl;
  
 }
 return (void*)0;
}

int CThreadPool::AddTask(CTask *task)
{
 this->m_vecTaskList.push_back(task);
 pthread_cond_signal(&m_pthreadCond);
 return 0;
}
int CThreadPool::Create()
{
 for(int i = 0; i < m_iThreadNum;i++)
 {
  pthread_t tid = 0;
  pthread_create(&tid,NULL,ThreadFunc,&m_vecTaskList);
  m_vecIdleThread.push_back(tid);
 }
 return 0;
}

int CThreadPool::StopAll()
{
 vector<pthread_t>::iterator iter = m_vecIdleThread.begin();
 while(iter != m_vecIdleThread.end())
 {
  pthread_cancel(*iter);
  pthread_join(*iter,NULL);
  iter++;
 }

 iter = m_vecBusyThread.begin();
 while(iter != m_vecBusyThread.end())
 {
  pthread_cancel(*iter);
  pthread_join(*iter,NULL);
  iter++;
 }
 
 return 0;
}

简单示例:

××××××××test.cpp

#include "CThread.h"
#include <iostream>

using namespace std;

class CWorkTask: public CTask
{
public:
 CWorkTask()
 {}
 int Run()
 {
  cout << (char*)this->m_ptrData << endl;
  sleep(10);
  return 0;
 }
};
int main()
{
 CWorkTask taskObj;
 char szTmp[] = "this is the first thread running,haha success";
 taskObj.SetData((void*)szTmp);
 CThreadPool threadPool(10);
 for(int i = 0;i < 11;i++)
 {
  threadPool.AddTask(&taskObj);
 }
 while(1)
 {
  sleep(120);
 }
 return 0;
}

 

分享到:
评论

相关推荐

    linux 实现一个简单的线程池及工作

    linux 实现一个简单的线程池及工作 本实例演示了线程池的创建使用

    linux下的一个线程池类

    在linux下实现的一个简单的线程池,可以实现多任务机制,在linux下实现的一个简单的线程池,可以实现多任务机制

    linux线程池的C语言实现

    通常我们使用多线程的方式是,需要时创建一个新的线程,在这个新的线程里执行特定的...这在一般的应用里已经能够满足我们应用的需要,毕竟我们并不是什么时候都需要创建大量的线程,并在它们执行一个简单的任务后销毁。

    Linux下C线程池实现

    在Linux下用C写的一个简易线程池。系统是RedHat 9,gcc版本"gcc version 4.1.2 20071124 (Red Hat 4.1.2-42)"。文件夹里的源码是按工程组织好的,在文件夹下的test目录下面有一个小的测试程序和Makefile,编译后即可...

    linux c线程池

    linux pthreadpool实现和线程池的用处 简单易懂 互斥和信号量使用

    简单linux C++线程池

    简单linux C++线程池实现 vector存储任务,封装类实现线程池功能。新手学习线程池的好机会,Makefile编译可直接运行。

    Linux下Epoll+线程池的简单Web服务器

    这是一个简单的web服务器利用Linux下的线程池技术和Epoll机制 简单粗略有不足的欢迎指导

    thd_pool_v1.02.tar.bz2.zip_linux_linux 线程池_whale1ce_线程池

    Linux中的一个简单线程池,基于POSIX线程API。 管理线程将检索不必要的空闲线程以保持低资源消耗。

    LINUX通用线程池的构建

    关于LINUX通用线程池的构建 本文给出了一个通用的线程池框架,该框架将与线程执行相关的任务进行...文章的最后,我们给出一个简单示例程序,通过该示例程序,我们会发现,通过该线程池框架执行多线程任务是多么的简单。

    Linux C线程池简单实现实例

    主要介绍了Linux C线程池简单实现实例的相关资料,需要的朋友可以参考下

    linux下c++线程池

    一个简单好用的线程池,包括如何使用的demo,未考虑动态伸缩。 之前上传的那些线程池资源都不及这个。 用qtcreator可以直接打开。

    VC简单的线程池使用实例

    1.线程池管理器(ThreadPoolManager):用于创建并管理...网上的c/c++线程池多是linux下的,这个是VC6.0的线程池。其涉及的主要文件有:ThreadPoolManage、IThreadExcute、TemporarilyThread、ThreadManage、RegKeyIni。

    linux C++ 实现线程池(避免线程创建的耗时)

    linux下c++写的线程池,可以了解pthread_cond_timewait和pthread_detach的用法,自定义最大使用的线程数量,线程退出线程池的超时时间,任务优先级处理。

    一个简单的c++ 线程池

    一个简单的c++ 线程池,非常实用,只有threadpool.h 和threadpool.cpp 和main 三个文件,希望能给大家提供方便。

    线程池Linux C语言简单版本

    本线程池采用C语言实现。包括以下内容 &gt; - thread_pool_create:创建...主要的核心点集中在thread_pool_post和thread_worker两个函数中,这两个函数也构成了生产者-消费者模型。本文采用队列+互斥锁+条件变量实现。

    线程池_c线程池_

    linux线程池C语言编程,通过简单的例子,可以很好理解到线程池的作用

    C语言实现简单线程池.zip

    由C语言实现简单的线程池,任务调配,合理创建销毁线程处理任务

    linux下的线程池实现

    简单的linux多线程实现,含互斥锁的使用

    线程池实现,通过C语言实现

    linux下通过200行C代码实现简单线程池,附源代码和文档说明: 200行C代码实现简单线程池.doc threadpool.c

    C++写的线程池,环境是ubuntu(Linux)

    比较简单的线程池,自己练手用,大家有兴趣可以参考参考,也可以批评指正

Global site tag (gtag.js) - Google Analytics