diff options
Diffstat (limited to 'modules/nifi/core.py')
-rw-r--r-- | modules/nifi/core.py | 24 |
1 files changed, 14 insertions, 10 deletions
diff --git a/modules/nifi/core.py b/modules/nifi/core.py index e4ab1f2..f4c377e 100644 --- a/modules/nifi/core.py +++ b/modules/nifi/core.py @@ -92,7 +92,7 @@ def get_data_values(): choose_another = common.ask_binary_input(prompt="\nKas soovid (v)alida veel mõne väärtuse või liikuda (e)dasi?(v/e): ",valikud=["v","e"]).strip().lower() if choose_another == 'e': - return chosen_json_values + return chosen_json_values, api_url else: choice = common.ask_binary_input(prompt="\nKas soovid URL-i (m)uuta URL-i või (v)äljuda?(m/v): ",valikud=["m","v"]).strip().lower() if choice == 'v': @@ -103,7 +103,7 @@ def get_data_values(): ## TODO - textReplace part -> fix templates -def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name): +def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url): ############### Choosing and modfyfing Template ############## ### Check if splitJson template needed @@ -141,7 +141,7 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name): for key, value in data_values.items() : path_parts = value.split(']') update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1]) - measurements_name+=f",{key}=${{{key}}}" + measurements_name+=f"{key}=${{{key}}}," ## Database Setup set_database_credentials(new_pipeline_path, "flowContents.processors[4].properties") @@ -149,13 +149,16 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name): ## EvaluateJsonPath processor setup for key, value in data_values.items() : update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value) - measurements_name+=f",{key}=${{{key}}}" + measurements_name+=f"{key}=${{{key}}}," ## Database Setup set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties") ## ReplaceText processor update - making it compatible for timeseries database (influxDB) - update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", measurements_name) + update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", measurements_name[:-1]) # Delete last coma + + ## Update API call URL + update_template(new_pipeline_path, "flowContents.processors[1].properties", "HTTP URL", api_url) ## Update scheduling Periond on API Calls update_template(new_pipeline_path, "flowContents.processors[1]", "schedulingPeriod", schedulingPeriod) @@ -164,20 +167,21 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name): ############################################### def build_pipeline(): - data_values = {} + if config.INTERACTIVE_MODE: - data_values = get_data_values() + data_values, api_url= get_data_values() print("\nKui tihti peaks andmekonveier jooksma? (sekundites)") schedulingPeriod = str(common.ask_digit_input(86400))+ "sec" new_pipeline_name=input("Mis saab andmekonveieri nimeks: ")+".json" else: + api_url = config.API_URL data_values = config.API_FIELDS schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD - new_pipeline_name=config.PIPELINE_NAME + new_pipeline_name = config.PIPELINE_NAME - modify_all_processors(data_values, schedulingPeriod, new_pipeline_name) - print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub kaustas 'pipelines'") + modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url) + print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier nimega '{new_pipeline_name}' asub kaustas 'pipelines'.") ## Pipeline Deployment |