# Source code for pysiglib.streams

# Copyright 2026 Daniil Shmelev
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =========================================================================

from typing import Union, List, Tuple
import numpy as np
import torch

from .param_checks import check_pos, check_type, check_n_jobs
from .sig_length import sig_length, log_sig_length
from .sig_join import sig_join
from .sig import sig_combine, sig
from .log_sig_join import log_sig_join
from .log_sig_combine import log_sig_combine
from .log_sig import log_sig

try:
    import jax
    import jax.numpy as jnp
    _HAS_JAX = True
except ImportError:
    _HAS_JAX = False


def _is_jax(x):
    """Report whether ``x`` is a JAX array.

    Always ``False`` when JAX could not be imported, so callers never touch the
    ``jax`` module in that case.
    """
    if not _HAS_JAX:
        return False
    return isinstance(x, jax.Array)


def _make_zero(length, batch_shape, like_arr):
    """Build an all-zero array of shape ``(*batch_shape, length)``.

    The backend (torch / JAX / numpy), dtype and (for torch) device are taken
    from ``like_arr``.
    """
    shape = tuple(batch_shape) + (length,)
    if isinstance(like_arr, torch.Tensor):
        return torch.zeros(shape, dtype=like_arr.dtype, device=like_arr.device)
    # Only one branch of the conditional is evaluated, so ``jnp`` is never
    # looked up when JAX is absent.
    backend = jnp if _is_jax(like_arr) else np
    return backend.zeros(shape, dtype=like_arr.dtype)


def _make_identity_sig(sig_len, batch_shape, like_arr, scalar_term=True):
    """Return the identity signature with shape ``(*batch_shape, sig_len)``.

    When ``scalar_term`` is True the result is ``[1, 0, ..., 0]`` along the last
    axis. When it is False the leading 1 is not stored, so the identity is all
    zeros (the signature of a zero-length path vanishes at every level >= 1).
    Backend, dtype and device follow ``like_arr``.
    """
    out = _make_zero(sig_len, batch_shape, like_arr)
    if not scalar_term:
        return out
    if _is_jax(out):
        # JAX arrays cannot be assigned into; build the result functionally.
        return out.at[..., 0].set(1.0)
    out[..., 0] = 1.0
    return out


def _stack(arrays, like_arr):
    """Stack ``arrays`` along a new leading axis using the backend of ``like_arr``."""
    if isinstance(like_arr, torch.Tensor):
        stack_fn = torch.stack
    elif _is_jax(like_arr):
        stack_fn = jnp.stack
    else:
        stack_fn = np.stack
    return stack_fn(arrays)


def _validate_push_point(point, expected_dim, expected_batch_shape):
    """Check a ``push(point)`` argument and return its batch shape as a tuple.

    ``point`` must be shaped ``(..., expected_dim)``. Once a stream has locked
    in a batch shape (``expected_batch_shape`` is not None), the leading
    dimensions of ``point`` must equal it.

    :raises ValueError: On a wrong trailing dimension, a 0-d input, or a batch
        shape that differs from the locked-in one.
    """
    shape = tuple(point.shape)
    if not (point.ndim >= 1 and shape[-1] == expected_dim):
        raise ValueError(
            f"push expects a point of shape (..., {expected_dim}); "
            f"got shape {shape}")
    leading = shape[:-1]
    shape_locked = expected_batch_shape is not None
    if shape_locked and leading != expected_batch_shape:
        raise ValueError(
            f"push batch shape {leading} does not match the shape {expected_batch_shape} "
            f"locked in by the first push")
    return leading


def _validate_push_batch(points, expected_dim, expected_batch_shape):
    """Check a ``push_batch(points)`` argument; return ``(batch_shape, n_points)``.

    ``points`` must be shaped ``(..., n_points, expected_dim)``. Once a stream
    has locked in a batch shape (``expected_batch_shape`` is not None), the
    dimensions before the time axis must equal it.

    :raises ValueError: On fewer than two dimensions, a wrong trailing
        dimension, or a batch shape that differs from the locked-in one.
    """
    shape = tuple(points.shape)
    if not (points.ndim >= 2 and shape[-1] == expected_dim):
        raise ValueError(
            f"push_batch expects points of shape (..., n_points, {expected_dim}); "
            f"got shape {shape}")
    leading, count = shape[:-2], shape[-2]
    shape_locked = expected_batch_shape is not None
    if shape_locked and leading != expected_batch_shape:
        raise ValueError(
            f"push_batch batch shape {leading} does not match the shape {expected_batch_shape} "
            f"locked in by the first push")
    return leading, count


def _cat_time(a, b):
    """Join ``a`` and ``b`` end-to-end along the time axis (axis ``-2``).

    The backend is chosen from ``a``.
    """
    pair = [a, b]
    if isinstance(a, torch.Tensor):
        return torch.cat(pair, dim=-2)
    backend = jnp if _is_jax(a) else np
    return backend.concatenate(pair, axis=-2)


def _expand_time(point):
    """Add a length-1 time axis just before the last axis.

    Maps shape ``(..., dim)`` to ``(..., 1, dim)``.
    """
    if isinstance(point, torch.Tensor):
        return torch.unsqueeze(point, -2)
    if _is_jax(point):
        return jnp.expand_dims(point, axis=-2)
    # ``None`` is ``np.newaxis``.
    return point[..., None, :]


def _flip_time(path):
    """Return ``path`` reversed along the time axis (axis ``-2``) as a fresh contiguous array."""
    if isinstance(path, torch.Tensor):
        return torch.flip(path, dims=(-2,)).contiguous()
    if _is_jax(path):
        # jnp.flip already produces a new (immutable) array.
        return jnp.flip(path, axis=-2)
    # np.flip yields a negative-stride view; copy to get an owning, contiguous array.
    reversed_view = np.flip(path, axis=-2)
    return reversed_view.copy()


def _last_time_step(points):
    """Extract the last time-step, ``(..., n, dim) -> (..., dim)``, as an owning copy.

    The result never shares mutable storage with ``points``, so callers may keep
    it while the source buffer is later mutated or reused.
    """
    if isinstance(points, torch.Tensor):
        # ``.contiguous()`` returns the slice itself (a view) whenever it is already
        # contiguous -- e.g. the last row of an unbatched ``(n, dim)`` path -- so it
        # does not guarantee a copy. ``clone()`` always allocates and is differentiable.
        return points[..., -1, :].clone()
    if _is_jax(points):
        # JAX arrays are immutable, so sharing storage is harmless.
        return points[..., -1, :]
    return points[..., -1, :].copy()


def _copy_sig(s):
    """Return a copy of ``s`` that does not share mutable storage with it.

    Works for any array, not only signatures. JAX arrays are returned as-is
    because they are immutable.
    """
    if isinstance(s, torch.Tensor):
        return torch.clone(s)
    if not _is_jax(s):
        return s.copy()
    return s


class SigStream:
    """
    A stateful stream that maintains precomputed cumulative signatures over a growing path,
    supporting efficient push/pop operations and O(1) arbitrary interval queries.

    Cumulative signatures ``S(0, t)`` and their inverses ``S(0, t)^{-1}`` are stored for each
    point. Any interval signature is computed via Chen's identity:
    ``S(a, b) = S(0, a)^{-1} * S(0, b)``.

    Supports numpy arrays, torch tensors (with autograd via ``pysiglib.torch_api``), and JAX
    arrays (via ``pysiglib.jax_api``). Accepts a single path or a batch of independent paths -
    the batch shape is inferred from the first ``push`` / ``push_batch`` call and locked in for
    the rest of the stream's lifetime. A single ``SigStream`` instance can therefore track many
    independent paths in parallel.

    :param dimension: Dimension of the underlying space, :math:`d`.
    :type dimension: int
    :param degree: Truncation level of the signature, :math:`N`.
    :type degree: int
    :param scalar_term: If True, stored signatures include the leading constant 1 at index 0.
        If False (default), the leading element is stripped.
    :type scalar_term: bool
    :param n_jobs: Number of threads to run in parallel in the internal ``sig_join`` and
        ``sig_combine`` calls. If ``n_jobs = 1`` the computation is serial. If ``-1``, all
        available threads are used. For ``n_jobs < -1``, ``max_threads + 1 + n_jobs`` threads
        are used.
    :type n_jobs: int

    Example::

        import pysiglib
        import numpy as np

        # Single path
        stream = pysiglib.SigStream(dimension=3, degree=4)
        path = np.random.randn(50, 3)
        stream.push_batch(path)
        s = stream.sig(10, 30)  # shape (sig_length,)

        # Batch of 8 independent paths tracked in parallel
        stream = pysiglib.SigStream(dimension=3, degree=4)
        paths = np.random.randn(8, 50, 3)
        stream.push_batch(paths)
        s = stream.sig(10, 30)  # shape (8, sig_length)
    """

    def __init__(self, dimension: int, degree: int, *, scalar_term: bool = False,
                 n_jobs: int = 1, _sig_join=None, _sig_combine=None, _sig=None):
        check_n_jobs(n_jobs)
        self._dimension = dimension
        self._degree = degree
        self._scalar_term = scalar_term
        self._sig_len = sig_length(dimension, degree, scalar_term=scalar_term)

        # The private ``_sig*`` hooks let backend wrappers inject their own kernels.
        raw_sig = _sig or sig
        raw_sig_join = _sig_join or sig_join
        raw_sig_combine = _sig_combine or sig_combine
        # Not used by the per-point update path; kept so the ``_sig`` hook and any
        # existing access to this attribute keep working.
        self._sig_fn = lambda path, deg: raw_sig(
            path, deg, scalar_term=scalar_term, n_jobs=n_jobs)
        self._sig_combine_fn = lambda s1, s2, dim, deg: raw_sig_combine(
            s1, s2, dim, deg, n_jobs=n_jobs)
        self._sig_join_fn = lambda s, disp, dim, deg, prepend=False: raw_sig_join(
            s, disp, dim, deg, prepend=prepend, n_jobs=n_jobs)

        self._sigs = []  # cumulative forward sigs S(0, t), one per point
        self._inv_sigs = []  # cumulative inverse sigs S(0, t)^{-1}, one per point
        self._last_point = None  # owning copy of the most recently pushed point
        self._start = 0  # absolute index of self._sigs[0]
        self._batch_shape = None  # locked in by the first push

    def push(self, point: Union[np.ndarray, torch.Tensor]) -> None:
        """
        Append a single point (or batch of points, one per tracked path) and update the
        cumulative signature.

        :param point: Shape ``(..., dimension)``. The leading batch dimensions are either empty
            (single-path stream) or match the batch shape locked in by the first push.
        :type point: numpy.ndarray | torch.Tensor
        :raises ValueError: If the shape of ``point`` is invalid for this stream.
        """
        batch = _validate_push_point(point, self._dimension, self._batch_shape)
        if self._last_point is None:
            self._batch_shape = batch
            # Keep an owning copy: the caller may reuse/mutate its buffer between pushes,
            # which would otherwise corrupt the next displacement.
            self._last_point = _copy_sig(point)
            identity = _make_identity_sig(self._sig_len, batch, point, self._scalar_term)
            self._sigs.append(identity)
            self._inv_sigs.append(identity)
            return

        displacement = point - self._last_point
        new_sig = self._sig_join_fn(
            self._sigs[-1], displacement, self._dimension, self._degree)
        # The inverse of appending a segment is prepending its negation.
        new_inv = self._sig_join_fn(
            self._inv_sigs[-1], -displacement, self._dimension, self._degree, prepend=True)
        self._sigs.append(new_sig)
        self._inv_sigs.append(new_inv)
        self._last_point = _copy_sig(point)

    def push_batch(self, points: Union[np.ndarray, torch.Tensor]) -> None:
        """
        Append multiple points to the stream.

        Every point receives its own cumulative signature and inverse, exactly as if ``push``
        had been called once per point. Absolute indices therefore keep referring to
        individual points, and ``size`` grows by ``n_points``.

        :param points: Shape ``(..., n_points, dimension)``. The leading batch dimensions are
            either empty (single-path stream) or match the batch shape locked in by the first
            push.
        :type points: numpy.ndarray | torch.Tensor
        :raises ValueError: If the shape of ``points`` is invalid for this stream.
        """
        _, n_points = _validate_push_batch(points, self._dimension, self._batch_shape)
        for i in range(n_points):
            self.push(points[..., i, :])

    def pop_front(self) -> None:
        """Remove the oldest cumulative signature from the stream.

        :raises ValueError: If the stream holds 1 or fewer entries.
        """
        if len(self._sigs) <= 1:
            raise ValueError("Cannot pop_front: stream has 1 or fewer entries")
        self._sigs.pop(0)
        self._inv_sigs.pop(0)
        self._start += 1

    def sig(self, start: int, end: int) -> Union[np.ndarray, torch.Tensor]:
        """
        Query the signature over an interval via Chen's identity.

        :param start: Start index (absolute, inclusive).
        :type start: int
        :param end: End index (absolute, inclusive).
        :type end: int
        :return: The signature of shape ``(..., sig_length)`` for the interval
            ``path[start:end+1]``.
        :rtype: numpy.ndarray | torch.Tensor
        :raises IndexError: If either index lies outside the retained range.
        """
        si = start - self._start
        ei = end - self._start
        if si < 0 or ei >= len(self._sigs):
            raise IndexError(f"Indices [{start}, {end}] out of range [{self._start}, {self._start + len(self._sigs) - 1}]")
        # Fast path: no pops + si==0 means combine is a no-op. Return a fresh
        # copy so the caller can't mutate internal state.
        if si == 0 and self._start == 0:
            return _copy_sig(self._sigs[ei])
        return self._sig_combine_fn(
            self._inv_sigs[si], self._sigs[ei], self._dimension, self._degree)

    def sig_batch(self, intervals: List[Tuple[int, int]]) -> Union[np.ndarray, torch.Tensor]:
        """
        Query signatures over multiple intervals at once.

        :param intervals: List of ``(start, end)`` pairs.
        :type intervals: list[tuple[int, int]]
        :return: Stacked signatures of shape ``(K, ..., sig_length)``.
        :rtype: numpy.ndarray | torch.Tensor
        """
        results = [self.sig(s, e) for s, e in intervals]
        return _stack(results, self._sigs[0])

    def sig_all(self) -> Union[np.ndarray, torch.Tensor]:
        """
        Return the expanding (cumulative) signatures ``S(0, 0), S(0, 1), ..., S(0, t)``.

        :return: Stacked signatures of shape ``(n, ..., sig_length)``.
        :rtype: numpy.ndarray | torch.Tensor
        """
        return _stack(self._sigs, self._sigs[0])

    @property
    def size(self) -> int:
        """Number of time-steps currently in the stream."""
        return len(self._sigs)

    @property
    def start_index(self) -> int:
        """Absolute index of the first point in the stream."""
        return self._start

    @property
    def end_index(self) -> int:
        """Absolute index of the last point in the stream."""
        return self._start + len(self._sigs) - 1

    @property
    def batch_shape(self) -> Union[tuple, None]:
        """Batch shape locked in by the first push, or ``None`` if nothing has been pushed."""
        return self._batch_shape
class LogSigStream:
    """
    A stateful stream that maintains precomputed cumulative log-signatures over a growing path,
    supporting efficient push/pop operations and O(1) arbitrary interval queries.

    Cumulative log-signatures are stored for each point. Any interval log-signature is computed
    via BCH: ``L(a, b) = BCH(-L(0, a), L(0, b))``, since the inverse of a log-signature is its
    negation.

    Supports numpy arrays, torch tensors (with autograd via ``pysiglib.torch_api``), and JAX
    arrays (via ``pysiglib.jax_api``). Accepts a single path or a batch of independent paths -
    the batch shape is inferred from the first ``push`` / ``push_batch`` call and locked in for
    the rest of the stream's lifetime.

    .. note::

        You must call ``pysiglib.prepare_log_sig(dimension, degree)`` before creating a
        ``LogSigStream``. This precomputes the Lyndon basis and BCH coefficients.

    :param dimension: Dimension of the underlying space, :math:`d`.
    :type dimension: int
    :param degree: Truncation level of the log-signature, :math:`N`.
    :type degree: int
    :param method: Method to use for internal log-signature computation (``2`` or ``3``).
        Method ``2`` uses the Lyndon bracket basis via the signature-to-log-signature
        projection; method ``3`` computes log-sigs directly from the path via BCH.
    :type method: int
    :param n_jobs: Number of threads to run in parallel in internal ``log_sig_join`` and
        ``log_sig_combine`` calls. ``-1`` uses all available threads; for ``n_jobs < -1``,
        ``max_threads + 1 + n_jobs`` threads are used.
    :type n_jobs: int
    :raises ValueError: If ``method`` is not ``2`` or ``3``.

    Example::

        import pysiglib
        import numpy as np

        pysiglib.prepare_log_sig(3, 4, method=2)

        # Single path
        stream = pysiglib.LogSigStream(dimension=3, degree=4)
        path = np.random.randn(50, 3)
        stream.push_batch(path)
        ls = stream.sig(10, 30)  # shape (log_sig_length,)

        # Batch of 8 independent paths
        stream = pysiglib.LogSigStream(dimension=3, degree=4)
        paths = np.random.randn(8, 50, 3)
        stream.push_batch(paths)
        ls = stream.sig(10, 30)  # shape (8, log_sig_length)
    """

    def __init__(self, dimension: int, degree: int, *, method: int = 2, n_jobs: int = 1,
                 _log_sig_join=None, _log_sig_combine=None, _log_sig=None):
        if method not in (2, 3):
            raise ValueError(
                f"LogSigStream requires method=2 or method=3 (Lyndon basis); "
                f"got method={method}. Method 1 uses the Hall basis which is "
                f"incompatible with log_sig_combine/log_sig_join.")
        check_n_jobs(n_jobs)
        self._dimension = dimension
        self._degree = degree
        self._ls_len = log_sig_length(dimension, degree)

        # The private ``_log_sig*`` hooks let backend wrappers inject their own kernels.
        raw_log_sig = _log_sig or log_sig
        raw_log_sig_join = _log_sig_join or log_sig_join
        raw_log_sig_combine = _log_sig_combine or log_sig_combine
        # Not used by the per-point update path; kept so the ``_log_sig`` hook and any
        # existing access to this attribute keep working.
        self._log_sig_fn = lambda path, deg: raw_log_sig(
            path, deg, method=method, n_jobs=n_jobs)
        self._log_sig_combine_fn = lambda s1, s2, dim, deg: raw_log_sig_combine(
            s1, s2, dim, deg, n_jobs=n_jobs)
        self._log_sig_join_fn = lambda ls, disp, dim, deg: raw_log_sig_join(
            ls, disp, dim, deg, n_jobs=n_jobs)

        self._log_sigs = []  # cumulative log-sigs L(0, t), one per point
        self._last_point = None  # owning copy of the most recently pushed point
        self._start = 0  # absolute index of self._log_sigs[0]
        self._batch_shape = None  # locked in by the first push

    def push(self, point: Union[np.ndarray, torch.Tensor]) -> None:
        """
        Append a single point (or batch of points, one per tracked path) and update the
        cumulative log-signature.

        :param point: Shape ``(..., dimension)``.
        :type point: numpy.ndarray | torch.Tensor
        :raises ValueError: If the shape of ``point`` is invalid for this stream.
        """
        batch = _validate_push_point(point, self._dimension, self._batch_shape)
        if self._last_point is None:
            self._batch_shape = batch
            # Keep an owning copy: the caller may reuse/mutate its buffer between pushes,
            # which would otherwise corrupt the next displacement.
            self._last_point = _copy_sig(point)
            self._log_sigs.append(_make_zero(self._ls_len, batch, point))
            return

        displacement = point - self._last_point
        new_ls = self._log_sig_join_fn(
            self._log_sigs[-1], displacement, self._dimension, self._degree)
        self._log_sigs.append(new_ls)
        self._last_point = _copy_sig(point)

    def push_batch(self, points: Union[np.ndarray, torch.Tensor]) -> None:
        """
        Append multiple points to the stream.

        Every point receives its own cumulative log-signature, exactly as if ``push`` had been
        called once per point. Absolute indices therefore keep referring to individual points,
        and ``size`` grows by ``n_points``.

        :param points: Shape ``(..., n_points, dimension)``.
        :type points: numpy.ndarray | torch.Tensor
        :raises ValueError: If the shape of ``points`` is invalid for this stream.
        """
        _, n_points = _validate_push_batch(points, self._dimension, self._batch_shape)
        for i in range(n_points):
            self.push(points[..., i, :])

    def pop_front(self) -> None:
        """Remove the oldest cumulative log-signature from the stream.

        :raises ValueError: If the stream holds 1 or fewer entries.
        """
        if len(self._log_sigs) <= 1:
            raise ValueError("Cannot pop_front: stream has 1 or fewer entries")
        self._log_sigs.pop(0)
        self._start += 1

    def sig(self, start: int, end: int) -> Union[np.ndarray, torch.Tensor]:
        """
        Query the log-signature over an interval via BCH.

        :param start: Start index (absolute, inclusive).
        :type start: int
        :param end: End index (absolute, inclusive).
        :type end: int
        :return: The log-signature of shape ``(..., log_sig_length)`` for the interval
            ``path[start:end+1]``.
        :rtype: numpy.ndarray | torch.Tensor
        :raises IndexError: If either index lies outside the retained range.
        """
        si = start - self._start
        ei = end - self._start
        if si < 0 or ei >= len(self._log_sigs):
            raise IndexError(f"Indices [{start}, {end}] out of range [{self._start}, {self._start + len(self._log_sigs) - 1}]")
        # Fast path: no pops + si==0 means BCH is a no-op. Return a fresh copy.
        if si == 0 and self._start == 0:
            return _copy_sig(self._log_sigs[ei])
        # The inverse of exp(L) is exp(-L), so negating L(0, a) inverts it.
        return self._log_sig_combine_fn(
            -self._log_sigs[si], self._log_sigs[ei], self._dimension, self._degree)

    def sig_batch(self, intervals: List[Tuple[int, int]]) -> Union[np.ndarray, torch.Tensor]:
        """
        Query log-signatures over multiple intervals at once.

        :param intervals: List of ``(start, end)`` pairs.
        :type intervals: list[tuple[int, int]]
        :return: Stacked log-signatures of shape ``(K, ..., log_sig_length)``.
        :rtype: numpy.ndarray | torch.Tensor
        """
        results = [self.sig(s, e) for s, e in intervals]
        return _stack(results, self._log_sigs[0])

    def sig_all(self) -> Union[np.ndarray, torch.Tensor]:
        """
        Return the expanding (cumulative) log-signatures.

        :return: Stacked log-signatures of shape ``(n, ..., log_sig_length)``.
        :rtype: numpy.ndarray | torch.Tensor
        """
        return _stack(self._log_sigs, self._log_sigs[0])

    @property
    def size(self) -> int:
        """Number of time-steps currently in the stream."""
        return len(self._log_sigs)

    @property
    def start_index(self) -> int:
        """Absolute index of the first point in the stream."""
        return self._start

    @property
    def end_index(self) -> int:
        """Absolute index of the last point in the stream."""
        return self._start + len(self._log_sigs) - 1

    @property
    def batch_shape(self) -> Union[tuple, None]:
        """Batch shape locked in by the first push, or ``None`` if nothing has been pushed."""
        return self._batch_shape
class _WindowStream:
    """Base class for windowed stream classes.

    Buffers points (along the time axis) and computes each window's (log-)signature
    directly from the path slice. Supports arbitrary leading batch dimensions; the batch
    shape is locked in on the first push. numpy, torch and JAX inputs keep their own
    backend throughout buffering.
    """

    def __init__(self, sig_fn, dimension: int, degree: int, window_size: int, stride: int):
        self._sig_fn = sig_fn
        self._dimension = dimension
        self._degree = degree
        self._window_size = window_size
        self._stride = stride
        self._pending = []  # unbatched points awaiting consolidation
        self._buffer = None  # consolidated array of points, shape (..., buf_len, dim)
        self._buf_len = 0
        # Set when `stride > window_size` creates a gap between windows
        # wider than the buffer currently holds; counts points to drop.
        self._skip_next = 0
        self._windows = []
        self._next_window_end = window_size - 1  # buffer-local index of the next window's end
        self._batch_shape = None

    def push(self, point: Union[np.ndarray, torch.Tensor]) -> None:
        """
        Append a single point (or batch of points). If a new window is completed, its
        (log-)signature is computed and stored.

        :param point: Shape ``(..., dimension)``.
        :type point: numpy.ndarray | torch.Tensor
        :raises ValueError: If the shape of ``point`` is invalid for this stream.
        """
        batch = _validate_push_point(point, self._dimension, self._batch_shape)
        if self._batch_shape is None:
            self._batch_shape = batch
        if self._skip_next > 0:
            self._skip_next -= 1
            return
        self._pending.append(point)
        self._buf_len += 1
        self._emit_windows()

    def push_batch(self, points: Union[np.ndarray, torch.Tensor]) -> None:
        """
        Append multiple points, emitting windows as they become complete.

        :param points: Shape ``(..., n_points, dimension)``.
        :type points: numpy.ndarray | torch.Tensor
        :raises ValueError: If the shape of ``points`` is invalid for this stream.
        """
        batch, n_points = _validate_push_batch(points, self._dimension, self._batch_shape)
        if n_points == 0:
            return
        if self._batch_shape is None:
            self._batch_shape = batch
        if self._skip_next > 0:
            skip_count = min(self._skip_next, n_points)
            self._skip_next -= skip_count
            points = points[..., skip_count:, :]
            n_points -= skip_count
            if n_points == 0:
                return
        self._flush_pending()
        if self._buffer is None:
            # Own the data: the buffer outlives this call (later windows slice it), and the
            # caller could otherwise mutate or reuse its array in the meantime.
            self._buffer = _copy_sig(points)
        else:
            self._buffer = _cat_time(self._buffer, points)
        self._buf_len += n_points
        self._emit_windows()

    def _flush_pending(self):
        """Consolidate pending single-point pushes into the buffer, keeping their backend."""
        if not self._pending:
            return
        first = self._pending[0]
        if isinstance(first, torch.Tensor):
            batch = torch.stack(self._pending, dim=-2)
        elif _is_jax(first):
            # Without this branch np.stack would silently turn JAX points into numpy.
            batch = jnp.stack(self._pending, axis=-2)
        else:
            batch = np.stack(self._pending, axis=-2)
        if self._buffer is None:
            self._buffer = batch
        else:
            self._buffer = _cat_time(self._buffer, batch)
        self._pending.clear()

    def _emit_windows(self):
        """Compute every window that is now complete, then trim points no window needs."""
        self._flush_pending()
        if self._buffer is None:
            return
        while self._buf_len - 1 >= self._next_window_end:
            w_start = self._next_window_end - self._window_size + 1
            w_end = self._next_window_end + 1
            window_path = self._buffer[..., w_start:w_end, :]
            self._windows.append(self._sig_fn(window_path, self._degree))
            self._next_window_end += self._stride

        # When `stride > window_size`, `earliest_needed` can exceed `_buf_len`;
        # drop the whole buffer and defer the remainder to `_skip_next`.
        earliest_needed = self._next_window_end - self._window_size + 1
        if earliest_needed > 0:
            if earliest_needed > self._buf_len:
                self._skip_next += earliest_needed - self._buf_len
                trim = self._buf_len
            else:
                trim = earliest_needed
            self._buffer = self._buffer[..., trim:, :]
            self._buf_len -= trim
            self._next_window_end -= earliest_needed

    def sig(self) -> Union[np.ndarray, torch.Tensor]:
        """
        Return the stacked (log-)signatures of all complete windows.

        :return: Array of shape ``(num_windows, ..., sig_length)``.
        :rtype: numpy.ndarray | torch.Tensor
        :raises ValueError: If no window has been completed yet.
        """
        if not self._windows:
            raise ValueError("No complete windows yet")
        return _stack(self._windows, self._windows[0])

    @property
    def num_windows(self) -> int:
        """Number of complete windows emitted so far."""
        return len(self._windows)

    @property
    def batch_shape(self) -> Union[tuple, None]:
        """Batch shape locked in by the first push, or ``None`` if nothing has been pushed."""
        return self._batch_shape
class SigWindowStream(_WindowStream):
    """
    Sliding, fixed-width window over incoming points that emits one signature per window.

    Once ``window_size`` points have arrived, a window is emitted, and another one follows
    every ``stride`` points after that. A single path or a batch of independent paths may be
    streamed; the batch shape comes from the first ``push`` / ``push_batch`` call. Windows are
    cut along the time axis only, so each batch element is windowed independently.

    :param dimension: Dimension of the underlying space, :math:`d`.
    :type dimension: int
    :param degree: Truncation level of the signature, :math:`N`.
    :type degree: int
    :param window_size: Number of points per window.
    :type window_size: int
    :param stride: Number of points between successive window starts. Default 1.
    :type stride: int
    :param scalar_term: If True, each emitted window signature includes the leading constant 1.
        If False (default), the leading element is stripped.
    :type scalar_term: bool
    :param n_jobs: Thread count for the per-window ``sig`` calls; ``-1`` uses every
        available thread.
    :type n_jobs: int

    Example::

        import pysiglib
        import numpy as np

        ws = pysiglib.SigWindowStream(dimension=3, degree=4, window_size=20, stride=5)

        # Single path
        path = np.random.randn(100, 3)
        ws.push_batch(path)
        window_sigs = ws.sig()  # shape (num_windows, sig_length)

        # Batch of 8 independent paths
        ws = pysiglib.SigWindowStream(dimension=3, degree=4, window_size=20, stride=5)
        paths = np.random.randn(8, 100, 3)
        ws.push_batch(paths)
        window_sigs = ws.sig()  # shape (num_windows, 8, sig_length)
    """

    def __init__(self, dimension: int, degree: int, window_size: int, *, stride: int = 1,
                 scalar_term: bool = False, n_jobs: int = 1, _sig=None):
        named_sizes = (("window_size", window_size), ("stride", stride))
        # All type checks run before any positivity check, as before.
        for label, value in named_sizes:
            check_type(value, label, int)
        for label, value in named_sizes:
            check_pos(value, label)
        check_n_jobs(n_jobs)

        base_sig = _sig if _sig else sig

        def window_sig(path, deg):
            return base_sig(path, deg, scalar_term=scalar_term, n_jobs=n_jobs)

        super().__init__(window_sig, dimension, degree, window_size, stride)
class LogSigWindowStream(_WindowStream):
    """
    A fixed-width sliding window over a stream of incoming points that emits windowed
    log-signatures. A new window is emitted every ``stride`` points once enough points have
    been accumulated to fill the window.

    Accepts a single path or a batch of independent paths - the batch shape is inferred from
    the first ``push`` / ``push_batch`` call.

    .. note::

        You must call ``pysiglib.prepare_log_sig(dimension, degree, method=2)`` before creating
        a ``LogSigWindowStream``.

    :param dimension: Dimension of the underlying space, :math:`d`.
    :type dimension: int
    :param degree: Truncation level of the log-signature, :math:`N`.
    :type degree: int
    :param window_size: Number of points per window.
    :type window_size: int
    :param stride: Number of points between successive window starts. Default 1.
    :type stride: int
    :param method: Method used for per-window log-signature computation (``2`` or ``3``).
        Forwarded to every per-window ``log_sig`` call; not validated here.
    :type method: int
    :param n_jobs: Number of threads to run in parallel in the internal per-window ``log_sig``
        calls. ``-1`` uses all available threads.
    :type n_jobs: int

    Example::

        import pysiglib
        import numpy as np

        pysiglib.prepare_log_sig(3, 4, method=2)
        ws = pysiglib.LogSigWindowStream(dimension=3, degree=4, window_size=20, stride=5)
        path = np.random.randn(100, 3)
        ws.push_batch(path)
        window_logsigs = ws.sig()  # shape (num_windows, log_sig_length)
    """

    def __init__(self, dimension: int, degree: int, window_size: int, *, stride: int = 1,
                 method: int = 2, n_jobs: int = 1, _log_sig=None):
        check_type(window_size, "window_size", int)
        check_type(stride, "stride", int)
        check_pos(window_size, "window_size")
        check_pos(stride, "stride")
        check_n_jobs(n_jobs)

        # Wrap the (possibly injected) kernel so ``method`` and ``n_jobs`` are always
        # forwarded. Previously a ``_log_sig`` override was called bare and silently
        # ignored both, unlike SigWindowStream and LogSigStream.
        raw_log_sig = _log_sig or log_sig

        def window_log_sig(path, deg):
            return raw_log_sig(path, deg, method=method, n_jobs=n_jobs)

        super().__init__(window_log_sig, dimension, degree, window_size, stride)