導入(
“上下文”
"錯誤"
“時間”
net " GX/ipfs/qmpjvxtpvh 8 qjyqdnxnsxf 9kv 9 jezkd 1 kozz 1 hs 3 fcgsnh/go-lib P2P-net "
MANET " GX/ipfs/qmv 6 fjemm 1k 8 0x Jr vuq 3 wuvwwou 2 tldpmnnkrxhzy 3v 6 ai/go-multi addr-net "
ma " GX/ipfs/qmymsdtj 3 hsodkepe 3 eu 3 tscap 2 yvpzj 4 loxnnkde 5 tpt 7/go-multi addr "
pro " GX/ipfs/qmznkthpqfvxs 9 gnbexprfbbxslnyekre 7 jwfm 2 oqhbyqn/go-libp2p-protocol "
p store " GX/ipfs/qmzr 2 xwvvbcbtbgbwnqhwk 2 xcqfar 3 w8 faqpriaaj 7 RSR/go-libp2p-peer store "
p2phost " GX/ipfs/qmb 8t 6 ybbsjysvgfrihqlfcjvcezznnesbqbkkyebwdjge/go-libp2p-host "
peer " GX/ipfs/qmdvrmn 1 lhb 4 ybb 8 hmvamlxna 8 xrsewmnk 6 yqxkxotcrvn/go-lib P2P-peer "
)
//P2P結構保存當前運行的流/監聽器的信息。
// P2P結構保存當前運行的流/偵聽器的信息
P2P結構類型{
//監聽器
ListenerRegistry註冊表
//數據流
流流註冊表
//節點ID
身份對等。身份證明
//節點地址
peerHost p2phost。主持
//線程安全對等節點存儲
peerstore pstore。皮爾斯托爾
}
//創建新的p2p結構
// NewP2P創建新的P2P結構
//這個新的p2p結構不包含p2p結構中的偵聽器和數據流。
func NewP2P(身份對等。ID,peerHost p2phost。主持人peerstore pstore。Peerstore) *P2P {
返回& ampP2P{
身份:身份,
皮爾霍斯特:皮爾霍斯特,
peerstore: peerstore,
}
}
//創建新的數據流工具方法,用節點id、內容和協議構建流。
func (p2p P2P) newStreamTo(ctx2上下文。上下文,p對等。ID,協議字符串)(net。流,錯誤){
//30s後會自動超時。
ctx,取消:=上下文。WithTimeout(ctx2,time。秒30) //TODO:可配置?
推遲取消()
err := p2p.peerHost.Connect(ctx,pstore。PeerInfo{ID: p})
如果err!=零{
返回零,錯誤
}
返回p2p.peerHost.NewStream(ctx2,p,pro。ID(協議))
}
//對話為遠程偵聽器創建新的P2P流。
//創建壹個新的p2p流來監視會話。
// Dial為遠程偵聽器創建新的P2P流
//Multiaddr是跨協議、跨平臺的互聯網地址。它強調清晰和自我描述。
//內部接收
func (p2p P2P)撥號(ctx上下文。上下文,地址ma。多地址,對等。ID,原型字符串,bindAddr ma。Multiaddr) ( ListenerInfo,error) {
//獲取壹些節點信息網絡,主機,nil。
lnet,_,err := manet。DialArgs(bindAddr)
如果err!=零{
返回零,錯誤
}
//監控信息
listenerInfo := ListenerInfo{
//節點標識
身份:p2p.identity,
////應用程序協議標識符。
協議:proto,
}
//調用newStreamTo通過ctx (content) peer (node id) proto(協議標識符)參數獲取新的數據流。
remote,err := p2p.newStreamTo(ctx,peer,proto)
如果err!=零{
返回零,錯誤
}
//網絡協議標識
交換機lnet {
//網絡是“TCP”、“TCP 4”、“TCP 6”。
案例“tcp”、“tcp4”、“tcp6”:
//從監聽器nla獲取新信息。偵聽器,零
監聽器,err := manet。聽(bindAddr)
如果err!=零{
如果err2 := remote。reset();err2!=零{
返回零,錯誤2
}
返回零,錯誤
}
//將獲取的新信息保存到listenerInfo。
listenerInfo。地址=監聽器。多地址()
listenerInfo。更接近=傾聽者
listenerInfo。運行=真
//開放接受
go P2P . do accept(& amp;listenerInfo,remote,listener)
默認值:
返回零,錯誤。新("不支持的協議:"+ lnet)
}
返回& amplistenerInfo,零
}
//
func(P2P * P2P)do accept(listener info * listener info,remote net。流,聽者馬奈。聽眾){
//關閉偵聽器並刪除流處理程序。
推遲收聽者。關閉()
//返回多地址友好連接
//具有良好多地址連接的接口
本地,錯誤:=偵聽器。接受()
如果err!=零{
返回
}
stream := StreamInfo{
//連接協議
協議:listenerInfo。協議,
//定位節點
LocalPeer: listenerInfo。身份,
//定位節點地址
LocalAddr: listenerInfo。地址,
//遠程節點
RemotePeer:遠程。連接()。RemotePeer(),
//遠程節點地址
RemoteAddr: remote。連接()。RemoteMultiaddr(),
//定位
本地:本地,
//遠程
遠程:遠程,
//註冊碼
註冊表:& ampp2p。溪流,
}
//註冊連接信息
p2p。註冊(& amp流)
//打開節點廣播
stream.startStreaming()
}
//Listener將流處理程序包裝到偵聽器中。
//偵聽器將流處理程序包裝到偵聽器中
類型偵聽器接口{
Accept() (net。流,錯誤)
Close()錯誤
}
//P2PListener保存有關偵聽器的信息。
// P2PListener保存有關偵聽器的信息
P2PListener結構標牌
peerHost p2phost。主持
海螺饞網。溪流
proto pro。身份證明
ctx上下文。語境
取消函數()
}
//等待偵聽器連接。
// Accept等待來自偵聽器的連接
func(il * P2PListener)Accept()(net。流,錯誤){
選擇{
案例c:= & lt;-il.conCh:
返回c,零
案例& lt-il.ctx.Done():
return nil,il.ctx.Err()
}
}
//關閉偵聽器並刪除流處理程序。
// Close關閉偵聽器並移除流處理程序
func (il *P2PListener) Close()錯誤{
il.cancel()
il . peer host . removestreamhandler(il . proto)
返回零
}
// Listen創建壹個新的P2PListener。
// Listen創建新的P2PListener
func(P2P P2P)registerStreamHandler(CT x2上下文。上下文,協議字符串)(P2PListener,錯誤){
ctx,取消:=上下文。帶取消(ctx2)
列表:= & ampP2PListener{
peerHost: p2p.peerHost,
proto: pro。ID(協議),
海螺:做(饞網。流),
ctx: ctx,
取消:取消,
}
P2P . peer host . setstreamhandler(list . proto,func(s net。流){
選擇{
案例列表. conch & lt;- s:
案例& ltctx。完成():
南重置()
}
})
返回列表,零
}
// NewListener創建新的p2p偵聽器。
// NewListener創建新的p2p偵聽器
//外部廣播
func (p2p P2P) NewListener(ctx上下文。上下文,原型字符串,地址ma。Multiaddr) ( ListenerInfo,error) {
//調用registerStreamHandler構造新的偵聽器。
listener,err:= P2P . registerstreamhandler(CTX,proto)
如果err!=零{
返回零,錯誤
}
//構造壹個新的listenerInfo
listenerInfo := ListenerInfo{
身份:p2p.identity,
協議:proto,
地址:addr,
親近者:傾聽者,
跑步:真的,
註冊表:& ampp2p。聽眾們,
}
go P2P . accept streams(& amp;listenerInfo,監聽器)
//註冊連接信息
p2p。聽眾。註冊(& amplistenerInfo)
返回& amplistenerInfo,零
}
//接受流
func(P2P * P2P)accept streams(Listener info * Listener info,listener Listener) {
對於listenerInfo。跑步{
//壹個具有良好遠程連接的
遠程,錯誤:=監聽器。接受()
如果err!=零{
聽眾。關閉()
破裂
}
}
//取消註冊表中的p2p監聽器
p2p。listeners . de register(listener info。協議)
}
// CheckProtoExists檢查協議處理程序是否已註冊。
//多路復用處理程序
// CheckProtoExists檢查協議處理程序是否註冊到
//多路復用處理程序
func(P2P * P2P)check proto exists(proto string)bool {
protos := p2p.peerHost.Mux()。協議()
for _,p := range protos {
如果p!=原型
繼續
}
返回true
}
返回false
}