summaryrefslogtreecommitdiff
path: root/modules/nifi/core.py
blob: 79ab5b9a939ddd70d533534cb2a0aa828865522f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
## TODO - check syntax
from common import core as common
import config
from modules.nifi import nifi_utils

from pyfiglet import figlet_format
from rich.console import Console

import shutil
import re


def introduction():
    """Print the "Nifi" ASCII-art banner in cyan, followed by the welcome line."""
    banner = figlet_format("Nifi")
    Console().print(banner, style="cyan")
    print("Valisid Nifi Platformi!\n")



def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url, api_username, api_password):
    """Copy a NiFi template into ``pipelines/`` and fill in its processors.

    The ``splitJsonETL.json`` template is used when any value of
    ``data_values`` contains ``[``; otherwise ``basic_ETL.json`` is used.
    All edits go through ``nifi_utils.update_template`` (presumably it sets
    a key at the given location inside the copied JSON file -- verify
    against ``nifi_utils``).

    Args:
        data_values: Mapping of field name -> JSON path string (without the
            leading ``$``, which is prepended here).
        schedulingPeriod: Value written to ``schedulingPeriod`` of
            ``processors[1]``.
        new_pipeline_name: File name of the new pipeline inside
            ``pipelines/``.
        api_url: Value written to the ``HTTP URL`` property of
            ``processors[1]``.
        api_username: Written as ``Request Username`` unless it equals the
            sentinel string ``"placeholder"``.
        api_password: Written as ``Request Password`` together with the
            username.
    """
    ############### Choosing and modifying the template ##############

    ### Check whether the SplitJson template is needed
    # Path up to and including the first ']' of a bracketed value. If several
    # values contain '[', the last one wins (same as before).
    array_prefix = None
    for path in data_values.values():
        if '[' in path:
            array_prefix = re.split(r'(?<=\])', path)[0]
    needs_split_json = array_prefix is not None

    ### Select template
    template_name = "splitJsonETL.json" if needs_split_json else "basic_ETL.json"

    new_pipeline_path = f"pipelines/{new_pipeline_name}"
    shutil.copy(f"modules/nifi/templates/{template_name}", new_pipeline_path)

    ### Processor editing

    ## Measurement name
    if config.INTERACTIVE_MODE:
        # Stripped so that exactly one separating space is inserted below;
        # previously the interactive branch produced no separator at all,
        # unlike the config-driven branch.
        measurement = input("Palun sisesta andmebaasi jaoks vajalik 'measurement' nimi (influxDB): ").strip()
    else:
        measurement = config.MEASUREMENT_NAME

    if needs_split_json:
        ## SplitJson update: replace every index with a wildcard, e.g. ".data[0]" -> "$.data[*]"
        split_json_path = "$" + re.sub(r'\[(.*?)\]', r'[*]', array_prefix)
        nifi_utils.update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path)

    ## EvaluateJsonPath processor setup
    for key, value in data_values.items():
        json_path = "$" + value
        if needs_split_json:
            # Keep everything after the FIRST ']' so deeper brackets survive
            # (".a[0].b[1].c" -> ".b[1].c").
            _, bracket, remainder = value.partition(']')
            if bracket:
                json_path = "$" + remainder
            # NOTE(review): a value without ']' used to raise IndexError here;
            # it now falls back to its full path -- confirm this is the
            # wanted behaviour for non-array fields in a SplitJson flow.
        nifi_utils.update_template(new_pipeline_path, "flowContents.processors[2].properties", key, json_path)

    ## Database setup (the SplitJson template has one extra processor before it)
    database_index = 4 if needs_split_json else 3
    nifi_utils.set_database_credentials(new_pipeline_path, f"flowContents.processors[{database_index}].properties")

    ## ReplaceText processor update - making it compatible with the timeseries database (influxDB)
    # Produces "<measurement> key1=${key1},key2=${key2}"; join avoids having
    # to chop a trailing comma (which removed a real character when there
    # were no fields).
    field_set = ",".join(f"{key}=${{{key}}}" for key in data_values)
    replacement_value = f"{measurement} {field_set}" if field_set else measurement
    nifi_utils.update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", replacement_value)

    ## Update API call URL
    nifi_utils.update_template(new_pipeline_path, "flowContents.processors[1].properties", "HTTP URL", api_url)

    ## Update scheduling period of the API calls
    nifi_utils.update_template(new_pipeline_path, "flowContents.processors[1]", "schedulingPeriod", schedulingPeriod)

    ## Add API credentials ("placeholder" means none were supplied)
    if api_username != "placeholder":
        nifi_utils.update_template(new_pipeline_path, "flowContents.processors[1].properties", "Request Username", api_username)
        nifi_utils.update_template(new_pipeline_path, "flowContents.processors[1].properties", "Request Password", api_password)


###


def build_pipeline():
    """Collect the pipeline settings, generate the pipeline file and optionally deploy it.

    Settings come from interactive prompts when ``config.INTERACTIVE_MODE``
    is truthy, otherwise from ``config``. The file is generated by
    ``modify_all_processors`` and, if ``config.NIFI_DEPLOY`` is truthy or the
    user answers "jah" to the prompt, uploaded with
    ``nifi_utils.upload_nifi_pipeline``.
    """
    if config.INTERACTIVE_MODE:
        data_values, api_url, api_username, api_password = common.get_data_values()

        print("\nKui tihti peaks andmekonveier jooksma? (sekundites)")
        schedulingPeriod = str(common.ask_digit_input(86400)) + "sec"

        pipeline_label = input("Mis saab andmekonveieri nimeks: ")
    else:
        api_url = config.API_URL
        data_values = config.API_FIELDS
        # str() so an integer config value works, matching the interactive branch.
        schedulingPeriod = str(config.PIPELINE_SCHEDULING_PERIOD) + "sec"
        pipeline_label = config.PIPELINE_NAME
        api_username = config.API_USERNAME
        api_password = config.API_PASSWORD

    new_pipeline_name = pipeline_label + ".json"

    modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url, api_username, api_password)
    print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier nimega '{new_pipeline_name}' asub kaustas 'pipelines'.")

    ## Pipeline deployment
    deploy = bool(config.NIFI_DEPLOY)
    if not deploy:
        # NOTE(review): this prompt is also shown when INTERACTIVE_MODE is off
        # and NIFI_DEPLOY is falsy -- confirm that is intended.
        choice = common.ask_binary_input(prompt="\nKas soovid genereeritud andmekonveieri nifi platvormile paigaldada?(jah/ei): ",valikud=["jah","ei"]).strip().lower()
        deploy = choice == "jah"

    if deploy:
        token = nifi_utils.get_access_token()
        # Pass the name without ".json"; splitting on "." would truncate
        # pipeline names that themselves contain a dot.
        nifi_utils.upload_nifi_pipeline(token, f"pipelines/{new_pipeline_name}", pipeline_label, username=config.NIFI_USER, password=config.NIFI_PASS, nifi_url=config.NIFI_HOST, position_x=0, position_y=0)