1# Copyright 2014 Google Inc. All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""A service account credentials class.
16
17This credentials class is implemented on top of rsa library.
18"""
19
20import base64
21import json
22import six
23import time
24
25from pyasn1.codec.ber import decoder
26from pyasn1_modules.rfc5208 import PrivateKeyInfo
27import rsa
28
29from oauth2client import GOOGLE_REVOKE_URI
30from oauth2client import GOOGLE_TOKEN_URI
31from oauth2client import util
32from oauth2client.client import AssertionCredentials
33
34
35class _ServiceAccountCredentials(AssertionCredentials):
36  """Class representing a service account (signed JWT) credential."""
37
38  MAX_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
39
40  def __init__(self, service_account_id, service_account_email, private_key_id,
41               private_key_pkcs8_text, scopes, user_agent=None,
42               token_uri=GOOGLE_TOKEN_URI, revoke_uri=GOOGLE_REVOKE_URI,
43               **kwargs):
44
45    super(_ServiceAccountCredentials, self).__init__(
46        None, user_agent=user_agent, token_uri=token_uri, revoke_uri=revoke_uri)
47
48    self._service_account_id = service_account_id
49    self._service_account_email = service_account_email
50    self._private_key_id = private_key_id
51    self._private_key = _get_private_key(private_key_pkcs8_text)
52    self._private_key_pkcs8_text = private_key_pkcs8_text
53    self._scopes = util.scopes_to_string(scopes)
54    self._user_agent = user_agent
55    self._token_uri = token_uri
56    self._revoke_uri = revoke_uri
57    self._kwargs = kwargs
58
59  def _generate_assertion(self):
60    """Generate the assertion that will be used in the request."""
61
62    header = {
63        'alg': 'RS256',
64        'typ': 'JWT',
65        'kid': self._private_key_id
66    }
67
68    now = int(time.time())
69    payload = {
70        'aud': self._token_uri,
71        'scope': self._scopes,
72        'iat': now,
73        'exp': now + _ServiceAccountCredentials.MAX_TOKEN_LIFETIME_SECS,
74        'iss': self._service_account_email
75    }
76    payload.update(self._kwargs)
77
78    assertion_input = (_urlsafe_b64encode(header) + b'.' +
79                       _urlsafe_b64encode(payload))
80
81    # Sign the assertion.
82    rsa_bytes = rsa.pkcs1.sign(assertion_input, self._private_key, 'SHA-256')
83    signature = base64.urlsafe_b64encode(rsa_bytes).rstrip(b'=')
84
85    return assertion_input + b'.' + signature
86
87  def sign_blob(self, blob):
88    # Ensure that it is bytes
89    try:
90      blob = blob.encode('utf-8')
91    except AttributeError:
92      pass
93    return (self._private_key_id,
94            rsa.pkcs1.sign(blob, self._private_key, 'SHA-256'))
95
96  @property
97  def service_account_email(self):
98    return self._service_account_email
99
100  @property
101  def serialization_data(self):
102    return {
103        'type': 'service_account',
104        'client_id': self._service_account_id,
105        'client_email': self._service_account_email,
106        'private_key_id': self._private_key_id,
107        'private_key': self._private_key_pkcs8_text
108    }
109
110  def create_scoped_required(self):
111    return not self._scopes
112
113  def create_scoped(self, scopes):
114    return _ServiceAccountCredentials(self._service_account_id,
115                                      self._service_account_email,
116                                      self._private_key_id,
117                                      self._private_key_pkcs8_text,
118                                      scopes,
119                                      user_agent=self._user_agent,
120                                      token_uri=self._token_uri,
121                                      revoke_uri=self._revoke_uri,
122                                      **self._kwargs)
123
124
125def _urlsafe_b64encode(data):
126  return base64.urlsafe_b64encode(
127      json.dumps(data, separators=(',', ':')).encode('UTF-8')).rstrip(b'=')
128
129
130def _get_private_key(private_key_pkcs8_text):
131  """Get an RSA private key object from a pkcs8 representation."""
132
133  if not isinstance(private_key_pkcs8_text, six.binary_type):
134    private_key_pkcs8_text = private_key_pkcs8_text.encode('ascii')
135  der = rsa.pem.load_pem(private_key_pkcs8_text, 'PRIVATE KEY')
136  asn1_private_key, _ = decoder.decode(der, asn1Spec=PrivateKeyInfo())
137  return rsa.PrivateKey.load_pkcs1(
138      asn1_private_key.getComponentByName('privateKey').asOctets(),
139      format='DER')
140