Diffstat (limited to 'modules/nifi/core.py')
-rw-r--r--  modules/nifi/core.py  121
1 file changed, 14 insertions(+), 107 deletions(-)
diff --git a/modules/nifi/core.py b/modules/nifi/core.py
index 4ddc268..46455c7 100644
--- a/modules/nifi/core.py
+++ b/modules/nifi/core.py
@@ -1,17 +1,13 @@
## TODO - check syntax
from common import core as common
-import config as config
+import config
from modules.nifi import nifi_utils
-
from pyfiglet import figlet_format
from rich.console import Console
-import sys
-import json
+
import shutil
-import requests
import re
-import base64
def introduction():
@@ -21,89 +17,7 @@ def introduction():
print("Valisid Nifi Platformi!\n")
-def update_template(file_path, dot_path, new_key, new_value):
-
- # Step 2: Load the copied JSON
- with open(file_path, "r") as f:
- data = json.load(f)
-
- # Step 3: Walk the path (e.g. 'flowContents.processors[0].properties')
- keys = dot_path.split(".")
- current = data
-
- for key in keys:
- if key.endswith("]"): # Handle list index like processors[0]
- list_key = key[:key.index("[")]
- index = int(key[key.index("[") + 1 : key.index("]")])
- current = current[list_key][index]
- else:
- current = current[key]
-
- # Step 4: Add or update the key
- current[new_key] = new_value
- print(f"🛠 Added '{new_key}': '{new_value}' at path '{dot_path}'")
-
- # Step 5: Save back the JSON
- with open(file_path, "w") as f:
- json.dump(data, f, indent=2)
- print("✅ Changes saved.")
-
-
-
-
-def set_database_credentials(file_path,dot_path):
- ## Update URL
- update_template(file_path, dot_path, "HTTP URL", config.DB_URL)
-
- ## Update username
- update_template(file_path, dot_path, "username", config.DB_USER)
-
- ## Update username
- update_template(file_path, dot_path, "password", config.DB_PASS)
-
-
-
-
-def get_data_values():
-
- chosen_json_values = {}
-
- ##Getting API url and json values
- while True:
- api_url = input("Palun sisesta andmete API URL: ").strip()
- username = "placeholder"
- passwd = "placeholder"
-
- needs_auth = common.ask_binary_input(prompt="Kas API vajab ka kasutajaga autentimist?(jah/ei): ").strip().lower() == 'jah'
- if needs_auth:
- username=input("Sisesta kasutajanimi: ")
- passwd=input("Sisesta parool: ")
-
- json_data, api_url_correct = common.is_app_url_correct(api_url,needs_auth,username,passwd)
-
-
- ## TODO itemite eemaldamise v6malus
- if api_url_correct:
- while True:
-
- chosen_json_values.update(common.inspect_json_top_level_test(json_data))
- ## Testing
- print("Oled hetkel valinud järgmised väärtused JSON lõppväärtused: ", ", ".join(chosen_json_values))
- choose_another = common.ask_binary_input(prompt="\nKas soovid (v)alida veel mõne väärtuse või liikuda (e)dasi?(v/e): ",valikud=["v","e"]).strip().lower()
-
- if choose_another == 'e':
- return chosen_json_values, api_url, username, passwd
- else:
- choice = common.ask_binary_input(prompt="\nKas soovid URL-i (m)uuta URL-i või (v)äljuda?(m/v): ",valikud=["m","v"]).strip().lower()
- if choice == 'v':
- print("Väljun programmist.")
- sys.exit()
-
-
-
-
-## TODO - textReplace part -> fix templates
def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url, api_username, api_password):
############### Choosing and modifying Template ##############
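Note on the helpers dropped in the hunk above: the local update_template walked a dot path such as 'flowContents.processors[0].properties', treating 'name[index]' segments as list lookups, before writing the new key and saving the JSON. A minimal sketch of that traversal, assuming nifi_utils.update_template keeps the same behaviour; the flow dict and URL below are made up for illustration:

import json
import re

def walk_dot_path(data, dot_path):
    # Walk e.g. 'flowContents.processors[1].properties'; a segment like
    # 'processors[1]' indexes into a list, a plain segment indexes a dict.
    current = data
    for key in dot_path.split("."):
        match = re.fullmatch(r"(\w+)\[(\d+)\]", key)
        if match:
            current = current[match.group(1)][int(match.group(2))]
        else:
            current = current[key]
    return current

# Hypothetical flow definition, only to show where the new key lands.
flow = {"flowContents": {"processors": [{"properties": {}}, {"properties": {}}]}}
walk_dot_path(flow, "flowContents.processors[1].properties")["HTTP URL"] = "https://example.org/api"
print(json.dumps(flow, indent=2))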
@@ -116,10 +30,7 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_
#path_parts = el.split(']')
path_parts = re.split(r'(?<=\])', el)
-
-
### Select template
- ## TODO - currently has only 2 templates...
if needs_SplitJson:
template_name="splitJsonETL.json"
else:
@@ -129,7 +40,6 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_
shutil.copy(f"modules/nifi/templates/{template_name}", new_pipeline_path)
-
### Processor editing
## Build the measurement name string
@@ -142,48 +52,47 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_
if needs_SplitJson:
## SplitJson update
split_json_path = "$"+re.sub(r'\[(.*?)\]', r'[*]', path_parts[0])
- update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path)
+ nifi_utils.update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path)
## EvaluateJsonPath processor setup
for key, value in data_values.items() :
path_parts = value.split(']')
- update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1])
+ nifi_utils.update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1])
measurements_name+=f"{key}=${{{key}}},"
## Database Setup
- set_database_credentials(new_pipeline_path, "flowContents.processors[4].properties")
+ nifi_utils.set_database_credentials(new_pipeline_path, "flowContents.processors[4].properties")
else:
## EvaluateJsonPath processor setup
for key, value in data_values.items() :
- update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value)
+ nifi_utils.update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value)
measurements_name+=f"{key}=${{{key}}},"
## Database Setup
- set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties")
+ nifi_utils.set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties")
## ReplaceText processor update - making the output compatible with the time-series database (InfluxDB)
- update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", measurements_name[:-1]) # Delete last coma
+ nifi_utils.update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", measurements_name[:-1]) # Delete last coma
## Update API call URL
- update_template(new_pipeline_path, "flowContents.processors[1].properties", "HTTP URL", api_url)
+ nifi_utils.update_template(new_pipeline_path, "flowContents.processors[1].properties", "HTTP URL", api_url)
## Update scheduling period on API calls
- update_template(new_pipeline_path, "flowContents.processors[1]", "schedulingPeriod", schedulingPeriod)
+ nifi_utils.update_template(new_pipeline_path, "flowContents.processors[1]", "schedulingPeriod", schedulingPeriod)
## Add API credentials
if api_username != "placeholder":
- update_template(new_pipeline_path, "flowContents.processors[1].properties", "Request Username", api_username)
- update_template(new_pipeline_path, "flowContents.processors[1].properties", "Request Password", api_password)
-
+ nifi_utils.update_template(new_pipeline_path, "flowContents.processors[1].properties", "Request Username", api_username)
+ nifi_utils.update_template(new_pipeline_path, "flowContents.processors[1].properties", "Request Password", api_password)
+###
-###############################################
def build_pipeline():
if config.INTERACTIVE_MODE:
- data_values, api_url, api_username, api_password= get_data_values()
+ data_values, api_url, api_username, api_password= common.get_data_values()
print("\nKui tihti peaks andmekonveier jooksma? (sekundites)")
schedulingPeriod = str(common.ask_digit_input(86400))+ "sec"
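A quick illustration of the path handling in modify_all_processors above, with a made-up value path (the real format comes from common.inspect_json_top_level_test and may differ, e.g. in whether it carries a leading dot):

import re

el = "observations[0].airtemperature"   # hypothetical chosen value path

# Cut right after the closing bracket, as in modify_all_processors.
path_parts = re.split(r'(?<=\])', el)    # ['observations[0]', '.airtemperature']

# SplitJson branch: the index becomes a wildcard so every array element is split out.
split_json_path = "$" + re.sub(r'\[(.*?)\]', r'[*]', path_parts[0])   # '$observations[*]'

# EvaluateJsonPath then addresses the remainder relative to each split element.
value_path = "$" + path_parts[1]         # '$.airtemperature'

print(split_json_path, value_path)

Each chosen key also contributes "key=${key}," to the ReplaceText Replacement Value, so two keys named temp and hum would yield "temp=${temp},hum=${hum}" once the trailing comma is dropped.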
@@ -202,7 +111,6 @@ def build_pipeline():
print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier nimega '{new_pipeline_name}' asub kaustas 'pipelines'.")
-
## Pipeline Deployment
if (config.NIFI_DEPLOY):
token = nifi_utils.get_access_token()
@@ -212,4 +120,3 @@ def build_pipeline():
if choice == "jah":
token = nifi_utils.get_access_token()
nifi_utils.upload_nifi_pipeline(token, "pipelines/test_pipeline.json", "test_pipeline", username=config.NIFI_USER, password=config.NIFI_PASS, nifi_url=config.NIFI_HOST, position_x=0, position_y=0)
-