summaryrefslogtreecommitdiff
path: root/modules/nifi/nifi_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'modules/nifi/nifi_utils.py')
-rw-r--r--modules/nifi/nifi_utils.py248
1 files changed, 166 insertions, 82 deletions
diff --git a/modules/nifi/nifi_utils.py b/modules/nifi/nifi_utils.py
index 9994d61..34e6ad8 100644
--- a/modules/nifi/nifi_utils.py
+++ b/modules/nifi/nifi_utils.py
@@ -1,82 +1,166 @@
-import requests
-import json
-
-##############################
-
-def extract_snippet_fields(flow):
- return {
- key: flow.get(key, [])
- for key in [
- "processors",
- "connections",
- "funnels",
- "inputPorts",
- "outputPorts",
- "labels",
- "controllerServices",
- "processGroups"
- ]
- }
-
-def upload_nifi_exported_flow(
- nifi_host: str,
- username: str,
- password: str,
- json_file_path: str,
- verify_ssl: bool = False
-):
- try:
- token_resp = requests.post(
- f"{nifi_host}/nifi-api/access/token",
- data={"username": username, "password": password},
- verify=verify_ssl
- )
- token_resp.raise_for_status()
- token = token_resp.text
-
- headers = {
- "Authorization": f"Bearer {token}",
- "Content-Type": "application/json"
- }
-
- root_resp = requests.get(f"{nifi_host}/nifi-api/flow/process-groups/root", headers=headers, verify=verify_ssl)
- root_resp.raise_for_status()
- root_pg_id = root_resp.json()["processGroupFlow"]["id"]
-
- with open(json_file_path, "r") as f:
- raw = json.load(f)
-
- flow = raw.get("flowContents")
- if not flow:
- raise ValueError("Missing 'flowContents' in provided file.")
-
- snippet = extract_snippet_fields(flow)
-
- payload = {
- "revision": { "version": 0 },
- "component": {
- "name": flow.get("name", "ImportedGroup"),
- "position": { "x": 0.0, "y": 0.0 },
- "flowSnippet": snippet
- }
- }
-
- url = f"{nifi_host}/nifi-api/process-groups/{root_pg_id}/process-groups"
- resp = requests.post(url, headers=headers, json=payload, verify=verify_ssl)
-
- if resp.status_code == 201:
- print("✅ Flow uploaded successfully!")
- pg_id = resp.json()["component"]["id"]
- print(f"🔗 View it at: {nifi_host}/nifi/#/process-groups/{root_pg_id}/process-group/{pg_id}")
- else:
- print(f"❌ Upload failed: {resp.status_code} - {resp.text}")
-
- except Exception as e:
- print(f"🚨 Error: {e}")
-
-
-
-
-
-
-
+# import requests
+# import json
+#
+# def extract_snippet_fields(flow):
+# return {
+# key: flow.get(key, [])
+# for key in [
+# "processors",
+# "connections",
+# "funnels",
+# "inputPorts",
+# "outputPorts",
+# "labels",
+# "controllerServices",
+# "processGroups"
+# ]
+# }
+#
+# def upload_nifi_exported_flow(
+# nifi_host: str,
+# username: str,
+# password: str,
+# json_file_path: str,
+# verify_ssl: bool = False
+# ):
+# try:
+# token_resp = requests.post(
+# f"{nifi_host}/nifi-api/access/token",
+# data={"username": username, "password": password},
+# verify=verify_ssl
+# )
+# token_resp.raise_for_status()
+# token = token_resp.text
+#
+# headers = {
+# "Authorization": f"Bearer {token}",
+# "Content-Type": "application/json"
+# }
+#
+# root_resp = requests.get(f"{nifi_host}/nifi-api/flow/process-groups/root", headers=headers, verify=verify_ssl)
+# root_resp.raise_for_status()
+# root_pg_id = root_resp.json()["processGroupFlow"]["id"]
+#
+# with open(json_file_path, "r") as f:
+# raw = json.load(f)
+#
+# flow = raw.get("flowContents")
+# if not flow:
+# raise ValueError("Missing 'flowContents' in provided file.")
+#
+# snippet = extract_snippet_fields(flow)
+#
+# payload = {
+# "revision": { "version": 0 },
+# "component": {
+# "name": flow.get("name", "ImportedGroup"),
+# "position": { "x": 0.0, "y": 0.0 },
+# "flowSnippet": snippet
+# }
+# }
+#
+# url = f"{nifi_host}/nifi-api/process-groups/{root_pg_id}/process-groups"
+# resp = requests.post(url, headers=headers, json=payload, verify=verify_ssl)
+#
+# if resp.status_code == 201:
+# print("✅ Flow uploaded successfully!")
+# pg_id = resp.json()["component"]["id"]
+# print(f"🔗 View it at: {nifi_host}/nifi/#/process-groups/{root_pg_id}/process-group/{pg_id}")
+# else:
+# print(f"❌ Upload failed: {resp.status_code} - {resp.text}")
+#
+# except Exception as e:
+# print(f"🚨 Error: {e}")
+#
+#
+#
+#
+# #####################################################################################
+#
+#
+# def get_nifi_token(nifi_url, username, password, verify_ssl=False):
+# """
+# Get authentication token from NiFi.
+# """
+# token_url = f"{nifi_url}/access/token"
+# headers = {
+# "Content-Type": "application/x-www-form-urlencoded"
+# }
+# data = {
+# "username": username,
+# "password": password
+# }
+#
+# response = requests.post(token_url, headers=headers, data=data, verify=verify_ssl)
+#
+# if response.ok:
+# return response.text
+# else:
+# print(f"Failed to get token: {response.status_code}")
+# print(response.text)
+# response.raise_for_status()
+#
+# def get_root_process_group_id(nifi_url, token, verify_ssl=False):
+# """
+# Get the root process group ID from NiFi.
+# """
+# root_url = f"{nifi_url}/flow/process-groups/root"
+# headers = {
+# "Authorization": f"Bearer {token}"
+# }
+#
+# response = requests.get(root_url, headers=headers, verify=verify_ssl)
+#
+# if response.ok:
+# return response.json()["processGroupFlow"]["id"]
+# else:
+# print(f"Failed to get root process group ID: {response.status_code}")
+# print(response.text)
+# response.raise_for_status()
+#
+#
+# def upload_nifi_pipeline_nifi_2_1(file_path, nifi_url, username, password, process_group_id=None, verify_ssl=False):
+# """
+# Authenticate to NiFi 2.1+, fetch root process group ID, and upload the flow definition JSON.
+# """
+# token = get_nifi_token(nifi_url, username, password, verify_ssl)
+#
+# if not process_group_id:
+# process_group_id = get_root_process_group_id(nifi_url, token, verify_ssl)
+#
+# with open(file_path, 'r') as f:
+# pipeline_json = json.load(f)
+#
+# import_endpoint = f"{nifi_url}/versions/process-groups/{process_group_id}/import"
+# headers = {
+# "Content-Type": "application/json",
+# "Accept": "application/json",
+# "Authorization": f"Bearer {token}"
+# }
+#
+# response = requests.post(
+# import_endpoint,
+# headers=headers,
+# data=json.dumps(pipeline_json),
+# verify=verify_ssl
+# )
+#
+# if response.ok:
+# print("✅ Pipeline uploaded successfully (NiFi 2.1).")
+# return response.json()
+# else:
+# print(f"❌ Failed to upload pipeline: {response.status_code}")
+# print(response.text)
+# response.raise_for_status()
+#
+#
+#
+# #################### Testing ###################
+# upload_nifi_pipeline_nifi_2_1(
+# file_path="templates/basic_ETL.json",
+# nifi_url="https://127.0.0.1.nip.io/nifi-api",
+# username="lab08nifiuser",
+# password="tartunifi2023",
+# #process_group_id="5bb69687-0194-1000-ce98-931e7e192b3d",
+# verify_ssl=False # set to True if using trusted SSL certs
+# )