#!/usr/bin/env python3
"""
.. module:: horizsection.py
.. moduleauthor:: Viktoria Zubaly, Zoltan Siki
Sample application of Ulyxes PyAPI to measure horizontal section(s).
Several parameters can be set from the command line or from a
JSON configuration file.
Parameters are stored in a config file using JSON format::
log_file: name of log file
log_level: 10/20/30/40/50
log_format: format string for log file
angle_step: angle step in section
station type: Total station type 1200/1100/5500
station_east: Station east coordinate
station_north: Station north coordinate
station_elev: Station elevation
port: Communication port
hz_start: Horizontal start direction bottom
hz_top: Horizontam start direction top
max_angle: Max angle from start direction at bottom
max_top: Max angle from top direction
tolerance: Height tolerance
iteration: number of iterations to find section
height_list: Elevation list for section
Command line parameters::
-l LOG, --log LOG Logfile name "stdout" for screen output
--level LEVEL Log level
--format FORMAT Log format
--step STEP Angle step in section
--type TYPE Total station type
--east EAST Station east
--north NORTH Station north
--elev ELEV Station elevation
-p PORT, --port PORT Communication port
--start START Horizontal start direction
--top TOP Horizontal start direction at top
--max MAX Max angle
--tmax TMAX Max angle at top
--tol TOL Height tolerance
--iter ITER Max iteration to find section
--heights HEIGHTS list of elevations for more sections
--wrt WRT Output file
--pid ID Start of point IDs, default 0
"""
import sys
import os.path
import re
import math
import logging
import argparse
# check PYTHONPATH
if len([p for p in sys.path if 'pyapi' in p]) == 0:
if os.path.isdir('../pyapi/'):
sys.path.append('../pyapi/')
else:
print("pyapi not found")
print("Add pyapi directory to the Python path or start your application from ulyxes/pyapps folder")
sys.exit(1)
from angle import Angle, PI2
from serialiface import SerialIface
from georeader import GeoReader
from csvreader import CsvReader
from csvwriter import CsvWriter
from confreader import ConfReader
from leicatps1200 import LeicaTPS1200
from leicatcra1100 import LeicaTCRA1100
from axis10 import Axis10
from trimble5500 import Trimble5500
from totalstation import TotalStation
from blindorientation import Orientation
from filegen import ObsGen
from robot import Robot
from freestation import Freestation
from anystation import AnyStation
[docs]
class HorizontalSection():
""" Measure a horizontal section at a given elevation
:param ts: total station instance
:param wrt: result writer
:param hoc: height of collimation
:param elev: elevation for section
:param hz_start: start horizontal direction (Angle)
:param step: horizontal step angle (Angle)
:param hz_max: end horizontal direction (radians)
:param maxiter: max iteration to find elevation
:param tol: tolerance for horizontal angle
:param pid: start point id
"""
def __init__(self, ts, wrt, st_east=None, st_north=None, hoc=None, elev=None,
hz_start=None, stepinterval=Angle(45, "DEG"), maxa=Angle(PI2),
maxiter=10, tol=0.02):
""" initialize """
self.ts = ts
self.wrt = wrt
self.st_east = st_east
self.st_north = st_north
self.hoc = hoc # height of collimation
self.elev = elev
self.hz_start = hz_start
self.stepinterval = stepinterval
self.maxa = maxa
self.maxiter = maxiter
self.tol = tol
[docs]
def invalid(self, ans):
""" check observation results are complete
use after GetMeasure
:param ans: result of GetMeasure call
:returns True/False invalid/valid
"""
return self.ts.measureIface.state != self.ts.measureIface.IF_OK or \
'errorCode' in ans or \
'hz' not in ans or 'v' not in ans or 'distance' not in ans or \
ans['distance'] < 0.1
[docs]
def Coords(self, measure):
""" calculate target coordinates
:param measure: disctionary of observations
:returns: dictionary of coords
"""
hd = measure['distance'] * math.sin(measure['v'].GetAngle())
east = self.st_east + hd * math.sin(measure['hz'].GetAngle())
north = self.st_north + hd * math.cos(measure['hz'].GetAngle())
elev = self.hoc + measure['distance'] * math.cos(measure['v'].GetAngle())
return {'east': east, 'north': north, 'elev': elev}
[docs]
def run(self):
""" do the observations in horizontal section """
if self.hz_start is not None:
# rotate to start position, keeping zenith angle
a = self.ts.GetAngles()
self.ts.Move(self.hz_start, a['v'])
startp0 = None
if self.elev is None:
ans = self.ts.Measure() # initial measurement for startpoint
if 'errCode' in ans:
print('FATAL Cannot measure startpoint')
return 1
startp = self.ts.GetMeasure()
if self.invalid(startp):
print('FATAL Cannot measure startpoint')
return 1
# coords of startpoint
coords = self.Coords(startp)
height0 = coords['elev']
else:
height0 = self.elev
w = True
act = Angle(0) # actual angle from startpoint
while act.GetAngle() < self.maxa.GetAngle(): # go around the whole section
ans = self.ts.Measure() # measure distance
if self.ts.measureIface.state != self.ts.measureIface.IF_OK or \
'errCode' in ans:
# skip this and move to next point
logging.warning('Cannot measure point, skip')
self.ts.measureIface.state = self.ts.measureIface.IF_OK
self.ts.MoveRel(self.stepinterval, Angle(0))
act += self.stepinterval
continue
nextp = self.ts.GetMeasure() # get observation data
if self.invalid(nextp):
# cannot measure, skip
logging.warning('Cannot measure point, skip')
self.ts.measureIface.state = self.ts.measureIface.IF_OK
self.ts.MoveRel(self.stepinterval, Angle(0))
act += self.stepinterval
continue
coords = self.Coords(nextp)
height = coords['elev']
index = 0
while abs(height-height0) > self.tol: # looking for right elevation
w = True
zenith = nextp['v'].GetAngle()
hd = math.sin(zenith) * nextp['distance']
dz = height0 - self.hoc # height difference from the height of collimation
alpha = math.atan(abs(dz) / hd)
zenith1 = math.pi / 2.0 + alpha if dz < 0 else math.pi / 2.0 - alpha
self.ts.MoveRel(Angle(0), Angle(zenith1-zenith))
ans = self.ts.Measure()
if 'errCode' in ans:
logging.warning('Cannot measure point, skip')
w = False
break
index += 1
if index > self.maxiter or \
self.ts.measureIface.state != self.ts.measureIface.IF_OK:
w = False
self.ts.measureIface.state = self.ts.measureIface.IF_OK
logging.warning('Missing measurement')
break
nextp = self.ts.GetMeasure()
if self.invalid(nextp):
w = False
break
coords = self.Coords(nextp)
height = coords['elev']
if 'distance' in nextp and startp0 is None:
startp0 = nextp # store first valid point on section
if 'distance' in nextp and w:
#coord = self.Coords(nextp)
res = dict(list(nextp.items()) + list(coords.items()))
self.wrt.WriteData(res)
self.ts.MoveRel(self.stepinterval, Angle(0))
act += self.stepinterval
# rotate back to start
if startp0 is not None:
self.ts.Move(startp0['hz'], startp0['v'])
try:
self.ts.SetRedLaser(0) # turn off red laser if possible
except Exception:
pass
return 0
[docs]
def cmd_params():
""" process command line parameters
parameters can be given by switches or in a json file
"""
# defaults
def_logfile = 'stdout'
def_logging = logging.ERROR
def_format = "%(asctime)s %(levelname)s:%(message)s"
def_type = "1200"
def_angle = 45.0
def_east = None
def_north = None
def_elev = None
def_ih = 0.0 # instrument height
def_port = '/dev/ttyUSB0'
def_start = None
def_top = None
def_max = 359.9
def_tmax = 359.9
def_tol = 0.01
def_iter = 10
def_hlist = None
def_wrt = 'stdout'
def_coords = None # coordinates of reference points for blind orientation
def_pid = 0
hz_start = def_start
levels = None
def_gama = 'gama-local'
if len(sys.argv) == 2 and os.path.exists(sys.argv[1]):
# process JSON config and drop other switches
config_pars = {
'log_file': {'required' : False, 'type': 'logfile', 'default': def_logfile},
'log_level': {'required' : False, 'type': 'int',
'set': [logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.FATAL],
'default': def_logging},
'log_format': {'required': False, 'default': def_format},
'angle_step' : {'required': False, 'type': "float", 'default': def_angle},
'station_type': {'required' : False, 'type': 'str', 'set': ['1200', '1100', '5500', 'axis10'], 'default': def_type},
'station_east': {'required' : False, 'type': 'float', 'default': def_east},
'station_north': {'required' : False, 'type': 'float', 'default': def_north},
'station_elev': {'required' : False, 'type': 'float', 'default': def_elev},
'station_ih': {'required' : False, 'type': 'float', 'default': def_ih},
'port': {'required' : False, 'type': 'str', 'default': def_port},
'hz_start': {"required": False, 'type': "str", 'default': def_start},
'hz_top': {"required": False, 'type': "str", 'default': def_top},
'max_angle': {'required': False, 'type': "float", 'default': def_max},
'max_top': {'required': False, 'type': "float", 'default': def_tmax},
'tolerance': {'required': False, 'type': "float", 'default': def_tol},
'iteration': {'required': False, 'type': 'int', 'default': def_iter},
'height_list': {'required': False, 'type': 'list', 'default': def_hlist},
'wrt': {'required': False, 'default': 'stdout'},
'coords': {'required': False, 'default': def_coords},
'pid': {'required' : False, 'type': 'int', 'default': def_pid},
'center_east': {'required' : False, 'type': 'float', 'default': None},
'center_north': {'required' : False, 'type': 'float', 'default': None},
'radius': {'required' : False, 'type': 'float', 'default': None},
'radius_top': {'required' : False, 'type': 'float', 'default': None},
'gama': {'required' : False, 'type': 'str', 'default': None},
'__comment__': {'required': False, 'type': 'str'}
}
cr = ConfReader('HorizontalSection', sys.argv[1], config_pars)
cr.Load()
state, msg_lst = cr.Check()
if state == "FATAL":
print("Config check failed")
for msg in msg_lst:
print(msg)
sys.exit(-1)
if cr.json['log_file'] == 'stdout':
logging.basicConfig(format=cr.json['log_format'],
level=cr.json['log_level'])
else:
logging.basicConfig(format=cr.json['log_format'],
filename=cr.json['log_file'], filemode='a',
level=cr.json['log_level'])
# deffered config warnings
if state == "WARNING":
for msg in msg_lst:
logging.warning(msg)
if cr.json['hz_start'] is not None:
hz_start = Angle(float(cr.json['hz_start']), 'DEG')
if cr.json['hz_top'] is not None:
hz_top = Angle(float(cr.json['hz_top']), 'DEG')
else:
hz_top = hz_start
stepinterval = Angle(cr.json['angle_step'], 'DEG')
stationtype = cr.json['station_type']
east = cr.json['station_east']
north = cr.json['station_north']
elev = cr.json['station_elev']
ih = cr.json['station_ih']
port = cr.json['port']
maxa = Angle(cr.json['max_angle'], "DEG")
if cr.json['max_top'] is not None:
maxt = Angle(cr.json['max_top'], "DEG")
else:
maxt = maxa
tol = cr.json['tolerance']
maxiter = cr.json['iteration']
wrt_file = cr.json['wrt']
coords = cr.json['coords']
levels = cr.json['height_list']
pid = cr.json['pid']
center_east = cr.json['center_east']
center_north = cr.json['center_north']
radius = cr.json['radius']
gama = cr.json['gama']
else:
# process command line switches
parser = argparse.ArgumentParser()
parser.add_argument('-l', '--log', type=str, default=def_logfile,
help=f'Logfile name, default: {def_logfile}, "stdout" for screen output')
parser.add_argument('--log_level', type=int, default=def_logging,
help=f'Log level, default: {def_logging}')
parser.add_argument('--log_format', type=str, default=def_format,
help='Log format, default: time, level, message')
parser.add_argument('--step', type=float, default=def_angle,
help=f'Angle step in section [DEG], default: {def_angle}')
parser.add_argument('--type', type=str, required=False, default=def_type,
help=f'Total station type, default: {def_type}')
parser.add_argument('--east', type=float, default=def_east,
help=f'Station east, default: {def_east}')
parser.add_argument('--north', type=float, default=def_north,
help=f'Station north, default: {def_north}')
parser.add_argument('--elev', type=float, default=def_elev,
help=f'Station elevation, default: {def_elev}')
parser.add_argument('--ih', type=float, default=def_ih,
help=f'Instrument height, default: {def_ih}')
parser.add_argument('-p', '--port', type=str, default=def_port,
help=f'Communication port, default: {def_port}')
parser.add_argument('--start', type=float, default=def_start,
help='Horizontal start direction, default: actual telescope direction')
parser.add_argument('--top', type=float, default=def_top,
help='Horizontal start direction at top, default: same as start')
parser.add_argument('--max', type=float, default=def_max,
help='Max angle, default: whole circle')
parser.add_argument('--tmax', type=float, default=def_tmax,
help='Max angle at top, default: same as max')
parser.add_argument('--tol', type=float, default=def_tol,
help=f'Height tolerance, default: {def_tol}')
parser.add_argument('--iter', type=int, default=def_iter,
help=f'Max iteration to find section, default: {def_iter}')
parser.add_argument('--heights', type=str, default=def_hlist,
help='list of elevations for more sections between double quotes, default: single section at the telescope direction')
parser.add_argument('--wrt', type=str, default=def_wrt,
help=f'Name of output file, default: {def_wrt}')
parser.add_argument('--coords', type=str, default=def_coords,
help=f'Name of coordinate file, default: {def_coords}')
parser.add_argument('--pid', type=int, default=def_pid,
help=f'Starting point ID, default: {def_pid}')
parser.add_argument('--center_east', type=float, default=None,
help='Center point east of section, default: None')
parser.add_argument('--center_north', type=float, default=None,
help='Center point north of section, default: None')
parser.add_argument('--radius', type=float, default=None,
help='Radius of section at bottom section, default: None')
parser.add_argument('--radius_top', type=float, default=None,
help='Radius of section at top section, default: None')
parser.add_argument('--gama', type=str, default=def_gama,
help=f'Path to gama-local, default: {def_gama}')
args = parser.parse_args()
if args.log == "stdout":
logging.basicConfig(format=args.format, filemode='a',
level=args.level)
else:
logging.basicConfig(format=args.format, filename=args.log, filemode='a',
level=args.level)
hz_start = None
if args.start is not None:
hz_start = Angle(args.start, 'DEG')
stepinterval = Angle(args.step, 'DEG')
stationtype = args.type
east = args.east
north = args.north
elev = args.elev
ih = args.ih
port = args.port
maxa = Angle(args.max, "DEG")
hz_top = None
if args.top is not None:
hz_top = Angle(args.top, 'DEG')
maxt = None
if args.tmax is not None:
maxt = Angle(args.tmax, 'DEG')
tol = args.tol
maxiter = args.iter
wrt_file = args.wrt
coords = args.coords
try:
if args.heights is not None:
levels = [float(l) for l in args.heights.split()]
except Exception:
print("parameter error --heights")
sys.exit(1)
pid = args.pid
center_east = args.center_east
center_north = args.center_north
radius = args.radius
radius_top = args.radius_top
if radius is None and radius_top is not None:
radius = radius_top
if radius_top is None and radius is not None:
radius_top = radius
gama = args.gama
return {'hz_start': hz_start, 'hz_top': hz_top,
'stepinterval': stepinterval,
'stationtype': stationtype,
'east': east, 'north': north, 'elev': elev, 'ih': ih, 'port': port,
'max': maxa, 'maxt': maxt,
'tol': tol, 'iter': maxiter, 'wrt': wrt_file, 'coords': coords,
'levels': levels, 'pid': pid,
'center_east': center_east, 'center_north': center_north,
'radius': radius, 'radius_top': radius_top, 'gama': gama}
if __name__ == "__main__":
# process parameters
params = cmd_params()
# iface for instrument
iface = SerialIface("rs-232", params['port'])
if iface.state != iface.IF_OK:
print("serial error")
sys.exit(1)
# measure interface for instrument
if re.search('120[0-9]$', params['stationtype']):
mu = LeicaTPS1200()
elif re.search('110[0-9]$', params['stationtype']):
mu = LeicaTCRA1100()
elif re.search('550[0-9]$', params['stationtype']):
mu = Trimble5500()
iface.eomRead = b'>'
elif params['stationtype'].lower() == "axis10":
mu = Axis10()
else:
print("unsupported instrument type")
sys.exit(1)
# create instrument
ts = TotalStation(params['stationtype'], mu, iface)
coords = None
ts.SetEDMMode('STANDARD')
orig_dir = ts.GetAngles() # save original direction
# orientation
if params['coords']: # use coords for blind orientation
if re.search(r'\.txt$', params['coords']) or re.search(r'\.csv$', params['coords']):
rd = CsvReader(fname=params['coords'], \
filt=['id', 'east', 'north', 'elev'])
else:
rd = GeoReader(fname=params['coords'], \
filt=['id', 'east', 'north', 'elev'])
coords = rd.Load() # load coordinates of reference points
if 'east' in params and 'north' in params and 'elev' in params and \
params['east'] is not None and params['north'] is not None and \
params['elev'] is not None:
# know station coords
st_coord = [{'id': 'STATION', 'east': params['east'],
'north': params['north'], 'elev': params['elev']}]
og = ObsGen(st_coord + coords, 'STATION', params['ih'])
observations = og.run()
# change to face left
if ts.GetFace()['face'] == ts.FACE_RIGHT:
a = ts.GetAngles()
a['hz'] = (a['hz'] + Angle(180, 'DEG')).Normalize()
a['v'] = (Angle(360, 'DEG') - a['v']).Normalize()
ans = ts.Move(a['hz'], a['v'], 0) # no ATR
if 'errCode' in ans:
logging.fatal("Rotation to face left failed %d", ans['errCode'])
sys.exit(-1)
o = Orientation(observations, ts) # TODO orientation limit????
ans = o.Search()
if 'errCode' in ans:
logging.fatal("Orientation failed %d", ans['errCode'])
sys.exit(-1)
# freestation
r = Robot(observations, st_coord, ts)
obs_out, coo_out = r.run()
fs = Freestation(obs_out, st_coord + coords,
params['gama'], 3, blunders=False) # TODO stdev
st_coord = fs.Adjustment()
else:
# no station coordinates
a_s = AnyStation(coords, ts, params['gama'], params['ih'])
st_coord = a_s.run()
if not st_coord:
print("Cannot calculate station coords")
sys.exit()
ts.SetStation(st_coord[0]['east'], st_coord[0]['north'],
st_coord[0]['elev'], params['ih'])
# set exact orientation on instrument
ts.Move(Angle(0.0), Angle(math.pi/2), 0)
ts.SetOri(Angle(st_coord[0]['ori'], 'GON'))
hoc = st_coord[0]['elev'] + params['ih']
params['east'] = st_coord[0]['east']
params['north'] = st_coord[0]['north']
params['elev'] = st_coord[0]['elev']
else:
# set station coordinates
st_coord = [{'id': 'STATION', 'east': params['east'],
'north': params['north'], 'elev': params['elev']}]
ts.SetStation(params['east'], params['north'],
params['elev'], params['ih'])
hoc = params['elev'] + params['ih']
# writer for coordinates and observations
wrt = CsvWriter(angle='DMS', dist='.3f',
filt=['id', 'east', 'north', 'elev', 'hz', 'v', 'distance'],
fname=params['wrt'], mode='a', sep=';', pid=params['pid'])
wrt.WriteData(st_coord[0]) # add station to output
if isinstance(mu, Trimble5500):
print("Please change to reflectorless EDM mode (MNU 722 from keyboard)")
print("and turn on red laser (MNU 741 from keyboard) and press enter!")
input()
else:
ts.SetEDMMode('RLSTANDARD') # reflectorless distance measurement
try:
ts.SetRedLaser(1) # turn on red laser if possible
except Exception:
pass
if params['center_east'] is not None and \
params['center_north'] is not None and params['radius'] is not None:
center_dir = math.atan2(params['center_east']-params['east'],
params['center_north']-params['north'])
center_dist = math.hypot(params['center_east']-params['east'],
params['center_north']-params['north'])
alpha = math.atan(params['radius'] / center_dist)
params['hz_start'] = Angle(center_dir-alpha).Normalize()
params['max'] = Angle(alpha * 2)
if params['radius'] == params['radius_top'] or params['radius_top'] is None:
params['hz_top'] = params['hz_start']
params['maxt'] = params['max']
else:
alpha1 = math.atan(params['radius_top'] / center_dist)
params['hz_start'] = Angle(center_dir-alpha1).Normalize()
params['maxt'] = Angle(alpha1 * 2)
else:
# turn instrument back to original direction
ts.Move(orig_dir["hz"], orig_dir["v"], 0)
levels = params['levels']
if levels is not None and len(levels) > 1:
if params['hz_start'] is None:
a = ts.GetAngles()
params['hz_start'] = a['hz']
if params['hz_top'] is None:
params['hz_top'] = params['hz_start']
dhz = params['hz_top'].GetAngle("DEG") - params['hz_start'].GetAngle("DEG")
z0 = levels[0]
z1 = levels[-1]
dmax = params['maxt'].GetAngle("DEG") - params['max'].GetAngle("DEG")
for level in levels:
hz = Angle(params['hz_start'].GetAngle("DEG") + (level - z0) / (z1 - z0) * dhz, "DEG")
ma = Angle(params['max'].GetAngle("DEG") + (level - z0) / (z1 - z0) * dmax, "DEG")
h_sec = HorizontalSection(ts, wrt, params['east'], params['north'],
hoc, level, hz,
params['stepinterval'], ma,
params['iter'], params['tol'])
h_sec.run()
else:
if levels is None or len(levels) == 0:
h_sec = HorizontalSection(ts, wrt, params['east'], params['north'],
hoc, None, params['hz_start'],
params['stepinterval'], params['max'],
params['iter'], params['tol'])
else:
h_sec = HorizontalSection(ts, wrt, params['east'], params['north'],
hoc, levels[0], params['hz_start'],
params['stepinterval'], params['max'],
params['iter'], params['tol'])
h_sec.run()