Population Based Training in Ray Tune

Photo by Gary Bendig on Unsplash
# imports
import json
import os

from joblib import dump, load
from lightgbm import LGBMRegressor
from ray import tune
from ray.air.config import CheckpointConfig
from ray.air.config import RunConfig
from ray.tune.schedulers import PopulationBasedTraining
from ray.tune.tune_config import TuneConfig
from ray.tune.tuner import Tuner
from sklearn.datasets import load_boston
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import train_test_split
class TrainableForPBT(tune.Trainable):
def setup(self, config):
pass

def step(self):
pass
def setup(self, config, x_train, y_train):
self.current_config = config
self.x_train = x_train
self.y_train = y_train
# need to add about model
self.model = LGBMRegressor(**self.current_config)
# count on which iteration are we in
self.current_iteration = 0
self.current_step_score = None
def step(self):
self.current_iteration += 1
if self.current_iteration == 1:
self.model.fit(self.x_train, self.y_train)
else:
self.model.fit(self.x_train, self.y_train, init_model=self.model)

self.current_step_score = cross_val_score(estimator=self.model, X=self.x_train, y=self.y_train,
scoring='r2', cv=5).mean()
results_dict = {"r2_score": self.current_step_score}
return results_dict
def save_checkpoint(self, tmp_checkpoint_dir):
path = os.path.join(tmp_checkpoint_dir, "checkpoint")
with open(path, "w") as f:
f.write(json.dumps(
{"current_score": self.current_step_score, "current_step": self.current_iteration}))

path_for_model = os.path.join(tmp_checkpoint_dir, 'model.joblib')
dump(self.model, path_for_model)

return tmp_checkpoint_dir
def load_checkpoint(self, tmp_checkpoint_dir):
with open(os.path.join(tmp_checkpoint_dir, "checkpoint")) as f:
state = json.loads(f.read())
self.current_step_score = state["current_score"]
self.current_iteration = state["current_step"]

path_for_model = os.path.join(tmp_checkpoint_dir, 'model.joblib')
self.model = load(path_for_model)
def reset_config(self, new_config):
self.current_config = new_config
self.current_iteration = 0
self.current_step_score = None
self.model = LGBMRegressor(**self.current_config)
return True
class TrainableForPBT(tune.Trainable):
def setup(self, config, x_train, y_train):
self.current_config = config
self.x_train = x_train
self.y_train = y_train
# need to add about model
self.model = LGBMRegressor(**self.current_config)
# count on which iteration are we in
self.current_iteration = 0
self.current_step_score = None

def step(self):
self.current_iteration += 1
if self.current_iteration == 1:
self.model.fit(self.x_train, self.y_train)
else:
self.model.fit(self.x_train, self.y_train, init_model=self.model)

self.current_step_score = cross_val_score(estimator=self.model, X=self.x_train, y=self.y_train,
scoring='r2', cv=5).mean()
results_dict = {"r2_score": self.current_step_score}
return results_dict

def save_checkpoint(self, tmp_checkpoint_dir):
path = os.path.join(tmp_checkpoint_dir, "checkpoint")
with open(path, "w") as f:
f.write(json.dumps(
{"current_score": self.current_step_score, "current_step": self.current_iteration}))

path_for_model = os.path.join(tmp_checkpoint_dir, 'model.joblib')
dump(self.model, path_for_model)

return tmp_checkpoint_dir

def load_checkpoint(self, tmp_checkpoint_dir):
with open(os.path.join(tmp_checkpoint_dir, "checkpoint")) as f:
state = json.loads(f.read())
self.current_step_score = state["current_score"]
self.current_iteration = state["current_step"]

path_for_model = os.path.join(tmp_checkpoint_dir, 'model.joblib')
self.model = load(path_for_model)

def reset_config(self, new_config):
self.current_config = new_config
self.current_iteration = 0
self.current_step_score = None
self.model = LGBMRegressor(**self.current_config)
return True
param_space = {
"params": {
"learning_rate": tune.loguniform(1e-5, 1e-1), #between 0.00001 and 0.1
"num_leaves": tune.randint(5, 100), #between 5 and 100(exclusive)
"max_depth": tune.randint(1, 9), #between 1 and 9(exclusive)
},
}
pbt = PopulationBasedTraining(
time_attr="training_iteration",
perturbation_interval=4,
burn_in_period=10,
hyperparam_mutations=param_space["params"],
)
tune_config = TuneConfig(metric="r2_score", mode="max", search_alg=None, scheduler=pbt, num_samples=15, reuse_actors=True)
checkpoint_config = CheckpointConfig(checkpoint_score_attribute="r2_score", 
checkpoint_score_order="max",
checkpoint_frequency=4,
checkpoint_at_end=True)
run_config = RunConfig(name="pbt_experiment", 
local_dir='/Users/admin/Desktop/Dressler/Publications',
stop={"training_iteration": 30},
checkpoint_config=checkpoint_config)
X, y = load_boston(return_X_y=True)
x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

trainable_with_parameters = tune.with_parameters(TrainableForPBT, x_train=x_train, y_train=y_train)

tuner = Tuner(trainable_with_parameters, param_space=param_space["params"], tune_config=tune_config, run_config=run_config)
analysis = tuner.fit()
best_trial_id = analysis._experiment_analysis.best_trial.trial_id
best_result = analysis.get_best_result()
best_result_score = best_result.metrics['r2_score']
best_config = best_result.config
best_checkpoint = best_result.checkpoint
best_checkpoint_dir = best_result.checkpoint.as_directory()
print(f"BEST TRIAL ID: {best_trial_id}")
print(f"BEST RESULT SCORE: {best_result_score}")
print(f"BEST CHECKPOINT DIRECTORY: {best_checkpoint}")

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store