"""
Property factories and header (de)serialization helpers.

The factories in the first half of this module build ``property`` objects
that read and write WSGI ``environ`` keys or entries of a ``_headerlist``;
the second half holds the parse/serialize function pairs that
``converter`` combines with those properties.
"""

from datetime import (
    date,
    datetime,
    )

import re

from webob.byterange import (
    ContentRange,
    Range,
    )

from webob.compat import (
    PY3,
    text_type,
    )

from webob.datetime_utils import (
    parse_date,
    serialize_date,
    )

from webob.util import (
    header_docstring,
    warn_deprecation,
    )


CHARSET_RE = re.compile(r';\s*charset=([^;]*)', re.I)
SCHEME_RE = re.compile(r'^[a-z]+:', re.I)


# Sentinel meaning "no default supplied"; lets ``None`` be a real default.
_not_given = object()


def environ_getter(key, default=_not_given, rfc_section=None):
    """
    Build a property that proxies ``req.environ[key]``.

    Without ``default`` the key is required: reading uses plain item
    access, assignment stores the value as-is, and the property has no
    deleter.  With ``default`` reading falls back to it, assigning
    ``None`` removes the key (if present), and deletion is supported.

    When ``rfc_section`` is truthy the property docstring is produced by
    ``header_docstring``; otherwise a generic one is used.
    """
    if rfc_section:
        doc = header_docstring(key, rfc_section)
    else:
        doc = "Gets and sets the ``%s`` key in the environment." % key
    if default is _not_given:
        def fget(req):
            return req.environ[key]

        def fset(req, val):
            req.environ[key] = val
        fdel = None
    else:
        def fget(req):
            return req.environ.get(key, default)

        def fset(req, val):
            if val is None:
                # Assigning None means "unset", not "store None".
                if key in req.environ:
                    del req.environ[key]
            else:
                req.environ[key] = val

        def fdel(req):
            del req.environ[key]
    return property(fget, fset, fdel, doc=doc)


def environ_decoder(key, default=_not_given, rfc_section=None,
                    encattr=None):
    """
    Like ``environ_getter``, but reads and writes go through
    ``req.encget`` / ``req.encset``, passing ``encattr`` along.

    Removing the key (assigning ``None`` or deleting, both only available
    when ``default`` is given) operates directly on ``req.environ``.
    """
    if rfc_section:
        doc = header_docstring(key, rfc_section)
    else:
        doc = "Gets and sets the ``%s`` key in the environment." % key
    if default is _not_given:
        def fget(req):
            return req.encget(key, encattr=encattr)

        def fset(req, val):
            return req.encset(key, val, encattr=encattr)
        fdel = None
    else:
        def fget(req):
            return req.encget(key, default, encattr=encattr)

        def fset(req, val):
            if val is None:
                # Assigning None means "unset", not "store None".
                if key in req.environ:
                    del req.environ[key]
            else:
                return req.encset(key, val, encattr=encattr)

        def fdel(req):
            del req.environ[key]
    return property(fget, fset, fdel, doc=doc)


def upath_property(key):
    """
    Build a property exposing ``req.environ[key]`` as text, converting
    with ``req.url_encoding``.

    On Python 3 the environ value is round-tripped through latin-1
    (encode on read, decode on write); on Python 2 the environ value is
    decoded on read and text values are encoded before being stored.
    A missing key reads as the empty string.
    """
    if PY3:  # pragma: no cover
        def fget(req):
            encoding = req.url_encoding
            return req.environ.get(key, '').encode('latin-1').decode(encoding)

        def fset(req, val):
            encoding = req.url_encoding
            req.environ[key] = val.encode(encoding).decode('latin-1')
    else:
        def fget(req):
            encoding = req.url_encoding
            return req.environ.get(key, '').decode(encoding)

        def fset(req, val):
            encoding = req.url_encoding
            if isinstance(val, text_type):
                val = val.encode(encoding)
            req.environ[key] = val
    return property(fget, fset, doc='upath_property(%r)' % key)


def deprecated_property(attr, name, text, version):  # pragma: no cover
    """
    Wraps a descriptor, with a deprecation warning or error
    """
    def warn():
        warn_deprecation('The attribute %s is deprecated: %s'
                         % (name, text),
                         version,
                         3
                         )

    def fget(self):
        warn()
        return attr.__get__(self, type(self))

    def fset(self, val):
        warn()
        attr.__set__(self, val)

    def fdel(self):
        warn()
        attr.__delete__(self)
    return property(fget, fset, fdel,
                    '<Deprecated attribute %s>' % name
                    )


def header_getter(header, rfc_section):
    """
    Build a property for a single header stored in ``r._headerlist``.

    Header names are compared case-insensitively.  Reading returns the
    value of the first matching entry (``None`` when absent).  Assigning
    removes every matching entry and, unless the value is ``None``,
    appends one ``(header, value)`` pair.  Deleting removes every
    matching entry.
    """
    doc = header_docstring(header, rfc_section)
    key = header.lower()

    def fget(r):
        for k, v in r._headerlist:
            if k.lower() == key:
                return v

    def fset(r, value):
        fdel(r)
        if value is not None:
            if isinstance(value, text_type) and not PY3:
                value = value.encode('latin-1')
            r._headerlist.append((header, value))

    def fdel(r):
        items = r._headerlist
        # Walk backwards so in-place deletion does not shift the
        # indexes still to be visited.
        for i in range(len(items) - 1, -1, -1):
            if items[i][0].lower() == key:
                del items[i]

    return property(fget, fset, fdel, doc)


def converter(prop, parse, serialize, convert_name=None):
    """
    Wrap ``prop`` so that reads pass through ``parse`` and writes pass
    through ``serialize``.

    ``None`` is handed to the underlying setter unserialized; the
    underlying deleter is reused unchanged.  ``convert_name`` is only
    used in the generated docstring and defaults to the names of the two
    conversion functions.
    """
    assert isinstance(prop, property)
    convert_name = convert_name or "``%s`` and ``%s``" % (parse.__name__,
                                                        serialize.__name__)
    doc = prop.__doc__ or ''
    doc += " Converts it using %s." % convert_name
    hget, hset = prop.fget, prop.fset

    def fget(r):
        return parse(hget(r))

    def fset(r, val):
        if val is not None:
            val = serialize(val)
        hset(r, val)
    return property(fget, fset, prop.fdel, doc)


def list_header(header, rfc_section):
    """
    Build a header property whose value is a comma-separated list.
    """
    prop = header_getter(header, rfc_section)
    return converter(prop, parse_list, serialize_list, 'list')


def parse_list(value):
    """
    Split ``value`` on commas into a tuple of stripped, non-empty items.

    Returns ``None`` for a falsy ``value``.
    """
    if not value:
        return None
    return tuple(filter(None, [v.strip() for v in value.split(',')]))


def serialize_list(value):
    """
    Return a string/bytes ``value`` via ``str()``; join any other
    iterable with ``', '`` after applying ``str`` to each item.
    """
    if isinstance(value, (text_type, bytes)):
        # NOTE(review): on Python 3, str() of a bytes object yields its
        # repr ("b'...'") -- confirm whether bytes is expected here.
        return str(value)
    else:
        return ', '.join(map(str, value))


def converter_date(prop):
    """
    Wrap ``prop`` with ``parse_date`` / ``serialize_date`` conversion.
    """
    return converter(prop, parse_date, serialize_date, 'HTTP date')


def date_header(header, rfc_section):
    """
    Build a header property converted with ``converter_date``.
    """
    return converter_date(header_getter(header, rfc_section))


########################
## Converter functions
########################


_rx_etag = re.compile(r'(?:^|\s)(W/)?"((?:\\"|.)*?)"')


def parse_etag_response(value, strong=False):
    """
    Parse a response ETag.
    See:
        * http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.19
        * http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.11
    """
    if not value:
        return None
    m = _rx_etag.match(value)
    if not m:
        # this etag is invalid, but we'll just return it anyway
        return value
    elif strong and m.group(1):
        # this is a weak etag and we want only strong ones
        return None
    else:
        return m.group(2).replace('\\"', '"')


def serialize_etag_response(value):
    """
    Serialize an ETag for a response header.

    ``value`` is either a string or a ``(value, strong)`` tuple.  A bare
    string that already matches the ETag pattern is returned untouched;
    anything else is wrapped in double quotes (escaping embedded quotes)
    and prefixed with ``W/`` when ``strong`` is false.
    """
    strong = True
    if isinstance(value, tuple):
        value, strong = value
    elif _rx_etag.match(value):
        # this is a valid etag already
        return value
    # let's quote the value
    r = '"%s"' % value.replace('"', '\\"')
    if not strong:
        r = 'W/' + r
    return r


def serialize_if_range(value):
    """
    Serialize dates with ``serialize_date``; anything else with ``str``.

    An empty resulting string is reported as ``None``.
    """
    if isinstance(value, (datetime, date)):
        return serialize_date(value)
    value = str(value)
    return value or None


def parse_range(value):
    """
    Parse ``value`` with ``Range.parse``; falsy input gives ``None``.
    """
    if not value:
        return None
    # Might return None too:
    return Range.parse(value)


def serialize_range(value):
    """
    Serialize a range: falsy gives ``None``, a list/tuple is expanded
    into ``Range(*value)`` and stringified, a string is returned as-is.
    """
    if not value:
        return None
    elif isinstance(value, (list, tuple)):
        return str(Range(*value))
    else:
        assert isinstance(value, str)
        return value


def parse_int(value):
    """
    Convert ``value`` with ``int``; ``None`` and ``''`` give ``None``.

    Raises whatever ``int`` raises for unparsable input.
    """
    if value is None or value == '':
        return None
    return int(value)


def parse_int_safe(value):
    """
    Like ``parse_int``, but a ``ValueError`` from ``int`` gives ``None``.
    """
    if value is None or value == '':
        return None
    try:
        return int(value)
    except ValueError:
        return None


serialize_int = str


def parse_content_range(value):
    """
    Parse ``value`` with ``ContentRange.parse``; empty or
    whitespace-only input gives ``None``.
    """
    if not value or not value.strip():
        return None
    # May still return None
    return ContentRange.parse(value)


def serialize_content_range(value):
    """
    Serialize a content range.

    A list/tuple must be ``(begin, end)`` or ``(begin, end, length)`` and
    is turned into a ``ContentRange`` (``length`` defaulting to
    ``None``); any other value is used directly.  The result is the
    stripped ``str()`` of the value, or ``None`` if that is empty.

    Raises ``ValueError`` for a list/tuple of any other length.
    """
    if isinstance(value, (tuple, list)):
        if len(value) not in (2, 3):
            # Wrap in a 1-tuple: a bare tuple on the right of ``%`` would
            # be unpacked as the argument list and raise TypeError
            # instead of the intended ValueError.
            raise ValueError(
                "When setting content_range to a list/tuple, it must "
                "be length 2 or 3 (not %r)" % (value,))
        if len(value) == 2:
            begin, end = value
            length = None
        else:
            begin, end, length = value
        value = ContentRange(begin, end, length)
    value = str(value).strip()
    if not value:
        return None
    return value


_rx_auth_param = re.compile(r'([a-z]+)[ \t]*=[ \t]*(".*?"|[^,]*?)[ \t]*(?:\Z|, *)')


def parse_auth_params(params):
    """
    Parse ``key=value`` pairs matched by ``_rx_auth_param`` into a dict,
    stripping double quotes from both ends of each value.
    """
    r = {}
    for k, v in _rx_auth_param.findall(params):
        r[k] = v.strip('"')
    return r


# see http://lists.w3.org/Archives/Public/ietf-http-wg/2009OctDec/0297.html
known_auth_schemes = ['Basic', 'Digest', 'WSSE', 'HMACDigest', 'GoogleLogin',
                      'Cookie', 'OpenID']
known_auth_schemes = dict.fromkeys(known_auth_schemes, None)


def parse_auth(val):
    """
    Split an authorization header value into ``(authtype, params)``.

    ``None`` is returned unchanged.  For schemes listed in
    ``known_auth_schemes`` the parameters are parsed into a dict with
    ``parse_auth_params``, except for ``Basic`` credentials containing
    no double quote, which stay a plain string.  For other schemes the
    parameters are left as the raw string.  A value consisting of a
    scheme only (no space) yields empty parameters.
    """
    if val is not None:
        # partition() rather than split(' ', 1): a header holding only a
        # scheme token must not blow up on tuple unpacking.
        authtype, _, params = val.partition(' ')
        if authtype in known_auth_schemes:
            if authtype == 'Basic' and '"' not in params:
                # this is the "Authentication: Basic XXXXX==" case
                pass
            else:
                params = parse_auth_params(params)
        return authtype, params
    return val


def serialize_auth(val):
    """
    Serialize an ``(authtype, params)`` pair into a header value.

    A dict of params is rendered as comma-separated ``key="value"``
    items; string params are used as-is.  Values that are not a
    list/tuple are returned unchanged.
    """
    if isinstance(val, (tuple, list)):
        authtype, params = val
        if isinstance(params, dict):
            params = ', '.join(
                '%s="%s"' % item for item in params.items())
        assert isinstance(params, str)
        return '%s %s' % (authtype, params)
    return val