Diffstat (limited to 'modules/nifi/core.py')
-rw-r--r--  modules/nifi/core.py  |  42
1 file changed, 26 insertions(+), 16 deletions(-)
diff --git a/modules/nifi/core.py b/modules/nifi/core.py
index 38c22dc..a30211c 100644
--- a/modules/nifi/core.py
+++ b/modules/nifi/core.py
@@ -10,6 +10,7 @@ import sys
 import json
 import shutil
 import requests
+import re
 def introduction():
@@ -104,47 +105,56 @@ def update_template_with_json_list():
     ## TODO - textReplace part -> fix templates
+
 def build_pipeline():
     data_values = get_data_values()
-    print(data_values)
-    ## Check if splitJson template needed
+    ### Check if splitJson template needed
     needs_SplitJson = False
+    path_parts = []
     for el in data_values.values():
         if '[' in el:
             needs_SplitJson = True
+            #path_parts = el.split(']')
+            path_parts = re.split(r'(?<=\])', el)
-    print(needs_SplitJson)
-    ## Select template
+    ### Select template
     ## TODO - unhardcoded template usage
+    new_pipeline_name="test_pipeline.json"
     if needs_SplitJson:
         template_name="splitJsonETL.json"
     else:
         template_name="basic_ETL.json"
-    new_pipeline_path = f"pipelines/{template_name}"
+    new_pipeline_path = f"pipelines/{new_pipeline_name}"
     shutil.copy(f"modules/nifi/templates/{template_name}", new_pipeline_path)
-    ## Processor editing
-    for key, value in data_values.items() :
-        update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value)
+    ### Processor editing
+    if needs_SplitJson:
+        ## SplitJson update
+        split_json_path = "$"+re.sub(r'\[(.*?)\]', r'[*]', path_parts[0])
+        update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path)
+        for key, value in data_values.items() :
+            path_parts = value.split(']')
+            update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1])
+    else:
+        ## EvaluateJsonPath processor setup
+        for key, value in data_values.items() :
+            update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value)
-    ## Database Setup
-    set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties")
+    ## Database Setup
+    set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties")
+    print(f"✅✅✅ Done. The newly generated data pipeline is located here: {new_pipeline_path}.")
     ## Pipeline Deployment
     if (config.NIFI_DEPLOY):
-        nifi_utils.upload_nifi_exported_flow( nifi_host="https://127.0.0.1.nip.io", username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False)
-    else:
-        print("TODO - ask if user wants deployment")
-
-
-    print(f"✅✅✅ Done. The newly generated data pipeline is located here: {new_pipeline_path}.")
+        nifi_utils.upload_nifi_exported_flow( nifi_host=config.NIFI_HOST, username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False)
+        print("The data pipeline has been deployed - TODO")