Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/alembic/autogenerate/render.py : 15%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import re
3from mako.pygen import PythonPrinter
4from sqlalchemy import schema as sa_schema
5from sqlalchemy import sql
6from sqlalchemy import types as sqltypes
8from .. import util
9from ..operations import ops
10from ..util import compat
11from ..util import sqla_compat
12from ..util.compat import string_types
13from ..util.compat import StringIO
16MAX_PYTHON_ARGS = 255
18try:
19 from sqlalchemy.sql.naming import conv
21 def _render_gen_name(autogen_context, name):
22 if isinstance(name, conv):
23 return _f_name(_alembic_autogenerate_prefix(autogen_context), name)
24 else:
25 return name
28except ImportError:
30 def _render_gen_name(autogen_context, name):
31 return name
34def _indent(text):
35 text = re.compile(r"^", re.M).sub(" ", text).strip()
36 text = re.compile(r" +$", re.M).sub("", text)
37 return text
40def _render_python_into_templatevars(
41 autogen_context, migration_script, template_args
42):
43 imports = autogen_context.imports
45 for upgrade_ops, downgrade_ops in zip(
46 migration_script.upgrade_ops_list, migration_script.downgrade_ops_list
47 ):
48 template_args[upgrade_ops.upgrade_token] = _indent(
49 _render_cmd_body(upgrade_ops, autogen_context)
50 )
51 template_args[downgrade_ops.downgrade_token] = _indent(
52 _render_cmd_body(downgrade_ops, autogen_context)
53 )
54 template_args["imports"] = "\n".join(sorted(imports))
57default_renderers = renderers = util.Dispatcher()
60def _render_cmd_body(op_container, autogen_context):
62 buf = StringIO()
63 printer = PythonPrinter(buf)
65 printer.writeline(
66 "# ### commands auto generated by Alembic - please adjust! ###"
67 )
69 has_lines = False
70 for op in op_container.ops:
71 lines = render_op(autogen_context, op)
72 has_lines = has_lines or lines
74 for line in lines:
75 printer.writeline(line)
77 if not has_lines:
78 printer.writeline("pass")
80 printer.writeline("# ### end Alembic commands ###")
82 return buf.getvalue()
85def render_op(autogen_context, op):
86 renderer = renderers.dispatch(op)
87 lines = util.to_list(renderer(autogen_context, op))
88 return lines
91def render_op_text(autogen_context, op):
92 return "\n".join(render_op(autogen_context, op))
95@renderers.dispatch_for(ops.ModifyTableOps)
96def _render_modify_table(autogen_context, op):
97 opts = autogen_context.opts
98 render_as_batch = opts.get("render_as_batch", False)
100 if op.ops:
101 lines = []
102 if render_as_batch:
103 with autogen_context._within_batch():
104 lines.append(
105 "with op.batch_alter_table(%r, schema=%r) as batch_op:"
106 % (op.table_name, op.schema)
107 )
108 for t_op in op.ops:
109 t_lines = render_op(autogen_context, t_op)
110 lines.extend(t_lines)
111 lines.append("")
112 else:
113 for t_op in op.ops:
114 t_lines = render_op(autogen_context, t_op)
115 lines.extend(t_lines)
117 return lines
118 else:
119 return []
122@renderers.dispatch_for(ops.CreateTableCommentOp)
123def _render_create_table_comment(autogen_context, op):
125 templ = (
126 "{prefix}create_table_comment(\n"
127 "{indent}'{tname}',\n"
128 "{indent}{comment},\n"
129 "{indent}existing_comment={existing},\n"
130 "{indent}schema={schema}\n"
131 ")"
132 )
133 return templ.format(
134 prefix=_alembic_autogenerate_prefix(autogen_context),
135 tname=op.table_name,
136 comment="%r" % op.comment if op.comment is not None else None,
137 existing="%r" % op.existing_comment
138 if op.existing_comment is not None
139 else None,
140 schema="'%s'" % op.schema if op.schema is not None else None,
141 indent=" ",
142 )
145@renderers.dispatch_for(ops.DropTableCommentOp)
146def _render_drop_table_comment(autogen_context, op):
148 templ = (
149 "{prefix}drop_table_comment(\n"
150 "{indent}'{tname}',\n"
151 "{indent}existing_comment={existing},\n"
152 "{indent}schema={schema}\n"
153 ")"
154 )
155 return templ.format(
156 prefix=_alembic_autogenerate_prefix(autogen_context),
157 tname=op.table_name,
158 existing="%r" % op.existing_comment
159 if op.existing_comment is not None
160 else None,
161 schema="'%s'" % op.schema if op.schema is not None else None,
162 indent=" ",
163 )
166@renderers.dispatch_for(ops.CreateTableOp)
167def _add_table(autogen_context, op):
168 table = op.to_table()
170 args = [
171 col
172 for col in [
173 _render_column(col, autogen_context) for col in table.columns
174 ]
175 if col
176 ] + sorted(
177 [
178 rcons
179 for rcons in [
180 _render_constraint(cons, autogen_context)
181 for cons in table.constraints
182 ]
183 if rcons is not None
184 ]
185 )
187 if len(args) > MAX_PYTHON_ARGS:
188 args = "*[" + ",\n".join(args) + "]"
189 else:
190 args = ",\n".join(args)
192 text = "%(prefix)screate_table(%(tablename)r,\n%(args)s" % {
193 "tablename": _ident(op.table_name),
194 "prefix": _alembic_autogenerate_prefix(autogen_context),
195 "args": args,
196 }
197 if op.schema:
198 text += ",\nschema=%r" % _ident(op.schema)
200 comment = sqla_compat._comment_attribute(table)
201 if comment:
202 text += ",\ncomment=%r" % _ident(comment)
203 for k in sorted(op.kw):
204 text += ",\n%s=%r" % (k.replace(" ", "_"), op.kw[k])
205 text += "\n)"
206 return text
209@renderers.dispatch_for(ops.DropTableOp)
210def _drop_table(autogen_context, op):
211 text = "%(prefix)sdrop_table(%(tname)r" % {
212 "prefix": _alembic_autogenerate_prefix(autogen_context),
213 "tname": _ident(op.table_name),
214 }
215 if op.schema:
216 text += ", schema=%r" % _ident(op.schema)
217 text += ")"
218 return text
221@renderers.dispatch_for(ops.CreateIndexOp)
222def _add_index(autogen_context, op):
223 index = op.to_index()
225 has_batch = autogen_context._has_batch
227 if has_batch:
228 tmpl = (
229 "%(prefix)screate_index(%(name)r, [%(columns)s], "
230 "unique=%(unique)r%(kwargs)s)"
231 )
232 else:
233 tmpl = (
234 "%(prefix)screate_index(%(name)r, %(table)r, [%(columns)s], "
235 "unique=%(unique)r%(schema)s%(kwargs)s)"
236 )
238 text = tmpl % {
239 "prefix": _alembic_autogenerate_prefix(autogen_context),
240 "name": _render_gen_name(autogen_context, index.name),
241 "table": _ident(index.table.name),
242 "columns": ", ".join(
243 _get_index_rendered_expressions(index, autogen_context)
244 ),
245 "unique": index.unique or False,
246 "schema": (", schema=%r" % _ident(index.table.schema))
247 if index.table.schema
248 else "",
249 "kwargs": (
250 ", "
251 + ", ".join(
252 [
253 "%s=%s"
254 % (key, _render_potential_expr(val, autogen_context))
255 for key, val in index.kwargs.items()
256 ]
257 )
258 )
259 if len(index.kwargs)
260 else "",
261 }
262 return text
265@renderers.dispatch_for(ops.DropIndexOp)
266def _drop_index(autogen_context, op):
267 has_batch = autogen_context._has_batch
269 if has_batch:
270 tmpl = "%(prefix)sdrop_index(%(name)r)"
271 else:
272 tmpl = (
273 "%(prefix)sdrop_index(%(name)r, "
274 "table_name=%(table_name)r%(schema)s)"
275 )
277 text = tmpl % {
278 "prefix": _alembic_autogenerate_prefix(autogen_context),
279 "name": _render_gen_name(autogen_context, op.index_name),
280 "table_name": _ident(op.table_name),
281 "schema": ((", schema=%r" % _ident(op.schema)) if op.schema else ""),
282 }
283 return text
286@renderers.dispatch_for(ops.CreateUniqueConstraintOp)
287def _add_unique_constraint(autogen_context, op):
288 return [_uq_constraint(op.to_constraint(), autogen_context, True)]
291@renderers.dispatch_for(ops.CreateForeignKeyOp)
292def _add_fk_constraint(autogen_context, op):
294 args = [repr(_render_gen_name(autogen_context, op.constraint_name))]
295 if not autogen_context._has_batch:
296 args.append(repr(_ident(op.source_table)))
298 args.extend(
299 [
300 repr(_ident(op.referent_table)),
301 repr([_ident(col) for col in op.local_cols]),
302 repr([_ident(col) for col in op.remote_cols]),
303 ]
304 )
306 kwargs = [
307 "referent_schema",
308 "onupdate",
309 "ondelete",
310 "initially",
311 "deferrable",
312 "use_alter",
313 ]
314 if not autogen_context._has_batch:
315 kwargs.insert(0, "source_schema")
317 for k in kwargs:
318 if k in op.kw:
319 value = op.kw[k]
320 if value is not None:
321 args.append("%s=%r" % (k, value))
323 return "%(prefix)screate_foreign_key(%(args)s)" % {
324 "prefix": _alembic_autogenerate_prefix(autogen_context),
325 "args": ", ".join(args),
326 }
329@renderers.dispatch_for(ops.CreatePrimaryKeyOp)
330def _add_pk_constraint(constraint, autogen_context):
331 raise NotImplementedError()
334@renderers.dispatch_for(ops.CreateCheckConstraintOp)
335def _add_check_constraint(constraint, autogen_context):
336 raise NotImplementedError()
339@renderers.dispatch_for(ops.DropConstraintOp)
340def _drop_constraint(autogen_context, op):
342 if autogen_context._has_batch:
343 template = "%(prefix)sdrop_constraint" "(%(name)r, type_=%(type)r)"
344 else:
345 template = (
346 "%(prefix)sdrop_constraint"
347 "(%(name)r, '%(table_name)s'%(schema)s, type_=%(type)r)"
348 )
350 text = template % {
351 "prefix": _alembic_autogenerate_prefix(autogen_context),
352 "name": _render_gen_name(autogen_context, op.constraint_name),
353 "table_name": _ident(op.table_name),
354 "type": op.constraint_type,
355 "schema": (", schema=%r" % _ident(op.schema)) if op.schema else "",
356 }
357 return text
360@renderers.dispatch_for(ops.AddColumnOp)
361def _add_column(autogen_context, op):
363 schema, tname, column = op.schema, op.table_name, op.column
364 if autogen_context._has_batch:
365 template = "%(prefix)sadd_column(%(column)s)"
366 else:
367 template = "%(prefix)sadd_column(%(tname)r, %(column)s"
368 if schema:
369 template += ", schema=%(schema)r"
370 template += ")"
371 text = template % {
372 "prefix": _alembic_autogenerate_prefix(autogen_context),
373 "tname": tname,
374 "column": _render_column(column, autogen_context),
375 "schema": schema,
376 }
377 return text
380@renderers.dispatch_for(ops.DropColumnOp)
381def _drop_column(autogen_context, op):
383 schema, tname, column_name = op.schema, op.table_name, op.column_name
385 if autogen_context._has_batch:
386 template = "%(prefix)sdrop_column(%(cname)r)"
387 else:
388 template = "%(prefix)sdrop_column(%(tname)r, %(cname)r"
389 if schema:
390 template += ", schema=%(schema)r"
391 template += ")"
393 text = template % {
394 "prefix": _alembic_autogenerate_prefix(autogen_context),
395 "tname": _ident(tname),
396 "cname": _ident(column_name),
397 "schema": _ident(schema),
398 }
399 return text
402@renderers.dispatch_for(ops.AlterColumnOp)
403def _alter_column(autogen_context, op):
405 tname = op.table_name
406 cname = op.column_name
407 server_default = op.modify_server_default
408 type_ = op.modify_type
409 nullable = op.modify_nullable
410 comment = op.modify_comment
411 autoincrement = op.kw.get("autoincrement", None)
412 existing_type = op.existing_type
413 existing_nullable = op.existing_nullable
414 existing_comment = op.existing_comment
415 existing_server_default = op.existing_server_default
416 schema = op.schema
418 indent = " " * 11
420 if autogen_context._has_batch:
421 template = "%(prefix)salter_column(%(cname)r"
422 else:
423 template = "%(prefix)salter_column(%(tname)r, %(cname)r"
425 text = template % {
426 "prefix": _alembic_autogenerate_prefix(autogen_context),
427 "tname": tname,
428 "cname": cname,
429 }
430 if existing_type is not None:
431 text += ",\n%sexisting_type=%s" % (
432 indent,
433 _repr_type(existing_type, autogen_context),
434 )
435 if server_default is not False:
436 rendered = _render_server_default(server_default, autogen_context)
437 text += ",\n%sserver_default=%s" % (indent, rendered)
439 if type_ is not None:
440 text += ",\n%stype_=%s" % (indent, _repr_type(type_, autogen_context))
441 if nullable is not None:
442 text += ",\n%snullable=%r" % (indent, nullable)
443 if comment is not False:
444 text += ",\n%scomment=%r" % (indent, comment)
445 if existing_comment is not None:
446 text += ",\n%sexisting_comment=%r" % (indent, existing_comment)
447 if nullable is None and existing_nullable is not None:
448 text += ",\n%sexisting_nullable=%r" % (indent, existing_nullable)
449 if autoincrement is not None:
450 text += ",\n%sautoincrement=%r" % (indent, autoincrement)
451 if server_default is False and existing_server_default:
452 rendered = _render_server_default(
453 existing_server_default, autogen_context
454 )
455 text += ",\n%sexisting_server_default=%s" % (indent, rendered)
456 if schema and not autogen_context._has_batch:
457 text += ",\n%sschema=%r" % (indent, schema)
458 text += ")"
459 return text
462class _f_name(object):
463 def __init__(self, prefix, name):
464 self.prefix = prefix
465 self.name = name
467 def __repr__(self):
468 return "%sf(%r)" % (self.prefix, _ident(self.name))
471def _ident(name):
472 """produce a __repr__() object for a string identifier that may
473 use quoted_name() in SQLAlchemy 0.9 and greater.
475 The issue worked around here is that quoted_name() doesn't have
476 very good repr() behavior by itself when unicode is involved.
478 """
479 if name is None:
480 return name
481 elif isinstance(name, sql.elements.quoted_name):
482 if compat.py2k:
483 # the attempt to encode to ascii here isn't super ideal,
484 # however we are trying to cut down on an explosion of
485 # u'' literals only when py2k + SQLA 0.9, in particular
486 # makes unit tests testing code generation very difficult
487 try:
488 return name.encode("ascii")
489 except UnicodeError:
490 return compat.text_type(name)
491 else:
492 return compat.text_type(name)
493 elif isinstance(name, compat.string_types):
494 return name
497def _render_potential_expr(
498 value, autogen_context, wrap_in_text=True, is_server_default=False
499):
500 if isinstance(value, sql.ClauseElement):
502 if wrap_in_text:
503 template = "%(prefix)stext(%(sql)r)"
504 else:
505 template = "%(sql)r"
507 return template % {
508 "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
509 "sql": autogen_context.migration_context.impl.render_ddl_sql_expr(
510 value, is_server_default=is_server_default
511 ),
512 }
514 else:
515 return repr(value)
518def _get_index_rendered_expressions(idx, autogen_context):
519 return [
520 repr(_ident(getattr(exp, "name", None)))
521 if isinstance(exp, sa_schema.Column)
522 else _render_potential_expr(exp, autogen_context)
523 for exp in idx.expressions
524 ]
527def _uq_constraint(constraint, autogen_context, alter):
528 opts = []
530 has_batch = autogen_context._has_batch
532 if constraint.deferrable:
533 opts.append(("deferrable", str(constraint.deferrable)))
534 if constraint.initially:
535 opts.append(("initially", str(constraint.initially)))
536 if not has_batch and alter and constraint.table.schema:
537 opts.append(("schema", _ident(constraint.table.schema)))
538 if not alter and constraint.name:
539 opts.append(
540 ("name", _render_gen_name(autogen_context, constraint.name))
541 )
543 if alter:
544 args = [repr(_render_gen_name(autogen_context, constraint.name))]
545 if not has_batch:
546 args += [repr(_ident(constraint.table.name))]
547 args.append(repr([_ident(col.name) for col in constraint.columns]))
548 args.extend(["%s=%r" % (k, v) for k, v in opts])
549 return "%(prefix)screate_unique_constraint(%(args)s)" % {
550 "prefix": _alembic_autogenerate_prefix(autogen_context),
551 "args": ", ".join(args),
552 }
553 else:
554 args = [repr(_ident(col.name)) for col in constraint.columns]
555 args.extend(["%s=%r" % (k, v) for k, v in opts])
556 return "%(prefix)sUniqueConstraint(%(args)s)" % {
557 "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
558 "args": ", ".join(args),
559 }
562def _user_autogenerate_prefix(autogen_context, target):
563 prefix = autogen_context.opts["user_module_prefix"]
564 if prefix is None:
565 return "%s." % target.__module__
566 else:
567 return prefix
570def _sqlalchemy_autogenerate_prefix(autogen_context):
571 return autogen_context.opts["sqlalchemy_module_prefix"] or ""
574def _alembic_autogenerate_prefix(autogen_context):
575 if autogen_context._has_batch:
576 return "batch_op."
577 else:
578 return autogen_context.opts["alembic_module_prefix"] or ""
581def _user_defined_render(type_, object_, autogen_context):
582 if "render_item" in autogen_context.opts:
583 render = autogen_context.opts["render_item"]
584 if render:
585 rendered = render(type_, object_, autogen_context)
586 if rendered is not False:
587 return rendered
588 return False
591def _render_column(column, autogen_context):
592 rendered = _user_defined_render("column", column, autogen_context)
593 if rendered is not False:
594 return rendered
596 args = []
597 opts = []
599 if column.server_default:
600 if sqla_compat._server_default_is_computed(column):
601 rendered = _render_computed(column.computed, autogen_context)
602 if rendered:
603 args.append(rendered)
604 else:
605 rendered = _render_server_default(
606 column.server_default, autogen_context
607 )
608 if rendered:
609 opts.append(("server_default", rendered))
611 if (
612 column.autoincrement is not None
613 and column.autoincrement != sqla_compat.AUTOINCREMENT_DEFAULT
614 ):
615 opts.append(("autoincrement", column.autoincrement))
617 if column.nullable is not None:
618 opts.append(("nullable", column.nullable))
620 if column.system:
621 opts.append(("system", column.system))
623 comment = sqla_compat._comment_attribute(column)
624 if comment:
625 opts.append(("comment", "%r" % comment))
627 # TODO: for non-ascii colname, assign a "key"
628 return "%(prefix)sColumn(%(name)r, %(type)s, %(args)s%(kwargs)s)" % {
629 "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
630 "name": _ident(column.name),
631 "type": _repr_type(column.type, autogen_context),
632 "args": ", ".join([str(arg) for arg in args]) + ", " if args else "",
633 "kwargs": (
634 ", ".join(
635 ["%s=%s" % (kwname, val) for kwname, val in opts]
636 + [
637 "%s=%s"
638 % (key, _render_potential_expr(val, autogen_context))
639 for key, val in sqla_compat._column_kwargs(column).items()
640 ]
641 )
642 ),
643 }
646def _render_server_default(default, autogen_context, repr_=True):
647 rendered = _user_defined_render("server_default", default, autogen_context)
648 if rendered is not False:
649 return rendered
651 if sqla_compat.has_computed and isinstance(default, sa_schema.Computed):
652 return _render_computed(default, autogen_context)
653 elif isinstance(default, sa_schema.DefaultClause):
654 if isinstance(default.arg, compat.string_types):
655 default = default.arg
656 else:
657 return _render_potential_expr(
658 default.arg, autogen_context, is_server_default=True
659 )
661 if isinstance(default, string_types) and repr_:
662 default = repr(re.sub(r"^'|'$", "", default))
664 return default
667def _render_computed(computed, autogen_context):
668 text = _render_potential_expr(
669 computed.sqltext, autogen_context, wrap_in_text=False
670 )
672 kwargs = {}
673 if computed.persisted is not None:
674 kwargs["persisted"] = computed.persisted
675 return "%(prefix)sComputed(%(text)s, %(kwargs)s)" % {
676 "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
677 "text": text,
678 "kwargs": (", ".join("%s=%s" % pair for pair in kwargs.items())),
679 }
682def _repr_type(type_, autogen_context):
683 rendered = _user_defined_render("type", type_, autogen_context)
684 if rendered is not False:
685 return rendered
687 if hasattr(autogen_context.migration_context, "impl"):
688 impl_rt = autogen_context.migration_context.impl.render_type(
689 type_, autogen_context
690 )
691 else:
692 impl_rt = None
694 mod = type(type_).__module__
695 imports = autogen_context.imports
696 if mod.startswith("sqlalchemy.dialects"):
697 dname = re.match(r"sqlalchemy\.dialects\.(\w+)", mod).group(1)
698 if imports is not None:
699 imports.add("from sqlalchemy.dialects import %s" % dname)
700 if impl_rt:
701 return impl_rt
702 else:
703 return "%s.%r" % (dname, type_)
704 elif impl_rt:
705 return impl_rt
706 elif mod.startswith("sqlalchemy."):
707 if type(type_) is sqltypes.Variant:
708 return _render_Variant_type(type_, autogen_context)
709 if "_render_%s_type" % type_.__visit_name__ in globals():
710 fn = globals()["_render_%s_type" % type_.__visit_name__]
711 return fn(type_, autogen_context)
712 else:
713 prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
714 return "%s%r" % (prefix, type_)
715 else:
716 prefix = _user_autogenerate_prefix(autogen_context, type_)
717 return "%s%r" % (prefix, type_)
720def _render_ARRAY_type(type_, autogen_context):
721 return _render_type_w_subtype(
722 type_, autogen_context, "item_type", r"(.+?\()"
723 )
726def _render_Variant_type(type_, autogen_context):
727 base = _repr_type(type_.impl, autogen_context)
728 for dialect in sorted(type_.mapping):
729 typ = type_.mapping[dialect]
730 base += ".with_variant(%s, %r)" % (
731 _repr_type(typ, autogen_context),
732 dialect,
733 )
734 return base
737def _render_type_w_subtype(
738 type_, autogen_context, attrname, regexp, prefix=None
739):
740 outer_repr = repr(type_)
741 inner_type = getattr(type_, attrname, None)
742 if inner_type is None:
743 return False
745 inner_repr = repr(inner_type)
747 inner_repr = re.sub(r"([\(\)])", r"\\\1", inner_repr)
748 sub_type = _repr_type(getattr(type_, attrname), autogen_context)
749 outer_type = re.sub(regexp + inner_repr, r"\1%s" % sub_type, outer_repr)
751 if prefix:
752 return "%s%s" % (prefix, outer_type)
754 mod = type(type_).__module__
755 if mod.startswith("sqlalchemy.dialects"):
756 dname = re.match(r"sqlalchemy\.dialects\.(\w+)", mod).group(1)
757 return "%s.%s" % (dname, outer_type)
758 elif mod.startswith("sqlalchemy"):
759 prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
760 return "%s%s" % (prefix, outer_type)
761 else:
762 return None
765_constraint_renderers = util.Dispatcher()
768def _render_constraint(constraint, autogen_context):
769 try:
770 renderer = _constraint_renderers.dispatch(constraint)
771 except ValueError:
772 util.warn("No renderer is established for object %r" % constraint)
773 return "[Unknown Python object %r]" % constraint
774 else:
775 return renderer(constraint, autogen_context)
778@_constraint_renderers.dispatch_for(sa_schema.PrimaryKeyConstraint)
779def _render_primary_key(constraint, autogen_context):
780 rendered = _user_defined_render("primary_key", constraint, autogen_context)
781 if rendered is not False:
782 return rendered
784 if not constraint.columns:
785 return None
787 opts = []
788 if constraint.name:
789 opts.append(
790 ("name", repr(_render_gen_name(autogen_context, constraint.name)))
791 )
792 return "%(prefix)sPrimaryKeyConstraint(%(args)s)" % {
793 "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
794 "args": ", ".join(
795 [repr(c.name) for c in constraint.columns]
796 + ["%s=%s" % (kwname, val) for kwname, val in opts]
797 ),
798 }
801def _fk_colspec(fk, metadata_schema):
802 """Implement a 'safe' version of ForeignKey._get_colspec() that
803 won't fail if the remote table can't be resolved.
805 """
806 colspec = fk._get_colspec()
807 tokens = colspec.split(".")
808 tname, colname = tokens[-2:]
810 if metadata_schema is not None and len(tokens) == 2:
811 table_fullname = "%s.%s" % (metadata_schema, tname)
812 else:
813 table_fullname = ".".join(tokens[0:-1])
815 if (
816 not fk.link_to_name
817 and fk.parent is not None
818 and fk.parent.table is not None
819 ):
820 # try to resolve the remote table in order to adjust for column.key.
821 # the FK constraint needs to be rendered in terms of the column
822 # name.
823 parent_metadata = fk.parent.table.metadata
824 if table_fullname in parent_metadata.tables:
825 col = parent_metadata.tables[table_fullname].c.get(colname)
826 if col is not None:
827 colname = _ident(col.name)
829 colspec = "%s.%s" % (table_fullname, colname)
831 return colspec
834def _populate_render_fk_opts(constraint, opts):
836 if constraint.onupdate:
837 opts.append(("onupdate", repr(constraint.onupdate)))
838 if constraint.ondelete:
839 opts.append(("ondelete", repr(constraint.ondelete)))
840 if constraint.initially:
841 opts.append(("initially", repr(constraint.initially)))
842 if constraint.deferrable:
843 opts.append(("deferrable", repr(constraint.deferrable)))
844 if constraint.use_alter:
845 opts.append(("use_alter", repr(constraint.use_alter)))
848@_constraint_renderers.dispatch_for(sa_schema.ForeignKeyConstraint)
849def _render_foreign_key(constraint, autogen_context):
850 rendered = _user_defined_render("foreign_key", constraint, autogen_context)
851 if rendered is not False:
852 return rendered
854 opts = []
855 if constraint.name:
856 opts.append(
857 ("name", repr(_render_gen_name(autogen_context, constraint.name)))
858 )
860 _populate_render_fk_opts(constraint, opts)
862 apply_metadata_schema = constraint.parent.metadata.schema
863 return (
864 "%(prefix)sForeignKeyConstraint([%(cols)s], "
865 "[%(refcols)s], %(args)s)"
866 % {
867 "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
868 "cols": ", ".join(
869 "%r" % _ident(f.parent.name) for f in constraint.elements
870 ),
871 "refcols": ", ".join(
872 repr(_fk_colspec(f, apply_metadata_schema))
873 for f in constraint.elements
874 ),
875 "args": ", ".join(
876 ["%s=%s" % (kwname, val) for kwname, val in opts]
877 ),
878 }
879 )
882@_constraint_renderers.dispatch_for(sa_schema.UniqueConstraint)
883def _render_unique_constraint(constraint, autogen_context):
884 rendered = _user_defined_render("unique", constraint, autogen_context)
885 if rendered is not False:
886 return rendered
888 return _uq_constraint(constraint, autogen_context, False)
891@_constraint_renderers.dispatch_for(sa_schema.CheckConstraint)
892def _render_check_constraint(constraint, autogen_context):
893 rendered = _user_defined_render("check", constraint, autogen_context)
894 if rendered is not False:
895 return rendered
897 # detect the constraint being part of
898 # a parent type which is probably in the Table already.
899 # ideally SQLAlchemy would give us more of a first class
900 # way to detect this.
901 if (
902 constraint._create_rule
903 and hasattr(constraint._create_rule, "target")
904 and isinstance(constraint._create_rule.target, sqltypes.TypeEngine)
905 ):
906 return None
907 opts = []
908 if constraint.name:
909 opts.append(
910 ("name", repr(_render_gen_name(autogen_context, constraint.name)))
911 )
912 return "%(prefix)sCheckConstraint(%(sqltext)s%(opts)s)" % {
913 "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
914 "opts": ", " + (", ".join("%s=%s" % (k, v) for k, v in opts))
915 if opts
916 else "",
917 "sqltext": _render_potential_expr(
918 constraint.sqltext, autogen_context, wrap_in_text=False
919 ),
920 }
923@renderers.dispatch_for(ops.ExecuteSQLOp)
924def _execute_sql(autogen_context, op):
925 if not isinstance(op.sqltext, string_types):
926 raise NotImplementedError(
927 "Autogenerate rendering of SQL Expression language constructs "
928 "not supported here; please use a plain SQL string"
929 )
930 return "op.execute(%r)" % op.sqltext
933renderers = default_renderers.branch()