Evaluating robustness of exterior lightguide#

In this example, we use Speos and optiSLang to analyze the robustness of a prismatic light guide and quantify the failure rate due to production tolerances in an automated way. We will understand which tolerances lead to regulation fails for the automotive Day-Time-Running lights and which tolerances must be improved to increase the optical performance. Additionally, we will evaluate the scattering of the homogeneity (RMS-contrast) and the lit appearance (average luminance) to see what the worst design looks like due to tolerances.

Problem description#

The optical simulation project includes a lightguide with an LED. During the vehicle assemble step, the relative position of LED can have an impact on the optical performance of the lightguide, i.e. RMS-contrast and average luminance.

The workflow includes the following steps: - Definition of the variation analysis boundaries - optiSLang Project creation and workflow setup - optiSLang project execution - Connect to PySpeos server - PySpeos execution - Result evaluation

This workflow will generate output as a robustness summary table is printed out to the console.

# Perform required imports
# ------------------------

import os
from pathlib import Path
import time

from ansys.optislang.core import Optislang
import ansys.optislang.core.node_types as node_types
from ansys.optislang.core.nodes import DesignFlow
from ansys.optislang.core.project_parametric import (
    ComparisonType,
    ConstraintCriterion,
    DistributionType,
    ObjectiveCriterion,
    StochasticParameter,
)
import ansys.speos.core as core
from ansys.speos.core import Body, Project
from ansys.speos.core.simulation import SimulationDirect
from ansys.speos.core.source import SourceSurface
from comtypes.client import CreateObject
import numpy as np

Definition of the variation analysis boundaries#

The following parameters are used to control the script execution. You can modify these parameters to suit your needs. - tolerance parameter and distributions - responses (quality measurements / performance indicator) - constraints (quality limits for each performance indicator) - the number of designs to be calculated for the robustness analysis

MAXIMUM_NUMBER_SIMULATIONS: int = 100 if (os.environ.get("ON_CI", "false").lower() != "true") else 5
"""Maximum number of simulations for the robustness analysis."""

PARAMETERS: list[dict] = [
    {
        "name": "LED_delta_x",
        "value": 0.0,
        "distribution_type": "NORMAL",
        "distribution_parameters": [0, 0.25],  # mean value, standard deviation
    },
    {
        "name": "LED_delta_y",
        "value": 0.0,
        "distribution_type": "NORMAL",
        "distribution_parameters": [0, 0.25],  # mean value, standard deviation
    },
    {
        "name": "LED_delta_z",
        "value": 0.0,
        "distribution_type": "NORMAL",
        "distribution_parameters": [0, 0.25],  # mean value, standard deviation
    },
    {
        "name": "Flux",
        "value": 200.0,
        "distribution_type": "TRUNCATEDNORMAL",
        "distribution_parameters": [
            300,
            45,
            200,
            400,
        ],  # mean value, standard deviation, lower bound, upper bound
    },
]
"""Input parameters and their statistical distributions."""

RESPONSES: list[dict] = [
    {"name": "RMS_contrast", "value": 1.0},
    {"name": "Average", "value": 120.0},
    {"name": "Number_of_rules_limited_passed", "value": 0.0},
    {"name": "Number_of_rules_failed", "value": 0.0},
]
"""Expected responses in the robustness analysis."""

CRITERIA: list[dict] = [
    {"name": "RMS_contrast", "type": "constraint", "limit": 0.2, "target": "<="},
    {"name": "Average", "type": "constraint", "limit": 160000, "target": ">="},
    {
        "name": "Number_of_rules_limited_passed",
        "type": "constraint",
        "limit": 2,
        "target": "<=",
    },
    {"name": "Number_of_rules_failed", "type": "constraint", "limit": 0, "target": "<="},
]
"""Evaluation criteria for the robustness analysis."""

VARIATION_ANALYSIS_BOUNDARIES = {
    "maximum_number_simulations": MAXIMUM_NUMBER_SIMULATIONS,
    "parameters": PARAMETERS,
    "responses": RESPONSES,
    "criteria": CRITERIA,
}
"""Variation analysis boundaries for the robustness study."""

GRAPHICS_BOOL = False  # Set to True to display the graphics


ANSYS_RELEASE = os.getenv("ANSYS_RELEASE_COMPACT", "252")
"""ANSYS release version."""
'ANSYS release version.'

Define functions#

The following functions are defined to perform various tasks in the robustness analysis workflow, such as cleaning databases, displacing faces and bodies, opening results, changing source positions and powers, running Speos simulations, and creating the optiSLang workflow.

def clean_all_dbs(speos_client: core.kernel.client.SpeosClient) -> None:
    """
    Clean the database info loaded inside a client.

    Parameters
    ----------
    speos_client: core.SpeosClient

    Returns
    -------
    None

    """
    for item in (
        speos_client.jobs().list()
        + speos_client.scenes().list()
        + speos_client.simulation_templates().list()
        + speos_client.sensor_templates().list()
        + speos_client.source_templates().list()
        + speos_client.intensity_templates().list()
        + speos_client.spectrums().list()
        + speos_client.vop_templates().list()
        + speos_client.sop_templates().list()
        + speos_client.parts().list()
        + speos_client.bodies().list()
        + speos_client.faces().list()
    ):
        item.delete()


def open_result(file) -> dict:
    """
    Open result Speos XMP result file and extract measurement results based
    on a measurement template file.

    Parameters
    ----------
    file: str
        Speos result xmp file directory

    Returns
    -------
    dict
        a dictionary with measurement results

    """
    if os.getenv("ON_CI") != "true":  # Use XMPViewer only outside CI environment
        dpf_instance = CreateObject("XMPViewer.Application")
        dpf_instance.OpenFile(file)
        temp_dir = os.getenv("TEMP")
        if not os.path.exists(temp_dir):
            os.mkdir(temp_dir)
        if "radiance" in file.lower():
            dpf_instance.ImportTemplate(
                os.path.join(
                    os.path.abspath(""), "Lightguide.speos", "DRL_Upper-only.VE-measure.xml"
                ),
                1,
                1,
                0,
            )
            export_dir = os.path.join(temp_dir, "lg_robustness_result.txt")
            dpf_instance.MeasuresExportTXT(export_dir)
            file = open(export_dir)
            content = file.readlines()
            RMS_content = content[10]
            Average_content = content[11]
            res = {
                "RMS_contrast": float(
                    RMS_content[RMS_content.find("\tValue=") + 7 : RMS_content.find(r"ValueUnit=")]
                ),
                "Average": float(
                    Average_content[
                        Average_content.find("\tValue=") + 7 : Average_content.find(r"ValueUnit=")
                    ]
                ),
            }
            return res
        else:
            dpf_instance.ImportTemplate(
                os.path.join(
                    os.path.abspath(""), "Lightguide.speos", "ECE_R87_DRL_WithoutLines.xml"
                ),
                1,
                1,
                0,
            )
            export_dir = os.path.join(temp_dir, f"lg_robustness_result.txt")
            dpf_instance.MeasuresExportTXT(export_dir)
            limited_passed_count = 0
            failed_count = 0
            passed_count = 0
            with open(export_dir) as result_file:
                for line in result_file:
                    if "RuleStatus" in line:
                        if "(specification failed)" in line:
                            limited_passed_count += 1
                        elif "(passed)" in line:
                            passed_count += 1
                        elif "(failed)" in line:
                            failed_count += 1
                        else:
                            print("Rules status is unknown.")

            res = {
                "Number_of_rules_limited_passed": limited_passed_count,
                "Number_of_rules_failed": failed_count,
            }
            return res
    else:  # Provide dummy results for CI environment
        res = {
            "RMS_contrast": 0.15,
            "Average": 250000.0,
            "Number_of_rules_limited_passed": 1,
            "Number_of_rules_failed": 0,
        }
        return res


def displace_face(edit_face, xyz=[0, 0, 0]) -> None:
    """
    Move a face in a translational direction defined by a provided vector xyz.

    Parameters
    ----------
    edit_face: ansys.speos.core.face.Face
    xyz: tuple
        A tuple with 3 elements defining a vector defined by xyz

    Returns
    -------
    None

    """
    temp_face_vertices = edit_face._face.vertices
    temp_face_vertices = np.array(temp_face_vertices).reshape((-1, 3))
    new_face_vertices = [np.add(item, xyz) for item in temp_face_vertices]
    new_face_vertices = np.array(new_face_vertices).reshape((1, -1)).tolist()[0]

    edit_face.set_vertices(new_face_vertices)
    edit_face.commit()


def displace_body(project, body_name, xyz=[1, 0, 0]) -> None:
    """
    Move a body in a translational direction defined by a provided vector xyz.

    Parameters
    ----------
    project: script.Project
    body_name: str
        geo_path of a body
    xyz: tuple
        A tuple with 3 elements defining a vector defined by xyz

    Returns
    -------
    None

    """
    edit_body = project.find(name=body_name, name_regex=True, feature_type=Body)[0]
    faces = edit_body._geom_features
    for face in faces:
        displace_face(face, xyz=xyz)


def change_surface_source_position(project, sources, source_position_dict) -> None:
    """
    change the position of surface where surface source is linked.

    Parameters
    ----------
    project: script.Project
        project in which source will be modified.
    sources: script.Source
        sources inside the project.
    source_position_dict: dict
        A dictionary within which position information is saved.

    Returns
    -------
    None

    """
    for source in sources:
        source_name = source.get(key="name")
        source_geo_path = source.get(key="geo_path")[0]
        source_linked_body = source_geo_path["geo_path"].split("/")[0]
        if source_name in source_position_dict:
            displace_body(
                project,
                "/".join([source_linked_body]),
                source_position_dict[source_name],
            )


def change_source_power(sources, source_power_dict) -> None:
    """
    Change the power of surface where surface source is linked.

    Parameters
    ----------
    sources: script.Source
        sources inside the project.
    source_power_dict: dict
        A dictionary within which source power information is saved.

    Returns
    -------
    None

    """
    for source in sources:
        source_name = source.get(key="name")
        if source_name in source_power_dict:
            source.set_flux_luminous(value=source_power_dict[source_name])
            source.commit()


def speos_simulation(hid, speos, parameters) -> dict:
    """
    Run speos simulation with given source parameters to be changed.

    Parameters
    ----------
    hid: str
        string that contains the design id
    speos: core.Speos
    parameters: dict
        dictionary includes parameters to be used for each design iteration

    Returns
    -------
    dict
        a dictionary with index of design id and value as dictionary including
        simulation results: RMS_contrast, Average luminous, Number_of_rules_failed

    """
    new_parameter_values = {p["name"]: p["value"] for p in parameters}

    clean_all_dbs(speos.client)
    script_folder = Path(__file__).resolve().parent
    speos_file = script_folder / "Lightguide.speos" / "Lightguide.speos"
    project = Project(speos=speos, path=str(speos_file))
    if hid == "0.1":
        if GRAPHICS_BOOL:
            project.preview()

    # Update of the light source power
    sources = project.find(name=".*", name_regex=True, feature_type=SourceSurface)
    new_sources_power = {
        "Surface.1:6015": new_parameter_values.get("Flux"),
        "Surface.2:30": new_parameter_values.get("Flux"),
    }

    change_source_power(sources, new_sources_power)

    # Update of the source position
    new_source_displacement = {
        "Surface.1:6015": [
            new_parameter_values.get("LED_delta_x"),
            new_parameter_values.get("LED_delta_y"),
            new_parameter_values.get("LED_delta_z"),
        ],
        "Surface.2:30": [
            new_parameter_values.get("LED_delta_x"),
            new_parameter_values.get("LED_delta_y"),
            new_parameter_values.get("LED_delta_z"),
        ],
    }
    change_surface_source_position(project, sources, new_source_displacement)
    # project.preview()

    # execution of the Speos simulation
    sim = project.find(name=".*", name_regex=True, feature_type=SimulationDirect)[0]
    sim.set_stop_condition_rays_number(2000000).commit()
    res = sim.compute_CPU()

    # Result extraction
    xmp_files = [item.path for item in res if ".xmp" in item.path]
    response = {}
    result_design = {}
    for xmp in xmp_files:
        response.update(open_result(xmp))
    result_design.update(
        hid=hid,
        responses=[
            {"name": "RMS_contrast", "value": response.get("RMS_contrast")},
            {"name": "Average", "value": response.get("Average")},
            {
                "name": "Number_of_rules_limited_passed",
                "value": response.get("Number_of_rules_limited_passed"),
            },
            {"name": "Number_of_rules_failed", "value": response.get("Number_of_rules_failed")},
        ],
    )
    result_print = str("Design " + result_design["hid"]) + ": "
    for i in range(0, len(result_design["responses"])):
        result_print += (
            str(result_design["responses"][i]["name"])
            + " = "
            + str(result_design["responses"][i]["value"])
            + " | "
        )
    print(result_print)
    return result_design


def get_design_values(osl) -> list[dict]:
    """

    Parameters
    ----------
    osl: ansys.optislang.core
        Ansys optiSLang instance

    Returns
    -------
    list of dict:
        A list of dictionaries containing the all design information for each design.
    """
    project_tree = osl.osl_server.get_full_project_tree()
    project_actors = project_tree["projects"][0]["system"]
    algorithmsystem_uid = project_actors["nodes"][0]["uid"]
    actor_info = osl.osl_server.get_actor_status_info(
        algorithmsystem_uid, 0, include_design_values=True
    )

    return actor_info

Main script execution#

From the environment variable, find the optiSLang executable for given version and initiate an optiSLang project.

def get_executable(version) -> Path:
    """Returns the optiSLang executable for given version.

    Parameters
    ----------
    version : int
        optiSLang version, e.g. 251

    Returns
    -------
    Path
        Executable path.
    """
    if os.getenv(f"AWP_ROOT{version}"):
        awp_root = os.getenv(f"AWP_ROOT{version}")
        osl_com = Path(awp_root) / "optiSLang" / "optislang.com"
    else:
        raise Exception(f"optiSLang installation not found. Please install optiSLang v{version}.")
    return osl_com


osl_executable = get_executable(ANSYS_RELEASE)
my_osl = Optislang(
    executable=osl_executable,
    ini_timeout=60,
    # loglevel="DEBUG"
)
print(f"Using optiSLang version {my_osl.osl_version_string}")
Using optiSLang version 25.2.0 (1718)

Create optiSLang workflow#

Create robustness workflow and define criteria definition

def create_workflow(osl, user_input_json):
    """
    This function creates the robustness workflow
    with a proxy node and register parameters and responses.

    Parameters
    ----------
    osl: ansys.optislang.core
        Ansys optiSLang instance
    user_input_json: dict
        dictionary with optimization boundaries as json

    Returns
    -------
    ansys.optislang.core.nodes
        optiSLang Optimization system and proxy solver node.
    """
    # create system
    root_system = osl.application.project.root_system
    robustness_system = root_system.create_node(type_=node_types.Robustness, name="Robustness")

    # create node
    proxy_solver_node = robustness_system.create_node(
        type_=node_types.ProxySolver, name="Proxy Solver", design_flow=DesignFlow.RECEIVE_SEND
    )
    mop_node = root_system.create_node(type_=node_types.Mop, name="MOP")

    # connect system with MOP node
    robustness_system_out = robustness_system.get_output_slots(name="OMDBPath")[0]
    mop_node_in = mop_node.get_input_slots(name="IMDBPath")[0]
    robustness_system_out.connect_to(mop_node_in)

    # register parameter and responses
    proxy_solver_node.load(user_input_json)
    proxy_solver_node.register_locations_as_parameter()
    proxy_solver_node.register_locations_as_response()

    # adjust parameter ranges
    for parameter in user_input_json.get("parameters"):
        robustness_system.parameter_manager.modify_parameter(
            StochasticParameter(
                name=parameter.get("name"),
                reference_value=parameter.get("value"),
                distribution_type=getattr(DistributionType, parameter.get("distribution_type")),
                distribution_parameters=parameter.get("distribution_parameters"),
            )
        )

    # robustness system settings
    robustness_settings = robustness_system.get_property("AlgorithmSettings")
    robustness_settings["num_discretization"] = user_input_json.get("maximum_number_simulations")
    robustness_system.set_property("AlgorithmSettings", robustness_settings)

    # proxy node settings
    multi_design_launch_num = 30  # set -1 to solve all designs simultaneously
    proxy_solver_node.set_property("MultiDesignLaunchNum", multi_design_launch_num)
    proxy_solver_node.set_property("ForwardHPCLicenseContextEnvironment", True)

    return robustness_system, proxy_solver_node


def criteria_definition(system, load_json) -> None:
    """
    This functions defines the robustness criteria in optiSLang based on user input
    Parameters
    ----------
    system: ansys.optislang.core.nodes
        optiSLang Optimization system
    load_json: dict
        dictionary with optimization boundaries as json

    Returns
    -------
    None

    """
    for crit in load_json.get("criteria"):
        crit_name = crit.get("name")
        if crit.get("type") == "objective":
            if crit.get("target") == "MIN":
                comparison_type_obj = ComparisonType.MIN
            elif crit.get("target") == "MAX":
                comparison_type_obj = ComparisonType.MAX
            else:
                raise Exception("objective type not defined")

            system.criteria_manager.add_criterion(
                ObjectiveCriterion(
                    name=f"obj_{crit_name}", expression=crit_name, criterion=comparison_type_obj
                )
            )

        if crit.get("type") == "constraint":
            if crit.get("target") == "<=":
                comparison_type_constr = ComparisonType.LESSEQUAL
            elif crit.get("target") == ">=":
                comparison_type_constr = ComparisonType.GREATEREQUAL
            else:
                raise Exception("Constraint type not well defined")
            system.criteria_manager.add_criterion(
                ConstraintCriterion(
                    name=f"constr_{crit_name}",
                    expression=crit_name,
                    criterion=comparison_type_constr,
                    limit_expression=str(crit.get("limit")),
                )
            )


parametric_system, proxy_node = create_workflow(my_osl, VARIATION_ANALYSIS_BOUNDARIES)
criteria_definition(parametric_system, VARIATION_ANALYSIS_BOUNDARIES)

# Save project
temp_dir = Path(os.getenv("TEMP"))
project_name = "_proxy_solver_workflow.opf"
# my_osl.application.save_as(temp_dir / project_name)  # uncomment to save the project

# optiSLang project execution
my_osl.application.project.start(wait_for_finished=False)
print("Variation Analysis: started.")
Variation Analysis: started.

Execute Robustness analysis#

loop until get_status() and returns “Processing done” for the root system

def calculate(designs) -> list:
    """
    This function evaluated the optical simulation outputs based on the given designs parameters.

    Parameters
    ----------
    designs: list
        A list of designs to be evaluated. For each design the parameter values are defined

    Returns
    -------
    list
        A list of evaluated designs with the corresponding outputs.
    """
    # create speos instance
    from ansys.speos.core import launcher

    speos = launcher.launch_local_speos_rpc_server(version=ANSYS_RELEASE)

    # run speos simulation
    result_design_list = []
    for design in designs:
        hid = design["hid"]
        parameters = design["parameters"]

        result_design = speos_simulation(hid, speos, parameters)
        result_design_list.append(result_design)

    # close speos instance
    speos.client.close()
    return result_design_list


while not my_osl.project.root_system.get_status() == "Processing done":
    design_list = proxy_node.get_designs()
    if len(design_list):
        responses_dict = calculate(design_list)
        proxy_node.set_designs(responses_dict)
    time.sleep(1)

# Run MOP node
my_osl.application.project.start(wait_for_finished=True)
print("Variation Analysis: Done!")
wf so 01 light guide robustness study
Design 0.1: RMS_contrast = 0.15 | Average = 250000.0 | Number_of_rules_limited_passed = 1 | Number_of_rules_failed = 0 |
Design 0.2: RMS_contrast = 0.15 | Average = 250000.0 | Number_of_rules_limited_passed = 1 | Number_of_rules_failed = 0 |
Design 0.3: RMS_contrast = 0.15 | Average = 250000.0 | Number_of_rules_limited_passed = 1 | Number_of_rules_failed = 0 |
Design 0.4: RMS_contrast = 0.15 | Average = 250000.0 | Number_of_rules_limited_passed = 1 | Number_of_rules_failed = 0 |
Design 0.5: RMS_contrast = 0.15 | Average = 250000.0 | Number_of_rules_limited_passed = 1 | Number_of_rules_failed = 0 |
Variation Analysis: Done!

Get quality values#

Get the robustness result and display in the console.

def get_design_quality_values(osl):
    """
    This function calculates the probability of failure for each response.
    osl: ansys.optislang.core
        Ansys optiSLang instance

    Returns
    -------
    real
        The probability of failure for the response as well as the number of feasible designs.
    """
    list_design_information = get_design_values(osl)

    rms_contrast_count_failed = 0
    average_count_failed = 0
    rules_specification_count_failed = 0
    rules_count_failed = 0
    feasible_designs = 0

    for design_status in list_design_information.get("design_status"):
        if design_status.get("feasible"):
            feasible_designs += 1

    response_names = list_design_information.get("designs").get("response_names")
    designs = list_design_information.get("designs").get("values")

    for design_values in designs:
        response_values = design_values.get("response_values")
        responses = {name: value for name, value in zip(response_names, response_values)}

        if responses.get("RMS_contrast") > 0.2:
            rms_contrast_count_failed += 1
        if responses.get("Average") < 160000.0:
            average_count_failed += 1
        if responses.get("Number_of_rules_limited_passed") > 2.0:
            rules_specification_count_failed += 1
        if responses.get("Number_of_rules_failed") > 0.0:
            rules_count_failed += 1

    return (
        rms_contrast_count_failed / len(designs) * 100,
        average_count_failed / len(designs) * 100,
        rules_specification_count_failed / len(designs) * 100,
        rules_count_failed / len(designs) * 100,
        feasible_designs,
    )


(
    rms_contrast_fail_prob,
    average_fail_prob,
    specification_rules_fail_prob,
    national_rules_fail_prob,
    num_feasible_designs,
) = get_design_quality_values(my_osl)

print(f'{"*" * 25} SUMMARY {"*" * 25}')
print(f"Probability of failure for RMS contrast: {round(rms_contrast_fail_prob, 1)} %")
print(f"Probability of failure for Average: {round(average_fail_prob, 1)} %")
print(
    f"Probability of failure for fulfillment national rules: {round(national_rules_fail_prob, 1)} %"
)
print(
    "Probability of failure for fulfillment specification rules: "
    f"{round(specification_rules_fail_prob, 1)} %\n"
)
print(
    f"{num_feasible_designs} out of {VARIATION_ANALYSIS_BOUNDARIES['maximum_number_simulations']} "
    "designs are feasible."
)
print("*" * 50)
************************* SUMMARY *************************
Probability of failure for RMS contrast: 0.0 %
Probability of failure for Average: 0.0 %
Probability of failure for fulfillment national rules: 0.0 %
Probability of failure for fulfillment specification rules: 0.0 %

5 out of 5 designs are feasible.
**************************************************

Close optiSLang#

my_osl.dispose()
print("OSL Project finished.")
OSL Project finished.

Total running time of the script: (32 minutes 10.678 seconds)