diff options
-rw-r--r-- | config.py | 4 | ||||
-rw-r--r-- | modules/nifi/core.py | 24 | ||||
-rw-r--r-- | modules/nifi/templates/splitJsonETL.json | 3 |
3 files changed, 17 insertions, 14 deletions
@@ -1,4 +1,4 @@ -INTERACTIVE_MODE=True +INTERACTIVE_MODE=False ## Nifi NIFI_USER="lab08nifiuser" @@ -23,7 +23,7 @@ DB_PASS="admin" -## Optional +## Needed if Interactive mode turned off API_URL="https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72¤t_weather=true" API_FIELDS={'temperature': '.current_weather.temperature', 'windspeed': '.current_weather.windspeed'} API_URL_USERNAME="TODO" diff --git a/modules/nifi/core.py b/modules/nifi/core.py index e4ab1f2..f4c377e 100644 --- a/modules/nifi/core.py +++ b/modules/nifi/core.py @@ -92,7 +92,7 @@ def get_data_values(): choose_another = common.ask_binary_input(prompt="\nKas soovid (v)alida veel mõne väärtuse või liikuda (e)dasi?(v/e): ",valikud=["v","e"]).strip().lower() if choose_another == 'e': - return chosen_json_values + return chosen_json_values, api_url else: choice = common.ask_binary_input(prompt="\nKas soovid URL-i (m)uuta URL-i või (v)äljuda?(m/v): ",valikud=["m","v"]).strip().lower() if choice == 'v': @@ -103,7 +103,7 @@ def get_data_values(): ## TODO - textReplace part -> fix templates -def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name): +def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url): ############### Choosing and modfyfing Template ############## ### Check if splitJson template needed @@ -141,7 +141,7 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name): for key, value in data_values.items() : path_parts = value.split(']') update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1]) - measurements_name+=f",{key}=${{{key}}}" + measurements_name+=f"{key}=${{{key}}}," ## Database Setup set_database_credentials(new_pipeline_path, "flowContents.processors[4].properties") @@ -149,13 +149,16 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name): ## EvaluateJsonPath processor setup for key, value in data_values.items() : update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value) - measurements_name+=f",{key}=${{{key}}}" + measurements_name+=f"{key}=${{{key}}}," ## Database Setup set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties") ## ReplaceText processor update - making it compatible for timeseries database (influxDB) - update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", measurements_name) + update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", measurements_name[:-1]) # Delete last coma + + ## Update API call URL + update_template(new_pipeline_path, "flowContents.processors[1].properties", "HTTP URL", api_url) ## Update scheduling Periond on API Calls update_template(new_pipeline_path, "flowContents.processors[1]", "schedulingPeriod", schedulingPeriod) @@ -164,20 +167,21 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name): ############################################### def build_pipeline(): - data_values = {} + if config.INTERACTIVE_MODE: - data_values = get_data_values() + data_values, api_url= get_data_values() print("\nKui tihti peaks andmekonveier jooksma? (sekundites)") schedulingPeriod = str(common.ask_digit_input(86400))+ "sec" new_pipeline_name=input("Mis saab andmekonveieri nimeks: ")+".json" else: + api_url = config.API_URL data_values = config.API_FIELDS schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD - new_pipeline_name=config.PIPELINE_NAME + new_pipeline_name = config.PIPELINE_NAME - modify_all_processors(data_values, schedulingPeriod, new_pipeline_name) - print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub kaustas 'pipelines'") + modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url) + print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier nimega '{new_pipeline_name}' asub kaustas 'pipelines'.") ## Pipeline Deployment diff --git a/modules/nifi/templates/splitJsonETL.json b/modules/nifi/templates/splitJsonETL.json index aa3b536..cc4d2b6 100644 --- a/modules/nifi/templates/splitJsonETL.json +++ b/modules/nifi/templates/splitJsonETL.json @@ -163,14 +163,13 @@ "Request User-Agent": null, "Response Header Request Attributes Enabled": "false", "HTTP Method": "GET", - "Request Username": "rasmus.luha", + "Request Username": null "Request Content-Type": "${mime.type}", "Response Body Attribute Name": null, "Request Digest Authentication Enabled": "false", "Request Multipart Form-Data Name": null, "Response Cache Size": "10MB", "Response Body Ignored": "false", - "Replacement Value": "energy,building=\"Delta\" kilowattHours=${energy_value}" }, "propertyDescriptors": { "Request Content-Encoding": { |