# Marks.Guitar-Assistant / src/spotify_data_fetch.py
# Raheel Abdul Rehman
# Initial Push
# c31d1ca
import os
import time
import random
import spotipy
import pandas as pd
from spotipy.oauth2 import SpotifyClientCredentials
from logger import get_logger # pylint: disable=import-error
logger = get_logger(__name__)
def _merge_results(existing: pd.DataFrame, results: list, columns: list) -> pd.DataFrame:
    """Return the previously saved rows followed by the newly fetched rows."""
    fresh = pd.DataFrame(results, columns=columns)
    # Skip empty frames so they cannot influence the column set or dtypes
    # of the combined result.
    if existing.empty:
        return fresh
    if fresh.empty:
        return existing.reset_index(drop=True)
    return pd.concat([existing, fresh], ignore_index=True)


def read_spotify_track_data(track_ids: list,
                            output_file: str,
                            save_every: int = 5000,
                            max_records: int = 25000) -> pd.DataFrame:
    """
    Fetch Spotify track data in batches, checkpointing results to a parquet file.

    Ids already present in ``output_file`` are skipped, so an interrupted run
    can be resumed by calling the function again with the same arguments.
    A short random pause is inserted between requests to avoid rate limits.

    Credentials are read from the ``SPOTIPY_CLIENT_ID`` and
    ``SPOTIPY_CLIENT_SECRET`` environment variables; they must not be stored
    in source code.

    Args:
        track_ids (list): Spotify track ids
        output_file (str): Parquet file used for checkpoints and the final result
        save_every (int): Save a checkpoint after roughly this many requested ids
        max_records (int): Max number of tracks to fetch in one run
    Returns:
        pd.DataFrame: Previously saved rows plus the rows fetched in this run,
        with columns ``spotify_song_id``, ``track_name`` and ``artist_names``.
    Raises:
        Exception: Any error from authentication, the Spotify API or parquet
        I/O is logged and re-raised unchanged.
    """
    # Number of ids sent per sp.tracks() call (the several-tracks endpoint
    # accepts at most 50 ids per request).
    batch_size = 50
    columns = ["spotify_song_id", "track_name", "artist_names"]
    try:
        sp = spotipy.Spotify(
            auth_manager=SpotifyClientCredentials(
                client_id=os.environ.get("SPOTIPY_CLIENT_ID"),
                client_secret=os.environ.get("SPOTIPY_CLIENT_SECRET"),
            )
        )

        # Load partial results if they exist
        if os.path.exists(output_file):
            existing = pd.read_parquet(output_file)
        else:
            existing = pd.DataFrame(columns=columns)
        # A file written by an empty run may have no columns at all.
        if "spotify_song_id" in existing.columns:
            processed_ids = set(existing["spotify_song_id"])
        else:
            processed_ids = set()

        # Keep only unprocessed ids, dropping duplicates while preserving
        # order, then limit to max_records for this run.
        remaining_ids = [
            tid for tid in dict.fromkeys(track_ids) if tid not in processed_ids
        ][:max_records]

        # Checkpoints are written to output_file during the run, so its
        # directory has to exist before the first save.
        os.makedirs(os.path.dirname(os.path.abspath(output_file)), exist_ok=True)

        # max(1, ...) guards against a zero divisor when save_every < batch_size.
        batches_per_checkpoint = max(1, save_every // batch_size)

        results = []
        total = len(remaining_ids)
        for batch_number, start in enumerate(range(0, total, batch_size), start=1):
            batch = remaining_ids[start:start + batch_size]
            response = sp.tracks(batch)
            for track in response["tracks"]:
                # Entries that come back as None are skipped.
                if track is None:
                    continue
                results.append({
                    "spotify_song_id": track["id"],
                    "track_name": track["name"],
                    "artist_names": ", ".join(
                        artist["name"] for artist in track["artists"]
                    ),
                })

            # Checkpoint after every `batches_per_checkpoint` completed batches.
            if batch_number % batches_per_checkpoint == 0 and results:
                _merge_results(existing, results, columns).to_parquet(
                    output_file, index=False
                )

            # Tiny random delay to avoid hitting rate limit; not needed after
            # the last batch.
            if start + batch_size < total:
                time.sleep(random.uniform(0.2, 0.5))

        # Final save
        final_df = _merge_results(existing, results, columns)
        final_df.to_parquet(output_file, index=False)
        print(f"Run complete. Total saved: {len(final_df)} tracks")
        return final_df
    except Exception as e:
        logger.error("Failed fetching spotify data : %s", e)
        raise
if __name__ == '__main__':
    # Script entry point: read the song list, fetch the matching Spotify
    # track data and store it next to the input file.
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
    file_path = os.path.join(BASE_DIR, "..", "data", "raw", "songs_data.parquet")
    chord_data = pd.read_parquet(file_path)

    # Drop missing values and the literal string "None" so that only real
    # ids are sent to the API.
    track_id_list = [
        str(song_id)
        for song_id in chord_data["spotify_song_id"].dropna()
        if str(song_id).lower() != 'none'
    ]

    output_path = os.path.join(BASE_DIR, "..", "data", "raw", "spotify_tracks.parquet")
    # The output directory must exist before fetching, because
    # read_spotify_track_data writes to output_path while it runs.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    # read_spotify_track_data saves the final result to output_path itself,
    # so no additional write is needed here.
    read_spotify_track_data(track_id_list, output_path)