00001 using System;
00002 using System.IO;
00003 using System.Runtime.CompilerServices;
00004 using System.Threading;
00005
00006 namespace Tamir.Streams
00007 {
00008
00009
00010
00011
00012
00013
00014
00035 public class PipedInputStream : Tamir.SharpSsh.java.io.InputStream
00036 {
// ---- Pipe state flags -------------------------------------------------

/// <summary>Set to true by receivedLast(); read() returns -1 once this is set and the buffer is empty.</summary>
internal bool closedByWriter;

/// <summary>Set to true by close(); subsequent read()/receive() calls throw "Pipe closed".</summary>
internal volatile bool closedByReader;

/// <summary>
/// Checked by read() and receive(); never assigned inside this class
/// (presumably set by PipedOutputStream.connect - TODO confirm).
/// </summary>
internal bool connected = false;

// ---- Threads last seen on each end of the pipe ------------------------

/// <summary>Thread that most recently entered read(); its liveness is checked by the writer side.</summary>
internal Thread readSide;

/// <summary>Thread that most recently entered receive(); its liveness is checked by the reader side.</summary>
internal Thread writeSide;

// ---- Circular buffer --------------------------------------------------

/// <summary>Size, in bytes, of the circular buffer.</summary>
internal const int PIPE_SIZE = 1024;

/// <summary>Circular buffer holding bytes written but not yet read.</summary>
internal byte[] buffer = new byte[PIPE_SIZE];

/// <summary>
/// Index at which the next received byte is stored. A negative value marks an
/// empty buffer; a value equal to m_out marks a full buffer.
/// </summary>
internal int m_in = -1;

/// <summary>Index of the next byte to hand to the reader.</summary>
internal int m_out;
00075
/// <summary>
/// Creates a piped input stream and immediately connects it to
/// <paramref name="src"/> via connect().
/// </summary>
/// <param name="src">The piped output stream to connect to.</param>
public PipedInputStream(PipedOutputStream src)
{
    this.connect(src);
}
00090
/// <summary>
/// Creates a piped input stream that is not yet connected. read() and
/// receive() throw "Pipe not connected" until the connected flag is set.
/// </summary>
public PipedInputStream()
{
    // Nothing to initialize: all state is set up by the field initializers.
}
00104
/// <summary>
/// Connects this input stream to <paramref name="src"/> by delegating to
/// <c>src.connect(this)</c>; all connection bookkeeping happens there.
/// </summary>
/// <param name="src">The piped output stream to connect to.</param>
public virtual void connect(PipedOutputStream src)
{
    // The output stream owns the handshake; this side only hands itself over.
    src.connect(this);
}
00133
/// <summary>
/// Stores a single byte in the circular buffer, blocking in awaitSpace()
/// while the buffer is full.
/// </summary>
/// <param name="b">Value whose low 8 bits are stored.</param>
/// <exception cref="IOException">
/// Raised by checkStateForReceive() or awaitSpace() when the pipe is
/// unconnected, closed, or the reading thread is no longer alive.
/// </exception>
[MethodImpl(MethodImplOptions.Synchronized)]
internal void receive(int b)
{
    checkStateForReceive();
    writeSide = Thread.CurrentThread;

    // Equal indices mean the buffer is full: wait for the reader to drain it.
    if (m_in == m_out)
    {
        awaitSpace();
    }

    // A negative write index marks an empty buffer: restart both indices.
    if (m_in < 0)
    {
        m_in = 0;
        m_out = 0;
    }

    buffer[m_in] = (byte)(b & 0xFF);

    // Advance the write index, wrapping at the end of the buffer.
    if (++m_in >= buffer.Length)
    {
        m_in = 0;
    }
}
00159
/// <summary>
/// Copies <paramref name="len"/> bytes from <paramref name="b"/>, starting at
/// <paramref name="off"/>, into the circular buffer. The data is transferred
/// in contiguous chunks, blocking in awaitSpace() whenever the buffer is full.
/// </summary>
/// <param name="b">Source array.</param>
/// <param name="off">Index in <paramref name="b"/> of the first byte to copy.</param>
/// <param name="len">Number of bytes to copy.</param>
/// <exception cref="IOException">
/// Raised by checkStateForReceive() or awaitSpace() when the pipe is
/// unconnected, closed, or the reading thread is no longer alive.
/// </exception>
[MethodImpl(MethodImplOptions.Synchronized)]
internal void receive(byte[] b, int off, int len)
{
    checkStateForReceive();
    writeSide = Thread.CurrentThread;

    int sourceIndex = off;
    int remaining = len;
    while (remaining > 0)
    {
        // Equal indices mean the buffer is full: wait for the reader.
        if (m_in == m_out)
        {
            awaitSpace();
        }

        // Work out how many bytes fit contiguously at the write index.
        int chunk;
        if (m_in > m_out)
        {
            // Free space runs from the write index to the end of the array.
            chunk = buffer.Length - m_in;
        }
        else if (m_in == m_out)
        {
            // No room; the assertion below rejects this state.
            chunk = 0;
        }
        else if (m_in == -1)
        {
            // Empty buffer: restart at index 0 with the whole array free.
            m_in = 0;
            m_out = 0;
            chunk = buffer.Length;
        }
        else
        {
            // Free space runs from the write index up to the read index.
            chunk = m_out - m_in;
        }

        chunk = Math.Min(chunk, remaining);
        assert(chunk > 0);

        Array.Copy(b, sourceIndex, buffer, m_in, chunk);
        remaining -= chunk;
        sourceIndex += chunk;
        m_in += chunk;

        // Wrap the write index at the end of the buffer.
        if (m_in >= buffer.Length)
        {
            m_in = 0;
        }
    }
}
00208
/// <summary>
/// Verifies that the pipe can accept data from the writer.
/// </summary>
/// <exception cref="IOException">
/// "Pipe not connected" when the connected flag is false; "Pipe closed" when
/// either end has closed; "Read end dead" when a reader thread has been
/// recorded and is no longer alive.
/// </exception>
private void checkStateForReceive()
{
    if (!connected)
    {
        throw new IOException("Pipe not connected");
    }

    if (closedByWriter || closedByReader)
    {
        throw new IOException("Pipe closed");
    }

    bool readerDied = readSide != null && !readSide.IsAlive;
    if (readerDied)
    {
        throw new IOException("Read end dead");
    }
}
00224
/// <summary>
/// Blocks the writer while the buffer is full (m_in equals m_out), waking any
/// waiting reader and re-checking once per second. Both receive() overloads
/// call this while holding the monitor on this instance, which
/// Monitor.Wait requires.
/// </summary>
/// <exception cref="IOException">
/// "Pipe broken" when a reader thread has been recorded and is no longer alive.
/// </exception>
/// <exception cref="ThreadInterruptedException">
/// Propagated unchanged from Monitor.Wait when the waiting thread is interrupted.
/// </exception>
private void awaitSpace()
{
    while (m_in == m_out)
    {
        if ((readSide != null) && !readSide.IsAlive)
        {
            throw new IOException("Pipe broken");
        }

        // Wake a reader that may be waiting for data, then release the
        // monitor for up to a second so it can drain the buffer. The timeout
        // guarantees the reader-liveness check above is repeated.
        // ThreadInterruptedException is deliberately not caught: letting it
        // propagate preserves the original stack trace.
        Monitor.PulseAll(this);
        Monitor.Wait(this, 1000);
    }
}
00247
/// <summary>
/// Marks the pipe as closed by the writer and wakes every thread waiting on
/// this instance, so a blocked read() can observe the flag.
/// </summary>
[MethodImpl(MethodImplOptions.Synchronized)]
internal void receivedLast()
{
    this.closedByWriter = true;

    // Readers blocked in Monitor.Wait re-check closedByWriter once woken.
    Monitor.PulseAll(this);
}
00259
/// <summary>
/// Reads the next byte from the pipe, blocking while the buffer is empty.
/// </summary>
/// <returns>
/// The byte as a value in the range 0-255, or -1 when the writer has closed
/// the pipe and no buffered data remains.
/// </returns>
/// <exception cref="IOException">
/// "Pipe not connected" when the connected flag is false; "Pipe closed" when
/// this end has been closed; "Write end dead" when the buffer is empty and the
/// recorded writer thread died without closing; "Pipe broken" when the writer
/// thread is found dead on repeated checks while waiting for data.
/// </exception>
/// <exception cref="ThreadInterruptedException">
/// Propagated unchanged from Monitor.Wait when the waiting thread is interrupted.
/// </exception>
[MethodImpl(MethodImplOptions.Synchronized)]
public virtual int read()
{
    if (!connected)
    {
        throw new IOException("Pipe not connected");
    }
    else if (closedByReader)
    {
        throw new IOException("Pipe closed");
    }
    else if (writeSide != null && !writeSide.IsAlive
        && !closedByWriter && (m_in < 0))
    {
        throw new IOException("Write end dead");
    }

    readSide = Thread.CurrentThread;

    // Number of dead-writer checks tolerated before the pipe is declared broken.
    int trials = 2;
    while (m_in < 0)
    {
        if (closedByWriter)
        {
            // Writer closed cleanly and the buffer is drained: end of stream.
            return -1;
        }
        if ((writeSide != null) && (!writeSide.IsAlive) && (--trials < 0))
        {
            throw new IOException("Pipe broken");
        }

        // Wake a writer that may be waiting for space, then release the
        // monitor for up to a second while waiting for data to arrive.
        // ThreadInterruptedException is deliberately not caught: letting it
        // propagate preserves the original stack trace.
        Monitor.PulseAll(this);
        Monitor.Wait(this, 1000);
    }

    int ret = buffer[m_out++] & 0xFF;
    if (m_out >= buffer.Length)
    {
        // Wrap the read index at the end of the buffer.
        m_out = 0;
    }
    if (m_in == m_out)
    {
        // The read index caught up with the write index: buffer is now empty.
        m_in = -1;
    }
    return ret;
}
00330
/// <summary>
/// Reads up to <paramref name="len"/> bytes into <paramref name="b"/> starting
/// at <paramref name="off"/>. Blocks (through read()) only for the first byte;
/// the rest are copied from whatever is already buffered.
/// </summary>
/// <param name="b">Destination array.</param>
/// <param name="off">Index in <paramref name="b"/> at which to store the first byte.</param>
/// <param name="len">Maximum number of bytes to read.</param>
/// <returns>
/// The number of bytes stored, 0 when <paramref name="len"/> is 0, or -1 when
/// read() reports end of stream.
/// </returns>
/// <exception cref="NullReferenceException"><paramref name="b"/> is null.</exception>
/// <exception cref="IndexOutOfRangeException">
/// <paramref name="off"/> and <paramref name="len"/> do not describe a valid
/// range inside <paramref name="b"/>.
/// </exception>
[MethodImpl(MethodImplOptions.Synchronized)]
public override int read(byte[] b, int off, int len)
{
    if (b == null)
    {
        throw new NullReferenceException();
    }

    // The final term catches integer overflow of off + len.
    bool badRange = (off < 0) || (off > b.Length) || (len < 0) ||
        ((off + len) > b.Length) || ((off + len) < 0);
    if (badRange)
    {
        throw new IndexOutOfRangeException();
    }

    if (len == 0)
    {
        return 0;
    }

    // The first byte goes through read() so an empty pipe blocks or reports EOF.
    int first = read();
    if (first < 0)
    {
        return -1;
    }
    b[off] = (byte) first;

    // Drain further bytes without blocking until the buffer empties
    // (m_in turns negative) or the request is satisfied.
    int copied = 1;
    while ((m_in >= 0) && (copied < len))
    {
        b[off + copied] = buffer[m_out];
        m_out++;
        copied++;

        if (m_out >= buffer.Length)
        {
            m_out = 0;
        }
        if (m_in == m_out)
        {
            // The read index caught up with the write index: buffer is empty.
            m_in = -1;
        }
    }
    return copied;
}
00390
/// <summary>
/// Reports how many bytes are currently buffered and can be read without blocking.
/// </summary>
/// <returns>
/// 0 when the buffer is empty (negative m_in), the full buffer length when the
/// indices are equal, otherwise the distance from the read index to the write
/// index, accounting for wrap-around.
/// </returns>
[MethodImpl(MethodImplOptions.Synchronized)]
public virtual int available()
{
    if (m_in < 0)
    {
        return 0;
    }

    if (m_in == m_out)
    {
        return buffer.Length;
    }

    // A negative distance means the write index has wrapped past the end.
    int distance = m_in - m_out;
    return distance > 0 ? distance : distance + buffer.Length;
}
00413
/// <summary>
/// Closes the reading end of the pipe: flags the pipe as closed by the reader
/// and discards buffered data by marking the buffer empty.
/// </summary>
public override void close()
{
    // The flag is volatile and is set before taking the monitor.
    closedByReader = true;

    Monitor.Enter(this);
    try
    {
        m_in = -1;
    }
    finally
    {
        Monitor.Exit(this);
    }
}
00428
/// <summary>
/// Internal sanity check used by receive().
/// </summary>
/// <param name="exp">Condition that is expected to hold.</param>
/// <exception cref="Exception">"Assertion failed!" when <paramref name="exp"/> is false.</exception>
private void assert(bool exp)
{
    if (exp)
    {
        return;
    }

    throw new Exception("Assertion failed!");
}
00434
00436
00437
/// <summary>
/// Stream-style entry point; forwards to read(byte[], int, int).
/// </summary>
/// <param name="buffer">Destination array (note: hides the internal buffer field inside this method).</param>
/// <param name="offset">Index in <paramref name="buffer"/> at which to start storing bytes.</param>
/// <param name="count">Maximum number of bytes to read.</param>
/// <returns>Whatever read(byte[], int, int) returns, including -1 at end of stream.</returns>
public override int Read(byte[] buffer, int offset, int count)
{
    // NOTE(review): System.IO.Stream.Read is documented to return 0 at end of
    // stream, whereas this forwards -1. Confirm callers depend on -1 before changing.
    int bytesRead = this.read(buffer, offset, count);
    return bytesRead;
}
00442
/// <summary>
/// Stream-style entry point; forwards to read().
/// </summary>
/// <returns>The next byte as a value 0-255, or -1 at end of stream.</returns>
public override int ReadByte()
{
    int value = this.read();
    return value;
}
00447
/// <summary>
/// No-op: this stream is read-only (CanWrite is false), so the value is silently discarded.
/// </summary>
/// <param name="value">Ignored.</param>
public override void WriteByte(byte value)
{
    // Intentionally empty.
}
00451
/// <summary>
/// No-op: this stream is read-only (CanWrite is false), so the data is silently discarded.
/// </summary>
/// <param name="buffer">Ignored.</param>
/// <param name="offset">Ignored.</param>
/// <param name="count">Ignored.</param>
public override void Write(byte[] buffer, int offset, int count)
{
    // Intentionally empty.
}
/// <summary>
/// Stream-style close: runs the base class Close() first, then close() to
/// mark the pipe as closed by the reader.
/// </summary>
public override void Close()
{
    base.Close();
    close();
}
/// <summary>Always true: this stream supports reading.</summary>
public override bool CanRead
{
    get { return true; }
}
/// <summary>Always false: data enters the pipe through receive(), not through Write.</summary>
public override bool CanWrite
{
    get { return false; }
}
/// <summary>Always false: this stream does not support seeking.</summary>
public override bool CanSeek
{
    get { return false; }
}
/// <summary>No-op: this stream holds nothing that needs flushing.</summary>
public override void Flush()
{
    // Intentionally empty.
}
/// <summary>
/// Number of bytes currently buffered and readable without blocking.
/// Delegates to available() so that an empty pipe (negative m_in) reports 0
/// and the indices are read under the same monitor as every other accessor.
/// </summary>
public override long Length
{
    get
    {
        // available() handles the three states - empty, full, partially
        // filled (with wrap-around) - consistently and under the lock.
        return available();
    }
}
/// <summary>
/// Gets the current read index (m_out) within the circular buffer.
/// Setting the position is not supported.
/// </summary>
/// <exception cref="IOException">Always thrown by the setter.</exception>
public override long Position
{
    get { return m_out; }
    set { throw new IOException("Setting the position of this stream is not supported"); }
}
/// <summary>
/// Not supported: the length of a pipe cannot be set.
/// </summary>
/// <param name="value">Ignored.</param>
/// <exception cref="IOException">Always thrown.</exception>
public override void SetLength(long value)
{
    const string message = "Setting the length of this stream is not supported";
    throw new IOException(message);
}
/// <summary>
/// Seeking is not supported (CanSeek is false); the request is ignored.
/// </summary>
/// <param name="offset">Ignored.</param>
/// <param name="origin">Ignored.</param>
/// <returns>Always 0.</returns>
public override long Seek(long offset, SeekOrigin origin)
{
    const long unchangedPosition = 0;
    return unchangedPosition;
}
00516 }
00517 }