Commit ed448601 authored by Mohd Bilal's avatar Mohd Bilal
Browse files

added concurrency to satellite class

parent 1c28ef35
......@@ -7,10 +7,20 @@ import json
import logging
import config
import itertools
import concurrent.futures
import threading
import traceback
logger = logging.getLogger(__name__)
thread_local = threading.local()
def get_session():
if not hasattr(thread_local, "session"):
thread_local.session = requests.Session()
return thread_local.session
class Satellite(ThreadedModule):
def __init__(self, norad, name="", *args, **kwargs) -> None:
......@@ -32,7 +42,6 @@ class Satellite(ThreadedModule):
logger.info(f"{self.name}: Stopping operation...")
def _loop(self):
responses = []
try:
start = datetime.datetime.utcnow()
end = start + datetime.timedelta(seconds=self.planning_horizon)
......@@ -46,11 +55,17 @@ class Satellite(ThreadedModule):
}] * (len(overpasses) - len(plan)))
# request overpasses
for op, action in zip(overpasses, plan):
response = self.request_overpass(op['uid'],
action['contact_type'])
logger.info(response)
responses.append(response)
overpass_uids = [op['uid'] for op in overpasses]
contact_types = [a['contact_type'] for a in plan]
with concurrent.futures.ThreadPoolExecutor(
max_workers=5) as executor:
executor.map(self.request_overpass, overpass_uids,
contact_types)
# for op, action in zip(overpasses, plan):
# response = self.request_overpass(op['uid'],
# action['contact_type'])
# logger.info(response)
# responses.append(response)
logger.info(f"{self.name}: "
f"requested all overpasses in planning horizon")
logger.info(f"{self.name}: next request phase will start at "
......@@ -156,12 +171,19 @@ class Satellite(ThreadedModule):
'overpass_uid': overpass_uid,
'request_type': request_type
}
response = requests.post(url=config.endpoint_requests,
auth=(self.owner.username,
self.owner.password),
data=data)
return self._request_api_response_handler(response)
session = get_session()
response = None
with session as s:
response = s.post(url=config.endpoint_requests,
auth=(self.owner.username, self.owner.password),
data=data)
# response = requests.post(url=config.endpoint_requests,
# auth=(self.owner.username,
# self.owner.password),
# data=data)
self._request_api_response_handler(response)
return None
@setup_required
def _request_api_response_handler(self, response):
......@@ -184,4 +206,4 @@ class Satellite(ThreadedModule):
else:
logger.info("Request unsuccessful.")
info = result
return create_response(status, data, info)
logger.info(f"TIMGSN: {create_response(status, data, info)}")
......@@ -5,6 +5,7 @@ from utils.threading import ThreadedModule
from app.client import create_response
from app.gnuradio.flowgraphs.gsn_receiver import Receiver
import concurrent.futures
import threading
import config
import datetime
import requests
......@@ -14,6 +15,13 @@ import json
import time
logger = logging.getLogger(__name__)
thread_local = threading.local()
def get_session():
if not hasattr(thread_local, "session"):
thread_local.session = requests.Session()
return thread_local.session
class Station(ThreadedModule):
......@@ -189,9 +197,6 @@ class Station(ThreadedModule):
and schedule['end'] >= datetime.datetime.utcnow():
timgsn = schedule
op.update({"timgsn": timgsn})
logger.info(f"found current overpasses: {op}")
logger.info(f"schedule is: {self.schedule}")
return op
def respond_to_request(self, tracking_request):
......@@ -208,24 +213,25 @@ class Station(ThreadedModule):
satellite = self.owner.get_satellite(overpass['norad'])
busy = self.is_busy(start, end)
response = create_response(False, [], "No condition matched")
# if request is for the satellite of the owner, then always accept
if status == "PENDING" or status == "REJECTED" and satellite:
response = self.accept_request(request_uid)
return response
self.accept_request(request_uid)
return None
if status == "ACCEPTED" and satellite:
return (create_response(True, [tracking_request],
"Request is already accepted."))
logger.info(
create_response(True, [tracking_request],
"Request is already accepted."))
return None
# if some other user's satellite then check if busy
if busy and status == "PENDING" or status == "ACCEPTED" or status == "REJECTED":
response = self.reject_request(request_uid)
self.reject_request(request_uid)
elif not busy and status == "PENDING" or status == "REJECTED":
response = self.accept_request(request_uid)
logger.info(f"Server response for tracking request: {response}")
return response
self.accept_request(request_uid)
else:
logger.info(create_response(False, [], "No condition matched"))
return None
def create_request_url(self, request_uid):
return f"{config.endpoint_requests}/{request_uid}"
......@@ -233,12 +239,19 @@ class Station(ThreadedModule):
@setup_required
def accept_request(self, request_uid):
logger.info(f"accepting request: {request_uid}")
session = get_session()
action = {'action': 'accept'}
url = self.create_request_url(request_uid)
response = requests.put(url=url,
auth=(self.owner.username,
self.owner.password),
data=action)
response = None
with session as s:
response = s.put(url=url,
auth=(self.owner.username, self.owner.password),
data=action)
# response = requests.put(url=url,
# auth=(self.owner.username,
# self.owner.password),
# data=action)
result = None
data = []
status = False
......@@ -262,17 +275,23 @@ class Station(ThreadedModule):
logger.info("Failed to accept tracking request.")
info = result
data = []
return create_response(status, data, info)
logger.info(f"TIMGSN Server: {create_response(status, data, info)}")
@setup_required
def reject_request(self, request_uid):
logger.info(f"rejecting request: {request_uid}")
session = get_session()
response = None
action = {'action': 'reject'}
url = self.create_request_url(request_uid)
response = requests.put(url=url,
auth=(self.owner.username,
self.owner.password),
data=action)
# response = requests.put(url=url,
# auth=(self.owner.username,
# self.owner.password),
# data=action)
with session as s:
response = s.put(url=url,
auth=(self.owner.username, self.owner.password),
data=action)
result = None
data = []
status = False
......@@ -293,26 +312,38 @@ class Station(ThreadedModule):
else:
info = result
data = []
return create_response(status, data, info)
logger.info(f"TIMGSN: {create_response(status, data, info)}")
@setup_required
def mark_tracked(self, request_uid):
session = get_session()
response = None
action = {'action': 'tracked'}
url = self.create_request_url(request_uid)
response = requests.put(url=url,
auth=(self.owner.username,
self.owner.password),
data=action)
return response
# response = requests.put(url=url,
# auth=(self.owner.username,
# self.owner.password),
# data=action)
with session as s:
response = s.put(url=url,
auth=(self.owner.username, self.owner.password),
data=action)
logger.info(f"TIMGSN: {response.json()}")
@setup_required
def mark_failed(self, request_uid):
session = get_session()
response = None
action = {'action': 'failed'}
url = self.create_request_url(request_uid)
response = requests.put(url=url,
auth=(self.owner.username,
self.owner.password),
data=action)
# response = requests.put(url=url,
# auth=(self.owner.username,
# self.owner.password),
# data=action)
with session as s:
response = s.put(url=url,
auth=(self.owner.username, self.owner.password),
data=action)
return response
@setup_required
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment