diff --git a/draft.py b/draft.py deleted file mode 100644 index 61a568f2..00000000 --- a/draft.py +++ /dev/null @@ -1,131 +0,0 @@ -"""Partial autoinformation module for segmented data.""" - -# code from https://github.com/Frederic-vW/AIF-PAIF: -# F. von Wegner, Partial Autoinformation to Characterize Symbolic Sequences -# Front Physiol (2018) https://doi.org/10.3389/fphys.2018.01382 -import itertools -from typing import List, Optional, Tuple, Union - -import numpy as np -import scipy.stats -from mne.parallel import parallel_func -from numpy.typing import NDArray - -from .._typing import Segmentation -from ..utils._checks import _check_n_jobs, _check_type, _check_value -from ..utils._docs import fill_doc - -# TODO: should we normalize aif and paif? -# The n_clusters parameter should reflect the total number of possible states, -# including the ignored state, if applicable. If the ignored state is not -# considered as a valid state, it should be excluded from the count of -# n_clusters. - - -### Partial auto-information -@fill_doc -def _partial_auto_information( - labels: NDArray[int], - k: int, - state_to_ignore: Optional[Union[int, None]] = -1, - log_base: Optional[Union[float, str]] = 2, -): - """ - Compute the partial auto-information for lag k. - - Parameters - ---------- - %(labels_info)s - k: int - the lag in sample. - %(state_to_ignore)s - %(log_base)s - - Returns - ------- - p: float - Partial auto-information for lag k. - """ - if k <= 1: - return _auto_information( - labels, k, state_to_ignore=state_to_ignore, log_base=log_base - ) - h1 = _joint_entropy_history( - labels, - k, - state_to_ignore=state_to_ignore, - log_base=log_base, - ) - h2 = _joint_entropy_history( - labels, - k - 1, - state_to_ignore=state_to_ignore, - log_base=log_base, - ) - h3 = _joint_entropy_history( - labels, - k + 1, - state_to_ignore=state_to_ignore, - log_base=log_base, - ) - p = 2 * h1 - h2 - h3 - return p - - -@fill_doc -def partial_auto_information_function( - segmentation: [Segmentation, NDArray[int]], - lags: Union[ - int, - List[int], - Tuple[int, ...], - NDArray[int], - ], - state_to_ignore: Optional[Union[int, None]] = -1, - ignore_self: Optional[bool] = False, - log_base: Optional[Union[float, str]] = 2, - n_jobs: Optional[int] = 1, -): - """ - Compute the Partial auto-information function. - - TeX notation: - I(X_{n+k} ; X_{n} | X_{n+k-1}^{(k-1)}) - = H(X_{n+k} | X_{n+k-1}^{(k-1)}) - H(X_{n+k} | X_{n+k-1}^{(k-1)}, X_{n}) - = H(X_{n+k},X_{n+k-1}^{(k-1)}) - H(X_{n+k-1}^{(k-1)}) - - H(X_{n+k},X_{n+k-1}^{(k)}) + H(X_{n+k-1}^{(k)}) - = H(X_{n+k}^{(k)}) - H(X_{n+k-1}^{(k-1)}) - H(X_{n+k}^{(k+1)}) + H(X_{n+k-1}^{(k)}) - - Parameters - ---------- - %(segmentation_or_labels)s - %(lags)s - %(log_base)s - %(n_jobs)s - - Returns - ------- - p: array (n_symbols, ) - Partial auto-information array for each lag. - lags: array (n_lags, ) - the lag values. - """ - labels = _check_segmentation(segmentation) - lags = _check_lags(lags) - _check_type(state_to_ignore, (int,), "state_to_ignore") - _check_type(ignore_self, (bool,), "ignore_self") - log_base = _check_log_base(log_base) - n_jobs = _check_n_jobs(n_jobs) - - # ignore transition to itself (i.e. 
1 -> 1) - if ignore_self: - labels = np.array([s for s, _ in itertools.groupby(labels)]) - - parallel, p_fun, _ = parallel_func( - _partial_auto_information, n_jobs, total=len(lags) - ) - runs = parallel( - p_fun(labels, k, state_to_ignore=state_to_ignore, log_base=log_base) - for k in lags - ) - return runs, lags diff --git a/pycrostates/_typing.py b/pycrostates/_typing.py index b09b5cd3..1fc9f025 100644 --- a/pycrostates/_typing.py +++ b/pycrostates/_typing.py @@ -29,7 +29,7 @@ class Cluster(ABC): pass -class Segmentation(ABC): +class Segmentation(_BaseSegmentation): """Typing for a clustering class.""" pass diff --git a/pycrostates/segmentation/tests/test_entropy.py b/pycrostates/segmentation/tests/test_entropy.py index 277faa34..685ec77f 100644 --- a/pycrostates/segmentation/tests/test_entropy.py +++ b/pycrostates/segmentation/tests/test_entropy.py @@ -1,6 +1,6 @@ import numpy as np import pytest -from mne import Epochs, make_fixed_length_events +from mne import Epochs, make_fixed_length_epochs from mne.datasets import testing from mne.io import read_raw_fif @@ -30,8 +30,7 @@ raw = raw.pick("eeg").crop(0, 10) raw = raw.load_data().filter(1, 40).apply_proj() -events = make_fixed_length_events(raw, 1) -epochs = Epochs(raw, events, preload=True) +epochs = make_fixed_length_epochs(raw, 1, preload=True) ModK_raw = ModKMeans(n_clusters=4, n_init=10, max_iter=100, tol=1e-4, random_state=1) ModK_epochs = ModKMeans(n_clusters=4, n_init=10, max_iter=100, tol=1e-4, random_state=1) diff --git a/test.ipynb b/test.ipynb deleted file mode 100644 index 5a7b379f..00000000 --- a/test.ipynb +++ /dev/null @@ -1,549 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "metadata": {}, - "outputs": [], - "source": [ - "import numpy as np\n", - "n_clusters = 5\n", - "labels = np.random.randint(0, n_clusters, 100)" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "metadata": {}, - "outputs": [], - "source": [ - "#!/usr/bin/python\n", - "# -*- coding: utf-8 -*-\n", - "# code for the PAIF manuscript:\n", - "# F. von Wegner, Partial Autoinformation to Characterize Symbolic Sequences\n", - "# Front Physiol (2018) https://doi.org/10.3389/fphys.2018.01382\n", - "# FvW 05/2018, Python3 version 05/2021\n", - "\n", - "import os, sys\n", - "import numpy as np\n", - "import matplotlib.pyplot as plt\n", - "\n", - "# --- define logarithm\n", - "#log = np.log\n", - "log = np.log2\n", - "\n", - "\n", - "\"\"\"*****************************************************************************\n", - "!!! ALL SYMBOLS HAVE TO BE INTEGERS: 0, 1, 2, ... 
!!!\n", - "*****************************************************************************\"\"\"\n", - "\n", - "\n", - "def H_1(x, ns):\n", - " \"\"\"\n", - " Shannon entropy of the symbolic sequence x with ns symbols.\n", - "\n", - " Args:\n", - " x: symbolic sequence, symbols = [0, 1, ..., ns-1]\n", - " ns: number of symbols\n", - " Returns:\n", - " h: Shannon entropy of x\n", - " \"\"\"\n", - "\n", - " n = len(x)\n", - " p = np.zeros(ns) # symbol distribution\n", - " for t in range(n):\n", - " p[x[t]] += 1.0\n", - " p /= n\n", - " h = -np.sum(p[p>0]*log(p[p>0]))\n", - " return h\n", - "\n", - "\n", - "def H_2(x, y, ns):\n", - " \"\"\"\n", - " Joint Shannon entropy of the symbolic sequences x, y, with ns symbols.\n", - "\n", - " Args:\n", - " x, y: symbolic sequences, symbols = [0, 1, ..., ns-1]\n", - " ns: number of symbols\n", - " Returns:\n", - " h: joint Shannon entropy of (x, y)\n", - " \"\"\"\n", - "\n", - " if (len(x) != len(y)):\n", - " print(\"H_2 warning : sequences of different lengths, using the shorter...\")\n", - " n = min(len(x), len(y))\n", - " p = np.zeros((ns, ns)) # joint distribution\n", - " for t in range(n):\n", - " p[x[t],y[t]] += 1.0\n", - " p /= n\n", - " h = -np.sum(p[p>0]*log(p[p>0]))\n", - " return h\n", - "\n", - "\n", - "def H_k(x, ns, k):\n", - " \"\"\"\n", - " Joint Shannon entropy of k-histories x[t:t+k]\n", - "\n", - " Args:\n", - " x: symbolic sequence, symbols = [0, 1, ..., ns-1]\n", - " ns: number of symbols\n", - " k: length of k-history\n", - " Returns:\n", - " h: joint Shannon entropy of x[t:t+k]\n", - " \"\"\"\n", - "\n", - " n = len(x)\n", - " p = np.zeros(tuple(k*[ns])) # symbol joint distribution\n", - " for t in range(n-k):\n", - " p[tuple(x[t:t+k])] += 1.0\n", - " p /= (n-k) # normalize distribution\n", - " h = -np.sum(p[p>0]*log(p[p>0]))\n", - " #m = np.sum(p>0)\n", - " #h = h + (m-1)/(2*N) # Miller-Madow bias correction\n", - " return h\n", - "\n", - "def H_k_fix(x, ns, k):\n", - " \"\"\"\n", - " Joint Shannon entropy of k-histories x[t:t+k]\n", - "\n", - " Args:\n", - " x: symbolic sequence, symbols = [0, 1, ..., ns-1]\n", - " ns: number of symbols\n", - " k: length of k-history\n", - " Returns:\n", - " h: joint Shannon entropy of x[t:t+k]\n", - " \"\"\"\n", - "\n", - " n = len(x)\n", - " p = np.zeros(tuple(k*[ns])) # symbol joint distribution\n", - " for t in range(n-k+1):\n", - " p[tuple(x[t:t+k])] += 1.0\n", - " p /= (n-k+1) # normalize distribution\n", - " h = -np.sum(p[p>0]*log(p[p>0]))\n", - " #m = np.sum(p>0)\n", - " #h = h + (m-1)/(2*N) # Miller-Madow bias correction\n", - " return h\n", - "\n", - "def ais(x, ns, k=1):\n", - " \"\"\"\n", - " Active information storage (AIS)\n", - "\n", - " TeX notation:\n", - " I(X_{n+1} ; X_{n}^{(k)})\n", - " = H(X_{n+1}) - H(X_{n+1} | X_{n}^{(k)})\n", - " = H(X_{n+1}) - H(X_{n+1},X_{n}^{(k)}) + H(X_{n}^{(k)})\n", - " = H(X_{n+1}) + H(X_{n}^{(k)}) - H(X_{n+1}^{(k+1)})\n", - "\n", - " Args:\n", - " x: symbolic sequence, symbols = [0, 1, ..., ns-1]\n", - " ns: number of symbols\n", - " k: history length (optional, default value k=1)\n", - " Returns:\n", - " a: active information storage\n", - " \"\"\"\n", - "\n", - " n = len(x)\n", - " h1 = H_k(x, ns, 1)\n", - " h2 = H_k(x, ns, k)\n", - " h3 = H_k(x, ns, k+1)\n", - " a = h1 + h2 - h3\n", - " return a\n", - "\n", - "def ais_fix(x, ns, k=1):\n", - " \"\"\"\n", - " Active information storage (AIS)\n", - "\n", - " TeX notation:\n", - " I(X_{n+1} ; X_{n}^{(k)})\n", - " = H(X_{n+1}) - H(X_{n+1} | X_{n}^{(k)})\n", - " = H(X_{n+1}) - H(X_{n+1},X_{n}^{(k)}) + 
H(X_{n}^{(k)})\n", - " = H(X_{n+1}) + H(X_{n}^{(k)}) - H(X_{n+1}^{(k+1)})\n", - "\n", - " Args:\n", - " x: symbolic sequence, symbols = [0, 1, ..., ns-1]\n", - " ns: number of symbols\n", - " k: history length (optional, default value k=1)\n", - " Returns:\n", - " a: active information storage\n", - " \"\"\"\n", - "\n", - " n = len(x)\n", - " h1 = H_k_fix(x, ns, 1)\n", - " h2 = H_k_fix(x, ns, k)\n", - " h3 = H_k_fix(x, ns, k+1)\n", - " a = h1 + h2 - h3\n", - " return a\n" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "2.2940526443131133\n", - "2.2940526443131133\n", - "2.2940526443131133\n" - ] - } - ], - "source": [ - "# n_clusters doesn't change H1\n", - "for n in [n_clusters, 10, 12]:\n", - " print(H_1(labels, n))" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "4.546766258580116\n", - "4.546766258580116\n", - "4.546766258580116\n" - ] - } - ], - "source": [ - "# n_clusters doesn't change H_k\n", - "for n in [n_clusters, 10, 12]:\n", - " print(H_k(labels, n, 2))" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "2.3098288226749863\n", - "2.3121287243020605\n", - "2.3098288226749863\n" - ] - } - ], - "source": [ - "# H_k (k=1)!= H_1\n", - "print(H_1(labels, n))\n", - "print(H_k(labels, n, 1))\n", - "# due to len(labels) - k + 1\n", - "print(H_k_fix(labels, n, 1))" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "metadata": {}, - "outputs": [], - "source": [ - "%load_ext autoreload\n", - "%autoreload 2\n", - "\n", - "from pycrostates.segmentation.autoinformation import entropy,joint_entropy, joint_entropy_history" - ] - }, - { - "cell_type": "code", - "execution_count": 9, - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "True" - ] - }, - "execution_count": 9, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "entropy(labels) == H_1(labels, n_clusters)" - ] - }, - { - "cell_type": "code", - "execution_count": 14, - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "True" - ] - }, - "execution_count": 14, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "np.allclose(joint_entropy(labels, labels), H_2(labels, labels, n_clusters))" - ] - }, - { - "cell_type": "code", - "execution_count": 18, - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "True" - ] - }, - "execution_count": 18, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "np.allclose(joint_entropy_history(labels, 3), H_k_fix(labels, n_clusters, 3))" - ] - }, - { - "cell_type": "code", - "execution_count": 6, - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "True" - ] - }, - "execution_count": 6, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "joint_entropy_history(labels, 1) == entropy(labels)" - ] - }, - { - "cell_type": "code", - "execution_count": 12, - "metadata": {}, - "outputs": [], - "source": [ - "\n", - "\n", - "from pycrostates.segmentation.autoinformation import paif as pc_paif\n" - ] - }, - { - "cell_type": "code", - "execution_count": 17, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "1.8377171241426664\n", - "1.8198448257160775\n", - 
"1.8198448257160758\n" - ] - } - ], - "source": [ - "from pycrostates.segmentation.autoinformation import ais as pc_ais\n", - "print(ais(labels, n_clusters, 3))\n", - "print(pc_ais(labels, 3))\n", - "print(ais_fix(labels, n_clusters, 3))" - ] - }, - { - "cell_type": "code", - "execution_count": 26, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\tAIF(k=3)\n", - "0.09332550105294057\n", - "0.09332550105294057\n" - ] - } - ], - "source": [ - "from pycrostates.segmentation.autoinformation import aif as pc_aif\n", - "\n", - "def aif(x, ns, kmax):\n", - " \"\"\"\n", - " Time-lagged mutual information = Auto-information function (AIF)\n", - "\n", - " TeX notation:\n", - " I(X_{n+k} ; X_{n})\n", - " = H(X_{n+k}) - H(X_{n+k} | X_{n})\n", - " = H(X_{n+k}) - H(X_{n+k},X_{n}) + H(X_{n})\n", - " = H(X_{n+k}) + H(X_{n}) - H(X_{n+k},X_{n})\n", - "\n", - " Args:\n", - " x: symbolic sequence, symbols = [0, 1, 2, ...]\n", - " ns: number of symbols\n", - " kmax: maximum time lag\n", - " Returns:\n", - " a: time-lagged mutual information array for lags k=0, ..., kmax-1\n", - " \"\"\"\n", - "\n", - " n = len(x)\n", - " a = np.zeros(kmax)\n", - " for k in range(kmax):\n", - " nmax = n-k\n", - " h1 = H_1(x[:nmax], ns)\n", - " h2 = H_1(x[k:k+nmax], ns)\n", - " h12 = H_2(x[:nmax], x[k:k+nmax], ns)\n", - " a[k] = h1 + h2 - h12\n", - " #if (k%10 == 0): print(f\"\\tAIF(k={k:d})\\r\", end=\"\")\n", - " print(f\"\\tAIF(k={k:d})\\r\", end=\"\")\n", - " print(\"\")\n", - " #a /= a[0] # normalize: a[0]=1.0\n", - " return a\n", - "\n", - "k = 3\n", - "print(aif(labels, n_clusters, k+1)[k])\n", - "print(pc_aif(labels, k))" - ] - }, - { - "cell_type": "code", - "execution_count": 28, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\tAIF(k=1)\n", - "\tPAIF(k=3)\n", - "1.0477063751555882\n", - "\tAIF(k=1)\n", - "\tPAIF(k=3)\n", - "1.0291852953964824\n", - "1.0291852953964815\n" - ] - } - ], - "source": [ - "from pycrostates.segmentation.autoinformation import paif as pc_paif\n", - "\n", - "def paif(x, ns, kmax):\n", - " \"\"\"\n", - " Partial auto-information function (PAIF).\n", - "\n", - " TeX notation:\n", - " I(X_{n+k} ; X_{n} | X_{n+k-1}^{(k-1)})\n", - " = H(X_{n+k} | X_{n+k-1}^{(k-1)}) - H(X_{n+k} | X_{n+k-1}^{(k-1)}, X_{n})\n", - " = H(X_{n+k},X_{n+k-1}^{(k-1)}) - H(X_{n+k-1}^{(k-1)})\n", - " - H(X_{n+k},X_{n+k-1}^{(k)}) + H(X_{n+k-1}^{(k)})\n", - " = H(X_{n+k}^{(k)}) - H(X_{n+k-1}^{(k-1)}) - H(X_{n+k}^{(k+1)}) + H(X_{n+k-1}^{(k)})\n", - "\n", - " Args:\n", - " x: symbolic sequence, symbols = [0, 1, 2, ...]\n", - " ns: number of symbols\n", - " kmax: maximum time lag\n", - " Returns:\n", - " p: partial autoinformation array for lags k=0, ..., kmax-1\n", - " \"\"\"\n", - "\n", - " n = len(x)\n", - " p = np.zeros(kmax)\n", - " a = aif(x, ns, 2) # use AIF coeffs. 
for k=0, 1\n", - " p[0], p[1] = a[0], a[1]\n", - " for k in range(2,kmax):\n", - " h1 = H_k(x, ns, k)\n", - " h2 = H_k(x, ns, k-1)\n", - " h3 = H_k(x, ns, k+1)\n", - " p[k] = 2*h1 - h2 - h3\n", - " #if (k%5 == 0): print(f\"\\tPAIF(k={k:d})\\r\", end=\"\")\n", - " print(f\"\\tPAIF(k={k:d})\\r\", end=\"\")\n", - " print(\"\")\n", - " #p /= p[0] # normalize: p[0]=1.0\n", - " return p\n", - "\n", - "def paif_fix(x, ns, kmax):\n", - " \"\"\"\n", - " Partial auto-information function (PAIF).\n", - "\n", - " TeX notation:\n", - " I(X_{n+k} ; X_{n} | X_{n+k-1}^{(k-1)})\n", - " = H(X_{n+k} | X_{n+k-1}^{(k-1)}) - H(X_{n+k} | X_{n+k-1}^{(k-1)}, X_{n})\n", - " = H(X_{n+k},X_{n+k-1}^{(k-1)}) - H(X_{n+k-1}^{(k-1)})\n", - " - H(X_{n+k},X_{n+k-1}^{(k)}) + H(X_{n+k-1}^{(k)})\n", - " = H(X_{n+k}^{(k)}) - H(X_{n+k-1}^{(k-1)}) - H(X_{n+k}^{(k+1)}) + H(X_{n+k-1}^{(k)})\n", - "\n", - " Args:\n", - " x: symbolic sequence, symbols = [0, 1, 2, ...]\n", - " ns: number of symbols\n", - " kmax: maximum time lag\n", - " Returns:\n", - " p: partial autoinformation array for lags k=0, ..., kmax-1\n", - " \"\"\"\n", - "\n", - " n = len(x)\n", - " p = np.zeros(kmax)\n", - " a = aif(x, ns, 2) # use AIF coeffs. for k=0, 1\n", - " p[0], p[1] = a[0], a[1]\n", - " for k in range(2,kmax):\n", - " h1 = H_k_fix(x, ns, k)\n", - " h2 = H_k_fix(x, ns, k-1)\n", - " h3 = H_k_fix(x, ns, k+1)\n", - " p[k] = 2*h1 - h2 - h3\n", - " #if (k%5 == 0): print(f\"\\tPAIF(k={k:d})\\r\", end=\"\")\n", - " print(f\"\\tPAIF(k={k:d})\\r\", end=\"\")\n", - " print(\"\")\n", - " #p /= p[0] # normalize: p[0]=1.0\n", - " return p\n", - "\n", - "k = 3\n", - "print(paif(labels, n_clusters, k+1)[k])\n", - "print(paif_fix(labels, n_clusters, k+1)[k])\n", - "print(pc_paif(labels, k))\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "pycrostates_dev", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.12" - }, - "orig_nbformat": 4 - }, - "nbformat": 4, - "nbformat_minor": 2 -}
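Note on the removed prototype code: the deleted draft.py and test.ipynb both compute partial auto-information for lag k from joint entropies of k-histories, p(k) = 2*H(X^{(k)}) - H(X^{(k-1)}) - H(X^{(k+1)}), with the "_fix" variants normalizing over n - k + 1 windows. Below is a minimal standalone sketch of that identity for reference; it is not the pycrostates API, the function names are illustrative only, and it assumes k >= 2 (the draft falls back to plain auto-information for k <= 1).

    import numpy as np

    def _joint_entropy_khistory(labels, k, log_base=2.0):
        """Joint Shannon entropy of the k-histories labels[t:t+k]."""
        n = len(labels)
        n_symbols = int(labels.max()) + 1
        p = np.zeros(tuple(k * [n_symbols]))      # joint distribution over k-tuples
        for t in range(n - k + 1):                # n - k + 1 windows, as in H_k_fix
            p[tuple(labels[t:t + k])] += 1.0
        p /= n - k + 1
        p = p[p > 0]
        return -np.sum(p * np.log(p)) / np.log(log_base)

    def partial_auto_information(labels, k, log_base=2.0):
        """p(k) = 2*H(X^{(k)}) - H(X^{(k-1)}) - H(X^{(k+1)}); assumes k >= 2."""
        h1 = _joint_entropy_khistory(labels, k, log_base)
        h2 = _joint_entropy_khistory(labels, k - 1, log_base)
        h3 = _joint_entropy_khistory(labels, k + 1, log_base)
        return 2 * h1 - h2 - h3

    labels = np.random.randint(0, 5, 1000)        # toy symbolic sequence, 5 states
    print(partial_auto_information(labels, k=3))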