From 0190bb2cf61cf36906201445cbecc4a2b6b22af1 Mon Sep 17 00:00:00 2001
From: Rasmus Luha
Date: Mon, 21 Apr 2025 21:54:58 +0300
Subject: add more config vars and option to opt in and out of interactive
 mode, other minor fixes

---
 modules/nifi/core.py | 38 ++++++++++++++++++++++++++------------
 1 file changed, 26 insertions(+), 12 deletions(-)

(limited to 'modules/nifi/core.py')

diff --git a/modules/nifi/core.py b/modules/nifi/core.py
index fccbf91..e4ab1f2 100644
--- a/modules/nifi/core.py
+++ b/modules/nifi/core.py
@@ -87,6 +87,7 @@ def get_data_values():
 
     while True:
         chosen_json_values.update(common.inspect_json_top_level_test(json_data))
+        ## Testing
         print("Oled hetkel valinud järgmised väärtused JSON lõppväärtused: ", ", ".join(chosen_json_values))
 
         choose_another = common.ask_binary_input(prompt="\nKas soovid (v)alida veel mõne väärtuse või liikuda (e)dasi?(v/e): ",valikud=["v","e"]).strip().lower()
@@ -102,10 +103,9 @@ def get_data_values():
 
 ## TODO - textReplace part -> fix templates
 
+def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name):
+
 ############### Choosing and modfyfing Template ##############
-def build_pipeline():
-    data_values = get_data_values()
-
     ### Check if splitJson template needed
     needs_SplitJson = False
     path_parts = []
@@ -118,8 +118,7 @@ def build_pipeline():
 
     ### Select template
 
-    ## TODO - unhardcoded template usage
-    new_pipeline_name="test_pipeline.json"
+    ## TODO - currently has only 2 templates...
     if needs_SplitJson:
         template_name="splitJsonETL.json"
     else:
@@ -131,8 +130,7 @@ def build_pipeline():
 
     ### Processor editing
 
-    ## Measurements setup - TODO: hardcoded.
-    measurements_name = "test_measurementName, "
+    measurements_name = config.NIFI_MEASUREMENT_NAME+" "
 
     if needs_SplitJson:
         ## SplitJson update
@@ -143,7 +141,7 @@ def build_pipeline():
         for key, value in data_values.items() :
             path_parts = value.split(']')
             update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1])
-            measurements_name+=f"{key}={{{key}}}"
+            measurements_name+=f",{key}=${{{key}}}"
 
         ## Database Setup
         set_database_credentials(new_pipeline_path, "flowContents.processors[4].properties")
@@ -151,19 +149,35 @@ def build_pipeline():
         ## EvaluateJsonPath processor setup
         for key, value in data_values.items() :
             update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value)
-            measurements_name+=f"{key}={{{key}}}"
+            measurements_name+=f",{key}=${{{key}}}"
 
         ## Database Setup
         set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties")
 
 
-
-    ##ReplaceText update
+    ## ReplaceText processor update - making it compatible for timeseries database (influxDB)
     update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", measurements_name)
 
 
+    ## Update scheduling Periond on API Calls
+    update_template(new_pipeline_path, "flowContents.processors[1]", "schedulingPeriod", schedulingPeriod)
+
+###############################################
+
+def build_pipeline():
+    data_values = {}
+    if config.INTERACTIVE_MODE:
+        data_values = get_data_values()
+        print("\nKui tihti peaks andmekonveier jooksma? (sekundites)")
+        schedulingPeriod = str(common.ask_digit_input(86400))+ "sec"
+        new_pipeline_name=input("Mis saab andmekonveieri nimeks: ")+".json"
+    else:
+        data_values = config.API_FIELDS
+        schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD
+        new_pipeline_name=config.PIPELINE_NAME
 
-    print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub siin: {new_pipeline_path}.")
+    modify_all_processors(data_values, schedulingPeriod, new_pipeline_name)
+    print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub kaustas 'pipelines'")
 
 
 ## Pipeline Deployment
--
cgit v1.2.3
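
For orientation, below is a minimal sketch of what the configuration values referenced by this patch could look like. The variable names (INTERACTIVE_MODE, NIFI_MEASUREMENT_NAME, API_FIELDS, PIPELINE_SCHEDULING_PERIOD, PIPELINE_NAME) come from the diff above; the example values, comments, and the single-module layout are assumptions for illustration only, not part of the commit.

# config.py -- hypothetical values; only the variable names are taken from the patch
INTERACTIVE_MODE = False                  # False: read everything below instead of prompting the user
NIFI_MEASUREMENT_NAME = "weather"         # measurement name prepended to the ReplaceText output
API_FIELDS = {                            # field name -> JSON path handed to the EvaluateJsonPath processor
    "temperature": ".main.temp",
    "humidity": ".main.humidity",
}
PIPELINE_SCHEDULING_PERIOD = "3600sec"    # schedulingPeriod applied to the API-call processor
PIPELINE_NAME = "weather_pipeline.json"   # file name of the generated pipeline template

With values like these, build_pipeline() would skip the interactive prompts, and modify_all_processors() would assemble a "Replacement Value" of roughly "weather ,temperature=${temperature},humidity=${humidity}", i.e. an InfluxDB line-protocol record with NiFi expression-language placeholders for the extracted fields.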