summaryrefslogtreecommitdiff
path: root/modules/nifi/nifi_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'modules/nifi/nifi_utils.py')
-rw-r--r--modules/nifi/nifi_utils.py212
1 files changed, 46 insertions, 166 deletions
diff --git a/modules/nifi/nifi_utils.py b/modules/nifi/nifi_utils.py
index 34e6ad8..88953d6 100644
--- a/modules/nifi/nifi_utils.py
+++ b/modules/nifi/nifi_utils.py
@@ -1,166 +1,46 @@
-# import requests
-# import json
-#
-# def extract_snippet_fields(flow):
-# return {
-# key: flow.get(key, [])
-# for key in [
-# "processors",
-# "connections",
-# "funnels",
-# "inputPorts",
-# "outputPorts",
-# "labels",
-# "controllerServices",
-# "processGroups"
-# ]
-# }
-#
-# def upload_nifi_exported_flow(
-# nifi_host: str,
-# username: str,
-# password: str,
-# json_file_path: str,
-# verify_ssl: bool = False
-# ):
-# try:
-# token_resp = requests.post(
-# f"{nifi_host}/nifi-api/access/token",
-# data={"username": username, "password": password},
-# verify=verify_ssl
-# )
-# token_resp.raise_for_status()
-# token = token_resp.text
-#
-# headers = {
-# "Authorization": f"Bearer {token}",
-# "Content-Type": "application/json"
-# }
-#
-# root_resp = requests.get(f"{nifi_host}/nifi-api/flow/process-groups/root", headers=headers, verify=verify_ssl)
-# root_resp.raise_for_status()
-# root_pg_id = root_resp.json()["processGroupFlow"]["id"]
-#
-# with open(json_file_path, "r") as f:
-# raw = json.load(f)
-#
-# flow = raw.get("flowContents")
-# if not flow:
-# raise ValueError("Missing 'flowContents' in provided file.")
-#
-# snippet = extract_snippet_fields(flow)
-#
-# payload = {
-# "revision": { "version": 0 },
-# "component": {
-# "name": flow.get("name", "ImportedGroup"),
-# "position": { "x": 0.0, "y": 0.0 },
-# "flowSnippet": snippet
-# }
-# }
-#
-# url = f"{nifi_host}/nifi-api/process-groups/{root_pg_id}/process-groups"
-# resp = requests.post(url, headers=headers, json=payload, verify=verify_ssl)
-#
-# if resp.status_code == 201:
-# print("✅ Flow uploaded successfully!")
-# pg_id = resp.json()["component"]["id"]
-# print(f"🔗 View it at: {nifi_host}/nifi/#/process-groups/{root_pg_id}/process-group/{pg_id}")
-# else:
-# print(f"❌ Upload failed: {resp.status_code} - {resp.text}")
-#
-# except Exception as e:
-# print(f"🚨 Error: {e}")
-#
-#
-#
-#
-# #####################################################################################
-#
-#
-# def get_nifi_token(nifi_url, username, password, verify_ssl=False):
-# """
-# Get authentication token from NiFi.
-# """
-# token_url = f"{nifi_url}/access/token"
-# headers = {
-# "Content-Type": "application/x-www-form-urlencoded"
-# }
-# data = {
-# "username": username,
-# "password": password
-# }
-#
-# response = requests.post(token_url, headers=headers, data=data, verify=verify_ssl)
-#
-# if response.ok:
-# return response.text
-# else:
-# print(f"Failed to get token: {response.status_code}")
-# print(response.text)
-# response.raise_for_status()
-#
-# def get_root_process_group_id(nifi_url, token, verify_ssl=False):
-# """
-# Get the root process group ID from NiFi.
-# """
-# root_url = f"{nifi_url}/flow/process-groups/root"
-# headers = {
-# "Authorization": f"Bearer {token}"
-# }
-#
-# response = requests.get(root_url, headers=headers, verify=verify_ssl)
-#
-# if response.ok:
-# return response.json()["processGroupFlow"]["id"]
-# else:
-# print(f"Failed to get root process group ID: {response.status_code}")
-# print(response.text)
-# response.raise_for_status()
-#
-#
-# def upload_nifi_pipeline_nifi_2_1(file_path, nifi_url, username, password, process_group_id=None, verify_ssl=False):
-# """
-# Authenticate to NiFi 2.1+, fetch root process group ID, and upload the flow definition JSON.
-# """
-# token = get_nifi_token(nifi_url, username, password, verify_ssl)
-#
-# if not process_group_id:
-# process_group_id = get_root_process_group_id(nifi_url, token, verify_ssl)
-#
-# with open(file_path, 'r') as f:
-# pipeline_json = json.load(f)
-#
-# import_endpoint = f"{nifi_url}/versions/process-groups/{process_group_id}/import"
-# headers = {
-# "Content-Type": "application/json",
-# "Accept": "application/json",
-# "Authorization": f"Bearer {token}"
-# }
-#
-# response = requests.post(
-# import_endpoint,
-# headers=headers,
-# data=json.dumps(pipeline_json),
-# verify=verify_ssl
-# )
-#
-# if response.ok:
-# print("✅ Pipeline uploaded successfully (NiFi 2.1).")
-# return response.json()
-# else:
-# print(f"❌ Failed to upload pipeline: {response.status_code}")
-# print(response.text)
-# response.raise_for_status()
-#
-#
-#
-# #################### Testing ###################
-# upload_nifi_pipeline_nifi_2_1(
-# file_path="templates/basic_ETL.json",
-# nifi_url="https://127.0.0.1.nip.io/nifi-api",
-# username="lab08nifiuser",
-# password="tartunifi2023",
-# #process_group_id="5bb69687-0194-1000-ce98-931e7e192b3d",
-# verify_ssl=False # set to True if using trusted SSL certs
-# )
+import requests
+import config
+
+
+
+# export TOKEN=$(curl -k -X POST https://127.0.0.1.nip.io/nifi-api/access/token\
+# -H "Content-Type: application/x-www-form-urlencoded" -d 'username=lab08nifiuser&password=tartunifi2023')
+
+def get_access_token():
+ token_resp = requests.post(
+ f"{config.NIFI_HOST}/nifi-api/access/token",
+ headers={"Content-Type": "application/x-www-form-urlencoded"},
+ data={"username": config.NIFI_USER, "password": config.NIFI_PASS},
+ verify=False
+ )
+ token_resp.raise_for_status()
+ token = token_resp.text.strip()
+ return token
+
+
+# curl -sk -X POST "https://127.0.0.1.nip.io/nifi-api/process-groups/root/process-groups/upload" -H "Authorization: Bearer $TOKEN"\
+# -F "file=@pipelines/test_pipeline.json" -F "groupName=MyNewProcessGroup" -F "positionX=0" -F "positionY=0"
+# -F "disconnectedNodeAcknowledged=false" -F "clientId=unique-client-id-123"
+
+
+def upload_nifi_pipeline(token, pipeline_path, processorGroup_name, username=config.NIFI_USER, password=config.NIFI_PASS, nifi_url=config.NIFI_HOST, position_x=0, position_y=0, client_id = "unique-client-id-123"):
+
+ with open(pipeline_path, "r") as json_file:
+ files = {
+ "file": json_file,
+ "groupName": (None, processorGroup_name),
+ "positionX": (None, str(position_x)),
+ "positionY": (None, str(position_y)),
+ "disconnectedNodeAcknowledged": (None, "false"),
+ "clientId": (None, client_id)
+ }
+
+ upload_resp = requests.post(
+ f"{nifi_url}/nifi-api/process-groups/root/process-groups/upload",
+ headers={"Authorization": f"Bearer {token}"},
+ files=files,
+ verify=False
+ )
+
+ upload_resp.raise_for_status()
+ print(f"✅ Uploaded process group '{processorGroup_name}' successfully!")