Commit 64151718 authored by Prasanna's avatar Prasanna
Browse files

adding async search edits

parent e94a9edb
......@@ -12,6 +12,10 @@ from skopt import Optimizer
from utils import *
import os
import argparse
from skopt.acquisition import gaussian_ei, gaussian_pi, gaussian_lcb
import numpy as np
from ExtremeGradientBoostingQuantileRegressor import ExtremeGradientBoostingQuantileRegressor
seed = 12345
def create_parser():
......@@ -97,13 +101,17 @@ if rank == 0:
parDict = {}
evalDict = {}
resultsList = []
parDict['kappa'] = 1.96
parDict['kappa'] = 0.0
init_x = []
delta = 0.05
patience = max(10, 3 * num_workers-1)
last_imp = 0
curr_best = math.inf
opt = Optimizer(space, base_estimator='RF', acq_optimizer='sampling',
opt = Optimizer(space, base_estimator=ExtremeGradientBoostingQuantileRegressor(), acq_optimizer='sampling',
acq_func='LCB', acq_func_kwargs=parDict, random_state=seed)
print("Master starting with %d workers" % num_workers)
print('Master starting with {} workers'.format(num_workers))
while closed_workers < num_workers:
data = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
source = status.Get_source()
......@@ -111,8 +119,7 @@ if rank == 0:
elapsed_time = float(time.time() - start_time)
print('elapsed_time:%1.3f'%elapsed_time)
if tag == tags.READY:
if eval_counter < max_evals and elapsed_time < max_time:
# Worker is ready, so send it a task
if last_imp < patience and eval_counter < max_evals and elapsed_time < max_time:
if starting_point is not None:
x = starting_point
if num_workers-1 > 0:
......@@ -122,7 +129,7 @@ if rank == 0:
if len(init_x) > 0:
x = init_x.pop(0)
else:
x = opt.ask(n_points=1)[0]
x = opt.ask(n_points=1, strategy='cl_min')[0]
key = str(x)
print('sample %s' % key)
if key in evalDict.keys():
......@@ -132,26 +139,36 @@ if rank == 0:
task['x'] = x
task['eval_counter'] = eval_counter
task['start_time'] = elapsed_time
print("Sending task %d to worker %d" % (eval_counter, source))
print('Sending task {} to worker {}'.format (eval_counter, source))
comm.send(task, dest=source, tag=tags.START)
eval_counter = eval_counter + 1
else:
comm.send(None, dest=source, tag=tags.EXIT)
elif tag == tags.DONE:
elif tag == tags.DONE:
result = data
result['end_time'] = elapsed_time
print("Got data from worker %d" % source)
print(result)
print('Got data from worker {}'.format(source))
resultsList.append(result)
x = result['x']
y = result['cost']
opt.tell(x, y)
percent_improv = -100*(y - curr_best)/curr_best
if y < curr_best:
if percent_improv >= delta or curr_best==math.inf:
curr_best = y
last_imp = 0
else:
last_imp = last_imp+1
print('curr_best={} percent_improv={} patience={}/{}'.format(curr_best, percent_improv, last_imp, patience))
elif tag == tags.EXIT:
print("Worker %d exited." % source)
print('Worker {} exited.'.format(source))
closed_workers = closed_workers + 1
print('Search finishing')
print('Search finished..')
y_best = np.min(opt.yi)
best_index = np.where(opt.yi==y_best)[0][0]
x_best = opt.Xi[best_index]
print('Best: x = {}; y={}'.format(y_best, x_best))
saveResults(resultsList, results_json_fname, results_csv_fname)
else:
# Worker processes execute code below
name = MPI.Get_processor_name()
......@@ -161,9 +178,9 @@ else:
task = comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
tag = status.Get_tag()
if tag == tags.START:
print(task)
result = evaluate(task['x'], task['eval_counter'], params, prob_dir, jobs_dir, results_dir)
result['start_time'] = task['start_time']
print(result)
comm.send(result, dest=0, tag=tags.DONE)
elif tag == tags.EXIT:
break
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment