Source code for py_crispr_analyser.align

# Copyright (C) 2025-2026 Genome Research Ltd.

import getopt
import numpy as np
from numba import jit, prange, cuda
import sys

from .utils import (
    ERROR_STR,
    FILE_VERSION,
    HEADER_SIZE,
    METADATA_SIZE,
    check_file_header,
    get_guides,
    get_file_metadata,
    print_metadata,
)

MAX_MISSMATCHES = 5
MAX_OFF_TARGETS = 2000
PAM_ON = np.left_shift(1, 40, dtype=np.uint64)
PAM_OFF = np.invert(PAM_ON, dtype=np.uint64)


@cuda.jit
def find_off_targets_kernel(
    guides: np.ndarray,
    query_sequence: np.uint64,
    reverse_query_sequence: np.uint64,
    summary: np.ndarray,
    off_target_ids_idx: np.ndarray,
    off_target_ids: np.ndarray,
    offset: np.uint64,
) -> None:
    """Find off-targets for a given query sequence using CUDA

    :param guides: The array of encoded gRNA sequences
    :param query_sequence: The query sequence
    :param reverse_query_sequence: The reverse complement of the query sequence
    :param summary: The array to store the results
    :param off_target_ids_idx: The index for the off-target_ids array
    :param off_target_ids: The array to store the off-target ids
    :param offset: The offset of the guides default 0
    :return: None
    """
    index = cuda.grid(1)
    threads_per_grid = cuda.gridDim.x * cuda.blockDim.x

    for i in range(index, guides.size, threads_per_grid):
        if index < guides.size:
            if guides[i] == ERROR_STR:
                continue
            match = query_sequence ^ guides[i]
            if match & PAM_ON:
                match = reverse_query_sequence ^ guides[i]
            match = match & PAM_OFF
            match = (match | (match >> 1)) & 0x5555555555555555
            nos_off_targets = cuda.libdevice.popcll(match)
            if nos_off_targets < MAX_MISSMATCHES:
                cuda.atomic.add(summary, nos_off_targets, 1)
                idx = cuda.atomic.add(off_target_ids_idx, 0, 1)
                cuda.atomic.add(off_target_ids, idx, offset + i + 1)


@jit(nopython=True, parallel=True)
def find_off_targets_cpu(
    guides: np.ndarray,
    query_sequence: np.uint64,
    reverse_query_sequence: np.uint64,
    summary: np.ndarray,
    off_target_ids_idx: np.ndarray,
    off_target_ids: np.ndarray,
    offset: np.uint64,
) -> None:
    """Find off-targets for a given query sequence using parallel CPU

    :param guides: The array of encoded gRNA sequences
    :param query_sequence: The query sequence
    :param reverse_query_sequence: The reverse complement of the query sequence
    :param summary: The array to store the mismatch count summary
    :param off_target_ids_idx: Single-element array holding the next write index
    :param off_target_ids: The array to store the off-target ids
    :param offset: The offset of the guides, default 0
    :return: None
    """
    match_counts = np.full(guides.size, np.int64(MAX_MISSMATCHES), dtype=np.int64)

    for i in prange(guides.size):
        if guides[i] == ERROR_STR:
            continue
        match = query_sequence ^ guides[i]
        if match & PAM_ON:
            match_r = reverse_query_sequence ^ guides[i]
            nos_off_targets = _pop_count(match_r & PAM_OFF)
        else:
            nos_off_targets = _pop_count(match & PAM_OFF)
        if nos_off_targets < MAX_MISSMATCHES:
            match_counts[i] = np.int64(nos_off_targets)

    for i in range(guides.size):
        mc = match_counts[i]
        if mc < MAX_MISSMATCHES:
            summary[mc] += 1
            idx = off_target_ids_idx[0]
            off_target_ids_idx[0] += 1
            off_target_ids[idx] = offset + np.uint64(i) + np.uint64(1)


@jit
def find_off_targets(
    guides: np.ndarray,
    query_sequence: np.uint64,
    reverse_query_sequence: np.uint64,
    offset: np.uint64 = np.uint64(0),
) -> tuple[list[int], list[np.uint64]]:
    """Find off-targets for a given query sequence using the CPU

    :param guides: The array of encoded gRNA sequences
    :param query_sequence: The query sequence
    :param reverse_query_sequence: The reverse complement of the query sequence
    :param offset: The offset of the guides default 0
    :return: A tuple containing a summary and a list of off-target ids

    .. deprecated:: 1.0.4
       :func:`find_off_targets_cpu` is far more performant.
    """
    summary = [0] * MAX_MISSMATCHES
    off_target_ids = []
    for i, guide in enumerate(guides):
        if guide == ERROR_STR:
            continue
        match = query_sequence ^ guide
        if match & PAM_ON:
            match_r = reverse_query_sequence ^ guide
            match_count = _pop_count(match_r & PAM_OFF)
        else:
            match_count = _pop_count(match & PAM_OFF)
        if match_count < MAX_MISSMATCHES:
            summary[match_count] += 1
            off_target_ids.append(offset + i + 1)
    return summary, off_target_ids


@jit
def _pop_count(x: np.uint64) -> np.uint64:
    """Count bits in integer accounting for encoding
    as everything is two bit we must convert them all to one bit,
    to do this we must turn off all MSBs, but before we can do that
    we need to ensure that when an MSB is set to 1, the LSB is also set.
    the 4 will change pam_right to 0 so it doesnt get counted.
    5 is 0101, 4 is 0100

    :param x: The integer to count the bits of
    :return: The number of bits set in the integer
    """
    x = np.uint64((x | (x >> 1)) & 0x5555555555555555)
    x = (x & 0x3333333333333333) + ((x >> 2) & 0x3333333333333333) # type: ignore
    x = (x + (x >> 4)) & 0x0F0F0F0F0F0F0F0F
    return (0x0101010101010101 * x) >> 56 # type: ignore


[docs] def reverse_complement_binary(sequence: np.uint64, size: int) -> np.uint64: """Reverse complement a binary sequence :param sequence: The binary sequence to reverse complement :param size: The size of the sequence :return: The reverse complemented binary sequence """ mask = np.uint64(0xFFFFFFFFFFFFFFFF >> (63 - (size * 2))) sequence = ~sequence & mask reversed = sequence >> (size * 2) shift = 0 for _ in range(size): reversed <<= 2 reversed |= (sequence >> shift) & 0x3 shift += 2 return np.uint64(reversed)
[docs] def run(argv=sys.argv[1:]) -> None: """Run the align command to find off-targets for CRISPRs""" inputfile = "" use_cuda = True def usage() -> None: print( """Usage: crispr_analyser_align [options...] [ids...] -h, --help Print this help message -i, --ifile <file> The input binary guides file --no-cuda Do not use CUDA GPU acceleration [ids...] The ids of the CRISPRs to find off-targets for """ ) try: opts, args = getopt.getopt( argv, "hi:", [ "help", "ifile=", "no-cuda", ], ) except getopt.GetoptError as err: print(err) usage() sys.exit(2) for opt, arg in opts: if opt in ("-h", "--help"): usage() sys.exit() elif opt in ("-i", "--ifile"): inputfile = arg elif opt == "--no-cuda": use_cuda = False if inputfile == "" or len(args) == 0: usage() sys.exit(2) with open(inputfile, "rb") as in_file: check_file_header(in_file.read(HEADER_SIZE)) print(f"Version is {FILE_VERSION}", file=sys.stderr) metadata = get_file_metadata(in_file.read(METADATA_SIZE)) print_metadata(metadata) guides = get_guides(in_file, verbose=True) print("Searching for off targets", file=sys.stderr) if use_cuda & cuda.is_available(): memory_required = guides.size * 8 / 1024 / 1024 print( f"Requires {memory_required} MB of GPU memory", file=sys.stderr, ) device_guides = cuda.to_device(guides) threads_per_block = 256 blocks_per_grid = ( guides.size + (threads_per_block - 1) // threads_per_block ) for i in range(len(args)): query_sequence = guides[int(args[i]) - 1] reverse_query_sequence = reverse_complement_binary( query_sequence, 20 ) summary = np.zeros(MAX_MISSMATCHES, dtype=np.uint32) off_target_ids_idx = np.zeros(1, dtype=np.uint32) off_target_ids = np.zeros(MAX_OFF_TARGETS, dtype=np.uint32) if use_cuda & cuda.is_available(): device_summary = cuda.to_device(summary) device_off_target_ids_idx = cuda.to_device(off_target_ids_idx) device_off_target_ids = cuda.to_device(off_target_ids) find_off_targets_kernel[blocks_per_grid, threads_per_block]( device_guides, query_sequence, reverse_query_sequence, device_summary, device_off_target_ids_idx, device_off_target_ids, metadata.offset, ) summary = device_summary.copy_to_host() off_target_ids = device_off_target_ids.copy_to_host() else: find_off_targets_cpu( guides, query_sequence, reverse_query_sequence, summary, off_target_ids_idx, off_target_ids, metadata.offset, ) print_off_targets( args[i], summary, np.sort(np.trim_zeros(off_target_ids)), metadata.species_id, )