BOTの一番成績のいい最適なパラメーターをpythonで総当たりで探索しよう!

さて、前回までの記事でバックテストをして自動売買BOTのパフォーマンスを評価する方法は、大体マスターできたと思います!

今回は、バックテスト編の総仕上げとして最適なパラメーターを探索する方法を紹介しておきましょう。このパラメーター探索ができることこそ、ある意味、自分でpythonのプログラミングができることの優位性といっても過言ではないかもしれません。

手動だと何百通り、何千通りも試す必要のある膨大な組み合わせの中から、もっとも勝率や期待リターンが高く、最もリスクやドローダウンの小さいパラメーターは何なのか? それを調べる方法を解説します。

パラメーター最適化とは

パラメーターとは、ある自動売買BOTのロジックの中で自由に動かして変更できる値のことをいいます。例えば、前回までの記事で学習した一番シンプルなn期間ドンチャン・ブレイクアウトBOTには、以下の2つのパラメーターがありました。

1)X分足の時間軸を使う
2)n期間の最高値(最安値)ブレイクアウトで買う

同じドンチャンブレイクアウトでも、15分足で10期間のドンチャン・ブレイクアウト戦略を使うのと、1時間足で30期間のドンチャン・ブレイクアウト戦略を使うのとでは、全く成績が異なります。

上記の2つの組み合わせだけなら、前回までの記事の方法で1つ1つを手作業で検証することも可能です。しかし組み合わせが何百通り、何千通りと増えてくると、すべてのパターンを手作業で検証して比較するのは難しくなります。

そこでpythonの出番です。pythonのようなプログラムなら、自動的に何百通りものパラメーターの組み合わせでバックテストを行い、最も利益を最大化できるようなパラメーターを探索することができます。今回はその方法を解説していきます!

今回の練習で試すパラメーター

まずは準備として前回までのドンチャン・ブレイクアウトのコードを改良し、以下の4つのパラメーターを設定できるようにしてみましょう。

変更可能なパラメーター

1)X分足の時間軸を使う
2)n期間の最高値のブレイクアウトで買う
3)m期間の最安値のブレイクアウトで売る
4)ブレイクアウトの判定基準に、高値/安値 or 終値 を使う

より実践的なドンチャン・ブレイクアウト戦略では、上値のブレイクアウトと下値のブレイクアウトで異なるパラメーターを使用することがよくあります。そのため、今までのコードを改良し、買いエントリと売りエントリで、それぞれ違う期間のパラメーターを設定できるようにしておきます。

また今までのコードでは、ブレイクアウトの判定基準を「過去n期間の最高値(最安値)を直近の高値が上回ったとき」としていましたが、高値ではなく終値のパターンも検証しておきましょう。

上記の修正済みのコードは、以下の別記事にまとめておきます。
時間のある方は自分でも挑戦してみてください!

パラメーターを設定できる改良版コード

パラメーターの組み合わせを総当たりでテストする方法

パラメーター探索と聞くと、最近流行りの機械学習やニューラルネットワークなど、難しい数学やプログラミングスキルが必要だと誤解している方もいるかもしれません。

しかし基本的には、数千通りくらいのパターンであれば、ただの「総当たり」で全パターンを計算すれば十分です。全パターンを試しても数十秒~1分もあれば終わりますので、複雑な機械学習や最適化アルゴリズムは必要ありません。

例えば、以下のようなパターンの組み合わせをすべて試したいとしましょう。


# 4種類のパラメーター
paramA = [ 10,20,30,40,50 ]
paramB = [ 10,20,30,40,50 ]
paramC = [ "A" , "B" ]
paramD = [ 1,2,3,4,5,6,7,8,9,10 ]

この場合、以下のようにfor文を書くことで全パターンの組み合わせを試すことができます。


# for文で総当たりのパターンを記述
for a in paramA:
	for b in paramB:
		for c in paramC:
			for d in paramD:
				
				# 各組合せで実行したい処理


パラメーターAのfor文処理をするなかで、1つの要素についてさらにパラメーターBのfor文処理をし、パラメーターBのfor文処理のなかの1つの要素に対してさらにパラメーターCのfor文処理をし….という入れ子構造にしていくわけですね。

これを実行することで、5 × 5 × 2 × 10 = 500通りの組み合わせについて、全てのパターンでバックテストを実行することができます。

シンプルな記述方法

なお、pythonでは上記のコードをもっとシンプルにして以下のように書くことができます。


combinations = [(a, b, c, d)
	for a in paramA
	for b in paramB
	for c in paramC
	for d in paramD]

for a,b,c,d in combinations:
	# 各組合せで実行したい処理

試すパラメーターの組み合わせ

なお、今回のドンチャンブレイクアウトのパラメーター最適化では、以下の組み合わせを総当たりで全て試してみることにします。

1)使用する時間軸
⇒ 30分足/1時間足/2時間足/6時間足/12時間足/1日足

2)上値ブレイクアウトの判定期間
⇒ 10/15/20/25/30/35/40/45

3)下値ブレイクアウトの判定期間
⇒  10/15/20/25/30/35/40/45

4)判定に使用する価格
⇒  高値・安値/終値・終値

時間足が6通り、判定期間が上値ブレイクアウト・下値ブレイクアウトでそれぞれ8通りずつ、判定に使用する価格が2通りで、合計 768通りのパターンをテストしてみましょう。

これをfor文の総当たりでテストするためには、以下のように書けばOKです。


# バックテストのパラメーター設定

chart_sec_list  = [ 1800, 3600, 7200, 21600, 43200, 86400 ] # 時間足
buy_term_list   = [ 10,15,20,25,30,35,40,45 ] # 上値ブレイクの判断期間
sell_term_list  = [ 10,15,20,25,30,35,40,45 ] # 下値ブレイクの判断期間
judge_price_list = [
	{"BUY":"close_price","SELL":"close_price"}, # 終値/終値 と 高値/安値
	{"BUY":"high_price","SELL":"low_price"}
]

# for文の記述方法

combinations = [(chart_sec, buy_term, sell_term, judge_price)
	for chart_sec in chart_sec_list
	for buy_term  in buy_term_list
	for sell_term in sell_term_list
	for judge_price in judge_price_list]

for chart_sec, buy_term, sell_term,judge_price in combinations:
	# 各回のバックテスト処理を記述
	# (今までの記事で作成したもの)

これで総当たりのfor文の準備は完了です!

最大化したい指標を決める

何のためにパラメーター最適化を実施するのかといえば、リターンを極限まで高くしたいからですよね。なので「総利益が最大になるようなパラメーターを探す」というのも1つの方法です。

しかし自動売買BOTの評価を最終損益だけで判断するのは不十分です。より厳密にいえば、私たちが探している理想の売買ロジックは、「できるだけリスクを抑えながら、できるだけ大きなリターンを得ること」にあるはずです。そのため、リスクとリターンの比率を1つの数字で表せるような指標を「最大化したい数字」に設定すべきです。

このような指標には、シャープレシオ、MARレシオなど、いくつかの指標がありますが、ここでは一番オーソドックスな「プロフィットファクター」を使うことにします。

プロフィットファクター(PF)とは

同じ最終利益100万円でも、「利益110万円 / 損失10万円」の売買システムと、「利益200万円 / 損失100万円」の売買システムとでは、かなり意味が違ってきます。

前者は、損失が利益のおよそ1/10で済んでいるのに対し、後者は、なんと利益の半分もの損失を出してしまっています。当然、前者の方が安定した売買システムで、後者のほうがより不安定な(リスクの高い)売買システムということになります。

このような、「 総利益 / 総損失 」で表される指標のことをプロフィットファクター(PF)といいます。PFが大きければ大きいほど、そのシステムは安定していてリスクが小さいと評価できます。

プロフィットファクターの計算コード

今まで作成してきたドンチャン・ブレイクアウトのバックテスト検証コードに、プロフィットファクターを計算するコードを追加しておきましょう。

前回のpandasの記事を読んで練習した方なら書けると思います!

▽ プロフィットファクターの計算式 (総利益 ÷ 総損失)

PF = round( -1 * (records[records.Profit>0].Profit.sum() / records[records.Profit<0].Profit.sum()) ,2)

さて、それでは実際のコードを作っていきましょう!

2.pythonコード

いつものように、まず最初に全コードの内容と実行結果を示します。
その後コードの書き方を解説していきます。


import requests
from datetime import datetime
import time
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import csv

#-----設定項目

wait = 0            # ループの待機時間
lot = 1             # BTCの注文枚数
slippage = 0.001    # 手数料・スリッページ


# バックテストのパラメーター設定
#---------------------------------------------------------------------------------------------
chart_sec_list  = [ 1800, 3600, 7200, 21600, 43200, 86400 ] # テストに使う時間軸
buy_term_list   = [ 10,15,20,25,30,35,40,45 ] # テストに使う上値ブレイクアウトの期間
sell_term_list  = [ 10,15,20,25,30,35,40,45 ] # テストに使う下値ブレイクアウトの期間
judge_price_list = [
	{"BUY":"close_price","SELL":"close_price"}, # ブレイクアウト判定に終値を使用
	{"BUY":"high_price","SELL":"low_price"}     # ブレイクアウト判定に高値・安値を使用
]
#---------------------------------------------------------------------------------------------



# CryptowatchのAPIを使用する関数
def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0:
				price.append({ "close_time" : i[0],
					"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
					"open_price" : i[1],
					"high_price" : i[2],
					"low_price" : i[3],
					"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None


# ドンチャンブレイクを判定する関数
def donchian( data,last_data ):
	
	highest = max(i["high_price"] for i in last_data[ (-1* buy_term): ])
	if data[ judge_price["BUY"] ] > highest:
		return {"side":"BUY","price":highest}
	
	lowest = min(i["low_price"] for i in last_data[ (-1* sell_term): ])
	if data[ judge_price["SELL"] ] < lowest:
		return {"side":"SELL","price":lowest}
	
	return {"side" : None , "price":0}


# ドンチャンブレイクを判定してエントリー注文を出す関数
def entry_signal( data,last_data,flag ):
	signal = donchian( data,last_data )
	if signal["side"] == "BUY":

		# ここに買い注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "BUY"
		flag["order"]["price"] = round(data["close_price"] * lot)

	if signal["side"] == "SELL":

		# ここに売り注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "SELL"
		flag["order"]["price"] = round(data["close_price"] * lot)

	return flag



# サーバーに出した注文が約定したか確認する関数
def check_order( flag ):
	
	# 注文状況を確認して通っていたら以下を実行
	# 一定時間で注文が通っていなければキャンセルする
	
	flag["order"]["exist"] = False
	flag["order"]["count"] = 0
	flag["position"]["exist"] = True
	flag["position"]["side"] = flag["order"]["side"]
	flag["position"]["price"] = flag["order"]["price"]
	
	return flag


# 手仕舞いのシグナルが出たら決済の成行注文 + ドテン注文 を出す関数
def close_position( data,last_data,flag ):
	
	flag["position"]["count"] += 1
	signal = donchian( data,last_data )
	
	if flag["position"]["side"] == "BUY":
		if signal["side"] == "SELL":
			
			# 決済の成行注文コードを入れる
			
			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			# ここに売り注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "SELL"
			flag["order"]["price"] = round(data["close_price"] * lot)
			

	if flag["position"]["side"] == "SELL":
		if signal["side"] == "BUY":
			
			# 決済の成行注文コードを入れる
			
			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			# ここに買い注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "BUY"
			flag["order"]["price"] = round(data["close_price"] * lot)
			
	return flag


# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	
	# 取引手数料等の計算
	entry_price = flag["position"]["price"]
	exit_price = round(data["close_price"] * lot)
	trade_cost = round( exit_price * slippage )
	flag["records"]["slippage"].append(trade_cost)
	
	# 手仕舞った日時と保有期間を記録
	flag["records"]["date"].append(data["close_time_dt"])
	flag["records"]["holding-periods"].append( flag["position"]["count"] )
	
	# 値幅の計算
	buy_profit = exit_price - entry_price - trade_cost
	sell_profit = entry_price - exit_price - trade_cost
	
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		flag["records"]["side"].append( "BUY" )
		flag["records"]["profit"].append( buy_profit )
		flag["records"]["return"].append( round( buy_profit / entry_price * 100, 4 ))
	
	if flag["position"]["side"] == "SELL":
		flag["records"]["side"].append( "SELL" )
		flag["records"]["profit"].append( sell_profit )
		flag["records"]["return"].append( round( sell_profit / entry_price * 100, 4 ))
	
	return flag

# バックテストの集計用の関数
def backtest(flag):
	
	# 成績を記録したpandas DataFrameを作成
	records = pd.DataFrame({
		"Date"     :  pd.to_datetime(flag["records"]["date"]),
		"Profit"   :  flag["records"]["profit"],
		"Side"     :  flag["records"]["side"],
		"Rate"     :  flag["records"]["return"],
		"Periods"  :  flag["records"]["holding-periods"],
		"Slippage" :  flag["records"]["slippage"]
	})
	
	# 総損益の列を追加する
	records["Gross"] = records.Profit.cumsum()
	
	# 最大ドローダウンの列を追加する
	records["Drawdown"] = records.Gross.cummax().subtract(records.Gross)
	records["DrawdownRate"] = round(records.Drawdown / records.Gross.cummax() * 100,1)

	print("バックテストの結果")
	print("-----------------------------------")
	print("総合の成績")
	print("-----------------------------------")
	print("全トレード数       :  {}回".format(len(records) ))
	print("勝率               :  {}%".format(round(len(records[records.Profit>0]) / len(records) * 100,1)))
	print("平均リターン       :  {}%".format(round(records.Rate.mean(),2)))
	print("平均保有期間       :  {}足分".format( round(records.Periods.mean(),1) ))
	print("")
	print("最大の勝ちトレード :  {}円".format(records.Profit.max()))
	print("最大の負けトレード :  {}円".format(records.Profit.min()))
	print("最大ドローダウン   :  {0}円 / {1}%".format(-1 * records.Drawdown.max(), -1 * records.DrawdownRate.loc[records.Drawdown.idxmax()]  ))
	print("利益合計           :  {}円".format( records[records.Profit>0].Profit.sum() ))
	print("損失合計           :  {}円".format( records[records.Profit<0].Profit.sum() ))
	print("")
	print("最終損益           :  {}円".format( records.Profit.sum() ))
	print("手数料合計         :  {}円".format( -1 * records.Slippage.sum() ))
	
	# バックテストの計算結果を返す
	result = {
		"トレード回数"     : len(records),
		"勝率"             : round(len(records[records.Profit>0]) / len(records) * 100,1),
		"平均リターン"     : round(records.Rate.mean(),2),
		"最大ドローダウン" : -1 * records.Drawdown.max(),
		"最終損益"         : records.Profit.sum(),
		"プロフィットファクタ―" : round( -1 * (records[records.Profit>0].Profit.sum() / records[records.Profit<0].Profit.sum()) ,2)
	}
	
	return result
	


# ここからメイン処理

# バックテストに必要な時間軸のチャートをすべて取得
price_list = {}
for chart_sec in chart_sec_list:
	price_list[ chart_sec ] = get_price(chart_sec,after=1451606400)
	print("-----{}分軸の価格データをCryptowatchから取得中-----".format( int(chart_sec/60) ))
	time.sleep(10)

# テストごとの各パラメーターの組み合わせと結果を記録する配列を準備
param_buy_term  = []
param_sell_term = []
param_chart_sec = []
param_judge_price = []

result_count = []
result_winRate = []
result_returnRate = []
result_drawdown = []
result_profitFactor = []
result_gross = []

# 総当たりのためのfor文の準備
combinations = [(chart_sec, buy_term, sell_term, judge_price)
	for chart_sec in chart_sec_list
	for buy_term  in buy_term_list
	for sell_term in sell_term_list
	for judge_price in judge_price_list]

for chart_sec, buy_term, sell_term,judge_price in combinations:
	
	price = price_list[ chart_sec ]
	last_data = []
	i = 0
	
	# フラッグ変数の初期化
	flag = {
		"order":{
			"exist" : False,
			"side" : "",
			"price" : 0,
			"count" : 0
		},
		"position":{
			"exist" : False,
			"side" : "",
			"price": 0,
			"count":0
		},
		"records":{
			"date":[],
			"profit":[],
			"return":[],
			"side":[],
			"holding-periods":[],
			"slippage":[]
		}
	}
	
	while i < len(price):
		
		# ドンチャンの判定に使う期間分の安値・高値データを準備する
		if len(last_data) < buy_term or len(last_data) < sell_term:
			last_data.append(price[i])
			time.sleep(wait)
			i += 1
			continue
		
		data = price[i]
		
		if flag["order"]["exist"]:
			flag = check_order( flag )
		elif flag["position"]["exist"]:
			flag = close_position( data,last_data,flag )
		else:
			flag = entry_signal( data,last_data,flag )
		
		last_data.append( data )
		i += 1
		time.sleep(wait)


	print("--------------------------")
	print("テスト期間   :")
	print("開始時点     : " + str(price[0]["close_time_dt"]))
	print("終了時点     : " + str(price[-1]["close_time_dt"]))
	print("時間軸       : " + str(int(chart_sec/60)) + "分足で検証")
	print("パラメータ1 : " + str(buy_term)  + "期間 / 買い" )
	print("パラメータ2 : " + str(sell_term) + "期間 / 売り" )
	print(str(len(price)) + "件のローソク足データで検証")
	print("--------------------------")

	
	result = backtest( flag )
	
	
	# 今回のループで使ったパラメータの組み合わせを配列に記録する
	param_buy_term.append( buy_term )
	param_sell_term.append( sell_term )
	param_chart_sec.append( chart_sec )
	if judge_price["BUY"] == "high_price":
		param_judge_price.append( "高値/安値" )
	else:
		param_judge_price.append( "終値/終値" )
	
	
	# 今回のループのバックテスト結果を配列に記録する
	result_count.append( result["トレード回数"] )
	result_winRate.append( result["勝率"] )
	result_returnRate.append( result["平均リターン"] )
	result_drawdown.append( result["最大ドローダウン"] )
	result_profitFactor.append( result["プロフィットファクタ―"] )
	result_gross.append( result["最終損益"] )
	
	

# 全てのパラメータによるバックテスト結果をPandasで1つの表にする
df = pd.DataFrame({
	"時間軸"        :  param_chart_sec,
	"買い期間"      :  param_buy_term,
	"売り期間"      :  param_sell_term,
	"判定基準"      :  param_judge_price,
	"トレード回数"  :  result_count,
	"勝率"          :  result_winRate,
	"平均リターン"  :  result_returnRate,
	"ドローダウン"  :  result_drawdown,
	"PF"            :  result_profitFactor,
	"最終損益"      :  result_gross
})

# 列の順番を固定する
df = df[[ "時間軸","買い期間","売り期間","判定基準","トレード回数","勝率","平均リターン","ドローダウン","PF","最終損益"  ]]

# トレード回数が100に満たない記録は消す
df.drop( df[ df["トレード回数"] < 100].index, inplace=True )

# 最終結果をcsvファイルに出力
df.to_csv("result-{}.csv".format(datetime.now().strftime("%Y-%m-%d-%H-%M")) )

実行結果

これを実行すると以下のようなCSVファイルが出力されます。

プロフィットファクター(PF)のJ列を「降順」で並び変えてみると、ドンチャン・ブレイクアウトの有望なパラメーターの組み合わせがわかります。

▽ PFで並び替え後の成績ベスト30

上記のコードではトレード回数が100回に満たなかった結果データを全て捨てているので、どのパラメーターの組み合わせも最低限のサンプル数を確保できています。上位30の成績のパラメーターをみると、ほとんどが2時間足以上の時間軸なのがわかります。

▽ PFで並び替え後の成績ワースト30

逆に最もプロフィットファクターの低かった(悪かった成績)30コをみると、短い時間軸(30分足)に集中しているのがわかります。ドンチアンブレイクアウトは、あまり短い時間軸には適さない戦略の可能性がありそうです。

(ただし時間軸によってテストに使用しているローソク足の期間が違うので、厳密には比較できない点に注意してください。)

実行結果2

次はさらに範囲を絞ってテストをしてみましょう!

今度は時間軸を1時間足・2時間足だけに絞り、上値ブレイクアウトの期間・下値ブレイクアウトの期間を10~55期間の範囲にして1期間刻みでテストしています。以下のようにパラメーターを設定すればOKですね。

# バックテストのパラメーター設定

chart_sec_list  = [ 3600,7200 ]
buy_term_list   = np.arange(10,56) # 10~55までの連番の配列を作る
sell_term_list  = np.arange(10,56)

この条件でテストしてみると以下のようなCSVファイルが出力されます。

・CSV結果出力ファイル1
・CSV結果出力ファイル2

ファイル1は先ほどと同じくトレード回数が100回に満たなかったテスト結果を削除したもの、ファイル2は削除せずに全ての結果を残したものです。今回はファイル2を使ってみましょう。

▽ PFで並び替え後の成績ベスト30(1時間足)

1時間足の上位の成績を見ると、明らかに上値ブレイクアウト(買い)の期間は40前後、下値ブレイクアウト(売り)の期間は20前後が良さそう、という傾向が見えます。また判定基準は「終値」が上位を独占していますね。

▽ PFで並び替え後の成績ベスト30(2時間足)

2時間足の上位の成績を見ると、上値ブレイクアウト(買い)の期間は50以上、下値ブレイクアウト(売り)の期間は20~30期間がいいようですね。またやはり「終値」でブレイクアウトの判定をした方が、トレード回数は減るものの成績が良くなる傾向にありそうです。

より詳しく分析したい方は、上位のパラメーターを前回までの記事で学習した「月別集計」「資産曲線」などを使って調べてみるといいでしょう。

Pythonコードの解説

さて、それでは新しく変更した部分のpythonコードを解説しておきましょう!

基本的なコードは前回まで勉強した内容とほとんど変わっていないことに気付いたと思います。要するに、今まで作成してきた「While文でローソク足を回してバックテストし、最終成績をbacktest() で集計するコード」を、まるまる上記で説明した「総当たりのfor文」の中に入れるだけですね。

図にすると以下のような感じです。

⇒ 各パラメーターの組み合わせでループする ⇒ ローソク足のデータを1本ずつループする ⇒ 全てのトレード成績を配列に記録する ⇒ トレード結果の配列を集計してpandasで表にする ⇒ 各バックテストの結果を計算して配列に記録する⇒ バックテスト結果の配列を集計してpandasで表にする ⇒ 最終結果の表をCSVに出力する

こののような入れ子構造になっている点だけ混乱しなければ、上記のコードで難しい箇所はなかったと思います。

今までの記事では、バックテスト集計用の関数 backtest() は、単に集計・計算した結果を表示(print)するだけでしたが、今回のパラメーター最適化のコードでは、各バックテストの結果を返すように変更する必要があります。

そのためバックテスト集計用の関数に以下の部分を足しています。

# バックテストの計算結果を返す
result = {
	"トレード回数"     : len(records),
	"勝率"             : round(len(records[records.Profit>0]) / len(records) * 100,1),
	"平均リターン"     : round(records.Rate.mean(),2),
	"最大ドローダウン" : -1 * records.Drawdown.max(),
	"最終損益"         : records.Profit.sum(),
	"プロフィットファクタ―" : round( -1 * (records[records.Profit>0].Profit.sum() / records[records.Profit<0].Profit.sum()) ,2)
}

return result

各バックテストの結果を配列に記録して、それを最後にpandasで1つの表にする方法は、前回までの記事の「各トレード結果を配列に記録して最後に集計する方法」と全く同じなので、説明は省きます。

drop() で不要な行を削除する

唯一新しく登場したのは、最後のdrop()の箇所でしょう。

今回のコードでは、トレード回数の少ないパラメーターの組み合わせを除外できるようにしています。それが以下の部分のコードです。

# トレード回数が100に満たない記録は消す
df.drop( df[ df["トレード回数"] < 100].index, inplace=True )

表名.drop()は、行のindexを指定することで、その行を表から削除することのできる関数です。これを使って以下のような仕組みで、トレード回数の少なすぎるバックテスト結果を削除しています。


# トレード回数が100未満のデータを抽出
df[df["トレード回数"] < 100]

# トレード回数が100未満のデータの行のindexを取得
df[df["トレード回数"] < 100].index

# トレード回数が100未満のデータを行ごと削除
df.drop( df[df["トレード回数"] < 100].index )

なお、inplce=Trueは、現在の表データに結果をそのまま上書きする、という意味です。以上でコードの解説はおしまいです!

まとめ

この章の前の方の記事でも解説したように、このようなパラメーター調整には常にカーブフィッティングの問題がつきまといます。以下のような点に注意しておきましょう。

(1)パラメーターの数を増やし過ぎないようにする
(2)おおまかな傾向を捉えることを目的にする
(3)トレード数(サンプル)が少なくならないようにする

一般論としていえば、今回のドンチャン・チャネル・ブレイクアウトのように広範なパラメーターのどの値を使ってもプラスの期待値が出る、という場合で、おおまかな傾向を捉える目的(例えば、20期間あたりが一番良さそう)でパラメーターを最適化するのは問題ありません。

一番問題があるのは、例えば、「6」という数字を使えば利益が出る、「8」という数字を使うとマイナスに転落する、「13」という数字を使えばプラスになる、といった全く連続性も傾向もないパラメーターを恣意的に調整することです。これをすると、過去データでしか利益の出せないパラメーター調整になります。

次回

今回の記事で、いったんバックテスト編はおしまいです。

ドンチャン・チャネルブレイクアウトで買い/売りの期間を別々に設定できるようにする

次回の記事「pythonで最適なパラメーターを自動的に探索する」での練習にあたって、まず先に以下のようなパラメーターを自由に設定できるようにドンチャン・ブレイクアウトのコードを改良しておきましょう。

設定可能なパラメーター

1)X分足の時間軸を使う
2)n期間の最高値のブレイクアウトで買う
3)m期間の最安値のブレイクアウトで売る
4)ブレイクアウトの判定に、高値/安値 or 終値/終値 を使う

今回の記事では、これらのパラメーターを設定できるようにコードを修正する方法を解説し、最後に1時間足で買い(30期間)、売り(20期間)、判定基準(終値)でのバックテスト結果を見てみましょう。

Pythonコード

先に改良版のコード全体を記します。


import requests
from datetime import datetime
import time
import matplotlib.pyplot as plt
import pandas as pd


#--------設定項目--------

chart_sec = 3600           #  1時間足を使用
buy_term =  30             #  買いエントリーのブレイク期間の設定
sell_term = 30             #  売りエントリーのブレイク期間の設定
judge_price={
  "BUY" : "high_price",    #  ブレイク判断 高値(high_price)か終値(close_price)を使用
  "SELL": "low_price"      #  ブレイク判断 安値 (low_price)か終値(close_price)を使用
}
wait = 0                   #  ループの待機時間
lot = 1                    #  BTCの注文枚数
slippage = 0.001           #  手数料・スリッページ

#------------------------


# CryptowatchのAPIを使用する関数
def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0:
				price.append({ "close_time" : i[0],
					"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
					"open_price" : i[1],
					"high_price" : i[2],
					"low_price" : i[3],
					"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None


# 時間と高値・安値をログに記録する関数
def log_price( data,flag ):
	log =  "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 高値: " + str(data["high_price"]) + " 安値: " + str(data["low_price"]) + " 終値: " + str(data["close_price"]) + "\n"
	flag["records"]["log"].append(log)
	return flag


# ドンチャンブレイクを判定する関数
def donchian( data,last_data ):
	
	highest = max(i["high_price"] for i in last_data[ (-1* buy_term): ])
	if data[ judge_price["BUY"] ] > highest:
		return {"side":"BUY","price":highest}
	
	lowest = min(i["low_price"] for i in last_data[ (-1* sell_term): ])
	if data[ judge_price["SELL"] ] < lowest:
		return {"side":"SELL","price":lowest}
	
	return {"side" : None , "price":0}


# ドンチャンブレイクを判定してエントリー注文を出す関数
def entry_signal( data,last_data,flag ):
	signal = donchian( data,last_data )
	if signal["side"] == "BUY":
		flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の価格が{2}円でブレイクしました\n".format(buy_term,signal["price"],data[judge_price["BUY"]]))
		flag["records"]["log"].append(str(data["close_price"]) + "円で買いの指値注文を出します\n")

		# ここに買い注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "BUY"
		flag["order"]["price"] = round(data["close_price"] * lot)

	if signal["side"] == "SELL":
		flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の価格が{2}円でブレイクしました\n".format(sell_term,signal["price"],data[judge_price["SELL"]]))
		flag["records"]["log"].append(str(data["close_price"]) + "円で売りの指値注文を出します\n")

		# ここに売り注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "SELL"
		flag["order"]["price"] = round(data["close_price"] * lot)

	return flag



# サーバーに出した注文が約定したか確認する関数
def check_order( flag ):
	
	# 注文状況を確認して通っていたら以下を実行
	# 一定時間で注文が通っていなければキャンセルする
	
	flag["order"]["exist"] = False
	flag["order"]["count"] = 0
	flag["position"]["exist"] = True
	flag["position"]["side"] = flag["order"]["side"]
	flag["position"]["price"] = flag["order"]["price"]
	
	return flag


# 手仕舞いのシグナルが出たら決済の成行注文 + ドテン注文 を出す関数
def close_position( data,last_data,flag ):
	
	flag["position"]["count"] += 1
	signal = donchian( data,last_data )
	
	if flag["position"]["side"] == "BUY":
		if signal["side"] == "SELL":
			flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の価格が{2}円でブレイクしました\n".format(sell_term,signal["price"],data[judge_price["SELL"]]))
			flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n")
			
			# 決済の成行注文コードを入れる
			
			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			flag["records"]["log"].append("さらに" + str(data["close_price"]) + "円で売りの指値注文を入れてドテンします\n")
			
			# ここに売り注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "SELL"
			flag["order"]["price"] = round(data["close_price"] * lot)
			

	if flag["position"]["side"] == "SELL":
		if signal["side"] == "BUY":
			flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の価格が{2}円でブレイクしました\n".format(buy_term,signal["price"],data[judge_price["BUY"]]))
			flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n")
			
			# 決済の成行注文コードを入れる
			
			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			flag["records"]["log"].append("さらに" + str(data["close_price"]) + "円で買いの指値注文を入れてドテンします\n")
			
			# ここに買い注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "BUY"
			flag["order"]["price"] = round(data["close_price"] * lot)
			
	return flag


# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	
	# 取引手数料等の計算
	entry_price = flag["position"]["price"]
	exit_price = round(data["close_price"] * lot)
	trade_cost = round( exit_price * slippage )
	
	log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n"
	flag["records"]["log"].append(log)
	flag["records"]["slippage"].append(trade_cost)
	
	# 手仕舞った日時と保有期間を記録
	flag["records"]["date"].append(data["close_time_dt"])
	flag["records"]["holding-periods"].append( flag["position"]["count"] )
	
	# 値幅の計算
	buy_profit = exit_price - entry_price - trade_cost
	sell_profit = entry_price - exit_price - trade_cost
	
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		flag["records"]["side"].append( "BUY" )
		flag["records"]["profit"].append( buy_profit )
		flag["records"]["return"].append( round( buy_profit / entry_price * 100, 4 ))
		if buy_profit  > 0:
			log = str(buy_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(buy_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	if flag["position"]["side"] == "SELL":
		flag["records"]["side"].append( "SELL" )
		flag["records"]["profit"].append( sell_profit )
		flag["records"]["return"].append( round( sell_profit / entry_price * 100, 4 ))
		if sell_profit > 0:
			log = str(sell_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(sell_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	return flag

# バックテストの集計用の関数
def backtest(flag):
	
	# 成績を記録したpandas DataFrameを作成
	records = pd.DataFrame({
		"Date"     :  pd.to_datetime(flag["records"]["date"]),
		"Profit"   :  flag["records"]["profit"],
		"Side"     :  flag["records"]["side"],
		"Rate"     :  flag["records"]["return"],
		"Periods"  :  flag["records"]["holding-periods"],
		"Slippage" :  flag["records"]["slippage"]
	})
	
	# 総損益の列を追加する
	records["Gross"] = records.Profit.cumsum()
	
	# 最大ドローダウンの列を追加する
	records["Drawdown"] = records.Gross.cummax().subtract(records.Gross)
	records["DrawdownRate"] = round(records.Drawdown / records.Gross.cummax() * 100,1)	

	# 買いエントリーと売りエントリーだけをそれぞれ抽出する
	buy_records = records[records.Side.isin(["BUY"])]
	sell_records = records[records.Side.isin(["SELL"])]
	
	# 月別のデータを集計する
	records["月別集計"] = pd.to_datetime( records.Date.apply(lambda x: x.strftime('%Y/%m')))
	grouped = records.groupby("月別集計")
	
	month_records = pd.DataFrame({
		"Number"   :  grouped.Profit.count(),
		"Gross"    :  grouped.Profit.sum(),
		"Rate"     :  round(grouped.Rate.mean(),2),
		"Drawdown" :  grouped.Drawdown.max(),
		"Periods"  :  grouped.Periods.mean()
		})
	
	print("バックテストの結果")
	print("-----------------------------------")
	print("買いエントリの成績")
	print("-----------------------------------")
	print("トレード回数       :  {}回".format( len(buy_records) ))
	print("勝率               :  {}%".format(round(len(buy_records[buy_records.Profit>0]) / len(buy_records) * 100,1)))
	print("平均リターン       :  {}%".format(round(buy_records.Rate.mean(),2)))
	print("総損益             :  {}円".format( buy_records.Profit.sum() ))
	print("平均保有期間       :  {}足分".format( round(buy_records.Periods.mean(),1) ))
	
	print("-----------------------------------")
	print("売りエントリの成績")
	print("-----------------------------------")
	print("トレード回数       :  {}回".format( len(sell_records) ))
	print("勝率               :  {}%".format(round(len(sell_records[sell_records.Profit>0]) / len(sell_records) * 100,1)))
	print("平均リターン       :  {}%".format(round(sell_records.Rate.mean(),2)))
	print("総損益             :  {}円".format( sell_records.Profit.sum() ))
	print("平均保有期間       :  {}足分".format( round(sell_records.Periods.mean(),1) ))
	
	print("-----------------------------------")
	print("総合の成績")
	print("-----------------------------------")
	print("全トレード数       :  {}回".format(len(records) ))
	print("勝率               :  {}%".format(round(len(records[records.Profit>0]) / len(records) * 100,1)))
	print("平均リターン       :  {}%".format(round(records.Rate.mean(),2)))
	print("平均保有期間       :  {}足分".format( round(records.Periods.mean(),1) ))
	print("")
	print("最大の勝ちトレード :  {}円".format(records.Profit.max()))
	print("最大の負けトレード :  {}円".format(records.Profit.min()))
	print("最大ドローダウン   :  {0}円 / {1}%".format(-1 * records.Drawdown.max(), -1 * records.DrawdownRate.loc[records.Drawdown.idxmax()]  ))
	print("利益合計           :  {}円".format( records[records.Profit>0].Profit.sum() ))
	print("損失合計           :  {}円".format( records[records.Profit<0].Profit.sum() ))
	print("")
	print("最終損益           :  {}円".format( records.Profit.sum() ))
	print("手数料合計         :  {}円".format( -1 * records.Slippage.sum() ))
	
	print("-----------------------------------")
	print("月別の成績")
	
	for index , row in month_records.iterrows():
		print("-----------------------------------")
		print( "{0}年{1}月の成績".format( index.year, index.month ) )
		print("-----------------------------------")
		print("トレード数         :  {}回".format( row.Number.astype(int) ))
		print("月間損益           :  {}円".format( row.Gross.astype(int) ))
		print("平均リターン       :  {}%".format( row.Rate ))
		print("月間ドローダウン   :  {}円".format( -1 * row.Drawdown.astype(int) ))

	
	# ログファイルの出力
	file =  open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
	file.writelines(flag["records"]["log"])

	# 損益曲線をプロット
	plt.plot( records.Date, records.Gross )
	plt.xlabel("Date")
	plt.ylabel("Balance")
	plt.xticks(rotation=50) # X軸の目盛りを50度回転
	
	plt.show()
	


# ここからメイン処理

# 価格チャートを取得
price = get_price(chart_sec,after=1451606400)

flag = {
	"order":{
		"exist" : False,
		"side" : "",
		"price" : 0,
		"count" : 0
	},
	"position":{
		"exist" : False,
		"side" : "",
		"price": 0,
		"count":0
	},
	"records":{
		"date":[],
		"profit":[],
		"return":[],
		"side":[],
		"holding-periods":[],
		"slippage":[],
		"log":[]
	}
}


last_data = []
i = 0
while i < len(price):

	# ドンチャンの判定に使う期間分の安値・高値データを準備する
	if len(last_data) < buy_term or len(last_data) < sell_term:
		last_data.append(price[i])
		flag = log_price(price[i],flag)
		time.sleep(wait)
		i += 1
		continue
	
	data = price[i]
	flag = log_price(data,flag)
	
	
	if flag["order"]["exist"]:
		flag = check_order( flag )
	elif flag["position"]["exist"]:
		flag = close_position( data,last_data,flag )
	else:
		flag = entry_signal( data,last_data,flag )
	
	last_data.append( data )
	i += 1
	time.sleep(wait)


print("--------------------------")
print("テスト期間:")
print("開始時点 : " + str(price[0]["close_time_dt"]))
print("終了時点 : " + str(price[-1]["close_time_dt"]))
print(str(len(price)) + "件のローソク足データで検証")
print("--------------------------")

backtest(flag)

これでドンチャン・ブレイクアウトの自動売買ロジックに、上値のブレイクアウトと下値のブレイクアウトとで異なる期間を設定できるようになりました。

またブレイクアウトの判定基準について、高値・安値を使うことも、終値を使うこともできるようになりました。

コードの解説

今までの「最もシンプルなドンチャン・ブレイクアウト」では、買い・売りともに同じ期間(例えば30期間)を使っていました。そのため、ブレイクアウトを判定する際に、比較に使うための過去のローソク足データは、配列に30個ぴったり用意しておけばOKでした。

しかし買い・売りで別々の期間を設定する場合は、そうはいきません。そこで、ドンチャンブレイクアウトを判定するための関数を以下のように改良します。

# ドンチャンブレイクを判定する関数
def donchian( data,last_data ):
	
	highest = max(i["high_price"] for i in last_data[ (-1* buy_term): ])
	if data[ judge_price["BUY"] ] > highest:
		return {"side":"BUY","price":highest}
	
	lowest = min(i["low_price"] for i in last_data[ (-1* sell_term): ])
	if data[ judge_price["SELL"] ] < lowest:
		return {"side":"SELL","price":lowest}
	
	return {"side" : None , "price":0}

ここでは、過去のローソク足データ(last_data)を30個丁度にキープするのではなく、どんどん溜めていって、後ろから欲しい数だけを取り出すようにしています。

例えば、上値ブレイクアウトの基準が30期間、下値ブレイクアウトの基準が20期間であれば、買いエントリーの判定では last_data の後ろから30コを取り出し、売りエントリーの判定では last_data の後ろから20コを取り出すようにしています。

ここで新しく使っているのが、スライス(:)という記述方法です。

スライスとは

スライスは、配列データの範囲を指定したり、pandasの表データで行や列の範囲を指定するときに、非常に便利な記述方法です。以下のように書くことで、配列や表(行・列)の範囲を指定することができます。

[2:5] 3番目~5番目まで
[:5]   先頭~5番目まで
[3:]   4番目~最後まで
[-3:] 後ろから3番目~最後まで

プログラミングの世界では、先頭は0番目から数えるので、指定した数字から1つズレる点に注意してください。最後はズレません。

number = [0,1,2,3,4,5,6,7]

print( number[2:5] )
[2, 3, 4]

print( number[:5] )
[0, 1, 2, 3, 4]

print( number[3:] )
[3, 4, 5, 6, 7]

print( number[-3:] )
[5, 6, 7]

そのため、上値ブレイクアウトの判定に必要な期間分のローソク足データは、以下のように指定することができます。

last_data[ (-1* buy_term): ]

この知識を使うと、ブレイクアウトの判定に必要な期間のデータを取り出しその中から最大値を抽出する、というコードを以下の1行で書くことができます。

highest = max(i["high_price"] for i in last_data[ (-1* buy_term): ])

2.ブレイクアウトの判定基準に「終値」を使えるようにする

これは簡単ですね。
変更前のコードで、data["high_price"] と指定していた部分を変数にすればいいだけです。

# 変更前
if data["high_price"] > highest:
	return {"side":"BUY","price":highest}

# 変更後
#--------設定項目--------
judge_price={
  "BUY" : "high_price", # high_price か close_price を指定
  "SELL": "low_price"   # low_price  か close_price を指定
}
#-----------------------

if data[ judge_price["BUY"] ] > highest:
	return {"side":"BUY","price":highest}

3.メイン処理のループ文の変更

最後にメイン処理のWhile文ループの中身を修正します。

# 変更前
last_data = []
i = 0
while i < len(price):

	# ドンチャンの判定に使う期間分の安値・高値データを準備する
	if len(last_data) < term:
		(略)
		continue

# 変更後
	# ドンチャンの判定に使う期間分の安値・高値データを準備する
	if len(last_data) < buy_term or len(last_data) < sell_term:
		(略)
		continue

売り・買いエントリーで別々の期間を使用することになるので、最初に buy_term と sell_term の両方の期間分を超えるまでデータを蓄積します。

# 変更前
	# 過去データを30個に保つために先頭を削除
	del last_data[0]

# 変更後
	(削除)

またスライスを使うことで、過去のローソク足データをぴったりn個にキープしておく必要がなくなったので、上記の行は削除しておきます。これで完成です!

あとはログファイルの出力が「直近の安値が~」「直近の高値が~」になっているので、そこは各自で修正しておいてください。(上記のコードでは修正済)

実行結果

試しに1時間足で、買いエントリーは30期間の最高値ブレイクアウト、売りエントリーは20期間の最安値ブレイクアウト、各ブレイクアウトの判定には終値を使う、という場合の成績を見てみましょう。

# パラメーター設定
chart_sec = 3600
buy_term =  30
sell_term = 20
judge_price={
  "BUY" : "close_price",
  "SELL": "close_price"
}

すると以下のようになります。

損益曲線

成績

--------------------------
テスト期間:
開始時点 : 2017/08/15 01:00
終了時点 : 2018/04/22 08:00
6000件のローソク足データで検証
--------------------------
バックテストの結果
-----------------------------------
買いエントリの成績
-----------------------------------
トレード回数       :  40回
勝率               :  52.5%
平均リターン       :  4.39%
総損益             :  1628677円
平均保有期間       :  70.1足分
-----------------------------------
売りエントリの成績
-----------------------------------
トレード回数       :  40回
勝率               :  52.5%
平均リターン       :  1.77%
総損益             :  1223424円
平均保有期間       :  74.7足分
-----------------------------------
総合の成績
-----------------------------------
全トレード数       :  80回
勝率               :  52.5%
平均リターン       :  3.08%
平均保有期間       :  72.4足分

最大の勝ちトレード :  545284円
最大の負けトレード :  -220610円
最大ドローダウン   :  -381395円 / -13.2%
利益合計           :  4905987円
損失合計           :  -2053886円

最終損益           :  2852101円
手数料合計         :  -84811円
-----------------------------------
月別の成績
-----------------------------------
2017年8月の成績
-----------------------------------
トレード数         :  4回
月間損益           :  -27411円
平均リターン       :  -1.34%
月間ドローダウン   :  -13500円
-----------------------------------
2017年9月の成績
-----------------------------------
トレード数         :  10回
月間損益           :  88980円
平均リターン       :  1.96%
月間ドローダウン   :  -20785円
-----------------------------------
2017年10月の成績
-----------------------------------
トレード数         :  10回
月間損益           :  86498円
平均リターン       :  2.21%
月間ドローダウン   :  -92319円
-----------------------------------
2017年11月の成績
-----------------------------------
トレード数         :  9回
月間損益           :  375451円
平均リターン       :  5.46%
月間ドローダウン   :  -114141円
-----------------------------------
2017年12月の成績
-----------------------------------
トレード数         :  6回
月間損益           :  988615円
平均リターン       :  8.9%
月間ドローダウン   :  -220610円
-----------------------------------
2018年1月の成績
-----------------------------------
トレード数         :  14回
月間損益           :  425674円
平均リターン       :  1.03%
月間ドローダウン   :  -381395円
-----------------------------------
2018年2月の成績
-----------------------------------
トレード数         :  11回
月間損益           :  374125円
平均リターン       :  3.46%
月間ドローダウン   :  -316547円
-----------------------------------
2018年3月の成績
-----------------------------------
トレード数         :  9回
月間損益           :  305010円
平均リターン       :  3.1%
月間ドローダウン   :  -327540円
-----------------------------------
2018年4月の成績
-----------------------------------
トレード数         :  7回
月間損益           :  235159円
平均リターン       :  3.93%
月間ドローダウン   :  -38818円

※ ここでの月間ドローダウンは、その月に経験している継続中の最大ドローダウンのことです。その月だけのドローダウンを計算しているわけではありません。

このように、同じドンチャン・ブレイクアウトのロジックでも、買い判定の期間、売り判定の期間、ブレイクアウトの判定を高値(安値)でするか、終値でするか、などの設定によってかなり異なる結果になります。

しかしどのようなパラメーター設定が最適なのかを手作業で探すのは困難です。以下のようなパラメーター設定だけでも、その組み合わせの数は膨大だからです。

最適なパラメーターの組み合わせ

1)使用する時間軸
⇒ 15分足/30分足/1時間足/2時間足/6時間足

2)上値ブレイクアウトの判定期間
⇒ 10/15/20/25/30/35/40/45

3)下値ブレイクアウトの判定期間
⇒  10/15/20/25/30/35/40/45

4)判定に使用する価格
⇒  高値・安値/終値・終値

上記の組み合わせを試すだけでも、そのパターンは640通りにもなります。ちょっと手作業で1つ1つ結果をテストする気にはなりませんよね(笑)

こういうときこそpythonの出番です! 次回の記事では、pythonで自動的に最適なパラメーターを探索する方法を解説します!

次回記事:一番成績のいい最適なパラメーターを自動で探索する

Pandasを使ってBTCFXの自動売買BOTの月別の成績を集計しよう!

前回の記事では、ドンチャンブレイクアウトの自動売買BOTの成績を、matplotlibを使って視覚的なグラフにする方法や、最大ドローダウンを計算する方法を説明しました。

しかし自動売買BOTの安定性を評価するためには、最終的な損益だけでなく途中の経過、つまり月別の成績を把握しておくことも必須です。

今回の記事では、これらの成績(平均リターン・ドローダウン・勝率など)を月別に集計する方法を解説します!

Pandasを使ってデータを集計しよう!

前回までのやり方の問題点

前回までの記事では、なるべくpythonのプログラミング初心者でもわかりやすいように、すべてのトレード成績を配列に記録していました。

flag変数に以下のような配列を用意して、ポジションを手仕舞うたびに各トレードの成績や指標を計算し、各配列に append していましたよね。

flag = {
	"records":{
		"buy-count": 0,
		"buy-winning" : 0,
		"buy-return":[],
		"buy-profit": [],
		"buy-holding-periods":[],
		
		"sell-count": 0,
		"sell-winning" : 0,
		"sell-return":[],
		"sell-profit":[],
		"sell-holding-periods":[],
		
		"drawdown": 0,
		"date":[],
		"gross-profit":[0],
		"slippage":[],
		"log":[]
	}
}

しかしこのやり方だと、買いエントリーと売りエントリーに分けて成績を記録しているため、違う切り口でデータを集計したいときに凄く不便です。

例えば、「売り・買いで分けずに、月別の平均リターンや勝率を検証したい」と思った場合、売り・買いそれぞれのデータを(順番がおかしくならないように)くっつけて、それを日付で分割する、というややこしい処理をしなければなりません。

新しく月別の勝率を記録する変数を用意する方法もありますが、そうするとflag変数がどんどん増えていってしまい、pythonコードがどんどん長くややこしくなります。

もっとシンプルにデータを記録しておいて、あとから欲しい数字だけを上手に集計する方法はないでしょうか?

表のような形式で成績を記録する

もし以下のような表形式で各トレードの成績を記録しておけば、どうでしょうか? Excelのシートのようなイメージですね。

日付 損益 方向 保有期間
2018-02-04 20143円 買いエントリ 6足分
2018-02-04 22121円 売りエントリ 14足分
2018-02-05 33412円 売りエントリ 13足分
2018-03-06 -2312円 買いエントリ 15足分
2018-03-08 -16125円 売りエントリ 9足分

もし、このような表データを1つの変数として保持することができ、後から以下のような計算ができれば便利です。

————————————————————–
例)
・B列の「損益」のうちC列が「買い」のものだけを合計する
・A列の日付が2018-02のものだけで、B列の「損益」の平均値を求める
・後からE列を追加してそこに左列の損益の合計値(累積和)を追加する
・B列の「損益」のうち値がプラスのもの(=勝ち数)だけをカウントする
————————————————————–

このようなことは、Excelシートだと簡単にできますよね。同じことがpythonでも出来ないでしょうか?

実は、全く同じようなことがpythonでできるライブラリがあります。それがPandasデータフレームです。

Pandasは表計算ソフトのようなもの

Pythonを勉強しはじめた初心者の方でも、「Pandas」という言葉を聞いたことがあるかもしれません。「なんだか上級者向きの難しい奴でしょ?」というイメージをお持ちの方も多いかもしれません。

しかしPandasは全く難しくありません。要するに「Excelのような表計算ソフトと同じようなものだ」と思えば、文系の方でも少し親近感が湧きますよね。

この記事では、バックテストのデータ集計に必要なpandasの使い方を、1回でほぼ全て習得することを目指します! ぜひ今回の記事でpandasの使い方をマスターしてしまいましょう!

Pandasの使い方をマスターする

1)まずは表型のデータを作ってみよう!

pandasでは、複数行・複数列にわたる表形式のデータのことを「DataFrame」といいます。まずは練習としてDataFrame型の変数を1つ作ってみましょう!

今回は、以下のような5回分のトレード成績を記録した配列を用意しました。これは先ほど表の例と全く同じものです。今回はこちらを例に説明していきます。


# 今回の練習で使う5回分のトレード成績のサンプル

Date = ["2018-02-04","2018-02-04","2018-02-05","2018-03-06","2018-03-08"]
Profit = [20143,22121,33412,-2312,-16125]
Side = ["BUY","SELL","SELL","BUY","SELL"]
Rate = [0.024,0.012,0.014,-0.022,-0.019]
Periods = [6,14,13,15,9]

まずは上記のように、「日付」「各トレードの損益」「エントリーの方向」「保有期間」など、表を作るために必要な最低限の情報を記録した配列を用意します。

「トレード回数」「ドローダウン」「総損益」「買いエントリーの勝率」など、後から表をもとに計算できる数字は、この時点で準備する必要はありません。つまりトレードするたびに記録する数字は、上記の5種類だけでOKということです。

すべてのトレードが終わったら、上記の配列をくっつけて1つのDataFrame型の表データに変換します。それが以下のコードです。

import pandas as pd

# 上の配列データをくっつけて1つの表データにする

records = pd.DataFrame({
	"Date":pd.to_datetime(Date), # 文字を日付型に変換
	"Profit":Profit,
	"Side":Side,
	"Rate":Rate,
	"Periods":Periods
})

各配列を縦向きの「列」として結合し、1つのDataFrame型の表にして records という名前の変数に入れています。

これを公式にすると、以下のような感じですね。

表型の変数名 = pd.DataFrame({
	"列名" : データ配列 ,
	"列名" : データ配列 ,
	"列名" : データ配列 ,
	"列名" : データ配列
})

日付データだけは、pd.to_datetime()を使って、テキスト型から日付型に変換しておきます。その理由は、前回の記事で解説しています。

さて、これで先ほどイメージした表データをpythonで作ることができました! 念のためにprintしてみましょう!

print( records )

以下のようになります。

▽ 上記のコードで作った配列データ

▽ 今回の練習で使う表

欲しい数字だけを集計して取り出そう!

では、次にこの表データ(records)から「欲しい数字」を取り出していきましょう。 以下、よく使うパターンをまとめて列挙していきます。

1.損益データだけを取り出す

print( records.Profit )

表名.列名で取り出す

列名が英文字の場合は、records.profit のように「表名.列名」指定できます。 列名が日本語の場合は、records[“損益”] のように指定します。

2.平均リターンを計算する

print( records.Profit.mean() )

表名.列名.mean()で列の平均値を計算する

3.損益の合計を計算する

print( records.Profit.sum() )

表名.列名.sum()で列の合計値を計算する

3.特定の行のデータを取り出す

print( records.iloc[2] )

表名.iloc[2]で2行目だけを取り出す

locは、loc[ 行名, 列名 ] のかたちで表データの範囲を指定する記述方法です。一方、ilocは、iloc[ 行番号, 列番号 ]のかたちで表データ範囲を指定する記述方法です。

これらは以下のサイトに詳しい説明があります。

参考:at/iat/loc/ilocの使い方

4.買いエントリーのデータを取り出す

print( records[records.Side.isin(["BUY"])] )

表名[ 表名.列名.isin([値]) ]でSide列が”BUY”の行だけ取り出す

5.買いエントリーの損益だけを取り出す

print( records[records.Side.isin(["BUY"])].Profit )

表名[ 表名.列名.isin([値]) ].列名でSide列が”BUY”の行のProfit列を取り出す

6.買いエントリーの平均リターンを計算する

print( records[records.Side.isin(["BUY"])].Profit.mean() )

表名[ 表名.列名.isin([値]) ].mean() でSide列が BUYの行のProfit列の平均値を計算する

7.トレード回数をカウントする

# 全トレード数をカウントする
print( len(records) )

# 買いエントリーのトレード数をカウントする
print( len( records[records.Side.isin(["BUY"])] ))

len(表名) で行数を数える / len(表名[ 表名.列名.isin([値]) ])でSide列が BUYの行の行数を数える

len( 表名 )のかわりに、表名.列名.count() と書いても構いません。

8.勝率を計算する

# 全体の勝ちトレードの数をカウントする
print( len(records[records.Profit > 0]) )

# 全体の勝率を計算する
print( len(records[records.Profit > 0]) / len(records) * 100 )

Profit列がプラスの行だけ選択する / len(表名)で勝利トレードの行数を数えて全体の行数で割る

勝率は、勝ちトレード数 / 全体のトレード数 で計算できます。そのため、Profit列が0以上の行の数を数えて、それを全体の行数で割れば、全体の勝率が計算できます。

# 買いエントリーの勝ちトレードだけを取り出す
print( records[records.Side.isin(["BUY"]) & records.Profit>0] )

# 買いエントリーの勝ちトレード数をカウントする
print( len(records[records.Side.isin(["BUY"]) & records.Profit>0]) )

# 買いエントリーの勝率を計算する
print( len(records[records.Side.isin(["BUY"]) & records.Profit>0]) / len(records.Side.isin(["BUY"])) * 100 )

同じことを、Side列が BUY の行だけに限定して行えば、買いトレードの勝率を計算できます。

9.各日付時点での総損益を新しい列に追加する

records["Gross"] = records.Profit.cumsum()

表名.列名.cumsum() でProfit列の累積和を計算する

後から新しい列を追加したいときは、records[“新しい列名”] = 式 と書くことで新しい列を作成できます。また総損益とは、要するに利益の累積和のことなので、Profit列のcumsum()を計算します。

10.各日付時点のドローダウンを新しい列に追加する

records["Drawdown"] = records.Gross.cummax().subtract( records.Gross )

cummax()で累積最大値を調べることができます。累積最大値とはその行までの最大値のことです。例えば、Gross列の2行目までの累積最大値は42264、3行目までの累積最大値は75676になります。

最大ドローダウンとは、要するに「 n行目までの総利益の累積最大値から n行目の総利益を差し引いたもの」なので、上記の式で各行の最大ドローダウンを計算できます。引き算には、subtract()を使います。

終了

はい!
ここまでで、今までの記事で勉強してきたバックテストの成績の指標は、すべてpandasに置き換えることができました! 何となくpandasの便利さを実感できたのではないでしょうか?

月別にバックテスト結果を集計しよう!

pandasではさらにグルーピングという便利な機能があります。上記で作成した表データを、月ごとにグルーピングしてみましょう!

以下のように書くだけです。


records["月別集計"] = records.Date.apply(lambda x: x.strftime('%Y/%m'))
grouped = records.groupby("月別集計")

month_records = pd.DataFrame({
	"Gross" : grouped.Profit.sum(),
	"Rate" : round(grouped.Rate.mean()*100,1),
	"Drawdown" : grouped.Drawdown.max(),
	"Periods" : grouped.Periods.mean()
	})
print("---------------------------------------------------")
print( month_records )

groupby()でグループ分けできる!

groupby( 列名 )で特定の列を指定すると、その列の値が同じもの同士をグループ化することができます。例えば、groupby(“Side”)でグループ分けすれば、買いエントリーと売りエントリーとでデータをグループ分けできます。

groupby(列名)で列の値が同じもの同士をグループ化する

ここでは「月別」にデータをグループ分けします。ただし2017年3月と2018年3月が、同じ「3月」でグルーピングされると困りますよね。「〇年〇月」までをセットでグルーピングしなければなりません。

そのため、最初に以下の1行を書いています。

records["月別集計"] = records.Date.apply(lambda x: x.strftime('%Y/%m'))

ここでは、グループ分けをするために専用の新しい列を作っています。

もともと存在したDate列のデータを「年/月」のかたちに変形し、それを新しく作った「月別集計」という列に入れる、という処理をしています。

補足)lambda と apply() で全行のデータを変形する

ここには、apply()とlambdaという新しい記述方法が登場しています。

lambda は「 lambda 変数 : 式 」と書いて、変数を式のかたちに変形する記述方法です。表データに対して、apply( lambda 引数 : 式 ) と記述することで、表データのすべての行に対して、まとめて同じ式の処理を実行することができます。

以下の例をみるとわかりやすいでしょう。

(例)

number = pd.DataFrame({"data":[10,20,30,40,50]})
number = number.apply(lambda x : x+5)

# これを実行するとデータは{ "data":[15,25,35,45,55] } になります。

for文のような処理を、表(DataFrame型)に適用するための記述方法ですね。もっと興味がある方は、apply()やlambdaを調べて勉強してみてください。

さて、これで「年/月」という新しい列ができたので、この列を基準にグループ分けをします。それが以下の行です。

# グループ化する処理
grouped = records.groupby("月別集計")

この1行だけで、「月別集計」の列が同じもの同士をグループ化することができます。

グループ化したデータの使い方

グループ化したデータの結果は、以下のようなかたちで取り出すことができます。

# グループ化したデータから値を取り出す

grouped.Profit.sum()  # グループごとのProfit列の合計値の配列を返す
grouped.Profit.mean() # グループごとのProfit列の平均値の配列を返す
grouped.Profit.max()  # グループごとのProfit列の最大値の配列を返す

上記の結果を実行すれば、例えば、「2018/2」「2018/3」「2018/4」などのグループの、それぞれの利益の合計値、平均値、最大値を取り出すことができます。

月別のデータ集計といっても、欲しい数字は項目によって異なります。例えば、総損益なら合計値(sum)が必要ですし、平均リターンなら平均値(mean)が必要です。最大ドローダウンであれば最大値(max)が必要でしょう。

さて、それぞれ必要なデータを取り出したら、それを「列」として結合して新しい表(DataFrame)を作ります。配列データをくっつけて1つの表データにする方法は、1番最初に勉強しました。覚えていますよね?


month_records = pd.DataFrame({
	"Gross" : grouped.Profit.sum(),
	"Rate" : round(grouped.Rate.mean()*100,1),
	"Drawdown" : grouped.Drawdown.max(),
	"Periods" : grouped.Periods.mean()
	})
print("---------------------------------------------------")
print( month_records )

これを実行すると以下のようになります。

実行結果

これで最初の5回分のトレード成績を、以下のように月別にグルーピングできました。

月別集計 最大ドローダウン 平均リターン 月間損益 平均保有期間
2018/2 0円 1.7% 75676円 11期間
2018/3 -18437円 -2.0% -18437円 12期間

練習問題

ドンチャン・ブレイクアウトBOTの月別リターンを集計しよう!

では、最後にここまで勉強したpandasの知識を使って、前回までの記事で作成した「ドンチャンブレイクアウトBOT」のバックテストのコードを改良しましょう!

以下のように月別の成績を集計して表示できるようにします! 具体的なコードは以下の記事に記載していますが、まずはご自身で挑戦してみてください。


・解答のサンプル記事はこちら

Pandasの練習問題ーBTCFXのドンチャンブレイクアウトBOTの月別成績を集計する

前回の記事「Pandasを使ってBTCFXの自動売買BOTの月別の成績を集計しよう!」の練習問題の回答コードです。

1時間足での30期間ドンチャン・ブレイクアウトBOTの月別の成績を集計してみましょう! なお、今回のコードではPandasの集計方法を使って以下のような指標も加えておきました。

・全トレードの回数と勝率
・全トレードの平均リターン
・最大の勝ちトレードでの利益
・最大の負けトレードでの損失
・最終的な利益合計
・最終的な損失合計

これらも自動売買BOTの成績を評価する際に参考にしてみてください。


import requests
from datetime import datetime
import time
import matplotlib.pyplot as plt
import pandas as pd

#-----設定項目

chart_sec = 3600    # 1時間足を使用
term = 30           # 過去n期間の設定
wait = 0            # ループの待機時間
lot = 1             # BTCの注文枚数
slippage = 0.001    # 手数料・スリッページ


# CryptowatchのAPIを使用する関数
def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0:
				price.append({ "close_time" : i[0],
					"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
					"open_price" : i[1],
					"high_price" : i[2],
					"low_price" : i[3],
					"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None


# 時間と高値・安値をログに記録する関数
def log_price( data,flag ):
	log =  "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 高値: " + str(data["high_price"]) + " 安値: " + str(data["low_price"]) + "\n"
	flag["records"]["log"].append(log)
	return flag


# ドンチャンブレイクを判定する関数
def donchian( data,last_data ):
	
	highest = max(i["high_price"] for i in last_data)
	if data["high_price"] > highest:
		return {"side":"BUY","price":highest}
	
	lowest = min(i["low_price"] for i in last_data)
	if data["low_price"] < lowest:
		return {"side":"SELL","price":lowest}
	
	return {"side" : None , "price":0}


# ドンチャンブレイクを判定してエントリー注文を出す関数
def entry_signal( data,last_data,flag ):
	signal = donchian( data,last_data )
	if signal["side"] == "BUY":
		flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました\n".format(term,signal["price"],data["high_price"]))
		flag["records"]["log"].append(str(data["close_price"]) + "円で買いの指値注文を出します\n")

		# ここに買い注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "BUY"
		flag["order"]["price"] = round(data["close_price"] * lot)

	if signal["side"] == "SELL":
		flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました\n".format(term,signal["price"],data["low_price"]))
		flag["records"]["log"].append(str(data["close_price"]) + "円で売りの指値注文を出します\n")

		# ここに売り注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "SELL"
		flag["order"]["price"] = round(data["close_price"] * lot)

	return flag



# サーバーに出した注文が約定したか確認する関数
def check_order( flag ):
	
	# 注文状況を確認して通っていたら以下を実行
	# 一定時間で注文が通っていなければキャンセルする
	
	flag["order"]["exist"] = False
	flag["order"]["count"] = 0
	flag["position"]["exist"] = True
	flag["position"]["side"] = flag["order"]["side"]
	flag["position"]["price"] = flag["order"]["price"]
	
	return flag


# 手仕舞いのシグナルが出たら決済の成行注文 + ドテン注文 を出す関数
def close_position( data,last_data,flag ):
	
	flag["position"]["count"] += 1
	signal = donchian( data,last_data )
	
	if flag["position"]["side"] == "BUY":
		if signal["side"] == "SELL":
			flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました\n".format(term,signal["price"],data["low_price"]))
			flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n")
			
			# 決済の成行注文コードを入れる
			
			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			flag["records"]["log"].append("さらに" + str(data["close_price"]) + "円で売りの指値注文を入れてドテンします\n")
			
			# ここに売り注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "SELL"
			flag["order"]["price"] = round(data["close_price"] * lot)
			

	if flag["position"]["side"] == "SELL":
		if signal["side"] == "BUY":
			flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました\n".format(term,signal["price"],data["high_price"]))
			flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n")
			
			# 決済の成行注文コードを入れる
			
			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			flag["records"]["log"].append("さらに" + str(data["close_price"]) + "円で買いの指値注文を入れてドテンします\n")
			
			# ここに買い注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "BUY"
			flag["order"]["price"] = round(data["close_price"] * lot)
			
	return flag


# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	
	# 取引手数料等の計算
	entry_price = flag["position"]["price"]
	exit_price = round(data["close_price"] * lot)
	trade_cost = round( exit_price * slippage )
	
	log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n"
	flag["records"]["log"].append(log)
	flag["records"]["slippage"].append(trade_cost)
	
	# 手仕舞った日時と保有期間を記録
	flag["records"]["date"].append(data["close_time_dt"])
	flag["records"]["holding-periods"].append( flag["position"]["count"] )
	
	# 値幅の計算
	buy_profit = exit_price - entry_price - trade_cost
	sell_profit = entry_price - exit_price - trade_cost
	
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		flag["records"]["side"].append( "BUY" )
		flag["records"]["profit"].append( buy_profit )
		flag["records"]["return"].append( round( buy_profit / entry_price * 100, 4 ))
		if buy_profit  > 0:
			log = str(buy_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(buy_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	if flag["position"]["side"] == "SELL":
		flag["records"]["side"].append( "SELL" )
		flag["records"]["profit"].append( sell_profit )
		flag["records"]["return"].append( round( sell_profit / entry_price * 100, 4 ))
		if sell_profit > 0:
			log = str(sell_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(sell_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	return flag

# バックテストの集計用の関数
def backtest(flag):
	
	# 成績を記録したpandas DataFrameを作成
	records = pd.DataFrame({
		"Date"     :  pd.to_datetime(flag["records"]["date"]),
		"Profit"   :  flag["records"]["profit"],
		"Side"     :  flag["records"]["side"],
		"Rate"     :  flag["records"]["return"],
		"Periods"  :  flag["records"]["holding-periods"],
		"Slippage" :  flag["records"]["slippage"]
	})
	
	# 総損益の列を追加する
	records["Gross"] = records.Profit.cumsum()
	
	# 最大ドローダウンの列を追加する
	records["Drawdown"] = records.Gross.cummax().subtract(records.Gross)
	records["DrawdownRate"] = round(records.Drawdown / records.Gross.cummax() * 100,1)
	
	# 買いエントリーと売りエントリーだけをそれぞれ抽出する
	buy_records = records[records.Side.isin(["BUY"])]
	sell_records = records[records.Side.isin(["SELL"])]
	
	# 月別のデータを集計する
	records["月別集計"] = pd.to_datetime( records.Date.apply(lambda x: x.strftime('%Y/%m')))
	grouped = records.groupby("月別集計")
	
	month_records = pd.DataFrame({
		"Number"   :  grouped.Profit.count(),
		"Gross"    :  grouped.Profit.sum(),
		"Rate"     :  round(grouped.Rate.mean(),2),
		"Drawdown" :  grouped.Drawdown.max(),
		"Periods"  :  grouped.Periods.mean()
		})
	
	print("バックテストの結果")
	print("-----------------------------------")
	print("買いエントリの成績")
	print("-----------------------------------")
	print("トレード回数       :  {}回".format( len(buy_records) ))
	print("勝率               :  {}%".format(round(len(buy_records[buy_records.Profit>0]) / len(buy_records) * 100,1)))
	print("平均リターン       :  {}%".format(round(buy_records.Rate.mean(),2)))
	print("総損益             :  {}円".format( buy_records.Profit.sum() ))
	print("平均保有期間       :  {}足分".format( round(buy_records.Periods.mean(),1) ))
	
	print("-----------------------------------")
	print("売りエントリの成績")
	print("-----------------------------------")
	print("トレード回数       :  {}回".format( len(sell_records) ))
	print("勝率               :  {}%".format(round(len(sell_records[sell_records.Profit>0]) / len(sell_records) * 100,1)))
	print("平均リターン       :  {}%".format(round(sell_records.Rate.mean(),2)))
	print("総損益             :  {}円".format( sell_records.Profit.sum() ))
	print("平均保有期間       :  {}足分".format( round(sell_records.Periods.mean(),1) ))
	
	print("-----------------------------------")
	print("総合の成績")
	print("-----------------------------------")
	print("全トレード数       :  {}回".format(len(records) ))
	print("勝率               :  {}%".format(round(len(records[records.Profit>0]) / len(records) * 100,1)))
	print("平均リターン       :  {}%".format(round(records.Rate.mean(),2)))
	print("平均保有期間       :  {}足分".format( round(records.Periods.mean(),1) ))
	print("")
	print("最大の勝ちトレード :  {}円".format(records.Profit.max()))
	print("最大の負けトレード :  {}円".format(records.Profit.min()))
	print("最大ドローダウン   :  {0}円 / {1}%".format(-1 * records.Drawdown.max(),  -1 * records.DrawdownRate.loc[records.Drawdown.idxmax()]  ))
	print("利益合計           :  {}円".format( records[records.Profit>0].Profit.sum() ))
	print("損失合計           :  {}円".format( records[records.Profit<0].Profit.sum() ))
	print("")
	print("最終損益           :  {}円".format( records.Profit.sum() ))
	print("手数料合計         :  {}円".format( -1 * records.Slippage.sum() ))
	
	print("-----------------------------------")
	print("月別の成績")
	
	for index , row in month_records.iterrows():
		print("-----------------------------------")
		print( "{0}年{1}月の成績".format( index.year, index.month ) )
		print("-----------------------------------")
		print("トレード数         :  {}回".format( row.Number.astype(int) ))
		print("月間損益           :  {}円".format( row.Gross.astype(int) ))
		print("平均リターン       :  {}%".format( row.Rate ))
		print("月間ドローダウン   :  {}円".format( -1 * row.Drawdown.astype(int) ))

	
	# ログファイルの出力
	file =  open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
	file.writelines(flag["records"]["log"])

	# 損益曲線をプロット
	plt.plot( records.Date, records.Gross )
	plt.xlabel("Date")
	plt.ylabel("Balance")
	plt.xticks(rotation=50) # X軸の目盛りを50度回転
	
	plt.show()
	


# ここからメイン処理

# 価格チャートを取得
price = get_price(chart_sec,after=1483228800)

flag = {
	"order":{
		"exist" : False,
		"side" : "",
		"price" : 0,
		"count" : 0
	},
	"position":{
		"exist" : False,
		"side" : "",
		"price": 0,
		"count":0
	},
	"records":{
		"date":[],
		"profit":[],
		"return":[],
		"side":[],
		"holding-periods":[],
		"slippage":[],
		"log":[]
	}
}


last_data = []
i = 0
while i < len(price):

	# ドンチャンの判定に使う過去30期間分の安値・高値データを準備する
	if len(last_data) < term:
		last_data.append(price[i])
		flag = log_price(price[i],flag)
		time.sleep(wait)
		i += 1
		continue
	
	data = price[i]
	flag = log_price(data,flag)
	
	
	if flag["order"]["exist"]:
		flag = check_order( flag )
	elif flag["position"]["exist"]:
		flag = close_position( data,last_data,flag )
	else:
		flag = entry_signal( data,last_data,flag )
	
	
	# 過去データを30個に保つために先頭を削除
	del last_data[0]
	last_data.append( data )
	i += 1
	time.sleep(wait)


print("--------------------------")
print("テスト期間:")
print("開始時点 : " + str(price[0]["close_time_dt"]))
print("終了時点 : " + str(price[-1]["close_time_dt"]))
print(str(len(price)) + "件のローソク足データで検証")
print("--------------------------")

backtest(flag)

実行結果

以下が1時間足でシンプルな30期間のドンチャンブレイクBOTを、2017年8月~2018年4月にかけて運用した場合の成績です。

損益グラフ

ドンチャン・ブレイクアウトBOTの成績


(base) C:\Pydoc>python test.py
--------------------------
テスト期間:
開始時点 : 2017/08/13 08:00
終了時点 : 2018/04/20 15:00
6000件のローソク足データで検証
--------------------------
バックテストの結果
-----------------------------------
買いエントリの成績
-----------------------------------
トレード回数       :  49回
勝率               :  51.0%
平均リターン       :  3.04%
総損益             :  1090811円
平均保有期間       :  62.2足分
-----------------------------------
売りエントリの成績
-----------------------------------
トレード回数       :  49回
勝率               :  42.9%
平均リターン       :  0.85%
総損益             :  670134円
平均保有期間       :  56.7足分
-----------------------------------
総合の成績
-----------------------------------
全トレード数       :  98回
勝率               :  46.9%
平均リターン       :  1.94%
平均保有期間       :  59.5足分

最大の勝ちトレード :  545284円
最大の負けトレード :  -296151円
最大ドローダウン   :  -483332円 / -24.4%
利益合計           :  4591094円
損失合計           :  -2830149円

最終損益           :  1760945円
手数料合計         :  -104961円
-----------------------------------
月別の成績
-----------------------------------
2017年8月の成績
-----------------------------------
トレード数         :  6回
月間損益           :  1552円
平均リターン       :  0.08%
月間ドローダウン   :  -20319円
-----------------------------------
2017年9月の成績
-----------------------------------
トレード数         :  10回
月間損益           :  139626円
平均リターン       :  3.02%
月間ドローダウン   :  -20785円
-----------------------------------
2017年10月の成績
-----------------------------------
トレード数         :  12回
月間損益           :  119928円
平均リターン       :  2.12%
月間ドローダウン   :  -50845円
-----------------------------------
2017年11月の成績
-----------------------------------
トレード数         :  11回
月間損益           :  405276円
平均リターン       :  4.89%
月間ドローダウン   :  -138695円
-----------------------------------
2017年12月の成績
-----------------------------------
トレード数         :  10回
月間損益           :  175018円
平均リターン       :  1.68%
月間ドローダウン   :  -364745円
-----------------------------------
2018年1月の成績
-----------------------------------
トレード数         :  14回
月間損益           :  425925円
平均リターン       :  1.08%
月間ドローダウン   :  -247323円
-----------------------------------
2018年2月の成績
-----------------------------------
トレード数         :  13回
月間損益           :  284263円
平均リターン       :  2.58%
月間ドローダウン   :  -483332円
-----------------------------------
2018年3月の成績
-----------------------------------
トレード数         :  11回
月間損益           :  155253円
平均リターン       :  1.07%
月間ドローダウン   :  -425155円
-----------------------------------
2018年4月の成績
-----------------------------------
トレード数         :  11回
月間損益           :  54104円
平均リターン       :  0.3%
月間ドローダウン   :  -216376円

月別の成績で見ても、すべての月でプラスの成績が出ています。
これはなかなか悪くない結果ですね。

こちらの月別の成績は、あくまで「ポジションを手仕舞った日時」を基準に区切っている点に注意してください。例えば、2月の成績はプラスになっていますが、これは1月から持ち越したポジションで大きな利益が出ているからです。もし2月からBOTの稼働を開始していたら、2月の損益はマイナスになります。

コードの解説

基本的には、前回の記事「Pandasを使って自動売買BOTの成績を月別に集計しよう!」で、例として解説したコードをそのまま使っています。特に難しいところは無かったのではないでしょうか。

以下のところだけ、前回の記事には登場していなかった書き方なので、追加で解説しておきます。

最大ドローダウン率

最大ドローダウン率とは、ある行の最大ドローダウンの金額を、その行までの最大資産額で割った数字です。特定の行までの最大値は、cummax()で取得できます。そのため、以下のような式になります。

records["DrawdownRate"] = round(records.Drawdown / records.Gross.cummax() * 100,1)

ただし実際に最大ドローダウン率を表示するときには注意が必要です。単にドローダウン率の中から最大値を選ぶだけだと、最大ドローダウンの金額と時期が一致するとは限らないからです。

例えば、「最大ドローダウン率でみると序盤の2月の30%が最大だけど、金額ベースでみると4月の300万円が最大ドローダウンだ」ということもあり得ます。

このとき私たちが知りたいのは、最大ドローダウン率ではなく、「最大ドローダウン金額が最大だったときのドローダウン率」であるはずです。そのため、以下のように記述します。


# 最大ドローダウンの行番号を取得
records.Drawdown.idxmax()

# 最大ドローダウンと同じ行のドローダウン率を取得
records.DrawdownRate.loc[records.Drawdown.idxmax()]

これで最大ドローダウンと同じ時期のドローダウン率を取得することができました。

pandasの表データのfor文処理

for index,row in month_records.iterrows():
		print("-----------------------------------")
		print( "{0}年{1}月の成績".format( index.year, index.month ) )
		print("-----------------------------------")
		print("トレード数         :  {}回".format( row.Number.astype(int) ))
		print("月間損益           :  {}円".format( row.Gross.astype(int) ))
		print("平均リターン       :  {}%".format( row.Rate ))
		print("月間ドローダウン   :  {}円".format( -1 * row.Drawdown.astype(int) ))

pandasの表データを1行ずつループ処理したい場合、以下のように書くことができます。

for index,row in 表データ変数.iterrows():
		# 1行ずつ処理したい内容

index (インデックス)というのは、表データから特定の「行」を検索するときの「索引」のことです。

行には通常、1行ごとに先頭から 0,1,2,3,4,5.....などの行番号が割り振られていますが、それだと検索するときに不便です。縦の列にカラム名(ここでは Gross/Rate/Drawdownなど)があるのと同じように、行にも名前を付けることができます。それが index です。

今回のコードでは、pandas の groupby()で「月別」にデータをグループ化しているため、index には「2017/08」「2017/09」「2017/10」などのDate型のデータが指定されています。そのため、for文の中で index.year / index.month を指定することで、これらの数字を print できます。

pythonで資産曲線を作って自動売買BOTの成績を視覚的に評価しよう!

自動売買BOTの世界では、長期的に使える強いトレードシステムのことを「堅牢性(ロバスト性)がある」と表現します。わかりやすい言葉でいうと「環境の変化に強く安定したシステム」のことです。

「堅牢性がある」とは、単に期待リターンが高いというだけではありません。以下のような特徴を備えた売買ロジックのことをいいます。

1.期待リターンに対してリスクが低い
2.毎月の期待リターンのバラツキ具合が小さい
3.最大ドローダウンが小さい
4.最適化された固定値のパラメーターが少ない

前回の記事では、自動売買BOTの勝率・期待リターンなどを計算する方法を勉強しましたが、それだけではシステムの堅牢性は評価できません。

この記事では、ドンチャン・ブレイクアウトBOTの堅牢性を評価するために資産推移のグラフを描写して、さらに最大ドローダウンを計算する方法を解説していきます。

1)グラフをpythonで描画する方法

先にpythonでグラフを描画する方法だけ解説しておきましょう。

大丈夫、めちゃくちゃ簡単です! まずは練習としてBTCの日々の終値の価格グラフを作成してみましょう。以下のように書くだけです。

import matplotlib.pyplot as plt
import pandas as pd

# 日付の配列データ(x軸)
date_list = [
	"2018/1/1",
	"2018/1/2",
	"2018/1/3",
	"2018/1/4",
	"2018/1/5",
	"2018/1/6",
	"2018/1/7",
	"2018/1/8",
	"2018/1/9",
	"2018/1/10",
	"2018/1/11",
	"2018/1/12",
	"2018/1/13",
	"2018/1/14"
]

# BTC価格(ドル)のデータ(y軸)
price_list = [
	"13657",
	"14982",
	"15201",
	"15599",
	"17429",
	"17527",
	"16477",
	"15170",
	"14595",
	"14973",
	"13405",
	"13980",
	"14360",
	"13772"
]

# 日付データに変換
date_list = pd.to_datetime(date_list)

# plot(x,y)でx軸とy軸を指定
plt.plot( date_list,price_list )
plt.xlabel("Date")
plt.ylabel("Price")

# 描写を実行
plt.show()

グラフの描画には matplotlib というライブラリを使います。

これはAnacondaを使ってpythonをインストールした方であれば、最初から入っていますので、以下の1行を先頭に書くだけで使用できます。

import matplotlib.pyplot as plt

コードの解説

まずX軸とY軸にそれぞれ描画したいデータを配列として用意します。

それぞれの要素の数が一致していないといけないので、注意してください。今回は、日付データ(date_list)と価格データ(price_list)にそれぞれ14個ずつのデータを用意しました。

次に、日付データを「テキスト型」から「日付型」に変換します。なぜ日付型に変換するのかというと、テキスト型のままだと以下のようなデータがすべてX軸に等間隔で並んでしまうからです。

例)
date = ["1/1","1/2","3/2","3/4","4/2"]

テキスト型のままだとこの5個のデータは、同じ目盛り上に等しい間隔で並んでしまいます。1/1~1/2と、1/2~3/2が、同じ間隔で並んでしまうと、後ほど資産推移の曲線などを描画するときに困りますね。

これを避けるために、以下のコードで日付型に変換しておきます。

import pandas as pd
# 日付データに変換
date_list = pd.to_datetime(date_list)

ここでは変換にpandasというライブラリを使用しています。こちらもAnacondaでPythonをインストールした方であれば、最初からインストールされています。そうでない方はpipを使ってください。

実行結果

当サイトと同じ方法で(Anacondaで)Pythonをインストールしている方なら、すでにSpyderという実行環境がインストールされています。そのため、上記のコードを実行すると、自動的に以下のようなウィンドウが立ち上がり、グラフの描画が実行されます。

グラフの描画の説明はこれだけです!

2)資産曲線を描く

まずは資産カーブを描くために、ポジションを手仕舞うたびにその時点での損益(資産推移)を記録するコードを作っておきましょう。やり方は前回までの記事と全く同じです。

データを記録する準備

資産推移のグラフを描くために必要なデータは以下の2つです。

1)各ポジションを手仕舞った時点での資産額
2)各ポジションを手仕舞った時点の日付(時間)

まずは資産額と日付を記録するために、flag変数に以下を追加しておきましょう。

"records":{
	"buy-count": 0,
	"buy-winning" : 0,
	#(中略)
	
	"date":[], # 追加
	"gross-profit":[0],# 追加
	"slippage":[],
	"log":[]
	}

総損益を記録するための配列データには、初期値として0を入れておきます。

そして手仕舞いの関数が呼ばれるたびに、その時点での日付データと総損益を append で記録しておきます。 以下のように書けばいいですね。

# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	# 手仕舞った日時の記録
	flag["records"]["date"].append(data["close_time_dt"])

	if flag["position"]["side"] == "BUY":
		# 追記部分
		flag["records"]["gross-profit"].append( flag["records"]["gross-profit"][-1] + buy_profit )
	if flag["position"]["side"] == "SELL":
		# 追記部分
		flag["records"]["gross-profit"].append( flag["records"]["gross-profit"][-1] + sell_profit )

各トレードの成績を記録する関数(def records)で、ポジションを手仕舞ったときの日付データを記録しておきます。これはローソク足のデータを記録する変数(data)にある日時情報(data[“close_time_dt”])を代入するだけです。

また、総損益を記録する変数(gross-profit)の前回の値([-1])を取り出し、そこに今回の最新の損益(buy_profit/sell_profit)を足した数字を、appendで配列の末尾に追加します。

これで準備は完了です。

グラフを描画する

最後にバックテスト集計用の関数(def backtest)の中で資産カーブを描画するためのコードを追加します。

# 損益曲線をプロット
del flag["records"]["gross-profit"][0] # X軸/Y軸のデータ数を揃えるため、先頭の0を削除
date_list = pd.to_datetime( flag["records"]["date"] ) # 日付型に変換

plt.plot( date_list, flag["records"]["gross-profit"] )
plt.xlabel("Date")
plt.ylabel("Balance")
plt.xticks(rotation=50) # X軸の目盛りを50度回転
	
plt.show()

最初の例と全く同じですね。

flag[records][gross-profit]には、初期値として0円が入っており、日付データと数が一致しないため、del [0]で先頭の0円を削除しています。また、pd.to_datetime()で、日付のテキストデータを日付型に変換しています。

では実行してみましょう!

実行結果

以下は、30期間のドンチャン・ブレイクアウト(損切なし・ドテン売買)を、1時間足を使って 2017/8/9 ~ 2018/4/16 までの期間、6000足分のローソク足データを使って検証した結果です。

▽ 最終結果

▽ 資産カーブ

右肩上がりといえば右肩上がりですが、何度か大きなドローダウン(利益が大きく削られる局面)を経験していることがわかりますね。では、この売買システムの最大ドローダウンはどのくらいの大きさなのでしょうか?

3)最大ドローダウンを計算する

まずpythonで最大ドローダウンの計算を実装するにあたって、ドローダウンとは何なのか?を明確に定義しておきましょう。

自動売買BOTを自作でプログラミングする最大のメリットは、あらゆるテクニカル指標・売買ロジック・バックテストの指標などをコーディングする過程で、正確にその意味を理解できるようになることです。

私もそうですが、文系で何となく投資本を読んでいるだけだと、ドローダウンの意味1つとっても、「何となくこういうものだろう」というイメージだけで理解してしまいがちです。しかし自分でコーディングするようになると、どんな指標・ロジックでも明確な定義を考える習慣がつきます。

定義を考える習慣がつくようになると、単に概念を本で学んで覚えているだけの人に比べて、その指標の活用方法や弱点に気づきやすくなります。

1.ドローダウンの定義

基本的にはドローダウンは、「ある時点での資産額と、それ以前の資産の最大額との落差のこと」と定義できます。

例えば、以下のように資産が推移したと過程しましょう。

1月 50万円
2月 100万円
3月 200万円 ← 直近の最大額
4月 180万円
5月 150万円 ← 最大ドローダウン
6月 170万円
7月 220万円 ← 直近の最大額を更新
8月 190万円
9月 200万円
10月 170万円 ← 最大ドローダウン
11月 210万円

この場合、大きなドローダウンは2つ存在します。

1つ目は、3月のピーク200万円から5月の150万円までのドローダウン(-50万円)です。もう1つは、7月のピーク(220万円)から10月の170万円までのドローダウン(-50万円)です。

この場合、最大ドローダウンは-50万円、ということになります。%表記にすると、50/220 = 0.2272… なので約22.7%のドローダウンです。

2.Pythonコードで実装する

ではこれを実装していきましょう。

手始めに、とりあえず、いつものように記録用の変数を追加します。以下のような変数を追加しておきましょう。

"records":{
	"buy-count": 0,
	"buy-winning" : 0,
	# 追記
	"drawdown": 0,

次にポジションを閉じるたびに呼ばれる「各トレードの成績を記録する関数」に、以下の3行を追加しましょう。

# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	# (中略)
	# ドローダウンの計算
	drawdown =  max(flag["records"]["gross-profit"]) - flag["records"]["gross-profit"][-1] 
	if  drawdown  > flag["records"]["drawdown"]:
		flag["records"]["drawdown"] = drawdown

ポジションを閉じるたびに、その時点における過去の資産の最大額をmax(flag[records][gross-profit])で探します。そこから、現時点での資産額(flag[records][gross-profit][-1])を引いて現在のドローダウンを計算します。

そして現在のドローダウンが過去に記録したものよりも大きければ、flag[records][drawdown] に代入して値を更新します。これにより、最終的に「最大ドローダウン」だけが残ります。

そして最後に「バックテスト集計用の関数」に以下を追記します。

# バックテストの集計用の関数
def backtest(flag):
	# (中略)
	print("最大ドローダウン :  {0}円 / {1}%".format(-1 * flag["records"]["drawdown"], -1 * round(flag["records"]["drawdown"]/max(flag["records"]["gross-profit"])*100,1)  ))

ここで最大ドローダウンの金額を表示するとともに、そのパーセンテージも計算して表示しています。ではこれを実行してみましょう。

3.実行結果

以下のようになりました。

もし過去の半年間に1時間足で30期間のドンチャンブレイクアウトを採用していたら、この部分の最大ドローダウンが約48万円(-23%)だったということですね。

※ 2月中盤~後半にかけて、資産207万円 ⇒ 159万円のドローダウン

3)もし2月から自動売買BOTの運用を開始していた場合

では、もし2月からドンチャン・ブレイクアウトの自動売買BOTの運用を開始していた場合はどうなっていたのでしょうか? これを最後に検証して、今回の記事を終えておきましょう。

以下のコードの部分を変更するだけです。

・UNIX時間変換ツール

これを実行すると、以下のようになります。

▽ バックテストの結果

▽ 資産カーブ

1枚のBTCを売買したとすると、開始早々-20万円のドローダウンに見舞われることになります。その後、+20万円まで持ち直しますが、そこから-約50万円のドローダウンを食らいます。

長期的には期待値プラスの売買ロジックでも、このように開始早々、大きなマイナスを食らったり、数カ月連続してプラス転換しない場合もあります。このようなときに、長期的な期待値を信じて我慢できるかどうか、あるいは「相場が変わったからもうこのシステムは通用しないんだ…」と諦めるべきかは、判断が難しいところです。

パラメーターの過剰最適化がされていない、バックテストのデータ数が十分である、など、堅牢性の高いシステムであればあるほど、このような局面でも期待値を信じて我慢することが可能になります。この章のはじめで、サンプル数の重要性などを説明したのはそのためです。

今回の勉強で使ったコード

最後に今回の記事で作ったpythonコードを掲載しておきましょう。


import requests
from datetime import datetime
import time
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

#-----設定項目

chart_sec = 3600    # 1時間足を使用
term = 30           # 過去n足の設定
wait = 0            # ループの待機時間
lot = 1             # BTCの注文枚数
slippage = 0.001    # 手数料・スリッページ


# CryptowatchのAPIを使用する関数
def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0:
				price.append({ "close_time" : i[0],
					"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
					"open_price" : i[1],
					"high_price" : i[2],
					"low_price" : i[3],
					"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None


# 時間と高値・安値をログに記録する関数
def log_price( data,flag ):
	log =  "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 高値: " + str(data["high_price"]) + " 安値: " + str(data["low_price"]) + "\n"
	flag["records"]["log"].append(log)
	return flag


# ドンチャンブレイクを判定する関数
def donchian( data,last_data ):
	
	highest = max(i["high_price"] for i in last_data)
	if data["high_price"] > highest:
		return {"side":"BUY","price":highest}
	
	lowest = min(i["low_price"] for i in last_data)
	if data["low_price"] < lowest:
		return {"side":"SELL","price":lowest}
	
	return {"side" : None , "price":0}


# ドンチャンブレイクを判定してエントリー注文を出す関数
def entry_signal( data,last_data,flag ):
	signal = donchian( data,last_data )
	if signal["side"] == "BUY":
		flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました\n".format(term,signal["price"],data["high_price"]))
		flag["records"]["log"].append(str(data["close_price"]) + "円で買いの指値注文を出します\n")

		# ここに買い注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "BUY"
		flag["order"]["price"] = round(data["close_price"] * lot)

	if signal["side"] == "SELL":
		flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました\n".format(term,signal["price"],data["low_price"]))
		flag["records"]["log"].append(str(data["close_price"]) + "円で売りの指値注文を出します\n")

		# ここに売り注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "SELL"
		flag["order"]["price"] = round(data["close_price"] * lot)

	return flag



# サーバーに出した注文が約定したか確認する関数
def check_order( flag ):
	
	# 注文状況を確認して通っていたら以下を実行
	# 一定時間で注文が通っていなければキャンセルする
	
	flag["order"]["exist"] = False
	flag["order"]["count"] = 0
	flag["position"]["exist"] = True
	flag["position"]["side"] = flag["order"]["side"]
	flag["position"]["price"] = flag["order"]["price"]
	
	return flag


# 手仕舞いのシグナルが出たら決済の成行注文 + ドテン注文 を出す関数
def close_position( data,last_data,flag ):
	
	flag["position"]["count"] += 1
	signal = donchian( data,last_data )
	
	if flag["position"]["side"] == "BUY":
		if signal["side"] == "SELL":
			flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました\n".format(term,signal["price"],data["low_price"]))
			flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n")
			
			# 決済の成行注文コードを入れる
			
			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			flag["records"]["log"].append("さらに" + str(data["close_price"]) + "円で売りの指値注文を入れてドテンします\n")
			
			# ここに売り注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "SELL"
			flag["order"]["price"] = round(data["close_price"] * lot)
			

	if flag["position"]["side"] == "SELL":
		if signal["side"] == "BUY":
			flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました\n".format(term,signal["price"],data["high_price"]))
			flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n")
			
			# 決済の成行注文コードを入れる
			
			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			flag["records"]["log"].append("さらに" + str(data["close_price"]) + "円で買いの指値注文を入れてドテンします\n")
			
			# ここに買い注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "BUY"
			flag["order"]["price"] = round(data["close_price"] * lot)
			
	return flag


# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	
	# 取引手数料等の計算
	entry_price = flag["position"]["price"]
	exit_price = round(data["close_price"] * lot)
	trade_cost = round( exit_price * slippage )
	
	log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n"
	flag["records"]["log"].append(log)
	flag["records"]["slippage"].append(trade_cost)
	
	# 手仕舞った日時の記録
	flag["records"]["date"].append(data["close_time_dt"])
	
	# 値幅の計算
	buy_profit = exit_price - entry_price - trade_cost
	sell_profit = entry_price - exit_price - trade_cost
	
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		flag["records"]["buy-count"] += 1
		flag["records"]["buy-profit"].append( buy_profit )
		flag["records"]["gross-profit"].append( flag["records"]["gross-profit"][-1] + buy_profit )
		flag["records"]["buy-return"].append( round( buy_profit / entry_price * 100, 4 ))
		flag["records"]["buy-holding-periods"].append( flag["position"]["count"] )
		if buy_profit  > 0:
			flag["records"]["buy-winning"] += 1
			log = str(buy_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(buy_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	if flag["position"]["side"] == "SELL":
		flag["records"]["sell-count"] += 1
		flag["records"]["sell-profit"].append( sell_profit )
		flag["records"]["gross-profit"].append( flag["records"]["gross-profit"][-1] + sell_profit )
		flag["records"]["sell-return"].append( round( sell_profit / entry_price * 100, 4 ))
		flag["records"]["sell-holding-periods"].append( flag["position"]["count"] )
		if sell_profit > 0:
			flag["records"]["sell-winning"] += 1
			log = str(sell_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(sell_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	# ドローダウンの計算
	drawdown =  max(flag["records"]["gross-profit"]) - flag["records"]["gross-profit"][-1] 
	if  drawdown  > flag["records"]["drawdown"]:
		flag["records"]["drawdown"] = drawdown
	
	return flag

# バックテストの集計用の関数
def backtest(flag):
	
	buy_gross_profit = np.sum(flag["records"]["buy-profit"])
	sell_gross_profit = np.sum(flag["records"]["sell-profit"])
	
	print("バックテストの結果")
	print("--------------------------")
	print("買いエントリの成績")
	print("--------------------------")
	print("トレード回数     :  {}回".format(flag["records"]["buy-count"] ))
	print("勝率             :  {}%".format(round(flag["records"]["buy-winning"] / flag["records"]["buy-count"] * 100,1)))
	print("平均リターン     :  {}%".format(round(np.average(flag["records"]["buy-return"]),2)))
	print("総損益           :  {}円".format( np.sum(flag["records"]["buy-profit"]) ))
	print("平均保有期間     :  {}足分".format( round(np.average(flag["records"]["buy-holding-periods"]),1) ))
	
	print("--------------------------")
	print("売りエントリの成績")
	print("--------------------------")
	print("トレード回数     :  {}回".format(flag["records"]["sell-count"] ))
	print("勝率             :  {}%".format(round(flag["records"]["sell-winning"] / flag["records"]["sell-count"] * 100,1)))
	print("平均リターン     :  {}%".format(round(np.average(flag["records"]["sell-return"]),2)))
	print("総損益           :  {}円".format( np.sum(flag["records"]["sell-profit"]) ))
	print("平均保有期間     :  {}足分".format( round(np.average(flag["records"]["sell-holding-periods"]),1) ))
	
	print("--------------------------")
	print("総合の成績")
	print("--------------------------")
	print("最大ドローダウン :  {0}円 / {1}%".format(-1 * flag["records"]["drawdown"], -1 * round(flag["records"]["drawdown"]/max(flag["records"]["gross-profit"])*100,1)  ))
	print("総損益           :  {}円".format( flag["records"]["gross-profit"][-1] ))
	print("手数料合計       :  {}円".format( -1 * np.sum(flag["records"]["slippage"]) ))
	
	# ログファイルの出力
	file =  open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
	file.writelines(flag["records"]["log"])

	# 損益曲線をプロット
	del flag["records"]["gross-profit"][0]
	date_list = pd.to_datetime( flag["records"]["date"] )
	
	plt.plot( date_list, flag["records"]["gross-profit"] )
	plt.xlabel("Date")
	plt.ylabel("Balance")
	plt.xticks(rotation=50) # X軸の目盛りを50度回転
	
	plt.show()
	


# ここからメイン処理

# 価格チャートを取得
price = get_price(chart_sec,after=1483228800)

flag = {
	"order":{
		"exist" : False,
		"side" : "",
		"price" : 0,
		"count" : 0
	},
	"position":{
		"exist" : False,
		"side" : "",
		"price": 0,
		"count":0
	},
	"records":{
		"buy-count": 0,
		"buy-winning" : 0,
		"buy-return":[],
		"buy-profit": [],
		"buy-holding-periods":[],
		
		"sell-count": 0,
		"sell-winning" : 0,
		"sell-return":[],
		"sell-profit":[],
		"sell-holding-periods":[],
		
		"drawdown": 0,
		"date":[],
		"gross-profit":[0],
		"slippage":[],
		"log":[]
	}
}


last_data = []
i = 0
while i < len(price):

	# ドンチャンの判定に使う過去30足分の安値・高値データを準備する
	if len(last_data) < term:
		last_data.append(price[i])
		flag = log_price(price[i],flag)
		time.sleep(wait)
		i += 1
		continue
	
	data = price[i]
	flag = log_price(data,flag)
	
	
	if flag["order"]["exist"]:
		flag = check_order( flag )
	elif flag["position"]["exist"]:
		flag = close_position( data,last_data,flag )
	else:
		flag = entry_signal( data,last_data,flag )
	
	
	# 過去データを30個に保つために先頭を削除
	del last_data[0]
	last_data.append( data )
	i += 1
	time.sleep(wait)


print("--------------------------")
print("テスト期間:")
print("開始時点 : " + str(price[0]["close_time_dt"]))
print("終了時点 : " + str(price[-1]["close_time_dt"]))
print(str(len(price)) + "件のローソク足データで検証")
print("--------------------------")

backtest(flag)

BTCFXのドンチャンチャネルブレイクアウトの勝率をバックテストで検証する

前回までの記事で、過去チャートのバックテストで自動売買BOTの勝率や平均リターン、総損益を検証する基本的な方法がわかったと思います。

今回の記事では、「ドンチャン・ブレイクアウト」というより実践的な売買ロジックを使って、さらに高度な自動売買BOTの評価方法を解説していきたいと思います!

ドンチャンブレイクアウトとは

ブレイクアウト手法とは、狭い値幅での揉み合い(レンジ相場)をブレイクアウトした瞬間を捕まえてエントリーし、その後のトレンドに乗って利益を出す順張り(トレンドフォロー型)の手法のことをいいます。

ブレイクアウトを捉えるための手法はさまざまなものが開発されていますが、シストレなどの自動売買BOTの世界では、以下の2つが最も定番です。

1.ドンチャン・チャネル・ブレイクアウト

過去n期間の最高値・最安値を更新したときに、その方向にエントリーする手法です。

例えば、過去20期間の最高値を更新したときに「買い」でエントリーします。次に過去20日の最安値を更新したときに決済して手仕舞いますが、このときさらに反対方向にそのままエントリー(ドテン)する場合も多いです。

このシンプルな戦略の優位性は、トレーダーの必読本である「タートル流投資の魔術」や、ジョン・ヒルの「究極のトレーディングガイド」などの本に詳しい解説があります。

非常に単純な売買ロジックなので、プログラミングしやすく、シストレや自動売買BOTの勉強にも最適です。

2.オープニングレンジ・ブレイクアウト

オープニングレンジ・ブレイクアウトも同じく「どうやったらブレイクアウトの開始点を、シンプルな数字とロジックで発見できるか?」という視点で開発された手法です。

こちらは、過去n期間の値幅の平均値(ATR)を基準値として利用し、ある足が始値から(基準値の)X%以上動いたらその方向にエントリーします。例えば、過去3期間の安値–高値の平均値が10万円でXが70%とすると、次の足で始値から7万円上に動いた時点で買いでエントリーします。

前の足が陰線か陽線かで場合分けをして、買いと売りとで別々のX%のパラメーターを使用することが多いです。

この戦略については、ラリーウィリアムズの「短期売買法」や、先ほどのジョンヒルの「究極のトレーディングガイド」などの本に詳しい解説があります。

3.戦略の使い方

どちらも多くのトレーダーが知っている手法ですが、実際には、どの時間軸でどういうパラメーターを使うか、どういうトレンド判定の条件やフィルターと組み合わて「騙し」を除去するか、どういう手仕舞いの方法を選択するか、などによって全く違う成績になります。

これらの手法は、あくまで売買ロジックを考えるときにベースのアイデアとして使いやすいというだけで、それ自体が必勝法というわけではありません。詳しい戦略のカスタマイズ方法などは、今後、解説していく予定です。

今回はバックテストの方法の解説記事なので、まずは一番シンプルな「ドンチャン・ブレイクアウト」のロジックをpythonでコーディングするところから始めます。

ドンチャンブレイクアウトは非常に単純なロジックなので、今まで勉強した知識だけでも、pythonのプログラムコードを書くことができます。早速、やっていきましょう!

ドンチャン・ブレイクアウトのpythonコード

プログラミング初心者の方は、いきなり複雑なロジックを書こうとしてはいけません。まずは必要最小限のシンプルな骨組みの部分を作り、そこから徐々に条件などを足していくのがお勧めです。

例えば、買いと売りの両方を考えて混乱するならまずは買いエントリーだけ実装しましょう。エントリーと手仕舞いの両方を考えて混乱するなら、まずは「n足後に無条件で手仕舞う」といったシンプルなロジックで実装しましょう。

ではドンチャン・ブレイクアウトの最もシンプルなロジックとは何でしょうか? 今回は以下のように定義してみます。

1.今回実装するロジック

1)過去n期間の最高値を直近の足の高値が上回ったら(その足の終値の指値で)買いエントリーする。過去n期間の最安値を直近の足の安値が下回ったら売りエントリーする。
2)買いエントリーをした場合、過去n期間の最安値を更新したら手仕舞い、さらに売りエントリーする
3)売りエントリーをした場合、過去n期間の最高値を更新したら手仕舞い、さらに買いエントリーする

これだけです!

ではこれを「第4回 BOT作成編」で勉強した内容を前提としながらコーディングしてみましょう。

2.pythonコード

以下は、Cryptowatchから過去のローソク足を取得して、20期間のドンチャンブレイクアウトで売買シグナルや手仕舞いのタイミングを確認するためのコードです。



import requests
from datetime import datetime
import time


#-----設定項目

chart_sec = 3600  # 1時間足を使用
term = 20         # 過去n期間の設定
wait = 0          # ループの待機時間



# CryptowatchのAPIを使用する関数
def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0:
				price.append({ "close_time" : i[0],
					"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
					"open_price" : i[1],
					"high_price" : i[2],
					"low_price" : i[3],
					"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None


# 時間と始値・終値を表示する関数
def print_price( data ):
	print( "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 高値: " + str(data["high_price"]) + " 安値: " + str(data["low_price"]) )


# ドンチャンブレイクを判定する関数
def donchian( data,last_data ):
	
	highest = max(i["high_price"] for i in last_data)
	if data["high_price"] > highest:
		return {"side":"BUY","price":highest}
	
	lowest = min(i["low_price"] for i in last_data)
	if data["low_price"] < lowest:
		return {"side":"SELL","price":lowest}
	
	return {"side" : None , "price":0}


# ドンチャンブレイクを判定してエントリー注文を出す関数
def entry_signal( data,last_data,flag ):
	signal = donchian( data,last_data )
	if signal["side"] == "BUY":
		print("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました".format(term,signal["price"],data["high_price"]))
		print(str(data["close_price"]) + "円で買いの指値注文を出します")

		# ここに買い注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "BUY"

	if signal["side"] == "SELL":
		print("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました".format(term,signal["price"],data["low_price"]))
		print(str(data["close_price"]) + "円で売りの指値注文を出します")

		# ここに売り注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "SELL"

	return flag



# サーバーに出した注文が約定したか確認する関数
def check_order( flag ):
	
	# 注文状況を確認して通っていたら以下を実行
	# 一定時間で注文が通っていなければキャンセルする
	
	flag["order"]["exist"] = False
	flag["order"]["count"] = 0
	flag["position"]["exist"] = True
	flag["position"]["side"] = flag["order"]["side"]
	
	return flag


# 手仕舞いのシグナルが出たら決済の成行注文 + ドテン注文 を出す関数
def close_position( data,last_data,flag ):
	
	flag["position"]["count"] += 1
	signal = donchian( data,last_data )
	
	if flag["position"]["side"] == "BUY":
		if signal["side"] == "SELL":
			print("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました".format(term,signal["price"],data["low_price"]))
			print("成行注文を出してポジションを決済します")
			
			# 決済の成行注文コードを入れる
			
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			print("さらに" + str(data["close_price"]) + "円で売りの指値注文を入れてドテンします")
			
			# ここに売り注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "SELL"
			
			

	if flag["position"]["side"] == "SELL":
		if signal["side"] == "BUY":
			print("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました".format(term,signal["price"],data["high_price"]))
			print("成行注文を出してポジションを決済します")
			
			# 決済の成行注文コードを入れる
			
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			print("さらに" + str(data["close_price"]) + "円で買いの指値注文を入れてドテンします")
			
			# ここに買い注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "BUY"
			
	return flag


# ------------------------------
# ここからメイン処理
# ------------------------------

price = get_price(chart_sec)
last_data = []

flag = {
	"order":{
		"exist" : False,
		"side" : "",
		"count" : 0
	},
	"position":{
		"exist" : False,
		"side" : "",
		"count" : 0
	}
}

i = 0
while i < len(price):

	# ドンチャンの判定に使う過去n足分の安値・高値データを準備する
	if len(last_data) < term:
		last_data.append(price[i])
		print_price(price[i])
		time.sleep(wait)
		i += 1
		continue
	
	data = price[i]
	print_price(data)
	
	
	if flag["order"]["exist"]:
		flag = check_order( flag )
	elif flag["position"]["exist"]:
		flag = close_position( data,last_data,flag )
	else:
		flag = entry_signal( data,last_data,flag )
	
	
	# 過去データをn個ピッタリに保つために先頭を削除
	del last_data[0]
	last_data.append( data )
	i += 1
	time.sleep(wait)

基本的な仕組みは、第4回で作成した練習用のBOTと同じです。

data という変数に最新のローソク足のデータを入れて、last_data という変数に過去のローソク足のデータを入れて、それを比べることでエントリーシグナルの条件を満たしたどうかを判定しています。

1.ドンチャンブレイクアウトのデータ準備

ただしドンチャンブレイクアウトでは、過去n足(以下、20期間として説明します)の最高値をブレイクしたかどうかを判定する必要があるため、過去20期間分のローソク足データを保持する変数が必要です。

そこでメイン処理のループ文の最初に以下のような処理を入れます。

while i < len(price):
	# ドンチャンの判定に使う過去n足分の安値・高値データを準備する
	if len(last_data) < term:
		last_data.append(price[i])
		print_price(price[i])
		time.sleep(wait)
		i += 1
		continue

まず3行目の if len(last_data) < term: の部分で、last_dataに含まれているローソク足データの数が、20足分に達するまで上記の処理を実行するように指示しています。

continue は「これより下は実行せずにwhile文の先頭に戻る」という意味です。つまり、last_dataという変数に過去20足のローソク足データが保持されるまでは、この先の処理はおこなわず、この部分の処理だけをループで実行します。

いったん20足分のデータが溜まった後は、ループのたびに以下の処理を実行します。

# 過去データをn個ピッタリに保つために先頭を削除
del last_data[0]
last_data.append( data )

1行目のdel 変数名[0] で、last_data に含まれる先頭のデータを削除しています。さらに、2行目の last_data.append(data)で、最新のローソク足のデータを last_data の末尾に追加しています。

メイン処理のループのたびにこれを実行することで、last_data という変数の中身を、常に過去の新しい20期間分のローソク足データにぴったり揃えることができます。

2.ドンチャンブレイクアウトを判定する関数

# ドンチャンブレイクを判定する関数
def donchian( data,last_data ):
	
	highest = max(i["high_price"] for i in last_data)
	if data["high_price"] > highest:
		return {"side":"BUY","price":highest}
	
	lowest = min(i["low_price"] for i in last_data)
	if data["low_price"] < lowest:
		return {"side":"SELL","price":lowest}
	
	return {"side" : None , "price":0}

この関数で、ドンチアン・ブレイクアウトが発生したかどうか、どちらの方向に発生したかを判定しています。

ロングの方向にブレイクアウトが発生した場合は「BUY」シグナルを返し、ショートの方向にブレイクアウトが発生した場合は「SELL」シグナルを返します。また一緒に、ブレイクアウトされた最高値・最安値の価格を返す(return)ようにしておきます。

過去20足の最高値を更新したかどうか、を判定するロジックが以下の部分です。

highest = max(i["high_price"] for i in last_data)
if data["high_price"] > highest:
	return {"side":"BUY","price":highest}

1行目で、last_data に含まれる変数を1個ずつチェックし、["high_price"]だけを取り出した配列を作り、max()でそこから最大値を取得しています。

これはリスト内包表記というfor文を1行で書くための書き方ですが、少し難しく感じる方は、今まで勉強した知識を使って以下のように書いても構いません。

# 「リスト内包表記」で1行で書いた場合
highest = max(i["high_price"] for i in last_data)

# わかりやすく書いた場合
highest = 0
for i in last_data:
	if highest < i["high_price"]:
		highest = i["high_price"]

また、この最高値(highest)を、現在の最新のローソク足の高値(data["high_price"])と比べ、もし現在の最新のローソク足の高値の方が高ければ、ブレイクアウトの条件を満たしたことになります。

ブレイクアウトされていた場合は、その方向と価格をセットで返します。

3.エントリー注文を出す関数

def entry_signal( data,last_data,flag ):
	signal = donchian( data,last_data )
	if signal["side"] == "BUY":
		print("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました".format(term,signal["price"],data["high_price"]))
		print(str(data["close_price"]) + "円で買いの指値注文を出します")

		# ここに買い注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "BUY"

最初に1行目でさきほど作成した donchian() 関数を呼び、ドンチャンブレイクアウトが発生しているかどうかを確認します。返ってきたデータは signal という変数で受け取ります。

signal = donchian( data,last_data )

これで"BUY"が返ってくれば買いでエントリーし、"SELL"が返ってくれば売りでエントリーしています。

4.決済注文とドテン注文を出す関数

def close_position( data,last_data,flag ):
	flag["position"]["count"] += 1 # ポジションを持って何足経過してるかを計測
	signal = donchian( data,last_data )

	if flag["position"]["side"] == "BUY":
		if signal["side"] == "SELL":
			print("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました".format(term,signal["price"],data["low_price"]))
			print("成行注文を出してポジションを決済します")
			
			# 決済の成行注文コードを入れる
			
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			print("さらに" + str(data["close_price"]) + "円で売りの指値注文を入れてドテンします")
			
			# ここに売り注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "SELL"

こちらも基本的な手順は同じです。最初の1行目に signal = donchian() を実行して、ドンチャンブレイクアウトのシグナルを確認します。

もし買いポジションを持っている場合は、売り方向にブレイクアウトが発生した場合のみ、ポジションを決済します。

if flag["position"]["side"] == "BUY":
	if signal["side"] == "SELL":
		print("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました".format(term,signal["price"],data["low_price"]))
		print("成行注文を出してポジションを決済します")
		
		# 決済の成行注文コードを入れる
		
		flag["position"]["exist"] = False
		flag["position"]["count"] = 0

ポジションを手仕舞ったので、flag変数の[exist]をFalseに更新しておきます。

さらに、そのまま売り方向でのエントリーを試みます。これがドテン注文です。

print("さらに" + str(data["close_price"]) + "円で売りの指値注文を入れてドテンします")

# ここに売り注文のコードを入れる

flag["order"]["exist"] = True
flag["order"]["side"] = "SELL"

ドテンでエントリー注文を出したので、またflag変数の[order][exist]をTrueに更新しておきます。これで、次のwhile文では「注文が通ったかどうか確認する関数」が呼ばれることになります。

それでは最後に実行結果を見ておきましょう。

実行結果

練習問題

このドンチャンブレイクアウトのバックテスト(勝率の検証用)のコードを作り、以下の項目を計算してみましょう。コードの書き方は前回の記事で説明したのと全く同じです。

・トレード回数
・勝率
・平均リターン
・総損益
・平均保有期間

解答サンプルのコード

次回記事

上記の練習問題で、こちらのドンチャンブレイクアウトの勝率をバックテストで検証すると、以下のような成績になります。

これだけシンプルな売買ロジックでも、成績はそれほど悪くないように見えますね。

トレンドフォロー戦略は、一般論として勝率が低いです。トレードの相場の大半がレンジ相場なので、ブレイクアウトの方向に順張りするロジックは「騙し」に引っかかりやすく、勝率が低くなりがちです。その代わり、1度トレンドに乗れれば大きな利益を得られるため、勝率が50%を下回っても期待値は正(プラス)になる、というのが典型的な教科書の考え方です。

バックテストの注意点

しかしこのような戦略には、特有の弱点もあります。それはドローダウンが大きいことです。平均保有期間が長いため、途中で一時的に大きく資産が目減りする時期があるかもしれません。

また月別のリターンにかなり大きなバラつきがある可能性もあります。最終的な平均リターンが2%でも、例えば、その範囲がマイナス数十%までバラついている可能性もあります。

これらのリスクは、上記の成績表を見ているだけではわかりません。

そのため、次回の記事では、バックテストのコードをさらに改良して、資産カーブ(曲線グラフ)を描画したり、平均リターンのバラツキ具合をグラフにする方法を解説します!

【解答】ドンチャンチャネルブレイクアウトBOTの勝率や平均リターンを検証するコード

前回の記事「BTCFXでドンチャンブレイクアウトの勝率をバックテストで検証する」の練習問題の回答コードです。


import requests
from datetime import datetime
import time
import numpy as np


#-----設定項目

chart_sec = 3600    # 1時間足を使用
term = 30           # 過去n日の設定
wait = 0            # ループの待機時間
lot = 1             # BTCの注文枚数
slippage = 0.001    # 手数料・スリッページ


# CryptowatchのAPIを使用する関数
def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0:
				price.append({ "close_time" : i[0],
					"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
					"open_price" : i[1],
					"high_price" : i[2],
					"low_price" : i[3],
					"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None


# 時間と高値・安値をログに記録する関数
def log_price( data,flag ):
	log =  "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 高値: " + str(data["high_price"]) + " 安値: " + str(data["low_price"]) + "\n"
	flag["records"]["log"].append(log)
	return flag


# ドンチャンブレイクを判定する関数
def donchian( data,last_data ):
	
	highest = max(i["high_price"] for i in last_data)
	if data["high_price"] > highest:
		return {"side":"BUY","price":highest}
	
	lowest = min(i["low_price"] for i in last_data)
	if data["low_price"] < lowest:
		return {"side":"SELL","price":lowest}
	
	return {"side" : None , "price":0}


# ドンチャンブレイクを判定してエントリー注文を出す関数
def entry_signal( data,last_data,flag ):
	signal = donchian( data,last_data )
	if signal["side"] == "BUY":
		flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました\n".format(term,signal["price"],data["high_price"]))
		flag["records"]["log"].append(str(data["close_price"]) + "円で買いの指値注文を出します\n")

		# ここに買い注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "BUY"
		flag["order"]["price"] = round(data["close_price"] * lot)

	if signal["side"] == "SELL":
		flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました\n".format(term,signal["price"],data["low_price"]))
		flag["records"]["log"].append(str(data["close_price"]) + "円で売りの指値注文を出します\n")

		# ここに売り注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "SELL"
		flag["order"]["price"] = round(data["close_price"] * lot)

	return flag



# サーバーに出した注文が約定したか確認する関数
def check_order( flag ):
	
	# 注文状況を確認して通っていたら以下を実行
	# 一定時間で注文が通っていなければキャンセルする
	
	flag["order"]["exist"] = False
	flag["order"]["count"] = 0
	flag["position"]["exist"] = True
	flag["position"]["side"] = flag["order"]["side"]
	flag["position"]["price"] = flag["order"]["price"]
	
	return flag


# 手仕舞いのシグナルが出たら決済の成行注文 + ドテン注文 を出す関数
def close_position( data,last_data,flag ):
	
	flag["position"]["count"] += 1
	signal = donchian( data,last_data )
	
	if flag["position"]["side"] == "BUY":
		if signal["side"] == "SELL":
			flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました\n".format(term,signal["price"],data["low_price"]))
			flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n")
			
			# 決済の成行注文コードを入れる
			
			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			flag["records"]["log"].append("さらに" + str(data["close_price"]) + "円で売りの指値注文を入れてドテンします\n")
			
			# ここに売り注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "SELL"
			flag["order"]["price"] = round(data["close_price"] * lot)
			

	if flag["position"]["side"] == "SELL":
		if signal["side"] == "BUY":
			flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました\n".format(term,signal["price"],data["high_price"]))
			flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n")
			
			# 決済の成行注文コードを入れる
			
			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			flag["records"]["log"].append("さらに" + str(data["close_price"]) + "円で買いの指値注文を入れてドテンします\n")
			
			# ここに買い注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "BUY"
			flag["order"]["price"] = round(data["close_price"] * lot)
			
	return flag


# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	
	# 取引手数料等の計算
	entry_price = flag["position"]["price"]
	exit_price = round(data["close_price"] * lot)
	trade_cost = round( exit_price * slippage )
	
	log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n"
	flag["records"]["log"].append(log)
	flag["records"]["slippage"].append(trade_cost)
	
	# 値幅の計算
	buy_profit = exit_price - entry_price - trade_cost
	sell_profit = entry_price - exit_price - trade_cost
	
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		flag["records"]["buy-count"] += 1
		flag["records"]["buy-profit"].append( buy_profit )
		flag["records"]["buy-return"].append( round( buy_profit / entry_price * 100, 4 ))
		flag["records"]["buy-holding-periods"].append( flag["position"]["count"] )
		if buy_profit  > 0:
			flag["records"]["buy-winning"] += 1
			log = str(buy_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(buy_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	if flag["position"]["side"] == "SELL":
		flag["records"]["sell-count"] += 1
		flag["records"]["sell-profit"].append( sell_profit )
		flag["records"]["sell-return"].append( round( sell_profit / entry_price * 100, 4 ))
		flag["records"]["sell-holding-periods"].append( flag["position"]["count"] )
		if sell_profit > 0:
			flag["records"]["sell-winning"] += 1
			log = str(sell_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(sell_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	return flag

# バックテストの集計用の関数
def backtest(flag):
	
	buy_gross_profit = np.sum(flag["records"]["buy-profit"])
	sell_gross_profit = np.sum(flag["records"]["sell-profit"])
	
	print("バックテストの結果")
	print("--------------------------")
	print("買いエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["buy-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["buy-winning"] / flag["records"]["buy-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["buy-return"]),2)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["buy-profit"]) ))
	print("平均保有期間  :  {}足分".format( round(np.average(flag["records"]["buy-holding-periods"]),1) ))
	
	print("--------------------------")
	print("売りエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["sell-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["sell-winning"] / flag["records"]["sell-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["sell-return"]),2)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) ))
	print("平均保有期間  :  {}足分".format( round(np.average(flag["records"]["sell-holding-periods"]),1) ))
	
	print("--------------------------")
	print("総合の成績")
	print("--------------------------")
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) + np.sum(flag["records"]["buy-profit"]) ))
	print("手数料合計    :  {}円".format( np.sum(flag["records"]["slippage"]) ))
	
	# ログファイルの出力
	file =  open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
	file.writelines(flag["records"]["log"])



# ここからメイン処理

# 価格チャートを取得
price = get_price(chart_sec,after=1483228800)

flag = {
	"order":{
		"exist" : False,
		"side" : "",
		"price" : 0,
		"count" : 0
	},
	"position":{
		"exist" : False,
		"side" : "",
		"price": 0,
		"count":0
	},
	"records":{
		"buy-count": 0,
		"buy-winning" : 0,
		"buy-return":[],
		"buy-profit": [],
		"buy-holding-periods":[],
		
		"sell-count": 0,
		"sell-winning" : 0,
		"sell-return":[],
		"sell-profit":[],
		"sell-holding-periods":[],
		
		"slippage":[],
		"log":[]
	}
}


last_data = []
i = 0
while i < len(price):

	# ドンチャンの判定に使う過去30日分の安値・高値データを準備する
	if len(last_data) < term:
		last_data.append(price[i])
		flag = log_price(price[i],flag)
		time.sleep(wait)
		i += 1
		continue
	
	data = price[i]
	flag = log_price(data,flag)
	
	
	if flag["order"]["exist"]:
		flag = check_order( flag )
	elif flag["position"]["exist"]:
		flag = close_position( data,last_data,flag )
	else:
		flag = entry_signal( data,last_data,flag )
	
	
	# 過去データを30個に保つために先頭を削除
	del last_data[0]
	last_data.append( data )
	i += 1
	time.sleep(wait)


print("--------------------------")
print("テスト期間:")
print("開始時点 : " + str(price[0]["close_time_dt"]))
print("終了時点 : " + str(price[-1]["close_time_dt"]))
print(str(len(price)) + "件のローソク足データで検証")
print("--------------------------")

backtest(flag)

Bitflyerの自動売買BOTのバックテストで平均保有期間を追加したコード

前回の記事「Bitflyerの自動売買BOTの勝率・平均リターン・総利益をバックテストで計算する」の最後の練習問題の答え合わせです。

解答


import requests
from datetime import datetime
import time
import json
import ccxt
import numpy as np


# ----バックテスト用の初期設定値
chart_sec = 300            # 5分足
lot = 1                    # 1トレードの枚数
slippage = 0.0005          # 手数料やスリッページ(0.05%初期値)
close_condition = 0        # エントリー後、n足が経過するまでは手仕舞わない(初期値0)


# パラーメータを指定してCryptowatchのAPIを使用する関数
def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0:
				price.append({ "close_time" : i[0],
					"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
					"open_price" : i[1],
					"high_price" : i[2],
					"low_price" : i[3],
					"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None


# json形式のファイルから価格データを読み込む関数
def get_price_from_file(path):
	file = open(path,'r',encoding='utf-8')
	price = json.load(file)
	return price



# 時間と始値・終値を表示する関数
def print_price( data ):
	print( "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) )

# 時間と始値・終値をログに記録する関数
def log_price( data,flag ):
	log =  "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) + "\n"
	flag["records"]["log"].append(log)
	return flag



# 各ローソク足が陽線・陰線の基準を満たしているか確認する関数
def check_candle( data,side ):
	try:
		realbody_rate = abs(data["close_price"] - data["open_price"]) / (data["high_price"]-data["low_price"]) 
		increase_rate = data["close_price"] / data["open_price"] - 1
	except ZeroDivisionError as e:
		return False
	
	if side == "buy":
		if data["close_price"] < data["open_price"] : return False
		#elif increase_rate <  0.0003 : return False
		#elif realbody_rate < 0.5 : return False
		else : return True
		
	if side == "sell":
		if data["close_price"] > data["open_price"] : return False
		#elif increase_rate > -0.0003 : return False
		#elif realbody_rate < 0.5 : return False
		else : return True


# ローソク足が連続で上昇しているか確認する関数
def check_ascend( data,last_data ):
	if data["open_price"] > last_data["open_price"] and data["close_price"] > last_data["close_price"]:
		return True
	else:
		return False

# ローソク足が連続で下落しているか確認する関数
def check_descend( data,last_data ):
	if data["open_price"] < last_data["open_price"] and data["close_price"] < last_data["close_price"]:
		return True
	else:
		return False


# 買いシグナルが出たら指値で買い注文を出す関数
def buy_signal( data,last_data,flag ):
	if flag["buy_signal"] == 0 and check_candle( data,"buy" ):
		flag["buy_signal"] = 1

	elif flag["buy_signal"] == 1 and check_candle( data,"buy" )  and check_ascend( data,last_data ):
		flag["buy_signal"] = 2

	elif flag["buy_signal"] == 2 and check_candle( data,"buy" )  and check_ascend( data,last_data ):
		log = "3本連続で陽線 なので" + str(data["close_price"]) + "円で買い指値を入れます\n"
		flag["records"]["log"].append(log)
		flag["buy_signal"] = 3
		
		# ここにBitflyerへの買い注文コードを入れる

		flag["order"]["exist"] = True
		flag["order"]["side"] = "BUY"
		flag["order"]["price"] = round(data["close_price"] * lot)
	
	else:
		flag["buy_signal"] = 0
	return flag


# 売りシグナルが出たら指値で売り注文を出す関数
def sell_signal( data,last_data,flag ):
	if flag["sell_signal"] == 0 and check_candle( data,"sell" ):
		flag["sell_signal"] = 1

	elif flag["sell_signal"] == 1 and check_candle( data,"sell" )  and check_descend( data,last_data ):
		flag["sell_signal"] = 2

	elif flag["sell_signal"] == 2 and check_candle( data,"sell" )  and check_descend( data,last_data ):
		log = "3本連続で陰線 なので" + str(data["close_price"]) + "円で売り指値を入れます\n"
		flag["records"]["log"].append(log)
		flag["sell_signal"] = 3
		
		# ここにBitflyerへの売り注文コードを入れる

		flag["order"]["exist"] = True
		flag["order"]["side"] = "SELL"
		flag["order"]["price"] = round(data["close_price"] * lot)
		
	else:
		flag["sell_signal"] = 0
	return flag


# 手仕舞いのシグナルが出たら決済の成行注文を出す関数
def close_position( data,last_data,flag ):
	flag["position"]["count"] += 1
	
	if flag["position"]["side"] == "BUY":
		if data["close_price"] < last_data["close_price"] and flag["position"]["count"] > close_condition:
			log = "前回の終値を下回ったので" + str(data["close_price"]) + "円あたりで成行で決済します\n"
			flag["records"]["log"].append(log)
			
			# 決済の成行注文コードを入れる

			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
	if flag["position"]["side"] == "SELL":
		if data["close_price"] > last_data["close_price"] and flag["position"]["count"] > close_condition:
			log = "前回の終値を上回ったので" + str(data["close_price"]) + "円あたりで成行で決済します\n"
			flag["records"]["log"].append(log)
			
			# 決済の成行注文コードを入れる

			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
	return flag


# サーバーに出した注文が約定したかどうかチェックする関数
def check_order( flag ):
	
	# 注文状況を確認して通っていたら以下を実行
	# 一定時間で注文が通っていなければキャンセルする
	
	flag["order"]["exist"] = False
	flag["order"]["count"] = 0
	flag["position"]["exist"] = True
	flag["position"]["side"] = flag["order"]["side"]
	flag["position"]["price"] = flag["order"]["price"]
	
	return flag

# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	
	# 取引手数料等の計算
	entry_price = flag["position"]["price"]
	exit_price = round(data["close_price"] * lot)
	trade_cost = round( exit_price * slippage )
	
	log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n"
	flag["records"]["log"].append(log)
	flag["records"]["slippage"].append(trade_cost)
	
	# 値幅の計算
	buy_profit = exit_price - entry_price - trade_cost
	sell_profit = entry_price - exit_price - trade_cost
	
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		flag["records"]["buy-count"] += 1
		flag["records"]["buy-profit"].append( buy_profit )
		flag["records"]["buy-return"].append( round( buy_profit / entry_price * 100, 4 ))
		flag["records"]["buy-holding-periods"].append( flag["position"]["count"] )
		if buy_profit  > 0:
			flag["records"]["buy-winning"] += 1
			log = str(buy_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(buy_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	if flag["position"]["side"] == "SELL":
		flag["records"]["sell-count"] += 1
		flag["records"]["sell-profit"].append( sell_profit )
		flag["records"]["sell-return"].append( round( sell_profit / entry_price * 100, 4 ))
		flag["records"]["sell-holding-periods"].append( flag["position"]["count"] )
		if sell_profit > 0:
			flag["records"]["sell-winning"] += 1
			log = str(sell_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(sell_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	return flag


# バックテストの集計用の関数
def backtest(flag):
	
	buy_gross_profit = np.sum(flag["records"]["buy-profit"])
	sell_gross_profit = np.sum(flag["records"]["sell-profit"])
	
	print("バックテストの結果")
	print("--------------------------")
	print("買いエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["buy-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["buy-winning"] / flag["records"]["buy-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["buy-return"]),4)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["buy-profit"]) ))
	print("平均保有期間  :  {}足分".format( round(np.average(flag["records"]["buy-holding-periods"]),1) ))
	
	print("--------------------------")
	print("売りエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["sell-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["sell-winning"] / flag["records"]["sell-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["sell-return"]),4)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) ))
	print("平均保有期間  :  {}足分".format( round(np.average(flag["records"]["sell-holding-periods"]),1) ))
	
	print("--------------------------")
	print("総合の成績")
	print("--------------------------")
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) + np.sum(flag["records"]["buy-profit"]) ))
	print("手数料合計    :  {}円".format( np.sum(flag["records"]["slippage"]) ))
	
	# ログファイルの出力
	file =  open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
	file.writelines(flag["records"]["log"])
	
# ここからメインの実行処理

# 価格チャートを取得
price = get_price(chart_sec,after=1483228800)

print("--------------------------")
print("テスト期間:")
print("開始時点 : " + str(price[0]["close_time_dt"]))
print("終了時点 : " + str(price[-1]["close_time_dt"]))
print(str(len(price)) + "件のローソク足データで検証")
print("--------------------------")



last_data = price[0] # 初期値となる価格データをセット


flag = {
	"buy_signal":0,
	"sell_signal":0,
	"order":{
		"exist" : False,
		"side" : "",
		"price" : 0,
		"count" : 0
	},
	"position":{
		"exist" : False,
		"side" : "",
		"price": 0,
		"count":0
	},
	"records":{
		"buy-count": 0,
		"buy-winning" : 0,
		"buy-return":[],
		"buy-profit": [],
		"buy-holding-periods":[],
		
		"sell-count": 0,
		"sell-winning" : 0,
		"sell-return":[],
		"sell-profit":[],
		"sell-holding-periods":[],
		
		"slippage":[],
		"log":[]
	}
}

i = 1
while i < len(price):
	if flag["order"]["exist"]:
		flag = check_order( flag )
	
	data = price[i]
	flag = log_price(data,flag)
	
	if flag["position"]["exist"]:
		flag = close_position( data,last_data,flag )			
	else:
		flag = buy_signal( data,last_data,flag )
		flag = sell_signal( data,last_data,flag )
	last_data["close_time"] = data["close_time"]
	last_data["open_price"] = data["open_price"]
	last_data["close_price"] = data["close_price"]
	i+=1

backtest(flag)

解説

まずflag変数に新しく以下の2つの変数を追加しました。

"sell-holding-periods":[]
"buy-holding-periods":[]

ポジションを保有している期間は、もともと手仕舞いのための関数 def close_position() の先頭部分でカウントしています。

def close_position( data,last_data,flag ):
	flag["position"]["count"] += 1

そのため、ポジションを手仕舞うたびに、ここで記録している[count]の情報を、上記の holding-periods:[] に append していけばOKです。そのため、関数 def records() に以下の部分を追記します。

追記部分

# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	(中略)
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		# ここに追記
		flag["records"]["buy-holding-periods"].append( flag["position"]["count"] )
	if flag["position"]["side"] == "SELL":
		# ここに追記
		flag["records"]["sell-holding-periods"].append( flag["position"]["count"] )

また最後にバックテスト集計用の関数のところで、平均保有期間を計算してprintします。計算には、numpy の np.average() を使い、それを round() で小数点1桁に丸めています。

以下の部分です。

追記部分

# バックテストの集計用の関数
def backtest(flag):
	(中略)
	print("平均保有期間  :  {}足分".format( round(np.average(flag["records"]["buy-holding-periods"]),1) ))
	print("平均保有期間  :  {}足分".format( round(np.average(flag["records"]["sell-holding-periods"]),1) ))

Bitflyerの自動売買BOTの勝率・平均リターン・総利益をバックテストで計算する

さて、いよいよ前の章で作成した自動売買BOTの勝率を過去データを使って検証していきましょう。過去データには、前回の記事で取得した5分足の過去6000件のデータを使います。

pythonコード

まずは先にイメージが湧きやすいように、具体的なコードと実行結果を示します。各行のコードの意味は後ほど詳しく説明するので、いきなり長いコードを読むのがしんどい方は、以下のコードは読み飛ばしてください。


import requests
from datetime import datetime
import time
import json
import ccxt
import numpy as np


# ----バックテスト用の初期設定値
chart_sec = 300            # 5分足
lot = 1                    # 1トレードの枚数
slippage = 0.0005          # 手数料やスリッページ(0.05%初期値)
close_condition = 0        # エントリー後、n足が経過するまでは手仕舞わない(初期値0)


# パラーメータを指定してCryptowatchのAPIを使用する関数
def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0:
				price.append({ "close_time" : i[0],
					"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
					"open_price" : i[1],
					"high_price" : i[2],
					"low_price" : i[3],
					"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None


# json形式のファイルから価格データを読み込む関数
def get_price_from_file(path):
	file = open(path,'r',encoding='utf-8')
	price = json.load(file)
	return price



# 時間と始値・終値を表示する関数
def print_price( data ):
	print( "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) )

# 時間と始値・終値をログに記録する関数
def log_price( data,flag ):
	log =  "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) + "\n"
	flag["records"]["log"].append(log)
	return flag



# 各ローソク足が陽線・陰線の基準を満たしているか確認する関数
def check_candle( data,side ):
	try:
		realbody_rate = abs(data["close_price"] - data["open_price"]) / (data["high_price"]-data["low_price"]) 
		increase_rate = data["close_price"] / data["open_price"] - 1
	except ZeroDivisionError as e:
		return False
	
	if side == "buy":
		if data["close_price"] < data["open_price"] : return False
		#elif increase_rate <  0.0003 : return False
		#elif realbody_rate < 0.5 : return False
		else : return True
		
	if side == "sell":
		if data["close_price"] > data["open_price"] : return False
		#elif increase_rate > -0.0003 : return False
		#elif realbody_rate < 0.5 : return False
		else : return True


# ローソク足が連続で上昇しているか確認する関数
def check_ascend( data,last_data ):
	if data["open_price"] > last_data["open_price"] and data["close_price"] > last_data["close_price"]:
		return True
	else:
		return False

# ローソク足が連続で下落しているか確認する関数
def check_descend( data,last_data ):
	if data["open_price"] < last_data["open_price"] and data["close_price"] < last_data["close_price"]:
		return True
	else:
		return False


# 買いシグナルが出たら指値で買い注文を出す関数
def buy_signal( data,last_data,flag ):
	if flag["buy_signal"] == 0 and check_candle( data,"buy" ):
		flag["buy_signal"] = 1

	elif flag["buy_signal"] == 1 and check_candle( data,"buy" )  and check_ascend( data,last_data ):
		flag["buy_signal"] = 2

	elif flag["buy_signal"] == 2 and check_candle( data,"buy" )  and check_ascend( data,last_data ):
		log = "3本連続で陽線 なので" + str(data["close_price"]) + "円で買い指値を入れます\n"
		flag["records"]["log"].append(log)
		flag["buy_signal"] = 3
		
		# ここにBitflyerへの買い注文コードを入れる

		flag["order"]["exist"] = True
		flag["order"]["side"] = "BUY"
		flag["order"]["price"] = round(data["close_price"] * lot)
	
	else:
		flag["buy_signal"] = 0
	return flag


# 売りシグナルが出たら指値で売り注文を出す関数
def sell_signal( data,last_data,flag ):
	if flag["sell_signal"] == 0 and check_candle( data,"sell" ):
		flag["sell_signal"] = 1

	elif flag["sell_signal"] == 1 and check_candle( data,"sell" )  and check_descend( data,last_data ):
		flag["sell_signal"] = 2

	elif flag["sell_signal"] == 2 and check_candle( data,"sell" )  and check_descend( data,last_data ):
		log = "3本連続で陰線 なので" + str(data["close_price"]) + "円で売り指値を入れます\n"
		flag["records"]["log"].append(log)
		flag["sell_signal"] = 3
		
		# ここにBitflyerへの売り注文コードを入れる

		flag["order"]["exist"] = True
		flag["order"]["side"] = "SELL"
		flag["order"]["price"] = round(data["close_price"] * lot)
		
	else:
		flag["sell_signal"] = 0
	return flag


# 手仕舞いのシグナルが出たら決済の成行注文を出す関数
def close_position( data,last_data,flag ):
	flag["position"]["count"] += 1
	
	if flag["position"]["side"] == "BUY":
		if data["close_price"] < last_data["close_price"] and flag["position"]["count"] > close_condition:
			log = "前回の終値を下回ったので" + str(data["close_price"]) + "円あたりで成行で決済します\n"
			flag["records"]["log"].append(log)
			
			# 決済の成行注文コードを入れる

			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
	if flag["position"]["side"] == "SELL":
		if data["close_price"] > last_data["close_price"] and flag["position"]["count"] > close_condition:
			log = "前回の終値を上回ったので" + str(data["close_price"]) + "円あたりで成行で決済します\n"
			flag["records"]["log"].append(log)
			
			# 決済の成行注文コードを入れる

			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
	return flag


# サーバーに出した注文が約定したかどうかチェックする関数
def check_order( flag ):
	
	# 注文状況を確認して通っていたら以下を実行
	# 一定時間で注文が通っていなければキャンセルする
	
	flag["order"]["exist"] = False
	flag["order"]["count"] = 0
	flag["position"]["exist"] = True
	flag["position"]["side"] = flag["order"]["side"]
	flag["position"]["price"] = flag["order"]["price"]
	
	return flag

# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	
	# 取引手数料等の計算
	entry_price = flag["position"]["price"]
	exit_price = round(data["close_price"] * lot)
	trade_cost = round( exit_price * slippage )
	
	log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n"
	flag["records"]["log"].append(log)
	flag["records"]["slippage"].append(trade_cost)
	
	# 値幅の計算
	buy_profit = exit_price - entry_price - trade_cost
	sell_profit = entry_price - exit_price - trade_cost
	
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		flag["records"]["buy-count"] += 1
		flag["records"]["buy-profit"].append( buy_profit )
		flag["records"]["buy-return"].append( round( buy_profit / entry_price * 100, 4 ))
		if buy_profit  > 0:
			flag["records"]["buy-winning"] += 1
			log = str(buy_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(buy_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	if flag["position"]["side"] == "SELL":
		flag["records"]["sell-count"] += 1
		flag["records"]["sell-profit"].append( sell_profit )
		flag["records"]["sell-return"].append( round( sell_profit / entry_price * 100, 4 ))
		if sell_profit > 0:
			flag["records"]["sell-winning"] += 1
			log = str(sell_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(sell_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	return flag


# バックテストの集計用の関数
def backtest(flag):
	
	buy_gross_profit = np.sum(flag["records"]["buy-profit"])
	sell_gross_profit = np.sum(flag["records"]["sell-profit"])
	
	print("バックテストの結果")
	print("--------------------------")
	print("買いエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["buy-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["buy-winning"] / flag["records"]["buy-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["buy-return"]),4)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["buy-profit"]) ))
	
	print("--------------------------")
	print("売りエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["sell-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["sell-winning"] / flag["records"]["sell-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["sell-return"]),4)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) ))
	
	print("--------------------------")
	print("総合の成績")
	print("--------------------------")
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) + np.sum(flag["records"]["buy-profit"]) ))
	print("手数料合計    :  {}円".format( np.sum(flag["records"]["slippage"]) ))
	
	# ログファイルの出力
	file =  open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
	file.writelines(flag["records"]["log"])
	
# ここからメインの実行処理

# 価格チャートを取得
price = get_price(chart_sec,after=1514764800)

print("--------------------------")
print("テスト期間:")
print("開始時点 : " + str(price[0]["close_time_dt"]))
print("終了時点 : " + str(price[-1]["close_time_dt"]))
print(str(len(price)) + "件のローソク足データで検証")
print("--------------------------")



last_data = price[0] # 初期値となる価格データをセット


flag = {
	"buy_signal":0,
	"sell_signal":0,
	"order":{
		"exist" : False,
		"side" : "",
		"price" : 0,
		"count" : 0
	},
	"position":{
		"exist" : False,
		"side" : "",
		"price": 0,
		"count":0
	},
	"records":{
		"buy-count": 0,
		"buy-winning" : 0,
		"buy-return":[],
		"buy-profit": [],
		
		"sell-count": 0,
		"sell-winning" : 0,
		"sell-return":[],
		"sell-profit":[],
		
		"slippage":[],
		"log":[]
	}
}

i = 1
while i < len(price):
	if flag["order"]["exist"]:
		flag = check_order( flag )
	
	data = price[i]
	flag = log_price(data,flag)
	
	if flag["position"]["exist"]:
		flag = close_position( data,last_data,flag )			
	else:
		flag = buy_signal( data,last_data,flag )
		flag = sell_signal( data,last_data,flag )
	last_data["close_time"] = data["close_time"]
	last_data["open_price"] = data["open_price"]
	last_data["close_price"] = data["close_price"]
	i+=1

backtest(flag)

先に実行結果と、このバックテストの結果を「どう解釈するか?」から見ておきましょう!

実行結果

同時に以下のようなログファイルも出力されるため、どのようなタイミング(価格)でいつエントリーして、各トレードでどの程度の利益・損失が出たのかも確認できます。

▽ 自動で取引のログファイルを出力

▽ 各トレードの価格と利益・手数料を記録

※ ここではサンプル数の問題から、実体の長さ(increase_rate)、実体率(realbody_rate)の条件は無効にしています。有効にして試したい方は、def check_candle の中のコメントアウト(#)部分を外してください。

とりあえず、このテストで作った売買BOTのロジックのままでは、勝てなさそうなことがわかりましたね(笑)

売買ロジックの考察

テストBOTのコードなので深くは考察しませんが、いくつか結果からポイントを挙げておきます。pythonのソースコードの解説だけ読みたい方は、後半まで読み飛ばしてください。

まず手数料コストの23万円が痛いですが、手数料率を0%にして再計算しても、やはり最終損益はマイナスになります。つまり売買ロジックそのものの期待値がマイナスです。

1.「売り」と「買い」をひっくり返してみる

売買ロジックの期待値がマイナスということは、エントリー条件をひっくり返してみたらどうでしょうか?

つまり3本連続の陽線を順張りの「買いシグナル」ではなく逆張りの「売りシグナル」として考え、同じように3本連続の陰線を「売りシグナル」ではなく「買いシグナル」と考えてみます。これは以下の部分を2箇所、書き換えるだけで簡単にテストしてみることができます。

これを実行すると以下のようになりました。

▽ 売買の条件を反転させた場合

勝率は少し改善したものの、買い・売りの条件を正反対にひっくり返してもやはり期待値はマイナスの結果になりました。

不思議に思う方もいるかもしれませんが、手仕舞いの条件が正反対になっていないので、このようなことは当然ありえます。どちらの場合でも期待値がマイナスということは、手仕舞いの方法にも問題があるといえそうです。

2.手仕舞いを条件を変更してみる

では、手仕舞いの条件に「エントリーした次の足では手仕舞いしない」という条件を追加してみたらどうでしょうか?

もし仮に「3本連続の陽線が買いシグナル」だとしても、4本目の足では一時的に反発して戻す可能性も考えられます。ログファイルを見ていると、エントリーした次の足ですぐに損切にかかっているケースが非常に多いですね。このような仮説から「4本目の足は手仕舞いの条件判定から除外する」というロジックを追加することも考えられそうです。

上記のコードでは、初期設定値の close_condition を使って、そのような条件を試すことができるようにしておきました。興味がある方は試してみてください。

3.パラメーターの調整の問題点

しかしこのようなパラメーターの調整にはやりすぎると問題です。片っ端から色々なパラメーターを試していけば、簡単にプラスの期待値を持つ売買ロジックを作ることができてしまうからです。例えば、以下のように。

▽ 手数料0.05%でもプラスになった売買ロジック

1)売り/買いの反転
 3本連続の陽線を売りシグナルとする(逆張り)
 3本連続の陰線を買いシグナルとする(逆張り)
2)実体の長さ、実体率の条件を有効化
 increase_rate と realbody_rateの設定
3)5分足ではなく1時間足に変更
4)close_conditionのパラメータを2に設定
 エントリー後、最低2足経過するまで手仕舞わない

上記の条件でテストをすると、2017年7月~2018年4月までのテスト期間で売り・買いともに勝率は50%を上回り、手数料を差し引いてもプラスの利益を上げることができました! これは使えるストラテジーを見つけてしまったのではないでしょうか!

...いえ、残念ながらそうとは限りません。ここでプラスの成績が出ているのは、「プラスの成績が出るまで色々な組み合わせを試した」からに過ぎません。

まずトレード数が少ない(買い16回、売り33回)のも気になりますが、一般論としていえば、過去テストのパフォーマンスを見ながらどんどん後から条件を足していく行為は、「カーブフィッティング」(過剰最適化)と呼ばれ、システムトレードではあまりやってはいけない行為とされています。

以下、理由を簡単に説明します。ただしこれから自動売買BOTを始める方は、あまり最初から頭でっかちになっても良くありません。軽く読み流す程度でも構いません。

過剰最適化とは?

例えば、「エントリー後、〇本目の足で無条件に手仕舞う」という手仕舞いのロジックを考えたとします。このルールに基づいて、「〇本目(n)の足」の部分に1、2、3、4、5...と、順番に数字を入れていき、なぜかたまたま「9」を入れたときに、めちゃくちゃ良い成績が出たとします。

この「理由はわからないけど、9という数字を使うと異様にいい成績が出る」という固定値を無理やり探す行為は、やりすぎると過去のデータに対する過剰な最適化となります。

例えば、典型的なのが移動平均線のクロスを使ったエントリーです。短期移動平均線(5/6/7/8...)と長期移動平均線(20/21/22/23...)などの組み合わせを片っ端から試し、ある特定期間で明らかに「5期間と37期間」の組み合わせでパフォーマンスが良かったとしましょう。

しかし他の期間でもそれが当てはまるとは限りません。過去データにおいて、たまたま上手く当てはまった数字(パラメーター)が、将来にも当てはまるとは限りません。

変数が2つくらいならまだ大丈夫ですが、ここにさらに「火曜日はトレードの対象から外す」「手仕舞いには8期間の経過を待つ」「損切りラインをエントリー価格より 1.7ATR 下に入れる」など、理由のよくわからない恣意的な数字を使った条件を足していくと、いくらでも(過去データにおける)良い成績を作れてしまいます。

もし恣意的な数字を選んで使う場合は、そのパラメータは「少なければ少ないほどいい」と覚えておいてください。また、テスト期間を半分に分けて、前半で調整したパラメーターを後半の期間で再テストしてみるのも有効です。

バックテスト検証コードの作り方の解説

少し話が逸れてしまいました。
ここからバックテスト検証コードの解説部分に入ります。

1.成績を管理するための変数を用意する

前回の章で自動売買BOTを作成したときに、売買シグナルの有無やポジションの状況などの情報をプログラム上で管理させるために「flag」という変数を作りました。バックテストの検証でもこの「flag」変数を以下のように改良して使います。


flag = {
	"buy_signal":0,
	"sell_signal":0,
	"order":{
		"exist" : False,
		"side" : "",
		"price" : 0,
		"count" : 0
	},
	"position":{
		"exist" : False,
		"side" : "",
		"price": 0,
		"count":0
	},
	"records":{
		"buy-count": 0,
		"buy-winning" : 0,
		"buy-return":[],
		"buy-profit": [],
		
		"sell-count": 0,
		"sell-winning" : 0,
		"sell-return":[],
		"sell-profit":[],
		
		"slippage":[],
		"log":[]
	}
}

バックテストのために追加したのは以下の部分ですね。

"records":{
	"buy-count": 0, # 買いエントリのトレード数を記録
	"buy-winning" : 0, # 勝った数を記録
	"buy-return":[], # 各トレードでのリターン(利益率)を記録
	"buy-profit": [], # 各トレードでの利益・損失額を記録
	
	"sell-count": 0, # 売りエントリのトレード数を記録
	"sell-winning" : 0, # 勝った数を記録
	"sell-return":[], # 各トレードでのリターン(利益率)を記録
	"sell-profit":[], # 各トレードでの利益・損失額を記録
	
	"slippage":[], # 各トレードで生じた手数料を記録
	"log":[] # あとでテキストファイルに出力したい内容を記録
}

基本的な考え方は、「ポジションを手仕舞うたびに上記のフラッグを更新して、各トレードの成績をflag変数に記録していく」というだけです。

バックテストといっても、そのコード内容は基本的にはリアルタイム(実践用)のコードと同じです。最初にまとめて用意したローソク足を、while文を使って1足ずつループで処理していき、売買シグナルが点灯する ⇒ 買い/売りでエントリーする ⇒ 条件を満たしたら手仕舞う の流れを忠実に再現しています。

その仕組みがわかっていれば、今まで作成していた売買BOTの「手仕舞いのための関数 def close_position()」が呼ばれたときに、一緒に flag["records"]を以下のように更新するpythonコードを作ればいい、とわかりますね。

1.トレード回数(count)を1増やす
2.勝ったか負けたか(winning)をカウントする
3.利益額(profit)を記録する
4.リターン率(return)を記録する

買い/売りの記録を分ける理由

今回は、買いエントリーと売りエントリーの勝率を、それぞれ別々に検証しておきたいので、あらかじめ全ての変数を「buy-〇〇」と「sell-〇〇」に分類しておきます。

BitflyerのBTCFXのように短い期間でしか過去データを検証できない状況の場合、そのときの局面(下落トレンドか上昇トレンドか)の影響を受けやすくなります。買いエントリーで入るか、売りエントリーで入るかによって、バイアス(勝率の偏り)が生じている可能性があるため、一緒くたに勝率を計測してしまうと、売買ロジック自体の有効性が検証しにくくなります。

また利益額を計算するためには、エントリーしたときの価格(指値)を覚えておく必要がありますね。そのために上記のflagでは、flag[position]やflag[order] に新たに [price] という項目を追加しています。

2.全体のコードの流れを把握する

まず全体のコードの中で、追加しなければならない処理を大まかにまとめておきましょう。

1)エントリー注文する関数
・エントリーしたときの指値価格を記録しておく
・エントリーした日時と価格をログ変数に入れる

2)ポジションを手仕舞いする関数
・成績を記録するための関数を呼ぶ

3)成績を記録するための関数(NEW)
・トレード回数をカウントする
・手数料(スリッページ)を計算する
・利益額を計算する
・勝ったか負けたかを記録する
・リターン(率)を計算する
・利益/損失の日時と額をログ変数に入れる

4)最終的に集計する関数(NEW)
・総利益額を計算する
・勝率(勝ち数/トレード総数)を計算する
・平均リターンを計算する
・買い/売りごとの成績を print で表示する
・ログ内容をテキストファイルで出力する

では、具体的なコードを見ていきましょう!

1)買いシグナルでエントリーする関数

log = "3本連続で陽線 なので" + str(data["close_price"]) + "円で買い指値を入れます\n"
flag["records"]["log"].append(log) # テキスト内容をログに記録
flag["buy_signal"] = 3

# ここにBitflyerへの買い注文コードを入れる

flag["order"]["exist"] = True
flag["order"]["side"] = "BUY"
flag["order"]["price"] = round(data["close_price"] * lot) # エントリー価格を計算

以下の部分を新たに変更しました。

1~2行目は、今までAnacondaプロンプトに表示していた「3本連続なので 1223023円 で買い指値を入れます」というテキスト内容を、かわりに[log]変数に記録しています。バックテストでは、プロンプトにいちいち全ての情報を出力するのは邪魔なので、最終成績以外の細かい情報はすべてログファイルの方に出力するようにします。

なお、ここで覚えておいて欲しいのが、.append()という関数です。

便利すぎる.appned()を使いこなそう!

.append()というのは、配列の要素に新しい要素を追加するときに使う関数です。例えば、以下のような配列があるとします。

crypto = ["BTC","ETH","XRP"]

ここに新しく"XEM"という要素を加えたい場合は以下のように書きます。

cyrpto.append("XEM")

これを実行すると、以下のようになります。

crypto = ["BTC","ETH","XRP","XEM"]

この.append()は、while文やfor文などのループ処理の中で、データを記録するときに、めちゃくちゃ使います。この後も何度も出てくるので、ここで覚えておきましょう!

2)手仕舞い(決済)する関数

flag["position"]["count"] += 1
if data["close_price"] < last_data["close_price"] and flag["position"]["count"] > close_condition:
	log = "前回の終値を下回ったので" + str(data["close_price"]) + "円あたりで成行で決済します\n"
	flag["records"]["log"].append(log)
	
	# 決済の成行注文コードを入れる

	records( flag,data )
	flag["position"]["exist"] = False
	flag["position"]["count"] = 0

最初の1行はバックテストの検証コードとは関係ない部分です。

今回の記事から「〇足経過するまでは手仕舞いをしない」という新しい手仕舞いのための条件を追加したため、手仕舞いの関数が呼ばれるたびに1ずつ増えるカウンタ(flag[position][count])を新たに用意しています。

このカウンタが、最初に設定した(close_condition)を上回らない限り、手仕舞いのための条件を満たさない、という仕組みです。

また手仕舞いの条件を満たした場合には、成行注文を出した後に、以下の「成績を記録するための関数」を呼んでいます。

3)各トレードの成績などを記録する関数

# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	
	# 取引手数料等の計算
	entry_price = flag["position"]["price"]
	exit_price = round(data["close_price"] * lot)
	trade_cost = round( exit_price * slippage )
	
	log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n"
	flag["records"]["log"].append(log)
	flag["records"]["slippage"].append(trade_cost)
	
	# 値幅の計算
	buy_profit = exit_price - entry_price - trade_cost
	sell_profit = entry_price - exit_price - trade_cost
	
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		flag["records"]["buy-count"] += 1
		flag["records"]["buy-profit"].append( buy_profit )
		flag["records"]["buy-return"].append( round( buy_profit / entry_price * 100, 4 ))
		if buy_profit  > 0:
			flag["records"]["buy-winning"] += 1
			log = str(buy_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(buy_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	if flag["position"]["side"] == "SELL":
		flag["records"]["sell-count"] += 1
		flag["records"]["sell-profit"].append( sell_profit )
		flag["records"]["sell-return"].append( round( sell_profit / entry_price * 100, 4 ))
		if sell_profit > 0:
			flag["records"]["sell-winning"] += 1
			log = str(sell_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(sell_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	return flag

最初に、注文枚数(lot)に応じた手仕舞い価格を計算し、そこから手数料(スリッページコスト)を計算しています。次に、そのコストを加味した上で利益が出ているかどうかを計算しています。

基本的には1行ずつ読んでいただければ、何をやっているかは理解できると思います。

以下の項目は、利益が出ているか出ていないかに関わらず、共通で実行する項目です。

flag["records"]["buy-count"] += 1 #トレード回数の記録
flag["records"]["buy-profit"].append( buy_profit ) #利益、損失の記録
flag["records"]["buy-return"].append( round( buy_profit / entry_price * 100, 4 )) #利益率(損失率)を計算

また勝った場合のみ、以下の処理を追加で実行して「勝った数」をカウントします。

買いエントリーであれば、手仕舞い価格がエントリー価格を上回っていれば「勝ち」です。売りエントリーであれば、エントリー価格が手仕舞い価格を上回っていれば勝ちです。この「勝ち」カウンタは最後に勝率を計算するときに使います。

flag["records"]["sell-winning"] += 1

また「手数料は〇円でした」「〇円の利益(損失)でした」というログ用のテキストを、flag[records][log] に追加しています。ここではログだけでなく、リターンや利益額など、数字の記録にもすべて append() を使っています。

4)メイン処理の最初部分

price = get_price(chart_sec,after=1514764800)

print("--------------------------")
print("テスト期間:")
print("開始時点 : " + str(price[0]["close_time_dt"]))
print("終了時点 : " + str(price[-1]["close_time_dt"]))
print(str(len(price)) + "件のローソク足データで検証")
print("--------------------------")

Cryptowatchから過去のローソク足のデータを取得する部分のコードです。

(after = UNIX時間) で指定された期間以降のBitflyerの価格データをCryptowatchから取得し、それがいつからいつのデータなのかをprintで表示しています。この部分のソースコードは、前回の記事で解説したものと全く同じなので、そちらを参考にしてください。

上記の get_price() の部分を、get_price_form_file() にすれば、自分で保存しておいたjsonファイルからローソク足データを読み込むこともできます。これも前回の記事で解説済です。

5)メイン処理のループ部分

i = 1
while i < len(price):
	if flag["order"]["exist"]: # 未約定の注文がないかチェック
		flag = check_order( flag )
	
	data = price[i]
	flag = log_price(data,flag) # 新しいローソク足の日時と始値・終値をログ
	
	if flag["position"]["exist"]: # ポジションを保有していれば、手仕舞いの条件をチェック
		flag = close_position( data,last_data,flag )			
	else:
		flag = buy_signal( data,last_data,flag ) # 買いエントリの条件をチェック
		flag = sell_signal( data,last_data,flag ) # 売りエントリの条件をチェック
	last_data["close_time"] = data["close_time"]
	last_data["open_price"] = data["open_price"]
	last_data["close_price"] = data["close_price"]
	i+=1

backtest(flag)

全体のメイン処理のループ部分では、特に変わった箇所はありません。
変わったのは、7行目のlog_price()くらいですね。

今までは、すべてのローソク足の「日時: 始値: 終値:」をプロンプト(黒い画面)に表示していました。しかしバックテストで全てのローソク足を表示するのは邪魔なので、ログファイルに出力するよう変更しています。

今回のコードでは、以下のようなlog_price()関数を作り、それを呼んでいます。

# 時間と始値・終値をログに記録する関数
def log_price( data,flag ):
	log =  "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) + "\n"
	flag["records"]["log"].append(log)
	return flag

またループ処理が終わった後に、一番最後に、backtest() という「最後の集計用の関数」を実行しています。では最後にこの関数の中身を見てみましょう。

6)バックテストの集計用の関数

def backtest(flag):
	
	buy_gross_profit = np.sum(flag["records"]["buy-profit"])
	sell_gross_profit = np.sum(flag["records"]["sell-profit"])
	
	print("バックテストの結果")
	print("--------------------------")
	print("買いエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["buy-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["buy-winning"] / flag["records"]["buy-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["buy-return"]),4)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["buy-profit"]) ))
	
	print("--------------------------")
	print("売りエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["sell-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["sell-winning"] / flag["records"]["sell-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["sell-return"]),4)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) ))
	
	print("--------------------------")
	print("総合の成績")
	print("--------------------------")
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) + np.sum(flag["records"]["buy-profit"]) ))
	print("手数料合計    :  {}円".format( np.sum(flag["records"]["slippage"]) ))
	
	# ログファイルの出力
	file =  open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
	file.writelines(flag["records"]["log"])

今回、新しく登場したのが「numpy」です。

numpyを使って平均や合計を計算しよう!

これは、平均値、合計値、標準偏差、中央値、などを簡単に計算してくれる便利な計算用のライブラリです。例えば、以下のような配列の合計値や平均値を1行で計算することができます。

例)

score = [112,342,124,34,56,234,99,124,553] #各スコア

sum_score = np.sum(score) #スコアの合計
average_score = np.average(score) #スコアの平均値

np.sum() や np.average() というのは、numpyライブラリのsum関数、average関数を使います、という意味です。Bitflyerの注文を出すときに、「bitflyer.create_order()」と書いたのと同じイメージですね。

また一番最初のimport文で以下のように記載しています。

import numpy as np

このように「as np」と書いておくことで、プログラムのコード中で numpy.sum() と書く代わりに、np.sum() と書くことができるようになります。

正直、初心者にとっては、numpy.sum()と書いた方がわかりやすい気がするのですが、なぜかあらゆるプログラムでは「np.sum()」と書くのが慣例になっています。そのためそういったコードに慣れるために、ここでもそうしておきます。

round()を使って四捨五入をしよう!

またround()は四捨五入するための関数です。round( 数字 , 小数点以下の桁数 )と書くことで、指定した数字を指定した桁数(小数点以下の桁数)に丸めることができます。

例:
x = 186.44444
round(x,2)
#x = 186.44 

上記のnumpyやround関数を使って、

・勝率
・平均リターン
・総損益
・手数料の合計

を順番に計算して出力しています。
これでバックテストのコードの解説は終わりです。

まとめ

今回、新しく作成した勝率検証(バックテスト)のためのコードは、基本的に「売買シグナルの判定」「手仕舞いの条件判定」の箇所とは独立した関数です。そのため、どんな売買ロジックのpythonコードにもそのまま応用することができます。

次回からは、「ドンチャン・ブレイクアウト」という有名なブレイクアウト手法を使って、同じように勝率を検証する方法を解説します。また、平均リターンのバラツキ(標準偏差)という概念や、最大ドローダウンの計算方法、そして損益グラフを視覚的に描画(プロット)する方法などを解説していきます!

練習問題

バックテストでの検証項目に「平均保有期間」を追加してみましょう!

平均保有期間とは「各トレードで平均してどのくらいの期間、ポジションを保有していたか?」を表す指標です。一般的には、ポジションは長く保有するほどリスクが大きくなります。またエントリー機会も減ってしまうため、同じ勝率・同じリターン(期待値)なら、平均保有期間の短い売買ロジックの方が優秀です。

ヒント

上記のコードでは、flag["position"]["count"]でポジションを持っている期間をカウントしていますね。これを成績を記録する関数(def records)の中で、appendを使って配列に記録しておけば良いだけです!(そのためには、flagに記録用の新しい変数を追加する必要があります)

そして、最後にバックテスト集計用の関数(backtest)の中でnumpyを使って平均値を計算します。今回の記事の総復習になりますので是非やってみてください!

解答のコードはこちら

バックテストに必要なBitflyerの過去のローソク足の価格データを集めよう!

Bitflyerで使う自動売買BOTの勝率を検証するために、まずは十分な量の過去の価格データを入手する必要があります。取得元はCryptowatchを使います。

CryptowatchのAPIを改めて理解する

まず最初にもう1度、CryptowatchのAPIの特徴を確認しておきましょう。CryptowatchからOHLC価格(ローソク足の始値・高値・安値・終値)を取得するAPIには、以下の3つのパラメーターが設定できます。

periods ・・・ 時間軸を秒で指定
after ・・・ 〇〇(UNIX日時)以降のデータを取得
before ・・・〇〇(UNIX日時)より前のデータを取得

・CryptowatchのAPI仕様書
・UNIX時間変換ツール

今までは1分足の価格データを取得するために「periods」のパラメーターだけを 60秒(1分)に指定していました。このように時間軸だけを指定した場合、デフォルトではCryptowatchは直近500件のデータを返します。

ではパラメーターを指定するとどのようなデータが返ってくるのでしょうか? テストするために、以下のような関数を作ってみましょう。

パラメーターをテストする関数

今回は様々なパラメーターでテストができるように、新しい関数を作ります。今までとは違う新しいpythonファイルを作って以下のコードを記載してください。


import requests
from datetime import datetime
import time


def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			price.append({ "close_time" : i[0],
				"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
				"open_price" : i[1],
				"high_price" : i[2],
				"low_price" : i[3],
				"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None

# ここからメイン
price = get_price(60)

if price is not None:
	print("先頭データ : " + price[0]["close_time_dt"] + "  UNIX時間 : " + str(price[0]["close_time"]))
	print("末尾データ : " + price[-1]["close_time_dt"] + "  UNIX時間 : " + str(price[-1]["close_time"]))
	print("合計 : " + str(len(price)) + "件のローソク足データを取得")
	print("--------------------------")
	print("--------------------------")

今回作った関数では、get_price()の中で、before/afterのパラメーターも設定できるように改良しました。これを get_price.py などの名前で保存しておきます。

以下のように書けば、パラメーター無しの5分足(300秒)のデータが取得できます。

get_price(300)

以下のように書けば、UNIX時間で1521849600(2018/3/24 09:00:00)より前の5分足の価格データを取得できます。

get_price(300,before=1521849600)

以下のように書けば、UNIX時間で1521849600(2018/3/24 09:00:00)以降の1時間足の価格データを取得できます。

get_price(3600,after=1521849600)

返ってくるデータ件数と期間

まずは何もパラメーターを設定しない場合をテストしてみましょう。
以下の3つをそれぞれ実行してみます。

get_price(60)
get_price(300)
get_price(3600)

1分足の場合

先頭データ : 2018/03/29 12:56
末尾データ : 2018/03/29 21:15
500件のローソク足データを取得

およそ8時間分のデータが返ってきました。

5分足の場合

先頭データ : 2018/03/28 03:35
末尾データ : 2018/03/29 21:20
500件のローソク足データを取得

およそ1~2日分のデータが返ってきました。

1時間足の場合

先頭データ : 2018/03/09 03:00
末尾データ : 2018/03/29 22:00
500件のローソク足データを取得

およそ20日分のデータが返ってきました。

結果画面

やはりどの時間軸でも、基本的にはデフォルトで500件のデータが返ってきています。

Beforeを指定してみると…?

それでは、この先頭の日付を《before》パラメータに設定することで、ここからさらに遡って500件を取得できるのでしょうか? 試しに以下を実行してみましょう。

get_price(60,before=1522295760)

先ほど返ってきた1分足のデータの先頭の日時(2018/03/28 03:35 = UNIX時間で1522295760)を基準に、その前のデータを要求してみます。すると、以下のように空データが返ってきてしまいました。

時間を少しズラシて色々と試してみても、《before》パラメータでは、直近の500件以上のデータは取得できないようです。

Afterを指定してみると…?

それでは、afterで指定してみてはどうでしょうか? 試しにダメ元で2018年1月1日午前9時(UNIX時間で1514764800)以降の1分足データをすべて要求してみます。

get_price(60,after=1514764800)

すると、以下のように6000件のデータが返ってきます。先頭日時は3月25日です。期間にすると、直近4日分くらいの1分足データですね。

先頭データ : 2018/03/25 16:45
末尾データ : 2018/03/29 21:28
6000件のローソク足データを取得

いろいろな日時にパラメーターを変更して試してみても、《after》でパラメーターを指定する方法だと、取得できるデータ件数の上限は直近6000件までのようです。他の時間軸で試しても、以下のような結果になります。

5分足の場合

先頭データ : 2018/03/08 23:45
末尾データ : 2018/03/29 21:35
合計 : 6000件のローソク足データを取得

5分足だと6000件で約2週間分以上のデータが得られることになります。

1時間足の場合

先頭データ : 2017/07/22 15:00
末尾データ : 2018/03/29 22:00
合計 : 6000件のローソク足データを取得

1時間足だと6000件で約7~8カ月分程度のデータが得られることになります。

どの時間軸でバックテストをするにしても、大体、ローソク足6000本くらいのデータで検証すれば、とりあえずテストとしては十分だと思います。もちろんデータは多ければ多いほどいいのですが、無いものは仕方ありません。

自分でデータを集める方法

もっと広範にテストをしたい人は、自前で1分足の価格データを定期的に保存しておくといいでしょう。バックテストの検証に限らず、あらゆる統計的なテストで最も難しいのは「十分な量のデータを集めること」です。自前で十分な量の価格データを持っていれば、それだけ精度の高い検証が可能です。

取得したデータをjsonファイルに保存する

ひとまずjson形式のファイルで保存するだけなら、print()の後に以下のコードを書き足せばOKです。

import json #これだけ先頭に追加で記述

#ファイルに書き込む
file = open("./{0}-{1}-price.json".format(price[0]["close_time"],price[-1]["close_time"]),"w",encoding="utf-8")
json.dump(price,file,indent=4)

これを実行すると、以下のような形式でファイルが自動的に作成され、その中に価格データが保存されます。

・先頭ローソク足の日時-末尾ローソク足の日時-price.json
※ 日時はUNIXタイムスタンプ

中身を開くと以下のようなJSON形式で、価格データが保存されているのがわかります。

保存したjsonファイルを読み込む

逆に読み込むときは、以下のような関数を作っておけばOKです。

import json #これだけ先頭に追加で記述

#jsonファイルを読み込む関数
def get_price_from_file(path):
	file = open(path,"r",encoding="utf-8")
	price = json.load(file)
	return price

# ここからメイン
price = get_price_from_file("./1521978660-1522341240-price.json")

これで上のコードを実行すると、APIを叩いたときと全く同じ結果が得られます。つまり、WEBで取得したデータと同じように扱うことができます。

▽ 上がCryptowatchのAPIで価格データを取得しjsonで保存した場面
  下がそのjsonファイルを読み込んで価格データを取得した場面

なお、もし自分でデータを保存するのであれば、1分足のデータだけあれば十分です。1分足のデータさえあれば、そこから加工して5分足、15分足、1時間足のデータを作るのは簡単(最高値と最安値以外を切り捨てるだけ)です。

また今回の章ではいったん6000件のデータがあれば問題ありません。自前で価格データを収集して蓄積することに興味がある方は、以下の記事を参考にしてください。

・Cryptowatchで過去6000件以上のローソク足を保存して自前で価格データを蓄積する

まとめ

最後に今回使ったpythonのコードをまとめておきます。


import requests
from datetime import datetime
import time
import json

# パラーメータを指定してCryptowatchのAPIを使用する関数
def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			price.append({ "close_time" : i[0],
				"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
				"open_price" : i[1],
				"high_price" : i[2],
				"low_price" : i[3],
				"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None

# json形式のファイルから価格データを読み込む関数
def get_price_from_file(path):
	file = open(path,"r",encoding="utf-8")
	price = json.load(file)
	return price


# ここからメイン
price = get_price(60,after=1514764800)

#ファイルから読みこむ場合
#price = get_price_from_file("./ファイル名.json")


if price is not None:
	print("先頭データ : " + price[0]["close_time_dt"] + "  UNIX時間 : " + str(price[0]["close_time"]))
	print("末尾データ : " + price[-1]["close_time_dt"] + "  UNIX時間 : " + str(price[-1]["close_time"]))
	print("合計 : " + str(len(price)) + "件のローソク足データを取得")
	print("--------------------------")
	print("--------------------------")
	
	#ファイルに書き込む場合
	#file = open("./{0}-{1}-price.json".format(price[0]["close_time"],price[-1]["close_time"]),"w",encoding="utf-8")
	#json.dump(price,file,indent=4)

「#」が先頭に付いているコードは、コメントとして扱われるため実行されません。上記のファイル書き込み、読み込みを使う場合は先頭の「#」を外してください。