Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" 

2Collection of query wrappers / abstractions to both facilitate data 

3retrieval and to reduce dependency on DB-specific API. 

4""" 

5 

6from contextlib import contextmanager 

7from datetime import date, datetime, time 

8from functools import partial 

9import re 

10import warnings 

11 

12import numpy as np 

13 

14import pandas._libs.lib as lib 

15 

16from pandas.core.dtypes.common import is_datetime64tz_dtype, is_dict_like, is_list_like 

17from pandas.core.dtypes.dtypes import DatetimeTZDtype 

18from pandas.core.dtypes.missing import isna 

19 

20from pandas.core.api import DataFrame, Series 

21from pandas.core.base import PandasObject 

22from pandas.core.tools.datetimes import to_datetime 

23 

24 

class SQLAlchemyRequired(ImportError):
    """Raised when an operation requires SQLAlchemy but it is not installed."""

27 

28 

class DatabaseError(IOError):
    """Raised when the execution of a database operation fails."""

31 

32 

33# ----------------------------------------------------------------------------- 

34# -- Helper functions 

35 

# Tri-state cache for SQLAlchemy availability: None means "not yet probed";
# set to True/False by _is_sqlalchemy_connectable / _engine_builder below.
_SQLALCHEMY_INSTALLED = None

37 

38 

def _is_sqlalchemy_connectable(con):
    """Return True if ``con`` is a SQLAlchemy Connectable (engine/connection)."""
    global _SQLALCHEMY_INSTALLED
    # Probe for SQLAlchemy once and cache the outcome in the module global.
    if _SQLALCHEMY_INSTALLED is None:
        try:
            import sqlalchemy  # noqa: F401

            _SQLALCHEMY_INSTALLED = True
        except ImportError:
            _SQLALCHEMY_INSTALLED = False

    if not _SQLALCHEMY_INSTALLED:
        return False

    import sqlalchemy  # noqa: F811

    return isinstance(con, sqlalchemy.engine.Connectable)

55 

56 

57def _convert_params(sql, params): 

58 """Convert SQL and params args to DBAPI2.0 compliant format.""" 

59 args = [sql] 

60 if params is not None: 

61 if hasattr(params, "keys"): # test if params is a mapping 

62 args += [params] 

63 else: 

64 args += [list(params)] 

65 return args 

66 

67 

68def _process_parse_dates_argument(parse_dates): 

69 """Process parse_dates argument for read_sql functions""" 

70 # handle non-list entries for parse_dates gracefully 

71 if parse_dates is True or parse_dates is None or parse_dates is False: 

72 parse_dates = [] 

73 

74 elif not hasattr(parse_dates, "__iter__"): 

75 parse_dates = [parse_dates] 

76 return parse_dates 

77 

78 

79def _handle_date_column(col, utc=None, format=None): 

80 if isinstance(format, dict): 

81 return to_datetime(col, errors="ignore", **format) 

82 else: 

83 # Allow passing of formatting string for integers 

84 # GH17855 

85 if format is None and ( 

86 issubclass(col.dtype.type, np.floating) 

87 or issubclass(col.dtype.type, np.integer) 

88 ): 

89 format = "s" 

90 if format in ["D", "d", "h", "m", "s", "ms", "us", "ns"]: 

91 return to_datetime(col, errors="coerce", unit=format, utc=utc) 

92 elif is_datetime64tz_dtype(col): 

93 # coerce to UTC timezone 

94 # GH11216 

95 return to_datetime(col, utc=True) 

96 else: 

97 return to_datetime(col, errors="coerce", format=format, utc=utc) 

98 

99 

def _parse_date_columns(data_frame, parse_dates):
    """
    Force non-datetime columns to be read as such.

    Supports both string formatted and integer timestamp columns.
    """
    parse_dates = _process_parse_dates_argument(parse_dates)

    # GH11216: tz-aware dtypes are coerced to UTC for now; in theory a
    # 'nice' conversion from a FixedOffset tz could be done instead.
    for name, column in data_frame.items():
        if not (is_datetime64tz_dtype(column) or name in parse_dates):
            continue
        # parse_dates is either a list (no per-column format — indexing it
        # with a column name raises TypeError) or a name -> format mapping.
        try:
            fmt = parse_dates[name]
        except TypeError:
            fmt = None
        data_frame[name] = _handle_date_column(column, format=fmt)

    return data_frame

119 

120 

def _wrap_result(data, columns, index_col=None, coerce_float=True, parse_dates=None):
    """Wrap a raw result set in a DataFrame, applying date parsing and an
    optional index assignment."""
    frame = _parse_date_columns(
        DataFrame.from_records(data, columns=columns, coerce_float=coerce_float),
        parse_dates,
    )

    if index_col is not None:
        frame.set_index(index_col, inplace=True)

    return frame

132 

133 

def execute(sql, con, cur=None, params=None):
    """
    Execute the given SQL query using the provided connection object.

    Parameters
    ----------
    sql : string
        SQL query to be executed.
    con : SQLAlchemy connectable(engine/connection) or sqlite3 connection
        Using SQLAlchemy makes it possible to use any DB supported by the
        library.
        If a DBAPI2 object, only sqlite3 is supported.
    cur : deprecated, cursor is obtained from connection, default: None
    params : list or tuple, optional, default: None
        List of parameters to pass to execute method.

    Returns
    -------
    Results Iterable
    """
    # A raw cursor is only honored for backwards compatibility; normally
    # the PandasSQL wrapper is built from the connection itself.
    pandas_sql = (
        pandasSQL_builder(con) if cur is None else pandasSQL_builder(cur, is_cursor=True)
    )
    return pandas_sql.execute(*_convert_params(sql, params))

160 

161 

162# ----------------------------------------------------------------------------- 

163# -- Read and write to DataFrames 

164 

165 

def read_sql_table(
    table_name,
    con,
    schema=None,
    index_col=None,
    coerce_float=True,
    parse_dates=None,
    columns=None,
    chunksize=None,
):
    """
    Read SQL database table into a DataFrame.

    Given a table name and a SQLAlchemy connectable, returns a DataFrame.
    This function does not support DBAPI connections.

    Parameters
    ----------
    table_name : str
        Name of SQL table in database.
    con : SQLAlchemy connectable or str
        A database URI could be provided as a str.
        SQLite DBAPI connection mode not supported.
    schema : str, default None
        Name of SQL schema in database to query (if database flavor
        supports this). Uses default schema if None (default).
    index_col : str or list of str, optional, default: None
        Column(s) to set as index(MultiIndex).
    coerce_float : bool, default True
        Attempts to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point. Can result in loss of precision.
    parse_dates : list or dict, default None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is
          strftime compatible in case of parsing string times or is one of
          (D, s, ns, ms, us) in case of parsing integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
          to the keyword arguments of :func:`pandas.to_datetime`
          Especially useful with databases without native Datetime support,
          such as SQLite.
    columns : list, default None
        List of column names to select from SQL table.
    chunksize : int, default None
        If specified, returns an iterator where `chunksize` is the number of
        rows to include in each chunk.

    Returns
    -------
    DataFrame
        A SQL table is returned as two-dimensional data structure with labeled
        axes.

    See Also
    --------
    read_sql_query : Read SQL query into a DataFrame.
    read_sql : Read SQL query or database table into a DataFrame.

    Notes
    -----
    Any datetime values with time zone information will be converted to UTC.

    Examples
    --------
    >>> pd.read_sql_table('table_name', 'postgres:///db_name')  # doctest:+SKIP
    """
    con = _engine_builder(con)
    if not _is_sqlalchemy_connectable(con):
        raise NotImplementedError(
            "read_sql_table only supported for SQLAlchemy connectable."
        )
    import sqlalchemy
    from sqlalchemy.schema import MetaData

    meta = MetaData(con, schema=schema)
    try:
        # views=True so that database views are also reflectable by name
        meta.reflect(only=[table_name], views=True)
    except sqlalchemy.exc.InvalidRequestError as err:
        # Chain explicitly so the SQLAlchemy root cause survives in the
        # traceback (PEP 3134).
        raise ValueError(f"Table {table_name} not found") from err

    pandas_sql = SQLDatabase(con, meta=meta)
    table = pandas_sql.read_table(
        table_name,
        index_col=index_col,
        coerce_float=coerce_float,
        parse_dates=parse_dates,
        columns=columns,
        chunksize=chunksize,
    )

    if table is not None:
        return table
    else:
        raise ValueError(f"Table {table_name} not found", con)

260 

261 

def read_sql_query(
    sql,
    con,
    index_col=None,
    coerce_float=True,
    params=None,
    parse_dates=None,
    chunksize=None,
):
    """
    Read SQL query into a DataFrame.

    Returns a DataFrame corresponding to the result set of the query
    string. Optionally provide an `index_col` parameter to use one of the
    columns as the index, otherwise default integer index will be used.

    Parameters
    ----------
    sql : str SQL query or SQLAlchemy Selectable (select or text object)
        SQL query to be executed.
    con : SQLAlchemy connectable(engine/connection), database str URI,
        or sqlite3 DBAPI2 connection
        Using SQLAlchemy makes it possible to use any DB supported by that
        library.
        If a DBAPI2 object, only sqlite3 is supported.
    index_col : str or list of strings, optional, default: None
        Column(s) to set as index(MultiIndex).
    coerce_float : bool, default True
        Attempts to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point. Useful for SQL result sets.
    params : list, tuple or dict, optional, default: None
        List of parameters to pass to execute method. The syntax used
        to pass parameters is database driver dependent. Check your
        database driver documentation for which of the five syntax styles,
        described in PEP 249's paramstyle, is supported.
        Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}.
    parse_dates : list or dict, default: None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is
          strftime compatible in case of parsing string times, or is one of
          (D, s, ns, ms, us) in case of parsing integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
          to the keyword arguments of :func:`pandas.to_datetime`
          Especially useful with databases without native Datetime support,
          such as SQLite.
    chunksize : int, default None
        If specified, return an iterator where `chunksize` is the number of
        rows to include in each chunk.

    Returns
    -------
    DataFrame

    See Also
    --------
    read_sql_table : Read SQL database table into a DataFrame.
    read_sql

    Notes
    -----
    Any datetime values with time zone information parsed via the `parse_dates`
    parameter will be converted to UTC.
    """
    # Pure delegation: the builder picks the right PandasSQL backend
    # (SQLAlchemy or sqlite3 fallback) for the given connection.
    return pandasSQL_builder(con).read_query(
        sql,
        index_col=index_col,
        params=params,
        coerce_float=coerce_float,
        parse_dates=parse_dates,
        chunksize=chunksize,
    )

334 

335 

def read_sql(
    sql,
    con,
    index_col=None,
    coerce_float=True,
    params=None,
    parse_dates=None,
    columns=None,
    chunksize=None,
):
    """
    Read SQL query or database table into a DataFrame.

    This function is a convenience wrapper around ``read_sql_table`` and
    ``read_sql_query`` (for backward compatibility). It will delegate
    to the specific function depending on the provided input. A SQL query
    will be routed to ``read_sql_query``, while a database table name will
    be routed to ``read_sql_table``. Note that the delegated function might
    have more specific notes about their functionality not listed here.

    Parameters
    ----------
    sql : str or SQLAlchemy Selectable (select or text object)
        SQL query to be executed or a table name.
    con : SQLAlchemy connectable (engine/connection) or database str URI
        or DBAPI2 connection (fallback mode)'

        Using SQLAlchemy makes it possible to use any DB supported by that
        library. If a DBAPI2 object, only sqlite3 is supported. The user is responsible
        for engine disposal and connection closure for the SQLAlchemy connectable. See
        `here <https://docs.sqlalchemy.org/en/13/core/connections.html>`_
    index_col : str or list of strings, optional, default: None
        Column(s) to set as index(MultiIndex).
    coerce_float : bool, default True
        Attempts to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point, useful for SQL result sets.
    params : list, tuple or dict, optional, default: None
        List of parameters to pass to execute method. The syntax used
        to pass parameters is database driver dependent. Check your
        database driver documentation for which of the five syntax styles,
        described in PEP 249's paramstyle, is supported.
        Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}.
    parse_dates : list or dict, default: None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is
          strftime compatible in case of parsing string times, or is one of
          (D, s, ns, ms, us) in case of parsing integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
          to the keyword arguments of :func:`pandas.to_datetime`
          Especially useful with databases without native Datetime support,
          such as SQLite.
    columns : list, default: None
        List of column names to select from SQL table (only used when reading
        a table).
    chunksize : int, default None
        If specified, return an iterator where `chunksize` is the
        number of rows to include in each chunk.

    Returns
    -------
    DataFrame

    See Also
    --------
    read_sql_table : Read SQL database table into a DataFrame.
    read_sql_query : Read SQL query into a DataFrame.
    """
    pandas_sql = pandasSQL_builder(con)

    # Keyword set shared by every read_query call site below.
    query_kwargs = dict(
        index_col=index_col,
        params=params,
        coerce_float=coerce_float,
        parse_dates=parse_dates,
        chunksize=chunksize,
    )

    # sqlite3 fallback mode has no table reflection; everything is a query.
    if isinstance(pandas_sql, SQLiteDatabase):
        return pandas_sql.read_query(sql, **query_kwargs)

    try:
        _is_table_name = pandas_sql.has_table(sql)
    except Exception:
        # using generic exception to catch errors from sql drivers (GH24988)
        _is_table_name = False

    if not _is_table_name:
        return pandas_sql.read_query(sql, **query_kwargs)

    # A bare table name: reflect it and delegate to the table reader.
    pandas_sql.meta.reflect(only=[sql])
    return pandas_sql.read_table(
        sql,
        index_col=index_col,
        coerce_float=coerce_float,
        parse_dates=parse_dates,
        columns=columns,
        chunksize=chunksize,
    )

440 

441 

def to_sql(
    frame,
    name,
    con,
    schema=None,
    if_exists="fail",
    index=True,
    index_label=None,
    chunksize=None,
    dtype=None,
    method=None,
):
    """
    Write records stored in a DataFrame to a SQL database.

    Parameters
    ----------
    frame : DataFrame, Series
    name : str
        Name of SQL table.
    con : SQLAlchemy connectable(engine/connection) or database string URI
        or sqlite3 DBAPI2 connection
        Using SQLAlchemy makes it possible to use any DB supported by that
        library.
        If a DBAPI2 object, only sqlite3 is supported.
    schema : str, optional
        Name of SQL schema in database to write to (if database flavor
        supports this). If None, use default schema (default).
    if_exists : {'fail', 'replace', 'append'}, default 'fail'
        - fail: If table exists, raise a ValueError.
        - replace: If table exists, drop it, recreate it, and insert data.
        - append: If table exists, insert data. Create if does not exist.
    index : boolean, default True
        Write DataFrame index as a column.
    index_label : str or sequence, optional
        Column label for index column(s). If None is given (default) and
        `index` is True, then the index names are used.
        A sequence should be given if the DataFrame uses MultiIndex.
    chunksize : int, optional
        Specify the number of rows in each batch to be written at a time.
        By default, all rows will be written at once.
    dtype : dict or scalar, optional
        Specifying the datatype for columns. If a dictionary is used, the
        keys should be the column names and the values should be the
        SQLAlchemy types or strings for the sqlite3 fallback mode. If a
        scalar is provided, it will be applied to all columns.
    method : {None, 'multi', callable}, optional
        Controls the SQL insertion clause used:

        - None : Uses standard SQL ``INSERT`` clause (one per row).
        - 'multi': Pass multiple values in a single ``INSERT`` clause.
        - callable with signature ``(pd_table, conn, keys, data_iter)``.

        Details and a sample callable implementation can be found in the
        section :ref:`insert method <io.sql.method>`.

        .. versionadded:: 0.24.0

    Raises
    ------
    ValueError
        If `if_exists` is not one of 'fail', 'replace' or 'append'.
    NotImplementedError
        If `frame` is neither a Series nor a DataFrame.
    """
    # Validate early, before touching the database.
    if if_exists not in ("fail", "replace", "append"):
        raise ValueError(f"'{if_exists}' is not valid for if_exists")

    pandas_sql = pandasSQL_builder(con, schema=schema)

    # A Series is written as a single-column table.
    if isinstance(frame, Series):
        frame = frame.to_frame()
    elif not isinstance(frame, DataFrame):
        raise NotImplementedError(
            "'frame' argument should be either a Series or a DataFrame"
        )

    pandas_sql.to_sql(
        frame,
        name,
        if_exists=if_exists,
        index=index,
        index_label=index_label,
        schema=schema,
        chunksize=chunksize,
        dtype=dtype,
        method=method,
    )

523 

524 

def has_table(table_name, con, schema=None):
    """
    Check if DataBase has named table.

    Parameters
    ----------
    table_name: string
        Name of SQL table.
    con: SQLAlchemy connectable(engine/connection) or sqlite3 DBAPI2 connection
        Using SQLAlchemy makes it possible to use any DB supported by that
        library.
        If a DBAPI2 object, only sqlite3 is supported.
    schema : string, default None
        Name of SQL schema in database to write to (if database flavor supports
        this). If None, use default schema (default).

    Returns
    -------
    boolean
    """
    # Delegate to whichever PandasSQL backend matches the connection.
    return pandasSQL_builder(con, schema=schema).has_table(table_name)

547 

548 

# Backwards-compatible alias for has_table.
table_exists = has_table

550 

551 

552def _engine_builder(con): 

553 """ 

554 Returns a SQLAlchemy engine from a URI (if con is a string) 

555 else it just return con without modifying it. 

556 """ 

557 global _SQLALCHEMY_INSTALLED 

558 if isinstance(con, str): 

559 try: 

560 import sqlalchemy 

561 except ImportError: 

562 _SQLALCHEMY_INSTALLED = False 

563 else: 

564 con = sqlalchemy.create_engine(con) 

565 return con 

566 

567 return con 

568 

569 

def pandasSQL_builder(con, schema=None, meta=None, is_cursor=False):
    """
    Convenience function to return the correct PandasSQL subclass based on the
    provided parameters.
    """
    # is_cursor exists only to support raw DBAPI cursors; it can be dropped
    # once support for DBAPI connections is removed.
    con = _engine_builder(con)
    if _is_sqlalchemy_connectable(con):
        return SQLDatabase(con, schema=schema, meta=meta)
    if isinstance(con, str):
        # _engine_builder left the URI untouched, so SQLAlchemy is missing.
        raise ImportError("Using URI string without sqlalchemy installed.")
    return SQLiteDatabase(con, is_cursor=is_cursor)

584 

585 

class SQLTable(PandasObject):
    """
    For mapping Pandas tables to SQL tables.
    Uses fact that table is reflected by SQLAlchemy to
    do better type conversions.
    Also holds various flags needed to avoid having to
    pass them between functions all the time.
    """

    # TODO: support for multiIndex

    def __init__(
        self,
        name,
        pandas_sql_engine,
        frame=None,
        index=True,
        if_exists="fail",
        prefix="pandas",
        index_label=None,
        schema=None,
        keys=None,
        dtype=None,
    ):
        self.name = name
        self.pd_sql = pandas_sql_engine
        self.prefix = prefix
        self.frame = frame
        # self.index becomes a list of index column labels (write mode /
        # explicit read index) or None — see _index_name.
        self.index = self._index_name(index, index_label)
        self.schema = schema
        self.if_exists = if_exists
        self.keys = keys
        self.dtype = dtype

        if frame is not None:
            # We want to initialize based on a dataframe
            self.table = self._create_table_setup()
        else:
            # no data provided, read-only mode
            self.table = self.pd_sql.get_table(self.name, self.schema)

        if self.table is None:
            raise ValueError(f"Could not init table '{name}'")

    def exists(self):
        """Return True if the target table exists in the database."""
        return self.pd_sql.has_table(self.name, self.schema)

    def sql_schema(self):
        """Return the CREATE TABLE statement for this table as a string."""
        from sqlalchemy.schema import CreateTable

        return str(CreateTable(self.table).compile(self.pd_sql.connectable))

    def _execute_create(self):
        """Create the table in the database."""
        # Inserting table into database, add to MetaData object
        self.table = self.table.tometadata(self.pd_sql.meta)
        self.table.create()

    def create(self):
        """Create the table, honoring the ``if_exists`` policy."""
        if self.exists():
            if self.if_exists == "fail":
                raise ValueError(f"Table '{self.name}' already exists.")
            elif self.if_exists == "replace":
                self.pd_sql.drop_table(self.name, self.schema)
                self._execute_create()
            elif self.if_exists == "append":
                # existing table is reused as-is
                pass
            else:
                raise ValueError(f"'{self.if_exists}' is not valid for if_exists")
        else:
            self._execute_create()

    def _execute_insert(self, conn, keys, data_iter):
        """Execute SQL statement inserting data

        Parameters
        ----------
        conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
        keys : list of str
            Column names
        data_iter : generator of list
            Each item contains a list of values to be inserted
        """
        data = [dict(zip(keys, row)) for row in data_iter]
        conn.execute(self.table.insert(), data)

    def _execute_insert_multi(self, conn, keys, data_iter):
        """Alternative to _execute_insert for DBs support multivalue INSERT.

        Note: multi-value insert is usually faster for analytics DBs
        and tables containing a few columns
        but performance degrades quickly with increase of columns.
        """
        data = [dict(zip(keys, row)) for row in data_iter]
        conn.execute(self.table.insert(data))

    def insert_data(self):
        """Return (column names, list of per-column value arrays) for insertion.

        Datetimes are converted to ``datetime.datetime`` objects and NaN/NaT
        are replaced by None so DBAPI drivers receive NULL-able values.
        """
        if self.index is not None:
            temp = self.frame.copy()
            temp.index.names = self.index
            try:
                temp.reset_index(inplace=True)
            except ValueError as err:
                raise ValueError(f"duplicate name in index/columns: {err}")
        else:
            temp = self.frame

        column_names = list(map(str, temp.columns))
        ncols = len(column_names)
        data_list = [None] * ncols
        # NOTE(review): reaches into the internal BlockManager (frame._data);
        # depends on pandas internals, keep in sync with pandas versions.
        blocks = temp._data.blocks

        for b in blocks:
            if b.is_datetime:
                # return datetime.datetime objects
                if b.is_datetimetz:
                    # GH 9086: Ensure we return datetimes with timezone info
                    # Need to return 2-D data; DatetimeIndex is 1D
                    d = b.values.to_pydatetime()
                    d = np.atleast_2d(d)
                else:
                    # convert to microsecond resolution for datetime.datetime
                    d = b.values.astype("M8[us]").astype(object)
            else:
                d = np.array(b.get_values(), dtype=object)

            # replace NaN with None
            if b._can_hold_na:
                mask = isna(d)
                d[mask] = None

            # scatter the block's rows back to their original column slots
            for col_loc, col in zip(b.mgr_locs, d):
                data_list[col_loc] = col

        return column_names, data_list

    def insert(self, chunksize=None, method=None):
        """Insert ``self.frame`` into the table, optionally in chunks.

        ``method`` selects the insertion strategy: None (row-wise INSERT),
        'multi' (multi-value INSERT), or a callable with signature
        ``(pd_table, conn, keys, data_iter)``.
        """
        # set insert method
        if method is None:
            exec_insert = self._execute_insert
        elif method == "multi":
            exec_insert = self._execute_insert_multi
        elif callable(method):
            exec_insert = partial(method, self)
        else:
            raise ValueError(f"Invalid parameter `method`: {method}")

        keys, data_list = self.insert_data()

        nrows = len(self.frame)

        if nrows == 0:
            return

        if chunksize is None:
            chunksize = nrows
        elif chunksize == 0:
            raise ValueError("chunksize argument should be non-zero")

        # +1 so a final partial chunk is included; the loop breaks on the
        # empty remainder.
        chunks = int(nrows / chunksize) + 1

        with self.pd_sql.run_transaction() as conn:
            for i in range(chunks):
                start_i = i * chunksize
                end_i = min((i + 1) * chunksize, nrows)
                if start_i >= end_i:
                    break

                chunk_iter = zip(*[arr[start_i:end_i] for arr in data_list])
                exec_insert(conn, keys, chunk_iter)

    def _query_iterator(
        self, result, chunksize, columns, coerce_float=True, parse_dates=None
    ):
        """Return generator through chunked result set."""

        while True:
            data = result.fetchmany(chunksize)
            if not data:
                break
            else:
                self.frame = DataFrame.from_records(
                    data, columns=columns, coerce_float=coerce_float
                )

                self._harmonize_columns(parse_dates=parse_dates)

                if self.index is not None:
                    self.frame.set_index(self.index, inplace=True)

                yield self.frame

    def read(self, coerce_float=True, parse_dates=None, columns=None, chunksize=None):
        """Read the table into ``self.frame`` (or a chunk iterator).

        When ``columns`` is given, only those columns (plus any index
        columns) are selected; otherwise the full table is read.
        """
        if columns is not None and len(columns) > 0:
            from sqlalchemy import select

            cols = [self.table.c[n] for n in columns]
            if self.index is not None:
                # prepend index columns, preserving their order
                for idx in self.index[::-1]:
                    cols.insert(0, self.table.c[idx])
            sql_select = select(cols)
        else:
            sql_select = self.table.select()

        result = self.pd_sql.execute(sql_select)
        column_names = result.keys()

        if chunksize is not None:
            return self._query_iterator(
                result,
                chunksize,
                column_names,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
            )
        else:
            data = result.fetchall()
            self.frame = DataFrame.from_records(
                data, columns=column_names, coerce_float=coerce_float
            )

            self._harmonize_columns(parse_dates=parse_dates)

            if self.index is not None:
                self.frame.set_index(self.index, inplace=True)

            return self.frame

    def _index_name(self, index, index_label):
        """Resolve ``index``/``index_label`` into a list of column labels
        (or None when no index column is involved)."""
        # for writing: index=True to include index in sql table
        if index is True:
            nlevels = self.frame.index.nlevels
            # if index_label is specified, set this as index name(s)
            if index_label is not None:
                if not isinstance(index_label, list):
                    index_label = [index_label]
                if len(index_label) != nlevels:
                    raise ValueError(
                        "Length of 'index_label' should match number of "
                        f"levels, which is {nlevels}"
                    )
                else:
                    return index_label
            # return the used column labels for the index columns
            if (
                nlevels == 1
                and "index" not in self.frame.columns
                and self.frame.index.name is None
            ):
                return ["index"]
            else:
                return [
                    l if l is not None else f"level_{i}"
                    for i, l in enumerate(self.frame.index.names)
                ]

        # for reading: index=(list of) string to specify column to set as index
        elif isinstance(index, str):
            return [index]
        elif isinstance(index, list):
            return index
        else:
            return None

    def _get_column_names_and_types(self, dtype_mapper):
        """Return [(name, mapped type, is_index), ...] for index levels
        followed by frame columns, using ``dtype_mapper`` for the type."""
        column_names_and_types = []
        if self.index is not None:
            for i, idx_label in enumerate(self.index):
                idx_type = dtype_mapper(self.frame.index._get_level_values(i))
                column_names_and_types.append((str(idx_label), idx_type, True))

        column_names_and_types += [
            (str(self.frame.columns[i]), dtype_mapper(self.frame.iloc[:, i]), False)
            for i in range(len(self.frame.columns))
        ]

        return column_names_and_types

    def _create_table_setup(self):
        """Build (but do not create) the SQLAlchemy Table for this frame."""
        from sqlalchemy import Table, Column, PrimaryKeyConstraint

        column_names_and_types = self._get_column_names_and_types(self._sqlalchemy_type)

        columns = [
            Column(name, typ, index=is_index)
            for name, typ, is_index in column_names_and_types
        ]

        if self.keys is not None:
            if not is_list_like(self.keys):
                keys = [self.keys]
            else:
                keys = self.keys
            pkc = PrimaryKeyConstraint(*keys, name=self.name + "_pk")
            columns.append(pkc)

        schema = self.schema or self.pd_sql.meta.schema

        # At this point, attach to new metadata, only attach to self.meta
        # once table is created.
        from sqlalchemy.schema import MetaData

        meta = MetaData(self.pd_sql, schema=schema)

        return Table(self.name, meta, *columns, schema=schema)

    def _harmonize_columns(self, parse_dates=None):
        """
        Make the DataFrame's column types align with the SQL table
        column types.
        Need to work around limited NA value support. Floats are always
        fine, ints must always be floats if there are Null values.
        Booleans are hard because converting bool column with None replaces
        all Nones with false. Therefore only convert bool if there are no
        NA values.
        Datetimes should already be converted to np.datetime64 if supported,
        but here we also force conversion if required.
        """
        parse_dates = _process_parse_dates_argument(parse_dates)

        for sql_col in self.table.columns:
            col_name = sql_col.name
            try:
                df_col = self.frame[col_name]

                # Handle date parsing upfront; don't try to convert columns
                # twice
                if col_name in parse_dates:
                    # parse_dates may be a list (no per-column format, so
                    # indexing raises TypeError) or a name -> format mapping
                    try:
                        fmt = parse_dates[col_name]
                    except TypeError:
                        fmt = None
                    self.frame[col_name] = _handle_date_column(df_col, format=fmt)
                    continue

                # the type the dataframe column should have
                col_type = self._get_dtype(sql_col.type)

                if (
                    col_type is datetime
                    or col_type is date
                    or col_type is DatetimeTZDtype
                ):
                    # Convert tz-aware Datetime SQL columns to UTC
                    utc = col_type is DatetimeTZDtype
                    self.frame[col_name] = _handle_date_column(df_col, utc=utc)
                elif col_type is float:
                    # floats support NA, can always convert!
                    self.frame[col_name] = df_col.astype(col_type, copy=False)

                elif len(df_col) == df_col.count():
                    # No NA values, can convert ints and bools
                    if col_type is np.dtype("int64") or col_type is bool:
                        self.frame[col_name] = df_col.astype(col_type, copy=False)
            except KeyError:
                pass  # this column not in results

    def _sqlalchemy_type(self, col):
        """Map a pandas column (or index) to a SQLAlchemy column type,
        honoring any user-supplied ``dtype`` override first."""
        dtype = self.dtype or {}
        if col.name in dtype:
            return self.dtype[col.name]

        # Infer type of column, while ignoring missing values.
        # Needed for inserting typed data containing NULLs, GH 8778.
        col_type = lib.infer_dtype(col, skipna=True)

        from sqlalchemy.types import (
            BigInteger,
            Integer,
            Float,
            Text,
            Boolean,
            DateTime,
            Date,
            Time,
            TIMESTAMP,
        )

        if col_type == "datetime64" or col_type == "datetime":
            # GH 9086: TIMESTAMP is the suggested type if the column contains
            # timezone information
            try:
                if col.dt.tz is not None:
                    return TIMESTAMP(timezone=True)
            except AttributeError:
                # The column is actually a DatetimeIndex
                if col.tz is not None:
                    return TIMESTAMP(timezone=True)
            return DateTime
        if col_type == "timedelta64":
            warnings.warn(
                "the 'timedelta' type is not supported, and will be "
                "written as integer values (ns frequency) to the "
                "database.",
                UserWarning,
                stacklevel=8,
            )
            return BigInteger
        elif col_type == "floating":
            if col.dtype == "float32":
                return Float(precision=23)
            else:
                return Float(precision=53)
        elif col_type == "integer":
            if col.dtype == "int32":
                return Integer
            else:
                return BigInteger
        elif col_type == "boolean":
            return Boolean
        elif col_type == "date":
            return Date
        elif col_type == "time":
            return Time
        elif col_type == "complex":
            raise ValueError("Complex datatypes not supported")

        return Text

    def _get_dtype(self, sqltype):
        """Map a SQLAlchemy column type to the Python/numpy type the
        DataFrame column should have."""
        from sqlalchemy.types import Integer, Float, Boolean, DateTime, Date, TIMESTAMP

        if isinstance(sqltype, Float):
            return float
        elif isinstance(sqltype, Integer):
            # TODO: Refine integer size.
            return np.dtype("int64")
        elif isinstance(sqltype, TIMESTAMP):
            # we have a timezone capable type
            if not sqltype.timezone:
                return datetime
            return DatetimeTZDtype
        elif isinstance(sqltype, DateTime):
            # Caution: np.datetime64 is also a subclass of np.number.
            return datetime
        elif isinstance(sqltype, Date):
            return date
        elif isinstance(sqltype, Boolean):
            return bool
        return object

1028 

1029 

class PandasSQL(PandasObject):
    """
    Abstract base for SQL interfaces; concrete subclasses are expected to
    implement both ``read_sql`` and ``to_sql``.
    """

    @staticmethod
    def _raise_not_connectable():
        # Shared error for the unimplemented base operations.
        raise ValueError(
            "PandasSQL must be created with an SQLAlchemy "
            "connectable or sqlite connection"
        )

    def read_sql(self, *args, **kwargs):
        self._raise_not_connectable()

    def to_sql(self, *args, **kwargs):
        self._raise_not_connectable()

1046 

1047 

class SQLDatabase(PandasSQL):
    """
    This class enables conversion between DataFrame and SQL databases
    using SQLAlchemy to handle DataBase abstraction.

    Parameters
    ----------
    engine : SQLAlchemy connectable
        Connectable to connect with the database. Using SQLAlchemy makes it
        possible to use any DB supported by that library.
    schema : string, default None
        Name of SQL schema in database to write to (if database flavor
        supports this). If None, use default schema (default).
    meta : SQLAlchemy MetaData object, default None
        If provided, this MetaData object is used instead of a newly
        created. This allows to specify database flavor specific
        arguments in the MetaData object.

    """

    def __init__(self, engine, schema=None, meta=None):
        self.connectable = engine
        if not meta:
            # Import lazily: SQLAlchemy is only required once this class is
            # actually instantiated.
            from sqlalchemy.schema import MetaData

            meta = MetaData(self.connectable, schema=schema)

        self.meta = meta

    @contextmanager
    def run_transaction(self):
        # ``begin()`` may hand back an object with no ``execute`` method;
        # in that case yield the connectable itself so callers always get
        # something they can execute statements on.
        with self.connectable.begin() as tx:
            if hasattr(tx, "execute"):
                yield tx
            else:
                yield self.connectable

    def execute(self, *args, **kwargs):
        """Simple passthrough to SQLAlchemy connectable"""
        return self.connectable.execute(*args, **kwargs)

    def read_table(
        self,
        table_name,
        index_col=None,
        coerce_float=True,
        parse_dates=None,
        columns=None,
        schema=None,
        chunksize=None,
    ):
        """Read SQL database table into a DataFrame.

        Parameters
        ----------
        table_name : string
            Name of SQL table in database.
        index_col : string, optional, default: None
            Column to set as index.
        coerce_float : boolean, default True
            Attempts to convert values of non-string, non-numeric objects
            (like decimal.Decimal) to floating point. This can result in
            loss of precision.
        parse_dates : list or dict, default: None
            - List of column names to parse as dates.
            - Dict of ``{column_name: format string}`` where format string is
              strftime compatible in case of parsing string times, or is one of
              (D, s, ns, ms, us) in case of parsing integer timestamps.
            - Dict of ``{column_name: arg}``, where the arg corresponds
              to the keyword arguments of :func:`pandas.to_datetime`.
              Especially useful with databases without native Datetime support,
              such as SQLite.
        columns : list, default: None
            List of column names to select from SQL table.
        schema : string, default None
            Name of SQL schema in database to query (if database flavor
            supports this). If specified, this overwrites the default
            schema of the SQL database object.
        chunksize : int, default None
            If specified, return an iterator where `chunksize` is the number
            of rows to include in each chunk.

        Returns
        -------
        DataFrame

        See Also
        --------
        pandas.read_sql_table
        SQLDatabase.read_query

        """
        # SQLTable handles reflection of the table structure; reading is
        # delegated entirely to it.
        table = SQLTable(table_name, self, index=index_col, schema=schema)
        return table.read(
            coerce_float=coerce_float,
            parse_dates=parse_dates,
            columns=columns,
            chunksize=chunksize,
        )

    @staticmethod
    def _query_iterator(
        result, chunksize, columns, index_col=None, coerce_float=True, parse_dates=None
    ):
        """Return generator through chunked result set"""

        while True:
            data = result.fetchmany(chunksize)
            if not data:
                break
            else:
                yield _wrap_result(
                    data,
                    columns,
                    index_col=index_col,
                    coerce_float=coerce_float,
                    parse_dates=parse_dates,
                )

    def read_query(
        self,
        sql,
        index_col=None,
        coerce_float=True,
        parse_dates=None,
        params=None,
        chunksize=None,
    ):
        """Read SQL query into a DataFrame.

        Parameters
        ----------
        sql : string
            SQL query to be executed.
        index_col : string, optional, default: None
            Column name to use as index for the returned DataFrame object.
        coerce_float : boolean, default True
            Attempt to convert values of non-string, non-numeric objects (like
            decimal.Decimal) to floating point, useful for SQL result sets.
        params : list, tuple or dict, optional, default: None
            List of parameters to pass to execute method. The syntax used
            to pass parameters is database driver dependent. Check your
            database driver documentation for which of the five syntax styles,
            described in PEP 249's paramstyle, is supported.
            Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}
        parse_dates : list or dict, default: None
            - List of column names to parse as dates.
            - Dict of ``{column_name: format string}`` where format string is
              strftime compatible in case of parsing string times, or is one of
              (D, s, ns, ms, us) in case of parsing integer timestamps.
            - Dict of ``{column_name: arg dict}``, where the arg dict
              corresponds to the keyword arguments of
              :func:`pandas.to_datetime` Especially useful with databases
              without native Datetime support, such as SQLite.
        chunksize : int, default None
            If specified, return an iterator where `chunksize` is the number
            of rows to include in each chunk.

        Returns
        -------
        DataFrame

        See Also
        --------
        read_sql_table : Read SQL database table into a DataFrame.
        read_sql

        """
        # Normalize (sql, params) into DBAPI-style positional arguments.
        args = _convert_params(sql, params)

        result = self.execute(*args)
        columns = result.keys()

        if chunksize is not None:
            # Lazy path: yield DataFrames chunk by chunk.
            return self._query_iterator(
                result,
                chunksize,
                columns,
                index_col=index_col,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
            )
        else:
            # Eager path: fetch everything and build one DataFrame.
            data = result.fetchall()
            frame = _wrap_result(
                data,
                columns,
                index_col=index_col,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
            )
            return frame

    # ``read_sql`` is an alias for ``read_query`` on this class.
    read_sql = read_query

    def to_sql(
        self,
        frame,
        name,
        if_exists="fail",
        index=True,
        index_label=None,
        schema=None,
        chunksize=None,
        dtype=None,
        method=None,
    ):
        """
        Write records stored in a DataFrame to a SQL database.

        Parameters
        ----------
        frame : DataFrame
        name : string
            Name of SQL table.
        if_exists : {'fail', 'replace', 'append'}, default 'fail'
            - fail: If table exists, do nothing.
            - replace: If table exists, drop it, recreate it, and insert data.
            - append: If table exists, insert data. Create if does not exist.
        index : boolean, default True
            Write DataFrame index as a column.
        index_label : string or sequence, default None
            Column label for index column(s). If None is given (default) and
            `index` is True, then the index names are used.
            A sequence should be given if the DataFrame uses MultiIndex.
        schema : string, default None
            Name of SQL schema in database to write to (if database flavor
            supports this). If specified, this overwrites the default
            schema of the SQLDatabase object.
        chunksize : int, default None
            If not None, then rows will be written in batches of this size at a
            time. If None, all rows will be written at once.
        dtype : single type or dict of column name to SQL type, default None
            Optional specifying the datatype for columns. The SQL type should
            be a SQLAlchemy type. If all columns are of the same type, one
            single value can be used.
        method : {None, 'multi', callable}, default None
            Controls the SQL insertion clause used:

            * None : Uses standard SQL ``INSERT`` clause (one per row).
            * 'multi': Pass multiple values in a single ``INSERT`` clause.
            * callable with signature ``(pd_table, conn, keys, data_iter)``.

            Details and a sample callable implementation can be found in the
            section :ref:`insert method <io.sql.method>`.

            .. versionadded:: 0.24.0
        """
        # A scalar dtype applies to every column of the frame.
        if dtype and not is_dict_like(dtype):
            dtype = {col_name: dtype for col_name in frame}

        if dtype is not None:
            from sqlalchemy.types import to_instance, TypeEngine

            # Validate early so we fail before creating any table.
            for col, my_type in dtype.items():
                if not isinstance(to_instance(my_type), TypeEngine):
                    raise ValueError(f"The type of {col} is not a SQLAlchemy type")

        table = SQLTable(
            name,
            self,
            frame=frame,
            index=index,
            if_exists=if_exists,
            index_label=index_label,
            schema=schema,
            dtype=dtype,
        )
        table.create()
        table.insert(chunksize, method=method)
        if not name.isdigit() and not name.islower():
            # check for potentially case sensitivity issues (GH7815)
            # Only check when name is not a number and name is not lower case
            engine = self.connectable.engine
            with self.connectable.connect() as conn:
                table_names = engine.table_names(
                    schema=schema or self.meta.schema, connection=conn
                )
            if name not in table_names:
                msg = (
                    f"The provided table name '{name}' is not found exactly as "
                    "such in the database after writing the table, possibly "
                    "due to case sensitivity issues. Consider using lower "
                    "case table names."
                )
                warnings.warn(msg, UserWarning)

    @property
    def tables(self):
        # Tables known to the bound MetaData object.
        return self.meta.tables

    def has_table(self, name, schema=None):
        # Delegate the existence check to the dialect.
        return self.connectable.run_callable(
            self.connectable.dialect.has_table, name, schema or self.meta.schema
        )

    def get_table(self, table_name, schema=None):
        schema = schema or self.meta.schema
        if schema:
            tbl = self.meta.tables.get(".".join([schema, table_name]))
        else:
            tbl = self.meta.tables.get(table_name)

        # Avoid casting double-precision floats into decimals
        from sqlalchemy import Numeric

        for column in tbl.columns:
            if isinstance(column.type, Numeric):
                column.type.asdecimal = False

        return tbl

    def drop_table(self, table_name, schema=None):
        schema = schema or self.meta.schema
        if self.has_table(table_name, schema):
            # Reflect first so the MetaData object knows about the table
            # before we look it up and drop it.
            self.meta.reflect(only=[table_name], schema=schema)
            self.get_table(table_name, schema).drop()
            self.meta.clear()

    def _create_sql_schema(self, frame, table_name, keys=None, dtype=None):
        # Build (but do not execute) the CREATE TABLE statement for ``frame``.
        table = SQLTable(
            table_name, self, frame=frame, index=False, keys=keys, dtype=dtype
        )
        return str(table.sql_schema())

1372 

1373 

1374# ---- SQL without SQLAlchemy --- 

1375# sqlite-specific sql strings and handler class 

1376# dictionary used for readability purposes 

# Maps the result of ``lib.infer_dtype`` (see ``SQLiteTable._sql_type_name``)
# to an SQLite column type name.
_SQL_TYPES = {
    "string": "TEXT",
    "floating": "REAL",
    "integer": "INTEGER",
    "datetime": "TIMESTAMP",
    "date": "DATE",
    "time": "TIME",
    # SQLite has no native BOOLEAN type; booleans are stored as 0/1 integers.
    "boolean": "INTEGER",
}

1386 

1387 

1388def _get_unicode_name(name): 

1389 try: 

1390 uname = str(name).encode("utf-8", "strict").decode("utf-8") 

1391 except UnicodeError: 

1392 raise ValueError(f"Cannot convert identifier to UTF-8: '{name}'") 

1393 return uname 

1394 

1395 

1396def _get_valid_sqlite_name(name): 

1397 # See https://stackoverflow.com/questions/6514274/how-do-you-escape-strings\ 

1398 # -for-sqlite-table-column-names-in-python 

1399 # Ensure the string can be encoded as UTF-8. 

1400 # Ensure the string does not include any NUL characters. 

1401 # Replace all " with "". 

1402 # Wrap the entire thing in double quotes. 

1403 

1404 uname = _get_unicode_name(name) 

1405 if not len(uname): 

1406 raise ValueError("Empty table or column name specified") 

1407 

1408 nul_index = uname.find("\x00") 

1409 if nul_index >= 0: 

1410 raise ValueError("SQLite identifier cannot contain NULs") 

1411 return '"' + uname.replace('"', '""') + '"' 

1412 

1413 

# Warning text emitted by ``SQLiteTable._create_table_setup`` when column
# names contain whitespace.
_SAFE_NAMES_WARNING = (
    "The spaces in these column names will not be changed. "
    "In pandas versions < 0.14, spaces were converted to "
    "underscores."
)

1419 

1420 

class SQLiteTable(SQLTable):
    """
    Patch the SQLTable for fallback support.
    Instead of a table variable just use the Create Table statement.
    """

    def __init__(self, *args, **kwargs):
        # GH 8341
        # register an adapter callable for datetime.time object
        import sqlite3

        # this will transform time(12,34,56,789) into '12:34:56.000789'
        # (this is what sqlalchemy does)
        sqlite3.register_adapter(time, lambda _: _.strftime("%H:%M:%S.%f"))
        super().__init__(*args, **kwargs)

    def sql_schema(self):
        # In fallback mode ``self.table`` is the list of DDL statements built
        # by ``_create_table_setup`` rather than an SQLAlchemy Table.
        return str(";\n".join(self.table))

    def _execute_create(self):
        # Execute every DDL statement (CREATE TABLE + CREATE INDEX) in a
        # single transaction.
        with self.pd_sql.run_transaction() as conn:
            for stmt in self.table:
                conn.execute(stmt)

    def insert_statement(self):
        """Build the parameterized INSERT statement for this table."""
        names = list(map(str, self.frame.columns))
        wld = "?"  # wildcard char
        escape = _get_valid_sqlite_name

        # Index levels are written first, so prepend them (in order).
        if self.index is not None:
            for idx in self.index[::-1]:
                names.insert(0, idx)

        bracketed_names = [escape(column) for column in names]
        col_names = ",".join(bracketed_names)
        wildcards = ",".join([wld] * len(names))
        insert_statement = (
            f"INSERT INTO {escape(self.name)} ({col_names}) VALUES ({wildcards})"
        )
        return insert_statement

    def _execute_insert(self, conn, keys, data_iter):
        # Materialize the iterator: executemany needs a sequence of rows.
        data_list = list(data_iter)
        conn.executemany(self.insert_statement(), data_list)

    def _create_table_setup(self):
        """
        Return a list of SQL statements that creates a table reflecting the
        structure of a DataFrame. The first entry will be a CREATE TABLE
        statement while the rest will be CREATE INDEX statements.
        """
        column_names_and_types = self._get_column_names_and_types(self._sql_type_name)

        # Warn (but proceed) when column names contain whitespace.
        pat = re.compile(r"\s+")
        column_names = [col_name for col_name, _, _ in column_names_and_types]
        if any(map(pat.search, column_names)):
            warnings.warn(_SAFE_NAMES_WARNING, stacklevel=6)

        escape = _get_valid_sqlite_name

        create_tbl_stmts = [
            escape(cname) + " " + ctype for cname, ctype, _ in column_names_and_types
        ]

        if self.keys is not None and len(self.keys):
            if not is_list_like(self.keys):
                keys = [self.keys]
            else:
                keys = self.keys
            cnames_br = ", ".join(escape(c) for c in keys)
            create_tbl_stmts.append(
                f"CONSTRAINT {self.name}_pk PRIMARY KEY ({cnames_br})"
            )

        create_stmts = [
            "CREATE TABLE "
            + escape(self.name)
            + " (\n"
            + ",\n  ".join(create_tbl_stmts)
            + "\n)"
        ]

        ix_cols = [cname for cname, _, is_index in column_names_and_types if is_index]
        if len(ix_cols):
            cnames = "_".join(ix_cols)
            cnames_br = ",".join(escape(c) for c in ix_cols)
            create_stmts.append(
                "CREATE INDEX "
                + escape("ix_" + self.name + "_" + cnames)
                # NOTE(review): there is no space before "ON", yielding
                # '... "ix_name"ON "table" (...)'. SQLite appears to accept
                # this because the quoted identifier is self-delimiting, but
                # confirm before reusing this pattern elsewhere.
                + "ON "
                + escape(self.name)
                + " ("
                + cnames_br
                + ")"
            )

        return create_stmts

    def _sql_type_name(self, col):
        # An explicit user-supplied dtype mapping always wins.
        dtype = self.dtype or {}
        if col.name in dtype:
            return dtype[col.name]

        # Infer type of column, while ignoring missing values.
        # Needed for inserting typed data containing NULLs, GH 8778.
        col_type = lib.infer_dtype(col, skipna=True)

        if col_type == "timedelta64":
            warnings.warn(
                "the 'timedelta' type is not supported, and will be "
                "written as integer values (ns frequency) to the "
                "database.",
                UserWarning,
                stacklevel=8,
            )
            col_type = "integer"

        elif col_type == "datetime64":
            col_type = "datetime"

        elif col_type == "empty":
            col_type = "string"

        elif col_type == "complex":
            raise ValueError("Complex datatypes not supported")

        # Anything not explicitly mapped falls back to TEXT.
        if col_type not in _SQL_TYPES:
            col_type = "string"

        return _SQL_TYPES[col_type]

1551 

1552 

class SQLiteDatabase(PandasSQL):
    """
    Version of SQLDatabase to support SQLite connections (fallback without
    SQLAlchemy). This should only be used internally.

    Parameters
    ----------
    con : sqlite connection object

    """

    def __init__(self, con, is_cursor=False):
        # is_cursor=True means ``con`` is already a DBAPI cursor; ``execute``
        # then reuses it instead of opening a new one per call.
        self.is_cursor = is_cursor
        self.con = con

    @contextmanager
    def run_transaction(self):
        """Yield a cursor, committing on success and rolling back on error."""
        cur = self.con.cursor()
        try:
            yield cur
            self.con.commit()
        except Exception:
            self.con.rollback()
            raise
        finally:
            cur.close()

    def execute(self, *args, **kwargs):
        """Execute a statement, returning the cursor.

        DBAPI errors are re-raised as ``DatabaseError`` after attempting a
        rollback; a failed rollback is chained onto the error as well.
        """
        if self.is_cursor:
            cur = self.con
        else:
            cur = self.con.cursor()
        try:
            cur.execute(*args, **kwargs)
            return cur
        except Exception as exc:
            try:
                self.con.rollback()
            except Exception as inner_exc:  # pragma: no cover
                ex = DatabaseError(
                    f"Execution failed on sql: {args[0]}\n{exc}\nunable to rollback"
                )
                raise ex from inner_exc

            ex = DatabaseError(f"Execution failed on sql '{args[0]}': {exc}")
            raise ex from exc

    @staticmethod
    def _query_iterator(
        cursor, chunksize, columns, index_col=None, coerce_float=True, parse_dates=None
    ):
        """Return generator through chunked result set"""

        while True:
            data = cursor.fetchmany(chunksize)
            # Some drivers hand back a tuple; _wrap_result expects a list.
            if isinstance(data, tuple):
                data = list(data)
            if not data:
                cursor.close()
                break
            else:
                yield _wrap_result(
                    data,
                    columns,
                    index_col=index_col,
                    coerce_float=coerce_float,
                    parse_dates=parse_dates,
                )

    def read_query(
        self,
        sql,
        index_col=None,
        coerce_float=True,
        params=None,
        parse_dates=None,
        chunksize=None,
    ):
        """Read SQL query into a DataFrame.

        Mirrors ``SQLDatabase.read_query``; see that method for parameter
        documentation. Column names are taken from the cursor description.
        """
        args = _convert_params(sql, params)
        cursor = self.execute(*args)
        columns = [col_desc[0] for col_desc in cursor.description]

        if chunksize is not None:
            # Lazy path: yield DataFrames chunk by chunk.
            return self._query_iterator(
                cursor,
                chunksize,
                columns,
                index_col=index_col,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
            )
        else:
            # Eager path: fetch everything and close the cursor.
            data = self._fetchall_as_list(cursor)
            cursor.close()

            frame = _wrap_result(
                data,
                columns,
                index_col=index_col,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
            )
            return frame

    def _fetchall_as_list(self, cur):
        # ``fetchall`` may return a non-list sequence; normalize for callers.
        result = cur.fetchall()
        if not isinstance(result, list):
            result = list(result)
        return result

    def to_sql(
        self,
        frame,
        name,
        if_exists="fail",
        index=True,
        index_label=None,
        schema=None,
        chunksize=None,
        dtype=None,
        method=None,
    ):
        """
        Write records stored in a DataFrame to a SQL database.

        Parameters
        ----------
        frame: DataFrame
        name: string
            Name of SQL table.
        if_exists: {'fail', 'replace', 'append'}, default 'fail'
            fail: If table exists, do nothing.
            replace: If table exists, drop it, recreate it, and insert data.
            append: If table exists, insert data. Create if it does not exist.
        index : boolean, default True
            Write DataFrame index as a column
        index_label : string or sequence, default None
            Column label for index column(s). If None is given (default) and
            `index` is True, then the index names are used.
            A sequence should be given if the DataFrame uses MultiIndex.
        schema : string, default None
            Ignored parameter included for compatibility with SQLAlchemy
            version of ``to_sql``.
        chunksize : int, default None
            If not None, then rows will be written in batches of this
            size at a time. If None, all rows will be written at once.
        dtype : single type or dict of column name to SQL type, default None
            Optional specifying the datatype for columns. The SQL type should
            be a string. If all columns are of the same type, one single value
            can be used.
        method : {None, 'multi', callable}, default None
            Controls the SQL insertion clause used:

            * None : Uses standard SQL ``INSERT`` clause (one per row).
            * 'multi': Pass multiple values in a single ``INSERT`` clause.
            * callable with signature ``(pd_table, conn, keys, data_iter)``.

            Details and a sample callable implementation can be found in the
            section :ref:`insert method <io.sql.method>`.

            .. versionadded:: 0.24.0
        """
        # A scalar dtype applies to every column of the frame.
        if dtype and not is_dict_like(dtype):
            dtype = {col_name: dtype for col_name in frame}

        if dtype is not None:
            # Fallback mode expects plain SQL type strings, not SQLAlchemy
            # types; validate early.
            for col, my_type in dtype.items():
                if not isinstance(my_type, str):
                    raise ValueError(f"{col} ({my_type}) not a string")

        table = SQLiteTable(
            name,
            self,
            frame=frame,
            index=index,
            if_exists=if_exists,
            index_label=index_label,
            dtype=dtype,
        )
        table.create()
        # Pass ``method`` by keyword for consistency with SQLDatabase.to_sql.
        table.insert(chunksize, method=method)

    def has_table(self, name, schema=None):
        # TODO(wesm): unused?
        # escape = _get_valid_sqlite_name
        # esc_name = escape(name)

        # Parameterized lookup against sqlite_master avoids SQL injection.
        wld = "?"
        query = f"SELECT name FROM sqlite_master WHERE type='table' AND name={wld};"

        return len(self.execute(query, [name]).fetchall()) > 0

    def get_table(self, table_name, schema=None):
        return None  # not supported in fallback mode

    def drop_table(self, name, schema=None):
        drop_sql = f"DROP TABLE {_get_valid_sqlite_name(name)}"
        self.execute(drop_sql)

    def _create_sql_schema(self, frame, table_name, keys=None, dtype=None):
        # Build (but do not execute) the CREATE TABLE statement for ``frame``.
        table = SQLiteTable(
            table_name, self, frame=frame, index=False, keys=keys, dtype=dtype
        )
        return str(table.sql_schema())

1758 

1759 

def get_schema(frame, name, keys=None, con=None, dtype=None):
    """
    Build the CREATE TABLE statement that would be used to store ``frame``.

    Parameters
    ----------
    frame : DataFrame
    name : string
        name of SQL table
    keys : string or sequence, default: None
        columns to use a primary key
    con: an open SQL database connection object or a SQLAlchemy connectable
        Using SQLAlchemy makes it possible to use any DB supported by that
        library, default: None
        If a DBAPI2 object, only sqlite3 is supported.
    dtype : dict of column name to SQL type, default None
        Optional specifying the datatype for columns. The SQL type should
        be a SQLAlchemy type, or a string for sqlite3 fallback connection.

    """
    # Let the builder pick the right backend (SQLAlchemy vs sqlite fallback),
    # then delegate schema generation to it.
    sql_interface = pandasSQL_builder(con=con)
    return sql_interface._create_sql_schema(frame, name, keys=keys, dtype=dtype)