author     Rasmus Luha <rasmus.luha@ut.ee>    2025-05-11 15:43:31 +0300
committer  Rasmus Luha <rasmus.luha@ut.ee>    2025-05-11 15:43:31 +0300
commit     137b8a988f77e957feed698494f7143ac06a7b51 (patch)
tree       d330750cc63ebbe38a0187002c7cd637eb3f687e /modules
parent     9a332ab302a2f05ce4924a66647dec03fb3b1366 (diff)
add diffJsonRootPath template to telegraf
Diffstat (limited to 'modules')
-rw-r--r--  modules/nifi/core.py                                      |  4
-rw-r--r--  modules/telegraf/core.py                                  | 39
-rw-r--r--  modules/telegraf/telegraf_utils.py                        | 70
-rw-r--r--  modules/telegraf/templates/different_jsonPaths_ETL.toml   | 18
4 files changed, 107 insertions, 24 deletions
diff --git a/modules/nifi/core.py b/modules/nifi/core.py
index 79ab5b9..6eddbad 100644
--- a/modules/nifi/core.py
+++ b/modules/nifi/core.py
@@ -44,7 +44,7 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_
## Measurements name defining
if config.INTERACTIVE_MODE:
- measurements_name = str(input("Palun sisesta andmebaasi jaoks vajalik 'measurement' nimi (influxDB): "))
+ measurements_name = str(input("Palun sisesta andmebaasi jaoks vajalik 'measurement' nimi (influxDB): "))+" "
else:
measurements_name = config.MEASUREMENT_NAME+" "
@@ -55,6 +55,7 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_
nifi_utils.update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path)
## EvaluateJsonPath processor setup
+ ## TODO
for key, value in data_values.items() :
path_parts = value.split(']')
nifi_utils.update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1])
@@ -93,6 +94,7 @@ def build_pipeline():
if config.INTERACTIVE_MODE:
data_values, api_url, api_username, api_password= common.get_data_values()
+ print(data_values)
print("\nKui tihti peaks andmekonveier jooksma? (sekundites)")
schedulingPeriod = str(common.ask_digit_input(86400))+ "sec"
diff --git a/modules/telegraf/core.py b/modules/telegraf/core.py
index 64fe893..0c9f9c7 100644
--- a/modules/telegraf/core.py
+++ b/modules/telegraf/core.py
@@ -10,6 +10,7 @@ import config
import toml
import shutil
+import sys
@@ -26,7 +27,6 @@ def introduction():
def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, measurement_name, api_username, api_password, template_name):
-
## Pipeline interval
telegraf_utils.modify_agent(new_pipeline_path,"interval", schedulingPeriod)
@@ -39,32 +39,29 @@ def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, m
if template_name == "basic_ETL.toml":
-
for key, value in data_values.items():
fields.append(key)
-
parts = value.rsplit('.', 2)
json_query = '.'.join(parts[:-1])[1:] # Get the json path till last item (second last dot(.))
-
telegraf_utils.modify_input(new_pipeline_path,"json_query", json_query)
telegraf_utils.modify_input(new_pipeline_path,"fieldinclude", fields)
-
elif template_name == "advanced_ETL.toml":
-
for key, value in data_values.items():
-
parts = value.split(']', 1)
json_query = parts[0].split("[")[0][1:]
fields.append(parts[1][1:])
-
-
telegraf_utils.modify_input(new_pipeline_path,"json_query", json_query)
telegraf_utils.modify_input(new_pipeline_path,"json_string_fields", fields)
-
-
-
+ elif template_name == "different_jsonPaths_ETL.toml":
+ for key, value in data_values.items():
+ fields.append(value[1:].replace(".","_"))
+ telegraf_utils.modify_input(new_pipeline_path, "fieldpass", fields)
+ #sys.exit(1)
+ else:
+ print("Malli valimisel tekkis probleem...")
+ sys.exit(1)
## Measurement
telegraf_utils.modify_input(new_pipeline_path,"name_override", measurement_name)
@@ -82,8 +79,6 @@ def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, m
telegraf_utils.modify_input(new_pipeline_path,"password", api_password)
-
-
def build_pipeline():
if config.INTERACTIVE_MODE:
data_values, api_url, api_username, api_password= common.get_data_values()
@@ -103,14 +98,22 @@ def build_pipeline():
api_password = config.API_PASSWORD
measurement_name = config.MEASUREMENT_NAME
+
+ ### Select template
+ ## Check if multiple root json paths template should be used
+ prev=""
+ multpleJsonPaths=False
+ for el in data_values.values():
+ cur = el.split(".", 2)[1]
+ if cur != prev and prev != "":
+ multpleJsonPaths = True
+ prev = cur
-
- ### Select template
- ##TODO
- #template_name="basic_ETL.toml"
if (api_username and api_username.lower() != "placeholder") and (api_password and api_password.lower() != "placeholder"):
template_name="advanced_ETL.toml"
+ elif multpleJsonPaths:
+ template_name="different_jsonPaths_ETL.toml"
else:
template_name="basic_ETL.toml"
diff --git a/modules/telegraf/telegraf_utils.py b/modules/telegraf/telegraf_utils.py
index 62bc4ea..2e72fbf 100644
--- a/modules/telegraf/telegraf_utils.py
+++ b/modules/telegraf/telegraf_utils.py
@@ -17,8 +17,6 @@ def modify_input(new_pipeline_path, key, value):
##modify_input("templates/basic_ETL.toml", "test_pipers.toml, "urls", ["stillTesting"])
-
-## TODO
def modify_agent(new_pipeline_path, key, value):
data = toml.load(new_pipeline_path)
pluggin = data["agent"]
@@ -34,9 +32,6 @@ def modify_agent(new_pipeline_path, key, value):
toml.dump(data, f)
-
-
-## TODO
def modify_output(new_pipeline_path, key, value):
data = toml.load(new_pipeline_path)
pluggin = data["outputs"]["influxdb"][0]
@@ -50,3 +45,68 @@ def modify_output(new_pipeline_path, key, value):
with open(new_pipeline_path, "w") as f:
toml.dump(data, f)
+
+
+
+
+### different_jsonPaths_ETL template funcs ###
+
+
+#def modify_processorsConventer(new_pipeline_path, key, value):
+# data = toml.load(new_pipeline_path)
+# #print(data)
+# pluggin = data["processors"]["converter"][0]["fields"]
+# print(pluggin)
+#
+# if key in pluggin:
+# pluggin[key] = value
+# with open(new_pipeline_path, "w") as f:
+# toml.dump(data, f)
+#
+#
+#def modify_processorsRename(new_pipeline_path, key, value):
+# data = toml.load(new_pipeline_path)
+# pluggin = data["processors"]["rename"][0]["replace"][0]
+# print(pluggin)
+# pluggin = data["processors"]["rename"][0]["replace"][1]
+# print(pluggin)
+#
+# if key in pluggin:
+# pluggin[key] = value
+# with open(new_pipeline_path, "w") as f:
+# toml.dump(data, f)
+#
+
+
+
+
+
+
+
+
+### ChatGPT was used in the procesess of creating this function
+## def add_new_replace_block(new_pipeline_name):
+##
+## new_block = """ [[processors.rename.replace]]
+## field = "placeholder"
+## dest = "placeholder"
+## """
+##
+## with open(new_pipeline_name, "r") as file:
+## lines = file.readlines()
+##
+## # Find the last occurrence of '[[processors.rename.replace]]'
+## insert_index = -1
+## for i, line in enumerate(lines):
+## if line.strip().startswith("[[processors.rename.replace]]"):
+## insert_index = i
+##
+## while insert_index + 1 < len(lines) and lines[insert_index + 1].startswith(" "):
+## insert_index += 1
+##
+## # Insert the new block
+## lines.insert(insert_index + 1, new_block + "\n")
+##
+## with open(new_pipeline_name, "w") as file:
+## file.writelines(lines)
+##
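
The helpers in this file (modify_input, modify_agent, modify_output) all follow the same load-mutate-dump pattern built on the toml package; a minimal sketch of that pattern under that assumption, with an illustrative file path, function name, and key:

import toml

def set_http_input_key(pipeline_path, key, value):
    # Parse the pipeline config, update one key of the first
    # [[inputs.http]] table, and write the whole file back out.
    data = toml.load(pipeline_path)
    plugin = data["inputs"]["http"][0]
    if key in plugin:  # only overwrite keys the template already defines
        plugin[key] = value
    with open(pipeline_path, "w") as f:
        toml.dump(data, f)

# e.g. set_http_input_key("pipelines/new_pipeline.toml", "fieldpass", ["weather_temp"])
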
diff --git a/modules/telegraf/templates/different_jsonPaths_ETL.toml b/modules/telegraf/templates/different_jsonPaths_ETL.toml
new file mode 100644
index 0000000..d7825e3
--- /dev/null
+++ b/modules/telegraf/templates/different_jsonPaths_ETL.toml
@@ -0,0 +1,18 @@
+[agent]
+interval = "10s"
+debug = true
+
+[[inputs.http]]
+ urls = [ ]
+ method = "GET"
+ timeout = "10s"
+ data_format = "json"
+ json_query = "" # empty = use whole JSON
+ name_override = "placeholder"
+ fieldpass = []
+
+[[outputs.influxdb]]
+ urls = []
+ database = "Placeholder"
+ username = "admin"
+ password = "admin"
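
As a usage note, build_pipeline is expected to fill this template's placeholders through the telegraf_utils helpers, the same way the other templates are handled. A hedged sketch under that assumption (the path, interval, field names, and import form are illustrative; the real field list comes from the data_values loop in modules/telegraf/core.py):

import telegraf_utils  # however core.py resolves this module

new_pipeline_path = "pipelines/my_pipeline.toml"  # hypothetical copy of this template
telegraf_utils.modify_agent(new_pipeline_path, "interval", "60s")
telegraf_utils.modify_input(new_pipeline_path, "urls", ["https://example.com/api"])
telegraf_utils.modify_input(new_pipeline_path, "name_override", "weather")
telegraf_utils.modify_input(new_pipeline_path, "fieldpass", ["weather_temp", "wind_speed"])
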