diff options
author | Rasmus Luha <rasmus.luha@ut.ee> | 2025-04-29 23:49:58 +0300 |
---|---|---|
committer | Rasmus Luha <rasmus.luha@ut.ee> | 2025-04-29 23:49:58 +0300 |
commit | 737f498e1f402a5a41068a37ab20f34cabd4b052 (patch) | |
tree | 2b97e22aa8e3293e4c66e69c5ba7d7bcc5b0a5fd | |
parent | b2e8ec86abe8089ed5fbd1655677889b6691397f (diff) |
telegraf basicETL working
-rw-r--r-- | config.py | 13 | ||||
-rw-r--r-- | modules/nifi/core.py | 4 | ||||
-rw-r--r-- | modules/nifi/nifi_utils.py | 3 | ||||
-rw-r--r-- | modules/telegraf/core.py | 29 | ||||
-rw-r--r-- | modules/telegraf/telegraf_utils.py | 4 | ||||
-rw-r--r-- | modules/telegraf/templates/advanced_ETL.toml | 39 | ||||
-rw-r--r-- | modules/telegraf/templates/basic_ETL.toml | 2 |
7 files changed, 76 insertions, 18 deletions
@@ -1,5 +1,5 @@ -INTERACTIVE_MODE=True -#PLATFORM="Nifiaskdjas" +INTERACTIVE_MODE=False +#PLATFORM="Nifi" #### Nifi #### @@ -10,10 +10,13 @@ NIFI_DEPLOY=False NIFI_USER="lab08nifiuser" NIFI_PASS="tartunifi2023" -NIFI_MEASUREMENT_NAME="test_measurementName" +MEASUREMENT_NAME="test_measurementName" ## Database -DB_URL="http://influxdb:8086/write?db=nifi_weatherData" +#DB_URL="http://influxdb:8086/write?db=nifi_weatherData" +DB_URL="http://influxdb:8086" +#DB_NAME="nifi_weatherData" +DB_NAME="telegraf_weatherData" DB_USER="admin" DB_PASS="admin" @@ -30,5 +33,5 @@ API_URL="https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72&c API_FIELDS={'temperature': '.current_weather.temperature', 'windspeed': '.current_weather.windspeed'} API_USERNAME="Placeholder" API_PASSWORD="Placehoder" -PIPELINE_SCHEDULING_PERIOD="10 sec" +PIPELINE_SCHEDULING_PERIOD="10" PIPELINE_NAME="test_pipeline" diff --git a/modules/nifi/core.py b/modules/nifi/core.py index 118ee3a..bcf50c0 100644 --- a/modules/nifi/core.py +++ b/modules/nifi/core.py @@ -46,7 +46,7 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_ if config.INTERACTIVE_MODE: measurements_name = str(input("Palun sisesta andmebaasi jaoks vajalik 'measurement' nimi (influxDB): ")) else: - measurements_name = config.NIFI_MEASUREMENT_NAME+" " + measurements_name = config.MEASUREMENT_NAME+" " if needs_SplitJson: @@ -102,7 +102,7 @@ def build_pipeline(): else: api_url = config.API_URL data_values = config.API_FIELDS - schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD + schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD+"sec" new_pipeline_name = config.PIPELINE_NAME+".json" api_username = config.API_USERNAME api_password = config.API_PASSWORD diff --git a/modules/nifi/nifi_utils.py b/modules/nifi/nifi_utils.py index 74cba81..1871940 100644 --- a/modules/nifi/nifi_utils.py +++ b/modules/nifi/nifi_utils.py @@ -34,7 +34,8 @@ def update_template(file_path, dot_path, new_key, new_value): def set_database_credentials(file_path,dot_path): ## Update URL - update_template(file_path, dot_path, "HTTP URL", config.DB_URL) + db_full_url=config.DB_URL+"/write?db="+config.DB_NAME + update_template(file_path, dot_path, "HTTP URL", db_full_url) ## Update username update_template(file_path, dot_path, "username", config.DB_USER) diff --git a/modules/telegraf/core.py b/modules/telegraf/core.py index ffef7d5..ded2d1d 100644 --- a/modules/telegraf/core.py +++ b/modules/telegraf/core.py @@ -25,20 +25,35 @@ def introduction(): ########################### -def modify_template(new_pipeline_path, api_url, schedulingPeriod): +def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, measurement_name): - ## Pipeline intervall + ## Pipeline interval telegraf_utils.modify_agent(new_pipeline_path,"interval", schedulingPeriod) ## API url telegraf_utils.modify_input(new_pipeline_path,"urls", [api_url]) + ### Pluggins + fields=[] + json_query = "" + for key, value in data_values.items(): + fields.append(key) + parts = value.rsplit('.', 2) + json_query = '.'.join(parts[:-1])[1:] # Get the json path till last item (second last dot(.)) + telegraf_utils.modify_input(new_pipeline_path,"json_query", json_query) + telegraf_utils.modify_input(new_pipeline_path,"fieldinclude", fields) + telegraf_utils.modify_input(new_pipeline_path,"name_override", measurement_name) + + ## Database + telegraf_utils.modify_output(new_pipeline_path, "urls", [config.DB_URL]) + telegraf_utils.modify_output(new_pipeline_path, "database", config.DB_NAME) + telegraf_utils.modify_output(new_pipeline_path, "username", config.DB_USER) + telegraf_utils.modify_output(new_pipeline_path, "password", config.DB_PASS) -########################### @@ -48,17 +63,18 @@ def build_pipeline(): print("\nKui tihti peaks andmekonveier jooksma? (sekundites)") schedulingPeriod = str(common.ask_digit_input(86400))+ "sec" - + measurement_name = str(input("Palun sisesta andmebaasi jaoks vajalik 'measurement' nimi (influxDB): ")) new_pipeline_name=input("Mis saab andmekonveieri nimeks: ")+".toml" ## TODO else: api_url = config.API_URL data_values = config.API_FIELDS - schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD + schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD+"s" new_pipeline_name = config.PIPELINE_NAME+".toml" api_username = config.API_USERNAME api_password = config.API_PASSWORD + measurement_name = config.MEASUREMENT_NAME @@ -71,8 +87,7 @@ def build_pipeline(): shutil.copy(f"modules/telegraf/templates/{template_name}", new_pipeline_path) - modify_template(new_pipeline_path, api_url, schedulingPeriod) - #telegraf.modify_output("templates/basic_ETL.toml", "urls", "testingIfWorks") + modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, measurement_name) diff --git a/modules/telegraf/telegraf_utils.py b/modules/telegraf/telegraf_utils.py index ffaddfd..62bc4ea 100644 --- a/modules/telegraf/telegraf_utils.py +++ b/modules/telegraf/telegraf_utils.py @@ -43,9 +43,9 @@ def modify_output(new_pipeline_path, key, value): if key in pluggin: - print(f"Before: {key} = {pluggin[key]}") + #print(f"Before: {key} = {pluggin[key]}") pluggin[key] = value - print(f"After: {key} = {pluggin[key]}") + #print(f"After: {key} = {pluggin[key]}") with open(new_pipeline_path, "w") as f: diff --git a/modules/telegraf/templates/advanced_ETL.toml b/modules/telegraf/templates/advanced_ETL.toml new file mode 100644 index 0000000..de6b8e1 --- /dev/null +++ b/modules/telegraf/templates/advanced_ETL.toml @@ -0,0 +1,39 @@ +[agent] + debug = true +# interval = "3600s" + + +#INPUT: fetching data from delta api +[[inputs.http]] + name_override = "telegraafi_deltaEnergy" + urls = ["https://delta.iot.cs.ut.ee/measurement/measurements?source=780&dateFrom=2025-02-19T00:00:00Z&dateTo=2025-02-19T23:59:59Z&pageSize=200&type=KogEN"] + + method = "GET" + + # Authentication + username = "rasmus.luha" + password = + + # Response format + data_format = "json" + + # Specify JSON field + json_query = "measurements" + + # Field for energy value from "measurements" + json_string_fields = ["KogEN.T.value"] + + # Timestamp configuration - needed for the database + json_time_key = "time" + json_time_format = "2006-01-02T15:04:05Z" + + + + +# OUTPUT: Write data to InfluxDB +[[outputs.influxdb]] + urls = ["http://influxdb:8086"] + database = "telegraf_deltaEnergy" + username = "admin" + password = "admin" + diff --git a/modules/telegraf/templates/basic_ETL.toml b/modules/telegraf/templates/basic_ETL.toml index 155f570..82b1728 100644 --- a/modules/telegraf/templates/basic_ETL.toml +++ b/modules/telegraf/templates/basic_ETL.toml @@ -7,7 +7,7 @@ [[inputs.http]] urls = [] #[ "https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72¤t_weather=true" ] method = "GET" - timeout = "plcaeholder" #"5s" + timeout = "5s" headers = { Content-Type = "application/json" } data_format = "json" json_query = "plcaeholder" #"current_weather" |