1 /*
2  * ws protocol handler plugin for "lws-minimal" demonstrating lws threadpool
3  *
4  * Written in 2010-2019 by Andy Green <andy@warmcat.com>
5  *
6  * This file is made available under the Creative Commons CC0 1.0
7  * Universal Public Domain Dedication.
8  *
9  * The main reason some things are as they are is that the task lifecycle may
10  * be unrelated to the wsi lifecycle that queued that task.
11  *
12  * Consider the task may call an external library and run for 30s without
13  * "checking in" to see if it should stop.  The wsi that started the task may
14  * have closed at any time before the 30s are up, with the browser window
15  * closing or whatever.
16  *
17  * So data shared between the asynchronous task and the wsi must have its
18  * lifecycle determined by the task, not the wsi.  That means a separate struct
19  * that can be freed by the task.
20  *
21  * In the case the wsi outlives the task, the tasks do not get destroyed until
22  * the service thread has called lws_threadpool_task_status() on the completed
23  * task.  So there is no danger of the shared task private data getting randomly
24  * freed.
25  */
26 
27 #if !defined (LWS_PLUGIN_STATIC)
28 #define LWS_DLL
29 #define LWS_INTERNAL
30 #include <libwebsockets.h>
31 #endif
32 
33 #include <string.h>
34 
/*
 * Per-vhost state for this protocol.  It is allocated zeroed by
 * lws_protocol_vh_priv_zalloc() when LWS_CALLBACK_PROTOCOL_INIT is handled.
 */
struct per_vhost_data__minimal {
	struct lws_threadpool *tp;	/* threadpool created for this vhost */
	const char *config;		/* value of the "config" pvo */
};
39 
/*
 * Private data for one threadpool task.  It is allocated separately from the
 * wsi so the task, not the connection, decides when it is freed.
 */
struct task_data {
	/* message for the service thread; the text starts LWS_PRE bytes in */
	char result[64];

	/* current count; advanced by the task function */
	uint64_t pos;
	/* count at which the task reports it has finished */
	uint64_t end;
};
45 
46 /*
47  * Create the private data for the task
48  *
49  * Notice we hand over responsibility for the cleanup and freeing of the
50  * allocated task_data to the threadpool, because the wsi it was originally
51  * bound to may close while the thread is still running.  So we allocate
52  * something discrete for the task private data that can be definitively owned
53  * and freed by the threadpool, not the wsi... the pss won't do, as it only
54  * exists for the lifecycle of the wsi connection.
55  *
56  * When the task is created, we also tell it how to destroy the private data
57  * by giving it args.cleanup as cleanup_task_private_data() defined below.
58  */
59 
60 static struct task_data *
create_task_private_data(void)61 create_task_private_data(void)
62 {
63 	struct task_data *priv = malloc(sizeof(*priv));
64 
65 	return priv;
66 }
67 
68 /*
69  * Destroy the private data for the task
70  *
71  * Notice the wsi the task was originally bound to may be long gone, in the
72  * case we are destroying the lws context and the thread was doing something
73  * for a long time without checking in.
74  */
/*
 * Release the private data belonging to a task.
 *
 * wsi:  not consulted here
 * user: the allocation to release; passed straight to free()
 */
static void
cleanup_task_private_data(struct lws *wsi, void *user)
{
	(void)wsi;

	free(user);
}
82 
83 /*
84  * This runs in its own thread, from the threadpool.
85  *
86  * The implementation behind this in lws uses pthreads, but no pthreadisms are
87  * required in the user code.
88  *
 * The example counts to 10M, "checking in" to see if it should stop after every
 * 100K and pausing to sync with the service thread to send a ws message every
 * 1M.  It resumes once the service thread finds the wsi writable: the
 * LWS_CALLBACK_SERVER_WRITEABLE handler sends the message and then lets the
 * task thread continue by calling lws_threadpool_task_sync().
94  */
95 
96 static enum lws_threadpool_task_return
task_function(void * user,enum lws_threadpool_task_status s)97 task_function(void *user, enum lws_threadpool_task_status s)
98 {
99 	struct task_data *priv = (struct task_data *)user;
100 	int budget = 100 * 1000;
101 
102 	if (priv->pos == priv->end)
103 		return LWS_TP_RETURN_FINISHED;
104 
105 	/*
106 	 * Preferably replace this with ~100ms of your real task, so it
107 	 * can "check in" at short intervals to see if it has been asked to
108 	 * stop.
109 	 *
110 	 * You can just run tasks atomically here with the thread dedicated
111 	 * to it, but it will cause odd delays while shutting down etc and
112 	 * the task will run to completion even if the wsi that started it
113 	 * has since closed.
114 	 */
115 
116 	while (budget--)
117 		priv->pos++;
118 
119 	usleep(100000);
120 
121 	if (!(priv->pos % (1000 * 1000))) {
122 		lws_snprintf(priv->result + LWS_PRE,
123 			     sizeof(priv->result) - LWS_PRE,
124 			     "pos %llu", (unsigned long long)priv->pos);
125 
126 		return LWS_TP_RETURN_SYNC;
127 	}
128 
129 	return LWS_TP_RETURN_CHECKING_IN;
130 }
131 
132 static int
callback_minimal(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)133 callback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
134 			void *user, void *in, size_t len)
135 {
136 	struct per_vhost_data__minimal *vhd =
137 			(struct per_vhost_data__minimal *)
138 			lws_protocol_vh_priv_get(lws_get_vhost(wsi),
139 					lws_get_protocol(wsi));
140 	const struct lws_protocol_vhost_options *pvo;
141 	struct lws_threadpool_create_args cargs;
142 	struct lws_threadpool_task_args args;
143 	struct lws_threadpool_task *task;
144 	struct task_data *priv;
145 	int n, m, r = 0;
146 	char name[32];
147 	void *_user;
148 
149 	switch (reason) {
150 	case LWS_CALLBACK_PROTOCOL_INIT:
151 		/* create our per-vhost struct */
152 		vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
153 				lws_get_protocol(wsi),
154 				sizeof(struct per_vhost_data__minimal));
155 		if (!vhd)
156 			return 1;
157 
158 		/* recover the pointer to the globals struct */
159 		pvo = lws_pvo_search(
160 			(const struct lws_protocol_vhost_options *)in,
161 			"config");
162 		if (!pvo || !pvo->value) {
163 			lwsl_err("%s: Can't find \"config\" pvo\n", __func__);
164 			return 1;
165 		}
166 		vhd->config = pvo->value;
167 
168 		memset(&cargs, 0, sizeof(cargs));
169 
170 		cargs.max_queue_depth = 8;
171 		cargs.threads = 3;
172 		vhd->tp = lws_threadpool_create(lws_get_context(wsi),
173 				&cargs, "%s",
174 				lws_get_vhost_name(lws_get_vhost(wsi)));
175 		if (!vhd->tp)
176 			return 1;
177 
178 		lws_timed_callback_vh_protocol(lws_get_vhost(wsi),
179 					       lws_get_protocol(wsi),
180 					       LWS_CALLBACK_USER, 1);
181 
182 		break;
183 
184 	case LWS_CALLBACK_PROTOCOL_DESTROY:
185 		lws_threadpool_finish(vhd->tp);
186 		lws_threadpool_destroy(vhd->tp);
187 		break;
188 
189 	case LWS_CALLBACK_USER:
190 
191 		/*
192 		 * in debug mode, dump the threadpool stat to the logs once
193 		 * a second
194 		 */
195 		lws_threadpool_dump(vhd->tp);
196 		lws_timed_callback_vh_protocol(lws_get_vhost(wsi),
197 					       lws_get_protocol(wsi),
198 					       LWS_CALLBACK_USER, 1);
199 		break;
200 
201 	case LWS_CALLBACK_ESTABLISHED:
202 
203 		memset(&args, 0, sizeof(args));
204 		priv = args.user = create_task_private_data();
205 		if (!args.user)
206 			return 1;
207 
208 		priv->pos = 0;
209 		priv->end = 10 * 1000 * 1000;
210 
211 		/* queue the task... the task takes on responsibility for
212 		 * destroying args.user.  pss->priv just has a copy of it */
213 
214 		args.wsi = wsi;
215 		args.task = task_function;
216 		args.cleanup = cleanup_task_private_data;
217 
218 		lws_get_peer_simple(wsi, name, sizeof(name));
219 
220 		if (!lws_threadpool_enqueue(vhd->tp, &args, "ws %s", name)) {
221 			lwsl_user("%s: Couldn't enqueue task\n", __func__);
222 			cleanup_task_private_data(wsi, priv);
223 			return 1;
224 		}
225 
226 		lws_set_timeout(wsi, PENDING_TIMEOUT_THREADPOOL, 30);
227 
228 		/*
229 		 * so the asynchronous worker will let us know the next step
230 		 * by causing LWS_CALLBACK_SERVER_WRITEABLE
231 		 */
232 
233 		break;
234 
235 	case LWS_CALLBACK_CLOSED:
236 		break;
237 
238 	case LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL:
239 		lwsl_debug("LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL: %p\n", wsi);
240 		lws_threadpool_dequeue(wsi);
241 		break;
242 
243 	case LWS_CALLBACK_SERVER_WRITEABLE:
244 
245 		/*
246 		 * even completed tasks wait in a queue until we call the
247 		 * below on them.  Then they may destroy themselves and their
248 		 * args.user data (by calling the cleanup callback).
249 		 *
250 		 * If you need to get things from the still-valid private task
251 		 * data, copy it here before calling
252 		 * lws_threadpool_task_status() that may free the task and the
253 		 * private task data.
254 		 */
255 
256 		n = lws_threadpool_task_status_wsi(wsi, &task, &_user);
257 		lwsl_debug("%s: LWS_CALLBACK_SERVER_WRITEABLE: status %d\n",
258 			   __func__, n);
259 		switch(n) {
260 
261 		case LWS_TP_STATUS_FINISHED:
262 		case LWS_TP_STATUS_STOPPED:
263 		case LWS_TP_STATUS_QUEUED:
264 		case LWS_TP_STATUS_RUNNING:
265 		case LWS_TP_STATUS_STOPPING:
266 			return 0;
267 
268 		case LWS_TP_STATUS_SYNCING:
269 			/* the task has paused for us to do something */
270 			break;
271 		default:
272 			return -1;
273 		}
274 
275 		priv = (struct task_data *)_user;
276 
277 		lws_set_timeout(wsi, PENDING_TIMEOUT_THREADPOOL_TASK, 5);
278 
279 		n = strlen(priv->result + LWS_PRE);
280 		m = lws_write(wsi, (unsigned char *)priv->result + LWS_PRE,
281 			      n, LWS_WRITE_TEXT);
282 		if (m < n) {
283 			lwsl_err("ERROR %d writing to ws socket\n", m);
284 			lws_threadpool_task_sync(task, 1);
285 			return -1;
286 		}
287 
288 		/*
289 		 * service thread has done whatever it wanted to do with the
290 		 * data the task produced: if it's waiting to do more it can
291 		 * continue now.
292 		 */
293 		lws_threadpool_task_sync(task, 0);
294 		break;
295 
296 	default:
297 		break;
298 	}
299 
300 	return r;
301 }
302 
/*
 * Initializer for this plugin's entry in a struct lws_protocols array: it
 * names the protocol "lws-minimal" and routes it to callback_minimal().
 *
 * NOTE(review): the remaining positional values (0, 128, 0, NULL, 0) are
 * presumably per_session_data_size, rx_buffer_size, id, user and
 * tx_packet_size, in that order -- confirm against struct lws_protocols in
 * libwebsockets.h.
 */
#define LWS_PLUGIN_PROTOCOL_MINIMAL \
	{ \
		"lws-minimal", \
		callback_minimal, \
		0, \
		128, \
		0, NULL, 0 \
	}
311 
312 #if !defined (LWS_PLUGIN_STATIC)
313 
314 /* boilerplate needed if we are built as a dynamic plugin */
315 
316 static const struct lws_protocols protocols[] = {
317 	LWS_PLUGIN_PROTOCOL_MINIMAL
318 };
319 
320 int
init_protocol_minimal(struct lws_context * context,struct lws_plugin_capability * c)321 init_protocol_minimal(struct lws_context *context,
322 		      struct lws_plugin_capability *c)
323 {
324 	if (c->api_magic != LWS_PLUGIN_API_MAGIC) {
325 		lwsl_err("Plugin API %d, library API %d", LWS_PLUGIN_API_MAGIC,
326 			 c->api_magic);
327 		return 1;
328 	}
329 
330 	c->protocols = protocols;
331 	c->count_protocols = LWS_ARRAY_SIZE(protocols);
332 	c->extensions = NULL;
333 	c->count_extensions = 0;
334 
335 	return 0;
336 }
337 
/*
 * Plugin teardown entry point.  There is nothing to release here, so it
 * always returns 0.
 *
 * context: not used
 */
int
destroy_protocol_minimal(struct lws_context *context)
{
	(void)context;

	return 0;
}
343 #endif
344