Bitflyerの自動売買BOTのバックテストで平均保有期間を追加したコード

前回の記事「Bitflyerの自動売買BOTの勝率・平均リターン・総利益をバックテストで計算する」の最後の練習問題の答え合わせです。

解答


import requests
from datetime import datetime
import time
import json
import ccxt
import numpy as np


# ----バックテスト用の初期設定値
chart_sec = 300            # 5分足
lot = 1                    # 1トレードの枚数
slippage = 0.0005          # 手数料やスリッページ(0.05%初期値)
close_condition = 0        # エントリー後、n足が経過するまでは手仕舞わない(初期値0)


# パラーメータを指定してCryptowatchのAPIを使用する関数
def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0:
				price.append({ "close_time" : i[0],
					"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
					"open_price" : i[1],
					"high_price" : i[2],
					"low_price" : i[3],
					"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None


# json形式のファイルから価格データを読み込む関数
def get_price_from_file(path):
	file = open(path,'r',encoding='utf-8')
	price = json.load(file)
	return price



# 時間と始値・終値を表示する関数
def print_price( data ):
	print( "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) )

# 時間と始値・終値をログに記録する関数
def log_price( data,flag ):
	log =  "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) + "\n"
	flag["records"]["log"].append(log)
	return flag



# 各ローソク足が陽線・陰線の基準を満たしているか確認する関数
def check_candle( data,side ):
	try:
		realbody_rate = abs(data["close_price"] - data["open_price"]) / (data["high_price"]-data["low_price"]) 
		increase_rate = data["close_price"] / data["open_price"] - 1
	except ZeroDivisionError as e:
		return False
	
	if side == "buy":
		if data["close_price"] < data["open_price"] : return False
		#elif increase_rate <  0.0003 : return False
		#elif realbody_rate < 0.5 : return False
		else : return True
		
	if side == "sell":
		if data["close_price"] > data["open_price"] : return False
		#elif increase_rate > -0.0003 : return False
		#elif realbody_rate < 0.5 : return False
		else : return True


# ローソク足が連続で上昇しているか確認する関数
def check_ascend( data,last_data ):
	if data["open_price"] > last_data["open_price"] and data["close_price"] > last_data["close_price"]:
		return True
	else:
		return False

# ローソク足が連続で下落しているか確認する関数
def check_descend( data,last_data ):
	if data["open_price"] < last_data["open_price"] and data["close_price"] < last_data["close_price"]:
		return True
	else:
		return False


# 買いシグナルが出たら指値で買い注文を出す関数
def buy_signal( data,last_data,flag ):
	if flag["buy_signal"] == 0 and check_candle( data,"buy" ):
		flag["buy_signal"] = 1

	elif flag["buy_signal"] == 1 and check_candle( data,"buy" )  and check_ascend( data,last_data ):
		flag["buy_signal"] = 2

	elif flag["buy_signal"] == 2 and check_candle( data,"buy" )  and check_ascend( data,last_data ):
		log = "3本連続で陽線 なので" + str(data["close_price"]) + "円で買い指値を入れます\n"
		flag["records"]["log"].append(log)
		flag["buy_signal"] = 3
		
		# ここにBitflyerへの買い注文コードを入れる

		flag["order"]["exist"] = True
		flag["order"]["side"] = "BUY"
		flag["order"]["price"] = round(data["close_price"] * lot)
	
	else:
		flag["buy_signal"] = 0
	return flag


# 売りシグナルが出たら指値で売り注文を出す関数
def sell_signal( data,last_data,flag ):
	if flag["sell_signal"] == 0 and check_candle( data,"sell" ):
		flag["sell_signal"] = 1

	elif flag["sell_signal"] == 1 and check_candle( data,"sell" )  and check_descend( data,last_data ):
		flag["sell_signal"] = 2

	elif flag["sell_signal"] == 2 and check_candle( data,"sell" )  and check_descend( data,last_data ):
		log = "3本連続で陰線 なので" + str(data["close_price"]) + "円で売り指値を入れます\n"
		flag["records"]["log"].append(log)
		flag["sell_signal"] = 3
		
		# ここにBitflyerへの売り注文コードを入れる

		flag["order"]["exist"] = True
		flag["order"]["side"] = "SELL"
		flag["order"]["price"] = round(data["close_price"] * lot)
		
	else:
		flag["sell_signal"] = 0
	return flag


# 手仕舞いのシグナルが出たら決済の成行注文を出す関数
def close_position( data,last_data,flag ):
	flag["position"]["count"] += 1
	
	if flag["position"]["side"] == "BUY":
		if data["close_price"] < last_data["close_price"] and flag["position"]["count"] > close_condition:
			log = "前回の終値を下回ったので" + str(data["close_price"]) + "円あたりで成行で決済します\n"
			flag["records"]["log"].append(log)
			
			# 決済の成行注文コードを入れる

			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
	if flag["position"]["side"] == "SELL":
		if data["close_price"] > last_data["close_price"] and flag["position"]["count"] > close_condition:
			log = "前回の終値を上回ったので" + str(data["close_price"]) + "円あたりで成行で決済します\n"
			flag["records"]["log"].append(log)
			
			# 決済の成行注文コードを入れる

			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
	return flag


# サーバーに出した注文が約定したかどうかチェックする関数
def check_order( flag ):
	
	# 注文状況を確認して通っていたら以下を実行
	# 一定時間で注文が通っていなければキャンセルする
	
	flag["order"]["exist"] = False
	flag["order"]["count"] = 0
	flag["position"]["exist"] = True
	flag["position"]["side"] = flag["order"]["side"]
	flag["position"]["price"] = flag["order"]["price"]
	
	return flag

# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	
	# 取引手数料等の計算
	entry_price = flag["position"]["price"]
	exit_price = round(data["close_price"] * lot)
	trade_cost = round( exit_price * slippage )
	
	log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n"
	flag["records"]["log"].append(log)
	flag["records"]["slippage"].append(trade_cost)
	
	# 値幅の計算
	buy_profit = exit_price - entry_price - trade_cost
	sell_profit = entry_price - exit_price - trade_cost
	
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		flag["records"]["buy-count"] += 1
		flag["records"]["buy-profit"].append( buy_profit )
		flag["records"]["buy-return"].append( round( buy_profit / entry_price * 100, 4 ))
		flag["records"]["buy-holding-periods"].append( flag["position"]["count"] )
		if buy_profit  > 0:
			flag["records"]["buy-winning"] += 1
			log = str(buy_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(buy_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	if flag["position"]["side"] == "SELL":
		flag["records"]["sell-count"] += 1
		flag["records"]["sell-profit"].append( sell_profit )
		flag["records"]["sell-return"].append( round( sell_profit / entry_price * 100, 4 ))
		flag["records"]["sell-holding-periods"].append( flag["position"]["count"] )
		if sell_profit > 0:
			flag["records"]["sell-winning"] += 1
			log = str(sell_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(sell_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	return flag


# バックテストの集計用の関数
def backtest(flag):
	
	buy_gross_profit = np.sum(flag["records"]["buy-profit"])
	sell_gross_profit = np.sum(flag["records"]["sell-profit"])
	
	print("バックテストの結果")
	print("--------------------------")
	print("買いエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["buy-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["buy-winning"] / flag["records"]["buy-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["buy-return"]),4)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["buy-profit"]) ))
	print("平均保有期間  :  {}足分".format( round(np.average(flag["records"]["buy-holding-periods"]),1) ))
	
	print("--------------------------")
	print("売りエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["sell-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["sell-winning"] / flag["records"]["sell-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["sell-return"]),4)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) ))
	print("平均保有期間  :  {}足分".format( round(np.average(flag["records"]["sell-holding-periods"]),1) ))
	
	print("--------------------------")
	print("総合の成績")
	print("--------------------------")
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) + np.sum(flag["records"]["buy-profit"]) ))
	print("手数料合計    :  {}円".format( np.sum(flag["records"]["slippage"]) ))
	
	# ログファイルの出力
	file =  open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
	file.writelines(flag["records"]["log"])
	
# ここからメインの実行処理

# 価格チャートを取得
price = get_price(chart_sec,after=1483228800)

print("--------------------------")
print("テスト期間:")
print("開始時点 : " + str(price[0]["close_time_dt"]))
print("終了時点 : " + str(price[-1]["close_time_dt"]))
print(str(len(price)) + "件のローソク足データで検証")
print("--------------------------")



last_data = price[0] # 初期値となる価格データをセット


flag = {
	"buy_signal":0,
	"sell_signal":0,
	"order":{
		"exist" : False,
		"side" : "",
		"price" : 0,
		"count" : 0
	},
	"position":{
		"exist" : False,
		"side" : "",
		"price": 0,
		"count":0
	},
	"records":{
		"buy-count": 0,
		"buy-winning" : 0,
		"buy-return":[],
		"buy-profit": [],
		"buy-holding-periods":[],
		
		"sell-count": 0,
		"sell-winning" : 0,
		"sell-return":[],
		"sell-profit":[],
		"sell-holding-periods":[],
		
		"slippage":[],
		"log":[]
	}
}

i = 1
while i < len(price):
	if flag["order"]["exist"]:
		flag = check_order( flag )
	
	data = price[i]
	flag = log_price(data,flag)
	
	if flag["position"]["exist"]:
		flag = close_position( data,last_data,flag )			
	else:
		flag = buy_signal( data,last_data,flag )
		flag = sell_signal( data,last_data,flag )
	last_data["close_time"] = data["close_time"]
	last_data["open_price"] = data["open_price"]
	last_data["close_price"] = data["close_price"]
	i+=1

backtest(flag)

解説

まずflag変数に新しく以下の2つの変数を追加しました。

"sell-holding-periods":[]
"buy-holding-periods":[]

ポジションを保有している期間は、もともと手仕舞いのための関数 def close_position() の先頭部分でカウントしています。

def close_position( data,last_data,flag ):
	flag["position"]["count"] += 1

そのため、ポジションを手仕舞うたびに、ここで記録している[count]の情報を、上記の holding-periods:[] に append していけばOKです。そのため、関数 def records() に以下の部分を追記します。

追記部分

# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	(中略)
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		# ここに追記
		flag["records"]["buy-holding-periods"].append( flag["position"]["count"] )
	if flag["position"]["side"] == "SELL":
		# ここに追記
		flag["records"]["sell-holding-periods"].append( flag["position"]["count"] )

また最後にバックテスト集計用の関数のところで、平均保有期間を計算してprintします。計算には、numpy の np.average() を使い、それを round() で小数点1桁に丸めています。

以下の部分です。

追記部分

# バックテストの集計用の関数
def backtest(flag):
	(中略)
	print("平均保有期間  :  {}足分".format( round(np.average(flag["records"]["buy-holding-periods"]),1) ))
	print("平均保有期間  :  {}足分".format( round(np.average(flag["records"]["sell-holding-periods"]),1) ))

Bitflyerの自動売買BOTの勝率・平均リターン・総利益をバックテストで計算する

さて、いよいよ前の章で作成した自動売買BOTの勝率を過去データを使って検証していきましょう。過去データには、前回の記事で取得した5分足の過去6000件のデータを使います。

pythonコード

まずは先にイメージが湧きやすいように、具体的なコードと実行結果を示します。各行のコードの意味は後ほど詳しく説明するので、いきなり長いコードを読むのがしんどい方は、以下のコードは読み飛ばしてください。


import requests
from datetime import datetime
import time
import json
import ccxt
import numpy as np


# ----バックテスト用の初期設定値
chart_sec = 300            # 5分足
lot = 1                    # 1トレードの枚数
slippage = 0.0005          # 手数料やスリッページ(0.05%初期値)
close_condition = 0        # エントリー後、n足が経過するまでは手仕舞わない(初期値0)


# パラーメータを指定してCryptowatchのAPIを使用する関数
def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0:
				price.append({ "close_time" : i[0],
					"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
					"open_price" : i[1],
					"high_price" : i[2],
					"low_price" : i[3],
					"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None


# json形式のファイルから価格データを読み込む関数
def get_price_from_file(path):
	file = open(path,'r',encoding='utf-8')
	price = json.load(file)
	return price



# 時間と始値・終値を表示する関数
def print_price( data ):
	print( "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) )

# 時間と始値・終値をログに記録する関数
def log_price( data,flag ):
	log =  "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) + "\n"
	flag["records"]["log"].append(log)
	return flag



# 各ローソク足が陽線・陰線の基準を満たしているか確認する関数
def check_candle( data,side ):
	try:
		realbody_rate = abs(data["close_price"] - data["open_price"]) / (data["high_price"]-data["low_price"]) 
		increase_rate = data["close_price"] / data["open_price"] - 1
	except ZeroDivisionError as e:
		return False
	
	if side == "buy":
		if data["close_price"] < data["open_price"] : return False
		#elif increase_rate <  0.0003 : return False
		#elif realbody_rate < 0.5 : return False
		else : return True
		
	if side == "sell":
		if data["close_price"] > data["open_price"] : return False
		#elif increase_rate > -0.0003 : return False
		#elif realbody_rate < 0.5 : return False
		else : return True


# ローソク足が連続で上昇しているか確認する関数
def check_ascend( data,last_data ):
	if data["open_price"] > last_data["open_price"] and data["close_price"] > last_data["close_price"]:
		return True
	else:
		return False

# ローソク足が連続で下落しているか確認する関数
def check_descend( data,last_data ):
	if data["open_price"] < last_data["open_price"] and data["close_price"] < last_data["close_price"]:
		return True
	else:
		return False


# 買いシグナルが出たら指値で買い注文を出す関数
def buy_signal( data,last_data,flag ):
	if flag["buy_signal"] == 0 and check_candle( data,"buy" ):
		flag["buy_signal"] = 1

	elif flag["buy_signal"] == 1 and check_candle( data,"buy" )  and check_ascend( data,last_data ):
		flag["buy_signal"] = 2

	elif flag["buy_signal"] == 2 and check_candle( data,"buy" )  and check_ascend( data,last_data ):
		log = "3本連続で陽線 なので" + str(data["close_price"]) + "円で買い指値を入れます\n"
		flag["records"]["log"].append(log)
		flag["buy_signal"] = 3
		
		# ここにBitflyerへの買い注文コードを入れる

		flag["order"]["exist"] = True
		flag["order"]["side"] = "BUY"
		flag["order"]["price"] = round(data["close_price"] * lot)
	
	else:
		flag["buy_signal"] = 0
	return flag


# 売りシグナルが出たら指値で売り注文を出す関数
def sell_signal( data,last_data,flag ):
	if flag["sell_signal"] == 0 and check_candle( data,"sell" ):
		flag["sell_signal"] = 1

	elif flag["sell_signal"] == 1 and check_candle( data,"sell" )  and check_descend( data,last_data ):
		flag["sell_signal"] = 2

	elif flag["sell_signal"] == 2 and check_candle( data,"sell" )  and check_descend( data,last_data ):
		log = "3本連続で陰線 なので" + str(data["close_price"]) + "円で売り指値を入れます\n"
		flag["records"]["log"].append(log)
		flag["sell_signal"] = 3
		
		# ここにBitflyerへの売り注文コードを入れる

		flag["order"]["exist"] = True
		flag["order"]["side"] = "SELL"
		flag["order"]["price"] = round(data["close_price"] * lot)
		
	else:
		flag["sell_signal"] = 0
	return flag


# 手仕舞いのシグナルが出たら決済の成行注文を出す関数
def close_position( data,last_data,flag ):
	flag["position"]["count"] += 1
	
	if flag["position"]["side"] == "BUY":
		if data["close_price"] < last_data["close_price"] and flag["position"]["count"] > close_condition:
			log = "前回の終値を下回ったので" + str(data["close_price"]) + "円あたりで成行で決済します\n"
			flag["records"]["log"].append(log)
			
			# 決済の成行注文コードを入れる

			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
	if flag["position"]["side"] == "SELL":
		if data["close_price"] > last_data["close_price"] and flag["position"]["count"] > close_condition:
			log = "前回の終値を上回ったので" + str(data["close_price"]) + "円あたりで成行で決済します\n"
			flag["records"]["log"].append(log)
			
			# 決済の成行注文コードを入れる

			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
	return flag


# サーバーに出した注文が約定したかどうかチェックする関数
def check_order( flag ):
	
	# 注文状況を確認して通っていたら以下を実行
	# 一定時間で注文が通っていなければキャンセルする
	
	flag["order"]["exist"] = False
	flag["order"]["count"] = 0
	flag["position"]["exist"] = True
	flag["position"]["side"] = flag["order"]["side"]
	flag["position"]["price"] = flag["order"]["price"]
	
	return flag

# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	
	# 取引手数料等の計算
	entry_price = flag["position"]["price"]
	exit_price = round(data["close_price"] * lot)
	trade_cost = round( exit_price * slippage )
	
	log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n"
	flag["records"]["log"].append(log)
	flag["records"]["slippage"].append(trade_cost)
	
	# 値幅の計算
	buy_profit = exit_price - entry_price - trade_cost
	sell_profit = entry_price - exit_price - trade_cost
	
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		flag["records"]["buy-count"] += 1
		flag["records"]["buy-profit"].append( buy_profit )
		flag["records"]["buy-return"].append( round( buy_profit / entry_price * 100, 4 ))
		if buy_profit  > 0:
			flag["records"]["buy-winning"] += 1
			log = str(buy_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(buy_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	if flag["position"]["side"] == "SELL":
		flag["records"]["sell-count"] += 1
		flag["records"]["sell-profit"].append( sell_profit )
		flag["records"]["sell-return"].append( round( sell_profit / entry_price * 100, 4 ))
		if sell_profit > 0:
			flag["records"]["sell-winning"] += 1
			log = str(sell_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(sell_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	return flag


# バックテストの集計用の関数
def backtest(flag):
	
	buy_gross_profit = np.sum(flag["records"]["buy-profit"])
	sell_gross_profit = np.sum(flag["records"]["sell-profit"])
	
	print("バックテストの結果")
	print("--------------------------")
	print("買いエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["buy-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["buy-winning"] / flag["records"]["buy-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["buy-return"]),4)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["buy-profit"]) ))
	
	print("--------------------------")
	print("売りエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["sell-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["sell-winning"] / flag["records"]["sell-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["sell-return"]),4)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) ))
	
	print("--------------------------")
	print("総合の成績")
	print("--------------------------")
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) + np.sum(flag["records"]["buy-profit"]) ))
	print("手数料合計    :  {}円".format( np.sum(flag["records"]["slippage"]) ))
	
	# ログファイルの出力
	file =  open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
	file.writelines(flag["records"]["log"])
	
# ここからメインの実行処理

# 価格チャートを取得
price = get_price(chart_sec,after=1514764800)

print("--------------------------")
print("テスト期間:")
print("開始時点 : " + str(price[0]["close_time_dt"]))
print("終了時点 : " + str(price[-1]["close_time_dt"]))
print(str(len(price)) + "件のローソク足データで検証")
print("--------------------------")



last_data = price[0] # 初期値となる価格データをセット


flag = {
	"buy_signal":0,
	"sell_signal":0,
	"order":{
		"exist" : False,
		"side" : "",
		"price" : 0,
		"count" : 0
	},
	"position":{
		"exist" : False,
		"side" : "",
		"price": 0,
		"count":0
	},
	"records":{
		"buy-count": 0,
		"buy-winning" : 0,
		"buy-return":[],
		"buy-profit": [],
		
		"sell-count": 0,
		"sell-winning" : 0,
		"sell-return":[],
		"sell-profit":[],
		
		"slippage":[],
		"log":[]
	}
}

i = 1
while i < len(price):
	if flag["order"]["exist"]:
		flag = check_order( flag )
	
	data = price[i]
	flag = log_price(data,flag)
	
	if flag["position"]["exist"]:
		flag = close_position( data,last_data,flag )			
	else:
		flag = buy_signal( data,last_data,flag )
		flag = sell_signal( data,last_data,flag )
	last_data["close_time"] = data["close_time"]
	last_data["open_price"] = data["open_price"]
	last_data["close_price"] = data["close_price"]
	i+=1

backtest(flag)

先に実行結果と、このバックテストの結果を「どう解釈するか?」から見ておきましょう!

実行結果

同時に以下のようなログファイルも出力されるため、どのようなタイミング(価格)でいつエントリーして、各トレードでどの程度の利益・損失が出たのかも確認できます。

▽ 自動で取引のログファイルを出力

▽ 各トレードの価格と利益・手数料を記録

※ ここではサンプル数の問題から、実体の長さ(increase_rate)、実体率(realbody_rate)の条件は無効にしています。有効にして試したい方は、def check_candle の中のコメントアウト(#)部分を外してください。

とりあえず、このテストで作った売買BOTのロジックのままでは、勝てなさそうなことがわかりましたね(笑)

売買ロジックの考察

テストBOTのコードなので深くは考察しませんが、いくつか結果からポイントを挙げておきます。pythonのソースコードの解説だけ読みたい方は、後半まで読み飛ばしてください。

まず手数料コストの23万円が痛いですが、手数料率を0%にして再計算しても、やはり最終損益はマイナスになります。つまり売買ロジックそのものの期待値がマイナスです。

1.「売り」と「買い」をひっくり返してみる

売買ロジックの期待値がマイナスということは、エントリー条件をひっくり返してみたらどうでしょうか?

つまり3本連続の陽線を順張りの「買いシグナル」ではなく逆張りの「売りシグナル」として考え、同じように3本連続の陰線を「売りシグナル」ではなく「買いシグナル」と考えてみます。これは以下の部分を2箇所、書き換えるだけで簡単にテストしてみることができます。

これを実行すると以下のようになりました。

▽ 売買の条件を反転させた場合

勝率は少し改善したものの、買い・売りの条件を正反対にひっくり返してもやはり期待値はマイナスの結果になりました。

不思議に思う方もいるかもしれませんが、手仕舞いの条件が正反対になっていないので、このようなことは当然ありえます。どちらの場合でも期待値がマイナスということは、手仕舞いの方法にも問題があるといえそうです。

2.手仕舞いを条件を変更してみる

では、手仕舞いの条件に「エントリーした次の足では手仕舞いしない」という条件を追加してみたらどうでしょうか?

もし仮に「3本連続の陽線が買いシグナル」だとしても、4本目の足では一時的に反発して戻す可能性も考えられます。ログファイルを見ていると、エントリーした次の足ですぐに損切にかかっているケースが非常に多いですね。このような仮説から「4本目の足は手仕舞いの条件判定から除外する」というロジックを追加することも考えられそうです。

上記のコードでは、初期設定値の close_condition を使って、そのような条件を試すことができるようにしておきました。興味がある方は試してみてください。

3.パラメーターの調整の問題点

しかしこのようなパラメーターの調整にはやりすぎると問題です。片っ端から色々なパラメーターを試していけば、簡単にプラスの期待値を持つ売買ロジックを作ることができてしまうからです。例えば、以下のように。

▽ 手数料0.05%でもプラスになった売買ロジック

1)売り/買いの反転
 3本連続の陽線を売りシグナルとする(逆張り)
 3本連続の陰線を買いシグナルとする(逆張り)
2)実体の長さ、実体率の条件を有効化
 increase_rate と realbody_rateの設定
3)5分足ではなく1時間足に変更
4)close_conditionのパラメータを2に設定
 エントリー後、最低2足経過するまで手仕舞わない

上記の条件でテストをすると、2017年7月~2018年4月までのテスト期間で売り・買いともに勝率は50%を上回り、手数料を差し引いてもプラスの利益を上げることができました! これは使えるストラテジーを見つけてしまったのではないでしょうか!

...いえ、残念ながらそうとは限りません。ここでプラスの成績が出ているのは、「プラスの成績が出るまで色々な組み合わせを試した」からに過ぎません。

まずトレード数が少ない(買い16回、売り33回)のも気になりますが、一般論としていえば、過去テストのパフォーマンスを見ながらどんどん後から条件を足していく行為は、「カーブフィッティング」(過剰最適化)と呼ばれ、システムトレードではあまりやってはいけない行為とされています。

以下、理由を簡単に説明します。ただしこれから自動売買BOTを始める方は、あまり最初から頭でっかちになっても良くありません。軽く読み流す程度でも構いません。

過剰最適化とは?

例えば、「エントリー後、〇本目の足で無条件に手仕舞う」という手仕舞いのロジックを考えたとします。このルールに基づいて、「〇本目(n)の足」の部分に1、2、3、4、5...と、順番に数字を入れていき、なぜかたまたま「9」を入れたときに、めちゃくちゃ良い成績が出たとします。

この「理由はわからないけど、9という数字を使うと異様にいい成績が出る」という固定値を無理やり探す行為は、やりすぎると過去のデータに対する過剰な最適化となります。

例えば、典型的なのが移動平均線のクロスを使ったエントリーです。短期移動平均線(5/6/7/8...)と長期移動平均線(20/21/22/23...)などの組み合わせを片っ端から試し、ある特定期間で明らかに「5期間と37期間」の組み合わせでパフォーマンスが良かったとしましょう。

しかし他の期間でもそれが当てはまるとは限りません。過去データにおいて、たまたま上手く当てはまった数字(パラメーター)が、将来にも当てはまるとは限りません。

変数が2つくらいならまだ大丈夫ですが、ここにさらに「火曜日はトレードの対象から外す」「手仕舞いには8期間の経過を待つ」「損切りラインをエントリー価格より 1.7ATR 下に入れる」など、理由のよくわからない恣意的な数字を使った条件を足していくと、いくらでも(過去データにおける)良い成績を作れてしまいます。

もし恣意的な数字を選んで使う場合は、そのパラメータは「少なければ少ないほどいい」と覚えておいてください。また、テスト期間を半分に分けて、前半で調整したパラメーターを後半の期間で再テストしてみるのも有効です。

バックテスト検証コードの作り方の解説

少し話が逸れてしまいました。
ここからバックテスト検証コードの解説部分に入ります。

1.成績を管理するための変数を用意する

前回の章で自動売買BOTを作成したときに、売買シグナルの有無やポジションの状況などの情報をプログラム上で管理させるために「flag」という変数を作りました。バックテストの検証でもこの「flag」変数を以下のように改良して使います。


flag = {
	"buy_signal":0,
	"sell_signal":0,
	"order":{
		"exist" : False,
		"side" : "",
		"price" : 0,
		"count" : 0
	},
	"position":{
		"exist" : False,
		"side" : "",
		"price": 0,
		"count":0
	},
	"records":{
		"buy-count": 0,
		"buy-winning" : 0,
		"buy-return":[],
		"buy-profit": [],
		
		"sell-count": 0,
		"sell-winning" : 0,
		"sell-return":[],
		"sell-profit":[],
		
		"slippage":[],
		"log":[]
	}
}

バックテストのために追加したのは以下の部分ですね。

"records":{
	"buy-count": 0, # 買いエントリのトレード数を記録
	"buy-winning" : 0, # 勝った数を記録
	"buy-return":[], # 各トレードでのリターン(利益率)を記録
	"buy-profit": [], # 各トレードでの利益・損失額を記録
	
	"sell-count": 0, # 売りエントリのトレード数を記録
	"sell-winning" : 0, # 勝った数を記録
	"sell-return":[], # 各トレードでのリターン(利益率)を記録
	"sell-profit":[], # 各トレードでの利益・損失額を記録
	
	"slippage":[], # 各トレードで生じた手数料を記録
	"log":[] # あとでテキストファイルに出力したい内容を記録
}

基本的な考え方は、「ポジションを手仕舞うたびに上記のフラッグを更新して、各トレードの成績をflag変数に記録していく」というだけです。

バックテストといっても、そのコード内容は基本的にはリアルタイム(実践用)のコードと同じです。最初にまとめて用意したローソク足を、while文を使って1足ずつループで処理していき、売買シグナルが点灯する ⇒ 買い/売りでエントリーする ⇒ 条件を満たしたら手仕舞う の流れを忠実に再現しています。

その仕組みがわかっていれば、今まで作成していた売買BOTの「手仕舞いのための関数 def close_position()」が呼ばれたときに、一緒に flag["records"]を以下のように更新するpythonコードを作ればいい、とわかりますね。

1.トレード回数(count)を1増やす
2.勝ったか負けたか(winning)をカウントする
3.利益額(profit)を記録する
4.リターン率(return)を記録する

買い/売りの記録を分ける理由

今回は、買いエントリーと売りエントリーの勝率を、それぞれ別々に検証しておきたいので、あらかじめ全ての変数を「buy-〇〇」と「sell-〇〇」に分類しておきます。

BitflyerのBTCFXのように短い期間でしか過去データを検証できない状況の場合、そのときの局面(下落トレンドか上昇トレンドか)の影響を受けやすくなります。買いエントリーで入るか、売りエントリーで入るかによって、バイアス(勝率の偏り)が生じている可能性があるため、一緒くたに勝率を計測してしまうと、売買ロジック自体の有効性が検証しにくくなります。

また利益額を計算するためには、エントリーしたときの価格(指値)を覚えておく必要がありますね。そのために上記のflagでは、flag[position]やflag[order] に新たに [price] という項目を追加しています。

2.全体のコードの流れを把握する

まず全体のコードの中で、追加しなければならない処理を大まかにまとめておきましょう。

1)エントリー注文する関数
・エントリーしたときの指値価格を記録しておく
・エントリーした日時と価格をログ変数に入れる

2)ポジションを手仕舞いする関数
・成績を記録するための関数を呼ぶ

3)成績を記録するための関数(NEW)
・トレード回数をカウントする
・手数料(スリッページ)を計算する
・利益額を計算する
・勝ったか負けたかを記録する
・リターン(率)を計算する
・利益/損失の日時と額をログ変数に入れる

4)最終的に集計する関数(NEW)
・総利益額を計算する
・勝率(勝ち数/トレード総数)を計算する
・平均リターンを計算する
・買い/売りごとの成績を print で表示する
・ログ内容をテキストファイルで出力する

では、具体的なコードを見ていきましょう!

1)買いシグナルでエントリーする関数

log = "3本連続で陽線 なので" + str(data["close_price"]) + "円で買い指値を入れます\n"
flag["records"]["log"].append(log) # テキスト内容をログに記録
flag["buy_signal"] = 3

# ここにBitflyerへの買い注文コードを入れる

flag["order"]["exist"] = True
flag["order"]["side"] = "BUY"
flag["order"]["price"] = round(data["close_price"] * lot) # エントリー価格を計算

以下の部分を新たに変更しました。

1~2行目は、今までAnacondaプロンプトに表示していた「3本連続なので 1223023円 で買い指値を入れます」というテキスト内容を、かわりに[log]変数に記録しています。バックテストでは、プロンプトにいちいち全ての情報を出力するのは邪魔なので、最終成績以外の細かい情報はすべてログファイルの方に出力するようにします。

なお、ここで覚えておいて欲しいのが、.append()という関数です。

便利すぎる.appned()を使いこなそう!

.append()というのは、配列の要素に新しい要素を追加するときに使う関数です。例えば、以下のような配列があるとします。

crypto = ["BTC","ETH","XRP"]

ここに新しく"XEM"という要素を加えたい場合は以下のように書きます。

cyrpto.append("XEM")

これを実行すると、以下のようになります。

crypto = ["BTC","ETH","XRP","XEM"]

この.append()は、while文やfor文などのループ処理の中で、データを記録するときに、めちゃくちゃ使います。この後も何度も出てくるので、ここで覚えておきましょう!

2)手仕舞い(決済)する関数

flag["position"]["count"] += 1
if data["close_price"] < last_data["close_price"] and flag["position"]["count"] > close_condition:
	log = "前回の終値を下回ったので" + str(data["close_price"]) + "円あたりで成行で決済します\n"
	flag["records"]["log"].append(log)
	
	# 決済の成行注文コードを入れる

	records( flag,data )
	flag["position"]["exist"] = False
	flag["position"]["count"] = 0

最初の1行はバックテストの検証コードとは関係ない部分です。

今回の記事から「〇足経過するまでは手仕舞いをしない」という新しい手仕舞いのための条件を追加したため、手仕舞いの関数が呼ばれるたびに1ずつ増えるカウンタ(flag[position][count])を新たに用意しています。

このカウンタが、最初に設定した(close_condition)を上回らない限り、手仕舞いのための条件を満たさない、という仕組みです。

また手仕舞いの条件を満たした場合には、成行注文を出した後に、以下の「成績を記録するための関数」を呼んでいます。

3)各トレードの成績などを記録する関数

# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	
	# 取引手数料等の計算
	entry_price = flag["position"]["price"]
	exit_price = round(data["close_price"] * lot)
	trade_cost = round( exit_price * slippage )
	
	log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n"
	flag["records"]["log"].append(log)
	flag["records"]["slippage"].append(trade_cost)
	
	# 値幅の計算
	buy_profit = exit_price - entry_price - trade_cost
	sell_profit = entry_price - exit_price - trade_cost
	
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		flag["records"]["buy-count"] += 1
		flag["records"]["buy-profit"].append( buy_profit )
		flag["records"]["buy-return"].append( round( buy_profit / entry_price * 100, 4 ))
		if buy_profit  > 0:
			flag["records"]["buy-winning"] += 1
			log = str(buy_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(buy_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	if flag["position"]["side"] == "SELL":
		flag["records"]["sell-count"] += 1
		flag["records"]["sell-profit"].append( sell_profit )
		flag["records"]["sell-return"].append( round( sell_profit / entry_price * 100, 4 ))
		if sell_profit > 0:
			flag["records"]["sell-winning"] += 1
			log = str(sell_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(sell_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	return flag

最初に、注文枚数(lot)に応じた手仕舞い価格を計算し、そこから手数料(スリッページコスト)を計算しています。次に、そのコストを加味した上で利益が出ているかどうかを計算しています。

基本的には1行ずつ読んでいただければ、何をやっているかは理解できると思います。

以下の項目は、利益が出ているか出ていないかに関わらず、共通で実行する項目です。

flag["records"]["buy-count"] += 1 #トレード回数の記録
flag["records"]["buy-profit"].append( buy_profit ) #利益、損失の記録
flag["records"]["buy-return"].append( round( buy_profit / entry_price * 100, 4 )) #利益率(損失率)を計算

また勝った場合のみ、以下の処理を追加で実行して「勝った数」をカウントします。

買いエントリーであれば、手仕舞い価格がエントリー価格を上回っていれば「勝ち」です。売りエントリーであれば、エントリー価格が手仕舞い価格を上回っていれば勝ちです。この「勝ち」カウンタは最後に勝率を計算するときに使います。

flag["records"]["sell-winning"] += 1

また「手数料は〇円でした」「〇円の利益(損失)でした」というログ用のテキストを、flag[records][log] に追加しています。ここではログだけでなく、リターンや利益額など、数字の記録にもすべて append() を使っています。

4)メイン処理の最初部分

price = get_price(chart_sec,after=1514764800)

print("--------------------------")
print("テスト期間:")
print("開始時点 : " + str(price[0]["close_time_dt"]))
print("終了時点 : " + str(price[-1]["close_time_dt"]))
print(str(len(price)) + "件のローソク足データで検証")
print("--------------------------")

Cryptowatchから過去のローソク足のデータを取得する部分のコードです。

(after = UNIX時間) で指定された期間以降のBitflyerの価格データをCryptowatchから取得し、それがいつからいつのデータなのかをprintで表示しています。この部分のソースコードは、前回の記事で解説したものと全く同じなので、そちらを参考にしてください。

上記の get_price() の部分を、get_price_form_file() にすれば、自分で保存しておいたjsonファイルからローソク足データを読み込むこともできます。これも前回の記事で解説済です。

5)メイン処理のループ部分

i = 1
while i < len(price):
	if flag["order"]["exist"]: # 未約定の注文がないかチェック
		flag = check_order( flag )
	
	data = price[i]
	flag = log_price(data,flag) # 新しいローソク足の日時と始値・終値をログ
	
	if flag["position"]["exist"]: # ポジションを保有していれば、手仕舞いの条件をチェック
		flag = close_position( data,last_data,flag )			
	else:
		flag = buy_signal( data,last_data,flag ) # 買いエントリの条件をチェック
		flag = sell_signal( data,last_data,flag ) # 売りエントリの条件をチェック
	last_data["close_time"] = data["close_time"]
	last_data["open_price"] = data["open_price"]
	last_data["close_price"] = data["close_price"]
	i+=1

backtest(flag)

全体のメイン処理のループ部分では、特に変わった箇所はありません。
変わったのは、7行目のlog_price()くらいですね。

今までは、すべてのローソク足の「日時: 始値: 終値:」をプロンプト(黒い画面)に表示していました。しかしバックテストで全てのローソク足を表示するのは邪魔なので、ログファイルに出力するよう変更しています。

今回のコードでは、以下のようなlog_price()関数を作り、それを呼んでいます。

# 時間と始値・終値をログに記録する関数
def log_price( data,flag ):
	log =  "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) + "\n"
	flag["records"]["log"].append(log)
	return flag

またループ処理が終わった後に、一番最後に、backtest() という「最後の集計用の関数」を実行しています。では最後にこの関数の中身を見てみましょう。

6)バックテストの集計用の関数

def backtest(flag):
	
	buy_gross_profit = np.sum(flag["records"]["buy-profit"])
	sell_gross_profit = np.sum(flag["records"]["sell-profit"])
	
	print("バックテストの結果")
	print("--------------------------")
	print("買いエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["buy-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["buy-winning"] / flag["records"]["buy-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["buy-return"]),4)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["buy-profit"]) ))
	
	print("--------------------------")
	print("売りエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["sell-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["sell-winning"] / flag["records"]["sell-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["sell-return"]),4)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) ))
	
	print("--------------------------")
	print("総合の成績")
	print("--------------------------")
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) + np.sum(flag["records"]["buy-profit"]) ))
	print("手数料合計    :  {}円".format( np.sum(flag["records"]["slippage"]) ))
	
	# ログファイルの出力
	file =  open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
	file.writelines(flag["records"]["log"])

今回、新しく登場したのが「numpy」です。

numpyを使って平均や合計を計算しよう!

これは、平均値、合計値、標準偏差、中央値、などを簡単に計算してくれる便利な計算用のライブラリです。例えば、以下のような配列の合計値や平均値を1行で計算することができます。

例)

score = [112,342,124,34,56,234,99,124,553] #各スコア

sum_score = np.sum(score) #スコアの合計
average_score = np.average(score) #スコアの平均値

np.sum() や np.average() というのは、numpyライブラリのsum関数、average関数を使います、という意味です。Bitflyerの注文を出すときに、「bitflyer.create_order()」と書いたのと同じイメージですね。

また一番最初のimport文で以下のように記載しています。

import numpy as np

このように「as np」と書いておくことで、プログラムのコード中で numpy.sum() と書く代わりに、np.sum() と書くことができるようになります。

正直、初心者にとっては、numpy.sum()と書いた方がわかりやすい気がするのですが、なぜかあらゆるプログラムでは「np.sum()」と書くのが慣例になっています。そのためそういったコードに慣れるために、ここでもそうしておきます。

round()を使って四捨五入をしよう!

またround()は四捨五入するための関数です。round( 数字 , 小数点以下の桁数 )と書くことで、指定した数字を指定した桁数(小数点以下の桁数)に丸めることができます。

例:
x = 186.44444
round(x,2)
#x = 186.44 

上記のnumpyやround関数を使って、

・勝率
・平均リターン
・総損益
・手数料の合計

を順番に計算して出力しています。
これでバックテストのコードの解説は終わりです。

まとめ

今回、新しく作成した勝率検証(バックテスト)のためのコードは、基本的に「売買シグナルの判定」「手仕舞いの条件判定」の箇所とは独立した関数です。そのため、どんな売買ロジックのpythonコードにもそのまま応用することができます。

次回からは、「ドンチャン・ブレイクアウト」という有名なブレイクアウト手法を使って、同じように勝率を検証する方法を解説します。また、平均リターンのバラツキ(標準偏差)という概念や、最大ドローダウンの計算方法、そして損益グラフを視覚的に描画(プロット)する方法などを解説していきます!

練習問題

バックテストでの検証項目に「平均保有期間」を追加してみましょう!

平均保有期間とは「各トレードで平均してどのくらいの期間、ポジションを保有していたか?」を表す指標です。一般的には、ポジションは長く保有するほどリスクが大きくなります。またエントリー機会も減ってしまうため、同じ勝率・同じリターン(期待値)なら、平均保有期間の短い売買ロジックの方が優秀です。

ヒント

上記のコードでは、flag["position"]["count"]でポジションを持っている期間をカウントしていますね。これを成績を記録する関数(def records)の中で、appendを使って配列に記録しておけば良いだけです!(そのためには、flagに記録用の新しい変数を追加する必要があります)

そして、最後にバックテスト集計用の関数(backtest)の中でnumpyを使って平均値を計算します。今回の記事の総復習になりますので是非やってみてください!

解答のコードはこちら

バックテストに必要なBitflyerの過去のローソク足の価格データを集めよう!

Bitflyerで使う自動売買BOTの勝率を検証するために、まずは十分な量の過去の価格データを入手する必要があります。取得元はCryptowatchを使います。

CryptowatchのAPIを改めて理解する

まず最初にもう1度、CryptowatchのAPIの特徴を確認しておきましょう。CryptowatchからOHLC価格(ローソク足の始値・高値・安値・終値)を取得するAPIには、以下の3つのパラメーターが設定できます。

periods ・・・ 時間軸を秒で指定
after ・・・ 〇〇(UNIX日時)以降のデータを取得
before ・・・〇〇(UNIX日時)より前のデータを取得

・CryptowatchのAPI仕様書
・UNIX時間変換ツール

今までは1分足の価格データを取得するために「periods」のパラメーターだけを 60秒(1分)に指定していました。このように時間軸だけを指定した場合、デフォルトではCryptowatchは直近500件のデータを返します。

ではパラメーターを指定するとどのようなデータが返ってくるのでしょうか? テストするために、以下のような関数を作ってみましょう。

パラメーターをテストする関数

今回は様々なパラメーターでテストができるように、新しい関数を作ります。今までとは違う新しいpythonファイルを作って以下のコードを記載してください。


import requests
from datetime import datetime
import time


def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			price.append({ "close_time" : i[0],
				"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
				"open_price" : i[1],
				"high_price" : i[2],
				"low_price" : i[3],
				"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None

# ここからメイン
price = get_price(60)

if price is not None:
	print("先頭データ : " + price[0]["close_time_dt"] + "  UNIX時間 : " + str(price[0]["close_time"]))
	print("末尾データ : " + price[-1]["close_time_dt"] + "  UNIX時間 : " + str(price[-1]["close_time"]))
	print("合計 : " + str(len(price)) + "件のローソク足データを取得")
	print("--------------------------")
	print("--------------------------")

今回作った関数では、get_price()の中で、before/afterのパラメーターも設定できるように改良しました。これを get_price.py などの名前で保存しておきます。

以下のように書けば、パラメーター無しの5分足(300秒)のデータが取得できます。

get_price(300)

以下のように書けば、UNIX時間で1521849600(2018/3/24 09:00:00)より前の5分足の価格データを取得できます。

get_price(300,before=1521849600)

以下のように書けば、UNIX時間で1521849600(2018/3/24 09:00:00)以降の1時間足の価格データを取得できます。

get_price(3600,after=1521849600)

返ってくるデータ件数と期間

まずは何もパラメーターを設定しない場合をテストしてみましょう。
以下の3つをそれぞれ実行してみます。

get_price(60)
get_price(300)
get_price(3600)

1分足の場合

先頭データ : 2018/03/29 12:56
末尾データ : 2018/03/29 21:15
500件のローソク足データを取得

およそ8時間分のデータが返ってきました。

5分足の場合

先頭データ : 2018/03/28 03:35
末尾データ : 2018/03/29 21:20
500件のローソク足データを取得

およそ1~2日分のデータが返ってきました。

1時間足の場合

先頭データ : 2018/03/09 03:00
末尾データ : 2018/03/29 22:00
500件のローソク足データを取得

およそ20日分のデータが返ってきました。

結果画面

やはりどの時間軸でも、基本的にはデフォルトで500件のデータが返ってきています。

Beforeを指定してみると…?

それでは、この先頭の日付を《before》パラメータに設定することで、ここからさらに遡って500件を取得できるのでしょうか? 試しに以下を実行してみましょう。

get_price(60,before=1522295760)

先ほど返ってきた1分足のデータの先頭の日時(2018/03/28 03:35 = UNIX時間で1522295760)を基準に、その前のデータを要求してみます。すると、以下のように空データが返ってきてしまいました。

時間を少しズラシて色々と試してみても、《before》パラメータでは、直近の500件以上のデータは取得できないようです。

Afterを指定してみると…?

それでは、afterで指定してみてはどうでしょうか? 試しにダメ元で2018年1月1日午前9時(UNIX時間で1514764800)以降の1分足データをすべて要求してみます。

get_price(60,after=1514764800)

すると、以下のように6000件のデータが返ってきます。先頭日時は3月25日です。期間にすると、直近4日分くらいの1分足データですね。

先頭データ : 2018/03/25 16:45
末尾データ : 2018/03/29 21:28
6000件のローソク足データを取得

いろいろな日時にパラメーターを変更して試してみても、《after》でパラメーターを指定する方法だと、取得できるデータ件数の上限は直近6000件までのようです。他の時間軸で試しても、以下のような結果になります。

5分足の場合

先頭データ : 2018/03/08 23:45
末尾データ : 2018/03/29 21:35
合計 : 6000件のローソク足データを取得

5分足だと6000件で約2週間分以上のデータが得られることになります。

1時間足の場合

先頭データ : 2017/07/22 15:00
末尾データ : 2018/03/29 22:00
合計 : 6000件のローソク足データを取得

1時間足だと6000件で約7~8カ月分程度のデータが得られることになります。

どの時間軸でバックテストをするにしても、大体、ローソク足6000本くらいのデータで検証すれば、とりあえずテストとしては十分だと思います。もちろんデータは多ければ多いほどいいのですが、無いものは仕方ありません。

自分でデータを集める方法

もっと広範にテストをしたい人は、自前で1分足の価格データを定期的に保存しておくといいでしょう。バックテストの検証に限らず、あらゆる統計的なテストで最も難しいのは「十分な量のデータを集めること」です。自前で十分な量の価格データを持っていれば、それだけ精度の高い検証が可能です。

取得したデータをjsonファイルに保存する

ひとまずjson形式のファイルで保存するだけなら、print()の後に以下のコードを書き足せばOKです。

import json #これだけ先頭に追加で記述

#ファイルに書き込む
file = open("./{0}-{1}-price.json".format(price[0]["close_time"],price[-1]["close_time"]),"w",encoding="utf-8")
json.dump(price,file,indent=4)

これを実行すると、以下のような形式でファイルが自動的に作成され、その中に価格データが保存されます。

・先頭ローソク足の日時-末尾ローソク足の日時-price.json
※ 日時はUNIXタイムスタンプ

中身を開くと以下のようなJSON形式で、価格データが保存されているのがわかります。

保存したjsonファイルを読み込む

逆に読み込むときは、以下のような関数を作っておけばOKです。

import json #これだけ先頭に追加で記述

#jsonファイルを読み込む関数
def get_price_from_file(path):
	file = open(path,"r",encoding="utf-8")
	price = json.load(file)
	return price

# ここからメイン
price = get_price_from_file("./1521978660-1522341240-price.json")

これで上のコードを実行すると、APIを叩いたときと全く同じ結果が得られます。つまり、WEBで取得したデータと同じように扱うことができます。

▽ 上がCryptowatchのAPIで価格データを取得しjsonで保存した場面
  下がそのjsonファイルを読み込んで価格データを取得した場面

なお、もし自分でデータを保存するのであれば、1分足のデータだけあれば十分です。1分足のデータさえあれば、そこから加工して5分足、15分足、1時間足のデータを作るのは簡単(最高値と最安値以外を切り捨てるだけ)です。

また今回の章ではいったん6000件のデータがあれば問題ありません。自前で価格データを収集して蓄積することに興味がある方は、以下の記事を参考にしてください。

・Cryptowatchで過去6000件以上のローソク足を保存して自前で価格データを蓄積する

まとめ

最後に今回使ったpythonのコードをまとめておきます。


import requests
from datetime import datetime
import time
import json

# パラーメータを指定してCryptowatchのAPIを使用する関数
def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			price.append({ "close_time" : i[0],
				"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
				"open_price" : i[1],
				"high_price" : i[2],
				"low_price" : i[3],
				"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None

# json形式のファイルから価格データを読み込む関数
def get_price_from_file(path):
	file = open(path,"r",encoding="utf-8")
	price = json.load(file)
	return price


# ここからメイン
price = get_price(60,after=1514764800)

#ファイルから読みこむ場合
#price = get_price_from_file("./ファイル名.json")


if price is not None:
	print("先頭データ : " + price[0]["close_time_dt"] + "  UNIX時間 : " + str(price[0]["close_time"]))
	print("末尾データ : " + price[-1]["close_time_dt"] + "  UNIX時間 : " + str(price[-1]["close_time"]))
	print("合計 : " + str(len(price)) + "件のローソク足データを取得")
	print("--------------------------")
	print("--------------------------")
	
	#ファイルに書き込む場合
	#file = open("./{0}-{1}-price.json".format(price[0]["close_time"],price[-1]["close_time"]),"w",encoding="utf-8")
	#json.dump(price,file,indent=4)

「#」が先頭に付いているコードは、コメントとして扱われるため実行されません。上記のファイル書き込み、読み込みを使う場合は先頭の「#」を外してください。

BTCFXで作成した自動売買BOTの勝率をバックテストで検証してみよう

さて、前回の章までをお読みいただいた方であれば、ひとまず、Bitflyerで自動的に稼働し続ける売買BOTを作ることができたと思います。

当然、次にあなたが疑問に思うのは、「….で、実際にこのBOTはどのくらい勝てるの?」という点でしょう(笑)。今回の章では、過去の価格データから自作した売買ロジックの勝率を検証するための方法を解説していきます。

勝率についての考え方

「すごいぞ! 勝率80%で1回のトレードあたりの平均リターンが20%の売買ロジックができた!」とはしゃいでみても、その売買シグナルが年に1回した点灯しないようだと、実用上は意味がありません。

まずは、バックテストで勝率を検証する際に、考慮すべき指標を5つだけ簡単に説明しておきます。

1.トレードの回数

過去のデータから勝率を検証する場合、統計的には、最低でも50回以上(できれば100回以上)のトレード数が必要です。

トレード回数が少なすぎると、その勝率や平均リターン率が「たまたま偶然良かっただけなのか?」、それとも「継続的に勝てる再現性のあるロジックなのか?」判別できません。

一般論としていえば、トレードの売買ロジックは、エントリー条件を厳しくすればするほど期待できる勝率やリターンは高くなります。しかしその分、エントリーできる機会が少なくなります。

またBitflyerFXの過去の価格データは無限に手に入るわけではありません。例えば、Cryptowatchから価格データを取得する場合、私の知る限り、1分足のデータは最大6000件までしか入手できません。あまりエントリー条件を厳しくすると、十分な回数のテストができなくなります。

2.勝率

勝率というのは、1回のトレードで利益が出る確率です。例えば、100回トレードをしてそのうち36回利益が出た場合、その売買ロジックの勝率は36%となります。

勝率は1つの重要な指標です。しかし勝率だけでは売買ロジックが優秀かどうかを判定することはできません。勝率36%でも、勝ったときの平均利益が9%、負けたときの平均損失が2%なら、その売買ロジックの期待リターンは+(プラス)です。

(例)1万円を賭けたときの期待値
勝ち 1万円 × 9% × 36% = 324円
負け 1万円 × -2% × 64% = -128円
期待値 196円

ただし同じ期待リターンなら、勝率が高いほうが安心感は上です。勝率が高いと(負けが連続する確率が低くなるので)損益収支グラフを見たときに綺麗に右肩上がりで利益が積み上がりやすくなります。

3.平均リターン

平均リターンというのは、1回のトレードで張った金額に対して平均して何%のリターンがあるか、の指標です。言うまでもなく最も重要な指標の1つです。

ここまで紹介した3つの指標「トレード数」「勝率」「平均リターン」はどれも同じくらい重要です。同じ勝率でも、平均リターン10%の売買ロジックで月に5回トレードするよりも、平均リターン3%で月に100回トレードする方が、利益の絶対額は大きくなります。

儲かった利益をそのまま次回のトレードに投入するスタイル(複利運用)の場合には、同じ期待利回りならトレード回数が多いほど利益も大きくなります。また前述のように、トレード回数が多いほうが期待したリターンに収束しやすくなります。

4.総利益額

テスト期間の最初と最後を比較して、結局、この自動売買BOTによって「軍資金は何倍になったのか?」という指標です。

この総利益額という数字には、前述の「トレード数」「勝率」「平均リターン」がすべて考慮されています。結局のところ、私たちが最終的に最大化したい数字はこの「総利益額」なので、この中では最も大事な指標になります。

5.最大ドローダウン

最大ドローダウンというのは、テスト期間中に最大で「何%まで軍資金を減らしたか?」、つまり「どれだけ苦しい時期を乗り越えたか?」という指標です。

過去のテスト検証の結果、最終的な平均リターンや総利益額がプラスだとしても、途中で70%も軍資金が目減りするような場面があったとしたら、その売買ロジックを実用で試すのは非常に怖いはずです。

最大ドローダウンが大きいということは、テストする期間を変えて検証したら最終リターンがマイナスに終わった可能性もあった、ということです。

実際の世界の運用では(テスト検証と違って)どこで終わり、という期間はありません。リアルタイムでBOTを運用していて、軍資金が70%も目減りしてしまったら、平常心でBOTの稼働を続けられる人はほとんどいないでしょう。そのため、最大ドローダウンは30%以内におさえるのが理想とされています。

自動売買BOTに特有の注意点

次に、Bitflyerで自動売買BOTを動かすときに特有の注意点を上げておきます。これらはバックテストでの検証結果と、実際の世界でのBOT稼働の成績が一致しなくなる原因になるため、テストの時点で考慮しておく必要があります。

注文の遅延と成行注文のスリップ

Bitflyerでは頻繁にAPI注文の遅延がおこります。注文の遅延というのは、サーバーに注文を送った後、それが認識される(板に並ぶ)までの間に数十秒~1分以上のラグ(遅延)が生じることをいいます。

また成行注文のスリップとは、リアルタイムの意図した価格で注文が約定しないことをいいます。例えば、以下の試験運用では、終値のシグナルで82万7666円のときに手仕舞いの成行注文を出していますが、実際には82万7426円で約定しています。

▽ 本当はこの価格で手仕舞いたい

▽ 実際のスリッページの例

ここにBitflyerのサーバーエラーや遅延が重なると、どんどん理想の手仕舞い価格と実際の約定価格が乖離していき、テスト通りのパフォーマンスを上げることが難しくなります。

バックテストでは、シグナルが点灯したローソク足の終値(または次の足の始値)で約定したもの、と仮定してテストしますが、実際にはその価格通りに約定することはほとんどなく、不利な方向にズレて約定することが多いです。

対策

対策はいくつかあります。

1)そもそもテスト段階でスリップによる損失を考慮する
2)実際の運用で1分足は使わない。15分足や1時間足で動かす
3)エントリー注文は指値にして、刺さらなければ諦める

解説

例えば、テスト検証のときに、全てのトレードから一律に―0.05~0.1%程度のスリップを考慮して、あらかじめ手数料として差し引いておくと、実際の運用パフォーマンスとの乖離が少なくなります。

また当然、スキャルなどの利幅の小さいトレードほど、APIの遅延やサーバーエラーによるスリップの損失を受けやすくなります。今までの章では、練習のしやすさから全て1分足で勉強してきましたが、実際のテストや運用では、なるべく時間軸は5分以上の足で考えます。

エントリーだけ指値にする

こちらは各々の戦略次第ですが、エントリー注文は指値にして「刺さらなければ諦める」という選択肢もあります。

利確や損切などの手仕舞いのための注文を諦めることはできませんが、エントリー注文に関しては「希望する価格で執行されないなら見送る」ということが、戦略上は可能です。(もちろん機会損失になる可能性はあります)

エントリー注文を指値にすれば、エントリーも手仕舞いも成行注文をする場合に比べて、テスト検証とのズレを半分に抑えることができます。

今回の章で検証する売買ロジック

さて、では実際にテスト検証の方法を解説していきましょう!

今回の章では、前回までの記事で作った自動売買BOTが「実際のところどの程度の勝率なのか?」気になる方もいる(かもしれない)ので、前回のBOTをそのまま題材にします!

・前回作成した自動売買BOTのロジック
・実際にBitflyerで動くBOTコード

バックテストの検証では、以下のpythonコードを前提の教材とし、こちらをカスタマイズしていきます。

検証に使う元のpythonコード

このコードは以前にこちらの記事でテスト用として紹介したソースコード(Bitflyerへの実際の発注処理を行わないコード)を、売り・買いの両方のシグナルに対応させただけのものです。以下のコードの意味は改めて解説しないので、わからない部分がある方は、前の章などを軽く復習しておいてください!


import requests
from datetime import datetime
import time


# 最初に1度だけCryptowatchから価格データ取得
response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params = { "periods" : 60 })

# 指定されたローソク足の価格データ(OHLC)を返す関数
def get_price(min,i):
	data = response.json()
	return { "close_time" : data["result"][str(min)][i][0],
		"open_price" : data["result"][str(min)][i][1],
		"high_price" : data["result"][str(min)][i][2],
		"low_price" : data["result"][str(min)][i][3],
		"close_price": data["result"][str(min)][i][4] }


# 時間と始値・終値を表示する関数
def print_price( data ):
	print( "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) )


# 各ローソク足が陽線・陰線の基準を満たしているか確認する関数
def check_candle( data,side ):
	realbody_rate = abs(data["close_price"] - data["open_price"]) / (data["high_price"]-data["low_price"]) 
	increase_rate = data["close_price"] / data["open_price"] - 1
	
	if side == "buy":
		if data["close_price"] < data["open_price"] : return False
		elif increase_rate < 0.0003 : return False
		elif realbody_rate < 0.5 : return False
		else : return True
		
	if side == "sell":
		if data["close_price"] > data["open_price"] : return False
		elif increase_rate > -0.0003 : return False
		elif realbody_rate < 0.5 : return False
		else : return True


# ローソク足が連続で上昇しているか確認する関数
def check_ascend( data,last_data ):
	if data["open_price"] > last_data["open_price"] and data["close_price"] > last_data["close_price"]:
		return True
	else:
		return False

# ローソク足が連続で下落しているか確認する関数
def check_descend( data,last_data ):
	if data["open_price"] < last_data["open_price"] and data["close_price"] < last_data["close_price"]:
		return True
	else:
		return False


# 買いシグナルが出たら指値で買い注文を出す関数
def buy_signal( data,last_data,flag ):
	if flag["buy_signal"] == 0 and check_candle( data,"buy" ):
		flag["buy_signal"] = 1

	elif flag["buy_signal"] == 1 and check_candle( data,"buy" )  and check_ascend( data,last_data ):
		flag["buy_signal"] = 2

	elif flag["buy_signal"] == 2 and check_candle( data,"buy" )  and check_ascend( data,last_data ):
		print("3本連続で陽線 なので" + str(data["close_price"]) + "で買い指値を入れます")
		flag["buy_signal"] = 3
		
		# ここに買い指値注文のコードを入れる
		flag["order"]["exist"] = True
		flag["order"]["side"] = "BUY"
	
	else:
		flag["buy_signal"] = 0
	return flag


# 売りシグナルが出たら指値で売り注文を出す関数
def sell_signal( data,last_data,flag ):
	if flag["sell_signal"] == 0 and check_candle( data,"sell" ):
		flag["sell_signal"] = 1

	elif flag["sell_signal"] == 1 and check_candle( data,"sell" )  and check_descend( data,last_data ):
		flag["sell_signal"] = 2

	elif flag["sell_signal"] == 2 and check_candle( data,"sell" )  and check_descend( data,last_data ):
		print("3本連続で陰線 なので" + str(data["close_price"]) + "で売り指値を入れます")
		flag["sell_signal"] = 3
		
		# ここに売り指値注文のコードを入れる
		flag["order"]["exist"] = True
		flag["order"]["side"] = "SELL"
		
	else:
		flag["sell_signal"] = 0
	return flag


# 手仕舞いのシグナルが出たら決済の成行注文を出す関数
def close_position( data,last_data,flag ):
	if flag["position"]["side"] == "BUY":
		if data["close_price"] < last_data["close_price"]:
			print("前回の終値を下回ったので" + str(data["close_price"]) + "あたりで成行で決済します")
			# 決済の成行注文のコードを入れる
			flag["position"]["exist"] = False
			
	if flag["position"]["side"] == "SELL":
		if data["close_price"] > last_data["close_price"]:
			print("前回の終値を上回ったので" + str(data["close_price"]) + "あたりで成行で決済します")
			# 決済の成行注文のコードを入れる
			flag["position"]["exist"] = False
	return flag


# サーバーに出した注文が約定したかどうかチェックする関数
def check_order( flag ):
	
	# 注文状況を確認して通っていたら以下を実行
	# 一定時間で注文が通っていなければキャンセルする
	
	flag["order"]["exist"] = False
	flag["order"]["count"] = 0
	flag["position"]["exist"] = True
	flag["position"]["side"] = flag["order"]["side"]
	return flag


# ここからメイン処理の部分
# まず初期値の取得
last_data = get_price(60,0)
print_price( last_data )


# 注文管理用のフラッグを準備
flag = {
	"buy_signal":0,
	"sell_signal":0,
	"order":{
		"exist" : False,
		"side" : "",
		"count" : 0
	},
	"position":{
		"exist" : False,
		"side" : ""
	}
}

# 500回ループ処理
i = 1
while i < 500:
	if flag["order"]["exist"]:
		flag = check_order( flag )
	
	data = get_price(60,i)
	print_price( data )
	
	if flag["position"]["exist"]:
		flag = close_position( data,last_data,flag )			
	else:
		flag = buy_signal( data,last_data,flag )
		flag = sell_signal( data,last_data,flag )
	last_data["close_time"] = data["close_time"]
	last_data["open_price"] = data["open_price"]
	last_data["close_price"] = data["close_price"]
	i+=1
	

実行結果

ちなみに、この時点で実行すると以下のようになります。
カンの良い方はわかると思いますが、もう実はテストのベースはできています。

ですが、既にこの記事を読んで勉強していただいた方なら1つ気付いたことがあるはずです。

そう!検証するためには、明らかにデータ数・トレード数ともに全然足りないですね。こちらはCryptowatchで取得できる直近500件の1分足データを参照していますが、500件のデータでは5回ほどしかトレードが成立していません。最低でも10倍程度の量の価格データが必要そうです。

次回の記事では、まず十分な量の元データ(ローソク足の価格データ)を取得する方法から解説していきます。

次回:バックテストに必要なBitflyerFXの価格データを集めよう!

条件を緩和してテスト回数を増やす方法

※ ちなみにどうしてもデータが手に入らない場合は、エントリー条件を緩和してテストする方法があります。例えば、上記のロジックだと、実体の長さやヒゲの割合などの追加条件を外すだけで、同じ500件分のデータから合計52回のトレード機会を得られます。

▽ 実体の長さ・ヒゲの割合の条件あり

▽ 実体の長さ・ヒゲの割合の条件なし

基本的に、無料でデータ(API)を提供して貰っている私たちの立場からすれば、データ数が少ないことに文句は言えません。現実的に得られるデータの範囲で有益な売買ロジックを考えるしかありません。この辺りのトレードオフの考え方も、今後説明していきますね!