1import io
2import socket
3import datetime
4import textwrap
5import unittest
6import functools
7import contextlib
8import nntplib
9import os.path
10import re
11import threading
12
13from test import support
14from test.support import socket_helper
15from nntplib import NNTP, GroupInfo
16from unittest.mock import patch
17try:
18    import ssl
19except ImportError:
20    ssl = None
21
22
# Combined key/certificate file that sits next to this test module.
certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem')

# Always expose an ``SSLError`` name so ``except SSLError`` clauses work
# whether or not the ssl module could be imported.
if ssl is None:
    class SSLError(Exception):
        """Stand-in exception type used when SSL support is unavailable."""
        reason = "This will never be raised."
else:
    SSLError = ssl.SSLError
31
32# TODO:
33# - test the `file` arg to more commands
34# - test error conditions
35# - test auth and `usenetrc`
36
37
class NetworkedNNTPTestsMixin:
    """Tests exercising a live NNTP server over the network.

    The tests read ``self.server`` (a connected client) together with the
    ``NNTP_HOST``, ``NNTP_CLASS``, ``GROUP_NAME`` and ``GROUP_PAT``
    attributes, all of which must be supplied by the concrete test class.

    Test methods carry comments rather than docstrings so that unittest
    keeps reporting them by name.  After the class body, ``wrap_methods()``
    is called once so every ``test_*`` method runs inside
    ``socket_helper.transient_internet()``.
    """

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertIsInstance(welcome, str)

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertIsInstance(line, str)

    def test_list(self):
        resp, groups = self.server.list()
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn("Python", desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(
            resp.startswith("215 ") or resp.startswith("282 "), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check.  The upper bound follows the local clock (with one
        # year of slack) instead of being a fixed year, so the test does not
        # start failing by itself once that year has gone by.
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, datetime.date.today().year + 1)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if not lines:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    @unittest.skipIf(True, 'temporarily skipped until a permanent solution'
                           ' is found for issue #28971')
    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertIsInstance(line[1], str)

    def check_article_resp(self, resp, article, art_num=None):
        # Shared checks for the ArticleInfo returned by HEAD/BODY/ARTICLE.
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    @unittest.skipIf(True, "FIXME: see bpo-32128")
    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)

        # Tolerate running the tests from behind a NNTP virus checker
        def is_blacklisted(line):
            return line.startswith(b'X-Antivirus')

        filtered_head_lines = [line for line in head.lines
                               if not is_blacklisted(line)]
        filtered_lines = [line for line in article.lines
                          if not is_blacklisted(line)]
        self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities. Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)
        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think. Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        try:
            self.server.quit()
        finally:
            # Clear the shared connection even if QUIT failed.
            cls.server = None

    @classmethod
    def wrap_methods(cls):
        # Wrap all methods in a transient_internet() exception catcher
        # XXX put a generic version in test.support?
        def wrap_meth(meth):
            @functools.wraps(meth)
            def wrapped(self):
                with socket_helper.transient_internet(self.NNTP_HOST):
                    meth(self)
            return wrapped
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            meth = getattr(cls, name)
            if not callable(meth):
                continue
            # Need to use a closure so that meth remains bound to its current
            # value
            setattr(cls, name, wrap_meth(meth))

    def test_timeout(self):
        with self.assertRaises(ValueError):
            self.NNTP_CLASS(self.NNTP_HOST, timeout=0, usenetrc=False)

    def test_with_statement(self):
        def is_connected():
            if not hasattr(server, 'file'):
                return False
            try:
                server.help()
            except (OSError, EOFError):
                return False
            return True

        try:
            server = self.NNTP_CLASS(self.NNTP_HOST,
                                     timeout=support.INTERNET_TIMEOUT,
                                     usenetrc=False)
            with server:
                self.assertTrue(is_connected())
                self.assertTrue(server.help())
            self.assertFalse(is_connected())

            server = self.NNTP_CLASS(self.NNTP_HOST,
                                     timeout=support.INTERNET_TIMEOUT,
                                     usenetrc=False)
            with server:
                server.quit()
            self.assertFalse(is_connected())
        except SSLError as ssl_err:
            # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
            # ``reason`` can be None (or absent) on some SSL errors; fall
            # back to '' so the SSL error itself is re-raised rather than a
            # TypeError/AttributeError from the check below.
            reason = getattr(ssl_err, 'reason', None) or ''
            if re.search(r'(?i)KEY.TOO.SMALL', reason):
                raise unittest.SkipTest(f"Got {ssl_err} connecting "
                                        f"to {self.NNTP_HOST!r}")
            raise


NetworkedNNTPTestsMixin.wrap_methods()
300
301
# Exception types treated as "end of file on connect"; the SSL flavour is
# only added when the ssl module is available.
EOF_ERRORS = (EOFError,) if ssl is None else (EOFError, ssl.SSLEOFError)
305
306
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Run the networked test mixin against a plain-text NNTP server.

    One connection is opened in ``setUpClass`` and stored on the class as
    ``server``; ``tearDownClass`` closes it unless a test already reset the
    attribute to None.
    """

    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'

    NNTP_CLASS = NNTP

    @classmethod
    def setUpClass(cls):
        # Requires the "network" resource; raises unittest.SkipTest when the
        # server hangs up on connect or only offers a too-small DH key.
        support.requires("network")
        with socket_helper.transient_internet(cls.NNTP_HOST):
            try:
                cls.server = cls.NNTP_CLASS(cls.NNTP_HOST,
                                            timeout=support.INTERNET_TIMEOUT,
                                            usenetrc=False)
            except EOF_ERRORS:
                # This clause has to come before the SSLError one:
                # ssl.SSLEOFError derives from ssl.SSLError, so with the
                # opposite order an SSL EOF would never reach the skip.
                raise unittest.SkipTest(f"{cls} got EOF error on connecting "
                                        f"to {cls.NNTP_HOST!r}")
            except SSLError as ssl_err:
                # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
                # ``reason`` can be None (or absent); use '' in that case so
                # the SSL error is re-raised instead of a TypeError.
                reason = getattr(ssl_err, 'reason', None) or ''
                if re.search(r'(?i)KEY.TOO.SMALL', reason):
                    raise unittest.SkipTest(f"{cls} got {ssl_err} connecting "
                                            f"to {cls.NNTP_HOST!r}")
                raise

    @classmethod
    def tearDownClass(cls):
        if cls.server is not None:
            cls.server.quit()
337
@unittest.skipUnless(ssl, 'requires SSL support')
class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
    """Same networked tests, run through ``nntplib.NNTP_SSL`` when present."""

    # None when this build of nntplib has no NNTP_SSL class.
    NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)

    # Technical limits for this public NNTP server (see http://www.aioe.org):
    # "Only two concurrent connections per IP address are allowed and
    # 400 connections per day are accepted from each IP address."
    NNTP_HOST = 'nntp.aioe.org'
    GROUP_NAME = 'comp.lang.python'
    GROUP_PAT = 'comp.lang.*'

    # Switched off: it produces too much data.
    test_list = None

    # Switched off: the connection will already be encrypted.
    test_starttls = None
356
357
358#
359# Non-networked tests using a local server (or something mocking it).
360#
361
362class _NNTPServerIO(io.RawIOBase):
363    """A raw IO object allowing NNTP commands to be received and processed
364    by a handler.  The handler can push responses which can then be read
365    from the IO object."""
366
367    def __init__(self, handler):
368        io.RawIOBase.__init__(self)
369        # The channel from the client
370        self.c2s = io.BytesIO()
371        # The channel to the client
372        self.s2c = io.BytesIO()
373        self.handler = handler
374        self.handler.start(self.c2s.readline, self.push_data)
375
376    def readable(self):
377        return True
378
379    def writable(self):
380        return True
381
382    def push_data(self, data):
383        """Push (buffer) some data to send to the client."""
384        pos = self.s2c.tell()
385        self.s2c.seek(0, 2)
386        self.s2c.write(data)
387        self.s2c.seek(pos)
388
389    def write(self, b):
390        """The client sends us some data"""
391        pos = self.c2s.tell()
392        self.c2s.write(b)
393        self.c2s.seek(pos)
394        self.handler.process_pending()
395        return len(b)
396
397    def readinto(self, buf):
398        """The client wants to read a response"""
399        self.handler.process_pending()
400        b = self.s2c.read(len(buf))
401        n = len(b)
402        buf[:n] = b
403        return n
404
405
def make_mock_file(handler):
    """Create the in-memory transport driven by *handler*.

    Returns a ``(raw, file)`` tuple: the ``_NNTPServerIO`` object and a
    buffered file layered on top of it.
    """
    raw = _NNTPServerIO(handler)
    # Using BufferedRWPair instead of BufferedRandom ensures the file
    # isn't seekable.
    buffered = io.BufferedRWPair(raw, raw)
    return (raw, buffered)
412
413
class NNTPServer(nntplib.NNTP):
    """NNTP client bound to an already-open file instead of a socket."""

    def __init__(self, f, host, readermode=None):
        # No socket is created here: store the host and the supplied file,
        # then run the base class initialisation on that file.
        self.host = host
        self.file = f
        self._base_init(readermode)

    def _close(self):
        # Close the file, then drop the attribute.
        stream = self.file
        stream.close()
        del self.file
424
425
class MockedNNTPTestsMixin:
    """Fixture giving each test an NNTP client wired to a mock handler.

    ``setUp`` stores the handler instance, the raw IO object and the client
    on ``self.handler``, ``self.sio`` and ``self.server`` respectively.
    """

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        # Extra positional/keyword arguments are forwarded to NNTPServer.
        handler = self.handler_class()
        self.handler = handler
        raw, buffered = make_mock_file(handler)
        self.sio = raw
        server = NNTPServer(buffered, 'test.server', *args, **kwargs)
        self.server = server
        return server
443
444
class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
    """Mocked fixture whose client is created with ``readermode=True``."""

    def setUp(self):
        super().setUp()
        # Build the server again, this time requesting reader mode.
        self.make_server(readermode=True)
449
450
class NNTPv1Handler:
    """A handler for RFC 977

    The handler is driven through ``start()`` and ``process_pending()``.
    Each received command line is dispatched to the ``handle_<COMMAND>``
    method (command name upper-cased), with the remaining whitespace
    separated words passed as positional arguments.  Responses are canned
    and are sent with ``push_lit()`` / ``push_data()``.
    """

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Attach the handler to its transport and send the welcome line.

        *readline* returns the next raw line from the client (empty when
        nothing is pending); *push_data* queues bytes for the client.
        """
        self.in_body = False
        self.allow_posting = True
        self._readline = readline
        self._push_data = push_data
        self._logged_in = False
        self._user_sent = False
        # State used while collecting a posted body (see process_pending).
        self.body = None
        self.body_callback = None
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Consume every line currently available from the client.

        If a command asked for a body (``expect_body()``), lines are first
        collected up to and including the terminating ``b".\\r\\n"`` and the
        same handler method is called again with ``body=<list of lines>``;
        if the terminator has not arrived yet the method returns and
        resumes on the next call.

        Raises ValueError for a line not ending in CRLF, for an empty
        command line, and when a handler method raises (chained).
        """
        if self.in_body:
            while True:
                line = self._readline()
                if not line:
                    # Body incomplete so far; wait for more data.
                    return
                self.body.append(line)
                if line == b".\r\n":
                    break
            try:
                meth, tokens = self.body_callback
                meth(*tokens, body=self.body)
            finally:
                self.body_callback = None
                self.body = None
                self.in_body = False
        while True:
            line = self._decode(self._readline())
            if not line:
                return
            if not line.endswith("\r\n"):
                raise ValueError(f"line doesn't end with \\r\\n: {line!r}")
            line = line[:-2]
            words = line.split()
            if not words:
                # Report a blank command explicitly rather than through an
                # opaque tuple-unpacking error.
                raise ValueError(f"empty command line: {line!r}")
            cmd, *tokens = words
            meth = getattr(self, "handle_" + cmd.upper(), None)
            if meth is None:
                self.handle_unknown()
            else:
                try:
                    meth(*tokens)
                except Exception as e:
                    raise ValueError(f"command failed: {line!r}") from e
                else:
                    if self.in_body:
                        # Remember who to call back once the body is in.
                        self.body_callback = meth, tokens
                        self.body = []

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal

        The text is dedented, its lines are joined with CRLF, a final CRLF
        is appended and the result is sent UTF-8 encoded.
        """
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit(f"411 No such group {group}")

    def handle_HELP(self):
        self.push_lit("""\
            100 Legal commands
              authinfo user Name|pass Password|generic <prog> <args>
              date
              help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "ACTIVE":
            if param == "*distutils*":
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    .""")
            else:
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
            and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
            and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == 'comp.lang.python' and
              date_str in ('20100101', '100101') and
              time_str == '090000'):
            # A single very long line, sent without any status code.
            self.push_lit('too long line' * 3000 +
                          '\n.')
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.io gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\t"
                    "\n"
                # A UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        # Called twice per posting: first without a body (to invite or
        # refuse it), then again with the collected body lines.
        if body is None:
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        # Same two-step protocol as handle_POST.
        if body is None:
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def _push_article_part(self, code, message_spec, payload):
        """Answer ARTICLE/HEAD/BODY: status line with *code*, then *payload*.

        Sends "430 No Such Article Found" (and nothing else) when
        *message_spec* is not one of the known article designators.
        """
        if message_spec is None:
            self.push_lit(f"{code} 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit(f"{code} 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit(f"{code} 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(payload)
        self.push_lit(".")

    def handle_ARTICLE(self, message_spec=None):
        self._push_article_part("220", message_spec, self.sample_article)

    def handle_HEAD(self, message_spec=None):
        self._push_article_part("221", message_spec, self.sample_head)

    def handle_BODY(self, message_spec=None):
        self._push_article_part("222", message_spec, self.sample_body)

    def handle_AUTHINFO(self, cred_type, data):
        # *data* (the user name or password) is accepted but not checked.
        if self._logged_in:
            self.push_lit('502 Already Logged In')
        elif cred_type == 'user':
            if self._user_sent:
                self.push_lit('482 User Credential Already Sent')
            else:
                self.push_lit('381 Password Required')
                self._user_sent = True
        elif cred_type == 'pass':
            self.push_lit('281 Login Successful')
            self._logged_in = True
        else:
            raise Exception(f'Unknown cred type {cred_type}')
774
775
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        # AUTHINFO USER is only listed while nobody is logged in.
        template = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1{}
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            READER
            ."""

        extra = '' if self._logged_in else '\n            AUTHINFO USER'
        self.push_lit(template.format(extra))

    def handle_MODE(self, _):
        # READER is advertised above, so the client should never send MODE.
        raise Exception('MODE READER sent despite READER has been advertised')

    def handle_OVER(self, message_spec=None):
        # OVER is answered exactly like XOVER.
        return self.handle_XOVER(message_spec)
801
802
class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
    """A handler that allows CAPABILITIES only after login"""

    def handle_CAPABILITIES(self):
        # Answer normally once logged in, otherwise refuse with a 480.
        if self._logged_in:
            super().handle_CAPABILITIES()
        else:
            self.push_lit('480 You must log in.')
811
812
class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
    """A server that starts in transit mode"""

    def __init__(self):
        # Becomes True once the MODE command has been handled.
        self._switched = False

    def handle_CAPABILITIES(self):
        # Advertise MODE-READER before the switch and READER after it.
        template = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            {}READER
            ."""
        prefix = '' if self._switched else 'MODE-'
        self.push_lit(template.format(prefix))

    def handle_MODE(self, what):
        # Only a single switch, and only to 'reader', is expected.
        assert not self._switched and what == 'reader'
        self._switched = True
        self.push_lit('200 Posting allowed')
839
840
841class NNTPv1v2TestsMixin:
842
    def setUp(self):
        # Nothing is added here; set-up is delegated to the next class in
        # the MRO.
        super().setUp()
845
846    def test_welcome(self):
847        self.assertEqual(self.server.welcome, self.handler.welcome)
848
849    def test_authinfo(self):
850        if self.nntp_version == 2:
851            self.assertIn('AUTHINFO', self.server._caps)
852        self.server.login('testuser', 'testpw')
853        # if AUTHINFO is gone from _caps we also know that getcapabilities()
854        # has been called after login as it should
855        self.assertNotIn('AUTHINFO', self.server._caps)
856
857    def test_date(self):
858        resp, date = self.server.date()
859        self.assertEqual(resp, "111 20100914001155")
860        self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
861
862    def test_quit(self):
863        self.assertFalse(self.sio.closed)
864        resp = self.server.quit()
865        self.assertEqual(resp, "205 Bye!")
866        self.assertTrue(self.sio.closed)
867
868    def test_help(self):
869        resp, help = self.server.help()
870        self.assertEqual(resp, "100 Legal commands")
871        self.assertEqual(help, [
872            '  authinfo user Name|pass Password|generic <prog> <args>',
873            '  date',
874            '  help',
875            'Report problems to <root@example.org>',
876        ])
877
878    def test_list(self):
879        resp, groups = self.server.list()
880        self.assertEqual(len(groups), 6)
881        g = groups[1]
882        self.assertEqual(g,
883            GroupInfo("comp.lang.python.announce", "0000001153",
884                      "0000000993", "m"))
885        resp, groups = self.server.list("*distutils*")
886        self.assertEqual(len(groups), 2)
887        g = groups[0]
888        self.assertEqual(g,
889            GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
890                      "0000000001", "m"))
891
892    def test_stat(self):
893        resp, art_num, message_id = self.server.stat(3000234)
894        self.assertEqual(resp, "223 3000234 <45223423@example.com>")
895        self.assertEqual(art_num, 3000234)
896        self.assertEqual(message_id, "<45223423@example.com>")
897        resp, art_num, message_id = self.server.stat("<45223423@example.com>")
898        self.assertEqual(resp, "223 0 <45223423@example.com>")
899        self.assertEqual(art_num, 0)
900        self.assertEqual(message_id, "<45223423@example.com>")
901        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
902            self.server.stat("<non.existent.id>")
903        self.assertEqual(cm.exception.response, "430 No Such Article Found")
904        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
905            self.server.stat()
906        self.assertEqual(cm.exception.response, "412 No newsgroup selected")
907
908    def test_next(self):
909        resp, art_num, message_id = self.server.next()
910        self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
911        self.assertEqual(art_num, 3000237)
912        self.assertEqual(message_id, "<668929@example.org>")
913
914    def test_last(self):
915        resp, art_num, message_id = self.server.last()
916        self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
917        self.assertEqual(art_num, 3000234)
918        self.assertEqual(message_id, "<45223423@example.com>")
919
920    def test_description(self):
921        desc = self.server.description("comp.lang.python")
922        self.assertEqual(desc, "The Python computer language.")
923        desc = self.server.description("comp.lang.pythonx")
924        self.assertEqual(desc, "")
925
926    def test_descriptions(self):
927        resp, groups = self.server.descriptions("comp.lang.python")
928        self.assertEqual(resp, '215 Descriptions in form "group description".')
929        self.assertEqual(groups, {
930            "comp.lang.python": "The Python computer language.",
931            })
932        resp, groups = self.server.descriptions("comp.lang.python*")
933        self.assertEqual(groups, {
934            "comp.lang.python": "The Python computer language.",
935            "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
936            })
937        resp, groups = self.server.descriptions("comp.lang.pythonx")
938        self.assertEqual(groups, {})
939
940    def test_group(self):
941        resp, count, first, last, group = self.server.group("fr.comp.lang.python")
942        self.assertTrue(resp.startswith("211 "), resp)
943        self.assertEqual(first, 761)
944        self.assertEqual(last, 1265)
945        self.assertEqual(count, 486)
946        self.assertEqual(group, "fr.comp.lang.python")
947        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
948            self.server.group("comp.lang.python.devel")
949        exc = cm.exception
950        self.assertTrue(exc.response.startswith("411 No such group"),
951                        exc.response)
952
953    def test_newnews(self):
954        # NEWNEWS comp.lang.python [20]100913 082004
955        dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
956        resp, ids = self.server.newnews("comp.lang.python", dt)
957        expected = (
958            "230 list of newsarticles (NNTP v{0}) "
959            "created after Mon Sep 13 08:20:04 2010 follows"
960            ).format(self.nntp_version)
961        self.assertEqual(resp, expected)
962        self.assertEqual(ids, [
963            "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
964            "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
965            ])
966        # NEWNEWS fr.comp.lang.python [20]100913 082004
967        dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
968        resp, ids = self.server.newnews("fr.comp.lang.python", dt)
969        self.assertEqual(resp, "230 An empty list of newsarticles follows")
970        self.assertEqual(ids, [])
971
972    def _check_article_body(self, lines):
973        self.assertEqual(len(lines), 4)
974        self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
975        self.assertEqual(lines[-2], b"")
976        self.assertEqual(lines[-3], b".Here is a dot-starting line.")
977        self.assertEqual(lines[-4], b"This is just a test article.")
978
979    def _check_article_head(self, lines):
980        self.assertEqual(len(lines), 4)
981        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
982        self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
983
984    def _check_article_data(self, lines):
985        self.assertEqual(len(lines), 9)
986        self._check_article_head(lines[:4])
987        self._check_article_body(lines[-4:])
988        self.assertEqual(lines[4], b"")
989
990    def test_article(self):
991        # ARTICLE
992        resp, info = self.server.article()
993        self.assertEqual(resp, "220 3000237 <45223423@example.com>")
994        art_num, message_id, lines = info
995        self.assertEqual(art_num, 3000237)
996        self.assertEqual(message_id, "<45223423@example.com>")
997        self._check_article_data(lines)
998        # ARTICLE num
999        resp, info = self.server.article(3000234)
1000        self.assertEqual(resp, "220 3000234 <45223423@example.com>")
1001        art_num, message_id, lines = info
1002        self.assertEqual(art_num, 3000234)
1003        self.assertEqual(message_id, "<45223423@example.com>")
1004        self._check_article_data(lines)
1005        # ARTICLE id
1006        resp, info = self.server.article("<45223423@example.com>")
1007        self.assertEqual(resp, "220 0 <45223423@example.com>")
1008        art_num, message_id, lines = info
1009        self.assertEqual(art_num, 0)
1010        self.assertEqual(message_id, "<45223423@example.com>")
1011        self._check_article_data(lines)
1012        # Non-existent id
1013        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1014            self.server.article("<non-existent@example.com>")
1015        self.assertEqual(cm.exception.response, "430 No Such Article Found")
1016
1017    def test_article_file(self):
1018        # With a "file" argument
1019        f = io.BytesIO()
1020        resp, info = self.server.article(file=f)
1021        self.assertEqual(resp, "220 3000237 <45223423@example.com>")
1022        art_num, message_id, lines = info
1023        self.assertEqual(art_num, 3000237)
1024        self.assertEqual(message_id, "<45223423@example.com>")
1025        self.assertEqual(lines, [])
1026        data = f.getvalue()
1027        self.assertTrue(data.startswith(
1028            b'From: "Demo User" <nobody@example.net>\r\n'
1029            b'Subject: I am just a test article\r\n'
1030            ), ascii(data))
1031        self.assertTrue(data.endswith(
1032            b'This is just a test article.\r\n'
1033            b'.Here is a dot-starting line.\r\n'
1034            b'\r\n'
1035            b'-- Signed by Andr\xc3\xa9.\r\n'
1036            ), ascii(data))
1037
1038    def test_head(self):
1039        # HEAD
1040        resp, info = self.server.head()
1041        self.assertEqual(resp, "221 3000237 <45223423@example.com>")
1042        art_num, message_id, lines = info
1043        self.assertEqual(art_num, 3000237)
1044        self.assertEqual(message_id, "<45223423@example.com>")
1045        self._check_article_head(lines)
1046        # HEAD num
1047        resp, info = self.server.head(3000234)
1048        self.assertEqual(resp, "221 3000234 <45223423@example.com>")
1049        art_num, message_id, lines = info
1050        self.assertEqual(art_num, 3000234)
1051        self.assertEqual(message_id, "<45223423@example.com>")
1052        self._check_article_head(lines)
1053        # HEAD id
1054        resp, info = self.server.head("<45223423@example.com>")
1055        self.assertEqual(resp, "221 0 <45223423@example.com>")
1056        art_num, message_id, lines = info
1057        self.assertEqual(art_num, 0)
1058        self.assertEqual(message_id, "<45223423@example.com>")
1059        self._check_article_head(lines)
1060        # Non-existent id
1061        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1062            self.server.head("<non-existent@example.com>")
1063        self.assertEqual(cm.exception.response, "430 No Such Article Found")
1064
1065    def test_head_file(self):
1066        f = io.BytesIO()
1067        resp, info = self.server.head(file=f)
1068        self.assertEqual(resp, "221 3000237 <45223423@example.com>")
1069        art_num, message_id, lines = info
1070        self.assertEqual(art_num, 3000237)
1071        self.assertEqual(message_id, "<45223423@example.com>")
1072        self.assertEqual(lines, [])
1073        data = f.getvalue()
1074        self.assertTrue(data.startswith(
1075            b'From: "Demo User" <nobody@example.net>\r\n'
1076            b'Subject: I am just a test article\r\n'
1077            ), ascii(data))
1078        self.assertFalse(data.endswith(
1079            b'This is just a test article.\r\n'
1080            b'.Here is a dot-starting line.\r\n'
1081            b'\r\n'
1082            b'-- Signed by Andr\xc3\xa9.\r\n'
1083            ), ascii(data))
1084
1085    def test_body(self):
1086        # BODY
1087        resp, info = self.server.body()
1088        self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1089        art_num, message_id, lines = info
1090        self.assertEqual(art_num, 3000237)
1091        self.assertEqual(message_id, "<45223423@example.com>")
1092        self._check_article_body(lines)
1093        # BODY num
1094        resp, info = self.server.body(3000234)
1095        self.assertEqual(resp, "222 3000234 <45223423@example.com>")
1096        art_num, message_id, lines = info
1097        self.assertEqual(art_num, 3000234)
1098        self.assertEqual(message_id, "<45223423@example.com>")
1099        self._check_article_body(lines)
1100        # BODY id
1101        resp, info = self.server.body("<45223423@example.com>")
1102        self.assertEqual(resp, "222 0 <45223423@example.com>")
1103        art_num, message_id, lines = info
1104        self.assertEqual(art_num, 0)
1105        self.assertEqual(message_id, "<45223423@example.com>")
1106        self._check_article_body(lines)
1107        # Non-existent id
1108        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1109            self.server.body("<non-existent@example.com>")
1110        self.assertEqual(cm.exception.response, "430 No Such Article Found")
1111
1112    def test_body_file(self):
1113        f = io.BytesIO()
1114        resp, info = self.server.body(file=f)
1115        self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1116        art_num, message_id, lines = info
1117        self.assertEqual(art_num, 3000237)
1118        self.assertEqual(message_id, "<45223423@example.com>")
1119        self.assertEqual(lines, [])
1120        data = f.getvalue()
1121        self.assertFalse(data.startswith(
1122            b'From: "Demo User" <nobody@example.net>\r\n'
1123            b'Subject: I am just a test article\r\n'
1124            ), ascii(data))
1125        self.assertTrue(data.endswith(
1126            b'This is just a test article.\r\n'
1127            b'.Here is a dot-starting line.\r\n'
1128            b'\r\n'
1129            b'-- Signed by Andr\xc3\xa9.\r\n'
1130            ), ascii(data))
1131
1132    def check_over_xover_resp(self, resp, overviews):
1133        self.assertTrue(resp.startswith("224 "), resp)
1134        self.assertEqual(len(overviews), 3)
1135        art_num, over = overviews[0]
1136        self.assertEqual(art_num, 57)
1137        self.assertEqual(over, {
1138            "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
1139            "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
1140            "date": "Sat, 19 Jun 2010 18:04:08 -0400",
1141            "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
1142            "references": "<hvalf7$ort$1@dough.gmane.org>",
1143            ":bytes": "7103",
1144            ":lines": "16",
1145            "xref": "news.gmane.io gmane.comp.python.authors:57"
1146            })
1147        art_num, over = overviews[1]
1148        self.assertEqual(over["xref"], None)
1149        art_num, over = overviews[2]
1150        self.assertEqual(over["subject"],
1151                         "Re: Message d'erreur incompréhensible (par moi)")
1152
1153    def test_xover(self):
1154        resp, overviews = self.server.xover(57, 59)
1155        self.check_over_xover_resp(resp, overviews)
1156
1157    def test_over(self):
1158        # In NNTP "v1", this will fallback on XOVER
1159        resp, overviews = self.server.over((57, 59))
1160        self.check_over_xover_resp(resp, overviews)
1161
    # Raw article (four header lines, an empty line, four body lines) with
    # CRLF line endings; the POST and IHAVE tests send it to the server.
    # One body line starts with a dot.
    sample_post = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
        b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
        b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
        b'\r\n'
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )
1173
1174    def _check_posted_body(self):
1175        # Check the raw body as received by the server
1176        lines = self.handler.posted_body
1177        # One additional line for the "." terminator
1178        self.assertEqual(len(lines), 10)
1179        self.assertEqual(lines[-1], b'.\r\n')
1180        self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
1181        self.assertEqual(lines[-3], b'\r\n')
1182        self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
1183        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
1184
1185    def _check_post_ihave_sub(self, func, *args, file_factory):
1186        # First the prepared post with CRLF endings
1187        post = self.sample_post
1188        func_args = args + (file_factory(post),)
1189        self.handler.posted_body = None
1190        resp = func(*func_args)
1191        self._check_posted_body()
1192        # Then the same post with "normal" line endings - they should be
1193        # converted by NNTP.post and NNTP.ihave.
1194        post = self.sample_post.replace(b"\r\n", b"\n")
1195        func_args = args + (file_factory(post),)
1196        self.handler.posted_body = None
1197        resp = func(*func_args)
1198        self._check_posted_body()
1199        return resp
1200
1201    def check_post_ihave(self, func, success_resp, *args):
1202        # With a bytes object
1203        resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
1204        self.assertEqual(resp, success_resp)
1205        # With a bytearray object
1206        resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
1207        self.assertEqual(resp, success_resp)
1208        # With a file object
1209        resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
1210        self.assertEqual(resp, success_resp)
1211        # With an iterable of terminated lines
1212        def iterlines(b):
1213            return iter(b.splitlines(keepends=True))
1214        resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1215        self.assertEqual(resp, success_resp)
1216        # With an iterable of non-terminated lines
1217        def iterlines(b):
1218            return iter(b.splitlines(keepends=False))
1219        resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1220        self.assertEqual(resp, success_resp)
1221
1222    def test_post(self):
1223        self.check_post_ihave(self.server.post, "240 Article received OK")
1224        self.handler.allow_posting = False
1225        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1226            self.server.post(self.sample_post)
1227        self.assertEqual(cm.exception.response,
1228                         "440 Posting not permitted")
1229
1230    def test_ihave(self):
1231        self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
1232                              "<i.am.an.article.you.will.want@example.com>")
1233        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1234            self.server.ihave("<another.message.id>", self.sample_post)
1235        self.assertEqual(cm.exception.response,
1236                         "435 Article not wanted")
1237
1238    def test_too_long_lines(self):
1239        dt = datetime.datetime(2010, 1, 1, 9, 0, 0)
1240        self.assertRaises(nntplib.NNTPDataError,
1241                          self.server.newnews, "comp.lang.python", dt)
1242
1243
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v1 server (no capabilities)."""

    nntp_version = 1
    handler_class = NNTPv1Handler

    def test_caps(self):
        # Without capabilities the client reports an empty dict, protocol
        # version 1 and no implementation string.
        caps = self.server.getcapabilities()
        self.assertEqual(caps, {})
        self.assertEqual(self.server.nntp_version, 1)
        # Identity check: the attribute must be None itself
        self.assertIsNone(self.server.nntp_implementation)
1255
1256
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v2 server (with capabilities)."""

    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        # Each capability line becomes a key mapped to its list of
        # arguments
        list_keywords = ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                         'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT']
        reported = self.server.getcapabilities()
        expected = {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': list_keywords,
            'OVER': [],
            'POST': [],
            'READER': [],
        }
        self.assertEqual(reported, expected)
        # VERSION lists 2 and 3; the client is expected to report 3
        self.assertEqual(self.server.nntp_version, 3)
        self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
1278
1279
class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
    """Tests a probably NNTP v2 server with capabilities only after login."""

    nntp_version = 2
    handler_class = CapsAfterLoginNNTPv2Handler

    def test_caps_only_after_login(self):
        # No capabilities are known before authenticating
        caps_before = self.server._caps
        self.assertEqual(caps_before, {})
        self.server.login('testuser', 'testpw')
        # After the login the capability list has been fetched
        caps_after = self.server._caps
        self.assertIn('VERSION', caps_after)
1290
1291
class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin, unittest.TestCase):
    """Same tests as for v2 but we tell NNTP to send MODE READER to a server
    that isn't in READER mode by default."""

    nntp_version = 2
    handler_class = ModeSwitchingNNTPv2Handler

    def test_we_are_in_reader_mode_after_connect(self):
        # READER shows up in the capabilities once the mode was switched
        caps = self.server._caps
        self.assertIn('READER', caps)
1302
1303
1304class MiscTests(unittest.TestCase):
1305
1306    def test_decode_header(self):
1307        def gives(a, b):
1308            self.assertEqual(nntplib.decode_header(a), b)
1309        gives("" , "")
1310        gives("a plain header", "a plain header")
1311        gives(" with extra  spaces ", " with extra  spaces ")
1312        gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
1313        gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
1314              " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
1315              "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
1316        gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
1317              "Re: problème de matrice")
1318        # A natively utf-8 header (found in the real world!)
1319        gives("Re: Message d'erreur incompréhensible (par moi)",
1320              "Re: Message d'erreur incompréhensible (par moi)")
1321
1322    def test_parse_overview_fmt(self):
1323        # The minimal (default) response
1324        lines = ["Subject:", "From:", "Date:", "Message-ID:",
1325                 "References:", ":bytes", ":lines"]
1326        self.assertEqual(nntplib._parse_overview_fmt(lines),
1327            ["subject", "from", "date", "message-id", "references",
1328             ":bytes", ":lines"])
1329        # The minimal response using alternative names
1330        lines = ["Subject:", "From:", "Date:", "Message-ID:",
1331                 "References:", "Bytes:", "Lines:"]
1332        self.assertEqual(nntplib._parse_overview_fmt(lines),
1333            ["subject", "from", "date", "message-id", "references",
1334             ":bytes", ":lines"])
1335        # Variations in casing
1336        lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
1337                 "References:", "BYTES:", "Lines:"]
1338        self.assertEqual(nntplib._parse_overview_fmt(lines),
1339            ["subject", "from", "date", "message-id", "references",
1340             ":bytes", ":lines"])
1341        # First example from RFC 3977
1342        lines = ["Subject:", "From:", "Date:", "Message-ID:",
1343                 "References:", ":bytes", ":lines", "Xref:full",
1344                 "Distribution:full"]
1345        self.assertEqual(nntplib._parse_overview_fmt(lines),
1346            ["subject", "from", "date", "message-id", "references",
1347             ":bytes", ":lines", "xref", "distribution"])
1348        # Second example from RFC 3977
1349        lines = ["Subject:", "From:", "Date:", "Message-ID:",
1350                 "References:", "Bytes:", "Lines:", "Xref:FULL",
1351                 "Distribution:FULL"]
1352        self.assertEqual(nntplib._parse_overview_fmt(lines),
1353            ["subject", "from", "date", "message-id", "references",
1354             ":bytes", ":lines", "xref", "distribution"])
1355        # A classic response from INN
1356        lines = ["Subject:", "From:", "Date:", "Message-ID:",
1357                 "References:", "Bytes:", "Lines:", "Xref:full"]
1358        self.assertEqual(nntplib._parse_overview_fmt(lines),
1359            ["subject", "from", "date", "message-id", "references",
1360             ":bytes", ":lines", "xref"])
1361
1362    def test_parse_overview(self):
1363        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
1364        # First example from RFC 3977
1365        lines = [
1366            '3000234\tI am just a test article\t"Demo User" '
1367            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1368            '<45223423@example.com>\t<45454@example.net>\t1234\t'
1369            '17\tXref: news.example.com misc.test:3000363',
1370        ]
1371        overview = nntplib._parse_overview(lines, fmt)
1372        (art_num, fields), = overview
1373        self.assertEqual(art_num, 3000234)
1374        self.assertEqual(fields, {
1375            'subject': 'I am just a test article',
1376            'from': '"Demo User" <nobody@example.com>',
1377            'date': '6 Oct 1998 04:38:40 -0500',
1378            'message-id': '<45223423@example.com>',
1379            'references': '<45454@example.net>',
1380            ':bytes': '1234',
1381            ':lines': '17',
1382            'xref': 'news.example.com misc.test:3000363',
1383        })
1384        # Second example; here the "Xref" field is totally absent (including
1385        # the header name) and comes out as None
1386        lines = [
1387            '3000234\tI am just a test article\t"Demo User" '
1388            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1389            '<45223423@example.com>\t<45454@example.net>\t1234\t'
1390            '17\t\t',
1391        ]
1392        overview = nntplib._parse_overview(lines, fmt)
1393        (art_num, fields), = overview
1394        self.assertEqual(fields['xref'], None)
1395        # Third example; the "Xref" is an empty string, while "references"
1396        # is a single space.
1397        lines = [
1398            '3000234\tI am just a test article\t"Demo User" '
1399            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1400            '<45223423@example.com>\t \t1234\t'
1401            '17\tXref: \t',
1402        ]
1403        overview = nntplib._parse_overview(lines, fmt)
1404        (art_num, fields), = overview
1405        self.assertEqual(fields['references'], ' ')
1406        self.assertEqual(fields['xref'], '')
1407
1408    def test_parse_datetime(self):
1409        def gives(a, b, *c):
1410            self.assertEqual(nntplib._parse_datetime(a, b),
1411                             datetime.datetime(*c))
1412        # Output of DATE command
1413        gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
1414        # Variations
1415        gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
1416        gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
1417        gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
1418
1419    def test_unparse_datetime(self):
1420        # Test non-legacy mode
1421        # 1) with a datetime
1422        def gives(y, M, d, h, m, s, date_str, time_str):
1423            dt = datetime.datetime(y, M, d, h, m, s)
1424            self.assertEqual(nntplib._unparse_datetime(dt),
1425                             (date_str, time_str))
1426            self.assertEqual(nntplib._unparse_datetime(dt, False),
1427                             (date_str, time_str))
1428        gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
1429        gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
1430        gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
1431        # 2) with a date
1432        def gives(y, M, d, date_str, time_str):
1433            dt = datetime.date(y, M, d)
1434            self.assertEqual(nntplib._unparse_datetime(dt),
1435                             (date_str, time_str))
1436            self.assertEqual(nntplib._unparse_datetime(dt, False),
1437                             (date_str, time_str))
1438        gives(1999, 6, 23, "19990623", "000000")
1439        gives(2000, 6, 23, "20000623", "000000")
1440        gives(2010, 6, 5, "20100605", "000000")
1441
1442    def test_unparse_datetime_legacy(self):
1443        # Test legacy mode (RFC 977)
1444        # 1) with a datetime
1445        def gives(y, M, d, h, m, s, date_str, time_str):
1446            dt = datetime.datetime(y, M, d, h, m, s)
1447            self.assertEqual(nntplib._unparse_datetime(dt, True),
1448                             (date_str, time_str))
1449        gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
1450        gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
1451        gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
1452        # 2) with a date
1453        def gives(y, M, d, date_str, time_str):
1454            dt = datetime.date(y, M, d)
1455            self.assertEqual(nntplib._unparse_datetime(dt, True),
1456                             (date_str, time_str))
1457        gives(1999, 6, 23, "990623", "000000")
1458        gives(2000, 6, 23, "000623", "000000")
1459        gives(2010, 6, 5, "100605", "000000")
1460
1461    @unittest.skipUnless(ssl, 'requires SSL support')
1462    def test_ssl_support(self):
1463        self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
1464
1465
class PublicAPITests(unittest.TestCase):
    """Ensures that the correct values are exposed in the public API."""

    def test_module_all_attribute(self):
        # NNTP_SSL only belongs to the public API when ssl is available
        expected_names = {
            'NNTP', 'NNTPError', 'NNTPReplyError',
            'NNTPTemporaryError', 'NNTPPermanentError',
            'NNTPProtocolError', 'NNTPDataError', 'decode_header',
        }
        if ssl is not None:
            expected_names.add('NNTP_SSL')
        self.assertTrue(hasattr(nntplib, '__all__'))
        exported = set(nntplib.__all__)
        self.assertEqual(exported, expected_names)
1477
class MockSocketTests(unittest.TestCase):
    """Tests involving a mock socket object

    Used where the _NNTPServerIO file object is not enough."""

    nntp_class = nntplib.NNTP

    def check_constructor_error_conditions(
            self, handler_class,
            expected_error_type, expected_error_msg,
            login=None, password=None):
        # Construct nntp_class on top of a fake socket served by
        # handler_class, expect the constructor to raise, then verify
        # that the socket and every file made from it were closed.
        socket_closed = False
        opened_files = []

        class MockSocket:
            def close(self):
                nonlocal socket_closed
                socket_closed = True

            def makefile(self, mode):
                _, mock_file = make_mock_file(handler_class())
                opened_files.append(mock_file)
                return mock_file

        class mock_socket_module:
            # Replaces the ``socket`` name inside nntplib; the function is
            # looked up on the class itself, hence no ``self``.
            def create_connection(address, timeout):
                return MockSocket()

        with patch('nntplib.socket', mock_socket_module), \
             self.assertRaisesRegex(expected_error_type, expected_error_msg):
            self.nntp_class('dummy', user=login, password=password)
        self.assertTrue(socket_closed)
        for mock_file in opened_files:
            self.assertTrue(mock_file.closed)

    def test_bad_welcome(self):
        # A greeting that does not start with a status code
        class Handler(NNTPv1Handler):
            welcome = 'Bad Welcome'
        self.check_constructor_error_conditions(
            handler_class=Handler,
            expected_error_type=nntplib.NNTPProtocolError,
            expected_error_msg=Handler.welcome)

    def test_service_temporarily_unavailable(self):
        # A 400 greeting
        class Handler(NNTPv1Handler):
            welcome = '400 Service temporarily unavailable'
        self.check_constructor_error_conditions(
            handler_class=Handler,
            expected_error_type=nntplib.NNTPTemporaryError,
            expected_error_msg=Handler.welcome)

    def test_service_permanently_unavailable(self):
        # A 502 greeting
        class Handler(NNTPv1Handler):
            welcome = '502 Service permanently unavailable'
        self.check_constructor_error_conditions(
            handler_class=Handler,
            expected_error_type=nntplib.NNTPPermanentError,
            expected_error_msg=Handler.welcome)

    def test_bad_capabilities(self):
        # An unexpected reply to CAPABILITIES
        capabilities_response = '201 bad capability'

        class Handler(NNTPv1Handler):
            def handle_CAPABILITIES(self):
                self.push_lit(capabilities_response)

        self.check_constructor_error_conditions(
            handler_class=Handler,
            expected_error_type=nntplib.NNTPReplyError,
            expected_error_msg=capabilities_response)

    def test_login_aborted(self):
        # An AUTHINFO exchange rejected by the server
        authinfo_response = '503 Mechanism not recognized'

        class Handler(NNTPv1Handler):
            def handle_AUTHINFO(self, *args):
                self.push_lit(authinfo_response)

        self.check_constructor_error_conditions(
            handler_class=Handler,
            expected_error_type=nntplib.NNTPPermanentError,
            expected_error_msg=authinfo_response,
            login='t@e.com', password='python')
1555
class bypass_context:
    """Bypass encryption and actual SSL module.

    Used in place of an SSL context: wrap_socket() hands the socket back
    untouched, so no TLS handshake takes place.
    """

    @staticmethod
    def wrap_socket(sock, **args):
        """Return *sock* unchanged, ignoring any keyword arguments.

        Declared as a static method so the call works both on the class
        itself and on an instance of it.
        """
        return sock
1560
@unittest.skipUnless(ssl, 'requires SSL support')
class MockSslTests(MockSocketTests):
    """Runs the MockSocketTests cases with NNTP_SSL as the client class."""

    @staticmethod
    def nntp_class(*args, **kwargs):
        # bypass_context is passed as the SSL context so that the mock
        # socket is used as-is instead of going through the ssl module.
        return nntplib.NNTP_SSL(*args, ssl_context=bypass_context, **kwargs)
1566
1567
class LocalServerTests(unittest.TestCase):
    """Tests run against a minimal NNTP server on a local socket.

    The server runs in a background thread and only understands the
    CAPABILITIES, STARTTLS and QUIT commands.
    """

    def setUp(self):
        # Bind a listening socket and serve it from a background thread.
        sock = socket.socket()
        port = socket_helper.bind_port(sock)
        sock.listen()
        self.background = threading.Thread(
            target=self.run_server, args=(sock,))
        self.background.start()
        self.addCleanup(self.background.join)

        # Cleanups run in reverse order of registration: the client is
        # exited first, then the server thread is joined.
        self.nntp = NNTP(socket_helper.HOST, port, usenetrc=False).__enter__()
        self.addCleanup(self.nntp.__exit__, None, None, None)

    def run_server(self, sock):
        """Accept one client on *sock* and answer its commands until QUIT.

        Raises ValueError when a line other than CAPABILITIES, STARTTLS
        or QUIT is read from the client.
        """
        # Could be generalized to handle more commands in separate methods
        with sock:
            # Only a single connection is served; the listening socket is
            # closed as soon as it has been accepted.
            [client, _] = sock.accept()
        with contextlib.ExitStack() as cleanup:
            cleanup.enter_context(client)
            reader = cleanup.enter_context(client.makefile('rb'))
            client.sendall(b'200 Server ready\r\n')
            while True:
                cmd = reader.readline()
                if cmd == b'CAPABILITIES\r\n':
                    client.sendall(
                        b'101 Capability list:\r\n'
                        b'VERSION 2\r\n'
                        b'STARTTLS\r\n'
                        b'.\r\n'
                    )
                elif cmd == b'STARTTLS\r\n':
                    # Drop the plain-text reader before switching to TLS.
                    reader.close()
                    client.sendall(b'382 Begin TLS negotiation now\r\n')
                    # Name the protocol explicitly: calling SSLContext()
                    # without one is deprecated since Python 3.10.
                    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
                    context.load_cert_chain(certfile)
                    client = context.wrap_socket(
                        client, server_side=True)
                    cleanup.enter_context(client)
                    reader = cleanup.enter_context(client.makefile('rb'))
                elif cmd == b'QUIT\r\n':
                    client.sendall(b'205 Bye!\r\n')
                    break
                else:
                    raise ValueError('Unexpected command {!r}'.format(cmd))

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_starttls(self):
        plain_file = self.nntp.file
        plain_sock = self.nntp.sock
        self.nntp.starttls()
        # Check that the socket and internal pseudo-file really were
        # changed.
        self.assertNotEqual(plain_file, self.nntp.file)
        self.assertNotEqual(plain_sock, self.nntp.sock)
        # Check that the new socket really is an SSL one
        self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
        # Check that trying starttls when it's already active fails.
        self.assertRaises(ValueError, self.nntp.starttls)
1626
1627
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
1630