Creating ML alerts manually

Now that we've seen the default bucket-level alert that you get automatically by using the ML UI in Kibana, let's look at a more complex watch that was created manually to solve a more interesting use case.

In this example, we want to alert when a certain ML job has an elevated anomaly score at the bucket level, but only notify us (invoke the action clause) if there are also anomalies in two other supporting ML jobs within a 10-minute window (looking backwards in time). The premise is that the first job analyzes an important KPI that's worth alerting on, but only if there's supporting evidence of things that may have caused the KPI to deviate, that is, corroborating anomalies from other datasets analyzed in other ML jobs. If that's the case, the user should get a single alert with all of the information consolidated together.

The full text of the example is shown at https://github.com/PacktPublishing/Machine-Learning-with-the-Elastic-Stack/blob/master/Chapter06/custom_ML_watch.json.

The scenario used in this example is the same one that we referenced at the end of Chapter 4, IT Operational Analytics and Root Cause Analysis. In this case, we want to proactively identify correlations across jobs instead of doing visual correlation in the Anomaly Explorer.

Breaking down only the unique sections of this watch, we can immediately see a new concept, the metadata section:

    "metadata": {
"watch_timespan" : "10m", //how far back watch looks each invocation (should be > 2x bucket_span)
"lookback_window" : "10m", //how far back to look in other jobs for related anomalies
"job1_name" : "it_ops_kpi",
"job1_min_anomaly_score": 75, //minimum anomaly score (bucket score) for job1
"job2_name" : "it_ops_network",
"job2_min_record_score" : 10, //minimum record score for anomalies in job2
"job3_name" : "it_ops_sql",
"job3_min_record_score" : 5 //minimum record score for anomalies in job3
},

This technique within Watcher allows variables to be used in subsequent sections of the watch definition, which makes prototyping and modification easier. The gist of what is defined here includes the names of the three jobs involved in this watch, with the first job (it_ops_kpi) being the anchor for the whole alerting condition. If this first job is never anomalous, then the watch will never fire. Correlated anomalies in the other two jobs (it_ops_network and it_ops_sql) will be looked for in the 10 minutes prior to the first job's anomaly, and each of these supporting jobs has its own minimum record score threshold.
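These metadata values are then referenced in later sections of the watch through {{ctx.metadata.*}} templating. For example (an illustrative fragment rather than a verbatim quote from the watch), a range filter in the first input could limit the search to the most recent watch_timespan like this:

    { "range": { "timestamp": { "gte": "now-{{ctx.metadata.watch_timespan}}" } } },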

The next section (the watch input section) leverages a special capability called input chains in which a sequence of inputs can be linked together and executed serially, with optional dependencies added between them. In our case, the collapsed view of this is as follows:
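A minimal sketch of that collapsed structure, assuming nothing beyond what the full watch linked above shows: the three search inputs are named job1, job2, and job3 (which is how they are referenced later via ctx.payload), they query the .ml-anomalies-* results indices, and their full query bodies are collapsed here into comments:

    "input": {
      "chain": {
        "inputs": [
          { "job1": { "search": { "request": { "indices": [ ".ml-anomalies-*" ],
            "body": { ... } } } } },  //bucket results for job1, filtered by job1_min_anomaly_score
          { "job2": { "search": { "request": { "indices": [ ".ml-anomalies-*" ],
            "body": { ... } } } } },  //record results for job2, constrained to job1's bucket time
          { "job3": { "search": { "request": { "indices": [ ".ml-anomalies-*" ],
            "body": { ... } } } } }   //record results for job3, constrained to job1's bucket time
        ]
      }
    },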

Each input within the chain employs parameters from the metadata section (to select specific data using query filters), but the second and third inputs also leverage information gathered by the first input in the chain: specifically, the timestamp of the anomalous bucket. This information is passed on to the range filter of the second and third inputs, shown here for the second input:

 { "range": { "timestamp": {"gte": "{{ctx.payload.job1.hits.hits.0._source.timestamp}}||-{{ctx.metadata.lookback_window}}", "lte": "{{ctx.payload.job1.hits.hits.0._source.timestamp}}"}}},

The interpretation is as follows: look only in the range from the timestamp of job1's anomalous bucket minus the lookback window, up to the timestamp of that bucket itself. Notice the usage of job1's context payload (the ctx.payload.job1 variable), which yields the anomalous bucket's timestamp. The ||- notation performs the subtraction from that timestamp using Elasticsearch date math.
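For instance, if job1's anomalous bucket fell at 2017-02-08T15:15:00 UTC (an illustrative value, shown here as an ISO timestamp for readability) and lookback_window is 10m, the rendered filter would be equivalent to the following, covering 15:05 through 15:15:

    { "range": { "timestamp": { "gte": "2017-02-08T15:15:00||-10m", "lte": "2017-02-08T15:15:00" } } },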

Further on in the condition section of the watch, the following logic is employed:

  "condition" : {
"script" : {
"source" : "return ctx.payload.job1.hits.total > 0 && ctx.payload.job2.hits.total > 0 && ctx.payload.job3.hits.total > 0"
}
},

In other words, the condition returns true only if all three jobs have anomalies (the original KPI job in job1, plus the corroborating anomalies in job2 and job3). The logical ANDs (&&) could, of course, be modified for different logic, such as (job1 AND (job2 OR job3)), as sketched below. The key point here is that you can make this behave in any way you need.
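For example, a minimal sketch of that (job1 AND (job2 OR job3)) variant, changing nothing but the script source:

  "condition" : {
    "script" : {
      "source" : "return ctx.payload.job1.hits.total > 0 && (ctx.payload.job2.hits.total > 0 || ctx.payload.job3.hits.total > 0)"
    }
  },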

The most difficult section of the watch to explain is the transform within the actions section. Here, we leverage the full power of Elasticsearch's scripting language (Painless) to reformat and organize the pieces of information gathered from all three jobs:

        "transform": {
"script": "return ['anomaly_score': ctx.payload.job1.hits.hits.0._source.anomaly_score, 'bucket_time': Instant.ofEpochMilli(ctx.payload.job1.hits.hits.0._source.timestamp).atZone(ZoneOffset.UTC).format(DateTimeFormatter.ofPattern('yyyy-MM-dd HH:mm:ss')),'job2_anomaly_details':ctx.payload.job2.hits.hits.stream().map(p -> ['bucket_time': Instant.ofEpochMilli(ctx.payload.job2.hits.hits.0._source.timestamp).atZone(ZoneOffset.UTC).format(DateTimeFormatter.ofPattern('yyyy-MM-dd HH:mm:ss')),'field_name':p._source.field_name,'score':p._source.record_score,'actual':p._source.actual.0,'typical':p._source.typical.0]).collect(Collectors.toList()),'job3_anomaly_details':ctx.payload.job3.hits.hits.stream().map(p -> ['bucket_time': Instant.ofEpochMilli(ctx.payload.job3.hits.hits.0._source.timestamp).atZone(ZoneOffset.UTC).format(DateTimeFormatter.ofPattern('yyyy-MM-dd HH:mm:ss')),'hostname':p._source.hostname.0,'field_name':p._source.field_name,'score':p._source.record_score,'actual':p._source.actual.0,'typical':p._source.typical.0]).collect(Collectors.toList())]"
},

The technique used here is to employ Java streams to assemble these disparate pieces of information into a concise set of fields and lists that can easily be iterated through later using the Mustache syntax. Despite the complicated-looking syntax, just realize that the purpose here is to create four things:

- anomaly_score: the bucket-level anomaly score of job1
- bucket_time: the time of job1's anomalous bucket, formatted as a human-readable UTC timestamp
- job2_anomaly_details: a list of the corroborating record-level anomalies from job2, each with its score, actual, and typical values
- job3_anomaly_details: a list of the corroborating record-level anomalies from job3, each with its hostname, score, actual, and typical values

Once this transform is run (assuming that the condition has been met), its output could look something like this:

            "payload": {
"anomaly_score": 85.4309,
"job3_anomaly_details": [
{
"score": 6.023424,
"actual": 846.0000000000005,
"hostname": "dbserver.acme.com",
"bucket_time": "2017-02-08 15:10:00",
"typical": 12.609336298838242,
"field_name": "SQLServer_Buffer_Manager_Page_life_expectancy"
},
{
"score": 8.337633,
"actual": 96.93249340057375,
"hostname": "dbserver.acme.com",
"bucket_time": "2017-02-08 15:10:00",
"typical": 98.93088463835487,
"field_name": "SQLServer_Buffer_Manager_Buffer_cache_hit_ratio"
},
{
"score": 27.97728,
"actual": 168.15000000000006,
"hostname": "dbserver.acme.com",
"bucket_time": "2017-02-08 15:10:00",
"typical": 196.1486370757187,
"field_name": "SQLServer_General_Statistics_User_Connections"
}
],
"bucket_time": "2017-02-08 15:15:00",
"job2_anomaly_details": [
{
"score": 11.217614808972602,
"actual": 13610.62255859375,
"bucket_time": "2017-02-08 15:15:00",
"typical": 855553.8944717721,
"field_name": "In_Octets"
},
{
"score": 17.00518,
"actual": 190795357.83333334,
"bucket_time": "2017-02-08 15:15:00",
"typical": 1116062.402864764,
"field_name": "Out_Octets"
},
{
"score": 72.99199,
"actual": 137.04444376627606,
"bucket_time": "2017-02-08 15:15:00",
"typical": 0.012289061361553099,
"field_name": "Out_Discards"
}
]
}

We can see that the anomaly_score of the first job was 85.4309, which exceeds the threshold of 75 defined in the watch. The time of this bucket was 2017-02-08 15:15:00, as seen in the bucket_time field. The job2_anomaly_details and job3_anomaly_details arrays are filled with several anomalies that were found in their respective jobs in the 10 minutes between 15:05 and 15:15. For simplicity's sake, the actual, typical, and score values are not rounded to a reasonable number of significant figures, but that could also be done in the transform block, as shown in the following sketch.
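For example (an illustrative fragment only, not part of the watch in the repository), each record score emitted inside the .map(p -> [...]) lambda could be rounded to two decimal places with plain Java math, which Painless supports:

    //instead of 'score': p._source.record_score, round to two decimal places
    'score': Math.round(p._source.record_score * 100.0) / 100.0,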

The logging action within the actions section simply iterates through these values using Mustache:

        "logging": {
"text": "[CRITICAL] Anomaly Alert for job {{ctx.metadata.job1_name}}: score={{ctx.payload.anomaly_score}} at {{ctx.payload.bucket_time}} UTC \nPossibly influenced by these other anomalous metrics (within the prior 10 minutes):\njob:{{ctx.metadata.job2_name}}: (anomalies with at least a record score of {{ctx.metadata.job2_min_record_score}}):\n{{#ctx.payload.job2_anomaly_details}}field={{field_name}}: score={{score}}, value={{actual}} (typical={{typical}}) at {{bucket_time}} UTC\n{{/ctx.payload.job2_anomaly_details}}\njob:{{ctx.metadata.job3_name}}: (anomalies with at least a record score of {{ctx.metadata.job3_min_record_score}}):\n{{#ctx.payload.job3_anomaly_details}}hostname={{hostname}} field={{field_name}}: score={{score}}, value={{actual}} (typical={{typical}}) at {{bucket_time}} UTC\n{{/ctx.payload.job3_anomaly_details}}"
}

This produces the following logging output, given our sample output from the transform:

[CRITICAL] Anomaly Alert for job it_ops_kpi: score=85.4309 at 2017-02-08 15:15:00 UTC
Possibly influenced by these other anomalous metrics (within the prior 10 minutes):
job:it_ops_network: (anomalies with at least a record score of 10):
field=In_Octets: score=11.217614808972602, value=13610.62255859375 (typical=855553.8944717721) at 2017-02-08 15:15:00 UTC
field=Out_Octets: score=17.00518, value=1.9079535783333334E8 (typical=1116062.402864764) at 2017-02-08 15:15:00 UTC
field=Out_Discards: score=72.99199, value=137.04444376627606 (typical=0.012289061361553099) at 2017-02-08 15:15:00 UTC
job:it_ops_sql: (anomalies with at least a record score of 5):
hostname=dbserver.acme.com field=SQLServer_Buffer_Manager_Page_life_expectancy: score=6.023424, value=846.0000000000005 (typical=12.609336298838242) at 2017-02-08 15:10:00 UTC
hostname=dbserver.acme.com field=SQLServer_Buffer_Manager_Buffer_cache_hit_ratio: score=8.337633, value=96.93249340057375 (typical=98.93088463835487) at 2017-02-08 15:10:00 UTC
hostname=dbserver.acme.com field=SQLServer_General_Statistics_User_Connections: score=27.97728, value=168.15000000000006 (typical=196.1486370757187) at 2017-02-08 15:10:00 UTC