Source code for thermostat.importers

from thermostat.core import Thermostat

import pandas as pd
from thermostat.stations import get_closest_station_by_zipcode

from thermostat.eeweather_wrapper import get_indexed_temperatures_eeweather
from eeweather.cache import KeyValueStore
from eeweather.exceptions import ISDDataNotAvailableError
import json

import warnings
import dateutil.parser
import os
import errno
import pytz
from multiprocessing import Pool, cpu_count
from functools import partial
import logging

# Use the number of cores available to this process where the platform
# reports it (os.sched_getaffinity is Linux-only), and cap the worker pool
# so that concurrent weather-data downloads stay within the FTP connection
# limit.
try:
    NUMBER_OF_CORES = len(os.sched_getaffinity(0))
except AttributeError:
    NUMBER_OF_CORES = cpu_count()
MAX_FTP_CONNECTIONS = 3
AVAILABLE_PROCESSES = min(NUMBER_OF_CORES, MAX_FTP_CONNECTIONS)


logger = logging.getLogger(__name__)


def __prime_eeweather_cache():
    """ Primes the eemeter / eeweather caches by doing a non-existent query
    This creates the cache directories sooner than if they were created
    during normal processing (which can lead to a race condition and missing
    thermostats)
    """
    sql_json = KeyValueStore()
    if sql_json.key_exists('0') is not False:
        raise Exception("eeweather cache was not properly primed. Aborting.")


def save_json_cache(index, thermostat_id, station, cache_path=None):
    """ Saves the cached results from eeweather into a JSON file.

    Parameters
    ----------
    index : pd.DatetimeIndex
        hourly index used to compute the years needed.
    thermostat_id : str
        A unique identifier for the thermostat (used for the filename)
    station : str
        Station ID used to retrieve the weather data.
    cache_path : str
        Directory path to save the cached data
    """
    if cache_path is None:
        directory = os.path.join(
            os.curdir,
            "epathermostat_weather_data")
    else:
        directory = os.path.normpath(cache_path)

    try:
        os.mkdir(directory)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise

    json_cache = {}

    sqlite_json_store = KeyValueStore()
    years = index.groupby(index.year).keys()
    for year in years:
        filename = "ISD-{station}-{year}.json".format(
            station=station,
            year=year)
        json_cache[filename] = sqlite_json_store.retrieve_json(filename)

    thermostat_filename = "{thermostat_id}.json".format(thermostat_id=thermostat_id)
    thermostat_path = os.path.join(directory, thermostat_filename)
    try:
        with open(thermostat_path, 'w') as outfile:
            json.dump(json_cache, outfile)
    except Exception as e:
        warnings.warn("Unable to write JSON file: {}".format(e))
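
# Example usage (a minimal sketch; the identifier, station, and path below
# are illustrative, not taken from real data):
#
#     hourly_index = pd.date_range("2018-01-01", periods=365 * 24, freq="H")
#     save_json_cache(hourly_index, "thermostat_01", "720113",
#                     cache_path="weather_cache")
#
# This would write weather_cache/thermostat_01.json with one
# "ISD-{station}-{year}.json" entry for each year covered by the index.
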
def normalize_utc_offset(utc_offset):
    """
    Normalizes the UTC offset

    Returns the UTC offset based on the string passed in.

    Parameters
    ----------
    utc_offset : str
        String representation of the UTC offset

    Returns
    -------
    datetime timedelta offset
    """
    # Guard the integer check separately so that non-numeric offsets such
    # as "Z" fall through to dateutil parsing instead of being rejected.
    try:
        if int(utc_offset) == 0:
            utc_offset = "+0"
    except (ValueError, TypeError):
        pass

    try:
        delta = dateutil.parser.parse(
            "2000-01-01T00:00:00" + str(utc_offset)).tzinfo.utcoffset(None)
        return delta
    except (ValueError, TypeError, AttributeError) as e:
        raise TypeError("Invalid UTC offset: {} ({})".format(
            utc_offset, e))
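
# Example (a minimal sketch; the offsets shown are illustrative):
#
#     normalize_utc_offset("-0700")  # timedelta of -7 hours
#     normalize_utc_offset("+7")     # timedelta of +7 hours
#     normalize_utc_offset("Z")      # timedelta of 0 (UTC)
#     normalize_utc_offset("bogus")  # raises TypeError
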
def from_csv(metadata_filename, verbose=False, save_cache=False, shuffle=True,
             cache_path=None, quiet=None):
    """
    Creates Thermostat objects from data stored in CSV files.

    Parameters
    ----------
    metadata_filename : str
        Path to a file containing the thermostat metadata.
    verbose : boolean
        Set to True to output a more detailed log of import activity.
    save_cache : boolean
        Set to True to save the cached data to a json file (based on
        Thermostat ID).
    shuffle : boolean
        Shuffles the thermostats to give them random ordering if desired
        (helps with caching).
    cache_path : str
        Directory path to save the cached data

    Returns
    -------
    thermostats : iterator over thermostat.Thermostat objects
        Thermostats imported from the given CSV input files.
    """
    if quiet:
        logging.warning('quiet argument has been deprecated. Please remove this flag from your code.')

    __prime_eeweather_cache()

    metadata = pd.read_csv(
        metadata_filename,
        dtype={
            "thermostat_id": str,
            "zipcode": str,
            "utc_offset": str,
            "equipment_type": int,
            "interval_data_filename": str
        }
    )

    # Shuffle the results to help alleviate cache issues
    if shuffle:
        logging.info("Metadata randomized to prevent collisions in cache.")
        metadata = metadata.sample(frac=1).reset_index(drop=True)

    p = Pool(AVAILABLE_PROCESSES)
    multiprocess_func_partial = partial(
        multiprocess_func,
        metadata_filename=metadata_filename,
        verbose=verbose,
        save_cache=save_cache,
        cache_path=cache_path)
    result_list = p.imap(multiprocess_func_partial, metadata.iterrows())
    p.close()
    p.join()

    # Bad thermostats return None so remove those.
    results = [x for x in result_list if x is not None]

    # Check for thermostats that were not loaded and log them
    metadata_thermostat_ids = set(metadata.thermostat_id)
    loaded_thermostat_ids = set([x.thermostat_id for x in results])
    missing_thermostats = metadata_thermostat_ids.difference(loaded_thermostat_ids)
    missing_thermostats_num = len(missing_thermostats)
    if missing_thermostats_num > 0:
        logging.warning(
            "Unable to load {} thermostat records because of errors. Please "
            "check the logs for the following thermostats:".format(
                missing_thermostats_num))
        for thermostat in missing_thermostats:
            logging.warning(thermostat)

    # Convert this to an iterator to maintain compatibility
    return iter(results)
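
# Example usage (a minimal sketch; "metadata.csv" is an assumed filename
# whose columns match the dtypes read above):
#
#     thermostats = from_csv("metadata.csv", verbose=True,
#                            save_cache=True, cache_path="weather_cache")
#     for thermostat in thermostats:
#         print(thermostat.thermostat_id)
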
def multiprocess_func(metadata, metadata_filename, verbose=False, save_cache=False, cache_path=None):
    """ This function is a partial function for multiprocessing and shares
    the same arguments as from_csv. It is not intended to be called
    directly."""
    i, row = metadata
    logger.info("Importing thermostat {}".format(row.thermostat_id))
    if verbose and logger.getEffectiveLevel() > logging.INFO:
        print("Importing thermostat {}".format(row.thermostat_id))

    # make sure this thermostat type is supported.
    if row.equipment_type not in [1, 2, 3, 4, 5]:
        warnings.warn(
            "Skipping import of thermostat controlling equipment"
            " of unsupported type. (id={})".format(row.thermostat_id))
        return

    interval_data_filename = os.path.join(
        os.path.dirname(metadata_filename),
        row.interval_data_filename)

    try:
        thermostat = get_single_thermostat(
            row.thermostat_id,
            row.zipcode,
            row.equipment_type,
            row.utc_offset,
            interval_data_filename,
            save_cache=save_cache,
            cache_path=cache_path,
        )
    except ValueError:
        # Could not locate a station for the thermostat. Warn and skip.
        warnings.warn(
            "Skipping import of thermostat (id={}) for which "
            "a sufficient source of outdoor weather data could not "
            "be located using the given ZIP code ({}). This is likely "
            "due to the discrepancy between US Postal Service ZIP "
            "codes (which do not always map well to locations) and "
            "Census Bureau ZCTAs (which usually do). Please supply "
            "a ZIP code which corresponds to a US Census Bureau ZCTA."
            .format(row.thermostat_id, row.zipcode))
        return
    except ISDDataNotAvailableError as e:
        warnings.warn(
            "Skipping import of thermostat (id={}) because the NCDC "
            "does not have data: {}"
            .format(row.thermostat_id, e))
        return
    except Exception as e:
        warnings.warn(
            "Skipping import of thermostat (id={}) because of "
            "the following error: {}"
            .format(row.thermostat_id, e))
        return

    return thermostat

def get_single_thermostat(thermostat_id, zipcode, equipment_type, utc_offset,
                          interval_data_filename, save_cache=False, cache_path=None):
    """ Load a single thermostat directly from an interval data file.

    Parameters
    ----------
    thermostat_id : str
        A unique identifier for the thermostat.
    zipcode : str
        The zipcode of the thermostat, e.g. `"01234"`.
    equipment_type : int
        The equipment type of the thermostat.
    utc_offset : str
        A string representing the UTC offset of the interval data, e.g.
        `"-0700"`. Could also be `"Z"` (UTC), or just `"+7"` (equivalent to
        `"+0700"`), or any other timezone format recognized by the library
        method dateutil.parser.parse.
    interval_data_filename : str
        The path to the CSV in which the interval data is stored.
    save_cache : boolean
        Set to True to save the cached data to a json file (based on
        Thermostat ID).
    cache_path : str
        Directory path to save the cached data

    Returns
    -------
    thermostat : thermostat.Thermostat
        The loaded thermostat object.
    """
    df = pd.read_csv(interval_data_filename)

    heating, cooling, aux_emerg = _get_equipment_type(equipment_type)

    # load indices
    dates = pd.to_datetime(df["date"])
    daily_index = pd.date_range(start=dates[0], periods=dates.shape[0], freq="D")
    hourly_index = pd.date_range(start=dates[0], periods=dates.shape[0] * 24, freq="H")
    hourly_index_utc = pd.date_range(start=dates[0], periods=dates.shape[0] * 24, freq="H", tz=pytz.UTC)

    # raise an error if dates are not aligned
    if not all(dates == daily_index):
        message = ("Dates provided for thermostat_id={} may contain some "
                   "which are out of order, missing, or duplicated.".format(thermostat_id))
        raise RuntimeError(message)

    # load hourly time series values
    temp_in = pd.Series(_get_hourly_block(df, "temp_in"), hourly_index)

    if heating:
        heating_setpoint = pd.Series(_get_hourly_block(df, "heating_setpoint"), hourly_index)
    else:
        heating_setpoint = None

    if cooling:
        cooling_setpoint = pd.Series(_get_hourly_block(df, "cooling_setpoint"), hourly_index)
    else:
        cooling_setpoint = None

    if aux_emerg:
        auxiliary_heat_runtime = pd.Series(_get_hourly_block(df, "auxiliary_heat_runtime"), hourly_index)
        emergency_heat_runtime = pd.Series(_get_hourly_block(df, "emergency_heat_runtime"), hourly_index)
    else:
        auxiliary_heat_runtime = None
        emergency_heat_runtime = None

    # load outdoor temperatures
    station = get_closest_station_by_zipcode(zipcode)

    if station is None:
        message = ("Could not locate a valid source of outdoor temperature "
                   "data for ZIP code {}".format(zipcode))
        raise RuntimeError(message)

    utc_offset = normalize_utc_offset(utc_offset)
    temp_out = get_indexed_temperatures_eeweather(station, hourly_index_utc - utc_offset)
    temp_out.index = hourly_index

    # Export the data from the cache
    if save_cache:
        save_json_cache(hourly_index, thermostat_id, station, cache_path)

    # load daily time series values
    if cooling:
        cool_runtime = pd.Series(df["cool_runtime"].values, daily_index)
    else:
        cool_runtime = None
    if heating:
        heat_runtime = pd.Series(df["heat_runtime"].values, daily_index)
    else:
        heat_runtime = None

    # create thermostat instance
    thermostat = Thermostat(
        thermostat_id,
        equipment_type,
        zipcode,
        station,
        temp_in,
        temp_out,
        cooling_setpoint,
        heating_setpoint,
        cool_runtime,
        heat_runtime,
        auxiliary_heat_runtime,
        emergency_heat_runtime
    )
    return thermostat
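
# Example usage (a minimal sketch; every argument value below is
# illustrative):
#
#     thermostat = get_single_thermostat(
#         thermostat_id="thermostat_01",
#         zipcode="91104",
#         equipment_type=1,
#         utc_offset="-0700",
#         interval_data_filename="thermostat_01_interval.csv",
#     )
#     print(thermostat.station)  # the ISD station matched to the ZIP code
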
def _get_hourly_block(df, prefix):
    # Collect the 24 per-hour columns (e.g. temp_in_00 .. temp_in_23) and
    # flatten them row-by-row into a single hourly vector.
    columns = ["{}_{:02d}".format(prefix, i) for i in range(24)]
    values = df[columns].values
    return values.reshape((values.shape[0] * values.shape[1],))


def _get_equipment_type(equipment_type):
    """
    Returns
    -------
    heating : boolean
        True if the equipment type has heating equipment
    cooling : boolean
        True if the equipment type has cooling equipment
    aux_emerg : boolean
        True if the equipment type has auxiliary/emergency heat equipment
    """
    # heating, cooling, aux_emerg
    equipment_type_dict = {
        1: (True, True, True),
        2: (True, True, False),
        3: (True, True, False),
        4: (True, False, False),
        5: (False, True, False),
    }
    return equipment_type_dict.get(equipment_type, None)
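
# Example (illustrative; the codes follow the mapping above):
#
#     _get_equipment_type(1)   # (True, True, True): heat, cool, aux/emergency
#     _get_equipment_type(5)   # (False, True, False): cooling only
#     _get_equipment_type(99)  # None: unsupported equipment type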