author    | Rasmus Luha <rasmus.luha@ut.ee> | 2025-05-11 15:43:31 +0300
committer | Rasmus Luha <rasmus.luha@ut.ee> | 2025-05-11 15:43:31 +0300
commit    | 137b8a988f77e957feed698494f7143ac06a7b51 (patch)
tree      | d330750cc63ebbe38a0187002c7cd637eb3f687e /modules
parent    | 9a332ab302a2f05ce4924a66647dec03fb3b1366 (diff)
add diffJsonRootPath template to telegraf
Diffstat (limited to 'modules')
-rw-r--r-- | modules/nifi/core.py                                    |  4
-rw-r--r-- | modules/telegraf/core.py                                | 39
-rw-r--r-- | modules/telegraf/telegraf_utils.py                      | 70
-rw-r--r-- | modules/telegraf/templates/different_jsonPaths_ETL.toml | 18
4 files changed, 107 insertions, 24 deletions
diff --git a/modules/nifi/core.py b/modules/nifi/core.py
index 79ab5b9..6eddbad 100644
--- a/modules/nifi/core.py
+++ b/modules/nifi/core.py
@@ -44,7 +44,7 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_
     ## Measurements name defining
     if config.INTERACTIVE_MODE:
-        measurements_name = str(input("Palun sisesta andmebaasi jaoks vajalik 'measurement' nimi (influxDB): "))
+        measurements_name = str(input("Palun sisesta andmebaasi jaoks vajalik 'measurement' nimi (influxDB): "))+" "
     else:
         measurements_name = config.MEASUREMENT_NAME+" "
@@ -55,6 +55,7 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_
         nifi_utils.update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path)
 
     ## EvaluateJsonPath processor setup
+    ## TODO
     for key, value in data_values.items() :
         path_parts = value.split(']')
         nifi_utils.update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1])
@@ -93,6 +94,7 @@ def build_pipeline():
 
     if config.INTERACTIVE_MODE:
         data_values, api_url, api_username, api_password= common.get_data_values()
+        print(data_values)
 
         print("\nKui tihti peaks andmekonveier jooksma? (sekundites)")
         schedulingPeriod = str(common.ask_digit_input(86400))+ "sec"
diff --git a/modules/telegraf/core.py b/modules/telegraf/core.py
index 64fe893..0c9f9c7 100644
--- a/modules/telegraf/core.py
+++ b/modules/telegraf/core.py
@@ -10,6 +10,7 @@
 import config
 import toml
 import shutil
+import sys
 
@@ -26,7 +27,6 @@ def introduction():
 
 def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, measurement_name, api_username, api_password, template_name):
-
     ## Pipeline interval
     telegraf_utils.modify_agent(new_pipeline_path,"interval", schedulingPeriod)
 
@@ -39,32 +39,29 @@ def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, m
     if template_name == "basic_ETL.toml":
-
         for key, value in data_values.items():
             fields.append(key)
-
             parts = value.rsplit('.', 2)
             json_query = '.'.join(parts[:-1])[1:]  # Get the json path till last item (second last dot(.))
-
         telegraf_utils.modify_input(new_pipeline_path,"json_query", json_query)
         telegraf_utils.modify_input(new_pipeline_path,"fieldinclude", fields)
-
     elif template_name == "advanced_ETL.toml":
-
         for key, value in data_values.items():
-
             parts = value.split(']', 1)
             json_query = parts[0].split("[")[0][1:]
             fields.append(parts[1][1:])
-
-
         telegraf_utils.modify_input(new_pipeline_path,"json_query", json_query)
         telegraf_utils.modify_input(new_pipeline_path,"json_string_fields", fields)
-
-
-
+    elif template_name == "different_jsonPaths_ETL.toml":
+        for key, value in data_values.items():
+            fields.append(value[1:].replace(".","_"))
+        telegraf_utils.modify_input(new_pipeline_path, "fieldpass", fields)
+        #sys.exit(1)
+    else:
+        print("Malli valimisel tekkis probleem...")
+        sys.exit(1)
 
     ## Measurement
     telegraf_utils.modify_input(new_pipeline_path,"name_override", measurement_name)
@@ -82,8 +79,6 @@ def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, m
     telegraf_utils.modify_input(new_pipeline_path,"password", api_password)
-
-
 
 def build_pipeline():
     if config.INTERACTIVE_MODE:
         data_values, api_url, api_username, api_password= common.get_data_values()
@@ -103,14 +98,22 @@ def build_pipeline():
         api_password = config.API_PASSWORD
         measurement_name = config.MEASUREMENT_NAME
 
+
+    ### Select template
+    ## Check if multiple root json paths template should be used
+    prev=""
+    multpleJsonPaths=False
+    for el in data_values.values():
+        cur = el.split(".", 2)[1]
+        if cur != prev and prev != "":
+            multpleJsonPaths = True
+        prev = cur
-
-
-    ### Select template
-    ##TODO
-    #template_name="basic_ETL.toml"
 
     if (api_username and api_username.lower() != "placeholder") and (api_password and api_password.lower() != "placeholder"):
         template_name="advanced_ETL.toml"
+    elif multpleJsonPaths:
+        template_name="different_jsonPaths_ETL.toml"
     else:
         template_name="basic_ETL.toml"
diff --git a/modules/telegraf/telegraf_utils.py b/modules/telegraf/telegraf_utils.py
index 62bc4ea..2e72fbf 100644
--- a/modules/telegraf/telegraf_utils.py
+++ b/modules/telegraf/telegraf_utils.py
@@ -17,8 +17,6 @@ def modify_input(new_pipeline_path, key, value):
 
 ##modify_input("templates/basic_ETL.toml", "test_pipers.toml, "urls", ["stillTesting"])
 
-
-## TODO
 def modify_agent(new_pipeline_path, key, value):
     data = toml.load(new_pipeline_path)
     pluggin = data["agent"]
@@ -34,9 +32,6 @@ def modify_agent(new_pipeline_path, key, value):
         toml.dump(data, f)
 
-
-
-## TODO
 def modify_output(new_pipeline_path, key, value):
     data = toml.load(new_pipeline_path)
     pluggin = data["outputs"]["influxdb"][0]
@@ -50,3 +45,68 @@ def modify_output(new_pipeline_path, key, value):
     with open(new_pipeline_path, "w") as f:
         toml.dump(data, f)
+
+
+
+
+### different_jsonPaths_ETL template funcs ###
+
+
+#def modify_processorsConventer(new_pipeline_path, key, value):
+#    data = toml.load(new_pipeline_path)
+#    #print(data)
+#    pluggin = data["processors"]["converter"][0]["fields"]
+#    print(pluggin)
+#
+#    if key in pluggin:
+#        pluggin[key] = value
+#        with open(new_pipeline_path, "w") as f:
+#            toml.dump(data, f)
+#
+#
+#def modify_processorsRename(new_pipeline_path, key, value):
+#    data = toml.load(new_pipeline_path)
+#    pluggin = data["processors"]["rename"][0]["replace"][0]
+#    print(pluggin)
+#    pluggin = data["processors"]["rename"][0]["replace"][1]
+#    print(pluggin)
+#
+#    if key in pluggin:
+#        pluggin[key] = value
+#        with open(new_pipeline_path, "w") as f:
+#            toml.dump(data, f)
+#
+
+
+
+
+
+
+
+### ChatGPT was used in the procesess of creating this function
+## def add_new_replace_block(new_pipeline_name):
+##
+##     new_block = """  [[processors.rename.replace]]
+##     field = "placeholder"
+##     dest = "placeholder"
+##  """
+##
+##     with open(new_pipeline_name, "r") as file:
+##         lines = file.readlines()
+##
+##     # Find the last occurrence of '[[processors.rename.replace]]'
+##     insert_index = -1
+##     for i, line in enumerate(lines):
+##         if line.strip().startswith("[[processors.rename.replace]]"):
+##             insert_index = i
+##
+##     while insert_index + 1 < len(lines) and lines[insert_index + 1].startswith("  "):
+##         insert_index += 1
+##
+##     # Insert the new block
+##     lines.insert(insert_index + 1, new_block + "\n")
+##
+##     with open(new_pipeline_name, "w") as file:
+##         file.writelines(lines)
+##
diff --git a/modules/telegraf/templates/different_jsonPaths_ETL.toml b/modules/telegraf/templates/different_jsonPaths_ETL.toml
new file mode 100644
index 0000000..d7825e3
--- /dev/null
+++ b/modules/telegraf/templates/different_jsonPaths_ETL.toml
@@ -0,0 +1,18 @@
+[agent]
+interval = "10s"
+debug = true
+
+[[inputs.http]]
+  urls = [ ]
+  method = "GET"
+  timeout = "10s"
+  data_format = "json"
+  json_query = ""   # empty = use whole JSON
+  name_override = "placeholder"
+  fieldpass = []
+
+[[outputs.influxdb]]
+  urls = []
+  database = "Placeholder"
+  username = "admin"
+  password = "admin"
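For readers skimming the diff, the substance of this commit is twofold: `build_pipeline()` now detects whether the configured JSON paths share a single root key and picks `different_jsonPaths_ETL.toml` when they do not, and `modify_template()` fills that template's `fieldpass` list by flattening each path with underscores. The sketch below restates that logic in isolation; the helper names `select_template` and `fieldpass_entries` and the sample `data_values` are hypothetical illustrations, not part of the repository, where the same code runs inline.

```python
# Minimal standalone sketch of the template-selection heuristic and the
# fieldpass derivation added in this commit (modules/telegraf/core.py).
# Helper names and sample inputs are hypothetical.

def select_template(data_values, api_username, api_password):
    # A "root path" is the segment right after "$." in a value such as
    # "$.power.voltage". If the values disagree on that segment, the
    # pipeline needs the multi-root template.
    prev = ""
    multiple_json_paths = False
    for el in data_values.values():
        cur = el.split(".", 2)[1]
        if cur != prev and prev != "":
            multiple_json_paths = True
        prev = cur

    # Real API credentials win, then the multi-root template, then basic.
    if (api_username and api_username.lower() != "placeholder") and \
       (api_password and api_password.lower() != "placeholder"):
        return "advanced_ETL.toml"
    elif multiple_json_paths:
        return "different_jsonPaths_ETL.toml"
    return "basic_ETL.toml"


def fieldpass_entries(data_values):
    # Mirrors the different_jsonPaths_ETL branch of modify_template():
    # drop the leading "$" and flatten the remaining dotted path with
    # underscores before writing it into the template's fieldpass list.
    return [value[1:].replace(".", "_") for value in data_values.values()]


if __name__ == "__main__":
    sample = {"voltage": "$.power.voltage", "temperature": "$.climate.temp"}
    print(select_template(sample, "placeholder", "placeholder"))
    # -> different_jsonPaths_ETL.toml, since the values use different root keys
    print(fieldpass_entries(sample))
    # -> ['_power_voltage', '_climate_temp']
```

Whether those underscore-flattened names line up exactly with the field names Telegraf's JSON parser produces is left to the template and the input data; the sketch only reproduces the string transformation the commit performs.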