1 /*
2  * Copyright 2012, The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "NetworkSession"
19 #include <utils/Log.h>
20 
21 #include "ANetworkSession.h"
22 #include "ParsedMessage.h"
23 
24 #include <arpa/inet.h>
25 #include <fcntl.h>
26 #include <linux/tcp.h>
27 #include <net/if.h>
28 #include <netdb.h>
29 #include <netinet/in.h>
30 #include <sys/ioctl.h>
31 #include <sys/socket.h>
32 
33 #include <media/stagefright/foundation/ABuffer.h>
34 #include <media/stagefright/foundation/ADebug.h>
35 #include <media/stagefright/foundation/AMessage.h>
36 #include <media/stagefright/foundation/hexdump.h>
37 
38 namespace android {
39 
// Reads a big-endian (network byte order) 16-bit unsigned value from the two
// bytes starting at |ptr|.
static uint16_t U16_AT(const uint8_t *ptr) {
    const unsigned hi = ptr[0];
    const unsigned lo = ptr[1];

    return (uint16_t)((hi << 8) | lo);
}
43 
// Reads a big-endian (network byte order) 32-bit unsigned value from the four
// bytes starting at |ptr|.
//
// Each byte is widened to uint32_t before shifting: a plain uint8_t operand
// is promoted to (signed) int, and shifting a value >= 0x80 left by 24 would
// then shift into the sign bit.
static uint32_t U32_AT(const uint8_t *ptr) {
    return ((uint32_t)ptr[0] << 24)
            | ((uint32_t)ptr[1] << 16)
            | ((uint32_t)ptr[2] << 8)
            | (uint32_t)ptr[3];
}
47 
U64_AT(const uint8_t * ptr)48 static uint64_t U64_AT(const uint8_t *ptr) {
49     return ((uint64_t)U32_AT(ptr)) << 32 | U32_AT(ptr + 4);
50 }
51 
// Capacity, in bytes, of the buffer allocated for each recvfrom() call made by
// Session::readMore() on a datagram socket.
static const size_t kMaxUDPSize = 1500;
// Initial value of Session::mUDPRetries: the number of failed datagram
// receive/send passes that are only logged before an error is reported to the
// client. The counter is reset to this value after a successful pass.
static const int32_t kMaxUDPRetries = 200;
54 
55 struct ANetworkSession::NetworkThread : public Thread {
56     NetworkThread(ANetworkSession *session);
57 
58 protected:
59     virtual ~NetworkThread();
60 
61 private:
62     ANetworkSession *mSession;
63 
64     virtual bool threadLoop();
65 
66     DISALLOW_EVIL_CONSTRUCTORS(NetworkThread);
67 };
68 
69 struct ANetworkSession::Session : public RefBase {
70     enum Mode {
71         MODE_RTSP,
72         MODE_DATAGRAM,
73         MODE_WEBSOCKET,
74     };
75 
76     enum State {
77         CONNECTING,
78         CONNECTED,
79         LISTENING_RTSP,
80         LISTENING_TCP_DGRAMS,
81         DATAGRAM,
82     };
83 
84     Session(int32_t sessionID,
85             State state,
86             int s,
87             const sp<AMessage> &notify);
88 
89     int32_t sessionID() const;
90     int socket() const;
91     sp<AMessage> getNotificationMessage() const;
92 
93     bool isRTSPServer() const;
94     bool isTCPDatagramServer() const;
95 
96     bool wantsToRead();
97     bool wantsToWrite();
98 
99     status_t readMore();
100     status_t writeMore();
101 
102     status_t sendRequest(
103             const void *data, ssize_t size, bool timeValid, int64_t timeUs);
104 
105     void setMode(Mode mode);
106 
107     status_t switchToWebSocketMode();
108 
109 protected:
110     virtual ~Session();
111 
112 private:
113     enum {
114         FRAGMENT_FLAG_TIME_VALID = 1,
115     };
116     struct Fragment {
117         uint32_t mFlags;
118         int64_t mTimeUs;
119         sp<ABuffer> mBuffer;
120     };
121 
122     int32_t mSessionID;
123     State mState;
124     Mode mMode;
125     int mSocket;
126     sp<AMessage> mNotify;
127     bool mSawReceiveFailure, mSawSendFailure;
128     int32_t mUDPRetries;
129 
130     List<Fragment> mOutFragments;
131 
132     AString mInBuffer;
133 
134     int64_t mLastStallReportUs;
135 
136     void notifyError(bool send, status_t err, const char *detail);
137     void notify(NotificationReason reason);
138 
139     void dumpFragmentStats(const Fragment &frag);
140 
141     DISALLOW_EVIL_CONSTRUCTORS(Session);
142 };
143 ////////////////////////////////////////////////////////////////////////////////
144 
NetworkThread(ANetworkSession * session)145 ANetworkSession::NetworkThread::NetworkThread(ANetworkSession *session)
146     : mSession(session) {
147 }
148 
~NetworkThread()149 ANetworkSession::NetworkThread::~NetworkThread() {
150 }
151 
threadLoop()152 bool ANetworkSession::NetworkThread::threadLoop() {
153     mSession->threadLoop();
154 
155     return true;
156 }
157 
158 ////////////////////////////////////////////////////////////////////////////////
159 
Session(int32_t sessionID,State state,int s,const sp<AMessage> & notify)160 ANetworkSession::Session::Session(
161         int32_t sessionID,
162         State state,
163         int s,
164         const sp<AMessage> &notify)
165     : mSessionID(sessionID),
166       mState(state),
167       mMode(MODE_DATAGRAM),
168       mSocket(s),
169       mNotify(notify),
170       mSawReceiveFailure(false),
171       mSawSendFailure(false),
172       mUDPRetries(kMaxUDPRetries),
173       mLastStallReportUs(-1ll) {
174     if (mState == CONNECTED) {
175         struct sockaddr_in localAddr;
176         socklen_t localAddrLen = sizeof(localAddr);
177 
178         int res = getsockname(
179                 mSocket, (struct sockaddr *)&localAddr, &localAddrLen);
180         CHECK_GE(res, 0);
181 
182         struct sockaddr_in remoteAddr;
183         socklen_t remoteAddrLen = sizeof(remoteAddr);
184 
185         res = getpeername(
186                 mSocket, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
187         CHECK_GE(res, 0);
188 
189         in_addr_t addr = ntohl(localAddr.sin_addr.s_addr);
190         AString localAddrString = AStringPrintf(
191                 "%d.%d.%d.%d",
192                 (addr >> 24),
193                 (addr >> 16) & 0xff,
194                 (addr >> 8) & 0xff,
195                 addr & 0xff);
196 
197         addr = ntohl(remoteAddr.sin_addr.s_addr);
198         AString remoteAddrString = AStringPrintf(
199                 "%d.%d.%d.%d",
200                 (addr >> 24),
201                 (addr >> 16) & 0xff,
202                 (addr >> 8) & 0xff,
203                 addr & 0xff);
204 
205         sp<AMessage> msg = mNotify->dup();
206         msg->setInt32("sessionID", mSessionID);
207         msg->setInt32("reason", kWhatClientConnected);
208         msg->setString("server-ip", localAddrString.c_str());
209         msg->setInt32("server-port", ntohs(localAddr.sin_port));
210         msg->setString("client-ip", remoteAddrString.c_str());
211         msg->setInt32("client-port", ntohs(remoteAddr.sin_port));
212         msg->post();
213     }
214 }
215 
~Session()216 ANetworkSession::Session::~Session() {
217     ALOGV("Session %d gone", mSessionID);
218 
219     close(mSocket);
220     mSocket = -1;
221 }
222 
sessionID() const223 int32_t ANetworkSession::Session::sessionID() const {
224     return mSessionID;
225 }
226 
socket() const227 int ANetworkSession::Session::socket() const {
228     return mSocket;
229 }
230 
setMode(Mode mode)231 void ANetworkSession::Session::setMode(Mode mode) {
232     mMode = mode;
233 }
234 
switchToWebSocketMode()235 status_t ANetworkSession::Session::switchToWebSocketMode() {
236     if (mState != CONNECTED || mMode != MODE_RTSP) {
237         return INVALID_OPERATION;
238     }
239 
240     mMode = MODE_WEBSOCKET;
241 
242     return OK;
243 }
244 
getNotificationMessage() const245 sp<AMessage> ANetworkSession::Session::getNotificationMessage() const {
246     return mNotify;
247 }
248 
isRTSPServer() const249 bool ANetworkSession::Session::isRTSPServer() const {
250     return mState == LISTENING_RTSP;
251 }
252 
isTCPDatagramServer() const253 bool ANetworkSession::Session::isTCPDatagramServer() const {
254     return mState == LISTENING_TCP_DGRAMS;
255 }
256 
wantsToRead()257 bool ANetworkSession::Session::wantsToRead() {
258     return !mSawReceiveFailure && mState != CONNECTING;
259 }
260 
wantsToWrite()261 bool ANetworkSession::Session::wantsToWrite() {
262     return !mSawSendFailure
263         && (mState == CONNECTING
264             || (mState == CONNECTED && !mOutFragments.empty())
265             || (mState == DATAGRAM && !mOutFragments.empty()));
266 }
267 
readMore()268 status_t ANetworkSession::Session::readMore() {
269     if (mState == DATAGRAM) {
270         CHECK_EQ(mMode, MODE_DATAGRAM);
271 
272         status_t err;
273         do {
274             sp<ABuffer> buf = new ABuffer(kMaxUDPSize);
275 
276             struct sockaddr_in remoteAddr;
277             socklen_t remoteAddrLen = sizeof(remoteAddr);
278 
279             ssize_t n;
280             do {
281                 n = recvfrom(
282                         mSocket, buf->data(), buf->capacity(), 0,
283                         (struct sockaddr *)&remoteAddr, &remoteAddrLen);
284             } while (n < 0 && errno == EINTR);
285 
286             err = OK;
287             if (n < 0) {
288                 err = -errno;
289             } else if (n == 0) {
290                 err = -ECONNRESET;
291             } else {
292                 buf->setRange(0, n);
293 
294                 int64_t nowUs = ALooper::GetNowUs();
295                 buf->meta()->setInt64("arrivalTimeUs", nowUs);
296 
297                 sp<AMessage> notify = mNotify->dup();
298                 notify->setInt32("sessionID", mSessionID);
299                 notify->setInt32("reason", kWhatDatagram);
300 
301                 uint32_t ip = ntohl(remoteAddr.sin_addr.s_addr);
302                 notify->setString(
303                         "fromAddr",
304                         AStringPrintf(
305                             "%u.%u.%u.%u",
306                             ip >> 24,
307                             (ip >> 16) & 0xff,
308                             (ip >> 8) & 0xff,
309                             ip & 0xff).c_str());
310 
311                 notify->setInt32("fromPort", ntohs(remoteAddr.sin_port));
312 
313                 notify->setBuffer("data", buf);
314                 notify->post();
315             }
316         } while (err == OK);
317 
318         if (err == -EAGAIN) {
319             err = OK;
320         }
321 
322         if (err != OK) {
323             if (!mUDPRetries) {
324                 notifyError(false /* send */, err, "Recvfrom failed.");
325                 mSawReceiveFailure = true;
326             } else {
327                 mUDPRetries--;
328                 ALOGE("Recvfrom failed, %d/%d retries left",
329                         mUDPRetries, kMaxUDPRetries);
330                 err = OK;
331             }
332         } else {
333             mUDPRetries = kMaxUDPRetries;
334         }
335 
336         return err;
337     }
338 
339     char tmp[512];
340     ssize_t n;
341     do {
342         n = recv(mSocket, tmp, sizeof(tmp), 0);
343     } while (n < 0 && errno == EINTR);
344 
345     status_t err = OK;
346 
347     if (n > 0) {
348         mInBuffer.append(tmp, n);
349 
350 #if 0
351         ALOGI("in:");
352         hexdump(tmp, n);
353 #endif
354     } else if (n < 0) {
355         err = -errno;
356     } else {
357         err = -ECONNRESET;
358     }
359 
360     if (mMode == MODE_DATAGRAM) {
361         // TCP stream carrying 16-bit length-prefixed datagrams.
362 
363         while (mInBuffer.size() >= 2) {
364             size_t packetSize = U16_AT((const uint8_t *)mInBuffer.c_str());
365 
366             if (mInBuffer.size() < packetSize + 2) {
367                 break;
368             }
369 
370             sp<ABuffer> packet = new ABuffer(packetSize);
371             memcpy(packet->data(), mInBuffer.c_str() + 2, packetSize);
372 
373             int64_t nowUs = ALooper::GetNowUs();
374             packet->meta()->setInt64("arrivalTimeUs", nowUs);
375 
376             sp<AMessage> notify = mNotify->dup();
377             notify->setInt32("sessionID", mSessionID);
378             notify->setInt32("reason", kWhatDatagram);
379             notify->setBuffer("data", packet);
380             notify->post();
381 
382             mInBuffer.erase(0, packetSize + 2);
383         }
384     } else if (mMode == MODE_RTSP) {
385         for (;;) {
386             size_t length;
387 
388             if (mInBuffer.size() > 0 && mInBuffer.c_str()[0] == '$') {
389                 if (mInBuffer.size() < 4) {
390                     break;
391                 }
392 
393                 length = U16_AT((const uint8_t *)mInBuffer.c_str() + 2);
394 
395                 if (mInBuffer.size() < 4 + length) {
396                     break;
397                 }
398 
399                 sp<AMessage> notify = mNotify->dup();
400                 notify->setInt32("sessionID", mSessionID);
401                 notify->setInt32("reason", kWhatBinaryData);
402                 notify->setInt32("channel", mInBuffer.c_str()[1]);
403 
404                 sp<ABuffer> data = new ABuffer(length);
405                 memcpy(data->data(), mInBuffer.c_str() + 4, length);
406 
407                 int64_t nowUs = ALooper::GetNowUs();
408                 data->meta()->setInt64("arrivalTimeUs", nowUs);
409 
410                 notify->setBuffer("data", data);
411                 notify->post();
412 
413                 mInBuffer.erase(0, 4 + length);
414                 continue;
415             }
416 
417             sp<ParsedMessage> msg =
418                 ParsedMessage::Parse(
419                         mInBuffer.c_str(), mInBuffer.size(), err != OK, &length);
420 
421             if (msg == NULL) {
422                 break;
423             }
424 
425             sp<AMessage> notify = mNotify->dup();
426             notify->setInt32("sessionID", mSessionID);
427             notify->setInt32("reason", kWhatData);
428             notify->setObject("data", msg);
429             notify->post();
430 
431 #if 1
432             // XXX The (old) dongle sends the wrong content length header on a
433             // SET_PARAMETER request that signals a "wfd_idr_request".
434             // (17 instead of 19).
435             const char *content = msg->getContent();
436             if (content
437                     && !memcmp(content, "wfd_idr_request\r\n", 17)
438                     && length >= 19
439                     && mInBuffer.c_str()[length] == '\r'
440                     && mInBuffer.c_str()[length + 1] == '\n') {
441                 length += 2;
442             }
443 #endif
444 
445             mInBuffer.erase(0, length);
446 
447             if (err != OK) {
448                 break;
449             }
450         }
451     } else {
452         CHECK_EQ(mMode, MODE_WEBSOCKET);
453 
454         const uint8_t *data = (const uint8_t *)mInBuffer.c_str();
455         // hexdump(data, mInBuffer.size());
456 
457         while (mInBuffer.size() >= 2) {
458             size_t offset = 2;
459 
460             uint64_t payloadLen = data[1] & 0x7f;
461             if (payloadLen == 126) {
462                 if (offset + 2 > mInBuffer.size()) {
463                     break;
464                 }
465 
466                 payloadLen = U16_AT(&data[offset]);
467                 offset += 2;
468             } else if (payloadLen == 127) {
469                 if (offset + 8 > mInBuffer.size()) {
470                     break;
471                 }
472 
473                 payloadLen = U64_AT(&data[offset]);
474                 offset += 8;
475             }
476 
477             uint32_t mask = 0;
478             if (data[1] & 0x80) {
479                 // MASK==1
480                 if (offset + 4 > mInBuffer.size()) {
481                     break;
482                 }
483 
484                 mask = U32_AT(&data[offset]);
485                 offset += 4;
486             }
487 
488             if (payloadLen > mInBuffer.size() || offset > mInBuffer.size() - payloadLen) {
489                 break;
490             }
491 
492             // We have the full message.
493 
494             sp<ABuffer> packet = new ABuffer(payloadLen);
495             memcpy(packet->data(), &data[offset], payloadLen);
496 
497             if (mask != 0) {
498                 for (size_t i = 0; i < payloadLen; ++i) {
499                     packet->data()[i] =
500                         data[offset + i]
501                             ^ ((mask >> (8 * (3 - (i % 4)))) & 0xff);
502                 }
503             }
504 
505             sp<AMessage> notify = mNotify->dup();
506             notify->setInt32("sessionID", mSessionID);
507             notify->setInt32("reason", kWhatWebSocketMessage);
508             notify->setBuffer("data", packet);
509             notify->setInt32("headerByte", data[0]);
510             notify->post();
511 
512             mInBuffer.erase(0, offset + payloadLen);
513         }
514     }
515 
516     if (err != OK) {
517         notifyError(false /* send */, err, "Recv failed.");
518         mSawReceiveFailure = true;
519     }
520 
521     return err;
522 }
523 
dumpFragmentStats(const Fragment &)524 void ANetworkSession::Session::dumpFragmentStats(const Fragment & /* frag */) {
525 #if 0
526     int64_t nowUs = ALooper::GetNowUs();
527     int64_t delayMs = (nowUs - frag.mTimeUs) / 1000ll;
528 
529     static const int64_t kMinDelayMs = 0;
530     static const int64_t kMaxDelayMs = 300;
531 
532     const char *kPattern = "########################################";
533     size_t kPatternSize = strlen(kPattern);
534 
535     int n = (kPatternSize * (delayMs - kMinDelayMs))
536                 / (kMaxDelayMs - kMinDelayMs);
537 
538     if (n < 0) {
539         n = 0;
540     } else if ((size_t)n > kPatternSize) {
541         n = kPatternSize;
542     }
543 
544     ALOGI("[%lld]: (%4lld ms) %s\n",
545           frag.mTimeUs / 1000,
546           delayMs,
547           kPattern + kPatternSize - n);
548 #endif
549 }
550 
writeMore()551 status_t ANetworkSession::Session::writeMore() {
552     if (mState == DATAGRAM) {
553         CHECK(!mOutFragments.empty());
554 
555         status_t err;
556         do {
557             const Fragment &frag = *mOutFragments.begin();
558             const sp<ABuffer> &datagram = frag.mBuffer;
559 
560             int n;
561             do {
562                 n = send(mSocket, datagram->data(), datagram->size(), 0);
563             } while (n < 0 && errno == EINTR);
564 
565             err = OK;
566 
567             if (n > 0) {
568                 if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
569                     dumpFragmentStats(frag);
570                 }
571 
572                 mOutFragments.erase(mOutFragments.begin());
573             } else if (n < 0) {
574                 err = -errno;
575             } else if (n == 0) {
576                 err = -ECONNRESET;
577             }
578         } while (err == OK && !mOutFragments.empty());
579 
580         if (err == -EAGAIN) {
581             if (!mOutFragments.empty()) {
582                 ALOGI("%zu datagrams remain queued.", mOutFragments.size());
583             }
584             err = OK;
585         }
586 
587         if (err != OK) {
588             if (!mUDPRetries) {
589                 notifyError(true /* send */, err, "Send datagram failed.");
590                 mSawSendFailure = true;
591             } else {
592                 mUDPRetries--;
593                 ALOGE("Send datagram failed, %d/%d retries left",
594                         mUDPRetries, kMaxUDPRetries);
595                 err = OK;
596             }
597         } else {
598             mUDPRetries = kMaxUDPRetries;
599         }
600 
601         return err;
602     }
603 
604     if (mState == CONNECTING) {
605         int err;
606         socklen_t optionLen = sizeof(err);
607         CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0);
608         CHECK_EQ(optionLen, (socklen_t)sizeof(err));
609 
610         if (err != 0) {
611             notifyError(kWhatError, -err, "Connection failed");
612             mSawSendFailure = true;
613 
614             return -err;
615         }
616 
617         mState = CONNECTED;
618         notify(kWhatConnected);
619 
620         return OK;
621     }
622 
623     CHECK_EQ(mState, CONNECTED);
624     CHECK(!mOutFragments.empty());
625 
626     ssize_t n = -1;
627     while (!mOutFragments.empty()) {
628         const Fragment &frag = *mOutFragments.begin();
629 
630         do {
631             n = send(mSocket, frag.mBuffer->data(), frag.mBuffer->size(), 0);
632         } while (n < 0 && errno == EINTR);
633 
634         if (n <= 0) {
635             break;
636         }
637 
638         frag.mBuffer->setRange(
639                 frag.mBuffer->offset() + n, frag.mBuffer->size() - n);
640 
641         if (frag.mBuffer->size() > 0) {
642             break;
643         }
644 
645         if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
646             dumpFragmentStats(frag);
647         }
648 
649         mOutFragments.erase(mOutFragments.begin());
650     }
651 
652     status_t err = OK;
653 
654     if (n < 0) {
655         err = -errno;
656     } else if (n == 0) {
657         err = -ECONNRESET;
658     }
659 
660     if (err != OK) {
661         notifyError(true /* send */, err, "Send failed.");
662         mSawSendFailure = true;
663     }
664 
665 #if 0
666     int numBytesQueued;
667     int res = ioctl(mSocket, SIOCOUTQ, &numBytesQueued);
668     if (res == 0 && numBytesQueued > 50 * 1024) {
669         if (numBytesQueued > 409600) {
670             ALOGW("!!! numBytesQueued = %d", numBytesQueued);
671         }
672 
673         int64_t nowUs = ALooper::GetNowUs();
674 
675         if (mLastStallReportUs < 0ll
676                 || nowUs > mLastStallReportUs + 100000ll) {
677             sp<AMessage> msg = mNotify->dup();
678             msg->setInt32("sessionID", mSessionID);
679             msg->setInt32("reason", kWhatNetworkStall);
680             msg->setSize("numBytesQueued", numBytesQueued);
681             msg->post();
682 
683             mLastStallReportUs = nowUs;
684         }
685     }
686 #endif
687 
688     return err;
689 }
690 
sendRequest(const void * data,ssize_t size,bool timeValid,int64_t timeUs)691 status_t ANetworkSession::Session::sendRequest(
692         const void *data, ssize_t size, bool timeValid, int64_t timeUs) {
693     CHECK(mState == CONNECTED || mState == DATAGRAM);
694 
695     if (size < 0) {
696         size = strlen((const char *)data);
697     }
698 
699     if (size == 0) {
700         return OK;
701     }
702 
703     sp<ABuffer> buffer;
704 
705     if (mState == CONNECTED && mMode == MODE_DATAGRAM) {
706         CHECK_LE(size, 65535);
707 
708         buffer = new ABuffer(size + 2);
709         buffer->data()[0] = size >> 8;
710         buffer->data()[1] = size & 0xff;
711         memcpy(buffer->data() + 2, data, size);
712     } else if (mState == CONNECTED && mMode == MODE_WEBSOCKET) {
713         static const bool kUseMask = false;  // Chromium doesn't like it.
714 
715         size_t numHeaderBytes = 2 + (kUseMask ? 4 : 0);
716         if (size > 65535) {
717             numHeaderBytes += 8;
718         } else if (size > 125) {
719             numHeaderBytes += 2;
720         }
721 
722         buffer = new ABuffer(numHeaderBytes + size);
723         buffer->data()[0] = 0x81;  // FIN==1 | opcode=1 (text)
724         buffer->data()[1] = kUseMask ? 0x80 : 0x00;
725 
726         if (size > 65535) {
727             buffer->data()[1] |= 127;
728             buffer->data()[2] = 0x00;
729             buffer->data()[3] = 0x00;
730             buffer->data()[4] = 0x00;
731             buffer->data()[5] = 0x00;
732             buffer->data()[6] = (size >> 24) & 0xff;
733             buffer->data()[7] = (size >> 16) & 0xff;
734             buffer->data()[8] = (size >> 8) & 0xff;
735             buffer->data()[9] = size & 0xff;
736         } else if (size > 125) {
737             buffer->data()[1] |= 126;
738             buffer->data()[2] = (size >> 8) & 0xff;
739             buffer->data()[3] = size & 0xff;
740         } else {
741             buffer->data()[1] |= size;
742         }
743 
744         if (kUseMask) {
745             uint32_t mask = rand();
746 
747             buffer->data()[numHeaderBytes - 4] = (mask >> 24) & 0xff;
748             buffer->data()[numHeaderBytes - 3] = (mask >> 16) & 0xff;
749             buffer->data()[numHeaderBytes - 2] = (mask >> 8) & 0xff;
750             buffer->data()[numHeaderBytes - 1] = mask & 0xff;
751 
752             for (size_t i = 0; i < (size_t)size; ++i) {
753                 buffer->data()[numHeaderBytes + i] =
754                     ((const uint8_t *)data)[i]
755                         ^ ((mask >> (8 * (3 - (i % 4)))) & 0xff);
756             }
757         } else {
758             memcpy(buffer->data() + numHeaderBytes, data, size);
759         }
760     } else {
761         buffer = new ABuffer(size);
762         memcpy(buffer->data(), data, size);
763     }
764 
765     Fragment frag;
766 
767     frag.mFlags = 0;
768     if (timeValid) {
769         frag.mFlags = FRAGMENT_FLAG_TIME_VALID;
770         frag.mTimeUs = timeUs;
771     }
772 
773     frag.mBuffer = buffer;
774 
775     mOutFragments.push_back(frag);
776 
777     return OK;
778 }
779 
notifyError(bool send,status_t err,const char * detail)780 void ANetworkSession::Session::notifyError(
781         bool send, status_t err, const char *detail) {
782     sp<AMessage> msg = mNotify->dup();
783     msg->setInt32("sessionID", mSessionID);
784     msg->setInt32("reason", kWhatError);
785     msg->setInt32("send", send);
786     msg->setInt32("err", err);
787     msg->setString("detail", detail);
788     msg->post();
789 }
790 
notify(NotificationReason reason)791 void ANetworkSession::Session::notify(NotificationReason reason) {
792     sp<AMessage> msg = mNotify->dup();
793     msg->setInt32("sessionID", mSessionID);
794     msg->setInt32("reason", reason);
795     msg->post();
796 }
797 
798 ////////////////////////////////////////////////////////////////////////////////
799 
ANetworkSession()800 ANetworkSession::ANetworkSession()
801     : mNextSessionID(1) {
802     mPipeFd[0] = mPipeFd[1] = -1;
803 }
804 
~ANetworkSession()805 ANetworkSession::~ANetworkSession() {
806     stop();
807 }
808 
start()809 status_t ANetworkSession::start() {
810     if (mThread != NULL) {
811         return INVALID_OPERATION;
812     }
813 
814     int res = pipe(mPipeFd);
815     if (res != 0) {
816         mPipeFd[0] = mPipeFd[1] = -1;
817         return -errno;
818     }
819 
820     mThread = new NetworkThread(this);
821 
822     status_t err = mThread->run("ANetworkSession", ANDROID_PRIORITY_AUDIO);
823 
824     if (err != OK) {
825         mThread.clear();
826 
827         close(mPipeFd[0]);
828         close(mPipeFd[1]);
829         mPipeFd[0] = mPipeFd[1] = -1;
830 
831         return err;
832     }
833 
834     return OK;
835 }
836 
stop()837 status_t ANetworkSession::stop() {
838     if (mThread == NULL) {
839         return INVALID_OPERATION;
840     }
841 
842     mThread->requestExit();
843     interrupt();
844     mThread->requestExitAndWait();
845 
846     mThread.clear();
847 
848     close(mPipeFd[0]);
849     close(mPipeFd[1]);
850     mPipeFd[0] = mPipeFd[1] = -1;
851 
852     return OK;
853 }
854 
createRTSPClient(const char * host,unsigned port,const sp<AMessage> & notify,int32_t * sessionID)855 status_t ANetworkSession::createRTSPClient(
856         const char *host, unsigned port, const sp<AMessage> &notify,
857         int32_t *sessionID) {
858     return createClientOrServer(
859             kModeCreateRTSPClient,
860             NULL /* addr */,
861             0 /* port */,
862             host,
863             port,
864             notify,
865             sessionID);
866 }
867 
createRTSPServer(const struct in_addr & addr,unsigned port,const sp<AMessage> & notify,int32_t * sessionID)868 status_t ANetworkSession::createRTSPServer(
869         const struct in_addr &addr, unsigned port,
870         const sp<AMessage> &notify, int32_t *sessionID) {
871     return createClientOrServer(
872             kModeCreateRTSPServer,
873             &addr,
874             port,
875             NULL /* remoteHost */,
876             0 /* remotePort */,
877             notify,
878             sessionID);
879 }
880 
createUDPSession(unsigned localPort,const sp<AMessage> & notify,int32_t * sessionID)881 status_t ANetworkSession::createUDPSession(
882         unsigned localPort, const sp<AMessage> &notify, int32_t *sessionID) {
883     return createUDPSession(localPort, NULL, 0, notify, sessionID);
884 }
885 
createUDPSession(unsigned localPort,const char * remoteHost,unsigned remotePort,const sp<AMessage> & notify,int32_t * sessionID)886 status_t ANetworkSession::createUDPSession(
887         unsigned localPort,
888         const char *remoteHost,
889         unsigned remotePort,
890         const sp<AMessage> &notify,
891         int32_t *sessionID) {
892     return createClientOrServer(
893             kModeCreateUDPSession,
894             NULL /* addr */,
895             localPort,
896             remoteHost,
897             remotePort,
898             notify,
899             sessionID);
900 }
901 
createTCPDatagramSession(const struct in_addr & addr,unsigned port,const sp<AMessage> & notify,int32_t * sessionID)902 status_t ANetworkSession::createTCPDatagramSession(
903         const struct in_addr &addr, unsigned port,
904         const sp<AMessage> &notify, int32_t *sessionID) {
905     return createClientOrServer(
906             kModeCreateTCPDatagramSessionPassive,
907             &addr,
908             port,
909             NULL /* remoteHost */,
910             0 /* remotePort */,
911             notify,
912             sessionID);
913 }
914 
createTCPDatagramSession(unsigned localPort,const char * remoteHost,unsigned remotePort,const sp<AMessage> & notify,int32_t * sessionID)915 status_t ANetworkSession::createTCPDatagramSession(
916         unsigned localPort,
917         const char *remoteHost,
918         unsigned remotePort,
919         const sp<AMessage> &notify,
920         int32_t *sessionID) {
921     return createClientOrServer(
922             kModeCreateTCPDatagramSessionActive,
923             NULL /* addr */,
924             localPort,
925             remoteHost,
926             remotePort,
927             notify,
928             sessionID);
929 }
930 
destroySession(int32_t sessionID)931 status_t ANetworkSession::destroySession(int32_t sessionID) {
932     Mutex::Autolock autoLock(mLock);
933 
934     ssize_t index = mSessions.indexOfKey(sessionID);
935 
936     if (index < 0) {
937         return -ENOENT;
938     }
939 
940     mSessions.removeItemsAt(index);
941 
942     interrupt();
943 
944     return OK;
945 }
946 
947 // static
MakeSocketNonBlocking(int s)948 status_t ANetworkSession::MakeSocketNonBlocking(int s) {
949     int flags = fcntl(s, F_GETFL, 0);
950     if (flags < 0) {
951         flags = 0;
952     }
953 
954     int res = fcntl(s, F_SETFL, flags | O_NONBLOCK);
955     if (res < 0) {
956         return -errno;
957     }
958 
959     return OK;
960 }
961 
createClientOrServer(Mode mode,const struct in_addr * localAddr,unsigned port,const char * remoteHost,unsigned remotePort,const sp<AMessage> & notify,int32_t * sessionID)962 status_t ANetworkSession::createClientOrServer(
963         Mode mode,
964         const struct in_addr *localAddr,
965         unsigned port,
966         const char *remoteHost,
967         unsigned remotePort,
968         const sp<AMessage> &notify,
969         int32_t *sessionID) {
970     Mutex::Autolock autoLock(mLock);
971 
972     *sessionID = 0;
973     status_t err = OK;
974     int s, res;
975     sp<Session> session;
976 
977     s = socket(
978             AF_INET,
979             (mode == kModeCreateUDPSession) ? SOCK_DGRAM : SOCK_STREAM,
980             0);
981 
982     if (s < 0) {
983         err = -errno;
984         goto bail;
985     }
986 
987     if (mode == kModeCreateRTSPServer
988             || mode == kModeCreateTCPDatagramSessionPassive) {
989         const int yes = 1;
990         res = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
991 
992         if (res < 0) {
993             err = -errno;
994             goto bail2;
995         }
996     }
997 
998     if (mode == kModeCreateUDPSession) {
999         int size = 256 * 1024;
1000 
1001         res = setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size));
1002 
1003         if (res < 0) {
1004             err = -errno;
1005             goto bail2;
1006         }
1007 
1008         res = setsockopt(s, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size));
1009 
1010         if (res < 0) {
1011             err = -errno;
1012             goto bail2;
1013         }
1014     } else if (mode == kModeCreateTCPDatagramSessionActive) {
1015         int flag = 1;
1016         res = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
1017 
1018         if (res < 0) {
1019             err = -errno;
1020             goto bail2;
1021         }
1022 
1023         int tos = 224;  // VOICE
1024         res = setsockopt(s, IPPROTO_IP, IP_TOS, &tos, sizeof(tos));
1025 
1026         if (res < 0) {
1027             err = -errno;
1028             goto bail2;
1029         }
1030     }
1031 
1032     err = MakeSocketNonBlocking(s);
1033 
1034     if (err != OK) {
1035         goto bail2;
1036     }
1037 
1038     struct sockaddr_in addr;
1039     memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
1040     addr.sin_family = AF_INET;
1041 
1042     if (mode == kModeCreateRTSPClient
1043             || mode == kModeCreateTCPDatagramSessionActive) {
1044         struct hostent *ent= gethostbyname(remoteHost);
1045         if (ent == NULL) {
1046             err = -h_errno;
1047             goto bail2;
1048         }
1049 
1050         addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
1051         addr.sin_port = htons(remotePort);
1052     } else if (localAddr != NULL) {
1053         addr.sin_addr = *localAddr;
1054         addr.sin_port = htons(port);
1055     } else {
1056         addr.sin_addr.s_addr = htonl(INADDR_ANY);
1057         addr.sin_port = htons(port);
1058     }
1059 
1060     if (mode == kModeCreateRTSPClient
1061             || mode == kModeCreateTCPDatagramSessionActive) {
1062         in_addr_t x = ntohl(addr.sin_addr.s_addr);
1063         ALOGI("connecting socket %d to %d.%d.%d.%d:%d",
1064               s,
1065               (x >> 24),
1066               (x >> 16) & 0xff,
1067               (x >> 8) & 0xff,
1068               x & 0xff,
1069               ntohs(addr.sin_port));
1070 
1071         res = connect(s, (const struct sockaddr *)&addr, sizeof(addr));
1072 
1073         CHECK_LT(res, 0);
1074         if (errno == EINPROGRESS) {
1075             res = 0;
1076         }
1077     } else {
1078         res = bind(s, (const struct sockaddr *)&addr, sizeof(addr));
1079 
1080         if (res == 0) {
1081             if (mode == kModeCreateRTSPServer
1082                     || mode == kModeCreateTCPDatagramSessionPassive) {
1083                 res = listen(s, 4);
1084             } else {
1085                 CHECK_EQ(mode, kModeCreateUDPSession);
1086 
1087                 if (remoteHost != NULL) {
1088                     struct sockaddr_in remoteAddr;
1089                     memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
1090                     remoteAddr.sin_family = AF_INET;
1091                     remoteAddr.sin_port = htons(remotePort);
1092 
1093                     struct hostent *ent= gethostbyname(remoteHost);
1094                     if (ent == NULL) {
1095                         err = -h_errno;
1096                         goto bail2;
1097                     }
1098 
1099                     remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
1100 
1101                     res = connect(
1102                             s,
1103                             (const struct sockaddr *)&remoteAddr,
1104                             sizeof(remoteAddr));
1105                 }
1106             }
1107         }
1108     }
1109 
1110     if (res < 0) {
1111         err = -errno;
1112         goto bail2;
1113     }
1114 
1115     Session::State state;
1116     switch (mode) {
1117         case kModeCreateRTSPClient:
1118             state = Session::CONNECTING;
1119             break;
1120 
1121         case kModeCreateTCPDatagramSessionActive:
1122             state = Session::CONNECTING;
1123             break;
1124 
1125         case kModeCreateTCPDatagramSessionPassive:
1126             state = Session::LISTENING_TCP_DGRAMS;
1127             break;
1128 
1129         case kModeCreateRTSPServer:
1130             state = Session::LISTENING_RTSP;
1131             break;
1132 
1133         default:
1134             CHECK_EQ(mode, kModeCreateUDPSession);
1135             state = Session::DATAGRAM;
1136             break;
1137     }
1138 
1139     session = new Session(
1140             mNextSessionID++,
1141             state,
1142             s,
1143             notify);
1144 
1145     if (mode == kModeCreateTCPDatagramSessionActive) {
1146         session->setMode(Session::MODE_DATAGRAM);
1147     } else if (mode == kModeCreateRTSPClient) {
1148         session->setMode(Session::MODE_RTSP);
1149     }
1150 
1151     mSessions.add(session->sessionID(), session);
1152 
1153     interrupt();
1154 
1155     *sessionID = session->sessionID();
1156 
1157     goto bail;
1158 
1159 bail2:
1160     close(s);
1161     s = -1;
1162 
1163 bail:
1164     return err;
1165 }
1166 
connectUDPSession(int32_t sessionID,const char * remoteHost,unsigned remotePort)1167 status_t ANetworkSession::connectUDPSession(
1168         int32_t sessionID, const char *remoteHost, unsigned remotePort) {
1169     Mutex::Autolock autoLock(mLock);
1170 
1171     ssize_t index = mSessions.indexOfKey(sessionID);
1172 
1173     if (index < 0) {
1174         return -ENOENT;
1175     }
1176 
1177     const sp<Session> session = mSessions.valueAt(index);
1178     int s = session->socket();
1179 
1180     struct sockaddr_in remoteAddr;
1181     memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
1182     remoteAddr.sin_family = AF_INET;
1183     remoteAddr.sin_port = htons(remotePort);
1184 
1185     status_t err = OK;
1186     struct hostent *ent = gethostbyname(remoteHost);
1187     if (ent == NULL) {
1188         err = -h_errno;
1189     } else {
1190         remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
1191 
1192         int res = connect(
1193                 s,
1194                 (const struct sockaddr *)&remoteAddr,
1195                 sizeof(remoteAddr));
1196 
1197         if (res < 0) {
1198             err = -errno;
1199         }
1200     }
1201 
1202     return err;
1203 }
1204 
sendRequest(int32_t sessionID,const void * data,ssize_t size,bool timeValid,int64_t timeUs)1205 status_t ANetworkSession::sendRequest(
1206         int32_t sessionID, const void *data, ssize_t size,
1207         bool timeValid, int64_t timeUs) {
1208     Mutex::Autolock autoLock(mLock);
1209 
1210     ssize_t index = mSessions.indexOfKey(sessionID);
1211 
1212     if (index < 0) {
1213         return -ENOENT;
1214     }
1215 
1216     const sp<Session> session = mSessions.valueAt(index);
1217 
1218     status_t err = session->sendRequest(data, size, timeValid, timeUs);
1219 
1220     interrupt();
1221 
1222     return err;
1223 }
1224 
switchToWebSocketMode(int32_t sessionID)1225 status_t ANetworkSession::switchToWebSocketMode(int32_t sessionID) {
1226     Mutex::Autolock autoLock(mLock);
1227 
1228     ssize_t index = mSessions.indexOfKey(sessionID);
1229 
1230     if (index < 0) {
1231         return -ENOENT;
1232     }
1233 
1234     const sp<Session> session = mSessions.valueAt(index);
1235     return session->switchToWebSocketMode();
1236 }
1237 
interrupt()1238 void ANetworkSession::interrupt() {
1239     static const char dummy = 0;
1240 
1241     ssize_t n;
1242     do {
1243         n = write(mPipeFd[1], &dummy, 1);
1244     } while (n < 0 && errno == EINTR);
1245 
1246     if (n < 0) {
1247         ALOGW("Error writing to pipe (%s)", strerror(errno));
1248     }
1249 }
1250 
threadLoop()1251 void ANetworkSession::threadLoop() {
1252     fd_set rs, ws;
1253     FD_ZERO(&rs);
1254     FD_ZERO(&ws);
1255 
1256     FD_SET(mPipeFd[0], &rs);
1257     int maxFd = mPipeFd[0];
1258 
1259     {
1260         Mutex::Autolock autoLock(mLock);
1261 
1262         for (size_t i = 0; i < mSessions.size(); ++i) {
1263             const sp<Session> &session = mSessions.valueAt(i);
1264 
1265             int s = session->socket();
1266 
1267             if (s < 0) {
1268                 continue;
1269             }
1270 
1271             if (session->wantsToRead()) {
1272                 FD_SET(s, &rs);
1273                 if (s > maxFd) {
1274                     maxFd = s;
1275                 }
1276             }
1277 
1278             if (session->wantsToWrite()) {
1279                 FD_SET(s, &ws);
1280                 if (s > maxFd) {
1281                     maxFd = s;
1282                 }
1283             }
1284         }
1285     }
1286 
1287     int res = select(maxFd + 1, &rs, &ws, NULL, NULL /* tv */);
1288 
1289     if (res == 0) {
1290         return;
1291     }
1292 
1293     if (res < 0) {
1294         if (errno == EINTR) {
1295             return;
1296         }
1297 
1298         ALOGE("select failed w/ error %d (%s)", errno, strerror(errno));
1299         return;
1300     }
1301 
1302     if (FD_ISSET(mPipeFd[0], &rs)) {
1303         char c;
1304         ssize_t n;
1305         do {
1306             n = read(mPipeFd[0], &c, 1);
1307         } while (n < 0 && errno == EINTR);
1308 
1309         if (n < 0) {
1310             ALOGW("Error reading from pipe (%s)", strerror(errno));
1311         }
1312 
1313         --res;
1314     }
1315 
1316     {
1317         Mutex::Autolock autoLock(mLock);
1318 
1319         List<sp<Session> > sessionsToAdd;
1320 
1321         for (size_t i = mSessions.size(); res > 0 && i > 0;) {
1322             i--;
1323             const sp<Session> &session = mSessions.valueAt(i);
1324 
1325             int s = session->socket();
1326 
1327             if (s < 0) {
1328                 continue;
1329             }
1330 
1331             if (FD_ISSET(s, &rs) || FD_ISSET(s, &ws)) {
1332                 --res;
1333             }
1334 
1335             if (FD_ISSET(s, &rs)) {
1336                 if (session->isRTSPServer() || session->isTCPDatagramServer()) {
1337                     struct sockaddr_in remoteAddr;
1338                     socklen_t remoteAddrLen = sizeof(remoteAddr);
1339 
1340                     int clientSocket = accept(
1341                             s, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
1342 
1343                     if (clientSocket >= 0) {
1344                         status_t err = MakeSocketNonBlocking(clientSocket);
1345 
1346                         if (err != OK) {
1347                             ALOGE("Unable to make client socket non blocking, "
1348                                   "failed w/ error %d (%s)",
1349                                   err, strerror(-err));
1350 
1351                             close(clientSocket);
1352                             clientSocket = -1;
1353                         } else {
1354                             in_addr_t addr = ntohl(remoteAddr.sin_addr.s_addr);
1355 
1356                             ALOGI("incoming connection from %d.%d.%d.%d:%d "
1357                                   "(socket %d)",
1358                                   (addr >> 24),
1359                                   (addr >> 16) & 0xff,
1360                                   (addr >> 8) & 0xff,
1361                                   addr & 0xff,
1362                                   ntohs(remoteAddr.sin_port),
1363                                   clientSocket);
1364 
1365                             sp<Session> clientSession =
1366                                 new Session(
1367                                         mNextSessionID++,
1368                                         Session::CONNECTED,
1369                                         clientSocket,
1370                                         session->getNotificationMessage());
1371 
1372                             clientSession->setMode(
1373                                     session->isRTSPServer()
1374                                         ? Session::MODE_RTSP
1375                                         : Session::MODE_DATAGRAM);
1376 
1377                             sessionsToAdd.push_back(clientSession);
1378                         }
1379                     } else {
1380                         ALOGE("accept returned error %d (%s)",
1381                               errno, strerror(errno));
1382                     }
1383                 } else {
1384                     status_t err = session->readMore();
1385                     if (err != OK) {
1386                         ALOGE("readMore on socket %d failed w/ error %d (%s)",
1387                               s, err, strerror(-err));
1388                     }
1389                 }
1390             }
1391 
1392             if (FD_ISSET(s, &ws)) {
1393                 status_t err = session->writeMore();
1394                 if (err != OK) {
1395                     ALOGE("writeMore on socket %d failed w/ error %d (%s)",
1396                           s, err, strerror(-err));
1397                 }
1398             }
1399         }
1400 
1401         while (!sessionsToAdd.empty()) {
1402             sp<Session> session = *sessionsToAdd.begin();
1403             sessionsToAdd.erase(sessionsToAdd.begin());
1404 
1405             mSessions.add(session->sessionID(), session);
1406 
1407             ALOGI("added clientSession %d", session->sessionID());
1408         }
1409     }
1410 }
1411 
1412 }  // namespace android
1413