1# Copyright (c) 2009 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging, os, re, socket, subprocess, tempfile, threading, time
6
7from autotest_lib.client.common_lib import error
8from autotest_lib.server import autotest, hosts, test
9
10
class WolWake(threading.Thread):
    """Class to allow waking of DUT via Wake-on-LAN capabilities (WOL).

    The thread polls the DUT with ping until it stops responding (or the
    poll budget is used up) and then broadcasts a WOL magic packet for the
    DUT's mac address.
    """


    def __init__(self, hostname, mac_addr, sleep_secs):
        """Constructor for waking DUT.

        Args:
          hostname: hostname or IP address of the DUT; pinged by run() to
              detect when its network has gone down.
          mac_addr: string of the DUT's mac address written as six
              colon-separated hex octets, e.g. '00:1a:2b:3c:4d:5e'.
          sleep_secs: maximum number of ping polls run() performs while the
              DUT is still reachable before sending the WOL packet anyway.
        """
        threading.Thread.__init__(self)
        self._hostname = hostname
        self._mac_addr = mac_addr
        self._sleep_secs = sleep_secs


    # TODO(tbroch) Borrowed from class ServoTest.  Refactor for code re-use
    def _ping_test(self, hostname, timeout=5):
        """Verify whether a host responds to a ping.

        A single ping is attempted per loop iteration, followed by a one
        second sleep; the loop ends on the first successful ping or after
        `timeout` iterations.

        Args:
          hostname: Hostname to ping.
          timeout: Passed to ping as its -W value and also used as the
              maximum number of ping attempts.

        Returns: True if success False otherwise
        """
        with open(os.devnull, 'w') as fnull:
            ping_good = False
            elapsed_time = 0
            while not ping_good and elapsed_time < timeout:
                ping_good = subprocess.call(
                    ['ping', '-c', '1', '-W', str(timeout), str(hostname)],
                    stdout=fnull, stderr=fnull) == 0
                # The sleep is unconditional, so even a successful ping costs
                # one second here; run() relies on this pacing.
                time.sleep(1)
                elapsed_time += 1
            return ping_good


    def _build_magic_packet(self):
        """Build the WOL magic packet payload for this DUT's mac address.

        WOL magic packet consists of:
          0xff repeated for 6 bytes
          <mac addr> repeated 16 times

        Returns:
          102-byte payload as a byte string (bytes on Python 3, str on
          Python 2), suitable for socket.sendto().

        Raises:
          ValueError: if the mac address is not six colon-separated hex
              octets, each in the range 00-ff.
        """
        octets = self._mac_addr.split(':')
        if len(octets) != 6:
            raise ValueError("Malformed mac address: %r" % self._mac_addr)
        # int() rejects non-hex text and bytearray() rejects values outside
        # 0..255; both raise ValueError.  Going through bytearray yields a
        # real byte string on Python 2 and Python 3 alike.
        mac_bytes = bytes(bytearray(int(octet, 16) for octet in octets))
        return b'\xff' * 6 + mac_bytes * 16


    def _send_wol_magic_packet(self):
        """Perform Wake-on-LAN magic wake.

        Sent as a UDP broadcast packet to port 7.

        Raises:
          ValueError: if the mac address given to the constructor is
              malformed (see _build_magic_packet).
        """
        magic = self._build_magic_packet()

        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            sock.sendto(magic, ('<broadcast>', 7))
        finally:
            # Release the descriptor even when the send fails.
            sock.close()
        logging.info("Wake thread sent WOL wakeup")


    def run(self):
        """Wait for the DUT to drop off the network, then send the WOL packet.

        The DUT is polled at most `sleep_secs` times.  While it keeps
        answering, every poll takes at least two seconds (one sleep here plus
        the one inside _ping_test) on top of the ping itself.  The magic
        packet is sent as soon as a poll fails or the poll budget runs out.
        """
        # ping device to make sure its network is off presumably from suspend
        # not another malfunction.
        ping_secs = 0
        while (self._ping_test(self._hostname, timeout=2) and
               ping_secs < self._sleep_secs):
            time.sleep(1)
            ping_secs += 1

        self._send_wol_magic_packet()
83
84
class network_EthCapsServer(test.test):
    """Server side of the ethernet capabilities test.

    Looks up the client's mac address, starts a WolWake thread to wake the
    client over the network, and runs the client-side network_EthCaps test.
    """
    version = 1

    def _parse_ifconfig(self, filename):
        """Retrieve ifconfig information.

        Scans the file line by line and stores the first mac address found
        (the token following 'HWaddr' or 'ether') in self._mac_addr.

        Args:
          filename: string of path to a file holding ifconfig output.

        Raises
          error.TestError if unable to parse mac address
        """
        self._mac_addr = None

        re_mac = re.compile(r'.*(HWaddr|ether)\s+(\S+:\S+:\S+:\S+:\S+:\S+).*')
        # 'with' guarantees the file is closed even if logging or matching
        # raises part way through.
        with open(filename) as fd:
            for ln in fd:
                logging.debug(ln)
                mat = re_mac.match(ln)
                if mat:
                    self._mac_addr = mat.group(2)
                    logging.info("mac addr = %s", self._mac_addr)
                    break

        if not self._mac_addr:
            raise error.TestError("Unable to find mac addresss")


    def _client_cmd(self, cmd, results=None):
        """Execute a command on the client.

        Args:
          cmd: string of shell command to run on the client.
          results: string of filename to save results on client.  When set,
              the command's stdout and stderr are both redirected into that
              file inside a client temp directory.

        Returns:
          string of filename on server side holding a copy of the client
          results file, or None when `results` is not given.  The server
          side file is created with delete=False, so the caller is
          responsible for removing it.
        """
        if results:
            client_tmpdir = self._client.get_tmp_dir()
            client_results = os.path.join(client_tmpdir, "%s" % results)
            cmd = "%s > %s 2>&1" % (cmd, client_results)

        logging.info("Client cmd = %s", cmd)
        self._client.run(cmd)

        if results:
            # Only the unique name is needed here; get_file() fills in the
            # contents afterwards.
            server_tmpfile = tempfile.NamedTemporaryFile(delete=False)
            server_tmpfile.close()
            self._client.get_file(client_results, server_tmpfile.name)
            return server_tmpfile.name

        return None


    def run_once(self, client_ip=None, ethname='eth0'):
        """Run the test.

        Args:
          client_ip: string of client's ip address
          ethname: string of ethernet device under test

        Raises:
          error.TestError if client_ip is missing or the client's mac
          address cannot be determined.
        """
        if not client_ip:
            raise error.TestError("Must provide client's IP address to test")

        sleep_secs = 20

        self._ethname = ethname
        self._client_ip = client_ip
        self._client = hosts.create_host(client_ip)
        client_at = autotest.Autotest(self._client)

        # retrieve ifconfig info for mac address of client
        cmd = "ifconfig %s" % self._ethname
        ifconfig_filename = self._client_cmd(cmd, results="ifconfig.log")
        try:
            self._parse_ifconfig(ifconfig_filename)
        finally:
            # _client_cmd leaves the server side copy on disk; remove it so
            # repeated runs do not pile up temp files.
            os.remove(ifconfig_filename)

        # thread to wake the device using WOL
        wol_wake = WolWake(self._client_ip, self._mac_addr, sleep_secs)
        wol_wake.start()

        try:
            # create and run client test to prepare and suspend device
            client_at.run_test("network_EthCaps", ethname=ethname,
                               threshold_secs=sleep_secs * 2)
        finally:
            # Always wait for the wake thread, even when the client test
            # fails, so the WOL attempt has finished before the failure
            # propagates.
            wol_wake.join()
169