Coverage for formkit_ninja / triggers.py: 82%

17 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-23 06:00 +0000

1import pgtrigger 

2 

# Sequence name used as the default ``sequence_name`` of bump_sequence_value().
NODE_CHANGE_ID = "formkitschemanode_change_id"
# Presumably the matching sequence name for node-children changes; it is not
# referenced in the visible part of this module - confirm against callers.
NODE_CHILDREN_CHANGE_ID = "nodechildren_change_id"

5 

6 

def create_sequence(sequence_name: str):
    """
    Return the SQL statement that creates the sequence ``sequence_name``
    unless it already exists.
    The name is interpolated as-is (no quoting or escaping is applied).
    """
    statement = f"CREATE SEQUENCE IF NOT EXISTS {sequence_name};"
    return statement

9 

10 

def drop_sequence(sequence_name: str):
    """
    Return the SQL statement that drops the sequence ``sequence_name``
    if it exists.
    The name is interpolated as-is (no quoting or escaping is applied).
    """
    statement = f"DROP SEQUENCE IF EXISTS {sequence_name};"
    return statement

13 

14 

def create_sequence_migration(sequence_name: str):
    """
    Return a 2-tuple of SQL statements for ``sequence_name``:
    first the "create if not exists" statement, then the
    "drop if exists" statement that undoes it.
    """
    forward_sql = create_sequence(sequence_name)
    reverse_sql = drop_sequence(sequence_name)
    return (forward_sql, reverse_sql)

17 

18 

def update_group_trigger(order_by_field: str, id_field: str = "id"):
    """
    Build the "order_on_update_option" trigger, which renumbers the
    neighbouring rows after a row's "order" column is updated.

    Only rows whose ``order_by_field`` value equals that of the updated row
    are touched, and the updated row itself (matched on ``id_field``) is
    always excluded:

    - when the new "order" is greater than the old one, rows with an
      "order" above the old value and up to the new value are moved
      down by one;
    - otherwise, rows with an "order" from the new value up to, but not
      including, the old value are moved up by one.

    The table is expected to have an "order" column; ``id_field``
    defaults to "id".
    """
    # Prevents infinite recursion: the UPDATE statements issued by the
    # trigger body would otherwise fire this trigger again.
    top_level_only = pgtrigger.Condition("pg_trigger_depth() = 0")

    # NOTE(review): this is an AFTER trigger, so the NULL guard at the top of
    # the SQL body presumably only affects the comparisons that follow it and
    # not the row that was stored - confirm this is the intended behaviour.
    return pgtrigger.Trigger(
        name="order_on_update_option",
        when=pgtrigger.After,
        operation=pgtrigger.Update,
        func=pgtrigger.Func(
            f"""
            -- Do not allow a "null" value
            -- This stops Django from dumbly updating
            -- which can break the trigger
            if NEW."order" IS NULL then
                NEW."order" = OLD."order";
            end if;
            if NEW."order" > OLD."order" then
                update {{meta.db_table}}
                set "order" = "order"- 1
                where "order" <= NEW."order"
                and "order" > OLD."order"
                and "{order_by_field}" = NEW."{order_by_field}"
                and "{id_field}" <> NEW."{id_field}";
            else
                update {{meta.db_table}}
                set "order" = "order"+ 1
                where "order" >= NEW."order"
                and "order" < OLD."order"
                and "{order_by_field}" = NEW."{order_by_field}"
                and "{id_field}" <> NEW."{id_field}";
            end if;
            RETURN NEW;
            """
        ),
        condition=top_level_only,
    )

58 

59 

def insert_group_trigger(order_by_field: str):
    """
    Build the "order_on_insert_option" trigger, which runs before an
    INSERT and sets the new row's "order" to one more than the highest
    "order" among existing rows sharing its ``order_by_field`` value
    (or to 1 when there are no such rows).
    """
    # ``{{meta.db_table}}`` survives the f-string as ``{meta.db_table}``;
    # it is left as a placeholder in the text handed to pgtrigger.Func.
    next_order_sql = (
        f'NEW."order" = (SELECT coalesce(max("order"), 0) + 1 FROM {{meta.db_table}} '
        f'WHERE {{meta.db_table}}."{order_by_field}" = NEW."{order_by_field}"); RETURN NEW;'
    )
    return pgtrigger.Trigger(
        name="order_on_insert_option",
        when=pgtrigger.Before,
        operation=pgtrigger.Insert,
        func=pgtrigger.Func(next_order_sql),
    )

70 

71 

def update_or_insert_group_trigger(order_by_field: str, id_field: str = "id"):
    """
    Return both ordering triggers for a model as a list:
    the update trigger first, then the insert trigger.
    ``id_field`` is only used by the update trigger.
    """
    on_update = update_group_trigger(order_by_field, id_field)
    on_insert = insert_group_trigger(order_by_field)
    return [on_update, on_insert]

74 

75 

def bump_sequence_value(value_field: str = "track_change", sequence_name: str = NODE_CHANGE_ID):
    """
    Build the "version_on_update" trigger, which runs before every
    INSERT or UPDATE and stores the next value of the sequence
    ``sequence_name`` in the column ``value_field``.
    Intended to track the latest changes across a model / multiple models
    to make syncing easier.

    The sequence is not created here: remember to add the matching
    "CREATE SEQUENCE" code to a migration, something like

    migrations.RunSQL(
        '''
        CREATE SEQUENCE IF NOT EXISTS ida_options_version_seq;
        ''',
        '''
        DROP SEQUENCE IF EXISTS ida_options_version_seq;
        ''',
    )
    """
    assign_next_value = f"""NEW."{value_field}" = nextval('{sequence_name}'); RETURN NEW;"""
    return pgtrigger.Trigger(
        name="version_on_update",
        when=pgtrigger.Before,
        operation=pgtrigger.Update | pgtrigger.Insert,
        func=assign_next_value,
    )