1 // StreamBinder.cpp
2 
3 #include "StdAfx.h"
4 
5 #include "../../Common/MyCom.h"
6 
7 #include "StreamBinder.h"
8 
9 class CBinderInStream:
10   public ISequentialInStream,
11   public CMyUnknownImp
12 {
13   CStreamBinder *_binder;
14 public:
15   MY_UNKNOWN_IMP
16   STDMETHOD(Read)(void *data, UInt32 size, UInt32 *processedSize);
~CBinderInStream()17   ~CBinderInStream() { _binder->CloseRead(); }
CBinderInStream(CStreamBinder * binder)18   CBinderInStream(CStreamBinder *binder): _binder(binder) {}
19 };
20 
Read(void * data,UInt32 size,UInt32 * processedSize)21 STDMETHODIMP CBinderInStream::Read(void *data, UInt32 size, UInt32 *processedSize)
22   { return _binder->Read(data, size, processedSize); }
23 
24 class CBinderOutStream:
25   public ISequentialOutStream,
26   public CMyUnknownImp
27 {
28   CStreamBinder *_binder;
29 public:
30   MY_UNKNOWN_IMP
31   STDMETHOD(Write)(const void *data, UInt32 size, UInt32 *processedSize);
~CBinderOutStream()32   ~CBinderOutStream() { _binder->CloseWrite(); }
CBinderOutStream(CStreamBinder * binder)33   CBinderOutStream(CStreamBinder *binder): _binder(binder) {}
34 };
35 
Write(const void * data,UInt32 size,UInt32 * processedSize)36 STDMETHODIMP CBinderOutStream::Write(const void *data, UInt32 size, UInt32 *processedSize)
37   { return _binder->Write(data, size, processedSize); }
38 
39 
40 
CreateEvents()41 WRes CStreamBinder::CreateEvents()
42 {
43   RINOK(_canWrite_Event.Create(true));
44   RINOK(_canRead_Event.Create());
45   return _readingWasClosed_Event.Create();
46 }
47 
ReInit()48 void CStreamBinder::ReInit()
49 {
50   _waitWrite = true;
51   _canRead_Event.Reset();
52   _readingWasClosed_Event.Reset();
53   ProcessedSize = 0;
54 }
55 
56 
CreateStreams(ISequentialInStream ** inStream,ISequentialOutStream ** outStream)57 void CStreamBinder::CreateStreams(ISequentialInStream **inStream, ISequentialOutStream **outStream)
58 {
59   _waitWrite = true;
60   _bufSize = 0;
61   _buf = NULL;
62   ProcessedSize = 0;
63 
64   CBinderInStream *inStreamSpec = new CBinderInStream(this);
65   CMyComPtr<ISequentialInStream> inStreamLoc(inStreamSpec);
66   *inStream = inStreamLoc.Detach();
67 
68   CBinderOutStream *outStreamSpec = new CBinderOutStream(this);
69   CMyComPtr<ISequentialOutStream> outStreamLoc(outStreamSpec);
70   *outStream = outStreamLoc.Detach();
71 }
72 
// (_canRead_Event is signaled && _bufSize == 0) means that the stream is finished.
74 
Read(void * data,UInt32 size,UInt32 * processedSize)75 HRESULT CStreamBinder::Read(void *data, UInt32 size, UInt32 *processedSize)
76 {
77   if (processedSize)
78     *processedSize = 0;
79   if (size != 0)
80   {
81     if (_waitWrite)
82     {
83       RINOK(_canRead_Event.Lock());
84       _waitWrite = false;
85     }
86     if (size > _bufSize)
87       size = _bufSize;
88     if (size != 0)
89     {
90       memcpy(data, _buf, size);
91       _buf = ((const Byte *)_buf) + size;
92       ProcessedSize += size;
93       if (processedSize)
94         *processedSize = size;
95       _bufSize -= size;
96       if (_bufSize == 0)
97       {
98         _waitWrite = true;
99         _canRead_Event.Reset();
100         _canWrite_Event.Set();
101       }
102     }
103   }
104   return S_OK;
105 }
106 
Write(const void * data,UInt32 size,UInt32 * processedSize)107 HRESULT CStreamBinder::Write(const void *data, UInt32 size, UInt32 *processedSize)
108 {
109   if (processedSize)
110     *processedSize = 0;
111   if (size != 0)
112   {
113     _buf = data;
114     _bufSize = size;
115     _canWrite_Event.Reset();
116     _canRead_Event.Set();
117 
118     HANDLE events[2] = { _canWrite_Event, _readingWasClosed_Event };
119     DWORD waitResult = ::WaitForMultipleObjects(2, events, FALSE, INFINITE);
120     if (waitResult != WAIT_OBJECT_0 + 0)
121       return S_FALSE;
122     if (processedSize)
123       *processedSize = size;
124   }
125   return S_OK;
126 }
127