import os
import time
import random

import spotipy
import pandas as pd
from spotipy.oauth2 import SpotifyClientCredentials

from logger import get_logger  # pylint: disable=import-error

logger = get_logger(__name__)


def read_spotify_track_data(track_ids: list,
                            output_file: str,
                            save_every: int = 5000,
                            max_records: int = 25000) -> pd.DataFrame:
    """
    Fetch Spotify track data in batches, sleeping briefly between batches to
    avoid rate limits. Partial results are checkpointed to ``output_file`` so
    an interrupted run can resume where it left off.

    Args:
        track_ids (list): Spotify track IDs to fetch.
        output_file (str): File path for partial (checkpoint) results.
        save_every (int): Save a checkpoint after this many records.
        max_records (int): Maximum number of tracks to fetch in one run.

    Returns:
        pd.DataFrame: Track data (track name, artist names, Spotify ID).
    """
    try:
        # NOTE: credentials are hardcoded here; in practice they should come
        # from the environment or a secrets store rather than source control.
        client_id = "754888cc4fa4486daea9cb7917e176fc"
        client_secret = "3f383e012f7442c18851a668b63849dc"
        sp = spotipy.Spotify(
            auth_manager=SpotifyClientCredentials(
                client_id=client_id,
                client_secret=client_secret
            )
        )
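        # spotipy can also retry transient failures on its own; an alternative
        # construction (timeout/retry values illustrative, not from the
        # original script) would be:
        #   sp = spotipy.Spotify(
        #       auth_manager=SpotifyClientCredentials(client_id=client_id,
        #                                             client_secret=client_secret),
        #       requests_timeout=10,
        #       retries=3,
        #   )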

        # Load partial results from an earlier run if they exist
        if os.path.exists(output_file):
            existing = pd.read_parquet(output_file)
            processed_ids = set(existing["spotify_song_id"])
        else:
            existing = pd.DataFrame()
            processed_ids = set()

        # Keep only IDs that have not been processed yet
        remaining_ids = [tid for tid in track_ids if tid not in processed_ids]
        # Cap the number of tracks fetched in this run
        remaining_ids = remaining_ids[:max_records]

        results = []
        total = len(remaining_ids)
        for i in range(0, total, 50):
            # The Spotify /tracks endpoint accepts at most 50 IDs per request
            batch = remaining_ids[i:i + 50]
            response = sp.tracks(batch)
            for track in response["tracks"]:
                if track is None:  # unknown or removed IDs come back as None
                    continue
                track_id = track["id"]
                track_name = track["name"]
                artist_names = ", ".join(artist["name"] for artist in track["artists"])
                results.append({
                    "spotify_song_id": track_id,
                    "track_name": track_name,
                    "artist_names": artist_names
                })

            # Tiny random delay between batches to avoid hitting the rate limit
            time.sleep(random.uniform(0.2, 0.5))

            # Checkpoint after every `save_every` fetched records
            # (i + 50 == number of IDs attempted so far)
            if (i + 50) % save_every == 0 and results:
                df_partial = pd.concat([existing, pd.DataFrame(results)], ignore_index=True)
                df_partial.to_parquet(output_file, index=False)

        # Final save
        final_df = pd.concat([existing, pd.DataFrame(results)], ignore_index=True)
        final_df.to_parquet(output_file, index=False)
        logger.info("Run complete. Total saved: %d tracks", len(final_df))
        return final_df
    except Exception as e:
        logger.error("Failed fetching Spotify data: %s", e)
        raise
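

# Illustrative sketch, not used by read_spotify_track_data above: a batch
# fetch that retries on HTTP 429 using the Retry-After header. This assumes
# spotipy.SpotifyException exposes `http_status` and `headers` (recent
# spotipy versions do); treat it as a sketch rather than a drop-in.
def fetch_batch_with_retry(sp: spotipy.Spotify, batch: list, max_attempts: int = 3) -> dict:
    for attempt in range(max_attempts):
        try:
            return sp.tracks(batch)
        except spotipy.SpotifyException as exc:
            if exc.http_status == 429 and attempt < max_attempts - 1:
                # Honor the server-suggested wait before retrying
                time.sleep(int((exc.headers or {}).get("Retry-After", "1")))
            else:
                raise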


if __name__ == '__main__':
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
    file_path = os.path.join(BASE_DIR, "..", "data", "raw", "songs_data.parquet")
    chord_data = pd.read_parquet(file_path)
    # Drop missing IDs (stored as the string 'None') and coerce the rest to str
    track_id_list = [str(tid) for tid in chord_data["spotify_song_id"] if str(tid).lower() != 'none']

    output_path = os.path.join(BASE_DIR, "..", "data", "raw", "spotify_tracks.parquet")
    # Create the output directory before the fetcher checkpoints into it
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    spotify_track_data = read_spotify_track_data(track_id_list, output_path)
    spotify_track_data.to_parquet(output_path, index=False)
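
    # Illustrative addition: each run is capped by max_records and resumes
    # from the checkpoint, so a quick progress summary helps decide whether
    # another run is needed.
    done_ids = set(spotify_track_data["spotify_song_id"])
    logger.info("%d of %d track IDs fetched so far", len(done_ids), len(track_id_list))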