summary refs log tree commit diff
diff options
context:
space:
mode:
author Rasmus Luha <rasmus.luha@ut.ee> 2025-04-25 21:43:54 +0300
committer Rasmus Luha <rasmus.luha@ut.ee> 2025-04-25 21:43:54 +0300
commit 0a16e0e3e586456cf2e86dbdad4b66787b036a5d (patch)
tree 7e82621e2d06d84801598b195fe3150140431d94
parent 9ba62c9bf9f19053f6eb664db70eb342812efc38 (diff)
some restructuring, start telegraf module
-rw-r--r-- common/core.py 35
-rw-r--r-- config.py 19
-rw-r--r-- modules/nifi/core.py 121
-rw-r--r-- modules/nifi/nifi_utils.py 45
-rw-r--r-- modules/telegraf/templates/basic_ETL.toml 26
5 files changed, 128 insertions, 118 deletions
diff --git a/common/core.py b/common/core.py
index 6160ee3..7fd8cff 100644
--- a/common/core.py
+++ b/common/core.py
@@ -141,3 +141,38 @@ def inspect_json_top_level_test(json_data, has_list=False):
print(f"\nValitud väärtus: '{path}'")
return {last_key: path}
+
+def get_data_values():
+ # Interactively collect the API URL, optional credentials and the JSON values the user wants to extract.
+ chosen_json_values = {}
+ # Returns the tuple (chosen_json_values, api_url, username, passwd) once the user chooses to move on.
+ ## Ask for the API URL (and credentials, if needed) and then for the JSON values
+ while True:
+ api_url = input("Palun sisesta andmete API URL: ").strip()
+ username = "placeholder"
+ passwd = "placeholder"
+ # "placeholder" stays in place when no auth is requested; modules/nifi/core.py checks api_username != "placeholder" before adding credentials.
+ needs_auth = ask_binary_input(prompt="Kas API vajab ka kasutajaga autentimist?(jah/ei): ").strip().lower() == 'jah'
+ if needs_auth:
+ username=input("Sisesta kasutajanimi: ")
+ passwd=input("Sisesta parool: ")
+
+ json_data, api_url_correct = is_app_url_correct(api_url,needs_auth,username,passwd)
+ # json_data is what the user browses below; api_url_correct decides between value selection and the change-URL/quit prompt.
+
+ ## TODO: option to remove already chosen items
+ if api_url_correct:
+ while True:
+ # Each pass merges one more selection into chosen_json_values (dict.update, so a repeated key is overwritten).
+ chosen_json_values.update(inspect_json_top_level_test(json_data))
+ ## Testing: show the keys chosen so far
+ print("Oled hetkel valinud järgmised väärtused JSON lõppväärtused: ", ", ".join(chosen_json_values))
+ choose_another = ask_binary_input(prompt="\nKas soovid (v)alida veel mõne väärtuse või liikuda (e)dasi?(v/e): ",valikud=["v","e"]).strip().lower()
+ # 'e' ends the selection and returns; any other answer loops to pick another value.
+ if choose_another == 'e':
+ return chosen_json_values, api_url, username, passwd
+ else: # NOTE(review): indentation is flattened in this view; presumably this is the else of `if api_url_correct` (URL check failed) - confirm
+ choice = ask_binary_input(prompt="\nKas soovid URL-i (m)uuta URL-i või (v)äljuda?(m/v): ",valikud=["m","v"]).strip().lower()
+ if choice == 'v':
+ print("Väljun programmist.")
+ sys.exit() # NOTE(review): requires `import sys` in common/core.py, which is not visible in this hunk - confirm
diff --git a/config.py b/config.py
index f5d4554..3b1f304 100644
--- a/config.py
+++ b/config.py
@@ -1,15 +1,15 @@
-INTERACTIVE_MODE=True
+INTERACTIVE_MODE=False
+
+
+#### Nifi ####
-## Nifi
NIFI_HOST="https://127.0.0.1.nip.io"
NIFI_DEPLOY=False
NIFI_USER="lab08nifiuser"
NIFI_PASS="tartunifi2023"
-#NIFI_MEASUREMENT_NAME="test_measurementName"
-NIFI_MEASUREMENT_NAME=""
-
+NIFI_MEASUREMENT_NAME="test_measurementName"
## Database
DB_URL="http://influxdb:8086/write?db=nifi_weatherData"
@@ -17,11 +17,6 @@ DB_USER="admin"
DB_PASS="admin"
-
-###############################
-
-
-
## Needed if Interactive mode turned off
API_URL="https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72&current_weather=true"
API_FIELDS={'temperature': '.current_weather.temperature', 'windspeed': '.current_weather.windspeed'}
@@ -29,3 +24,7 @@ API_USERNAME="Placeholder"
API_PASSWORD="Placehoder"
PIPELINE_SCHEDULING_PERIOD="5 sec"
PIPELINE_NAME="test_pipeline.json"
+
+
+
+#### Telegraf ####
diff --git a/modules/nifi/core.py b/modules/nifi/core.py
index 4ddc268..46455c7 100644
--- a/modules/nifi/core.py
+++ b/modules/nifi/core.py
@@ -1,17 +1,13 @@
## TODO - check syntax
from common import core as common
-import config as config
+import config
from modules.nifi import nifi_utils
-
from pyfiglet import figlet_format
from rich.console import Console
-import sys
-import json
+
import shutil
-import requests
import re
-import base64
def introduction():
@@ -21,89 +17,7 @@ def introduction():
print("Valisid Nifi Platformi!\n")
-def update_template(file_path, dot_path, new_key, new_value):
-
- # Step 2: Load the copied JSON
- with open(file_path, "r") as f:
- data = json.load(f)
-
- # Step 3: Walk the path (e.g. 'flowContents.processors[0].properties')
- keys = dot_path.split(".")
- current = data
-
- for key in keys:
- if key.endswith("]"): # Handle list index like processors[0]
- list_key = key[:key.index("[")]
- index = int(key[key.index("[") + 1 : key.index("]")])
- current = current[list_key][index]
- else:
- current = current[key]
-
- # Step 4: Add or update the key
- current[new_key] = new_value
- print(f"🛠 Added '{new_key}': '{new_value}' at path '{dot_path}'")
-
- # Step 5: Save back the JSON
- with open(file_path, "w") as f:
- json.dump(data, f, indent=2)
- print("✅ Changes saved.")
-
-
-
-
-def set_database_credentials(file_path,dot_path):
- ## Update URL
- update_template(file_path, dot_path, "HTTP URL", config.DB_URL)
-
- ## Update username
- update_template(file_path, dot_path, "username", config.DB_USER)
-
- ## Update username
- update_template(file_path, dot_path, "password", config.DB_PASS)
-
-
-
-
-def get_data_values():
-
- chosen_json_values = {}
-
- ##Getting API url and json values
- while True:
- api_url = input("Palun sisesta andmete API URL: ").strip()
- username = "placeholder"
- passwd = "placeholder"
-
- needs_auth = common.ask_binary_input(prompt="Kas API vajab ka kasutajaga autentimist?(jah/ei): ").strip().lower() == 'jah'
- if needs_auth:
- username=input("Sisesta kasutajanimi: ")
- passwd=input("Sisesta parool: ")
-
- json_data, api_url_correct = common.is_app_url_correct(api_url,needs_auth,username,passwd)
-
-
- ## TODO itemite eemaldamise v6malus
- if api_url_correct:
- while True:
-
- chosen_json_values.update(common.inspect_json_top_level_test(json_data))
- ## Testing
- print("Oled hetkel valinud järgmised väärtused JSON lõppväärtused: ", ", ".join(chosen_json_values))
- choose_another = common.ask_binary_input(prompt="\nKas soovid (v)alida veel mõne väärtuse või liikuda (e)dasi?(v/e): ",valikud=["v","e"]).strip().lower()
-
- if choose_another == 'e':
- return chosen_json_values, api_url, username, passwd
- else:
- choice = common.ask_binary_input(prompt="\nKas soovid URL-i (m)uuta URL-i või (v)äljuda?(m/v): ",valikud=["m","v"]).strip().lower()
- if choice == 'v':
- print("Väljun programmist.")
- sys.exit()
-
-
-
-
-## TODO - textReplace part -> fix templates
def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url, api_username, api_password):
############### Choosing and modfyfing Template ##############
@@ -116,10 +30,7 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_
#path_parts = el.split(']')
path_parts = re.split(r'(?<=\])', el)
-
-
### Select template
- ## TODO - currently has only 2 templates...
if needs_SplitJson:
template_name="splitJsonETL.json"
else:
@@ -129,7 +40,6 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_
shutil.copy(f"modules/nifi/templates/{template_name}", new_pipeline_path)
-
### Processor editing
## Measurements name defining
@@ -142,48 +52,47 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_
if needs_SplitJson:
## SplitJson update
split_json_path = "$"+re.sub(r'\[(.*?)\]', r'[*]', path_parts[0])
- update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path)
+ nifi_utils.update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path)
## EvaluateJsonPath processor setup
for key, value in data_values.items() :
path_parts = value.split(']')
- update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1])
+ nifi_utils.update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1])
measurements_name+=f"{key}=${{{key}}},"
## Database Setup
- set_database_credentials(new_pipeline_path, "flowContents.processors[4].properties")
+ nifi_utils.set_database_credentials(new_pipeline_path, "flowContents.processors[4].properties")
else:
## EvaluateJsonPath processor setup
for key, value in data_values.items() :
- update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value)
+ nifi_utils.update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value)
measurements_name+=f"{key}=${{{key}}},"
## Database Setup
- set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties")
+ nifi_utils.set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties")
## ReplaceText processor update - making it compatible for timeseries database (influxDB)
- update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", measurements_name[:-1]) # Delete last coma
+ nifi_utils.update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", measurements_name[:-1]) # Delete last coma
## Update API call URL
- update_template(new_pipeline_path, "flowContents.processors[1].properties", "HTTP URL", api_url)
+ nifi_utils.update_template(new_pipeline_path, "flowContents.processors[1].properties", "HTTP URL", api_url)
## Update scheduling Periond on API Calls
- update_template(new_pipeline_path, "flowContents.processors[1]", "schedulingPeriod", schedulingPeriod)
+ nifi_utils.update_template(new_pipeline_path, "flowContents.processors[1]", "schedulingPeriod", schedulingPeriod)
## Add api credentials
if api_username != "placeholder":
- update_template(new_pipeline_path, "flowContents.processors[1].properties", "Request Username", api_username)
- update_template(new_pipeline_path, "flowContents.processors[1].properties", "Request Password", api_password)
-
+ nifi_utils.update_template(new_pipeline_path, "flowContents.processors[1].properties", "Request Username", api_username)
+ nifi_utils.update_template(new_pipeline_path, "flowContents.processors[1].properties", "Request Password", api_password)
+###
-###############################################
def build_pipeline():
if config.INTERACTIVE_MODE:
- data_values, api_url, api_username, api_password= get_data_values()
+ data_values, api_url, api_username, api_password= common.get_data_values()
print("\nKui tihti peaks andmekonveier jooksma? (sekundites)")
schedulingPeriod = str(common.ask_digit_input(86400))+ "sec"
@@ -202,7 +111,6 @@ def build_pipeline():
print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier nimega '{new_pipeline_name}' asub kaustas 'pipelines'.")
-
## Pipeline Deployment
if (config.NIFI_DEPLOY):
token = nifi_utils.get_access_token()
@@ -212,4 +120,3 @@ def build_pipeline():
if choice == "jah":
token = nifi_utils.get_access_token()
nifi_utils.upload_nifi_pipeline(token, "pipelines/test_pipeline.json", "test_pipeline", username=config.NIFI_USER, password=config.NIFI_PASS, nifi_url=config.NIFI_HOST, position_x=0, position_y=0)
-
diff --git a/modules/nifi/nifi_utils.py b/modules/nifi/nifi_utils.py
index 88953d6..74cba81 100644
--- a/modules/nifi/nifi_utils.py
+++ b/modules/nifi/nifi_utils.py
@@ -1,6 +1,49 @@
-import requests
import config
+import requests
+import sys
+import json
+
+
+def update_template(file_path, dot_path, new_key, new_value):
+ # Set new_key = new_value on the object found at dot_path inside the JSON file file_path, rewriting the file in place.
+ # Step 1: Load the JSON file
+ with open(file_path, "r") as f:
+ data = json.load(f)
+
+ # Step 2: Walk the path (e.g. 'flowContents.processors[0].properties')
+ keys = dot_path.split(".")
+ current = data
+ # Each segment is either a plain key or name[i]; only one index per segment is parsed (first "[" to first "]").
+ for key in keys:
+ if key.endswith("]"): # Handle list index like processors[0]
+ list_key = key[:key.index("[")]
+ index = int(key[key.index("[") + 1 : key.index("]")])
+ current = current[list_key][index]
+ else:
+ current = current[key]
+
+ # Step 3: Add or update the key on the object the path resolved to
+ current[new_key] = new_value
+ print(f"🛠 Added '{new_key}': '{new_value}' at path '{dot_path}'")
+
+ # Step 4: Save the whole document back to the same file
+ with open(file_path, "w") as f:
+ json.dump(data, f, indent=2)
+ print("✅ Changes saved.")
+
+def set_database_credentials(file_path,dot_path):
+ ## Copy config.DB_URL / DB_USER / DB_PASS into the object at dot_path of the JSON file, one update_template call each. First: URL
+ update_template(file_path, dot_path, "HTTP URL", config.DB_URL)
+
+ ## Update username
+ update_template(file_path, dot_path, "username", config.DB_USER)
+
+ ## Update password
+ update_template(file_path, dot_path, "password", config.DB_PASS)
+
+
+
# export TOKEN=$(curl -k -X POST https://127.0.0.1.nip.io/nifi-api/access/token\
diff --git a/modules/telegraf/templates/basic_ETL.toml b/modules/telegraf/templates/basic_ETL.toml
new file mode 100644
index 0000000..02806c2
--- /dev/null
+++ b/modules/telegraf/templates/basic_ETL.toml
@@ -0,0 +1,26 @@
+# Telegraf configuration template; trailing comments show example values for the settings left empty below
+[agent]
+ interval = "10s" # Fetch data every 10 seconds
+ debug = true
+# NOTE(review): in [[inputs.http]] below, `timeout =` and `json_query =` have no value, so this file is not valid TOML until they are filled in - confirm how the template is consumed
+# Input Plugin: HTTP
+[[inputs.http]]
+ urls = [] #[ "https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72&current_weather=true" ]
+ method = "GET"
+ timeout = #"5s"
+ headers = { Content-Type = "application/json" }
+ data_format = "json"
+ json_query = #"current_weather"
+ fieldinclude = [] #["temperature", "windspeed"]
+ #tag_keys = ["temperature", "windspeed"]
+
+ # Measurement name for the DB
+ name_override = "weather_metrics"
+
+
+# Output Plugin: InfluxDB
+[[outputs.influxdb]]
+ urls = [] #["http://influxdb:8086"]
+ database = "placeholder" #"telegraf_weatherData"
+ username = "TODO"
+ password = "TODO"