author     Rasmus Luha <rasmus.luha@ut.ee>   2025-04-21 21:54:58 +0300
committer  Rasmus Luha <rasmus.luha@ut.ee>   2025-04-21 21:54:58 +0300
commit     0190bb2cf61cf36906201445cbecc4a2b6b22af1 (patch)
tree       c4c0056b8473c11db7e0959ce601ee79c46862ef
parent     5bbcd5b6bc8340eedb7e8dcd88e156a0987d42f6 (diff)

add more config vars and option to opt in and out of interactive mode, other minor fixes

 common/core.py                                        |   7
 config.py                                             |  18
 modules/nifi/core.py                                  |  38
 modules/nifi/nifi_utils.py                            | 248
 modules/nifi/templates/protsessorite_järjekorrad.txt  |  18
 5 files changed, 233 insertions(+), 96 deletions(-)
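
The headline change is the new INTERACTIVE_MODE flag: with it set to True the tool keeps prompting for JSON fields, scheduling period and pipeline name, and with it set to False build_pipeline() reads everything from config.py. A sketch of a fully non-interactive configuration, reusing the same values this commit adds to config.py (only the flag is flipped here):

    # config.py -- illustrative non-interactive setup (values as in the hunk below)
    INTERACTIVE_MODE=False
    NIFI_MEASUREMENT_NAME="test_measurementName"
    API_URL="https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72&current_weather=true"
    API_FIELDS={'temperature': '.current_weather.temperature', 'windspeed': '.current_weather.windspeed'}
    PIPELINE_SCHEDULING_PERIOD="5 sec"
    PIPELINE_NAME="test_pipeline.json"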
diff --git a/common/core.py b/common/core.py
index 3d4fac1..6160ee3 100644
--- a/common/core.py
+++ b/common/core.py
@@ -96,6 +96,7 @@ def inspect_json_top_level(json_data):
def inspect_json_top_level_test(json_data, has_list=False):
path = ""
+ last_key = "value" #Placeholder
while True:
print(json.dumps(json_data, indent=2))
@@ -113,6 +114,7 @@ def inspect_json_top_level_test(json_data, has_list=False):
selected_key = keys[selected_index]
selected_value = json_data[selected_key]
path += "." + selected_key
+ last_key = selected_key
elif isinstance(json_data, list):
has_list = True
@@ -123,11 +125,12 @@ def inspect_json_top_level_test(json_data, has_list=False):
selected_index = ask_digit_input(len(json_data) - 1)
selected_value = json_data[selected_index]
path += f"[{selected_index}]"
+ last_key = str(selected_index)
else:
# Primitive value, nothing to dive into
print(f"\nLõppväärtus: {json_data}")
- return {"value": path}
+ return {last_key: path}
@@ -136,5 +139,5 @@ def inspect_json_top_level_test(json_data, has_list=False):
else:
#print(f"\nValitud väärtus: '{selected_value}'")
print(f"\nValitud väärtus: '{path}'")
- return {"value": path}
+ return {last_key: path}
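
The effect of keying the result on last_key instead of the literal "value": when the user drills down to, say, .current_weather.temperature, the function now returns {"temperature": ".current_weather.temperature"} rather than {"value": ".current_weather.temperature"}, so get_data_values() in modules/nifi/core.py can accumulate several selections without later ones overwriting earlier ones. A minimal illustration, with paths chosen to match the Open-Meteo example used elsewhere in this commit:

    # Two successive selections, as get_data_values() would collect them.
    chosen_json_values = {}
    chosen_json_values.update({"temperature": ".current_weather.temperature"})
    chosen_json_values.update({"windspeed": ".current_weather.windspeed"})
    print(chosen_json_values)
    # {'temperature': '.current_weather.temperature', 'windspeed': '.current_weather.windspeed'}
    # With the old fixed "value" key, the second update() would have replaced the first.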
diff --git a/config.py b/config.py
index f21307e..6d88e31 100644
--- a/config.py
+++ b/config.py
@@ -1,3 +1,5 @@
+INTERACTIVE_MODE=True
+
## Nifi
NIFI_USER="lab08nifiuser"
NIFI_PASS="tartunifi2023"
@@ -5,6 +7,8 @@ NIFI_PASS="tartunifi2023"
NIFI_HOST="https://127.0.0.1.nip.io"
NIFI_DEPLOY=False
+NIFI_MEASUREMENT_NAME="test_measurementName"
+
## Database
DB_URL="http://influxdb:8086/write?db=nifi_weatherData"
@@ -12,3 +16,17 @@ DB_USER="admin"
## TODO - somehow must be hidden inside the pipeline in the end
DB_PASS="admin"
+
+
+
+###############################
+
+
+
+## Optional
+API_URL="https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72&current_weather=true"
+API_FIELDS={'temperature': '.current_weather.temperature', 'windspeed': '.current_weather.windspeed'}
+API_URL_USERNAME="TODO"
+API_URL_PASSWORD="TODO"
+PIPELINE_SCHEDULING_PERIOD="5 sec"
+PIPELINE_NAME="test_pipeline.json"
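
Each entry in API_FIELDS maps an output field name to a dot path inside the JSON returned by API_URL; modules/nifi/core.py turns these into "$"-prefixed JsonPath expressions for the EvaluateJsonPath processor. A standalone sketch of how such a path addresses the response (payload shape assumed from the Open-Meteo API, not fetched live; resolve() is an illustrative helper, not part of the repo):

    # Assumed, abbreviated shape of the Open-Meteo current_weather response.
    sample = {"current_weather": {"temperature": 7.3, "windspeed": 12.5}}

    API_FIELDS = {'temperature': '.current_weather.temperature',
                  'windspeed': '.current_weather.windspeed'}

    def resolve(doc, dotted_path):
        # Walk an '.a.b.c'-style path through nested dicts (illustrative only).
        value = doc
        for part in dotted_path.strip(".").split("."):
            value = value[part]
        return value

    for field, path in API_FIELDS.items():
        print(field, "->", "$" + path, "=", resolve(sample, path))
    # temperature -> $.current_weather.temperature = 7.3
    # windspeed -> $.current_weather.windspeed = 12.5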
diff --git a/modules/nifi/core.py b/modules/nifi/core.py
index fccbf91..e4ab1f2 100644
--- a/modules/nifi/core.py
+++ b/modules/nifi/core.py
@@ -87,6 +87,7 @@ def get_data_values():
while True:
chosen_json_values.update(common.inspect_json_top_level_test(json_data))
+ ## Testing
print("Oled hetkel valinud järgmised väärtused JSON lõppväärtused: ", ", ".join(chosen_json_values))
choose_another = common.ask_binary_input(prompt="\nKas soovid (v)alida veel mõne väärtuse või liikuda (e)dasi?(v/e): ",valikud=["v","e"]).strip().lower()
@@ -102,10 +103,9 @@ def get_data_values():
## TODO - textReplace part -> fix templates
+def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name):
+    ############### Choosing and modifying Template ##############
-def build_pipeline():
- data_values = get_data_values()
-
### Check if splitJson template needed
needs_SplitJson = False
path_parts = []
@@ -118,8 +118,7 @@ def build_pipeline():
### Select template
- ## TODO - unhardcoded template usage
- new_pipeline_name="test_pipeline.json"
+ ## TODO - currently has only 2 templates...
if needs_SplitJson:
template_name="splitJsonETL.json"
else:
@@ -131,8 +130,7 @@ def build_pipeline():
### Processor editing
- ## Measurements setup - TODO: hardcoded.
- measurements_name = "test_measurementName, "
+ measurements_name = config.NIFI_MEASUREMENT_NAME+" "
if needs_SplitJson:
## SplitJson update
@@ -143,7 +141,7 @@ def build_pipeline():
for key, value in data_values.items() :
path_parts = value.split(']')
update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1])
- measurements_name+=f"{key}={{{key}}}"
+ measurements_name+=f",{key}=${{{key}}}"
## Database Setup
set_database_credentials(new_pipeline_path, "flowContents.processors[4].properties")
@@ -151,19 +149,35 @@ def build_pipeline():
## EvaluateJsonPath processor setup
for key, value in data_values.items() :
update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value)
- measurements_name+=f"{key}={{{key}}}"
+ measurements_name+=f",{key}=${{{key}}}"
## Database Setup
set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties")
-
- ##ReplaceText update
+    ## ReplaceText processor update - making it compatible with the time-series database (InfluxDB)
update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", measurements_name)
+    ## Update scheduling period on the API-calling processor
+ update_template(new_pipeline_path, "flowContents.processors[1]", "schedulingPeriod", schedulingPeriod)
+
+###############################################
+
+def build_pipeline():
+ data_values = {}
+ if config.INTERACTIVE_MODE:
+ data_values = get_data_values()
+ print("\nKui tihti peaks andmekonveier jooksma? (sekundites)")
+ schedulingPeriod = str(common.ask_digit_input(86400))+ "sec"
+ new_pipeline_name=input("Mis saab andmekonveieri nimeks: ")+".json"
+ else:
+ data_values = config.API_FIELDS
+ schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD
+ new_pipeline_name=config.PIPELINE_NAME
- print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub siin: {new_pipeline_path}.")
+ modify_all_processors(data_values, schedulingPeriod, new_pipeline_name)
+    print(f"✅✅✅ Valmis. Uus genereeritud andmekonveier asub kaustas 'pipelines'.")
## Pipeline Deployment
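
With the example API_FIELDS, the Replacement Value written into the ReplaceText processor is a line-protocol-style string whose ${...} placeholders NiFi's expression language fills from the attributes extracted by EvaluateJsonPath. A sketch of what the loop above builds (plain string manipulation mirroring the code; not output from a running pipeline):

    NIFI_MEASUREMENT_NAME = "test_measurementName"   # config.NIFI_MEASUREMENT_NAME above
    data_values = {'temperature': '.current_weather.temperature',
                   'windspeed': '.current_weather.windspeed'}

    measurements_name = NIFI_MEASUREMENT_NAME + " "
    for key in data_values:
        measurements_name += f",{key}=${{{key}}}"

    print(measurements_name)
    # test_measurementName ,temperature=${temperature},windspeed=${windspeed}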
diff --git a/modules/nifi/nifi_utils.py b/modules/nifi/nifi_utils.py
index 9994d61..34e6ad8 100644
--- a/modules/nifi/nifi_utils.py
+++ b/modules/nifi/nifi_utils.py
@@ -1,82 +1,166 @@
-import requests
-import json
-
-##############################
-
-def extract_snippet_fields(flow):
- return {
- key: flow.get(key, [])
- for key in [
- "processors",
- "connections",
- "funnels",
- "inputPorts",
- "outputPorts",
- "labels",
- "controllerServices",
- "processGroups"
- ]
- }
-
-def upload_nifi_exported_flow(
- nifi_host: str,
- username: str,
- password: str,
- json_file_path: str,
- verify_ssl: bool = False
-):
- try:
- token_resp = requests.post(
- f"{nifi_host}/nifi-api/access/token",
- data={"username": username, "password": password},
- verify=verify_ssl
- )
- token_resp.raise_for_status()
- token = token_resp.text
-
- headers = {
- "Authorization": f"Bearer {token}",
- "Content-Type": "application/json"
- }
-
- root_resp = requests.get(f"{nifi_host}/nifi-api/flow/process-groups/root", headers=headers, verify=verify_ssl)
- root_resp.raise_for_status()
- root_pg_id = root_resp.json()["processGroupFlow"]["id"]
-
- with open(json_file_path, "r") as f:
- raw = json.load(f)
-
- flow = raw.get("flowContents")
- if not flow:
- raise ValueError("Missing 'flowContents' in provided file.")
-
- snippet = extract_snippet_fields(flow)
-
- payload = {
- "revision": { "version": 0 },
- "component": {
- "name": flow.get("name", "ImportedGroup"),
- "position": { "x": 0.0, "y": 0.0 },
- "flowSnippet": snippet
- }
- }
-
- url = f"{nifi_host}/nifi-api/process-groups/{root_pg_id}/process-groups"
- resp = requests.post(url, headers=headers, json=payload, verify=verify_ssl)
-
- if resp.status_code == 201:
- print("✅ Flow uploaded successfully!")
- pg_id = resp.json()["component"]["id"]
- print(f"🔗 View it at: {nifi_host}/nifi/#/process-groups/{root_pg_id}/process-group/{pg_id}")
- else:
- print(f"❌ Upload failed: {resp.status_code} - {resp.text}")
-
- except Exception as e:
- print(f"🚨 Error: {e}")
-
-
-
-
-
-
-
+# import requests
+# import json
+#
+# def extract_snippet_fields(flow):
+# return {
+# key: flow.get(key, [])
+# for key in [
+# "processors",
+# "connections",
+# "funnels",
+# "inputPorts",
+# "outputPorts",
+# "labels",
+# "controllerServices",
+# "processGroups"
+# ]
+# }
+#
+# def upload_nifi_exported_flow(
+# nifi_host: str,
+# username: str,
+# password: str,
+# json_file_path: str,
+# verify_ssl: bool = False
+# ):
+# try:
+# token_resp = requests.post(
+# f"{nifi_host}/nifi-api/access/token",
+# data={"username": username, "password": password},
+# verify=verify_ssl
+# )
+# token_resp.raise_for_status()
+# token = token_resp.text
+#
+# headers = {
+# "Authorization": f"Bearer {token}",
+# "Content-Type": "application/json"
+# }
+#
+# root_resp = requests.get(f"{nifi_host}/nifi-api/flow/process-groups/root", headers=headers, verify=verify_ssl)
+# root_resp.raise_for_status()
+# root_pg_id = root_resp.json()["processGroupFlow"]["id"]
+#
+# with open(json_file_path, "r") as f:
+# raw = json.load(f)
+#
+# flow = raw.get("flowContents")
+# if not flow:
+# raise ValueError("Missing 'flowContents' in provided file.")
+#
+# snippet = extract_snippet_fields(flow)
+#
+# payload = {
+# "revision": { "version": 0 },
+# "component": {
+# "name": flow.get("name", "ImportedGroup"),
+# "position": { "x": 0.0, "y": 0.0 },
+# "flowSnippet": snippet
+# }
+# }
+#
+# url = f"{nifi_host}/nifi-api/process-groups/{root_pg_id}/process-groups"
+# resp = requests.post(url, headers=headers, json=payload, verify=verify_ssl)
+#
+# if resp.status_code == 201:
+# print("✅ Flow uploaded successfully!")
+# pg_id = resp.json()["component"]["id"]
+# print(f"🔗 View it at: {nifi_host}/nifi/#/process-groups/{root_pg_id}/process-group/{pg_id}")
+# else:
+# print(f"❌ Upload failed: {resp.status_code} - {resp.text}")
+#
+# except Exception as e:
+# print(f"🚨 Error: {e}")
+#
+#
+#
+#
+# #####################################################################################
+#
+#
+# def get_nifi_token(nifi_url, username, password, verify_ssl=False):
+# """
+# Get authentication token from NiFi.
+# """
+# token_url = f"{nifi_url}/access/token"
+# headers = {
+# "Content-Type": "application/x-www-form-urlencoded"
+# }
+# data = {
+# "username": username,
+# "password": password
+# }
+#
+# response = requests.post(token_url, headers=headers, data=data, verify=verify_ssl)
+#
+# if response.ok:
+# return response.text
+# else:
+# print(f"Failed to get token: {response.status_code}")
+# print(response.text)
+# response.raise_for_status()
+#
+# def get_root_process_group_id(nifi_url, token, verify_ssl=False):
+# """
+# Get the root process group ID from NiFi.
+# """
+# root_url = f"{nifi_url}/flow/process-groups/root"
+# headers = {
+# "Authorization": f"Bearer {token}"
+# }
+#
+# response = requests.get(root_url, headers=headers, verify=verify_ssl)
+#
+# if response.ok:
+# return response.json()["processGroupFlow"]["id"]
+# else:
+# print(f"Failed to get root process group ID: {response.status_code}")
+# print(response.text)
+# response.raise_for_status()
+#
+#
+# def upload_nifi_pipeline_nifi_2_1(file_path, nifi_url, username, password, process_group_id=None, verify_ssl=False):
+# """
+# Authenticate to NiFi 2.1+, fetch root process group ID, and upload the flow definition JSON.
+# """
+# token = get_nifi_token(nifi_url, username, password, verify_ssl)
+#
+# if not process_group_id:
+# process_group_id = get_root_process_group_id(nifi_url, token, verify_ssl)
+#
+# with open(file_path, 'r') as f:
+# pipeline_json = json.load(f)
+#
+# import_endpoint = f"{nifi_url}/versions/process-groups/{process_group_id}/import"
+# headers = {
+# "Content-Type": "application/json",
+# "Accept": "application/json",
+# "Authorization": f"Bearer {token}"
+# }
+#
+# response = requests.post(
+# import_endpoint,
+# headers=headers,
+# data=json.dumps(pipeline_json),
+# verify=verify_ssl
+# )
+#
+# if response.ok:
+# print("✅ Pipeline uploaded successfully (NiFi 2.1).")
+# return response.json()
+# else:
+# print(f"❌ Failed to upload pipeline: {response.status_code}")
+# print(response.text)
+# response.raise_for_status()
+#
+#
+#
+# #################### Testing ###################
+# upload_nifi_pipeline_nifi_2_1(
+# file_path="templates/basic_ETL.json",
+# nifi_url="https://127.0.0.1.nip.io/nifi-api",
+# username="lab08nifiuser",
+# password="tartunifi2023",
+# #process_group_id="5bb69687-0194-1000-ce98-931e7e192b3d",
+# verify_ssl=False # set to True if using trusted SSL certs
+# )
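
All of nifi_utils.py is commented out for now, including the module-level test call at the bottom, which, if restored as-is, would run on import with hard-coded credentials. Should the NiFi 2.1 uploader be re-enabled, a guarded usage sketch pulling the same values from config.py instead of literals might look like the following (assumes the functions above are uncommented; the pipelines/ prefix is inferred from the message build_pipeline() prints):

    # Sketch only -- depends on upload_nifi_pipeline_nifi_2_1() being uncommented.
    import config
    from modules.nifi.nifi_utils import upload_nifi_pipeline_nifi_2_1

    if __name__ == "__main__":
        upload_nifi_pipeline_nifi_2_1(
            file_path="pipelines/" + config.PIPELINE_NAME,   # generated by build_pipeline()
            nifi_url=config.NIFI_HOST + "/nifi-api",
            username=config.NIFI_USER,
            password=config.NIFI_PASS,
            verify_ssl=False,  # untrusted cert on 127.0.0.1.nip.io, per the note above
        )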
diff --git a/modules/nifi/templates/protsessorite_järjekorrad.txt b/modules/nifi/templates/protsessorite_järjekorrad.txt
new file mode 100644
index 0000000..fe610fc
--- /dev/null
+++ b/modules/nifi/templates/protsessorite_järjekorrad.txt
@@ -0,0 +1,18 @@
+basic_ETL.json
+--------------
+0: ReplaceText
+1: InvokeHTTP in
+2: EvaluateJsonPath
+3: InvokeHTTP Out
+
+
+
+
+splitJsonETL.json
+-----------------
+
+0: ReplaceText
+1: InvokeHTTP in
+2: EvaluateJsonPath
+3: SplitJson
+4: InvokeHTTP Out