Skip to content

VPN APP API Package

nautobot_app_vpn.api

Initialize the Nautobot VPN plugin API package.

IKECryptoSerializer

Bases: BaseModelSerializer

Serializer for IKECrypto objects.

Source code in nautobot_app_vpn/api/serializers.py
class IKECryptoSerializer(BaseModelSerializer):
    """Serializer for IKECrypto objects."""

    url = serializers.HyperlinkedIdentityField(view_name="plugins-api:nautobot_app_vpn-api:ikecrypto-detail")
    tenant_group = VPNNestedTenantGroupSerializer(read_only=True, required=False, allow_null=True)
    tenant_group_id = serializers.PrimaryKeyRelatedField(
        queryset=TenantGroup.objects.all(),
        source="tenant_group",
        write_only=True,
        required=False,
        allow_null=True,
        label="Tenant Group",
    )
    tenant = VPNNestedTenantSerializer(read_only=True, required=False, allow_null=True)
    tenant_id = serializers.PrimaryKeyRelatedField(
        queryset=Tenant.objects.all(),
        source="tenant",
        write_only=True,
        required=False,
        allow_null=True,
        label="Tenant",
    )
    status = VPNNestedStatusSerializer(required=False, allow_null=True, read_only=True)
    status_id = serializers.PrimaryKeyRelatedField(
        queryset=Status.objects.all(), source="status", write_only=True, required=False, allow_null=True, label="Status"
    )
    dh_group = serializers.PrimaryKeyRelatedField(
        queryset=DiffieHellmanGroup.objects.all(), many=True, required=False, label="Diffie-Hellman Groups"
    )
    encryption = serializers.PrimaryKeyRelatedField(
        queryset=EncryptionAlgorithm.objects.all(), many=True, required=False, label="Encryption Algorithms"
    )
    authentication = serializers.PrimaryKeyRelatedField(
        queryset=AuthenticationAlgorithm.objects.all(), many=True, required=False, label="Authentication Algorithms"
    )

    class Meta:
        model = IKECrypto
        fields = [
            "id",
            "display",
            "url",
            "name",
            "dh_group",
            "encryption",
            "authentication",
            "lifetime",
            "lifetime_unit",
            "status",
            "status_id",
            "description",
            "tenant_group",
            "tenant_group_id",
            "tenant",
            "tenant_id",
            "created",
            "last_updated",
        ]
        read_only_fields = ["id", "display", "url", "status", "created", "last_updated"]

IKECryptoViewSet

Bases: ModelViewSet

API endpoint for managing IKE Crypto Profiles.

Source code in nautobot_app_vpn/api/viewsets.py
class IKECryptoViewSet(viewsets.ModelViewSet):
    """API endpoint for managing IKE Crypto Profiles."""

    queryset = IKECrypto.objects.all().order_by("name")
    serializer_class = IKECryptoSerializer
    permission_classes = [IsAdminOrReadOnly]
    filter_backends = [DjangoFilterBackend, filters.OrderingFilter, filters.SearchFilter]
    filterset_class = IKECryptoFilterSet
    ordering_fields = ["name", "dh_group", "encryption", "lifetime"]
    search_fields = ["name", "dh_group", "encryption"]
    pagination_class = StandardResultsSetPagination

IKEGatewaySerializer

Bases: BaseModelSerializer

Serializer for IKEGateway objects.

Source code in nautobot_app_vpn/api/serializers.py
class IKEGatewaySerializer(BaseModelSerializer):
    """Serializer for IKEGateway objects."""

    url = serializers.HyperlinkedIdentityField(view_name="plugins-api:nautobot_app_vpn-api:ikegateway-detail")

    # Read-only Nested Representations
    tenant_group = VPNNestedTenantGroupSerializer(read_only=True, required=False, allow_null=True)
    tenant = VPNNestedTenantSerializer(read_only=True, required=False, allow_null=True)
    local_devices = VPNNestedDeviceSerializer(many=True, read_only=True)
    peer_devices = VPNNestedDeviceSerializer(many=True, read_only=True, required=False)
    local_locations = VPNNestedLocationSerializer(many=True, read_only=True, required=False)
    peer_locations = VPNNestedLocationSerializer(many=True, read_only=True, required=False)
    ike_crypto_profile = VPNNestedIKECryptoSerializer(read_only=True, required=False, allow_null=True)
    status = VPNNestedStatusSerializer(read_only=True, required=False, allow_null=True)
    bind_interface = VPNNestedInterfaceSerializer(read_only=True, required=False, allow_null=True)
    local_platform = VPNNestedPlatformSerializer(read_only=True, required=False, allow_null=True)
    peer_platform = VPNNestedPlatformSerializer(read_only=True, required=False, allow_null=True)

    # Writeable Related Field Selectors
    tenant_group_id = serializers.PrimaryKeyRelatedField(
        queryset=TenantGroup.objects.all(),
        source="tenant_group",
        write_only=True,
        required=False,
        allow_null=True,
        label="Tenant Group",
    )
    tenant_id = serializers.PrimaryKeyRelatedField(
        queryset=Tenant.objects.all(),
        source="tenant",
        write_only=True,
        required=False,
        allow_null=True,
        label="Tenant",
    )
    local_device_ids = serializers.PrimaryKeyRelatedField(
        queryset=Device.objects.all(),
        source="local_devices",
        many=True,
        write_only=True,
        required=True,
        label="Local Devices (IDs)",
    )
    peer_device_ids = serializers.PrimaryKeyRelatedField(
        queryset=Device.objects.all(),
        source="peer_devices",
        many=True,
        write_only=True,
        required=False,
        label="Peer Devices (IDs)",
    )
    local_location_ids = serializers.PrimaryKeyRelatedField(
        queryset=Location.objects.all(),
        source="local_locations",
        many=True,
        write_only=True,
        required=False,
        label="Local Locations (IDs)",
    )
    peer_location_ids = serializers.PrimaryKeyRelatedField(
        queryset=Location.objects.all(),
        source="peer_locations",
        many=True,
        write_only=True,
        required=False,
        label="Peer Locations (IDs)",
    )
    ike_crypto_profile_id = serializers.PrimaryKeyRelatedField(
        queryset=IKECrypto.objects.all(),
        source="ike_crypto_profile",
        write_only=True,
        required=True,
        allow_null=False,
        label="IKE Crypto Profile",
    )
    status_id = serializers.PrimaryKeyRelatedField(
        queryset=Status.objects.all(), source="status", write_only=True, required=False, allow_null=True, label="Status"
    )

    bind_interface_id = serializers.PrimaryKeyRelatedField(
        queryset=Interface.objects.all(),
        source="bind_interface",
        write_only=True,
        required=False,
        allow_null=True,
        label="Bind Interface (ID)",
    )
    local_platform_id = serializers.PrimaryKeyRelatedField(
        queryset=Platform.objects.all(),
        source="local_platform",
        write_only=True,
        required=False,
        allow_null=True,
        label="Local Platform (ID)",
    )
    peer_platform_id = serializers.PrimaryKeyRelatedField(
        queryset=Platform.objects.all(),
        source="peer_platform",
        write_only=True,
        required=False,
        allow_null=True,
        label="Peer Platform (ID)",
    )

    # Choice Fields
    ike_version = ChoiceField(choices=IKEVersions.choices, required=False)
    exchange_mode = ChoiceField(choices=IKEExchangeModes.choices, required=False)
    local_ip_type = ChoiceField(choices=IPAddressTypes.choices, required=False)
    peer_ip_type = ChoiceField(choices=IPAddressTypes.choices, required=False)
    local_id_type = ChoiceField(choices=IdentificationTypes.choices, required=False, allow_null=True)
    peer_id_type = ChoiceField(choices=IdentificationTypes.choices, required=False, allow_null=True)
    authentication_type = ChoiceField(choices=IKEAuthenticationTypes.choices, required=True, allow_null=False)
    name = serializers.CharField(required=True, allow_blank=False)

    # Other Fields
    pre_shared_key = serializers.CharField(
        write_only=True, required=False, allow_blank=True, allow_null=True, style={"input_type": "password"}
    )

    class Meta:
        model = IKEGateway
        fields = [
            "id",
            "display",
            "url",
            "name",
            "description",
            "tenant_group",
            "tenant_group_id",
            "tenant",
            "tenant_id",
            "ike_version",
            "exchange_mode",
            "local_ip_type",
            "local_ip",
            "local_devices",
            "local_device_ids",
            "local_locations",
            "local_location_ids",
            "local_platform",
            "local_platform_id",
            "local_id_type",
            "local_id_value",
            "peer_ip_type",
            "peer_ip",
            "peer_devices",
            "peer_device_ids",
            "peer_device_manual",
            "peer_locations",
            "peer_location_ids",
            "peer_location_manual",
            "peer_platform",
            "peer_platform_id",
            "peer_id_type",
            "peer_id_value",
            "authentication_type",
            "pre_shared_key",
            "ike_crypto_profile",
            "ike_crypto_profile_id",
            "bind_interface",
            "bind_interface_id",
            "enable_passive_mode",
            "enable_nat_traversal",
            "enable_dpd",
            "dpd_interval",
            "dpd_retry",
            "liveness_check_interval",
            "status",
            "status_id",
            "last_sync",
            "created",
            "last_updated",
        ]

        read_only_fields = [
            "id",
            "display",
            "url",
            "tenant_group",
            "tenant",
            "local_devices",
            "peer_devices",
            "local_locations",
            "peer_locations",
            "ike_crypto_profile",
            "status",
            "bind_interface",
            "local_platform",
            "peer_platform",
            "created",
            "last_updated",
            "last_sync",
        ]

    def validate(self, data):  # pylint: disable=arguments-renamed
        """Custom validation for IKEGateway serializer."""
        peer_locations = data.get("peer_locations")
        peer_location_manual = data.get("peer_location_manual")
        if peer_locations and peer_location_manual:
            raise serializers.ValidationError("Specify Peer Locations *or* Manual Peer Location, not both.")
        bind_iface_id = data.get("bind_interface")
        local_device_ids = data.get("local_devices")
        if bind_iface_id and local_device_ids:
            try:
                bind_iface_obj = Interface.objects.select_related("device").get(pk=bind_iface_id.pk)
                if bind_iface_obj.device.pk not in [dev.pk for dev in local_device_ids]:
                    raise serializers.ValidationError(
                        {
                            "bind_interface_id": "Selected Bind Interface must belong to one of the selected Local Devices."
                        }
                    )
            except Interface.DoesNotExist as exc:
                raise serializers.ValidationError({"bind_interface_id": "Invalid Bind Interface selected."}) from exc
        return data

validate(data)

Custom validation for IKEGateway serializer.

Source code in nautobot_app_vpn/api/serializers.py
def validate(self, data):  # pylint: disable=arguments-renamed
    """Custom validation for IKEGateway serializer."""
    peer_locations = data.get("peer_locations")
    peer_location_manual = data.get("peer_location_manual")
    if peer_locations and peer_location_manual:
        raise serializers.ValidationError("Specify Peer Locations *or* Manual Peer Location, not both.")
    bind_iface_id = data.get("bind_interface")
    local_device_ids = data.get("local_devices")
    if bind_iface_id and local_device_ids:
        try:
            bind_iface_obj = Interface.objects.select_related("device").get(pk=bind_iface_id.pk)
            if bind_iface_obj.device.pk not in [dev.pk for dev in local_device_ids]:
                raise serializers.ValidationError(
                    {
                        "bind_interface_id": "Selected Bind Interface must belong to one of the selected Local Devices."
                    }
                )
        except Interface.DoesNotExist as exc:
            raise serializers.ValidationError({"bind_interface_id": "Invalid Bind Interface selected."}) from exc
    return data

IKEGatewayViewSet

Bases: ModelViewSet

API viewset for IKE Gateways.

Source code in nautobot_app_vpn/api/viewsets.py
class IKEGatewayViewSet(viewsets.ModelViewSet):
    """API viewset for IKE Gateways."""

    queryset = (
        IKEGateway.objects.select_related(
            "ike_crypto_profile",
            "status",
            "bind_interface",
        )
        .prefetch_related("local_devices", "peer_devices", "local_locations", "peer_locations")
        .order_by("name")
    )

    serializer_class = IKEGatewaySerializer
    permission_classes = [IsAdminOrReadOnly]
    filter_backends = [DjangoFilterBackend, filters.OrderingFilter, filters.SearchFilter]
    filterset_class = IKEGatewayFilterSet
    ordering_fields = ["name", "local_ip", "peer_ip", "bind_interface__name"]

    search_fields = [
        "name",
        "description",
        "local_ip",
        "peer_ip",
        "peer_device_manual",
        "peer_location_manual",
        "bind_interface__name",
    ]
    pagination_class = StandardResultsSetPagination

    def perform_create(self, serializer):
        serializer.save()

    def perform_update(self, serializer):
        serializer.save()

IPSECTunnelSerializer

Bases: BaseModelSerializer

Serializer for IPSECTunnel objects.

Source code in nautobot_app_vpn/api/serializers.py
class IPSECTunnelSerializer(BaseModelSerializer):
    """Serializer for IPSECTunnel objects."""

    url = serializers.HyperlinkedIdentityField(view_name="plugins-api:nautobot_app_vpn-api:ipsectunnel-detail")

    tenant_group = VPNNestedTenantGroupSerializer(read_only=True, required=False, allow_null=True)
    tenant = VPNNestedTenantSerializer(read_only=True, required=False, allow_null=True)
    devices = VPNNestedDeviceSerializer(many=True, read_only=True)
    ike_gateway = VPNNestedIKEGatewaySerializer(read_only=True, required=False, allow_null=True)
    ipsec_crypto_profile = VPNNestedIPSecCryptoSerializer(read_only=True, required=False, allow_null=True)
    status = VPNNestedStatusSerializer(read_only=True, required=False, allow_null=True)
    tunnel_interface = VPNNestedInterfaceSerializer(read_only=True, required=False, allow_null=True)
    monitor_profile = VPNNestedTunnelMonitorProfileSerializer(read_only=True, required=False, allow_null=True)
    proxy_ids = IPSecProxyIDSerializer(many=True, read_only=True)
    role = ChoiceField(choices=TunnelRoleChoices.choices, required=False, allow_null=True)

    device_ids = serializers.PrimaryKeyRelatedField(
        queryset=Device.objects.all(),
        source="devices",
        many=True,
        write_only=True,
        required=True,
        label="Devices (IDs)",
    )
    tenant_group_id = serializers.PrimaryKeyRelatedField(
        queryset=TenantGroup.objects.all(),
        source="tenant_group",
        write_only=True,
        required=False,
        allow_null=True,
        label="Tenant Group",
    )
    tenant_id = serializers.PrimaryKeyRelatedField(
        queryset=Tenant.objects.all(),
        source="tenant",
        write_only=True,
        required=False,
        allow_null=True,
        label="Tenant",
    )
    ike_gateway_id = serializers.PrimaryKeyRelatedField(
        queryset=IKEGateway.objects.all(),
        source="ike_gateway",
        write_only=True,
        required=True,
        allow_null=False,
        label="IKE Gateway",
    )
    ipsec_crypto_profile_id = serializers.PrimaryKeyRelatedField(
        queryset=IPSecCrypto.objects.all(),
        source="ipsec_crypto_profile",
        write_only=True,
        required=True,
        allow_null=False,
        label="IPSec Crypto Profile",
    )
    status_id = serializers.PrimaryKeyRelatedField(
        queryset=Status.objects.all(), source="status", write_only=True, required=False, allow_null=True, label="Status"
    )
    tunnel_interface_id = serializers.PrimaryKeyRelatedField(
        queryset=Interface.objects.all(),
        source="tunnel_interface",
        write_only=True,
        required=True,
        allow_null=False,
        label="Tunnel Interface",
    )

    monitor_profile_id = serializers.PrimaryKeyRelatedField(
        queryset=TunnelMonitorProfile.objects.all(),
        source="monitor_profile",
        write_only=True,
        required=False,
        allow_null=True,
        label="Monitor Profile (ID)",
    )

    class Meta:
        model = IPSECTunnel
        fields = [
            "id",
            "display",
            "url",
            "name",
            "description",
            "tenant_group",
            "tenant_group_id",
            "tenant",
            "tenant_id",
            "devices",
            "device_ids",
            "ike_gateway",
            "ike_gateway_id",
            "ipsec_crypto_profile",
            "ipsec_crypto_profile_id",
            "tunnel_interface",
            "tunnel_interface_id",
            "role",
            "proxy_ids",
            "enable_tunnel_monitor",
            "monitor_destination_ip",
            "monitor_profile",
            "monitor_profile_id",
            "status",
            "status_id",
            "last_sync",
            "created",
            "last_updated",
        ]

        read_only_fields = [
            "id",
            "display",
            "url",
            "tenant_group",
            "tenant",
            "devices",
            "ike_gateway",
            "ipsec_crypto_profile",
            "status",
            "tunnel_interface",
            "monitor_profile",
            "proxy_ids",
            "created",
            "last_updated",
            "last_sync",
        ]

    def validate(self, data):  # pylint: disable=arguments-renamed
        """Custom validation for IPSECTunnel serializer."""
        monitor_enabled = data.get(
            "enable_tunnel_monitor", getattr(self.instance, "enable_tunnel_monitor", False) if self.instance else False
        )
        dest_ip = data.get(
            "monitor_destination_ip", getattr(self.instance, "monitor_destination_ip", None) if self.instance else None
        )
        profile_id_submitted = "monitor_profile_id" in data

        if monitor_enabled:
            if not dest_ip:
                raise serializers.ValidationError(
                    {"monitor_destination_ip": "Destination IP is required when tunnel monitoring is enabled."}
                )

            if profile_id_submitted and data.get("monitor_profile_id") is None:
                raise serializers.ValidationError(
                    {"monitor_profile_id": "Monitor Profile is required when tunnel monitoring is enabled."}
                )

            if not self.instance and "monitor_profile_id" not in data:
                raise serializers.ValidationError(
                    {"monitor_profile_id": "Monitor Profile is required when tunnel monitoring is enabled."}
                )

            if self.instance and profile_id_submitted and data.get("monitor_profile_id") is None and monitor_enabled:
                raise serializers.ValidationError(
                    {"monitor_profile_id": "Cannot remove Monitor Profile while tunnel monitoring is enabled."}
                )
        return data

validate(data)

Custom validation for IPSECTunnel serializer.

Source code in nautobot_app_vpn/api/serializers.py
def validate(self, data):  # pylint: disable=arguments-renamed
    """Custom validation for IPSECTunnel serializer."""
    monitor_enabled = data.get(
        "enable_tunnel_monitor", getattr(self.instance, "enable_tunnel_monitor", False) if self.instance else False
    )
    dest_ip = data.get(
        "monitor_destination_ip", getattr(self.instance, "monitor_destination_ip", None) if self.instance else None
    )
    profile_id_submitted = "monitor_profile_id" in data

    if monitor_enabled:
        if not dest_ip:
            raise serializers.ValidationError(
                {"monitor_destination_ip": "Destination IP is required when tunnel monitoring is enabled."}
            )

        if profile_id_submitted and data.get("monitor_profile_id") is None:
            raise serializers.ValidationError(
                {"monitor_profile_id": "Monitor Profile is required when tunnel monitoring is enabled."}
            )

        if not self.instance and "monitor_profile_id" not in data:
            raise serializers.ValidationError(
                {"monitor_profile_id": "Monitor Profile is required when tunnel monitoring is enabled."}
            )

        if self.instance and profile_id_submitted and data.get("monitor_profile_id") is None and monitor_enabled:
            raise serializers.ValidationError(
                {"monitor_profile_id": "Cannot remove Monitor Profile while tunnel monitoring is enabled."}
            )
    return data

IPSECTunnelViewSet

Bases: ModelViewSet

API viewset for IPSec Tunnels.

Source code in nautobot_app_vpn/api/viewsets.py
class IPSECTunnelViewSet(viewsets.ModelViewSet):
    """API viewset for IPSec Tunnels."""

    queryset = (
        IPSECTunnel.objects.select_related(
            "ike_gateway",
            "ipsec_crypto_profile",
            "status",
            "tunnel_interface",
            "monitor_profile",
        )
        .prefetch_related(
            "devices",
            "proxy_ids",
        )
        .order_by("name")
        .distinct()
    )

    serializer_class = IPSECTunnelSerializer
    permission_classes = [IsAdminOrReadOnly]
    filter_backends = [DjangoFilterBackend, filters.OrderingFilter, filters.SearchFilter]
    filterset_class = IPSECTunnelFilterSet

    ordering_fields = [
        "name",
        "ike_gateway__name",
        "ipsec_crypto_profile__name",
        "tunnel_interface__name",
        "status__name",
        "enable_tunnel_monitor",
        "monitor_destination_ip",
    ]
    search_fields = [
        "name",
        "description",
        "ike_gateway__name",
        "ipsec_crypto_profile__name",
        "tunnel_interface__name",
        "monitor_destination_ip",
    ]
    pagination_class = StandardResultsSetPagination

    def perform_create(self, serializer):
        serializer.save()

    def perform_update(self, serializer):
        serializer.save()

IPSecCryptoSerializer

Bases: BaseModelSerializer

Serializer for IPSecCrypto objects.

Source code in nautobot_app_vpn/api/serializers.py
class IPSecCryptoSerializer(BaseModelSerializer):
    """Serializer for IPSecCrypto objects."""

    url = serializers.HyperlinkedIdentityField(view_name="plugins-api:nautobot_app_vpn-api:ipseccrypto-detail")
    tenant_group = VPNNestedTenantGroupSerializer(read_only=True, required=False, allow_null=True)
    tenant_group_id = serializers.PrimaryKeyRelatedField(
        queryset=TenantGroup.objects.all(),
        source="tenant_group",
        write_only=True,
        required=False,
        allow_null=True,
        label="Tenant Group",
    )
    tenant = VPNNestedTenantSerializer(read_only=True, required=False, allow_null=True)
    tenant_id = serializers.PrimaryKeyRelatedField(
        queryset=Tenant.objects.all(),
        source="tenant",
        write_only=True,
        required=False,
        allow_null=True,
        label="Tenant",
    )
    status = VPNNestedStatusSerializer(required=False, allow_null=True, read_only=True)
    status_id = serializers.PrimaryKeyRelatedField(
        queryset=Status.objects.all(), source="status", write_only=True, required=False, allow_null=True, label="Status"
    )
    dh_group = serializers.PrimaryKeyRelatedField(
        queryset=DiffieHellmanGroup.objects.all(), many=True, required=False, label="Diffie-Hellman Groups"
    )
    encryption = serializers.PrimaryKeyRelatedField(
        queryset=EncryptionAlgorithm.objects.all(), many=True, required=False, label="Encryption Algorithms"
    )
    authentication = serializers.PrimaryKeyRelatedField(
        queryset=AuthenticationAlgorithm.objects.all(), many=True, required=False, label="Authentication Algorithms"
    )

    class Meta:
        model = IPSecCrypto
        fields = [
            "id",
            "display",
            "url",
            "name",
            "encryption",
            "authentication",
            "dh_group",
            "protocol",
            "lifetime",
            "lifetime_unit",
            "status",
            "status_id",
            "description",
            "tenant_group",
            "tenant_group_id",
            "tenant",
            "tenant_id",
            "created",
            "last_updated",
        ]
        read_only_fields = ["id", "display", "url", "status", "created", "last_updated"]

IPSecCryptoViewSet

Bases: ModelViewSet

API endpoint for managing IPSec Crypto Profiles.

Source code in nautobot_app_vpn/api/viewsets.py
class IPSecCryptoViewSet(viewsets.ModelViewSet):
    """API endpoint for managing IPSec Crypto Profiles."""

    queryset = IPSecCrypto.objects.all().order_by("name")
    serializer_class = IPSecCryptoSerializer
    permission_classes = [IsAdminOrReadOnly]
    filter_backends = [DjangoFilterBackend, filters.OrderingFilter, filters.SearchFilter]
    filterset_class = IPSecCryptoFilterSet
    ordering_fields = ["name", "encryption", "authentication", "dh_group"]
    search_fields = ["name", "encryption", "authentication"]
    pagination_class = StandardResultsSetPagination

IsAdminOrReadOnly

Bases: BasePermission

Allow only staff/superusers to modify data. Read-only access is allowed for everyone.

Source code in nautobot_app_vpn/api/permissions.py
class IsAdminOrReadOnly(BasePermission):
    """Allow only staff/superusers to modify data.
    Read-only access is allowed for everyone.
    """

    def has_permission(self, request, view):
        if request.method in SAFE_METHODS:
            return True
        return request.user and (request.user.is_staff or request.user.is_superuser)

StandardResultsSetPagination

Bases: PageNumberPagination

Standard pagination for API endpoints.

  • Supports dynamic page sizes via ?page_size=X
  • Prevents excessive page loads with max_page_size=100
  • Defaults to 25 results per page for a better balance of performance and usability.
Source code in nautobot_app_vpn/api/pagination.py
class StandardResultsSetPagination(PageNumberPagination):
    """Standard pagination for API endpoints.

    - Supports dynamic page sizes via `?page_size=X`
    - Prevents excessive page loads with `max_page_size=100`
    - Defaults to 25 results per page
      for a better balance of performance and usability.
    """

    page_size = 25
    page_size_query_param = "page_size"
    max_page_size = 200
    last_page_strings = ("last",)

TunnelMonitorProfileSerializer

Bases: BaseModelSerializer

Serializer for Tunnel Monitor Profiles.

Source code in nautobot_app_vpn/api/serializers.py
class TunnelMonitorProfileSerializer(BaseModelSerializer):
    """Serializer for Tunnel Monitor Profiles."""

    url = serializers.HyperlinkedIdentityField(view_name="plugins-api:nautobot_app_vpn-api:tunnelmonitorprofile-detail")
    action = ChoiceField(choices=TunnelMonitorActionChoices.choices, required=False)
    tenant_group = VPNNestedTenantGroupSerializer(read_only=True, required=False, allow_null=True)
    tenant_group_id = serializers.PrimaryKeyRelatedField(
        queryset=TenantGroup.objects.all(),
        source="tenant_group",
        write_only=True,
        required=False,
        allow_null=True,
        label="Tenant Group",
    )
    tenant = VPNNestedTenantSerializer(read_only=True, required=False, allow_null=True)
    tenant_id = serializers.PrimaryKeyRelatedField(
        queryset=Tenant.objects.all(),
        source="tenant",
        write_only=True,
        required=False,
        allow_null=True,
        label="Tenant",
    )

    class Meta:
        model = TunnelMonitorProfile
        fields = [
            "id",
            "display",
            "url",
            "name",
            "tenant_group",
            "tenant_group_id",
            "tenant",
            "tenant_id",
            "action",
            "interval",
            "threshold",
            "created",
            "last_updated",
        ]
        read_only_fields = ["id", "display", "url", "tenant_group", "tenant", "created", "last_updated"]

TunnelMonitorProfileViewSet

Bases: ModelViewSet

API viewset for Tunnel Monitor Profiles.

Source code in nautobot_app_vpn/api/viewsets.py
class TunnelMonitorProfileViewSet(viewsets.ModelViewSet):
    """API viewset for Tunnel Monitor Profiles."""

    queryset = TunnelMonitorProfile.objects.all().order_by("name")
    serializer_class = TunnelMonitorProfileSerializer
    permission_classes = [IsAdminOrReadOnly]
    filter_backends = [DjangoFilterBackend, filters.OrderingFilter, filters.SearchFilter]
    filterset_class = TunnelMonitorProfileFilterSet
    ordering_fields = ["name", "action", "interval", "threshold"]
    search_fields = ["name"]
    pagination_class = StandardResultsSetPagination

VPNTopologyFilterOptionsView

Bases: APIView

Returns arrays per filter key your UI expects: { "country": [...], "role": [...], "status": [...], "ike_version": [...], "location": [...], "device": [...], "platform": [...] } (Built from relational data; no change to IKE/IPSec logic.)

Source code in nautobot_app_vpn/api/viewsets.py
class VPNTopologyFilterOptionsView(APIView):
    """
    Returns arrays per filter key your UI expects:
    {
      "country": [...], "role": [...], "status": [...],
      "ike_version": [...], "location": [...],
      "device": [...], "platform": [...]
    }
    (Built from relational data; no change to IKE/IPSec logic.)
    """

    serializer_class = DummySerializer
    permission_classes = [IsAuthenticated]

    def _get_device_country_from_name(self, device_name):
        """Derives country from device name based on 'CODE-...' convention."""
        if device_name:
            parts = device_name.split("-")
            if parts:
                return parts[0].upper()
        return None

    def get(self, request):
        """Return available filter values derived from relational VPN data."""
        logger.debug("Filter options GET request from user %s", request.user)
        countries = set()
        ike_versions = set()
        statuses = set()
        tunnel_roles = set()
        devices = set()
        locations = set()
        platforms = set()

        tunnels_qs = IPSECTunnel.objects.select_related(
            "ike_gateway", "status", "ike_gateway__local_platform", "ike_gateway__peer_platform"
        ).prefetch_related(
            "ike_gateway__local_devices__platform",
            "ike_gateway__local_devices__location",
            "ike_gateway__local_devices__role",
            "ike_gateway__peer_devices__platform",
            "ike_gateway__peer_devices__location",
            "ike_gateway__peer_devices__role",
        )

        for tunnel in tunnels_qs:
            if tunnel.status and tunnel.status.name:
                statuses.add(tunnel.status.name)
            if tunnel.role:
                tunnel_roles.add(str(tunnel.role))

            gw = tunnel.ike_gateway
            if gw:
                if gw.ike_version:
                    ike_versions.add(str(gw.ike_version))

                for dev_group in [gw.local_devices.all(), gw.peer_devices.all()]:
                    for dev in dev_group:
                        if dev and dev.name:
                            devices.add(dev.name)
                            country = self._get_device_country_from_name(dev.name)
                            if country:
                                countries.add(country)
                        if dev and dev.location and dev.location.name:
                            locations.add(dev.location.name)
                        if dev and dev.platform and dev.platform.name:
                            platforms.add(dev.platform.name)

                # consider local/peer platforms on gateway
                for plat in [gw.local_platform, gw.peer_platform]:
                    if plat and plat.name:
                        platforms.add(plat.name)

        # also include all defined platforms
        for plat in Platform.objects.all().values("name").distinct():
            if plat["name"]:
                platforms.add(plat["name"])

        # OUTPUT KEYS match frontend expectations
        return Response(
            {
                "country": sorted(filter(None, countries)),
                "ike_version": sorted(filter(None, ike_versions)),
                "status": sorted(filter(None, statuses)),
                "role": sorted(filter(None, tunnel_roles)),
                "location": sorted(filter(None, locations)),
                "device": sorted(filter(None, devices)),
                "platform": sorted(filter(None, platforms)),
            }
        )

get(request)

Return available filter values derived from relational VPN data.

Source code in nautobot_app_vpn/api/viewsets.py
def get(self, request):
    """Return available filter values derived from relational VPN data."""
    logger.debug("Filter options GET request from user %s", request.user)
    countries = set()
    ike_versions = set()
    statuses = set()
    tunnel_roles = set()
    devices = set()
    locations = set()
    platforms = set()

    tunnels_qs = IPSECTunnel.objects.select_related(
        "ike_gateway", "status", "ike_gateway__local_platform", "ike_gateway__peer_platform"
    ).prefetch_related(
        "ike_gateway__local_devices__platform",
        "ike_gateway__local_devices__location",
        "ike_gateway__local_devices__role",
        "ike_gateway__peer_devices__platform",
        "ike_gateway__peer_devices__location",
        "ike_gateway__peer_devices__role",
    )

    for tunnel in tunnels_qs:
        if tunnel.status and tunnel.status.name:
            statuses.add(tunnel.status.name)
        if tunnel.role:
            tunnel_roles.add(str(tunnel.role))

        gw = tunnel.ike_gateway
        if gw:
            if gw.ike_version:
                ike_versions.add(str(gw.ike_version))

            for dev_group in [gw.local_devices.all(), gw.peer_devices.all()]:
                for dev in dev_group:
                    if dev and dev.name:
                        devices.add(dev.name)
                        country = self._get_device_country_from_name(dev.name)
                        if country:
                            countries.add(country)
                    if dev and dev.location and dev.location.name:
                        locations.add(dev.location.name)
                    if dev and dev.platform and dev.platform.name:
                        platforms.add(dev.platform.name)

            # consider local/peer platforms on gateway
            for plat in [gw.local_platform, gw.peer_platform]:
                if plat and plat.name:
                    platforms.add(plat.name)

    # also include all defined platforms
    for plat in Platform.objects.all().values("name").distinct():
        if plat["name"]:
            platforms.add(plat["name"])

    # OUTPUT KEYS match frontend expectations
    return Response(
        {
            "country": sorted(filter(None, countries)),
            "ike_version": sorted(filter(None, ike_versions)),
            "status": sorted(filter(None, statuses)),
            "role": sorted(filter(None, tunnel_roles)),
            "location": sorted(filter(None, locations)),
            "device": sorted(filter(None, devices)),
            "platform": sorted(filter(None, platforms)),
        }
    )

VPNTopologyNeo4jView

Bases: APIView

Returns GeoJSON for MapLibre:

{ "devices": FeatureCollection(Point), "tunnels": FeatureCollection(LineString), "stats": { "active": N, "failed": M, "planned": K, ... }, "meta": { "devices_count": int, "tunnels_count": int, "countries_count": int, "platforms_count": int, "ha_pairs": int, "last_synced": ISO8601 or null } }

Source code in nautobot_app_vpn/api/viewsets.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
class VPNTopologyNeo4jView(APIView):
    """
    Returns GeoJSON for MapLibre:

    {
      "devices": FeatureCollection(Point),
      "tunnels": FeatureCollection(LineString),
      "stats": { "active": N, "failed": M, "planned": K, ... },
      "meta":  {
        "devices_count": int,
        "tunnels_count": int,
        "countries_count": int,
        "platforms_count": int,
        "ha_pairs": int,
        "last_synced": ISO8601 or null
      }
    }
    """

    serializer_class = DummySerializer
    permission_classes = [IsAuthenticated]

    def _build_node_where(self, params_in, qp_out):
        where = []
        if params_in.get("country"):
            where.append("toLower(n.country) = toLower($country)")
            qp_out["country"] = params_in["country"]

        if params_in.get("platform"):
            where.append("toLower(n.platform_name) CONTAINS toLower($platform)")
            qp_out["platform"] = params_in["platform"]

        if params_in.get("location"):
            where.append("toLower(n.location_name) CONTAINS toLower($location)")
            qp_out["location"] = params_in["location"]

        if params_in.get("device"):
            val = str(params_in["device"]).strip()
            # Guard against null lists with coalesce()
            where.append(
                "("
                "toLower($device_name) IN [dev IN coalesce(n.device_names, []) | toLower(dev)] "
                "OR $device_name IN coalesce(n.nautobot_device_pks, []) "
                "OR toLower(n.label) CONTAINS toLower($device_name)"
                ")"
            )
            qp_out["device_name"] = val

        if params_in.get("role"):
            where.append("toLower(n.role) = toLower($node_role)")
            qp_out["node_role"] = params_in["role"]

        return where

    def _build_edge_filter(self, params_in, qp_out):
        conds = []
        if params_in.get("status"):
            conds.append("toLower(r.status) = toLower($tunnel_status)")
            qp_out["tunnel_status"] = params_in["status"]

        if params_in.get("ike_version"):
            conds.append("toLower(r.ike_version) = toLower($ike_version)")
            qp_out["ike_version"] = params_in["ike_version"]

        if params_in.get("role"):
            conds.append("toLower(r.role) = toLower($tunnel_role)")
            qp_out["tunnel_role"] = params_in["role"]

        return conds

    def get(self, request):
        """Return VPN topology GeoJSON and summary metadata sourced from Neo4j."""
        logger.info("Neo4j VPN Topology GET request from user %s with filters: %s", request.user, request.GET.dict())

        # Settings check
        for attr in ("NEO4J_URI", "NEO4J_USER", "NEO4J_PASSWORD"):
            if not hasattr(settings, attr):
                logger.error("Neo4j connection settings are not fully configured in Nautobot settings.")
                return Response({"error": "Graph database service is not configured."}, status=503)

        driver = None
        try:
            driver = _neo4j_driver()
            driver.verify_connectivity()
        except Exception as exc:  # pylint: disable=broad-exception-caught
            logger.error("Failed to connect to Neo4j for topology view: %s", exc, exc_info=True)
            return Response({"error": "Could not connect to graph database."}, status=503)

        params_in = request.GET.dict()

        # --- Relational statistics for ribbon summary ---
        tracked_status = [
            ("active", "Active"),
            ("down", "Down"),
            ("decommissioned", "Decommissioned"),
            ("disabled", "Disabled"),
            ("planned", "Planned"),
        ]
        status_counts = {slug: 0 for slug, _ in tracked_status}
        status_labels = dict(tracked_status)
        status_order = [slug for slug, _ in tracked_status]

        role_labels = {
            "primary": "Primary",
            "secondary": "Secondary",
            "tertiary": "Tertiary",
            "unassigned": "Unassigned",
        }
        role_counts = {key: 0 for key in role_labels}
        role_order = ["primary", "secondary", "tertiary"]
        total_tunnels = 0

        try:
            tunnels_qs = IPSECTunnel.objects.restrict(request.user, "view")

            status_filter = (params_in.get("status") or "").strip()
            role_filter = (params_in.get("role") or "").strip()

            status_model = apps.get_model("extras", "Status")
            status_fields = {f.name for f in status_model._meta.get_fields()}
            has_status_slug = "slug" in status_fields

            if status_filter:
                status_lookup = Q(status__name__iexact=status_filter)
                if has_status_slug:
                    status_lookup |= Q(status__slug__iexact=status_filter)
                tunnels_qs = tunnels_qs.filter(status_lookup)

            if role_filter:
                tunnels_qs = tunnels_qs.filter(role__iexact=role_filter)

            status_value_fields = ["status__name"]
            if has_status_slug:
                status_value_fields.append("status__slug")

            for row in tunnels_qs.values(*status_value_fields).annotate(total=Count("id")):
                status_name = (row.get("status__name") or "").strip()
                raw_key = row.get("status__slug") if has_status_slug else status_name
                slug_key = (raw_key or status_name or "unknown").strip().lower().replace(" ", "-")
                if not slug_key:
                    slug_key = "unknown"
                status_counts[slug_key] = row["total"]
                status_labels[slug_key] = status_name or status_labels.get(slug_key, slug_key.title())
                if slug_key not in status_order:
                    status_order.append(slug_key)

            for row in tunnels_qs.values("role").annotate(total=Count("id")):
                role_value = (row["role"] or "unassigned").lower()
                role_counts[role_value] = row["total"]
                if role_value not in role_order:
                    role_order.append(role_value)

            total_tunnels = tunnels_qs.count()
        except (DatabaseError, LookupError) as agg_exc:
            logger.error("Failed to compute relational tunnel statistics: %s", agg_exc, exc_info=True)

        qp = {}

        node_where = self._build_node_where(params_in, qp)
        node_query = "MATCH (n:VPNNode)"
        if node_where:
            node_query += " WHERE " + " AND ".join(node_where)
        node_query += " RETURN n"

        try:
            devices_fc = {"type": "FeatureCollection", "features": []}
            tunnels_fc = {"type": "FeatureCollection", "features": []}

            with driver.session(database=getattr(settings, "NEO4J_DATABASE", "neo4j")) as session:
                # ---- Nodes
                logger.debug("Node query: %s params=%s", node_query, qp)
                node_records = session.run(node_query, qp)

                node_ids = set()
                for rec in node_records:
                    nprops = dict(rec["n"])
                    node_id = nprops.get("id")
                    if not node_id:
                        continue
                    # accept several possible coord keys
                    lat = nprops.get("lat", nprops.get("latitude"))
                    lon = nprops.get("lon", nprops.get("longitude"))
                    if lat is None or lon is None:
                        # skip nodes without geo
                        continue

                    node_ids.add(node_id)
                    props = {
                        "id": node_id,
                        "name": nprops.get("name") or nprops.get("label") or "",
                        "status": nprops.get("status") or "unknown",
                        "role": nprops.get("role"),
                        "platform": nprops.get("platform_name"),
                        "country": nprops.get("country"),
                        "location": nprops.get("location_name"),
                        "is_ha_pair": bool(nprops.get("is_ha_pair")),
                        # Include backing device info to make device filter work with HA groups
                        "device_names": nprops.get("device_names") or [],
                        "nautobot_device_pks": nprops.get("nautobot_device_pks") or [],
                        "search_text": " ".join(
                            str(x)
                            for x in [
                                nprops.get("name") or nprops.get("label"),
                                nprops.get("role"),
                                nprops.get("platform_name"),
                                nprops.get("country"),
                                nprops.get("location_name"),
                            ]
                            if x
                        ),
                    }

                    devices_fc["features"].append(
                        {
                            "type": "Feature",
                            "geometry": {"type": "Point", "coordinates": [float(lon), float(lat)]},
                            "properties": props,
                        }
                    )

                # ---- Edges (include peers even if they don't match the node filters) ----
                edge_qp = {}
                edge_conds = self._build_edge_filter(params_in, edge_qp)

                base = (
                    "MATCH (a:VPNNode)-[r:TUNNEL]->(b:VPNNode) "
                    "WHERE a.lat IS NOT NULL AND a.lon IS NOT NULL AND b.lat IS NOT NULL AND b.lon IS NOT NULL "
                )
                if node_ids:
                    base += "AND (a.id IN $node_ids OR b.id IN $node_ids) "
                    edge_qp["node_ids"] = list(node_ids)
                if edge_conds:
                    base += "AND " + " AND ".join(edge_conds) + " "
                edge_query = base + "RETURN a AS a, b AS b, r AS r"

                logger.debug("Edge query: %s params=%s", edge_query, edge_qp)
                nodes_map = {f["properties"]["id"]: f for f in devices_fc["features"]}
                for rec in session.run(edge_query, edge_qp):
                    aprops = dict(rec["a"])  # node a properties
                    bprops = dict(rec["b"])  # node b properties
                    rprops = dict(rec["r"])  # relationship properties

                    # Ensure endpoints exist in devices_fc
                    for np in (aprops, bprops):
                        nid = np.get("id")
                        if not nid or nid in nodes_map:
                            continue
                        lat = np.get("lat")
                        if lat is None:
                            lat = np.get("latitude")
                        lon = np.get("lon")
                        if lon is None:
                            lon = np.get("longitude")
                        if lat is None or lon is None:
                            continue
                        props = {
                            "id": nid,
                            "name": np.get("name") or np.get("label") or "",
                            "status": np.get("status") or "unknown",
                            "role": np.get("role"),
                            "platform": np.get("platform_name"),
                            "country": np.get("country"),
                            "location": np.get("location_name"),
                            "is_ha_pair": bool(np.get("is_ha_pair")),
                            "device_names": np.get("device_names") or [],
                            "nautobot_device_pks": np.get("nautobot_device_pks") or [],
                            "search_text": " ".join(
                                str(x)
                                for x in [
                                    np.get("name") or np.get("label"),
                                    np.get("role"),
                                    np.get("platform_name"),
                                    np.get("country"),
                                    np.get("location_name"),
                                ]
                                if x
                            ),
                        }
                        feat = {
                            "type": "Feature",
                            "geometry": {
                                "type": "Point",
                                "coordinates": [float(lon), float(lat)],
                            },
                            "properties": props,
                        }
                        devices_fc["features"].append(feat)
                        nodes_map[nid] = feat

                    # Add tunnel feature
                    a_lon = aprops.get("lon") if aprops.get("lon") is not None else aprops.get("longitude")
                    a_lat = aprops.get("lat") if aprops.get("lat") is not None else aprops.get("latitude")
                    b_lon = bprops.get("lon") if bprops.get("lon") is not None else bprops.get("longitude")
                    b_lat = bprops.get("lat") if bprops.get("lat") is not None else bprops.get("latitude")
                    if a_lon is None or a_lat is None or b_lon is None or b_lat is None:
                        continue
                    tunnels_fc["features"].append(
                        {
                            "type": "Feature",
                            "geometry": {
                                "type": "LineString",
                                "coordinates": [
                                    [float(a_lon), float(a_lat)],
                                    [float(b_lon), float(b_lat)],
                                ],
                            },
                            "properties": {
                                "name": rprops.get("label") or rprops.get("id") or "",
                                "status": rprops.get("status") or "unknown",
                                "role": rprops.get("role") or "",
                                "ike_version": rprops.get("ike_version") or "",
                                "scope": rprops.get("scope") or "",
                                "local_ip": rprops.get("local_ip") or "",
                                "peer_ip": rprops.get("peer_ip") or "",
                                "firewall_hostnames": rprops.get("firewall_hostnames") or "",
                                "tooltip": rprops.get("tooltip_details_json") or rprops.get("tooltip") or "",
                            },
                        }
                    )

            # ---- Stats (by device status) ----
            stats = {}
            for f in devices_fc["features"]:
                s = (f["properties"].get("status") or "unknown").lower()
                stats[s] = stats.get(s, 0) + 1

            # ---- Meta for ribbon ----
            countries = set()
            platforms = set()
            ha_pairs = 0
            for f in devices_fc["features"]:
                p = f["properties"] or {}
                if p.get("country"):
                    countries.add(p["country"])
                if p.get("platform"):
                    platforms.add(p["platform"])
                if p.get("is_ha_pair"):
                    ha_pairs += 1

            last_synced_iso = None
            last_sync_status = None
            try:
                dash = VPNDashboard.objects.filter(pk=1).only("last_sync_time", "last_sync_status").first()
                if dash and dash.last_sync_time:
                    last_synced_iso = dash.last_sync_time.isoformat()
                if dash and dash.last_sync_status:
                    last_sync_status = dash.last_sync_status
            except DatabaseError as db_error:
                logger.debug(
                    "Unable to load VPNDashboard sync metadata due to database error: %s",
                    db_error,
                    exc_info=True,
                )

            meta = {
                "devices_count": len(devices_fc["features"]),
                "tunnels_count": len(tunnels_fc["features"]),
                "countries_count": len(countries),
                "platforms_count": len(platforms),
                "ha_pairs": ha_pairs,
                "last_synced": last_synced_iso,
                "last_sync_status": last_sync_status,
                "status_counts": status_counts,
                "status_labels": status_labels,
                "status_order": status_order,
                "role_counts": role_counts,
                "role_labels": role_labels,
                "role_order": role_order,
                "total_tunnels": total_tunnels,
                "total_primary_tunnels": role_counts.get("primary", 0),
                "total_secondary_tunnels": role_counts.get("secondary", 0),
                "total_tertiary_tunnels": role_counts.get("tertiary", 0),
                "total_unassigned_tunnels": role_counts.get("unassigned", 0),
            }

            return Response({"devices": devices_fc, "tunnels": tunnels_fc, "stats": stats, "meta": meta})

        except neo4j_exceptions.CypherSyntaxError as e:  # pylint: disable=broad-exception-caught
            logger.error("Neo4j Cypher Syntax Error in VPNTopologyNeo4jView: %s", e, exc_info=True)
            return Response({"error": "Error querying graph database (query syntax problem)."}, status=500)
        except neo4j_exceptions.ServiceUnavailable:
            logger.error("Neo4j Service Unavailable during VPN topology query.", exc_info=True)
            return Response({"error": "Graph database service unavailable during query."}, status=503)
        except Exception as exc:  # pylint: disable=broad-exception-caught
            logger.error("Error querying or processing data from Neo4j in VPNTopologyNeo4jView: %s", exc, exc_info=True)
            return Response({"error": "Could not retrieve topology data from graph database."}, status=500)
        finally:
            if driver:
                driver.close()

get(request)

Return VPN topology GeoJSON and summary metadata sourced from Neo4j.

Source code in nautobot_app_vpn/api/viewsets.py
def get(self, request):
    """Return VPN topology GeoJSON and summary metadata sourced from Neo4j."""
    logger.info("Neo4j VPN Topology GET request from user %s with filters: %s", request.user, request.GET.dict())

    # Settings check
    for attr in ("NEO4J_URI", "NEO4J_USER", "NEO4J_PASSWORD"):
        if not hasattr(settings, attr):
            logger.error("Neo4j connection settings are not fully configured in Nautobot settings.")
            return Response({"error": "Graph database service is not configured."}, status=503)

    driver = None
    try:
        driver = _neo4j_driver()
        driver.verify_connectivity()
    except Exception as exc:  # pylint: disable=broad-exception-caught
        logger.error("Failed to connect to Neo4j for topology view: %s", exc, exc_info=True)
        return Response({"error": "Could not connect to graph database."}, status=503)

    params_in = request.GET.dict()

    # --- Relational statistics for ribbon summary ---
    tracked_status = [
        ("active", "Active"),
        ("down", "Down"),
        ("decommissioned", "Decommissioned"),
        ("disabled", "Disabled"),
        ("planned", "Planned"),
    ]
    status_counts = {slug: 0 for slug, _ in tracked_status}
    status_labels = dict(tracked_status)
    status_order = [slug for slug, _ in tracked_status]

    role_labels = {
        "primary": "Primary",
        "secondary": "Secondary",
        "tertiary": "Tertiary",
        "unassigned": "Unassigned",
    }
    role_counts = {key: 0 for key in role_labels}
    role_order = ["primary", "secondary", "tertiary"]
    total_tunnels = 0

    try:
        tunnels_qs = IPSECTunnel.objects.restrict(request.user, "view")

        status_filter = (params_in.get("status") or "").strip()
        role_filter = (params_in.get("role") or "").strip()

        status_model = apps.get_model("extras", "Status")
        status_fields = {f.name for f in status_model._meta.get_fields()}
        has_status_slug = "slug" in status_fields

        if status_filter:
            status_lookup = Q(status__name__iexact=status_filter)
            if has_status_slug:
                status_lookup |= Q(status__slug__iexact=status_filter)
            tunnels_qs = tunnels_qs.filter(status_lookup)

        if role_filter:
            tunnels_qs = tunnels_qs.filter(role__iexact=role_filter)

        status_value_fields = ["status__name"]
        if has_status_slug:
            status_value_fields.append("status__slug")

        for row in tunnels_qs.values(*status_value_fields).annotate(total=Count("id")):
            status_name = (row.get("status__name") or "").strip()
            raw_key = row.get("status__slug") if has_status_slug else status_name
            slug_key = (raw_key or status_name or "unknown").strip().lower().replace(" ", "-")
            if not slug_key:
                slug_key = "unknown"
            status_counts[slug_key] = row["total"]
            status_labels[slug_key] = status_name or status_labels.get(slug_key, slug_key.title())
            if slug_key not in status_order:
                status_order.append(slug_key)

        for row in tunnels_qs.values("role").annotate(total=Count("id")):
            role_value = (row["role"] or "unassigned").lower()
            role_counts[role_value] = row["total"]
            if role_value not in role_order:
                role_order.append(role_value)

        total_tunnels = tunnels_qs.count()
    except (DatabaseError, LookupError) as agg_exc:
        logger.error("Failed to compute relational tunnel statistics: %s", agg_exc, exc_info=True)

    qp = {}

    node_where = self._build_node_where(params_in, qp)
    node_query = "MATCH (n:VPNNode)"
    if node_where:
        node_query += " WHERE " + " AND ".join(node_where)
    node_query += " RETURN n"

    try:
        devices_fc = {"type": "FeatureCollection", "features": []}
        tunnels_fc = {"type": "FeatureCollection", "features": []}

        with driver.session(database=getattr(settings, "NEO4J_DATABASE", "neo4j")) as session:
            # ---- Nodes
            logger.debug("Node query: %s params=%s", node_query, qp)
            node_records = session.run(node_query, qp)

            node_ids = set()
            for rec in node_records:
                nprops = dict(rec["n"])
                node_id = nprops.get("id")
                if not node_id:
                    continue
                # accept several possible coord keys
                lat = nprops.get("lat", nprops.get("latitude"))
                lon = nprops.get("lon", nprops.get("longitude"))
                if lat is None or lon is None:
                    # skip nodes without geo
                    continue

                node_ids.add(node_id)
                props = {
                    "id": node_id,
                    "name": nprops.get("name") or nprops.get("label") or "",
                    "status": nprops.get("status") or "unknown",
                    "role": nprops.get("role"),
                    "platform": nprops.get("platform_name"),
                    "country": nprops.get("country"),
                    "location": nprops.get("location_name"),
                    "is_ha_pair": bool(nprops.get("is_ha_pair")),
                    # Include backing device info to make device filter work with HA groups
                    "device_names": nprops.get("device_names") or [],
                    "nautobot_device_pks": nprops.get("nautobot_device_pks") or [],
                    "search_text": " ".join(
                        str(x)
                        for x in [
                            nprops.get("name") or nprops.get("label"),
                            nprops.get("role"),
                            nprops.get("platform_name"),
                            nprops.get("country"),
                            nprops.get("location_name"),
                        ]
                        if x
                    ),
                }

                devices_fc["features"].append(
                    {
                        "type": "Feature",
                        "geometry": {"type": "Point", "coordinates": [float(lon), float(lat)]},
                        "properties": props,
                    }
                )

            # ---- Edges (include peers even if they don't match the node filters) ----
            edge_qp = {}
            edge_conds = self._build_edge_filter(params_in, edge_qp)

            base = (
                "MATCH (a:VPNNode)-[r:TUNNEL]->(b:VPNNode) "
                "WHERE a.lat IS NOT NULL AND a.lon IS NOT NULL AND b.lat IS NOT NULL AND b.lon IS NOT NULL "
            )
            if node_ids:
                base += "AND (a.id IN $node_ids OR b.id IN $node_ids) "
                edge_qp["node_ids"] = list(node_ids)
            if edge_conds:
                base += "AND " + " AND ".join(edge_conds) + " "
            edge_query = base + "RETURN a AS a, b AS b, r AS r"

            logger.debug("Edge query: %s params=%s", edge_query, edge_qp)
            nodes_map = {f["properties"]["id"]: f for f in devices_fc["features"]}
            for rec in session.run(edge_query, edge_qp):
                aprops = dict(rec["a"])  # node a properties
                bprops = dict(rec["b"])  # node b properties
                rprops = dict(rec["r"])  # relationship properties

                # Ensure endpoints exist in devices_fc
                for np in (aprops, bprops):
                    nid = np.get("id")
                    if not nid or nid in nodes_map:
                        continue
                    lat = np.get("lat")
                    if lat is None:
                        lat = np.get("latitude")
                    lon = np.get("lon")
                    if lon is None:
                        lon = np.get("longitude")
                    if lat is None or lon is None:
                        continue
                    props = {
                        "id": nid,
                        "name": np.get("name") or np.get("label") or "",
                        "status": np.get("status") or "unknown",
                        "role": np.get("role"),
                        "platform": np.get("platform_name"),
                        "country": np.get("country"),
                        "location": np.get("location_name"),
                        "is_ha_pair": bool(np.get("is_ha_pair")),
                        "device_names": np.get("device_names") or [],
                        "nautobot_device_pks": np.get("nautobot_device_pks") or [],
                        "search_text": " ".join(
                            str(x)
                            for x in [
                                np.get("name") or np.get("label"),
                                np.get("role"),
                                np.get("platform_name"),
                                np.get("country"),
                                np.get("location_name"),
                            ]
                            if x
                        ),
                    }
                    feat = {
                        "type": "Feature",
                        "geometry": {
                            "type": "Point",
                            "coordinates": [float(lon), float(lat)],
                        },
                        "properties": props,
                    }
                    devices_fc["features"].append(feat)
                    nodes_map[nid] = feat

                # Add tunnel feature
                a_lon = aprops.get("lon") if aprops.get("lon") is not None else aprops.get("longitude")
                a_lat = aprops.get("lat") if aprops.get("lat") is not None else aprops.get("latitude")
                b_lon = bprops.get("lon") if bprops.get("lon") is not None else bprops.get("longitude")
                b_lat = bprops.get("lat") if bprops.get("lat") is not None else bprops.get("latitude")
                if a_lon is None or a_lat is None or b_lon is None or b_lat is None:
                    continue
                tunnels_fc["features"].append(
                    {
                        "type": "Feature",
                        "geometry": {
                            "type": "LineString",
                            "coordinates": [
                                [float(a_lon), float(a_lat)],
                                [float(b_lon), float(b_lat)],
                            ],
                        },
                        "properties": {
                            "name": rprops.get("label") or rprops.get("id") or "",
                            "status": rprops.get("status") or "unknown",
                            "role": rprops.get("role") or "",
                            "ike_version": rprops.get("ike_version") or "",
                            "scope": rprops.get("scope") or "",
                            "local_ip": rprops.get("local_ip") or "",
                            "peer_ip": rprops.get("peer_ip") or "",
                            "firewall_hostnames": rprops.get("firewall_hostnames") or "",
                            "tooltip": rprops.get("tooltip_details_json") or rprops.get("tooltip") or "",
                        },
                    }
                )

        # ---- Stats (by device status) ----
        stats = {}
        for f in devices_fc["features"]:
            s = (f["properties"].get("status") or "unknown").lower()
            stats[s] = stats.get(s, 0) + 1

        # ---- Meta for ribbon ----
        countries = set()
        platforms = set()
        ha_pairs = 0
        for f in devices_fc["features"]:
            p = f["properties"] or {}
            if p.get("country"):
                countries.add(p["country"])
            if p.get("platform"):
                platforms.add(p["platform"])
            if p.get("is_ha_pair"):
                ha_pairs += 1

        last_synced_iso = None
        last_sync_status = None
        try:
            dash = VPNDashboard.objects.filter(pk=1).only("last_sync_time", "last_sync_status").first()
            if dash and dash.last_sync_time:
                last_synced_iso = dash.last_sync_time.isoformat()
            if dash and dash.last_sync_status:
                last_sync_status = dash.last_sync_status
        except DatabaseError as db_error:
            logger.debug(
                "Unable to load VPNDashboard sync metadata due to database error: %s",
                db_error,
                exc_info=True,
            )

        meta = {
            "devices_count": len(devices_fc["features"]),
            "tunnels_count": len(tunnels_fc["features"]),
            "countries_count": len(countries),
            "platforms_count": len(platforms),
            "ha_pairs": ha_pairs,
            "last_synced": last_synced_iso,
            "last_sync_status": last_sync_status,
            "status_counts": status_counts,
            "status_labels": status_labels,
            "status_order": status_order,
            "role_counts": role_counts,
            "role_labels": role_labels,
            "role_order": role_order,
            "total_tunnels": total_tunnels,
            "total_primary_tunnels": role_counts.get("primary", 0),
            "total_secondary_tunnels": role_counts.get("secondary", 0),
            "total_tertiary_tunnels": role_counts.get("tertiary", 0),
            "total_unassigned_tunnels": role_counts.get("unassigned", 0),
        }

        return Response({"devices": devices_fc, "tunnels": tunnels_fc, "stats": stats, "meta": meta})

    except neo4j_exceptions.CypherSyntaxError as e:  # pylint: disable=broad-exception-caught
        logger.error("Neo4j Cypher Syntax Error in VPNTopologyNeo4jView: %s", e, exc_info=True)
        return Response({"error": "Error querying graph database (query syntax problem)."}, status=500)
    except neo4j_exceptions.ServiceUnavailable:
        logger.error("Neo4j Service Unavailable during VPN topology query.", exc_info=True)
        return Response({"error": "Graph database service unavailable during query."}, status=503)
    except Exception as exc:  # pylint: disable=broad-exception-caught
        logger.error("Error querying or processing data from Neo4j in VPNTopologyNeo4jView: %s", exc, exc_info=True)
        return Response({"error": "Could not retrieve topology data from graph database."}, status=500)
    finally:
        if driver:
            driver.close()

pagination

Custom pagination classes for the Nautobot VPN plugin API.

LargeResultsSetPagination

Bases: PageNumberPagination

Large pagination class for bulk API requests.

  • Used for bulk exports or high-performance endpoints.
Source code in nautobot_app_vpn/api/pagination.py
class LargeResultsSetPagination(PageNumberPagination):
    """Large pagination class for bulk API requests.

    - Used for bulk exports or high-performance endpoints.
    """

    page_size = 100  # ✅ Larger page size for bulk operations
    page_size_query_param = "page_size"
    max_page_size = 500  # ✅ Allows exporting up to 500 records per request

SmallResultsSetPagination

Bases: PageNumberPagination

Smaller pagination for lightweight API endpoints.

  • Useful for quick-loading small lists.
Source code in nautobot_app_vpn/api/pagination.py
class SmallResultsSetPagination(PageNumberPagination):
    """Smaller pagination for lightweight API endpoints.

    - Useful for quick-loading small lists.
    """

    page_size = 5  # ✅ Minimal results for quick API calls
    page_size_query_param = "page_size"
    max_page_size = 50  # ✅ Ensures no overload

StandardResultsSetPagination

Bases: PageNumberPagination

Standard pagination for API endpoints.

  • Supports dynamic page sizes via ?page_size=X
  • Prevents excessive page loads with max_page_size=100
  • Defaults to 25 results per page for a better balance of performance and usability.
Source code in nautobot_app_vpn/api/pagination.py
class StandardResultsSetPagination(PageNumberPagination):
    """Standard pagination for API endpoints.

    - Supports dynamic page sizes via `?page_size=X`
    - Prevents excessive page loads with `max_page_size=100`
    - Defaults to 25 results per page
      for a better balance of performance and usability.
    """

    page_size = 25
    page_size_query_param = "page_size"
    max_page_size = 200
    last_page_strings = ("last",)

permissions

Custom API permissions for the Nautobot VPN app.

IsAdminOrReadOnly

Bases: BasePermission

Allow only staff/superusers to modify data. Read-only access is allowed for everyone.

Source code in nautobot_app_vpn/api/permissions.py
class IsAdminOrReadOnly(BasePermission):
    """Allow only staff/superusers to modify data.
    Read-only access is allowed for everyone.
    """

    def has_permission(self, request, view):
        if request.method in SAFE_METHODS:
            return True
        return request.user and (request.user.is_staff or request.user.is_superuser)

IsAuthenticatedOrAdmin

Bases: BasePermission

Allow authenticated users to write. Anonymous users get read-only access.

Source code in nautobot_app_vpn/api/permissions.py
class IsAuthenticatedOrAdmin(BasePermission):
    """Allow authenticated users to write.
    Anonymous users get read-only access.
    """

    def has_permission(self, request, view):
        if request.method in SAFE_METHODS:
            return True
        return request.user and request.user.is_authenticated

IsOwnerOrAdmin

Bases: BasePermission

Allow users to modify only objects they own. Admins have full access. Assumes obj.created_by exists; otherwise, denies write access.

Source code in nautobot_app_vpn/api/permissions.py
class IsOwnerOrAdmin(BasePermission):
    """Allow users to modify only objects they own.
    Admins have full access.
    Assumes `obj.created_by` exists; otherwise, denies write access.
    """

    def has_object_permission(self, request, view, obj):
        if request.method in SAFE_METHODS:
            return True

        # Fall back to read-only if `created_by` is not defined
        owner = getattr(obj, "created_by", None)
        if owner is None:
            return False

        return request.user.is_superuser or owner == request.user

serializers

Serializers for Nautobot VPN Plugin.

AuthenticationAlgorithmSerializer

Bases: ModelSerializer

Serializer for AuthenticationAlgorithm model.

Source code in nautobot_app_vpn/api/serializers.py
class AuthenticationAlgorithmSerializer(serializers.ModelSerializer):
    """Serializer for AuthenticationAlgorithm model."""

    display = serializers.CharField(source="label", read_only=True)

    class Meta:
        model = AuthenticationAlgorithm
        fields = ["id", "code", "label", "display"]

DiffieHellmanGroupSerializer

Bases: ModelSerializer

Serializer for DiffieHellmanGroup model.

Source code in nautobot_app_vpn/api/serializers.py
class DiffieHellmanGroupSerializer(serializers.ModelSerializer):
    """Serializer for DiffieHellmanGroup model."""

    display = serializers.CharField(source="label", read_only=True)

    class Meta:
        model = DiffieHellmanGroup
        fields = ["id", "code", "label", "display"]

DummySerializer

Bases: Serializer

Dummy serializer placeholder.

Source code in nautobot_app_vpn/api/serializers.py
class DummySerializer(serializers.Serializer):
    """Dummy serializer placeholder."""

    dummy = serializers.CharField()

    def create(self, validated_data):
        return validated_data

    def update(self, instance, validated_data):
        return validated_data

EncryptionAlgorithmSerializer

Bases: ModelSerializer

Serializer for EncryptionAlgorithm model.

Source code in nautobot_app_vpn/api/serializers.py
class EncryptionAlgorithmSerializer(serializers.ModelSerializer):
    """Serializer for EncryptionAlgorithm model."""

    display = serializers.CharField(source="label", read_only=True)

    class Meta:
        model = EncryptionAlgorithm
        fields = ["id", "code", "label", "display"]

IKECryptoSerializer

Bases: BaseModelSerializer

Serializer for IKECrypto objects.

Source code in nautobot_app_vpn/api/serializers.py
class IKECryptoSerializer(BaseModelSerializer):
    """Serializer for IKECrypto objects."""

    url = serializers.HyperlinkedIdentityField(view_name="plugins-api:nautobot_app_vpn-api:ikecrypto-detail")
    tenant_group = VPNNestedTenantGroupSerializer(read_only=True, required=False, allow_null=True)
    tenant_group_id = serializers.PrimaryKeyRelatedField(
        queryset=TenantGroup.objects.all(),
        source="tenant_group",
        write_only=True,
        required=False,
        allow_null=True,
        label="Tenant Group",
    )
    tenant = VPNNestedTenantSerializer(read_only=True, required=False, allow_null=True)
    tenant_id = serializers.PrimaryKeyRelatedField(
        queryset=Tenant.objects.all(),
        source="tenant",
        write_only=True,
        required=False,
        allow_null=True,
        label="Tenant",
    )
    status = VPNNestedStatusSerializer(required=False, allow_null=True, read_only=True)
    status_id = serializers.PrimaryKeyRelatedField(
        queryset=Status.objects.all(), source="status", write_only=True, required=False, allow_null=True, label="Status"
    )
    dh_group = serializers.PrimaryKeyRelatedField(
        queryset=DiffieHellmanGroup.objects.all(), many=True, required=False, label="Diffie-Hellman Groups"
    )
    encryption = serializers.PrimaryKeyRelatedField(
        queryset=EncryptionAlgorithm.objects.all(), many=True, required=False, label="Encryption Algorithms"
    )
    authentication = serializers.PrimaryKeyRelatedField(
        queryset=AuthenticationAlgorithm.objects.all(), many=True, required=False, label="Authentication Algorithms"
    )

    class Meta:
        model = IKECrypto
        fields = [
            "id",
            "display",
            "url",
            "name",
            "dh_group",
            "encryption",
            "authentication",
            "lifetime",
            "lifetime_unit",
            "status",
            "status_id",
            "description",
            "tenant_group",
            "tenant_group_id",
            "tenant",
            "tenant_id",
            "created",
            "last_updated",
        ]
        read_only_fields = ["id", "display", "url", "status", "created", "last_updated"]

IKEGatewaySerializer

Bases: BaseModelSerializer

Serializer for IKEGateway objects.

Source code in nautobot_app_vpn/api/serializers.py
class IKEGatewaySerializer(BaseModelSerializer):
    """Serializer for IKEGateway objects."""

    url = serializers.HyperlinkedIdentityField(view_name="plugins-api:nautobot_app_vpn-api:ikegateway-detail")

    # Read-only Nested Representations
    tenant_group = VPNNestedTenantGroupSerializer(read_only=True, required=False, allow_null=True)
    tenant = VPNNestedTenantSerializer(read_only=True, required=False, allow_null=True)
    local_devices = VPNNestedDeviceSerializer(many=True, read_only=True)
    peer_devices = VPNNestedDeviceSerializer(many=True, read_only=True, required=False)
    local_locations = VPNNestedLocationSerializer(many=True, read_only=True, required=False)
    peer_locations = VPNNestedLocationSerializer(many=True, read_only=True, required=False)
    ike_crypto_profile = VPNNestedIKECryptoSerializer(read_only=True, required=False, allow_null=True)
    status = VPNNestedStatusSerializer(read_only=True, required=False, allow_null=True)
    bind_interface = VPNNestedInterfaceSerializer(read_only=True, required=False, allow_null=True)
    local_platform = VPNNestedPlatformSerializer(read_only=True, required=False, allow_null=True)
    peer_platform = VPNNestedPlatformSerializer(read_only=True, required=False, allow_null=True)

    # Writeable Related Field Selectors
    tenant_group_id = serializers.PrimaryKeyRelatedField(
        queryset=TenantGroup.objects.all(),
        source="tenant_group",
        write_only=True,
        required=False,
        allow_null=True,
        label="Tenant Group",
    )
    tenant_id = serializers.PrimaryKeyRelatedField(
        queryset=Tenant.objects.all(),
        source="tenant",
        write_only=True,
        required=False,
        allow_null=True,
        label="Tenant",
    )
    local_device_ids = serializers.PrimaryKeyRelatedField(
        queryset=Device.objects.all(),
        source="local_devices",
        many=True,
        write_only=True,
        required=True,
        label="Local Devices (IDs)",
    )
    peer_device_ids = serializers.PrimaryKeyRelatedField(
        queryset=Device.objects.all(),
        source="peer_devices",
        many=True,
        write_only=True,
        required=False,
        label="Peer Devices (IDs)",
    )
    local_location_ids = serializers.PrimaryKeyRelatedField(
        queryset=Location.objects.all(),
        source="local_locations",
        many=True,
        write_only=True,
        required=False,
        label="Local Locations (IDs)",
    )
    peer_location_ids = serializers.PrimaryKeyRelatedField(
        queryset=Location.objects.all(),
        source="peer_locations",
        many=True,
        write_only=True,
        required=False,
        label="Peer Locations (IDs)",
    )
    ike_crypto_profile_id = serializers.PrimaryKeyRelatedField(
        queryset=IKECrypto.objects.all(),
        source="ike_crypto_profile",
        write_only=True,
        required=True,
        allow_null=False,
        label="IKE Crypto Profile",
    )
    status_id = serializers.PrimaryKeyRelatedField(
        queryset=Status.objects.all(), source="status", write_only=True, required=False, allow_null=True, label="Status"
    )

    bind_interface_id = serializers.PrimaryKeyRelatedField(
        queryset=Interface.objects.all(),
        source="bind_interface",
        write_only=True,
        required=False,
        allow_null=True,
        label="Bind Interface (ID)",
    )
    local_platform_id = serializers.PrimaryKeyRelatedField(
        queryset=Platform.objects.all(),
        source="local_platform",
        write_only=True,
        required=False,
        allow_null=True,
        label="Local Platform (ID)",
    )
    peer_platform_id = serializers.PrimaryKeyRelatedField(
        queryset=Platform.objects.all(),
        source="peer_platform",
        write_only=True,
        required=False,
        allow_null=True,
        label="Peer Platform (ID)",
    )

    # Choice Fields
    ike_version = ChoiceField(choices=IKEVersions.choices, required=False)
    exchange_mode = ChoiceField(choices=IKEExchangeModes.choices, required=False)
    local_ip_type = ChoiceField(choices=IPAddressTypes.choices, required=False)
    peer_ip_type = ChoiceField(choices=IPAddressTypes.choices, required=False)
    local_id_type = ChoiceField(choices=IdentificationTypes.choices, required=False, allow_null=True)
    peer_id_type = ChoiceField(choices=IdentificationTypes.choices, required=False, allow_null=True)
    authentication_type = ChoiceField(choices=IKEAuthenticationTypes.choices, required=True, allow_null=False)
    name = serializers.CharField(required=True, allow_blank=False)

    # Other Fields
    pre_shared_key = serializers.CharField(
        write_only=True, required=False, allow_blank=True, allow_null=True, style={"input_type": "password"}
    )

    class Meta:
        model = IKEGateway
        fields = [
            "id",
            "display",
            "url",
            "name",
            "description",
            "tenant_group",
            "tenant_group_id",
            "tenant",
            "tenant_id",
            "ike_version",
            "exchange_mode",
            "local_ip_type",
            "local_ip",
            "local_devices",
            "local_device_ids",
            "local_locations",
            "local_location_ids",
            "local_platform",
            "local_platform_id",
            "local_id_type",
            "local_id_value",
            "peer_ip_type",
            "peer_ip",
            "peer_devices",
            "peer_device_ids",
            "peer_device_manual",
            "peer_locations",
            "peer_location_ids",
            "peer_location_manual",
            "peer_platform",
            "peer_platform_id",
            "peer_id_type",
            "peer_id_value",
            "authentication_type",
            "pre_shared_key",
            "ike_crypto_profile",
            "ike_crypto_profile_id",
            "bind_interface",
            "bind_interface_id",
            "enable_passive_mode",
            "enable_nat_traversal",
            "enable_dpd",
            "dpd_interval",
            "dpd_retry",
            "liveness_check_interval",
            "status",
            "status_id",
            "last_sync",
            "created",
            "last_updated",
        ]

        read_only_fields = [
            "id",
            "display",
            "url",
            "tenant_group",
            "tenant",
            "local_devices",
            "peer_devices",
            "local_locations",
            "peer_locations",
            "ike_crypto_profile",
            "status",
            "bind_interface",
            "local_platform",
            "peer_platform",
            "created",
            "last_updated",
            "last_sync",
        ]

    def validate(self, data):  # pylint: disable=arguments-renamed
        """Custom validation for IKEGateway serializer."""
        peer_locations = data.get("peer_locations")
        peer_location_manual = data.get("peer_location_manual")
        if peer_locations and peer_location_manual:
            raise serializers.ValidationError("Specify Peer Locations *or* Manual Peer Location, not both.")
        bind_iface_id = data.get("bind_interface")
        local_device_ids = data.get("local_devices")
        if bind_iface_id and local_device_ids:
            try:
                bind_iface_obj = Interface.objects.select_related("device").get(pk=bind_iface_id.pk)
                if bind_iface_obj.device.pk not in [dev.pk for dev in local_device_ids]:
                    raise serializers.ValidationError(
                        {
                            "bind_interface_id": "Selected Bind Interface must belong to one of the selected Local Devices."
                        }
                    )
            except Interface.DoesNotExist as exc:
                raise serializers.ValidationError({"bind_interface_id": "Invalid Bind Interface selected."}) from exc
        return data
validate(data)

Custom validation for IKEGateway serializer.

Source code in nautobot_app_vpn/api/serializers.py
def validate(self, data):  # pylint: disable=arguments-renamed
    """Custom validation for IKEGateway serializer."""
    peer_locations = data.get("peer_locations")
    peer_location_manual = data.get("peer_location_manual")
    if peer_locations and peer_location_manual:
        raise serializers.ValidationError("Specify Peer Locations *or* Manual Peer Location, not both.")
    bind_iface_id = data.get("bind_interface")
    local_device_ids = data.get("local_devices")
    if bind_iface_id and local_device_ids:
        try:
            bind_iface_obj = Interface.objects.select_related("device").get(pk=bind_iface_id.pk)
            if bind_iface_obj.device.pk not in [dev.pk for dev in local_device_ids]:
                raise serializers.ValidationError(
                    {
                        "bind_interface_id": "Selected Bind Interface must belong to one of the selected Local Devices."
                    }
                )
        except Interface.DoesNotExist as exc:
            raise serializers.ValidationError({"bind_interface_id": "Invalid Bind Interface selected."}) from exc
    return data

IPSECTunnelSerializer

Bases: BaseModelSerializer

Serializer for IPSECTunnel objects.

Source code in nautobot_app_vpn/api/serializers.py
class IPSECTunnelSerializer(BaseModelSerializer):
    """Serializer for IPSECTunnel objects."""

    url = serializers.HyperlinkedIdentityField(view_name="plugins-api:nautobot_app_vpn-api:ipsectunnel-detail")

    tenant_group = VPNNestedTenantGroupSerializer(read_only=True, required=False, allow_null=True)
    tenant = VPNNestedTenantSerializer(read_only=True, required=False, allow_null=True)
    devices = VPNNestedDeviceSerializer(many=True, read_only=True)
    ike_gateway = VPNNestedIKEGatewaySerializer(read_only=True, required=False, allow_null=True)
    ipsec_crypto_profile = VPNNestedIPSecCryptoSerializer(read_only=True, required=False, allow_null=True)
    status = VPNNestedStatusSerializer(read_only=True, required=False, allow_null=True)
    tunnel_interface = VPNNestedInterfaceSerializer(read_only=True, required=False, allow_null=True)
    monitor_profile = VPNNestedTunnelMonitorProfileSerializer(read_only=True, required=False, allow_null=True)
    proxy_ids = IPSecProxyIDSerializer(many=True, read_only=True)
    role = ChoiceField(choices=TunnelRoleChoices.choices, required=False, allow_null=True)

    device_ids = serializers.PrimaryKeyRelatedField(
        queryset=Device.objects.all(),
        source="devices",
        many=True,
        write_only=True,
        required=True,
        label="Devices (IDs)",
    )
    tenant_group_id = serializers.PrimaryKeyRelatedField(
        queryset=TenantGroup.objects.all(),
        source="tenant_group",
        write_only=True,
        required=False,
        allow_null=True,
        label="Tenant Group",
    )
    tenant_id = serializers.PrimaryKeyRelatedField(
        queryset=Tenant.objects.all(),
        source="tenant",
        write_only=True,
        required=False,
        allow_null=True,
        label="Tenant",
    )
    ike_gateway_id = serializers.PrimaryKeyRelatedField(
        queryset=IKEGateway.objects.all(),
        source="ike_gateway",
        write_only=True,
        required=True,
        allow_null=False,
        label="IKE Gateway",
    )
    ipsec_crypto_profile_id = serializers.PrimaryKeyRelatedField(
        queryset=IPSecCrypto.objects.all(),
        source="ipsec_crypto_profile",
        write_only=True,
        required=True,
        allow_null=False,
        label="IPSec Crypto Profile",
    )
    status_id = serializers.PrimaryKeyRelatedField(
        queryset=Status.objects.all(), source="status", write_only=True, required=False, allow_null=True, label="Status"
    )
    tunnel_interface_id = serializers.PrimaryKeyRelatedField(
        queryset=Interface.objects.all(),
        source="tunnel_interface",
        write_only=True,
        required=True,
        allow_null=False,
        label="Tunnel Interface",
    )

    monitor_profile_id = serializers.PrimaryKeyRelatedField(
        queryset=TunnelMonitorProfile.objects.all(),
        source="monitor_profile",
        write_only=True,
        required=False,
        allow_null=True,
        label="Monitor Profile (ID)",
    )

    class Meta:
        model = IPSECTunnel
        fields = [
            "id",
            "display",
            "url",
            "name",
            "description",
            "tenant_group",
            "tenant_group_id",
            "tenant",
            "tenant_id",
            "devices",
            "device_ids",
            "ike_gateway",
            "ike_gateway_id",
            "ipsec_crypto_profile",
            "ipsec_crypto_profile_id",
            "tunnel_interface",
            "tunnel_interface_id",
            "role",
            "proxy_ids",
            "enable_tunnel_monitor",
            "monitor_destination_ip",
            "monitor_profile",
            "monitor_profile_id",
            "status",
            "status_id",
            "last_sync",
            "created",
            "last_updated",
        ]

        read_only_fields = [
            "id",
            "display",
            "url",
            "tenant_group",
            "tenant",
            "devices",
            "ike_gateway",
            "ipsec_crypto_profile",
            "status",
            "tunnel_interface",
            "monitor_profile",
            "proxy_ids",
            "created",
            "last_updated",
            "last_sync",
        ]

    def validate(self, data):  # pylint: disable=arguments-renamed
        """Custom validation for IPSECTunnel serializer."""
        monitor_enabled = data.get(
            "enable_tunnel_monitor", getattr(self.instance, "enable_tunnel_monitor", False) if self.instance else False
        )
        dest_ip = data.get(
            "monitor_destination_ip", getattr(self.instance, "monitor_destination_ip", None) if self.instance else None
        )
        profile_id_submitted = "monitor_profile_id" in data

        if monitor_enabled:
            if not dest_ip:
                raise serializers.ValidationError(
                    {"monitor_destination_ip": "Destination IP is required when tunnel monitoring is enabled."}
                )

            if profile_id_submitted and data.get("monitor_profile_id") is None:
                raise serializers.ValidationError(
                    {"monitor_profile_id": "Monitor Profile is required when tunnel monitoring is enabled."}
                )

            if not self.instance and "monitor_profile_id" not in data:
                raise serializers.ValidationError(
                    {"monitor_profile_id": "Monitor Profile is required when tunnel monitoring is enabled."}
                )

            if self.instance and profile_id_submitted and data.get("monitor_profile_id") is None and monitor_enabled:
                raise serializers.ValidationError(
                    {"monitor_profile_id": "Cannot remove Monitor Profile while tunnel monitoring is enabled."}
                )
        return data
validate(data)

Custom validation for IPSECTunnel serializer.

Source code in nautobot_app_vpn/api/serializers.py
def validate(self, data):  # pylint: disable=arguments-renamed
    """Custom validation for IPSECTunnel serializer."""
    monitor_enabled = data.get(
        "enable_tunnel_monitor", getattr(self.instance, "enable_tunnel_monitor", False) if self.instance else False
    )
    dest_ip = data.get(
        "monitor_destination_ip", getattr(self.instance, "monitor_destination_ip", None) if self.instance else None
    )
    profile_id_submitted = "monitor_profile_id" in data

    if monitor_enabled:
        if not dest_ip:
            raise serializers.ValidationError(
                {"monitor_destination_ip": "Destination IP is required when tunnel monitoring is enabled."}
            )

        if profile_id_submitted and data.get("monitor_profile_id") is None:
            raise serializers.ValidationError(
                {"monitor_profile_id": "Monitor Profile is required when tunnel monitoring is enabled."}
            )

        if not self.instance and "monitor_profile_id" not in data:
            raise serializers.ValidationError(
                {"monitor_profile_id": "Monitor Profile is required when tunnel monitoring is enabled."}
            )

        if self.instance and profile_id_submitted and data.get("monitor_profile_id") is None and monitor_enabled:
            raise serializers.ValidationError(
                {"monitor_profile_id": "Cannot remove Monitor Profile while tunnel monitoring is enabled."}
            )
    return data

IPSecCryptoSerializer

Bases: BaseModelSerializer

Serializer for IPSecCrypto objects.

Source code in nautobot_app_vpn/api/serializers.py
class IPSecCryptoSerializer(BaseModelSerializer):
    """Serializer for IPSecCrypto objects."""

    url = serializers.HyperlinkedIdentityField(view_name="plugins-api:nautobot_app_vpn-api:ipseccrypto-detail")
    tenant_group = VPNNestedTenantGroupSerializer(read_only=True, required=False, allow_null=True)
    tenant_group_id = serializers.PrimaryKeyRelatedField(
        queryset=TenantGroup.objects.all(),
        source="tenant_group",
        write_only=True,
        required=False,
        allow_null=True,
        label="Tenant Group",
    )
    tenant = VPNNestedTenantSerializer(read_only=True, required=False, allow_null=True)
    tenant_id = serializers.PrimaryKeyRelatedField(
        queryset=Tenant.objects.all(),
        source="tenant",
        write_only=True,
        required=False,
        allow_null=True,
        label="Tenant",
    )
    status = VPNNestedStatusSerializer(required=False, allow_null=True, read_only=True)
    status_id = serializers.PrimaryKeyRelatedField(
        queryset=Status.objects.all(), source="status", write_only=True, required=False, allow_null=True, label="Status"
    )
    dh_group = serializers.PrimaryKeyRelatedField(
        queryset=DiffieHellmanGroup.objects.all(), many=True, required=False, label="Diffie-Hellman Groups"
    )
    encryption = serializers.PrimaryKeyRelatedField(
        queryset=EncryptionAlgorithm.objects.all(), many=True, required=False, label="Encryption Algorithms"
    )
    authentication = serializers.PrimaryKeyRelatedField(
        queryset=AuthenticationAlgorithm.objects.all(), many=True, required=False, label="Authentication Algorithms"
    )

    class Meta:
        model = IPSecCrypto
        fields = [
            "id",
            "display",
            "url",
            "name",
            "encryption",
            "authentication",
            "dh_group",
            "protocol",
            "lifetime",
            "lifetime_unit",
            "status",
            "status_id",
            "description",
            "tenant_group",
            "tenant_group_id",
            "tenant",
            "tenant_id",
            "created",
            "last_updated",
        ]
        read_only_fields = ["id", "display", "url", "status", "created", "last_updated"]

IPSecProxyIDSerializer

Bases: BaseModelSerializer

Serializer for IPSECTunnel model.

Source code in nautobot_app_vpn/api/serializers.py
class IPSecProxyIDSerializer(BaseModelSerializer):
    """Serializer for IPSECTunnel model."""

    url = serializers.HyperlinkedIdentityField(view_name="plugins-api:nautobot_app_vpn-api:ipsecproxyid-detail")
    tunnel = VPNNestedIPSECTunnelSerializer(read_only=True)
    tunnel_id = serializers.PrimaryKeyRelatedField(
        queryset=IPSECTunnel.objects.all(),
        source="tunnel",
        write_only=True,
        required=True,
        allow_null=False,
        label="IPSec Tunnel",
    )

    class Meta:
        model = IPSecProxyID
        fields = [
            "id",
            "url",
            "display",
            "tunnel",
            "tunnel_id",
            "local_subnet",
            "remote_subnet",
            "protocol",
            "local_port",
            "remote_port",
        ]
        read_only_fields = ["id", "url", "display", "tunnel"]

TunnelMonitorProfileSerializer

Bases: BaseModelSerializer

Serializer for Tunnel Monitor Profiles.

Source code in nautobot_app_vpn/api/serializers.py
class TunnelMonitorProfileSerializer(BaseModelSerializer):
    """Serializer for Tunnel Monitor Profiles."""

    url = serializers.HyperlinkedIdentityField(view_name="plugins-api:nautobot_app_vpn-api:tunnelmonitorprofile-detail")
    action = ChoiceField(choices=TunnelMonitorActionChoices.choices, required=False)
    tenant_group = VPNNestedTenantGroupSerializer(read_only=True, required=False, allow_null=True)
    tenant_group_id = serializers.PrimaryKeyRelatedField(
        queryset=TenantGroup.objects.all(),
        source="tenant_group",
        write_only=True,
        required=False,
        allow_null=True,
        label="Tenant Group",
    )
    tenant = VPNNestedTenantSerializer(read_only=True, required=False, allow_null=True)
    tenant_id = serializers.PrimaryKeyRelatedField(
        queryset=Tenant.objects.all(),
        source="tenant",
        write_only=True,
        required=False,
        allow_null=True,
        label="Tenant",
    )

    class Meta:
        model = TunnelMonitorProfile
        fields = [
            "id",
            "display",
            "url",
            "name",
            "tenant_group",
            "tenant_group_id",
            "tenant",
            "tenant_id",
            "action",
            "interval",
            "threshold",
            "created",
            "last_updated",
        ]
        read_only_fields = ["id", "display", "url", "tenant_group", "tenant", "created", "last_updated"]

VPNDashboardSerializer

Bases: BaseModelSerializer

Serializer for VPNDashboard objects.

Source code in nautobot_app_vpn/api/serializers.py
class VPNDashboardSerializer(BaseModelSerializer):
    """Serializer for VPNDashboard objects."""

    url = serializers.HyperlinkedIdentityField(view_name="plugins-api:nautobot_app_vpn-api:vpndashboard-detail")

    class Meta:
        model = VPNDashboard
        fields = [
            "id",
            "url",
            "display",
            "name",
            "last_updated",
            "total_tunnels",
            "active_tunnels",
            "inactive_tunnels",
            "last_sync_status",
            "last_sync_time",
            "last_push_status",
            "last_push_time",
            "created",
        ]
        read_only_fields = ["id", "url", "display", "created", "last_updated"]

VPNNestedDeviceSerializer

Bases: BaseModelSerializer

Nested serializer for referencing a Device object.

Source code in nautobot_app_vpn/api/serializers.py
class VPNNestedDeviceSerializer(BaseModelSerializer):
    """Nested serializer for referencing a Device object."""

    url = serializers.HyperlinkedIdentityField(view_name="dcim-api:device-detail")

    class Meta:
        model = Device
        fields = ["id", "url", "display", "name"]

VPNNestedIKECryptoSerializer

Bases: BaseModelSerializer

Nested serializer for referencing an IKECrypto object.

Source code in nautobot_app_vpn/api/serializers.py
class VPNNestedIKECryptoSerializer(BaseModelSerializer):
    """Nested serializer for referencing an IKECrypto object."""

    url = serializers.HyperlinkedIdentityField(view_name="plugins-api:nautobot_app_vpn-api:ikecrypto-detail")

    class Meta:
        model = IKECrypto
        fields = ["id", "url", "display", "name"]

VPNNestedIKEGatewaySerializer

Bases: BaseModelSerializer

Nested serializer for referencing an IKEGateway object.

Source code in nautobot_app_vpn/api/serializers.py
class VPNNestedIKEGatewaySerializer(BaseModelSerializer):
    """Nested serializer for referencing an IKEGateway object."""

    url = serializers.HyperlinkedIdentityField(view_name="plugins-api:nautobot_app_vpn-api:ikegateway-detail")

    class Meta:
        model = IKEGateway
        fields = ["id", "url", "display", "name"]

VPNNestedIPSECTunnelSerializer

Bases: BaseModelSerializer

Minimal serializer for related IPSECTunnel objects.

Source code in nautobot_app_vpn/api/serializers.py
class VPNNestedIPSECTunnelSerializer(BaseModelSerializer):
    """Minimal serializer for related IPSECTunnel objects."""

    url = serializers.HyperlinkedIdentityField(view_name="plugins-api:nautobot_app_vpn-api:ipsectunnel-detail")

    class Meta:
        model = IPSECTunnel
        fields = ["id", "url", "display", "name"]

VPNNestedIPSecCryptoSerializer

Bases: BaseModelSerializer

Nested serializer for referencing an IPSecCrypto object.

Source code in nautobot_app_vpn/api/serializers.py
class VPNNestedIPSecCryptoSerializer(BaseModelSerializer):
    """Nested serializer for referencing an IPSecCrypto object."""

    url = serializers.HyperlinkedIdentityField(view_name="plugins-api:nautobot_app_vpn-api:ipseccrypto-detail")

    class Meta:
        model = IPSecCrypto
        fields = ["id", "url", "display", "name"]

VPNNestedInterfaceSerializer

Bases: BaseModelSerializer

Nested serializer for referencing an Interface object.

Source code in nautobot_app_vpn/api/serializers.py
class VPNNestedInterfaceSerializer(BaseModelSerializer):
    """Nested serializer for referencing an Interface object."""

    url = serializers.HyperlinkedIdentityField(view_name="dcim-api:interface-detail")
    device = VPNNestedDeviceSerializer(read_only=True)  # Show device for context

    class Meta:
        model = Interface
        fields = ["id", "url", "display", "name", "device"]  # Added device

VPNNestedLocationSerializer

Bases: BaseModelSerializer

Nested serializer for referencing a Location object.

Source code in nautobot_app_vpn/api/serializers.py
class VPNNestedLocationSerializer(BaseModelSerializer):
    """Nested serializer for referencing a Location object."""

    url = serializers.HyperlinkedIdentityField(view_name="dcim-api:location-detail")

    class Meta:
        model = Location
        fields = ["id", "url", "display", "name"]

VPNNestedPlatformSerializer

Bases: BaseModelSerializer

Nested serializer for referencing a Platform object.

Source code in nautobot_app_vpn/api/serializers.py
class VPNNestedPlatformSerializer(BaseModelSerializer):
    """Nested serializer for referencing a Platform object."""

    url = serializers.HyperlinkedIdentityField(view_name="dcim-api:platform-detail")

    class Meta:
        model = Platform
        fields = ["id", "url", "display", "name"]

VPNNestedStatusSerializer

Bases: BaseModelSerializer

Nested serializer for referencing a Status object.

Source code in nautobot_app_vpn/api/serializers.py
class VPNNestedStatusSerializer(BaseModelSerializer):
    """Nested serializer for referencing a Status object."""

    url = serializers.HyperlinkedIdentityField(view_name="extras-api:status-detail")

    class Meta:
        model = Status
        fields = ["id", "url", "display", "name", "color"]

VPNNestedTenantGroupSerializer

Bases: BaseModelSerializer

Nested serializer for referencing a Tenant Group.

Source code in nautobot_app_vpn/api/serializers.py
class VPNNestedTenantGroupSerializer(BaseModelSerializer):
    """Nested serializer for referencing a Tenant Group."""

    url = serializers.HyperlinkedIdentityField(view_name="tenancy-api:tenantgroup-detail")

    class Meta:
        model = TenantGroup
        fields = ["id", "url", "display", "name"]

VPNNestedTenantSerializer

Bases: BaseModelSerializer

Nested serializer for referencing a Tenant.

Source code in nautobot_app_vpn/api/serializers.py
class VPNNestedTenantSerializer(BaseModelSerializer):
    """Nested serializer for referencing a Tenant."""

    url = serializers.HyperlinkedIdentityField(view_name="tenancy-api:tenant-detail")

    class Meta:
        model = Tenant
        fields = ["id", "url", "display", "name"]

VPNNestedTunnelMonitorProfileSerializer

Bases: BaseModelSerializer

Minimal serializer for related TunnelMonitorProfile objects.

Source code in nautobot_app_vpn/api/serializers.py
class VPNNestedTunnelMonitorProfileSerializer(BaseModelSerializer):
    """Minimal serializer for related TunnelMonitorProfile objects."""

    url = serializers.HyperlinkedIdentityField(view_name="plugins-api:nautobot_app_vpn-api:tunnelmonitorprofile-detail")

    class Meta:
        model = TunnelMonitorProfile
        fields = ["id", "url", "display", "name"]

urls

API URL declarations for the Nautobot VPN app.

viewsets

API viewsets for the Nautobot VPN plugin.

AuthenticationAlgorithmViewSet

Bases: ReadOnlyModelViewSet

API viewset for Authentication Algorithms.

Source code in nautobot_app_vpn/api/viewsets.py
class AuthenticationAlgorithmViewSet(viewsets.ReadOnlyModelViewSet):
    """API viewset for Authentication Algorithms."""

    queryset = AuthenticationAlgorithm.objects.all()
    serializer_class = AuthenticationAlgorithmSerializer

DiffieHellmanGroupViewSet

Bases: ReadOnlyModelViewSet

API viewset for Diffie-Hellman Groups.

Source code in nautobot_app_vpn/api/viewsets.py
class DiffieHellmanGroupViewSet(viewsets.ReadOnlyModelViewSet):
    """API viewset for Diffie-Hellman Groups."""

    queryset = DiffieHellmanGroup.objects.all()
    serializer_class = DiffieHellmanGroupSerializer

EncryptionAlgorithmViewSet

Bases: ReadOnlyModelViewSet

API viewset for Encryption Algorithms.

Source code in nautobot_app_vpn/api/viewsets.py
class EncryptionAlgorithmViewSet(viewsets.ReadOnlyModelViewSet):
    """API viewset for Encryption Algorithms."""

    queryset = EncryptionAlgorithm.objects.all()
    serializer_class = EncryptionAlgorithmSerializer

IKECryptoViewSet

Bases: ModelViewSet

API endpoint for managing IKE Crypto Profiles.

Source code in nautobot_app_vpn/api/viewsets.py
class IKECryptoViewSet(viewsets.ModelViewSet):
    """API endpoint for managing IKE Crypto Profiles."""

    queryset = IKECrypto.objects.all().order_by("name")
    serializer_class = IKECryptoSerializer
    permission_classes = [IsAdminOrReadOnly]
    filter_backends = [DjangoFilterBackend, filters.OrderingFilter, filters.SearchFilter]
    filterset_class = IKECryptoFilterSet
    ordering_fields = ["name", "dh_group", "encryption", "lifetime"]
    search_fields = ["name", "dh_group", "encryption"]
    pagination_class = StandardResultsSetPagination

IKEGatewayViewSet

Bases: ModelViewSet

API viewset for IKE Gateways.

Source code in nautobot_app_vpn/api/viewsets.py
class IKEGatewayViewSet(viewsets.ModelViewSet):
    """API viewset for IKE Gateways."""

    queryset = (
        IKEGateway.objects.select_related(
            "ike_crypto_profile",
            "status",
            "bind_interface",
        )
        .prefetch_related("local_devices", "peer_devices", "local_locations", "peer_locations")
        .order_by("name")
    )

    serializer_class = IKEGatewaySerializer
    permission_classes = [IsAdminOrReadOnly]
    filter_backends = [DjangoFilterBackend, filters.OrderingFilter, filters.SearchFilter]
    filterset_class = IKEGatewayFilterSet
    ordering_fields = ["name", "local_ip", "peer_ip", "bind_interface__name"]

    search_fields = [
        "name",
        "description",
        "local_ip",
        "peer_ip",
        "peer_device_manual",
        "peer_location_manual",
        "bind_interface__name",
    ]
    pagination_class = StandardResultsSetPagination

    def perform_create(self, serializer):
        serializer.save()

    def perform_update(self, serializer):
        serializer.save()

IPSECTunnelViewSet

Bases: ModelViewSet

API viewset for IPSec Tunnels.

Source code in nautobot_app_vpn/api/viewsets.py
class IPSECTunnelViewSet(viewsets.ModelViewSet):
    """API viewset for IPSec Tunnels."""

    queryset = (
        IPSECTunnel.objects.select_related(
            "ike_gateway",
            "ipsec_crypto_profile",
            "status",
            "tunnel_interface",
            "monitor_profile",
        )
        .prefetch_related(
            "devices",
            "proxy_ids",
        )
        .order_by("name")
        .distinct()
    )

    serializer_class = IPSECTunnelSerializer
    permission_classes = [IsAdminOrReadOnly]
    filter_backends = [DjangoFilterBackend, filters.OrderingFilter, filters.SearchFilter]
    filterset_class = IPSECTunnelFilterSet

    ordering_fields = [
        "name",
        "ike_gateway__name",
        "ipsec_crypto_profile__name",
        "tunnel_interface__name",
        "status__name",
        "enable_tunnel_monitor",
        "monitor_destination_ip",
    ]
    search_fields = [
        "name",
        "description",
        "ike_gateway__name",
        "ipsec_crypto_profile__name",
        "tunnel_interface__name",
        "monitor_destination_ip",
    ]
    pagination_class = StandardResultsSetPagination

    def perform_create(self, serializer):
        serializer.save()

    def perform_update(self, serializer):
        serializer.save()

IPSecCryptoViewSet

Bases: ModelViewSet

API endpoint for managing IPSec Crypto Profiles.

Source code in nautobot_app_vpn/api/viewsets.py
class IPSecCryptoViewSet(viewsets.ModelViewSet):
    """API endpoint for managing IPSec Crypto Profiles."""

    queryset = IPSecCrypto.objects.all().order_by("name")
    serializer_class = IPSecCryptoSerializer
    permission_classes = [IsAdminOrReadOnly]
    filter_backends = [DjangoFilterBackend, filters.OrderingFilter, filters.SearchFilter]
    filterset_class = IPSecCryptoFilterSet
    ordering_fields = ["name", "encryption", "authentication", "dh_group"]
    search_fields = ["name", "encryption", "authentication"]
    pagination_class = StandardResultsSetPagination

IPSecProxyIDViewSet

Bases: ModelViewSet

API viewset for IPSec Proxy IDs.

Source code in nautobot_app_vpn/api/viewsets.py
class IPSecProxyIDViewSet(viewsets.ModelViewSet):
    """API viewset for IPSec Proxy IDs."""

    queryset = IPSecProxyID.objects.select_related("tunnel").order_by("tunnel__name")
    serializer_class = IPSecProxyIDSerializer
    permission_classes = [IsAdminOrReadOnly]
    filter_backends = [DjangoFilterBackend, filters.OrderingFilter, filters.SearchFilter]
    filterset_class = IPSecProxyIDFilterSet
    ordering_fields = ["tunnel__name", "local_subnet", "remote_subnet", "protocol"]
    search_fields = ["local_subnet", "remote_subnet", "protocol"]
    pagination_class = StandardResultsSetPagination

TunnelMonitorProfileViewSet

Bases: ModelViewSet

API viewset for Tunnel Monitor Profiles.

Source code in nautobot_app_vpn/api/viewsets.py
class TunnelMonitorProfileViewSet(viewsets.ModelViewSet):
    """API viewset for Tunnel Monitor Profiles."""

    queryset = TunnelMonitorProfile.objects.all().order_by("name")
    serializer_class = TunnelMonitorProfileSerializer
    permission_classes = [IsAdminOrReadOnly]
    filter_backends = [DjangoFilterBackend, filters.OrderingFilter, filters.SearchFilter]
    filterset_class = TunnelMonitorProfileFilterSet
    ordering_fields = ["name", "action", "interval", "threshold"]
    search_fields = ["name"]
    pagination_class = StandardResultsSetPagination

VPNTopologyFilterOptionsView

Bases: APIView

Returns arrays per filter key your UI expects: { "country": [...], "role": [...], "status": [...], "ike_version": [...], "location": [...], "device": [...], "platform": [...] } (Built from relational data; no change to IKE/IPSec logic.)

Source code in nautobot_app_vpn/api/viewsets.py
class VPNTopologyFilterOptionsView(APIView):
    """
    Returns arrays per filter key your UI expects:
    {
      "country": [...], "role": [...], "status": [...],
      "ike_version": [...], "location": [...],
      "device": [...], "platform": [...]
    }
    (Built from relational data; no change to IKE/IPSec logic.)
    """

    serializer_class = DummySerializer
    permission_classes = [IsAuthenticated]

    def _get_device_country_from_name(self, device_name):
        """Derives country from device name based on 'CODE-...' convention."""
        if device_name:
            parts = device_name.split("-")
            if parts:
                return parts[0].upper()
        return None

    def get(self, request):
        """Return available filter values derived from relational VPN data."""
        logger.debug("Filter options GET request from user %s", request.user)
        countries = set()
        ike_versions = set()
        statuses = set()
        tunnel_roles = set()
        devices = set()
        locations = set()
        platforms = set()

        tunnels_qs = IPSECTunnel.objects.select_related(
            "ike_gateway", "status", "ike_gateway__local_platform", "ike_gateway__peer_platform"
        ).prefetch_related(
            "ike_gateway__local_devices__platform",
            "ike_gateway__local_devices__location",
            "ike_gateway__local_devices__role",
            "ike_gateway__peer_devices__platform",
            "ike_gateway__peer_devices__location",
            "ike_gateway__peer_devices__role",
        )

        for tunnel in tunnels_qs:
            if tunnel.status and tunnel.status.name:
                statuses.add(tunnel.status.name)
            if tunnel.role:
                tunnel_roles.add(str(tunnel.role))

            gw = tunnel.ike_gateway
            if gw:
                if gw.ike_version:
                    ike_versions.add(str(gw.ike_version))

                for dev_group in [gw.local_devices.all(), gw.peer_devices.all()]:
                    for dev in dev_group:
                        if dev and dev.name:
                            devices.add(dev.name)
                            country = self._get_device_country_from_name(dev.name)
                            if country:
                                countries.add(country)
                        if dev and dev.location and dev.location.name:
                            locations.add(dev.location.name)
                        if dev and dev.platform and dev.platform.name:
                            platforms.add(dev.platform.name)

                # consider local/peer platforms on gateway
                for plat in [gw.local_platform, gw.peer_platform]:
                    if plat and plat.name:
                        platforms.add(plat.name)

        # also include all defined platforms
        for plat in Platform.objects.all().values("name").distinct():
            if plat["name"]:
                platforms.add(plat["name"])

        # OUTPUT KEYS match frontend expectations
        return Response(
            {
                "country": sorted(filter(None, countries)),
                "ike_version": sorted(filter(None, ike_versions)),
                "status": sorted(filter(None, statuses)),
                "role": sorted(filter(None, tunnel_roles)),
                "location": sorted(filter(None, locations)),
                "device": sorted(filter(None, devices)),
                "platform": sorted(filter(None, platforms)),
            }
        )
get(request)

Return available filter values derived from relational VPN data.

Source code in nautobot_app_vpn/api/viewsets.py
def get(self, request):
    """Return available filter values derived from relational VPN data."""
    logger.debug("Filter options GET request from user %s", request.user)
    countries = set()
    ike_versions = set()
    statuses = set()
    tunnel_roles = set()
    devices = set()
    locations = set()
    platforms = set()

    tunnels_qs = IPSECTunnel.objects.select_related(
        "ike_gateway", "status", "ike_gateway__local_platform", "ike_gateway__peer_platform"
    ).prefetch_related(
        "ike_gateway__local_devices__platform",
        "ike_gateway__local_devices__location",
        "ike_gateway__local_devices__role",
        "ike_gateway__peer_devices__platform",
        "ike_gateway__peer_devices__location",
        "ike_gateway__peer_devices__role",
    )

    for tunnel in tunnels_qs:
        if tunnel.status and tunnel.status.name:
            statuses.add(tunnel.status.name)
        if tunnel.role:
            tunnel_roles.add(str(tunnel.role))

        gw = tunnel.ike_gateway
        if gw:
            if gw.ike_version:
                ike_versions.add(str(gw.ike_version))

            for dev_group in [gw.local_devices.all(), gw.peer_devices.all()]:
                for dev in dev_group:
                    if dev and dev.name:
                        devices.add(dev.name)
                        country = self._get_device_country_from_name(dev.name)
                        if country:
                            countries.add(country)
                    if dev and dev.location and dev.location.name:
                        locations.add(dev.location.name)
                    if dev and dev.platform and dev.platform.name:
                        platforms.add(dev.platform.name)

            # consider local/peer platforms on gateway
            for plat in [gw.local_platform, gw.peer_platform]:
                if plat and plat.name:
                    platforms.add(plat.name)

    # also include all defined platforms
    for plat in Platform.objects.all().values("name").distinct():
        if plat["name"]:
            platforms.add(plat["name"])

    # OUTPUT KEYS match frontend expectations
    return Response(
        {
            "country": sorted(filter(None, countries)),
            "ike_version": sorted(filter(None, ike_versions)),
            "status": sorted(filter(None, statuses)),
            "role": sorted(filter(None, tunnel_roles)),
            "location": sorted(filter(None, locations)),
            "device": sorted(filter(None, devices)),
            "platform": sorted(filter(None, platforms)),
        }
    )

VPNTopologyNeo4jView

Bases: APIView

Returns GeoJSON for MapLibre:

{ "devices": FeatureCollection(Point), "tunnels": FeatureCollection(LineString), "stats": { "active": N, "failed": M, "planned": K, ... }, "meta": { "devices_count": int, "tunnels_count": int, "countries_count": int, "platforms_count": int, "ha_pairs": int, "last_synced": ISO8601 or null } }

Source code in nautobot_app_vpn/api/viewsets.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
class VPNTopologyNeo4jView(APIView):
    """
    Returns GeoJSON for MapLibre:

    {
      "devices": FeatureCollection(Point),
      "tunnels": FeatureCollection(LineString),
      "stats": { "active": N, "failed": M, "planned": K, ... },
      "meta":  {
        "devices_count": int,
        "tunnels_count": int,
        "countries_count": int,
        "platforms_count": int,
        "ha_pairs": int,
        "last_synced": ISO8601 or null
      }
    }
    """

    serializer_class = DummySerializer
    permission_classes = [IsAuthenticated]

    def _build_node_where(self, params_in, qp_out):
        where = []
        if params_in.get("country"):
            where.append("toLower(n.country) = toLower($country)")
            qp_out["country"] = params_in["country"]

        if params_in.get("platform"):
            where.append("toLower(n.platform_name) CONTAINS toLower($platform)")
            qp_out["platform"] = params_in["platform"]

        if params_in.get("location"):
            where.append("toLower(n.location_name) CONTAINS toLower($location)")
            qp_out["location"] = params_in["location"]

        if params_in.get("device"):
            val = str(params_in["device"]).strip()
            # Guard against null lists with coalesce()
            where.append(
                "("
                "toLower($device_name) IN [dev IN coalesce(n.device_names, []) | toLower(dev)] "
                "OR $device_name IN coalesce(n.nautobot_device_pks, []) "
                "OR toLower(n.label) CONTAINS toLower($device_name)"
                ")"
            )
            qp_out["device_name"] = val

        if params_in.get("role"):
            where.append("toLower(n.role) = toLower($node_role)")
            qp_out["node_role"] = params_in["role"]

        return where

    def _build_edge_filter(self, params_in, qp_out):
        conds = []
        if params_in.get("status"):
            conds.append("toLower(r.status) = toLower($tunnel_status)")
            qp_out["tunnel_status"] = params_in["status"]

        if params_in.get("ike_version"):
            conds.append("toLower(r.ike_version) = toLower($ike_version)")
            qp_out["ike_version"] = params_in["ike_version"]

        if params_in.get("role"):
            conds.append("toLower(r.role) = toLower($tunnel_role)")
            qp_out["tunnel_role"] = params_in["role"]

        return conds

    def get(self, request):
        """Return VPN topology GeoJSON and summary metadata sourced from Neo4j."""
        logger.info("Neo4j VPN Topology GET request from user %s with filters: %s", request.user, request.GET.dict())

        # Settings check
        for attr in ("NEO4J_URI", "NEO4J_USER", "NEO4J_PASSWORD"):
            if not hasattr(settings, attr):
                logger.error("Neo4j connection settings are not fully configured in Nautobot settings.")
                return Response({"error": "Graph database service is not configured."}, status=503)

        driver = None
        try:
            driver = _neo4j_driver()
            driver.verify_connectivity()
        except Exception as exc:  # pylint: disable=broad-exception-caught
            logger.error("Failed to connect to Neo4j for topology view: %s", exc, exc_info=True)
            return Response({"error": "Could not connect to graph database."}, status=503)

        params_in = request.GET.dict()

        # --- Relational statistics for ribbon summary ---
        tracked_status = [
            ("active", "Active"),
            ("down", "Down"),
            ("decommissioned", "Decommissioned"),
            ("disabled", "Disabled"),
            ("planned", "Planned"),
        ]
        status_counts = {slug: 0 for slug, _ in tracked_status}
        status_labels = dict(tracked_status)
        status_order = [slug for slug, _ in tracked_status]

        role_labels = {
            "primary": "Primary",
            "secondary": "Secondary",
            "tertiary": "Tertiary",
            "unassigned": "Unassigned",
        }
        role_counts = {key: 0 for key in role_labels}
        role_order = ["primary", "secondary", "tertiary"]
        total_tunnels = 0

        try:
            tunnels_qs = IPSECTunnel.objects.restrict(request.user, "view")

            status_filter = (params_in.get("status") or "").strip()
            role_filter = (params_in.get("role") or "").strip()

            status_model = apps.get_model("extras", "Status")
            status_fields = {f.name for f in status_model._meta.get_fields()}
            has_status_slug = "slug" in status_fields

            if status_filter:
                status_lookup = Q(status__name__iexact=status_filter)
                if has_status_slug:
                    status_lookup |= Q(status__slug__iexact=status_filter)
                tunnels_qs = tunnels_qs.filter(status_lookup)

            if role_filter:
                tunnels_qs = tunnels_qs.filter(role__iexact=role_filter)

            status_value_fields = ["status__name"]
            if has_status_slug:
                status_value_fields.append("status__slug")

            for row in tunnels_qs.values(*status_value_fields).annotate(total=Count("id")):
                status_name = (row.get("status__name") or "").strip()
                raw_key = row.get("status__slug") if has_status_slug else status_name
                slug_key = (raw_key or status_name or "unknown").strip().lower().replace(" ", "-")
                if not slug_key:
                    slug_key = "unknown"
                status_counts[slug_key] = row["total"]
                status_labels[slug_key] = status_name or status_labels.get(slug_key, slug_key.title())
                if slug_key not in status_order:
                    status_order.append(slug_key)

            for row in tunnels_qs.values("role").annotate(total=Count("id")):
                role_value = (row["role"] or "unassigned").lower()
                role_counts[role_value] = row["total"]
                if role_value not in role_order:
                    role_order.append(role_value)

            total_tunnels = tunnels_qs.count()
        except (DatabaseError, LookupError) as agg_exc:
            logger.error("Failed to compute relational tunnel statistics: %s", agg_exc, exc_info=True)

        qp = {}

        node_where = self._build_node_where(params_in, qp)
        node_query = "MATCH (n:VPNNode)"
        if node_where:
            node_query += " WHERE " + " AND ".join(node_where)
        node_query += " RETURN n"

        try:
            devices_fc = {"type": "FeatureCollection", "features": []}
            tunnels_fc = {"type": "FeatureCollection", "features": []}

            with driver.session(database=getattr(settings, "NEO4J_DATABASE", "neo4j")) as session:
                # ---- Nodes
                logger.debug("Node query: %s params=%s", node_query, qp)
                node_records = session.run(node_query, qp)

                node_ids = set()
                for rec in node_records:
                    nprops = dict(rec["n"])
                    node_id = nprops.get("id")
                    if not node_id:
                        continue
                    # accept several possible coord keys
                    lat = nprops.get("lat", nprops.get("latitude"))
                    lon = nprops.get("lon", nprops.get("longitude"))
                    if lat is None or lon is None:
                        # skip nodes without geo
                        continue

                    node_ids.add(node_id)
                    props = {
                        "id": node_id,
                        "name": nprops.get("name") or nprops.get("label") or "",
                        "status": nprops.get("status") or "unknown",
                        "role": nprops.get("role"),
                        "platform": nprops.get("platform_name"),
                        "country": nprops.get("country"),
                        "location": nprops.get("location_name"),
                        "is_ha_pair": bool(nprops.get("is_ha_pair")),
                        # Include backing device info to make device filter work with HA groups
                        "device_names": nprops.get("device_names") or [],
                        "nautobot_device_pks": nprops.get("nautobot_device_pks") or [],
                        "search_text": " ".join(
                            str(x)
                            for x in [
                                nprops.get("name") or nprops.get("label"),
                                nprops.get("role"),
                                nprops.get("platform_name"),
                                nprops.get("country"),
                                nprops.get("location_name"),
                            ]
                            if x
                        ),
                    }

                    devices_fc["features"].append(
                        {
                            "type": "Feature",
                            "geometry": {"type": "Point", "coordinates": [float(lon), float(lat)]},
                            "properties": props,
                        }
                    )

                # ---- Edges (include peers even if they don't match the node filters) ----
                edge_qp = {}
                edge_conds = self._build_edge_filter(params_in, edge_qp)

                base = (
                    "MATCH (a:VPNNode)-[r:TUNNEL]->(b:VPNNode) "
                    "WHERE a.lat IS NOT NULL AND a.lon IS NOT NULL AND b.lat IS NOT NULL AND b.lon IS NOT NULL "
                )
                if node_ids:
                    base += "AND (a.id IN $node_ids OR b.id IN $node_ids) "
                    edge_qp["node_ids"] = list(node_ids)
                if edge_conds:
                    base += "AND " + " AND ".join(edge_conds) + " "
                edge_query = base + "RETURN a AS a, b AS b, r AS r"

                logger.debug("Edge query: %s params=%s", edge_query, edge_qp)
                nodes_map = {f["properties"]["id"]: f for f in devices_fc["features"]}
                for rec in session.run(edge_query, edge_qp):
                    aprops = dict(rec["a"])  # node a properties
                    bprops = dict(rec["b"])  # node b properties
                    rprops = dict(rec["r"])  # relationship properties

                    # Ensure endpoints exist in devices_fc
                    for np in (aprops, bprops):
                        nid = np.get("id")
                        if not nid or nid in nodes_map:
                            continue
                        lat = np.get("lat")
                        if lat is None:
                            lat = np.get("latitude")
                        lon = np.get("lon")
                        if lon is None:
                            lon = np.get("longitude")
                        if lat is None or lon is None:
                            continue
                        props = {
                            "id": nid,
                            "name": np.get("name") or np.get("label") or "",
                            "status": np.get("status") or "unknown",
                            "role": np.get("role"),
                            "platform": np.get("platform_name"),
                            "country": np.get("country"),
                            "location": np.get("location_name"),
                            "is_ha_pair": bool(np.get("is_ha_pair")),
                            "device_names": np.get("device_names") or [],
                            "nautobot_device_pks": np.get("nautobot_device_pks") or [],
                            "search_text": " ".join(
                                str(x)
                                for x in [
                                    np.get("name") or np.get("label"),
                                    np.get("role"),
                                    np.get("platform_name"),
                                    np.get("country"),
                                    np.get("location_name"),
                                ]
                                if x
                            ),
                        }
                        feat = {
                            "type": "Feature",
                            "geometry": {
                                "type": "Point",
                                "coordinates": [float(lon), float(lat)],
                            },
                            "properties": props,
                        }
                        devices_fc["features"].append(feat)
                        nodes_map[nid] = feat

                    # Add tunnel feature
                    a_lon = aprops.get("lon") if aprops.get("lon") is not None else aprops.get("longitude")
                    a_lat = aprops.get("lat") if aprops.get("lat") is not None else aprops.get("latitude")
                    b_lon = bprops.get("lon") if bprops.get("lon") is not None else bprops.get("longitude")
                    b_lat = bprops.get("lat") if bprops.get("lat") is not None else bprops.get("latitude")
                    if a_lon is None or a_lat is None or b_lon is None or b_lat is None:
                        continue
                    tunnels_fc["features"].append(
                        {
                            "type": "Feature",
                            "geometry": {
                                "type": "LineString",
                                "coordinates": [
                                    [float(a_lon), float(a_lat)],
                                    [float(b_lon), float(b_lat)],
                                ],
                            },
                            "properties": {
                                "name": rprops.get("label") or rprops.get("id") or "",
                                "status": rprops.get("status") or "unknown",
                                "role": rprops.get("role") or "",
                                "ike_version": rprops.get("ike_version") or "",
                                "scope": rprops.get("scope") or "",
                                "local_ip": rprops.get("local_ip") or "",
                                "peer_ip": rprops.get("peer_ip") or "",
                                "firewall_hostnames": rprops.get("firewall_hostnames") or "",
                                "tooltip": rprops.get("tooltip_details_json") or rprops.get("tooltip") or "",
                            },
                        }
                    )

            # ---- Stats (by device status) ----
            stats = {}
            for f in devices_fc["features"]:
                s = (f["properties"].get("status") or "unknown").lower()
                stats[s] = stats.get(s, 0) + 1

            # ---- Meta for ribbon ----
            countries = set()
            platforms = set()
            ha_pairs = 0
            for f in devices_fc["features"]:
                p = f["properties"] or {}
                if p.get("country"):
                    countries.add(p["country"])
                if p.get("platform"):
                    platforms.add(p["platform"])
                if p.get("is_ha_pair"):
                    ha_pairs += 1

            last_synced_iso = None
            last_sync_status = None
            try:
                dash = VPNDashboard.objects.filter(pk=1).only("last_sync_time", "last_sync_status").first()
                if dash and dash.last_sync_time:
                    last_synced_iso = dash.last_sync_time.isoformat()
                if dash and dash.last_sync_status:
                    last_sync_status = dash.last_sync_status
            except DatabaseError as db_error:
                logger.debug(
                    "Unable to load VPNDashboard sync metadata due to database error: %s",
                    db_error,
                    exc_info=True,
                )

            meta = {
                "devices_count": len(devices_fc["features"]),
                "tunnels_count": len(tunnels_fc["features"]),
                "countries_count": len(countries),
                "platforms_count": len(platforms),
                "ha_pairs": ha_pairs,
                "last_synced": last_synced_iso,
                "last_sync_status": last_sync_status,
                "status_counts": status_counts,
                "status_labels": status_labels,
                "status_order": status_order,
                "role_counts": role_counts,
                "role_labels": role_labels,
                "role_order": role_order,
                "total_tunnels": total_tunnels,
                "total_primary_tunnels": role_counts.get("primary", 0),
                "total_secondary_tunnels": role_counts.get("secondary", 0),
                "total_tertiary_tunnels": role_counts.get("tertiary", 0),
                "total_unassigned_tunnels": role_counts.get("unassigned", 0),
            }

            return Response({"devices": devices_fc, "tunnels": tunnels_fc, "stats": stats, "meta": meta})

        except neo4j_exceptions.CypherSyntaxError as e:  # pylint: disable=broad-exception-caught
            logger.error("Neo4j Cypher Syntax Error in VPNTopologyNeo4jView: %s", e, exc_info=True)
            return Response({"error": "Error querying graph database (query syntax problem)."}, status=500)
        except neo4j_exceptions.ServiceUnavailable:
            logger.error("Neo4j Service Unavailable during VPN topology query.", exc_info=True)
            return Response({"error": "Graph database service unavailable during query."}, status=503)
        except Exception as exc:  # pylint: disable=broad-exception-caught
            logger.error("Error querying or processing data from Neo4j in VPNTopologyNeo4jView: %s", exc, exc_info=True)
            return Response({"error": "Could not retrieve topology data from graph database."}, status=500)
        finally:
            if driver:
                driver.close()
get(request)

Return VPN topology GeoJSON and summary metadata sourced from Neo4j.

Source code in nautobot_app_vpn/api/viewsets.py
def get(self, request):
    """Return VPN topology GeoJSON and summary metadata sourced from Neo4j."""
    logger.info("Neo4j VPN Topology GET request from user %s with filters: %s", request.user, request.GET.dict())

    # Settings check
    for attr in ("NEO4J_URI", "NEO4J_USER", "NEO4J_PASSWORD"):
        if not hasattr(settings, attr):
            logger.error("Neo4j connection settings are not fully configured in Nautobot settings.")
            return Response({"error": "Graph database service is not configured."}, status=503)

    driver = None
    try:
        driver = _neo4j_driver()
        driver.verify_connectivity()
    except Exception as exc:  # pylint: disable=broad-exception-caught
        logger.error("Failed to connect to Neo4j for topology view: %s", exc, exc_info=True)
        return Response({"error": "Could not connect to graph database."}, status=503)

    params_in = request.GET.dict()

    # --- Relational statistics for ribbon summary ---
    tracked_status = [
        ("active", "Active"),
        ("down", "Down"),
        ("decommissioned", "Decommissioned"),
        ("disabled", "Disabled"),
        ("planned", "Planned"),
    ]
    status_counts = {slug: 0 for slug, _ in tracked_status}
    status_labels = dict(tracked_status)
    status_order = [slug for slug, _ in tracked_status]

    role_labels = {
        "primary": "Primary",
        "secondary": "Secondary",
        "tertiary": "Tertiary",
        "unassigned": "Unassigned",
    }
    role_counts = {key: 0 for key in role_labels}
    role_order = ["primary", "secondary", "tertiary"]
    total_tunnels = 0

    try:
        tunnels_qs = IPSECTunnel.objects.restrict(request.user, "view")

        status_filter = (params_in.get("status") or "").strip()
        role_filter = (params_in.get("role") or "").strip()

        status_model = apps.get_model("extras", "Status")
        status_fields = {f.name for f in status_model._meta.get_fields()}
        has_status_slug = "slug" in status_fields

        if status_filter:
            status_lookup = Q(status__name__iexact=status_filter)
            if has_status_slug:
                status_lookup |= Q(status__slug__iexact=status_filter)
            tunnels_qs = tunnels_qs.filter(status_lookup)

        if role_filter:
            tunnels_qs = tunnels_qs.filter(role__iexact=role_filter)

        status_value_fields = ["status__name"]
        if has_status_slug:
            status_value_fields.append("status__slug")

        for row in tunnels_qs.values(*status_value_fields).annotate(total=Count("id")):
            status_name = (row.get("status__name") or "").strip()
            raw_key = row.get("status__slug") if has_status_slug else status_name
            slug_key = (raw_key or status_name or "unknown").strip().lower().replace(" ", "-")
            if not slug_key:
                slug_key = "unknown"
            status_counts[slug_key] = row["total"]
            status_labels[slug_key] = status_name or status_labels.get(slug_key, slug_key.title())
            if slug_key not in status_order:
                status_order.append(slug_key)

        for row in tunnels_qs.values("role").annotate(total=Count("id")):
            role_value = (row["role"] or "unassigned").lower()
            role_counts[role_value] = row["total"]
            if role_value not in role_order:
                role_order.append(role_value)

        total_tunnels = tunnels_qs.count()
    except (DatabaseError, LookupError) as agg_exc:
        logger.error("Failed to compute relational tunnel statistics: %s", agg_exc, exc_info=True)

    qp = {}

    node_where = self._build_node_where(params_in, qp)
    node_query = "MATCH (n:VPNNode)"
    if node_where:
        node_query += " WHERE " + " AND ".join(node_where)
    node_query += " RETURN n"

    try:
        devices_fc = {"type": "FeatureCollection", "features": []}
        tunnels_fc = {"type": "FeatureCollection", "features": []}

        with driver.session(database=getattr(settings, "NEO4J_DATABASE", "neo4j")) as session:
            # ---- Nodes
            logger.debug("Node query: %s params=%s", node_query, qp)
            node_records = session.run(node_query, qp)

            node_ids = set()
            for rec in node_records:
                nprops = dict(rec["n"])
                node_id = nprops.get("id")
                if not node_id:
                    continue
                # accept several possible coord keys
                lat = nprops.get("lat", nprops.get("latitude"))
                lon = nprops.get("lon", nprops.get("longitude"))
                if lat is None or lon is None:
                    # skip nodes without geo
                    continue

                node_ids.add(node_id)
                props = {
                    "id": node_id,
                    "name": nprops.get("name") or nprops.get("label") or "",
                    "status": nprops.get("status") or "unknown",
                    "role": nprops.get("role"),
                    "platform": nprops.get("platform_name"),
                    "country": nprops.get("country"),
                    "location": nprops.get("location_name"),
                    "is_ha_pair": bool(nprops.get("is_ha_pair")),
                    # Include backing device info to make device filter work with HA groups
                    "device_names": nprops.get("device_names") or [],
                    "nautobot_device_pks": nprops.get("nautobot_device_pks") or [],
                    "search_text": " ".join(
                        str(x)
                        for x in [
                            nprops.get("name") or nprops.get("label"),
                            nprops.get("role"),
                            nprops.get("platform_name"),
                            nprops.get("country"),
                            nprops.get("location_name"),
                        ]
                        if x
                    ),
                }

                devices_fc["features"].append(
                    {
                        "type": "Feature",
                        "geometry": {"type": "Point", "coordinates": [float(lon), float(lat)]},
                        "properties": props,
                    }
                )

            # ---- Edges (include peers even if they don't match the node filters) ----
            edge_qp = {}
            edge_conds = self._build_edge_filter(params_in, edge_qp)

            base = (
                "MATCH (a:VPNNode)-[r:TUNNEL]->(b:VPNNode) "
                "WHERE a.lat IS NOT NULL AND a.lon IS NOT NULL AND b.lat IS NOT NULL AND b.lon IS NOT NULL "
            )
            if node_ids:
                base += "AND (a.id IN $node_ids OR b.id IN $node_ids) "
                edge_qp["node_ids"] = list(node_ids)
            if edge_conds:
                base += "AND " + " AND ".join(edge_conds) + " "
            edge_query = base + "RETURN a AS a, b AS b, r AS r"

            logger.debug("Edge query: %s params=%s", edge_query, edge_qp)
            nodes_map = {f["properties"]["id"]: f for f in devices_fc["features"]}
            for rec in session.run(edge_query, edge_qp):
                aprops = dict(rec["a"])  # node a properties
                bprops = dict(rec["b"])  # node b properties
                rprops = dict(rec["r"])  # relationship properties

                # Ensure endpoints exist in devices_fc
                for np in (aprops, bprops):
                    nid = np.get("id")
                    if not nid or nid in nodes_map:
                        continue
                    lat = np.get("lat")
                    if lat is None:
                        lat = np.get("latitude")
                    lon = np.get("lon")
                    if lon is None:
                        lon = np.get("longitude")
                    if lat is None or lon is None:
                        continue
                    props = {
                        "id": nid,
                        "name": np.get("name") or np.get("label") or "",
                        "status": np.get("status") or "unknown",
                        "role": np.get("role"),
                        "platform": np.get("platform_name"),
                        "country": np.get("country"),
                        "location": np.get("location_name"),
                        "is_ha_pair": bool(np.get("is_ha_pair")),
                        "device_names": np.get("device_names") or [],
                        "nautobot_device_pks": np.get("nautobot_device_pks") or [],
                        "search_text": " ".join(
                            str(x)
                            for x in [
                                np.get("name") or np.get("label"),
                                np.get("role"),
                                np.get("platform_name"),
                                np.get("country"),
                                np.get("location_name"),
                            ]
                            if x
                        ),
                    }
                    feat = {
                        "type": "Feature",
                        "geometry": {
                            "type": "Point",
                            "coordinates": [float(lon), float(lat)],
                        },
                        "properties": props,
                    }
                    devices_fc["features"].append(feat)
                    nodes_map[nid] = feat

                # Add tunnel feature
                a_lon = aprops.get("lon") if aprops.get("lon") is not None else aprops.get("longitude")
                a_lat = aprops.get("lat") if aprops.get("lat") is not None else aprops.get("latitude")
                b_lon = bprops.get("lon") if bprops.get("lon") is not None else bprops.get("longitude")
                b_lat = bprops.get("lat") if bprops.get("lat") is not None else bprops.get("latitude")
                if a_lon is None or a_lat is None or b_lon is None or b_lat is None:
                    continue
                tunnels_fc["features"].append(
                    {
                        "type": "Feature",
                        "geometry": {
                            "type": "LineString",
                            "coordinates": [
                                [float(a_lon), float(a_lat)],
                                [float(b_lon), float(b_lat)],
                            ],
                        },
                        "properties": {
                            "name": rprops.get("label") or rprops.get("id") or "",
                            "status": rprops.get("status") or "unknown",
                            "role": rprops.get("role") or "",
                            "ike_version": rprops.get("ike_version") or "",
                            "scope": rprops.get("scope") or "",
                            "local_ip": rprops.get("local_ip") or "",
                            "peer_ip": rprops.get("peer_ip") or "",
                            "firewall_hostnames": rprops.get("firewall_hostnames") or "",
                            "tooltip": rprops.get("tooltip_details_json") or rprops.get("tooltip") or "",
                        },
                    }
                )

        # ---- Stats (by device status) ----
        stats = {}
        for f in devices_fc["features"]:
            s = (f["properties"].get("status") or "unknown").lower()
            stats[s] = stats.get(s, 0) + 1

        # ---- Meta for ribbon ----
        countries = set()
        platforms = set()
        ha_pairs = 0
        for f in devices_fc["features"]:
            p = f["properties"] or {}
            if p.get("country"):
                countries.add(p["country"])
            if p.get("platform"):
                platforms.add(p["platform"])
            if p.get("is_ha_pair"):
                ha_pairs += 1

        last_synced_iso = None
        last_sync_status = None
        try:
            dash = VPNDashboard.objects.filter(pk=1).only("last_sync_time", "last_sync_status").first()
            if dash and dash.last_sync_time:
                last_synced_iso = dash.last_sync_time.isoformat()
            if dash and dash.last_sync_status:
                last_sync_status = dash.last_sync_status
        except DatabaseError as db_error:
            logger.debug(
                "Unable to load VPNDashboard sync metadata due to database error: %s",
                db_error,
                exc_info=True,
            )

        meta = {
            "devices_count": len(devices_fc["features"]),
            "tunnels_count": len(tunnels_fc["features"]),
            "countries_count": len(countries),
            "platforms_count": len(platforms),
            "ha_pairs": ha_pairs,
            "last_synced": last_synced_iso,
            "last_sync_status": last_sync_status,
            "status_counts": status_counts,
            "status_labels": status_labels,
            "status_order": status_order,
            "role_counts": role_counts,
            "role_labels": role_labels,
            "role_order": role_order,
            "total_tunnels": total_tunnels,
            "total_primary_tunnels": role_counts.get("primary", 0),
            "total_secondary_tunnels": role_counts.get("secondary", 0),
            "total_tertiary_tunnels": role_counts.get("tertiary", 0),
            "total_unassigned_tunnels": role_counts.get("unassigned", 0),
        }

        return Response({"devices": devices_fc, "tunnels": tunnels_fc, "stats": stats, "meta": meta})

    except neo4j_exceptions.CypherSyntaxError as e:  # pylint: disable=broad-exception-caught
        logger.error("Neo4j Cypher Syntax Error in VPNTopologyNeo4jView: %s", e, exc_info=True)
        return Response({"error": "Error querying graph database (query syntax problem)."}, status=500)
    except neo4j_exceptions.ServiceUnavailable:
        logger.error("Neo4j Service Unavailable during VPN topology query.", exc_info=True)
        return Response({"error": "Graph database service unavailable during query."}, status=503)
    except Exception as exc:  # pylint: disable=broad-exception-caught
        logger.error("Error querying or processing data from Neo4j in VPNTopologyNeo4jView: %s", exc, exc_info=True)
        return Response({"error": "Could not retrieve topology data from graph database."}, status=500)
    finally:
        if driver:
            driver.close()

latlon_to_xy(lat, lon, svg_width=2754, svg_height=1398)

Map latitude and longitude to SVG x, y coordinates (equirectangular).

Source code in nautobot_app_vpn/api/viewsets.py
def latlon_to_xy(lat, lon, svg_width=2754, svg_height=1398):
    """Map latitude and longitude to SVG x, y coordinates (equirectangular)."""
    x = (lon + 180) * (svg_width / 360.0)
    y = (90 - lat) * (svg_height / 180.0)
    return x, y