Procházet zdrojové kódy

Refactored module to use multiple files.

master
Roxie Gibson před 4 roky
rodič
revize
ad0ee4d521
V databázi nebyl nalezen žádný známý klíč pro tento podpis
4 změnil soubory, kde provedl 287 přidání a 201 odebrání
  1. +29
    -0
      supper/__init__.py
  2. +43
    -201
      supper/__main__.py
  3. +164
    -0
      supper/api.py
  4. +51
    -0
      supper/dates.py

+ 29
- 0
supper/__init__.py Zobrazit soubor

@@ -0,0 +1,29 @@
# Copyright (C) 2019 Campaign Against Arms Trade
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Script to generate a seating plan via calendar events in an organisation's Office365."""

import os
import logging

STRFTIME = "%Y-%m-%dT%H:%M:%S"
STRPTIME = "%Y-%m-%dT%H:%M:%S.%f"
# Put access token where the file is so it can always access it.
ACCESS_TOKEN = f"{os.path.dirname(os.path.realpath(__file__))}/access_token"

LOG = logging.getLogger('supper')
HANDLER = logging.StreamHandler()
LOG.addHandler(HANDLER)

HANDLER.setFormatter(logging.Formatter('%(levelname)s: %(asctime)s - %(message)s'))

+ 43
- 201
supper/__main__.py Zobrazit soubor

@@ -14,38 +14,39 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Entry point for Supper Script"""

import argparse
import csv
import logging
import os
from os.path import abspath, dirname, isfile, realpath
from datetime import datetime, timedelta
from os.path import abspath
from datetime import datetime
from pathlib import Path

import yaml
from O365 import (Account, Connection, FileSystemTokenBackend,
MSOffice365Protocol)
from requests import HTTPError

parser = argparse.ArgumentParser(prog='supper', description="Script to generate a seating plan via Office365 Calendars")
parser.add_argument("-c", "--config", type=str, dest="config_path", default="{}/.config/supper.yaml".format(Path.home()),
help="Path to a config file to read settings from. Default: ~/.config/supper.yaml")
parser.add_argument("-o", "--output", type=str, dest="output_path", default="Seating Plan.csv",
help="Path to save the output csv file to")
parser.add_argument("-d", "--debug", action="store_true", help="Enable debug output")


strftime_pattern = "%Y-%m-%dT%H:%M:%S"
strptime_pattern = "%Y-%m-%dT%H:%M:%S.%f"
access_token = f"{dirname(realpath(__file__))}/access_token"


logger = logging.getLogger('supper')
consoleHandler = logging.StreamHandler()
logger.addHandler(consoleHandler)

formatter = logging.Formatter('%(levelname)s: %(asctime)s - %(message)s')
consoleHandler.setFormatter(formatter)
from . import LOG, HANDLER, dates
from .api import Account

parser = argparse.ArgumentParser(
prog='supper',
description="Script to generate a seating plan via Office365 Calendars"
)
parser.add_argument(
"-c", "--config",
type=str,
dest="config_path",
default="{}/.config/supper.yaml".format(Path.home()),
help="Path to a config file to read settings from. Default: ~/.config/supper.yaml"
)
parser.add_argument(
"-o", "--output",
type=str,
dest="output_path",
default="Seating Plan.csv",
help="Path to save the output csv file to"
)
parser.add_argument("-d", "--debug", action="store_true", help="Enable debug output")


def read_config_file(config_path):
@@ -70,12 +71,12 @@ def format_output_path(output_path):
new_path = output_path.format(datetime.now())
if new_path.split(".")[-1] != "csv":
new_path += ".csv"
logger.info("Output path does NOT have '.csv' file extension. Adding '.csv' to end of output_path.")
logger.debug("Formatted output file successfully as '%s'", new_path)
LOG.info("Output path does NOT have '.csv' file extension. Adding '.csv' to end of output_path.")
LOG.debug("Formatted output file successfully as '%s'", new_path)
return new_path
except (SyntaxError, KeyError):
# This is raised when formatting is incorrectly used in naming the output
logger.error("Invalid formatting pattern given for output_path. Cannot name output_path. Exiting...")
LOG.error("Invalid formatting pattern given for output_path. Cannot name output_path. Exiting...")
exit(1)


@@ -88,11 +89,11 @@ def parse_args():
args = parser.parse_args()

if args.debug:
logger.setLevel(logging.DEBUG)
consoleHandler.setLevel(logging.DEBUG)
LOG.setLevel(logging.DEBUG)
HANDLER.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.WARNING)
consoleHandler.setLevel(logging.WARNING)
LOG.setLevel(logging.WARNING)
HANDLER.setLevel(logging.WARNING)

output_path = format_output_path(args.output_path)

@@ -103,183 +104,24 @@ def parse_args():
config["config_path"] = args.config_path
config["output_path"] = output_path
config["users"] = sorted([x.lower() for x in config["users"]]) # make all names lowercase and sort alphabetically
logger.debug("Loaded config successfully from '%s'", args.config_path)
LOG.debug("Loaded config successfully from '%s'", args.config_path)
return config
except FileNotFoundError:
# Cannot open file
logger.error("Cannot find config file provided (%s). Maybe you mistyped it? Exiting...", args.config_path)
LOG.error("Cannot find config file provided (%s). Maybe you mistyped it? Exiting...", args.config_path)
exit(1)
except (yaml.parser.ParserError, TypeError):
# Cannot parse opened file
# TypeError is sometimes raised if the metadata of the file is correct but the content doesn't parse
logger.error("Cannot parse config file. Make sure the provided config is a YAML file and that is is formatted correctly. Exiting...")
LOG.error("Cannot parse config file. Make sure the provided config is a YAML file and that is is formatted correctly. Exiting...")
exit(1)
else:
# No provided -c argument
logger.error("Cannot login. No config file was provided. Exiting...")
LOG.error("Cannot login. No config file was provided. Exiting...")
exit(1)


def create_session(credentials, tenant_id):
"""
Create a session with the API and save the token for later use.

:param credentials: tuple of (client_id, client_secret)
:param tentant_id: str of tenant_id
:return: Account class and email: str
"""
my_protocol = MSOffice365Protocol(api_version='v2.0')
token_backend = FileSystemTokenBackend(token_filename=access_token) # Put access token where the file is so it can always access it.
return Account(
credentials,
protocol=my_protocol,
tenant_id=tenant_id,
token_backend=token_backend,
raise_http_error=False
)


def authenticate_session(session: Account):
"""
Authenticates account session object with oauth. Uses the default auth flow that comes with the library

:param session: Account object
:return: Bool for if the client is authenticated
"""
if not isfile(f"{dirname(realpath(__file__))}/access_token"):
try:
session.authenticate(scopes=['basic', 'address_book', 'users', 'calendar_shared'])
session.con.oauth_request("https://outlook.office.com/api/v2.0/me/", "get")
logger.debug("Successfully tested new access_token")
return True
except:
os.remove(access_token)
logger.error("Could not authenticate. Please make sure the config has the correct client_id, client_secret, etc. Exiting...")
exit(1)
else:
try:
session.con.oauth_request("https://outlook.office.com/api/v2.0/me/", "get")
logger.debug("Successfully tested current access_token")
return True
except:
os.remove(access_token)
logger.warning("Failed to authenticate with current access_token. Deleting access_token and running authentication again.")
return False


def get_week_datetime():
"""
Gets the current week's Monday and Friday to be used to filter a calendar.

If this script is ran during the work week (Monday-Friday), it will be the current week. If it is ran on the weekend, it will generate for next week.

:return: Monday and Friday: Datetime object
"""
today = datetime.now()
weekday = today.weekday()
extra_time = timedelta(hours=today.hour, minutes=today.minute, seconds=today.second, microseconds=today.microsecond)
monday = today - timedelta(days=weekday) - extra_time # Monday = 0-0, Friday = 4-4
friday = (today + timedelta(days=4 - weekday)) - extra_time + timedelta(hours=23, minutes=59)

# If this script is run during the week
if weekday <= 4: # 0 = Monday, 6 = Sunday
# - the hour and minutes to get start of the day instead of when the
return monday, friday
# If the date the script is ran on is the weekend, do next week instead
monday = monday + timedelta(days=7) # Monday = 0-0, Friday = 4-4
friday = friday + timedelta(days=7) # Fri to Fri 4 + (4 - 4), Tues to Fri = 2 + (4 - 2)
return monday, friday


def get_event_range(beginning_of_week: datetime, connection: Connection, email: str):
"""
Makes api call to grab a calender view within a 2 week window either side of the current week.

:param beginning_of_week: datetime object for this weeks monday
:param connection: a connection to the office365 api
:return: dict of json response
"""
base_url = "https://outlook.office.com/api/v2.0/"
scope = f"users/{email}/CalendarView"

# Create a range of dates for this week so we can catch long events within the search
bottom_range = beginning_of_week - timedelta(days=14)
top_range = beginning_of_week + timedelta(days=21)

# Setup url
date_range = "startDateTime={}&endDateTime={}".format(bottom_range.strftime(strftime_pattern), top_range.strftime(strftime_pattern))
limit = "$top=150"
select = "$select=Subject,Organizer,Start,End,Attendees"

url = f"{base_url}{scope}?{date_range}&{select}&{limit}"
resp = connection.oauth_request(url, "get")
return resp.json()


def add_attendees_to_ooo_list(attendees: list, ooo_list: list):
"""
Function to aid the adding of attendees in a list to the list of people who will be out of office.

:param attendees: list of attendees to event
:param ooo_list: list of the current days out of office users
:return: ooo_list once appended
"""
for attendee in attendees:
attendee_name = attendee["EmailAddress"]["Name"].split(" ")[0] # Get first name
if attendee_name not in ooo_list.copy():
ooo_list.append(attendee_name.lower()) # Converted to lowercase so program is case insensitive
return ooo_list


def get_ooo_list(email: str, connection: Connection):
"""
Makes request and parses data into a list of users who will not be in the office

:param email: string of the outofoffice email where the out of office calender is located
:param connection: a connection to the office365 api
:return: list of 5 lists representing a 5 day list. Each list contains the lowercase names of who is not in the office.
"""
monday, friday = get_week_datetime()
try:
events = get_event_range(monday, connection, email)
logger.debug("Received response for two week range from week beginning with {:%Y-%m-%d} from outofoffice account with email: {}".format(monday, email))
except HTTPError as error:
logger.error("Could not request CalendarView | %s", error.response)
exit(1)

events = events["value"]
outofoffice = [[], [], [], [], []]

for event in events:
# removes last char due to microsoft's datetime using 7 sigfigs for microseconds, python uses 6
start = datetime.strptime(event["Start"]["DateTime"][:-1], strptime_pattern)
end = datetime.strptime(event["End"]["DateTime"][:-1], strptime_pattern)
attendees = event["Attendees"]
# remove outofoffice account by list comprehension
attendees = [x for x in attendees if x["EmailAddress"]["Address"] != email]
organizer = event["Organizer"]

if not attendees and organizer["EmailAddress"]["Address"] != email:
# Sometimes user will be the one who makes the event, not the outofoffice account. Get the organizer.
attendees = [event["Organizer"]]

if (end - start) <= timedelta(days=1):
# Event is for one day only, check if it starts within the week
if monday <= start <= friday:
# Event is within the week we are looking at, add all attendees
weekday = outofoffice[start.weekday()]
if not attendees:
logger.warning("Event '%s' has no attendees. Cannot add to outofoffice list.", event["Subject"])
weekday = add_attendees_to_ooo_list(attendees, weekday)
else:
# Check if long events cover the days of this week
for i, day_array in enumerate(outofoffice.copy()):
current_day = monday + timedelta(days=i)
if start <= current_day <= end:
# if day is inside of the long event
outofoffice[i] = add_attendees_to_ooo_list(attendees, day_array)
logger.debug("Parsed events and successfully created out of office list.")
return outofoffice


def create_ooo_csv(ooo: list, users: list, output_path: str):
@@ -303,7 +145,7 @@ def create_ooo_csv(ooo: list, users: list, output_path: str):
else:
row[day] = ""
writer.writerow(row)
logger.debug("Created csv file.")
LOG.debug("Created csv file.")


def main():
@@ -319,16 +161,16 @@ def main():
users = config["users"]
email = config["ooo_email"]

session = create_session((client_id, client_secret), tenant_id)
session = Account.create_session((client_id, client_secret), tenant_id)
auth = False
while not auth:
auth = authenticate_session(session)
auth = session.authenticate_session()

logger.debug("Session created and authenticated. %s", session)
LOG.debug("Session created and authenticated. %s", session)

ooo = get_ooo_list(email, session.con)
ooo = session.get_ooo_list(email)
create_ooo_csv(ooo, users, output_path)
monday, friday = get_week_datetime()
monday, friday = dates.get_week_datetime()
print("\nCreated CSV seating plan for week {:%a %d/%m/%Y} to {:%a %d/%m/%Y} at {}".format(monday, friday, abspath(output_path)))



+ 164
- 0
supper/api.py Zobrazit soubor

@@ -0,0 +1,164 @@
# Copyright (C) 2019 Campaign Against Arms Trade
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Handles all api interaction with Office365"""

import os
from os.path import dirname, isfile, realpath
from datetime import datetime, timedelta

import O365
from requests import HTTPError

from . import ACCESS_TOKEN, STRFTIME, STRPTIME, LOG, dates


class Account(O365.Account):
"""Wrapper for the O365 Account class to add our api interactions"""
@classmethod
def create_session(cls, credentials, tenant_id):
"""
Create a session with the API and save the token for later use.

:param credentials: tuple of (client_id, client_secret)
:param tentant_id: str of tenant_id
:return: Account class and email: str
"""
my_protocol = O365.MSOffice365Protocol(api_version='v2.0')
token_backend = O365.FileSystemTokenBackend(token_filename=ACCESS_TOKEN)
return cls(
credentials,
protocol=my_protocol,
tenant_id=tenant_id,
token_backend=token_backend,
raise_http_error=False
)

def authenticate_session(self):
"""
Authenticates account session object with oauth.
Uses the default auth flow that comes with the library

:param session: Account object
:return: Bool for if the client is authenticated
"""
if not isfile(f"{dirname(realpath(__file__))}/access_token"):
try:
self.authenticate(scopes=['basic', 'address_book', 'users', 'calendar_shared'])
self.con.oauth_request("https://outlook.office.com/api/v2.0/me/", "get")
LOG.debug("Successfully tested new access_token")
return True
except:
os.remove(ACCESS_TOKEN)
LOG.error("Could not authenticate. Make sure config has the correct client_id, client_secret, etc. Exiting...")
exit(1)
else:
try:
self.con.oauth_request("https://outlook.office.com/api/v2.0/me/", "get")
LOG.debug("Successfully tested current access_token")
return True
except:
os.remove(ACCESS_TOKEN)
LOG.warning("Failed to authenticate with current access_token. Deleting access_token and running authentication again.")
return False

def get_event_range(self, beginning_of_week: datetime, email: str):
"""
Makes api call to grab a calender view within a 2 week window either side of the current week.

:param beginning_of_week: datetime object for this weeks monday
:param connection: a connection to the office365 api
:return: dict of json response
"""
base_url = "https://outlook.office.com/api/v2.0/"
scope = f"users/{email}/CalendarView"

# Create a range of dates for this week so we can catch long events within the search
bottom_range = beginning_of_week - timedelta(days=14)
top_range = beginning_of_week + timedelta(days=21)

# Setup url
date_range = "startDateTime={}&endDateTime={}".format(bottom_range.strftime(STRFTIME), top_range.strftime(STRPTIME))
limit = "$top=150"
select = "$select=Subject,Organizer,Start,End,Attendees"

url = f"{base_url}{scope}?{date_range}&{select}&{limit}"
resp = self.con.oauth_request(url, "get")
return resp.json()

def get_ooo_list(self, email: str):
"""
Makes request and parses data into a list of users who will not be in the office

:param email: string of the outofoffice email where the out of office calender is located
:param connection: a connection to the office365 api
:return: list of 5 lists representing 5 days. Each contains lowercase names of who is not in the office.
"""
monday, friday = dates.get_week_datetime()
try:
events = self.get_event_range(monday, email)
LOG.debug("Received response for two week range from week beginning with {:%Y-%m-%d} from outofoffice account with email: {}".format(monday, email))
except HTTPError as error:
LOG.error("Could not request CalendarView | %s", error.response)
exit(1)

events = events["value"]
outofoffice = [[], [], [], [], []]

for event in events:
# removes last char due to microsoft's datetime using 7 sigfigs for microseconds, python uses 6
start = datetime.strptime(event["Start"]["DateTime"][:-1], STRPTIME)
end = datetime.strptime(event["End"]["DateTime"][:-1], STRPTIME)
attendees = event["Attendees"]
# remove outofoffice account by list comprehension
attendees = [x for x in attendees if x["EmailAddress"]["Address"] != email]
organizer = event["Organizer"]

if not attendees and organizer["EmailAddress"]["Address"] != email:
# Sometimes user will be the one who makes the event, not the outofoffice account. Get the organizer.
attendees = [event["Organizer"]]

if (end - start) <= timedelta(days=1):
# Event is for one day only, check if it starts within the week
if monday <= start <= friday:
# Event is within the week we are looking at, add all attendees
weekday = outofoffice[start.weekday()]
if not attendees:
LOG.warning("Event '%s' has no attendees. Cannot add to outofoffice list.", event["Subject"])
weekday = self.add_attendees_to_ooo_list(attendees, weekday)
else:
# Check if long events cover the days of this week
for i, day_array in enumerate(outofoffice.copy()):
current_day = monday + timedelta(days=i)
if start <= current_day <= end:
# if day is inside of the long event
outofoffice[i] = self.add_attendees_to_ooo_list(attendees, day_array)
LOG.debug("Parsed events and successfully created out of office list.")
return outofoffice

@staticmethod
def add_attendees_to_ooo_list(attendees: list, ooo_list: list):
"""
Adds attendees to ooo_list from api list of attendees

:param attendees: list of attendees to event
:param ooo_list: list of the current days out of office users
:return: ooo_list once appended
"""
for attendee in attendees:
attendee_name = attendee["EmailAddress"]["Name"].split(" ")[0] # Get first name
if attendee_name not in ooo_list.copy():
# Converted to lowercase so program is case insensitive
ooo_list.append(attendee_name.lower())
return ooo_list

+ 51
- 0
supper/dates.py Zobrazit soubor

@@ -0,0 +1,51 @@
# Copyright (C) 2019 Campaign Against Arms Trade
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Collection of functions to deal with datetime manipulation"""

from datetime import datetime, timedelta

def get_week_datetime(start: datetime = None):
"""
Gets the current week's Monday and Friday to be used to filter a calendar.

If this script is ran during the work week (Monday-Friday), it will be the current week.
If it is ran on the weekend, it will generate for next week.

:param start: Specifies the today variable, used for make future weeks if given.
:return: Monday and Friday: Datetime objects
"""
if start:
today = start
else:
today = datetime.now()

weekday = today.weekday()
extra_time = timedelta(
hours=today.hour,
minutes=today.minute,
seconds=today.second,
microseconds=today.microsecond
)
monday = today - timedelta(days=weekday) - extra_time # Monday = 0-0, Friday = 4-4
friday = (today + timedelta(days=4 - weekday)) - extra_time + timedelta(hours=23, minutes=59)

# If this script is run during the week
if weekday <= 4: # 0 = Monday, 6 = Sunday
# - the hour and minutes to get start of the day instead of when the
return monday, friday
# If the date the script is ran on is the weekend, do next week instead
monday = monday + timedelta(days=7) # Monday = 0-0, Friday = 4-4
friday = friday + timedelta(days=7) # Fri to Fri 4 + (4 - 4), Tues to Fri = 2 + (4 - 2)
return monday, friday

Načítá se…
Zrušit
Uložit