diff options
author | Rasmus Luha <rasmus.luha@ut.ee> | 2025-05-05 23:48:10 +0300 |
---|---|---|
committer | Rasmus Luha <rasmus.luha@ut.ee> | 2025-05-05 23:48:10 +0300 |
commit | 3c58b8e8f6f888473b37b91cc604b22647bec686 (patch) | |
tree | 0404142e3e564597937591faed2f5d3342810802 | |
parent | ab4d535e4caf1d5fc1ed8097407b13bf8a0d98af (diff) |
telegraf advanced tempalte working
-rw-r--r-- | config.py | 17 | ||||
-rwxr-xr-x | main.py | 5 | ||||
-rw-r--r-- | modules/telegraf/core.py | 54 | ||||
-rw-r--r-- | modules/telegraf/templates/advanced_ETL.toml | 33 | ||||
-rw-r--r-- | modules/telegraf/templates/basic_ETL.toml | 2 |
5 files changed, 69 insertions, 42 deletions
@@ -1,5 +1,5 @@ -INTERACTIVE_MODE=False -PLATFORM="Nifi" +INTERACTIVE_MODE=True +PLATFORM="Telegraf" #### Nifi #### @@ -10,13 +10,12 @@ NIFI_DEPLOY=True NIFI_USER="lab08nifiuser" NIFI_PASS="tartunifi2023" -MEASUREMENT_NAME="Ateena_ilm" +MEASUREMENT_NAME="testers_measurementersNames" ## Database -#DB_URL="http://influxdb:8086/write?db=nifi_weatherData" DB_URL="http://influxdb:8086" #DB_NAME="nifi_weatherData" -DB_NAME="nifi_weatherData" +DB_NAME="telegraf_weatherData" DB_USER="admin" DB_PASS="admin" @@ -31,7 +30,7 @@ DB_PASS="admin" ## Needed if Interactive mode turned off API_URL="https://api.open-meteo.com/v1/forecast?latitude=37.9838&longitude=23.7275¤t_weather=true" API_FIELDS={'temperature': '.current_weather.temperature', 'windspeed': '.current_weather.windspeed'} -API_USERNAME="Placeholder" -API_PASSWORD="Placehoder" -PIPELINE_SCHEDULING_PERIOD="10" -PIPELINE_NAME="Ateena" +API_USERNAME="rasmus.luha" +API_PASSWORD="rasmusPass" +PIPELINE_SCHEDULING_PERIOD="15" +PIPELINE_NAME="TestPiper" @@ -5,7 +5,8 @@ import sys AVAILABLE_PLATFORMS = { "1": ("Nifi", nifi), - "2": ("Telegraf", telegraf)} + "2": ("Telegraf", telegraf) + } def list_platforms(): @@ -16,7 +17,7 @@ def list_platforms(): def main(): - ## Kontrolli kas platform andi käsureamuutujana + ## Kontrolli kas platform anti käsureamuutujana if len(sys.argv) >= 2: platform = sys.argv[1].lower() if platform not in ("telegraf", "nifi"): diff --git a/modules/telegraf/core.py b/modules/telegraf/core.py index ded2d1d..347bdba 100644 --- a/modules/telegraf/core.py +++ b/modules/telegraf/core.py @@ -25,7 +25,7 @@ def introduction(): ########################### -def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, measurement_name): +def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, measurement_name, api_username, api_password, template_name): ## Pipeline interval telegraf_utils.modify_agent(new_pipeline_path,"interval", schedulingPeriod) @@ -36,15 +36,36 @@ def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, m ### Pluggins fields=[] json_query = "" - for key, value in data_values.items(): - fields.append(key) - parts = value.rsplit('.', 2) - json_query = '.'.join(parts[:-1])[1:] # Get the json path till last item (second last dot(.)) + if template_name == "basic_ETL.toml": - telegraf_utils.modify_input(new_pipeline_path,"json_query", json_query) - telegraf_utils.modify_input(new_pipeline_path,"fieldinclude", fields) + for key, value in data_values.items(): + fields.append(key) + parts = value.rsplit('.', 2) + json_query = '.'.join(parts[:-1])[1:] # Get the json path till last item (second last dot(.)) + + + telegraf_utils.modify_input(new_pipeline_path,"json_query", json_query) + telegraf_utils.modify_input(new_pipeline_path,"fieldinclude", fields) + + elif template_name == "advanced_ETL.toml": + + for key, value in data_values.items(): + + parts = value.split(']', 1) + json_query = parts[0].split("[")[0][1:] + fields.append(parts[1][1:]) + + + + telegraf_utils.modify_input(new_pipeline_path,"json_query", json_query) + telegraf_utils.modify_input(new_pipeline_path,"json_string_fields", fields) + + + + + ## Measurement telegraf_utils.modify_input(new_pipeline_path,"name_override", measurement_name) ## Database @@ -54,6 +75,13 @@ def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, m telegraf_utils.modify_output(new_pipeline_path, "password", config.DB_PASS) + ## If authenctication needed + if api_username and api_username.lower() != "placeholder": + print("Added username ") + telegraf_utils.modify_input(new_pipeline_path,"username", api_username) + print("Added password") + telegraf_utils.modify_input(new_pipeline_path,"password", api_password) + @@ -62,8 +90,8 @@ def build_pipeline(): data_values, api_url, api_username, api_password= common.get_data_values() print("\nKui tihti peaks andmekonveier jooksma? (sekundites)") - schedulingPeriod = str(common.ask_digit_input(86400))+ "sec" - measurement_name = str(input("Palun sisesta andmebaasi jaoks vajalik 'measurement' nimi (influxDB): ")) + schedulingPeriod = str(common.ask_digit_input(86400))+ "s" + measurement_name = str(input("Palun sisesta andmebaasi (influxDB) jaoks vajalik 'measurement' nimi: ")) new_pipeline_name=input("Mis saab andmekonveieri nimeks: ")+".toml" ## TODO @@ -81,13 +109,17 @@ def build_pipeline(): ### Select template ##TODO - template_name="basic_ETL.toml" + #template_name="basic_ETL.toml" + if (api_username and api_username.lower() != "placeholder") and (api_password and api_password.lower() != "placeholder"): + template_name="advanced_ETL.toml" + else: + template_name="basic_ETL.toml" new_pipeline_path = f"pipelines/{new_pipeline_name}" shutil.copy(f"modules/telegraf/templates/{template_name}", new_pipeline_path) - modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, measurement_name) + modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, measurement_name, api_username, api_password, template_name) diff --git a/modules/telegraf/templates/advanced_ETL.toml b/modules/telegraf/templates/advanced_ETL.toml index de6b8e1..217fb8f 100644 --- a/modules/telegraf/templates/advanced_ETL.toml +++ b/modules/telegraf/templates/advanced_ETL.toml @@ -1,39 +1,34 @@ +# Telegraf Configuration [agent] + interval = "10s" # Fetch data every 10 seconds debug = true -# interval = "3600s" - #INPUT: fetching data from delta api [[inputs.http]] - name_override = "telegraafi_deltaEnergy" - urls = ["https://delta.iot.cs.ut.ee/measurement/measurements?source=780&dateFrom=2025-02-19T00:00:00Z&dateTo=2025-02-19T23:59:59Z&pageSize=200&type=KogEN"] - + urls = [] method = "GET" + timeout = "5s" + headers = { Content-Type = "application/json" } + data_format = "json" # Authentication username = "rasmus.luha" - password = + password = "Placeholder" # Response format - data_format = "json" + #data_format = "json" # Specify JSON field - json_query = "measurements" - - # Field for energy value from "measurements" - json_string_fields = ["KogEN.T.value"] - - # Timestamp configuration - needed for the database - json_time_key = "time" - json_time_format = "2006-01-02T15:04:05Z" - + json_query = "placeholder" + json_string_fields = [] + name_override = "Placeholder" #"weather_metrics" # OUTPUT: Write data to InfluxDB [[outputs.influxdb]] urls = ["http://influxdb:8086"] - database = "telegraf_deltaEnergy" - username = "admin" - password = "admin" + database = "placeholder "#"telegraf_deltaEnergy" + username = "TODO" + password = "TODO" diff --git a/modules/telegraf/templates/basic_ETL.toml b/modules/telegraf/templates/basic_ETL.toml index 82b1728..5af3c95 100644 --- a/modules/telegraf/templates/basic_ETL.toml +++ b/modules/telegraf/templates/basic_ETL.toml @@ -15,7 +15,7 @@ #tag_keys = ["temperature", "windspeed"] # Measuremens for DB - name_override = "plcaeholder" #"weather_metrics" + name_override = "Placeholder" #"weather_metrics" # Output Plugin: InfluxDB |