1import os 2import sys 3import time 4import stat 5import socket 6import email 7import email.message 8import re 9import io 10import tempfile 11from test import support 12import unittest 13import textwrap 14import mailbox 15import glob 16 17 18class TestBase: 19 20 all_mailbox_types = (mailbox.Message, mailbox.MaildirMessage, 21 mailbox.mboxMessage, mailbox.MHMessage, 22 mailbox.BabylMessage, mailbox.MMDFMessage) 23 24 def _check_sample(self, msg): 25 # Inspect a mailbox.Message representation of the sample message 26 self.assertIsInstance(msg, email.message.Message) 27 self.assertIsInstance(msg, mailbox.Message) 28 for key, value in _sample_headers.items(): 29 self.assertIn(value, msg.get_all(key)) 30 self.assertTrue(msg.is_multipart()) 31 self.assertEqual(len(msg.get_payload()), len(_sample_payloads)) 32 for i, payload in enumerate(_sample_payloads): 33 part = msg.get_payload(i) 34 self.assertIsInstance(part, email.message.Message) 35 self.assertNotIsInstance(part, mailbox.Message) 36 self.assertEqual(part.get_payload(), payload) 37 38 def _delete_recursively(self, target): 39 # Delete a file or delete a directory recursively 40 if os.path.isdir(target): 41 support.rmtree(target) 42 elif os.path.exists(target): 43 support.unlink(target) 44 45 46class TestMailbox(TestBase): 47 48 maxDiff = None 49 50 _factory = None # Overridden by subclasses to reuse tests 51 _template = 'From: foo\n\n%s\n' 52 53 def setUp(self): 54 self._path = support.TESTFN 55 self._delete_recursively(self._path) 56 self._box = self._factory(self._path) 57 58 def tearDown(self): 59 self._box.close() 60 self._delete_recursively(self._path) 61 62 def test_add(self): 63 # Add copies of a sample message 64 keys = [] 65 keys.append(self._box.add(self._template % 0)) 66 self.assertEqual(len(self._box), 1) 67 keys.append(self._box.add(mailbox.Message(_sample_message))) 68 self.assertEqual(len(self._box), 2) 69 keys.append(self._box.add(email.message_from_string(_sample_message))) 70 self.assertEqual(len(self._box), 3) 71 keys.append(self._box.add(io.BytesIO(_bytes_sample_message))) 72 self.assertEqual(len(self._box), 4) 73 keys.append(self._box.add(_sample_message)) 74 self.assertEqual(len(self._box), 5) 75 keys.append(self._box.add(_bytes_sample_message)) 76 self.assertEqual(len(self._box), 6) 77 with self.assertWarns(DeprecationWarning): 78 keys.append(self._box.add( 79 io.TextIOWrapper(io.BytesIO(_bytes_sample_message)))) 80 self.assertEqual(len(self._box), 7) 81 self.assertEqual(self._box.get_string(keys[0]), self._template % 0) 82 for i in (1, 2, 3, 4, 5, 6): 83 self._check_sample(self._box[keys[i]]) 84 85 _nonascii_msg = textwrap.dedent("""\ 86 From: foo 87 Subject: Falinaptár házhozszállítással. Már rendeltél? 88 89 0 90 """) 91 92 def test_add_invalid_8bit_bytes_header(self): 93 key = self._box.add(self._nonascii_msg.encode('latin-1')) 94 self.assertEqual(len(self._box), 1) 95 self.assertEqual(self._box.get_bytes(key), 96 self._nonascii_msg.encode('latin-1')) 97 98 def test_invalid_nonascii_header_as_string(self): 99 subj = self._nonascii_msg.splitlines()[1] 100 key = self._box.add(subj.encode('latin-1')) 101 self.assertEqual(self._box.get_string(key), 102 'Subject: =?unknown-8bit?b?RmFsaW5hcHThciBo4Xpob3pzeuFsbO104XNz' 103 'YWwuIE3hciByZW5kZWx06Ww/?=\n\n') 104 105 def test_add_nonascii_string_header_raises(self): 106 with self.assertRaisesRegex(ValueError, "ASCII-only"): 107 self._box.add(self._nonascii_msg) 108 self._box.flush() 109 self.assertEqual(len(self._box), 0) 110 self.assertMailboxEmpty() 111 112 def test_add_that_raises_leaves_mailbox_empty(self): 113 def raiser(*args, **kw): 114 raise Exception("a fake error") 115 support.patch(self, email.generator.BytesGenerator, 'flatten', raiser) 116 with self.assertRaises(Exception): 117 self._box.add(email.message_from_string("From: Alphöso")) 118 self.assertEqual(len(self._box), 0) 119 self._box.close() 120 self.assertMailboxEmpty() 121 122 _non_latin_bin_msg = textwrap.dedent("""\ 123 From: foo@bar.com 124 To: báz 125 Subject: Maintenant je vous présente mon collègue, le pouf célèbre 126 \tJean de Baddie 127 Mime-Version: 1.0 128 Content-Type: text/plain; charset="utf-8" 129 Content-Transfer-Encoding: 8bit 130 131 Да, они летят. 132 """).encode('utf-8') 133 134 def test_add_8bit_body(self): 135 key = self._box.add(self._non_latin_bin_msg) 136 self.assertEqual(self._box.get_bytes(key), 137 self._non_latin_bin_msg) 138 with self._box.get_file(key) as f: 139 self.assertEqual(f.read(), 140 self._non_latin_bin_msg.replace(b'\n', 141 os.linesep.encode())) 142 self.assertEqual(self._box[key].get_payload(), 143 "Да, они летят.\n") 144 145 def test_add_binary_file(self): 146 with tempfile.TemporaryFile('wb+') as f: 147 f.write(_bytes_sample_message) 148 f.seek(0) 149 key = self._box.add(f) 150 self.assertEqual(self._box.get_bytes(key).split(b'\n'), 151 _bytes_sample_message.split(b'\n')) 152 153 def test_add_binary_nonascii_file(self): 154 with tempfile.TemporaryFile('wb+') as f: 155 f.write(self._non_latin_bin_msg) 156 f.seek(0) 157 key = self._box.add(f) 158 self.assertEqual(self._box.get_bytes(key).split(b'\n'), 159 self._non_latin_bin_msg.split(b'\n')) 160 161 def test_add_text_file_warns(self): 162 with tempfile.TemporaryFile('w+') as f: 163 f.write(_sample_message) 164 f.seek(0) 165 with self.assertWarns(DeprecationWarning): 166 key = self._box.add(f) 167 self.assertEqual(self._box.get_bytes(key).split(b'\n'), 168 _bytes_sample_message.split(b'\n')) 169 170 def test_add_StringIO_warns(self): 171 with self.assertWarns(DeprecationWarning): 172 key = self._box.add(io.StringIO(self._template % "0")) 173 self.assertEqual(self._box.get_string(key), self._template % "0") 174 175 def test_add_nonascii_StringIO_raises(self): 176 with self.assertWarns(DeprecationWarning): 177 with self.assertRaisesRegex(ValueError, "ASCII-only"): 178 self._box.add(io.StringIO(self._nonascii_msg)) 179 self.assertEqual(len(self._box), 0) 180 self._box.close() 181 self.assertMailboxEmpty() 182 183 def test_remove(self): 184 # Remove messages using remove() 185 self._test_remove_or_delitem(self._box.remove) 186 187 def test_delitem(self): 188 # Remove messages using __delitem__() 189 self._test_remove_or_delitem(self._box.__delitem__) 190 191 def _test_remove_or_delitem(self, method): 192 # (Used by test_remove() and test_delitem().) 193 key0 = self._box.add(self._template % 0) 194 key1 = self._box.add(self._template % 1) 195 self.assertEqual(len(self._box), 2) 196 method(key0) 197 self.assertEqual(len(self._box), 1) 198 self.assertRaises(KeyError, lambda: self._box[key0]) 199 self.assertRaises(KeyError, lambda: method(key0)) 200 self.assertEqual(self._box.get_string(key1), self._template % 1) 201 key2 = self._box.add(self._template % 2) 202 self.assertEqual(len(self._box), 2) 203 method(key2) 204 self.assertEqual(len(self._box), 1) 205 self.assertRaises(KeyError, lambda: self._box[key2]) 206 self.assertRaises(KeyError, lambda: method(key2)) 207 self.assertEqual(self._box.get_string(key1), self._template % 1) 208 method(key1) 209 self.assertEqual(len(self._box), 0) 210 self.assertRaises(KeyError, lambda: self._box[key1]) 211 self.assertRaises(KeyError, lambda: method(key1)) 212 213 def test_discard(self, repetitions=10): 214 # Discard messages 215 key0 = self._box.add(self._template % 0) 216 key1 = self._box.add(self._template % 1) 217 self.assertEqual(len(self._box), 2) 218 self._box.discard(key0) 219 self.assertEqual(len(self._box), 1) 220 self.assertRaises(KeyError, lambda: self._box[key0]) 221 self._box.discard(key0) 222 self.assertEqual(len(self._box), 1) 223 self.assertRaises(KeyError, lambda: self._box[key0]) 224 225 def test_get(self): 226 # Retrieve messages using get() 227 key0 = self._box.add(self._template % 0) 228 msg = self._box.get(key0) 229 self.assertEqual(msg['from'], 'foo') 230 self.assertEqual(msg.get_payload(), '0\n') 231 self.assertIsNone(self._box.get('foo')) 232 self.assertIs(self._box.get('foo', False), False) 233 self._box.close() 234 self._box = self._factory(self._path) 235 key1 = self._box.add(self._template % 1) 236 msg = self._box.get(key1) 237 self.assertEqual(msg['from'], 'foo') 238 self.assertEqual(msg.get_payload(), '1\n') 239 240 def test_getitem(self): 241 # Retrieve message using __getitem__() 242 key0 = self._box.add(self._template % 0) 243 msg = self._box[key0] 244 self.assertEqual(msg['from'], 'foo') 245 self.assertEqual(msg.get_payload(), '0\n') 246 self.assertRaises(KeyError, lambda: self._box['foo']) 247 self._box.discard(key0) 248 self.assertRaises(KeyError, lambda: self._box[key0]) 249 250 def test_get_message(self): 251 # Get Message representations of messages 252 key0 = self._box.add(self._template % 0) 253 key1 = self._box.add(_sample_message) 254 msg0 = self._box.get_message(key0) 255 self.assertIsInstance(msg0, mailbox.Message) 256 self.assertEqual(msg0['from'], 'foo') 257 self.assertEqual(msg0.get_payload(), '0\n') 258 self._check_sample(self._box.get_message(key1)) 259 260 def test_get_bytes(self): 261 # Get bytes representations of messages 262 key0 = self._box.add(self._template % 0) 263 key1 = self._box.add(_sample_message) 264 self.assertEqual(self._box.get_bytes(key0), 265 (self._template % 0).encode('ascii')) 266 self.assertEqual(self._box.get_bytes(key1), _bytes_sample_message) 267 268 def test_get_string(self): 269 # Get string representations of messages 270 key0 = self._box.add(self._template % 0) 271 key1 = self._box.add(_sample_message) 272 self.assertEqual(self._box.get_string(key0), self._template % 0) 273 self.assertEqual(self._box.get_string(key1).split('\n'), 274 _sample_message.split('\n')) 275 276 def test_get_file(self): 277 # Get file representations of messages 278 key0 = self._box.add(self._template % 0) 279 key1 = self._box.add(_sample_message) 280 with self._box.get_file(key0) as file: 281 data0 = file.read() 282 with self._box.get_file(key1) as file: 283 data1 = file.read() 284 self.assertEqual(data0.decode('ascii').replace(os.linesep, '\n'), 285 self._template % 0) 286 self.assertEqual(data1.decode('ascii').replace(os.linesep, '\n'), 287 _sample_message) 288 289 def test_get_file_can_be_closed_twice(self): 290 # Issue 11700 291 key = self._box.add(_sample_message) 292 f = self._box.get_file(key) 293 f.close() 294 f.close() 295 296 def test_iterkeys(self): 297 # Get keys using iterkeys() 298 self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False) 299 300 def test_keys(self): 301 # Get keys using keys() 302 self._check_iteration(self._box.keys, do_keys=True, do_values=False) 303 304 def test_itervalues(self): 305 # Get values using itervalues() 306 self._check_iteration(self._box.itervalues, do_keys=False, 307 do_values=True) 308 309 def test_iter(self): 310 # Get values using __iter__() 311 self._check_iteration(self._box.__iter__, do_keys=False, 312 do_values=True) 313 314 def test_values(self): 315 # Get values using values() 316 self._check_iteration(self._box.values, do_keys=False, do_values=True) 317 318 def test_iteritems(self): 319 # Get keys and values using iteritems() 320 self._check_iteration(self._box.iteritems, do_keys=True, 321 do_values=True) 322 323 def test_items(self): 324 # Get keys and values using items() 325 self._check_iteration(self._box.items, do_keys=True, do_values=True) 326 327 def _check_iteration(self, method, do_keys, do_values, repetitions=10): 328 for value in method(): 329 self.fail("Not empty") 330 keys, values = [], [] 331 for i in range(repetitions): 332 keys.append(self._box.add(self._template % i)) 333 values.append(self._template % i) 334 if do_keys and not do_values: 335 returned_keys = list(method()) 336 elif do_values and not do_keys: 337 returned_values = list(method()) 338 else: 339 returned_keys, returned_values = [], [] 340 for key, value in method(): 341 returned_keys.append(key) 342 returned_values.append(value) 343 if do_keys: 344 self.assertEqual(len(keys), len(returned_keys)) 345 self.assertEqual(set(keys), set(returned_keys)) 346 if do_values: 347 count = 0 348 for value in returned_values: 349 self.assertEqual(value['from'], 'foo') 350 self.assertLess(int(value.get_payload()), repetitions) 351 count += 1 352 self.assertEqual(len(values), count) 353 354 def test_contains(self): 355 # Check existence of keys using __contains__() 356 self.assertNotIn('foo', self._box) 357 key0 = self._box.add(self._template % 0) 358 self.assertIn(key0, self._box) 359 self.assertNotIn('foo', self._box) 360 key1 = self._box.add(self._template % 1) 361 self.assertIn(key1, self._box) 362 self.assertIn(key0, self._box) 363 self.assertNotIn('foo', self._box) 364 self._box.remove(key0) 365 self.assertNotIn(key0, self._box) 366 self.assertIn(key1, self._box) 367 self.assertNotIn('foo', self._box) 368 self._box.remove(key1) 369 self.assertNotIn(key1, self._box) 370 self.assertNotIn(key0, self._box) 371 self.assertNotIn('foo', self._box) 372 373 def test_len(self, repetitions=10): 374 # Get message count 375 keys = [] 376 for i in range(repetitions): 377 self.assertEqual(len(self._box), i) 378 keys.append(self._box.add(self._template % i)) 379 self.assertEqual(len(self._box), i + 1) 380 for i in range(repetitions): 381 self.assertEqual(len(self._box), repetitions - i) 382 self._box.remove(keys[i]) 383 self.assertEqual(len(self._box), repetitions - i - 1) 384 385 def test_set_item(self): 386 # Modify messages using __setitem__() 387 key0 = self._box.add(self._template % 'original 0') 388 self.assertEqual(self._box.get_string(key0), 389 self._template % 'original 0') 390 key1 = self._box.add(self._template % 'original 1') 391 self.assertEqual(self._box.get_string(key1), 392 self._template % 'original 1') 393 self._box[key0] = self._template % 'changed 0' 394 self.assertEqual(self._box.get_string(key0), 395 self._template % 'changed 0') 396 self._box[key1] = self._template % 'changed 1' 397 self.assertEqual(self._box.get_string(key1), 398 self._template % 'changed 1') 399 self._box[key0] = _sample_message 400 self._check_sample(self._box[key0]) 401 self._box[key1] = self._box[key0] 402 self._check_sample(self._box[key1]) 403 self._box[key0] = self._template % 'original 0' 404 self.assertEqual(self._box.get_string(key0), 405 self._template % 'original 0') 406 self._check_sample(self._box[key1]) 407 self.assertRaises(KeyError, 408 lambda: self._box.__setitem__('foo', 'bar')) 409 self.assertRaises(KeyError, lambda: self._box['foo']) 410 self.assertEqual(len(self._box), 2) 411 412 def test_clear(self, iterations=10): 413 # Remove all messages using clear() 414 keys = [] 415 for i in range(iterations): 416 self._box.add(self._template % i) 417 for i, key in enumerate(keys): 418 self.assertEqual(self._box.get_string(key), self._template % i) 419 self._box.clear() 420 self.assertEqual(len(self._box), 0) 421 for i, key in enumerate(keys): 422 self.assertRaises(KeyError, lambda: self._box.get_string(key)) 423 424 def test_pop(self): 425 # Get and remove a message using pop() 426 key0 = self._box.add(self._template % 0) 427 self.assertIn(key0, self._box) 428 key1 = self._box.add(self._template % 1) 429 self.assertIn(key1, self._box) 430 self.assertEqual(self._box.pop(key0).get_payload(), '0\n') 431 self.assertNotIn(key0, self._box) 432 self.assertIn(key1, self._box) 433 key2 = self._box.add(self._template % 2) 434 self.assertIn(key2, self._box) 435 self.assertEqual(self._box.pop(key2).get_payload(), '2\n') 436 self.assertNotIn(key2, self._box) 437 self.assertIn(key1, self._box) 438 self.assertEqual(self._box.pop(key1).get_payload(), '1\n') 439 self.assertNotIn(key1, self._box) 440 self.assertEqual(len(self._box), 0) 441 442 def test_popitem(self, iterations=10): 443 # Get and remove an arbitrary (key, message) using popitem() 444 keys = [] 445 for i in range(10): 446 keys.append(self._box.add(self._template % i)) 447 seen = [] 448 for i in range(10): 449 key, msg = self._box.popitem() 450 self.assertIn(key, keys) 451 self.assertNotIn(key, seen) 452 seen.append(key) 453 self.assertEqual(int(msg.get_payload()), keys.index(key)) 454 self.assertEqual(len(self._box), 0) 455 for key in keys: 456 self.assertRaises(KeyError, lambda: self._box[key]) 457 458 def test_update(self): 459 # Modify multiple messages using update() 460 key0 = self._box.add(self._template % 'original 0') 461 key1 = self._box.add(self._template % 'original 1') 462 key2 = self._box.add(self._template % 'original 2') 463 self._box.update({key0: self._template % 'changed 0', 464 key2: _sample_message}) 465 self.assertEqual(len(self._box), 3) 466 self.assertEqual(self._box.get_string(key0), 467 self._template % 'changed 0') 468 self.assertEqual(self._box.get_string(key1), 469 self._template % 'original 1') 470 self._check_sample(self._box[key2]) 471 self._box.update([(key2, self._template % 'changed 2'), 472 (key1, self._template % 'changed 1'), 473 (key0, self._template % 'original 0')]) 474 self.assertEqual(len(self._box), 3) 475 self.assertEqual(self._box.get_string(key0), 476 self._template % 'original 0') 477 self.assertEqual(self._box.get_string(key1), 478 self._template % 'changed 1') 479 self.assertEqual(self._box.get_string(key2), 480 self._template % 'changed 2') 481 self.assertRaises(KeyError, 482 lambda: self._box.update({'foo': 'bar', 483 key0: self._template % "changed 0"})) 484 self.assertEqual(len(self._box), 3) 485 self.assertEqual(self._box.get_string(key0), 486 self._template % "changed 0") 487 self.assertEqual(self._box.get_string(key1), 488 self._template % "changed 1") 489 self.assertEqual(self._box.get_string(key2), 490 self._template % "changed 2") 491 492 def test_flush(self): 493 # Write changes to disk 494 self._test_flush_or_close(self._box.flush, True) 495 496 def test_popitem_and_flush_twice(self): 497 # See #15036. 498 self._box.add(self._template % 0) 499 self._box.add(self._template % 1) 500 self._box.flush() 501 502 self._box.popitem() 503 self._box.flush() 504 self._box.popitem() 505 self._box.flush() 506 507 def test_lock_unlock(self): 508 # Lock and unlock the mailbox 509 self.assertFalse(os.path.exists(self._get_lock_path())) 510 self._box.lock() 511 self.assertTrue(os.path.exists(self._get_lock_path())) 512 self._box.unlock() 513 self.assertFalse(os.path.exists(self._get_lock_path())) 514 515 def test_close(self): 516 # Close mailbox and flush changes to disk 517 self._test_flush_or_close(self._box.close, False) 518 519 def _test_flush_or_close(self, method, should_call_close): 520 contents = [self._template % i for i in range(3)] 521 self._box.add(contents[0]) 522 self._box.add(contents[1]) 523 self._box.add(contents[2]) 524 oldbox = self._box 525 method() 526 if should_call_close: 527 self._box.close() 528 self._box = self._factory(self._path) 529 keys = self._box.keys() 530 self.assertEqual(len(keys), 3) 531 for key in keys: 532 self.assertIn(self._box.get_string(key), contents) 533 oldbox.close() 534 535 def test_dump_message(self): 536 # Write message representations to disk 537 for input in (email.message_from_string(_sample_message), 538 _sample_message, io.BytesIO(_bytes_sample_message)): 539 output = io.BytesIO() 540 self._box._dump_message(input, output) 541 self.assertEqual(output.getvalue(), 542 _bytes_sample_message.replace(b'\n', os.linesep.encode())) 543 output = io.BytesIO() 544 self.assertRaises(TypeError, 545 lambda: self._box._dump_message(None, output)) 546 547 def _get_lock_path(self): 548 # Return the path of the dot lock file. May be overridden. 549 return self._path + '.lock' 550 551 552class TestMailboxSuperclass(TestBase, unittest.TestCase): 553 554 def test_notimplemented(self): 555 # Test that all Mailbox methods raise NotImplementedException. 556 box = mailbox.Mailbox('path') 557 self.assertRaises(NotImplementedError, lambda: box.add('')) 558 self.assertRaises(NotImplementedError, lambda: box.remove('')) 559 self.assertRaises(NotImplementedError, lambda: box.__delitem__('')) 560 self.assertRaises(NotImplementedError, lambda: box.discard('')) 561 self.assertRaises(NotImplementedError, lambda: box.__setitem__('', '')) 562 self.assertRaises(NotImplementedError, lambda: box.iterkeys()) 563 self.assertRaises(NotImplementedError, lambda: box.keys()) 564 self.assertRaises(NotImplementedError, lambda: box.itervalues().__next__()) 565 self.assertRaises(NotImplementedError, lambda: box.__iter__().__next__()) 566 self.assertRaises(NotImplementedError, lambda: box.values()) 567 self.assertRaises(NotImplementedError, lambda: box.iteritems().__next__()) 568 self.assertRaises(NotImplementedError, lambda: box.items()) 569 self.assertRaises(NotImplementedError, lambda: box.get('')) 570 self.assertRaises(NotImplementedError, lambda: box.__getitem__('')) 571 self.assertRaises(NotImplementedError, lambda: box.get_message('')) 572 self.assertRaises(NotImplementedError, lambda: box.get_string('')) 573 self.assertRaises(NotImplementedError, lambda: box.get_bytes('')) 574 self.assertRaises(NotImplementedError, lambda: box.get_file('')) 575 self.assertRaises(NotImplementedError, lambda: '' in box) 576 self.assertRaises(NotImplementedError, lambda: box.__contains__('')) 577 self.assertRaises(NotImplementedError, lambda: box.__len__()) 578 self.assertRaises(NotImplementedError, lambda: box.clear()) 579 self.assertRaises(NotImplementedError, lambda: box.pop('')) 580 self.assertRaises(NotImplementedError, lambda: box.popitem()) 581 self.assertRaises(NotImplementedError, lambda: box.update((('', ''),))) 582 self.assertRaises(NotImplementedError, lambda: box.flush()) 583 self.assertRaises(NotImplementedError, lambda: box.lock()) 584 self.assertRaises(NotImplementedError, lambda: box.unlock()) 585 self.assertRaises(NotImplementedError, lambda: box.close()) 586 587 588class TestMaildir(TestMailbox, unittest.TestCase): 589 590 _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory) 591 592 def setUp(self): 593 TestMailbox.setUp(self) 594 if (os.name == 'nt') or (sys.platform == 'cygwin'): 595 self._box.colon = '!' 596 597 def assertMailboxEmpty(self): 598 self.assertEqual(os.listdir(os.path.join(self._path, 'tmp')), []) 599 600 def test_add_MM(self): 601 # Add a MaildirMessage instance 602 msg = mailbox.MaildirMessage(self._template % 0) 603 msg.set_subdir('cur') 604 msg.set_info('foo') 605 key = self._box.add(msg) 606 self.assertTrue(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' % 607 (key, self._box.colon)))) 608 609 def test_get_MM(self): 610 # Get a MaildirMessage instance 611 msg = mailbox.MaildirMessage(self._template % 0) 612 msg.set_subdir('cur') 613 msg.set_flags('RF') 614 key = self._box.add(msg) 615 msg_returned = self._box.get_message(key) 616 self.assertIsInstance(msg_returned, mailbox.MaildirMessage) 617 self.assertEqual(msg_returned.get_subdir(), 'cur') 618 self.assertEqual(msg_returned.get_flags(), 'FR') 619 620 def test_set_MM(self): 621 # Set with a MaildirMessage instance 622 msg0 = mailbox.MaildirMessage(self._template % 0) 623 msg0.set_flags('TP') 624 key = self._box.add(msg0) 625 msg_returned = self._box.get_message(key) 626 self.assertEqual(msg_returned.get_subdir(), 'new') 627 self.assertEqual(msg_returned.get_flags(), 'PT') 628 msg1 = mailbox.MaildirMessage(self._template % 1) 629 self._box[key] = msg1 630 msg_returned = self._box.get_message(key) 631 self.assertEqual(msg_returned.get_subdir(), 'new') 632 self.assertEqual(msg_returned.get_flags(), '') 633 self.assertEqual(msg_returned.get_payload(), '1\n') 634 msg2 = mailbox.MaildirMessage(self._template % 2) 635 msg2.set_info('2,S') 636 self._box[key] = msg2 637 self._box[key] = self._template % 3 638 msg_returned = self._box.get_message(key) 639 self.assertEqual(msg_returned.get_subdir(), 'new') 640 self.assertEqual(msg_returned.get_flags(), 'S') 641 self.assertEqual(msg_returned.get_payload(), '3\n') 642 643 def test_consistent_factory(self): 644 # Add a message. 645 msg = mailbox.MaildirMessage(self._template % 0) 646 msg.set_subdir('cur') 647 msg.set_flags('RF') 648 key = self._box.add(msg) 649 650 # Create new mailbox with 651 class FakeMessage(mailbox.MaildirMessage): 652 pass 653 box = mailbox.Maildir(self._path, factory=FakeMessage) 654 box.colon = self._box.colon 655 msg2 = box.get_message(key) 656 self.assertIsInstance(msg2, FakeMessage) 657 658 def test_initialize_new(self): 659 # Initialize a non-existent mailbox 660 self.tearDown() 661 self._box = mailbox.Maildir(self._path) 662 self._check_basics() 663 self._delete_recursively(self._path) 664 self._box = self._factory(self._path, factory=None) 665 self._check_basics() 666 667 def test_initialize_existing(self): 668 # Initialize an existing mailbox 669 self.tearDown() 670 for subdir in '', 'tmp', 'new', 'cur': 671 os.mkdir(os.path.normpath(os.path.join(self._path, subdir))) 672 self._box = mailbox.Maildir(self._path) 673 self._check_basics() 674 675 def _check_basics(self, factory=None): 676 # (Used by test_open_new() and test_open_existing().) 677 self.assertEqual(self._box._path, os.path.abspath(self._path)) 678 self.assertEqual(self._box._factory, factory) 679 for subdir in '', 'tmp', 'new', 'cur': 680 path = os.path.join(self._path, subdir) 681 mode = os.stat(path)[stat.ST_MODE] 682 self.assertTrue(stat.S_ISDIR(mode), "Not a directory: '%s'" % path) 683 684 def test_list_folders(self): 685 # List folders 686 self._box.add_folder('one') 687 self._box.add_folder('two') 688 self._box.add_folder('three') 689 self.assertEqual(len(self._box.list_folders()), 3) 690 self.assertEqual(set(self._box.list_folders()), 691 set(('one', 'two', 'three'))) 692 693 def test_get_folder(self): 694 # Open folders 695 self._box.add_folder('foo.bar') 696 folder0 = self._box.get_folder('foo.bar') 697 folder0.add(self._template % 'bar') 698 self.assertTrue(os.path.isdir(os.path.join(self._path, '.foo.bar'))) 699 folder1 = self._box.get_folder('foo.bar') 700 self.assertEqual(folder1.get_string(folder1.keys()[0]), 701 self._template % 'bar') 702 703 def test_add_and_remove_folders(self): 704 # Delete folders 705 self._box.add_folder('one') 706 self._box.add_folder('two') 707 self.assertEqual(len(self._box.list_folders()), 2) 708 self.assertEqual(set(self._box.list_folders()), set(('one', 'two'))) 709 self._box.remove_folder('one') 710 self.assertEqual(len(self._box.list_folders()), 1) 711 self.assertEqual(set(self._box.list_folders()), set(('two',))) 712 self._box.add_folder('three') 713 self.assertEqual(len(self._box.list_folders()), 2) 714 self.assertEqual(set(self._box.list_folders()), set(('two', 'three'))) 715 self._box.remove_folder('three') 716 self.assertEqual(len(self._box.list_folders()), 1) 717 self.assertEqual(set(self._box.list_folders()), set(('two',))) 718 self._box.remove_folder('two') 719 self.assertEqual(len(self._box.list_folders()), 0) 720 self.assertEqual(self._box.list_folders(), []) 721 722 def test_clean(self): 723 # Remove old files from 'tmp' 724 foo_path = os.path.join(self._path, 'tmp', 'foo') 725 bar_path = os.path.join(self._path, 'tmp', 'bar') 726 with open(foo_path, 'w') as f: 727 f.write("@") 728 with open(bar_path, 'w') as f: 729 f.write("@") 730 self._box.clean() 731 self.assertTrue(os.path.exists(foo_path)) 732 self.assertTrue(os.path.exists(bar_path)) 733 foo_stat = os.stat(foo_path) 734 os.utime(foo_path, (time.time() - 129600 - 2, 735 foo_stat.st_mtime)) 736 self._box.clean() 737 self.assertFalse(os.path.exists(foo_path)) 738 self.assertTrue(os.path.exists(bar_path)) 739 740 def test_create_tmp(self, repetitions=10): 741 # Create files in tmp directory 742 hostname = socket.gethostname() 743 if '/' in hostname: 744 hostname = hostname.replace('/', r'\057') 745 if ':' in hostname: 746 hostname = hostname.replace(':', r'\072') 747 pid = os.getpid() 748 pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)" 749 r"Q(?P<Q>\d+)\.(?P<host>[^:/]*)") 750 previous_groups = None 751 for x in range(repetitions): 752 tmp_file = self._box._create_tmp() 753 head, tail = os.path.split(tmp_file.name) 754 self.assertEqual(head, os.path.abspath(os.path.join(self._path, 755 "tmp")), 756 "File in wrong location: '%s'" % head) 757 match = pattern.match(tail) 758 self.assertIsNotNone(match, "Invalid file name: '%s'" % tail) 759 groups = match.groups() 760 if previous_groups is not None: 761 self.assertGreaterEqual(int(groups[0]), int(previous_groups[0]), 762 "Non-monotonic seconds: '%s' before '%s'" % 763 (previous_groups[0], groups[0])) 764 if int(groups[0]) == int(previous_groups[0]): 765 self.assertGreaterEqual(int(groups[1]), int(previous_groups[1]), 766 "Non-monotonic milliseconds: '%s' before '%s'" % 767 (previous_groups[1], groups[1])) 768 self.assertEqual(int(groups[2]), pid, 769 "Process ID mismatch: '%s' should be '%s'" % 770 (groups[2], pid)) 771 self.assertEqual(int(groups[3]), int(previous_groups[3]) + 1, 772 "Non-sequential counter: '%s' before '%s'" % 773 (previous_groups[3], groups[3])) 774 self.assertEqual(groups[4], hostname, 775 "Host name mismatch: '%s' should be '%s'" % 776 (groups[4], hostname)) 777 previous_groups = groups 778 tmp_file.write(_bytes_sample_message) 779 tmp_file.seek(0) 780 self.assertEqual(tmp_file.read(), _bytes_sample_message) 781 tmp_file.close() 782 file_count = len(os.listdir(os.path.join(self._path, "tmp"))) 783 self.assertEqual(file_count, repetitions, 784 "Wrong file count: '%s' should be '%s'" % 785 (file_count, repetitions)) 786 787 def test_refresh(self): 788 # Update the table of contents 789 self.assertEqual(self._box._toc, {}) 790 key0 = self._box.add(self._template % 0) 791 key1 = self._box.add(self._template % 1) 792 self.assertEqual(self._box._toc, {}) 793 self._box._refresh() 794 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0), 795 key1: os.path.join('new', key1)}) 796 key2 = self._box.add(self._template % 2) 797 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0), 798 key1: os.path.join('new', key1)}) 799 self._box._refresh() 800 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0), 801 key1: os.path.join('new', key1), 802 key2: os.path.join('new', key2)}) 803 804 def test_refresh_after_safety_period(self): 805 # Issue #13254: Call _refresh after the "file system safety 806 # period" of 2 seconds has passed; _toc should still be 807 # updated because this is the first call to _refresh. 808 key0 = self._box.add(self._template % 0) 809 key1 = self._box.add(self._template % 1) 810 811 self._box = self._factory(self._path) 812 self.assertEqual(self._box._toc, {}) 813 814 # Emulate sleeping. Instead of sleeping for 2 seconds, use the 815 # skew factor to make _refresh think that the filesystem 816 # safety period has passed and re-reading the _toc is only 817 # required if mtimes differ. 818 self._box._skewfactor = -3 819 820 self._box._refresh() 821 self.assertEqual(sorted(self._box._toc.keys()), sorted([key0, key1])) 822 823 def test_lookup(self): 824 # Look up message subpaths in the TOC 825 self.assertRaises(KeyError, lambda: self._box._lookup('foo')) 826 key0 = self._box.add(self._template % 0) 827 self.assertEqual(self._box._lookup(key0), os.path.join('new', key0)) 828 os.remove(os.path.join(self._path, 'new', key0)) 829 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0)}) 830 # Be sure that the TOC is read back from disk (see issue #6896 831 # about bad mtime behaviour on some systems). 832 self._box.flush() 833 self.assertRaises(KeyError, lambda: self._box._lookup(key0)) 834 self.assertEqual(self._box._toc, {}) 835 836 def test_lock_unlock(self): 837 # Lock and unlock the mailbox. For Maildir, this does nothing. 838 self._box.lock() 839 self._box.unlock() 840 841 def test_folder (self): 842 # Test for bug #1569790: verify that folders returned by .get_folder() 843 # use the same factory function. 844 def dummy_factory (s): 845 return None 846 box = self._factory(self._path, factory=dummy_factory) 847 folder = box.add_folder('folder1') 848 self.assertIs(folder._factory, dummy_factory) 849 850 folder1_alias = box.get_folder('folder1') 851 self.assertIs(folder1_alias._factory, dummy_factory) 852 853 def test_directory_in_folder (self): 854 # Test that mailboxes still work if there's a stray extra directory 855 # in a folder. 856 for i in range(10): 857 self._box.add(mailbox.Message(_sample_message)) 858 859 # Create a stray directory 860 os.mkdir(os.path.join(self._path, 'cur', 'stray-dir')) 861 862 # Check that looping still works with the directory present. 863 for msg in self._box: 864 pass 865 866 @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') 867 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') 868 def test_file_permissions(self): 869 # Verify that message files are created without execute permissions 870 msg = mailbox.MaildirMessage(self._template % 0) 871 orig_umask = os.umask(0) 872 try: 873 key = self._box.add(msg) 874 finally: 875 os.umask(orig_umask) 876 path = os.path.join(self._path, self._box._lookup(key)) 877 mode = os.stat(path).st_mode 878 self.assertFalse(mode & 0o111) 879 880 @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') 881 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') 882 def test_folder_file_perms(self): 883 # From bug #3228, we want to verify that the file created inside a Maildir 884 # subfolder isn't marked as executable. 885 orig_umask = os.umask(0) 886 try: 887 subfolder = self._box.add_folder('subfolder') 888 finally: 889 os.umask(orig_umask) 890 891 path = os.path.join(subfolder._path, 'maildirfolder') 892 st = os.stat(path) 893 perms = st.st_mode 894 self.assertFalse((perms & 0o111)) # Execute bits should all be off. 895 896 def test_reread(self): 897 # Do an initial unconditional refresh 898 self._box._refresh() 899 900 # Put the last modified times more than two seconds into the past 901 # (because mtime may have a two second granularity) 902 for subdir in ('cur', 'new'): 903 os.utime(os.path.join(self._box._path, subdir), 904 (time.time()-5,)*2) 905 906 # Because mtime has a two second granularity in worst case (FAT), a 907 # refresh is done unconditionally if called for within 908 # two-second-plus-a-bit of the last one, just in case the mbox has 909 # changed; so now we have to wait for that interval to expire. 910 # 911 # Because this is a test, emulate sleeping. Instead of 912 # sleeping for 2 seconds, use the skew factor to make _refresh 913 # think that 2 seconds have passed and re-reading the _toc is 914 # only required if mtimes differ. 915 self._box._skewfactor = -3 916 917 # Re-reading causes the ._toc attribute to be assigned a new dictionary 918 # object, so we'll check that the ._toc attribute isn't a different 919 # object. 920 orig_toc = self._box._toc 921 def refreshed(): 922 return self._box._toc is not orig_toc 923 924 self._box._refresh() 925 self.assertFalse(refreshed()) 926 927 # Now, write something into cur and remove it. This changes 928 # the mtime and should cause a re-read. Note that "sleep 929 # emulation" is still in effect, as skewfactor is -3. 930 filename = os.path.join(self._path, 'cur', 'stray-file') 931 support.create_empty_file(filename) 932 os.unlink(filename) 933 self._box._refresh() 934 self.assertTrue(refreshed()) 935 936 937class _TestSingleFile(TestMailbox): 938 '''Common tests for single-file mailboxes''' 939 940 def test_add_doesnt_rewrite(self): 941 # When only adding messages, flush() should not rewrite the 942 # mailbox file. See issue #9559. 943 944 # Inode number changes if the contents are written to another 945 # file which is then renamed over the original file. So we 946 # must check that the inode number doesn't change. 947 inode_before = os.stat(self._path).st_ino 948 949 self._box.add(self._template % 0) 950 self._box.flush() 951 952 inode_after = os.stat(self._path).st_ino 953 self.assertEqual(inode_before, inode_after) 954 955 # Make sure the message was really added 956 self._box.close() 957 self._box = self._factory(self._path) 958 self.assertEqual(len(self._box), 1) 959 960 def test_permissions_after_flush(self): 961 # See issue #5346 962 963 # Make the mailbox world writable. It's unlikely that the new 964 # mailbox file would have these permissions after flush(), 965 # because umask usually prevents it. 966 mode = os.stat(self._path).st_mode | 0o666 967 os.chmod(self._path, mode) 968 969 self._box.add(self._template % 0) 970 i = self._box.add(self._template % 1) 971 # Need to remove one message to make flush() create a new file 972 self._box.remove(i) 973 self._box.flush() 974 975 self.assertEqual(os.stat(self._path).st_mode, mode) 976 977 978class _TestMboxMMDF(_TestSingleFile): 979 980 def tearDown(self): 981 super().tearDown() 982 self._box.close() 983 self._delete_recursively(self._path) 984 for lock_remnant in glob.glob(self._path + '.*'): 985 support.unlink(lock_remnant) 986 987 def assertMailboxEmpty(self): 988 with open(self._path) as f: 989 self.assertEqual(f.readlines(), []) 990 991 def test_add_from_string(self): 992 # Add a string starting with 'From ' to the mailbox 993 key = self._box.add('From foo@bar blah\nFrom: foo\n\n0\n') 994 self.assertEqual(self._box[key].get_from(), 'foo@bar blah') 995 self.assertEqual(self._box[key].get_payload(), '0\n') 996 997 def test_add_from_bytes(self): 998 # Add a byte string starting with 'From ' to the mailbox 999 key = self._box.add(b'From foo@bar blah\nFrom: foo\n\n0\n') 1000 self.assertEqual(self._box[key].get_from(), 'foo@bar blah') 1001 self.assertEqual(self._box[key].get_payload(), '0\n') 1002 1003 def test_add_mbox_or_mmdf_message(self): 1004 # Add an mboxMessage or MMDFMessage 1005 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1006 msg = class_('From foo@bar blah\nFrom: foo\n\n0\n') 1007 key = self._box.add(msg) 1008 1009 def test_open_close_open(self): 1010 # Open and inspect previously-created mailbox 1011 values = [self._template % i for i in range(3)] 1012 for value in values: 1013 self._box.add(value) 1014 self._box.close() 1015 mtime = os.path.getmtime(self._path) 1016 self._box = self._factory(self._path) 1017 self.assertEqual(len(self._box), 3) 1018 for key in self._box.iterkeys(): 1019 self.assertIn(self._box.get_string(key), values) 1020 self._box.close() 1021 self.assertEqual(mtime, os.path.getmtime(self._path)) 1022 1023 def test_add_and_close(self): 1024 # Verifying that closing a mailbox doesn't change added items 1025 self._box.add(_sample_message) 1026 for i in range(3): 1027 self._box.add(self._template % i) 1028 self._box.add(_sample_message) 1029 self._box._file.flush() 1030 self._box._file.seek(0) 1031 contents = self._box._file.read() 1032 self._box.close() 1033 with open(self._path, 'rb') as f: 1034 self.assertEqual(contents, f.read()) 1035 self._box = self._factory(self._path) 1036 1037 @unittest.skipUnless(hasattr(os, 'fork'), "Test needs fork().") 1038 @unittest.skipUnless(hasattr(socket, 'socketpair'), "Test needs socketpair().") 1039 def test_lock_conflict(self): 1040 # Fork off a child process that will lock the mailbox temporarily, 1041 # unlock it and exit. 1042 c, p = socket.socketpair() 1043 self.addCleanup(c.close) 1044 self.addCleanup(p.close) 1045 1046 pid = os.fork() 1047 if pid == 0: 1048 # child 1049 try: 1050 # lock the mailbox, and signal the parent it can proceed 1051 self._box.lock() 1052 c.send(b'c') 1053 1054 # wait until the parent is done, and unlock the mailbox 1055 c.recv(1) 1056 self._box.unlock() 1057 finally: 1058 os._exit(0) 1059 1060 # In the parent, wait until the child signals it locked the mailbox. 1061 p.recv(1) 1062 try: 1063 self.assertRaises(mailbox.ExternalClashError, 1064 self._box.lock) 1065 finally: 1066 # Signal the child it can now release the lock and exit. 1067 p.send(b'p') 1068 # Wait for child to exit. Locking should now succeed. 1069 exited_pid, status = os.waitpid(pid, 0) 1070 1071 self._box.lock() 1072 self._box.unlock() 1073 1074 def test_relock(self): 1075 # Test case for bug #1575506: the mailbox class was locking the 1076 # wrong file object in its flush() method. 1077 msg = "Subject: sub\n\nbody\n" 1078 key1 = self._box.add(msg) 1079 self._box.flush() 1080 self._box.close() 1081 1082 self._box = self._factory(self._path) 1083 self._box.lock() 1084 key2 = self._box.add(msg) 1085 self._box.flush() 1086 self.assertTrue(self._box._locked) 1087 self._box.close() 1088 1089 1090class TestMbox(_TestMboxMMDF, unittest.TestCase): 1091 1092 _factory = lambda self, path, factory=None: mailbox.mbox(path, factory) 1093 1094 @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') 1095 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') 1096 def test_file_perms(self): 1097 # From bug #3228, we want to verify that the mailbox file isn't executable, 1098 # even if the umask is set to something that would leave executable bits set. 1099 # We only run this test on platforms that support umask. 1100 try: 1101 old_umask = os.umask(0o077) 1102 self._box.close() 1103 os.unlink(self._path) 1104 self._box = mailbox.mbox(self._path, create=True) 1105 self._box.add('') 1106 self._box.close() 1107 finally: 1108 os.umask(old_umask) 1109 1110 st = os.stat(self._path) 1111 perms = st.st_mode 1112 self.assertFalse((perms & 0o111)) # Execute bits should all be off. 1113 1114 def test_terminating_newline(self): 1115 message = email.message.Message() 1116 message['From'] = 'john@example.com' 1117 message.set_payload('No newline at the end') 1118 i = self._box.add(message) 1119 1120 # A newline should have been appended to the payload 1121 message = self._box.get(i) 1122 self.assertEqual(message.get_payload(), 'No newline at the end\n') 1123 1124 def test_message_separator(self): 1125 # Check there's always a single blank line after each message 1126 self._box.add('From: foo\n\n0') # No newline at the end 1127 with open(self._path) as f: 1128 data = f.read() 1129 self.assertEqual(data[-3:], '0\n\n') 1130 1131 self._box.add('From: foo\n\n0\n') # Newline at the end 1132 with open(self._path) as f: 1133 data = f.read() 1134 self.assertEqual(data[-3:], '0\n\n') 1135 1136 1137class TestMMDF(_TestMboxMMDF, unittest.TestCase): 1138 1139 _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory) 1140 1141 1142class TestMH(TestMailbox, unittest.TestCase): 1143 1144 _factory = lambda self, path, factory=None: mailbox.MH(path, factory) 1145 1146 def assertMailboxEmpty(self): 1147 self.assertEqual(os.listdir(self._path), ['.mh_sequences']) 1148 1149 def test_list_folders(self): 1150 # List folders 1151 self._box.add_folder('one') 1152 self._box.add_folder('two') 1153 self._box.add_folder('three') 1154 self.assertEqual(len(self._box.list_folders()), 3) 1155 self.assertEqual(set(self._box.list_folders()), 1156 set(('one', 'two', 'three'))) 1157 1158 def test_get_folder(self): 1159 # Open folders 1160 def dummy_factory (s): 1161 return None 1162 self._box = self._factory(self._path, dummy_factory) 1163 1164 new_folder = self._box.add_folder('foo.bar') 1165 folder0 = self._box.get_folder('foo.bar') 1166 folder0.add(self._template % 'bar') 1167 self.assertTrue(os.path.isdir(os.path.join(self._path, 'foo.bar'))) 1168 folder1 = self._box.get_folder('foo.bar') 1169 self.assertEqual(folder1.get_string(folder1.keys()[0]), 1170 self._template % 'bar') 1171 1172 # Test for bug #1569790: verify that folders returned by .get_folder() 1173 # use the same factory function. 1174 self.assertIs(new_folder._factory, self._box._factory) 1175 self.assertIs(folder0._factory, self._box._factory) 1176 1177 def test_add_and_remove_folders(self): 1178 # Delete folders 1179 self._box.add_folder('one') 1180 self._box.add_folder('two') 1181 self.assertEqual(len(self._box.list_folders()), 2) 1182 self.assertEqual(set(self._box.list_folders()), set(('one', 'two'))) 1183 self._box.remove_folder('one') 1184 self.assertEqual(len(self._box.list_folders()), 1) 1185 self.assertEqual(set(self._box.list_folders()), set(('two',))) 1186 self._box.add_folder('three') 1187 self.assertEqual(len(self._box.list_folders()), 2) 1188 self.assertEqual(set(self._box.list_folders()), set(('two', 'three'))) 1189 self._box.remove_folder('three') 1190 self.assertEqual(len(self._box.list_folders()), 1) 1191 self.assertEqual(set(self._box.list_folders()), set(('two',))) 1192 self._box.remove_folder('two') 1193 self.assertEqual(len(self._box.list_folders()), 0) 1194 self.assertEqual(self._box.list_folders(), []) 1195 1196 def test_sequences(self): 1197 # Get and set sequences 1198 self.assertEqual(self._box.get_sequences(), {}) 1199 msg0 = mailbox.MHMessage(self._template % 0) 1200 msg0.add_sequence('foo') 1201 key0 = self._box.add(msg0) 1202 self.assertEqual(self._box.get_sequences(), {'foo':[key0]}) 1203 msg1 = mailbox.MHMessage(self._template % 1) 1204 msg1.set_sequences(['bar', 'replied', 'foo']) 1205 key1 = self._box.add(msg1) 1206 self.assertEqual(self._box.get_sequences(), 1207 {'foo':[key0, key1], 'bar':[key1], 'replied':[key1]}) 1208 msg0.set_sequences(['flagged']) 1209 self._box[key0] = msg0 1210 self.assertEqual(self._box.get_sequences(), 1211 {'foo':[key1], 'bar':[key1], 'replied':[key1], 1212 'flagged':[key0]}) 1213 self._box.remove(key1) 1214 self.assertEqual(self._box.get_sequences(), {'flagged':[key0]}) 1215 1216 def test_issue2625(self): 1217 msg0 = mailbox.MHMessage(self._template % 0) 1218 msg0.add_sequence('foo') 1219 key0 = self._box.add(msg0) 1220 refmsg0 = self._box.get_message(key0) 1221 1222 def test_issue7627(self): 1223 msg0 = mailbox.MHMessage(self._template % 0) 1224 key0 = self._box.add(msg0) 1225 self._box.lock() 1226 self._box.remove(key0) 1227 self._box.unlock() 1228 1229 def test_pack(self): 1230 # Pack the contents of the mailbox 1231 msg0 = mailbox.MHMessage(self._template % 0) 1232 msg1 = mailbox.MHMessage(self._template % 1) 1233 msg2 = mailbox.MHMessage(self._template % 2) 1234 msg3 = mailbox.MHMessage(self._template % 3) 1235 msg0.set_sequences(['foo', 'unseen']) 1236 msg1.set_sequences(['foo']) 1237 msg2.set_sequences(['foo', 'flagged']) 1238 msg3.set_sequences(['foo', 'bar', 'replied']) 1239 key0 = self._box.add(msg0) 1240 key1 = self._box.add(msg1) 1241 key2 = self._box.add(msg2) 1242 key3 = self._box.add(msg3) 1243 self.assertEqual(self._box.get_sequences(), 1244 {'foo':[key0,key1,key2,key3], 'unseen':[key0], 1245 'flagged':[key2], 'bar':[key3], 'replied':[key3]}) 1246 self._box.remove(key2) 1247 self.assertEqual(self._box.get_sequences(), 1248 {'foo':[key0,key1,key3], 'unseen':[key0], 'bar':[key3], 1249 'replied':[key3]}) 1250 self._box.pack() 1251 self.assertEqual(self._box.keys(), [1, 2, 3]) 1252 key0 = key0 1253 key1 = key0 + 1 1254 key2 = key1 + 1 1255 self.assertEqual(self._box.get_sequences(), 1256 {'foo':[1, 2, 3], 'unseen':[1], 'bar':[3], 'replied':[3]}) 1257 1258 # Test case for packing while holding the mailbox locked. 1259 key0 = self._box.add(msg1) 1260 key1 = self._box.add(msg1) 1261 key2 = self._box.add(msg1) 1262 key3 = self._box.add(msg1) 1263 1264 self._box.remove(key0) 1265 self._box.remove(key2) 1266 self._box.lock() 1267 self._box.pack() 1268 self._box.unlock() 1269 self.assertEqual(self._box.get_sequences(), 1270 {'foo':[1, 2, 3, 4, 5], 1271 'unseen':[1], 'bar':[3], 'replied':[3]}) 1272 1273 def _get_lock_path(self): 1274 return os.path.join(self._path, '.mh_sequences.lock') 1275 1276 1277class TestBabyl(_TestSingleFile, unittest.TestCase): 1278 1279 _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory) 1280 1281 def assertMailboxEmpty(self): 1282 with open(self._path) as f: 1283 self.assertEqual(f.readlines(), []) 1284 1285 def tearDown(self): 1286 super().tearDown() 1287 self._box.close() 1288 self._delete_recursively(self._path) 1289 for lock_remnant in glob.glob(self._path + '.*'): 1290 support.unlink(lock_remnant) 1291 1292 def test_labels(self): 1293 # Get labels from the mailbox 1294 self.assertEqual(self._box.get_labels(), []) 1295 msg0 = mailbox.BabylMessage(self._template % 0) 1296 msg0.add_label('foo') 1297 key0 = self._box.add(msg0) 1298 self.assertEqual(self._box.get_labels(), ['foo']) 1299 msg1 = mailbox.BabylMessage(self._template % 1) 1300 msg1.set_labels(['bar', 'answered', 'foo']) 1301 key1 = self._box.add(msg1) 1302 self.assertEqual(set(self._box.get_labels()), set(['foo', 'bar'])) 1303 msg0.set_labels(['blah', 'filed']) 1304 self._box[key0] = msg0 1305 self.assertEqual(set(self._box.get_labels()), 1306 set(['foo', 'bar', 'blah'])) 1307 self._box.remove(key1) 1308 self.assertEqual(set(self._box.get_labels()), set(['blah'])) 1309 1310 1311class FakeFileLikeObject: 1312 1313 def __init__(self): 1314 self.closed = False 1315 1316 def close(self): 1317 self.closed = True 1318 1319 1320class FakeMailBox(mailbox.Mailbox): 1321 1322 def __init__(self): 1323 mailbox.Mailbox.__init__(self, '', lambda file: None) 1324 self.files = [FakeFileLikeObject() for i in range(10)] 1325 1326 def get_file(self, key): 1327 return self.files[key] 1328 1329 1330class TestFakeMailBox(unittest.TestCase): 1331 1332 def test_closing_fd(self): 1333 box = FakeMailBox() 1334 for i in range(10): 1335 self.assertFalse(box.files[i].closed) 1336 for i in range(10): 1337 box[i] 1338 for i in range(10): 1339 self.assertTrue(box.files[i].closed) 1340 1341 1342class TestMessage(TestBase, unittest.TestCase): 1343 1344 _factory = mailbox.Message # Overridden by subclasses to reuse tests 1345 1346 def setUp(self): 1347 self._path = support.TESTFN 1348 1349 def tearDown(self): 1350 self._delete_recursively(self._path) 1351 1352 def test_initialize_with_eMM(self): 1353 # Initialize based on email.message.Message instance 1354 eMM = email.message_from_string(_sample_message) 1355 msg = self._factory(eMM) 1356 self._post_initialize_hook(msg) 1357 self._check_sample(msg) 1358 1359 def test_initialize_with_string(self): 1360 # Initialize based on string 1361 msg = self._factory(_sample_message) 1362 self._post_initialize_hook(msg) 1363 self._check_sample(msg) 1364 1365 def test_initialize_with_file(self): 1366 # Initialize based on contents of file 1367 with open(self._path, 'w+') as f: 1368 f.write(_sample_message) 1369 f.seek(0) 1370 msg = self._factory(f) 1371 self._post_initialize_hook(msg) 1372 self._check_sample(msg) 1373 1374 def test_initialize_with_binary_file(self): 1375 # Initialize based on contents of binary file 1376 with open(self._path, 'wb+') as f: 1377 f.write(_bytes_sample_message) 1378 f.seek(0) 1379 msg = self._factory(f) 1380 self._post_initialize_hook(msg) 1381 self._check_sample(msg) 1382 1383 def test_initialize_with_nothing(self): 1384 # Initialize without arguments 1385 msg = self._factory() 1386 self._post_initialize_hook(msg) 1387 self.assertIsInstance(msg, email.message.Message) 1388 self.assertIsInstance(msg, mailbox.Message) 1389 self.assertIsInstance(msg, self._factory) 1390 self.assertEqual(msg.keys(), []) 1391 self.assertFalse(msg.is_multipart()) 1392 self.assertIsNone(msg.get_payload()) 1393 1394 def test_initialize_incorrectly(self): 1395 # Initialize with invalid argument 1396 self.assertRaises(TypeError, lambda: self._factory(object())) 1397 1398 def test_all_eMM_attribues_exist(self): 1399 # Issue 12537 1400 eMM = email.message_from_string(_sample_message) 1401 msg = self._factory(_sample_message) 1402 for attr in eMM.__dict__: 1403 self.assertIn(attr, msg.__dict__, 1404 '{} attribute does not exist'.format(attr)) 1405 1406 def test_become_message(self): 1407 # Take on the state of another message 1408 eMM = email.message_from_string(_sample_message) 1409 msg = self._factory() 1410 msg._become_message(eMM) 1411 self._check_sample(msg) 1412 1413 def test_explain_to(self): 1414 # Copy self's format-specific data to other message formats. 1415 # This test is superficial; better ones are in TestMessageConversion. 1416 msg = self._factory() 1417 for class_ in self.all_mailbox_types: 1418 other_msg = class_() 1419 msg._explain_to(other_msg) 1420 other_msg = email.message.Message() 1421 self.assertRaises(TypeError, lambda: msg._explain_to(other_msg)) 1422 1423 def _post_initialize_hook(self, msg): 1424 # Overridden by subclasses to check extra things after initialization 1425 pass 1426 1427 1428class TestMaildirMessage(TestMessage, unittest.TestCase): 1429 1430 _factory = mailbox.MaildirMessage 1431 1432 def _post_initialize_hook(self, msg): 1433 self.assertEqual(msg._subdir, 'new') 1434 self.assertEqual(msg._info, '') 1435 1436 def test_subdir(self): 1437 # Use get_subdir() and set_subdir() 1438 msg = mailbox.MaildirMessage(_sample_message) 1439 self.assertEqual(msg.get_subdir(), 'new') 1440 msg.set_subdir('cur') 1441 self.assertEqual(msg.get_subdir(), 'cur') 1442 msg.set_subdir('new') 1443 self.assertEqual(msg.get_subdir(), 'new') 1444 self.assertRaises(ValueError, lambda: msg.set_subdir('tmp')) 1445 self.assertEqual(msg.get_subdir(), 'new') 1446 msg.set_subdir('new') 1447 self.assertEqual(msg.get_subdir(), 'new') 1448 self._check_sample(msg) 1449 1450 def test_flags(self): 1451 # Use get_flags(), set_flags(), add_flag(), remove_flag() 1452 msg = mailbox.MaildirMessage(_sample_message) 1453 self.assertEqual(msg.get_flags(), '') 1454 self.assertEqual(msg.get_subdir(), 'new') 1455 msg.set_flags('F') 1456 self.assertEqual(msg.get_subdir(), 'new') 1457 self.assertEqual(msg.get_flags(), 'F') 1458 msg.set_flags('SDTP') 1459 self.assertEqual(msg.get_flags(), 'DPST') 1460 msg.add_flag('FT') 1461 self.assertEqual(msg.get_flags(), 'DFPST') 1462 msg.remove_flag('TDRP') 1463 self.assertEqual(msg.get_flags(), 'FS') 1464 self.assertEqual(msg.get_subdir(), 'new') 1465 self._check_sample(msg) 1466 1467 def test_date(self): 1468 # Use get_date() and set_date() 1469 msg = mailbox.MaildirMessage(_sample_message) 1470 self.assertLess(abs(msg.get_date() - time.time()), 60) 1471 msg.set_date(0.0) 1472 self.assertEqual(msg.get_date(), 0.0) 1473 1474 def test_info(self): 1475 # Use get_info() and set_info() 1476 msg = mailbox.MaildirMessage(_sample_message) 1477 self.assertEqual(msg.get_info(), '') 1478 msg.set_info('1,foo=bar') 1479 self.assertEqual(msg.get_info(), '1,foo=bar') 1480 self.assertRaises(TypeError, lambda: msg.set_info(None)) 1481 self._check_sample(msg) 1482 1483 def test_info_and_flags(self): 1484 # Test interaction of info and flag methods 1485 msg = mailbox.MaildirMessage(_sample_message) 1486 self.assertEqual(msg.get_info(), '') 1487 msg.set_flags('SF') 1488 self.assertEqual(msg.get_flags(), 'FS') 1489 self.assertEqual(msg.get_info(), '2,FS') 1490 msg.set_info('1,') 1491 self.assertEqual(msg.get_flags(), '') 1492 self.assertEqual(msg.get_info(), '1,') 1493 msg.remove_flag('RPT') 1494 self.assertEqual(msg.get_flags(), '') 1495 self.assertEqual(msg.get_info(), '1,') 1496 msg.add_flag('D') 1497 self.assertEqual(msg.get_flags(), 'D') 1498 self.assertEqual(msg.get_info(), '2,D') 1499 self._check_sample(msg) 1500 1501 1502class _TestMboxMMDFMessage: 1503 1504 _factory = mailbox._mboxMMDFMessage 1505 1506 def _post_initialize_hook(self, msg): 1507 self._check_from(msg) 1508 1509 def test_initialize_with_unixfrom(self): 1510 # Initialize with a message that already has a _unixfrom attribute 1511 msg = mailbox.Message(_sample_message) 1512 msg.set_unixfrom('From foo@bar blah') 1513 msg = mailbox.mboxMessage(msg) 1514 self.assertEqual(msg.get_from(), 'foo@bar blah', msg.get_from()) 1515 1516 def test_from(self): 1517 # Get and set "From " line 1518 msg = mailbox.mboxMessage(_sample_message) 1519 self._check_from(msg) 1520 msg.set_from('foo bar') 1521 self.assertEqual(msg.get_from(), 'foo bar') 1522 msg.set_from('foo@bar', True) 1523 self._check_from(msg, 'foo@bar') 1524 msg.set_from('blah@temp', time.localtime()) 1525 self._check_from(msg, 'blah@temp') 1526 1527 def test_flags(self): 1528 # Use get_flags(), set_flags(), add_flag(), remove_flag() 1529 msg = mailbox.mboxMessage(_sample_message) 1530 self.assertEqual(msg.get_flags(), '') 1531 msg.set_flags('F') 1532 self.assertEqual(msg.get_flags(), 'F') 1533 msg.set_flags('XODR') 1534 self.assertEqual(msg.get_flags(), 'RODX') 1535 msg.add_flag('FA') 1536 self.assertEqual(msg.get_flags(), 'RODFAX') 1537 msg.remove_flag('FDXA') 1538 self.assertEqual(msg.get_flags(), 'RO') 1539 self._check_sample(msg) 1540 1541 def _check_from(self, msg, sender=None): 1542 # Check contents of "From " line 1543 if sender is None: 1544 sender = "MAILER-DAEMON" 1545 self.assertIsNotNone(re.match( 1546 sender + r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:\d{2} \d{4}", 1547 msg.get_from())) 1548 1549 1550class TestMboxMessage(_TestMboxMMDFMessage, TestMessage): 1551 1552 _factory = mailbox.mboxMessage 1553 1554 1555class TestMHMessage(TestMessage, unittest.TestCase): 1556 1557 _factory = mailbox.MHMessage 1558 1559 def _post_initialize_hook(self, msg): 1560 self.assertEqual(msg._sequences, []) 1561 1562 def test_sequences(self): 1563 # Get, set, join, and leave sequences 1564 msg = mailbox.MHMessage(_sample_message) 1565 self.assertEqual(msg.get_sequences(), []) 1566 msg.set_sequences(['foobar']) 1567 self.assertEqual(msg.get_sequences(), ['foobar']) 1568 msg.set_sequences([]) 1569 self.assertEqual(msg.get_sequences(), []) 1570 msg.add_sequence('unseen') 1571 self.assertEqual(msg.get_sequences(), ['unseen']) 1572 msg.add_sequence('flagged') 1573 self.assertEqual(msg.get_sequences(), ['unseen', 'flagged']) 1574 msg.add_sequence('flagged') 1575 self.assertEqual(msg.get_sequences(), ['unseen', 'flagged']) 1576 msg.remove_sequence('unseen') 1577 self.assertEqual(msg.get_sequences(), ['flagged']) 1578 msg.add_sequence('foobar') 1579 self.assertEqual(msg.get_sequences(), ['flagged', 'foobar']) 1580 msg.remove_sequence('replied') 1581 self.assertEqual(msg.get_sequences(), ['flagged', 'foobar']) 1582 msg.set_sequences(['foobar', 'replied']) 1583 self.assertEqual(msg.get_sequences(), ['foobar', 'replied']) 1584 1585 1586class TestBabylMessage(TestMessage, unittest.TestCase): 1587 1588 _factory = mailbox.BabylMessage 1589 1590 def _post_initialize_hook(self, msg): 1591 self.assertEqual(msg._labels, []) 1592 1593 def test_labels(self): 1594 # Get, set, join, and leave labels 1595 msg = mailbox.BabylMessage(_sample_message) 1596 self.assertEqual(msg.get_labels(), []) 1597 msg.set_labels(['foobar']) 1598 self.assertEqual(msg.get_labels(), ['foobar']) 1599 msg.set_labels([]) 1600 self.assertEqual(msg.get_labels(), []) 1601 msg.add_label('filed') 1602 self.assertEqual(msg.get_labels(), ['filed']) 1603 msg.add_label('resent') 1604 self.assertEqual(msg.get_labels(), ['filed', 'resent']) 1605 msg.add_label('resent') 1606 self.assertEqual(msg.get_labels(), ['filed', 'resent']) 1607 msg.remove_label('filed') 1608 self.assertEqual(msg.get_labels(), ['resent']) 1609 msg.add_label('foobar') 1610 self.assertEqual(msg.get_labels(), ['resent', 'foobar']) 1611 msg.remove_label('unseen') 1612 self.assertEqual(msg.get_labels(), ['resent', 'foobar']) 1613 msg.set_labels(['foobar', 'answered']) 1614 self.assertEqual(msg.get_labels(), ['foobar', 'answered']) 1615 1616 def test_visible(self): 1617 # Get, set, and update visible headers 1618 msg = mailbox.BabylMessage(_sample_message) 1619 visible = msg.get_visible() 1620 self.assertEqual(visible.keys(), []) 1621 self.assertIsNone(visible.get_payload()) 1622 visible['User-Agent'] = 'FooBar 1.0' 1623 visible['X-Whatever'] = 'Blah' 1624 self.assertEqual(msg.get_visible().keys(), []) 1625 msg.set_visible(visible) 1626 visible = msg.get_visible() 1627 self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever']) 1628 self.assertEqual(visible['User-Agent'], 'FooBar 1.0') 1629 self.assertEqual(visible['X-Whatever'], 'Blah') 1630 self.assertIsNone(visible.get_payload()) 1631 msg.update_visible() 1632 self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever']) 1633 self.assertIsNone(visible.get_payload()) 1634 visible = msg.get_visible() 1635 self.assertEqual(visible.keys(), ['User-Agent', 'Date', 'From', 'To', 1636 'Subject']) 1637 for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'): 1638 self.assertEqual(visible[header], msg[header]) 1639 1640 1641class TestMMDFMessage(_TestMboxMMDFMessage, TestMessage): 1642 1643 _factory = mailbox.MMDFMessage 1644 1645 1646class TestMessageConversion(TestBase, unittest.TestCase): 1647 1648 def test_plain_to_x(self): 1649 # Convert Message to all formats 1650 for class_ in self.all_mailbox_types: 1651 msg_plain = mailbox.Message(_sample_message) 1652 msg = class_(msg_plain) 1653 self._check_sample(msg) 1654 1655 def test_x_to_plain(self): 1656 # Convert all formats to Message 1657 for class_ in self.all_mailbox_types: 1658 msg = class_(_sample_message) 1659 msg_plain = mailbox.Message(msg) 1660 self._check_sample(msg_plain) 1661 1662 def test_x_from_bytes(self): 1663 # Convert all formats to Message 1664 for class_ in self.all_mailbox_types: 1665 msg = class_(_bytes_sample_message) 1666 self._check_sample(msg) 1667 1668 def test_x_to_invalid(self): 1669 # Convert all formats to an invalid format 1670 for class_ in self.all_mailbox_types: 1671 self.assertRaises(TypeError, lambda: class_(False)) 1672 1673 def test_type_specific_attributes_removed_on_conversion(self): 1674 reference = {class_: class_(_sample_message).__dict__ 1675 for class_ in self.all_mailbox_types} 1676 for class1 in self.all_mailbox_types: 1677 for class2 in self.all_mailbox_types: 1678 if class1 is class2: 1679 continue 1680 source = class1(_sample_message) 1681 target = class2(source) 1682 type_specific = [a for a in reference[class1] 1683 if a not in reference[class2]] 1684 for attr in type_specific: 1685 self.assertNotIn(attr, target.__dict__, 1686 "while converting {} to {}".format(class1, class2)) 1687 1688 def test_maildir_to_maildir(self): 1689 # Convert MaildirMessage to MaildirMessage 1690 msg_maildir = mailbox.MaildirMessage(_sample_message) 1691 msg_maildir.set_flags('DFPRST') 1692 msg_maildir.set_subdir('cur') 1693 date = msg_maildir.get_date() 1694 msg = mailbox.MaildirMessage(msg_maildir) 1695 self._check_sample(msg) 1696 self.assertEqual(msg.get_flags(), 'DFPRST') 1697 self.assertEqual(msg.get_subdir(), 'cur') 1698 self.assertEqual(msg.get_date(), date) 1699 1700 def test_maildir_to_mboxmmdf(self): 1701 # Convert MaildirMessage to mboxmessage and MMDFMessage 1702 pairs = (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'), 1703 ('T', 'D'), ('DFPRST', 'RDFA')) 1704 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1705 msg_maildir = mailbox.MaildirMessage(_sample_message) 1706 msg_maildir.set_date(0.0) 1707 for setting, result in pairs: 1708 msg_maildir.set_flags(setting) 1709 msg = class_(msg_maildir) 1710 self.assertEqual(msg.get_flags(), result) 1711 self.assertEqual(msg.get_from(), 'MAILER-DAEMON %s' % 1712 time.asctime(time.gmtime(0.0))) 1713 msg_maildir.set_subdir('cur') 1714 self.assertEqual(class_(msg_maildir).get_flags(), 'RODFA') 1715 1716 def test_maildir_to_mh(self): 1717 # Convert MaildirMessage to MHMessage 1718 msg_maildir = mailbox.MaildirMessage(_sample_message) 1719 pairs = (('D', ['unseen']), ('F', ['unseen', 'flagged']), 1720 ('P', ['unseen']), ('R', ['unseen', 'replied']), ('S', []), 1721 ('T', ['unseen']), ('DFPRST', ['replied', 'flagged'])) 1722 for setting, result in pairs: 1723 msg_maildir.set_flags(setting) 1724 self.assertEqual(mailbox.MHMessage(msg_maildir).get_sequences(), 1725 result) 1726 1727 def test_maildir_to_babyl(self): 1728 # Convert MaildirMessage to Babyl 1729 msg_maildir = mailbox.MaildirMessage(_sample_message) 1730 pairs = (('D', ['unseen']), ('F', ['unseen']), 1731 ('P', ['unseen', 'forwarded']), ('R', ['unseen', 'answered']), 1732 ('S', []), ('T', ['unseen', 'deleted']), 1733 ('DFPRST', ['deleted', 'answered', 'forwarded'])) 1734 for setting, result in pairs: 1735 msg_maildir.set_flags(setting) 1736 self.assertEqual(mailbox.BabylMessage(msg_maildir).get_labels(), 1737 result) 1738 1739 def test_mboxmmdf_to_maildir(self): 1740 # Convert mboxMessage and MMDFMessage to MaildirMessage 1741 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1742 msg_mboxMMDF = class_(_sample_message) 1743 msg_mboxMMDF.set_from('foo@bar', time.gmtime(0.0)) 1744 pairs = (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'), ('A', 'R'), 1745 ('RODFA', 'FRST')) 1746 for setting, result in pairs: 1747 msg_mboxMMDF.set_flags(setting) 1748 msg = mailbox.MaildirMessage(msg_mboxMMDF) 1749 self.assertEqual(msg.get_flags(), result) 1750 self.assertEqual(msg.get_date(), 0.0) 1751 msg_mboxMMDF.set_flags('O') 1752 self.assertEqual(mailbox.MaildirMessage(msg_mboxMMDF).get_subdir(), 1753 'cur') 1754 1755 def test_mboxmmdf_to_mboxmmdf(self): 1756 # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage 1757 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1758 msg_mboxMMDF = class_(_sample_message) 1759 msg_mboxMMDF.set_flags('RODFA') 1760 msg_mboxMMDF.set_from('foo@bar') 1761 for class2_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1762 msg2 = class2_(msg_mboxMMDF) 1763 self.assertEqual(msg2.get_flags(), 'RODFA') 1764 self.assertEqual(msg2.get_from(), 'foo@bar') 1765 1766 def test_mboxmmdf_to_mh(self): 1767 # Convert mboxMessage and MMDFMessage to MHMessage 1768 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1769 msg_mboxMMDF = class_(_sample_message) 1770 pairs = (('R', []), ('O', ['unseen']), ('D', ['unseen']), 1771 ('F', ['unseen', 'flagged']), 1772 ('A', ['unseen', 'replied']), 1773 ('RODFA', ['replied', 'flagged'])) 1774 for setting, result in pairs: 1775 msg_mboxMMDF.set_flags(setting) 1776 self.assertEqual(mailbox.MHMessage(msg_mboxMMDF).get_sequences(), 1777 result) 1778 1779 def test_mboxmmdf_to_babyl(self): 1780 # Convert mboxMessage and MMDFMessage to BabylMessage 1781 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1782 msg = class_(_sample_message) 1783 pairs = (('R', []), ('O', ['unseen']), 1784 ('D', ['unseen', 'deleted']), ('F', ['unseen']), 1785 ('A', ['unseen', 'answered']), 1786 ('RODFA', ['deleted', 'answered'])) 1787 for setting, result in pairs: 1788 msg.set_flags(setting) 1789 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result) 1790 1791 def test_mh_to_maildir(self): 1792 # Convert MHMessage to MaildirMessage 1793 pairs = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS')) 1794 for setting, result in pairs: 1795 msg = mailbox.MHMessage(_sample_message) 1796 msg.add_sequence(setting) 1797 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result) 1798 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1799 msg = mailbox.MHMessage(_sample_message) 1800 msg.add_sequence('unseen') 1801 msg.add_sequence('replied') 1802 msg.add_sequence('flagged') 1803 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'FR') 1804 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1805 1806 def test_mh_to_mboxmmdf(self): 1807 # Convert MHMessage to mboxMessage and MMDFMessage 1808 pairs = (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF')) 1809 for setting, result in pairs: 1810 msg = mailbox.MHMessage(_sample_message) 1811 msg.add_sequence(setting) 1812 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1813 self.assertEqual(class_(msg).get_flags(), result) 1814 msg = mailbox.MHMessage(_sample_message) 1815 msg.add_sequence('unseen') 1816 msg.add_sequence('replied') 1817 msg.add_sequence('flagged') 1818 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1819 self.assertEqual(class_(msg).get_flags(), 'OFA') 1820 1821 def test_mh_to_mh(self): 1822 # Convert MHMessage to MHMessage 1823 msg = mailbox.MHMessage(_sample_message) 1824 msg.add_sequence('unseen') 1825 msg.add_sequence('replied') 1826 msg.add_sequence('flagged') 1827 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), 1828 ['unseen', 'replied', 'flagged']) 1829 1830 def test_mh_to_babyl(self): 1831 # Convert MHMessage to BabylMessage 1832 pairs = (('unseen', ['unseen']), ('replied', ['answered']), 1833 ('flagged', [])) 1834 for setting, result in pairs: 1835 msg = mailbox.MHMessage(_sample_message) 1836 msg.add_sequence(setting) 1837 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result) 1838 msg = mailbox.MHMessage(_sample_message) 1839 msg.add_sequence('unseen') 1840 msg.add_sequence('replied') 1841 msg.add_sequence('flagged') 1842 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), 1843 ['unseen', 'answered']) 1844 1845 def test_babyl_to_maildir(self): 1846 # Convert BabylMessage to MaildirMessage 1847 pairs = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'), 1848 ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'), 1849 ('resent', 'PS')) 1850 for setting, result in pairs: 1851 msg = mailbox.BabylMessage(_sample_message) 1852 msg.add_label(setting) 1853 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result) 1854 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1855 msg = mailbox.BabylMessage(_sample_message) 1856 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1857 'edited', 'resent'): 1858 msg.add_label(label) 1859 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'PRT') 1860 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1861 1862 def test_babyl_to_mboxmmdf(self): 1863 # Convert BabylMessage to mboxMessage and MMDFMessage 1864 pairs = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'), 1865 ('answered', 'ROA'), ('forwarded', 'RO'), ('edited', 'RO'), 1866 ('resent', 'RO')) 1867 for setting, result in pairs: 1868 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1869 msg = mailbox.BabylMessage(_sample_message) 1870 msg.add_label(setting) 1871 self.assertEqual(class_(msg).get_flags(), result) 1872 msg = mailbox.BabylMessage(_sample_message) 1873 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1874 'edited', 'resent'): 1875 msg.add_label(label) 1876 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1877 self.assertEqual(class_(msg).get_flags(), 'ODA') 1878 1879 def test_babyl_to_mh(self): 1880 # Convert BabylMessage to MHMessage 1881 pairs = (('unseen', ['unseen']), ('deleted', []), ('filed', []), 1882 ('answered', ['replied']), ('forwarded', []), ('edited', []), 1883 ('resent', [])) 1884 for setting, result in pairs: 1885 msg = mailbox.BabylMessage(_sample_message) 1886 msg.add_label(setting) 1887 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), result) 1888 msg = mailbox.BabylMessage(_sample_message) 1889 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1890 'edited', 'resent'): 1891 msg.add_label(label) 1892 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), 1893 ['unseen', 'replied']) 1894 1895 def test_babyl_to_babyl(self): 1896 # Convert BabylMessage to BabylMessage 1897 msg = mailbox.BabylMessage(_sample_message) 1898 msg.update_visible() 1899 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1900 'edited', 'resent'): 1901 msg.add_label(label) 1902 msg2 = mailbox.BabylMessage(msg) 1903 self.assertEqual(msg2.get_labels(), ['unseen', 'deleted', 'filed', 1904 'answered', 'forwarded', 'edited', 1905 'resent']) 1906 self.assertEqual(msg.get_visible().keys(), msg2.get_visible().keys()) 1907 for key in msg.get_visible().keys(): 1908 self.assertEqual(msg.get_visible()[key], msg2.get_visible()[key]) 1909 1910 1911class TestProxyFileBase(TestBase): 1912 1913 def _test_read(self, proxy): 1914 # Read by byte 1915 proxy.seek(0) 1916 self.assertEqual(proxy.read(), b'bar') 1917 proxy.seek(1) 1918 self.assertEqual(proxy.read(), b'ar') 1919 proxy.seek(0) 1920 self.assertEqual(proxy.read(2), b'ba') 1921 proxy.seek(1) 1922 self.assertEqual(proxy.read(-1), b'ar') 1923 proxy.seek(2) 1924 self.assertEqual(proxy.read(1000), b'r') 1925 1926 def _test_readline(self, proxy): 1927 # Read by line 1928 linesep = os.linesep.encode() 1929 proxy.seek(0) 1930 self.assertEqual(proxy.readline(), b'foo' + linesep) 1931 self.assertEqual(proxy.readline(), b'bar' + linesep) 1932 self.assertEqual(proxy.readline(), b'fred' + linesep) 1933 self.assertEqual(proxy.readline(), b'bob') 1934 proxy.seek(2) 1935 self.assertEqual(proxy.readline(), b'o' + linesep) 1936 proxy.seek(6 + 2 * len(os.linesep)) 1937 self.assertEqual(proxy.readline(), b'fred' + linesep) 1938 proxy.seek(6 + 2 * len(os.linesep)) 1939 self.assertEqual(proxy.readline(2), b'fr') 1940 self.assertEqual(proxy.readline(-10), b'ed' + linesep) 1941 1942 def _test_readlines(self, proxy): 1943 # Read multiple lines 1944 linesep = os.linesep.encode() 1945 proxy.seek(0) 1946 self.assertEqual(proxy.readlines(), [b'foo' + linesep, 1947 b'bar' + linesep, 1948 b'fred' + linesep, b'bob']) 1949 proxy.seek(0) 1950 self.assertEqual(proxy.readlines(2), [b'foo' + linesep]) 1951 proxy.seek(3 + len(linesep)) 1952 self.assertEqual(proxy.readlines(4 + len(linesep)), 1953 [b'bar' + linesep, b'fred' + linesep]) 1954 proxy.seek(3) 1955 self.assertEqual(proxy.readlines(1000), [linesep, b'bar' + linesep, 1956 b'fred' + linesep, b'bob']) 1957 1958 def _test_iteration(self, proxy): 1959 # Iterate by line 1960 linesep = os.linesep.encode() 1961 proxy.seek(0) 1962 iterator = iter(proxy) 1963 self.assertEqual(next(iterator), b'foo' + linesep) 1964 self.assertEqual(next(iterator), b'bar' + linesep) 1965 self.assertEqual(next(iterator), b'fred' + linesep) 1966 self.assertEqual(next(iterator), b'bob') 1967 self.assertRaises(StopIteration, next, iterator) 1968 1969 def _test_seek_and_tell(self, proxy): 1970 # Seek and use tell to check position 1971 linesep = os.linesep.encode() 1972 proxy.seek(3) 1973 self.assertEqual(proxy.tell(), 3) 1974 self.assertEqual(proxy.read(len(linesep)), linesep) 1975 proxy.seek(2, 1) 1976 self.assertEqual(proxy.read(1 + len(linesep)), b'r' + linesep) 1977 proxy.seek(-3 - len(linesep), 2) 1978 self.assertEqual(proxy.read(3), b'bar') 1979 proxy.seek(2, 0) 1980 self.assertEqual(proxy.read(), b'o' + linesep + b'bar' + linesep) 1981 proxy.seek(100) 1982 self.assertFalse(proxy.read()) 1983 1984 def _test_close(self, proxy): 1985 # Close a file 1986 self.assertFalse(proxy.closed) 1987 proxy.close() 1988 self.assertTrue(proxy.closed) 1989 # Issue 11700 subsequent closes should be a no-op. 1990 proxy.close() 1991 self.assertTrue(proxy.closed) 1992 1993 1994class TestProxyFile(TestProxyFileBase, unittest.TestCase): 1995 1996 def setUp(self): 1997 self._path = support.TESTFN 1998 self._file = open(self._path, 'wb+') 1999 2000 def tearDown(self): 2001 self._file.close() 2002 self._delete_recursively(self._path) 2003 2004 def test_initialize(self): 2005 # Initialize and check position 2006 self._file.write(b'foo') 2007 pos = self._file.tell() 2008 proxy0 = mailbox._ProxyFile(self._file) 2009 self.assertEqual(proxy0.tell(), pos) 2010 self.assertEqual(self._file.tell(), pos) 2011 proxy1 = mailbox._ProxyFile(self._file, 0) 2012 self.assertEqual(proxy1.tell(), 0) 2013 self.assertEqual(self._file.tell(), pos) 2014 2015 def test_read(self): 2016 self._file.write(b'bar') 2017 self._test_read(mailbox._ProxyFile(self._file)) 2018 2019 def test_readline(self): 2020 self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 2021 os.linesep), 'ascii')) 2022 self._test_readline(mailbox._ProxyFile(self._file)) 2023 2024 def test_readlines(self): 2025 self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 2026 os.linesep), 'ascii')) 2027 self._test_readlines(mailbox._ProxyFile(self._file)) 2028 2029 def test_iteration(self): 2030 self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 2031 os.linesep), 'ascii')) 2032 self._test_iteration(mailbox._ProxyFile(self._file)) 2033 2034 def test_seek_and_tell(self): 2035 self._file.write(bytes('foo%sbar%s' % (os.linesep, os.linesep), 'ascii')) 2036 self._test_seek_and_tell(mailbox._ProxyFile(self._file)) 2037 2038 def test_close(self): 2039 self._file.write(bytes('foo%sbar%s' % (os.linesep, os.linesep), 'ascii')) 2040 self._test_close(mailbox._ProxyFile(self._file)) 2041 2042 2043class TestPartialFile(TestProxyFileBase, unittest.TestCase): 2044 2045 def setUp(self): 2046 self._path = support.TESTFN 2047 self._file = open(self._path, 'wb+') 2048 2049 def tearDown(self): 2050 self._file.close() 2051 self._delete_recursively(self._path) 2052 2053 def test_initialize(self): 2054 # Initialize and check position 2055 self._file.write(bytes('foo' + os.linesep + 'bar', 'ascii')) 2056 pos = self._file.tell() 2057 proxy = mailbox._PartialFile(self._file, 2, 5) 2058 self.assertEqual(proxy.tell(), 0) 2059 self.assertEqual(self._file.tell(), pos) 2060 2061 def test_read(self): 2062 self._file.write(bytes('***bar***', 'ascii')) 2063 self._test_read(mailbox._PartialFile(self._file, 3, 6)) 2064 2065 def test_readline(self): 2066 self._file.write(bytes('!!!!!foo%sbar%sfred%sbob!!!!!' % 2067 (os.linesep, os.linesep, os.linesep), 'ascii')) 2068 self._test_readline(mailbox._PartialFile(self._file, 5, 2069 18 + 3 * len(os.linesep))) 2070 2071 def test_readlines(self): 2072 self._file.write(bytes('foo%sbar%sfred%sbob?????' % 2073 (os.linesep, os.linesep, os.linesep), 'ascii')) 2074 self._test_readlines(mailbox._PartialFile(self._file, 0, 2075 13 + 3 * len(os.linesep))) 2076 2077 def test_iteration(self): 2078 self._file.write(bytes('____foo%sbar%sfred%sbob####' % 2079 (os.linesep, os.linesep, os.linesep), 'ascii')) 2080 self._test_iteration(mailbox._PartialFile(self._file, 4, 2081 17 + 3 * len(os.linesep))) 2082 2083 def test_seek_and_tell(self): 2084 self._file.write(bytes('(((foo%sbar%s$$$' % (os.linesep, os.linesep), 'ascii')) 2085 self._test_seek_and_tell(mailbox._PartialFile(self._file, 3, 2086 9 + 2 * len(os.linesep))) 2087 2088 def test_close(self): 2089 self._file.write(bytes('&foo%sbar%s^' % (os.linesep, os.linesep), 'ascii')) 2090 self._test_close(mailbox._PartialFile(self._file, 1, 2091 6 + 3 * len(os.linesep))) 2092 2093 2094## Start: tests from the original module (for backward compatibility). 2095 2096FROM_ = "From some.body@dummy.domain Sat Jul 24 13:43:35 2004\n" 2097DUMMY_MESSAGE = """\ 2098From: some.body@dummy.domain 2099To: me@my.domain 2100Subject: Simple Test 2101 2102This is a dummy message. 2103""" 2104 2105class MaildirTestCase(unittest.TestCase): 2106 2107 def setUp(self): 2108 # create a new maildir mailbox to work with: 2109 self._dir = support.TESTFN 2110 if os.path.isdir(self._dir): 2111 support.rmtree(self._dir) 2112 elif os.path.isfile(self._dir): 2113 support.unlink(self._dir) 2114 os.mkdir(self._dir) 2115 os.mkdir(os.path.join(self._dir, "cur")) 2116 os.mkdir(os.path.join(self._dir, "tmp")) 2117 os.mkdir(os.path.join(self._dir, "new")) 2118 self._counter = 1 2119 self._msgfiles = [] 2120 2121 def tearDown(self): 2122 list(map(os.unlink, self._msgfiles)) 2123 support.rmdir(os.path.join(self._dir, "cur")) 2124 support.rmdir(os.path.join(self._dir, "tmp")) 2125 support.rmdir(os.path.join(self._dir, "new")) 2126 support.rmdir(self._dir) 2127 2128 def createMessage(self, dir, mbox=False): 2129 t = int(time.time() % 1000000) 2130 pid = self._counter 2131 self._counter += 1 2132 filename = ".".join((str(t), str(pid), "myhostname", "mydomain")) 2133 tmpname = os.path.join(self._dir, "tmp", filename) 2134 newname = os.path.join(self._dir, dir, filename) 2135 with open(tmpname, "w") as fp: 2136 self._msgfiles.append(tmpname) 2137 if mbox: 2138 fp.write(FROM_) 2139 fp.write(DUMMY_MESSAGE) 2140 try: 2141 os.link(tmpname, newname) 2142 except (AttributeError, PermissionError): 2143 with open(newname, "w") as fp: 2144 fp.write(DUMMY_MESSAGE) 2145 self._msgfiles.append(newname) 2146 return tmpname 2147 2148 def test_empty_maildir(self): 2149 """Test an empty maildir mailbox""" 2150 # Test for regression on bug #117490: 2151 # Make sure the boxes attribute actually gets set. 2152 self.mbox = mailbox.Maildir(support.TESTFN) 2153 #self.assertTrue(hasattr(self.mbox, "boxes")) 2154 #self.assertEqual(len(self.mbox.boxes), 0) 2155 self.assertIsNone(self.mbox.next()) 2156 self.assertIsNone(self.mbox.next()) 2157 2158 def test_nonempty_maildir_cur(self): 2159 self.createMessage("cur") 2160 self.mbox = mailbox.Maildir(support.TESTFN) 2161 #self.assertEqual(len(self.mbox.boxes), 1) 2162 self.assertIsNotNone(self.mbox.next()) 2163 self.assertIsNone(self.mbox.next()) 2164 self.assertIsNone(self.mbox.next()) 2165 2166 def test_nonempty_maildir_new(self): 2167 self.createMessage("new") 2168 self.mbox = mailbox.Maildir(support.TESTFN) 2169 #self.assertEqual(len(self.mbox.boxes), 1) 2170 self.assertIsNotNone(self.mbox.next()) 2171 self.assertIsNone(self.mbox.next()) 2172 self.assertIsNone(self.mbox.next()) 2173 2174 def test_nonempty_maildir_both(self): 2175 self.createMessage("cur") 2176 self.createMessage("new") 2177 self.mbox = mailbox.Maildir(support.TESTFN) 2178 #self.assertEqual(len(self.mbox.boxes), 2) 2179 self.assertIsNotNone(self.mbox.next()) 2180 self.assertIsNotNone(self.mbox.next()) 2181 self.assertIsNone(self.mbox.next()) 2182 self.assertIsNone(self.mbox.next()) 2183 2184## End: tests from the original module (for backward compatibility). 2185 2186 2187_sample_message = """\ 2188Return-Path: <gkj@gregorykjohnson.com> 2189X-Original-To: gkj+person@localhost 2190Delivered-To: gkj+person@localhost 2191Received: from localhost (localhost [127.0.0.1]) 2192 by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17 2193 for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT) 2194Delivered-To: gkj@sundance.gregorykjohnson.com 2195Received: from localhost [127.0.0.1] 2196 by localhost with POP3 (fetchmail-6.2.5) 2197 for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT) 2198Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228]) 2199 by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746 2200 for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT) 2201Received: by andy.gregorykjohnson.com (Postfix, from userid 1000) 2202 id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT) 2203Date: Wed, 13 Jul 2005 17:23:11 -0400 2204From: "Gregory K. Johnson" <gkj@gregorykjohnson.com> 2205To: gkj@gregorykjohnson.com 2206Subject: Sample message 2207Message-ID: <20050713212311.GC4701@andy.gregorykjohnson.com> 2208Mime-Version: 1.0 2209Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+" 2210Content-Disposition: inline 2211User-Agent: Mutt/1.5.9i 2212 2213 2214--NMuMz9nt05w80d4+ 2215Content-Type: text/plain; charset=us-ascii 2216Content-Disposition: inline 2217 2218This is a sample message. 2219 2220-- 2221Gregory K. Johnson 2222 2223--NMuMz9nt05w80d4+ 2224Content-Type: application/octet-stream 2225Content-Disposition: attachment; filename="text.gz" 2226Content-Transfer-Encoding: base64 2227 2228H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs 22293FYlAAAA 2230 2231--NMuMz9nt05w80d4+-- 2232""" 2233 2234_bytes_sample_message = _sample_message.encode('ascii') 2235 2236_sample_headers = { 2237 "Return-Path":"<gkj@gregorykjohnson.com>", 2238 "X-Original-To":"gkj+person@localhost", 2239 "Delivered-To":"gkj+person@localhost", 2240 "Received":"""from localhost (localhost [127.0.0.1]) 2241 by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17 2242 for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""", 2243 "Delivered-To":"gkj@sundance.gregorykjohnson.com", 2244 "Received":"""from localhost [127.0.0.1] 2245 by localhost with POP3 (fetchmail-6.2.5) 2246 for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""", 2247 "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228]) 2248 by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746 2249 for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""", 2250 "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000) 2251 id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""", 2252 "Date":"Wed, 13 Jul 2005 17:23:11 -0400", 2253 "From":""""Gregory K. Johnson" <gkj@gregorykjohnson.com>""", 2254 "To":"gkj@gregorykjohnson.com", 2255 "Subject":"Sample message", 2256 "Mime-Version":"1.0", 2257 "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""", 2258 "Content-Disposition":"inline", 2259 "User-Agent": "Mutt/1.5.9i" } 2260 2261_sample_payloads = ("""This is a sample message. 2262 2263-- 2264Gregory K. Johnson 2265""", 2266"""H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs 22673FYlAAAA 2268""") 2269 2270 2271class MiscTestCase(unittest.TestCase): 2272 def test__all__(self): 2273 blacklist = {"linesep", "fcntl"} 2274 support.check__all__(self, mailbox, blacklist=blacklist) 2275 2276 2277def test_main(): 2278 tests = (TestMailboxSuperclass, TestMaildir, TestMbox, TestMMDF, TestMH, 2279 TestBabyl, TestMessage, TestMaildirMessage, TestMboxMessage, 2280 TestMHMessage, TestBabylMessage, TestMMDFMessage, 2281 TestMessageConversion, TestProxyFile, TestPartialFile, 2282 MaildirTestCase, TestFakeMailBox, MiscTestCase) 2283 support.run_unittest(*tests) 2284 support.reap_children() 2285 2286 2287if __name__ == '__main__': 2288 test_main() 2289