1 /***************************************************************************
2 * _ _ ____ _
3 * Project ___| | | | _ \| |
4 * / __| | | | |_) | |
5 * | (__| |_| | _ <| |___
6 * \___|\___/|_| \_\_____|
7 *
8 * Copyright (C) 1998 - 2014, Daniel Stenberg, <daniel@haxx.se>, et al.
9 *
10 * This software is licensed as described in the file COPYING, which
11 * you should have received as part of this distribution. The terms
12 * are also available at http://curl.haxx.se/docs/copyright.html.
13 *
14 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
15 * copies of the Software, and permit persons to whom the Software is
16 * furnished to do so, under the terms of the COPYING file.
17 *
18 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
19 * KIND, either express or implied.
20 *
21 ***************************************************************************/
22 /* Example application source code using the multi socket interface to
23 * download many files at once.
24 *
25 * Written by Jeff Pohlmeyer
26
27 Requires glib-2.x and a (POSIX?) system that has mkfifo().
28
29 This is an adaptation of libcurl's "hipev.c" and libevent's "event-test.c"
30 sample programs, adapted to use glib's g_io_channel in place of libevent.
31
32 When running, the program creates the named pipe "hiper.fifo"
33
Whenever there is input into the fifo, the program reads the input as a list
of URLs and creates a new easy handle to fetch each URL via the
curl_multi "hiper" API.
37
38
39 Thus, you can try a single URL:
40 % echo http://www.yahoo.com > hiper.fifo
41
42 Or a whole bunch of them:
43 % cat my-url-list > hiper.fifo
44
The fifo buffer is handled almost instantly, so you can even add more URLs
while the previous requests are still being downloaded.
47
48 This is purely a demo app, all retrieved data is simply discarded by the write
49 callback.
50
51 */
52
53
54 #include <glib.h>
55 #include <sys/stat.h>
56 #include <unistd.h>
57 #include <fcntl.h>
58 #include <stdlib.h>
59 #include <stdio.h>
60 #include <errno.h>
61 #include <curl/curl.h>
62
63
#define MSG_OUT g_print /* Routine used for every diagnostic message. NOTE(review): g_error is fatal in GLib, so use g_printerr rather than g_error to redirect output to stderr */
#define SHOW_VERBOSE 0 /* Set to non-zero for libcurl messages (passed to CURLOPT_VERBOSE) */
#define SHOW_PROGRESS 0 /* Set to non-zero to enable progress callback (clears CURLOPT_NOPROGRESS) */
67
68
69
70 /* Global information, common to all connections */
71 typedef struct _GlobalInfo {
72 CURLM *multi;
73 guint timer_event;
74 int still_running;
75 } GlobalInfo;
76
77
78
79 /* Information associated with a specific easy handle */
80 typedef struct _ConnInfo {
81 CURL *easy;
82 char *url;
83 GlobalInfo *global;
84 char error[CURL_ERROR_SIZE];
85 } ConnInfo;
86
87
88 /* Information associated with a specific socket */
89 typedef struct _SockInfo {
90 curl_socket_t sockfd;
91 CURL *easy;
92 int action;
93 long timeout;
94 GIOChannel *ch;
95 guint ev;
96 GlobalInfo *global;
97 } SockInfo;
98
99
100
101
102 /* Die if we get a bad CURLMcode somewhere */
mcode_or_die(const char * where,CURLMcode code)103 static void mcode_or_die(const char *where, CURLMcode code) {
104 if ( CURLM_OK != code ) {
105 const char *s;
106 switch (code) {
107 case CURLM_BAD_HANDLE: s="CURLM_BAD_HANDLE"; break;
108 case CURLM_BAD_EASY_HANDLE: s="CURLM_BAD_EASY_HANDLE"; break;
109 case CURLM_OUT_OF_MEMORY: s="CURLM_OUT_OF_MEMORY"; break;
110 case CURLM_INTERNAL_ERROR: s="CURLM_INTERNAL_ERROR"; break;
111 case CURLM_BAD_SOCKET: s="CURLM_BAD_SOCKET"; break;
112 case CURLM_UNKNOWN_OPTION: s="CURLM_UNKNOWN_OPTION"; break;
113 case CURLM_LAST: s="CURLM_LAST"; break;
114 default: s="CURLM_unknown";
115 }
116 MSG_OUT("ERROR: %s returns %s\n", where, s);
117 exit(code);
118 }
119 }
120
121
122
123 /* Check for completed transfers, and remove their easy handles */
check_multi_info(GlobalInfo * g)124 static void check_multi_info(GlobalInfo *g)
125 {
126 char *eff_url;
127 CURLMsg *msg;
128 int msgs_left;
129 ConnInfo *conn;
130 CURL *easy;
131 CURLcode res;
132
133 MSG_OUT("REMAINING: %d\n", g->still_running);
134 while ((msg = curl_multi_info_read(g->multi, &msgs_left))) {
135 if (msg->msg == CURLMSG_DONE) {
136 easy = msg->easy_handle;
137 res = msg->data.result;
138 curl_easy_getinfo(easy, CURLINFO_PRIVATE, &conn);
139 curl_easy_getinfo(easy, CURLINFO_EFFECTIVE_URL, &eff_url);
140 MSG_OUT("DONE: %s => (%d) %s\n", eff_url, res, conn->error);
141 curl_multi_remove_handle(g->multi, easy);
142 free(conn->url);
143 curl_easy_cleanup(easy);
144 free(conn);
145 }
146 }
147 }
148
149
150
151 /* Called by glib when our timeout expires */
timer_cb(gpointer data)152 static gboolean timer_cb(gpointer data)
153 {
154 GlobalInfo *g = (GlobalInfo *)data;
155 CURLMcode rc;
156
157 rc = curl_multi_socket_action(g->multi,
158 CURL_SOCKET_TIMEOUT, 0, &g->still_running);
159 mcode_or_die("timer_cb: curl_multi_socket_action", rc);
160 check_multi_info(g);
161 return FALSE;
162 }
163
164
165
166 /* Update the event timer after curl_multi library calls */
update_timeout_cb(CURLM * multi,long timeout_ms,void * userp)167 static int update_timeout_cb(CURLM *multi, long timeout_ms, void *userp)
168 {
169 struct timeval timeout;
170 GlobalInfo *g=(GlobalInfo *)userp;
171 timeout.tv_sec = timeout_ms/1000;
172 timeout.tv_usec = (timeout_ms%1000)*1000;
173
174 MSG_OUT("*** update_timeout_cb %ld => %ld:%ld ***\n",
175 timeout_ms, timeout.tv_sec, timeout.tv_usec);
176
177 g->timer_event = g_timeout_add(timeout_ms, timer_cb, g);
178 return 0;
179 }
180
181
182
183
184 /* Called by glib when we get action on a multi socket */
event_cb(GIOChannel * ch,GIOCondition condition,gpointer data)185 static gboolean event_cb(GIOChannel *ch, GIOCondition condition, gpointer data)
186 {
187 GlobalInfo *g = (GlobalInfo*) data;
188 CURLMcode rc;
189 int fd=g_io_channel_unix_get_fd(ch);
190
191 int action =
192 (condition & G_IO_IN ? CURL_CSELECT_IN : 0) |
193 (condition & G_IO_OUT ? CURL_CSELECT_OUT : 0);
194
195 rc = curl_multi_socket_action(g->multi, fd, action, &g->still_running);
196 mcode_or_die("event_cb: curl_multi_socket_action", rc);
197
198 check_multi_info(g);
199 if(g->still_running) {
200 return TRUE;
201 } else {
202 MSG_OUT("last transfer done, kill timeout\n");
203 if (g->timer_event) { g_source_remove(g->timer_event); }
204 return FALSE;
205 }
206 }
207
208
209
210 /* Clean up the SockInfo structure */
remsock(SockInfo * f)211 static void remsock(SockInfo *f)
212 {
213 if (!f) { return; }
214 if (f->ev) { g_source_remove(f->ev); }
215 g_free(f);
216 }
217
218
219
220 /* Assign information to a SockInfo structure */
setsock(SockInfo * f,curl_socket_t s,CURL * e,int act,GlobalInfo * g)221 static void setsock(SockInfo*f, curl_socket_t s, CURL*e, int act, GlobalInfo*g)
222 {
223 GIOCondition kind =
224 (act&CURL_POLL_IN?G_IO_IN:0)|(act&CURL_POLL_OUT?G_IO_OUT:0);
225
226 f->sockfd = s;
227 f->action = act;
228 f->easy = e;
229 if (f->ev) { g_source_remove(f->ev); }
230 f->ev=g_io_add_watch(f->ch, kind, event_cb,g);
231
232 }
233
234
235
236 /* Initialize a new SockInfo structure */
addsock(curl_socket_t s,CURL * easy,int action,GlobalInfo * g)237 static void addsock(curl_socket_t s, CURL *easy, int action, GlobalInfo *g)
238 {
239 SockInfo *fdp = g_malloc0(sizeof(SockInfo));
240
241 fdp->global = g;
242 fdp->ch=g_io_channel_unix_new(s);
243 setsock(fdp, s, easy, action, g);
244 curl_multi_assign(g->multi, s, fdp);
245 }
246
247
248
249 /* CURLMOPT_SOCKETFUNCTION */
sock_cb(CURL * e,curl_socket_t s,int what,void * cbp,void * sockp)250 static int sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp)
251 {
252 GlobalInfo *g = (GlobalInfo*) cbp;
253 SockInfo *fdp = (SockInfo*) sockp;
254 static const char *whatstr[]={ "none", "IN", "OUT", "INOUT", "REMOVE" };
255
256 MSG_OUT("socket callback: s=%d e=%p what=%s ", s, e, whatstr[what]);
257 if (what == CURL_POLL_REMOVE) {
258 MSG_OUT("\n");
259 remsock(fdp);
260 } else {
261 if (!fdp) {
262 MSG_OUT("Adding data: %s%s\n",
263 what&CURL_POLL_IN?"READ":"",
264 what&CURL_POLL_OUT?"WRITE":"" );
265 addsock(s, e, what, g);
266 }
267 else {
268 MSG_OUT(
269 "Changing action from %d to %d\n", fdp->action, what);
270 setsock(fdp, s, e, what, g);
271 }
272 }
273 return 0;
274 }
275
276
277
/*
 * CURLOPT_WRITEFUNCTION: discard the received data.
 *
 * ptr:   received bytes (ignored).
 * size:  size of one element.
 * nmemb: number of elements.
 * data:  CURLOPT_WRITEDATA pointer (ignored).
 *
 * Returns size * nmemb, i.e. reports every byte as consumed.
 */
static size_t write_cb(void *ptr, size_t size, size_t nmemb, void *data)
{
  (void)ptr;
  (void)data;
  return size * nmemb;
}
287
288
289
290 /* CURLOPT_PROGRESSFUNCTION */
prog_cb(void * p,double dltotal,double dlnow,double ult,double uln)291 static int prog_cb (void *p, double dltotal, double dlnow, double ult, double uln)
292 {
293 ConnInfo *conn = (ConnInfo *)p;
294 MSG_OUT("Progress: %s (%g/%g)\n", conn->url, dlnow, dltotal);
295 return 0;
296 }
297
298
299
300 /* Create a new easy handle, and add it to the global curl_multi */
new_conn(char * url,GlobalInfo * g)301 static void new_conn(char *url, GlobalInfo *g )
302 {
303 ConnInfo *conn;
304 CURLMcode rc;
305
306 conn = g_malloc0(sizeof(ConnInfo));
307
308 conn->error[0]='\0';
309
310 conn->easy = curl_easy_init();
311 if (!conn->easy) {
312 MSG_OUT("curl_easy_init() failed, exiting!\n");
313 exit(2);
314 }
315 conn->global = g;
316 conn->url = g_strdup(url);
317 curl_easy_setopt(conn->easy, CURLOPT_URL, conn->url);
318 curl_easy_setopt(conn->easy, CURLOPT_WRITEFUNCTION, write_cb);
319 curl_easy_setopt(conn->easy, CURLOPT_WRITEDATA, &conn);
320 curl_easy_setopt(conn->easy, CURLOPT_VERBOSE, (long)SHOW_VERBOSE);
321 curl_easy_setopt(conn->easy, CURLOPT_ERRORBUFFER, conn->error);
322 curl_easy_setopt(conn->easy, CURLOPT_PRIVATE, conn);
323 curl_easy_setopt(conn->easy, CURLOPT_NOPROGRESS, SHOW_PROGRESS?0L:1L);
324 curl_easy_setopt(conn->easy, CURLOPT_PROGRESSFUNCTION, prog_cb);
325 curl_easy_setopt(conn->easy, CURLOPT_PROGRESSDATA, conn);
326 curl_easy_setopt(conn->easy, CURLOPT_FOLLOWLOCATION, 1L);
327 curl_easy_setopt(conn->easy, CURLOPT_CONNECTTIMEOUT, 30L);
328 curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_LIMIT, 1L);
329 curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_TIME, 30L);
330
331 MSG_OUT("Adding easy %p to multi %p (%s)\n", conn->easy, g->multi, url);
332 rc =curl_multi_add_handle(g->multi, conn->easy);
333 mcode_or_die("new_conn: curl_multi_add_handle", rc);
334
335 /* note that the add_handle() will set a time-out to trigger very soon so
336 that the necessary socket_action() call will be called by this app */
337 }
338
339
340 /* This gets called by glib whenever data is received from the fifo */
fifo_cb(GIOChannel * ch,GIOCondition condition,gpointer data)341 static gboolean fifo_cb (GIOChannel *ch, GIOCondition condition, gpointer data)
342 {
343 #define BUF_SIZE 1024
344 gsize len, tp;
345 gchar *buf, *tmp, *all=NULL;
346 GIOStatus rv;
347
348 do {
349 GError *err=NULL;
350 rv = g_io_channel_read_line (ch,&buf,&len,&tp,&err);
351 if ( buf ) {
352 if (tp) { buf[tp]='\0'; }
353 new_conn(buf,(GlobalInfo*)data);
354 g_free(buf);
355 } else {
356 buf = g_malloc(BUF_SIZE+1);
357 while (TRUE) {
358 buf[BUF_SIZE]='\0';
359 g_io_channel_read_chars(ch,buf,BUF_SIZE,&len,&err);
360 if (len) {
361 buf[len]='\0';
362 if (all) {
363 tmp=all;
364 all=g_strdup_printf("%s%s", tmp, buf);
365 g_free(tmp);
366 } else {
367 all = g_strdup(buf);
368 }
369 } else {
370 break;
371 }
372 }
373 if (all) {
374 new_conn(all,(GlobalInfo*)data);
375 g_free(all);
376 }
377 g_free(buf);
378 }
379 if ( err ) {
380 g_error("fifo_cb: %s", err->message);
381 g_free(err);
382 break;
383 }
384 } while ( (len) && (rv == G_IO_STATUS_NORMAL) );
385 return TRUE;
386 }
387
388
389
390
/*
 * Create the named pipe "hiper.fifo" in the current directory and open it.
 *
 * An existing entry with that name is unlinked first, unless it is a regular
 * file, in which case the program reports EEXIST and exits. Any failure to
 * create or open the fifo also exits with status 1.
 *
 * Returns the descriptor of the fifo, opened read/write and non-blocking.
 */
int init_fifo(void)
{
  static const char fifo_path[] = "hiper.fifo";
  struct stat info;
  int fd;

  /* never clobber a regular file that happens to use our name */
  if(lstat(fifo_path, &info) == 0 && S_ISREG(info.st_mode)) {
    errno = EEXIST;
    perror("lstat");
    exit(1);
  }

  unlink(fifo_path);
  if(mkfifo(fifo_path, 0600) < 0) {
    perror("mkfifo");
    exit(1);
  }

  fd = open(fifo_path, O_RDWR | O_NONBLOCK, 0);
  if(fd < 0) {
    perror("open");
    exit(1);
  }

  MSG_OUT("Now, pipe some URL's into > %s\n", fifo_path);
  return fd;
}
422
423
424
425
main(int argc,char ** argv)426 int main(int argc, char **argv)
427 {
428 GlobalInfo *g;
429 CURLMcode rc;
430 GMainLoop*gmain;
431 int fd;
432 GIOChannel* ch;
433 g=g_malloc0(sizeof(GlobalInfo));
434
435 fd=init_fifo();
436 ch=g_io_channel_unix_new(fd);
437 g_io_add_watch(ch,G_IO_IN,fifo_cb,g);
438 gmain=g_main_loop_new(NULL,FALSE);
439 g->multi = curl_multi_init();
440 curl_multi_setopt(g->multi, CURLMOPT_SOCKETFUNCTION, sock_cb);
441 curl_multi_setopt(g->multi, CURLMOPT_SOCKETDATA, g);
442 curl_multi_setopt(g->multi, CURLMOPT_TIMERFUNCTION, update_timeout_cb);
443 curl_multi_setopt(g->multi, CURLMOPT_TIMERDATA, g);
444
445 /* we don't call any curl_multi_socket*() function yet as we have no handles
446 added! */
447
448 g_main_loop_run(gmain);
449 curl_multi_cleanup(g->multi);
450 return 0;
451 }
452