1 /*
2  * Copyright (C) 2015 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "fec_private.h"
18 
/* work item handed to one worker thread: describes a single slice of a read
   request and carries the slice's result back to process() */
struct process_info {
    int id;             /* index of the thread within the request (debug output) */
    fec_handle *f;      /* handle forwarded unchanged to func */
    uint8_t *buf;       /* caller's buffer advanced to this slice (&buf[pos - offset]) */
    size_t count;       /* number of bytes in this slice */
    uint64_t offset;    /* offset at which this slice starts */
    read_func func;     /* function invoked by the thread to process the slice */
    ssize_t rc;         /* value returned by func; process() treats -1 as failure */
    size_t errors;      /* counter passed to func by pointer, later added to f->errors */
};
29 
30 /* thread function  */
31 static void * __process(void *cookie)
32 {
33     process_info *p = static_cast<process_info *>(cookie);
34 
35     debug("thread %d: [%" PRIu64 ", %" PRIu64 ")", p->id, p->offset,
36         p->offset + p->count);
37 
38     p->rc = p->func(p->f, p->buf, p->count, p->offset, &p->errors);
39     return p;
40 }
41 
42 /* launches a maximum number of threads to process a read */
43 ssize_t process(fec_handle *f, uint8_t *buf, size_t count, uint64_t offset,
44         read_func func)
45 {
46     check(f);
47     check(buf)
48     check(func);
49 
50     if (count == 0) {
51         return 0;
52     }
53 
54     int threads = sysconf(_SC_NPROCESSORS_ONLN);
55 
56     if (threads < WORK_MIN_THREADS) {
57         threads = WORK_MIN_THREADS;
58     } else if (threads > WORK_MAX_THREADS) {
59         threads = WORK_MAX_THREADS;
60     }
61 
62     uint64_t start = (offset / FEC_BLOCKSIZE) * FEC_BLOCKSIZE;
63     size_t blocks = fec_div_round_up(count, FEC_BLOCKSIZE);
64 
65     size_t count_per_thread = fec_div_round_up(blocks, threads) * FEC_BLOCKSIZE;
66     size_t max_threads = fec_div_round_up(count, count_per_thread);
67 
68     if ((size_t)threads > max_threads) {
69         threads = (int)max_threads;
70     }
71 
72     size_t left = count;
73     uint64_t pos = offset;
74     uint64_t end = start + count_per_thread;
75 
76     debug("%d threads, %zu bytes per thread (total %zu)", threads,
77         count_per_thread, count);
78 
79     std::vector<pthread_t> handles;
80     process_info info[threads];
81     ssize_t rc = 0;
82 
83     /* start threads to process queue */
84     for (int i = 0; i < threads; ++i) {
85         check(left > 0);
86 
87         info[i].id = i;
88         info[i].f = f;
89         info[i].buf = &buf[pos - offset];
90         info[i].count = (size_t)(end - pos);
91         info[i].offset = pos;
92         info[i].func = func;
93         info[i].rc = -1;
94         info[i].errors = 0;
95 
96         if (info[i].count > left) {
97             info[i].count = left;
98         }
99 
100         pthread_t thread;
101 
102         if (pthread_create(&thread, NULL, __process, &info[i]) != 0) {
103             error("failed to create thread: %s", strerror(errno));
104             rc = -1;
105         } else {
106             handles.push_back(thread);
107         }
108 
109         pos = end;
110         end  += count_per_thread;
111         left -= info[i].count;
112     }
113 
114     check(left == 0);
115 
116     ssize_t nread = 0;
117 
118     /* wait for all threads to complete */
119     for (auto thread : handles) {
120         process_info *p = NULL;
121 
122         if (pthread_join(thread, (void **)&p) != 0) {
123             error("failed to join thread: %s", strerror(errno));
124             rc = -1;
125         } else if (!p || p->rc == -1) {
126             rc = -1;
127         } else {
128             nread += p->rc;
129             f->errors += p->errors;
130         }
131     }
132 
133     if (rc == -1) {
134         errno = EIO;
135         return -1;
136     }
137 
138     return nread;
139 }
140