1 /*
2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
#include <stdint.h>

#include <iomanip>
#include <sstream>
#include <string>

#include "webrtc/base/asyncsocket.h"
#include "webrtc/base/logging.h"
#include "webrtc/base/socketfactory.h"
#include "webrtc/base/socketpool.h"
#include "webrtc/base/socketstream.h"
#include "webrtc/base/thread.h"
19 
20 namespace rtc {
21 
22 ///////////////////////////////////////////////////////////////////////////////
23 // StreamCache - Caches a set of open streams, defers creation to a separate
24 //  StreamPool.
25 ///////////////////////////////////////////////////////////////////////////////
26 
StreamCache(StreamPool * pool)27 StreamCache::StreamCache(StreamPool* pool) : pool_(pool) {
28 }
29 
~StreamCache()30 StreamCache::~StreamCache() {
31   for (ConnectedList::iterator it = active_.begin(); it != active_.end();
32        ++it) {
33     delete it->second;
34   }
35   for (ConnectedList::iterator it = cached_.begin(); it != cached_.end();
36        ++it) {
37     delete it->second;
38   }
39 }
40 
RequestConnectedStream(const SocketAddress & remote,int * err)41 StreamInterface* StreamCache::RequestConnectedStream(
42     const SocketAddress& remote, int* err) {
43   LOG_F(LS_VERBOSE) << "(" << remote << ")";
44   for (ConnectedList::iterator it = cached_.begin(); it != cached_.end();
45        ++it) {
46     if (remote == it->first) {
47       it->second->SignalEvent.disconnect(this);
48       // Move from cached_ to active_
49       active_.push_front(*it);
50       cached_.erase(it);
51       if (err)
52         *err = 0;
53       LOG_F(LS_VERBOSE) << "Providing cached stream";
54       return active_.front().second;
55     }
56   }
57   if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) {
58     // We track active streams so that we can remember their address
59     active_.push_front(ConnectedStream(remote, stream));
60     LOG_F(LS_VERBOSE) << "Providing new stream";
61     return active_.front().second;
62   }
63   return NULL;
64 }
65 
ReturnConnectedStream(StreamInterface * stream)66 void StreamCache::ReturnConnectedStream(StreamInterface* stream) {
67   for (ConnectedList::iterator it = active_.begin(); it != active_.end();
68        ++it) {
69     if (stream == it->second) {
70       LOG_F(LS_VERBOSE) << "(" << it->first << ")";
71       if (stream->GetState() == SS_CLOSED) {
72         // Return closed streams
73         LOG_F(LS_VERBOSE) << "Returning closed stream";
74         pool_->ReturnConnectedStream(it->second);
75       } else {
76         // Monitor open streams
77         stream->SignalEvent.connect(this, &StreamCache::OnStreamEvent);
78         LOG_F(LS_VERBOSE) << "Caching stream";
79         cached_.push_front(*it);
80       }
81       active_.erase(it);
82       return;
83     }
84   }
85   ASSERT(false);
86 }
87 
OnStreamEvent(StreamInterface * stream,int events,int err)88 void StreamCache::OnStreamEvent(StreamInterface* stream, int events, int err) {
89   if ((events & SE_CLOSE) == 0) {
90     LOG_F(LS_WARNING) << "(" << events << ", " << err
91                       << ") received non-close event";
92     return;
93   }
94   for (ConnectedList::iterator it = cached_.begin(); it != cached_.end();
95        ++it) {
96     if (stream == it->second) {
97       LOG_F(LS_VERBOSE) << "(" << it->first << ")";
98       // We don't cache closed streams, so return it.
99       it->second->SignalEvent.disconnect(this);
100       LOG_F(LS_VERBOSE) << "Returning closed stream";
101       pool_->ReturnConnectedStream(it->second);
102       cached_.erase(it);
103       return;
104     }
105   }
106   ASSERT(false);
107 }
108 
109 //////////////////////////////////////////////////////////////////////
110 // NewSocketPool
111 //////////////////////////////////////////////////////////////////////
112 
NewSocketPool(SocketFactory * factory)113 NewSocketPool::NewSocketPool(SocketFactory* factory) : factory_(factory) {
114 }
115 
~NewSocketPool()116 NewSocketPool::~NewSocketPool() {
117 }
118 
119 StreamInterface*
RequestConnectedStream(const SocketAddress & remote,int * err)120 NewSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) {
121   AsyncSocket* socket =
122       factory_->CreateAsyncSocket(remote.family(), SOCK_STREAM);
123   if (!socket) {
124     if (err)
125       *err = -1;
126     return NULL;
127   }
128   if ((socket->Connect(remote) != 0) && !socket->IsBlocking()) {
129     if (err)
130       *err = socket->GetError();
131     delete socket;
132     return NULL;
133   }
134   if (err)
135     *err = 0;
136   return new SocketStream(socket);
137 }
138 
139 void
ReturnConnectedStream(StreamInterface * stream)140 NewSocketPool::ReturnConnectedStream(StreamInterface* stream) {
141   Thread::Current()->Dispose(stream);
142 }
143 
144 //////////////////////////////////////////////////////////////////////
145 // ReuseSocketPool
146 //////////////////////////////////////////////////////////////////////
147 
ReuseSocketPool(SocketFactory * factory)148 ReuseSocketPool::ReuseSocketPool(SocketFactory* factory)
149 : factory_(factory), stream_(NULL), checked_out_(false) {
150 }
151 
~ReuseSocketPool()152 ReuseSocketPool::~ReuseSocketPool() {
153   ASSERT(!checked_out_);
154   delete stream_;
155 }
156 
157 StreamInterface*
RequestConnectedStream(const SocketAddress & remote,int * err)158 ReuseSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) {
159   // Only one socket can be used from this "pool" at a time
160   ASSERT(!checked_out_);
161   if (!stream_) {
162     LOG_F(LS_VERBOSE) << "Creating new socket";
163     int family = remote.family();
164     // TODO: Deal with this when we/I clean up DNS resolution.
165     if (remote.IsUnresolvedIP()) {
166       family = AF_INET;
167     }
168     AsyncSocket* socket =
169         factory_->CreateAsyncSocket(family, SOCK_STREAM);
170     if (!socket) {
171       if (err)
172         *err = -1;
173       return NULL;
174     }
175     stream_ = new SocketStream(socket);
176   }
177   if ((stream_->GetState() == SS_OPEN) && (remote == remote_)) {
178     LOG_F(LS_VERBOSE) << "Reusing connection to: " << remote_;
179   } else {
180     remote_ = remote;
181     stream_->Close();
182     if ((stream_->GetSocket()->Connect(remote_) != 0)
183         && !stream_->GetSocket()->IsBlocking()) {
184       if (err)
185         *err = stream_->GetSocket()->GetError();
186       return NULL;
187     } else {
188       LOG_F(LS_VERBOSE) << "Opening connection to: " << remote_;
189     }
190   }
191   stream_->SignalEvent.disconnect(this);
192   checked_out_ = true;
193   if (err)
194     *err = 0;
195   return stream_;
196 }
197 
198 void
ReturnConnectedStream(StreamInterface * stream)199 ReuseSocketPool::ReturnConnectedStream(StreamInterface* stream) {
200   ASSERT(stream == stream_);
201   ASSERT(checked_out_);
202   checked_out_ = false;
203   // Until the socket is reused, monitor it to determine if it closes.
204   stream_->SignalEvent.connect(this, &ReuseSocketPool::OnStreamEvent);
205 }
206 
207 void
OnStreamEvent(StreamInterface * stream,int events,int err)208 ReuseSocketPool::OnStreamEvent(StreamInterface* stream, int events, int err) {
209   ASSERT(stream == stream_);
210   ASSERT(!checked_out_);
211 
212   // If the stream was written to and then immediately returned to us then
213   // we may get a writable notification for it, which we should ignore.
214   if (events == SE_WRITE) {
215     LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly writable: ignoring";
216     return;
217   }
218 
219   // If the peer sent data, we can't process it, so drop the connection.
220   // If the socket has closed, clean it up.
221   // In either case, we'll reconnect it the next time it is used.
222   ASSERT(0 != (events & (SE_READ|SE_CLOSE)));
223   if (0 != (events & SE_CLOSE)) {
224     LOG_F(LS_VERBOSE) << "Connection closed with error: " << err;
225   } else {
226     LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly readable: closing";
227   }
228   stream_->Close();
229 }
230 
231 ///////////////////////////////////////////////////////////////////////////////
232 // LoggingPoolAdapter - Adapts a StreamPool to supply streams with attached
233 // LoggingAdapters.
234 ///////////////////////////////////////////////////////////////////////////////
235 
LoggingPoolAdapter(StreamPool * pool,LoggingSeverity level,const std::string & label,bool binary_mode)236 LoggingPoolAdapter::LoggingPoolAdapter(
237     StreamPool* pool, LoggingSeverity level, const std::string& label,
238     bool binary_mode)
239   : pool_(pool), level_(level), label_(label), binary_mode_(binary_mode) {
240 }
241 
~LoggingPoolAdapter()242 LoggingPoolAdapter::~LoggingPoolAdapter() {
243   for (StreamList::iterator it = recycle_bin_.begin();
244        it != recycle_bin_.end(); ++it) {
245     delete *it;
246   }
247 }
248 
RequestConnectedStream(const SocketAddress & remote,int * err)249 StreamInterface* LoggingPoolAdapter::RequestConnectedStream(
250     const SocketAddress& remote, int* err) {
251   if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) {
252     ASSERT(SS_CLOSED != stream->GetState());
253     std::stringstream ss;
254     ss << label_ << "(0x" << std::setfill('0') << std::hex << std::setw(8)
255        << stream << ")";
256     LOG_V(level_) << ss.str()
257                   << ((SS_OPEN == stream->GetState()) ? " Connected"
258                                                       : " Connecting")
259                   << " to " << remote;
260     if (recycle_bin_.empty()) {
261       return new LoggingAdapter(stream, level_, ss.str(), binary_mode_);
262     }
263     LoggingAdapter* logging = recycle_bin_.front();
264     recycle_bin_.pop_front();
265     logging->set_label(ss.str());
266     logging->Attach(stream);
267     return logging;
268   }
269   return NULL;
270 }
271 
ReturnConnectedStream(StreamInterface * stream)272 void LoggingPoolAdapter::ReturnConnectedStream(StreamInterface* stream) {
273   LoggingAdapter* logging = static_cast<LoggingAdapter*>(stream);
274   pool_->ReturnConnectedStream(logging->Detach());
275   recycle_bin_.push_back(logging);
276 }
277 
278 ///////////////////////////////////////////////////////////////////////////////
279 
280 } // namespace rtc
281