1 /*
2  * ws protocol handler plugin for "lws-minimal-server-echo"
3  *
4  * Written in 2010-2019 by Andy Green <andy@warmcat.com>
5  *
6  * This file is made available under the Creative Commons CC0 1.0
7  * Universal Public Domain Dedication.
8  *
9  * The protocol shows how to send and receive bulk messages over a ws connection
10  * that optionally may have the permessage-deflate extension negotiated on it.
11  */
12 
13 #if !defined (LWS_PLUGIN_STATIC)
14 #define LWS_DLL
15 #define LWS_INTERNAL
16 #include <libwebsockets.h>
17 #endif
18 
19 #include <string.h>
20 
21 #define RING_DEPTH 4096
22 
23 /* one of these created for each message */
24 
/*
 * A single received ws fragment, held on the per-connection ring until it has
 * been written back to the peer.
 */
struct msg {
	void *payload;	/* malloc'd; the fragment data starts LWS_PRE bytes in */
	size_t len;	/* bytes of fragment data, excluding the LWS_PRE headroom */

	/* attributes captured at receive time and reused for the echo write */
	char binary;	/* nonzero if the frame was binary rather than text */
	char first;	/* nonzero if this was the first fragment of a message */
	char final;	/* nonzero if this was the final fragment of a message */
};
32 
/*
 * State kept for each ws connection; the callback receives it through its
 * "user" argument.
 */
struct per_session_data__minimal_server_echo {
	struct lws_ring *ring;	/* queued struct msg fragments awaiting echo */
	uint32_t msglen;	/* bytes seen so far of the message in progress */
	uint32_t tail;		/* this connection's read position in ring */

	uint8_t completed:1;		 /* a final fragment has been written */
	uint8_t flow_controlled:1;	 /* rx is paused until the ring drains */
	uint8_t write_consume_pending:1; /* last written element awaits consume */
};
41 
/*
 * State kept once per vhost; it is allocated and filled in while handling
 * LWS_CALLBACK_PROTOCOL_INIT.
 */
struct vhd_minimal_server_echo {
	struct lws_context *context;
	struct lws_vhost *vhost;

	/* both pointers are taken from the pvos of the same names */
	int *interrupted;	/* written at connection close if options bit 0 */
	int *options;		/* bit 0: flag completion and cancel service on close */
};
49 
50 static void
__minimal_destroy_message(void * _msg)51 __minimal_destroy_message(void *_msg)
52 {
53 	struct msg *msg = _msg;
54 
55 	free(msg->payload);
56 	msg->payload = NULL;
57 	msg->len = 0;
58 }
59 #include <assert.h>
60 static int
callback_minimal_server_echo(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)61 callback_minimal_server_echo(struct lws *wsi, enum lws_callback_reasons reason,
62 			  void *user, void *in, size_t len)
63 {
64 	struct per_session_data__minimal_server_echo *pss =
65 			(struct per_session_data__minimal_server_echo *)user;
66 	struct vhd_minimal_server_echo *vhd = (struct vhd_minimal_server_echo *)
67 			lws_protocol_vh_priv_get(lws_get_vhost(wsi),
68 				lws_get_protocol(wsi));
69 	const struct msg *pmsg;
70 	struct msg amsg;
71 	int m, n, flags;
72 
73 	switch (reason) {
74 
75 	case LWS_CALLBACK_PROTOCOL_INIT:
76 		vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
77 				lws_get_protocol(wsi),
78 				sizeof(struct vhd_minimal_server_echo));
79 		if (!vhd)
80 			return -1;
81 
82 		vhd->context = lws_get_context(wsi);
83 		vhd->vhost = lws_get_vhost(wsi);
84 
85 		/* get the pointers we were passed in pvo */
86 
87 		vhd->interrupted = (int *)lws_pvo_search(
88 			(const struct lws_protocol_vhost_options *)in,
89 			"interrupted")->value;
90 		vhd->options = (int *)lws_pvo_search(
91 			(const struct lws_protocol_vhost_options *)in,
92 			"options")->value;
93 		break;
94 
95 	case LWS_CALLBACK_ESTABLISHED:
96 		/* generate a block of output before travis times us out */
97 		lwsl_warn("LWS_CALLBACK_ESTABLISHED\n");
98 		pss->ring = lws_ring_create(sizeof(struct msg), RING_DEPTH,
99 					    __minimal_destroy_message);
100 		if (!pss->ring)
101 			return 1;
102 		pss->tail = 0;
103 		break;
104 
105 	case LWS_CALLBACK_SERVER_WRITEABLE:
106 
107 		lwsl_user("LWS_CALLBACK_SERVER_WRITEABLE\n");
108 
109 		if (pss->write_consume_pending) {
110 			/* perform the deferred fifo consume */
111 			lws_ring_consume_single_tail(pss->ring, &pss->tail, 1);
112 			pss->write_consume_pending = 0;
113 		}
114 
115 		pmsg = lws_ring_get_element(pss->ring, &pss->tail);
116 		if (!pmsg) {
117 			lwsl_user(" (nothing in ring)\n");
118 			break;
119 		}
120 
121 		flags = lws_write_ws_flags(
122 			    pmsg->binary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT,
123 			    pmsg->first, pmsg->final);
124 
125 		/* notice we allowed for LWS_PRE in the payload already */
126 		m = lws_write(wsi, ((unsigned char *)pmsg->payload) +
127 			      LWS_PRE, pmsg->len, flags);
128 		if (m < (int)pmsg->len) {
129 			lwsl_err("ERROR %d writing to ws socket\n", m);
130 			return -1;
131 		}
132 
133 		lwsl_user(" wrote %d: flags: 0x%x first: %d final %d\n",
134 				m, flags, pmsg->first, pmsg->final);
135 		/*
136 		 * Workaround deferred deflate in pmd extension by only
137 		 * consuming the fifo entry when we are certain it has been
138 		 * fully deflated at the next WRITABLE callback.  You only need
139 		 * this if you're using pmd.
140 		 */
141 		pss->write_consume_pending = 1;
142 		lws_callback_on_writable(wsi);
143 
144 		if (pss->flow_controlled &&
145 		    (int)lws_ring_get_count_free_elements(pss->ring) > RING_DEPTH - 5) {
146 			lws_rx_flow_control(wsi, 1);
147 			pss->flow_controlled = 0;
148 		}
149 
150 		if ((*vhd->options & 1) && pmsg && pmsg->final)
151 			pss->completed = 1;
152 
153 		break;
154 
155 	case LWS_CALLBACK_RECEIVE:
156 
157 		lwsl_user("LWS_CALLBACK_RECEIVE: %4d (rpp %5d, first %d, "
158 			  "last %d, bin %d, msglen %d (+ %d = %d))\n",
159 			  (int)len, (int)lws_remaining_packet_payload(wsi),
160 			  lws_is_first_fragment(wsi),
161 			  lws_is_final_fragment(wsi),
162 			  lws_frame_is_binary(wsi), pss->msglen, (int)len,
163 			  (int)pss->msglen + (int)len);
164 
165 		if (len) {
166 			;
167 			//puts((const char *)in);
168 			//lwsl_hexdump_notice(in, len);
169 		}
170 
171 		amsg.first = lws_is_first_fragment(wsi);
172 		amsg.final = lws_is_final_fragment(wsi);
173 		amsg.binary = lws_frame_is_binary(wsi);
174 		n = (int)lws_ring_get_count_free_elements(pss->ring);
175 		if (!n) {
176 			lwsl_user("dropping!\n");
177 			break;
178 		}
179 
180 		if (amsg.final)
181 			pss->msglen = 0;
182 		else
183 			pss->msglen += len;
184 
185 		amsg.len = len;
186 		/* notice we over-allocate by LWS_PRE */
187 		amsg.payload = malloc(LWS_PRE + len);
188 		if (!amsg.payload) {
189 			lwsl_user("OOM: dropping\n");
190 			break;
191 		}
192 
193 		memcpy((char *)amsg.payload + LWS_PRE, in, len);
194 		if (!lws_ring_insert(pss->ring, &amsg, 1)) {
195 			__minimal_destroy_message(&amsg);
196 			lwsl_user("dropping!\n");
197 			break;
198 		}
199 		lws_callback_on_writable(wsi);
200 
201 		if (n < 3 && !pss->flow_controlled) {
202 			pss->flow_controlled = 1;
203 			lws_rx_flow_control(wsi, 0);
204 		}
205 		break;
206 
207 	case LWS_CALLBACK_CLOSED:
208 		lwsl_user("LWS_CALLBACK_CLOSED\n");
209 		lws_ring_destroy(pss->ring);
210 
211 		if (*vhd->options & 1) {
212 			if (!*vhd->interrupted)
213 				*vhd->interrupted = 1 + pss->completed;
214 			lws_cancel_service(lws_get_context(wsi));
215 		}
216 		break;
217 
218 	default:
219 		break;
220 	}
221 
222 	return 0;
223 }
224 
/*
 * Positional initializer for this protocol's struct lws_protocols entry: the
 * protocol name, its callback and the per-session data size, followed by four
 * further values.
 * NOTE(review): those four are presumably rx buffer size, id, user pointer
 * and tx packet size; confirm against the struct lws_protocols in use.
 */
#define LWS_PLUGIN_PROTOCOL_MINIMAL_SERVER_ECHO \
	{ \
		"lws-minimal-server-echo", \
		callback_minimal_server_echo, \
		sizeof(struct per_session_data__minimal_server_echo), \
		1024, \
		0, \
		NULL, \
		0 \
	}
233 
234 #if !defined (LWS_PLUGIN_STATIC)
235 
236 /* boilerplate needed if we are built as a dynamic plugin */
237 
238 static const struct lws_protocols protocols[] = {
239 	LWS_PLUGIN_PROTOCOL_MINIMAL_SERVER_ECHO
240 };
241 
242 int
init_protocol_minimal_server_echo(struct lws_context * context,struct lws_plugin_capability * c)243 init_protocol_minimal_server_echo(struct lws_context *context,
244 			       struct lws_plugin_capability *c)
245 {
246 	if (c->api_magic != LWS_PLUGIN_API_MAGIC) {
247 		lwsl_err("Plugin API %d, library API %d", LWS_PLUGIN_API_MAGIC,
248 			 c->api_magic);
249 		return 1;
250 	}
251 
252 	c->protocols = protocols;
253 	c->count_protocols = LWS_ARRAY_SIZE(protocols);
254 	c->extensions = NULL;
255 	c->count_extensions = 0;
256 
257 	return 0;
258 }
259 
/*
 * Plugin teardown entry point.  This plugin has nothing of its own to release
 * here, so it always reports success.
 *
 * context: unused
 *
 * Returns 0.
 */
int
destroy_protocol_minimal_server_echo(struct lws_context *context)
{
	(void)context;

	return 0;
}
265 #endif
266