Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
vferat committed Aug 10, 2023
1 parent 1967d0e commit c648257
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 7 deletions.
3 changes: 3 additions & 0 deletions pycrostates/segmentation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
auto_information_function,
entropy,
excess_entropy_rate,
partial_auto_information_function,
)
from .segmentation import EpochsSegmentation, RawSegmentation # noqa: F401
from .transitions import ( # noqa: F401
Expand All @@ -15,6 +16,8 @@
"entropy",
"excess_entropy_rate",
"auto_information_function",
"partial_auto_information_function",
"auto_information_function",
"EpochsSegmentation",
"RawSegmentation",
)
138 changes: 131 additions & 7 deletions pycrostates/segmentation/entropy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# F. von Wegner, Partial Autoinformation to Characterize Symbolic Sequences
# Front Physiol (2018) https://doi.org/10.3389/fphys.2018.01382
import itertools
from typing import Optional, Union
from typing import List, Optional, Tuple, Union

import numpy as np
import scipy.stats
Expand Down Expand Up @@ -183,7 +183,7 @@ def _joint_entropy_history(
# Construct the k-history sequences while ignoring the state
histories = []
for i in range(len(labels) - k + 1):
history = tuple(labels[i : i + k])
history = tuple(labels[i: i + k])
if state_to_ignore not in history:
histories.append(history)

Expand All @@ -193,7 +193,7 @@ def _joint_entropy_history(
for i in range(
len(labels) - k + 1
): # TODO: check +1 (not in original code)
history = tuple(labels[i : i + k])
history = tuple(labels[i: i + k])
if state_to_ignore not in history:
joint_dist[history] += 1.0
# Compute the joint entropy
Expand Down Expand Up @@ -414,7 +414,7 @@ def _auto_information(
log_base: Optional[Union[float, str]] = 2,
):
"""
Computes the Auto-information for lag k.
Compute the Auto-information for lag k.
Parameters
----------
Expand Down Expand Up @@ -448,13 +448,13 @@ def _auto_information(
labels[:nmax], state_to_ignore=state_to_ignore, log_base=log_base
)
h2 = _entropy(
labels[k : k + nmax],
labels[k: k + nmax],
state_to_ignore=state_to_ignore,
log_base=log_base,
)
h12 = _joint_entropy(
labels[:nmax],
labels[k : k + nmax],
labels[k: k + nmax],
state_to_ignore=state_to_ignore,
log_base=log_base,
)
Expand All @@ -465,7 +465,12 @@ def _auto_information(
@fill_doc
def auto_information_function(
segmentation: NDArray[int],
lags: int,
lags: Union[
int,
List[int],
Tuple[int, ...],
NDArray[int],
],
state_to_ignore: Optional[Union[int, None]] = -1,
ignore_self: Optional[bool] = False,
log_base: Optional[Union[float, str]] = 2,
Expand Down Expand Up @@ -523,3 +528,122 @@ def auto_information_function(
)
runs = np.array(runs)
return lags, runs


# Partial auto-information
@fill_doc
def _partial_auto_information(
labels: NDArray[int],
k: int,
state_to_ignore: Optional[Union[int, None]] = -1,
log_base: Optional[Union[float, str]] = 2,
):
"""
Compute the partial auto-information for lag k.
Parameters
----------
%(labels_info)s
k: int
the lag in sample.
%(state_to_ignore)s
%(log_base)s
Returns
-------
p: float
Partial auto-information for lag k.
"""
if k <= 1:
return _auto_information(
labels, k, state_to_ignore=state_to_ignore, log_base=log_base
)
h1 = _joint_entropy_history(
labels,
k,
state_to_ignore=state_to_ignore,
log_base=log_base,
)
h2 = _joint_entropy_history(
labels,
k - 1,
state_to_ignore=state_to_ignore,
log_base=log_base,
)
h3 = _joint_entropy_history(
labels,
k + 1,
state_to_ignore=state_to_ignore,
log_base=log_base,
)
p = 2 * h1 - h2 - h3
return p


@fill_doc
def partial_auto_information_function(
segmentation: [Segmentation, NDArray[int]],
lags: Union[
int,
List[int],
Tuple[int, ...],
NDArray[int],
],
state_to_ignore: Optional[Union[int, None]] = -1,
ignore_self: Optional[bool] = False,
log_base: Optional[Union[float, str]] = 2,
n_jobs: Optional[int] = 1,
):
"""
Compute the Partial auto-information function.
TeX notation:
I(X_{n+k} ; X_{n} | X_{n+k-1}^{(k-1)})
= H(X_{n+k} | X_{n+k-1}^{(k-1)}) - H(X_{n+k} | X_{n+k-1}^{(k-1)}, X_{n})
= H(X_{n+k},X_{n+k-1}^{(k-1)}) - H(X_{n+k-1}^{(k-1)})
- H(X_{n+k},X_{n+k-1}^{(k)}) + H(X_{n+k-1}^{(k)})
= H(X_{n+k}^{(k)}) - H(X_{n+k-1}^{(k-1)}) - H(X_{n+k}^{(k+1)}) + H(X_{n+k-1}^{(k)})
Parameters
----------
%(segmentation_or_labels)s
lags : int, list, tuple, or array of shape ``(n_lags,)``
The lags at which to compute the auto-information function.
If int, will use lags = np.arange(lags).
%(log_base)s
%(n_jobs)s
Returns
-------
p: array (n_symbols, )
Partial auto-information array for each lag.
lags: array (n_lags, )
the lag values.
""" # noqa: E501
labels = _check_segmentation(segmentation)
lags = _check_lags(lags)
_check_type(
state_to_ignore,
(
int,
None,
),
"state_to_ignore",
)
_check_type(ignore_self, (bool,), "ignore_self")
log_base = _check_log_base(log_base)
n_jobs = _check_n_jobs(n_jobs)

# ignore transition to itself (i.e. 1 -> 1)
if ignore_self:
labels = np.array([s for s, _ in itertools.groupby(labels)])

parallel, p_fun, _ = parallel_func(
_partial_auto_information, n_jobs, total=len(lags)
)
runs = parallel(
p_fun(labels, k, state_to_ignore=state_to_ignore, log_base=log_base)
for k in lags
)
runs = np.array(runs)
return lags, runs
64 changes: 64 additions & 0 deletions pycrostates/segmentation/tests/test_entropy.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
_check_segmentation,
_joint_entropy,
_joint_entropy_history,
_partial_auto_information,
auto_information_function,
entropy,
excess_entropy_rate,
partial_auto_information_function,
)

set_log_level("INFO")
Expand Down Expand Up @@ -234,3 +236,65 @@ def test_auto_information_function():
ignore_self=True,
log_base=2,
)


def test__partial_auto_information():
labels = np.random.randint(-1, 4, 100)
r = _partial_auto_information(labels, 10, state_to_ignore=-1, log_base=2)
assert isinstance(r, float)
r = _partial_auto_information(labels, 10, state_to_ignore=None, log_base=2)
assert isinstance(r, float)


def test_partial_auto_information_function():
# Array
labels = np.random.randint(-1, 4, 100)
lags, runs = partial_auto_information_function(
labels,
lags=10,
state_to_ignore=-1,
ignore_self=False,
log_base=2,
)
assert np.all(lags == np.arange(10))
assert isinstance(runs, np.ndarray)
# Raw
partial_auto_information_function(
raw_segmentation,
lags=10,
state_to_ignore=-1,
ignore_self=False,
log_base=2,
)
# Epochs
partial_auto_information_function(
epochs_segmentation,
lags=10,
state_to_ignore=-1,
ignore_self=False,
log_base=2,
)
# state_to_ignore
partial_auto_information_function(
labels,
lags=10,
state_to_ignore=None,
ignore_self=False,
log_base=2,
)
# ignore_self
auto_information_function(
labels,
lags=10,
state_to_ignore=None,
ignore_self=True,
log_base=2,
)
# lags
auto_information_function(
labels,
lags=[1, 3, 10],
state_to_ignore=None,
ignore_self=True,
log_base=2,
)

0 comments on commit c648257

Please sign in to comment.