"""TestCases for replication.

Two flavours of Berkeley DB replication are exercised here:

* ``DBReplicationManager`` drives a master and a client environment through
  the Replication Manager calls (``repmgr_*``).
* ``DBBaseReplication`` drives the same pair of environments through the
  base replication calls (``rep_*``), shuttling messages between them over
  in-process queues.
"""

import os
import sys
import time
import unittest

from test_all import db, test_support, have_threads, verbose, \
        get_new_environment_path, get_new_database_path


#----------------------------------------------------------------------

class DBReplication(unittest.TestCase):
    """Common fixture: one master and one client replication environment.

    Besides ``setUp``/``tearDown`` this class hosts the private helpers
    shared by the concrete replication test cases below.
    """

    def setUp(self):
        """Open both environments and register the event callbacks."""
        self.homeDirMaster = get_new_environment_path()
        self.homeDirClient = get_new_environment_path()

        self.dbenvMaster = db.DBEnv()
        self.dbenvClient = db.DBEnv()

        # Must use "DB_THREAD" because the Replication Manager will
        # be executed in other threads but will use the same environment.
        # http://forums.oracle.com/forums/thread.jspa?threadID=645788&tstart=0
        env_flags = (db.DB_CREATE | db.DB_INIT_TXN | db.DB_INIT_LOG |
                     db.DB_INIT_MPOOL | db.DB_INIT_LOCK | db.DB_INIT_REP |
                     db.DB_RECOVER | db.DB_THREAD)
        self.dbenvMaster.open(self.homeDirMaster, env_flags, 0o666)
        self.dbenvClient.open(self.homeDirClient, env_flags, 0o666)

        # Flags flipped by the event callbacks below; the tests poll them.
        self.confirmed_master = self.client_startupdone = False

        def confirmed_master(a, b, c):
            if b == db.DB_EVENT_REP_MASTER:
                self.confirmed_master = True

        def client_startupdone(a, b, c):
            if b == db.DB_EVENT_REP_STARTUPDONE:
                self.client_startupdone = True

        self.dbenvMaster.set_event_notify(confirmed_master)
        self.dbenvClient.set_event_notify(client_startupdone)

        #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
        #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
        #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
        #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)

        self.dbMaster = self.dbClient = None

    def tearDown(self):
        """Close databases and environments and remove their directories."""
        if self.dbClient:
            self.dbClient.close()
        if self.dbMaster:
            self.dbMaster.close()

        # Here we assign dummy event handlers to allow GC of the test object.
        # Since the dummy handler doesn't use any outer scope variable, it
        # doesn't keep any reference to the test object.
        def dummy(*args):
            pass
        self.dbenvMaster.set_event_notify(dummy)
        self.dbenvClient.set_event_notify(dummy)

        self.dbenvClient.close()
        self.dbenvMaster.close()
        test_support.rmtree(self.homeDirClient)
        test_support.rmtree(self.homeDirMaster)

    # ------------------------------------------------------------------
    # Helpers shared by the concrete test cases.

    def _wait_for_startup(self, seconds=60):
        """Wait until the master is confirmed and the client has started.

        Polls the two flags set by the event callbacks for at most
        `seconds` seconds.  Returns True when both flags were observed
        set, False when the wait expired first.
        """
        # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
        # is not generated if the master has no new transactions.
        # This is solved in BDB 4.6 (#15542).
        deadline = time.time() + seconds
        while time.time() < deadline:
            if self.confirmed_master and self.client_startupdone:
                return True
            time.sleep(0.02)
        return bool(self.confirmed_master and self.client_startupdone)

    def _create_master_db(self):
        """Create the "test" hash database in the master environment."""
        self.dbMaster = db.DB(self.dbenvMaster)
        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0o666, txn=txn)
        txn.commit()

    def _open_client_db(self):
        """Open the "test" database read-only in the client environment.

        First waits up to 10 seconds for the database file to show up in
        the client home directory, then retries the open for as long as it
        raises DBRepHandleDeadError, using a fresh DB handle each time.
        """
        client_file = os.path.join(self.homeDirClient, "test")
        deadline = time.time() + 10
        while time.time() < deadline and not os.path.exists(client_file):
            time.sleep(0.01)

        self.dbClient = db.DB(self.dbenvClient)
        while True:
            txn = self.dbenvClient.txn_begin()
            try:
                self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
                                   mode=0o666, txn=txn)
            except db.DBRepHandleDeadError:
                txn.abort()
                self.dbClient.close()
                self.dbClient = db.DB(self.dbenvClient)
                continue

            txn.commit()
            break

    def _poll_client(self, key, expect_present, seconds=10):
        """Poll the client database until `key` appears or disappears.

        The key is read at least once.  Polling stops as soon as the
        presence of the key matches `expect_present`, or once `seconds`
        seconds have elapsed.  Returns ``(value, deadline)``: the last
        value read (None when the key is absent) and the absolute time at
        which the wait expires, so callers can assert on both.
        """
        deadline = time.time() + seconds
        while True:
            txn = self.dbenvClient.txn_begin()
            value = self.dbClient.get(key, txn=txn)
            txn.commit()
            if (value is not None) == expect_present:
                break
            if time.time() >= deadline:
                break
            # Sleep only while still waiting, so the wait neither spins nor
            # lingers after the expected state has been reached.
            time.sleep(0.02)
        return value, deadline

    def _put_on_master(self, key, value):
        """Store `key`/`value` on the master inside its own transaction."""
        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.put(key, value, txn=txn)
        txn.commit()

    def _delete_on_master(self, key):
        """Delete `key` on the master inside its own transaction."""
        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.delete(key, txn=txn)
        txn.commit()

    def _check_delete_replicates(self, key):
        """Delete `key` on the master and assert the client loses it too."""
        self._delete_on_master(key)
        value, deadline = self._poll_client(key, expect_present=False)
        self.assertLess(time.time(), deadline)
        self.assertEqual(None, value)


class DBReplicationManager(DBReplication):
    """Replication driven by the Replication Manager (``repmgr_*``) API."""

    def _configure_sites(self, master_port, client_port):
        """Declare the local and remote site of each environment.

        With Berkeley DB >= 5.2 the site objects are created, configured
        and then checked (configuration flags, EIDs and addresses).  Older
        versions use the local/remote site calls on the environment.
        """
        if db.version() >= (5, 2):
            self.site = self.dbenvMaster.repmgr_site("127.0.0.1", master_port)
            self.site.set_config(db.DB_GROUP_CREATOR, True)
            self.site.set_config(db.DB_LOCAL_SITE, True)
            self.site2 = self.dbenvMaster.repmgr_site("127.0.0.1",
                                                      client_port)

            self.site3 = self.dbenvClient.repmgr_site("127.0.0.1",
                                                      master_port)
            self.site3.set_config(db.DB_BOOTSTRAP_HELPER, True)
            self.site4 = self.dbenvClient.repmgr_site("127.0.0.1",
                                                      client_port)
            self.site4.set_config(db.DB_LOCAL_SITE, True)

            sites = [self.site, self.site2, self.site3, self.site4]

            # Expected value of every configuration flag, per site, in the
            # same order as `sites`.
            expected_config = {
                db.DB_BOOTSTRAP_HELPER: [False, False, True, False],
                db.DB_GROUP_CREATOR: [True, False, False, False],
                db.DB_LEGACY: [False, False, False, False],
                db.DB_LOCAL_SITE: [True, False, False, True],
                db.DB_REPMGR_PEER: [False, False, False, False],
            }
            for flag, expected_values in expected_config.items():
                for site, expected in zip(sites, expected_values):
                    self.assertEqual(expected, bool(site.get_config(flag)))

            self.assertNotEqual(self.site.get_eid(), self.site2.get_eid())
            self.assertNotEqual(self.site3.get_eid(), self.site4.get_eid())

            ports = [master_port, client_port, master_port, client_port]
            for site, port in zip(sites, ports):
                self.assertEqual(site.get_address(), ("127.0.0.1", port))

            # Looking a site up by EID must give back the same address.
            for env, env_sites in ((self.dbenvMaster, sites[:2]),
                                   (self.dbenvClient, sites[2:])):
                for site in env_sites:
                    by_eid = env.repmgr_site_by_eid(site.get_eid())
                    self.assertEqual(site.get_address(),
                                     by_eid.get_address())
        else:
            self.dbenvMaster.repmgr_set_local_site("127.0.0.1", master_port)
            self.dbenvClient.repmgr_set_local_site("127.0.0.1", client_port)
            self.dbenvMaster.repmgr_add_remote_site("127.0.0.1", client_port)
            self.dbenvClient.repmgr_add_remote_site("127.0.0.1", master_port)

    def _check_single_remote_site(self, env, port):
        """Assert that `env` lists exactly one remote site, on `port`."""
        site_list = env.repmgr_site_list()
        self.assertEqual(len(site_list), 1)
        entry = list(site_list.values())[0]  # There is only one
        self.assertEqual(entry[0], "127.0.0.1")
        self.assertEqual(entry[1], port)
        self.assertIn(entry[2],
                      (db.DB_REPMGR_CONNECTED, db.DB_REPMGR_DISCONNECTED))

    def test01_basic_replication(self):
        """Configure both sites, start them and replicate a put/delete."""
        master_port = test_support.find_unused_port()
        client_port = test_support.find_unused_port()
        self._configure_sites(master_port, client_port)

        self.dbenvMaster.rep_set_nsites(2)
        self.dbenvClient.rep_set_nsites(2)

        self.dbenvMaster.rep_set_priority(10)
        self.dbenvClient.rep_set_priority(0)

        # Every timeout is set to a distinct value on each side and read
        # back, so that a mixed-up setting would be noticed.
        for which, master_value, client_value in (
                (db.DB_REP_CONNECTION_RETRY, 100123, 100321),
                (db.DB_REP_ELECTION_TIMEOUT, 100234, 100432),
                (db.DB_REP_ELECTION_RETRY, 100345, 100543)):
            self.dbenvMaster.rep_set_timeout(which, master_value)
            self.dbenvClient.rep_set_timeout(which, client_value)
            self.assertEqual(self.dbenvMaster.rep_get_timeout(which),
                             master_value)
            self.assertEqual(self.dbenvClient.rep_get_timeout(which),
                             client_value)

        self.dbenvMaster.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
        self.dbenvClient.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)

        self.dbenvMaster.repmgr_start(1, db.DB_REP_MASTER)
        self.dbenvClient.repmgr_start(1, db.DB_REP_CLIENT)

        self.assertEqual(self.dbenvMaster.rep_get_nsites(), 2)
        self.assertEqual(self.dbenvClient.rep_get_nsites(), 2)
        self.assertEqual(self.dbenvMaster.rep_get_priority(), 10)
        self.assertEqual(self.dbenvClient.rep_get_priority(), 0)
        self.assertEqual(self.dbenvMaster.repmgr_get_ack_policy(),
                         db.DB_REPMGR_ACKS_ALL)
        self.assertEqual(self.dbenvClient.repmgr_get_ack_policy(),
                         db.DB_REPMGR_ACKS_ALL)

        # self.client_startupdone does not always get set to True within
        # the timeout.  On windows this may be a deep issue, on other
        # platforms it is likely just a timing issue, especially on slow
        # virthost buildbots (see issue 3892 for more).  Even though
        # the timeout triggers, the rest of this test method usually passes
        # (but not all of it always, see below).  So we just note the
        # timeout on stderr and keep soldiering on.
        #
        # The flag is always bound here because it is read again further
        # down, whether or not the startup wait expired.
        startup_timeout = not self._wait_for_startup()
        if startup_timeout:
            sys.stderr.write("XXX: timeout happened before "
                             "startup was confirmed - see issue 3892\n")

        self._check_single_remote_site(self.dbenvMaster, client_port)
        self._check_single_remote_site(self.dbenvClient, master_port)

        if db.version() >= (4, 6):
            stats = self.dbenvMaster.repmgr_stat(flags=db.DB_STAT_CLEAR)
            self.assertIn("msgs_queued", stats)

        self._create_master_db()
        self._open_client_db()

        self._put_on_master("ABC", "123")
        value, deadline = self._poll_client("ABC", expect_present=True)
        # If startup did not happen before the timeout above, then this test
        # sometimes fails.  This happens randomly, which causes buildbot
        # instability, but all the other bsddb tests pass.  Since bsddb3 in
        # the stdlib is currently not getting active maintenance, and is gone
        # in py3k, we just skip the end of the test in that case.
        if time.time() >= deadline and startup_timeout:
            self.skipTest("replication test skipped due to random failure, "
                          "see issue 3892")
        self.assertLess(time.time(), deadline)
        self.assertEqual("123", value)

        self._check_delete_replicates("ABC")


class DBBaseReplication(DBReplication):
    """Replication driven by the base (``rep_*``) API.

    Messages travel between the two environments through two queues, each
    drained by a worker thread that feeds them to ``rep_process_message``.
    """

    def setUp(self):
        """Extend the base fixture with transports and worker threads.

        The worker threads are created here but only started by the tests,
        after ``self.thread_do`` has been assigned.
        """
        DBReplication.setUp(self)

        def confirmed_master(a, b, c):
            if b in (db.DB_EVENT_REP_MASTER, db.DB_EVENT_REP_ELECTED):
                self.confirmed_master = True

        def client_startupdone(a, b, c):
            if b == db.DB_EVENT_REP_STARTUPDONE:
                self.client_startupdone = True

        self.dbenvMaster.set_event_notify(confirmed_master)
        self.dbenvClient.set_event_notify(client_startupdone)

        # The module is called "Queue" on Python 2 and "queue" on Python 3.
        try:
            import Queue as queue
        except ImportError:
            import queue
        self.m2c = queue.Queue()
        self.c2m = queue.Queue()

        # There are only two nodes, so we don't need to
        # do any routing decision
        def m2c(dbenv, control, rec, lsnp, envid, flags):
            self.m2c.put((control, rec))

        def c2m(dbenv, control, rec, lsnp, envid, flags):
            self.c2m.put((control, rec))

        self.dbenvMaster.rep_set_transport(13, m2c)
        self.dbenvMaster.rep_set_priority(10)
        self.dbenvClient.rep_set_transport(3, c2m)
        self.dbenvClient.rep_set_priority(0)

        self.assertEqual(self.dbenvMaster.rep_get_priority(), 10)
        self.assertEqual(self.dbenvClient.rep_get_priority(), 0)

        #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
        #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
        #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
        #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)

        # self.thread_do is looked up when the thread runs, so each test can
        # install its own message loop before starting the threads.
        def thread_master():
            return self.thread_do(self.dbenvMaster, self.c2m, 3,
                                  self.master_doing_election, True)

        def thread_client():
            return self.thread_do(self.dbenvClient, self.m2c, 13,
                                  self.client_doing_election, False)

        # Imported here rather than at module level, as the original did.
        from threading import Thread
        self.t_m = Thread(target=thread_master)
        self.t_c = Thread(target=thread_client)
        self.t_m.daemon = True
        self.t_c.daemon = True

        self.dbMaster = self.dbClient = None

        # One-element lists so that the worker threads can mutate the flag.
        self.master_doing_election = [False]
        self.client_doing_election = [False]

    def tearDown(self):
        """Stop the worker threads, then release everything else."""
        if self.dbClient:
            self.dbClient.close()
        if self.dbMaster:
            self.dbMaster.close()

        # None is the "stop" message understood by the message loops.
        self.m2c.put(None)
        self.c2m.put(None)
        for worker in (self.t_m, self.t_c):
            # A test that failed before starting its threads must not be
            # masked by the RuntimeError join() raises on an unstarted
            # thread.
            if worker.is_alive():
                worker.join()

        # Here we assign dummy event handlers to allow GC of the test object.
        # Since the dummy handler doesn't use any outer scope variable, it
        # doesn't keep any reference to the test object.
        def dummy(*args):
            pass
        self.dbenvMaster.set_event_notify(dummy)
        self.dbenvClient.set_event_notify(dummy)
        self.dbenvMaster.rep_set_transport(13, dummy)
        self.dbenvClient.rep_set_transport(3, dummy)

        self.dbenvClient.close()
        self.dbenvMaster.close()
        test_support.rmtree(self.homeDirClient)
        test_support.rmtree(self.homeDirMaster)

    def basic_rep_threading(self):
        """Start master and client with a plain message-forwarding loop."""
        self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
        self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)

        def thread_do(env, q, envid, election_status, must_be_master):
            while True:
                v = q.get()
                if v is None:
                    return
                env.rep_process_message(v[0], v[1], envid)

        self.thread_do = thread_do

        self.t_m.start()
        self.t_c.start()

    def test01_basic_replication(self):
        """Replicate a database creation, a put and a delete."""
        self.basic_rep_threading()

        self.assertTrue(self._wait_for_startup(),
                        "replication startup was not confirmed in time")

        self._create_master_db()
        self._open_client_db()

        stats = self.dbenvMaster.rep_stat(flags=db.DB_STAT_CLEAR)
        self.assertIn("master_changes", stats)

        self._put_on_master("ABC", "123")
        value, deadline = self._poll_client("ABC", expect_present=True)
        self.assertLess(time.time(), deadline)
        self.assertEqual("123", value)

        self._check_delete_replicates("ABC")

    if db.version() >= (4, 7):
        def test02_test_request(self):
            """Round-trip the client's request min/max settings."""
            self.basic_rep_threading()
            (minimum, maximum) = self.dbenvClient.rep_get_request()
            self.dbenvClient.rep_set_request(minimum-1, maximum+1)
            self.assertEqual(self.dbenvClient.rep_get_request(),
                             (minimum-1, maximum+1))

    if db.version() >= (4, 6):
        def test03_master_election(self):
            """Start both sites as clients and let an election pick one."""
            # Get ready to hold an election
            #self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
            self.dbenvMaster.rep_start(flags=db.DB_REP_CLIENT)
            self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)

            def thread_do(env, q, envid, election_status, must_be_master):
                while True:
                    v = q.get()
                    if v is None:
                        return
                    r = env.rep_process_message(v[0], v[1], envid)
                    if must_be_master and self.confirmed_master:
                        self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
                        must_be_master = False

                    if r[0] == db.DB_REP_HOLDELECTION:
                        def elect():
                            # Retry for as long as the election cannot be
                            # held.
                            while True:
                                try:
                                    env.rep_elect(2, 1)
                                    election_status[0] = False
                                    break
                                except db.DBRepUnavailError:
                                    pass
                        if not election_status[0] and \
                                not self.confirmed_master:
                            from threading import Thread
                            election_status[0] = True
                            t = Thread(target=elect)
                            t.daemon = True
                            t.start()

            self.thread_do = thread_do

            self.t_m.start()
            self.t_c.start()

            self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT,
                                             50000)
            self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT,
                                             50000)
            self.client_doing_election[0] = True
            while True:
                try:
                    self.dbenvClient.rep_elect(2, 1)
                    self.client_doing_election[0] = False
                    break
                except db.DBRepUnavailError:
                    pass

            self.assertTrue(self.confirmed_master)

            # Race condition showed up after upgrading to Solaris 10 Update 10
            # https://forums.oracle.com/forums/thread.jspa?messageID=9902860
            # jcea@jcea.es: See private email from Paula Bingham (Oracle),
            # in 20110929.
            while not (self.dbenvClient.rep_stat()["startup_complete"]):
                # Pause between polls instead of spinning flat out.
                time.sleep(0.01)

    if db.version() >= (4, 7):
        def test04_test_clockskew(self):
            """Round-trip the master's clock skew settings."""
            fast, slow = 1234, 1230
            self.dbenvMaster.rep_set_clockskew(fast, slow)
            self.assertEqual((fast, slow),
                             self.dbenvMaster.rep_get_clockskew())
            self.basic_rep_threading()

#----------------------------------------------------------------------

def test_suite():
    """Build the suite from what this Berkeley DB build supports.

    The Replication Manager tests are added only for Berkeley DB >= 4.6
    and only if ``repmgr_get_ack_policy`` can be called on a fresh
    environment.  The base replication tests are added only when
    ``have_threads`` is true.
    """
    suite = unittest.TestSuite()
    if db.version() >= (4, 6):
        dbenv = db.DBEnv()
        try:
            dbenv.repmgr_get_ack_policy()
            ReplicationManager_available = True
        except Exception:
            # Any failure of the probe call is taken to mean that the
            # Replication Manager cannot be used with this build.
            ReplicationManager_available = False
        dbenv.close()
        del dbenv
        if ReplicationManager_available:
            suite.addTest(unittest.makeSuite(DBReplicationManager))

        if have_threads:
            suite.addTest(unittest.makeSuite(DBBaseReplication))

    return suite


if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')