"""Demonstrate what is when we define the CityModel-objects in a top-down approach
"""
import json
from copy import deepcopy
import collections
from typing import Iterable, Mapping
# TODO BD: this iteration is not really nice, maybe implement it in a way that don't need to use .items() and .values()
# for co_id, co in cm.cityobjects.items():
# for geom in co.geometry:
# for srf in geom.surfaces.values():
[docs]class CityObject(object):
"""CityObject class"""
def __init__(self, id,
type: str=None, geometry: Iterable=None,
attributes: Mapping=None,
children: Iterable=None, parents: Iterable=None):
self.id = id
self.type = type
self.geometry = [] if geometry is None else geometry
self.attributes = {} if attributes is None else attributes
self.children = [] if children is None else children
self.parents = [] if parents is None else parents
def __repr__(self):
return self._get_info()
def _get_info(self):
"""Print information about the object"""
info = collections.OrderedDict()
info['id'] = self.id
info['type'] = self.type
info['attributes'] = self.attributes
info['children'] = self.children
info['parents'] = self.parents
gt = set()
gl = set()
sf = set()
if len(self.geometry) > 0:
for geom in self.geometry:
gt.add(geom.type)
gl.add(geom.lod)
if geom.surfaces:
for s_i, srf in geom.surfaces.items():
sf.add(srf['type'])
info['geometry_type'] = list(gt)
info['geometry_lod'] = list(gl)
info['semantic_surfaces'] = list(sf)
return json.dumps(info, indent=2)
[docs] def get_vertices(self):
"""Dump the vertex coordinates of the CityObject into a list"""
vtx = []
for geom in self.geometry:
vtx += geom.get_vertices()
return vtx
[docs] def build_index(self, vtx_lookup: Mapping=None, vtx_idx: int=0):
"""Build a coordinate list and index the vertices for Geometry objects
in the CityObject.
"""
vtx_lookup = {} if vtx_lookup is None else vtx_lookup
geometry = []
for geom in self.geometry:
geom_idx, vtx_lookup, vtx_idx = geom.build_index(vtx_lookup, vtx_idx)
j = geom.to_json()
j['boundaries'] = geom_idx
if len(geom.surfaces) > 0:
geom.build_semantic_surface_index()
j['semantics'] = geom.semantics
geometry.append(j)
return (geometry, vtx_lookup, vtx_idx)
[docs] def to_json(self):
"""Return a dictionary that conforms the CityJSON schema"""
j = dict()
j['type'] = self.type
j['geometry'] = []
if self.attributes:
j['attributes'] = self.attributes
if self.children:
j['children'] = self.children
if self.parents:
j['parents'] = self.parents
return j
[docs]class Geometry(object):
"""CityJSON Geometry object"""
def __init__(self, type: str=None, lod: str=None,
boundaries: Iterable=None, semantics_obj: Mapping=None,
vertices=None, transform=None, texture_obj=None, appearance=None):
self.type = type # TODO: use a property for allowing only the specified types
self.lod = lod
self.boundaries = self._dereference_boundaries(type, boundaries, vertices, transform)
self.surfaces = self._dereference_surfaces(semantics_obj)
self.semantics = {}
self.texture = self._dereference_textures(texture_obj, appearance)
@staticmethod
def _index_surface_boundaries(values):
"""Create an index of the Surfaces which have semantic value in a Geometry boundary
It creates a lookup table for the indicies to the Surfaces in a boundary that have semantics.
The key of the lookup table are the indices of the SemanticSurface objects in the Geometry.surfaces array.
The idea is that by using the index, the geometry of the Surface can be retrieved from
the boundary in O(1) time, instead of looping through the 'semantics.values' and
'boundaries' each time the geometry of a semantic surface needs to be retrieved.
.. note:: Only works with MultiSurface or more complex boundaries
:param values: The array of values from a Geometry Object's `semantics` member
:return: A dict of indices to the surfaces in a boundary.
"""
# TODO BD optimize: Again, here recursion seems to be like a nice alternative
surface_idx = dict()
if not values or len(values) == 0:
return surface_idx
else:
for i, idx in enumerate(values):
if idx is not None:
if isinstance(idx, list):
for j, jdx in enumerate(idx):
if jdx is not None:
if isinstance(jdx, list):
for k, kdx in enumerate(jdx):
if isinstance(kdx, list):
raise TypeError("The 'values' member of 'semantics' is too many levels deep")
if kdx is not None:
if kdx not in surface_idx.keys():
surface_idx[kdx] = [[i,j,k]]
else:
surface_idx[kdx].append([i,j,k])
else:
if jdx not in surface_idx.keys():
surface_idx[jdx] = [[i,j]]
else:
surface_idx[jdx].append([i,j])
else:
if idx not in surface_idx.keys():
surface_idx[idx] = [[i]]
else:
surface_idx[idx].append([i])
return surface_idx
@staticmethod
def _vertex_mapper(ring, vertices, transform):
"""Maps vertex coordinates to vertex indices and/or apply transformation on them
:param ring: A ring defined as an array of vertex indices pointing to ``vertices``
:param vertices: The array of vertices from the CityJSON file
:param transform: A Transform object from CityJSON
:return: The ring with the vertex indices replaced with the vertices
"""
# NOTE BD: it might be ok to simply return the iterator from map()
if vertices is not None:
return list(map(lambda v_i: Geometry._transform_vertex(vertices[v_i], transform), ring))
else:
return [Geometry._transform_vertex(vtx, transform) for vtx in ring]
@staticmethod
def _transform_vertex(vertex, transform):
"""Apply the tranformation from a Transform object to a vertex
:param vertex: A vertex with 3 coordinates, typically (x,y,z)
:type vertex: sequence
:param transform: Transform object from CityJSON
:type transform: dict.
:return: A vertex with transformed coordinates
:type return: sequence
"""
if transform is None:
return vertex
else:
x = deepcopy((vertex[0] * transform["scale"][0]) + transform["translate"][0])
y = deepcopy((vertex[1] * transform["scale"][1]) + transform["translate"][1])
z = deepcopy((vertex[2] * transform["scale"][2]) + transform["translate"][2])
return x,y,z
@staticmethod
def _vertex_indexer(geom, vtx_lookup, vtx_idx):
ret = []
for g in geom:
if not isinstance(g, tuple):
gt = tuple(g)
else:
gt = g
if gt not in vtx_lookup:
vtx_lookup[gt] = vtx_idx
ret.append(vtx_idx)
vtx_idx += 1
else:
ret.append(vtx_lookup[gt])
return (ret, vtx_lookup, vtx_idx)
def _dereference_boundaries(self, btype, boundaries, vertices, transform=None):
"""Replace vertex indices with vertex coordinates in the geomery boundary
:param btype: Boundary type
:param boundaries: Boundary list
:param vertices: Vertex list of CityJSON
:return: Boundary list with the vertex indices replaced with vertex coordinates from the vertex list
"""
# TODO BD optimize: would be much faster with recursion
if not boundaries:
return list()
if btype.lower() == 'multipoint':
if not isinstance(boundaries[0], int):
raise TypeError("Boundary definition does not correspond to MultiPoint")
return self._vertex_mapper(boundaries, vertices, transform)
elif btype.lower() == 'multilinestring':
if not isinstance(boundaries[0][0], int):
raise TypeError("Boundary definition does not correspond to MultiPoint")
return [self._vertex_mapper(b, vertices, transform) for b in boundaries]
elif btype.lower() == 'multisurface' or btype.lower() == 'compositesurface':
s = list()
if not isinstance(boundaries[0][0][0], int):
raise TypeError("Boundary definition does not correspond to MultiSurface or CompositeSurface")
for surface in boundaries:
s.append([self._vertex_mapper(ring, vertices, transform) for ring in surface])
return s
elif btype.lower() == 'solid':
sh = list()
if not isinstance(boundaries[0][0][0][0], int):
raise TypeError("Boundary definition does not correspond to Solid")
for shell in boundaries:
s = list()
for surface in shell:
s.append([self._vertex_mapper(ring, vertices, transform) for ring in surface])
sh.append(s)
return sh
elif btype.lower() == 'multisolid' or btype.lower() == 'compositesolid':
solids = list()
if not isinstance(boundaries[0][0][0][0][0], int):
raise TypeError("Boundary definition does not correspond to MultiSolid or CompositeSolid")
for solid in boundaries:
sh = list()
for shell in solid:
s = list()
for surface in shell:
s.append([self._vertex_mapper(ring, vertices, transform) for ring in surface])
sh.append(s)
solids.append(sh)
return solids
else:
raise TypeError("Unknown geometry type: {}".format(btype))
def _dereference_surfaces(self, semantics_obj):
"""Dereferene a semantic surface
:param semantics_obj: Semantic Surface object as extracted from CityJSON file
"""
semantic_surfaces = dict()
if not semantics_obj or not semantics_obj['values']:
return semantic_surfaces
else:
srf_idx = self._index_surface_boundaries(semantics_obj['values'])
for i,srf in enumerate(semantics_obj['surfaces']):
attributes = dict()
semantic_surfaces[i] = {'surface_idx': srf_idx.get(i)}
for key,value in srf.items():
if key == 'type':
semantic_surfaces[i]['type'] = value
elif key == 'children':
semantic_surfaces[i]['children'] = value
elif key == 'parent':
semantic_surfaces[i]['parent'] = value
else:
attributes[key] = value
if len(attributes) > 0:
semantic_surfaces[i]['attributes'] = attributes
return semantic_surfaces
def _dereference_textures(self, texture_obj, appearance):
'''
Creates a mapping from surfaces to associated textures and vertices-texture
'''
if texture_obj == None or appearance == None:
return {}
texture_idx = {}
num_surfaces = len(self.boundaries)
if self.type == 'Solid':
num_surfaces = len(self.boundaries[0])
for c in range(num_surfaces):
t = {}
for ele in texture_obj:
textures = texture_obj[ele]['values']
if self.type == 'Solid':
textures = textures[0]
textures = textures[c]
d = {'texture':[],'vertices-texture':[]}
for texture_list in textures:
if texture_list == [None]:
continue
d['texture'].append(appearance['textures'][texture_list[0]])
vt = []
for i in texture_list[1:]:
vt.append(appearance['vertices-texture'][i])
d['vertices-texture'].append(vt)
t[ele] = d
texture_idx[c] = t
return texture_idx
[docs] def get_vertices(self):
"""Dump the vertex coordinates into a list"""
# TODO BD optimize: would be much faster with recursion
if not self.boundaries:
return list()
if self.type.lower() == 'multipoint':
return self.boundaries
elif self.type.lower() == 'multilinestring':
return [b for b in self.boundaries]
elif self.type.lower() == 'multisurface' or self.type.lower() == 'compositesurface':
vtx = list()
for surface in self.boundaries:
for ring in surface:
vtx += ring
return vtx
elif self.type.lower() == 'solid':
vtx = list()
for shell in self.boundaries:
for surface in shell:
for ring in surface:
vtx += ring
return vtx
elif self.type.lower() == 'multisolid' or self.type.lower() == 'compositesolid':
vtx = list()
for solid in self.boundaries:
for shell in solid:
for surface in shell:
for ring in surface:
vtx += ring
return vtx
else:
raise TypeError("Unknown geometry type: {}".format(self.type))
[docs] def get_surface_boundaries(self, surface):
"""Get the surface at the index location from the Geometry boundary
.. note:: Interior surfaces don't have semantics and they are returned with the exterior.
:param surface: A semantic surface
:return: Surfaces from the boundary that correspond to the index.
"""
# TODO BD: essentially, this function is meant to returns a MultiSurface,
# which is a collection of surfaces that have semantics --> consider returning
# a Geometry object of MultiSufrace type
if not isinstance(surface, dict):
raise TypeError("surface must be a dict")
if (len(surface) > 0 and 'surface_idx' not in surface):
raise TypeError("surface must be a single surface")
if not surface['surface_idx'] or len(surface['surface_idx']) == 0:
return []
else:
return (self.boundaries[i[0]] if len(i) == 1
else self.boundaries[i[0]][i[1]] if len(i) == 2
else self.boundaries[i[0]][i[1]][i[2]]
for i in surface['surface_idx'])
[docs] def build_index(self, vtx_lookup: Mapping=None, vtx_idx: int=0):
"""Build a coordinate list and index the vertices in the boundary.
This method is used when converting the Geometry to the JSON output.
"""
vtx_lookup = {} if vtx_lookup is None else vtx_lookup
if not self.boundaries:
return ([], vtx_lookup, vtx_idx)
if self.type.lower() == 'multipoint':
bdry, vtx_lookup, vtx_idx = self._vertex_indexer(self.boundaries, vtx_lookup, vtx_idx)
return (bdry, vtx_lookup, vtx_idx)
elif self.type.lower() == 'multilinestring':
mline = list()
for _boundary in self.boundaries:
bdry, vtx_lookup, vtx_idx = self._vertex_indexer(_boundary, vtx_lookup, vtx_idx)
mline.append(bdry)
return (mline, vtx_lookup, vtx_idx)
elif self.type.lower() == 'multisurface' or self.type.lower() == 'compositesurface':
msurface = list()
for _surface in self.boundaries:
r = list()
for _ring in _surface:
bdry, vtx_lookup, vtx_idx = self._vertex_indexer(_ring, vtx_lookup, vtx_idx)
r.append(bdry)
msurface.append(r)
return (msurface, vtx_lookup, vtx_idx)
elif self.type.lower() == 'solid':
shell = list()
for _shell in self.boundaries:
msurface = list()
for _surface in _shell:
r = list()
for _ring in _surface:
bdry, vtx_lookup, vtx_idx = self._vertex_indexer(_ring, vtx_lookup, vtx_idx)
r.append(bdry)
msurface.append(r)
shell.append(msurface)
return (shell, vtx_lookup, vtx_idx)
elif self.type.lower() == 'multisolid' or self.type.lower() == 'compositesolid':
msolid = list()
for solid in self.boundaries:
shell = list()
for _shell in solid:
msurface = list()
for _surface in _shell:
r = list()
for _ring in _surface:
bdry, vtx_lookup, vtx_idx = self._vertex_indexer(_ring, vtx_lookup, vtx_idx)
r.append(bdry)
msurface.append(r)
shell.append(msurface)
msolid.append(shell)
return (msolid, vtx_lookup, vtx_idx)
else:
raise TypeError("Unknown geometry type: {}".format(self.type))
[docs] def build_semantic_surface_index(self):
"""Index the semantic surfaces in way that is stored in JSON."""
# TODO: handle parent-children
self.semantics['surfaces'] = []
if self.type.lower() == 'multisurface':
self.semantics['values'] = [None for i in range(len(self.boundaries))]
elif self.type.lower() == 'solid':
self.semantics['values'] = []
for i in range(len(self.boundaries)):
self.semantics['values'].append([])
for j in range(len(self.boundaries[i])):
self.semantics['values'][i].append(None)
else:
raise ValueError(f"{self.type} is not supported at the moment for semantic surfaces")
for i, srf in self.surfaces.items():
if srf['surface_idx']:
_surface = dict()
_surface['type'] = srf['type']
if 'attributes' in srf:
for attr, value in srf['attributes'].items():
_surface[attr] = value
# TODO: make it work with null-s in semantic surfaces
self.semantics['surfaces'].append(_surface)
# TODO: optimize for loop by switching it with the conditional
for bdry in srf['surface_idx']:
if len(bdry) == 1:
self.semantics['values'][bdry[0]] = i
elif len(bdry) == 2:
self.semantics['values'][bdry[0]][bdry[1]] = i
else:
# There is an unused Semantic Object on the geometry. The Semantic
# Object was imported from the cityjson.
# TODO: log("Removing unused Semantic Object {srf['type']} from the Geometry")
pass
[docs] def get_surfaces(self, type: str=None, lod: str=None):
"""Get the semantic surfaces of the given type
The whole boundary is returned if a geometry does not have semantics, or has a LoD < 2,
or the surface type is not provided.
:param type: Semantic Surface type. If not provided, the whole boundary is returned.
:param lod: Level of Detail
:return: Return a subset of the specific surfaces of the geometry
"""
if (type is None) or (lod and float(lod) < 2.0) or len(self.surfaces) == 0:
return self.boundaries
else:
return {i:srf for i,srf in self.surfaces.items() if srf['type'].lower() == type.lower()}
[docs] def to_json(self):
"""Return a dict that in the CityJSON schema"""
j = dict()
j['type'] = self.type
j['lod'] = self.lod
j['boundaries'] = []
if self.surfaces:
j['semantics'] = {}
return j