summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--config.py23
-rw-r--r--modules/nifi/core.py4
-rw-r--r--modules/telegraf/core.py39
-rw-r--r--modules/telegraf/telegraf_utils.py70
-rw-r--r--modules/telegraf/templates/different_jsonPaths_ETL.toml18
5 files changed, 117 insertions, 37 deletions
diff --git a/config.py b/config.py
index 4bd495a..2db70db 100644
--- a/config.py
+++ b/config.py
@@ -1,8 +1,8 @@
-INTERACTIVE_MODE=True
+INTERACTIVE_MODE=False
PLATFORM="Telegraf"
-#### Nifi ####
+#### Nifi specific ####
NIFI_HOST="https://127.0.0.1.nip.io"
NIFI_DEPLOY=True
@@ -10,7 +10,7 @@ NIFI_DEPLOY=True
NIFI_USER="lab08nifiuser"
NIFI_PASS="tartunifi2023"
-MEASUREMENT_NAME="testers_measurementersNames"
+MEASUREMENT_NAME="Tartu_ilmaandmed"
## Database
DB_URL="http://influxdb:8086"
@@ -19,18 +19,15 @@ DB_NAME="telegraf_weatherData"
DB_USER="admin"
DB_PASS="admin"
-#### Telegraf ####
-
-#TBA
-
-
#### Over all ####
## Needed if Interactive mode turned off
-API_URL="https://api.open-meteo.com/v1/forecast?latitude=37.9838&longitude=23.7275&current_weather=true"
-API_FIELDS={'temperature': '.current_weather.temperature', 'windspeed': '.current_weather.windspeed'}
+#API_URL="https://api.open-meteo.com/v1/forecast?latitude=37.9838&longitude=23.7275&current_weather=true"
+API_URL="https://api.openweathermap.org/data/2.5/weather?q=Tartu&units=metric&lang=en&appid=01786b7e8b623a1d2112d672ecae1d0d"
+#API_FIELDS={'temperature': '.current_weather.temperature', 'windspeed': '.current_weather.windspeed'}
+API_FIELDS={'temp': '.main.temp', 'winds': '.wind.speed'}
API_USERNAME="rasmus.luha"
-API_PASSWORD="rasmusPass"
-PIPELINE_SCHEDULING_PERIOD="15"
-PIPELINE_NAME="TestPiper"
+API_PASSWORD="Placeholder"
+PIPELINE_SCHEDULING_PERIOD="10"
+PIPELINE_NAME="OpenWeather_pipeline"
diff --git a/modules/nifi/core.py b/modules/nifi/core.py
index 79ab5b9..6eddbad 100644
--- a/modules/nifi/core.py
+++ b/modules/nifi/core.py
@@ -44,7 +44,7 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_
## Measurements name defining
if config.INTERACTIVE_MODE:
- measurements_name = str(input("Palun sisesta andmebaasi jaoks vajalik 'measurement' nimi (influxDB): "))
+ measurements_name = str(input("Palun sisesta andmebaasi jaoks vajalik 'measurement' nimi (influxDB): "))+" "
else:
measurements_name = config.MEASUREMENT_NAME+" "
@@ -55,6 +55,7 @@ def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_
nifi_utils.update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path)
## EvaluateJsonPath processor setup
+ ## TODO
for key, value in data_values.items() :
path_parts = value.split(']')
nifi_utils.update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1])
@@ -93,6 +94,7 @@ def build_pipeline():
if config.INTERACTIVE_MODE:
data_values, api_url, api_username, api_password= common.get_data_values()
+ print(data_values)
print("\nKui tihti peaks andmekonveier jooksma? (sekundites)")
schedulingPeriod = str(common.ask_digit_input(86400))+ "sec"
diff --git a/modules/telegraf/core.py b/modules/telegraf/core.py
index 64fe893..0c9f9c7 100644
--- a/modules/telegraf/core.py
+++ b/modules/telegraf/core.py
@@ -10,6 +10,7 @@ import config
import toml
import shutil
+import sys
@@ -26,7 +27,6 @@ def introduction():
def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, measurement_name, api_username, api_password, template_name):
-
## Pipeline interval
telegraf_utils.modify_agent(new_pipeline_path,"interval", schedulingPeriod)
@@ -39,32 +39,29 @@ def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, m
if template_name == "basic_ETL.toml":
-
for key, value in data_values.items():
fields.append(key)
-
parts = value.rsplit('.', 2)
json_query = '.'.join(parts[:-1])[1:] # Get the json path till last item (second last dot(.))
-
telegraf_utils.modify_input(new_pipeline_path,"json_query", json_query)
telegraf_utils.modify_input(new_pipeline_path,"fieldinclude", fields)
-
elif template_name == "advanced_ETL.toml":
-
for key, value in data_values.items():
-
parts = value.split(']', 1)
json_query = parts[0].split("[")[0][1:]
fields.append(parts[1][1:])
-
-
telegraf_utils.modify_input(new_pipeline_path,"json_query", json_query)
telegraf_utils.modify_input(new_pipeline_path,"json_string_fields", fields)
-
-
-
+ elif template_name == "different_jsonPaths_ETL.toml":
+ for key, value in data_values.items():
+ fields.append(value[1:].replace(".","_"))
+ telegraf_utils.modify_input(new_pipeline_path, "fieldpass", fields)
+ #sys.exit(1)
+ else:
+ print("Malli valimisel tekkis probleem...")
+ sys.exit(1)
## Measurement
telegraf_utils.modify_input(new_pipeline_path,"name_override", measurement_name)
@@ -82,8 +79,6 @@ def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, m
telegraf_utils.modify_input(new_pipeline_path,"password", api_password)
-
-
def build_pipeline():
if config.INTERACTIVE_MODE:
data_values, api_url, api_username, api_password= common.get_data_values()
@@ -103,14 +98,22 @@ def build_pipeline():
api_password = config.API_PASSWORD
measurement_name = config.MEASUREMENT_NAME
+
+ ### Select template
+ ## Check if multiple root json paths template should be used
+ prev=""
+ multpleJsonPaths=False
+ for el in data_values.values():
+ cur = el.split(".", 2)[1]
+ if cur != prev and prev != "":
+ multpleJsonPaths = True
+ prev = cur
-
- ### Select template
- ##TODO
- #template_name="basic_ETL.toml"
if (api_username and api_username.lower() != "placeholder") and (api_password and api_password.lower() != "placeholder"):
template_name="advanced_ETL.toml"
+ elif multpleJsonPaths:
+ template_name="different_jsonPaths_ETL.toml"
else:
template_name="basic_ETL.toml"
diff --git a/modules/telegraf/telegraf_utils.py b/modules/telegraf/telegraf_utils.py
index 62bc4ea..2e72fbf 100644
--- a/modules/telegraf/telegraf_utils.py
+++ b/modules/telegraf/telegraf_utils.py
@@ -17,8 +17,6 @@ def modify_input(new_pipeline_path, key, value):
##modify_input("templates/basic_ETL.toml", "test_pipers.toml", "urls", ["stillTesting"])
-
-## TODO
def modify_agent(new_pipeline_path, key, value):
data = toml.load(new_pipeline_path)
pluggin = data["agent"]
@@ -34,9 +32,6 @@ def modify_agent(new_pipeline_path, key, value):
toml.dump(data, f)
-
-
-## TODO
def modify_output(new_pipeline_path, key, value):
data = toml.load(new_pipeline_path)
pluggin = data["outputs"]["influxdb"][0]
@@ -50,3 +45,68 @@ def modify_output(new_pipeline_path, key, value):
with open(new_pipeline_path, "w") as f:
toml.dump(data, f)
+
+
+
+
+### different_jsonPaths_ETL template funcs ###
+
+
+#def modify_processorsConventer(new_pipeline_path, key, value):
+# data = toml.load(new_pipeline_path)
+# #print(data)
+# pluggin = data["processors"]["converter"][0]["fields"]
+# print(pluggin)
+#
+# if key in pluggin:
+# pluggin[key] = value
+# with open(new_pipeline_path, "w") as f:
+# toml.dump(data, f)
+#
+#
+#def modify_processorsRename(new_pipeline_path, key, value):
+# data = toml.load(new_pipeline_path)
+# pluggin = data["processors"]["rename"][0]["replace"][0]
+# print(pluggin)
+# pluggin = data["processors"]["rename"][0]["replace"][1]
+# print(pluggin)
+#
+# if key in pluggin:
+# pluggin[key] = value
+# with open(new_pipeline_path, "w") as f:
+# toml.dump(data, f)
+#
+
+
+
+
+
+
+
+
+### ChatGPT was used in the process of creating this function
+## def add_new_replace_block(new_pipeline_name):
+##
+## new_block = """ [[processors.rename.replace]]
+## field = "placeholder"
+## dest = "placeholder"
+## """
+##
+## with open(new_pipeline_name, "r") as file:
+## lines = file.readlines()
+##
+## # Find the last occurrence of '[[processors.rename.replace]]'
+## insert_index = -1
+## for i, line in enumerate(lines):
+## if line.strip().startswith("[[processors.rename.replace]]"):
+## insert_index = i
+##
+## while insert_index + 1 < len(lines) and lines[insert_index + 1].startswith(" "):
+## insert_index += 1
+##
+## # Insert the new block
+## lines.insert(insert_index + 1, new_block + "\n")
+##
+## with open(new_pipeline_name, "w") as file:
+## file.writelines(lines)
+##
diff --git a/modules/telegraf/templates/different_jsonPaths_ETL.toml b/modules/telegraf/templates/different_jsonPaths_ETL.toml
new file mode 100644
index 0000000..d7825e3
--- /dev/null
+++ b/modules/telegraf/templates/different_jsonPaths_ETL.toml
@@ -0,0 +1,18 @@
+[agent]
+interval = "10s"
+debug = true
+
+[[inputs.http]]
+ urls = [ ]
+ method = "GET"
+ timeout = "10s"
+ data_format = "json"
+ json_query = "" # empty = use whole JSON
+ name_override = "placeholder"
+ fieldpass = []
+
+[[outputs.influxdb]]
+ urls = []
+ database = "Placeholder"
+ username = "admin"
+ password = "admin"