1 /*
2  * libhdfs engine
3  *
 * This engine performs read/write operations on an HDFS cluster using
 * libhdfs. HDFS does not support modifying data once a file has been
 * created.
 *
 * To work around that, each fio file is stored as many small HDFS files
 * ("chunks", e.g. 256k each), and the engine selects the chunk to use based
 * on the offset generated by fio.
 *
 * Thus, random reads and writes can also be achieved with this logic,
 * although writes inside a single chunk still have to be sequential.
11  *
12  */
13 
#include <errno.h>
#include <inttypes.h>
#include <math.h>
#include <hdfs.h>

#include "../fio.h"
#include "../optgroup.h"
19 
/*
 * Sizing constants for chunk handling.
 *   CHUNCK_NAME_LENGTH_MAX      - buffer size (terminating NUL included) used
 *                                 for chunk path names built by
 *                                 get_chunck_name().
 *   CHUNCK_CREATION_BUFFER_SIZE - size of the zero-filled buffer written
 *                                 piecewise when chunks are created.
 */
enum {
	CHUNCK_NAME_LENGTH_MAX		= 80,
	CHUNCK_CREATION_BUFFER_SIZE	= 65536,
};
22 
23 struct hdfsio_data {
24 	hdfsFS fs;
25 	hdfsFile fp;
26 	uint64_t curr_file_id;
27 };
28 
/* Engine-specific job options, filled in by fio from the options[] table. */
struct hdfsio_options {
	/*
	 * Placeholder: an option described with offsetof() must not have
	 * offset 0, so no real option may be the first member.
	 */
	void *pad;
	char *host;			/* namenode host ("namenode"/"hostname") */
	char *directory;		/* HDFS working directory ("hdfsdirectory") */
	unsigned int port;		/* namenode port ("port") */
	unsigned int chunck_size;	/* bytes per chunk file ("chunk_size") */
	unsigned int single_instance;	/* 0: force a new HDFS instance on connect */
	unsigned int use_direct;	/* read via readDirect() instead of hdfsRead() */
};
38 
39 static struct fio_option options[] = {
40 	{
41 		.name	= "namenode",
42 		.lname	= "hfds namenode",
43 		.type	= FIO_OPT_STR_STORE,
44 		.off1   = offsetof(struct hdfsio_options, host),
45 		.def    = "localhost",
46 		.help	= "Namenode of the HDFS cluster",
47 		.category = FIO_OPT_C_ENGINE,
48 		.group	= FIO_OPT_G_HDFS,
49 	},
50 	{
51 		.name	= "hostname",
52 		.lname	= "hfds namenode",
53 		.type	= FIO_OPT_STR_STORE,
54 		.off1   = offsetof(struct hdfsio_options, host),
55 		.def    = "localhost",
56 		.help	= "Namenode of the HDFS cluster",
57 		.category = FIO_OPT_C_ENGINE,
58 		.group	= FIO_OPT_G_HDFS,
59 	},
60 	{
61 		.name	= "port",
62 		.lname	= "hdfs namenode port",
63 		.type	= FIO_OPT_INT,
64 		.off1	= offsetof(struct hdfsio_options, port),
65 		.def    = "9000",
66 		.minval	= 1,
67 		.maxval	= 65535,
68 		.help	= "Port used by the HDFS cluster namenode",
69 		.category = FIO_OPT_C_ENGINE,
70 		.group	= FIO_OPT_G_HDFS,
71 	},
72 	{
73 		.name	= "hdfsdirectory",
74 		.lname	= "hfds directory",
75 		.type	= FIO_OPT_STR_STORE,
76 		.off1   = offsetof(struct hdfsio_options, directory),
77 		.def    = "/",
78 		.help	= "The HDFS directory where fio will create chuncks",
79 		.category = FIO_OPT_C_ENGINE,
80 		.group	= FIO_OPT_G_HDFS,
81 	},
82 	{
83 		.name	= "chunk_size",
84 		.alias	= "chunck_size",
85 		.lname	= "Chunk size",
86 		.type	= FIO_OPT_INT,
87 		.off1	= offsetof(struct hdfsio_options, chunck_size),
88 		.def    = "1048576",
89 		.help	= "Size of individual chunck",
90 		.category = FIO_OPT_C_ENGINE,
91 		.group	= FIO_OPT_G_HDFS,
92 	},
93 	{
94 		.name	= "single_instance",
95 		.lname	= "Single Instance",
96 		.type	= FIO_OPT_BOOL,
97 		.off1	= offsetof(struct hdfsio_options, single_instance),
98 		.def    = "1",
99 		.help	= "Use a single instance",
100 		.category = FIO_OPT_C_ENGINE,
101 		.group	= FIO_OPT_G_HDFS,
102 	},
103 	{
104 		.name	= "hdfs_use_direct",
105 		.lname	= "HDFS Use Direct",
106 		.type	= FIO_OPT_BOOL,
107 		.off1	= offsetof(struct hdfsio_options, use_direct),
108 		.def    = "0",
109 		.help	= "Use readDirect instead of hdfsRead",
110 		.category = FIO_OPT_C_ENGINE,
111 		.group	= FIO_OPT_G_HDFS,
112 	},
113 	{
114 		.name	= NULL,
115 	},
116 };
117 
118 
get_chunck_name(char * dest,char * file_name,uint64_t chunk_id)119 static int get_chunck_name(char *dest, char *file_name, uint64_t chunk_id) {
120 	return snprintf(dest, CHUNCK_NAME_LENGTH_MAX, "%s_%lu", file_name, chunk_id);
121 }
122 
fio_hdfsio_prep(struct thread_data * td,struct io_u * io_u)123 static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
124 {
125 	struct hdfsio_options *options = td->eo;
126 	struct hdfsio_data *hd = td->io_ops_data;
127 	unsigned long f_id;
128 	char fname[CHUNCK_NAME_LENGTH_MAX];
129 	int open_flags;
130 
131 	/* find out file id based on the offset generated by fio */
132 	f_id = floor(io_u->offset / options-> chunck_size);
133 
134 	if (f_id == hd->curr_file_id) {
135 		/* file is already open */
136 		return 0;
137 	}
138 
139 	if (hd->curr_file_id != -1) {
140 		if ( hdfsCloseFile(hd->fs, hd->fp) == -1) {
141 			log_err("hdfs: unable to close file: %s\n", strerror(errno));
142 			return errno;
143 		}
144 		hd->curr_file_id = -1;
145 	}
146 
147 	if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_SYNC) {
148 		open_flags = O_RDONLY;
149 	} else if (io_u->ddir == DDIR_WRITE) {
150 		open_flags = O_WRONLY;
151 	} else {
152 		log_err("hdfs: Invalid I/O Operation\n");
153 		return 0;
154 	}
155 
156 	get_chunck_name(fname, io_u->file->file_name, f_id);
157 	hd->fp = hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
158 			      options->chunck_size);
159 	if(hd->fp == NULL) {
160 		log_err("hdfs: unable to open file: %s: %d\n", fname, strerror(errno));
161 		return errno;
162 	}
163 	hd->curr_file_id = f_id;
164 
165 	return 0;
166 }
167 
fio_hdfsio_queue(struct thread_data * td,struct io_u * io_u)168 static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u)
169 {
170 	struct hdfsio_data *hd = td->io_ops_data;
171 	struct hdfsio_options *options = td->eo;
172 	int ret;
173 	unsigned long offset;
174 
175 	offset = io_u->offset % options->chunck_size;
176 
177 	if( (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) &&
178 	     hdfsTell(hd->fs, hd->fp) != offset && hdfsSeek(hd->fs, hd->fp, offset) != 0 ) {
179 		log_err("hdfs: seek failed: %s, are you doing random write smaller than chunck size ?\n", strerror(errno));
180 		io_u->error = errno;
181 		return FIO_Q_COMPLETED;
182 	};
183 
184 	// do the IO
185 	if (io_u->ddir == DDIR_READ) {
186 		if (options->use_direct) {
187 			ret = readDirect(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
188 		} else {
189 			ret = hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
190 		}
191 	} else if (io_u->ddir == DDIR_WRITE) {
192 		ret = hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
193 				io_u->xfer_buflen);
194 	} else if (io_u->ddir == DDIR_SYNC) {
195 		ret = hdfsFlush(hd->fs, hd->fp);
196 	} else {
197 		log_err("hdfs: Invalid I/O Operation: %d\n", io_u->ddir);
198 		ret = EINVAL;
199 	}
200 
201 	// Check if the IO went fine, or is incomplete
202 	if (ret != (int)io_u->xfer_buflen) {
203 		if (ret >= 0) {
204 			io_u->resid = io_u->xfer_buflen - ret;
205 			io_u->error = 0;
206 			return FIO_Q_COMPLETED;
207 		} else {
208 			io_u->error = errno;
209 		}
210 	}
211 
212 	if (io_u->error)
213 		td_verror(td, io_u->error, "xfer");
214 
215 	return FIO_Q_COMPLETED;
216 }
217 
fio_hdfsio_open_file(struct thread_data * td,struct fio_file * f)218 int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
219 {
220 	if (td->o.odirect) {
221 		td->error = EINVAL;
222 		return 0;
223 	}
224 
225 	return 0;
226 }
227 
fio_hdfsio_close_file(struct thread_data * td,struct fio_file * f)228 int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
229 {
230 	struct hdfsio_data *hd = td->io_ops_data;
231 
232 	if (hd->curr_file_id != -1) {
233 		if ( hdfsCloseFile(hd->fs, hd->fp) == -1) {
234 			log_err("hdfs: unable to close file: %s\n", strerror(errno));
235 			return errno;
236 		}
237 		hd->curr_file_id = -1;
238 	}
239 	return 0;
240 }
241 
fio_hdfsio_init(struct thread_data * td)242 static int fio_hdfsio_init(struct thread_data *td)
243 {
244 	struct hdfsio_options *options = td->eo;
245 	struct hdfsio_data *hd = td->io_ops_data;
246 	struct fio_file *f;
247 	uint64_t j,k;
248 	int i, failure = 0;
249 	uint8_t buffer[CHUNCK_CREATION_BUFFER_SIZE];
250 	uint64_t bytes_left;
251 	char fname[CHUNCK_NAME_LENGTH_MAX];
252 	hdfsFile fp;
253 	hdfsFileInfo *fi;
254 	tOffset fi_size;
255 
256 	for_each_file(td, f, i) {
257 		k = 0;
258 		for(j=0; j < f->real_file_size; j += options->chunck_size) {
259 			get_chunck_name(fname, f->file_name, k++);
260 			fi = hdfsGetPathInfo(hd->fs, fname);
261 			fi_size = fi ? fi->mSize : 0;
262 			// fill exist and is big enough, nothing to do
263 			if( fi && fi_size >= options->chunck_size) {
264 				continue;
265 			}
266 			fp = hdfsOpenFile(hd->fs, fname, O_WRONLY, 0, 0,
267 					  options->chunck_size);
268 			if(fp == NULL) {
269 				failure = errno;
270 				log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
271 				break;
272 			}
273 			bytes_left = options->chunck_size;
274 			memset(buffer, 0, CHUNCK_CREATION_BUFFER_SIZE);
275 			while( bytes_left > CHUNCK_CREATION_BUFFER_SIZE) {
276 				if( hdfsWrite(hd->fs, fp, buffer, CHUNCK_CREATION_BUFFER_SIZE)
277 				    != CHUNCK_CREATION_BUFFER_SIZE) {
278     					failure = errno;
279 	    				log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
280 					break;
281 				};
282 				bytes_left -= CHUNCK_CREATION_BUFFER_SIZE;
283 			}
284 			if(bytes_left > 0) {
285 				if( hdfsWrite(hd->fs, fp, buffer, bytes_left)
286 				    != bytes_left) {
287 					failure = errno;
288 					break;
289 				};
290 			}
291 			if( hdfsCloseFile(hd->fs, fp) != 0) {
292 				failure = errno;
293 				log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
294 				break;
295 			}
296 		}
297 		if(failure) {
298 			break;
299 		}
300 	}
301 
302 	if( !failure ) {
303 		fio_file_set_size_known(f);
304 	}
305 
306 	return failure;
307 }
308 
fio_hdfsio_setup(struct thread_data * td)309 static int fio_hdfsio_setup(struct thread_data *td)
310 {
311 	struct hdfsio_data *hd;
312 	struct fio_file *f;
313 	int i;
314 	uint64_t file_size, total_file_size;
315 
316 	if (!td->io_ops_data) {
317 		hd = malloc(sizeof(*hd));
318 		memset(hd, 0, sizeof(*hd));
319 
320 		hd->curr_file_id = -1;
321 
322 		td->io_ops_data = hd;
323 	}
324 
325 	total_file_size = 0;
326 	file_size = 0;
327 
328 	for_each_file(td, f, i) {
329 		if(!td->o.file_size_low) {
330 			file_size = floor(td->o.size / td->o.nr_files);
331 			total_file_size += file_size;
332 		}
333 		else if (td->o.file_size_low == td->o.file_size_high)
334 			file_size = td->o.file_size_low;
335 		else {
336 			file_size = get_rand_file_size(td);
337 		}
338 		f->real_file_size = file_size;
339 	}
340 	/* If the size doesn't divide nicely with the chunck size,
341 	 * make the last files bigger.
342 	 * Used only if filesize was not explicitely given
343 	 */
344 	if (!td->o.file_size_low && total_file_size < td->o.size) {
345 		f->real_file_size += (td->o.size - total_file_size);
346 	}
347 
348 	return 0;
349 }
350 
fio_hdfsio_io_u_init(struct thread_data * td,struct io_u * io_u)351 static int fio_hdfsio_io_u_init(struct thread_data *td, struct io_u *io_u)
352 {
353 	struct hdfsio_data *hd = td->io_ops_data;
354 	struct hdfsio_options *options = td->eo;
355 	int failure;
356 	struct hdfsBuilder *bld;
357 
358 	if (options->host == NULL || options->port == 0) {
359 		log_err("hdfs: server not defined\n");
360 		return EINVAL;
361 	}
362 
363 	bld = hdfsNewBuilder();
364 	if (!bld) {
365 		failure = errno;
366 		log_err("hdfs: unable to allocate connect builder\n");
367 		return failure;
368 	}
369 	hdfsBuilderSetNameNode(bld, options->host);
370 	hdfsBuilderSetNameNodePort(bld, options->port);
371 	if(! options->single_instance) {
372 		hdfsBuilderSetForceNewInstance(bld);
373 	}
374 	hd->fs = hdfsBuilderConnect(bld);
375 
376 	/* hdfsSetWorkingDirectory succeed on non existend directory */
377 	if (hdfsExists(hd->fs, options->directory) < 0 || hdfsSetWorkingDirectory(hd->fs, options->directory) < 0) {
378 		failure = errno;
379 		log_err("hdfs: invalid working directory %s: %s\n", options->directory, strerror(errno));
380 		return failure;
381 	}
382 
383 	return 0;
384 }
385 
fio_hdfsio_io_u_free(struct thread_data * td,struct io_u * io_u)386 static void fio_hdfsio_io_u_free(struct thread_data *td, struct io_u *io_u)
387 {
388 	struct hdfsio_data *hd = td->io_ops_data;
389 
390 	if (hd->fs && hdfsDisconnect(hd->fs) < 0) {
391 		log_err("hdfs: disconnect failed: %d\n", errno);
392 	}
393 }
394 
395 static struct ioengine_ops ioengine_hdfs = {
396 	.name = "libhdfs",
397 	.version = FIO_IOOPS_VERSION,
398 	.flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_NODISKUTIL,
399 	.setup = fio_hdfsio_setup,
400 	.init = fio_hdfsio_init,
401 	.prep = fio_hdfsio_prep,
402 	.queue = fio_hdfsio_queue,
403 	.open_file = fio_hdfsio_open_file,
404 	.close_file = fio_hdfsio_close_file,
405 	.io_u_init = fio_hdfsio_io_u_init,
406 	.io_u_free = fio_hdfsio_io_u_free,
407 	.option_struct_size	= sizeof(struct hdfsio_options),
408 	.options		= options,
409 };
410 
411 
fio_hdfsio_register(void)412 static void fio_init fio_hdfsio_register(void)
413 {
414 	register_ioengine(&ioengine_hdfs);
415 }
416 
fio_hdfsio_unregister(void)417 static void fio_exit fio_hdfsio_unregister(void)
418 {
419 	unregister_ioengine(&ioengine_hdfs);
420 }
421