1import unittest 2from test.test_email import TestEmailBase, parameterize 3import textwrap 4from email import policy 5from email.message import EmailMessage 6from email.contentmanager import ContentManager, raw_data_manager 7 8 9@parameterize 10class TestContentManager(TestEmailBase): 11 12 policy = policy.default 13 message = EmailMessage 14 15 get_key_params = { 16 'full_type': (1, 'text/plain',), 17 'maintype_only': (2, 'text',), 18 'null_key': (3, '',), 19 } 20 21 def get_key_as_get_content_key(self, order, key): 22 def foo_getter(msg, foo=None): 23 bar = msg['X-Bar-Header'] 24 return foo, bar 25 cm = ContentManager() 26 cm.add_get_handler(key, foo_getter) 27 m = self._make_message() 28 m['Content-Type'] = 'text/plain' 29 m['X-Bar-Header'] = 'foo' 30 self.assertEqual(cm.get_content(m, foo='bar'), ('bar', 'foo')) 31 32 def get_key_as_get_content_key_order(self, order, key): 33 def bar_getter(msg): 34 return msg['X-Bar-Header'] 35 def foo_getter(msg): 36 return msg['X-Foo-Header'] 37 cm = ContentManager() 38 cm.add_get_handler(key, foo_getter) 39 for precedence, key in self.get_key_params.values(): 40 if precedence > order: 41 cm.add_get_handler(key, bar_getter) 42 m = self._make_message() 43 m['Content-Type'] = 'text/plain' 44 m['X-Bar-Header'] = 'bar' 45 m['X-Foo-Header'] = 'foo' 46 self.assertEqual(cm.get_content(m), ('foo')) 47 48 def test_get_content_raises_if_unknown_mimetype_and_no_default(self): 49 cm = ContentManager() 50 m = self._make_message() 51 m['Content-Type'] = 'text/plain' 52 with self.assertRaisesRegex(KeyError, 'text/plain'): 53 cm.get_content(m) 54 55 class BaseThing(str): 56 pass 57 baseobject_full_path = __name__ + '.' + 'TestContentManager.BaseThing' 58 class Thing(BaseThing): 59 pass 60 testobject_full_path = __name__ + '.' + 'TestContentManager.Thing' 61 62 set_key_params = { 63 'type': (0, Thing,), 64 'full_path': (1, testobject_full_path,), 65 'qualname': (2, 'TestContentManager.Thing',), 66 'name': (3, 'Thing',), 67 'base_type': (4, BaseThing,), 68 'base_full_path': (5, baseobject_full_path,), 69 'base_qualname': (6, 'TestContentManager.BaseThing',), 70 'base_name': (7, 'BaseThing',), 71 'str_type': (8, str,), 72 'str_full_path': (9, 'builtins.str',), 73 'str_name': (10, 'str',), # str name and qualname are the same 74 'null_key': (11, None,), 75 } 76 77 def set_key_as_set_content_key(self, order, key): 78 def foo_setter(msg, obj, foo=None): 79 msg['X-Foo-Header'] = foo 80 msg.set_payload(obj) 81 cm = ContentManager() 82 cm.add_set_handler(key, foo_setter) 83 m = self._make_message() 84 msg_obj = self.Thing() 85 cm.set_content(m, msg_obj, foo='bar') 86 self.assertEqual(m['X-Foo-Header'], 'bar') 87 self.assertEqual(m.get_payload(), msg_obj) 88 89 def set_key_as_set_content_key_order(self, order, key): 90 def foo_setter(msg, obj): 91 msg['X-FooBar-Header'] = 'foo' 92 msg.set_payload(obj) 93 def bar_setter(msg, obj): 94 msg['X-FooBar-Header'] = 'bar' 95 cm = ContentManager() 96 cm.add_set_handler(key, foo_setter) 97 for precedence, key in self.get_key_params.values(): 98 if precedence > order: 99 cm.add_set_handler(key, bar_setter) 100 m = self._make_message() 101 msg_obj = self.Thing() 102 cm.set_content(m, msg_obj) 103 self.assertEqual(m['X-FooBar-Header'], 'foo') 104 self.assertEqual(m.get_payload(), msg_obj) 105 106 def test_set_content_raises_if_unknown_type_and_no_default(self): 107 cm = ContentManager() 108 m = self._make_message() 109 msg_obj = self.Thing() 110 with self.assertRaisesRegex(KeyError, self.testobject_full_path): 111 cm.set_content(m, msg_obj) 112 113 def test_set_content_raises_if_called_on_multipart(self): 114 cm = ContentManager() 115 m = self._make_message() 116 m['Content-Type'] = 'multipart/foo' 117 with self.assertRaises(TypeError): 118 cm.set_content(m, 'test') 119 120 def test_set_content_calls_clear_content(self): 121 m = self._make_message() 122 m['Content-Foo'] = 'bar' 123 m['Content-Type'] = 'text/html' 124 m['To'] = 'test' 125 m.set_payload('abc') 126 cm = ContentManager() 127 cm.add_set_handler(str, lambda *args, **kw: None) 128 m.set_content('xyz', content_manager=cm) 129 self.assertIsNone(m['Content-Foo']) 130 self.assertIsNone(m['Content-Type']) 131 self.assertEqual(m['To'], 'test') 132 self.assertIsNone(m.get_payload()) 133 134 135@parameterize 136class TestRawDataManager(TestEmailBase): 137 # Note: these tests are dependent on the order in which headers are added 138 # to the message objects by the code. There's no defined ordering in 139 # RFC5322/MIME, so this makes the tests more fragile than the standards 140 # require. However, if the header order changes it is best to understand 141 # *why*, and make sure it isn't a subtle bug in whatever change was 142 # applied. 143 144 policy = policy.default.clone(max_line_length=60, 145 content_manager=raw_data_manager) 146 message = EmailMessage 147 148 def test_get_text_plain(self): 149 m = self._str_msg(textwrap.dedent("""\ 150 Content-Type: text/plain 151 152 Basic text. 153 """)) 154 self.assertEqual(raw_data_manager.get_content(m), "Basic text.\n") 155 156 def test_get_text_html(self): 157 m = self._str_msg(textwrap.dedent("""\ 158 Content-Type: text/html 159 160 <p>Basic text.</p> 161 """)) 162 self.assertEqual(raw_data_manager.get_content(m), 163 "<p>Basic text.</p>\n") 164 165 def test_get_text_plain_latin1(self): 166 m = self._bytes_msg(textwrap.dedent("""\ 167 Content-Type: text/plain; charset=latin1 168 169 Basìc tëxt. 170 """).encode('latin1')) 171 self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n") 172 173 def test_get_text_plain_latin1_quoted_printable(self): 174 m = self._str_msg(textwrap.dedent("""\ 175 Content-Type: text/plain; charset="latin-1" 176 Content-Transfer-Encoding: quoted-printable 177 178 Bas=ECc t=EBxt. 179 """)) 180 self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n") 181 182 def test_get_text_plain_utf8_base64(self): 183 m = self._str_msg(textwrap.dedent("""\ 184 Content-Type: text/plain; charset="utf8" 185 Content-Transfer-Encoding: base64 186 187 QmFzw6xjIHTDq3h0Lgo= 188 """)) 189 self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n") 190 191 def test_get_text_plain_bad_utf8_quoted_printable(self): 192 m = self._str_msg(textwrap.dedent("""\ 193 Content-Type: text/plain; charset="utf8" 194 Content-Transfer-Encoding: quoted-printable 195 196 Bas=c3=acc t=c3=abxt=fd. 197 """)) 198 self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt�.\n") 199 200 def test_get_text_plain_bad_utf8_quoted_printable_ignore_errors(self): 201 m = self._str_msg(textwrap.dedent("""\ 202 Content-Type: text/plain; charset="utf8" 203 Content-Transfer-Encoding: quoted-printable 204 205 Bas=c3=acc t=c3=abxt=fd. 206 """)) 207 self.assertEqual(raw_data_manager.get_content(m, errors='ignore'), 208 "Basìc tëxt.\n") 209 210 def test_get_text_plain_utf8_base64_recoverable_bad_CTE_data(self): 211 m = self._str_msg(textwrap.dedent("""\ 212 Content-Type: text/plain; charset="utf8" 213 Content-Transfer-Encoding: base64 214 215 QmFzw6xjIHTDq3h0Lgo\xFF= 216 """)) 217 self.assertEqual(raw_data_manager.get_content(m, errors='ignore'), 218 "Basìc tëxt.\n") 219 220 def test_get_text_invalid_keyword(self): 221 m = self._str_msg(textwrap.dedent("""\ 222 Content-Type: text/plain 223 224 Basic text. 225 """)) 226 with self.assertRaises(TypeError): 227 raw_data_manager.get_content(m, foo='ignore') 228 229 def test_get_non_text(self): 230 template = textwrap.dedent("""\ 231 Content-Type: {} 232 Content-Transfer-Encoding: base64 233 234 Ym9ndXMgZGF0YQ== 235 """) 236 for maintype in 'audio image video application'.split(): 237 with self.subTest(maintype=maintype): 238 m = self._str_msg(template.format(maintype+'/foo')) 239 self.assertEqual(raw_data_manager.get_content(m), b"bogus data") 240 241 def test_get_non_text_invalid_keyword(self): 242 m = self._str_msg(textwrap.dedent("""\ 243 Content-Type: image/jpg 244 Content-Transfer-Encoding: base64 245 246 Ym9ndXMgZGF0YQ== 247 """)) 248 with self.assertRaises(TypeError): 249 raw_data_manager.get_content(m, errors='ignore') 250 251 def test_get_raises_on_multipart(self): 252 m = self._str_msg(textwrap.dedent("""\ 253 Content-Type: multipart/mixed; boundary="===" 254 255 --=== 256 --===-- 257 """)) 258 with self.assertRaises(KeyError): 259 raw_data_manager.get_content(m) 260 261 def test_get_message_rfc822_and_external_body(self): 262 template = textwrap.dedent("""\ 263 Content-Type: message/{} 264 265 To: foo@example.com 266 From: bar@example.com 267 Subject: example 268 269 an example message 270 """) 271 for subtype in 'rfc822 external-body'.split(): 272 with self.subTest(subtype=subtype): 273 m = self._str_msg(template.format(subtype)) 274 sub_msg = raw_data_manager.get_content(m) 275 self.assertIsInstance(sub_msg, self.message) 276 self.assertEqual(raw_data_manager.get_content(sub_msg), 277 "an example message\n") 278 self.assertEqual(sub_msg['to'], 'foo@example.com') 279 self.assertEqual(sub_msg['from'].addresses[0].username, 'bar') 280 281 def test_get_message_non_rfc822_or_external_body_yields_bytes(self): 282 m = self._str_msg(textwrap.dedent("""\ 283 Content-Type: message/partial 284 285 To: foo@example.com 286 From: bar@example.com 287 Subject: example 288 289 The real body is in another message. 290 """)) 291 self.assertEqual(raw_data_manager.get_content(m)[:10], b'To: foo@ex') 292 293 def test_set_text_plain(self): 294 m = self._make_message() 295 content = "Simple message.\n" 296 raw_data_manager.set_content(m, content) 297 self.assertEqual(str(m), textwrap.dedent("""\ 298 Content-Type: text/plain; charset="utf-8" 299 Content-Transfer-Encoding: 7bit 300 301 Simple message. 302 """)) 303 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) 304 self.assertEqual(m.get_content(), content) 305 306 def test_set_text_html(self): 307 m = self._make_message() 308 content = "<p>Simple message.</p>\n" 309 raw_data_manager.set_content(m, content, subtype='html') 310 self.assertEqual(str(m), textwrap.dedent("""\ 311 Content-Type: text/html; charset="utf-8" 312 Content-Transfer-Encoding: 7bit 313 314 <p>Simple message.</p> 315 """)) 316 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) 317 self.assertEqual(m.get_content(), content) 318 319 def test_set_text_charset_latin_1(self): 320 m = self._make_message() 321 content = "Simple message.\n" 322 raw_data_manager.set_content(m, content, charset='latin-1') 323 self.assertEqual(str(m), textwrap.dedent("""\ 324 Content-Type: text/plain; charset="iso-8859-1" 325 Content-Transfer-Encoding: 7bit 326 327 Simple message. 328 """)) 329 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) 330 self.assertEqual(m.get_content(), content) 331 332 def test_set_text_short_line_minimal_non_ascii_heuristics(self): 333 m = self._make_message() 334 content = "et là il est monté sur moi et il commence à m'éto.\n" 335 raw_data_manager.set_content(m, content) 336 self.assertEqual(bytes(m), textwrap.dedent("""\ 337 Content-Type: text/plain; charset="utf-8" 338 Content-Transfer-Encoding: 8bit 339 340 et là il est monté sur moi et il commence à m'éto. 341 """).encode('utf-8')) 342 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) 343 self.assertEqual(m.get_content(), content) 344 345 def test_set_text_long_line_minimal_non_ascii_heuristics(self): 346 m = self._make_message() 347 content = ("j'ai un problème de python. il est sorti de son" 348 " vivarium. et là il est monté sur moi et il commence" 349 " à m'éto.\n") 350 raw_data_manager.set_content(m, content) 351 self.assertEqual(bytes(m), textwrap.dedent("""\ 352 Content-Type: text/plain; charset="utf-8" 353 Content-Transfer-Encoding: quoted-printable 354 355 j'ai un probl=C3=A8me de python. il est sorti de son vivari= 356 um. et l=C3=A0 il est mont=C3=A9 sur moi et il commence = 357 =C3=A0 m'=C3=A9to. 358 """).encode('utf-8')) 359 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) 360 self.assertEqual(m.get_content(), content) 361 362 def test_set_text_11_lines_long_line_minimal_non_ascii_heuristics(self): 363 m = self._make_message() 364 content = '\n'*10 + ( 365 "j'ai un problème de python. il est sorti de son" 366 " vivarium. et là il est monté sur moi et il commence" 367 " à m'éto.\n") 368 raw_data_manager.set_content(m, content) 369 self.assertEqual(bytes(m), textwrap.dedent("""\ 370 Content-Type: text/plain; charset="utf-8" 371 Content-Transfer-Encoding: quoted-printable 372 """ + '\n'*10 + """ 373 j'ai un probl=C3=A8me de python. il est sorti de son vivari= 374 um. et l=C3=A0 il est mont=C3=A9 sur moi et il commence = 375 =C3=A0 m'=C3=A9to. 376 """).encode('utf-8')) 377 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) 378 self.assertEqual(m.get_content(), content) 379 380 def test_set_text_maximal_non_ascii_heuristics(self): 381 m = self._make_message() 382 content = "áàäéèęöő.\n" 383 raw_data_manager.set_content(m, content) 384 self.assertEqual(bytes(m), textwrap.dedent("""\ 385 Content-Type: text/plain; charset="utf-8" 386 Content-Transfer-Encoding: 8bit 387 388 áàäéèęöő. 389 """).encode('utf-8')) 390 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) 391 self.assertEqual(m.get_content(), content) 392 393 def test_set_text_11_lines_maximal_non_ascii_heuristics(self): 394 m = self._make_message() 395 content = '\n'*10 + "áàäéèęöő.\n" 396 raw_data_manager.set_content(m, content) 397 self.assertEqual(bytes(m), textwrap.dedent("""\ 398 Content-Type: text/plain; charset="utf-8" 399 Content-Transfer-Encoding: 8bit 400 """ + '\n'*10 + """ 401 áàäéèęöő. 402 """).encode('utf-8')) 403 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) 404 self.assertEqual(m.get_content(), content) 405 406 def test_set_text_long_line_maximal_non_ascii_heuristics(self): 407 m = self._make_message() 408 content = ("áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" 409 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" 410 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n") 411 raw_data_manager.set_content(m, content) 412 self.assertEqual(bytes(m), textwrap.dedent("""\ 413 Content-Type: text/plain; charset="utf-8" 414 Content-Transfer-Encoding: base64 415 416 w6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOoxJnD 417 tsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOo 418 xJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TD 419 qcOoxJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOg 420 w6TDqcOoxJnDtsWRLgo= 421 """).encode('utf-8')) 422 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) 423 self.assertEqual(m.get_content(), content) 424 425 def test_set_text_11_lines_long_line_maximal_non_ascii_heuristics(self): 426 # Yes, it chooses "wrong" here. It's a heuristic. So this result 427 # could change if we come up with a better heuristic. 428 m = self._make_message() 429 content = ('\n'*10 + 430 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" 431 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" 432 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n") 433 raw_data_manager.set_content(m, "\n"*10 + 434 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" 435 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" 436 "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n") 437 self.assertEqual(bytes(m), textwrap.dedent("""\ 438 Content-Type: text/plain; charset="utf-8" 439 Content-Transfer-Encoding: quoted-printable 440 """ + '\n'*10 + """ 441 =C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3= 442 =A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4= 443 =C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3= 444 =A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99= 445 =C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5= 446 =91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1= 447 =C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3= 448 =A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9= 449 =C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4= 450 =99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6= 451 =C5=91. 452 """).encode('utf-8')) 453 self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) 454 self.assertEqual(m.get_content(), content) 455 456 def test_set_text_non_ascii_with_cte_7bit_raises(self): 457 m = self._make_message() 458 with self.assertRaises(UnicodeError): 459 raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit') 460 461 def test_set_text_non_ascii_with_charset_ascii_raises(self): 462 m = self._make_message() 463 with self.assertRaises(UnicodeError): 464 raw_data_manager.set_content(m,"áàäéèęöő.\n", charset='ascii') 465 466 def test_set_text_non_ascii_with_cte_7bit_and_charset_ascii_raises(self): 467 m = self._make_message() 468 with self.assertRaises(UnicodeError): 469 raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit', charset='ascii') 470 471 def test_set_message(self): 472 m = self._make_message() 473 m['Subject'] = "Forwarded message" 474 content = self._make_message() 475 content['To'] = 'python@vivarium.org' 476 content['From'] = 'police@monty.org' 477 content['Subject'] = "get back in your box" 478 content.set_content("Or face the comfy chair.") 479 raw_data_manager.set_content(m, content) 480 self.assertEqual(str(m), textwrap.dedent("""\ 481 Subject: Forwarded message 482 Content-Type: message/rfc822 483 Content-Transfer-Encoding: 8bit 484 485 To: python@vivarium.org 486 From: police@monty.org 487 Subject: get back in your box 488 Content-Type: text/plain; charset="utf-8" 489 Content-Transfer-Encoding: 7bit 490 MIME-Version: 1.0 491 492 Or face the comfy chair. 493 """)) 494 payload = m.get_payload(0) 495 self.assertIsInstance(payload, self.message) 496 self.assertEqual(str(payload), str(content)) 497 self.assertIsInstance(m.get_content(), self.message) 498 self.assertEqual(str(m.get_content()), str(content)) 499 500 def test_set_message_with_non_ascii_and_coercion_to_7bit(self): 501 m = self._make_message() 502 m['Subject'] = "Escape report" 503 content = self._make_message() 504 content['To'] = 'police@monty.org' 505 content['From'] = 'victim@monty.org' 506 content['Subject'] = "Help" 507 content.set_content("j'ai un problème de python. il est sorti de son" 508 " vivarium.") 509 raw_data_manager.set_content(m, content) 510 self.assertEqual(bytes(m), textwrap.dedent("""\ 511 Subject: Escape report 512 Content-Type: message/rfc822 513 Content-Transfer-Encoding: 8bit 514 515 To: police@monty.org 516 From: victim@monty.org 517 Subject: Help 518 Content-Type: text/plain; charset="utf-8" 519 Content-Transfer-Encoding: 8bit 520 MIME-Version: 1.0 521 522 j'ai un problème de python. il est sorti de son vivarium. 523 """).encode('utf-8')) 524 # The choice of base64 for the body encoding is because generator 525 # doesn't bother with heuristics and uses it unconditionally for utf-8 526 # text. 527 # XXX: the first cte should be 7bit, too...that's a generator bug. 528 # XXX: the line length in the body also looks like a generator bug. 529 self.assertEqual(m.as_string(maxheaderlen=self.policy.max_line_length), 530 textwrap.dedent("""\ 531 Subject: Escape report 532 Content-Type: message/rfc822 533 Content-Transfer-Encoding: 8bit 534 535 To: police@monty.org 536 From: victim@monty.org 537 Subject: Help 538 Content-Type: text/plain; charset="utf-8" 539 Content-Transfer-Encoding: base64 540 MIME-Version: 1.0 541 542 aidhaSB1biBwcm9ibMOobWUgZGUgcHl0aG9uLiBpbCBlc3Qgc29ydGkgZGUgc29uIHZpdmFyaXVt 543 Lgo= 544 """)) 545 self.assertIsInstance(m.get_content(), self.message) 546 self.assertEqual(str(m.get_content()), str(content)) 547 548 def test_set_message_invalid_cte_raises(self): 549 m = self._make_message() 550 content = self._make_message() 551 for cte in 'quoted-printable base64'.split(): 552 for subtype in 'rfc822 external-body'.split(): 553 with self.subTest(cte=cte, subtype=subtype): 554 with self.assertRaises(ValueError) as ar: 555 m.set_content(content, subtype, cte=cte) 556 exc = str(ar.exception) 557 self.assertIn(cte, exc) 558 self.assertIn(subtype, exc) 559 subtype = 'external-body' 560 for cte in '8bit binary'.split(): 561 with self.subTest(cte=cte, subtype=subtype): 562 with self.assertRaises(ValueError) as ar: 563 m.set_content(content, subtype, cte=cte) 564 exc = str(ar.exception) 565 self.assertIn(cte, exc) 566 self.assertIn(subtype, exc) 567 568 def test_set_image_jpg(self): 569 for content in (b"bogus content", 570 bytearray(b"bogus content"), 571 memoryview(b"bogus content")): 572 with self.subTest(content=content): 573 m = self._make_message() 574 raw_data_manager.set_content(m, content, 'image', 'jpeg') 575 self.assertEqual(str(m), textwrap.dedent("""\ 576 Content-Type: image/jpeg 577 Content-Transfer-Encoding: base64 578 579 Ym9ndXMgY29udGVudA== 580 """)) 581 self.assertEqual(m.get_payload(decode=True), content) 582 self.assertEqual(m.get_content(), content) 583 584 def test_set_audio_aif_with_quoted_printable_cte(self): 585 # Why you would use qp, I don't know, but it is technically supported. 586 # XXX: the incorrect line length is because binascii.b2a_qp doesn't 587 # support a line length parameter, but we must use it to get newline 588 # encoding. 589 # XXX: what about that lack of tailing newline? Do we actually handle 590 # that correctly in all cases? That is, if the *source* has an 591 # unencoded newline, do we add an extra newline to the returned payload 592 # or not? And can that actually be disambiguated based on the RFC? 593 m = self._make_message() 594 content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100 595 m.set_content(content, 'audio', 'aif', cte='quoted-printable') 596 self.assertEqual(bytes(m), textwrap.dedent("""\ 597 Content-Type: audio/aif 598 Content-Transfer-Encoding: quoted-printable 599 MIME-Version: 1.0 600 601 b=FFgus=09con=0At=0Dent=20zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz= 602 zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz""").encode('latin-1')) 603 self.assertEqual(m.get_payload(decode=True), content) 604 self.assertEqual(m.get_content(), content) 605 606 def test_set_video_mpeg_with_binary_cte(self): 607 m = self._make_message() 608 content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100 609 m.set_content(content, 'video', 'mpeg', cte='binary') 610 self.assertEqual(bytes(m), textwrap.dedent("""\ 611 Content-Type: video/mpeg 612 Content-Transfer-Encoding: binary 613 MIME-Version: 1.0 614 615 """).encode('ascii') + 616 # XXX: the second \n ought to be a \r, but generator gets it wrong. 617 # THIS MEANS WE DON'T ACTUALLY SUPPORT THE 'binary' CTE. 618 b'b\xFFgus\tcon\nt\nent zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz' + 619 b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz') 620 self.assertEqual(m.get_payload(decode=True), content) 621 self.assertEqual(m.get_content(), content) 622 623 def test_set_application_octet_stream_with_8bit_cte(self): 624 # In 8bit mode, universal line end logic applies. It is up to the 625 # application to make sure the lines are short enough; we don't check. 626 m = self._make_message() 627 content = b'b\xFFgus\tcon\nt\rent\n' + b'z'*60 + b'\n' 628 m.set_content(content, 'application', 'octet-stream', cte='8bit') 629 self.assertEqual(bytes(m), textwrap.dedent("""\ 630 Content-Type: application/octet-stream 631 Content-Transfer-Encoding: 8bit 632 MIME-Version: 1.0 633 634 """).encode('ascii') + 635 b'b\xFFgus\tcon\nt\nent\n' + 636 b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz\n') 637 self.assertEqual(m.get_payload(decode=True), content) 638 self.assertEqual(m.get_content(), content) 639 640 def test_set_headers_from_header_objects(self): 641 m = self._make_message() 642 content = "Simple message.\n" 643 header_factory = self.policy.header_factory 644 raw_data_manager.set_content(m, content, headers=( 645 header_factory("To", "foo@example.com"), 646 header_factory("From", "foo@example.com"), 647 header_factory("Subject", "I'm talking to myself."))) 648 self.assertEqual(str(m), textwrap.dedent("""\ 649 Content-Type: text/plain; charset="utf-8" 650 To: foo@example.com 651 From: foo@example.com 652 Subject: I'm talking to myself. 653 Content-Transfer-Encoding: 7bit 654 655 Simple message. 656 """)) 657 658 def test_set_headers_from_strings(self): 659 m = self._make_message() 660 content = "Simple message.\n" 661 raw_data_manager.set_content(m, content, headers=( 662 "X-Foo-Header: foo", 663 "X-Bar-Header: bar",)) 664 self.assertEqual(str(m), textwrap.dedent("""\ 665 Content-Type: text/plain; charset="utf-8" 666 X-Foo-Header: foo 667 X-Bar-Header: bar 668 Content-Transfer-Encoding: 7bit 669 670 Simple message. 671 """)) 672 673 def test_set_headers_with_invalid_duplicate_string_header_raises(self): 674 m = self._make_message() 675 content = "Simple message.\n" 676 with self.assertRaisesRegex(ValueError, 'Content-Type'): 677 raw_data_manager.set_content(m, content, headers=( 678 "Content-Type: foo/bar",) 679 ) 680 681 def test_set_headers_with_invalid_duplicate_header_header_raises(self): 682 m = self._make_message() 683 content = "Simple message.\n" 684 header_factory = self.policy.header_factory 685 with self.assertRaisesRegex(ValueError, 'Content-Type'): 686 raw_data_manager.set_content(m, content, headers=( 687 header_factory("Content-Type", " foo/bar"),) 688 ) 689 690 def test_set_headers_with_defective_string_header_raises(self): 691 m = self._make_message() 692 content = "Simple message.\n" 693 with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'): 694 raw_data_manager.set_content(m, content, headers=( 695 'To: a@fairly@@invalid@address',) 696 ) 697 print(m['To'].defects) 698 699 def test_set_headers_with_defective_header_header_raises(self): 700 m = self._make_message() 701 content = "Simple message.\n" 702 header_factory = self.policy.header_factory 703 with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'): 704 raw_data_manager.set_content(m, content, headers=( 705 header_factory('To', 'a@fairly@@invalid@address'),) 706 ) 707 print(m['To'].defects) 708 709 def test_set_disposition_inline(self): 710 m = self._make_message() 711 m.set_content('foo', disposition='inline') 712 self.assertEqual(m['Content-Disposition'], 'inline') 713 714 def test_set_disposition_attachment(self): 715 m = self._make_message() 716 m.set_content('foo', disposition='attachment') 717 self.assertEqual(m['Content-Disposition'], 'attachment') 718 719 def test_set_disposition_foo(self): 720 m = self._make_message() 721 m.set_content('foo', disposition='foo') 722 self.assertEqual(m['Content-Disposition'], 'foo') 723 724 # XXX: we should have a 'strict' policy mode (beyond raise_on_defect) that 725 # would cause 'foo' above to raise. 726 727 def test_set_filename(self): 728 m = self._make_message() 729 m.set_content('foo', filename='bar.txt') 730 self.assertEqual(m['Content-Disposition'], 731 'attachment; filename="bar.txt"') 732 733 def test_set_filename_and_disposition_inline(self): 734 m = self._make_message() 735 m.set_content('foo', disposition='inline', filename='bar.txt') 736 self.assertEqual(m['Content-Disposition'], 'inline; filename="bar.txt"') 737 738 def test_set_non_ascii_filename(self): 739 m = self._make_message() 740 m.set_content('foo', filename='ábárî.txt') 741 self.assertEqual(bytes(m), textwrap.dedent("""\ 742 Content-Type: text/plain; charset="utf-8" 743 Content-Transfer-Encoding: 7bit 744 Content-Disposition: attachment; 745 filename*=utf-8''%C3%A1b%C3%A1r%C3%AE.txt 746 MIME-Version: 1.0 747 748 foo 749 """).encode('ascii')) 750 751 content_object_params = { 752 'text_plain': ('content', ()), 753 'text_html': ('content', ('html',)), 754 'application_octet_stream': (b'content', 755 ('application', 'octet_stream')), 756 'image_jpeg': (b'content', ('image', 'jpeg')), 757 'message_rfc822': (message(), ()), 758 'message_external_body': (message(), ('external-body',)), 759 } 760 761 def content_object_as_header_receiver(self, obj, mimetype): 762 m = self._make_message() 763 m.set_content(obj, *mimetype, headers=( 764 'To: foo@example.com', 765 'From: bar@simple.net')) 766 self.assertEqual(m['to'], 'foo@example.com') 767 self.assertEqual(m['from'], 'bar@simple.net') 768 769 def content_object_as_disposition_inline_receiver(self, obj, mimetype): 770 m = self._make_message() 771 m.set_content(obj, *mimetype, disposition='inline') 772 self.assertEqual(m['Content-Disposition'], 'inline') 773 774 def content_object_as_non_ascii_filename_receiver(self, obj, mimetype): 775 m = self._make_message() 776 m.set_content(obj, *mimetype, disposition='inline', filename='bár.txt') 777 self.assertEqual(m['Content-Disposition'], 'inline; filename="bár.txt"') 778 self.assertEqual(m.get_filename(), "bár.txt") 779 self.assertEqual(m['Content-Disposition'].params['filename'], "bár.txt") 780 781 def content_object_as_cid_receiver(self, obj, mimetype): 782 m = self._make_message() 783 m.set_content(obj, *mimetype, cid='some_random_stuff') 784 self.assertEqual(m['Content-ID'], 'some_random_stuff') 785 786 def content_object_as_params_receiver(self, obj, mimetype): 787 m = self._make_message() 788 params = {'foo': 'bár', 'abc': 'xyz'} 789 m.set_content(obj, *mimetype, params=params) 790 if isinstance(obj, str): 791 params['charset'] = 'utf-8' 792 self.assertEqual(m['Content-Type'].params, params) 793 794 795if __name__ == '__main__': 796 unittest.main() 797