summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRasmus Luha <rasmus.luha@ut.ee>2025-04-23 00:04:05 +0300
committerRasmus Luha <rasmus.luha@ut.ee>2025-04-23 00:04:05 +0300
commit003351a014acbe7f56b23806f8068502f25d8b8f (patch)
treef1cc54c293e4d703a89a3899a062ba5f0cf61980
parentea4cfadef55319da613901017a586043e75e769f (diff)
splitJson template fix, minor fixes, nifi almost done - needs password encryption
-rw-r--r--config.py6
-rw-r--r--modules/nifi/core.py28
-rw-r--r--modules/nifi/templates/protsessorite_järjekorrad.txt2
-rw-r--r--modules/nifi/templates/splitJsonETL.json9
-rw-r--r--testing.py89
5 files changed, 118 insertions, 16 deletions
diff --git a/config.py b/config.py
index c737dc8..b171425 100644
--- a/config.py
+++ b/config.py
@@ -1,4 +1,4 @@
-INTERACTIVE_MODE=False
+INTERACTIVE_MODE=True
## Nifi
NIFI_USER="lab08nifiuser"
@@ -26,7 +26,7 @@ DB_PASS="admin"
## Needed if Interactive mode turned off
API_URL="https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72&current_weather=true"
API_FIELDS={'temperature': '.current_weather.temperature', 'windspeed': '.current_weather.windspeed'}
-API_URL_USERNAME="TODO"
-API_URL_PASSWORD="TODO"
+API_USERNAME="TODO"
+API_PASSWORD="TODO"
PIPELINE_SCHEDULING_PERIOD="5 sec"
PIPELINE_NAME="test_pipeline.json"
diff --git a/modules/nifi/core.py b/modules/nifi/core.py
index f4c377e..be3cc1f 100644
--- a/modules/nifi/core.py
+++ b/modules/nifi/core.py
@@ -11,6 +11,7 @@ import json
import shutil
import requests
import re
+import base64
def introduction():
@@ -92,7 +93,7 @@ def get_data_values():
choose_another = common.ask_binary_input(prompt="\nKas soovid (v)alida veel mõne väärtuse või liikuda (e)dasi?(v/e): ",valikud=["v","e"]).strip().lower()
if choose_another == 'e':
- return chosen_json_values, api_url
+ return chosen_json_values, api_url, username, passwd
else:
choice = common.ask_binary_input(prompt="\nKas soovid URL-i (m)uuta URL-i või (v)äljuda?(m/v): ",valikud=["m","v"]).strip().lower()
if choice == 'v':
@@ -103,7 +104,7 @@ def get_data_values():
## TODO - textReplace part -> fix templates
-def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url):
+def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url, api_username, api_password):
############### Choosing and modfyfing Template ##############
### Check if splitJson template needed
@@ -129,13 +130,15 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_
- ### Processor editing
+ ### Processor editing - TODO only from config file currently
measurements_name = config.NIFI_MEASUREMENT_NAME+" "
if needs_SplitJson:
## SplitJson update
split_json_path = "$"+re.sub(r'\[(.*?)\]', r'[*]', path_parts[0])
+ print("Got here")
update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path)
+ print("Got also here")
## EvaluateJsonPath processor setup
for key, value in data_values.items() :
@@ -163,27 +166,38 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_
## Update scheduling Periond on API Calls
update_template(new_pipeline_path, "flowContents.processors[1]", "schedulingPeriod", schedulingPeriod)
+ ## Add api credentials
+ if api_username != "placeholder":
+ update_template(new_pipeline_path, "flowContents.processors[1]", "Request Username", api_username)
+ #update_template(new_pipeline_path, "flowContents.processors[1]", "Request Password", api_password)
+ #update_template(new_pipeline_path, "flowContents.processors[1]", "Request Password", base64.b64encode(api_password.encode()).decode())
+
+
+
###############################################
def build_pipeline():
if config.INTERACTIVE_MODE:
- data_values, api_url= get_data_values()
+ data_values, api_url, api_username, api_password= get_data_values()
+
print("\nKui tihti peaks andmekonveier jooksma? (sekundites)")
schedulingPeriod = str(common.ask_digit_input(86400))+ "sec"
+
new_pipeline_name=input("Mis saab andmekonveieri nimeks: ")+".json"
+
else:
api_url = config.API_URL
data_values = config.API_FIELDS
schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD
new_pipeline_name = config.PIPELINE_NAME
+ api_username = config.API_USERNAME
+ api_password = config.API_PASSWORD
-
- modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url)
+ modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url, api_username, api_password)
print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier nimega '{new_pipeline_name}' asub kaustas 'pipelines'.")
-
## Pipeline Deployment
if (config.NIFI_DEPLOY):
nifi_utils.upload_nifi_exported_flow( nifi_host=config.NIFI_HOST, username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False)
diff --git a/modules/nifi/templates/protsessorite_järjekorrad.txt b/modules/nifi/templates/protsessorite_järjekorrad.txt
index fe610fc..cd20936 100644
--- a/modules/nifi/templates/protsessorite_järjekorrad.txt
+++ b/modules/nifi/templates/protsessorite_järjekorrad.txt
@@ -15,4 +15,4 @@ splitJsonETL.json
1: InvokeHTTP in
2: EvaluateJsonPath
3: SplitJson
-3: InvokeHTTP Out
+4: InvokeHTTP Out
diff --git a/modules/nifi/templates/splitJsonETL.json b/modules/nifi/templates/splitJsonETL.json
index cc4d2b6..3f5fe32 100644
--- a/modules/nifi/templates/splitJsonETL.json
+++ b/modules/nifi/templates/splitJsonETL.json
@@ -28,7 +28,6 @@
},
"properties": {
"Regular Expression": "(?s)(^.*$)",
- "Replacement Value": "energy,building=\"Delta\" kilowattHours=${energy_value}",
"Evaluation Mode": "Entire text",
"Text to Prepend": null,
"Line-by-Line Evaluation Mode": "All",
@@ -163,13 +162,13 @@
"Request User-Agent": null,
"Response Header Request Attributes Enabled": "false",
"HTTP Method": "GET",
- "Request Username": null
+ "Request Username": "your.name",
"Request Content-Type": "${mime.type}",
"Response Body Attribute Name": null,
"Request Digest Authentication Enabled": "false",
"Request Multipart Form-Data Name": null,
"Response Cache Size": "10MB",
- "Response Body Ignored": "false",
+ "Response Body Ignored": "false"
},
"propertyDescriptors": {
"Request Content-Encoding": {
@@ -540,7 +539,7 @@
"properties": {
"Max String Length": "20 MB",
"Null Value Representation": "empty string",
- "JsonPath Expression": "Placeholder"
+ "JsonPath Expression": "placeholder"
},
"propertyDescriptors": {
"Max String Length": {
@@ -610,7 +609,7 @@
"Socket Read Timeout": "15 secs",
"Socket Idle Connections": "5",
"Request Body Enabled": "true",
- "HTTP URL": "Placeholder",
+ "HTTP URL": "placeholder",
"Request OAuth2 Access Token Provider": null,
"Socket Idle Timeout": "5 mins",
"Response Redirects Enabled": "True",
diff --git a/testing.py b/testing.py
new file mode 100644
index 0000000..4ace0e5
--- /dev/null
+++ b/testing.py
@@ -0,0 +1,89 @@
+import config as config
+
+import requests
+import json
+
+# Configuration
+PLAINTEXT_PW = "TODO"
+INPUT_TEMPLATE = "template.json"
+OUTPUT_FILE = "configured-flow.json"
+
+# API Endpoints
+BASE_URL = f"{config.NIFI_HOST}/nifi-api"
+LOGIN_URL = f"{BASE_URL}/access/token"
+ENCRYPT_URL = f"{BASE_URL}/flow/encrypt-text"
+
+def get_access_token():
+ """Authenticate with NiFi and get JWT token"""
+ try:
+ response = requests.post(
+ LOGIN_URL,
+ headers={"Content-Type": "application/x-www-form-urlencoded"},
+ data=f"username={config.NIFI_USER}&password={config.NIFI_PASS}",
+ verify=False # For self-signed certificates
+ )
+ response.raise_for_status()
+ return response.text
+ except requests.exceptions.RequestException as e:
+ print(f"Authentication failed: {str(e)}")
+ exit(1)
+
+def encrypt_password(token, plaintext):
+ """Encryptpassword using NiFi's API"""
+ try:
+ response = requests.post(
+ ENCRYPT_URL,
+ headers={
+ "Content-Type": "application/x-www-form-urlencoded",
+ "Authorization": f"Bearer {token}"
+ },
+ data=f"value={plaintext}",
+ verify=False
+ )
+ response.raise_for_status()
+ return response.json()["value"]
+ except requests.exceptions.RequestException as e:
+ print(f"Encryption failed: {str(e)}")
+ exit(1)
+
+
+
+
+
+
+
+
+
+def update_template(encrypted_pw):
+ """Update JSON template with encrypted password"""
+ try:
+ with open(INPUT_TEMPLATE, "r") as f:
+ flow = json.load(f)
+
+ # Find and update InvokeHTTP processor
+ for processor in flow["flowContents"]["processors"]:
+ if processor["type"] == "org.apache.nifi.processors.standard.InvokeHTTP":
+ # Update password property
+ processor["properties"]["Request password"] = encrypted_pw
+ # Mark property as sensitive
+ processor["propertyDescriptors"]["Request password"]["sensitive"] = True
+
+ with open(OUTPUT_FILE, "w") as f:
+ json.dump(flow, f, indent=2)
+
+ print(f"Successfully generated: {OUTPUT_FILE}")
+ except Exception as e:
+ print(f"Template update failed: {str(e)}")
+ exit(1)
+
+if __name__ == "__main__":
+ # 1. Authenticate
+ token = get_access_token()
+
+ print("token part DONE")
+
+ # 2. Encrypt config.NIFI_PASS
+ encrypt_password(token, PLAINTEXT_PW)
+
+ # 3. Update template
+ #update_template(encrypted_config.NIFI_PASS)