diff options
author | Rasmus Luha <rasmus.luha@ut.ee> | 2025-04-25 21:43:54 +0300 |
---|---|---|
committer | Rasmus Luha <rasmus.luha@ut.ee> | 2025-04-25 21:43:54 +0300 |
commit | 0a16e0e3e586456cf2e86dbdad4b66787b036a5d (patch) | |
tree | 7e82621e2d06d84801598b195fe3150140431d94 | |
parent | 9ba62c9bf9f19053f6eb664db70eb342812efc38 (diff) |
some restructuring, start telegraf module
-rw-r--r-- | common/core.py | 35 | ||||
-rw-r--r-- | config.py | 19 | ||||
-rw-r--r-- | modules/nifi/core.py | 121 | ||||
-rw-r--r-- | modules/nifi/nifi_utils.py | 45 | ||||
-rw-r--r-- | modules/telegraf/templates/basic_ETL.toml | 26 |
5 files changed, 128 insertions, 118 deletions
diff --git a/common/core.py b/common/core.py index 6160ee3..7fd8cff 100644 --- a/common/core.py +++ b/common/core.py @@ -141,3 +141,38 @@ def inspect_json_top_level_test(json_data, has_list=False): print(f"\nValitud väärtus: '{path}'") return {last_key: path} + +def get_data_values(): + + chosen_json_values = {} + + ##Getting API url and json values + while True: + api_url = input("Palun sisesta andmete API URL: ").strip() + username = "placeholder" + passwd = "placeholder" + + needs_auth = ask_binary_input(prompt="Kas API vajab ka kasutajaga autentimist?(jah/ei): ").strip().lower() == 'jah' + if needs_auth: + username=input("Sisesta kasutajanimi: ") + passwd=input("Sisesta parool: ") + + json_data, api_url_correct = is_app_url_correct(api_url,needs_auth,username,passwd) + + + ## TODO itemite eemaldamise v6malus + if api_url_correct: + while True: + + chosen_json_values.update(inspect_json_top_level_test(json_data)) + ## Testing + print("Oled hetkel valinud järgmised väärtused JSON lõppväärtused: ", ", ".join(chosen_json_values)) + choose_another = ask_binary_input(prompt="\nKas soovid (v)alida veel mõne väärtuse või liikuda (e)dasi?(v/e): ",valikud=["v","e"]).strip().lower() + + if choose_another == 'e': + return chosen_json_values, api_url, username, passwd + else: + choice = ask_binary_input(prompt="\nKas soovid URL-i (m)uuta URL-i või (v)äljuda?(m/v): ",valikud=["m","v"]).strip().lower() + if choice == 'v': + print("Väljun programmist.") + sys.exit() @@ -1,15 +1,15 @@ -INTERACTIVE_MODE=True +INTERACTIVE_MODE=False + + +#### Nifi #### -## Nifi NIFI_HOST="https://127.0.0.1.nip.io" NIFI_DEPLOY=False NIFI_USER="lab08nifiuser" NIFI_PASS="tartunifi2023" -#NIFI_MEASUREMENT_NAME="test_measurementName" -NIFI_MEASUREMENT_NAME="" - +NIFI_MEASUREMENT_NAME="test_measurementName" ## Database DB_URL="http://influxdb:8086/write?db=nifi_weatherData" @@ -17,11 +17,6 @@ DB_USER="admin" DB_PASS="admin" - -############################### - - - ## Needed if Interactive mode turned off API_URL="https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72¤t_weather=true" API_FIELDS={'temperature': '.current_weather.temperature', 'windspeed': '.current_weather.windspeed'} @@ -29,3 +24,7 @@ API_USERNAME="Placeholder" API_PASSWORD="Placehoder" PIPELINE_SCHEDULING_PERIOD="5 sec" PIPELINE_NAME="test_pipeline.json" + + + +#### Telegraf #### diff --git a/modules/nifi/core.py b/modules/nifi/core.py index 4ddc268..46455c7 100644 --- a/modules/nifi/core.py +++ b/modules/nifi/core.py @@ -1,17 +1,13 @@ ## TODO - check syntax from common import core as common -import config as config +import config from modules.nifi import nifi_utils - from pyfiglet import figlet_format from rich.console import Console -import sys -import json + import shutil -import requests import re -import base64 def introduction(): @@ -21,89 +17,7 @@ def introduction(): print("Valisid Nifi Platformi!\n") -def update_template(file_path, dot_path, new_key, new_value): - - # Step 2: Load the copied JSON - with open(file_path, "r") as f: - data = json.load(f) - - # Step 3: Walk the path (e.g. 'flowContents.processors[0].properties') - keys = dot_path.split(".") - current = data - - for key in keys: - if key.endswith("]"): # Handle list index like processors[0] - list_key = key[:key.index("[")] - index = int(key[key.index("[") + 1 : key.index("]")]) - current = current[list_key][index] - else: - current = current[key] - - # Step 4: Add or update the key - current[new_key] = new_value - print(f"🛠 Added '{new_key}': '{new_value}' at path '{dot_path}'") - - # Step 5: Save back the JSON - with open(file_path, "w") as f: - json.dump(data, f, indent=2) - print("✅ Changes saved.") - - - - -def set_database_credentials(file_path,dot_path): - ## Update URL - update_template(file_path, dot_path, "HTTP URL", config.DB_URL) - - ## Update username - update_template(file_path, dot_path, "username", config.DB_USER) - - ## Update username - update_template(file_path, dot_path, "password", config.DB_PASS) - - - - -def get_data_values(): - - chosen_json_values = {} - - ##Getting API url and json values - while True: - api_url = input("Palun sisesta andmete API URL: ").strip() - username = "placeholder" - passwd = "placeholder" - - needs_auth = common.ask_binary_input(prompt="Kas API vajab ka kasutajaga autentimist?(jah/ei): ").strip().lower() == 'jah' - if needs_auth: - username=input("Sisesta kasutajanimi: ") - passwd=input("Sisesta parool: ") - - json_data, api_url_correct = common.is_app_url_correct(api_url,needs_auth,username,passwd) - - - ## TODO itemite eemaldamise v6malus - if api_url_correct: - while True: - - chosen_json_values.update(common.inspect_json_top_level_test(json_data)) - ## Testing - print("Oled hetkel valinud järgmised väärtused JSON lõppväärtused: ", ", ".join(chosen_json_values)) - choose_another = common.ask_binary_input(prompt="\nKas soovid (v)alida veel mõne väärtuse või liikuda (e)dasi?(v/e): ",valikud=["v","e"]).strip().lower() - - if choose_another == 'e': - return chosen_json_values, api_url, username, passwd - else: - choice = common.ask_binary_input(prompt="\nKas soovid URL-i (m)uuta URL-i või (v)äljuda?(m/v): ",valikud=["m","v"]).strip().lower() - if choice == 'v': - print("Väljun programmist.") - sys.exit() - - - - -## TODO - textReplace part -> fix templates def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url, api_username, api_password): ############### Choosing and modfyfing Template ############## @@ -116,10 +30,7 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_ #path_parts = el.split(']') path_parts = re.split(r'(?<=\])', el) - - ### Select template - ## TODO - currently has only 2 templates... if needs_SplitJson: template_name="splitJsonETL.json" else: @@ -129,7 +40,6 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_ shutil.copy(f"modules/nifi/templates/{template_name}", new_pipeline_path) - ### Processor editing ## Measurements name defining @@ -142,48 +52,47 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_ if needs_SplitJson: ## SplitJson update split_json_path = "$"+re.sub(r'\[(.*?)\]', r'[*]', path_parts[0]) - update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path) + nifi_utils.update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path) ## EvaluateJsonPath processor setup for key, value in data_values.items() : path_parts = value.split(']') - update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1]) + nifi_utils.update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1]) measurements_name+=f"{key}=${{{key}}}," ## Database Setup - set_database_credentials(new_pipeline_path, "flowContents.processors[4].properties") + nifi_utils.set_database_credentials(new_pipeline_path, "flowContents.processors[4].properties") else: ## EvaluateJsonPath processor setup for key, value in data_values.items() : - update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value) + nifi_utils.update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value) measurements_name+=f"{key}=${{{key}}}," ## Database Setup - set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties") + nifi_utils.set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties") ## ReplaceText processor update - making it compatible for timeseries database (influxDB) - update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", measurements_name[:-1]) # Delete last coma + nifi_utils.update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", measurements_name[:-1]) # Delete last coma ## Update API call URL - update_template(new_pipeline_path, "flowContents.processors[1].properties", "HTTP URL", api_url) + nifi_utils.update_template(new_pipeline_path, "flowContents.processors[1].properties", "HTTP URL", api_url) ## Update scheduling Periond on API Calls - update_template(new_pipeline_path, "flowContents.processors[1]", "schedulingPeriod", schedulingPeriod) + nifi_utils.update_template(new_pipeline_path, "flowContents.processors[1]", "schedulingPeriod", schedulingPeriod) ## Add api credentials if api_username != "placeholder": - update_template(new_pipeline_path, "flowContents.processors[1].properties", "Request Username", api_username) - update_template(new_pipeline_path, "flowContents.processors[1].properties", "Request Password", api_password) - + nifi_utils.update_template(new_pipeline_path, "flowContents.processors[1].properties", "Request Username", api_username) + nifi_utils.update_template(new_pipeline_path, "flowContents.processors[1].properties", "Request Password", api_password) +### -############################################### def build_pipeline(): if config.INTERACTIVE_MODE: - data_values, api_url, api_username, api_password= get_data_values() + data_values, api_url, api_username, api_password= common.get_data_values() print("\nKui tihti peaks andmekonveier jooksma? (sekundites)") schedulingPeriod = str(common.ask_digit_input(86400))+ "sec" @@ -202,7 +111,6 @@ def build_pipeline(): print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier nimega '{new_pipeline_name}' asub kaustas 'pipelines'.") - ## Pipeline Deployment if (config.NIFI_DEPLOY): token = nifi_utils.get_access_token() @@ -212,4 +120,3 @@ def build_pipeline(): if choice == "jah": token = nifi_utils.get_access_token() nifi_utils.upload_nifi_pipeline(token, "pipelines/test_pipeline.json", "test_pipeline", username=config.NIFI_USER, password=config.NIFI_PASS, nifi_url=config.NIFI_HOST, position_x=0, position_y=0) - diff --git a/modules/nifi/nifi_utils.py b/modules/nifi/nifi_utils.py index 88953d6..74cba81 100644 --- a/modules/nifi/nifi_utils.py +++ b/modules/nifi/nifi_utils.py @@ -1,6 +1,49 @@ -import requests import config +import requests +import sys +import json + + +def update_template(file_path, dot_path, new_key, new_value): + + # Step 2: Load the copied JSON + with open(file_path, "r") as f: + data = json.load(f) + + # Step 3: Walk the path (e.g. 'flowContents.processors[0].properties') + keys = dot_path.split(".") + current = data + + for key in keys: + if key.endswith("]"): # Handle list index like processors[0] + list_key = key[:key.index("[")] + index = int(key[key.index("[") + 1 : key.index("]")]) + current = current[list_key][index] + else: + current = current[key] + + # Step 4: Add or update the key + current[new_key] = new_value + print(f"🛠 Added '{new_key}': '{new_value}' at path '{dot_path}'") + + # Step 5: Save back the JSON + with open(file_path, "w") as f: + json.dump(data, f, indent=2) + print("✅ Changes saved.") + +def set_database_credentials(file_path,dot_path): + ## Update URL + update_template(file_path, dot_path, "HTTP URL", config.DB_URL) + + ## Update username + update_template(file_path, dot_path, "username", config.DB_USER) + + ## Update username + update_template(file_path, dot_path, "password", config.DB_PASS) + + + # export TOKEN=$(curl -k -X POST https://127.0.0.1.nip.io/nifi-api/access/token\ diff --git a/modules/telegraf/templates/basic_ETL.toml b/modules/telegraf/templates/basic_ETL.toml new file mode 100644 index 0000000..02806c2 --- /dev/null +++ b/modules/telegraf/templates/basic_ETL.toml @@ -0,0 +1,26 @@ +# Telegraf Configuration +[agent] + interval = "10s" # Fetch data every 10 seconds + debug = true + +# Input Plugin: HTTP +[[inputs.http]] + urls = [] #[ "https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72¤t_weather=true" ] + method = "GET" + timeout = #"5s" + headers = { Content-Type = "application/json" } + data_format = "json" + json_query = #"current_weather" + fieldinclude = [] #["temperature", "windspeed"] + #tag_keys = ["temperature", "windspeed"] + + # Measuremens for DB + name_override = "weather_metrics" + + +# Output Plugin: InfluxDB +[[outputs.influxdb]] + urls = [] #["http://influxdb:8086"] + database = "placeholder" #"telegraf_weatherData" + username = "TODO" + password = "TODO" |