Snippets

Development Ubidots scraper for the AEMO API (Diamond Energy)

Created by David Sánchez last modified
import datetime
import json
import os
import time

import pytz
import requests

SCRAPPER_URL = "https://www.aemo.com.au/aemo/apps/api/report/ELEC_NEM_SUMMARY"
# Key of the AEMO JSON response that holds the list of regional price records
SCRAPPER_DATA_KEY = "ELEC_NEM_SUMMARY"
# Ubidots API token. Prefer supplying it through the UBIDOTS_TOKEN environment
# variable; the literal fallback keeps existing deployments working.
# NOTE(review): this credential is committed to source — rotate it and drop the
# fallback once the environment variable is configured everywhere.
TOKEN = os.environ.get("UBIDOTS_TOKEN") or "BBFF-W8MwKjIR5yOUoI1IJVFzUOQBaWsIwZ"
TIMEZONE = "Australia/Canberra"
# Production devices (disabled while the test devices below are in use):
# DEVICE_ID = "5aac597ce694aa67804c6455"
# DEVICE_PRICES = "prices"
# DEVICE_MEAN_PRICES = "mean-prices"
DEVICE_ID = "5b0d58a5642ab670a03334c0"  # Test
DEVICE_PRICES = "prices-test"  # Test
DEVICE_MEAN_PRICES = "mean-prices-test"  # Test

def get_raw_data(url, max_attempts=6, timeout=30, retry_delay=1):
    '''
    Obtains raw data from the website to scrape.

    Issues GET requests to ``url`` until a response with status < 400 is
    received or ``max_attempts`` requests have been made. Network-level
    failures (connection errors, timeouts, ...) count as failed attempts
    instead of aborting the script.

    :param url: endpoint to query.
    :param max_attempts: maximum number of requests (6 matches the previous
        ``attempts <= 5`` loop).
    :param timeout: per-request timeout in seconds, so a hung connection
        cannot block the run indefinitely.
    :param retry_delay: seconds to wait between attempts.
    :return: the decoded JSON body when the final status is 200 or 201,
        otherwise None (including when the body is not valid JSON).
    '''
    print("[INFO] attempting to obtain data from website")

    req = None
    for attempt in range(max_attempts):
        try:
            req = requests.get(url=url, timeout=timeout)
        except requests.RequestException as err:
            # Treat transport errors like an error status and retry
            print("[WARNING] request to {} failed: {}".format(url, err))
            req = None
        if req is not None and req.status_code < 400:
            break
        # Wait between attempts, but not after a success or the last attempt
        if attempt + 1 < max_attempts:
            time.sleep(retry_delay)

    if req is not None and req.status_code in (200, 201):
        try:
            data = req.json()
        except ValueError:
            print("[ERROR] Website response is not valid JSON")
            return None
        print("[INFO] Data obtained")
        return data

    return None


def parse_data(data):
    '''
    Extracts the regional price records from the decoded AEMO response.

    :param data: decoded JSON returned by ``get_raw_data``.
    :return: the value stored under ``SCRAPPER_DATA_KEY``. Presumably a list
        of dicts with "REGIONID", "SETTLEMENTDATE" and "PRICE" keys (the keys
        ``main`` reads) — confirm against the AEMO API.
    :raises KeyError: if the response does not contain ``SCRAPPER_DATA_KEY``.
    '''
    return data[SCRAPPER_DATA_KEY]


def parse_date(orig_date, time_zone):
    '''
    Converts a naive date string into an epoch timestamp in milliseconds.

    The string (format "2018-03-17T02:05:00") is interpreted as wall-clock
    time in ``time_zone`` using pytz ``localize`` with its default
    ``is_dst=False``.

    NOTE(review): NEM market time is commonly stated to be AEST (UTC+10) all
    year; localizing in a DST-observing zone such as "Australia/Canberra" would
    shift summer timestamps by one hour — confirm the intended zone.

    :param orig_date: date string in the format above.
    :param time_zone: timezone name understood by ``pytz.timezone``.
    :return: rounded epoch time in milliseconds.
    '''
    day_part, clock_part = orig_date.split("T")
    naive = datetime.datetime.strptime(
        day_part + " " + clock_part, "%Y-%m-%d %H:%M:%S")
    aware = pytz.timezone(time_zone).localize(naive)
    millis = aware.timestamp() * 1000
    return round(millis)


def get_formatted_context(date_obj, time_zone):
    '''
    Formats a naive UTC datetime as wall-clock time in ``time_zone``.

    :param date_obj: naive ``datetime`` holding a UTC time (it is localized
        as UTC before conversion; an aware datetime raises ValueError).
    :param time_zone: timezone name understood by ``pytz.timezone``.
    :return: string "Y-M-DTh:m:s" whose fields are NOT zero padded,
        e.g. "2018-3-17T2:5:0".
    '''
    target_tz = pytz.timezone(time_zone)
    local = pytz.utc.localize(date_obj).astimezone(target_tz)
    fields = (local.year, local.month, local.day,
              local.hour, local.minute, local.second)
    return "{}-{}-{}T{}:{}:{}".format(*fields)


def get_aest_timestamp(orig_date, time_zone):
    '''
    Returns the epoch time, in milliseconds, of a naive UTC datetime.

    Converting to ``time_zone`` with ``astimezone`` does not change the
    instant, so the numeric result is the same for every valid zone. The zone
    is still looked up, so an unknown name raises.

    :param orig_date: naive ``datetime`` holding a UTC time.
    :param time_zone: timezone name understood by ``pytz.timezone``.
    :return: epoch milliseconds, truncated to an int.
    '''
    target_tz = pytz.timezone(time_zone)
    localized = pytz.utc.localize(orig_date).astimezone(target_tz)
    millis = localized.timestamp() * 1000
    return int(millis)


def get_formatted_date(timestamp, time_zone):
    '''
    Converts an epoch timestamp in milliseconds into wall-clock time in the
    specified timezone.

    :param timestamp: epoch time in milliseconds.
    :param time_zone: timezone name understood by ``pytz.timezone``.
    :return: tuple ``(date, timestamp)``. ``date`` is the local time formatted
        as "Y-M-DTh:m:s" with fields NOT zero padded (e.g. "2018-3-17T2:5:0").
        ``timestamp`` is the same instant in milliseconds, rounded to an int.
    '''
    tz = pytz.timezone(time_zone)
    # Convert the instant straight into ``tz``. The previous version built a
    # naive datetime in the *host's* local time and relabelled it as ``tz``
    # with ``localize``. That made both outputs depend on the server timezone
    # and shifted the instant by the host/tz offset difference.
    date_obj = datetime.datetime.fromtimestamp(timestamp / 1000, tz)
    date = "{}-{}-{}T{}:{}:{}".format(
        date_obj.year,
        date_obj.month,
        date_obj.day,
        date_obj.hour,
        date_obj.minute,
        date_obj.second
    )

    timestamp = round(date_obj.timestamp() * 1000)

    return (date, timestamp)


def get_minute_second(timestamp):
    '''
    Returns the minute and second of an epoch timestamp in milliseconds.

    The value is evaluated in UTC, so it no longer depends on the host's local
    timezone. It equals the minute/second in any zone whose UTC offset is a
    whole number of hours. Previously, a host in a zone with a fractional-hour
    offset (e.g. UTC+9:30) produced shifted minutes.

    :param timestamp: epoch time in milliseconds.
    :return: tuple ``(minute, second)``.
    '''
    date_obj = datetime.datetime.fromtimestamp(
        timestamp / 1000, datetime.timezone.utc)

    return (date_obj.minute, date_obj.second)


def send_to_ubidots(token, device, payload, max_attempts=6, timeout=30,
                    retry_delay=1):
    '''
    Posts ``payload`` to a Ubidots device, retrying on failure.

    Uses HTTPS because the account token travels in the ``X-Auth-Token``
    header; over plain HTTP it would be sent in clear text.

    NOTE(review): a POST that times out may already have been applied
    server-side, so a retry can resend the same values — confirm this is
    harmless for the payloads used here.

    :param token: Ubidots API token.
    :param device: device label to post to.
    :param payload: JSON-serializable dict of variable label -> value dict.
    :param max_attempts: maximum number of requests (6 matches the previous
        ``attempts <= 5`` loop).
    :param timeout: per-request timeout in seconds.
    :param retry_delay: seconds to wait between attempts.
    :return: ``(True, response_json)`` when the final status is 200/201
        (``{}`` if that body is not JSON), otherwise ``(False, {})``.
    '''
    url = "https://industrial.api.ubidots.com"
    url = "{}/api/v1.6/devices/{}".format(url, device)

    headers = {"X-Auth-Token": token, "Content-Type": "application/json"}

    print("[INFO] attempting to send data to Ubidots")

    req = None
    for attempt in range(max_attempts):
        try:
            req = requests.post(url=url, headers=headers, json=payload,
                                timeout=timeout)
        except requests.RequestException as err:
            # Treat transport errors like an error status and retry
            print("[WARNING] request to Ubidots failed: {}".format(err))
            req = None
        if req is not None and req.status_code < 400:
            break
        # Wait between attempts, but not after a success or the last attempt
        if attempt + 1 < max_attempts:
            time.sleep(retry_delay)

    if req is not None and req.status_code in (200, 201):
        print("[INFO] Data posted properly")
        try:
            return (True, req.json())
        except ValueError:
            # The data was accepted; only the response body is unreadable
            return (True, {})

    print("[ERROR] Could not post data")
    return (False, {})


def format_url_device_id(device_id):
    '''
    Creates the Ubidots v1.6 URL that lists the variables of the device
    (datasource) identified by ``device_id``.

    HTTPS is used because the request carries the account token in its
    headers; over plain HTTP it would be sent in clear text.

    :param device_id: Ubidots device id.
    :return: the URL string.
    '''
    base_url = "https://industrial.api.ubidots.com"
    return "{}/api/v1.6/datasources/{}/variables".format(base_url, device_id)


def format_url_device_label(device, variable, lv=False, page_size=1, start=None):
    '''
    Creates the Ubidots v1.6 URL for a variable addressed by device and
    variable labels.

    HTTPS is used because the request carries the account token in its
    headers; over plain HTTP it would be sent in clear text.

    :param device: device label.
    :param variable: variable label.
    :param lv: when True, return the ``/lv`` (last value) URL and ignore
        ``page_size``/``start``.
    :param page_size: ``page_size`` query parameter for the ``/values`` URL.
    :param start: optional ``start`` query parameter for the ``/values`` URL.
    :return: the URL string.
    '''
    url = "https://industrial.api.ubidots.com/api/v1.6/devices/{}/{}".format(
        device, variable)
    if lv:
        return "{}/lv".format(url)

    url = "{}/values?page_size={}".format(url, page_size)
    if start is not None:
        url = "{}&start={}".format(url, start)

    return url


def get_ubi_data(token, device, variable=None, lv=False, page_size=1, start=None,
                 device_id=None, max_attempts=6, timeout=30, retry_delay=1):
    '''
    Retrieves data from Ubidots, retrying on failure.

    With ``device_id`` it requests the variable list of that device (callers
    read each entry's "label" and "last_value"). Otherwise it requests the
    ``/values`` of ``variable`` in ``device``.

    :param token: Ubidots API token (sent as ``X-Auth-Token``).
    :param device: device label, used when ``device_id`` is None.
    :param variable: variable label, used when ``device_id`` is None.
    :param lv: accepted for backward compatibility but NOT forwarded. This
        function has always queried the ``/values`` endpoint, and callers
        parse that response shape (``['results'][0]['value']``). Honouring
        ``lv`` would change the response format under them.
    :param page_size: number of values requested from ``/values``.
    :param start: optional start filter for ``/values``.
    :param device_id: Ubidots device id; takes precedence over the labels.
    :param max_attempts: maximum number of requests (6 matches the previous
        ``attempts <= 5`` loop).
    :param timeout: per-request timeout in seconds.
    :param retry_delay: seconds to wait between attempts.
    :return: decoded JSON when the final status is 200/201, otherwise None.
    '''
    if device_id is not None:
        url = format_url_device_id(device_id)
    else:
        # page_size/start used to be silently replaced by the defaults
        url = format_url_device_label(
            device, variable, lv=False, page_size=page_size, start=start)

    headers = {"X-Auth-Token": token, "Content-Type": "application/json"}

    req = None
    for attempt in range(max_attempts):
        try:
            req = requests.get(url=url, headers=headers, timeout=timeout)
        except requests.RequestException as err:
            # Treat transport errors like an error status and retry
            print("[WARNING] request to Ubidots failed: {}".format(err))
            req = None
        if req is not None and req.status_code < 400:
            break
        # Wait between attempts, but not after a success or the last attempt
        if attempt + 1 < max_attempts:
            time.sleep(retry_delay)

    if req is not None and req.status_code in (200, 201):
        try:
            data = req.json()
        except ValueError:
            print("[ERROR] Could not retrieve data")
            return None
        print("[INFO] Data obtained properly")
        return data

    print("[ERROR] Could not retrieve data")
    return None


def check_period_change(last_period, minute):
    '''
    Decides whether the 5-minute period containing ``minute`` differs from
    the last recorded period.

    :param last_period: previously stored period index (0-11), or None when
        nothing has been stored yet.
    :param minute: minute of the hour (0-59).
    :return: True when there is no previous period or the periods differ.
    '''
    if last_period is not None:
        current_period = int(minute / 5)
        return current_period != last_period
    return True


def get_last_period(timestamp, period_minutes=5):
    '''
    Returns the timestamp one period (default 5 minutes) before ``timestamp``.

    The previous version returned the bound method ``lt.timestamp`` (the call
    parentheses were missing) instead of a number. It also went through a
    naive local-time datetime, which is ambiguous around DST changes. Plain
    millisecond arithmetic avoids both problems and keeps the unit used
    elsewhere in this module.

    :param timestamp: epoch time in milliseconds.
    :param period_minutes: period length in minutes.
    :return: epoch time in milliseconds, ``period_minutes`` earlier.
    '''
    return timestamp - period_minutes * 60 * 1000


def extract_last_period_posted(ubi_prices):
    '''
    Returns the last value of the "last-period-scrapped" variable.

    :param ubi_prices: decoded Ubidots variable list (a dict with a
        "results" list) or None when the request failed.
    :return: the stored value, or None when the response is missing, the
        variable does not exist, or it has no usable last value yet.
    '''
    if not ubi_prices:
        return None

    for result in ubi_prices.get("results") or []:
        if result.get('label') == "last-period-scrapped":
            # Last period posted at Ubidots
            try:
                return result["last_value"]["value"]
            except (KeyError, TypeError):
                # Variable exists but "last_value" is absent, null or malformed
                return None

    return None


def check_prices(region_id, price, ubi_prices):
    '''
    Tells whether a region's scraped price should be validated.

    Looks up the Ubidots variable whose label equals ``region_id`` in lower
    case. Its context is stored as a JSON string.

    :param region_id: region identifier, e.g. "NSW1".
    :param price: freshly scraped price.
    :param ubi_prices: decoded Ubidots variable list with a "results" list.
    :return: True only when the variable exists, ``price`` differs from its
        last value, and its context has no "state" entry. False otherwise,
        including when the region is not found.
    '''
    wanted_label = region_id.lower()
    match = next(
        (entry for entry in ubi_prices["results"] if entry['label'] == wanted_label),
        None)
    if match is None:
        return False

    last = match["last_value"]
    stored_value = last["value"]
    stored_state = json.loads(last["context"]).get("state", None)
    return price != stored_value and stored_state is None


def extract_region_context(region_id, ubi_prices):
    '''
    Returns the stored context of a region's Ubidots variable, minus "state".

    :param region_id: region identifier, matched against variable labels in
        lower case.
    :param ubi_prices: decoded Ubidots variable list with a "results" list
        whose "last_value" contexts are JSON strings.
    :return: the decoded context dict without its "state" key, or an empty
        dict when no variable matches.
    '''
    wanted_label = region_id.lower()
    for entry in ubi_prices["results"]:
        if entry['label'] != wanted_label:
            continue
        stored_context = json.loads(entry["last_value"]["context"])
        # "state" flags a previously rejected value; it is re-added by the caller when needed
        stored_context.pop('state', None)
        return stored_context

    return {}


def parse_price(price, timestamp_aest, timestamp_aemo_api):
    '''
    Validates a scraped price and returns it, or 0 when it is rejected.

    Both checks must pass:
      * the price lies within [-1000, 16000] (presumably the NEM market
        floor/cap — confirm);
      * ``len(str(price)) == 8`` and the AEMO timestamp is 150-300 seconds
        ahead of ``timestamp_aest``.

    NOTE(review): the string-length rule accepts e.g. 1234.567 but rejects
    87.3; its intent is unclear — confirm with the business owner.

    :param price: scraped price.
    :param timestamp_aest: reference "now" in epoch milliseconds.
    :param timestamp_aemo_api: settlement timestamp in epoch milliseconds.
    :return: ``(price, True)`` when valid, otherwise ``(0, False)``.
    '''
    seconds_ahead = (timestamp_aemo_api - timestamp_aest) / 1000
    within_limits = -1000 <= price <= 16000
    fresh_and_well_formed = len(str(price)) == 8 and 150 <= seconds_ahead <= 300

    if not (within_limits and fresh_and_well_formed):
        return 0, False
    return price, True


def main(args):
    '''
    Scrapes the AEMO regional prices and posts them to Ubidots.

    Flow: fetch the AEMO summary, read the current Ubidots variables, detect
    a change of 5-minute period, validate each region's price, then post the
    prices (plus "last-period-scrapped" on a period change) to
    ``DEVICE_PRICES``.

    :param args: required by the caller's calling convention; not used.
    :return: dict with keys "status", "result", "period_change" and
        "trigger_rolling". Failure responses now also carry
        "trigger_rolling" (False), so every response has the same keys.
    '''
    print("\n [INFO] getting started")
    token = TOKEN
    time_zone = TIMEZONE
    device_id = DEVICE_ID
    init_timestamp = int(time.time() * 1000)
    # Naive UTC "now": the time helpers localize it as UTC
    init_date_obj = datetime.datetime.utcnow()
    failure = {"status": False, "result": {}, "period_change": False,
               "trigger_rolling": False}

    # Obtains data from the AEMO endpoint
    raw_scrapper_data = get_raw_data(SCRAPPER_URL)
    if raw_scrapper_data is None:
        print("[ERROR] Could not obtain data from website")
        return failure

    # Parses the raw data and extracts all the prices
    prices = parse_data(raw_scrapper_data)
    if not prices:
        print("[ERROR] Website returned no prices")
        return failure
    payload = {}  # Payload to send in the request
    device = DEVICE_PRICES  # Ubidots device label

    # Checks if the period has changed, possible periods: (0-11)
    print("[INFO] Checking if period has changed")
    timestamp = parse_date(prices[0]["SETTLEMENTDATE"], time_zone)
    minute, _ = get_minute_second(timestamp)
    ubi_prices = get_ubi_data(token, device, device_id=device_id)
    print(ubi_prices)
    if ubi_prices is None:
        print("[ERROR] Could not obtain the current prices from Ubidots")
        return failure

    # Obtains the last period posted
    last_period = extract_last_period_posted(ubi_prices)
    period_change = check_period_change(last_period, minute)

    # Obtains the last period of the rolling window. The response is parsed
    # in the "/values" shape, so that endpoint is requested explicitly.
    last_period_rolling = get_ubi_data(token,
                                       DEVICE_MEAN_PRICES,
                                       variable="last-period-scrapped",
                                       page_size=1)
    print(last_period_rolling)
    rolling_results = (last_period_rolling or {}).get("results") or []
    if not rolling_results:
        print("[ERROR] Could not obtain the last period rolling")
        return failure
    last_period_rolling = rolling_results[0]['value']

    # If both periods are different, triggers the rolling script
    trigger_rolling = last_period != last_period_rolling

    # If period has changed, updates the payload
    if period_change:
        _, timestamp_formatted = get_formatted_date(init_timestamp, time_zone)
        print("[INFO] period changed")
        payload["last-period-scrapped"] = {"value": int(minute / 5),
                                           "timestamp": timestamp_formatted
                                           }
        trigger_rolling = True

    # Reference "now" used by the price checks; identical for every region
    timestamp_aest = get_aest_timestamp(init_date_obj, time_zone)

    # Loops over the retrieved prices and updates the payload
    for region_price in prices:
        region_id = region_price["REGIONID"]
        price = region_price["PRICE"]
        timestamp = parse_date(region_price["SETTLEMENTDATE"], time_zone)

        is_valid = True
        if check_prices(region_id, price, ubi_prices):
            price, is_valid = parse_price(price, timestamp_aest, timestamp)

        print("[INFO] attempting to retrieve data for {}".format(region_id))
        region_id_context = extract_region_context(region_id, ubi_prices)

        # If there is a change in the actual time period, adds it
        if period_change or len(region_id_context) == 0:
            region_id_context["change_timestamp"] = get_formatted_context(
                init_date_obj, time_zone)
            # NOTE(review): this also stamps every later region and reports
            # period_change=True whenever one region had no stored context —
            # confirm this is intended.
            period_change = True

        if not is_valid:
            trigger_rolling = False
            region_id_context["state"] = "No value"

        payload[region_id] = {"value": price,
                              "context": region_id_context,
                              "timestamp": timestamp}

    print("payload to send")
    print(payload)

    # Sends the result
    status, result = send_to_ubidots(token, DEVICE_PRICES, payload)
    return {"status": status,
            "result": result,
            "period_change": period_change if status else False,
            "trigger_rolling": trigger_rolling}

Comments (0)

HTTPS SSH

You can clone a snippet to your computer for local editing. Learn more.