summaryrefslogtreecommitdiff
path: root/modules/nifi/core.py
diff options
context:
space:
mode:
Diffstat (limited to 'modules/nifi/core.py')
-rw-r--r--modules/nifi/core.py28
1 files changed, 21 insertions, 7 deletions
diff --git a/modules/nifi/core.py b/modules/nifi/core.py
index f4c377e..be3cc1f 100644
--- a/modules/nifi/core.py
+++ b/modules/nifi/core.py
@@ -11,6 +11,7 @@ import json
import shutil
import requests
import re
+import base64
def introduction():
@@ -92,7 +93,7 @@ def get_data_values():
choose_another = common.ask_binary_input(prompt="\nKas soovid (v)alida veel mõne väärtuse või liikuda (e)dasi?(v/e): ",valikud=["v","e"]).strip().lower()
if choose_another == 'e':
- return chosen_json_values, api_url
+ return chosen_json_values, api_url, username, passwd
else:
choice = common.ask_binary_input(prompt="\nKas soovid URL-i (m)uuta URL-i või (v)äljuda?(m/v): ",valikud=["m","v"]).strip().lower()
if choice == 'v':
@@ -103,7 +104,7 @@ def get_data_values():
## TODO - textReplace part -> fix templates
-def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url):
+def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url, api_username, api_password):
############### Choosing and modfyfing Template ##############
### Check if splitJson template needed
@@ -129,13 +130,15 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_
- ### Processor editing
+ ### Processor editing - TODO only from config file currently
measurements_name = config.NIFI_MEASUREMENT_NAME+" "
if needs_SplitJson:
## SplitJson update
split_json_path = "$"+re.sub(r'\[(.*?)\]', r'[*]', path_parts[0])
+ print("Got here")
update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path)
+ print("Got also here")
## EvaluateJsonPath processor setup
for key, value in data_values.items() :
@@ -163,27 +166,38 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_
## Update scheduling Periond on API Calls
update_template(new_pipeline_path, "flowContents.processors[1]", "schedulingPeriod", schedulingPeriod)
+ ## Add api credentials
+ if api_username != "placeholder":
+ update_template(new_pipeline_path, "flowContents.processors[1]", "Request Username", api_username)
+ #update_template(new_pipeline_path, "flowContents.processors[1]", "Request Password", api_password)
+ #update_template(new_pipeline_path, "flowContents.processors[1]", "Request Password", base64.b64encode(api_password.encode()).decode())
+
+
+
###############################################
def build_pipeline():
if config.INTERACTIVE_MODE:
- data_values, api_url= get_data_values()
+ data_values, api_url, api_username, api_password= get_data_values()
+
print("\nKui tihti peaks andmekonveier jooksma? (sekundites)")
schedulingPeriod = str(common.ask_digit_input(86400))+ "sec"
+
new_pipeline_name=input("Mis saab andmekonveieri nimeks: ")+".json"
+
else:
api_url = config.API_URL
data_values = config.API_FIELDS
schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD
new_pipeline_name = config.PIPELINE_NAME
+ api_username = config.API_USERNAME
+ api_password = config.API_PASSWORD
-
- modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url)
+ modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url, api_username, api_password)
print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier nimega '{new_pipeline_name}' asub kaustas 'pipelines'.")
-
## Pipeline Deployment
if (config.NIFI_DEPLOY):
nifi_utils.upload_nifi_exported_flow( nifi_host=config.NIFI_HOST, username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False)