diff options
author | Rasmus Luha <rasmus.luha@ut.ee> | 2025-04-29 00:25:22 +0300 |
---|---|---|
committer | Rasmus Luha <rasmus.luha@ut.ee> | 2025-04-29 00:25:22 +0300 |
commit | b2e8ec86abe8089ed5fbd1655677889b6691397f (patch) | |
tree | 0982990ad217e4537e1005dbd20c63d758facc17 | |
parent | 6b3c096f6efea8855772c7faffa90f71861290bd (diff) |
add cml variables, and platform option to config
-rw-r--r-- | config.py | 16 | ||||
-rwxr-xr-x[-rw-r--r--] | main.py | 48 | ||||
-rw-r--r-- | modules/telegraf/core.py | 10 | ||||
-rw-r--r-- | modules/telegraf/telegraf_utils.py | 39 |
4 files changed, 93 insertions, 20 deletions
@@ -1,4 +1,5 @@ -INTERACTIVE_MODE=False +INTERACTIVE_MODE=True +#PLATFORM="Nifiaskdjas" #### Nifi #### @@ -16,15 +17,18 @@ DB_URL="http://influxdb:8086/write?db=nifi_weatherData" DB_USER="admin" DB_PASS="admin" +#### Telegraf #### + +#TBA + + + +#### Over all #### ## Needed if Interactive mode turned off API_URL="https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72¤t_weather=true" API_FIELDS={'temperature': '.current_weather.temperature', 'windspeed': '.current_weather.windspeed'} API_USERNAME="Placeholder" API_PASSWORD="Placehoder" -PIPELINE_SCHEDULING_PERIOD="5 sec" +PIPELINE_SCHEDULING_PERIOD="10 sec" PIPELINE_NAME="test_pipeline" - - - -#### Telegraf #### @@ -1,5 +1,7 @@ from modules.nifi import core as nifi from modules.telegraf import core as telegraf +import config +import sys AVAILABLE_PLATFORMS = { "1": ("Nifi", nifi), @@ -7,20 +9,50 @@ AVAILABLE_PLATFORMS = { def list_platforms(): - print("Available platforms:") + print("Platovormide valik:") for key, (name, _) in AVAILABLE_PLATFORMS.items(): print(f"{key}. {name}") def main(): - list_platforms() - plat_choice = input("Palun vali platform (number): ").strip() - - platform = AVAILABLE_PLATFORMS.get(plat_choice) - if not platform: - print("Ebaõnnestunud valik, sulgen rakenduse...") - return + ## Kontrolli kas platform andi käsureamuutujana + if len(sys.argv) >= 2: + platform = sys.argv[1].lower() + if platform not in ("telegraf", "nifi"): + print("Kasutus: main.py [nifi|telegraf]") + sys.exit(1) + if platform == "nifi": + platform = AVAILABLE_PLATFORMS.get("1") + elif platform == "telegraf": + platform = AVAILABLE_PLATFORMS.get("2") + + else: + ## Vali platvorm + try: + if config.PLATFORM.lower() == "nifi": + platform = AVAILABLE_PLATFORMS.get("1") + elif config.PLATFORM.lower() == "telegraf": + platform = AVAILABLE_PLATFORMS.get("2") + else: + raise Exception("Ebaõnnestunud platvormivalik konfiguratsioonifailis...") + except Exception as e: + ## ära prindi errorit kui platvormi pole defineeritud + if isinstance(e, AttributeError): + pass + else: + print(f"Error occurred: {e}") + + list_platforms() + plat_choice = input("Palun vali platform (number): ").strip() + + platform = AVAILABLE_PLATFORMS.get(plat_choice) + if not platform: + print("Ebaõnnestunud valik, sulgen rakenduse...") + return + + + ## Genereeri andmekonveier name, module = platform module.introduction() module.build_pipeline() diff --git a/modules/telegraf/core.py b/modules/telegraf/core.py index 3febf7b..ffef7d5 100644 --- a/modules/telegraf/core.py +++ b/modules/telegraf/core.py @@ -25,10 +25,12 @@ def introduction(): ########################### -def modify_template(new_pipeline_path, api_url): - #telegraf_utils.modify_output("templates/basic_ETL.toml", "urls", "testingIfWorks") +def modify_template(new_pipeline_path, api_url, schedulingPeriod): - ## Api Url editing + ## Pipeline intervall + telegraf_utils.modify_agent(new_pipeline_path,"interval", schedulingPeriod) + + ## API url telegraf_utils.modify_input(new_pipeline_path,"urls", [api_url]) @@ -69,7 +71,7 @@ def build_pipeline(): shutil.copy(f"modules/telegraf/templates/{template_name}", new_pipeline_path) - modify_template(new_pipeline_path, api_url) + modify_template(new_pipeline_path, api_url, schedulingPeriod) #telegraf.modify_output("templates/basic_ETL.toml", "urls", "testingIfWorks") diff --git a/modules/telegraf/telegraf_utils.py b/modules/telegraf/telegraf_utils.py index d6ed376..ffaddfd 100644 --- a/modules/telegraf/telegraf_utils.py +++ b/modules/telegraf/telegraf_utils.py @@ -6,6 +6,43 @@ def modify_input(new_pipeline_path, key, value): if key in pluggin: + #print(f"Before: {key} = {pluggin[key]}") + pluggin[key] = value + #print(f"After: {key} = {pluggin[key]}") + + + with open(new_pipeline_path, "w") as f: + toml.dump(data, f) + + +##modify_input("templates/basic_ETL.toml", "test_pipers.toml, "urls", ["stillTesting"]) + + +## TODO +def modify_agent(new_pipeline_path, key, value): + data = toml.load(new_pipeline_path) + pluggin = data["agent"] + + if key in pluggin: + + #print(f"Before: {key} = {pluggin[key]}") + pluggin[key] = value + #print(f"After: {key} = {pluggin[key]}") + + + with open(new_pipeline_path, "w") as f: + toml.dump(data, f) + + + + +## TODO +def modify_output(new_pipeline_path, key, value): + data = toml.load(new_pipeline_path) + pluggin = data["outputs"]["influxdb"][0] + + if key in pluggin: + print(f"Before: {key} = {pluggin[key]}") pluggin[key] = value print(f"After: {key} = {pluggin[key]}") @@ -13,5 +50,3 @@ def modify_input(new_pipeline_path, key, value): with open(new_pipeline_path, "w") as f: toml.dump(data, f) - -#modify_input("templates/basic_ETL.toml", "test_pipers.toml, "urls", ["stillTesting"]) |