`

Linux下基于TCP多线程服务与客户的实现

阅读更多
本例子是本人Linux下基于TCP多线程Socket编程的第二个例子,本例子是用C++实现的
服务器采用了面向对象的多线程,用到了队列与链表,信号量(操作系统中叫PV操作)
本例子中的队列与链表源代码在前面可以找到,这里就不多贴了
此系统所支持的自定义命令跟上个例子相同,这里就不多说明了

头文件Thread.h代码,里面就一个抽象类(抽象类没有自己的实例,一定要被子类所继承)
#ifndef THREAD_H_INCLUDED
#define THREAD_H_INCLUDED
/**
 * Abstract base class for objects that run some work when entered.
 *
 * ThreadEnter() is the public entry point; subclasses supply the work by
 * overriding Start(). Initialize() is an optional hook whose default
 * implementation does nothing.
 */
class Thread
{
public:
    // Virtual so that deleting a subclass through a Thread* runs the
    // subclass destructor (otherwise that delete is undefined behaviour).
    virtual ~Thread() {}

    // Public entry point; defined out of line.
    void ThreadEnter();

protected:
    // The work to run; must be provided by every concrete subclass.
    virtual void Start() = 0;

    // Optional set-up hook; empty by default.
    virtual void Initialize(){}
};
#endif // THREAD_H_INCLUDED

Thread.cpp代码:
#include "Thread.h"
void Thread::ThreadEnter()
{
    Start();
}
以下为服务器主要头文件Server.h代码:
#ifndef SERVER_H_INCLUDED
#define SERVER_H_INCLUDED
#include "Thread.h"
#include "LinkList.h"
#include "ThreadQueue.h"
#include <netinet/in.h>
#include <pthread.h>
#include <semaphore.h>
#define MSG_SIZE 1024
#define BACKLOG 10
#define PORT 8001
/**
 * TCP relay server: accepts clients, reads a message from any of them and
 * forwards it to every other connected client.
 *
 * One reader thread is created per accepted client (ReadMessageThread) and
 * a single sender thread (SendMessageThread) drains the message queue.
 */
class Server : public Thread
{
public:
    Server();
    ~Server();

public:
    // Starts the sender thread, then runs the accept loop (Initialize()).
    void Start();
    // Creates, binds and listens on the socket, then accepts clients.
    void Initialize();
    // Sender loop: forwards each queued message to the other clients.
    void SendMessage(Server* server);
    // pthread entry point for the sender loop; param is the Server*.
    static void* SendMessageThread(void* param);
    // Reader loop for one client: queues every message it receives.
    void ReadMessage(Server* server);
    // pthread entry point for a reader loop; param is the Server*.
    static void* ReadMessageThread(void* param);

private:
    // Non-copyable: the object owns a socket, a mutex and two semaphores,
    // none of which can be meaningfully duplicated. Declared, not defined.
    Server(const Server&);
    Server& operator=(const Server&);

    int sock_fd;              // listening socket
    int new_fds[BACKLOG];     // not referenced by the implementation shown here
    int new_fd;               // descriptor returned by the most recent accept()
    struct sockaddr_in serv_addr;  // local address the server binds to
    struct sockaddr_in dest_addr;  // peer address of the last accepted client
    pthread_mutex_t mutex;    // guards the client list
    pthread_t pth_r;          // most recently created reader thread
    pthread_t pth_s;          // sender thread
    sem_t sem_r;              // reader slots (initialised to 10 in the constructor)
    sem_t sem_s;              // messages waiting for the sender (initialised to 0)
    int thread_cout;          // number of clients accepted so far
    LinkList list;            // descriptors of connected clients
    ThreadQueue queue;        // messages handed from readers to the sender
    DataType *pData;          // message currently being forwarded by SendMessage
};
#endif // SERVER_H_INCLUDED

以下为Server.h实现的Server.cpp代码:
#include "Server.h"
#include <stdio.h>
#include <stdlib.h>
#include <strings.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <iostream>
#include <string>
using namespace std;
//------------------------------------------------------------------
// Puts every scalar member into a known state and sets up the
// synchronisation primitives.
//
// thread_cout must start at 0: Initialize() compares and increments it, and
// it was previously left uninitialised. The descriptors start at -1 (not a
// valid fd) and pData at NULL so nothing holds an indeterminate value.
Server::Server()
    : sock_fd(-1),
      new_fd(-1),
      thread_cout(0),
      pData(NULL)
{
    pthread_mutex_init(&mutex,NULL);
    // sem_r: reader slots, 10 available up front.
    sem_init(&sem_r,0,10);
    // sem_s: queued messages, none yet.
    sem_init(&sem_s,0,0);
}
//------------------------------------------------------------------
void Server::Initialize()
{
    sock_fd = socket(AF_INET,SOCK_STREAM,0);
    if(sock_fd < 0)
    {
        perror("socket fail!" );
        exit(-1);
    }
    serv_addr.sin_family = AF_INET;
serv_addr.sin_port = ntohs(PORT);
serv_addr.sin_addr.s_addr = INADDR_ANY;
bzero(&(serv_addr.sin_zero), 8);
    if (bind(sock_fd, (struct sockaddr*) &serv_addr,
        sizeof(struct sockaddr)) < 0)
    {
        perror("bind fail! ");
        exit(-1);
    }
    if(listen(sock_fd,BACKLOG) < 0)
    {
        perror("listen fail!" );
        exit(-1);
    }
    cout << "listenning......" << endl;
    socklen_t sin_size = sizeof(dest_addr);
    while(1)
    {
        if(thread_cout == BACKLOG - 1)
        {
            return;
        }
        new_fd = accept(sock_fd,(struct sockaddr *)&dest_addr,&sin_size);
        if(new_fd < 0)
        {
            perror("accept fail!" );
            exit(-1);
        }
        cout << "\nA client has connected to me "
    << inet_ntoa(dest_addr.sin_addr)
    << ":" << ntohs(dest_addr.sin_port)
    << endl;
        pthread_mutex_lock(&mutex);
        thread_cout++;
        list.InsertNode(thread_cout,new_fd);
        pthread_mutex_unlock(&mutex);
        pthread_create(&pth_r,NULL,ReadMessageThread,this);
    }
}
//------------------------------------------------------------------
void Server::Start()
{
    pthread_create(&pth_s,NULL,SendMessageThread,this);
    Initialize();
}
//------------------------------------------------------------------
void Server::ReadMessage(Server* server)
{
    int fd = server->new_fd;
    char buf[MSG_SIZE];
    int len;
/*
    pthread_mutex_lock(&mutex);
    int count = thread_cout - 1;
    pthread_mutex_unlock(&mutex);
*/
    while(1)
    {
        sem_wait(&sem_r);
        if ((len = read(fd, buf, MSG_SIZE)) == -1)
  {
   perror("read fail!");
   pthread_exit(NULL);
  }
  else if (len == 0)
        {
            cout << "Current client has disconnected to me" << endl;
            //cout << "close fd = " << fd << endl;
            close(fd);
            list.DeleteNode(fd);
            pthread_exit(NULL);
        }
        //cout << "read fd = " << fd << endl;
        buf[len] = '\0';
        DataType *data = new DataType();
        data->fd = fd;
        strcpy(data->buff,buf);
        cout << "\nRECEIVE: " << buf
             << " receive fd = " << fd << endl;
        //pthread_mutex_lock(&mutex);
        queue.EnterQueue(data);
        //pthread_mutex_unlock(&mutex);
        //delete data;
        sem_post(&sem_s);
    }
}
//------------------------------------------------------------------
void* Server::ReadMessageThread(void* param)
{
    Server* server = (Server *)param;
    server->ReadMessage(server);
    return NULL;
}
//------------------------------------------------------------------
void Server::SendMessage(Server* server)
{
    while(1)
    {
        sem_wait(&sem_s);
        int list_len = list.GetLength();

        int tNewfd,tReceivefd;
        //pthread_mutex_lock(&mutex);
        pData = queue.OutQueue();
        //int queue_len = queue.Queuelength();
        //pthread_mutex_unlock(&mutex);
        tReceivefd = pData->fd;
        //cout << "Received fd = " << tReceivefd << endl;
        pthread_mutex_lock(&mutex);
        for(int i = 1; i <= list_len; i++)
        {
            list.GetNodeData(i,tNewfd);
            //cout << "New fd = " << tNewfd << endl;
            //if(queue_len != 0)
            //{
                if(tNewfd != tReceivefd)
                {
                    write(tNewfd,pData->buff,sizeof(pData->buff));
                    cout << "Send to client successful! fd = " << tNewfd << endl;;
                }
            //}
        }
        delete pData;
        pthread_mutex_unlock(&mutex);
        sem_post(&sem_r);
    }
}
//------------------------------------------------------------------
void* Server::SendMessageThread(void* param)
{
    Server* server = (Server *)param;
    server->SendMessage(server);
    return NULL;
}
//------------------------------------------------------------------
// Closes the listening socket, waits for the last reader thread and the
// sender thread, then releases the mutex and semaphores created in the
// constructor.
//
// NOTE(review): pth_r is only assigned when a client is accepted, and only
// the most recent reader is joined. The sender loop in SendMessage has no
// exit, so the second join is not expected to return — confirm the intended
// shutdown sequence.
Server::~Server()
{
    close(sock_fd);
    pthread_join(pth_r,NULL);
    pthread_join(pth_s,NULL);

    // Counterparts of sem_init / pthread_mutex_init in the constructor.
    sem_destroy(&sem_r);
    sem_destroy(&sem_s);
    pthread_mutex_destroy(&mutex);
}
//------------------------------------------------------------------

以下为主文件main.cpp代码:
#include "Server.h"
int main(void)
{
    Server* server = new Server();
    server->ThreadEnter();
    return 0;
} <!--v:3.2-->
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics