Skip to content

Utilities

add_constraint_information(data, vocs)

determine if constraints have been satisfied based on data and vocs

Source code in xopt/utils.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def add_constraint_information(data: pd.DataFrame, vocs: VOCS) -> pd.DataFrame:
    """
    Determine whether each constraint in ``vocs`` is satisfied for every row
    of ``data``.

    Adds one boolean ``"<name>_feas"`` column per constraint plus an overall
    ``"feas"`` column to ``data`` (modified in place) and returns it.
    """
    constraints = vocs.constraints

    for cname, constraint in constraints.items():
        # shift the constraint column so that negative values imply feasibility;
        # greater-than constraints are flipped to match that convention
        residual = data[cname] - constraint.value
        if isinstance(constraint, GreaterThanConstraint):
            residual = -residual

        data[f"{cname}_feas"] = residual < 0.0

    # a row is feasible only if every individual constraint is satisfied
    feas_columns = [f"{cname}_feas" for cname in constraints]
    data["feas"] = data[feas_columns].all(axis=1)

    return data

copy_generator(generator)

Create a deep copy of a given generator. Moves any data saved on the gpu in the deepcopy of the generator to the cpu.

Parameters:

Name Type Description Default
generator Generator
required

Returns:

Name Type Description
generator_copy Generator
list_of_fields_on_gpu list[str]
Source code in xopt/utils.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def copy_generator(generator: Generator) -> Tuple[Generator, List[str]]:
    """
    Deep-copy a generator, relocating any GPU-resident data in the copy to
    the CPU.

    Parameters
    ----------
    generator : Generator

    Returns
    -------
    generator_copy : Generator
    list_of_fields_on_gpu : list[str]
    """
    duplicate = deepcopy(generator)
    # recursive_move_data_gpu_to_cpu returns (generator, fields_on_gpu)
    return recursive_move_data_gpu_to_cpu(duplicate)

explode_all_columns(data)

explode all data columns in dataframes that are lists or np.arrays

Source code in xopt/utils.py
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
def explode_all_columns(data: pd.DataFrame):
    """
    Explode all columns of ``data`` whose entries are lists or np.ndarrays so
    that each element gets its own row.

    List-like columns are detected from the first row only. Columns that are
    not list-like are repeated for every exploded row.

    Parameters
    ----------
    data : pd.DataFrame

    Returns
    -------
    pd.DataFrame
        The exploded frame, or ``data`` unchanged if it is empty or contains
        no list-like columns.

    Raises
    ------
    ValueError
        If the list-like columns differ in length.
    """
    # TODO: rework the whole list return type handling - this is really slow
    if data.empty:
        # nothing to explode; previously this crashed on data.iloc[0]
        return data

    list_types = []
    lengths = []
    for name, val in data.iloc[0].items():
        if isinstance(val, (list, np.ndarray)):
            list_types.append(name)
            try:
                lengths.append(len(val))
            except TypeError:
                # handle case when a zero length ndarray is passed
                lengths.append(1)

    if not list_types:
        return data

    if len(set(lengths)) > 1:
        raise ValueError("evaluator outputs that are lists must match in size")

    if data.shape[0] == 1:
        # Fast path for most common experimental case of 1 candidate per step
        return _explode_pandas_modified(data, list_types, lengths[0])

    try:
        # dtype of return is object, but we have floats...
        # https://github.com/pandas-dev/pandas/issues/34923
        # also, this method is implemented in Python and uses slow calls
        return data.explode(list_types, ignore_index=True)
    except ValueError:
        raise ValueError("evaluator outputs that are lists must match in size")

get_function(name)

Returns a function from a fully qualified name or global name.

Source code in xopt/utils.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def get_function(name):
    """
    Return a callable resolved from a fully qualified name or a global name.

    Parameters
    ----------
    name : Callable or str
        A callable (returned unchanged), the name of a callable in this
        module's globals, or a fully qualified ``"module.attr"`` string.

    Returns
    -------
    Callable

    Raises
    ------
    ValueError
        If ``name`` is neither callable nor a string, if the named global is
        not callable, or if a bare name cannot be resolved.
    """

    # Check if already a function
    if callable(name):
        return name

    if not isinstance(name, str):
        raise ValueError(f"{name} must be callable or a string.")

    if name in globals():
        f = globals()[name]
        if not callable(f):
            raise ValueError(f"global {name} is not callable")
    elif "." in name:
        # fully qualified name: import the module and fetch the attribute
        m_name, f_name = name.rsplit(".", 1)
        module = importlib.import_module(m_name)
        f = getattr(module, f_name)
    else:
        # consistent with the other failure branches (was a bare `Exception`)
        raise ValueError(f"function {name} does not exist")

    return f

get_function_defaults(f)

Returns a dict of the non-empty POSITIONAL_OR_KEYWORD arguments.

See the inspect documentation for defaults.

Source code in xopt/utils.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def get_function_defaults(f):
    """
    Return a dict of the POSITIONAL_OR_KEYWORD parameters of ``f`` that have
    defaults, mapping parameter name to default value.

    See the `inspect` documentation for defaults.
    """
    defaults = {}
    for k, v in inspect.signature(f).parameters.items():
        if v.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD:
            # identity check: `!=` invokes rich comparison on the default,
            # which for e.g. numpy-array defaults returns an array and
            # breaks the truth test
            if v.default is not inspect.Parameter.empty:
                defaults[k] = v.default
    return defaults

get_generator_name(generator)

Returns the name of the generator if it has one as an attribute, otherwise returns the module name.

Source code in xopt/utils.py
60
61
62
63
64
def get_generator_name(generator):
    """
    Return the generator's ``name`` attribute if present; otherwise fall back
    to the name of the module that defines it.
    """
    module_name = inspect.getmodule(generator).__name__
    return getattr(generator, "name", module_name)

get_local_region(center_point, vocs, fraction=0.1)

calculates the bounds of a local region around a center point with side lengths equal to a fixed fraction of the input space for each variable

Source code in xopt/utils.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def get_local_region(center_point: dict, vocs: VOCS, fraction: float = 0.1) -> dict:
    """
    Compute the bounds of a local region around ``center_point`` whose side
    lengths are a fixed ``fraction`` of each variable's full input range,
    clipped to the variable domains in ``vocs``.
    """
    if center_point.keys() != set(vocs.variable_names):
        raise KeyError("Center point keys must match vocs variable names")

    region = {}
    for var in vocs.variable_names:
        domain = vocs.variables[var].domain
        # half-width of the box along this axis, as a fraction of the range
        half_width = (domain[1] - domain[0]) * fraction
        center = center_point[var]
        region[var] = [
            np.max((center - half_width, domain[0])),
            np.min((center + half_width, domain[1])),
        ]

    return region

get_n_required_fuction_arguments(f)

Counts the number of required function arguments using the inspect module.

Source code in xopt/utils.py
111
112
113
114
115
116
117
118
119
120
def get_n_required_fuction_arguments(f):
    """
    Count the required (default-less) POSITIONAL_OR_KEYWORD arguments of ``f``
    using the `inspect` module.

    Note: the misspelled name ("fuction") is kept for backward compatibility.
    """
    n = 0
    for v in inspect.signature(f).parameters.values():
        if v.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD:
            # identity check: `==` invokes rich comparison on the default,
            # which for e.g. numpy-array defaults can raise or return non-bool
            if v.default is inspect.Parameter.empty:
                n += 1
    return n

has_device_field(module, device)

Checks if given module has a given device.

Parameters:

Name Type Description Default
module Module
required
device device
required

Returns:

Type Description
True/False : bool
Source code in xopt/utils.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
def has_device_field(module: torch.nn.Module, device: torch.device) -> bool:
    """
    Check whether any parameter or buffer of ``module`` lives on ``device``.

    Parameters
    ----------
    module : torch.nn.Module
    device : torch.device

    Returns
    -------
    bool
        True if at least one parameter or buffer is on ``device``,
        False otherwise.
    """
    if any(param.device == device for param in module.parameters()):
        return True
    return any(buf.device == device for buf in module.buffers())

isotime(include_microseconds=False)

UTC to ISO 8601 with Local TimeZone information without microsecond

Source code in xopt/utils.py
51
52
53
54
55
56
57
def isotime(include_microseconds=False):
    """
    Current UTC time as an ISO 8601 string with local timezone information,
    without microseconds unless ``include_microseconds`` is True.
    """
    # datetime.utcnow() is deprecated (Python 3.12) and returns a naive
    # datetime; build an aware UTC time directly, then convert to local time
    t = datetime.datetime.now(datetime.timezone.utc).astimezone()
    if not include_microseconds:
        t = t.replace(microsecond=0)

    return t.isoformat()

nsga2_to_cnsga_file_format(input_dir, output_dir, last_n_lines=None)

Convert the output of the NSGA2 generator to the same format used by the CNSGA generator. This function is useful for interfacing with existing analysis tools.

The converted output is guaranteed to be reproducible for the same input data. To this end, the converted filenames follow the format from the CNSGA generator cnsga_population_<timestamp>.csv and cnsga_offspring_<timestamp>.csv where the timestamp is the generation index in seconds since epoch.

Parameters:

input_dir : str The output directory of the NSGA2 generator. output_dir : str Where the converted output will be saved. Directory will be created if necessary. last_n_lines : int Read only the last n lines of each CSV file (useful for pulling final generations in large files)

Source code in xopt/utils.py
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
def nsga2_to_cnsga_file_format(
    input_dir: str, output_dir: str, last_n_lines: int | None = None
):
    """
    Convert the output of the NSGA2 generator to the same format used by the CNSGA
    generator. This function is useful for interfacing with existing analysis tools.

    The converted output is guaranteed to be reproducible for the same input data. To this end, the
    converted filenames follow the format from the CNSGA generator `cnsga_population_<timestamp>.csv`
    and `cnsga_offspring_<timestamp>.csv` where the timestamp is the generation index in seconds since
    epoch.

    Parameters
    ----------
    input_dir : str
        The output directory of the NSGA2 generator.
    output_dir : str
        Where the converted output will be saved. Directory will be created if necessary.
    last_n_lines : int
        Read only the last n lines of each CSV file (useful for pulling final generations in large files)
    """

    def _timestamp_str(generation_index: int) -> str:
        # Deterministic timestamp: the generation index interpreted as seconds
        # since epoch (UTC). ":" is replaced because it is not filesystem-safe
        # on all platforms.
        return (
            datetime.datetime.fromtimestamp(
                generation_index, tz=datetime.timezone.utc
            )
            .isoformat()
            .replace(":", "_")
        )

    # Load the population and data files
    pop = read_csv(
        os.path.join(input_dir, "populations.csv"), last_n_lines=last_n_lines
    )
    dat = read_csv(os.path.join(input_dir, "data.csv"), last_n_lines=last_n_lines)

    # Setup the output dir
    os.makedirs(output_dir, exist_ok=True)

    # Write each population (separated by key `xopt_generation`)
    generations = 0
    for generation in pop["xopt_generation"].unique():
        # Filter population data for this generation
        gen_pop = pop[pop["xopt_generation"] == generation]

        # Build filename
        filename = os.path.join(
            output_dir, f"cnsga_population_{_timestamp_str(int(generation))}.csv"
        )

        # Write generation data to file
        gen_pop.to_csv(filename, index_label="xopt_index")
        # log the actual output path (was a hard-coded placeholder string)
        logger.debug(
            f'Saved population file for generation {generation} to "{filename}"'
        )
        generations += 1

    # Write each set of the offspring (separated by key `xopt_parent_generation`)
    offsprings = 0
    for generation in dat["xopt_parent_generation"].unique():
        # Filter offspring data for this generation
        gen_dat = dat[dat["xopt_parent_generation"] == generation]

        # Build the filename
        # Note: to match CNSGA generator behavior where the candidates just received by the generator
        # have the same timestamp as the completed generation, the `xopt_parent_generation` needs
        # additional factor of one.
        filename = os.path.join(
            output_dir, f"cnsga_offspring_{_timestamp_str(int(generation) + 1)}.csv"
        )

        # Write generation data to file
        gen_dat.to_csv(filename, index_label="xopt_index")
        # use the module-level logger consistently (was `logging.debug`)
        logger.debug(
            f'Saved offspring file for generation {generation} to "{filename}"'
        )
        offsprings += 1

    # Some logging (module logger, consistent with the rest of the function)
    logger.info(
        f'Converted NSGA2Generator output "{input_dir}" to CNSGA2Generator format at "{output_dir}" ({generations} population files, {offsprings} offspring files, last_n_lines={last_n_lines})'
    )

read_csv(filepath, last_n_lines=None, **kwargs)

Wrapper for pandas.read_csv with addition of only reading last n lines.

Source code in xopt/utils.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
def read_csv(filepath: str, last_n_lines: int | None = None, **kwargs) -> pd.DataFrame:
    """
    Wrapper for pandas.read_csv with addition of only reading last n lines.
    """
    if last_n_lines is not None:
        # Count total lines (subtract 1 for header)
        with open(filepath, "r") as f:
            total_lines = sum(1 for _ in f) - 1

        # Calculate how many lines to skip
        skiprows = max(0, total_lines - last_n_lines)
        skiprows = range(1, skiprows + 1)
    else:
        skiprows = None

    # Read with skiprows
    return pd.read_csv(filepath, skiprows=skiprows, **kwargs)

read_xopt_csv(*files)

Read several Xopt-style CSV files into data

Parameters:

Name Type Description Default
file1

One or more Xopt csv files

required
file2

One or more Xopt csv files

required

Returns:

Type Description
DataFrame

DataFrame with xopt_index as the index column

Source code in xopt/utils.py
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
def read_xopt_csv(*files):
    """
    Read one or more Xopt-style CSV files and concatenate them into a single
    DataFrame.

    Parameters
    ----------
    file1, file2, ...: path-like
        One or more Xopt csv files

    Returns
    -------
    pd.DataFrame
        DataFrame indexed by the ``xopt_index`` column
    """
    frames = [pd.read_csv(path, index_col="xopt_index") for path in files]
    return pd.concat(frames)

recursive_move_data_gpu_to_cpu(pydantic_object)

A recursive method to find all the data of a pydantic object which is stored on the gpu and then move that data to the cpu.

Parameters:

Name Type Description Default
pydantic_object BaseModel
required

Returns:

Name Type Description
pydantic_object BaseModel
list_of_fields_on_gpu list[str]
Source code in xopt/utils.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def recursive_move_data_gpu_to_cpu(
    pydantic_object: BaseModel,
) -> Tuple[BaseModel, List[str]]:
    """
    A recursive method to find all the data of a pydantic object
    which is stored on the gpu and then move that data to the cpu.

    Parameters
    ----------
    pydantic_object : BaseModel

    Returns
    -------
    pydantic_object : BaseModel
    list_of_fields_on_gpu : list[str]

    """
    # NOTE(review): model_dump() returns a plain-dict snapshot, so the edits to
    # `pydantic_object_dict` below are never written back to `pydantic_object`;
    # the function returns the input object unmodified — confirm this is intended.
    pydantic_object_dict = pydantic_object.model_dump()
    # first entry records the class name; GPU field names are appended after it
    list_of_fields_on_gpu = [pydantic_object.__class__.__name__]

    for field_name, field_value in pydantic_object_dict.items():
        if isinstance(field_value, BaseModel):
            # NOTE(review): model_dump() typically converts nested models to plain
            # dicts, so this branch may be unreachable — verify with the models used.
            result = recursive_move_data_gpu_to_cpu(field_value)
            pydantic_object_dict[field_name] = result[0]
            # appends the nested field list as a single element (not flattened)
            list_of_fields_on_gpu.append(result[1])
        if isinstance(field_value, torch.Tensor):
            if field_value.device.type == "cuda":
                # move the tensor to host memory and record the field name
                pydantic_object_dict[field_name] = field_value.cpu()
                list_of_fields_on_gpu.append(field_name)
        elif isinstance(field_value, torch.nn.Module):
            # elif pairs with the Tensor check above, not the BaseModel check;
            # only reached when the value is not a Tensor
            if has_device_field(field_value, torch.device("cuda")):
                pydantic_object_dict[field_name] = field_value.cpu()
                list_of_fields_on_gpu.append(field_name)

    return pydantic_object, list_of_fields_on_gpu

safe_call(func, *args, **kwargs)

Safely call the function, catching all exceptions. Returns a dict

Parameters:

Name Type Description Default
func Callable

Function to call.

required
args tuple

Arguments to pass to the function.

()
kwargs dict

Keyword arguments to pass to the function.

{}

Returns:

Name Type Description
outputs dict

result: result of the function call exception: exception raised by the function call traceback: traceback of the exception runtime: runtime of the function call in seconds

Source code in xopt/utils.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def safe_call(func, *args, **kwargs):
    """
    Safely call the function, catching all exceptions.
    Returns a dict

    Parameters
    ----------
    func : Callable
        Function to call.
    args : tuple
        Arguments to pass to the function.
    kwargs : dict
        Keyword arguments to pass to the function.

    Returns
    -------
    outputs : dict
        result: result of the function call (None on failure)
        exception: exception info tuple raised by the call, or None
        traceback: formatted traceback of the exception ("" on success)
        runtime: runtime of the function call in seconds
    """

    start = time.perf_counter()
    result = None
    outputs = {"exception": None, "traceback": ""}
    try:
        result = func(*args, **kwargs)
    except Exception:
        # capture the full exception info and formatted traceback rather than
        # letting it propagate; callers inspect these fields
        outputs["exception"] = sys.exc_info()
        outputs["traceback"] = traceback.format_exc()
    outputs["result"] = result
    outputs["runtime"] = time.perf_counter() - start
    return outputs