author | Rasmus Luha <rasmus.luha@ut.ee> | 2025-04-13 17:00:46 +0300 |
---|---|---|
committer | Rasmus Luha <rasmus.luha@ut.ee> | 2025-04-13 17:00:46 +0300 |
commit | 3104d4fdab9a80ada2c09c5188a920dec939bbe8 (patch) | |
tree | d03225ee790dd1e9bb201cacc94588564041fd1b /modules/nifi/core.py | |
parent | f42212afe0cc0de4b321df2cd70d830a3ea06231 (diff) |
SplitJson initial support added
Diffstat (limited to 'modules/nifi/core.py')
-rw-r--r-- | modules/nifi/core.py | 42 |
1 file changed, 26 insertions, 16 deletions
diff --git a/modules/nifi/core.py b/modules/nifi/core.py
index 38c22dc..a30211c 100644
--- a/modules/nifi/core.py
+++ b/modules/nifi/core.py
@@ -10,6 +10,7 @@ import sys
 import json
 import shutil
 import requests
+import re
 
 
 def introduction():
@@ -104,47 +105,56 @@ def update_template_with_json_list():
 ## TODO - textReplace part -> fix templates
 
+
 def build_pipeline():
     data_values = get_data_values()
-    print(data_values)
 
-    ## Check if splitJson template needed
+    ### Check if splitJson template needed
     needs_SplitJson = False
+    path_parts = []
     for el in data_values.values():
         if '[' in el:
             needs_SplitJson = True
+            #path_parts = el.split(']')
+            path_parts = re.split(r'(?<=\])', el)
 
-    print(needs_SplitJson)
 
-    ## Select template
+    ### Select template
     ## TODO - unhardcoded template usage
+    new_pipeline_name="test_pipeline.json"
     if needs_SplitJson:
         template_name="splitJsonETL.json"
     else:
         template_name="basic_ETL.json"
 
-    new_pipeline_path = f"pipelines/{template_name}"
+    new_pipeline_path = f"pipelines/{new_pipeline_name}"
     shutil.copy(f"modules/nifi/templates/{template_name}", new_pipeline_path)
 
-    ## Processor editing
-    for key, value in data_values.items() :
-        update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value)
+    ### Processor editing
+    if needs_SplitJson:
+        ## SplitJson update
+        split_json_path = "$"+re.sub(r'\[(.*?)\]', r'[*]', path_parts[0])
+        update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path)
+        for key, value in data_values.items() :
+            path_parts = value.split(']')
+            update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1])
+    else:
+        ## EvaluateJsonPath processor setup
+        for key, value in data_values.items() :
+            update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value)
 
-    ## Database Setup
-    set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties")
+        ## Database Setup
+        set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties")
 
+    print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub siin: {new_pipeline_path}.")
 
     ## Pipeline Deployment
     if (config.NIFI_DEPLOY):
-        nifi_utils.upload_nifi_exported_flow( nifi_host="https://127.0.0.1.nip.io", username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False)
-    else:
-        print("TODO - ask if user wants deployment")
-
-
-    print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub siin: {new_pipeline_path}.")
+        nifi_utils.upload_nifi_exported_flow( nifi_host=config.NIFI_HOST, username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False)
+        print("Andmekonveier on deploytud - TODO")
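
For context on the new regex handling: the sketch below is not part of the commit. It walks a hypothetical `data_values` entry (the sample value `.readings[0].temperature` is assumed) through the same transformations `build_pipeline()` now applies, yielding the SplitJson `JsonPath Expression` and the per-field path passed to `update_template`. Only the regex/split logic is taken from the diff.

```python
import re

# Hypothetical sample entry from data_values; any value containing '['
# makes build_pipeline() pick the splitJsonETL.json template.
el = ".readings[0].temperature"

# Split right after the closing ']' (the lookbehind keeps ']' attached
# to the left-hand part), as the new path_parts logic does.
path_parts = re.split(r'(?<=\])', el)
print(path_parts)        # ['.readings[0]', '.temperature']

# SplitJson "JsonPath Expression": replace the concrete index with a wildcard.
split_json_path = "$" + re.sub(r'\[(.*?)\]', r'[*]', path_parts[0])
print(split_json_path)   # $.readings[*]

# Per-field EvaluateJsonPath property: keep only the part after ']',
# i.e. the path relative to each JSON fragment produced by SplitJson.
field_path = "$" + el.split(']')[1]
print(field_path)        # $.temperature
```

With these two values, SplitJson can fan the list out into one flowfile per element and EvaluateJsonPath then extracts each field relative to the split fragment, which matches how the splitJsonETL.json template appears to chain the processors.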