diff options
Diffstat (limited to 'modules/nifi/core.py')
-rw-r--r-- | modules/nifi/core.py | 28 |
1 files changed, 21 insertions, 7 deletions
diff --git a/modules/nifi/core.py b/modules/nifi/core.py index f4c377e..be3cc1f 100644 --- a/modules/nifi/core.py +++ b/modules/nifi/core.py @@ -11,6 +11,7 @@ import json import shutil import requests import re +import base64 def introduction(): @@ -92,7 +93,7 @@ def get_data_values(): choose_another = common.ask_binary_input(prompt="\nKas soovid (v)alida veel mõne väärtuse või liikuda (e)dasi?(v/e): ",valikud=["v","e"]).strip().lower() if choose_another == 'e': - return chosen_json_values, api_url + return chosen_json_values, api_url, username, passwd else: choice = common.ask_binary_input(prompt="\nKas soovid URL-i (m)uuta URL-i või (v)äljuda?(m/v): ",valikud=["m","v"]).strip().lower() if choice == 'v': @@ -103,7 +104,7 @@ def get_data_values(): ## TODO - textReplace part -> fix templates -def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url): +def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url, api_username, api_password): ############### Choosing and modfyfing Template ############## ### Check if splitJson template needed @@ -129,13 +130,15 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_ - ### Processor editing + ### Processor editing - TODO only from config file currently measurements_name = config.NIFI_MEASUREMENT_NAME+" " if needs_SplitJson: ## SplitJson update split_json_path = "$"+re.sub(r'\[(.*?)\]', r'[*]', path_parts[0]) + print("Got here") update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path) + print("Got also here") ## EvaluateJsonPath processor setup for key, value in data_values.items() : @@ -163,27 +166,38 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_ ## Update scheduling Periond on API Calls update_template(new_pipeline_path, "flowContents.processors[1]", "schedulingPeriod", schedulingPeriod) + ## Add api credentials + if api_username != "placeholder": + update_template(new_pipeline_path, "flowContents.processors[1]", "Request Username", api_username) + #update_template(new_pipeline_path, "flowContents.processors[1]", "Request Password", api_password) + #update_template(new_pipeline_path, "flowContents.processors[1]", "Request Password", base64.b64encode(api_password.encode()).decode()) + + + ############################################### def build_pipeline(): if config.INTERACTIVE_MODE: - data_values, api_url= get_data_values() + data_values, api_url, api_username, api_password= get_data_values() + print("\nKui tihti peaks andmekonveier jooksma? (sekundites)") schedulingPeriod = str(common.ask_digit_input(86400))+ "sec" + new_pipeline_name=input("Mis saab andmekonveieri nimeks: ")+".json" + else: api_url = config.API_URL data_values = config.API_FIELDS schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD new_pipeline_name = config.PIPELINE_NAME + api_username = config.API_USERNAME + api_password = config.API_PASSWORD - - modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url) + modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url, api_username, api_password) print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier nimega '{new_pipeline_name}' asub kaustas 'pipelines'.") - ## Pipeline Deployment if (config.NIFI_DEPLOY): nifi_utils.upload_nifi_exported_flow( nifi_host=config.NIFI_HOST, username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False) |