1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
|
from pyfiglet import figlet_format
from rich.console import Console
from modules.telegraf import telegraf_utils
#from modules.nifi import nifi_utils
from common import core as common
import config
import toml
import shutil
import sys
def introduction():
    """Print the "Telegraf" ASCII-art banner in cyan, followed by a greeting line."""
    # Render the figlet banner through rich so that the colour style is applied.
    Console().print(figlet_format("Telegraf"), style="cyan")
    print("Valisid Telegraf Platformi!\n")
###########################
def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, measurement_name, api_username, api_password, template_name):
    """
    Apply every setting needed to turn a copied template into a finished pipeline.

    All changes are made through the ``telegraf_utils.modify_agent`` /
    ``modify_input`` / ``modify_output`` helpers, each called with
    ``new_pipeline_path`` as its first argument.

    new_pipeline_path: path of the pipeline file being customised
    api_url: URL of the API the pipeline pulls data from
    schedulingPeriod: how often the pipeline runs (passed as the agent "interval")
    data_values: mapping of the selected data fields; the values are dotted
        JSON paths with a leading "." (the advanced template additionally
        expects a "[...]" part in each path)
    measurement_name: value written as the input's "name_override"
    api_username: API user name, if the API needs one ("placeholder" means none)
    api_password: API password, written only together with the user name
    template_name: file name of the template the pipeline was copied from;
        an unknown name prints an error and exits with status 1
    """
    ## Pipeline interval
    telegraf_utils.modify_agent(new_pipeline_path, "interval", schedulingPeriod)

    ## API url
    telegraf_utils.modify_input(new_pipeline_path, "urls", [api_url])

    ### Plugins
    selected_fields = []
    query_path = ""
    if template_name == "basic_ETL.toml":
        # The field names are the mapping's keys.
        selected_fields = list(data_values)
        for json_path in data_values.values():
            # Keep the path up to (not including) its last segment and drop the
            # leading dot. Only the result for the last entry is used below.
            segments = json_path.rsplit('.', 2)
            query_path = '.'.join(segments[:-1])[1:]
        telegraf_utils.modify_input(new_pipeline_path, "json_query", query_path)
        telegraf_utils.modify_input(new_pipeline_path, "fieldinclude", selected_fields)
    elif template_name == "advanced_ETL.toml":
        for json_path in data_values.values():
            # Split at the first "]": the left part carries the query path,
            # the right part carries the field name.
            around_bracket = json_path.split(']', 1)
            # Text before the first "[", without the leading dot.
            query_path = around_bracket[0].partition("[")[0][1:]
            # Text after the "]", without the separator character that follows it.
            selected_fields.append(around_bracket[1][1:])
        telegraf_utils.modify_input(new_pipeline_path, "json_query", query_path)
        telegraf_utils.modify_input(new_pipeline_path, "json_string_fields", selected_fields)
    elif template_name == "different_jsonPaths_ETL.toml":
        # Drop the leading dot and replace the remaining dots with underscores.
        selected_fields = [
            json_path[1:].replace(".", "_") for json_path in data_values.values()
        ]
        telegraf_utils.modify_input(new_pipeline_path, "fieldpass", selected_fields)
    else:
        print("Malli valimisel tekkis probleem...")
        sys.exit(1)

    ## Measurement
    telegraf_utils.modify_input(new_pipeline_path, "name_override", measurement_name)

    ## Database
    telegraf_utils.modify_output(new_pipeline_path, "urls", [config.DB_URL])
    telegraf_utils.modify_output(new_pipeline_path, "database", config.DB_NAME)
    telegraf_utils.modify_output(new_pipeline_path, "username", config.DB_USER)
    telegraf_utils.modify_output(new_pipeline_path, "password", config.DB_PASS)

    ## Credentials are written only when a real (non-placeholder) user name is given
    has_username = bool(api_username) and api_username.lower() != "placeholder"
    if has_username:
        telegraf_utils.modify_input(new_pipeline_path, "username", api_username)
        telegraf_utils.modify_input(new_pipeline_path, "password", api_password)
def _is_real_credential(value):
    """Return True when *value* is non-empty and is not "placeholder" (case-insensitive)."""
    # Truthiness is checked first so that None or "" never reaches .lower().
    return bool(value) and value.lower() != "placeholder"


def _has_multiple_root_paths(json_paths):
    """Return True when consecutive paths start with different root segments.

    The root is the text between the first and second "." of each path. A
    comparison is skipped when the preceding root is the empty string.
    """
    roots = [json_path.split(".", 2)[1] for json_path in json_paths]
    return any(
        previous != "" and current != previous
        for previous, current in zip(roots, roots[1:])
    )


def build_pipeline():
    """
    Assemble a new data pipeline file from one of the bundled templates.

    Settings come from interactive prompts when ``config.INTERACTIVE_MODE`` is
    truthy, otherwise from the values in ``config``. The API URL is checked
    with ``common.is_app_url_correct``; a template is then chosen, copied to
    ``pipelines/<name>.toml`` and customised with ``modify_template``.

    Exits with status 1 (via ``sys.exit``) when the API URL check fails.
    Raises IndexError when a selected JSON path contains no ".".
    """
    if config.INTERACTIVE_MODE:
        data_values, api_url, api_username, api_password = common.get_data_values()
        print("\nKui tihti peaks andmekonveier jooksma? (sekundites)")
        schedulingPeriod = f"{common.ask_digit_input(86400)}s"
        measurement_name = input("Palun sisesta andmebaasi (influxDB) jaoks vajalik 'measurement' nimi: ")
        new_pipeline_name = input("Mis saab andmekonveieri nimeks: ") + ".toml"
    else:
        api_url = config.API_URL
        data_values = config.API_FIELDS
        # Formatting (rather than "+") also accepts a numeric period in config.
        schedulingPeriod = f"{config.PIPELINE_SCHEDULING_PERIOD}s"
        new_pipeline_name = f"{config.PIPELINE_NAME}.toml"
        api_username = config.API_USERNAME
        api_password = config.API_PASSWORD
        measurement_name = config.MEASUREMENT_NAME

    ## Check that API URL is correct
    # Authentication is used only when both credentials are real values. The
    # result is computed once and reused for the template choice below.
    needs_auth = _is_real_credential(api_username) and _is_real_credential(api_password)
    _, api_url_correct = common.is_app_url_correct(api_url, needs_auth, api_username, api_password)
    if not api_url_correct:
        print("\nEtteantud API URL-i kutsel tekkis viga, sulgen rakenduse...")
        sys.exit(1)

    ### Select template
    ## Check if multiple root json paths template should be used
    multiple_json_paths = _has_multiple_root_paths(data_values.values())

    if needs_auth:
        template_name = "advanced_ETL.toml"
    elif multiple_json_paths:
        template_name = "different_jsonPaths_ETL.toml"
    else:
        template_name = "basic_ETL.toml"

    new_pipeline_path = f"pipelines/{new_pipeline_name}"
    shutil.copy(f"modules/telegraf/templates/{template_name}", new_pipeline_path)
    modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, measurement_name, api_username, api_password, template_name)
    print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier nimega '{new_pipeline_name}' asub kaustas 'pipelines'.")
|