1 /*
2 * libiio - Library for interfacing industrial I/O (IIO) devices
3 *
4 * Copyright (C) 2014 Analog Devices, Inc.
5 * Author: Paul Cercueil <paul.cercueil@analog.com>
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * */
18
19 #include "ops.h"
20 #include "parser.h"
21 #include "thread-pool.h"
22 #include "../debug.h"
23 #include "../iio-private.h"
24
25 #include <errno.h>
26 #include <limits.h>
27 #include <pthread.h>
28 #include <poll.h>
29 #include <stdbool.h>
30 #include <string.h>
31 #include <sys/eventfd.h>
32 #include <time.h>
33 #include <fcntl.h>
34 #include <signal.h>
35
36 int yyparse(yyscan_t scanner);
37
38 struct DevEntry;
39
40 /* Corresponds to a thread reading from a device */
41 struct ThdEntry {
42 SLIST_ENTRY(ThdEntry) parser_list_entry;
43 SLIST_ENTRY(ThdEntry) dev_list_entry;
44 unsigned int nb, sample_size, samples_count;
45 ssize_t err;
46
47 int eventfd;
48
49 struct parser_pdata *pdata;
50 struct iio_device *dev;
51 struct DevEntry *entry;
52
53 uint32_t *mask;
54 bool active, is_writer, new_client, wait_for_open;
55 };
56
thd_entry_event_signal(struct ThdEntry * thd)57 static void thd_entry_event_signal(struct ThdEntry *thd)
58 {
59 uint64_t e = 1;
60 int ret;
61
62 do {
63 ret = write(thd->eventfd, &e, sizeof(e));
64 } while (ret == -1 && errno == EINTR);
65 }
66
thd_entry_event_wait(struct ThdEntry * thd,pthread_mutex_t * mutex,int fd_in)67 static int thd_entry_event_wait(struct ThdEntry *thd, pthread_mutex_t *mutex,
68 int fd_in)
69 {
70 struct pollfd pfd[3];
71 uint64_t e;
72 int ret;
73
74 pthread_mutex_unlock(mutex);
75
76 pfd[0].fd = thd->eventfd;
77 pfd[0].events = POLLIN;
78 pfd[1].fd = fd_in;
79 pfd[1].events = POLLRDHUP;
80 pfd[2].fd = thread_pool_get_poll_fd(thd->pdata->pool);
81 pfd[2].events = POLLIN;
82
83 do {
84 poll_nointr(pfd, 3);
85
86 if ((pfd[1].revents & POLLRDHUP) || (pfd[2].revents & POLLIN)) {
87 pthread_mutex_lock(mutex);
88 return -EPIPE;
89 }
90
91 do {
92 ret = read(thd->eventfd, &e, sizeof(e));
93 } while (ret == -1 && errno == EINTR);
94 } while (ret == -1 && errno == EAGAIN);
95
96 pthread_mutex_lock(mutex);
97
98 return 0;
99 }
100
101 /* Corresponds to an opened device */
102 struct DevEntry {
103 unsigned int ref_count;
104
105 struct iio_device *dev;
106 struct iio_buffer *buf;
107 unsigned int sample_size, nb_clients;
108 bool update_mask;
109 bool cyclic;
110 bool closed;
111 bool cancelled;
112
113 /* Linked list of ThdEntry structures corresponding
114 * to all the threads who opened the device */
115 SLIST_HEAD(ThdHead, ThdEntry) thdlist_head;
116 pthread_mutex_t thdlist_lock;
117
118 pthread_cond_t rw_ready_cond;
119
120 uint32_t *mask;
121 size_t nb_words;
122 };
123
124 struct sample_cb_info {
125 struct parser_pdata *pdata;
126 unsigned int nb_bytes, cpt;
127 uint32_t *mask;
128 };
129
130 /* Protects iio_device_{set,get}_data() from concurrent access from multiple
131 * clients */
132 static pthread_mutex_t devlist_lock = PTHREAD_MUTEX_INITIALIZER;
133
134 #if WITH_AIO
async_io(struct parser_pdata * pdata,void * buf,size_t len,bool do_read)135 static ssize_t async_io(struct parser_pdata *pdata, void *buf, size_t len,
136 bool do_read)
137 {
138 ssize_t ret;
139 struct pollfd pfd[2];
140 unsigned int num_pfds;
141 struct iocb iocb;
142 struct iocb *ios[1];
143 struct io_event e[1];
144
145 ios[0] = &iocb;
146
147 if (do_read)
148 io_prep_pread(&iocb, pdata->fd_in, buf, len, 0);
149 else
150 io_prep_pwrite(&iocb, pdata->fd_out, buf, len, 0);
151
152 io_set_eventfd(&iocb, pdata->aio_eventfd);
153
154 pthread_mutex_lock(&pdata->aio_mutex);
155
156 ret = io_submit(pdata->aio_ctx, 1, ios);
157 if (ret != 1) {
158 pthread_mutex_unlock(&pdata->aio_mutex);
159 ERROR("Failed to submit IO operation: %zd\n", ret);
160 return -EIO;
161 }
162
163 pfd[0].fd = pdata->aio_eventfd;
164 pfd[0].events = POLLIN;
165 pfd[0].revents = 0;
166 pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
167 pfd[1].events = POLLIN;
168 pfd[1].revents = 0;
169 num_pfds = 2;
170
171 do {
172 poll_nointr(pfd, num_pfds);
173
174 if (pfd[0].revents & POLLIN) {
175 uint64_t event;
176 ret = read(pdata->aio_eventfd, &event, sizeof(event));
177 if (ret != sizeof(event)) {
178 ERROR("Failed to read from eventfd: %d\n", -errno);
179 ret = -EIO;
180 break;
181 }
182
183 ret = io_getevents(pdata->aio_ctx, 0, 1, e, NULL);
184 if (ret != 1) {
185 ERROR("Failed to read IO events: %zd\n", ret);
186 ret = -EIO;
187 break;
188 } else {
189 ret = (long)e[0].res;
190 }
191 } else if ((num_pfds > 1 && pfd[1].revents & POLLIN)) {
192 /* Got a STOP event to abort this whole session */
193 ret = io_cancel(pdata->aio_ctx, &iocb, e);
194 if (ret != -EINPROGRESS && ret != -EINVAL) {
195 ERROR("Failed to cancel IO transfer: %zd\n", ret);
196 ret = -EIO;
197 break;
198 }
199 /* It should not be long now until we get the cancellation event */
200 num_pfds = 1;
201 }
202 } while (!(pfd[0].revents & POLLIN));
203
204 pthread_mutex_unlock(&pdata->aio_mutex);
205
206 /* Got STOP event, treat it as EOF */
207 if (num_pfds == 1)
208 return 0;
209
210 return ret;
211 }
212
213 #define MAX_AIO_REQ_SIZE (1024 * 1024)
214
readfd_aio(struct parser_pdata * pdata,void * dest,size_t len)215 static ssize_t readfd_aio(struct parser_pdata *pdata, void *dest, size_t len)
216 {
217 if (len > MAX_AIO_REQ_SIZE)
218 len = MAX_AIO_REQ_SIZE;
219 return async_io(pdata, dest, len, true);
220 }
221
writefd_aio(struct parser_pdata * pdata,const void * dest,size_t len)222 static ssize_t writefd_aio(struct parser_pdata *pdata, const void *dest,
223 size_t len)
224 {
225 if (len > MAX_AIO_REQ_SIZE)
226 len = MAX_AIO_REQ_SIZE;
227 return async_io(pdata, (void *)dest, len, false);
228 }
229 #endif /* WITH_AIO */
230
readfd_io(struct parser_pdata * pdata,void * dest,size_t len)231 static ssize_t readfd_io(struct parser_pdata *pdata, void *dest, size_t len)
232 {
233 ssize_t ret;
234 struct pollfd pfd[2];
235
236 pfd[0].fd = pdata->fd_in;
237 pfd[0].events = POLLIN | POLLRDHUP;
238 pfd[0].revents = 0;
239 pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
240 pfd[1].events = POLLIN;
241 pfd[1].revents = 0;
242
243 do {
244 poll_nointr(pfd, 2);
245
246 /* Got STOP event, or client closed the socket: treat it as EOF */
247 if (pfd[1].revents & POLLIN || pfd[0].revents & POLLRDHUP)
248 return 0;
249 if (pfd[0].revents & POLLERR)
250 return -EIO;
251 if (!(pfd[0].revents & POLLIN))
252 continue;
253
254 do {
255 if (pdata->fd_in_is_socket)
256 ret = recv(pdata->fd_in, dest, len, MSG_NOSIGNAL);
257 else
258 ret = read(pdata->fd_in, dest, len);
259 } while (ret == -1 && errno == EINTR);
260
261 if (ret != -1 || errno != EAGAIN)
262 break;
263 } while (true);
264
265 if (ret == -1)
266 return -errno;
267
268 return ret;
269 }
270
writefd_io(struct parser_pdata * pdata,const void * src,size_t len)271 static ssize_t writefd_io(struct parser_pdata *pdata, const void *src, size_t len)
272 {
273 ssize_t ret;
274 struct pollfd pfd[2];
275
276 pfd[0].fd = pdata->fd_out;
277 pfd[0].events = POLLOUT;
278 pfd[0].revents = 0;
279 pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
280 pfd[1].events = POLLIN;
281 pfd[1].revents = 0;
282
283 do {
284 poll_nointr(pfd, 2);
285
286 /* Got STOP event, or client closed the socket: treat it as EOF */
287 if (pfd[1].revents & POLLIN || pfd[0].revents & POLLHUP)
288 return 0;
289 if (pfd[0].revents & POLLERR)
290 return -EIO;
291 if (!(pfd[0].revents & POLLOUT))
292 continue;
293
294 do {
295 if (pdata->fd_out_is_socket)
296 ret = send(pdata->fd_out, src, len, MSG_NOSIGNAL);
297 else
298 ret = write(pdata->fd_out, src, len);
299 } while (ret == -1 && errno == EINTR);
300
301 if (ret != -1 || errno != EAGAIN)
302 break;
303 } while (true);
304
305 if (ret == -1)
306 return -errno;
307
308 return ret;
309 }
310
write_all(struct parser_pdata * pdata,const void * src,size_t len)311 ssize_t write_all(struct parser_pdata *pdata, const void *src, size_t len)
312 {
313 uintptr_t ptr = (uintptr_t) src;
314
315 while (len) {
316 ssize_t ret = pdata->writefd(pdata, (void *) ptr, len);
317 if (ret < 0)
318 return ret;
319 if (!ret)
320 return -EPIPE;
321 ptr += ret;
322 len -= ret;
323 }
324
325 return ptr - (uintptr_t) src;
326 }
327
read_all(struct parser_pdata * pdata,void * dst,size_t len)328 static ssize_t read_all(struct parser_pdata *pdata,
329 void *dst, size_t len)
330 {
331 uintptr_t ptr = (uintptr_t) dst;
332
333 while (len) {
334 ssize_t ret = pdata->readfd(pdata, (void *) ptr, len);
335 if (ret < 0)
336 return ret;
337 if (!ret)
338 return -EPIPE;
339 ptr += ret;
340 len -= ret;
341 }
342
343 return ptr - (uintptr_t) dst;
344 }
345
print_value(struct parser_pdata * pdata,long value)346 static void print_value(struct parser_pdata *pdata, long value)
347 {
348 if (pdata->verbose && value < 0) {
349 char buf[1024];
350 iio_strerror(-value, buf, sizeof(buf));
351 output(pdata, "ERROR: ");
352 output(pdata, buf);
353 output(pdata, "\n");
354 } else {
355 char buf[128];
356 sprintf(buf, "%li\n", value);
357 output(pdata, buf);
358 }
359 }
360
send_sample(const struct iio_channel * chn,void * src,size_t length,void * d)361 static ssize_t send_sample(const struct iio_channel *chn,
362 void *src, size_t length, void *d)
363 {
364 struct sample_cb_info *info = d;
365 if (chn->index < 0 || !TEST_BIT(info->mask, chn->number))
366 return 0;
367 if (info->nb_bytes < length)
368 return 0;
369
370 if (info->cpt % length) {
371 unsigned int i, goal = length - info->cpt % length;
372 char zero = 0;
373 ssize_t ret;
374
375 for (i = 0; i < goal; i++) {
376 ret = info->pdata->writefd(info->pdata, &zero, 1);
377 if (ret < 0)
378 return ret;
379 }
380 info->cpt += goal;
381 }
382
383 info->cpt += length;
384 info->nb_bytes -= length;
385 return write_all(info->pdata, src, length);
386 }
387
receive_sample(const struct iio_channel * chn,void * dst,size_t length,void * d)388 static ssize_t receive_sample(const struct iio_channel *chn,
389 void *dst, size_t length, void *d)
390 {
391 struct sample_cb_info *info = d;
392 if (chn->index < 0 || !TEST_BIT(info->mask, chn->number))
393 return 0;
394 if (info->cpt == info->nb_bytes)
395 return 0;
396
397 /* Skip the padding if needed */
398 if (info->cpt % length) {
399 unsigned int i, goal = length - info->cpt % length;
400 char foo;
401 ssize_t ret;
402
403 for (i = 0; i < goal; i++) {
404 ret = info->pdata->readfd(info->pdata, &foo, 1);
405 if (ret < 0)
406 return ret;
407 }
408 info->cpt += goal;
409 }
410
411 info->cpt += length;
412 return read_all(info->pdata, dst, length);
413 }
414
send_data(struct DevEntry * dev,struct ThdEntry * thd,size_t len)415 static ssize_t send_data(struct DevEntry *dev, struct ThdEntry *thd, size_t len)
416 {
417 struct parser_pdata *pdata = thd->pdata;
418 bool demux = server_demux && dev->sample_size != thd->sample_size;
419
420 if (demux)
421 len = (len / dev->sample_size) * thd->sample_size;
422 if (len > thd->nb)
423 len = thd->nb;
424
425 print_value(pdata, len);
426
427 if (thd->new_client) {
428 unsigned int i;
429 char buf[129], *ptr = buf;
430 uint32_t *mask = demux ? thd->mask : dev->mask;
431 ssize_t ret;
432
433 /* Send the current mask */
434 for (i = dev->nb_words; i > 0 && ptr < buf + sizeof(buf);
435 i--, ptr += 8)
436 sprintf(ptr, "%08x", mask[i - 1]);
437
438 *ptr = '\n';
439 ret = write_all(pdata, buf, ptr + 1 - buf);
440 if (ret < 0)
441 return ret;
442
443 thd->new_client = false;
444 }
445
446 if (!demux) {
447 /* Short path */
448 return write_all(pdata, dev->buf->buffer, len);
449 } else {
450 struct sample_cb_info info = {
451 .pdata = pdata,
452 .cpt = 0,
453 .nb_bytes = len,
454 .mask = thd->mask,
455 };
456
457 return iio_buffer_foreach_sample(dev->buf, send_sample, &info);
458 }
459 }
460
receive_data(struct DevEntry * dev,struct ThdEntry * thd)461 static ssize_t receive_data(struct DevEntry *dev, struct ThdEntry *thd)
462 {
463 struct parser_pdata *pdata = thd->pdata;
464
465 /* Inform that no error occured, and that we'll start reading data */
466 if (thd->new_client) {
467 print_value(thd->pdata, 0);
468 thd->new_client = false;
469 }
470
471 if (dev->sample_size == thd->sample_size) {
472 /* Short path: Receive directly in the buffer */
473
474 size_t len = dev->buf->length;
475 if (thd->nb < len)
476 len = thd->nb;
477
478 return read_all(pdata, dev->buf->buffer, len);
479 } else {
480 /* Long path: Mux the samples to the buffer */
481
482 struct sample_cb_info info = {
483 .pdata = pdata,
484 .cpt = 0,
485 .nb_bytes = thd->nb,
486 .mask = thd->mask,
487 };
488
489 return iio_buffer_foreach_sample(dev->buf,
490 receive_sample, &info);
491 }
492 }
493
dev_entry_put(struct DevEntry * entry)494 static void dev_entry_put(struct DevEntry *entry)
495 {
496 bool free_entry = false;
497
498 pthread_mutex_lock(&entry->thdlist_lock);
499 entry->ref_count--;
500 if (entry->ref_count == 0)
501 free_entry = true;
502 pthread_mutex_unlock(&entry->thdlist_lock);
503
504 if (free_entry) {
505 pthread_mutex_destroy(&entry->thdlist_lock);
506 pthread_cond_destroy(&entry->rw_ready_cond);
507
508 free(entry->mask);
509 free(entry);
510 }
511 }
512
signal_thread(struct ThdEntry * thd,ssize_t ret)513 static void signal_thread(struct ThdEntry *thd, ssize_t ret)
514 {
515 thd->err = ret;
516 thd->nb = 0;
517 thd->active = false;
518 thd_entry_event_signal(thd);
519 }
520
rw_thd(struct thread_pool * pool,void * d)521 static void rw_thd(struct thread_pool *pool, void *d)
522 {
523 struct DevEntry *entry = d;
524 struct ThdEntry *thd, *next_thd;
525 struct iio_device *dev = entry->dev;
526 unsigned int nb_words = entry->nb_words;
527 ssize_t ret = 0;
528
529 DEBUG("R/W thread started for device %s\n",
530 dev->name ? dev->name : dev->id);
531
532 while (true) {
533 bool has_readers = false, has_writers = false,
534 mask_updated = false;
535 unsigned int sample_size;
536
537 /* NOTE: this while loop must exit with thdlist_lock locked. */
538 pthread_mutex_lock(&entry->thdlist_lock);
539
540 if (SLIST_EMPTY(&entry->thdlist_head))
541 break;
542
543 if (entry->update_mask) {
544 unsigned int i;
545 unsigned int samples_count = 0;
546
547 memset(entry->mask, 0, nb_words * sizeof(*entry->mask));
548 SLIST_FOREACH(thd, &entry->thdlist_head, dev_list_entry) {
549 for (i = 0; i < nb_words; i++)
550 entry->mask[i] |= thd->mask[i];
551
552 if (thd->samples_count > samples_count)
553 samples_count = thd->samples_count;
554 }
555
556 if (entry->buf)
557 iio_buffer_destroy(entry->buf);
558
559 for (i = 0; i < dev->nb_channels; i++) {
560 struct iio_channel *chn = dev->channels[i];
561 long index = chn->index;
562
563 if (index < 0)
564 continue;
565
566 if (TEST_BIT(entry->mask, chn->number))
567 iio_channel_enable(chn);
568 else
569 iio_channel_disable(chn);
570 }
571
572 entry->buf = iio_device_create_buffer(dev,
573 samples_count, entry->cyclic);
574 if (!entry->buf) {
575 ret = -errno;
576 ERROR("Unable to create buffer\n");
577 break;
578 }
579 entry->cancelled = false;
580
581 /* Signal the threads that we opened the device */
582 SLIST_FOREACH(thd, &entry->thdlist_head, dev_list_entry) {
583 if (thd->wait_for_open) {
584 thd->wait_for_open = false;
585 signal_thread(thd, 0);
586 }
587 }
588
589 DEBUG("IIO device %s reopened with new mask:\n",
590 dev->id);
591 for (i = 0; i < nb_words; i++)
592 DEBUG("Mask[%i] = 0x%08x\n", i, entry->mask[i]);
593 entry->update_mask = false;
594
595 entry->sample_size = iio_device_get_sample_size(dev);
596 mask_updated = true;
597 }
598
599 sample_size = entry->sample_size;
600
601 SLIST_FOREACH(thd, &entry->thdlist_head, dev_list_entry) {
602 thd->active = !thd->err && thd->nb >= sample_size;
603 if (mask_updated && thd->active)
604 signal_thread(thd, thd->nb);
605
606 if (thd->is_writer)
607 has_writers |= thd->active;
608 else
609 has_readers |= thd->active;
610 }
611
612 if (!has_readers && !has_writers) {
613 pthread_cond_wait(&entry->rw_ready_cond,
614 &entry->thdlist_lock);
615 }
616
617 pthread_mutex_unlock(&entry->thdlist_lock);
618
619 if (!has_readers && !has_writers)
620 continue;
621
622 if (has_readers) {
623 ssize_t nb_bytes;
624
625 ret = iio_buffer_refill(entry->buf);
626
627 pthread_mutex_lock(&entry->thdlist_lock);
628
629 /*
630 * When the last client disconnects the buffer is
631 * cancelled and iio_buffer_refill() returns an error. A
632 * new client might have connected before we got here
633 * though, in that case the rw thread has to stay active
634 * and a new buffer is created. If the list is still empty the loop
635 * will exit normally.
636 */
637 if (entry->cancelled) {
638 pthread_mutex_unlock(&entry->thdlist_lock);
639 continue;
640 }
641
642 if (ret < 0) {
643 ERROR("Reading from device failed: %i\n",
644 (int) ret);
645 break;
646 }
647
648 nb_bytes = ret;
649
650 /* We don't use SLIST_FOREACH here. As soon as a thread is
651 * signaled, its "thd" structure might be freed;
652 * SLIST_FOREACH would then cause a segmentation fault, as it
653 * reads "thd" to get the address of the next element. */
654 for (thd = SLIST_FIRST(&entry->thdlist_head);
655 thd; thd = next_thd) {
656 next_thd = SLIST_NEXT(thd, dev_list_entry);
657
658 if (!thd->active || thd->is_writer)
659 continue;
660
661 ret = send_data(entry, thd, nb_bytes);
662 if (ret > 0)
663 thd->nb -= ret;
664
665 if (ret < 0 || thd->nb < sample_size)
666 signal_thread(thd, (ret < 0) ?
667 ret : thd->nb);
668 }
669
670 pthread_mutex_unlock(&entry->thdlist_lock);
671 }
672
673 if (has_writers) {
674 ssize_t nb_bytes = 0;
675
676 pthread_mutex_lock(&entry->thdlist_lock);
677
678 /* Reset the size of the buffer to its maximum size */
679 entry->buf->data_length = entry->buf->length;
680
681 /* Same comment as above */
682 for (thd = SLIST_FIRST(&entry->thdlist_head);
683 thd; thd = next_thd) {
684 next_thd = SLIST_NEXT(thd, dev_list_entry);
685
686 if (!thd->active || !thd->is_writer)
687 continue;
688
689 ret = receive_data(entry, thd);
690 if (ret > 0) {
691 thd->nb -= ret;
692 if (ret > nb_bytes)
693 nb_bytes = ret;
694 }
695
696 if (ret < 0)
697 signal_thread(thd, ret);
698 }
699
700 ret = iio_buffer_push_partial(entry->buf,
701 nb_bytes / sample_size);
702 if (entry->cancelled) {
703 pthread_mutex_unlock(&entry->thdlist_lock);
704 continue;
705 }
706 if (ret < 0) {
707 ERROR("Writing to device failed: %i\n",
708 (int) ret);
709 break;
710 }
711
712 /* Signal threads which completed their RW command */
713 for (thd = SLIST_FIRST(&entry->thdlist_head);
714 thd; thd = next_thd) {
715 next_thd = SLIST_NEXT(thd, dev_list_entry);
716 if (thd->active && thd->is_writer &&
717 thd->nb < sample_size)
718 signal_thread(thd, thd->nb);
719 }
720
721 pthread_mutex_unlock(&entry->thdlist_lock);
722 }
723 }
724
725 /* Signal all remaining threads */
726 for (thd = SLIST_FIRST(&entry->thdlist_head); thd; thd = next_thd) {
727 next_thd = SLIST_NEXT(thd, dev_list_entry);
728 SLIST_REMOVE(&entry->thdlist_head, thd, ThdEntry, dev_list_entry);
729 thd->wait_for_open = false;
730 signal_thread(thd, ret);
731 }
732 if (entry->buf) {
733 iio_buffer_destroy(entry->buf);
734 entry->buf = NULL;
735 }
736 entry->closed = true;
737 pthread_mutex_unlock(&entry->thdlist_lock);
738
739 pthread_mutex_lock(&devlist_lock);
740 /* It is possible that a new thread has already started, make sure to
741 * not overwrite it. */
742 if (iio_device_get_data(dev) == entry)
743 iio_device_set_data(dev, NULL);
744 pthread_mutex_unlock(&devlist_lock);
745
746 DEBUG("Stopping R/W thread for device %s\n",
747 dev->name ? dev->name : dev->id);
748
749 dev_entry_put(entry);
750 }
751
parser_lookup_thd_entry(struct parser_pdata * pdata,struct iio_device * dev)752 static struct ThdEntry *parser_lookup_thd_entry(struct parser_pdata *pdata,
753 struct iio_device *dev)
754 {
755 struct ThdEntry *t;
756
757 SLIST_FOREACH(t, &pdata->thdlist_head, parser_list_entry) {
758 if (t->dev == dev)
759 return t;
760 }
761
762 return NULL;
763 }
764
rw_buffer(struct parser_pdata * pdata,struct iio_device * dev,unsigned int nb,bool is_write)765 static ssize_t rw_buffer(struct parser_pdata *pdata,
766 struct iio_device *dev, unsigned int nb, bool is_write)
767 {
768 struct DevEntry *entry;
769 struct ThdEntry *thd;
770 ssize_t ret;
771
772 if (!dev)
773 return -ENODEV;
774
775 thd = parser_lookup_thd_entry(pdata, dev);
776 if (!thd)
777 return -EBADF;
778
779 entry = thd->entry;
780
781 if (nb < entry->sample_size)
782 return 0;
783
784 pthread_mutex_lock(&entry->thdlist_lock);
785 if (entry->closed) {
786 pthread_mutex_unlock(&entry->thdlist_lock);
787 return -EBADF;
788 }
789
790 if (thd->nb) {
791 pthread_mutex_unlock(&entry->thdlist_lock);
792 return -EBUSY;
793 }
794
795 thd->new_client = true;
796 thd->nb = nb;
797 thd->err = 0;
798 thd->is_writer = is_write;
799 thd->active = true;
800
801 pthread_cond_signal(&entry->rw_ready_cond);
802
803 DEBUG("Waiting for completion...\n");
804 while (thd->active) {
805 ret = thd_entry_event_wait(thd, &entry->thdlist_lock, pdata->fd_in);
806 if (ret)
807 break;
808 }
809 if (ret == 0)
810 ret = thd->err;
811 pthread_mutex_unlock(&entry->thdlist_lock);
812
813 if (ret > 0 && ret < nb)
814 print_value(thd->pdata, 0);
815
816 DEBUG("Exiting rw_buffer with code %li\n", (long) ret);
817 if (ret < 0)
818 return ret;
819 else
820 return nb - ret;
821 }
822
get_mask(const char * mask,size_t * len)823 static uint32_t *get_mask(const char *mask, size_t *len)
824 {
825 size_t nb = (*len + 7) / 8;
826 uint32_t *ptr, *words = calloc(nb, sizeof(*words));
827 if (!words)
828 return NULL;
829
830 ptr = words + nb;
831 while (*mask) {
832 char buf[9];
833 sprintf(buf, "%.*s", 8, mask);
834 sscanf(buf, "%08x", --ptr);
835 mask += 8;
836 DEBUG("Mask[%lu]: 0x%08x\n",
837 (unsigned long) (words - ptr) / 4, *ptr);
838 }
839
840 *len = nb;
841 return words;
842 }
843
free_thd_entry(struct ThdEntry * t)844 static void free_thd_entry(struct ThdEntry *t)
845 {
846 close(t->eventfd);
847 free(t->mask);
848 free(t);
849 }
850
remove_thd_entry(struct ThdEntry * t)851 static void remove_thd_entry(struct ThdEntry *t)
852 {
853 struct DevEntry *entry = t->entry;
854
855 pthread_mutex_lock(&entry->thdlist_lock);
856 if (!entry->closed) {
857 entry->update_mask = true;
858 SLIST_REMOVE(&entry->thdlist_head, t, ThdEntry, dev_list_entry);
859 if (SLIST_EMPTY(&entry->thdlist_head) && entry->buf) {
860 entry->cancelled = true;
861 iio_buffer_cancel(entry->buf); /* Wakeup the rw thread */
862 }
863
864 pthread_cond_signal(&entry->rw_ready_cond);
865 }
866 pthread_mutex_unlock(&entry->thdlist_lock);
867 dev_entry_put(entry);
868
869 free_thd_entry(t);
870 }
871
open_dev_helper(struct parser_pdata * pdata,struct iio_device * dev,size_t samples_count,const char * mask,bool cyclic)872 static int open_dev_helper(struct parser_pdata *pdata, struct iio_device *dev,
873 size_t samples_count, const char *mask, bool cyclic)
874 {
875 int ret = -ENOMEM;
876 struct DevEntry *entry;
877 struct ThdEntry *thd;
878 size_t len = strlen(mask);
879 uint32_t *words;
880 unsigned int nb_channels;
881 unsigned int cyclic_retry = 500;
882
883 if (!dev)
884 return -ENODEV;
885
886 nb_channels = dev->nb_channels;
887 if (len != ((nb_channels + 31) / 32) * 8)
888 return -EINVAL;
889
890 words = get_mask(mask, &len);
891 if (!words)
892 return -ENOMEM;
893
894 thd = zalloc(sizeof(*thd));
895 if (!thd)
896 goto err_free_words;
897
898 thd->wait_for_open = true;
899 thd->mask = words;
900 thd->nb = 0;
901 thd->samples_count = samples_count;
902 thd->sample_size = iio_device_get_sample_size_mask(dev, words, len);
903 thd->pdata = pdata;
904 thd->dev = dev;
905 thd->eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
906
907 retry:
908 /* Atomically look up the thread and make sure that it is still active
909 * or allocate new one. */
910 pthread_mutex_lock(&devlist_lock);
911 entry = iio_device_get_data(dev);
912 if (entry) {
913 if (cyclic || entry->cyclic) {
914 /* Only one client allowed in cyclic mode */
915 pthread_mutex_unlock(&devlist_lock);
916
917 /* There is an inherent race condition if a client
918 * creates a new cyclic buffer shortly after destroying
919 * a previous. E.g. like
920 *
921 * iio_buffer_destroy(buf);
922 * buf = iio_device_create_buffer(dev, n, true);
923 *
924 * In this case the two buffers each use their own
925 * communication channel which are unordered to each
926 * other. E.g. the socket open might arrive before the
927 * socket close on the host side, even though they were
928 * sent in the opposite order on the client side. This
929 * race condition can cause an error being reported back
930 * to the client, even though the code on the client
931 * side was well formed and would work fine e.g. using
932 * the local backend.
933 *
934 * To avoid this issue go to sleep for up to 50ms in
935 * intervals of 100us. This should be enough time for
936 * the issue to resolve itself. If there actually is
937 * contention on the buffer an error will eventually be
938 * returned in which case the additional delay cause by
939 * the retires should not matter too much.
940 *
941 * This is not pretty but it works.
942 */
943 if (cyclic_retry) {
944 cyclic_retry--;
945 usleep(100);
946 goto retry;
947 }
948
949 ret = -EBUSY;
950 goto err_free_thd;
951 }
952
953 pthread_mutex_lock(&entry->thdlist_lock);
954 if (!entry->closed) {
955 pthread_mutex_unlock(&devlist_lock);
956
957 entry->ref_count++;
958
959 SLIST_INSERT_HEAD(&entry->thdlist_head, thd, dev_list_entry);
960 thd->entry = entry;
961 entry->update_mask = true;
962 DEBUG("Added thread to client list\n");
963
964 pthread_cond_signal(&entry->rw_ready_cond);
965
966 /* Wait until the device is opened by the rw thread */
967 while (thd->wait_for_open) {
968 ret = thd_entry_event_wait(thd, &entry->thdlist_lock, pdata->fd_in);
969 if (ret)
970 break;
971 }
972 pthread_mutex_unlock(&entry->thdlist_lock);
973
974 if (ret == 0)
975 ret = (int) thd->err;
976 if (ret < 0)
977 remove_thd_entry(thd);
978 else
979 SLIST_INSERT_HEAD(&pdata->thdlist_head, thd, parser_list_entry);
980 return ret;
981 } else {
982 pthread_mutex_unlock(&entry->thdlist_lock);
983 }
984 }
985
986 entry = zalloc(sizeof(*entry));
987 if (!entry) {
988 pthread_mutex_unlock(&devlist_lock);
989 goto err_free_thd;
990 }
991
992 entry->ref_count = 2; /* One for thread, one for the client */
993
994 entry->mask = malloc(len * sizeof(*words));
995 if (!entry->mask) {
996 pthread_mutex_unlock(&devlist_lock);
997 goto err_free_entry;
998 }
999
1000 entry->cyclic = cyclic;
1001 entry->nb_words = len;
1002 entry->update_mask = true;
1003 entry->dev = dev;
1004 entry->buf = NULL;
1005 SLIST_INIT(&entry->thdlist_head);
1006 SLIST_INSERT_HEAD(&entry->thdlist_head, thd, dev_list_entry);
1007 thd->entry = entry;
1008 DEBUG("Added thread to client list\n");
1009
1010 pthread_mutex_init(&entry->thdlist_lock, NULL);
1011 pthread_cond_init(&entry->rw_ready_cond, NULL);
1012
1013 ret = thread_pool_add_thread(main_thread_pool, rw_thd, entry, "rw_thd");
1014 if (ret) {
1015 pthread_mutex_unlock(&devlist_lock);
1016 goto err_free_entry_mask;
1017 }
1018
1019 DEBUG("Adding new device thread to device list\n");
1020 iio_device_set_data(dev, entry);
1021 pthread_mutex_unlock(&devlist_lock);
1022
1023 pthread_mutex_lock(&entry->thdlist_lock);
1024 /* Wait until the device is opened by the rw thread */
1025 while (thd->wait_for_open) {
1026 ret = thd_entry_event_wait(thd, &entry->thdlist_lock, pdata->fd_in);
1027 if (ret)
1028 break;
1029 }
1030 pthread_mutex_unlock(&entry->thdlist_lock);
1031
1032 if (ret == 0)
1033 ret = (int) thd->err;
1034 if (ret < 0)
1035 remove_thd_entry(thd);
1036 else
1037 SLIST_INSERT_HEAD(&pdata->thdlist_head, thd, parser_list_entry);
1038 return ret;
1039
1040 err_free_entry_mask:
1041 free(entry->mask);
1042 err_free_entry:
1043 free(entry);
1044 err_free_thd:
1045 close(thd->eventfd);
1046 free(thd);
1047 err_free_words:
1048 free(words);
1049 return ret;
1050 }
1051
close_dev_helper(struct parser_pdata * pdata,struct iio_device * dev)1052 static int close_dev_helper(struct parser_pdata *pdata, struct iio_device *dev)
1053 {
1054 struct ThdEntry *t;
1055
1056 if (!dev)
1057 return -ENODEV;
1058
1059 t = parser_lookup_thd_entry(pdata, dev);
1060 if (!t)
1061 return -ENXIO;
1062
1063 SLIST_REMOVE(&pdata->thdlist_head, t, ThdEntry, parser_list_entry);
1064 remove_thd_entry(t);
1065
1066 return 0;
1067 }
1068
open_dev(struct parser_pdata * pdata,struct iio_device * dev,size_t samples_count,const char * mask,bool cyclic)1069 int open_dev(struct parser_pdata *pdata, struct iio_device *dev,
1070 size_t samples_count, const char *mask, bool cyclic)
1071 {
1072 int ret = open_dev_helper(pdata, dev, samples_count, mask, cyclic);
1073 print_value(pdata, ret);
1074 return ret;
1075 }
1076
close_dev(struct parser_pdata * pdata,struct iio_device * dev)1077 int close_dev(struct parser_pdata *pdata, struct iio_device *dev)
1078 {
1079 int ret = close_dev_helper(pdata, dev);
1080 print_value(pdata, ret);
1081 return ret;
1082 }
1083
rw_dev(struct parser_pdata * pdata,struct iio_device * dev,unsigned int nb,bool is_write)1084 ssize_t rw_dev(struct parser_pdata *pdata, struct iio_device *dev,
1085 unsigned int nb, bool is_write)
1086 {
1087 ssize_t ret = rw_buffer(pdata, dev, nb, is_write);
1088 if (ret <= 0 || is_write)
1089 print_value(pdata, ret);
1090 return ret;
1091 }
1092
read_dev_attr(struct parser_pdata * pdata,struct iio_device * dev,const char * attr,enum iio_attr_type type)1093 ssize_t read_dev_attr(struct parser_pdata *pdata, struct iio_device *dev,
1094 const char *attr, enum iio_attr_type type)
1095 {
1096 /* We use a very large buffer here, as if attr is NULL all the
1097 * attributes will be read, which may represents a few kilobytes worth
1098 * of data. */
1099 char buf[0x10000];
1100 ssize_t ret = -EINVAL;
1101
1102 if (!dev) {
1103 print_value(pdata, -ENODEV);
1104 return -ENODEV;
1105 }
1106
1107 switch (type) {
1108 case IIO_ATTR_TYPE_DEVICE:
1109 ret = iio_device_attr_read(dev, attr, buf, sizeof(buf) - 1);
1110 break;
1111 case IIO_ATTR_TYPE_DEBUG:
1112 ret = iio_device_debug_attr_read(dev,
1113 attr, buf, sizeof(buf) - 1);
1114 break;
1115 case IIO_ATTR_TYPE_BUFFER:
1116 ret = iio_device_buffer_attr_read(dev,
1117 attr, buf, sizeof(buf) - 1);
1118 break;
1119 default:
1120 ret = -EINVAL;
1121 break;
1122 }
1123 print_value(pdata, ret);
1124 if (ret < 0)
1125 return ret;
1126
1127 buf[ret] = '\n';
1128 return write_all(pdata, buf, ret + 1);
1129 }
1130
write_dev_attr(struct parser_pdata * pdata,struct iio_device * dev,const char * attr,size_t len,enum iio_attr_type type)1131 ssize_t write_dev_attr(struct parser_pdata *pdata, struct iio_device *dev,
1132 const char *attr, size_t len, enum iio_attr_type type)
1133 {
1134 ssize_t ret = -ENOMEM;
1135 char *buf;
1136
1137 if (!dev) {
1138 ret = -ENODEV;
1139 goto err_print_value;
1140 }
1141
1142 buf = malloc(len);
1143 if (!buf)
1144 goto err_print_value;
1145
1146 ret = read_all(pdata, buf, len);
1147 if (ret < 0)
1148 goto err_free_buffer;
1149
1150 switch (type) {
1151 case IIO_ATTR_TYPE_DEVICE:
1152 ret = iio_device_attr_write_raw(dev, attr, buf, len);
1153 break;
1154 case IIO_ATTR_TYPE_DEBUG:
1155 ret = iio_device_debug_attr_write_raw(dev, attr, buf, len);
1156 break;
1157 case IIO_ATTR_TYPE_BUFFER:
1158 ret = iio_device_buffer_attr_write_raw(dev, attr, buf, len);
1159 break;
1160 default:
1161 ret = -EINVAL;
1162 break;
1163 }
1164
1165 err_free_buffer:
1166 free(buf);
1167 err_print_value:
1168 print_value(pdata, ret);
1169 return ret;
1170 }
1171
read_chn_attr(struct parser_pdata * pdata,struct iio_channel * chn,const char * attr)1172 ssize_t read_chn_attr(struct parser_pdata *pdata,
1173 struct iio_channel *chn, const char *attr)
1174 {
1175 char buf[1024];
1176 ssize_t ret = -ENODEV;
1177
1178 if (chn)
1179 ret = iio_channel_attr_read(chn, attr, buf, sizeof(buf) - 1);
1180 else if (pdata->dev)
1181 ret = -ENXIO;
1182 print_value(pdata, ret);
1183 if (ret < 0)
1184 return ret;
1185
1186 buf[ret] = '\n';
1187 return write_all(pdata, buf, ret + 1);
1188 }
1189
write_chn_attr(struct parser_pdata * pdata,struct iio_channel * chn,const char * attr,size_t len)1190 ssize_t write_chn_attr(struct parser_pdata *pdata,
1191 struct iio_channel *chn, const char *attr, size_t len)
1192 {
1193 ssize_t ret = -ENOMEM;
1194 char *buf = malloc(len);
1195 if (!buf)
1196 goto err_print_value;
1197
1198 ret = read_all(pdata, buf, len);
1199 if (ret < 0)
1200 goto err_free_buffer;
1201
1202 if (chn)
1203 ret = iio_channel_attr_write_raw(chn, attr, buf, len);
1204 else if (pdata->dev)
1205 ret = -ENXIO;
1206 else
1207 ret = -ENODEV;
1208 err_free_buffer:
1209 free(buf);
1210 err_print_value:
1211 print_value(pdata, ret);
1212 return ret;
1213 }
1214
set_trigger(struct parser_pdata * pdata,struct iio_device * dev,const char * trigger)1215 ssize_t set_trigger(struct parser_pdata *pdata,
1216 struct iio_device *dev, const char *trigger)
1217 {
1218 struct iio_device *trig = NULL;
1219 ssize_t ret = -ENOENT;
1220
1221 if (!dev) {
1222 ret = -ENODEV;
1223 goto err_print_value;
1224 }
1225
1226 if (trigger) {
1227 trig = iio_context_find_device(pdata->ctx, trigger);
1228 if (!trig)
1229 goto err_print_value;
1230 }
1231
1232 ret = iio_device_set_trigger(dev, trig);
1233 err_print_value:
1234 print_value(pdata, ret);
1235 return ret;
1236 }
1237
get_trigger(struct parser_pdata * pdata,struct iio_device * dev)1238 ssize_t get_trigger(struct parser_pdata *pdata, struct iio_device *dev)
1239 {
1240 const struct iio_device *trigger;
1241 ssize_t ret;
1242
1243 if (!dev) {
1244 print_value(pdata, -ENODEV);
1245 return -ENODEV;
1246 }
1247
1248 ret = iio_device_get_trigger(dev, &trigger);
1249 if (!ret && trigger) {
1250 char buf[256];
1251
1252 ret = strlen(trigger->name);
1253 print_value(pdata, ret);
1254
1255 snprintf(buf, sizeof(buf), "%s\n", trigger->name);
1256 ret = write_all(pdata, buf, ret + 1);
1257 } else {
1258 print_value(pdata, ret);
1259 }
1260 return ret;
1261 }
1262
set_timeout(struct parser_pdata * pdata,unsigned int timeout)1263 int set_timeout(struct parser_pdata *pdata, unsigned int timeout)
1264 {
1265 int ret = iio_context_set_timeout(pdata->ctx, timeout);
1266 print_value(pdata, ret);
1267 return ret;
1268 }
1269
set_buffers_count(struct parser_pdata * pdata,struct iio_device * dev,long value)1270 int set_buffers_count(struct parser_pdata *pdata,
1271 struct iio_device *dev, long value)
1272 {
1273 int ret = -EINVAL;
1274
1275 if (!dev) {
1276 ret = -ENODEV;
1277 goto err_print_value;
1278 }
1279
1280 if (value >= 1)
1281 ret = iio_device_set_kernel_buffers_count(
1282 dev, (unsigned int) value);
1283 err_print_value:
1284 print_value(pdata, ret);
1285 return ret;
1286 }
1287
read_line(struct parser_pdata * pdata,char * buf,size_t len)1288 ssize_t read_line(struct parser_pdata *pdata, char *buf, size_t len)
1289 {
1290 ssize_t ret;
1291
1292 if (pdata->fd_in_is_socket) {
1293 struct pollfd pfd[2];
1294 bool found;
1295 size_t bytes_read = 0;
1296
1297 pfd[0].fd = pdata->fd_in;
1298 pfd[0].events = POLLIN | POLLRDHUP;
1299 pfd[0].revents = 0;
1300 pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
1301 pfd[1].events = POLLIN;
1302 pfd[1].revents = 0;
1303
1304 do {
1305 size_t i, to_trunc;
1306
1307 poll_nointr(pfd, 2);
1308
1309 if (pfd[1].revents & POLLIN ||
1310 pfd[0].revents & POLLRDHUP)
1311 return 0;
1312
1313 /* First read from the socket, without advancing the
1314 * read offset */
1315 ret = recv(pdata->fd_in, buf, len,
1316 MSG_NOSIGNAL | MSG_PEEK);
1317 if (ret < 0)
1318 return -errno;
1319
1320 /* Lookup for the trailing \n */
1321 for (i = 0; i < (size_t) ret && buf[i] != '\n'; i++);
1322 found = i < (size_t) ret;
1323
1324 len -= ret;
1325 buf += ret;
1326
1327 to_trunc = found ? i + 1 : (size_t) ret;
1328
1329 /* Advance the read offset after the \n if found, or
1330 * after the last character read otherwise */
1331 ret = recv(pdata->fd_in, NULL, to_trunc,
1332 MSG_NOSIGNAL | MSG_TRUNC);
1333 if (ret < 0)
1334 return -errno;
1335
1336 bytes_read += to_trunc;
1337 } while (!found && len);
1338
1339 /* No \n found? Just garbage data */
1340 if (!found)
1341 ret = -EIO;
1342 else
1343 ret = bytes_read;
1344 } else {
1345 ret = pdata->readfd(pdata, buf, len);
1346 }
1347
1348 return ret;
1349 }
1350
interpreter(struct iio_context * ctx,int fd_in,int fd_out,bool verbose,bool is_socket,bool use_aio,struct thread_pool * pool)1351 void interpreter(struct iio_context *ctx, int fd_in, int fd_out, bool verbose,
1352 bool is_socket, bool use_aio, struct thread_pool *pool)
1353 {
1354 yyscan_t scanner;
1355 struct parser_pdata pdata;
1356 unsigned int i;
1357 int ret;
1358
1359 pdata.ctx = ctx;
1360 pdata.stop = false;
1361 pdata.fd_in = fd_in;
1362 pdata.fd_out = fd_out;
1363 pdata.verbose = verbose;
1364 pdata.pool = pool;
1365
1366 pdata.fd_in_is_socket = is_socket;
1367 pdata.fd_out_is_socket = is_socket;
1368
1369 SLIST_INIT(&pdata.thdlist_head);
1370
1371 if (use_aio) {
1372 /* Note: if WITH_AIO is not defined, use_aio is always false.
1373 * We ensure that in iiod.c. */
1374 #if WITH_AIO
1375 char err_str[1024];
1376
1377 pdata.aio_eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
1378 if (pdata.aio_eventfd < 0) {
1379 iio_strerror(errno, err_str, sizeof(err_str));
1380 ERROR("Failed to create AIO eventfd: %s\n", err_str);
1381 return;
1382 }
1383
1384 pdata.aio_ctx = 0;
1385 ret = io_setup(1, &pdata.aio_ctx);
1386 if (ret < 0) {
1387 iio_strerror(-ret, err_str, sizeof(err_str));
1388 ERROR("Failed to create AIO context: %s\n", err_str);
1389 close(pdata.aio_eventfd);
1390 return;
1391 }
1392 pthread_mutex_init(&pdata.aio_mutex, NULL);
1393 pdata.readfd = readfd_aio;
1394 pdata.writefd = writefd_aio;
1395 #endif
1396 } else {
1397 pdata.readfd = readfd_io;
1398 pdata.writefd = writefd_io;
1399 }
1400
1401 yylex_init_extra(&pdata, &scanner);
1402
1403 do {
1404 if (verbose)
1405 output(&pdata, "iio-daemon > ");
1406 ret = yyparse(scanner);
1407 } while (!pdata.stop && ret >= 0);
1408
1409 yylex_destroy(scanner);
1410
1411 /* Close all opened devices */
1412 for (i = 0; i < ctx->nb_devices; i++)
1413 close_dev_helper(&pdata, ctx->devices[i]);
1414
1415 #if WITH_AIO
1416 if (use_aio) {
1417 io_destroy(pdata.aio_ctx);
1418 close(pdata.aio_eventfd);
1419 }
1420 #endif
1421 }
1422