Skip to content

Commit

Permalink
Mypy type safety: round 6
Browse files Browse the repository at this point in the history
  • Loading branch information
TheBB committed Feb 14, 2024
1 parent bc6c147 commit 4c16b78
Show file tree
Hide file tree
Showing 6 changed files with 285 additions and 222 deletions.
246 changes: 127 additions & 119 deletions splipy/io/g2.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
from __future__ import annotations

from typing import ClassVar, Callable, TextIO, Union, Optional, Type, Literal, Sequence
from types import TracebackType
from pathlib import Path

import numpy as np
from numpy import pi, savetxt
from typing_extensions import Self

from ..curve import Curve
from ..surface import Surface
from ..volume import Volume
from ..splineobject import SplineObject
from ..splinemodel import SplineModel
from ..basis import BSplineBasis
from ..trimmedsurface import TrimmedSurface
from ..utils import flip_and_move_plane_geometry, rotate_local_x_axis
Expand All @@ -16,13 +23,106 @@

class G2(MasterIO):

def read_next_non_whitespace(self):
fstream: TextIO
filename: str
mode: Literal['w', 'r']
trimming_curves: list[TrimmedSurface]

g2_type: ClassVar[list[int]] = [100, 200, 700] # curve, surface, volume identifiers
g2_generators: ClassVar[dict[int, str]] = {
120: 'line',
130: 'circle',
140: 'ellipse',
260: 'cylinder',
292: 'disc',
270: 'sphere',
290: 'torus',
250: 'plane',
210: 'bounded_surface',
261: 'surface_of_linear_extrusion',
} #, 280:cone

def __init__(self, filename: Union[Path, str], mode: Literal["w", "r"] = 'r') -> None:
self.filename = str(filename)
self.trimming_curves = []
self.mode = mode

def __enter__(self) -> Self:
self.fstream = open(self.filename, self.mode).__enter__()
return self

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType]
) -> None:
self.fstream.__exit__(exc_type, exc_val, exc_tb)

def _write_obj(self, obj: SplineObject) -> None:
for i in range(obj.pardim):
if obj.periodic(i):
obj = obj.split_periodic(obj.start(i), i)

self.fstream.write('{} 1 0 0\n'.format(G2.g2_type[obj.pardim-1]))
self.fstream.write('{} {}\n'.format(obj.dimension, int(obj.rational)))
for b in obj.bases:
self.fstream.write('%i %i\n' % (len(b.knots) - b.order, b.order))
self.fstream.write(' '.join('%.16g' % k for k in b.knots))
self.fstream.write('\n')

savetxt(self.fstream, obj.controlpoints.reshape(-1, obj.dimension + obj.rational, order='F'),
fmt='%.16g', delimiter=' ', newline='\n')

def write(self, obj: Union[Sequence[SplineObject], SplineObject, SplineModel]) -> None:
"""Write the object in GoTools format."""
if isinstance(obj, (Sequence, SplineModel)): # input SplineModel or list
for o in obj:
self._write_obj(o)
return

assert isinstance(obj, SplineObject)
self._write_obj(obj)

def read(self) -> list[SplineObject]:
result = []

for line in self.fstream:
line = line.strip()
if not line:
continue

# read object type
objtype, major, minor, patch = map(int, line.split())
if (major, minor, patch) != (1, 0, 0):
raise IOError('Unknown G2 format')

# if obj type is in factory methods (cicle, torus etc), create it now
if objtype in G2.g2_generators:
constructor = getattr(self, G2.g2_generators[objtype])
result.append(constructor())
continue

# for "normal" splines (Curves, Surfaces, Volumes) create it now
pardim = [i for i in range(len(G2.g2_type)) if G2.g2_type[i] == objtype]
if not pardim:
raise IOError('Unknown G2 object type {}'.format(objtype))
result.append(self.splines(pardim[0] + 1))

return result

def read_basis(self) -> BSplineBasis:
ncps, order = map(int, next(self.fstream).split())
kts = list(map(float, next(self.fstream).split()))
return BSplineBasis(order, kts, -1)

def read_next_non_whitespace(self) -> str:
line = next(self.fstream).strip()
while not line:
line = next(self.fstream).strip()
return line

def circle(self):
def circle(self) -> Curve:
self.read_next_non_whitespace()
# dim = int( self.read_next_non_whitespace().strip())
r = float( next(self.fstream).strip())
Expand All @@ -38,7 +138,7 @@ def circle(self):
result.reverse()
return result

def ellipse(self):
def ellipse(self) -> Curve:
self.read_next_non_whitespace()
# dim = int( self.read_next_non_whitespace().strip())
r1 = float( next(self.fstream).strip())
Expand All @@ -55,7 +155,7 @@ def ellipse(self):
result.reverse()
return result

def line(self):
def line(self) -> Curve:
self.read_next_non_whitespace()
# dim = int( self.read_next_non_whitespace().strip())
start = np.array(next(self.fstream).split(), dtype=float)
Expand All @@ -67,14 +167,13 @@ def line(self):
s = np.array(start)
# d /= np.linalg.norm(d)
if not finite:
param = [-state.unlimited, +state.unlimited]
param = np.array([-state.unlimited, +state.unlimited])

result = curve_factory.line(s+d*param[0], s+d*param[1])
if reverse:
result.reverse()
return result


# def cone(self):
# dim = int( self.read_next_non_whitespace().strip())
# r = float( next(self.fstream).strip())
Expand All @@ -87,8 +186,7 @@ def line(self):
# if finite:
# param_v=np.array(next(self.fstream).split(' '), dtype=float)


def cylinder(self):
def cylinder(self) -> Surface:
self.read_next_non_whitespace()
# dim = int( self.read_next_non_whitespace().strip())
r = float( next(self.fstream).strip())
Expand All @@ -98,9 +196,9 @@ def cylinder(self):
finite = next(self.fstream).strip() != '0'
param_u = np.array(next(self.fstream).split(), dtype=float)
if finite:
param_v=np.array(next(self.fstream).split(), dtype=float)
param_v = np.array(next(self.fstream).split(), dtype=float)
else:
param_v=[-state.unlimited, state.unlimited]
param_v = np.array([-state.unlimited, state.unlimited])
swap = next(self.fstream).strip() != '0'

center = center + z_axis*param_v[0]
Expand All @@ -111,7 +209,7 @@ def cylinder(self):
result.swap()
return result

def disc(self):
def disc(self) -> Surface:
self.read_next_non_whitespace()
# dim = int( self.read_next_non_whitespace().strip())
center = np.array(next(self.fstream).split(), dtype=float)
Expand All @@ -135,30 +233,30 @@ def disc(self):
result.swap()
return result

def plane(self):
def plane(self) -> Surface:
self.read_next_non_whitespace()
# dim = int( self.read_next_non_whitespace().strip())
center = np.array(next(self.fstream).split(), dtype=float)
normal = np.array(next(self.fstream).split(), dtype=float)
x_axis = np.array(next(self.fstream).split(), dtype=float)
finite = next(self.fstream).strip() != '0'
if finite:
param_u= np.array(next(self.fstream).split(), dtype=float)
param_v= np.array(next(self.fstream).split(), dtype=float)
param_u = np.array(next(self.fstream).split(), dtype=float)
param_v = np.array(next(self.fstream).split(), dtype=float)
else:
param_u= [-state.unlimited, +state.unlimited]
param_v= [-state.unlimited, +state.unlimited]
param_u = np.array([-state.unlimited, +state.unlimited])
param_v = np.array([-state.unlimited, +state.unlimited])
swap = next(self.fstream).strip() != '0'

result = Surface() * [param_u[1]-param_u[0], param_v[1]-param_v[0]] + [param_u[0],param_v[0]]
result.rotate(rotate_local_x_axis(x_axis, normal))
result = flip_and_move_plane_geometry(result,center,normal)
result.reparam(param_u, param_v)
if(swap):
if swap:
result.swap()
return result

def torus(self):
def torus(self) -> Surface:
self.read_next_non_whitespace()
# dim = int( self.read_next_non_whitespace().strip())
r2 = float( next(self.fstream).strip())
Expand All @@ -178,7 +276,7 @@ def torus(self):
result.swap()
return result

def sphere(self):
def sphere(self) -> Surface:
self.read_next_non_whitespace()
# dim = int( self.read_next_non_whitespace().strip())
r = float( next(self.fstream).strip())
Expand All @@ -195,11 +293,9 @@ def sphere(self):
result.reparam(param_u, param_v)
return result

def splines(self, pardim):
cls = G2.classes[pardim-1]

def splines(self, pardim: int) -> SplineObject:
_, rational = self.read_next_non_whitespace().strip().split()
rational = bool(int(rational))
rational_bool = bool(int(rational))

bases = [self.read_basis() for _ in range(pardim)]
ncps = 1
Expand All @@ -209,20 +305,19 @@ def splines(self, pardim):
cps = [tuple(map(float, next(self.fstream).split()))
for _ in range(ncps)]

args = bases + [cps, rational]
return cls(*args)
return SplineObject.constructor(pardim)(bases, cps, rational=rational_bool)

def surface_of_linear_extrusion(self):
def surface_of_linear_extrusion(self) -> Surface:
self.read_next_non_whitespace()
# dim = int( self.read_next_non_whitespace().strip())
crv = self.splines(1)
normal = np.array(self.read_next_non_whitespace().split(), dtype=float)
finite = next(self.fstream).strip() != '0'
param_u = np.array(next(self.fstream).split(), dtype=float)
if finite:
param_v=np.array(next(self.fstream).split(), dtype=float)
param_v = np.array(next(self.fstream).split(), dtype=float)
else:
param_v=[-state.unlimited, +state.unlimited]
param_v = np.array([-state.unlimited, +state.unlimited])
swap = next(self.fstream).strip() != '0'

result = surface_factory.extrude(crv + normal*param_v[0], normal*(param_v[1]-param_v[0]))
Expand All @@ -232,13 +327,12 @@ def surface_of_linear_extrusion(self):
result.swap()
return result


def bounded_surface(self):
objtype = int( next(self.fstream).strip() )
def bounded_surface(self) -> TrimmedSurface:
objtype = int(next(self.fstream).strip())

# create the underlying surface which all trimming curves are to be applied
if objtype in G2.g2_generators:
constructor = getattr(self, G2.g2_generators[objtype].__name__)
constructor = getattr(self, G2.g2_generators[objtype])
surface = constructor()
elif objtype == 200:
surface = self.splines(2)
Expand All @@ -261,7 +355,7 @@ def bounded_surface(self):
two_curves = []
for crv_type in [parameter_curve_type, space_curve_type]:
if crv_type in G2.g2_generators:
constructor = getattr(self, G2.g2_generators[crv_type].__name__)
constructor = getattr(self, G2.g2_generators[crv_type])
crv = constructor()
elif crv_type == 100:
crv = self.splines(1)
Expand All @@ -275,89 +369,3 @@ def bounded_surface(self):
all_loops.append(one_loop)

return TrimmedSurface(surface.bases[0], surface.bases[1], surface.controlpoints, surface.rational, all_loops, raw=True)

g2_type = [100, 200, 700] # curve, surface, volume identifiers
classes = [Curve, Surface, Volume]
g2_generators = {120:line, 130:circle, 140:ellipse,
260:cylinder, 292:disc, 270:sphere, 290:torus, 250:plane,
210:bounded_surface, 261:surface_of_linear_extrusion} #, 280:cone

def __init__(self, filename):
if filename[-3:] != '.g2':
filename += '.g2'
self.filename = filename
self.trimming_curves = []

def __enter__(self):
return self

def write(self, obj):
if not hasattr(self, 'fstream'):
self.onlywrite = True
self.fstream = open(self.filename, 'w')
if not self.onlywrite:
raise IOError('Could not write to file %s' % (self.filename))

"""Write the object in GoTools format. """
if isinstance(obj[0], SplineObject): # input SplineModel or list
for o in obj:
self.write(o)
return

for i in range(obj.pardim):
if obj.periodic(i):
obj = obj.split(obj.start(i), i)

self.fstream.write('{} 1 0 0\n'.format(G2.g2_type[obj.pardim-1]))
self.fstream.write('{} {}\n'.format(obj.dimension, int(obj.rational)))
for b in obj.bases:
self.fstream.write('%i %i\n' % (len(b.knots) - b.order, b.order))
self.fstream.write(' '.join('%.16g' % k for k in b.knots))
self.fstream.write('\n')

savetxt(self.fstream, obj.controlpoints.reshape(-1, obj.dimension + obj.rational, order='F'),
fmt='%.16g', delimiter=' ', newline='\n')

def read(self):
if not hasattr(self, 'fstream'):
self.onlywrite = False
self.fstream = open(self.filename, 'r')

if self.onlywrite:
raise IOError('Could not read from file %s' % (self.filename))

result = []

for line in self.fstream:
line = line.strip()
if not line:
continue

# read object type
objtype, major, minor, patch = map(int, line.split())
if (major, minor, patch) != (1, 0, 0):
raise IOError('Unknown G2 format')

# if obj type is in factory methods (cicle, torus etc), create it now
if objtype in G2.g2_generators:
constructor = getattr(self, G2.g2_generators[objtype].__name__)
result.append( constructor() )
continue

# for "normal" splines (Curves, Surfaces, Volumes) create it now
pardim = [i for i in range(len(G2.g2_type)) if G2.g2_type[i] == objtype]
if not pardim:
raise IOError('Unknown G2 object type {}'.format(objtype))
pardim = pardim[0] + 1
result.append(self.splines(pardim))

return result

def read_basis(self):
ncps, order = map(int, next(self.fstream).split())
kts = list(map(float, next(self.fstream).split()))
return BSplineBasis(order, kts, -1)

def __exit__(self, exc_type, exc_value, traceback):
if hasattr(self, 'fstream'):
self.fstream.close()
Loading

0 comments on commit 4c16b78

Please sign in to comment.