From 5bbcd5b6bc8340eedb7e8dcd88e156a0987d42f6 Mon Sep 17 00:00:00 2001 From: Rasmus Luha Date: Sun, 13 Apr 2025 19:40:41 +0300 Subject: added ReplaceText processor inital hardcoded support --- .gitignore | 1 + modules/nifi/core.py | 20 ++++++++++++++++---- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index b398d7a..cc1f513 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +TODO.txt pipelines/* venv/ __pycache__/ diff --git a/modules/nifi/core.py b/modules/nifi/core.py index a30211c..fccbf91 100644 --- a/modules/nifi/core.py +++ b/modules/nifi/core.py @@ -100,9 +100,6 @@ def get_data_values(): -def update_template_with_json_list(): - update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value) - ## TODO - textReplace part -> fix templates @@ -134,26 +131,41 @@ def build_pipeline(): ### Processor editing + ## Measurements setup - TODO: hardcoded. + measurements_name = "test_measurementName, " + if needs_SplitJson: ## SplitJson update split_json_path = "$"+re.sub(r'\[(.*?)\]', r'[*]', path_parts[0]) - update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path) + + ## EvaluateJsonPath processor setup for key, value in data_values.items() : path_parts = value.split(']') update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1]) + measurements_name+=f"{key}={{{key}}}" + ## Database Setup + set_database_credentials(new_pipeline_path, "flowContents.processors[4].properties") else: ## EvaluateJsonPath processor setup for key, value in data_values.items() : update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value) + measurements_name+=f"{key}={{{key}}}" ## Database Setup set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties") + ##ReplaceText update + update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", measurements_name) + + + print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub siin: {new_pipeline_path}.") + + ## Pipeline Deployment if (config.NIFI_DEPLOY): nifi_utils.upload_nifi_exported_flow( nifi_host=config.NIFI_HOST, username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False) -- cgit v1.2.3