diff options
author | Rasmus Luha <rasmus.luha@ut.ee> | 2025-04-21 21:54:58 +0300 |
---|---|---|
committer | Rasmus Luha <rasmus.luha@ut.ee> | 2025-04-21 21:54:58 +0300 |
commit | 0190bb2cf61cf36906201445cbecc4a2b6b22af1 (patch) | |
tree | c4c0056b8473c11db7e0959ce601ee79c46862ef | |
parent | 5bbcd5b6bc8340eedb7e8dcd88e156a0987d42f6 (diff) |
add more config vars and option to opt in and out of interactive mode, other minor fixes
-rw-r--r-- | common/core.py | 7 | ||||
-rw-r--r-- | config.py | 18 | ||||
-rw-r--r-- | modules/nifi/core.py | 38 | ||||
-rw-r--r-- | modules/nifi/nifi_utils.py | 248 | ||||
-rw-r--r-- | modules/nifi/templates/protsessorite_järjekorrad.txt | 18 |
5 files changed, 233 insertions, 96 deletions
diff --git a/common/core.py b/common/core.py index 3d4fac1..6160ee3 100644 --- a/common/core.py +++ b/common/core.py @@ -96,6 +96,7 @@ def inspect_json_top_level(json_data): def inspect_json_top_level_test(json_data, has_list=False): path = "" + last_key = "value" #Placeholder while True: print(json.dumps(json_data, indent=2)) @@ -113,6 +114,7 @@ def inspect_json_top_level_test(json_data, has_list=False): selected_key = keys[selected_index] selected_value = json_data[selected_key] path += "." + selected_key + last_key = selected_key elif isinstance(json_data, list): has_list = True @@ -123,11 +125,12 @@ def inspect_json_top_level_test(json_data, has_list=False): selected_index = ask_digit_input(len(json_data) - 1) selected_value = json_data[selected_index] path += f"[{selected_index}]" + last_key = str(selected_index) else: # Primitive value, nothing to dive into print(f"\nLõppväärtus: {json_data}") - return {"value": path} + return {last_key: path} @@ -136,5 +139,5 @@ def inspect_json_top_level_test(json_data, has_list=False): else: #print(f"\nValitud väärtus: '{selected_value}'") print(f"\nValitud väärtus: '{path}'") - return {"value": path} + return {last_key: path} @@ -1,3 +1,5 @@ +INTERACTIVE_MODE=True + ## Nifi NIFI_USER="lab08nifiuser" NIFI_PASS="tartunifi2023" @@ -5,6 +7,8 @@ NIFI_PASS="tartunifi2023" NIFI_HOST="https://127.0.0.1.nip.io" NIFI_DEPLOY=False +NIFI_MEASUREMENT_NAME="test_measurementName" + ## Database DB_URL="http://influxdb:8086/write?db=nifi_weatherData" @@ -12,3 +16,17 @@ DB_USER="admin" ## TODO - somehow must be hidden inside the pipeline in the end DB_PASS="admin" + + + +############################### + + + +## Optional +API_URL="https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72&current_weather=true" +API_FIELDS={'temperature': '.current_weather.temperature', 'windspeed': '.current_weather.windspeed'} +API_URL_USERNAME="TODO" +API_URL_PASSWORD="TODO" +PIPELINE_SCHEDULING_PERIOD="5 sec" +PIPELINE_NAME="test_pipeline.json" 
diff --git a/modules/nifi/core.py b/modules/nifi/core.py index fccbf91..e4ab1f2 100644 --- a/modules/nifi/core.py +++ b/modules/nifi/core.py @@ -87,6 +87,7 @@ def get_data_values(): while True: chosen_json_values.update(common.inspect_json_top_level_test(json_data)) + ## Testing print("Oled hetkel valinud järgmised väärtused JSON lõppväärtused: ", ", ".join(chosen_json_values)) choose_another = common.ask_binary_input(prompt="\nKas soovid (v)alida veel mõne väärtuse või liikuda (e)dasi?(v/e): ",valikud=["v","e"]).strip().lower() @@ -102,10 +103,9 @@ def get_data_values(): ## TODO - textReplace part -> fix templates +def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name): + ############### Choosing and modfyfing Template ############## -def build_pipeline(): - data_values = get_data_values() - ### Check if splitJson template needed needs_SplitJson = False path_parts = [] @@ -118,8 +118,7 @@ def build_pipeline(): ### Select template - ## TODO - unhardcoded template usage - new_pipeline_name="test_pipeline.json" + ## TODO - currently has only 2 templates... if needs_SplitJson: template_name="splitJsonETL.json" else: @@ -131,8 +130,7 @@ def build_pipeline(): ### Processor editing - ## Measurements setup - TODO: hardcoded. 
- measurements_name = "test_measurementName, " + measurements_name = config.NIFI_MEASUREMENT_NAME+" " if needs_SplitJson: ## SplitJson update @@ -143,7 +141,7 @@ def build_pipeline(): for key, value in data_values.items() : path_parts = value.split(']') update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1]) - measurements_name+=f"{key}={{{key}}}" + measurements_name+=f",{key}=${{{key}}}" ## Database Setup set_database_credentials(new_pipeline_path, "flowContents.processors[4].properties") @@ -151,19 +149,35 @@ def build_pipeline(): ## EvaluateJsonPath processor setup for key, value in data_values.items() : update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value) - measurements_name+=f"{key}={{{key}}}" + measurements_name+=f",{key}=${{{key}}}" ## Database Setup set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties") - - ##ReplaceText update + ## ReplaceText processor update - making it compatible for timeseries database (influxDB) update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", measurements_name) + ## Update scheduling Periond on API Calls + update_template(new_pipeline_path, "flowContents.processors[1]", "schedulingPeriod", schedulingPeriod) + +############################################### + +def build_pipeline(): + data_values = {} + if config.INTERACTIVE_MODE: + data_values = get_data_values() + print("\nKui tihti peaks andmekonveier jooksma? (sekundites)") + schedulingPeriod = str(common.ask_digit_input(86400))+ "sec" + new_pipeline_name=input("Mis saab andmekonveieri nimeks: ")+".json" + else: + data_values = config.API_FIELDS + schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD + new_pipeline_name=config.PIPELINE_NAME - print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub siin: {new_pipeline_path}.") + modify_all_processors(data_values, schedulingPeriod, new_pipeline_name) + print(f"✅✅✅ Valmis. 
Uus genereeritud andmekoveier asub kaustas 'pipelines'") ## Pipeline Deployment diff --git a/modules/nifi/nifi_utils.py b/modules/nifi/nifi_utils.py index 9994d61..34e6ad8 100644 --- a/modules/nifi/nifi_utils.py +++ b/modules/nifi/nifi_utils.py @@ -1,82 +1,166 @@ -import requests -import json - -############################## - -def extract_snippet_fields(flow): - return { - key: flow.get(key, []) - for key in [ - "processors", - "connections", - "funnels", - "inputPorts", - "outputPorts", - "labels", - "controllerServices", - "processGroups" - ] - } - -def upload_nifi_exported_flow( - nifi_host: str, - username: str, - password: str, - json_file_path: str, - verify_ssl: bool = False -): - try: - token_resp = requests.post( - f"{nifi_host}/nifi-api/access/token", - data={"username": username, "password": password}, - verify=verify_ssl - ) - token_resp.raise_for_status() - token = token_resp.text - - headers = { - "Authorization": f"Bearer {token}", - "Content-Type": "application/json" - } - - root_resp = requests.get(f"{nifi_host}/nifi-api/flow/process-groups/root", headers=headers, verify=verify_ssl) - root_resp.raise_for_status() - root_pg_id = root_resp.json()["processGroupFlow"]["id"] - - with open(json_file_path, "r") as f: - raw = json.load(f) - - flow = raw.get("flowContents") - if not flow: - raise ValueError("Missing 'flowContents' in provided file.") - - snippet = extract_snippet_fields(flow) - - payload = { - "revision": { "version": 0 }, - "component": { - "name": flow.get("name", "ImportedGroup"), - "position": { "x": 0.0, "y": 0.0 }, - "flowSnippet": snippet - } - } - - url = f"{nifi_host}/nifi-api/process-groups/{root_pg_id}/process-groups" - resp = requests.post(url, headers=headers, json=payload, verify=verify_ssl) - - if resp.status_code == 201: - print("✅ Flow uploaded successfully!") - pg_id = resp.json()["component"]["id"] - print(f"🔗 View it at: {nifi_host}/nifi/#/process-groups/{root_pg_id}/process-group/{pg_id}") - else: - print(f"❌ Upload 
failed: {resp.status_code} - {resp.text}") - - except Exception as e: - print(f"🚨 Error: {e}") - - - - - - - +# import requests +# import json +# +# def extract_snippet_fields(flow): +# return { +# key: flow.get(key, []) +# for key in [ +# "processors", +# "connections", +# "funnels", +# "inputPorts", +# "outputPorts", +# "labels", +# "controllerServices", +# "processGroups" +# ] +# } +# +# def upload_nifi_exported_flow( +# nifi_host: str, +# username: str, +# password: str, +# json_file_path: str, +# verify_ssl: bool = False +# ): +# try: +# token_resp = requests.post( +# f"{nifi_host}/nifi-api/access/token", +# data={"username": username, "password": password}, +# verify=verify_ssl +# ) +# token_resp.raise_for_status() +# token = token_resp.text +# +# headers = { +# "Authorization": f"Bearer {token}", +# "Content-Type": "application/json" +# } +# +# root_resp = requests.get(f"{nifi_host}/nifi-api/flow/process-groups/root", headers=headers, verify=verify_ssl) +# root_resp.raise_for_status() +# root_pg_id = root_resp.json()["processGroupFlow"]["id"] +# +# with open(json_file_path, "r") as f: +# raw = json.load(f) +# +# flow = raw.get("flowContents") +# if not flow: +# raise ValueError("Missing 'flowContents' in provided file.") +# +# snippet = extract_snippet_fields(flow) +# +# payload = { +# "revision": { "version": 0 }, +# "component": { +# "name": flow.get("name", "ImportedGroup"), +# "position": { "x": 0.0, "y": 0.0 }, +# "flowSnippet": snippet +# } +# } +# +# url = f"{nifi_host}/nifi-api/process-groups/{root_pg_id}/process-groups" +# resp = requests.post(url, headers=headers, json=payload, verify=verify_ssl) +# +# if resp.status_code == 201: +# print("✅ Flow uploaded successfully!") +# pg_id = resp.json()["component"]["id"] +# print(f"🔗 View it at: {nifi_host}/nifi/#/process-groups/{root_pg_id}/process-group/{pg_id}") +# else: +# print(f"❌ Upload failed: {resp.status_code} - {resp.text}") +# +# except Exception as e: +# print(f"🚨 Error: {e}") +# +# +# +# +# 
##################################################################################### +# +# +# def get_nifi_token(nifi_url, username, password, verify_ssl=False): +# """ +# Get authentication token from NiFi. +# """ +# token_url = f"{nifi_url}/access/token" +# headers = { +# "Content-Type": "application/x-www-form-urlencoded" +# } +# data = { +# "username": username, +# "password": password +# } +# +# response = requests.post(token_url, headers=headers, data=data, verify=verify_ssl) +# +# if response.ok: +# return response.text +# else: +# print(f"Failed to get token: {response.status_code}") +# print(response.text) +# response.raise_for_status() +# +# def get_root_process_group_id(nifi_url, token, verify_ssl=False): +# """ +# Get the root process group ID from NiFi. +# """ +# root_url = f"{nifi_url}/flow/process-groups/root" +# headers = { +# "Authorization": f"Bearer {token}" +# } +# +# response = requests.get(root_url, headers=headers, verify=verify_ssl) +# +# if response.ok: +# return response.json()["processGroupFlow"]["id"] +# else: +# print(f"Failed to get root process group ID: {response.status_code}") +# print(response.text) +# response.raise_for_status() +# +# +# def upload_nifi_pipeline_nifi_2_1(file_path, nifi_url, username, password, process_group_id=None, verify_ssl=False): +# """ +# Authenticate to NiFi 2.1+, fetch root process group ID, and upload the flow definition JSON. 
+# """ +# token = get_nifi_token(nifi_url, username, password, verify_ssl) +# +# if not process_group_id: +# process_group_id = get_root_process_group_id(nifi_url, token, verify_ssl) +# +# with open(file_path, 'r') as f: +# pipeline_json = json.load(f) +# +# import_endpoint = f"{nifi_url}/versions/process-groups/{process_group_id}/import" +# headers = { +# "Content-Type": "application/json", +# "Accept": "application/json", +# "Authorization": f"Bearer {token}" +# } +# +# response = requests.post( +# import_endpoint, +# headers=headers, +# data=json.dumps(pipeline_json), +# verify=verify_ssl +# ) +# +# if response.ok: +# print("✅ Pipeline uploaded successfully (NiFi 2.1).") +# return response.json() +# else: +# print(f"❌ Failed to upload pipeline: {response.status_code}") +# print(response.text) +# response.raise_for_status() +# +# +# +# #################### Testing ################### +# upload_nifi_pipeline_nifi_2_1( +# file_path="templates/basic_ETL.json", +# nifi_url="https://127.0.0.1.nip.io/nifi-api", +# username="lab08nifiuser", +# password="tartunifi2023", +# #process_group_id="5bb69687-0194-1000-ce98-931e7e192b3d", +# verify_ssl=False # set to True if using trusted SSL certs +# ) diff --git a/modules/nifi/templates/protsessorite_järjekorrad.txt b/modules/nifi/templates/protsessorite_järjekorrad.txt new file mode 100644 index 0000000..fe610fc --- /dev/null +++ b/modules/nifi/templates/protsessorite_järjekorrad.txt @@ -0,0 +1,18 @@ +basic_ETL.json +-------------- +0: ReplaceText +1: InvokeHTTP in +2: EvaluateJsonPath +3: InvokeHTTP Out + + + + +splitJsonETL.json +----------------- + +0: ReplaceText +1: InvokeHTTP in +2: EvaluateJsonPath +3: SplitJson +4: InvokeHTTP Out |