summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--wasp/apps/heart.py136
-rw-r--r--wasp/boards/pinetime/manifest.py1
-rw-r--r--wasp/ppg.py170
3 files changed, 180 insertions, 127 deletions
diff --git a/wasp/apps/heart.py b/wasp/apps/heart.py
index fdf8336..367d17a 100644
--- a/wasp/apps/heart.py
+++ b/wasp/apps/heart.py
@@ -2,112 +2,8 @@
# Copyright (C) 2020 Daniel Thompson
import wasp
-import array
import machine
-
-class Biquad():
- """Direct Form II Biquad Filter"""
-
- def __init__(self, b0, b1, b2, a1, a2):
- self._coeff = (b0, b1, b2, a1, a2)
- self._v1 = 0
- self._v2 = 0
-
- def step(self, x):
- c = self._coeff
- v1 = self._v1
- v2 = self._v2
-
- v = x - (c[3] * v1) - (c[4] * v2)
- y = (c[0] * v) + (c[1] * v1) + (c[2] * v2)
-
- self._v2 = v1
- self._v1 = v
- return y
-
-class PTAGC():
- """Peak Tracking Automatic Gain Control
-
- In order for the correlation checks to work correctly we must
- aggressively reject spikes caused by fast DC steps. Setting a
- threshold based on the median is very effective at killing
- spikes but needs an extra 1k for sample storage which isn't
- really plausible for a microcontroller.
- """
- def __init__(self, start, decay, threshold):
- self._peak = start
- self._decay = decay
- self._boost = 1 / decay
- self._threshold = threshold
-
- def step(self, spl):
- # peak tracking
- peak = self._peak
- if abs(spl) > peak:
- peak *= self._boost
- else:
- peak *= self._decay
- self._peak = peak
-
- # rejection filter (clipper)
- threshold = self._threshold
- if spl > (peak * threshold) or spl < (peak * -threshold):
- return 0
-
- # booster
- spl = 100 * spl / (2 * peak)
-
- return spl
-
-def _compare(d1, d2, count, shift):
- e = 0
- for i in range(count):
- d = d1[i] - d2[i]
- e += d*d
- return e
-
-def compare(d, shift):
- return _compare(d[shift:], d[:-shift], len(d)-shift, shift)
-
-def trough(d, mn, mx):
- z2 = compare(d, mn-2)
- z1 = compare(d, mn-1)
- for i in range(mn, mx+1):
- z = compare(d, i)
- if z2 > z1 and z1 < z:
- return i
- z2 = z1
- z1 = z
-
- return -1
-
-def get_hrs(d):
- din = memoryview(d)
-
- # Search initially from ~210 to 30 bpm
- t0 = trough(din, 7, 48)
- if t0 < 0:
- return None
-
- # Check the second cycle ...
- t1 = t0 * 2
- t1 = trough(din, t1 - 5, t1 + 5)
- if t1 < 0:
- return None
-
- # ... and the third
- t2 = (t1 * 3) // 2
- t2 = trough(din, t2 - 5, t2 + 4)
- if t2 < 0:
- return None
-
- # If we can find a fourth cycle then use that for the extra
- # precision otherwise report whatever we've found
- t3 = (t2 * 4) // 3
- t3 = trough(din, t3 - 4, t3 + 4)
- if t3 < 0:
- return (60 * 24 * 3) // t2
- return (60 * 24 * 4) // t3
+import ppg
class HeartApp():
"""Heart Rate Sensing application.
@@ -125,40 +21,26 @@ class HeartApp():
draw.fill()
draw.string('PPG graph', 0, 6, width=240)
- self._hpf = Biquad(0.87033078, -1.74066156, 0.87033078, -1.72377617, 0.75754694)
- self._agc = PTAGC(20, 0.971, 2)
- self._lpf = Biquad(0.11595249, 0.23190498, 0.11595249, -0.72168143, 0.18549138)
+ wasp.system.request_tick(1000 // 8)
+ self._hrdata = ppg.PPG(wasp.watch.hrs.read_hrs())
self._x = 0
- self._offset = wasp.watch.hrs.read_hrs()
-
- self._hrdata = array.array('b')
-
- wasp.system.request_tick(1000 // 8)
def background(self):
wasp.watch.hrs.disable()
- del self._hpf
- del self._agc
- del self._lpf
+ del self._hrdata
def _subtick(self, ticks):
"""Notify the application that its periodic tick is due."""
draw = wasp.watch.drawable
- spl = wasp.watch.hrs.read_hrs()
- spl -= self._offset
- spl = self._hpf.step(spl)
- spl = self._agc.step(spl)
- spl = self._lpf.step(spl)
- spl = int(spl)
+ spl = self._hrdata.preprocess(wasp.watch.hrs.read_hrs())
- self._hrdata.append(spl)
- if len(self._hrdata) >= 240:
- draw.string('{} bpm'.format(get_hrs(self._hrdata)), 0, 6, width=240)
- del self._hrdata
- self._hrdata = array.array('b')
+ if len(self._hrdata.data) >= 240:
+ draw.string('{} bpm'.format(self._hrdata.get_heart_rate()),
+ 0, 6, width=240)
+ # Graph is orange by default...
color = 0xffc0
# If the maths goes wrong lets show it in the chart!
diff --git a/wasp/boards/pinetime/manifest.py b/wasp/boards/pinetime/manifest.py
index 02b5649..0193fed 100644
--- a/wasp/boards/pinetime/manifest.py
+++ b/wasp/boards/pinetime/manifest.py
@@ -29,6 +29,7 @@ freeze('../..',
'fonts/sans28.py',
'fonts/sans36.py',
'icons.py',
+ 'ppg.py',
'shell.py',
'wasp.py',
'widgets.py',
diff --git a/wasp/ppg.py b/wasp/ppg.py
new file mode 100644
index 0000000..324392d
--- /dev/null
+++ b/wasp/ppg.py
@@ -0,0 +1,170 @@
+# SPDX-License-Identifier: LGPL-3.0-or-later
+# Copyright (C) 2020 Daniel Thompson
+
+"""Photoplethysmogram (PPG) Signal Processing
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Algorithms and signal processing primatives that can be used to convert
+raw PPG signals into something useful.
+"""
+
+import array
+import micropython
+
+@micropython.viper
+def _compare(d1, d2, count: int, shift: int) -> int:
+ """Compare two sequences of (signed) bytes and quantify how dissimilar
+ they are.
+ """
+ p1 = ptr8(d1)
+ p2 = ptr8(d2)
+
+ e = 0
+ for i in range(count):
+ s1 = int(p1[i])
+ if s1 > 127:
+ s1 -= 256
+
+ s2 = int(p2[i])
+ if s2 > 127:
+ s2 -= 256
+
+ d = s1 - s2
+ e += d*d
+ return e
+
+class Biquad():
+ """Direct Form II Biquad Filter"""
+
+ def __init__(self, b0, b1, b2, a1, a2):
+ self._coeff = (b0, b1, b2, a1, a2)
+ self._v1 = 0
+ self._v2 = 0
+
+ def step(self, x):
+ c = self._coeff
+ v1 = self._v1
+ v2 = self._v2
+
+ v = x - (c[3] * v1) - (c[4] * v2)
+ y = (c[0] * v) + (c[1] * v1) + (c[2] * v2)
+
+ self._v2 = v1
+ self._v1 = v
+ return y
+
+class PTAGC():
+ """Peak Tracking Automatic Gain Control
+
+ In order for the correlation checks to work correctly we must
+ aggressively reject spikes caused by fast DC steps. Setting a
+ threshold based on the median is very effective at killing
+ spikes but needs an extra 1k for sample storage which isn't
+ really plausible for a microcontroller.
+ """
+ def __init__(self, start, decay, threshold):
+ self._peak = start
+ self._decay = decay
+ self._boost = 1 / decay
+ self._threshold = threshold
+
+ def step(self, spl):
+ # peak tracking
+ peak = self._peak
+ if abs(spl) > peak:
+ peak *= self._boost
+ else:
+ peak *= self._decay
+ self._peak = peak
+
+ # rejection filter (clipper)
+ threshold = self._threshold
+ if spl > (peak * threshold) or spl < (peak * -threshold):
+ return 0
+
+ # booster
+ spl = 100 * spl / (2 * peak)
+
+ return spl
+
+class PPG():
+ """
+ """
+
+ def __init__(self, spl):
+ self._offset = spl
+ self.data = array.array('b')
+
+ self._hpf = Biquad(0.87033078, -1.74066156, 0.87033078,
+ -1.72377617, 0.75754694)
+ self._agc = PTAGC(20, 0.971, 2)
+ self._lpf = Biquad(0.11595249, 0.23190498, 0.11595249,
+ -0.72168143, 0.18549138)
+
+ def preprocess(self, spl):
+ """Preprocess a PPG sample.
+
+ Must be called at 24Hz for accurate heart rate calculations.
+ """
+ spl -= self._offset
+ spl = self._hpf.step(spl)
+ spl = self._agc.step(spl)
+ spl = self._lpf.step(spl)
+ spl = int(spl)
+
+ self.data.append(spl)
+ return spl
+
+ def _get_heart_rate(self):
+ def compare(d, shift):
+ return _compare(d[shift:], d[:-shift], len(d)-shift, shift)
+
+ def trough(d, mn, mx):
+ z2 = compare(d, mn-2)
+ z1 = compare(d, mn-1)
+ for i in range(mn, mx+1):
+ z = compare(d, i)
+ if z2 > z1 and z1 < z:
+ return i
+ z2 = z1
+ z1 = z
+
+ return -1
+
+ data = memoryview(self.data)
+
+ # Search initially from ~210 to 30 bpm
+ t0 = trough(data, 7, 48)
+ if t0 < 0:
+ return None
+
+ # Check the second cycle ...
+ t1 = t0 * 2
+ t1 = trough(data, t1 - 5, t1 + 5)
+ if t1 < 0:
+ return None
+
+ # ... and the third
+ t2 = (t1 * 3) // 2
+ t2 = trough(data, t2 - 5, t2 + 4)
+ if t2 < 0:
+ return None
+
+ # If we can find a fourth cycle then use that for the extra
+ # precision otherwise report whatever we've found
+ t3 = (t2 * 4) // 3
+ t3 = trough(data, t3 - 4, t3 + 4)
+ if t3 < 0:
+ return (60 * 24 * 3) // t2
+ return (60 * 24 * 4) // t3
+
+ def get_heart_rate(self):
+ if len(self.data) < 200:
+ return None
+
+ hr = self._get_heart_rate()
+
+ # Clear out the accumulated data
+ self.data = array.array('b')
+
+ return hr