Snippets

Devel Ubidots Average Calculus Prices for Diamond Energy

Created by David Sánchez last modified
import pytz
import requests
import datetime
import time

# Settings
TOKEN = "BBFF-W8MwKjIR5yOUoI1IJVFzUOQBaWsIwZ"
""" DEVICE = "prices"
DEVICE_MEAN_PRICES = "mean-prices" """
DEVICE = "prices-test"
DEVICE_MEAN_PRICES = "mean-prices-test"
VARIABLES = ["tas1", "qld1", "vic1", "sa1", "nsw1"]
TIMEZONE = "Australia/Canberra"


def get_ubi_data(token, device, variable,
                 lv=False, page_size=1, start=None, end=None):
    url = "http://industrial.api.ubidots.com"
    url = "{}/api/v1.6/devices/{}/{}".format(url, device, variable)
    if lv:
        url = "{}/lv".format(url)
    else:
        url = "{}/values?page_size={}".format(url, page_size)
        if start is not None:
            url = "{}&start={}".format(url, start)
        if end is not None:
            url = "{}&end={}".format(url, end)

    headers = {"X-Auth-Token": token, "Content-Type": "application/json"}

    status = 400
    attempts = 0

    while status >= 400 and attempts <= 5:
        req = requests.get(url=url, headers=headers)
        status = req.status_code
        attempts += 1
        if status >= 400:
            time.sleep(1)

    if status == 200 or status == 201:
        print("[INFO] Data obtained properly")
        return None if len(req.content) == 0 else req.json()

    print("[ERROR] Could not retrieve data")
    return None


def get_minute(timestamp):
    date_obj = datetime.datetime.fromtimestamp(timestamp)

    return (date_obj.minute)


def mean(values):
    return float(sum(values)) / max(len(values), 1)


def parse_date(orig_date, time_zone):
    ''' 
    Parses the date and returns a timestamp in milliseconds localized in
    the specified timezone
    date format must be 2018-03-17T02:05:00
    '''
    date, hour = orig_date.split("T")
    date_formated = "{} {}".format(date, hour)
    date_obj = datetime.datetime.strptime(date_formated, "%Y-%m-%d %H:%M:%S")
    tz = pytz.timezone(time_zone)
    timestamp = round(tz.localize(date_obj).timestamp()*1000)

    return timestamp


def calculateAverage(timestamp, data):
    minutes = get_minute(int(timestamp / 1000))
    # If it is within the first 5 minutes
    if minutes in range(0, 4) or minutes in range(30, 34):
        return 0, False
    
    # If there is no valid data
    if (len(data) == 0):
        return 0, False
    
    # The average of the valid data is calculated
    return float(sum(data)) / max(len(data), 1), True


def parse_price(last_value, data):
    timestamp_aemo_api = last_value['timestamp']
    timestamp_aest = parse_date(last_value['context']['change_timestamp'], TIMEZONE)
    timestampDiference = ((timestamp_aemo_api - timestamp_aest) / 1000)
    test_1 = last_value['value'] >= -1000 and last_value['value'] <= 16000
    test_2 = len(str(last_value['value'])) == 8 and timestampDiference >= 150 and timestampDiference <= 300

    # The price is calculated normal
    if test_1 and test_2:
      return -1000000, True

    values = [dot.get('value') for dot in data if dot.get('context').get('state') is None]
    # It is calculated with the timestamp of ubidots
    if not test_1 and test_2:
        return calculateAverage(timestamp_aemo_api, values)
    # It is calculated with the AEST timestamp
    return calculateAverage(timestamp_aest, values)


def localize_date(date_obj, time_zone):
    tz = pytz.timezone(time_zone)
    date_obj_localized = pytz.utc.localize(date_obj).astimezone(tz)

    return date_obj_localized


def get_formatted_context(date_obj, time_zone):
    date_obj_localized = localize_date(date_obj, time_zone)
    date = "{}-{}-{}T{}:{}:{}".format(
        date_obj_localized.year,
        date_obj_localized.month,
        date_obj_localized.day,
        date_obj_localized.hour,
        date_obj_localized.minute,
        date_obj_localized.second
    )

    return date


def send_to_ubidots(token, device, payload):
    url = "http://industrial.api.ubidots.com"
    url = "{}/api/v1.6/devices/{}".format(url, device)

    headers = {"X-Auth-Token": token, "Content-Type": "application/json"}

    status = 400
    attempts = 0
    print("[INFO] attempting to send data to Ubidots")

    while status >= 400 and attempts <= 5:
        req = requests.post(url=url, headers=headers, json=payload)
        status = req.status_code
        attempts += 1
        if status >= 400:
            time.sleep(1)

    if status == 200 or status == 201:
        print("[INFO] Data posted properly")
        return (True, req.json())

    print("[ERROR] Could not post data")
    return (False, ())


def get_page_size(minute):
    # Number of dots to retrieve
    if minute < 5:
        page_size = 6
    if minute >= 5 and minute < 35:
        page_size = int(minute/5)
    if minute >= 35 and minute < 60:
        page_size = int((minute - 30) / 5)  # Number of dots to retrieve

    # Obtains up to six values since the init date
    if page_size == 0:
        page_size = 1

    return page_size


def create_payload(data, init_date_obj, page_size):
    values = []
    results = []
    tz = pytz.timezone(TIMEZONE)
    date_obj_localized = pytz.utc.localize(init_date_obj).astimezone(tz)
    init_timestamp = int(date_obj_localized.timestamp())

    # boundarie timestamp should be up to 30 minutes ago
    up_boundary_timestamp = init_timestamp + (5 * 60)
    low_boundary_timestamp = init_timestamp - (60 * 5 * (page_size))
    for dot in data:
        timestamp = int(dot["timestamp"] / 1000)
        if timestamp <= up_boundary_timestamp \
                and timestamp >= low_boundary_timestamp:
            value = dot["value"]
            values.append(value)
            results.append(dot)
    return values, results


def check_period_change(token):
    message = "[INFO] Attempting to obtain variable=last-period-scrapped"
    message = "{} from device1=mean-prices, device2=prices".format(message)
    print(message)
    last_price_period = get_ubi_data(token,
                                    DEVICE,
                                    "last-period-scrapped",
                                    lv=True)

    last_mean_price_period = get_ubi_data(token,
                                    DEVICE_MEAN_PRICES,
                                    "last-period-scrapped",
                                    lv=True)

    if last_price_period is None or last_mean_price_period is None:
        print("[ERROR] Culd not obtain last scrapped periods")
        return False

    message = "[INFO] Result obtained for device1={}".format(last_price_period)
    message = "{} , device2={}".format(message, last_mean_price_period)
    print(message)

    return last_mean_price_period != last_price_period


def parse_ubi_data(token, device, variables):
    mean_prices = {}
    init_date_obj = datetime.datetime.utcnow()

    for variable in variables:
        # Obtains last stored timestamp
        print("[INFO] Attempting to obtain data from {}".format(variable))
        result = get_ubi_data(token, device, variable)["results"][0]
        if result is None:
            print("[ERROR] Could not retrieve data")
            return None
        timestamp = int(result["timestamp"] / 1000)
        print("[INFO] timestamp from variable: {}".format(variable))
        minute = get_minute(timestamp)

        page_size = get_page_size(minute)
        print("[INFO] Attempting to retrieve {} values from Ubidots".format(
            page_size)
        )

        data = get_ubi_data(token, device, variable,
                            end=result['timestamp'], page_size=page_size)

        if data is None:
            print("[ERROR] Could not retrieve data")
            return None

        data = data["results"]
        print("[INFO] Ubidots data retrieved:\n{}".format(data))
        values, results = create_payload(data, init_date_obj, page_size)
        if len(values) <= 0:
            print("[ERROR] Data retrieved is not consistent with the period to calculate")
            return None

        timestamp = data[0]['timestamp']  # Last registered timestamp
        print("[INFO] timestamp for payload: {}".format(timestamp))
        date_formatted = get_formatted_context(init_date_obj, TIMEZONE)
        # Check tests
        value, is_valid = parse_price(result, results)
        average = value
        if is_valid and value == -1000000:
            average = mean(values)

        context = {"update-date": date_formatted}
        if not is_valid:
            context["state"] = "No value"
        mean_prices[variable] = {"value": average,
                                 "timestamp": timestamp,
                                 "context": context
                                 }

    minute = get_minute(int(timestamp/1000))
    actual_period = int(minute/5)
    mean_prices['last-period-scrapped'] = {"value": actual_period,
                                           "timestamp": timestamp,
                                           "context": {"update-date": date_formatted}
                                           }
    return mean_prices


def main(args):
    mean_prices = None
    print("[INFO] Checking if period has changed")
    if check_period_change(TOKEN):
        print("[INFO] Period changed, triggering average rolling calculus")
        mean_prices = parse_ubi_data(TOKEN, DEVICE, VARIABLES)
        print("[INFO] payload to send: {}".format(mean_prices))
    if mean_prices is not None:
        print("[INFO] Attempting to send data to Ubidots")
        return {"status": "ok",
                "result": send_to_ubidots(TOKEN, DEVICE_MEAN_PRICES, mean_prices)}
    else:
        print("[INFO] Period has not changed or could not retrieve info from Ubidots")
        return {"status": "ok", "result": None}

Comments (0)

HTTPS SSH

You can clone a snippet to your computer for local editing. Learn more.