Commit 9c213f21 authored by Mohd Bilal's avatar Mohd Bilal
Browse files

added mqtt subscriber

parent 3e82f58e
from utils.helpers import setup_required
from extensions.mqtt import AWSMQTT as mqtt
from app.models.users import User
from app.models.satellites import Satellite
from app.models.stations import Station
import traceback
import config
import logging
import datetime
import json
import time
import sys
import os
logger = logging.getLogger(__name__)
......@@ -22,6 +20,7 @@ class TIMGSNClient:
self.settings = None
self._is_setup = None
self.users = []
self.mqtt = mqtt()
@staticmethod
def get_instance(*args, **kwargs):
......@@ -29,8 +28,64 @@ class TIMGSNClient:
TIMGSNClient.__instances = TIMGSNClient(*args, **kwargs)
return TIMGSNClient.__instances
def _set_up(self):
logger.info("setting up TIMGSN client...")
def _callback_sub_ack(self, mid, data):
logger.info("Subscription successful..")
logger.info(f"SUBACK packet id: {mid}")
logger.info(f"Subscription QoS granted: {data}")
def _callback_schedule_update(self, client, user_data, message):
logger.info("new schedule received..")
# extract data
data = json.loads(message.payload)
overpasses = data.get("overpasses")
scheduling_method = data.get("scheduling_method")
updated_on = data.get('created_on')
for user in self.users:
stations = user.get_stations()
for station in stations:
schedule = []
for op in overpasses:
if op['station_name'] == station.name:
start = op['start_date_str'] + ' ' + op[
'start_time_str']
end = op['end_date_str'] + ' ' + op['end_time_str']
norad = op['norad']
contact_type = op['request_types'][0]
schedule.append({
"contact_type": contact_type,
"norad": norad,
"start": start,
"end": end,
"updated_on": updated_on,
"scheduling_method": scheduling_method
})
station.update_schedule(schedule=schedule,
schedule_type="timgsn")
#TODO: update satellite plan too
def _subscribe_station_update(self, **kwargs):
async_ = kwargs['async_'] if 'async_' in kwargs else False
topic = config.aws_topics_sub.get("schedule_update_sub")
if topic:
if async_:
self.mqtt.connection.subscribeAsync(
topic=topic,
QoS=1,
ackCallback=self._callback_sub_ack,
messageCallback=self._callback_schedule_update)
return True
self.mqtt.connection.subscribe(
topic=topic, QoS=1, callback=self._callback_schedule_update)
return True
return False
def _setup_mqtt(self):
self._subscribe_station_update(async_=True)
def _setup_user(self):
with open('settings.json', 'r') as f:
self.settings = json.load(f)
users_data = self.settings.get('client').get('users')
......@@ -39,14 +94,19 @@ class TIMGSNClient:
password=os.getenv(user_data.get("password_env")))
user.authenticate(os.getenv(user_data.get("username_env")),
os.getenv(user_data.get("password_env")))
logger.debug(f"user is authenticated: {user.is_authenticated}")
user.load_data(user_data)
self.users.append(user)
def _set_up(self):
logger.info("setting up TIMGSN client...")
self._setup_user()
self._setup_mqtt()
self._is_setup = True
def _tear_down(self):
for user in self.users:
user.terminate()
self.mqtt.teardown()
def stop(self):
self._tear_down()
......
......@@ -7,13 +7,16 @@
"band_rx": "UHF",
"band_tx": "UHF",
"scheduling_horizon": 86400,
"refresh_rate": 43200,
"schedule": {
"timgsn": [
{
"contact_type": "TELEMETRY",
"norad": 39446,
"start": "2021-04-20 13:54:30",
"end": "2021-04-20 14:05:30"
"end": "2021-04-20 14:05:30",
"updated_on": "2021-04-20 13:54:30",
"scheduling_method": "Event"
}
],
"local": []
......
......@@ -7,6 +7,7 @@ import json
import logging
import config
import itertools
import traceback
logger = logging.getLogger(__name__)
......@@ -42,14 +43,16 @@ class Satellite(ThreadedModule):
plan.extend([{
"contact_type": "TELEMETRY"
}] * (len(overpasses) - len(plan)))
# request overpasses
for op, action in zip(overpasses, plan):
response = self.request_overpass(op['uid'],
action['contact_type'])
logger.info(response)
responses.append(response)
except Exception as err:
logger.error(err)
logger.debug(traceback.format_exc())
def _parse_json_data(self, data):
self.name = data.get("name")
......@@ -80,7 +83,6 @@ class Satellite(ThreadedModule):
data = json.load(f)
if not data:
raise ValueError("Filename has no satellite data.")
logger.debug(data)
self._parse_json_data(data)
return data
......@@ -116,7 +118,7 @@ class Satellite(ThreadedModule):
self.owner.password),
data=data)
if response.status_code == 200:
overpasses.append(response.json()['overpasses'])
overpasses.extend(response.json()['overpasses'])
return overpasses
@setup_required
......@@ -148,7 +150,7 @@ class Satellite(ThreadedModule):
logger.info("Request to track overpass successful.")
tracking_requests = result.get("requests")
status = True
info = "Successfully created response."
info = tracking_requests
elif response.status_code == 404 or response.status_code == 403:
logger.info("Request unsuccessful.")
info = result
......
from utils.threading import ThreadedModule
from utils.helpers import setup_required
from utils.threading import ThreadedModule
from app.client import create_response
import config
import datetime
import requests
......@@ -30,7 +31,6 @@ class Station(ThreadedModule):
self.planning_horizon = kwargs[
'planning_horizon'] if 'planning_horizon' in kwargs else 86400
def __str__(self) -> str:
return f"Ground station: {self.name} | lat: {self.latitude} | long: {self.longitutde}"
......@@ -47,6 +47,7 @@ class Station(ThreadedModule):
responses = []
for tracking_request in received_requests:
response = self.respond_to_request(tracking_request)
logger.info(response)
responses.append(response)
def respond_to_request(self, tracking_request):
......@@ -61,14 +62,14 @@ class Station(ThreadedModule):
config.time_format)
busy = self.is_busy(start, end)
response = None
response = create_response(False, [], "No condition matched")
if status == "PENDING" and not busy:
response = self.accept(request_uid)
response = self.accept_request(request_uid)
elif status == "ACCEPTED":
# do nothing
pass
elif status == "REJECTED" and not busy:
response = self.accept(request_uid)
response = self.accept_request(request_uid)
elif status == "SCHEDULED":
# do nothing
pass
......@@ -80,31 +81,76 @@ class Station(ThreadedModule):
pass
else:
# reject
response = self.reject(request_uid)
response = self.reject_request(request_uid)
return response
def create_request_url(self, request_uid):
return f"{self.url}/{request_uid}"
return f"{config.endpoint_requests}/{request_uid}"
@setup_required
def accept_request(self, request_uid):
logger.info(f"accepting request: {request_uid}")
action = {'action': 'accept'}
url = self.create_request_url(request_uid)
response = requests.put(url=url,
auth=(self.owner.username,
self.owner.password),
data=action)
return response
result = None
data = []
status = False
info = ""
try:
result = response.json()
except:
result = response.text
if response.status_code == 200:
logger.info("Accepting overpass successful.")
status = True
info = "Accepting overpass successful."
data = result
elif response.status_code == 404 or response.status_code == 403:
logger.info("Failed to accept tracking request.")
info = result
data = []
else:
logger.info("Failed to accept tracking request.")
info = result
data = []
return create_response(status, data, info)
@setup_required
def reject_request(self, request_uid):
logger.info(f"rejecting request: {request_uid}")
action = {'action': 'reject'}
url = self.create_request_url(request_uid)
response = requests.put(url=url,
auth=(self.owner.username,
self.owner.password),
data=action)
return response
result = None
data = []
status = False
info = ""
try:
result = response.json()
except:
result = response.text
if response.status_code == 200:
status = True
info = "Rejecting tracking request successful."
data = result
elif response.status_code == 404 or response.status_code == 403:
info = result
data = []
else:
info = result
data = []
return create_response(status, data, info)
@setup_required
def mark_tracked(self, request_uid):
......@@ -128,6 +174,7 @@ class Station(ThreadedModule):
@setup_required
def fetch_received_requests(self):
logger.info("fetching received requests...")
tracking_requests = [] # tracking requests for this station
data = {"direction": "received"}
response = requests.get(url=config.endpoint_requests,
......@@ -135,12 +182,12 @@ class Station(ThreadedModule):
self.owner.password),
data=data)
if response.status_code == 200:
received_requests = response['requests']
received_requests = response.json()['requests']
tracking_requests = list(
filter(
lambda req: req['overpass']['station_name'] == self.name,
received_requests))
logger.info(f"fetched {len(received_requests)} received requests.")
return tracking_requests
def set_owner(self, owner):
......@@ -162,6 +209,21 @@ class Station(ThreadedModule):
schedule.append(action)
return schedule
def update_schedule(self, schedule, schedule_type="timgsn"):
existing_schedule = None
settings = None
with open(f'app/models/{self.name}.json', 'r') as f:
settings = json.load(f)
existing_schedule = settings.get('schedule')
if existing_schedule is not None:
existing_schedule[schedule_type].extend(schedule)
else:
settings.update({"schedule": {schedule_type: schedule}})
with open(f'app/models/{self.name}.json', 'w') as f:
json.dump(settings)
self.load_from_json()
def load_from_json(self, filename=""):
data = None
if not filename:
......@@ -170,7 +232,6 @@ class Station(ThreadedModule):
data = json.load(f)
if not data:
raise ValueError("Filename has no Station data.")
logger.debug(data)
self._parse_json_data(data)
return data
......@@ -185,6 +246,9 @@ class Station(ThreadedModule):
p['start'] = datetime.datetime.strptime(p['start'],
config.time_format)
p['end'] = datetime.datetime.strptime(p['end'], config.time_format)
if 'updated_on' in p:
p['updated_on'] = datetime.datetime.strptime(
p['updated_on'], config.time_format)
return str_to_date
list(map(str_to_date, data.get("schedule").get("local")))
......@@ -193,14 +257,14 @@ class Station(ThreadedModule):
def is_busy(self, start, end):
busy = False
overpasses = self.query_overpasses(start, end)
overpasses = self.query_owner_sats_overpasses(start, end)
schedules = self.query_schedules(start, end)
if overpasses or schedules:
return True
busy = True
return busy
@setup_required
def query_overpasses(self, start=None, end=None):
def query_owner_sats_overpasses(self, start=None, end=None):
overpasses = []
if not start:
start = datetime.datetime.utcnow()
......@@ -223,18 +287,20 @@ class Station(ThreadedModule):
@setup_required
def query_schedules(self, start=None, end=None):
schedules = []
data = {
'station_name': self.name,
'start_time': start.strftime(config.time_format),
'end_time': end.strftime(config.time_format)
}
schedules = requests.get(config.endpoint_scheduler,
auth=(self.owner.username,
self.owner.password),
data=data)
schedules.extend(
list(
filter(lambda s: s['start'] >= start and s['end' <= end],
self.schedule.get("local"))))
response = requests.get(config.endpoint_scheduler,
auth=(self.owner.username,
self.owner.password),
data=data)
if response.status_code == 200:
schedules = response.json()['schedules']
schedules.extend(
list(
filter(lambda s: s['start'] >= start and s['end' <= end],
self.schedule.get("local"))))
return schedules
......@@ -34,11 +34,11 @@ class User:
return f"User: {self._username}"
def _tear_down(self):
logger.info(f"{self.name}: Stopping operations...")
logger.info(f"{self.name}: Stopping satellite operations...")
logger.info(f"{self._username}: Stopping operations...")
logger.info(f"{self._username}: Stopping satellite operations...")
for satellite in self.satellites:
satellite.kill()
logger.info(f"{self.name}: Stopping ground station operations...")
logger.info(f"{self._username}: Stopping ground station operations...")
for station in self.stations:
station.kill()
......
......@@ -46,3 +46,22 @@ user_settings = load_user_settings()
# time format
time_format = "%Y-%m-%d %H:%M:%S"
## aws settings
# id
aws_id = ""
# endpoint
aws_endpoint = os.getenv('AWS_ENDPOINT')
# Amazon Root Certificate
aws_root_cert = os.path.join(basedir, 'security', 'AmazonRootCA1.pem')
# Amazon Private Key
aws_private_key = os.path.join(basedir, 'security', 'private.pem.key')
# Amazon Certificate
aws_cert = os.path.join(basedir, 'security', 'certificate.pem.crt')
# aws topics
aws_topics_sub = {"schedule_update_sub": "tim/scheduler/all/schedules/update"}
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient as AWS
import logging
import config
logger = logging.getLogger(__name__)
class AWSMQTT(object):
def __init__(self) -> None:
self._connection = self.connect()
def connect(self):
aws = AWS(config.aws_id)
print(config.aws_private_key)
aws.configureEndpoint(config.aws_endpoint, 8883)
aws.configureCredentials(config.aws_root_cert, config.aws_private_key,
config.aws_cert)
aws.connect()
return aws
def teardown(self):
logger.info("disconnecting from aws client")
self._connection.disconnect()
@property
def connection(self):
return self._connection
......@@ -3,6 +3,8 @@ from app import TIMGSNClient
import logging
import time
import sys
import traceback
def setup_logger():
logger = logging.getLogger('app')
......@@ -17,7 +19,8 @@ def setup_logger():
logger.addHandler(stream_handler)
return logger
if __name__=="__main__":
if __name__ == "__main__":
logger = setup_logger()
client = TIMGSNClient.get_instance()
client.start()
......@@ -25,9 +28,11 @@ if __name__=="__main__":
while True:
logger.info("running main...")
time.sleep(1)
except KeyboardInterrupt as err:
except (KeyboardInterrupt, Exception) as err:
logger.info("exiting app...")
client.stop()
try:
client.stop()
except Exception as err:
logger.debug(traceback.format_exc())
logger.info("exiting system...")
sys.exit(0)
......@@ -6,7 +6,7 @@
"password_env": "TIMGSN_CLIENT_USER1_PASSWORD",
"satellites": [39446, 43880],
"stations": ["jmuw_uhf"],
"curent_project": "test_project"
"current_project": "test_project"
}
]
},
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment