Re: Structure of the multitasking server



Since you are using Channels, I assume that you are talking about TCP/IP
servers. In that case you should look into using Check_Selector together
with Signalling_Fds and Socket_Sets instead of arrays.

Now, because of the way GNAT has written the selector routines and the way
C_Select is written, the maximum number of servers per Check_Selector is 27.
That is, each C_Select call can only monitor 32 sockets, but
GNAT.Sockets.Create_Selector uses two sockets and the TCP/IP sub-system
uses or predefines 3, so you are left with 32 - 2 - 3 = 27 user-defined
servers.
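
As a rough illustration of that suggestion, here is a minimal sketch of one
Check_Selector call watching several listening sockets held in a Socket_Set
rather than an array. The names Multi_Listen, Num_Servers and Base_Port, the
loopback address, and the ten-round limit are all made up for the example,
and it only uses the public GNAT.Sockets routines (not Signalling_Fds), so
treat it as a starting point rather than tested production code.

```ada
with Ada.Text_IO ;
with GNAT.Sockets ; use GNAT.Sockets ;

procedure Multi_Listen is
   --  Hypothetical values, chosen only for illustration.
   Num_Servers : constant := 3 ;
   Base_Port   : constant Port_Type := 50000 ;

   S         : Socket_Type ;
   Ready     : Socket_Type ;
   Client    : Socket_Type ;
   Addr      : Sock_Addr_Type ( Family_Inet ) ;
   Peer      : Sock_Addr_Type ( Family_Inet ) ;
   Selector  : Selector_Type ;
   Listening : Socket_Set_Type ;  --  every listening socket
   R_Set     : Socket_Set_Type ;  --  working copy, changed by Check_Selector
   W_Set     : Socket_Set_Type ;  --  stays empty: no write events wanted
   Status    : Selector_Status ;
begin
   Initialize ;  --  needed on older GNAT releases, obsolescent on newer ones
   Empty ( Listening ) ;
   Empty ( R_Set ) ;
   Empty ( W_Set ) ;

   --  Create one listening socket per port and keep it in the set.
   Addr.Addr := Inet_Addr ( "127.0.0.1" ) ;
   for I in 0 .. Num_Servers - 1 loop
      Create_Socket ( S ) ;
      Addr.Port := Base_Port + Port_Type ( I ) ;
      Bind_Socket ( S, Addr ) ;
      Listen_Socket ( S ) ;
      Set ( Listening, S ) ;
   end loop ;

   Create_Selector ( Selector ) ;

   --  Bounded loop so that the sketch terminates on its own.
   for Round in 1 .. 10 loop
      --  Check_Selector overwrites its sets, so work on a copy.
      Copy ( Listening, R_Set ) ;
      Check_Selector ( Selector, R_Set, W_Set, Status, 1.0 ) ;

      if Status = Completed then
         --  R_Set now holds only the sockets with a pending connection.
         loop
            Get ( R_Set, Ready ) ;
            exit when Ready = No_Socket ;
            Accept_Socket ( Ready, Client, Peer ) ;
            Ada.Text_IO.Put_Line ( "Connection from " & Image ( Peer ) ) ;
            Close_Socket ( Client ) ;
         end loop ;
      end if ;
   end loop ;

   --  Tidy up: drain the set to close every listening socket.
   loop
      Get ( Listening, Ready ) ;
      exit when Ready = No_Socket ;
      Close_Socket ( Ready ) ;
   end loop ;
   Close_Selector ( Selector ) ;
   Finalize ;
end Multi_Listen ;
```

The set is copied before each call because Check_Selector returns the ready
sockets in the same parameter it was given, which would otherwise lose the
sockets that had nothing pending.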

I found this client/server pair on the net. It shows how to use the
Check_Selector routine. Both programs were written in all caps and use the
Ada-95 spec, so you may need to adjust the Signalling_Fds routine calls for
Ada-2005 (GNAT-2008), but they do compile and execute under GNAT GPL 2007
on Linux.

Note: 1) I altered the spacing and capitalization to make them more
         readable.
      2) The procedure "Set_Socket_Option" may need to be commented
         out. Some TCP/IP systems do not like the GNAT version of that
         routine.
      3) I did not test on Windows.
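
Regarding note 2, an alternative to commenting the call out might be to wrap
it in a local block and carry on if it fails. The sketch below assumes that
the failure shows up as Socket_Error, which is how GNAT.Sockets normally
reports errors; the procedure name Guarded_Option is made up for the example.

```ada
with Ada.Text_IO ;
with Ada.Exceptions ;
with GNAT.Sockets ; use GNAT.Sockets ;

procedure Guarded_Option is
   Server : Socket_Type ;
begin
   Initialize ;  --  needed on older GNAT releases, obsolescent on newer ones
   Create_Socket ( Server ) ;

   begin
      --  Same call as in Pool_Server, but a failure is no longer fatal.
      Set_Socket_Option ( Server,
                          Socket_Level,
                          ( Reuse_Address, True ) ) ;
   exception
      when E : Socket_Error =>
         Ada.Text_IO.Put_Line
           ( "Reuse_Address not set: "
             & Ada.Exceptions.Exception_Message ( E ) ) ;
   end ;

   Close_Socket ( Server ) ;
   Finalize ;
end Guarded_Option ;
```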


--------------------------------------------------------------
--
-- Pool_Server
--
with GNAT.Sockets ;
use GNAT.Sockets ;
with Ada.Text_IO ;

procedure Pool_Server is

   MaxTasks : constant Positive := 5 ; -- buffer size
   type Index is mod MaxTasks ;

   function Rev ( S : String ) return String is
      Res : String ( S'Range ) ;
      J   : Integer := S'First ;
   begin
      for I in reverse S'Range loop
         Res ( J ) := S ( I ) ;
         J := J + 1 ;
      end loop ;
      return Res ;
   end Rev ;

   protected Aborted is
      procedure Set ;
      function Check return Boolean ;
   private
      Done : Boolean := False ;
   end Aborted ;

   protected body Aborted is
      procedure Set is
      begin
         Done := True ;
      end Set ;

      function Check return Boolean is
      begin
         return Done ;
      end Check ;
   end Aborted ;


   type Echo ;
   type Echo_Access is access Echo ;

   task type Echo is
      entry Start ( N_Sock : in Socket_Type ;
                    Self   : in Echo_Access ) ;
      entry ReStart ( N_Sock : in Socket_Type ) ;
   end Echo ;

   type Task_Array is array ( Index ) of Echo_Access ;

   protected Buffer is
      entry Deposit ( X : in Echo_Access ) ;
      entry Extract ( X : out Echo_Access ) ;
      function NumWaiting return Natural ;
   private
      Buf   : Task_Array ;
      I, J  : Index := 0 ;
      Count : Natural range 0 .. MaxTasks := 0 ;
   end Buffer ;


   task body Echo is
      Sock           : Socket_Type ;
      S              : Stream_Access ;
      Me             : Echo_Access ;
      Input_Selector : Selector_Type ;
      Input_Set      : Socket_Set_Type ;
      WSet           : Socket_Set_Type ;
      Input_Status   : Selector_Status ;
   begin
      -- set up selector
      Create_Selector ( Input_Selector ) ;

      -- Initialise socket sets
      -- WSet is always empty as we are not interested in output events
      -- Input_Set only ever contains one socket, namely Sock
      Empty ( Input_Set ) ;
      Empty ( WSet ) ;

      accept Start ( N_Sock : in Socket_Type ;
                     Self   : in Echo_Access ) do
         Sock := N_Sock ;
         Me := Self ;
      end Start ;

      loop
         begin -- block for exception handling
            S := Stream ( Sock ) ; -- set up stream on socket
            Boolean'Write ( S, True ) ; -- acknowledge connection

            loop
               -- check for input on Sock socket
               Set ( Input_Set, Sock ) ;

               -- time-out on check if no input within 0.5 second
               Check_Selector ( Input_Selector,
                                Input_Set,
                                WSet,
                                Input_Status,
                                0.5 ) ;
               if Input_Status = Completed then
                  -- we have input, so process it
                  declare
                     Str : String := String'Input ( S ) ;
                  begin
                     exit when Str = "quit" ;
                     String'Output ( S, Rev ( Str ) ) ;
                  end ;
               end if ;
               if Aborted.Check then
                  String'Output ( S, "Server aborted" ) ;
                  exit ;
               end if ;
            end loop ;

            Ada.Text_IO.New_Line ;
            Ada.Text_IO.Put_Line ( "Slave Closing Connection" ) ;
            ShutDown_Socket ( Sock, Shut_Read_Write ) ;
            -- release the descriptor as well, as the handler below does
            Close_Socket ( Sock ) ;
            Buffer.Deposit ( Me ) ;

         exception
            -- The most likely exception is the client quitting
            -- unexpectedly: close the socket and deposit ourselves
            -- in the buffer
            when others =>
               Ada.Text_IO.New_Line ;
               Ada.Text_IO.Put_Line ( "Connection closed unexpectedly" ) ;
               Close_Socket ( Sock ) ;
               Buffer.Deposit ( Me ) ;
         end ;

         select
            accept ReStart ( N_Sock : in Socket_Type ) do
               Sock := N_Sock ;
            end ReStart ;
         or
            -- terminate if all slaves are queued here and
            -- if the main server task has finished
            terminate ;
         end select ;

      end loop ;
   end Echo ;

   protected body Buffer is
      entry Deposit ( X : in Echo_Access ) when Count < MaxTasks is
      begin
         Buf ( I ) := X ;
         I := I + 1 ;
         Count := Count + 1 ;
      end Deposit ;

      entry Extract ( X : out Echo_Access ) when Count > 0 is
      begin
         X := Buf ( J ) ;
         J := J + 1 ;
         Count := Count - 1 ;
      end Extract ;

      function NumWaiting return Natural is
      begin
         return Count ;
      end NumWaiting ;
   end Buffer ;

   Server          : Socket_Type ;
   New_Sock        : Socket_Type ;
   Slave           : Echo_Access ;
   Addr            : Sock_Addr_Type ( Family_Inet ) ;
   Peer_Addr       : Sock_Addr_Type ( Family_Inet ) ;
   Avail           : Boolean := False ;
   Ch              : Character ;
   TotalTasks      : Natural := 0 ;
   Accept_Selector : Selector_Type ;
   Accept_Set      : Socket_Set_Type ;
   WSet            : Socket_Set_Type ;
   Accept_Status   : Selector_Status ;

begin -- main server task
   Ada.Text_IO.Put_Line ( "WARNING server loops forever." ) ;
   Ada.Text_IO.Put ( "Press A to terminate server and all " ) ;
   Ada.Text_IO.Put_Line ( "tasks immediately or press Q to " ) ;
   Ada.Text_IO.Put ( "accept no further connections and " ) ;
   Ada.Text_IO.Put ( "terminate gracefully when all clients " ) ;
   Ada.Text_IO.Put ( "are through." ) ;
   Ada.Text_IO.New_Line ;
   Initialize ;
   Create_Socket ( Server ) ;
   Addr := ( Family_Inet,
             Addresses ( Get_Host_By_Name ( Host_Name ), 1 ),
             50000 ) ;
   -- allow server address to be reused for multiple connections
   Set_Socket_Option ( Server,
                       Socket_Level,
                       ( Reuse_Address, True ) ) ;

   Bind_Socket ( Server, Addr ) ;
   Listen_Socket ( Server, 4 ) ;

   -- set up selector
   Create_Selector ( Accept_Selector ) ;

   -- Initialise socket sets
   -- WSet is always empty as we are not interested in output
   -- events. Accept_Set only ever contains one socket, namely
   -- Server
   Empty ( Accept_Set ) ;
   Empty ( WSet ) ;
   loop
      Ada.Text_IO.Get_Immediate ( Ch, Avail ) ;
      if Avail and then
        ( Ch = 'q' or Ch = 'Q' or Ch = 'a' or Ch = 'A' ) then
         exit ;
      end if ;

      -- check for input (connection requests) on Server socket
      Set ( Accept_Set, Server ) ;
      -- time-out on check if no request within 1 second
      Check_Selector ( Accept_Selector,
                       Accept_Set,
                       WSet,
                       Accept_Status,
                       1.0 ) ;

      if Accept_Status = Completed then
         -- must be an event on Server socket as it is the only
         -- one that we are checking.
         -- Hence the Accept_Socket call should not block.

         Accept_Socket ( Server, New_Sock, Peer_Addr ) ;
         Ada.Text_IO.New_Line ;
         Ada.Text_IO.Put_Line
           ( "Connection accepted -- allocating slave" ) ;

         if Buffer.NumWaiting = 0 and TotalTasks < MaxTasks then
            Slave := new Echo ; -- start new task
            TotalTasks := TotalTasks + 1 ;
            Ada.Text_IO.Put_Line ( "New slave task started" ) ;
            -- call entry Start to activate task
            Slave.Start ( New_Sock, Slave ) ;
         else
            Ada.Text_IO.Put_Line ( "Waiting for an idle slave task" ) ;
            Buffer.Extract ( Slave ) ;
            -- call entry ReStart to re-activate task
            Slave.ReStart ( New_Sock ) ;
            Ada.Text_IO.Put_Line ( " Idle slave task reactivated" ) ;
         end if ;
      end if ;
   end loop ;

   if Ch = 'a' or Ch = 'A' then
      -- signal slave tasks to terminate
      Aborted.Set ;
   end if ;

   -- tidy up
   Close_Selector ( Accept_Selector ) ;
   Empty ( Accept_Set ) ;

   Close_Socket ( Server ) ;
   Ada.Text_IO.New_Line ;
   Ada.Text_IO.Put_Line ( "Main server task exiting ..." ) ;
   Finalize ;
end Pool_Server ;


--------------------------------------------------------------
--
-- Pool_Client
--
with Gnat.Sockets ;
use Gnat.Sockets ;
with Ada.Command_Line ;
use Ada.Command_Line ;
with Ada.Text_IO ;
use Ada.Text_IO ;

procedure Pool_Client is
   Sock           : Socket_Type ;
   S              : Stream_Access ;
   Addr           : Sock_Addr_Type ( Family_Inet ) ;
   Msg            : String ( 1 .. 80 ) ;
   Last           : Natural ;
   B              : Boolean ;
   Read_Selector  : Selector_Type ;
   Read_Set, WSet : Socket_Set_Type ;
   Read_Status    : Selector_Status ;
begin
   Initialize ;
   Create_Socket ( Sock ) ;
   Addr := ( Family_Inet,
             Addresses ( Get_Host_By_Name ( Argument ( 1 ) ), 1 ),
             50000 ) ;
   Create_Selector ( Read_Selector ) ;
   Empty ( Read_Set ) ;
   Empty ( WSet ) ;

   Connect_Socket ( Sock, Addr ) ;
   S := Stream ( Sock ) ;
   -- wait for connection to be accepted
   Boolean'Read ( S, B ) ;

   loop
      Set ( Read_Set, Sock ) ;

      -- check for input on socket (server may be aborting)
      -- time-out immediately if no input pending
      -- We seem to need a small delay here (using zero seems to block
      -- forever)
      -- Is this a GNAT bug or AB misreading Check_Selector docs?

      Check_Selector ( Read_Selector,
                       Read_Set,
                       WSet,
                       Read_Status,
                       0.005 ) ;
      if Read_Status = Expired then
         Ada.Text_IO.Put ( "Message> " ) ; -- prompt user for message
         Ada.Text_IO.Get_Line ( Msg, Last ) ;

         -- send message to socket unless server is aborting
         String'Output ( S, Msg ( 1 .. Last ) ) ;
         exit when Msg ( 1 .. Last ) = "quit" ;
      end if ;

      declare
         -- receive message
         Str : String := String'Input ( S ) ;
      begin
         Ada.Text_IO.Put_Line ( Str ) ;
         exit when Str = "Server aborted" ;
      end ;
   end loop ;

   Ada.Text_IO.Put_Line ( "Client quitting ..." ) ;
   ShutDown_Socket ( Sock ) ;
   Close_Selector ( Read_Selector ) ;
   Finalize ;
exception
   when others =>
      Ada.Text_IO.Put_Line ( "Exception: Client quitting ..." ) ;
      Close_Socket ( Sock ) ;
      Close_Selector ( Read_Selector ) ;
      Finalize ;
end Pool_Client ;


In <8b4d1170-22e6-40d3-8ed1-096dc0163491@xxxxxxxxxxxxxxxxxxxxxxxxxxxx>, Maciej Sobczak <see.my.homepage@xxxxxxxxx> writes:
Hi all,

Imagine a server with a fixed number of worker tasks. There is no queue
of jobs, and jobs are immediately passed to one of the tasks that is
currently idle. There is a separate task (or just the main one) that
provides jobs for the worker tasks.

I am concerned with the proper structure of objects - I mean in the
sense of recommended Ada practice.
Obviously there is a need for some shared resource where the
requesting task will put the job and from where the worker task will
pick it up.

This is more or less what I came up with, where the "channel" is a
single processing pipeline:

   type Worker_State is (Idle, Ready, Working);

   protected type Channel_State is
      procedure Post (J : in Job_Type);
      entry Get_Job (J : out Job_Type);
      function Busy return Boolean;
   private
      State : Worker_State := Idle;
      Job_To_Do : Job_Type;
   end Channel_State;

   protected body Channel_State is

      procedure Post (J : in Job_Type) is
      begin
         if State /= Idle then
            raise Program_Error;
         end if;

         Job_To_Do := J;
         State := Ready;
      end Post;

      entry Get_Job (J : out Job_Type) when State = Ready is
      begin
         J := Job_To_Do;
         State := Working;
      end Get_Job;

      function Busy return Boolean is
      begin
         return State /= Idle;
      end Busy;

   end Channel_State;

   type Channel;
   task type Worker_Task (Ch : access Channel);

   type Channel is record
      State : Channel_State;
      Worker : Worker_Task (Channel'Access);
   end record;

   task body Worker_Task is
      Job : Job_Type;
   begin
      loop
         --  Get_Job is an entry of the State component of the channel
         Ch.State.Get_Job (Job);

         -- do the job ...

      end loop;
   end Worker_Task;

Max_Channels : constant := 5;

Channels : array (1 .. Max_Channels) of Channel;

My question is whether this is what a seasoned Ada programmer would
do.
Initially I tried to have two separate arrays, one for jobs and one
for worker tasks, but I found it difficult to link workers with their
respective jobs. After bundling them together in a single record that
is referenced from the task it worked and I actually find it
structured better.

The main task after constructing a job object finds some channel where
the worker task is not busy and posts the job to its shared state
component:

   loop
      Job := ...

      Found_Worker := False;
      for I in Channels'Range loop
         if not Channels (I).State.Busy then
            Channels (I).State.Post (Job);
            Found_Worker := True;
            exit;
         end if;
      end loop;

      if not Found_Worker then
         -- all pipelines are busy,
         -- but the overflow handling is not shown...
      end if;
   end loop;

All this works fine, but my question concerns the choice of language
constructs and idioms.

--
Maciej Sobczak * www.msobczak.com * www.inspirel.com

Database Access Library for Ada: www.inspirel.com/soci-ada
