Source code for nmeagnssunit

#!/usr/bin/env python
"""
.. module:: nmeagnssunit.py
   :platform: Unix, Windows
   :synopsis: Ulyxes - an open source project to drive total stations and
        publish observation results.  GPL v2.0 license Copyright (C)
        2010- Zoltan Siki <siki.zoltan@epito.bme.hu>

.. moduleauthor:: Zoltan Siki <siki.zoltan@epito.bme.hu>,
    Daniel Moka <mokadaniel@citromail.hu>
"""

import re
import logging
from datetime import datetime
from angle import Angle
from measureunit import MeasureUnit

[docs]class NmeaGnssUnit(MeasureUnit): """ NMEA measure unit :param filt: NMEA sentence filter list e.g. ["GGA", "GNS"] :param name: name of nmea unit (str), default 'Nmea Gnss' :param typ: type of nmea unit (str), default None """ # set item positions in NMEA messages MSGS_KEYS = { "GGA": {'datetime': 1, 'latitude': 2, 'ns': 3, 'longitude': 4, 'ew': 5, 'quality': 6, 'nsat': 7, 'hdop': 8, 'altitude': 9, 'alt_unit': 10, 'geoid': 11, 'geoid_unit': 12, 'age': 13, 'ref': 14}, "GNS": {'datetime': 1, 'latitude': 2, 'ns': 3, 'longitude': 4, 'ew': 5, 'quality': 6, 'nsat': 7, 'hdop': 8, 'altitude': 9, 'geoid': 10, 'age': 11, 'ref': 12}, "ZDA": {'datetime': 1, 'day': 2, 'month': 3, 'year': 4, 'offset_h': 5, 'offset_m': 6}, "RMC": {'datetime': 1, 'quality': 2, 'latitude': 3, 'longitude': 4, 'speed': 5, 'angle': 6, 'date': 7, 'magnetic': 8}, "GLL": {'latitude': 1, 'ns': 2, 'longitude': 3, 'ew': 4, 'datetime': 5, 'quality': 6} } SUPPORTED_MSGS = list(MSGS_KEYS.keys()) def __init__(self, filt=None, name='Nmea Gnss', typ=None): """ constructor for nmea measure unit """ # call super class init super().__init__(name, typ) self.filt = self.SUPPORTED_MSGS # default filter accept all if filt is not None: self.filt = [msg for msg in filt if msg in self.SUPPORTED_MSGS] self.date_time = None
[docs] @staticmethod def GetCapabilities(): """ Get instrument specific functions :returns: list of capabilities """ return ['POSITION']
[docs] @staticmethod def NmeaChecksum(msg): """ NMEA message checksum :param msg: NMEA message :returns: True/False """ data, cksum = msg.split('*') cksum1 = 0 for s in data[1:]: cksum1 ^= ord(s) if cksum.lower() != hex(cksum1).lower()[-2:]: return False return True
[docs] @staticmethod def NmeaDateTime(msg): """ get date/time from ZDA message :param msg: NMEA ZDA sentence :returns: dete/time or None """ d = None lst = re.split(r'[,\*]', msg) if len(lst) < 5: logging.error("Invalid ZDA message, few fields: %s", msg) return None try: day = int(lst[2]) month = int(lst[3]) year = int(lst[4]) hour = int(lst[1][0:2]) minute = int(lst[1][2:4]) second = int(lst[1][4:6]) ms = 0 if len(lst[1]) > 6: ms = int(float(lst[1][6:]) * 1000) d = datetime(year, month, day, hour, minute, second, ms) except Exception: logging.error("Invalid date/time: %s", msg) return d
[docs] def Nmea2Coo(self, msg): """ process GGA nmea message :param msg: NMEA GGA/GNS/RMC/GLL sentence :returns: dictionary of data """ msg_type = msg[3:6] msg_keys = self.MSGS_KEYS[msg_type] lst = re.split(r'[,\*]', msg) if len(lst) < len(msg_keys): logging.error("Invalid message, few fields: %s", msg) return None res = {} try: if 'datetime' in msg_keys: if self.date_time is None: d1 = datetime.utcnow() # UTC at server else: d1 = self.date_time # timestamp from ZDA # UTC time from sentence index = msg_keys['datetime'] hour = int(lst[index][0:2]) minute = int(lst[index][2:4]) second = int(lst[index][4:6]) ms = 0 if len(lst[index]) > 6: ms = int(float(lst[index][6:]) * 1000) d = datetime(d1.year, d1.month, d1.day, hour, minute, second, ms) # check day change if abs((d1 - d).total_seconds()) > 10: logging.debug("day change %s - %s", d, d1) d = d1 # TODO time from GGA/GNS overwritten! res['datetime'] = d if 'latitude' in msg_keys: mul = 1 if lst[msg_keys['ns']] == 'N' else -1 res['latitude'] = Angle(mul * float(lst[msg_keys['latitude']]), 'NMEA') if 'longitude' in msg_keys: mul = 1 if lst[msg_keys['ew']] == 'E' else -1 res['longitude'] = Angle(mul * float(lst[msg_keys['longitude']]), 'NMEA') if 'quality' in msg_keys: res['quality'] = lst[msg_keys['quality']] if 'nsat' in msg_keys: res['nsat'] = int(lst[msg_keys['nsat']]) if 'hdop' in msg_keys: res['hdop'] = float(lst[msg_keys['hdop']]) if 'altitude' in msg_keys: res['altitude'] = float(lst[msg_keys['altitude']]) except Exception as e: logging.error('Invalid message, invalid value: %s', msg) logging.error(e) self.date_time = None # drop previous DZA time res = None self.date_time = None # drop previous DZA time return res
[docs] def Result(self, msgs, ans): """ process the answer from GNSS :param msgs: MNEA messages to get (list), None means to use filter :param ans: NMEA message from GNSS unit :returns: processed message or None if msg and ans do not match """ if msgs is None: msgs = self.filt msg = ans[3:len(msgs[0])+3] if msg not in msgs: return None # no process for this message # check checksum if not self.NmeaChecksum(ans): logging.error(' Checksum error') return None res = None if msg == 'ZDA': self.date_time = self.NmeaDateTime(ans) # datetime elif msg in self.SUPPORTED_MSGS: res = self.Nmea2Coo(ans) # position return res
[docs] def MeasureMsg(self): """ NMEA sentence type for lat,lon :returns: accepted messages """ return self.filt
if __name__ == '__main__': nmeaunit = NmeaGnssUnit(("GGA", "ZDA", "GNS")) ans = "$GPZDA,050306,29,10,2003,,*43" print(nmeaunit.Result(None, ans)) #ans = "$GPGGA,183730,3907.356,N,12102.482,W,1,05,1.6,646.4,M,-24.1,M,,*75" ans = "$GNGGA,000501.00,4728.1158076,N,01904.0281549,E,4,12,0.57,124.585,M,39.322,M,1.0,0207*64" print(nmeaunit.Result(None, ans)) ans = "$GNGNS,082456.00,4733.9695486,N,01900.4864959,E,RRNN,17,0.63,198.744,39.430,1.0,0207,V*35" print(nmeaunit.Result(None, ans))