1import io 2import socket 3import datetime 4import textwrap 5import unittest 6import functools 7import contextlib 8import nntplib 9import os.path 10import re 11import threading 12 13from test import support 14from test.support import socket_helper 15from nntplib import NNTP, GroupInfo 16from unittest.mock import patch 17try: 18 import ssl 19except ImportError: 20 ssl = None 21 22 23certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem') 24 25if ssl is not None: 26 SSLError = ssl.SSLError 27else: 28 class SSLError(Exception): 29 """Non-existent exception class when we lack SSL support.""" 30 reason = "This will never be raised." 31 32# TODO: 33# - test the `file` arg to more commands 34# - test error conditions 35# - test auth and `usenetrc` 36 37 38class NetworkedNNTPTestsMixin: 39 40 def test_welcome(self): 41 welcome = self.server.getwelcome() 42 self.assertEqual(str, type(welcome)) 43 44 def test_help(self): 45 resp, lines = self.server.help() 46 self.assertTrue(resp.startswith("100 "), resp) 47 for line in lines: 48 self.assertEqual(str, type(line)) 49 50 def test_list(self): 51 resp, groups = self.server.list() 52 if len(groups) > 0: 53 self.assertEqual(GroupInfo, type(groups[0])) 54 self.assertEqual(str, type(groups[0].group)) 55 56 def test_list_active(self): 57 resp, groups = self.server.list(self.GROUP_PAT) 58 if len(groups) > 0: 59 self.assertEqual(GroupInfo, type(groups[0])) 60 self.assertEqual(str, type(groups[0].group)) 61 62 def test_unknown_command(self): 63 with self.assertRaises(nntplib.NNTPPermanentError) as cm: 64 self.server._shortcmd("XYZZY") 65 resp = cm.exception.response 66 self.assertTrue(resp.startswith("500 "), resp) 67 68 def test_newgroups(self): 69 # gmane gets a constant influx of new groups. In order not to stress 70 # the server too much, we choose a recent date in the past. 71 dt = datetime.date.today() - datetime.timedelta(days=7) 72 resp, groups = self.server.newgroups(dt) 73 if len(groups) > 0: 74 self.assertIsInstance(groups[0], GroupInfo) 75 self.assertIsInstance(groups[0].group, str) 76 77 def test_description(self): 78 def _check_desc(desc): 79 # Sanity checks 80 self.assertIsInstance(desc, str) 81 self.assertNotIn(self.GROUP_NAME, desc) 82 desc = self.server.description(self.GROUP_NAME) 83 _check_desc(desc) 84 # Another sanity check 85 self.assertIn("Python", desc) 86 # With a pattern 87 desc = self.server.description(self.GROUP_PAT) 88 _check_desc(desc) 89 # Shouldn't exist 90 desc = self.server.description("zk.brrtt.baz") 91 self.assertEqual(desc, '') 92 93 def test_descriptions(self): 94 resp, descs = self.server.descriptions(self.GROUP_PAT) 95 # 215 for LIST NEWSGROUPS, 282 for XGTITLE 96 self.assertTrue( 97 resp.startswith("215 ") or resp.startswith("282 "), resp) 98 self.assertIsInstance(descs, dict) 99 desc = descs[self.GROUP_NAME] 100 self.assertEqual(desc, self.server.description(self.GROUP_NAME)) 101 102 def test_group(self): 103 result = self.server.group(self.GROUP_NAME) 104 self.assertEqual(5, len(result)) 105 resp, count, first, last, group = result 106 self.assertEqual(group, self.GROUP_NAME) 107 self.assertIsInstance(count, int) 108 self.assertIsInstance(first, int) 109 self.assertIsInstance(last, int) 110 self.assertLessEqual(first, last) 111 self.assertTrue(resp.startswith("211 "), resp) 112 113 def test_date(self): 114 resp, date = self.server.date() 115 self.assertIsInstance(date, datetime.datetime) 116 # Sanity check 117 self.assertGreaterEqual(date.year, 1995) 118 self.assertLessEqual(date.year, 2030) 119 120 def _check_art_dict(self, art_dict): 121 # Some sanity checks for a field dictionary returned by OVER / XOVER 122 self.assertIsInstance(art_dict, dict) 123 # NNTP has 7 mandatory fields 124 self.assertGreaterEqual(art_dict.keys(), 125 {"subject", "from", "date", "message-id", 126 "references", ":bytes", ":lines"} 127 ) 128 for v in art_dict.values(): 129 self.assertIsInstance(v, (str, type(None))) 130 131 def test_xover(self): 132 resp, count, first, last, name = self.server.group(self.GROUP_NAME) 133 resp, lines = self.server.xover(last - 5, last) 134 if len(lines) == 0: 135 self.skipTest("no articles retrieved") 136 # The 'last' article is not necessarily part of the output (cancelled?) 137 art_num, art_dict = lines[0] 138 self.assertGreaterEqual(art_num, last - 5) 139 self.assertLessEqual(art_num, last) 140 self._check_art_dict(art_dict) 141 142 @unittest.skipIf(True, 'temporarily skipped until a permanent solution' 143 ' is found for issue #28971') 144 def test_over(self): 145 resp, count, first, last, name = self.server.group(self.GROUP_NAME) 146 start = last - 10 147 # The "start-" article range form 148 resp, lines = self.server.over((start, None)) 149 art_num, art_dict = lines[0] 150 self._check_art_dict(art_dict) 151 # The "start-end" article range form 152 resp, lines = self.server.over((start, last)) 153 art_num, art_dict = lines[-1] 154 # The 'last' article is not necessarily part of the output (cancelled?) 155 self.assertGreaterEqual(art_num, start) 156 self.assertLessEqual(art_num, last) 157 self._check_art_dict(art_dict) 158 # XXX The "message_id" form is unsupported by gmane 159 # 503 Overview by message-ID unsupported 160 161 def test_xhdr(self): 162 resp, count, first, last, name = self.server.group(self.GROUP_NAME) 163 resp, lines = self.server.xhdr('subject', last) 164 for line in lines: 165 self.assertEqual(str, type(line[1])) 166 167 def check_article_resp(self, resp, article, art_num=None): 168 self.assertIsInstance(article, nntplib.ArticleInfo) 169 if art_num is not None: 170 self.assertEqual(article.number, art_num) 171 for line in article.lines: 172 self.assertIsInstance(line, bytes) 173 # XXX this could exceptionally happen... 174 self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n")) 175 176 @unittest.skipIf(True, "FIXME: see bpo-32128") 177 def test_article_head_body(self): 178 resp, count, first, last, name = self.server.group(self.GROUP_NAME) 179 # Try to find an available article 180 for art_num in (last, first, last - 1): 181 try: 182 resp, head = self.server.head(art_num) 183 except nntplib.NNTPTemporaryError as e: 184 if not e.response.startswith("423 "): 185 raise 186 # "423 No such article" => choose another one 187 continue 188 break 189 else: 190 self.skipTest("could not find a suitable article number") 191 self.assertTrue(resp.startswith("221 "), resp) 192 self.check_article_resp(resp, head, art_num) 193 resp, body = self.server.body(art_num) 194 self.assertTrue(resp.startswith("222 "), resp) 195 self.check_article_resp(resp, body, art_num) 196 resp, article = self.server.article(art_num) 197 self.assertTrue(resp.startswith("220 "), resp) 198 self.check_article_resp(resp, article, art_num) 199 # Tolerate running the tests from behind a NNTP virus checker 200 blacklist = lambda line: line.startswith(b'X-Antivirus') 201 filtered_head_lines = [line for line in head.lines 202 if not blacklist(line)] 203 filtered_lines = [line for line in article.lines 204 if not blacklist(line)] 205 self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines) 206 207 def test_capabilities(self): 208 # The server under test implements NNTP version 2 and has a 209 # couple of well-known capabilities. Just sanity check that we 210 # got them. 211 def _check_caps(caps): 212 caps_list = caps['LIST'] 213 self.assertIsInstance(caps_list, (list, tuple)) 214 self.assertIn('OVERVIEW.FMT', caps_list) 215 self.assertGreaterEqual(self.server.nntp_version, 2) 216 _check_caps(self.server.getcapabilities()) 217 # This re-emits the command 218 resp, caps = self.server.capabilities() 219 _check_caps(caps) 220 221 def test_zlogin(self): 222 # This test must be the penultimate because further commands will be 223 # refused. 224 baduser = "notarealuser" 225 badpw = "notarealpassword" 226 # Check that bogus credentials cause failure 227 self.assertRaises(nntplib.NNTPError, self.server.login, 228 user=baduser, password=badpw, usenetrc=False) 229 # FIXME: We should check that correct credentials succeed, but that 230 # would require valid details for some server somewhere to be in the 231 # test suite, I think. Gmane is anonymous, at least as used for the 232 # other tests. 233 234 def test_zzquit(self): 235 # This test must be called last, hence the name 236 cls = type(self) 237 try: 238 self.server.quit() 239 finally: 240 cls.server = None 241 242 @classmethod 243 def wrap_methods(cls): 244 # Wrap all methods in a transient_internet() exception catcher 245 # XXX put a generic version in test.support? 246 def wrap_meth(meth): 247 @functools.wraps(meth) 248 def wrapped(self): 249 with socket_helper.transient_internet(self.NNTP_HOST): 250 meth(self) 251 return wrapped 252 for name in dir(cls): 253 if not name.startswith('test_'): 254 continue 255 meth = getattr(cls, name) 256 if not callable(meth): 257 continue 258 # Need to use a closure so that meth remains bound to its current 259 # value 260 setattr(cls, name, wrap_meth(meth)) 261 262 def test_timeout(self): 263 with self.assertRaises(ValueError): 264 self.NNTP_CLASS(self.NNTP_HOST, timeout=0, usenetrc=False) 265 266 def test_with_statement(self): 267 def is_connected(): 268 if not hasattr(server, 'file'): 269 return False 270 try: 271 server.help() 272 except (OSError, EOFError): 273 return False 274 return True 275 276 try: 277 server = self.NNTP_CLASS(self.NNTP_HOST, 278 timeout=support.INTERNET_TIMEOUT, 279 usenetrc=False) 280 with server: 281 self.assertTrue(is_connected()) 282 self.assertTrue(server.help()) 283 self.assertFalse(is_connected()) 284 285 server = self.NNTP_CLASS(self.NNTP_HOST, 286 timeout=support.INTERNET_TIMEOUT, 287 usenetrc=False) 288 with server: 289 server.quit() 290 self.assertFalse(is_connected()) 291 except SSLError as ssl_err: 292 # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small" 293 if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason): 294 raise unittest.SkipTest(f"Got {ssl_err} connecting " 295 f"to {self.NNTP_HOST!r}") 296 raise 297 298 299NetworkedNNTPTestsMixin.wrap_methods() 300 301 302EOF_ERRORS = (EOFError,) 303if ssl is not None: 304 EOF_ERRORS += (ssl.SSLEOFError,) 305 306 307class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase): 308 # This server supports STARTTLS (gmane doesn't) 309 NNTP_HOST = 'news.trigofacile.com' 310 GROUP_NAME = 'fr.comp.lang.python' 311 GROUP_PAT = 'fr.comp.lang.*' 312 313 NNTP_CLASS = NNTP 314 315 @classmethod 316 def setUpClass(cls): 317 support.requires("network") 318 with socket_helper.transient_internet(cls.NNTP_HOST): 319 try: 320 cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, 321 timeout=support.INTERNET_TIMEOUT, 322 usenetrc=False) 323 except SSLError as ssl_err: 324 # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small" 325 if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason): 326 raise unittest.SkipTest(f"{cls} got {ssl_err} connecting " 327 f"to {cls.NNTP_HOST!r}") 328 raise 329 except EOF_ERRORS: 330 raise unittest.SkipTest(f"{cls} got EOF error on connecting " 331 f"to {cls.NNTP_HOST!r}") 332 333 @classmethod 334 def tearDownClass(cls): 335 if cls.server is not None: 336 cls.server.quit() 337 338@unittest.skipUnless(ssl, 'requires SSL support') 339class NetworkedNNTP_SSLTests(NetworkedNNTPTests): 340 341 # Technical limits for this public NNTP server (see http://www.aioe.org): 342 # "Only two concurrent connections per IP address are allowed and 343 # 400 connections per day are accepted from each IP address." 344 345 NNTP_HOST = 'nntp.aioe.org' 346 GROUP_NAME = 'comp.lang.python' 347 GROUP_PAT = 'comp.lang.*' 348 349 NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None) 350 351 # Disabled as it produces too much data 352 test_list = None 353 354 # Disabled as the connection will already be encrypted. 355 test_starttls = None 356 357 358# 359# Non-networked tests using a local server (or something mocking it). 360# 361 362class _NNTPServerIO(io.RawIOBase): 363 """A raw IO object allowing NNTP commands to be received and processed 364 by a handler. The handler can push responses which can then be read 365 from the IO object.""" 366 367 def __init__(self, handler): 368 io.RawIOBase.__init__(self) 369 # The channel from the client 370 self.c2s = io.BytesIO() 371 # The channel to the client 372 self.s2c = io.BytesIO() 373 self.handler = handler 374 self.handler.start(self.c2s.readline, self.push_data) 375 376 def readable(self): 377 return True 378 379 def writable(self): 380 return True 381 382 def push_data(self, data): 383 """Push (buffer) some data to send to the client.""" 384 pos = self.s2c.tell() 385 self.s2c.seek(0, 2) 386 self.s2c.write(data) 387 self.s2c.seek(pos) 388 389 def write(self, b): 390 """The client sends us some data""" 391 pos = self.c2s.tell() 392 self.c2s.write(b) 393 self.c2s.seek(pos) 394 self.handler.process_pending() 395 return len(b) 396 397 def readinto(self, buf): 398 """The client wants to read a response""" 399 self.handler.process_pending() 400 b = self.s2c.read(len(buf)) 401 n = len(b) 402 buf[:n] = b 403 return n 404 405 406def make_mock_file(handler): 407 sio = _NNTPServerIO(handler) 408 # Using BufferedRWPair instead of BufferedRandom ensures the file 409 # isn't seekable. 410 file = io.BufferedRWPair(sio, sio) 411 return (sio, file) 412 413 414class NNTPServer(nntplib.NNTP): 415 416 def __init__(self, f, host, readermode=None): 417 self.file = f 418 self.host = host 419 self._base_init(readermode) 420 421 def _close(self): 422 self.file.close() 423 del self.file 424 425 426class MockedNNTPTestsMixin: 427 # Override in derived classes 428 handler_class = None 429 430 def setUp(self): 431 super().setUp() 432 self.make_server() 433 434 def tearDown(self): 435 super().tearDown() 436 del self.server 437 438 def make_server(self, *args, **kwargs): 439 self.handler = self.handler_class() 440 self.sio, file = make_mock_file(self.handler) 441 self.server = NNTPServer(file, 'test.server', *args, **kwargs) 442 return self.server 443 444 445class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin): 446 def setUp(self): 447 super().setUp() 448 self.make_server(readermode=True) 449 450 451class NNTPv1Handler: 452 """A handler for RFC 977""" 453 454 welcome = "200 NNTP mock server" 455 456 def start(self, readline, push_data): 457 self.in_body = False 458 self.allow_posting = True 459 self._readline = readline 460 self._push_data = push_data 461 self._logged_in = False 462 self._user_sent = False 463 # Our welcome 464 self.handle_welcome() 465 466 def _decode(self, data): 467 return str(data, "utf-8", "surrogateescape") 468 469 def process_pending(self): 470 if self.in_body: 471 while True: 472 line = self._readline() 473 if not line: 474 return 475 self.body.append(line) 476 if line == b".\r\n": 477 break 478 try: 479 meth, tokens = self.body_callback 480 meth(*tokens, body=self.body) 481 finally: 482 self.body_callback = None 483 self.body = None 484 self.in_body = False 485 while True: 486 line = self._decode(self._readline()) 487 if not line: 488 return 489 if not line.endswith("\r\n"): 490 raise ValueError("line doesn't end with \\r\\n: {!r}".format(line)) 491 line = line[:-2] 492 cmd, *tokens = line.split() 493 #meth = getattr(self.handler, "handle_" + cmd.upper(), None) 494 meth = getattr(self, "handle_" + cmd.upper(), None) 495 if meth is None: 496 self.handle_unknown() 497 else: 498 try: 499 meth(*tokens) 500 except Exception as e: 501 raise ValueError("command failed: {!r}".format(line)) from e 502 else: 503 if self.in_body: 504 self.body_callback = meth, tokens 505 self.body = [] 506 507 def expect_body(self): 508 """Flag that the client is expected to post a request body""" 509 self.in_body = True 510 511 def push_data(self, data): 512 """Push some binary data""" 513 self._push_data(data) 514 515 def push_lit(self, lit): 516 """Push a string literal""" 517 lit = textwrap.dedent(lit) 518 lit = "\r\n".join(lit.splitlines()) + "\r\n" 519 lit = lit.encode('utf-8') 520 self.push_data(lit) 521 522 def handle_unknown(self): 523 self.push_lit("500 What?") 524 525 def handle_welcome(self): 526 self.push_lit(self.welcome) 527 528 def handle_QUIT(self): 529 self.push_lit("205 Bye!") 530 531 def handle_DATE(self): 532 self.push_lit("111 20100914001155") 533 534 def handle_GROUP(self, group): 535 if group == "fr.comp.lang.python": 536 self.push_lit("211 486 761 1265 fr.comp.lang.python") 537 else: 538 self.push_lit("411 No such group {}".format(group)) 539 540 def handle_HELP(self): 541 self.push_lit("""\ 542 100 Legal commands 543 authinfo user Name|pass Password|generic <prog> <args> 544 date 545 help 546 Report problems to <root@example.org> 547 .""") 548 549 def handle_STAT(self, message_spec=None): 550 if message_spec is None: 551 self.push_lit("412 No newsgroup selected") 552 elif message_spec == "3000234": 553 self.push_lit("223 3000234 <45223423@example.com>") 554 elif message_spec == "<45223423@example.com>": 555 self.push_lit("223 0 <45223423@example.com>") 556 else: 557 self.push_lit("430 No Such Article Found") 558 559 def handle_NEXT(self): 560 self.push_lit("223 3000237 <668929@example.org> retrieved") 561 562 def handle_LAST(self): 563 self.push_lit("223 3000234 <45223423@example.com> retrieved") 564 565 def handle_LIST(self, action=None, param=None): 566 if action is None: 567 self.push_lit("""\ 568 215 Newsgroups in form "group high low flags". 569 comp.lang.python 0000052340 0000002828 y 570 comp.lang.python.announce 0000001153 0000000993 m 571 free.it.comp.lang.python 0000000002 0000000002 y 572 fr.comp.lang.python 0000001254 0000000760 y 573 free.it.comp.lang.python.learner 0000000000 0000000001 y 574 tw.bbs.comp.lang.python 0000000304 0000000304 y 575 .""") 576 elif action == "ACTIVE": 577 if param == "*distutils*": 578 self.push_lit("""\ 579 215 Newsgroups in form "group high low flags" 580 gmane.comp.python.distutils.devel 0000014104 0000000001 m 581 gmane.comp.python.distutils.cvs 0000000000 0000000001 m 582 .""") 583 else: 584 self.push_lit("""\ 585 215 Newsgroups in form "group high low flags" 586 .""") 587 elif action == "OVERVIEW.FMT": 588 self.push_lit("""\ 589 215 Order of fields in overview database. 590 Subject: 591 From: 592 Date: 593 Message-ID: 594 References: 595 Bytes: 596 Lines: 597 Xref:full 598 .""") 599 elif action == "NEWSGROUPS": 600 assert param is not None 601 if param == "comp.lang.python": 602 self.push_lit("""\ 603 215 Descriptions in form "group description". 604 comp.lang.python\tThe Python computer language. 605 .""") 606 elif param == "comp.lang.python*": 607 self.push_lit("""\ 608 215 Descriptions in form "group description". 609 comp.lang.python.announce\tAnnouncements about the Python language. (Moderated) 610 comp.lang.python\tThe Python computer language. 611 .""") 612 else: 613 self.push_lit("""\ 614 215 Descriptions in form "group description". 615 .""") 616 else: 617 self.push_lit('501 Unknown LIST keyword') 618 619 def handle_NEWNEWS(self, group, date_str, time_str): 620 # We hard code different return messages depending on passed 621 # argument and date syntax. 622 if (group == "comp.lang.python" and date_str == "20100913" 623 and time_str == "082004"): 624 # Date was passed in RFC 3977 format (NNTP "v2") 625 self.push_lit("""\ 626 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows 627 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com> 628 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com> 629 .""") 630 elif (group == "comp.lang.python" and date_str == "100913" 631 and time_str == "082004"): 632 # Date was passed in RFC 977 format (NNTP "v1") 633 self.push_lit("""\ 634 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows 635 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com> 636 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com> 637 .""") 638 elif (group == 'comp.lang.python' and 639 date_str in ('20100101', '100101') and 640 time_str == '090000'): 641 self.push_lit('too long line' * 3000 + 642 '\n.') 643 else: 644 self.push_lit("""\ 645 230 An empty list of newsarticles follows 646 .""") 647 # (Note for experiments: many servers disable NEWNEWS. 648 # As of this writing, sicinfo3.epfl.ch doesn't.) 649 650 def handle_XOVER(self, message_spec): 651 if message_spec == "57-59": 652 self.push_lit( 653 "224 Overview information for 57-58 follows\n" 654 "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout" 655 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>" 656 "\tSat, 19 Jun 2010 18:04:08 -0400" 657 "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>" 658 "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16" 659 "\tXref: news.gmane.io gmane.comp.python.authors:57" 660 "\n" 661 "58\tLooking for a few good bloggers" 662 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>" 663 "\tThu, 22 Jul 2010 09:14:14 -0400" 664 "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>" 665 "\t\t6683\t16" 666 "\t" 667 "\n" 668 # A UTF-8 overview line from fr.comp.lang.python 669 "59\tRe: Message d'erreur incompréhensible (par moi)" 670 "\tEric Brunel <eric.brunel@pragmadev.nospam.com>" 671 "\tWed, 15 Sep 2010 18:09:15 +0200" 672 "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>" 673 "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27" 674 "\tXref: saria.nerim.net fr.comp.lang.python:1265" 675 "\n" 676 ".\n") 677 else: 678 self.push_lit("""\ 679 224 No articles 680 .""") 681 682 def handle_POST(self, *, body=None): 683 if body is None: 684 if self.allow_posting: 685 self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>") 686 self.expect_body() 687 else: 688 self.push_lit("440 Posting not permitted") 689 else: 690 assert self.allow_posting 691 self.push_lit("240 Article received OK") 692 self.posted_body = body 693 694 def handle_IHAVE(self, message_id, *, body=None): 695 if body is None: 696 if (self.allow_posting and 697 message_id == "<i.am.an.article.you.will.want@example.com>"): 698 self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>") 699 self.expect_body() 700 else: 701 self.push_lit("435 Article not wanted") 702 else: 703 assert self.allow_posting 704 self.push_lit("235 Article transferred OK") 705 self.posted_body = body 706 707 sample_head = """\ 708 From: "Demo User" <nobody@example.net> 709 Subject: I am just a test article 710 Content-Type: text/plain; charset=UTF-8; format=flowed 711 Message-ID: <i.am.an.article.you.will.want@example.com>""" 712 713 sample_body = """\ 714 This is just a test article. 715 ..Here is a dot-starting line. 716 717 -- Signed by Andr\xe9.""" 718 719 sample_article = sample_head + "\n\n" + sample_body 720 721 def handle_ARTICLE(self, message_spec=None): 722 if message_spec is None: 723 self.push_lit("220 3000237 <45223423@example.com>") 724 elif message_spec == "<45223423@example.com>": 725 self.push_lit("220 0 <45223423@example.com>") 726 elif message_spec == "3000234": 727 self.push_lit("220 3000234 <45223423@example.com>") 728 else: 729 self.push_lit("430 No Such Article Found") 730 return 731 self.push_lit(self.sample_article) 732 self.push_lit(".") 733 734 def handle_HEAD(self, message_spec=None): 735 if message_spec is None: 736 self.push_lit("221 3000237 <45223423@example.com>") 737 elif message_spec == "<45223423@example.com>": 738 self.push_lit("221 0 <45223423@example.com>") 739 elif message_spec == "3000234": 740 self.push_lit("221 3000234 <45223423@example.com>") 741 else: 742 self.push_lit("430 No Such Article Found") 743 return 744 self.push_lit(self.sample_head) 745 self.push_lit(".") 746 747 def handle_BODY(self, message_spec=None): 748 if message_spec is None: 749 self.push_lit("222 3000237 <45223423@example.com>") 750 elif message_spec == "<45223423@example.com>": 751 self.push_lit("222 0 <45223423@example.com>") 752 elif message_spec == "3000234": 753 self.push_lit("222 3000234 <45223423@example.com>") 754 else: 755 self.push_lit("430 No Such Article Found") 756 return 757 self.push_lit(self.sample_body) 758 self.push_lit(".") 759 760 def handle_AUTHINFO(self, cred_type, data): 761 if self._logged_in: 762 self.push_lit('502 Already Logged In') 763 elif cred_type == 'user': 764 if self._user_sent: 765 self.push_lit('482 User Credential Already Sent') 766 else: 767 self.push_lit('381 Password Required') 768 self._user_sent = True 769 elif cred_type == 'pass': 770 self.push_lit('281 Login Successful') 771 self._logged_in = True 772 else: 773 raise Exception('Unknown cred type {}'.format(cred_type)) 774 775 776class NNTPv2Handler(NNTPv1Handler): 777 """A handler for RFC 3977 (NNTP "v2")""" 778 779 def handle_CAPABILITIES(self): 780 fmt = """\ 781 101 Capability list: 782 VERSION 2 3 783 IMPLEMENTATION INN 2.5.1{} 784 HDR 785 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT 786 OVER 787 POST 788 READER 789 .""" 790 791 if not self._logged_in: 792 self.push_lit(fmt.format('\n AUTHINFO USER')) 793 else: 794 self.push_lit(fmt.format('')) 795 796 def handle_MODE(self, _): 797 raise Exception('MODE READER sent despite READER has been advertised') 798 799 def handle_OVER(self, message_spec=None): 800 return self.handle_XOVER(message_spec) 801 802 803class CapsAfterLoginNNTPv2Handler(NNTPv2Handler): 804 """A handler that allows CAPABILITIES only after login""" 805 806 def handle_CAPABILITIES(self): 807 if not self._logged_in: 808 self.push_lit('480 You must log in.') 809 else: 810 super().handle_CAPABILITIES() 811 812 813class ModeSwitchingNNTPv2Handler(NNTPv2Handler): 814 """A server that starts in transit mode""" 815 816 def __init__(self): 817 self._switched = False 818 819 def handle_CAPABILITIES(self): 820 fmt = """\ 821 101 Capability list: 822 VERSION 2 3 823 IMPLEMENTATION INN 2.5.1 824 HDR 825 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT 826 OVER 827 POST 828 {}READER 829 .""" 830 if self._switched: 831 self.push_lit(fmt.format('')) 832 else: 833 self.push_lit(fmt.format('MODE-')) 834 835 def handle_MODE(self, what): 836 assert not self._switched and what == 'reader' 837 self._switched = True 838 self.push_lit('200 Posting allowed') 839 840 841class NNTPv1v2TestsMixin: 842 843 def setUp(self): 844 super().setUp() 845 846 def test_welcome(self): 847 self.assertEqual(self.server.welcome, self.handler.welcome) 848 849 def test_authinfo(self): 850 if self.nntp_version == 2: 851 self.assertIn('AUTHINFO', self.server._caps) 852 self.server.login('testuser', 'testpw') 853 # if AUTHINFO is gone from _caps we also know that getcapabilities() 854 # has been called after login as it should 855 self.assertNotIn('AUTHINFO', self.server._caps) 856 857 def test_date(self): 858 resp, date = self.server.date() 859 self.assertEqual(resp, "111 20100914001155") 860 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55)) 861 862 def test_quit(self): 863 self.assertFalse(self.sio.closed) 864 resp = self.server.quit() 865 self.assertEqual(resp, "205 Bye!") 866 self.assertTrue(self.sio.closed) 867 868 def test_help(self): 869 resp, help = self.server.help() 870 self.assertEqual(resp, "100 Legal commands") 871 self.assertEqual(help, [ 872 ' authinfo user Name|pass Password|generic <prog> <args>', 873 ' date', 874 ' help', 875 'Report problems to <root@example.org>', 876 ]) 877 878 def test_list(self): 879 resp, groups = self.server.list() 880 self.assertEqual(len(groups), 6) 881 g = groups[1] 882 self.assertEqual(g, 883 GroupInfo("comp.lang.python.announce", "0000001153", 884 "0000000993", "m")) 885 resp, groups = self.server.list("*distutils*") 886 self.assertEqual(len(groups), 2) 887 g = groups[0] 888 self.assertEqual(g, 889 GroupInfo("gmane.comp.python.distutils.devel", "0000014104", 890 "0000000001", "m")) 891 892 def test_stat(self): 893 resp, art_num, message_id = self.server.stat(3000234) 894 self.assertEqual(resp, "223 3000234 <45223423@example.com>") 895 self.assertEqual(art_num, 3000234) 896 self.assertEqual(message_id, "<45223423@example.com>") 897 resp, art_num, message_id = self.server.stat("<45223423@example.com>") 898 self.assertEqual(resp, "223 0 <45223423@example.com>") 899 self.assertEqual(art_num, 0) 900 self.assertEqual(message_id, "<45223423@example.com>") 901 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 902 self.server.stat("<non.existent.id>") 903 self.assertEqual(cm.exception.response, "430 No Such Article Found") 904 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 905 self.server.stat() 906 self.assertEqual(cm.exception.response, "412 No newsgroup selected") 907 908 def test_next(self): 909 resp, art_num, message_id = self.server.next() 910 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved") 911 self.assertEqual(art_num, 3000237) 912 self.assertEqual(message_id, "<668929@example.org>") 913 914 def test_last(self): 915 resp, art_num, message_id = self.server.last() 916 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved") 917 self.assertEqual(art_num, 3000234) 918 self.assertEqual(message_id, "<45223423@example.com>") 919 920 def test_description(self): 921 desc = self.server.description("comp.lang.python") 922 self.assertEqual(desc, "The Python computer language.") 923 desc = self.server.description("comp.lang.pythonx") 924 self.assertEqual(desc, "") 925 926 def test_descriptions(self): 927 resp, groups = self.server.descriptions("comp.lang.python") 928 self.assertEqual(resp, '215 Descriptions in form "group description".') 929 self.assertEqual(groups, { 930 "comp.lang.python": "The Python computer language.", 931 }) 932 resp, groups = self.server.descriptions("comp.lang.python*") 933 self.assertEqual(groups, { 934 "comp.lang.python": "The Python computer language.", 935 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)", 936 }) 937 resp, groups = self.server.descriptions("comp.lang.pythonx") 938 self.assertEqual(groups, {}) 939 940 def test_group(self): 941 resp, count, first, last, group = self.server.group("fr.comp.lang.python") 942 self.assertTrue(resp.startswith("211 "), resp) 943 self.assertEqual(first, 761) 944 self.assertEqual(last, 1265) 945 self.assertEqual(count, 486) 946 self.assertEqual(group, "fr.comp.lang.python") 947 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 948 self.server.group("comp.lang.python.devel") 949 exc = cm.exception 950 self.assertTrue(exc.response.startswith("411 No such group"), 951 exc.response) 952 953 def test_newnews(self): 954 # NEWNEWS comp.lang.python [20]100913 082004 955 dt = datetime.datetime(2010, 9, 13, 8, 20, 4) 956 resp, ids = self.server.newnews("comp.lang.python", dt) 957 expected = ( 958 "230 list of newsarticles (NNTP v{0}) " 959 "created after Mon Sep 13 08:20:04 2010 follows" 960 ).format(self.nntp_version) 961 self.assertEqual(resp, expected) 962 self.assertEqual(ids, [ 963 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>", 964 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>", 965 ]) 966 # NEWNEWS fr.comp.lang.python [20]100913 082004 967 dt = datetime.datetime(2010, 9, 13, 8, 20, 4) 968 resp, ids = self.server.newnews("fr.comp.lang.python", dt) 969 self.assertEqual(resp, "230 An empty list of newsarticles follows") 970 self.assertEqual(ids, []) 971 972 def _check_article_body(self, lines): 973 self.assertEqual(len(lines), 4) 974 self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.") 975 self.assertEqual(lines[-2], b"") 976 self.assertEqual(lines[-3], b".Here is a dot-starting line.") 977 self.assertEqual(lines[-4], b"This is just a test article.") 978 979 def _check_article_head(self, lines): 980 self.assertEqual(len(lines), 4) 981 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>') 982 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>") 983 984 def _check_article_data(self, lines): 985 self.assertEqual(len(lines), 9) 986 self._check_article_head(lines[:4]) 987 self._check_article_body(lines[-4:]) 988 self.assertEqual(lines[4], b"") 989 990 def test_article(self): 991 # ARTICLE 992 resp, info = self.server.article() 993 self.assertEqual(resp, "220 3000237 <45223423@example.com>") 994 art_num, message_id, lines = info 995 self.assertEqual(art_num, 3000237) 996 self.assertEqual(message_id, "<45223423@example.com>") 997 self._check_article_data(lines) 998 # ARTICLE num 999 resp, info = self.server.article(3000234) 1000 self.assertEqual(resp, "220 3000234 <45223423@example.com>") 1001 art_num, message_id, lines = info 1002 self.assertEqual(art_num, 3000234) 1003 self.assertEqual(message_id, "<45223423@example.com>") 1004 self._check_article_data(lines) 1005 # ARTICLE id 1006 resp, info = self.server.article("<45223423@example.com>") 1007 self.assertEqual(resp, "220 0 <45223423@example.com>") 1008 art_num, message_id, lines = info 1009 self.assertEqual(art_num, 0) 1010 self.assertEqual(message_id, "<45223423@example.com>") 1011 self._check_article_data(lines) 1012 # Non-existent id 1013 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1014 self.server.article("<non-existent@example.com>") 1015 self.assertEqual(cm.exception.response, "430 No Such Article Found") 1016 1017 def test_article_file(self): 1018 # With a "file" argument 1019 f = io.BytesIO() 1020 resp, info = self.server.article(file=f) 1021 self.assertEqual(resp, "220 3000237 <45223423@example.com>") 1022 art_num, message_id, lines = info 1023 self.assertEqual(art_num, 3000237) 1024 self.assertEqual(message_id, "<45223423@example.com>") 1025 self.assertEqual(lines, []) 1026 data = f.getvalue() 1027 self.assertTrue(data.startswith( 1028 b'From: "Demo User" <nobody@example.net>\r\n' 1029 b'Subject: I am just a test article\r\n' 1030 ), ascii(data)) 1031 self.assertTrue(data.endswith( 1032 b'This is just a test article.\r\n' 1033 b'.Here is a dot-starting line.\r\n' 1034 b'\r\n' 1035 b'-- Signed by Andr\xc3\xa9.\r\n' 1036 ), ascii(data)) 1037 1038 def test_head(self): 1039 # HEAD 1040 resp, info = self.server.head() 1041 self.assertEqual(resp, "221 3000237 <45223423@example.com>") 1042 art_num, message_id, lines = info 1043 self.assertEqual(art_num, 3000237) 1044 self.assertEqual(message_id, "<45223423@example.com>") 1045 self._check_article_head(lines) 1046 # HEAD num 1047 resp, info = self.server.head(3000234) 1048 self.assertEqual(resp, "221 3000234 <45223423@example.com>") 1049 art_num, message_id, lines = info 1050 self.assertEqual(art_num, 3000234) 1051 self.assertEqual(message_id, "<45223423@example.com>") 1052 self._check_article_head(lines) 1053 # HEAD id 1054 resp, info = self.server.head("<45223423@example.com>") 1055 self.assertEqual(resp, "221 0 <45223423@example.com>") 1056 art_num, message_id, lines = info 1057 self.assertEqual(art_num, 0) 1058 self.assertEqual(message_id, "<45223423@example.com>") 1059 self._check_article_head(lines) 1060 # Non-existent id 1061 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1062 self.server.head("<non-existent@example.com>") 1063 self.assertEqual(cm.exception.response, "430 No Such Article Found") 1064 1065 def test_head_file(self): 1066 f = io.BytesIO() 1067 resp, info = self.server.head(file=f) 1068 self.assertEqual(resp, "221 3000237 <45223423@example.com>") 1069 art_num, message_id, lines = info 1070 self.assertEqual(art_num, 3000237) 1071 self.assertEqual(message_id, "<45223423@example.com>") 1072 self.assertEqual(lines, []) 1073 data = f.getvalue() 1074 self.assertTrue(data.startswith( 1075 b'From: "Demo User" <nobody@example.net>\r\n' 1076 b'Subject: I am just a test article\r\n' 1077 ), ascii(data)) 1078 self.assertFalse(data.endswith( 1079 b'This is just a test article.\r\n' 1080 b'.Here is a dot-starting line.\r\n' 1081 b'\r\n' 1082 b'-- Signed by Andr\xc3\xa9.\r\n' 1083 ), ascii(data)) 1084 1085 def test_body(self): 1086 # BODY 1087 resp, info = self.server.body() 1088 self.assertEqual(resp, "222 3000237 <45223423@example.com>") 1089 art_num, message_id, lines = info 1090 self.assertEqual(art_num, 3000237) 1091 self.assertEqual(message_id, "<45223423@example.com>") 1092 self._check_article_body(lines) 1093 # BODY num 1094 resp, info = self.server.body(3000234) 1095 self.assertEqual(resp, "222 3000234 <45223423@example.com>") 1096 art_num, message_id, lines = info 1097 self.assertEqual(art_num, 3000234) 1098 self.assertEqual(message_id, "<45223423@example.com>") 1099 self._check_article_body(lines) 1100 # BODY id 1101 resp, info = self.server.body("<45223423@example.com>") 1102 self.assertEqual(resp, "222 0 <45223423@example.com>") 1103 art_num, message_id, lines = info 1104 self.assertEqual(art_num, 0) 1105 self.assertEqual(message_id, "<45223423@example.com>") 1106 self._check_article_body(lines) 1107 # Non-existent id 1108 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1109 self.server.body("<non-existent@example.com>") 1110 self.assertEqual(cm.exception.response, "430 No Such Article Found") 1111 1112 def test_body_file(self): 1113 f = io.BytesIO() 1114 resp, info = self.server.body(file=f) 1115 self.assertEqual(resp, "222 3000237 <45223423@example.com>") 1116 art_num, message_id, lines = info 1117 self.assertEqual(art_num, 3000237) 1118 self.assertEqual(message_id, "<45223423@example.com>") 1119 self.assertEqual(lines, []) 1120 data = f.getvalue() 1121 self.assertFalse(data.startswith( 1122 b'From: "Demo User" <nobody@example.net>\r\n' 1123 b'Subject: I am just a test article\r\n' 1124 ), ascii(data)) 1125 self.assertTrue(data.endswith( 1126 b'This is just a test article.\r\n' 1127 b'.Here is a dot-starting line.\r\n' 1128 b'\r\n' 1129 b'-- Signed by Andr\xc3\xa9.\r\n' 1130 ), ascii(data)) 1131 1132 def check_over_xover_resp(self, resp, overviews): 1133 self.assertTrue(resp.startswith("224 "), resp) 1134 self.assertEqual(len(overviews), 3) 1135 art_num, over = overviews[0] 1136 self.assertEqual(art_num, 57) 1137 self.assertEqual(over, { 1138 "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>", 1139 "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout", 1140 "date": "Sat, 19 Jun 2010 18:04:08 -0400", 1141 "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>", 1142 "references": "<hvalf7$ort$1@dough.gmane.org>", 1143 ":bytes": "7103", 1144 ":lines": "16", 1145 "xref": "news.gmane.io gmane.comp.python.authors:57" 1146 }) 1147 art_num, over = overviews[1] 1148 self.assertEqual(over["xref"], None) 1149 art_num, over = overviews[2] 1150 self.assertEqual(over["subject"], 1151 "Re: Message d'erreur incompréhensible (par moi)") 1152 1153 def test_xover(self): 1154 resp, overviews = self.server.xover(57, 59) 1155 self.check_over_xover_resp(resp, overviews) 1156 1157 def test_over(self): 1158 # In NNTP "v1", this will fallback on XOVER 1159 resp, overviews = self.server.over((57, 59)) 1160 self.check_over_xover_resp(resp, overviews) 1161 1162 sample_post = ( 1163 b'From: "Demo User" <nobody@example.net>\r\n' 1164 b'Subject: I am just a test article\r\n' 1165 b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n' 1166 b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n' 1167 b'\r\n' 1168 b'This is just a test article.\r\n' 1169 b'.Here is a dot-starting line.\r\n' 1170 b'\r\n' 1171 b'-- Signed by Andr\xc3\xa9.\r\n' 1172 ) 1173 1174 def _check_posted_body(self): 1175 # Check the raw body as received by the server 1176 lines = self.handler.posted_body 1177 # One additional line for the "." terminator 1178 self.assertEqual(len(lines), 10) 1179 self.assertEqual(lines[-1], b'.\r\n') 1180 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n') 1181 self.assertEqual(lines[-3], b'\r\n') 1182 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n') 1183 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n') 1184 1185 def _check_post_ihave_sub(self, func, *args, file_factory): 1186 # First the prepared post with CRLF endings 1187 post = self.sample_post 1188 func_args = args + (file_factory(post),) 1189 self.handler.posted_body = None 1190 resp = func(*func_args) 1191 self._check_posted_body() 1192 # Then the same post with "normal" line endings - they should be 1193 # converted by NNTP.post and NNTP.ihave. 1194 post = self.sample_post.replace(b"\r\n", b"\n") 1195 func_args = args + (file_factory(post),) 1196 self.handler.posted_body = None 1197 resp = func(*func_args) 1198 self._check_posted_body() 1199 return resp 1200 1201 def check_post_ihave(self, func, success_resp, *args): 1202 # With a bytes object 1203 resp = self._check_post_ihave_sub(func, *args, file_factory=bytes) 1204 self.assertEqual(resp, success_resp) 1205 # With a bytearray object 1206 resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray) 1207 self.assertEqual(resp, success_resp) 1208 # With a file object 1209 resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO) 1210 self.assertEqual(resp, success_resp) 1211 # With an iterable of terminated lines 1212 def iterlines(b): 1213 return iter(b.splitlines(keepends=True)) 1214 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines) 1215 self.assertEqual(resp, success_resp) 1216 # With an iterable of non-terminated lines 1217 def iterlines(b): 1218 return iter(b.splitlines(keepends=False)) 1219 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines) 1220 self.assertEqual(resp, success_resp) 1221 1222 def test_post(self): 1223 self.check_post_ihave(self.server.post, "240 Article received OK") 1224 self.handler.allow_posting = False 1225 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1226 self.server.post(self.sample_post) 1227 self.assertEqual(cm.exception.response, 1228 "440 Posting not permitted") 1229 1230 def test_ihave(self): 1231 self.check_post_ihave(self.server.ihave, "235 Article transferred OK", 1232 "<i.am.an.article.you.will.want@example.com>") 1233 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1234 self.server.ihave("<another.message.id>", self.sample_post) 1235 self.assertEqual(cm.exception.response, 1236 "435 Article not wanted") 1237 1238 def test_too_long_lines(self): 1239 dt = datetime.datetime(2010, 1, 1, 9, 0, 0) 1240 self.assertRaises(nntplib.NNTPDataError, 1241 self.server.newnews, "comp.lang.python", dt) 1242 1243 1244class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase): 1245 """Tests an NNTP v1 server (no capabilities).""" 1246 1247 nntp_version = 1 1248 handler_class = NNTPv1Handler 1249 1250 def test_caps(self): 1251 caps = self.server.getcapabilities() 1252 self.assertEqual(caps, {}) 1253 self.assertEqual(self.server.nntp_version, 1) 1254 self.assertEqual(self.server.nntp_implementation, None) 1255 1256 1257class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase): 1258 """Tests an NNTP v2 server (with capabilities).""" 1259 1260 nntp_version = 2 1261 handler_class = NNTPv2Handler 1262 1263 def test_caps(self): 1264 caps = self.server.getcapabilities() 1265 self.assertEqual(caps, { 1266 'VERSION': ['2', '3'], 1267 'IMPLEMENTATION': ['INN', '2.5.1'], 1268 'AUTHINFO': ['USER'], 1269 'HDR': [], 1270 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS', 1271 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'], 1272 'OVER': [], 1273 'POST': [], 1274 'READER': [], 1275 }) 1276 self.assertEqual(self.server.nntp_version, 3) 1277 self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1') 1278 1279 1280class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase): 1281 """Tests a probably NNTP v2 server with capabilities only after login.""" 1282 1283 nntp_version = 2 1284 handler_class = CapsAfterLoginNNTPv2Handler 1285 1286 def test_caps_only_after_login(self): 1287 self.assertEqual(self.server._caps, {}) 1288 self.server.login('testuser', 'testpw') 1289 self.assertIn('VERSION', self.server._caps) 1290 1291 1292class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin, 1293 unittest.TestCase): 1294 """Same tests as for v2 but we tell NTTP to send MODE READER to a server 1295 that isn't in READER mode by default.""" 1296 1297 nntp_version = 2 1298 handler_class = ModeSwitchingNNTPv2Handler 1299 1300 def test_we_are_in_reader_mode_after_connect(self): 1301 self.assertIn('READER', self.server._caps) 1302 1303 1304class MiscTests(unittest.TestCase): 1305 1306 def test_decode_header(self): 1307 def gives(a, b): 1308 self.assertEqual(nntplib.decode_header(a), b) 1309 gives("" , "") 1310 gives("a plain header", "a plain header") 1311 gives(" with extra spaces ", " with extra spaces ") 1312 gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python") 1313 gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?=" 1314 " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=", 1315 "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées") 1316 gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=", 1317 "Re: problème de matrice") 1318 # A natively utf-8 header (found in the real world!) 1319 gives("Re: Message d'erreur incompréhensible (par moi)", 1320 "Re: Message d'erreur incompréhensible (par moi)") 1321 1322 def test_parse_overview_fmt(self): 1323 # The minimal (default) response 1324 lines = ["Subject:", "From:", "Date:", "Message-ID:", 1325 "References:", ":bytes", ":lines"] 1326 self.assertEqual(nntplib._parse_overview_fmt(lines), 1327 ["subject", "from", "date", "message-id", "references", 1328 ":bytes", ":lines"]) 1329 # The minimal response using alternative names 1330 lines = ["Subject:", "From:", "Date:", "Message-ID:", 1331 "References:", "Bytes:", "Lines:"] 1332 self.assertEqual(nntplib._parse_overview_fmt(lines), 1333 ["subject", "from", "date", "message-id", "references", 1334 ":bytes", ":lines"]) 1335 # Variations in casing 1336 lines = ["subject:", "FROM:", "DaTe:", "message-ID:", 1337 "References:", "BYTES:", "Lines:"] 1338 self.assertEqual(nntplib._parse_overview_fmt(lines), 1339 ["subject", "from", "date", "message-id", "references", 1340 ":bytes", ":lines"]) 1341 # First example from RFC 3977 1342 lines = ["Subject:", "From:", "Date:", "Message-ID:", 1343 "References:", ":bytes", ":lines", "Xref:full", 1344 "Distribution:full"] 1345 self.assertEqual(nntplib._parse_overview_fmt(lines), 1346 ["subject", "from", "date", "message-id", "references", 1347 ":bytes", ":lines", "xref", "distribution"]) 1348 # Second example from RFC 3977 1349 lines = ["Subject:", "From:", "Date:", "Message-ID:", 1350 "References:", "Bytes:", "Lines:", "Xref:FULL", 1351 "Distribution:FULL"] 1352 self.assertEqual(nntplib._parse_overview_fmt(lines), 1353 ["subject", "from", "date", "message-id", "references", 1354 ":bytes", ":lines", "xref", "distribution"]) 1355 # A classic response from INN 1356 lines = ["Subject:", "From:", "Date:", "Message-ID:", 1357 "References:", "Bytes:", "Lines:", "Xref:full"] 1358 self.assertEqual(nntplib._parse_overview_fmt(lines), 1359 ["subject", "from", "date", "message-id", "references", 1360 ":bytes", ":lines", "xref"]) 1361 1362 def test_parse_overview(self): 1363 fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"] 1364 # First example from RFC 3977 1365 lines = [ 1366 '3000234\tI am just a test article\t"Demo User" ' 1367 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t' 1368 '<45223423@example.com>\t<45454@example.net>\t1234\t' 1369 '17\tXref: news.example.com misc.test:3000363', 1370 ] 1371 overview = nntplib._parse_overview(lines, fmt) 1372 (art_num, fields), = overview 1373 self.assertEqual(art_num, 3000234) 1374 self.assertEqual(fields, { 1375 'subject': 'I am just a test article', 1376 'from': '"Demo User" <nobody@example.com>', 1377 'date': '6 Oct 1998 04:38:40 -0500', 1378 'message-id': '<45223423@example.com>', 1379 'references': '<45454@example.net>', 1380 ':bytes': '1234', 1381 ':lines': '17', 1382 'xref': 'news.example.com misc.test:3000363', 1383 }) 1384 # Second example; here the "Xref" field is totally absent (including 1385 # the header name) and comes out as None 1386 lines = [ 1387 '3000234\tI am just a test article\t"Demo User" ' 1388 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t' 1389 '<45223423@example.com>\t<45454@example.net>\t1234\t' 1390 '17\t\t', 1391 ] 1392 overview = nntplib._parse_overview(lines, fmt) 1393 (art_num, fields), = overview 1394 self.assertEqual(fields['xref'], None) 1395 # Third example; the "Xref" is an empty string, while "references" 1396 # is a single space. 1397 lines = [ 1398 '3000234\tI am just a test article\t"Demo User" ' 1399 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t' 1400 '<45223423@example.com>\t \t1234\t' 1401 '17\tXref: \t', 1402 ] 1403 overview = nntplib._parse_overview(lines, fmt) 1404 (art_num, fields), = overview 1405 self.assertEqual(fields['references'], ' ') 1406 self.assertEqual(fields['xref'], '') 1407 1408 def test_parse_datetime(self): 1409 def gives(a, b, *c): 1410 self.assertEqual(nntplib._parse_datetime(a, b), 1411 datetime.datetime(*c)) 1412 # Output of DATE command 1413 gives("19990623135624", None, 1999, 6, 23, 13, 56, 24) 1414 # Variations 1415 gives("19990623", "135624", 1999, 6, 23, 13, 56, 24) 1416 gives("990623", "135624", 1999, 6, 23, 13, 56, 24) 1417 gives("090623", "135624", 2009, 6, 23, 13, 56, 24) 1418 1419 def test_unparse_datetime(self): 1420 # Test non-legacy mode 1421 # 1) with a datetime 1422 def gives(y, M, d, h, m, s, date_str, time_str): 1423 dt = datetime.datetime(y, M, d, h, m, s) 1424 self.assertEqual(nntplib._unparse_datetime(dt), 1425 (date_str, time_str)) 1426 self.assertEqual(nntplib._unparse_datetime(dt, False), 1427 (date_str, time_str)) 1428 gives(1999, 6, 23, 13, 56, 24, "19990623", "135624") 1429 gives(2000, 6, 23, 13, 56, 24, "20000623", "135624") 1430 gives(2010, 6, 5, 1, 2, 3, "20100605", "010203") 1431 # 2) with a date 1432 def gives(y, M, d, date_str, time_str): 1433 dt = datetime.date(y, M, d) 1434 self.assertEqual(nntplib._unparse_datetime(dt), 1435 (date_str, time_str)) 1436 self.assertEqual(nntplib._unparse_datetime(dt, False), 1437 (date_str, time_str)) 1438 gives(1999, 6, 23, "19990623", "000000") 1439 gives(2000, 6, 23, "20000623", "000000") 1440 gives(2010, 6, 5, "20100605", "000000") 1441 1442 def test_unparse_datetime_legacy(self): 1443 # Test legacy mode (RFC 977) 1444 # 1) with a datetime 1445 def gives(y, M, d, h, m, s, date_str, time_str): 1446 dt = datetime.datetime(y, M, d, h, m, s) 1447 self.assertEqual(nntplib._unparse_datetime(dt, True), 1448 (date_str, time_str)) 1449 gives(1999, 6, 23, 13, 56, 24, "990623", "135624") 1450 gives(2000, 6, 23, 13, 56, 24, "000623", "135624") 1451 gives(2010, 6, 5, 1, 2, 3, "100605", "010203") 1452 # 2) with a date 1453 def gives(y, M, d, date_str, time_str): 1454 dt = datetime.date(y, M, d) 1455 self.assertEqual(nntplib._unparse_datetime(dt, True), 1456 (date_str, time_str)) 1457 gives(1999, 6, 23, "990623", "000000") 1458 gives(2000, 6, 23, "000623", "000000") 1459 gives(2010, 6, 5, "100605", "000000") 1460 1461 @unittest.skipUnless(ssl, 'requires SSL support') 1462 def test_ssl_support(self): 1463 self.assertTrue(hasattr(nntplib, 'NNTP_SSL')) 1464 1465 1466class PublicAPITests(unittest.TestCase): 1467 """Ensures that the correct values are exposed in the public API.""" 1468 1469 def test_module_all_attribute(self): 1470 self.assertTrue(hasattr(nntplib, '__all__')) 1471 target_api = ['NNTP', 'NNTPError', 'NNTPReplyError', 1472 'NNTPTemporaryError', 'NNTPPermanentError', 1473 'NNTPProtocolError', 'NNTPDataError', 'decode_header'] 1474 if ssl is not None: 1475 target_api.append('NNTP_SSL') 1476 self.assertEqual(set(nntplib.__all__), set(target_api)) 1477 1478class MockSocketTests(unittest.TestCase): 1479 """Tests involving a mock socket object 1480 1481 Used where the _NNTPServerIO file object is not enough.""" 1482 1483 nntp_class = nntplib.NNTP 1484 1485 def check_constructor_error_conditions( 1486 self, handler_class, 1487 expected_error_type, expected_error_msg, 1488 login=None, password=None): 1489 1490 class mock_socket_module: 1491 def create_connection(address, timeout): 1492 return MockSocket() 1493 1494 class MockSocket: 1495 def close(self): 1496 nonlocal socket_closed 1497 socket_closed = True 1498 1499 def makefile(socket, mode): 1500 handler = handler_class() 1501 _, file = make_mock_file(handler) 1502 files.append(file) 1503 return file 1504 1505 socket_closed = False 1506 files = [] 1507 with patch('nntplib.socket', mock_socket_module), \ 1508 self.assertRaisesRegex(expected_error_type, expected_error_msg): 1509 self.nntp_class('dummy', user=login, password=password) 1510 self.assertTrue(socket_closed) 1511 for f in files: 1512 self.assertTrue(f.closed) 1513 1514 def test_bad_welcome(self): 1515 #Test a bad welcome message 1516 class Handler(NNTPv1Handler): 1517 welcome = 'Bad Welcome' 1518 self.check_constructor_error_conditions( 1519 Handler, nntplib.NNTPProtocolError, Handler.welcome) 1520 1521 def test_service_temporarily_unavailable(self): 1522 #Test service temporarily unavailable 1523 class Handler(NNTPv1Handler): 1524 welcome = '400 Service temporarily unavailable' 1525 self.check_constructor_error_conditions( 1526 Handler, nntplib.NNTPTemporaryError, Handler.welcome) 1527 1528 def test_service_permanently_unavailable(self): 1529 #Test service permanently unavailable 1530 class Handler(NNTPv1Handler): 1531 welcome = '502 Service permanently unavailable' 1532 self.check_constructor_error_conditions( 1533 Handler, nntplib.NNTPPermanentError, Handler.welcome) 1534 1535 def test_bad_capabilities(self): 1536 #Test a bad capabilities response 1537 class Handler(NNTPv1Handler): 1538 def handle_CAPABILITIES(self): 1539 self.push_lit(capabilities_response) 1540 capabilities_response = '201 bad capability' 1541 self.check_constructor_error_conditions( 1542 Handler, nntplib.NNTPReplyError, capabilities_response) 1543 1544 def test_login_aborted(self): 1545 #Test a bad authinfo response 1546 login = 't@e.com' 1547 password = 'python' 1548 class Handler(NNTPv1Handler): 1549 def handle_AUTHINFO(self, *args): 1550 self.push_lit(authinfo_response) 1551 authinfo_response = '503 Mechanism not recognized' 1552 self.check_constructor_error_conditions( 1553 Handler, nntplib.NNTPPermanentError, authinfo_response, 1554 login, password) 1555 1556class bypass_context: 1557 """Bypass encryption and actual SSL module""" 1558 def wrap_socket(sock, **args): 1559 return sock 1560 1561@unittest.skipUnless(ssl, 'requires SSL support') 1562class MockSslTests(MockSocketTests): 1563 @staticmethod 1564 def nntp_class(*pos, **kw): 1565 return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw) 1566 1567 1568class LocalServerTests(unittest.TestCase): 1569 def setUp(self): 1570 sock = socket.socket() 1571 port = socket_helper.bind_port(sock) 1572 sock.listen() 1573 self.background = threading.Thread( 1574 target=self.run_server, args=(sock,)) 1575 self.background.start() 1576 self.addCleanup(self.background.join) 1577 1578 self.nntp = NNTP(socket_helper.HOST, port, usenetrc=False).__enter__() 1579 self.addCleanup(self.nntp.__exit__, None, None, None) 1580 1581 def run_server(self, sock): 1582 # Could be generalized to handle more commands in separate methods 1583 with sock: 1584 [client, _] = sock.accept() 1585 with contextlib.ExitStack() as cleanup: 1586 cleanup.enter_context(client) 1587 reader = cleanup.enter_context(client.makefile('rb')) 1588 client.sendall(b'200 Server ready\r\n') 1589 while True: 1590 cmd = reader.readline() 1591 if cmd == b'CAPABILITIES\r\n': 1592 client.sendall( 1593 b'101 Capability list:\r\n' 1594 b'VERSION 2\r\n' 1595 b'STARTTLS\r\n' 1596 b'.\r\n' 1597 ) 1598 elif cmd == b'STARTTLS\r\n': 1599 reader.close() 1600 client.sendall(b'382 Begin TLS negotiation now\r\n') 1601 context = ssl.SSLContext() 1602 context.load_cert_chain(certfile) 1603 client = context.wrap_socket( 1604 client, server_side=True) 1605 cleanup.enter_context(client) 1606 reader = cleanup.enter_context(client.makefile('rb')) 1607 elif cmd == b'QUIT\r\n': 1608 client.sendall(b'205 Bye!\r\n') 1609 break 1610 else: 1611 raise ValueError('Unexpected command {!r}'.format(cmd)) 1612 1613 @unittest.skipUnless(ssl, 'requires SSL support') 1614 def test_starttls(self): 1615 file = self.nntp.file 1616 sock = self.nntp.sock 1617 self.nntp.starttls() 1618 # Check that the socket and internal pseudo-file really were 1619 # changed. 1620 self.assertNotEqual(file, self.nntp.file) 1621 self.assertNotEqual(sock, self.nntp.sock) 1622 # Check that the new socket really is an SSL one 1623 self.assertIsInstance(self.nntp.sock, ssl.SSLSocket) 1624 # Check that trying starttls when it's already active fails. 1625 self.assertRaises(ValueError, self.nntp.starttls) 1626 1627 1628if __name__ == "__main__": 1629 unittest.main() 1630