Source code for dorie.utilities.vtktools.vtkfile

import os
import xml.etree.ElementTree as ET
import base64
import struct
import numpy as np
import warnings

from dorie.utilities.grid import UnstructuredVTKGrid
from dorie.utilities.grid import BaseGrid
from dorie.utilities.warnings import VTKWarning
from dorie.utilities import check_path

[docs]class VTKFile: """ This class does most of the heavy lifting when it comes to reading and operating on ASCII or base64 encoded VTK files. """ def __init__(self, path): if os.path.isfile(path): self.path = path else: raise ValueError("Path {} not readable".format(path)) self.filename = os.path.split(path)[1] self.time = None self.grid = None def __str__(self): return "VTKFile {0}".format(self.path)
[docs] def set_time(self, time): if time < 1E-20: self.time = 0 else: self.time = time
[docs] def read(self, variables=None): """ Wrapper function that determines the type of the vtk files given and passes them along to the respective reader function. :param files: Holds the file(s) to be read :param variables: CellData variables that should be returned (None reads all \ variables, but consumes more memory) :type files: str or list of str :type variables: list of str or None """ try: xml_tree = ET.parse(self.path) except ET.ParseError: raise ET.ParseError("Failed to parse VTK file: {}. Make sure the VTK " "file is encoded in ASCII or base64".format(self.path)) xml_root = xml_tree.getroot() vtk_type = xml_root.attrib["type"] if vtk_type == "StructuredGrid": grid = self._read_vtr(xml_root, variables) elif vtk_type == "UnstructuredGrid": grid = self._read_vtu(xml_root, variables) elif vtk_type == "PUnstructuredGrid": grid = self._read_pvtu(xml_root, variables) else: raise RuntimeError("Problem reading {0}: XML root attribute 'type' must " "either be 'StructuredGrid' or 'UnstructuredGrid' " "(is: {1})".format(self.path,xml_root.attrib["type"])) self.grid = grid return grid
def _read_pvtu(self, root, variables): """ Reads an merges the peace files given by a parallel collection .pvtu file. This function assumes that the grids can simply be stacked. Calls VTKFile.read() for any peace found. """ out = None for grid in root: for layer in grid: if layer.tag == "Piece": filename = os.path.join(os.path.split(self.path)[0],layer.attrib["Source"]) if os.path.isfile(filename): print("adding parallel piece {} to file collection".format(filename)) piece_file = VTKFile(filename) if out == None: out = piece_file.read(variables) else: out = out.merge(other=piece_file.read(variables)) else: raise RuntimeError("Problem reading {}: Piece file {} cannot be found".format(self.path,filename)) return out def _read_vtu(self, root, variables): """ Traverses the VTK XML tree root given in ``root``. Assumes the structure .. literal:: <UnstructuredGrid> --> <Piece> --> <CellData> / <Points> / <Cells> \ --> (variables) :param root: Root of the VTK XML tree :type root: :class:`xml.etree.ElementTree` root :raises RuntimeError: if malformed XML is detected :raises NotImplementedError: if an unsupported data format is encountered :raises AssertionError: if the data length header does not match with created array size .. note:: This function parses ASCII or base64 encoded VTK files. """ byte_order = root.attrib["byte_order"] if byte_order.lower() == "littleendian": byte_sign = "<" elif byte_order.lower() == "bigendian": byte_sign = ">" else: raise NotImplementedError("byte_order attribute must either be LittleEndian or BigEndian") def base64_to_np(data,dtype): data_base64 = data.encode('ascii').strip() data_len, data_content = [v + b"==" for v in data_base64.split(b"==") if v] data_len_unpacked = struct.unpack("I",base64.decodestring(data_len))[0] numpy_data = np.frombuffer(base64.decodestring(data_content),dtype=dtype) assert data_len_unpacked == numpy_data.size * dtype.itemsize return numpy_data def ascii_to_np(data,dtype): numpy_data = np.array(dataArray.text.split(), dtype=dtype) return numpy_data out = {"Points": {}, "CellData": {}, "Cells": {}} for grid in root: for piece in grid: for data in piece: if data.tag == "CellData" or data.tag == "Points" or data.tag == "Cells": for dataArray in data: if dataArray.tag != "DataArray": raise RuntimeError("Unexpected tag: {0} (expected " "DataArray)".format(dataArray.tag)) else: varname = dataArray.attrib["Name"] data_type = dataArray.attrib["type"] dtype = np.dtype(data_type).newbyteorder(byte_sign) if variables is None or varname in variables or varname.lower() in ["coordinates","connectivity","offsets"]: if dataArray.attrib["format"] == "binary": numpy_data = base64_to_np(dataArray.text, dtype) elif dataArray.attrib["format"] == "ascii": numpy_data = ascii_to_np(dataArray.text, dtype) else: raise NotImplementedError("data format must be ascii or binary (base64)") n = int(dataArray.attrib["NumberOfComponents"]) out[data.tag][varname] = numpy_data.reshape(-1,n) ppc = int(out["Cells"]["offsets"][0]) # points per cell pointCoords = out["Points"]["Coordinates"] connectivity = out["Cells"]["connectivity"] # indexes of points of each cell grid = UnstructuredVTKGrid(pointCoords,connectivity,ppc) for cellVariable in out["CellData"]: grid.add_data(cellVariable,out["CellData"][cellVariable]) if variables: unmatched = [v for v in variables if v not in out["CellData"]] if unmatched: warnings.warn("The following variables have not been found: {}"\ .format(", ".join(unmatched)), VTKWarning) return grid def _read_vtr(self, root, variables): raise NotImplementedError
[docs] def plot(self,variables=None): """ Method to plot the contents of a VTK file. Makes use of matplotlib's PolyCollection to plot each grid cell. Vector quantites are shown as quiver plots. .. note:: Currently works for any 2-dimensional grid that has a usable triangulation method (i.e., all unstructured VTK grids). :param variables: Specifies the variables that should be plotted :type variables: list of str or None :returns: List of created figure instances """ # import these libraries only when needed import matplotlib as mpl mpl.use("agg") import matplotlib.pyplot as plt from matplotlib.collections import PolyCollection if self.grid.dim > 2: raise TypeError("Only 1- or 2-dimensional grids are supported") data = self.grid.data() if variables is None: variables = list(data.keys()) cell_centers = self.grid.cellCenters() x, y = cell_centers[...,0], cell_centers[...,1] (xt, yt), c = self.grid.triangulation() polygons = [[(xt[cii],yt[cii]) for cii in ci] for ci in c] figures = [] for variable in variables: try: d = data[variable] except KeyError: warnings.warn("Variable {} not found. Skipping".format(variable), VTKWarning) continue if len(d.shape) > 1: ncomp = d.shape[1] else: ncomp = 1 fig, ax = plt.subplots(1,1,figsize=(10,8)) if ncomp > 1: # we are plotting a vector q = plt.quiver(x, y, d.T[0], d.T[1], angles="xy", scale_units="xy", scale=None, zorder=100) d = np.sqrt(d[:,0]**2 + d[:,1]**2) # use absolute value of d for shading # 1/2 of the RMS of the absolute flux seems to be a decent length scale arrow_length = np.sqrt(np.mean(d.flatten()**2)) / 2 length_significand = round(arrow_length/10**int(np.log10(arrow_length)-1)) length_exponent = int(np.log10(arrow_length)-1) plt.quiverkey(q, 0.9, 1.05, arrow_length, '${0} \\times 10^{{{1}}}$'\ .format(length_significand,length_exponent)) plotargs = dict(zorder=1, edgecolors="k", linewidths=0.5) if np.sign(d.min()) == np.sign(d.max()): clim = None plotargs["cmap"] = plt.get_cmap("Blues") else: maxrange = max(abs(d.min()),abs(d.max())) clim = (-maxrange,maxrange) plotargs["cmap"] = plt.get_cmap("RdBu") collection = PolyCollection(polygons, **plotargs) collection.set_array(d.flatten()) if clim: collection.set_clim(clim) tc = ax.add_collection(collection) if not self.time is None: plt.title("{0}: {1} ($t = ${2:.2e}s)".format(self.filename,variable,self.time)) else: plt.title("{0}: {1}".format(self.filename,variable)) plt.colorbar(tc, format='%.2e') figures.append(fig) plt.close() return figures