// Copyright 2016 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.

package vmimpl

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

// OutputMerger merges the output of several readers into the single Output
// channel and, optionally, copies the same data to a tee writer.
type OutputMerger struct {
	// Output receives the merged data. Raw reader data is delivered in
	// chunks that end with a newline; decoder results are delivered as is.
	// The channel is closed by Wait.
	Output chan []byte

	// Err receives a MergerError when a reader fails. It is buffered for a
	// single error; errors arriving while it is full are dropped.
	Err chan error

	// teeMu serializes writes to tee across the reader goroutines.
	teeMu sync.Mutex
	tee   io.Writer
	wg    sync.WaitGroup
}

// MergerError describes a read failure of one of the merged readers.
type MergerError struct {
	Name string        // name the reader was registered under
	R    io.ReadCloser // the reader that failed (already closed)
	Err  error         // the error returned by Read
}

// Error implements the error interface.
func (err MergerError) Error() string {
	return fmt.Sprintf("failed to read from %v: %v", err.Name, err.Err)
}

// Unwrap returns the underlying read error so that callers can use
// errors.Is/errors.As (for example errors.Is(err, io.EOF)).
func (err MergerError) Unwrap() error {
	return err.Err
}

// NewOutputMerger creates a merger with no readers attached.
// If tee is non-nil, all raw reader output is also written to it.
func NewOutputMerger(tee io.Writer) *OutputMerger {
	return &OutputMerger{
		Output: make(chan []byte, 1000),
		Err:    make(chan error, 1),
		tee:    tee,
	}
}

// Wait blocks until every added reader has failed or reached EOF and then
// closes Output. It must be called at most once, and no readers may be added
// afterwards: closing an already closed channel panics.
func (merger *OutputMerger) Wait() {
	merger.wg.Wait()
	close(merger.Output)
}

// Add starts merging output of r, without a decoder. See AddDecoder.
func (merger *OutputMerger) Add(name string, r io.ReadCloser) {
	merger.AddDecoder(name, r, nil)
}

// AddDecoder starts a goroutine that reads r until Read returns an error,
// then closes r and reports the error on Err as a MergerError carrying name.
//
// Raw data is forwarded to the tee writer and to Output once it contains at
// least one complete line. The send to Output never blocks: if the channel is
// full, the data is kept and retried after the next read. Whatever is still
// undelivered when the reader fails is terminated with a newline and sent with
// one last non-blocking attempt.
//
// If decoder is non-nil, it is additionally called with all input it has not
// consumed yet; it returns the consumed range as start/size and, optionally,
// decoded data, which is sent to Output with a blocking send.
func (merger *OutputMerger) AddDecoder(name string, r io.ReadCloser,
	decoder func(data []byte) (start, size int, decoded []byte)) {
	merger.wg.Add(1)
	go func() {
		// Deferred so that Wait is released even if the decoder or reader panics.
		defer merger.wg.Done()

		// pending holds raw output that has not been delivered to Output yet.
		var pending []byte
		// teed is the length of the pending prefix that has already been
		// written to tee. Without it, lines retained because Output was full
		// would be written to tee again on every subsequent read.
		teed := 0
		// proto holds input that the decoder has not consumed yet.
		var proto []byte
		var buf [4 << 10]byte
		for {
			n, err := r.Read(buf[:])
			if n != 0 {
				if decoder != nil {
					proto = append(proto, buf[:n]...)
					start, size, decoded := decoder(proto)
					proto = proto[start+size:]
					if len(decoded) != 0 {
						merger.Output <- decoded // note: this can block
					}
				}
				pending = append(pending, buf[:n]...)
				if pos := bytes.LastIndexByte(pending, '\n'); pos != -1 {
					out := pending[:pos+1]
					if teed < len(out) {
						merger.writeTee(out[teed:])
						teed = len(out)
					}
					select {
					case merger.Output <- append([]byte{}, out...):
						// Delivered: keep only the incomplete trailing line.
						pending = pending[:copy(pending, pending[pos+1:])]
						teed = 0
					default:
						// Output is full: keep the data and retry after the next read.
					}
				}
			}
			if err != nil {
				if len(pending) != 0 {
					pending = append(pending, '\n')
					merger.writeTee(pending[teed:])
					select {
					case merger.Output <- pending:
					default:
					}
				}
				// The read error is what gets reported; a close error adds nothing.
				r.Close()
				select {
				case merger.Err <- MergerError{name, r, err}:
				default:
				}
				return
			}
		}
	}()
}

// writeTee copies data to the tee writer, if one is configured.
func (merger *OutputMerger) writeTee(data []byte) {
	if merger.tee == nil || len(data) == 0 {
		return
	}
	merger.teeMu.Lock()
	defer merger.teeMu.Unlock()
	// Tee is best-effort: a failing tee must not disturb delivery to Output.
	merger.tee.Write(data)
}