summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRasmus Luha <rasmus.luha@ut.ee>2025-04-13 01:16:54 +0300
committerRasmus Luha <rasmus.luha@ut.ee>2025-04-13 01:16:54 +0300
commitf42212afe0cc0de4b321df2cd70d830a3ea06231 (patch)
tree30935a99a8f8cf1d0624f5830d85b609fa95d205
parent5a46716cf0d6d465052eef48006ae32072c71bd3 (diff)
add SplitJson processor support in Nifi
-rw-r--r--config.py2
-rw-r--r--modules/nifi/core.py49
2 files changed, 40 insertions, 11 deletions
diff --git a/config.py b/config.py
index a41d686..eec0afb 100644
--- a/config.py
+++ b/config.py
@@ -2,6 +2,8 @@
NIFI_USER="lab08nifiuser"
NIFI_PASS="tartunifi2023"
+NIFI_DEPLOY=False
+
## Database
DB_URL="http://influxdb:8086/write?db=nifi_weatherData"
diff --git a/modules/nifi/core.py b/modules/nifi/core.py
index 6746bc6..38c22dc 100644
--- a/modules/nifi/core.py
+++ b/modules/nifi/core.py
@@ -1,10 +1,11 @@
-from pyfiglet import figlet_format
-from rich.console import Console
+## TODO - check syntax
from common import core as common
-import config as config ## TODO - check syntax
-#from modules.nifi import nifi_utils as nifi_utils
+import config as config
+from modules.nifi import nifi_utils as nifi_utils
+from pyfiglet import figlet_format
+from rich.console import Console
import sys
import json
import shutil
@@ -105,19 +106,45 @@ def update_template_with_json_list():
## TODO - textReplace part -> fix templates
def build_pipeline():
data_values = get_data_values()
+ print(data_values)
+
+ ## Check if splitJson template needed
+ needs_SplitJson = False
+ for el in data_values.values():
+ if '[' in el:
+ needs_SplitJson = True
+
+ print(needs_SplitJson)
+
+
+ ## Select template
+ ## TODO - unhardcoded template usage
+ if needs_SplitJson:
+ template_name="splitJsonETL.json"
+ else:
+ template_name="basic_ETL.json"
- ## TODO - unhardcode
- new_pipeline_path = "pipelines/test_pipeline.json"
- shutil.copy("modules/nifi/templates/basic_ETL.json", new_pipeline_path)
+ new_pipeline_path = f"pipelines/{template_name}"
+ shutil.copy(f"modules/nifi/templates/{template_name}", new_pipeline_path)
- ## TODO - make a function for different types ... etc
+
+
+ ## Processor editing
for key, value in data_values.items() :
update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value)
+
+
+ ## Database Setup
set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties")
- print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub siin: {new_pipeline_path}.")
- ## TODO - not working
- #nifi_utils.upload_nifi_exported_flow( nifi_host="https://127.0.0.1.nip.io", username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False)
+
+ ## Pipeline Deployment
+ if (config.NIFI_DEPLOY):
+ nifi_utils.upload_nifi_exported_flow( nifi_host="https://127.0.0.1.nip.io", username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False)
+ else:
+ print("TODO - ask if user wants deployment")
+
+ print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub siin: {new_pipeline_path}.")