summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRasmus Luha <rasmus.luha@ut.ee>2025-04-13 19:40:41 +0300
committerRasmus Luha <rasmus.luha@ut.ee>2025-04-13 19:40:41 +0300
commit5bbcd5b6bc8340eedb7e8dcd88e156a0987d42f6 (patch)
treef0eb531a1342b14ef7ff9984a474de2431940935
parent3104d4fdab9a80ada2c09c5188a920dec939bbe8 (diff)
added ReplaceText processor inital hardcoded support
-rw-r--r--.gitignore1
-rw-r--r--modules/nifi/core.py20
2 files changed, 17 insertions, 4 deletions
diff --git a/.gitignore b/.gitignore
index b398d7a..cc1f513 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
+TODO.txt
pipelines/*
venv/
__pycache__/
diff --git a/modules/nifi/core.py b/modules/nifi/core.py
index a30211c..fccbf91 100644
--- a/modules/nifi/core.py
+++ b/modules/nifi/core.py
@@ -100,9 +100,6 @@ def get_data_values():
-def update_template_with_json_list():
- update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value)
-
## TODO - textReplace part -> fix templates
@@ -134,26 +131,41 @@ def build_pipeline():
### Processor editing
+ ## Measurements setup - TODO: hardcoded.
+ measurements_name = "test_measurementName, "
+
if needs_SplitJson:
## SplitJson update
split_json_path = "$"+re.sub(r'\[(.*?)\]', r'[*]', path_parts[0])
-
update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path)
+
+ ## EvaluateJsonPath processor setup
for key, value in data_values.items() :
path_parts = value.split(']')
update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1])
+ measurements_name+=f"{key}={{{key}}}"
+ ## Database Setup
+ set_database_credentials(new_pipeline_path, "flowContents.processors[4].properties")
else:
## EvaluateJsonPath processor setup
for key, value in data_values.items() :
update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value)
+ measurements_name+=f"{key}={{{key}}}"
## Database Setup
set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties")
+ ##ReplaceText update
+ update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", measurements_name)
+
+
+
print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub siin: {new_pipeline_path}.")
+
+
## Pipeline Deployment
if (config.NIFI_DEPLOY):
nifi_utils.upload_nifi_exported_flow( nifi_host=config.NIFI_HOST, username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False)