我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用RPi.GPIO.wait_for_edge()。
def measureDistance_HCSRO4(TRIG_ID, ECHO_ID): GPIO.output(TRIG_ID, True) time.sleep(0.00001) GPIO.output(TRIG_ID, False) edge_detect = GPIO.wait_for_edge(ECHO_ID, GPIO.RISING, timeout = 100) if edge_detect is not None: pulse_start = time.time() else: return MAX_DIST edge_detect = GPIO.wait_for_edge(ECHO_ID, GPIO.FALLING, timeout = 100) if edge_detect is not None: pulse_end = time.time() else: return MAX_DIST pulse_duration = pulse_end - pulse_start distance = round(pulse_duration * 17150, 2) return distance
def start(): last = GPIO.input(button) while True: val = GPIO.input(button) GPIO.wait_for_edge(button, GPIO.FALLING) # we wait for the button to be pressed GPIO.output(lights[1], GPIO.HIGH) inp = alsaaudio.PCM(alsaaudio.PCM_CAPTURE, alsaaudio.PCM_NORMAL, device) inp.setchannels(1) inp.setrate(16000) inp.setformat(alsaaudio.PCM_FORMAT_S16_LE) inp.setperiodsize(500) audio = "" while(GPIO.input(button)==0): # we keep recording while the button is pressed l, data = inp.read() if l: audio += data rf = open(path+'recording.wav', 'w') rf.write(audio) rf.close() inp = None alexa()
def wait_edge(self, io_number): if self.importlib is not None: # La première consiste à bloquer l'exécution du programme jusqu'à ce que l'événement se produise. return GPIO.wait_for_edge(io_number, GPIO.RISING)
def listen(assistant): while True: GPIO.wait_for_edge(button_pin, GPIO.RISING) sleep(.5) print("Trigger button gedrückt") mute(assistant) assistant.start_conversation() # speak_tts erzeugt aus einem übergebenen Text mittel Googles TTS-Dienst eine # MP3-Datei. Diese wird von sox abgespielt. # Optional kann eine Sprache angegeben werden.
def mute(assistant): global muted while True: GPIO.wait_for_edge(button_pin, GPIO.RISING) sleep(.5) print('button') muted = not muted assistant.set_mic_mute(muted)
def mute(assistant): global muted while True: GPIO.wait_for_edge(button_pin, GPIO.RISING) sleep(.5) print("Mute Button pressed") muted = not muted assistant.set_mic_mute(muted) # speak_tts erzeugt aus einem übergebenen Text mittel Googles TTS-Dienst eine # MP3-Datei. Diese wird von sox abgespielt. # Optional kann eine Sprache angegeben werden.
def irqWait(self, timeout = 30000): # CHANGE: detect module name because wait_for_edge is not available in # other libraries if GPIO.__name__ != "Adafruit_BBIO.GPIO": return False # TODO: A race condition may occur here. if GPIO.input(self.irq_pin) == 0: # Pin is already down. Packet is waiting? return True return GPIO.wait_for_edge(self.irq_pin, GPIO.FALLING, timeout) == 1
def wait_for_input(self): GPIO.wait_for_edge(self.GPIO_PIN, GPIO.RISING)
def run(self): while not self.quit: button_pressed = False if 0 != BUTTONPIN: button_pressed = not GPIO.input(BUTTONPIN) if button_pressed and self.core.current_call is None: # We do not check the time here. They can keep "ringing" the doorbell if they want # but it won't matter once a call is initiated. if self.doorbell_sound: self.doorbell_sound.play() try: if time.time() - self.lastMessageTicks > WAITSECONDS: self.notify_chat_contacts(message_template="Doorbell ring on %s at %s") params = self.core.create_call_params(None) params.audio_enabled = True params.video_enabled = True params.audio_multicast_enabled = False # Set these = True if you want multiple params.video_multicast_enabled = False # people to connect at once. address = linphone.Address.new(doorbellToAddress) logging.info('address = {address}, used_video_codec = {codec}'.format( address=address, codec=params.used_video_codec)) self.current_call = self.core.invite_address_with_params(address, params) if None is self.current_call: logging.error("Error creating call and inviting with params... outgoing call aborted.") if time.time() - self.lastEmailTicks >= WAITEMAILSECONDS: if LEDPINDOORBELL: self.flash_led(ledpin=LEDPINDOORBELL, stay_on=True, blink_cam_led=False, delay=0.25, blink_count=8) else: self.flash_led() self.notify_email_contacts() except KeyboardInterrupt: self.quit = True break elif detectMotion and self.core.current_call is None: motion_detected = False # Incoming calls have been handled, so check the motion detector: if 0 != PIRPIN: # motion_detected = GPIO.wait_for_edge(PIRPIN,GPIO.RISING) # motion_detected = GPIO.event_detected(PIRPIN) motion_detected = GPIO.input(PIRPIN) logging.debug("\rmotion_detected = %s, GPIO.input(PIRPIN) = %s" % ( str(motion_detected), str(GPIO.input(PIRPIN)))) elif 0 != MDPIN: motion_detected = GPIO.input(MDPIN) == 0 if motion_detected: self.motion_detected() # else: # time.sleep(0.01) # self.core.iterate()
def buttonCallback(self, channel): """Callback function for user input (pressed buttons)""" # record button state transitions if channel == RECORD_BUTTON: # if the current state is IDLE change it to RECORD if self.state == IDLE: # set RECORD state self.setState(RECORD) # empty payloads list self.payloads = [] # if the current state is RECORD change it to IDLE elif self.state == RECORD: # set IDLE state self.setState(IDLE) # play button state transitions elif channel == REPLAY_BUTTON: # if the current state is IDLE change it to REPLAY if self.state == IDLE: # set REPLAY state self.setState(REPLAY) # scan button state transitions elif channel == SCAN_BUTTON: # wait a short a time to see whether the record button is also # press in order to perform a graceful shutdown # remove event detection for record button GPIO.remove_event_detect(RECORD_BUTTON) chan = GPIO.wait_for_edge(RECORD_BUTTON, GPIO.RISING, timeout=1000) if chan != None: # set SHUTDOWN state self.setState(SHUTDOWN) # set callback function for record button GPIO.remove_event_detect(RECORD_BUTTON) GPIO.add_event_detect(RECORD_BUTTON, GPIO.RISING, callback = self.buttonCallback, bouncetime = 250) # if the current state is IDLE change it to SCAN if self.state == IDLE: # set SCAN state self.setState(SCAN) # attack button state transitions elif channel == ATTACK_BUTTON: # if the current state is IDLE change it to ATTACK if self.state == IDLE: # set ATTACK state self.setState(ATTACK) # debug output debug("State: {0}".format(self.state))