Source code for backendapp.tags

# -*- coding: utf-8 -*-

#  A web application that stores samples from a collection of NFC sensors.
#
#  https://github.com/cuplsensor/cuplbackend
#
#  Original Author: Malcolm Mackay
#  Email: malcolm@plotsensor.com
#  Website: https://cupl.co.uk
#
#  Copyright (c) 2021. Plotsensor Ltd.
#
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Affero General Public License
#  as published by the Free Software Foundation, either version 3
#  of the License, or (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Affero General Public License for more details.
#
#  You should have received a copy of the
#  GNU Affero General Public License along with this program.
#  If not, see <https://www.gnu.org/licenses/>.

from ..core import Service
from .models import Tag

from hashids import Hashids
from .simsamples import trhsamples
from wscodec.encoder.pyencoder.encoderfactory import encode
from wscodec.decoder.status import BOR_BIT, SVSH_BIT, WDT_BIT, MISC_BIT, LPM5WU_BIT, CLOCKFAIL_BIT
from ..config import HASHIDS_SALT, HASHIDS_OFFSET


[docs]class TagDecodeFailedError(Exception): """ Tag Decode Failed Error The tag serial number did not decode to the tag id """
[docs] def __init__(self, tagid, serial, decoded): self.description = "Tag serial number {} for tag id {} decodes to {}".format(serial, tagid, decoded)
def __str__(self): return self.description
[docs]class TagService(Service): """Create, find, simulate and delete tags. """ __model__ = Tag hashids = Hashids(min_length=8, salt=HASHIDS_SALT)
[docs] def get_by_serial(self, serial: str) -> Tag: """ Find a :py:class:`Tag` in the database. Raise a 404 :py:exc:`HTTPException` if none exists. :param serial: 8 character alphanumeric string. :return: The tag. """ return self.first_or_404(serial=serial)
[docs] def create(self, **kwargs) -> Tag: """ Return a new :py:class:`Tag` instance, which is saved into the database. :param **kwargs: instance parameters :return: The newly created tag. """ # Call base class constructor. By committing to the db we get an id. tag = super().create(**kwargs) if tag.serial is None: # Generate a serial from the id. serial = self.hashids.encode(HASHIDS_OFFSET + tag.id) # Assign serial to the tag and commit to the db. tag = super().update(tag, serial=serial) return tag
[docs] def simulate(self, id, frontendurl, nsamples=100, smplintervalmins=10, format=1, usehmac=True, batvoltagemv=3000, bor=False, svsh=False, wdt=False, misc=False, lpm5wu=False, clockfail=False, tagerror=False): """ Get URL that would be generated by a tag. """ # Get tag from the database so we can obtain its serial and secretkey tag = self.get(id=id) # Disable https if necessary spliturl = frontendurl.split('://') httpsdisable = False # Assume https by default if spliturl[0] == "http": httpsdisable = True frontendurl = spliturl[1] # Remove protocol from the URL elif spliturl[0] == "https": frontendurl = spliturl[1] # Remove protocol from the URL # Convert battery voltage to an ADC reading batteryadc = (256 * 1500) / batvoltagemv batteryadc = int(batteryadc) # It is vital to convert from float to integer. No type checking is done yet. # Assemble reset cause bitfield resetcause = 0 if bor: resetcause |= BOR_BIT if svsh: resetcause |= SVSH_BIT if wdt: resetcause |= WDT_BIT if misc: resetcause |= MISC_BIT if lpm5wu: resetcause |= LPM5WU_BIT if clockfail: resetcause |= CLOCKFAIL_BIT # Initialise encoder virtualsensor = encode(format=format, baseurl=frontendurl, serial=tag.serial, secretkey=tag.secretkey, smplintervalmins=smplintervalmins, usehmac=usehmac, resetcause=resetcause, resetsalltime=59, batteryadc=batteryadc, httpsdisable=httpsdisable, tagerror=tagerror) # Produce a list of simulated samples. Each is a dictionary with temp and rh keys. # Also store the time offset from UTC now to the most recent sample. trhlist, timeoffsetmins = trhsamples(smplintervalmins=smplintervalmins, nsamples=nsamples) # Load samples into wscodec virtualsensor.pushsamplelist(trhlist) # Update time offset on the virtual sensor in minutes virtualsensor.updateendstop(timeoffsetmins) # Obtain URL urlstr = virtualsensor.get_url() return urlstr