Skip to content

Evaluator

Bases: XoptBaseModel

Xopt Evaluator for handling the parallel execution of an evaluate function.

Parameters:

Name Type Description Default
function Callable

Function to evaluate.

required
function_kwargs dict

Any kwargs to pass on to this function.

{}
max_workers int

Maximum number of workers.

1
executor NormalExecutor

NormalExecutor or any instantiated Executor object

required
vectorized bool

If true, lists of evaluation points will be sent to the evaluator function to be processed in parallel instead of evaluated separately via mapping.

False
Source code in xopt/evaluator.py
(line-number gutter removed — source spans lines 25–227 of xopt/evaluator.py)
class Evaluator(XoptBaseModel):
    """
    Xopt Evaluator for handling the parallel execution of an evaluate function.

    Parameters
    ----------
    function : Callable
        Function to evaluate.
    function_kwargs : dict, default={}
        Any kwargs to pass on to this function.
    max_workers : int, default=1
        Maximum number of workers.
    executor : NormalExecutor
        NormalExecutor or any instantiated Executor object
    vectorized : bool, default=False
        If true, lists of evaluation points will be sent to the evaluator
        function to be processed in parallel instead of evaluated separately via
        mapping.
    """

    function: Callable
    max_workers: int = Field(1, ge=1)
    executor: NormalExecutor = Field(exclude=True)  # Do not serialize
    function_kwargs: dict = Field({})
    vectorized: bool = Field(False)

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @model_validator(mode="before")
    def validate_all(cls, values: Dict) -> Dict:
        """
        Validate all inputs before initializing the Evaluator.

        Parameters
        ----------
        values : dict
            The input values to validate.

        Returns
        -------
        dict
            The validated input values.
        """
        f = get_function(values["function"])
        kwargs = values.get("function_kwargs", {})
        # User-supplied kwargs override the function's own defaults.
        kwargs = {**get_function_defaults(f), **kwargs}
        values["function"] = f
        values["function_kwargs"] = kwargs

        max_workers = values.pop("max_workers", 1)

        executor = values.pop("executor", None)
        if not executor:
            # Default executor: a process pool when parallelism is requested,
            # otherwise a serial dummy executor.
            executor = (
                ProcessPoolExecutor(max_workers=max_workers)
                if max_workers > 1
                else DummyExecutor()
            )

        # Cast as a NormalExecutor
        values["executor"] = NormalExecutor[type(executor)](executor=executor)
        values["max_workers"] = max_workers

        return values

    def evaluate(self, input: Dict, **kwargs) -> Dict:
        """
        Evaluate a single input dict using Evaluator.function with
        Evaluator.function_kwargs.

        Further kwargs are passed to the function.

        Parameters
        ----------
        input : dict
            The input dictionary to evaluate.
        **kwargs : dict
            Additional keyword arguments to pass to the function.

        Returns
        -------
        dict
            The evaluation result.
        """
        return self.safe_function(input, **{**self.function_kwargs, **kwargs})

    def evaluate_data(
        self,
        input_data: Union[
            pd.DataFrame,
            List[Dict[str, float]],
            Dict[str, List[float]],
            Dict[str, float],
        ],
    ) -> pd.DataFrame:
        """
        Evaluate a dataframe of inputs.

        Parameters
        ----------
        input_data : Union[pd.DataFrame, List[Dict[str, float]], Dict[str, List[float]], Dict[str, float]]
            The input data to evaluate.

        Returns
        -------
        pd.DataFrame
            The evaluation results, with output columns concatenated to the
            input columns.
        """

        def _as_dataframe(data):
            # Coerce any accepted input container into a DataFrame; a dict of
            # scalars has no index, so it needs an explicit one.
            if isinstance(data, DataFrame):
                return data
            try:
                return DataFrame(data)
            except ValueError:
                return DataFrame(data, index=[0])

        if self.vectorized:
            # Hand the raw container to the function so it can process all
            # points at once.
            output_data = self.safe_function(input_data, **self.function_kwargs)
            # BUGFIX: previously a non-DataFrame input crashed below at
            # ``input_data.index``; normalize it here as well.
            input_data = _as_dataframe(input_data)
        else:
            # This construction is needed to avoid a pickle error
            # translate input data into pandas dataframes
            input_data = _as_dataframe(input_data)

            inputs = input_data.to_dict("records")

            funcs = [self.function] * len(inputs)
            kwargs = [self.function_kwargs] * len(inputs)

            output_data = self.executor.map(
                safe_function_for_map,
                funcs,
                inputs,
                kwargs,
            )

        return pd.concat(
            [input_data, DataFrame(output_data, index=input_data.index)], axis=1
        )

    def safe_function(self, *args, **kwargs) -> Dict:
        """
        Safely call the function, handling exceptions.

        Parameters
        ----------
        *args : tuple
            Positional arguments to pass to the function.
        **kwargs : dict
            Keyword arguments to pass to the function.

        Returns
        -------
        dict
            The safe function outputs.
        """
        return safe_function(self.function, *args, **kwargs)

    def submit(self, input: Dict) -> Future:
        """
        Submit a single input to the executor.

        Parameters
        ----------
        input : dict
            The input dictionary to submit.

        Returns
        -------
        Future
            The Future object representing the submitted task.

        Raises
        ------
        ValueError
            If ``input`` is not a dictionary.
        """
        if not isinstance(input, dict):
            raise ValueError("input must be a dictionary")
        # return self.executor.submit(self.function, input, **self.function_kwargs)
        # Must call a function outside of the class
        # See: https://stackoverflow.com/questions/44144584/typeerror-cant-pickle-thread-lock-objects
        return self.executor.submit(
            safe_function, self.function, input, **self.function_kwargs
        )

    def submit_data(self, input_data: pd.DataFrame) -> List[Future]:
        """
        Submit a dataframe of inputs to the executor.

        Parameters
        ----------
        input_data : pd.DataFrame
            The input data to submit.

        Returns
        -------
        List[Future]
            A list of Future objects representing the submitted tasks.
        """
        input_data = pd.DataFrame(input_data)  # cast to dataframe for consistency

        if self.vectorized:
            # Single submission, cast to numpy array
            inputs = input_data.to_dict(orient="list")
            for key, value in inputs.items():
                inputs[key] = np.array(value)
            futures = [self.submit(inputs)]  # Single item
        else:
            # Do not use iterrows or itertuples.
            futures = [self.submit(inputs) for inputs in input_data.to_dict("records")]

        return futures

evaluate(input, **kwargs)

Evaluate a single input dict using Evaluator.function with Evaluator.function_kwargs.

Further kwargs are passed to the function.

Parameters:

Name Type Description Default
input dict

The input dictionary to evaluate.

required
**kwargs dict

Additional keyword arguments to pass to the function.

{}

Returns:

Type Description
dict

The evaluation result.

Source code in xopt/evaluator.py
(line-number gutter removed — source spans lines 90–109 of xopt/evaluator.py)
def evaluate(self, input: Dict, **kwargs) -> Dict:
    """
    Evaluate a single input dict using Evaluator.function with
    Evaluator.function_kwargs.

    Keyword arguments supplied here override entries in
    ``Evaluator.function_kwargs``.

    Parameters
    ----------
    input : dict
        The input dictionary to evaluate.
    **kwargs : dict
        Additional keyword arguments to pass to the function.

    Returns
    -------
    dict
        The evaluation result.
    """
    call_kwargs = dict(self.function_kwargs)
    call_kwargs.update(kwargs)
    return self.safe_function(input, **call_kwargs)

evaluate_data(input_data)

Evaluate a dataframe of inputs.

Parameters:

Name Type Description Default
input_data Union[DataFrame, List[Dict[str, float]], Dict[str, List[float]], Dict[str, float]]

The input data to evaluate.

required

Returns:

Type Description
DataFrame

The evaluation results.

Source code in xopt/evaluator.py
(line-number gutter removed — source spans lines 111–158 of xopt/evaluator.py)
def evaluate_data(
    self,
    input_data: Union[
        pd.DataFrame,
        List[Dict[str, float]],
        Dict[str, List[float]],
        Dict[str, float],
    ],
) -> pd.DataFrame:
    """
    Evaluate a dataframe of inputs.

    Parameters
    ----------
    input_data : Union[pd.DataFrame, List[Dict[str, float]], Dict[str, List[float]], Dict[str, float]]
        The input data to evaluate.

    Returns
    -------
    pd.DataFrame
        The evaluation results, with output columns concatenated to the
        input columns.
    """

    def _as_dataframe(data):
        # Coerce any accepted input container into a DataFrame; a dict of
        # scalars has no index, so it needs an explicit one.
        if isinstance(data, DataFrame):
            return data
        try:
            return DataFrame(data)
        except ValueError:
            return DataFrame(data, index=[0])

    if self.vectorized:
        # Hand the raw container to the function so it can process all
        # points at once.
        output_data = self.safe_function(input_data, **self.function_kwargs)
        # BUGFIX: previously a non-DataFrame input crashed below at
        # ``input_data.index``; normalize it here as well.
        input_data = _as_dataframe(input_data)
    else:
        # This construction is needed to avoid a pickle error
        # translate input data into pandas dataframes
        input_data = _as_dataframe(input_data)

        inputs = input_data.to_dict("records")

        funcs = [self.function] * len(inputs)
        kwargs = [self.function_kwargs] * len(inputs)

        output_data = self.executor.map(
            safe_function_for_map,
            funcs,
            inputs,
            kwargs,
        )

    return pd.concat(
        [input_data, DataFrame(output_data, index=input_data.index)], axis=1
    )

safe_function(*args, **kwargs)

Safely call the function, handling exceptions.

Parameters:

Name Type Description Default
*args tuple

Positional arguments to pass to the function.

()
**kwargs dict

Keyword arguments to pass to the function.

{}

Returns:

Type Description
dict

The safe function outputs.

Source code in xopt/evaluator.py
(line-number gutter removed — source spans lines 160–176 of xopt/evaluator.py)
def safe_function(self, *args, **kwargs) -> Dict:
    """
    Safely call the wrapped function, handling exceptions raised during
    evaluation.

    Parameters
    ----------
    *args : tuple
        Positional arguments forwarded to the function.
    **kwargs : dict
        Keyword arguments forwarded to the function.

    Returns
    -------
    dict
        The safe function outputs.
    """
    # Delegate to the module-level ``safe_function`` helper, which wraps the
    # call in exception handling.
    outputs = safe_function(self.function, *args, **kwargs)
    return outputs

submit(input)

Submit a single input to the executor.

Parameters:

Name Type Description Default
input dict

The input dictionary to submit.

required

Returns:

Type Description
Future

The Future object representing the submitted task.

Source code in xopt/evaluator.py
(line-number gutter removed — source spans lines 178–199 of xopt/evaluator.py)
def submit(self, input: Dict) -> Future:
    """
    Submit a single input to the executor.

    Parameters
    ----------
    input : dict
        The input dictionary to submit.

    Returns
    -------
    Future
        The Future object representing the submitted task.

    Raises
    ------
    ValueError
        If ``input`` is not a dictionary.
    """
    if not isinstance(input, dict):
        raise ValueError("input must be a dictionary")
    # Submit the module-level ``safe_function`` helper rather than a bound
    # method so the payload stays picklable for process-based executors.
    # See: https://stackoverflow.com/questions/44144584/typeerror-cant-pickle-thread-lock-objects
    task = (safe_function, self.function, input)
    return self.executor.submit(*task, **self.function_kwargs)

submit_data(input_data)

Submit a dataframe of inputs to the executor.

Parameters:

Name Type Description Default
input_data DataFrame

The input data to submit.

required

Returns:

Type Description
List[Future]

A list of Future objects representing the submitted tasks.

Source code in xopt/evaluator.py
(line-number gutter removed — source spans lines 201–227 of xopt/evaluator.py)
def submit_data(self, input_data: pd.DataFrame) -> List[Future]:
    """
    Submit a dataframe of inputs to the executor.

    Parameters
    ----------
    input_data : pd.DataFrame
        The input data to submit.

    Returns
    -------
    List[Future]
        A list of Future objects representing the submitted tasks.
    """
    # Normalize to a DataFrame for consistent handling below.
    frame = pd.DataFrame(input_data)

    if self.vectorized:
        # One submission carrying every point, with columns as numpy arrays.
        batched = {
            column: np.array(values)
            for column, values in frame.to_dict(orient="list").items()
        }
        return [self.submit(batched)]

    # One submission per row. Do not use iterrows or itertuples.
    return [self.submit(record) for record in frame.to_dict("records")]

validate_all(values)

Validate all inputs before initializing the Evaluator.

Parameters:

Name Type Description Default
values dict

The input values to validate.

required

Returns:

Type Description
dict

The validated input values.

Source code in xopt/evaluator.py
(line-number gutter removed — source spans lines 53–88 of xopt/evaluator.py)
@model_validator(mode="before")
def validate_all(cls, values: Dict) -> Dict:
    """
    Validate and normalize all inputs before initializing the Evaluator.

    Resolves the function reference, merges the function's own defaults with
    any user-supplied kwargs, and wraps the executor (constructing one when
    none is given) in a NormalExecutor.

    Parameters
    ----------
    values : dict
        The input values to validate.

    Returns
    -------
    dict
        The validated input values.
    """
    func = get_function(values["function"])
    # User-supplied kwargs take precedence over the function's defaults.
    merged = {**get_function_defaults(func), **values.get("function_kwargs", {})}
    values["function"] = func
    values["function_kwargs"] = merged

    max_workers = values.pop("max_workers", 1)
    executor = values.pop("executor", None)
    if not executor:
        # Default: a process pool for true parallelism, otherwise a serial
        # dummy executor.
        if max_workers > 1:
            executor = ProcessPoolExecutor(max_workers=max_workers)
        else:
            executor = DummyExecutor()

    # Cast as a NormalExecutor
    values["executor"] = NormalExecutor[type(executor)](executor=executor)
    values["max_workers"] = max_workers

    return values

yaml(**kwargs)

serialize first then dump to yaml string

Source code in xopt/pydantic.py
(line-number gutter removed — source spans lines 231–238 of xopt/pydantic.py)
def yaml(self, **kwargs):
    """Serialize to JSON first, then dump the result as a YAML string."""
    # NOTE: inside this body ``yaml`` resolves to the module-level pyyaml
    # import, not to this method.
    serialized = json.loads(self.to_json(**kwargs))
    return yaml.dump(serialized)