Diffstat (limited to 'modules/nifi/core.py')
-rw-r--r--  modules/nifi/core.py | 38 ++++++++++++++++++++++++++------------
1 file changed, 26 insertions(+), 12 deletions(-)
diff --git a/modules/nifi/core.py b/modules/nifi/core.py
index fccbf91..e4ab1f2 100644
--- a/modules/nifi/core.py
+++ b/modules/nifi/core.py
@@ -87,6 +87,7 @@ def get_data_values():
while True:
chosen_json_values.update(common.inspect_json_top_level_test(json_data))
+ ## Testing
print("Oled hetkel valinud järgmised väärtused JSON lõppväärtused: ", ", ".join(chosen_json_values))
choose_another = common.ask_binary_input(prompt="\nKas soovid (v)alida veel mõne väärtuse või liikuda (e)dasi?(v/e): ",valikud=["v","e"]).strip().lower()
@@ -102,10 +103,9 @@ def get_data_values():
## TODO - textReplace part -> fix templates
+def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name):
+    ############### Choosing and modifying Template ##############
-def build_pipeline():
- data_values = get_data_values()
-
### Check if splitJson template needed
needs_SplitJson = False
path_parts = []
@@ -118,8 +118,7 @@ def build_pipeline():
### Select template
- ## TODO - unhardcoded template usage
- new_pipeline_name="test_pipeline.json"
+ ## TODO - currently has only 2 templates...
if needs_SplitJson:
template_name="splitJsonETL.json"
else:
@@ -131,8 +130,7 @@ def build_pipeline():
### Processor editing
- ## Measurements setup - TODO: hardcoded.
- measurements_name = "test_measurementName, "
+    measurements_name = config.NIFI_MEASUREMENT_NAME + " "
if needs_SplitJson:
## SplitJson update
@@ -143,7 +141,7 @@ def build_pipeline():
for key, value in data_values.items() :
path_parts = value.split(']')
update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1])
- measurements_name+=f"{key}={{{key}}}"
+ measurements_name+=f",{key}=${{{key}}}"
## Database Setup
set_database_credentials(new_pipeline_path, "flowContents.processors[4].properties")
@@ -151,19 +149,35 @@ def build_pipeline():
## EvaluateJsonPath processor setup
for key, value in data_values.items() :
update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value)
- measurements_name+=f"{key}={{{key}}}"
+ measurements_name+=f",{key}=${{{key}}}"
## Database Setup
set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties")
-
- ##ReplaceText update
+    ## ReplaceText processor update - making the output compatible with the time-series database (InfluxDB)
update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", measurements_name)
+    ## Update the scheduling period on the API call processor
+ update_template(new_pipeline_path, "flowContents.processors[1]", "schedulingPeriod", schedulingPeriod)
+
+###############################################
+
+def build_pipeline():
+ data_values = {}
+ if config.INTERACTIVE_MODE:
+ data_values = get_data_values()
+        print("\nHow often should the data pipeline run? (in seconds)")
+        schedulingPeriod = str(common.ask_digit_input(86400)) + "sec"
+        new_pipeline_name = input("What should the data pipeline be named: ") + ".json"
+ else:
+ data_values = config.API_FIELDS
+ schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD
+ new_pipeline_name=config.PIPELINE_NAME
-    print(f"✅✅✅ Done. The newly generated data pipeline is located here: {new_pipeline_path}.")
+ modify_all_processors(data_values, schedulingPeriod, new_pipeline_name)
+    print("✅✅✅ Done. The newly generated data pipeline is in the 'pipelines' folder.")
## Pipeline Deployment
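
Editor's note: for reference, a minimal standalone sketch (not part of the commit) of the Replacement Value string that the measurements_name loops assemble for the ReplaceText processor. The measurement name "weather" and the data_values mapping are illustrative stand-ins for config.NIFI_MEASUREMENT_NAME and the user-chosen JSON paths; the ${...} placeholders are emitted literally and left for NiFi Expression Language to resolve per FlowFile:

    ## Minimal sketch mirroring the loop in modify_all_processors.
    ## "weather" and data_values are hypothetical stand-ins for
    ## config.NIFI_MEASUREMENT_NAME and the user-chosen JSON paths.
    data_values = {"temperature": "$.main.temp", "humidity": "$.main.humidity"}

    measurements_name = "weather" + " "            # config.NIFI_MEASUREMENT_NAME + " "
    for key in data_values:
        measurements_name += f",{key}=${{{key}}}"  # ${key} resolved later by NiFi EL
    print(measurements_name)
    ## -> weather ,temperature=${temperature},humidity=${humidity}

Likewise, the new non-interactive branch of build_pipeline assumes these attributes exist in config; the attribute names come from the diff, the values below are purely illustrative:

    ## Hypothetical config values consumed by the non-interactive branch.
    INTERACTIVE_MODE = False
    API_FIELDS = {"temperature": "$.main.temp"}    # JSON paths chosen in advance
    PIPELINE_SCHEDULING_PERIOD = "86400sec"        # same "sec" suffix as the interactive path
    PIPELINE_NAME = "weather_pipeline.json"
    NIFI_MEASUREMENT_NAME = "weather"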