1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
from pyfiglet import figlet_format
from rich.console import Console
from modules.telegraf import telegraf_utils
#from modules.nifi import nifi_utils
from common import core as common
import config
import toml
import shutil
import sys
def introduction():
console = Console()
ascii_art = figlet_format("Telegraf")
console.print(ascii_art, style="cyan")
print("Valisid Telegraf Platformi!\n")
###########################
def modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, measurement_name, api_username, api_password, template_name):
## Pipeline interval
telegraf_utils.modify_agent(new_pipeline_path,"interval", schedulingPeriod)
## API url
telegraf_utils.modify_input(new_pipeline_path,"urls", [api_url])
### Pluggins
fields=[]
json_query = ""
if template_name == "basic_ETL.toml":
for key, value in data_values.items():
fields.append(key)
parts = value.rsplit('.', 2)
json_query = '.'.join(parts[:-1])[1:] # Get the json path till last item (second last dot(.))
telegraf_utils.modify_input(new_pipeline_path,"json_query", json_query)
telegraf_utils.modify_input(new_pipeline_path,"fieldinclude", fields)
elif template_name == "advanced_ETL.toml":
for key, value in data_values.items():
parts = value.split(']', 1)
json_query = parts[0].split("[")[0][1:]
fields.append(parts[1][1:])
telegraf_utils.modify_input(new_pipeline_path,"json_query", json_query)
telegraf_utils.modify_input(new_pipeline_path,"json_string_fields", fields)
elif template_name == "different_jsonPaths_ETL.toml":
for key, value in data_values.items():
fields.append(value[1:].replace(".","_"))
telegraf_utils.modify_input(new_pipeline_path, "fieldpass", fields)
#sys.exit(1)
else:
print("Malli valimisel tekkis probleem...")
sys.exit(1)
## Measurement
telegraf_utils.modify_input(new_pipeline_path,"name_override", measurement_name)
## Database
telegraf_utils.modify_output(new_pipeline_path, "urls", [config.DB_URL])
telegraf_utils.modify_output(new_pipeline_path, "database", config.DB_NAME)
telegraf_utils.modify_output(new_pipeline_path, "username", config.DB_USER)
telegraf_utils.modify_output(new_pipeline_path, "password", config.DB_PASS)
## If authenctication needed
if api_username and api_username.lower() != "placeholder":
telegraf_utils.modify_input(new_pipeline_path,"username", api_username)
telegraf_utils.modify_input(new_pipeline_path,"password", api_password)
def build_pipeline():
if config.INTERACTIVE_MODE:
data_values, api_url, api_username, api_password= common.get_data_values()
print("\nKui tihti peaks andmekonveier jooksma? (sekundites)")
schedulingPeriod = str(common.ask_digit_input(86400))+ "s"
measurement_name = str(input("Palun sisesta andmebaasi (influxDB) jaoks vajalik 'measurement' nimi: "))
new_pipeline_name=input("Mis saab andmekonveieri nimeks: ")+".toml"
## TODO
else:
api_url = config.API_URL
data_values = config.API_FIELDS
schedulingPeriod = config.PIPELINE_SCHEDULING_PERIOD+"s"
new_pipeline_name = config.PIPELINE_NAME+".toml"
api_username = config.API_USERNAME
api_password = config.API_PASSWORD
measurement_name = config.MEASUREMENT_NAME
### Select template
## Check if multiple root json paths template should be used
prev=""
multpleJsonPaths=False
for el in data_values.values():
cur = el.split(".", 2)[1]
if cur != prev and prev != "":
multpleJsonPaths = True
prev = cur
if (api_username and api_username.lower() != "placeholder") and (api_password and api_password.lower() != "placeholder"):
template_name="advanced_ETL.toml"
elif multpleJsonPaths:
template_name="different_jsonPaths_ETL.toml"
else:
template_name="basic_ETL.toml"
new_pipeline_path = f"pipelines/{new_pipeline_name}"
shutil.copy(f"modules/telegraf/templates/{template_name}", new_pipeline_path)
modify_template(new_pipeline_path, api_url, schedulingPeriod, data_values, measurement_name, api_username, api_password, template_name)
print(f"✅✅✅ Valmis. Uus genereeritud andmekoveier nimega '{new_pipeline_name}' asub kaustas 'pipelines'.")
|