1 /***************************************************************************
2 * _ _ ____ _
3 * Project ___| | | | _ \| |
4 * / __| | | | |_) | |
5 * | (__| |_| | _ <| |___
6 * \___|\___/|_| \_\_____|
7 *
8 * Copyright (C) 1998 - 2019, Daniel Stenberg, <daniel@haxx.se>, et al.
9 *
10 * This software is licensed as described in the file COPYING, which
11 * you should have received as part of this distribution. The terms
12 * are also available at https://curl.haxx.se/docs/copyright.html.
13 *
14 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
15 * copies of the Software, and permit persons to whom the Software is
16 * furnished to do so, under the terms of the COPYING file.
17 *
18 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
19 * KIND, either express or implied.
20 *
21 ***************************************************************************/
22 /* <DESC>
23 * multi socket API usage together with with glib2
24 * </DESC>
25 */
26 /* Example application source code using the multi socket interface to
27 * download many files at once.
28 *
29 * Written by Jeff Pohlmeyer
30
31 Requires glib-2.x and a (POSIX?) system that has mkfifo().
32
33 This is an adaptation of libcurl's "hipev.c" and libevent's "event-test.c"
34 sample programs, adapted to use glib's g_io_channel in place of libevent.
35
36 When running, the program creates the named pipe "hiper.fifo"
37
38 Whenever there is input into the fifo, the program reads the input as a list
39 of URL's and creates some new easy handles to fetch each URL via the
40 curl_multi "hiper" API.
41
42
43 Thus, you can try a single URL:
44 % echo http://www.yahoo.com > hiper.fifo
45
46 Or a whole bunch of them:
47 % cat my-url-list > hiper.fifo
48
49 The fifo buffer is handled almost instantly, so you can even add more URL's
50 while the previous requests are still being downloaded.
51
52 This is purely a demo app, all retrieved data is simply discarded by the write
53 callback.
54
55 */
56
57 #include <glib.h>
58 #include <sys/stat.h>
59 #include <unistd.h>
60 #include <fcntl.h>
61 #include <stdlib.h>
62 #include <stdio.h>
63 #include <errno.h>
64 #include <curl/curl.h>
65
66 #define MSG_OUT g_print /* Change to "g_error" to write to stderr */
67 #define SHOW_VERBOSE 0 /* Set to non-zero for libcurl messages */
68 #define SHOW_PROGRESS 0 /* Set to non-zero to enable progress callback */
69
70 /* Global information, common to all connections */
71 typedef struct _GlobalInfo {
72 CURLM *multi;
73 guint timer_event;
74 int still_running;
75 } GlobalInfo;
76
77 /* Information associated with a specific easy handle */
78 typedef struct _ConnInfo {
79 CURL *easy;
80 char *url;
81 GlobalInfo *global;
82 char error[CURL_ERROR_SIZE];
83 } ConnInfo;
84
85 /* Information associated with a specific socket */
86 typedef struct _SockInfo {
87 curl_socket_t sockfd;
88 CURL *easy;
89 int action;
90 long timeout;
91 GIOChannel *ch;
92 guint ev;
93 GlobalInfo *global;
94 } SockInfo;
95
96 /* Die if we get a bad CURLMcode somewhere */
mcode_or_die(const char * where,CURLMcode code)97 static void mcode_or_die(const char *where, CURLMcode code)
98 {
99 if(CURLM_OK != code) {
100 const char *s;
101 switch(code) {
102 case CURLM_BAD_HANDLE: s = "CURLM_BAD_HANDLE"; break;
103 case CURLM_BAD_EASY_HANDLE: s = "CURLM_BAD_EASY_HANDLE"; break;
104 case CURLM_OUT_OF_MEMORY: s = "CURLM_OUT_OF_MEMORY"; break;
105 case CURLM_INTERNAL_ERROR: s = "CURLM_INTERNAL_ERROR"; break;
106 case CURLM_BAD_SOCKET: s = "CURLM_BAD_SOCKET"; break;
107 case CURLM_UNKNOWN_OPTION: s = "CURLM_UNKNOWN_OPTION"; break;
108 case CURLM_LAST: s = "CURLM_LAST"; break;
109 default: s = "CURLM_unknown";
110 }
111 MSG_OUT("ERROR: %s returns %s\n", where, s);
112 exit(code);
113 }
114 }
115
116 /* Check for completed transfers, and remove their easy handles */
check_multi_info(GlobalInfo * g)117 static void check_multi_info(GlobalInfo *g)
118 {
119 char *eff_url;
120 CURLMsg *msg;
121 int msgs_left;
122 ConnInfo *conn;
123 CURL *easy;
124 CURLcode res;
125
126 MSG_OUT("REMAINING: %d\n", g->still_running);
127 while((msg = curl_multi_info_read(g->multi, &msgs_left))) {
128 if(msg->msg == CURLMSG_DONE) {
129 easy = msg->easy_handle;
130 res = msg->data.result;
131 curl_easy_getinfo(easy, CURLINFO_PRIVATE, &conn);
132 curl_easy_getinfo(easy, CURLINFO_EFFECTIVE_URL, &eff_url);
133 MSG_OUT("DONE: %s => (%d) %s\n", eff_url, res, conn->error);
134 curl_multi_remove_handle(g->multi, easy);
135 free(conn->url);
136 curl_easy_cleanup(easy);
137 free(conn);
138 }
139 }
140 }
141
142 /* Called by glib when our timeout expires */
timer_cb(gpointer data)143 static gboolean timer_cb(gpointer data)
144 {
145 GlobalInfo *g = (GlobalInfo *)data;
146 CURLMcode rc;
147
148 rc = curl_multi_socket_action(g->multi,
149 CURL_SOCKET_TIMEOUT, 0, &g->still_running);
150 mcode_or_die("timer_cb: curl_multi_socket_action", rc);
151 check_multi_info(g);
152 return FALSE;
153 }
154
155 /* Update the event timer after curl_multi library calls */
update_timeout_cb(CURLM * multi,long timeout_ms,void * userp)156 static int update_timeout_cb(CURLM *multi, long timeout_ms, void *userp)
157 {
158 struct timeval timeout;
159 GlobalInfo *g = (GlobalInfo *)userp;
160 timeout.tv_sec = timeout_ms/1000;
161 timeout.tv_usec = (timeout_ms%1000)*1000;
162
163 MSG_OUT("*** update_timeout_cb %ld => %ld:%ld ***\n",
164 timeout_ms, timeout.tv_sec, timeout.tv_usec);
165
166 /*
167 * if timeout_ms is -1, just delete the timer
168 *
169 * For other values of timeout_ms, this should set or *update* the timer to
170 * the new value
171 */
172 if(timeout_ms >= 0)
173 g->timer_event = g_timeout_add(timeout_ms, timer_cb, g);
174 return 0;
175 }
176
177 /* Called by glib when we get action on a multi socket */
event_cb(GIOChannel * ch,GIOCondition condition,gpointer data)178 static gboolean event_cb(GIOChannel *ch, GIOCondition condition, gpointer data)
179 {
180 GlobalInfo *g = (GlobalInfo*) data;
181 CURLMcode rc;
182 int fd = g_io_channel_unix_get_fd(ch);
183
184 int action =
185 (condition & G_IO_IN ? CURL_CSELECT_IN : 0) |
186 (condition & G_IO_OUT ? CURL_CSELECT_OUT : 0);
187
188 rc = curl_multi_socket_action(g->multi, fd, action, &g->still_running);
189 mcode_or_die("event_cb: curl_multi_socket_action", rc);
190
191 check_multi_info(g);
192 if(g->still_running) {
193 return TRUE;
194 }
195 else {
196 MSG_OUT("last transfer done, kill timeout\n");
197 if(g->timer_event) {
198 g_source_remove(g->timer_event);
199 }
200 return FALSE;
201 }
202 }
203
204 /* Clean up the SockInfo structure */
remsock(SockInfo * f)205 static void remsock(SockInfo *f)
206 {
207 if(!f) {
208 return;
209 }
210 if(f->ev) {
211 g_source_remove(f->ev);
212 }
213 g_free(f);
214 }
215
216 /* Assign information to a SockInfo structure */
setsock(SockInfo * f,curl_socket_t s,CURL * e,int act,GlobalInfo * g)217 static void setsock(SockInfo *f, curl_socket_t s, CURL *e, int act,
218 GlobalInfo *g)
219 {
220 GIOCondition kind =
221 (act&CURL_POLL_IN?G_IO_IN:0)|(act&CURL_POLL_OUT?G_IO_OUT:0);
222
223 f->sockfd = s;
224 f->action = act;
225 f->easy = e;
226 if(f->ev) {
227 g_source_remove(f->ev);
228 }
229 f->ev = g_io_add_watch(f->ch, kind, event_cb, g);
230 }
231
232 /* Initialize a new SockInfo structure */
addsock(curl_socket_t s,CURL * easy,int action,GlobalInfo * g)233 static void addsock(curl_socket_t s, CURL *easy, int action, GlobalInfo *g)
234 {
235 SockInfo *fdp = g_malloc0(sizeof(SockInfo));
236
237 fdp->global = g;
238 fdp->ch = g_io_channel_unix_new(s);
239 setsock(fdp, s, easy, action, g);
240 curl_multi_assign(g->multi, s, fdp);
241 }
242
243 /* CURLMOPT_SOCKETFUNCTION */
sock_cb(CURL * e,curl_socket_t s,int what,void * cbp,void * sockp)244 static int sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp)
245 {
246 GlobalInfo *g = (GlobalInfo*) cbp;
247 SockInfo *fdp = (SockInfo*) sockp;
248 static const char *whatstr[]={ "none", "IN", "OUT", "INOUT", "REMOVE" };
249
250 MSG_OUT("socket callback: s=%d e=%p what=%s ", s, e, whatstr[what]);
251 if(what == CURL_POLL_REMOVE) {
252 MSG_OUT("\n");
253 remsock(fdp);
254 }
255 else {
256 if(!fdp) {
257 MSG_OUT("Adding data: %s%s\n",
258 what&CURL_POLL_IN?"READ":"",
259 what&CURL_POLL_OUT?"WRITE":"");
260 addsock(s, e, what, g);
261 }
262 else {
263 MSG_OUT(
264 "Changing action from %d to %d\n", fdp->action, what);
265 setsock(fdp, s, e, what, g);
266 }
267 }
268 return 0;
269 }
270
271 /* CURLOPT_WRITEFUNCTION */
write_cb(void * ptr,size_t size,size_t nmemb,void * data)272 static size_t write_cb(void *ptr, size_t size, size_t nmemb, void *data)
273 {
274 size_t realsize = size * nmemb;
275 ConnInfo *conn = (ConnInfo*) data;
276 (void)ptr;
277 (void)conn;
278 return realsize;
279 }
280
281 /* CURLOPT_PROGRESSFUNCTION */
prog_cb(void * p,double dltotal,double dlnow,double ult,double uln)282 static int prog_cb(void *p, double dltotal, double dlnow, double ult,
283 double uln)
284 {
285 ConnInfo *conn = (ConnInfo *)p;
286 MSG_OUT("Progress: %s (%g/%g)\n", conn->url, dlnow, dltotal);
287 return 0;
288 }
289
290 /* Create a new easy handle, and add it to the global curl_multi */
new_conn(char * url,GlobalInfo * g)291 static void new_conn(char *url, GlobalInfo *g)
292 {
293 ConnInfo *conn;
294 CURLMcode rc;
295
296 conn = g_malloc0(sizeof(ConnInfo));
297 conn->error[0]='\0';
298 conn->easy = curl_easy_init();
299 if(!conn->easy) {
300 MSG_OUT("curl_easy_init() failed, exiting!\n");
301 exit(2);
302 }
303 conn->global = g;
304 conn->url = g_strdup(url);
305 curl_easy_setopt(conn->easy, CURLOPT_URL, conn->url);
306 curl_easy_setopt(conn->easy, CURLOPT_WRITEFUNCTION, write_cb);
307 curl_easy_setopt(conn->easy, CURLOPT_WRITEDATA, &conn);
308 curl_easy_setopt(conn->easy, CURLOPT_VERBOSE, (long)SHOW_VERBOSE);
309 curl_easy_setopt(conn->easy, CURLOPT_ERRORBUFFER, conn->error);
310 curl_easy_setopt(conn->easy, CURLOPT_PRIVATE, conn);
311 curl_easy_setopt(conn->easy, CURLOPT_NOPROGRESS, SHOW_PROGRESS?0L:1L);
312 curl_easy_setopt(conn->easy, CURLOPT_PROGRESSFUNCTION, prog_cb);
313 curl_easy_setopt(conn->easy, CURLOPT_PROGRESSDATA, conn);
314 curl_easy_setopt(conn->easy, CURLOPT_FOLLOWLOCATION, 1L);
315 curl_easy_setopt(conn->easy, CURLOPT_CONNECTTIMEOUT, 30L);
316 curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_LIMIT, 1L);
317 curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_TIME, 30L);
318
319 MSG_OUT("Adding easy %p to multi %p (%s)\n", conn->easy, g->multi, url);
320 rc = curl_multi_add_handle(g->multi, conn->easy);
321 mcode_or_die("new_conn: curl_multi_add_handle", rc);
322
323 /* note that the add_handle() will set a time-out to trigger very soon so
324 that the necessary socket_action() call will be called by this app */
325 }
326
327 /* This gets called by glib whenever data is received from the fifo */
fifo_cb(GIOChannel * ch,GIOCondition condition,gpointer data)328 static gboolean fifo_cb(GIOChannel *ch, GIOCondition condition, gpointer data)
329 {
330 #define BUF_SIZE 1024
331 gsize len, tp;
332 gchar *buf, *tmp, *all = NULL;
333 GIOStatus rv;
334
335 do {
336 GError *err = NULL;
337 rv = g_io_channel_read_line(ch, &buf, &len, &tp, &err);
338 if(buf) {
339 if(tp) {
340 buf[tp]='\0';
341 }
342 new_conn(buf, (GlobalInfo*)data);
343 g_free(buf);
344 }
345 else {
346 buf = g_malloc(BUF_SIZE + 1);
347 while(TRUE) {
348 buf[BUF_SIZE]='\0';
349 g_io_channel_read_chars(ch, buf, BUF_SIZE, &len, &err);
350 if(len) {
351 buf[len]='\0';
352 if(all) {
353 tmp = all;
354 all = g_strdup_printf("%s%s", tmp, buf);
355 g_free(tmp);
356 }
357 else {
358 all = g_strdup(buf);
359 }
360 }
361 else {
362 break;
363 }
364 }
365 if(all) {
366 new_conn(all, (GlobalInfo*)data);
367 g_free(all);
368 }
369 g_free(buf);
370 }
371 if(err) {
372 g_error("fifo_cb: %s", err->message);
373 g_free(err);
374 break;
375 }
376 } while((len) && (rv == G_IO_STATUS_NORMAL));
377 return TRUE;
378 }
379
init_fifo(void)380 int init_fifo(void)
381 {
382 struct stat st;
383 const char *fifo = "hiper.fifo";
384 int socket;
385
386 if(lstat (fifo, &st) == 0) {
387 if((st.st_mode & S_IFMT) == S_IFREG) {
388 errno = EEXIST;
389 perror("lstat");
390 exit(1);
391 }
392 }
393
394 unlink(fifo);
395 if(mkfifo (fifo, 0600) == -1) {
396 perror("mkfifo");
397 exit(1);
398 }
399
400 socket = open(fifo, O_RDWR | O_NONBLOCK, 0);
401
402 if(socket == -1) {
403 perror("open");
404 exit(1);
405 }
406 MSG_OUT("Now, pipe some URL's into > %s\n", fifo);
407
408 return socket;
409 }
410
main(int argc,char ** argv)411 int main(int argc, char **argv)
412 {
413 GlobalInfo *g;
414 CURLMcode rc;
415 GMainLoop*gmain;
416 int fd;
417 GIOChannel* ch;
418 g = g_malloc0(sizeof(GlobalInfo));
419
420 fd = init_fifo();
421 ch = g_io_channel_unix_new(fd);
422 g_io_add_watch(ch, G_IO_IN, fifo_cb, g);
423 gmain = g_main_loop_new(NULL, FALSE);
424 g->multi = curl_multi_init();
425 curl_multi_setopt(g->multi, CURLMOPT_SOCKETFUNCTION, sock_cb);
426 curl_multi_setopt(g->multi, CURLMOPT_SOCKETDATA, g);
427 curl_multi_setopt(g->multi, CURLMOPT_TIMERFUNCTION, update_timeout_cb);
428 curl_multi_setopt(g->multi, CURLMOPT_TIMERDATA, g);
429
430 /* we don't call any curl_multi_socket*() function yet as we have no handles
431 added! */
432
433 g_main_loop_run(gmain);
434 curl_multi_cleanup(g->multi);
435 return 0;
436 }
437