Source code for nrgpy.cloud_api.upload

try:
    from nrgpy import logger
except ImportError:
    pass
from datetime import datetime
import json
from nrgpy.utils.utilities import date_check, draw_progress_bar
from .auth import cloud_api, cloud_url_base, is_authorized
import os
import requests


[docs]class cloud_import(cloud_api): """Uses NRG hosted web-based API to import RLD and CSV/CSV.zip files to existing sites in NRG Cloud. To sign up for the service, go to https://cloud.nrgsystems.com/. Note that the site must exist in the NRG Cloud platform, and you must have Contributor or Administrator level access to the site to use these features. Attributes ---------- in_dir : str (path-like) path to data file directory filename : str provide for single file import site_filter : str, optional text filter for limiting file set filter2 : str, optional another text filter... start_date : str, optional text start date to filter on "YYYY-mm-dd" end_date : str, optional text end date to filter on "YYYY-mm-dd" client_id : str provided by NRG Systems client_secret : str provided by NRG Systems session_token : str headers : str headers passed in API call data : str data passed in API call resp : str API response job_ids : dict dictionary of filenames and job ids Examples -------- Import a single raw data file with NRG Import API >>> import nrgpy >>> filename = "/home/user/data/sympro/000123/000123_2019-05-23_19.00_003672.filename >>> client_id = "go to https://cloud.nrgsystems.com/data-manager/api-setup for access" >>> client_secret = "go to https://cloud.nrgsystems.com/data-manager/api-setup for access" >>> importer = nrgpy.cloud_import( filename=filename, client_id=client_id, client_secret=client_secret, ) Import a folder of data files with NRG Import API >>> import nrgpy >>> file_filter = "000175" >>> in_directory = "filenames" >>> client_id = "go to https://cloud.nrgsystems.com/data-manager/api-setup for access" >>> client_secret = "go to https://cloud.nrgsystems.com/data-manager/api-setup for access" >>> importer = nrgpy.cloud_import( file_filter=file_filter, in_dir=in_directory, client_id=client_id, client_secret=client_secret, start_date="2020-01-01", end_date="2020-01-31", ) >>> importer.process() """ def __init__( self, in_dir="", filename="", site_filter="", filter2="", start_date="1970-01-01", end_date="2150-12-31", client_id="", client_secret="", url_base=cloud_url_base, progress_bar=True, **kwargs, ): """Initialize a cloud_export object. Parameters ---------- in_dir : str (path-like) path to data file directory filename : str provide for single file import site_filter : str, optional text filter for limiting file set filter2 : str, optional another text filter... start_date : str, optional text start date to filter on "YYYY-mm-dd" end_date : str, optional text end date to filter on "YYYY-mm-dd" client_id : str available in the NRG Cloud portal client_secret : str available in the NRG Cloud portal progress_bar : bool, default True whether to display the progress bar """ super().__init__(client_id, client_secret, url_base) self.site_filter = site_filter if "file_filter" in kwargs and site_filter == "": self.file_filter = kwargs.get("file_filter") self.site_filter = self.file_filter self.filter2 = filter2 self.in_dir = in_dir self.start_date = start_date self.end_date = end_date self.progress_bar = progress_bar self.job_ids = {} if filename: self.pad = 1 self.counter = 1 self.raw_count = 1 self.progress_bar = False self.start_time = datetime.now() self.single_file(filename) if in_dir: self.process()
[docs] def process(self): self.start_time = datetime.now() self.files = [ f for f in sorted(os.listdir(self.in_dir)) if self.site_filter in f and self.filter2 in f and f.lower().endswith(tuple(["rld", "csv", "csv.zip"])) and date_check(self.start_date, self.end_date, f) ] self.raw_count = len(self.files) self.pad = len(str(self.raw_count)) + 1 self.counter = 1 for filename in self.files: try: self.single_file(os.path.join(self.in_dir, filename)) self.counter += 1 if not is_authorized(self.resp): break except: pass
[docs] def single_file(self, filename=""): try: if self.progress_bar: draw_progress_bar(self.counter, self.raw_count, self.start_time) else: print( "Processing {0}/{1} ... {2} ... ".format( str(self.counter).rjust(self.pad), str(self.raw_count).ljust(self.pad), os.path.basename(filename), ), end="", flush=True, ) self.encoded_filename_bytes = self.prepare_file_bytes(filename) self.encoded_filename_string = self.encoded_filename_bytes.decode("utf-8") if not self.token_valid(): ( self.session_token, self.session_start_time, ) = self.request_session_token() headers = {"Authorization": "Bearer {}".format(self.session_token)} self.data = { "FileBytes64BitEncoded": self.encoded_filename_string, "FileName": os.path.basename(filename), } self.resp = requests.post( json=self.data, url=self.import_url, headers=headers ) if self.resp.status_code == 200: if self.progress_bar is False: print("[DONE]") self.job_ids[os.path.basename(filename)] = json.loads(self.resp.text)[ "jobId" ] logger.info(f"imported {os.path.basename(filename)} OK") logger.debug(f"{self.resp.status_code} {self.resp.text}") elif self.resp.status_code == 401 or self.resp.status_code == 400: if ("has already been imported" in json.loads(self.resp.text)["apiResponseMessage"]): logger.info(self.resp.text) if self.progress_bar is False: print("[ALREADY IMPORTED]") else: logger.info(f"{os.path.basename(filename)}: {self.resp.text} ") if self.progress_bar is False: print("[PASSED]") pass else: logger.error(f"unable to import {os.path.basename(filename)}: FAILED") print(f"\nunable to process file: {filename}") print(f"{str(self.resp.status_code)} | {self.resp.reason}") print(self.resp.text.split(":")[1].split('"')[1]) except Exception as e: if self.progress_bar is False: print("[FAILED]") logger.error(f"unable to import {os.path.basename(filename)}: FAILED") logger.debug(e) print(f"unable to process file: {filename}")