1 #include "fio.h"
2 #include "mutex.h"
3 #include "smalloc.h"
4 #include "flist.h"
5 
6 struct fio_flow {
7 	unsigned int refs;
8 	struct flist_head list;
9 	unsigned int id;
10 	long long int flow_counter;
11 };
12 
/* Head of the list of live flows; allocated by flow_init() */
static struct flist_head *flow_list;
/* Guards flow_list and each flow's reference count; NULL until flow_init() */
static struct fio_mutex *flow_lock;
15 
flow_threshold_exceeded(struct thread_data * td)16 int flow_threshold_exceeded(struct thread_data *td)
17 {
18 	struct fio_flow *flow = td->flow;
19 	int sign;
20 
21 	if (!flow)
22 		return 0;
23 
24 	sign = td->o.flow > 0 ? 1 : -1;
25 	if (sign * flow->flow_counter > td->o.flow_watermark) {
26 		if (td->o.flow_sleep) {
27 			io_u_quiesce(td);
28 			usleep(td->o.flow_sleep);
29 		}
30 
31 		return 1;
32 	}
33 
34 	/* No synchronization needed because it doesn't
35 	 * matter if the flow count is slightly inaccurate */
36 	flow->flow_counter += td->o.flow;
37 	return 0;
38 }
39 
flow_get(unsigned int id)40 static struct fio_flow *flow_get(unsigned int id)
41 {
42 	struct fio_flow *flow = NULL;
43 	struct flist_head *n;
44 
45 	if (!flow_lock)
46 		return NULL;
47 
48 	fio_mutex_down(flow_lock);
49 
50 	flist_for_each(n, flow_list) {
51 		flow = flist_entry(n, struct fio_flow, list);
52 		if (flow->id == id)
53 			break;
54 
55 		flow = NULL;
56 	}
57 
58 	if (!flow) {
59 		flow = smalloc(sizeof(*flow));
60 		if (!flow) {
61 			fio_mutex_up(flow_lock);
62 			return NULL;
63 		}
64 		flow->refs = 0;
65 		INIT_FLIST_HEAD(&flow->list);
66 		flow->id = id;
67 		flow->flow_counter = 0;
68 
69 		flist_add_tail(&flow->list, flow_list);
70 	}
71 
72 	flow->refs++;
73 	fio_mutex_up(flow_lock);
74 	return flow;
75 }
76 
flow_put(struct fio_flow * flow)77 static void flow_put(struct fio_flow *flow)
78 {
79 	if (!flow_lock)
80 		return;
81 
82 	fio_mutex_down(flow_lock);
83 
84 	if (!--flow->refs) {
85 		flist_del(&flow->list);
86 		sfree(flow);
87 	}
88 
89 	fio_mutex_up(flow_lock);
90 }
91 
flow_init_job(struct thread_data * td)92 void flow_init_job(struct thread_data *td)
93 {
94 	if (td->o.flow)
95 		td->flow = flow_get(td->o.flow_id);
96 }
97 
flow_exit_job(struct thread_data * td)98 void flow_exit_job(struct thread_data *td)
99 {
100 	if (td->flow) {
101 		flow_put(td->flow);
102 		td->flow = NULL;
103 	}
104 }
105 
flow_init(void)106 void flow_init(void)
107 {
108 	flow_list = smalloc(sizeof(*flow_list));
109 	if (!flow_list) {
110 		log_err("fio: smalloc pool exhausted\n");
111 		return;
112 	}
113 
114 	flow_lock = fio_mutex_init(FIO_MUTEX_UNLOCKED);
115 	if (!flow_lock) {
116 		log_err("fio: failed to allocate flow lock\n");
117 		sfree(flow_list);
118 		return;
119 	}
120 
121 	INIT_FLIST_HEAD(flow_list);
122 }
123 
flow_exit(void)124 void flow_exit(void)
125 {
126 	if (flow_lock)
127 		fio_mutex_remove(flow_lock);
128 	if (flow_list)
129 		sfree(flow_list);
130 }
131