// StreamBinder.cpp
2
3 #include "StdAfx.h"
4
5 #include "../../Common/MyCom.h"
6
7 #include "StreamBinder.h"
8
9 class CBinderInStream:
10 public ISequentialInStream,
11 public CMyUnknownImp
12 {
13 CStreamBinder *_binder;
14 public:
15 MY_UNKNOWN_IMP
16 STDMETHOD(Read)(void *data, UInt32 size, UInt32 *processedSize);
~CBinderInStream()17 ~CBinderInStream() { _binder->CloseRead(); }
CBinderInStream(CStreamBinder * binder)18 CBinderInStream(CStreamBinder *binder): _binder(binder) {}
19 };
20
Read(void * data,UInt32 size,UInt32 * processedSize)21 STDMETHODIMP CBinderInStream::Read(void *data, UInt32 size, UInt32 *processedSize)
22 { return _binder->Read(data, size, processedSize); }
23
24 class CBinderOutStream:
25 public ISequentialOutStream,
26 public CMyUnknownImp
27 {
28 CStreamBinder *_binder;
29 public:
30 MY_UNKNOWN_IMP
31 STDMETHOD(Write)(const void *data, UInt32 size, UInt32 *processedSize);
~CBinderOutStream()32 ~CBinderOutStream() { _binder->CloseWrite(); }
CBinderOutStream(CStreamBinder * binder)33 CBinderOutStream(CStreamBinder *binder): _binder(binder) {}
34 };
35
Write(const void * data,UInt32 size,UInt32 * processedSize)36 STDMETHODIMP CBinderOutStream::Write(const void *data, UInt32 size, UInt32 *processedSize)
37 { return _binder->Write(data, size, processedSize); }
38
39
40
CreateEvents()41 WRes CStreamBinder::CreateEvents()
42 {
43 RINOK(_canWrite_Event.Create(true));
44 RINOK(_canRead_Event.Create());
45 return _readingWasClosed_Event.Create();
46 }
47
ReInit()48 void CStreamBinder::ReInit()
49 {
50 _waitWrite = true;
51 _canRead_Event.Reset();
52 _readingWasClosed_Event.Reset();
53 ProcessedSize = 0;
54 }
55
56
CreateStreams(ISequentialInStream ** inStream,ISequentialOutStream ** outStream)57 void CStreamBinder::CreateStreams(ISequentialInStream **inStream, ISequentialOutStream **outStream)
58 {
59 _waitWrite = true;
60 _bufSize = 0;
61 _buf = NULL;
62 ProcessedSize = 0;
63
64 CBinderInStream *inStreamSpec = new CBinderInStream(this);
65 CMyComPtr<ISequentialInStream> inStreamLoc(inStreamSpec);
66 *inStream = inStreamLoc.Detach();
67
68 CBinderOutStream *outStreamSpec = new CBinderOutStream(this);
69 CMyComPtr<ISequentialOutStream> outStreamLoc(outStreamSpec);
70 *outStream = outStreamLoc.Detach();
71 }
72
// _canRead_Event being signaled while _bufSize == 0 means that the stream is finished.
74
Read(void * data,UInt32 size,UInt32 * processedSize)75 HRESULT CStreamBinder::Read(void *data, UInt32 size, UInt32 *processedSize)
76 {
77 if (processedSize)
78 *processedSize = 0;
79 if (size != 0)
80 {
81 if (_waitWrite)
82 {
83 RINOK(_canRead_Event.Lock());
84 _waitWrite = false;
85 }
86 if (size > _bufSize)
87 size = _bufSize;
88 if (size != 0)
89 {
90 memcpy(data, _buf, size);
91 _buf = ((const Byte *)_buf) + size;
92 ProcessedSize += size;
93 if (processedSize)
94 *processedSize = size;
95 _bufSize -= size;
96 if (_bufSize == 0)
97 {
98 _waitWrite = true;
99 _canRead_Event.Reset();
100 _canWrite_Event.Set();
101 }
102 }
103 }
104 return S_OK;
105 }
106
Write(const void * data,UInt32 size,UInt32 * processedSize)107 HRESULT CStreamBinder::Write(const void *data, UInt32 size, UInt32 *processedSize)
108 {
109 if (processedSize)
110 *processedSize = 0;
111 if (size != 0)
112 {
113 _buf = data;
114 _bufSize = size;
115 _canWrite_Event.Reset();
116 _canRead_Event.Set();
117
118 HANDLE events[2] = { _canWrite_Event, _readingWasClosed_Event };
119 DWORD waitResult = ::WaitForMultipleObjects(2, events, FALSE, INFINITE);
120 if (waitResult != WAIT_OBJECT_0 + 0)
121 return S_FALSE;
122 if (processedSize)
123 *processedSize = size;
124 }
125 return S_OK;
126 }
127