トレードをやっている方なら「真のブレイクアウトは出来高を伴う」という投資の助言をよく聞くことがあると思います。
今回の記事では、出来高を使ったフィルターでブレイクアウトの騙しを取り除くことが可能かどうかを検証します。またフィルターに本当に効果があるのかどうか、ただの偶然でないかどうかをpythonで統計的に検証する方法を解説します!
出来高を使ったフィルター
たしかにチャートなどを見ていると、過去30期間のブレイクアウトなどの場面では出来高が平均を上回っているケースが多いような気がします。
もしブレイクアウトが出来高を伴わなければ、それは「騙し」であるというようなことも、よくトレード本には書かれています。例えば、「出来高・価格分析の完全ガイド」などの本に詳しい説明があります。
では、例えば、過去30期間の高値/安値のブレイクアウト時に、「過去30期間の平均出来高を上回っているかどうか?」でフィルターをかける意味はあるのでしょうか? 今回はこれを検証してみたいと思います!
Pythonコード
まずはサクッと前回の記事と同じようにフィルターを作ってみましょう。
まず本編でCryptowatchの出来高データを使うのは初めてなので、以下を忘れずに追記しておいてください。
# CryptowatchのAPIを使用する関数 def get_price(min, before=0, after=0): # 略 if data["result"][str(min)] is not None: for i in data["result"][str(min)]: price.append({ "close_time" : i[0], "close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'), "open_price" : i[1], "high_price" : i[2], "low_price" : i[3], "close_price": i[4], "volume": i[5] }) # 追記 return price
また、前回の記事と同様に、以下のようなフィルター関数を作ります。
この関数は、ドンチアンブレイクアウトの判定をする関数の後に呼ばれ、エントリーの条件判定に利用されます。ここまでは前回の記事と全く同じ流れなので、難しいところはありません。
# エントリーフィルターの関数 def filter( signal ): average_volume = sum(i["volume"] for i in last_data[-1 * buy_term:]) / buy_term if data["volume"] > average_volume * 1.2: return True return False
今回は「過去30期間の平均的な出来高を20%以上上回った場合のみエントリーする」という仕掛けフィルターを作ってみます。
なお、ここでは上値ブレイクアウト期間(buy_term)と下値ブレイクアウト期間(sell_term)を同じ値に設定することを想定しています。もし違う値を設定する場合は、条件分岐が必要です。
検証してみよう!
まずはこちらを普通に前回と同じ方法で検証してみましょう!
実際にフィルターを適用してみて、その成績指標や運用パフォーマンスを比較します。この方法の問題点は後ほど解説しますが、ひとまずやってみましょう。
検証条件
1.検証期間(2017/9/22~2018/5/30)
2.1時間足を使用
3.上値・下値ブレイクアウト 30期間
4.ブレイクアウトの判定 終値/終値
5.ボラティリティ計算期間 30期間
6.ストップレンジ幅 2ATR
7.全トレードで1BTCだけを売買
なお、今回はブレイクアウトの判定には終値を使います。
出来高フィルター無しの場合
----------------------------------- 総合の成績 ----------------------------------- 全トレード数 : 101回 勝率 : 42.6% 期待リターン : 2.66% 標準偏差 : 8.76% 平均利益率 : 10.15% 平均損失率 : -2.89% 平均保有期間 : 35.0足分 損切りの回数 : 97回 最大の勝ちトレード : 584043円 最大の負けトレード : -176313円 最大連敗回数 : 11回 最大ドローダウン : -346620円 / -15.2% 利益合計 : 5299362円 損失合計 : -2140345円 最終損益 : 3159017円 初期資金 : 1000000円 最終資金 : 4159017円 運用成績 : 416.0% 手数料合計 : -116704円 ----------------------------------- 各成績指標 ----------------------------------- CAGR(年間成長率) : 701.17% MARレシオ : 20.78 シャープレシオ : 0.3 プロフィットファクター : 2.48 損益レシオ : 3.51 ------------------------------------------ +10%を超えるトレードの回数 : 18回 ------------------------------------------ -10%を下回るトレードの回数 : 0回 ------------------------------------------
出来高のフィルターありの場合
----------------------------------- 総合の成績 ----------------------------------- 全トレード数 : 93回 勝率 : 44.1% 平均リターン : 2.88% 標準偏差 : 8.82% 平均利益率 : 10.03% 平均損失率 : -2.75% 平均保有期間 : 36.1足分 損切りの回数 : 89回 最大の勝ちトレード : 584043円 最大の負けトレード : -176313円 最大連敗回数 : 11回 最大ドローダウン : -207374円 / -7.1% 利益合計 : 5060427円 損失合計 : -1660796円 最終損益 : 3399631円 初期資金 : 1000000円 最終資金 : 4399631円 運用成績 : 440.0% 手数料合計 : -104373円 ----------------------------------- 各成績指標 ----------------------------------- CAGR(年間成長率) : 731.58% MARレシオ : 20.81 シャープレシオ : 0.32 プロフィットファクター : 2.73 損益レシオ : 3.62
比較表
フィルター無し | フィルター有り | |
---|---|---|
トレード回数 | 101回 | 93回 |
勝率 | 42.6% | 44.1% |
期待値 | 2.66% | 2.88% |
最大DD | -15.2% | -7.1% |
CAGR | 701.17% | 769.74% |
MARレシオ | 20.78 | 41.97 |
PF | 2.28 | 3.05 |
損益レシオ | 3.51 | 3.64 |
この結果だけを見ると、勝率・期待リターン・プロフィットファクターのどれを見ても成績が改善していて、全体のパフォーマンスも向上しているように見えます。
一応、手仕舞いの方法の影響を受けていないことを確認するために、トレイリングストップを無効にした場合もテストしてみましょう。具体的には、通常のストップを使った場合や、ストップを無効にした場合でも比較してみます。
通常のストップを使った場合
フィルター無し | フィルター有り | |
---|---|---|
トレード回数 | 80回 | 67回 |
勝率 | 36.2% | 38.8% |
期待値 | 2.36% | 2.73% |
最大DD | -10.2% | -14.6% |
CAGR | 469.49% | 457.04% |
MARレシオ | 18.94 | 14.28 |
PF | 2.13 | 2.44 |
損益レシオ | 3.69 | 3.82 |
ストップを使わない場合
フィルター無し | フィルター有り | |
---|---|---|
トレード回数 | 70回 | 58回 |
勝率 | 45.7% | 48.3% |
期待値 | 2.76% | 3.57% |
最大DD | -15.1% | -15.6% |
CAGR | 483.68% | 519.62% |
MARレシオ | 15.55 | 15.95 |
PF | 2.04 | 2.61 |
損益レシオ | 2.38 | 2.76 |
いずれの場合も出来高が少ないブレイクアウトをフィルターにかけた場合の方が、勝率や期待リターンは改善しているように見えます。では、このまますぐに「出来高フィルター」を採用することを決めるべきでしょうか?
個人的にはそうではないと思います。前回のトレンドフィルターの記事でも少しだけ触れましたが、フィルターの条件に一致する回数が少なすぎる場合、その結果はただの偶然の可能性があるからです。
「ただの偶然」の可能性を疑ってみよう!
例えば、次のようなフィルターを想像してみてください。
エントリーのシグナルが出るたびにサイコロを振って、6の目が出ればエントリーしないフィルターです。
普通に考えれば、こんなフィルターを採用したいと思う人はいないでしょう。このフィルターの条件にエッジが全くないことは誰でもわかるからです。しかし実際にこのフィルター関数を作ってバックテストをすると、何回かに1回は成績が改善します。
サイコロを使ったフィルターの例
興味がある方は、実際に以下のような関数を作ってテストしてみてください。 以下は、1~6の範囲で乱数を生成して6に一致したらエントリーしないフィルターです。
import random # エントリーフィルターの関数 def filter( signal ): num = random.randint(1,6) if num != 6: return True return False
これは全ブレイクアウトのシグナルのうち、およそ1/6をランダムにフィルターにかける関数です。フィルターの頻度はさきほどの出来高フィルターと同程度にしてあります。
早速このフィルターを適用して4回ほどテストしてみます。
すると、先ほどの出来高のフィルターを上回るとてもいい成績が出てしまいました!
----------------------------------- 総合の成績 ----------------------------------- 全トレード数 : 96回 勝率 : 42.7% 平均リターン : 3.04% 標準偏差 : 8.75% 平均利益率 : 10.62% 平均損失率 : -2.61% 平均保有期間 : 34.7足分 損切りの回数 : 92回 最大の勝ちトレード : 584043円 最大の負けトレード : -170307円 最大連敗回数 : 10回 最大ドローダウン : -349118円 / -11.1% 利益合計 : 5344595円 損失合計 : -1748506円 最終損益 : 3596089円 初期資金 : 1000000円 最終資金 : 4596089円 運用成績 : 460.0% 手数料合計 : -107435円 ----------------------------------- 各成績指標 ----------------------------------- CAGR(年間成長率) : 827.02% MARレシオ : 32.4 シャープレシオ : 0.35 プロフィットファクター : 3.06 損益レシオ : 4.06
もちろん、いくらバックテストで良い成績が出たからといって、このフィルターを使いたい方はいないと思います。それは誰が聞いても、「サイコロを振ることとトレードの勝率が上がることとの間に何の因果関係もない」ことが明らかだからです。
しかし出来高フィルターやトレンドフィルターのように、一見するともっともらしいトレード理論が背景にある場合には、ついバックテストで良い成績が出ると、その因果関係や相関性をそのまま信じてしまいがちです。
またサイコロのように同じ条件で何回も再テストできないことも、幻相関に気付きにくい要因の1つになります。
フィルターの罠
フィルターにこのような錯誤がおこりやすいのは、そもそも以前の記事でも説明したように、トレンドフォローBOTの勝率が悪いことも1つの原因だと思います。
ブレイクアウトBOTは「たまに来る大勝ち」に賭けるタイプのトレードです。そのため、少ない回数しかトレードをしなかった場合、通常は負ける確率の方が高いはずです。
例えば、100回のトレードのうち5回のトレードをランダムに選んでフィルターにかけた場合、フィルターにかかったトレードは、平均の期待リターンを下回っている可能性が高いと思います。
つまりフィルターの条件に一致する回数が少なすぎる場合、平均リターンを下回るトレードだけが偶然フィルターに選ばれる確率も高くなり、フィルターを適用した結果、まるで成績が向上したように見える、という問題があります。
有効性のあるフィルターを見分ける
個人的にそのフィルターが役立つかどうかを見分ける1つの方法は、フィルター条件のシグナルが出たトレードと出ていないトレードをすべて実行し、両方の結果を別々に集計して、リターン分布をヒストグラムにすることです。
例えば、以下のようなヒストグラムを作ります。
▽ フィルター条件と一致したトレード(上)と一致しなかったトレード(下)のリターン分布図
数値上は勝率や期待リターンに違いがある場合でも、このようにリターン分布図にプロットしてみると、「あれ? ほとんど形状が変わらないな…。もしかして、ランダムにトレードを選んだのと同じかな?」と気づきやすくなります。
pythonだと気軽にPandasを使ってこのような比較ができるので便利です。簡単なのでやってみましょう!
Pythonコード
さきほどの出来高のフィルター関数を少しアレンジしてみましょう。
エントリー機会を絞るのではなく、単にフィルターのシグナルが出たことを「記録」しておくだけのコードに変更してみます。例えば、以下のような感じです。
# エントリーフィルターの関数 def filter( flag ): average_volume = sum(i["volume"] for i in last_data[-1 * buy_term:]) / buy_term if data["volume"] > average_volume * 1.2: flag["records"]["volume"].append("high_volume") else: flag["records"]["volume"].append("low_volume") return flag
そして全てのトレードが終わったあとに、pandasで集計します。このやり方は以前に「バックテスト編」で、売りと買いの成績を別々に集計するときに解説した方法と全く同じ手順です。なので詳しくはそちらを参考にしてください。
以下、バックテスト用のコードだけ載せておきます。
# バックテストの集計用の関数 def backtest(flag): # 成績を記録したpandas DataFrameを作成 records = pd.DataFrame({ "Date" : pd.to_datetime(flag["records"]["date"]), "Profit" : flag["records"]["profit"], "Side" : flag["records"]["side"], "Rate" : flag["records"]["return"], "Stop" : flag["records"]["stop-count"], "Periods" : flag["records"]["holding-periods"], "Slippage" : flag["records"]["slippage"], "Volume" : flag["records"]["volume"] # 追記 }) # 出来高フィルターにかかった場面とかかってない場面を集計 high_vol_records = records[records.Volume.isin(["high_volume"])] low_vol_records = records[records.Volume.isin(["low_volume"])] print("バックテストの結果") print("-----------------------------------") print("出来高が多かったときの成績") print("-----------------------------------") print("トレード回数 : {}回".format( len(high_vol_records) )) print("勝率 : {}%".format(round(len(high_vol_records[high_vol_records.Profit>0]) / len(high_vol_records) * 100,1))) print("平均リターン : {}%".format(round(high_vol_records.Rate.mean(),2))) print("総損益 : {}円".format( high_vol_records.Profit.sum() )) print("平均保有期間 : {}足分".format( round(high_vol_records.Periods.mean(),1) )) print("損切りの回数 : {}回".format( high_vol_records.Stop.sum() )) print("-----------------------------------") print("出来高が少なかったときの成績") print("-----------------------------------") print("トレード回数 : {}回".format( len(low_vol_records) )) print("勝率 : {}%".format(round(len(low_vol_records[low_vol_records.Profit>0]) / len(low_vol_records) * 100,1))) print("平均リターン : {}%".format(round(low_vol_records.Rate.mean(),2))) print("総損益 : {}円".format( low_vol_records.Profit.sum() )) print("平均保有期間 : {}足分".format( round(low_vol_records.Periods.mean(),1) )) print("損切りの回数 : {}回".format( low_vol_records.Stop.sum() )) # (略) # 「出来高が多いとき」のリターン分布図 plt.subplot(2,1,1) plt.hist( high_vol_records.Rate,50,rwidth=0.9) plt.xlim(-15,45) # X軸の目盛り幅を揃える plt.axvline( x=0,linestyle="dashed",label="Return = 0" ) plt.axvline( high_vol_records.Rate.mean(), color="orange", label="AverageReturn" ) plt.legend() # 「出来高が少ないとき」のリターン分布図 plt.subplot(2,1,2) plt.hist( low_vol_records.Rate,50,rwidth=0.9,color="coral") plt.xlim(-15,45) # X軸の目盛り幅を揃える plt.gca().invert_yaxis() # 上下反転 plt.axvline( x=0,linestyle="dashed",label="Return = 0" ) plt.axvline( low_vol_records.Rate.mean(), color="orange", label="AverageReturn" ) plt.show()
比較しやすいようにリターン分布図を上下にプロットしたいので、X軸の目盛り幅を、xlim(-15,45)で揃えています。またsubplot(2,1,n)で2行1列(縦向き)にプロットし、下側の図を .invert_yaxis() で上下反転させています。
では実行してみましょう!
出来高の大小それぞれのリターン分布
今回は純粋にエントリー(フィルター)の条件にエッジがあるかどうかだけが知りたいので、トレイリングストップは無効にして、30期間ブレイクアウト以外の条件は何もつけずに検証してみます。
私もこの方法が正解かはわかりませんが、手仕舞いの方法が優れていると結果が歪んでしまう可能性がある気がするので、なるべく最低限の条件(ドテンルールのみ、または通常のストップのみ)で検証した方がわかりやすいと思います。
▽ ストップ無し(ドテンルールのみ)
出来高多 | 出来高少 | |
---|---|---|
回数 | 55回 | 15回 |
勝率 | 49.1% | 33.3% |
期待リターン | 3.39% | 0.46% |
総損益 | 2489447円 | -141504円 |
平均保有 | 86.2期間 | 77.1期間 |
たしかに数値だけ見れば、「エントリー時に出来高が少なかったトレード」は、勝率も期待リターンも悪いように見えます。しかし前述のように、リターン分布図を見ると、一見して明らかなほどの形状の違いはありません。一応、通常のストップを用いた場合も見ておきましょう。
▽ 通常のストップのみ
この違いがわかりにくいのは、前述のように「フィルター条件に一致した回数」が少なすぎるからです。
例えば、前回の記事で紹介したような「終値が長期移動平均線より上にあるか下にあるか?」といったフィルター条件の場合、全トレードのうちおよそ半分がフィルター条件に引っかかります。
そのため、それぞれのリターン分布図は以下のように、比較的わかりやすい分布形状の違いとして現れます。
前回のトレンドフィルターの例
トレンドと一致 | トレンドと不一致 | |
---|---|---|
回数 | 55回 | 55回 |
勝率 | 45.5% | 27.3% |
期待リターン | 3.61% | -0.44% |
総損益 | 2393675円 | -547580円 |
平均保有 | 52.3期間 | 36.7期間 |
損切り | 25回 | 32回 |
トレンドと一致した方向のブレイクアウトでは、リターンは幅広く右側にバラつき、期待リターンは正です。一方、トレンドと一致しない方向にエントリーしたケースでは、リターン分布は左右対称に近い形になっており、期待リターンはマイナスの数値になっています。
これでも完璧に確信が持てるわけではありませんが、それぞれ50回以上のトレード数でシグナルが出ていて、かつこれだけ分布に違いがあれば、トレンドフィルターの有効性を「試す価値がある」と思えるかもしれません。
pythonで「t検定」をしてみよう!
では、リターン分布で明らかな違いがわからないときは、そのままフィルターのアイデアを捨てた方がいいのでしょうか?
もちろん、上記の出来高フィルターにまだ利用価値がないと決まったわけではありません。フィルターにかかった回数が少なすぎて、ただの偶然かどうか判断が難しいというだけです。そこで考え方は2つあります。
(1)観察期間を伸ばしてもっとサンプル数を確保する
(2)統計的な検定手法を使う
一見、データ数が少なすぎて人間の目では直感的にわからない場合でも、統計的な検定手法を使うことで、両者の期待リターンの差が「偶然で説明できる範囲を超えた明かな違いかどうか?」を科学的に検証することができます。
今回は「t検定」という方法を紹介します!
統計の知識がない方でもわかるように説明するので、読んでくれると嬉しいです!
t検定とは
t検定とは、あるグループ(A)の結果とあるグループ(B)の結果の違いが偶然のバラつき(誤差)から生じる確率を計算して、その確率が5%以下なら「この差は偶然ではない」と結論付ける統計手法のことをいいます。
例えば、上記の例では、ブレイクアウト時に平均的な出来高を上回った場合と、出来高が少なかった場合とを比較しました。このとき期待リターンには 3.27% - 0.36% = 2.91% の開きがありましたね。
t検定では、最初に「両者のトレードはランダムに振り分けられただけで元の期待値は全く同じだ」という一番嬉しくない仮説を立てます。その上で、「もしその仮説が正しい場合、ただの偶然からこれだけの差が生じる確率はどのくらいあるのか?」を計算します。この確率のことをp値といいます。
もしp値が5%以下であれば、「ただの偶然にしては差が大きすぎる。だから最初の仮説は間違っていた」(つまり偶然ではなく出来高フィルターに有効性があった)という結論を下します。
△ 図の「2」は適当な数字で、実際はサンプル数によって変わります。
なお、t検定をちゃんと理解したい方には、以下の本がおすすめです。
私はほとんどの統計学の固い教科書が理解できずに挫折しましたが(笑)、西内さんの著書シリーズを読んで、だいぶ統計学の基礎的なところが理解できるようになりました。[実践編]がいいです。
Pythonでt検定する方法
実際のt検定の計算式は少しだけ複雑ですが、pythonの「Scipy」というライブラリを使えば、1行書くだけでp値を計算してくれます。これも手を動かしてやってみましょう!
▽p値を計算するコード
from scipy import stats import numpy as np sample_a = [4,3,3,3,5,-1,2,.......,3] # 条件1の全データの配列 sample_b = [2,-2,-1,3,5,-4,.......,2] # 条件2の全データの配列 (ndarray) p = stats.ttest_ind(sample_a, sample_b) print("p値 : {}".format(p[1]))
参考:scipyの公式リファランス
参考:t検定とpython
コードの解説
それぞれのサンプルデータには、ただの配列(リスト)ではなく、numpy型の配列(ndarray)を用意しなければならない点に注意してください。pandasで集計した列データをndarrayに変換するには、.values を使います。
また比較したいグループ(A)と(B)のデータ数が大体同じであれば、上記のコードのままで大丈夫です。ですが、両方の結果のサンプル数に大きな違いがある場合は、以下のように equal_var = False を付けてください。
p = stats.ttest_ind(sample_a, sample_b, equal_var = False)
※ 以下、興味がある方のために補足しますが、全く意味がわからない方は心配しないで飛ばしてください!
t検定を使うためには両グループの母集団が等分散であるという仮定を置く必要があります。サンプル数が同程度であれば問題ありませんが、そうでない場合は、等分散の仮定をおかない「ウェルチ検定」という類似手法を使う必要があるので、equal_var(等分散)をFalseに設定しています。
では、さきほどpandasで集計したデータからp値を計算するコードを作りましょう!
バックテストの検証コード
from scipy import stats import numpy as np # バックテストの集計用の関数 def backtest(flag): # (略) # T検定を実行する sample_a = high_vol_records.Rate.values sample_b = low_vol_records.Rate.values print("------------------------------------------") print("t検定を実行") print("------------------------------------------") p = stats.ttest_ind(sample_a, sample_b, equal_var = False) print("p値 : {}".format(p[1]))
簡単ですね。
では、これで「出来高が平均を上回るときのブレイクアウト」と「それ以下の出来高のときのブレイクアウト」とのエントリーに、偶然や誤差の範囲を超えるほどの期待リターンの違いがあったのかどうか、結果を見てみましょう!
実行結果
▽ドテンルールのみの場合(ストップ無し)
p値: 0.34626205887421146
▽通常のストップ有りの場合
p値 : 0.4575055281798647
p値はいずれも5%を大幅に上回り、単なる偶然の可能性を否定できない水準(全くランダムなフィルターでも34~45%程度の確率でこのくらいの差が生じる)という結果になりました。
このように統計ツールは万能ではなく、単にヒストグラムから予想された通りの結果を示すだけのことも多いですが、一応、統計的な検定方法を知っておいて損はないと思います!
トレンドフィルターのp値
せっかくなので、さきほどの移動平均線(200MA)を使ったトレンドフィルターのp値も確認しておきましょう。
▽ ドテンルールのみの場合(ストップ無し)
p値 : 0.007921851375299075
▽通常のストップ有りの場合
p値 : 0.012742691982286171
いずれもp値は5%を明らかに下回る水準となりました。つまり「偶然だけでこれだけの期待リターンの差が生じる可能性は考えにくい」ということです。
これは(少なくともヒストリカルデータの検証期間においては)トレンドを使ってエントリー条件に何らかのフィルターをかけることは、統計的な合理性があったことを意味します。
補足
なお、この記事の検証結果は「出来高を使ったフィルターに有用性がない」という意味ではありません。単に私が適当に考えた「過去30期間の出来高を20%上回る場合」という仕掛けのフィルターに、有意なエッジが無かったというだけなので、もっと研究する価値はあると思います。
以上でトレンドフォローBOTのフィルター編は終わりです!
今回使ったコード
一応、今回の出来高フィルターの検証に使ったコードを記載しておきますが、今回のコードはかなり適当に変数名をつけていて、あまり読みやすく作ってません。
フィルター条件の一致を確認する配列だけだと、バックテストがポジションを持ったまま終了したときに数が合わなくなるので、それを手仕舞いのたびに filter-match から volume にコピーして移す、という処理をしていますが、そこがわかりにくいと思います。
なので、参考程度にしてください。
import requests from datetime import datetime from scipy import stats import time import matplotlib.pyplot as plt import pandas as pd import numpy as np #--------設定項目-------- chart_sec = 3600 # 1時間足を使用 buy_term = 30 # 買いエントリーのブレイク期間の設定 sell_term = 30 # 売りエントリーのブレイク期間の設定 judge_price={ "BUY" : "close_price", # ブレイク判断 高値(high_price)か終値(close_price)を使用 "SELL": "close_price" # ブレイク判断 安値 (low_price)か終値(close_price)を使用 } TEST_MODE_LOT = "fixed" # fixed なら常に1BTC固定 / adjustable なら可変ロット volatility_term = 30 # 平均ボラティリティの計算に使う期間 stop_range = 2 # 何レンジ幅にストップを入れるか trade_risk = 0.03 # 1トレードあたり口座の何%まで損失を許容するか levarage = 3 # レバレッジ倍率の設定 start_funds = 1000000 # シミュレーション時の初期資金 entry_times = 2 # 何回に分けて追加ポジションを取るか entry_range = 1 # 何レンジごとに追加ポジションを取るか stop_config = "ON" # ON / OFF / TRAILING の3つが設定可 stop_AF = 0.02 # 加速係数 stop_AF_add = 0.02 # 加速係数を増やす度合 stop_AF_max = 0.2 # 加速係数の上限 wait = 0 # ループの待機時間 slippage = 0.001 # 手数料・スリッページ #-------------検証したいフィルター-------------- # エントリーフィルターの関数 def filter( flag ): average_volume = sum(i["volume"] for i in last_data[-30:]) / 30 if data["volume"] > average_volume * 1.2: flag["records"]["filter-match"] = "high_volume" else: flag["records"]["filter-match"] = "low_volume" return flag #-------------補助ツールの関数-------------- # CryptowatchのAPIを使用する関数 def get_price(min, before=0, after=0): price = [] params = {"periods" : min } if before != 0: params["before"] = before if after != 0: params["after"] = after response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params) data = response.json() if data["result"][str(min)] is not None: for i in data["result"][str(min)]: if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0: price.append({ "close_time" : i[0], "close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'), "open_price" : i[1], "high_price" : i[2], "low_price" : i[3], "close_price": i[4], "volume": i[5] }) return price else: flag["records"]["log"].append("データが存在しません") return None # 時間と高値・安値をログに記録する関数 def log_price( data,flag ): log = "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 高値: " + str(data["high_price"]) + " 安値: " + str(data["low_price"]) + " 終値: " + str(data["close_price"]) + "\n" flag["records"]["log"].append(log) return flag # 平均ボラティリティを計算する関数 def calculate_volatility( last_data ): high_sum = sum(i["high_price"] for i in last_data[-1 * volatility_term :]) low_sum = sum(i["low_price"] for i in last_data[-1 * volatility_term :]) volatility = round((high_sum - low_sum) / volatility_term) flag["records"]["log"].append("現在の{0}期間の平均ボラティリティは{1}円です\n".format( volatility_term, volatility )) return volatility # 単純移動平均を計算する関数 def calculate_MA( value,before=None ): if before is None: MA = sum(i["close_price"] for i in last_data[-1*value:]) / value else: MA = sum(i["close_price"] for i in last_data[-1*value + before: before]) / value return round(MA) # 指数移動平均を計算する関数 def calculate_EMA( value,before=None ): if before is not None: MA = sum(i["close_price"] for i in last_data[-2*value + before : -1*value + before]) / value EMA = (last_data[-1*value + before]["close_price"] * 2 / (value+1)) + (MA * (value-1) / (value+1)) for i in range(value-1): EMA = (last_data[-1*value+before+1 + i]["close_price"] * 2 /(value+1)) + (EMA * (value-1) / (value+1)) else: MA = sum(i["close_price"] for i in last_data[-2*value: -1*value]) / value EMA = (last_data[-1*value]["close_price"] * 2 / (value+1)) + (MA * (value-1) / (value+1)) for i in range(value-1): EMA = (last_data[-1*value+1 + i]["close_price"] * 2 /(value+1)) + (EMA * (value-1) / (value+1)) return round(EMA) #-------------資金管理の関数-------------- # 注文ロットを計算する関数 def calculate_lot( last_data,data,flag ): # 固定ロットでのテスト時 if TEST_MODE_LOT == "fixed": flag["records"]["log"].append("固定ロット(1枚)でテスト中のため、1BTCを注文します\n") lot = 1 volatility = calculate_volatility( last_data ) stop = stop_range * volatility flag["position"]["ATR"] = round( volatility ) return lot,stop,flag # 口座残高を取得する balance = flag["records"]["funds"] # 最初のエントリーの場合 if flag["add-position"]["count"] == 0: # 1回の注文単位(ロット数)と、追加ポジの基準レンジを計算する volatility = calculate_volatility( last_data ) stop = stop_range * volatility calc_lot = np.floor( balance * trade_risk / stop * 100 ) / 100 flag["add-position"]["unit-size"] = np.floor( calc_lot / entry_times * 100 ) / 100 flag["add-position"]["unit-range"] = round( volatility * entry_range ) flag["add-position"]["stop"] = stop flag["position"]["ATR"] = round( volatility ) flag["records"]["log"].append("\n現在のアカウント残高は{}円です\n".format( balance )) flag["records"]["log"].append("許容リスクから購入できる枚数は最大{}BTCまでです\n".format( calc_lot )) flag["records"]["log"].append("{0}回に分けて{1}BTCずつ注文します\n".format( entry_times, flag["add-position"]["unit-size"] )) # 2回目以降のエントリーの場合 else: balance = round( balance - flag["position"]["price"] * flag["position"]["lot"] / levarage ) # ストップ幅には、最初のエントリー時に計算したボラティリティを使う stop = flag["add-position"]["stop"] # 実際に購入可能な枚数を計算する able_lot = np.floor( balance * levarage / data["close_price"] * 100 ) / 100 lot = min(able_lot, flag["add-position"]["unit-size"]) flag["records"]["log"].append("証拠金から購入できる枚数は最大{}BTCまでです\n".format( able_lot )) return lot,stop,flag # 複数回に分けて追加ポジションを取る関数 def add_position( data,flag ): # ポジションがない場合は何もしない if flag["position"]["exist"] == False: return flag # 固定ロット(1BTC)でのテスト時は何もしない if TEST_MODE_LOT == "fixed": return flag # 最初(1回目)のエントリー価格を記録 if flag["add-position"]["count"] == 0: flag["add-position"]["first-entry-price"] = flag["position"]["price"] flag["add-position"]["last-entry-price"] = flag["position"]["price"] flag["add-position"]["count"] += 1 while True: # 以下の場合は、追加ポジションを取らない if flag["add-position"]["count"] >= entry_times: return flag # この関数の中で使う変数を用意 first_entry_price = flag["add-position"]["first-entry-price"] last_entry_price = flag["add-position"]["last-entry-price"] unit_range = flag["add-position"]["unit-range"] current_price = data["close_price"] # 価格がエントリー方向に基準レンジ分だけ進んだか判定する should_add_position = False if flag["position"]["side"] == "BUY" and (current_price - last_entry_price) > unit_range: should_add_position = True elif flag["position"]["side"] == "SELL" and (last_entry_price - current_price) > unit_range: should_add_position = True else: break # 基準レンジ分進んでいれば追加注文を出す if should_add_position == True: flag["records"]["log"].append("\n前回のエントリー価格{0}円からブレイクアウトの方向に{1}ATR({2}円)以上動きました\n".format( last_entry_price, entry_range, round( unit_range ) )) flag["records"]["log"].append("{0}/{1}回目の追加注文を出します\n".format(flag["add-position"]["count"] + 1, entry_times)) # 注文サイズを計算 lot,stop,flag = calculate_lot( last_data,data,flag ) if lot < 0.01: flag["records"]["log"].append("注文可能枚数{}が、最低注文単位に満たなかったので注文を見送ります\n".format(lot)) flag["add-position"]["count"] += 1 return flag # 追加注文を出す if flag["position"]["side"] == "BUY": entry_price = first_entry_price + (flag["add-position"]["count"] * unit_range) #entry_price = round((1 + slippage) * entry_price) flag["records"]["log"].append("現在のポジションに追加して、{0}円で{1}BTCの買い注文を出します\n".format(entry_price,lot)) # ここに買い注文のコードを入れる if flag["position"]["side"] == "SELL": entry_price = first_entry_price - (flag["add-position"]["count"] * unit_range) #entry_price = round((1 - slippage) * entry_price) flag["records"]["log"].append("現在のポジションに追加して、{0}円で{1}BTCの売り注文を出します\n".format(entry_price,lot)) # ここに売り注文のコードを入れる # ポジション全体の情報を更新する flag["position"]["stop"] = stop flag["position"]["price"] = int(round(( flag["position"]["price"] * flag["position"]["lot"] + entry_price * lot ) / ( flag["position"]["lot"] + lot ))) flag["position"]["lot"] = np.round( (flag["position"]["lot"] + lot) * 100 ) / 100 if flag["position"]["side"] == "BUY": flag["records"]["log"].append("{0}円の位置にストップを更新します\n".format(flag["position"]["price"] - stop)) elif flag["position"]["side"] == "SELL": flag["records"]["log"].append("{0}円の位置にストップを更新します\n".format(flag["position"]["price"] + stop)) flag["records"]["log"].append("現在のポジションの取得単価は{}円です\n".format(flag["position"]["price"])) flag["records"]["log"].append("現在のポジションサイズは{}BTCです\n\n".format(flag["position"]["lot"])) flag["add-position"]["count"] += 1 flag["add-position"]["last-entry-price"] = entry_price return flag # トレイリングストップの関数 def trail_stop( data,flag ): # まだ追加ポジションの取得中であれば何もしない if flag["add-position"]["count"] < entry_times and TEST_MODE_LOT != "fixed": return flag # 高値/安値がエントリー価格からいくら離れたか計算 if flag["position"]["side"] == "BUY": moved_range = round( data["high_price"] - flag["position"]["price"] ) if flag["position"]["side"] == "SELL": moved_range = round( flag["position"]["price"] - data["low_price"] ) # 最高値・最安値を更新したか調べる if moved_range < 0 or flag["position"]["stop-EP"] >= moved_range: return flag else: flag["position"]["stop-EP"] = moved_range # 加速係数に応じて損切りラインを動かす flag["position"]["stop"] = round(flag["position"]["stop"] - ( moved_range + flag["position"]["stop"] ) * flag["position"]["stop-AF"]) # 加速係数を更新 flag["position"]["stop-AF"] = round( flag["position"]["stop-AF"] + stop_AF_add ,2 ) if flag["position"]["stop-AF"] >= stop_AF_max: flag["position"]["stop-AF"] = stop_AF_max # ログ出力 if flag["position"]["side"] == "BUY": flag["records"]["log"].append("トレイリングストップの発動:ストップ位置を{}円に動かして、加速係数を{}に更新します\n".format( round(flag["position"]["price"] - flag["position"]["stop"]) , flag["position"]["stop-AF"] )) else: flag["records"]["log"].append("トレイリングストップの発動:ストップ位置を{}円に動かして、加速係数を{}に更新します\n".format( round(flag["position"]["price"] + flag["position"]["stop"]) , flag["position"]["stop-AF"] )) return flag #-------------売買ロジックの部分の関数-------------- # ドンチャンブレイクを判定する関数 def donchian( data,last_data ): highest = max(i["high_price"] for i in last_data[ (-1* buy_term): ]) if data[ judge_price["BUY"] ] > highest: return {"side":"BUY","price":highest} lowest = min(i["low_price"] for i in last_data[ (-1* sell_term): ]) if data[ judge_price["SELL"] ] < lowest: return {"side":"SELL","price":lowest} return {"side" : None , "price":0} # エントリー注文を出す関数 def entry_signal( data,last_data,flag ): signal = donchian( data,last_data ) if signal["side"] == "BUY": flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の価格が{2}円でブレイクしました\n".format(buy_term,signal["price"],data[judge_price["BUY"]])) # フィルター条件を確認 flag = filter( flag ) lot,stop,flag = calculate_lot( last_data,data,flag ) if lot > 0.01: flag["records"]["log"].append("{0}円で{1}BTCの買い注文を出します\n".format(data["close_price"],lot)) # ここに買い注文のコードを入れる flag["records"]["log"].append("{0}円にストップを入れます\n".format(data["close_price"] - stop)) flag["position"]["lot"],flag["position"]["stop"] = lot,stop flag["position"]["exist"] = True flag["position"]["side"] = "BUY" flag["position"]["price"] = data["close_price"] else: flag["records"]["log"].append("注文可能枚数{}が、最低注文単位に満たなかったので注文を見送ります\n".format(lot)) if signal["side"] == "SELL": flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の価格が{2}円でブレイクしました\n".format(sell_term,signal["price"],data[judge_price["SELL"]])) # フィルター条件を確認 flag = filter( flag ) lot,stop,flag = calculate_lot( last_data,data,flag ) if lot > 0.01: flag["records"]["log"].append("{0}円で{1}BTCの売り注文を出します\n".format(data["close_price"],lot)) # ここに売り注文のコードを入れる flag["records"]["log"].append("{0}円にストップを入れます\n".format(data["close_price"] + stop)) flag["position"]["lot"],flag["position"]["stop"] = lot,stop flag["position"]["exist"] = True flag["position"]["side"] = "SELL" flag["position"]["price"] = data["close_price"] else: flag["records"]["log"].append("注文可能枚数{}が、最低注文単位に満たなかったので注文を見送ります\n".format(lot)) return flag # 手仕舞いのシグナルが出たら決済の成行注文 + ドテン注文 を出す関数 def close_position( data,last_data,flag ): if flag["position"]["exist"] == False: return flag flag["position"]["count"] += 1 signal = donchian( data,last_data ) if flag["position"]["side"] == "BUY": if signal["side"] == "SELL": flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の価格が{2}円でブレイクしました\n".format(sell_term,signal["price"],data[judge_price["SELL"]])) flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n") # 決済の成行注文コードを入れる records( flag,data,data["close_price"] ) flag["position"]["exist"] = False flag["position"]["count"] = 0 flag["position"]["stop-AF"] = stop_AF flag["position"]["stop-EP"] = 0 flag["add-position"]["count"] = 0 # ドテン注文の箇所 flag = filter( flag ) lot,stop,flag = calculate_lot( last_data,data,flag ) if lot > 0.01: flag["records"]["log"].append("\n{0}円で{1}BTCの売りの注文を入れてドテンします\n".format(data["close_price"],lot)) # ここに売り注文のコードを入れる flag["records"]["log"].append("{0}円にストップを入れます\n".format(data["close_price"] + stop)) flag["position"]["lot"],flag["position"]["stop"] = lot,stop flag["position"]["exist"] = True flag["position"]["side"] = "SELL" flag["position"]["price"] = data["close_price"] if flag["position"]["side"] == "SELL": if signal["side"] == "BUY": flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の価格が{2}円でブレイクしました\n".format(buy_term,signal["price"],data[judge_price["BUY"]])) flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n") # 決済の成行注文コードを入れる records( flag,data,data["close_price"] ) flag["position"]["exist"] = False flag["position"]["count"] = 0 flag["position"]["stop-AF"] = stop_AF flag["position"]["stop-EP"] = 0 flag["add-position"]["count"] = 0 # ドテン注文の箇所 flag = filter( flag ) lot,stop,flag = calculate_lot( last_data,data,flag ) if lot > 0.01: flag["records"]["log"].append("\n{0}円で{1}BTCの買いの注文を入れてドテンします\n".format(data["close_price"],lot)) # ここに買い注文のコードを入れる flag["records"]["log"].append("{0}円にストップを入れます\n".format(data["close_price"] - stop)) flag["position"]["lot"],flag["position"]["stop"] = lot,stop flag["position"]["exist"] = True flag["position"]["side"] = "BUY" flag["position"]["price"] = data["close_price"] return flag # 損切ラインにかかったら成行注文で決済する関数 def stop_position( data,flag ): # トレイリングストップを実行 if stop_config == "TRAILING": flag = trail_stop( data,flag ) if flag["position"]["side"] == "BUY": stop_price = flag["position"]["price"] - flag["position"]["stop"] if data["low_price"] < stop_price: flag["records"]["log"].append("{0}円の損切ラインに引っかかりました。\n".format( stop_price )) stop_price = round( stop_price - 2 * calculate_volatility(last_data) / ( chart_sec / 60) ) flag["records"]["log"].append(str(stop_price) + "円あたりで成行注文を出してポジションを決済します\n") # 決済の成行注文コードを入れる records( flag,data,stop_price,"STOP" ) flag["position"]["exist"] = False flag["position"]["count"] = 0 flag["position"]["stop-AF"] = stop_AF flag["position"]["stop-EP"] = 0 flag["add-position"]["count"] = 0 if flag["position"]["side"] == "SELL": stop_price = flag["position"]["price"] + flag["position"]["stop"] if data["high_price"] > stop_price: flag["records"]["log"].append("{0}円の損切ラインに引っかかりました。\n".format( stop_price )) stop_price = round( stop_price + 2 * calculate_volatility(last_data) / (chart_sec / 60) ) flag["records"]["log"].append(str(stop_price) + "円あたりで成行注文を出してポジションを決済します\n") # 決済の成行注文コードを入れる records( flag,data,stop_price,"STOP" ) flag["position"]["exist"] = False flag["position"]["count"] = 0 flag["position"]["stop-AF"] = stop_AF flag["position"]["stop-EP"] = 0 flag["add-position"]["count"] = 0 return flag #------------バックテストの部分の関数-------------- # 各トレードのパフォーマンスを記録する関数 def records(flag,data,close_price,close_type=None): flag["records"]["volume"].append(flag["records"]["filter-match"]) # 取引手数料等の計算 entry_price = int(round(flag["position"]["price"] * flag["position"]["lot"])) exit_price = int(round(close_price * flag["position"]["lot"])) trade_cost = round( exit_price * slippage ) log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n" flag["records"]["log"].append(log) flag["records"]["slippage"].append(trade_cost) # 手仕舞った日時と保有期間を記録 flag["records"]["date"].append(data["close_time_dt"]) flag["records"]["holding-periods"].append( flag["position"]["count"] ) # 損切りにかかった回数をカウント if close_type == "STOP": flag["records"]["stop-count"].append(1) else: flag["records"]["stop-count"].append(0) # 値幅の計算 buy_profit = exit_price - entry_price - trade_cost sell_profit = entry_price - exit_price - trade_cost # 利益が出てるかの計算 if flag["position"]["side"] == "BUY": flag["records"]["side"].append( "BUY" ) flag["records"]["profit"].append( buy_profit ) flag["records"]["return"].append( round( buy_profit / entry_price * 100, 4 )) flag["records"]["funds"] = flag["records"]["funds"] + buy_profit if buy_profit > 0: log = str(buy_profit) + "円の利益です\n\n" flag["records"]["log"].append(log) else: log = str(buy_profit) + "円の損失です\n\n" flag["records"]["log"].append(log) if flag["position"]["side"] == "SELL": flag["records"]["side"].append( "SELL" ) flag["records"]["profit"].append( sell_profit ) flag["records"]["return"].append( round( sell_profit / entry_price * 100, 4 )) flag["records"]["funds"] = flag["records"]["funds"] + sell_profit if sell_profit > 0: log = str(sell_profit) + "円の利益です\n\n" flag["records"]["log"].append(log) else: log = str(sell_profit) + "円の損失です\n\n" flag["records"]["log"].append(log) return flag # バックテストの集計用の関数 def backtest(flag): # 成績を記録したpandas DataFrameを作成 records = pd.DataFrame({ "Date" : pd.to_datetime(flag["records"]["date"]), "Profit" : flag["records"]["profit"], "Side" : flag["records"]["side"], "Rate" : flag["records"]["return"], "Stop" : flag["records"]["stop-count"], "Periods" : flag["records"]["holding-periods"], "Slippage" : flag["records"]["slippage"], "Volume" : flag["records"]["volume"] }) # 総損益の列を追加する records["Gross"] = records.Profit.cumsum() # 資産推移の列を追加する records["Funds"] = records.Gross + start_funds # 最大ドローダウンの列を追加する records["Drawdown"] = records.Funds.cummax().subtract(records.Funds) records["DrawdownRate"] = round(records.Drawdown / records.Funds.cummax() * 100,1) # フィルター有無別のトレードをそれぞれ抽出する high_vol_records = records[records.Volume.isin(["high_volume"])] low_vol_records = records[records.Volume.isin(["low_volume"])] print("バックテストの結果") print("-----------------------------------") print("出来高が多かったときの成績") print("-----------------------------------") print("トレード回数 : {}回".format( len(high_vol_records) )) print("勝率 : {}%".format(round(len(high_vol_records[high_vol_records.Profit>0]) / len(high_vol_records) * 100,1))) print("平均リターン : {}%".format(round(high_vol_records.Rate.mean(),2))) print("総損益 : {}円".format( high_vol_records.Profit.sum() )) print("平均保有期間 : {}足分".format( round(high_vol_records.Periods.mean(),1) )) print("損切りの回数 : {}回".format( high_vol_records.Stop.sum() )) print("-----------------------------------") print("出来高が少なかったときの成績") print("-----------------------------------") print("トレード回数 : {}回".format( len(low_vol_records) )) print("勝率 : {}%".format(round(len(low_vol_records[low_vol_records.Profit>0]) / len(low_vol_records) * 100,1))) print("平均リターン : {}%".format(round(low_vol_records.Rate.mean(),2))) print("総損益 : {}円".format( low_vol_records.Profit.sum() )) print("平均保有期間 : {}足分".format( round(low_vol_records.Periods.mean(),1) )) print("損切りの回数 : {}回".format( low_vol_records.Stop.sum() )) # ログファイルの出力 file = open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8') file.writelines(flag["records"]["log"]) # T検定 sample_a = high_vol_records.Rate.values sample_b = low_vol_records.Rate.values print("------------------------------------------") print("t検定を実行") print("------------------------------------------") p = stats.ttest_ind(sample_a, sample_b, equal_var = False) print("p値 : {}".format(p[1])) # 「出来高が多いとき」のリターン分布図 plt.subplot(2,1,1) plt.hist( high_vol_records.Rate,50,rwidth=0.9) plt.xlim(-15,45) plt.axvline( x=0,linestyle="dashed",label="Return = 0" ) plt.axvline( high_vol_records.Rate.mean(), color="orange", label="AverageReturn" ) plt.legend() # 「出来高が少ないとき」のリターン分布図 plt.subplot(2,1,2) plt.hist( low_vol_records.Rate,50,rwidth=0.9,color="coral") plt.xlim(-15,45) plt.gca().invert_yaxis() plt.axvline( x=0,linestyle="dashed",label="Return = 0" ) plt.axvline( low_vol_records.Rate.mean(), color="orange", label="AverageReturn" ) plt.show() #------------ここからメイン処理-------------- # 価格チャートを取得 price = get_price(chart_sec,after=1451606400) flag = { "position":{ "exist" : False, "side" : "", "price": 0, "stop":0, "stop-AF": stop_AF, "stop-EP":0, "ATR":0, "lot":0, "count":0 }, "add-position":{ "count":0, "first-entry-price":0, "last-entry-price":0, "unit-range":0, "unit-size":0, "stop":0 }, "records":{ "date":[], "profit":[], "return":[], "side":[], "stop-count":[], "funds" : start_funds, "holding-periods":[], "slippage":[], "log":[], "filter-match":"", "volume":[] } } last_data = [] need_term = max(buy_term,sell_term,volatility_term) i = 0 while i < len(price): # ドンチャンの判定に使う期間分の安値・高値データを準備する if len(last_data) < need_term: last_data.append(price[i]) flag = log_price(price[i],flag) time.sleep(wait) i += 1 continue data = price[i] flag = log_price(data,flag) # ポジションがある場合 if flag["position"]["exist"]: if stop_config != "OFF": flag = stop_position( data,flag ) flag = close_position( data,last_data,flag ) flag = add_position( data,flag ) # ポジションがない場合 else: flag = entry_signal( data,last_data,flag ) last_data.append( data ) i += 1 time.sleep(wait) print("--------------------------") print("テスト期間:") print("開始時点 : " + str(price[0]["close_time_dt"])) print("終了時点 : " + str(price[-1]["close_time_dt"])) print(str(len(price)) + "件のローソク足データで検証") print("--------------------------") backtest(flag)
第5回以外は一通り全部コーディングしてみました。
python自体少し触った程度でしたが、とてもわかりやすくて面白かったです。
まだまだ自分でコーディングすることはできないのでもう一周したいと思います。
有料ノート買おうと思います。