1 /*
2 * ws protocol handler plugin for "lws-minimal" demonstrating lws threadpool
3 *
4 * Written in 2010-2019 by Andy Green <andy@warmcat.com>
5 *
6 * This file is made available under the Creative Commons CC0 1.0
7 * Universal Public Domain Dedication.
8 *
9 * The main reason some things are as they are is that the task lifecycle may
10 * be unrelated to the wsi lifecycle that queued that task.
11 *
12 * Consider the task may call an external library and run for 30s without
13 * "checking in" to see if it should stop. The wsi that started the task may
14 * have closed at any time before the 30s are up, with the browser window
15 * closing or whatever.
16 *
17 * So data shared between the asynchronous task and the wsi must have its
18 * lifecycle determined by the task, not the wsi. That means a separate struct
19 * that can be freed by the task.
20 *
21 * In the case the wsi outlives the task, the tasks do not get destroyed until
22 * the service thread has called lws_threadpool_task_status() on the completed
23 * task. So there is no danger of the shared task private data getting randomly
24 * freed.
25 */
26
#if !defined (LWS_PLUGIN_STATIC)
#define LWS_DLL
#define LWS_INTERNAL
#include <libwebsockets.h>
#endif

#include <stdlib.h>
#include <string.h>
34
/*
 * Per-vhost state for this protocol, allocated during
 * LWS_CALLBACK_PROTOCOL_INIT.
 */
struct per_vhost_data__minimal {
	/* threadpool created for this vhost */
	struct lws_threadpool *tp;
	/* value of the "config" per-vhost option */
	const char *config;
};
39
/*
 * Private data shared between one threadpool task and the service thread.
 */
struct task_data {
	/* text produced by the task; the payload starts at result + LWS_PRE */
	char result[64];

	/* current count */
	uint64_t pos;
	/* count at which the task reports it has finished */
	uint64_t end;
};
45
46 /*
47 * Create the private data for the task
48 *
49 * Notice we hand over responsibility for the cleanup and freeing of the
50 * allocated task_data to the threadpool, because the wsi it was originally
51 * bound to may close while the thread is still running. So we allocate
52 * something discrete for the task private data that can be definitively owned
53 * and freed by the threadpool, not the wsi... the pss won't do, as it only
54 * exists for the lifecycle of the wsi connection.
55 *
56 * When the task is created, we also tell it how to destroy the private data
57 * by giving it args.cleanup as cleanup_task_private_data() defined below.
58 */
59
60 static struct task_data *
create_task_private_data(void)61 create_task_private_data(void)
62 {
63 struct task_data *priv = malloc(sizeof(*priv));
64
65 return priv;
66 }
67
/*
 * Destroy the private data for the task
 *
 * By the time this runs, the wsi the task was originally bound to may be long
 * gone, eg, when the lws context is being destroyed while the thread spent a
 * long time working without checking in.  So wsi is not touched here.
 */
static void
cleanup_task_private_data(struct lws *wsi, void *user)
{
	(void)wsi;

	/* user is the task_data that was allocated for this task */
	free(user);
}
82
83 /*
84 * This runs in its own thread, from the threadpool.
85 *
86 * The implementation behind this in lws uses pthreads, but no pthreadisms are
87 * required in the user code.
88 *
89 * The example counts to 10M, "checking in" to see if it should stop after every
90 * 100K and pausing to sync with the service thread to send a ws message every
91 * 1M. It resumes after the service thread determines the wsi is writable and
92 * the LWS_CALLBACK_SERVER_WRITEABLE indicates the task thread can continue by
93 * calling lws_threadpool_task_sync().
94 */
95
96 static enum lws_threadpool_task_return
task_function(void * user,enum lws_threadpool_task_status s)97 task_function(void *user, enum lws_threadpool_task_status s)
98 {
99 struct task_data *priv = (struct task_data *)user;
100 int budget = 100 * 1000;
101
102 if (priv->pos == priv->end)
103 return LWS_TP_RETURN_FINISHED;
104
105 /*
106 * Preferably replace this with ~100ms of your real task, so it
107 * can "check in" at short intervals to see if it has been asked to
108 * stop.
109 *
110 * You can just run tasks atomically here with the thread dedicated
111 * to it, but it will cause odd delays while shutting down etc and
112 * the task will run to completion even if the wsi that started it
113 * has since closed.
114 */
115
116 while (budget--)
117 priv->pos++;
118
119 usleep(100000);
120
121 if (!(priv->pos % (1000 * 1000))) {
122 lws_snprintf(priv->result + LWS_PRE,
123 sizeof(priv->result) - LWS_PRE,
124 "pos %llu", (unsigned long long)priv->pos);
125
126 return LWS_TP_RETURN_SYNC;
127 }
128
129 return LWS_TP_RETURN_CHECKING_IN;
130 }
131
132 static int
callback_minimal(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)133 callback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
134 void *user, void *in, size_t len)
135 {
136 struct per_vhost_data__minimal *vhd =
137 (struct per_vhost_data__minimal *)
138 lws_protocol_vh_priv_get(lws_get_vhost(wsi),
139 lws_get_protocol(wsi));
140 const struct lws_protocol_vhost_options *pvo;
141 struct lws_threadpool_create_args cargs;
142 struct lws_threadpool_task_args args;
143 struct lws_threadpool_task *task;
144 struct task_data *priv;
145 int n, m, r = 0;
146 char name[32];
147 void *_user;
148
149 switch (reason) {
150 case LWS_CALLBACK_PROTOCOL_INIT:
151 /* create our per-vhost struct */
152 vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
153 lws_get_protocol(wsi),
154 sizeof(struct per_vhost_data__minimal));
155 if (!vhd)
156 return 1;
157
158 /* recover the pointer to the globals struct */
159 pvo = lws_pvo_search(
160 (const struct lws_protocol_vhost_options *)in,
161 "config");
162 if (!pvo || !pvo->value) {
163 lwsl_err("%s: Can't find \"config\" pvo\n", __func__);
164 return 1;
165 }
166 vhd->config = pvo->value;
167
168 memset(&cargs, 0, sizeof(cargs));
169
170 cargs.max_queue_depth = 8;
171 cargs.threads = 3;
172 vhd->tp = lws_threadpool_create(lws_get_context(wsi),
173 &cargs, "%s",
174 lws_get_vhost_name(lws_get_vhost(wsi)));
175 if (!vhd->tp)
176 return 1;
177
178 lws_timed_callback_vh_protocol(lws_get_vhost(wsi),
179 lws_get_protocol(wsi),
180 LWS_CALLBACK_USER, 1);
181
182 break;
183
184 case LWS_CALLBACK_PROTOCOL_DESTROY:
185 lws_threadpool_finish(vhd->tp);
186 lws_threadpool_destroy(vhd->tp);
187 break;
188
189 case LWS_CALLBACK_USER:
190
191 /*
192 * in debug mode, dump the threadpool stat to the logs once
193 * a second
194 */
195 lws_threadpool_dump(vhd->tp);
196 lws_timed_callback_vh_protocol(lws_get_vhost(wsi),
197 lws_get_protocol(wsi),
198 LWS_CALLBACK_USER, 1);
199 break;
200
201 case LWS_CALLBACK_ESTABLISHED:
202
203 memset(&args, 0, sizeof(args));
204 priv = args.user = create_task_private_data();
205 if (!args.user)
206 return 1;
207
208 priv->pos = 0;
209 priv->end = 10 * 1000 * 1000;
210
211 /* queue the task... the task takes on responsibility for
212 * destroying args.user. pss->priv just has a copy of it */
213
214 args.wsi = wsi;
215 args.task = task_function;
216 args.cleanup = cleanup_task_private_data;
217
218 lws_get_peer_simple(wsi, name, sizeof(name));
219
220 if (!lws_threadpool_enqueue(vhd->tp, &args, "ws %s", name)) {
221 lwsl_user("%s: Couldn't enqueue task\n", __func__);
222 cleanup_task_private_data(wsi, priv);
223 return 1;
224 }
225
226 lws_set_timeout(wsi, PENDING_TIMEOUT_THREADPOOL, 30);
227
228 /*
229 * so the asynchronous worker will let us know the next step
230 * by causing LWS_CALLBACK_SERVER_WRITEABLE
231 */
232
233 break;
234
235 case LWS_CALLBACK_CLOSED:
236 break;
237
238 case LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL:
239 lwsl_debug("LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL: %p\n", wsi);
240 lws_threadpool_dequeue(wsi);
241 break;
242
243 case LWS_CALLBACK_SERVER_WRITEABLE:
244
245 /*
246 * even completed tasks wait in a queue until we call the
247 * below on them. Then they may destroy themselves and their
248 * args.user data (by calling the cleanup callback).
249 *
250 * If you need to get things from the still-valid private task
251 * data, copy it here before calling
252 * lws_threadpool_task_status() that may free the task and the
253 * private task data.
254 */
255
256 n = lws_threadpool_task_status_wsi(wsi, &task, &_user);
257 lwsl_debug("%s: LWS_CALLBACK_SERVER_WRITEABLE: status %d\n",
258 __func__, n);
259 switch(n) {
260
261 case LWS_TP_STATUS_FINISHED:
262 case LWS_TP_STATUS_STOPPED:
263 case LWS_TP_STATUS_QUEUED:
264 case LWS_TP_STATUS_RUNNING:
265 case LWS_TP_STATUS_STOPPING:
266 return 0;
267
268 case LWS_TP_STATUS_SYNCING:
269 /* the task has paused for us to do something */
270 break;
271 default:
272 return -1;
273 }
274
275 priv = (struct task_data *)_user;
276
277 lws_set_timeout(wsi, PENDING_TIMEOUT_THREADPOOL_TASK, 5);
278
279 n = strlen(priv->result + LWS_PRE);
280 m = lws_write(wsi, (unsigned char *)priv->result + LWS_PRE,
281 n, LWS_WRITE_TEXT);
282 if (m < n) {
283 lwsl_err("ERROR %d writing to ws socket\n", m);
284 lws_threadpool_task_sync(task, 1);
285 return -1;
286 }
287
288 /*
289 * service thread has done whatever it wanted to do with the
290 * data the task produced: if it's waiting to do more it can
291 * continue now.
292 */
293 lws_threadpool_task_sync(task, 0);
294 break;
295
296 default:
297 break;
298 }
299
300 return r;
301 }
302
/*
 * Positional initializer for this protocol's struct lws_protocols entry.
 * NOTE(review): the values after the callback follow the struct lws_protocols
 * field order declared in libwebsockets.h -- confirm against the lws version
 * in use before changing any of them.
 */
#define LWS_PLUGIN_PROTOCOL_MINIMAL \
	{ \
		"lws-minimal",		/* protocol name */ \
		callback_minimal,	/* protocol callback */ \
		0, 128, 0, \
		NULL, \
		0 \
	}
311
312 #if !defined (LWS_PLUGIN_STATIC)
313
314 /* boilerplate needed if we are built as a dynamic plugin */
315
316 static const struct lws_protocols protocols[] = {
317 LWS_PLUGIN_PROTOCOL_MINIMAL
318 };
319
320 int
init_protocol_minimal(struct lws_context * context,struct lws_plugin_capability * c)321 init_protocol_minimal(struct lws_context *context,
322 struct lws_plugin_capability *c)
323 {
324 if (c->api_magic != LWS_PLUGIN_API_MAGIC) {
325 lwsl_err("Plugin API %d, library API %d", LWS_PLUGIN_API_MAGIC,
326 c->api_magic);
327 return 1;
328 }
329
330 c->protocols = protocols;
331 c->count_protocols = LWS_ARRAY_SIZE(protocols);
332 c->extensions = NULL;
333 c->count_extensions = 0;
334
335 return 0;
336 }
337
/*
 * Plugin teardown entry point.  init_protocol_minimal() only points the
 * capability struct at static data, so there is nothing to release here.
 *
 * context is not used.  Always returns 0.
 */
int
destroy_protocol_minimal(struct lws_context *context)
{
	(void)context;

	return 0;
}
343 #endif
344