/*
 * libiio - Library for interfacing industrial I/O (IIO) devices
 *
 * Copyright (C) 2014 Analog Devices, Inc.
 * Author: Paul Cercueil <paul.cercueil@analog.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 */

#include "ops.h"
#include "parser.h"
#include "thread-pool.h"
#include "../debug.h"
#include "../iio-private.h"

#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <poll.h>
#include <stdbool.h>
#include <string.h>
#include <sys/eventfd.h>
#include <time.h>
#include <fcntl.h>
#include <signal.h>

int yyparse(yyscan_t scanner);

struct DevEntry;

/* Corresponds to a client thread reading from or writing to a device */
struct ThdEntry {
	SLIST_ENTRY(ThdEntry) parser_list_entry;
	SLIST_ENTRY(ThdEntry) dev_list_entry;
	unsigned int nb, sample_size, samples_count;
	ssize_t err;

	int eventfd;

	struct parser_pdata *pdata;
	struct iio_device *dev;
	struct DevEntry *entry;

	uint32_t *mask;
	bool active, is_writer, new_client, wait_for_open;
};

static void thd_entry_event_signal(struct ThdEntry *thd)
{
	uint64_t e = 1;
	int ret;

	do {
		ret = write(thd->eventfd, &e, sizeof(e));
	} while (ret == -1 && errno == EINTR);
}
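
/*
 * Note on eventfd semantics (illustrative sketch, not part of the build):
 * an eventfd object holds a 64-bit counter. Each write() adds to it, and a
 * read() returns the accumulated count and resets it to zero, so several
 * signals may be collapsed into a single wakeup:
 *
 *     int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
 *     uint64_t e = 1;
 *     write(fd, &e, sizeof(e));   // signal once
 *     write(fd, &e, sizeof(e));   // signal again
 *     read(fd, &e, sizeof(e));    // e == 2, counter reset to 0
 */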

static int thd_entry_event_wait(struct ThdEntry *thd, pthread_mutex_t *mutex,
	int fd_in)
{
	struct pollfd pfd[3];
	uint64_t e;
	int ret;

	pthread_mutex_unlock(mutex);

	pfd[0].fd = thd->eventfd;
	pfd[0].events = POLLIN;
	pfd[1].fd = fd_in;
	pfd[1].events = POLLRDHUP;
	pfd[2].fd = thread_pool_get_poll_fd(thd->pdata->pool);
	pfd[2].events = POLLIN;

	do {
		poll_nointr(pfd, 3);

		if ((pfd[1].revents & POLLRDHUP) || (pfd[2].revents & POLLIN)) {
			pthread_mutex_lock(mutex);
			return -EPIPE;
		}

		do {
			ret = read(thd->eventfd, &e, sizeof(e));
		} while (ret == -1 && errno == EINTR);
	} while (ret == -1 && errno == EAGAIN);

	pthread_mutex_lock(mutex);

	return 0;
}
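
/*
 * poll_nointr() is declared in ops.h; presumably it simply retries poll(2)
 * when interrupted by a signal. A minimal sketch of the assumed behaviour:
 *
 *     static void poll_nointr(struct pollfd *pfd, unsigned int num)
 *     {
 *         int ret;
 *
 *         do {
 *             ret = poll(pfd, num, -1);
 *         } while (ret == -1 && errno == EINTR);
 *     }
 */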

/* Corresponds to an opened device */
struct DevEntry {
	unsigned int ref_count;

	struct iio_device *dev;
	struct iio_buffer *buf;
	unsigned int sample_size, nb_clients;
	bool update_mask;
	bool cyclic;
	bool closed;
	bool cancelled;

	/* Linked list of ThdEntry structures corresponding
	 * to all the threads that opened the device */
	SLIST_HEAD(ThdHead, ThdEntry) thdlist_head;
	pthread_mutex_t thdlist_lock;

	pthread_cond_t rw_ready_cond;

	uint32_t *mask;
	size_t nb_words;
};

struct sample_cb_info {
	struct parser_pdata *pdata;
	unsigned int nb_bytes, cpt;
	uint32_t *mask;
};

/* Protects iio_device_{set,get}_data() against concurrent access from
 * multiple clients */
static pthread_mutex_t devlist_lock = PTHREAD_MUTEX_INITIALIZER;

#if WITH_AIO
static ssize_t async_io(struct parser_pdata *pdata, void *buf, size_t len,
	bool do_read)
{
	ssize_t ret;
	struct pollfd pfd[2];
	unsigned int num_pfds;
	struct iocb iocb;
	struct iocb *ios[1];
	struct io_event e[1];

	ios[0] = &iocb;

	if (do_read)
		io_prep_pread(&iocb, pdata->fd_in, buf, len, 0);
	else
		io_prep_pwrite(&iocb, pdata->fd_out, buf, len, 0);

	io_set_eventfd(&iocb, pdata->aio_eventfd);

	pthread_mutex_lock(&pdata->aio_mutex);

	ret = io_submit(pdata->aio_ctx, 1, ios);
	if (ret != 1) {
		pthread_mutex_unlock(&pdata->aio_mutex);
		ERROR("Failed to submit IO operation: %zd\n", ret);
		return -EIO;
	}

	pfd[0].fd = pdata->aio_eventfd;
	pfd[0].events = POLLIN;
	pfd[0].revents = 0;
	pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
	pfd[1].events = POLLIN;
	pfd[1].revents = 0;
	num_pfds = 2;

	do {
		poll_nointr(pfd, num_pfds);

		if (pfd[0].revents & POLLIN) {
			uint64_t event;
			ret = read(pdata->aio_eventfd, &event, sizeof(event));
			if (ret != sizeof(event)) {
				ERROR("Failed to read from eventfd: %d\n", -errno);
				ret = -EIO;
				break;
			}

			ret = io_getevents(pdata->aio_ctx, 0, 1, e, NULL);
			if (ret != 1) {
				ERROR("Failed to read IO events: %zd\n", ret);
				ret = -EIO;
				break;
			} else {
				ret = (long)e[0].res;
			}
		} else if (num_pfds > 1 && (pfd[1].revents & POLLIN)) {
			/* Got a STOP event to abort this whole session */
			ret = io_cancel(pdata->aio_ctx, &iocb, e);
			if (ret != -EINPROGRESS && ret != -EINVAL) {
				ERROR("Failed to cancel IO transfer: %zd\n", ret);
				ret = -EIO;
				break;
			}
			/* It should not be long now until we get the cancellation event */
			num_pfds = 1;
		}
	} while (!(pfd[0].revents & POLLIN));

	pthread_mutex_unlock(&pdata->aio_mutex);

	/* Got STOP event, treat it as EOF */
	if (num_pfds == 1)
		return 0;

	return ret;
}

#define MAX_AIO_REQ_SIZE (1024 * 1024)

static ssize_t readfd_aio(struct parser_pdata *pdata, void *dest, size_t len)
{
	if (len > MAX_AIO_REQ_SIZE)
		len = MAX_AIO_REQ_SIZE;
	return async_io(pdata, dest, len, true);
}

static ssize_t writefd_aio(struct parser_pdata *pdata, const void *src,
		size_t len)
{
	if (len > MAX_AIO_REQ_SIZE)
		len = MAX_AIO_REQ_SIZE;
	return async_io(pdata, (void *)src, len, false);
}
#endif /* WITH_AIO */

static ssize_t readfd_io(struct parser_pdata *pdata, void *dest, size_t len)
{
	ssize_t ret;
	struct pollfd pfd[2];

	pfd[0].fd = pdata->fd_in;
	pfd[0].events = POLLIN | POLLRDHUP;
	pfd[0].revents = 0;
	pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
	pfd[1].events = POLLIN;
	pfd[1].revents = 0;

	do {
		poll_nointr(pfd, 2);

		/* Got STOP event, or client closed the socket: treat it as EOF */
		if (pfd[1].revents & POLLIN || pfd[0].revents & POLLRDHUP)
			return 0;
		if (pfd[0].revents & POLLERR)
			return -EIO;
		if (!(pfd[0].revents & POLLIN))
			continue;

		do {
			if (pdata->fd_in_is_socket)
				ret = recv(pdata->fd_in, dest, len, MSG_NOSIGNAL);
			else
				ret = read(pdata->fd_in, dest, len);
		} while (ret == -1 && errno == EINTR);

		if (ret != -1 || errno != EAGAIN)
			break;
	} while (true);

	if (ret == -1)
		return -errno;

	return ret;
}

static ssize_t writefd_io(struct parser_pdata *pdata, const void *src, size_t len)
{
	ssize_t ret;
	struct pollfd pfd[2];

	pfd[0].fd = pdata->fd_out;
	pfd[0].events = POLLOUT;
	pfd[0].revents = 0;
	pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
	pfd[1].events = POLLIN;
	pfd[1].revents = 0;

	do {
		poll_nointr(pfd, 2);

		/* Got STOP event, or client closed the socket: treat it as EOF */
		if (pfd[1].revents & POLLIN || pfd[0].revents & POLLHUP)
			return 0;
		if (pfd[0].revents & POLLERR)
			return -EIO;
		if (!(pfd[0].revents & POLLOUT))
			continue;

		do {
			if (pdata->fd_out_is_socket)
				ret = send(pdata->fd_out, src, len, MSG_NOSIGNAL);
			else
				ret = write(pdata->fd_out, src, len);
		} while (ret == -1 && errno == EINTR);

		if (ret != -1 || errno != EAGAIN)
			break;
	} while (true);

	if (ret == -1)
		return -errno;

	return ret;
}

ssize_t write_all(struct parser_pdata *pdata, const void *src, size_t len)
{
	uintptr_t ptr = (uintptr_t) src;

	while (len) {
		ssize_t ret = pdata->writefd(pdata, (void *) ptr, len);
		if (ret < 0)
			return ret;
		if (!ret)
			return -EPIPE;
		ptr += ret;
		len -= ret;
	}

	return ptr - (uintptr_t) src;
}

static ssize_t read_all(struct parser_pdata *pdata,
		void *dst, size_t len)
{
	uintptr_t ptr = (uintptr_t) dst;

	while (len) {
		ssize_t ret = pdata->readfd(pdata, (void *) ptr, len);
		if (ret < 0)
			return ret;
		if (!ret)
			return -EPIPE;
		ptr += ret;
		len -= ret;
	}

	return ptr - (uintptr_t) dst;
}
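
/*
 * Usage sketch (illustrative only): write_all() and read_all() loop over
 * the short transfers that writefd()/readfd() may perform, so callers can
 * treat a fixed-size transfer as all-or-nothing:
 *
 *     char header[8];
 *     ssize_t ret = read_all(pdata, header, sizeof(header));
 *     if (ret < 0)
 *         return ret;   // I/O error, or -EPIPE if the peer went away
 *     // here exactly sizeof(header) bytes have been read
 */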

static void print_value(struct parser_pdata *pdata, long value)
{
	if (pdata->verbose && value < 0) {
		char buf[1024];
		iio_strerror(-value, buf, sizeof(buf));
		output(pdata, "ERROR: ");
		output(pdata, buf);
		output(pdata, "\n");
	} else {
		char buf[128];
		snprintf(buf, sizeof(buf), "%li\n", value);
		output(pdata, buf);
	}
}
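
/*
 * Wire format example: for a command that fails with -ENODEV, a verbose
 * (interactive) session reads "ERROR: No such device\n", while a regular
 * session reads the raw value "-19\n" (on Linux, ENODEV == 19). Positive
 * results are sent the same way: print_value(pdata, 4096) emits "4096\n".
 */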

static ssize_t send_sample(const struct iio_channel *chn,
		void *src, size_t length, void *d)
{
	struct sample_cb_info *info = d;
	if (chn->index < 0 || !TEST_BIT(info->mask, chn->number))
		return 0;
	if (info->nb_bytes < length)
		return 0;

	if (info->cpt % length) {
		unsigned int i, goal = length - info->cpt % length;
		char zero = 0;
		ssize_t ret;

		for (i = 0; i < goal; i++) {
			ret = info->pdata->writefd(info->pdata, &zero, 1);
			if (ret < 0)
				return ret;
		}
		info->cpt += goal;
	}

	info->cpt += length;
	info->nb_bytes -= length;
	return write_all(info->pdata, src, length);
}

static ssize_t receive_sample(const struct iio_channel *chn,
		void *dst, size_t length, void *d)
{
	struct sample_cb_info *info = d;
	if (chn->index < 0 || !TEST_BIT(info->mask, chn->number))
		return 0;
	if (info->cpt == info->nb_bytes)
		return 0;

	/* Skip the padding if needed */
	if (info->cpt % length) {
		unsigned int i, goal = length - info->cpt % length;
		char foo;
		ssize_t ret;

		for (i = 0; i < goal; i++) {
			ret = info->pdata->readfd(info->pdata, &foo, 1);
			if (ret < 0)
				return ret;
		}
		info->cpt += goal;
	}

	info->cpt += length;
	return read_all(info->pdata, dst, length);
}
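
/*
 * Padding example: within a (de)muxed sample set, each sample is aligned
 * to its own size. If one byte has been transferred so far (cpt == 1) and
 * the next enabled channel has 4-byte samples, the three padding bytes the
 * peer inserted (or expects) are emitted/consumed one at a time by the
 * loops above before the sample itself is transferred.
 */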

static ssize_t send_data(struct DevEntry *dev, struct ThdEntry *thd, size_t len)
{
	struct parser_pdata *pdata = thd->pdata;
	bool demux = server_demux && dev->sample_size != thd->sample_size;

	if (demux)
		len = (len / dev->sample_size) * thd->sample_size;
	if (len > thd->nb)
		len = thd->nb;

	print_value(pdata, len);

	if (thd->new_client) {
		unsigned int i;
		char buf[129], *ptr = buf;
		uint32_t *mask = demux ? thd->mask : dev->mask;
		ssize_t ret;

		/* Send the current mask. The buffer holds up to 16 words
		 * (128 hex characters) plus the trailing newline; the bound
		 * below keeps each 8-character sprintf() in range. */
		for (i = dev->nb_words; i > 0 && ptr < buf + sizeof(buf) - 8;
				i--, ptr += 8)
			sprintf(ptr, "%08x", mask[i - 1]);

		*ptr = '\n';
		ret = write_all(pdata, buf, ptr + 1 - buf);
		if (ret < 0)
			return ret;

		thd->new_client = false;
	}

	if (!demux) {
		/* Short path */
		return write_all(pdata, dev->buf->buffer, len);
	} else {
		struct sample_cb_info info = {
			.pdata = pdata,
			.cpt = 0,
			.nb_bytes = len,
			.mask = thd->mask,
		};

		return iio_buffer_foreach_sample(dev->buf, send_sample, &info);
	}
}
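
/*
 * Example of the mask line sent to a new client: for a device with one
 * mask word and channels 0 and 1 enabled (mask[0] == 0x00000003), the
 * client receives "00000003\n" followed by the sample data. Words are
 * printed most-significant first, eight hex digits per word.
 */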

static ssize_t receive_data(struct DevEntry *dev, struct ThdEntry *thd)
{
	struct parser_pdata *pdata = thd->pdata;

	/* Inform the client that no error occurred, and that we'll start
	 * reading data */
	if (thd->new_client) {
		print_value(thd->pdata, 0);
		thd->new_client = false;
	}

	if (dev->sample_size == thd->sample_size) {
		/* Short path: Receive directly in the buffer */

		size_t len = dev->buf->length;
		if (thd->nb < len)
			len = thd->nb;

		return read_all(pdata, dev->buf->buffer, len);
	} else {
		/* Long path: Mux the samples to the buffer */

		struct sample_cb_info info = {
			.pdata = pdata,
			.cpt = 0,
			.nb_bytes = thd->nb,
			.mask = thd->mask,
		};

		return iio_buffer_foreach_sample(dev->buf,
				receive_sample, &info);
	}
}

static void dev_entry_put(struct DevEntry *entry)
{
	bool free_entry = false;

	pthread_mutex_lock(&entry->thdlist_lock);
	entry->ref_count--;
	if (entry->ref_count == 0)
		free_entry = true;
	pthread_mutex_unlock(&entry->thdlist_lock);

	if (free_entry) {
		pthread_mutex_destroy(&entry->thdlist_lock);
		pthread_cond_destroy(&entry->rw_ready_cond);

		free(entry->mask);
		free(entry);
	}
}
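
/*
 * Reference counting: a DevEntry starts with ref_count == 2 in
 * open_dev_helper() (one reference for the rw thread, one for the client),
 * and clients attaching to an existing entry increment it under
 * thdlist_lock. Every reference must be dropped through dev_entry_put();
 * the last put frees the entry.
 */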

static void signal_thread(struct ThdEntry *thd, ssize_t ret)
{
	thd->err = ret;
	thd->nb = 0;
	thd->active = false;
	thd_entry_event_signal(thd);
}

static void rw_thd(struct thread_pool *pool, void *d)
{
	struct DevEntry *entry = d;
	struct ThdEntry *thd, *next_thd;
	struct iio_device *dev = entry->dev;
	unsigned int nb_words = entry->nb_words;
	ssize_t ret = 0;

	DEBUG("R/W thread started for device %s\n",
			dev->name ? dev->name : dev->id);

	while (true) {
		bool has_readers = false, has_writers = false,
		     mask_updated = false;
		unsigned int sample_size;

		/* NOTE: this while loop must exit with thdlist_lock locked. */
		pthread_mutex_lock(&entry->thdlist_lock);

		if (SLIST_EMPTY(&entry->thdlist_head))
			break;

		if (entry->update_mask) {
			unsigned int i;
			unsigned int samples_count = 0;

			memset(entry->mask, 0, nb_words * sizeof(*entry->mask));
			SLIST_FOREACH(thd, &entry->thdlist_head, dev_list_entry) {
				for (i = 0; i < nb_words; i++)
					entry->mask[i] |= thd->mask[i];

				if (thd->samples_count > samples_count)
					samples_count = thd->samples_count;
			}

			if (entry->buf)
				iio_buffer_destroy(entry->buf);

			for (i = 0; i < dev->nb_channels; i++) {
				struct iio_channel *chn = dev->channels[i];
				long index = chn->index;

				if (index < 0)
					continue;

				if (TEST_BIT(entry->mask, chn->number))
					iio_channel_enable(chn);
				else
					iio_channel_disable(chn);
			}

			entry->buf = iio_device_create_buffer(dev,
					samples_count, entry->cyclic);
			if (!entry->buf) {
				ret = -errno;
				ERROR("Unable to create buffer\n");
				break;
			}
			entry->cancelled = false;

			/* Signal the threads that we opened the device */
			SLIST_FOREACH(thd, &entry->thdlist_head, dev_list_entry) {
				if (thd->wait_for_open) {
					thd->wait_for_open = false;
					signal_thread(thd, 0);
				}
			}

			DEBUG("IIO device %s reopened with new mask:\n",
					dev->id);
			for (i = 0; i < nb_words; i++)
				DEBUG("Mask[%u] = 0x%08x\n", i, entry->mask[i]);
			entry->update_mask = false;

			entry->sample_size = iio_device_get_sample_size(dev);
			mask_updated = true;
		}

		sample_size = entry->sample_size;

		SLIST_FOREACH(thd, &entry->thdlist_head, dev_list_entry) {
			thd->active = !thd->err && thd->nb >= sample_size;
			if (mask_updated && thd->active)
				signal_thread(thd, thd->nb);

			if (thd->is_writer)
				has_writers |= thd->active;
			else
				has_readers |= thd->active;
		}

		if (!has_readers && !has_writers) {
			pthread_cond_wait(&entry->rw_ready_cond,
					&entry->thdlist_lock);
		}

		pthread_mutex_unlock(&entry->thdlist_lock);

		if (!has_readers && !has_writers)
			continue;

		if (has_readers) {
			ssize_t nb_bytes;

			ret = iio_buffer_refill(entry->buf);

			pthread_mutex_lock(&entry->thdlist_lock);

			/*
			 * When the last client disconnects, the buffer is
			 * cancelled and iio_buffer_refill() returns an error.
			 * A new client might have connected before we got
			 * here though; in that case the rw thread has to stay
			 * active and a new buffer is created. If the list is
			 * still empty, the loop will exit normally.
			 */
			if (entry->cancelled) {
				pthread_mutex_unlock(&entry->thdlist_lock);
				continue;
			}

			if (ret < 0) {
				ERROR("Reading from device failed: %i\n",
						(int) ret);
				break;
			}

			nb_bytes = ret;

			/* We don't use SLIST_FOREACH here. As soon as a thread is
			 * signaled, its "thd" structure might be freed;
			 * SLIST_FOREACH would then cause a segmentation fault, as it
			 * reads "thd" to get the address of the next element. */
			for (thd = SLIST_FIRST(&entry->thdlist_head);
					thd; thd = next_thd) {
				next_thd = SLIST_NEXT(thd, dev_list_entry);

				if (!thd->active || thd->is_writer)
					continue;

				ret = send_data(entry, thd, nb_bytes);
				if (ret > 0)
					thd->nb -= ret;

				if (ret < 0 || thd->nb < sample_size)
					signal_thread(thd, (ret < 0) ?
							ret : thd->nb);
			}

			pthread_mutex_unlock(&entry->thdlist_lock);
		}

		if (has_writers) {
			ssize_t nb_bytes = 0;

			pthread_mutex_lock(&entry->thdlist_lock);

			/* Reset the size of the buffer to its maximum size */
			entry->buf->data_length = entry->buf->length;

			/* Same comment as above */
			for (thd = SLIST_FIRST(&entry->thdlist_head);
					thd; thd = next_thd) {
				next_thd = SLIST_NEXT(thd, dev_list_entry);

				if (!thd->active || !thd->is_writer)
					continue;

				ret = receive_data(entry, thd);
				if (ret > 0) {
					thd->nb -= ret;
					if (ret > nb_bytes)
						nb_bytes = ret;
				}

				if (ret < 0)
					signal_thread(thd, ret);
			}

			ret = iio_buffer_push_partial(entry->buf,
				nb_bytes / sample_size);
			if (entry->cancelled) {
				pthread_mutex_unlock(&entry->thdlist_lock);
				continue;
			}
			if (ret < 0) {
				ERROR("Writing to device failed: %i\n",
						(int) ret);
				break;
			}

			/* Signal threads which completed their RW command */
			for (thd = SLIST_FIRST(&entry->thdlist_head);
					thd; thd = next_thd) {
				next_thd = SLIST_NEXT(thd, dev_list_entry);
				if (thd->active && thd->is_writer &&
						thd->nb < sample_size)
					signal_thread(thd, thd->nb);
			}

			pthread_mutex_unlock(&entry->thdlist_lock);
		}
	}

	/* Signal all remaining threads */
	for (thd = SLIST_FIRST(&entry->thdlist_head); thd; thd = next_thd) {
		next_thd = SLIST_NEXT(thd, dev_list_entry);
		SLIST_REMOVE(&entry->thdlist_head, thd, ThdEntry, dev_list_entry);
		thd->wait_for_open = false;
		signal_thread(thd, ret);
	}
	if (entry->buf) {
		iio_buffer_destroy(entry->buf);
		entry->buf = NULL;
	}
	entry->closed = true;
	pthread_mutex_unlock(&entry->thdlist_lock);

	pthread_mutex_lock(&devlist_lock);
	/* It is possible that a new thread has already started; make sure
	 * not to overwrite it. */
	if (iio_device_get_data(dev) == entry)
		iio_device_set_data(dev, NULL);
	pthread_mutex_unlock(&devlist_lock);

	DEBUG("Stopping R/W thread for device %s\n",
			dev->name ? dev->name : dev->id);

	dev_entry_put(entry);
}

static struct ThdEntry *parser_lookup_thd_entry(struct parser_pdata *pdata,
	struct iio_device *dev)
{
	struct ThdEntry *t;

	SLIST_FOREACH(t, &pdata->thdlist_head, parser_list_entry) {
		if (t->dev == dev)
			return t;
	}

	return NULL;
}

static ssize_t rw_buffer(struct parser_pdata *pdata,
		struct iio_device *dev, unsigned int nb, bool is_write)
{
	struct DevEntry *entry;
	struct ThdEntry *thd;
	ssize_t ret = 0;

	if (!dev)
		return -ENODEV;

	thd = parser_lookup_thd_entry(pdata, dev);
	if (!thd)
		return -EBADF;

	entry = thd->entry;

	if (nb < entry->sample_size)
		return 0;

	pthread_mutex_lock(&entry->thdlist_lock);
	if (entry->closed) {
		pthread_mutex_unlock(&entry->thdlist_lock);
		return -EBADF;
	}

	if (thd->nb) {
		pthread_mutex_unlock(&entry->thdlist_lock);
		return -EBUSY;
	}

	thd->new_client = true;
	thd->nb = nb;
	thd->err = 0;
	thd->is_writer = is_write;
	thd->active = true;

	pthread_cond_signal(&entry->rw_ready_cond);

	DEBUG("Waiting for completion...\n");
	while (thd->active) {
		ret = thd_entry_event_wait(thd, &entry->thdlist_lock, pdata->fd_in);
		if (ret)
			break;
	}
	if (ret == 0)
		ret = thd->err;
	pthread_mutex_unlock(&entry->thdlist_lock);

	if (ret > 0 && ret < nb)
		print_value(thd->pdata, 0);

	DEBUG("Exiting rw_buffer with code %li\n", (long) ret);
	if (ret < 0)
		return ret;
	else
		return nb - ret;
}
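
/*
 * Protocol sketch (a hypothetical session, assuming the usual iiod OPEN /
 * READBUF commands handled by the parser): after "OPEN iio:device0 256
 * 00000003", a client issuing "READBUF iio:device0 1024" first reads the
 * byte count emitted by print_value(), then (once per open) the current
 * channel mask line, then the sample data itself. rw_buffer() returns the
 * number of bytes actually transferred (nb minus the remainder).
 */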

static uint32_t *get_mask(const char *mask, size_t *len)
{
	size_t nb = (*len + 7) / 8;
	uint32_t *ptr, *words = calloc(nb, sizeof(*words));
	if (!words)
		return NULL;

	ptr = words + nb;
	while (*mask) {
		char buf[9];
		sprintf(buf, "%.*s", 8, mask);
		sscanf(buf, "%08x", --ptr);
		mask += 8;
		DEBUG("Mask[%lu]: 0x%08x\n",
				(unsigned long) (ptr - words), *ptr);
	}

	*len = nb;
	return words;
}
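
/*
 * Worked example: get_mask("80000001", &len) with len == 8 on input
 * allocates one word, parses words[0] = 0x80000001 and sets len to 1 (the
 * number of 32-bit words). A 16-character string fills two words, with the
 * leftmost eight hex digits landing in the most-significant word and the
 * least-significant word written last.
 */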

static void free_thd_entry(struct ThdEntry *t)
{
	close(t->eventfd);
	free(t->mask);
	free(t);
}

static void remove_thd_entry(struct ThdEntry *t)
{
	struct DevEntry *entry = t->entry;

	pthread_mutex_lock(&entry->thdlist_lock);
	if (!entry->closed) {
		entry->update_mask = true;
		SLIST_REMOVE(&entry->thdlist_head, t, ThdEntry, dev_list_entry);
		if (SLIST_EMPTY(&entry->thdlist_head) && entry->buf) {
			entry->cancelled = true;
			iio_buffer_cancel(entry->buf); /* Wake up the rw thread */
		}

		pthread_cond_signal(&entry->rw_ready_cond);
	}
	pthread_mutex_unlock(&entry->thdlist_lock);
	dev_entry_put(entry);

	free_thd_entry(t);
}

static int open_dev_helper(struct parser_pdata *pdata, struct iio_device *dev,
		size_t samples_count, const char *mask, bool cyclic)
{
	int ret = -ENOMEM;
	struct DevEntry *entry;
	struct ThdEntry *thd;
	size_t len = strlen(mask);
	uint32_t *words;
	unsigned int nb_channels;
	unsigned int cyclic_retry = 500;

	if (!dev)
		return -ENODEV;

	nb_channels = dev->nb_channels;
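	/* One 32-bit mask word covers 32 channels and is transmitted as
	 * eight hex characters, hence the expected length below. */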
	if (len != ((nb_channels + 31) / 32) * 8)
		return -EINVAL;

	words = get_mask(mask, &len);
	if (!words)
		return -ENOMEM;

	thd = zalloc(sizeof(*thd));
	if (!thd)
		goto err_free_words;

	thd->wait_for_open = true;
	thd->mask = words;
	thd->nb = 0;
	thd->samples_count = samples_count;
	thd->sample_size = iio_device_get_sample_size_mask(dev, words, len);
	thd->pdata = pdata;
	thd->dev = dev;
	thd->eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);

retry:
	/* Atomically look up the thread and make sure that it is still
	 * active, or allocate a new one. */
	pthread_mutex_lock(&devlist_lock);
	entry = iio_device_get_data(dev);
	if (entry) {
		if (cyclic || entry->cyclic) {
			/* Only one client allowed in cyclic mode */
			pthread_mutex_unlock(&devlist_lock);

			/* There is an inherent race condition if a client
			 * creates a new cyclic buffer shortly after destroying
			 * a previous one, e.g.:
			 *
			 *     iio_buffer_destroy(buf);
			 *     buf = iio_device_create_buffer(dev, n, true);
			 *
			 * In this case the two buffers each use their own
			 * communication channel, which are unordered relative
			 * to each other; e.g. the socket open might arrive
			 * before the socket close on the host side, even
			 * though they were sent in the opposite order on the
			 * client side. This race condition can cause an error
			 * to be reported back to the client, even though the
			 * code on the client side was well formed and would
			 * work fine e.g. using the local backend.
			 *
			 * To avoid this issue, go to sleep for up to 50ms in
			 * intervals of 100us. This should be enough time for
			 * the issue to resolve itself. If there actually is
			 * contention on the buffer, an error will eventually
			 * be returned, in which case the additional delay
			 * caused by the retries should not matter too much.
			 *
			 * This is not pretty but it works.
			 */
			if (cyclic_retry) {
				cyclic_retry--;
				usleep(100);
				goto retry;
			}

			ret = -EBUSY;
			goto err_free_thd;
		}

		pthread_mutex_lock(&entry->thdlist_lock);
		if (!entry->closed) {
			pthread_mutex_unlock(&devlist_lock);

			entry->ref_count++;

			SLIST_INSERT_HEAD(&entry->thdlist_head, thd, dev_list_entry);
			thd->entry = entry;
			entry->update_mask = true;
			DEBUG("Added thread to client list\n");

			pthread_cond_signal(&entry->rw_ready_cond);

			/* Wait until the device is opened by the rw thread */
			while (thd->wait_for_open) {
				ret = thd_entry_event_wait(thd, &entry->thdlist_lock, pdata->fd_in);
				if (ret)
					break;
			}
			pthread_mutex_unlock(&entry->thdlist_lock);

			if (ret == 0)
				ret = (int) thd->err;
			if (ret < 0)
				remove_thd_entry(thd);
			else
				SLIST_INSERT_HEAD(&pdata->thdlist_head, thd, parser_list_entry);
			return ret;
		} else {
			pthread_mutex_unlock(&entry->thdlist_lock);
		}
	}

	entry = zalloc(sizeof(*entry));
	if (!entry) {
		pthread_mutex_unlock(&devlist_lock);
		goto err_free_thd;
	}

	entry->ref_count = 2; /* One for the rw thread, one for the client */

	entry->mask = malloc(len * sizeof(*words));
	if (!entry->mask) {
		pthread_mutex_unlock(&devlist_lock);
		goto err_free_entry;
	}

	entry->cyclic = cyclic;
	entry->nb_words = len;
	entry->update_mask = true;
	entry->dev = dev;
	entry->buf = NULL;
	SLIST_INIT(&entry->thdlist_head);
	SLIST_INSERT_HEAD(&entry->thdlist_head, thd, dev_list_entry);
	thd->entry = entry;
	DEBUG("Added thread to client list\n");

	pthread_mutex_init(&entry->thdlist_lock, NULL);
	pthread_cond_init(&entry->rw_ready_cond, NULL);

	ret = thread_pool_add_thread(main_thread_pool, rw_thd, entry, "rw_thd");
	if (ret) {
		pthread_mutex_unlock(&devlist_lock);
		goto err_free_entry_mask;
	}

	DEBUG("Adding new device thread to device list\n");
	iio_device_set_data(dev, entry);
	pthread_mutex_unlock(&devlist_lock);

	pthread_mutex_lock(&entry->thdlist_lock);
	/* Wait until the device is opened by the rw thread */
	while (thd->wait_for_open) {
		ret = thd_entry_event_wait(thd, &entry->thdlist_lock, pdata->fd_in);
		if (ret)
			break;
	}
	pthread_mutex_unlock(&entry->thdlist_lock);

	if (ret == 0)
		ret = (int) thd->err;
	if (ret < 0)
		remove_thd_entry(thd);
	else
		SLIST_INSERT_HEAD(&pdata->thdlist_head, thd, parser_list_entry);
	return ret;

err_free_entry_mask:
	free(entry->mask);
err_free_entry:
	free(entry);
err_free_thd:
	close(thd->eventfd);
	free(thd);
err_free_words:
	free(words);
	return ret;
}

static int close_dev_helper(struct parser_pdata *pdata, struct iio_device *dev)
{
	struct ThdEntry *t;

	if (!dev)
		return -ENODEV;

	t = parser_lookup_thd_entry(pdata, dev);
	if (!t)
		return -ENXIO;

	SLIST_REMOVE(&pdata->thdlist_head, t, ThdEntry, parser_list_entry);
	remove_thd_entry(t);

	return 0;
}

int open_dev(struct parser_pdata *pdata, struct iio_device *dev,
		size_t samples_count, const char *mask, bool cyclic)
{
	int ret = open_dev_helper(pdata, dev, samples_count, mask, cyclic);
	print_value(pdata, ret);
	return ret;
}

int close_dev(struct parser_pdata *pdata, struct iio_device *dev)
{
	int ret = close_dev_helper(pdata, dev);
	print_value(pdata, ret);
	return ret;
}

ssize_t rw_dev(struct parser_pdata *pdata, struct iio_device *dev,
		unsigned int nb, bool is_write)
{
	ssize_t ret = rw_buffer(pdata, dev, nb, is_write);
	if (ret <= 0 || is_write)
		print_value(pdata, ret);
	return ret;
}

ssize_t read_dev_attr(struct parser_pdata *pdata, struct iio_device *dev,
		const char *attr, enum iio_attr_type type)
{
	/* We use a very large buffer here, as if attr is NULL all the
	 * attributes will be read, which may represent a few kilobytes'
	 * worth of data. */
	char buf[0x10000];
	ssize_t ret = -EINVAL;

	if (!dev) {
		print_value(pdata, -ENODEV);
		return -ENODEV;
	}

	switch (type) {
		case IIO_ATTR_TYPE_DEVICE:
			ret = iio_device_attr_read(dev, attr, buf, sizeof(buf) - 1);
			break;
		case IIO_ATTR_TYPE_DEBUG:
			ret = iio_device_debug_attr_read(dev,
				attr, buf, sizeof(buf) - 1);
			break;
		case IIO_ATTR_TYPE_BUFFER:
			ret = iio_device_buffer_attr_read(dev,
				attr, buf, sizeof(buf) - 1);
			break;
		default:
			ret = -EINVAL;
			break;
	}
	print_value(pdata, ret);
	if (ret < 0)
		return ret;

	buf[ret] = '\n';
	return write_all(pdata, buf, ret + 1);
}
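
/*
 * Reply format example: if the attribute read fills buf with N bytes, the
 * daemon emits "N\n" (via print_value()) followed by the N bytes and a
 * trailing '\n'. On error, only the negative errno value is printed.
 */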

ssize_t write_dev_attr(struct parser_pdata *pdata, struct iio_device *dev,
		const char *attr, size_t len, enum iio_attr_type type)
{
	ssize_t ret = -ENOMEM;
	char *buf;

	if (!dev) {
		ret = -ENODEV;
		goto err_print_value;
	}

	buf = malloc(len);
	if (!buf)
		goto err_print_value;

	ret = read_all(pdata, buf, len);
	if (ret < 0)
		goto err_free_buffer;

	switch (type) {
		case IIO_ATTR_TYPE_DEVICE:
			ret = iio_device_attr_write_raw(dev, attr, buf, len);
			break;
		case IIO_ATTR_TYPE_DEBUG:
			ret = iio_device_debug_attr_write_raw(dev, attr, buf, len);
			break;
		case IIO_ATTR_TYPE_BUFFER:
			ret = iio_device_buffer_attr_write_raw(dev, attr, buf, len);
			break;
		default:
			ret = -EINVAL;
			break;
	}

err_free_buffer:
	free(buf);
err_print_value:
	print_value(pdata, ret);
	return ret;
}

ssize_t read_chn_attr(struct parser_pdata *pdata,
		struct iio_channel *chn, const char *attr)
{
	char buf[1024];
	ssize_t ret = -ENODEV;

	if (chn)
		ret = iio_channel_attr_read(chn, attr, buf, sizeof(buf) - 1);
	else if (pdata->dev)
		ret = -ENXIO;
	print_value(pdata, ret);
	if (ret < 0)
		return ret;

	buf[ret] = '\n';
	return write_all(pdata, buf, ret + 1);
}

ssize_t write_chn_attr(struct parser_pdata *pdata,
		struct iio_channel *chn, const char *attr, size_t len)
{
	ssize_t ret = -ENOMEM;
	char *buf = malloc(len);
	if (!buf)
		goto err_print_value;

	ret = read_all(pdata, buf, len);
	if (ret < 0)
		goto err_free_buffer;

	if (chn)
		ret = iio_channel_attr_write_raw(chn, attr, buf, len);
	else if (pdata->dev)
		ret = -ENXIO;
	else
		ret = -ENODEV;
err_free_buffer:
	free(buf);
err_print_value:
	print_value(pdata, ret);
	return ret;
}

ssize_t set_trigger(struct parser_pdata *pdata,
		struct iio_device *dev, const char *trigger)
{
	struct iio_device *trig = NULL;
	ssize_t ret = -ENOENT;

	if (!dev) {
		ret = -ENODEV;
		goto err_print_value;
	}

	if (trigger) {
		trig = iio_context_find_device(pdata->ctx, trigger);
		if (!trig)
			goto err_print_value;
	}

	ret = iio_device_set_trigger(dev, trig);
err_print_value:
	print_value(pdata, ret);
	return ret;
}

ssize_t get_trigger(struct parser_pdata *pdata, struct iio_device *dev)
{
	const struct iio_device *trigger;
	ssize_t ret;

	if (!dev) {
		print_value(pdata, -ENODEV);
		return -ENODEV;
	}

	ret = iio_device_get_trigger(dev, &trigger);
	if (!ret && trigger) {
		char buf[256];

		ret = strlen(trigger->name);
		print_value(pdata, ret);

		snprintf(buf, sizeof(buf), "%s\n", trigger->name);
		ret = write_all(pdata, buf, ret + 1);
	} else {
		print_value(pdata, ret);
	}
	return ret;
}

int set_timeout(struct parser_pdata *pdata, unsigned int timeout)
{
	int ret = iio_context_set_timeout(pdata->ctx, timeout);
	print_value(pdata, ret);
	return ret;
}

int set_buffers_count(struct parser_pdata *pdata,
		struct iio_device *dev, long value)
{
	int ret = -EINVAL;

	if (!dev) {
		ret = -ENODEV;
		goto err_print_value;
	}

	if (value >= 1)
		ret = iio_device_set_kernel_buffers_count(
				dev, (unsigned int) value);
err_print_value:
	print_value(pdata, ret);
	return ret;
}

ssize_t read_line(struct parser_pdata *pdata, char *buf, size_t len)
{
	ssize_t ret;

	if (pdata->fd_in_is_socket) {
		struct pollfd pfd[2];
		bool found;
		size_t bytes_read = 0;

		pfd[0].fd = pdata->fd_in;
		pfd[0].events = POLLIN | POLLRDHUP;
		pfd[0].revents = 0;
		pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
		pfd[1].events = POLLIN;
		pfd[1].revents = 0;

		do {
			size_t i, to_trunc;

			poll_nointr(pfd, 2);

			if (pfd[1].revents & POLLIN ||
					pfd[0].revents & POLLRDHUP)
				return 0;

			/* First read from the socket, without advancing the
			 * read offset */
			ret = recv(pdata->fd_in, buf, len,
					MSG_NOSIGNAL | MSG_PEEK);
			if (ret < 0)
				return -errno;

			/* Look for the trailing \n */
			for (i = 0; i < (size_t) ret && buf[i] != '\n'; i++);
			found = i < (size_t) ret;

			len -= ret;
			buf += ret;

			to_trunc = found ? i + 1 : (size_t) ret;

			/* Advance the read offset to after the \n if found,
			 * or after the last character read otherwise */
			ret = recv(pdata->fd_in, NULL, to_trunc,
					MSG_NOSIGNAL | MSG_TRUNC);
			if (ret < 0)
				return -errno;

			bytes_read += to_trunc;
		} while (!found && len);

		/* No \n found? Just garbage data */
		if (!found)
			ret = -EIO;
		else
			ret = bytes_read;
	} else {
		ret = pdata->readfd(pdata, buf, len);
	}

	return ret;
}
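
/*
 * The socket path above peeks at the data first, then consumes only up to
 * (and including) the newline, so any bytes that follow the command stay
 * queued for the binary read path. A minimal sketch of the idiom it uses:
 *
 *     char tmp[64];
 *     ssize_t n = recv(fd, tmp, sizeof(tmp), MSG_NOSIGNAL | MSG_PEEK);
 *     // ... find '\n' at offset i within the peeked data ...
 *     recv(fd, NULL, i + 1, MSG_NOSIGNAL | MSG_TRUNC);  // drop i+1 bytes
 */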

void interpreter(struct iio_context *ctx, int fd_in, int fd_out, bool verbose,
	bool is_socket, bool use_aio, struct thread_pool *pool)
{
	yyscan_t scanner;
	struct parser_pdata pdata;
	unsigned int i;
	int ret;

	pdata.ctx = ctx;
	pdata.stop = false;
	pdata.fd_in = fd_in;
	pdata.fd_out = fd_out;
	pdata.verbose = verbose;
	pdata.pool = pool;

	pdata.fd_in_is_socket = is_socket;
	pdata.fd_out_is_socket = is_socket;

	SLIST_INIT(&pdata.thdlist_head);

	if (use_aio) {
		/* Note: if WITH_AIO is not defined, use_aio is always false.
		 * We ensure that in iiod.c. */
#if WITH_AIO
		char err_str[1024];

		pdata.aio_eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
		if (pdata.aio_eventfd < 0) {
			iio_strerror(errno, err_str, sizeof(err_str));
			ERROR("Failed to create AIO eventfd: %s\n", err_str);
			return;
		}

		pdata.aio_ctx = 0;
		ret = io_setup(1, &pdata.aio_ctx);
		if (ret < 0) {
			iio_strerror(-ret, err_str, sizeof(err_str));
			ERROR("Failed to create AIO context: %s\n", err_str);
			close(pdata.aio_eventfd);
			return;
		}
		pthread_mutex_init(&pdata.aio_mutex, NULL);
		pdata.readfd = readfd_aio;
		pdata.writefd = writefd_aio;
#endif
	} else {
		pdata.readfd = readfd_io;
		pdata.writefd = writefd_io;
	}

	yylex_init_extra(&pdata, &scanner);

	do {
		if (verbose)
			output(&pdata, "iio-daemon > ");
		ret = yyparse(scanner);
	} while (!pdata.stop && ret >= 0);

	yylex_destroy(scanner);

	/* Close all opened devices */
	for (i = 0; i < ctx->nb_devices; i++)
		close_dev_helper(&pdata, ctx->devices[i]);

#if WITH_AIO
	if (use_aio) {
		io_destroy(pdata.aio_ctx);
		close(pdata.aio_eventfd);
	}
#endif
}