diff options
author | Rasmus Luha <rasmus.luha@ut.ee> | 2025-04-13 17:00:46 +0300 |
---|---|---|
committer | Rasmus Luha <rasmus.luha@ut.ee> | 2025-04-13 17:00:46 +0300 |
commit | 3104d4fdab9a80ada2c09c5188a920dec939bbe8 (patch) | |
tree | d03225ee790dd1e9bb201cacc94588564041fd1b | |
parent | f42212afe0cc0de4b321df2cd70d830a3ea06231 (diff) |
SplitJson inital support added
-rw-r--r-- | config.py | 1 | ||||
-rw-r--r-- | modules/nifi/core.py | 42 | ||||
-rw-r--r-- | modules/nifi/templates/basic_ETL.json | 2 | ||||
-rw-r--r-- | modules/nifi/templates/splitJsonETL.json | 89 |
4 files changed, 72 insertions, 62 deletions
@@ -2,6 +2,7 @@ NIFI_USER="lab08nifiuser" NIFI_PASS="tartunifi2023" +NIFI_HOST="https://127.0.0.1.nip.io" NIFI_DEPLOY=False diff --git a/modules/nifi/core.py b/modules/nifi/core.py index 38c22dc..a30211c 100644 --- a/modules/nifi/core.py +++ b/modules/nifi/core.py @@ -10,6 +10,7 @@ import sys import json import shutil import requests +import re def introduction(): @@ -104,47 +105,56 @@ def update_template_with_json_list(): ## TODO - textReplace part -> fix templates + def build_pipeline(): data_values = get_data_values() - print(data_values) - ## Check if splitJson template needed + ### Check if splitJson template needed needs_SplitJson = False + path_parts = [] for el in data_values.values(): if '[' in el: needs_SplitJson = True + #path_parts = el.split(']') + path_parts = re.split(r'(?<=\])', el) - print(needs_SplitJson) - ## Select template + ### Select template ## TODO - unhardcoded template usage + new_pipeline_name="test_pipeline.json" if needs_SplitJson: template_name="splitJsonETL.json" else: template_name="basic_ETL.json" - new_pipeline_path = f"pipelines/{template_name}" + new_pipeline_path = f"pipelines/{new_pipeline_name}" shutil.copy(f"modules/nifi/templates/{template_name}", new_pipeline_path) - ## Processor editing - for key, value in data_values.items() : - update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value) + ### Processor editing + if needs_SplitJson: + ## SplitJson update + split_json_path = "$"+re.sub(r'\[(.*?)\]', r'[*]', path_parts[0]) + update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path) + for key, value in data_values.items() : + path_parts = value.split(']') + update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1]) + else: + ## EvaluateJsonPath processor setup + for key, value in data_values.items() : + update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value) - ## Database Setup - set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties") + ## Database Setup + set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties") + print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub siin: {new_pipeline_path}.") ## Pipeline Deployment if (config.NIFI_DEPLOY): - nifi_utils.upload_nifi_exported_flow( nifi_host="https://127.0.0.1.nip.io", username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False) - else: - print("TODO - ask if user wants deployment") - - - print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub siin: {new_pipeline_path}.") + nifi_utils.upload_nifi_exported_flow( nifi_host=config.NIFI_HOST, username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False) + print("Andmekonveier on deploytud - TODO") diff --git a/modules/nifi/templates/basic_ETL.json b/modules/nifi/templates/basic_ETL.json index 9c3764e..34a0297 100644 --- a/modules/nifi/templates/basic_ETL.json +++ b/modules/nifi/templates/basic_ETL.json @@ -572,7 +572,7 @@ "Socket Read Timeout": "15 secs", "Socket Idle Connections": "5", "Request Body Enabled": "true", - "HTTP URL": "http://influxdb:8086/write?db=nifi_weatherData", + "HTTP URL": "Placeholder", "Request OAuth2 Access Token Provider": null, "Socket Idle Timeout": "5 mins", "Response Redirects Enabled": "True", diff --git a/modules/nifi/templates/splitJsonETL.json b/modules/nifi/templates/splitJsonETL.json index 457c783..aa3b536 100644 --- a/modules/nifi/templates/splitJsonETL.json +++ b/modules/nifi/templates/splitJsonETL.json @@ -440,26 +440,35 @@ "groupIdentifier": "2ae4bcd4-1c30-34e2-8206-1a0b567f7274" }, { - "identifier": "fb9a5b80-aa9a-3e1b-86f5-c17db5783812", - "instanceIdentifier": "0bd7b16d-0195-1000-32ad-5b4055f37b22", - "name": "SplitJson", + "identifier": "b00e49a7-d25a-3d5a-8705-ef4c5e2919e7", + "instanceIdentifier": "6802228d-1680-3d01-dcb3-83febf10560d", + "name": "EvaluateJsonPath", "comments": "", "position": { - "x": -1184.0, - "y": -440.0 + "x": -648.0, + "y": -608.0 }, - "type": "org.apache.nifi.processors.standard.SplitJson", + "type": "org.apache.nifi.processors.standard.EvaluateJsonPath", "bundle": { "group": "org.apache.nifi", "artifact": "nifi-standard-nar", "version": "2.1.0" }, "properties": { + "Destination": "flowfile-attribute", "Max String Length": "20 MB", + "Return Type": "auto-detect", "Null Value Representation": "empty string", - "JsonPath Expression": "$.measurements[*]" + "Path Not Found Behavior": "ignore" }, "propertyDescriptors": { + "Destination": { + "name": "Destination", + "displayName": "Destination", + "identifiesControllerService": false, + "sensitive": false, + "dynamic": false + }, "Max String Length": { "name": "Max String Length", "displayName": "Max String Length", @@ -467,6 +476,20 @@ "sensitive": false, "dynamic": false }, + "Return Type": { + "name": "Return Type", + "displayName": "Return Type", + "identifiesControllerService": false, + "sensitive": false, + "dynamic": false + }, + "energy_value": { + "name": "energy_value", + "displayName": "energy_value", + "identifiesControllerService": false, + "sensitive": false, + "dynamic": true + }, "Null Value Representation": { "name": "Null Value Representation", "displayName": "Null Value Representation", @@ -474,9 +497,9 @@ "sensitive": false, "dynamic": false }, - "JsonPath Expression": { - "name": "JsonPath Expression", - "displayName": "JsonPath Expression", + "Path Not Found Behavior": { + "name": "Path Not Found Behavior", + "displayName": "Path Not Found Behavior", "identifiesControllerService": false, "sensitive": false, "dynamic": false @@ -501,36 +524,26 @@ "groupIdentifier": "2ae4bcd4-1c30-34e2-8206-1a0b567f7274" }, { - "identifier": "b00e49a7-d25a-3d5a-8705-ef4c5e2919e7", - "instanceIdentifier": "6802228d-1680-3d01-dcb3-83febf10560d", - "name": "EvaluateJsonPath", + "identifier": "fb9a5b80-aa9a-3e1b-86f5-c17db5783812", + "instanceIdentifier": "0bd7b16d-0195-1000-32ad-5b4055f37b22", + "name": "SplitJson", "comments": "", "position": { - "x": -648.0, - "y": -608.0 + "x": -1184.0, + "y": -440.0 }, - "type": "org.apache.nifi.processors.standard.EvaluateJsonPath", + "type": "org.apache.nifi.processors.standard.SplitJson", "bundle": { "group": "org.apache.nifi", "artifact": "nifi-standard-nar", "version": "2.1.0" }, "properties": { - "Destination": "flowfile-attribute", "Max String Length": "20 MB", - "Return Type": "auto-detect", - "energy_value": "$.KogEN.T.value", "Null Value Representation": "empty string", - "Path Not Found Behavior": "ignore" + "JsonPath Expression": "Placeholder" }, "propertyDescriptors": { - "Destination": { - "name": "Destination", - "displayName": "Destination", - "identifiesControllerService": false, - "sensitive": false, - "dynamic": false - }, "Max String Length": { "name": "Max String Length", "displayName": "Max String Length", @@ -538,20 +551,6 @@ "sensitive": false, "dynamic": false }, - "Return Type": { - "name": "Return Type", - "displayName": "Return Type", - "identifiesControllerService": false, - "sensitive": false, - "dynamic": false - }, - "energy_value": { - "name": "energy_value", - "displayName": "energy_value", - "identifiesControllerService": false, - "sensitive": false, - "dynamic": true - }, "Null Value Representation": { "name": "Null Value Representation", "displayName": "Null Value Representation", @@ -559,9 +558,9 @@ "sensitive": false, "dynamic": false }, - "Path Not Found Behavior": { - "name": "Path Not Found Behavior", - "displayName": "Path Not Found Behavior", + "JsonPath Expression": { + "name": "JsonPath Expression", + "displayName": "JsonPath Expression", "identifiesControllerService": false, "sensitive": false, "dynamic": false @@ -612,7 +611,7 @@ "Socket Read Timeout": "15 secs", "Socket Idle Connections": "5", "Request Body Enabled": "true", - "HTTP URL": "http://influxdb:8086/write?db=nifi_deltaEnergy", + "HTTP URL": "Placeholder", "Request OAuth2 Access Token Provider": null, "Socket Idle Timeout": "5 mins", "Response Redirects Enabled": "True", |