KAPITEL 6

Entwurfsmuster für Reproduzierbarkeit

Software-Best-Practices wie zum Beispiel Unit-Tests gehen davon aus, dass ausgeführter Code eine deterministische Ausgabe liefert:

def sigmoid(x):

return 1.0 / (1 + np.exp(-x))

class TestSigmoid(unittest.TestCase):

def test_zero(self):

self.assertAlmostEqual(sigmoid(0), 0.5)

def test_neginf(self):

self.assertAlmostEqual(sigmoid(float("-inf")), 0)

def test_inf(self):

self.assertAlmostEqual(sigmoid(float("inf")), 1)

Beim maschinellen Lernen ist eine derartige Reproduzierbarkeit schwierig. Beim Training werden ML-Modelle mit Zufallswerten initialisiert und dann basierend auf den Trainingsdaten angepasst. Wenn Sie aber in dem von scikit-learn implementierten einfachen k-Means-Algorithmus random_state festlegen, liefert der Algorithmus jedes Mal die gleichen Ergebnisse:

def cluster_kmeans(X):

from sklearn import cluster

k_means = cluster.KMeans(n_clusters=10, random_state=10)

labels = k_means.fit(X).labels_[::]

return labels

Abgesehen vom zufälligen Startwert (Seed) müssen viele andere Artefakte festgelegt werden, um Reproduzierbarkeit beim Training sicherzustellen. Darüber hinaus besteht maschinelles Lernen aus verschiedenen Phasen wie zum Beispiel Training, Bereitstellung und Retraining. Oft ist es wichtig, dass manche Dinge auch über diese Phasen hinweg reproduzierbar sind.

Dieses Kapitel stellt Entwurfsmuster vor, die mit verschiedenen Aspekten der Reproduzierbarkeit zu tun haben. Das Entwurfsmuster Transformation erfasst Abhängigkeiten bei der Datenaufbereitung aus der Modell-Trainingspipeline, um sie beim Serving zu reproduzieren. Das Entwurfsmuster Wiederholbare Aufteilung befasst sich mit der Art und Weise, wie die Daten zwischen Trainings-, Validierungs- und Testdatensätzen aufzuteilen sind, damit ein im Training verwendetes Beispiel niemals für die Bewertung oder das Testen herangezogen wird, selbst wenn der Datensatz wächst. Beim Entwurfsmuster Bridged Schema geht es darum, wie sich Reproduzierbarkeit sicherstellen lässt, wenn der Trainingsdatensatz eine Mischung von Daten ist, die verschiedenen Schemas entsprechen. Das Entwurfsmuster Workflow-Pipeline erfasst alle Schritte im ML-Prozess, um sicherzustellen, dass beim Retraining des Modells Teile der Pipeline wiederverwendet werden können. Beim Entwurfsmuster Feature Store dreht sich alles um Reproduzierbarkeit und Wiederverwendbarkeit von Features über verschiedene ML-Jobs hinweg. Das Entwurfsmuster Windowed Inference stellt sicher, dass sich Features, die dynamisch und zeitabhängig berechnet werden, zwischen Training und Serving korrekt wiederholen lassen. Die Versionierung von Daten und Modellen ist eine Voraussetzung für den Umgang mit vielen der in diesem Kapitel beschriebenen Entwurfsmuster.

Entwurfsmuster 21: Transformation

Das Entwurfsmuster Transformation erleichtert es, ein ML-Modell in die Produktion zu überführen, indem Eingaben, Features und Transformationen sorgfältig getrennt bleiben.

Problem

Das Problem ist, dass die Eingaben in ein ML-Modell nicht die Features sind, die das ML-Modell bei seinen Berechnungen verwendet. Zum Beispiel sind die Eingaben in einem Textklassifizierungsmodell die rohen Textdokumente, und die Features sind die numerischen Darstellungen dieses Texts in der Einbettung. Wenn wir ein ML-Modell trainieren, dann trainieren wir es mit Features, die aus den rohen Eingaben extrahiert werden. Wir nehmen hier dieses Modell, das mit BigQuery ML dafür trainiert wurde, die Dauer von Fahrradtouren in London vorherzusagen:

CREATE OR REPLACE MODEL ch09eu.bicycle_model

OPTIONS(input_label_cols=['duration'],

model_type='linear_reg')

AS

SELECT

duration

, start_station_name

, CAST(EXTRACT(dayofweek from start_date) AS STRING)

as dayofweek

, CAST(EXTRACT(hour from start_date) AS STRING)

as hourofday

FROM

`bigquery-public-data.london_bicycles.cycle_hire`

Dieses Modell hat drei Features (start_station_name, dayofweek und hourofday), die aus zwei Eingaben (start_station_name und start_date) berechnet werden, wie Abbildung 6-1 zeigt.

image

Abbildung 6-1: Das Modell umfasst drei Features, die aus zwei Eingaben berechnet werden.

Der obige SQL-Code mischt aber die Eingaben und Features, ohne dabei die ausgeführten Transformationen zu verfolgen. Das macht sich bemerkbar, wenn wir versuchen, mit diesem Modell Vorhersagen zu treffen. Da das Modell auf drei Features trainiert wurde, muss die Vorhersagesignatur wie folgt aussehen:

SELECT * FROM ML.PREDICT(MODEL ch09eu.bicycle_model,(

'Kings Cross' AS start_station_name

, '3' as dayofweek

, '18' as hourofday

))

Zur Inferenzzeit müssen wir wissen, auf welchen Features das Modell trainiert wurde, wie sie interpretiert werden sollen und wie die angewandten Transformationen im Detail aussehen – zum Beispiel, dass wir '3' für den Wochentag (dayofweek) einzugeben haben. Was bedeutet diese '3'? Dienstag oder Mittwoch? Das hängt davon ab, welche Bibliothek das Modell verwendet oder welchen Tag wir als Wochenanfang betrachten!

Die Verzerrungen zwischen Training und Serving, die aus Unterschieden bei diesen Faktoren zwischen Trainings- und Serving-Umgebungen resultieren, ist einer der Hauptgründe, warum es so schwierig ist, ML-Modelle in produktionsreife Versionen zu überführen.

Lösung

Die Lösung besteht darin, die zum Konvertieren der Modelleingaben in Features angewandten Transformationen explizit zu erfassen. In BigQuery ML erledigen Sie dies mit der TRANSFORM-Klausel. Damit stellen Sie sicher, dass die Transformationen automatisch während ML.PREDICT angewendet werden.

Mit der Unterstützung für TRANSFORM sollte das obige Modell wie folgt umgeschrieben werden:

CREATE OR REPLACE MODEL ch09eu.bicycle_model

OPTIONS(input_label_cols=['duration'],

model_type='linear_reg')

TRANSFORM(

SELECT * EXCEPT(start_date)

, CAST(EXTRACT(dayofweek from start_date) AS STRING)

as dayofweek -- feature1

, CAST(EXTRACT(hour from start_date) AS STRING)

as hourofday –- feature2

)

AS

SELECT

duration, start_station_name, start_date -- Eingaben

FROM

`bigquery-public-data.london_bicycles.cycle_hire`

Die Eingaben (in der SELECT-Klausel) haben wir jetzt klar von den Features (in der TRANSFORM-Klausel) getrennt. Die Vorhersage ist jetzt viel einfacher. Wir brauchen lediglich den Stationsnamen und einen Zeitstempel (die Eingaben) an das Modell zu senden:

SELECT * FROM ML.PREDICT(MODEL ch09eu.bicycle_model,(

'Kings Cross' AS start_station_name

, CURRENT_TIMESTAMP() as start_date

))

Das Modell kümmert sich dann darum, die entsprechenden Transformationen durchzuführen, um die erforderlichen Features zu erzeugen. Dazu erfasst es sowohl die Transformationslogik als auch die Artefakte (wie zum Beispiel Skalierungskonstanten, Einbettungskoeffizienten, Nachschlagetabellen usw.), die für die Transformation benötigt werden.

Solange wir genau darauf achten, in der SELECT-Anweisung nur die Roheingaben zu verwenden und die gesamte darauffolgende Verarbeitung der Eingabe in der TRANSFORM-Klausel unterzubringen, wendet BigQuery ML diese Transformationen während der Vorhersage automatisch an.

Kompromisse und Alternativen

Die oben beschriebene Lösung funktioniert, weil BigQuery ML die Transformationslogik und Artefakte für uns verfolgt, sie im Modellgraphen speichert und die Transformationen während der Vorhersage automatisch anwendet.

Wenn wir ein Framework verwenden, das das Entwurfsmuster Transformation von Haus aus nicht unterstützt, sollten wir unsere Modellarchitektur so entwerfen, dass sich die während des Trainings durchgeführten Transformationen während des Servings leicht reproduzieren lassen. Wir erreichen das, indem wir die Transformationen im Modellgraphen speichern oder ein Repository von transformierten Features erstellen (siehe »Entwurfsmuster 26: Feature Store« auf Seite 325).

Transformationen in TensorFlow und Keras

Nehmen wir an, wir trainierten ein ML-Modell mit sechs Eingaben (Breitengrad beim Zusteigen, Längengrad beim Zusteigen, Breitengrad beim Aussteigen, Längengrad beim Aussteigen, Anzahl der Mitfahrer und Zeit des Zusteigens), um den Taxitarif in New York zu schätzen. TensorFlow unterstützt das Konzept der Feature-Spalten, die im Modellgraphen gespeichert werden. Allerdings geht die API konzeptionell davon aus, dass die Roheingaben die gleichen sind wie die Features.

Wenn wir zum Beispiel die Breiten- und Längengrade skalieren wollen (Details hierzu siehe »Einfache Datendarstellungen« auf Seite 40 in Kapitel 2), erstellen wir ein transformiertes Feature, das den euklidischen Abstand darstellt, und extrahieren die Stunde des Tages aus dem Zeitstempel. Den Modellgraphen (siehe Abbildung 6-2) müssen wir sorgfältig entwerfen und dabei das Transformation-Konzept im Hinterkopf behalten. Beachten Sie im unten erläuterten Code, wie wir drei separate Schichten in unserem Keras-Modell entwerfen – die Eingabeschicht, die Transformationsschicht und eine DenseFeatures-Schicht.

image

Abbildung 6-2: Der Modellgraph für das Problem der Taxitarifschätzung in Keras

Machen Sie zunächst jede Eingabe in das Keras-Modell zu einer Input-Schicht (den vollständigen Code finden Sie auf GitHub in einem Notebook unter https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/quests/serverlessml/06_feateng_keras/solution/taxifare_fc.ipynb):

inputs = {

colname : tf.keras.layers.Input(

name=colname, shape=(), dtype='float32')

for colname in ['pickup_longitude', 'pickup_latitude',

'dropoff_longitude', 'dropoff_latitude']

}

In Abbildung 6-2 sind die Kästen mit dropoff_latitude, dropoff_longitude usw. gekennzeichnet.

Als Zweites verwalten Sie ein Dictionary der transformierten Features und machen jede Transformation entweder zu einer Keras-Vorverarbeitungsschicht oder zu einer Lambda-Schicht. Hier skalieren wir die Eingaben mithilfe von Lambda-Schichten:

transformed = {}

for lon_col in ['pickup_longitude', 'dropoff_longitude']:

transformed[lon_col] = tf.keras.layers.Lambda(

lambda x: (x+78)/8.0,

name='scale_{}'.format(lon_col)

)(inputs[lon_col])

for lat_col in ['pickup_latitude', 'dropoff_latitude']:

transformed[lat_col] = tf.keras.layers.Lambda(

lambda x: (x-37)/8.0,

name='scale_{}'.format(lat_col)

)(inputs[lat_col])

In Abbildung 6-2 sind dies die Kästen, die mit scale_dropoff_latitude, scale_dropoff_longitude usw. gekennzeichnet sind.

Außerdem richten wir eine Lambda-Schicht für den euklidischen Abstand ein, der aus vier der Input-Schichten berechnet wird (siehe Abbildung 6-2):

def euclidean(params):

lon1, lat1, lon2, lat2 = params

londiff = lon2 - lon1

latdiff = lat2 - lat1

return tf.sqrt(londiff*londiff + latdiff*latdiff)

transformed['euclidean'] = tf.keras.layers.Lambda(euclidean, name='euclidean')([

inputs['pickup_longitude'],

inputs['pickup_latitude'],

inputs['dropoff_longitude'],

inputs['dropoff_latitude']

])

Ebenso ist die Spalte, in der die Stunde des Tages aus dem Zeitstempel gespeichert wird, eine Lambda-Schicht:

transformed['hourofday'] = tf.keras.layers.Lambda(

lambda x: tf.strings.to_number(tf.strings.substr(x, 11, 2),

out_type=tf.dtypes.int32),

name='hourofday'

)(inputs['pickup_datetime'])

Drittens werden alle diese transformierten Schichten zu einer DenseFeatures-Schicht verkettet:

dnn_inputs = tf.keras.layers.DenseFeatures(feature_columns.values())(transformed)

Da der Konstruktor für DenseFeatures einen Satz von Feature-Spalten benötigt, müssen wir angeben, wie die einzelnen transformierten Werte zu übernehmen sind, und sie in eine Eingabe für das neuronale Netz konvertieren. Wir können sie so verwenden, wie sie sind, 1-aus-n-codieren oder die Zahlen partitionieren. Der Einfachheit halber verwenden wir sie einfach alle so, wie sie sind:

feature_columns = {

colname: tf.feature_column.numeric_column(colname)

for colname in ['pickup_longitude', 'pickup_latitude',

'dropoff_longitude', 'dropoff_latitude']

}

feature_columns['euclidean'] = \

tf.feature_column.numeric_column('euclidean')

Nachdem wir eine DenseFeatures-Eingabeschicht haben, können wir den Rest des Keras-Modells wie gewohnt aufbauen:

h1 = tf.keras.layers.Dense(32, activation='relu', name='h1')(dnn_inputs)

h2 = tf.keras.layers.Dense(8, activation='relu', name='h2')(h1)

output = tf.keras.layers.Dense(1, name='fare')(h2)

model = tf.keras.models.Model(inputs, output)

model.compile(optimizer='adam', loss='mse', metrics=['mse'])

Das vollständige Beispiel finden Sie auf GitHub unter https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/quests/serverlessml/06_feateng_keras/solution/taxifare_fc.ipynb.

Die erste Schicht des Keras-Modells ist als inputs-Schicht eingerichtet. Die zweite Schicht ist die Transform-Schicht. Die DenseFeatures-Schicht als dritte Schicht kombiniert die ersten beiden Schichten. Nach dieser Folge von Schichten beginnt die übliche Modellarchitektur. Da die transform-Schicht Teil des Modellgraphen ist, funktionieren die üblichen Lösungen mit Serving-Funktion und Batch-Serving (siehe Kapitel 5) wie gehabt.

Effiziente Transformationen mit tf.transform

Nachteilig beim obigen Ansatz ist, dass die Transformationen bei jeder Iteration des Trainings durchgeführt werden. Das ist nicht tragisch, wenn wir lediglich mit bekannten Konstanten skalieren. Doch wie sieht es aus, wenn unsere Transformationen rechenintensiver sind? Was ist, wenn wir nach dem Mittelwert und der Varianz skalieren wollen, wobei wir zuerst sämtliche Daten durchlaufen müssen, um diese Variablen zu berechnen?

image

Es ist hilfreich, zu unterscheiden zwischen Transformationen auf Instanzebene, die direkt Teil des Modells sein können (wobei der einzige Nachteil darin besteht, sie bei jeder Trainingsiteration anzuwenden), und Transformationen auf Datensatzebene, bei denen wir einen vollständigen Durchlauf benötigen, um die Gesamtstatistiken oder das Vokabular einer kategorialen Variablen zu berechnen. Solche Transformationen auf Datensatzebene können nicht Teil des Modells sein und müssen als skalierbarer Vorverarbeitungsschritt angewendet werden, der die Transformation erzeugt und dabei die Logik und die Artefakte (Mittelwert, Varianz, Vokabular usw.) erfasst, die dem Modell zugeordnet werden. Für Transformationen auf Datensatzebene verwenden Sie tf.transform.

Die Bibliothek tf.transform (die Teil von TensorFlow Extended (https://oreil.ly/OznI3) ist) bietet eine effiziente Möglichkeit, Transformationen über einen Vorverarbeitungsdurchlauf durch die Daten durchzuführen und die resultierenden Features und Transformationsartefakte zu speichern, sodass sich die Transformationen durch TensorFlow Serving während der Vorhersagezeit anwenden lassen.

Der erste Schritt besteht darin, die Transformationsfunktion zu definieren. Um zum Beispiel alle Eingaben so zu skalieren, dass sich der Mittelwert null und die Einheitsvarianz ergeben, würden wir die folgende Vorverarbeitungsfunktion erstellen (den vollständigen Code finden Sie auf GitHub unter https://github.com/tensorflow/tfx/blob/master/tfx/examples/chicago_taxi_pipeline/taxi_utils_native_keras.py):

def preprocessing_fn(inputs):

outputs = {}

for key in ...:

outputs[key + '_z'] = tft.scale_to_z_score(inputs[key])

outputs[key + '_bkt'] = tft.bucketize(inputs[key], 5)

return outputs

Vor dem Training werden die Rohdaten in Apache Beam gelesen und mit der obigen Funktion transformiert:

transformed_dataset, transform_fn = (raw_dataset |

beam_impl.AnalyzeAndTransformDataset(preprocessing_fn))

transformed_data, transformed_metadata = transformed_dataset

Die transformierten Daten werden dann in einem Format ausgegeben, das für das Lesen durch die Trainingspipeline geeignet ist:

transformed_data | tfrecordio.WriteToTFRecord(

PATH_TO_TFT_ARTIFACTS,

coder=example_proto_coder.ExampleProtoCoder(

transformed_metadata.schema))

Die Beam-Pipeline speichert auch die Vorverarbeitungsfunktion, die ausgeführt werden muss, zusammen mit allen Artefakten, die die Funktion benötigt, in einem Artefakt im TensorFlow-Graph-Format. Zum Beispiel würde dieses Artefakt im obigen Fall den Mittelwert und die Varianz für die Skalierung der Zahlen und die Bucket-Grenzen für die Partitionierung der Zahlen enthalten. Die Trainingsfunktion liest transformierte Daten, und demzufolge müssen die Transformationen innerhalb der Trainingsschleife nicht wiederholt werden.

Die Serving-Funktion muss diese Artefakte laden und eine Transformationsschicht erstellen:

tf_transform_output = tft.TFTransformOutput(PATH_TO_TFT_ARTIFACTS)

tf_transform_layer = tf_transform_output.transform_features_layer()

Dann kann die Serving-Funktion die Transform-Schicht auf die geparsten Eingabe-Features anwenden und das Modell mit den transformierten Daten aufrufen, um die Modellausgabe zu berechnen:

@tf.function

def serve_tf_examples_fn(serialized_tf_examples):

feature_spec = tf_transform_output.raw_feature_spec()

feature_spec.pop(_LABEL_KEY)

parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)

transformed_features = tf_transform_layer(parsed_features)

return model(transformed_features)

Auf diese Weise stellen wir sicher, dass die Transformationen für das Serving in den Modellgraphen eingefügt werden. Da das Modelltraining auf den transformierten Daten stattfindet, muss unsere Trainingsschleife diese Transformationen nicht während jeder Epoche durchführen.

Text- und Bildtransformationen

In Textmodellen ist es üblich, den Eingabetext aufzubereiten (unter anderem Satzzeichen, Stoppwörter, Großschreibung, Stemming usw. entfernen) und erst dann den bereinigten Text dem Modell als Feature bereitzustellen. Zum Feature Engineering für Texteingaben gehören auch die Tokenisierung und das Abgleichen mit regulären Ausdrücken. Wichtig ist, die gleichen Bereinigungs- oder Extraktionsschritte auch zur Inferenzzeit auszuführen.

Die Notwendigkeit, Transformationen zu erfassen, ist selbst dann wichtig, wenn es kein explizites Feature Engineering gibt, etwas beim Deep Learning mit Bildern. Bildmodelle besitzen in der Regel eine Eingabeschicht, die für Bilder einer bestimmten Größe ausgelegt ist. Bilder mit abweichenden Größen muss man beschneiden, auffüllen oder auf die feststehende Eingabegröße bringen, bevor sie in das Modell eingespeist werden. Andere gängige Transformationen in Bildmodellen sind Farbmanipulationen (Gammakorrektur, Graustufenumwandlung usw.) und Lagekorrekturen. Entscheidend ist, dass derartige Transformationen sowohl auf dem Trainingsdatensatz als auch während der Inferenzzeit identisch durchgeführt werden. Das Entwurfsmuster Transformation hilft, diese Reproduzierbarkeit sicherzustellen.

Bei Bildmodellen wendet man einige Transformationen (wie zum Beispiel Datenerweiterung durch zufälliges Cropping und Zoomen) nur während des Trainings an. Diese Transformationen müssen während der Inferenz nicht erfasst werden, und sie sind auch nicht Teil des Entwurfsmusters Transformation.

Alternative Musteransätze

Ein alternativer Ansatz, der das Problem der Verzerrung zwischen Training und Serving lösen soll, ist das Muster Feature Store. Es besteht aus einem koordinierten Berechnungsmodul und Repository von transformierten Feature-Daten. Das Berechnungsmodul unterstützt Zugriffe mit niedriger Latenz für die Inferenz und die Batch-Erstellung von transformierten Features, während das Daten-Repository schnellen Zugriff auf transformierte Features für das Modelltraining bietet. Ein Feature-Speicher hat den Vorteil, dass die Transformationsoperationen nicht in den Modellgraphen passen müssen. Wenn zum Beispiel der Feature-Speicher Java unterstützt, könnten Sie die Vorverarbeitungsoperationen in Java durchführen, während das Modell selbst in PyTorch geschrieben sein könnte. Der Nachteil eines Feature Store besteht darin, dass er das Modell vom Feature Store abhängig und die Serving-Infrastruktur wesentlich komplexer macht.

Die Programmiersprache und das für die Transformation der Features verwendete Framework kann man auch von der Sprache, mit der das Modell geschrieben wird, trennen, indem man die Vorverarbeitung in Containern durchführt und diese benutzerdefinierten Container sowohl als Teil des Trainings als auch des Servings verwendet. Auf dieses Verfahren, das auch in der Praxis von Kubeflow Serving übernommen wird, geht der Abschnitt »Entwurfsmuster 25: Workflow-Pipeline« auf Seite 312 ein.

Entwurfsmuster 22: Wiederholbare Aufteilung

Um wiederholbares und reproduzierbares Sampling sicherzustellen, ist es notwendig, eine gut verteilte Spalte und eine deterministische Hashfunktion zu verwenden, um die verfügbaren Daten in Trainings-, Validierungs- und Testdatensätze aufzuteilen.

Problem

Viele Tutorials zum maschinellen Lernen schlagen vor, mit Code ähnlich dem folgenden die Daten zufällig auf Trainings-, Validierungs- und Testdatensätze aufzuteilen:

df = pd.DataFrame(...)

rnd = np.random.rand(len(df))

train = df[ rnd < 0.8 ]

valid = df[ rnd >= 0.8 & rnd < 0.9 ]

test = df[ rnd >= 0.9 ]

Leider versagt dieser Ansatz in vielen realen Situationen, denn die Zeilen sind nur selten unabhängig. Wenn wir zum Beispiel ein Modell trainieren, um Flugverspätungen vorherzusagen, werden die Ankunftsverspätungen der Flüge am gleichen Tag stark miteinander korreliert sein. Dies führt zu einem Informationsverlust zwischen dem Trainings- und dem Testdatensatz, wenn einige Flüge an einem bestimmten Tag im Trainingsdatensatz und einige andere Flüge am selben Tag im Testdatensatz vorkommen. Dieser Verlust infolge korrelierter Zeilen ist ein häufig auftretendes Problem, das wir beim maschinellen Lernen vermeiden müssen.

Außerdem bewirkt jede Ausführung der Funktion rand(), dass die Daten neu sortiert werden. Wenn wir also das Programm erneut ausführen, erhalten wir zu 80 % andere Zeilen. Das kann zu Problemen führen, wenn wir mit verschiedenen ML-Modellen mit dem Ziel experimentieren, das beste Modell auszuwählen – wir müssen die Modellperformance auf demselben Testdatensatz vergleichen. Um dem zu begegnen, müssen wir den Startwert für die Zufallszahlenerzeugung im Voraus festlegen oder die Daten nach ihrer Aufteilung speichern. Es empfiehlt sich auch nicht, die Aufteilung der Daten fest zu codieren, denn beim Einsatz von Techniken wie Bootstrapping, Kreuzvalidierung und Hyperparameter-Optimierung müssen wir diese Datenteilung ändern, und zwar in einer Weise, die es uns erlaubt, individuelle Versuche durchzuführen.

Für das maschinelle Lernen brauchen wir ein leichtgewichtiges, wiederholbares Teilen der Daten, das unabhängig von der Programmiersprache oder den zufälligen Startwerten funktioniert. Außerdem wollen wir sicherstellen, dass korrelierte Zeilen in denselben Teilbereich fallen. Zum Beispiel wollen wir keine Flüge vom 2. Januar 2019 im Testdatensatz haben, wenn Flüge an diesem Tag im Trainingsdatensatz vorhanden sind.

Lösung

Zunächst identifizieren wir eine Spalte, die die Korrelationsbeziehung zwischen den Zeilen erfasst. In unserem Datensatz mit den Verspätungen von Fluggesellschaften ist dies die Spalte date. Dann verwenden wir die letzten Stellen einer Hashfunktion für diese Spalte, um die Daten aufzuteilen. Für das Problem der Flugverspätungen können wir mit dem Hashing-Algorithmus Farm-Fingerprint auf der Spalte date die verfügbaren Daten in Trainings-, Validierungs- und Testdatensätze aufteilen.

image

Weitere Informationen zum Farm-Fingerprint-Algorithmus, die Unterstützung anderer Frameworks und Sprachen sowie die Beziehung zwischen Hashing und Kryptografie finden Sie unter »Entwurfsmuster 1: Hashed Feature« auf Seite 50 in Kapitel 2. Insbesondere sind Open-Source-Wrapper des Farm-Hash-Algorithmus (https://github.com/google/farmhash) in einer Reihe von Sprachen (einschließlich Python [https://oreil.ly/526Dc]) verfügbar, sodass sich dieses Muster sogar dann anwenden lässt, wenn sich die Daten nicht in einem Data Warehouse befinden, das einen wiederholbaren und gebrauchsfertigen Hash unterstützt.

Mit der folgenden Anweisung teilen Sie den Datensatz basierend auf dem Hashwert der Spalte date auf:

SELECT

airline,

departure_airport,

departure_schedule,

arrival_airport,

arrival_delay

FROM

`bigquery-samples`.airline_ontime_data.flights

WHERE

ABS(MOD(FARM_FINGERPRINT(date), 10)) < 8 -- 80% for TRAIN

Um bezüglich der Spalte date zu teilen, berechnen wir ihren Hashwert mit der Funktion FARM_FINGERPRINT und bilden dann mit der Modulo-Funktion eine beliebige Untermenge, die 80 % der Zeilen enthält. Dies ist jetzt wiederholbar – da die Funktion FARM_FINGERPRINT bei jedem Aufruf mit einem bestimmten Datum den gleichen Wert zurückgibt, können wir sicher sein, dass wir jedes Mal die gleichen 80 % der Daten erhalten. Im Ergebnis werden alle Flüge an einem bestimmten Datum zum selben Teilbereich gehören – Training, Validierung oder Test. Dies ist unabhängig vom Zufallsstartwert wiederholbar. Möchten wir unsere Daten nach arrival_airport aufteilen (sodass 80 % der Flughäfen im Trainingsdatensatz vorkommen, weil wir vielleicht versuchen, etwas über Flughafeneinrichtungen vorherzusagen), würden wir den Hashwert auf arrival_airport statt auf date berechnen.

Die Validierungsdaten lassen sich ebenfalls ganz einfach erhalten: Ändern Sie den Ausdruck < 8 in der obigen Abfrage in =8 und für die Testdaten in =9. Somit erhalten wir 10 % der Stichproben für die Validierung und 10 % für die Tests.

Was gilt es bei der Auswahl der Spalte, nach der aufgeteilt werden soll, zu beachten? Die Spalte date muss mehrere Eigenschaften haben, damit wir sie als Aufteilungsspalte verwenden können:

image

Wir können die Überprüfung, ob die Label-Verteilungen in den drei Datensätzen ähnlich sind, mit dem Kolmogorow-Smirnow-Test automatisieren: Stellen Sie einfach die kumulativen Verteilungsfunktionen der Labels in den drei Datensätzen dar und ermitteln Sie den maximalen Abstand zwischen jedem Paar. Je kleiner der maximale Abstand, desto besser ist die Aufteilung.

Kompromisse und Alternativen

Dieser Abschnitt beschreibt nun einige Varianten dazu, wie sich wiederholtes Teilen durchführen lässt, und erörtert die Vor- und Nachteile der einzelnen Varianten. Außerdem untersuchen wir, wie sich diese Idee erweitern lässt, um wiederholbare Stichproben zu entnehmen und nicht nur Aufteilungen vorzunehmen.

Einzelne Abfrage

Wir brauchen keine drei separaten Abfragen, um Trainings-, Validierungs- und Testteilungen zu generieren. Es ist auch möglich, dies in einer einzigen Abfrage zu tun:

CREATE OR REPLACE TABLE mydataset.mytable AS

SELECT

airline,

departure_airport,

departure_schedule,

arrival_airport,

arrival_delay,

CASE(ABS(MOD(FARM_FINGERPRINT(date), 10)))

WHEN 9 THEN 'test'

WHEN 8 THEN 'validation'

ELSE 'training' END AS split_col

FROM

`bigquery-samples`.airline_ontime_data.flights

Wir können dann anhand der Spalte split_col entscheiden, in welchen der drei Datensätze eine bestimmte Zeile fällt. Eine einzelne Abfrage verringert zwar die Rechenzeit, erfordert aber, dass eine neue Tabelle erzeugt oder die Quelltabelle modifiziert wird, um die zusätzliche Spalte split_col hinzuzufügen.

Zufällige Aufteilung

Wie sieht es aus, wenn die Zeilen nicht korreliert sind? In diesem Fall streben wir eine zufällige wiederholbare Aufteilung an, verfügen aber nicht über eine Spalte, die sich als Aufteilungskriterium anbietet. Wir können einen Hashwert für die gesamte Datenzeile bilden, indem wir sie in einen String konvertieren und die Hashfunktion mit diesem String aufrufen:

SELECT

airline,

departure_airport,

departure_schedule,

arrival_airport,

arrival_delay

FROM

`bigquery-samples`.airline_ontime_data.flights f

WHERE

ABS(MOD(FARM_FINGERPRINT(TO_JSON_STRING(f), 10)) < 8

Wenn doppelte Zeilen vorkommen, landen sie immer im selben Teilungsbereich. Ist das so gewollt, gibt es nichts weiter zu tun. Andernfalls müssen wir eine eindeutige ID-Spalte in die SELECT-Abfrage einbauen.

Aufteilen nach mehreren Spalten

Bisher haben wir die Korrelation zwischen Zeilen in einer einzelnen Spalte erfasst. Was ist, wenn eine Kombination von Spalten die Korrelation von zwei Zeilen erfassen soll? In derartigen Fällen verketten Sie einfach die Felder (was ein Feature Cross ist), bevor Sie den Hashwert berechnen. Nehmen wir zum Beispiel an, wir möchten nur sicherstellen, dass Flüge vom selben Flughafen am selben Tag nicht in verschiedenen Teilungsbereichen auftauchen. Das erreichen Sie mit einer Anweisung wie der folgenden:

SELECT

airline,

departure_airport,

departure_schedule,

arrival_airport,

arrival_delay

FROM

`bigquery-samples`.airline_ontime_data.flights

WHERE

ABS(MOD(FARM_FINGERPRINT(CONCAT(date, arrival_airport)), 10)) < 8

Wenn wir auf einem Feature Cross aus mehreren Spalten aufteilen, können wir arrival_airport als eine der Eingaben in das Modell verwenden, da es sowohl in den Trainings- als auch in den Testsätzen Beispiele für jeden bestimmten Flughafen geben wird. Hätten wir andererseits nur bezüglich arrival_airport geteilt, werden die Trainings- und Testsätze einen sich gegenseitig ausschließenden Satz von Ankunftsflughäfen enthalten, und daher kann arrival_airport keine Eingabe des Modells sein.

Wiederholbares Sampling

Die grundlegende Lösung ist gut, wenn wir 80 % des gesamten Datensatzes für das Training benötigen. Wie sieht es aber aus, wenn wir mit einem kleineren Datensatz als dem in BigQuery vorhandenen experimentieren möchten? Dies ist bei lokaler Entwicklung üblich. Der Datensatz für Flüge umfasst 70 Millionen Zeilen, vielleicht wollen wir aber einen kleineren Datensatz von einer Million Flügen haben. Wie würden wir einen von 70 Flügen herausgreifen und dann 80 % davon als Trainingsdaten nehmen?

Was wir nicht tun können, ist etwas in der folgenden Art:

SELECT

date,

airline,

departure_airport,

departure_schedule,

arrival_airport,

arrival_delay

FROM

`bigquery-samples`.airline_ontime_data.flights

WHERE

ABS(MOD(FARM_FINGERPRINT(date), 70)) = 0

AND ABS(MOD(FARM_FINGERPRINT(date), 10)) < 8

Wir können nicht 1 aus 70 Zeilen herausgreifen und dann 8 aus 10. Wenn wir Zahlen auswählen, die durch 70 teilbar sind, sind sie natürlich auch durch 10 teilbar! Die zweite Modulo-Operation ist nutzlos.

Hier eine bessere Lösung:

SELECT

date,

airline,

departure_airport,

departure_schedule,

arrival_airport,

arrival_delay

FROM

`bigquery-samples`.airline_ontime_data.flights

WHERE

ABS(MOD(FARM_FINGERPRINT(date), 70)) = 0

AND ABS(MOD(FARM_FINGERPRINT(date), 700)) < 560

In dieser Abfrage ist die 700 gleich 70 * 10, und die 560 ist 70 * 8. Die erste Modulo-Operation greift 1 aus 70 Zeilen heraus und die zweite Modulo-Operation 8 aus 10 dieser Zeilen. Für die Validierungsdaten ersetzen Sie die Bedingung < 560 durch den entsprechenden Bereich:

ABS(MOD(FARM_FINGERPRINT(date), 70)) = 0

AND ABS(MOD(FARM_FINGERPRINT(date), 700)) BETWEEN 560 AND 629

Im obigen Code stammen unsere eine Million Flüge von nur 1/70 der Tage im Datensatz. Dies kann genau das sein, was wir wollen – zum Beispiel können wir das volle Spektrum der Flüge an einem bestimmten Tag modellieren, wenn wir mit dem kleineren Datensatz experimentieren. Wenn wir jedoch 1/70 der Flüge an einem bestimmten Tag haben wollen, müssen wir RAND() verwenden und das Ergebnis als neue Tabelle speichern, um Wiederholbarkeit zu ermöglichen. Aus dieser kleineren Tabelle können wir mit der Funktion FARM_FINGERPRINT() eine Stichprobe von 80 % der Datumswerte entnehmen. Da diese neue Tabelle nur eine Million Zeilen umfasst und nur dem Experimentieren dient, kann die Duplizierung akzeptabel sein.

Sequenzielle Aufteilung

Bei Zeitreihenmodellen ist der Ansatz üblich, sequenzielle Aufteilungen der Daten zu verwenden. Um zum Beispiel ein Nachfragevorhersagemodell zu trainieren, bei dem wir ein Modell auf den Daten der letzten 45 Tage trainieren, um die Nachfrage für die nächsten 14 Tage vorherzusagen, würden wir das Modell (vollständiger Code unter https://github.com/GoogleCloudPlatform/bigquery-oreilly-book/blob/master/blogs/bqml_arima/bqml_arima.ipynb) trainieren, indem wir die notwendigen Daten abrufen:

CREATE OR REPLACE MODEL ch09eu.numrentals_forecast

OPTIONS(model_type='ARIMA',

time_series_data_col='numrentals',

time_series_timestamp_col='date') AS

SELECT

CAST(EXTRACT(date from start_date) AS TIMESTAMP) AS date

, COUNT(*) AS numrentals

FROM

`bigquery-public-data`.london_bicycles.cycle_hire

GROUP BY date

HAVING date BETWEEN

DATE_SUB(CURRENT_DATE(), INTERVAL 45 DAY) AND CURRENT_DATE()

Eine derartige sequenzielle Aufteilung der Daten ist auch in sich schnell verändernden Umgebungen notwendig, selbst wenn das Ziel nicht darin besteht, den Zukunftswert einer Zeitreihe vorherzusagen. Zum Beispiel passen sich in einem Betrugserkennungsmodell die Betrüger schnell an den Betrugsalgorithmus an, und das Modell muss daher ständig auf den neuesten Daten neu trainiert werden, um zukünftigen Betrug vorherzusagen. Es genügt nicht, die Bewertungsdaten aus einem zufälligen Aufteilungsbereich des historischen Datensatzes zu generieren, da das Ziel darin besteht, Verhalten vorherzusagen, das Betrüger in der Zukunft an den Tag legen. Das indirekte Ziel ist das gleiche wie das eines Zeitreihenmodells: Ein gutes Modell wird in der Lage sein, auf historischen Daten zu trainieren und zukünftigen Betrug vorherzusagen. Um dies korrekt zu bewerten, müssen die Daten zeitlich sequenziell aufgeteilt werden, zum Beispiel so (vollständiger Code unter https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/blogs/bigquery_datascience/bigquery_tensorflow.ipynb):

def read_dataset(client, row_restriction, batch_size=2048):

...

bqsession = client.read_session(

...

row_restriction=row_restriction)

dataset = bqsession.parallel_read_rows()

return (dataset.prefetch(1).map(features_and_labels)

.shuffle(batch_size*10).batch(batch_size))

client = BigQueryClient()

train_df = read_dataset(client, 'Time <= 144803', 2048)

eval_df = read_dataset(client, 'Time > 144803', 2048)

Eine sequenzielle Aufteilung der Daten ist auch dann erforderlich, wenn die Daten zwischen aufeinanderfolgenden Zeitpunkten stark korreliert sind. Zum Beispiel sind bei der Wettervorhersage die Wetterdaten von aufeinanderfolgenden Tagen stark korreliert. Daher ist es nicht sinnvoll, den 12. Oktober im Trainingsdatensatz und den 13. Oktober im Testdatensatz unterzubringen, da dann mit erheblichen Datenverlusten zu rechnen ist (stellen Sie sich zum Beispiel vor, dass am 12. Oktober ein Hurrikan durch das Land zieht). Da das Wetter außerdem stark von der Jahreszeit abhängig ist, müssen Tage aus allen Jahreszeiten in allen drei Aufteilungen enthalten sein. Um das Vorhersagemodell richtig zu bewerten, kann man zwar bei einer sequenziellen Aufteilung bleiben, die jahreszeitliche Abhängigkeit aber berücksichtigen, indem man die ersten 20 Tage jedes Monats dem Trainingsdatensatz, die nächsten 5 Tage dem Validierungsdatensatz und die letzten 5 Tage dem Testdatensatz zuordnet.

In allen diesen Fällen verlangt die wiederholbare Aufteilung nur, dass wir die Logik, die für die Aufteilung zuständig ist, in die Versionskontrolle aufnehmen und die Aktualisierung der Modellversion sicherstellen, wenn die Logik geändert wird.

Stratifizierte Aufteilung

Das obige Beispiel dafür, wie sich die Wettermuster zwischen verschiedenen Jahreszeiten unterscheiden, zeigt eine Situation, in der die Aufteilung stattfinden muss, nachdem der Datensatz stratifiziert (geschichtet) wurde. Da wir sicherstellen mussten, dass in jeder Aufteilung Beispiele für alle Jahreszeiten enthalten sind, haben wir den Datensatz zunächst nach Monaten stratifiziert und dann erst aufgeteilt. Die ersten 20 Tage jedes Monats haben wir im Trainingsdatensatz, die nächsten 5 Tage im Validierungsdatensatz und die letzten 5 Tage im Testdatensatz verwendet. Müssten wir uns nicht um die Korrelation zwischen aufeinanderfolgenden Tagen kümmern, könnten wir die Datumswerte innerhalb jedes Monats zufällig aufteilen.

Je größer der Datensatz ist, desto weniger müssen wir uns mit Stratifizierung befassen. Bei sehr großen Datensätzen ist die Wahrscheinlichkeit sehr hoch, dass die Feature-Werte unter allen Aufteilungen gut verteilt sind. Daher ist beim maschinellen Lernen im großen Maßstab eine Stratifizierung nur bei Datensätzen mit schiefen Verteilungen erforderlich. Zum Beispiel umfasst der Flüge-Datensatz weniger als 1 % der Flüge, die vor 6:00 Uhr starten, sodass es möglicherweise nur wenige Flüge gibt, die dieses Kriterium erfüllen. Wenn es für unseren geschäftlichen Anwendungsfall entscheidend ist, das Verhalten dieser Flüge korrekt widerzuspiegeln, sollten wir den Datensatz basierend auf der Stunde des Abflugs stratifizieren und jede Schicht gleichmäßig aufteilen.

Die Abflugzeit war ein Beispiel für ein verzerrtes Feature. In einem unausgewogenen Klassifizierungsproblem (wie zum Beispiel der Betrugserkennung, bei der die Anzahl der Betrugsbeispiele recht gering ist) könnten wir den Datensatz nach dem Label stratifizieren und jede Stratifizierung gleichmäßig aufteilen. Dies ist auch wichtig, wenn bei einem Multilabel-Problem einige der Labels seltener sind als andere. Damit befasst sich »Entwurfsmuster 10: Rebalancing« auf Seite 144 in Kapitel 3.

Unstrukturierte Daten

Obwohl wir uns in diesem Abschnitt auf strukturierte Daten konzentriert haben, gelten die gleichen Prinzipien auch für unstrukturierte Daten wie Bilder, Videos, Audiodateien oder Freiformtexte. Die Aufteilung nehmen Sie einfach anhand der Metadaten vor. Wenn zum Beispiel Videos, die am selben Tag aufgenommen wurden, korreliert sind, nehmen Sie das Erfassungsdatum eines Videos aus dessen Metadaten, um die Videos auf unabhängige Datensätze aufzuteilen. Ähnlich verhält es sich, wenn Textrezensionen von derselben Person korreliert sind: Verwenden Sie den Farm-Fingerprint der user_id des Rezensenten, um wiederholt Rezensionen zwischen den Datensätzen aufzuteilen. Wenn die Metadaten nicht verfügbar sind oder es keine Korrelation zwischen den Instanzen gibt, codieren Sie das Bild oder Video mit Base64-Codierung und berechnen den Fingerabdruck der codierten Daten.

Eine einfache Methode, Textdatensätze aufzuteilen, verwendet den Hashwert des Texts selbst für die Aufteilung. Allerdings ähnelt dies einer zufälligen Aufteilung und geht nicht auf das Problem der Korrelationen zwischen Rezensionen ein. Wenn zum Beispiel eine Person in ihren negativen Rezensionen häufig das Wort »atemberaubend« verwendet oder wenn jemand sämtliche Star-Wars-Filme als schlecht bewertet, sind deren Rezensionen korreliert. In ähnlicher Weise lassen sich Bild- oder Audiodatensätze aufteilen, indem man den Hashwert des Dateinamens für die Aufteilung verwendet. Allerdings löst dies nicht das Problem von Korrelationen zwischen Bildern oder Videos. Es lohnt sich also, genau darüber nachzudenken, wie man einen Datensatz am besten aufteilt. Unserer Erfahrung nach lassen sich viele Probleme mit schlechter ML-Performance beheben, indem man potenzielle Korrelationen bei der Datenaufteilung (und Datenerfassung) einkalkuliert.

Bei der Berechnung von Einbettungen oder dem Vortraining von Autoencodern sollten wir sicherstellen, dass wir zuerst die Daten aufteilen und diese Vorberechnungen nur auf dem Trainingsdatensatz durchführen. Deshalb sollte das Aufteilen nicht auf den Einbettungen der Bilder, Videos oder Texte erfolgen, es sei denn, diese Einbettungen wurden auf einem gänzlich separaten Datensatz erzeugt.

Entwurfsmuster 23: Bridged Schema

Das Entwurfsmuster Bridged Schema bietet Möglichkeiten, die zum Training eines Modells verwendeten Daten von einem älteren, ursprünglichen Datenschema an neuere, bessere Daten anzupassen. Dieses Muster ist nützlich, denn wenn der Provider von Eingabedaten Verbesserungen an seinem Datenfeed vornimmt, dauert es oft eine Weile, bis genügend Daten des verbesserten Schemas gesammelt sind, um ein Ersatzmodell angemessen trainieren zu können. Das Entwurfsmuster Bridged Schema versetzt uns in die Lage, möglichst viele der verfügbaren neueren Daten zu verwenden, sie aber um einige der älteren Daten aufzustocken, um die Modellgenauigkeit zu verbessern.

Problem

Betrachten Sie eine POS-Anwendung (POS – Point of Sale, also Kasse, Verkaufsstelle), die vorschlägt, wie viel Trinkgeld man einem Zusteller geben sollte. Die Anwendung könnte ein ML-Modell verwenden, das den Trinkgeldbetrag vorhersagt und dabei die Bestellmenge, die Lieferzeit, die Lieferentfernung usw. berücksichtigt. Ein derartiges Modell würde man auf den tatsächlich von Kunden gegebenen Trinkgeldern trainieren.

Eine der Eingaben in das Modell soll die Zahlungsart sein. In den historischen Daten ist dies als »Bar« oder »Karte« verzeichnet. Nehmen wir nun aber an, dass das Zahlungssystem aktualisiert wurde und es jetzt mehr Details über die Art der verwendeten Karte bereitstellt (Geschenkkarte, Debitkarte, Kreditkarte). Diese Information ist äußerst nützlich, da das Trinkgeldverhalten zwischen den drei Arten von Karten variiert.

Zum Zeitpunkt der Vorhersage werden die neueren Informationen stets verfügbar sein, da wir immer die Trinkgeldbeträge für Transaktionen vorhersagen, die nach der Aktualisierung des Zahlungssystems durchgeführt wurden. Da die neuen Informationen äußerst wertvoll sind und dem Vorhersagesystem bereits in der Produktion zur Verfügung stehen, möchten wir sie sobald wie möglich im Modell verwenden.

Wir können ein neues Modell nicht ausschließlich auf den neueren Daten trainieren, da die Menge der neuen Daten ziemlich klein sein wird, denn sie ist auf die Transaktionen nach der Aktualisierung des Zahlungssystems beschränkt. Da die Qualität eines ML-Modells stark von der Datenmenge abhängt, mit der es trainiert wird, schneidet ein Modell, das nur mit den neuen Daten trainiert wird, höchstwahrscheinlich schlecht ab.

Lösung

Die Lösung besteht darin, das Schema der alten Daten zu überbrücken, um den neuen Daten zu entsprechen. Dann trainieren wir ein ML-Modell mit möglichst vielen der verfügbaren neuen Daten und stocken es mit den älteren Daten auf. Zwei Fragen sind nun zu beantworten. Erstens: Wie gleichen wir die Tatsache aus, dass die älteren Daten nur zwei Kategorien für die Zahlungsart haben, während die neuen Daten vier Kategorien unterscheiden? Zweitens: Wie erfolgt die Aufstockung, um Datensätze für Training, Validierung und Testen zu erzeugen?

Bridged Schema

Betrachten wir den Fall, in dem die älteren Daten zwei Kategorien umfassen (Bargeld und Karte). Im neuen Schema ist die Kartenkategorie jetzt viel granularer (Geschenkkarte, Debitkarte, Kreditkarte). Was wir wissen, ist, dass eine in den alten Daten als »card« codierte Transaktion zu einer dieser Arten gehört hätte, der tatsächliche Typ aber nicht aufgezeichnet worden ist. Es ist möglich, das Schema probabilistisch oder statisch zu überbrücken. Wir empfehlen die statische Methode, doch das Ganze ist einfacher zu verstehen, wenn wir zuerst die probabilistische Methode durchgehen.

Probabilistische Methode.Angenommen, wir schätzten anhand der neueren Trainingsdaten, dass sich die Kartentransaktionen auf 10 % Geschenkkarten, 30 % Debitkarten und 60 % Kreditkarten verteilen. Jedes Mal, wenn ein älteres Trainingsbeispiel in das Trainerprogramm geladen wird, könnten wir den Kartentyp auswählen, indem wir eine gleichverteilte Zufallszahl im Bereich [0, 100) erzeugen und eine Geschenkkarte wählen, wenn die Zufallszahl kleiner als 10 ist, eine Debitkarte, wenn sie im Bereich [10, 40) liegt, und sonst eine Kreditkarte. Vorausgesetzt, dass wir für genügend Epochen trainieren, würde jedes Trainingsbeispiel durch alle drei Kategorien dargestellt, aber proportional zu ihrer tatsächlichen Vorkommenshäufigkeit. Die neueren Trainingsbeispiele würden natürlich immer die tatsächlich aufgezeichnete Kategorie haben.

Der probabilistische Ansatz ist insofern gerechtfertigt, dass wir jedes ältere Beispiel so behandeln, als wäre es schon Hunderte Male vorgekommen. Wenn der Trainer die Daten durchläuft, simulieren wir in jeder Epoche eine dieser Instanzen. In der Simulation erwarten wir, dass in 10 % der Fälle, in denen die Transaktion mit Karte abgewickelt wurde, dies mit einer Geschenkkarte erfolgt ist. Deshalb wählen wir in 10 % der Fälle »gift card« als Wert der kategorialen Eingabe aus. Dies ist natürlich eine vereinfachte Darstellung – nur weil Geschenkkarten in insgesamt 10 % der Fälle verwendet werden, trifft es nicht zu, dass Geschenkkarten in 10 % der Fälle für eine bestimmte Transaktion verwendet werden. Ein extremes Beispiel könnte sein, dass Taxiunternehmen die Verwendung von Geschenkkarten bei Fahrten zum Flughafen untersagen, sodass eine Geschenkkarte für einige historische Beispiele nicht einmal ein legaler Wert wäre. Allerdings nehmen wir in Ermangelung zusätzlicher Informationen an, dass die Häufigkeitsverteilung für sämtliche historischen Beispiele die gleiche ist.

Statische Methode.Kategoriale Variablen werden in der Regel 1-aus-n-codiert. Wenn wir dem oben beschriebenen probabilistischen Ansatz folgen und lange genug trainieren, wird der durchschnittliche 1-aus-n-codierte Wert, der dem Trainingsprogramm für »card« in den älteren Daten präsentiert wird, [0, 0.1, 0.3, 0.6] lauten. Die erste 0 entspricht der Kategorie »cash«. Die zweite Zahl ist 0.1, weil diese Zahl bei 10 % der Kartentransaktionen 1 und in allen anderen Fällen 0 ist. Ähnlich ist es bei 0.3 für Debitkarten und 0.6 für Kreditkarten.

Um die älteren Daten in das neuere Schema zu überführen, können wir die älteren kategorialen Daten in diese Darstellung transformieren, wobei wir die A-priori-Wahrscheinlichkeit der neuen Klassen einfügen, wie wir sie aus den Trainingsdaten geschätzt haben. Die neueren Daten werden hingegen [0, 0, 1, 0] für eine Transaktion lauten, von der bekannt ist, dass sie mit einer Debitkarte abgewickelt wurde.

Wir empfehlen die statische Methode statt der probabilistischen Methode, weil sie effektiv das ist, was passiert, wenn die probabilistische Methode lange genug läuft. Sie ist auch viel einfacher zu implementieren, da jede Kartenzahlung aus den alten Daten genau den gleichen Wert hat (das vierelementige Array [0, 0.1, 0.3, 0.6]). Die älteren Daten können wir in einer Codezeile aktualisieren, anstatt ein Skript zu schreiben, um Zufallszahlen wie in der probabilistischen Methode zu erzeugen. Zudem ist sie auch rechentechnisch viel weniger aufwendig.

Erweiterte Daten

Um die neuen Daten möglichst gut nutzen zu können, sollten Sie nur zwei Aufteilungen der Daten verwenden, was in »Entwurfsmuster 12: Checkpoints« auf Seite 174 in Kapitel 4 erörtert wird. Angenommen, wir hätten eine Million Beispiele mit dem alten Schema zur Verfügung, aber nur 5.000 Beispiele mit dem neuen Schema. Wie sollten wir die Trainings- und Bewertungsdatensätze erstellen?

Nehmen wir zuerst den Bewertungsdatensatz. Wichtig ist, zu erkennen, dass das Training eines ML-Modells den Zweck hat, Vorhersagen auf bisher nicht gesehenen Daten zu treffen. Die ungesehenen Daten sind in unserem Fall ausschließlich Daten, die dem neuen Schema entsprechen. Demzufolge müssen wir eine ausreichende Anzahl von Beispielen aus den neuen Daten beiseitelegen, um die Verallgemeinerungsleistung adäquat zu bewerten. Vielleicht brauchen wir 2.000 Beispiele in unserem Bewertungsdatensatz, um sicher zu sein, dass das Modell auch in der Produktion gut abschneidet. Der Bewertungsdatensatz enthält keinerlei ältere Beispiele, die an das neue Schema angepasst wurden.

Woher wissen wir, ob wir 1.000 Beispiele im Bewertungsdatensatz benötigen oder 2.000? Um diese Zahl zu schätzen, berechnen wir die Bewertungsmetrik des aktuellen Produktionsmodells (das auf dem alten Schema trainiert wurde) auf Teilmengen seines Bewertungsdatensatzes und bestimmen, wie groß die Teilmenge sein muss, damit die Bewertungsmetrik konsistent ist. Die Bewertungsmetrik auf verschiedenen Teilmengen lässt sich mit dem folgenden Code berechnen (wobei der vollständige Code wie üblich im Code-Repository für dieses Buch auf GitHub unter https://github.com/GoogleCloudPlatform/ml-design-patterns/blob/master/06_reproducibility/bridging_schema.ipynb zu finden ist):

for subset_size in range(100, 5000, 100):

sizes.append(subset_size)

# Variabilität der Bewertungsmetrik auf dieser

# Teilmengengröße über 25 Versuche berechnen.

scores = []

for x in range(1, 25):

indices = np.random.choice(N_eval,

size=subset_size, replace=False)

scores.append(

model.score(df_eval[indices],

df_old.loc[N_train+indices, 'tip'])

)

score_mean.append(np.mean(scores))

score_stddev.append(np.std(scores))

Im obigen Code probieren wir Bewertungsgrößen von 100, 200, …, 5.000 aus. Für jede Teilmengengröße bewerten wir das Modell 25-mal – jedes Mal auf einer anderen, zufällig ausgewählten Teilmenge der vollständigen Auswertungsmenge. Da dies die Bewertungsmenge des aktuellen Produktionsmodells ist (das wir mit einer Million Beispielen trainieren können), könnte der Bewertungsdatensatz hier Hunderttausende von Beispielen enthalten. Wir können dann die Standardabweichung der Bewertungsmetrik über die 25 Teilmengen berechnen, dies bei verschiedenen Bewertungsgrößen wiederholen und diese Standardabweichung über der Bewertungsgröße grafisch darstellen. Das resultierende Diagramm sieht dann etwa wie das in Abbildung 6-3 aus.

image

Abbildung 6-3: Um die Anzahl der benötigten Bewertungsbeispiele zu ermitteln, werten Sie das Produktionsmodell auf Teilmengen variierender Größen aus und verfolgen die Variabilität der Bewertungsmetrik nach der Größe der Teilmenge. Hier geht die Standardabweichung ab etwa 2.000 Beispielen in ein Plateau über.

Aus Abbildung 6-3 geht hervor, dass die Anzahl der Bewertungsbeispiele mindestens 2.000 betragen muss und besser noch bei 3.000 oder mehr liegen sollte. Für den Rest dieser Diskussion gehen wir davon aus, dass wir die Bewertung mit 2.500 Beispielen vornehmen.

Der Trainingsdatensatz würde die übrigen 2.500 neuen Beispiele enthalten (die Menge der verfügbaren neuen Daten, nachdem 2.500 für die Bewertung zurückgehalten wurden), ergänzt durch einige ältere Beispiele, die auf das neue Schema angepasst wurden. Woher wissen wir, wie viele ältere Beispiele wir brauchen? Wir wissen es nicht. Dies ist ein Hyperparameter, den wir optimieren müssen. So erkennen wir beim Trinkgeldproblem mithilfe der Rastersuche aus Abbildung 6-4, dass die Bewertungsmetrik bis 20.000 Beispiele steil abfällt und dann ein Plateau erreicht (das Notebook mit allen Details finden Sie auf GitHub unter https://github.com/GoogleCloudPlatform/ml-design-patterns/blob/master/06_reproducibility/bridging_schema.ipynb).

image

Abbildung 6-4: Die Anzahl der zu überbrückenden älteren Beispiele durch Hyperparameter-Optimierung ermitteln. In diesem Fall ist es offensichtlich, dass es nach 20.000 überbrückten Beispielen rückläufige Erträge gibt.

Um beste Ergebnisse zu erreichen, sollten wir die kleinste Anzahl älterer Beispiele wählen, mit der wir auskommen können – im Idealfall werden wir uns immer weniger auf überbrückte Beispiele stützen, wenn die Anzahl der Beispiele im Laufe der Zeit zunimmt. Irgendwann werden wir in der Lage sein, auf die älteren Beispiele gänzlich zu verzichten.

Erwähnenswert ist, dass bei diesem Problem die Überbrückung vorteilhaft ist, da die Bewertungsmetrik ohne die überbrückten Beispiele schlechter ist. Wenn dies nicht der Fall ist, sollte man wieder die Imputationsmethode ins Auge fassen (die Methode zur Auswahl des statischen Werts, der für die Überbrückung verwendet wird). Im nächsten Abschnitt schlagen wir eine alternative Imputationsmethode (Kaskade) vor.

image

Es ist äußerst wichtig, die Performance des neueren Modells, das auf überbrückten Beispielen trainiert wird, mit der Performance des älteren, unveränderten Modells auf dem Bewertungsdatensatz zu vergleichen. Es könnte sein, dass die neuen Informationen noch keinen ausreichenden Wert haben.

 

Da wir anhand des Bewertungsdatensatzes testen, ob das überbrückte Modell einen Wert hat oder nicht, ist es wichtig, dass der Bewertungsdatensatz weder während des Trainings noch beim Optimieren der Hyperparameter verwendet wird. Daher müssen Techniken wie Early Stopping oder Checkpoint-Auswahl vermieden werden. Verwenden Sie stattdessen Regularisierung, um Überanpassung zu steuern. Der Trainingsverlust muss als Metrik für die Hyperparameter-Abstimmung dienen. In der Diskussion zum Entwurfsmuster Checkpoints in Kapitel 4 finden Sie weitere Details dazu, wie Sie Daten einsparen können, indem Sie lediglich zwei Aufteilungen verwenden.

Kompromisse und Alternativen

Sehen Sie sich einen häufig vorgeschlagenen Ansatz an, der allerdings nicht funktioniert, sowie eine komplexe Alternative zum Überbrücken und eine Erweiterung der Lösung für ein ähnliches Problem.

Vereinigungsschema

Es kann verlockend sein, die älteren und neueren Schemas einfach zu vereinigen. Zum Beispiel könnten wir das Schema für die Zahlungsart mit fünf möglichen Werten definieren: Bargeld, Karte, Geschenkkarte, Debitkarte und Kreditkarte. Das macht sowohl die historischen Daten als auch die neueren Daten gültig, und das ist der Ansatz, den wir im Data Warehousing bei solchen Änderungen anwenden würden. Auf diese Weise sind die alten Daten und die neuen Daten gültig, und zwar so, wie sie sind, und ohne Änderungen.

Der abwärtskompatible Ansatz Vereinigung von Schemas funktioniert jedoch nicht für maschinelles Lernen.

Zur Vorhersagezeit bekommen wir niemals den Wert »card« für die Zahlungsart, weil die Eingabeprovider ausnahmslos aktualisiert wurden. Unterm Strich waren alle diese Trainingsinstanzen umsonst. Für die Reproduzierbarkeit (deshalb ist dieses Muster als Reproduzierbarkeitsmuster klassifiziert) müssen wir das ältere Schema in das neuere Schema überführen und können die beiden Schemas nicht vereinigen.

Die Methode Kaskadieren

In der Statistik versteht man unter Imputation eine Reihe von Techniken, mit denen sich fehlende Daten durch gültige Werte vervollständigen lassen. Eine gängige Imputationstechnik ersetzt einen NULL-Wert durch den Mittelwert dieser Spalte in den Trainingsdaten. Warum wählen wir den Mittelwert? Weil in Ermangelung weiterer Informationen und unter der Annahme, dass die Werte normalverteilt sind, der wahrscheinlichste Wert der Mittelwert ist.

Die in der Hauptlösung erörterte statische Methode, a priori Häufigkeiten zuzuweisen, ist ebenfalls eine Imputationsmethode. Wir nehmen an, dass die kategoriale Variable entsprechend einem Häufigkeitsdiagramm (das wir aus den Trainingsdaten schätzen) verteilt ist, und imputieren den mittleren 1-aus-n-codierten Wert (gemäß dieser Häufigkeitsverteilung) der »fehlenden« kategorialen Variablen.

Kennen wir eine andere Möglichkeit, unbekannte Werte anhand von Beispielen zu schätzen? Ja, natürlich! Maschinelles Lernen. Wir können nämlich eine Kaskade von Modellen trainieren (siehe »Entwurfsmuster 8: Kaskade« auf Seite 130 in Kapitel 3). Das erste Modell verwendet die neuen Beispiele, die verfügbar sind, um ein ML-Modell zu trainieren und den Kartentyp vorherzusagen. Wenn das ursprüngliche Trinkgeld-Modell fünf Eingänge hat, wird dieses Modell vier Eingänge haben. Der fünfte Eingang (die Zahlungsart) wird das Label für dieses Modell sein. Die Ausgabe des ersten Modells wird dann verwendet, um das zweite Modell zu trainieren.

In der Praxis fügt das Kaskadenmuster zu viel Komplexität für etwas hinzu, das als temporäre Abhilfe gedacht ist, bis genügend neue Daten zur Verfügung stehen. Die statische Methode ist im Grunde das einfachste ML-Modell – es ist das Modell, das wir erhalten würden, wenn wir uninformative Eingaben hätten. Wir empfehlen den statischen Ansatz und die Verwendung des Entwurfsmusters Kaskade nur, wenn die statische Methode nicht gut genug funktioniert.

Mit neuen Features umgehen

Eine Überbrückung könnte auch dann erforderlich sein, wenn der Provider der Eingabedaten zusätzliche Informationen dem Eingabestrom hinzufügt. So könnten wir in dem Beispiel des Taxitarifs Daten darüber empfangen, ob die Scheibenwischer des Taxis eingeschaltet sind oder ob sich das Fahrzeug bewegt. Aus diesen Daten können wir ein Feature erstellen, das aussagt, ob es zu Beginn der Taxifahrt geregnet hat, wie viel Prozent der Fahrzeit das Taxi im Leerlauf war usw.

Wenn wir neue Eingabe-Features haben, die wir sofort verwenden wollen, sollten wir die älteren Daten überbrücken (wo dieses neue Feature fehlen wird), indem wir einen Wert für das neue Feature imputieren. Empfohlene Imputationswerte sind:

Wenn das Feature angibt, ob es regnet oder nicht, ist es boolesch, und somit würde der imputierte Wert beispielsweise 0,02 sein, wenn es im Trainingsdatensatz 2 % der Zeit regnet. Ist das Feature der Anteil der Leerlaufminuten, könnten wir den Medianwert verwenden. Der Ansatz mit dem Entwurfsmuster Kaskade bleibt für alle diese Fälle praktikabel, wobei aber eine statische Imputation einfacher und oftmals ausreichend ist.

Mit Genauigkeitserhöhungen umgehen

Wenn der Provider der Eingaben die Genauigkeit seines Datenstroms erhöht, folgen Sie dem Überbrückungsansatz, um einen Trainingsdatensatz zu erstellen, der aus Daten mit höherer Auflösung besteht und mit einigen der älteren Daten ergänzt wird.

Bei Gleitkommawerten ist es nicht notwendig, die älteren Daten explizit zu überbrücken, um sie an die Genauigkeit der neueren Daten anzupassen. Um den Grund dafür zu sehen, betrachten Sie den Fall, in dem einige Daten ursprünglich mit einer Dezimalstelle (z. B. 3,5 oder 4,2) bereitgestellt wurden, jetzt aber mit zwei Dezimalstellen angegeben werden (z. B. 3,48 oder 4,23). Wenn wir annehmen, dass 3,5 in den älteren Daten aus Werten besteht, die in den neueren Daten im Bereich [3,45; 3,55] gleichverteilt1 sind, wäre der statistisch imputierte Wert 3,5, was genau der Wert ist, der in den älteren Daten gespeichert ist.

Bei kategorialen Werten – wenn zum Beispiel die älteren Daten den Ort als state oder provincial code gespeichert haben und die neueren Daten county oder district code liefern – verwenden Sie die Häufigkeitsverteilung der county-Werte innerhalb der state-Werte, wie in der Hauptlösung beschrieben, um eine statische Imputation zu realisieren.

Entwurfsmuster 24: Windowed Inference

Das Entwurfsmuster Windowed Inference behandelt Modelle, die eine fortlaufende Sequenz von Instanzen benötigen, um die Inferenz auszuführen. Dieses Muster funktioniert, indem der Modellstatus externalisiert und das Modell von einer Streamanalyse-Pipeline aufgerufen wird. Nützlich ist dieses Muster auch, wenn ein ML-Modell Features benötigt, die aus Aggregaten über Zeitfenster berechnet werden müssen. Indem der Zustand in eine Stream-Pipeline externalisiert wird, stellt das Entwurfsmuster Windowed Inference sicher, dass dynamisch und zeitabhängig berechnete Features zwischen Training und Serving korrekt wiederholt werden können. Es ist eine Möglichkeit, um bei zeitlichen Aggregat-Features eine Verzerrung zwischen Training und Serving zu vermeiden.

Problem

Sehen Sie sich die Ankunftsverspätungen am Flughafen Dallas Fort Worth (DFW) an, die in Abbildung 6-5 für einige Tage im Mai 2010 dargestellt sind (das vollständige Notebook finden Sie auf GitHub unter https://github.com/GoogleCloudPlatform/ml-design-patterns/blob/master/06_reproducibility/stateful_stream.ipynb).

image

Abbildung 6-5: Ankunftsverspätungen am Flughafen Dallas Fort Worth (DFW) am 10. und 11. Mai 2010. Ungewöhnliche Ankunftsverspätungen sind mit einem Punkt markiert.

Die Ankunftsverspätungen weisen eine beträchtliche Variabilität auf, doch es ist trotzdem möglich, ungewöhnlich große Ankunftsverspätungen (markiert durch einen Punkt) festzustellen. Beachten Sie, dass die Definition von »ungewöhnlich« je nach Kontext variiert. Am frühen Morgen (linke Ecke des Diagramms) sind die meisten Flüge pünktlich, sodass selbst die kleine Spitze anomal ist. Mitten am Tag (nach 12 Uhr am 10. Mai) steigt die Variabilität an, und Verspätungen von 25 Minuten sind recht häufig, aber eine 75-minütige Verspätung ist dennoch ungewöhnlich.

Ob eine bestimmte Verspätung anomal ist, hängt vom zeitlichen Kontext ab, beispielsweise von den Ankunftsverspätungen, die in den letzten zwei Stunden beobachtet wurden. Um festzustellen, ob eine Verspätung anomal ist, müssen wir den Dataframe zunächst nach der Zeit sortieren, wie im Diagramm in Abbildung 6-5 und hier in Pandas zu sehen:

df = df.sort_values(by='scheduled_time').set_index('scheduled_time')

Dann müssen wir eine Funktion zur Anomalieerkennung auf gleitende Fenster von zwei Stunden anwenden:

df['delay'].rolling('2h').apply(is_anomaly, raw=False)

Die Funktion zur Anomalieerkennung is_anomaly kann recht anspruchsvoll sein, doch wir nehmen den einfachen Fall, indem wir Extremwerte verwerfen und einen Datenwert als anomal bezeichnen, wenn er mehr als vier Standardabweichungen vom Mittelwert im Zweistundenfenster abweicht:

def is_anomaly(d):

outcome = d[-1] # das letzte Element

# Kleinst- und Größtwert sowie aktuelles (letztes) Element verwerfen.

xarr = d.drop(index=[d.idxmin(), d.idxmax(), d.index[-1]])

prediction = xarr.mean()

acceptable_deviation = 4 * xarr.std()

return np.abs(outcome - prediction) > acceptable_deviation

Dies funktioniert bei historischen (Trainings-)Daten, da der gesamte Dataframe verfügbar ist. Doch wenn wir Rückschlüsse aus unserem Produktionsmodell ziehen, steht uns natürlich nicht der gesamte Dataframe zur Verfügung. In der Produktion empfangen wir die Informationen über die Ankunft von Flügen einzeln, wenn jeder Flug eintrifft. Somit haben wir lediglich einen einzelnen Verzögerungswert zu einem Zeitstempel:

2010-02-03 08:45:00,19.0

Wenn der obige Flug (um 08:45 Uhr am 3. Februar) 19 Minuten Verspätung hat, stellt sich die Frage, ob diese Verspätung üblich ist oder nicht. Um ML-Inferenz für einen Flug durchzuführen, brauchen wir normalerweise nur die Features dieses Flugs. In diesem Fall benötigt das Modell jedoch Informationen über alle Flüge zum Flughafen DFW zwischen 06:45 und 08:45 Uhr:

2010-02-03 06:45:00,?

2010-02-03 06:?:00,?

...

2010-02-03 08:45:00,19.0

Es ist nicht möglich, Inferenz für einen Flug nach dem anderen durchzuführen. Wir müssen dem Modell irgendwie Informationen über alle vorherigen Flüge bereitstellen.

Wie führen wir Inferenz aus, wenn das Modell nicht nur eine Instanz verlangt, sondern eine Sequenz von Instanzen?

Lösung

Die Lösung ist eine zustandsbehaftete Streamverarbeitung – d. h. eine Streamverarbeitung, die den Modellzustand über die Zeit hinweg verfolgt:

Für Streaming-Pipelines können wir Apache Beam verwenden, weil dann derselbe Code sowohl auf den historischen Daten als auch auf neu eintreffenden Daten funktioniert. In Apache Beam richten Sie das gleitende Fenster wie folgt ein (den vollständigen Code finden Sie auf GitHub unter https://github.com/GoogleCloudPlatform/ml-design-patterns/blob/master/06_reproducibility/find_anomalies_model.py):

windowed = (data

| 'window' >> beam.WindowInto(

beam.window.SlidingWindows(2 * 60 * 60, 10*60))

Das Modell wird aktualisiert, indem alle in den letzten zwei Stunden gesammelten Flugdaten kombiniert und an eine Funktion, die wir ModelFn nennen, übergeben werden:

model_state = (windowed

| 'model' >> beam.transforms.CombineGlobally(ModelFn()))

ModelFn aktualisiert den internen Modellzustand mit Fluginformationen. Hier besteht der interne Modellzustand aus einem Pandas-Dataframe, der mit den Flügen im Fenster aktualisiert wird:

class ModelFn(beam.CombineFn):

def create_accumulator(self):

return pd.DataFrame()

def add_input(self, df, window):

return df.append(window, ignore_index=True)

Bei jedem Schließen des Fensters wird die Ausgabe extrahiert. Hier besteht die Ausgabe (die wir als externalisierten Modellzustand bezeichnen) aus den Modellparametern:

def extract_output(self, df):

if len(df) < 1:

return {}

orig = df['delay'].values

xarr = np.delete(orig, [np.argmin(orig), np.argmax(orig)])

return {

'prediction': np.mean(xarr),

'acceptable_deviation': 4 * np.std(xarr)

}

Der externalisierte Modellzustand wird alle zehn Minuten basierend auf einem rollierenden Zweistundenfenster aktualisiert.

Schließzeit des Fensters

prediction (Vorhersage)

acceptable_deviation (akzeptierte Abweichung)

2010-05-10T06:35:00

-2.8421052631578947

10.48412597725367

2010-05-10T06:45:00

-2.6818181818181817

12.083729926046008

2010-05-10T06:55:00

-2.9615384615384617

11.765962341537781

Der oben gezeigte Code zum Extrahieren der Modellparameter ähnelt dem im Pandas-Fall, wird aber innerhalb einer Beam-Pipeline ausgeführt. Dadurch kann der Code im Streaming arbeiten, aber der Modellstatus ist nur im Kontext des gleitenden Fensters verfügbar. Um bei jedem eintreffenden Flug eine Inferenz durchzuführen, müssen wir den Modellzustand externalisieren (ähnlich wie wir die Modellgewichte im Entwurfsmuster Zustandslose Serving-Funktion in eine Datei exportiert haben, um sie vom Kontext des Trainingsprogramms, in dem diese Gewichte berechnet werden, zu entkoppeln):

model_external = beam.pvalue.AsSingleton(model_state)

Anhand dieses externalisierten Zustands lässt sich erkennen, ob ein bestimmter Flug anomal ist oder nicht:

def is_anomaly(flight, model_external_state):

result = flight.copy()

error = flight['delay'] - model_external_state['prediction']

tolerance = model_external_state['acceptable_deviation']

result['is_anomaly'] = np.abs(error) > tolerance

return result

Die Funktion is_anomaly wird dann auf jedes Element im letzten Bereich des gleitenden Fensters angewendet:

anomalies = (windowed

| 'latest_slice' >> beam.FlatMap(is_latest_slice)

| 'find_anomaly' >> beam.Map(is_anomaly, model_external))

Kompromisse und Alternativen

Die oben vorgeschlagene Lösung ist im Fall von Datenströmen mit hohem Durchsatz rechentechnisch effizient, lässt sich aber weiter verbessern, wenn die ML-Modellparameter online aktualisiert werden können. Dieses Muster ist zudem anwendbar auf zustandsbehaftete ML-Modelle wie zum Beispiel rekurrente neuronale Netze und wenn ein zustandsloses Modell zustandsbehaftete Eingabe-Features erfordert.

Den Rechenaufwand verringern

Im Abschnitt »Problem« weiter oben haben wir den folgenden Pandas-Code verwendet:

dfw['delay'].rolling('2h').apply(is_anomaly, raw=False);

Hingegen sieht der Beam-Code im Abschnitt »Lösung« so aus:

windowed = (data

| 'window' >> beam.WindowInto(

beam.window.SlidingWindows(2 * 60 * 60, 10*60))

model_state = (windowed

| 'model' >> beam.transforms.CombineGlobally(ModelFn()))

Signifikante Unterschiede zwischen dem gleitenden Fenster in Pandas und dem gleitenden Fenster in Apache Beam ergeben sich daraus, wie oft die Funktion is_anomaly aufgerufen wird und wie oft die Modellparameter (Mittelwert und Standardabweichung) berechnet werden müssen. Die folgenden Abschnitte gehen näher darauf ein.

Pro Element im Vergleich zu einem Zeitintervall.Im Pandas-Code wird die Funktion is_anomaly auf jeder Instanz im Datensatz aufgerufen. Der Code zur Anomalieerkennung berechnet die Modellparameter und wendet sie unmittelbar auf das letzte Element im Fenster an. In der Beam-Pipeline wird der Modellzustand ebenfalls auf jedem gleitenden Fenster erzeugt, wobei aber das gleitende Fenster in diesem Fall auf der Zeit basiert. Daher werden die Modellparameter nur einmal alle zehn Minuten berechnet.

Die Anomalieerkennung selbst wird auf jeder Instanz durchgeführt:

anomalies = (windowed

| 'latest_slice' >> beam.FlatMap(is_latest_slice)

| 'find_anomaly' >> beam.Map(is_anomaly, model_external))

Hier wird das rechenintensive Training sorgfältig von der rechentechnisch preiswerten Inferenz getrennt. Der rechenintensive Teil wird nur alle zehn Minuten ausgeführt, während jede Instanz als anomal oder nicht anomal klassifiziert werden kann.

Datenströme mit hohem Durchsatz.Das Datenvolumen nimmt ständig zu, und das ist zu einem großen Teil auf Echtzeitdaten zurückzuführen. Folglich muss dieses Muster auf Datenströme mit hohem Durchsatz angewendet werden – Ströme, bei denen die Anzahl der Elemente über Tausende Elemente pro Sekunde liegen kann. Denken Sie zum Beispiel an Klickströme von Websites oder Streams der Maschinenaktivität von Computern, tragbaren Geräten oder Autos.

Die vorgeschlagene Lösung mit einer Streaming-Pipeline hat den Vorteil, dass das Modell nicht bei jeder Instanz neu trainiert werden muss, was beim Pandas-Code laut Aussage im Abschnitt »Problem« der Fall ist. Allerdings gibt die vorgeschlagene Lösung diese Gewinne wieder zurück, indem sie einen speicherinternen Dataframe aller empfangenen Datensätze erstellt. Wenn wir 5.000 Elemente pro Sekunde empfangen, wird der Dataframe über zehn Minuten drei Millionen Datensätze enthalten. Da es zwölf gleitende Fenster gibt, die zu jedem Zeitpunkt verwaltet werden müssen (Zehnminutenfenster, jedes über zwei Stunden), kann der Speicherbedarf beträchtlich werden.

Wenn alle empfangenen Datensätze gespeichert werden, um die Modellparameter am Ende des Fensters zu berechnen, kann dies zu Problemen führen. Hat der Datenstrom einen hohen Durchsatz, ist es unabdingbar, die Modellparameter mit jedem Element aktualisieren zu können. Dies lässt sich erreichen, indem man ModelFn wie folgt ändert (vollständiger Code auf GitHub unter https://github.com/GoogleCloudPlatform/ml-design-patterns/blob/master/06_reproducibility/find_anomalies_model.py):

class OnlineModelFn(beam.CombineFn):

...

def add_input(self, inmem_state, input_dict):

(sum, sumsq, count) = inmem_state

input = input_dict['delay']

return (sum + input, sumsq + input*input, count + 1)

def extract_output(self, inmem_state):

(sum, sumsq, count) = inmem_state

...

mean = sum / count

variance = (sumsq / count) - mean*mean

stddev = np.sqrt(variance) if variance > 0 else 0

return {

'prediction': mean,

'acceptable_deviation': 4 * stddev

}

...

Der Hauptunterschied besteht darin, dass im Arbeitsspeicher nur drei Gleitkommazahlen (sum, sum2, count), die zum Extrahieren des Modellzustands erforderlich sind, und nicht das gesamte Dataframe der empfangenen Instanzen gehalten werden. Wenn jeweils nur eine Instanz der Modellparameter aktualisiert wird, spricht man von einer Online-Aktualisierung, die nur möglich ist, wenn das Modelltraining keine Iteration über den gesamten Datensatz erfordert. Daher wird in der obigen Implementierung die Varianz berechnet, indem eine Summe von x2 verwaltet wird, sodass sich ein zweiter Durchlauf durch die Daten erübrigt, nachdem der Mittelwert berechnet wurde.

Streaming SQL

Besteht unsere Infrastruktur aus einer hochperformanten SQL-Datenbank, die Streaming-Daten verarbeiten kann, lässt sich das Entwurfsmuster Windowed Inference in einer alternativen Form als Aggregationsfenster implementieren (vollständiger Code auf GitHub unter https://github.com/GoogleCloudPlatform/ml-design-patterns/blob/master/06_reproducibility/find_anomalies_model.py).

Wir ziehen die Flugdaten aus BigQuery heraus:

WITH data AS (

SELECT

PARSE_DATETIME('%Y-%m-%d-%H%M',

CONCAT(CAST(date AS STRING),

'-', FORMAT('%04d', arrival_schedule))

) AS scheduled_arrival_time,

arrival_delay

FROM `bigquery-samples.airline_ontime_data.flights`

WHERE arrival_airport = 'DFW' AND SUBSTR(date, 0, 7) = '2010-05'

),

Dann erstellen wir den Modellzustand model_state, indem wir die Modellparameter über ein Zeitfenster berechnen, das als zwei Stunden vor bis eine Sekunde vor spezifiziert ist:

model_state AS (

SELECT

scheduled_arrival_time,

arrival_delay,

AVG(arrival_delay) OVER (time_window) AS prediction,

4*STDDEV(arrival_delay) OVER (time_window) AS acceptable_deviation

FROM data

WINDOW time_window AS

(ORDER BY UNIX_SECONDS(TIMESTAMP(scheduled_arrival_time))

RANGE BETWEEN 7200 PRECEDING AND 1 PRECEDING)

)

Schließlich wenden wir den Algorithmus zur Erkennung von Anomalien auf jede Instanz an:

SELECT

*,

(ABS(arrival_delay - prediction) > acceptable_deviation) AS is_anomaly

FROM model_state

Tabelle 6-1 zeigt ein Ergebnis, wobei die Ankunftsverspätung von 54 Minuten als Anomalie markiert ist, da alle vorherigen Flüge früher angekommen sind.

Tabelle 6-1: Das Ergebnis einer BigQuery-Abfrage, die bestimmt, ob eingehende Flugdaten eine Anomalie darstellen

image

Im Gegensatz zur Apache-Beam-Lösung erlaubt uns die Effizienz von verteiltem SQL, das Zweistundenfenster zentriert über jeder Instanz zu berechnen (statt mit einer Auflösung von Zehnminutenfenstern). Nachteilig ist jedoch, dass BigQuery zu einer relativ hohen Latenz (in der Größenordnung von Sekunden) neigt und sich daher nicht für Echtzeitsteuerungsanwendungen eignet.

Sequenzmodelle

Das Muster Windowed Inference, bei dem ein gleitendes Fenster vorheriger Instanzen an eine Inferenzfunktion übergeben wird, ist nicht nur bei der Erkennung von Anomalien oder sogar bei Zeitserienmodellen nützlich. Insbesondere eignet es sich für jede Klasse von Modellen, wie zum Beispiel für Sequenzmodelle, die einen vergangenheitsbezogenen Zustand benötigen. So muss ein Übersetzungsmodell mehrere aufeinanderfolgende Wörter sehen, bevor es die Übersetzung ausführen und dabei den Kontext des Worts berücksichtigen kann. Schließlich variiert die Übersetzung der Wörter »left«, »Chicago« und »road« zwischen den Sätzen »I left Chicago by road« (Ich verließ Chicago mit dem Auto) und »Turn left on Chicago Road« (Biegen Sie links ab auf die Chicago Road).

Aus Performancegründen wird das Übersetzungsmodell so eingerichtet, dass es zustandslos ist und der Benutzer den Kontext bereitstellen muss. Ein zustandsloses Modell kann beispielsweise als Reaktion auf zunehmenden Datenverkehr automatisch skaliert und parallel aufgerufen werden, um die Übersetzung zu beschleunigen. So könnte die Übersetzung des berühmten Monologs aus Shakespeares Hamlet ins Deutsche diesen Schritten folgen, wobei das fett gedruckte Wort in der Mitte herausgegriffen und übersetzt wird:

Eingabe (9 Wörter, auf jeder Seite 4)

Ausgabe

The undiscovered country, from whose bourn No traveller returns

dessen

undiscovered country, from whose bourn No traveller returns, puzzles

Bourn

country, from whose bourn No traveller returns, puzzles the

Kein

from whose bourn No traveller returns, puzzles the will,

Reisender

Der Client benötigt daher eine Streaming-Pipeline. Die Pipeline könnte den englischen Eingabetext übernehmen, ihn tokenisieren, jeweils neun Token auf einmal weitersenden, die Ausgaben sammeln und sie zu deutschen Sätzen und Absätzen verketten.

Die meisten Sequenzmodelle, wie zum Beispiel rekurrente neuronale Netze und LSTMs, benötigen Streaming-Pipelines für eine leistungsstarke Inferenz.

Zustandsbehaftete Features

Das Muster Windowed Inference kann nützlich sein, wenn ein Eingabe-Feature für das Modell einen Zustand erfordert, obwohl das Modell selbst zustandslos ist. Nehmen wir zum Beispiel an, wir trainierten ein Modell, um Ankunftsverspätungen vorherzusagen, und eine der Eingaben in das Modell ist die Abflugverspätung. Nun könnten wir die durchschnittliche Abflugverspätung der Flüge von diesem Flughafen in den letzten zwei Stunden als Eingabe in das Modell einbeziehen wollen.

Während des Trainings können wir den Datensatz mit einer SQL-Fensterfunktion erstellen:

WITH data AS (

SELECT

SAFE.PARSE_DATETIME('%Y-%m-%d-%H%M',

CONCAT(CAST(date AS STRING), '-',

FORMAT('%04d', departure_schedule))

) AS scheduled_depart_time,

arrival_delay,

departure_delay,

departure_airport

FROM `bigquery-samples.airline_ontime_data.flights`

WHERE arrival_airport = 'DFW'

),

SELECT

* EXCEPT(scheduled_depart_time),

EXTRACT(hour from scheduled_depart_time) AS hour_of_day,

AVG(departure_delay) OVER (depart_time_window) AS avg_depart_delay

FROM data

WINDOW depart_time_window AS

(PARTITION BY departure_airport ORDER BY

UNIX_SECONDS(TIMESTAMP(scheduled_depart_time))

RANGE BETWEEN 7200 PRECEDING AND 1 PRECEDING)

Der Trainingsdatensatz enthält nun die durchschnittliche Verspätung als weiteres Feature:

image

Während der Inferenz benötigen wir jedoch eine Streaming-Pipeline, um diese durchschnittliche Abflugverspätung zu berechnen, damit wir sie an das Modell weitergeben können. Um die Schiefe zwischen Training und Serving zu begrenzen, ist es besser, dasselbe SQL in einer Fensterfunktion in einer Streaming-Pipeline zu verwenden, anstatt zu versuchen, das SQL in Scala, Python oder Java zu übersetzen.

Vorhersageanfragen im Stapel verarbeiten

Das Muster Windowed Inference bietet sich auch in einem weiteren Szenario an, selbst wenn das Modell zustandslos ist, nämlich wenn das Modell in der Cloud bereitgestellt wird, der Client aber in einem Gerät oder lokal eingebettet ist. Hier könnte die Netzwerklatenz stark anwachsen, wenn Inferenzanfragen einzeln nacheinander an ein in der Cloud bereitgestelltes Modell gesendet werden. In dieser Situation kommt das in »Entwurfsmuster 19: Zweiphasen-Vorhersagen« auf Seite 258 in Kapitel 5 beschriebene Muster infrage, bei dem die erste Phase mit einer Pipeline eine Reihe von Anfragen sammelt und die zweite Phase sie in einem Batch an den Dienst sendet.

Dies ist nur für latenztolerante Anwendungsfälle geeignet. Wenn wir Eingabeinstanzen über fünf Minuten sammeln, muss der Client eine Verzögerung bis zu fünf Minuten bei der Rückgabe der Vorhersagen tolerieren.