Source code for finetuner.run

import time
from typing import Any, Dict, Iterator, Optional

from finetuner.client import FinetunerV1Client
from finetuner.console import console, print_examples, print_metrics
from finetuner.constants import (
    ARTIFACT_ID,
    ARTIFACTS_DIR,
    CREATED,
    FAILED,
    STARTED,
    STATUS,
    SYNTHESIS_TASK,
    TRAIN_DATA,
    TRAINING_TASK,
)
from finetuner.excepts import RunFailedError, RunInProgressError, RunPreparingError
from finetuner.hubble import download_artifact


class Run:
    """Class for a run.

    :param client: Client object for sending api requests.
    :param name: Name of the run.
    :param experiment_name: Name of the experiment.
    :param config: Configuration for the run.
    :param created_at: Creation time of the run.
    :param description: Optional description of the run.
    :param task: The task of the run, `TRAINING_TASK` by default. Only a run
        whose task is `SYNTHESIS_TASK` exposes :attr:`train_data`.
    :param train_data: The name of the `DocumentArray` created if this run is a
        data synthesis job.
    """

    def __init__(
        self,
        client: FinetunerV1Client,
        name: str,
        experiment_name: str,
        config: dict,
        created_at: str,
        description: str = '',
        task: str = TRAINING_TASK,
        train_data: Optional[str] = None,
    ):
        self._client = client
        self._name = name
        self._experiment_name = experiment_name
        self._config = config
        self._created_at = created_at
        self._description = description
        # Fetched once here (needs the client and both names set above);
        # `save_artifact` and `artifact_id` read from this cached dict.
        self._run = self._get_run()
        self.task = task
        self._train_data = train_data

    @property
    def name(self) -> str:
        """Get the name of the :class:`Run`."""
        return self._name

    @property
    def config(self) -> dict:
        """Get the config of the :class:`Run`."""
        return self._config

    @property
    def train_data(self) -> str:
        """Get the data generated by the :class:`Run`.

        Only available when the run is a synthesis job. The value given to the
        constructor is returned if it is truthy; otherwise the run is fetched
        again from the client and the result is cached on the instance.

        :return: The value stored under `TRAIN_DATA` for this run.
        :raises ValueError: If the run is not a synthesis job, or if the
            fetched run has no `TRAIN_DATA` entry.
        :raises RunInProgressError: If the run has not finished yet.
        :raises RunFailedError: If the run failed.
        """
        if self.task != SYNTHESIS_TASK:
            raise ValueError(f'{self.task} run does not produce data.')

        # The status check comes first, even when a cached value exists.
        self._check_run_status_finished()
        if self._train_data:
            return self._train_data

        run = self._get_run()
        try:
            train_data = run[TRAIN_DATA]
        except KeyError as err:
            # Chain the lookup failure so the traceback shows the root cause.
            raise ValueError(f'run {self.name} has no train_data.') from err
        self._train_data = train_data
        return train_data

    def _get_run(self) -> dict:
        """Get Run object as dict."""
        return self._client.get_run(
            experiment_name=self._experiment_name, run_name=self._name
        )

    def status(self) -> dict:
        """Get :class:`Run` status.

        :returns: A dict representing the :class:`Run` status.
        """
        return self._client.get_run_status(
            experiment_name=self._experiment_name, run_name=self._name
        )

    def logs(self) -> str:
        """Check the :class:`Run` logs.

        :returns: A string dump of the run logs.
        :raises RunPreparingError: If the run status is still `CREATED`.
        """
        self._check_run_status_started()
        return self._client.get_run_logs(
            experiment_name=self._experiment_name, run_name=self._name
        )

    def stream_logs(self, interval: int = 5) -> Iterator[str]:
        """Stream the :class:`Run` logs live.

        While the run status is `CREATED`, this call sleeps and re-checks the
        status every `interval` seconds, showing a console spinner. Once the
        status is anything else, the log stream is requested from the client.

        :param interval: The time interval, in seconds, to sync the status of
            the finetuner `Run`.
        :return: The iterator over log lines returned by the client.
        """
        status = self.status()[STATUS]
        msg_template = (
            'Preparing to run, logs will be ready to pull when '
            '`status` is `STARTED`. Current status is `%s`'
        )
        with console.status(msg_template % status, spinner='dots') as rich_status:
            while status == CREATED:
                time.sleep(interval)
                status = self.status()[STATUS]
                rich_status.update(msg_template % status)

        return self._client.stream_run_logs(
            experiment_name=self._experiment_name, run_name=self._name
        )

    def metrics(self) -> Dict[str, Dict[str, float]]:
        """Get the evaluation metrics of the :class:`Run`.

        :return: dictionary with evaluation metrics before and after
            fine-tuning.
        :raises RunInProgressError: If the run has not finished yet.
        :raises RunFailedError: If the run failed.
        """
        self._check_run_status_finished()
        return self._client.get_run_metrics(
            experiment_name=self._experiment_name, run_name=self._name
        )

    def display_metrics(self) -> None:
        """Print a table of retrieval metrics before and after fine-tuning."""
        metrics = self.metrics()
        for stage, stage_metrics in metrics.items():
            print_metrics(stage, stage_metrics)

    def example_results(self) -> Dict[str, Any]:
        """Get the results of example queries from the evaluation data of the
        :class:`Run`.

        :return: dictionary with results before and after fine-tuning.
        :raises RunInProgressError: If the run has not finished yet.
        :raises RunFailedError: If the run failed.
        """
        self._check_run_status_finished()
        return self._client.get_run_examples(
            experiment_name=self._experiment_name, run_name=self._name
        )

    def display_examples(self, k: int = 5) -> None:
        """Print a table of results of example queries before and after
        fine-tuning.

        :param k: maximal number of results per query to display
        """
        example_results = self.example_results()
        for stage, stage_results in example_results.items():
            print_examples(stage, stage_results, k=k)

    def _check_run_status_finished(self) -> None:
        """Raise unless the run is past the `CREATED`/`STARTED` states and has
        not failed.

        :raises RunInProgressError: If the status is `CREATED` or `STARTED`.
        :raises RunFailedError: If the status is `FAILED`.
        """
        status = self.status()[STATUS]
        if status in (CREATED, STARTED):
            # NOTE(review): this message mentions the artifact, but the helper
            # also guards `metrics`, `example_results` and `train_data`.
            raise RunInProgressError(
                'The run needs to be finished in order to save the artifact.'
            )
        if status == FAILED:
            raise RunFailedError(
                'The run failed, please check the `logs` for detailed information.'
            )

    def _check_run_status_started(self) -> None:
        """Raise if the run has not left the `CREATED` state.

        :raises RunPreparingError: If the status is `CREATED`.
        """
        status = self.status()[STATUS]
        if status == CREATED:
            raise RunPreparingError(
                'Preparing to run, logs will be ready to pull when '
                '`status` is `STARTED`.'
            )

    def save_artifact(self, directory: str = ARTIFACTS_DIR) -> str:
        """Save artifact if the :class:`Run` is finished.

        :param directory: Directory where the artifact will be stored.
        :returns: A string object that indicates the download path.
        :raises RunInProgressError: If the run has not finished yet.
        :raises RunFailedError: If the run failed.
        """
        self._check_run_status_finished()
        # NOTE(review): the id comes from the run dict cached at construction;
        # presumably it is already assigned at that point -- confirm.
        return download_artifact(
            client=self._client,
            artifact_id=self._run[ARTIFACT_ID],
            run_name=self._name,
            directory=directory,
        )

    @property
    def artifact_id(self) -> str:
        """Get artifact id of the :class:`Run`.

        An artifact in finetuner contains the fine-tuned model and its
        metadata, such as the preprocessing function and collate function.
        This id could be useful if you want to directly pull the artifact from
        the cloud storage, such as using `FinetunerExecutor`.

        :return: Artifact id as string object.
        :raises RunInProgressError: If the run has not finished yet.
        :raises RunFailedError: If the run failed.
        """
        self._check_run_status_finished()
        # Read from the run dict cached at construction (see `__init__`).
        return self._run[ARTIFACT_ID]