• Home
  • History
  • Annotate
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2016 syzkaller project authors. All rights reserved.
2// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
3
4package vmimpl
5
6import (
7	"bytes"
8	"fmt"
9	"io"
10	"sync"
11)
12
13type OutputMerger struct {
14	Output chan []byte
15	Err    chan error
16	teeMu  sync.Mutex
17	tee    io.Writer
18	wg     sync.WaitGroup
19}
20
21type MergerError struct {
22	Name string
23	R    io.ReadCloser
24	Err  error
25}
26
27func (err MergerError) Error() string {
28	return fmt.Sprintf("failed to read from %v: %v", err.Name, err.Err)
29}
30
31func NewOutputMerger(tee io.Writer) *OutputMerger {
32	return &OutputMerger{
33		Output: make(chan []byte, 1000),
34		Err:    make(chan error, 1),
35		tee:    tee,
36	}
37}
38
39func (merger *OutputMerger) Wait() {
40	merger.wg.Wait()
41	close(merger.Output)
42}
43
44func (merger *OutputMerger) Add(name string, r io.ReadCloser) {
45	merger.AddDecoder(name, r, nil)
46}
47
48func (merger *OutputMerger) AddDecoder(name string, r io.ReadCloser,
49	decoder func(data []byte) (start, size int, decoded []byte)) {
50	merger.wg.Add(1)
51	go func() {
52		var pending []byte
53		var proto []byte
54		var buf [4 << 10]byte
55		for {
56			n, err := r.Read(buf[:])
57			if n != 0 {
58				if decoder != nil {
59					proto = append(proto, buf[:n]...)
60					start, size, decoded := decoder(proto)
61					proto = proto[start+size:]
62					if len(decoded) != 0 {
63						merger.Output <- decoded // note: this can block
64					}
65				}
66				pending = append(pending, buf[:n]...)
67				if pos := bytes.LastIndexByte(pending, '\n'); pos != -1 {
68					out := pending[:pos+1]
69					if merger.tee != nil {
70						merger.teeMu.Lock()
71						merger.tee.Write(out)
72						merger.teeMu.Unlock()
73					}
74					select {
75					case merger.Output <- append([]byte{}, out...):
76						r := copy(pending[:], pending[pos+1:])
77						pending = pending[:r]
78					default:
79					}
80				}
81			}
82			if err != nil {
83				if len(pending) != 0 {
84					pending = append(pending, '\n')
85					if merger.tee != nil {
86						merger.teeMu.Lock()
87						merger.tee.Write(pending)
88						merger.teeMu.Unlock()
89					}
90					select {
91					case merger.Output <- pending:
92					default:
93					}
94				}
95				r.Close()
96				select {
97				case merger.Err <- MergerError{name, r, err}:
98				default:
99				}
100				merger.wg.Done()
101				return
102			}
103		}
104	}()
105}
106