Source code for pysiglib.pysiglib

# Copyright 2025 Daniil Shmelev
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =========================================================================
"""
Signature Computations on CPU and GPU
"""

import os
import sys
import platform
from typing import Union
import ctypes
from ctypes import c_float, c_double, c_int, c_int32, c_int64, c_uint64, c_bool, POINTER, cast

import numpy as np
import torch

from .param_checks import check_type, check_type_multiple, check_non_neg, check_dtype, check_dtype_double, ensure_own_contiguous_storage
from .error_codes import err_msg

######################################################
# Figure out how pysiglib was built, in particular
# whether CUDA is being used
######################################################

try:
    from ._config import SYSTEM, BUILT_WITH_CUDA, BUILT_WITH_AVX
except ImportError as exc:
    raise RuntimeError("Could not import configuration properties from _config.py - package may not have been built correctly.") from exc

if SYSTEM != platform.system():
    raise RuntimeError("System on which pySigLib was built does not match the current system - package may not have been built correctly.")

######################################################
# Load the cpsig and cusig libraries
######################################################

# winmode = 0 is necessary here
# https://github.com/NVIDIA/warp/issues/24

dir_ = os.path.dirname(sys.modules['pysiglib'].__file__)

if SYSTEM == 'Windows':
    cpsig_path = os.path.join(dir_, 'cpsig.dll')
    cpsig = ctypes.CDLL(cpsig_path, winmode = 0)

    if BUILT_WITH_CUDA:
        cusig_path = os.path.join(dir_, 'cusig.dll')
        cusig = ctypes.CDLL(cusig_path, winmode=0)
elif SYSTEM == "Linux":
    cpsig_path = os.path.join(dir_, 'libcpsig.so')
    cpsig = ctypes.CDLL(cpsig_path, winmode=0)

    if BUILT_WITH_CUDA:
        cusig_path = os.path.join(dir_, 'libcusig.so')
        cusig = ctypes.CDLL(cusig_path, winmode=0)
elif SYSTEM == 'Darwin':
    cpsig_path = os.path.join(dir_, 'libcpsig.dylib')
    cpsig = ctypes.CDLL(cpsig_path)
else:
    raise Exception("Unsupported OS during pysiglib.py")

######################################################
# Set argtypes and restypes for all imported functions
######################################################

cpsig.sig_length.argtypes = (c_uint64, c_uint64)
cpsig.sig_length.restype = c_uint64

cpsig.sig_combine.argtypes = (POINTER(c_double), POINTER(c_double), POINTER(c_double), c_uint64, c_uint64)
cpsig.sig_combine.restype = c_int

cpsig.batch_sig_combine.argtypes = (POINTER(c_double), POINTER(c_double), POINTER(c_double), c_uint64, c_uint64, c_uint64, c_int)
cpsig.batch_sig_combine.restype = c_int

cpsig.signature_int32.argtypes = (POINTER(c_int32), POINTER(c_double), c_uint64, c_uint64, c_uint64, c_bool, c_bool, c_bool)
cpsig.signature_int32.restype = c_int

cpsig.signature_int64.argtypes = (POINTER(c_int64), POINTER(c_double), c_uint64, c_uint64, c_uint64, c_bool, c_bool, c_bool)
cpsig.signature_int64.restype = c_int

cpsig.signature_float.argtypes = (POINTER(c_float), POINTER(c_double), c_uint64, c_uint64, c_uint64, c_bool, c_bool, c_bool)
cpsig.signature_float.restype = c_int

cpsig.signature_double.argtypes = (POINTER(c_double), POINTER(c_double), c_uint64, c_uint64, c_uint64, c_bool, c_bool, c_bool)
cpsig.signature_double.restype = c_int

cpsig.batch_signature_int32.argtypes = (POINTER(c_int32), POINTER(c_double), c_uint64, c_uint64, c_uint64, c_uint64, c_bool, c_bool, c_bool, c_int)
cpsig.batch_signature_int32.restype = c_int

cpsig.batch_signature_int64.argtypes = (POINTER(c_int64), POINTER(c_double), c_uint64, c_uint64, c_uint64, c_uint64, c_bool, c_bool, c_bool, c_int)
cpsig.batch_signature_int64.restype = c_int

cpsig.batch_signature_float.argtypes = (POINTER(c_float), POINTER(c_double), c_uint64, c_uint64, c_uint64, c_uint64, c_bool, c_bool, c_bool, c_int)
cpsig.batch_signature_float.restype = c_int

cpsig.batch_signature_double.argtypes = (POINTER(c_double), POINTER(c_double), c_uint64, c_uint64, c_uint64, c_uint64, c_bool, c_bool, c_bool, c_int)
cpsig.batch_signature_double.restype = c_int

cpsig.batch_sig_kernel.argtypes = (POINTER(c_double), POINTER(c_double), c_uint64, c_uint64, c_uint64, c_uint64, c_uint64, c_uint64, c_int)
cpsig.batch_sig_kernel.restype = c_int

if BUILT_WITH_CUDA:
    cusig.batch_sig_kernel_cuda.argtypes = (POINTER(c_double), POINTER(c_double), c_uint64, c_uint64, c_uint64, c_uint64, c_uint64, c_uint64)
    cusig.batch_sig_kernel_cuda.restype = c_int

######################################################
# Some dicts to simplify dtype cases
######################################################

DTYPES = {
    "int32": c_int32,
    "int64": c_int64,
    "float32": c_float,
    "float64": c_double
}

CPSIG_SIGNATURE = {
    "int32": cpsig.signature_int32,
    "int64": cpsig.signature_int64,
    "float32": cpsig.signature_float,
    "float64": cpsig.signature_double
}

CPSIG_BATCH_SIGNATURE = {
    "int32": cpsig.batch_signature_int32,
    "int64": cpsig.batch_signature_int64,
    "float32": cpsig.batch_signature_float,
    "float64": cpsig.batch_signature_double
}

######################################################
# Python wrappers
######################################################

[docs] def sig_length(dimension : int, degree : int) -> int: """ Returns the length of a truncated signature, .. math:: \\sum_{i=0}^N d^i = \\frac{d^{N+1} - 1}{d - 1}, where :math:`d` is the dimension of the underlying path and :math:`N` is the truncation level of the signature. :param dimension: Dimension of the undelying path, :math:`d` :type dimension: int :param degree: Truncation level of the signature, :math:`N` :type degree: int :return: Length of a truncated signature :rtype: int """ check_type(dimension, "dimension", int) check_type(degree, "degree", int) check_non_neg(dimension, "dimension") check_non_neg(degree, "degree") out = cpsig.sig_length(dimension, degree) if out == 0: raise ValueError("Integer overflow encountered in sig_length") return out
class PolyDataHandler: def __init__(self, poly1_, poly2_, dimension, degree): check_type_multiple(poly1_, "poly1", (np.ndarray, torch.Tensor)) check_type_multiple(poly2_, "poly2", (np.ndarray, torch.Tensor)) self.poly1 = ensure_own_contiguous_storage(poly1_, 4) self.poly2 = ensure_own_contiguous_storage(poly2_, 4) check_dtype_double(self.poly1, "poly1") check_dtype_double(self.poly2, "poly2") self.poly_len_ = sig_length(dimension, degree) if self.poly1.shape[-1] != self.poly_len_: raise ValueError("poly1 is of incorrect length. Expected " + str(self.poly_len_) + ", got " + str(self.poly1.shape[-1])) if self.poly2.shape[-1] != self.poly_len_: raise ValueError("poly2 is of incorrect length. Expected " + str(self.poly_len_) + ", got " + str(self.poly2.shape[-1])) if len(self.poly1.shape) == 1: self.is_batch = False self.batch_size = 1 self.length_1 = self.poly1.shape[0] elif len(self.poly1.shape) == 2: self.is_batch = True self.batch_size = self.poly1.shape[0] self.length_1 = self.poly1.shape[1] else: raise ValueError("poly1.shape must have length 1 or 2, got length " + str(len(self.poly1.shape)) + " instead.") if len(self.poly2.shape) == 1: if self.batch_size != 1: raise ValueError("poly1, poly2 have different batch sizes") self.length_2 = self.poly2.shape[0] elif len(self.poly2.shape) == 2: if self.batch_size != self.poly2.shape[0]: raise ValueError("path1, path2 have different batch sizes") self.length_2 = self.poly2.shape[1] else: raise ValueError("poly2.shape must have length 1 or 2, got length " + str(len(self.poly2.shape)) + " instead.") self.result_length = self.batch_size * self.poly_len_ if isinstance(self.poly1, np.ndarray) and isinstance(self.poly2, np.ndarray): self.poly1_ptr = self.poly1.ctypes.data_as(POINTER(c_double)) self.poly2_ptr = self.poly2.ctypes.data_as(POINTER(c_double)) if self.is_batch: self.out = np.empty( shape=(self.batch_size, self.poly_len_), dtype=np.float64 ) else: self.out = np.empty( shape=self.poly_len_, dtype=np.float64 ) self.out_ptr = self.out.ctypes.data_as(POINTER(c_double)) elif isinstance(self.poly1, torch.Tensor) and isinstance(self.poly2, torch.Tensor): if not (self.poly1.device.type == "cpu" and self.poly2.device.type == "cpu"): raise ValueError("Data must be located on the cpu") self.poly1_ptr = cast(self.poly1.data_ptr(), POINTER(c_double)) self.poly2_ptr = cast(self.poly2.data_ptr(), POINTER(c_double)) if self.is_batch: self.out = torch.empty( size=(self.batch_size, self.poly_len_), dtype=torch.float64 ) else: self.out = torch.empty( size=(self.poly_len_,), dtype=torch.float64 ) self.out_ptr = cast(self.out.data_ptr(), POINTER(c_double)) else: raise ValueError("path1, path2 must both be numpy arrays or both torch arrays")
[docs] def sig_combine( sig1 : Union[np.ndarray, torch.tensor], sig2 : Union[np.ndarray, torch.tensor], dimension : int, degree : int, n_jobs : int = 1 ) -> Union[np.ndarray, torch.tensor]: """ Combines two truncated signatures of the same degree and dimension into one signature. In particular, let :math:`x_1, x_2` be two paths such that the first point of :math:`x_2` is the last point of :math:`x_1`. Let :math:`S(x_1), S(x_2)` be the truncated signatures of :math:`x_1, x_2` respectively. Then calling this function on :math:`S(x_1), S(x_2)` returns the truncated signature of the concatenated path, .. math:: S(x_1 * x_2) = S(x_1) \\otimes S(x_2), where :math:`x_1 * x_2` is the concatenation of the two paths :math:`x_1, x_2`. :param sig1: The first truncated tensor polynomial :type sig1: numpy.ndarray | torch.tensor :param sig2: The second truncated tensor polynomial. Must have the same degree and dimension as the first. :type sig2: numpy.ndarray | torch.tensor :param dimension: Dimension of the underlying space, :math:`d`. :type dimension: int :param degree: Truncation level of the tensor polynomial, :math:`N` :type degree: int :param n_jobs: Number of threads to run in parallel. If n_jobs = 1, the computation is run serially. If set to -1, all available threads are used. For n_jobs below -1, (max_threads + 1 + n_jobs) threads are used. For example if n_jobs = -2, all threads but one are used. :type n_jobs: int :return: Tensor product of the truncated tensor polynomials :rtype: numpy.ndarray | torch.tensor .. note:: Parallelising the computation by setting ``n_jobs != 1`` can be beneficial when the workload is large. However, if the workload is too small, it may be faster to set this to ``1`` and run the computation serially, due to parallelisation overhead. .. note:: Ideally, any array passed to ``pysiglib.sig_combine`` should be both contiguous and own its data. If this is not the case, ``pysiglib.sig_combine`` will internally create a contiguous copy, which may be inefficient. Example usage:: import pysiglib batch_size = 32 length = 100 dimension = 5 degree = 3 X1 = np.random.uniform(size=(batch_size, length, dimension)) X2 = np.random.uniform(size=(batch_size, length, dimension)) X_concat = np.concatenate((X1, X2), axis=1) X2 = np.concatenate((X1[:, [-1], :], X2), axis=1) # Make sure first pt of X2 is last pt of X1 sig1 = pysiglib.signature(X1, degree) sig2 = pysiglib.signature(X2, degree) # The tensor product... sig_mult = pysiglib.sig_combine(sig1, sig2, dimension, degree) # ... is the same as the signature of the concatenated path: sig = pysiglib.signature(X_concat, degree) """ check_type(dimension, "dimension", int) check_type(degree, "degree", int) check_non_neg(dimension, "dimension") check_non_neg(degree, "degree") check_type(n_jobs, "n_jobs", int) if n_jobs == 0: raise ValueError("n_jobs cannot be 0") data = PolyDataHandler(sig1, sig2, dimension, degree) if data.is_batch: err_code = cpsig.batch_sig_combine( data.poly1_ptr, data.poly2_ptr, data.out_ptr, data.batch_size, dimension, degree, n_jobs ) else: err_code = cpsig.sig_combine( data.poly1_ptr, data.poly2_ptr, data.out_ptr, dimension, degree ) if err_code: raise Exception("Error in pysiglib.signature: " + err_msg(err_code)) return data.out
class SigDataHandler: def __init__(self, path_, degree, time_aug, lead_lag): check_type_multiple(path_, "path",(np.ndarray, torch.Tensor)) self.path = ensure_own_contiguous_storage(path_, 4) check_dtype(self.path, "path") check_type(degree, "degree", int) check_non_neg(degree, "degree") check_type(time_aug, "time_aug", bool) check_type(lead_lag, "lead_lag", bool) self.degree = degree self.time_aug = time_aug self.lead_lag = lead_lag self.get_dims(self.path) if isinstance(self.path, np.ndarray): self.init_numpy(self.path) elif isinstance(self.path, torch.Tensor): if not self.path.device.type == "cpu": raise ValueError("Data must be located on the cpu") self.init_torch(self.path) def init_numpy(self, path): self.dtype = str(path.dtype) self.data_ptr = path.ctypes.data_as(POINTER(DTYPES[self.dtype])) _, dimension_ = self.transformed_dims() if self.is_batch: self.out = np.empty( shape=(self.batch_size, sig_length(dimension_, self.degree)), dtype=np.float64 ) else: self.out = np.empty( shape=sig_length(dimension_, self.degree), dtype=np.float64 ) self.out_ptr = self.out.ctypes.data_as(POINTER(c_double)) def init_torch(self, path): self.dtype = str(path.dtype)[6:] self.data_ptr = cast(path.data_ptr(), POINTER(DTYPES[self.dtype])) _, dimension_ = self.transformed_dims() if self.is_batch: self.out = torch.empty( size=(self.batch_size, sig_length(dimension_, self.degree)), dtype=torch.float64 ) else: self.out = torch.empty( size=(sig_length(dimension_, self.degree),), dtype=torch.float64 ) self.out_ptr = cast(self.out.data_ptr(), POINTER(c_double)) def get_dims(self, path): if len(path.shape) == 2: self.is_batch = False self.length = path.shape[0] self.dimension = path.shape[1] elif len(path.shape) == 3: self.is_batch = True self.batch_size = path.shape[0] self.length = path.shape[1] self.dimension = path.shape[2] else: raise ValueError("path.shape must have length 2 or 3, got length " + str(len(path.shape)) + " instead.") def transformed_dims(self): length_ = self.length dimension_ = self.dimension if self.lead_lag: length_ *= 2 length_ -= 3 dimension_ *= 2 if self.time_aug: dimension_ += 1 return length_, dimension_ def signature_(data, time_aug = False, lead_lag = False, horner = True): err_code = CPSIG_SIGNATURE[data.dtype]( data.data_ptr, data.out_ptr, data.dimension, data.length, data.degree, time_aug, lead_lag, horner ) if err_code: raise Exception("Error in pysiglib.signature: " + err_msg(err_code)) return data.out def batch_signature_(data, time_aug = False, lead_lag = False, horner = True, n_jobs = 1): err_code = CPSIG_BATCH_SIGNATURE[data.dtype]( data.data_ptr, data.out_ptr, data.batch_size, data.dimension, data.length, data.degree, time_aug, lead_lag, horner, n_jobs ) if err_code: raise Exception("Error in pysiglib.signature: " + err_msg(err_code)) return data.out
[docs] def signature( path : Union[np.ndarray, torch.tensor], degree : int, time_aug : bool = False, lead_lag : bool = False, horner : bool = True, n_jobs : int = 1 ) -> Union[np.ndarray, torch.tensor]: """ Computes the truncated signature of single path or a batch of paths. For a single path :math:`x`, the signature is given by .. math:: S(x)_{[s,t]} := \\left( 1, S(x)^{(1)}_{[s,t]}, \\ldots, S(x)^{(N)}_{[s,t]}\\right) \\in T((\\mathbb{R}^d)), .. math:: S(x)^{(k)}_{[s,t]} := \\int_{s < t_1 < \\cdots < t_k < t} dx_{t_1} \\otimes dx_{t_2} \\otimes \\cdots \\otimes dx_{t_k} \\in \\left(\\mathbb{R}^d\\right)^{\\otimes k}. :param path: The underlying path or batch of paths, given as a `numpy.ndarray` or `torch.tensor`. For a single path, this must be of shape (length, dimension). For a batch of paths, this must be of shape (batch size, length, dimension). :type path: numpy.ndarray | torch.tensor :param degree: The truncation level of the signature, :math:`N`. :type degree: int :param time_aug: If set to True, will compute the signature of the time-augmented path, :math:`\\hat{x}_t := (t, x_t)`, defined as the original path with an extra channel set to time, :math:`t`. :type time_aug: bool :param lead_lag: If set to True, will compute the signatue of the path after applying the lead-lag transformation. :type lead_lag: bool :param horner: If True, will use Horner's algorithm for polynomial multiplication. :type horner: bool :param n_jobs: Number of threads to run in parallel. If n_jobs = 1, the computation is run serially. If set to -1, all available threads are used. For n_jobs below -1, (max_threads + 1 + n_jobs) threads are used. For example if n_jobs = -2, all threads but one are used. :type n_jobs: int :return: Truncated signature, or a batch of truncated signatures. :rtype: numpy.ndarray | torch.tensor .. note:: Ideally, any array passed to ``pysiglib.signature`` should be both contiguous and own its data. If this is not the case, ``pysiglib.signature`` will internally create a contiguous copy, which may be inefficient. """ check_type(horner, "horner", bool) data = SigDataHandler(path, degree, time_aug, lead_lag) if data.is_batch: check_type(n_jobs, "n_jobs", int) if n_jobs == 0: raise ValueError("n_jobs cannot be 0") return batch_signature_(data, time_aug, lead_lag, horner, n_jobs) return signature_(data, time_aug, lead_lag, horner)
class SigKernelDataHandler: def __init__(self, path1_, path2_, dyadic_order): check_type_multiple(path1_, "path1", (np.ndarray, torch.Tensor)) check_type_multiple(path2_, "path2", (np.ndarray, torch.Tensor)) self.path1 = ensure_own_contiguous_storage(path1_, 4) self.path2 = ensure_own_contiguous_storage(path2_, 4) check_dtype(self.path1, "path1") check_dtype(self.path2, "path2") if isinstance(dyadic_order, tuple) and len(dyadic_order) == 2: self.dyadic_order_1 = dyadic_order[0] self.dyadic_order_2 = dyadic_order[1] elif isinstance(dyadic_order, int): self.dyadic_order_1 = dyadic_order self.dyadic_order_2 = dyadic_order else: raise TypeError("dyadic_order must be an integer or a tuple of length 2") if self.dyadic_order_1 < 0 or self.dyadic_order_2 < 0: raise ValueError("dyadic_order must be a non-negative integer or tuple of non-negative integers") if len(self.path1.shape) == 2: self.is_batch = False self.batch_size = 1 self.length_1 = self.path1.shape[0] self.dimension = self.path1.shape[1] elif len(self.path1.shape) == 3: self.is_batch = True self.batch_size = self.path1.shape[0] self.length_1 = self.path1.shape[1] self.dimension = self.path1.shape[2] else: raise ValueError("path1.shape must have length 2 or 3, got length " + str(len(self.path1.shape)) + " instead.") if len(self.path2.shape) == 2: if self.batch_size != 1: raise ValueError("path1, path2 have different batch sizes") self.length_2 = self.path2.shape[0] if self.dimension != self.path2.shape[1]: raise ValueError("path1, path2 have different dimensions") elif len(self.path2.shape) == 3: if self.batch_size != self.path2.shape[0]: raise ValueError("path1, path2 have different batch sizes") self.length_2 = self.path2.shape[1] if self.dimension != self.path2.shape[2]: raise ValueError("path1, path2 have different dimensions") else: raise ValueError("path2.shape must have length 2 or 3, got length " + str(len(self.path2.shape)) + " instead.") if isinstance(self.path1, np.ndarray) and isinstance(self.path2, np.ndarray): self.device = "cpu" self.out = np.empty(shape=self.batch_size, dtype=np.float64) self.out_ptr = self.out.ctypes.data_as(POINTER(c_double)) elif isinstance(self.path1, torch.Tensor) and isinstance(self.path2, torch.Tensor) and self.path1.device == self.path2.device: self.device = self.path1.device.type self.out = torch.empty(self.batch_size, dtype=torch.float64, device = self.device) self.out_ptr = cast(self.out.data_ptr(), POINTER(c_double)) else: raise ValueError("path1, path2 must both be numpy arrays or both torch arrays on the same device") self.torch_path1 = torch.as_tensor(self.path1) # Avoids data copy self.torch_path2 = torch.as_tensor(self.path2) if self.is_batch: x1 = self.torch_path1[:, 1:, :] - self.torch_path1[:, :-1, :] y1 = self.torch_path2[:, 1:, :] - self.torch_path2[:, :-1, :] else: x1 = (self.torch_path1[1:, :] - self.torch_path1[:-1, :])[None, :, :] y1 = (self.torch_path2[1:, :] - self.torch_path2[:-1, :])[None, :, :] self.gram = torch.bmm(x1, y1.permute(0, 2, 1)) def sig_kernel_(data, n_jobs): err_code = cpsig.batch_sig_kernel( cast(data.gram.data_ptr(), POINTER(c_double)), data.out_ptr, data.batch_size, data.dimension, data.length_1, data.length_2, data.dyadic_order_1, data.dyadic_order_2, n_jobs ) if err_code: raise Exception("Error in pysiglib.sig_kernel: " + err_msg(err_code)) return data.out def sig_kernel_cuda_(data): err_code = cusig.batch_sig_kernel_cuda( cast(data.gram.data_ptr(), POINTER(c_double)), data.out_ptr, data.batch_size, data.dimension, data.length_1, data.length_2, data.dyadic_order_1, data.dyadic_order_2 ) if err_code: raise Exception("Error in pysiglib.sig_kernel: " + err_msg(err_code)) return data.out
[docs] def sig_kernel( path1 : Union[np.ndarray, torch.tensor], path2 : Union[np.ndarray, torch.tensor], dyadic_order : Union[int, tuple], n_jobs : int = 1 ) -> Union[np.ndarray, torch.tensor]: #TODO: add time-aug and lead-lag """ Computes a single signature kernel or a batch of signature kernels. The signature kernel of two :math:`d`-dimensional paths :math:`x,y` is defined as .. math:: k_{x,y}(s,t) := \\left< S(x)_{[0,s]}, S(y)_{[0, t]} \\right>_{T((\\mathbb{R}^d))} where the inner product is defined as .. math:: \\left< A, B \\right> := \\sum_{k=0}^{\\infty} \\left< A_k, B_k \\right>_{\\left(\\mathbb{R}^d\\right)^{\\otimes k}} .. math:: \\left< u, v \\right>_{\\left(\\mathbb{R}^d\\right)^{\\otimes k}} := \\prod_{i=1}^k \\left< u_i, v_i \\right>_{\\mathbb{R}^d} :param path1: The first underlying path or batch of paths, given as a `numpy.ndarray` or `torch.tensor`. For a single path, this must be of shape (length, dimension). For a batch of paths, this must be of shape (batch size, length, dimension). :type path1: numpy.ndarray | torch.tensor :param path2: The second underlying path or batch of paths, given as a `numpy.ndarray` or `torch.tensor`. For a single path, this must be of shape (length, dimension). For a batch of paths, this must be of shape (batch size, length, dimension). :type path2: numpy.ndarray | torch.tensor :param dyadic_order: If set to a positive integer :math:`\\lambda`, will refine the PDE grid by a factor of :math:`2^\\lambda`. :type dyadic_order: int | tuple :param n_jobs: (Only applicable to CPU computation) Number of threads to run in parallel. If n_jobs = 1, the computation is run serially. If set to -1, all available threads are used. For n_jobs below -1, (max_threads + 1 + n_jobs) threads are used. For example if n_jobs = -2, all threads but one are used. :type n_jobs: int :return: Single signature kernel or batch of signature kernels :rtype: numpy.ndarray | torch.tensor .. note:: Ideally, any array passed to ``pysiglib.sig_kernel`` should be both contiguous and own its data. If this is not the case, ``pysiglib.sig_kernel`` will internally create a contiguous copy, which may be inefficient. """ check_type(n_jobs, "n_jobs", int) if n_jobs == 0: raise ValueError("n_jobs cannot be 0") data = SigKernelDataHandler(path1, path2, dyadic_order) if data.device == "cpu": return sig_kernel_(data, n_jobs) if not BUILT_WITH_CUDA: raise RuntimeError("pySigLib was build without CUDA - data must be moved to CPU.") return sig_kernel_cuda_(data)