前回の記事「Bitflyerの自動売買BOTの勝率・平均リターン・総利益をバックテストで計算する」の最後の練習問題の答え合わせです。
解答
import requests from datetime import datetime import time import json import ccxt import numpy as np # ----バックテスト用の初期設定値 chart_sec = 300 # 5分足 lot = 1 # 1トレードの枚数 slippage = 0.0005 # 手数料やスリッページ(0.05%初期値) close_condition = 0 # エントリー後、n足が経過するまでは手仕舞わない(初期値0) # パラーメータを指定してCryptowatchのAPIを使用する関数 def get_price(min, before=0, after=0): price = [] params = {"periods" : min } if before != 0: params["before"] = before if after != 0: params["after"] = after response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params) data = response.json() if data["result"][str(min)] is not None: for i in data["result"][str(min)]: if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0: price.append({ "close_time" : i[0], "close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'), "open_price" : i[1], "high_price" : i[2], "low_price" : i[3], "close_price": i[4] }) return price else: print("データが存在しません") return None # json形式のファイルから価格データを読み込む関数 def get_price_from_file(path): file = open(path,'r',encoding='utf-8') price = json.load(file) return price # 時間と始値・終値を表示する関数 def print_price( data ): print( "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) ) # 時間と始値・終値をログに記録する関数 def log_price( data,flag ): log = "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) + "\n" flag["records"]["log"].append(log) return flag # 各ローソク足が陽線・陰線の基準を満たしているか確認する関数 def check_candle( data,side ): try: realbody_rate = abs(data["close_price"] - data["open_price"]) / (data["high_price"]-data["low_price"]) increase_rate = data["close_price"] / data["open_price"] - 1 except ZeroDivisionError as e: return False if side == "buy": if data["close_price"] < data["open_price"] : return False #elif increase_rate < 0.0003 : return False #elif realbody_rate < 0.5 : return False else : return True if side == "sell": if data["close_price"] > data["open_price"] : return False #elif increase_rate > -0.0003 : return False #elif realbody_rate < 0.5 : return False else : return True # ローソク足が連続で上昇しているか確認する関数 def check_ascend( data,last_data ): if data["open_price"] > last_data["open_price"] and data["close_price"] > last_data["close_price"]: return True else: return False # ローソク足が連続で下落しているか確認する関数 def check_descend( data,last_data ): if data["open_price"] < last_data["open_price"] and data["close_price"] < last_data["close_price"]: return True else: return False # 買いシグナルが出たら指値で買い注文を出す関数 def buy_signal( data,last_data,flag ): if flag["buy_signal"] == 0 and check_candle( data,"buy" ): flag["buy_signal"] = 1 elif flag["buy_signal"] == 1 and check_candle( data,"buy" ) and check_ascend( data,last_data ): flag["buy_signal"] = 2 elif flag["buy_signal"] == 2 and check_candle( data,"buy" ) and check_ascend( data,last_data ): log = "3本連続で陽線 なので" + str(data["close_price"]) + "円で買い指値を入れます\n" flag["records"]["log"].append(log) flag["buy_signal"] = 3 # ここにBitflyerへの買い注文コードを入れる flag["order"]["exist"] = True flag["order"]["side"] = "BUY" flag["order"]["price"] = round(data["close_price"] * lot) else: flag["buy_signal"] = 0 return flag # 売りシグナルが出たら指値で売り注文を出す関数 def sell_signal( data,last_data,flag ): if flag["sell_signal"] == 0 and check_candle( data,"sell" ): flag["sell_signal"] = 1 elif flag["sell_signal"] == 1 and check_candle( data,"sell" ) and check_descend( data,last_data ): flag["sell_signal"] = 2 elif flag["sell_signal"] == 2 and check_candle( data,"sell" ) and check_descend( data,last_data ): log = "3本連続で陰線 なので" + str(data["close_price"]) + "円で売り指値を入れます\n" flag["records"]["log"].append(log) flag["sell_signal"] = 3 # ここにBitflyerへの売り注文コードを入れる flag["order"]["exist"] = True flag["order"]["side"] = "SELL" flag["order"]["price"] = round(data["close_price"] * lot) else: flag["sell_signal"] = 0 return flag # 手仕舞いのシグナルが出たら決済の成行注文を出す関数 def close_position( data,last_data,flag ): flag["position"]["count"] += 1 if flag["position"]["side"] == "BUY": if data["close_price"] < last_data["close_price"] and flag["position"]["count"] > close_condition: log = "前回の終値を下回ったので" + str(data["close_price"]) + "円あたりで成行で決済します\n" flag["records"]["log"].append(log) # 決済の成行注文コードを入れる records( flag,data ) flag["position"]["exist"] = False flag["position"]["count"] = 0 if flag["position"]["side"] == "SELL": if data["close_price"] > last_data["close_price"] and flag["position"]["count"] > close_condition: log = "前回の終値を上回ったので" + str(data["close_price"]) + "円あたりで成行で決済します\n" flag["records"]["log"].append(log) # 決済の成行注文コードを入れる records( flag,data ) flag["position"]["exist"] = False flag["position"]["count"] = 0 return flag # サーバーに出した注文が約定したかどうかチェックする関数 def check_order( flag ): # 注文状況を確認して通っていたら以下を実行 # 一定時間で注文が通っていなければキャンセルする flag["order"]["exist"] = False flag["order"]["count"] = 0 flag["position"]["exist"] = True flag["position"]["side"] = flag["order"]["side"] flag["position"]["price"] = flag["order"]["price"] return flag # 各トレードのパフォーマンスを記録する関数 def records(flag,data): # 取引手数料等の計算 entry_price = flag["position"]["price"] exit_price = round(data["close_price"] * lot) trade_cost = round( exit_price * slippage ) log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n" flag["records"]["log"].append(log) flag["records"]["slippage"].append(trade_cost) # 値幅の計算 buy_profit = exit_price - entry_price - trade_cost sell_profit = entry_price - exit_price - trade_cost # 利益が出てるかの計算 if flag["position"]["side"] == "BUY": flag["records"]["buy-count"] += 1 flag["records"]["buy-profit"].append( buy_profit ) flag["records"]["buy-return"].append( round( buy_profit / entry_price * 100, 4 )) flag["records"]["buy-holding-periods"].append( flag["position"]["count"] ) if buy_profit > 0: flag["records"]["buy-winning"] += 1 log = str(buy_profit) + "円の利益です\n" flag["records"]["log"].append(log) else: log = str(buy_profit) + "円の損失です\n" flag["records"]["log"].append(log) if flag["position"]["side"] == "SELL": flag["records"]["sell-count"] += 1 flag["records"]["sell-profit"].append( sell_profit ) flag["records"]["sell-return"].append( round( sell_profit / entry_price * 100, 4 )) flag["records"]["sell-holding-periods"].append( flag["position"]["count"] ) if sell_profit > 0: flag["records"]["sell-winning"] += 1 log = str(sell_profit) + "円の利益です\n" flag["records"]["log"].append(log) else: log = str(sell_profit) + "円の損失です\n" flag["records"]["log"].append(log) return flag # バックテストの集計用の関数 def backtest(flag): buy_gross_profit = np.sum(flag["records"]["buy-profit"]) sell_gross_profit = np.sum(flag["records"]["sell-profit"]) print("バックテストの結果") print("--------------------------") print("買いエントリの成績") print("--------------------------") print("トレード回数 : {}回".format(flag["records"]["buy-count"] )) print("勝率 : {}%".format(round(flag["records"]["buy-winning"] / flag["records"]["buy-count"] * 100,1))) print("平均リターン : {}%".format(round(np.average(flag["records"]["buy-return"]),4))) print("総損益 : {}円".format( np.sum(flag["records"]["buy-profit"]) )) print("平均保有期間 : {}足分".format( round(np.average(flag["records"]["buy-holding-periods"]),1) )) print("--------------------------") print("売りエントリの成績") print("--------------------------") print("トレード回数 : {}回".format(flag["records"]["sell-count"] )) print("勝率 : {}%".format(round(flag["records"]["sell-winning"] / flag["records"]["sell-count"] * 100,1))) print("平均リターン : {}%".format(round(np.average(flag["records"]["sell-return"]),4))) print("総損益 : {}円".format( np.sum(flag["records"]["sell-profit"]) )) print("平均保有期間 : {}足分".format( round(np.average(flag["records"]["sell-holding-periods"]),1) )) print("--------------------------") print("総合の成績") print("--------------------------") print("総損益 : {}円".format( np.sum(flag["records"]["sell-profit"]) + np.sum(flag["records"]["buy-profit"]) )) print("手数料合計 : {}円".format( np.sum(flag["records"]["slippage"]) )) # ログファイルの出力 file = open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8') file.writelines(flag["records"]["log"]) # ここからメインの実行処理 # 価格チャートを取得 price = get_price(chart_sec,after=1483228800) print("--------------------------") print("テスト期間:") print("開始時点 : " + str(price[0]["close_time_dt"])) print("終了時点 : " + str(price[-1]["close_time_dt"])) print(str(len(price)) + "件のローソク足データで検証") print("--------------------------") last_data = price[0] # 初期値となる価格データをセット flag = { "buy_signal":0, "sell_signal":0, "order":{ "exist" : False, "side" : "", "price" : 0, "count" : 0 }, "position":{ "exist" : False, "side" : "", "price": 0, "count":0 }, "records":{ "buy-count": 0, "buy-winning" : 0, "buy-return":[], "buy-profit": [], "buy-holding-periods":[], "sell-count": 0, "sell-winning" : 0, "sell-return":[], "sell-profit":[], "sell-holding-periods":[], "slippage":[], "log":[] } } i = 1 while i < len(price): if flag["order"]["exist"]: flag = check_order( flag ) data = price[i] flag = log_price(data,flag) if flag["position"]["exist"]: flag = close_position( data,last_data,flag ) else: flag = buy_signal( data,last_data,flag ) flag = sell_signal( data,last_data,flag ) last_data["close_time"] = data["close_time"] last_data["open_price"] = data["open_price"] last_data["close_price"] = data["close_price"] i+=1 backtest(flag)
解説
まずflag変数に新しく以下の2つの変数を追加しました。
"sell-holding-periods":[] "buy-holding-periods":[]
ポジションを保有している期間は、もともと手仕舞いのための関数 def close_position() の先頭部分でカウントしています。
def close_position( data,last_data,flag ): flag["position"]["count"] += 1
そのため、ポジションを手仕舞うたびに、ここで記録している[count]の情報を、上記の holding-periods:[] に append していけばOKです。そのため、関数 def records() に以下の部分を追記します。
追記部分
# 各トレードのパフォーマンスを記録する関数 def records(flag,data): (中略) # 利益が出てるかの計算 if flag["position"]["side"] == "BUY": # ここに追記 flag["records"]["buy-holding-periods"].append( flag["position"]["count"] ) if flag["position"]["side"] == "SELL": # ここに追記 flag["records"]["sell-holding-periods"].append( flag["position"]["count"] )
また最後にバックテスト集計用の関数のところで、平均保有期間を計算してprintします。計算には、numpy の np.average() を使い、それを round() で小数点1桁に丸めています。
以下の部分です。
追記部分
# バックテストの集計用の関数 def backtest(flag): (中略) print("平均保有期間 : {}足分".format( round(np.average(flag["records"]["buy-holding-periods"]),1) )) print("平均保有期間 : {}足分".format( round(np.average(flag["records"]["sell-holding-periods"]),1) ))
いつも初心者にも分かりやすい記事ありがとうございます。
建玉の平均価格の例えば1万円下に注文するといったやり方を知りたいです。
よろしくお願いします!
コメントありがとうございます!
なるほど、それは知りたい方も多いかもですね。
約束はできないですが、時間があれば記事にしてみますね^^