Source code for queuewriter

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
.. module:: queuewriter.py
   :platform: Unix, Windows
   :synopsis: Ulyxes - an open source project to drive total stations and
           publish observation results.
           GPL v2.0 license
           Copyright (C) 2010-2020 Zoltan Siki <siki.zoltan@epito.bme.hu>

.. moduleauthor:: Bence TurĂ¡k <turak.bence@epito.bme.hu>
"""

import logging
import queue
from writer import Writer

[docs]class QueueWriter(Writer): """Class to write queue :param qu: queue (Queue), default None :param name: name of writer (str), default None :param angle: angle unit to use (str), default GON :param dist: distance and coordinate format (str), default .3f :param dt: date/time format (str), dafault ansi :param filt: list of keys to output (list), deafult None """ def __init__(self, qu=None, name=None, angle='GON', dist='.3f', \ dt='%Y-%m-%d %H:%M:%S', filt=None): """ Constuctor """ super().__init__(name, angle, dist, dt, filt) if isinstance(qu, queue.Queue): self.q = qu elif queue is None: self.q = queue.Queue() else: raise TypeError('qu must be Queue type!')
[docs] def GetQueue(self): """ method to get queue :returns: queue """ return self.q
[docs] def WriteData(self, data): """ Write observation data to queue :param data: dictonary with observation data :returns: 0/-1/-2 OK/write error/empty not written """ line = {} if data is None or self.DropData(data): logging.warning(" empty or inappropiate data not written") return -3 data = self.ExtendData(data) for key, val in data.items(): if self.filt is None or key in self.filt: line[key] = val if not line: return -2 # skip empty data try: self.q.put(line) except Exception: logging.error(" queue write failed") return -1 return 0
if __name__ == "__main__": from angle import Angle qu = queue.Queue() myQueue = QueueWriter(qu = qu) data = {'hz': Angle(0.12345), 'v': Angle(100.2365, 'GON'), 'dist': 123.6581} myQueue.WriteData(data) myQueue.GetQueue()