summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRasmus Luha <rasmus.luha@ut.ee>2025-04-13 17:00:46 +0300
committerRasmus Luha <rasmus.luha@ut.ee>2025-04-13 17:00:46 +0300
commit3104d4fdab9a80ada2c09c5188a920dec939bbe8 (patch)
treed03225ee790dd1e9bb201cacc94588564041fd1b
parentf42212afe0cc0de4b321df2cd70d830a3ea06231 (diff)
SplitJson inital support added
-rw-r--r--config.py1
-rw-r--r--modules/nifi/core.py42
-rw-r--r--modules/nifi/templates/basic_ETL.json2
-rw-r--r--modules/nifi/templates/splitJsonETL.json89
4 files changed, 72 insertions, 62 deletions
diff --git a/config.py b/config.py
index eec0afb..f21307e 100644
--- a/config.py
+++ b/config.py
@@ -2,6 +2,7 @@
NIFI_USER="lab08nifiuser"
NIFI_PASS="tartunifi2023"
+NIFI_HOST="https://127.0.0.1.nip.io"
NIFI_DEPLOY=False
diff --git a/modules/nifi/core.py b/modules/nifi/core.py
index 38c22dc..a30211c 100644
--- a/modules/nifi/core.py
+++ b/modules/nifi/core.py
@@ -10,6 +10,7 @@ import sys
import json
import shutil
import requests
+import re
def introduction():
@@ -104,47 +105,56 @@ def update_template_with_json_list():
## TODO - textReplace part -> fix templates
+
def build_pipeline():
data_values = get_data_values()
- print(data_values)
- ## Check if splitJson template needed
+ ### Check if splitJson template needed
needs_SplitJson = False
+ path_parts = []
for el in data_values.values():
if '[' in el:
needs_SplitJson = True
+ #path_parts = el.split(']')
+ path_parts = re.split(r'(?<=\])', el)
- print(needs_SplitJson)
- ## Select template
+ ### Select template
## TODO - unhardcoded template usage
+ new_pipeline_name="test_pipeline.json"
if needs_SplitJson:
template_name="splitJsonETL.json"
else:
template_name="basic_ETL.json"
- new_pipeline_path = f"pipelines/{template_name}"
+ new_pipeline_path = f"pipelines/{new_pipeline_name}"
shutil.copy(f"modules/nifi/templates/{template_name}", new_pipeline_path)
- ## Processor editing
- for key, value in data_values.items() :
- update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value)
+ ### Processor editing
+ if needs_SplitJson:
+ ## SplitJson update
+ split_json_path = "$"+re.sub(r'\[(.*?)\]', r'[*]', path_parts[0])
+ update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path)
+ for key, value in data_values.items() :
+ path_parts = value.split(']')
+ update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1])
+ else:
+ ## EvaluateJsonPath processor setup
+ for key, value in data_values.items() :
+ update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value)
- ## Database Setup
- set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties")
+ ## Database Setup
+ set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties")
+ print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub siin: {new_pipeline_path}.")
## Pipeline Deployment
if (config.NIFI_DEPLOY):
- nifi_utils.upload_nifi_exported_flow( nifi_host="https://127.0.0.1.nip.io", username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False)
- else:
- print("TODO - ask if user wants deployment")
-
-
- print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub siin: {new_pipeline_path}.")
+ nifi_utils.upload_nifi_exported_flow( nifi_host=config.NIFI_HOST, username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False)
+ print("Andmekonveier on deploytud - TODO")
diff --git a/modules/nifi/templates/basic_ETL.json b/modules/nifi/templates/basic_ETL.json
index 9c3764e..34a0297 100644
--- a/modules/nifi/templates/basic_ETL.json
+++ b/modules/nifi/templates/basic_ETL.json
@@ -572,7 +572,7 @@
"Socket Read Timeout": "15 secs",
"Socket Idle Connections": "5",
"Request Body Enabled": "true",
- "HTTP URL": "http://influxdb:8086/write?db=nifi_weatherData",
+ "HTTP URL": "Placeholder",
"Request OAuth2 Access Token Provider": null,
"Socket Idle Timeout": "5 mins",
"Response Redirects Enabled": "True",
diff --git a/modules/nifi/templates/splitJsonETL.json b/modules/nifi/templates/splitJsonETL.json
index 457c783..aa3b536 100644
--- a/modules/nifi/templates/splitJsonETL.json
+++ b/modules/nifi/templates/splitJsonETL.json
@@ -440,26 +440,35 @@
"groupIdentifier": "2ae4bcd4-1c30-34e2-8206-1a0b567f7274"
},
{
- "identifier": "fb9a5b80-aa9a-3e1b-86f5-c17db5783812",
- "instanceIdentifier": "0bd7b16d-0195-1000-32ad-5b4055f37b22",
- "name": "SplitJson",
+ "identifier": "b00e49a7-d25a-3d5a-8705-ef4c5e2919e7",
+ "instanceIdentifier": "6802228d-1680-3d01-dcb3-83febf10560d",
+ "name": "EvaluateJsonPath",
"comments": "",
"position": {
- "x": -1184.0,
- "y": -440.0
+ "x": -648.0,
+ "y": -608.0
},
- "type": "org.apache.nifi.processors.standard.SplitJson",
+ "type": "org.apache.nifi.processors.standard.EvaluateJsonPath",
"bundle": {
"group": "org.apache.nifi",
"artifact": "nifi-standard-nar",
"version": "2.1.0"
},
"properties": {
+ "Destination": "flowfile-attribute",
"Max String Length": "20 MB",
+ "Return Type": "auto-detect",
"Null Value Representation": "empty string",
- "JsonPath Expression": "$.measurements[*]"
+ "Path Not Found Behavior": "ignore"
},
"propertyDescriptors": {
+ "Destination": {
+ "name": "Destination",
+ "displayName": "Destination",
+ "identifiesControllerService": false,
+ "sensitive": false,
+ "dynamic": false
+ },
"Max String Length": {
"name": "Max String Length",
"displayName": "Max String Length",
@@ -467,6 +476,20 @@
"sensitive": false,
"dynamic": false
},
+ "Return Type": {
+ "name": "Return Type",
+ "displayName": "Return Type",
+ "identifiesControllerService": false,
+ "sensitive": false,
+ "dynamic": false
+ },
+ "energy_value": {
+ "name": "energy_value",
+ "displayName": "energy_value",
+ "identifiesControllerService": false,
+ "sensitive": false,
+ "dynamic": true
+ },
"Null Value Representation": {
"name": "Null Value Representation",
"displayName": "Null Value Representation",
@@ -474,9 +497,9 @@
"sensitive": false,
"dynamic": false
},
- "JsonPath Expression": {
- "name": "JsonPath Expression",
- "displayName": "JsonPath Expression",
+ "Path Not Found Behavior": {
+ "name": "Path Not Found Behavior",
+ "displayName": "Path Not Found Behavior",
"identifiesControllerService": false,
"sensitive": false,
"dynamic": false
@@ -501,36 +524,26 @@
"groupIdentifier": "2ae4bcd4-1c30-34e2-8206-1a0b567f7274"
},
{
- "identifier": "b00e49a7-d25a-3d5a-8705-ef4c5e2919e7",
- "instanceIdentifier": "6802228d-1680-3d01-dcb3-83febf10560d",
- "name": "EvaluateJsonPath",
+ "identifier": "fb9a5b80-aa9a-3e1b-86f5-c17db5783812",
+ "instanceIdentifier": "0bd7b16d-0195-1000-32ad-5b4055f37b22",
+ "name": "SplitJson",
"comments": "",
"position": {
- "x": -648.0,
- "y": -608.0
+ "x": -1184.0,
+ "y": -440.0
},
- "type": "org.apache.nifi.processors.standard.EvaluateJsonPath",
+ "type": "org.apache.nifi.processors.standard.SplitJson",
"bundle": {
"group": "org.apache.nifi",
"artifact": "nifi-standard-nar",
"version": "2.1.0"
},
"properties": {
- "Destination": "flowfile-attribute",
"Max String Length": "20 MB",
- "Return Type": "auto-detect",
- "energy_value": "$.KogEN.T.value",
"Null Value Representation": "empty string",
- "Path Not Found Behavior": "ignore"
+ "JsonPath Expression": "Placeholder"
},
"propertyDescriptors": {
- "Destination": {
- "name": "Destination",
- "displayName": "Destination",
- "identifiesControllerService": false,
- "sensitive": false,
- "dynamic": false
- },
"Max String Length": {
"name": "Max String Length",
"displayName": "Max String Length",
@@ -538,20 +551,6 @@
"sensitive": false,
"dynamic": false
},
- "Return Type": {
- "name": "Return Type",
- "displayName": "Return Type",
- "identifiesControllerService": false,
- "sensitive": false,
- "dynamic": false
- },
- "energy_value": {
- "name": "energy_value",
- "displayName": "energy_value",
- "identifiesControllerService": false,
- "sensitive": false,
- "dynamic": true
- },
"Null Value Representation": {
"name": "Null Value Representation",
"displayName": "Null Value Representation",
@@ -559,9 +558,9 @@
"sensitive": false,
"dynamic": false
},
- "Path Not Found Behavior": {
- "name": "Path Not Found Behavior",
- "displayName": "Path Not Found Behavior",
+ "JsonPath Expression": {
+ "name": "JsonPath Expression",
+ "displayName": "JsonPath Expression",
"identifiesControllerService": false,
"sensitive": false,
"dynamic": false
@@ -612,7 +611,7 @@
"Socket Read Timeout": "15 secs",
"Socket Idle Connections": "5",
"Request Body Enabled": "true",
- "HTTP URL": "http://influxdb:8086/write?db=nifi_deltaEnergy",
+ "HTTP URL": "Placeholder",
"Request OAuth2 Access Token Provider": null,
"Socket Idle Timeout": "5 mins",
"Response Redirects Enabled": "True",