Copy"""
Copyright (c) 2024 Cisco and/or its affiliates.
This software is licensed to you under the terms of the Cisco Sample
Code License, Version 1.1 (the "License"). You may obtain a copy of the
License at
https://developer.cisco.com/docs/licenses
All use of the material herein must be in accordance with the terms of
the License. All rights not expressly granted by the License are
reserved. Unless required by applicable law or agreed to separately in
writing, software distributed under the License is distributed on an "AS
IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied.
"""
'''
Manage Network Tunnel Groups in Umbrella.
Create API key credentials for an organization in Umbrella for Government.
Get an access token using the organization's API credentials.
Get the Network Tunnel Groups in the organization.
Create a Network Tunnel Group.
Get a specific network tunnel group.
Update a specific network tunnel group.
Get the status of a Network Tunnel Group.
Get the statuses of all Network Tunnel Groups in the organization.
Get the tunnel status for a specific network tunnel group, hub, and peer ID.
Delete a network tunnel group.
'''
import requests
import random
import string
import json
import os
from dotenv import dotenv_values
from oauthlib.oauth2 import BackendApplicationClient
from oauthlib.oauth2 import TokenExpiredError
from requests_oauthlib import OAuth2Session
from requests.auth import HTTPBasicAuth
dotenv_config = dotenv_values(".env")
token_url = os.environ.get('TOKEN_URL') or dotenv_config['TOKEN_URL']
client_id = os.environ.get('API_KEY') or dotenv_config['API_KEY']
client_secret = os.environ.get('API_SECRET') or dotenv_config['API_SECRET']
base_uri_env = os.environ.get('BASE_URI') or dotenv_config['BASE_URI']
deployments = 'deployments'
PUT = 'put'
POST = 'post'
GET = 'get'
DELETE = 'delete'
PATCH = 'patch'
output_dir = os.environ.get('OUTPUT_DIR') or dotenv_config['OUTPUT_DIR']
ntg_details_output_file = output_dir + "/ntg_details_json_get.out"
ntg_create_output_file = output_dir + "/ntg_create_json.out"
ntg_get_output_file = output_dir + "/ntg_get_json.out"
ntg_update_output_file = output_dir + "/ntg_update_json.out"
ntg_single_state_output_file = output_dir + "/ntg_single_state_json_get.out"
ntg_hub_peer_state_output_file = output_dir + "/ntg_hub_peer_state_json_get.out"
ntg_all_states_output_file = output_dir + "/ntg_all_states_json_get.out"
regions_output_file = output_dir + "/regions_json_get.out"
ntg_endpoint = "networktunnelgroups"
ntg_single_endpoint = ntg_endpoint + "/{}"
ntg_single_state_endpoint = ntg_endpoint + "/{}/state"
ntg_single_hub_peer_state_endpoint = ntg_endpoint + "/{}/networktunnelhubs/{}/peers/{}/state"
ntg_all_states_endpoint = "networktunnelgroupsstate"
regions_endpoint = "regions"
class API:
def __init__(self, url, ident, secret):
self.url = url
self.ident = ident
self.secret = secret
self.token = None
def GetToken(self):
auth = HTTPBasicAuth(self.ident, self.secret)
client = BackendApplicationClient(client_id=self.ident)
oauth = OAuth2Session(client=client)
self.token = oauth.fetch_token(token_url=self.url, auth=auth)
return self.token
def Query(self, scope, end_point, operation, request_data=None):
success = False
base_uri = base_uri_env + '/' + scope + "/v2"
req = None
if self.token == None:
self.GetToken()
while not success:
try:
api_headers = {
'Authorization': "Bearer " + self.token['access_token'],
"Content-Type": "application/json"
}
if operation in GET:
req = requests.get('{}/{}'.format(base_uri, end_point), headers=api_headers)
elif operation in PATCH:
req = requests.patch('{}/{}'.format(base_uri, end_point), headers=api_headers, json=request_data)
elif operation in POST:
req = requests.post('{}/{}'.format(base_uri, end_point), headers=api_headers, json=request_data)
elif operation in PUT:
req = requests.put('{}/{}'.format(base_uri, end_point), headers=api_headers, json=request_data)
elif operation in DELETE:
req = requests.delete('{}/{}'.format(base_uri, end_point), headers=api_headers)
req.raise_for_status()
success = True
except TokenExpiredError:
token = self.GetToken()
except Exception as e:
raise(e)
return req
def get_network_tunnel_groups(api, endpoint, ntg_file):
'''
Make an API request to the Network Tunnel Groups API API to get the network tunnel groups in the organization.
Write the Network Tunnel Groups information to a file.
Add the includeStatuses query parameter to the API request to get the network tunnel group and tunnel status information.
'''
print(f"Get the network tunnel groups in the organization.")
try:
endpoint = endpoint + "?includeStatuses=true"
response = api.Query(deployments, endpoint, GET)
if response.status_code == 200:
data = response.json()
with open(str(ntg_file), 'w', encoding='utf-8') as write_output_file:
write_output_file.writelines(json.dumps(data, indent=4))
print(f"Network Tunnel Group written to {ntg_file}.")
else:
print(f"Failed to retrieve Network Tunnel Groups. Status code: {response.status_code}")
print("Response:", response.text)
except requests.exceptions.RequestException as e:
print(f"An error occurred: {e}.")
def create_network_tunnel_group(api, endpoint, ntg_file, request_data):
'''
Make an API request to the Network Tunnel Groups API API to create a network tunnel group in the organization.
Write the Network Tunnel Group information to a file.
'''
print(f"Create the network tunnel group in the organization with data, {request_data}.")
try:
response = api.Query(deployments, endpoint, POST, request_data)
data = None
if response.status_code == 201:
data = response.json()
with open(str(ntg_file), 'w', encoding='utf-8') as write_output_file:
write_output_file.writelines(json.dumps(data, indent=4))
print(f"Network Tunnel Group information written to {ntg_file}.")
else:
print(f"Failed to retrieve Network Tunnel Group. Status code: {response.status_code}")
print("Response:", response.text)
except requests.exceptions.RequestException as e:
print(f"An error occurred: {e}.")
def get_network_tunnel_group(api, endpoint, ntg_file, id):
'''
Make an API request to the Network Tunnel Groups API to get the information about the specific network tunnel group.
'''
print(f"Get the network tunnel group in the organization.")
try:
url = endpoint.format(id)
print(f"Url for get tunnel group: {url}.")
response = api.Query(deployments, url, GET)
if response.status_code == 200:
data = response.json()
with open(str(ntg_file), 'w', encoding='utf-8') as write_output_file:
write_output_file.writelines(json.dumps(data, indent=4))
print(f"Network Tunnel Group information written to {ntg_file}.")
else:
print(f"Failed to retrieve the network tunnel group. Status code: {response.status_code}")
print("Response:", response.text)
except requests.exceptions.RequestException as e:
print(f"An error occurred: {e}.")
def update_network_tunnel_group(api, endpoint, ntg_file, id, request_data=None):
'''
Make an API request to the Network Tunnel Groups API to update the information about the specific network tunnel group.
'''
print(f"Update the network tunnel group in the organization.")
try:
url = endpoint.format(id)
print(f"Url for update tunnel group: {url}.")
response = api.Query(deployments, url, PATCH, request_data)
if response.status_code == 200:
data = response.json()
with open(str(ntg_file), 'w', encoding='utf-8') as write_output_file:
write_output_file.writelines(json.dumps(data, indent=4))
print(f"Network Tunnel Group information written to {ntg_file}.")
else:
print(f"Failed to retrieve network tunnel group. Status code: {response.status_code}")
print("Response:", response.text)
except requests.exceptions.RequestException as e:
print(f"An error occurred: {e}.")
def delete_network_tunnel_group(api, endpoint, id):
'''
Make an API request to the Network Tunnel Groups API to delete the specific network tunnel group.
'''
print(f"Delete the network tunnel group in the organization.")
try:
url = endpoint.format(id)
print(f"Url for delete tunnel group: {url}.")
response = api.Query(deployments, url, DELETE)
if response.status_code == 204:
print("Delete tunnel group and expected no content in response.")
else:
print(f"Failed to delete the network tunnel group. Status code: {response.status_code}")
print("Response:", response.text)
except requests.exceptions.RequestException as e:
print(f"An error occurred: {e}.")
def get_network_tunnel_group_state(api, endpoint, ntg_file, id):
'''
Make an API request to the Network Tunnel Groups API to get the state information for the specific network tunnel group.
'''
print(f"Get the network tunnel group state information.")
try:
url = endpoint.format(id)
print(f"Url for get state of tunnel group: {url}.")
response = api.Query(deployments, url, GET)
if response.status_code == 200:
data = response.json()
with open(str(ntg_file), 'w', encoding='utf-8') as write_output_file:
write_output_file.writelines(json.dumps(data, indent=4))
print(f"Network Tunnel Group state information written to {ntg_file}.")
else:
print(f"Failed to retrieve the network tunnel group. Status code: {response.status_code}")
print("Response:", response.text)
except requests.exceptions.RequestException as e:
print(f"An error occurred: {e}.")
def get_ntg_hub_peer_state(api, endpoint, ntg_file, path_parameters):
'''
Make an API request to the Network Tunnel Groups API to get the state information for the specific peer in the hub and network tunnel group.
'''
print(f"Get the state information for the specific peer in the hub and network tunnel group, {path_parameters}")
try:
id = None
hub_id = None
peer_id = None
id = path_parameters['id']
hub_id = path_parameters['hub_id']
peer_id = path_parameters['peer_id']
url = endpoint.format(id).format(hub_id).format(peer_id)
print(f"Url for get state of tunnel group, hub, and peer: {url}.")
response = api.Query(deployments, url, GET)
if response.status_code == 200:
data = response.json()
with open(str(ntg_file), 'w', encoding='utf-8') as write_output_file:
write_output_file.writelines(json.dumps(data, indent=4))
print(f"Network Tunnel Group state information for tunnel group, hub, and peer was written to {ntg_file}.")
else:
print(f"Failed to retrieve the state information for the peer, hub, and tunnel group. Status code: {response.status_code}")
print("Response:", response.text)
except requests.exceptions.RequestException as e:
print(f"An error occurred: {e}.")
def get_states_network_tunnel_groups(api, endpoint, ntg_file):
'''
Make an API request to the Network Tunnel Groups API API to get the statuses for all network tunnel groups in the organization.
Write the Network Tunnel Groups information to a file.
'''
print(f"Get the status information for all network tunnel groups in the organization.")
try:
response = api.Query(deployments, endpoint, GET)
if response.status_code == 200:
data = response.json()
with open(str(ntg_file), 'w', encoding='utf-8') as write_output_file:
write_output_file.writelines(json.dumps(data, indent=4))
print(f"State information for Network Tunnel Groups written to {ntg_file}.")
else:
print(f"Failed to retrieve Network Tunnel Groups. Status code: {response.status_code}")
print("Response:", response.text)
except requests.exceptions.RequestException as e:
print(f"An error occurred: {e}.")
def get_regions(api, endpoint, regions_file):
'''
Make an API request to the Network Tunnel Groups API API to get the BGP and regions information for the network tunnel groups in the organization.
Write the Regions information to a file.
'''
print(f"Get the regions information for the organization.")
try:
response = api.Query(deployments, endpoint, GET)
if response.status_code == 200:
data = response.json()
with open(str(regions_file), 'w', encoding='utf-8') as write_output_file:
write_output_file.writelines(json.dumps(data, indent=4))
print(f"Regions information written to {regions_file}.")
else:
print(f"Failed to retrieve the Regions information. Status code: {response.status_code}")
print("Response:", response.text)
except requests.exceptions.RequestException as e:
print(f"An error occurred: {e}.")
def get_random_string(length):
letters_and_digits = string.ascii_letters + string.digits
return ''.join(random.choice(letters_and_digits) for i in range(length))
def main():
env_vars = {}
env_vars['API_KEY'] = client_id
env_vars['API_SECRET'] = client_secret
env_vars['OUTPUT_DIR'] = output_dir
env_vars['TOKEN_URL'] = token_url
env_vars['BASE_URI'] = base_uri_env
for k, v in env_vars.items():
print(f"env variable: {k} and value: {v}")
if v == '' or v is None:
print("Required environment variable: {} not set".format(k))
exit()
try:
api = API(token_url, client_id, client_secret)
get_network_tunnel_groups(api, ntg_endpoint, ntg_details_output_file)
length = 10
random_string = get_random_string(length)
print(f"random_string: {random_string}")
request_data = {}
request_data['name'] = "Network Tunnel Group " + random_string
request_data['passphrase'] = "abcdefghijLMNOP1"
request_data['authIdPrefix'] = "test-tunnel-12-24"
request_data['region'] = "us-gov-east-1"
create_network_tunnel_group(api, ntg_endpoint, ntg_create_output_file, request_data)
id = None
primary_hub_id = None
secondary_hub_id = None
with open(ntg_create_output_file, 'r') as file:
data = json.load(file)
if 'id' in data:
id = data['id']
if 'hubs' in data:
primary_hub_id = data['hubs'][0]['id']
secondary_hub_id = data['hubs'][1]['id']
print(f"The tunnel group ID: {id}, primary hub ID: {primary_hub_id}, secondary hub ID: {secondary_hub_id}.")
get_network_tunnel_group(api, ntg_single_endpoint, ntg_get_output_file, id)
random_string = get_random_string(length)
request_data = []
data = {}
data['op'] = "replace"
data['path'] = "/name"
data['value'] = "New NTG " + random_string
request_data.append(data)
update_network_tunnel_group(api, ntg_single_endpoint, ntg_update_output_file, id, request_data)
get_network_tunnel_group_state(api, ntg_single_state_endpoint, ntg_single_state_output_file, id)
hub_id = primary_hub_id
peer_id = None
path_parameters = {}
path_parameters['id'] = id
path_parameters['hub_id'] = hub_id
path_parameters['peer_id'] = peer_id
get_states_network_tunnel_groups(api, ntg_all_states_endpoint, ntg_all_states_output_file)
get_regions(api, regions_endpoint, regions_output_file)
delete_network_tunnel_group(api, ntg_single_endpoint, id)
except Exception as e:
print(e)
if __name__ == "__main__":
main()