# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A service account credentials class.

This credentials class is implemented on top of rsa library.
"""

import base64
import json
import six
import time

from pyasn1.codec.ber import decoder
from pyasn1_modules.rfc5208 import PrivateKeyInfo
import rsa

from oauth2client import GOOGLE_REVOKE_URI
from oauth2client import GOOGLE_TOKEN_URI
from oauth2client import util
from oauth2client.client import AssertionCredentials


class _ServiceAccountCredentials(AssertionCredentials):
    """Class representing a service account (signed JWT) credential."""

    MAX_TOKEN_LIFETIME_SECS = 3600  # 1 hour in seconds

    def __init__(self, service_account_id, service_account_email,
                 private_key_id, private_key_pkcs8_text, scopes,
                 user_agent=None, token_uri=GOOGLE_TOKEN_URI,
                 revoke_uri=GOOGLE_REVOKE_URI, **kwargs):
        """Create the credentials and parse the private key.

        Args:
            service_account_id: Identifier of the service account; it is
                reported as 'client_id' in ``serialization_data``.
            service_account_email: Email of the service account; used as
                the 'iss' claim of generated assertions.
            private_key_id: Identifier of the private key; used as the
                'kid' header of generated assertions.
            private_key_pkcs8_text: PEM text (or bytes) of the PKCS#8
                private key, delimited by 'PRIVATE KEY' markers.
            scopes: Scopes to request; normalized with
                ``util.scopes_to_string``.
            user_agent: Optional user agent, forwarded to the base class.
            token_uri: Token endpoint; also used as the 'aud' claim.
            revoke_uri: Revocation endpoint, forwarded to the base class.
            **kwargs: Extra claims merged into the assertion payload.
        """
        super(_ServiceAccountCredentials, self).__init__(
            None, user_agent=user_agent, token_uri=token_uri,
            revoke_uri=revoke_uri)

        self._service_account_id = service_account_id
        self._service_account_email = service_account_email
        self._private_key_id = private_key_id
        self._private_key = _get_private_key(private_key_pkcs8_text)
        # The original text is kept so the credentials can be serialized
        # and re-created (see serialization_data and create_scoped).
        self._private_key_pkcs8_text = private_key_pkcs8_text
        self._scopes = util.scopes_to_string(scopes)
        self._user_agent = user_agent
        self._token_uri = token_uri
        self._revoke_uri = revoke_uri
        self._kwargs = kwargs

    def _generate_assertion(self):
        """Generate the assertion that will be used in the request.

        Returns:
            The signed JWT as bytes: the base64url-encoded header, payload
            and RS256 signature joined by '.', all without '=' padding.
        """
        header = {
            'alg': 'RS256',
            'typ': 'JWT',
            'kid': self._private_key_id
        }

        now = int(time.time())
        payload = {
            'aud': self._token_uri,
            'scope': self._scopes,
            'iat': now,
            'exp': now + _ServiceAccountCredentials.MAX_TOKEN_LIFETIME_SECS,
            'iss': self._service_account_email
        }
        # Caller-supplied claims are applied last, so they override the
        # defaults above when the keys collide.
        payload.update(self._kwargs)

        assertion_input = (_urlsafe_b64encode(header) + b'.' +
                           _urlsafe_b64encode(payload))

        # Sign the assertion.
        rsa_bytes = rsa.pkcs1.sign(
            assertion_input, self._private_key, 'SHA-256')
        signature = base64.urlsafe_b64encode(rsa_bytes).rstrip(b'=')

        return assertion_input + b'.' + signature

    def sign_blob(self, blob):
        """Sign a blob with the service account's private key.

        Args:
            blob: Data to sign. Text is encoded as UTF-8 first; any other
                value is handed to the signer unchanged.

        Returns:
            A ``(private_key_id, signature)`` tuple, where the signature is
            the result of ``rsa.pkcs1.sign`` with SHA-256.
        """
        # Only text needs encoding. An explicit type check is used instead
        # of calling .encode() and catching AttributeError because Python 2
        # byte strings also have .encode(), which implicitly ASCII-decodes
        # first and raises UnicodeDecodeError on binary data.
        if isinstance(blob, six.text_type):
            blob = blob.encode('utf-8')
        return (self._private_key_id,
                rsa.pkcs1.sign(blob, self._private_key, 'SHA-256'))

    @property
    def service_account_email(self):
        """The service account email given at construction."""
        return self._service_account_email

    @property
    def serialization_data(self):
        """A dict of the fields describing this service account."""
        return {
            'type': 'service_account',
            'client_id': self._service_account_id,
            'client_email': self._service_account_email,
            'private_key_id': self._private_key_id,
            'private_key': self._private_key_pkcs8_text
        }

    def create_scoped_required(self):
        """Return True when no scopes have been set on the credentials."""
        return not self._scopes

    def create_scoped(self, scopes):
        """Return a copy of these credentials bound to ``scopes``.

        Args:
            scopes: The scopes for the new credentials.

        Returns:
            A new ``_ServiceAccountCredentials`` built from the same
            account, key, URIs and extra claims, with the given scopes.
        """
        return _ServiceAccountCredentials(self._service_account_id,
                                          self._service_account_email,
                                          self._private_key_id,
                                          self._private_key_pkcs8_text,
                                          scopes,
                                          user_agent=self._user_agent,
                                          token_uri=self._token_uri,
                                          revoke_uri=self._revoke_uri,
                                          **self._kwargs)


def _urlsafe_b64encode(data):
    """Serialize ``data`` as compact JSON and base64url-encode it.

    Returns:
        The encoded bytes with trailing '=' padding removed.
    """
    return base64.urlsafe_b64encode(
        json.dumps(data, separators=(',', ':')).encode('UTF-8')).rstrip(b'=')


def _get_private_key(private_key_pkcs8_text):
    """Get an RSA private key object from a pkcs8 representation."""

    # rsa.pem.load_pem is given bytes, so text input is encoded first.
    if not isinstance(private_key_pkcs8_text, six.binary_type):
        private_key_pkcs8_text = private_key_pkcs8_text.encode('ascii')
    der = rsa.pem.load_pem(private_key_pkcs8_text, 'PRIVATE KEY')
    # Unwrap the PrivateKeyInfo structure; its 'privateKey' field is the
    # DER key that rsa.PrivateKey.load_pkcs1 is asked to parse.
    asn1_private_key, _ = decoder.decode(der, asn1Spec=PrivateKeyInfo())
    return rsa.PrivateKey.load_pkcs1(
        asn1_private_key.getComponentByName('privateKey').asOctets(),
        format='DER')