1 /*
2 * lws-minimal-http-server-sse
3 *
4 * Written in 2010-2019 by Andy Green <andy@warmcat.com>
5 *
6 * This file is made available under the Creative Commons CC0 1.0
7 * Universal Public Domain Dedication.
8 *
9 * This demonstrates a minimal http server that can serve both normal static
10 * content and server-side event connections.
11 *
12 * To keep it simple, it serves the static stuff from the subdirectory
13 * "./mount-origin" of the directory it was started in.
14 *
15 * You can change that by changing mount.origin below.
16 */
17
18 #include <libwebsockets.h>
19 #include <string.h>
20 #include <stdlib.h>
21 #include <signal.h>
22 #include <pthread.h>
23 #include <time.h>
24
25 /* one of these created for each message in the ringbuffer */
26
27 struct msg {
28 void *payload; /* is malloc'd */
29 size_t len;
30 };
31
32 /*
33 * Unlike ws, http is a stateless protocol. This pss only exists for the
34 * duration of a single http transaction. With http/1.1 keep-alive and http/2,
35 * that is unrelated to (shorter than) the lifetime of the network connection.
36 */
37 struct pss {
38 struct pss *pss_list;
39 struct lws *wsi;
40 uint32_t tail;
41 };
42
43 /* one of these is created for each vhost our protocol is used with */
44
45 struct vhd {
46 struct lws_context *context;
47 struct lws_vhost *vhost;
48 const struct lws_protocols *protocol;
49
50 struct pss *pss_list; /* linked-list of live pss*/
51 pthread_t pthread_spam[2];
52
53 pthread_mutex_t lock_ring; /* serialize access to the ring buffer */
54 struct lws_ring *ring; /* ringbuffer holding unsent messages */
55 char finished;
56 };
57
58 static int interrupted;
59
60 #if defined(WIN32)
usleep(unsigned long l)61 static void usleep(unsigned long l) { Sleep(l / 1000); }
62 #endif
63
64
65 /* destroys the message when everyone has had a copy of it */
66
67 static void
__minimal_destroy_message(void * _msg)68 __minimal_destroy_message(void *_msg)
69 {
70 struct msg *msg = _msg;
71
72 free(msg->payload);
73 msg->payload = NULL;
74 msg->len = 0;
75 }
76
77 /*
78 * This runs under the "spam thread" thread context only.
79 *
80 * We spawn two threads that generate messages with this.
81 *
82 */
83
84 static void *
thread_spam(void * d)85 thread_spam(void *d)
86 {
87 struct vhd *vhd = (struct vhd *)d;
88 struct msg amsg;
89 int len = 128, index = 1, n;
90
91 do {
92 /* don't generate output if nobody connected */
93 if (!vhd->pss_list)
94 goto wait;
95
96 pthread_mutex_lock(&vhd->lock_ring); /* --------- ring lock { */
97
98 /* only create if space in ringbuffer */
99 n = (int)lws_ring_get_count_free_elements(vhd->ring);
100 if (!n) {
101 lwsl_user("dropping!\n");
102 goto wait_unlock;
103 }
104
105 amsg.payload = malloc(len);
106 if (!amsg.payload) {
107 lwsl_user("OOM: dropping\n");
108 goto wait_unlock;
109 }
110 n = lws_snprintf((char *)amsg.payload, len,
111 "%s: tid: %p, msg: %d", __func__,
112 (void *)pthread_self(), index++);
113 amsg.len = n;
114 n = lws_ring_insert(vhd->ring, &amsg, 1);
115 if (n != 1) {
116 __minimal_destroy_message(&amsg);
117 lwsl_user("dropping!\n");
118 } else
119 /*
120 * This will cause a LWS_CALLBACK_EVENT_WAIT_CANCELLED
121 * in the lws service thread context.
122 */
123 lws_cancel_service(vhd->context);
124
125 wait_unlock:
126 pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */
127
128 wait:
129 /* rand() would make more sense but coverity shrieks */
130 usleep(100000 + (time(NULL) & 0xffff));
131
132 } while (!vhd->finished);
133
134 lwsl_notice("thread_spam %p exiting\n", (void *)pthread_self());
135
136 pthread_exit(NULL);
137
138 return NULL;
139 }
140
141
142 static int
callback_sse(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)143 callback_sse(struct lws *wsi, enum lws_callback_reasons reason, void *user,
144 void *in, size_t len)
145 {
146 struct pss *pss = (struct pss *)user;
147 struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get(
148 lws_get_vhost(wsi), lws_get_protocol(wsi));
149 uint8_t buf[LWS_PRE + LWS_RECOMMENDED_MIN_HEADER_SPACE],
150 *start = &buf[LWS_PRE], *p = start,
151 *end = &buf[sizeof(buf) - 1];
152 const struct msg *pmsg;
153 void *retval;
154 int n;
155
156 switch (reason) {
157
158 /* --- vhost protocol lifecycle --- */
159
160 case LWS_CALLBACK_PROTOCOL_INIT:
161 vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
162 lws_get_protocol(wsi), sizeof(struct vhd));
163 vhd->context = lws_get_context(wsi);
164 vhd->protocol = lws_get_protocol(wsi);
165 vhd->vhost = lws_get_vhost(wsi);
166
167 vhd->ring = lws_ring_create(sizeof(struct msg), 8,
168 __minimal_destroy_message);
169 if (!vhd->ring)
170 return 1;
171
172 pthread_mutex_init(&vhd->lock_ring, NULL);
173
174 /* start the content-creating threads */
175
176 for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++)
177 if (pthread_create(&vhd->pthread_spam[n], NULL,
178 thread_spam, vhd)) {
179 lwsl_err("thread creation failed\n");
180 goto init_fail;
181 }
182
183 return 0;
184
185 case LWS_CALLBACK_PROTOCOL_DESTROY:
186 init_fail:
187 vhd->finished = 1;
188 for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++)
189 if (vhd->pthread_spam[n])
190 pthread_join(vhd->pthread_spam[n], &retval);
191
192 if (vhd->ring)
193 lws_ring_destroy(vhd->ring);
194
195 pthread_mutex_destroy(&vhd->lock_ring);
196 return 0;
197
198 /* --- http connection lifecycle --- */
199
200 case LWS_CALLBACK_HTTP:
201 /*
202 * `in` contains the url part after our mountpoint /sse, if any
203 * you can use this to determine what data to return and store
204 * that in the pss
205 */
206 lwsl_info("%s: LWS_CALLBACK_HTTP: '%s'\n", __func__,
207 (const char *)in);
208
209 /* SSE requires a http OK response with this content-type */
210
211 if (lws_add_http_common_headers(wsi, HTTP_STATUS_OK,
212 "text/event-stream",
213 LWS_ILLEGAL_HTTP_CONTENT_LEN,
214 &p, end))
215 return 1;
216
217 if (lws_finalize_write_http_header(wsi, start, &p, end))
218 return 1;
219
220 /* add ourselves to the list of live pss held in the vhd */
221
222 lws_ll_fwd_insert(pss, pss_list, vhd->pss_list);
223 pss->tail = lws_ring_get_oldest_tail(vhd->ring);
224 pss->wsi = wsi;
225
226 /*
227 * This tells lws we are no longer a normal http stream,
228 * but are an "immortal" (plus or minus whatever timeout you
229 * set on it afterwards) SSE stream. In http/2 case that also
230 * stops idle timeouts being applied to the network connection
231 * while this wsi is still open.
232 */
233 lws_http_mark_sse(wsi);
234
235 /* write the body separately */
236
237 lws_callback_on_writable(wsi);
238
239 return 0;
240
241 case LWS_CALLBACK_CLOSED_HTTP:
242 /* remove our closing pss from the list of live pss */
243
244 lws_ll_fwd_remove(struct pss, pss_list, pss, vhd->pss_list);
245 return 0;
246
247 /* --- data transfer --- */
248
249 case LWS_CALLBACK_HTTP_WRITEABLE:
250
251 lwsl_info("%s: LWS_CALLBACK_HTTP_WRITEABLE\n", __func__);
252
253 pmsg = lws_ring_get_element(vhd->ring, &pss->tail);
254 if (!pmsg)
255 break;
256
257 p += lws_snprintf((char *)p, end - p,
258 "data: %s\x0d\x0a\x0d\x0a",
259 (const char *)pmsg->payload);
260
261 if (lws_write(wsi, (uint8_t *)start, lws_ptr_diff(p, start),
262 LWS_WRITE_HTTP) != lws_ptr_diff(p, start))
263 return 1;
264
265 lws_ring_consume_and_update_oldest_tail(
266 vhd->ring, /* lws_ring object */
267 struct pss, /* type of objects with tails */
268 &pss->tail, /* tail of guy doing the consuming */
269 1, /* number of payload objects being consumed */
270 vhd->pss_list, /* head of list of objects with tails */
271 tail, /* member name of tail in objects with tails */
272 pss_list /* member name of next object in objects with tails */
273 );
274
275 if (lws_ring_get_element(vhd->ring, &pss->tail))
276 /* come back as soon as we can write more */
277 lws_callback_on_writable(pss->wsi);
278
279 return 0;
280
281 case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
282 if (!vhd)
283 break;
284 /*
285 * let everybody know we want to write something on them
286 * as soon as they are ready
287 */
288 lws_start_foreach_llp(struct pss **, ppss, vhd->pss_list) {
289 lws_callback_on_writable((*ppss)->wsi);
290 } lws_end_foreach_llp(ppss, pss_list);
291 return 0;
292
293 default:
294 break;
295 }
296
297 return lws_callback_http_dummy(wsi, reason, user, in, len);
298 }
299
300 static struct lws_protocols protocols[] = {
301 { "http", lws_callback_http_dummy, 0, 0 },
302 { "sse", callback_sse, sizeof(struct pss), 0 },
303 { NULL, NULL, 0, 0 } /* terminator */
304 };
305
306 /* override the default mount for /sse in the URL space */
307
308 static const struct lws_http_mount mount_sse = {
309 /* .mount_next */ NULL, /* linked-list "next" */
310 /* .mountpoint */ "/sse", /* mountpoint URL */
311 /* .origin */ NULL, /* protocol */
312 /* .def */ NULL,
313 /* .protocol */ "sse",
314 /* .cgienv */ NULL,
315 /* .extra_mimetypes */ NULL,
316 /* .interpret */ NULL,
317 /* .cgi_timeout */ 0,
318 /* .cache_max_age */ 0,
319 /* .auth_mask */ 0,
320 /* .cache_reusable */ 0,
321 /* .cache_revalidate */ 0,
322 /* .cache_intermediaries */ 0,
323 /* .origin_protocol */ LWSMPRO_CALLBACK, /* dynamic */
324 /* .mountpoint_len */ 4, /* char count */
325 /* .basic_auth_login_file */ NULL,
326 };
327
328 /* default mount serves the URL space from ./mount-origin */
329
330 static const struct lws_http_mount mount = {
331 /* .mount_next */ &mount_sse, /* linked-list "next" */
332 /* .mountpoint */ "/", /* mountpoint URL */
333 /* .origin */ "./mount-origin", /* serve from dir */
334 /* .def */ "index.html", /* default filename */
335 /* .protocol */ NULL,
336 /* .cgienv */ NULL,
337 /* .extra_mimetypes */ NULL,
338 /* .interpret */ NULL,
339 /* .cgi_timeout */ 0,
340 /* .cache_max_age */ 0,
341 /* .auth_mask */ 0,
342 /* .cache_reusable */ 0,
343 /* .cache_revalidate */ 0,
344 /* .cache_intermediaries */ 0,
345 /* .origin_protocol */ LWSMPRO_FILE, /* files in a dir */
346 /* .mountpoint_len */ 1, /* char count */
347 /* .basic_auth_login_file */ NULL,
348 };
349
sigint_handler(int sig)350 void sigint_handler(int sig)
351 {
352 interrupted = 1;
353 }
354
main(int argc,const char ** argv)355 int main(int argc, const char **argv)
356 {
357 struct lws_context_creation_info info;
358 struct lws_context *context;
359 const char *p;
360 int n = 0, logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE
361 /* for LLL_ verbosity above NOTICE to be built into lws,
362 * lws must have been configured and built with
363 * -DCMAKE_BUILD_TYPE=DEBUG instead of =RELEASE */
364 /* | LLL_INFO */ /* | LLL_PARSER */ /* | LLL_HEADER */
365 /* | LLL_EXT */ /* | LLL_CLIENT */ /* | LLL_LATENCY */
366 /* | LLL_DEBUG */;
367
368 signal(SIGINT, sigint_handler);
369
370 if ((p = lws_cmdline_option(argc, argv, "-d")))
371 logs = atoi(p);
372
373 lws_set_log_level(logs, NULL);
374 lwsl_user("LWS minimal http Server-Side Events + ring | visit http://localhost:7681\n");
375
376 memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
377 info.port = 7681;
378 info.protocols = protocols;
379 info.mounts = &mount;
380 info.options =
381 LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE;
382
383 context = lws_create_context(&info);
384 if (!context) {
385 lwsl_err("lws init failed\n");
386 return 1;
387 }
388
389 while (n >= 0 && !interrupted)
390 n = lws_service(context, 0);
391
392 lws_context_destroy(context);
393
394 return 0;
395 }
396