前回の記事では、含み益を取り逃がさないように、価格の上昇にあわせて一定の比率でストップ位置を移動させていくトレイリングストップの実装方法を解説しました。
1.固定比率のトレイリングストップの問題
しかし固定の比率で損切りラインを動かすトレイリングストップは、実際の運用上は少し問題があります。
例えば、価格が1レンジ動くたびに1レンジ追従する同じ比率のトレイリングストップだと、かなり序盤で損切りにかかってしまう可能性が高いです。それだと、チャネル・ブレイクアウトが持つ本来の期待値を「邪魔」してしまうことになります。
一方で、価格が1レンジ動くたびに1/2レンジずつ追従する低い比率のトレイリングストップだと、時間が経つにつれてどんどん価格と損切りラインが乖離してしまい、損切りにかかる可能性が低くなってしまいます。これだと「大きな含み益を逃がさない」という本来の目的を達成できません。
2.加速係数付きのトレイリングストップ
そこで理想的なトレイリングストップとして、時間の経過とともにトレイリングの比率がだんだん上昇していくようなトレイリングストップを考えます。
例えば、最初は価格が動いてもストップ位置を2%しか動かさず、次に価格が動いたらストップ位置を4%動かし、次に価格が動いたらストップ位置を6%動かし….、といった具合に、徐々にトレイリングの比率を加速させていきます。
この手法のなかでも有名なのは、「投資苑」の著者としてお馴染みのアレキサンダー・エルダー氏が「利食いと損切りのテクニック」という本で紹介しているパラボリックSARを使ったトレイリングストップです。
3.パラボリックSARの定義
まずはパラボリックSARを使ったトレイリングストップを明確に定義しておきましょう。
買いエントリーの場合
1)最初の加速係数を0.02とする
2)エントリー後に新しい高値を更新したら、( 高値 - 損切りライン )× 加速係数 を計算し、その額分だけ損切りラインを上に動かす
3)損切りラインが動くたびに、加速係数に 0.02 を足す
4)加速係数の上限は 0.2 までとする
売りエントリーの場合
1)最初の加速係数を0.02とする
2)エントリー後に新しい安値を更新したら、( 損切りライン - 安値 )× 加速係数 を計算し、その額分だけ損切りラインを下に動かす
3)損切りラインが動くたびに、加速係数に 0.02 を足す
4)加速係数の上限は 0.2 までとする
例題
例えば、BTC価格が100万円のときに買いエントリーし、損切りラインを98万円の位置に置いたとします。その1時間後、BTC価格の高値が102万円まで上昇した場合のストップ位置は以下です。
・加速係数 0.02
・98万円 + (102万円 - 98万円) × 0.02 = 98万800円
もし次の1時間で高値が101万円まで下落したとします。この場合は何もしません。前回の記事でも説明したように、原則としてトレイリングストップを下に動かすことはありえません。
さらに次の1時間で高値が103万円まで上昇したとします。この場合のストップ位置は以下です。
・加速係数 0.04
・98万800円 + (103万円 - 98万800円) × 0.04 = 98万2768円
さらに次の1時間で高値が105万円まで上昇したとします。この場合のストップ位置は以下です。
・加速係数 0.06
・98万2768円 + (105万円 - 98万2768円) × 0.06 = 98万6802円
4.Pythonコードを実装しよう!
では、前回の記事で作成したトレイリングストップの関数を修正して、上記のロジックを実装してみましょう!
なぜかパラボリックSARでは、加速係数の初期値を 0.02、足すのも0.02ずつ、上限は0.2まで、というのが、教科書的な常識とされています。しかし理由がわからないまま、自分で検証していない数値を信じるべきではないので、ここでは自由に変更できるようにしておきます。
▽ pythonコード
#--------設定項目-------- stop_AF = 0.02 # 加速係数 stop_AF_add = 0.02 # 加速係数を増やす度合 stop_AF_max = 0.2 # 加速係数の上限 # トレイリングストップの関数 def trail_stop( data,flag ): # まだ追加ポジション(増し玉)の取得中であれば何もしない if flag["add-position"]["count"] < entry_times: return flag # 高値/安値がエントリー価格からいくら離れたか計算 if flag["position"]["side"] == "BUY": moved_range = round( data["high_price"] - flag["position"]["price"] ) if flag["position"]["side"] == "SELL": moved_range = round( flag["position"]["price"] - data["low_price"] ) # 最高値・最安値を更新したか調べる if moved_range < 0 or flag["position"]["stop-EP"] >= moved_range: return flag else: flag["position"]["stop-EP"] = moved_range # 加速係数に応じて損切りラインを動かす flag["position"]["stop"] = round(flag["position"]["stop"] - ( moved_range + flag["position"]["stop"] ) * flag["position"]["stop-AF"]) # 加速係数を更新する flag["position"]["stop-AF"] = round( flag["position"]["stop-AF"] + stop_AF_add ,2 ) if flag["position"]["stop-AF"] >= stop_AF_max: flag["position"]["stop-AF"] = stop_AF_max # ログを出力する if flag["position"]["side"] == "BUY": flag["records"]["log"].append("トレイリングストップの発動:ストップ位置を{}円に動かして、加速係数を{}に更新します\n".format( round(flag["position"]["price"] - flag["position"]["stop"]) , flag["position"]["stop-AF"] )) else: flag["records"]["log"].append("トレイリングストップの発動:ストップ位置を{}円に動かして、加速係数を{}に更新します\n".format( round(flag["position"]["price"] + flag["position"]["stop"]) , flag["position"]["stop-AF"] )) return flag #--------記録用の変数-------- flag = { "position":{ "stop-AF": stop_AF, # 加速係数を記録 "stop-EP":0, # 最高値/最安値 と エントリー価格の値幅を記録 (略)
stop-AF は加速係数です。
加速係数は英語でAccelerationFactorなのでAFと略します。
stop-EP は直近の最高値・最安値とエントリー価格からの値幅です。
最高値や最安値のことをExtremePoint(EP)と略すことがあります。
コードの解説
パラボリックSARによるトレイリングストップでは、損切りラインを動かすタイミングは、買いエントリーなら最高値を更新したとき、売りエントリーなら最安値を更新したときだけです。
そのため、エントリーして以来の最高値・最安値とエントリー価格との値幅を flag[position][stop-EP] に記録しておきます。そして、現在のローソク足の高値(安値)とエントリー価格の値幅を計算して比較し、最高値を更新したかどうかを判断しています。
ストップ幅の計算と更新
前回と同様、損切りラインの計算や記録には、価格の絶対値ではなくエントリー価格からの相対的な値幅(ストップ幅)を利用しています。この方が計算が簡単だからです。
# 加速係数に応じて損切りラインを動かす flag["position"]["stop"] = round(flag["position"]["stop"] - ( moved_range + flag["position"]["stop"] ) * flag["position"]["stop-AF"])
上記の計算式で何を計算しているのかは、以下の図を見るとイメージしやすいと思います。
パラボリックSARによるストップ価格の計算は、「前回のストップ価格 + 加速係数 × (高値or安値 - 前回のストップ価格)」で計算することは、すでに定義のところで説明しましたが、これは以下を計算するのと同じです。
・ストップ幅 = 前回のストップ幅 - (エントリー価格からの動きの幅 + 前回のストップ幅) × 加速係数
この式だと、売りエントリーか買いエントリーかで場合分けしなくていいので楽ですが、混乱するようであれば、価格ベースで計算するプログラムに書き直してみてください。
手仕舞いの関数
今回は新たに損切りに使う変数を2つ追加しました。ポジションを閉じるたびにこれらの変数を初期化するのを忘れないようにしましょう。
# 手仕舞いや損切の関数 def close_position( data,last_data,flag ): # 決済の成行注文コード flag["position"]["stop-AF"] = stop_AF flag["position"]["stop-EP"] = 0
5.実行結果
このプログラムを前回のコードに入れて実行してみましょう!
以下のようになります。
なお、このような計算を含むコードは思った通りの計算結果になっているかどうか、必ず自分で確認した方がいいです。私の計算やプログラムが間違っている可能性もあります。自分で検証せずに鵜呑みにしていい情報はありません。
▽ 何カ所か検算してみて一致するか確認する
検算の結果が問題なさそうであれば、いつものようにパフォーマンスを検証してみましょう!
6.パフォーマンスの検証
検証には前回と同じ条件を使います。
売買ロジック
・1時間足を使用(2017/8~2018/5)
・上値ブレイクアウト判定期間 30期間
・下値ブレイクアウト判定期間 30期間
・平均ボラティリティ計算期間 30期間
・ブレイクアウトの判定基準(高値/安値)
資金管理
・初期資金30万円
・レバレッジ3倍
・初期ストップのレンジ幅 2ATR
・1トレードで許容するリスク 3%
・分割エントリー2回
まずはトレイリングストップを使用しない場合を見ておきましょう。
損切りの関数から、トレイリングストップの関数を読み込む部分をコメントアウトするだけですね。
1)トレイリングストップを使わない場合
----------------------------------- 総合の成績 ----------------------------------- 全トレード数 : 111回 勝率 : 34.2% 平均リターン : 1.26% 平均保有期間 : 45.4足分 損切りの回数 : 57回 最大の勝ちトレード : 650219円 最大の負けトレード : -109947円 最大連敗回数 : 10回 最大ドローダウン : -689533円 / -20.0% 利益合計 : 5796568円 損失合計 : -2452422円 最終損益 : 3344146円 初期資金 : 300000円 最終資金 : 3644146円 運用成績 : 1215.0%
運用成績1215%は前回と同じ結果ですね。では次に、パラボリックSARを用いた加速度付きのトレイリングストップを見てみましょう。
2)パラボリックSARのストップを使った場合
-設定値
・加速係数の初期値(stop_AF) 0.02
・加速係数の増分(stop_AF_add) 0.02
・加速係数の上限値(stop_AF_max) 0.2
----------------------------------- 総合の成績 ----------------------------------- 全トレード数 : 123回 勝率 : 38.2% 平均リターン : 1.37% 平均保有期間 : 35.0足分 損切りの回数 : 102回 最大の勝ちトレード : 1133908円 最大の負けトレード : -144024円 最大連敗回数 : 10回 最大ドローダウン : -878122円 / -18.3% 利益合計 : 8510061円 損失合計 : -3218870円 最終損益 : 5291191円 初期資金 : 300000円 最終資金 : 5591191円 運用成績 : 1864.0%
運用成績がかなり大幅に向上しましたね。特にパラボリックSARを用いたトレイリングストップでは、「最大勝ちトレード」が大きく伸びているのがわかります。損切りの回数は40回ほど増えていました。
では実際に両者の「最大勝ちトレード」のログを比較してみましょう。
▽ トレイリングストップ無しのログ
・2018/4/19 06:00
30期間上値ブレイクアウトで買いエントリー(建値 908167円)
・2018/4/25 20:00
30期間下値ブレイクアウトで決済(決済 1043000円)
▽ パラボリックSARのトレイリングストップ
・2018/4/19 06:00
30期間上値ブレイクアウトで買いエントリー(建値 908167円)
・2018/4/25 13:00
トレイリングストップにかかって損切り(決済 1133908円)
同じ場面でエントリーしても、パラボリックSARを用いたトレイリングストップの方が、一歩早く有利な価格で退出しているのがわかります。
他の係数の検証
ではもう少しだけトレイリング率の加速度を上げてみた場合はどうなるでしょうか? 2つほど追加で検証してみましょう。
加速度と上限を1.5倍にした場合
-設定値
・加速係数の初期値(stop_AF) 0.03
・加速係数の増分(stop_AF_add) 0.03
・加速係数の上限値(stop_AF_max) 0.3
----------------------------------- 総合の成績 ----------------------------------- 全トレード数 : 134回 勝率 : 40.3% 平均リターン : 1.43% 平均保有期間 : 30.3足分 損切りの回数 : 117回 最大の勝ちトレード : 1692475円 最大の負けトレード : -188440円 最大連敗回数 : 10回 最大ドローダウン : -1148317円 / -18.0% 利益合計 : 11095214円 損失合計 : -4075694円 最終損益 : 7019520円 初期資金 : 300000円 最終資金 : 7319520円 運用成績 : 2440.0%
こちらの方がパフォーマンスは絶大に見えます。運用成績は2440%でトレイリングストップを用いなかった場合の2倍以上の成績となりました。平均保有期間も、元々の2/3程度にまで縮んでいます。
加速度と上限を2倍にした場合
-設定値
・加速係数の初期値(stop_AF) 0.04
・加速係数の増分(stop_AF_add) 0.04
・加速係数の上限値(stop_AF_max) 0.4
----------------------------------- 総合の成績 ----------------------------------- 全トレード数 : 154回 勝率 : 35.1% 平均リターン : 0.44% 平均保有期間 : 24.3足分 損切りの回数 : 143回 最大の勝ちトレード : 212680円 最大の負けトレード : -62663円 最大連敗回数 : 10回 最大ドローダウン : -397158円 / -18.6% 利益合計 : 3998226円 損失合計 : -2124429円 最終損益 : 1873797円 初期資金 : 300000円 最終資金 : 2173797円 運用成績 : 725.0%
こちらはやり過ぎてしまったようですね。さきほどの加速係数に比べると、運用成績が極端に悪化しています。最大勝ちトレードが21万円しかなく、手仕舞いが早すぎた可能性が高いです。
5.今回勉強したコード
import requests from datetime import datetime import time import matplotlib.pyplot as plt import pandas as pd import numpy as np #--------設定項目-------- chart_sec = 3600 # 1時間足を使用 buy_term = 30 # 買いエントリーのブレイク期間の設定 sell_term = 30 # 売りエントリーのブレイク期間の設定 judge_price={ "BUY" : "high_price", # ブレイク判断 高値(high_price)か終値(close_price)を使用 "SELL": "low_price" # ブレイク判断 安値 (low_price)か終値(close_price)を使用 } volatility_term = 30 # 平均ボラティリティの計算に使う期間 stop_range = 2 # 何レンジ幅にストップを入れるか trade_risk = 0.03 # 1トレードあたり口座の何%まで損失を許容するか levarage = 3 # レバレッジ倍率の設定 start_funds = 300000 # シミュレーション時の初期資金 entry_times = 2 # 何回に分けて追加ポジションを取るか entry_range = 1 # 何レンジごとに追加ポジションを取るか stop_AF = 0.02 # 加速係数 stop_AF_add = 0.02 # 加速係数を増やす度合 stop_AF_max = 0.2 # 加速係数の上限 wait = 0 # ループの待機時間 slippage = 0.001 # 手数料・スリッページ #-------------補助ツールの関数-------------- # CryptowatchのAPIを使用する関数 def get_price(min, before=0, after=0): price = [] params = {"periods" : min } if before != 0: params["before"] = before if after != 0: params["after"] = after response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params) data = response.json() if data["result"][str(min)] is not None: for i in data["result"][str(min)]: if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0: price.append({ "close_time" : i[0], "close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'), "open_price" : i[1], "high_price" : i[2], "low_price" : i[3], "close_price": i[4] }) return price else: flag["records"]["log"].append("データが存在しません") return None # 時間と高値・安値をログに記録する関数 def log_price( data,flag ): log = "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 高値: " + str(data["high_price"]) + " 安値: " + str(data["low_price"]) + " 終値: " + str(data["close_price"]) + "\n" flag["records"]["log"].append(log) return flag # 平均ボラティリティを計算する関数 def calculate_volatility( last_data ): high_sum = sum(i["high_price"] for i in last_data[-1 * volatility_term :]) low_sum = sum(i["low_price"] for i in last_data[-1 * volatility_term :]) volatility = round((high_sum - low_sum) / volatility_term) flag["records"]["log"].append("現在の{0}期間の平均ボラティリティは{1}円です\n".format( volatility_term, volatility )) return volatility #-------------資金管理の関数-------------- # 注文ロットを計算する関数 def calculate_lot( last_data,data,flag ): # 口座残高を取得する balance = flag["records"]["funds"] # 最初のエントリーの場合 if flag["add-position"]["count"] == 0: # 1回の注文単位(ロット数)と、追加ポジの基準レンジを計算する volatility = calculate_volatility( last_data ) stop = stop_range * volatility calc_lot = np.floor( balance * trade_risk / stop * 100 ) / 100 flag["add-position"]["unit-size"] = np.floor( calc_lot / entry_times * 100 ) / 100 flag["add-position"]["unit-range"] = round( volatility * entry_range ) flag["add-position"]["stop"] = stop flag["position"]["ATR"] = round( volatility ) flag["records"]["log"].append("\n現在のアカウント残高は{}円です\n".format( balance )) flag["records"]["log"].append("許容リスクから購入できる枚数は最大{}BTCまでです\n".format( calc_lot )) flag["records"]["log"].append("{0}回に分けて{1}BTCずつ注文します\n".format( entry_times, flag["add-position"]["unit-size"] )) # 2回目以降のエントリーの場合 else: balance = round( balance - flag["position"]["price"] * flag["position"]["lot"] / levarage ) # ストップ幅には、最初のエントリー時に計算したボラティリティを使う stop = flag["add-position"]["stop"] # 実際に購入可能な枚数を計算する able_lot = np.floor( balance * levarage / data["close_price"] * 100 ) / 100 lot = min(able_lot, flag["add-position"]["unit-size"]) flag["records"]["log"].append("証拠金から購入できる枚数は最大{}BTCまでです\n".format( able_lot )) return lot,stop,flag # 複数回に分けて追加ポジションを取る関数 def add_position( data,flag ): # ポジションがない場合は何もしない if flag["position"]["exist"] == False: return flag # 最初(1回目)のエントリー価格を記録 if flag["add-position"]["count"] == 0: flag["add-position"]["first-entry-price"] = flag["position"]["price"] flag["add-position"]["last-entry-price"] = flag["position"]["price"] flag["add-position"]["count"] += 1 while True: # 以下の場合は、追加ポジションを取らない if flag["add-position"]["count"] >= entry_times: return flag # この関数の中で使う変数を用意 first_entry_price = flag["add-position"]["first-entry-price"] last_entry_price = flag["add-position"]["last-entry-price"] unit_range = flag["add-position"]["unit-range"] current_price = data["close_price"] # 価格がエントリー方向に基準レンジ分だけ進んだか判定する should_add_position = False if flag["position"]["side"] == "BUY" and (current_price - last_entry_price) > unit_range: should_add_position = True elif flag["position"]["side"] == "SELL" and (last_entry_price - current_price) > unit_range: should_add_position = True else: break # 基準レンジ分進んでいれば追加注文を出す if should_add_position == True: flag["records"]["log"].append("\n前回のエントリー価格{0}円からブレイクアウトの方向に{1}ATR({2}円)以上動きました\n".format( last_entry_price, entry_range, round( unit_range ) )) flag["records"]["log"].append("{0}/{1}回目の追加注文を出します\n".format(flag["add-position"]["count"] + 1, entry_times)) # 注文サイズを計算 lot,stop,flag = calculate_lot( last_data,data,flag ) if lot < 0.01: flag["records"]["log"].append("注文可能枚数{}が、最低注文単位に満たなかったので注文を見送ります\n".format(lot)) flag["add-position"]["count"] += 1 return flag # 追加注文を出す if flag["position"]["side"] == "BUY": entry_price = first_entry_price + (flag["add-position"]["count"] * unit_range) #entry_price = round((1 + slippage) * entry_price) flag["records"]["log"].append("現在のポジションに追加して、{0}円で{1}BTCの買い注文を出します\n".format(entry_price,lot)) # ここに買い注文のコードを入れる if flag["position"]["side"] == "SELL": entry_price = first_entry_price - (flag["add-position"]["count"] * unit_range) #entry_price = round((1 - slippage) * entry_price) flag["records"]["log"].append("現在のポジションに追加して、{0}円で{1}BTCの売り注文を出します\n".format(entry_price,lot)) # ここに売り注文のコードを入れる # ポジション全体の情報を更新する flag["position"]["stop"] = stop flag["position"]["price"] = int(round(( flag["position"]["price"] * flag["position"]["lot"] + entry_price * lot ) / ( flag["position"]["lot"] + lot ))) flag["position"]["lot"] = np.round( (flag["position"]["lot"] + lot) * 100 ) / 100 if flag["position"]["side"] == "BUY": flag["records"]["log"].append("{0}円の位置にストップを更新します\n".format(flag["position"]["price"] - stop)) elif flag["position"]["side"] == "SELL": flag["records"]["log"].append("{0}円の位置にストップを更新します\n".format(flag["position"]["price"] + stop)) flag["records"]["log"].append("現在のポジションの取得単価は{}円です\n".format(flag["position"]["price"])) flag["records"]["log"].append("現在のポジションサイズは{}BTCです\n\n".format(flag["position"]["lot"])) flag["add-position"]["count"] += 1 flag["add-position"]["last-entry-price"] = entry_price return flag # トレイリングストップの関数 def trail_stop( data,flag ): # まだ追加ポジションの取得中であれば何もしない if flag["add-position"]["count"] < entry_times: return flag # 高値/安値がエントリー価格からいくら離れたか計算 if flag["position"]["side"] == "BUY": moved_range = round( data["high_price"] - flag["position"]["price"] ) if flag["position"]["side"] == "SELL": moved_range = round( flag["position"]["price"] - data["low_price"] ) # 最高値・最安値を更新したか調べる if moved_range < 0 or flag["position"]["stop-EP"] >= moved_range: return flag else: flag["position"]["stop-EP"] = moved_range # 加速係数に応じて損切りラインを動かす flag["position"]["stop"] = round(flag["position"]["stop"] - ( moved_range + flag["position"]["stop"] ) * flag["position"]["stop-AF"]) # 加速係数を更新 flag["position"]["stop-AF"] = round( flag["position"]["stop-AF"] + stop_AF_add ,2 ) if flag["position"]["stop-AF"] >= stop_AF_max: flag["position"]["stop-AF"] = stop_AF_max # ログ出力 if flag["position"]["side"] == "BUY": flag["records"]["log"].append("トレイリングストップの発動:ストップ位置を{}円に動かして、加速係数を{}に更新します\n".format( round(flag["position"]["price"] - flag["position"]["stop"]) , flag["position"]["stop-AF"] )) else: flag["records"]["log"].append("トレイリングストップの発動:ストップ位置を{}円に動かして、加速係数を{}に更新します\n".format( round(flag["position"]["price"] + flag["position"]["stop"]) , flag["position"]["stop-AF"] )) return flag #-------------売買ロジックの部分の関数-------------- # ドンチャンブレイクを判定する関数 def donchian( data,last_data ): highest = max(i["high_price"] for i in last_data[ (-1* buy_term): ]) if data[ judge_price["BUY"] ] > highest: return {"side":"BUY","price":highest} lowest = min(i["low_price"] for i in last_data[ (-1* sell_term): ]) if data[ judge_price["SELL"] ] < lowest: return {"side":"SELL","price":lowest} return {"side" : None , "price":0} # エントリー注文を出す関数 def entry_signal( data,last_data,flag ): signal = donchian( data,last_data ) if signal["side"] == "BUY": flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の価格が{2}円でブレイクしました\n".format(buy_term,signal["price"],data[judge_price["BUY"]])) lot,stop,flag = calculate_lot( last_data,data,flag ) if lot > 0.01: flag["records"]["log"].append("{0}円で{1}BTCの買い注文を出します\n".format(data["close_price"],lot)) # ここに買い注文のコードを入れる flag["records"]["log"].append("{0}円にストップを入れます\n".format(data["close_price"] - stop)) flag["position"]["lot"],flag["position"]["stop"] = lot,stop flag["position"]["exist"] = True flag["position"]["side"] = "BUY" flag["position"]["price"] = data["close_price"] else: flag["records"]["log"].append("注文可能枚数{}が、最低注文単位に満たなかったので注文を見送ります\n".format(lot)) if signal["side"] == "SELL": flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の価格が{2}円でブレイクしました\n".format(sell_term,signal["price"],data[judge_price["SELL"]])) lot,stop,flag = calculate_lot( last_data,data,flag ) if lot > 0.01: flag["records"]["log"].append("{0}円で{1}BTCの売り注文を出します\n".format(data["close_price"],lot)) # ここに売り注文のコードを入れる flag["records"]["log"].append("{0}円にストップを入れます\n".format(data["close_price"] + stop)) flag["position"]["lot"],flag["position"]["stop"] = lot,stop flag["position"]["exist"] = True flag["position"]["side"] = "SELL" flag["position"]["price"] = data["close_price"] else: flag["records"]["log"].append("注文可能枚数{}が、最低注文単位に満たなかったので注文を見送ります\n".format(lot)) return flag # 手仕舞いのシグナルが出たら決済の成行注文 + ドテン注文 を出す関数 def close_position( data,last_data,flag ): if flag["position"]["exist"] == False: return flag flag["position"]["count"] += 1 signal = donchian( data,last_data ) if flag["position"]["side"] == "BUY": if signal["side"] == "SELL": flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の価格が{2}円でブレイクしました\n".format(sell_term,signal["price"],data[judge_price["SELL"]])) flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n") # 決済の成行注文コードを入れる records( flag,data,data["close_price"] ) flag["position"]["exist"] = False flag["position"]["count"] = 0 flag["position"]["stop-AF"] = stop_AF flag["position"]["stop-EP"] = 0 flag["add-position"]["count"] = 0 lot,stop,flag = calculate_lot( last_data,data,flag ) if lot > 0.01: flag["records"]["log"].append("\n{0}円で{1}BTCの売りの注文を入れてドテンします\n".format(data["close_price"],lot)) # ここに売り注文のコードを入れる flag["records"]["log"].append("{0}円にストップを入れます\n".format(data["close_price"] + stop)) flag["position"]["lot"],flag["position"]["stop"] = lot,stop flag["position"]["exist"] = True flag["position"]["side"] = "SELL" flag["position"]["price"] = data["close_price"] if flag["position"]["side"] == "SELL": if signal["side"] == "BUY": flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の価格が{2}円でブレイクしました\n".format(buy_term,signal["price"],data[judge_price["BUY"]])) flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n") # 決済の成行注文コードを入れる records( flag,data,data["close_price"] ) flag["position"]["exist"] = False flag["position"]["count"] = 0 flag["position"]["stop-AF"] = stop_AF flag["position"]["stop-EP"] = 0 flag["add-position"]["count"] = 0 lot,stop,flag = calculate_lot( last_data,data,flag ) if lot > 0.01: flag["records"]["log"].append("\n{0}円で{1}BTCの買いの注文を入れてドテンします\n".format(data["close_price"],lot)) # ここに買い注文のコードを入れる flag["records"]["log"].append("{0}円にストップを入れます\n".format(data["close_price"] - stop)) flag["position"]["lot"],flag["position"]["stop"] = lot,stop flag["position"]["exist"] = True flag["position"]["side"] = "BUY" flag["position"]["price"] = data["close_price"] return flag # 損切ラインにかかったら成行注文で決済する関数 def stop_position( data,flag ): # トレイリングストップを実行 flag = trail_stop( data,flag ) if flag["position"]["side"] == "BUY": stop_price = flag["position"]["price"] - flag["position"]["stop"] if data["low_price"] < stop_price: flag["records"]["log"].append("{0}円の損切ラインに引っかかりました。\n".format( stop_price )) stop_price = round( stop_price - 2 * calculate_volatility(last_data) / ( chart_sec / 60) ) flag["records"]["log"].append(str(stop_price) + "円あたりで成行注文を出してポジションを決済します\n") # 決済の成行注文コードを入れる records( flag,data,stop_price,"STOP" ) flag["position"]["exist"] = False flag["position"]["count"] = 0 flag["position"]["stop-AF"] = stop_AF flag["position"]["stop-EP"] = 0 flag["add-position"]["count"] = 0 if flag["position"]["side"] == "SELL": stop_price = flag["position"]["price"] + flag["position"]["stop"] if data["high_price"] > stop_price: flag["records"]["log"].append("{0}円の損切ラインに引っかかりました。\n".format( stop_price )) stop_price = round( stop_price + 2 * calculate_volatility(last_data) / (chart_sec / 60) ) flag["records"]["log"].append(str(stop_price) + "円あたりで成行注文を出してポジションを決済します\n") # 決済の成行注文コードを入れる records( flag,data,stop_price,"STOP" ) flag["position"]["exist"] = False flag["position"]["count"] = 0 flag["position"]["stop-AF"] = stop_AF flag["position"]["stop-EP"] = 0 flag["add-position"]["count"] = 0 return flag #------------バックテストの部分の関数-------------- # 各トレードのパフォーマンスを記録する関数 def records(flag,data,close_price,close_type=None): # 取引手数料等の計算 entry_price = int(round(flag["position"]["price"] * flag["position"]["lot"])) exit_price = int(round(close_price * flag["position"]["lot"])) trade_cost = round( exit_price * slippage ) log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n" flag["records"]["log"].append(log) flag["records"]["slippage"].append(trade_cost) # 手仕舞った日時と保有期間を記録 flag["records"]["date"].append(data["close_time_dt"]) flag["records"]["holding-periods"].append( flag["position"]["count"] ) # 損切りにかかった回数をカウント if close_type == "STOP": flag["records"]["stop-count"].append(1) else: flag["records"]["stop-count"].append(0) # 値幅の計算 buy_profit = exit_price - entry_price - trade_cost sell_profit = entry_price - exit_price - trade_cost # 利益が出てるかの計算 if flag["position"]["side"] == "BUY": flag["records"]["side"].append( "BUY" ) flag["records"]["profit"].append( buy_profit ) flag["records"]["return"].append( round( buy_profit / entry_price * 100, 4 )) flag["records"]["funds"] = flag["records"]["funds"] + buy_profit if buy_profit > 0: log = str(buy_profit) + "円の利益です\n\n" flag["records"]["log"].append(log) else: log = str(buy_profit) + "円の損失です\n\n" flag["records"]["log"].append(log) if flag["position"]["side"] == "SELL": flag["records"]["side"].append( "SELL" ) flag["records"]["profit"].append( sell_profit ) flag["records"]["return"].append( round( sell_profit / entry_price * 100, 4 )) flag["records"]["funds"] = flag["records"]["funds"] + sell_profit if sell_profit > 0: log = str(sell_profit) + "円の利益です\n\n" flag["records"]["log"].append(log) else: log = str(sell_profit) + "円の損失です\n\n" flag["records"]["log"].append(log) return flag # バックテストの集計用の関数 def backtest(flag): # 成績を記録したpandas DataFrameを作成 records = pd.DataFrame({ "Date" : pd.to_datetime(flag["records"]["date"]), "Profit" : flag["records"]["profit"], "Side" : flag["records"]["side"], "Rate" : flag["records"]["return"], "Stop" : flag["records"]["stop-count"], "Periods" : flag["records"]["holding-periods"], "Slippage" : flag["records"]["slippage"] }) # 連敗回数をカウントする consecutive_defeats = [] defeats = 0 for p in flag["records"]["profit"]: if p < 0: defeats += 1 else: consecutive_defeats.append( defeats ) defeats = 0 # 総損益の列を追加する records["Gross"] = records.Profit.cumsum() # 資産推移の列を追加する records["Funds"] = records.Gross + start_funds # 最大ドローダウンの列を追加する records["Drawdown"] = records.Funds.cummax().subtract(records.Funds) records["DrawdownRate"] = round(records.Drawdown / records.Funds.cummax() * 100,1) # 買いエントリーと売りエントリーだけをそれぞれ抽出する buy_records = records[records.Side.isin(["BUY"])] sell_records = records[records.Side.isin(["SELL"])] # 月別のデータを集計する records["月別集計"] = pd.to_datetime( records.Date.apply(lambda x: x.strftime('%Y/%m'))) grouped = records.groupby("月別集計") month_records = pd.DataFrame({ "Number" : grouped.Profit.count(), "Gross" : grouped.Profit.sum(), "Funds" : grouped.Funds.last(), "Rate" : round(grouped.Rate.mean(),2), "Drawdown" : grouped.Drawdown.max(), "Periods" : grouped.Periods.mean() }) print("バックテストの結果") print("-----------------------------------") print("買いエントリの成績") print("-----------------------------------") print("トレード回数 : {}回".format( len(buy_records) )) print("勝率 : {}%".format(round(len(buy_records[buy_records.Profit>0]) / len(buy_records) * 100,1))) print("平均リターン : {}%".format(round(buy_records.Rate.mean(),2))) print("総損益 : {}円".format( buy_records.Profit.sum() )) print("平均保有期間 : {}足分".format( round(buy_records.Periods.mean(),1) )) print("損切りの回数 : {}回".format( buy_records.Stop.sum() )) print("-----------------------------------") print("売りエントリの成績") print("-----------------------------------") print("トレード回数 : {}回".format( len(sell_records) )) print("勝率 : {}%".format(round(len(sell_records[sell_records.Profit>0]) / len(sell_records) * 100,1))) print("平均リターン : {}%".format(round(sell_records.Rate.mean(),2))) print("総損益 : {}円".format( sell_records.Profit.sum() )) print("平均保有期間 : {}足分".format( round(sell_records.Periods.mean(),1) )) print("損切りの回数 : {}回".format( sell_records.Stop.sum() )) print("-----------------------------------") print("総合の成績") print("-----------------------------------") print("全トレード数 : {}回".format(len(records) )) print("勝率 : {}%".format(round(len(records[records.Profit>0]) / len(records) * 100,1))) print("平均リターン : {}%".format(round(records.Rate.mean(),2))) print("平均保有期間 : {}足分".format( round(records.Periods.mean(),1) )) print("損切りの回数 : {}回".format( records.Stop.sum() )) print("") print("最大の勝ちトレード : {}円".format(records.Profit.max())) print("最大の負けトレード : {}円".format(records.Profit.min())) print("最大連敗回数 : {}回".format( max(consecutive_defeats) )) print("最大ドローダウン : {0}円 / {1}%".format(-1 * records.Drawdown.max(), -1 * records.DrawdownRate.loc[records.Drawdown.idxmax()] )) print("利益合計 : {}円".format( records[records.Profit>0].Profit.sum() )) print("損失合計 : {}円".format( records[records.Profit<0].Profit.sum() )) print("最終損益 : {}円".format( records.Profit.sum() )) print("") print("初期資金 : {}円".format( start_funds )) print("最終資金 : {}円".format( records.Funds.iloc[-1] )) print("運用成績 : {}%".format( round(records.Funds.iloc[-1] / start_funds * 100),2 )) print("手数料合計 : {}円".format( -1 * records.Slippage.sum() )) # ログファイルの出力 file = open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8') file.writelines(flag["records"]["log"]) # 損益曲線をプロット plt.plot( records.Date, records.Funds ) plt.xlabel("Date") plt.ylabel("Balance") plt.xticks(rotation=50) # X軸の目盛りを50度回転 plt.show() #------------ここからメイン処理-------------- # 価格チャートを取得 price = get_price(chart_sec,after=1451606400) flag = { "position":{ "exist" : False, "side" : "", "price": 0, "stop":0, "stop-AF": stop_AF, "stop-EP":0, "ATR":0, "lot":0, "count":0 }, "add-position":{ "count":0, "first-entry-price":0, "last-entry-price":0, "unit-range":0, "unit-size":0, "stop":0 }, "records":{ "date":[], "profit":[], "return":[], "side":[], "stop-count":[], "funds" : start_funds, "holding-periods":[], "slippage":[], "log":[] } } last_data = [] need_term = max(buy_term,sell_term,volatility_term) i = 0 while i < len(price): # ドンチャンの判定に使う期間分の安値・高値データを準備する if len(last_data) < need_term: last_data.append(price[i]) flag = log_price(price[i],flag) time.sleep(wait) i += 1 continue data = price[i] flag = log_price(data,flag) # ポジションがある場合 if flag["position"]["exist"]: flag = stop_position( data,flag ) flag = close_position( data,last_data,flag ) flag = add_position( data,flag ) # ポジションがない場合 else: flag = entry_signal( data,last_data,flag ) last_data.append( data ) i += 1 time.sleep(wait) print("--------------------------") print("テスト期間:") print("開始時点 : " + str(price[0]["close_time_dt"])) print("終了時点 : " + str(price[-1]["close_time_dt"])) print(str(len(price)) + "件のローソク足データで検証") print("--------------------------") backtest(flag)
はじめまして。
どのサイトよりもわかりやすくて参考にさせてもらってます。
1点質問なのですが、
bifFlyerからBitmexに変更する場合は、どのようにしたらよいのでしょうか?APIのアドレスを変えるだけで良いのかと思いつつ、違うような気がしたもので。
お手数をおかけしますが、よろしくお願いします。
コメントありがとうございます!
異なる取引所を使う場合は、以下の箇所の変更が必要です。
1)売買注文を出す箇所
2)証拠金を取得する箇所
3)CrytptowatchからAPIで価格を取得する箇所
売買注文を出す、証拠金を取得する、など外部サーバーとの通信が必要になる箇所は、まずCCXTの共通関数が対応しているかどうかを確認し、対応していなければ、取引所のAPIを直接使います。詳しくは第2回API入門編と、第3回売買注文編で解説しているので、良かったらぜひ読んでみてください!
外部サーバーと通信する箇所以外は、どの取引所でも同じように使えます。Bitmexはもし余裕があれば、将来的には別記事を作るかもしれません^^
プログラミング超初心者ですが、丁寧な説明のおかげで楽しみながら学習しています。
試しに上記コードを少額で運用しようと思うのですが、その為にはバックテストの関数は全て外せば良いのでしょうか?
また、冒頭の設定項目の初期資金は、プライベートAPI に取って来させる様なイメージでしょうか?
すっとんきょうな質問かも知れませんが宜しくお願いします。
コメントありがとうございます!
初心者の方にそう言って頂けると嬉しいです。
実践用のコードを作る際には、バックテスト系の関数はすべて消して大丈夫です。具体的には、以下は不要です。
・def records()
・def backtest()
・flag変数の[“records”]
あとは、ログを記録する箇所が残ってしまうので、flag[“records”][“log”].append() を、すべてprint() に置き換える必要があります。コードに[records]が残らないようにしてください。サクラエディタならCtrl+Rで一括置換できます。
その他、メイン処理のWhile文も実践BOTでは無限ループ(While Ture:)にする必要があります。書き方は、第4回のBOT作成編を参考にしてみてください!
資金の取得はBitflyerだと[getcollateral]というAPIを使います。
以下の記事で解説しているので、良かったら参考にしてください^^
・証拠金からポジションサイズを計算する
返信、ご丁寧に有難うございます。
キーボード叩きたくてウズウズしてきました!(笑)
これからも楽しく拝見させて頂きますので、無理のない範囲で記事の更新宜しくお願いします。
返信ありがとうございます。
1回では理解できないので、進みつつ戻りを繰り返して進めて行きます!
Bitmexも将来的に余裕があれば作るかもしれないんですね。
楽しみにしてます!!
初めてコメントさせて頂きます。
関数def add_position( data,flag )の ポジション全体の情報を更新する部分について,
flag[“position”][“lot”] = ・・・
flag[“position”][“stop”] = ・・・
flag[“position”][“price”] = ・・・
上記のような順序となっていますが,下記の順序が正ではないでしょうか。
flag[“position”][“price”] = ・・・
flag[“position”][“lot”] = ・・・
flag[“position”][“stop”] = ・・・
前回注文のlotと追加注文のlotの合計を入れたflag[“position”][“lot”]を使用して,平均約定価格を計算しているのでおかしいかなと。。。
間違っていたらすみません。
コメントありがとうございます!
なぜか気付かず、めちゃくちゃ助かりましたm(__)m
2回の分割エントリーの成績が実際より良く出てしまってましたね…!
関連するすべての記事でバックテスト検証結果を修正しました。
ついでに4回分割エントリーの成績が本当はもっと良かったことにも気づきました。(平均単価の計算にflag[“position”][“price”]を使うべきところ、last_entry_priceを使ってました…)
ありがとうございました!^^
1回のトレードの許容資金で分割エントリーしてみよう!
初めてコメントさせていただきます。
プログラミング初心者です。
サイトを参考に勉強させていただいております。
上記コードのメイン処理を第4回の記事を参考にWhile Ture:に書き直してみたのですが、IndexError: list index out of rangeが出てうまくいきません。
勉強不足で申し訳ないのですが、ご教示ください。
お手数をおかけいたしますがよろしくお願いいたします。
コメントありがとうございます!
参考にしていただいて嬉しいです。
申し訳ないですが、コメント欄の内容だけだと何が原因でどこで止まっているのか全然わからないです。何行でエラーが出ているのかは表示されていると思うので、そこを見て、ご自身でエラーの原因を推測して検索してみることの繰り返しが必要です。 初回の方でも書きましたが、エラーが出ないコードを一発で書ける方はいないので、試行錯誤してみてください!m(__)m
わからなくなったら、エラーが出てる箇所に具体的な数字を入れてみる、コメントアウトして動かしてみる、前後にログ(print)を入れてどこで詰まっているか確認する、別のpythonファイルを作ってその箇所だけ部分的にテストする、などの方法がおすすめです。
よろしくお願いします!^-^
早速の返信ありがとうございます。
ご教示ありがとございます。
一つずつ確認して行きたいと思います。
ご丁寧にありがとうございました。
第1回から勉強させて頂いている素人です。バックテストのコードをベースに実践用に変更させたいのですが、下記の内容の事はやったつもりなのですが動きません。設定項目の上に下記を含めたりしまして、何か素人が起こしやすいミスや、変更しないといけない部分、含めないといけない部分がありましたらご教授頂けませんでしょうか?
import ccxt
bitflyer = ccxt.bitflyer()
bitflyer.apiKey =
bitflyer.secret =
実践用のコードを作る際には、バックテスト系の関数はすべて消して大丈夫です。具体的には、以下は不要です。
・def records()
・def backtest()
・flag変数の[“records”]
あとは、ログを記録する箇所が残ってしまうので、flag[“records”][“log”].append() を、すべてprint() に置き換える必要があります。コードに[records]が残らないようにしてください。
コメントありがとうございます。
遅くなってすみません。
変更箇所をすべて書き出すのは難しいです。
どちらかというと、第4回のBOT作成編のコードをベースに、一番シンプルなロジックで実際に取引所で動作することを確認してから、第5回・第6回の記事の内容のうち欲しいものを1つずつ組みこんで、随時、動作確認していく流れになりますね。パーツ単位でみれば、積み増しの関数・トレイリングストップの関数・ATR計算の関数、などはバックテストも実践用もコードは同じですが、検証項目が多すぎるとエラーの理由がわからなくなるので、1つずつ動くことを理解しながら付け足していくのが一番早いと思います。
あとは、注意点としてはバックテストでは例外処理などを一切考慮していないのでその辺りを第4回の内容を参考に付け足したりすることですかね。
おかげで僕も自動売買ボットを動かし始めることができました。ありがとうございます!昼も夜も、働いてくれています(^^
ひとつ気になったのですが、「 連敗回数をカウントする」の部分、最新のprofitがマイナスの場合、連敗回数が更新されないような気がしますが、如何でしょう?
# 連敗回数をカウントする
consecutive_defeats = []
defeats = 0
for p in flag[“records”][“profit”]:
if p < 0:
defeats += 1
else:
consecutive_defeats.append( defeats )
defeats = 0