diff options
-rw-r--r-- | config.py | 8 | ||||
-rw-r--r-- | modules/nifi/core.py | 14 | ||||
-rw-r--r-- | modules/nifi/nifi_utils.py | 212 | ||||
-rw-r--r-- | modules/nifi/templates/basic_ETL.json | 8 |
4 files changed, 63 insertions, 179 deletions
@@ -1,4 +1,4 @@ -INTERACTIVE_MODE=True +INTERACTIVE_MODE=False ## Nifi NIFI_USER="lab08nifiuser" @@ -13,8 +13,6 @@ NIFI_MEASUREMENT_NAME="test_measurementName" ## Database DB_URL="http://influxdb:8086/write?db=nifi_weatherData" DB_USER="admin" - -## TODO - somehow must be hidden inside the pipeline in the end DB_PASS="admin" @@ -26,7 +24,7 @@ DB_PASS="admin" ## Needed if Interactive mode turned off API_URL="https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72¤t_weather=true" API_FIELDS={'temperature': '.current_weather.temperature', 'windspeed': '.current_weather.windspeed'} -API_USERNAME="TODO" -API_PASSWORD="TODO" +API_USERNAME="Placeholder" +API_PASSWORD="Placehoder" PIPELINE_SCHEDULING_PERIOD="5 sec" PIPELINE_NAME="test_pipeline.json" diff --git a/modules/nifi/core.py b/modules/nifi/core.py index 24489e4..4b7f748 100644 --- a/modules/nifi/core.py +++ b/modules/nifi/core.py @@ -1,7 +1,7 @@ ## TODO - check syntax from common import core as common import config as config -from modules.nifi import nifi_utils as nifi_utils +from modules.nifi import nifi_utils from pyfiglet import figlet_format @@ -195,7 +195,15 @@ def build_pipeline(): modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url, api_username, api_password) print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier nimega '{new_pipeline_name}' asub kaustas 'pipelines'.") + + ## Pipeline Deployment if (config.NIFI_DEPLOY): - nifi_utils.upload_nifi_exported_flow( nifi_host=config.NIFI_HOST, username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False) - print("Andmekonveier on deploytud - TODO") + token = nifi_utils.get_access_token() + nifi_utils.upload_nifi_pipeline(token, "pipelines/test_pipeline.json", "test_pipeline", username=config.NIFI_USER, password=config.NIFI_PASS, nifi_url=config.NIFI_HOST, position_x=0, position_y=0) + else: + choice = common.ask_binary_input(prompt="\nKas soovid genereeritud andmekonveieri nifi platvormile paigaldada?(jah/ei): ",valikud=["jah","ei"]).strip().lower() + if choice == "jah": + token = nifi_utils.get_access_token() + nifi_utils.upload_nifi_pipeline(token, "pipelines/test_pipeline.json", "test_pipeline", username=config.NIFI_USER, password=config.NIFI_PASS, nifi_url=config.NIFI_HOST, position_x=0, position_y=0) + diff --git a/modules/nifi/nifi_utils.py b/modules/nifi/nifi_utils.py index 34e6ad8..88953d6 100644 --- a/modules/nifi/nifi_utils.py +++ b/modules/nifi/nifi_utils.py @@ -1,166 +1,46 @@ -# import requests -# import json -# -# def extract_snippet_fields(flow): -# return { -# key: flow.get(key, []) -# for key in [ -# "processors", -# "connections", -# "funnels", -# "inputPorts", -# "outputPorts", -# "labels", -# "controllerServices", -# "processGroups" -# ] -# } -# -# def upload_nifi_exported_flow( -# nifi_host: str, -# username: str, -# password: str, -# json_file_path: str, -# verify_ssl: bool = False -# ): -# try: -# token_resp = requests.post( -# f"{nifi_host}/nifi-api/access/token", -# data={"username": username, "password": password}, -# verify=verify_ssl -# ) -# token_resp.raise_for_status() -# token = token_resp.text -# -# headers = { -# "Authorization": f"Bearer {token}", -# "Content-Type": "application/json" -# } -# -# root_resp = requests.get(f"{nifi_host}/nifi-api/flow/process-groups/root", headers=headers, verify=verify_ssl) -# root_resp.raise_for_status() -# root_pg_id = root_resp.json()["processGroupFlow"]["id"] -# -# with open(json_file_path, "r") as f: -# raw = json.load(f) -# -# flow = raw.get("flowContents") -# if not flow: -# raise ValueError("Missing 'flowContents' in provided file.") -# -# snippet = extract_snippet_fields(flow) -# -# payload = { -# "revision": { "version": 0 }, -# "component": { -# "name": flow.get("name", "ImportedGroup"), -# "position": { "x": 0.0, "y": 0.0 }, -# "flowSnippet": snippet -# } -# } -# -# url = f"{nifi_host}/nifi-api/process-groups/{root_pg_id}/process-groups" -# resp = requests.post(url, headers=headers, json=payload, verify=verify_ssl) -# -# if resp.status_code == 201: -# print("✅ Flow uploaded successfully!") -# pg_id = resp.json()["component"]["id"] -# print(f"🔗 View it at: {nifi_host}/nifi/#/process-groups/{root_pg_id}/process-group/{pg_id}") -# else: -# print(f"❌ Upload failed: {resp.status_code} - {resp.text}") -# -# except Exception as e: -# print(f"🚨 Error: {e}") -# -# -# -# -# ##################################################################################### -# -# -# def get_nifi_token(nifi_url, username, password, verify_ssl=False): -# """ -# Get authentication token from NiFi. -# """ -# token_url = f"{nifi_url}/access/token" -# headers = { -# "Content-Type": "application/x-www-form-urlencoded" -# } -# data = { -# "username": username, -# "password": password -# } -# -# response = requests.post(token_url, headers=headers, data=data, verify=verify_ssl) -# -# if response.ok: -# return response.text -# else: -# print(f"Failed to get token: {response.status_code}") -# print(response.text) -# response.raise_for_status() -# -# def get_root_process_group_id(nifi_url, token, verify_ssl=False): -# """ -# Get the root process group ID from NiFi. -# """ -# root_url = f"{nifi_url}/flow/process-groups/root" -# headers = { -# "Authorization": f"Bearer {token}" -# } -# -# response = requests.get(root_url, headers=headers, verify=verify_ssl) -# -# if response.ok: -# return response.json()["processGroupFlow"]["id"] -# else: -# print(f"Failed to get root process group ID: {response.status_code}") -# print(response.text) -# response.raise_for_status() -# -# -# def upload_nifi_pipeline_nifi_2_1(file_path, nifi_url, username, password, process_group_id=None, verify_ssl=False): -# """ -# Authenticate to NiFi 2.1+, fetch root process group ID, and upload the flow definition JSON. -# """ -# token = get_nifi_token(nifi_url, username, password, verify_ssl) -# -# if not process_group_id: -# process_group_id = get_root_process_group_id(nifi_url, token, verify_ssl) -# -# with open(file_path, 'r') as f: -# pipeline_json = json.load(f) -# -# import_endpoint = f"{nifi_url}/versions/process-groups/{process_group_id}/import" -# headers = { -# "Content-Type": "application/json", -# "Accept": "application/json", -# "Authorization": f"Bearer {token}" -# } -# -# response = requests.post( -# import_endpoint, -# headers=headers, -# data=json.dumps(pipeline_json), -# verify=verify_ssl -# ) -# -# if response.ok: -# print("✅ Pipeline uploaded successfully (NiFi 2.1).") -# return response.json() -# else: -# print(f"❌ Failed to upload pipeline: {response.status_code}") -# print(response.text) -# response.raise_for_status() -# -# -# -# #################### Testing ################### -# upload_nifi_pipeline_nifi_2_1( -# file_path="templates/basic_ETL.json", -# nifi_url="https://127.0.0.1.nip.io/nifi-api", -# username="lab08nifiuser", -# password="tartunifi2023", -# #process_group_id="5bb69687-0194-1000-ce98-931e7e192b3d", -# verify_ssl=False # set to True if using trusted SSL certs -# ) +import requests +import config + + + +# export TOKEN=$(curl -k -X POST https://127.0.0.1.nip.io/nifi-api/access/token\ +# -H "Content-Type: application/x-www-form-urlencoded" -d 'username=lab08nifiuser&password=tartunifi2023') + +def get_access_token(): + token_resp = requests.post( + f"{config.NIFI_HOST}/nifi-api/access/token", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + data={"username": config.NIFI_USER, "password": config.NIFI_PASS}, + verify=False + ) + token_resp.raise_for_status() + token = token_resp.text.strip() + return token + + +# curl -sk -X POST "https://127.0.0.1.nip.io/nifi-api/process-groups/root/process-groups/upload" -H "Authorization: Bearer $TOKEN"\ +# -F "file=@pipelines/test_pipeline.json" -F "groupName=MyNewProcessGroup" -F "positionX=0" -F "positionY=0" +# -F "disconnectedNodeAcknowledged=false" -F "clientId=unique-client-id-123" + + +def upload_nifi_pipeline(token, pipeline_path, processorGroup_name, username=config.NIFI_USER, password=config.NIFI_PASS, nifi_url=config.NIFI_HOST, position_x=0, position_y=0, client_id = "unique-client-id-123"): + + with open(pipeline_path, "r") as json_file: + files = { + "file": json_file, + "groupName": (None, processorGroup_name), + "positionX": (None, str(position_x)), + "positionY": (None, str(position_y)), + "disconnectedNodeAcknowledged": (None, "false"), + "clientId": (None, client_id) + } + + upload_resp = requests.post( + f"{nifi_url}/nifi-api/process-groups/root/process-groups/upload", + headers={"Authorization": f"Bearer {token}"}, + files=files, + verify=False + ) + + upload_resp.raise_for_status() + print(f"✅ Uploaded process group '{processorGroup_name}' successfully!") diff --git a/modules/nifi/templates/basic_ETL.json b/modules/nifi/templates/basic_ETL.json index 34a0297..076610a 100644 --- a/modules/nifi/templates/basic_ETL.json +++ b/modules/nifi/templates/basic_ETL.json @@ -28,7 +28,7 @@ }, "properties": { "Regular Expression": "(?s)(^.*$)", - "Replacement Value": "weather,city=\"Tartu\" Temperature=${temperature},Wind=${wind_speed}", + "Replacement Value": "placeholder", "Evaluation Mode": "Entire text", "Text to Prepend": null, "Line-by-Line Evaluation Mode": "All", @@ -147,7 +147,7 @@ "Socket Read Timeout": "15 secs", "Socket Idle Connections": "5", "Request Body Enabled": "true", - "HTTP URL": "https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72¤t=temperature_2m,wind_speed_10m", + "HTTP URL": "placeholder", "Request OAuth2 Access Token Provider": null, "Socket Idle Timeout": "5 mins", "Response Redirects Enabled": "True", @@ -457,10 +457,8 @@ "Destination": "flowfile-attribute", "Max String Length": "20 MB", "Return Type": "auto-detect", - "latitude": "$.latitude", "Null Value Representation": "empty string", - "Path Not Found Behavior": "ignore", - "longitude": "$.longitude" + "Path Not Found Behavior": "ignore" }, "propertyDescriptors": { "Destination": { |