前回の記事の続きです。
適切な損切りラインを設定したら、そのストップ幅を基準にポジションサイズを計算して注文を出すコードを書いてみましょう。
1.ポジションサイズの計算式
この章の最初の記事で説明したように、ポジションサイズ(枚数)の計算式には、以下を使うことにします。
例えば、以下のような感じです。
1)理論上のポジションサイズの計算例
・証拠金残高 30万円
・1トレードで失ってもいい口座の割合 3%
・1時間足の平均ボラティリティ(ATR) 9000円
・損切ライン 2ATR
この場合、次のエントリー時の注文サイズは以下のようになります。
・理論上の注文サイズ = 30万円 × 0.03 ÷ ( 9000円 × 2 ) = 0.5枚
最大0.5枚までなら損切にかかっても口座資金の3%以上を失うことはない、ということですね。
2)実際に購入可能なポジションサイズの計算例
しかし一方で、「実際に購入可能な枚数」も計算しておく必要があります。実際に購入可能なサイズの計算式は以下です。
・証拠金残高 30万円
・レバレッジ設定 2倍
・エントリー時のBTC価格 104万円
・実際に購入可能なサイズ = 証拠金残高 × レバレッジ設定比率 ÷ BTC価格 = 0.58枚
このように、実際に購入可能な枚数というのはレバレッジ比率で決まります。そのため、BOTのプログラムでは、この両方のサイズを計算してどちらか「小さい方」で注文を出すという手順が必要になります。
2.注文サイズを計算するpythonコード
まずは注文サイズを計算するpythonコードを作ってみましょう。
なお、ここでは前回の記事で作成した「平均ボラティリティを計算する関数」calculate_volatility() を使うことを前提とします。
#-------------設定項目------------ stop_range = 2 # 何レンジ幅にストップを入れるか trade_risk = 0.03 # 1トレードあたり口座の何%まで損失を許容するか levarage = 2 # レバレッジ倍率の設定 #-------------------------------- # 口座残高を取得する関数 def bitflyer_collateral(): while True: try: collateral = bitflyer.private_get_getcollateral() print("現在のアカウント残高は{}円です".format( int(collateral["collateral"]) )) return int( collateral["collateral"] ) except ccxt.BaseError as e: print("BitflyerのAPIでの口座残高取得に失敗しました : ", e) print("20秒待機してやり直します") time.sleep(20) # 注文ロットを計算する関数 def calculate_lot( last_data,data,flag ): lot = 0 # 口座残高を取得する balance = bitflyer_collateral() # 1期間のボラティリティを基準にストップ位置を計算する volatility = calculate_volatility( last_data ) stop = stop_range * volatility # 注文可能なロット数を計算する calc_lot = np.floor( balance * trade_risk / stop * 100 ) / 100 able_lot = np.floor( balance * levarage / data["close_price"] * 100 ) / 100 lot = min(able_lot,calc_lot) print("許容リスクから購入できる枚数は最大{}BTCまでです".format(calc_lot)) print("証拠金から購入できる枚数は最大{}BTCまでです".format(able_lot)) return lot,stop
コードの解説
まず最初に、bitflyer_collateral()という関数を作って、BitflyerのAPIで現在の証拠金残高を取得します。
現在の口座残高を調べるには「GET /v1/me/vetcollateral/」というAPIを使います。レスポンス結果のJSONデータから[“collateral”]を指定すると、証拠金を取得できます。
参考
・CCXTライブラリでBitflyerに注文を出す方法をマスターする
・Bitflyerの公式APIドキュメント
アカウント残高を取得したら、さきほどの計算式を使って「理論上の注文サイズ」(calc_lot)と「実際に購入可能なサイズ」(able_lot)をそれぞれ計算します。
Bitflyerの注文サイズは最小単位が0.01なので、100を掛けてから np.floor() を使って小数点以下を切り捨て、それをまた100で割っています。
calc_lot = np.floor( balance * trade_risk / stop * 100 ) / 100 able_lot = np.floor( balance * levarage / data["close_price"] * 100 ) / 100
※注) 最小単位と最低注文数量は違うので注意してください。BitflyerFXの最小単位は0.001ですが、最低注文単位は0.01です。例えば、0.011の注文は出せますが、0.001の注文は出せません。
計算が完了したら、min(A,B) でどちらか小さい方をロット数に設定して返します。
エントリー注文を出す関数
これに合わせてエントリー注文を出す関数の方も修正する必要があります。
具体的には、上記のcalculate_lot() から返ってきた注文数量が、最低注文単位に満たない場合には何もしない、という処理を加えます。前回までのドンチアン・チャネルブレイクアウトBOTのコードを以下のように修正しましょう。
# エントリー注文を出す関数 def entry_signal( data,last_data,flag ): signal = donchian( data,last_data ) if signal["side"] == "BUY": print("過去{0}足の最高値{1}円を、直近の価格が{2}円でブレイクしました\n".format(buy_term,signal["price"],data[judge_price["BUY"]])) # ここから修正箇所 lot,stop = calculate_lot( last_data,data,flag ) if lot >= 0.01: print("{0}円あたりに{1}BTCで買いの注文を出します\n".format(data["close_price"],lot)) # ここに買い注文のコードを入れる flag["order"]["lot"],flag["order"]["stop"] = lot,stop flag["order"]["exist"] = True flag["order"]["side"] = "BUY" flag["order"]["price"] = data["close_price"] else: print("注文可能枚数{}が、最低注文単位に満たなかったので注文を見送ります".format(lot))
上記を動かすにあたり、あらかじめflag変数にあたらしくロット数を管理する項目を作成しておく必要があります。
・flag[“order”][“lot”]
・flag[“position”][“lot”]
これは買いエントリーの箇所ですが、同じように売りエントリーの箇所やドテン注文の箇所にも修正を加えます。
バックテスト用の関数
バックテストの場合は、証拠金はBitflyerから取得するのではなく、自分で変数に口座残高を記録しておいて計算する必要があります。そのため、以下のようなコードになります。
#-------------設定項目------------ stop_range = 2 # 何レンジ幅にストップを入れるか trade_risk = 0.03 # 1トレードあたり口座の何%まで損失を許容するか levarage = 2 # レバレッジ倍率の設定 start_funds = 300000 # シミュレーション時の初期資金 #-------------------------------- # 記録用の変数flagに新しくfundを追加する flag = { (略) "records":{ "funds" : start_funds, # 追加 } } # 注文ロットを計算する関数 def calculate_lot( last_data,data,flag ): lot = 0 # 口座残高を取得する(バックテスト用) balance = flag["records"]["funds"] # printではなくログにする flag["records"]["log"].append("現在のアカウント残高は{}円です\n".format(balance)) flag["records"]["log"].append("許容リスクから購入できる枚数は最大{}BTCまでです\n".format(calc_lot)) flag["records"]["log"].append("証拠金から購入できる枚数は最大{}BTCまでです\n".format(able_lot))
今回は練習なのでこちらのコードを使います。
ついでに前回までのバックテスト検証用コードで初期資金の増減をシミュレーションできるようにコードを修正しておきましょう。変更点をまとめて列挙しておきます。
その他の変更点
# 各トレードのパフォーマンスを記録する関数 def records(flag,data): # 利益が出てるかの計算 if flag["position"]["side"] == "BUY": flag["records"]["funds"] = flag["records"]["funds"] + buy_profit # 追加 if flag["position"]["side"] == "SELL": flag["records"]["funds"] = flag["records"]["funds"] + sell_profit # 追加 # バックテストの集計用の関数 def backtest(flag): # 資産推移の列を追加 records["Funds"] = records.Gross + start_funds # 追加 # 最大ドローダウンの計算を資産ベースに変更 records["Drawdown"] = records.Funds.cummax().subtract(records.Funds) records["DrawdownRate"] = round(records.Drawdown / records.Funds.cummax() * 100,1) print("バックテストの結果") print("-----------------------------------") print("総合の成績") print("-----------------------------------") # ドローダウンの%を資産ベースに変更 print("最大ドローダウン : {0}円 / {1}%".format(-1 * records.Drawdown.max(), -1 * records.DrawdownRate.loc[records.Drawdown.idxmax()] )) # 初期資金が最終的に何倍になったかを追加 print("初期資金 : {}円".format( start_funds )) print("最終資金 : {}円".format( records.Funds.iloc[-1] )) print("運用成績 : {}%".format( round(records.Funds.iloc[-1] / start_funds * 100),2 )) print("-----------------------------------") print("月別の成績") for index , row in month_records.iterrows(): print("-----------------------------------") print( "{0}年{1}月の成績".format( index.year, index.month ) ) print("-----------------------------------") # 毎月の月末時点での資金額を追加 print("月末資金 : {}円".format( row.Funds.astype(int) )) # グラフのY軸を「総損益」から「資産推移」に変更 plt.plot( records.Date, records.Funds )
では、これらのコードをすべて修正した上で、シミュレーション結果を検証してみましょう!
3.検証結果
まずは前回の記事と全く同じ前提条件を考えます。
基本条件
・1時間足を使用(2017/8~2018/4)
・上値ブレイクアウト判定期間 30期間
・下値ブレイクアウト判定期間 30期間
・平均ボラティリティ計算期間 30期間
・ブレイクアウトの判定基準(高値/安値)
・ストップのレンジ幅 2ATR
その上で、最初の資金を30万円で開始して1回あたりのトレードで取るリスク(= 口座残高の何%を賭けるか?)を変更していくと、結果がどう変わるかをシミュレーションしていきましょう。レバレッジは3倍までを上限とすると仮定します。
検証条件
・最初の軍資金 30万円
・レバレッジ3倍
・1回のトレードで取るリスクn%を変更
条件(1)口座の2%をリスクに晒す場合
一般によくトレードの入門書や教本には、1回のトレードで取るリスクは口座残高の1~2%がいいと記載されていますよね。有名な本「投資苑」の著者アレキサンダー・エルダー氏も、この2%ルールを推奨しています。
まずは毎回、口座残高の2%を賭けてトレードした場合の結果を検証してみましょう。
----------------------------------- 総合の成績 ----------------------------------- 全トレード数 : 110回 勝率 : 38.2% 平均リターン : 1.76% 平均保有期間 : 45.4足分 損切りの回数 : 52回 最大の勝ちトレード : 229799円 最大の負けトレード : -34769円 最大ドローダウン : -287552円 / -17.9% 初期資金 : 300000円 最終資金 : 1548945円 運用成績 : 516.0%
もし30万円の資金でスタートしていたら、150万円にまで増えていたことがわかりました。8カ月で5倍になった計算ですからこれでも十分すぎる成績といえます。
ではもう少し1トレード当たりのリスクを取ってみるとどうなるでしょうか?
条件(2)口座の5%をリスクに晒す場合
----------------------------------- 総合の成績 ----------------------------------- 全トレード数 : 110回 勝率 : 38.2% 平均リターン : 1.76% 平均保有期間 : 45.4足分 損切りの回数 : 52回 最大の勝ちトレード : 2146354円 最大の負けトレード : -442926円 最大ドローダウン : -3148210円 / -37.8% 初期資金 : 300000円 最終資金 : 7002507円 運用成績 : 2334.0%
今度は驚くべきパフォーマンスになりましたね!
運用成績は2334%(約23倍)で、当初の資金30万円は約700万円にまで増えました。 その代わりにドローダウンもかなり大きいですね。最大でピーク時の資金の38%を失っています。
詳しくはログファイルを見るとわかりますが、3月25日~4月2日の期間に11BTCのポジションを建てて1回のトレードで214万円の利益を上げています。さらにピーク時には最大832万円付近まで資金を増やしていますが、その後わずか2週間で10連敗し、300万円以上の資金を失っています。
▽ ピーク時のログ
▽ 連敗後のログ
当たり前ですが、トレード数や勝率・平均リターンなどは全く変わっていない点に注目してください。つまりこの成績の差は、純粋に「資金管理の方法」だけから生じているパフォーマンスの違いになります。
口座の何%のリスクを取るのが適切なのか?
このように1回のトレードで取るリスク(失ってもいい口座資金の割合)が大きいほど、資金が増えるのも爆発的に早くなりますが、1回の負けトレードで失うのも早くなります。
どのくらいの割合(%)が適切なのかは、資金量によって異なります。
資金が数十万円程度であれば、多少大きな損失を出しても、トレード以外の方法(= 労働・給料)で取り返すのにかかるコストや時間が相対的に小さいです。そのため、小資金の人ほどリスクを取りやすくなります。
例えば、1000万円を運用していて20%のドローダウンを受けるのと、30万円を運用していて20%のドローダウンを受けるのは全く意味が違います。後者は1~2カ月働けば取り返せるでしょうから、そこまで過度にリスク回避的になる必要はありません。
たまにBTCFXの世界では、「1万円でスタートして100倍にした!」といった実績を表明している方を見かけます。しかし「100万円を100倍にした!」と言ってる人は滅多に見かけません。これは少ない資金の人ほど1回のトレードで大きなリスクを取れることが理由です。
極端な話、1万円スタートであれば1回のトレードで口座の30%をリスクに晒しても別に構わないわけです。逆に数百万円以上の資金を運用するなら、1回のトレードの許容リスクは教科書通り1~3%に押さえるのが現実的です。
(例)
・初期資金 数十万円 n=5%~ で始める
・100万円を超えたら n=2~4%にする
・500万円を超えたら n=1~2%にする
※ nは1回のトレードで失ってもいい口座資金の割合
許容できる破産リスク
これは別の言い方をすると、運用資金の大きさによって許容できる破産リスクが違うということです。
30万円の資金で始める人なら、20%くらいの破産リスクを背負ってもいいからハイリターンを狙いたい!と考える人がいるかもしれません。一方、全財産の1000万円を運用している人からしたら、破産リスクは2%でも高すぎるかもしれません。
このように取れるリスクは、許容できる破産確率によって決まります。破産確率は、「システムの勝率」「平均利益」「平均損失」「1回のトレードで賭ける資金の割合」の4つの数字から、数学的に計算できます。
破産確率についてはまた別の記事で解説する予定です。
4.今回の勉強で使用したコード
少し関数が増えてきたので整理しておきました。
import requests from datetime import datetime import time import matplotlib.pyplot as plt import pandas as pd import numpy as np #--------設定項目-------- chart_sec = 3600 # 1時間足を使用 buy_term = 30 # 買いエントリーのブレイク期間の設定 sell_term = 30 # 売りエントリーのブレイク期間の設定 judge_price={ "BUY" : "high_price", # ブレイク判断 高値(high_price)か終値(close_price)を使用 "SELL": "low_price" # ブレイク判断 安値 (low_price)か終値(close_price)を使用 } volatility_term = 30 # 平均ボラティリティの計算に使う期間 stop_range = 2 # 何レンジ幅にストップを入れるか trade_risk = 0.02 # 1トレードあたり口座の何%まで損失を許容するか levarage = 3 # レバレッジ倍率の設定 start_funds = 300000 # シミュレーション時の初期資金 wait = 0 # ループの待機時間 slippage = 0.001 # 手数料・スリッページ #-------------補助ツールの関数-------------- # CryptowatchのAPIを使用する関数 def get_price(min, before=0, after=0): price = [] params = {"periods" : min } if before != 0: params["before"] = before if after != 0: params["after"] = after response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params) data = response.json() if data["result"][str(min)] is not None: for i in data["result"][str(min)]: if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0: price.append({ "close_time" : i[0], "close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'), "open_price" : i[1], "high_price" : i[2], "low_price" : i[3], "close_price": i[4] }) return price else: print("データが存在しません") return None # 時間と高値・安値をログに記録する関数 def log_price( data,flag ): log = "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 高値: " + str(data["high_price"]) + " 安値: " + str(data["low_price"]) + " 終値: " + str(data["close_price"]) + "\n" flag["records"]["log"].append(log) return flag # 平均ボラティリティを計算する関数 def calculate_volatility( last_data ): high_sum = sum(i["high_price"] for i in last_data[-1 * volatility_term :]) low_sum = sum(i["low_price"] for i in last_data[-1 * volatility_term :]) volatility = round((high_sum - low_sum) / volatility_term) flag["records"]["log"].append("現在の{0}期間の平均ボラティリティは{1}円です\n".format( volatility_term, volatility )) return volatility # 注文ロットを計算する関数 def calculate_lot( last_data,data,flag ): lot = 0 balance = flag["records"]["funds"] volatility = calculate_volatility( last_data ) stop = stop_range * volatility calc_lot = np.floor( balance * trade_risk / stop * 100 ) / 100 able_lot = np.floor( balance * levarage / data["close_price"] * 100 ) / 100 lot = min(able_lot,calc_lot) flag["records"]["log"].append("現在のアカウント残高は{}円です\n".format(balance)) flag["records"]["log"].append("許容リスクから購入できる枚数は最大{}BTCまでです\n".format(calc_lot)) flag["records"]["log"].append("証拠金から購入できる枚数は最大{}BTCまでです\n".format(able_lot)) return lot,stop #-------------売買ロジックの部分の関数-------------- # ドンチャンブレイクを判定する関数 def donchian( data,last_data ): highest = max(i["high_price"] for i in last_data[ (-1* buy_term): ]) if data[ judge_price["BUY"] ] > highest: return {"side":"BUY","price":highest} lowest = min(i["low_price"] for i in last_data[ (-1* sell_term): ]) if data[ judge_price["SELL"] ] < lowest: return {"side":"SELL","price":lowest} return {"side" : None , "price":0} # エントリー注文を出す関数 def entry_signal( data,last_data,flag ): signal = donchian( data,last_data ) if signal["side"] == "BUY": flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の価格が{2}円でブレイクしました\n".format(buy_term,signal["price"],data[judge_price["BUY"]])) lot,stop = calculate_lot( last_data,data,flag ) if lot > 0.01: flag["records"]["log"].append("{0}円で{1}BTCの買い注文を出します\n".format(data["close_price"],lot)) # ここに買い注文のコードを入れる flag["records"]["log"].append("{0}円にストップを入れます\n".format(data["close_price"] - stop)) flag["order"]["lot"],flag["order"]["stop"] = lot,stop flag["order"]["exist"] = True flag["order"]["side"] = "BUY" flag["order"]["price"] = data["close_price"] else: flag["records"]["log"].append("注文可能枚数{}が、最低注文単位に満たなかったので注文を見送ります\n".format(lot)) if signal["side"] == "SELL": flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の価格が{2}円でブレイクしました\n".format(sell_term,signal["price"],data[judge_price["SELL"]])) lot,stop = calculate_lot( last_data,data,flag ) if lot > 0.01: flag["records"]["log"].append("{0}円で{1}BTCの売り注文を出します\n".format(data["close_price"],lot)) # ここに売り注文のコードを入れる flag["records"]["log"].append("{0}円にストップを入れます\n".format(data["close_price"] + stop)) flag["order"]["lot"],flag["order"]["stop"] = lot,stop flag["order"]["exist"] = True flag["order"]["side"] = "SELL" flag["order"]["price"] = data["close_price"] else: flag["records"]["log"].append("注文可能枚数{}が、最低注文単位に満たなかったので注文を見送ります\n".format(lot)) return flag # サーバーに出した注文が約定したか確認する関数 def check_order( flag ): # 注文状況を確認して通っていたら以下を実行 # 一定時間で注文が通っていなければキャンセルする flag["order"]["exist"] = False flag["order"]["count"] = 0 flag["position"]["exist"] = True flag["position"]["side"] = flag["order"]["side"] flag["position"]["stop"] = flag["order"]["stop"] flag["position"]["price"] = flag["order"]["price"] flag["position"]["lot"] = flag["order"]["lot"] return flag # 手仕舞いのシグナルが出たら決済の成行注文 + ドテン注文 を出す関数 def close_position( data,last_data,flag ): if flag["position"]["exist"] == False: return flag flag["position"]["count"] += 1 signal = donchian( data,last_data ) if flag["position"]["side"] == "BUY": if signal["side"] == "SELL": flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の価格が{2}円でブレイクしました\n".format(sell_term,signal["price"],data[judge_price["SELL"]])) flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n") # 決済の成行注文コードを入れる records( flag,data,data["close_price"] ) flag["position"]["exist"] = False flag["position"]["count"] = 0 lot,stop = calculate_lot( last_data,data,flag ) if lot > 0.01: flag["records"]["log"].append("さらに{0}円で{1}BTCの売りの注文を入れてドテンします\n".format(data["close_price"],lot)) # ここに売り注文のコードを入れる flag["records"]["log"].append("{0}円にストップを入れます\n".format(data["close_price"] + stop)) flag["order"]["lot"],flag["order"]["stop"] = lot,stop flag["order"]["exist"] = True flag["order"]["side"] = "SELL" flag["order"]["price"] = data["close_price"] if flag["position"]["side"] == "SELL": if signal["side"] == "BUY": flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の価格が{2}円でブレイクしました\n".format(buy_term,signal["price"],data[judge_price["BUY"]])) flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n") # 決済の成行注文コードを入れる records( flag,data,data["close_price"] ) flag["position"]["exist"] = False flag["position"]["count"] = 0 lot,stop = calculate_lot( last_data,data,flag ) if lot > 0.01: flag["records"]["log"].append("さらに{0}円で{1}BTCの買いの注文を入れてドテンします\n".format(data["close_price"],lot)) # ここに買い注文のコードを入れる flag["records"]["log"].append("{0}円にストップを入れます\n".format(data["close_price"] - stop)) flag["order"]["lot"],flag["order"]["stop"] = lot,stop flag["order"]["exist"] = True flag["order"]["side"] = "BUY" flag["order"]["price"] = data["close_price"] return flag # 損切ラインにかかったら成行注文で決済する関数 def stop_position( data,flag ): if flag["position"]["side"] == "BUY": stop_price = flag["position"]["price"] - flag["position"]["stop"] if data["low_price"] < stop_price: flag["records"]["log"].append("{0}円の損切ラインに引っかかりました。\n".format( stop_price )) stop_price = round( stop_price - 2 * calculate_volatility(last_data) / ( chart_sec / 60) ) flag["records"]["log"].append(str(stop_price) + "円あたりで成行注文を出してポジションを決済します\n") # 決済の成行注文コードを入れる records( flag,data,stop_price,"STOP" ) flag["position"]["exist"] = False flag["position"]["count"] = 0 if flag["position"]["side"] == "SELL": stop_price = flag["position"]["price"] + flag["position"]["stop"] if data["high_price"] > stop_price: flag["records"]["log"].append("{0}円の損切ラインに引っかかりました。\n".format( stop_price )) stop_price = round( stop_price + 2 * calculate_volatility(last_data) / (chart_sec / 60) ) flag["records"]["log"].append(str(stop_price) + "円あたりで成行注文を出してポジションを決済します\n") # 決済の成行注文コードを入れる records( flag,data,stop_price,"STOP" ) flag["position"]["exist"] = False flag["position"]["count"] = 0 return flag #------------バックテストの部分の関数-------------- # 各トレードのパフォーマンスを記録する関数 def records(flag,data,close_price,close_type=None): # 取引手数料等の計算 entry_price = int(round(flag["position"]["price"] * flag["position"]["lot"])) exit_price = int(round(close_price * flag["position"]["lot"])) trade_cost = round( exit_price * slippage ) log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n" flag["records"]["log"].append(log) flag["records"]["slippage"].append(trade_cost) # 手仕舞った日時と保有期間を記録 flag["records"]["date"].append(data["close_time_dt"]) flag["records"]["holding-periods"].append( flag["position"]["count"] ) # 損切りにかかった回数をカウント if close_type == "STOP": flag["records"]["stop-count"].append(1) else: flag["records"]["stop-count"].append(0) # 値幅の計算 buy_profit = exit_price - entry_price - trade_cost sell_profit = entry_price - exit_price - trade_cost # 利益が出てるかの計算 if flag["position"]["side"] == "BUY": flag["records"]["side"].append( "BUY" ) flag["records"]["profit"].append( buy_profit ) flag["records"]["return"].append( round( buy_profit / entry_price * 100, 4 )) flag["records"]["funds"] = flag["records"]["funds"] + buy_profit if buy_profit > 0: log = str(buy_profit) + "円の利益です\n\n" flag["records"]["log"].append(log) else: log = str(buy_profit) + "円の損失です\n\n" flag["records"]["log"].append(log) if flag["position"]["side"] == "SELL": flag["records"]["side"].append( "SELL" ) flag["records"]["profit"].append( sell_profit ) flag["records"]["return"].append( round( sell_profit / entry_price * 100, 4 )) flag["records"]["funds"] = flag["records"]["funds"] + sell_profit if sell_profit > 0: log = str(sell_profit) + "円の利益です\n\n" flag["records"]["log"].append(log) else: log = str(sell_profit) + "円の損失です\n\n" flag["records"]["log"].append(log) return flag # バックテストの集計用の関数 def backtest(flag): # 成績を記録したpandas DataFrameを作成 records = pd.DataFrame({ "Date" : pd.to_datetime(flag["records"]["date"]), "Profit" : flag["records"]["profit"], "Side" : flag["records"]["side"], "Rate" : flag["records"]["return"], "Stop" : flag["records"]["stop-count"], "Periods" : flag["records"]["holding-periods"], "Slippage" : flag["records"]["slippage"] }) # 連敗回数をカウントする consecutive_defeats = [] defeats = 0 for p in flag["records"]["profit"]: if p < 0: defeats += 1 else: consecutive_defeats.append( defeats ) defeats = 0 # 総損益の列を追加する records["Gross"] = records.Profit.cumsum() # 資産推移の列を追加する records["Funds"] = records.Gross + start_funds # 最大ドローダウンの列を追加する records["Drawdown"] = records.Funds.cummax().subtract(records.Funds) records["DrawdownRate"] = round(records.Drawdown / records.Funds.cummax() * 100,1) # 買いエントリーと売りエントリーだけをそれぞれ抽出する buy_records = records[records.Side.isin(["BUY"])] sell_records = records[records.Side.isin(["SELL"])] # 月別のデータを集計する records["月別集計"] = pd.to_datetime( records.Date.apply(lambda x: x.strftime('%Y/%m'))) grouped = records.groupby("月別集計") month_records = pd.DataFrame({ "Number" : grouped.Profit.count(), "Gross" : grouped.Profit.sum(), "Funds" : grouped.Funds.last(), "Rate" : round(grouped.Rate.mean(),2), "Drawdown" : grouped.Drawdown.max(), "Periods" : grouped.Periods.mean() }) print("バックテストの結果") print("-----------------------------------") print("買いエントリの成績") print("-----------------------------------") print("トレード回数 : {}回".format( len(buy_records) )) print("勝率 : {}%".format(round(len(buy_records[buy_records.Profit>0]) / len(buy_records) * 100,1))) print("平均リターン : {}%".format(round(buy_records.Rate.mean(),2))) print("総損益 : {}円".format( buy_records.Profit.sum() )) print("平均保有期間 : {}足分".format( round(buy_records.Periods.mean(),1) )) print("損切りの回数 : {}回".format( buy_records.Stop.sum() )) print("-----------------------------------") print("売りエントリの成績") print("-----------------------------------") print("トレード回数 : {}回".format( len(sell_records) )) print("勝率 : {}%".format(round(len(sell_records[sell_records.Profit>0]) / len(sell_records) * 100,1))) print("平均リターン : {}%".format(round(sell_records.Rate.mean(),2))) print("総損益 : {}円".format( sell_records.Profit.sum() )) print("平均保有期間 : {}足分".format( round(sell_records.Periods.mean(),1) )) print("損切りの回数 : {}回".format( sell_records.Stop.sum() )) print("-----------------------------------") print("総合の成績") print("-----------------------------------") print("全トレード数 : {}回".format(len(records) )) print("勝率 : {}%".format(round(len(records[records.Profit>0]) / len(records) * 100,1))) print("平均リターン : {}%".format(round(records.Rate.mean(),2))) print("平均保有期間 : {}足分".format( round(records.Periods.mean(),1) )) print("損切りの回数 : {}回".format( records.Stop.sum() )) print("") print("最大の勝ちトレード : {}円".format(records.Profit.max())) print("最大の負けトレード : {}円".format(records.Profit.min())) print("最大連敗回数 : {}回".format( max(consecutive_defeats) )) print("最大ドローダウン : {0}円 / {1}%".format(-1 * records.Drawdown.max(), -1 * records.DrawdownRate.loc[records.Drawdown.idxmax()] )) print("利益合計 : {}円".format( records[records.Profit>0].Profit.sum() )) print("損失合計 : {}円".format( records[records.Profit<0].Profit.sum() )) print("最終損益 : {}円".format( records.Profit.sum() )) print("") print("初期資金 : {}円".format( start_funds )) print("最終資金 : {}円".format( records.Funds.iloc[-1] )) print("運用成績 : {}%".format( round(records.Funds.iloc[-1] / start_funds * 100),2 )) print("手数料合計 : {}円".format( -1 * records.Slippage.sum() )) print("-----------------------------------") print("月別の成績") for index , row in month_records.iterrows(): print("-----------------------------------") print( "{0}年{1}月の成績".format( index.year, index.month ) ) print("-----------------------------------") print("トレード数 : {}回".format( row.Number.astype(int) )) print("月間損益 : {}円".format( row.Gross.astype(int) )) print("平均リターン : {}%".format( row.Rate )) print("継続ドローダウン : {}円".format( -1 * row.Drawdown.astype(int) )) print("月末資金 : {}円".format( row.Funds.astype(int) )) # ログファイルの出力 file = open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8') file.writelines(flag["records"]["log"]) # 損益曲線をプロット plt.plot( records.Date, records.Funds ) plt.xlabel("Date") plt.ylabel("Balance") plt.xticks(rotation=50) # X軸の目盛りを50度回転 plt.show() #------------ここからメイン処理-------------- # 価格チャートを取得 price = get_price(chart_sec,after=1451606400) flag = { "order":{ "exist" : False, "side" : "", "price" : 0, "stop" : 0, "ATR" : 0, "lot":0, "count" : 0 }, "position":{ "exist" : False, "side" : "", "price": 0, "stop":0, "ATR" : 0, "lot":0, "count":0 }, "records":{ "date":[], "profit":[], "return":[], "side":[], "stop-count":[], "funds" : start_funds, "holding-periods":[], "slippage":[], "log":[] } } last_data = [] need_term = max(buy_term,sell_term,volatility_term) i = 0 while i < len(price): # ドンチャンの判定に使う期間分の安値・高値データを準備する if len(last_data) < need_term: last_data.append(price[i]) flag = log_price(price[i],flag) time.sleep(wait) i += 1 continue data = price[i] flag = log_price(data,flag) if flag["order"]["exist"]: flag = check_order( flag ) elif flag["position"]["exist"]: flag = stop_position( data,flag ) flag = close_position( data,last_data,flag ) else: flag = entry_signal( data,last_data,flag ) last_data.append( data ) i += 1 time.sleep(wait) print("--------------------------") print("テスト期間:") print("開始時点 : " + str(price[0]["close_time_dt"])) print("終了時点 : " + str(price[-1]["close_time_dt"])) print(str(len(price)) + "件のローソク足データで検証") print("--------------------------") backtest(flag)
最近BOTトレードに興味をもって色々調べていたところ、このサイトを見つけて勉強させていただいてます!最近プログラミングを勉強し始めた自分でも理解できる質の高い内容で、毎回目から鱗が落ちる思いです。
現在も更新中のようですが、今後はどのような内容を掲載していく予定でしょうか?
コメントありがとうございます!
できる限りpythonを始めたばかりの方でもわかる内容を目指してるので、そう言っていただけてホッとしてます^^
書く順番は基本的にはそのときの気分で決めてます(笑) 今の資金管理編が終わったら、各メジャーなテクニカル指標のpythonでの実装方法を記事にしようかなと思ってます!