diff options
author | Rasmus Luha <rasmus.luha@ut.ee> | 2025-04-23 00:04:05 +0300 |
---|---|---|
committer | Rasmus Luha <rasmus.luha@ut.ee> | 2025-04-23 00:04:05 +0300 |
commit | 003351a014acbe7f56b23806f8068502f25d8b8f (patch) | |
tree | f1cc54c293e4d703a89a3899a062ba5f0cf61980 | |
parent | ea4cfadef55319da613901017a586043e75e769f (diff) |
splitJson template fix, minor fixes, nifi almost done - needs password encryption
-rw-r--r-- | config.py | 6 | ||||
-rw-r--r-- | modules/nifi/core.py | 28 | ||||
-rw-r--r-- | modules/nifi/templates/protsessorite_järjekorrad.txt | 2 | ||||
-rw-r--r-- | modules/nifi/templates/splitJsonETL.json | 9 | ||||
-rw-r--r-- | testing.py | 89 |
5 files changed, 118 insertions, 16 deletions
@@ -1,4 +1,4 @@ -INTERACTIVE_MODE=False +INTERACTIVE_MODE=True ## Nifi NIFI_USER="lab08nifiuser" @@ -26,7 +26,7 @@ DB_PASS="admin" ## Needed if Interactive mode turned off API_URL="https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72¤t_weather=true" API_FIELDS={'temperature': '.current_weather.temperature', 'windspeed': '.current_weather.windspeed'} -API_URL_USERNAME="TODO" -API_URL_PASSWORD="TODO" +API_USERNAME="TODO" +API_PASSWORD="TODO" PIPELINE_SCHEDULING_PERIOD="5 sec" PIPELINE_NAME="test_pipeline.json" diff --git a/modules/nifi/core.py b/modules/nifi/core.py index f4c377e..be3cc1f 100644 --- a/modules/nifi/core.py +++ b/modules/nifi/core.py @@ -11,6 +11,7 @@ import json import shutil import requests import re +import base64 def introduction(): @@ -92,7 +93,7 @@ def get_data_values(): choose_another = common.ask_binary_input(prompt="\nKas soovid (v)alida veel mõne väärtuse või liikuda (e)dasi?(v/e): ",valikud=["v","e"]).strip().lower() if choose_another == 'e': - return chosen_json_values, api_url + return chosen_json_values, api_url, username, passwd else: choice = common.ask_binary_input(prompt="\nKas soovid URL-i (m)uuta URL-i või (v)äljuda?(m/v): ",valikud=["m","v"]).strip().lower() if choice == 'v': @@ -103,7 +104,7 @@ def get_data_values(): ## TODO - textReplace part -> fix templates -def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url): +def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url, api_username, api_password): ############### Choosing and modfyfing Template ############## ### Check if splitJson template needed @@ -129,13 +130,15 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_ - ### Processor editing + ### Processor editing - TODO only from config file currently measurements_name = config.NIFI_MEASUREMENT_NAME+" " if needs_SplitJson: ## SplitJson update split_json_path = "$"+re.sub(r'\[(.*?)\]', r'[*]', path_parts[0]) + print("Got here") update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path) + print("Got also here") ## EvaluateJsonPath processor setup for key, value in data_values.items() : @@ -163,27 +166,38 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_ ## Update scheduling Periond on API Calls update_template(new_pipeline_path, "flowContents.processors[1]", "schedulingPeriod", schedulingPeriod) + ## Add api credentials + if api_username != "placeholder": + update_template(new_pipeline_path, "flowContents.processors[1]", "Request Username", api_username) + #update_template(new_pipeline_path, "flowContents.processors[1]", "Request Password", api_password) + #update_template(new_pipeline_path, "flowContents.processors[1]", "Request Password", base64.b64encode(api_password.encode()).decode()) + + + ############################################### def build_pipeline(): if config.INTERACTIVE_MODE: - data_values, api_url= get_data_values() + data_values, api_url, api_username, api_password= get_data_values() + print("\nKui tihti peaks andmekonveier jooksma? (sekundites)") schedulingPeriod = str(common.ask_digit_input(86400))+ "sec" + new_pipeline_name=input("Mis saab andmekonveieri nimeks: ")+".json" + else: api_url = config.API_URL data_values = config.API_FIELDS schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD new_pipeline_name = config.PIPELINE_NAME + api_username = config.API_USERNAME + api_password = config.API_PASSWORD - - modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url) + modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url, api_username, api_password) print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier nimega '{new_pipeline_name}' asub kaustas 'pipelines'.") - ## Pipeline Deployment if (config.NIFI_DEPLOY): nifi_utils.upload_nifi_exported_flow( nifi_host=config.NIFI_HOST, username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False) diff --git a/modules/nifi/templates/protsessorite_järjekorrad.txt b/modules/nifi/templates/protsessorite_järjekorrad.txt index fe610fc..cd20936 100644 --- a/modules/nifi/templates/protsessorite_järjekorrad.txt +++ b/modules/nifi/templates/protsessorite_järjekorrad.txt @@ -15,4 +15,4 @@ splitJsonETL.json 1: InvokeHTTP in 2: EvaluateJsonPath 3: SplitJson -3: InvokeHTTP Out +4: InvokeHTTP Out diff --git a/modules/nifi/templates/splitJsonETL.json b/modules/nifi/templates/splitJsonETL.json index cc4d2b6..3f5fe32 100644 --- a/modules/nifi/templates/splitJsonETL.json +++ b/modules/nifi/templates/splitJsonETL.json @@ -28,7 +28,6 @@ }, "properties": { "Regular Expression": "(?s)(^.*$)", - "Replacement Value": "energy,building=\"Delta\" kilowattHours=${energy_value}", "Evaluation Mode": "Entire text", "Text to Prepend": null, "Line-by-Line Evaluation Mode": "All", @@ -163,13 +162,13 @@ "Request User-Agent": null, "Response Header Request Attributes Enabled": "false", "HTTP Method": "GET", - "Request Username": null + "Request Username": "your.name", "Request Content-Type": "${mime.type}", "Response Body Attribute Name": null, "Request Digest Authentication Enabled": "false", "Request Multipart Form-Data Name": null, "Response Cache Size": "10MB", - "Response Body Ignored": "false", + "Response Body Ignored": "false" }, "propertyDescriptors": { "Request Content-Encoding": { @@ -540,7 +539,7 @@ "properties": { "Max String Length": "20 MB", "Null Value Representation": "empty string", - "JsonPath Expression": "Placeholder" + "JsonPath Expression": "placeholder" }, "propertyDescriptors": { "Max String Length": { @@ -610,7 +609,7 @@ "Socket Read Timeout": "15 secs", "Socket Idle Connections": "5", "Request Body Enabled": "true", - "HTTP URL": "Placeholder", + "HTTP URL": "placeholder", "Request OAuth2 Access Token Provider": null, "Socket Idle Timeout": "5 mins", "Response Redirects Enabled": "True", diff --git a/testing.py b/testing.py new file mode 100644 index 0000000..4ace0e5 --- /dev/null +++ b/testing.py @@ -0,0 +1,89 @@ +import config as config + +import requests +import json + +# Configuration +PLAINTEXT_PW = "TODO" +INPUT_TEMPLATE = "template.json" +OUTPUT_FILE = "configured-flow.json" + +# API Endpoints +BASE_URL = f"{config.NIFI_HOST}/nifi-api" +LOGIN_URL = f"{BASE_URL}/access/token" +ENCRYPT_URL = f"{BASE_URL}/flow/encrypt-text" + +def get_access_token(): + """Authenticate with NiFi and get JWT token""" + try: + response = requests.post( + LOGIN_URL, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + data=f"username={config.NIFI_USER}&password={config.NIFI_PASS}", + verify=False # For self-signed certificates + ) + response.raise_for_status() + return response.text + except requests.exceptions.RequestException as e: + print(f"Authentication failed: {str(e)}") + exit(1) + +def encrypt_password(token, plaintext): + """Encryptpassword using NiFi's API""" + try: + response = requests.post( + ENCRYPT_URL, + headers={ + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": f"Bearer {token}" + }, + data=f"value={plaintext}", + verify=False + ) + response.raise_for_status() + return response.json()["value"] + except requests.exceptions.RequestException as e: + print(f"Encryption failed: {str(e)}") + exit(1) + + + + + + + + + +def update_template(encrypted_pw): + """Update JSON template with encrypted password""" + try: + with open(INPUT_TEMPLATE, "r") as f: + flow = json.load(f) + + # Find and update InvokeHTTP processor + for processor in flow["flowContents"]["processors"]: + if processor["type"] == "org.apache.nifi.processors.standard.InvokeHTTP": + # Update password property + processor["properties"]["Request password"] = encrypted_pw + # Mark property as sensitive + processor["propertyDescriptors"]["Request password"]["sensitive"] = True + + with open(OUTPUT_FILE, "w") as f: + json.dump(flow, f, indent=2) + + print(f"Successfully generated: {OUTPUT_FILE}") + except Exception as e: + print(f"Template update failed: {str(e)}") + exit(1) + +if __name__ == "__main__": + # 1. Authenticate + token = get_access_token() + + print("token part DONE") + + # 2. Encrypt config.NIFI_PASS + encrypt_password(token, PLAINTEXT_PW) + + # 3. Update template + #update_template(encrypted_config.NIFI_PASS) |