summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRasmus Luha <rasmus.luha@ut.ee>2025-04-29 23:49:58 +0300
committerRasmus Luha <rasmus.luha@ut.ee>2025-04-29 23:49:58 +0300
commit737f498e1f402a5a41068a37ab20f34cabd4b052 (patch)
tree2b97e22aa8e3293e4c66e69c5ba7d7bcc5b0a5fd
parentb2e8ec86abe8089ed5fbd1655677889b6691397f (diff)
telegraf basicETL working
-rw-r--r--config.py13
-rw-r--r--modules/nifi/core.py4
-rw-r--r--modules/nifi/nifi_utils.py3
-rw-r--r--modules/telegraf/core.py29
-rw-r--r--modules/telegraf/telegraf_utils.py4
-rw-r--r--modules/telegraf/templates/advanced_ETL.toml39
-rw-r--r--modules/telegraf/templates/basic_ETL.toml2
7 files changed, 76 insertions, 18 deletions
diff --git a/config.py b/config.py
index 86c43aa..a2911a5 100644
--- a/config.py
+++ b/config.py
@@ -1,5 +1,5 @@
-INTERACTIVE_MODE=True
-#PLATFORM="Nifiaskdjas"
+INTERACTIVE_MODE=False
+#PLATFORM="Nifi"
#### Nifi ####
@@ -10,10 +10,13 @@ NIFI_DEPLOY=False
NIFI_USER="lab08nifiuser"
NIFI_PASS="tartunifi2023"
-NIFI_MEASUREMENT_NAME="test_measurementName"
+MEASUREMENT_NAME="test_measurementName"
## Database
-DB_URL="http://influxdb:8086/write?db=nifi_weatherData"
+#DB_URL="http://influxdb:8086/write?db=nifi_weatherData"
+DB_URL="http://influxdb:8086"
+#DB_NAME="nifi_weatherData"
+DB_NAME="telegraf_weatherData"
DB_USER="admin"
DB_PASS="admin"
@@ -30,5 +33,5 @@ API_URL="https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72&c
API_FIELDS={'temperature': '.current_weather.temperature', 'windspeed': '.current_weather.windspeed'}
API_USERNAME="Placeholder"
API_PASSWORD="Placehoder"
-PIPELINE_SCHEDULING_PERIOD="10 sec"
+PIPELINE_SCHEDULING_PERIOD="10"
PIPELINE_NAME="test_pipeline"
diff --git a/modules/nifi/core.py b/modules/nifi/core.py
index 118ee3a..bcf50c0 100644
--- a/modules/nifi/core.py
+++ b/modules/nifi/core.py
@@ -46,7 +46,7 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_
if config.INTERACTIVE_MODE:
measurements_name = str(input("Palun sisesta andmebaasi jaoks vajalik 'measurement' nimi (influxDB): "))
else:
- measurements_name = config.NIFI_MEASUREMENT_NAME+" "
+ measurements_name = config.MEASUREMENT_NAME+" "
if needs_SplitJson:
@@ -102,7 +102,7 @@ def build_pipeline():
else:
api_url = config.API_URL
data_values = config.API_FIELDS
- schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD
+ schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD+"sec"
new_pipeline_name = config.PIPELINE_NAME+".json"
api_username = config.API_USERNAME
api_password = config.API_PASSWORD
diff --git a/modules/nifi/nifi_utils.py b/modules/nifi/nifi_utils.py
index 74cba81..1871940 100644
--- a/modules/nifi/nifi_utils.py
+++ b/modules/nifi/nifi_utils.py
@@ -34,7 +34,8 @@ def update_template(file_path, dot_path, new_key, new_value):
def set_database_credentials(file_path,dot_path):
## Update URL
- update_template(file_path, dot_path, "HTTP URL", config.DB_URL)
+ db_full_url=config.DB_URL+"/write?db="+config.DB_NAME
+ update_template(file_path, dot_path, "HTTP URL", db_full_url)
## Update username
update_template(file_path, dot_path, "username", config.DB_USER)
diff --git a/modules/telegraf/core.py b/modules/telegraf/core.py
index ffef7d5..ded2d1d 100644
--- a/modules/telegraf/core.py
+++ b/modules/telegraf/core.py
@@ -25,20 +25,35 @@ def introduction():
###########################
-def modify_template(new_pipeline_path, api_url, schedulingPeriod):
+def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, measurement_name):
- ## Pipeline intervall
+ ## Pipeline interval
telegraf_utils.modify_agent(new_pipeline_path,"interval", schedulingPeriod)
## API url
telegraf_utils.modify_input(new_pipeline_path,"urls", [api_url])
+ ### Pluggins
+ fields=[]
+ json_query = ""
+ for key, value in data_values.items():
+ fields.append(key)
+ parts = value.rsplit('.', 2)
+ json_query = '.'.join(parts[:-1])[1:] # Get the json path till last item (second last dot(.))
+ telegraf_utils.modify_input(new_pipeline_path,"json_query", json_query)
+ telegraf_utils.modify_input(new_pipeline_path,"fieldinclude", fields)
+ telegraf_utils.modify_input(new_pipeline_path,"name_override", measurement_name)
+
+ ## Database
+ telegraf_utils.modify_output(new_pipeline_path, "urls", [config.DB_URL])
+ telegraf_utils.modify_output(new_pipeline_path, "database", config.DB_NAME)
+ telegraf_utils.modify_output(new_pipeline_path, "username", config.DB_USER)
+ telegraf_utils.modify_output(new_pipeline_path, "password", config.DB_PASS)
-###########################
@@ -48,17 +63,18 @@ def build_pipeline():
print("\nKui tihti peaks andmekonveier jooksma? (sekundites)")
schedulingPeriod = str(common.ask_digit_input(86400))+ "sec"
-
+ measurement_name = str(input("Palun sisesta andmebaasi jaoks vajalik 'measurement' nimi (influxDB): "))
new_pipeline_name=input("Mis saab andmekonveieri nimeks: ")+".toml"
## TODO
else:
api_url = config.API_URL
data_values = config.API_FIELDS
- schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD
+ schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD+"s"
new_pipeline_name = config.PIPELINE_NAME+".toml"
api_username = config.API_USERNAME
api_password = config.API_PASSWORD
+ measurement_name = config.MEASUREMENT_NAME
@@ -71,8 +87,7 @@ def build_pipeline():
shutil.copy(f"modules/telegraf/templates/{template_name}", new_pipeline_path)
- modify_template(new_pipeline_path, api_url, schedulingPeriod)
- #telegraf.modify_output("templates/basic_ETL.toml", "urls", "testingIfWorks")
+ modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, measurement_name)
diff --git a/modules/telegraf/telegraf_utils.py b/modules/telegraf/telegraf_utils.py
index ffaddfd..62bc4ea 100644
--- a/modules/telegraf/telegraf_utils.py
+++ b/modules/telegraf/telegraf_utils.py
@@ -43,9 +43,9 @@ def modify_output(new_pipeline_path, key, value):
if key in pluggin:
- print(f"Before: {key} = {pluggin[key]}")
+ #print(f"Before: {key} = {pluggin[key]}")
pluggin[key] = value
- print(f"After: {key} = {pluggin[key]}")
+ #print(f"After: {key} = {pluggin[key]}")
with open(new_pipeline_path, "w") as f:
diff --git a/modules/telegraf/templates/advanced_ETL.toml b/modules/telegraf/templates/advanced_ETL.toml
new file mode 100644
index 0000000..de6b8e1
--- /dev/null
+++ b/modules/telegraf/templates/advanced_ETL.toml
@@ -0,0 +1,39 @@
+[agent]
+ debug = true
+# interval = "3600s"
+
+
+#INPUT: fetching data from delta api
+[[inputs.http]]
+ name_override = "telegraafi_deltaEnergy"
+ urls = ["https://delta.iot.cs.ut.ee/measurement/measurements?source=780&dateFrom=2025-02-19T00:00:00Z&dateTo=2025-02-19T23:59:59Z&pageSize=200&type=KogEN"]
+
+ method = "GET"
+
+ # Authentication
+ username = "rasmus.luha"
+ password =
+
+ # Response format
+ data_format = "json"
+
+ # Specify JSON field
+ json_query = "measurements"
+
+ # Field for energy value from "measurements"
+ json_string_fields = ["KogEN.T.value"]
+
+ # Timestamp configuration - needed for the database
+ json_time_key = "time"
+ json_time_format = "2006-01-02T15:04:05Z"
+
+
+
+
+# OUTPUT: Write data to InfluxDB
+[[outputs.influxdb]]
+ urls = ["http://influxdb:8086"]
+ database = "telegraf_deltaEnergy"
+ username = "admin"
+ password = "admin"
+
diff --git a/modules/telegraf/templates/basic_ETL.toml b/modules/telegraf/templates/basic_ETL.toml
index 155f570..82b1728 100644
--- a/modules/telegraf/templates/basic_ETL.toml
+++ b/modules/telegraf/templates/basic_ETL.toml
@@ -7,7 +7,7 @@
[[inputs.http]]
urls = [] #[ "https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72&current_weather=true" ]
method = "GET"
- timeout = "plcaeholder" #"5s"
+ timeout = "5s"
headers = { Content-Type = "application/json" }
data_format = "json"
json_query = "plcaeholder" #"current_weather"