Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import functools 

2 

3from sqlalchemy import exc 

4from sqlalchemy import Integer 

5from sqlalchemy import types as sqltypes 

6from sqlalchemy.ext.compiler import compiles 

7from sqlalchemy.schema import Column 

8from sqlalchemy.schema import DDLElement 

9from sqlalchemy.sql.elements import quoted_name 

10 

11from ..util import sqla_compat 

12from ..util.sqla_compat import _columns_for_constraint # noqa 

13from ..util.sqla_compat import _find_columns # noqa 

14from ..util.sqla_compat import _fk_spec # noqa 

15from ..util.sqla_compat import _is_type_bound # noqa 

16from ..util.sqla_compat import _table_for_constraint # noqa 

17 

18 

19class AlterTable(DDLElement): 

20 

21 """Represent an ALTER TABLE statement. 

22 

23 Only the string name and optional schema name of the table 

24 is required, not a full Table object. 

25 

26 """ 

27 

28 def __init__(self, table_name, schema=None): 

29 self.table_name = table_name 

30 self.schema = schema 

31 

32 

33class RenameTable(AlterTable): 

34 def __init__(self, old_table_name, new_table_name, schema=None): 

35 super(RenameTable, self).__init__(old_table_name, schema=schema) 

36 self.new_table_name = new_table_name 

37 

38 

39class AlterColumn(AlterTable): 

40 def __init__( 

41 self, 

42 name, 

43 column_name, 

44 schema=None, 

45 existing_type=None, 

46 existing_nullable=None, 

47 existing_server_default=None, 

48 existing_comment=None, 

49 ): 

50 super(AlterColumn, self).__init__(name, schema=schema) 

51 self.column_name = column_name 

52 self.existing_type = ( 

53 sqltypes.to_instance(existing_type) 

54 if existing_type is not None 

55 else None 

56 ) 

57 self.existing_nullable = existing_nullable 

58 self.existing_server_default = existing_server_default 

59 self.existing_comment = existing_comment 

60 

61 

62class ColumnNullable(AlterColumn): 

63 def __init__(self, name, column_name, nullable, **kw): 

64 super(ColumnNullable, self).__init__(name, column_name, **kw) 

65 self.nullable = nullable 

66 

67 

68class ColumnType(AlterColumn): 

69 def __init__(self, name, column_name, type_, **kw): 

70 super(ColumnType, self).__init__(name, column_name, **kw) 

71 self.type_ = sqltypes.to_instance(type_) 

72 

73 

74class ColumnName(AlterColumn): 

75 def __init__(self, name, column_name, newname, **kw): 

76 super(ColumnName, self).__init__(name, column_name, **kw) 

77 self.newname = newname 

78 

79 

80class ColumnDefault(AlterColumn): 

81 def __init__(self, name, column_name, default, **kw): 

82 super(ColumnDefault, self).__init__(name, column_name, **kw) 

83 self.default = default 

84 

85 

86class AddColumn(AlterTable): 

87 def __init__(self, name, column, schema=None): 

88 super(AddColumn, self).__init__(name, schema=schema) 

89 self.column = column 

90 

91 

92class DropColumn(AlterTable): 

93 def __init__(self, name, column, schema=None): 

94 super(DropColumn, self).__init__(name, schema=schema) 

95 self.column = column 

96 

97 

98class ColumnComment(AlterColumn): 

99 def __init__(self, name, column_name, comment, **kw): 

100 super(ColumnComment, self).__init__(name, column_name, **kw) 

101 self.comment = comment 

102 

103 

104@compiles(RenameTable) 

105def visit_rename_table(element, compiler, **kw): 

106 return "%s RENAME TO %s" % ( 

107 alter_table(compiler, element.table_name, element.schema), 

108 format_table_name(compiler, element.new_table_name, element.schema), 

109 ) 

110 

111 

112@compiles(AddColumn) 

113def visit_add_column(element, compiler, **kw): 

114 return "%s %s" % ( 

115 alter_table(compiler, element.table_name, element.schema), 

116 add_column(compiler, element.column, **kw), 

117 ) 

118 

119 

120@compiles(DropColumn) 

121def visit_drop_column(element, compiler, **kw): 

122 return "%s %s" % ( 

123 alter_table(compiler, element.table_name, element.schema), 

124 drop_column(compiler, element.column.name, **kw), 

125 ) 

126 

127 

128@compiles(ColumnNullable) 

129def visit_column_nullable(element, compiler, **kw): 

130 return "%s %s %s" % ( 

131 alter_table(compiler, element.table_name, element.schema), 

132 alter_column(compiler, element.column_name), 

133 "DROP NOT NULL" if element.nullable else "SET NOT NULL", 

134 ) 

135 

136 

137@compiles(ColumnType) 

138def visit_column_type(element, compiler, **kw): 

139 return "%s %s %s" % ( 

140 alter_table(compiler, element.table_name, element.schema), 

141 alter_column(compiler, element.column_name), 

142 "TYPE %s" % format_type(compiler, element.type_), 

143 ) 

144 

145 

146@compiles(ColumnName) 

147def visit_column_name(element, compiler, **kw): 

148 return "%s RENAME %s TO %s" % ( 

149 alter_table(compiler, element.table_name, element.schema), 

150 format_column_name(compiler, element.column_name), 

151 format_column_name(compiler, element.newname), 

152 ) 

153 

154 

155@compiles(ColumnDefault) 

156def visit_column_default(element, compiler, **kw): 

157 if sqla_compat.has_computed and ( 

158 isinstance(element.default, sqla_compat.Computed) 

159 or isinstance(element.existing_server_default, sqla_compat.Computed) 

160 ): 

161 raise exc.CompileError( 

162 'Adding or removing a "computed" construct, e.g. GENERATED ' 

163 "ALWAYS AS, to or from an existing column is not supported." 

164 ) 

165 

166 return "%s %s %s" % ( 

167 alter_table(compiler, element.table_name, element.schema), 

168 alter_column(compiler, element.column_name), 

169 "SET DEFAULT %s" % format_server_default(compiler, element.default) 

170 if element.default is not None 

171 else "DROP DEFAULT", 

172 ) 

173 

174 

175def quote_dotted(name, quote): 

176 """quote the elements of a dotted name""" 

177 

178 if isinstance(name, quoted_name): 

179 return quote(name) 

180 result = ".".join([quote(x) for x in name.split(".")]) 

181 return result 

182 

183 

184def format_table_name(compiler, name, schema): 

185 quote = functools.partial(compiler.preparer.quote) 

186 if schema: 

187 return quote_dotted(schema, quote) + "." + quote(name) 

188 else: 

189 return quote(name) 

190 

191 

192def format_column_name(compiler, name): 

193 return compiler.preparer.quote(name) 

194 

195 

196def format_server_default(compiler, default): 

197 return compiler.get_column_default_string( 

198 Column("x", Integer, server_default=default) 

199 ) 

200 

201 

202def format_type(compiler, type_): 

203 return compiler.dialect.type_compiler.process(type_) 

204 

205 

206def alter_table(compiler, name, schema): 

207 return "ALTER TABLE %s" % format_table_name(compiler, name, schema) 

208 

209 

210def drop_column(compiler, name): 

211 return "DROP COLUMN %s" % format_column_name(compiler, name) 

212 

213 

214def alter_column(compiler, name): 

215 return "ALTER COLUMN %s" % format_column_name(compiler, name) 

216 

217 

218def add_column(compiler, column, **kw): 

219 text = "ADD COLUMN %s" % compiler.get_column_specification(column, **kw) 

220 

221 const = " ".join( 

222 compiler.process(constraint) for constraint in column.constraints 

223 ) 

224 if const: 

225 text += " " + const 

226 

227 return text