/*
 * libhdfs engine
 *
 * This engine performs read/write operations on an HDFS cluster using
 * libhdfs. HDFS does not support modification of data once a file has been
 * created.
 *
 * To work around that, create many files of small size (e.g. 256k); this
 * engine selects a file based on the offset generated by fio.
 *
 * Thus, random reads and writes can also be achieved with this logic.
 *
 * NOTE: please set the environment variables FIO_HDFS_BS and FIO_HDFS_FCOUNT
 * to appropriate values for this engine to work properly.
 *
 */
16 
17 #include <stdio.h>
18 #include <stdlib.h>
19 #include <unistd.h>
20 #include <sys/uio.h>
21 #include <errno.h>
22 #include <assert.h>
23 
24 #include "../fio.h"
25 
26 #include "hdfs.h"
27 
28 struct hdfsio_data {
29 	char host[256];
30 	int port;
31 	hdfsFS fs;
32 	hdfsFile fp;
33 	unsigned long fsbs;
34 	unsigned long fscount;
35 	unsigned long curr_file_id;
36 	unsigned int numjobs;
37 	unsigned int fid_correction;
38 };
39 
fio_hdfsio_setup_fs_params(struct hdfsio_data * hd)40 static int fio_hdfsio_setup_fs_params(struct hdfsio_data *hd)
41 {
42 	/* make sure that hdfsConnect is invoked before executing this function */
43 	hdfsSetWorkingDirectory(hd->fs, "/.perftest");
44 	hd->fp = hdfsOpenFile(hd->fs, ".fcount", O_RDONLY, 0, 0, 0);
45 	if (hd->fp) {
46 		hdfsRead(hd->fs, hd->fp, &(hd->fscount), sizeof(hd->fscount));
47 		hdfsCloseFile(hd->fs, hd->fp);
48 	}
49 	hd->fp = hdfsOpenFile(hd->fs, ".fbs", O_RDONLY, 0, 0, 0);
50 	if (hd->fp) {
51 		hdfsRead(hd->fs, hd->fp, &(hd->fsbs), sizeof(hd->fsbs));
52 		hdfsCloseFile(hd->fs, hd->fp);
53 	}
54 
55 	return 0;
56 }
57 
fio_hdfsio_prep(struct thread_data * td,struct io_u * io_u)58 static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
59 {
60 	struct hdfsio_data *hd;
61 	hdfsFileInfo *fi;
62 	unsigned long f_id;
63 	char fname[80];
64 	int open_flags = 0;
65 
66 	hd = td->io_ops->data;
67 
68 	if (hd->curr_file_id == -1) {
69 		/* see comment in fio_hdfsio_setup() function */
70 		fio_hdfsio_setup_fs_params(hd);
71 	}
72 
73 	/* find out file id based on the offset generated by fio */
74 	f_id = (io_u->offset / hd->fsbs) + hd->fid_correction;
75 
76 	if (f_id == hd->curr_file_id) {
77 		/* file is already open */
78 		return 0;
79 	}
80 
81 	if (hd->curr_file_id != -1) {
82 		hdfsCloseFile(hd->fs, hd->fp);
83 	}
84 
85 	if (io_u->ddir == DDIR_READ) {
86 		open_flags = O_RDONLY;
87 	} else if (io_u->ddir == DDIR_WRITE) {
88 		open_flags = O_WRONLY;
89 	} else {
90 		log_err("hdfs: Invalid I/O Operation\n");
91 	}
92 
93 	hd->curr_file_id = f_id;
94 	do {
95 		sprintf(fname, ".f%lu", f_id);
96 		fi = hdfsGetPathInfo(hd->fs, fname);
97 		if (fi->mSize >= hd->fsbs || io_u->ddir == DDIR_WRITE) {
98 			/* file has enough data to read OR file is opened in write mode */
99 			hd->fp =
100 			    hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
101 					 hd->fsbs);
102 			if (hd->fp) {
103 				break;
104 			}
105 		}
106 		/* file is empty, so try next file for reading */
107 		f_id = (f_id + 1) % hd->fscount;
108 	} while (1);
109 
110 	return 0;
111 }
112 
fio_io_end(struct thread_data * td,struct io_u * io_u,int ret)113 static int fio_io_end(struct thread_data *td, struct io_u *io_u, int ret)
114 {
115 	if (ret != (int)io_u->xfer_buflen) {
116 		if (ret >= 0) {
117 			io_u->resid = io_u->xfer_buflen - ret;
118 			io_u->error = 0;
119 			return FIO_Q_COMPLETED;
120 		} else
121 			io_u->error = errno;
122 	}
123 
124 	if (io_u->error)
125 		td_verror(td, io_u->error, "xfer");
126 
127 	return FIO_Q_COMPLETED;
128 }
129 
fio_hdfsio_queue(struct thread_data * td,struct io_u * io_u)130 static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u)
131 {
132 	struct hdfsio_data *hd;
133 	int ret = 0;
134 
135 	hd = td->io_ops->data;
136 
137 	if (io_u->ddir == DDIR_READ) {
138 		ret =
139 		    hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
140 	} else if (io_u->ddir == DDIR_WRITE) {
141 		ret =
142 		    hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
143 			      io_u->xfer_buflen);
144 	} else {
145 		log_err("hdfs: Invalid I/O Operation\n");
146 	}
147 
148 	return fio_io_end(td, io_u, ret);
149 }
150 
fio_hdfsio_open_file(struct thread_data * td,struct fio_file * f)151 int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
152 {
153 	struct hdfsio_data *hd;
154 
155 	hd = td->io_ops->data;
156 	hd->fs = hdfsConnect(hd->host, hd->port);
157 	hdfsSetWorkingDirectory(hd->fs, "/.perftest");
158 	hd->fid_correction = (getpid() % hd->numjobs);
159 
160 	return 0;
161 }
162 
fio_hdfsio_close_file(struct thread_data * td,struct fio_file * f)163 int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
164 {
165 	struct hdfsio_data *hd;
166 
167 	hd = td->io_ops->data;
168 	hdfsDisconnect(hd->fs);
169 
170 	return 0;
171 }
172 
fio_hdfsio_setup(struct thread_data * td)173 static int fio_hdfsio_setup(struct thread_data *td)
174 {
175 	struct hdfsio_data *hd;
176 	struct fio_file *f;
177 	static unsigned int numjobs = 1;	/* atleast one job has to be there! */
178 	numjobs = (td->o.numjobs > numjobs) ? td->o.numjobs : numjobs;
179 
180 	if (!td->io_ops->data) {
181 		hd = malloc(sizeof(*hd));;
182 
183 		memset(hd, 0, sizeof(*hd));
184 		td->io_ops->data = hd;
185 
186 		/* separate host and port from filename */
187 		*(strchr(td->o.filename, ',')) = ' ';
188 		sscanf(td->o.filename, "%s%d", hd->host, &(hd->port));
189 
190 		/* read fbs and fcount and based on that set f->real_file_size */
191 		f = td->files[0];
192 #if 0
193 		/* IMHO, this should be done here instead of fio_hdfsio_prep()
194 		 * but somehow calling it here doesn't seem to work,
195 		 * some problem with libhdfs that needs to be debugged */
196 		hd->fs = hdfsConnect(hd->host, hd->port);
197 		fio_hdfsio_setup_fs_params(hd);
198 		hdfsDisconnect(hd->fs);
199 #else
200 		/* so, as an alternate, using environment variables */
201 		if (getenv("FIO_HDFS_FCOUNT") && getenv("FIO_HDFS_BS")) {
202 			hd->fscount = atol(getenv("FIO_HDFS_FCOUNT"));
203 			hd->fsbs = atol(getenv("FIO_HDFS_BS"));
204 		} else {
205 			log_err("FIO_HDFS_FCOUNT and/or FIO_HDFS_BS not set.\n");
206 			return 1;
207 		}
208 #endif
209 		f->real_file_size = hd->fscount * hd->fsbs;
210 
211 		td->o.nr_files = 1;
212 		hd->curr_file_id = -1;
213 		hd->numjobs = numjobs;
214 		fio_file_set_size_known(f);
215 	}
216 
217 	return 0;
218 }
219 
220 static struct ioengine_ops ioengine_hdfs = {
221 	.name = "libhdfs",
222 	.version = FIO_IOOPS_VERSION,
223 	.setup = fio_hdfsio_setup,
224 	.prep = fio_hdfsio_prep,
225 	.queue = fio_hdfsio_queue,
226 	.open_file = fio_hdfsio_open_file,
227 	.close_file = fio_hdfsio_close_file,
228 	.flags = FIO_SYNCIO,
229 };
230 
fio_hdfsio_register(void)231 static void fio_init fio_hdfsio_register(void)
232 {
233 	register_ioengine(&ioengine_hdfs);
234 }
235 
fio_hdfsio_unregister(void)236 static void fio_exit fio_hdfsio_unregister(void)
237 {
238 	unregister_ioengine(&ioengine_hdfs);
239 }
240