# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import base64
import logging
import json
import os
import tempfile

from autotest_lib.client.bin import test
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros.cros_disks import CrosDisksTester


def try_remove(filename):
    """Removes a file without raising if the removal fails.

    Args:
        filename: Path of the file to delete.

    Returns:
        True if os.remove() succeeded, False if it raised OSError (for
        example because the file does not exist).
    """
    try:
        os.remove(filename)
        return True
    except OSError:
        return False


def _base64_text(data):
    """Returns the base64 encoding of |data| as a native string.

    base64.b64encode() requires bytes and returns bytes on Python 3, while the
    mount options built from the result are assembled by string concatenation.
    On Python 2 a str is already bytes, so the value is returned unchanged
    from what base64.b64encode() produces.

    Args:
        data: Text or bytes to encode. Text is encoded as UTF-8 first.

    Returns:
        The base64 representation of |data| as a str.
    """
    if not isinstance(data, bytes):
        data = data.encode('utf-8')
    encoded = base64.b64encode(data)
    if not isinstance(encoded, str):
        # Python 3: turn the bytes result back into text.
        encoded = encoded.decode('ascii')
    return encoded


class CrosDisksFuseTester(CrosDisksTester):
    """Common steps for all FUSE-based tests.

    Each test config is a dict. The keys read here are 'description',
    'test_mount_source_uri', 'expected_mount_status' (all required) and
    'test_mount_filesystem_type', 'test_mount_options', 'expected_mount_path'
    (optional). Subclasses customize behavior through the setup_test_case,
    teardown_test_case and verify_test_case hooks.
    """
    def __init__(self, test, test_configs):
        """Initializes the tester.

        Args:
            test: The test object, forwarded to CrosDisksTester.
            test_configs: Iterable of test config dicts to run.
        """
        super(CrosDisksFuseTester, self).__init__(test)
        self._test_configs = test_configs

    def setup_test_case(self, config):
        """Hook called before mounting; does nothing by default."""
        pass

    def teardown_test_case(self, config):
        """Hook called once the test case finished; does nothing by default."""
        pass

    def verify_test_case(self, config, mount_result):
        """Hook called after mount completion; does nothing by default."""
        pass

    def _test_case(self, config):
        """Runs one mount/verify/unmount cycle described by |config|.

        Args:
            config: Test config dict (see the class docstring for the keys).
        """
        logging.info('Testing "%s"', config['description'])
        # Setup runs outside the try block, so the teardown hook is only
        # invoked for test cases whose setup completed.
        self.setup_test_case(config)
        try:
            source = config['test_mount_source_uri']
            fstype = config.get('test_mount_filesystem_type')
            options = config.get('test_mount_options', [])
            expected_mount_completion = {
                'status': config['expected_mount_status'],
                'source_path': source,
            }
            # The mount path is only checked when the config specifies one.
            if 'expected_mount_path' in config:
                expected_mount_completion['mount_path'] = \
                    config['expected_mount_path']

            self.cros_disks.mount(source, fstype, options)
            result = self.cros_disks.expect_mount_completion(
                expected_mount_completion)
            try:
                self.verify_test_case(config, result)
            finally:
                # Always request an unmount, even if verification failed.
                self.cros_disks.unmount(source, ['lazy'])
        finally:
            self.teardown_test_case(config)

    def _run_all_test_cases(self):
        """Runs every configured test case in order.

        On RuntimeError the listing of the temp directory is logged at debug
        level before the exception is re-raised.
        """
        try:
            for config in self._test_configs:
                self._test_case(config)
        except RuntimeError:
            cmd = 'ls -la %s' % tempfile.gettempdir()
            logging.debug(utils.run(cmd))
            raise

    def get_tests(self):
        """Returns the list of test callables provided by this tester."""
        return [self._run_all_test_cases]


SSH_DIR_PATH = '/home/chronos/user/.ssh'
AUTHORIZED_KEYS = os.path.join(SSH_DIR_PATH, 'authorized_keys')
AUTHORIZED_KEYS_BACKUP = AUTHORIZED_KEYS + '.sshfsbak'

# Some discardable SSH key.
SSH_PRIVATE_KEY = '''-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAvKhQn82O9F+SzDTYgpI+qnCD6E6cYroLvflLp8/onYdqD1xK
ES4wDTGC68DNbS9tIo1hEjwbD79UltQT9NTmJg8DERUrQbNayYXtwxqZ2tSo1Hg5
dpAKLd3GBhwK1Eob+bNgcqEu3iIZq+QRVtlM92Uj4vBFuy8qgvGs4x+n3lACsyk8
4GZGtiFpqTPlTZ6BOEdknZpB0K3HIZ7NjZ8uD9fXJYFuUgQhQvhp1N8aZaf7JtWr
GLQ8Pwq6UYEVb8veHgLVAJ9p/5ko/WNVWf1h78v95pEHSYrQ0opcSDizbquW/1Fs
Fk6elrQcKctJ1FsXMxlWYOzN31yNxcPqT6rzhQIDAQABAoIBAAcD50OZ/DfgGfBY
ArkQQR5LYsxPqAcPzgH5dDPASnEZKPt7PhHXetfywGCN4dWujstbIIHyFDuIrNeS
+U8AX7KIml+XPu2JgtW9kjLQGWqGv+RuuAxNnONJvORbRJfSTaoCXpLEpZ6C/Btl
NrPZDsCgVS5KKv2j6lvGKtyjP7XHiXIXLvlhOJkpWRk4a1IBISP8HPt2w/bG0raD
CW6e6XYYPI4ZPwMlPRympQPGo8mVpNkhFAMHKnaN+E7HplsWXvb0daAVUeCBDVId
QSat88e7PbK2FMsinZvsCZSrHdggS+4u1h6LjMI3GO1PYEjvrMorkHz2w1KS3n1S
n4Eas50CgYEA9JR6JCauiZqJAV4azOylZaeiClkAtsK1IG98XkHyIDfn634U7o5c
6w1Uf0zwxRKx12EPQhzKiYRtp+nPirMAZHmm+gJqExakDV7uJlHNo/6qY7m1Z8I7
Ww/my1Oi5ASQ6Emyrpecvo8xTTl52Kf+l3mQk/EqitZLNWgkX5HdwTMCgYEAxXdh
/DLRDrBz7b+lYahAvBCr+VqUxWdjiarpnC/NZmXIWsI7U0nFpf3H4JzEQVdu5gdV
DKLrU8uw1dGwytgH3zA8s1VMWVg1uvVFduQk+pZeRj9ekGEHvViUEkylg3CaNCyO
2Kl72VS8W/ls5uX74mFcx9fwc9jUue807+406GcCgYAjfpTHQFHeKG4vo5+SE9nh
CdXrWIVRAKrWnTdYWouv/00KEQ8qm8CCYDneC6V5hEAI+M4FEzaVhIGBd94ly9qH
ulvwNn98a7G9OwSmzQJiBWhm9qGMAFUq3wDoiye9nagF/gQPcHNP+Gn4Qhobxi2d
gAfqYHqDEZxykL2OnRWonwKBgBvcl1+9T9ARx5mxI8WettuSQqGhTUJ5Lws6qVGX
URT0oYtkwngi/ZdJMo2XsP1DN+uO90ocJrYhFGdm+dn1F08/gCERlP86OgKSHuYC
lNEirFSfFlmqxyvJNsNKO0RLfAaGjvU1HLtygE096Ua/BoZPlIbCCjReUM2XWdHM
u3xbAoGARqG0gGpNCY7pEjSQ33TdLEXV7O0S4hJiN+IKPS2q8q8k21X7ckkPxsEG
h4dIuHzdLGZsmIXel4Mx3rvyKbboj93K3ia5rbU05keVi9duMv1PlbdQzu9mq2qu
A5CmV2fYpStHZTHsv5BcYWxkhc4aAmvUJwyAzlWEhyijwFK5wSQ=
-----END RSA PRIVATE KEY-----
'''

SSH_PUBLIC_KEY = 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC8qFCfzY' \
    '70X5LMNNiCkj6qcIPoTpxiugu9+Uunz+idh2oPXEoRLjANMYLrwM1tL' \
    '20ijWESPBsPv1SW1BP01OYmDwMRFStBs1rJhe3DGpna1KjUeDl2kAot' \
    '3cYGHArUShv5s2ByoS7eIhmr5BFW2Uz3ZSPi8EW7LyqC8azjH6feUAK' \
    'zKTzgZka2IWmpM+VNnoE4R2SdmkHQrcchns2Nny4P19clgW5SBCFC+G' \
    'nU3xplp/sm1asYtDw/CrpRgRVvy94eAtUAn2n/mSj9Y1VZ/WHvy/3mk' \
    'QdJitDSilxIOLNuq5b/UWwWTp6WtBwpy0nUWxczGVZg7M3fXI3Fw+pP' \
    'qvOF root@localhost'


class CrosDisksSshfsTester(CrosDisksFuseTester):
    """A tester to verify sshfs support in CrosDisks.

    Before each test case the discardable public key is installed as the only
    entry of AUTHORIZED_KEYS, and the matching private key plus a known_hosts
    line for localhost are appended to the mount options in base64 form.
    """
    def __init__(self, test, test_configs):
        """Initializes the tester; arguments are forwarded unchanged."""
        super(CrosDisksSshfsTester, self).__init__(test, test_configs)

    def setup_test_case(self, config):
        """Installs the test key and adds the SSH mount options to |config|.

        Args:
            config: Test config dict. Its 'test_mount_options' list is
                extended with 'IdentityBase64=' and 'UserKnownHostsBase64='
                entries (the list is created when absent).
        """
        if os.path.exists(AUTHORIZED_KEYS):
            # Make backup of the current authorized_keys
            utils.run('mv -f ' + AUTHORIZED_KEYS + ' ' + AUTHORIZED_KEYS_BACKUP,
                      ignore_status=True)

        self._register_key(SSH_PUBLIC_KEY)

        identity = _base64_text(SSH_PRIVATE_KEY)
        known_hosts = _base64_text(self._generate_known_hosts())

        options = config.get('test_mount_options', [])
        options.append('IdentityBase64=' + identity)
        options.append('UserKnownHostsBase64=' + known_hosts)
        config['test_mount_options'] = options

    def teardown_test_case(self, config):
        """Undoes the authorized_keys changes made by setup_test_case.

        Args:
            config: Test config dict (unused).
        """
        if os.path.exists(AUTHORIZED_KEYS_BACKUP):
            # Restore authorized_keys from backup.
            utils.run('mv -f ' + AUTHORIZED_KEYS_BACKUP + ' ' + AUTHORIZED_KEYS,
                      ignore_status=True)
        else:
            # Without a backup, the file present now is the one written by
            # _register_key(). Its private key is published in this source
            # file, so do not leave it authorized after the test.
            try_remove(AUTHORIZED_KEYS)

    def verify_test_case(self, config, mount_result):
        """Checks that the file named by 'expected_file', if any, exists.

        Args:
            config: Test config dict; 'expected_file' is optional.
            mount_result: Mount completion result (unused).

        Raises:
            error.TestFail: If 'expected_file' is set but does not exist.
        """
        if 'expected_file' in config:
            f = config['expected_file']
            if not os.path.exists(f):
                raise error.TestFail('Expected file "' + f + '" not found')

    def _register_key(self, pubkey):
        """Makes |pubkey| the sole content of AUTHORIZED_KEYS.

        The directory and file are created through 'sudo -u chronos' before
        the content is written, and the file mode is set to 0600 afterwards.

        Args:
            pubkey: Public key line to write.
        """
        utils.run('sudo -u chronos mkdir -p ' + SSH_DIR_PATH,
                  ignore_status=True)
        utils.run('sudo -u chronos touch ' + AUTHORIZED_KEYS)
        # Text mode: |pubkey| is a native string, which a binary-mode file
        # rejects on Python 3.
        with open(AUTHORIZED_KEYS, 'w') as f:
            f.write(pubkey)
        utils.run('sudo -u chronos chmod 0600 ' + AUTHORIZED_KEYS)

    def _generate_known_hosts(self):
        """Builds a known_hosts line for localhost from the host's public key.

        Returns:
            'localhost <field0> <field1>\\n', where the fields are the first
            two whitespace-separated tokens on the first line of the ed25519
            host public key file.

        Raises:
            error.TestError: If that first line has fewer than two fields.
        """
        hostkey = '/mnt/stateful_partition/etc/ssh/ssh_host_ed25519_key.pub'
        # Text mode keeps the fields as native strings so that format() does
        # not embed a bytes repr on Python 3.
        with open(hostkey, 'r') as f:
            keydata = f.readline().split()
        if len(keydata) < 2:
            raise error.TestError(
                'Malformed SSH host key file "' + hostkey + '"')
        return 'localhost {} {}\n'.format(keydata[0], keydata[1])


class platform_CrosDisksSshfs(test.test):
    """Autotest entry point that runs the sshfs CrosDisks test cases."""
    version = 1

    def run_once(self, *args, **kwargs):
        """Loads the JSON test configs and runs them through the tester.

        Args:
            *args: Forwarded to the tester's run().
            **kwargs: Must contain 'config_file', the name of a JSON file
                inside self.bindir; all keyword arguments are also forwarded
                to the tester's run().
        """
        test_configs = []
        config_file = '%s/%s' % (self.bindir, kwargs['config_file'])
        with open(config_file, 'rb') as f:
            test_configs.extend(json.load(f))

        tester = CrosDisksSshfsTester(self, test_configs)
        tester.run(*args, **kwargs)