1import unittest
2from test.test_email import TestEmailBase, parameterize
3import textwrap
4from email import policy
5from email.message import EmailMessage
6from email.contentmanager import ContentManager, raw_data_manager
7
8
@parameterize
class TestContentManager(TestEmailBase):
    """Tests for ContentManager's handler registry and lookup order.

    The ``*_params`` dictionaries supply the arguments for the matching
    ``*_as_*`` methods; the expansion into individual tests is done by the
    ``parameterize`` decorator imported from ``test.test_email``.
    """

    policy = policy.default
    message = EmailMessage

    # Keys accepted by add_get_handler, as (precedence, key) pairs.  The
    # order test below expects a key to win over every key that has a higher
    # precedence number.
    get_key_params = {
        'full_type':        (1, 'text/plain',),
        'maintype_only':    (2, 'text',),
        'null_key':         (3, '',),
        }

    def get_key_as_get_content_key(self, order, key):
        # A handler registered under key is used for a text/plain message and
        # receives the keyword arguments passed to get_content.
        def foo_getter(msg, foo=None):
            bar = msg['X-Bar-Header']
            return foo, bar
        cm = ContentManager()
        cm.add_get_handler(key, foo_getter)
        m = self._make_message()
        m['Content-Type'] = 'text/plain'
        m['X-Bar-Header'] = 'foo'
        self.assertEqual(cm.get_content(m, foo='bar'), ('bar', 'foo'))

    def get_key_as_get_content_key_order(self, order, key):
        # The handler registered under key must be chosen even when handlers
        # are also registered under every lower-priority key.
        def bar_getter(msg):
            return msg['X-Bar-Header']
        def foo_getter(msg):
            return msg['X-Foo-Header']
        cm = ContentManager()
        cm.add_get_handler(key, foo_getter)
        # other_key, not key: do not shadow the parameter under test.
        for precedence, other_key in self.get_key_params.values():
            if precedence > order:
                cm.add_get_handler(other_key, bar_getter)
        m = self._make_message()
        m['Content-Type'] = 'text/plain'
        m['X-Bar-Header'] = 'bar'
        m['X-Foo-Header'] = 'foo'
        self.assertEqual(cm.get_content(m), 'foo')

    def test_get_content_raises_if_unknown_mimetype_and_no_default(self):
        # With no handlers registered the lookup fails with a KeyError that
        # names the content type.
        cm = ContentManager()
        m = self._make_message()
        m['Content-Type'] = 'text/plain'
        with self.assertRaisesRegex(KeyError, 'text/plain'):
            cm.get_content(m)

    # Thing -> BaseThing -> str gives the set-handler tests a three level
    # class hierarchy to register handlers against.
    class BaseThing(str):
        pass
    baseobject_full_path = __name__ + '.' + 'TestContentManager.BaseThing'
    class Thing(BaseThing):
        pass
    testobject_full_path = __name__ + '.' + 'TestContentManager.Thing'

    # Keys accepted by add_set_handler, as (precedence, key) pairs, ordered
    # from the key expected to be tried first to the one tried last.
    set_key_params = {
        'type':             (0,  Thing,),
        'full_path':        (1,  testobject_full_path,),
        'qualname':         (2,  'TestContentManager.Thing',),
        'name':             (3,  'Thing',),
        'base_type':        (4,  BaseThing,),
        'base_full_path':   (5,  baseobject_full_path,),
        'base_qualname':    (6,  'TestContentManager.BaseThing',),
        'base_name':        (7,  'BaseThing',),
        'str_type':         (8,  str,),
        'str_full_path':    (9,  'builtins.str',),
        'str_name':         (10, 'str',),   # str name and qualname are the same
        'null_key':         (11, None,),
        }

    def set_key_as_set_content_key(self, order, key):
        # A handler registered under key is used for a Thing instance and
        # receives the keyword arguments passed to set_content.
        def foo_setter(msg, obj, foo=None):
            msg['X-Foo-Header'] = foo
            msg.set_payload(obj)
        cm = ContentManager()
        cm.add_set_handler(key, foo_setter)
        m = self._make_message()
        msg_obj = self.Thing()
        cm.set_content(m, msg_obj, foo='bar')
        self.assertEqual(m['X-Foo-Header'], 'bar')
        self.assertEqual(m.get_payload(), msg_obj)

    def set_key_as_set_content_key_order(self, order, key):
        # The handler registered under key must be chosen even when handlers
        # are also registered under every lower-priority key.
        def foo_setter(msg, obj):
            msg['X-FooBar-Header'] = 'foo'
            msg.set_payload(obj)
        def bar_setter(msg, obj):
            msg['X-FooBar-Header'] = 'bar'
        cm = ContentManager()
        cm.add_set_handler(key, foo_setter)
        # The competing handlers must come from set_key_params: the keys in
        # get_key_params are MIME type strings that never match a Thing, so
        # iterating them would leave the precedence order untested.
        for precedence, other_key in self.set_key_params.values():
            if precedence > order:
                cm.add_set_handler(other_key, bar_setter)
        m = self._make_message()
        msg_obj = self.Thing()
        cm.set_content(m, msg_obj)
        self.assertEqual(m['X-FooBar-Header'], 'foo')
        self.assertEqual(m.get_payload(), msg_obj)

    def test_set_content_raises_if_unknown_type_and_no_default(self):
        # With no handlers registered the lookup fails with a KeyError that
        # names the full path of the object's type.
        cm = ContentManager()
        m = self._make_message()
        msg_obj = self.Thing()
        with self.assertRaisesRegex(KeyError, self.testobject_full_path):
            cm.set_content(m, msg_obj)

    def test_set_content_raises_if_called_on_multipart(self):
        cm = ContentManager()
        m = self._make_message()
        m['Content-Type'] = 'multipart/foo'
        with self.assertRaises(TypeError):
            cm.set_content(m, 'test')

    def test_set_content_calls_clear_content(self):
        # The registered handler does nothing, so whatever is missing
        # afterwards was removed before the handler was called: the Content-*
        # headers and the payload go, other headers stay.
        m = self._make_message()
        m['Content-Foo'] = 'bar'
        m['Content-Type'] = 'text/html'
        m['To'] = 'test'
        m.set_payload('abc')
        cm = ContentManager()
        cm.add_set_handler(str, lambda *args, **kw: None)
        m.set_content('xyz', content_manager=cm)
        self.assertIsNone(m['Content-Foo'])
        self.assertIsNone(m['Content-Type'])
        self.assertEqual(m['To'], 'test')
        self.assertIsNone(m.get_payload())
133
134
@parameterize
class TestRawDataManager(TestEmailBase):
    """Tests for the get and set handlers provided by raw_data_manager."""

    # Note: these tests are dependent on the order in which headers are added
    # to the message objects by the code.  There's no defined ordering in
    # RFC5322/MIME, so this makes the tests more fragile than the standards
    # require.  However, if the header order changes it is best to understand
    # *why*, and make sure it isn't a subtle bug in whatever change was
    # applied.

    # NOTE(review): the *_long_line_* expectations below presumably depend on
    # this 60 character line limit.
    policy = policy.default.clone(max_line_length=60,
                                  content_manager=raw_data_manager)
    message = EmailMessage

    def test_get_text_plain(self):
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: text/plain

            Basic text.
            """))
        self.assertEqual(raw_data_manager.get_content(m), "Basic text.\n")

    def test_get_text_html(self):
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: text/html

            <p>Basic text.</p>
            """))
        self.assertEqual(raw_data_manager.get_content(m),
                         "<p>Basic text.</p>\n")

    def test_get_text_plain_latin1(self):
        m = self._bytes_msg(textwrap.dedent("""\
            Content-Type: text/plain; charset=latin1

            Basìc tëxt.
            """).encode('latin1'))
        self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")

    def test_get_text_plain_latin1_quoted_printable(self):
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: text/plain; charset="latin-1"
            Content-Transfer-Encoding: quoted-printable

            Bas=ECc t=EBxt.
            """))
        self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")

    def test_get_text_plain_utf8_base64(self):
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: text/plain; charset="utf8"
            Content-Transfer-Encoding: base64

            QmFzw6xjIHTDq3h0Lgo=
            """))
        self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")

    def test_get_text_plain_bad_utf8_quoted_printable(self):
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: text/plain; charset="utf8"
            Content-Transfer-Encoding: quoted-printable

            Bas=c3=acc t=c3=abxt=fd.
            """))
        # The undecodable =fd byte comes back as U+FFFD; the escape is
        # spelled out so the replacement character is visible in the source.
        self.assertEqual(raw_data_manager.get_content(m),
                         "Basìc tëxt\uFFFD.\n")

    def test_get_text_plain_bad_utf8_quoted_printable_ignore_errors(self):
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: text/plain; charset="utf8"
            Content-Transfer-Encoding: quoted-printable

            Bas=c3=acc t=c3=abxt=fd.
            """))
        # With errors='ignore' the undecodable byte is dropped.
        self.assertEqual(raw_data_manager.get_content(m, errors='ignore'),
                         "Basìc tëxt.\n")

    def test_get_text_plain_utf8_base64_recoverable_bad_CTE_data(self):
        # The \xFF escape puts a non-base64 character into the encoded body.
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: text/plain; charset="utf8"
            Content-Transfer-Encoding: base64

            QmFzw6xjIHTDq3h0Lgo\xFF=
            """))
        self.assertEqual(raw_data_manager.get_content(m, errors='ignore'),
                         "Basìc tëxt.\n")

    def test_get_text_invalid_keyword(self):
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: text/plain

            Basic text.
            """))
        with self.assertRaises(TypeError):
            raw_data_manager.get_content(m, foo='ignore')

    def test_get_non_text(self):
        # Every non-text maintype is returned as the decoded bytes.
        template = textwrap.dedent("""\
            Content-Type: {}
            Content-Transfer-Encoding: base64

            Ym9ndXMgZGF0YQ==
            """)
        for maintype in 'audio image video application'.split():
            with self.subTest(maintype=maintype):
                m = self._str_msg(template.format(maintype+'/foo'))
                self.assertEqual(raw_data_manager.get_content(m), b"bogus data")

    def test_get_non_text_invalid_keyword(self):
        # errors= is accepted for text content (see above) but not here.
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: image/jpg
            Content-Transfer-Encoding: base64

            Ym9ndXMgZGF0YQ==
            """))
        with self.assertRaises(TypeError):
            raw_data_manager.get_content(m, errors='ignore')

    def test_get_raises_on_multipart(self):
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: multipart/mixed; boundary="==="

            --===
            --===--
            """))
        with self.assertRaises(KeyError):
            raw_data_manager.get_content(m)

    def test_get_message_rfc822_and_external_body(self):
        # Both subtypes yield the embedded message as a message object.
        template = textwrap.dedent("""\
            Content-Type: message/{}

            To: foo@example.com
            From: bar@example.com
            Subject: example

            an example message
            """)
        for subtype in 'rfc822 external-body'.split():
            with self.subTest(subtype=subtype):
                m = self._str_msg(template.format(subtype))
                sub_msg = raw_data_manager.get_content(m)
                self.assertIsInstance(sub_msg, self.message)
                self.assertEqual(raw_data_manager.get_content(sub_msg),
                                 "an example message\n")
                self.assertEqual(sub_msg['to'], 'foo@example.com')
                self.assertEqual(sub_msg['from'].addresses[0].username, 'bar')

    def test_get_message_non_rfc822_or_external_body_yields_bytes(self):
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: message/partial

            To: foo@example.com
            From: bar@example.com
            Subject: example

            The real body is in another message.
            """))
        self.assertEqual(raw_data_manager.get_content(m)[:10], b'To: foo@ex')

    def test_set_text_plain(self):
        m = self._make_message()
        content = "Simple message.\n"
        raw_data_manager.set_content(m, content)
        self.assertEqual(str(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 7bit

            Simple message.
            """))
        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
        self.assertEqual(m.get_content(), content)

    def test_set_text_html(self):
        m = self._make_message()
        content = "<p>Simple message.</p>\n"
        raw_data_manager.set_content(m, content, subtype='html')
        self.assertEqual(str(m), textwrap.dedent("""\
            Content-Type: text/html; charset="utf-8"
            Content-Transfer-Encoding: 7bit

            <p>Simple message.</p>
            """))
        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
        self.assertEqual(m.get_content(), content)

    def test_set_text_charset_latin_1(self):
        m = self._make_message()
        content = "Simple message.\n"
        raw_data_manager.set_content(m, content, charset='latin-1')
        self.assertEqual(str(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="iso-8859-1"
            Content-Transfer-Encoding: 7bit

            Simple message.
            """))
        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
        self.assertEqual(m.get_content(), content)

    def test_set_text_short_line_minimal_non_ascii_heuristics(self):
        m = self._make_message()
        content = "et là il est monté sur moi et il commence à m'éto.\n"
        raw_data_manager.set_content(m, content)
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 8bit

            et là il est monté sur moi et il commence à m'éto.
            """).encode('utf-8'))
        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
        self.assertEqual(m.get_content(), content)

    def test_set_text_long_line_minimal_non_ascii_heuristics(self):
        m = self._make_message()
        content = ("j'ai un problème de python. il est sorti de son"
                   " vivarium.  et là il est monté sur moi et il commence"
                   " à m'éto.\n")
        raw_data_manager.set_content(m, content)
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: quoted-printable

            j'ai un probl=C3=A8me de python. il est sorti de son vivari=
            um.  et l=C3=A0 il est mont=C3=A9 sur moi et il commence =
            =C3=A0 m'=C3=A9to.
            """).encode('utf-8'))
        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
        self.assertEqual(m.get_content(), content)

    def test_set_text_11_lines_long_line_minimal_non_ascii_heuristics(self):
        m = self._make_message()
        content = '\n'*10 + (
                  "j'ai un problème de python. il est sorti de son"
                  " vivarium.  et là il est monté sur moi et il commence"
                  " à m'éto.\n")
        raw_data_manager.set_content(m, content)
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: quoted-printable
            """ + '\n'*10 + """
            j'ai un probl=C3=A8me de python. il est sorti de son vivari=
            um.  et l=C3=A0 il est mont=C3=A9 sur moi et il commence =
            =C3=A0 m'=C3=A9to.
            """).encode('utf-8'))
        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
        self.assertEqual(m.get_content(), content)

    def test_set_text_maximal_non_ascii_heuristics(self):
        m = self._make_message()
        content = "áàäéèęöő.\n"
        raw_data_manager.set_content(m, content)
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 8bit

            áàäéèęöő.
            """).encode('utf-8'))
        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
        self.assertEqual(m.get_content(), content)

    def test_set_text_11_lines_maximal_non_ascii_heuristics(self):
        m = self._make_message()
        content = '\n'*10 + "áàäéèęöő.\n"
        raw_data_manager.set_content(m, content)
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 8bit
            """ + '\n'*10 + """
            áàäéèęöő.
            """).encode('utf-8'))
        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
        self.assertEqual(m.get_content(), content)

    def test_set_text_long_line_maximal_non_ascii_heuristics(self):
        m = self._make_message()
        content = ("áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
                   "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
                   "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
        raw_data_manager.set_content(m, content)
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: base64

            w6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOoxJnD
            tsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOo
            xJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TD
            qcOoxJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOg
            w6TDqcOoxJnDtsWRLgo=
            """).encode('utf-8'))
        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
        self.assertEqual(m.get_content(), content)

    def test_set_text_11_lines_long_line_maximal_non_ascii_heuristics(self):
        # Yes, it chooses "wrong" here.  It's a heuristic.  So this result
        # could change if we come up with a better heuristic.
        m = self._make_message()
        content = ('\n'*10 +
                   "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
                   "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
                   "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
        # Reuse content rather than repeating the literal, so the value that
        # is set and the value that is compared cannot drift apart.
        raw_data_manager.set_content(m, content)
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: quoted-printable
            """ + '\n'*10 + """
            =C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=
            =A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=
            =C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=
            =A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=
            =C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=
            =91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=
            =C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=
            =A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=
            =C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=
            =99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=
            =C5=91.
            """).encode('utf-8'))
        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
        self.assertEqual(m.get_content(), content)

    def test_set_text_non_ascii_with_cte_7bit_raises(self):
        m = self._make_message()
        with self.assertRaises(UnicodeError):
            raw_data_manager.set_content(m, "áàäéèęöő.\n", cte='7bit')

    def test_set_text_non_ascii_with_charset_ascii_raises(self):
        m = self._make_message()
        with self.assertRaises(UnicodeError):
            raw_data_manager.set_content(m, "áàäéèęöő.\n", charset='ascii')

    def test_set_text_non_ascii_with_cte_7bit_and_charset_ascii_raises(self):
        m = self._make_message()
        with self.assertRaises(UnicodeError):
            raw_data_manager.set_content(m, "áàäéèęöő.\n",
                                         cte='7bit', charset='ascii')

    def test_set_message(self):
        m = self._make_message()
        m['Subject'] = "Forwarded message"
        content = self._make_message()
        content['To'] = 'python@vivarium.org'
        content['From'] = 'police@monty.org'
        content['Subject'] = "get back in your box"
        content.set_content("Or face the comfy chair.")
        raw_data_manager.set_content(m, content)
        self.assertEqual(str(m), textwrap.dedent("""\
            Subject: Forwarded message
            Content-Type: message/rfc822
            Content-Transfer-Encoding: 8bit

            To: python@vivarium.org
            From: police@monty.org
            Subject: get back in your box
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 7bit
            MIME-Version: 1.0

            Or face the comfy chair.
            """))
        payload = m.get_payload(0)
        self.assertIsInstance(payload, self.message)
        self.assertEqual(str(payload), str(content))
        self.assertIsInstance(m.get_content(), self.message)
        self.assertEqual(str(m.get_content()), str(content))

    def test_set_message_with_non_ascii_and_coercion_to_7bit(self):
        m = self._make_message()
        m['Subject'] = "Escape report"
        content = self._make_message()
        content['To'] = 'police@monty.org'
        content['From'] = 'victim@monty.org'
        content['Subject'] = "Help"
        content.set_content("j'ai un problème de python. il est sorti de son"
                            " vivarium.")
        raw_data_manager.set_content(m, content)
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Subject: Escape report
            Content-Type: message/rfc822
            Content-Transfer-Encoding: 8bit

            To: police@monty.org
            From: victim@monty.org
            Subject: Help
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 8bit
            MIME-Version: 1.0

            j'ai un problème de python. il est sorti de son vivarium.
            """).encode('utf-8'))
        # The choice of base64 for the body encoding is because generator
        # doesn't bother with heuristics and uses it unconditionally for utf-8
        # text.
        # XXX: the first cte should be 7bit, too...that's a generator bug.
        # XXX: the line length in the body also looks like a generator bug.
        self.assertEqual(m.as_string(maxheaderlen=self.policy.max_line_length),
                         textwrap.dedent("""\
            Subject: Escape report
            Content-Type: message/rfc822
            Content-Transfer-Encoding: 8bit

            To: police@monty.org
            From: victim@monty.org
            Subject: Help
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: base64
            MIME-Version: 1.0

            aidhaSB1biBwcm9ibMOobWUgZGUgcHl0aG9uLiBpbCBlc3Qgc29ydGkgZGUgc29uIHZpdmFyaXVt
            Lgo=
            """))
        self.assertIsInstance(m.get_content(), self.message)
        self.assertEqual(str(m.get_content()), str(content))

    def test_set_message_invalid_cte_raises(self):
        m = self._make_message()
        content = self._make_message()
        # quoted-printable and base64 are rejected for both message subtypes.
        for cte in 'quoted-printable base64'.split():
            for subtype in 'rfc822 external-body'.split():
                with self.subTest(cte=cte, subtype=subtype):
                    with self.assertRaises(ValueError) as ar:
                        m.set_content(content, subtype, cte=cte)
                    exc = str(ar.exception)
                    self.assertIn(cte, exc)
                    self.assertIn(subtype, exc)
        # external-body additionally rejects 8bit and binary.
        subtype = 'external-body'
        for cte in '8bit binary'.split():
            with self.subTest(cte=cte, subtype=subtype):
                with self.assertRaises(ValueError) as ar:
                    m.set_content(content, subtype, cte=cte)
                exc = str(ar.exception)
                self.assertIn(cte, exc)
                self.assertIn(subtype, exc)

    def test_set_image_jpg(self):
        # bytes, bytearray and memoryview content are all accepted.
        for content in (b"bogus content",
                        bytearray(b"bogus content"),
                        memoryview(b"bogus content")):
            with self.subTest(content=content):
                m = self._make_message()
                raw_data_manager.set_content(m, content, 'image', 'jpeg')
                self.assertEqual(str(m), textwrap.dedent("""\
                    Content-Type: image/jpeg
                    Content-Transfer-Encoding: base64

                    Ym9ndXMgY29udGVudA==
                    """))
                self.assertEqual(m.get_payload(decode=True), content)
                self.assertEqual(m.get_content(), content)

    def test_set_audio_aif_with_quoted_printable_cte(self):
        # Why you would use qp, I don't know, but it is technically supported.
        # XXX: the incorrect line length is because binascii.b2a_qp doesn't
        # support a line length parameter, but we must use it to get newline
        # encoding.
        # XXX: what about that lack of trailing newline?  Do we actually handle
        # that correctly in all cases?  That is, if the *source* has an
        # unencoded newline, do we add an extra newline to the returned payload
        # or not?  And can that actually be disambiguated based on the RFC?
        m = self._make_message()
        content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100
        m.set_content(content, 'audio', 'aif', cte='quoted-printable')
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Content-Type: audio/aif
            Content-Transfer-Encoding: quoted-printable
            MIME-Version: 1.0

            """).encode('latin-1') +
            # The first body line holds 75 characters before the soft line
            # break ('='); the remaining 51 of the 100 z's follow it.
            b'b=FFgus=09con=0At=0Dent=20' + b'z'*49 + b'=\n' +
            b'z'*51)
        self.assertEqual(m.get_payload(decode=True), content)
        self.assertEqual(m.get_content(), content)

    def test_set_video_mpeg_with_binary_cte(self):
        m = self._make_message()
        content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100
        m.set_content(content, 'video', 'mpeg', cte='binary')
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Content-Type: video/mpeg
            Content-Transfer-Encoding: binary
            MIME-Version: 1.0

            """).encode('ascii') +
            # XXX: the second \n ought to be a \r, but generator gets it wrong.
            # THIS MEANS WE DON'T ACTUALLY SUPPORT THE 'binary' CTE.
            b'b\xFFgus\tcon\nt\nent ' + b'z'*100)
        self.assertEqual(m.get_payload(decode=True), content)
        self.assertEqual(m.get_content(), content)

    def test_set_application_octet_stream_with_8bit_cte(self):
        # In 8bit mode, universal line end logic applies.  It is up to the
        # application to make sure the lines are short enough; we don't check.
        m = self._make_message()
        content = b'b\xFFgus\tcon\nt\rent\n' + b'z'*60 + b'\n'
        m.set_content(content, 'application', 'octet-stream', cte='8bit')
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Content-Type: application/octet-stream
            Content-Transfer-Encoding: 8bit
            MIME-Version: 1.0

            """).encode('ascii') +
            b'b\xFFgus\tcon\nt\nent\n' +
            b'z'*60 + b'\n')
        self.assertEqual(m.get_payload(decode=True), content)
        self.assertEqual(m.get_content(), content)

    def test_set_headers_from_header_objects(self):
        m = self._make_message()
        content = "Simple message.\n"
        header_factory = self.policy.header_factory
        raw_data_manager.set_content(m, content, headers=(
            header_factory("To", "foo@example.com"),
            header_factory("From", "foo@example.com"),
            header_factory("Subject", "I'm talking to myself.")))
        self.assertEqual(str(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            To: foo@example.com
            From: foo@example.com
            Subject: I'm talking to myself.
            Content-Transfer-Encoding: 7bit

            Simple message.
            """))

    def test_set_headers_from_strings(self):
        m = self._make_message()
        content = "Simple message.\n"
        raw_data_manager.set_content(m, content, headers=(
            "X-Foo-Header: foo",
            "X-Bar-Header: bar",))
        self.assertEqual(str(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            X-Foo-Header: foo
            X-Bar-Header: bar
            Content-Transfer-Encoding: 7bit

            Simple message.
            """))

    def test_set_headers_with_invalid_duplicate_string_header_raises(self):
        m = self._make_message()
        content = "Simple message.\n"
        with self.assertRaisesRegex(ValueError, 'Content-Type'):
            raw_data_manager.set_content(m, content, headers=(
                "Content-Type: foo/bar",)
                )

    def test_set_headers_with_invalid_duplicate_header_header_raises(self):
        m = self._make_message()
        content = "Simple message.\n"
        header_factory = self.policy.header_factory
        with self.assertRaisesRegex(ValueError, 'Content-Type'):
            raw_data_manager.set_content(m, content, headers=(
                header_factory("Content-Type", " foo/bar"),)
                )

    def test_set_headers_with_defective_string_header_raises(self):
        m = self._make_message()
        content = "Simple message.\n"
        # Nothing may follow set_content inside the with block: the call is
        # expected to raise, so later statements would never run.
        with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'):
            raw_data_manager.set_content(m, content, headers=(
                'To: a@fairly@@invalid@address',)
                )

    def test_set_headers_with_defective_header_header_raises(self):
        m = self._make_message()
        content = "Simple message.\n"
        header_factory = self.policy.header_factory
        # Nothing may follow set_content inside the with block: the call is
        # expected to raise, so later statements would never run.
        with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'):
            raw_data_manager.set_content(m, content, headers=(
                header_factory('To', 'a@fairly@@invalid@address'),)
                )

    def test_set_disposition_inline(self):
        m = self._make_message()
        m.set_content('foo', disposition='inline')
        self.assertEqual(m['Content-Disposition'], 'inline')

    def test_set_disposition_attachment(self):
        m = self._make_message()
        m.set_content('foo', disposition='attachment')
        self.assertEqual(m['Content-Disposition'], 'attachment')

    def test_set_disposition_foo(self):
        m = self._make_message()
        m.set_content('foo', disposition='foo')
        self.assertEqual(m['Content-Disposition'], 'foo')

    # XXX: we should have a 'strict' policy mode (beyond raise_on_defect) that
    # would cause 'foo' above to raise.

    def test_set_filename(self):
        # A filename without an explicit disposition yields 'attachment'.
        m = self._make_message()
        m.set_content('foo', filename='bar.txt')
        self.assertEqual(m['Content-Disposition'],
                         'attachment; filename="bar.txt"')

    def test_set_filename_and_disposition_inline(self):
        m = self._make_message()
        m.set_content('foo', disposition='inline', filename='bar.txt')
        self.assertEqual(m['Content-Disposition'], 'inline; filename="bar.txt"')

    def test_set_non_ascii_filename(self):
        m = self._make_message()
        m.set_content('foo', filename='ábárî.txt')
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 7bit
            Content-Disposition: attachment;
             filename*=utf-8''%C3%A1b%C3%A1r%C3%AE.txt
            MIME-Version: 1.0

            foo
            """).encode('ascii'))

    # (content object, positional MIME type arguments) pairs fed to the
    # content_object_as_* methods below.
    # NOTE(review): 'octet_stream' (underscore) is kept exactly as found; the
    # usual subtype spelling is 'octet-stream' -- confirm which is intended.
    content_object_params = {
        'text_plain': ('content', ()),
        'text_html': ('content', ('html',)),
        'application_octet_stream': (b'content',
                                     ('application', 'octet_stream')),
        'image_jpeg': (b'content', ('image', 'jpeg')),
        'message_rfc822': (message(), ()),
        'message_external_body': (message(), ('external-body',)),
        }

    def content_object_as_header_receiver(self, obj, mimetype):
        # headers= is honoured for this content type.
        m = self._make_message()
        m.set_content(obj, *mimetype, headers=(
            'To: foo@example.com',
            'From: bar@simple.net'))
        self.assertEqual(m['to'], 'foo@example.com')
        self.assertEqual(m['from'], 'bar@simple.net')

    def content_object_as_disposition_inline_receiver(self, obj, mimetype):
        # disposition= is honoured for this content type.
        m = self._make_message()
        m.set_content(obj, *mimetype, disposition='inline')
        self.assertEqual(m['Content-Disposition'], 'inline')

    def content_object_as_non_ascii_filename_receiver(self, obj, mimetype):
        # A non-ASCII filename= round-trips through the header value, through
        # get_filename() and through the header's params.
        m = self._make_message()
        m.set_content(obj, *mimetype, disposition='inline', filename='bár.txt')
        self.assertEqual(m['Content-Disposition'], 'inline; filename="bár.txt"')
        self.assertEqual(m.get_filename(), "bár.txt")
        self.assertEqual(m['Content-Disposition'].params['filename'], "bár.txt")

    def content_object_as_cid_receiver(self, obj, mimetype):
        # cid= becomes the Content-ID header.
        m = self._make_message()
        m.set_content(obj, *mimetype, cid='some_random_stuff')
        self.assertEqual(m['Content-ID'], 'some_random_stuff')

    def content_object_as_params_receiver(self, obj, mimetype):
        # params= end up as Content-Type parameters; str content is expected
        # to carry an additional charset parameter.
        m = self._make_message()
        params = {'foo': 'bár', 'abc': 'xyz'}
        m.set_content(obj, *mimetype, params=params)
        if isinstance(obj, str):
            params['charset'] = 'utf-8'
        self.assertEqual(m['Content-Type'].params, params)
793
794
# Allow this test module to be run directly as a script.
if __name__ == "__main__":
    unittest.main()
797