from transformers import AutoConfig, AutoModelForSequenceClassification, AutoTokenizer
from typing import Optional, Dict, Sequence, List
import transformers
from peft import PeftModel
import torch
from torch.nn.utils.rnn import pad_sequence
from dataclasses import dataclass
import pandas as pd
from datasets import Dataset
from tqdm import tqdm
import numpy as np
from huggingface_hub import hf_hub_download
import os
import pickle
from sklearn import preprocessing
import json
import spaces
import time


class calculateDuration:
    """Context manager that prints the wall-clock time spent inside a `with` block."""

    def __init__(self, activity_name=""):
        self.activity_name = activity_name

    def __enter__(self):
        self.start_time = time.time()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.end_time = time.time()
        self.elapsed_time = self.end_time - self.start_time
        if self.activity_name:
            print(f"Elapsed time for {self.activity_name}: {self.elapsed_time:.6f} seconds")
        else:
            print(f"Elapsed time: {self.elapsed_time:.6f} seconds")

from rdkit import RDLogger, Chem
# Suppress RDKit INFO messages
RDLogger.DisableLog('rdApp.*')

# we have a dictionary to store the task types of the models
# task_types = {
#     "admet_bioavailability_ma": "classification",
#     "admet_ppbr_az": "regression",
#     "admet_half_life_obach": "regression",
# }

# read the dataset descriptions
with open("dataset_descriptions.json", "r") as f:
    dataset_description_temp = json.load(f)

dataset_descriptions = dict()
dataset_property_names = dict()
dataset_task_types = dict()
dataset_property_names_to_dataset = dict()

for dataset in dataset_description_temp:
    dataset_name = dataset.lower()
    dataset_descriptions[dataset_name] = \
        f"{dataset_description_temp[dataset]['task_name']} is a {dataset_description_temp[dataset]['task_type']} task, " + \
        f"where the goal is to {dataset_description_temp[dataset]['description']}. \n" + \
        f"More information can be found at {dataset_description_temp[dataset]['url']}."
    dataset_property_names[dataset_name] = dataset_description_temp[dataset]['task_name']
    dataset_property_names_to_dataset[dataset_description_temp[dataset]['task_name']] = dataset_name
    dataset_task_types[dataset_name] = dataset_description_temp[dataset]['task_type']
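
# For reference, each entry in dataset_descriptions.json is expected to look
# roughly like the sketch below. Only the keys are taken from the loop above;
# the field values are illustrative placeholders, not actual content:
#
#     "admet_ppbr_az": {
#         "task_name": "...",
#         "task_type": "regression",
#         "description": "...",
#         "url": "..."
#     }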


class Scaler:
    def __init__(self, log=False):
        self.log = log
        self.offset = None
        self.scaler = None

    def fit(self, y):
        # make the values non-negative
        self.offset = np.min([np.min(y), 0.0])
        y = y.reshape(-1, 1) - self.offset
        # scale the input data
        if self.log:
            y = np.log10(y + 1.0)
        self.scaler = preprocessing.StandardScaler().fit(y)

    def transform(self, y):
        y = y.reshape(-1, 1) - self.offset
        # scale the input data
        if self.log:
            y = np.log10(y + 1.0)
        y_scale = self.scaler.transform(y)
        return y_scale

    def inverse_transform(self, y_scale):
        y = self.scaler.inverse_transform(y_scale.reshape(-1, 1))
        if self.log:
            y = 10.0 ** y - 1.0
        y = y + self.offset
        return y
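
# Minimal sketch of how the Scaler round-trips regression targets (not executed
# here; the numbers are arbitrary examples):
#
#     scaler = Scaler(log=True)
#     y = np.array([0.5, 3.0, 120.0])
#     scaler.fit(y)
#     y_scaled = scaler.transform(y)               # standardized (optionally log10) values
#     y_back = scaler.inverse_transform(y_scaled)  # approximately recovers the original y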


def smart_tokenizer_and_embedding_resize(
    special_tokens_dict: Dict,
    tokenizer: transformers.PreTrainedTokenizer,
    model: transformers.PreTrainedModel,
    non_special_tokens=None,
):
    """Resize tokenizer and embedding.

    Note: This is the unoptimized version that may make your embedding size not be divisible by 64.
    """
    num_new_tokens = tokenizer.add_special_tokens(special_tokens_dict) + tokenizer.add_tokens(non_special_tokens)
    num_old_tokens = model.get_input_embeddings().weight.shape[0]
    num_new_tokens = len(tokenizer) - num_old_tokens
    if num_new_tokens == 0:
        return

    model.resize_token_embeddings(len(tokenizer))

    if num_new_tokens > 0:
        input_embeddings_data = model.get_input_embeddings().weight.data
        # initialize the new embedding rows with the mean of the existing embeddings
        input_embeddings_avg = input_embeddings_data[:-num_new_tokens].mean(dim=0, keepdim=True)
        input_embeddings_data[-num_new_tokens:] = input_embeddings_avg

    print(f"Resized tokenizer and embedding from {num_old_tokens} to {len(tokenizer)} tokens.")


@dataclass
class DataCollator(object):
    tokenizer: transformers.PreTrainedTokenizer
    source_max_len: int
    molecule_start_str: str
    end_str: str

    def augment_molecule(self, molecule: str) -> str:
        # note: relies on a SMILES augmenter (self.sme) that is not set up in this file;
        # this method is not used by the prediction pipeline below
        return self.sme.augment([molecule])[0]

    def __call__(self, instances: Sequence[Dict]) -> Dict[str, torch.Tensor]:
        with calculateDuration("DataCollator"):
            sources = []
            for example in instances:
                smiles = example['smiles'].strip()
                smiles = Chem.MolToSmiles(Chem.MolFromSmiles(smiles))
                # get the properties except the smiles and mol_id cols
                # props = [example[col] if example[col] is not None else np.nan for col in sorted(example.keys()) if col not in ['smiles', 'is_aug']]
                source = f"{self.molecule_start_str}{smiles}{self.end_str}"
                sources.append(source)

            # Tokenize
            tokenized_sources_with_prompt = self.tokenizer(
                sources,
                max_length=self.source_max_len,
                truncation=True,
                add_special_tokens=False,
            )
            input_ids = [torch.tensor(tokenized_source) for tokenized_source in tokenized_sources_with_prompt['input_ids']]
            input_ids = pad_sequence(input_ids, batch_first=True, padding_value=self.tokenizer.pad_token_id)

            data_dict = {
                'input_ids': input_ids,
                'attention_mask': input_ids.ne(self.tokenizer.pad_token_id),
            }
        return data_dict
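
# Sketch of how the collator is used (not executed here; `tok` stands for the
# tokenizer that MolecularPropertyPredictionModel loads below):
#
#     collator = DataCollator(tokenizer=tok, source_max_len=512,
#                             molecule_start_str="<molstart>", end_str="<eos>")
#     batch = collator([{'smiles': 'CCO'}, {'smiles': 'c1ccccc1'}])
#     # batch['input_ids'] and batch['attention_mask'] are padded tensors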


class MolecularPropertyPredictionModel():
    def __init__(self, candidate_models):
        self.adapter_name = None

        # we need to keep track of the paths of adapter scalers
        # we don't want to download the same scaler multiple times
        self.adapter_scaler_path = dict()

        DEFAULT_PAD_TOKEN = "[PAD]"

        # load the base model
        config = AutoConfig.from_pretrained(
            "ChemFM/ChemFM-3B",
            num_labels=1,
            finetuning_task="classification",  # this is not about our task type
            trust_remote_code=True,
            token=os.environ.get("TOKEN")
        )
        self.base_model = AutoModelForSequenceClassification.from_pretrained(
            "ChemFM/ChemFM-3B",
            config=config,
            device_map="cuda",
            trust_remote_code=True,
            token=os.environ.get("TOKEN")
        )

        # load the tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(
            "ChemFM/admet_ppbr_az",
            trust_remote_code=True,
            token=os.environ.get("TOKEN")
        )
        special_tokens_dict = dict(pad_token=DEFAULT_PAD_TOKEN)
        smart_tokenizer_and_embedding_resize(
            special_tokens_dict=special_tokens_dict,
            tokenizer=self.tokenizer,
            model=self.base_model
        )
        self.base_model.config.pad_token_id = self.tokenizer.pad_token_id

        self.data_collator = DataCollator(
            tokenizer=self.tokenizer,
            source_max_len=512,
            molecule_start_str="<molstart>",
            end_str="<eos>",
        )

        # load the adapters first
        for adapter_name in candidate_models:
            adapter_id = candidate_models[adapter_name]
            print(f"loading {adapter_name} from {adapter_id}...")
            self.base_model.load_adapter(adapter_id, adapter_name=adapter_name, token=os.environ.get("TOKEN"))
            try:
                self.adapter_scaler_path[adapter_name] = hf_hub_download(adapter_id, filename="scaler.pkl", token=os.environ.get("TOKEN"))
            except Exception:
                # only regression adapters ship a scaler.pkl, so a missing file is
                # expected for classification tasks
                self.adapter_scaler_path[adapter_name] = None
                assert dataset_task_types[adapter_name] == "classification", \
                    f"{adapter_name} is a regression task but has no scaler.pkl."

        self.base_model.to("cuda")

    def swith_adapter(self, adapter_name, adapter_id):
        # return flag:
        #   keep: adapter is the same as the current one
        #   switched: adapter is switched successfully
        #   error: adapter is not found
        with calculateDuration("switching adapter"):
            if adapter_name == self.adapter_name:
                return "keep"
            # switch adapter
            try:
                # self.adapter_name = adapter_name
                # print(self.adapter_name, adapter_id)
                # self.lora_model = PeftModel.from_pretrained(self.base_model, adapter_id, token=os.environ.get("TOKEN"))
                # self.lora_model.to("cuda")
                # print(self.lora_model)
                self.base_model.set_adapter(adapter_name)
                self.base_model.eval()

                # if adapter_name not in self.adapter_scaler_path:
                #     self.adapter_scaler_path[adapter_name] = hf_hub_download(adapter_id, filename="scaler.pkl", token=os.environ.get("TOKEN"))
                if self.adapter_scaler_path[adapter_name] and os.path.exists(self.adapter_scaler_path[adapter_name]):
                    self.scaler = pickle.load(open(self.adapter_scaler_path[adapter_name], "rb"))
                else:
                    self.scaler = None

                self.adapter_name = adapter_name
                return "switched"
            except Exception:
                # handle error
                return "error"

    def predict(self, valid_df, task_type):
        with calculateDuration("predicting"):
            with calculateDuration("construct dataloader"):
                test_dataset = Dataset.from_pandas(valid_df)
                # construct the dataloader
                test_loader = torch.utils.data.DataLoader(
                    test_dataset,
                    batch_size=16,
                    collate_fn=self.data_collator,
                )

            # predict
            y_pred = []
            for i, batch in tqdm(enumerate(test_loader), total=len(test_loader), desc="Evaluating"):
                with torch.no_grad():
                    batch = {k: v.to(self.base_model.device) for k, v in batch.items()}
                    print(self.base_model.device)
                    print(batch)
                    outputs = self.base_model(**batch)
                    print(outputs)
                    if task_type == "regression":  # TODO: check if the model is regression or classification
                        y_pred.append(outputs.logits.cpu().detach().numpy())
                    else:
                        y_pred.append(torch.sigmoid(outputs.logits).cpu().detach().numpy())

            y_pred = np.concatenate(y_pred, axis=0)
            if task_type == "regression" and self.scaler is not None:
                y_pred = self.scaler.inverse_transform(y_pred)

            return y_pred

    def predict_single_smiles(self, smiles, task_type):
        with calculateDuration("predicting a single SMILES"):
            assert task_type in ["regression", "classification"]

            # check the SMILES string is valid
            if not Chem.MolFromSmiles(smiles):
                return None

            # predict
            valid_df = pd.DataFrame([smiles], columns=['smiles'])
            results = self.predict(valid_df, task_type)
            return results.item()

    def predict_file(self, df, task_type):
        with calculateDuration("predicting a file"):
            # we should add the index first
            df = df.reset_index()

            with calculateDuration("pre-checking SMILES"):
                # we need to check that the SMILES strings are valid; invalid ones get NaN predictions below
                valid_idx = []
                invalid_idx = []
                for idx, smiles in enumerate(df['smiles']):
                    if Chem.MolFromSmiles(smiles):
                        valid_idx.append(idx)
                    else:
                        invalid_idx.append(idx)
                valid_df = df.loc[valid_idx]

            # get the smiles list
            valid_df_smiles = valid_df['smiles'].tolist()
            input_df = pd.DataFrame(valid_df_smiles, columns=['smiles'])
            results = self.predict(input_df, task_type)

            # add the results to the dataframe
            df.loc[valid_idx, 'prediction'] = results
            df.loc[invalid_idx, 'prediction'] = np.nan

            # drop the index column
            df = df.drop(columns=['index'])

            # return the processed dataframe
            return df
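

# End-to-end usage sketch, kept as a comment so importing this module has no side
# effects. The adapter repo id below is an illustrative assumption (the tokenizer
# repo "ChemFM/admet_ppbr_az" is reused as the example), not a verified mapping:
#
#     candidate_models = {"admet_ppbr_az": "ChemFM/admet_ppbr_az"}
#     model = MolecularPropertyPredictionModel(candidate_models)
#     model.swith_adapter("admet_ppbr_az", candidate_models["admet_ppbr_az"])
#     value = model.predict_single_smiles("CCO", dataset_task_types["admet_ppbr_az"])
#     print(value)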