summaryrefslogtreecommitdiff
path: root/modules/nifi/core.py
diff options
context:
space:
mode:
Diffstat (limited to 'modules/nifi/core.py')
-rw-r--r--modules/nifi/core.py24
1 files changed, 14 insertions, 10 deletions
diff --git a/modules/nifi/core.py b/modules/nifi/core.py
index e4ab1f2..f4c377e 100644
--- a/modules/nifi/core.py
+++ b/modules/nifi/core.py
@@ -92,7 +92,7 @@ def get_data_values():
choose_another = common.ask_binary_input(prompt="\nKas soovid (v)alida veel mõne väärtuse või liikuda (e)dasi?(v/e): ",valikud=["v","e"]).strip().lower()
if choose_another == 'e':
- return chosen_json_values
+ return chosen_json_values, api_url
else:
choice = common.ask_binary_input(prompt="\nKas soovid URL-i (m)uuta URL-i või (v)äljuda?(m/v): ",valikud=["m","v"]).strip().lower()
if choice == 'v':
@@ -103,7 +103,7 @@ def get_data_values():
## TODO - textReplace part -> fix templates
-def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name):
+def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url):
############### Choosing and modfyfing Template ##############
### Check if splitJson template needed
@@ -141,7 +141,7 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name):
for key, value in data_values.items() :
path_parts = value.split(']')
update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1])
- measurements_name+=f",{key}=${{{key}}}"
+ measurements_name+=f"{key}=${{{key}}},"
## Database Setup
set_database_credentials(new_pipeline_path, "flowContents.processors[4].properties")
@@ -149,13 +149,16 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name):
## EvaluateJsonPath processor setup
for key, value in data_values.items() :
update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value)
- measurements_name+=f",{key}=${{{key}}}"
+ measurements_name+=f"{key}=${{{key}}},"
## Database Setup
set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties")
## ReplaceText processor update - making it compatible for timeseries database (influxDB)
- update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", measurements_name)
+ update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", measurements_name[:-1]) # Delete last coma
+
+ ## Update API call URL
+ update_template(new_pipeline_path, "flowContents.processors[1].properties", "HTTP URL", api_url)
## Update scheduling Periond on API Calls
update_template(new_pipeline_path, "flowContents.processors[1]", "schedulingPeriod", schedulingPeriod)
@@ -164,20 +167,21 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name):
###############################################
def build_pipeline():
- data_values = {}
+
if config.INTERACTIVE_MODE:
- data_values = get_data_values()
+ data_values, api_url= get_data_values()
print("\nKui tihti peaks andmekonveier jooksma? (sekundites)")
schedulingPeriod = str(common.ask_digit_input(86400))+ "sec"
new_pipeline_name=input("Mis saab andmekonveieri nimeks: ")+".json"
else:
+ api_url = config.API_URL
data_values = config.API_FIELDS
schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD
- new_pipeline_name=config.PIPELINE_NAME
+ new_pipeline_name = config.PIPELINE_NAME
- modify_all_processors(data_values, schedulingPeriod, new_pipeline_name)
- print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub kaustas 'pipelines'")
+ modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url)
+ print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier nimega '{new_pipeline_name}' asub kaustas 'pipelines'.")
## Pipeline Deployment