1 /*
2  * Copyright (C) 2012 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5  * use this file except in compliance with the License. You may obtain a copy of
6  * the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations under
14  * the License.
15  */
16 
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <errno.h>
#include <signal.h>
21 
22 #include <utils/StrongPointer.h>
23 
24 #include "Log.h"
25 #include "audio/Buffer.h"
26 #include "StringUtil.h"
27 #include "SimpleScriptExec.h"
28 #include "SignalProcessingImpl.h"
29 #include "task/TaskCase.h"
30 
/**
 * 32-bit type tags used on the socket between this class and the python script.
 * Each item on the wire starts with one of these tags.
 */
enum ToPythonCommandType {
    /** First word of a run() request; followed by the item count. */
    EHeader          = 0x00,
    /** Sent by the destructor before the socket is released. */
    ETerminate       = 0x01,
    /** Function name item: length, then the name string. */
    EFunctionName    = 0x02,
    /** Mono audio buffer item: byte length, then the data. */
    EAudioMono       = 0x04,
    /** Stereo audio buffer item: byte length, then the data. */
    EAudioStereo     = 0x05,
    /** 64-bit integer value item. */
    EValue64Int      = 0x08,
    /** Double value item. */
    EValueDouble     = 0x09,
    /** Tag expected in the reply header in front of the execution result. */
    EExecutionResult = 0x10
};
41 
42 const android::String8 \
43     SignalProcessingImpl::MAIN_PROCESSING_SCRIPT("test_description/processing_main.py");
44 
SignalProcessingImpl()45 SignalProcessingImpl::SignalProcessingImpl()
46     : mChildRunning(false),
47       mBuffer(1024)
48 {
49 
50 }
51 
~SignalProcessingImpl()52 SignalProcessingImpl::~SignalProcessingImpl()
53 {
54     if (mSocket.get() != NULL) {
55         int terminationCommand [] = {ETerminate, 0};
56         send((char*)terminationCommand, sizeof(terminationCommand));
57         mSocket->release();
58     }
59     if (mChildRunning) {
60         waitpid(mChildPid, NULL, 0);
61     }
62 }
63 
// Error logging used in the forked child: prints the formatted message to stderr,
// followed by the source file and line of the call site.
#define CHILD_LOGE(...) \
    do { \
        fprintf(stderr, __VA_ARGS__); \
        fprintf(stderr, " %s - %d\n", __FILE__, __LINE__); \
    } while (0)

// Delay before each connection attempt in init(), in microseconds (100 ms).
const int CHILD_WAIT_TIME_US = 100 * 1000;
68 
init(const android::String8 & script)69 bool SignalProcessingImpl::init(const android::String8& script)
70 {
71     pid_t pid;
72     if ((pid = fork()) < 0) {
73         LOGE("SignalProcessingImpl::init fork failed %d", errno);
74         return false;
75     } else if (pid == 0) { // child
76         if (execl(SimpleScriptExec::PYTHON_PATH, SimpleScriptExec::PYTHON_PATH,
77                 script.string(), NULL) < 0) {
78             CHILD_LOGE("execl %s %s failed %d", SimpleScriptExec::PYTHON_PATH,
79                     script.string(), errno);
80             exit(EXIT_FAILURE);
81         }
82     } else { // parent
83         mChildPid = pid;
84         mChildRunning = true;
85         int result = false;
86         int retryCount = 0;
87         // not that clean, but it takes some time for python side to have socket ready
88         const int MAX_RETRY = 20;
89         while (retryCount < MAX_RETRY) {
90             usleep(CHILD_WAIT_TIME_US);
91             mSocket.reset(new ClientSocket());
92             if (mSocket.get() == NULL) {
93                 result = false;
94                 break;
95             }
96             if (mSocket->init("127.0.0.1", SCRIPT_PORT, true)) {
97                 result = true;
98                 break;
99             }
100             retryCount++;
101         }
102         if (!result) {
103             LOGE("cannot connect to child");
104             mSocket.reset(NULL);
105             return result;
106         }
107     }
108     return true;
109 }
110 
111 
run(const android::String8 & functionScript,int nInputs,bool * inputTypes,void ** inputs,int nOutputs,bool * outputTypes,void ** outputs)112 TaskGeneric::ExecutionResult SignalProcessingImpl::run( const android::String8& functionScript,
113         int nInputs, bool* inputTypes, void** inputs,
114         int nOutputs, bool* outputTypes, void** outputs)
115 {
116     mBuffer.reset();
117     mBuffer.write <int32_t>((int32_t)EHeader);
118     mBuffer.write<int32_t>(nInputs + 1);
119     mBuffer.write<int32_t>((int32_t)EFunctionName);
120     mBuffer.write<int32_t>((int32_t)functionScript.length());
121     mBuffer.writeStr(functionScript);
122     if (!send(mBuffer.getBuffer(), mBuffer.getSizeWritten())) {
123         LOGE("send failed");
124         return TaskGeneric::EResultError;
125     }
126     for (int i = 0; i < nInputs; i++) {
127         mBuffer.reset();
128         if (inputTypes[i]) { // android::sp<Buffer>*
129             android::sp<Buffer>* buffer = reinterpret_cast<android::sp<Buffer>*>(inputs[i]);
130             mBuffer.write<int32_t>((int32_t)((*buffer)->isStereo() ? EAudioStereo : EAudioMono));
131             int dataLen = (*buffer)->getSize();
132             mBuffer.write<int32_t>(dataLen);
133             if (!send(mBuffer.getBuffer(), mBuffer.getSizeWritten())) {
134                 LOGE("send failed");
135                 return TaskGeneric::EResultError;
136             }
137             if (!send((*buffer)->getData(), dataLen)) {
138                 LOGE("send failed");
139                 return TaskGeneric::EResultError;
140             }
141             LOGD("%d-th param buffer %d, stereo:%d", i, dataLen, (*buffer)->isStereo());
142         } else { //TaskCase::Value*
143             TaskCase::Value* val = reinterpret_cast<TaskCase::Value*>(inputs[i]);
144             bool isI64 = (val->getType() == TaskCase::Value::ETypeI64);
145             mBuffer.write<int32_t>((int32_t)(isI64 ? EValue64Int : EValueDouble));
146             if (isI64) {
147                 mBuffer.write<int64_t>(val->getInt64());
148             } else  {
149                 mBuffer.write<double>(val->getDouble());
150             }
151             if (!send(mBuffer.getBuffer(), mBuffer.getSizeWritten())) {
152                 LOGE("send failed");
153                 return TaskGeneric::EResultError;
154             }
155             LOGD("%d-th param Value", i);
156         }
157     }
158     int32_t header[4]; // id 0 - no of types - id 0x10 - ExecutionResult
159     if (!read((char*)header, sizeof(header))) {
160         LOGE("read failed");
161         return TaskGeneric::EResultError;
162     }
163     if (header[0] != 0) {
164         LOGE("wrong data");
165         return TaskGeneric::EResultError;
166     }
167     if (header[2] != EExecutionResult) {
168         LOGE("wrong data");
169         return TaskGeneric::EResultError;
170     }
171     if (header[3] == TaskGeneric::EResultError) {
172         LOGE("script returned error %d", header[3]);
173         return (TaskGeneric::ExecutionResult)header[3];
174     }
175     if ((header[1] - 1) != nOutputs) {
176         LOGE("wrong data");
177         return TaskGeneric::EResultError;
178     }
179     for (int i = 0; i < nOutputs; i++) {
180         int32_t type;
181         if (!read((char*)&type, sizeof(type))) {
182             LOGE("read failed");
183             return TaskGeneric::EResultError;
184         }
185         if (outputTypes[i]) { // android::sp<Buffer>*
186             int32_t dataLen;
187             if (!read((char*)&dataLen, sizeof(dataLen))) {
188                 LOGE("read failed");
189                 return TaskGeneric::EResultError;
190             }
191             android::sp<Buffer>* buffer = reinterpret_cast<android::sp<Buffer>*>(outputs[i]);
192             if (buffer->get() == NULL) { // data not allocated, this can happen for unknown-length output
193                 *buffer = new Buffer(dataLen, dataLen, (type == EAudioStereo) ? true: false);
194                 if (buffer->get() == NULL) {
195                     LOGE("alloc failed");
196                     return TaskGeneric::EResultError;
197                 }
198             }
199             bool isStereo = (*buffer)->isStereo();
200 
201             if (((type == EAudioStereo) && isStereo) || ((type == EAudioMono) && !isStereo)) {
202                 // valid
203             } else {
204                 LOGE("%d-th output wrong type %d stereo: %d", i, type, isStereo);
205                 return TaskGeneric::EResultError;
206             }
207 
208             if (dataLen > (int)(*buffer)->getSize()) {
209                 LOGE("%d-th output data too long %d while buffer size %d", i, dataLen,
210                         (*buffer)->getSize());
211                 return TaskGeneric::EResultError;
212             }
213             if (!read((*buffer)->getData(), dataLen)) {
214                 LOGE("read failed");
215                 return TaskGeneric::EResultError;
216             }
217             LOGD("received buffer %x %x", ((*buffer)->getData())[0], ((*buffer)->getData())[1]);
218             (*buffer)->setHandled(dataLen);
219             (*buffer)->setSize(dataLen);
220         } else { //TaskCase::Value*
221             TaskCase::Value* val = reinterpret_cast<TaskCase::Value*>(outputs[i]);
222             if ((type == EValue64Int) || (type == EValueDouble)) {
223                 if (!read((char*)val->getPtr(), sizeof(int64_t))) {
224                     LOGE("read failed");
225                     return TaskGeneric::EResultError;
226                 }
227                 if (type == EValue64Int) {
228                     val->setType(TaskCase::Value::ETypeI64);
229                 } else {
230                     val->setType(TaskCase::Value::ETypeDouble);
231                 }
232             } else {
233                 LOGE("wrong type %d", type);
234                 return TaskGeneric::EResultError;
235             }
236         }
237     }
238     return (TaskGeneric::ExecutionResult)header[3];
239 }
240 
send(const char * data,int len)241 bool SignalProcessingImpl::send(const char* data, int len)
242 {
243     //LOGD("send %d", len);
244     return mSocket->sendData(data, len);
245 }
246 
read(char * data,int len)247 bool SignalProcessingImpl::read(char* data, int len)
248 {
249     const int READ_TIMEOUT_MS = 60000 * 2; // as some calculation like calc_delay takes almost 20 secs
250     //LOGD("read %d", len);
251     return mSocket->readData(data, len, READ_TIMEOUT_MS);
252 }
253 
254