Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1from alembic import util 

2from alembic.operations import ops 

3 

4 

class Rewriter(object):
    """A helper object that allows easy 'rewriting' of ops streams.

    The :class:`.Rewriter` object is intended to be passed along
    to the
    :paramref:`.EnvironmentContext.configure.process_revision_directives`
    parameter in an ``env.py`` script.    Once constructed, any number
    of "rewrites" functions can be associated with it, which will be given
    the opportunity to modify the structure without having to have explicit
    knowledge of the overall structure.

    The function is passed the :class:`.MigrationContext` object and
    ``revision`` tuple that are passed to the
    :paramref:`.EnvironmentContext.configure.process_revision_directives`
    function normally, and the third argument is an individual directive
    of the type noted in the decorator.  The function has the choice of
    returning a single op directive, which normally can be the directive
    that was actually passed, or a new directive to replace it, or a list
    of zero or more directives to replace it.

    .. seealso::

        :ref:`autogen_rewriter` - usage example

    .. versionadded:: 0.8

    """

    # Class-level dispatcher that selects, by directive type, the
    # ``_traverse_*`` function used to descend into a directive once it
    # has been rewritten.
    _traverse = util.Dispatcher()

    # Callables invoked, in order, after this rewriter has processed the
    # directives.  Kept as a tuple so that repeated ``chain()`` calls
    # accumulate instead of replacing one another.
    _chained = ()

    def __init__(self):
        # Per-instance dispatcher holding the functions registered
        # through ``rewrites()``.
        self.dispatch = util.Dispatcher()

    def chain(self, other):
        """Produce a "chain" of this :class:`.Rewriter` to another.

        This allows two rewriters to operate serially on a stream,
        e.g.::

            writer1 = autogenerate.Rewriter()
            writer2 = autogenerate.Rewriter()

            @writer1.rewrites(ops.AddColumnOp)
            def add_column_nullable(context, revision, op):
                op.column.nullable = True
                return op

            @writer2.rewrites(ops.AddColumnOp)
            def add_column_idx(context, revision, op):
                idx_op = ops.CreateIndexOp(
                    'ixc', op.table_name, [op.column.name])
                return [
                    op,
                    idx_op
                ]

            writer = writer1.chain(writer2)

        The returned object can itself be chained again, e.g.
        ``writer1.chain(writer2).chain(writer3)``; every chained writer
        is retained and run in the order it was added.

        :param other: a :class:`.Rewriter` instance
        :return: a new :class:`.Rewriter` that will run the operations
         of this writer, then the "other" writer, in succession.

        """
        # Shallow-copy this rewriter without running __init__, so the
        # copy shares the same ``dispatch`` registry.
        wr = self.__class__.__new__(self.__class__)
        wr.__dict__.update(self.__dict__)
        # Append rather than overwrite: assigning ``other`` directly
        # would discard whatever had been chained onto ``self`` before.
        wr._chained = self._chained + (other,)
        return wr

    def rewrites(self, operator):
        """Register a function as rewriter for a given type.

        The function should receive three arguments, which are
        the :class:`.MigrationContext`, a ``revision`` tuple, and
        an op directive of the type indicated.  E.g.::

            @writer1.rewrites(ops.AddColumnOp)
            def add_column_nullable(context, revision, op):
                op.column.nullable = True
                return op

        """
        return self.dispatch.dispatch_for(operator)

    def _rewrite(self, context, revision, directive):
        """Yield the directive(s) that replace ``directive``.

        The directive is yielded unchanged when the dispatcher lookup
        raises ``ValueError`` or when this rewriter is already recorded
        in ``directive._mutations``.  Otherwise the registered function
        is called and each directive it returns is yielded after being
        tagged with this rewriter.
        """
        try:
            _rewriter = self.dispatch.dispatch(directive)
        except ValueError:
            # presumably: no function registered for this directive type
            yield directive
        else:
            if self in directive._mutations:
                # Already produced by this rewriter; don't rewrite twice.
                yield directive
            else:
                for r_directive in util.to_list(
                    _rewriter(context, revision, directive)
                ):
                    # Record that this rewriter has handled the result.
                    r_directive._mutations = r_directive._mutations.union(
                        [self]
                    )
                    yield r_directive

    def __call__(self, context, revision, directives):
        """Process ``directives`` with this rewriter, then with every
        chained callable in the order they were chained."""
        self.process_revision_directives(context, revision, directives)
        for chained in self._chained:
            chained(context, revision, directives)

    @_traverse.dispatch_for(ops.MigrationScript)
    def _traverse_script(self, context, revision, directive):
        """Rewrite each upgrade / downgrade container of a script.

        :raises ValueError: if rewriting one of the containers yields
         anything other than exactly one object.
        """
        upgrade_ops_list = []
        for upgrade_ops in directive.upgrade_ops_list:
            ret = self._traverse_for(context, revision, upgrade_ops)
            if len(ret) != 1:
                raise ValueError(
                    "Can only return single object for UpgradeOps traverse"
                )
            upgrade_ops_list.append(ret[0])
        directive.upgrade_ops = upgrade_ops_list

        downgrade_ops_list = []
        for downgrade_ops in directive.downgrade_ops_list:
            ret = self._traverse_for(context, revision, downgrade_ops)
            if len(ret) != 1:
                raise ValueError(
                    "Can only return single object for DowngradeOps traverse"
                )
            downgrade_ops_list.append(ret[0])
        directive.downgrade_ops = downgrade_ops_list

    @_traverse.dispatch_for(ops.OpContainer)
    def _traverse_op_container(self, context, revision, directive):
        """Rewrite, in place, the ``ops`` list held by a container."""
        self._traverse_list(context, revision, directive.ops)

    @_traverse.dispatch_for(ops.MigrateOperation)
    def _traverse_any_directive(self, context, revision, directive):
        """Fallback traversal: nothing to descend into."""
        pass

    def _traverse_for(self, context, revision, directive):
        """Rewrite one directive, traverse each result, and return the
        list of resulting directives."""
        directives = list(self._rewrite(context, revision, directive))
        for rewritten in directives:
            traverser = self._traverse.dispatch(rewritten)
            # The traversers are plain functions stored in the
            # dispatcher, so ``self`` is passed explicitly.
            traverser(self, context, revision, rewritten)
        return directives

    def _traverse_list(self, context, revision, directives):
        """Rewrite every directive in ``directives``, replacing the
        list's contents in place with the results."""
        dest = []
        for directive in directives:
            dest.extend(self._traverse_for(context, revision, directive))

        # Slice-assign so the caller's list object itself is updated.
        directives[:] = dest

    def process_revision_directives(self, context, revision, directives):
        """Apply this rewriter's registered functions to ``directives``,
        modifying the list in place."""
        self._traverse_list(context, revision, directives)

158 self._traverse_list(context, revision, directives)