Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python 

2# cardinal_pythonlib/sqlalchemy/orm_query.py 

3 

4""" 

5=============================================================================== 

6 

7 Original code copyright (C) 2009-2021 Rudolf Cardinal (rudolf@pobox.com). 

8 

9 This file is part of cardinal_pythonlib. 

10 

11 Licensed under the Apache License, Version 2.0 (the "License"); 

12 you may not use this file except in compliance with the License. 

13 You may obtain a copy of the License at 

14 

15 https://www.apache.org/licenses/LICENSE-2.0 

16 

17 Unless required by applicable law or agreed to in writing, software 

18 distributed under the License is distributed on an "AS IS" BASIS, 

19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

20 See the License for the specific language governing permissions and 

21 limitations under the License. 

22 

23=============================================================================== 

24 

25**Functions to perform and manipulate SQLAlchemy ORM queries.** 

26 

27""" 

28 

29from typing import Any, Dict, Sequence, Tuple, Union 

30 

31from sqlalchemy.engine.base import Connection, Engine 

32from sqlalchemy.engine.result import ResultProxy 

33from sqlalchemy.ext.declarative.api import DeclarativeMeta 

34from sqlalchemy.orm.query import Query 

35from sqlalchemy.orm.session import Session 

36from sqlalchemy.sql.expression import ClauseElement, literal 

37from sqlalchemy.sql import func 

38from sqlalchemy.sql.selectable import Exists 

39 

40from cardinal_pythonlib.logs import get_brace_style_log_with_null_handler 

41from cardinal_pythonlib.sqlalchemy.dialect import SqlaDialectName 

42 

43log = get_brace_style_log_with_null_handler(__name__) 

44 

45 

46# ============================================================================= 

47# Get query result with fieldnames 

48# ============================================================================= 

49 

def get_rows_fieldnames_from_query(
        session: Union[Session, Engine, Connection],
        query: Query) -> Tuple[Sequence[Sequence[Any]], Sequence[str]]:
    """
    Executes a query and hands back both its rows and its column names.

    Args:
        session: the SQLAlchemy :class:`Session`, :class:`Engine`, or
            :class:`Connection` on which the query is executed
        query: the SQLAlchemy :class:`Query` to execute

    Returns:
        a tuple ``(rows, fieldnames)``: ``rows`` is whatever ``fetchall()``
        yields for the executed query, and ``fieldnames`` are the keys of
        the result (the names of its columns/fields).

    """
    # Background:
    # https://stackoverflow.com/questions/6455560/how-to-get-column-names-from-sqlalchemy-result-declarative-syntax # noqa
    #
    # ``query.column_descriptions`` is deliberately NOT used for the names:
    # for ``session.query(User)`` it reports e.g. "User" rather than the
    # individual columns:
    #
    #   fieldnames = [cd['name'] for cd in query.column_descriptions]
    #
    # The keys of the executed result are used instead; they come out in
    # the "_table_field" form in which SQLAlchemy SELECTs things.
    executed = session.execute(query)  # type: ResultProxy
    column_names = executed.keys()  # read the keys before fetching rows
    fetched_rows = executed.fetchall()
    return fetched_rows, column_names

75 

76 

77# ============================================================================= 

78# EXISTS (SQLAlchemy ORM) 

79# ============================================================================= 

80 

def bool_from_exists_clause(session: Session,
                            exists_clause: Exists) -> bool:
    """
    Evaluates an ``EXISTS`` clause and returns the answer as a Python bool,
    papering over the fact that database dialects disagree about how an
    ``EXISTS`` clause may be turned into a query.

    Args:
        session: SQLAlchemy :class:`Session`; its bind determines the dialect
        exists_clause: the SQLAlchemy ``EXISTS`` clause to evaluate

    Returns:
        ``True`` if the query built around the clause yields a truthy
        scalar, otherwise ``False``

    Background:

    - https://bitbucket.org/zzzeek/sqlalchemy/issues/3212/misleading-documentation-for-queryexists
    - https://docs.sqlalchemy.org/en/latest/orm/query.html#sqlalchemy.orm.query.Query.exists

    The SQL we are aiming for is:

    *SQL Server*

    .. code-block:: sql

        SELECT 1 WHERE EXISTS (SELECT 1 FROM table WHERE ...)
        -- ... giving 1 or None (no rows)
        -- ... fine for SQL Server, but invalid for MySQL (no FROM clause)

    *Others, including MySQL*

    .. code-block:: sql

        SELECT EXISTS (SELECT 1 FROM table WHERE ...)
        -- ... giving 1 or 0
        -- ... fine for MySQL, but invalid syntax for SQL Server

    """  # noqa
    dialect_name = session.get_bind().dialect.name
    if dialect_name != SqlaDialectName.MSSQL:
        # MySQL, etc.: select the EXISTS expression itself.
        return bool(session.query(exists_clause).scalar())
    # SQL Server: select a literal, using the EXISTS clause as the filter.
    # No matching row means scalar() gives None, which bool() maps to False.
    mssql_query = session.query(literal(True)).filter(exists_clause)
    return bool(mssql_query.scalar())

118 

119 

def exists_orm(session: Session,
               ormclass: DeclarativeMeta,
               *criteria: Any) -> bool:
    """
    Reports whether the database holds at least one record of ``ormclass``
    satisfying every one of the ``criteria``.

    Args:
        session: SQLAlchemy :class:`Session` to query through
        ormclass: the SQLAlchemy ORM class to look for
        criteria: zero or more filter expressions, all of which must hold

    Returns:
        the boolean answer produced by :func:`bool_from_exists_clause`

    Example usage:

    .. code-block:: python

        bool_exists = exists_orm(session, MyClass, MyClass.myfield == value)
    """
    # http://docs.sqlalchemy.org/en/latest/orm/query.html
    # Passing all criteria to one filter() call ANDs them together, exactly
    # as applying filter() once per criterion does.
    filtered = session.query(ormclass).filter(*criteria)
    return bool_from_exists_clause(session=session,
                                   exists_clause=filtered.exists())

140 

141 

142# ============================================================================= 

143# Get or create (SQLAlchemy ORM) 

144# ============================================================================= 

145 

def get_or_create(session: Session,
                  model: DeclarativeMeta,
                  defaults: Union[Dict[str, Any], None] = None,
                  **kwargs: Any) -> Tuple[Any, bool]:
    """
    Fetches an ORM object from the database, or creates one if none existed.

    Args:
        session: an SQLAlchemy :class:`Session`
        model: an SQLAlchemy ORM class
        defaults: default initialization arguments (in addition to relevant
            filter criteria) if we have to create a new instance; may be
            ``None``. On a key clash, a value here takes precedence over
            the one in ``kwargs`` when constructing the new instance.
        kwargs: optional filter criteria, applied via ``filter_by()``

    Returns:
        a tuple ``(instance, newly_created)``

    A newly created instance is added to the session; this function does not
    itself flush or commit.

    See https://stackoverflow.com/questions/2546207 (this function is a
    composite of several suggestions).
    """
    instance = session.query(model).filter_by(**kwargs).first()
    # Compare against None explicitly: first() gives None when nothing
    # matched, whereas a found instance whose class defines a falsy
    # __bool__/__len__ must still count as "found" (otherwise we would
    # create a duplicate).
    if instance is not None:
        return instance, False

    # SQL expressions used as filter values are not usable as constructor
    # arguments, so they are left out of the initialization parameters.
    params = {k: v for k, v in kwargs.items()
              if not isinstance(v, ClauseElement)}
    params.update(defaults or {})
    instance = model(**params)
    session.add(instance)
    return instance, True

176 

177 

178# ============================================================================= 

179# Extend Query to provide an optimized COUNT(*) 

180# ============================================================================= 

181 

182# noinspection PyAbstractClass 

class CountStarSpecializedQuery(Query):
    def __init__(self, *args, **kwargs) -> None:
        """
        A :class:`Query` that additionally offers an optimized ``COUNT(*)``
        via :meth:`count_star`. All arguments are passed straight through to
        :class:`Query`.

        See
        https://stackoverflow.com/questions/12941416/how-to-count-rows-with-select-count-with-sqlalchemy

        Example use:

        .. code-block:: python

            q = (
                CountStarSpecializedQuery([cls], session=dbsession)
                .filter(cls.username == username)
            )
            return q.count_star()

        """  # noqa
        super().__init__(*args, **kwargs)

    def count_star(self) -> int:
        """
        Runs the ``COUNT(*)`` specialization of this query and returns the
        resulting scalar.
        """
        # Take this query's SELECT statement, replace its column list with a
        # bare COUNT, and reset any ORDER BY via order_by(None).
        counting_columns = [func.count()]
        unordered_count = self.statement.with_only_columns(
            counting_columns).order_by(None)
        outcome = self.session.execute(unordered_count)
        return outcome.scalar()

208 return self.session.execute(count_query).scalar()