1# Copyright 2018 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import base64
6import logging
7import json
8import os
9import tempfile
10
11from autotest_lib.client.bin import test
12from autotest_lib.client.bin import utils
13from autotest_lib.client.common_lib import error
14from autotest_lib.client.cros.cros_disks import CrosDisksTester
15
16
def try_remove(filename):
    """Deletes a file, reporting the outcome instead of raising.

    @param filename: Path of the file to delete.

    @return True if os.remove() succeeded, False if it raised OSError
            (for example because the file does not exist).
    """
    try:
        os.remove(filename)
    except OSError:
        return False
    else:
        return True
23
24
class CrosDisksFuseTester(CrosDisksTester):
    """Shared driver for FUSE-based CrosDisks tests.

    Each entry of the test configuration list goes through the same
    sequence: per-case setup, mount, check of the mount completion, per-case
    verification, lazy unmount and per-case teardown. Subclasses customize
    the sequence by overriding setup_test_case(), verify_test_case() and
    teardown_test_case(), which do nothing here.
    """
    def __init__(self, test, test_configs):
        """Remembers the test case configurations to run.

        @param test: Test object forwarded to the CrosDisksTester constructor.
        @param test_configs: Iterable of dicts, one per test case.
        """
        super(CrosDisksFuseTester, self).__init__(test)
        self._test_configs = test_configs

    def setup_test_case(self, config):
        """Hook invoked before a test case is mounted; no-op by default.

        @param config: Dict describing the test case.
        """

    def teardown_test_case(self, config):
        """Hook invoked after a test case finished; no-op by default.

        It only runs for test cases whose setup_test_case() returned
        normally.

        @param config: Dict describing the test case.
        """

    def verify_test_case(self, config, mount_result):
        """Hook invoked while the source is mounted; no-op by default.

        @param config: Dict describing the test case.
        @param mount_result: Value returned by expect_mount_completion().
        """

    def _test_case(self, config):
        """Runs one test case: setup, mount, verify, unmount, teardown.

        Required config keys are 'description', 'test_mount_source_uri' and
        'expected_mount_status'. 'test_mount_filesystem_type',
        'test_mount_options' and 'expected_mount_path' are optional.

        @param config: Dict describing the test case.
        """
        logging.info('Testing "%s"', config['description'])
        self.setup_test_case(config)
        try:
            mount_source = config['test_mount_source_uri']
            fs_type = config.get('test_mount_filesystem_type')
            mount_options = config.get('test_mount_options', [])

            # The mount path is only checked when the config specifies one.
            expectation = dict(status=config['expected_mount_status'],
                               source_path=mount_source)
            if 'expected_mount_path' in config:
                expectation['mount_path'] = config['expected_mount_path']

            self.cros_disks.mount(mount_source, fs_type, mount_options)
            mount_result = self.cros_disks.expect_mount_completion(
                    expectation)
            try:
                self.verify_test_case(config, mount_result)
            finally:
                # Unmount even when verification failed.
                self.cros_disks.unmount(mount_source, ['lazy'])
        finally:
            self.teardown_test_case(config)

    def _run_all_test_cases(self):
        """Runs every configured test case, stopping at the first failure.

        When a RuntimeError escapes, the content of the temporary directory
        is logged at debug level before the exception is re-raised.
        """
        try:
            for case_config in self._test_configs:
                self._test_case(case_config)
        except RuntimeError:
            logging.debug(utils.run('ls -la %s' % tempfile.gettempdir()))
            raise

    def get_tests(self):
        """Returns the list of test callables exposed by this tester."""
        return [self._run_all_test_cases]
77
78
# SSH directory under /home/chronos/user and the authorized_keys file in it
# that the sshfs tester overwrites with its own key.
SSH_DIR_PATH = '/home/chronos/user/.ssh'
AUTHORIZED_KEYS = os.path.join(SSH_DIR_PATH, 'authorized_keys')
# Where a pre-existing authorized_keys file is parked while a test case runs.
AUTHORIZED_KEYS_BACKUP = '%s.sshfsbak' % AUTHORIZED_KEYS
82
# Throwaway RSA private key hard-coded for this test only. The sshfs tester
# base64-encodes it and hands it to cros-disks as the 'IdentityBase64=' mount
# option. It is public knowledge and must never be trusted for anything else.
# Presumably the counterpart of SSH_PUBLIC_KEY -- keep the two in sync if
# either is ever regenerated.
SSH_PRIVATE_KEY = '''-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAvKhQn82O9F+SzDTYgpI+qnCD6E6cYroLvflLp8/onYdqD1xK
ES4wDTGC68DNbS9tIo1hEjwbD79UltQT9NTmJg8DERUrQbNayYXtwxqZ2tSo1Hg5
dpAKLd3GBhwK1Eob+bNgcqEu3iIZq+QRVtlM92Uj4vBFuy8qgvGs4x+n3lACsyk8
4GZGtiFpqTPlTZ6BOEdknZpB0K3HIZ7NjZ8uD9fXJYFuUgQhQvhp1N8aZaf7JtWr
GLQ8Pwq6UYEVb8veHgLVAJ9p/5ko/WNVWf1h78v95pEHSYrQ0opcSDizbquW/1Fs
Fk6elrQcKctJ1FsXMxlWYOzN31yNxcPqT6rzhQIDAQABAoIBAAcD50OZ/DfgGfBY
ArkQQR5LYsxPqAcPzgH5dDPASnEZKPt7PhHXetfywGCN4dWujstbIIHyFDuIrNeS
+U8AX7KIml+XPu2JgtW9kjLQGWqGv+RuuAxNnONJvORbRJfSTaoCXpLEpZ6C/Btl
NrPZDsCgVS5KKv2j6lvGKtyjP7XHiXIXLvlhOJkpWRk4a1IBISP8HPt2w/bG0raD
CW6e6XYYPI4ZPwMlPRympQPGo8mVpNkhFAMHKnaN+E7HplsWXvb0daAVUeCBDVId
QSat88e7PbK2FMsinZvsCZSrHdggS+4u1h6LjMI3GO1PYEjvrMorkHz2w1KS3n1S
n4Eas50CgYEA9JR6JCauiZqJAV4azOylZaeiClkAtsK1IG98XkHyIDfn634U7o5c
6w1Uf0zwxRKx12EPQhzKiYRtp+nPirMAZHmm+gJqExakDV7uJlHNo/6qY7m1Z8I7
Ww/my1Oi5ASQ6Emyrpecvo8xTTl52Kf+l3mQk/EqitZLNWgkX5HdwTMCgYEAxXdh
/DLRDrBz7b+lYahAvBCr+VqUxWdjiarpnC/NZmXIWsI7U0nFpf3H4JzEQVdu5gdV
DKLrU8uw1dGwytgH3zA8s1VMWVg1uvVFduQk+pZeRj9ekGEHvViUEkylg3CaNCyO
2Kl72VS8W/ls5uX74mFcx9fwc9jUue807+406GcCgYAjfpTHQFHeKG4vo5+SE9nh
CdXrWIVRAKrWnTdYWouv/00KEQ8qm8CCYDneC6V5hEAI+M4FEzaVhIGBd94ly9qH
ulvwNn98a7G9OwSmzQJiBWhm9qGMAFUq3wDoiye9nagF/gQPcHNP+Gn4Qhobxi2d
gAfqYHqDEZxykL2OnRWonwKBgBvcl1+9T9ARx5mxI8WettuSQqGhTUJ5Lws6qVGX
URT0oYtkwngi/ZdJMo2XsP1DN+uO90ocJrYhFGdm+dn1F08/gCERlP86OgKSHuYC
lNEirFSfFlmqxyvJNsNKO0RLfAaGjvU1HLtygE096Ua/BoZPlIbCCjReUM2XWdHM
u3xbAoGARqG0gGpNCY7pEjSQ33TdLEXV7O0S4hJiN+IKPS2q8q8k21X7ckkPxsEG
h4dIuHzdLGZsmIXel4Mx3rvyKbboj93K3ia5rbU05keVi9duMv1PlbdQzu9mq2qu
A5CmV2fYpStHZTHsv5BcYWxkhc4aAmvUJwyAzlWEhyijwFK5wSQ=
-----END RSA PRIVATE KEY-----
'''
112
# Public key line ("<type> <base64 blob> <comment>") that the sshfs tester
# writes verbatim into the authorized_keys file. The literal is split into
# adjacent string pieces purely to keep the source lines short.
SSH_PUBLIC_KEY = (
    'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC8qFCfzY'
    '70X5LMNNiCkj6qcIPoTpxiugu9+Uunz+idh2oPXEoRLjANMYLrwM1tL'
    '20ijWESPBsPv1SW1BP01OYmDwMRFStBs1rJhe3DGpna1KjUeDl2kAot'
    '3cYGHArUShv5s2ByoS7eIhmr5BFW2Uz3ZSPi8EW7LyqC8azjH6feUAK'
    'zKTzgZka2IWmpM+VNnoE4R2SdmkHQrcchns2Nny4P19clgW5SBCFC+G'
    'nU3xplp/sm1asYtDw/CrpRgRVvy94eAtUAn2n/mSj9Y1VZ/WHvy/3mk'
    'QdJitDSilxIOLNuq5b/UWwWTp6WtBwpy0nUWxczGVZg7M3fXI3Fw+pP'
    'qvOF root@localhost'
)
121
122
class CrosDisksSshfsTester(CrosDisksFuseTester):
    """A tester to verify sshfs support in CrosDisks.

    For every test case the authorized_keys file at AUTHORIZED_KEYS is
    replaced by one containing only SSH_PUBLIC_KEY, and the matching identity
    plus a known_hosts entry for localhost are appended to the mount options.
    Teardown puts the previous authorized_keys back, or deletes the test one
    when there was nothing to restore.
    """
    def __init__(self, test, test_configs):
        """Creates the tester.

        @param test: Test object forwarded to the base class.
        @param test_configs: Iterable of dicts, one per test case.
        """
        super(CrosDisksSshfsTester, self).__init__(test, test_configs)
        # True while AUTHORIZED_KEYS may hold the key written by this tester
        # and therefore still has to be cleaned up by teardown.
        self._test_key_installed = False

    def setup_test_case(self, config):
        """Installs the test key and adds the SSH options to the config.

        Appends 'IdentityBase64=...' and 'UserKnownHostsBase64=...' to
        config['test_mount_options'], creating the list if it is missing.

        @param config: Dict describing the test case; modified in place.
        """
        if os.path.exists(AUTHORIZED_KEYS):
            # Make backup of the current authorized_keys
            utils.run('mv -f ' + AUTHORIZED_KEYS + ' ' + AUTHORIZED_KEYS_BACKUP,
                      ignore_status=True)

        try:
            # Set the flag first: _register_key() may create the file and
            # then fail, and that partial file must be cleaned up as well.
            self._test_key_installed = True
            self._register_key(SSH_PUBLIC_KEY)

            identity = base64.b64encode(SSH_PRIVATE_KEY)
            known_hosts = base64.b64encode(self._generate_known_hosts())
        except Exception:
            # teardown_test_case() is not invoked by the base class when
            # setup fails, so undo the authorized_keys changes here before
            # propagating the error.
            self.teardown_test_case(config)
            raise

        options = config.get('test_mount_options', [])
        options.append('IdentityBase64=' + identity)
        options.append('UserKnownHostsBase64=' + known_hosts)
        config['test_mount_options'] = options

    def teardown_test_case(self, config):
        """Undoes the authorized_keys changes made by setup_test_case().

        Safe to call more than once and after a partially failed setup.

        @param config: Dict describing the test case (unused).
        """
        if os.path.exists(AUTHORIZED_KEYS_BACKUP):
            # Restore authorized_keys from backup.
            utils.run('mv -f ' + AUTHORIZED_KEYS_BACKUP + ' ' + AUTHORIZED_KEYS,
                      ignore_status=True)
        elif self._test_key_installed:
            # There was no file to restore: do not leave the publicly known
            # test key authorized after the test.
            try_remove(AUTHORIZED_KEYS)
        self._test_key_installed = False

    def verify_test_case(self, config, mount_result):
        """Checks that the optional 'expected_file' exists while mounted.

        @param config: Dict describing the test case.
        @param mount_result: Mount completion result (unused).

        @raises error.TestFail if config names an expected file that does
                not exist.
        """
        if 'expected_file' in config:
            f = config['expected_file']
            if not os.path.exists(f):
                raise error.TestFail('Expected file "' + f + '" not found')

    def _register_key(self, pubkey):
        """Makes pubkey the only content of AUTHORIZED_KEYS.

        The directory and the file are created through 'sudo -u chronos'
        before the content is written by the current process.

        @param pubkey: Public key line to write.
        """
        utils.run('sudo -u chronos mkdir -p ' + SSH_DIR_PATH,
                  ignore_status=True)
        utils.run('sudo -u chronos touch ' + AUTHORIZED_KEYS)
        with open(AUTHORIZED_KEYS, 'wb') as f:
            f.write(pubkey)
        utils.run('sudo -u chronos chmod 0600 ' + AUTHORIZED_KEYS)

    def _generate_known_hosts(self):
        """Builds a known_hosts line for localhost from the ed25519 host key.

        @return 'localhost <first field> <second field>\\n', with the fields
                taken from the first line of the host public key file.

        @raises error.TestError if that line has fewer than two fields.
        """
        hostkey = '/mnt/stateful_partition/etc/ssh/ssh_host_ed25519_key.pub'
        with open(hostkey, 'rb') as f:
            keydata = f.readline().split()
        if len(keydata) < 2:
            # Report a readable error instead of an IndexError below.
            raise error.TestError(
                    'Malformed SSH host key file "%s"' % hostkey)
        return 'localhost {} {}\n'.format(keydata[0], keydata[1])
170
171
class platform_CrosDisksSshfs(test.test):
    """Test entry point that runs CrosDisksSshfsTester over a JSON config.
    """
    version = 1

    def run_once(self, *args, **kwargs):
        """Loads the test cases and hands them to the sshfs tester.

        @param args: Positional arguments forwarded to the tester's run().
        @param kwargs: Keyword arguments forwarded to the tester's run().
                'config_file' is required and names a JSON file, relative to
                self.bindir, whose items are the test case configurations.
        """
        config_path = '%s/%s' % (self.bindir, kwargs['config_file'])
        with open(config_path, 'rb') as config_stream:
            test_configs = list(json.load(config_stream))

        CrosDisksSshfsTester(self, test_configs).run(*args, **kwargs)
183