Brembo
1
import requests
import time
import sys
import threading
def query_perplexity_sonar(api_key, query, model="sonar"):
"""
Function to send a query to the Perplexity Sonar API.
- api_key: Your API key
- query: The query text (e.g., "What is the weather in Sofia?")
- model: The model (defaults to 'sonar')
Returns: JSON response from the API.
"""
url = "https://api.perplexity.ai/chat/completions"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
payload = {
"model": model,
"messages": [
{"role": "user", "content": query}
]
}
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"API request failed with status {response.status_code}: {response.text}")
def animated_search_effect(stop_event):
animation = ['|', '/', '-', '\\']
idx = 0
while not stop_event.is_set():
sys.stdout.write(f"\rSearching {animation[idx % len(animation)]}")
sys.stdout.flush()
idx += 1
time.sleep(0.2)
sys.stdout.write("\rSearch completed! \n")
sys.stdout.flush()
def interactive_query_with_animation(api_key):
query = input("Please enter your query for Perplexity Sonar API: ")
stop_event = threading.Event()
thread = threading.Thread(target=animated_search_effect, args=(stop_event,))
thread.start()
try:
response = query_perplexity_sonar(api_key, query)
except Exception as e:
stop_event.set()
thread.join()
print("Error:", e)
return
stop_event.set()
thread.join()
print("Response:")
print(response)
if __name__ == "__main__":
api_key = "API-KEY"
interactive_query_with_animation(api_key)