1# coding: utf-8
2
3"""
4ASN.1 type classes for the online certificate status protocol (OCSP). Exports
5the following items:
6
7 - OCSPRequest()
8 - OCSPResponse()
9
10Other type classes are defined that help compose the types listed above.
11"""
12
13from __future__ import unicode_literals, division, absolute_import, print_function
14
15from ._errors import unwrap
16from .algos import DigestAlgorithm, SignedDigestAlgorithm
17from .core import (
18    Boolean,
19    Choice,
20    Enumerated,
21    GeneralizedTime,
22    IA5String,
23    Integer,
24    Null,
25    ObjectIdentifier,
26    OctetBitString,
27    OctetString,
28    ParsableOctetString,
29    Sequence,
30    SequenceOf,
31)
32from .crl import AuthorityInfoAccessSyntax, CRLReason
33from .keys import PublicKeyAlgorithm
34from .x509 import Certificate, GeneralName, GeneralNames, Name
35
36
37# The structures in this file are taken from https://tools.ietf.org/html/rfc6960
38
39
class Version(Integer):
    """
    Integer type that maps the numeric protocol version to its name
    """

    _map = {0: 'v1'}
44
45
class CertId(Sequence):
    """
    Identifies a certificate by the digest algorithm used, the hashes of the
    issuer name and issuer key, and the certificate serial number
    """

    _fields = [
        # Digest algorithm used to produce the two hashes below
        ('hash_algorithm', DigestAlgorithm),
        ('issuer_name_hash', OctetString),
        ('issuer_key_hash', OctetString),
        ('serial_number', Integer),
    ]
53
54
class ServiceLocator(Sequence):
    """
    Value of the service_locator request extension: an issuer name paired
    with authority information access data
    """

    _fields = [
        ('issuer', Name),
        ('locator', AuthorityInfoAccessSyntax),
    ]
60
61
class RequestExtensionId(ObjectIdentifier):
    """
    Object identifiers of the extensions recognized on a single Request
    """

    _map = {'1.3.6.1.5.5.7.48.1.7': 'service_locator'}
66
67
class RequestExtension(Sequence):
    """
    One extension attached to a single Request. The type used to parse
    extn_value is selected by the value of extn_id.
    """

    _fields = [
        ('extn_id', RequestExtensionId),
        # Extensions are non-critical unless explicitly flagged
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # extn_id picks the entry of _oid_specs used to parse extn_value
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {'service_locator': ServiceLocator}
79
80
class RequestExtensions(SequenceOf):
    """
    A sequence of RequestExtension objects
    """

    _child_spec = RequestExtension
83
84
class Request(Sequence):
    """
    A status request for one certificate: the CertId of the certificate plus
    an optional set of extensions that apply to this request only
    """

    _fields = [
        ('req_cert', CertId),
        ('single_request_extensions', RequestExtensions, {'explicit': 0, 'optional': True}),
    ]

    # Cache populated by _set_extensions() the first time one of the
    # extension properties is read
    _processed_extensions = False
    _critical_extensions = None
    _service_locator_value = None

    def _set_extensions(self):
        """
        Walks the single_request_extensions field, storing the parsed value
        of each extension that has a matching private attribute and building
        the set of names of the extensions flagged as critical
        """

        critical = self._critical_extensions = set()

        for ext in self['single_request_extensions']:
            ext_name = ext['extn_id'].native
            cache_attr = '_%s_value' % (ext_name,)
            # Only extensions with a pre-declared private attribute are cached
            if hasattr(self, cache_attr):
                setattr(self, cache_attr, ext['extn_value'].parsed)
            if ext['critical'].native:
                critical.add(ext_name)

        self._processed_extensions = True

    @property
    def critical_extensions(self):
        """
        The names of the extensions flagged as critical. The dotted OID is
        used for extensions that are not known.

        :return:
            A set of unicode strings
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._critical_extensions

    @property
    def service_locator_value(self):
        """
        The service locator extension, used when the OCSP responder acts as
        a proxy for OCSP requests

        :return:
            None or a ServiceLocator object
        """

        if self._processed_extensions is False:
            self._set_extensions()
        return self._service_locator_value
140
141
class Requests(SequenceOf):
    """
    A sequence of Request objects
    """

    _child_spec = Request
144
145
class ResponseType(ObjectIdentifier):
    """
    Object identifiers naming the known OCSP response formats
    """

    _map = {'1.3.6.1.5.5.7.48.1.1': 'basic_ocsp_response'}
150
151
class AcceptableResponses(SequenceOf):
    """
    A sequence of ResponseType object identifiers
    """

    _child_spec = ResponseType
154
155
class PreferredSignatureAlgorithm(Sequence):
    """
    A signature algorithm together with an optional public key algorithm
    """

    _fields = [
        ('sig_identifier', SignedDigestAlgorithm),
        ('cert_identifier', PublicKeyAlgorithm, {'optional': True}),
    ]
161
162
class PreferredSignatureAlgorithms(SequenceOf):
    """
    A sequence of PreferredSignatureAlgorithm objects
    """

    _child_spec = PreferredSignatureAlgorithm
165
166
class TBSRequestExtensionId(ObjectIdentifier):
    """
    Object identifiers of the extensions recognized on a TBSRequest
    """

    _map = {
        '1.3.6.1.5.5.7.48.1.2': 'nonce',
        '1.3.6.1.5.5.7.48.1.4': 'acceptable_responses',
        '1.3.6.1.5.5.7.48.1.8': 'preferred_signature_algorithms',
    }
173
174
class TBSRequestExtension(Sequence):
    """
    One extension attached to a TBSRequest. The type used to parse
    extn_value is selected by the value of extn_id.
    """

    _fields = [
        ('extn_id', TBSRequestExtensionId),
        # Extensions are non-critical unless explicitly flagged
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # extn_id picks the entry of _oid_specs used to parse extn_value
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'nonce': OctetString,
        'acceptable_responses': AcceptableResponses,
        'preferred_signature_algorithms': PreferredSignatureAlgorithms,
    }
188
189
class TBSRequestExtensions(SequenceOf):
    """
    A sequence of TBSRequestExtension objects
    """

    _child_spec = TBSRequestExtension
192
193
class TBSRequest(Sequence):
    """
    The body of an OCSP request: version, optional requestor name, the list
    of certificate requests and optional request-wide extensions
    """

    _fields = [
        # Falls back to v1 when not encoded
        ('version', Version, {'explicit': 0, 'default': 'v1'}),
        ('requestor_name', GeneralName, {'explicit': 1, 'optional': True}),
        ('request_list', Requests),
        ('request_extensions', TBSRequestExtensions, {'explicit': 2, 'optional': True}),
    ]
201
202
class Certificates(SequenceOf):
    """
    A sequence of x509 Certificate objects
    """

    _child_spec = Certificate
205
206
class Signature(Sequence):
    """
    The optional signature on an OCSP request: the algorithm, the signature
    value and, optionally, a list of certificates
    """

    _fields = [
        ('signature_algorithm', SignedDigestAlgorithm),
        ('signature', OctetBitString),
        ('certs', Certificates, {'explicit': 0, 'optional': True}),
    ]
213
214
class OCSPRequest(Sequence):
    """
    A complete OCSP request: the TBSRequest body plus an optional signature.
    The extensions found in the body are exposed through properties.
    """

    _fields = [
        ('tbs_request', TBSRequest),
        ('optional_signature', Signature, {'explicit': 0, 'optional': True}),
    ]

    # Cache populated by _set_extensions() the first time one of the
    # extension properties is read
    _processed_extensions = False
    _critical_extensions = None
    _nonce_value = None
    _acceptable_responses_value = None
    _preferred_signature_algorithms_value = None

    def _set_extensions(self):
        """
        Walks the request_extensions field of tbs_request, storing the parsed
        value of each extension that has a matching private attribute and
        building the set of names of the extensions flagged as critical
        """

        critical = self._critical_extensions = set()

        tbs_request = self['tbs_request']
        for ext in tbs_request['request_extensions']:
            ext_name = ext['extn_id'].native
            cache_attr = '_%s_value' % (ext_name,)
            # Only extensions with a pre-declared private attribute are cached
            if hasattr(self, cache_attr):
                setattr(self, cache_attr, ext['extn_value'].parsed)
            if ext['critical'].native:
                critical.add(ext_name)

        self._processed_extensions = True

    @property
    def critical_extensions(self):
        """
        The names of the extensions flagged as critical. The dotted OID is
        used for extensions that are not known.

        :return:
            A set of unicode strings
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._critical_extensions

    @property
    def nonce_value(self):
        """
        The nonce extension, a unique, random value included with each
        request/response pair to prevent replay attacks

        :return:
            None or an OctetString object
        """

        if self._processed_extensions is False:
            self._set_extensions()
        return self._nonce_value

    @property
    def acceptable_responses_value(self):
        """
        The acceptable responses extension, which lets the client and server
        communicate with response formats other than basic_ocsp_response,
        although the standard defines no other formats

        :return:
            None or an AcceptableResponses object
        """

        if self._processed_extensions is False:
            self._set_extensions()
        return self._acceptable_responses_value

    @property
    def preferred_signature_algorithms_value(self):
        """
        The preferred signature algorithms extension, through which the
        client states which signature algorithms it prefers. This covers the
        hash algorithm and the public key algorithm, down to the public key
        algorithm parameters such as curve name.

        :return:
            None or a PreferredSignatureAlgorithms object
        """

        if self._processed_extensions is False:
            self._set_extensions()
        return self._preferred_signature_algorithms_value
303
304
class OCSPResponseStatus(Enumerated):
    """
    Enumeration of the overall status codes of an OCSP response
    """

    _map = {
        0: 'successful',
        1: 'malformed_request',
        2: 'internal_error',
        3: 'try_later',
        # 4 is intentionally absent - RFC 6960 does not use it
        5: 'sign_required',
        6: 'unauthorized',
    }
314
315
class ResponderId(Choice):
    """
    Identifies the responder either by_name (a Name) or by_key (an
    OctetString)
    """

    _alternatives = [
        ('by_name', Name, {'explicit': 1}),
        ('by_key', OctetString, {'explicit': 2}),
    ]
321
322
323# Custom class to return a meaningful .native attribute from CertStatus()
class StatusGood(Null):
    """
    Null variant used for the "good" alternative of CertStatus so that
    .native yields a descriptive string
    """

    def set(self, value):
        """
        Sets the value of the object

        :param value:
            None, the unicode string "good" or a Null object

        :raises:
            ValueError - when value is anything else
        """

        is_placeholder = value is None or isinstance(value, Null)
        if not is_placeholder and value != 'good':
            raise ValueError(unwrap(
                '''
                value must be one of None, "good", not %s
                ''',
                repr(value)
            ))

        # Whatever acceptable value was passed, the encoded contents are empty
        self.contents = b''

    @property
    def native(self):
        """
        :return:
            The unicode string "good"
        """

        return 'good'
346
347
348# Custom class to return a meaningful .native attribute from CertStatus()
class StatusUnknown(Null):
    """
    Null variant used for the "unknown" alternative of CertStatus so that
    .native yields a descriptive string
    """

    def set(self, value):
        """
        Sets the value of the object

        :param value:
            None, the unicode string "unknown" or a Null object

        :raises:
            ValueError - when value is anything else
        """

        is_placeholder = value is None or isinstance(value, Null)
        if not is_placeholder and value != 'unknown':
            raise ValueError(unwrap(
                '''
                value must be one of None, "unknown", not %s
                ''',
                repr(value)
            ))

        # Whatever acceptable value was passed, the encoded contents are empty
        self.contents = b''

    @property
    def native(self):
        """
        :return:
            The unicode string "unknown"
        """

        return 'unknown'
371
372
class RevokedInfo(Sequence):
    """
    Details of a revoked certificate: when it was revoked and, optionally,
    the reason
    """

    _fields = [
        ('revocation_time', GeneralizedTime),
        ('revocation_reason', CRLReason, {'explicit': 0, 'optional': True}),
    ]
378
379
class CertStatus(Choice):
    """
    The status of a certificate: good, revoked (with RevokedInfo details) or
    unknown
    """

    _alternatives = [
        ('good', StatusGood, {'implicit': 0}),
        ('revoked', RevokedInfo, {'implicit': 1}),
        ('unknown', StatusUnknown, {'implicit': 2}),
    ]
386
387
class CrlId(Sequence):
    """
    Value of the crl single-response extension. Every field is optional: a
    URL, a number and a time.
    """

    _fields = [
        ('crl_url', IA5String, {'explicit': 0, 'optional': True}),
        ('crl_num', Integer, {'explicit': 1, 'optional': True}),
        ('crl_time', GeneralizedTime, {'explicit': 2, 'optional': True}),
    ]
394
395
class SingleResponseExtensionId(ObjectIdentifier):
    """
    Object identifiers of the extensions recognized on a SingleResponse
    """

    _map = {
        '1.3.6.1.5.5.7.48.1.3': 'crl',
        '1.3.6.1.5.5.7.48.1.6': 'archive_cutoff',
        # CRLEntryExtension values, see https://tools.ietf.org/html/rfc5280
        '2.5.29.21': 'crl_reason',
        '2.5.29.24': 'invalidity_date',
        '2.5.29.29': 'certificate_issuer',
        # See https://tools.ietf.org/html/rfc6962.html#page-13
        '1.3.6.1.4.1.11129.2.4.5': 'signed_certificate_timestamp_list',
    }
408
409
class SingleResponseExtension(Sequence):
    """
    One extension attached to a SingleResponse. The type used to parse
    extn_value is selected by the value of extn_id.
    """

    _fields = [
        ('extn_id', SingleResponseExtensionId),
        # Extensions are non-critical unless explicitly flagged
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # extn_id picks the entry of _oid_specs used to parse extn_value
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'crl': CrlId,
        'archive_cutoff': GeneralizedTime,
        'crl_reason': CRLReason,
        'invalidity_date': GeneralizedTime,
        'certificate_issuer': GeneralNames,
        'signed_certificate_timestamp_list': OctetString,
    }
426
427
class SingleResponseExtensions(SequenceOf):
    """
    A sequence of SingleResponseExtension objects
    """

    _child_spec = SingleResponseExtension
430
431
class SingleResponse(Sequence):
    """
    The status of one certificate: its CertId, its CertStatus, the update
    times and an optional set of extensions. The extensions are exposed
    through properties.
    """

    _fields = [
        ('cert_id', CertId),
        ('cert_status', CertStatus),
        ('this_update', GeneralizedTime),
        ('next_update', GeneralizedTime, {'explicit': 0, 'optional': True}),
        ('single_extensions', SingleResponseExtensions, {'explicit': 1, 'optional': True}),
    ]

    # Cache populated by _set_extensions() the first time one of the
    # extension properties is read
    _processed_extensions = False
    _critical_extensions = None
    _crl_value = None
    _archive_cutoff_value = None
    _crl_reason_value = None
    _invalidity_date_value = None
    _certificate_issuer_value = None

    def _set_extensions(self):
        """
        Walks the single_extensions field, storing the parsed value of each
        extension that has a matching private attribute and building the set
        of names of the extensions flagged as critical
        """

        critical = self._critical_extensions = set()

        for ext in self['single_extensions']:
            ext_name = ext['extn_id'].native
            cache_attr = '_%s_value' % (ext_name,)
            # Only extensions with a pre-declared private attribute are cached
            if hasattr(self, cache_attr):
                setattr(self, cache_attr, ext['extn_value'].parsed)
            if ext['critical'].native:
                critical.add(ext_name)

        self._processed_extensions = True

    @property
    def critical_extensions(self):
        """
        The names of the extensions flagged as critical. The dotted OID is
        used for extensions that are not known.

        :return:
            A set of unicode strings
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._critical_extensions

    @property
    def crl_value(self):
        """
        The crl extension, which locates the CRL that contains the
        revocation of a certificate

        :return:
            None or a CrlId object
        """

        if self._processed_extensions is False:
            self._set_extensions()
        return self._crl_value

    @property
    def archive_cutoff_value(self):
        """
        The archive cutoff extension, the date at which an archived
        (historical) certificate status entry will no longer be available

        :return:
            None or a GeneralizedTime object
        """

        if self._processed_extensions is False:
            self._set_extensions()
        return self._archive_cutoff_value

    @property
    def crl_reason_value(self):
        """
        The CRL reason extension, stating why a certificate was revoked

        :return:
            None or a CRLReason object
        """

        if self._processed_extensions is False:
            self._set_extensions()
        return self._crl_reason_value

    @property
    def invalidity_date_value(self):
        """
        The invalidity date extension, the suspected date/time at which the
        private key was compromised or the certificate became invalid. This
        is usually earlier than the revocation date, which is when the CA
        processed the revocation.

        :return:
            None or a GeneralizedTime object
        """

        if self._processed_extensions is False:
            self._set_extensions()
        return self._invalidity_date_value

    @property
    def certificate_issuer_value(self):
        """
        The certificate issuer extension, naming the issuer of the
        certificate in question

        :return:
            None or an x509.GeneralNames object
        """

        if self._processed_extensions is False:
            self._set_extensions()
        return self._certificate_issuer_value
550
551
class Responses(SequenceOf):
    """
    A sequence of SingleResponse objects
    """

    _child_spec = SingleResponse
554
555
class ResponseDataExtensionId(ObjectIdentifier):
    """
    Object identifiers of the extensions recognized on ResponseData
    """

    _map = {
        '1.3.6.1.5.5.7.48.1.2': 'nonce',
        '1.3.6.1.5.5.7.48.1.9': 'extended_revoke',
    }
561
562
class ResponseDataExtension(Sequence):
    """
    One extension attached to ResponseData. The type used to parse
    extn_value is selected by the value of extn_id.
    """

    _fields = [
        ('extn_id', ResponseDataExtensionId),
        # Extensions are non-critical unless explicitly flagged
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # extn_id picks the entry of _oid_specs used to parse extn_value
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'nonce': OctetString,
        'extended_revoke': Null,
    }
575
576
class ResponseDataExtensions(SequenceOf):
    """
    A sequence of ResponseDataExtension objects
    """

    _child_spec = ResponseDataExtension
579
580
class ResponseData(Sequence):
    """
    The signed portion of a basic OCSP response: version, responder
    identity, production time, the per-certificate responses and optional
    response-wide extensions
    """

    _fields = [
        # Falls back to v1 when not encoded
        ('version', Version, {'explicit': 0, 'default': 'v1'}),
        ('responder_id', ResponderId),
        ('produced_at', GeneralizedTime),
        ('responses', Responses),
        ('response_extensions', ResponseDataExtensions, {'explicit': 1, 'optional': True}),
    ]
589
590
class BasicOCSPResponse(Sequence):
    """
    A basic OCSP response: the response data, the signature algorithm, the
    signature value and, optionally, a list of certificates
    """

    _fields = [
        ('tbs_response_data', ResponseData),
        ('signature_algorithm', SignedDigestAlgorithm),
        ('signature', OctetBitString),
        ('certs', Certificates, {'explicit': 0, 'optional': True}),
    ]
598
599
class ResponseBytes(Sequence):
    """
    A typed response payload. The type used to parse the response field is
    selected by the value of response_type.
    """

    _fields = [
        ('response_type', ResponseType),
        ('response', ParsableOctetString),
    ]

    # response_type picks the entry of _oid_specs used to parse response
    _oid_pair = ('response_type', 'response')
    _oid_specs = {'basic_ocsp_response': BasicOCSPResponse}
610
611
class OCSPResponse(Sequence):
    """
    A complete OCSP response: the overall status plus, optionally, the typed
    response bytes. Shortcuts into the parsed response and the extensions it
    carries are exposed through properties; when response_bytes is not
    present those properties return None (or an empty set for
    critical_extensions).
    """

    _fields = [
        ('response_status', OCSPResponseStatus),
        ('response_bytes', ResponseBytes, {'explicit': 0, 'optional': True}),
    ]

    # Cache populated by _set_extensions() the first time one of the
    # extension properties is read
    _processed_extensions = False
    _critical_extensions = None
    _nonce_value = None
    _extended_revoke_value = None

    def _set_extensions(self):
        """
        Sets common named extensions to private attributes and creates a set
        of the names of the critical extensions. When the response carries no
        response_bytes there are no extensions, so the cached values keep
        their defaults and the set is left empty.
        """

        self._critical_extensions = set()

        response_data = self.response_data
        if response_data is not None:
            for extension in response_data['response_extensions']:
                name = extension['extn_id'].native
                attribute_name = '_%s_value' % name
                # Only extensions with a pre-declared private attribute are cached
                if hasattr(self, attribute_name):
                    setattr(self, attribute_name, extension['extn_value'].parsed)
                if extension['critical'].native:
                    self._critical_extensions.add(name)

        self._processed_extensions = True

    @property
    def critical_extensions(self):
        """
        Returns a set of the names (or OID if not a known extension) of the
        extensions marked as critical

        :return:
            A set of unicode strings
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._critical_extensions

    @property
    def nonce_value(self):
        """
        This extension is used to prevent replay attacks on the request/response
        exchange

        :return:
            None or an OctetString object
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._nonce_value

    @property
    def extended_revoke_value(self):
        """
        This extension is used to signal that the responder will return a
        "revoked" status for non-issued certificates.

        :return:
            None or a Null object (if present)
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._extended_revoke_value

    @property
    def basic_ocsp_response(self):
        """
        A shortcut into the BasicOCSPResponse sequence

        :return:
            None (when response_bytes is not present) or an
            asn1crypto.ocsp.BasicOCSPResponse object
        """

        response_bytes = self['response_bytes']
        # response_bytes is an optional field; anything other than a
        # ResponseBytes value here means it is absent and there is nothing
        # to index into
        if not isinstance(response_bytes, ResponseBytes):
            return None
        return response_bytes['response'].parsed

    @property
    def response_data(self):
        """
        A shortcut into the parsed, ResponseData sequence

        :return:
            None (when response_bytes is not present) or an
            asn1crypto.ocsp.ResponseData object
        """

        basic_response = self.basic_ocsp_response
        if basic_response is None:
            return None
        return basic_response['tbs_response_data']
704