Python scipy.stats 模块,ks_2samp() 实例源码

我们从Python开源项目中,提取了以下20个代码示例,用于说明如何使用scipy.stats.ks_2samp()

项目:psp    作者:cmap    | 项目源码 | 文件源码
def ks_test_single(test_vals, bg_vals):
    """ Compute KS-test statistic for one pair of test values and background
    values.

    Args:
        test_vals (numpy array)
        bg_vals (numpy array)

    Returns:
        ks_stat (float)
        pval (float)

    """
    # Do KS-test
    try:
        (ks_stat, pval) = stats.ks_2samp(test_vals, bg_vals)

    # Return NaN if test fails
    except ValueError:
        logger.warning("KS-test failed.")
        ks_stat = np.nan
        pval = np.nan

    return ks_stat, pval
项目:pcog    作者:Ivan1931    | 项目源码 | 文件源码
def unfringe(self, alpha=0.05):
        """
        This method decides whether to expand the suffix trie onto the fringe.
        It does this by constructing a distribution of the utilities of all the current states and from one
        that includes the current state as well fringe nodes that are on the unofficial leaf of the tree.
        If the two distributions are sufficiently different then the state space is expanded to include all leaf nodes
        currently in the tree. The distribution comparison is performed using a KS test.
        """
        all_leaves = self.tree_leaves()
        all_leaves_dist = list(map(lambda leaf: self.utility(leaf), all_leaves))
        print(all_leaves_dist)
        current_leaves = self.get_states()
        current_dist = list(map(lambda leaf: self.utility(leaf), current_leaves))
        print(current_dist)
        D, p_value = test_result = ks_2samp(all_leaves_dist, current_dist)
        print(D)
        print(p_value)
        if p_value < alpha or alpha < D:
            for leaf in all_leaves:
                if leaf.is_fringe:
                    leaf.set_fringe(False)
                    self._correct_fringe(leaf)
            return True
        else:
            return False
项目:cgpm    作者:probcomp    | 项目源码 | 文件源码
def test_joint(kde_xz):
    # Simulate from the joint distribution of x,z (see
    # generate_real_nominal_data) and perform a KS tests at each of the
    # subpopulations at the six levels of z.

    data = np.asarray(kde_xz.data.values())
    indicators = sorted(set(data[:,1].astype(int)))
    joint_samples = kde_xz.simulate(-1, [0,1], N=len(data))
    _, ax = plt.subplots()
    ax.set_title('Joint Simulation')
    for t in indicators:
        # Plot original data.
        data_subpop = data[data[:,1] == t]
        ax.scatter(data_subpop[:,1], data_subpop[:,0], color=gu.colors[t])
        # Plot simulated data for indicator t.
        samples_subpop = [j[0] for j in joint_samples if j[1] == t]
        ax.scatter(
            np.add([t]*len(samples_subpop), .25), samples_subpop,
            color=gu.colors[t])
        # KS test.
        _, p = ks_2samp(data_subpop[:,0], samples_subpop)
        assert .05 < p
    ax.set_xlabel('z')
    ax.set_ylabel('x')
    ax.grid()
项目:cgpm    作者:probcomp    | 项目源码 | 文件源码
def test_conditional_indicator(kde_xz):
    # Simulate from the conditional distribution of x|z (see
    # generate_real_nominal_data) and perfrom a KS tests at each of the
    # subpopulations at the six levels of z.

    data = np.asarray(kde_xz.data.values())
    indicators = sorted(set(data[:,1].astype(int)))
    _, ax = plt.subplots()
    ax.set_title('Conditional Simulation Of X Given Indicator Z')
    for t in indicators:
        # Plot original data.
        data_subpop = data[data[:,1] == t]
        ax.scatter(data_subpop[:,1], data_subpop[:,0], color=gu.colors[t])
        # Plot simulated data.
        samples_subpop = [s[0] for s in
            kde_xz.simulate(-1, [0], {1:t}, None, N=len(data_subpop))]
        ax.scatter(
            np.repeat(t, len(data_subpop)) + .25,
            samples_subpop, color=gu.colors[t])
        # KS test.
        _, p = ks_2samp(data_subpop[:,0], samples_subpop)
        assert .1 < p
    ax.set_xlabel('z')
    ax.set_ylabel('x')
    ax.grid()
项目:cgpm    作者:probcomp    | 项目源码 | 文件源码
def test_joint(knn_xz):
    # Simulate from the joint distribution of x,z (see
    # generate_real_nominal_data) and perform a KS tests at each of the
    # subpopulations at the six levels of z.

    data = np.asarray(knn_xz.data.values())
    indicators = sorted(set(data[:,1].astype(int)))
    joint_samples = knn_xz.simulate(-1, [0,1], N=len(data))
    _, ax = plt.subplots()
    ax.set_title('Joint Simulation')
    for t in indicators:
        # Plot original data.
        data_subpop = data[data[:,1] == t]
        ax.scatter(data_subpop[:,1], data_subpop[:,0], color=gu.colors[t])
        # Plot simulated data for indicator t.
        samples_subpop = [j[0] for j in joint_samples if j[1] == t]
        ax.scatter(
            np.add([t]*len(samples_subpop), .25), samples_subpop,
            color=gu.colors[t])
        # KS test.
        pvalue = ks_2samp(data_subpop[:,0], samples_subpop)[1]
        assert .05 < pvalue
    ax.set_xlabel('z')
    ax.set_ylabel('x')
    ax.grid()
项目:cgpm    作者:probcomp    | 项目源码 | 文件源码
def test_joint(state):
    # Simulate from the joint distribution of (x,z).
    joint_samples = state.simulate(-1, [0,1], N=N_SAMPLES)
    _, ax = plt.subplots()
    ax.set_title('Joint Simulation')
    for t in INDICATORS:
        # Plot original data.
        data_subpop = DATA[DATA[:,1] == t]
        ax.scatter(data_subpop[:,1], data_subpop[:,0], color=gu.colors[t])
        # Plot simulated data for indicator t.
        samples_subpop = [j[0] for j in joint_samples if j[1] == t]
        ax.scatter(
            np.add([t]*len(samples_subpop), .25), samples_subpop,
            color=gu.colors[t])
        # KS test.
        pvalue = ks_2samp(data_subpop[:,0], samples_subpop)[1]
        assert .05 < pvalue
    ax.set_xlabel('Indicator')
    ax.set_ylabel('x')
    ax.grid()
项目:cgpm    作者:probcomp    | 项目源码 | 文件源码
def test_conditional_indicator(state):
    # Simulate from the conditional X|Z
    _, ax = plt.subplots()
    ax.set_title('Conditional Simulation Of Data X Given Indicator Z')
    for t in INDICATORS:
        # Plot original data.
        data_subpop = DATA[DATA[:,1] == t]
        ax.scatter(data_subpop[:,1], data_subpop[:,0], color=gu.colors[t])
        # Plot simulated data.
        samples_subpop = [s[0] for s in
            state.simulate(-1, [0], {1:t}, None, len(data_subpop))]
        ax.scatter(
            np.repeat(t, len(data_subpop)) + .25,
            samples_subpop, color=gu.colors[t])
        # KS test.
        pvalue = ks_2samp(data_subpop[:,0], samples_subpop)[1]
        assert .01 < pvalue
    ax.set_xlabel('Indicator')
    ax.set_ylabel('x')
    ax.grid()
项目:Default-Credit-Card-Prediction    作者:AlexPnt    | 项目源码 | 文件源码
def kolmogorov_smirnov_two_sample_test(X,y):
    """
    Performs the two sample Kolmogorov-Smirnov test, testing wheter feature values of each class are drawn from identical distributions

    Keyword arguments:
    X -- The feature vectors
    y -- The target vector
    """

    kolmogorov_smirnov=[[(0,0)]]*len(X[0])
    # print kolmogorov_smirnov
    for feature_col in xrange(len(X[0])):
            ks_test_statistic,p_value=stats.ks_2samp(X[y==0,feature_col],X[y==1,feature_col])
            kolmogorov_smirnov[feature_col]=(ks_test_statistic,p_value)

    #debug
    for f in xrange(23):
        print kolmogorov_smirnov[f]

    return kolmogorov_smirnov
项目:numerai    作者:gansanay    | 项目源码 | 文件源码
def kolmogorov_smirnov(x_train, x_test):
    r = []
    p = []
    for c in x_train.columns:
        r_, p_ = ks_2samp(x_train[c], x_test[c])
        r.append(r_)
        p.append(p_)
    dfks = pd.DataFrame(index=range(1, 1 + len(x_train.columns)))
    dfks['KS'] = r
    dfks['KS_p'] = p
    return dfks
项目:bnn-analysis    作者:myshkov    | 项目源码 | 文件源码
def ks_distance(p_samples, q_samples):
    if isinstance(p_samples, tuple):
        idx, p_samples = p_samples

    return sc.ks_2samp(p_samples, q_samples)[0]
项目:sparse-digraph-generator    作者:papoudakis    | 项目源码 | 文件源码
def optimize_bollobas(graph):
    num_edges = len(graph.edges())
    in_degree_or = sorted(graph.in_degree().values())
    out_degree_or = sorted(graph.out_degree().values())
    cdf_in_emp = to_cumulative(in_degree_or)
    cdf_out_emp = to_cumulative(out_degree_or)
    alpha = 0.1
    beta = 0.1
    best_Ks = np.inf
    best_graph = None
    best_alpha = -1
    best_beta = -1
    while alpha < 1.0:
        while alpha + beta < 1.0 - 1e-6:
            gen_graph = bollobas(num_edges, alpha, beta)
            in_degree_gen = sorted(gen_graph.in_degree().values())
            out_degree_gen = sorted(gen_graph.out_degree().values())
            cdf_in_gen = to_cumulative(in_degree_gen)
            cdf_out_gen = to_cumulative(out_degree_gen)

            temp_Ks = max(ks_2samp(cdf_in_gen, cdf_in_emp)[0], ks_2samp(cdf_out_gen, cdf_out_emp)[0])
            if temp_Ks < best_Ks:
                best_graph = gen_graph
                best_Ks = temp_Ks
                best_alpha = alpha
                best_beta = beta
            beta += 0.1
        alpha += 0.1
        beta = 0.1
    return best_graph, best_alpha, best_beta
项目:dc_stat_think    作者:justinbois    | 项目源码 | 文件源码
def test_ks_stat(x):
    theor_data = np.random.normal(0, 1, size=100)
    correct, _ = st.ks_2samp(x, theor_data)
    assert np.isclose(dcst.ks_stat(x, theor_data), correct)

    theor_data = np.random.exponential(1, size=100)
    correct, _ = st.ks_2samp(x, theor_data)
    assert np.isclose(dcst.ks_stat(x, theor_data), correct)

    theor_data = np.random.logistic(0, 1, size=100)
    correct, _ = st.ks_2samp(x, theor_data)
    assert np.isclose(dcst.ks_stat(x, theor_data), correct)
项目:dc_stat_think    作者:justinbois    | 项目源码 | 文件源码
def test_pandas_conversion(seed):
    df = pd.DataFrame({'a': [3, 2, 1, 4],
                       'b': [8, 6, 7, 5],
                       'c': [9.1, 10.1, 11.1, np.nan]})

    x, y = dcst.ecdf(df.loc[:, 'a'])
    assert (x == np.array([1, 2, 3, 4])).all()
    assert (y == np.array([0.25, 0.5, 0.75, 1.0])).all()

    x, y = dcst.ecdf(df.loc[:, 'c'])
    assert np.allclose(x, np.array([9.1, 10.1, 11.1]))
    assert np.allclose(y, np.array([1/3, 2/3, 1.0]))

    df = pd.DataFrame({
        'a': np.concatenate((np.random.normal(0, 1, size=10), [np.nan]*990)),
        'b': np.random.normal(0, 1, size=1000)})
    correct, _ = st.ks_2samp(df['a'].dropna(), df['b'])
    assert np.isclose(dcst.ks_stat(df['a'], df['b']), correct)

    df = pd.DataFrame({
        'a': np.concatenate((np.random.normal(0, 1, size=80), [np.nan]*20)),
        'b': np.random.normal(0, 1, size=100)})
    dcst_private._seed_numba(seed)
    correct = dcst.draw_bs_reps(df['a'].values, np.mean, size=100)
    dcst_private._seed_numba(seed)
    assert np.allclose(dcst.draw_bs_reps(df['a'], np.mean, size=100), correct,
                       atol=atol)

    dcst_private._seed_numba(seed)
    correct = dcst.draw_bs_reps(df['b'].values, np.mean, size=100)
    dcst_private._seed_numba(seed)
    assert np.allclose(dcst.draw_bs_reps(df['b'], np.mean, size=100), correct,
                       atol=atol)

    dcst_private._seed_numba(seed)
    correct = dcst.draw_perm_reps(df['a'].values, df['b'].values,
                                  dcst.diff_of_means, size=100)
    dcst_private._seed_numba(seed)
    assert np.allclose(dcst.draw_perm_reps(df['a'], df['b'], 
                       dcst.diff_of_means, size=100), correct, atol=atol)
项目:ModelFlow    作者:yuezPrincetechs    | 项目源码 | 文件源码
def cal_ks(y,y_prob,pos_label=1,return_split=False,decimals=0):
    '''
    ??KS????????
    y: ?????series?????????{0,1}?{-1,1}??
    y_prob: ?????dataframe???????????????????????????????????
            ?????????series?????????dataframe?????
    pos_label: int?????positive?????
    return_split: ??????????
    decimals: ?????????
    ??KS??????????????sklearn???????
    '''
    y=pd.Series(pd.Series(y).values)
    if len(y_prob.shape)==1:
        y_pred=pd.Series(pd.Series(y_prob).values)
    else:
        y_pred=pd.Series(pd.DataFrame(y_prob).iloc[:,1].values)
    Bad=y_pred[y==pos_label]
    Good=y_pred[y!=pos_label]
    ks, pvalue = stats.ks_2samp(Bad.values, Good.values)
    if not return_split:
        return ks
    crossfreq=pd.crosstab(y_pred.round(decimals),y)
    crossdens = crossfreq.cumsum(axis=0) / crossfreq.sum()
    crossdens['gap'] = abs(crossdens[0] - crossdens[1])
    score_split = crossdens[crossdens['gap'] == crossdens['gap'].max()].index[0]
    return score_split
项目:cgpm    作者:probcomp    | 项目源码 | 文件源码
def test_univariate_two_sample(i):
    # This test ensures posterior sampling of uni/bimodal dists on R. When the
    # plot is shown, a density curve overlays the samples which is useful for
    # seeing that logpdf/simulate agree.
    N_SAMPLES = 100

    rng = gu.gen_rng(2)
    # Synthetic samples.
    samples_train = SAMPLES[i](N_SAMPLES, rng)
    samples_test = SAMPLES[i](N_SAMPLES, rng)
    # Univariate KDE.
    kde = MultivariateKde([3], None, distargs={O: {ST: [N], SA:[{}]}}, rng=rng)
    # Incorporate observations.
    for rowid, x in enumerate(samples_train):
        kde.incorporate(rowid, {3: x})
    # Run inference.
    kde.transition()
    # Generate posterior samples.
    samples_gen = [s[3] for s in kde.simulate(-1, [3], N=N_SAMPLES)]
    # Plot comparison of all train, test, and generated samples.
    fig, ax = plt.subplots()
    ax.scatter(samples_train, [0]*len(samples_train), color='b', label='Train')
    ax.scatter(samples_gen, [1]*len(samples_gen), color='r', label='KDE')
    ax.scatter(samples_test, [2]*len(samples_test), color='g', label='Test')
    # Overlay the density function.
    xs = np.linspace(ax.get_xlim()[0], ax.get_xlim()[1], 200)
    pdfs = [kde.logpdf(-1, {3: x}) for x in xs]
    # Convert the pdfs from the range to 1 to 1.5 by rescaling.
    pdfs_plot = np.exp(pdfs)+1
    pdfs_plot = (pdfs_plot/max(pdfs_plot)) * 1.5
    ax.plot(xs, pdfs_plot, color='k')
    # Clear up some labels.
    ax.set_title('Univariate KDE Posterior versus Generator')
    ax.set_xlabel('x')
    ax.set_yticklabels([])
    # Show the plot.
    ax.grid()
    plt.close()
    # KS test
    _, p = ks_2samp(samples_test, samples_gen)
    assert .05 < p
项目:cgpm    作者:probcomp    | 项目源码 | 文件源码
def two_sample_test(cctype, X, Y):
    model = cu.cctype_class(cctype)
    if model.is_numeric(): # XXX WRONG CHOICE FOR DISCRETE NUMERIC XXX
        _, pval = ks_2samp(X, Y)
    else:
        Xb, Yb = aligned_bincount([X, Y])
        ignore = np.logical_and(Xb==0, Yb==0)
        Xb, Yb = Xb[np.logical_not(ignore)], Yb[np.logical_not(ignore)]
        Xb = Xb/float(sum(Xb)) * 1000
        Yb = Yb/float(sum(Yb)) * 1000
        _, pval = chisquare(Yb, f_exp=Xb)
    return pval
项目:perfume    作者:leifwalsh    | 项目源码 | 文件源码
def _ks_Z(a, b):
    result = stats.ks_2samp(a, b)
    n = len(a)
    m = len(b)
    return result.statistic / np.sqrt((n + m) / (n * m))
项目:psp    作者:cmap    | 项目源码 | 文件源码
def test_compute_connectivities(self):
        # External query against build
        test_df_index = pd.MultiIndex.from_arrays(
            [["A", "A", "B", "B"], ["A375", "A375", "A375", "A375"],
             ["A:A375", "A:A375", "B:A375", "B:A375"]], names=["pert", "cell", "aggregated"])
        test_df_columns = pd.MultiIndex.from_arrays(
            [["D", "D", "D", "E", "E", "E"], ["A375", "A375", "A375", "A375", "A375", "A375"],
             ["D:A375", "D:A375", "D:A375", "E:A375", "E:A375", "E:A375"]],
            names=["pert_iname", "cell", "aggregated2"])
        test_df = pd.DataFrame(
            [[0.1, -0.3, -0.1, -0.4, 0.6, -0.7],
             [0.5, -0.7, -0.2, -1, 0.4, 0.2],
             [-0.2, 0.3, 0.7, 0.1, 0.4, -0.9],
             [0.1, 0.4, 0.2, 0.6, 0.4, -0.1]],
            index=test_df_index, columns=test_df_columns)

        bg_df_index = pd.MultiIndex.from_arrays(
            [["A", "B", "A", "B", "C", "C"], ["A375", "A375", "A375", "A375", "A375", "A375"],
             ["A:A375", "B:A375", "A:A375", "B:A375", "C:A375", "C:A375"]],
            names=["pert", "cell", "bg_aggregated"])
        bg_df = pd.DataFrame(
            [[1.0, 0.5, 1.0, -0.4, 1.1, -0.6],
             [0.5, 1.0, 1.2, -0.8, -0.9, 0.4],
             [1.0, 1.2, 1.0, 0.1, 0.3, 1.3],
             [-0.4, -0.8, 0.1, 1.0, 0.5, -0.2],
             [1.1, -0.9, 0.3, 0.5, 1.0, 0.7],
             [-0.6, 0.4, 1.3, -0.2, 0.7, 1.0]],
            index=bg_df_index, columns=bg_df_index)

        A_bg = [0.5, 1.0, -0.4, 1.1, -0.6, 1.2, 0.1, 0.3, 1.3] # med = 0.4
        B_bg = [0.5, 1.2, -0.8, -0.9, 0.4, -0.4, 0.1, 0.5, -0.2] # med = 0.1
        (e_D_v_A, _) = stats.ks_2samp([0.1, -0.3, -0.1, 0.5, -0.7, -0.2], A_bg) # med = -1.5, so -
        (e_D_v_B, _) = stats.ks_2samp([-0.2, 0.3, 0.7, 0.1, 0.4, 0.2], B_bg) # med = 0.25, so +
        (e_E_v_A, _) = stats.ks_2samp([-0.4, 0.6, -0.7, -1, 0.4, 0.2], A_bg) # med = -0.1, so -
        (e_E_v_B, _) = stats.ks_2samp([0.1, 0.4, -0.9, 0.6, 0.4, -0.1], B_bg) # med = 0.25, so +

        e_conn_df_index = pd.MultiIndex.from_arrays(
            [["A", "B"], ["A375", "A375"], ["A:A375", "B:A375"]],
            names=["pert", "cell", "aggregated"])
        e_conn_df_columns = pd.MultiIndex.from_arrays(
            [["D", "E"], ["A375", "A375"], ["D:A375", "E:A375"]],
            names=["pert_iname", "cell", "aggregated2"])
        e_conn_df = pd.DataFrame(
            [[e_D_v_A, e_E_v_A], [e_D_v_B, e_E_v_B]], index=e_conn_df_index, columns=e_conn_df_columns)
        e_signed_conn_df = pd.DataFrame(
            [[-e_D_v_A, -e_E_v_A], [e_D_v_B, e_E_v_B]], index=e_conn_df_index, columns=e_conn_df_columns)

        (conn_df, signed_conn_df) = sip.compute_connectivities(
            test_df, bg_df, "aggregated2", "aggregated", "bg_aggregated", "ks_test", False)

        pd.util.testing.assert_frame_equal(conn_df, e_conn_df, (
            "\nconn_df:\n{}\ne_conn_df:\n{}").format(conn_df, e_conn_df))
        pd.util.testing.assert_frame_equal(signed_conn_df, e_signed_conn_df, (
            "\nsigned_conn_df:\n{}\ne_signed_conn_df:\n{}").format(
            signed_conn_df, e_signed_conn_df))

        # Check that assertion works
        with self.assertRaises(Exception) as e:
            sip.compute_connectivities(test_df, bg_df, "aggregated2", "aggregated", "bg_aggregated", "wtcs", False)
        self.assertIn("connectivity metric must be either ks_test or", str(e.exception))
项目:cgpm    作者:probcomp    | 项目源码 | 文件源码
def test_bivariate_conditional_two_sample(noise):
    # This test checks joint and conditional simulation of a bivarate normal
    # with (correlation 1-noise). The most informative use is plotting but
    # there is a numerical test for the conditional distributions.
    N_SAMPLES = 100

    rng = gu.gen_rng(2)
    # Synthetic samples.
    linear = Linear(outputs=[0,1], noise=noise, rng=rng)
    samples_train = np.asarray(
        [[s[0], s[1]] for s in linear.simulate(-1, [0,1], N=N_SAMPLES)])
    # Bivariate KDE.
    kde = MultivariateKde(
        [0,1], None, distargs={O: {ST: [N,N], SA:[{},{}]}}, rng=rng)
    # Incorporate observations.
    for rowid, x in enumerate(samples_train):
        kde.incorporate(rowid, {0: x[0], 1: x[1]})
    # Run inference.
    kde.transition()
    # Generate posterior samples from the joint.
    samples_gen = np.asarray(
        [[s[0],s[1]] for s in kde.simulate(-1, [0,1], N=N_SAMPLES)])
    # Plot comparisons of the joint.
    fig, ax = plt.subplots(nrows=1, ncols=2)
    plot_data = zip(
        ax, ['b', 'r'], ['Train', 'KDE'], [samples_train, samples_gen])
    for (a, c, l, s) in plot_data:
        a.scatter(s[:,0], s[:,1], color=c, label=l)
        a.grid()
        a.legend(framealpha=0)
    # Generate posterior samples from the conditional.
    xs = np.linspace(-3, 3, 100)
    cond_samples_a = np.asarray(
        [[s[1] for s in linear.simulate(-1, [1], {0: x0}, N=N_SAMPLES)]
        for x0 in xs])
    cond_samples_b = np.asarray(
        [[s[1] for s in kde.simulate(-1, [1], {0: x0}, N=N_SAMPLES)]
        for x0 in xs])
    # Plot the mean value on the same plots.
    for (a, s) in zip(ax, [cond_samples_a, cond_samples_b]):
        a.plot(xs, np.mean(s, axis=1), linewidth=3, color='g')
        a.set_xlim([-5,4])
        a.set_ylim([-5,4])
    plt.close('all')
    # Perform a two sample test on the means.
    mean_a = np.mean(cond_samples_a, axis=1)
    mean_b = np.mean(cond_samples_b, axis=1)
    _, p = ks_2samp(mean_a, mean_b)
    assert .01 < p
项目:Default-Credit-Card-Prediction    作者:AlexPnt    | 项目源码 | 文件源码
def kolmogorov_smirnov_two_sample_test(sample_a,sample_b):
    """
    Performs the two sample Kolmogorov-Smirnov test, testing wheter twoa samples are drawn from identical distributions

    Keyword arguments:
    sample_a -- The first sample
    sample_b -- The second sample
    """

    return stats.ks_2samp(sample_a,sample_b)