summaryrefslogtreecommitdiff
path: root/modules/nifi/nifi_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'modules/nifi/nifi_utils.py')
-rw-r--r--modules/nifi/nifi_utils.py82
1 files changed, 82 insertions, 0 deletions
diff --git a/modules/nifi/nifi_utils.py b/modules/nifi/nifi_utils.py
new file mode 100644
index 0000000..9994d61
--- /dev/null
+++ b/modules/nifi/nifi_utils.py
@@ -0,0 +1,82 @@
+import requests
+import json
+
+##############################
+
+def extract_snippet_fields(flow):
+ return {
+ key: flow.get(key, [])
+ for key in [
+ "processors",
+ "connections",
+ "funnels",
+ "inputPorts",
+ "outputPorts",
+ "labels",
+ "controllerServices",
+ "processGroups"
+ ]
+ }
+
+def upload_nifi_exported_flow(
+ nifi_host: str,
+ username: str,
+ password: str,
+ json_file_path: str,
+ verify_ssl: bool = False
+):
+ try:
+ token_resp = requests.post(
+ f"{nifi_host}/nifi-api/access/token",
+ data={"username": username, "password": password},
+ verify=verify_ssl
+ )
+ token_resp.raise_for_status()
+ token = token_resp.text
+
+ headers = {
+ "Authorization": f"Bearer {token}",
+ "Content-Type": "application/json"
+ }
+
+ root_resp = requests.get(f"{nifi_host}/nifi-api/flow/process-groups/root", headers=headers, verify=verify_ssl)
+ root_resp.raise_for_status()
+ root_pg_id = root_resp.json()["processGroupFlow"]["id"]
+
+ with open(json_file_path, "r") as f:
+ raw = json.load(f)
+
+ flow = raw.get("flowContents")
+ if not flow:
+ raise ValueError("Missing 'flowContents' in provided file.")
+
+ snippet = extract_snippet_fields(flow)
+
+ payload = {
+ "revision": { "version": 0 },
+ "component": {
+ "name": flow.get("name", "ImportedGroup"),
+ "position": { "x": 0.0, "y": 0.0 },
+ "flowSnippet": snippet
+ }
+ }
+
+ url = f"{nifi_host}/nifi-api/process-groups/{root_pg_id}/process-groups"
+ resp = requests.post(url, headers=headers, json=payload, verify=verify_ssl)
+
+ if resp.status_code == 201:
+ print("✅ Flow uploaded successfully!")
+ pg_id = resp.json()["component"]["id"]
+ print(f"🔗 View it at: {nifi_host}/nifi/#/process-groups/{root_pg_id}/process-group/{pg_id}")
+ else:
+ print(f"❌ Upload failed: {resp.status_code} - {resp.text}")
+
+ except Exception as e:
+ print(f"🚨 Error: {e}")
+
+
+
+
+
+
+