ブレイクアウト戦略では、通常、勝率の悪さをカバーするために出来るだけ大きな利益を狙うため、利益目標(利確ライン)を設定しません。その代わりに利益を取り逃がさないようにトレイリングストップを設定することがあります。
1.トレイリングストップとは
トレイリングストップとは、エントリー後の含み益を逃がさないように、利益が乗るにつれて、最初に設定した損切ラインを少しずつエントリーした方向に動かしていくテクニックのことをいいます。
例えば、上値ブレイクアウトで買いエントリーし、当初はエントリー価格の2レンジ(ATR)幅下に損切りを設定していたとします。この場合、ブレイクアウトに成功して1レンジ上に値が動くたびに、ストップ位置も1レンジずつ上にずらしていきます。
トレイリング(trailing)は英語で「引きずる」という意味です。価格に引きずられるようにストップ位置が動いていくため、トレイリングストップといいます。
2.具体的なロジックの実装
今回は以下のようなルールを定義します。
1)損切りラインはエントリーと同じ方向にのみ動かせる
2)価格(終値)がエントリー方向に1レンジ進むごとに、損切りラインをmレンジ動かす(0≦m≦1)
3)どこまで価格を追いかけるかを選択できるようにする
一般論としてトレイリングストップには、トレンドによる含み益を逃がさない効果があります。
ただしあまりにストップの位置を価格に近づけすぎると、一時的な押し目で退出させられてしまい、本来の売買ロジックが持つパフォーマンスを発揮できなくなります。売買ロジックの期待値を最大限に活かすためには、できるだけ余計な条件を足さない方が得策です。
そのため、価格と損切ラインを同じだけ動かすのではなく、価格が1レンジ動くたびに損切ラインは1/4~1/2レンジだけ動かすなど、より緩いトレイリングの設定ができるようにしておきます(この設定値が上記の2です)。
またどこまでも価格を追従するのではなく、最初のエントリー価格の位置(損益が±0になるライン)までだけストップを動かし、それ以降は追従しないような設定もできるようにしておきましょう。
3.トレイリング・ストップのPythonコード
では実際にpythonコードを作りましょう。
#--------設定項目-------- stop_range = 2 # 最初に何レンジ幅にストップを入れるか trail_ratio = 0.5 # 価格が1レンジ動くたびに何レンジ損切り位置をトレイルするか trail_until_breakeven = True # 損益ゼロの位置までしかトレイルしない # trail_ratio は 0~1 の範囲でのみ設定可 # trail_ratio を 0 に設定するとトレイリングストップを無効にできます # トレイリングストップの関数 def trail_stop( data,flag ): # ポジションの追加取得(増し玉)が終わるまでは何もしない if flag["add-position"]["count"] < entry_times: return flag last_stop = flag["position"]["stop"] # 前回のストップ幅 first_stop = flag["position"]["ATR"] * stop_range # 最初のストップ幅 # 終値がエントリー価格からいくら離れたか計算する if flag["position"]["side"] == "BUY" and data["close_price"] > flag["position"]["price"]: moved_range = round(data["close_price"] - flag["position"]["price"]) elif flag["position"]["side"] == "SELL" and data["close_price"] < flag["position"]["price"]: moved_range = round(flag["position"]["price"] - data["close_price"]) else: moved_range = 0 # 動いたレンジ幅に合わせてストップ位置を更新する if moved_range > flag["position"]["ATR"]: number = int(np.floor(moved_range / flag["position"]["ATR"])) flag["position"]["stop"] = round(first_stop - ( number * flag["position"]["ATR"] * trail_ratio )) # 損益0ラインまでしかトレイルしない場合 if trail_until_breakeven and flag["position"]["stop"] < 0: flag["position"]["stop"] = 0 # ストップがエントリー方向と逆に動いたら更新しない if flag["position"]["stop"] > last_stop: flag["position"]["stop"] = last_stop # ストップが動いた場合のみログ出力 if flag["position"]["stop"] != last_stop: if flag["position"]["side"] == "BUY": flag["records"]["log"].append("トレイリングストップの発動:ストップ位置を{}円に動かします\n".format( round(flag["position"]["price"] - flag["position"]["stop"]) )) else: flag["records"]["log"].append("トレイリングストップの発動:ストップ位置を{}円に動かします\n".format( round(flag["position"]["price"] + flag["position"]["stop"]) )) return flag
新しいローソク足(終値)を取得するたびに、上記の関数でトレイリングストップの値幅を計算します。トレイリングストップの値幅は、以下の式で計算します。
1)最初のストップ幅 = エントリー時の平均ボラティリティ(ATR) × ストップレンジ
2)トレイリング・ストップの値幅 = 最初のストップ幅 - (動いたレンジ数 × ATR × トレイリング率)
1)トレイリング率
トレイリング率(trail_ratio)とは、さきほどの定義ルールのところで説明したm値のことです。「1」に設定すると、動いた価格と同じ値幅分だけストップ位置を動かします。「0.5」を設定すると、動いた価格の半分だけストップ位置を動かします。
1以上を設定すると価格をストップ位置が追い越してしまうので、1以上を設定することはありえません。そのため、メイン処理に以下のようなコードを書いておきましょう。
# トレイリングの比率に0~1以上の数値を設定できないようにする if trail_ratio > 1: trail_ratio = 1 elif trail_ratio < 0: trail_ratio = 0
2)損益ゼロのラインの計算
上記の計算式でトレイリングストップの値幅を計算すると、買いエントリーでトレイリングストップの位置がエントリー価格より上になった場合(または売りエントリーでトレイリングストップの位置がエントリー価格より下になった場合)に、ストップ幅がマイナスの値になります。
そのため、損益ゼロのラインまでしかトレイリングしたくない場合には、ストップ幅が0未満かどうかを判定して、0未満であれば0を代入します。
# 設定項目 trail_until_breakeven = True # 損益0ラインまでしかトレイルしない場合 if trail_until_breakeven and flag["position"]["stop"] < 0: flag["position"]["stop"] = 0
これを応用すると、含み益の最大値の50%を守る位置にトレイリングストップを置くこともできます。この方法は後述します。
3)ストップ位置をエントリー方向にのみ動かす
一般論として、トレイリングストップには片方向(エントリーと同じ方向)にしか動かしてはいけないというルールがあります。逆向きに動かしていいのであれば、いつまでたっても損切りラインにかからず、損切ラインを動かす意味がないからです。
そのため、最初に前回のストップ幅を記録しておき、新しく計算したストップ幅と比較します。ストップ幅はどんどん小さくならないといけないので、前回の数値よりも大きければ、前回のストップ幅をそのまま採用します。
# 前回のストップ幅 last_stop = flag["position"]["stop"] # ストップがエントリー方向と逆に動いたら更新しない if flag["position"]["stop"] > last_stop: flag["position"]["stop"] = last_stop
損切りの関数
トレイリングストップの関数が完成したら、あとは以前の記事で作成した「損切りの関数」の先頭で、このトレイリングストップの関数を呼ぶだけです。
# 損切ラインにかかったら成行注文で決済する関数 def stop_position( data,flag ): # トレイリングストップを実行 flag = trail_stop( data,flag )
これで基本となるコードは完成です!
4.実行結果
これを実行してみると、以下のようにストップ位置が移動しているのがわかります。
含み益の50%を守るトレイリングストップ
上記のコードをカスタマイズすると、常に含み益の50%を守る位置にトレイリングストップを置くこともできます。
買いエントリーの場合は直近の高値、売りエントリーの場合は直近の安値をベースに moved_range を計算して、その半分(moved_range × 0.5)をトレイリングストップ幅に代入します。
# トレイリングストップの関数 def trail_stop( data,flag ): # 高値/安値 がエントリー価格からいくら離れたか計算 if flag["position"]["side"] == "BUY" and data["high_price"] > flag["position"]["price"]: moved_range = round(data["high_price"] - flag["position"]["price"]) elif flag["position"]["side"] == "SELL" and data["low_price"] < flag["position"]["price"]: moved_range = round(flag["position"]["price"] - data["low_price"]) else: moved_range = 0 # 動いたレンジ幅に合わせてストップ位置を更新する if moved_range > flag["position"]["ATR"]: number = int(np.floor(moved_range / flag["position"]["ATR"])) flag["position"]["stop"] = round(first_stop - ( number * flag["position"]["ATR"] * trail_ratio )) # 損益ゼロラインを超えたら、含み益の1/2を守る位置にストップを置く if flag["position"]["stop"] < 0: flag["position"]["stop"] = round( moved_range * 0.5 ) * -1 # ストップがエントリー方向と逆に動いたら更新しない if flag["position"]["stop"] > last_stop: flag["position"]["stop"] = last_stop # 以下略
エントリー価格を超えた位置にストップを置く場合は、ストップ幅がマイナスの値になりますので、-1を掛けるのを忘れないようにしましょう。
これを実行すると、買いエントリーの場合は高値を更新するたびに、売りエントリーの場合は安値を更新するたびに、トレイリングストップをピーク時の含み益の50%の位置に置くことができます。
5.検証してみよう!
ではトレイリングストップを採用した場合に、運用成績がどうなるか検証してみましょう。検証には前回の記事と同じ条件を使います。
売買ロジック
・1時間足を使用(2017/8~2018/5)
・上値ブレイクアウト判定期間 30期間
・下値ブレイクアウト判定期間 30期間
・平均ボラティリティ計算期間 30期間
・ブレイクアウトの判定基準(高値/安値)
資金管理
・初期資金30万円
・レバレッジ3倍
・初期ストップのレンジ幅 2ATR
・1トレードで許容するリスク 3%
・分割エントリー2回
1)トレイリングストップを使わない場合
----------------------------------- 総合の成績 ----------------------------------- 全トレード数 : 111回 勝率 : 34.2% 平均リターン : 1.26% 平均保有期間 : 45.4足分 損切りの回数 : 57回 最大の勝ちトレード : 650219円 最大の負けトレード : -109947円 最大連敗回数 : 10回 最大ドローダウン : -689533円 / -20.0% 利益合計 : 5796568円 損失合計 : -2452422円 最終損益 : 3344146円 初期資金 : 300000円 最終資金 : 3644146円 運用成績 : 1215.0%
2)価格が1レンジ動くたびにストップ位置を1レンジ動かした場合
平均ボラティリティ(ATR)を基準に、価格がエントリー方向に1レンジ動くたびにストップ位置も同じだけ動かし、損切りにかかるかドテンするまでトレイリングし続けた場合の成績です。
-設定値
trail_ratio = 1
trail_until_breakeven = False
----------------------------------- 総合の成績 ----------------------------------- 全トレード数 : 180回 勝率 : 36.7% 平均リターン : -0.25% 平均保有期間 : 18.9足分 損切りの回数 : 169回 最大の勝ちトレード : 109787円 最大の負けトレード : -20211円 最大連敗回数 : 9回 最大ドローダウン : -95210円 / -15.0% 利益合計 : 1233897円 損失合計 : -807878円 最終損益 : 426019円 初期資金 : 300000円 最終資金 : 726019円 運用成績 : 242.0%
3)含み益の50%を保護するようにストップを動かした場合
損益ゼロのラインに達するまでは、価格が1レンジ動くごとにストップを1レンジ動かし、損益ゼロラインに達した後は、ピーク時の含み益の半分を保護する位置に常にトレイリングストップを置いた場合の成績です。
----------------------------------- 総合の成績 ----------------------------------- 全トレード数 : 187回 勝率 : 36.4% 平均リターン : -0.36% 平均保有期間 : 18.1足分 損切りの回数 : 177回 最大の勝ちトレード : 140071円 最大の負けトレード : -20211円 最大連敗回数 : 9回 最大ドローダウン : -95108円 / -14.8% 利益合計 : 1185148円 損失合計 : -848540円 最終損益 : 336608円 初期資金 : 300000円 最終資金 : 636608円 運用成績 : 212.0%
4)価格が1レンジ動くたびにストップ位置を1/2レンジ動かした場合
価格が1レンジ動くたびに、ストップ位置は1/2ずつだけ同じ方向に動かした場合の成績です。
-設定値
trail_ratio = 0.5
trail_until_breakeven = False
----------------------------------- 総合の成績 ----------------------------------- 全トレード数 : 112回 勝率 : 34.8% 平均リターン : 1.52% 平均保有期間 : 41.4足分 損切りの回数 : 78回 最大の勝ちトレード : 722466円 最大の負けトレード : -116576円 最大連敗回数 : 10回 最大ドローダウン : -624599円 / -16.9% 利益合計 : 5924759円 損失合計 : -2146820円 最終損益 : 3777939円 初期資金 : 300000円 最終資金 : 4077939円 運用成績 : 1359.0%
5)損益±0のラインまでだけトレイリングした場合
価格が1レンジ動くたびにストップ位置は1/2ずつだけ同じ方向に動かし、損切りラインがエントリー価格と同じライン(損益分岐点)に達したラインでトレイリングを辞めた場合の成績です。
-設定値
trail_ratio = 0.5
trail_until_breakeven = True
----------------------------------- 総合の成績 ----------------------------------- 全トレード数 : 115回 勝率 : 30.4% 平均リターン : 1.26% 平均保有期間 : 42.2足分 損切りの回数 : 73回 最大の勝ちトレード : 636840円 最大の負けトレード : -102862円 最大連敗回数 : 10回 最大ドローダウン : -550789円 / -16.9% 利益合計 : 5266126円 損失合計 : -1970726円 最終損益 : 3295400円 初期資金 : 300000円 最終資金 : 3595400円 運用成績 : 1198.0%
結論
結果が良かった順に並べると以下のようになりました。
1)トレイリング率 1/2 で損切りラインを動かした場合
2)何もしなかった場合(トレイリングストップ無)
3)トレイリング率 1/2 で損益±0ラインまで動かした場合
4)価格と同じ比率で損切りラインを動かした場合
5)含み益の50%を保護するよう損切りラインを動かした場合
トレイリングの条件を厳しくしすぎると、パフォーマンスは悪化する結果となりました。やはり損切りのラインを厳しくすることで、本来の売買ロジックが持つ期待値や優位性(エッジ)が失われてしまうのかもしれません。
ただしトレイリング比率によっては、「何もしなかった場合」を上回る成績を残したパターンもありました。
練習問題
今回の記事では、価格の動きに従って一定の比率で損切りラインを動かすトレイリングストップの作り方を勉強しました。しかしこの方法は実際のところあまり実践的ではありません。
トレイリングの比率を上げすぎると序盤で損切りにかかってしまい、退出させられてしまう可能性が高くなりますし、比率を下げ過ぎると、今度は価格と損切りラインがどんどん乖離してしまい、「トレンドの含み益を逃がさない」という本来の役割を果たせなくなってしまうからです。
そこで以下の記事では、より実践的なアイデアとして、「長くポジションを保有すればするほど、どんどんトレイリング比率が加速していく」ようなトレイリングストップを紹介します。ぜひ定義を読んで自分でも実装してみてください。
次回記事:パラボリックSARを使って加速するトレイリングストップを作ろう!
5.今回勉強したコード
import requests from datetime import datetime import time import matplotlib.pyplot as plt import pandas as pd import numpy as np #--------設定項目-------- chart_sec = 3600 # 1時間足を使用 buy_term = 30 # 買いエントリーのブレイク期間の設定 sell_term = 30 # 売りエントリーのブレイク期間の設定 judge_price={ "BUY" : "high_price", # ブレイク判断 高値(high_price)か終値(close_price)を使用 "SELL": "low_price" # ブレイク判断 安値 (low_price)か終値(close_price)を使用 } volatility_term = 30 # 平均ボラティリティの計算に使う期間 stop_range = 2 # 何レンジ幅にストップを入れるか trade_risk = 0.03 # 1トレードあたり口座の何%まで損失を許容するか levarage = 3 # レバレッジ倍率の設定 start_funds = 300000 # シミュレーション時の初期資金 entry_times = 2 # 何回に分けて追加ポジションを取るか entry_range = 1 # 何レンジごとに追加ポジションを取るか trail_ratio = 0.5 # 価格が1レンジ動くたびに何レンジ損切り位置をトレイルするか trail_until_breakeven = True # 損益ゼロの位置までしかトレイルしない wait = 0 # ループの待機時間 slippage = 0.001 # 手数料・スリッページ #-------------補助ツールの関数-------------- # CryptowatchのAPIを使用する関数 def get_price(min, before=0, after=0): price = [] params = {"periods" : min } if before != 0: params["before"] = before if after != 0: params["after"] = after response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params) data = response.json() if data["result"][str(min)] is not None: for i in data["result"][str(min)]: if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0: price.append({ "close_time" : i[0], "close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'), "open_price" : i[1], "high_price" : i[2], "low_price" : i[3], "close_price": i[4] }) return price else: flag["records"]["log"].append("データが存在しません") return None # 時間と高値・安値をログに記録する関数 def log_price( data,flag ): log = "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 高値: " + str(data["high_price"]) + " 安値: " + str(data["low_price"]) + " 終値: " + str(data["close_price"]) + "\n" flag["records"]["log"].append(log) return flag # 平均ボラティリティを計算する関数 def calculate_volatility( last_data ): high_sum = sum(i["high_price"] for i in last_data[-1 * volatility_term :]) low_sum = sum(i["low_price"] for i in last_data[-1 * volatility_term :]) volatility = round((high_sum - low_sum) / volatility_term) flag["records"]["log"].append("現在の{0}期間の平均ボラティリティは{1}円です\n".format( volatility_term, volatility )) return volatility #-------------資金管理の関数-------------- # 注文ロットを計算する関数 def calculate_lot( last_data,data,flag ): # 口座残高を取得する balance = flag["records"]["funds"] # 最初のエントリーの場合 if flag["add-position"]["count"] == 0: # 1回の注文単位(ロット数)と、追加ポジの基準レンジを計算する volatility = calculate_volatility( last_data ) stop = stop_range * volatility calc_lot = np.floor( balance * trade_risk / stop * 100 ) / 100 flag["add-position"]["unit-size"] = np.floor( calc_lot / entry_times * 100 ) / 100 flag["add-position"]["unit-range"] = round( volatility * entry_range ) flag["add-position"]["stop"] = stop flag["position"]["ATR"] = round( volatility ) flag["records"]["log"].append("\n現在のアカウント残高は{}円です\n".format( balance )) flag["records"]["log"].append("許容リスクから購入できる枚数は最大{}BTCまでです\n".format( calc_lot )) flag["records"]["log"].append("{0}回に分けて{1}BTCずつ注文します\n".format( entry_times, flag["add-position"]["unit-size"] )) # 2回目以降のエントリーの場合 else: balance = round( balance - flag["position"]["price"] * flag["position"]["lot"] / levarage ) # ストップ幅には、最初のエントリー時に計算したボラティリティを使う stop = flag["add-position"]["stop"] # 実際に購入可能な枚数を計算する able_lot = np.floor( balance * levarage / data["close_price"] * 100 ) / 100 lot = min(able_lot, flag["add-position"]["unit-size"]) flag["records"]["log"].append("証拠金から購入できる枚数は最大{}BTCまでです\n".format( able_lot )) return lot,stop,flag # 複数回に分けて追加ポジションを取る関数 def add_position( data,flag ): # ポジションがない場合は何もしない if flag["position"]["exist"] == False: return flag # 最初(1回目)のエントリー価格を記録 if flag["add-position"]["count"] == 0: flag["add-position"]["first-entry-price"] = flag["position"]["price"] flag["add-position"]["last-entry-price"] = flag["position"]["price"] flag["add-position"]["count"] += 1 while True: # 以下の場合は、追加ポジションを取らない if flag["add-position"]["count"] >= entry_times: return flag # この関数の中で使う変数を用意 first_entry_price = flag["add-position"]["first-entry-price"] last_entry_price = flag["add-position"]["last-entry-price"] unit_range = flag["add-position"]["unit-range"] current_price = data["close_price"] # 価格がエントリー方向に基準レンジ分だけ進んだか判定する should_add_position = False if flag["position"]["side"] == "BUY" and (current_price - last_entry_price) > unit_range: should_add_position = True elif flag["position"]["side"] == "SELL" and (last_entry_price - current_price) > unit_range: should_add_position = True else: break # 基準レンジ分進んでいれば追加注文を出す if should_add_position == True: flag["records"]["log"].append("\n前回のエントリー価格{0}円からブレイクアウトの方向に{1}ATR({2}円)以上動きました\n".format( last_entry_price, entry_range, round( unit_range ) )) flag["records"]["log"].append("{0}/{1}回目の追加注文を出します\n".format(flag["add-position"]["count"] + 1, entry_times)) # 注文サイズを計算 lot,stop,flag = calculate_lot( last_data,data,flag ) if lot < 0.01: flag["records"]["log"].append("注文可能枚数{}が、最低注文単位に満たなかったので注文を見送ります\n".format(lot)) flag["add-position"]["count"] += 1 return flag # 追加注文を出す if flag["position"]["side"] == "BUY": entry_price = first_entry_price + (flag["add-position"]["count"] * unit_range) #entry_price = round((1 + slippage) * entry_price) flag["records"]["log"].append("現在のポジションに追加して、{0}円で{1}BTCの買い注文を出します\n".format(entry_price,lot)) # ここに買い注文のコードを入れる if flag["position"]["side"] == "SELL": entry_price = first_entry_price - (flag["add-position"]["count"] * unit_range) #entry_price = round((1 - slippage) * entry_price) flag["records"]["log"].append("現在のポジションに追加して、{0}円で{1}BTCの売り注文を出します\n".format(entry_price,lot)) # ここに売り注文のコードを入れる # ポジション全体の情報を更新する flag["position"]["stop"] = stop flag["position"]["price"] = int(round(( flag["position"]["price"] * flag["position"]["lot"] + entry_price * lot ) / ( flag["position"]["lot"] + lot ))) flag["position"]["lot"] = np.round( (flag["position"]["lot"] + lot) * 100 ) / 100 if flag["position"]["side"] == "BUY": flag["records"]["log"].append("{0}円の位置にストップを更新します\n".format(flag["position"]["price"] - stop)) elif flag["position"]["side"] == "SELL": flag["records"]["log"].append("{0}円の位置にストップを更新します\n".format(flag["position"]["price"] + stop)) flag["records"]["log"].append("現在のポジションの取得単価は{}円です\n".format(flag["position"]["price"])) flag["records"]["log"].append("現在のポジションサイズは{}BTCです\n\n".format(flag["position"]["lot"])) flag["add-position"]["count"] += 1 flag["add-position"]["last-entry-price"] = entry_price return flag # トレイリングストップの関数 def trail_stop( data,flag ): # まだ追加ポジションの取得中であれば何もしない if flag["add-position"]["count"] < entry_times: return flag last_stop = flag["position"]["stop"] # 前回のストップ幅 first_stop = flag["position"]["ATR"] * stop_range # 最初のストップ幅 # エントリー価格からいくら離れたか計算する if flag["position"]["side"] == "BUY" and data["close_price"] > flag["position"]["price"]: moved_range = round(data["close_price"] - flag["position"]["price"]) elif flag["position"]["side"] == "SELL" and data["close_price"] < flag["position"]["price"]: moved_range = round(flag["position"]["price"] - data["close_price"]) else: moved_range = 0 # 動いたレンジ幅に合わせてストップ位置を更新する if moved_range > flag["position"]["ATR"]: number = int(np.floor(moved_range / flag["position"]["ATR"])) flag["position"]["stop"] = round(first_stop - ( number * flag["position"]["ATR"] * trail_ratio )) # 損益0ラインまでしかトレイルしない場合 if trail_until_breakeven and flag["position"]["stop"] < 0: flag["position"]["stop"] = 0 # ストップがエントリー方向と逆に動いたら更新しない if flag["position"]["stop"] > last_stop: flag["position"]["stop"] = last_stop # ログ出力 if flag["position"]["stop"] != last_stop: if flag["position"]["side"] == "BUY": flag["records"]["log"].append("トレイリングストップの発動:ストップ位置を{}円に動かします\n".format( round(flag["position"]["price"] - flag["position"]["stop"]) )) else: flag["records"]["log"].append("トレイリングストップの発動:ストップ位置を{}円に動かします\n".format( round(flag["position"]["price"] + flag["position"]["stop"]) )) return flag #-------------売買ロジックの部分の関数-------------- # ドンチャンブレイクを判定する関数 def donchian( data,last_data ): highest = max(i["high_price"] for i in last_data[ (-1* buy_term): ]) if data[ judge_price["BUY"] ] > highest: return {"side":"BUY","price":highest} lowest = min(i["low_price"] for i in last_data[ (-1* sell_term): ]) if data[ judge_price["SELL"] ] < lowest: return {"side":"SELL","price":lowest} return {"side" : None , "price":0} # エントリー注文を出す関数 def entry_signal( data,last_data,flag ): signal = donchian( data,last_data ) if signal["side"] == "BUY": flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の価格が{2}円でブレイクしました\n".format(buy_term,signal["price"],data[judge_price["BUY"]])) lot,stop,flag = calculate_lot( last_data,data,flag ) if lot > 0.01: flag["records"]["log"].append("{0}円で{1}BTCの買い注文を出します\n".format(data["close_price"],lot)) # ここに買い注文のコードを入れる flag["records"]["log"].append("{0}円にストップを入れます\n".format(data["close_price"] - stop)) flag["position"]["lot"],flag["position"]["stop"] = lot,stop flag["position"]["exist"] = True flag["position"]["side"] = "BUY" flag["position"]["price"] = data["close_price"] else: flag["records"]["log"].append("注文可能枚数{}が、最低注文単位に満たなかったので注文を見送ります\n".format(lot)) if signal["side"] == "SELL": flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の価格が{2}円でブレイクしました\n".format(sell_term,signal["price"],data[judge_price["SELL"]])) lot,stop,flag = calculate_lot( last_data,data,flag ) if lot > 0.01: flag["records"]["log"].append("{0}円で{1}BTCの売り注文を出します\n".format(data["close_price"],lot)) # ここに売り注文のコードを入れる flag["records"]["log"].append("{0}円にストップを入れます\n".format(data["close_price"] + stop)) flag["position"]["lot"],flag["position"]["stop"] = lot,stop flag["position"]["exist"] = True flag["position"]["side"] = "SELL" flag["position"]["price"] = data["close_price"] else: flag["records"]["log"].append("注文可能枚数{}が、最低注文単位に満たなかったので注文を見送ります\n".format(lot)) return flag # 手仕舞いのシグナルが出たら決済の成行注文 + ドテン注文 を出す関数 def close_position( data,last_data,flag ): if flag["position"]["exist"] == False: return flag flag["position"]["count"] += 1 signal = donchian( data,last_data ) if flag["position"]["side"] == "BUY": if signal["side"] == "SELL": flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の価格が{2}円でブレイクしました\n".format(sell_term,signal["price"],data[judge_price["SELL"]])) flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n") # 決済の成行注文コードを入れる records( flag,data,data["close_price"] ) flag["position"]["exist"] = False flag["position"]["count"] = 0 flag["add-position"]["count"] = 0 lot,stop,flag = calculate_lot( last_data,data,flag ) if lot > 0.01: flag["records"]["log"].append("\n{0}円で{1}BTCの売りの注文を入れてドテンします\n".format(data["close_price"],lot)) # ここに売り注文のコードを入れる flag["records"]["log"].append("{0}円にストップを入れます\n".format(data["close_price"] + stop)) flag["position"]["lot"],flag["position"]["stop"] = lot,stop flag["position"]["exist"] = True flag["position"]["side"] = "SELL" flag["position"]["price"] = data["close_price"] if flag["position"]["side"] == "SELL": if signal["side"] == "BUY": flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の価格が{2}円でブレイクしました\n".format(buy_term,signal["price"],data[judge_price["BUY"]])) flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n") # 決済の成行注文コードを入れる records( flag,data,data["close_price"] ) flag["position"]["exist"] = False flag["position"]["count"] = 0 flag["add-position"]["count"] = 0 lot,stop,flag = calculate_lot( last_data,data,flag ) if lot > 0.01: flag["records"]["log"].append("\n{0}円で{1}BTCの買いの注文を入れてドテンします\n".format(data["close_price"],lot)) # ここに買い注文のコードを入れる flag["records"]["log"].append("{0}円にストップを入れます\n".format(data["close_price"] - stop)) flag["position"]["lot"],flag["position"]["stop"] = lot,stop flag["position"]["exist"] = True flag["position"]["side"] = "BUY" flag["position"]["price"] = data["close_price"] return flag # 損切ラインにかかったら成行注文で決済する関数 def stop_position( data,flag ): # トレイリングストップを実行 flag = trail_stop( data,flag ) if flag["position"]["side"] == "BUY": stop_price = flag["position"]["price"] - flag["position"]["stop"] if data["low_price"] < stop_price: flag["records"]["log"].append("{0}円の損切ラインに引っかかりました。\n".format( stop_price )) stop_price = round( stop_price - 2 * calculate_volatility(last_data) / ( chart_sec / 60) ) flag["records"]["log"].append(str(stop_price) + "円あたりで成行注文を出してポジションを決済します\n") # 決済の成行注文コードを入れる records( flag,data,stop_price,"STOP" ) flag["position"]["exist"] = False flag["position"]["count"] = 0 flag["add-position"]["count"] = 0 if flag["position"]["side"] == "SELL": stop_price = flag["position"]["price"] + flag["position"]["stop"] if data["high_price"] > stop_price: flag["records"]["log"].append("{0}円の損切ラインに引っかかりました。\n".format( stop_price )) stop_price = round( stop_price + 2 * calculate_volatility(last_data) / (chart_sec / 60) ) flag["records"]["log"].append(str(stop_price) + "円あたりで成行注文を出してポジションを決済します\n") # 決済の成行注文コードを入れる records( flag,data,stop_price,"STOP" ) flag["position"]["exist"] = False flag["position"]["count"] = 0 flag["add-position"]["count"] = 0 return flag #------------バックテストの部分の関数-------------- # 各トレードのパフォーマンスを記録する関数 def records(flag,data,close_price,close_type=None): # 取引手数料等の計算 entry_price = int(round(flag["position"]["price"] * flag["position"]["lot"])) exit_price = int(round(close_price * flag["position"]["lot"])) trade_cost = round( exit_price * slippage ) log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n" flag["records"]["log"].append(log) flag["records"]["slippage"].append(trade_cost) # 手仕舞った日時と保有期間を記録 flag["records"]["date"].append(data["close_time_dt"]) flag["records"]["holding-periods"].append( flag["position"]["count"] ) # 損切りにかかった回数をカウント if close_type == "STOP": flag["records"]["stop-count"].append(1) else: flag["records"]["stop-count"].append(0) # 値幅の計算 buy_profit = exit_price - entry_price - trade_cost sell_profit = entry_price - exit_price - trade_cost # 利益が出てるかの計算 if flag["position"]["side"] == "BUY": flag["records"]["side"].append( "BUY" ) flag["records"]["profit"].append( buy_profit ) flag["records"]["return"].append( round( buy_profit / entry_price * 100, 4 )) flag["records"]["funds"] = flag["records"]["funds"] + buy_profit if buy_profit > 0: log = str(buy_profit) + "円の利益です\n\n" flag["records"]["log"].append(log) else: log = str(buy_profit) + "円の損失です\n\n" flag["records"]["log"].append(log) if flag["position"]["side"] == "SELL": flag["records"]["side"].append( "SELL" ) flag["records"]["profit"].append( sell_profit ) flag["records"]["return"].append( round( sell_profit / entry_price * 100, 4 )) flag["records"]["funds"] = flag["records"]["funds"] + sell_profit if sell_profit > 0: log = str(sell_profit) + "円の利益です\n\n" flag["records"]["log"].append(log) else: log = str(sell_profit) + "円の損失です\n\n" flag["records"]["log"].append(log) return flag # バックテストの集計用の関数 def backtest(flag): # 成績を記録したpandas DataFrameを作成 records = pd.DataFrame({ "Date" : pd.to_datetime(flag["records"]["date"]), "Profit" : flag["records"]["profit"], "Side" : flag["records"]["side"], "Rate" : flag["records"]["return"], "Stop" : flag["records"]["stop-count"], "Periods" : flag["records"]["holding-periods"], "Slippage" : flag["records"]["slippage"] }) # 連敗回数をカウントする consecutive_defeats = [] defeats = 0 for p in flag["records"]["profit"]: if p < 0: defeats += 1 else: consecutive_defeats.append( defeats ) defeats = 0 # 総損益の列を追加する records["Gross"] = records.Profit.cumsum() # 資産推移の列を追加する records["Funds"] = records.Gross + start_funds # 最大ドローダウンの列を追加する records["Drawdown"] = records.Funds.cummax().subtract(records.Funds) records["DrawdownRate"] = round(records.Drawdown / records.Funds.cummax() * 100,1) # 買いエントリーと売りエントリーだけをそれぞれ抽出する buy_records = records[records.Side.isin(["BUY"])] sell_records = records[records.Side.isin(["SELL"])] # 月別のデータを集計する records["月別集計"] = pd.to_datetime( records.Date.apply(lambda x: x.strftime('%Y/%m'))) grouped = records.groupby("月別集計") month_records = pd.DataFrame({ "Number" : grouped.Profit.count(), "Gross" : grouped.Profit.sum(), "Funds" : grouped.Funds.last(), "Rate" : round(grouped.Rate.mean(),2), "Drawdown" : grouped.Drawdown.max(), "Periods" : grouped.Periods.mean() }) print("バックテストの結果") print("-----------------------------------") print("買いエントリの成績") print("-----------------------------------") print("トレード回数 : {}回".format( len(buy_records) )) print("勝率 : {}%".format(round(len(buy_records[buy_records.Profit>0]) / len(buy_records) * 100,1))) print("平均リターン : {}%".format(round(buy_records.Rate.mean(),2))) print("総損益 : {}円".format( buy_records.Profit.sum() )) print("平均保有期間 : {}足分".format( round(buy_records.Periods.mean(),1) )) print("損切りの回数 : {}回".format( buy_records.Stop.sum() )) print("-----------------------------------") print("売りエントリの成績") print("-----------------------------------") print("トレード回数 : {}回".format( len(sell_records) )) print("勝率 : {}%".format(round(len(sell_records[sell_records.Profit>0]) / len(sell_records) * 100,1))) print("平均リターン : {}%".format(round(sell_records.Rate.mean(),2))) print("総損益 : {}円".format( sell_records.Profit.sum() )) print("平均保有期間 : {}足分".format( round(sell_records.Periods.mean(),1) )) print("損切りの回数 : {}回".format( sell_records.Stop.sum() )) print("-----------------------------------") print("総合の成績") print("-----------------------------------") print("全トレード数 : {}回".format(len(records) )) print("勝率 : {}%".format(round(len(records[records.Profit>0]) / len(records) * 100,1))) print("平均リターン : {}%".format(round(records.Rate.mean(),2))) print("平均保有期間 : {}足分".format( round(records.Periods.mean(),1) )) print("損切りの回数 : {}回".format( records.Stop.sum() )) print("") print("最大の勝ちトレード : {}円".format(records.Profit.max())) print("最大の負けトレード : {}円".format(records.Profit.min())) print("最大連敗回数 : {}回".format( max(consecutive_defeats) )) print("最大ドローダウン : {0}円 / {1}%".format(-1 * records.Drawdown.max(), -1 * records.DrawdownRate.loc[records.Drawdown.idxmax()] )) print("利益合計 : {}円".format( records[records.Profit>0].Profit.sum() )) print("損失合計 : {}円".format( records[records.Profit<0].Profit.sum() )) print("最終損益 : {}円".format( records.Profit.sum() )) print("") print("初期資金 : {}円".format( start_funds )) print("最終資金 : {}円".format( records.Funds.iloc[-1] )) print("運用成績 : {}%".format( round(records.Funds.iloc[-1] / start_funds * 100),2 )) print("手数料合計 : {}円".format( -1 * records.Slippage.sum() )) # ログファイルの出力 file = open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8') file.writelines(flag["records"]["log"]) # 損益曲線をプロット plt.plot( records.Date, records.Funds ) plt.xlabel("Date") plt.ylabel("Balance") plt.xticks(rotation=50) # X軸の目盛りを50度回転 plt.show() #------------ここからメイン処理-------------- # 価格チャートを取得 price = get_price(chart_sec,after=1451606400) flag = { "position":{ "exist" : False, "side" : "", "price": 0, "stop":0, "ATR":0, "lot":0, "count":0 }, "add-position":{ "count":0, "first-entry-price":0, "last-entry-price":0, "unit-range":0, "unit-size":0, "stop":0 }, "records":{ "date":[], "profit":[], "return":[], "side":[], "stop-count":[], "funds" : start_funds, "holding-periods":[], "slippage":[], "log":[] } } # トレイリングの比率に0~1以上の数値を設定できないようにする if trail_ratio > 1: trail_ratio = 1 elif trail_ratio < 0: trail_ratio = 0 last_data = [] need_term = max(buy_term,sell_term,volatility_term) i = 0 while i < len(price): # ドンチャンの判定に使う期間分の安値・高値データを準備する if len(last_data) < need_term: last_data.append(price[i]) flag = log_price(price[i],flag) time.sleep(wait) i += 1 continue data = price[i] flag = log_price(data,flag) # ポジションがある場合 if flag["position"]["exist"]: flag = stop_position( data,flag ) flag = close_position( data,last_data,flag ) flag = add_position( data,flag ) # ポジションがない場合 else: flag = entry_signal( data,last_data,flag ) last_data.append( data ) i += 1 time.sleep(wait) print("--------------------------") print("テスト期間:") print("開始時点 : " + str(price[0]["close_time_dt"])) print("終了時点 : " + str(price[-1]["close_time_dt"])) print(str(len(price)) + "件のローソク足データで検証") print("--------------------------") backtest(flag)