Code & Queries

Code & Queries: Your Source for SQL, Python, and AI Insights

In der heutigen datengetriebenen Welt ist die Fähigkeit, Daten effizient zu sammeln, zu verarbeiten und zu analysieren, von entscheidender Bedeutung. Eine Datenpipeline ist ein zentrales Konzept, das diesen Prozess automatisiert und optimiert. In diesem Blogbeitrag werden wir eine umfassende Datenpipeline mit Python erstellen, die Daten aus verschiedenen Quellen sammelt, transformiert und in einer Datenbank speichert.

Was ist eine Datenpipeline?

Eine Datenpipeline ist eine Reihe von Prozessen, die Daten von einer Quelle zu einem Ziel transportieren, wobei die Daten auf dem Weg transformiert, bereinigt oder angereichert werden können. Typische Schritte in einer Datenpipeline umfassen:

  1. Datenerfassung: Daten aus verschiedenen Quellen wie APIs, Dateien oder Datenbanken sammeln.
  2. Datenbereinigung: Ungültige oder fehlende Daten entfernen oder korrigieren.
  3. Datentransformation: Daten in ein geeignetes Format für die Analyse oder Speicherung umwandeln.
  4. Datenspeicherung: Die verarbeiteten Daten in einer Datenbank, einem Data Warehouse oder einem anderen Speichersystem ablegen.

Beispiel: Eine Datenpipeline für Wetterdaten

In diesem Beispiel erstellen wir eine Datenpipeline, die Wetterdaten von einer öffentlichen API abruft, die Daten bereinigt und transformiert und sie schließlich in einer SQLite-Datenbank speichert.

Voraussetzungen

Bevor wir beginnen, stellen Sie sicher, dass Sie die folgenden Python-Bibliotheken installiert haben:

pip install requests pandas sqlite3

Schritt 1: Datenerfassung

Zuerst müssen wir die Wetterdaten von einer API abrufen. Wir verwenden die OpenWeatherMap API, die aktuelle Wetterdaten für verschiedene Städte bereitstellt.

import requests
import pandas as pd
import sqlite3

# API-Schlüssel und URL
API_KEY = 'dein_api_schluessel'
BASE_URL = 'http://api.openweathermap.org/data/2.5/weather'

# Funktion zum Abrufen von Wetterdaten
def fetch_weather_data(city):
    params = {
        'q': city,
        'appid': API_KEY,
        'units': 'metric'
    }
    response = requests.get(BASE_URL, params=params)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Fehler beim Abrufen der Daten für {city}: {response.status_code}")
        return None

# Liste der Städte, für die wir Wetterdaten abrufen möchten
cities = ['Berlin', 'Munich', 'Hamburg', 'Frankfurt', 'Cologne']

# Wetterdaten für alle Städte abrufen
weather_data = [fetch_weather_data(city) for city in cities]

Schritt 2: Datenbereinigung und -transformation

Die von der API zurückgegebenen Daten sind im JSON-Format. Wir werden diese Daten in ein Pandas DataFrame umwandeln und bereinigen.

# Funktion zur Transformation der Wetterdaten
def transform_weather_data(data):
    transformed_data = []
    for entry in data:
        if entry:
            transformed_data.append({
                'city': entry['name'],
                'temperature': entry['main']['temp'],
                'humidity': entry['main']['humidity'],
                'pressure': entry['main']['pressure'],
                'wind_speed': entry['wind']['speed'],
                'weather_description': entry['weather'][0]['description']
            })
    return pd.DataFrame(transformed_data)

# Wetterdaten transformieren
weather_df = transform_weather_data(weather_data)

# Datenbereinigung: Entfernen von Zeilen mit fehlenden Werten
weather_df.dropna(inplace=True)

Schritt 3: Datenspeicherung

Schließlich speichern wir die transformierten Daten in einer SQLite-Datenbank.

# Verbindung zur SQLite-Datenbank herstellen
conn = sqlite3.connect('weather_data.db')

# DataFrame in die Datenbank schreiben
weather_df.to_sql('weather', conn, if_exists='replace', index=False)

# Verbindung schließen
conn.close()

Vollständiges Skript

Hier ist das vollständige Skript, das alle Schritte kombiniert:

import requests
import pandas as pd
import sqlite3

# API-Schlüssel und URL
API_KEY = 'dein_api_schluessel'
BASE_URL = 'http://api.openweathermap.org/data/2.5/weather'

# Funktion zum Abrufen von Wetterdaten
def fetch_weather_data(city):
    params = {
        'q': city,
        'appid': API_KEY,
        'units': 'metric'
    }
    response = requests.get(BASE_URL, params=params)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Fehler beim Abrufen der Daten für {city}: {response.status_code}")
        return None

# Funktion zur Transformation der Wetterdaten
def transform_weather_data(data):
    transformed_data = []
    for entry in data:
        if entry:
            transformed_data.append({
                'city': entry['name'],
                'temperature': entry['main']['temp'],
                'humidity': entry['main']['humidity'],
                'pressure': entry['main']['pressure'],
                'wind_speed': entry['wind']['speed'],
                'weather_description': entry['weather'][0]['description']
            })
    return pd.DataFrame(transformed_data)

# Liste der Städte, für die wir Wetterdaten abrufen möchten
cities = ['Berlin', 'Munich', 'Hamburg', 'Frankfurt', 'Cologne']

# Wetterdaten für alle Städte abrufen
weather_data = [fetch_weather_data(city) for city in cities]

# Wetterdaten transformieren
weather_df = transform_weather_data(weather_data)

# Datenbereinigung: Entfernen von Zeilen mit fehlenden Werten
weather_df.dropna(inplace=True)

# Verbindung zur SQLite-Datenbank herstellen
conn = sqlite3.connect('weather_data.db')

# DataFrame in die Datenbank schreiben
weather_df.to_sql('weather', conn, if_exists='replace', index=False)

# Verbindung schließen
conn.close()

print("Datenpipeline erfolgreich abgeschlossen!")

Fazit

In diesem Blogbeitrag haben wir eine einfache Datenpipeline mit Python erstellt, die Wetterdaten von einer API abruft, diese Daten bereinigt und transformiert und sie schließlich in einer SQLite-Datenbank speichert. Diese Pipeline kann leicht erweitert werden, um zusätzliche Datenquellen, komplexere Transformationen oder andere Speichersysteme zu integrieren.