1 /* 2 * Copyright (C) 2015 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #include "fec_private.h" 18 19 struct process_info { 20 int id; 21 fec_handle *f; 22 uint8_t *buf; 23 size_t count; 24 uint64_t offset; 25 read_func func; 26 ssize_t rc; 27 size_t errors; 28 }; 29 30 /* thread function */ 31 static void * __process(void *cookie) 32 { 33 process_info *p = static_cast<process_info *>(cookie); 34 35 debug("thread %d: [%" PRIu64 ", %" PRIu64 ")", p->id, p->offset, 36 p->offset + p->count); 37 38 p->rc = p->func(p->f, p->buf, p->count, p->offset, &p->errors); 39 return p; 40 } 41 42 /* launches a maximum number of threads to process a read */ 43 ssize_t process(fec_handle *f, uint8_t *buf, size_t count, uint64_t offset, 44 read_func func) 45 { 46 check(f); 47 check(buf) 48 check(func); 49 50 if (count == 0) { 51 return 0; 52 } 53 54 int threads = sysconf(_SC_NPROCESSORS_ONLN); 55 56 if (threads < WORK_MIN_THREADS) { 57 threads = WORK_MIN_THREADS; 58 } else if (threads > WORK_MAX_THREADS) { 59 threads = WORK_MAX_THREADS; 60 } 61 62 uint64_t start = (offset / FEC_BLOCKSIZE) * FEC_BLOCKSIZE; 63 size_t blocks = fec_div_round_up(count, FEC_BLOCKSIZE); 64 65 size_t count_per_thread = fec_div_round_up(blocks, threads) * FEC_BLOCKSIZE; 66 size_t max_threads = fec_div_round_up(count, count_per_thread); 67 68 if ((size_t)threads > max_threads) { 69 threads = (int)max_threads; 70 } 71 72 size_t left = count; 73 uint64_t pos = offset; 74 uint64_t end = start + count_per_thread; 75 76 debug("%d threads, %zu bytes per thread (total %zu)", threads, 77 count_per_thread, count); 78 79 std::vector<pthread_t> handles; 80 process_info info[threads]; 81 ssize_t rc = 0; 82 83 /* start threads to process queue */ 84 for (int i = 0; i < threads; ++i) { 85 check(left > 0); 86 87 info[i].id = i; 88 info[i].f = f; 89 info[i].buf = &buf[pos - offset]; 90 info[i].count = (size_t)(end - pos); 91 info[i].offset = pos; 92 info[i].func = func; 93 info[i].rc = -1; 94 info[i].errors = 0; 95 96 if (info[i].count > left) { 97 info[i].count = left; 98 } 99 100 pthread_t thread; 101 102 if (pthread_create(&thread, NULL, __process, &info[i]) != 0) { 103 error("failed to create thread: %s", strerror(errno)); 104 rc = -1; 105 } else { 106 handles.push_back(thread); 107 } 108 109 pos = end; 110 end += count_per_thread; 111 left -= info[i].count; 112 } 113 114 check(left == 0); 115 116 ssize_t nread = 0; 117 118 /* wait for all threads to complete */ 119 for (auto thread : handles) { 120 process_info *p = NULL; 121 122 if (pthread_join(thread, (void **)&p) != 0) { 123 error("failed to join thread: %s", strerror(errno)); 124 rc = -1; 125 } else if (!p || p->rc == -1) { 126 rc = -1; 127 } else { 128 nread += p->rc; 129 f->errors += p->errors; 130 } 131 } 132 133 if (rc == -1) { 134 errno = EIO; 135 return -1; 136 } 137 138 return nread; 139 } 140