diff options
-rw-r--r-- | README.md | 45 | ||||
-rw-r--r-- | common/core.py | 51 | ||||
-rw-r--r-- | config.py | 15 | ||||
-rw-r--r-- | modules/nifi/core.py | 53 | ||||
-rw-r--r-- | modules/nifi/nifi_utils.py | 82 | ||||
-rw-r--r-- | pipelines/test_pipeline.json | 6 |
6 files changed, 225 insertions, 27 deletions
@@ -1,4 +1,41 @@ -Next step: modify processor - - templates added: DONE - - Chose which one to use: IN PROGRESS - - and use it +# Pipeline generator +This is a pipeline generator that currently supports the following platforms: `Nifi`, `Telegraf`(TODO). + +## Usage +WIP +- `source venv/bin/activate` +- `pip install -r requirements.txt` +- Setup sutff in `config.py` (database and nifi auth etc) +- Then run `main.py` + + +## Configuration info +Can configure under config.py +sample VAR-s are given +WIP + + +## Current Repo structure +|-- common +| `-- core.py +|-- config.py +|-- main.py +|-- modules +| |-- nifi +| | |-- core.py +| | `-- templates +| | |-- ... +| `-- telegraf +| `-- core.py +|-- pipelines +| `-- <Generated pipelines> +|-- README.md +`-- requirements.txt + + +### Samples + +Currently testing with 2 following apis (sample urls): + - https://api.open-meteo.com/v1/forecast?latitude=58.38&longitude=26.72¤t_weather=true + - https://delta.iot.cs.ut.ee/measurement/measurements?source=780&?dateFrom=2024-10-15T00:00:00Z&dateTo=2024-10-16T00:00:01Z&pageSize=200&type=KogEN + diff --git a/common/core.py b/common/core.py index 3eb46df..3d4fac1 100644 --- a/common/core.py +++ b/common/core.py @@ -87,3 +87,54 @@ def inspect_json_top_level(json_data): print(f"\nValitud võti: '{selected_key}':") path += "." + selected_key return {selected_key: path} + + + + + +###### Other option ########## + +def inspect_json_top_level_test(json_data, has_list=False): + path = "" + + while True: + print(json.dumps(json_data, indent=2)) + print("\nVali json võti või indeks millest soovid väärtuse andmekonveieriga ekstrakteerida\n") + + if isinstance(json_data, dict): + keys = list(json_data.keys()) + for index, key in enumerate(keys): + value = json_data[key] + value_type = type(value).__name__ + suggestion = "SplitJson" if isinstance(value, list) else "EvaluateJsonPath" + print(f" [{index}] {key} ({value_type}) → {suggestion}") + + selected_index = ask_digit_input(len(keys) - 1) + selected_key = keys[selected_index] + selected_value = json_data[selected_key] + path += "." + selected_key + + elif isinstance(json_data, list): + has_list = True + for index, item in enumerate(json_data): + item_type = type(item).__name__ + print(f" [{index}] [{item_type}]") + + selected_index = ask_digit_input(len(json_data) - 1) + selected_value = json_data[selected_index] + path += f"[{selected_index}]" + + else: + # Primitive value, nothing to dive into + print(f"\nLõppväärtus: {json_data}") + return {"value": path} + + + + if isinstance(selected_value, (dict, list)): + json_data = selected_value + else: + #print(f"\nValitud väärtus: '{selected_value}'") + print(f"\nValitud väärtus: '{path}'") + return {"value": path} + @@ -1,4 +1,11 @@ -## TODO -DB=influxdb -DB_USER= -DB_PASS= +## Nifi +NIFI_USER="lab08nifiuser" +NIFI_PASS="tartunifi2023" + + +## Database +DB_URL="http://influxdb:8086/write?db=nifi_weatherData" +DB_USER="admin" + +## TODO - somehow must be hidden inside the pipeline in the end +DB_PASS="admin" diff --git a/modules/nifi/core.py b/modules/nifi/core.py index 50373a8..6746bc6 100644 --- a/modules/nifi/core.py +++ b/modules/nifi/core.py @@ -1,10 +1,14 @@ from pyfiglet import figlet_format from rich.console import Console from common import core as common +import config as config ## TODO - check syntax +#from modules.nifi import nifi_utils as nifi_utils + import sys import json import shutil +import requests def introduction(): @@ -14,7 +18,6 @@ def introduction(): print("Valisid Nifi Platformi!\n") -## TODO def update_template(file_path, dot_path, new_key, new_value): # Step 2: Load the copied JSON @@ -43,14 +46,19 @@ def update_template(file_path, dot_path, new_key, new_value): print("✅ Changes saved.") -### Example Usage ### -# copy_and_modify_json( -# "template.json", -# "pipeline_copy.json", -# "flowContents.processors[1].properties", -# "New Config Key", -# "New Config Value" -# ) + + +def set_database_credentials(file_path,dot_path): + ## Update URL + update_template(file_path, dot_path, "HTTP URL", config.DB_URL) + + ## Update username + update_template(file_path, dot_path, "username", config.DB_USER) + + ## Update username + update_template(file_path, dot_path, "password", config.DB_PASS) + + @@ -76,8 +84,8 @@ def get_data_values(): if api_url_correct: while True: - chosen_json_values.update(common.inspect_json_top_level(json_data)) - print("Oled hetkel valinud järgmised väärtused:", ", ".join(chosen_json_values)) + chosen_json_values.update(common.inspect_json_top_level_test(json_data)) + print("Oled hetkel valinud järgmised väärtused JSON lõppväärtused: ", ", ".join(chosen_json_values)) choose_another = common.ask_binary_input(prompt="\nKas soovid (v)alida veel mõne väärtuse või liikuda (e)dasi?(v/e): ",valikud=["v","e"]).strip().lower() if choose_another == 'e': @@ -88,17 +96,28 @@ def get_data_values(): print("Väljun programmist.") sys.exit() + + +def update_template_with_json_list(): + update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value) + +## TODO - textReplace part -> fix templates def build_pipeline(): data_values = get_data_values() - ## TODO - shutil.copy("modules/nifi/templates/basic_ETL.json", "pipelines/test_pipeline.json") - - ## TODO + ## TODO - unhardcode + new_pipeline_path = "pipelines/test_pipeline.json" + shutil.copy("modules/nifi/templates/basic_ETL.json", new_pipeline_path) + ## TODO - make a function for different types ... etc for key, value in data_values.items() : - #print (key, value) - update_template("pipelines/test_pipeline.json", "flowContents.processors[2].properties", key, value) + update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value) + + set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties") + print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier asub siin: {new_pipeline_path}.") + ## TODO - not working + #nifi_utils.upload_nifi_exported_flow( nifi_host="https://127.0.0.1.nip.io", username=config.NIFI_USER, password=config.NIFI_PASS, json_file_path="pipelines/test_pipeline.json", verify_ssl=False) + diff --git a/modules/nifi/nifi_utils.py b/modules/nifi/nifi_utils.py new file mode 100644 index 0000000..9994d61 --- /dev/null +++ b/modules/nifi/nifi_utils.py @@ -0,0 +1,82 @@ +import requests +import json + +############################## + +def extract_snippet_fields(flow): + return { + key: flow.get(key, []) + for key in [ + "processors", + "connections", + "funnels", + "inputPorts", + "outputPorts", + "labels", + "controllerServices", + "processGroups" + ] + } + +def upload_nifi_exported_flow( + nifi_host: str, + username: str, + password: str, + json_file_path: str, + verify_ssl: bool = False +): + try: + token_resp = requests.post( + f"{nifi_host}/nifi-api/access/token", + data={"username": username, "password": password}, + verify=verify_ssl + ) + token_resp.raise_for_status() + token = token_resp.text + + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json" + } + + root_resp = requests.get(f"{nifi_host}/nifi-api/flow/process-groups/root", headers=headers, verify=verify_ssl) + root_resp.raise_for_status() + root_pg_id = root_resp.json()["processGroupFlow"]["id"] + + with open(json_file_path, "r") as f: + raw = json.load(f) + + flow = raw.get("flowContents") + if not flow: + raise ValueError("Missing 'flowContents' in provided file.") + + snippet = extract_snippet_fields(flow) + + payload = { + "revision": { "version": 0 }, + "component": { + "name": flow.get("name", "ImportedGroup"), + "position": { "x": 0.0, "y": 0.0 }, + "flowSnippet": snippet + } + } + + url = f"{nifi_host}/nifi-api/process-groups/{root_pg_id}/process-groups" + resp = requests.post(url, headers=headers, json=payload, verify=verify_ssl) + + if resp.status_code == 201: + print("✅ Flow uploaded successfully!") + pg_id = resp.json()["component"]["id"] + print(f"🔗 View it at: {nifi_host}/nifi/#/process-groups/{root_pg_id}/process-group/{pg_id}") + else: + print(f"❌ Upload failed: {resp.status_code} - {resp.text}") + + except Exception as e: + print(f"🚨 Error: {e}") + + + + + + + diff --git a/pipelines/test_pipeline.json b/pipelines/test_pipeline.json index 2374eb7..3713b88 100644 --- a/pipelines/test_pipeline.json +++ b/pipelines/test_pipeline.json @@ -461,7 +461,7 @@ "Null Value Representation": "empty string", "Path Not Found Behavior": "ignore", "longitude": "$.longitude", - "temperature": ".current_weather.temperature" + "value": "$.measurements[0].KogEN.T.value" }, "propertyDescriptors": { "Destination": { @@ -595,7 +595,9 @@ "Request Digest Authentication Enabled": "false", "Request Multipart Form-Data Name": null, "Response Cache Size": "10MB", - "Response Body Ignored": "false" + "Response Body Ignored": "false", + "username": "admin", + "password": "admin" }, "propertyDescriptors": { "Request Content-Encoding": { |