# Copyright 2018 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import BaseHTTPServer import base64 import binascii import thread import urlparse from string import Template from xml.dom import minidom def _split_url(url): """Splits a URL into the URL base and path.""" split_url = urlparse.urlsplit(url) url_base = urlparse.urlunsplit( (split_url.scheme, split_url.netloc, '', '', '')) url_path = split_url.path return url_base, url_path.lstrip('/') class NanoOmahaDevserver(object): """A simple Omaha instance that can be setup on a DUT in client tests.""" def __init__(self, eol=False, failures_per_url=1, backoff=False, num_urls=2): """ Create a nano omaha devserver. @param eol: True if we should return a response with _eol flag. @param failures_per_url: how many times each url can fail. @param backoff: Whether we should wait a while before trying to update again after a failure. @param num_urls: The number of URLs in the omaha response. """ self._eol = eol self._failures_per_url = failures_per_url self._backoff = backoff self._num_urls = num_urls def create_update_response(self, appid): """ Create an update response using the values from set_image_params(). @param appid: the appid parsed from the request. @returns: a string of the response this server should send. """ EOL_TEMPLATE = Template(""" """) RESPONSE_TEMPLATE = Template(""" $PER_URL_TAGS """) PER_URL_TEMPLATE = Template('') FLAG_TEMPLATE = Template('$key="$value"') ROLLBACK_TEMPLATE = Template(""" _firmware_version="$fw" _firmware_version_0="$fw0" _firmware_version_1="$fw1" _firmware_version_2="$fw2" _firmware_version_3="$fw3" _firmware_version_4="$fw4" _kernel_version="$kern" _kernel_version_0="$kern0" _kernel_version_1="$kern1" _kernel_version_2="$kern2" _kernel_version_3="$kern3" _kernel_version_4="$kern4" _rollback="$is_rollback" """) # IF EOL, return a simplified response with _eol tag. if self._eol: return EOL_TEMPLATE.substitute(appid=appid) template_keys = {} template_keys['is_delta'] = str(self._is_delta).lower() template_keys['build_number'] = self._build template_keys['sha256'] = ( binascii.hexlify(base64.b64decode(self._sha256))) template_keys['image_size'] = self._image_size template_keys['failures_per_url'] = self._failures_per_url template_keys['disable_backoff'] = str(not self._backoff).lower() template_keys['num_urls'] = self._num_urls template_keys['appid'] = appid (base, name) = _split_url(self._image_url) template_keys['base'] = base template_keys['image_name'] = name # For now, set all version flags to the same value. if self._is_rollback: fw_val = '5' k_val = '7' rollback_flags = ROLLBACK_TEMPLATE.substitute( fw=fw_val, fw0=fw_val, fw1=fw_val, fw2=fw_val, fw3=fw_val, fw4=fw_val, kern=k_val, kern0=k_val, kern1=k_val, kern2=k_val, kern3=k_val, kern4=k_val, is_rollback='true') else: rollback_flags = '' template_keys['ROLLBACK_FLAGS'] = rollback_flags per_url = '' for i in xrange(self._num_urls): per_url += PER_URL_TEMPLATE.substitute(template_keys) template_keys['PER_URL_TAGS'] = per_url action_flags = [] def add_action_flag(key, value): """Helper function for the OPTIONAL_ACTION_FLAGS parameter.""" action_flags.append( FLAG_TEMPLATE.substitute(key=key, value=value)) if self._critical: add_action_flag('deadline', 'now') if self._metadata_size: add_action_flag('MetadataSize', self._metadata_size) if self._metadata_signature: add_action_flag('MetadataSignatureRsa', self._metadata_signature) if self._public_key: add_action_flag('PublicKeyRsa', self._public_key) template_keys['OPTIONAL_ACTION_FLAGS'] = ( '\n '.join(action_flags)) return RESPONSE_TEMPLATE.substitute(template_keys) class Handler(BaseHTTPServer.BaseHTTPRequestHandler): """Inner class for handling HTTP requests.""" def do_POST(self): """Handler for POST requests.""" if self.path == '/update': # Parse the app id from the request to use in the response. content_len = int(self.headers.getheader('content-length')) request_string = self.rfile.read(content_len) request_dom = minidom.parseString(request_string) app = request_dom.firstChild.getElementsByTagName('app')[0] appid = app.getAttribute('appid') response = self.server._devserver.create_update_response(appid) self.send_response(200) self.send_header('Content-Type', 'application/xml') self.end_headers() self.wfile.write(response) else: self.send_response(500) def start(self): """Starts the server.""" self._httpd = BaseHTTPServer.HTTPServer(('127.0.0.1', 0), self.Handler) self._httpd._devserver = self # Serve HTTP requests in a dedicated thread. thread.start_new_thread(self._httpd.serve_forever, ()) self._port = self._httpd.socket.getsockname()[1] def stop(self): """Stops the server.""" self._httpd.shutdown() def get_port(self): """Returns the TCP port number the server is listening on.""" return self._port def get_update_url(self): """Returns the update url for this server.""" return 'http://127.0.0.1:%d/update' % self._port def set_image_params(self, image_url, image_size, sha256, metadata_size=None, metadata_signature=None, public_key=None, is_delta=False, critical=True, is_rollback=False, build='999999.0.0'): """ Sets the values to return in the Omaha response. Only the |image_url|, |image_size| and |sha256| parameters are mandatory. @param image_url: the url of the image to install. @param image_size: the size of the image to install. @param sha256: the sha256 hash of the image to install. @param metadata_size: the size of the metadata. @param metadata_signature: the signature of the metadata. @param public_key: the public key. @param is_delta: True if image is a delta, False if a full payload. @param critical: True for forced update, False for regular update. @param is_rollback: True if image is for rollback, False if not. @param build: the build number the response should claim to have. """ self._image_url = image_url self._image_size = image_size self._sha256 = sha256 self._metadata_size = metadata_size self._metadata_signature = metadata_signature self._public_key = public_key self._is_delta = is_delta self._critical = critical self._is_rollback = is_rollback self._build = build