/*
 * Small tool to check for dedupable blocks in a file or device. Basically
 * just scans the file for extents of the given size, checksums them,
 * and orders them by checksum.
 */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <inttypes.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include <fcntl.h>
#include <string.h>

#include "../lib/rbtree.h"
#include "../flist.h"
#include "../log.h"
#include "../mutex.h"
#include "../smalloc.h"
#include "../minmax.h"
#include "../crc/md5.h"
#include "../memalign.h"
#include "../os/os.h"
#include "../gettime.h"
#include "../fio_time.h"

#include "../lib/bloom.h"
#include "debug.h"
31 
/*
 * State owned by one scanning worker. The spawning code fills in the
 * descriptor; the worker itself updates everything else.
 */
struct worker_thread {
	/* Handle written by pthread_create() and used to join the worker. */
	pthread_t thread;

	/* Set to 1 by the worker right before it returns. */
	volatile int done;

	/* Descriptor the worker reads blocks from. */
	int fd;
	/* Start offset of the region most recently handed out by get_work(). */
	uint64_t cur_offset;
	/* Length in bytes of that region. */
	uint64_t size;

	/* Number of blocks this worker has checksummed. */
	unsigned long items;
	/* Duplicate count reported back by insert_chunks(). */
	unsigned long dupes;
	/* Set to 1 when the worker loop bails out. */
	int err;
};
45 
46 struct extent {
47 	struct flist_head list;
48 	uint64_t offset;
49 };
50 
51 struct chunk {
52 	struct rb_node rb_node;
53 	uint64_t count;
54 	uint32_t hash[MD5_HASH_WORDS];
55 	struct flist_head extent_list[0];
56 };
57 
58 struct item {
59 	uint64_t offset;
60 	uint32_t hash[MD5_HASH_WORDS];
61 };
62 
63 static struct rb_root rb_root;
64 static struct bloom *bloom;
65 static struct fio_mutex *rb_lock;
66 
67 static unsigned int blocksize = 4096;
68 static unsigned int num_threads;
69 static unsigned int chunk_size = 1048576;
70 static unsigned int dump_output;
71 static unsigned int odirect;
72 static unsigned int collision_check;
73 static unsigned int print_progress = 1;
74 static unsigned int use_bloom = 1;
75 
76 static uint64_t total_size;
77 static uint64_t cur_offset;
78 static struct fio_mutex *size_lock;
79 
80 static struct fio_file file;
81 
get_size(struct fio_file * f,struct stat * sb)82 static uint64_t get_size(struct fio_file *f, struct stat *sb)
83 {
84 	uint64_t ret;
85 
86 	if (S_ISBLK(sb->st_mode)) {
87 		unsigned long long bytes;
88 
89 		if (blockdev_size(f, &bytes)) {
90 			log_err("dedupe: failed getting bdev size\n");
91 			return 0;
92 		}
93 		ret = bytes;
94 	} else
95 		ret = sb->st_size;
96 
97 	return (ret & ~((uint64_t)blocksize - 1));
98 }
99 
get_work(uint64_t * offset,uint64_t * size)100 static int get_work(uint64_t *offset, uint64_t *size)
101 {
102 	uint64_t this_chunk;
103 	int ret = 1;
104 
105 	fio_mutex_down(size_lock);
106 
107 	if (cur_offset < total_size) {
108 		*offset = cur_offset;
109 		this_chunk = min((uint64_t)chunk_size, total_size - cur_offset);
110 		*size = this_chunk;
111 		cur_offset += this_chunk;
112 		ret = 0;
113 	}
114 
115 	fio_mutex_up(size_lock);
116 	return ret;
117 }
118 
/*
 * Read exactly @count bytes from @fd at @offset into @buf with pread().
 *
 * Returns 0 only when the full amount was read. Returns 1 on a pread()
 * error (reported through perror), on end of file, and on a short read
 * (reported through log_err).
 */
static int __read_block(int fd, void *buf, off_t offset, size_t count)
{
	ssize_t nread = pread(fd, buf, count, offset);

	if (nread < 0) {
		perror("pread");
		return 1;
	}

	/* End of file: fail quietly. */
	if (nread == 0)
		return 1;

	if ((size_t) nread != count) {
		log_err("dedupe: short read on block\n");
		return 1;
	}

	return 0;
}
136 
read_block(int fd,void * buf,off_t offset)137 static int read_block(int fd, void *buf, off_t offset)
138 {
139 	return __read_block(fd, buf, offset, blocksize);
140 }
141 
add_item(struct chunk * c,struct item * i)142 static void add_item(struct chunk *c, struct item *i)
143 {
144 	/*
145 	 * Save some memory and don't add extent items, if we don't
146 	 * use them.
147 	 */
148 	if (dump_output || collision_check) {
149 		struct extent *e;
150 
151 		e = malloc(sizeof(*e));
152 		e->offset = i->offset;
153 		flist_add_tail(&e->list, &c->extent_list[0]);
154 	}
155 
156 	c->count++;
157 }
158 
col_check(struct chunk * c,struct item * i)159 static int col_check(struct chunk *c, struct item *i)
160 {
161 	struct extent *e;
162 	char *cbuf, *ibuf;
163 	int ret = 1;
164 
165 	cbuf = fio_memalign(blocksize, blocksize);
166 	ibuf = fio_memalign(blocksize, blocksize);
167 
168 	e = flist_entry(c->extent_list[0].next, struct extent, list);
169 	if (read_block(file.fd, cbuf, e->offset))
170 		goto out;
171 
172 	if (read_block(file.fd, ibuf, i->offset))
173 		goto out;
174 
175 	ret = memcmp(ibuf, cbuf, blocksize);
176 out:
177 	fio_memfree(cbuf, blocksize);
178 	fio_memfree(ibuf, blocksize);
179 	return ret;
180 }
181 
alloc_chunk(void)182 static struct chunk *alloc_chunk(void)
183 {
184 	struct chunk *c;
185 
186 	if (collision_check || dump_output) {
187 		c = malloc(sizeof(struct chunk) + sizeof(struct flist_head));
188 		INIT_FLIST_HEAD(&c->extent_list[0]);
189 	} else
190 		c = malloc(sizeof(struct chunk));
191 
192 	return c;
193 }
194 
insert_chunk(struct item * i)195 static void insert_chunk(struct item *i)
196 {
197 	struct rb_node **p, *parent;
198 	struct chunk *c;
199 	int diff;
200 
201 	p = &rb_root.rb_node;
202 	parent = NULL;
203 	while (*p) {
204 		parent = *p;
205 
206 		c = rb_entry(parent, struct chunk, rb_node);
207 		diff = memcmp(i->hash, c->hash, sizeof(i->hash));
208 		if (diff < 0)
209 			p = &(*p)->rb_left;
210 		else if (diff > 0)
211 			p = &(*p)->rb_right;
212 		else {
213 			int ret;
214 
215 			if (!collision_check)
216 				goto add;
217 
218 			fio_mutex_up(rb_lock);
219 			ret = col_check(c, i);
220 			fio_mutex_down(rb_lock);
221 
222 			if (!ret)
223 				goto add;
224 
225 			p = &(*p)->rb_right;
226 		}
227 	}
228 
229 	c = alloc_chunk();
230 	RB_CLEAR_NODE(&c->rb_node);
231 	c->count = 0;
232 	memcpy(c->hash, i->hash, sizeof(i->hash));
233 	rb_link_node(&c->rb_node, parent, p);
234 	rb_insert_color(&c->rb_node, &rb_root);
235 add:
236 	add_item(c, i);
237 }
238 
insert_chunks(struct item * items,unsigned int nitems,uint64_t * ndupes)239 static void insert_chunks(struct item *items, unsigned int nitems,
240 			  uint64_t *ndupes)
241 {
242 	int i;
243 
244 	fio_mutex_down(rb_lock);
245 
246 	for (i = 0; i < nitems; i++) {
247 		if (bloom) {
248 			unsigned int s;
249 			int r;
250 
251 			s = sizeof(items[i].hash) / sizeof(uint32_t);
252 			r = bloom_set(bloom, items[i].hash, s);
253 			*ndupes += r;
254 		} else
255 			insert_chunk(&items[i]);
256 	}
257 
258 	fio_mutex_up(rb_lock);
259 }
260 
crc_buf(void * buf,uint32_t * hash)261 static void crc_buf(void *buf, uint32_t *hash)
262 {
263 	struct fio_md5_ctx ctx = { .hash = hash };
264 
265 	fio_md5_init(&ctx);
266 	fio_md5_update(&ctx, buf, blocksize);
267 	fio_md5_final(&ctx);
268 }
269 
read_blocks(int fd,void * buf,off_t offset,size_t size)270 static unsigned int read_blocks(int fd, void *buf, off_t offset, size_t size)
271 {
272 	if (__read_block(fd, buf, offset, size))
273 		return 0;
274 
275 	return size / blocksize;
276 }
277 
do_work(struct worker_thread * thread,void * buf)278 static int do_work(struct worker_thread *thread, void *buf)
279 {
280 	unsigned int nblocks, i;
281 	off_t offset;
282 	int nitems = 0;
283 	uint64_t ndupes = 0;
284 	struct item *items;
285 
286 	offset = thread->cur_offset;
287 
288 	nblocks = read_blocks(thread->fd, buf, offset, min(thread->size, (uint64_t)chunk_size));
289 	if (!nblocks)
290 		return 1;
291 
292 	items = malloc(sizeof(*items) * nblocks);
293 
294 	for (i = 0; i < nblocks; i++) {
295 		void *thisptr = buf + (i * blocksize);
296 
297 		items[i].offset = offset;
298 		crc_buf(thisptr, items[i].hash);
299 		offset += blocksize;
300 		nitems++;
301 	}
302 
303 	insert_chunks(items, nitems, &ndupes);
304 
305 	free(items);
306 	thread->items += nitems;
307 	thread->dupes += ndupes;
308 	return 0;
309 }
310 
thread_fn(void * data)311 static void *thread_fn(void *data)
312 {
313 	struct worker_thread *thread = data;
314 	void *buf;
315 
316 	buf = fio_memalign(blocksize, chunk_size);
317 
318 	do {
319 		if (get_work(&thread->cur_offset, &thread->size)) {
320 			thread->err = 1;
321 			break;
322 		}
323 		if (do_work(thread, buf)) {
324 			thread->err = 1;
325 			break;
326 		}
327 	} while (1);
328 
329 	thread->done = 1;
330 	fio_memfree(buf, chunk_size);
331 	return NULL;
332 }
333 
show_progress(struct worker_thread * threads,unsigned long total)334 static void show_progress(struct worker_thread *threads, unsigned long total)
335 {
336 	unsigned long last_nitems = 0;
337 	struct timeval last_tv;
338 
339 	fio_gettime(&last_tv, NULL);
340 
341 	while (print_progress) {
342 		unsigned long this_items;
343 		unsigned long nitems = 0;
344 		uint64_t tdiff;
345 		float perc;
346 		int some_done = 0;
347 		int i;
348 
349 		for (i = 0; i < num_threads; i++) {
350 			nitems += threads[i].items;
351 			some_done = threads[i].done;
352 			if (some_done)
353 				break;
354 		}
355 
356 		if (some_done)
357 			break;
358 
359 		perc = (float) nitems / (float) total;
360 		perc *= 100.0;
361 		this_items = nitems - last_nitems;
362 		this_items *= blocksize;
363 		tdiff = mtime_since_now(&last_tv);
364 		if (tdiff) {
365 			this_items = (this_items * 1000) / (tdiff * 1024);
366 			printf("%3.2f%% done (%luKB/sec)\r", perc, this_items);
367 			last_nitems = nitems;
368 			fio_gettime(&last_tv, NULL);
369 		} else
370 			printf("%3.2f%% done\r", perc);
371 		fflush(stdout);
372 		usleep(250000);
373 	};
374 }
375 
run_dedupe_threads(struct fio_file * f,uint64_t dev_size,uint64_t * nextents,uint64_t * nchunks)376 static int run_dedupe_threads(struct fio_file *f, uint64_t dev_size,
377 			      uint64_t *nextents, uint64_t *nchunks)
378 {
379 	struct worker_thread *threads;
380 	unsigned long nitems, total_items;
381 	int i, err = 0;
382 
383 	total_size = dev_size;
384 	total_items = dev_size / blocksize;
385 	cur_offset = 0;
386 	size_lock = fio_mutex_init(FIO_MUTEX_UNLOCKED);
387 
388 	threads = malloc(num_threads * sizeof(struct worker_thread));
389 	for (i = 0; i < num_threads; i++) {
390 		memset(&threads[i], 0, sizeof(struct worker_thread));
391 		threads[i].fd = f->fd;
392 
393 		err = pthread_create(&threads[i].thread, NULL, thread_fn, &threads[i]);
394 		if (err) {
395 			log_err("fio: thread startup failed\n");
396 			break;
397 		}
398 	}
399 
400 	show_progress(threads, total_items);
401 
402 	nitems = 0;
403 	*nextents = 0;
404 	*nchunks = 1;
405 	for (i = 0; i < num_threads; i++) {
406 		void *ret;
407 		pthread_join(threads[i].thread, &ret);
408 		nitems += threads[i].items;
409 		*nchunks += threads[i].dupes;
410 	}
411 
412 	printf("Threads(%u): %lu items processed\n", num_threads, nitems);
413 
414 	*nextents = nitems;
415 	*nchunks = nitems - *nchunks;
416 
417 	fio_mutex_remove(size_lock);
418 	free(threads);
419 	return err;
420 }
421 
dedupe_check(const char * filename,uint64_t * nextents,uint64_t * nchunks)422 static int dedupe_check(const char *filename, uint64_t *nextents,
423 			uint64_t *nchunks)
424 {
425 	uint64_t dev_size;
426 	struct stat sb;
427 	int flags;
428 
429 	flags = O_RDONLY;
430 	if (odirect)
431 		flags |= OS_O_DIRECT;
432 
433 	memset(&file, 0, sizeof(file));
434 	file.file_name = strdup(filename);
435 
436 	file.fd = open(filename, flags);
437 	if (file.fd == -1) {
438 		perror("open");
439 		goto err;
440 	}
441 
442 	if (fstat(file.fd, &sb) < 0) {
443 		perror("fstat");
444 		goto err;
445 	}
446 
447 	dev_size = get_size(&file, &sb);
448 	if (!dev_size)
449 		goto err;
450 
451 	if (use_bloom) {
452 		uint64_t bloom_entries;
453 
454 		bloom_entries = 8 * (dev_size / blocksize);
455 		bloom = bloom_new(bloom_entries);
456 	}
457 
458 	printf("Will check <%s>, size <%llu>, using %u threads\n", filename, (unsigned long long) dev_size, num_threads);
459 
460 	return run_dedupe_threads(&file, dev_size, nextents, nchunks);
461 err:
462 	if (file.fd != -1)
463 		close(file.fd);
464 	free(file.file_name);
465 	return 1;
466 }
467 
show_chunk(struct chunk * c)468 static void show_chunk(struct chunk *c)
469 {
470 	struct flist_head *n;
471 	struct extent *e;
472 
473 	printf("c hash %8x %8x %8x %8x, count %lu\n", c->hash[0], c->hash[1], c->hash[2], c->hash[3], (unsigned long) c->count);
474 	flist_for_each(n, &c->extent_list[0]) {
475 		e = flist_entry(n, struct extent, list);
476 		printf("\toffset %llu\n", (unsigned long long) e->offset);
477 	}
478 }
479 
/*
 * Print the summary: extent counts, the dedupe ratio and the matching
 * dedupe_percentage setting for fio.
 *
 * @nextents is the total number of blocks seen, @nchunks the number of
 * unique ones. The percentage is reported as 0 when there are no extents
 * at all, and is never allowed to go negative.
 */
static void show_stat(uint64_t nextents, uint64_t nchunks)
{
	double perc = 0.0, ratio;

	printf("Extents=%" PRIu64 ", Unique extents=%" PRIu64 "\n", nextents, nchunks);

	if (nchunks) {
		ratio = (double) nextents / (double) nchunks;
		printf("De-dupe ratio: 1:%3.2f\n", ratio - 1.0);
	} else
		printf("De-dupe ratio: 1:infinite\n");

	/* Avoid 0/0: with no extents there is nothing to dedupe. */
	if (nextents) {
		perc = 1.00 - ((double) nchunks / (double) nextents);
		perc *= 100.0;
		/* More unique chunks than extents would give a negative value. */
		if (perc < 0.0)
			perc = 0.0;
	}

	printf("Fio setting: dedupe_percentage=%u\n", (unsigned int) (perc + 0.50));
}
497 
iter_rb_tree(uint64_t * nextents,uint64_t * nchunks)498 static void iter_rb_tree(uint64_t *nextents, uint64_t *nchunks)
499 {
500 	struct rb_node *n;
501 
502 	*nchunks = *nextents = 0;
503 
504 	n = rb_first(&rb_root);
505 	if (!n)
506 		return;
507 
508 	do {
509 		struct chunk *c;
510 
511 		c = rb_entry(n, struct chunk, rb_node);
512 		(*nchunks)++;
513 		*nextents += c->count;
514 
515 		if (dump_output)
516 			show_chunk(c);
517 
518 	} while ((n = rb_next(n)) != NULL);
519 }
520 
/*
 * Print the command line help through log_err(), using argv[0] as the
 * program name. Always returns 1 so callers can return it directly.
 */
static int usage(char *argv[])
{
	const char *prog = argv[0];

	log_err("Check for dedupable blocks on a device/file\n\n");
	log_err("%s: [options] <device or file>\n", prog);

	/* One line per option accepted by main(). */
	log_err("\t-b\tChunk size to use\n");
	log_err("\t-t\tNumber of threads to use\n");
	log_err("\t-d\tFull extent/chunk debug output\n");
	log_err("\t-o\tUse O_DIRECT\n");
	log_err("\t-c\tFull collision check\n");
	log_err("\t-B\tUse probabilistic bloom filter\n");
	log_err("\t-p\tPrint progress indicator\n");

	return 1;
}
534 
main(int argc,char * argv[])535 int main(int argc, char *argv[])
536 {
537 	uint64_t nextents = 0, nchunks = 0;
538 	int c, ret;
539 
540 	debug_init();
541 
542 	while ((c = getopt(argc, argv, "b:t:d:o:c:p:B:")) != -1) {
543 		switch (c) {
544 		case 'b':
545 			blocksize = atoi(optarg);
546 			break;
547 		case 't':
548 			num_threads = atoi(optarg);
549 			break;
550 		case 'd':
551 			dump_output = atoi(optarg);
552 			break;
553 		case 'o':
554 			odirect = atoi(optarg);
555 			break;
556 		case 'c':
557 			collision_check = atoi(optarg);
558 			break;
559 		case 'p':
560 			print_progress = atoi(optarg);
561 			break;
562 		case 'B':
563 			use_bloom = atoi(optarg);
564 			break;
565 		case '?':
566 		default:
567 			return usage(argv);
568 		}
569 	}
570 
571 	if (collision_check || dump_output)
572 		use_bloom = 0;
573 
574 	if (!num_threads)
575 		num_threads = cpus_online();
576 
577 	if (argc == optind)
578 		return usage(argv);
579 
580 	sinit();
581 
582 	rb_root = RB_ROOT;
583 	rb_lock = fio_mutex_init(FIO_MUTEX_UNLOCKED);
584 
585 	ret = dedupe_check(argv[optind], &nextents, &nchunks);
586 
587 	if (!ret) {
588 		if (!bloom)
589 			iter_rb_tree(&nextents, &nchunks);
590 
591 		show_stat(nextents, nchunks);
592 	}
593 
594 	fio_mutex_remove(rb_lock);
595 	if (bloom)
596 		bloom_free(bloom);
597 	scleanup();
598 	return ret;
599 }
600