import asyncore
import base64
import email.mime.text
from email.message import EmailMessage
from email.base64mime import body_encode as encode_base64
import email.utils
import hashlib
import hmac
import socket
import smtpd
import smtplib
import io
import re
import sys
import time
import select
import errno
import textwrap
import threading

import unittest
from test import support, mock_socket
from test.support import hashlib_helper
from test.support import socket_helper
from test.support import threading_setup, threading_cleanup, join_thread
from unittest.mock import Mock
27
# Host name/address the tests in this module connect and bind to, as
# provided by test.support.socket_helper.
HOST = socket_helper.HOST

if sys.platform == 'darwin':
    # select.poll returns a select.POLLHUP at the end of the tests
    # on darwin, so just ignore it
    def handle_expt(self):
        pass
    # Installed on the class so every SMTPChannel created by the test
    # servers below picks up the no-op handler.
    smtpd.SMTPChannel.handle_expt = handle_expt
36
37
def server(evt, buf, serv):
    """Accept a single connection on *serv* and send *buf* to it.

    Intended to run in a helper thread.

    evt  -- event that is set once the socket is listening and set again
            when the function is done (the caller clears it in between).
    buf  -- bytes to transmit to the accepted connection.
    serv -- bound server socket; it is always closed before returning.

    A socket.timeout from accept() is swallowed: the function then just
    cleans up.  At most 500 select/send rounds are attempted, so a stalled
    peer cannot keep the thread alive forever.
    """
    serv.listen()
    evt.set()
    try:
        try:
            conn, _ = serv.accept()
        except socket.timeout:
            # Nobody connected in time; the outer finally still runs.
            return
        try:
            attempts = 500
            while buf and attempts > 0:
                _, writable, _ = select.select([], [conn], [])
                if writable:
                    sent = conn.send(buf)
                    # send() may be partial; keep only the unsent tail.
                    buf = buf[sent:]
                attempts -= 1
        finally:
            # Close the connection even when select()/send() raises, so
            # the accepted socket is never leaked.
            conn.close()
    finally:
        serv.close()
        evt.set()
59
class GeneralTests:
    # Mixin with connection-level tests that run against mock_socket.
    # Concrete subclasses set ``client`` to the smtplib class under test
    # and also inherit from unittest.TestCase.

    def setUp(self):
        # Make smtplib talk to the mock socket module instead of the
        # real one; tearDown() undoes this.
        smtplib.socket = mock_socket
        self.port = 25

    def tearDown(self):
        smtplib.socket = socket

    def _queue_greeting(self, greeting=b"220 Hola mundo"):
        # Line the mock socket will hand back as the server greeting.
        mock_socket.reply_with(greeting)

    # This method is no longer used but is retained for backward compatibility,
    # so test to make sure it still works.
    def testQuoteData(self):
        teststr  = "abc\n.jkl\rfoo\r\n..blue"
        expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
        self.assertEqual(expected, smtplib.quotedata(teststr))

    def testBasic1(self):
        self._queue_greeting()
        # connects
        client = self.client(HOST, self.port)
        client.close()

    def testSourceAddress(self):
        self._queue_greeting()
        # connects
        client = self.client(HOST, self.port,
                             source_address=('127.0.0.1', 19876))
        self.assertEqual(client.source_address, ('127.0.0.1', 19876))
        client.close()

    def testBasic2(self):
        self._queue_greeting()
        # connects, include port in host name
        client = self.client("%s:%s" % (HOST, self.port))
        client.close()

    def testLocalHostName(self):
        self._queue_greeting()
        # check that supplied local_hostname is used
        client = self.client(HOST, self.port, local_hostname="testhost")
        self.assertEqual(client.local_hostname, "testhost")
        client.close()

    def testTimeoutDefault(self):
        self._queue_greeting()
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        self.assertEqual(mock_socket.getdefaulttimeout(), 30)
        try:
            client = self.client(HOST, self.port)
        finally:
            # Always restore the module-wide default, even on failure.
            mock_socket.setdefaulttimeout(None)
        self.assertEqual(client.sock.gettimeout(), 30)
        client.close()

    def testTimeoutNone(self):
        self._queue_greeting()
        # NOTE(review): this changes the default timeout of the real
        # socket module although smtplib.socket is mock_socket here
        # (compare testTimeoutDefault) -- confirm this is intended.
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            client = self.client(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(client.sock.gettimeout())
        client.close()

    def testTimeoutZero(self):
        self._queue_greeting()
        # A zero timeout must be rejected with ValueError.
        with self.assertRaises(ValueError):
            self.client(HOST, self.port, timeout=0)

    def testTimeoutValue(self):
        self._queue_greeting()
        client = self.client(HOST, self.port, timeout=30)
        self.assertEqual(client.sock.gettimeout(), 30)
        client.close()

    def test_debuglevel(self):
        self._queue_greeting(b"220 Hello world")
        client = self.client()
        client.set_debuglevel(1)
        with support.captured_stderr() as stderr:
            client.connect(HOST, self.port)
        client.close()
        # Level 1 debug lines start directly with the event name.
        expected = re.compile(r"^connect:", re.MULTILINE)
        self.assertRegex(stderr.getvalue(), expected)

    def test_debuglevel_2(self):
        self._queue_greeting(b"220 Hello world")
        client = self.client()
        client.set_debuglevel(2)
        with support.captured_stderr() as stderr:
            client.connect(HOST, self.port)
        client.close()
        # Level 2 debug lines carry an HH:MM:SS.ffffff prefix.
        expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
                              re.MULTILINE)
        self.assertRegex(stderr.getvalue(), expected)
158
class SMTPGeneralTests(GeneralTests, unittest.TestCase):
    # Runs the GeneralTests mixin against smtplib.SMTP.

    client = smtplib.SMTP
163
class LMTPGeneralTests(GeneralTests, unittest.TestCase):
    # Runs the GeneralTests mixin against smtplib.LMTP.

    client = smtplib.LMTP

    def testTimeoutZero(self):
        super().testTimeoutZero()
        # Also reject a zero timeout when the host is given as a
        # filesystem path instead of a host name.
        local_host = '/some/local/lmtp/delivery/program'
        with self.assertRaises(ValueError):
            self.client(local_host, timeout=0)
173
# Test server thread using the specified SMTP server class
def debugging_server(serv, serv_evt, client_evt):
    """Drive the asyncore loop for *serv* until the client is finished.

    serv       -- server object; only its close() method is used here.
    serv_evt   -- event set on entry and set again on exit (the caller
                  clears it in between).
    client_evt -- event the test sets once its conversation is over.

    The loop polls asyncore.socket_map at most 1000 times with a 0.01s
    timeout each, so the thread terminates even if the client never
    signals.
    """
    serv_evt.set()

    try:
        # Prefer the poll()-based loop where the platform provides it.
        poll_fun = asyncore.poll2 if hasattr(select, 'poll') else asyncore.poll

        for _ in range(1000):
            if not asyncore.socket_map:
                break
            poll_fun(0.01, asyncore.socket_map)

            # when the client conversation is finished, it will
            # set client_evt, and it's then ok to kill the server
            if client_evt.is_set():
                serv.close()
                break

    except socket.timeout:
        pass
    finally:
        if not client_evt.is_set():
            # allow some time for the client to read the result
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()
# Marker lines the tests below expect around each message in the captured
# server output.
MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
MSG_END = '------------ END MESSAGE ------------\n'

# NOTE: Some SMTP objects in the tests below are created with a non-default
# local_hostname argument to the constructor, since (on some systems) the FQDN
# lookup caused by the default local_hostname sometimes takes so long that the
# test server times out, causing the test to fail.
213
# Test behavior of smtpd.DebuggingServer
class DebuggingServerTests(unittest.TestCase):
    # Each test talks to a real smtpd.DebuggingServer running in a helper
    # thread (see debugging_server()).  The server's stdout output and the
    # smtpd debug stream are captured so the tests can inspect them.

    maxDiff = None

    def setUp(self):
        self.thread_key = threading_setup()
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        # temporarily replace sys.stdout to capture DebuggingServer output
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Capture SMTPChannel debug output
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()
        # Pick a random unused port by passing 0 for the port number
        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
                                          decode_data=True)
        # Keep a note of what server host and port were assigned
        self.host, self.port = self.serv.socket.getsockname()[:2]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        join_thread(self.thread)
        # restore sys.stdout
        sys.stdout = self.old_stdout
        # restore DEBUGSTREAM
        smtpd.DEBUGSTREAM.close()
        smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
        del self.thread
        # Run cleanups (client sockets) before checking for dangling threads.
        self.doCleanups()
        threading_cleanup(*self.thread_key)

    # -- helpers ---------------------------------------------------------

    def _connect(self, host=HOST, **kwargs):
        # Open a client connection to the test server and register its
        # close() as a cleanup.  Extra keyword arguments go to smtplib.SMTP.
        smtp = smtplib.SMTP(host, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT, **kwargs)
        self.addCleanup(smtp.close)
        return smtp

    def _quit_and_wait_for_server(self, smtp):
        # End the SMTP session, then block until the server thread has
        # finished so the captured output is complete.
        # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
        # in asyncore.  This sleep might help, but should really be fixed
        # properly by using an Event variable.
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()

    def _assert_plain_output(self, body):
        # The server must have printed exactly *body* between the markers.
        mexpect = '%s%s\n%s' % (MSG_BEGIN, body, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def _assert_message_output(self, m, dropped_headers=()):
        # Compare the server output with the serialized message *m*.
        # Remove the X-Peer header that DebuggingServer adds as figuring out
        # exactly what IP address format is put there is not easy (and
        # irrelevant to our test).  Typically 127.0.0.1 or ::1, but it is
        # not always the same as socket.gethostbyname(HOST). :(
        test_output = self.get_output_without_xpeer()
        del m['X-Peer']
        # Headers that are expected not to have been transmitted.
        for name in dropped_headers:
            del m[name]
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(test_output, mexpect)

    def _assert_envelope(self, sender, recipients=(), absent=()):
        # Check the envelope recorded in the smtpd debug stream: *sender*
        # must match, every address in *recipients* must be listed and
        # none of the addresses in *absent* may be.
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender_re = re.compile("^sender: {}$".format(sender), re.MULTILINE)
        self.assertRegex(debugout, sender_re)
        for addr in recipients:
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)
        for addr in absent:
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertNotRegex(debugout, to_addr)

    def get_output_without_xpeer(self):
        test_output = self.output.getvalue()
        return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2',
                      test_output, flags=re.MULTILINE|re.DOTALL)

    # -- tests -----------------------------------------------------------

    def testBasic(self):
        # connect
        smtp = self._connect()
        smtp.quit()

    def testSourceAddress(self):
        # connect
        src_port = socket_helper.find_unused_port()
        try:
            smtp = self._connect(self.host,
                                 source_address=(self.host, src_port))
            self.assertEqual(smtp.source_address, (self.host, src_port))
            self.assertEqual(smtp.local_hostname, 'localhost')
            smtp.quit()
        except OSError as e:
            # The "unused" port may have been taken in the meantime.
            if e.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to source port %d" % src_port)
            raise

    def testNOOP(self):
        smtp = self._connect()
        expected = (250, b'OK')
        self.assertEqual(smtp.noop(), expected)
        smtp.quit()

    def testRSET(self):
        smtp = self._connect()
        expected = (250, b'OK')
        self.assertEqual(smtp.rset(), expected)
        smtp.quit()

    def testELHO(self):
        # EHLO is answered with 250; the expected reply advertises the
        # SIZE and HELP extensions.
        smtp = self._connect()
        expected = (250, b'\nSIZE 33554432\nHELP')
        self.assertEqual(smtp.ehlo(), expected)
        smtp.quit()

    def testEXPNNotImplemented(self):
        # EXPN isn't implemented in DebuggingServer
        smtp = self._connect()
        expected = (502, b'EXPN not implemented')
        smtp.putcmd('EXPN')
        self.assertEqual(smtp.getreply(), expected)
        smtp.quit()

    def testVRFY(self):
        smtp = self._connect()
        expected = (252, b'Cannot VRFY user, but will accept message '
                         b'and attempt delivery')
        self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
        self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
        smtp.quit()

    def testSecondHELO(self):
        # check that a second HELO returns a message that it's a duplicate
        # (this behavior is specific to smtpd.SMTPChannel)
        smtp = self._connect()
        smtp.helo()
        expected = (503, b'Duplicate HELO/EHLO')
        self.assertEqual(smtp.helo(), expected)
        smtp.quit()

    def testHELP(self):
        smtp = self._connect()
        self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL '
                                      b'RCPT DATA RSET NOOP QUIT VRFY')
        smtp.quit()

    def testSend(self):
        # connect and send mail
        m = 'A test message'
        smtp = self._connect()
        smtp.sendmail('John', 'Sally', m)
        self._quit_and_wait_for_server(smtp)
        self._assert_plain_output(m)

    def testSendBinary(self):
        m = b'A test message'
        smtp = self._connect()
        smtp.sendmail('John', 'Sally', m)
        self._quit_and_wait_for_server(smtp)
        self._assert_plain_output(m.decode('ascii'))

    def testSendNeedingDotQuote(self):
        # Issue 12283
        m = '.A test\n.mes.sage.'
        smtp = self._connect()
        smtp.sendmail('John', 'Sally', m)
        self._quit_and_wait_for_server(smtp)
        self._assert_plain_output(m)

    def testSendNullSender(self):
        m = 'A test message'
        smtp = self._connect()
        smtp.sendmail('<>', 'Sally', m)
        self._quit_and_wait_for_server(smtp)
        self._assert_plain_output(m)
        self._assert_envelope('<>')

    def testSendMessage(self):
        m = email.mime.text.MIMEText('A test message')
        smtp = self._connect()
        smtp.send_message(m, from_addr='John', to_addrs='Sally')
        self._quit_and_wait_for_server(smtp)
        self._assert_message_output(m)

    def testSendMessageWithAddresses(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        smtp = self._connect()
        smtp.send_message(m)
        # make sure the Bcc header is still in the message.
        self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
                                    '<warped@silly.walks.com>')
        self._quit_and_wait_for_server(smtp)
        # The Bcc header should not be transmitted.
        self._assert_message_output(m, dropped_headers=('Bcc',))
        self._assert_envelope('foo@bar.com',
                              recipients=('John', 'Sally', 'Fred',
                                          'root@localhost',
                                          'warped@silly.walks.com'))

    def testSendMessageWithSomeAddresses(self):
        # Make sure nothing breaks if not all of the three 'to' headers exist
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John, Dinsdale'
        smtp = self._connect()
        smtp.send_message(m)
        self._quit_and_wait_for_server(smtp)
        self._assert_message_output(m)
        self._assert_envelope('foo@bar.com', recipients=('John', 'Dinsdale'))

    def testSendMessageWithSpecifiedAddresses(self):
        # Make sure addresses specified in call override those in message.
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John, Dinsdale'
        smtp = self._connect()
        smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net')
        self._quit_and_wait_for_server(smtp)
        self._assert_message_output(m)
        self._assert_envelope('joe@example.com',
                              recipients=('foo@example.net',),
                              absent=('John', 'Dinsdale'))

    def testSendMessageWithMultipleFrom(self):
        # Sender overrides To
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'Bernard, Bianca'
        m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com'
        m['To'] = 'John, Dinsdale'
        smtp = self._connect()
        smtp.send_message(m)
        self._quit_and_wait_for_server(smtp)
        self._assert_message_output(m)
        self._assert_envelope('the_rescuers@Rescue-Aid-Society.com',
                              recipients=('John', 'Dinsdale'))

    def testSendMessageResent(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = 'holy@grail.net'
        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        m['Resent-Bcc'] = 'doe@losthope.net'
        smtp = self._connect()
        smtp.send_message(m)
        self._quit_and_wait_for_server(smtp)
        # The Resent-Bcc headers are deleted before serialization.
        self._assert_message_output(m, dropped_headers=('Bcc', 'Resent-Bcc'))
        self._assert_envelope('holy@grail.net',
                              recipients=('my_mom@great.cooker.com', 'Jeff',
                                          'doe@losthope.net'))

    def testSendMessageMultipleResentRaises(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = 'holy@grail.net'
        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        m['Resent-Bcc'] = 'doe@losthope.net'
        # A second set of Resent-* headers makes the message ambiguous.
        m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
        m['Resent-To'] = 'holy@grail.net'
        m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        smtp = self._connect()
        with self.assertRaises(ValueError):
            smtp.send_message(m)
        smtp.close()
628
class NonConnectingTests(unittest.TestCase):
    # Tests that never establish a connection.

    def testNotConnected(self):
        # Test various operations on an unconnected SMTP object that
        # should raise exceptions: both a command and a raw send must
        # report SMTPServerDisconnected.
        smtp = smtplib.SMTP()
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
        self.assertRaises(smtplib.SMTPServerDisconnected,
                          smtp.send, 'test msg')

    def testNonnumericPort(self):
        # check that non-numeric port raises OSError
        self.assertRaises(OSError, smtplib.SMTP,
                          "localhost", "bogus")
        self.assertRaises(OSError, smtplib.SMTP,
                          "localhost:bogus")

    def testSockAttributeExists(self):
        # check that sock attribute is present outside of a connect() call
        # (regression test, the previous behavior raised an
        #  AttributeError: 'SMTP' object has no attribute 'sock')
        with smtplib.SMTP() as smtp:
            self.assertIsNone(smtp.sock)
654
655
class DefaultArgumentsTests(unittest.TestCase):
    # Checks the mail options send_message() hands to sendmail(); the
    # network-facing methods of the SMTP object are replaced by mocks.

    def setUp(self):
        self.msg = EmailMessage()
        # Non-ASCII display name and local part in the From header.
        self.msg['From'] = 'Páolo <főo@bar.com>'
        self.smtp = smtplib.SMTP()
        self.smtp.ehlo = Mock(return_value=(200, 'OK'))
        self.smtp.has_extn, self.smtp.sendmail = Mock(), Mock()

    def testSendMessage(self):
        expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME')
        # Send twice: the second call must see the same options as the
        # first one.
        self.smtp.send_message(self.msg)
        self.smtp.send_message(self.msg)
        # sendmail() receives the mail options as its 4th positional
        # argument.
        self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
                         expected_mail_options)
        self.assertEqual(self.smtp.sendmail.call_args_list[1][0][3],
                         expected_mail_options)

    def testSendMessageWithMailOptions(self):
        mail_options = ['STARTTLS']
        expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME')
        self.smtp.send_message(self.msg, None, None, mail_options)
        # The caller's list must not have been modified.
        self.assertEqual(mail_options, ['STARTTLS'])
        self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
                         expected_mail_options)
681
682
# test response of client to a non-successful HELO message
class BadHELOServerTests(unittest.TestCase):

    def setUp(self):
        smtplib.socket = mock_socket
        # Greeting whose status code is not 220.
        mock_socket.reply_with(b"199 no hello for you!")
        # Capture anything printed to stdout during the test.
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output
        self.port = 25

    def tearDown(self):
        smtplib.socket = socket
        sys.stdout = self.old_stdout

    def testFailingHELO(self):
        self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
                          HOST, self.port,
                          local_hostname='localhost', timeout=3)
701
702
class TooLongLineTests(unittest.TestCase):
    # The helper thread (see server()) answers the connection with a single
    # response line that is longer than smtplib._MAXLINE.
    respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n'

    def setUp(self):
        self.thread_key = threading_setup()
        # Capture anything printed to stdout during the test.
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Bounds accept() in the server thread if no client connects.
        self.sock.settimeout(15)
        self.port = socket_helper.bind_port(self.sock)
        servargs = (self.evt, self.respdata, self.sock)
        self.thread = threading.Thread(target=server, args=servargs)
        self.thread.start()
        # Wait until the server thread is listening, then re-arm the event
        # so tearDown() can wait for the thread to finish.
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        self.evt.wait()
        sys.stdout = self.old_stdout
        join_thread(self.thread)
        del self.thread
        self.doCleanups()
        threading_cleanup(*self.thread_key)

    def testLineTooLong(self):
        self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP,
                          HOST, self.port,
                          local_hostname='localhost', timeout=3)
733
734
# Fixture data for the simulated SMTP server.

# Mailbox address -> user's name.
sim_users = {'Mr.A@somewhere.com':'John A',
             'Ms.B@xn--fo-fka.com':'Sally B',
             'Mrs.C@somewhereesle.com':'Ruth C',
            }

# (user, password) pair; the password is what the simulated AUTH handlers
# compare against.
sim_auth = ('Mr.A@somewhere.com', 'somepassword')
# Base64-encoded CRAM-MD5 challenge.
sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
                          'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
# List name -> member addresses.
sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
             'list-2':['Ms.B@xn--fo-fka.com',],
            }
746
# Simulated SMTP channel & server
class ResponseException(Exception):
    # Caught by SimSMTPChannel.found_terminator(), which reads the
    # ``smtp_code`` and ``smtp_error`` attributes of the instance to build
    # the reply line.
    pass
class SimSMTPChannel(smtpd.SMTPChannel):
    """Channel of the simulated SMTP server.

    On top of smtpd.SMTPChannel it implements a small AUTH command (PLAIN,
    LOGIN and CRAM-MD5 handlers), advertises extra EHLO features, answers
    VRFY/EXPN from the sim_users/sim_lists tables, and lets a test replace
    the replies to QUIT, MAIL, RCPT and DATA through the ``*_response``
    attributes below.
    """

    # Canned replies.  While one is None the command is handled by the base
    # class; otherwise the value is pushed as the reply.  rcpt_response is
    # indexed once per RCPT command, so it must be a sequence of replies.
    quit_response = None
    mail_response = None
    rcpt_response = None
    data_response = None
    rcpt_count = 0    # RCPT commands answered from rcpt_response
    rset_count = 0    # RSET commands received
    disconnect = 0    # if true, close the channel after pushing mail_response
    AUTH = 99    # Add protocol state to enable auth testing.
    authenticated_user = None    # user name stored by a successful AUTH

    def __init__(self, extra_features, *args, **kw):
        """Create the channel.

        extra_features is an iterable of feature strings; each one becomes an
        additional "250-<feature>" line of the EHLO reply.  All remaining
        arguments are handed to smtpd.SMTPChannel unchanged.
        """
        self._extrafeatures = ''.join(
            "250-{0}\r\n".format(x) for x in extra_features)
        super().__init__(*args, **kw)

    # AUTH related stuff.  It would be nice if support for this were in smtpd.
    def found_terminator(self):
        """Dispatch a complete input line.

        Outside the AUTH state this defers to smtpd.SMTPChannel.  In the AUTH
        state the line is a client response to a 334 challenge and is passed
        to the mechanism handler selected by smtp_AUTH.
        """
        if self.smtp_state != self.AUTH:
            super().found_terminator()
            return
        line = self._emptystring.join(self.received_lines)
        print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
        self.received_lines = []
        try:
            self.auth_object(line)
        except ResponseException as e:
            self.smtp_state = self.COMMAND
            self.push('%s %s' % (e.smtp_code, e.smtp_error))
        # Do not fall through to the base class here: the buffered line has
        # already been consumed, so it would see an empty line and queue a
        # bogus error reply behind the one the handler just pushed.

    def smtp_AUTH(self, arg):
        """Handle ``AUTH <mechanism> [initial-response]``.

        The mechanism name is lower-cased, '-' is replaced by '_', and the
        handler is looked up as the method ``_auth_<name>``.  The handler is
        called with the initial response, or None when there is none.
        """
        if not self.seen_greeting:
            self.push('503 Error: send EHLO first')
            return
        if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
            self.push('500 Error: command "AUTH" not recognized')
            return
        if self.authenticated_user is not None:
            self.push(
                '503 Bad sequence of commands: already authenticated')
            return
        # arg is None when the command was sent without any argument; treat
        # that as a syntax error instead of failing on None.split().
        args = arg.split() if arg else []
        if len(args) not in (1, 2):
            self.push('501 Syntax: AUTH <mechanism> [initial-response]')
            return
        auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
        try:
            self.auth_object = getattr(self, auth_object_name)
        except AttributeError:
            self.push('504 Command parameter not implemented: unsupported '
                      ' authentication mechanism {!r}'.format(auth_object_name))
            return
        self.smtp_state = self.AUTH
        self.auth_object(args[1] if len(args) == 2 else None)

    def _authenticated(self, user, valid):
        """Finish an AUTH exchange: push 235 or 535 and leave the AUTH state.

        user is stored as authenticated_user only when valid is true.
        """
        if valid:
            self.authenticated_user = user
            self.push('235 Authentication Succeeded')
        else:
            self.push('535 Authentication credentials invalid')
        self.smtp_state = self.COMMAND

    def _decode_base64(self, string):
        """Return the UTF-8 text encoded by the base64 str *string*."""
        return base64.decodebytes(string.encode('ascii')).decode('utf-8')

    def _auth_plain(self, arg=None):
        """PLAIN handler: challenge with an empty 334, then check the password.

        The decoded response is split on NUL; the last two fields are taken
        as user and password, and the password is compared to sim_auth[1].
        """
        if arg is None:
            self.push('334 ')
        else:
            logpass = self._decode_base64(arg)
            try:
                *_, user, password = logpass.split('\0')
            except ValueError as e:
                # Fewer than two NUL-separated fields.
                self.push('535 Splitting response {!r} into user and password'
                          ' failed: {}'.format(logpass, e))
                return
            self._authenticated(user, password == sim_auth[1])

    def _auth_login(self, arg=None):
        """LOGIN handler: collect the user name, then the password.

        The user name is parked in ``_auth_login_user`` between the two
        steps; the presence of that attribute tells the steps apart.
        """
        if arg is None:
            # base64 encoded 'Username:'
            self.push('334 VXNlcm5hbWU6')
        elif not hasattr(self, '_auth_login_user'):
            self._auth_login_user = self._decode_base64(arg)
            # base64 encoded 'Password:'
            self.push('334 UGFzc3dvcmQ6')
        else:
            password = self._decode_base64(arg)
            self._authenticated(self._auth_login_user, password == sim_auth[1])
            del self._auth_login_user

    def _auth_cram_md5(self, arg=None):
        """CRAM-MD5 handler: send the fixed challenge, then verify the digest.

        The decoded response must be "<user> <hexdigest>"; the digest is
        compared with the HMAC-MD5 of the decoded challenge keyed with
        sim_auth[1].
        """
        if arg is None:
            self.push('334 {}'.format(sim_cram_md5_challenge))
        else:
            logpass = self._decode_base64(arg)
            try:
                user, hashed_pass = logpass.split()
            except ValueError as e:
                self.push('535 Splitting response {!r} into user and password '
                          'failed: {}'.format(logpass, e))
                return False
            valid_hashed_pass = hmac.HMAC(
                sim_auth[1].encode('ascii'),
                self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
                'md5').hexdigest()
            self._authenticated(user, hashed_pass == valid_hashed_pass)
    # end AUTH related stuff.

    def smtp_EHLO(self, arg):
        """Reply with a fixed feature list plus the configured extra features."""
        resp = ('250-testhost\r\n'
                '250-EXPN\r\n'
                '250-SIZE 20000000\r\n'
                '250-STARTTLS\r\n'
                '250-DELIVERBY\r\n')
        resp = resp + self._extrafeatures + '250 HELP'
        self.push(resp)
        self.seen_greeting = arg
        self.extended_smtp = True

    def smtp_VRFY(self, arg):
        """Reply 250 with name and quoted address for known users, else 550."""
        # For max compatibility smtplib should be sending the raw address.
        if arg in sim_users:
            self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
        else:
            self.push('550 No such user: %s' % arg)

    def smtp_EXPN(self, arg):
        """Reply with one line per member of a known list, else 550.

        The list name is matched case-insensitively (it is lower-cased before
        the lookup).  Every line but the last uses the "250-" continuation
        form.
        """
        list_name = arg.lower()
        if list_name in sim_lists:
            user_list = sim_lists[list_name]
            for n, user_email in enumerate(user_list):
                quoted_addr = smtplib.quoteaddr(user_email)
                if n < len(user_list) - 1:
                    self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
                else:
                    self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
        else:
            self.push('550 No access for you!')

    def smtp_QUIT(self, arg):
        """Use quit_response when set (then close); otherwise defer to smtpd."""
        if self.quit_response is None:
            super().smtp_QUIT(arg)
        else:
            self.push(self.quit_response)
            self.close_when_done()

    def smtp_MAIL(self, arg):
        """Use mail_response when set, closing afterwards if disconnect is true."""
        if self.mail_response is None:
            super().smtp_MAIL(arg)
        else:
            self.push(self.mail_response)
            if self.disconnect:
                self.close_when_done()

    def smtp_RCPT(self, arg):
        """Answer the n-th RCPT with rcpt_response[n-1] when rcpt_response is set."""
        if self.rcpt_response is None:
            super().smtp_RCPT(arg)
            return
        self.rcpt_count += 1
        self.push(self.rcpt_response[self.rcpt_count-1])

    def smtp_RSET(self, arg):
        """Count the RSET, then let smtpd handle it."""
        self.rset_count += 1
        super().smtp_RSET(arg)

    def smtp_DATA(self, arg):
        """Use data_response when set; otherwise defer to smtpd."""
        if self.data_response is None:
            super().smtp_DATA(arg)
        else:
            self.push(self.data_response)

    def handle_error(self):
        # Re-raise the exception currently being handled instead of using the
        # inherited error handler.
        raise
926
927
class SimSMTPServer(smtpd.SMTPServer):
    """smtpd server that hands every accepted connection to channel_class.

    It keeps the most recently created channel in ``_SMTPchannel`` and the
    envelope of the most recently processed message in ``_addresses``.
    """

    channel_class = SimSMTPChannel

    def __init__(self, *args, **kw):
        # Set up our own state first, then let smtpd do the real work.
        self._extra_features = []
        self._addresses = {}
        super().__init__(*args, **kw)

    def handle_accepted(self, conn, addr):
        # The feature list is passed by reference, so the channel sees
        # whatever add_feature() has appended up to this point.
        channel = self.channel_class(
            self._extra_features, self, conn, addr,
            decode_data=self._decode_data)
        self._SMTPchannel = channel

    def process_message(self, peer, mailfrom, rcpttos, data):
        # Only the envelope addresses are recorded; peer and data are ignored.
        self._addresses.update([('from', mailfrom), ('tos', rcpttos)])

    def add_feature(self, feature):
        """Advertise *feature* in the EHLO reply of channels created later."""
        self._extra_features.append(feature)

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise
951
952
953# Test various SMTP & ESMTP commands/behaviors that require a simulated server
954# (i.e., something with more features than DebuggingServer)
class SMTPSimTests(unittest.TestCase):

    def setUp(self):
        self.thread_key = threading_setup()
        # socket.getfqdn is swapped for the mock one; tearDown restores it.
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Port 0 lets the OS pick a random unused port; remember which one
        # was assigned so the tests can connect to it.
        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
        self.port = self.serv.socket.getsockname()[1]
        self.thread = threading.Thread(
            target=debugging_server,
            args=(self.serv, self.serv_evt, self.client_evt))
        self.thread.start()

        # Wait for the server thread to signal serv_evt, then reset the event
        # so that tearDown can wait on it a second time.
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # Tell the server thread that the client is finished and wait for it
        # to signal back before joining it.
        self.client_evt.set()
        self.serv_evt.wait()
        join_thread(self.thread)
        del self.thread
        self.doCleanups()
        threading_cleanup(*self.thread_key)

    def _new_client(self):
        # SMTP client connected to the simulated server, built with the
        # arguments most tests in this class use.
        return smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)

    def testBasic(self):
        # smoke test: connect and quit
        client = self._new_client()
        client.quit()

    def testEHLO(self):
        client = self._new_client()

        # Nothing is advertised before EHLO has been sent.
        self.assertEqual(client.esmtp_features, {})

        # What SimSMTPChannel.smtp_EHLO announces when no extra feature has
        # been added.
        advertised = {
            'expn': '',
            'size': '20000000',
            'starttls': '',
            'deliverby': '',
            'help': '',
        }

        client.ehlo()
        self.assertEqual(client.esmtp_features, advertised)
        for feature in advertised:
            self.assertTrue(client.has_extn(feature))
        self.assertFalse(client.has_extn('unsupported-feature'))
        client.quit()

    def testVRFY(self):
        client = self._new_client()

        for addr_spec, name in sim_users.items():
            reply = '%s %s' % (name, smtplib.quoteaddr(addr_spec))
            known = (250, reply.encode('ascii'))
            self.assertEqual(client.vrfy(addr_spec), known)

        stranger = 'nobody@nowhere.com'
        unknown = (550, ('No such user: %s' % stranger).encode('ascii'))
        self.assertEqual(client.vrfy(stranger), unknown)
        client.quit()

    def testEXPN(self):
        client = self._new_client()

        for listname, members in sim_lists.items():
            lines = ['%s %s' % (sim_users[m], smtplib.quoteaddr(m))
                     for m in members]
            known = (250, '\n'.join(lines).encode('ascii'))
            self.assertEqual(client.expn(listname), known)

        missing = 'PSU-Members-List'
        unknown = (550, b'No access for you!')
        self.assertEqual(client.expn(missing), unknown)
        client.quit()

    def testAUTH_PLAIN(self):
        self.serv.add_feature("AUTH PLAIN")
        client = self._new_client()
        reply = client.login(sim_auth[0], sim_auth[1])
        self.assertEqual(reply, (235, b'Authentication Succeeded'))
        client.close()

    def testAUTH_LOGIN(self):
        self.serv.add_feature("AUTH LOGIN")
        client = self._new_client()
        reply = client.login(sim_auth[0], sim_auth[1])
        self.assertEqual(reply, (235, b'Authentication Succeeded'))
        client.close()

    @hashlib_helper.requires_hashdigest('md5')
    def testAUTH_CRAM_MD5(self):
        self.serv.add_feature("AUTH CRAM-MD5")
        client = self._new_client()
        reply = client.login(sim_auth[0], sim_auth[1])
        self.assertEqual(reply, (235, b'Authentication Succeeded'))
        client.close()

    @hashlib_helper.requires_hashdigest('md5')
    def testAUTH_multiple(self):
        # Test that multiple authentication methods are tried.
        self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
        client = self._new_client()
        reply = client.login(sim_auth[0], sim_auth[1])
        self.assertEqual(reply, (235, b'Authentication Succeeded'))
        client.close()

    def test_auth_function(self):
        mechanisms = {'PLAIN', 'LOGIN'}
        # CRAM-MD5 is exercised only when md5 can be instantiated.
        try:
            hashlib.md5()
        except ValueError:
            pass
        else:
            mechanisms.add('CRAM-MD5')
        for mechanism in mechanisms:
            self.serv.add_feature("AUTH {}".format(mechanism))
        for mechanism in mechanisms:
            with self.subTest(mechanism=mechanism):
                client = self._new_client()
                client.ehlo('foo')
                client.user, client.password = sim_auth[0], sim_auth[1]
                # e.g. 'CRAM-MD5' -> client.auth_cram_md5
                authobject = getattr(
                    client, 'auth_' + mechanism.lower().replace('-', '_'))
                reply = client.auth(mechanism, authobject)
                self.assertEqual(reply, (235, b'Authentication Succeeded'))
                client.close()

    def test_quit_resets_greeting(self):
        client = self._new_client()
        code, _ = client.ehlo()
        self.assertEqual(code, 250)
        self.assertIn('size', client.esmtp_features)
        client.quit()
        self.assertNotIn('size', client.esmtp_features)
        # Reconnecting alone must not bring the features back; a new
        # EHLO/HELO is needed for that.
        client.connect(HOST, self.port)
        self.assertNotIn('size', client.esmtp_features)
        client.ehlo_or_helo_if_needed()
        self.assertIn('size', client.esmtp_features)
        client.quit()

    def test_with_statement(self):
        with smtplib.SMTP(HOST, self.port) as client:
            code, _ = client.noop()
            self.assertEqual(code, 250)
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            client.send(b'foo')
        # Closing explicitly inside the block must be fine as well.
        with smtplib.SMTP(HOST, self.port) as client:
            client.close()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            client.send(b'foo')

    def test_with_statement_QUIT_failure(self):
        with self.assertRaises(smtplib.SMTPResponseException) as caught, \
                smtplib.SMTP(HOST, self.port) as client:
            client.noop()
            self.serv._SMTPchannel.quit_response = '421 QUIT FAILED'
        failure = caught.exception
        self.assertEqual(failure.smtp_code, 421)
        self.assertEqual(failure.smtp_error, b'QUIT FAILED')

    #TODO: add tests for correct AUTH method fallback now that the
    #test infrastructure can support it.

    # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception
    def test__rest_from_mail_cmd(self):
        client = self._new_client()
        client.noop()
        channel = self.serv._SMTPchannel
        channel.mail_response = '451 Requested action aborted'
        channel.disconnect = True
        with self.assertRaises(smtplib.SMTPSenderRefused):
            client.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(client.sock)

    # Issue 5713: make sure close, not rset, is called if we get a 421 error
    def test_421_from_mail_cmd(self):
        client = self._new_client()
        client.noop()
        channel = self.serv._SMTPchannel
        channel.mail_response = '421 closing connection'
        with self.assertRaises(smtplib.SMTPSenderRefused):
            client.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(client.sock)
        self.assertEqual(channel.rset_count, 0)

    def test_421_from_rcpt_cmd(self):
        client = self._new_client()
        client.noop()
        channel = self.serv._SMTPchannel
        channel.rcpt_response = ['250 accepted', '421 closing']
        with self.assertRaises(smtplib.SMTPRecipientsRefused) as caught:
            client.sendmail('John', ['Sally', 'Frank', 'George'], 'test message')
        self.assertIsNone(client.sock)
        self.assertEqual(channel.rset_count, 0)
        # Only the recipient that was answered with 421 is reported.
        self.assertDictEqual(caught.exception.args[0],
                             {'Frank': (421, b'closing')})

    def test_421_from_data_cmd(self):
        class MySimSMTPChannel(SimSMTPChannel):
            # Answers the message body itself with a 421.
            def found_terminator(self):
                if self.smtp_state != self.DATA:
                    super().found_terminator()
                    return
                self.push('421 closing')
        self.serv.channel_class = MySimSMTPChannel
        client = self._new_client()
        client.noop()
        with self.assertRaises(smtplib.SMTPDataError):
            client.sendmail('John@foo.org', ['Sally@foo.org'], 'test message')
        self.assertIsNone(client.sock)
        self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0)

    def test_smtputf8_NotSupportedError_if_no_server_support(self):
        client = self._new_client()
        self.addCleanup(client.close)
        client.ehlo()
        self.assertTrue(client.does_esmtp)
        self.assertFalse(client.has_extn('smtputf8'))
        utf8_options = ['BODY=8BITMIME', 'SMTPUTF8']
        with self.assertRaises(smtplib.SMTPNotSupportedError):
            client.sendmail('John', 'Sally', '', mail_options=utf8_options)
        with self.assertRaises(smtplib.SMTPNotSupportedError):
            client.mail('John', options=utf8_options)

    def test_send_unicode_without_SMTPUTF8(self):
        client = self._new_client()
        self.addCleanup(client.close)
        with self.assertRaises(UnicodeEncodeError):
            client.sendmail('Alice', 'Böb', '')
        with self.assertRaises(UnicodeEncodeError):
            client.mail('Älice')

    def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self):
        # This test is located here and not in the SMTPUTF8SimTests
        # class because it needs a "regular" SMTP server to work
        message = EmailMessage()
        message['From'] = "Páolo <főo@bar.com>"
        message['To'] = 'Dinsdale'
        message['Subject'] = 'Nudge nudge, wink, wink \u1F609'
        client = self._new_client()
        self.addCleanup(client.close)
        with self.assertRaises(smtplib.SMTPNotSupportedError):
            client.send_message(message)

    def test_name_field_not_included_in_envelop_addresses(self):
        client = self._new_client()
        self.addCleanup(client.close)

        message = EmailMessage()
        sender = email.utils.formataddr(('Michaël', 'michael@example.com'))
        recipient = email.utils.formataddr(('René', 'rene@example.com'))
        message['From'] = sender
        message['To'] = recipient

        self.assertDictEqual(client.send_message(message), {})

        # The server must have seen the bare addresses, not the display names.
        envelope = self.serv._addresses
        self.assertEqual(envelope['from'], 'michael@example.com')
        self.assertEqual(envelope['tos'], ['rene@example.com'])
1238
1239
class SimSMTPUTF8Server(SimSMTPServer):
    """Simulated server that advertises SMTPUTF8/8BITMIME and records mail.

    Everything passed to process_message is kept in ``last_*`` attributes so
    a test can inspect what the server received.
    """

    def __init__(self, *args, **kw):
        # The base SMTP server turns these on automatically, but our test
        # server is set up to munge the EHLO response, so we need to provide
        # them as well.  And yes, the call is to SMTPServer not SimSMTPServer.
        self._extra_features = ['SMTPUTF8', '8BITMIME']
        smtpd.SMTPServer.__init__(self, *args, **kw)

    def handle_accepted(self, conn, addr):
        # Same as the parent class, plus forwarding of enable_SMTPUTF8.
        channel_options = {
            'decode_data': self._decode_data,
            'enable_SMTPUTF8': self.enable_SMTPUTF8,
        }
        self._SMTPchannel = self.channel_class(
            self._extra_features, self, conn, addr, **channel_options)

    def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None,
                                                             rcpt_options=None):
        # Remember every argument of the latest call for later inspection.
        self.last_peer, self.last_mailfrom = peer, mailfrom
        self.last_rcpttos, self.last_message = rcpttos, data
        self.last_mail_options = mail_options
        self.last_rcpt_options = rcpt_options
1264
1265
class SMTPUTF8SimTests(unittest.TestCase):

    maxDiff = None

    def setUp(self):
        self.thread_key = threading_setup()
        # socket.getfqdn is swapped for the mock one; tearDown restores it.
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Port 0 lets the OS pick a random unused port; remember which one
        # was assigned so the tests can connect to it.
        self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1),
                                      decode_data=False,
                                      enable_SMTPUTF8=True)
        self.port = self.serv.socket.getsockname()[1]
        self.thread = threading.Thread(
            target=debugging_server,
            args=(self.serv, self.serv_evt, self.client_evt))
        self.thread.start()

        # Wait for the server thread to signal serv_evt, then reset the event
        # so that tearDown can wait on it a second time.
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # Tell the server thread that the client is finished and wait for it
        # to signal back before joining it.
        self.client_evt.set()
        self.serv_evt.wait()
        join_thread(self.thread)
        del self.thread
        self.doCleanups()
        threading_cleanup(*self.thread_key)

    def _new_client(self):
        # Connect to the simulated server and make sure the connection gets
        # closed when the test's cleanups run.
        client = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(client.close)
        return client

    def _assert_utf8_mail_options(self):
        # Both options must have reached the server with MAIL, and no RCPT
        # options at all.
        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
        self.assertEqual(self.serv.last_rcpt_options, [])

    def test_test_server_supports_extensions(self):
        client = self._new_client()
        client.ehlo()
        self.assertTrue(client.does_esmtp)
        self.assertTrue(client.has_extn('smtputf8'))

    def test_send_unicode_with_SMTPUTF8_via_sendmail(self):
        body = '¡a test message containing unicode!'.encode('utf-8')
        client = self._new_client()
        client.sendmail('Jőhn', 'Sálly', body,
                        mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
        self.assertEqual(self.serv.last_mailfrom, 'Jőhn')
        self.assertEqual(self.serv.last_rcpttos, ['Sálly'])
        self.assertEqual(self.serv.last_message, body)
        self._assert_utf8_mail_options()

    def test_send_unicode_with_SMTPUTF8_via_low_level_API(self):
        body = '¡a test message containing unicode!'.encode('utf-8')
        client = self._new_client()
        client.ehlo()
        ok = (250, b'OK')
        self.assertEqual(
            client.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']), ok)
        self.assertEqual(client.rcpt('János'), ok)
        self.assertEqual(client.data(body), ok)
        self.assertEqual(self.serv.last_mailfrom, 'Jő')
        self.assertEqual(self.serv.last_rcpttos, ['János'])
        self.assertEqual(self.serv.last_message, body)
        self._assert_utf8_mail_options()

    def test_send_message_uses_smtputf8_if_addrs_non_ascii(self):
        message = EmailMessage()
        message['From'] = "Páolo <főo@bar.com>"
        message['To'] = 'Dinsdale'
        message['Subject'] = 'Nudge nudge, wink, wink \u1F609'
        # XXX I don't know why I need two \n's here, but this is an existing
        # bug (if it is one) and not a problem with the new functionality.
        message.set_content("oh là là, know what I mean, know what I mean?\n\n")
        # XXX smtpd converts received /r/n to /n, so we can't easily test that
        # we are successfully sending /r/n :(.
        expected = textwrap.dedent("""\
            From: Páolo <főo@bar.com>
            To: Dinsdale
            Subject: Nudge nudge, wink, wink \u1F609
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 8bit
            MIME-Version: 1.0

            oh là là, know what I mean, know what I mean?
            """)
        client = self._new_client()
        self.assertEqual(client.send_message(message), {})
        self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com')
        self.assertEqual(self.serv.last_rcpttos, ['Dinsdale'])
        self.assertEqual(self.serv.last_message.decode(), expected)
        self._assert_utf8_mail_options()
1375
1376
# The only AUTH PLAIN initial response the initial-response test channel
# accepts: base64 (without a line ending) of NUL + 'psu' + NUL + 'doesnotexist'.
EXPECTED_RESPONSE = encode_base64(
    b'\0' + b'psu' + b'\0' + b'doesnotexist',
    eol='',
)
1378
class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel):
    def smtp_AUTH(self, arg):
        # RFC 4954's AUTH command allows for an optional initial-response.
        # Not every mechanism can use one (some need a challenge first), but
        # AUTH PLAIN can, so that is what gets tested here.  See issue #15014.
        args = arg.split()
        # Succeed only for "PLAIN <initial-response>" where the base64
        # encoded response is exactly the one hard coded for the test.
        accepted = (
            args[0].lower() == 'plain'
            and len(args) == 2
            and args[1] == EXPECTED_RESPONSE
        )
        self.push('235 Ok' if accepted else '571 Bad authentication')
1393
class SimSMTPAUTHInitialResponseServer(SimSMTPServer):
    """SimSMTPServer whose connections use SimSMTPAUTHInitialResponseChannel."""

    channel_class = SimSMTPAUTHInitialResponseChannel
1396
1397
class SMTPAUTHInitialResponseSimTests(unittest.TestCase):
    def setUp(self):
        self.thread_key = threading_setup()
        # socket.getfqdn is swapped for the mock one; tearDown restores it.
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Port 0 lets the OS pick a random unused port; remember which one
        # was assigned so the tests can connect to it.
        self.serv = SimSMTPAUTHInitialResponseServer(
            (HOST, 0), ('nowhere', -1), decode_data=True)
        self.port = self.serv.socket.getsockname()[1]
        self.thread = threading.Thread(
            target=debugging_server,
            args=(self.serv, self.serv_evt, self.client_evt))
        self.thread.start()

        # Wait for the server thread to signal serv_evt, then reset the event
        # so that tearDown can wait on it a second time.
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # Tell the server thread that the client is finished and wait for it
        # to signal back before joining it.
        self.client_evt.set()
        self.serv_evt.wait()
        join_thread(self.thread)
        del self.thread
        self.doCleanups()
        threading_cleanup(*self.thread_key)

    def _new_client(self):
        # SMTP client connected to the simulated server.
        return smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)

    def testAUTH_PLAIN_initial_response_login(self):
        self.serv.add_feature('AUTH PLAIN')
        client = self._new_client()
        # login() raises unless the server accepts the credentials.
        client.login('psu', 'doesnotexist')
        client.close()

    def testAUTH_PLAIN_initial_response_auth(self):
        self.serv.add_feature('AUTH PLAIN')
        client = self._new_client()
        client.user, client.password = 'psu', 'doesnotexist'
        code, _ = client.auth('plain', client.auth_plain)
        client.close()
        self.assertEqual(code, 235)
1445
1446
# Run this module's tests when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
1449