Skip to content
Snippets Groups Projects
Commit acf86d8b authored by s202576's avatar s202576
Browse files

Upload New File

parent 9a02f9c5
No related branches found
No related tags found
No related merge requests found
Pipeline #4788 failed
# Saccade Peak Velocity measure calculations & error metrics
# saccVelocity.py - Fiona Mulvey <fiona.mulvey@humlab.le.se>2011
# DO NOT DISTRIBUTE BEYOND THIS WORKGROUP PLEASE!!
# Usage: python saccVelocity.py <datafile> [<baseline>] [<decimationRate>]
#This program uses a univariate spline interpolation to compute a velocity profile from raw sample data and report peak velocity, with error metrics and optional
#angular thresholding.
#
#
# Configuration Options:
configMonocular = False # False for binocular files, True for monocular.Assumes that binocular data is reported with every second sample from the same eye, so if that's the...
# ...format you have, there is no need to separate the files.
configAngleEccentricity = 1000.0 # Average angular eccentricity threshold, 0.0 to exclude nothing. Optional filter to remove saccades with non-human levels of angular curvature across...
# ...the saccade (i.e. not ballistic with curvature that's impossible - typical of system noise).
configStartBaseline = True # Adds a start baseline value to the velocity profile of each saccade if True. This should be calculated based on spatial precision during fixations. I used...
# ...the mode of rounded intersample velocities.
#configBaselineDistance = 0.002 # How far (seconds) to place the baseline velocity point from the real saccade samples
configEndBaseline = False # Adds an end baseline value to the velocity profile of each saccade if True
configPixelsPerDegree = 40.4 # Number of pixels per degree of viewing angle
configMaxIntersample = 1000 # Degrees/sec for which we filter intersample velocities for the calculation of baseline velocity.
configBaselineMethod = "accel" # Methods, one of: "mode", "accel", "velocity" can be used to calculate the baseline velocity value
configSampleRate = 625 # Hertz
configDecimationRate = 2 # Ratio of 1:n samples. 1 for no decimation.
#configBaselineDistance = 1.0 / (float(configSampleRate)/float(configDecimationRate)) # How far (seconds) to place the baselines from the real samples
inputFrequency = 1250
configBaselineDistance = 0.0016
configOutputFileLabel = "625hz"
#print "BaselineDistance: " + str(configBaselineDistance)
configSaccMethod = "baseline" # How to detect saccades ("system" or "baseline") baseline will use a simple velocity thresholding to identify saccades, but if a
# system event detection variable is present in the data, this can be used instead
# File format declaration
configDelimiter = "\t" # Column separator in files - "\t" is tab.
configTerminator = "" # Value in 'sacx' or 'sacy' fields that marks saccade end. In this case, it's a system missing value
# Column header names
configColumnTime = "GazeTime" # Absolute time field (seconds)
configColumnSaccX = "sacx" # Marked saccades x values
configColumnSaccY = "sacy" # Marked saccades x values
configColumnRawX = "x" # Raw X
configColumnRawY = "y" # Raw Y
configColumnSubject = "subject" # A subject delimits a group.
#configColumnSubjectNr = "Trial" # For some systems where the subject is marked as a 'trial' variable (stupid variable name for a subject, but it happens....)
configColumnSubjectNr = "subjectnumber" # An individual within that group
configColumnCondition = "condition" # An experimental trial identifier which you can insert if you want to compile files later
# Output column names
configColumnPeakVelocity = "PeakVelocity" # What to call peak velocity in the output
configColumnErrorRate = "ErrorRate" # Error metric calculation in output - this error metric is based on calculus and identifies movements in the data which cannot be eye movements, since they do not obey the calculus of rotations. They are present even in high end systems.
configColumnAmplitude = "Amplitude" # Total saccade amplitude calculation - based on the sum of intersample distances
configColumnSkew = "Skew" # What to call the skew variable in the output - this is the skew of the velocity profile
configFixedBaseline = 0.0
# Input filename
# NOTE: This is overriden by filenames given on the command line
configInputFile = "sampledataNEW.txt"
# Global imports
import sys
import math
import scipy.interpolate
import scipy.integrate
import scipy.stats
import os.path
import numpy
# Utility functions
from itertools import izip
argmax = lambda array: max(izip(array, xrange(len(array))))[1]
# SaccadeVelocity class:
# - Does actual interpolation calculations.
# - Knows nothing about files - simply gets given a complete saccade
class SaccadeVelocity:
peakVelocity = 0.0
errorRate = 0.0
amplitude = 0.0
integral = 0.0
skew = 0.0
baseline = 0.0
baseTime = 0
duration = 0.0
angle = 0.0
saccadeData = []
isSaccade = True
def __init__(self,saccade,baseline=-1.0):
# Sanity check the saccade
if len(saccade) == 0:
#raise Exception("Error: Empty saccade in SaccadeVelocity")
self.isSaccade = False
return
if configStartBaseline or configEndBaseline:
if baseline < 0.0:
self.isSaccade = False
raise Exception("Error: No baseline value supplied")
self.baseline = baseline
self.saccadeData = saccade
self.compute(saccade)
def compute(self,saccade):
colx = configColumnSaccX
coly = configColumnSaccY
self.baseTime = float(saccade[0][configColumnTime])
prevX = 0
prevY = 0
if saccade[0][configColumnSaccX] != "" or saccade[0][configColumnSaccY] != "":
prevX = int(saccade[0][configColumnSaccX])
prevY = int(saccade[0][configColumnSaccY])
prevT = 0.0
xvals = []
yvals = []
distvals = []
angles = []
if configStartBaseline:
xvals.append(-configBaselineDistance * 2.0)
yvals.append(self.baseline)
distvals.append(0.0)
angles.append(0.0)
xvals.append(-configBaselineDistance)
yvals.append(self.baseline)
distvals.append(0.0)
angles.append(0.0)
for samp in saccade:
if samp[colx] == "" or samp[coly] == "":
continue
if int(samp[colx]) == 0 or int(samp[coly]) == 0:
continue
gt = float(samp[configColumnTime])
x = int(samp[colx])
y = int(samp[coly])
d = math.sqrt((x-prevX)**2 + (y-prevY)**2)
timedelta = gt-prevT
#print "gt: " + str(gt) + " prevT: " + str(prevT)
self.amplitude = self.amplitude + (d / configPixelsPerDegree)
#print "d: " + str(d) + " timedelta: " + str(timedelta)
vel = d/timedelta
velDeg = vel / configPixelsPerDegree
if velDeg > configMaxIntersample:
continue
ang = math.degrees(math.atan2(prevX-x,prevY-y))
xvals.append(gt-self.baseTime)
yvals.append(velDeg)
angles.append(ang)
distvals.append(self.amplitude / configPixelsPerDegree)
prevX = x
prevY = y
prevT = gt
if configEndBaseline:
xvals.append((prevT-self.baseTime) + configBaselineDistance)
yvals.append(self.baseline)
distvals.append(distvals[len(distvals)-1])
angles.append(angles[len(angles)-1])
# Discard saccades that have too few velocity points for interpolation (at least two measured samples/one velocity point are necessary even with baseline velocity points added)
if len(xvals) <= 3:
self.isSaccade = False
return
if yvals[1] == 0.0:
yvals[1] = yvals[0]
if angles[1] == 0.0:
angles[1] = angles[2]
if configStartBaseline:
angles[0] = angles[1] # We couldn't set this initially, so set it now.
#print "samp: " + str(samp)
#print "xvals: " + str(xvals)
#print "yvals: " + str(yvals)
#print "angles: " + str(angles)
# Calculate angular eccentricity
lang = angles[1]
chang = 0.0
for ia in range(1,len(angles)):
a = angles[ia]
chang = chang + abs(a-lang)
lang = a
self.angle = chang / float(len(angles))
if configAngleEccentricity > 0 and chang / float(len(angles)) > configAngleEccentricity:
self.isSaccade = False
#print " * Angular eccentricity of saccade at time " + str(self.baseTime) + " exceeds limit"
return
#### Begin Interpolation #####
rng = float(saccade[len(saccade)-1][configColumnTime]) - self.baseTime
interp = scipy.interpolate.InterpolatedUnivariateSpline(xvals,yvals)
lowerbound = 0.0
upperbound = rng
if configStartBaseline:
lowerbound = -configBaselineDistance * 2.0
if configEndBaseline:
upperbound = rng + configBaselineDistance
#print "Lowerbound: " + str(lowerbound) + " Upperbound: " + str(upperbound)
interpxrange = numpy.linspace(lowerbound,upperbound,num=4*(len(xvals)))
newveloc = interp(interpxrange)
#### Error calculation #####
interpabs = lambda x: interp(x) <= configFixedBaseline and configFixedBaseline or interp(x)
integral = scipy.integrate.quad(interpabs, min(xvals), max(xvals))
if saccade[0][configColumnSaccX] == "" or saccade[0][configColumnSaccY] == "":
self.isSaccade = False
return
sx = int(saccade[0][configColumnSaccX])
sy = int(saccade[0][configColumnSaccY])
ex = sx
if saccade[len(saccade)-1][configColumnSaccX] != "":
ex = int(saccade[len(saccade)-1][configColumnSaccX])
ey = sy
if saccade[len(saccade)-1][configColumnSaccY] != "":
ey = int(saccade[len(saccade)-1][configColumnSaccY])
totalamp = 0.0
lastxp = sx
lastyp = sy
#### Duration calculation #####
stime = float(saccade[0][configColumnTime])
etime = float(saccade[len(saccade)-1][configColumnTime])
self.duration = etime - stime
if configStartBaseline:
self.duration = self.duration + configBaselineDistance
for sv in saccade:
if sv[configColumnSaccX] == '' or sv[configColumnSaccY] == '':
continue
x = int(sv[configColumnSaccX])
y = int(sv[configColumnSaccY])
totalamp = totalamp + math.sqrt((x-lastxp)**2 + (y-lastyp)**2)
lastxp = x
lastyp = y
self.integral = integral[0]
totalamp = totalamp / configPixelsPerDegree
#self.ampltitude = totalamp
if totalamp == 0.0:
self.isSaccade = False
totalamp = 0.000001 # avoid division by zero
#print "Total Amp: " + str(totalamp) + " Integral: " + str(integral[0])
#print "Integral2: " + str(integral)
self.errorRate = (abs(totalamp-integral[0])/totalamp) * 100.0
if self.errorRate > 100.0:
self.isSaccade = False
self.errorRate = 100.0
# Find peak
self.peakVelocity = max(newveloc)
# Symmetry calculation
peakindex = argmax(newveloc)
# lhs range
llowerbound = min(xvals)
lupperbound = interpxrange[peakindex]
# rhs range
rlowerbound = interpxrange[peakindex]
rupperbound = max(xvals)
leftintegral = scipy.integrate.quad(interp, llowerbound, lupperbound)
rightintegral = scipy.integrate.quad(interp, rlowerbound, rupperbound)
self.skew = scipy.stats.skew(newveloc)
return
def getPeakVelocity(self):
return self.peakVelocity
def getErrorRate(self):
return self.errorRate
def getAmplitude(self):
return self.amplitude
def getSkew(self):
return self.skew
def getDuration(self):
return self.duration
def getSaccadeData(self):
dat = []
for s in self.saccadeData:
s["PeakVelocity"] = self.peakVelocity
s["PredictedAmpl"] = self.integral
s["ErrorRate"] = self.errorRate
s["CumulativeAmpl"] = self.amplitude
s["Angle"] = self.angle
s["Duration"] = self.duration
s["Skew"] = self.skew
s["Baseline"] = self.baseline
dat.append(s)
return dat
def __str__( self ):
smsg = ""
if not self.isSaccade:
smsg = "\n Result: This is probably not a saccade"
pamp = str(self.integral)
if self.integral < 0.0:
pamp = "(failed)"
return "Saccade:\n BaseTime: " + str(self.baseTime) + "\n Duration: " + str(self.duration) + "\n Peak Vel: " + str(self.peakVelocity) + "\n Error: " + str(self.errorRate) + "\n Measured Amplitude: " + str(self.amplitude) + "\n Predicted Amplitude: " + pamp + "\n Angle: " + str(self.angle) + smsg
# Baseline class:
# - Computes a baseline for a given participant
class Baseline:
subject = []
baseline = 0.0
def __init__(self,subject):
#print "Baseline Samples: " + str(len(subject))
self.subject = subject
# XXX: FOR HIGHSPEED DATA TESTING ONLY
self.baseline = 2.73
return
prevT = 0.0
if subject[0][configColumnRawX] == "" or subject[0][configColumnRawY] == "":
self.subject = []
return
prevX = int(subject[0][configColumnRawX])
prevY = int(subject[0][configColumnRawY])
lastVel = 0.0
accelSum = 0.0
velSum = 0.0
bins = dict()
for b in range(0,100):
bins[b] = 0
cnt = 0
for samp in subject:
cnt = cnt + 1
if not configMonocular:
if cnt % 2 == 0:
continue
if samp[configColumnRawX] == '' or samp[configColumnRawY] == '':
continue
if int(samp[configColumnRawX]) == 0 or int(samp[configColumnRawY]) == 0:
continue
gt = float(samp[configColumnTime])
x = int(samp[configColumnRawX])
y = int(samp[configColumnRawY])
d = math.sqrt((x-prevX)**2 + (y-prevY)**2)
timedelta = gt-prevT
if timedelta == 0:
raise Exception("Time variable is not uniformly increasing. Are you running a binocular file with configMonocular set?")
vel = d/timedelta
velDeg = vel / configPixelsPerDegree
if velDeg < configMaxIntersample:
b = int(math.ceil(velDeg / 10.0))
if b > 0:
bins[b] = bins[b] + 1
accelSum = accelSum + ((velDeg - lastVel) / timedelta)
velSum = velSum + velDeg
prevX = x
prevY = y
prevT = gt
lastVel = velDeg
#accelSum = accelSum - lastVel
n = len(subject)
if not configMonocular:
n = len(subject) / 2
avgAccel = accelSum / float(n)
#avgAccel = accelSum
#print "Average Acceleration: " + str(avgAccel)
#print "Average Velocity: " + str(velSum / float(len(subject)))
largestv = 0
largest = 0
for b in range(0,100):
if bins[b] > largestv:
largestv = bins[b]
largest = b
mode = largest * 10
if configBaselineMethod == "mode":
self.baseline = mode
elif configBaselineMethod == "accel":
self.baseline = abs(avgAccel)
elif configBaselineMethod == "velocity":
self.baseline = velSum / n
else:
raise Exception("Invalid value for configBaselineMethod. Must be mode, accel or velocity.")
print "Baseline: " + str(self.baseline)
def getBaseline(self):
return 2.73
#return self.baseline
# Relies on blank values for sacx and sacy to terminate saccades
def sysSacc(self,eyeData,eye=""):
if(len(eyeData) == 0):
return []
saccades = []
currsac = []
cnt = 0
for samp in eyeData:
if eye != "":
samp["Eye"] = eye
samp["Baseline"] = self.baseline
if samp[configColumnSaccX].strip() == '' or samp[configColumnSaccY].strip() == '':
saccades.append(currsac)
currsac = []
continue
if cnt % configDecimationRate == 0:
currsac.append(samp)
cnt = cnt + 1
if len(currsac) > 0:
saccades.append(currsac)
print "Len: " + str(len(saccades))
return saccades
def compSacc(self,eyeData,eye=""):
if(len(eyeData) == 0):
return []
prevT = 0.0
prevX = int(eyeData[0][configColumnRawX])
prevY = int(eyeData[0][configColumnRawY])
saccades = []
currsac = []
cnt = 0
for samp in eyeData:
if cnt % configDecimationRate != 0:
continue
cnt = cnt + 1
if eye != "":
samp["Eye"] = eye
if samp[configColumnRawX] == '' or samp[configColumnRawY] == '':
continue
if int(samp[configColumnRawX]) == 0 or int(samp[configColumnRawY]) == 0:
continue
samp["Baseline"] = self.baseline
gt = float(samp[configColumnTime])
x = int(samp[configColumnRawX])
y = int(samp[configColumnRawY])
d = math.sqrt((x-prevX)**2 + (y-prevY)**2)
timedelta = gt-prevT
if timedelta == 0:
raise Exception("Time variable is not increasing. Are you running a binocular file with configMonocular set?")
vel = d/timedelta
velDeg = vel / configPixelsPerDegree
if velDeg < configMaxIntersample and velDeg > self.baseline:
currsac.append(samp)
if velDeg < self.baseline and len(currsac) > 0:
saccades.append(currsac)
currsac = []
prevX = x
prevY = y
prevT = gt
return saccades
def getSaccades(self):
if self.baseline == 0.0:
self.baseline = 10.0
if not configMonocular:
eyeA = [self.subject[i] for i in range(len(self.subject)) if i % 2 == 0]
eyeB = [self.subject[i] for i in range(len(self.subject)) if i % 2 != 0]
if configSaccMethod == "baseline":
return self.compSacc(eyeA,"A") + self.compSacc(eyeB,"B")
else:
return self.sysSacc(eyeA,"A") + self.sysSacc(eyeB,"B")
else:
if configSaccMethod == "baseline":
return self.compSacc(self.subject)
else:
return self.sysSacc(self.subject)
# FileParser class:
# - Knows how to parse data files into saccades
# - Return the next saccade using nextSaccade()
# - Returns [] when no more saccades remain
class FileParser:
data = []
headerColumns = dict()
headerKeys = dict()
subjectPtr = 0
lastSubject = ""
saccadePtr = 0
eye = []
progress = 0
fsize = 0
bytes = 0
subject = []
def __init__(self,file,addition):
self.fsize = os.path.getsize(file)
print "Opening data file: '" + file + "' (" + str(self.fsize) + " bytes)"
self.fp = open(file,'r')
self.fout = open(file + '_' + addition + '.out.dat', 'w')
hdr = self.fp.readline()
hdrfields = hdr.split(configDelimiter)
for i in range(0,len(hdrfields)):
self.headerColumns[hdrfields[i].strip()] = i
self.headerKeys[i] = hdrfields[i].strip()
print " * Header key: " + str(i) + ": " + self.headerKeys[i]
self.headerKeys[i+1] = "PeakVelocity"
self.headerKeys[i+2] = "PredictedAmpl"
self.headerKeys[i+3] = "ErrorRate"
self.headerKeys[i+4] = "CumulativeAmpl"
self.headerKeys[i+5] = "Angle"
self.headerKeys[i+6] = "Duration"
self.headerKeys[i+7] = "Baseline"
self.headerKeys[i+8] = "Skew"
self.headerColumns["PeakVelocity"] = i+1
self.headerColumns["PredictedAmpl"] = i+2
self.headerColumns["ErrorRate"] = i+3
self.headerColumns["CumulativeAmpl"] = i+4
self.headerColumns["Angle"] = i+5
self.headerColumns["Duration"] = i+6
self.headerColumns["Baseline"] = i+7
self.headerColumns["Skew"] = i+8
# We must call nextSubject() once initially to set lastSubject
self.nextSubject()
def makeFileHeader(self):
line = []
for i in range(0,len(self.headerKeys)):
line.append(self.headerKeys[i])
self.fout.write("\t".join(line)+"\n")
def nextSaccade(self):
if self.eye != []:
e = self.eye
self.eye = []
return e
if len(self.subject) == 0:
return []
start = 0
end = 0
while end < len(self.subject):
if self.subject[end][configColumnSaccX] == configTerminator or self.subject[end][configColumnSaccY] == configTerminator:
break
end = end + 1
if configMonocular:
tmp = self.subject[start:end]
self.subject = self.subject[end+1:]
return tmp
else:
self.eye = [self.subject[i] for i in range(start,end) if i % 2 != 0]
data = [self.subject[i] for i in range(start,end) if i % 2 == 0]
self.subject = self.subject[end+1:]
return data
def lowPass(self):
cutoff = float(configSampleRate)
fs = float(inputFrequency)
nyq = fs/2.
filterorder = 5
b,a = scipy.signal.filter_design.butter(filterorder,cutoff/nyq)
return scipy.signal.lfilter(b,a,self.subject)
def nextSubject(self):
print " ... " + str(self.progress) + "%"
while True:
line = self.fp.readline()
if line == "":
self.subject = self.data
self.data = []
return self.subject
self.bytes = self.bytes + len(line) + 1
self.progress = int((float(self.bytes) / float(self.fsize)) * 100.0)
fields = line.strip().split(configDelimiter)
d = dict()
for f in range(0,len(fields)):
d[self.headerKeys[f]] = fields[f].strip()
if not configColumnSubjectNr in d:
print str(d)
print str(self.headerKeys)
print str(self.headerColumns)
if d[configColumnSubjectNr] != self.lastSubject:
self.lastSubject = d[configColumnSubjectNr]
self.subject = self.data
self.data = [d]
self.subject = self.lowPass()
return self.subject
self.data.append(d)
def output(self,sacc):
for s in sacc:
line = []
for i in range(0,len(self.headerKeys)):
if not (self.headerKeys[i] in s):
line.append("")
continue
#print "Output Key: " + str(i) + ": " + self.headerKeys[i]
line.append(str(s[self.headerKeys[i]]))
self.fout.write("\t".join(line)+"\n")
def __str__(self):
s = ""
for i in self.subject:
s = s + "Subject: " + i[configColumnSubjectNr] + " Time: " + i[configColumnTime] + " SaccX: " + i[configColumnSaccX] + " SaccY: " + i[configColumnSaccY] + "\n"
return "FileParser class: Current subject has " + str(len(self.subject)) + " samples:\n" + s
# PROGRAM ENTRY POINT
def main():
fname = configInputFile
if len(sys.argv) > 1:
fname = sys.argv[1]
#if len(sys.argv) > 2:
# configFixedBaseline = float(sys.argv[2])
#
#if len(sys.argv) > 3:
# configDecimationRate = float(sys.argv[3])
# configBaselineDistance = 1.0 / (float(configSampleRate)/float(configDecimationRate))
# Print some diagnostic information
print "Saccade Peak Velocity measure"
print " * Input file: " + fname
if configMonocular:
print " * Eye Configuration: Monocular"
else:
print " * Eye Configuration: Binocular"
if configStartBaseline and configEndBaseline:
print " * Baselines: Both"
if configStartBaseline and not configEndBaseline:
print " * Baselines: Start"
if not configStartBaseline and configEndBaseline:
print " * Baselines: End"
if not configStartBaseline and not configEndBaseline:
print " * Baselines: None"
print " * Fixed Baseline: " + str(configFixedBaseline)
print " * Baseline Method: " + configBaselineMethod
print " * Angular eccentricity threshold: " + str(configAngleEccentricity) + " degrees"
print " * Decimation ratio: 1:" + str(configDecimationRate)
fileParser = FileParser(fname,configOutputFileLabel)
writeHeader = True
avgPeak = 0.0
avgCount = 0
avgPV = 0.0
while True:
subj = fileParser.nextSubject()
fileParser.subject = []
if subj == []:
break
baseline = Baseline(subj)
saccs = baseline.getSaccades()
for s in saccs:
if len(s) < 2:
continue
sv = SaccadeVelocity(s, configFixedBaseline)
if writeHeader:
fileParser.makeFileHeader()
writeHeader = False
d = sv.getSaccadeData()
fileParser.output(d)
#print "Average PeakVelocity: " + str(avgPeak / float(avgCount))
#print "Average PV: " + str(avgPV / float(avgCount))
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment