summaryrefslogtreecommitdiff
path: root/modules/telegraf/core.py
diff options
context:
space:
mode:
authorRasmus Luha <rasmus.luha@ut.ee>2025-04-29 23:49:58 +0300
committerRasmus Luha <rasmus.luha@ut.ee>2025-04-29 23:49:58 +0300
commit737f498e1f402a5a41068a37ab20f34cabd4b052 (patch)
tree2b97e22aa8e3293e4c66e69c5ba7d7bcc5b0a5fd /modules/telegraf/core.py
parentb2e8ec86abe8089ed5fbd1655677889b6691397f (diff)
telegraf basicETL working
Diffstat (limited to 'modules/telegraf/core.py')
-rw-r--r--modules/telegraf/core.py29
1 files changed, 22 insertions, 7 deletions
diff --git a/modules/telegraf/core.py b/modules/telegraf/core.py
index ffef7d5..ded2d1d 100644
--- a/modules/telegraf/core.py
+++ b/modules/telegraf/core.py
@@ -25,20 +25,35 @@ def introduction():
###########################
-def modify_template(new_pipeline_path, api_url, schedulingPeriod):
+def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, measurement_name):
- ## Pipeline intervall
+ ## Pipeline interval
telegraf_utils.modify_agent(new_pipeline_path,"interval", schedulingPeriod)
## API url
telegraf_utils.modify_input(new_pipeline_path,"urls", [api_url])
+ ### Pluggins
+ fields=[]
+ json_query = ""
+ for key, value in data_values.items():
+ fields.append(key)
+ parts = value.rsplit('.', 2)
+ json_query = '.'.join(parts[:-1])[1:] # Get the json path till last item (second last dot(.))
+ telegraf_utils.modify_input(new_pipeline_path,"json_query", json_query)
+ telegraf_utils.modify_input(new_pipeline_path,"fieldinclude", fields)
+ telegraf_utils.modify_input(new_pipeline_path,"name_override", measurement_name)
+
+ ## Database
+ telegraf_utils.modify_output(new_pipeline_path, "urls", [config.DB_URL])
+ telegraf_utils.modify_output(new_pipeline_path, "database", config.DB_NAME)
+ telegraf_utils.modify_output(new_pipeline_path, "username", config.DB_USER)
+ telegraf_utils.modify_output(new_pipeline_path, "password", config.DB_PASS)
-###########################
@@ -48,17 +63,18 @@ def build_pipeline():
print("\nKui tihti peaks andmekonveier jooksma? (sekundites)")
schedulingPeriod = str(common.ask_digit_input(86400))+ "sec"
-
+ measurement_name = str(input("Palun sisesta andmebaasi jaoks vajalik 'measurement' nimi (influxDB): "))
new_pipeline_name=input("Mis saab andmekonveieri nimeks: ")+".toml"
## TODO
else:
api_url = config.API_URL
data_values = config.API_FIELDS
- schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD
+ schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD+"s"
new_pipeline_name = config.PIPELINE_NAME+".toml"
api_username = config.API_USERNAME
api_password = config.API_PASSWORD
+ measurement_name = config.MEASUREMENT_NAME
@@ -71,8 +87,7 @@ def build_pipeline():
shutil.copy(f"modules/telegraf/templates/{template_name}", new_pipeline_path)
- modify_template(new_pipeline_path, api_url, schedulingPeriod)
- #telegraf.modify_output("templates/basic_ETL.toml", "urls", "testingIfWorks")
+ modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, measurement_name)