summaryrefslogtreecommitdiff
path: root/modules/nifi/core.py
blob: 5c1583777bbb8a631f6f5c7aaed80b5026ef32f0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
## TODO - check syntax
from common import core as common
import config
from modules.nifi import nifi_utils

from pyfiglet import figlet_format
from rich.console import Console

import shutil
import re


def introduction():
    console = Console()
    ascii_art = figlet_format("Nifi")
    console.print(ascii_art, style="cyan")
    print("Valisid Nifi Platformi!\n")



def modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url, api_username, api_password):
    """
    data_values: valitud andmeväljad, mida konveier filtreerib
    scedulingPeriod: kui tihti konveier jookseb
    new_pipeline_name: uue konveieri nimi
    api_url: andmete tõmbamise API url
    api_username: Olemasolu korral API kasutaja nimi
    api_parool: Olemasolu korral API kasutaja parool

    Teeb mallis kõik vajalikud muudatused andmekonveieri valmimiseks
    """


    ############### Choosing and modfyfing Template ##############

    ### Check if splitJson template needed
    needs_SplitJson = False
    path_parts = []
    for el in data_values.values():
        if '[' in el:
            needs_SplitJson = True
            #path_parts = el.split(']')
            path_parts = re.split(r'(?<=\])', el)

    ### Select template 
    if needs_SplitJson:
        template_name="splitJsonETL.json"
    else:
        template_name="basic_ETL.json"

    new_pipeline_path = f"pipelines/{new_pipeline_name}"
    shutil.copy(f"modules/nifi/templates/{template_name}", new_pipeline_path)


    ### Processor editing

    ## Measurements name defining
    if config.INTERACTIVE_MODE:
        measurements_name = str(input("Palun sisesta andmebaasi jaoks vajalik 'measurement' nimi (influxDB): "))+" "
    else:
        measurements_name = config.MEASUREMENT_NAME+" "


    if needs_SplitJson:
        ## SplitJson update
        split_json_path = "$"+re.sub(r'\[(.*?)\]', r'[*]', path_parts[0])
        nifi_utils.update_template(new_pipeline_path, "flowContents.processors[3].properties", "JsonPath Expression", split_json_path)

        ## EvaluateJsonPath processor setup
        ## TODO
        for key, value in data_values.items() :
            path_parts = value.split(']')
            nifi_utils.update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+path_parts[1])
            measurements_name+=f"{key}=${{{key}}},"

        ## Database Setup
        nifi_utils.set_database_credentials(new_pipeline_path, "flowContents.processors[4].properties")
    else:
        ## EvaluateJsonPath processor setup
        for key, value in data_values.items() :
            nifi_utils.update_template(new_pipeline_path, "flowContents.processors[2].properties", key, "$"+value)
            measurements_name+=f"{key}=${{{key}}},"

        ## Database Setup
        nifi_utils.set_database_credentials(new_pipeline_path, "flowContents.processors[3].properties")

    ## ReplaceText processor update - making it compatible for timeseries database (influxDB)
    nifi_utils.update_template(new_pipeline_path, "flowContents.processors[0].properties", "Replacement Value", measurements_name[:-1]) # Delete last coma

    ## Update API call URL
    nifi_utils.update_template(new_pipeline_path, "flowContents.processors[1].properties", "HTTP URL", api_url)

    ## Update scheduling Periond on API Calls
    nifi_utils.update_template(new_pipeline_path, "flowContents.processors[1]", "schedulingPeriod", schedulingPeriod)

    ## Add api credentials
    if api_username != "placeholder":
        nifi_utils.update_template(new_pipeline_path, "flowContents.processors[1].properties", "Request Username", api_username)
        nifi_utils.update_template(new_pipeline_path, "flowContents.processors[1].properties", "Request Password", api_password)


###


def build_pipeline():
    """
    Ehitab andmekonveieri kokku ning paigaldab soovi korral ka platvormile
    """

    if config.INTERACTIVE_MODE:
        data_values, api_url, api_username, api_password= common.get_data_values()
        print(data_values)

        print("\nKui tihti peaks andmekonveier jooksma? (sekundites)")
        schedulingPeriod = str(common.ask_digit_input(86400))+ "sec"

        new_pipeline_name=input("Mis saab andmekonveieri nimeks: ")+".json"

    else:
        api_url = config.API_URL
        data_values = config.API_FIELDS
        schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD+"sec"
        new_pipeline_name = config.PIPELINE_NAME+".json"
        api_username = config.API_USERNAME
        api_password = config.API_PASSWORD

    modify_all_processors(data_values, schedulingPeriod, new_pipeline_name, api_url, api_username, api_password)
    print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier nimega '{new_pipeline_name}' asub kaustas 'pipelines'.")


    ## Pipeline Deployment
    if (config.NIFI_DEPLOY):
        token = nifi_utils.get_access_token()
        nifi_utils.upload_nifi_pipeline(token, f"pipelines/{new_pipeline_name}", new_pipeline_name.split(".")[0], username=config.NIFI_USER, password=config.NIFI_PASS, nifi_url=config.NIFI_HOST, position_x=0, position_y=0)
    else:
        choice = common.ask_binary_input(prompt="\nKas soovid genereeritud andmekonveieri nifi platvormile paigaldada?(jah/ei): ",valikud=["jah","ei"]).strip().lower()
        if choice == "jah":
            token = nifi_utils.get_access_token()
            nifi_utils.upload_nifi_pipeline(token, f"pipelines/{new_pipeline_name}", new_pipeline_name.split(".")[0], username=config.NIFI_USER, password=config.NIFI_PASS, nifi_url=config.NIFI_HOST, position_x=0, position_y=0)