summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRasmus Luha <rasmus.luha@ut.ee>2025-04-29 00:25:22 +0300
committerRasmus Luha <rasmus.luha@ut.ee>2025-04-29 00:25:22 +0300
commitb2e8ec86abe8089ed5fbd1655677889b6691397f (patch)
tree0982990ad217e4537e1005dbd20c63d758facc17
parent6b3c096f6efea8855772c7faffa90f71861290bd (diff)
add cml variables, and platform option to config
-rw-r--r--config.py16
-rwxr-xr-x[-rw-r--r--]main.py48
-rw-r--r--modules/telegraf/core.py10
-rw-r--r--modules/telegraf/telegraf_utils.py39
4 files changed, 93 insertions, 20 deletions
diff --git a/config.py b/config.py
index 684349c..86c43aa 100644
--- a/config.py
+++ b/config.py
@@ -1,4 +1,5 @@
-INTERACTIVE_MODE=False
+INTERACTIVE_MODE=True
+#PLATFORM="Nifiaskdjas"
#### Nifi ####
@@ -16,15 +17,18 @@ DB_URL="http://influxdb:8086/write?db=nifi_weatherData"
DB_USER="admin"
DB_PASS="admin"
+#### Telegraf ####
+
+#TBA
+
+
+
+#### Over all ####
## Needed if Interactive mode turned off
API_URL="https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72&current_weather=true"
API_FIELDS={'temperature': '.current_weather.temperature', 'windspeed': '.current_weather.windspeed'}
API_USERNAME="Placeholder"
API_PASSWORD="Placehoder"
-PIPELINE_SCHEDULING_PERIOD="5 sec"
+PIPELINE_SCHEDULING_PERIOD="10 sec"
PIPELINE_NAME="test_pipeline"
-
-
-
-#### Telegraf ####
diff --git a/main.py b/main.py
index c2071a8..20208ba 100644..100755
--- a/main.py
+++ b/main.py
@@ -1,5 +1,7 @@
from modules.nifi import core as nifi
from modules.telegraf import core as telegraf
+import config
+import sys
AVAILABLE_PLATFORMS = {
"1": ("Nifi", nifi),
@@ -7,20 +9,50 @@ AVAILABLE_PLATFORMS = {
def list_platforms():
- print("Available platforms:")
+ print("Platovormide valik:")
for key, (name, _) in AVAILABLE_PLATFORMS.items():
print(f"{key}. {name}")
def main():
- list_platforms()
- plat_choice = input("Palun vali platform (number): ").strip()
-
- platform = AVAILABLE_PLATFORMS.get(plat_choice)
- if not platform:
- print("Ebaõnnestunud valik, sulgen rakenduse...")
- return
+ ## Kontrolli kas platform andi käsureamuutujana
+ if len(sys.argv) >= 2:
+ platform = sys.argv[1].lower()
+ if platform not in ("telegraf", "nifi"):
+ print("Kasutus: main.py [nifi|telegraf]")
+ sys.exit(1)
+ if platform == "nifi":
+ platform = AVAILABLE_PLATFORMS.get("1")
+ elif platform == "telegraf":
+ platform = AVAILABLE_PLATFORMS.get("2")
+
+ else:
+ ## Vali platvorm
+ try:
+ if config.PLATFORM.lower() == "nifi":
+ platform = AVAILABLE_PLATFORMS.get("1")
+ elif config.PLATFORM.lower() == "telegraf":
+ platform = AVAILABLE_PLATFORMS.get("2")
+ else:
+ raise Exception("Ebaõnnestunud platvormivalik konfiguratsioonifailis...")
+ except Exception as e:
+ ## ära prindi errorit kui platvormi pole defineeritud
+ if isinstance(e, AttributeError):
+ pass
+ else:
+ print(f"Error occurred: {e}")
+
+ list_platforms()
+ plat_choice = input("Palun vali platform (number): ").strip()
+
+ platform = AVAILABLE_PLATFORMS.get(plat_choice)
+ if not platform:
+ print("Ebaõnnestunud valik, sulgen rakenduse...")
+ return
+
+
+ ## Genereeri andmekonveier
name, module = platform
module.introduction()
module.build_pipeline()
diff --git a/modules/telegraf/core.py b/modules/telegraf/core.py
index 3febf7b..ffef7d5 100644
--- a/modules/telegraf/core.py
+++ b/modules/telegraf/core.py
@@ -25,10 +25,12 @@ def introduction():
###########################
-def modify_template(new_pipeline_path, api_url):
- #telegraf_utils.modify_output("templates/basic_ETL.toml", "urls", "testingIfWorks")
+def modify_template(new_pipeline_path, api_url, schedulingPeriod):
- ## Api Url editing
+ ## Pipeline intervall
+ telegraf_utils.modify_agent(new_pipeline_path,"interval", schedulingPeriod)
+
+ ## API url
telegraf_utils.modify_input(new_pipeline_path,"urls", [api_url])
@@ -69,7 +71,7 @@ def build_pipeline():
shutil.copy(f"modules/telegraf/templates/{template_name}", new_pipeline_path)
- modify_template(new_pipeline_path, api_url)
+ modify_template(new_pipeline_path, api_url, schedulingPeriod)
#telegraf.modify_output("templates/basic_ETL.toml", "urls", "testingIfWorks")
diff --git a/modules/telegraf/telegraf_utils.py b/modules/telegraf/telegraf_utils.py
index d6ed376..ffaddfd 100644
--- a/modules/telegraf/telegraf_utils.py
+++ b/modules/telegraf/telegraf_utils.py
@@ -6,6 +6,43 @@ def modify_input(new_pipeline_path, key, value):
if key in pluggin:
+ #print(f"Before: {key} = {pluggin[key]}")
+ pluggin[key] = value
+ #print(f"After: {key} = {pluggin[key]}")
+
+
+ with open(new_pipeline_path, "w") as f:
+ toml.dump(data, f)
+
+
+##modify_input("templates/basic_ETL.toml", "test_pipers.toml, "urls", ["stillTesting"])
+
+
+## TODO
+def modify_agent(new_pipeline_path, key, value):
+ data = toml.load(new_pipeline_path)
+ pluggin = data["agent"]
+
+ if key in pluggin:
+
+ #print(f"Before: {key} = {pluggin[key]}")
+ pluggin[key] = value
+ #print(f"After: {key} = {pluggin[key]}")
+
+
+ with open(new_pipeline_path, "w") as f:
+ toml.dump(data, f)
+
+
+
+
+## TODO
+def modify_output(new_pipeline_path, key, value):
+ data = toml.load(new_pipeline_path)
+ pluggin = data["outputs"]["influxdb"][0]
+
+ if key in pluggin:
+
print(f"Before: {key} = {pluggin[key]}")
pluggin[key] = value
print(f"After: {key} = {pluggin[key]}")
@@ -13,5 +50,3 @@ def modify_input(new_pipeline_path, key, value):
with open(new_pipeline_path, "w") as f:
toml.dump(data, f)
-
-#modify_input("templates/basic_ETL.toml", "test_pipers.toml, "urls", ["stillTesting"])