我们从Python开源项目中，提取了以下27个代码示例，用于说明如何使用talib.EMA。
def MA_RIBBON(df, ma_series):
    """Build moving-average "ribbon" statistics from several EMAs of ``close``.

    For every length in ``ma_series`` an EMA of the ``close`` column is
    computed with the sibling ``EMA`` helper.  For each row from index
    ``max(ma_series) - 1`` onwards, the EMA values of that row are compared
    with the descending sequence ``len(ma_series), ..., 1`` using Spearman
    rank correlation, and the spread (max - min) of the EMAs is recorded.
    Earlier rows stay NaN.

    Args:
        df: DataFrame with a ``close`` column.
        ma_series: sequence of EMA lengths.

    Returns:
        DataFrame (indexed like ``df``) with columns ``MARIBBON_CORR``
        (correlation * 100), ``MARIBBON_PVAL`` (p-value * 100),
        ``MARIBBON_DIST`` and one column per EMA.
    """
    n_rows = len(df)
    ema_list = [EMA(df, n=ma_len, field='close') for ma_len in ma_series]

    ma_array = np.zeros([n_rows, len(ma_series)])
    for col, ema_i in enumerate(ema_list):
        ma_array[:, col] = ema_i

    # np.nan instead of the np.NAN alias, which NumPy 2.0 removed.
    corr = np.full(n_rows, np.nan)
    pval = np.full(n_rows, np.nan)
    dist = np.full(n_rows, np.nan)

    max_n = max(ma_series)
    descending_ranks = range(len(ma_series), 0, -1)
    # Start at the first row the original loop would actually process.
    for row in range(max(max_n - 1, 0), n_rows):
        values = ma_array[row, :]
        corr[row], pval[row] = stats.spearmanr(values, descending_ranks)
        dist[row] = max(values) - min(values)

    corr_ts = pd.Series(corr * 100, index=df.index, name="MARIBBON_CORR")
    pval_ts = pd.Series(pval * 100, index=df.index, name="MARIBBON_PVAL")
    dist_ts = pd.Series(dist, index=df.index, name="MARIBBON_DIST")
    return pd.concat([corr_ts, pval_ts, dist_ts] + ema_list, join='outer', axis=1)
def getStrategy_GD(start_trading_day, end_trading_day, _time, _close):
    """Generate EMA(10)/EMA(25) crossover signals for a trading window.

    Both EMAs are computed over the whole ``_close`` history and then sliced
    to ``[_time.index(start_trading_day), _time.index(end_trading_day))``.

    Returns:
        list of ints, one per bar in the window (the first is always 0):
        1 when the short EMA crosses from at-or-below to at-or-above the long
        EMA, -1 for the opposite crossing, 0 otherwise.
    """
    first = _time.index(start_trading_day)
    last = _time.index(end_trading_day)

    fast = ta.EMA(np.array(_close, dtype='f8'), timeperiod=10)[first:last]
    slow = ta.EMA(np.array(_close, dtype='f8'), timeperiod=25)[first:last]

    signals = [0]
    # Walk consecutive (previous, current) pairs of both series in lockstep.
    for prev_fast, prev_slow, cur_fast, cur_slow in zip(fast[:-1], slow[:-1], fast[1:], slow[1:]):
        if prev_fast <= prev_slow and cur_fast >= cur_slow:
            signals.append(1)
        elif prev_fast >= prev_slow and cur_fast <= cur_slow:
            signals.append(-1)
        else:
            signals.append(0)
    return signals
def atr(df, n = 20):
    """Update the last value of the ``ATR<n>`` column in place.

    The true range of the last row is
    ``max(high - low, |high - prev_close|, |low - prev_close|)`` and the new
    ATR is the exponential update
    ``prev_ATR * (1 - alpha) + alpha * true_range`` with ``alpha = 2 / (n + 1)``.

    Args:
        df: DataFrame with ``high``, ``low``, ``close`` and an existing
            ``'ATR' + str(n)`` column; needs at least two rows.
        n: smoothing length.

    Returns:
        None; ``df`` is modified in place.
    """
    column = 'ATR' + str(n)
    high = df['high'].iloc[-1]
    low = df['low'].iloc[-1]
    prev_close = df['close'].iloc[-2]

    new_tr = max(high - low, abs(high - prev_close), abs(low - prev_close))
    alpha = 2.0 / (n + 1)
    updated = df[column].iloc[-2] * (1 - alpha) + alpha * new_tr

    # Single positional write on the frame itself: the original chained
    # assignment (df[col][-1] = ...) may write to a temporary copy, and
    # label-style [-1] does not mean "last row" on every index type.
    df.iloc[-1, df.columns.get_loc(column)] = updated

# talib matype: 0=SMA, 1=EMA, 2=WMA, 3=DEMA, 4=TEMA, 5=TRIMA, 6=KAMA, 7=MAMA, 8=T3
def EMA(df, n, field = 'close'):
    """Return ``talib.EMA`` of ``df[field]`` as a Series aligned to ``df.index``.

    The Series is named ``'EMA_<FIELD>_<n>'`` with the field name upper-cased.
    """
    series_name = 'EMA_' + field.upper() + '_' + str(n)
    ema_values = talib.EMA(df[field].values, n)
    return pd.Series(ema_values, name=series_name, index=df.index)
def EMAVAR(df, n, field = 'close'):
    """Return the EMA of ``df[field]`` together with an exponentially smoothed variance.

    The variance input is ``(1 - alpha) * (x - previous_EMA) ** 2`` with
    ``alpha = 2 / (n + 1)``; a missing previous EMA is treated as 0.  That
    series is then smoothed with ``talib.EMA`` of length ``n``.

    Returns:
        DataFrame with the EMA column and an ``'EVAR_<FIELD>_<n>'`` column.
    """
    mean_ts = EMA(df, n, field)
    alpha = 2.0 / (n + 1)

    prev_mean = mean_ts.shift(1).fillna(0)
    deviation = df[field] - prev_mean
    weighted_sq = (1 - alpha) * deviation ** 2

    var_name = 'EVAR_' + field.upper() + '_' + str(n)
    var_ts = pd.Series(talib.EMA(weighted_sq.values, n), name=var_name, index=df.index)
    return pd.concat([mean_ts, var_ts], join='outer', axis=1)
def TEMA(ts, n):
    """Triple exponential moving average: ``3*EMA1 - 3*EMA2 + EMA3``.

    ``EMA1`` is the EMA of ``ts``, ``EMA2`` the EMA of ``EMA1`` and ``EMA3``
    the EMA of ``EMA2``, all with ``span=n`` and ``adjust=False``.

    Args:
        ts: price series (a pandas Series, or anything ``pd.Series`` accepts).
        n: span; coerced to ``int``.

    Returns:
        pandas Series named ``'TEMA<n>'``.
    """
    n = int(n)

    def _ewm(series):
        # pd.ewma() was removed from pandas; Series.ewm(...).mean() is the
        # replacement and takes the same span/adjust arguments.
        return pd.Series(series).ewm(span=n, adjust=False).mean()

    ts_ema1 = _ewm(ts).rename('EMA' + str(n))
    ts_ema2 = _ewm(ts_ema1).rename('EMA2' + str(n))
    ts_ema3 = _ewm(ts_ema2).rename('EMA3' + str(n))
    ts_tema = pd.Series(3 * ts_ema1 - 3 * ts_ema2 + ts_ema3, name='TEMA' + str(n))
    return ts_tema
def add_EMA(self, timeperiod=26, type='line', color='secondary', **kwargs):
    """Register an Exponential Moving Average of the close column.

    Stores the plot settings under ``self.pri['EMA(<timeperiod>)']`` and the
    ``talib.EMA`` values under ``self.ind`` with the same key.  A ``kind``
    keyword argument, when given, overrides ``type``.

    Raises:
        Exception: if ``self.has_close`` is falsy.
    """
    if not self.has_close:
        raise Exception()

    utils.kwargs_check(kwargs, VALID_TA_KWARGS)
    if 'kind' in kwargs:
        type = kwargs['kind']

    label = 'EMA({})'.format(str(timeperiod))
    close_values = self.df[self.cl].values
    self.pri[label] = dict(type=type, color=color)
    self.ind[label] = talib.EMA(close_values, timeperiod)
def add_TRIX(self, timeperiod=15, type='area', color='secondary', **kwargs):
    """Register TRIX (1-day rate of change of a triple-smoothed EMA) of the close column.

    Stores the plot settings under ``self.sec['TRIX(<timeperiod>)']`` and the
    ``talib.TRIX`` values under ``self.ind`` with the same key.  A ``kind``
    keyword argument, when given, overrides ``type``.

    Raises:
        Exception: if ``self.has_close`` is falsy.
    """
    if not self.has_close:
        raise Exception()

    utils.kwargs_check(kwargs, VALID_TA_KWARGS)
    if 'kind' in kwargs:
        type = kwargs['kind']

    label = 'TRIX({})'.format(str(timeperiod))
    close_values = self.df[self.cl].values
    self.sec[label] = dict(type=type, color=color)
    self.ind[label] = talib.TRIX(close_values, timeperiod)
def calculate_obv(self, period_name, closing_prices, volumes):
    """Store the latest OBV and its 3-period EMA for ``period_name``.

    Writes ``'obv_ema'`` and ``'obv'`` into
    ``self.current_indicators[period_name]``; returns nothing.
    """
    obv_series = talib.OBV(closing_prices, volumes)
    smoothed = talib.EMA(obv_series, timeperiod=3)

    indicators = self.current_indicators[period_name]
    indicators['obv_ema'] = smoothed[-1]
    indicators['obv'] = obv_series[-1]
def ema(self, np_array, n, array=False):
    """Exponential moving average of ``np_array`` with period ``n``.

    For ``n < 2`` the input is used unchanged instead of calling
    ``talib.EMA``.  Returns the whole series when ``array`` is true,
    otherwise only its last element.
    """
    series = np_array if n < 2 else talib.EMA(np_array, n)
    return series if array else series[-1]
def MACD(self):
    """Compute MACD on the closes returned by ``self.getCloses()``.

    Uses ``talib.MACD`` with its default parameters.  Per the original notes:
    MACD is (12-day EMA - 26-day EMA), the signal line is the 9-day EMA of
    MACD, and the histogram is MACD - signal line.

    Returns:
        (macd, macdsignal, macdhist) as returned by ``talib.MACD``.
    """
    return talib.MACD(self.getCloses())
def MACD(closes):
    """Compute MACD for ``closes`` with ``talib.MACD`` default parameters.

    Per the original notes: MACD is (12-day EMA - 26-day EMA), the signal
    line is the 9-day EMA of MACD, and the histogram is MACD - signal line.

    Returns:
        (macd, macdsignal, macdhist) as returned by ``talib.MACD``.
    """
    macd_line, signal_line, histogram = talib.MACD(closes)
    return macd_line, signal_line, histogram
def BOLL(closes, matype=MA_Type.EMA):
    """Bollinger Bands over a 20-bar window.

    Args:
        closes: closing prices; converted with ``np.array``.
        matype: moving-average type passed to ``talib.BBANDS``
            (defaults to ``MA_Type.EMA``).

    Returns:
        (upper, middle, lower) bands as returned by ``talib.BBANDS``.
    """
    close_values = np.array(closes)
    bands = talib.BBANDS(close_values, timeperiod=20, matype=matype)
    return bands
def EMA(security_list, timeperiod=30):
    """Compute ``talib.EMA`` of daily closes for one or more securities.

    Args:
        security_list: a single security code (str) or a list of codes.
        timeperiod: EMA period; ``timeperiod * 2`` daily closes are requested
            from ``history`` for each security.

    Returns:
        dict mapping each security code to its EMA array.
    """
    # Accept a bare string as a one-element list.
    if isinstance(security_list, str):
        security_list = [security_list]

    # NOTE(review): `history` is an external helper not defined here; it is
    # assumed to return a mapping of security -> close array when df=False.
    closes_by_stock = history(timeperiod * 2, '1d', 'close', security_list,
                              df=False, skip_paused=True)
    return {stock: talib.EMA(closes_by_stock[stock], timeperiod)
            for stock in security_list}

# DMA
def macd(close, previous_macds=None, fast_period=12, slow_period=26, signal_period=9):
    """
    MACD - Moving Average Convergence Divergence

    Args:
        close: numpy.ndarray of closing prices.
        previous_macds: numpy.ndarray of previous MACD values; when it holds
            fewer than ``signal_period`` entries (or is omitted) no signal
            line is produced.
        fast_period: period of the fast EMA.
        slow_period: period of the slow EMA.
        signal_period: period of the signal-line EMA.

    Returns:
        - macd
        - macd_line (signal line, or None when there is not enough history)

        ``(None, None)`` when too little data was passed or talib raised.
    """
    # Fresh list per call instead of a shared mutable default argument.
    if previous_macds is None:
        previous_macds = []

    dataset_size = close.size
    # The message below states that `slow_period` samples are required, so
    # reject anything shorter (the original let slow_period - 1 through).
    if dataset_size < slow_period:
        print('Error in macd.py: passed not enough data! Required: ' + str(slow_period) +
              ' passed: ' + str(dataset_size))
        return None, None

    try:
        ema_slow = talib.EMA(close, timeperiod=slow_period)[-1]
        # NOTE(review): the fast EMA only sees the last `fast_period` closes,
        # while the slow EMA sees the full history - confirm this is intended.
        ema_fast = talib.EMA(close[-fast_period:], timeperiod=fast_period)[-1]
        macd_value = ema_fast - ema_slow
        if len(previous_macds) < signal_period:
            signal_line = None
        else:
            signal_line = talib.EMA(previous_macds[-signal_period:], timeperiod=signal_period)[-1]
    except Exception as e:
        # Deliberate best-effort: report the failure and signal "no value".
        print('Got Exception in macd.py. Details: ' + str(e) + '. Data: ' + str(previous_macds))
        return None, None

    return macd_value, signal_line
def calculate_indicator(stock_df):
    """Add a fixed set of talib indicator columns to ``stock_df`` in place.

    Columns added, in order: ``MA<p>`` and ``EMA<p>`` for p in 3/5/10/20/30/60,
    ``AMTMA<p>`` and ``ATR<p>`` for p in 5/10/20, ``ADX14``,
    ``MACD_DIFF``/``MACD_DEA``/``MACD_HIST`` (12, 26, 9), ``CCI14``, ``MFI14``
    and ``ROCP<p>`` for p in 5/10/20.  Reads the ``close``, ``amount``,
    ``high``, ``low`` and ``volume`` columns.  Returns nothing.
    """
    long_windows = (3, 5, 10, 20, 30, 60)
    short_windows = (5, 10, 20)

    # MA
    for window in long_windows:
        stock_df['MA{}'.format(window)] = talib.MA(stock_df['close'].values, timeperiod=window)

    # EMA
    for window in long_windows:
        stock_df['EMA{}'.format(window)] = talib.EMA(stock_df['close'].values, timeperiod=window)

    # AMTMA (moving average of the 'amount' column)
    for window in short_windows:
        stock_df['AMTMA{}'.format(window)] = talib.MA(stock_df['amount'].values, timeperiod=window)

    # ATR
    for window in short_windows:
        stock_df['ATR{}'.format(window)] = talib.ATR(
            stock_df['high'].values, stock_df['low'].values, stock_df['close'].values,
            timeperiod=window)

    # ADX
    window = 14
    stock_df['ADX{}'.format(window)] = talib.ADX(
        stock_df['high'].values, stock_df['low'].values, stock_df['close'].values,
        timeperiod=window)

    # MACD
    macd_diff, macd_dea, macd_hist = talib.MACD(
        stock_df['close'].values, fastperiod=12, slowperiod=26, signalperiod=9)
    stock_df['MACD_DIFF'] = macd_diff
    stock_df['MACD_DEA'] = macd_dea
    stock_df['MACD_HIST'] = macd_hist

    # CCI
    window = 14
    stock_df['CCI{}'.format(window)] = talib.CCI(
        stock_df['high'].values, stock_df['low'].values, stock_df['close'].values,
        timeperiod=window)

    # MFI
    window = 14
    stock_df['MFI{}'.format(window)] = talib.MFI(
        stock_df['high'].values, stock_df['low'].values, stock_df['close'].values,
        stock_df['volume'].values, timeperiod=window)

    # ROCP
    for window in short_windows:
        stock_df['ROCP{}'.format(window)] = talib.ROCP(stock_df['close'].values, timeperiod=window)
def adosc_chartschool(my_close, my_high, my_low, my_volume, fastperiod, slowperiod):
    """Accumulation/distribution oscillator built from its textbook steps.

    Money-flow volume is ``((close - low) - (high - close)) / (high - low) * volume``;
    its cumulative sum is the A/D line, and the result is the fast EMA of
    that line minus the slow EMA.

    NOTE(review): ``.values`` is read from the cumulative sum, so the inputs
    are presumably pandas Series - confirm against callers.
    """
    bar_range = my_high - my_low
    flow_multiplier = ((my_close - my_low) - (my_high - my_close)) / bar_range
    money_flow_volume = flow_multiplier * my_volume

    ad_line = np.cumsum(money_flow_volume)
    fast_ema = EMA(ad_line.values, timeperiod=fastperiod)
    slow_ema = EMA(ad_line.values, timeperiod=slowperiod)
    return fast_ema - slow_ema
def __recountEma(self):
    """Recompute EMA1 and EMA2 from the bar list and append them to their lines.

    Does nothing (apart from a debug log) while ``self.lineBar`` holds fewer
    than ``max(7, inputEma1Len, inputEma2Len) + 2`` bars.  For each EMA whose
    configured length is positive, the EMA of the closes in
    ``self.lineBar[-length - 1:-1]`` is computed with ``ta.EMA``, rounded to
    three decimals and appended to ``self.lineEma1`` / ``self.lineEma2``.
    Each line is trimmed by one element once it exceeds eight times its own
    configured length.
    """
    barCount = len(self.lineBar)
    requiredBars = max(7, self.inputEma1Len, self.inputEma2Len) + 2

    # Not enough bars yet: log and bail out.
    if barCount < requiredBars:
        self.debugCtaLog(u'?????,??Bar?????{0}???EMA???{1}'.
                         format(barCount, requiredBars))
        return

    # Both EMAs follow the same recipe. Pairing each length with its own line
    # also fixes the original copy-paste slip where lineEma2 was trimmed
    # against inputEma1Len.
    for inputLen, lineEma in ((self.inputEma1Len, self.lineEma1),
                              (self.inputEma2Len, self.lineEma2)):
        if not inputLen > 0:
            continue

        emaLen = min(inputLen, barCount)

        # The slice ends at -1, so the newest bar is excluded.
        # NOTE(review): presumably because that bar is still forming - confirm.
        listClose = [x.close for x in self.lineBar[-emaLen - 1:-1]]
        barEma = ta.EMA(numpy.array(listClose, dtype=float), emaLen)[-1]
        barEma = round(float(barEma), 3)

        if len(lineEma) > inputLen * 8:
            del lineEma[0]
        lineEma.append(barEma)
def overlap_process(event):
    """Plot Bollinger Bands plus the overlap study selected in the widget.

    The upper axes always show the module-level ``close`` series with its
    5-period Bollinger Bands and the selected label as title.  The lower axes
    show the talib overlap study whose label equals the widget value, if any.

    NOTE(review): the label literals lost their original characters, so
    several are now identical; the first matching entry wins, exactly as in
    the original if/elif chain.
    """
    print(event.widget.get())
    overlap = event.widget.get()

    upperband, middleband, lowerband = ta.BBANDS(close, timeperiod=5, nbdevup=2, nbdevdn=2, matype=0)
    fig, (price_ax, study_ax) = plt.subplots(2, 1, sharex=True)
    price_ax.plot(close, 'rd-', markersize=3)
    price_ax.plot(upperband, 'y-')
    price_ax.plot(middleband, 'b-')
    price_ax.plot(lowerband, 'y-')
    price_ax.set_title(overlap, fontproperties="SimHei")

    # Ordered (label, factory) pairs; each factory returns the curves to draw.
    # Factories are lazy so only the selected study is computed.
    studies = (
        (u'???', lambda: ()),
        (u'????????', lambda: (ta.DEMA(close, timeperiod=30),)),
        (u'??????? ', lambda: (ta.EMA(close, timeperiod=30),)),
        (u'??????——?????', lambda: (ta.HT_TRENDLINE(close),)),
        (u'???????????', lambda: (ta.KAMA(close, timeperiod=30),)),
        (u'?????', lambda: (ta.MA(close, timeperiod=30, matype=0),)),
        (u'MESA???????', lambda: ta.MAMA(close, fastlimit=0, slowlimit=0)),
        (u'????????', lambda: (ta.MAVP(close, periods, minperiod=2, maxperiod=30, matype=0),)),
        (u'???????', lambda: (ta.SMA(close, timeperiod=30),)),
        (u'????????(T3)', lambda: (ta.T3(close, timeperiod=5, vfactor=0),)),
        (u'????????', lambda: (ta.TEMA(close, timeperiod=30),)),
        (u'?????? ', lambda: (ta.TRIMA(close, timeperiod=30),)),
        (u'???????', lambda: (ta.WMA(close, timeperiod=30),)),
    )
    line_styles = ('r-', 'g-')  # first curve red, second (MAMA's fama) green

    for label, build_curves in studies:
        if overlap == label:
            first, *rest = (None,) if label == u'???' else (build_curves(),)
            if first is not None:
                curves = first
                if len(curves) > len(line_styles):
                    raise ValueError("too many values to unpack (expected 2)")
                for curve, style in zip(curves, line_styles):
                    study_ax.plot(curve, style)
            break

    plt.show()

# NOTE(review): the original trailing section comment here was unreadable (encoding loss).
def TDX_ADX(highs, lows, closes):
    """Compute ADX following the TDX-style formula below.

    Formula (N = 14, MM = 6), as given in the original notes:

        MTR:=EXPMEMA(MAX(MAX(HIGH-LOW,ABS(HIGH-REF(CLOSE,1))),ABS(REF(CLOSE,1)-LOW)),N);
        HD :=HIGH-REF(HIGH,1);
        LD :=REF(LOW,1)-LOW;
        DMP:=EXPMEMA(IF(HD>0&&HD>LD,HD,0),N);
        DMM:=EXPMEMA(IF(LD>0&&LD>HD,LD,0),N);
        PDI: DMP*100/MTR;
        MDI: DMM*100/MTR;
        ADX: EXPMEMA(ABS(MDI-PDI)/(MDI+PDI)*100,MM);
        ADXR:EXPMEMA(ADX,MM);

    Only ADX is returned; ADXR is not computed.  ``EXPMEMA`` is implemented
    with ``talib.EMA``.  The first bar has no previous bar, so its true
    range, HD and LD are left at 0.

    Args:
        highs, lows, closes: equally long price sequences with more than
            30 elements.

    Returns:
        np.ndarray of ADX values.

    Raises:
        AssertionError: if ``closes`` has 30 or fewer elements.
    """
    assert(len(closes)>30)
    highs = np.array(highs)
    lows = np.array(lows)
    closes = np.array(closes)

    n = 14
    mm = 6

    # True range against the previous close; index 0 stays 0.
    prev_close = closes[:-1]
    mtr = np.zeros(len(closes))
    mtr[1:] = np.maximum(
        np.maximum(highs[1:] - lows[1:], np.abs(highs[1:] - prev_close)),
        np.abs(prev_close - lows[1:]))
    mtr = talib.EMA(mtr, n)

    # Raw directional moves; index 0 stays 0.
    hd_raw = np.zeros(len(highs))
    ld_raw = np.zeros(len(highs))
    hd_raw[1:] = highs[1:] - highs[:-1]
    ld_raw[1:] = lows[:-1] - lows[1:]

    # Both filters compare against the *raw* opposite move, as the formula
    # specifies. The original zeroed hd first and then tested ld against the
    # zeroed value, which wrongly kept LD when HD == LD > 0.
    hd = np.where((hd_raw > 0) & (hd_raw > ld_raw), hd_raw, 0.0)
    ld = np.where((ld_raw > 0) & (ld_raw > hd_raw), ld_raw, 0.0)

    dmp = talib.EMA(hd, n)
    dmm = talib.EMA(ld, n)
    pdi = dmp * 100 / mtr
    mdi = dmm * 100 / mtr

    adx = np.abs(mdi - pdi) / (mdi + pdi) * 100
    adx = talib.EMA(adx, mm)
    return adx
def calculate_features(df):
    """
    Build a one-row feature frame from the tail of ``df``.

    Starting from a copy of the last row, adds: ``ema<p>`` for
    p in 2/4/8/12/16/20, ``rsi5`` and ``rsi_above_505``, ``cci5``,
    ``macd_above_signal34`` and ``macd_above_zero34``, and ``obv<p>`` for
    p in 2/4/8/12/16/20.  Every indicator is computed only on the last
    ``p`` bars of the ``close``/``high``/``low``/``volume`` columns.
    """
    closes = df['close'].values
    highs = df['high'].values
    lows = df['low'].values
    volumes = df['volume'].values
    features = df.tail(1).copy()

    # ************** EMAs
    for span in (2, 4, 8, 12, 16, 20):
        features['ema' + str(span)] = talib.EMA(closes[-span:], timeperiod=span)[-1]

    # ************** RSIs (window of `span` bars, RSI period span - 1)
    for span in (5,):
        rsi_value = talib.RSI(closes[-span:], timeperiod=span - 1)[-1]
        features['rsi' + str(span)] = rsi_value
        features['rsi_above_50' + str(span)] = int(rsi_value > 50.0)

    # ************** CCIs
    for span in (5,):
        window = slice(-span, None)
        features['cci' + str(span)] = talib.CCI(
            highs[window], lows[window], closes[window], timeperiod=span)[-1]

    # ************** MACD
    for span in (34,):
        macd_line, macd_signal, _ = talib.MACD(
            closes[-span:], fastperiod=12, slowperiod=26, signalperiod=9)
        latest_macd = macd_line[-1]
        latest_signal = macd_signal[-1]
        features['macd_above_signal' + str(span)] = int(latest_macd > latest_signal)
        features['macd_above_zero' + str(span)] = int(latest_macd > 0.0)

    # ************** OBVs
    for span in (2, 4, 8, 12, 16, 20):
        features['obv' + str(span)] = talib.OBV(closes[-span:], volumes[-span:])[-1]

    return features
def calculate(self, look_back, wallet):
    """
    Main strategy logic: price vs. 25-bar EMA, overridden by an EMA death cross.

    Returns ``self.actions`` untouched while fewer than
    ``self.min_history_ticks`` datasets are available.  Otherwise clears it
    and appends one ``TradeAction``: sell when the close price is at or below
    the 25-bar EMA, buy otherwise - but always sell when the 6-bar EMA is at
    or below the 25-bar EMA.
    """
    (dataset_cnt, _) = common.get_dataset_count(look_back, self.group_by_field)

    # Wait until we have enough data
    if dataset_cnt < self.min_history_ticks:
        print('dataset_cnt:', dataset_cnt)
        return self.actions

    self.actions.clear()

    history = look_back.tail(self.min_history_ticks)
    closes = history['close'].values

    def last_ema(period):
        # EMA over just the last `period` closes; only the final value is used.
        return talib.EMA(closes[-period:], timeperiod=period)[-1]

    # ************** Price against the 25-bar EMA
    trend_ema = last_ema(25)
    close_price = self.get_price(TradeState.none, history.tail(), self.pair)
    print('close_price:', close_price, 'ema:', trend_ema)
    new_action = TradeState.sell if close_price <= trend_ema else TradeState.buy

    # ************** EMA death cross: short EMA at or below long EMA forces a sell
    short_ema = last_ema(6)
    long_ema = last_ema(25)
    if short_ema <= long_ema:
        new_action = TradeState.sell

    trade_price = self.get_price(new_action, history.tail(), self.pair)
    self.actions.append(TradeAction(self.pair,
                                    new_action,
                                    amount=None,
                                    rate=trade_price,
                                    buy_sell_mode=self.buy_sell_mode))
    return self.actions
def calculate(self, look_back, wallet):
    """
    Main strategy logic: OBV threshold, overridden by price vs. 6-bar EMA.

    Returns ``self.actions`` untouched while fewer than
    ``self.min_history_ticks`` datasets are available.  Otherwise clears it,
    picks buy when the latest OBV is >= 100.0 and sell when it is < 100.0,
    forces sell when the close price is at or below the 6-bar EMA, and
    appends one ``TradeAction`` unless no action was chosen.
    """
    (dataset_cnt, _) = common.get_dataset_count(look_back, self.group_by_field)

    # Wait until we have enough data
    if dataset_cnt < self.min_history_ticks:
        print('dataset_cnt:', dataset_cnt)
        return self.actions

    self.actions.clear()

    history = look_back.tail(self.min_history_ticks)
    closes = history['close'].values
    volumes = history['volume'].values

    decision = TradeState.none
    close_price = self.get_price(TradeState.none, look_back, self.pair)

    # ************** OBV (On Balance Volume)
    latest_obv = talib.OBV(closes, volumes)[-1]
    print('obv:', latest_obv)
    # Two explicit comparisons: a NaN OBV matches neither and leaves `none`.
    if latest_obv >= 100.0:
        decision = TradeState.buy
    elif latest_obv < 100.0:
        decision = TradeState.sell

    # ************** EMA over the last 6 closes
    window = 6
    latest_ema = talib.EMA(closes[-window:], timeperiod=window)[-1]
    if close_price <= latest_ema:
        decision = TradeState.sell

    if decision == TradeState.none:
        return self.actions

    trade_price = self.get_price(decision, history.tail(), self.pair)
    self.actions.append(TradeAction(self.pair,
                                    decision,
                                    amount=None,
                                    rate=trade_price,
                                    buy_sell_mode=self.buy_sell_mode))
    return self.actions