Commit 377cd826 authored by 刘潇 (2023 M.S. student)

upload
# LLMDetector
This repository contains the implementation for the paper "LLMDetector: Large Language Models with Dual-View Contrastive Learning for Time Series Anomaly Detection".
## Requirements
The dependencies can be installed by:
```pip install -r requirements.txt```
## Data
The datasets can be downloaded from this [link](https://drive.google.com/drive/folders/1ehugseUxqp1o6Xn60woCFxEEvKq4AT0d?usp=sharing). Put the files into `datasets/`, so that each original dataset is located at `datasets/<dataset_name>`. All datasets are preprocessed into `.npy` format for convenience.
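Once downloaded, each `.npy` file can be loaded directly with NumPy. A minimal sketch of the save/load round trip (the array shape and file name here are illustrative assumptions, not the repository's actual layout):

```python
import numpy as np

# Illustrative only: a (timestamps, channels) array in the '.npy' convention
# used by the datasets; 'example_train.npy' is a made-up file name.
data = np.random.rand(100, 55).astype(np.float32)  # e.g. 55 channels, as for MSL
np.save("example_train.npy", data)

loaded = np.load("example_train.npy")  # shape: (timestamps, channels)
assert loaded.shape == (100, 55)
```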
## Usage
To train and evaluate LLMDetector on a dataset, run the following command:
```python main.py --anormly_ratio <anormly ratio> --alpha <alpha> --mode <mode> --dataset <data_name> --input_c <input dimension> --win_size <window size> --patch_size <patch size> --step <step>```
The detailed descriptions of the arguments are as follows:
| Parameter Name | Description |
|------------------|-----------------------------------------------------------------------------|
| anormly_ratio | Anomaly ratio used to set the threshold for flagging a timestamp as anomalous (note: the flag is spelled `--anormly_ratio` in the code). |
| alpha | Weighting factor for the instance-level loss in the training objective. |
| mode | Specifies the execution mode: either `train` or `test`. |
| dataset | Name of the dataset. |
| input_c | Number of input channels (i.e., the dimensionality of the input data). |
| win_size | Length of the sliding window applied to the time series. |
| patch_size | Size of each patch segmented from the window. |
| step | Step size between consecutive windows during sliding window segmentation. |
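As a rough illustration of how `win_size`, `patch_size`, and `step` interact (a sketch, not code from this repository): the series is cut into sliding windows, and each window is split into non-overlapping patches.

```python
def count_windows(T, win_size, step):
    # Number of complete sliding windows over a series of length T.
    return (T - win_size) // step + 1

def patches_per_window(win_size, patch_size):
    # Non-overlapping patches inside one window (win_size assumed divisible).
    return win_size // patch_size

# e.g. with win_size=60, patch_size=6, step=60 (the WADI settings below)
# and a hypothetical series of length 6000:
assert count_windows(T=6000, win_size=60, step=60) == 100
assert patches_per_window(60, 6) == 10
```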
For example, the WADI dataset can be trained directly with the following command:
```python main.py --anormly_ratio 0.003 --alpha 0.1 --num_epochs 20 --batch_size 64 --mode train --dataset WADI --data_path WADI --input_c 123 --win_size 60 --patch_size 6 --step 60```
To test:
```python main.py --anormly_ratio 0.003 --alpha 0.1 --num_epochs 20 --batch_size 64 --mode test --dataset WADI --data_path WADI --input_c 123 --win_size 60 --patch_size 6 --step 60```
The results will be saved in the directory ```./result```.
For all the scripts we used, please refer to the directory ```./Scripts```, or simply run the corresponding `.sh` file directly, for example:
```sh Scripts/MSL.sh```
python main.py --anormly_ratio 0.007 --alpha 0.4 --num_epochs 1 --batch_size 64 --mode train --dataset MSL --data_path MSL --input_c 55 --win_size 60 --patch_size 6 --step 60
python main.py --anormly_ratio 0.007 --alpha 0.4 --num_epochs 1 --batch_size 64 --mode test --dataset MSL --data_path MSL --input_c 55 --win_size 60 --patch_size 6 --step 60
python main.py --anormly_ratio 0.007 --alpha 0.8 --num_epochs 20 --batch_size 64 --mode train --dataset GECCO --data_path GECCO --input_c 9 --win_size 60 --patch_size 10 --step 60
python main.py --anormly_ratio 0.007 --alpha 0.8 --num_epochs 20 --batch_size 64 --mode test --dataset GECCO --data_path GECCO --input_c 9 --win_size 60 --patch_size 10 --step 60
python main.py --anormly_ratio 0.001 --alpha 0.7 --num_epochs 20 --batch_size 64 --mode train --dataset SWAN --data_path SWAN --input_c 38 --win_size 36 --patch_size 6 --step 36
python main.py --anormly_ratio 0.001 --alpha 0.7 --num_epochs 20 --batch_size 64 --mode test --dataset SWAN --data_path SWAN --input_c 38 --win_size 36 --patch_size 6 --step 36
python main.py --anormly_ratio 0.005 --alpha 0.7 --num_epochs 20 --batch_size 64 --mode train --dataset PSM --data_path PSM --input_c 25 --win_size 60 --patch_size 5 --step 60
python main.py --anormly_ratio 0.005 --alpha 0.7 --num_epochs 20 --batch_size 64 --mode test --dataset PSM --data_path PSM --input_c 25 --win_size 60 --patch_size 5 --step 60
python main.py --anormly_ratio 0.006 --alpha 0.7 --num_epochs 20 --batch_size 64 --mode train --dataset SMAP --data_path SMAP --input_c 25 --patch_size 6 --win_size 60 --step 60
python main.py --anormly_ratio 0.006 --alpha 0.7 --num_epochs 20 --batch_size 64 --mode test --dataset SMAP --data_path SMAP --input_c 25 --patch_size 6 --win_size 60 --step 60
python main.py --anormly_ratio 0.007 --alpha 0.6 --num_epochs 20 --batch_size 64 --mode train --dataset SMD --data_path SMD --input_c 38 --loss_fuc MSE --win_size 60 --patch_size 6 --step 60
python main.py --anormly_ratio 0.007 --alpha 0.6 --num_epochs 20 --batch_size 64 --mode test --dataset SMD --data_path SMD --input_c 38 --loss_fuc MSE --win_size 60 --patch_size 6 --step 60
python main.py --anormly_ratio 0.008 --alpha 0.5 --num_epochs 20 --batch_size 64 --mode train --dataset SWAT --data_path SWAT --input_c 51 --win_size 60 --patch_size 6 --step 60
python main.py --anormly_ratio 0.008 --alpha 0.5 --num_epochs 20 --batch_size 64 --mode test --dataset SWAT --data_path SWAT --input_c 51 --win_size 60 --patch_size 6 --step 60
python main.py --anormly_ratio 0.003 --alpha 0.1 --num_epochs 20 --batch_size 64 --mode train --dataset WADI --data_path WADI --input_c 123 --win_size 60 --patch_size 6 --step 60
python main.py --anormly_ratio 0.003 --alpha 0.1 --num_epochs 20 --batch_size 64 --mode test --dataset WADI --data_path WADI --input_c 123 --win_size 60 --patch_size 6 --step 60
import os
import argparse
import numpy as np
from solver import Solver
import warnings
import torch
warnings.filterwarnings('ignore')
import random
def seed_everything(seed=11):
    random.seed(seed)
    np.random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.enabled = False
    os.environ['CUBLAS_WORKSPACE_CONFIG'] = ':16:8'
    torch.use_deterministic_algorithms(True)
def main(config, setting):
    if not os.path.exists(config.model_save_path):
        os.makedirs(config.model_save_path)
    solver = Solver(vars(config))
    if config.mode == 'train':
        solver.train(setting)
        solver.test(setting)
    elif config.mode == 'test':
        solver.test1(setting)
    return solver
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # Alternative
    parser.add_argument('--win_size', type=int, default=100)
    parser.add_argument('--patch_size', type=int, default=5)
    parser.add_argument('--lr', type=float, default=1e-4)
    parser.add_argument('--n_heads', type=int, default=1)
    parser.add_argument('--d_model', type=int, default=256)
    parser.add_argument('--rec_timeseries', action='store_true', default=True)
    parser.add_argument('--use_gpu', type=bool, default=True, help='use gpu')
    parser.add_argument('--gpu', type=int, default=2, help='gpu')
    parser.add_argument('--use_multi_gpu', action='store_true', help='use multiple gpus', default=True)
    # Default
    parser.add_argument('--index', type=int, default=137)
    parser.add_argument('--seed', type=int, default=2024)
    parser.add_argument('--num_epochs', type=int, default=10)
    parser.add_argument('--batch_size', type=int, default=64)
    parser.add_argument('--step', type=int, default=1)
    parser.add_argument('--input_c', type=int, default=9)
    parser.add_argument('--alpha', type=float, default=1)
    parser.add_argument('--patience', type=int, default=3)
    parser.add_argument('--dataset', type=str, default='credit')
    parser.add_argument('--mode', type=str, default='test', choices=['train', 'test', 'ana'])
    parser.add_argument('--data_path', type=str, default='./dataset/creditcard_ts.csv')
    parser.add_argument('--model_save_path', type=str, default='checkpoints')
    parser.add_argument('--llm_model', type=str, default='gpt2')
    parser.add_argument('--anormly_ratio', type=float, default=4.00)
    config = parser.parse_args()
    args = vars(config)
    seed_everything(config.seed)
    setting = '{}_ws{}_pa{}_dm{}_llm{}_nh{}_sp{}_alpha{}'.format(
        config.dataset,
        config.win_size,
        config.patch_size,
        config.d_model,
        config.llm_model,
        config.n_heads,
        config.step,
        config.alpha
    )
    print(setting)
    main(config, setting)
# used by paper: TSB-UAD as the main evaluator
# github: https://github.com/johnpaparrizos/TSB-UAD/blob/main/TSB_AD/utils/metrics.py
import numpy as np
from sklearn import metrics
from metrics.evaluate_utils import find_length,range_convers_new
def extend_postive_range(x, window=16):
    label = x.copy().astype(float)
    L = range_convers_new(label)  # index of non-zero segments
    length = len(label)
    for k in range(len(L)):
        s = L[k][0]
        e = L[k][1]
        # extend the label after the end e of the segment
        x1 = np.arange(e, min(e + window // 2, length))
        label[x1] += np.sqrt(1 - (x1 - e) / (window))
        # extend the label before the start s of the segment
        x2 = np.arange(max(s - window // 2, 0), s)
        label[x2] += np.sqrt(1 - (s - x2) / (window))
    label = np.minimum(np.ones(length), label)
    return label
def extend_postive_range_individual(x, percentage=0.2):
    label = x.copy().astype(float)
    L = range_convers_new(label)  # index of non-zero segments
    length = len(label)
    for k in range(len(L)):
        s = L[k][0]
        e = L[k][1]
        l0 = int((e - s + 1) * percentage)
        x1 = np.arange(e, min(e + l0, length))
        label[x1] += np.sqrt(1 - (x1 - e) / (2 * l0))
        x2 = np.arange(max(s - l0, 0), s)
        label[x2] += np.sqrt(1 - (s - x2) / (2 * l0))
    label = np.minimum(np.ones(length), label)
    return label
def TPR_FPR_RangeAUC(labels, pred, P, L):
    product = labels * pred
    TP = np.sum(product)
    P_new = (P + np.sum(labels)) / 2  # so TPR is neither too large nor too small
    recall = min(TP / P_new, 1)
    existence = 0
    for seg in L:
        if np.sum(product[seg[0]:(seg[1] + 1)]) > 0:
            existence += 1
    existence_ratio = existence / len(L)
    TPR_RangeAUC = recall * existence_ratio
    FP = np.sum(pred) - TP
    N_new = len(labels) - P_new
    FPR_RangeAUC = FP / N_new
    Precision_RangeAUC = TP / np.sum(pred)
    return TPR_RangeAUC, FPR_RangeAUC, Precision_RangeAUC
def Range_AUC(score_t_test, y_test, window=5, percentage=0, plot_ROC=False, AUC_type='window'):
    # AUC_type is either 'window' or 'percentage'
    score = score_t_test
    labels = y_test
    score_sorted = -np.sort(-score)
    P = np.sum(labels)
    if AUC_type == 'window':
        labels = extend_postive_range(labels, window=window)
    else:
        labels = extend_postive_range_individual(labels, percentage=percentage)
    L = range_convers_new(labels)
    TPR_list = [0]
    FPR_list = [0]
    Precision_list = [1]
    for i in np.linspace(0, len(score) - 1, 250).astype(int):
        threshold = score_sorted[i]
        pred = score >= threshold
        TPR, FPR, Precision = TPR_FPR_RangeAUC(labels, pred, P, L)
        TPR_list.append(TPR)
        FPR_list.append(FPR)
        Precision_list.append(Precision)
    TPR_list.append(1)
    FPR_list.append(1)  # otherwise, range-AUC will stop earlier than (1,1)
    tpr = np.array(TPR_list)
    fpr = np.array(FPR_list)
    prec = np.array(Precision_list)
    width = fpr[1:] - fpr[:-1]
    height = (tpr[1:] + tpr[:-1]) / 2
    AUC_range = np.sum(width * height)
    width_PR = tpr[1:-1] - tpr[:-2]
    height_PR = (prec[1:] + prec[:-1]) / 2
    AP_range = np.sum(width_PR * height_PR)
    if plot_ROC:
        return AUC_range, AP_range, fpr, tpr, prec
    return AUC_range
def point_wise_AUC(score_t_test, y_test, plot_ROC=False):
    # area under the ROC curve
    label = y_test
    score = score_t_test
    auc = metrics.roc_auc_score(label, score)
    if plot_ROC:
        fpr, tpr, thresholds = metrics.roc_curve(label, score)
        return auc, fpr, tpr
    else:
        return auc
def main():
    y_test = np.zeros(100)
    y_test[10:20] = 1
    y_test[50:60] = 1
    pred_labels = np.zeros(100)
    pred_labels[15:17] = 0.5
    pred_labels[55:62] = 0.7
    point_auc = point_wise_AUC(pred_labels, y_test)
    range_auc = Range_AUC(pred_labels, y_test)
    print("point_auc: {}, range_auc: {}".format(point_auc, range_auc))

if __name__ == "__main__":
    main()
from sklearn.metrics import confusion_matrix
import numpy as np
def MCC(y_test, pred_labels):
    tn, fp, fn, tp = confusion_matrix(y_test, pred_labels).ravel()
    MCC_score = (tp * tn - fp * fn) / (((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn)) ** 0.5)
    return MCC_score
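As a sanity check (illustrative, not part of the original file), the hand-rolled formula above should agree with scikit-learn's built-in `matthews_corrcoef`:

```python
import numpy as np
from sklearn.metrics import matthews_corrcoef, confusion_matrix

y_true = np.array([0, 0, 1, 1, 1, 0, 1, 0])
y_pred = np.array([0, 1, 1, 1, 0, 0, 1, 0])

# Same formula as MCC() above: (tp*tn - fp*fn) / sqrt of the product of marginals.
tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
mcc_manual = (tp * tn - fp * fn) / (((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn)) ** 0.5)
assert abs(mcc_manual - matthews_corrcoef(y_true, y_pred)) < 1e-12
```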
def main():
    y_test = np.zeros(100)
    y_test[10:20] = 1
    y_test[50:60] = 1
    pred_labels = np.zeros(100)
    pred_labels[15:17] = 1
    pred_labels[55:62] = 1
    mcc_score = MCC(y_test, pred_labels)
    print(mcc_score)

if __name__ == "__main__":
    main()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from metrics.affiliation._integral_interval import interval_intersection
def t_start(j, Js = [(1,2),(3,4),(5,6)], Trange = (1,10)):
    """
    Helper for `E_gt_func`
    :param j: index from 0 to len(Js) (included) on which to get the start
    :param Js: ground truth events, as a list of couples
    :param Trange: range of the series where Js is included
    :return: generalized start such that the middle of t_start and t_stop
    always gives the affiliation zone
    """
    b = max(Trange)
    n = len(Js)
    if j == n:
        return(2*b - t_stop(n-1, Js, Trange))
    else:
        return(Js[j][0])
def t_stop(j, Js = [(1,2),(3,4),(5,6)], Trange = (1,10)):
    """
    Helper for `E_gt_func`
    :param j: index from 0 to len(Js) (included) on which to get the stop
    :param Js: ground truth events, as a list of couples
    :param Trange: range of the series where Js is included
    :return: generalized stop such that the middle of t_start and t_stop
    always gives the affiliation zone
    """
    if j == -1:
        a = min(Trange)
        return(2*a - t_start(0, Js, Trange))
    else:
        return(Js[j][1])
def E_gt_func(j, Js, Trange):
    """
    Get the affiliation zone of element j of the ground truth
    :param j: index from 0 to len(Js) (excluded) on which to get the zone
    :param Js: ground truth events, as a list of couples
    :param Trange: range of the series where Js is included, can
    be (-math.inf, math.inf) for distance measures
    :return: affiliation zone of element j of the ground truth represented
    as a couple
    """
    range_left = (t_stop(j-1, Js, Trange) + t_start(j, Js, Trange))/2
    range_right = (t_stop(j, Js, Trange) + t_start(j+1, Js, Trange))/2
    return((range_left, range_right))
def get_all_E_gt_func(Js, Trange):
    """
    Get the affiliation partition from the ground truth point of view
    :param Js: ground truth events, as a list of couples
    :param Trange: range of the series where Js is included, can
    be (-math.inf, math.inf) for distance measures
    :return: affiliation partition of the events
    """
    # E_gt is the limit of affiliation/attraction for each ground truth event
    E_gt = [E_gt_func(j, Js, Trange) for j in range(len(Js))]
    return(E_gt)
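A tiny self-contained illustration of the zone construction above, reimplemented inline so it runs standalone (the values follow directly from the t_start/t_stop formulas):

```python
def zones(Js, Trange):
    # Affiliation zones: midpoints between consecutive ground-truth events,
    # with pseudo-events reflected at the borders of Trange, mirroring the
    # t_start / t_stop logic above.
    a, b = min(Trange), max(Trange)
    starts = [J[0] for J in Js] + [2 * b - Js[-1][1]]  # virtual start after the last event
    stops = [2 * a - Js[0][0]] + [J[1] for J in Js]    # virtual stop before the first event
    return [((stops[j] + starts[j]) / 2, (stops[j + 1] + starts[j + 1]) / 2)
            for j in range(len(Js))]

# Three events in a series spanning (0, 10): each event attracts the
# half-distance to its neighbours, and border zones extend to the range limits.
assert zones([(1, 2), (3, 4), (5, 6)], (0, 10)) == [(0.0, 2.5), (2.5, 4.5), (4.5, 10.0)]
```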
def affiliation_partition(Is = [(1,1.5),(2,5),(5,6),(8,9)], E_gt = [(1,2.5),(2.5,4.5),(4.5,10)]):
    """
    Cut the events into the affiliation zones
    The presentation given here is from the ground truth point of view,
    but it is also used in the reversed direction in the main function.
    :param Is: events as a list of couples
    :param E_gt: range of the affiliation zones
    :return: a list of lists of intervals (each interval represented by either
    a couple or None for an empty interval). The outer list is indexed by each
    affiliation zone of `E_gt`. The inner list is indexed by the events of `Is`.
    """
    out = [None] * len(E_gt)
    for j in range(len(E_gt)):
        E_gt_j = E_gt[j]
        discarded_idx_before = [I[1] < E_gt_j[0] for I in Is]  # end of predicted I is before the start of E
        discarded_idx_after = [I[0] > E_gt_j[1] for I in Is]   # start of predicted I is after the end of E
        kept_index = [not(a or b) for a, b in zip(discarded_idx_before, discarded_idx_after)]
        Is_j = [x for x, y in zip(Is, kept_index) if y]  # keep only the events overlapping the zone
        out[j] = [interval_intersection(I, E_gt_j) for I in Is_j]
    return(out)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import math
from metrics.affiliation._affiliation_zone import (
get_all_E_gt_func,
affiliation_partition)
from metrics.affiliation._integral_interval import (
integral_interval_distance,
integral_interval_probaCDF_precision,
integral_interval_probaCDF_recall,
interval_length,
sum_interval_lengths)
def affiliation_precision_distance(Is = [(1,2),(3,4),(5,6)], J = (2,5.5)):
    """
    Compute the individual average distance from Is to a single ground truth J
    :param Is: list of predicted events within the affiliation zone of J
    :param J: couple representing the start and stop of a ground truth interval
    :return: individual average precision directed distance number
    """
    if all([I is None for I in Is]):  # no prediction in the current area
        return(math.nan)  # undefined
    return(sum([integral_interval_distance(I, J) for I in Is]) / sum_interval_lengths(Is))
def affiliation_precision_proba(Is = [(1,2),(3,4),(5,6)], J = (2,5.5), E = (0,8)):
    """
    Compute the individual precision probability from Is to a single ground truth J
    :param Is: list of predicted events within the affiliation zone of J
    :param J: couple representing the start and stop of a ground truth interval
    :param E: couple representing the start and stop of the zone of affiliation of J
    :return: individual precision probability in [0, 1], or math.nan if undefined
    """
    if all([I is None for I in Is]):  # no prediction in the current area
        return(math.nan)  # undefined
    return(sum([integral_interval_probaCDF_precision(I, J, E) for I in Is]) / sum_interval_lengths(Is))
def affiliation_recall_distance(Is = [(1,2),(3,4),(5,6)], J = (2,5.5)):
    """
    Compute the individual average distance from a single J to the predictions Is
    :param Is: list of predicted events within the affiliation zone of J
    :param J: couple representing the start and stop of a ground truth interval
    :return: individual average recall directed distance number
    """
    Is = [I for I in Is if I is not None]  # filter possible None in Is
    if len(Is) == 0:  # there is no prediction in the current area
        return(math.inf)
    E_gt_recall = get_all_E_gt_func(Is, (-math.inf, math.inf))  # here from the point of view of the predictions
    Js = affiliation_partition([J], E_gt_recall)  # partition of J depending on proximity with Is
    return(sum([integral_interval_distance(J[0], I) for I, J in zip(Is, Js)]) / interval_length(J))
def affiliation_recall_proba(Is = [(1,2),(3,4),(5,6)], J = (2,5.5), E = (0,8)):
    """
    Compute the individual recall probability from a single ground truth J to Is
    :param Is: list of predicted events within the affiliation zone of J
    :param J: couple representing the start and stop of a ground truth interval
    :param E: couple representing the start and stop of the zone of affiliation of J
    :return: individual recall probability in [0, 1]
    """
    Is = [I for I in Is if I is not None]  # filter possible None in Is
    if len(Is) == 0:  # there is no prediction in the current area
        return(0)
    E_gt_recall = get_all_E_gt_func(Is, E)  # here from the point of view of the predictions
    Js = affiliation_partition([J], E_gt_recall)  # partition of J depending on proximity with Is
    return(sum([integral_interval_probaCDF_recall(I, J[0], E) for I, J in zip(Is, Js)]) / interval_length(J))
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from itertools import groupby
from operator import itemgetter
import math
import gzip
import glob
import os
def convert_vector_to_events(vector = [0, 1, 1, 0, 0, 1, 0]):
    """
    Convert a binary vector (indicating 1 for the anomalous instances)
    to a list of events. The events are considered as durations,
    i.e. setting 1 at index i corresponds to an anomalous interval [i, i+1).
    :param vector: a list of elements belonging to {0, 1}
    :return: a list of couples, each couple representing the start and stop of
    each event
    """
    positive_indexes = [idx for idx, val in enumerate(vector) if val > 0]
    events = []
    for k, g in groupby(enumerate(positive_indexes), lambda ix: ix[0] - ix[1]):
        cur_cut = list(map(itemgetter(1), g))
        events.append((cur_cut[0], cur_cut[-1]))
    # Consistent conversion in case of range anomalies (for indexes):
    # a positive index i is considered as the interval [i, i+1),
    # so the last index should be moved by 1
    events = [(x, y+1) for (x, y) in events]
    return(events)
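For example (illustrative, mirroring the docstring's half-open convention; the grouping trick works because consecutive positive indexes share a constant `index - value` difference under `enumerate`):

```python
from itertools import groupby
from operator import itemgetter

def to_events(vector):
    # Same logic as convert_vector_to_events above, inlined to run standalone.
    idxs = [i for i, v in enumerate(vector) if v > 0]
    events = []
    for _, g in groupby(enumerate(idxs), lambda ix: ix[0] - ix[1]):
        run = list(map(itemgetter(1), g))
        events.append((run[0], run[-1] + 1))  # half-open convention [i, i+1)
    return events

assert to_events([0, 1, 1, 0, 0, 1, 0]) == [(1, 3), (5, 6)]
```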
def infer_Trange(events_pred, events_gt):
    """
    Given the list of events events_pred and events_gt, get the
    smallest possible Trange corresponding to the start and stop indexes
    of the whole series.
    Trange will not influence the measure of distances, but will impact the
    measures of probabilities.
    :param events_pred: a list of couples corresponding to predicted events
    :param events_gt: a list of couples corresponding to ground truth events
    :return: a couple corresponding to the smallest range containing the events
    """
    if len(events_gt) == 0:
        raise ValueError('The gt events should contain at least one event')
    if len(events_pred) == 0:
        # empty prediction, base Trange only on events_gt (which is non empty)
        return(infer_Trange(events_gt, events_gt))
    min_pred = min([x[0] for x in events_pred])
    min_gt = min([x[0] for x in events_gt])
    max_pred = max([x[1] for x in events_pred])
    max_gt = max([x[1] for x in events_gt])
    Trange = (min(min_pred, min_gt), max(max_pred, max_gt))
    return(Trange)
def has_point_anomalies(events):
    """
    Check whether events contain point anomalies, i.e.
    events starting and stopping at the same time.
    :param events: a list of couples corresponding to predicted events
    :return: True if the events have any point anomalies, False otherwise
    """
    if len(events) == 0:
        return(False)
    return(min([x[1] - x[0] for x in events]) == 0)
def _sum_wo_nan(vec):
    """
    Sum of elements, ignoring math.isnan ones
    :param vec: vector of floating numbers
    :return: sum of the elements, ignoring math.isnan ones
    """
    vec_wo_nan = [e for e in vec if not math.isnan(e)]
    return(sum(vec_wo_nan))

def _len_wo_nan(vec):
    """
    Count of elements, ignoring math.isnan ones
    :param vec: vector of floating numbers
    :return: count of the elements, ignoring math.isnan ones
    """
    vec_wo_nan = [e for e in vec if not math.isnan(e)]
    return(len(vec_wo_nan))
def read_gz_data(filename = 'data/machinetemp_groundtruth.gz'):
    """
    Load a gz-compressed file in which each line is either 0
    (representing a normal instance) or 1 (representing an
    anomalous instance).
    :param filename: file path to the gz compressed file
    :return: list of integers with either 0 or 1
    """
    with gzip.open(filename, 'rb') as f:
        content = f.read().splitlines()
    content = [int(x) for x in content]
    return(content)
def read_all_as_events():
    """
    Load the files contained in the folder `data/` and convert
    to events. The length of the series is kept.
    The convention for the file name is: `dataset_algorithm.gz`
    :return: two dictionaries:
    - the first containing the list of events for each dataset and algorithm,
    - the second containing the range of the series for each dataset
    """
    filepaths = glob.glob('data/*.gz')
    datasets = dict()
    Tranges = dict()
    for filepath in filepaths:
        vector = read_gz_data(filepath)
        events = convert_vector_to_events(vector)
        # ad hoc cut for those files
        cut_filepath = (os.path.split(filepath)[1]).split('_')
        data_name = cut_filepath[0]
        algo_name = (cut_filepath[1]).split('.')[0]
        if data_name not in datasets:
            datasets[data_name] = dict()
            Tranges[data_name] = (0, len(vector))
        datasets[data_name][algo_name] = events
    return(datasets, Tranges)
def f1_func(p, r):
    """
    Compute the f1 function
    :param p: precision numeric value
    :param r: recall numeric value
    :return: f1 numeric value
    """
    return(2*p*r/(p+r))
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from metrics.affiliation.generics import (
infer_Trange,
has_point_anomalies,
_len_wo_nan,
_sum_wo_nan,
read_all_as_events)
from metrics.affiliation._affiliation_zone import (
get_all_E_gt_func,
affiliation_partition)
from metrics.affiliation._single_ground_truth_event import (
affiliation_precision_distance,
affiliation_recall_distance,
affiliation_precision_proba,
affiliation_recall_proba)
def test_events(events):
    """
    Verify the validity of the input events
    :param events: list of events, each represented by a couple (start, stop)
    :return: None. Raise an error for incorrectly formed or non-ordered events
    """
    if type(events) is not list:
        raise TypeError('Input `events` should be a list of couples')
    if not all([type(x) is tuple for x in events]):
        raise TypeError('Input `events` should be a list of tuples')
    if not all([len(x) == 2 for x in events]):
        raise ValueError('Input `events` should be a list of couples (start, stop)')
    if not all([x[0] <= x[1] for x in events]):
        raise ValueError('Input `events` should be a list of couples (start, stop) with start <= stop')
    if not all([events[i][1] < events[i+1][0] for i in range(len(events) - 1)]):
        raise ValueError('Couples of input `events` should be disjoint and ordered')
def pr_from_events(events_pred, events_gt, Trange):
    """
    Compute the affiliation metrics including the precision/recall in [0,1],
    along with the individual precision/recall distances and probabilities
    :param events_pred: list of predicted events, each represented by a couple
    indicating the start and the stop of the event
    :param events_gt: list of ground truth events, each represented by a couple
    indicating the start and the stop of the event
    :param Trange: range of the series where events_pred and events_gt are included,
    represented as a couple (start, stop)
    :return: dictionary with precision, recall, and the individual metrics
    """
    # testing the inputs
    test_events(events_pred)
    test_events(events_gt)
    # other tests (the Trange check comes first, before Trange is used below)
    if Trange is None:
        # Trange should be indicated if probabilities are used
        raise ValueError('`Trange` should be indicated (or inferred with the `infer_Trange` function)')
    minimal_Trange = infer_Trange(events_pred, events_gt)
    if not Trange[0] <= minimal_Trange[0]:
        raise ValueError('`Trange` should include all the events')
    if not minimal_Trange[1] <= Trange[1]:
        raise ValueError('`Trange` should include all the events')
    if len(events_gt) == 0:
        raise ValueError('Input `events_gt` should have at least one event')
    if has_point_anomalies(events_pred) or has_point_anomalies(events_gt):
        raise ValueError('Cannot manage point anomalies currently')
    E_gt = get_all_E_gt_func(events_gt, Trange)
    aff_partition = affiliation_partition(events_pred, E_gt)
    # precision and recall distances, one value per ground truth event
    d_precision = [affiliation_precision_distance(Is, J) for Is, J in zip(aff_partition, events_gt)]
    d_recall = [affiliation_recall_distance(Is, J) for Is, J in zip(aff_partition, events_gt)]
    # precision and recall probabilities, one value per ground truth event
    p_precision = [affiliation_precision_proba(Is, J, E) for Is, J, E in zip(aff_partition, events_gt, E_gt)]
    p_recall = [affiliation_recall_proba(Is, J, E) for Is, J, E in zip(aff_partition, events_gt, E_gt)]
    if _len_wo_nan(p_precision) > 0:
        p_precision_average = _sum_wo_nan(p_precision) / _len_wo_nan(p_precision)
    else:
        p_precision_average = p_precision[0]  # math.nan
    p_recall_average = sum(p_recall) / len(p_recall)
    dict_out = dict({'precision': p_precision_average,
                     'recall': p_recall_average,
                     'individual_precision_probabilities': p_precision,
                     'individual_recall_probabilities': p_recall,
                     'individual_precision_distances': d_precision,
                     'individual_recall_distances': d_recall})
    return(dict_out)
def produce_all_results():
    """
    Produce the affiliation precision/recall for all files
    contained in the `data` repository
    :return: a dictionary indexed by data names, each containing a dictionary
    indexed by algorithm names, each containing the results of the affiliation
    metrics (precision, recall, individual probabilities and distances)
    """
    datasets, Tranges = read_all_as_events()  # read all the events in folder `data`
    results = dict()
    for data_name in datasets.keys():
        results_data = dict()
        for algo_name in datasets[data_name].keys():
            if algo_name != 'groundtruth':
                results_data[algo_name] = pr_from_events(datasets[data_name][algo_name],
                                                         datasets[data_name]['groundtruth'],
                                                         Tranges[data_name])
        results[data_name] = results_data
    return(results)
import numpy as np
from f1_score_f1_pa import *
from fc_score import *
from precision_at_k import *
from customizable_f1_score import *
from AUC import *
from Matthews_correlation_coefficient import *
from affiliation.generics import convert_vector_to_events
from affiliation.metrics import pr_from_events
from vus.models.feature import Window
from vus.metrics import get_range_vus_roc
def combine_all_evaluation_scores(y_test, pred_labels, anomaly_scores):
    # predicted events come from the predictions, ground truth events from the labels
    events_pred = convert_vector_to_events(pred_labels)  # e.g. [(4, 5), (8, 9)]
    events_gt = convert_vector_to_events(y_test)         # e.g. [(3, 4), (7, 10)]
    Trange = (0, len(y_test))
    affiliation = pr_from_events(events_pred, events_gt, Trange)
    true_events = get_events(y_test)
    _, _, _, f1_score_ori, f05_score_ori = get_accuracy_precision_recall_fscore(y_test, pred_labels)
    f1_score_pa = get_point_adjust_scores(y_test, pred_labels, true_events)[5]
    pa_accuracy, pa_precision, pa_recall, pa_f_score = get_adjust_F1PA(y_test, pred_labels)
    range_f_score = customizable_f1_score(y_test, pred_labels)
    _, _, f1_score_c = get_composite_fscore_raw(y_test, pred_labels, true_events, return_prec_rec=True)
    precision_k = precision_at_k(y_test, anomaly_scores, pred_labels)
    point_auc = point_wise_AUC(pred_labels, y_test)
    range_auc = Range_AUC(pred_labels, y_test)
    MCC_score = MCC(y_test, pred_labels)
    results = get_range_vus_roc(y_test, pred_labels, 100)  # slidingWindow = 100 by default
    score_list = {"f1_score_ori": f1_score_ori,
                  "f05_score_ori": f05_score_ori,
                  "f1_score_pa": f1_score_pa,
                  "pa_accuracy": pa_accuracy,
                  "pa_precision": pa_precision,
                  "pa_recall": pa_recall,
                  "pa_f_score": pa_f_score,
                  "range_f_score": range_f_score,
                  "f1_score_c": f1_score_c,
                  "precision_k": precision_k,
                  "point_auc": point_auc,
                  "range_auc": range_auc,
                  "MCC_score": MCC_score,
                  "Affiliation precision": affiliation['precision'],
                  "Affiliation recall": affiliation['recall'],
                  "R_AUC_ROC": results["R_AUC_ROC"],
                  "R_AUC_PR": results["R_AUC_PR"],
                  "VUS_ROC": results["VUS_ROC"],
                  "VUS_PR": results["VUS_PR"]}
    return score_list
def main():
    y_test = np.zeros(100)
    y_test[10:20] = 1
    y_test[50:60] = 1
    pred_labels = np.zeros(100)
    pred_labels[15:17] = 1
    pred_labels[55:62] = 1
    anomaly_scores = np.zeros(100)
    anomaly_scores[15:17] = 0.7
    anomaly_scores[55:62] = 0.6
    pred_labels[51:55] = 1
    scores = combine_all_evaluation_scores(y_test, pred_labels, anomaly_scores)
    for key, value in scores.items():
        print(key, ' : ', value)

if __name__ == "__main__":
    main()
# used by paper: Exathlon: A Benchmark for Explainable Anomaly Detection over Time Series_VLDB 2021
# github: https://github.com/exathlonbenchmark/exathlon
import numpy as np
from metrics.evaluate_utils import range_convers_new
# the positional bias used inside the overlap reward
def b(bias, i, length):
    if bias == 'flat':
        return 1
    elif bias == 'front-end bias':
        return length - i + 1
    elif bias == 'back-end bias':
        return i
    else:
        if i <= length / 2:
            return i
        else:
            return length - i + 1
def w(AnomalyRange, p):
    MyValue = 0
    MaxValue = 0
    start = AnomalyRange[0]
    AnomalyLength = AnomalyRange[1] - AnomalyRange[0] + 1
    bias = 'flat'  # 'flat' / 'front-end bias' / 'back-end bias'
    for i in range(start, start + AnomalyLength):
        bi = b(bias, i, AnomalyLength)
        MaxValue += bi
        if i in p:
            MyValue += bi
    return MyValue / MaxValue
def Cardinality_factor(Anomolyrange, Prange):
    score = 0
    start = Anomolyrange[0]
    end = Anomolyrange[1]
    for i in Prange:
        if start <= i[0] <= end:
            score += 1
        elif i[0] <= start <= i[1]:
            score += 1
        elif i[0] <= end <= i[1]:
            score += 1
        elif start >= i[0] and end <= i[1]:
            score += 1
    if score == 0:
        return 0
    else:
        return 1 / score
def existence_reward(labels, preds):
    '''
    labels: list of ordered pairs (ground truth anomaly segments)
    preds: positions of the predicted anomalies
    '''
    score = 0
    for i in labels:
        if np.sum(np.multiply(preds <= i[1], preds >= i[0])) > 0:
            score += 1
    return score
def range_recall_new(labels, preds, alpha):
    p = np.where(preds == 1)[0]  # positions of predicted label == 1
    range_pred = range_convers_new(preds)
    range_label = range_convers_new(labels)
    Nr = len(range_label)  # total number of real anomaly segments
    ExistenceReward = existence_reward(range_label, p)
    OverlapReward = 0
    for i in range_label:
        OverlapReward += w(i, p) * Cardinality_factor(i, range_pred)
    score = alpha * ExistenceReward + (1 - alpha) * OverlapReward
    if Nr != 0:
        return score / Nr, ExistenceReward / Nr, OverlapReward / Nr
    else:
        return 0, 0, 0
def customizable_f1_score(y_test, pred_labels, alpha=0.2):
label = y_test
preds = pred_labels
Rrecall, ExistenceReward, OverlapReward = range_recall_new(label, preds, alpha)
Rprecision = range_recall_new(preds, label, 0)[0]
if Rprecision + Rrecall == 0:
Rf = 0
else:
Rf = 2 * Rrecall * Rprecision / (Rprecision + Rrecall)
return Rf
def main():
y_test = np.zeros(100)
y_test[10:20] = 1
y_test[50:60] = 1
pred_labels = np.zeros(100)
pred_labels[15:19] = 1
pred_labels[55:62] = 1
# pred_labels[51:55] = 1
# true_events = get_events(y_test)
Rf = customizable_f1_score(y_test, pred_labels)
print("Rf: {}".format(Rf))
if __name__ == "__main__":
main()
import numpy as np
from statsmodels.tsa.stattools import acf
from scipy.signal import argrelextrema
def get_composite_fscore_from_scores(score_t_test, thres, true_events, prec_t, return_prec_rec=False):
pred_labels = score_t_test > thres
tp = np.sum([pred_labels[start:end + 1].any() for start, end in true_events.values()])
fn = len(true_events) - tp
rec_e = tp / (tp + fn)
fscore_c = 2 * rec_e * prec_t / (rec_e + prec_t)
if prec_t == 0 and rec_e == 0:
fscore_c = 0
if return_prec_rec:
return prec_t, rec_e, fscore_c
return fscore_c
class NptConfig:
def __init__(self, config_dict):
for k, v in config_dict.items():
setattr(self, k, v)
def find_length(data):
if len(data.shape) > 1:
return 0
data = data[:min(20000, len(data))]
base = 3
auto_corr = acf(data, nlags=400, fft=True)[base:]
local_max = argrelextrema(auto_corr, np.greater)[0]
try:
max_local_max = np.argmax([auto_corr[lcm] for lcm in local_max])
if local_max[max_local_max] < 3 or local_max[max_local_max] > 300:
return 125
return local_max[max_local_max] + base
    except (ValueError, IndexError):
        return 125
def range_convers_new(label):
'''
input: arrays of binary values
output: list of ordered pair [[a0,b0], [a1,b1]... ] of the inputs
'''
L = []
i = 0
j = 0
while j < len(label):
while label[i] == 0:
i += 1
if i >= len(label):
break
j = i + 1
if j >= len(label):
if j == len(label):
L.append((i, j - 1))
break
while label[j] != 0:
j += 1
if j >= len(label):
L.append((i, j - 1))
break
if j >= len(label):
break
L.append((i, j - 1))
i = j
return L
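`range_convers_new` extracts maximal runs of 1s as inclusive (start, end) pairs. An equivalent vectorized sketch (the helper name `ranges` is illustrative) that can be checked standalone:

```python
import numpy as np

def ranges(label):
    # contiguous runs of 1s as inclusive (start, end) pairs, via edge detection
    padded = np.concatenate([[0], np.asarray(label), [0]])
    starts = np.where(np.diff(padded) == 1)[0]
    ends = np.where(np.diff(padded) == -1)[0] - 1
    return [(int(s), int(e)) for s, e in zip(starts, ends)]

print(ranges([0, 1, 1, 0, 1]))  # [(1, 2), (4, 4)]
```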
import numpy as np
from sklearn.metrics import precision_recall_curve, roc_curve, auc, roc_auc_score, precision_score, recall_score, \
accuracy_score, fbeta_score, average_precision_score
# function: calculate the point-adjusted f-scores (optionally requiring more than threshold_k hits per event)
def get_point_adjust_scores(y_test, pred_labels, true_events, threshold_k=0, whether_top_k=False):
    tp = 0
    fn = 0
    for true_event in true_events.keys():
        true_start, true_end = true_events[true_event]
        # in top-k mode an event counts as detected only with more than threshold_k hits
        required_hits = threshold_k if whether_top_k else 0
        if pred_labels[true_start:true_end].sum() > required_hits:
            tp += (true_end - true_start)
        else:
            fn += (true_end - true_start)
fp = np.sum(pred_labels) - np.sum(pred_labels * y_test)
prec, rec, fscore = get_prec_rec_fscore(tp, fp, fn)
return fp, fn, tp, prec, rec, fscore
def get_adjust_F1PA(pred, gt):
anomaly_state = False
for i in range(len(gt)):
if gt[i] == 1 and pred[i] == 1 and not anomaly_state:
anomaly_state = True
for j in range(i, 0, -1):
if gt[j] == 0:
break
else:
if pred[j] == 0:
pred[j] = 1
for j in range(i, len(gt)):
if gt[j] == 0:
break
else:
if pred[j] == 0:
pred[j] = 1
elif gt[i] == 0:
anomaly_state = False
if anomaly_state:
pred[i] = 1
    from sklearn.metrics import precision_recall_fscore_support
    accuracy = accuracy_score(gt, pred)
    precision, recall, f_score, support = precision_recall_fscore_support(gt, pred, average='binary')
return accuracy, precision, recall, f_score
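Point adjustment marks an entire ground-truth segment as detected as soon as any one of its points is predicted. A minimal standalone sketch of that rule (the helper name `point_adjust` is illustrative):

```python
import numpy as np

def point_adjust(pred, gt):
    # if any prediction hits a ground-truth segment, mark the whole segment as predicted
    pred, gt = np.asarray(pred).copy(), np.asarray(gt)
    padded = np.concatenate([[0], gt, [0]])
    starts = np.where(np.diff(padded) == 1)[0]
    ends = np.where(np.diff(padded) == -1)[0]  # exclusive ends
    for s, e in zip(starts, ends):
        if pred[s:e].any():
            pred[s:e] = 1
    return pred

print(point_adjust([0, 1, 0, 0], [1, 1, 1, 0]).tolist())  # [1, 1, 1, 0]
```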
# calculate the point-adjusted f-score
def get_prec_rec_fscore(tp, fp, fn):
if tp == 0:
precision = 0
recall = 0
else:
precision = tp / (tp + fp)
recall = tp / (tp + fn)
fscore = get_f_score(precision, recall)
return precision, recall, fscore
def get_f_score(prec, rec):
if prec == 0 and rec == 0:
f_score = 0
else:
f_score = 2 * (prec * rec) / (prec + rec)
return f_score
# function: calculate the standard (point-wise) f-scores
def get_accuracy_precision_recall_fscore(y_true: list, y_pred: list):
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred)
    recall = recall_score(y_true, y_pred)
    if precision == 0 and recall == 0:
        f_score = 0
        f05_score = 0
    else:
        f_score = (2 * precision * recall) / (precision + recall)
        f05_score = fbeta_score(y_true, y_pred, average='binary', beta=0.5)
    return accuracy, precision, recall, f_score, f05_score
from fc_score import *
from f1_score_f1_pa import *
from evaluate_utils import *
default_thres_config = {"top_k_time": {},
"best_f1_test": {"exact_pt_adj": True},
"thresholded_score": {},
"tail_prob": {"tail_prob": 2},
"tail_prob_1": {"tail_prob": 1},
"tail_prob_2": {"tail_prob": 2},
"tail_prob_3": {"tail_prob": 3},
"tail_prob_4": {"tail_prob": 4},
"tail_prob_5": {"tail_prob": 5},
"dyn_gauss": {"long_window": 10000, "short_window": 1, "kernel_sigma": 10},
"nasa_npt": {"batch_size": 70, "window_size": 30, "telem_only": True,
"smoothing_perc": 0.005, "l_s": 250, "error_buffer": 5, "p": 0.05}}
def threshold_and_predict(score_t_test, y_test, true_events, logger, test_anom_frac, thres_method="top_k_time",
point_adjust=False, score_t_train=None, thres_config_dict=dict(), return_auc=False,
composite_best_f1=False):
if thres_method in thres_config_dict.keys():
config = thres_config_dict[thres_method]
else:
config = default_thres_config[thres_method]
# test_anom_frac = (np.sum(y_test)) / len(y_test)
auroc = None
avg_prec = None
if thres_method == "thresholded_score":
opt_thres = 0.5
if set(score_t_test) - {0, 1}:
logger.error("Score_t_test isn't binary. Predicting all as non-anomalous")
pred_labels = np.zeros(len(score_t_test))
else:
pred_labels = score_t_test
elif thres_method == "best_f1_test" and point_adjust:
prec, rec, thresholds = precision_recall_curve(y_test, score_t_test, pos_label=1)
if not config["exact_pt_adj"]:
fscore_best_time = [get_f_score(precision, recall) for precision, recall in zip(prec, rec)]
opt_num = np.squeeze(np.argmax(fscore_best_time))
opt_thres = thresholds[opt_num]
            thresholds = np.concatenate([np.random.choice(thresholds, size=5000), [opt_thres]])
fscores = []
for thres in thresholds:
_, _, _, _, _, fscore = get_point_adjust_scores(y_test, score_t_test > thres, true_events)
fscores.append(fscore)
opt_thres = thresholds[np.argmax(fscores)]
pred_labels = score_t_test > opt_thres
elif thres_method == "best_f1_test" and composite_best_f1:
prec, rec, thresholds = precision_recall_curve(y_test, score_t_test, pos_label=1)
precs_t = prec
fscores_c = [get_composite_fscore_from_scores(score_t_test, thres, true_events, prec_t) for thres, prec_t in
zip(thresholds, precs_t)]
try:
opt_thres = thresholds[np.nanargmax(fscores_c)]
except:
opt_thres = 0.0
pred_labels = score_t_test > opt_thres
elif thres_method == "top_k_time":
opt_thres = np.nanpercentile(score_t_test, 100 * (1 - test_anom_frac), interpolation='higher')
pred_labels = np.where(score_t_test > opt_thres, 1, 0)
elif thres_method == "best_f1_test":
prec, rec, thres = precision_recall_curve(y_test, score_t_test, pos_label=1)
fscore = [get_f_score(precision, recall) for precision, recall in zip(prec, rec)]
opt_num = np.squeeze(np.argmax(fscore))
opt_thres = thres[opt_num]
pred_labels = np.where(score_t_test > opt_thres, 1, 0)
elif "tail_prob" in thres_method:
tail_neg_log_prob = config["tail_prob"]
opt_thres = tail_neg_log_prob
pred_labels = np.where(score_t_test > opt_thres, 1, 0)
elif thres_method == "nasa_npt":
opt_thres = 0.5
pred_labels = get_npt_labels(score_t_test, y_test, config)
else:
        logger.error("Thresholding method {} not in [top_k_time, best_f1_test, thresholded_score, tail_prob, nasa_npt]".format(thres_method))
return None, None
if return_auc:
avg_prec = average_precision_score(y_test, score_t_test)
auroc = roc_auc_score(y_test, score_t_test)
return opt_thres, pred_labels, avg_prec, auroc
return opt_thres, pred_labels
# top-level evaluation function
def evaluate_predicted_labels(pred_labels, y_test, true_events, logger, eval_method="time-wise", breaks=[],
point_adjust=False):
    """
    Computes evaluation metrics for the binary classification given the true and predicted labels
    :param point_adjust: whether to use point-adjusted scoring
    :param pred_labels: array of predicted labels
    :param y_test: array of true labels
    :param eval_method: string that indicates whether to evaluate the classification time point-wise or event-wise
    :param breaks: array of discontinuities in the time series, relevant only for event-wise evaluation
    :return: tuple (tp, fp, fn, prec, rec, fscore)
    """
if eval_method == "time-wise":
# point-adjust fscore
if point_adjust:
fp, fn, tp, prec, rec, fscore = get_point_adjust_scores(y_test, pred_labels, true_events)
# normal fscore
else:
_, prec, rec, fscore, _ = get_accuracy_precision_recall_fscore(y_test, pred_labels)
tp = np.sum(pred_labels * y_test)
fp = np.sum(pred_labels) - tp
fn = np.sum(y_test) - tp
# event-wise
else:
logger.error("Evaluation method {} not in [time-wise, event-wise]".format(eval_method))
return 0, 0, 0
return tp, fp, fn, prec, rec, fscore
import numpy as np
from sklearn.metrics import precision_score
def get_events(y_test, outlier=1, normal=0):
events = dict()
label_prev = normal
event = 0 # corresponds to no event
event_start = 0
for tim, label in enumerate(y_test):
if label == outlier:
if label_prev == normal:
event += 1
event_start = tim
else:
if label_prev == outlier:
event_end = tim - 1
events[event] = (event_start, event_end)
label_prev = label
    if label_prev == outlier:
        # the series ends inside an event: close it at the last index
        event_end = tim
        events[event] = (event_start, event_end)
return events
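`get_events` numbers each maximal run of outlier labels and records its inclusive (start, end) span. An equivalent vectorized sketch on a toy input (the helper name `get_events_sketch` is illustrative):

```python
import numpy as np

def get_events_sketch(y):
    # number each maximal run of 1s: {1: (start, end), 2: ...}, ends inclusive
    padded = np.concatenate([[0], y, [0]])
    starts = np.where(np.diff(padded) == 1)[0]
    ends = np.where(np.diff(padded) == -1)[0] - 1
    return {i + 1: (int(s), int(e)) for i, (s, e) in enumerate(zip(starts, ends))}

print(get_events_sketch(np.array([0, 1, 1, 0, 1, 1])))  # {1: (1, 2), 2: (4, 5)}
```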
def get_composite_fscore_raw(y_test, pred_labels, true_events, return_prec_rec=False):
tp = np.sum([pred_labels[start:end + 1].any() for start, end in true_events.values()])
fn = len(true_events) - tp
rec_e = tp / (tp + fn)
prec_t = precision_score(y_test, pred_labels)
fscore_c = 2 * rec_e * prec_t / (rec_e + prec_t)
if prec_t == 0 and rec_e == 0:
fscore_c = 0
if return_prec_rec:
return prec_t, rec_e, fscore_c
return fscore_c
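The composite F-score combines event-wise recall (did we hit each true event at all?) with point-wise precision. A self-contained toy computation (the data and events below are made up for illustration):

```python
import numpy as np

# toy ground truth with two anomaly events; predictions hit one of them
y_test = np.zeros(20, dtype=int)
y_test[3:6] = 1
y_test[12:16] = 1
pred = np.zeros(20, dtype=int)
pred[4] = 1  # hits the first event
pred[8] = 1  # false positive

events = [(3, 5), (12, 15)]                    # inclusive (start, end) pairs
tp_e = sum(pred[s:e + 1].any() for s, e in events)
rec_e = tp_e / len(events)                     # event-wise recall: 0.5
prec_t = pred[y_test == 1].sum() / pred.sum()  # point-wise precision: 0.5
fscore_c = 2 * rec_e * prec_t / (rec_e + prec_t)
print(fscore_c)  # 0.5
```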
def main():
y_test = np.zeros(100)
y_test[10:20] = 1
y_test[50:60] = 1
pred_labels = np.zeros(100)
pred_labels[15:17] = 1
pred_labels[55:62] = 1
    true_events = get_events(y_test)
    prec_t, rec_e, fscore_c = get_composite_fscore_raw(y_test, pred_labels, true_events, return_prec_rec=True)
    print("Prec_t: {}, rec_e: {}, fscore_c: {}".format(prec_t, rec_e, fscore_c))
if __name__ == "__main__":
main()
from metrics.f1_score_f1_pa import *
from metrics.fc_score import *
from metrics.precision_at_k import *
from metrics.customizable_f1_score import *
from metrics.AUC import *
from metrics.Matthews_correlation_coefficient import *
from metrics.affiliation.generics import convert_vector_to_events
from metrics.affiliation.metrics import pr_from_events
# from metrics.vus.models.feature import Window
from metrics.vus.metrics import get_range_vus_roc
import numpy as np
def combine_all_evaluation_scores(y_test, pred_labels, anomaly_scores):
    events_pred = convert_vector_to_events(pred_labels)
    events_gt = convert_vector_to_events(y_test)
    Trange = (0, len(y_test))
    affiliation = pr_from_events(events_pred, events_gt, Trange)
    true_events = get_events(y_test)
    pa_accuracy, pa_precision, pa_recall, pa_f_score = get_adjust_F1PA(pred_labels, y_test)
    MCC_score = MCC(y_test, pred_labels)
    vus_results = get_range_vus_roc(anomaly_scores, y_test, 100)  # default slidingWindow = 100
score_list_simple = {
"pa_accuracy":pa_accuracy,
"pa_precision":pa_precision,
"pa_recall":pa_recall,
"pa_f_score":pa_f_score,
"MCC_score":MCC_score,
"Affiliation precision": affiliation['precision'],
"Affiliation recall": affiliation['recall'],
"R_AUC_ROC": vus_results["R_AUC_ROC"],
"R_AUC_PR": vus_results["R_AUC_PR"],
"VUS_ROC": vus_results["VUS_ROC"],
"VUS_PR": vus_results["VUS_PR"]
}
# return score_list, score_list_simple
return score_list_simple
if __name__ == '__main__':
y_test = np.load("data/events_pred_MSL.npy")+0
pred_labels = np.load("data/events_gt_MSL.npy")+0
anomaly_scores = np.load("data/events_scores_MSL.npy")
print(len(y_test), max(anomaly_scores), min(anomaly_scores))
score_list_simple = combine_all_evaluation_scores(y_test, pred_labels, anomaly_scores)
for key, value in score_list_simple.items():
print('{0:21} :{1:10f}'.format(key, value))
# k is defined as the number of anomalies
# only calculate the range top k not the whole set
import numpy as np
def precision_at_k(y_test, score_t_test, pred_labels):
# top-k
k = int(np.sum(y_test))
threshold = np.percentile(score_t_test, 100 * (1 - k / len(y_test)))
# precision_at_k = metrics.top_k_accuracy_score(label, score, k)
    p_at_k = np.where(score_t_test > threshold)[0]
TP_at_k = sum(y_test[p_at_k])
precision_at_k = TP_at_k / k
return precision_at_k
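Precision@k flags the k highest-scoring points (k = number of true anomalous points) and measures how many of them are truly anomalous. A self-contained sketch with made-up data; note the threshold applies to the scores, not the labels:

```python
import numpy as np

y_test = np.array([0, 0, 1, 1, 0, 1, 0, 0])
scores = np.array([.1, .2, .9, .8, .3, .4, .7, .0])

k = int(y_test.sum())           # k = 3 true anomalous points
topk = np.argsort(scores)[-k:]  # indices of the k highest scores
p_at_k = y_test[topk].sum() / k
print(p_at_k)  # 2/3: top scores pick indices 2, 3, 6; index 6 is a false alarm
```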
import numpy as np
import math
import matplotlib.pyplot as plt
from matplotlib import cm
import pandas as pd
from tqdm import tqdm as tqdm
import time
from sklearn.preprocessing import MinMaxScaler
import random
import os
import sys
module_path = os.path.abspath(os.path.join('../..'))
if module_path not in sys.path:
sys.path.append(module_path)
from metrics.vus.utils.slidingWindows import find_length
from metrics.vus.utils.metrics import metricor
from metrics.vus.models.distance import Fourier
from metrics.vus.models.feature import Window
from metrics.vus.models.cnn import cnn
from metrics.vus.models.AE_mlp2 import AE_MLP2
from metrics.vus.models.lstm import lstm
from metrics.vus.models.ocsvm import OCSVM
from metrics.vus.models.poly import POLY
from metrics.vus.models.pca import PCA
from metrics.vus.models.norma import NORMA
from metrics.vus.models.matrix_profile import MatrixProfile
from metrics.vus.models.lof import LOF
from metrics.vus.models.iforest import IForest
def find_section_length(label,length):
best_i = None
best_sum = None
current_subseq = False
for i in range(len(label)):
changed = False
if label[i] == 1:
if current_subseq == False:
current_subseq = True
if best_i is None:
changed = True
best_i = i
best_sum = np.sum(label[max(0,i-200):min(len(label),i+9800)])
else:
if np.sum(label[max(0,i-200):min(len(label),i+9800)]) < best_sum:
changed = True
best_i = i
best_sum = np.sum(label[max(0,i-200):min(len(label),i+9800)])
else:
changed = False
if changed:
diff = i+9800 - len(label)
pos1 = max(0,i-200 - max(0,diff))
pos2 = min(i+9800,len(label))
else:
current_subseq = False
if best_i is not None:
return best_i-pos1,(pos1,pos2)
else:
return None,None
def generate_data(filepath,init_pos,max_length):
df = pd.read_csv(filepath, header=None).to_numpy()
name = filepath.split('/')[-1]
#max_length = 30000
data = df[init_pos:init_pos+max_length,0].astype(float)
label = df[init_pos:init_pos+max_length,1]
pos_first_anom,pos = find_section_length(label,max_length)
data = df[pos[0]:pos[1],0].astype(float)
label = df[pos[0]:pos[1],1]
slidingWindow = find_length(data)
#slidingWindow = 70
X_data = Window(window = slidingWindow).convert(data).to_numpy()
data_train = data[:int(0.1*len(data))]
data_test = data
X_train = Window(window = slidingWindow).convert(data_train).to_numpy()
X_test = Window(window = slidingWindow).convert(data_test).to_numpy()
return pos_first_anom,slidingWindow,data,X_data,data_train,data_test,X_train,X_test,label
def compute_score(methods,slidingWindow,data,X_data,data_train,data_test,X_train,X_test):
methods_scores = {}
for method in methods:
start_time = time.time()
if method == 'IForest':
clf = IForest(n_jobs=1)
x = X_data
clf.fit(x)
score = clf.decision_scores_
score = MinMaxScaler(feature_range=(0,1)).fit_transform(score.reshape(-1,1)).ravel()
score = np.array([score[0]]*math.ceil((slidingWindow-1)/2) + list(score) + [score[-1]]*((slidingWindow-1)//2))
elif method == 'LOF':
clf = LOF(n_neighbors=20, n_jobs=1)
x = X_data
clf.fit(x)
score = clf.decision_scores_
score = MinMaxScaler(feature_range=(0,1)).fit_transform(score.reshape(-1,1)).ravel()
score = np.array([score[0]]*math.ceil((slidingWindow-1)/2) + list(score) + [score[-1]]*((slidingWindow-1)//2))
elif method == 'MatrixProfile':
clf = MatrixProfile(window = slidingWindow)
x = data
clf.fit(x)
score = clf.decision_scores_
score = MinMaxScaler(feature_range=(0,1)).fit_transform(score.reshape(-1,1)).ravel()
score = np.array([score[0]]*math.ceil((slidingWindow-1)/2) + list(score) + [score[-1]]*((slidingWindow-1)//2))
elif method == 'NormA':
clf = NORMA(pattern_length = slidingWindow, nm_size=3*slidingWindow)
x = data
clf.fit(x)
score = clf.decision_scores_
score = MinMaxScaler(feature_range=(0,1)).fit_transform(score.reshape(-1,1)).ravel()
score = np.array([score[0]]*((slidingWindow-1)//2) + list(score) + [score[-1]]*((slidingWindow-1)//2))
elif method == 'PCA':
clf = PCA()
x = X_data
clf.fit(x)
score = clf.decision_scores_
score = MinMaxScaler(feature_range=(0,1)).fit_transform(score.reshape(-1,1)).ravel()
score = np.array([score[0]]*math.ceil((slidingWindow-1)/2) + list(score) + [score[-1]]*((slidingWindow-1)//2))
elif method == 'POLY':
clf = POLY(power=3, window = slidingWindow)
x = data
clf.fit(x)
measure = Fourier()
measure.detector = clf
measure.set_param()
clf.decision_function(measure=measure)
score = clf.decision_scores_
score = MinMaxScaler(feature_range=(0,1)).fit_transform(score.reshape(-1,1)).ravel()
elif method == 'OCSVM':
X_train_ = MinMaxScaler(feature_range=(0,1)).fit_transform(X_train.T).T
X_test_ = MinMaxScaler(feature_range=(0,1)).fit_transform(X_test.T).T
clf = OCSVM(nu=0.05)
clf.fit(X_train_, X_test_)
score = clf.decision_scores_
score = np.array([score[0]]*math.ceil((slidingWindow-1)/2) + list(score) + [score[-1]]*((slidingWindow-1)//2))
score = MinMaxScaler(feature_range=(0,1)).fit_transform(score.reshape(-1,1)).ravel()
elif method == 'LSTM':
clf = lstm(slidingwindow = slidingWindow, predict_time_steps=1, epochs = 50, patience = 5, verbose=0)
clf.fit(data_train, data_test)
measure = Fourier()
measure.detector = clf
measure.set_param()
clf.decision_function(measure=measure)
score = clf.decision_scores_
score = MinMaxScaler(feature_range=(0,1)).fit_transform(score.reshape(-1,1)).ravel()
elif method == 'AE':
clf = AE_MLP2(slidingWindow = slidingWindow, epochs=100, verbose=0)
clf.fit(data_train, data_test)
score = clf.decision_scores_
score = MinMaxScaler(feature_range=(0,1)).fit_transform(score.reshape(-1,1)).ravel()
elif method == 'CNN':
clf = cnn(slidingwindow = slidingWindow, predict_time_steps=1, epochs = 100, patience = 5, verbose=0)
clf.fit(data_train, data_test)
measure = Fourier()
measure.detector = clf
measure.set_param()
clf.decision_function(measure=measure)
score = clf.decision_scores_
score = MinMaxScaler(feature_range=(0,1)).fit_transform(score.reshape(-1,1)).ravel()
#end_time = time.time()
#time_exec = end_time - start_time
#print(method,"\t time: {}".format(time_exec))
methods_scores[method] = score
return methods_scores
from .utils.metrics import metricor
from .analysis.robustness_eval import generate_curve
def get_range_vus_roc(score, labels, slidingWindow):
grader = metricor()
R_AUC_ROC, R_AUC_PR, _, _, _ = grader.RangeAUC(labels=labels, score=score, window=slidingWindow, plot_ROC=True)
_, _, _, _, _, _,VUS_ROC, VUS_PR = generate_curve(labels, score, 2*slidingWindow)
metrics = {'R_AUC_ROC': R_AUC_ROC, 'R_AUC_PR': R_AUC_PR, 'VUS_ROC': VUS_ROC, 'VUS_PR': VUS_PR}
return metrics
from statsmodels.tsa.stattools import acf
from scipy.signal import argrelextrema
import numpy as np
import matplotlib.patches as mpatches
import matplotlib.pyplot as plt
# determine sliding window (period) based on ACF
def find_length(data):
if len(data.shape)>1:
return 0
data = data[:min(20000, len(data))]
base = 3
auto_corr = acf(data, nlags=400, fft=True)[base:]
local_max = argrelextrema(auto_corr, np.greater)[0]
try:
max_local_max = np.argmax([auto_corr[lcm] for lcm in local_max])
if local_max[max_local_max]<3 or local_max[max_local_max]>300:
return 125
return local_max[max_local_max]+base
    except (ValueError, IndexError):
        return 125