# -*- coding: utf-8
"""Module for fluid property diagram creation.
This file is part of project fluprodia (github.com/fwitte/fluprodia). It's
copyrighted by the contributors recorded in the version control history of the
file, available from its original location
src/fluprodia/fluid_property_diagram.py
SPDX-License-Identifier: MIT
"""
import json
import os
import warnings
import CoolProp as CP
import numpy as np
from . import __version__
from ._units import Units
from ._utils import _beautiful_unit_string
from ._utils import _hampel_filter
from ._utils import _isolines_log
from ._utils import _linear_range
from ._utils import _log_range
# Mapping from matplotlib linestyle strings to plotly dash strings
_MPL_DASH_MAP = {
'-': 'solid', 'solid': 'solid',
'--': 'dash', 'dashed': 'dash',
'-.': 'dashdot', 'dashdot': 'dashdot',
':': 'dot', 'dotted': 'dot',
}
def _mpl_style_to_plotly_line(style):
"""Convert a matplotlib line style dict to a plotly line dict."""
line = {}
if 'color' in style:
line['color'] = style['color']
if 'linewidth' in style:
line['width'] = style['linewidth']
if 'linestyle' in style:
line['dash'] = _MPL_DASH_MAP.get(style['linestyle'], 'solid')
return line
[docs]
class FluidPropertyDiagram:
u"""Short summary.
Parameters
----------
fluid : str
Fluid for diagram.
width : float
Width of all diagrams (default value: :code:`width=16.0`).
height : float
Height of all diagrams (default value: :code:`height=10.0`).
Example
-------
This is a small example of how to create a fluid property dataset for water
and export to Ts-, hs- and logph-diagram.
>>> from fluprodia import FluidPropertyDiagram
>>> import matplotlib.pyplot as plt
>>> import numpy as np
>>> diagram = FluidPropertyDiagram('water')
After object creation it is possible to specify isolines. There are default
isolines available, but these might not suit your requirements. We will
define temperature and enthalpy isolines for this case. Before that, we
need to specify the units if we do not want to use SI units. For available
units see the
:py:meth:`fluprodia.fluid_property_diagram.FluidPropertyDiagram.set_unit_system`
documentation.
>>> diagram.set_unit_system(T='°C', p='MPa', s='kJ/kgK', h='kJ/kg')
>>> iso_T = np.arange(50, 701, 50)
>>> iso_h = np.arange(0, 3601, 200)
>>> diagram.set_isolines(T=iso_T, h=iso_h)
Now we can calculate the diagram data and create the diagram, the data
should be plotted to. If you want to plot additional data to your diagram,
first call the `draw_isolines` method, and then plot your data.
>>> diagram.calc_isolines()
After that it is possible to specify the view range of the plot and draw
the isolines of a specific type of diagram. Last step is to export your
diagram. Any file format supported by matplotlib is possible.
>>> fig, ax = plt.subplots(1)
>>> diagram.draw_isolines(diagram_type='Ts', fig=fig, ax=ax, x_min=0, x_max=8, y_min=0, y_max=700)
>>> plt.tight_layout()
>>> fig.savefig('Ts_Diagramm.pdf')
If we want to create a different diagram, e.g. hs-diagram, it is not
necessary to recalculate the isolines. Instead, create a new figure and draw
the isolines for a different diagram.
>>> fig, ax = plt.subplots(1, figsize=(8, 5))
>>> diagram.draw_isolines(diagram_type='hs', fig=fig, ax=ax, x_min=0, x_max=8, y_min=0, y_max=3600)
>>> plt.tight_layout()
>>> fig.savefig('hs_Diagramm.pdf')
>>> fig, ax = plt.subplots(1, figsize=(8, 5))
>>> diagram.draw_isolines(diagram_type='logph', fig=fig, ax=ax, x_min=0, x_max=3600, y_min=1e-2, y_max=5e2)
>>> plt.tight_layout()
>>> fig.savefig('logph_Diagramm.pdf')
It is also possible to specify/modify the isolines to plot. For example,
the lines of constant specific enthalpy should be plotted in red color
and with a linewidth of 2. Also, the lines of constant specific volumen
should not be plotted at all. For more information see the
:py:meth:`fluprodia.fluid_property_diagram.FluidPropertyDiagram.draw_isolines`
method.
>>> fig, ax = plt.subplots(1)
>>> diagram.draw_isolines(
... diagram_type='Ts', fig=fig, ax=ax,
... isoline_data={
... 'h': {
... 'values': iso_h,
... 'style': {'linewidth': 2, 'color': '#ff0000'}
... }, 'vol': {'values': np.array([])}},
... x_min=0, x_max=8, y_min=0, y_max=700
... )
>>> plt.tight_layout()
>>> fig.savefig('Ts_Diagramm.pdf')
It is also possible to export the data of a diagram:
>>> diagram.to_json("./tmp/water.json")
And, if you have your data ready to use, you can import the data again like
this:
>>> diagram = FluidPropertyDiagram.from_json("tmp/water.json")
>>> diagram.fluid
'water'
>>> len(diagram.pressure["isolines"])
20
"""
def __init__(self, fluid, backend=None):
u"""Create a FluidPropertyDiagram object.
Parameters
----------
fluid : str
Fluid for diagram.
backend : str
CoolProp backend to use, e.g. HEOS or REFPROP, by default HEOS.
"""
if backend is None:
backend = "HEOS"
self.backend = backend
self.fluid = fluid
self.fluid_string = f"{self.backend}::{self.fluid}"
self.state = CP.AbstractState(backend, self.fluid)
self.units = Units()
self.properties = {
'p': 'pressure',
'vol': 'volume',
'T': 'temperature',
'h': 'enthalpy',
's': 'entropy',
'Q': 'quality'
}
self.supported_diagrams = {
'Ts': {
'x_property': 's',
'y_property': 'T',
'x_scale': 'linear',
'y_scale': 'linear'
},
'hs': {
'x_property': 's',
'y_property': 'h',
'x_scale': 'linear',
'y_scale': 'linear'
},
'logph': {
'x_property': 'h',
'y_property': 'p',
'x_scale': 'linear',
'y_scale': 'log'
},
'Th': {
'x_property': 'h',
'y_property': 'T',
'x_scale': 'linear',
'y_scale': 'linear'
},
'plogv': {
'x_property': 'vol',
'y_property': 'p',
'x_scale': 'log',
'y_scale': 'linear'
}
}
self._setup_functions_and_inputs()
self._setup_datastructures()
self._setup_isoline_defaults()
self._setup_default_line_layout()
self._setup_default_label_positioning()
@classmethod
def _is_legacy_json(cls, data):
"""Return True if *data* was exported by fluprodia 3.x."""
version_str = data.get("META", {}).get("fluprodia-version", "")
try:
major = int(version_str.split(".")[0])
return major == 3
except (ValueError, IndexError):
return False
@classmethod
def _migrate_legacy_json(cls, data):
"""Convert the pre-v4 flat format to the current nested format.
Old format: ``{ "pressure": { "1000000.0": { "h": [...], "v": [...], ... } } }``
New format: ``{ "pressure": { "isolines": {0: 1e6, ...}, "isoline_data": {0: { "vol": [...], ... }} } }``
"""
migrated = {}
for prop_name, prop_data in data.items():
if prop_name == "META":
continue
isolines = {}
isoline_data = {}
for i, (value_str, datapoints) in enumerate(prop_data.items()):
isolines[i] = float(value_str)
isoline_data[i] = {
("vol" if k == "v" else k): v
for k, v in datapoints.items()
}
migrated[prop_name] = {"isolines": isolines, "isoline_data": isoline_data}
meta = data.get("META", {}).copy()
# rename 'v' -> 'vol' in the units dict so Units.from_json doesn't warn
if "v" in meta.get("units", {}):
meta["units"]["vol"] = meta["units"].pop("v")
migrated["META"] = meta
return migrated
[docs]
@classmethod
def from_json(cls, path):
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
if cls._is_legacy_json(data):
version = data.get("META", {}).get("fluprodia-version", "unknown")
warnings.warn(
f"The JSON file was exported by an older version of fluprodia "
f"(detected version: {version}) and has been migrated "
f"automatically. Re-export with the current version to avoid "
f"this warning.",
UserWarning,
stacklevel=2,
)
data = cls._migrate_legacy_json(data)
metadata = data["META"].copy()
del data["META"]
instance = cls(metadata["fluid"], metadata.get("backend"))
instance.units = Units.from_json(metadata["units"])
for key, value in data.items():
isoprop = getattr(instance, key)
isoprop["isolines"] = {
int(key): value for key, value in value["isolines"].items()
}
isoprop["isoline_data"] = {
int(key): {
subprop: np.array(datapoints)
for subprop, datapoints in isoline_data.items()
}
for key, isoline_data in value["isoline_data"].items()
}
return instance
def _setup_functions_and_inputs(self):
"""Setup lookup tables for isoline functions and CoolProp inputs."""
self.CoolProp_inputs = {
'p': CP.iP,
'vol': CP.iDmass,
'T': CP.iT,
'h': CP.iHmass,
's': CP.iSmass,
'Q': CP.iQ
}
self.CoolProp_results = {
'p': self.state.p,
'vol': self.state.rhomass,
'T': self.state.T,
'h': self.state.hmass,
's': self.state.smass,
'Q': self.state.Q
}
def _setup_datastructures(self):
"""Set up datastructures for all isolines."""
self.pressure = {
'isolines': {},
'isoline_data': {}
}
self.entropy = {
'isolines': {},
'isoline_data': {}
}
self.temperature = {
'isolines': {},
'isoline_data': {}
}
self.enthalpy = {
'isolines': {},
'isoline_data': {}
}
self.volume = {
'isolines': {},
'isoline_data': {}
}
self.quality = {
'isolines': {},
'isoline_data': {}
}
def _setup_default_line_layout(self):
"""Definition of the default isoline layout."""
self.pressure['style'] = {
'linestyle': '-.',
'color': '#363636',
'linewidth': 0.5
}
self.volume['style'] = {
'linestyle': 'dotted',
'color': '#363636',
'linewidth': 0.5
}
self.quality['style'] = {
'linestyle': '-',
'color': '#363636',
'linewidth': 0.5
}
self.entropy['style'] = {
'linestyle': 'solid',
'color': '#d1d1d1',
'linewidth': 0.5
}
self.enthalpy['style'] = {
'linestyle': '--',
'color': '#363636',
'linewidth': 0.5
}
self.temperature['style'] = {
'linestyle': '--',
'color': '#363636',
'linewidth': 0.5
}
def _setup_default_label_positioning(self):
"""Definition of the default label positioning."""
self.pressure['label_position'] = 0.85
self.volume['label_position'] = 0.7
self.quality['label_position'] = 0.225
self.entropy['label_position'] = 0.95
self.enthalpy['label_position'] = 0.85
self.temperature['label_position'] = 0.95
[docs]
def set_isolines(self, **kwargs):
"""Set the isolines.
Parameters
----------
p : ndarray
Isolines for pressure.
T : ndarray
Isolines for temperature.
Q : ndarray
Isolines for vapor mass fraction.
s : ndarray
Isolines for specific entropy.
h : ndarray
Isolines for specific enthalpy.
vol : ndarray
Isolines for specific volume.
"""
keys = ['p', 'T', 'Q', 's', 'h', 'vol']
if 'v' in kwargs:
warnings.warn(
"The key 'v' for specific volume is deprecated and will be "
"removed in a future release. Use 'vol' instead.",
FutureWarning
)
kwargs['vol'] = kwargs.pop('v')
for key in kwargs:
if key in keys:
obj = getattr(self, self.properties[key])
isoline_values = self.convert_to_SI(kwargs[key], key).round(8)
obj['isolines'] = {
i: value for i, value in enumerate(isoline_values)
}
else:
msg = (
f'The specified isoline \'{key}\' is not available. '
f'Select from: {", ".join(keys)}.'
)
raise ValueError(msg)
def _setup_isoline_defaults(self):
"""Calculate the default values for the isolines."""
self.p_trip = self.state.trivial_keyed_output(CP.iP_triple)
self.p_min = self.p_trip
self.p_max = self.state.trivial_keyed_output(CP.iP_max)
self.T_trip = self.state.trivial_keyed_output(CP.iT_triple)
self.T_min = self.T_trip
self.T_max = self.state.trivial_keyed_output(CP.iT_max)
self.p_crit = self.state.trivial_keyed_output(CP.iP_critical)
self.T_crit = self.state.trivial_keyed_output(CP.iT_critical)
self.v_crit = 1 / self.state.trivial_keyed_output(CP.irhomass_critical)
self.state.update(CP.PQ_INPUTS, (self.p_crit + self.p_min) / 2, 1)
self.v_intermediate = 1 / self.state.rhomass()
self.state.update(CP.PT_INPUTS, self.p_min, self.T_max)
self.v_max = 1 / self.state.rhomass()
self.s_max = self.state.smass()
self.h_max = self.state.hmass()
self.state.update(CP.QT_INPUTS, 0, self.T_min + 1)
self.s_min = self.state.smass()
self.h_min = self.state.hmass()
self.state.specify_phase(CP.iphase_liquid)
p = self.p_crit
while True:
try:
self.state.update(CP.PT_INPUTS, p, self.T_min + 1)
break
except ValueError:
p *= 0.999
self.v_min = 1 / self.state.rhomass()
self.state.unspecify_phase()
isovalues = np.linspace(0, 1, 11).round(8)
self.quality['isolines'] = {
i: value for i, value in enumerate(isovalues)
}
step = round(int(self.T_max - self.T_min) / 15, -1)
isovalues = np.append(
self.T_min,
np.arange(self.T_max, self.T_min, -step)[::-1]
).round(8)
self.temperature['isolines'] = {
i: value for i, value in enumerate(isovalues)
}
step = round(int(self.s_max - self.s_min) / 15, -1)
isovalues = np.arange(self.s_min, self.s_max, step).round(8)
self.entropy['isolines'] = {
i: value for i, value in enumerate(isovalues)
}
step = round(int(self.h_max - self.h_min) / 15, -1)
isovalues = np.arange(0, self.h_max, step).round(8)
self.enthalpy['isolines'] = {
i: value for i, value in enumerate(isovalues)
}
isovalues = _isolines_log(self.p_min + 1e-2, self.p_max).round(8)
self.pressure['isolines'] = {
i: value for i, value in enumerate(isovalues)
}
isovalues = _isolines_log(self.v_min, self.v_max).round(8)
self.volume['isolines'] = {
i: value for i, value in enumerate(isovalues)
}
[docs]
def clear_isolines(self):
self.temperature["isolines"] = {}
self.pressure["isolines"] = {}
self.volume["isolines"] = {}
self.entropy["isolines"] = {}
self.enthalpy["isolines"] = {}
self.quality["isolines"] = {}
self.temperature["isoline_data"] = {}
self.pressure["isoline_data"] = {}
self.volume["isoline_data"] = {}
self.entropy["isoline_data"] = {}
self.enthalpy["isoline_data"] = {}
self.quality["isoline_data"] = {}
[docs]
def set_isolines_subcritical(self, T_min, T_max):
self.clear_isolines()
T_crit = self.convert_from_SI(self.T_crit, "T")
if T_min > T_crit:
msg = (
f"{T_min = } cannot be higher than critical point "
f"temperature {T_crit}."
)
raise ValueError(msg)
isovalues = self.convert_to_SI(_linear_range(T_min, T_max), "T")
self.temperature["isolines"] = {
i: value for i, value in enumerate(isovalues)
}
self.T_min = min(self.temperature["isolines"].values())
self.T_max = max(self.temperature["isolines"].values())
p_min = CP.CoolProp.PropsSI("P", "Q", 1, "T", self.T_min, self.fluid_string)
p_max = self.p_crit * 1.1
isovalues = _log_range(p_min, p_max)
self.pressure["isolines"] = {
i: value for i, value in enumerate(isovalues)
}
self.p_min = min(self.pressure["isolines"].values())
self.p_max = max(self.pressure["isolines"].values())
# this is required to include the (potentially) lower pressure compared
# to the minimum temperature. The quality isolines are calculated over
# temperature
self.T_min = CP.CoolProp.PropsSI("T", "P", self.p_min, "Q", 1, self.fluid_string)
v_min = 1 / CP.CoolProp.PropsSI("D", "T", self.T_min, "P", self.p_max, self.fluid_string)
v_max = 1 / CP.CoolProp.PropsSI("D", "T", self.T_max, "P", self.p_min, self.fluid_string)
self.v_min = v_min
self.v_max = v_max
isovalues = _log_range(v_min, v_max)
self.volume["isolines"] = {
i: value for i, value in enumerate(isovalues)
}
s_min = CP.CoolProp.PropsSI("S", "T", self.T_min, "P", self.p_max, self.fluid_string)
s_max = CP.CoolProp.PropsSI("S", "T", self.T_max, "P", self.p_min, self.fluid_string)
h_min = CP.CoolProp.PropsSI("H", "T", self.T_min, "P", self.p_max, self.fluid_string)
h_max = CP.CoolProp.PropsSI("H", "T", self.T_max, "P", self.p_min, self.fluid_string)
isovalues = _linear_range(s_min, s_max)
self.entropy["isolines"] = {
i: value for i, value in enumerate(isovalues)
}
isovalues = _linear_range(h_min, h_max)
self.enthalpy["isolines"] = {
i: value for i, value in enumerate(isovalues)
}
isovalues = np.linspace(0, 1, 11)
self.quality["isolines"] = {
i: value for i, value in enumerate(isovalues)
}
def _get_state_result_by_name(self, property_name):
if property_name == "vol":
return 1 / self.CoolProp_results[property_name]()
else:
return self.CoolProp_results[property_name]()
def _update_state(self, input_dict):
output_dict = {}
for property_name, value in input_dict.items():
if property_name == "vol":
output_dict["vol"] = 1 / value
else:
output_dict[property_name] = value
args = [
arg for property_name, value in output_dict.items()
for arg in [self.CoolProp_inputs[property_name], value]
]
self.state.update(*CP.CoolProp.generate_update_pair(*args))
[docs]
def set_unit_system(self, units=None, **kwargs):
u"""Set the unit system for the fluid properties.
Delegates to :meth:`fluprodia._units.Units.set_defaults`. Any
pint-compatible unit string that is dimensionally consistent with the
property is accepted.
Both short-form keys (``p``, ``T``, ``vol``, ``h``, ``s``, ``Q``) and
long-form keys (``pressure``, ``temperature``, ``volume``,
``enthalpy``, ``entropy``, ``quality``) are supported.
A :class:`tespy.tools.units.Units` instance (or any object with a
``default`` attribute that is a plain ``dict``) may be passed as the
first positional argument. Keyword arguments are merged on top and
take precedence. tespy properties that have no fluprodia equivalent
(e.g. ``mass_flow``, ``power``) are silently ignored.
Parameters
----------
units : object, optional
An object with a ``default`` dict, e.g. ``network.units`` from a
tespy simulation. Its entries are used as base values; any
``**kwargs`` override them.
p / pressure : str
Unit of pressure, e.g. ``'Pa'``, ``'bar'``, ``'MPa'``.
T / temperature : str
Unit of temperature, e.g. ``'K'``, ``'°C'``, ``'°F'``.
s / entropy : str
Unit of specific entropy, e.g. ``'J/kgK'``, ``'kJ/kgK'``.
h / enthalpy : str
Unit of specific enthalpy, e.g. ``'J/kg'``, ``'kJ/kg'``.
vol / volume / specific_volume : str
Unit of specific volume, e.g. ``'m^3/kg'``, ``'l/kg'``.
Q / quality : str
Unit of vapour quality, e.g. ``'-'``, ``'%'``.
"""
if units is not None:
if not hasattr(units, 'default') or not isinstance(units.default, dict):
raise TypeError(
f"'units' must be an object with a 'default' dict "
f"(e.g. a tespy Units instance), got {type(units).__name__}."
)
units = {
key: value
for key, value in units.default.items()
if key is not None
}
merged = {**units, **kwargs}
else:
merged = kwargs
self.units.set_defaults(**merged)
def _draw_isoline_label(self, fig, ax, isoline, property, idx, x, y, x_min, x_max, y_min, y_max, latex_units):
"""Draw a label for an isoline.
Parameters
----------
isoline : float
Value of the isoline.
property : str
Fluid property of the isoline.
idx : float
Index in the array holding the isoline data, where the label
should be plotted.
x : ndarray
x-values of the isoline.s
y : ndarray
y-values of the isoline.s
"""
fig_size = fig.get_size_inches()
pos = ax.get_position()
width, height = (pos.width * fig_size[0], pos.height * fig_size[1])
if (idx > len(x) or
idx == 0 or
x[idx] > x_max or x[idx] < x_min or
y[idx] > y_max or y[idx] < y_min or
x[idx - 1] > x_max or x[idx - 1] < x_min or
y[idx - 1] > y_max or y[idx - 1] < y_min):
return
idx -= 1
if x[idx] - x[idx - 1] == 0:
if y[idx] > y[idx - 1]:
alpha = 90
else:
alpha = -90
elif y[idx] - y[idx - 1] == 0:
alpha = 0
else:
if ax.get_xscale() == 'log':
x_scaled = (
(np.log(x[idx]) - np.log(x[idx - 1]))
* (width / (np.log(x_max) - np.log(x_min)))
)
else:
x_scaled = (x[idx] - x[idx - 1]) * (width / (x_max - x_min))
if ax.get_yscale() == 'log':
y_scaled = (
(np.log(y[idx]) - np.log(y[idx - 1]))
* (height / (np.log(y_max) - np.log(y_min)))
)
else:
y_scaled = (y[idx] - y[idx - 1]) * (height / (y_max - y_min))
alpha = np.arctan(y_scaled / x_scaled) / (2 * np.pi) * 360
if latex_units:
unit = _beautiful_unit_string(self.units[property])
else:
unit = self.units[property]
txt = f'{isoline} {unit}'
text_bg_color = ax.get_facecolor()
ax.text(
x[idx], y[idx], txt, fontsize=5,
rotation=alpha, va='center', ha='center',
bbox=dict(facecolor=text_bg_color, edgecolor=text_bg_color, pad=0.0)
)
[docs]
def calc_isolines(self):
"""Calculate all isolines."""
self._isobaric()
self._isochoric()
self._isoquality()
self._isenthalpic()
self._isothermal()
self._isentropic()
def _isobaric(self):
"""Calculate an isoline of constant pressure."""
isolines = self.pressure['isolines']
for key, p in isolines.items():
iterators = [
("vol", np.geomspace(self.v_min, self.v_intermediate, 100, endpoint=False)),
("vol", np.geomspace(self.v_intermediate, self.v_max, 100))
]
if p <= self.p_crit:
try:
T_sat = CP.CoolProp.PropsSI("T", "P", p, "Q", 0, self.fluid_string)
iterators = [
("T", np.geomspace(self.T_min, T_sat * 0.999, 120)),
# start in liquid and end in gas
("Q", np.linspace(0, 1, 11)),
("T", np.geomspace(T_sat * 1.001, self.T_max, 69))
]
except ValueError:
pass
self.pressure["isoline_data"][key] = self._single_isoline(iterators, "p", p)
def _isochoric(self):
"""Calculate an isoline of constant specific volume."""
isolines = self.volume['isolines']
for key, v in isolines.items():
iterators = [
('p', np.append(
np.geomspace(self.p_min, self.p_crit * 0.8, 100, endpoint=False),
np.geomspace(self.p_crit * 0.8, self.p_max, 100)
))
]
try:
if v > self.v_crit:
p_sat = CP.CoolProp.PropsSI("P", "D", 1 / v, "Q", 1, self.fluid_string)
else:
p_sat = CP.CoolProp.PropsSI("P", "D", 1 / v, "Q", 0, self.fluid_string)
iterators = [
('p', np.append(
np.geomspace(self.p_min, p_sat, 100, endpoint=False),
np.geomspace(p_sat, self.p_max, 100)
))
]
except ValueError:
pass
datapoints = self._single_isoline(iterators, "vol", v)
mask_nan = np.isnan(datapoints["T"])
for prop in datapoints:
datapoints[prop] = datapoints[prop][~mask_nan]
mask = (datapoints["T"] < self.T_crit * 2)
ydata = datapoints["T"][mask]
if len(ydata) > 2:
_, outlier_mask = _hampel_filter(ydata, window_size=15, n_sigmas=1.5)
remove_outliers_mask = ydata < self.T_crit
# inser the new masks into the original one
mask[mask] = outlier_mask & remove_outliers_mask
# remove incorrect values
for prop in datapoints:
datapoints[prop] = datapoints[prop][~mask]
if not mask.any():
# remove values with decreasing temperature
mask = np.append([False], np.diff(datapoints["T"]) < 0)
if len(mask) == 0:
continue
for prop in datapoints:
datapoints[prop] = datapoints[prop][~mask]
self.volume["isoline_data"][key] = datapoints
def _isothermal(self):
"""Calculate an isoline of constant temperature."""
isolines = self.temperature['isolines']
for key, T in isolines.items():
iterators = [
("p", np.geomspace(self.p_min, self.p_max, 200)),
]
if T <= self.T_crit and T >= self.T_trip:
self.state.update(CP.QT_INPUTS, 0, T)
p_sat = self.state.p()
if self.p_min < p_sat * 0.999:
iterators = [
("p", np.geomspace(self.p_min, p_sat * 0.999, 100)),
# start in gas and end in liquid
("Q", np.linspace(1, 0, 31)),
("p", np.geomspace(p_sat * 1.001, self.p_max, 69))
]
elif self.p_min < p_sat * 1.01:
iterators = [
# start in gas and end in liquid
("Q", np.linspace(1, 0, 31)),
("p", np.geomspace(p_sat * 1.001, self.p_max, 169))
]
elif T <= self.T_crit * 1.2 and T >= self.T_trip:
self.state.update(CP.PT_INPUTS, self.p_crit * 0.7, T)
s_start = self.state.smass()
self.state.update(CP.PT_INPUTS, self.p_crit * 1.2, T)
s_end = self.state.smass()
iterators = [
("p", np.geomspace(self.p_min, self.p_crit * 0.7, 80, endpoint=False)),
("s", np.linspace(s_start, s_end, 40, endpoint=False)),
("p", np.geomspace(self.p_crit * 1.2, self.p_max, 80)),
]
self.temperature["isoline_data"][key] = self._single_isoline(iterators, "T", T)
def _isoquality(self):
"""Calculate an isoline of constant vapor mass fraction."""
isolines = self.quality['isolines']
if self.p_min < self.p_crit:
iterators = [
("T", np.append(
np.linspace(self.T_min, self.T_crit * 0.97, 40, endpoint=False),
np.linspace(self.T_crit * 0.97, self.T_crit, 40)
))
]
else:
iterators = []
for key, Q in isolines.items():
self.quality["isoline_data"][key] = self._single_isoline(iterators, "Q", Q)
def _isenthalpic(self):
"""Calculate an isoline of constant specific enthalpy."""
isolines = self.enthalpy['isolines']
iterators = [
("vol", np.geomspace(self.v_min, self.v_intermediate, 100, endpoint=False)),
("vol", np.geomspace(self.v_intermediate, self.v_max, 100))
]
for key, h in isolines.items():
self.enthalpy["isoline_data"][key] = self._single_isoline(iterators, "h", h)
def _isentropic(self):
"""Calculate an isoline of constant specific entropy."""
isolines = self.entropy['isolines']
iterators = [
('p', np.append(
np.geomspace(self.p_min, self.p_crit * 0.8, 100, endpoint=False),
np.geomspace(self.p_crit * 0.8, self.p_max, 100)
))
]
for key, s in isolines.items():
self.entropy["isoline_data"][key] = self._single_isoline(iterators, 's', s)
[docs]
def to_json(self, path):
"""Export the diagram data as json file to a path.
Parameters
----------
path : str, path-like
Name of the file to export the data to.
"""
data = {}
for prop in self.properties.values():
data[prop] = {
"isolines": getattr(self, prop)["isolines"],
"isoline_data": {
key: {
subprop: list(datapoints)
for subprop, datapoints in isoline_data.items()
}
for key, isoline_data
in getattr(self, prop)["isoline_data"].items()
}
}
data["META"] = {
"fluid": self.fluid,
"backend": self.backend,
"units": self.units._serialize(),
"CoolProp-version": CP.__version__,
"fluprodia-version": __version__
}
directory = os.path.dirname(path)
if directory != "" and not os.path.exists(directory):
os.makedirs(directory)
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
[docs]
def calc_individual_isoline(
self, isoline_property=None,
isoline_value=None,
isoline_value_end=None,
starting_point_property=None, ending_point_property=None,
starting_point_value=None, ending_point_value=None):
"""Return data points of an individual isoline within.
Pass the isoline type, its value in the diagrams unit system, as well
as the start and the endpoint of the line. Styling can be changed using
the line_style property.
Parameters
----------
isoline_property : str
Type of the isoline. Choose from :code:`line_type='...'`:
- pressure (:code:`'p'`)
- specific volume (:code:`'vol'`)
- temperature (:code:`'T'`)
- enthalpy (:code:`'h'`)
- entropy (:code:`'s'`)
isoline_value : float
Value of the isoline specified in the respective unit.
starting_point : dict
Dictionary holding the starting property and its value in the
unit used for plotting the diagram, e.g.
:code:`starting_property{'p': 1e5}`
ending_point : dict
Dictionary holding the ending property and its value in the
unit used for plotting the diagram, e.g.
:code:`ending_property{'p': 1e4}`
Returns
-------
datapoints : dict
Dictionary holding the isoline datapoints for:
- pressure (key=:code:`'p'`)
- specific volume (key=:code:`'vol'`)
- temperature (key=:code:`'T'`)
- enthalpy (key=:code:`'h'`)
- entropy (key=:code:`'s'`)
Example
-------
A full example can be found in the class documentation.
"""
_deprecated_v_props = {
'isoline_property': isoline_property,
'starting_point_property': starting_point_property,
'ending_point_property': ending_point_property,
}
for _attr, _val in _deprecated_v_props.items():
if _val == 'v':
warnings.warn(
f"The value 'v' for {_attr} is deprecated and will be "
"removed in a future release. Use 'vol' instead.",
FutureWarning
)
if isoline_property == 'v':
isoline_property = 'vol'
if starting_point_property == 'v':
starting_point_property = 'vol'
if ending_point_property == 'v':
ending_point_property = 'vol'
if not isinstance(isoline_property, str):
msg = 'Parameter isoline_property must be specified as string!'
raise ValueError(msg)
elif (isoline_property not in self.properties.keys() or
isoline_property == 'Q'):
msg = 'Isoline of type ' + isoline_property + ' not available.'
raise ValueError(msg)
isoline_value = self.convert_to_SI(isoline_value, isoline_property)
if isoline_value_end is None:
isoline_value_end = isoline_value
else:
isoline_value_end = self.convert_to_SI(
isoline_value_end, isoline_property
)
starting_point_value = self.convert_to_SI(
starting_point_value, starting_point_property
)
ending_point_value = self.convert_to_SI(
ending_point_value, ending_point_property
)
isoline_vector = np.linspace(isoline_value, isoline_value_end, 100)
self.state.unspecify_phase()
if isoline_property == 'vol':
if starting_point_property == 'p':
pressure_start = starting_point_value
else:
self._update_state({
starting_point_property: starting_point_value,
isoline_property: isoline_value
})
pressure_start = self.state.p()
if ending_point_property == 'p':
pressure_end = ending_point_value
else:
self._update_state({
ending_point_property: ending_point_value,
isoline_property: isoline_value_end
})
pressure_end = self.state.p()
iterator = [("p", np.geomspace(pressure_start, pressure_end, 100))]
elif isoline_property == 'T':
if starting_point_property == 's':
entropy_start = starting_point_value
else:
self._update_state({
starting_point_property: starting_point_value,
isoline_property: isoline_value
})
entropy_start = self.state.smass()
if ending_point_property == 's':
entropy_end = ending_point_value
else:
self._update_state({
ending_point_property: ending_point_value,
isoline_property: isoline_value_end
})
entropy_end = self.state.smass()
iterator = [("s", np.linspace(entropy_start, entropy_end, 100))]
else:
if starting_point_property == 'vol':
density_start = 1 / starting_point_value
else:
density_start = CP.CoolProp.PropsSI(
"D",
starting_point_property.upper(),
starting_point_value,
isoline_property.upper(),
isoline_value,
self.fluid_string
)
# self._update_state({
# starting_point_property: starting_point_value,
# isoline_property: isoline_value_end
# })
# density_start = self.state.rhomass()
if ending_point_property == 'vol':
density_end = 1 / ending_point_value
else:
density_end = CP.CoolProp.PropsSI(
"D",
ending_point_property.upper(),
ending_point_value,
isoline_property.upper(),
isoline_value_end,
self.fluid_string
)
# this produces a different result, no idea why
# self._update_state({
# ending_point_property: ending_point_value,
# isoline_property: isoline_value_end
# })
# density_end = self.state.rhomass()
if isoline_property == 'p':
density_range = np.geomspace(density_start, density_end, 100)
density_change = np.append(
0, np.diff(density_range)[::-1]
/ (density_range[0] - density_range[-1])
)
isoline_vector = (
isoline_value + density_change.cumsum()
* ( isoline_value - isoline_value_end)
)
iterator = [("vol", 1 / np.geomspace(density_start, density_end, 100))]
datapoints = self._single_isoline(iterator, isoline_property, isoline_vector)
if isoline_property in ["T", "p", "vol"]:
rising = False
if iterator[0][1][0] < iterator[0][1][-1]:
rising = True
if iterator[0][0] == "vol":
rising = not rising
Q_crossings = self._get_Q_crossings(datapoints, isoline_property, rising)
if len(Q_crossings) > 0:
datapoints = self._insert_Q_crossings(datapoints, isoline_property, Q_crossings)
for key in datapoints.keys():
datapoints[key] = self.convert_from_SI(datapoints[key], key)
return datapoints
def _single_isoline(self, iterators, isoline_property, isoline_value):
"""Calculate datapoints for a single isoline."""
datapoints = {'h': [], 'T': [], 'vol': [], 's': [], 'p': [], 'Q': []}
num_points = sum([len(_[1]) for _ in iterators])
if np.isscalar(isoline_value):
datapoints[isoline_property] = np.ones(num_points) * isoline_value
else:
if len(isoline_value) != num_points:
msg = (
"If you pass an array of isoline values instead of a "
"single value, the array length must match the length of "
"all iterators."
)
raise ValueError(msg)
datapoints[isoline_property] = isoline_value
for iterator_property, iterator_values in iterators:
# this is necessary because when changing the iterator
# CoolProp sometimes cannot find values
if iterator_property == "Q" or isoline_property == "Q":
self.state.specify_phase(CP.iphase_twophase)
else:
self.state.unspecify_phase()
datapoints[iterator_property] += iterator_values.tolist()
result_properties = (
set(datapoints.keys())
- {iterator_property, isoline_property}
)
for isoline_value, value in zip(datapoints[isoline_property], iterator_values):
try:
self._update_state({
isoline_property: isoline_value,
iterator_property: value
})
success = True
except ValueError:
success = False
result = np.nan
for result_property in result_properties:
if success:
result = self._get_state_result_by_name(result_property)
datapoints[result_property] += [result]
for key in set(datapoints.keys()) - {isoline_property}:
datapoints[key] = np.asarray(datapoints[key])
return datapoints
def _get_Q_crossings(self, datapoints, property, rising):
"""Return data of Q=0 or Q=1 crossings of specified line."""
num_points = len(datapoints['Q'])
distance_to_gas = 1 - datapoints['Q']
distance_to_liq = datapoints['Q']
idx_gas = np.argwhere(
(distance_to_gas < distance_to_liq) &
(datapoints['Q'] > -1))[:, 0]
idx_liq = np.argwhere(
(distance_to_gas > distance_to_liq) &
(datapoints['Q'] > -1))[:, 0]
data = {}
if idx_gas.size > 1:
if rising:
pos = 0
next_point = 1
else:
pos = -1
next_point = -1
if idx_gas[pos] < num_points - 1 and idx_gas[pos] > 0:
Q1 = datapoints['Q'][idx_gas[pos]]
Q2 = datapoints['Q'][idx_gas[pos] + next_point]
value1 = datapoints[property][idx_gas[pos]]
value2 = datapoints[property][idx_gas[pos] + next_point]
data[1] = value1 + (1 - Q1) / (Q2 - Q1) * (value2 - value1)
if idx_liq.size > 1:
if not rising or property == 'vol':
pos = 0
next_point = 1
else:
pos = -1
next_point = -1
if idx_liq[pos] < num_points - 1 and idx_liq[pos] > 0:
Q1 = datapoints['Q'][idx_liq[pos]]
Q2 = datapoints['Q'][idx_liq[pos] + next_point]
prop1 = datapoints[property][idx_liq[pos]]
prop2 = datapoints[property][idx_liq[pos] + next_point]
data[0] = prop1 + (0 - Q1) / (Q2 - Q1) * (prop2 - prop1)
return data
def _insert_Q_crossings(self, datapoints, property, data):
"""Insert data of Q=0 and Q=1 crossings into specified line."""
for Q, value in data.items():
if property == 'vol':
value = 1 / value
try:
self.state.specify_phase(CP.iphase_twophase)
self.state.update(*CP.CoolProp.generate_update_pair(
self.CoolProp_inputs[property], value, CP.iQ, Q
))
except ValueError:
continue
if property == 'vol':
T = self.state.T()
arg_sorted = np.argsort(datapoints['T'])
idx = np.searchsorted(datapoints['T'], T)
position = arg_sorted[idx]
datapoints['T'] = np.insert(datapoints['T'], position, T)
other_props = ['h', 'p', 's']
else:
rising = datapoints['s'][0] < datapoints['s'][-1]
s = self.state.smass()
if rising:
position = np.searchsorted(datapoints['s'], s)
else:
position = np.searchsorted(-datapoints['s'], -s)
datapoints['s'] = np.insert(datapoints['s'], position, s)
other_props = {'vol', 'T', 'h', 'p'} - {property}
datapoints[property] = np.insert(
datapoints[property], position, value
)
datapoints['Q'] = np.insert(datapoints['Q'], position, Q)
for prop in other_props:
result = self.state.keyed_output(self.CoolProp_inputs[prop])
if prop == 'vol':
result = 1 / result
datapoints[prop] = np.insert(datapoints[prop], position, result)
return datapoints
[docs]
def draw_isolines(self, fig, ax, diagram_type, x_min, x_max, y_min, y_max, isoline_data=None, latex_units=True):
"""Draw the isolines onto an axes within a matplotlib figure
Parameters
----------
fig : matplotlib.pyplot.figure
Figure to draw into
ax : matplotlib.pyplot.axes
Axes to draw into
diagram_type : str
Name of the diagram
x_min : number
Minimum for x range
x_max : number
Maximum for x range
y_min : number
Minimum for y range
y_max : number
Maximum for y range
isoline_data : dict, optional
Dictionary holding additional data on the isolines to be drawn,
by default None. These are per isoline type
- the isoline values with key :code:`values`,
- the isoline style with key :code:`style`,
- the number of labels per isoline :code:`labels_per_line`
(default: 1) and
- label every n-th line only :code:`label_every_nth` (default: 1)
following this structure: {"Q": {"values": np.array([0.0, 1.0])}}
The islonline style is another dictionary holding keyword arguments
of a :code:`matplotlib.lines.Line2D` object. See
https://matplotlib.org/stable/api/_as_gen/matplotlib.lines.Line2D.html
for more information.
latex_units : bool, optional
Axis and isoline labels using LaTeX style units, by default True
"""
if isoline_data is None:
isoline_data = {}
self._check_diagram_types(diagram_type)
x_scale = self.supported_diagrams[diagram_type]['x_scale']
y_scale = self.supported_diagrams[diagram_type]['y_scale']
x_property = self.supported_diagrams[diagram_type]['x_property']
y_property = self.supported_diagrams[diagram_type]['y_property']
ax.clear()
ax.set_ylim([y_min, y_max])
ax.set_xlim([x_min, x_max])
ax.set_xscale(x_scale)
ax.set_yscale(y_scale)
if latex_units:
x_unit = _beautiful_unit_string(self.units[x_property])
y_unit = _beautiful_unit_string(self.units[y_property])
else:
x_unit = self.units[x_property]
y_unit = self.units[y_property]
ax.set_xlabel(f'{x_property} in {x_unit}')
ax.set_ylabel(f'{y_property} in {y_unit}')
for trace in self._iter_isoline_traces(
diagram_type, x_min, x_max, y_min, y_max, isoline_data
):
ax.plot(trace['x'], trace['y'], **trace['style'])
if trace['trace_index'] % trace['label_every_nth'] == 0:
label_positions = [int(trace['label_position'] * len(trace['x']))]
if trace['labels_per_line'] > 1:
label_positions = (
np.linspace(0.05, 0.95, trace['labels_per_line'])
* len(trace['x'])
)
for lp in label_positions:
self._draw_isoline_label(
fig, ax,
trace['isoval'], trace['isoline'],
int(lp),
trace['x'], trace['y'],
x_min, x_max, y_min, y_max, latex_units
)
def _iter_isoline_traces(self, diagram_type, x_min, x_max, y_min, y_max, isoline_data):
"""Yield one dict per visible isoline trace.
Shared by :meth:`draw_isolines` and :meth:`draw_isolines_plotly`.
Each dict contains:
* ``isoline`` – short property key (e.g. ``'p'``, ``'T'``)
* ``trace_index`` – position within this isoline type (for labelling)
* ``x``, ``y`` – data arrays clipped to the plot range
* ``style`` – merged style dict (copy; never mutates stored state)
* ``isoval`` – isoline value in user units
* ``label_position``, ``labels_per_line``, ``label_every_nth``
"""
x_property = self.supported_diagrams[diagram_type]['x_property']
y_property = self.supported_diagrams[diagram_type]['y_property']
for isoline in [k for k in self.properties if k not in (x_property, y_property)]:
prop_data = getattr(self, self.properties[isoline])
isovalues = prop_data["isolines"]
# Resolve per-call overrides without mutating stored state
style = dict(prop_data['style'])
label_position = prop_data['label_position']
label_every_nth = 1
labels_per_line = 1
keys_to_plot = list(isovalues.keys())
if isoline in isoline_data:
override = isoline_data[isoline]
if 'style' in override:
style.update(override['style'])
if 'values' in override:
si_values = self.convert_to_SI(override['values'], isoline).round(8)
keys_to_plot = [
k for k, v in isovalues.items()
if round(v, 8) in si_values
]
if 'label_position' in override:
label_position = override['label_position']
if 'label_every_nth' in override:
label_every_nth = override['label_every_nth']
if 'labels_per_line' in override:
labels_per_line = override['labels_per_line']
for trace_index, isoline_key in enumerate(keys_to_plot):
datapoints = prop_data["isoline_data"][isoline_key]
x = self.convert_from_SI(datapoints[x_property], x_property)
y = self.convert_from_SI(datapoints[y_property], y_property)
indices = np.intersect1d(
np.where((x >= x_min) & (x <= x_max)),
np.where((y >= y_min) & (y <= y_max))
)
if len(indices) == 0:
continue
gap = np.where(np.diff(indices) > 1)[0]
if len(gap) > 0:
indices = np.insert(indices, gap + 1, indices[gap] + 1)
indices = np.insert(indices, gap + 2, indices[gap + 2] - 1)
if indices[0] != 0:
indices = np.insert(indices, 0, indices[0] - 1)
if indices[-1] < len(x) - 1:
indices = np.append(indices, indices[-1] + 1)
isoval = round(
self.convert_from_SI(isovalues[isoline_key], isoline), 8
)
yield {
'isoline': isoline,
'trace_index': trace_index,
'x': x[indices],
'y': y[indices],
'style': style,
'isoval': isoval,
'label_position': label_position,
'label_every_nth': label_every_nth,
'labels_per_line': labels_per_line,
}
[docs]
def draw_isolines_plotly(
self, diagram_type, x_min, x_max, y_min, y_max,
isoline_data=None
):
"""Draw isolines and return a :class:`plotly.graph_objects.Figure`.
The interface mirrors :meth:`draw_isolines`. Isoline styles are
specified with the same matplotlib-style keys (``color``,
``linewidth``, ``linestyle``) and are mapped to plotly equivalents
internally.
Parameters
----------
diagram_type : str
Name of the diagram, same choices as for :meth:`draw_isolines`.
x_min : number
Minimum for x range.
x_max : number
Maximum for x range.
y_min : number
Minimum for y range.
y_max : number
Maximum for y range.
isoline_data : dict, optional
Same structure as for :meth:`draw_isolines`. The ``style``
sub-dict accepts ``color``, ``linewidth`` and ``linestyle``
(matplotlib keys; mapped to plotly ``line.color``, ``line.width``
and ``line.dash`` respectively).
Returns
-------
plotly.graph_objects.Figure
Fully configured figure; add further traces to overlay custom
data (e.g. state points from a cycle simulation).
"""
try:
import plotly.graph_objects as go
except ImportError as e:
raise ImportError(
"Install plotly to use draw_isolines_plotly."
) from e
if isoline_data is None:
isoline_data = {}
self._check_diagram_types(diagram_type)
x_scale = self.supported_diagrams[diagram_type]['x_scale']
y_scale = self.supported_diagrams[diagram_type]['y_scale']
x_property = self.supported_diagrams[diagram_type]['x_property']
y_property = self.supported_diagrams[diagram_type]['y_property']
x_label = f'{x_property} in {self.units[x_property]}'
y_label = f'{y_property} in {self.units[y_property]}'
x_range = [np.log10(x_min), np.log10(x_max)] if x_scale == 'log' else [x_min, x_max]
y_range = [np.log10(y_min), np.log10(y_max)] if y_scale == 'log' else [y_min, y_max]
fig = go.Figure()
fig.update_layout(
xaxis=dict(type=x_scale, range=x_range, title=x_label),
yaxis=dict(type=y_scale, range=y_range, title=y_label)
)
legend_shown = set()
for trace in self._iter_isoline_traces(
diagram_type, x_min, x_max, y_min, y_max, isoline_data
):
isoline = trace['isoline']
fig.add_trace(go.Scatter(
x=trace['x'], y=trace['y'],
mode='lines',
line=_mpl_style_to_plotly_line(trace['style']),
legendgroup=isoline,
name=isoline,
showlegend=isoline not in legend_shown,
))
legend_shown.add(isoline)
return fig
def _check_diagram_types(self, diagram_type):
if not isinstance(diagram_type, str):
msg = (
'The diagram_type must be specified as string. Available '
f'inputs are: {", ".join(self.supported_diagrams.keys())}.'
)
raise TypeError(msg)
elif diagram_type not in self.supported_diagrams.keys():
msg = (
'The specified diagram_type is not available. Available '
f'types are: {", ".join(self.supported_diagrams.keys())}.'
)
raise ValueError(msg)
[docs]
def convert_to_SI(self, value, property):
"""Convert a value from the active unit to SI.
Delegates to :meth:`fluprodia._units.Units.to_SI`. Works on scalars
and NumPy arrays alike.
"""
return self.units.to_SI(value, property)
[docs]
def convert_from_SI(self, value, property):
"""Convert a value from SI to the active unit.
Delegates to :meth:`fluprodia._units.Units.from_SI`. Works on scalars
and NumPy arrays alike.
"""
return self.units.from_SI(value, property)