Python sklearn.svm 模块,OneClassSVM() 实例源码

我们从 Python 开源项目中提取了以下 15 个代码示例,用于说明如何使用 sklearn.svm.OneClassSVM()。

项目:AnomalyDetection    作者:JayZhuCoding    | 项目源码 | 文件源码
def optimize_training_parameters(self, n):
        """Grid-search ``nu`` and ``gamma`` for an RBF one-class SVM.

        One ``svm.OneClassSVM`` is fitted per (nu, gamma) pair on the first
        five columns of the rows returned by ``self.load_monitor_data`` for
        the window starting at ``self.min_timestamp`` and ending 365 days plus
        one hour later.  For each pair, the fraction of training rows that are
        *not* predicted as ``1`` is compared with ``nu``; the pair with the
        smallest absolute difference is kept.  Every evaluated pair is also
        written to ``training_param.csv`` (columns ``nu,gamma,diff``).

        :param n: number of evenly spaced candidates for each of ``nu``
            (1e-5 .. 1e-2) and ``gamma`` (1e-6 .. 1e-3); ``n * n`` models
            are fitted
        :return: tuple ``(opt_nu, opt_gamma)``; both are ``None`` if no pair
            produced a difference below the initial value of 1.0
            (for example when ``n`` is 0)
        """
        # data
        from_timestamp = self.min_timestamp
        to_timestamp = (self.min_timestamp
                        + datetime.timedelta(days=365)
                        + datetime.timedelta(hours=1))
        _, train_values = self.load_monitor_data(from_timestamp, to_timestamp, "1")
        train_data = np.array(train_values)[:, 0:5]

        # parameters
        nu_candidates = np.linspace(start=1e-5, stop=1e-2, num=n)
        gamma_candidates = np.linspace(start=1e-6, stop=1e-3, num=n)
        opt_diff = 1.0
        opt_nu = None
        opt_gamma = None

        # The context manager guarantees the CSV is flushed and closed even
        # if fitting or predicting raises part-way through the grid.
        with open("training_param.csv", "w") as fw:
            fw.write("nu,gamma,diff\n")
            for nu in nu_candidates:
                for gamma in gamma_candidates:
                    classifier = svm.OneClassSVM(kernel="rbf", nu=nu, gamma=gamma)
                    classifier.fit(train_data)
                    label = classifier.predict(train_data)
                    # fraction of training rows not labelled 1
                    p = 1 - float(np.count_nonzero(label == 1.0)) / len(label)
                    diff = math.fabs(p - nu)
                    if diff < opt_diff:
                        opt_diff = diff
                        opt_nu = nu
                        opt_gamma = gamma
                    fw.write(",".join([str(nu), str(gamma), str(diff)]) + "\n")
        return opt_nu, opt_gamma
项目:Parallel-SGD    作者:angadgill    | 项目源码 | 文件源码
def test_oneclass_decision_function():
    """Check that OneClassSVM.predict agrees with the sign of decision_function.

    Trains on two Gaussian blobs centred at (2, 2) and (-2, -2), then checks
    that more than 90% of fresh blob samples are predicted as 1, more than 90%
    of uniformly drawn samples are predicted as -1, and that
    ``decision_function(...) > 0`` matches ``predict(...) == 1`` on both sets.
    """
    # The original also built an unused default OneClassSVM() here; it was
    # overwritten before any use, so it has been dropped.
    rnd = check_random_state(2)

    # Generate train data
    X = 0.3 * rnd.randn(100, 2)
    X_train = np.r_[X + 2, X - 2]

    # Generate some regular novel observations
    X = 0.3 * rnd.randn(20, 2)
    X_test = np.r_[X + 2, X - 2]
    # Generate some abnormal novel observations
    X_outliers = rnd.uniform(low=-4, high=4, size=(20, 2))

    # fit the model
    clf = svm.OneClassSVM(nu=0.1, kernel="rbf", gamma=0.1)
    clf.fit(X_train)

    # predict things
    y_pred_test = clf.predict(X_test)
    assert_greater(np.mean(y_pred_test == 1), .9)
    y_pred_outliers = clf.predict(X_outliers)
    assert_greater(np.mean(y_pred_outliers == -1), .9)

    # the sign of the decision function must match the predicted label
    dec_func_test = clf.decision_function(X_test)
    assert_array_equal((dec_func_test > 0).ravel(), y_pred_test == 1)
    dec_func_outliers = clf.decision_function(X_outliers)
    assert_array_equal((dec_func_outliers > 0).ravel(), y_pred_outliers == 1)
项目:AnomalyDetection    作者:JayZhuCoding    | 项目源码 | 文件源码
def detect_outlier(data_train, measurement):
    """
    Detect whether the input measurement is outlier or not.

    A new RBF one-class SVM (nu=0.005, gamma=0.00001) is fitted on
    ``data_train`` on every call and then used to classify ``measurement``.

    :param data_train: data for training the one class SVM model
    :param measurement: one row from the chill_untested.csv; a 1-D sequence
        of feature values is treated as a single sample, any other input is
        handed to ``predict`` unchanged
    :return: predicted label for input measurement (the first element of
        the ``predict`` output)
    """
    import numpy as np  # local import keeps this function self-contained

    # scikit-learn estimators expect X shaped (n_samples, n_features);
    # a bare 1-D row is rejected by current releases, so lift it to one sample.
    if np.ndim(measurement) == 1:
        measurement = np.reshape(measurement, (1, -1))

    classifier = svm.OneClassSVM(kernel="rbf", nu=0.005, gamma=0.00001)
    classifier.fit(data_train)
    return classifier.predict(measurement)[0]
项目:kboc    作者:vmonaco    | 项目源码 | 文件源码
def fit(self, X):
        """Fit an RBF one-class SVM (nu=0.5, gamma=0.9) on ``X``.

        The fitted estimator is stored as ``self.clf``; nothing is returned.
        """
        detector = svm.OneClassSVM(kernel="rbf", gamma=0.9, nu=0.5)
        detector.fit(X)
        # publish the estimator only after fitting has succeeded
        self.clf = detector
项目:outlier_detection    作者:AloneGu    | 项目源码 | 文件源码
def __init__(self, param_dict=None):
        """Build the wrapped ``OneClassSVM`` from a dict of keyword arguments.

        :param param_dict: keyword arguments forwarded to ``OneClassSVM``;
            ``None`` (the default) means "no arguments".  A dict supplied by
            the caller is stored on the instance as-is (not copied).
        """
        # A literal ``{}`` default would be a single dict shared by every
        # instance created without arguments, because it is stored on self.
        if param_dict is None:
            param_dict = {}
        self.param_dict = param_dict
        # Single-argument form prints the same text on Python 2 and Python 3.
        print("%s %s" % (self.__class__.__name__, self.param_dict))
        self.cls = OneClassSVM(**param_dict)
项目:monasca-analytics    作者:openstack    | 项目源码 | 文件源码
def learn_structure(self, samples):
        """Fit a one-class SVM detector on a train/test split of ``samples``.

        The split comes from ``self._generate_train_test_sets(samples, 0.75)``
        (0.75 is presumably the training fraction; confirm against that
        helper).  The detector is fitted on the training part, and the number
        of test rows predicted as ``-1`` is logged.

        :param samples: data handed to ``self._generate_train_test_sets``
        :return: the fitted ``svm.OneClassSVM`` instance
        """
        X_train, X_test = self._generate_train_test_sets(samples, 0.75)
        # The original message lacked a space before "samples"
        # ("Training with 75samples; ...").  Lazy %-args also avoid building
        # the string when INFO logging is disabled.
        logger.info("Training with %d samples; testing with %d samples.",
                    len(X_train), len(X_test))
        svm_detector = svm.OneClassSVM(nu=0.95 * OUTLIERS_FRACTION + 0.05,
                                       kernel="rbf", gamma=0.1)
        svm_detector.fit(X_train)
        Y_test = svm_detector.predict(X_test)
        num_anomalies = Y_test[Y_test == -1].size
        logger.info("Found %d anomalies in testing set", num_anomalies)
        return svm_detector
项目:monasca-analytics    作者:openstack    | 项目源码 | 文件源码
def test_learn_structure(self):
        """``learn_structure`` must hand back an ``svm.OneClassSVM`` instance."""
        samples = self.get_testing_data()
        trained = self.svm.learn_structure(samples)
        self.assertIsInstance(trained, svm.OneClassSVM)
项目:EasyMKL    作者:jmikko    | 项目源码 | 文件源码
def fit(self):
        global isFitted
        isFitted = True
        print "fit the model"
        train = np.array(self.model.data)
        X = train[:, 0:2]
        y = train[:, 2]

        lam = float(self.complexity.get())
        gamma = float(self.gamma.get())
        coef0 = float(self.coef0.get())
        degree = int(self.degree.get())
        kernel_map = {0: "linear", 1: "rbf", 2: "poly"}
        #if len(np.unique(y)) == 1:
        #    clf = svm.OneClassSVM(kernel=kernel_map[self.kernel.get()],
        #              gamma=gamma, coef0=coef0, degree=degree)
        #    clf.fit(X)
        #else:
        #mysvm = svm.SVC(kernel=kernel_map[self.kernel.get()], C=1000,
        #                  gamma=gamma, coef0=coef0, degree=degree)
        #mysvm.fit(X, y)
        #l = 0.1;
        clf = komd.KOMD(lam=lam, Kf=kernel_map[self.kernel.get()], rbf_gamma=gamma, poly_deg=degree, poly_coeff=coef0)

        clf.fit(X,y)
        #print clf.gamma
        #global gamma, bias
        #gamma = clf.gamma
        #bias = clf.bias

        if hasattr(clf, 'score'):
            print "Accuracy:", clf.score(X, y) * 100
        X1, X2, Z = self.decision_surface(clf)
        self.model.clf = clf
        #self.model.svm = mysvm
        self.clf = clf
        #self.mysvm = mysvm
        self.model.set_surface((X1, X2, Z))
        self.model.surface_type = self.surface_type.get()
        self.fitted = True
        self.model.changed("surface")
项目:Parallel-SGD    作者:angadgill    | 项目源码 | 文件源码
def fit(self):
        """Fit an SVM on the points held by ``self.model`` and redraw.

        Columns 0-1 of ``self.model.data`` are the features and column 2 the
        target.  If the target holds a single distinct value, an
        ``svm.OneClassSVM`` is fitted on the features alone; otherwise an
        ``svm.SVC`` is fitted on features and target.  The estimator and its
        decision surface are then stored on ``self.model`` and a "surface"
        change is signalled.
        """
        print("fit the model")
        points = np.array(self.model.data)
        X, y = points[:, 0:2], points[:, 2]

        C = float(self.complexity.get())
        # keyword arguments common to both estimator types
        shared = {
            "gamma": float(self.gamma.get()),
            "coef0": float(self.coef0.get()),
            "degree": int(self.degree.get()),
        }
        kernel_map = {0: "linear", 1: "rbf", 2: "poly"}

        single_class = len(np.unique(y)) == 1
        shared["kernel"] = kernel_map[self.kernel.get()]
        if single_class:
            clf = svm.OneClassSVM(**shared)
            clf.fit(X)
        else:
            clf = svm.SVC(C=C, **shared)
            clf.fit(X, y)

        # report accuracy only for estimators that expose score()
        if hasattr(clf, 'score'):
            print("Accuracy:", clf.score(X, y) * 100)

        X1, X2, Z = self.decision_surface(clf)
        self.model.clf = clf
        self.model.set_surface((X1, X2, Z))
        self.model.surface_type = self.surface_type.get()
        self.fitted = True
        self.model.changed("surface")
项目:Parallel-SGD    作者:angadgill    | 项目源码 | 文件源码
def test_oneclass():
    """Check a default OneClassSVM fitted on the module fixtures ``X`` / ``T``.

    Verifies the predictions for ``T``, the fitted ``intercept_`` and
    ``dual_coef_`` (to three decimals), and that reading ``coef_`` raises
    ``ValueError``.
    """
    model = svm.OneClassSVM()
    model.fit(X)
    predicted = model.predict(T)

    assert_array_almost_equal(predicted, [-1, -1, -1])
    assert_array_almost_equal(model.intercept_, [-1.008], decimal=3)

    expected_dual_coef = [[0.632, 0.233, 0.633, 0.234, 0.632, 0.633]]
    assert_array_almost_equal(model.dual_coef_, expected_dual_coef, decimal=3)

    # attribute access itself must raise
    assert_raises(ValueError, getattr, model, "coef_")
项目:Parallel-SGD    作者:angadgill    | 项目源码 | 文件源码
def test_immutable_coef_property():
    """Primal ``coef_`` of linear-kernel SVMs must reject modification.

    For each fitted estimator, assigning to ``coef_`` must raise
    ``AttributeError`` and writing an element of ``coef_`` in place must
    raise ``RuntimeError`` or ``ValueError``.
    """
    supervised = (svm.SVC, svm.NuSVC, svm.SVR, svm.NuSVR)
    fitted = [estimator(kernel='linear').fit(iris.data, iris.target)
              for estimator in supervised]
    # OneClassSVM is unsupervised, so it is fitted without a target
    fitted.append(svm.OneClassSVM(kernel='linear').fit(iris.data))

    for model in fitted:
        assert_raises(AttributeError, setattr, model, 'coef_', np.arange(3))
        coef = model.coef_
        assert_raises((RuntimeError, ValueError),
                      coef.__setitem__, (0, 0), 0)
项目:Parallel-SGD    作者:angadgill    | 项目源码 | 文件源码
def check_svm_model_equal(dense_svm, sparse_svm, X_train, y_train, X_test):
    """Assert that dense- and sparse-trained SVMs are equivalent.

    ``dense_svm`` is fitted on ``X_train.toarray()`` and ``sparse_svm`` on
    ``X_train`` itself.  Support vectors, dual coefficients, support indices,
    predictions and decision values (plus ``coef_`` for a linear kernel and
    ``predict_proba`` for anything that is not a ``OneClassSVM``) must agree.
    When ``X_test`` is a sparse matrix, predicting with the dense-trained
    model on it must raise ``ValueError`` with the expected message.
    """
    dense_svm.fit(X_train.toarray(), y_train)
    test_is_sparse = sparse.isspmatrix(X_test)
    X_test_dense = X_test.toarray() if test_is_sparse else X_test
    sparse_svm.fit(X_train, y_train)

    # fitted attributes: the sparse model keeps them sparse, values must match
    assert_true(sparse.issparse(sparse_svm.support_vectors_))
    assert_true(sparse.issparse(sparse_svm.dual_coef_))
    assert_array_almost_equal(dense_svm.support_vectors_,
                              sparse_svm.support_vectors_.toarray())
    assert_array_almost_equal(dense_svm.dual_coef_,
                              sparse_svm.dual_coef_.toarray())
    if dense_svm.kernel == "linear":
        assert_true(sparse.issparse(sparse_svm.coef_))
        assert_array_almost_equal(dense_svm.coef_,
                                  sparse_svm.coef_.toarray())
    assert_array_almost_equal(dense_svm.support_, sparse_svm.support_)

    # predictions and decision values
    assert_array_almost_equal(dense_svm.predict(X_test_dense),
                              sparse_svm.predict(X_test))
    dense_decision = dense_svm.decision_function(X_test_dense)
    assert_array_almost_equal(dense_decision,
                              sparse_svm.decision_function(X_test))
    assert_array_almost_equal(dense_decision,
                              sparse_svm.decision_function(X_test_dense))

    if not isinstance(dense_svm, svm.OneClassSVM):
        assert_array_almost_equal(dense_svm.predict_proba(X_test_dense),
                                  sparse_svm.predict_proba(X_test), 4)
        msg = "cannot use sparse input in 'SVC' trained on dense data"
    else:
        msg = "cannot use sparse input in 'OneClassSVM' trained on dense data"

    if test_is_sparse:
        assert_raise_message(ValueError, msg, dense_svm.predict, X_test)
项目:AnomalyDetection    作者:JayZhuCoding    | 项目源码 | 文件源码
def predict(self, nu, gamma):
        """Label the test window row by row and write it to the monitor file.

        Training rows are loaded for the window from ``self.min_timestamp`` to
        365 days plus one hour later; test rows for the window from 365 days
        after ``self.min_timestamp`` to ``self.max_timestamp``.  Only the
        first five columns of a row are used as features.  Test rows are
        classified one at a time, and every row predicted as ``1`` is appended
        to the training rows, so later rows are judged by a model that has
        seen it.  For each test row, index 5 receives the integer label and
        index 6 receives 0.0 (label 1) or 1.0 (any other label).  Finally the
        lines of ``self.monitor_file`` whose timestamp matches a test row are
        replaced with the updated values; all other lines are kept verbatim.

        :param nu: ``nu`` passed to ``svm.OneClassSVM``
        :param gamma: RBF ``gamma`` passed to ``svm.OneClassSVM``
        :return: None
        """
        # classifier
        classifier = svm.OneClassSVM(kernel="rbf", nu=nu, gamma=gamma)
        # data for test
        from_timestamp = self.min_timestamp + datetime.timedelta(days=365)
        to_timestamp = self.max_timestamp
        test_timestamps, test_values = self.load_monitor_data(from_timestamp, to_timestamp, "nan")
        test_data = np.array(test_values)[:, 0:5]
        # data for train
        to_timestamp = (self.min_timestamp
                        + datetime.timedelta(days=365)
                        + datetime.timedelta(hours=1))
        _, train_values = self.load_monitor_data(self.min_timestamp, to_timestamp, "1")

        # Refit only when the training set has actually grown: fitting again
        # on identical data yields the same model and only costs time.
        needs_fit = True
        for i, timestamp in enumerate(test_timestamps):
            if needs_fit:
                classifier.fit(np.array(train_values)[:, 0:5])
                needs_fit = False
            # slice (not index) so predict() receives a (1, n_features) array
            label = classifier.predict(test_data[i:i + 1])[0]
            row = test_values[i]
            row[5] = int(label)
            if label == 1:
                row[6] = 0.0
                train_values.append(row)
                needs_fit = True
            else:
                row[6] = 1.0
            # Single-argument form prints the same text on Python 2 and 3.
            print("%s %s %s" % (timestamp, label, row))

        # write result into monitor file
        with open(self.monitor_file, "r") as fr:
            header = fr.readline()
            lines = fr.readlines()

        # First occurrence wins, matching list.index().
        # NOTE(review): assumes the timestamps are hashable datetime values.
        row_by_timestamp = {}
        for timestamp, row in zip(test_timestamps, test_values):
            row_by_timestamp.setdefault(timestamp, row)

        # Build the complete new content before reopening the file for
        # writing, so a malformed line cannot leave it truncated.
        output = [header]
        for line in lines:
            timestamp = datetime.datetime.strptime(line.strip().split(",")[0], "%Y-%m-%d %H:%M:%S")
            value = row_by_timestamp.get(timestamp)
            if value is None:
                output.append(line)
                continue
            # columns: timestamp, temperature, ph, conductivity, orp, do,
            # label, outlier_prob, event_prob
            m = [str(timestamp)]
            m.extend(str(v) for v in value[0:5])
            m.extend([str(int(value[5])), str(value[6]), str(value[7])])
            output.append(",".join(m) + "\n")

        with open(self.monitor_file, "w") as fw:  # update monitor file
            fw.writelines(output)
项目:satoshi-mission    作者:lilychai    | 项目源码 | 文件源码
def __init__(self, num_class=2):
        """
        :type num_class: int
        :rtype: None
        :
        : Reads the database name from '../../.dbname', sets the collection
        : and table names, builds a one-step classifier pipeline (OneClassSVM
        : when num_class is 1, SVC otherwise) and creates the DBIO handle.
        :
        """

        self.__ctrl__ = None
        self.__case__ = None

        with open('../../.dbname', 'r') as f:
            self.__DB_NAME__ = json.load(f)['dbname']

        # MongoDB collections
        self.__MG_DOCS_COLL__ = 'raw-docs'             # raw docs
        self.__MG_SENTS_COLL__ = 'bag-of-sents'        # raw sentences
        self.__MG_TOKENS_COLL__ = 'sample-tokens'      # clean tokens (words)
        # PostgreSQL tables
        self.__PG_STATS_TBL__ = 'stats'                # stylometric features
        self.__PG_RESULTS_TBL__ = ''.join(('results_',
                                           str(num_class),
                                           'class'))   # cross val results
        self.__PG_PROBAS_TBL__ = 'probabilities'       # cross val probabilities

        # ``num_class - 1`` is falsy only for a single class.
        if num_class - 1:
            classifier = SVC(probability=True,
                             kernel='poly',
                             degree=2,
                             class_weight='balanced')
        else:
            classifier = OneClassSVM(kernel='rbf',
                                     nu=0.7,
                                     gamma=1./250)

        # No scaler step is included; the pipeline holds only the classifier.
        self.__model__ = Pipeline([('classifier', classifier)])

        print('Instantiated classifier %s.' %
              self.__model__.named_steps['classifier'].__class__.__name__)

        self.__io__ = DBIO(MG_DB_NAME=self.__DB_NAME__,
                           PG_DB_NAME=self.__DB_NAME__)

        self.__tagger__ = None     # initialise if re-creating samples
        self.__bootstrap__ = None  # initialise in fit
项目:satoshi-mission    作者:lilychai    | 项目源码 | 文件源码
def fit(self, author1, author2, wts1=None, wts2=None,
                              bootstrap=False, verbose=False):
        """
        :type author1: str
        :type author2: str
        :type wts1: str/List[str]
        :type wts2: str/List[str]
        :type verbose:bool
        :rtype: bool
        :
        : Prepares databases and tables/collections.
        :
        """

        self.__bootstrap__ = bootstrap

        cases = []
        for i, (author, wts) in enumerate([(author1, wts1), (author2, wts2)]):
            if not wts:
                wts = [wt.encode('ascii') \
                       for wt in self.__io__.mg_distinct(self.__MG_DOCS_COLL__,
                                                         'type',
                                                         { 'author':author } )]

            if not isinstance(wts, list):
                wts = [wts]

            cases += (author, wts, (1,-1)[i]),   # use 1, -1 to match output
                                                 # from sklearn's OneClassSVM


        self.__ctrl__ = cases[0]    # assign label 1 in y vector
        self.__case__ = cases[1]    # assign be label 0 in y vector
        self.__MG_TOKENS_COLL__ += '-' + cases[0][0] + \
                                   '-' + cases[1][0] + \
                                   '-' + \
                                   ''.join(wt[:3] for wt in cases[0][1]) + \
                                   '-' + \
                                   ''.join(wt[:3] for wt in cases[1][1]) + \
                                   '-' + \
                                   ('nobs','bs')[bootstrap]

        self.__PG_STATS_TBL__   += '_' + cases[0][0] + \
                                   '_' + cases[1][0] + \
                                   '_' + \
                                   ''.join(wt[:3] for wt in cases[0][1]) + \
                                   '_' + \
                                   ''.join(wt[:3] for wt in cases[1][1]) + \
                                   '_' + \
                                   ('nobs','bs')[bootstrap]



        if verbose:
            print 'Control:', self.__ctrl__
            print 'Case:   ', self.__case__
            print 'Saving tokens to', self.__MG_TOKENS_COLL__
            print 'Saving stats to', self.__PG_STATS_TBL__

        return self.__prep_sents__(verbose=verbose) # err in preparing sentences