1 /*
2  * Copyright (C) 2009 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "rsContext.h"
18 #include "rsThreadIO.h"
19 #include "rsgApiStructs.h"
20 
21 #include <unistd.h>
22 #include <sys/types.h>
23 #include <sys/socket.h>
24 
25 #include <fcntl.h>
26 #include <poll.h>
27 
28 
29 using namespace android;
30 using namespace android::renderscript;
31 
ThreadIO()32 ThreadIO::ThreadIO() {
33     mRunning = true;
34     mPureFifo = false;
35     mMaxInlineSize = 1024;
36 }
37 
~ThreadIO()38 ThreadIO::~ThreadIO() {
39 }
40 
init()41 void ThreadIO::init() {
42     mToClient.init();
43     mToCore.init();
44 }
45 
shutdown()46 void ThreadIO::shutdown() {
47     mRunning = false;
48     mToCore.shutdown();
49 }
50 
coreHeader(uint32_t cmdID,size_t dataLen)51 void * ThreadIO::coreHeader(uint32_t cmdID, size_t dataLen) {
52     //ALOGE("coreHeader %i %i", cmdID, dataLen);
53     CoreCmdHeader *hdr = (CoreCmdHeader *)&mSendBuffer[0];
54     hdr->bytes = dataLen;
55     hdr->cmdID = cmdID;
56     mSendLen = dataLen + sizeof(CoreCmdHeader);
57     //mToCoreSocket.writeAsync(&hdr, sizeof(hdr));
58     //ALOGE("coreHeader ret ");
59     return &mSendBuffer[sizeof(CoreCmdHeader)];
60 }
61 
coreCommit()62 void ThreadIO::coreCommit() {
63     mToCore.writeAsync(&mSendBuffer, mSendLen);
64 }
65 
clientShutdown()66 void ThreadIO::clientShutdown() {
67     mToClient.shutdown();
68 }
69 
coreWrite(const void * data,size_t len)70 void ThreadIO::coreWrite(const void *data, size_t len) {
71     //ALOGV("core write %p %i", data, (int)len);
72     mToCore.writeAsync(data, len, true);
73 }
74 
coreRead(void * data,size_t len)75 void ThreadIO::coreRead(void *data, size_t len) {
76     //ALOGV("core read %p %i", data, (int)len);
77     mToCore.read(data, len);
78 }
79 
coreSetReturn(const void * data,size_t dataLen)80 void ThreadIO::coreSetReturn(const void *data, size_t dataLen) {
81     uint32_t buf;
82     if (data == NULL) {
83         data = &buf;
84         dataLen = sizeof(buf);
85     }
86 
87     mToCore.readReturn(data, dataLen);
88 }
89 
coreGetReturn(void * data,size_t dataLen)90 void ThreadIO::coreGetReturn(void *data, size_t dataLen) {
91     uint32_t buf;
92     if (data == NULL) {
93         data = &buf;
94         dataLen = sizeof(buf);
95     }
96 
97     mToCore.writeWaitReturn(data, dataLen);
98 }
99 
setTimeoutCallback(void (* cb)(void *),void * dat,uint64_t timeout)100 void ThreadIO::setTimeoutCallback(void (*cb)(void *), void *dat, uint64_t timeout) {
101     //mToCore.setTimeoutCallback(cb, dat, timeout);
102 }
103 
playCoreCommands(Context * con,int waitFd)104 bool ThreadIO::playCoreCommands(Context *con, int waitFd) {
105     bool ret = false;
106     const bool isLocal = !isPureFifo();
107 
108     uint8_t buf[2 * 1024];
109     const CoreCmdHeader *cmd = (const CoreCmdHeader *)&buf[0];
110     const void * data = (const void *)&buf[sizeof(CoreCmdHeader)];
111 
112     struct pollfd p[2];
113     p[0].fd = mToCore.getReadFd();
114     p[0].events = POLLIN;
115     p[0].revents = 0;
116     p[1].fd = waitFd;
117     p[1].events = POLLIN;
118     p[1].revents = 0;
119     int pollCount = 1;
120     if (waitFd >= 0) {
121         pollCount = 2;
122     }
123 
124     if (con->props.mLogTimes) {
125         con->timerSet(Context::RS_TIMER_IDLE);
126     }
127 
128     int waitTime = -1;
129     while (mRunning) {
130         int pr = poll(p, pollCount, waitTime);
131         if (pr <= 0) {
132             break;
133         }
134 
135         if (p[0].revents) {
136             size_t r = 0;
137             if (isLocal) {
138                 r = mToCore.read(&buf[0], sizeof(CoreCmdHeader));
139                 mToCore.read(&buf[sizeof(CoreCmdHeader)], cmd->bytes);
140                 if (r != sizeof(CoreCmdHeader)) {
141                     // exception or timeout occurred.
142                     break;
143                 }
144             } else {
145                 r = mToCore.read((void *)&cmd->cmdID, sizeof(cmd->cmdID));
146             }
147 
148 
149             ret = true;
150             if (con->props.mLogTimes) {
151                 con->timerSet(Context::RS_TIMER_INTERNAL);
152             }
153             //ALOGV("playCoreCommands 3 %i %i", cmd->cmdID, cmd->bytes);
154 
155             if (cmd->cmdID >= (sizeof(gPlaybackFuncs) / sizeof(void *))) {
156                 rsAssert(cmd->cmdID < (sizeof(gPlaybackFuncs) / sizeof(void *)));
157                 ALOGE("playCoreCommands error con %p, cmd %i", con, cmd->cmdID);
158             }
159 
160             if (isLocal) {
161                 gPlaybackFuncs[cmd->cmdID](con, data, cmd->bytes);
162             } else {
163                 gPlaybackRemoteFuncs[cmd->cmdID](con, this);
164             }
165 
166             if (con->props.mLogTimes) {
167                 con->timerSet(Context::RS_TIMER_IDLE);
168             }
169 
170             if (waitFd < 0) {
171                 // If we don't have a secondary wait object we should stop blocking now
172                 // that at least one command has been processed.
173                 waitTime = 0;
174             }
175         }
176 
177         if (p[1].revents && !p[0].revents) {
178             // We want to finish processing fifo events before processing the vsync.
179             // Otherwise we can end up falling behind and having tremendous lag.
180             break;
181         }
182     }
183     return ret;
184 }
185 
getClientHeader(size_t * receiveLen,uint32_t * usrID)186 RsMessageToClientType ThreadIO::getClientHeader(size_t *receiveLen, uint32_t *usrID) {
187     //ALOGE("getClientHeader");
188     mToClient.read(&mLastClientHeader, sizeof(mLastClientHeader));
189 
190     receiveLen[0] = mLastClientHeader.bytes;
191     usrID[0] = mLastClientHeader.userID;
192     //ALOGE("getClientHeader %i %i %i", mLastClientHeader.cmdID, usrID[0], receiveLen[0]);
193     return (RsMessageToClientType)mLastClientHeader.cmdID;
194 }
195 
getClientPayload(void * data,size_t * receiveLen,uint32_t * usrID,size_t bufferLen)196 RsMessageToClientType ThreadIO::getClientPayload(void *data, size_t *receiveLen,
197                                 uint32_t *usrID, size_t bufferLen) {
198     //ALOGE("getClientPayload");
199     receiveLen[0] = mLastClientHeader.bytes;
200     usrID[0] = mLastClientHeader.userID;
201     if (bufferLen < mLastClientHeader.bytes) {
202         return RS_MESSAGE_TO_CLIENT_RESIZE;
203     }
204     if (receiveLen[0]) {
205         mToClient.read(data, receiveLen[0]);
206     }
207     //ALOGE("getClientPayload x");
208     return (RsMessageToClientType)mLastClientHeader.cmdID;
209 }
210 
sendToClient(RsMessageToClientType cmdID,uint32_t usrID,const void * data,size_t dataLen,bool waitForSpace)211 bool ThreadIO::sendToClient(RsMessageToClientType cmdID, uint32_t usrID, const void *data,
212                             size_t dataLen, bool waitForSpace) {
213 
214     //ALOGE("sendToClient %i %i %i", cmdID, usrID, (int)dataLen);
215     ClientCmdHeader hdr;
216     hdr.bytes = (uint32_t)dataLen;
217     hdr.cmdID = cmdID;
218     hdr.userID = usrID;
219 
220     mToClient.writeAsync(&hdr, sizeof(hdr));
221     if (dataLen) {
222         mToClient.writeAsync(data, dataLen);
223     }
224 
225     //ALOGE("sendToClient x");
226     return true;
227 }
228 
229