Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1from sqlalchemy import types as sqltypes 

2from sqlalchemy.ext.compiler import compiles 

3from sqlalchemy.schema import Column 

4from sqlalchemy.schema import CreateIndex 

5from sqlalchemy.sql.expression import ClauseElement 

6from sqlalchemy.sql.expression import Executable 

7 

8from .base import AddColumn 

9from .base import alter_column 

10from .base import alter_table 

11from .base import ColumnDefault 

12from .base import ColumnName 

13from .base import ColumnNullable 

14from .base import ColumnType 

15from .base import format_column_name 

16from .base import format_server_default 

17from .base import format_table_name 

18from .base import format_type 

19from .base import RenameTable 

20from .impl import DefaultImpl 

21from .. import util 

22 

23 

class MSSQLImpl(DefaultImpl):
    """``DefaultImpl`` specialization registered for the ``"mssql"`` dialect.

    When ``self.as_sql`` is set, the batch separator (``"GO"`` unless the
    ``mssql_batch_separator`` context option overrides it) is written
    after every executed construct and after the commit.
    """

    __dialect__ = "mssql"
    transactional_ddl = True
    # class-level default; __init__ replaces it per instance when the
    # "mssql_batch_separator" context option is supplied
    batch_separator = "GO"

    # VARCHAR / NVARCHAR are appended as one additional synonym group
    type_synonyms = DefaultImpl.type_synonyms + ({"VARCHAR", "NVARCHAR"},)

    def __init__(self, *arg, **kw):
        """Initialize the base impl, then resolve the batch separator."""
        super(MSSQLImpl, self).__init__(*arg, **kw)
        options = self.context_opts
        self.batch_separator = options.get(
            "mssql_batch_separator", self.batch_separator
        )

    def _emit_batch_separator(self):
        """Write the batch separator, only in ``as_sql`` mode and only
        when the separator is non-empty."""
        if self.as_sql and self.batch_separator:
            self.static_output(self.batch_separator)

    def _exec(self, construct, *args, **kw):
        """Execute ``construct`` through the base impl and return its
        result, emitting the batch separator afterwards when applicable."""
        outcome = super(MSSQLImpl, self)._exec(construct, *args, **kw)
        self._emit_batch_separator()
        return outcome

    def emit_begin(self):
        """Write ``BEGIN TRANSACTION`` followed by the command terminator."""
        self.static_output("BEGIN TRANSACTION" + self.command_terminator)

    def emit_commit(self):
        """Emit the base commit, then the batch separator when applicable."""
        super(MSSQLImpl, self).emit_commit()
        self._emit_batch_separator()

    def alter_column(
        self,
        table_name,
        column_name,
        nullable=None,
        server_default=False,
        name=None,
        type_=None,
        schema=None,
        existing_type=None,
        existing_server_default=None,
        existing_nullable=None,
        **kw
    ):
        """Alter a column in up to three separate steps.

        1. nullability / type change, delegated to the base impl;
        2. server default change: the existing default constraint is
           dropped first, then the new default (if any) is added;
        3. rename, delegated to the base impl.

        ``server_default=False`` means "leave the default untouched";
        ``server_default=None`` means "remove the default".

        :raises util.CommandError: when ``nullable`` is given but neither
         ``existing_type`` nor ``type_`` is available.
        """
        base = super(MSSQLImpl, self)

        if nullable is not None and existing_type is None:
            if type_ is None:
                raise util.CommandError(
                    "MS-SQL ALTER COLUMN operations "
                    "with NULL or NOT NULL require the "
                    "existing_type or a new type_ be passed."
                )
            # the NULL/NOT NULL alter carries the type along, so the new
            # type stands in for existing_type and no separate type
            # alteration is requested
            existing_type, type_ = type_, None

        base.alter_column(
            table_name,
            column_name,
            nullable=nullable,
            type_=type_,
            schema=schema,
            existing_type=existing_type,
            existing_nullable=existing_nullable,
            **kw
        )

        if server_default is not False:
            drop_existing = (
                server_default is None or existing_server_default is not False
            )
            if drop_existing:
                self._exec(
                    _ExecDropConstraint(
                        table_name,
                        column_name,
                        "sys.default_constraints",
                        schema,
                    )
                )
            if server_default is not None:
                base.alter_column(
                    table_name,
                    column_name,
                    schema=schema,
                    server_default=server_default,
                )

        if name is not None:
            base.alter_column(
                table_name, column_name, schema=schema, name=name
            )

    def create_index(self, index):
        """Execute ``CreateIndex`` for ``index``.

        Any name listed under the ``mssql_include`` index option that is
        not yet a column of the index's table is first appended to that
        table as a ``NullType`` column.
        """
        # the option is expected to be None when absent, so the get()
        # default is rarely used; ``or ()`` covers both cases defensively
        included = index.kwargs.get("mssql_include", None) or ()
        for col_name in included:
            if col_name in index.table.c:
                continue
            index.table.append_column(Column(col_name, sqltypes.NullType))
        self._exec(CreateIndex(index))

    def bulk_insert(self, table, rows, **kw):
        """Bulk insert ``rows`` into ``table``; in ``as_sql`` mode the
        insert is wrapped in ``SET IDENTITY_INSERT ... ON`` / ``OFF``."""
        if not self.as_sql:
            super(MSSQLImpl, self).bulk_insert(table, rows, **kw)
            return

        format_table = self.dialect.identifier_preparer.format_table
        self._exec("SET IDENTITY_INSERT %s ON" % format_table(table))
        super(MSSQLImpl, self).bulk_insert(table, rows, **kw)
        self._exec("SET IDENTITY_INSERT %s OFF" % format_table(table))

    def drop_column(self, table_name, column, schema=None, **kw):
        """Drop ``column``, optionally dropping dependent constraints first.

        The ``mssql_drop_default``, ``mssql_drop_check`` and
        ``mssql_drop_foreign_key`` keyword options are consumed here (in
        that order) and are not forwarded to the base implementation.
        """
        catalog_options = (
            ("mssql_drop_default", "sys.default_constraints"),
            ("mssql_drop_check", "sys.check_constraints"),
        )
        for option, catalog_view in catalog_options:
            if kw.pop(option, False):
                self._exec(
                    _ExecDropConstraint(
                        table_name, column, catalog_view, schema
                    )
                )

        if kw.pop("mssql_drop_foreign_key", False):
            self._exec(_ExecDropFKConstraint(table_name, column, schema))

        super(MSSQLImpl, self).drop_column(
            table_name, column, schema=schema, **kw
        )

158 

159 

class _ExecDropConstraint(Executable, ClauseElement):
    """Executable element describing "drop the constraint on this column".

    It only stores its arguments; the SQL text is produced by the
    ``mssql`` compilation hook ``_exec_drop_col_constraint``.
    """

    def __init__(self, tname, colname, type_, schema):
        # type_ is the catalog view to search, e.g. "sys.default_constraints"
        self.tname, self.colname = tname, colname
        self.type_, self.schema = type_, schema

166 

167 

class _ExecDropFKConstraint(Executable, ClauseElement):
    """Executable element describing "drop the foreign key on this column".

    It only stores its arguments; the SQL text is produced by the
    ``mssql`` compilation hook ``_exec_drop_col_fk_constraint``.
    """

    def __init__(self, tname, colname, schema):
        self.tname, self.colname, self.schema = tname, colname, schema

173 

174 

@compiles(_ExecDropConstraint, "mssql")
def _exec_drop_col_constraint(element, compiler, **kw):
    """Render the T-SQL batch for an ``_ExecDropConstraint`` element.

    The batch looks up the constraint name in the catalog view named by
    ``element.type_`` for the given table/column and then drops it with
    a dynamic ``alter table ... drop constraint``.
    """
    schema = element.schema
    tname = element.tname

    # only the table name inside the dynamic ALTER goes through
    # format_table_name(); the remaining values are interpolated as given
    params = {
        "type": element.type_,
        "tname": tname,
        "colname": element.colname,
        "tname_quoted": format_table_name(compiler, tname, schema),
        "schema_dot": schema + "." if schema else "",
    }

    # from http://www.mssqltips.com/sqlservertip/1425/\
    # working-with-default-constraints-in-sql-server/
    # TODO: needs table formatting, etc.
    template = """declare @const_name varchar(256)
select @const_name = [name] from %(type)s
where parent_object_id = object_id('%(schema_dot)s%(tname)s')
and col_name(parent_object_id, parent_column_id) = '%(colname)s'
exec('alter table %(tname_quoted)s drop constraint ' + @const_name)"""
    return template % params

197 

198 

@compiles(_ExecDropFKConstraint, "mssql")
def _exec_drop_col_fk_constraint(element, compiler, **kw):
    """Render the T-SQL batch for an ``_ExecDropFKConstraint`` element.

    The batch looks up the name of the foreign key constraint attached to
    ``element.colname`` of ``element.tname`` by joining
    ``sys.foreign_keys`` with ``sys.foreign_key_columns``, then drops it
    with a dynamic ``alter table ... drop constraint``.

    :param element: the ``_ExecDropFKConstraint`` being compiled; its
     ``schema``, ``tname`` and ``colname`` attributes are read.
    :param compiler: compiler passed through to ``format_table_name``.
    :return: the SQL text as a string.
    """
    schema, tname, colname = element.schema, element.tname, element.colname

    # only the table name inside the dynamic ALTER goes through
    # format_table_name(); tname/colname/schema inside the quoted string
    # literals are interpolated as given
    return """declare @const_name varchar(256)
select @const_name = [name] from
sys.foreign_keys fk join sys.foreign_key_columns fkc
on fk.object_id=fkc.constraint_object_id
where fkc.parent_object_id = object_id('%(schema_dot)s%(tname)s')
and col_name(fkc.parent_object_id, fkc.parent_column_id) = '%(colname)s'
exec('alter table %(tname_quoted)s drop constraint ' + @const_name)""" % {
        "tname": tname,
        "colname": colname,
        "tname_quoted": format_table_name(compiler, tname, schema),
        "schema_dot": schema + "." if schema else "",
    }

215 

216 

@compiles(AddColumn, "mssql")
def visit_add_column(element, compiler, **kw):
    """Render ``AddColumn`` for mssql: the ALTER TABLE prefix followed by
    the ``ADD <column specification>`` clause."""
    table_clause = alter_table(compiler, element.table_name, element.schema)
    add_clause = mssql_add_column(compiler, element.column, **kw)
    return "%s %s" % (table_clause, add_clause)

223 

224 

def mssql_add_column(compiler, column, **kw):
    """Return ``"ADD "`` followed by the compiler's column specification
    for ``column``; ``kw`` is forwarded to ``get_column_specification``."""
    column_spec = compiler.get_column_specification(column, **kw)
    return "ADD %s" % column_spec

227 

228 

@compiles(ColumnNullable, "mssql")
def visit_column_nullable(element, compiler, **kw):
    """Render a nullability change for mssql.

    The statement restates the column type (``element.existing_type``)
    between the ALTER COLUMN clause and the ``NULL`` / ``NOT NULL``
    keyword.
    """
    table_clause = alter_table(compiler, element.table_name, element.schema)
    column_clause = alter_column(compiler, element.column_name)
    type_clause = format_type(compiler, element.existing_type)
    if element.nullable:
        null_clause = "NULL"
    else:
        null_clause = "NOT NULL"
    return "%s %s %s %s" % (
        table_clause,
        column_clause,
        type_clause,
        null_clause,
    )

237 

238 

@compiles(ColumnDefault, "mssql")
def visit_column_default(element, compiler, **kw):
    """Render ``<alter table> ADD DEFAULT <default> FOR <column>``."""
    # TODO: there can also be a named constraint
    # with ADD CONSTRAINT here
    table_clause = alter_table(compiler, element.table_name, element.schema)
    default_sql = format_server_default(compiler, element.default)
    column_sql = format_column_name(compiler, element.column_name)
    return "%s ADD DEFAULT %s FOR %s" % (table_clause, default_sql, column_sql)

248 

249 

@compiles(ColumnName, "mssql")
def visit_rename_column(element, compiler, **kw):
    """Render a column rename as an ``EXEC sp_rename ... 'COLUMN'`` call.

    The first argument is ``'<table>.<column>'`` inside single quotes; the
    new name is emitted as returned by ``format_column_name``.
    """
    table_sql = format_table_name(
        compiler, element.table_name, element.schema
    )
    old_column_sql = format_column_name(compiler, element.column_name)
    new_column_sql = format_column_name(compiler, element.newname)
    return "EXEC sp_rename '%s.%s', %s, 'COLUMN'" % (
        table_sql,
        old_column_sql,
        new_column_sql,
    )

257 

258 

@compiles(ColumnType, "mssql")
def visit_column_type(element, compiler, **kw):
    """Render a type change: ALTER TABLE prefix, ALTER COLUMN clause, then
    the formatted new type (``element.type_``)."""
    table_clause = alter_table(compiler, element.table_name, element.schema)
    column_clause = alter_column(compiler, element.column_name)
    type_clause = format_type(compiler, element.type_)
    return "%s %s %s" % (table_clause, column_clause, type_clause)

266 

267 

@compiles(RenameTable, "mssql")
def visit_rename_table(element, compiler, **kw):
    """Render a table rename as an ``EXEC sp_rename`` call.

    The current name is formatted with ``element.schema``; the new name is
    formatted with a schema of ``None``.
    """
    current_sql = format_table_name(
        compiler, element.table_name, element.schema
    )
    target_sql = format_table_name(compiler, element.new_table_name, None)
    return "EXEC sp_rename '%s', %s" % (current_sql, target_sql)