|
|
@@ -5,6 +5,7 @@ import json |
|
|
|
import yaml |
|
|
|
import argparse |
|
|
|
import configparser |
|
|
|
from os.path import abspath, dirname, realpath |
|
|
|
from datetime import datetime, timedelta |
|
|
|
from O365 import (Account, Connection, FileSystemTokenBackend, |
|
|
|
MSOffice365Protocol) |
|
|
@@ -20,31 +21,31 @@ parser.add_argument("-o", "--output", type=str, dest="output_path", default="Sea |
|
|
|
strftime_pattern = "%Y-%m-%dT%H:%M:%S" |
|
|
|
strptime_pattern = "%Y-%m-%dT%H:%M:%S.%f" |
|
|
|
|
|
|
|
# TODO: Input for staff emails |
|
|
|
|
|
|
|
def read_config_file(config_path): |
|
|
|
""" |
|
|
|
Reads config file and sets up variables foo |
|
|
|
|
|
|
|
:return: config as a dict |
|
|
|
""" |
|
|
|
with open(config_path, "r") as fp: |
|
|
|
config = yaml.load(fp) |
|
|
|
config = yaml.load(fp, Loader=yaml.FullLoader) |
|
|
|
return config |
|
|
|
|
|
|
|
|
|
|
|
def parse_args(): |
|
|
|
""" |
|
|
|
Parses arguments from the commandline. |
|
|
|
|
|
|
|
:return: config yaml file as a dict |
|
|
|
""" |
|
|
|
# TODO: Maybe the prints before exit could be a lil better at explaining the error. |
|
|
|
args = parser.parse_args() |
|
|
|
|
|
|
|
# Read the file provided and return the required config |
|
|
|
|
|
|
|
if args.config_path: |
|
|
|
# Read the file provided and return the required config |
|
|
|
config = read_config_file(args.config_path) |
|
|
|
config["output_path"] = args.output_path or "./Seating Plan.csv" # Create a default seating plan | I could do this in argparser tbh |
|
|
|
config["config_path"] = args.config_path |
|
|
|
config["output_path"] = args.output_path |
|
|
|
config["users"] = sorted([x.lower() for x in config["users"]]) # make all names lowercase and sort alphabetically |
|
|
|
return config |
|
|
|
|
|
|
@@ -57,12 +58,13 @@ def parse_args(): |
|
|
|
def create_session(credentials, tenant_id): |
|
|
|
""" |
|
|
|
Create a session with the API and save the token for later use. |
|
|
|
|
|
|
|
:param credentials: tuple of (client_id, client_secret) |
|
|
|
:param tentant_id: str of tenant_id |
|
|
|
:return: Account class and email: str |
|
|
|
""" |
|
|
|
my_protocol = MSOffice365Protocol(api_version='v2.0') |
|
|
|
token_backend = FileSystemTokenBackend(token_filename='access_token') |
|
|
|
token_backend = FileSystemTokenBackend(token_filename=f"{dirname(realpath(__file__))}/access_token") # Put access token where the file is so it can always access it. |
|
|
|
return Account( |
|
|
|
credentials, |
|
|
|
protocol=my_protocol, |
|
|
@@ -74,8 +76,7 @@ def create_session(credentials, tenant_id): |
|
|
|
def authenticate_session(session: Account): |
|
|
|
""" |
|
|
|
Authenticates account session object with oauth. Uses the default auth flow that comes with the library |
|
|
|
|
|
|
|
It could be merged with oauth wrapper but this works too. |
|
|
|
|
|
|
|
:param session: Account object |
|
|
|
:return: |
|
|
|
""" |
|
|
@@ -86,25 +87,12 @@ def authenticate_session(session: Account): |
|
|
|
session.con.oauth_request("https://outlook.office.com/api/v2.0/me/calendar", "get") |
|
|
|
|
|
|
|
|
|
|
|
def oauth_request(connection: Connection, url: str): |
|
|
|
""" |
|
|
|
Wrapper for Connection.oauth_request to provide some error handling |
|
|
|
:param connection: |
|
|
|
:param url: |
|
|
|
:return: |
|
|
|
""" |
|
|
|
request = connection.oauth_request(url, "get") |
|
|
|
if request.status_code != 200: |
|
|
|
print(f"Request failed: GET request to {url} failed with {request.status_code} error") |
|
|
|
else: |
|
|
|
return request |
|
|
|
|
|
|
|
|
|
|
|
def get_week_datetime(): |
|
|
|
""" |
|
|
|
Gets the current week's Monday and Friday to be used to filter a calendar. |
|
|
|
|
|
|
|
If this script is ran during the work week (Monday-Friday), it will be the current week. If it is ran on the weekend, it will generate for next week. |
|
|
|
|
|
|
|
:return: Monday and Friday: Datetime object |
|
|
|
""" |
|
|
|
today = datetime.now() |
|
|
@@ -124,6 +112,13 @@ def get_week_datetime(): |
|
|
|
|
|
|
|
|
|
|
|
def get_event_range(beginning_of_week: datetime, connection: Connection, email: str): |
|
|
|
""" |
|
|
|
Makes api call to grab a calender view within a 2 week window either side of the current week. |
|
|
|
|
|
|
|
:param beginning_of_week: datetime object for this weeks monday |
|
|
|
:param connection: a connection to the office365 api |
|
|
|
:return: dict of json response |
|
|
|
""" |
|
|
|
base_url = "https://outlook.office.com/api/v2.0/" |
|
|
|
scope = f"users/{email}/CalendarView" |
|
|
|
|
|
|
@@ -143,33 +138,40 @@ def get_event_range(beginning_of_week: datetime, connection: Connection, email: |
|
|
|
|
|
|
|
|
|
|
|
def add_attendees_to_ooo_list(attendees: list, ooo_list: list): |
|
|
|
""" |
|
|
|
Function to aid the adding of attendees in a list to the list of people who will be out of offce. |
|
|
|
|
|
|
|
:param attendees: list of attendees to event |
|
|
|
:param ooo_list: list of the current days out of office users |
|
|
|
:return: ooo_list once appended |
|
|
|
""" |
|
|
|
for attendee in attendees: |
|
|
|
attendee_name = attendee["EmailAddress"]["Name"].split(" ")[0] # Get first name |
|
|
|
if attendee_name not in ooo_list.copy(): |
|
|
|
ooo_list.append(attendee_name.lower()) |
|
|
|
ooo_list.append(attendee_name.lower()) # Converted to lowercase so program is case insensitive |
|
|
|
return ooo_list |
|
|
|
|
|
|
|
|
|
|
|
def get_ooo_list(email: str, connection: Connection): |
|
|
|
# TODO: Docstrings of new functions |
|
|
|
# Get month |
|
|
|
# check for any events that are longer than 1 day that has a start or end point in the month |
|
|
|
# TODO: Update readme to explain that if an event is longer than a month, it won't be picked up by the script |
|
|
|
# check if those events happen within the week by checking if the day we are checking is in between the two points of the event |
|
|
|
# add this to a list of just events happening in this week that are defo a day long or less. |
|
|
|
""" |
|
|
|
Makes request and parses data into a list of users who will not be in the office |
|
|
|
|
|
|
|
:param email: string of the outofoffice email where the out of office calender is located |
|
|
|
:param connection: a connection to the office365 api |
|
|
|
:return: list of 5 lists representing a 5 day list. Each list contains the lowercase names of who is not in the office. |
|
|
|
""" |
|
|
|
monday, friday = get_week_datetime() |
|
|
|
events = get_event_range(monday, connection, email) |
|
|
|
|
|
|
|
events = events["value"] |
|
|
|
outofoffice = [[], [], [], [], []] |
|
|
|
# Using a list to take advantage of datetime.weekday instead of dealing with trying to figure out what key to use |
|
|
|
|
|
|
|
for event in events: |
|
|
|
# removes last char due to microsofts datetime using 7 sigfigs for microseconds, python uses 6 |
|
|
|
# removes last char due to microsoft's datetime using 7 sigfigs for microseconds, python uses 6 |
|
|
|
start = datetime.strptime(event["Start"]["DateTime"][:-1], strptime_pattern) |
|
|
|
end = datetime.strptime(event["End"]["DateTime"][:-1], strptime_pattern) |
|
|
|
attendees = event["Attendees"] |
|
|
|
# excluding outofoffice account |
|
|
|
# remove outofoffice account by list comprehension |
|
|
|
attendees = [x for x in attendees if x["EmailAddress"]["Address"] != email] |
|
|
|
organizer = event["Organizer"] |
|
|
|
|
|
|
@@ -183,7 +185,6 @@ def get_ooo_list(email: str, connection: Connection): |
|
|
|
# Event is within the week we are looking at, add all attendees + |
|
|
|
weekday = outofoffice[start.weekday()] |
|
|
|
weekday = add_attendees_to_ooo_list(attendees, weekday) |
|
|
|
|
|
|
|
else: |
|
|
|
# Check if long events cover the days of this week |
|
|
|
for x, day_array in enumerate(outofoffice.copy()): |
|
|
@@ -191,12 +192,17 @@ def get_ooo_list(email: str, connection: Connection): |
|
|
|
if start <= current_day <= end: |
|
|
|
# if day is inside of the long event |
|
|
|
outofoffice[x] = add_attendees_to_ooo_list(attendees, day_array) |
|
|
|
|
|
|
|
return outofoffice |
|
|
|
|
|
|
|
|
|
|
|
def create_csv(ooo: list, users: list, output_path: str): |
|
|
|
# maybe input all the names then selectivly remove them based on the events? |
|
|
|
def create_ooo_csv(ooo: list, users: list, output_path: str): |
|
|
|
""" |
|
|
|
Creates a csv of who is in the office and on what day. |
|
|
|
|
|
|
|
:param ooo: a list of lists representing each day of a 5 day week. Each day's list has users who are not in that day |
|
|
|
:param users: a list of names of people in the office |
|
|
|
:param output_path: a str representing the output path of the csv file |
|
|
|
""" |
|
|
|
with open(output_path, 'w', newline='', encoding='utf-8') as fp: |
|
|
|
fieldnames = ("Monday", "Tuesday", "Wednesday", "Thursday", "Friday") |
|
|
|
writer = csv.DictWriter(fp, fieldnames=fieldnames) |
|
|
@@ -204,6 +210,7 @@ def create_csv(ooo: list, users: list, output_path: str): |
|
|
|
for user in users: |
|
|
|
row = {} |
|
|
|
for i, day in enumerate(fieldnames): |
|
|
|
# for each day, check if the user is in that day, if not do not write their name into that day. |
|
|
|
if user not in ooo[i]: |
|
|
|
row[day] = user |
|
|
|
else: |
|
|
@@ -216,11 +223,20 @@ def main(): |
|
|
|
Main function that is ran on start up. Script is ran from here. |
|
|
|
""" |
|
|
|
config = parse_args() |
|
|
|
session = create_session((config["client_id"], config["client_secret"]), config["tenant_id"]) |
|
|
|
|
|
|
|
client_id = config["client_id"] |
|
|
|
client_secret = config["client_secret"] |
|
|
|
tenant_id = config["tenant_id"] |
|
|
|
output_path = config["output_path"] |
|
|
|
users = config["users"] |
|
|
|
email = config["ooo_email"] |
|
|
|
|
|
|
|
session = create_session((client_id, client_secret), tenant_id) |
|
|
|
authenticate_session(session) |
|
|
|
|
|
|
|
ooo = get_ooo_list(config["ooo_email"], session.con) |
|
|
|
create_csv(ooo, config["users"], config["output_path"]) |
|
|
|
ooo = get_ooo_list(email, session.con) |
|
|
|
create_ooo_csv(ooo, users, output_path) |
|
|
|
print("Created {}".format(abspath(output_path))) |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |