summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md45
-rw-r--r--common/core.py51
-rw-r--r--config.py15
-rw-r--r--modules/nifi/core.py53
-rw-r--r--modules/nifi/nifi_utils.py82
-rw-r--r--pipelines/test_pipeline.json6
6 files changed, 225 insertions, 27 deletions
diff --git a/README.md b/README.md
index a923cd3..78f0718 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,41 @@
-Next step: modify processor
- - templates added: DONE
- - Chose which one to use: IN PROGRESS
- - and use it
+# Pipeline generator
+This is a pipeline generator that currently supports the following platforms: `Nifi`, `Telegraf` (TODO).
+
+## Usage
+WIP
+- `source venv/bin/activate`
+- `pip install -r requirements.txt`
+- Set up stuff in `config.py` (database and Nifi auth etc.)
+- Then run `main.py`
+
+
+## Configuration info
+Configuration is done in `config.py`;
+sample variables are provided there.
+WIP
+
+
+## Current Repo structure
+|-- common
+| `-- core.py
+|-- config.py
+|-- main.py
+|-- modules
+| |-- nifi
+| | |-- core.py
+| | `-- templates
+| | |-- ...
+| `-- telegraf
+| `-- core.py
+|-- pipelines
+| `-- <Generated pipelines>
+|-- README.md
+`-- requirements.txt
+
+
+### Samples
+
+Currently testing with 2 following apis (sample urls):
+ - https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72&current_weather=true
+ - https://delta.iot.cs.ut.ee/measurement/measurements?source=780&?dateFrom=2024-10-15T00:00:00Z&dateTo=2024-10-16T00:00:01Z&pageSize=200&type=KogEN
+
diff --git a/common/core.py b/common/core.py
index 3eb46df..3d4fac1 100644
--- a/common/core.py
+++ b/common/core.py
@@ -87,3 +87,54 @@ def inspect_json_top_level(json_data):
print(f"\nValitud võti: '{selected_key}':")
path += "." + selected_key
return {selected_key: path}
+
+
+
+
+
+###### Other option ##########
+
+def inspect_json_top_level_test(json_data, has_list=False):
+    """Interactively walk a JSON document and build an extraction path.
+
+    At each level, prints the current JSON (dict keys get a suggested NiFi
+    processor — SplitJson for lists, EvaluateJsonPath otherwise; lists show
+    their indices), asks the user to pick one via ask_digit_input, and
+    descends into the chosen value.  Stops at the first primitive value and
+    returns {"value": path}, where path is a dotted/indexed JsonPath-style
+    string such as ".measurements[0].value".
+
+    User-facing prompts are intentionally in Estonian.
+
+    NOTE(review): has_list is set to True when a list is encountered but is
+    never read afterwards — looks like leftover state; confirm no caller
+    depends on it.
+    """
+    path = ""
+
+    while True:
+        print(json.dumps(json_data, indent=2))
+        print("\nVali json võti või indeks millest soovid väärtuse andmekonveieriga ekstrakteerida\n")
+
+        if isinstance(json_data, dict):
+            # Dict level: list the keys with their value types and a
+            # suggested NiFi processor for each.
+            keys = list(json_data.keys())
+            for index, key in enumerate(keys):
+                value = json_data[key]
+                value_type = type(value).__name__
+                suggestion = "SplitJson" if isinstance(value, list) else "EvaluateJsonPath"
+                print(f"  [{index}] {key} ({value_type}) → {suggestion}")
+
+            selected_index = ask_digit_input(len(keys) - 1)
+            selected_key = keys[selected_index]
+            selected_value = json_data[selected_key]
+            # Dict steps are recorded as ".key" segments.
+            path += "." + selected_key
+
+        elif isinstance(json_data, list):
+            has_list = True
+            # List level: offer the element indices only.
+            for index, item in enumerate(json_data):
+                item_type = type(item).__name__
+                print(f"  [{index}] [{item_type}]")
+
+            selected_index = ask_digit_input(len(json_data) - 1)
+            selected_value = json_data[selected_index]
+            # List steps are recorded as "[index]" segments.
+            path += f"[{selected_index}]"
+
+        else:
+            # Primitive value, nothing to dive into
+            print(f"\nLõppväärtus: {json_data}")
+            return {"value": path}
+
+
+
+        # Keep descending while the selection is a container; otherwise the
+        # path is complete and we return it.
+        if isinstance(selected_value, (dict, list)):
+            json_data = selected_value
+        else:
+            #print(f"\nValitud väärtus: '{selected_value}'")
+            print(f"\nValitud väärtus: '{path}'")
+            return {"value": path}
+
+
diff --git a/config.py b/config.py
index 1274541..a41d686 100644
--- a/config.py
+++ b/config.py
@@ -1,4 +1,11 @@
-## TODO
-DB=influxdb
-DB_USER=
-DB_PASS=
+## Nifi
+## SECURITY(review): real-looking credentials are committed here — rotate them
+## and load secrets from environment variables instead of hard-coding them.
+NIFI_USER="lab08nifiuser"
+NIFI_PASS="tartunifi2023"
+
+
+## Database
+DB_URL="http://influxdb:8086/write?db=nifi_weatherData"
+DB_USER="admin"
+
+## TODO - somehow must be hidden inside the pipeline in the end
+DB_PASS="admin"
diff --git a/modules/nifi/core.py b/modules/nifi/core.py
index 50373a8..6746bc6 100644
--- a/modules/nifi/core.py
+++ b/modules/nifi/core.py
@@ -1,10 +1,14 @@
from pyfiglet import figlet_format
from rich.console import Console
from common import core as common
+import config as config ## NOTE: the "as config" alias is redundant — plain "import config" is equivalent
+#from modules.nifi import nifi_utils as nifi_utils
+
import sys
import json
import shutil
+import requests
def introduction():
@@ -14,7 +18,6 @@ def introduction():
print("Valisid Nifi Platformi!\n")
-## TODO
def update_template(file_path, dot_path, new_key, new_value):
# Step 2: Load the copied JSON
@@ -43,14 +46,19 @@ def update_template(file_path, dot_path, new_key, new_value):
print("✅ Changes saved.")
-### Example Usage ###
-# copy_and_modify_json(
-# "template.json",
-# "pipeline_copy.json",
-# "flowContents.processors[1].properties",
-# "New Config Key",
-# "New Config Value"
-# )
+
+
+def set_database_credentials(file_path,dot_path):
+    """Write the database URL and credentials from config.py into the
+    processor properties at dot_path of the pipeline JSON at file_path,
+    via three update_template calls."""
+    ## Update URL
+    update_template(file_path, dot_path, "HTTP URL", config.DB_URL)
+
+    ## Update username
+    update_template(file_path, dot_path, "username", config.DB_USER)
+
+    ## Update password
+    update_template(file_path, dot_path, "password", config.DB_PASS)
+
+
@@ -76,8 +84,8 @@ def get_data_values():
if api_url_correct:
while True:
- chosen_json_values.update(common.inspect_json_top_level(json_data))
- print("Oled hetkel valinud järgmised väärtused:", ", ".join(chosen_json_values))
+ chosen_json_values.update(common.inspect_json_top_level_test(json_data))
+ print("Oled hetkel valinud järgmised väärtused JSON lõppväärtused: ", ", ".join(chosen_json_values))
choose_another = common.ask_binary_input(prompt="\nKas soovid (v)alida veel mõne väärtuse või liikuda (e)dasi?(v/e): ",valikud=["v","e"]).strip().lower()
if choose_another == 'e':
@@ -88,17 +96,28 @@ def get_data_values():
print("Väljun programmist.")
sys.exit()
+
+
+## NOTE(review): new_pipeline_path, key and value are not defined in this
+## scope (they are locals of build_pipeline below) — calling this function
+## raises NameError.  Looks like an unfinished extraction; either pass them
+## as parameters or remove this stub.
+def update_template_with_json_list():
+    update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value)
+
+## TODO - textReplace part -> fix templates
def build_pipeline():
data_values = get_data_values()
- ## TODO
- shutil.copy("modules/nifi/templates/basic_ETL.json", "pipelines/test_pipeline.json")
-
- ## TODO
+ ## TODO - unhardcode
+ new_pipeline_path = "pipelines/test_pipeline.json"
+ shutil.copy("modules/nifi/templates/basic_ETL.json", new_pipeline_path)
+ ## TODO - make a function for different types ... etc
for key, value in data_values.items() :
- #print (key, value)
- update_template("pipelines/test_pipeline.json", "flowContents.processors[2].properties", key, value)
+ update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value)
+
+ set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties")
+ print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub siin: {new_pipeline_path}.")
+ ## TODO - not working
+ #nifi_utils.upload_nifi_exported_flow( nifi_host="https://127.0.0.1.nip.io", username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False)
+
diff --git a/modules/nifi/nifi_utils.py b/modules/nifi/nifi_utils.py
new file mode 100644
index 0000000..9994d61
--- /dev/null
+++ b/modules/nifi/nifi_utils.py
@@ -0,0 +1,82 @@
+import requests
+import json
+
+##############################
+
+def extract_snippet_fields(flow):
+    """Collect the component lists NiFi expects in a flow snippet.
+
+    Returns a dict with the eight snippet keys (processors, connections,
+    funnels, ports, labels, controllerServices, processGroups), taking each
+    from flow and defaulting missing ones to an empty list.
+    """
+    return {
+        key: flow.get(key, [])
+        for key in [
+            "processors",
+            "connections",
+            "funnels",
+            "inputPorts",
+            "outputPorts",
+            "labels",
+            "controllerServices",
+            "processGroups"
+        ]
+    }
+
+def upload_nifi_exported_flow(
+    nifi_host: str,
+    username: str,
+    password: str,
+    json_file_path: str,
+    verify_ssl: bool = False
+):
+    """Upload an exported NiFi flow-definition file as a new process group.
+
+    Authenticates against the NiFi REST API with username/password, reads
+    the JSON at json_file_path (which must contain a top-level
+    'flowContents' object), and POSTs it as a child process group of the
+    root process group.  Prints success/failure; returns None.
+
+    NOTE(review): errors are reported via print inside a broad
+    `except Exception` — callers cannot detect failure programmatically.
+    verify_ssl=False disables TLS verification by default; confirm that is
+    intended outside local testing.
+    """
+    try:
+        # Step 1: exchange credentials for a bearer token.
+        token_resp = requests.post(
+            f"{nifi_host}/nifi-api/access/token",
+            data={"username": username, "password": password},
+            verify=verify_ssl
+        )
+        token_resp.raise_for_status()
+        token = token_resp.text
+
+        headers = {
+            "Authorization": f"Bearer {token}",
+            "Content-Type": "application/json"
+        }
+
+        # Step 2: look up the root process group id to parent the upload.
+        root_resp = requests.get(f"{nifi_host}/nifi-api/flow/process-groups/root", headers=headers, verify=verify_ssl)
+        root_resp.raise_for_status()
+        root_pg_id = root_resp.json()["processGroupFlow"]["id"]
+
+        # Step 3: load the exported flow definition from disk.
+        with open(json_file_path, "r") as f:
+            raw = json.load(f)
+
+        flow = raw.get("flowContents")
+        if not flow:
+            raise ValueError("Missing 'flowContents' in provided file.")
+
+        snippet = extract_snippet_fields(flow)
+
+        # Step 4: create a new child process group carrying the snippet.
+        payload = {
+            "revision": { "version": 0 },
+            "component": {
+                "name": flow.get("name", "ImportedGroup"),
+                "position": { "x": 0.0, "y": 0.0 },
+                "flowSnippet": snippet
+            }
+        }
+
+        url = f"{nifi_host}/nifi-api/process-groups/{root_pg_id}/process-groups"
+        resp = requests.post(url, headers=headers, json=payload, verify=verify_ssl)
+
+        if resp.status_code == 201:
+            print("✅ Flow uploaded successfully!")
+            pg_id = resp.json()["component"]["id"]
+            print(f"🔗 View it at: {nifi_host}/nifi/#/process-groups/{root_pg_id}/process-group/{pg_id}")
+        else:
+            print(f"❌ Upload failed: {resp.status_code} - {resp.text}")
+
+    except Exception as e:
+        print(f"🚨 Error: {e}")
+
+
+
+
+
+
+
diff --git a/pipelines/test_pipeline.json b/pipelines/test_pipeline.json
index 2374eb7..3713b88 100644
--- a/pipelines/test_pipeline.json
+++ b/pipelines/test_pipeline.json
@@ -461,7 +461,7 @@
"Null Value Representation": "empty string",
"Path Not Found Behavior": "ignore",
"longitude": "$.longitude",
- "temperature": ".current_weather.temperature"
+ "value": "$.measurements[0].KogEN.T.value"
},
"propertyDescriptors": {
"Destination": {
@@ -595,7 +595,9 @@
"Request Digest Authentication Enabled": "false",
"Request Multipart Form-Data Name": null,
"Response Cache Size": "10MB",
- "Response Body Ignored": "false"
+ "Response Body Ignored": "false",
+ "username": "admin",
+ "password": "admin"
},
"propertyDescriptors": {
"Request Content-Encoding": {