summaryrefslogtreecommitdiff
path: root/modules
diff options
context:
space:
mode:
authorRasmus Luha <rasmus.luha@ut.ee>2025-04-23 21:55:24 +0300
committerRasmus Luha <rasmus.luha@ut.ee>2025-04-23 21:55:24 +0300
commitc341ab0ab1697a26f09cc8f9131ca1d9f158034e (patch)
treef5c1375e51a05731a8ebd4eac95bfe28f190e5b8 /modules
parent1d41a08fe30bbb5b8a4cea7db14740109960b467 (diff)
add nifi pipeline uploading, template cleanup
Diffstat (limited to 'modules')
-rw-r--r--modules/nifi/core.py14
-rw-r--r--modules/nifi/nifi_utils.py212
-rw-r--r--modules/nifi/templates/basic_ETL.json8
3 files changed, 60 insertions, 174 deletions
diff --git a/modules/nifi/core.py b/modules/nifi/core.py
index 24489e4..4b7f748 100644
--- a/modules/nifi/core.py
+++ b/modules/nifi/core.py
@@ -1,7 +1,7 @@
## TODO - check syntax
from common import core as common
import config as config
-from modules.nifi import nifi_utils as nifi_utils
+from modules.nifi import nifi_utils
from pyfiglet import figlet_format
@@ -195,7 +195,15 @@ def build_pipeline():
modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url, api_username, api_password)
print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier nimega '{new_pipeline_name}' asub kaustas 'pipelines'.")
+
+
## Pipeline Deployment
if (config.NIFI_DEPLOY):
- nifi_utils.upload_nifi_exported_flow( nifi_host=config.NIFI_HOST, username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False)
- print("Andmekonveier on deploytud - TODO")
+ token = nifi_utils.get_access_token()
+ nifi_utils.upload_nifi_pipeline(token, "pipelines/test_pipeline.json", "test_pipeline", username=config.NIFI_USER, password=config.NIFI_PASS, nifi_url=config.NIFI_HOST, position_x=0, position_y=0)
+ else:
+ choice = common.ask_binary_input(prompt="\nKas soovid genereeritud andmekonveieri nifi platvormile paigaldada?(jah/ei): ",valikud=["jah","ei"]).strip().lower()
+ if choice == "jah":
+ token = nifi_utils.get_access_token()
+ nifi_utils.upload_nifi_pipeline(token, "pipelines/test_pipeline.json", "test_pipeline", username=config.NIFI_USER, password=config.NIFI_PASS, nifi_url=config.NIFI_HOST, position_x=0, position_y=0)
+
diff --git a/modules/nifi/nifi_utils.py b/modules/nifi/nifi_utils.py
index 34e6ad8..88953d6 100644
--- a/modules/nifi/nifi_utils.py
+++ b/modules/nifi/nifi_utils.py
@@ -1,166 +1,46 @@
-# import requests
-# import json
-#
-# def extract_snippet_fields(flow):
-# return {
-# key: flow.get(key, [])
-# for key in [
-# "processors",
-# "connections",
-# "funnels",
-# "inputPorts",
-# "outputPorts",
-# "labels",
-# "controllerServices",
-# "processGroups"
-# ]
-# }
-#
-# def upload_nifi_exported_flow(
-# nifi_host: str,
-# username: str,
-# password: str,
-# json_file_path: str,
-# verify_ssl: bool = False
-# ):
-# try:
-# token_resp = requests.post(
-# f"{nifi_host}/nifi-api/access/token",
-# data={"username": username, "password": password},
-# verify=verify_ssl
-# )
-# token_resp.raise_for_status()
-# token = token_resp.text
-#
-# headers = {
-# "Authorization": f"Bearer {token}",
-# "Content-Type": "application/json"
-# }
-#
-# root_resp = requests.get(f"{nifi_host}/nifi-api/flow/process-groups/root", headers=headers, verify=verify_ssl)
-# root_resp.raise_for_status()
-# root_pg_id = root_resp.json()["processGroupFlow"]["id"]
-#
-# with open(json_file_path, "r") as f:
-# raw = json.load(f)
-#
-# flow = raw.get("flowContents")
-# if not flow:
-# raise ValueError("Missing 'flowContents' in provided file.")
-#
-# snippet = extract_snippet_fields(flow)
-#
-# payload = {
-# "revision": { "version": 0 },
-# "component": {
-# "name": flow.get("name", "ImportedGroup"),
-# "position": { "x": 0.0, "y": 0.0 },
-# "flowSnippet": snippet
-# }
-# }
-#
-# url = f"{nifi_host}/nifi-api/process-groups/{root_pg_id}/process-groups"
-# resp = requests.post(url, headers=headers, json=payload, verify=verify_ssl)
-#
-# if resp.status_code == 201:
-# print("✅ Flow uploaded successfully!")
-# pg_id = resp.json()["component"]["id"]
-# print(f"🔗 View it at: {nifi_host}/nifi/#/process-groups/{root_pg_id}/process-group/{pg_id}")
-# else:
-# print(f"❌ Upload failed: {resp.status_code} - {resp.text}")
-#
-# except Exception as e:
-# print(f"🚨 Error: {e}")
-#
-#
-#
-#
-# #####################################################################################
-#
-#
-# def get_nifi_token(nifi_url, username, password, verify_ssl=False):
-# """
-# Get authentication token from NiFi.
-# """
-# token_url = f"{nifi_url}/access/token"
-# headers = {
-# "Content-Type": "application/x-www-form-urlencoded"
-# }
-# data = {
-# "username": username,
-# "password": password
-# }
-#
-# response = requests.post(token_url, headers=headers, data=data, verify=verify_ssl)
-#
-# if response.ok:
-# return response.text
-# else:
-# print(f"Failed to get token: {response.status_code}")
-# print(response.text)
-# response.raise_for_status()
-#
-# def get_root_process_group_id(nifi_url, token, verify_ssl=False):
-# """
-# Get the root process group ID from NiFi.
-# """
-# root_url = f"{nifi_url}/flow/process-groups/root"
-# headers = {
-# "Authorization": f"Bearer {token}"
-# }
-#
-# response = requests.get(root_url, headers=headers, verify=verify_ssl)
-#
-# if response.ok:
-# return response.json()["processGroupFlow"]["id"]
-# else:
-# print(f"Failed to get root process group ID: {response.status_code}")
-# print(response.text)
-# response.raise_for_status()
-#
-#
-# def upload_nifi_pipeline_nifi_2_1(file_path, nifi_url, username, password, process_group_id=None, verify_ssl=False):
-# """
-# Authenticate to NiFi 2.1+, fetch root process group ID, and upload the flow definition JSON.
-# """
-# token = get_nifi_token(nifi_url, username, password, verify_ssl)
-#
-# if not process_group_id:
-# process_group_id = get_root_process_group_id(nifi_url, token, verify_ssl)
-#
-# with open(file_path, 'r') as f:
-# pipeline_json = json.load(f)
-#
-# import_endpoint = f"{nifi_url}/versions/process-groups/{process_group_id}/import"
-# headers = {
-# "Content-Type": "application/json",
-# "Accept": "application/json",
-# "Authorization": f"Bearer {token}"
-# }
-#
-# response = requests.post(
-# import_endpoint,
-# headers=headers,
-# data=json.dumps(pipeline_json),
-# verify=verify_ssl
-# )
-#
-# if response.ok:
-# print("✅ Pipeline uploaded successfully (NiFi 2.1).")
-# return response.json()
-# else:
-# print(f"❌ Failed to upload pipeline: {response.status_code}")
-# print(response.text)
-# response.raise_for_status()
-#
-#
-#
-# #################### Testing ###################
-# upload_nifi_pipeline_nifi_2_1(
-# file_path="templates/basic_ETL.json",
-# nifi_url="https://127.0.0.1.nip.io/nifi-api",
-# username="lab08nifiuser",
-# password="tartunifi2023",
-# #process_group_id="5bb69687-0194-1000-ce98-931e7e192b3d",
-# verify_ssl=False # set to True if using trusted SSL certs
-# )
+import requests
+import config
+
+
+
+# export TOKEN=$(curl -k -X POST https://127.0.0.1.nip.io/nifi-api/access/token\
+# -H "Content-Type: application/x-www-form-urlencoded" -d 'username=lab08nifiuser&password=tartunifi2023')
+
+def get_access_token():
+ token_resp = requests.post(
+ f"{config.NIFI_HOST}/nifi-api/access/token",
+ headers={"Content-Type": "application/x-www-form-urlencoded"},
+ data={"username": config.NIFI_USER, "password": config.NIFI_PASS},
+ verify=False
+ )
+ token_resp.raise_for_status()
+ token = token_resp.text.strip()
+ return token
+
+
+# curl -sk -X POST "https://127.0.0.1.nip.io/nifi-api/process-groups/root/process-groups/upload" -H "Authorization: Bearer $TOKEN"\
+# -F "file=@pipelines/test_pipeline.json" -F "groupName=MyNewProcessGroup" -F "positionX=0" -F "positionY=0"
+# -F "disconnectedNodeAcknowledged=false" -F "clientId=unique-client-id-123"
+
+
+def upload_nifi_pipeline(token, pipeline_path, processorGroup_name, username=config.NIFI_USER, password=config.NIFI_PASS, nifi_url=config.NIFI_HOST, position_x=0, position_y=0, client_id = "unique-client-id-123"):
+
+ with open(pipeline_path, "r") as json_file:
+ files = {
+ "file": json_file,
+ "groupName": (None, processorGroup_name),
+ "positionX": (None, str(position_x)),
+ "positionY": (None, str(position_y)),
+ "disconnectedNodeAcknowledged": (None, "false"),
+ "clientId": (None, client_id)
+ }
+
+ upload_resp = requests.post(
+ f"{nifi_url}/nifi-api/process-groups/root/process-groups/upload",
+ headers={"Authorization": f"Bearer {token}"},
+ files=files,
+ verify=False
+ )
+
+ upload_resp.raise_for_status()
+ print(f"✅ Uploaded process group '{processorGroup_name}' successfully!")
diff --git a/modules/nifi/templates/basic_ETL.json b/modules/nifi/templates/basic_ETL.json
index 34a0297..076610a 100644
--- a/modules/nifi/templates/basic_ETL.json
+++ b/modules/nifi/templates/basic_ETL.json
@@ -28,7 +28,7 @@
},
"properties": {
"Regular Expression": "(?s)(^.*$)",
- "Replacement Value": "weather,city=\"Tartu\" Temperature=${temperature},Wind=${wind_speed}",
+ "Replacement Value": "placeholder",
"Evaluation Mode": "Entire text",
"Text to Prepend": null,
"Line-by-Line Evaluation Mode": "All",
@@ -147,7 +147,7 @@
"Socket Read Timeout": "15 secs",
"Socket Idle Connections": "5",
"Request Body Enabled": "true",
- "HTTP URL": "https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72&current=temperature_2m,wind_speed_10m",
+ "HTTP URL": "placeholder",
"Request OAuth2 Access Token Provider": null,
"Socket Idle Timeout": "5 mins",
"Response Redirects Enabled": "True",
@@ -457,10 +457,8 @@
"Destination": "flowfile-attribute",
"Max String Length": "20 MB",
"Return Type": "auto-detect",
- "latitude": "$.latitude",
"Null Value Representation": "empty string",
- "Path Not Found Behavior": "ignore",
- "longitude": "$.longitude"
+ "Path Not Found Behavior": "ignore"
},
"propertyDescriptors": {
"Destination": {