Python math 模块,nan 常量实例源码

我们从Python开源项目中,提取了以下17个代码示例,用于说明如何使用 math.nan(注意:math.nan 是一个浮点常量而不是函数,使用时不加括号)

项目:recommandation-film    作者:sambiak    | 项目源码 | 文件源码
def reccomandation(self, x):
        """Return the name of the unrated film with the highest predicted score.

        A one-row ratings array is built with NaN everywhere except at the
        indices of the films present in ``self.films``.  The predictions are
        ``np.dot(x, self._theta.T)``; the NaN positions are scanned for the
        largest value in column 0.  Intermediate values are printed along
        the way.

        :param x: left operand of the dot product with ``self._theta.T``;
            presumably the per-film feature matrix -- TODO confirm.
        :return: ``self.conv.renvoyer_nom_index`` of the selected index
            (index 0 when no candidate scores above 0).
        """
        ratings = np.array([[math.nan] * nombre_films()])
        for film in self.films:
            ratings[0][self.conv.renvoyer_index(film)] = self.films[film]

        predictions = np.dot(x, self._theta.T)
        print(predictions)

        best_index, best_score = 0, 0
        for index, rating in enumerate(ratings[0]):
            # Only films without a rating (NaN) are candidates.
            if not np.isnan(rating):
                continue
            if not predictions[index, 0] > best_score:
                continue
            # The value printed here is the previous best, before the update.
            print("film : ", self.conv.renvoyer_nom_index(index), "note :", best_score)
            best_score = predictions[index, 0]
            best_index = index

        print(predictions)
        print(self._theta)
        return self.conv.renvoyer_nom_index(best_index)
项目:recommandation-film    作者:sambiak    | 项目源码 | 文件源码
def reccomandation(self, x):
        """Pick the film to recommend: the unrated one with the top prediction.

        Ratings known from ``self.films`` are written into a one-row array
        that is otherwise NaN; predictions come from
        ``np.dot(x, self._theta.T)``.  Among the NaN positions the largest
        column-0 prediction wins.  Debug output is printed while scanning.

        :param x: left operand of the dot product with ``self._theta.T``;
            presumably the per-film feature matrix -- TODO confirm.
        :return: ``self.conv.renvoyer_nom_index`` of the winning index
            (index 0 when no candidate scores above 0).
        """
        known = np.array([[math.nan] * nombre_films()])
        for title in self.films:
            known[0][self.conv.renvoyer_index(title)] = self.films[title]

        scores = np.dot(x, self._theta.T)
        print(scores)

        top_index = 0
        top_score = 0
        for position, rating in enumerate(known[0]):
            unrated = np.isnan(rating)
            if unrated and scores[position, 0] > top_score:
                # Prints the best score seen so far, i.e. before this update.
                print("film : ", self.conv.renvoyer_nom_index(position), "note :", top_score)
                top_score = scores[position, 0]
                top_index = position

        print(scores)
        print(self._theta)
        return self.conv.renvoyer_nom_index(top_index)
项目:activityio    作者:jmackie4    | 项目源码 | 文件源码
def __init__(self, srmfile, recording_interval):
        """Populate one record by reading its fields from ``srmfile``.

        For ``srmfile.version < 7`` power and speed come from
        ``self.compact_power_speed`` and cadence / heart rate from two
        unsigned bytes; ``alt``, ``temp``, ``metres``, ``lat`` and ``lon``
        are NaN.  Otherwise a 14-byte little-endian record is unpacked and
        assigned, in order, to the attributes named by ``self.__slots__``;
        version 9 additionally reads a latitude/longitude pair, and
        ``temp``, ``kph`` and ``metres`` are then rescaled / derived.

        :param srmfile: object exposing ``version`` and ``read(n)``.
        :param recording_interval: factor used to derive ``metres`` from the
            speed; presumably seconds per record -- TODO confirm.
        """
        self.metres = nan
        self.lat = nan
        self.lon = nan

        if srmfile.version < 7:
            self.watts, self.kph = self.compact_power_speed(srmfile)
            self.cad, self.hr = unpack('<BB', srmfile.read(2))
            self.alt = nan
            self.temp = nan
            return

        record = unpack('<HBBllh', srmfile.read(14))
        for slot, raw in zip(self.__slots__, record):
            setattr(self, slot, raw)

        if srmfile.version == 9:
            raw_lat, raw_lon = unpack('<ll', srmfile.read(8))
            # Scale the signed 32-bit values so that 0x7fffffff maps to 180.
            self.lat = raw_lat * 180 / 0x7fffffff
            self.lon = raw_lon * 180 / 0x7fffffff

        self.temp *= 0.1
        if self.kph < 0:
            self.kph = 0
        else:
            self.kph = self.kph * 3.6 / 1000
        self.metres = recording_interval * self.kph / 3.6
项目:inflation_calc    作者:EricSchles    | 项目源码 | 文件源码
def predict(df, steps):
    """Fit an ARIMA model on a yearly series and forecast ``steps`` values.

    The frame is transposed and squeezed to a Series.  Every year between
    the first and last index entries that has no observation gets a
    ``January 1st`` entry with NaN, the series is sorted and the gaps are
    filled by interpolation before the model order is searched and fitted.
    Progress messages are printed at each stage.

    :param df: frame whose index entries expose ``.year`` and which squeezes
        to a single Series (assumes the index is in chronological order --
        the first/last entries are used as the year range).
    :param steps: number of periods passed to ``model.forecast``.
    :return: tuple ``(forecast, end)`` where ``forecast`` is element 0 of
        ``model.forecast(steps=steps)`` and ``end`` is the last observed year.
    """
    print("started function")
    start = df.index[0].year
    end = df.index[-1].year
    years_captured = {idx.year for idx in df.index}
    s = df.T.squeeze()
    # Series.set_value was removed in pandas 1.0; assigning through .loc
    # appends the missing label (setting with enlargement) instead.
    for year in range(start, end + 1):
        if year not in years_captured:
            s.loc[datetime.datetime(year=year, month=1, day=1)] = math.nan
    s = s.sort_index()
    s = s.interpolate()
    data = s.to_frame()
    print("loaded data")
    model_order = brute_search(data)
    model_order = tuple(int(elem) for elem in model_order)
    print("found model order")
    model = sm.tsa.ARIMA(data, model_order).fit(disp=0)
    print("fit model")
    return model.forecast(steps=steps)[0], end
项目:inflation_calc    作者:EricSchles    | 项目源码 | 文件源码
def predict(df, steps):
    """Fit an ARIMA model on a yearly series and forecast ``steps`` values.

    The frame is transposed and squeezed to a Series.  Every year between
    the first and last index entries that has no observation gets a
    ``January 1st`` entry with NaN, the series is sorted and the gaps are
    filled by interpolation before the model order is searched and fitted.

    :param df: frame whose index entries expose ``.year`` and which squeezes
        to a single Series (assumes the index is in chronological order --
        the first/last entries are used as the year range).
    :param steps: number of periods passed to ``model.forecast``.
    :return: tuple ``(forecast, end)`` where ``forecast`` is element 0 of
        ``model.forecast(steps=steps)`` and ``end`` is the last observed year.
    """
    start = df.index[0].year
    end = df.index[-1].year
    years_captured = {idx.year for idx in df.index}
    s = df.T.squeeze()
    # Series.set_value was removed in pandas 1.0; assigning through .loc
    # appends the missing label (setting with enlargement) instead.
    for year in range(start, end + 1):
        if year not in years_captured:
            s.loc[datetime.datetime(year=year, month=1, day=1)] = math.nan
    s = s.sort_index()
    s = s.interpolate()
    data = s.to_frame()
    model_order = brute_search(data)
    model_order = tuple(int(elem) for elem in model_order)
    model = sm.tsa.ARIMA(data, model_order).fit(disp=0)
    return model.forecast(steps=steps)[0], end
项目:recommandation-film    作者:sambiak    | 项目源码 | 文件源码
def theta(self, x, etapes):
        """Run ``etapes`` gradient steps on this object's theta, then return it.

        The target row is NaN except at the indices of the films found in
        ``self.films``; each step replaces ``self._theta`` with the result of
        ``etape_du_gradient`` called with the constant 0.0001.

        :param x: passed through to ``etape_du_gradient`` unchanged.
        :param etapes: number of gradient steps to perform.
        :return: the updated ``self._theta``.
        """
        step_size = 0.0001
        targets = np.array([[math.nan] * nombre_films()])
        for film in self.films:
            targets[0][self.conv.renvoyer_index(film)] = self.films[film]

        for _ in range(etapes):
            self._theta = etape_du_gradient(targets, step_size, self._theta, x)
        return self._theta
项目:recommandation-film    作者:sambiak    | 项目源码 | 文件源码
def theta(self, x, etapes):
        """Apply ``etapes`` gradient steps to ``self._theta`` and return it.

        Builds a one-row array that is NaN everywhere except where
        ``self.films`` provides a value, then repeatedly calls
        ``etape_du_gradient`` with the constant 0.0001, storing each result
        back into ``self._theta``.

        :param x: passed through to ``etape_du_gradient`` unchanged.
        :param etapes: number of gradient steps to perform.
        :return: the updated ``self._theta``.
        """
        rate = 0.0001
        row = np.array([[math.nan] * nombre_films()])
        for title in self.films:
            column = self.conv.renvoyer_index(title)
            row[0][column] = self.films[title]

        for _step in range(etapes):
            self._theta = etape_du_gradient(row, rate, self._theta, x)
        return self._theta
项目:foil    作者:portfoliome    | 项目源码 | 文件源码
def none_to_nan(value):
    """Return NaN in place of ``None``; any other value is returned unchanged."""
    if value is None:
        return nan
    return value
项目:foil    作者:portfoliome    | 项目源码 | 文件源码
def test_nan_to_none(self):
        """``nan_to_none`` maps NaN to None and leaves an ordinary number alone."""
        converted = nan_to_none(nan)
        self.assertIsNone(converted)

        untouched = nan_to_none(1)
        self.assertEqual(1, untouched)
项目:dump1090-exporter    作者:claws    | 项目源码 | 文件源码
def process_stats(self,
                      stats: dict,
                      time_periods: Sequence[str] = ('last1min', )) -> None:
        ''' Push dump1090 statistics into the exported metrics.

        For every requested time period, each metric registered under
        ``self.metrics['stats']`` is set to the matching value from
        ``stats``; a value that cannot be found is exported as NaN.

        :param stats: a dict containing dump1090 statistics data.
        :param time_periods: keys of ``stats`` to process; a period that is
          missing from ``stats`` is logged and skipped.
        '''
        stat_metrics = self.metrics['stats']

        for period in time_periods:
            try:
                period_stats = stats[period]
            except KeyError:
                logger.exception(
                    'Problem extracting time period: {}'.format(period))
                continue

            labels = {'time_period': period}

            for group in stat_metrics:
                # An empty group name means the values sit directly on the
                # time period dict rather than in a nested dict.
                source = period_stats[group] if group else period_stats
                for name, metric in stat_metrics[group].items():
                    try:
                        value = source[name]
                    except KeyError:
                        # 'signal' and 'peak_signal' are not present if
                        # there are no aircraft.
                        if name not in ('peak_signal', 'signal'):
                            section = ' {} '.format(group) if group else ' '
                            logger.warning(
                                "Problem extracting{}item '{}' from: {}".format(
                                    section, name, source))
                        value = math.nan
                    else:
                        # 'accepted' values are in a list
                        if isinstance(value, list):
                            value = value[0]
                    metric.set(labels, value)
项目:sigir2017-bitfunnel    作者:BitFunnel    | 项目源码 | 文件源码
def analyze_lucene_index(self):
        """Collect the Lucene measurements from the build and query logs.

        The total ingestion time is parsed from ``self.lucene_build_index_log``
        and, for each entry of ``self.thread_counts``, the QPS / MPS / MPQ /
        latency / planning-overhead fields are appended from the query log at
        the same position in ``self.lucene_run_queries_log``.

        :return: the populated ``IndexCharacteristics`` instance.
        :raises IndexError: if the ingestion-time line is not found in the
            build log, or there are fewer query logs than thread counts.
        """
        results = IndexCharacteristics("Lucene", self.ingestion_thread_count, self.thread_counts)
        results.index_type = "Lucene"

        # Don't know how to determine bits per posting for Lucene.
        results.bits_per_posting = math.nan

        with open(self.lucene_build_index_log, 'r') as myfile:
            build_index_log = myfile.read()
        # Raw string: "\d" in a plain literal is an invalid escape sequence.
        # The fractional part is optional so that a single-digit duration
        # such as "5" parses too (the old "\d+\.?\d+" needed two digits).
        results.total_ingestion_time = float(
            re.findall(r"Ingested \d+ chunk files in (\d+(?:\.\d+)?) seconds.",
                       build_index_log)[0])

        for i, _threads in enumerate(self.thread_counts):
            run_queries_log = self.lucene_run_queries_log[i]
            with open(run_queries_log, 'r') as myfile:
                data = myfile.read()
            results.append_float_field("qps", "QPS:", data)
            results.append_float_field("mps", "MPS:", data)
            results.append_float_field("mpq", "MPQ:", data)
            results.append_float_field("mean_query_latency", "Mean query latency:", data)
            results.append_float_field("planning_overhead", r"Planning overhead:", data)

        # Lucene false positive rate is always zero.
        results.false_positive_rate = 0
        results.false_negative_rate = 0

        return results


    ###########################################################################
    #
    # MG4J
    #
    ###########################################################################
项目:sigir2017-bitfunnel    作者:BitFunnel    | 项目源码 | 文件源码
def analyze_mg4j_index(self):
        """Collect the MG4J measurements from the query logs and index files.

        Bits per posting is the size of the ``<basename>-text.pointers`` file
        (in bits) divided by the posting count parsed from the first query
        log.  For each entry of ``self.thread_counts`` the QPS / MPS / MPQ /
        latency fields are appended from the query log at the same position
        in ``self.mg4j_run_queries_log``; planning overhead is recorded as NaN.

        :return: the populated ``IndexCharacteristics`` instance.
        :raises IndexError: if the posting count is not found in the first
            query log, or there are fewer query logs than thread counts.
        """
        results = IndexCharacteristics("MG4J", self.ingestion_thread_count, self.thread_counts)

        # Compute bits/posting.
        with open(self.mg4j_run_queries_log[0], 'r') as myfile:
            run_queries_log = myfile.read()
        # Raw string: "\d" in a plain literal is an invalid escape sequence.
        # The fractional part is optional so that a single-digit count
        # parses too (the old "\d+\.?\d+" needed at least two digits).
        posting_count = float(re.findall(r"postings=(\d+(?:\.\d+)?)", run_queries_log)[0])
        pointers = os.path.join(self.mg4j_index_path, self.basename + "-text.pointers")
        results.bits_per_posting = os.path.getsize(pointers) / posting_count * 8.0

        # Need to annotate build log from Python since Java code doesn't print time.
        results.total_ingestion_time = math.nan

        for i, _threads in enumerate(self.thread_counts):
            run_queries_log = self.mg4j_run_queries_log[i]
            with open(run_queries_log, 'r') as myfile:
                data = myfile.read()
            results.append_float_field("qps", "QPS:", data)
            results.append_float_field("mps", "MPS:", data)
            results.append_float_field("mpq", "MPQ:", data)
            results.append_float_field("mean_query_latency", "Mean query latency:", data)
            results.planning_overhead.append(math.nan)

        # MG4J false positive rate is always zero.
        results.false_positive_rate = 0
        results.false_negative_rate = 0

        return results


    ###########################################################################
    #
    # Partitioned Elias-Fano (PEF)
    #
    ###########################################################################
项目:sigir2017-bitfunnel    作者:BitFunnel    | 项目源码 | 文件源码
def __init__(self, index_type, ingestion_thread_count, thread_counts):
        """Store the index description and start every measurement empty.

        Scalar measurements start as NaN; the per-run series start as
        separate empty lists.

        :param index_type: stored as ``self.index_type``.
        :param ingestion_thread_count: stored as ``self.ingestion_thread_count``.
        :param thread_counts: stored as ``self.thread_counts``.
        """
        self.index_type = index_type
        self.ingestion_thread_count = ingestion_thread_count
        self.thread_counts = thread_counts

        # Scalar measurements: NaN marks "no value recorded".
        for scalar in ('bits_per_posting', 'total_ingestion_time',
                       'false_positive_rate', 'false_negative_rate'):
            setattr(self, scalar, math.nan)

        # Series measurements: each attribute gets its own fresh list.
        for series in ('qps', 'mps', 'mpq',
                       'mean_query_latency', 'planning_overhead'):
            setattr(self, series, [])
项目:sigir2017-bitfunnel    作者:BitFunnel    | 项目源码 | 文件源码
def __init__(self, gov2_directories, min_terms_per_document, max_terms_per_document):
        """Store the corpus parameters and start every statistic as NaN.

        :param gov2_directories: stored as ``self.gov2_directors``.
        :param min_terms_per_document: stored under the same name.
        :param max_terms_per_document: stored under the same name.
        """
        # NOTE(review): the attribute is spelled "gov2_directors", unlike the
        # parameter -- kept as-is because other code may read it by that name.
        self.gov2_directors = gov2_directories
        self.min_terms_per_document = min_terms_per_document
        self.max_terms_per_document = max_terms_per_document

        # NaN marks "no value recorded" for each statistic.
        for statistic in ('documents', 'terms', 'postings',
                          'bytes', 'matches_per_query'):
            setattr(self, statistic, math.nan)
项目:ROS-Code    作者:Richienb    | 项目源码 | 文件源码
def constant(constanttype):
    """Return the ``math`` constant named by ``constanttype``.

    The name is matched case-insensitively against 'pi', 'e', 'tau', 'inf'
    and 'nan'.  Any other name yields ``None``.
    """
    import math
    known_constants = {
        'pi': math.pi,
        'e': math.e,
        'tau': math.tau,
        'inf': math.inf,
        'nan': math.nan,
    }
    return known_constants.get(constanttype.lower())

# Find The Power Of A Number
项目:sigir2017-bitfunnel    作者:BitFunnel    | 项目源码 | 文件源码
def latex_performance_one(experiment_number, experiment, thread):
    r"""Print the five LaTeX table rows comparing the indexes of one experiment.

    Rows, in order: QPS, fixed overhead (%), false positives (%), bits per
    posting and DQ (QPS divided by bits per posting).  Columns, in order:
    BitFunnel, PEF, MG4J, Lucene.  The Lucene DQ cell is always NaN.

    :param experiment_number: 0-based number, rendered as a letter ('A' for 0)
        in the leading ``\multirow`` cell.
    :param experiment: object providing ``analyze_bf_index``,
        ``analyze_lucene_index``, ``analyze_mg4j_index`` and
        ``analyze_pef_index``.
    :param thread: index into the per-run series (``qps``,
        ``planning_overhead``) of each analysis result.
    """
    bf = experiment.analyze_bf_index()
    lucene = experiment.analyze_lucene_index()
    mg4j = experiment.analyze_mg4j_index()
    pef = experiment.analyze_pef_index()

    # Raw strings throughout: "\%" and "\\" must reach LaTeX verbatim, and
    # "\%" in a plain literal is an invalid escape sequence.
    continuation = r"                      "
    whole_row = r"& {:<25} & {:>10,.0f} & {:>10,.0f} & {:>10,.0f} & {:>10,.0f} \\"
    decimal_row = r"& {:<25} & {:>10,.2f} & {:>10,.2f} & {:>10,.2f} & {:>10,.2f} \\"

    print(r"    \multirow{{5}}{{*}}{{{}}}".format(chr(ord('A') + experiment_number)), end='')
    print(whole_row.format(
        "QPS",
        bf.qps[thread],
        pef.qps[thread],
        mg4j.qps[thread],
        lucene.qps[thread]))

    print(continuation, end='')
    print(decimal_row.format(
        r"Fixed overhead (\%)",
        bf.planning_overhead[thread] * 100,
        pef.planning_overhead[thread] * 100,
        mg4j.planning_overhead[thread] * 100,
        lucene.planning_overhead[thread] * 100))

    print(continuation, end='')
    print(decimal_row.format(
        r"False positives (\%)",
        bf.false_positive_rate * 100,
        pef.false_positive_rate * 100,
        mg4j.false_positive_rate * 100,
        lucene.false_positive_rate * 100))

    print(continuation, end='')
    print(decimal_row.format(
        "Bits per posting",
        bf.bits_per_posting,
        pef.bits_per_posting,
        mg4j.bits_per_posting,
        lucene.bits_per_posting))

    print(continuation, end='')
    print(whole_row.format(
        "DQ",
        bf.qps[thread] / bf.bits_per_posting,
        pef.qps[thread] / pef.bits_per_posting,
        mg4j.qps[thread] / mg4j.bits_per_posting,
        math.nan))
项目:fcompile    作者:azag0    | 项目源码 | 文件源码
async def scheduler(tasks: Dict[Source, Task],
                    task_queue: TaskQueue,
                    result_queue: ResultQueue,
                    tree: TaskTree,
                    hashes: Dict[Filename, Hash],
                    changed_files: List[Source]) -> None:
    """Schedule tasks and handle compiled tasks.

    Must be a coroutine: the body awaits ``result_queue.get()``.

    Each pass queues every waiting source that has no waiting or scheduled
    ancestor, writes a progress line to stdout, then waits for one result.
    When a compiled source changes the hash of one of its module files, every
    source using that module is put back on the waiting set.

    :param tasks: per-source task; its ``args`` and ``source`` build the
        queued ``Args``.
    :param task_queue: receives ``(-priority, src, Args)`` tuples.
    :param result_queue: yields ``(src, retcode, clock)`` tuples.
    :param tree: provides ``line_nums``, ``ancestors``, ``priority``,
        ``hashes``, ``src_mods`` and ``mod_uses`` lookups.
    :param hashes: hash cache, updated in place.
    :param changed_files: sources that need compiling initially.
    :raises CompilationError: when a result carries a non-zero return code.
    """
    start = time.time()
    n_all_lines = sum(tree.line_nums[src] for src in changed_files)
    n_lines = 0
    waiting = set(changed_files)
    scheduled: Set[Source] = set()
    while True:
        blocking = waiting | scheduled
        for src in list(waiting):
            if not (blocking & tree.ancestors[src]):
                hashes.pop(src, None)  # if compilation gets interrupted
                task_queue.put_nowait((
                    -tree.priority[src],
                    src,
                    Args(tasks[src].args + (str(tasks[src].source),))
                ))
                scheduled.add(src)
                waiting.remove(src)
        # "or nan" keeps both ratios defined when a denominator is zero
        # (nothing to compile / nothing compiled yet) instead of raising.
        sys.stdout.write(
            f' Progress: {len(waiting)} waiting, {len(scheduled)} scheduled, '
            f'{n_lines}/{n_all_lines} lines ({100*n_lines/(n_all_lines or nan):.1f}%), '
            f'ETA: {(time.time()-start)*n_all_lines/(n_lines or nan):.1f} s\r'
        )
        sys.stdout.flush()
        if not blocking:
            break
        src, retcode, clock = await result_queue.get()
        if retcode != 0:
            raise CompilationError(src, retcode)
        # ``clocks`` is not defined in this function; it is expected to be a
        # list available in the enclosing module scope.
        clocks.append((src, clock, tree.line_nums[src]))
        hashes[src] = tree.hashes[src]
        n_lines += tree.line_nums[src]
        scheduled.remove(src)
        pprint(f'Compiled {src}.')
        for mod in tree.src_mods[src]:
            modfile = mod + '.mod'
            modhash = get_hash(Path(modfile))
            if modhash != hashes.get(modfile):
                hashes[modfile] = modhash
                # Distinct name so the just-compiled ``src`` is not shadowed.
                for dependent in tree.mod_uses[mod]:
                    assert dependent not in scheduled
                    hashes.pop(dependent, None)
                    if dependent not in waiting:
                        n_all_lines += tree.line_nums[dependent]
                        waiting.add(dependent)