2025-01-14 19:51:29 +01:00
|
|
|
import json
|
2025-01-15 06:27:14 +01:00
|
|
|
import time
|
2025-01-14 19:51:29 +01:00
|
|
|
from datetime import datetime
|
2025-01-21 21:04:40 +01:00
|
|
|
|
|
|
|
|
import pandas as pd
|
2025-01-14 19:51:29 +01:00
|
|
|
from amadeus import Client, ResponseError
|
2025-01-21 21:04:40 +01:00
|
|
|
import calender_maker
|
|
|
|
|
from inspiration.TingSomSkalTilføjes.calender_maker import CalenderMaker
|
2025-01-14 19:51:29 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# OopCompanion:suppressRename
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AmadeusClient:
|
2025-01-21 21:04:40 +01:00
|
|
|
def __init__(self, prod=bool):
|
2025-01-14 19:51:29 +01:00
|
|
|
"""
|
|
|
|
|
Initialize the Amadeus API client.
|
|
|
|
|
"""
|
2025-01-21 21:04:40 +01:00
|
|
|
if prod:
|
|
|
|
|
client_id = "ABRGQv6U7IWAYxwwmjqAOPUDGvuFMSjw"
|
|
|
|
|
client_secret = "BcwpSKf3FICJIxaw"
|
|
|
|
|
self.client = Client(client_id=client_id, client_secret=client_secret,host="api.amadeus.com")
|
|
|
|
|
print("Getting production Amadeus API client")
|
|
|
|
|
else:
|
|
|
|
|
client_id = "uxDqIh36xPAUvpXnXynwAnH86pGBdIch"
|
|
|
|
|
client_secret = "xTSLooNZpJWantb5"
|
|
|
|
|
self.client = Client( client_id = client_id, client_secret = client_secret,host="test.api.amadeus.com" )
|
|
|
|
|
print( "Getting development Amadeus API client" )
|
2025-01-14 19:51:29 +01:00
|
|
|
|
|
|
|
|
def find_cheapest_flights(self, origin: str, destination: str, departure_date: str, adults: int = 1):
|
|
|
|
|
"""
|
|
|
|
|
Find the cheapest flights from the origin to the destination on a specific date.
|
|
|
|
|
|
|
|
|
|
:param origin: The IATA code for the origin location.
|
|
|
|
|
:param destination: The IATA code for the destination location.
|
|
|
|
|
:param departure_date: The date of departure in YYYY-MM-DD format.
|
|
|
|
|
:param adults: Number of adult passengers.
|
|
|
|
|
:return: Flight offers data.
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
response = self.client.shopping.flight_offers_search.get(
|
2025-01-15 06:27:14 +01:00
|
|
|
originLocationCode=origin,
|
|
|
|
|
destinationLocationCode=destination,
|
|
|
|
|
departureDate=departure_date,
|
|
|
|
|
adults=adults,
|
2025-01-14 19:51:29 +01:00
|
|
|
max=250,
|
2025-01-15 06:27:14 +01:00
|
|
|
nonStop="true",
|
2025-01-14 19:51:29 +01:00
|
|
|
)
|
|
|
|
|
return response.data
|
2025-01-21 21:04:40 +01:00
|
|
|
|
2025-01-14 19:51:29 +01:00
|
|
|
except ResponseError as error:
|
2025-01-15 06:27:14 +01:00
|
|
|
if error.response.status_code == 429:
|
|
|
|
|
retry_after = error.response.headers.get( 'Retry-After', '60' ) # standard fallback til 60 sekunder
|
|
|
|
|
print( f"Rate limit reached. Retry after {retry_after} seconds." )
|
|
|
|
|
time.sleep( int( retry_after ) )
|
2025-01-21 21:04:40 +01:00
|
|
|
print(error)
|
2025-01-14 19:51:29 +01:00
|
|
|
|
|
|
|
|
|
2025-01-15 06:27:14 +01:00
|
|
|
def parse_flight_data(self, flights):
|
|
|
|
|
"""
|
|
|
|
|
Parse the flight data to extract useful information about flights.
|
2025-01-14 19:51:29 +01:00
|
|
|
|
2025-01-15 06:27:14 +01:00
|
|
|
:param flights: Raw flight offers data.
|
|
|
|
|
:return: Parsed flight details.
|
|
|
|
|
"""
|
|
|
|
|
result = []
|
2025-01-14 19:51:29 +01:00
|
|
|
for flight_offer in flights:
|
2025-01-15 06:27:14 +01:00
|
|
|
itineraries = flight_offer.get("itineraries", [])
|
|
|
|
|
price = float(flight_offer.get("price", {}).get("total", 0))
|
|
|
|
|
|
2025-01-14 19:51:29 +01:00
|
|
|
for itinerary in itineraries:
|
2025-01-15 06:27:14 +01:00
|
|
|
segments = itinerary.get("segments", [])
|
2025-01-14 19:51:29 +01:00
|
|
|
for segment in segments:
|
2025-01-15 06:27:14 +01:00
|
|
|
if segment.get("numberOfStops", -1) == 0: # Only non-stop flights
|
|
|
|
|
departure = segment.get("departure", {})
|
|
|
|
|
arrival = segment.get("arrival", {})
|
|
|
|
|
travel_time = (
|
|
|
|
|
datetime.fromisoformat(arrival["at"]) - datetime.fromisoformat(departure["at"])
|
|
|
|
|
).total_seconds() / 3600
|
2025-01-14 19:51:29 +01:00
|
|
|
|
2025-01-15 06:27:14 +01:00
|
|
|
result.append({
|
2025-01-14 19:51:29 +01:00
|
|
|
"departure": {
|
2025-01-15 06:27:14 +01:00
|
|
|
"iataCode": departure.get("iataCode"),
|
|
|
|
|
"at": departure.get("at"),
|
2025-01-14 19:51:29 +01:00
|
|
|
},
|
|
|
|
|
"arrival": {
|
2025-01-15 06:27:14 +01:00
|
|
|
"iataCode": arrival.get("iataCode"),
|
|
|
|
|
"at": arrival.get("at"),
|
|
|
|
|
},
|
|
|
|
|
"travel_time": travel_time,
|
|
|
|
|
"price": price,
|
|
|
|
|
})
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
def get_flight_summary(self, departure_date: str, origin: str, destination: str):
|
|
|
|
|
"""
|
|
|
|
|
Get a summary of the flight details for a specific date, origin, and destination.
|
|
|
|
|
|
|
|
|
|
:param departure_date: The date of travel in YYYY-MM-DD format.
|
|
|
|
|
:param origin: The IATA code for the origin location.
|
|
|
|
|
:param destination: The IATA code for the destination location.
|
|
|
|
|
:return: Formatted flight summary string.
|
|
|
|
|
"""
|
|
|
|
|
time.sleep(5) # Avoid rate limiting
|
|
|
|
|
flights = self.find_cheapest_flights(origin, destination, departure_date)
|
|
|
|
|
parsed_flights = self.parse_flight_data(flights)
|
|
|
|
|
with open("data_chunks.txt", "a") as fp:
|
|
|
|
|
json.dump(parsed_flights, fp,indent = 4)
|
|
|
|
|
fp.write(",")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if not parsed_flights:
|
|
|
|
|
return f"No flights found for {departure_date}: {origin} -> {destination}"
|
|
|
|
|
|
|
|
|
|
average_price = sum(f["price"] for f in parsed_flights) / len(parsed_flights)
|
|
|
|
|
first_departure = parsed_flights[0]["departure"]["iataCode"]
|
|
|
|
|
last_arrival = parsed_flights[-1]["arrival"]["iataCode"]
|
|
|
|
|
|
2025-01-21 21:04:40 +01:00
|
|
|
#return f"{departure_date}: {first_departure} -> {last_arrival} - {average_price:.2f}"
|
|
|
|
|
return {"departure_date":departure_date, "departure": first_departure, "arrival": last_arrival,"price": average_price}
|
2025-01-15 06:27:14 +01:00
|
|
|
|
|
|
|
|
# Example usage
|
|
|
|
|
if __name__ == "__main__":
|
2025-01-21 21:04:40 +01:00
|
|
|
cd = CalenderMaker()
|
|
|
|
|
travel_days = cd.get_dates(year = 2025, dest_day="Thursday", orgin_day="Tuesday")
|
|
|
|
|
amadeus_client = AmadeusClient( prod = True)
|
|
|
|
|
import csv
|
|
|
|
|
|
|
|
|
|
with open( "summery.txt", 'w' ) as fp, open( 'data_csv.csv', "w" ) as fp2:
|
|
|
|
|
writer = csv.DictWriter( fp2, fieldnames = ['orgin', 'dest', 'date', 'price','note'] )
|
|
|
|
|
writer.writeheader()
|
2025-01-15 06:27:14 +01:00
|
|
|
for days in travel_days:
|
|
|
|
|
origin = days['orgin']
|
|
|
|
|
destination = days['dest']
|
|
|
|
|
departure_date = days['date']
|
|
|
|
|
try:
|
2025-01-21 21:04:40 +01:00
|
|
|
try:
|
|
|
|
|
summary = amadeus_client.get_flight_summary( departure_date, origin, destination )
|
|
|
|
|
|
|
|
|
|
print(summary)
|
|
|
|
|
writer.writerow( {
|
|
|
|
|
"orgin": summary["departure_date"],
|
|
|
|
|
"dest": summary["departure"],
|
|
|
|
|
"date": summary["arrival"],
|
|
|
|
|
"price": summary['price'],
|
|
|
|
|
"note": ""
|
|
|
|
|
} )
|
|
|
|
|
fp.writelines( str(summary) + '\n' )
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print( f"Error fetching flight summary for {origin} -> {destination} on {departure_date}" )
|
|
|
|
|
writer.writerow( {
|
|
|
|
|
"orgin": origin,
|
|
|
|
|
"dest": destination,
|
|
|
|
|
"date": departure_date,
|
|
|
|
|
"price": 214.0,
|
|
|
|
|
"note": "False - Flight not found"
|
|
|
|
|
} )
|
|
|
|
|
pass
|
2025-01-15 06:27:14 +01:00
|
|
|
except ResponseError as error:
|
|
|
|
|
print( f"An error occurred: {error}" )
|
|
|
|
|
|
2025-01-21 21:04:40 +01:00
|
|
|
df = pd.read_csv( 'data_csv.csv' )
|
|
|
|
|
|
|
|
|
|
# Write the raw data to the first sheet
|
|
|
|
|
with pd.ExcelWriter( 'output.xlsx' ) as writer:
|
|
|
|
|
df.to_excel( writer, sheet_name = 'Sheet1', index = False )
|
|
|
|
|
|
|
|
|
|
# Calculate the average monthly data
|
|
|
|
|
df['date'] = pd.to_datetime( df['date'] )
|
|
|
|
|
df['month'] = df['date'].dt.to_period( 'M' )
|
|
|
|
|
avg_monthly_data = df.groupby( 'month' )['price'].mean().reset_index()
|
|
|
|
|
avg_monthly_data.columns = ['Month', 'Average Price']
|
|
|
|
|
|
|
|
|
|
# Write the average monthly data to the second sheet
|
|
|
|
|
avg_monthly_data.to_excel( writer, sheet_name = 'Sheet2', index = False )
|
|
|
|
|
|