Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/alembic/autogenerate/rewriter.py : 26%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from alembic import util
2from alembic.operations import ops
class Rewriter(object):
    """A helper object that allows easy 'rewriting' of ops streams.

    The :class:`.Rewriter` object is intended to be passed along
    to the
    :paramref:`.EnvironmentContext.configure.process_revision_directives`
    parameter in an ``env.py`` script.    Once constructed, any number
    of "rewrites" functions can be associated with it, which will be given
    the opportunity to modify the structure without having to have explicit
    knowledge of the overall structure.

    The function is passed the :class:`.MigrationContext` object and
    ``revision`` tuple that are passed to the
    :paramref:`.EnvironmentContext.configure.process_revision_directives`
    function normally, and the third argument is an individual directive
    of the type noted in the decorator.  The function has the choice of
    returning a single op directive, which normally can be the directive
    that was actually passed, or a new directive to replace it, or a list
    of zero or more directives to replace it.

    .. seealso::

        :ref:`autogen_rewriter` - usage example

    .. versionadded:: 0.8

    """

    # Class-level dispatcher mapping a directive type to the traversal
    # method (registered below) that descends into its children.
    _traverse = util.Dispatcher()

    # Callables run, in order, after this rewriter has processed the
    # directives.  Kept as an immutable tuple so that chain() can extend
    # it on a copy without affecting the rewriter it was copied from.
    _chained = ()

    def __init__(self):
        # Per-instance dispatcher mapping a directive type to the
        # user-supplied rewrite function registered via rewrites().
        self.dispatch = util.Dispatcher()

    def chain(self, other):
        """Produce a "chain" of this :class:`.Rewriter` to another.

        This allows two rewriters to operate serially on a stream,
        e.g.::

            writer1 = autogenerate.Rewriter()
            writer2 = autogenerate.Rewriter()

            @writer1.rewrites(ops.AddColumnOp)
            def add_column_nullable(context, revision, op):
                op.column.nullable = True
                return op

            @writer2.rewrites(ops.AddColumnOp)
            def add_column_idx(context, revision, op):
                idx_op = ops.CreateIndexOp(
                    'ixc', op.table_name, [op.column.name])
                return [
                    op,
                    idx_op
                ]

            writer = writer1.chain(writer2)

        ``chain()`` may be called repeatedly; each call appends to the
        sequence, so ``writer1.chain(writer2).chain(writer3)`` runs all
        three in that order.

        :param other: a :class:`.Rewriter` instance
        :return: a new :class:`.Rewriter` that will run the operations
         of this writer, then the "other" writer, in succession.

        """
        # Shallow-copy this rewriter without invoking __init__, so the
        # copy shares the same ``dispatch`` registry as the original.
        wr = self.__class__.__new__(self.__class__)
        wr.__dict__.update(self.__dict__)
        # Append rather than overwrite: assigning ``other`` directly would
        # discard anything already chained onto this rewriter.
        wr._chained = tuple(self._chained) + (other,)
        return wr

    def rewrites(self, operator):
        """Register a function as rewriter for a given type.

        The function should receive three arguments, which are
        the :class:`.MigrationContext`, a ``revision`` tuple, and
        an op directive of the type indicated.  E.g.::

            @writer1.rewrites(ops.AddColumnOp)
            def add_column_nullable(context, revision, op):
                op.column.nullable = True
                return op

        """
        return self.dispatch.dispatch_for(operator)

    def _rewrite(self, context, revision, directive):
        """Yield the directive(s) that replace ``directive``.

        The directive is yielded unchanged when no rewrite function is
        registered for its type, or when this rewriter has already been
        applied to it.  Otherwise each directive returned by the rewrite
        function is marked as mutated by this rewriter and yielded.
        """
        try:
            _rewriter = self.dispatch.dispatch(directive)
        except ValueError:
            # No rewrite function is registered for this directive type.
            yield directive
            return

        if self in directive._mutations:
            # Already rewritten by this rewriter; don't apply it twice.
            yield directive
            return

        for r_directive in util.to_list(
            _rewriter(context, revision, directive)
        ):
            r_directive._mutations = r_directive._mutations.union([self])
            yield r_directive

    def __call__(self, context, revision, directives):
        """Process ``directives`` in place, then run each chained callable.

        This makes the rewriter usable directly as a
        ``process_revision_directives`` hook.
        """
        self.process_revision_directives(context, revision, directives)
        for chained in self._chained:
            chained(context, revision, directives)

    @_traverse.dispatch_for(ops.MigrationScript)
    def _traverse_script(self, context, revision, directive):
        """Rewrite each upgrade/downgrade container of a migration script.

        :raises ValueError: if rewriting a container yields anything other
         than exactly one replacement object.
        """
        upgrade_ops_list = []
        for upgrade_ops in directive.upgrade_ops_list:
            ret = self._traverse_for(context, revision, upgrade_ops)
            if len(ret) != 1:
                raise ValueError(
                    "Can only return single object for UpgradeOps traverse"
                )
            upgrade_ops_list.append(ret[0])
        directive.upgrade_ops = upgrade_ops_list

        downgrade_ops_list = []
        for downgrade_ops in directive.downgrade_ops_list:
            ret = self._traverse_for(context, revision, downgrade_ops)
            if len(ret) != 1:
                raise ValueError(
                    "Can only return single object for DowngradeOps traverse"
                )
            downgrade_ops_list.append(ret[0])
        directive.downgrade_ops = downgrade_ops_list

    @_traverse.dispatch_for(ops.OpContainer)
    def _traverse_op_container(self, context, revision, directive):
        """Rewrite, in place, the list of ops held by a container."""
        self._traverse_list(context, revision, directive.ops)

    @_traverse.dispatch_for(ops.MigrateOperation)
    def _traverse_any_directive(self, context, revision, directive):
        """Fallback traversal for other operations: nothing to descend into.
        """
        pass

    def _traverse_for(self, context, revision, directive):
        """Rewrite ``directive``, then traverse each replacement.

        :return: the list of directives that replace ``directive``.
        """
        directives = list(self._rewrite(context, revision, directive))
        for rewritten in directives:
            traverser = self._traverse.dispatch(rewritten)
            # The dispatcher hands back the plain function registered in
            # the class body, so ``self`` must be passed explicitly.
            traverser(self, context, revision, rewritten)
        return directives

    def _traverse_list(self, context, revision, directives):
        """Rewrite every directive in ``directives``, updating it in place.
        """
        dest = []
        for directive in directives:
            dest.extend(self._traverse_for(context, revision, directive))

        # Slice-assign so that callers holding this list see the result.
        directives[:] = dest

    def process_revision_directives(self, context, revision, directives):
        """Apply this rewriter's registered functions to ``directives``.

        The list is modified in place; chained rewriters are not invoked
        here (see ``__call__``).
        """
        self._traverse_list(context, revision, directives)