import streamlit as st import sqlite3 import pandas as pd from datetime import datetime, timedelta def fetch_data_from_db(db_path, table_name, start_date=None, end_date=None): conn = sqlite3.connect(db_path) if table_name == "sync_results" and start_date and end_date: query = f"SELECT * FROM {table_name} WHERE date BETWEEN '{start_date}' AND '{end_date}'" else: query = f"SELECT * FROM {table_name}" df = pd.read_sql_query(query, conn) conn.close() return df def insert_user_into_db(db_path, email, password, domain): conn = sqlite3.connect(db_path) cursor = conn.cursor() cursor.execute(''' INSERT INTO users (email, password, domain) VALUES (?, ?, ?) ''', (email, password, domain)) conn.commit() conn.close() def delete_user_from_db(db_path, email): conn = sqlite3.connect(db_path) cursor = conn.cursor() cursor.execute(''' DELETE FROM users WHERE email = ? ''', (email,)) cursor.execute(''' DELETE FROM sync_results WHERE email = ? ''', (email,)) conn.commit() conn.close() def main(): st.set_page_config(page_title="Archiv Manager") st.title("IMAPSync Manager") db_path = "/app/db/imapsync_results.db" # Sidebar for navigation with buttons and unique keys st.sidebar.title("Navigation") add_user = st.sidebar.button("Add User", key="add_user_button") view_users = st.sidebar.button("View Users", key="view_users_button") delete_user = st.sidebar.button("Delete User", key="delete_user_button") st.sidebar.markdown( '', unsafe_allow_html=True ) # Use session state to keep track of the selected page if 'page' not in st.session_state: st.session_state.page = 'View Users' if add_user: st.session_state.page = 'Add User' elif view_users: st.session_state.page = 'View Users' elif delete_user: st.session_state.page = 'Delete User' if st.session_state.page == "Add User": st.header("Add a New User") new_email = st.text_input("Email:") new_password = st.text_input("Password:", type="password") new_domain = st.text_input("Domain:") if st.button("Add User", key="submit_add_user"): if new_email and new_password and new_domain: insert_user_into_db(db_path, new_email, new_password, new_domain) st.success(f"User {new_email} added successfully!") else: st.error("Please fill in all fields.") elif st.session_state.page == "View Users": st.header("IMAPSync Users and Email Transfer Results") # Fetch users and email transfer data users_df = fetch_data_from_db(db_path, "users") results_df = fetch_data_from_db(db_path, "sync_results") if users_df.empty: st.write("No user data available in the database.") else: emails = users_df['email'].unique().tolist() emails.insert(0, "All") selected_email = st.selectbox("Filter by Email:", options=emails) # Add date range picker for filtering results by date st.subheader("Filter by Date Range:") start_date = st.date_input("Start Date", value=datetime.now().date()) end_date = st.date_input("End Date + 1 Tag", value=start_date + timedelta(days=1)) if selected_email == "All": filtered_users_df = users_df else: filtered_users_df = users_df[users_df['email'] == selected_email] st.dataframe(filtered_users_df) # Display email transfer results if available if not results_df.empty: # Filter by email and date if selected_email == "All": filtered_results_df = fetch_data_from_db(db_path, "sync_results", start_date=start_date, end_date=end_date) else: filtered_results_df = fetch_data_from_db(db_path, "sync_results", start_date=start_date, end_date=end_date) filtered_results_df = filtered_results_df[filtered_results_df['email'] == selected_email] if not filtered_results_df.empty: # Replace 'auth_failed' values with icons filtered_results_df['auth_failed'] = filtered_results_df['auth_failed'].apply(lambda x: '✅' if x == 0 else '❌') st.write(f"Email Transfer Results for {selected_email}:") st.dataframe(filtered_results_df) else: st.write(f"No transfer data found for {selected_email} in the selected date range.") else: st.write("No email transfer results available.") elif st.session_state.page == "Delete User": st.header("Delete a User") df = fetch_data_from_db(db_path, "users") if df.empty: st.write("No data available in the database.") else: user_to_delete = st.selectbox("Select User to Delete:", df['email'].unique()) if st.button("Delete User", key="confirm_delete_user"): delete_user_from_db(db_path, user_to_delete) st.success(f"User {user_to_delete} deleted successfully!") if __name__ == "__main__": main()