Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import datetime 

2from decimal import Decimal 

3import re 

4import time 

5 

6from .err import ProgrammingError 

7from .constants import FIELD_TYPE 

8 

9 

10def escape_item(val, charset, mapping=None): 

11 if mapping is None: 

12 mapping = encoders 

13 encoder = mapping.get(type(val)) 

14 

15 # Fallback to default when no encoder found 

16 if not encoder: 

17 try: 

18 encoder = mapping[str] 

19 except KeyError: 

20 raise TypeError("no default type converter defined") 

21 

22 if encoder in (escape_dict, escape_sequence): 

23 val = encoder(val, charset, mapping) 

24 else: 

25 val = encoder(val, mapping) 

26 return val 

27 

28 

29def escape_dict(val, charset, mapping=None): 

30 n = {} 

31 for k, v in val.items(): 

32 quoted = escape_item(v, charset, mapping) 

33 n[k] = quoted 

34 return n 

35 

36 

37def escape_sequence(val, charset, mapping=None): 

38 n = [] 

39 for item in val: 

40 quoted = escape_item(item, charset, mapping) 

41 n.append(quoted) 

42 return "(" + ",".join(n) + ")" 

43 

44 

45def escape_set(val, charset, mapping=None): 

46 return ",".join([escape_item(x, charset, mapping) for x in val]) 

47 

48 

49def escape_bool(value, mapping=None): 

50 return str(int(value)) 

51 

52 

53def escape_int(value, mapping=None): 

54 return str(value) 

55 

56 

57def escape_float(value, mapping=None): 

58 s = repr(value) 

59 if s in ("inf", "nan"): 

60 raise ProgrammingError("%s can not be used with MySQL" % s) 

61 if "e" not in s: 

62 s += "e0" 

63 return s 

64 

65 

66_escape_table = [chr(x) for x in range(128)] 

67_escape_table[0] = "\\0" 

68_escape_table[ord("\\")] = "\\\\" 

69_escape_table[ord("\n")] = "\\n" 

70_escape_table[ord("\r")] = "\\r" 

71_escape_table[ord("\032")] = "\\Z" 

72_escape_table[ord('"')] = '\\"' 

73_escape_table[ord("'")] = "\\'" 

74 

75 

76def escape_string(value, mapping=None): 

77 """escapes *value* without adding quote. 

78 

79 Value should be unicode 

80 """ 

81 return value.translate(_escape_table) 

82 

83 

84def escape_bytes_prefixed(value, mapping=None): 

85 return "_binary'%s'" % value.decode("ascii", "surrogateescape").translate( 

86 _escape_table 

87 ) 

88 

89 

90def escape_bytes(value, mapping=None): 

91 return "'%s'" % value.decode("ascii", "surrogateescape").translate(_escape_table) 

92 

93 

94def escape_str(value, mapping=None): 

95 return "'%s'" % escape_string(str(value), mapping) 

96 

97 

98def escape_None(value, mapping=None): 

99 return "NULL" 

100 

101 

102def escape_timedelta(obj, mapping=None): 

103 seconds = int(obj.seconds) % 60 

104 minutes = int(obj.seconds // 60) % 60 

105 hours = int(obj.seconds // 3600) % 24 + int(obj.days) * 24 

106 if obj.microseconds: 

107 fmt = "'{0:02d}:{1:02d}:{2:02d}.{3:06d}'" 

108 else: 

109 fmt = "'{0:02d}:{1:02d}:{2:02d}'" 

110 return fmt.format(hours, minutes, seconds, obj.microseconds) 

111 

112 

113def escape_time(obj, mapping=None): 

114 if obj.microsecond: 

115 fmt = "'{0.hour:02}:{0.minute:02}:{0.second:02}.{0.microsecond:06}'" 

116 else: 

117 fmt = "'{0.hour:02}:{0.minute:02}:{0.second:02}'" 

118 return fmt.format(obj) 

119 

120 

121def escape_datetime(obj, mapping=None): 

122 if obj.microsecond: 

123 fmt = "'{0.year:04}-{0.month:02}-{0.day:02} {0.hour:02}:{0.minute:02}:{0.second:02}.{0.microsecond:06}'" 

124 else: 

125 fmt = "'{0.year:04}-{0.month:02}-{0.day:02} {0.hour:02}:{0.minute:02}:{0.second:02}'" 

126 return fmt.format(obj) 

127 

128 

129def escape_date(obj, mapping=None): 

130 fmt = "'{0.year:04}-{0.month:02}-{0.day:02}'" 

131 return fmt.format(obj) 

132 

133 

134def escape_struct_time(obj, mapping=None): 

135 return escape_datetime(datetime.datetime(*obj[:6])) 

136 

137 

138def Decimal2Literal(o, d): 

139 return format(o, "f") 

140 

141 

142def _convert_second_fraction(s): 

143 if not s: 

144 return 0 

145 # Pad zeros to ensure the fraction length in microseconds 

146 s = s.ljust(6, "0") 

147 return int(s[:6]) 

148 

149 

150DATETIME_RE = re.compile( 

151 r"(\d{1,4})-(\d{1,2})-(\d{1,2})[T ](\d{1,2}):(\d{1,2}):(\d{1,2})(?:.(\d{1,6}))?" 

152) 

153 

154 

155def convert_datetime(obj): 

156 """Returns a DATETIME or TIMESTAMP column value as a datetime object: 

157 

158 >>> datetime_or_None('2007-02-25 23:06:20') 

159 datetime.datetime(2007, 2, 25, 23, 6, 20) 

160 >>> datetime_or_None('2007-02-25T23:06:20') 

161 datetime.datetime(2007, 2, 25, 23, 6, 20) 

162 

163 Illegal values are returned as None: 

164 

165 >>> datetime_or_None('2007-02-31T23:06:20') is None 

166 True 

167 >>> datetime_or_None('0000-00-00 00:00:00') is None 

168 True 

169 

170 """ 

171 if isinstance(obj, (bytes, bytearray)): 

172 obj = obj.decode("ascii") 

173 

174 m = DATETIME_RE.match(obj) 

175 if not m: 

176 return convert_date(obj) 

177 

178 try: 

179 groups = list(m.groups()) 

180 groups[-1] = _convert_second_fraction(groups[-1]) 

181 return datetime.datetime(*[int(x) for x in groups]) 

182 except ValueError: 

183 return convert_date(obj) 

184 

185 

186TIMEDELTA_RE = re.compile(r"(-)?(\d{1,3}):(\d{1,2}):(\d{1,2})(?:.(\d{1,6}))?") 

187 

188 

189def convert_timedelta(obj): 

190 """Returns a TIME column as a timedelta object: 

191 

192 >>> timedelta_or_None('25:06:17') 

193 datetime.timedelta(1, 3977) 

194 >>> timedelta_or_None('-25:06:17') 

195 datetime.timedelta(-2, 83177) 

196 

197 Illegal values are returned as None: 

198 

199 >>> timedelta_or_None('random crap') is None 

200 True 

201 

202 Note that MySQL always returns TIME columns as (+|-)HH:MM:SS, but 

203 can accept values as (+|-)DD HH:MM:SS. The latter format will not 

204 be parsed correctly by this function. 

205 """ 

206 if isinstance(obj, (bytes, bytearray)): 

207 obj = obj.decode("ascii") 

208 

209 m = TIMEDELTA_RE.match(obj) 

210 if not m: 

211 return obj 

212 

213 try: 

214 groups = list(m.groups()) 

215 groups[-1] = _convert_second_fraction(groups[-1]) 

216 negate = -1 if groups[0] else 1 

217 hours, minutes, seconds, microseconds = groups[1:] 

218 

219 tdelta = ( 

220 datetime.timedelta( 

221 hours=int(hours), 

222 minutes=int(minutes), 

223 seconds=int(seconds), 

224 microseconds=int(microseconds), 

225 ) 

226 * negate 

227 ) 

228 return tdelta 

229 except ValueError: 

230 return obj 

231 

232 

233TIME_RE = re.compile(r"(\d{1,2}):(\d{1,2}):(\d{1,2})(?:.(\d{1,6}))?") 

234 

235 

236def convert_time(obj): 

237 """Returns a TIME column as a time object: 

238 

239 >>> time_or_None('15:06:17') 

240 datetime.time(15, 6, 17) 

241 

242 Illegal values are returned as None: 

243 

244 >>> time_or_None('-25:06:17') is None 

245 True 

246 >>> time_or_None('random crap') is None 

247 True 

248 

249 Note that MySQL always returns TIME columns as (+|-)HH:MM:SS, but 

250 can accept values as (+|-)DD HH:MM:SS. The latter format will not 

251 be parsed correctly by this function. 

252 

253 Also note that MySQL's TIME column corresponds more closely to 

254 Python's timedelta and not time. However if you want TIME columns 

255 to be treated as time-of-day and not a time offset, then you can 

256 use set this function as the converter for FIELD_TYPE.TIME. 

257 """ 

258 if isinstance(obj, (bytes, bytearray)): 

259 obj = obj.decode("ascii") 

260 

261 m = TIME_RE.match(obj) 

262 if not m: 

263 return obj 

264 

265 try: 

266 groups = list(m.groups()) 

267 groups[-1] = _convert_second_fraction(groups[-1]) 

268 hours, minutes, seconds, microseconds = groups 

269 return datetime.time( 

270 hour=int(hours), 

271 minute=int(minutes), 

272 second=int(seconds), 

273 microsecond=int(microseconds), 

274 ) 

275 except ValueError: 

276 return obj 

277 

278 

279def convert_date(obj): 

280 """Returns a DATE column as a date object: 

281 

282 >>> date_or_None('2007-02-26') 

283 datetime.date(2007, 2, 26) 

284 

285 Illegal values are returned as None: 

286 

287 >>> date_or_None('2007-02-31') is None 

288 True 

289 >>> date_or_None('0000-00-00') is None 

290 True 

291 

292 """ 

293 if isinstance(obj, (bytes, bytearray)): 

294 obj = obj.decode("ascii") 

295 try: 

296 return datetime.date(*[int(x) for x in obj.split("-", 2)]) 

297 except ValueError: 

298 return obj 

299 

300 

301def through(x): 

302 return x 

303 

304 

305# def convert_bit(b): 

306# b = "\x00" * (8 - len(b)) + b # pad w/ zeroes 

307# return struct.unpack(">Q", b)[0] 

308# 

309# the snippet above is right, but MySQLdb doesn't process bits, 

310# so we shouldn't either 

311convert_bit = through 

312 

313 

314encoders = { 

315 bool: escape_bool, 

316 int: escape_int, 

317 float: escape_float, 

318 str: escape_str, 

319 bytes: escape_bytes, 

320 tuple: escape_sequence, 

321 list: escape_sequence, 

322 set: escape_sequence, 

323 frozenset: escape_sequence, 

324 dict: escape_dict, 

325 type(None): escape_None, 

326 datetime.date: escape_date, 

327 datetime.datetime: escape_datetime, 

328 datetime.timedelta: escape_timedelta, 

329 datetime.time: escape_time, 

330 time.struct_time: escape_struct_time, 

331 Decimal: Decimal2Literal, 

332} 

333 

334 

335decoders = { 

336 FIELD_TYPE.BIT: convert_bit, 

337 FIELD_TYPE.TINY: int, 

338 FIELD_TYPE.SHORT: int, 

339 FIELD_TYPE.LONG: int, 

340 FIELD_TYPE.FLOAT: float, 

341 FIELD_TYPE.DOUBLE: float, 

342 FIELD_TYPE.LONGLONG: int, 

343 FIELD_TYPE.INT24: int, 

344 FIELD_TYPE.YEAR: int, 

345 FIELD_TYPE.TIMESTAMP: convert_datetime, 

346 FIELD_TYPE.DATETIME: convert_datetime, 

347 FIELD_TYPE.TIME: convert_timedelta, 

348 FIELD_TYPE.DATE: convert_date, 

349 FIELD_TYPE.BLOB: through, 

350 FIELD_TYPE.TINY_BLOB: through, 

351 FIELD_TYPE.MEDIUM_BLOB: through, 

352 FIELD_TYPE.LONG_BLOB: through, 

353 FIELD_TYPE.STRING: through, 

354 FIELD_TYPE.VAR_STRING: through, 

355 FIELD_TYPE.VARCHAR: through, 

356 FIELD_TYPE.DECIMAL: Decimal, 

357 FIELD_TYPE.NEWDECIMAL: Decimal, 

358} 

359 

360 

361# for MySQLdb compatibility 

362conversions = encoders.copy() 

363conversions.update(decoders) 

364Thing2Literal = escape_str