author     Rasmus Luha <rasmus.luha@ut.ee>    2025-04-21 22:52:37 +0300
committer  Rasmus Luha <rasmus.luha@ut.ee>    2025-04-21 22:52:37 +0300
commit     ea4cfadef55319da613901017a586043e75e769f (patch)
tree       10432285f9ed286710d81c2508b52eb1ec33492a
parent     0190bb2cf61cf36906201445cbecc4a2b6b22af1 (diff)
add api url modifying. Nifi weatherAPI pipeline OK!
-rw-r--r--   config.py                                   4
-rw-r--r--   modules/nifi/core.py                       24
-rw-r--r--   modules/nifi/templates/splitJsonETL.json    3
3 files changed, 17 insertions, 14 deletions
diff --git a/config.py b/config.py
index 6d88e31..c737dc8 100644
--- a/config.py
+++ b/config.py
@@ -1,4 +1,4 @@
-INTERACTIVE_MODE=True
+INTERACTIVE_MODE=False
## Nifi
NIFI_USER="lab08nifiuser"
@@ -23,7 +23,7 @@ DB_PASS="admin"
-## Optional
+## Needed if Interactive mode turned off
API_URL="https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72&current_weather=true"
API_FIELDS={'temperature': '.current_weather.temperature', 'windspeed': '.current_weather.windspeed'}
API_URL_USERNAME="TODO"
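
For context (not part of the commit): with INTERACTIVE_MODE=False, build_pipeline() in modules/nifi/core.py reads all of these values straight from config.py instead of prompting. A minimal sketch of that non-interactive lookup, mirroring the else branch changed further down in this diff (the function name resolve_pipeline_settings is illustrative only):

import config

def resolve_pipeline_settings():
    # Non-interactive mode: every setting comes from config.py.
    if not config.INTERACTIVE_MODE:
        api_url = config.API_URL                          # weather API endpoint
        data_values = config.API_FIELDS                   # {field: JSONPath} pairs
        scheduling_period = config.PIPELINE_SCHEDULING_PERIOD
        pipeline_name = config.PIPELINE_NAME              # output pipeline file name
        return data_values, scheduling_period, pipeline_name, api_url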
diff --git a/modules/nifi/core.py b/modules/nifi/core.py
index e4ab1f2..f4c377e 100644
--- a/modules/nifi/core.py
+++ b/modules/nifi/core.py
@@ -92,7 +92,7 @@ def get_data_values():
choose_another = common.ask_binary_input(prompt="\nKas soovid (v)alida veel mõne väärtuse või liikuda (e)dasi?(v/e): ",valikud=["v","e"]).strip().lower()
if choose_another == 'e':
- return chosen_json_values
+ return chosen_json_values, api_url
else:
choice = common.ask_binary_input(prompt="\nKas soovid URL-i (m)uuta URL-i või (v)äljuda?(m/v): ",valikud=["m","v"]).strip().lower()
if choice == 'v':
@@ -103,7 +103,7 @@ def get_data_values():
## TODO - textReplace part -> fix templates
-def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name):
+def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url):
############### Choosing and modifying Template ##############
### Check if splitJson template needed
@@ -141,7 +141,7 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name):
for key, value in data_values.items() :
path_parts = value.split(']')
update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1])
- measurements_name+=f",{key}=${{{key}}}"
+ measurements_name+=f"{key}=${{{key}}},"
## Database Setup
set_database_credentials(new_pipeline_path, "flowContents.processors[4].properties")
@@ -149,13 +149,16 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name):
## EvaluateJsonPath processor setup
for key, value in data_values.items() :
update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value)
- measurements_name+=f",{key}=${{{key}}}"
+ measurements_name+=f"{key}=${{{key}}},"
## Database Setup
set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties")
## ReplaceText processor update - making it compatible for timeseries database (influxDB)
- update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", measurements_name)
+ update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", measurements_name[:-1]) # Delete last coma
+
+ ## Update API call URL
+ update_template(new_pipeline_path, "flowContents.processors[1].properties", "HTTP URL", api_url)
## Update scheduling Period on API Calls
update_template(new_pipeline_path, "flowContents.processors[1]", "schedulingPeriod", schedulingPeriod)
@@ -164,20 +167,21 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name):
###############################################
def build_pipeline():
- data_values = {}
+
if config.INTERACTIVE_MODE:
- data_values = get_data_values()
+ data_values, api_url = get_data_values()
print("\nKui tihti peaks andmekonveier jooksma? (sekundites)")
schedulingPeriod = str(common.ask_digit_input(86400))+ "sec"
new_pipeline_name=input("Mis saab andmekonveieri nimeks: ")+".json"
else:
+ api_url = config.API_URL
data_values = config.API_FIELDS
schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD
- new_pipeline_name=config.PIPELINE_NAME
+ new_pipeline_name = config.PIPELINE_NAME
- modify_all_processors(data_values, schedulingPeriod, new_pipeline_name)
- print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub kaustas 'pipelines'")
+ modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url)
+ print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier nimega '{new_pipeline_name}' asub kaustas 'pipelines'.")
## Pipeline Deployment
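
To make the measurement-string change above concrete: a small, hedged sketch of how the loop in modify_all_processors() now assembles the ReplaceText "Replacement Value" (the initial value of measurements_name is not shown in this diff, and build_replacement_value is an illustrative name, not a function in the repo):

def build_replacement_value(data_values, prefix=""):
    # Each field becomes "<key>=${<key>}," (a NiFi expression reference);
    # the trailing comma left by the loop is sliced off at the end.
    measurements_name = prefix
    for key in data_values:
        measurements_name += f"{key}=${{{key}}},"
    return measurements_name[:-1]

# With the API_FIELDS keys from config.py above:
# build_replacement_value({"temperature": "...", "windspeed": "..."})
# returns "temperature=${temperature},windspeed=${windspeed}"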
diff --git a/modules/nifi/templates/splitJsonETL.json b/modules/nifi/templates/splitJsonETL.json
index aa3b536..cc4d2b6 100644
--- a/modules/nifi/templates/splitJsonETL.json
+++ b/modules/nifi/templates/splitJsonETL.json
@@ -163,14 +163,13 @@
"Request User-Agent": null,
"Response Header Request Attributes Enabled": "false",
"HTTP Method": "GET",
- "Request Username": "rasmus.luha",
+ "Request Username": null
"Request Content-Type": "${mime.type}",
"Response Body Attribute Name": null,
"Request Digest Authentication Enabled": "false",
"Request Multipart Form-Data Name": null,
"Response Cache Size": "10MB",
"Response Body Ignored": "false",
- "Replacement Value": "energy,building=\"Delta\" kilowattHours=${energy_value}"
},
"propertyDescriptors": {
"Request Content-Encoding": {