Working on code for Noel2013
Started serial handling TODO : Serial in main loop + args
This commit is contained in:
parent
c59cbd10e7
commit
4d96e938c6
103
Noel2013/noel.py
103
Noel2013/noel.py
@ -21,6 +21,9 @@
|
|||||||
from sound4python import sound
|
from sound4python import sound
|
||||||
from multiprocessing import Process
|
from multiprocessing import Process
|
||||||
import math
|
import math
|
||||||
|
import serial
|
||||||
|
import sys
|
||||||
|
import getopt
|
||||||
|
|
||||||
|
|
||||||
class _Getch:
|
class _Getch:
|
||||||
@ -46,45 +49,109 @@ def play_wave(frequency=440, nb_secs=1.):
|
|||||||
sine_wave = []
|
sine_wave = []
|
||||||
for i in range(math.ceil(nb_secs*framerate)+1):
|
for i in range(math.ceil(nb_secs*framerate)+1):
|
||||||
sine_wave.append(int(16384*math.sin(2*math.pi*frequency*i/framerate)))
|
sine_wave.append(int(16384*math.sin(2*math.pi*frequency*i/framerate)))
|
||||||
|
|
||||||
sound(sine_wave)
|
sound(sine_wave)
|
||||||
|
|
||||||
|
|
||||||
|
def read_serial(ser):
|
||||||
|
line = ser.readline()
|
||||||
|
line = line.decode().strip("\r\n")
|
||||||
|
line = line.split(" ")
|
||||||
|
return [int(i or 0) for i in line]
|
||||||
|
|
||||||
framerate = 16000
|
framerate = 16000
|
||||||
processes = []
|
processes = []
|
||||||
frequency = 440
|
frequencies = {"FA": 698.5, "SOL": 784, "LA": 880, "SIb": 932, "DO": 1046.5}
|
||||||
|
notes = frequencies.keys()
|
||||||
|
thresholds = []
|
||||||
|
measures = []
|
||||||
|
|
||||||
|
serial_port = "/dev/ttyACM0"
|
||||||
|
serial_speed = 115200
|
||||||
|
|
||||||
|
try:
|
||||||
|
opts, args = getopt.getopt(sys.argv[1:], "hs:", ["help", "serial="])
|
||||||
|
|
||||||
|
for opt, arg in opts:
|
||||||
|
if opt in ("-h", "--help"):
|
||||||
|
print("Touchless 3D tracking with color mapping")
|
||||||
|
print("\nUsage : "+sys.argv[0]+" [OPTIONS]")
|
||||||
|
print("\nTrack the position of your hand in 3D and map it " +
|
||||||
|
"in RGB space")
|
||||||
|
print("\nOptions :")
|
||||||
|
print("\t-h (--help) \t display this help message")
|
||||||
|
print("\t-s (--serial=) \t change serial port (default is " +
|
||||||
|
"/dev/tty/ACM0")
|
||||||
|
sys.exit(0)
|
||||||
|
elif opt in ("-s", "--serial"):
|
||||||
|
serial_port = arg
|
||||||
|
except getopt.GetoptError:
|
||||||
|
pass
|
||||||
|
|
||||||
# Handle serial opening
|
# Handle serial opening
|
||||||
# *** TODO ***
|
try:
|
||||||
|
ser = serial.Serial(serial_port, serial_speed)
|
||||||
|
except Exception as e:
|
||||||
|
sys.exit("Invalid serial port options : "+str(e))
|
||||||
|
|
||||||
|
try:
|
||||||
|
ser.open()
|
||||||
|
except Exception as e:
|
||||||
|
sys.exit("Error while opening serial port : "+str(e))
|
||||||
|
|
||||||
|
if not ser.isOpen():
|
||||||
|
sys.exit("Serial port not opened")
|
||||||
|
|
||||||
|
ser.flushInput()
|
||||||
|
ser.flushOutput()
|
||||||
|
|
||||||
# Handle calibration
|
# Handle calibration
|
||||||
# *** TODO ***
|
for i in len(notes):
|
||||||
|
print("Calibrating note "+notes[i])
|
||||||
|
print("Touch and release the key now... Will get 10k samples.")
|
||||||
|
|
||||||
|
for sample in range(10000):
|
||||||
|
measures[sample] = read_serial(ser)
|
||||||
|
|
||||||
|
measures.sort()
|
||||||
|
max_diff = 0
|
||||||
|
max_diff_index = 0
|
||||||
|
|
||||||
|
for sample in range(10000-1):
|
||||||
|
if measures[sample+1]-measures[sample]:
|
||||||
|
max_diff = measures[sample+1]-measures[sample]
|
||||||
|
max_diff_index = sample
|
||||||
|
|
||||||
|
thresholds[i] = (measures[max_diff_index+1]+measures[max_diff_index])/2
|
||||||
|
|
||||||
# Main loop
|
# Main loop
|
||||||
# *** TODO *** : Add fetch from the serial also
|
# *** TODO *** : Add fetch from the serial also
|
||||||
print("Running... Press q to quit.")
|
print("Running... Press q to quit.")
|
||||||
running = True
|
running = True
|
||||||
while running:
|
while running:
|
||||||
|
serial_input = read_serial(ser)
|
||||||
|
|
||||||
|
for note in range(notes):
|
||||||
|
|
||||||
|
|
||||||
char = getch()
|
char = getch()
|
||||||
if char == "q":
|
if char == "q":
|
||||||
print("Exiting...")
|
print("Exiting...")
|
||||||
running = False
|
running = False
|
||||||
continue
|
continue
|
||||||
elif char == "a":
|
if char == "a":
|
||||||
frequency = 440
|
frequency = frequencies[0]
|
||||||
elif char == "b":
|
if char == "b":
|
||||||
frequency = 493.88
|
frequency = frequencies[1]
|
||||||
elif char == "c":
|
if char == "c":
|
||||||
frequency = 523.25
|
frequency = frequencies[2]
|
||||||
elif char == "d":
|
if char == "d":
|
||||||
frequency = 587.33
|
frequency = frequencies[3]
|
||||||
elif char == "e":
|
if char == "e":
|
||||||
frequency = 659.26
|
frequency = frequencies[4]
|
||||||
elif char == "f":
|
if char == "f":
|
||||||
frequency = 698.46
|
frequency = frequencies[5]
|
||||||
elif char == "g":
|
if char == "g":
|
||||||
frequency = 783.99
|
frequency = frequencies[6]
|
||||||
else:
|
else:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
100
Noel2013/sound4python.py
Normal file
100
Noel2013/sound4python.py
Normal file
@ -0,0 +1,100 @@
|
|||||||
|
|
||||||
|
try: import tempfile, wave, subprocess, os, signal, struct
|
||||||
|
except:
|
||||||
|
print("E: sound4python is unable to import a combination of %s"%
|
||||||
|
("tempfile, wave, subprocess, os, signal, struct"))
|
||||||
|
|
||||||
|
FNULL = open(os.devnull,'w')
|
||||||
|
def launchWithoutConsole(args,output=False):
|
||||||
|
"""Launches args windowless and waits until finished"""
|
||||||
|
startupinfo = None
|
||||||
|
if( 'STARTUPINFO' in dir(subprocess) ):
|
||||||
|
startupinfo = subprocess.STARTUPINFO()
|
||||||
|
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
|
||||||
|
if( output ):
|
||||||
|
return subprocess.Popen(args, stdin=subprocess.PIPE,
|
||||||
|
stdout=subprocess.PIPE,startupinfo=startupinfo, )
|
||||||
|
else:
|
||||||
|
return subprocess.Popen(args, stdin=subprocess.PIPE,
|
||||||
|
stdout=FNULL ,stderr=FNULL, startupinfo=startupinfo)
|
||||||
|
|
||||||
|
# sound4python
|
||||||
|
def sound(itr,samprate=16000,autoscale=True,output=False):
|
||||||
|
try: import numpy as np; foundNumpy=True;
|
||||||
|
except:
|
||||||
|
foundNumpy=False;
|
||||||
|
|
||||||
|
#for now, assume 1-D iterable
|
||||||
|
mult = 1
|
||||||
|
if( autoscale ):
|
||||||
|
mult = 32767.0 / max(itr)
|
||||||
|
#mult = 128.0 / max(itr)
|
||||||
|
|
||||||
|
#create file in memory
|
||||||
|
#with tempfile.SpooledTemporaryFile() as memFile:
|
||||||
|
memFile = tempfile.SpooledTemporaryFile()
|
||||||
|
|
||||||
|
#create wave write objection pointing to memFile
|
||||||
|
waveWrite = wave.open(memFile,'wb')
|
||||||
|
waveWrite.setsampwidth(2) # int16 default
|
||||||
|
waveWrite.setnchannels(1) # mono default
|
||||||
|
waveWrite.setframerate(samprate) # 8kHz default
|
||||||
|
|
||||||
|
wroteFrames=False
|
||||||
|
# Not satisfied by the numpy implementation, buzzing and so...
|
||||||
|
#Let's try to create sound from NumPy vector
|
||||||
|
#if( foundNumpy ):
|
||||||
|
# if( type(itr)==np.array ):
|
||||||
|
# if( itr.ndim == 1 or itr.shape.count(1) == itr.ndim - 1 ):
|
||||||
|
# waveWrite.writeframes( (mult*itr.flatten()).astype(np.int16).tostring() )
|
||||||
|
# wroteFrames=True
|
||||||
|
# else: #we have np, but the iterable isn't a vector
|
||||||
|
# waveWrite.writeframes( (mult*np.array(itr)).astype(np.int16).tostring() )
|
||||||
|
# wroteFrames=True
|
||||||
|
#if( not wroteFrames and not foundNumpy ):
|
||||||
|
#python w/o np doesn't have "short"/"int16", "@h" is "native,aligned short"
|
||||||
|
# BAD : waveWrite.writeframes( struct.pack(len(itr)*"@h",[int(mult*itm) for itm in itr]) )
|
||||||
|
writeValues = []
|
||||||
|
for itm in itr:
|
||||||
|
packed_value = struct.pack('h', int(mult*itm))
|
||||||
|
waveWrite.writeframes(packed_value)
|
||||||
|
|
||||||
|
wroteFrames=True
|
||||||
|
|
||||||
|
if( not wroteFrames ):
|
||||||
|
print("E: Unable to create sound. Only 1D numpy arrays and numerical lists are supported.")
|
||||||
|
waveWrite.close()
|
||||||
|
return None
|
||||||
|
|
||||||
|
#configure the file object, memFile, as if it has just been opened for reading
|
||||||
|
memFile.seek(0)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# getting here means wroteFrames == True
|
||||||
|
#print("\nAttempting to play a mono audio stream of length")
|
||||||
|
#print(" %.2f seconds (%.3f thousand samples at sample rate of %.3f kHz)"%
|
||||||
|
# ( 1.0*len(itr)/samprate , len(itr)/1000. , int(samprate)/1000.) )
|
||||||
|
p=launchWithoutConsole(['sox','-','-d'])
|
||||||
|
except:
|
||||||
|
print("E: Unable to launch sox.")
|
||||||
|
print("E: Please ensure that sox is installed and on the path.")
|
||||||
|
print("E: Try 'sox -h' to test sox installation.")
|
||||||
|
waveWrite.close()
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
p.communicate(memFile.read())
|
||||||
|
p.wait()
|
||||||
|
except:
|
||||||
|
print("E: Unable to send in-memory wave file to stdin of sox subprocess.")
|
||||||
|
waveWrite.close()
|
||||||
|
return None
|
||||||
|
#os.kill(p.pid,signal.CTRL_C_EVENT)
|
||||||
|
#end def sound(itr,samprate=8000,autoscale=True)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user