次回の記事「pythonで最適なパラメーターを自動的に探索する」での練習にあたって、まず先に以下のようなパラメーターを自由に設定できるようにドンチャン・ブレイクアウトのコードを改良しておきましょう。
設定可能なパラメーター
1)X分足の時間軸を使う
2)n期間の最高値のブレイクアウトで買う
3)m期間の最安値のブレイクアウトで売る
4)ブレイクアウトの判定に、高値/安値 or 終値/終値 を使う
今回の記事では、これらのパラメーターを設定できるようにコードを修正する方法を解説し、最後に1時間足で買い(30期間)、売り(20期間)、判定基準(終値)でのバックテスト結果を見てみましょう。
Pythonコード
先に改良版のコード全体を記します。
import requests from datetime import datetime import time import matplotlib.pyplot as plt import pandas as pd #--------設定項目-------- chart_sec = 3600 # 1時間足を使用 buy_term = 30 # 買いエントリーのブレイク期間の設定 sell_term = 30 # 売りエントリーのブレイク期間の設定 judge_price={ "BUY" : "high_price", # ブレイク判断 高値(high_price)か終値(close_price)を使用 "SELL": "low_price" # ブレイク判断 安値 (low_price)か終値(close_price)を使用 } wait = 0 # ループの待機時間 lot = 1 # BTCの注文枚数 slippage = 0.001 # 手数料・スリッページ #------------------------ # CryptowatchのAPIを使用する関数 def get_price(min, before=0, after=0): price = [] params = {"periods" : min } if before != 0: params["before"] = before if after != 0: params["after"] = after response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params) data = response.json() if data["result"][str(min)] is not None: for i in data["result"][str(min)]: if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0: price.append({ "close_time" : i[0], "close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'), "open_price" : i[1], "high_price" : i[2], "low_price" : i[3], "close_price": i[4] }) return price else: print("データが存在しません") return None # 時間と高値・安値をログに記録する関数 def log_price( data,flag ): log = "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 高値: " + str(data["high_price"]) + " 安値: " + str(data["low_price"]) + " 終値: " + str(data["close_price"]) + "\n" flag["records"]["log"].append(log) return flag # ドンチャンブレイクを判定する関数 def donchian( data,last_data ): highest = max(i["high_price"] for i in last_data[ (-1* buy_term): ]) if data[ judge_price["BUY"] ] > highest: return {"side":"BUY","price":highest} lowest = min(i["low_price"] for i in last_data[ (-1* sell_term): ]) if data[ judge_price["SELL"] ] < lowest: return {"side":"SELL","price":lowest} return {"side" : None , "price":0} # ドンチャンブレイクを判定してエントリー注文を出す関数 def entry_signal( data,last_data,flag ): signal = donchian( data,last_data ) if signal["side"] == "BUY": flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の価格が{2}円でブレイクしました\n".format(buy_term,signal["price"],data[judge_price["BUY"]])) flag["records"]["log"].append(str(data["close_price"]) + "円で買いの指値注文を出します\n") # ここに買い注文のコードを入れる flag["order"]["exist"] = True flag["order"]["side"] = "BUY" flag["order"]["price"] = round(data["close_price"] * lot) if signal["side"] == "SELL": flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の価格が{2}円でブレイクしました\n".format(sell_term,signal["price"],data[judge_price["SELL"]])) flag["records"]["log"].append(str(data["close_price"]) + "円で売りの指値注文を出します\n") # ここに売り注文のコードを入れる flag["order"]["exist"] = True flag["order"]["side"] = "SELL" flag["order"]["price"] = round(data["close_price"] * lot) return flag # サーバーに出した注文が約定したか確認する関数 def check_order( flag ): # 注文状況を確認して通っていたら以下を実行 # 一定時間で注文が通っていなければキャンセルする flag["order"]["exist"] = False flag["order"]["count"] = 0 flag["position"]["exist"] = True flag["position"]["side"] = flag["order"]["side"] flag["position"]["price"] = flag["order"]["price"] return flag # 手仕舞いのシグナルが出たら決済の成行注文 + ドテン注文 を出す関数 def close_position( data,last_data,flag ): flag["position"]["count"] += 1 signal = donchian( data,last_data ) if flag["position"]["side"] == "BUY": if signal["side"] == "SELL": flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の価格が{2}円でブレイクしました\n".format(sell_term,signal["price"],data[judge_price["SELL"]])) flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n") # 決済の成行注文コードを入れる records( flag,data ) flag["position"]["exist"] = False flag["position"]["count"] = 0 flag["records"]["log"].append("さらに" + str(data["close_price"]) + "円で売りの指値注文を入れてドテンします\n") # ここに売り注文のコードを入れる flag["order"]["exist"] = True flag["order"]["side"] = "SELL" flag["order"]["price"] = round(data["close_price"] * lot) if flag["position"]["side"] == "SELL": if signal["side"] == "BUY": flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の価格が{2}円でブレイクしました\n".format(buy_term,signal["price"],data[judge_price["BUY"]])) flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n") # 決済の成行注文コードを入れる records( flag,data ) flag["position"]["exist"] = False flag["position"]["count"] = 0 flag["records"]["log"].append("さらに" + str(data["close_price"]) + "円で買いの指値注文を入れてドテンします\n") # ここに買い注文のコードを入れる flag["order"]["exist"] = True flag["order"]["side"] = "BUY" flag["order"]["price"] = round(data["close_price"] * lot) return flag # 各トレードのパフォーマンスを記録する関数 def records(flag,data): # 取引手数料等の計算 entry_price = flag["position"]["price"] exit_price = round(data["close_price"] * lot) trade_cost = round( exit_price * slippage ) log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n" flag["records"]["log"].append(log) flag["records"]["slippage"].append(trade_cost) # 手仕舞った日時と保有期間を記録 flag["records"]["date"].append(data["close_time_dt"]) flag["records"]["holding-periods"].append( flag["position"]["count"] ) # 値幅の計算 buy_profit = exit_price - entry_price - trade_cost sell_profit = entry_price - exit_price - trade_cost # 利益が出てるかの計算 if flag["position"]["side"] == "BUY": flag["records"]["side"].append( "BUY" ) flag["records"]["profit"].append( buy_profit ) flag["records"]["return"].append( round( buy_profit / entry_price * 100, 4 )) if buy_profit > 0: log = str(buy_profit) + "円の利益です\n" flag["records"]["log"].append(log) else: log = str(buy_profit) + "円の損失です\n" flag["records"]["log"].append(log) if flag["position"]["side"] == "SELL": flag["records"]["side"].append( "SELL" ) flag["records"]["profit"].append( sell_profit ) flag["records"]["return"].append( round( sell_profit / entry_price * 100, 4 )) if sell_profit > 0: log = str(sell_profit) + "円の利益です\n" flag["records"]["log"].append(log) else: log = str(sell_profit) + "円の損失です\n" flag["records"]["log"].append(log) return flag # バックテストの集計用の関数 def backtest(flag): # 成績を記録したpandas DataFrameを作成 records = pd.DataFrame({ "Date" : pd.to_datetime(flag["records"]["date"]), "Profit" : flag["records"]["profit"], "Side" : flag["records"]["side"], "Rate" : flag["records"]["return"], "Periods" : flag["records"]["holding-periods"], "Slippage" : flag["records"]["slippage"] }) # 総損益の列を追加する records["Gross"] = records.Profit.cumsum() # 最大ドローダウンの列を追加する records["Drawdown"] = records.Gross.cummax().subtract(records.Gross) records["DrawdownRate"] = round(records.Drawdown / records.Gross.cummax() * 100,1) # 買いエントリーと売りエントリーだけをそれぞれ抽出する buy_records = records[records.Side.isin(["BUY"])] sell_records = records[records.Side.isin(["SELL"])] # 月別のデータを集計する records["月別集計"] = pd.to_datetime( records.Date.apply(lambda x: x.strftime('%Y/%m'))) grouped = records.groupby("月別集計") month_records = pd.DataFrame({ "Number" : grouped.Profit.count(), "Gross" : grouped.Profit.sum(), "Rate" : round(grouped.Rate.mean(),2), "Drawdown" : grouped.Drawdown.max(), "Periods" : grouped.Periods.mean() }) print("バックテストの結果") print("-----------------------------------") print("買いエントリの成績") print("-----------------------------------") print("トレード回数 : {}回".format( len(buy_records) )) print("勝率 : {}%".format(round(len(buy_records[buy_records.Profit>0]) / len(buy_records) * 100,1))) print("平均リターン : {}%".format(round(buy_records.Rate.mean(),2))) print("総損益 : {}円".format( buy_records.Profit.sum() )) print("平均保有期間 : {}足分".format( round(buy_records.Periods.mean(),1) )) print("-----------------------------------") print("売りエントリの成績") print("-----------------------------------") print("トレード回数 : {}回".format( len(sell_records) )) print("勝率 : {}%".format(round(len(sell_records[sell_records.Profit>0]) / len(sell_records) * 100,1))) print("平均リターン : {}%".format(round(sell_records.Rate.mean(),2))) print("総損益 : {}円".format( sell_records.Profit.sum() )) print("平均保有期間 : {}足分".format( round(sell_records.Periods.mean(),1) )) print("-----------------------------------") print("総合の成績") print("-----------------------------------") print("全トレード数 : {}回".format(len(records) )) print("勝率 : {}%".format(round(len(records[records.Profit>0]) / len(records) * 100,1))) print("平均リターン : {}%".format(round(records.Rate.mean(),2))) print("平均保有期間 : {}足分".format( round(records.Periods.mean(),1) )) print("") print("最大の勝ちトレード : {}円".format(records.Profit.max())) print("最大の負けトレード : {}円".format(records.Profit.min())) print("最大ドローダウン : {0}円 / {1}%".format(-1 * records.Drawdown.max(), -1 * records.DrawdownRate.loc[records.Drawdown.idxmax()] )) print("利益合計 : {}円".format( records[records.Profit>0].Profit.sum() )) print("損失合計 : {}円".format( records[records.Profit<0].Profit.sum() )) print("") print("最終損益 : {}円".format( records.Profit.sum() )) print("手数料合計 : {}円".format( -1 * records.Slippage.sum() )) print("-----------------------------------") print("月別の成績") for index , row in month_records.iterrows(): print("-----------------------------------") print( "{0}年{1}月の成績".format( index.year, index.month ) ) print("-----------------------------------") print("トレード数 : {}回".format( row.Number.astype(int) )) print("月間損益 : {}円".format( row.Gross.astype(int) )) print("平均リターン : {}%".format( row.Rate )) print("月間ドローダウン : {}円".format( -1 * row.Drawdown.astype(int) )) # ログファイルの出力 file = open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8') file.writelines(flag["records"]["log"]) # 損益曲線をプロット plt.plot( records.Date, records.Gross ) plt.xlabel("Date") plt.ylabel("Balance") plt.xticks(rotation=50) # X軸の目盛りを50度回転 plt.show() # ここからメイン処理 # 価格チャートを取得 price = get_price(chart_sec,after=1451606400) flag = { "order":{ "exist" : False, "side" : "", "price" : 0, "count" : 0 }, "position":{ "exist" : False, "side" : "", "price": 0, "count":0 }, "records":{ "date":[], "profit":[], "return":[], "side":[], "holding-periods":[], "slippage":[], "log":[] } } last_data = [] i = 0 while i < len(price): # ドンチャンの判定に使う期間分の安値・高値データを準備する if len(last_data) < buy_term or len(last_data) < sell_term: last_data.append(price[i]) flag = log_price(price[i],flag) time.sleep(wait) i += 1 continue data = price[i] flag = log_price(data,flag) if flag["order"]["exist"]: flag = check_order( flag ) elif flag["position"]["exist"]: flag = close_position( data,last_data,flag ) else: flag = entry_signal( data,last_data,flag ) last_data.append( data ) i += 1 time.sleep(wait) print("--------------------------") print("テスト期間:") print("開始時点 : " + str(price[0]["close_time_dt"])) print("終了時点 : " + str(price[-1]["close_time_dt"])) print(str(len(price)) + "件のローソク足データで検証") print("--------------------------") backtest(flag)
これでドンチャン・ブレイクアウトの自動売買ロジックに、上値のブレイクアウトと下値のブレイクアウトとで異なる期間を設定できるようになりました。
またブレイクアウトの判定基準について、高値・安値を使うことも、終値を使うこともできるようになりました。
コードの解説
今までの「最もシンプルなドンチャン・ブレイクアウト」では、買い・売りともに同じ期間(例えば30期間)を使っていました。そのため、ブレイクアウトを判定する際に、比較に使うための過去のローソク足データは、配列に30個ぴったり用意しておけばOKでした。
しかし買い・売りで別々の期間を設定する場合は、そうはいきません。そこで、ドンチャンブレイクアウトを判定するための関数を以下のように改良します。
# ドンチャンブレイクを判定する関数 def donchian( data,last_data ): highest = max(i["high_price"] for i in last_data[ (-1* buy_term): ]) if data[ judge_price["BUY"] ] > highest: return {"side":"BUY","price":highest} lowest = min(i["low_price"] for i in last_data[ (-1* sell_term): ]) if data[ judge_price["SELL"] ] < lowest: return {"side":"SELL","price":lowest} return {"side" : None , "price":0}
ここでは、過去のローソク足データ(last_data)を30個丁度にキープするのではなく、どんどん溜めていって、後ろから欲しい数だけを取り出すようにしています。
例えば、上値ブレイクアウトの基準が30期間、下値ブレイクアウトの基準が20期間であれば、買いエントリーの判定では last_data の後ろから30コを取り出し、売りエントリーの判定では last_data の後ろから20コを取り出すようにしています。
ここで新しく使っているのが、スライス(:)という記述方法です。
スライスとは
スライスは、配列データの範囲を指定したり、pandasの表データで行や列の範囲を指定するときに、非常に便利な記述方法です。以下のように書くことで、配列や表(行・列)の範囲を指定することができます。
[2:5] 3番目~5番目まで
[:5] 先頭~5番目まで
[3:] 4番目~最後まで
[-3:] 後ろから3番目~最後まで
プログラミングの世界では、先頭は0番目から数えるので、指定した数字から1つズレる点に注意してください。最後はズレません。
number = [0,1,2,3,4,5,6,7] print( number[2:5] ) [2, 3, 4] print( number[:5] ) [0, 1, 2, 3, 4] print( number[3:] ) [3, 4, 5, 6, 7] print( number[-3:] ) [5, 6, 7]
そのため、上値ブレイクアウトの判定に必要な期間分のローソク足データは、以下のように指定することができます。
last_data[ (-1* buy_term): ]
この知識を使うと、ブレイクアウトの判定に必要な期間のデータを取り出しその中から最大値を抽出する、というコードを以下の1行で書くことができます。
highest = max(i["high_price"] for i in last_data[ (-1* buy_term): ])
2.ブレイクアウトの判定基準に「終値」を使えるようにする
これは簡単ですね。
変更前のコードで、data["high_price"] と指定していた部分を変数にすればいいだけです。
# 変更前 if data["high_price"] > highest: return {"side":"BUY","price":highest} # 変更後 #--------設定項目-------- judge_price={ "BUY" : "high_price", # high_price か close_price を指定 "SELL": "low_price" # low_price か close_price を指定 } #----------------------- if data[ judge_price["BUY"] ] > highest: return {"side":"BUY","price":highest}
3.メイン処理のループ文の変更
最後にメイン処理のWhile文ループの中身を修正します。
# 変更前 last_data = [] i = 0 while i < len(price): # ドンチャンの判定に使う期間分の安値・高値データを準備する if len(last_data) < term: (略) continue # 変更後 # ドンチャンの判定に使う期間分の安値・高値データを準備する if len(last_data) < buy_term or len(last_data) < sell_term: (略) continue
売り・買いエントリーで別々の期間を使用することになるので、最初に buy_term と sell_term の両方の期間分を超えるまでデータを蓄積します。
# 変更前 # 過去データを30個に保つために先頭を削除 del last_data[0] # 変更後 (削除)
またスライスを使うことで、過去のローソク足データをぴったりn個にキープしておく必要がなくなったので、上記の行は削除しておきます。これで完成です!
あとはログファイルの出力が「直近の安値が~」「直近の高値が~」になっているので、そこは各自で修正しておいてください。(上記のコードでは修正済)
実行結果
試しに1時間足で、買いエントリーは30期間の最高値ブレイクアウト、売りエントリーは20期間の最安値ブレイクアウト、各ブレイクアウトの判定には終値を使う、という場合の成績を見てみましょう。
# パラメーター設定 chart_sec = 3600 buy_term = 30 sell_term = 20 judge_price={ "BUY" : "close_price", "SELL": "close_price" }
すると以下のようになります。
損益曲線
成績
-------------------------- テスト期間: 開始時点 : 2017/08/15 01:00 終了時点 : 2018/04/22 08:00 6000件のローソク足データで検証 -------------------------- バックテストの結果 ----------------------------------- 買いエントリの成績 ----------------------------------- トレード回数 : 40回 勝率 : 52.5% 平均リターン : 4.39% 総損益 : 1628677円 平均保有期間 : 70.1足分 ----------------------------------- 売りエントリの成績 ----------------------------------- トレード回数 : 40回 勝率 : 52.5% 平均リターン : 1.77% 総損益 : 1223424円 平均保有期間 : 74.7足分 ----------------------------------- 総合の成績 ----------------------------------- 全トレード数 : 80回 勝率 : 52.5% 平均リターン : 3.08% 平均保有期間 : 72.4足分 最大の勝ちトレード : 545284円 最大の負けトレード : -220610円 最大ドローダウン : -381395円 / -13.2% 利益合計 : 4905987円 損失合計 : -2053886円 最終損益 : 2852101円 手数料合計 : -84811円 ----------------------------------- 月別の成績 ----------------------------------- 2017年8月の成績 ----------------------------------- トレード数 : 4回 月間損益 : -27411円 平均リターン : -1.34% 月間ドローダウン : -13500円 ----------------------------------- 2017年9月の成績 ----------------------------------- トレード数 : 10回 月間損益 : 88980円 平均リターン : 1.96% 月間ドローダウン : -20785円 ----------------------------------- 2017年10月の成績 ----------------------------------- トレード数 : 10回 月間損益 : 86498円 平均リターン : 2.21% 月間ドローダウン : -92319円 ----------------------------------- 2017年11月の成績 ----------------------------------- トレード数 : 9回 月間損益 : 375451円 平均リターン : 5.46% 月間ドローダウン : -114141円 ----------------------------------- 2017年12月の成績 ----------------------------------- トレード数 : 6回 月間損益 : 988615円 平均リターン : 8.9% 月間ドローダウン : -220610円 ----------------------------------- 2018年1月の成績 ----------------------------------- トレード数 : 14回 月間損益 : 425674円 平均リターン : 1.03% 月間ドローダウン : -381395円 ----------------------------------- 2018年2月の成績 ----------------------------------- トレード数 : 11回 月間損益 : 374125円 平均リターン : 3.46% 月間ドローダウン : -316547円 ----------------------------------- 2018年3月の成績 ----------------------------------- トレード数 : 9回 月間損益 : 305010円 平均リターン : 3.1% 月間ドローダウン : -327540円 ----------------------------------- 2018年4月の成績 ----------------------------------- トレード数 : 7回 月間損益 : 235159円 平均リターン : 3.93% 月間ドローダウン : -38818円
※ ここでの月間ドローダウンは、その月に経験している継続中の最大ドローダウンのことです。その月だけのドローダウンを計算しているわけではありません。
このように、同じドンチャン・ブレイクアウトのロジックでも、買い判定の期間、売り判定の期間、ブレイクアウトの判定を高値(安値)でするか、終値でするか、などの設定によってかなり異なる結果になります。
しかしどのようなパラメーター設定が最適なのかを手作業で探すのは困難です。以下のようなパラメーター設定だけでも、その組み合わせの数は膨大だからです。
最適なパラメーターの組み合わせ
1)使用する時間軸
⇒ 15分足/30分足/1時間足/2時間足/6時間足
2)上値ブレイクアウトの判定期間
⇒ 10/15/20/25/30/35/40/45
3)下値ブレイクアウトの判定期間
⇒ 10/15/20/25/30/35/40/45
4)判定に使用する価格
⇒ 高値・安値/終値・終値
上記の組み合わせを試すだけでも、そのパターンは640通りにもなります。ちょっと手作業で1つ1つ結果をテストする気にはなりませんよね(笑)
こういうときこそpythonの出番です! 次回の記事では、pythonで自動的に最適なパラメーターを探索する方法を解説します!