Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/alembic/script/base.py : 18%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from contextlib import contextmanager
2import datetime
3import os
4import re
5import shutil
7from dateutil import tz
9from . import revision
10from . import write_hooks
11from .. import util
12from ..runtime import migration
13from ..util import compat
# Filename patterns used by Script._from_filename to decide whether a file in
# a versions directory is a revision script.  Both reject names that start
# with ".#" or "__init__".
#
# Sourceless mode: accepts ".py" as well as ".pyc" / ".pyo"; group(1) is the
# ".py" name and group(2) is the optional "c" / "o" suffix.
_sourceless_rev_file = re.compile(r"(?!\.\#|__init__)(.*\.py)(c|o)?$")
# Normal mode: accepts ".py" files only.
_only_source_rev_file = re.compile(r"(?!\.\#|__init__)(.*\.py)$")
# Fallback used when a script module has no ``revision`` attribute: the whole
# filename must be lowercase hex digits followed by ".py".
_legacy_rev = re.compile(r"([a-f0-9]+)\.py$")
# Matches "upgrade_<id>" / "downgrade_<id>" names.
# NOTE(review): not referenced in the visible part of this file.
_mod_def_re = re.compile(r"(upgrade|downgrade)_([a-z0-9]+)")
# Extracts the word runs of a revision message; ScriptDirectory._rev_path
# joins them with "_" to build the filename slug.
_slug_re = re.compile(r"\w+")
# Default %-style template for revision filenames (without the ".py" suffix).
_default_file_template = "%(rev)s_%(slug)s"
# Splits the "version_locations" config value on a comma followed by optional
# spaces, or on a run of spaces.
_split_on_space_comma = re.compile(r", *|(?: +)")
class ScriptDirectory(object):
    """Provides operations upon an Alembic script directory.

    This object is useful to get information as to current revisions,
    most notably being able to get at the "head" revision, for schemes
    that want to test if the current revision in the database is the most
    recent::

        from alembic.script import ScriptDirectory
        from alembic.config import Config
        config = Config()
        config.set_main_option("script_location", "myapp:migrations")
        script = ScriptDirectory.from_config(config)

        head_revision = script.get_current_head()

    """

    def __init__(
        self,
        dir,  # noqa
        file_template=_default_file_template,
        truncate_slug_length=40,
        version_locations=None,
        sourceless=False,
        output_encoding="utf-8",
        timezone=None,
        hook_config=None,
    ):
        """Set up the directory object and verify that ``dir`` exists.

        :param dir: path of the script directory (the one holding
         ``env.py`` and ``script.py.mako``).
        :param file_template: %-style template for new revision filenames;
         see :meth:`._rev_path` for the available keys.
        :param truncate_slug_length: maximum slug length in generated
         filenames; a falsy value (``None``, ``0``) falls back to 40.
        :param version_locations: optional sequence of version directory
         locations; when empty, ``<dir>/versions`` is used.
        :param sourceless: when true, ``.pyc`` / ``.pyo`` files are also
         considered as revision files.
        :param output_encoding: encoding handed to the template renderer.
        :param timezone: optional timezone name used for the create date of
         new revisions.
        :param hook_config: optional post-write hook configuration, handed
         to ``write_hooks._run_hooks`` after a revision file is written.
        :raises util.CommandError: if ``dir`` does not exist.
        """
        self.dir = dir
        self.file_template = file_template
        self.version_locations = version_locations
        self.truncate_slug_length = truncate_slug_length or 40
        self.sourceless = sourceless
        self.output_encoding = output_encoding
        # The map receives the loader callable, not the loaded revisions.
        self.revision_map = revision.RevisionMap(self._load_revisions)
        self.timezone = timezone
        self.hook_config = hook_config

        if not os.access(dir, os.F_OK):
            raise util.CommandError(
                "Path doesn't exist: %r. Please use "
                "the 'init' command to create a new "
                "scripts folder." % os.path.abspath(dir)
            )

    @property
    def versions(self):
        """Return the single version directory path.

        :raises util.CommandError: if more than one version location is
         configured.
        """
        loc = self._version_locations
        if len(loc) > 1:
            raise util.CommandError("Multiple version_locations present")
        else:
            return loc[0]

    @util.memoized_property
    def _version_locations(self):
        """Absolute paths of all version directories.

        A list built from ``version_locations`` when that is set, otherwise
        a one-element tuple holding ``<dir>/versions``.
        """
        if self.version_locations:
            return [
                os.path.abspath(util.coerce_resource_to_filename(location))
                for location in self.version_locations
            ]
        else:
            return (os.path.abspath(os.path.join(self.dir, "versions")),)

    def _load_revisions(self):
        """Yield a :class:`.Script` for every revision file found.

        Explicitly configured version locations that do not exist are
        skipped.  A file reachable through more than one location (compared
        by real path) is loaded once; later occurrences only emit a warning.
        """
        if self.version_locations:
            paths = [
                vers
                for vers in self._version_locations
                if os.path.exists(vers)
            ]
        else:
            paths = [self.versions]

        seen = set()
        for vers in paths:
            for file_ in Script._list_py_dir(self, vers):
                path = os.path.realpath(os.path.join(vers, file_))
                if path in seen:
                    util.warn(
                        "File %s loaded twice! ignoring. Please ensure "
                        "version_locations is unique." % path
                    )
                    continue
                seen.add(path)
                script = Script._from_filename(self, vers, file_)
                # None means the file is not a revision script.
                if script is None:
                    continue
                yield script

    @classmethod
    def from_config(cls, config):
        """Produce a new :class:`.ScriptDirectory` given a :class:`.Config`
        instance.

        The :class:`.Config` need only have the ``script_location`` key
        present.

        :raises util.CommandError: if ``script_location`` is not configured.
        """
        script_location = config.get_main_option("script_location")
        if script_location is None:
            raise util.CommandError(
                "No 'script_location' key " "found in configuration."
            )
        truncate_slug_length = config.get_main_option("truncate_slug_length")
        if truncate_slug_length is not None:
            truncate_slug_length = int(truncate_slug_length)

        version_locations = config.get_main_option("version_locations")
        if version_locations:
            version_locations = _split_on_space_comma.split(version_locations)

        return ScriptDirectory(
            util.coerce_resource_to_filename(script_location),
            file_template=config.get_main_option(
                "file_template", _default_file_template
            ),
            truncate_slug_length=truncate_slug_length,
            # Only the exact string "true" enables sourceless mode.
            sourceless=config.get_main_option("sourceless") == "true",
            output_encoding=config.get_main_option("output_encoding", "utf-8"),
            version_locations=version_locations,
            timezone=config.get_main_option("timezone"),
            hook_config=config.get_section("post_write_hooks", {}),
        )

    @contextmanager
    def _catch_revision_errors(
        self,
        ancestor=None,
        multiple_heads=None,
        start=None,
        end=None,
        resolution=None,
    ):
        """Context manager turning revision errors into ``CommandError``.

        :param ancestor: optional %-template (keys ``start``, ``end``) used
         for :class:`.RangeNotAncestorError`.
        :param multiple_heads: optional %-template (keys ``head_arg``,
         ``heads``) used for :class:`.MultipleHeads`.
        :param start: lower revision for messages; defaults to the one
         carried by the exception.
        :param end: upper revision for messages; defaults to the one carried
         by the exception.
        :param resolution: optional complete message used for
         :class:`.ResolutionError`.
        """
        try:
            yield
        except revision.RangeNotAncestorError as rna:
            if start is None:
                start = rna.lower
            if end is None:
                end = rna.upper
            if not ancestor:
                ancestor = (
                    "Requested range %(start)s:%(end)s does not refer to "
                    "ancestor/descendant revisions along the same branch"
                )
            ancestor = ancestor % {"start": start, "end": end}
            compat.raise_from_cause(util.CommandError(ancestor))
        except revision.MultipleHeads as mh:
            if not multiple_heads:
                multiple_heads = (
                    "Multiple head revisions are present for given "
                    "argument '%(head_arg)s'; please "
                    "specify a specific target revision, "
                    "'<branchname>@%(head_arg)s' to "
                    "narrow to a specific head, or 'heads' for all heads"
                )
            multiple_heads = multiple_heads % {
                "head_arg": end or mh.argument,
                "heads": util.format_as_comma(mh.heads),
            }
            compat.raise_from_cause(util.CommandError(multiple_heads))
        # Bound as "resolution_err" rather than "re" so the imported ``re``
        # module is not shadowed inside this function.
        except revision.ResolutionError as resolution_err:
            if resolution is None:
                resolution = "Can't locate revision identified by '%s'" % (
                    resolution_err.argument
                )
            compat.raise_from_cause(util.CommandError(resolution))
        except revision.RevisionError as err:
            compat.raise_from_cause(util.CommandError(err.args[0]))

    def walk_revisions(self, base="base", head="heads"):
        """Iterate through all revisions.

        :param base: the base revision, or "base" to start from the
         empty revision.

        :param head: the head revision; defaults to "heads" to indicate
         all head revisions.  May also be "head" to indicate a single
         head revision.

         .. versionchanged:: 0.7.0 the "head" identifier now refers to
            the head of a non-branched repository only; use "heads" to
            refer to the set of all head branches simultaneously.

        """
        with self._catch_revision_errors(start=base, end=head):
            for rev in self.revision_map.iterate_revisions(
                head, base, inclusive=True, assert_relative_length=False
            ):
                yield rev

    def get_revisions(self, id_):
        """Return the :class:`.Script` instance with the given rev identifier,
        symbolic name, or sequence of identifiers.

        .. versionadded:: 0.7.0

        """
        with self._catch_revision_errors():
            return self.revision_map.get_revisions(id_)

    def get_all_current(self, id_):
        """Return the branch heads among ``id_`` and its ancestors.

        The revisions for ``id_`` are combined with their ancestor nodes
        (dependencies included) and the result is reduced with
        ``revision_map._filter_into_branch_heads``.
        """
        with self._catch_revision_errors():
            top_revs = set(self.revision_map.get_revisions(id_))
            top_revs.update(
                self.revision_map._get_ancestor_nodes(
                    list(top_revs), include_dependencies=True
                )
            )
            top_revs = self.revision_map._filter_into_branch_heads(top_revs)
            return top_revs

    def get_revision(self, id_):
        """Return the :class:`.Script` instance with the given rev id.

        .. seealso::

            :meth:`.ScriptDirectory.get_revisions`

        """

        with self._catch_revision_errors():
            return self.revision_map.get_revision(id_)

    def as_revision_number(self, id_):
        """Convert a symbolic revision, i.e. 'head' or 'base', into
        an actual revision number.

        :return: ``None`` when nothing resolves, the full resolved sequence
         when ``id_`` is ``"heads"``, otherwise its first element.
        """

        with self._catch_revision_errors():
            rev, branch_name = self.revision_map._resolve_revision_number(id_)

        if not rev:
            # convert () to None
            return None
        elif id_ == "heads":
            return rev
        else:
            return rev[0]

    def iterate_revisions(self, upper, lower):
        """Iterate through script revisions, starting at the given
        upper revision identifier and ending at the lower.

        The traversal uses strictly the `down_revision`
        marker inside each migration script, so
        it is a requirement that upper >= lower,
        else you'll get nothing back.

        The iterator yields :class:`.Script` objects.

        .. seealso::

            :meth:`.RevisionMap.iterate_revisions`

        """
        return self.revision_map.iterate_revisions(upper, lower)

    def get_current_head(self):
        """Return the current head revision.

        If the script directory has multiple heads
        due to branching, an error is raised;
        :meth:`.ScriptDirectory.get_heads` should be
        preferred.

        :return: a string revision number.

        .. seealso::

            :meth:`.ScriptDirectory.get_heads`

        """
        with self._catch_revision_errors(
            multiple_heads=(
                # Trailing space added: the two sentences used to run
                # together as "...branching).Please use...".
                "The script directory has multiple heads (due to branching). "
                "Please use get_heads(), or merge the branches using "
                "alembic merge."
            )
        ):
            return self.revision_map.get_current_head()

    def get_heads(self):
        """Return all "versioned head" revisions as strings.

        This is normally a list of length one,
        unless branches are present.  The
        :meth:`.ScriptDirectory.get_current_head()` method
        can be used normally when a script directory
        has only one head.

        :return: a list of string revision numbers.
        """
        return list(self.revision_map.heads)

    def get_base(self):
        """Return the "base" revision as a string.

        This is the revision number of the script that
        has a ``down_revision`` of None.

        If the script directory has multiple bases, an error is raised;
        :meth:`.ScriptDirectory.get_bases` should be
        preferred.

        :return: the base revision, or ``None`` if there is none.
        """
        bases = self.get_bases()
        if len(bases) > 1:
            raise util.CommandError(
                "The script directory has multiple bases. "
                "Please use get_bases()."
            )
        elif bases:
            return bases[0]
        else:
            return None

    def get_bases(self):
        """return all "base" revisions as strings.

        This is the revision number of all scripts that
        have a ``down_revision`` of None.

        .. versionadded:: 0.7.0

        """
        return list(self.revision_map.bases)

    def _upgrade_revs(self, destination, current_rev):
        """Return the upgrade steps from ``current_rev`` to ``destination``.

        The revision map iterates from the destination downwards, so the
        result is reversed to give the steps in application order.
        """
        with self._catch_revision_errors(
            ancestor="Destination %(end)s is not a valid upgrade "
            "target from current head(s)",
            end=destination,
        ):
            # Materialise once, inside the error handler, so lookup errors
            # raised during iteration are translated.
            revs = list(
                self.revision_map.iterate_revisions(
                    destination, current_rev, implicit_base=True
                )
            )
            return [
                migration.MigrationStep.upgrade_from_script(
                    self.revision_map, script
                )
                for script in reversed(revs)
            ]

    def _downgrade_revs(self, destination, current_rev):
        """Return the downgrade steps from ``current_rev`` to
        ``destination``, in iteration order of the revision map."""
        with self._catch_revision_errors(
            ancestor="Destination %(end)s is not a valid downgrade "
            "target from current head(s)",
            end=destination,
        ):
            revs = self.revision_map.iterate_revisions(
                current_rev, destination, select_for_downgrade=True
            )
            return [
                migration.MigrationStep.downgrade_from_script(
                    self.revision_map, script
                )
                for script in revs
            ]

    def _stamp_revs(self, revision, heads):
        """Return the :class:`.StampStep` objects needed to stamp
        ``revision`` given the current ``heads``.

        :param revision: target revision identifier(s); a falsy value is
         treated as ``"base"``.
        :param heads: identifiers of the currently stamped heads.
        """
        with self._catch_revision_errors(
            multiple_heads="Multiple heads are present; please specify a "
            "single target revision"
        ):

            heads = self.get_revisions(heads)

            steps = []

            if not revision:
                revision = "base"

            # Keep only the heads that share a lineage with a target.
            filtered_heads = []
            for rev in util.to_tuple(revision):
                if rev:
                    filtered_heads.extend(
                        self.revision_map.filter_for_lineage(
                            heads, rev, include_dependencies=True
                        )
                    )
            filtered_heads = util.unique_list(filtered_heads)

            dests = self.get_revisions(revision) or [None]

            for dest in dests:

                if dest is None:
                    # dest is 'base'.  Return a "delete branch" migration
                    # for all applicable heads.
                    steps.extend(
                        [
                            migration.StampStep(
                                head.revision,
                                None,
                                False,
                                True,
                                self.revision_map,
                            )
                            for head in filtered_heads
                        ]
                    )
                    continue
                elif dest in filtered_heads:
                    # the dest is already in the version table, do nothing.
                    continue

                # figure out if the dest is a descendant or an
                # ancestor of the selected nodes
                descendants = set(
                    self.revision_map._get_descendant_nodes([dest])
                )
                ancestors = set(self.revision_map._get_ancestor_nodes([dest]))

                if descendants.intersection(filtered_heads):
                    # heads are above the target, so this is a downgrade.
                    # we can treat them as a "merge", single step.
                    assert not ancestors.intersection(filtered_heads)
                    todo_heads = [head.revision for head in filtered_heads]
                    step = migration.StampStep(
                        todo_heads,
                        dest.revision,
                        False,
                        False,
                        self.revision_map,
                    )
                    steps.append(step)
                    continue
                elif ancestors.intersection(filtered_heads):
                    # heads are below the target, so this is an upgrade.
                    # we can treat them as a "merge", single step.
                    todo_heads = [head.revision for head in filtered_heads]
                    step = migration.StampStep(
                        todo_heads,
                        dest.revision,
                        True,
                        False,
                        self.revision_map,
                    )
                    steps.append(step)
                    continue
                else:
                    # destination is in a branch not represented,
                    # treat it as new branch
                    step = migration.StampStep(
                        (), dest.revision, True, True, self.revision_map
                    )
                    steps.append(step)
                    continue

            return steps

    def run_env(self):
        """Run the script environment.

        This basically runs the ``env.py`` script present
        in the migration environment.   It is called exclusively
        by the command functions in :mod:`alembic.command`.


        """
        util.load_python_file(self.dir, "env.py")

    @property
    def env_py_location(self):
        """Absolute path of the ``env.py`` file in the script directory."""
        return os.path.abspath(os.path.join(self.dir, "env.py"))

    def _generate_template(self, src, dest, **kw):
        """Render template ``src`` to ``dest`` using ``output_encoding``,
        passing ``kw`` through as template arguments."""
        util.status(
            "Generating %s" % os.path.abspath(dest),
            util.template_to_file,
            src,
            dest,
            self.output_encoding,
            **kw
        )

    def _copy_file(self, src, dest):
        """Copy ``src`` to ``dest`` with ``shutil.copy``, reporting status."""
        util.status(
            "Generating %s" % os.path.abspath(dest), shutil.copy, src, dest
        )

    def _ensure_directory(self, path):
        """Create ``path`` (including parents) if it does not exist."""
        path = os.path.abspath(path)
        if not os.path.exists(path):
            util.status("Creating directory %s" % path, os.makedirs, path)

    def _generate_create_date(self):
        """Return the creation timestamp for a new revision.

        With a configured timezone the result is the current UTC time
        converted to that zone (timezone-aware); otherwise it is the naive
        local time.

        :raises util.CommandError: if the timezone name cannot be resolved.
        """
        if self.timezone is not None:
            # First, assume correct capitalization
            tzinfo = tz.gettz(self.timezone)
            if tzinfo is None:
                # Fall back to uppercase
                tzinfo = tz.gettz(self.timezone.upper())
            if tzinfo is None:
                raise util.CommandError(
                    "Can't locate timezone: %s" % self.timezone
                )
            create_date = (
                datetime.datetime.utcnow()
                .replace(tzinfo=tz.tzutc())
                .astimezone(tzinfo)
            )
        else:
            create_date = datetime.datetime.now()
        return create_date

    def generate_revision(
        self,
        revid,
        message,
        head=None,
        refresh=False,
        splice=False,
        branch_labels=None,
        version_path=None,
        depends_on=None,
        **kw
    ):
        """Generate a new revision file.

        This runs the ``script.py.mako`` template, given
        template arguments, and creates a new file.

        :param revid: String revision id.  Typically this
         comes from ``alembic.util.rev_id()``.
        :param message: the revision message, the one passed
         by the -m argument to the ``revision`` command.
        :param head: the head revision to generate against.  Defaults
         to the current "head" if no branches are present, else raises
         an exception.

         .. versionadded:: 0.7.0

        :param splice: if True, allow the "head" version to not be an
         actual head; otherwise, the selected head must be a head
         (e.g. endpoint) revision.
        :param refresh: deprecated.
        :param branch_labels: optional branch label(s) handed to the
         template.
        :param version_path: directory to write the file into; must be one
         of the configured version locations.
        :param depends_on: optional revision identifier(s) or branch labels
         the new revision depends on.
        :return: the :class:`.Script` loaded from the newly written file.
        :raises util.CommandError: on an invalid ``revid``, unresolved or
         duplicate heads, an unknown ``version_path``, a non-head target
         without ``splice``, or a template that drops ``branch_labels``.

        """
        if head is None:
            head = "head"

        try:
            Script.verify_rev_id(revid)
        except revision.RevisionError as err:
            compat.raise_from_cause(util.CommandError(err.args[0]))

        with self._catch_revision_errors(
            multiple_heads=(
                "Multiple heads are present; please specify the head "
                "revision on which the new revision should be based, "
                "or perform a merge."
            )
        ):

            heads = self.revision_map.get_revisions(head)

        if len(set(heads)) != len(heads):
            raise util.CommandError("Duplicate head revisions specified")

        create_date = self._generate_create_date()

        if version_path is None:
            if len(self._version_locations) > 1:
                # Several locations: reuse the directory of the first
                # concrete head; fail if every head is None.
                for head_rev in heads:
                    if head_rev is not None:
                        version_path = os.path.dirname(head_rev.path)
                        break
                else:
                    raise util.CommandError(
                        "Multiple version locations present, "
                        "please specify --version-path"
                    )
            else:
                version_path = self.versions

        # The chosen path must be one of the configured locations.
        norm_path = os.path.normpath(os.path.abspath(version_path))
        for vers_path in self._version_locations:
            if os.path.normpath(vers_path) == norm_path:
                break
        else:
            raise util.CommandError(
                "Path %s is not represented in current "
                "version locations" % version_path
            )

        if self.version_locations:
            self._ensure_directory(version_path)

        path = self._rev_path(version_path, revid, message, create_date)

        if not splice:
            for head_rev in heads:
                if head_rev is not None and not head_rev.is_head:
                    raise util.CommandError(
                        "Revision %s is not a head revision; please specify "
                        "--splice to create a new branch from this revision"
                        % head_rev.revision
                    )

        if depends_on:
            with self._catch_revision_errors():
                # Resolve every identifier first, then normalise.
                resolved = [
                    (self.revision_map.get_revision(dep), dep)
                    for dep in util.to_list(depends_on)
                ]
                depends_on = [
                    dep
                    if dep in rev.branch_labels  # maintain branch labels
                    else rev.revision  # resolve partial revision identifiers
                    for rev, dep in resolved
                ]

        self._generate_template(
            os.path.join(self.dir, "script.py.mako"),
            path,
            up_revision=str(revid),
            down_revision=revision.tuple_rev_as_scalar(
                tuple(h.revision if h is not None else None for h in heads)
            ),
            branch_labels=util.to_tuple(branch_labels),
            depends_on=revision.tuple_rev_as_scalar(depends_on),
            create_date=create_date,
            comma=util.format_as_comma,
            message=message if message is not None else ("empty message"),
            **kw
        )

        post_write_hooks = self.hook_config
        if post_write_hooks:
            write_hooks._run_hooks(path, post_write_hooks)

        try:
            script = Script._from_path(self, path)
        except revision.RevisionError as err:
            compat.raise_from_cause(util.CommandError(err.args[0]))
        # Labels were requested but the rendered file has none.
        if branch_labels and not script.branch_labels:
            raise util.CommandError(
                "Version %s specified branch_labels %s, however the "
                "migration file %s does not have them; have you upgraded "
                "your script.py.mako to include the "
                "'branch_labels' section?"
                % (script.revision, branch_labels, script.path)
            )

        self.revision_map.add_revision(script)
        return script

    def _rev_path(self, path, rev_id, message, create_date):
        """Build the full path of a new revision file.

        The slug is the lower-cased word runs of ``message`` joined by
        ``_``.  An over-long slug is cut at ``truncate_slug_length``, the
        trailing partial word is dropped and ``_`` is appended.  The
        filename is ``file_template`` filled with ``rev``, ``slug``,
        ``year``, ``month``, ``day``, ``hour``, ``minute`` and ``second``,
        plus ``.py``.
        """
        slug = "_".join(_slug_re.findall(message or "")).lower()
        if len(slug) > self.truncate_slug_length:
            slug = slug[: self.truncate_slug_length].rsplit("_", 1)[0] + "_"
        filename = "%s.py" % (
            self.file_template
            % {
                "rev": rev_id,
                "slug": slug,
                "year": create_date.year,
                "month": create_date.month,
                "day": create_date.day,
                "hour": create_date.hour,
                "minute": create_date.minute,
                "second": create_date.second,
            }
        )
        return os.path.join(path, filename)
class Script(revision.Revision):
    """Represent a single revision file in a ``versions/`` directory.

    The :class:`.Script` instance is returned by methods
    such as :meth:`.ScriptDirectory.iterate_revisions`.

    """

    def __init__(self, module, rev_id, path):
        """Wrap a loaded revision module.

        :param module: the loaded script module; must define
         ``down_revision`` and may define ``branch_labels`` and
         ``depends_on``.
        :param rev_id: revision identifier of the script.
        :param path: filesystem path the module was loaded from.
        """
        self.module = module
        self.path = path
        super(Script, self).__init__(
            rev_id,
            module.down_revision,
            branch_labels=util.to_tuple(
                getattr(module, "branch_labels", None), default=()
            ),
            dependencies=util.to_tuple(
                getattr(module, "depends_on", None), default=()
            ),
        )

    module = None
    """The Python module representing the actual script itself."""

    path = None
    """Filesystem path of the script."""

    _db_current_indicator = None
    """Utility variable which when set will cause string output to indicate
    this is a "current" version in some database"""

    @property
    def doc(self):
        """Return the first paragraph of the script's docstring, i.e. the
        part of :attr:`.longdoc` before the first blank line."""

        return re.split("\n\n", self.longdoc)[0]

    @property
    def longdoc(self):
        """Return the full, stripped docstring of the script, or an empty
        string if the module has none."""

        doc = self.module.__doc__
        if not doc:
            return ""
        # Only byte strings can be decoded; a ``str`` docstring has no
        # ``decode`` on Python 3, so it is used as is even when the loader
        # recorded a source encoding.
        if isinstance(doc, bytes) and hasattr(
            self.module, "_alembic_source_encoding"
        ):
            doc = doc.decode(self.module._alembic_source_encoding)
        return doc.strip()

    @property
    def log_entry(self):
        """Return a multi-line description of this revision: id and flags,
        parent(s), dependencies, branches, labels, path and docstring."""
        entry = "Rev: %s%s%s%s%s\n" % (
            self.revision,
            " (head)" if self.is_head else "",
            " (branchpoint)" if self.is_branch_point else "",
            " (mergepoint)" if self.is_merge_point else "",
            " (current)" if self._db_current_indicator else "",
        )
        if self.is_merge_point:
            entry += "Merges: %s\n" % (self._format_down_revision(),)
        else:
            entry += "Parent: %s\n" % (self._format_down_revision(),)

        if self.dependencies:
            entry += "Also depends on: %s\n" % (
                util.format_as_comma(self.dependencies)
            )

        if self.is_branch_point:
            entry += "Branches into: %s\n" % (
                util.format_as_comma(self.nextrev)
            )

        if self.branch_labels:
            entry += "Branch names: %s\n" % (
                util.format_as_comma(self.branch_labels),
            )

        entry += "Path: %s\n" % (self.path,)

        # NOTE(review): the width of the indent literal below is taken as
        # seen in the reviewed text, which may have had runs of spaces
        # collapsed -- confirm against the repository copy.
        entry += "\n%s\n" % (
            "\n".join(" %s" % para for para in self.longdoc.splitlines())
        )
        return entry

    def __str__(self):
        return "%s -> %s%s%s%s, %s" % (
            self._format_down_revision(),
            self.revision,
            " (head)" if self.is_head else "",
            " (branchpoint)" if self.is_branch_point else "",
            " (mergepoint)" if self.is_merge_point else "",
            self.doc,
        )

    def _head_only(
        self,
        include_branches=False,
        include_doc=False,
        include_parents=False,
        tree_indicators=True,
        head_indicators=True,
    ):
        """Return a one-line description of this revision.

        :param include_branches: append the branch labels, if any.
        :param include_doc: append the first docstring paragraph.
        :param include_parents: prefix with the down revision(s) and any
         dependencies.
        :param tree_indicators: append branchpoint / mergepoint markers.
        :param head_indicators: append head / effective head / current
         markers; these are also appended when ``tree_indicators`` is true.
        """
        text = self.revision
        if include_parents:
            if self.dependencies:
                text = "%s (%s) -> %s" % (
                    self._format_down_revision(),
                    util.format_as_comma(self.dependencies),
                    text,
                )
            else:
                text = "%s -> %s" % (self._format_down_revision(), text)
        if include_branches and self.branch_labels:
            text += " (%s)" % util.format_as_comma(self.branch_labels)
        if head_indicators or tree_indicators:
            text += "%s%s%s" % (
                " (head)" if self._is_real_head else "",
                " (effective head)"
                if self.is_head and not self._is_real_head
                else "",
                " (current)" if self._db_current_indicator else "",
            )
        if tree_indicators:
            text += "%s%s" % (
                " (branchpoint)" if self.is_branch_point else "",
                " (mergepoint)" if self.is_merge_point else "",
            )
        if include_doc:
            text += ", %s" % self.doc
        return text

    def cmd_format(
        self,
        verbose,
        include_branches=False,
        include_doc=False,
        include_parents=False,
        tree_indicators=True,
    ):
        """Return :attr:`.log_entry` when ``verbose`` is true, otherwise the
        one-line form built from the remaining flags."""
        if verbose:
            return self.log_entry
        else:
            return self._head_only(
                include_branches, include_doc, include_parents, tree_indicators
            )

    def _format_down_revision(self):
        """Return ``"<base>"`` when there is no down revision, otherwise the
        comma-formatted versioned down revisions."""
        if not self.down_revision:
            return "<base>"
        else:
            return util.format_as_comma(self._versioned_down_revisions)

    @classmethod
    def _from_path(cls, scriptdir, path):
        """Load a script from a full path; see :meth:`._from_filename`."""
        dir_, filename = os.path.split(path)
        return cls._from_filename(scriptdir, dir_, filename)

    @classmethod
    def _list_py_dir(cls, scriptdir, path):
        """List the candidate filenames in version directory ``path``.

        In sourceless mode the files of a ``__pycache__`` subdirectory are
        added, as paths relative to ``path``, unless a file with the same
        leading name component already exists directly in ``path``.
        """
        if scriptdir.sourceless:
            # read files in version path, e.g. pyc or pyo files
            # in the immediate path
            paths = os.listdir(path)

            names = {fname.split(".")[0] for fname in paths}

            # look for __pycache__
            pycache = os.path.join(path, "__pycache__")
            if os.path.exists(pycache):
                # add all files from __pycache__ whose filename is not
                # already in the names we got from the version directory.
                # add as relative paths including __pycache__ token
                paths.extend(
                    os.path.join("__pycache__", pyc)
                    for pyc in os.listdir(pycache)
                    if pyc.split(".")[0] not in names
                )
            return paths
        else:
            return os.listdir(path)

    @classmethod
    def _from_filename(cls, scriptdir, dir_, filename):
        """Load ``filename`` from ``dir_`` as a :class:`.Script`.

        :return: the script, or ``None`` when the name is not a revision
         file or a preferred variant of the same file exists.
        :raises util.CommandError: if the module has no ``revision``
         attribute and the filename is not a legacy ``<hex>.py`` name.
        """
        if scriptdir.sourceless:
            py_match = _sourceless_rev_file.match(filename)
        else:
            py_match = _only_source_rev_file.match(filename)

        if not py_match:
            return None

        py_filename = py_match.group(1)

        if scriptdir.sourceless:
            is_c = py_match.group(2) == "c"
            is_o = py_match.group(2) == "o"
        else:
            is_c = is_o = False

        if is_o or is_c:
            py_exists = os.path.exists(os.path.join(dir_, py_filename))
            pyc_exists = os.path.exists(os.path.join(dir_, py_filename + "c"))

            # prefer .py over .pyc because we'd like to get the
            # source encoding; prefer .pyc over .pyo because we'd like to
            # have the docstrings which a -OO file would not have
            if py_exists or (is_o and pyc_exists):
                return None

        module = util.load_python_file(dir_, filename)

        # The local is named "rev_id" so that the imported ``revision``
        # module is not shadowed inside this method.
        if not hasattr(module, "revision"):
            # attempt to get the revision id from the script name,
            # this for legacy only
            m = _legacy_rev.match(filename)
            if not m:
                raise util.CommandError(
                    "Could not determine revision id from filename %s. "
                    "Be sure the 'revision' variable is "
                    "declared inside the script (please see 'Upgrading "
                    "from Alembic 0.1 to 0.2' in the documentation)."
                    % filename
                )
            else:
                rev_id = m.group(1)
        else:
            rev_id = module.revision
        return Script(module, rev_id, os.path.join(dir_, filename))