Common Tasks
This page demonstrates how to accomplish common datetime-related tasks using Carbonic.
Working with Current Time
Getting Current Time in Different Timezones
from carbonic import now
# Fetch the current moment several times, once per zone, and print each value.
# Current time in UTC (default)
utc_now = now()
print(f"UTC: {utc_now}")
# Current time in specific timezones, selected by IANA zone name.
# Each call reads the clock separately, so the values may differ by a few
# microseconds in addition to the zone offset.
ny_now = now("America/New_York")
london_now = now("Europe/London")
tokyo_now = now("Asia/Tokyo")
print(f"New York: {ny_now}")
print(f"London: {london_now}")
print(f"Tokyo: {tokyo_now}")
Converting Between Timezones
from carbonic import DateTime
# Create meeting time in UTC (default timezone)
meeting_utc = DateTime(2024, 3, 15, 14, 30)
# Create equivalent times in each participant's local timezone
# (representing the same absolute moment).
# NOTE: each value is constructed independently from wall-clock fields; no
# conversion call is made, so the equivalence relies on the offsets noted in
# the trailing comments being correct for this date.
meeting_ny = DateTime(2024, 3, 15, 10, 30, tz="America/New_York") # UTC-4 in March (daylight time already in effect)
meeting_london = DateTime(2024, 3, 15, 14, 30, tz="Europe/London") # UTC+0 in March (summer time not yet started)
meeting_tokyo = DateTime(2024, 3, 15, 23, 30, tz="Asia/Tokyo") # UTC+9
# 'H:i' prints the local wall-clock time of each value.
print("Global Meeting Times:")
print(f"UTC: {meeting_utc.format('H:i')}")
print(f"New York: {meeting_ny.format('H:i')}")
print(f"London: {meeting_london.format('H:i')}")
print(f"Tokyo: {meeting_tokyo.format('H:i')}")
Date Calculations
Age Calculation
from carbonic import Date, today
def calculate_age(birth_date, as_of=None):
    """Calculate age in completed calendar years.

    Args:
        birth_date: Date of birth; any object exposing ``year``, ``month``
            and ``day`` attributes.
        as_of: Date at which to measure the age. Defaults to ``today()``.

    Returns:
        The number of birthdays that fall on or before ``as_of``. Someone
        born on 29 February is counted as a year older from 1 March in
        non-leap years.
    """
    today_date = today() if as_of is None else as_of
    years = today_date.year - birth_date.year
    # Compare (month, day) pairs rather than dividing a day count by 365.25:
    # the division yields 0 on a first birthday when only 365 days have
    # elapsed, because 365 / 365.25 truncates to zero.
    if (today_date.month, today_date.day) < (birth_date.month, birth_date.day):
        years -= 1
    return years
def calculate_detailed_age(birth_date, as_of=None):
    """Calculate detailed age with years, months, and days.

    Args:
        birth_date: Date of birth; any object exposing ``year``, ``month``
            and ``day`` attributes.
        as_of: Date at which to measure the age. Defaults to ``today()``.

    Returns:
        A ``(years, months, days)`` tuple. ``days`` counts from the most
        recent monthly anniversary of the birth date; when that month is
        shorter than the birth day-of-month, the anniversary is taken to be
        the month's last day.
    """
    import calendar  # local import keeps the snippet self-contained

    today_date = today() if as_of is None else as_of
    years = today_date.year - birth_date.year
    months = today_date.month - birth_date.month
    # The current month only counts once the birth day-of-month is reached.
    if today_date.day < birth_date.day:
        months -= 1
    if months < 0:
        years -= 1
        months += 12

    # Locate the month holding the most recent monthly anniversary.
    month_index = birth_date.year * 12 + (birth_date.month - 1) + years * 12 + months
    anchor_year, anchor_month = divmod(month_index, 12)
    anchor_month += 1
    month_length = calendar.monthrange(anchor_year, anchor_month)[1]
    # Clamp so a birth day of 31 maps onto a 28/29/30-day month. Without the
    # clamp, borrowing a short month (born Jan 31, today Mar 1) left the day
    # count negative.
    anchor_day = min(birth_date.day, month_length)

    if anchor_month == today_date.month:
        days = today_date.day - anchor_day
    else:
        # Anniversary fell in the previous month: remainder of that month
        # plus the days elapsed in the current one.
        days = (month_length - anchor_day) + today_date.day
    return years, months, days
# Example usage
# The printed values depend on the date the script is run, since both helpers
# measure against today().
birthday = Date(1990, 5, 15)
age = calculate_age(birthday)
detailed_age = calculate_detailed_age(birthday)
print(f"Age: {age} years")
# detailed_age is a (years, months, days) tuple
print(f"Detailed age: {detailed_age[0]} years, {detailed_age[1]} months, {detailed_age[2]} days")
Time Until/Since Event
from carbonic import DateTime, Duration, now
def time_until_event(event_datetime):
    """Describe how far an event lies from the current moment.

    Returns "Event was ... ago" when the event is strictly in the past and
    "Event in ..." otherwise (including an event happening right now).
    """
    reference = now()
    if event_datetime < reference:
        elapsed = reference - event_datetime
        return f"Event was {humanize_duration(elapsed)} ago"
    remaining = event_datetime - reference
    return f"Event in {humanize_duration(remaining)}"
def humanize_duration(duration):
    """Convert a duration to a short human-readable string.

    Args:
        duration: Any object with a ``total_seconds()`` method.

    Returns:
        The duration expressed in its largest whole unit (days, hours,
        minutes or seconds), truncated rather than rounded, with the unit
        name singular when the count is exactly 1.
    """
    total_seconds = duration.total_seconds()
    # Walk from the largest unit down; the first one that fits wins.
    for unit, size in (("day", 86400), ("hour", 3600), ("minute", 60)):
        if total_seconds >= size:
            count = int(total_seconds // size)
            break
    else:
        unit, count = "second", int(total_seconds)
    # "1 minute", not "1 minutes"
    suffix = "" if count == 1 else "s"
    return f"{count} {unit}{suffix}"
# Example usage
# Whether each event reads as past ("was ... ago") or future ("in ...")
# depends on the clock at the time the script runs.
new_year = DateTime(2025, 1, 1, 0, 0, tz="UTC")
birthday = DateTime(2024, 5, 15, 0, 0, tz="UTC")
print(time_until_event(new_year))
print(time_until_event(birthday))
Business Logic
Business Days and Working Hours
from carbonic import DateTime, Date, Duration, today
def is_business_hours(dt, start_hour=9, end_hour=17):
    """Report whether ``dt`` lies on a weekday inside the working window.

    The window is half-open: ``start_hour`` is included, ``end_hour`` is not.
    Only the hour component of ``dt`` is examined.
    """
    if dt.to_date().is_weekday():
        return start_hour <= dt.hour < end_hour
    return False
def next_business_datetime(dt, target_hour=9):
    """Return the business day after ``dt`` at ``target_hour``:00:00.

    Simplified example: the date is advanced by one business day and a new
    DateTime is built in the same zone as ``dt``; minutes and seconds are
    always zero.
    """
    following_day = dt.to_date().add_business_days(1)
    return DateTime(
        following_day.year,
        following_day.month,
        following_day.day,
        target_hour,
        0,
        0,
        tz=dt.tzinfo.key,  # assumes tzinfo exposes a zone name via .key
    )
def add_business_hours(dt, hours):
    """Add business hours to ``dt`` (simplified example).

    Every 8 hours counts as one business day; whatever is left over is added
    as plain clock hours.

    Args:
        dt: Starting DateTime.
        hours: Number of business hours to add.

    Returns:
        A DateTime in the same zone as ``dt``.

    Note:
        The leftover hours are not checked against a working-hours window,
        so the result can land outside business hours.
    """
    # Simplified: assume 8 hours per business day
    business_days_needed, remaining_hours = divmod(hours, 8)
    # Add business days first
    result_date = dt.to_date().add_business_days(business_days_needed)
    # Carry the full time of day across; passing zeros for minute and second
    # would silently move a 10:30 start to 10:00.
    result_dt = DateTime(
        result_date.year,
        result_date.month,
        result_date.day,
        dt.hour,
        dt.minute,
        dt.second,
        tz=dt.tzinfo.key,
    )
    # Add remaining hours
    return result_dt.add(hours=remaining_hours)
# Example usage
start_work = DateTime(2024, 1, 15, 10, 0, tz="UTC") # Monday 10 AM
print(f"Is business hours: {is_business_hours(start_work)}")
print(f"Next business day: {next_business_datetime(start_work)}")
# 20 hours = 2 business days (16 h) plus 4 remaining clock hours
print(f"After 20 business hours: {add_business_hours(start_work, 20)}")
Deadline Management
from carbonic import DateTime, Duration, now
class DeadlineTracker:
    """Report whether a deadline is comfortably ahead, close, or missed."""

    def __init__(self, deadline, warning_hours=24):
        """Store the deadline and the warning window.

        Args:
            deadline: DateTime by which the work is due.
            warning_hours: Size of the window before the deadline during
                which ``status()`` reports a warning.
        """
        self.deadline = deadline
        self.warning_threshold = Duration(hours=warning_hours)

    def status(self):
        """Get current status of the deadline.

        Returns:
            A string starting with "OVERDUE", "WARNING" or "OK", followed by
            the formatted time past or remaining.
        """
        current = now()
        if current > self.deadline:
            overdue_duration = current - self.deadline
            return f"OVERDUE by {self._format_duration(overdue_duration)}"
        remaining = self.deadline - current
        if remaining <= self.warning_threshold:
            return f"WARNING: {self._format_duration(remaining)} remaining"
        return f"OK: {self._format_duration(remaining)} remaining"

    def _format_duration(self, duration):
        """Format duration for display.

        Under an hour the result is in minutes, under a day in hours, and
        otherwise in days plus any leftover whole hours. Values are
        truncated, and unit names are singular when the count is 1.
        """
        hours = duration.in_hours()
        if hours < 1:
            return self._pluralize(int(duration.total_minutes()), "minute")
        if hours < 24:
            return self._pluralize(int(hours), "hour")
        days = int(duration.in_days())
        remaining_hours = int(hours % 24)
        text = self._pluralize(days, "day")
        if remaining_hours:
            text += f", {self._pluralize(remaining_hours, 'hour')}"
        return text

    @staticmethod
    def _pluralize(count, unit):
        """Return "<count> <unit>", appending "s" unless count is exactly 1."""
        return f"{count} {unit}" if count == 1 else f"{count} {unit}s"
# Example usage
# The reported status depends on the clock when the script runs.
project_deadline = DateTime(2024, 1, 20, 17, 0, tz="UTC")
# Warn once 48 hours or less remain
tracker = DeadlineTracker(project_deadline, warning_hours=48)
print(f"Project status: {tracker.status()}")
Scheduling and Recurrence
Meeting Scheduler
from carbonic import DateTime, Period, Duration
class MeetingScheduler:
    """Generate recurring meeting slots from a first start time."""

    def __init__(self, start_date, duration_minutes=60):
        """Store the first meeting start and the length of each meeting.

        Args:
            start_date: DateTime of the first meeting; its time of day and
                zone are reused for generated meetings.
            duration_minutes: Length of every meeting, in minutes.
        """
        self.start_date = start_date
        self.duration = Duration(minutes=duration_minutes)

    def weekly_meetings(self, count=10):
        """Generate weekly recurring meetings.

        Returns:
            A list of ``count`` dicts with 'start', 'end' and 'title' keys,
            spaced seven days apart beginning at ``start_date``.
        """
        meetings = []
        current = self.start_date
        for _ in range(count):
            end_time = current + self.duration
            meetings.append({
                'start': current,
                'end': end_time,
                'title': f"Weekly Meeting - {current.format('F j, Y')}"
            })
            current = current.add(days=7)
        return meetings

    def monthly_first_friday(self, months=6):
        """Generate monthly meetings on first Friday.

        Returns:
            A list of ``months`` dicts with 'start', 'end' and 'title' keys,
            one per consecutive month beginning with the month of
            ``start_date``, each at ``start_date``'s time of day.

        Note:
            The first entry can fall before ``start_date`` when that date is
            later in its month than the month's first Friday.
        """
        import calendar  # local import keeps the snippet self-contained

        meetings = []
        current_month = self.start_date.start_of("month")
        for _ in range(months):
            # Derive the weekday from public year/month/day fields instead of
            # reaching into the private ``_dt`` attribute.
            first_weekday = calendar.weekday(
                current_month.year, current_month.month, current_month.day
            )
            # Days to move forward from the 1st to reach a Friday (0-6)
            days_to_friday = (calendar.FRIDAY - first_weekday) % 7
            first_friday = current_month.add(days=days_to_friday)
            # Create meeting time with original time
            meeting_time = DateTime(
                first_friday.year, first_friday.month, first_friday.day,
                self.start_date.hour, self.start_date.minute, self.start_date.second,
                tz=self.start_date.tzinfo.key
            )
            end_time = meeting_time + self.duration
            meetings.append({
                'start': meeting_time,
                'end': end_time,
                'title': f"Monthly Review - {meeting_time.format('F Y')}"
            })
            current_month = current_month.add(months=1)
        return meetings
# Example usage: 90-minute meetings starting Monday 15 January 2024, 14:00 New York time
start_meeting = DateTime(2024, 1, 15, 14, 0, tz="America/New_York")
scheduler = MeetingScheduler(start_meeting, duration_minutes=90)

# First four weekly occurrences
weekly = scheduler.weekly_meetings(4)
print("Weekly Meetings:")
for meeting in weekly:
    print(f" {meeting['title']}: {meeting['start'].format('l, F j - H:i')}")
print()

# First Friday of each of the next three months
monthly = scheduler.monthly_first_friday(3)
print("Monthly Meetings:")
for meeting in monthly:
    print(f" {meeting['title']}: {meeting['start'].format('l, F j - H:i')}")
Event Countdown
from carbonic import DateTime, Duration
import math
def create_countdown(event_datetime, event_name):
    """Create a countdown function for an event.

    Args:
        event_datetime: DateTime at which the event starts.
        event_name: Label used in the generated messages.

    Returns:
        A zero-argument function that, each time it is called, reads the
        current time and returns a message describing the time remaining.
    """

    def unit(count, name):
        # "1 day", not "1 days"
        return f"{count} {name}" if count == 1 else f"{count} {name}s"

    def countdown():
        current = DateTime.now()
        if current >= event_datetime:
            return f"{event_name} has started!"
        remaining = event_datetime - current
        # Split the remaining time into whole days plus the leftover
        # hours / minutes / seconds.
        days = int(remaining.in_days())
        hours = int(remaining.in_hours() % 24)
        minutes = int(remaining.total_minutes() % 60)
        seconds = int(remaining.total_seconds() % 60)
        parts = []
        if days > 0:
            parts.append(unit(days, "day"))
        if hours > 0:
            parts.append(unit(hours, "hour"))
        if minutes > 0:
            parts.append(unit(minutes, "minute"))
        if seconds > 0 and days == 0:  # Only show seconds if less than a day
            parts.append(unit(seconds, "second"))
        if not parts:
            # Less than one whole second remains
            return f"{event_name} starts now!"
        return f"{event_name} in: {', '.join(parts)}"

    return countdown
# Example usage
# The message depends on the clock at call time; once the launch date has
# passed, the countdown reports that the event has started.
launch_date = DateTime(2024, 6, 15, 12, 0, tz="UTC")
product_countdown = create_countdown(launch_date, "Product Launch")
print(product_countdown())
Data Processing
Log Analysis
from carbonic import DateTime
from typing import List, Dict
def parse_log_entries(log_lines: List[str]) -> List[Dict]:
    """Parse log entries with timestamps.

    Each line is expected to look like
    ``"2024-01-15 14:30:45 [INFO] Message"``.

    Args:
        log_lines: Raw log lines.

    Returns:
        One dict per successfully parsed line with 'timestamp', 'level' and
        'message' keys. Lines whose timestamp cannot be parsed are reported
        on stdout and skipped. A line without a leading ``[LEVEL]`` tag gets
        level 'UNKNOWN' and keeps its whole remainder as the message.
    """
    entries = []
    for line in log_lines:
        # Only the timestamp parse is expected to fail, so keep the try body
        # to that single call.
        try:
            dt = DateTime.from_format(line[:19], "Y-m-d H:i:s")  # first 19 characters
        except Exception:
            # NOTE(review): the exception type raised for a malformed
            # timestamp is defined by Carbonic and not visible here, hence
            # the broad catch — narrow it once confirmed.
            print(f"Failed to parse line: {line}")
            continue

        # Extract log level and message
        rest = line[20:]
        level, message = 'UNKNOWN', rest
        # Only treat brackets as a level tag when the remainder starts with
        # one AND it is closed; a stray ']' further along must not truncate
        # the message.
        if rest.startswith('['):
            closing = rest.find(']')
            if closing != -1:
                level = rest[1:closing]
                message = rest[closing + 1:]
        entries.append({
            'timestamp': dt,
            'level': level,
            'message': message.strip()
        })
    return entries
def analyze_logs(entries: List[Dict]) -> Dict:
    """Analyze log entries for patterns.

    Args:
        entries: Dicts with at least 'timestamp' and 'level' keys. The list
            is not modified.

    Returns:
        An empty dict for empty input; otherwise a dict with
        'total_entries', 'time_span', 'start_time', 'end_time',
        'level_counts', 'peak_hour' and 'entries_per_minute'.
    """
    if not entries:
        return {}

    # Work on a sorted copy so the caller's list keeps its order.
    ordered = sorted(entries, key=lambda entry: entry['timestamp'])
    start_time = ordered[0]['timestamp']
    end_time = ordered[-1]['timestamp']
    duration = end_time - start_time

    # Tally by level and by hour of day in a single pass.
    level_counts = {}
    hourly_counts = {}
    for entry in ordered:
        level = entry['level']
        level_counts[level] = level_counts.get(level, 0) + 1
        hour = entry['timestamp'].hour
        hourly_counts[hour] = hourly_counts.get(hour, 0) + 1

    # On a tie, the hour encountered first (earliest timestamp) wins.
    busiest_hour, busiest_count = max(hourly_counts.items(), key=lambda item: item[1])

    return {
        'total_entries': len(entries),
        'time_span': duration,
        'start_time': start_time,
        'end_time': end_time,
        'level_counts': level_counts,
        'peak_hour': f"{busiest_hour:02d}:00 ({busiest_count} entries)",
        # Floor the divisor at one minute so very short spans do not inflate
        # the rate or divide by zero.
        'entries_per_minute': len(entries) / max(duration.total_minutes(), 1)
    }
# Example usage: five sample lines spanning just over a minute
sample_logs = [
    "2024-01-15 14:30:45 [INFO] Application started",
    "2024-01-15 14:30:46 [INFO] Database connected",
    "2024-01-15 14:31:15 [WARNING] High memory usage",
    "2024-01-15 14:31:45 [ERROR] Database timeout",
    "2024-01-15 14:32:00 [INFO] Retry successful",
]

parsed_entries = parse_log_entries(sample_logs)
analysis = analyze_logs(parsed_entries)

# Print every statistic as "key: value"
print("Log Analysis:")
for key, value in analysis.items():
    print(f" {key}: {value}")
Time Series Data
from carbonic import DateTime, Duration
from typing import List, Tuple
class TimeSeriesData:
    """Timestamped float samples kept in chronological order."""

    def __init__(self):
        # (timestamp, value) pairs, sorted by timestamp
        self.data: List[Tuple[DateTime, float]] = []

    def add_point(self, timestamp: DateTime, value: float):
        """Append one sample, then restore chronological order."""
        point = (timestamp, value)
        self.data.append(point)
        self.data.sort(key=lambda pair: pair[0])

    def get_range(self, start: DateTime, end: DateTime) -> List[Tuple[DateTime, float]]:
        """Return the samples whose timestamp lies in [start, end], bounds included."""
        selected = []
        for ts, val in self.data:
            if start <= ts <= end:
                selected.append((ts, val))
        return selected

    def resample_hourly(self) -> List[Tuple[DateTime, float]]:
        """Average the samples per clock hour.

        Returns:
            ``(hour_start, mean_value)`` pairs ordered by hour, or an empty
            list when no samples are stored.
        """
        if not self.data:
            return []

        buckets = {}
        for timestamp, value in self.data:
            # Truncate to the top of the hour, keeping the sample's zone
            hour_key = DateTime(
                timestamp.year, timestamp.month, timestamp.day,
                timestamp.hour, 0, 0,
                tz=timestamp.tzinfo.key,
            )
            buckets.setdefault(hour_key, []).append(value)

        return [
            (hour, sum(values) / len(values))
            for hour, values in sorted(buckets.items())
        ]

    def find_peaks(self, threshold: float) -> List[Tuple[DateTime, float]]:
        """Return the samples whose value is strictly greater than ``threshold``."""
        peaks = []
        for ts, val in self.data:
            if val > threshold:
                peaks.append((ts, val))
        return peaks
# Example usage
ts_data = TimeSeriesData()

# Simulate 24 hourly temperature readings starting at 10:00 UTC
base_time = DateTime(2024, 1, 15, 10, 0, tz="UTC")
for i in range(24):
    timestamp = base_time.add(hours=i)
    # Ramp from 20°C up to just under 25°C, repeating every 12 hours
    temperature = 20 + 5 * (i % 12) / 12
    ts_data.add_point(timestamp, temperature)

# Analysis
hourly_avg = ts_data.resample_hourly()
peaks = ts_data.find_peaks(23.0)
print(f"Total data points: {len(ts_data.data)}")
print(f"Hourly averages: {len(hourly_avg)}")
print(f"High temperature readings (>23°C): {len(peaks)}")
if peaks:
    print("Peak temperatures:")
    # Show only the first three
    for timestamp, temp in peaks[:3]:
        print(f" {timestamp.format('H:i')}: {temp:.1f}°C")
File and System Operations
Log Rotation
from carbonic import DateTime, Duration
import os
from pathlib import Path
class LogRotator:
    """Delete or archive ``*.log`` files in a directory based on modification time."""

    def __init__(self, log_dir: str, max_age_days: int = 30):
        """Remember the directory to manage and the maximum file age.

        Args:
            log_dir: Directory containing the log files.
            max_age_days: Files modified longer ago than this are removed by
                ``rotate_logs``.
        """
        self.log_dir = Path(log_dir)
        self.max_age = Duration(days=max_age_days)

    def rotate_logs(self):
        """Remove old log files.

        Returns:
            A list with the paths (as strings) of the deleted files. The
            list is empty when the directory does not exist or nothing is
            old enough, so callers always receive a list.
        """
        removed_files = []
        if not self.log_dir.exists():
            return removed_files

        # NOTE(review): assumes DateTime.now() and DateTime.from_timestamp()
        # produce values that compare correctly with each other — confirm.
        cutoff_time = DateTime.now() - self.max_age
        for log_file in self.log_dir.glob("*.log"):
            # A directory that happens to be named "*.log" is not a log file
            # and cannot be unlinked.
            if not log_file.is_file():
                continue
            # Get file modification time
            mtime = log_file.stat().st_mtime
            file_datetime = DateTime.from_timestamp(mtime)
            if file_datetime < cutoff_time:
                log_file.unlink()  # Delete file
                removed_files.append(str(log_file))
        return removed_files

    def archive_logs(self, archive_pattern: str = "archive_{Y}-{m}"):
        """Archive logs by month.

        Every ``*.log`` file is moved into a sub-directory of the log
        directory named after the month of its modification time.

        Args:
            archive_pattern: ``str.format`` template for the sub-directory
                name; receives ``Y`` (four-digit year) and ``m`` (two-digit
                month).
        """
        if not self.log_dir.exists():
            return

        # Group files by month
        monthly_groups = {}
        for log_file in self.log_dir.glob("*.log"):
            if not log_file.is_file():
                continue
            mtime = log_file.stat().st_mtime
            file_datetime = DateTime.from_timestamp(mtime)
            # Month key in "YYYY-MM" form
            month_key = file_datetime.format("Y-m")
            monthly_groups.setdefault(month_key, []).append(log_file)

        # Create archives
        for month_key, files in monthly_groups.items():
            archive_name = archive_pattern.format(
                Y=month_key[:4],
                m=month_key[5:]
            )
            archive_dir = self.log_dir / archive_name
            archive_dir.mkdir(exist_ok=True)
            for file_path in files:
                # Move file to archive directory
                new_path = archive_dir / file_path.name
                file_path.rename(new_path)
# Example usage (mock - doesn't actually create files)
# Only the rotator object is constructed; neither rotate_logs() nor
# archive_logs() is called here, so nothing on disk is touched.
rotator = LogRotator("/var/log/myapp", max_age_days=7)
print("Log rotation would remove files older than 7 days")
Backup Scheduling
from carbonic import DateTime, Duration, Period
class BackupScheduler:
    """Keep a list of recurring backup schedules and their next run times."""

    # Index matches calendar.weekday(): Monday is 0
    _WEEKDAYS = (
        "monday", "tuesday", "wednesday", "thursday",
        "friday", "saturday", "sunday",
    )
    # Days between consecutive runs for each schedule type
    _INTERVAL_DAYS = {'daily': 1, 'weekly': 7}

    def __init__(self):
        # Each schedule is a dict with 'type', 'next_run' and 'description'
        self.schedules = []

    def add_daily_backup(self, hour: int = 2):
        """Schedule daily backup at specific hour.

        The first run is always tomorrow at ``hour``:00, even when that hour
        has not yet passed today.
        """
        tomorrow = DateTime.now().add(days=1)
        next_backup = DateTime(tomorrow.year, tomorrow.month, tomorrow.day, hour, 0, 0, tz=tomorrow.tzinfo.key)
        self.schedules.append({
            'type': 'daily',
            'next_run': next_backup,
            'description': f"Daily backup at {hour:02d}:00"
        })

    def add_weekly_backup(self, weekday_name: str, hour: int = 1):
        """Schedule weekly backup on specific weekday.

        Args:
            weekday_name: English weekday name, case-insensitive
                (e.g. "Sunday").
            hour: Hour of day for the backup.

        Raises:
            ValueError: If ``weekday_name`` is not a weekday name.
        """
        import calendar  # local import keeps the snippet self-contained

        # Validate before touching the clock so bad input fails fast.
        try:
            target = self._WEEKDAYS.index(weekday_name.strip().lower())
        except ValueError:
            raise ValueError(f"Unknown weekday name: {weekday_name!r}") from None

        now = DateTime.now()
        today_index = calendar.weekday(now.year, now.month, now.day)
        # Days until the requested weekday (0 when it is today)
        days_ahead = (target - today_index) % 7
        day = now.add(days=days_ahead) if days_ahead else now
        next_backup = DateTime(day.year, day.month, day.day, hour, 0, 0, tz=day.tzinfo.key)
        # Requested weekday is today but the hour has already passed
        if next_backup <= now:
            next_backup = next_backup.add(days=7)
        self.schedules.append({
            'type': 'weekly',
            'next_run': next_backup,
            'description': f"Weekly backup on {weekday_name} at {hour:02d}:00"
        })

    def get_next_backup(self):
        """Get the next scheduled backup.

        Returns:
            The schedule dict with the earliest 'next_run', or None when no
            schedules exist.
        """
        if not self.schedules:
            return None
        return min(self.schedules, key=lambda x: x['next_run'])

    def update_schedule(self):
        """Update completed backups to next occurrence.

        Every schedule whose 'next_run' is not in the future is advanced by
        its interval until it is, so schedules that missed several runs do
        not stay in the past.
        """
        now = DateTime.now()
        for schedule in self.schedules:
            step = self._INTERVAL_DAYS.get(schedule['type'])
            if step is None:
                # Unknown schedule types are left untouched
                continue
            while schedule['next_run'] <= now:
                schedule['next_run'] = schedule['next_run'].add(days=step)
# Example usage
scheduler = BackupScheduler()
scheduler.add_daily_backup(hour=2)  # 2 AM daily
scheduler.add_weekly_backup("Sunday", hour=1)  # 1 AM Sunday

# Report whichever schedule fires first
next_backup = scheduler.get_next_backup()
if next_backup:
    time_until = next_backup['next_run'] - DateTime.now()
    print(f"Next backup: {next_backup['description']}")
    print(f"Time until backup: {time_until.in_hours():.1f} hours")
Performance Monitoring
Execution Time Measurement
from carbonic import DateTime, Duration
from contextlib import contextmanager
from typing import Dict, List
class PerformanceMonitor:
    """Collect the elapsed time of named operations and summarise it."""

    def __init__(self):
        # operation name -> one duration per measured execution
        self.measurements: Dict[str, List[Duration]] = {}

    @contextmanager
    def measure(self, operation_name: str):
        """Time the body of a ``with`` block and record it under ``operation_name``.

        The measurement is stored even when the body raises.
        """
        started = DateTime.now()
        try:
            yield
        finally:
            elapsed = DateTime.now() - started
            self.measurements.setdefault(operation_name, []).append(elapsed)

    def get_stats(self, operation_name: str) -> Dict:
        """Summarise the recorded durations of one operation, in seconds.

        Returns:
            A dict with 'count', 'total_time', 'average_time', 'min_time'
            and 'max_time', or an empty dict for an unknown operation.
        """
        durations = self.measurements.get(operation_name)
        if durations is None:
            return {}

        seconds = [d.total_seconds() for d in durations]
        return {
            'count': len(durations),
            'total_time': sum(seconds),
            'average_time': sum(seconds) / len(seconds),
            'min_time': min(seconds),
            'max_time': max(seconds),
        }

    def report(self):
        """Print the statistics of every recorded operation."""
        print("Performance Report:")
        print("-" * 50)
        for operation in self.measurements:
            stats = self.get_stats(operation)
            print(f"\n{operation}:")
            print(f" Executions: {stats['count']}")
            print(f" Average: {stats['average_time']:.3f}s")
            print(f" Min: {stats['min_time']:.3f}s")
            print(f" Max: {stats['max_time']:.3f}s")
            print(f" Total: {stats['total_time']:.3f}s")
# Example usage
monitor = PerformanceMonitor()

# Simulate monitoring different operations; sleep stands in for real work
import time

with monitor.measure("database_query"):
    time.sleep(0.1)  # Simulate database query

with monitor.measure("api_call"):
    time.sleep(0.05)  # Simulate API call

# A second run under the same name is aggregated with the first
with monitor.measure("database_query"):
    time.sleep(0.12)  # Another database query

monitor.report()
These examples demonstrate practical applications of Carbonic for real-world datetime manipulation tasks. Each example is self-contained and can be adapted to your specific needs.