From 3104d4fdab9a80ada2c09c5188a920dec939bbe8 Mon Sep 17 00:00:00 2001 From: Rasmus Luha Date: Sun, 13 Apr 2025 17:00:46 +0300 Subject: SplitJson inital support added --- modules/nifi/core.py | 42 ++++++++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 16 deletions(-) (limited to 'modules/nifi/core.py') diff --git a/modules/nifi/core.py b/modules/nifi/core.py index 38c22dc..a30211c 100644 --- a/modules/nifi/core.py +++ b/modules/nifi/core.py @@ -10,6 +10,7 @@ import sys import json import shutil import requests +import re def introduction(): @@ -104,47 +105,56 @@ def update_template_with_json_list(): ## TODO - textReplace part -> fix templates + def build_pipeline(): data_values = get_data_values() - print(data_values) - ## Check if splitJson template needed + ### Check if splitJson template needed needs_SplitJson = False + path_parts = [] for el in data_values.values(): if '[' in el: needs_SplitJson = True + #path_parts = el.split(']') + path_parts = re.split(r'(?<=\])', el) - print(needs_SplitJson) - ## Select template + ### Select template ## TODO - unhardcoded template usage + new_pipeline_name="test_pipeline.json" if needs_SplitJson: template_name="splitJsonETL.json" else: template_name="basic_ETL.json" - new_pipeline_path = f"pipelines/{template_name}" + new_pipeline_path = f"pipelines/{new_pipeline_name}" shutil.copy(f"modules/nifi/templates/{template_name}", new_pipeline_path) - ## Processor editing - for key, value in data_values.items() : - update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value) + ### Processor editing + if needs_SplitJson: + ## SplitJson update + split_json_path = "$"+re.sub(r'\[(.*?)\]', r'[*]', path_parts[0]) + update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path) + for key, value in data_values.items() : + path_parts = value.split(']') + update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1]) + else: + ## EvaluateJsonPath processor setup + for key, value in data_values.items() : + update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value) - ## Database Setup - set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties") + ## Database Setup + set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties") + print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub siin: {new_pipeline_path}.") ## Pipeline Deployment if (config.NIFI_DEPLOY): - nifi_utils.upload_nifi_exported_flow( nifi_host="https://127.0.0.1.nip.io", username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False) - else: - print("TODO - ask if user wants deployment") - - - print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub siin: {new_pipeline_path}.") + nifi_utils.upload_nifi_exported_flow( nifi_host=config.NIFI_HOST, username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False) + print("Andmekonveier on deploytud - TODO") -- cgit v1.2.3