summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRasmus Luha <rasmus.luha@ut.ee>2025-05-05 23:48:10 +0300
committerRasmus Luha <rasmus.luha@ut.ee>2025-05-05 23:48:10 +0300
commit3c58b8e8f6f888473b37b91cc604b22647bec686 (patch)
tree0404142e3e564597937591faed2f5d3342810802
parentab4d535e4caf1d5fc1ed8097407b13bf8a0d98af (diff)
telegraf advanced tempalte working
-rw-r--r--config.py17
-rwxr-xr-xmain.py5
-rw-r--r--modules/telegraf/core.py54
-rw-r--r--modules/telegraf/templates/advanced_ETL.toml33
-rw-r--r--modules/telegraf/templates/basic_ETL.toml2
5 files changed, 69 insertions, 42 deletions
diff --git a/config.py b/config.py
index e1a34e1..4bd495a 100644
--- a/config.py
+++ b/config.py
@@ -1,5 +1,5 @@
-INTERACTIVE_MODE=False
-PLATFORM="Nifi"
+INTERACTIVE_MODE=True
+PLATFORM="Telegraf"
#### Nifi ####
@@ -10,13 +10,12 @@ NIFI_DEPLOY=True
NIFI_USER="lab08nifiuser"
NIFI_PASS="tartunifi2023"
-MEASUREMENT_NAME="Ateena_ilm"
+MEASUREMENT_NAME="testers_measurementersNames"
## Database
-#DB_URL="http://influxdb:8086/write?db=nifi_weatherData"
DB_URL="http://influxdb:8086"
#DB_NAME="nifi_weatherData"
-DB_NAME="nifi_weatherData"
+DB_NAME="telegraf_weatherData"
DB_USER="admin"
DB_PASS="admin"
@@ -31,7 +30,7 @@ DB_PASS="admin"
## Needed if Interactive mode turned off
API_URL="https://api.open-meteo.com/v1/forecast?latitude=37.9838&longitude=23.7275&current_weather=true"
API_FIELDS={'temperature': '.current_weather.temperature', 'windspeed': '.current_weather.windspeed'}
-API_USERNAME="Placeholder"
-API_PASSWORD="Placehoder"
-PIPELINE_SCHEDULING_PERIOD="10"
-PIPELINE_NAME="Ateena"
+API_USERNAME="rasmus.luha"
+API_PASSWORD="rasmusPass"
+PIPELINE_SCHEDULING_PERIOD="15"
+PIPELINE_NAME="TestPiper"
diff --git a/main.py b/main.py
index 20208ba..e4f72f3 100755
--- a/main.py
+++ b/main.py
@@ -5,7 +5,8 @@ import sys
AVAILABLE_PLATFORMS = {
"1": ("Nifi", nifi),
- "2": ("Telegraf", telegraf)}
+ "2": ("Telegraf", telegraf)
+ }
def list_platforms():
@@ -16,7 +17,7 @@ def list_platforms():
def main():
- ## Kontrolli kas platform andi käsureamuutujana
+ ## Kontrolli kas platform anti käsureamuutujana
if len(sys.argv) >= 2:
platform = sys.argv[1].lower()
if platform not in ("telegraf", "nifi"):
diff --git a/modules/telegraf/core.py b/modules/telegraf/core.py
index ded2d1d..347bdba 100644
--- a/modules/telegraf/core.py
+++ b/modules/telegraf/core.py
@@ -25,7 +25,7 @@ def introduction():
###########################
-def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, measurement_name):
+def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, measurement_name, api_username, api_password, template_name):
## Pipeline interval
telegraf_utils.modify_agent(new_pipeline_path,"interval", schedulingPeriod)
@@ -36,15 +36,36 @@ def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, m
### Pluggins
fields=[]
json_query = ""
- for key, value in data_values.items():
- fields.append(key)
- parts = value.rsplit('.', 2)
- json_query = '.'.join(parts[:-1])[1:] # Get the json path till last item (second last dot(.))
+ if template_name == "basic_ETL.toml":
- telegraf_utils.modify_input(new_pipeline_path,"json_query", json_query)
- telegraf_utils.modify_input(new_pipeline_path,"fieldinclude", fields)
+ for key, value in data_values.items():
+ fields.append(key)
+ parts = value.rsplit('.', 2)
+ json_query = '.'.join(parts[:-1])[1:] # Get the json path till last item (second last dot(.))
+
+
+ telegraf_utils.modify_input(new_pipeline_path,"json_query", json_query)
+ telegraf_utils.modify_input(new_pipeline_path,"fieldinclude", fields)
+
+ elif template_name == "advanced_ETL.toml":
+
+ for key, value in data_values.items():
+
+ parts = value.split(']', 1)
+ json_query = parts[0].split("[")[0][1:]
+ fields.append(parts[1][1:])
+
+
+
+ telegraf_utils.modify_input(new_pipeline_path,"json_query", json_query)
+ telegraf_utils.modify_input(new_pipeline_path,"json_string_fields", fields)
+
+
+
+
+ ## Measurement
telegraf_utils.modify_input(new_pipeline_path,"name_override", measurement_name)
## Database
@@ -54,6 +75,13 @@ def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, m
telegraf_utils.modify_output(new_pipeline_path, "password", config.DB_PASS)
+ ## If authenctication needed
+ if api_username and api_username.lower() != "placeholder":
+ print("Added username ")
+ telegraf_utils.modify_input(new_pipeline_path,"username", api_username)
+ print("Added password")
+ telegraf_utils.modify_input(new_pipeline_path,"password", api_password)
+
@@ -62,8 +90,8 @@ def build_pipeline():
data_values, api_url, api_username, api_password= common.get_data_values()
print("\nKui tihti peaks andmekonveier jooksma? (sekundites)")
- schedulingPeriod = str(common.ask_digit_input(86400))+ "sec"
- measurement_name = str(input("Palun sisesta andmebaasi jaoks vajalik 'measurement' nimi (influxDB): "))
+ schedulingPeriod = str(common.ask_digit_input(86400))+ "s"
+ measurement_name = str(input("Palun sisesta andmebaasi (influxDB) jaoks vajalik 'measurement' nimi: "))
new_pipeline_name=input("Mis saab andmekonveieri nimeks: ")+".toml"
## TODO
@@ -81,13 +109,17 @@ def build_pipeline():
### Select template
##TODO
- template_name="basic_ETL.toml"
+ #template_name="basic_ETL.toml"
+ if (api_username and api_username.lower() != "placeholder") and (api_password and api_password.lower() != "placeholder"):
+ template_name="advanced_ETL.toml"
+ else:
+ template_name="basic_ETL.toml"
new_pipeline_path = f"pipelines/{new_pipeline_name}"
shutil.copy(f"modules/telegraf/templates/{template_name}", new_pipeline_path)
- modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, measurement_name)
+ modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, measurement_name, api_username, api_password, template_name)
diff --git a/modules/telegraf/templates/advanced_ETL.toml b/modules/telegraf/templates/advanced_ETL.toml
index de6b8e1..217fb8f 100644
--- a/modules/telegraf/templates/advanced_ETL.toml
+++ b/modules/telegraf/templates/advanced_ETL.toml
@@ -1,39 +1,34 @@
+# Telegraf Configuration
[agent]
+ interval = "10s" # Fetch data every 10 seconds
debug = true
-# interval = "3600s"
-
#INPUT: fetching data from delta api
[[inputs.http]]
- name_override = "telegraafi_deltaEnergy"
- urls = ["https://delta.iot.cs.ut.ee/measurement/measurements?source=780&dateFrom=2025-02-19T00:00:00Z&dateTo=2025-02-19T23:59:59Z&pageSize=200&type=KogEN"]
-
+ urls = []
method = "GET"
+ timeout = "5s"
+ headers = { Content-Type = "application/json" }
+ data_format = "json"
# Authentication
username = "rasmus.luha"
- password =
+ password = "Placeholder"
# Response format
- data_format = "json"
+ #data_format = "json"
# Specify JSON field
- json_query = "measurements"
-
- # Field for energy value from "measurements"
- json_string_fields = ["KogEN.T.value"]
-
- # Timestamp configuration - needed for the database
- json_time_key = "time"
- json_time_format = "2006-01-02T15:04:05Z"
-
+ json_query = "placeholder"
+ json_string_fields = []
+ name_override = "Placeholder" #"weather_metrics"
# OUTPUT: Write data to InfluxDB
[[outputs.influxdb]]
urls = ["http://influxdb:8086"]
- database = "telegraf_deltaEnergy"
- username = "admin"
- password = "admin"
+ database = "placeholder "#"telegraf_deltaEnergy"
+ username = "TODO"
+ password = "TODO"
diff --git a/modules/telegraf/templates/basic_ETL.toml b/modules/telegraf/templates/basic_ETL.toml
index 82b1728..5af3c95 100644
--- a/modules/telegraf/templates/basic_ETL.toml
+++ b/modules/telegraf/templates/basic_ETL.toml
@@ -15,7 +15,7 @@
#tag_keys = ["temperature", "windspeed"]
# Measuremens for DB
- name_override = "plcaeholder" #"weather_metrics"
+ name_override = "Placeholder" #"weather_metrics"
# Output Plugin: InfluxDB