Python database 模块,Database() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用database.Database()

项目:IotCenter    作者:panjanek    | 项目源码 | 文件源码
def __init__(self, config):
        self.service = None
        self.webServer = None
        self.config = config
        self.httpsPort = int(self.config.get('web', 'httpsPort'))
        self.httpPort = int(self.config.get('web', 'httpPort'))        
        self.adminPasswordHash = self.config.get('web', 'adminPasswordHash')
        self.apiSecret = self.config.get('web', 'apiSecret')
        self.uploadDir = self.config.get('web', 'uploadDir')
        self.dbFile = self.config.get('web', 'dbFile')
        self.httpsCertFile = self.config.get('web', 'httpsCertFile')
        self.httpsKeyFile = self.config.get('web', 'httpsKeyFile')
        self.httpsChainFile = self.config.get('web', 'httpsChainFile')
        self.localVideoPort = int(self.config.get('web', 'localVideoPort'))
        dir = os.path.dirname(os.path.realpath(sys.argv[0]))        
        self.database = database.Database(self.dbFile)
        self.deviceConfig = dict()
        for deviceId, jsonConf in dict(self.config.items('devices')).iteritems():
            self.deviceConfig[deviceId] = json.loads(jsonConf, object_pairs_hook=OrderedDict)
        self.trends = dict()
        self.lock = threading.Lock()
项目:bandit-http-server    作者:evancasey    | 项目源码 | 文件源码
def __init__(self, request):

        arms = {}
        for i in range(request.json['arm_count']):          
            arm = Arm()
            arms[arm.id] = arm.__dict__

        self.id = self.bandit_keys()
        self.name = request.json['name']
        self.arm_count = request.json['arm_count']
        self.arms = arms
        self.algo_type = request.json['algo_type']
        self.budget_type = request.json['budget_type']
        self.budget = request.json['budget']
        self.epsilon = request.json['epsilon']
        self.reward_type = request.json['reward_type']
        self.max_reward = 1
        self.total_reward = 0
        self.total_count = 0
        self.regret = 0         

        Database().hset("bandits", self.id, self.__dict__)
项目:Enibar    作者:ENIB    | 项目源码 | 文件源码
def set_multiple_values(prices):
    """ Update all prices with their new values
    Note: this function should be prefered when dealing with multiple prices at
    once for performance purposes.

    :param list prices: List of prices
    """
    with Database() as database:
        database.transaction()
        cursor = QtSql.QSqlQuery(database)
        cursor.prepare("UPDATE prices SET value=:value WHERE id=:id")
        for price in prices:
            cursor.bindValue(":id", price['id'])
            cursor.bindValue(":value", price['value'])
            cursor.exec_()

        database.commit()
        return True
项目:Enibar    作者:ENIB    | 项目源码 | 文件源码
def transactions(notes, diff, *, do_not=False):
    """ Change the note on multiple notes

        :param str nickname: The nickname of the note
        :param float diff: Will add the diff to the note.

        :return bool: True if success else False
    """
    with Database() as database:
        database.transaction()
        cursor = QtSql.QSqlQuery(database)
        cursor.prepare("UPDATE notes SET note=note+:diff WHERE nickname=:nick")
        for nick in notes:
            cursor.bindValue(':nick', nick)
            cursor.bindValue(':diff', diff)
            cursor.exec_()
        value = database.commit()
    if not do_not:
        api.redis.send_message("enibar-notes", notes)
    return value
项目:aria    作者:mattmaynes2    | 项目源码 | 文件源码
def setUp(self):

        try:
            os.remove(self._testMethodName + ".db")
        except OSError as e:
            pass

        self.database    = Database(self._testMethodName + ".db")
        self.hub         = Hub()
        self.hub.mode    = HubMode.Learning
        self.cli         = CLI(self.hub)
        self.exchange    = Exchange(self.hub, self.cli, self.database)
        self.hub.addCommand(GetEventWindowCommand(self.database))
        self.hub.addCommand(GetDeviceEventsCommand(self.database))
        self.testAdapter = StubDeviceAdapter()
        self.exchange.register('stub', self.testAdapter)
        self.exchange.register('hub',HubAdapter(self.hub))
        self.exchange.discovered(self.hub)
        self.registerFakeDevices()
        self.exchange.start()

        self.db = TestDatabase(self._testMethodName + ".db")
项目:aria    作者:mattmaynes2    | 项目源码 | 文件源码
def setUp(self):
        try:
            os.remove(self._testMethodName + ".db")
        except OSError as e:
            pass

        self.database    = Database(self._testMethodName + ".db")
        self.hub         = Hub()
        self.hub.mode    = HubMode.Learning
        self.cli         = CLI(self.hub)
        self.exchange    = Exchange(self.hub, self.cli, self.database)
        self.registerAdapters()
        self.testAdapter = StubDeviceAdapter()
        self.exchange.register('stub', self.testAdapter)
        self.exchange.register('hub',HubAdapter(self.hub))
        self.exchange.discovered(self.hub)
        self.exchange.start()
        self.db = TestDatabase(self._testMethodName + ".db")
        self.exchange.discovered(self.device)
项目:aria    作者:mattmaynes2    | 项目源码 | 文件源码
def main ():
    global hub, cli, exchange
    argv = args.parse()
    if argv.daemonize:
        daemon.daemonize()

    pid = os.getpid()
    signal.signal(signal.SIGTERM, signalHandler)

    database    = Database()
    hub         = Hub(argv, exit)
    cli         = CLI(hub)
    setupCommands(hub,database)
    exchange    = create_exchange(hub, cli, database)
    exchange.discovered(hub)

    cli.start()
    exchange.start()
项目:BlitzcrankBotV2    作者:SuperFrosty    | 项目源码 | 文件源码
def view(self, ctx):
        """View your server's default region"""
        title = "Default Region for {0}:".format(ctx.guild.name)
        try:
            db = database.Database('guilds.db')
            region = db.find_entry(ctx.guild.id)
            db.close_connection()
            embed = discord.Embed(
                title=title, description=region, colour=0x1AFFA7)
            utils.footer(ctx, embed)
            await ctx.send("", embed=embed)
        except TypeError:
            embed = discord.Embed(
                title=title,
                description="A default region for this server has not been set!",
                colour=0x1AFFA7)
            utils.footer(ctx, embed)
            await ctx.send("", embed=embed)
项目:BlitzcrankBotV2    作者:SuperFrosty    | 项目源码 | 文件源码
def remove(self, ctx):
        """Remove your server's default region"""
        db = database.Database('guilds.db')
        try:
            db.find_entry(ctx.guild.id)
            db.remove_entry(ctx.guild.id)
            db.close_connection()
            embed = discord.Embed(
                title='Success!',
                description="Default region for {0} has been cleared!".format(
                    ctx.guild.name),
                colour=0x1AFFA7)
            utils.footer(ctx, embed)
            await ctx.send("", embed=embed)
        except TypeError:
            db.close_connection()
            embed = discord.Embed(
                title="Error!",
                description="A default region for this server has not been set!",
                colour=0xCA0147)
            utils.footer(ctx, embed)
            await ctx.send("", embed=embed)
项目:BlitzcrankBotV2    作者:SuperFrosty    | 项目源码 | 文件源码
def no_region_check(self, ctx, region):
        """Returns server's default region if none is set"""
        if region is None:
            try:
                db = database.Database('guilds.db')
                region = db.find_entry(ctx.guild.id)
                db.close_connection()
                return region
            except TypeError:
                embed = self.error_embed(
                    ctx, "Please specify a region, or set a default region with `b!region set [region]`.")
                await ctx.send("", embed=embed)
                return
        else:
            return region

# Base finished
项目:porn_sieve    作者:PornSieve    | 项目源码 | 文件源码
def run(self):
        db = Database()
        site = site_selector(self.site)
        for i in range(self.start_pg, self.start_pg + self.max_pgs + 1):
            gal_url = site.fmt_gallery(self.niche, i)
            for j, (vid_url, img_url) in enumerate(site.scrape_gallery(gal_url)):
                # update progress bar
                progress = (i * 20 + j) / ((self.max_pgs+1) * 20) * 100
                self.updateProgress.emit(progress)

                data = site.scrape_video(vid_url)
                if not data:
                    continue
                data["img"] = img_url
                if not db.has_feedback(data["url"]):
                    db.save(data)
                    with self.winlock:
                        self.q.put((-self.pred.predict(data), data["url"]))
        self.updateProgress.emit(100)
项目:Chorus    作者:DonaldBough    | 项目源码 | 文件源码
def createPlaylist(self, token):
        #db = Database()
        #token = db.getEventSpotifyToken(eventID)
        #use GET command to get user info
        req = requests.get("https://api.spotify.com/v1/me", headers={"Authorization":'Bearer ' + token})
        j = json.loads(req.text)
        userID = j['id'] 
        #print("userID === " + userID)
        playlist_name = 'Chorus'
        sp = spotipy.Spotify(auth=token)
        sp.trace = False
        sp.user_playlist_create(userID, playlist_name)
        playlists = sp.user_playlists(userID, limit=50, offset=0)

        for playlist in playlists['items']:
            if(playlist['name'] == "Chorus"):
                playlist_id = playlist['id']

        headers={"Authorization":'Bearer ' + token}
        requests.put('https://api.spotify.com/v1/me/player/shuffle?state=false',headers={"Authorization":'Bearer ' + token})

        data = []
        data.append(userID)
        data.append(playlist_id)
        return data
项目:Chorus    作者:DonaldBough    | 项目源码 | 文件源码
def addSongs(self, eventID):
        db = Database()
        trackID = db.getTopSong(eventID)
        spotify = Spotify()
        token = db.getEventSpotifyToken(eventID)
        if(trackID == -1):
            trackID = spotify.recommend_fallback(eventID)

        username = db.getHostSpotifyUserName(eventID)
        playlist_id = db.getPlaylistID(eventID)
        if(trackID == -1): print("NO TOP VOTED")
        sp = spotipy.Spotify(auth=token)
        sp.trace = False
        print("100%")
        sp.user_playlist_add_tracks(username, str(playlist_id), {str(trackID)})
        print("NOT 100%")
项目:Chorus    作者:DonaldBough    | 项目源码 | 文件源码
def addTwo(self, eventID):
        db = Database()
        token = db.getEventSpotifyToken(eventID)
        username = db.getHostSpotifyUserName(eventID)
        playlist_id = db.getPlaylistID(eventID)
        #print("--------")
        #print(username)
        #print(playlist_id)
        #print("--------")
        sp = spotipy.Spotify(auth=token)
        sp.trace = False
        #ERROR: int has no attribute split
        sp.user_playlist_add_tracks(username, str(playlist_id), {'0KKkJNfGyhkQ5aFogxQAPU', '6b8Be6ljOzmkOmFslEb23P'})
        #, '0KKkJNfGyhkQ5aFogxQAPU'
        db.updateCurrentSong('0KKkJNfGyhkQ5aFogxQAPU', eventID)
        db.insertSong('0KKkJNfGyhkQ5aFogxQAPU', eventID, "0", "Thats What I Like", "Bruno Mars", "0", "0", "0")
        db.insertSong('6b8Be6ljOzmkOmFslEb23P', eventID, "0", "24K Magic", "Bruno Mars", "0", "0", "0")
项目:Chorus    作者:DonaldBough    | 项目源码 | 文件源码
def recommend_fallback(self, eventID): 
        db = Database()
        token = db.getEventSpotifyToken(eventID)
        playlist_id = db.getPlaylistID(eventID)
        username = db.getHostID(eventID)
        track_id = db.getCurrentPlayingSong(eventID)
        headers={"Authorization":'Bearer ' + token}
        count = 0
        sp = spotipy.Spotify(auth=token)
        sp.trace = False
        req = requests.get('https://api.spotify.com/v1/recommendations?seed_tracks=' + track_id, headers={"Authorization":"Bearer "+str(token)})
        #print(req.content)
        json_obj = json.loads(req.text)
        for i in json_obj['tracks']:
            if(count < 1):
                return i
#addSongs(token, i, playlist_id, username)
                count = count + 1
            break
项目:Chorus    作者:DonaldBough    | 项目源码 | 文件源码
def CreateUser(currentEvent, inEvent, isHost):
    try:
        # Parse the arguments
        '''
        parser = reqparse.RequestParser()
        parser.add_argument('currentEvent', type=str)
        parser.add_argument('inEvent', type=str)
        parser.add_argument('host', type=str)
        args = parser.parse_args()
        '''     


        db = Database()
        userID = db.insertUser(currentEvent, inEvent, isHost)

        return userID

    except Exception as e:
        return str(e)

#class CreateHost(Resource):
项目:Chorus    作者:DonaldBough    | 项目源码 | 文件源码
def post(self):
        try:
            # Parse the arguments
            parser = reqparse.RequestParser()
            parser.add_argument('userid', type=str, help='ID of User that is sending vote')
            parser.add_argument('eventid', type=str, help='ID of event user is in')
            args = parser.parse_args()

            _userID = args['userid']
            _eventID = args['eventid']

            db = Database()
            songs = db.getQueue(_eventID, _userID)

            return json.dumps({'songs': songs})

        except Exception as e:
            return {'error': str(e)}
项目:Chorus    作者:DonaldBough    | 项目源码 | 文件源码
def post(self):
        try:
            # Parse the arguments                                                                                               
            parser = reqparse.RequestParser()
            parser.add_argument('userid', type=str, help='ID of User that is sending vote')
            parser.add_argument('eventid', type=str, help='ID of event user is in')
            args = parser.parse_args()

            _userID = args['userid']           
            _eventID = args['eventid']

            db = Database()
            print(_eventID)
            print(_userID)
            songs = db.getPlayedSongs(_eventID, _userID)

            return json.dumps({'songs': songs})

        except Exception as e:
            return {'error': str(e)}
项目:Chorus    作者:DonaldBough    | 项目源码 | 文件源码
def post(self):
        try:
            # Parse the arguments
            parser = reqparse.RequestParser()
            parser.add_argument('password', type=str, help='Password to create user')
            args = parser.parse_args()

            _eventPassword = args['password']

            #print(_eventPassword)
            db = Database()
            #print("name :" +_eventPassword)

            eventID = db.getEventid(_eventPassword)
            userID  = CreateUser(eventID, "0", "0")

            return json.dumps({'eventID': eventID, 'userID': userID})

        except Exception as e:
            return {'error': str(e)}
项目:Chorus    作者:DonaldBough    | 项目源码 | 文件源码
def authtarget():
    sp  = Spotify()
    db = Database()
    resultList = []
    while True:
        resultList = db.getAllEventID()
        time.sleep(10)
        for i in resultList:
            t = threading.Thread(target = authtarget)
            t.daemon = True
            t.start()
            sp.timer(i) 

#define API endpoints
#api.add_resource(CreateUser, '/CreateUser')
#api.add_resource(CreateHost, '/CreateHost')
项目:RobotNSGA    作者:LuisLaraP    | 项目源码 | 文件源码
def main(args):
    '''Graph module's main function'''
    abs_path = ''
    if args.database is None:
        abs_path = os.getcwd()
    else:
        abs_path = os.path.abspath(args.database)
    if not database.is_database(abs_path, verbose=True):
        sys.exit()

    db = database.Database(abs_path)

    for graph_type in args.types:
        if graph_type == 'fitness':
            graph.fitness_graph(db, args.objectives)
        elif graph_type == 'pareto':
            graph.pareto_graph(db, args.objectives)
项目:RobotNSGA    作者:LuisLaraP    | 项目源码 | 文件源码
def initialize_database(args, default_dir):
    '''Performs the necessary checks and initialization procedures to initialize the database

    Returns the database to be used.
    '''
    if args.database is None:
        args.database = default_dir
    ret_db = Database(args.database)
    if args.reset:
        ret_db.reset()
    if ret_db.properties['highest_population'] == 0:
        if args.size is None:
            print('ERROR: Population size must be specified when starting a new run.')
            sys.exit()
        ret_db.set_property('population_size', args.size)
    else:
        ret_db.select()
    return ret_db
项目:RobotNSGA    作者:LuisLaraP    | 项目源码 | 文件源码
def test(args):
    '''Runs a single test, using a prevously generated individual'''
    global database
    global goal_positions
    pygame.init()
    random.seed()
    np.random.seed()
    if args.database is None:
        args.database = 'RobotTrainingData'
    database = Database(args.database)
    problem = EV3Problem(log_to_file=False)

    res_path = os.path.abspath(pkg_resources.resource_filename('resources.ev3', 'y_test.txt'))
    goal_positions = np.loadtxt(res_path)

    database.select(args.generation)
    chromosome = np.fromstring(database.load()['I' + str(args.individual)]).tolist()
    problem.robot.home()
    for goal in goal_positions:
        problem.run_test(goal, chromosome)
项目:Mk3-Firmware    作者:emfcamp    | 项目源码 | 文件源码
def set_NTP_time():
    import time
    from pyb import RTC
    print("Setting time from NTP")

    t = get_NTP_time()
    if t is None:
        print("Could not set time from NTP")
        return False

    tz = 0
    with database.Database() as db:
        tz = db.get("timezone", 0)

    tz_minutes = int(abs(tz) % 100) * (1 if tz >= 0 else -1)
    tz_hours = int(tz / 100)
    t += (tz_hours * 3600) + (tz_minutes * 60)

    tm = time.localtime(t)
    tm = tm[0:3] + (0,) + tm[3:6] + (0,)

    rtc = RTC()
    rtc.init()
    rtc.datetime(tm)

    return True
项目:bandit-http-server    作者:evancasey    | 项目源码 | 文件源码
def bandit_keys(self):
        return Database().hincrby("unique_ids", "bandit")
项目:bandit-http-server    作者:evancasey    | 项目源码 | 文件源码
def get_bandit(self, id):
        return eval(Database().hget("bandits", id))
项目:bandit-http-server    作者:evancasey    | 项目源码 | 文件源码
def set_bandit(self, id, bandit):
        Database().hset("bandits", id, bandit)
项目:bandit-http-server    作者:evancasey    | 项目源码 | 文件源码
def is_bandit(self,id):
        if Database().hexists("bandits",id):
            return True
        else:
            return False
项目:bandit-http-server    作者:evancasey    | 项目源码 | 文件源码
def arm_keys(self):     
        return Database().hincrby("unique_ids", "arm")
项目:Belati    作者:aancw    | 项目源码 | 文件源码
def __init__(self):
        self.db = Database()
        self.project_id = 0
项目:CloudKitPy    作者:Baza207    | 项目源码 | 文件源码
def __init__(
        self,
        container_identifier,
        environment,
        apns_environment=None,
        api_token=None,
        server_to_server_key=None,
        cert_path=None,
        logger=None
    ):
        self.__logger = logger

        self.container_identifier = container_identifier
        self.environment = environment
        self.apns_environment = apns_environment
        self.api_token = api_token
        self.server_to_server_key = server_to_server_key
        self.cert_path = cert_path

        # Setup public and private cloud databases
        self.public_cloud_database = Database(
            self,
            'public',
            self.__logger
        )
        self.private_cloud_database = Database(
            self,
            'private',
            self.__logger
        )

    # Discovering Users
项目:Enibar    作者:ENIB    | 项目源码 | 文件源码
def add_descriptor(name, category, quantity):
    """ Add price descriptor

    :param str name: Price name
    :param int category: category id
    :param int quantity: The quantity (in mL) of the product
    :return int: Return price id created or None
    """
    with Database() as database:
        database.transaction()
        cursor = QtSql.QSqlQuery(database)
        cursor.prepare("INSERT INTO price_description(label, category, quantity) \
            VALUES(:label, :category, :quantity)")
        cursor.bindValue(":label", name)
        cursor.bindValue(":category", category)
        cursor.bindValue(":quantity", quantity)
        if cursor.exec_():
            desc_id = cursor.lastInsertId()
            cursor.prepare("SELECT id FROM products WHERE category=:category")
            cursor.bindValue(":category", category)
            if cursor.exec_():
                while cursor.next():
                    pcurser = QtSql.QSqlQuery(database)
                    pcurser.prepare("INSERT INTO prices(price_description, \
                            product, value) VALUES(:desc, :product, :value)")
                    pcurser.bindValue(":desc", desc_id)
                    pcurser.bindValue(":product", cursor.value('id'))
                    pcurser.bindValue(":value", 0.00)
                    pcurser.exec_()
            database.commit()
            return desc_id

        # Something went wrong
        database.rollback()
        return None
项目:CMS    作者:Vinlagrenouille    | 项目源码 | 文件源码
def get_db():
    db = getattr(g, '_database', None)
    if db is None:
        g._database = Database()
    return g._database
项目:wifi-probe-hack    作者:kings-way    | 项目源码 | 文件源码
def __init__(self, mon):
        self.mon = mon
        self.stations = []
        self.ssids = []
        self.db = Database()
        self.airbase_process = Process(target=self.launch_airbase)
        self.airbase_process.daemon = True
        self.airbase_process.start()

    # deduplicate the ssids
项目:aria    作者:mattmaynes2    | 项目源码 | 文件源码
def setUp(self):
        try:
            os.remove(self._testMethodName + ".db")
        except OSError as e:
            pass
        self.database    = Database(self._testMethodName + ".db")
        self.testDatabase  =TestDatabase(self._testMethodName + ".db") 
        self.retriever = Retriever(self.database)
项目:aria    作者:mattmaynes2    | 项目源码 | 文件源码
def setUp(self):
        try:
            os.remove(self._testMethodName + ".db")
        except OSError as e:
            pass
        self.database    = Database(self._testMethodName + ".db")
项目:aria    作者:mattmaynes2    | 项目源码 | 文件源码
def setUp(self):
        try:
            os.remove(self._testMethodName + ".db")
        except OSError as e:
            pass
        self.database    = Database(self._testMethodName + ".db")
项目:multichain-walker-simulation    作者:pimveldhuisen    | 项目源码 | 文件源码
def __init__(self, max_time, log_dir, verbose, persistent_walking, directed_walking, block_limit, alpha,
                 teleport_probability):
        self.max_time = max_time
        self.bootstrap = Node(0, self)
        self.nodes = []
        self.event_queue = []
        self.time = 0
        self.block_file = os.path.join(log_dir, 'blocks.dat')
        self.load_balance_file = os.path.join(log_dir, 'load.dat')
        self.ranking_deviation_file = os.path.join(log_dir, 'ranking.dat')
        self.verbose = verbose
        self.persistent_walking = persistent_walking
        self.directed_walking = directed_walking
        self.block_limit = block_limit
        self.alpha = alpha
        self.teleport_probability = teleport_probability
        self.last_progress_print = None

        print "Reading multichain database.."
        database = Database("multichain.db", self.block_limit)
        public_keys = database.get_identities()
        for public_key in public_keys:
            node = Node(public_key, self, self.persistent_walking, self.directed_walking, self.alpha,
                        self.teleport_probability)
            node.add_blocks(database.get_blocks(public_key))
            node.receive_identity(self.bootstrap)
            node.send_identity(self.bootstrap)
            self.nodes.append(node)
            self.add_event(Simulation.initialisation_delay(), node.take_walk_step)
            self.add_event(0, node.update_ranking)

        print "Calculating rankings.."
        # Here rankings are calculated based on the full database, not the individual databases of the nodes
        self.rankings = {}
        for public_key in public_keys:
            self.rankings[str(public_key)] = get_ranking(database, public_key)


        print "Scheduling data gathering.."
        self.log_data_times = range(self.max_time, -60000, -60000)
        print self.log_data_times
项目:multichain-walker-simulation    作者:pimveldhuisen    | 项目源码 | 文件源码
def test_ranking_integrated(self):

        global_database = Database("../multichain.db", 100)
        personal_database = Database("temp.db", None)
        public_key = random.choice(global_database.get_identities())
        personal_database.add_blocks(global_database.get_blocks(public_key))
        global_ranking = get_ranking(global_database, public_key)
        personal_ranking = get_ranking(personal_database, public_key)

        ranking_similarity = calculate_ranking_similarity(personal_ranking, global_ranking)
        assert ranking_similarity <= 1
        assert ranking_similarity >= 0
项目:multichain-walker-simulation    作者:pimveldhuisen    | 项目源码 | 文件源码
def test_ranking_integrated_full_database(self):

        global_database = Database("../multichain.db", 100)
        personal_database = Database("temp.db", None)
        public_key = random.choice(global_database.get_identities())
        for public_key in global_database.get_identities():
            personal_database.add_blocks(global_database.get_blocks(public_key))
        global_ranking = get_ranking(global_database, public_key)
        personal_ranking = get_ranking(personal_database, public_key)

        ranking_similarity = calculate_ranking_similarity(personal_ranking, global_ranking)

        self.assertEqual(ranking_similarity, 1)
项目:multichain-walker-simulation    作者:pimveldhuisen    | 项目源码 | 文件源码
def __init__(self, public_key, simulation, persistent_walking=False, directed_walking=False, alpha=0.1,
                 teleport_probability=0.5):
        self.public_key = public_key
        self.simulation = simulation
        self.live_edges = []
        self.node_directory = "nodes/" + base64.encodestring(str(self.public_key))
        if not os.path.exists(self.node_directory):
            os.makedirs(self.node_directory)
        self.block_database = Database(self.node_directory + "/multichain.db")
        self.log = open(self.node_directory + "/log.txt", 'w')

        self.directed_walking = directed_walking
        self.persistent_walking = persistent_walking

        if self.persistent_walking:
            self.teleport_probability = teleport_probability
            self.current_walk = None
            if self.directed_walking:
                self.walk_function = self.walk_statefull_directed
            else:
                self.walk_function = self.walk_statefull_undirected
        else:
            if self.directed_walking:
                self.walk_function = self.walk_stateless_directed
            else:
                self.walk_function = self.walk_stateless_undirected

        self.ranking = {}
        self.number_of_requests_received = 0
        self.alpha = alpha
项目:PyTaskHelper    作者:AvSinStudio    | 项目源码 | 文件源码
def build_task_database(files, entry_class):
    task_db = database.Database(entry_class)
    for filename in files:
        with open(filename, encoding='utf-8') as file:
            data = json.load(file)
        for task in data:
            task_db.add_entry(task)
    if 'finalize' in dir(task_db):
        task_db.finalize()
    return task_db
项目:RelayBot2.0    作者:nukeop    | 项目源码 | 文件源码
def __init__(self, logfilename=None):
        self.configure_logging(logfilename)
        errors.set_exception_handler()
        self.user = None

        self.database = database.Database('relaybot.db')

        self.import_plugins()
        self.plugins = []

        for plugin in plugins.plugin.Plugin.__subclasses__():
            plugininst = plugin(self)
            self.plugins.append(plugininst)
项目:MTodo    作者:mortezaipo    | 项目源码 | 文件源码
def __init__(self):
        self.__config = Config()
        self.__database = Database(self.__config.database_path)
项目:MTodo    作者:mortezaipo    | 项目源码 | 文件源码
def start():
    """Start progress."""
    config = Config()
    config.start()

    database = Database(config.database_path)
    database.start()

    interface = Interface()
    interface.start()
项目:MTodo    作者:mortezaipo    | 项目源码 | 文件源码
def __init__(self):
        """Initialize UserInterface class."""
        self.__config = Config()
        self.__action = Action()
        self.__database = Database(self.__config.database_path)
项目:BlitzcrankBotV2    作者:SuperFrosty    | 项目源码 | 文件源码
def on_guild_join(self, guild):
        """Check for bot collection"""
        l = list(filter(lambda m: m.bot, guild.members))
        members = len(guild.members)
        if len(l) / members >= .55:
            bots = "{0:.0F}% bots".format(100 * (len(l) / members))
            channel_test = discord.utils.find(lambda c: c.permissions_for(
                c.guild.me).send_messages, guild.text_channels)
            await channel_test.send("To avoid bot collection servers, I auto leave any server where 55% or above of the users are bots, sorry!")
            await guild.leave()
            embed = discord.Embed(title="Left Server", colour=0x1affa7)
            embed.add_field(name="Server:", value=guild.name, inline=True)
            embed.add_field(
                name="Reason:", value="Bot collection server", inline=True)
            embed.add_field(name="Users:", value=members, inline=True)
            embed.add_field(name="Justification:", value=bots, inline=True)
            channel = self.bot.get_channel(295831639219634177)
            await channel.send('', embed=embed)
        else:
            embed = discord.Embed(title="Joined Server", colour=0x1affa7)
            embed.add_field(name="Server:", value=guild.name, inline=True)
            embed.add_field(name="Users:", value=members, inline=True)
            embed.add_field(name="Total:", value=len(
                self.bot.guilds), inline=True)
            channel = self.bot.get_channel(295831639219634177)
            await channel.send('', embed=embed)
            channel_test = discord.utils.find(lambda c: c.permissions_for(
                c.guild.me).send_messages, guild.text_channels)
            await channel_test.send("Beep, boop! To set up a default LoL region for my lookup commands, please use the `b!region set` command! (Example, `b!region set OCE`)")
            db = database.Database("guilds.db")
            db.add_table(str(guild.id))
            db.close_connection()
            await self.post_stats()
项目:BlitzcrankBotV2    作者:SuperFrosty    | 项目源码 | 文件源码
def set(self, ctx, region: str):
        """Set your server's default region"""
        try:
            Summoner(name="", region=region)
        except ValueError:
            embed = discord.Embed(
                title="Error!",
                description="{0} is not a valid region!".format(region),
                colour=0xCA0147)
            utils.footer(ctx, embed)
            await ctx.send("", embed=embed)
            return
        db = database.Database('guilds.db')
        try:
            region_found = db.find_entry(ctx.guild.id)
            db.close_connection()
            embed = discord.Embed(
                title="Error!",
                description="{0} is already {1}'s default region!".format(
                    region_found, ctx.guild.name),
                colour=0xCA0147)
            utils.footer(ctx, embed)
            await ctx.send("", embed=embed)
        except TypeError:
            db.add_entry(ctx.guild.id, region)
            db.close_connection()
            embed = discord.Embed(
                title="Success!",
                description="{0} set as {1}'s default region!".format(
                    region, ctx.guild.name),
                colour=0x1AFFA7)
            utils.footer(ctx, embed)
            await ctx.send("", embed=embed)
项目:BlitzcrankBotV2    作者:SuperFrosty    | 项目源码 | 文件源码
def update(self, ctx, region: str):
        """Update your server's default region"""
        try:
            Summoner(name="", region=region)
        except ValueError:
            embed = discord.Embed(
                title="Error!",
                description="{0} is not a valid region!".format(region),
                colour=0xCA0147)
            utils.footer(ctx, embed)
            await ctx.send("", embed=embed)
            return
        db = database.Database('guilds.db')
        try:
            db.find_entry(ctx.guild.id)
            db.update_entry(ctx.guild.id, region)
            db.close_connection()
            embed = discord.Embed(
                title='Success!',
                description="Set {0} as {1}'s default region!".format(
                    region, ctx.guild.name),
                colour=0x1AFFA7)
            utils.footer(ctx, embed)
            await ctx.send("", embed=embed)
        except TypeError:
            db.close_connection()
            embed = discord.Embed(
                title="Error!",
                description="A default region for this server has not been set!",
                colour=0xCA0147)
            utils.footer(ctx, embed)
            await ctx.send("", embed=embed)
项目:wednesday    作者:wcauchois    | 项目源码 | 文件源码
def test_pubsub():
    async with create_pool(get_db_url()) as e:
        async with e.acquire() as listen_conn:
            listener = listen_helper(listen_conn)
            db = Database()
            await db.startup()
            await asyncio.gather(listener, db.insert_post(parent_id=290, content='testing notify'))
            print("listen/notify done!")