Source code for gamaiface

#!/usr/bin/env python
# -*- coding: utf-8 -*-
.. module::
    :platform: Linux, Windows
    :synopsis: interface modul to GNU Gama
           GPL v2.0 license
           Copyright (C) 2010- Zoltan Siki <>

.. moduleauthor::Zoltan Siki <>

import sys
import os
import math
import re
import tempfile
import logging
# for XML
import xml.etree.ElementTree as ET

[docs]class GamaIface(): """ Interface class to GNU Gama """ def __init__(self, gama_path, dimension=3, probability=0.95, stdev_angle=1, stdev_dist=1, stdev_dist1=1.5): """ Initialize a new GamaIface instance. :param gama_path: path to gama-local program :param dimension: dimension of network (int), 1/2/3 :param probability: porbability for statistical tests (0.9/0.95/0.997) (float) :param stdev_angle: standard deviation for directions in cc (float) :param stdev_dist: base standard deviation for distances mm (float) :param stdev_dist1: standard deviation for distances mm/km (float) """ self.dimension = dimension # limit probability to 90/95/99.7% if probability < 0.925: self.probability = 0.9 self.krit = 1.64 elif probability < 0.975: self.probability = 0.95 self.krit = 1.96 else: self.probability = 0.997 self.krit = 2.97 self.stdev_angle = stdev_angle self.stdev_dist = stdev_dist self.stdev_dist1 = stdev_dist1 self.points = [] self.observations = [] if not os.path.isfile(gama_path): logging.error("GNU gama not found") gama_path = None self.gama_path = gama_path
[docs] def add_point(self, point, state='ADJ'): """ Add point to adjustment :param point: point to ad network (dic) :param state: FIX or ADJ (str) """ if [point, "ADJ"] in self.points or [point, "FIX"] in self.points: # avoid duplicated points return self.points.append([point, state])
[docs] def add_observation(self, obs): """ Add observation to adjustment :param obs: observation to add (dic) """ self.observations.append(obs)
[docs] def remove_last_observation(self, st=False): """ remove last observation or station data :param st: False remove single observation, True remove station (Bool) """ if len(self.observations) > 0: if st: o = self.observations.pop() while len(self.observations) > 0 and o.station is None: o = self.observations.pop() else: self.observations.pop()
[docs] def remove_observation(self, fr, to): """ Remove a polar observation TODO it erases hz, v and distance TODO it removes first occurance """ for o in self.observations: if 'station' in o: st = o['station'] elif st == fr and to == o['id']: oo = o break self.observations.remove(oo)
[docs] def adjust(self): """ Export data to GNU Gama xml, adjust the network and read result :returns: result list of adjusment and blunder from GNU Gama """ # gama-local OK? if self.gama_path is None: logging.error("GNU gama path is None") return (None, None) # fix = 0 free network fix = sum([1 for p, s in self.points if s == 'FIX']) adj = sum([1 for p, s in self.points if s == 'ADJ']) if adj == 0 or len(self.observations) < 2: # no unknowns or observations logging.error("GNU gama no unknowns or not enough observations") return (None, None) gama_local = ET.Element('gama-local', {'version': '2.0'}) comment = ET.Comment('Gama XML created by Ulyxes') gama_local.append(comment) network = ET.SubElement(gama_local, 'network', {'axes-xy': 'ne', 'angles': 'left-handed'}) description = ET.SubElement(network, 'description') if self.dimension == 1: description.text = 'GNU Gama 1D network' elif self.dimension == 2: description.text = 'GNU Gama 2D network' elif self.dimension == 3: description.text = 'GNU Gama 3D network' parameters = ET.SubElement(network, 'parameters', {'sigma-apr': '1', 'conf-pr': str(self.probability), 'tol-abs': '1000', 'sigma-act': 'aposteriori'}) #'update-constrained-coordinates': 'yes'}) gama 2.10! points_observations = ET.SubElement(network, 'points-observations', {'distance-stdev': str(self.stdev_dist)+' '+str(self.stdev_dist1), 'direction-stdev': str(self.stdev_angle / 3600.0 * 10000.0), 'angle-stdev': str(math.sqrt(2)*self.stdev_angle/3600.0*10000), 'zenith-angle-stdev': str(self.stdev_angle/3600.0*10000.0)}) for p, s in self.points: attr = {} if self.dimension == 1: attr['id'] = p['id'] if 'elev' in p and p['elev'] is not None: attr['z'] = str(p['elev']) if s == 'FIX': attr['fix'] = 'z' else: if fix == 0: attr['adj'] = 'Z' else: attr['adj'] = 'z' tmp = ET.SubElement(points_observations, 'point', attr) elif self.dimension == 2: attr['id'] = p['id'] if 'east' in p and 'north' in p and \ p['east'] is not None and p['north'] is not None: attr['y'] = str(p['east']) attr['x'] = str(p['north']) if s == 'FIX': attr['fix'] = 'xy' else: if fix == 0: # free network attr['adj'] = 'XY' else: attr['adj'] = 'xy' tmp = ET.SubElement(points_observations, 'point', attr) elif self.dimension == 3: attr['id'] = p['id'] if 'east' in p and 'north' in p and \ p['east'] is not None and p['north'] is not None: attr['y'] = str(p['east']) attr['x'] = str(p['north']) if 'elev' in p and p['elev'] is not None: attr['z'] = str(p['elev']) if s == 'FIX': attr['fix'] = 'xyz' else: if fix == 0: attr['adj'] = 'XYZ' else: attr['adj'] = 'xyz' tmp = ET.SubElement(points_observations, 'point', attr) for o in self.observations: if 'station' in o: # station record attr = {} attr['from'] = o['station'] # instrument height ih = 0 if 'ih' in o: ih = o['ih'] sta = ET.SubElement(points_observations, 'obs', attr) else: # observation th = 0 if 'th' in o: th = o['th'] if self.dimension == 2: # horizontal network if 'hz' in o: attr = {} attr['to'] = o['id'] attr['val'] = str(o['hz'].GetAngle('GON')) tmp = ET.SubElement(sta, 'direction', attr) if 'distance' in o and 'v' in o: # horizontal distance attr = {} hd = math.sin(o['v'].GetAngle()) * o['distance'] attr['to'] = o['id'] attr['val'] = str(hd) tmp = ET.SubElement(sta, 'direction', attr) elif self.dimension == 1: # elevations only pass elif self.dimension == 3: # 3d if 'hz' in o: attr = {} attr['to'] = o['id'] attr['val'] = str(o['hz'].GetAngle('GON')) tmp = ET.SubElement(sta, 'direction', attr) if 'distance' in o: attr = {} attr['to'] = o['id'] attr['val'] = str(o['distance']) attr['from_dh'] = str(ih) attr['to_dh'] = str(th) tmp = ET.SubElement(sta, 's-distance', attr) if 'v' in o: attr = {} attr['to'] = o['id'] attr['val'] = str(o['v'].GetAngle('GON')) attr['from_dh'] = str(ih) attr['to_dh'] = str(th) tmp = ET.SubElement(sta, 'z-angle', attr) else: # unknown dimension logging.error("GNU gama unknown dimension") return (None, None) # generate temp file name with tempfile.NamedTemporaryFile() as f: tmp_name = w = ET.tostring(gama_local).decode('utf-8') with open(tmp_name + '.xml', 'w') as f: f.write(w) # run gama-local status = os.system(self.gama_path + ' ' + tmp_name + '.xml --text ' + tmp_name + '.txt --xml ' + tmp_name + 'out.xml ' + '--cov-band 0') if status != 0: logging.error("GNU gama failed") return (None, None) doc = ET.parse(tmp_name + 'out.xml') root = doc.getroot() if not root.tag.endswith("gama-local-adjustment"): #return res print("***") root_tag = re.sub('gama-local-adjustment$', '', root.tag) p = {} # the single adjusted point from result blunder = {'std-residual': 0} for child in root: if root_tag + "coordinates" == child.tag: for gchild in child: if root_tag + "adjusted" == gchild.tag: for ggchild in gchild: if root_tag + "point" == ggchild.tag: for pdata in ggchild: if root_tag + "id" == pdata.tag: p['id'] = pdata.text elif root_tag + "y" == pdata.tag: p['east'] = float(pdata.text) elif root_tag + "x" == pdata.tag: p['north'] = float(pdata.text) elif root_tag + "z" == pdata.tag: p['elev'] = float(pdata.text) if root_tag + "orientation-shifts" == gchild.tag: for ggchild in gchild: if root_tag + "orientation" == ggchild.tag: for pdata in ggchild: if root_tag + "approx" == pdata.tag: p['approx_ori'] = float(pdata.text) if root_tag + "adj" == pdata.tag: p['ori'] = float(pdata.text) if root_tag + "cov-mat" == gchild.tag: i = 0 idx = ['std_east', 'std_north', 'std_elev', 'std_ori'] for ggchild in gchild: if root_tag + "flt" == ggchild.tag: p[idx[i]] = math.sqrt(float(ggchild.text)) i += 1 if root_tag + "observations" == child.tag: for gchild in child: o = {'std-residual': 0} if gchild.tag in (root_tag + "direction", root_tag + "slope-distance", root_tag + "zenith-angle"): o["type"] = re.sub('^' + root_tag, '', gchild.tag) for ggchild in gchild: if root_tag + "from" == ggchild.tag: o["from"] = ggchild.text if root_tag + "to" == ggchild.tag: o["to"] = ggchild.text if root_tag + "f" == ggchild.tag: o["f"] = float(ggchild.text) if root_tag + "std-residual" == ggchild.tag: o["std-residual"] = float(ggchild.text) if o['std-residual'] > self.krit and \ o['std-residual'] > blunder['std-residual'] and \ o['f'] > 10: # extra observations ratio blunder = o # remove input xml and output xml os.remove(tmp_name + '.xml') os.remove(tmp_name + '.txt') os.remove(tmp_name + 'out.xml') return (p, blunder)
if __name__ == "__main__": # unit test from georeader import GeoReader fname = "/home/siki/GeoEasy_old/data/freestation.geo" gama_path = '/home/siki/GeoEasy/src/gama-local' if len(sys.argv) > 1: fname = sys.argv[1] if fname[-4:] != '.geo' and fname[-4:] != '.coo': fname += '.geo' if not os.path.isfile(fname): print("File not found: " + fname) sys.exit(-1) fn = fname[:-4] # remove extension g = GamaIface(gama_path, 3, 0.95, 1, 1, 1.5) # load coordinates coo = GeoReader(fname=fn + '.coo') while True: w = coo.GetNext() if w is None or len(w) == 0: break if 'id' in w: if w['id'] == '1': g.add_point(w, 'ADJ') else: g.add_point(w, 'FIX') obs = GeoReader(fname=fn + '.geo') # load observations while True: w = obs.GetNext() if w is None or len(w) == 0: break if 'station' in w or \ ('id' in w and 'hz' in w and 'v' in w and 'distance' in w): g.add_observation(w) res = {} while True: last_res = res p, blunder = g.adjust() print(p, blunder) if p is not None: #print(p) pass else: print("None") if blunder is not None: print(blunder) else: print("None") if p is None or 'east' not in p or \ 'north' not in p or 'elev' not in p: print("adjustment failed") break if blunder is not None and blunder['std-residual'] < 1.0: print("blunders removed") break print(f"{blunder['from']} - {blunder['to']} observation removed") g.remove_observation(blunder['from'], blunder['to'])