# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
import time
import os


class Submitter(object):
    """Upload local files to an input bucket and queue a message for each.

    The collaborating objects are looked up once, at construction time, from
    the service-definition object ``sd`` via ``sd.get_obj(name)``:

    * ``input_bucket``  -- bucket that receives the uploaded files.
    * ``output_bucket`` -- optional; when truthy its ``name`` is recorded in
      every queued message under ``'OutputBucket'``.
    * ``output_domain`` -- optional; when truthy a ``'Batch'`` item is
      written to it at the start of every ``submit_path`` call.
    * ``input_queue``   -- optional; when falsy no messages are written.
    """

    def __init__(self, sd):
        """Store ``sd`` and resolve the bucket/domain/queue objects from it."""
        self.sd = sd
        self.input_bucket = self.sd.get_obj('input_bucket')
        self.output_bucket = self.sd.get_obj('output_bucket')
        self.output_domain = self.sd.get_obj('output_domain')
        self.queue = self.sd.get_obj('input_queue')

    def get_key_name(self, fullpath, prefix):
        """Return the key name for ``fullpath`` relative to ``prefix``.

        The first ``len(prefix)`` characters of ``fullpath`` are dropped and
        the platform path separator is converted to ``'/'``.

        NOTE: the prefix is removed purely by length; it is not verified
        that ``fullpath`` actually starts with ``prefix``.
        """
        relative = fullpath[len(prefix):]
        return '/'.join(relative.split(os.sep))

    def write_message(self, key, metadata):
        """Queue a message describing ``key``; no-op when there is no queue.

        The message is populated through ``for_key(key, metadata)`` and, if
        an output bucket is configured, tagged with that bucket's name.
        """
        if self.queue:
            m = self.queue.new_message()
            m.for_key(key, metadata)
            if self.output_bucket:
                m['OutputBucket'] = self.output_bucket.name
            self.queue.write(m)

    def submit_file(self, path, metadata=None, cb=None, num_cb=0, prefix='/'):
        """Upload one file to the input bucket and queue a message for it.

        :param path: path of the local file to upload.
        :param metadata: optional dict applied to the new key and passed on
            to the queued message; a falsy value is treated as ``{}``.
        :param cb: progress callback forwarded to the upload call.
        :param num_cb: callback count forwarded to the upload call.
        :param prefix: leading part of ``path`` to strip when deriving the
            key name (see ``get_key_name``).
        """
        if not metadata:
            metadata = {}
        key_name = self.get_key_name(path, prefix)
        k = self.input_bucket.new_key(key_name)
        k.update_metadata(metadata)
        # replace=False: the upload is asked not to overwrite an existing key.
        k.set_contents_from_filename(path, replace=False, cb=cb, num_cb=num_cb)
        self.write_message(k, metadata)

    def submit_path(self, path, tags=None, ignore_dirs=None, cb=None,
                    num_cb=0, status=False, prefix='/'):
        """Submit a single file, or every file below a directory, as a batch.

        ``path`` is normalised with ``expanduser``, ``expandvars`` and
        ``abspath`` before use.

        :param path: file or directory to submit.
        :param tags: optional value stored under ``'Tags'`` in the metadata
            attached to every submitted file.
        :param ignore_dirs: optional iterable of directory names that are
            pruned from the walk wherever they occur.
        :param cb: progress callback forwarded to ``submit_file``.
        :param num_cb: callback count forwarded to ``submit_file``.
        :param status: when true, print each file path before submitting it.
        :param prefix: leading part of each path to strip when deriving key
            names; applied to both the directory and single-file cases.
        :return: ``(batch_id, total)`` -- the generated batch identifier and
            the number of files submitted.
        """
        path = os.path.expanduser(path)
        path = os.path.expandvars(path)
        path = os.path.abspath(path)
        total = 0
        metadata = {}
        if tags:
            metadata['Tags'] = tags
        # The batch id is the current UTC time, with all struct_time fields
        # joined by underscores.
        metadata['Batch'] = '_'.join(str(t) for t in time.gmtime())
        if self.output_domain:
            self.output_domain.put_attributes(metadata['Batch'],
                                              {'type' : 'Batch'})
        if os.path.isdir(path):
            for root, dirs, files in os.walk(path):
                if ignore_dirs:
                    # Removing entries from ``dirs`` in place stops os.walk
                    # from descending into them.
                    for ignore in ignore_dirs:
                        if ignore in dirs:
                            dirs.remove(ignore)
                for filename in files:
                    fullpath = os.path.join(root, filename)
                    if status:
                        print('Submitting %s' % fullpath)
                    self.submit_file(fullpath, metadata, cb, num_cb, prefix)
                    total += 1
        elif os.path.isfile(path):
            # Forward ``prefix`` here too, so a single file gets the same key
            # name it would get when submitted as part of a directory.
            self.submit_file(path, metadata, cb, num_cb, prefix)
            total += 1
        else:
            print('problem with %s' % path)
        return (metadata['Batch'], total)