blissify/client/client.py

122 lines
5.5 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import logging
import math
import os
import sqlite3
import subprocess
import sys
logging.basicConfig(level=logging.INFO)
_QUEUE_LENGTH = 100
# TODO: Use cosine similarity as well
_DISTANCE_THRESHOLD = 4.0
if "XDG_DATA_HOME" in os.environ:
_MPDBLISS_DATA_HOME = os.path.expandvars("$XDG_DATA_HOME/mpdbliss")
else:
_MPDBLISS_DATA_HOME = os.path.expanduser("~/.local/share/mpdbliss")
def main():
mpd_queue = []
db_path = os.path.join(_MPDBLISS_DATA_HOME, "db.sqlite3")
logging.debug("Using DB path: %s." % (db_path,))
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
conn.execute('pragma foreign_keys=ON')
cur = conn.cursor()
current_song = subprocess.check_output(
["mpc", "current", '--format', '"%file%"'])
current_song = current_song.decode("utf-8").strip('" \r\n\t')
if current_song is "":
logging.warning("Currently played song could not be found.")
sys.exit(1)
logging.info("Currently played song is %s." % (current_song,))
# Get current song coordinates
cur.execute("SELECT id, tempo, amplitude, frequency, attack, filename FROM songs WHERE filename=?", (current_song,))
current_song = cur.fetchone()
if current_song is None:
logging.warning("Current song %s is not in db. You should update the db." %
(current_song["filename"],))
sys.exit(1)
for i in range(_QUEUE_LENGTH):
# Append current song to the mpd queue to avoid duplicates
mpd_queue.append(current_song["filename"])
# Get cached distances from db
cur.execute(
"SELECT id, filename, distance FROM (SELECT s2.id AS id, s2.filename AS filename, distances.distance AS distance FROM distances INNER JOIN songs AS s1 ON s1.id=distances.song1 INNER JOIN songs AS s2 on s2.id=distances.song2 WHERE s1.filename=? UNION SELECT s1.id as id, s1.filename AS filename, distances.distance as distance FROM distances INNER JOIN songs AS s1 ON s1.id=distances.song1 INNER JOIN songs AS s2 on s2.id=distances.song2 WHERE s2.filename=?) ORDER BY distance ASC",
(current_song["filename"], current_song["filename"]))
cached_distances = [row
for row in cur.fetchall()
if row["filename"] not in mpd_queue]
cached_distances_songs = [i["filename"] for i in cached_distances]
# If distance to closest song is ok, just add the song
if len(cached_distances) > 0:
if cached_distances[0]["distance"] < _DISTANCE_THRESHOLD:
# Push it on the queue
subprocess.check_call(["mpc", "add",
cached_distances[0]["filename"]])
# Continue using latest pushed song as current song
logging.info("Using cached distance. Found %s. Distance is %f." %
(current_song["filename"], cached_distances[0]["distance"]))
current_song = cached_distances[0]
continue
# Get all other songs coordinates
closest_song = None
cur.execute("SELECT id, tempo, amplitude, frequency, attack, filename FROM songs")
for tmp_song_data in cur.fetchall():
if(tmp_song_data["filename"] == current_song["filename"] or
tmp_song_data["filename"] in cached_distances_songs or
tmp_song_data["filename"] in mpd_queue):
# Skip current song and already processed songs
logging.debug("Skipping %s." % (tmp_song_data["filename"]))
continue
# Compute distance
distance = math.sqrt(
(current_song["tempo"] - tmp_song_data["tempo"])**2 +
(current_song["amplitude"] - tmp_song_data["amplitude"])**2 +
(current_song["frequency"] - tmp_song_data["frequency"])**2 +
(current_song["attack"] - tmp_song_data["attack"])**2
)
logging.debug("Distance between %s and %s is %f." %
(current_song["filename"],
tmp_song_data["filename"], distance))
# Store distance in db cache
try:
logging.debug("Storing distance in database.")
conn.execute(
"INSERT INTO distances(song1, song2, distance) VALUES(?, ?, ?)",
(current_song["id"], tmp_song_data["id"], distance))
conn.commit()
except sqlite3.IntegrityError:
logging.warning("Unable to insert distance in database.")
conn.rollback()
# If distance is ok, just add the song
if distance < _DISTANCE_THRESHOLD:
# Push it on the queue
subprocess.check_call(["mpc", "add", tmp_song_data["filename"]])
# Continue using latest pushed song as current song
logging.info("Found a close song: %s. Distance is %f." %
(tmp_song_data["filename"], distance))
current_song = tmp_song_data
break
elif closest_song is None or distance < closest_song[1]:
closest_song = (tmp_song_data, distance)
# If no song found, take the closest one
logging.info("No close enough song found. Using %s. Distance is %f." %
(closest_song[0]["filename"], closest_song[1]))
current_song = closest_song[0]
subprocess.check_call(["mpc", "add", closest_song[0]["filename"]])
conn.close()
if __name__ == "__main__":
main()