Re: pipedstreams




"Tomasz Grobelny" <tomasz@xxxxxxxxxxxxxxxxxxxxxxx> wrote in message
news:egtjl3-bj2.ln1@xxxxxxxxxxxxxxxxx
What's wrong with the following code? It usually works fine, but from time to
time readInt returns values that have never been written to the stream (which
results in an ArrayIndexOutOfBoundsException). On the other hand, if I have
only one consumer thread and one producer thread, everything works just fine.
What may be the problem? Should I synchronize access to those streams in some
other way?

I haven't seen anything wrong in this yet, although it would help greatly if
you could post indented code and, for Pete's sake, capitalize your class names.
There are only a few places where you index an array here, but please say
which line the error is occurring on. It can't really be in the comm
constructor, but that still leaves send and receive. I'm guessing you mean the
error is in send, because the producer calls that with an index read from the
stream.

I noticed that the catch at about line 39 has no closing parenthesis. That is
unusual for cut-and-paste and makes me think the code isn't real (it was
re-typed and edited) or was never actually run. Are you sure you recompiled
after some other fix? The compiler may issue error messages but still leave
the old compiled code behind. Finally, I don't know whether it's a good idea to
be creating and discarding so many DataInput/OutputStreams. I doubt they would
interfere with the stream handling, but why risk it when it's so much easier
to put the wrapped streams into your Comm object?
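
To show what I mean by keeping the wrapped streams in the object, here is a
minimal sketch for a single pipe. The names IntPipe, readLock and writeLock are
mine, not from your code, and wrapping IOException in UncheckedIOException is
just to keep the example short. I've also used dedicated lock objects rather
than locking on the streams themselves: PipedInputStream waits on its own
monitor internally, so I'd rather not share that lock. Treat that as a
precaution on my part, not a diagnosis of your problem.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;

// Hypothetical helper: one pipe carrying ints, with the Data streams
// created once in the constructor instead of on every send/receive.
class IntPipe {
    private final DataInputStream in;
    private final DataOutputStream out;
    // Dedicated locks (an assumption of this sketch), so that a whole
    // four-byte int is written or read by one thread at a time.
    private final Object readLock = new Object();
    private final Object writeLock = new Object();

    IntPipe() {
        try {
            PipedInputStream pin = new PipedInputStream();
            PipedOutputStream pout = new PipedOutputStream(pin); // connects the two ends
            in = new DataInputStream(pin);
            out = new DataOutputStream(pout);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void send(int v) {
        synchronized (writeLock) {
            try {
                out.writeInt(v);
                out.flush();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    int receive() {
        synchronized (readLock) {
            try {
                return in.readInt(); // blocks until four bytes are available
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}
```

Your Comm object could then hold one IntPipe for the global channel and an
array of them for the per-consumer channels, with send and receive simply
picking the right pipe.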


--
Regards,
Tomasz Grobelny

import java.io.*;

class comm
{
PipedInputStream in;
PipedOutputStream out;
PipedInputStream[] in_t;
PipedOutputStream[] out_t;
public comm(int num)
{
in=new PipedInputStream();
out=new PipedOutputStream();
try{in.connect(out);}
catch (IOException e){System.err.println("An error occurred while connecting");}
in_t=new PipedInputStream[num];
out_t=new PipedOutputStream[num];
for(int i=0;i<num;i++)
{
in_t[i]=new PipedInputStream();
out_t[i]=new PipedOutputStream();
try{in_t[i].connect(out_t[i]);}
catch (IOException e){System.err.println("An error occurred while connecting ("+i+")");}
}
}
public synchronized void send(int dest, int v)
{
DataOutputStream dos;
if(dest==-1)
{
synchronized(out)
{
dos=new DataOutputStream(out);

try{dos.writeInt(v);dos.flush();out.flush();}
catch (IOException e
{System.err.println("Error while writing (-1)");}
}
}
else
{
synchronized(out_t[dest])
{
dos=new DataOutputStream(out_t[dest]);

try{dos.writeInt(v);dos.flush();out_t[dest].flush();}
catch (IOException e

^ Why no closing parenthesis here?


{System.err.println("Error while writing ("+dest+")");}
}
}
}
public int receive(int from)
{
if(from==-1)
{
synchronized(in)
{
try{return (new DataInputStream(in)).readInt();}
catch (IOException e
{System.err.println("Error while reading (-1)");return -1;}
}
}
else
{
synchronized(in_t[from])
{
try{return (new DataInputStream(in_t[from])).readInt();}
catch (IOException e

^ and here for that matter?

{System.err.println("Error while reading ("+from+")");return -1;}
}
}
}
}

class mp_producer extends Thread
{
comm global;
int id;
public mp_producer(comm g, int i)
{
global=g;
id=i;
}
public void run()
{
comm local;
for(int i=0;;i++)
{
System.out.println("Producer "+id+" Waiting for free space...");
int cd=global.receive(-1);
System.out.println("Producer "+id+" Started producing for "+cd);
try{Thread.sleep((int)(java.lang.Math.random()*1000));}catch(java.lang.InterruptedException e){}
System.out.println("Producer "+id+" Finished producing for "+cd+" item "+i);
global.send(cd, i);
}
}
}

class mp_consumer extends Thread
{
comm global;
int id;
public mp_consumer(comm g, int i)
{
global=g;
id=i;
}
public void run()
{
for(;;)
{
global.send(-1, id); //announce ability to consume an item
System.out.println("Consumer "+id+" Waiting for item...");
int d=global.receive(id);
System.out.println("Consumer "+id+" Started consuming: "+d);
try{Thread.sleep((int)(java.lang.Math.random()*1000));}catch(java.lang.InterruptedException e){}
System.out.println("Consumer "+id+" Finished consuming: "+d);
}
}
}

class mp
{
public static void main(String[] args)
{
int max=10;
comm globalcomm=new comm(max);
for(int i=0;i<max;i++)
{
mp_consumer cons=new mp_consumer(globalcomm, i);
mp_producer prod=new mp_producer(globalcomm, i);
cons.start();
prod.start();
}
}
}


Cheers,
Matt Humphrey matth@xxxxxxxxxxxxxx http://www.iviz.com/

