From 439aaa998c0a9b0defe6fa54e58c22f8420f00b7 Mon Sep 17 00:00:00 2001 From: Rasmus Luha Date: Fri, 11 Apr 2025 00:23:57 +0300 Subject: add json list items support, start nifi platform upload process --- modules/nifi/core.py | 53 ++++++++++++++++++++---------- modules/nifi/nifi_utils.py | 82 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 118 insertions(+), 17 deletions(-) create mode 100644 modules/nifi/nifi_utils.py (limited to 'modules/nifi') diff --git a/modules/nifi/core.py b/modules/nifi/core.py index 50373a8..6746bc6 100644 --- a/modules/nifi/core.py +++ b/modules/nifi/core.py @@ -1,10 +1,14 @@ from pyfiglet import figlet_format from rich.console import Console from common import core as common +import config as config ## TODO - check syntax +#from modules.nifi import nifi_utils as nifi_utils + import sys import json import shutil +import requests def introduction(): @@ -14,7 +18,6 @@ def introduction(): print("Valisid Nifi Platformi!\n") -## TODO def update_template(file_path, dot_path, new_key, new_value): # Step 2: Load the copied JSON @@ -43,14 +46,19 @@ def update_template(file_path, dot_path, new_key, new_value): print("✅ Changes saved.") -### Example Usage ### -# copy_and_modify_json( -# "template.json", -# "pipeline_copy.json", -# "flowContents.processors[1].properties", -# "New Config Key", -# "New Config Value" -# ) + + +def set_database_credentials(file_path,dot_path): + ## Update URL + update_template(file_path, dot_path, "HTTP URL", config.DB_URL) + + ## Update username + update_template(file_path, dot_path, "username", config.DB_USER) + + ## Update username + update_template(file_path, dot_path, "password", config.DB_PASS) + + @@ -76,8 +84,8 @@ def get_data_values(): if api_url_correct: while True: - chosen_json_values.update(common.inspect_json_top_level(json_data)) - print("Oled hetkel valinud järgmised väärtused:", ", ".join(chosen_json_values)) + chosen_json_values.update(common.inspect_json_top_level_test(json_data)) + print("Oled hetkel valinud järgmised väärtused JSON lõppväärtused: ", ", ".join(chosen_json_values)) choose_another = common.ask_binary_input(prompt="\nKas soovid (v)alida veel mõne väärtuse või liikuda (e)dasi?(v/e): ",valikud=["v","e"]).strip().lower() if choose_another == 'e': @@ -88,17 +96,28 @@ def get_data_values(): print("Väljun programmist.") sys.exit() + + +def update_template_with_json_list(): + update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value) + +## TODO - textReplace part -> fix templates def build_pipeline(): data_values = get_data_values() - ## TODO - shutil.copy("modules/nifi/templates/basic_ETL.json", "pipelines/test_pipeline.json") - - ## TODO + ## TODO - unhardcode + new_pipeline_path = "pipelines/test_pipeline.json" + shutil.copy("modules/nifi/templates/basic_ETL.json", new_pipeline_path) + ## TODO - make a function for different types ... etc for key, value in data_values.items() : - #print (key, value) - update_template("pipelines/test_pipeline.json", "flowContents.processors[2].properties", key, value) + update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value) + + set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties") + print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub siin: {new_pipeline_path}.") + ## TODO - not working + #nifi_utils.upload_nifi_exported_flow( nifi_host="https://127.0.0.1.nip.io", username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False) + diff --git a/modules/nifi/nifi_utils.py b/modules/nifi/nifi_utils.py new file mode 100644 index 0000000..9994d61 --- /dev/null +++ b/modules/nifi/nifi_utils.py @@ -0,0 +1,82 @@ +import requests +import json + +############################## + +def extract_snippet_fields(flow): + return { + key: flow.get(key, []) + for key in [ + "processors", + "connections", + "funnels", + "inputPorts", + "outputPorts", + "labels", + "controllerServices", + "processGroups" + ] + } + +def upload_nifi_exported_flow( + nifi_host: str, + username: str, + password: str, + json_file_path: str, + verify_ssl: bool = False +): + try: + token_resp = requests.post( + f"{nifi_host}/nifi-api/access/token", + data={"username": username, "password": password}, + verify=verify_ssl + ) + token_resp.raise_for_status() + token = token_resp.text + + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json" + } + + root_resp = requests.get(f"{nifi_host}/nifi-api/flow/process-groups/root", headers=headers, verify=verify_ssl) + root_resp.raise_for_status() + root_pg_id = root_resp.json()["processGroupFlow"]["id"] + + with open(json_file_path, "r") as f: + raw = json.load(f) + + flow = raw.get("flowContents") + if not flow: + raise ValueError("Missing 'flowContents' in provided file.") + + snippet = extract_snippet_fields(flow) + + payload = { + "revision": { "version": 0 }, + "component": { + "name": flow.get("name", "ImportedGroup"), + "position": { "x": 0.0, "y": 0.0 }, + "flowSnippet": snippet + } + } + + url = f"{nifi_host}/nifi-api/process-groups/{root_pg_id}/process-groups" + resp = requests.post(url, headers=headers, json=payload, verify=verify_ssl) + + if resp.status_code == 201: + print("✅ Flow uploaded successfully!") + pg_id = resp.json()["component"]["id"] + print(f"🔗 View it at: {nifi_host}/nifi/#/process-groups/{root_pg_id}/process-group/{pg_id}") + else: + print(f"❌ Upload failed: {resp.status_code} - {resp.text}") + + except Exception as e: + print(f"🚨 Error: {e}") + + + + + + + -- cgit v1.2.3