122 lines
5.5 KiB
Python
122 lines
5.5 KiB
Python
|
#!/usr/bin/env python3
|
||
|
import logging
|
||
|
import math
|
||
|
import os
|
||
|
import sqlite3
|
||
|
import subprocess
|
||
|
import sys
|
||
|
|
||
|
logging.basicConfig(level=logging.INFO)
|
||
|
|
||
|
_QUEUE_LENGTH = 100
|
||
|
# TODO: Use cosine similarity as well
|
||
|
_DISTANCE_THRESHOLD = 4.0
|
||
|
|
||
|
if "XDG_DATA_HOME" in os.environ:
|
||
|
_MPDBLISS_DATA_HOME = os.path.expandvars("$XDG_DATA_HOME/mpdbliss")
|
||
|
else:
|
||
|
_MPDBLISS_DATA_HOME = os.path.expanduser("~/.local/share/mpdbliss")
|
||
|
|
||
|
|
||
|
def main():
|
||
|
mpd_queue = []
|
||
|
db_path = os.path.join(_MPDBLISS_DATA_HOME, "db.sqlite3")
|
||
|
logging.debug("Using DB path: %s." % (db_path,))
|
||
|
conn = sqlite3.connect(db_path)
|
||
|
conn.row_factory = sqlite3.Row
|
||
|
conn.execute('pragma foreign_keys=ON')
|
||
|
cur = conn.cursor()
|
||
|
|
||
|
current_song = subprocess.check_output(
|
||
|
["mpc", "current", '--format', '"%file%"'])
|
||
|
current_song = current_song.decode("utf-8").strip('" \r\n\t')
|
||
|
if current_song is "":
|
||
|
logging.warning("Currently played song could not be found.")
|
||
|
sys.exit(1)
|
||
|
logging.info("Currently played song is %s." % (current_song,))
|
||
|
|
||
|
# Get current song coordinates
|
||
|
cur.execute("SELECT id, tempo, amplitude, frequency, attack, filename FROM songs WHERE filename=?", (current_song,))
|
||
|
current_song = cur.fetchone()
|
||
|
if current_song is None:
|
||
|
logging.warning("Current song %s is not in db. You should update the db." %
|
||
|
(current_song["filename"],))
|
||
|
sys.exit(1)
|
||
|
|
||
|
for i in range(_QUEUE_LENGTH):
|
||
|
# Append current song to the mpd queue to avoid duplicates
|
||
|
mpd_queue.append(current_song["filename"])
|
||
|
# Get cached distances from db
|
||
|
cur.execute(
|
||
|
"SELECT id, filename, distance FROM (SELECT s2.id AS id, s2.filename AS filename, distances.distance AS distance FROM distances INNER JOIN songs AS s1 ON s1.id=distances.song1 INNER JOIN songs AS s2 on s2.id=distances.song2 WHERE s1.filename=? UNION SELECT s1.id as id, s1.filename AS filename, distances.distance as distance FROM distances INNER JOIN songs AS s1 ON s1.id=distances.song1 INNER JOIN songs AS s2 on s2.id=distances.song2 WHERE s2.filename=?) ORDER BY distance ASC",
|
||
|
(current_song["filename"], current_song["filename"]))
|
||
|
cached_distances = [row
|
||
|
for row in cur.fetchall()
|
||
|
if row["filename"] not in mpd_queue]
|
||
|
cached_distances_songs = [i["filename"] for i in cached_distances]
|
||
|
|
||
|
# If distance to closest song is ok, just add the song
|
||
|
if len(cached_distances) > 0:
|
||
|
if cached_distances[0]["distance"] < _DISTANCE_THRESHOLD:
|
||
|
# Push it on the queue
|
||
|
subprocess.check_call(["mpc", "add",
|
||
|
cached_distances[0]["filename"]])
|
||
|
# Continue using latest pushed song as current song
|
||
|
logging.info("Using cached distance. Found %s. Distance is %f." %
|
||
|
(current_song["filename"], cached_distances[0]["distance"]))
|
||
|
current_song = cached_distances[0]
|
||
|
continue
|
||
|
|
||
|
# Get all other songs coordinates
|
||
|
closest_song = None
|
||
|
cur.execute("SELECT id, tempo, amplitude, frequency, attack, filename FROM songs")
|
||
|
for tmp_song_data in cur.fetchall():
|
||
|
if(tmp_song_data["filename"] == current_song["filename"] or
|
||
|
tmp_song_data["filename"] in cached_distances_songs or
|
||
|
tmp_song_data["filename"] in mpd_queue):
|
||
|
# Skip current song and already processed songs
|
||
|
logging.debug("Skipping %s." % (tmp_song_data["filename"]))
|
||
|
continue
|
||
|
# Compute distance
|
||
|
distance = math.sqrt(
|
||
|
(current_song["tempo"] - tmp_song_data["tempo"])**2 +
|
||
|
(current_song["amplitude"] - tmp_song_data["amplitude"])**2 +
|
||
|
(current_song["frequency"] - tmp_song_data["frequency"])**2 +
|
||
|
(current_song["attack"] - tmp_song_data["attack"])**2
|
||
|
)
|
||
|
logging.debug("Distance between %s and %s is %f." %
|
||
|
(current_song["filename"],
|
||
|
tmp_song_data["filename"], distance))
|
||
|
# Store distance in db cache
|
||
|
try:
|
||
|
logging.debug("Storing distance in database.")
|
||
|
conn.execute(
|
||
|
"INSERT INTO distances(song1, song2, distance) VALUES(?, ?, ?)",
|
||
|
(current_song["id"], tmp_song_data["id"], distance))
|
||
|
conn.commit()
|
||
|
except sqlite3.IntegrityError:
|
||
|
logging.warning("Unable to insert distance in database.")
|
||
|
conn.rollback()
|
||
|
|
||
|
# If distance is ok, just add the song
|
||
|
if distance < _DISTANCE_THRESHOLD:
|
||
|
# Push it on the queue
|
||
|
subprocess.check_call(["mpc", "add", tmp_song_data["filename"]])
|
||
|
# Continue using latest pushed song as current song
|
||
|
logging.info("Found a close song: %s. Distance is %f." %
|
||
|
(tmp_song_data["filename"], distance))
|
||
|
current_song = tmp_song_data
|
||
|
break
|
||
|
elif closest_song is None or distance < closest_song[1]:
|
||
|
closest_song = (tmp_song_data, distance)
|
||
|
# If no song found, take the closest one
|
||
|
logging.info("No close enough song found. Using %s. Distance is %f." %
|
||
|
(closest_song[0]["filename"], closest_song[1]))
|
||
|
current_song = closest_song[0]
|
||
|
subprocess.check_call(["mpc", "add", closest_song[0]["filename"]])
|
||
|
conn.close()
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
main()
|