必威体育Betway必威体育官网
当前位置:首页 > IT技术

种子发布和bt文件分发系统

时间:2019-10-10 01:13:38来源:IT技术作者:seo实验室小编阅读:60次「手机版」
 

种子发布系统

基于github开源项目Taipei-Torrent

btmaster

主要监听61111端口,可以制作种子和发送种子文件,开启tracker,开启原始下载BT

btslave

向btmaster报告状态,下载种子文件,根据种子下载bt分发文件

btmaster源码:

package main

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"github.com/jackpal/Taipei-Torrent/torrent"
	"github.com/jackpal/Taipei-Torrent/tracker"
	"golang.org/x/net/proxy"
	"io"
	"io/ioutil"
	"log"
	"math"
	"net"
	"os"
	"os/signal"
	"path"
	"protocol"
)

//包括tracker,bt和种子及命令下发
//1. 制作文件种子
//2. 启动tracker
//3. 启动上传bt
//4. 向slave下发种子
//5. 向slave下发开始下载命令
//6. 统计slave下载完成情况
//7. 终止bt下载任务

//制作文件种子
func Make_Torrent(Filename string, Tracker string) (string, string, string) {

	torrentFileN := Filename + ".torrent"
	createTorrent := Filename
	createTracker := Tracker
	torrentFile, err := os.OpenFile(torrentFileN, os.O_RDWR|os.O_CREATE, 0)
	if err != nil {
		log.Fatalln("failed to open file!", err)
	}
	defer torrentFile.Close()
	wterr := torrent.WriteMetaInfoBytes(createTorrent, createTracker, torrentFile)
	if wterr != nil {
		log.Fatal("Could not create torrent file:", wterr)
	}
	//计算torrent文件base64编码
	torrentbytes, _ := ioutil.ReadFile(torrentFileN)
	torrentbase64 := base64.StdEncoding.EncodeToString(torrentbytes)
	//计算torrent文件md5
	filemd5, _ := GetFileMd5(torrentFileN)
	return torrentFileN, torrentbase64, filemd5

}

//文件md5值计算
func GetFileMd5(filename string) (string, ERROR) {
	file, err := os.Open(filename)
	if err != nil {
		fmt.println("os Open error")
		return "", err
	}
	md5 := md5.New()
	_, err = io.Copy(md5, file)
	if err != nil {
		fmt.Println("io copy error")
		return "", err
	}
	md5Str := hex.EncodeToString(md5.Sum(nil))
	return md5Str, nil
}

//启动tracker
func Start_Tracker(addr string, torrentFiles []string) (err error) {
	t := tracker.NewTracker()
	// TODO(jackpal) Allow caller to choose port number
	t.Addr = addr
	dial := proxy.Fromenvironment()
	for _, torrentFile := range torrentFiles {
		var metaInfo *torrent.MetaInfo
		metaInfo, err = torrent.GetMetaInfo(dial, torrentFile)
		if err != nil {
			return
		}
		name := metaInfo.Info.Name
		if name == "" {
			name = path.Base(torrentFile)
		}
		err = t.Register(metaInfo.InfoHash, name)
		if err != nil {
			return
		}
	}
	go func() {
		quitChan := listenSigInt()
		select {
		case <-quitChan:
			log.Printf("got control-C")
			t.Quit()
		}
	}()

	err = t.ListenAndServe()
	if err != nil {
		return
	}
	return
}

//启动上传bt
var (
	cpuprofile    = "" //If not empty, collects CPU profile samples and writes the profile to the given file before the program exits
	memprofile    = "" //If not empty, writes memory heap allocations to the given file before the program exits
	createTorrent = "" //If not empty, creates a torrent file from the given root. Writes to stdout
	createTracker = "" //Creates a tracker serving the given torrent file on the given address. example --createTracker=:8080 to serve on port 8080.

	port                = 7777        //Port to listen on. 0 means pick random port. Note that 6881 is blacklisted by some trackers.
	fileDir             = "."         //path to directory where files are stored
	seedRatio           = math.Inf(0) //Seed until ratio >= this value before quitting.
	useDeadlockDetector = false       //Panic and print stack dumps when the program is stuck.
	useLPD              = false       //Use local Peer Discovery
	useUPnP             = false       //Use UPnP to open port in firewall.
	useNATPMP           = false       //Use NAT-PMP to open port in firewall.
	gateway             = ""          //IP Address of gateway.
	useDHT              = false       //Use DHT to get peers.
	trackerlessMode     = false       //Do not get peers from the tracker. Good for testing DHT mode.
	proxyAddress        = ""          //Address of a SOCKS5 proxy to use.
	initialCheck        = true        //Do an initial hash check on files when adding torrents.
	useSFTP             = ""          //SFTP connection string, to store torrents over SFTP. e.g. 'username:password@192.168.1.25:22/path/'
	useRamcache         = 0           //Size in MiB of cache in ram, to reduce traffic on torrent storage.
	useHdCache          = 0           //Size in MiB of cache in OS temp directory, to reduce traffic on torrent storage.
	execOnSeeding       = ""          //command to execute when torrent has fully downloaded and has begun seeding.
	quickResume         = false       //Save torrenting data to resume faster. '-initialCheck' should be set to false, to prevent hash check on resume.
	maxActive           = 16          //How many torrents should be active at a time. Torrents added beyond this value are queued.
	memoryPerTorrent    = -1          //Maximum memory (in MiB) per torrent used for Active Pieces. 0 means Minimum. -1 (default) means unlimited.
	torrentFiles        []string
)

func parseTorrentFlags() (flags *torrent.TorrentFlags, err error) {
	dialer := proxy.FromEnvironment()

	flags = &torrent.TorrentFlags{
		Dial:                dialer,
		Port:                port,
		FileDir:             fileDir,
		SeedRatio:           seedRatio,
		UseDeadlockDetector: useDeadlockDetector,
		UseLPD:              useLPD,
		UseDHT:              useDHT,
		UseUPnP:             useUPnP,
		UseNATPMP:           useNATPMP,
		TrackerlessMode:     trackerlessMode,
		// IP address of gateway
		Gateway:            gateway,
		InitialCheck:       initialCheck,
		FileSystemProvider: torrent.OsFsProvider{},
		Cacher:             nil,
		ExecOnSeeding:      execOnSeeding,
		QuickResume:        quickResume,
		MaxActive:          maxActive,
		MemoryPerTorrent:   memoryPerTorrent,
	}
	return
}

func Start_BT(torrentFile string) {
	torrentFiles = []string{torrentFile}

	torrentFlags, err := parseTorrentFlags()
	if err != nil {
		log.Fatal("Could not parse flags:", err)
	}

	log.Println("Starting.")

	err = torrent.RunTorrents(torrentFlags, torrentFiles)
	if err != nil {
		log.Fatal("Could not run torrents", err)
	}
}

//向slave下发种子
func Distribute_Torrent() {

}

//向slave下发开始下载命令
func Start_Btslave() {

}

//统计slave下载完成情况
func Mission_Status() {

}

//终止bt下载任务
func Stop_Btslave() {

}

//功能函数
//判断文件是否存在
func Pathexists(path string) (bool, error) {
	_, err := os.Stat(path)
	if err == nil {
		return true, nil
	}
	if os.IsNotExist(err) {
		return false, nil
	}
	return false, err
}

//手动中断
func listenSigInt() chan os.Signal {
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, os.Kill)
	return c
}

//socket通信函数
//下发任务
func handleConnection_SendMission(conn net.Conn, mission string) {

	//sendmission := "{\"Mission\":\"" + mission + "\", \"TorrentFile\":\"" + torrentfile + "\", \"Torrentcontent\":\"" + torrentcontent + "\", \"TorrentMD5\": \"" + torrentMD5 + "\"}"
	//startbt := "{\"Mission\":\"StartBT\", \"TorrentFile\":\"test.torrent\", \"TorrentContent\":\"\"}"
	//stopbt := "{\"Mission\":\"StopBT\", \"TorrentFile\":\"test.torrent\", \"TorrentContent\":\"\"}"
	conn.Write(protocol.Enpack([]byte(mission)))
	//conn.Write(protocol.Enpack([]byte(startbt)))
	//conn.Write(protocol.Enpack([]byte(stopbt)))
	Log(mission)
	Log("Mission sent.")
	//defer conn.Close()
}

func handleConnection_getStatus(conn net.Conn) {

	// 缓冲区,存储被截断的数据
	tmpBuffer := make([]byte, 0)

	//接收解包
	readerChannel := make(chan []byte, 16)
	go reader(readerChannel, conn)

	buffer := make([]byte, 1024)
	for {
		n, err := conn.Read(buffer)
		if err != nil {
			if err == io.EOF {
				Log("Client disconnected.")
			} else {
				Log(conn.RemoteAddr().String(), " connection error: ", err)
			}
			return
		}

		tmpBuffer = protocol.Depack(APPend(tmpBuffer, buffer[:n]...), readerChannel)
	}
	//defer conn.Close()

}

//读取channel中的消息并作出相应的操作和回应
func reader(readerChannel chan []byte, conn net.Conn) {
	for {
		select {
		case data := <-readerChannel:
			var dat map[string]interface{}
			if err := json.Unmarshal([]byte(string(data)), &dat); err == nil {
				if dat["Mission"].(string) == "DownloadTorrent" {
					status := dat["Status"].(string)
					Log("DownloadTorrent Mission Status: " + status)
					torrentfile := "Rise.of.the.Tomb.Raider.CN.HD.part01.rar.torrent"
					if status == "OK" {
					//	mission := "{\"Mission\":\"StartBT\", \"TorrentFile\":\"" + dat["TorrentFile"].(string) + "\", \"TorrentContent\":\"\", \"TorrentMD5\": \"\"}"
					//	Log(mission)
						handleConnection_SendMission(conn, "{\"Mission\":\"StartBT\", \"TorrentFile\":\"" + torrentfile + "\", \"TorrentContent\":\"\", \"TorrentMD5\": \"\"}")
					}
				}
				if dat["Mission"].(string) == "StartBT" {
					status := dat["Status"].(string)
					if status == "OK" {
						Log(conn.RemoteAddr().String(), "BT started.")
					}
				}
				if dat["Mission"].(string) == "StopBT" {

				}
			} else {
				fmt.Println(err)
			}
		}
	}
}

func Log(v ...interface{}) {
	log.Println(v...)
}

func CheckError(err error) {
	if err != nil {
		fmt.fprintf(os.Stderr, "Fatal error: %s", err.Error())
		os.Exit(1)
	}
}

//main函数
func main() {

	//json mission结构体
	//type Mission struct {
	//	mission			string
	//	torrentfile		string
	//	torrentcontent	string
	//	torrentMD5		string
	//}

	//建立socket,监听端口
	netListen, err := net.Listen("tcp", "0.0.0.0:61111")
	CheckError(err)
	defer netListen.Close()

	Log("Waiting for clients")

	tracker := "127.0.0.1:6969"
	dir := ""
	file := dir + "Rise.of.the.Tomb.Raider.CN.HD.part01.rar"
	torrent := file + ".torrent"
	hastorrent, _ := PathExists(torrent)
	if !hastorrent {
		torrentfile, torrentcontent, torrentMD5 := Make_Torrent(file, tracker)
		//启动tracker
		go Start_Tracker(tracker, []string{torrentfile})
		//启动上传bt
		go Start_BT(torrentfile)
		//向btslave发送命令和文件
		for {
			conn, err := netListen.Accept()
			if err != nil {
				continue
			}

			Log(conn.RemoteAddr().String(), " tcp connect success")

			mission := "{\"Mission\":\"DownloadTorrent\", \"TorrentFile\":\"" + torrentfile + "\", \"TorrentContent\":\"" + torrentcontent + "\", \"TorrentMD5\": \"" + torrentMD5 + "\"}"
			go handleConnection_SendMission(conn, mission)
			go handleConnection_getStatus(conn)
			//go handleConnection_getStatus(conn)
			//if newmission {
			//	maketorrent
			//	gettorrentmd5
			//	base64torrent
			//	starttracker
			//	startsourcebt
			//	sendtorrenttoslave
			//	sendcommandtoslavetostartbt
			//}
			//if missionfinished {
			//	sendcommand(stopslavebt)
			//	stopsourcebt
			//	stoptracker
			//	reportsuccess
			//}
		}
	} else {
		os.Remove(torrent)
		torrentfile, torrentcontent, torrentMD5 := Make_Torrent(file, tracker)
		//启动tracker
		go Start_Tracker(tracker, []string{torrentfile})
		//启动上传bt
		go Start_BT(torrentfile)
		//向btslave发送命令和文件
		for {
			conn, err := netListen.Accept()
			if err != nil {
				continue
			}

			Log(conn.RemoteAddr().String(), " tcp connect success")

			mission := "{\"Mission\":\"DownloadTorrent\", \"TorrentFile\":\"" + torrentfile + "\", \"TorrentContent\":\"" + torrentcontent + "\", \"TorrentMD5\": \"" + torrentMD5 + "\"}"
			go handleConnection_SendMission(conn, mission)
			go handleConnection_getStatus(conn)
		}
	}
}



//ToDO:
//如何stopbt任务
//如何判断何时终止bt任务



btslave源码

package main

import (
	"encoding/json"
	"fmt"
	"github.com/jackpal/Taipei-Torrent/torrent"
	"golang.org/x/net/proxy"
	"io"
	"log"
	"math"
	"net"
	"os"
	"protocol"
	"encoding/base64"
	"crypto/md5"
	"encoding/hex"
)

//1. 接收master命令下载种子文件
//2. 接收master命令开始bt下载
//3. 接收master命令终止bt下载

//接收master命令下载种子文件
func Receive_Torrent() {

}

//接收master命令开始bt下载
func Start_Bt() {

}

//接收master命令终止bt下载
func Stop_Bt() {

}

//发送任务状态
func handleConnection_SendStatus(conn net.Conn, mission string, status string) {

	sendstatus := "{\"Mission\":\"" + mission + "\", \"Status\":\"" + status + "\"}"
	Log(sendstatus)
	conn.Write(protocol.Enpack([]byte(sendstatus)))

	Log("Status sent.")
	//defer conn.Close()

}

func handleConnection_getMission(conn net.Conn) {

	// 缓冲区,存储被截断的数据
	tmpBuffer := make([]byte, 0)

	//接收解包
	readerChannel := make(chan []byte, 16)
	go reader(readerChannel, conn)

	buffer := make([]byte, 1024)
	for {
		n, err := conn.Read(buffer)
		if err != nil {
			if err == io.EOF {
				Log("Client disconnected.")
			} else {
				Log(conn.RemoteAddr().String(), " connection error: ", err)
			}
			return
		}

		tmpBuffer = protocol.Depack(append(tmpBuffer, buffer[:n]...), readerChannel)
	}

}

func Log(v ...interface{}) {
	fmt.Println(v...)
}

//读取channel中的消息并作出相应的操作和回应
func reader(readerChannel chan []byte, conn net.Conn) {
	for {
		select {
		case data := <-readerChannel:
			Log(string(data))
			var dat map[string]interface{}
			if err := json.Unmarshal(data, &dat); err == nil {
				if dat["Mission"].(string) == "DownloadTorrent" {
					Log("Received Mission: DownloadTorrent")
					mission := "DownloadTorrent"
					//接收种子,写入文件,校验md5
					torrentFilename := dat["TorrentFile"].(string)
					decodeBytes, err := base64.StdEncoding.DecodeString(dat["TorrentContent"].(string))
					if err != nil {
						log.Fatalln(err)
					}
					torrentMD5 := dat["TorrentMD5"].(string)
					torrentFile, err := os.OpenFile(torrentFilename, os.O_RDWR|os.O_CREATE, 0)
					if err != nil {
						log.Fatalln("Failed to open file!", err)
					}
					defer torrentFile.Close()
					torrentFile.Write(decodeBytes)
					torrentFilemd5, _ := GetFileMd5(torrentFilename)
					if torrentMD5 == torrentFilemd5 {
						status := "OK"
						Log("Torrent downloaded.")
						handleConnection_SendStatus(conn, mission, status)
					}
				}
				if dat["Mission"].(string) == "StartBT" {
					go Start_BT(dat["TorrentFile"].(string))
					Log("BT started.")
					mission := "StartBT"
					status := "OK"
					handleConnection_SendStatus(conn, mission, status)
				}
				if dat["Mission"].(string) == "StopBT" {

				}
				//if dat["Mission"] == "StartBT" {
				//	//检查BT启动情况并报告状态
				//	//checkbtstarted
				//	handleConnection_SendStatus(conn, mission, status)
				//}
				//if dat["Mission"] == "StopBT" {
				//	//检查BT停止情况并报告状态
				//	//checkbtstopped
				//	handleConnection_SendStatus(conn, mission, status)
				//}
			} else {
				fmt.Println(err, "Json parse failed!")
			}
		}
	}
}

//文件md5值计算
func GetFileMd5(filename string) (string, error) {
	file, err := os.Open(filename)
	if err != nil {
		fmt.Println("os Open error")
		return "", err
	}
	md5 := md5.New()
	_, err = io.Copy(md5, file)
	if err != nil {
		fmt.Println("io copy error")
		return "", err
	}
	md5Str := hex.EncodeToString(md5.Sum(nil))
	return md5Str, nil
}

//bt客户端
var (
	cpuprofile    = "" //If not empty, collects CPU profile samples and writes the profile to the given file before the program exits
	memprofile    = "" //If not empty, writes memory heap allocations to the given file before the program exits
	createTorrent = "" //If not empty, creates a torrent file from the given root. Writes to stdout
	createTracker = "" //Creates a tracker serving the given torrent file on the given address. Example --createTracker=:8080 to serve on port 8080.

	port                = 7778        //Port to listen on. 0 means pick random port. Note that 6881 is blacklisted by some trackers.
	fileDir             = "."         //path to directory where files are stored
	seedRatio           = math.Inf(0) //Seed until ratio >= this value before quitting.
	useDeadlockDetector = false       //Panic and print stack dumps when the program is stuck.
	useLPD              = false       //Use Local Peer Discovery
	useUPnP             = false       //Use UPnP to open port in firewall.
	useNATPMP           = false       //Use NAT-PMP to open port in firewall.
	gateway             = ""          //IP Address of gateway.
	useDHT              = false       //Use DHT to get peers.
	trackerlessMode     = false       //Do not get peers from the tracker. Good for testing DHT mode.
	proxyAddress        = ""          //Address of a SOCKS5 proxy to use.
	initialCheck        = true        //Do an initial hash check on files when adding torrents.
	useSFTP             = ""          //SFTP connection string, to store torrents over SFTP. e.g. 'username:[email protected]:22/path/'
	useRamCache         = 0           //Size in MiB of cache in ram, to reduce traffic on torrent storage.
	useHdCache          = 0           //Size in MiB of cache in OS temp directory, to reduce traffic on torrent storage.
	execOnSeeding       = ""          //Command to execute when torrent has fully downloaded and has begun seeding.
	quickResume         = false       //Save torrenting data to resume faster. '-initialCheck' should be set to false, to prevent hash check on resume.
	maxActive           = 16          //How many torrents should be active at a time. Torrents added beyond this value are queued.
	memoryPerTorrent    = -1          //Maximum memory (in MiB) per torrent used for Active Pieces. 0 means minimum. -1 (default) means unlimited.
	torrentFiles        []string
)

func parseTorrentFlags() (flags *torrent.TorrentFlags, err error) {
	dialer := proxy.FromEnvironment()

	flags = &torrent.TorrentFlags{
		Dial:                dialer,
		Port:                port,
		FileDir:             fileDir,
		SeedRatio:           seedRatio,
		UseDeadlockDetector: useDeadlockDetector,
		UseLPD:              useLPD,
		UseDHT:              useDHT,
		UseUPnP:             useUPnP,
		UseNATPMP:           useNATPMP,
		TrackerlessMode:     trackerlessMode,
		// IP address of gateway
		Gateway:            gateway,
		InitialCheck:       initialCheck,
		FileSystemProvider: torrent.OsFsProvider{},
		Cacher:             nil,
		ExecOnSeeding:      execOnSeeding,
		QuickResume:        quickResume,
		MaxActive:          maxActive,
		MemoryPerTorrent:   memoryPerTorrent,
	}
	return
}

//bt启动函数
func Start_BT(torrentFile string) {
	torrentFiles = []string{torrentFile}

	torrentFlags, err := parseTorrentFlags()
	if err != nil {
		log.Fatal("Could not parse flags:", err)
	}

	log.Println("Starting.")

	err = torrent.RunTorrents(torrentFlags, torrentFiles)
	if err != nil {
		log.Fatal("Could not run torrents", err)
	}

}

//main函数
func main() {
	server := "10.9.3.132:61111"
	tcpAddr, err := net.resolveTCPAddr("tcp4", server)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
		os.Exit(1)
	}

	conn, err := net.DialTCP("tcp", nil, tcpAddr)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
		os.Exit(1)
	}

	fmt.Println("connect success")

	handleConnection_getMission(conn)
}

自定义protocol模块源码

//通讯协议处理
package protocol

import (
	"bytes"
	"encoding/binary"
)

const (
	ConstHeader       = "headers"
	ConstHeaderLength = 7
	ConstMLength      = 4
)

//封包
func Enpack(message []byte) []byte {
	return append(append([]byte(ConstHeader), IntToBytes(len(message))...), message...)
}

//解包
func Depack(buffer []byte, readerChannel chan []byte) []byte {
	length := len(buffer)

	var i int
	for i = 0; i < length; i = i + 1 {
		if length < i+ConstHeaderLength+ConstMLength {
			break
		}
		if string(buffer[i:i+ConstHeaderLength]) == ConstHeader {
			messageLength := BytesToInt(buffer[i+ConstHeaderLength : i+ConstHeaderLength+ConstMLength])
			if length < i+ConstHeaderLength+ConstMLength+messageLength {
				break
			}
			data := buffer[i+ConstHeaderLength+ConstMLength : i+ConstHeaderLength+ConstMLength+messageLength]
			readerChannel <- data

		}
	}

	if i == length {
		return make([]byte, 0)
	}
	return buffer[i:]
}

//整形转换成字节
func IntToBytes(n int) []byte {
	x := int32(n)

	bytesBuffer := bytes.NewBuffer([]byte{})
	binary.Write(bytesBuffer, binary.BigEndian, x)
	return bytesBuffer.Bytes()
}

//字节转换成整形
func BytesToInt(b []byte) int {
	bytesBuffer := bytes.NewBuffer(b)

	var x int32
	binary.Read(bytesBuffer, binary.BigEndian, &x)

	return int(x)
}

相关阅读

hiberfil文件的删除

系统老是提示系统内部组件禁止了休眠功能,百度查不到资料,我在无意中下载了最新的系统组件和更新了驱动,有了睡眠和休眠功能。打开cm

HTML引入外部JS文件的方法

在引入外部JS文件的情况下,不能在<script></script>之间插入代码,插入的代码不执行,只执行引入的外部文件。 attack.html 代码: <!D

你所知道 &不知道的「文件上传」

好久没有动笔了,久到都快要忘记文字应该怎么写、开头怎么写。最近做完了一个内部系统,趁下一个还在路上,总结一下在这个系统里遇到的

jQuery插件AjaxFileUpload实现ajax文件上传

jQuery插件AjaxFileUpload用来实现ajax文件上传,该插件使用非常简单,接下来写个demo演示怎么用AjaxFileUpload插件实现文件上传。 1

EPS基本文件格式

1. Required DSC Header Comments 有两条注释是必需的 %!PS-Adobe-3.0 EPSF-3.0 %%BoundingBox: llx lly urx ury 第一行注

分享到:

栏目导航

推荐阅读

热门阅读