1 /*
2  *  Copyright 2012 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "webrtc/examples/peerconnection/client/peer_connection_client.h"
12 
13 #include "webrtc/examples/peerconnection/client/defaults.h"
14 #include "webrtc/base/common.h"
15 #include "webrtc/base/logging.h"
16 #include "webrtc/base/nethelpers.h"
17 #include "webrtc/base/stringutils.h"
18 
19 #ifdef WIN32
20 #include "webrtc/base/win32socketserver.h"
21 #endif
22 
23 using rtc::sprintfn;
24 
25 namespace {
26 
27 // This is our magical hangup signal.
28 const char kByeMessage[] = "BYE";
29 // Delay between server connection retries, in milliseconds
30 const int kReconnectDelay = 2000;
31 
CreateClientSocket(int family)32 rtc::AsyncSocket* CreateClientSocket(int family) {
33 #ifdef WIN32
34   rtc::Win32Socket* sock = new rtc::Win32Socket();
35   sock->CreateT(family, SOCK_STREAM);
36   return sock;
37 #elif defined(WEBRTC_POSIX)
38   rtc::Thread* thread = rtc::Thread::Current();
39   ASSERT(thread != NULL);
40   return thread->socketserver()->CreateAsyncSocket(family, SOCK_STREAM);
41 #else
42 #error Platform not supported.
43 #endif
44 }
45 
46 }  // namespace
47 
PeerConnectionClient()48 PeerConnectionClient::PeerConnectionClient()
49   : callback_(NULL),
50     resolver_(NULL),
51     state_(NOT_CONNECTED),
52     my_id_(-1) {
53 }
54 
~PeerConnectionClient()55 PeerConnectionClient::~PeerConnectionClient() {
56 }
57 
InitSocketSignals()58 void PeerConnectionClient::InitSocketSignals() {
59   ASSERT(control_socket_.get() != NULL);
60   ASSERT(hanging_get_.get() != NULL);
61   control_socket_->SignalCloseEvent.connect(this,
62       &PeerConnectionClient::OnClose);
63   hanging_get_->SignalCloseEvent.connect(this,
64       &PeerConnectionClient::OnClose);
65   control_socket_->SignalConnectEvent.connect(this,
66       &PeerConnectionClient::OnConnect);
67   hanging_get_->SignalConnectEvent.connect(this,
68       &PeerConnectionClient::OnHangingGetConnect);
69   control_socket_->SignalReadEvent.connect(this,
70       &PeerConnectionClient::OnRead);
71   hanging_get_->SignalReadEvent.connect(this,
72       &PeerConnectionClient::OnHangingGetRead);
73 }
74 
id() const75 int PeerConnectionClient::id() const {
76   return my_id_;
77 }
78 
is_connected() const79 bool PeerConnectionClient::is_connected() const {
80   return my_id_ != -1;
81 }
82 
peers() const83 const Peers& PeerConnectionClient::peers() const {
84   return peers_;
85 }
86 
RegisterObserver(PeerConnectionClientObserver * callback)87 void PeerConnectionClient::RegisterObserver(
88     PeerConnectionClientObserver* callback) {
89   ASSERT(!callback_);
90   callback_ = callback;
91 }
92 
Connect(const std::string & server,int port,const std::string & client_name)93 void PeerConnectionClient::Connect(const std::string& server, int port,
94                                    const std::string& client_name) {
95   ASSERT(!server.empty());
96   ASSERT(!client_name.empty());
97 
98   if (state_ != NOT_CONNECTED) {
99     LOG(WARNING)
100         << "The client must not be connected before you can call Connect()";
101     callback_->OnServerConnectionFailure();
102     return;
103   }
104 
105   if (server.empty() || client_name.empty()) {
106     callback_->OnServerConnectionFailure();
107     return;
108   }
109 
110   if (port <= 0)
111     port = kDefaultServerPort;
112 
113   server_address_.SetIP(server);
114   server_address_.SetPort(port);
115   client_name_ = client_name;
116 
117   if (server_address_.IsUnresolvedIP()) {
118     state_ = RESOLVING;
119     resolver_ = new rtc::AsyncResolver();
120     resolver_->SignalDone.connect(this, &PeerConnectionClient::OnResolveResult);
121     resolver_->Start(server_address_);
122   } else {
123     DoConnect();
124   }
125 }
126 
OnResolveResult(rtc::AsyncResolverInterface * resolver)127 void PeerConnectionClient::OnResolveResult(
128     rtc::AsyncResolverInterface* resolver) {
129   if (resolver_->GetError() != 0) {
130     callback_->OnServerConnectionFailure();
131     resolver_->Destroy(false);
132     resolver_ = NULL;
133     state_ = NOT_CONNECTED;
134   } else {
135     server_address_ = resolver_->address();
136     DoConnect();
137   }
138 }
139 
DoConnect()140 void PeerConnectionClient::DoConnect() {
141   control_socket_.reset(CreateClientSocket(server_address_.ipaddr().family()));
142   hanging_get_.reset(CreateClientSocket(server_address_.ipaddr().family()));
143   InitSocketSignals();
144   char buffer[1024];
145   sprintfn(buffer, sizeof(buffer),
146            "GET /sign_in?%s HTTP/1.0\r\n\r\n", client_name_.c_str());
147   onconnect_data_ = buffer;
148 
149   bool ret = ConnectControlSocket();
150   if (ret)
151     state_ = SIGNING_IN;
152   if (!ret) {
153     callback_->OnServerConnectionFailure();
154   }
155 }
156 
SendToPeer(int peer_id,const std::string & message)157 bool PeerConnectionClient::SendToPeer(int peer_id, const std::string& message) {
158   if (state_ != CONNECTED)
159     return false;
160 
161   ASSERT(is_connected());
162   ASSERT(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
163   if (!is_connected() || peer_id == -1)
164     return false;
165 
166   char headers[1024];
167   sprintfn(headers, sizeof(headers),
168       "POST /message?peer_id=%i&to=%i HTTP/1.0\r\n"
169       "Content-Length: %i\r\n"
170       "Content-Type: text/plain\r\n"
171       "\r\n",
172       my_id_, peer_id, message.length());
173   onconnect_data_ = headers;
174   onconnect_data_ += message;
175   return ConnectControlSocket();
176 }
177 
SendHangUp(int peer_id)178 bool PeerConnectionClient::SendHangUp(int peer_id) {
179   return SendToPeer(peer_id, kByeMessage);
180 }
181 
IsSendingMessage()182 bool PeerConnectionClient::IsSendingMessage() {
183   return state_ == CONNECTED &&
184          control_socket_->GetState() != rtc::Socket::CS_CLOSED;
185 }
186 
SignOut()187 bool PeerConnectionClient::SignOut() {
188   if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT)
189     return true;
190 
191   if (hanging_get_->GetState() != rtc::Socket::CS_CLOSED)
192     hanging_get_->Close();
193 
194   if (control_socket_->GetState() == rtc::Socket::CS_CLOSED) {
195     state_ = SIGNING_OUT;
196 
197     if (my_id_ != -1) {
198       char buffer[1024];
199       sprintfn(buffer, sizeof(buffer),
200           "GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
201       onconnect_data_ = buffer;
202       return ConnectControlSocket();
203     } else {
204       // Can occur if the app is closed before we finish connecting.
205       return true;
206     }
207   } else {
208     state_ = SIGNING_OUT_WAITING;
209   }
210 
211   return true;
212 }
213 
Close()214 void PeerConnectionClient::Close() {
215   control_socket_->Close();
216   hanging_get_->Close();
217   onconnect_data_.clear();
218   peers_.clear();
219   if (resolver_ != NULL) {
220     resolver_->Destroy(false);
221     resolver_ = NULL;
222   }
223   my_id_ = -1;
224   state_ = NOT_CONNECTED;
225 }
226 
ConnectControlSocket()227 bool PeerConnectionClient::ConnectControlSocket() {
228   ASSERT(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
229   int err = control_socket_->Connect(server_address_);
230   if (err == SOCKET_ERROR) {
231     Close();
232     return false;
233   }
234   return true;
235 }
236 
OnConnect(rtc::AsyncSocket * socket)237 void PeerConnectionClient::OnConnect(rtc::AsyncSocket* socket) {
238   ASSERT(!onconnect_data_.empty());
239   size_t sent = socket->Send(onconnect_data_.c_str(), onconnect_data_.length());
240   ASSERT(sent == onconnect_data_.length());
241   RTC_UNUSED(sent);
242   onconnect_data_.clear();
243 }
244 
OnHangingGetConnect(rtc::AsyncSocket * socket)245 void PeerConnectionClient::OnHangingGetConnect(rtc::AsyncSocket* socket) {
246   char buffer[1024];
247   sprintfn(buffer, sizeof(buffer),
248            "GET /wait?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
249   int len = static_cast<int>(strlen(buffer));
250   int sent = socket->Send(buffer, len);
251   ASSERT(sent == len);
252   RTC_UNUSED2(sent, len);
253 }
254 
OnMessageFromPeer(int peer_id,const std::string & message)255 void PeerConnectionClient::OnMessageFromPeer(int peer_id,
256                                              const std::string& message) {
257   if (message.length() == (sizeof(kByeMessage) - 1) &&
258       message.compare(kByeMessage) == 0) {
259     callback_->OnPeerDisconnected(peer_id);
260   } else {
261     callback_->OnMessageFromPeer(peer_id, message);
262   }
263 }
264 
GetHeaderValue(const std::string & data,size_t eoh,const char * header_pattern,size_t * value)265 bool PeerConnectionClient::GetHeaderValue(const std::string& data,
266                                           size_t eoh,
267                                           const char* header_pattern,
268                                           size_t* value) {
269   ASSERT(value != NULL);
270   size_t found = data.find(header_pattern);
271   if (found != std::string::npos && found < eoh) {
272     *value = atoi(&data[found + strlen(header_pattern)]);
273     return true;
274   }
275   return false;
276 }
277 
GetHeaderValue(const std::string & data,size_t eoh,const char * header_pattern,std::string * value)278 bool PeerConnectionClient::GetHeaderValue(const std::string& data, size_t eoh,
279                                           const char* header_pattern,
280                                           std::string* value) {
281   ASSERT(value != NULL);
282   size_t found = data.find(header_pattern);
283   if (found != std::string::npos && found < eoh) {
284     size_t begin = found + strlen(header_pattern);
285     size_t end = data.find("\r\n", begin);
286     if (end == std::string::npos)
287       end = eoh;
288     value->assign(data.substr(begin, end - begin));
289     return true;
290   }
291   return false;
292 }
293 
ReadIntoBuffer(rtc::AsyncSocket * socket,std::string * data,size_t * content_length)294 bool PeerConnectionClient::ReadIntoBuffer(rtc::AsyncSocket* socket,
295                                           std::string* data,
296                                           size_t* content_length) {
297   char buffer[0xffff];
298   do {
299     int bytes = socket->Recv(buffer, sizeof(buffer));
300     if (bytes <= 0)
301       break;
302     data->append(buffer, bytes);
303   } while (true);
304 
305   bool ret = false;
306   size_t i = data->find("\r\n\r\n");
307   if (i != std::string::npos) {
308     LOG(INFO) << "Headers received";
309     if (GetHeaderValue(*data, i, "\r\nContent-Length: ", content_length)) {
310       size_t total_response_size = (i + 4) + *content_length;
311       if (data->length() >= total_response_size) {
312         ret = true;
313         std::string should_close;
314         const char kConnection[] = "\r\nConnection: ";
315         if (GetHeaderValue(*data, i, kConnection, &should_close) &&
316             should_close.compare("close") == 0) {
317           socket->Close();
318           // Since we closed the socket, there was no notification delivered
319           // to us.  Compensate by letting ourselves know.
320           OnClose(socket, 0);
321         }
322       } else {
323         // We haven't received everything.  Just continue to accept data.
324       }
325     } else {
326       LOG(LS_ERROR) << "No content length field specified by the server.";
327     }
328   }
329   return ret;
330 }
331 
OnRead(rtc::AsyncSocket * socket)332 void PeerConnectionClient::OnRead(rtc::AsyncSocket* socket) {
333   size_t content_length = 0;
334   if (ReadIntoBuffer(socket, &control_data_, &content_length)) {
335     size_t peer_id = 0, eoh = 0;
336     bool ok = ParseServerResponse(control_data_, content_length, &peer_id,
337                                   &eoh);
338     if (ok) {
339       if (my_id_ == -1) {
340         // First response.  Let's store our server assigned ID.
341         ASSERT(state_ == SIGNING_IN);
342         my_id_ = static_cast<int>(peer_id);
343         ASSERT(my_id_ != -1);
344 
345         // The body of the response will be a list of already connected peers.
346         if (content_length) {
347           size_t pos = eoh + 4;
348           while (pos < control_data_.size()) {
349             size_t eol = control_data_.find('\n', pos);
350             if (eol == std::string::npos)
351               break;
352             int id = 0;
353             std::string name;
354             bool connected;
355             if (ParseEntry(control_data_.substr(pos, eol - pos), &name, &id,
356                            &connected) && id != my_id_) {
357               peers_[id] = name;
358               callback_->OnPeerConnected(id, name);
359             }
360             pos = eol + 1;
361           }
362         }
363         ASSERT(is_connected());
364         callback_->OnSignedIn();
365       } else if (state_ == SIGNING_OUT) {
366         Close();
367         callback_->OnDisconnected();
368       } else if (state_ == SIGNING_OUT_WAITING) {
369         SignOut();
370       }
371     }
372 
373     control_data_.clear();
374 
375     if (state_ == SIGNING_IN) {
376       ASSERT(hanging_get_->GetState() == rtc::Socket::CS_CLOSED);
377       state_ = CONNECTED;
378       hanging_get_->Connect(server_address_);
379     }
380   }
381 }
382 
OnHangingGetRead(rtc::AsyncSocket * socket)383 void PeerConnectionClient::OnHangingGetRead(rtc::AsyncSocket* socket) {
384   LOG(INFO) << __FUNCTION__;
385   size_t content_length = 0;
386   if (ReadIntoBuffer(socket, &notification_data_, &content_length)) {
387     size_t peer_id = 0, eoh = 0;
388     bool ok = ParseServerResponse(notification_data_, content_length,
389                                   &peer_id, &eoh);
390 
391     if (ok) {
392       // Store the position where the body begins.
393       size_t pos = eoh + 4;
394 
395       if (my_id_ == static_cast<int>(peer_id)) {
396         // A notification about a new member or a member that just
397         // disconnected.
398         int id = 0;
399         std::string name;
400         bool connected = false;
401         if (ParseEntry(notification_data_.substr(pos), &name, &id,
402                        &connected)) {
403           if (connected) {
404             peers_[id] = name;
405             callback_->OnPeerConnected(id, name);
406           } else {
407             peers_.erase(id);
408             callback_->OnPeerDisconnected(id);
409           }
410         }
411       } else {
412         OnMessageFromPeer(static_cast<int>(peer_id),
413                           notification_data_.substr(pos));
414       }
415     }
416 
417     notification_data_.clear();
418   }
419 
420   if (hanging_get_->GetState() == rtc::Socket::CS_CLOSED &&
421       state_ == CONNECTED) {
422     hanging_get_->Connect(server_address_);
423   }
424 }
425 
ParseEntry(const std::string & entry,std::string * name,int * id,bool * connected)426 bool PeerConnectionClient::ParseEntry(const std::string& entry,
427                                       std::string* name,
428                                       int* id,
429                                       bool* connected) {
430   ASSERT(name != NULL);
431   ASSERT(id != NULL);
432   ASSERT(connected != NULL);
433   ASSERT(!entry.empty());
434 
435   *connected = false;
436   size_t separator = entry.find(',');
437   if (separator != std::string::npos) {
438     *id = atoi(&entry[separator + 1]);
439     name->assign(entry.substr(0, separator));
440     separator = entry.find(',', separator + 1);
441     if (separator != std::string::npos) {
442       *connected = atoi(&entry[separator + 1]) ? true : false;
443     }
444   }
445   return !name->empty();
446 }
447 
GetResponseStatus(const std::string & response)448 int PeerConnectionClient::GetResponseStatus(const std::string& response) {
449   int status = -1;
450   size_t pos = response.find(' ');
451   if (pos != std::string::npos)
452     status = atoi(&response[pos + 1]);
453   return status;
454 }
455 
ParseServerResponse(const std::string & response,size_t content_length,size_t * peer_id,size_t * eoh)456 bool PeerConnectionClient::ParseServerResponse(const std::string& response,
457                                                size_t content_length,
458                                                size_t* peer_id,
459                                                size_t* eoh) {
460   int status = GetResponseStatus(response.c_str());
461   if (status != 200) {
462     LOG(LS_ERROR) << "Received error from server";
463     Close();
464     callback_->OnDisconnected();
465     return false;
466   }
467 
468   *eoh = response.find("\r\n\r\n");
469   ASSERT(*eoh != std::string::npos);
470   if (*eoh == std::string::npos)
471     return false;
472 
473   *peer_id = -1;
474 
475   // See comment in peer_channel.cc for why we use the Pragma header and
476   // not e.g. "X-Peer-Id".
477   GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id);
478 
479   return true;
480 }
481 
OnClose(rtc::AsyncSocket * socket,int err)482 void PeerConnectionClient::OnClose(rtc::AsyncSocket* socket, int err) {
483   LOG(INFO) << __FUNCTION__;
484 
485   socket->Close();
486 
487 #ifdef WIN32
488   if (err != WSAECONNREFUSED) {
489 #else
490   if (err != ECONNREFUSED) {
491 #endif
492     if (socket == hanging_get_.get()) {
493       if (state_ == CONNECTED) {
494         hanging_get_->Close();
495         hanging_get_->Connect(server_address_);
496       }
497     } else {
498       callback_->OnMessageSent(err);
499     }
500   } else {
501     if (socket == control_socket_.get()) {
502       LOG(WARNING) << "Connection refused; retrying in 2 seconds";
503       rtc::Thread::Current()->PostDelayed(kReconnectDelay, this, 0);
504     } else {
505       Close();
506       callback_->OnDisconnected();
507     }
508   }
509 }
510 
511 void PeerConnectionClient::OnMessage(rtc::Message* msg) {
512   // ignore msg; there is currently only one supported message ("retry")
513   DoConnect();
514 }
515