1 /*
2 * Copyright 2012, The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "NetworkSession"
19 #include <utils/Log.h>
20
21 #include "ANetworkSession.h"
22 #include "ParsedMessage.h"
23
24 #include <arpa/inet.h>
25 #include <fcntl.h>
26 #include <linux/tcp.h>
27 #include <net/if.h>
28 #include <netdb.h>
29 #include <netinet/in.h>
30 #include <sys/ioctl.h>
31 #include <sys/socket.h>
32
33 #include <media/stagefright/foundation/ABuffer.h>
34 #include <media/stagefright/foundation/ADebug.h>
35 #include <media/stagefright/foundation/AMessage.h>
36 #include <media/stagefright/foundation/hexdump.h>
37
38 namespace android {
39
// Reads a 16-bit big-endian (most significant byte first) value from the two
// bytes starting at |ptr|.
static uint16_t U16_AT(const uint8_t *ptr) {
    const unsigned highByte = ptr[0];
    const unsigned lowByte = ptr[1];

    return (uint16_t)((highByte << 8) | lowByte);
}
43
// Reads a 32-bit big-endian value from the four bytes starting at |ptr|.
//
// Each byte is widened to uint32_t before shifting: a plain uint8_t operand
// would be promoted to (signed) int, and shifting a value >= 0x80 left by 24
// would then shift into the sign bit.
static uint32_t U32_AT(const uint8_t *ptr) {
    return ((uint32_t)ptr[0] << 24)
            | ((uint32_t)ptr[1] << 16)
            | ((uint32_t)ptr[2] << 8)
            | (uint32_t)ptr[3];
}

// Reads a 64-bit big-endian value from the eight bytes starting at |ptr|,
// composed from two consecutive 32-bit big-endian words.
static uint64_t U64_AT(const uint8_t *ptr) {
    return ((uint64_t)U32_AT(ptr) << 32) | U32_AT(ptr + 4);
}
51
// Capacity, in bytes, of the buffer allocated for each datagram read from a
// UDP socket.
static const size_t kMaxUDPSize = 1500;

// Initial (and reset) value of a session's UDP retry budget: the number of
// failed send/receive passes that are only logged before an error is
// reported to the client.
static const int32_t kMaxUDPRetries = 200;
54
55 struct ANetworkSession::NetworkThread : public Thread {
56 NetworkThread(ANetworkSession *session);
57
58 protected:
59 virtual ~NetworkThread();
60
61 private:
62 ANetworkSession *mSession;
63
64 virtual bool threadLoop();
65
66 DISALLOW_EVIL_CONSTRUCTORS(NetworkThread);
67 };
68
69 struct ANetworkSession::Session : public RefBase {
70 enum Mode {
71 MODE_RTSP,
72 MODE_DATAGRAM,
73 MODE_WEBSOCKET,
74 };
75
76 enum State {
77 CONNECTING,
78 CONNECTED,
79 LISTENING_RTSP,
80 LISTENING_TCP_DGRAMS,
81 DATAGRAM,
82 };
83
84 Session(int32_t sessionID,
85 State state,
86 int s,
87 const sp<AMessage> ¬ify);
88
89 int32_t sessionID() const;
90 int socket() const;
91 sp<AMessage> getNotificationMessage() const;
92
93 bool isRTSPServer() const;
94 bool isTCPDatagramServer() const;
95
96 bool wantsToRead();
97 bool wantsToWrite();
98
99 status_t readMore();
100 status_t writeMore();
101
102 status_t sendRequest(
103 const void *data, ssize_t size, bool timeValid, int64_t timeUs);
104
105 void setMode(Mode mode);
106
107 status_t switchToWebSocketMode();
108
109 protected:
110 virtual ~Session();
111
112 private:
113 enum {
114 FRAGMENT_FLAG_TIME_VALID = 1,
115 };
116 struct Fragment {
117 uint32_t mFlags;
118 int64_t mTimeUs;
119 sp<ABuffer> mBuffer;
120 };
121
122 int32_t mSessionID;
123 State mState;
124 Mode mMode;
125 int mSocket;
126 sp<AMessage> mNotify;
127 bool mSawReceiveFailure, mSawSendFailure;
128 int32_t mUDPRetries;
129
130 List<Fragment> mOutFragments;
131
132 AString mInBuffer;
133
134 int64_t mLastStallReportUs;
135
136 void notifyError(bool send, status_t err, const char *detail);
137 void notify(NotificationReason reason);
138
139 void dumpFragmentStats(const Fragment &frag);
140
141 DISALLOW_EVIL_CONSTRUCTORS(Session);
142 };
143 ////////////////////////////////////////////////////////////////////////////////
144
NetworkThread(ANetworkSession * session)145 ANetworkSession::NetworkThread::NetworkThread(ANetworkSession *session)
146 : mSession(session) {
147 }
148
~NetworkThread()149 ANetworkSession::NetworkThread::~NetworkThread() {
150 }
151
threadLoop()152 bool ANetworkSession::NetworkThread::threadLoop() {
153 mSession->threadLoop();
154
155 return true;
156 }
157
158 ////////////////////////////////////////////////////////////////////////////////
159
Session(int32_t sessionID,State state,int s,const sp<AMessage> & notify)160 ANetworkSession::Session::Session(
161 int32_t sessionID,
162 State state,
163 int s,
164 const sp<AMessage> ¬ify)
165 : mSessionID(sessionID),
166 mState(state),
167 mMode(MODE_DATAGRAM),
168 mSocket(s),
169 mNotify(notify),
170 mSawReceiveFailure(false),
171 mSawSendFailure(false),
172 mUDPRetries(kMaxUDPRetries),
173 mLastStallReportUs(-1ll) {
174 if (mState == CONNECTED) {
175 struct sockaddr_in localAddr;
176 socklen_t localAddrLen = sizeof(localAddr);
177
178 int res = getsockname(
179 mSocket, (struct sockaddr *)&localAddr, &localAddrLen);
180 CHECK_GE(res, 0);
181
182 struct sockaddr_in remoteAddr;
183 socklen_t remoteAddrLen = sizeof(remoteAddr);
184
185 res = getpeername(
186 mSocket, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
187 CHECK_GE(res, 0);
188
189 in_addr_t addr = ntohl(localAddr.sin_addr.s_addr);
190 AString localAddrString = AStringPrintf(
191 "%d.%d.%d.%d",
192 (addr >> 24),
193 (addr >> 16) & 0xff,
194 (addr >> 8) & 0xff,
195 addr & 0xff);
196
197 addr = ntohl(remoteAddr.sin_addr.s_addr);
198 AString remoteAddrString = AStringPrintf(
199 "%d.%d.%d.%d",
200 (addr >> 24),
201 (addr >> 16) & 0xff,
202 (addr >> 8) & 0xff,
203 addr & 0xff);
204
205 sp<AMessage> msg = mNotify->dup();
206 msg->setInt32("sessionID", mSessionID);
207 msg->setInt32("reason", kWhatClientConnected);
208 msg->setString("server-ip", localAddrString.c_str());
209 msg->setInt32("server-port", ntohs(localAddr.sin_port));
210 msg->setString("client-ip", remoteAddrString.c_str());
211 msg->setInt32("client-port", ntohs(remoteAddr.sin_port));
212 msg->post();
213 }
214 }
215
~Session()216 ANetworkSession::Session::~Session() {
217 ALOGV("Session %d gone", mSessionID);
218
219 close(mSocket);
220 mSocket = -1;
221 }
222
sessionID() const223 int32_t ANetworkSession::Session::sessionID() const {
224 return mSessionID;
225 }
226
socket() const227 int ANetworkSession::Session::socket() const {
228 return mSocket;
229 }
230
setMode(Mode mode)231 void ANetworkSession::Session::setMode(Mode mode) {
232 mMode = mode;
233 }
234
switchToWebSocketMode()235 status_t ANetworkSession::Session::switchToWebSocketMode() {
236 if (mState != CONNECTED || mMode != MODE_RTSP) {
237 return INVALID_OPERATION;
238 }
239
240 mMode = MODE_WEBSOCKET;
241
242 return OK;
243 }
244
getNotificationMessage() const245 sp<AMessage> ANetworkSession::Session::getNotificationMessage() const {
246 return mNotify;
247 }
248
isRTSPServer() const249 bool ANetworkSession::Session::isRTSPServer() const {
250 return mState == LISTENING_RTSP;
251 }
252
isTCPDatagramServer() const253 bool ANetworkSession::Session::isTCPDatagramServer() const {
254 return mState == LISTENING_TCP_DGRAMS;
255 }
256
wantsToRead()257 bool ANetworkSession::Session::wantsToRead() {
258 return !mSawReceiveFailure && mState != CONNECTING;
259 }
260
wantsToWrite()261 bool ANetworkSession::Session::wantsToWrite() {
262 return !mSawSendFailure
263 && (mState == CONNECTING
264 || (mState == CONNECTED && !mOutFragments.empty())
265 || (mState == DATAGRAM && !mOutFragments.empty()));
266 }
267
readMore()268 status_t ANetworkSession::Session::readMore() {
269 if (mState == DATAGRAM) {
270 CHECK_EQ(mMode, MODE_DATAGRAM);
271
272 status_t err;
273 do {
274 sp<ABuffer> buf = new ABuffer(kMaxUDPSize);
275
276 struct sockaddr_in remoteAddr;
277 socklen_t remoteAddrLen = sizeof(remoteAddr);
278
279 ssize_t n;
280 do {
281 n = recvfrom(
282 mSocket, buf->data(), buf->capacity(), 0,
283 (struct sockaddr *)&remoteAddr, &remoteAddrLen);
284 } while (n < 0 && errno == EINTR);
285
286 err = OK;
287 if (n < 0) {
288 err = -errno;
289 } else if (n == 0) {
290 err = -ECONNRESET;
291 } else {
292 buf->setRange(0, n);
293
294 int64_t nowUs = ALooper::GetNowUs();
295 buf->meta()->setInt64("arrivalTimeUs", nowUs);
296
297 sp<AMessage> notify = mNotify->dup();
298 notify->setInt32("sessionID", mSessionID);
299 notify->setInt32("reason", kWhatDatagram);
300
301 uint32_t ip = ntohl(remoteAddr.sin_addr.s_addr);
302 notify->setString(
303 "fromAddr",
304 AStringPrintf(
305 "%u.%u.%u.%u",
306 ip >> 24,
307 (ip >> 16) & 0xff,
308 (ip >> 8) & 0xff,
309 ip & 0xff).c_str());
310
311 notify->setInt32("fromPort", ntohs(remoteAddr.sin_port));
312
313 notify->setBuffer("data", buf);
314 notify->post();
315 }
316 } while (err == OK);
317
318 if (err == -EAGAIN) {
319 err = OK;
320 }
321
322 if (err != OK) {
323 if (!mUDPRetries) {
324 notifyError(false /* send */, err, "Recvfrom failed.");
325 mSawReceiveFailure = true;
326 } else {
327 mUDPRetries--;
328 ALOGE("Recvfrom failed, %d/%d retries left",
329 mUDPRetries, kMaxUDPRetries);
330 err = OK;
331 }
332 } else {
333 mUDPRetries = kMaxUDPRetries;
334 }
335
336 return err;
337 }
338
339 char tmp[512];
340 ssize_t n;
341 do {
342 n = recv(mSocket, tmp, sizeof(tmp), 0);
343 } while (n < 0 && errno == EINTR);
344
345 status_t err = OK;
346
347 if (n > 0) {
348 mInBuffer.append(tmp, n);
349
350 #if 0
351 ALOGI("in:");
352 hexdump(tmp, n);
353 #endif
354 } else if (n < 0) {
355 err = -errno;
356 } else {
357 err = -ECONNRESET;
358 }
359
360 if (mMode == MODE_DATAGRAM) {
361 // TCP stream carrying 16-bit length-prefixed datagrams.
362
363 while (mInBuffer.size() >= 2) {
364 size_t packetSize = U16_AT((const uint8_t *)mInBuffer.c_str());
365
366 if (mInBuffer.size() < packetSize + 2) {
367 break;
368 }
369
370 sp<ABuffer> packet = new ABuffer(packetSize);
371 memcpy(packet->data(), mInBuffer.c_str() + 2, packetSize);
372
373 int64_t nowUs = ALooper::GetNowUs();
374 packet->meta()->setInt64("arrivalTimeUs", nowUs);
375
376 sp<AMessage> notify = mNotify->dup();
377 notify->setInt32("sessionID", mSessionID);
378 notify->setInt32("reason", kWhatDatagram);
379 notify->setBuffer("data", packet);
380 notify->post();
381
382 mInBuffer.erase(0, packetSize + 2);
383 }
384 } else if (mMode == MODE_RTSP) {
385 for (;;) {
386 size_t length;
387
388 if (mInBuffer.size() > 0 && mInBuffer.c_str()[0] == '$') {
389 if (mInBuffer.size() < 4) {
390 break;
391 }
392
393 length = U16_AT((const uint8_t *)mInBuffer.c_str() + 2);
394
395 if (mInBuffer.size() < 4 + length) {
396 break;
397 }
398
399 sp<AMessage> notify = mNotify->dup();
400 notify->setInt32("sessionID", mSessionID);
401 notify->setInt32("reason", kWhatBinaryData);
402 notify->setInt32("channel", mInBuffer.c_str()[1]);
403
404 sp<ABuffer> data = new ABuffer(length);
405 memcpy(data->data(), mInBuffer.c_str() + 4, length);
406
407 int64_t nowUs = ALooper::GetNowUs();
408 data->meta()->setInt64("arrivalTimeUs", nowUs);
409
410 notify->setBuffer("data", data);
411 notify->post();
412
413 mInBuffer.erase(0, 4 + length);
414 continue;
415 }
416
417 sp<ParsedMessage> msg =
418 ParsedMessage::Parse(
419 mInBuffer.c_str(), mInBuffer.size(), err != OK, &length);
420
421 if (msg == NULL) {
422 break;
423 }
424
425 sp<AMessage> notify = mNotify->dup();
426 notify->setInt32("sessionID", mSessionID);
427 notify->setInt32("reason", kWhatData);
428 notify->setObject("data", msg);
429 notify->post();
430
431 #if 1
432 // XXX The (old) dongle sends the wrong content length header on a
433 // SET_PARAMETER request that signals a "wfd_idr_request".
434 // (17 instead of 19).
435 const char *content = msg->getContent();
436 if (content
437 && !memcmp(content, "wfd_idr_request\r\n", 17)
438 && length >= 19
439 && mInBuffer.c_str()[length] == '\r'
440 && mInBuffer.c_str()[length + 1] == '\n') {
441 length += 2;
442 }
443 #endif
444
445 mInBuffer.erase(0, length);
446
447 if (err != OK) {
448 break;
449 }
450 }
451 } else {
452 CHECK_EQ(mMode, MODE_WEBSOCKET);
453
454 const uint8_t *data = (const uint8_t *)mInBuffer.c_str();
455 // hexdump(data, mInBuffer.size());
456
457 while (mInBuffer.size() >= 2) {
458 size_t offset = 2;
459
460 uint64_t payloadLen = data[1] & 0x7f;
461 if (payloadLen == 126) {
462 if (offset + 2 > mInBuffer.size()) {
463 break;
464 }
465
466 payloadLen = U16_AT(&data[offset]);
467 offset += 2;
468 } else if (payloadLen == 127) {
469 if (offset + 8 > mInBuffer.size()) {
470 break;
471 }
472
473 payloadLen = U64_AT(&data[offset]);
474 offset += 8;
475 }
476
477 uint32_t mask = 0;
478 if (data[1] & 0x80) {
479 // MASK==1
480 if (offset + 4 > mInBuffer.size()) {
481 break;
482 }
483
484 mask = U32_AT(&data[offset]);
485 offset += 4;
486 }
487
488 if (payloadLen > mInBuffer.size() || offset > mInBuffer.size() - payloadLen) {
489 break;
490 }
491
492 // We have the full message.
493
494 sp<ABuffer> packet = new ABuffer(payloadLen);
495 memcpy(packet->data(), &data[offset], payloadLen);
496
497 if (mask != 0) {
498 for (size_t i = 0; i < payloadLen; ++i) {
499 packet->data()[i] =
500 data[offset + i]
501 ^ ((mask >> (8 * (3 - (i % 4)))) & 0xff);
502 }
503 }
504
505 sp<AMessage> notify = mNotify->dup();
506 notify->setInt32("sessionID", mSessionID);
507 notify->setInt32("reason", kWhatWebSocketMessage);
508 notify->setBuffer("data", packet);
509 notify->setInt32("headerByte", data[0]);
510 notify->post();
511
512 mInBuffer.erase(0, offset + payloadLen);
513 }
514 }
515
516 if (err != OK) {
517 notifyError(false /* send */, err, "Recv failed.");
518 mSawReceiveFailure = true;
519 }
520
521 return err;
522 }
523
dumpFragmentStats(const Fragment &)524 void ANetworkSession::Session::dumpFragmentStats(const Fragment & /* frag */) {
525 #if 0
526 int64_t nowUs = ALooper::GetNowUs();
527 int64_t delayMs = (nowUs - frag.mTimeUs) / 1000ll;
528
529 static const int64_t kMinDelayMs = 0;
530 static const int64_t kMaxDelayMs = 300;
531
532 const char *kPattern = "########################################";
533 size_t kPatternSize = strlen(kPattern);
534
535 int n = (kPatternSize * (delayMs - kMinDelayMs))
536 / (kMaxDelayMs - kMinDelayMs);
537
538 if (n < 0) {
539 n = 0;
540 } else if ((size_t)n > kPatternSize) {
541 n = kPatternSize;
542 }
543
544 ALOGI("[%lld]: (%4lld ms) %s\n",
545 frag.mTimeUs / 1000,
546 delayMs,
547 kPattern + kPatternSize - n);
548 #endif
549 }
550
writeMore()551 status_t ANetworkSession::Session::writeMore() {
552 if (mState == DATAGRAM) {
553 CHECK(!mOutFragments.empty());
554
555 status_t err;
556 do {
557 const Fragment &frag = *mOutFragments.begin();
558 const sp<ABuffer> &datagram = frag.mBuffer;
559
560 int n;
561 do {
562 n = send(mSocket, datagram->data(), datagram->size(), 0);
563 } while (n < 0 && errno == EINTR);
564
565 err = OK;
566
567 if (n > 0) {
568 if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
569 dumpFragmentStats(frag);
570 }
571
572 mOutFragments.erase(mOutFragments.begin());
573 } else if (n < 0) {
574 err = -errno;
575 } else if (n == 0) {
576 err = -ECONNRESET;
577 }
578 } while (err == OK && !mOutFragments.empty());
579
580 if (err == -EAGAIN) {
581 if (!mOutFragments.empty()) {
582 ALOGI("%zu datagrams remain queued.", mOutFragments.size());
583 }
584 err = OK;
585 }
586
587 if (err != OK) {
588 if (!mUDPRetries) {
589 notifyError(true /* send */, err, "Send datagram failed.");
590 mSawSendFailure = true;
591 } else {
592 mUDPRetries--;
593 ALOGE("Send datagram failed, %d/%d retries left",
594 mUDPRetries, kMaxUDPRetries);
595 err = OK;
596 }
597 } else {
598 mUDPRetries = kMaxUDPRetries;
599 }
600
601 return err;
602 }
603
604 if (mState == CONNECTING) {
605 int err;
606 socklen_t optionLen = sizeof(err);
607 CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0);
608 CHECK_EQ(optionLen, (socklen_t)sizeof(err));
609
610 if (err != 0) {
611 notifyError(kWhatError, -err, "Connection failed");
612 mSawSendFailure = true;
613
614 return -err;
615 }
616
617 mState = CONNECTED;
618 notify(kWhatConnected);
619
620 return OK;
621 }
622
623 CHECK_EQ(mState, CONNECTED);
624 CHECK(!mOutFragments.empty());
625
626 ssize_t n = -1;
627 while (!mOutFragments.empty()) {
628 const Fragment &frag = *mOutFragments.begin();
629
630 do {
631 n = send(mSocket, frag.mBuffer->data(), frag.mBuffer->size(), 0);
632 } while (n < 0 && errno == EINTR);
633
634 if (n <= 0) {
635 break;
636 }
637
638 frag.mBuffer->setRange(
639 frag.mBuffer->offset() + n, frag.mBuffer->size() - n);
640
641 if (frag.mBuffer->size() > 0) {
642 break;
643 }
644
645 if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
646 dumpFragmentStats(frag);
647 }
648
649 mOutFragments.erase(mOutFragments.begin());
650 }
651
652 status_t err = OK;
653
654 if (n < 0) {
655 err = -errno;
656 } else if (n == 0) {
657 err = -ECONNRESET;
658 }
659
660 if (err != OK) {
661 notifyError(true /* send */, err, "Send failed.");
662 mSawSendFailure = true;
663 }
664
665 #if 0
666 int numBytesQueued;
667 int res = ioctl(mSocket, SIOCOUTQ, &numBytesQueued);
668 if (res == 0 && numBytesQueued > 50 * 1024) {
669 if (numBytesQueued > 409600) {
670 ALOGW("!!! numBytesQueued = %d", numBytesQueued);
671 }
672
673 int64_t nowUs = ALooper::GetNowUs();
674
675 if (mLastStallReportUs < 0ll
676 || nowUs > mLastStallReportUs + 100000ll) {
677 sp<AMessage> msg = mNotify->dup();
678 msg->setInt32("sessionID", mSessionID);
679 msg->setInt32("reason", kWhatNetworkStall);
680 msg->setSize("numBytesQueued", numBytesQueued);
681 msg->post();
682
683 mLastStallReportUs = nowUs;
684 }
685 }
686 #endif
687
688 return err;
689 }
690
sendRequest(const void * data,ssize_t size,bool timeValid,int64_t timeUs)691 status_t ANetworkSession::Session::sendRequest(
692 const void *data, ssize_t size, bool timeValid, int64_t timeUs) {
693 CHECK(mState == CONNECTED || mState == DATAGRAM);
694
695 if (size < 0) {
696 size = strlen((const char *)data);
697 }
698
699 if (size == 0) {
700 return OK;
701 }
702
703 sp<ABuffer> buffer;
704
705 if (mState == CONNECTED && mMode == MODE_DATAGRAM) {
706 CHECK_LE(size, 65535);
707
708 buffer = new ABuffer(size + 2);
709 buffer->data()[0] = size >> 8;
710 buffer->data()[1] = size & 0xff;
711 memcpy(buffer->data() + 2, data, size);
712 } else if (mState == CONNECTED && mMode == MODE_WEBSOCKET) {
713 static const bool kUseMask = false; // Chromium doesn't like it.
714
715 size_t numHeaderBytes = 2 + (kUseMask ? 4 : 0);
716 if (size > 65535) {
717 numHeaderBytes += 8;
718 } else if (size > 125) {
719 numHeaderBytes += 2;
720 }
721
722 buffer = new ABuffer(numHeaderBytes + size);
723 buffer->data()[0] = 0x81; // FIN==1 | opcode=1 (text)
724 buffer->data()[1] = kUseMask ? 0x80 : 0x00;
725
726 if (size > 65535) {
727 buffer->data()[1] |= 127;
728 buffer->data()[2] = 0x00;
729 buffer->data()[3] = 0x00;
730 buffer->data()[4] = 0x00;
731 buffer->data()[5] = 0x00;
732 buffer->data()[6] = (size >> 24) & 0xff;
733 buffer->data()[7] = (size >> 16) & 0xff;
734 buffer->data()[8] = (size >> 8) & 0xff;
735 buffer->data()[9] = size & 0xff;
736 } else if (size > 125) {
737 buffer->data()[1] |= 126;
738 buffer->data()[2] = (size >> 8) & 0xff;
739 buffer->data()[3] = size & 0xff;
740 } else {
741 buffer->data()[1] |= size;
742 }
743
744 if (kUseMask) {
745 uint32_t mask = rand();
746
747 buffer->data()[numHeaderBytes - 4] = (mask >> 24) & 0xff;
748 buffer->data()[numHeaderBytes - 3] = (mask >> 16) & 0xff;
749 buffer->data()[numHeaderBytes - 2] = (mask >> 8) & 0xff;
750 buffer->data()[numHeaderBytes - 1] = mask & 0xff;
751
752 for (size_t i = 0; i < (size_t)size; ++i) {
753 buffer->data()[numHeaderBytes + i] =
754 ((const uint8_t *)data)[i]
755 ^ ((mask >> (8 * (3 - (i % 4)))) & 0xff);
756 }
757 } else {
758 memcpy(buffer->data() + numHeaderBytes, data, size);
759 }
760 } else {
761 buffer = new ABuffer(size);
762 memcpy(buffer->data(), data, size);
763 }
764
765 Fragment frag;
766
767 frag.mFlags = 0;
768 if (timeValid) {
769 frag.mFlags = FRAGMENT_FLAG_TIME_VALID;
770 frag.mTimeUs = timeUs;
771 }
772
773 frag.mBuffer = buffer;
774
775 mOutFragments.push_back(frag);
776
777 return OK;
778 }
779
notifyError(bool send,status_t err,const char * detail)780 void ANetworkSession::Session::notifyError(
781 bool send, status_t err, const char *detail) {
782 sp<AMessage> msg = mNotify->dup();
783 msg->setInt32("sessionID", mSessionID);
784 msg->setInt32("reason", kWhatError);
785 msg->setInt32("send", send);
786 msg->setInt32("err", err);
787 msg->setString("detail", detail);
788 msg->post();
789 }
790
notify(NotificationReason reason)791 void ANetworkSession::Session::notify(NotificationReason reason) {
792 sp<AMessage> msg = mNotify->dup();
793 msg->setInt32("sessionID", mSessionID);
794 msg->setInt32("reason", reason);
795 msg->post();
796 }
797
798 ////////////////////////////////////////////////////////////////////////////////
799
ANetworkSession()800 ANetworkSession::ANetworkSession()
801 : mNextSessionID(1) {
802 mPipeFd[0] = mPipeFd[1] = -1;
803 }
804
~ANetworkSession()805 ANetworkSession::~ANetworkSession() {
806 stop();
807 }
808
start()809 status_t ANetworkSession::start() {
810 if (mThread != NULL) {
811 return INVALID_OPERATION;
812 }
813
814 int res = pipe(mPipeFd);
815 if (res != 0) {
816 mPipeFd[0] = mPipeFd[1] = -1;
817 return -errno;
818 }
819
820 mThread = new NetworkThread(this);
821
822 status_t err = mThread->run("ANetworkSession", ANDROID_PRIORITY_AUDIO);
823
824 if (err != OK) {
825 mThread.clear();
826
827 close(mPipeFd[0]);
828 close(mPipeFd[1]);
829 mPipeFd[0] = mPipeFd[1] = -1;
830
831 return err;
832 }
833
834 return OK;
835 }
836
stop()837 status_t ANetworkSession::stop() {
838 if (mThread == NULL) {
839 return INVALID_OPERATION;
840 }
841
842 mThread->requestExit();
843 interrupt();
844 mThread->requestExitAndWait();
845
846 mThread.clear();
847
848 close(mPipeFd[0]);
849 close(mPipeFd[1]);
850 mPipeFd[0] = mPipeFd[1] = -1;
851
852 return OK;
853 }
854
createRTSPClient(const char * host,unsigned port,const sp<AMessage> & notify,int32_t * sessionID)855 status_t ANetworkSession::createRTSPClient(
856 const char *host, unsigned port, const sp<AMessage> ¬ify,
857 int32_t *sessionID) {
858 return createClientOrServer(
859 kModeCreateRTSPClient,
860 NULL /* addr */,
861 0 /* port */,
862 host,
863 port,
864 notify,
865 sessionID);
866 }
867
createRTSPServer(const struct in_addr & addr,unsigned port,const sp<AMessage> & notify,int32_t * sessionID)868 status_t ANetworkSession::createRTSPServer(
869 const struct in_addr &addr, unsigned port,
870 const sp<AMessage> ¬ify, int32_t *sessionID) {
871 return createClientOrServer(
872 kModeCreateRTSPServer,
873 &addr,
874 port,
875 NULL /* remoteHost */,
876 0 /* remotePort */,
877 notify,
878 sessionID);
879 }
880
createUDPSession(unsigned localPort,const sp<AMessage> & notify,int32_t * sessionID)881 status_t ANetworkSession::createUDPSession(
882 unsigned localPort, const sp<AMessage> ¬ify, int32_t *sessionID) {
883 return createUDPSession(localPort, NULL, 0, notify, sessionID);
884 }
885
createUDPSession(unsigned localPort,const char * remoteHost,unsigned remotePort,const sp<AMessage> & notify,int32_t * sessionID)886 status_t ANetworkSession::createUDPSession(
887 unsigned localPort,
888 const char *remoteHost,
889 unsigned remotePort,
890 const sp<AMessage> ¬ify,
891 int32_t *sessionID) {
892 return createClientOrServer(
893 kModeCreateUDPSession,
894 NULL /* addr */,
895 localPort,
896 remoteHost,
897 remotePort,
898 notify,
899 sessionID);
900 }
901
createTCPDatagramSession(const struct in_addr & addr,unsigned port,const sp<AMessage> & notify,int32_t * sessionID)902 status_t ANetworkSession::createTCPDatagramSession(
903 const struct in_addr &addr, unsigned port,
904 const sp<AMessage> ¬ify, int32_t *sessionID) {
905 return createClientOrServer(
906 kModeCreateTCPDatagramSessionPassive,
907 &addr,
908 port,
909 NULL /* remoteHost */,
910 0 /* remotePort */,
911 notify,
912 sessionID);
913 }
914
createTCPDatagramSession(unsigned localPort,const char * remoteHost,unsigned remotePort,const sp<AMessage> & notify,int32_t * sessionID)915 status_t ANetworkSession::createTCPDatagramSession(
916 unsigned localPort,
917 const char *remoteHost,
918 unsigned remotePort,
919 const sp<AMessage> ¬ify,
920 int32_t *sessionID) {
921 return createClientOrServer(
922 kModeCreateTCPDatagramSessionActive,
923 NULL /* addr */,
924 localPort,
925 remoteHost,
926 remotePort,
927 notify,
928 sessionID);
929 }
930
destroySession(int32_t sessionID)931 status_t ANetworkSession::destroySession(int32_t sessionID) {
932 Mutex::Autolock autoLock(mLock);
933
934 ssize_t index = mSessions.indexOfKey(sessionID);
935
936 if (index < 0) {
937 return -ENOENT;
938 }
939
940 mSessions.removeItemsAt(index);
941
942 interrupt();
943
944 return OK;
945 }
946
947 // static
MakeSocketNonBlocking(int s)948 status_t ANetworkSession::MakeSocketNonBlocking(int s) {
949 int flags = fcntl(s, F_GETFL, 0);
950 if (flags < 0) {
951 flags = 0;
952 }
953
954 int res = fcntl(s, F_SETFL, flags | O_NONBLOCK);
955 if (res < 0) {
956 return -errno;
957 }
958
959 return OK;
960 }
961
createClientOrServer(Mode mode,const struct in_addr * localAddr,unsigned port,const char * remoteHost,unsigned remotePort,const sp<AMessage> & notify,int32_t * sessionID)962 status_t ANetworkSession::createClientOrServer(
963 Mode mode,
964 const struct in_addr *localAddr,
965 unsigned port,
966 const char *remoteHost,
967 unsigned remotePort,
968 const sp<AMessage> ¬ify,
969 int32_t *sessionID) {
970 Mutex::Autolock autoLock(mLock);
971
972 *sessionID = 0;
973 status_t err = OK;
974 int s, res;
975 sp<Session> session;
976
977 s = socket(
978 AF_INET,
979 (mode == kModeCreateUDPSession) ? SOCK_DGRAM : SOCK_STREAM,
980 0);
981
982 if (s < 0) {
983 err = -errno;
984 goto bail;
985 }
986
987 if (mode == kModeCreateRTSPServer
988 || mode == kModeCreateTCPDatagramSessionPassive) {
989 const int yes = 1;
990 res = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
991
992 if (res < 0) {
993 err = -errno;
994 goto bail2;
995 }
996 }
997
998 if (mode == kModeCreateUDPSession) {
999 int size = 256 * 1024;
1000
1001 res = setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size));
1002
1003 if (res < 0) {
1004 err = -errno;
1005 goto bail2;
1006 }
1007
1008 res = setsockopt(s, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size));
1009
1010 if (res < 0) {
1011 err = -errno;
1012 goto bail2;
1013 }
1014 } else if (mode == kModeCreateTCPDatagramSessionActive) {
1015 int flag = 1;
1016 res = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
1017
1018 if (res < 0) {
1019 err = -errno;
1020 goto bail2;
1021 }
1022
1023 int tos = 224; // VOICE
1024 res = setsockopt(s, IPPROTO_IP, IP_TOS, &tos, sizeof(tos));
1025
1026 if (res < 0) {
1027 err = -errno;
1028 goto bail2;
1029 }
1030 }
1031
1032 err = MakeSocketNonBlocking(s);
1033
1034 if (err != OK) {
1035 goto bail2;
1036 }
1037
1038 struct sockaddr_in addr;
1039 memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
1040 addr.sin_family = AF_INET;
1041
1042 if (mode == kModeCreateRTSPClient
1043 || mode == kModeCreateTCPDatagramSessionActive) {
1044 struct hostent *ent= gethostbyname(remoteHost);
1045 if (ent == NULL) {
1046 err = -h_errno;
1047 goto bail2;
1048 }
1049
1050 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
1051 addr.sin_port = htons(remotePort);
1052 } else if (localAddr != NULL) {
1053 addr.sin_addr = *localAddr;
1054 addr.sin_port = htons(port);
1055 } else {
1056 addr.sin_addr.s_addr = htonl(INADDR_ANY);
1057 addr.sin_port = htons(port);
1058 }
1059
1060 if (mode == kModeCreateRTSPClient
1061 || mode == kModeCreateTCPDatagramSessionActive) {
1062 in_addr_t x = ntohl(addr.sin_addr.s_addr);
1063 ALOGI("connecting socket %d to %d.%d.%d.%d:%d",
1064 s,
1065 (x >> 24),
1066 (x >> 16) & 0xff,
1067 (x >> 8) & 0xff,
1068 x & 0xff,
1069 ntohs(addr.sin_port));
1070
1071 res = connect(s, (const struct sockaddr *)&addr, sizeof(addr));
1072
1073 CHECK_LT(res, 0);
1074 if (errno == EINPROGRESS) {
1075 res = 0;
1076 }
1077 } else {
1078 res = bind(s, (const struct sockaddr *)&addr, sizeof(addr));
1079
1080 if (res == 0) {
1081 if (mode == kModeCreateRTSPServer
1082 || mode == kModeCreateTCPDatagramSessionPassive) {
1083 res = listen(s, 4);
1084 } else {
1085 CHECK_EQ(mode, kModeCreateUDPSession);
1086
1087 if (remoteHost != NULL) {
1088 struct sockaddr_in remoteAddr;
1089 memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
1090 remoteAddr.sin_family = AF_INET;
1091 remoteAddr.sin_port = htons(remotePort);
1092
1093 struct hostent *ent= gethostbyname(remoteHost);
1094 if (ent == NULL) {
1095 err = -h_errno;
1096 goto bail2;
1097 }
1098
1099 remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
1100
1101 res = connect(
1102 s,
1103 (const struct sockaddr *)&remoteAddr,
1104 sizeof(remoteAddr));
1105 }
1106 }
1107 }
1108 }
1109
1110 if (res < 0) {
1111 err = -errno;
1112 goto bail2;
1113 }
1114
1115 Session::State state;
1116 switch (mode) {
1117 case kModeCreateRTSPClient:
1118 state = Session::CONNECTING;
1119 break;
1120
1121 case kModeCreateTCPDatagramSessionActive:
1122 state = Session::CONNECTING;
1123 break;
1124
1125 case kModeCreateTCPDatagramSessionPassive:
1126 state = Session::LISTENING_TCP_DGRAMS;
1127 break;
1128
1129 case kModeCreateRTSPServer:
1130 state = Session::LISTENING_RTSP;
1131 break;
1132
1133 default:
1134 CHECK_EQ(mode, kModeCreateUDPSession);
1135 state = Session::DATAGRAM;
1136 break;
1137 }
1138
1139 session = new Session(
1140 mNextSessionID++,
1141 state,
1142 s,
1143 notify);
1144
1145 if (mode == kModeCreateTCPDatagramSessionActive) {
1146 session->setMode(Session::MODE_DATAGRAM);
1147 } else if (mode == kModeCreateRTSPClient) {
1148 session->setMode(Session::MODE_RTSP);
1149 }
1150
1151 mSessions.add(session->sessionID(), session);
1152
1153 interrupt();
1154
1155 *sessionID = session->sessionID();
1156
1157 goto bail;
1158
1159 bail2:
1160 close(s);
1161 s = -1;
1162
1163 bail:
1164 return err;
1165 }
1166
connectUDPSession(int32_t sessionID,const char * remoteHost,unsigned remotePort)1167 status_t ANetworkSession::connectUDPSession(
1168 int32_t sessionID, const char *remoteHost, unsigned remotePort) {
1169 Mutex::Autolock autoLock(mLock);
1170
1171 ssize_t index = mSessions.indexOfKey(sessionID);
1172
1173 if (index < 0) {
1174 return -ENOENT;
1175 }
1176
1177 const sp<Session> session = mSessions.valueAt(index);
1178 int s = session->socket();
1179
1180 struct sockaddr_in remoteAddr;
1181 memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
1182 remoteAddr.sin_family = AF_INET;
1183 remoteAddr.sin_port = htons(remotePort);
1184
1185 status_t err = OK;
1186 struct hostent *ent = gethostbyname(remoteHost);
1187 if (ent == NULL) {
1188 err = -h_errno;
1189 } else {
1190 remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
1191
1192 int res = connect(
1193 s,
1194 (const struct sockaddr *)&remoteAddr,
1195 sizeof(remoteAddr));
1196
1197 if (res < 0) {
1198 err = -errno;
1199 }
1200 }
1201
1202 return err;
1203 }
1204
sendRequest(int32_t sessionID,const void * data,ssize_t size,bool timeValid,int64_t timeUs)1205 status_t ANetworkSession::sendRequest(
1206 int32_t sessionID, const void *data, ssize_t size,
1207 bool timeValid, int64_t timeUs) {
1208 Mutex::Autolock autoLock(mLock);
1209
1210 ssize_t index = mSessions.indexOfKey(sessionID);
1211
1212 if (index < 0) {
1213 return -ENOENT;
1214 }
1215
1216 const sp<Session> session = mSessions.valueAt(index);
1217
1218 status_t err = session->sendRequest(data, size, timeValid, timeUs);
1219
1220 interrupt();
1221
1222 return err;
1223 }
1224
switchToWebSocketMode(int32_t sessionID)1225 status_t ANetworkSession::switchToWebSocketMode(int32_t sessionID) {
1226 Mutex::Autolock autoLock(mLock);
1227
1228 ssize_t index = mSessions.indexOfKey(sessionID);
1229
1230 if (index < 0) {
1231 return -ENOENT;
1232 }
1233
1234 const sp<Session> session = mSessions.valueAt(index);
1235 return session->switchToWebSocketMode();
1236 }
1237
interrupt()1238 void ANetworkSession::interrupt() {
1239 static const char dummy = 0;
1240
1241 ssize_t n;
1242 do {
1243 n = write(mPipeFd[1], &dummy, 1);
1244 } while (n < 0 && errno == EINTR);
1245
1246 if (n < 0) {
1247 ALOGW("Error writing to pipe (%s)", strerror(errno));
1248 }
1249 }
1250
threadLoop()1251 void ANetworkSession::threadLoop() {
1252 fd_set rs, ws;
1253 FD_ZERO(&rs);
1254 FD_ZERO(&ws);
1255
1256 FD_SET(mPipeFd[0], &rs);
1257 int maxFd = mPipeFd[0];
1258
1259 {
1260 Mutex::Autolock autoLock(mLock);
1261
1262 for (size_t i = 0; i < mSessions.size(); ++i) {
1263 const sp<Session> &session = mSessions.valueAt(i);
1264
1265 int s = session->socket();
1266
1267 if (s < 0) {
1268 continue;
1269 }
1270
1271 if (session->wantsToRead()) {
1272 FD_SET(s, &rs);
1273 if (s > maxFd) {
1274 maxFd = s;
1275 }
1276 }
1277
1278 if (session->wantsToWrite()) {
1279 FD_SET(s, &ws);
1280 if (s > maxFd) {
1281 maxFd = s;
1282 }
1283 }
1284 }
1285 }
1286
1287 int res = select(maxFd + 1, &rs, &ws, NULL, NULL /* tv */);
1288
1289 if (res == 0) {
1290 return;
1291 }
1292
1293 if (res < 0) {
1294 if (errno == EINTR) {
1295 return;
1296 }
1297
1298 ALOGE("select failed w/ error %d (%s)", errno, strerror(errno));
1299 return;
1300 }
1301
1302 if (FD_ISSET(mPipeFd[0], &rs)) {
1303 char c;
1304 ssize_t n;
1305 do {
1306 n = read(mPipeFd[0], &c, 1);
1307 } while (n < 0 && errno == EINTR);
1308
1309 if (n < 0) {
1310 ALOGW("Error reading from pipe (%s)", strerror(errno));
1311 }
1312
1313 --res;
1314 }
1315
1316 {
1317 Mutex::Autolock autoLock(mLock);
1318
1319 List<sp<Session> > sessionsToAdd;
1320
1321 for (size_t i = mSessions.size(); res > 0 && i > 0;) {
1322 i--;
1323 const sp<Session> &session = mSessions.valueAt(i);
1324
1325 int s = session->socket();
1326
1327 if (s < 0) {
1328 continue;
1329 }
1330
1331 if (FD_ISSET(s, &rs) || FD_ISSET(s, &ws)) {
1332 --res;
1333 }
1334
1335 if (FD_ISSET(s, &rs)) {
1336 if (session->isRTSPServer() || session->isTCPDatagramServer()) {
1337 struct sockaddr_in remoteAddr;
1338 socklen_t remoteAddrLen = sizeof(remoteAddr);
1339
1340 int clientSocket = accept(
1341 s, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
1342
1343 if (clientSocket >= 0) {
1344 status_t err = MakeSocketNonBlocking(clientSocket);
1345
1346 if (err != OK) {
1347 ALOGE("Unable to make client socket non blocking, "
1348 "failed w/ error %d (%s)",
1349 err, strerror(-err));
1350
1351 close(clientSocket);
1352 clientSocket = -1;
1353 } else {
1354 in_addr_t addr = ntohl(remoteAddr.sin_addr.s_addr);
1355
1356 ALOGI("incoming connection from %d.%d.%d.%d:%d "
1357 "(socket %d)",
1358 (addr >> 24),
1359 (addr >> 16) & 0xff,
1360 (addr >> 8) & 0xff,
1361 addr & 0xff,
1362 ntohs(remoteAddr.sin_port),
1363 clientSocket);
1364
1365 sp<Session> clientSession =
1366 new Session(
1367 mNextSessionID++,
1368 Session::CONNECTED,
1369 clientSocket,
1370 session->getNotificationMessage());
1371
1372 clientSession->setMode(
1373 session->isRTSPServer()
1374 ? Session::MODE_RTSP
1375 : Session::MODE_DATAGRAM);
1376
1377 sessionsToAdd.push_back(clientSession);
1378 }
1379 } else {
1380 ALOGE("accept returned error %d (%s)",
1381 errno, strerror(errno));
1382 }
1383 } else {
1384 status_t err = session->readMore();
1385 if (err != OK) {
1386 ALOGE("readMore on socket %d failed w/ error %d (%s)",
1387 s, err, strerror(-err));
1388 }
1389 }
1390 }
1391
1392 if (FD_ISSET(s, &ws)) {
1393 status_t err = session->writeMore();
1394 if (err != OK) {
1395 ALOGE("writeMore on socket %d failed w/ error %d (%s)",
1396 s, err, strerror(-err));
1397 }
1398 }
1399 }
1400
1401 while (!sessionsToAdd.empty()) {
1402 sp<Session> session = *sessionsToAdd.begin();
1403 sessionsToAdd.erase(sessionsToAdd.begin());
1404
1405 mSessions.add(session->sessionID(), session);
1406
1407 ALOGI("added clientSession %d", session->sessionID());
1408 }
1409 }
1410 }
1411
1412 } // namespace android
1413