1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "snapuserd.h"
18 
19 #include <csignal>
20 #include <optional>
21 #include <set>
22 
23 #include <libsnapshot/snapuserd_client.h>
24 
25 namespace android {
26 namespace snapshot {
27 
28 using namespace android;
29 using namespace android::dm;
30 using android::base::unique_fd;
31 
// Logging helpers that prefix every message with misc_name_ followed by ": ".
// Both expand to an expression that names misc_name_ directly, so they can
// only be used where a misc_name_ is in scope. SNAP_LOG wraps LOG and
// SNAP_PLOG wraps PLOG.
#define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
#define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "
34 
Initialize(size_t size)35 void BufferSink::Initialize(size_t size) {
36     buffer_size_ = size;
37     buffer_offset_ = 0;
38     buffer_ = std::make_unique<uint8_t[]>(size);
39 }
40 
GetPayloadBuffer(size_t size)41 void* BufferSink::GetPayloadBuffer(size_t size) {
42     if ((buffer_size_ - buffer_offset_) < size) return nullptr;
43 
44     char* buffer = reinterpret_cast<char*>(GetBufPtr());
45     struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));
46     return (char*)msg->payload.buf + buffer_offset_;
47 }
48 
GetBuffer(size_t requested,size_t * actual)49 void* BufferSink::GetBuffer(size_t requested, size_t* actual) {
50     void* buf = GetPayloadBuffer(requested);
51     if (!buf) {
52         *actual = 0;
53         return nullptr;
54     }
55     *actual = requested;
56     return buf;
57 }
58 
GetHeaderPtr()59 struct dm_user_header* BufferSink::GetHeaderPtr() {
60     if (!(sizeof(struct dm_user_header) <= buffer_size_)) {
61         return nullptr;
62     }
63     char* buf = reinterpret_cast<char*>(GetBufPtr());
64     struct dm_user_header* header = (struct dm_user_header*)(&(buf[0]));
65     return header;
66 }
67 
GetPayloadBufPtr()68 void* BufferSink::GetPayloadBufPtr() {
69     char* buffer = reinterpret_cast<char*>(GetBufPtr());
70     struct dm_user_message* msg = reinterpret_cast<struct dm_user_message*>(&(buffer[0]));
71     return msg->payload.buf;
72 }
73 
WorkerThread(const std::string & cow_device,const std::string & backing_device,const std::string & control_device,const std::string & misc_name,std::shared_ptr<Snapuserd> snapuserd)74 WorkerThread::WorkerThread(const std::string& cow_device, const std::string& backing_device,
75                            const std::string& control_device, const std::string& misc_name,
76                            std::shared_ptr<Snapuserd> snapuserd) {
77     cow_device_ = cow_device;
78     backing_store_device_ = backing_device;
79     control_device_ = control_device;
80     misc_name_ = misc_name;
81     snapuserd_ = snapuserd;
82     exceptions_per_area_ = (CHUNK_SIZE << SECTOR_SHIFT) / sizeof(struct disk_exception);
83 }
84 
InitializeFds()85 bool WorkerThread::InitializeFds() {
86     backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
87     if (backing_store_fd_ < 0) {
88         SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
89         return false;
90     }
91 
92     cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
93     if (cow_fd_ < 0) {
94         SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
95         return false;
96     }
97 
98     ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR));
99     if (ctrl_fd_ < 0) {
100         SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
101         return false;
102     }
103 
104     return true;
105 }
106 
InitReader()107 bool WorkerThread::InitReader() {
108     reader_ = std::make_unique<CowReader>();
109     if (!reader_->InitForMerge(std::move(cow_fd_))) {
110         return false;
111     }
112 
113     return true;
114 }
115 
116 // Construct kernel COW header in memory
117 // This header will be in sector 0. The IO
118 // request will always be 4k. After constructing
119 // the header, zero out the remaining block.
ConstructKernelCowHeader()120 void WorkerThread::ConstructKernelCowHeader() {
121     void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
122 
123     memset(buffer, 0, BLOCK_SZ);
124 
125     struct disk_header* dh = reinterpret_cast<struct disk_header*>(buffer);
126 
127     dh->magic = SNAP_MAGIC;
128     dh->valid = SNAPSHOT_VALID;
129     dh->version = SNAPSHOT_DISK_VERSION;
130     dh->chunk_size = CHUNK_SIZE;
131 }
132 
133 // Start the replace operation. This will read the
134 // internal COW format and if the block is compressed,
135 // it will be de-compressed.
ProcessReplaceOp(const CowOperation * cow_op)136 bool WorkerThread::ProcessReplaceOp(const CowOperation* cow_op) {
137     if (!reader_->ReadData(*cow_op, &bufsink_)) {
138         SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block;
139         return false;
140     }
141 
142     return true;
143 }
144 
ReadFromBaseDevice(const CowOperation * cow_op)145 bool WorkerThread::ReadFromBaseDevice(const CowOperation* cow_op) {
146     void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
147     if (buffer == nullptr) {
148         SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
149         return false;
150     }
151     SNAP_LOG(DEBUG) << " ReadFromBaseDevice...: new-block: " << cow_op->new_block
152                     << " Source: " << cow_op->source;
153     if (!android::base::ReadFullyAtOffset(backing_store_fd_, buffer, BLOCK_SZ,
154                                           cow_op->source * BLOCK_SZ)) {
155         SNAP_PLOG(ERROR) << "Copy-op failed. Read from backing store: " << backing_store_device_
156                          << "at block :" << cow_op->source;
157         return false;
158     }
159 
160     return true;
161 }
162 
GetReadAheadPopulatedBuffer(const CowOperation * cow_op)163 bool WorkerThread::GetReadAheadPopulatedBuffer(const CowOperation* cow_op) {
164     void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
165     if (buffer == nullptr) {
166         SNAP_LOG(ERROR) << "GetReadAheadPopulatedBuffer: Failed to get payload buffer";
167         return false;
168     }
169 
170     if (!snapuserd_->GetReadAheadPopulatedBuffer(cow_op->new_block, buffer)) {
171         return false;
172     }
173 
174     return true;
175 }
176 
177 // Start the copy operation. This will read the backing
178 // block device which is represented by cow_op->source.
ProcessCopyOp(const CowOperation * cow_op)179 bool WorkerThread::ProcessCopyOp(const CowOperation* cow_op) {
180     if (!GetReadAheadPopulatedBuffer(cow_op)) {
181         SNAP_LOG(DEBUG) << " GetReadAheadPopulatedBuffer failed..."
182                         << " new_block: " << cow_op->new_block;
183         if (!ReadFromBaseDevice(cow_op)) {
184             return false;
185         }
186     }
187 
188     return true;
189 }
190 
ProcessZeroOp()191 bool WorkerThread::ProcessZeroOp() {
192     // Zero out the entire block
193     void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
194     if (buffer == nullptr) {
195         SNAP_LOG(ERROR) << "ProcessZeroOp: Failed to get payload buffer";
196         return false;
197     }
198 
199     memset(buffer, 0, BLOCK_SZ);
200     return true;
201 }
202 
ProcessCowOp(const CowOperation * cow_op)203 bool WorkerThread::ProcessCowOp(const CowOperation* cow_op) {
204     if (cow_op == nullptr) {
205         SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
206         return false;
207     }
208 
209     switch (cow_op->type) {
210         case kCowReplaceOp: {
211             return ProcessReplaceOp(cow_op);
212         }
213 
214         case kCowZeroOp: {
215             return ProcessZeroOp();
216         }
217 
218         case kCowCopyOp: {
219             return ProcessCopyOp(cow_op);
220         }
221 
222         default: {
223             SNAP_LOG(ERROR) << "Unknown operation-type found: " << cow_op->type;
224         }
225     }
226     return false;
227 }
228 
ReadUnalignedSector(sector_t sector,size_t size,std::vector<std::pair<sector_t,const CowOperation * >>::iterator & it)229 int WorkerThread::ReadUnalignedSector(
230         sector_t sector, size_t size,
231         std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
232     size_t skip_sector_size = 0;
233 
234     SNAP_LOG(DEBUG) << "ReadUnalignedSector: sector " << sector << " size: " << size
235                     << " Aligned sector: " << it->first;
236 
237     if (!ProcessCowOp(it->second)) {
238         SNAP_LOG(ERROR) << "ReadUnalignedSector: " << sector << " failed of size: " << size
239                         << " Aligned sector: " << it->first;
240         return -1;
241     }
242 
243     int num_sectors_skip = sector - it->first;
244 
245     if (num_sectors_skip > 0) {
246         skip_sector_size = num_sectors_skip << SECTOR_SHIFT;
247         char* buffer = reinterpret_cast<char*>(bufsink_.GetBufPtr());
248         struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));
249 
250         if (skip_sector_size == BLOCK_SZ) {
251             SNAP_LOG(ERROR) << "Invalid un-aligned IO request at sector: " << sector
252                             << " Base-sector: " << it->first;
253             return -1;
254         }
255 
256         memmove(msg->payload.buf, (char*)msg->payload.buf + skip_sector_size,
257                 (BLOCK_SZ - skip_sector_size));
258     }
259 
260     bufsink_.ResetBufferOffset();
261     return std::min(size, (BLOCK_SZ - skip_sector_size));
262 }
263 
264 /*
265  * Read the data for a given COW Operation.
266  *
267  * Kernel can issue IO at a sector granularity.
268  * Hence, an IO may end up with reading partial
269  * data from a COW operation or we may also
270  * end up with interspersed request between
271  * two COW operations.
272  *
273  */
ReadData(sector_t sector,size_t size)274 int WorkerThread::ReadData(sector_t sector, size_t size) {
275     std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
276     std::vector<std::pair<sector_t, const CowOperation*>>::iterator it;
277     /*
278      * chunk_map stores COW operation at 4k granularity.
279      * If the requested IO with the sector falls on the 4k
280      * boundary, then we can read the COW op directly without
281      * any issue.
282      *
283      * However, if the requested sector is not 4K aligned,
284      * then we will have the find the nearest COW operation
285      * and chop the 4K block to fetch the requested sector.
286      */
287     it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(), std::make_pair(sector, nullptr),
288                           Snapuserd::compare);
289 
290     if (!(it != chunk_vec.end())) {
291         SNAP_LOG(ERROR) << "ReadData: Sector " << sector << " not found in chunk_vec";
292         return -1;
293     }
294 
295     // We didn't find the required sector; hence find the previous sector
296     // as lower_bound will gives us the value greater than
297     // the requested sector
298     if (it->first != sector) {
299         if (it != chunk_vec.begin()) {
300             --it;
301         }
302 
303         /*
304          * If the IO is spanned between two COW operations,
305          * split the IO into two parts:
306          *
307          * 1: Read the first part from the single COW op
308          * 2: Read the second part from the next COW op.
309          *
310          * Ex: Let's say we have a 1024 Bytes IO request.
311          *
312          * 0       COW OP-1  4096     COW OP-2  8192
313          * |******************|*******************|
314          *              |*****|*****|
315          *           3584           4608
316          *              <- 1024B - >
317          *
318          * We have two COW operations which are 4k blocks.
319          * The IO is requested for 1024 Bytes which are spanned
320          * between two COW operations. We will split this IO
321          * into two parts:
322          *
323          * 1: IO of size 512B from offset 3584 bytes (COW OP-1)
324          * 2: IO of size 512B from offset 4096 bytes (COW OP-2)
325          */
326         return ReadUnalignedSector(sector, size, it);
327     }
328 
329     int num_ops = DIV_ROUND_UP(size, BLOCK_SZ);
330     sector_t read_sector = sector;
331     while (num_ops) {
332         // We have to make sure that the reads are
333         // sequential; there shouldn't be a data
334         // request merged with a metadata IO.
335         if (it->first != read_sector) {
336             SNAP_LOG(ERROR) << "Invalid IO request: read_sector: " << read_sector
337                             << " cow-op sector: " << it->first;
338             return -1;
339         } else if (!ProcessCowOp(it->second)) {
340             return -1;
341         }
342         num_ops -= 1;
343         read_sector += (BLOCK_SZ >> SECTOR_SHIFT);
344 
345         it++;
346 
347         if (it == chunk_vec.end() && num_ops) {
348             SNAP_LOG(ERROR) << "Invalid IO request at sector " << sector
349                             << " COW ops completed; pending read-request: " << num_ops;
350             return -1;
351         }
352         // Update the buffer offset
353         bufsink_.UpdateBufferOffset(BLOCK_SZ);
354     }
355 
356     // Reset the buffer offset
357     bufsink_.ResetBufferOffset();
358     return size;
359 }
360 
361 /*
362  * dm-snap does prefetch reads while reading disk-exceptions.
363  * By default, prefetch value is set to 12; this means that
364  * dm-snap will issue 12 areas wherein each area is a 4k page
365  * of disk-exceptions.
366  *
367  * If during prefetch, if the chunk-id seen is beyond the
368  * actual number of metadata page, fill the buffer with zero.
369  * When dm-snap starts parsing the buffer, it will stop
370  * reading metadata page once the buffer content is zero.
371  */
ZerofillDiskExceptions(size_t read_size)372 bool WorkerThread::ZerofillDiskExceptions(size_t read_size) {
373     size_t size = exceptions_per_area_ * sizeof(struct disk_exception);
374 
375     if (read_size > size) {
376         return false;
377     }
378 
379     void* buffer = bufsink_.GetPayloadBuffer(size);
380     if (buffer == nullptr) {
381         SNAP_LOG(ERROR) << "ZerofillDiskExceptions: Failed to get payload buffer";
382         return false;
383     }
384 
385     memset(buffer, 0, size);
386     return true;
387 }
388 
389 /*
390  * A disk exception is a simple mapping of old_chunk to new_chunk.
391  * When dm-snapshot device is created, kernel requests these mapping.
392  *
393  * Each disk exception is of size 16 bytes. Thus a single 4k page can
394  * have:
395  *
396  * exceptions_per_area_ = 4096/16 = 256. This entire 4k page
397  * is considered a metadata page and it is represented by chunk ID.
398  *
399  * Convert the chunk ID to index into the vector which gives us
400  * the metadata page.
401  */
ReadDiskExceptions(chunk_t chunk,size_t read_size)402 bool WorkerThread::ReadDiskExceptions(chunk_t chunk, size_t read_size) {
403     uint32_t stride = exceptions_per_area_ + 1;
404     size_t size;
405     const std::vector<std::unique_ptr<uint8_t[]>>& vec = snapuserd_->GetMetadataVec();
406 
407     // ChunkID to vector index
408     lldiv_t divresult = lldiv(chunk, stride);
409 
410     if (divresult.quot < vec.size()) {
411         size = exceptions_per_area_ * sizeof(struct disk_exception);
412 
413         if (read_size != size) {
414             SNAP_LOG(ERROR) << "ReadDiskExceptions: read_size: " << read_size
415                             << " does not match with size: " << size;
416             return false;
417         }
418 
419         void* buffer = bufsink_.GetPayloadBuffer(size);
420         if (buffer == nullptr) {
421             SNAP_LOG(ERROR) << "ReadDiskExceptions: Failed to get payload buffer of size: " << size;
422             return false;
423         }
424 
425         memcpy(buffer, vec[divresult.quot].get(), size);
426     } else {
427         return ZerofillDiskExceptions(read_size);
428     }
429 
430     return true;
431 }
432 
GetMergeStartOffset(void * merged_buffer,void * unmerged_buffer,int * unmerged_exceptions)433 loff_t WorkerThread::GetMergeStartOffset(void* merged_buffer, void* unmerged_buffer,
434                                          int* unmerged_exceptions) {
435     loff_t offset = 0;
436     *unmerged_exceptions = 0;
437 
438     while (*unmerged_exceptions <= exceptions_per_area_) {
439         struct disk_exception* merged_de =
440                 reinterpret_cast<struct disk_exception*>((char*)merged_buffer + offset);
441         struct disk_exception* cow_de =
442                 reinterpret_cast<struct disk_exception*>((char*)unmerged_buffer + offset);
443 
444         // Unmerged op by the kernel
445         if (merged_de->old_chunk != 0 || merged_de->new_chunk != 0) {
446             if (!(merged_de->old_chunk == cow_de->old_chunk)) {
447                 SNAP_LOG(ERROR) << "GetMergeStartOffset: merged_de->old_chunk: "
448                                 << merged_de->old_chunk
449                                 << "cow_de->old_chunk: " << cow_de->old_chunk;
450                 return -1;
451             }
452 
453             if (!(merged_de->new_chunk == cow_de->new_chunk)) {
454                 SNAP_LOG(ERROR) << "GetMergeStartOffset: merged_de->new_chunk: "
455                                 << merged_de->new_chunk
456                                 << "cow_de->new_chunk: " << cow_de->new_chunk;
457                 return -1;
458             }
459 
460             offset += sizeof(struct disk_exception);
461             *unmerged_exceptions += 1;
462             continue;
463         }
464 
465         break;
466     }
467 
468     SNAP_LOG(DEBUG) << "Unmerged_Exceptions: " << *unmerged_exceptions << " Offset: " << offset;
469     return offset;
470 }
471 
/*
 * Counts the exceptions that were merged in this cycle.
 *
 * Starting at |offset| (the position after |unmerged_exceptions| leading
 * entries), the kernel-written page |merged_buffer| is walked alongside
 * |unmerged_buffer|, the daemon's copy of the same metadata page. Every
 * entry that is zero in the kernel page but still has a non-zero
 * new_chunk in the daemon's copy is counted as merged and is zeroed in
 * the daemon's copy.
 *
 * |copy_op| is reset to false and set to true if a counted op is a copy
 * op while the read-ahead feature is present.
 * |commit| is only ever set to true here (when such a copy op's new_block
 * equals GetFinalBlockMerged()); it is never cleared, so the caller must
 * initialize it.
 *
 * Returns the number of ops merged in this cycle, or -1 when the metadata
 * is inconsistent or a copy op is missing from the read-ahead map.
 */
int WorkerThread::GetNumberOfMergedOps(void* merged_buffer, void* unmerged_buffer, loff_t offset,
                                       int unmerged_exceptions, bool* copy_op, bool* commit) {
    int merged_ops_cur_iter = 0;
    std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
    *copy_op = false;
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();

    // Find the operations which are merged in this cycle.
    while ((unmerged_exceptions + merged_ops_cur_iter) < exceptions_per_area_) {
        struct disk_exception* merged_de =
                reinterpret_cast<struct disk_exception*>((char*)merged_buffer + offset);
        struct disk_exception* cow_de =
                reinterpret_cast<struct disk_exception*>((char*)unmerged_buffer + offset);

        // Past the unmerged prefix, every entry in the kernel page must be zero.
        if (!(merged_de->new_chunk == 0)) {
            SNAP_LOG(ERROR) << "GetNumberOfMergedOps: Invalid new-chunk: " << merged_de->new_chunk;
            return -1;
        }

        if (!(merged_de->old_chunk == 0)) {
            SNAP_LOG(ERROR) << "GetNumberOfMergedOps: Invalid old-chunk: " << merged_de->old_chunk;
            return -1;
        }

        if (cow_de->new_chunk != 0) {
            // Still set in the daemon's copy: this op was merged in this cycle.
            merged_ops_cur_iter += 1;
            offset += sizeof(struct disk_exception);
            // Look up the COW operation stored for this exception's new_chunk;
            // it must be present at exactly that sector.
            auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
                                       std::make_pair(ChunkToSector(cow_de->new_chunk), nullptr),
                                       Snapuserd::compare);

            if (!(it != chunk_vec.end())) {
                SNAP_LOG(ERROR) << "Sector not found: " << ChunkToSector(cow_de->new_chunk);
                return -1;
            }

            if (!(it->first == ChunkToSector(cow_de->new_chunk))) {
                SNAP_LOG(ERROR) << "Invalid sector: " << ChunkToSector(cow_de->new_chunk);
                return -1;
            }
            const CowOperation* cow_op = it->second;

            if (snapuserd_->IsReadAheadFeaturePresent() && cow_op->type == kCowCopyOp) {
                *copy_op = true;
                // Every single copy operation has to come from read-ahead
                // cache.
                if (read_ahead_buffer_map.find(cow_op->new_block) == read_ahead_buffer_map.end()) {
                    SNAP_LOG(ERROR)
                            << " Block: " << cow_op->new_block << " not found in read-ahead cache"
                            << " Source: " << cow_op->source;
                    return -1;
                }
                // If this is a final block merged in the read-ahead buffer
                // region, notify the read-ahead thread to make forward
                // progress
                if (cow_op->new_block == snapuserd_->GetFinalBlockMerged()) {
                    *commit = true;
                }
            }

            // zero out to indicate that operation is merged.
            cow_de->old_chunk = 0;
            cow_de->new_chunk = 0;
        } else if (cow_de->old_chunk == 0) {
            // Already merged op in previous iteration or
            // This could also represent a partially filled area.
            //
            // If the op was merged in previous cycle, we don't have
            // to count them.
            break;
        } else {
            // new_chunk is zero but old_chunk is not: the daemon's copy is corrupt.
            SNAP_LOG(ERROR) << "Error in merge operation. Found invalid metadata: "
                            << " merged_de-old-chunk: " << merged_de->old_chunk
                            << " merged_de-new-chunk: " << merged_de->new_chunk
                            << " cow_de-old-chunk: " << cow_de->old_chunk
                            << " cow_de-new-chunk: " << cow_de->new_chunk
                            << " unmerged_exceptions: " << unmerged_exceptions
                            << " merged_ops_cur_iter: " << merged_ops_cur_iter
                            << " offset: " << offset;
            return -1;
        }
    }
    return merged_ops_cur_iter;
}
556 
ProcessMergeComplete(chunk_t chunk,void * buffer)557 bool WorkerThread::ProcessMergeComplete(chunk_t chunk, void* buffer) {
558     uint32_t stride = exceptions_per_area_ + 1;
559     const std::vector<std::unique_ptr<uint8_t[]>>& vec = snapuserd_->GetMetadataVec();
560     bool copy_op = false;
561     bool commit = false;
562 
563     // ChunkID to vector index
564     lldiv_t divresult = lldiv(chunk, stride);
565 
566     if (!(divresult.quot < vec.size())) {
567         SNAP_LOG(ERROR) << "ProcessMergeComplete: Invalid chunk: " << chunk
568                         << " Metadata-Index: " << divresult.quot << " Area-size: " << vec.size();
569         return false;
570     }
571 
572     SNAP_LOG(DEBUG) << "ProcessMergeComplete: chunk: " << chunk
573                     << " Metadata-Index: " << divresult.quot;
574 
575     int unmerged_exceptions = 0;
576     loff_t offset = GetMergeStartOffset(buffer, vec[divresult.quot].get(), &unmerged_exceptions);
577 
578     if (offset < 0) {
579         SNAP_LOG(ERROR) << "GetMergeStartOffset failed: unmerged_exceptions: "
580                         << unmerged_exceptions;
581         return false;
582     }
583 
584     int merged_ops_cur_iter = GetNumberOfMergedOps(buffer, vec[divresult.quot].get(), offset,
585                                                    unmerged_exceptions, &copy_op, &commit);
586 
587     // There should be at least one operation merged in this cycle
588     if (!(merged_ops_cur_iter > 0)) {
589         SNAP_LOG(ERROR) << "Merge operation failed: " << merged_ops_cur_iter;
590         return false;
591     }
592 
593     if (copy_op) {
594         if (commit) {
595             // Push the flushing logic to read-ahead thread so that merge thread
596             // can make forward progress. Sync will happen in the background
597             snapuserd_->StartReadAhead();
598         }
599     } else {
600         // Non-copy ops and all ops in older COW format
601         if (!snapuserd_->CommitMerge(merged_ops_cur_iter)) {
602             SNAP_LOG(ERROR) << "CommitMerge failed...";
603             return false;
604         }
605     }
606 
607     SNAP_LOG(DEBUG) << "Merge success: " << merged_ops_cur_iter << "chunk: " << chunk;
608     return true;
609 }
610 
611 // Read Header from dm-user misc device. This gives
612 // us the sector number for which IO is issued by dm-snapshot device
ReadDmUserHeader()613 bool WorkerThread::ReadDmUserHeader() {
614     if (!android::base::ReadFully(ctrl_fd_, bufsink_.GetBufPtr(), sizeof(struct dm_user_header))) {
615         if (errno != ENOTBLK) {
616             SNAP_PLOG(ERROR) << "Control-read failed";
617         }
618 
619         return false;
620     }
621 
622     return true;
623 }
624 
625 // Send the payload/data back to dm-user misc device.
WriteDmUserPayload(size_t size,bool header_response)626 bool WorkerThread::WriteDmUserPayload(size_t size, bool header_response) {
627     size_t payload_size = size;
628     void* buf = bufsink_.GetPayloadBufPtr();
629     if (header_response) {
630         payload_size += sizeof(struct dm_user_header);
631         buf = bufsink_.GetBufPtr();
632     }
633 
634     if (!android::base::WriteFully(ctrl_fd_, buf, payload_size)) {
635         SNAP_PLOG(ERROR) << "Write to dm-user failed size: " << payload_size;
636         return false;
637     }
638 
639     return true;
640 }
641 
ReadDmUserPayload(void * buffer,size_t size)642 bool WorkerThread::ReadDmUserPayload(void* buffer, size_t size) {
643     if (!android::base::ReadFully(ctrl_fd_, buffer, size)) {
644         SNAP_PLOG(ERROR) << "ReadDmUserPayload failed size: " << size;
645         return false;
646     }
647 
648     return true;
649 }
650 
DmuserWriteRequest()651 bool WorkerThread::DmuserWriteRequest() {
652     struct dm_user_header* header = bufsink_.GetHeaderPtr();
653 
654     // device mapper has the capability to allow
655     // targets to flush the cache when writes are completed. This
656     // is controlled by each target by a flag "flush_supported".
657     // This flag is set by dm-user. When flush is supported,
658     // a number of zero-length bio's will be submitted to
659     // the target for the purpose of flushing cache. It is the
660     // responsibility of the target driver - which is dm-user in this
661     // case, to remap these bio's to the underlying device. Since,
662     // there is no underlying device for dm-user, this zero length
663     // bio's gets routed to daemon.
664     //
665     // Flush operations are generated post merge by dm-snap by having
666     // REQ_PREFLUSH flag set. Snapuser daemon doesn't have anything
667     // to flush per se; hence, just respond back with a success message.
668     if (header->sector == 0) {
669         if (!(header->len == 0)) {
670             SNAP_LOG(ERROR) << "Invalid header length received from sector 0: " << header->len;
671             header->type = DM_USER_RESP_ERROR;
672         } else {
673             header->type = DM_USER_RESP_SUCCESS;
674         }
675 
676         if (!WriteDmUserPayload(0, true)) {
677             return false;
678         }
679         return true;
680     }
681 
682     std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
683     size_t remaining_size = header->len;
684     size_t read_size = std::min(PAYLOAD_SIZE, remaining_size);
685 
686     chunk_t chunk = SectorToChunk(header->sector);
687     auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
688                                std::make_pair(header->sector, nullptr), Snapuserd::compare);
689 
690     bool not_found = (it == chunk_vec.end() || it->first != header->sector);
691 
692     if (not_found) {
693         void* buffer = bufsink_.GetPayloadBuffer(read_size);
694         if (buffer == nullptr) {
695             SNAP_LOG(ERROR) << "DmuserWriteRequest: Failed to get payload buffer of size: "
696                             << read_size;
697             header->type = DM_USER_RESP_ERROR;
698         } else {
699             header->type = DM_USER_RESP_SUCCESS;
700 
701             if (!ReadDmUserPayload(buffer, read_size)) {
702                 SNAP_LOG(ERROR) << "ReadDmUserPayload failed for chunk id: " << chunk
703                                 << "Sector: " << header->sector;
704                 header->type = DM_USER_RESP_ERROR;
705             }
706 
707             if (header->type == DM_USER_RESP_SUCCESS && !ProcessMergeComplete(chunk, buffer)) {
708                 SNAP_LOG(ERROR) << "ProcessMergeComplete failed for chunk id: " << chunk
709                                 << "Sector: " << header->sector;
710                 header->type = DM_USER_RESP_ERROR;
711             }
712         }
713     } else {
714         SNAP_LOG(ERROR) << "DmuserWriteRequest: Invalid sector received: header->sector";
715         header->type = DM_USER_RESP_ERROR;
716     }
717 
718     if (!WriteDmUserPayload(0, true)) {
719         return false;
720     }
721 
722     return true;
723 }
724 
// Serves a read request received from dm-user.
//
// The request of header->len bytes is answered in pieces of at most
// PAYLOAD_SIZE bytes. The first piece is written together with the header;
// later pieces carry payload only. Each piece is one of:
//  - sector 0: the kernel COW header (must be exactly BLOCK_SZ);
//  - a BLOCK_SZ read at offset 0 of a sector with no COW operation mapped in
//    chunk_vec: a metadata page of disk exceptions;
//  - anything else: data read through ReadData().
//
// On failure the header is sent back with DM_USER_RESP_ERROR and no payload,
// and the loop stops. Returns false only if writing the response fails.
bool WorkerThread::DmuserReadRequest() {
    struct dm_user_header* header = bufsink_.GetHeaderPtr();
    size_t remaining_size = header->len;
    loff_t offset = 0;
    sector_t sector = header->sector;
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
    // True only for the first piece, which is written together with the header.
    bool header_response = true;
    do {
        size_t read_size = std::min(PAYLOAD_SIZE, remaining_size);

        // Number of payload bytes to send for this piece; ReadData() may
        // replace it with a smaller count.
        int ret = read_size;
        header->type = DM_USER_RESP_SUCCESS;
        chunk_t chunk = SectorToChunk(header->sector);

        // Request to sector 0 is always for kernel
        // representation of COW header. This IO should be only
        // once during dm-snapshot device creation. We should
        // never see multiple IO requests. Additionally this IO
        // will always be a single 4k.
        if (header->sector == 0) {
            if (read_size == BLOCK_SZ) {
                ConstructKernelCowHeader();
                SNAP_LOG(DEBUG) << "Kernel header constructed";
            } else {
                SNAP_LOG(ERROR) << "Invalid read_size: " << read_size << " for sector 0";
                header->type = DM_USER_RESP_ERROR;
            }
        } else {
            auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
                                       std::make_pair(header->sector, nullptr), Snapuserd::compare);
            bool not_found = (it == chunk_vec.end() || it->first != header->sector);
            // A first, block-sized piece for a sector without a COW operation
            // is served as a metadata page.
            if (!offset && (read_size == BLOCK_SZ) && not_found) {
                if (!ReadDiskExceptions(chunk, read_size)) {
                    SNAP_LOG(ERROR) << "ReadDiskExceptions failed for chunk id: " << chunk
                                    << "Sector: " << header->sector;
                    header->type = DM_USER_RESP_ERROR;
                } else {
                    SNAP_LOG(DEBUG) << "ReadDiskExceptions success for chunk id: " << chunk
                                    << "Sector: " << header->sector;
                }
            } else {
                // Sectors already served by earlier pieces of this request.
                chunk_t num_sectors_read = (offset >> SECTOR_SHIFT);

                ret = ReadData(sector + num_sectors_read, read_size);
                if (ret < 0) {
                    SNAP_LOG(ERROR) << "ReadData failed for chunk id: " << chunk
                                    << " Sector: " << (sector + num_sectors_read)
                                    << " size: " << read_size << " header-len: " << header->len;
                    header->type = DM_USER_RESP_ERROR;
                } else {
                    SNAP_LOG(DEBUG) << "ReadData success for chunk id: " << chunk
                                    << "Sector: " << header->sector;
                }
            }
        }

        // Just return the header if it is an error
        if (header->type == DM_USER_RESP_ERROR) {
            SNAP_LOG(ERROR) << "IO read request failed...";
            ret = 0;
        }

        // Once the header has been sent, a later piece is not allowed to
        // fail: the CHECK aborts in that case.
        if (!header_response) {
            CHECK(header->type == DM_USER_RESP_SUCCESS)
                    << " failed for sector: " << sector << " header->len: " << header->len
                    << " remaining_size: " << remaining_size;
        }

        // Daemon will not be terminated if there is any error. We will
        // just send the error back to dm-user.
        if (!WriteDmUserPayload(ret, header_response)) {
            return false;
        }

        if (header->type == DM_USER_RESP_ERROR) {
            break;
        }

        remaining_size -= ret;
        offset += ret;
        header_response = false;
    } while (remaining_size > 0);

    return true;
}
810 
InitializeBufsink()811 void WorkerThread::InitializeBufsink() {
812     // Allocate the buffer which is used to communicate between
813     // daemon and dm-user. The buffer comprises of header and a fixed payload.
814     // If the dm-user requests a big IO, the IO will be broken into chunks
815     // of PAYLOAD_SIZE.
816     size_t buf_size = sizeof(struct dm_user_header) + PAYLOAD_SIZE;
817     bufsink_.Initialize(buf_size);
818 }
819 
RunThread()820 bool WorkerThread::RunThread() {
821     InitializeBufsink();
822 
823     if (!InitializeFds()) {
824         return false;
825     }
826 
827     if (!InitReader()) {
828         return false;
829     }
830 
831     // Start serving IO
832     while (true) {
833         if (!ProcessIORequest()) {
834             break;
835         }
836     }
837 
838     CloseFds();
839     reader_->CloseCowFd();
840 
841     return true;
842 }
843 
ProcessIORequest()844 bool WorkerThread::ProcessIORequest() {
845     struct dm_user_header* header = bufsink_.GetHeaderPtr();
846 
847     if (!ReadDmUserHeader()) {
848         return false;
849     }
850 
851     SNAP_LOG(DEBUG) << "Daemon: msg->seq: " << std::dec << header->seq;
852     SNAP_LOG(DEBUG) << "Daemon: msg->len: " << std::dec << header->len;
853     SNAP_LOG(DEBUG) << "Daemon: msg->sector: " << std::dec << header->sector;
854     SNAP_LOG(DEBUG) << "Daemon: msg->type: " << std::dec << header->type;
855     SNAP_LOG(DEBUG) << "Daemon: msg->flags: " << std::dec << header->flags;
856 
857     switch (header->type) {
858         case DM_USER_REQ_MAP_READ: {
859             if (!DmuserReadRequest()) {
860                 return false;
861             }
862             break;
863         }
864 
865         case DM_USER_REQ_MAP_WRITE: {
866             if (!DmuserWriteRequest()) {
867                 return false;
868             }
869             break;
870         }
871     }
872 
873     return true;
874 }
875 
876 }  // namespace snapshot
877 }  // namespace android
878