Commits

wjzz committed 9b58e12 Draft

Made the peer loop MUCH less CPU consuming.

  • Participants
  • Parent commits 2a5097e

Comments (0)

Files changed (4)

Erlang/metadata.erl

 
 %% single file example
 ex() ->
-    {ok, Content} = file:read_file("data/java.torrent"),
+    {ok, Content} = file:read_file("data/bb.torrent"),
+    %% {ok, Content} = file:read_file("data/java.torrent"),
 %%    {ok, Content} = file:read_file("data/immortal.torrent"),
     Content.
 
                           my_status = idle
                          },
     Pid = spawn(fun () ->
+                        %% this self must be inside
+                        scheduler:schedule_peer(self()),
                         requester_loop(InitialState)
                 end),
     Pid.
 
 
 requester_loop(State) ->                      
-    Timeout = State#state.timeout,
+    %%Timeout = State#state.timeout,
     receive
         Msg -> 
             NewState = handle_msg(Msg, State),
             requester_loop(NewState)
-    after Timeout ->
-            NewState = make_requests(State),
-            requester_loop(NewState)
     end.
+    %% after Timeout ->
+    %%         NewState = make_requests(State),
+    %%         requester_loop(NewState)
+    %% end.
 
 
 %% handle incoming mesages
 
+handle_msg(rechedule, State) ->
+    scheduler:schedule_peer(self()),
+    State;
+
+handle_msg({pieces_available, Sender}, State) ->
+    Available = State#state.available_pieces,
+    Sender ! {pieces_available, Available},
+    State;
+
+
+handle_msg({pieces_bitvector, Pieces}, State) ->
+    %% send interested
+    Socket = State#state.socket,
+    ok = gen_tcp:send(Socket, <<0,0,0,1,2>>),
+
+    self() ! rechedule,
+    State#state{available_pieces = Pieces};
+
+handle_msg({piece_have, PieceNo}, State) ->
+    Available = State#state.available_pieces,
+
+    self() ! rechedule,
+    State#state{available_pieces = [PieceNo|Available]};
+
+
 handle_msg({add_to_request_queue, PieceNo}, State) ->
     %%to_be_scheduled = State#state.my_status,
 
     %% io:format("Got scheduled #~p; [~p]~n",
     %%           [PieceNo, State#state.peer]),
-
-    RequestQueue = State#state.request_queue,
-    NewQueue = [PieceNo|RequestQueue],
-    State#state{request_queue = NewQueue,
-                my_status = scheduled
-               };
+    Timeout = State#state.timeout,
+    case Timeout of
+        infinity ->
+            RequestQueue = State#state.request_queue,
+            NewQueue = [PieceNo|RequestQueue],
+            State#state{request_queue = NewQueue,
+                        my_status = scheduled
+                       };
+        0 ->
+            request_piece(PieceNo, State)
+    end;
 
 handle_msg({piece_completed, PieceNo}, State) ->
     {Host, _} = State#state.peer,
     %%             requested_pieces = [] % Better: remove PieceNo from Waiting
     %%            };
 
-handle_msg({pieces_available, Sender}, State) ->
-    Available = State#state.available_pieces,
-    Sender ! {pieces_available, Available},
-    State;
-
-handle_msg({pieces_bitvector, Pieces}, State) ->
-    %% send interested
-    Socket = State#state.socket,
-    ok = gen_tcp:send(Socket, <<0,0,0,1,2>>),
-
-    State#state{available_pieces = Pieces};
-
-handle_msg({piece_have, PieceNo}, State) ->
-    Available = State#state.available_pieces,
-    State#state{available_pieces = [PieceNo|Available]};
-
 handle_msg({status, choke}, State) ->
     %% we will have to wait for 'unchoke'
     State#state{timeout = infinity};
 
 handle_msg({status, unchoke}, State) ->
+    RequestQueue = State#state.request_queue,
+    NewState = case RequestQueue of
+                   [] -> 
+                       State;
+        
+                   [PieceNo|NewQueue] ->
+                       State1 = State#state{request_queue = NewQueue},
+                       request_piece(PieceNo, State1)
+               end,
+
     %% we can make requests
-    State#state{timeout = 0};
+    NewState#state{timeout = 0};
 
 handle_msg({status, _Interest}, State) ->
     State.
 
-%% send requests and respond to requests from other peers
-
-make_requests(State) ->
+request_piece(PartNo, State) ->
     MetaData     = State#state.meta_data,
     Socket       = State#state.socket,
     Peer         = State#state.peer,
     Waiting      = State#state.requested_pieces,
-    RequestQueue = State#state.request_queue,
-    MyStatus     = State#state.my_status,
+    %%MyStatus     = State#state.my_status,
 
-    case MyStatus of
 
-        idle ->
-            [] = RequestQueue,
-            scheduler:schedule_peer(self()),
-            State#state{my_status = to_be_scheduled};
+  PieceSize = metadata:piece_size(MetaData, PartNo),
 
-        to_be_scheduled ->
-            %% Wait for scheduler msg
-            State;
+  Request = build_whole_part_request(PieceSize, PartNo),
+  ok = gen_tcp:send(Socket, Request), 
 
-        waiting ->
-            %% Wait for 'completed' msg
-            State;
+  {Host,_} = Peer,
+  io:format("Request #~p from ~s~n", [PartNo, pretty_ip(Host)]),
+                      
+  State#state{ requested_pieces = [PartNo | Waiting],
+               my_status = waiting
+             }.
 
-        scheduled ->
-            [PartNo|RemainingRequests] = RequestQueue,
-            PieceSize = metadata:piece_size(MetaData, PartNo),
-            %% RegularPieceSize = metadata:piece_size(MetaData),
-            %% io:format("#~p: PieceSize =~p~n", [PartNo, PieceSize]),
-            %% io:format("RegularSize = ~p~n", [RegularPieceSize]),
 
+%% send requests and respond to requests from other peers
 
-            Request = build_whole_part_request(PieceSize, PartNo),
-            ok = gen_tcp:send(Socket, Request), 
+%% make_requests(State) ->
+%%     MetaData     = State#state.meta_data,
+%%     Socket       = State#state.socket,
+%%     Peer         = State#state.peer,
+%%     Waiting      = State#state.requested_pieces,
+%%     RequestQueue = State#state.request_queue,
+%%     MyStatus     = State#state.my_status,
 
-            {Host,_} = Peer,
-            io:format("Request #~p from ~s~n", [PartNo, pretty_ip(Host)]),
+%%     case MyStatus of
+
+%%         idle ->
+%%             [] = RequestQueue,
+%%             scheduler:schedule_peer(self()),
+%%             State#state{my_status = to_be_scheduled};
+
+%%         to_be_scheduled ->
+%%             %% Wait for scheduler msg
+%%             State;
+
+%%         waiting ->
+%%             %% Wait for 'completed' msg
+%%             State;
+
+%%         scheduled ->
+%%             [PartNo|RemainingRequests] = RequestQueue,
+%%             PieceSize = metadata:piece_size(MetaData, PartNo),
+%%             %% RegularPieceSize = metadata:piece_size(MetaData),
+%%             %% io:format("#~p: PieceSize =~p~n", [PartNo, PieceSize]),
+%%             %% io:format("RegularSize = ~p~n", [RegularPieceSize]),
+
+
+%%             Request = build_whole_part_request(PieceSize, PartNo),
+%%             ok = gen_tcp:send(Socket, Request), 
+
+%%             {Host,_} = Peer,
+%%             io:format("Request #~p from ~s~n", [PartNo, pretty_ip(Host)]),
                       
-            State#state{ request_queue = RemainingRequests,
-                         requested_pieces = [PartNo | Waiting],
-                         my_status = waiting
-                       }
-    end.
+%%             State#state{ request_queue = RemainingRequests,
+%%                          requested_pieces = [PartNo | Waiting],
+%%                          my_status = waiting
+%%                        }
+%%     end.
 
 %%%===================================================================
 %%% Other functions

Erlang/pieces.erl

 
 handle_cast({downloaded_block, {_Peer, PeerPid}, {PartNo, BlockOffset, Data}}, State) ->
     BlockNo = BlockOffset div ?SIZE,
-    io:format("Got #~p/~p.~n", [PartNo, BlockNo]),
+    %% io:format("Got #~p/~p.~n", [PartNo, BlockNo]),
 
     PieceDict = State#state.piece_dict,
     case dict:find({PartNo, PeerPid}, PieceDict) of

Erlang/scheduler.erl

 
 handle_cast({add_peer, PeerPid}, State) ->
     %% Add the peer to the peer pool
-    NewState = schedule_peer(PeerPid, State),
+    NewState = try_schedule_peer(PeerPid, State),
     {noreply, NewState};
 
 handle_cast({schedule_peer, PeerPid}, State) ->
-    NewState = schedule_peer(PeerPid, State),
+    NewState = try_schedule_peer(PeerPid, State),
     {noreply, NewState};
 
 handle_cast({wrong_hash, PartNo}, State) ->
 %%%===================================================================
 
 %%--------------------------------------------------------------------
+%% @doc Schedules a piece for the peer unless he is already scheduled.
+%%
+%% @end
+%%--------------------------------------------------------------------
+
+try_schedule_peer(PeerPid, State) ->
+    Scheduled = State#state.scheduled_pieces,
+    List = dict:to_list(Scheduled),
+    case lists:keymember(PeerPid, 2, List) of
+        true ->
+            State;
+        false ->
+            schedule_peer(PeerPid, State)
+    end.    
+
+%%--------------------------------------------------------------------
 %% @doc Schedules a piece for the peer to request next.
 %%
 %% @spec schedule_peer(PeerPid, State) -> State1