# Copyright (c) 2011 Brian Beach
# All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.

"""
Some multi-threading tests of boto in a greenlet environment.
"""
from __future__ import print_function

import boto
import time
import uuid

from threading import Thread

def spawn(function, *args, **kwargs):
    """
    Spawns a new thread.  The API is the same as
    gevent.greenlet.Greenlet.spawn.
    """
    t = Thread(target=function, args=args, kwargs=kwargs)
    t.start()
    return t
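
# A minimal sketch of the gevent equivalent, assuming gevent is
# installed (it is deliberately not a dependency of this test suite).
# gevent.spawn takes the same arguments as the thread-based spawn()
# above and returns a Greenlet with a join() method, so with the
# standard library monkey-patched these tests could run on real
# greenlets:
#
#     from gevent import monkey
#     monkey.patch_all()
#     import gevent
#
#     def spawn(function, *args, **kwargs):
#         return gevent.spawn(function, *args, **kwargs)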

def put_object(bucket, name):
    bucket.new_key(name).set_contents_from_string(name)

def get_object(bucket, name):
    assert bucket.get_key(name).get_contents_as_string().decode('utf-8') == name

def test_close_connections():
    """
    A test that exposes the problem where connections are returned to the
    connection pool (and closed) before the caller reads the response.

    I couldn't think of a way to test it without greenlets, so this test
    doesn't run as part of the standard test suite.  That way, no new
    dependencies are added to the test suite.
    """

    print("Running test_close_connections")

    # Connect to S3
    s3 = boto.connect_s3()

    # Clean up from previous test runs.
    for b in s3.get_all_buckets():
        if b.name.startswith('test-'):
            for key in b.get_all_keys():
                key.delete()
            b.delete()

    # Make a test bucket
    bucket = s3.create_bucket('test-%d' % int(time.time()))

    # Create 30 threads that each create an object in S3.  The number
    # 30 is chosen because it is larger than the connection pool size
    # (20).
    names = [str(uuid.uuid4()) for _ in range(30)]
    threads = [
        spawn(put_object, bucket, name)
        for name in names
    ]
    for t in threads:
        t.join()

    # Create 30 threads to read the contents of the new objects.  This
    # is where closing the connection early is a problem, because
    # there is a response that needs to be read, and it can't be read
    # if the connection has already been closed.
    threads = [
        spawn(get_object, bucket, name)
        for name in names
    ]
    for t in threads:
        t.join()

# test_reuse_connections needs to read a file that is big enough that
# one read() call on the socket won't read the whole thing.
BIG_SIZE = 10000

class WriteAndCount(object):

    """
    A file-like object that counts the number of characters written.
    """

    def __init__(self):
        self.size = 0

    def write(self, data):
        self.size += len(data)
        time.sleep(0)  # yield to other threads

def read_big_object(s3, bucket, name, count):
    for _ in range(count):
        key = bucket.get_key(name)
        out = WriteAndCount()
        key.get_contents_to_file(out)
        if out.size != BIG_SIZE:
            print(out.size, BIG_SIZE)
        assert out.size == BIG_SIZE
    print(" pool size:", s3._pool.size())

class LittleQuerier(object):

    """
    An object that manages a thread that keeps pulling down small
    objects from S3 and checking the answers until told to stop.
    """

    def __init__(self, bucket, small_names):
        self.running = True
        self.bucket = bucket
        self.small_names = small_names
        self.thread = spawn(self.run)

    def stop(self):
        self.running = False
        self.thread.join()

    def run(self):
        count = 0
        while self.running:
            i = count % 4
            key = self.bucket.get_key(self.small_names[i])
            expected = str(i)
            rh = {'response-content-type': 'small/' + str(i)}
            actual = key.get_contents_as_string(response_headers=rh).decode('utf-8')
            if expected != actual:
                print("AHA:", repr(expected), repr(actual))
            assert expected == actual
            count += 1

def test_reuse_connections():
    """
    This test is an attempt to expose problems caused by boto
    returning connections to the connection pool before the response
    has been fully read.  The strategy is to start a couple of big
    reads from S3, where it will take time to read the response, and
    then start other requests that will reuse the same connection
    from the pool while the big response is still being read.

    The test passes because of an interesting combination of factors.
    I was expecting that it would fail because two threads would be
    reading the same connection at the same time.  That doesn't happen
    because httplib catches the problem before it happens and raises
    an exception.

    Here's the sequence of events:

      - Thread 1: Sends a request to read a big S3 object.
      - Thread 1: Returns the connection to the pool.
      - Thread 1: Starts reading the body of the response.

      - Thread 2: Gets the same connection from the pool.
      - Thread 2: Sends another request on the same connection.
      - Thread 2: Tries to read the response, but
                  HTTPConnection.getresponse notices that the
                  previous response hasn't been fully read yet, and
                  raises a ResponseNotReady exception.
      - Thread 2: _mexe catches the exception, does not return the
                  connection to the pool, gets a new connection, and
                  retries.

      - Thread 1: Finishes reading the body of its response.

      - Server:   Gets the second request on the connection, and
                  sends a response.  This response is ignored because
                  the connection has been dropped on the client end.

    If you add a print statement in HTTPConnection.getresponse at the
    point where it raises ResponseNotReady, and then run this test,
    you can see that it's happening.
    """

    print("Running test_reuse_connections")

    # Connect to S3
    s3 = boto.connect_s3()

    # Make a test bucket
    bucket = s3.create_bucket('test-%d' % int(time.time()))

    # Create some small objects in S3.
    small_names = [str(uuid.uuid4()) for _ in range(4)]
    for i, name in enumerate(small_names):
        bucket.new_key(name).set_contents_from_string(str(i))

    # Wait, clean the connection pool, and make sure it's empty.
    print(" waiting for all connections to become stale")
    time.sleep(s3._pool.STALE_DURATION + 1)
    s3._pool.clean()
    assert s3._pool.size() == 0
    print(" pool is empty")

    # Create a big object in S3.
    big_name = str(uuid.uuid4())
    contents = "-" * BIG_SIZE
    bucket.new_key(big_name).set_contents_from_string(contents)

    # Start some threads to read it and check that they are reading
    # the correct thing.  Each thread will read the object 20 times.
    threads = [
        spawn(read_big_object, s3, bucket, big_name, 20)
        for _ in range(5)
    ]

    # Do some other things that may (incorrectly) reuse the same
    # connections while the big objects are being read.
    queriers = [
        LittleQuerier(bucket, small_names)
        for _ in range(5)
    ]

    # Clean up.
    for t in threads:
        t.join()
    for q in queriers:
        q.stop()

def main():
    test_close_connections()
    test_reuse_connections()

if __name__ == '__main__':
    main()
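
# The ResponseNotReady behavior that test_reuse_connections depends on
# can be reproduced with the standard library alone.  A minimal sketch,
# independent of boto (httplib is named http.client on Python 3, and
# example.com stands in for any HTTP server):
#
#     import httplib
#
#     conn = httplib.HTTPConnection('example.com')
#     conn.request('GET', '/')
#     first = conn.getresponse()    # response object created, body unread
#     conn.request('GET', '/')      # allowed: the connection is idle again
#     conn.getresponse()            # raises ResponseNotReady, because the
#                                   # body of `first` hasn't been read yet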