From 003351a014acbe7f56b23806f8068502f25d8b8f Mon Sep 17 00:00:00 2001 From: Rasmus Luha Date: Wed, 23 Apr 2025 00:04:05 +0300 Subject: splitJson template fix, minor fixes, nifi almost done - needs password encryption --- modules/nifi/core.py | 28 ++++++++++++++++------ .../protsessorite_j\303\244rjekorrad.txt" | 2 +- modules/nifi/templates/splitJsonETL.json | 9 ++++--- 3 files changed, 26 insertions(+), 13 deletions(-) (limited to 'modules') diff --git a/modules/nifi/core.py b/modules/nifi/core.py index f4c377e..be3cc1f 100644 --- a/modules/nifi/core.py +++ b/modules/nifi/core.py @@ -11,6 +11,7 @@ import json import shutil import requests import re +import base64 def introduction(): @@ -92,7 +93,7 @@ def get_data_values(): choose_another = common.ask_binary_input(prompt="\nKas soovid (v)alida veel mõne väärtuse või liikuda (e)dasi?(v/e): ",valikud=["v","e"]).strip().lower() if choose_another == 'e': - return chosen_json_values, api_url + return chosen_json_values, api_url, username, passwd else: choice = common.ask_binary_input(prompt="\nKas soovid URL-i (m)uuta URL-i või (v)äljuda?(m/v): ",valikud=["m","v"]).strip().lower() if choice == 'v': @@ -103,7 +104,7 @@ def get_data_values(): ## TODO - textReplace part -> fix templates -def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url): +def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url, api_username, api_password): ############### Choosing and modfyfing Template ############## ### Check if splitJson template needed @@ -129,13 +130,15 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_ - ### Processor editing + ### Processor editing - TODO only from config file currently measurements_name = config.NIFI_MEASUREMENT_NAME+" " if needs_SplitJson: ## SplitJson update split_json_path = "$"+re.sub(r'\[(.*?)\]', r'[*]', path_parts[0]) + print("Got here") update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path) + print("Got also here") ## EvaluateJsonPath processor setup for key, value in data_values.items() : @@ -163,27 +166,38 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_ ## Update scheduling Periond on API Calls update_template(new_pipeline_path, "flowContents.processors[1]", "schedulingPeriod", schedulingPeriod) + ## Add api credentials + if api_username != "placeholder": + update_template(new_pipeline_path, "flowContents.processors[1]", "Request Username", api_username) + #update_template(new_pipeline_path, "flowContents.processors[1]", "Request Password", api_password) + #update_template(new_pipeline_path, "flowContents.processors[1]", "Request Password", base64.b64encode(api_password.encode()).decode()) + + + ############################################### def build_pipeline(): if config.INTERACTIVE_MODE: - data_values, api_url= get_data_values() + data_values, api_url, api_username, api_password= get_data_values() + print("\nKui tihti peaks andmekonveier jooksma? (sekundites)") schedulingPeriod = str(common.ask_digit_input(86400))+ "sec" + new_pipeline_name=input("Mis saab andmekonveieri nimeks: ")+".json" + else: api_url = config.API_URL data_values = config.API_FIELDS schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD new_pipeline_name = config.PIPELINE_NAME + api_username = config.API_USERNAME + api_password = config.API_PASSWORD - - modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url) + modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url, api_username, api_password) print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier nimega '{new_pipeline_name}' asub kaustas 'pipelines'.") - ## Pipeline Deployment if (config.NIFI_DEPLOY): nifi_utils.upload_nifi_exported_flow( nifi_host=config.NIFI_HOST, username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False) diff --git "a/modules/nifi/templates/protsessorite_j\303\244rjekorrad.txt" "b/modules/nifi/templates/protsessorite_j\303\244rjekorrad.txt" index fe610fc..cd20936 100644 --- "a/modules/nifi/templates/protsessorite_j\303\244rjekorrad.txt" +++ "b/modules/nifi/templates/protsessorite_j\303\244rjekorrad.txt" @@ -15,4 +15,4 @@ splitJsonETL.json 1: InvokeHTTP in 2: EvaluateJsonPath 3: SplitJson -3: InvokeHTTP Out +4: InvokeHTTP Out diff --git a/modules/nifi/templates/splitJsonETL.json b/modules/nifi/templates/splitJsonETL.json index cc4d2b6..3f5fe32 100644 --- a/modules/nifi/templates/splitJsonETL.json +++ b/modules/nifi/templates/splitJsonETL.json @@ -28,7 +28,6 @@ }, "properties": { "Regular Expression": "(?s)(^.*$)", - "Replacement Value": "energy,building=\"Delta\" kilowattHours=${energy_value}", "Evaluation Mode": "Entire text", "Text to Prepend": null, "Line-by-Line Evaluation Mode": "All", @@ -163,13 +162,13 @@ "Request User-Agent": null, "Response Header Request Attributes Enabled": "false", "HTTP Method": "GET", - "Request Username": null + "Request Username": "your.name", "Request Content-Type": "${mime.type}", "Response Body Attribute Name": null, "Request Digest Authentication Enabled": "false", "Request Multipart Form-Data Name": null, "Response Cache Size": "10MB", - "Response Body Ignored": "false", + "Response Body Ignored": "false" }, "propertyDescriptors": { "Request Content-Encoding": { @@ -540,7 +539,7 @@ "properties": { "Max String Length": "20 MB", "Null Value Representation": "empty string", - "JsonPath Expression": "Placeholder" + "JsonPath Expression": "placeholder" }, "propertyDescriptors": { "Max String Length": { @@ -610,7 +609,7 @@ "Socket Read Timeout": "15 secs", "Socket Idle Connections": "5", "Request Body Enabled": "true", - "HTTP URL": "Placeholder", + "HTTP URL": "placeholder", "Request OAuth2 Access Token Provider": null, "Socket Idle Timeout": "5 mins", "Response Redirects Enabled": "True", -- cgit v1.2.3