multi-threaded server, pthreads & sleep

From: Parahat Melayev (parahat_at_gmail.com)
Date: 03/16/05


Date: 16 Mar 2005 04:42:42 -0800

I am trying to writa a multi-client & multi-threaded TCP server.
There is a thread pool. Each thread in the pool will handle requests
of multiple clients.

But here I have a problem. I find a solution but it is not how it must
be... i think. When threads working without sleep(1) I can't get
response from server but when I put sleep(1) in thread function as you
will see in code, everything works fine and server can make echo
nearly for 40000 clients between 1 or 2 seconds. What am I doing wrong
here?

thanks,
parahat

------------ server.c -------------
$ cc server.c -o server -lpthread
-----------------------------------
#include <pthread.h>
#include <stdio.h>
#include <sys/timeb.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>

#define MAX_CLIENT_PER_THREAD 300
#define MAX_THREAD 200
#define PORT 3355
#define MAX_CLIENT_BUFFER 256
/*#define DEBUG*/

int listenfd;

typedef struct {
        pthread_t tid;
        int client_count;
        int clients[MAX_CLIENT_PER_THREAD];
} Thread;

pthread_cond_t new_connection_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t new_connection_mutex = PTHREAD_MUTEX_INITIALIZER;

Thread threads[MAX_THREAD];

void nonblock(int sockfd)
{
        int opts;
        opts = fcntl(sockfd, F_GETFL);
        if(opts < 0)
        {
                perror("fcntl(F_GETFL)\n");
                exit(1);
        }
        opts = (opts | O_NONBLOCK);
        if(fcntl(sockfd, F_SETFL, opts) < 0)
        {
                perror("fcntl(F_SETFL)\n");
                exit(1);
        }
}

void *thread_init_func(void *arg)
{
        int tid = (int) arg;
        
        int readsocks;
        int i;
        char buffer[MAX_CLIENT_BUFFER];
        char c;
        int n;
#ifdef DEBUG
        printf("thread %d created\n", tid);
        printf("sizeof thread.clients: %d\n", sizeof(threads[tid].clients));
#endif
        memset((int *) &threads[tid].clients, 0,
sizeof(threads[tid].clients));
        memset((char *) &buffer, 0, sizeof(buffer));
        while(1)
        {
#ifdef DEBUG
                printf("thread %d running, client count: %d\n", tid,
threads[tid].client_count);
                sleep(3);
#endif
                sleep(1); /* <-- it works ??? :-| */

                for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
                {
                        if(threads[tid].clients[i] != 0)
                        {
                                n = recv(threads[tid].clients[i], buffer, MAX_CLIENT_BUFFER, 0);
                                if(n == 0)
                                {
#ifdef DEBUG
                                        printf("client %d closed connection 0\n",
threads[tid].clients[i]);
#endif
                                        threads[tid].clients[i] = 0;
                                        threads[tid].client_count--;
                                        memset((char *) &buffer, 0, strlen(buffer));
                                }
                                else if(n < 0)
                                {
                                        if(errno == EAGAIN)
                                        {
#ifdef DEBUG
                                                printf("errno: EAGAIN\n");
#endif
                                        }
                                        else {
#ifdef DEBUG
                                                printf("errno: %d\n", errno);
#endif
                                                threads[tid].clients[i] = 0;
                                                threads[tid].client_count--;
                                                memset( (char *) &buffer, 0, strlen(buffer));
#ifdef DEBUG
                                                printf("client %d closed connection -1\n",
threads[tid].clients[i]);
#endif
                                        }
                                }
                                else {
#ifdef DEBUG
                                        printf("%d bytes received from %d - %s\n", n,
threads[tid].clients[i], buffer);
#endif
                                        
                                        send(threads[tid].clients[i], buffer, strlen(buffer), 0);
                                        memset((char *) &buffer, 0, strlen(buffer));
                                }
                        }
                }
        }
}

int choose_thread()
{
        int i=MAX_THREAD-1;
        int min = 0;
        while(i > -1)
        {
                if(threads[i].client_count < threads[i-1].client_count)
                {
                        min = i;
                        break;
                }
                i--;
        }
        return min;
}

int main()
{
        char c;
        struct sockaddr_in srv, cli;
        int clifd;
        int tid;
        int i;
        int choosen;

        signal(SIGPIPE, SIG_IGN);
        
        if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
        {
                perror("sockfd\n");
                exit(1);
        }
        bzero(&srv, sizeof(srv));
        srv.sin_family = AF_INET;
        srv.sin_addr.s_addr = INADDR_ANY;
        srv.sin_port = htons(PORT);

        if( bind(listenfd, (struct sockaddr *) &srv, sizeof(srv)) < 0)
        {
                perror("bind\n");
                exit(1);
        }
        
        listen(listenfd, 1024);

        
        /* create threads */
        for(i = 0; i < MAX_THREAD; i++)
        {
                pthread_create(&threads[i].tid, NULL, &thread_init_func, (void *)
i);
                threads[i].client_count = 0;
        }
        

        for( ; ; )
        {
                clifd = accept(listenfd, NULL, NULL);
                nonblock(clifd);

                pthread_mutex_lock(&new_connection_mutex);
                
                choosen = choose_thread();

                for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
                {
                        if(threads[choosen].clients[i] == 0)
                        {
#ifdef DEBUG
                                printf("before threads clifd\n");
#endif
                                threads[choosen].clients[i] = clifd;
#ifdef DEBUG
                                printf("after threads clifd\n");
#endif
                                threads[choosen].client_count++;
                                break;
                        }
                }

#ifdef DEBUG
                printf("choosen: %d\n", choosen);

                for(i = 0; i < MAX_THREAD; i++)
                {
                        printf("threads[%d].client_count:%d\n", i,
threads[i].client_count);
                }
#endif

                pthread_mutex_unlock(&new_connection_mutex);
        }

        if(errno)
        {
                printf("errno: %d", errno);
        }
        
        return 0;
}

===========================================================

--------------- test.c --------------------
$ cc test.c -o test
-------------------------------------------
#include <stdio.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>

#define MAX_CLIENT 1000
/*#define DEBUG*/

int PORT = 3355;
char *IP = "192.168.222.22";

enum {
        U_INT_SIZE = sizeof(unsigned int)
};

int list[MAX_CLIENT];
fd_set fdset;
int highfd = MAX_CLIENT;

void nonblock(int sockfd)
{
        int opts;
        opts = fcntl(sockfd, F_GETFL);
        if(opts < 0)
        {
                perror("fcntl(F_GETFL)\n");
                exit(1);
        }
        opts = (opts | O_NONBLOCK);
        if(fcntl(sockfd, F_SETFL, opts) < 0)
        {
                perror("fcntl(F_SETFL)\n");
                exit(1);
        }
}

void cli_con()
{
        struct sockaddr_in srv;
        struct hostent *h_name;
        int clifd;
        int n;

        bzero(&srv, sizeof(srv));
        srv.sin_family = AF_INET;
        srv.sin_port = htons(PORT);
        inet_pton(AF_INET, IP, &srv.sin_addr);
        
        if( (clifd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
        {
                printf("ERROR clifd\n");
                exit(1);
        }

        if(connect(clifd, (struct sockaddr *) &srv, sizeof(srv)) < 0)
        {
                printf("ERROR connect\n");
                exit(1);
        }

        nonblock(clifd);
        for(n = 0; (n < MAX_CLIENT) && (clifd != -1); n++)
        {
                if(list[n] == 0) {
#ifdef DEBUG
                        printf("connected %d\n", clifd);
#endif
                        list[n] = clifd;
                        clifd = -1;
                }
        }
        
        if(clifd != -1) {
                printf("list FULL");
        }
}

void set_list() {
        int n;
        FD_ZERO(&fdset);

        for(n = 0; n < MAX_CLIENT; n++)
        {
                if(list[n] != 0) {
                        FD_SET(list[n], &fdset);
                        if(list[n] > highfd)
                                highfd = list[n];
                }
        }
}

void recv_data(int num)
{
        int n;
        char buffer[1024];
        n = recv(list[num], buffer, 1023, 0);
        if(n > 0)
        {
#ifdef DEBUG
                printf("%d bytes from %d\n", n, list[num]);
                printf("%s", buffer);
#endif
        }
        else if(n < 0)
        {
                if(errno != EAGAIN)
                        printf("client %d disconnected\n", list[num]);
        }
        else if(n == 0)
        {
                printf("client %d disconnected\n", list[num]);
        }
}

void send_data(int num)
{
        int n;
        char buffer[1024];
        sprintf(buffer, "TEST list[%d]=%d \r\n", num, list[num]);
        if((n = send(list[num], buffer, strlen(buffer), 0)) > 0)
        {
#ifdef DEBUG
                printf("%d bytes sent from %d\n", n, list[num]);
#endif
        }
}

void scan_clients()
{
#ifdef DEBUG
        printf("scan_clients\n");
#endif
        int num;
        
        for(num = 0; num < MAX_CLIENT; num++) {
                if(FD_ISSET(list[num], &fdset))
                        recv_data(num);
        }

        for(num = 0; num < MAX_CLIENT; num++)
                send_data(num);
}

int main()
{
        int readsocks;
        int i=0;
        struct timeval tv;
        tv.tv_sec = 0;
        tv.tv_usec = 10;
        
        while(i < MAX_CLIENT)
        {
                cli_con();
                i++;
        }
        i=0;
        while(i < MAX_CLIENT)
        {
                printf("list[%d] = %d\n",i,list[i]);
                i++;
        }
        i = 0;
        while(1)
        {
                set_list();
                readsocks = select(highfd+1, &fdset, NULL, NULL, &tv);
                if(readsocks < 0) {
                        perror("select\n");
                        exit(1);
                }
                 
                /*printf("else scan_clients\n");*/
                scan_clients();
#ifdef DEBUG
                sleep(4);
#endif
        }
        return 0;
}



Relevant Pages